Hi
I want to choose, at run time, which queue a specific job method is placed on.
To achieve this, I made the following changes:
1 - In the following file:
\Hangfire-master\src\Hangfire.SqlServer\SqlServerJobQueue.cs
instead of this code:
// Version being replaced (quoted as the "before" code): inserts one row into
// HangFire.JobQueue for the given job, using exactly the queue name passed in.
public void Enqueue(string queue, string jobId)
{
const string enqueueJobSql = @"insert into HangFire.JobQueue (JobId, Queue) values (@jobId, @queue)";
// The job id and queue are supplied as the named parameters @jobId and @queue.
_connection.Execute(enqueueJobSql, new { jobId = jobId, queue = queue });
}
I used the following code:
// Inserts one row into HangFire.JobQueue for the given job.
//
// queue: the queue requested by the caller; used unless a per-job override exists.
// jobId: id of the job to enqueue; also the key of the override lookup.
//
// If ERP.Common.HangFireHelper.dict holds a non-null entry for jobId, that entry
// wins. Otherwise the caller-supplied queue is kept (it is no longer overwritten
// with "default"), and "default" is used only when no queue was supplied at all.
public void Enqueue(string queue, string jobId)
{
    const string enqueueJobSql = @"insert into HangFire.JobQueue (JobId, Queue) values (@jobId, @queue)";

    // Keyed lookup instead of a LINQ scan over every entry. TryGetValue throws on
    // a null key, so a null jobId is treated as "no override".
    // NOTE(review): dict is a shared static dictionary per this post; confirm it is
    // safe for concurrent readers/writers (e.g. ConcurrentDictionary) before relying on it.
    string overrideQueue = null;
    if (jobId != null)
    {
        ERP.Common.HangFireHelper.dict.TryGetValue(jobId, out overrideQueue);
    }

    queue = overrideQueue ?? queue ?? "default";

    _connection.Execute(enqueueJobSql, new { jobId = jobId, queue = queue });
}
2 - In the Hangfire-master\src\Hangfire.Core\States\EnqueuedState.cs file,
instead of this:
// Version being replaced (quoted as the "before" code): returns the state's data
// as two string entries, "EnqueuedAt" and "Queue".
public Dictionary<string, string> SerializeData()
{
return new Dictionary<string, string>
{
{ "EnqueuedAt", JobHelper.SerializeDateTime(EnqueuedAt) },
// NOTE(review): `queue` is not declared anywhere in this snippet; presumably it
// refers to the state's own queue member - confirm the exact name against the file.
{ "Queue", queue }
};
}
I used this :
// Returns the state's data as two string entries, "EnqueuedAt" and "Queue".
//
// jobid: optional id of the job this state belongs to. When it is non-null and
// ERP.Common.HangFireHelper.dict holds a non-null entry for it, that entry is
// reported as the queue; in every other case "default" is reported.
public Dictionary<string, string> SerializeData(string jobid=null)
{
    // Keyed lookup instead of a LINQ scan over every entry. TryGetValue throws on
    // a null key, so the default jobid (null) skips the lookup entirely.
    string overrideQueue = null;
    if (jobid != null)
    {
        ERP.Common.HangFireHelper.dict.TryGetValue(jobid, out overrideQueue);
    }

    // NOTE(review): falling back to the literal "default" ignores whatever queue
    // this state instance was created with - confirm that is intended.
    var queue = overrideQueue ?? "default";

    return new Dictionary<string, string>
    {
        { "EnqueuedAt", JobHelper.SerializeDateTime(EnqueuedAt) },
        { "Queue", queue }
    };
}
3 - in Hangfire-master\src\Hangfire.Core\States\IState.cs file
I changed
// Declaration being replaced: serializes the state's data without any job context.
Dictionary<string, string> SerializeData();
to
// New declaration: jobid is optional (null when omitted), so call sites that pass
// no argument still compile; every implementing class must adopt this signature.
Dictionary<string, string> SerializeData(string jobid=null);
4 - Then, in every class that implements IState, change
public Dictionary<string, string> SerializeData()
to
public Dictionary<string, string> SerializeData(string jobid=null)
5 - In Hangfire-master\src\Hangfire.SqlServer\SqlServerWriteOnlyTransaction.cs,
instead of these methods:
// Version being replaced (quoted as the "before" code): records a new state row
// for the job and makes it the job's current state.
public override void SetJobState(string jobId, IState state)
{
// Two statements in one batch: insert the state row, then point HangFire.Job at
// it via SCOPE_IDENTITY() and copy the state name onto the job.
const string addAndSetStateSql = @"
insert into HangFire.State (JobId, Name, Reason, CreatedAt, Data)
values (@jobId, @name, @reason, @createdAt, @data);
update HangFire.Job set StateId = SCOPE_IDENTITY(), StateName = @name where Id = @id;";
// The parameter object is built inside the lambda, so DateTime.UtcNow and
// SerializeData() are evaluated whenever QueueCommand invokes it.
QueueCommand(x => x.Execute(
addAndSetStateSql,
new
{
jobId = jobId,
name = state.Name,
reason = state.Reason,
createdAt = DateTime.UtcNow,
data = JobHelper.ToJson(state.SerializeData()),
id = jobId
}));
}
// Version being replaced (quoted as the "before" code): records a new state row
// for the job without changing which state HangFire.Job points at.
public override void AddJobState(string jobId, IState state)
{
const string addStateSql = @"insert into HangFire.State (JobId, Name, Reason, CreatedAt, Data)
values (@jobId, @name, @reason, @createdAt, @data)";
// The parameter object is built inside the lambda, so DateTime.UtcNow and
// SerializeData() are evaluated whenever QueueCommand invokes it.
QueueCommand(x => x.Execute(
addStateSql,
new
{
jobId = jobId,
name = state.Name,
reason = state.Reason,
createdAt = DateTime.UtcNow,
data = JobHelper.ToJson(state.SerializeData())
}));
}
I used these:
// Records a new state row for the job and makes it the job's current state.
// The job id is forwarded to SerializeData so the state can serialize
// job-specific data.
public override void SetJobState(string jobId, IState state)
{
    // One batch: insert the state row, then point HangFire.Job at the new row
    // via SCOPE_IDENTITY() and copy the state name onto the job.
    const string insertAndAssignSql = @"
insert into HangFire.State (JobId, Name, Reason, CreatedAt, Data)
values (@jobId, @name, @reason, @createdAt, @data);
update HangFire.Job set StateId = SCOPE_IDENTITY(), StateName = @name where Id = @id;";

    // Everything is built inside the lambda on purpose: the timestamp and the
    // serialized data are produced whenever QueueCommand invokes it.
    QueueCommand(connection => connection.Execute(
        insertAndAssignSql,
        new
        {
            jobId,
            name = state.Name,
            reason = state.Reason,
            createdAt = DateTime.UtcNow,
            data = JobHelper.ToJson(state.SerializeData(jobId)),
            id = jobId
        }));
}
// Records a new state row for the job without changing which state
// HangFire.Job points at. The job id is forwarded to SerializeData so the
// state can serialize job-specific data.
public override void AddJobState(string jobId, IState state)
{
    const string insertStateSql = @"insert into HangFire.State (JobId, Name, Reason, CreatedAt, Data)
values (@jobId, @name, @reason, @createdAt, @data)";

    // Everything is built inside the lambda on purpose: the timestamp and the
    // serialized data are produced whenever QueueCommand invokes it.
    QueueCommand(connection => connection.Execute(
        insertStateSql,
        new
        {
            jobId,
            name = state.Name,
            reason = state.Reason,
            createdAt = DateTime.UtcNow,
            data = JobHelper.ToJson(state.SerializeData(jobId))
        }));
}
Note: the only change is that jobId is now passed to state.SerializeData() as an argument.
6 - And this is my configuration:
[assembly: OwinStartup(typeof(Startup))]
// OWIN startup class: configures Hangfire's SQL Server storage and starts three
// background job servers, each listening on a single queue.
public class Startup
{
    // Connection string handed to UseSqlServerStorage.
    public static string conn = "Data Source=.;Initial Catalog=HangFire;Integrated Security=True";

    // The IAppBuilder received by Configuration, kept so that other code can
    // reach it after startup.
    public static IAppBuilder app2;

    public void Configuration(IAppBuilder app)
    {
        // The original snippet passed an undeclared `options` to UseSqlServerStorage,
        // which does not compile; the storage options are declared explicitly here.
        // NOTE(review): the poll interval mirrors the SqlServerStorageOptions shown
        // elsewhere in this post; every other setting is left at the library's
        // default - confirm against the options originally intended.
        var storageOptions = new SqlServerStorageOptions
        {
            QueuePollInterval = TimeSpan.FromSeconds(15)
        };

        var documentsServerOptions = new BackgroundJobServerOptions
        {
            SchedulePollingInterval = TimeSpan.FromSeconds(20),
            ServerName = "Server1",
            WorkerCount = 1,
            Queues = new[] { "documents" }
        };

        var salesServerOptions = new BackgroundJobServerOptions
        {
            ServerName = "Server2",
            WorkerCount = 2,
            Queues = new[] { "sales" }
        };

        // NOTE(review): Hangfire's documentation describes queue names as lowercase;
        // confirm that "Test" is accepted by the version in use.
        var testServerOptions = new BackgroundJobServerOptions
        {
            ServerName = "Server_Test",
            WorkerCount = 2,
            Queues = new[] { "Test" }
        };

        app.UseHangfire(config =>
        {
            config.UseSqlServerStorage(conn, storageOptions);
            config.UseServer(documentsServerOptions);
            config.UseServer(salesServerOptions);
            config.UseServer(testServerOptions);
        });

        app2 = app;
    }
}
7 - Then, to be able to create a queue dynamically, I used this:
// Starts an additional Hangfire background job server that listens on a single queue.
//
// serverName:  name assigned to the new server.
// queue:       the one queue the new server processes; must not be empty.
// workerCount: number of workers for the new server; must be positive.
//
// Throws ArgumentException / ArgumentOutOfRangeException for invalid arguments and
// InvalidOperationException when Startup.app2 has not been assigned yet.
public void CreateQueue(string serverName,string queue, int workerCount)
{
    if (string.IsNullOrWhiteSpace(queue))
    {
        throw new ArgumentException("A queue name is required.", "queue");
    }

    if (workerCount <= 0)
    {
        throw new ArgumentOutOfRangeException("workerCount", workerCount, "Worker count must be positive.");
    }

    if (Startup.app2 == null)
    {
        throw new InvalidOperationException("Startup.app2 has not been assigned yet.");
    }

    // The original snippet declared two locals named `options`, which does not
    // compile; they are given distinct names so each call receives the right one.
    var storageOptions = new SqlServerStorageOptions
    {
        PrepareSchemaIfNecessary = false,
        QueuePollInterval = TimeSpan.FromSeconds(15)
    };

    var serverOptions = new BackgroundJobServerOptions
    {
        SchedulePollingInterval = TimeSpan.FromSeconds(20),
        ServerName = serverName,
        WorkerCount = workerCount,
        Queues = new[] { queue }
    };

    // NOTE(review): this reuses the IAppBuilder captured at startup; confirm that
    // registering Hangfire on it after the OWIN pipeline has been built has the
    // intended effect, and that configuring the storage a second time is harmless.
    // NOTE(review): `conn` must resolve in the enclosing class; in this post it is
    // only shown as a static field on Startup (Startup.conn) - verify.
    Startup.app2.UseHangfire(config =>
    {
        config.UseSqlServerStorage(conn, storageOptions);
        config.UseServer(serverOptions);
    });
}
I also defined a Dictionary named dict in the HangFireHelper class of the ERP.Common project,
and this is how I use it:
// Schedule the job to run one minute from now; the returned id is the key the
// queue override is stored under.
var delay = TimeSpan.FromMinutes(1);
var jobid = BackgroundJob.Schedule<Caller>(c => c.Method_documents(), delay);
// Register "documents" as the queue for this job id. The entry is added only
// after Schedule has returned.
ERP.Common.HangFireHelper.dict.Add(jobid, "documents");
"documents" is my queue.
Note: because I didn't specify a queue attribute on my method, the job would go to the default queue. But with this line of code
ERP.Common.HangFireHelper.dict.Add(jobid, "documents");
I set the "documents" queue for the job. With the changes above, the queue is looked up in the dictionary when the job is about to be inserted into the job queue.
Done. It works, but
what do you think about these changes?
Is there any problem?