Queue With 1 Worker Processes More Than 1 Job Concurrently

Requirement:
I am trying to limit a queue to processing one job at a time. To do this, I created a queue with 1 worker. For the first few submissions, it processes only 1 job at a time (monitoring with the Dashboard). But if I keep adding jobs, it begins processing 2, 3, 4 and more jobs concurrently in that queue. This causes a problem, since the jobs are compute-intensive parallel processing bundles whose front-end configuration is single-threaded, so multiple jobs on a single worker can cause config file clashes. Below is a code snippet showing how I configure the queue with 1 worker.

Does 1 worker mean only 1 job can be processed at a time? If so, is my code below wrong, or is there an issue with how workers enforce job limits per queue? If this is not the role of a worker, how do I achieve what I want?

var freeBackgroundServer = new BackgroundJobServerOptions
{
    ServerName = System.Environment.MachineName + "." + _jobProcessor,
    WorkerCount = 1,
    Queues = new[] { MACAddress.Replace(":","_") }
};

Startup.app.UseHangfire(config =>
{
    config.UseAuthorizationFilters();
    config.UseSqlServerStorage("Hangfire"); // + i);
    config.UseServer(1, freeBackgroundServer.Queues);
    //  config.UseServer(freeBackgroundServer);  //This doesn't work, so split out (above)
});

IBackgroundJobClient client = new BackgroundJobClient();

string JobQueueName = freeBackgroundServer.Queues.FirstOrDefault().Replace("-", "_");
IState state = new EnqueuedState
{
    Queue = JobQueueName
};

client.Create(() => KESConsoleRunner.Program.ExecuteBackgroundJob(RunString), state);

I tried the distributed lock attribute [DisableConcurrentExecution(timeoutInSeconds: 10 * 600)] on the public method that calls the static console app which runs the transaction. This did not help: I was still getting config file clashes, even though the server runs in the same VM as the client (it's not even a remote machine). So I must be using the attribute incorrectly. I placed the attribute on a public static method in the DLL that calls Main(), as shown in the code snippet below.

[DisableConcurrentExecution(timeoutInSeconds: 10 * 600)]
public static void ExecuteBackgroundJob(string runstring)
{
    // ...
    Main(args);
}

However, I created a hack that covers my need for now: sequential background processing of this transaction in the queue, using the queue monitoring API (see the code snippet below). I still need to solve this properly, because I need to harness distributed machines and I want to be able to stack queues, so a distributed lock is important. The job that is run from Main() contains deeply nested parallel processing, maxing out as many processors as you configure into the VM, so I am OK with sequential job processing. But I would like Hangfire to manage concurrent threads calling the same static process on the same machine, so that the intent of the static lock in the Main() class is preserved on the same VM.

Any help would be appreciated. In the meantime, I’ll keep trying to figure this out, so I can get a production-level solution.

// Refuse to submit while any job is processing, enqueued in this queue, or scheduled
var monitoringApi = JobStorage.Current.GetMonitoringApi();
if (monitoringApi.ProcessingCount() > 0 ||
    monitoringApi.EnqueuedCount(freeBackgroundServer.Queues.First()) > 0 ||
    monitoringApi.ScheduledCount() > 0)
{
    return false;
}

client.Create(() => KESConsoleRunner.Program.ExecuteBackgroundJob(RunString), state);

It would appear that you and I have the same issue.

The monitoring API hack is actually a pretty damn good one. I think I'll implement that for now, as I need finer-grained control over disabling concurrent execution.

Well, the problem with the hack is that it is too limiting: you can Fire, but you can't Forget, because you can't stack jobs in a queue. But it's good to know I'm not the only one struggling with this feature. For me this feature is critical. My plan is to have distributed Hangfire servers, each with a WebFarm, and jobs distributed to the best-fit Hangfire server. Getting sequential queue processing to work is also key to job workflow dependency processing, which I will need later on.

I see you have tried debugging the DistributedLock. I thought I'd take a run at that today, to see if I can determine what's going on. I'll report back here with any findings.

I just tried the worker count with the latest version of HangFire (v1.1.1) and found that it seems to work.

DisableConcurrentExecution (Used with SQL Server) does seem to work in an unusual way when used in conjunction with the worker count. Please consider the following with the worker count set to 2 on a single server, where Test() is called once to enqueue the jobs:

private void Test()
{
	_log.WarnFormat("[{0}] Queuing tasks", DateTime.Now);

	BackgroundJob.Enqueue(() => TestTaskConcurrent(1));
	BackgroundJob.Enqueue(() => TestTaskConcurrent(2));
	BackgroundJob.Enqueue(() => TestTask(3));
	BackgroundJob.Enqueue(() => TestTask(4));
}

[DisableConcurrentExecution(600)]
public static void TestTaskConcurrent(int index)
{
	ILog log = LogManager.GetLogger("Hangfire.Job");

	log.WarnFormat("[{0}] {1} TestTaskConcurrent started", DateTime.Now, index);
	System.Threading.Thread.Sleep(10000);
	log.WarnFormat("[{0}] {1} TestTaskConcurrent completed", DateTime.Now, index);
}

public static void TestTask(int index)
{
	ILog log = LogManager.GetLogger("Hangfire.Job");

	log.WarnFormat("[{0}] {1} TestTask started", DateTime.Now, index);
	System.Threading.Thread.Sleep(10000);
	log.WarnFormat("[{0}] {1} TestTask completed", DateTime.Now, index);
}

Every job, in both sets, sleeps for 10 seconds.

This is the output you get:

[22/10/2014 00:22:32] 1 TestTaskConcurrent started
[22/10/2014 00:22:42] 1 TestTaskConcurrent completed
[22/10/2014 00:22:42] 2 TestTaskConcurrent started
[22/10/2014 00:22:42] 3 TestTask started
[22/10/2014 00:22:52] 2 TestTaskConcurrent completed
[22/10/2014 00:22:52] 4 TestTask started
[22/10/2014 00:22:52] 3 TestTask completed
[22/10/2014 00:23:02] 4 TestTask completed

You would expect task #3 to start earlier, at 22/10/2014 00:22:32, but instead it is blocked by task #2, which occupies the second worker while it waits for task #1. This means that not all of the workers are able to process jobs as you might expect: the above set of tasks could have been completed in a total of 20 seconds, but instead they took 30 seconds.
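To see why the run takes 30 seconds rather than 20, here is a small simulation in C# (no Hangfire involved). It assumes, as the log suggests, that workers fetch jobs in FIFO order and that a worker which fetches a job marked with DisableConcurrentExecution waits on the lock instead of skipping the job; the lock timeout is ignored. SimJob, SimResult and LockBlockingSimulation are hypothetical names made up for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical job description used only by this simulation (not a Hangfire type).
public sealed record SimJob(int Id, bool NeedsLock, int DurationSeconds);

// Timeline of one simulated job, in seconds from the moment the jobs are enqueued.
public sealed record SimResult(int Id, int FetchedAt, int StartedAt, int CompletedAt)
{
    // Duration measured from fetch to completion, i.e. including time spent waiting on the lock.
    public int ReportedDuration => CompletedAt - FetchedAt;
}

public static class LockBlockingSimulation
{
    // Assumes jobs are fetched strictly in FIFO order and that a worker which fetches
    // a lock-protected job waits for the lock (lock timeout is ignored).
    public static List<SimResult> Run(IEnumerable<SimJob> fifoQueue, int workerCount)
    {
        var workerFreeAt = new int[workerCount];
        int lockFreeAt = 0;
        var results = new List<SimResult>();

        foreach (var job in fifoQueue)
        {
            // The next job in the queue goes to whichever worker becomes free first.
            int worker = Array.IndexOf(workerFreeAt, workerFreeAt.Min());
            int fetchedAt = workerFreeAt[worker];

            // A lock-protected job keeps its worker busy until the lock is released.
            int startedAt = job.NeedsLock ? Math.Max(fetchedAt, lockFreeAt) : fetchedAt;
            int completedAt = startedAt + job.DurationSeconds;

            if (job.NeedsLock) lockFreeAt = completedAt;
            workerFreeAt[worker] = completedAt;
            results.Add(new SimResult(job.Id, fetchedAt, startedAt, completedAt));
        }
        return results;
    }
}
```

Running it with jobs 1 and 2 locked and 2 workers gives a finish time of 30 seconds. Job 2 is fetched at 0 s but only starts at 10 s, so its fetch-to-finish duration is 20 s, which lines up with the 20 seconds shown in the web UI. With the lock removed from both jobs, the same queue finishes in 20 seconds.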

Additionally in the web UI, task #2 is shown as having a duration of 20 seconds rather than 10 seconds - I’m not sure that the duration time should include blocking caused by the DisableConcurrentExecution attribute.

Tx, yngndrw. I'll build a test to see if I can get it to work the way I need, and will report back with my findings.


Later: I got it working for SQL Server. I don't use DistributedLock; it's not needed or useful here, which is good, because I didn't see a built-in distributed locking mechanism in Hangfire for REDIS, which I'll get working next. I believe I'd have to build one (I could be wrong, but I won't worry about that for the time being).

  • I set the WorkerCount for the queue's server to 1, along with other queue properties, using the BackgroundJobServerOptions object (I found this very fragile when setting queue properties). I assign the queue to a server whose name has the queue name as its suffix (I use the server's MAC address as both the queue name and the server name suffix), so that I can match queues to servers. This makes it easier to see which queue is assigned to which server.
    – Note: I could not force the enqueued background job to take the queue name I provided. It kept using DEFAULT (this is my issue; I had it working at one time, so I just need to figure that out). So I added two queues to the server: the intended queue name, and DEFAULT. I'll need to resolve this later, since DEFAULT will not work with multiple servers where I want to map given queues to target servers.
  • Currently I am using only one server. I have some cleanup to do on sequential background job processing when setting up successive jobs, but that should be no more than a day's effort.
    – Before that, I’ll get REDIS working, to make sure there are no issues with it.
    – Once REDIS Sequential queues are working, and background job sequential processing startup is cleaned up, I will test multiple servers. For multiple servers, I’ll need to resolve named queue mapping to named servers (and remove DEFAULT)
    – Once I get multiple servers working, I’ll build support for distributed Hangfire Servers, each Hangfire Server having multiple BackgroundJob Processing Servers.
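The naming convention in the first bullet (queue name derived from the MAC address, server name = machine name + "." + queue name) can be sketched as a pair of small helpers. This is a minimal, hypothetical sketch: QueueServerNaming and its methods are not part of Hangfire, and FindServerForQueue assumes you already have a list of the ServerName values exactly as they were configured.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helpers for the queue-to-server naming convention; not a Hangfire API.
public static class QueueServerNaming
{
    // ":" is not allowed in the queue name (per the code comment in this thread),
    // and "-" is replaced too, as in the first snippet; both become "_".
    public static string QueueNameFromMac(string macAddress) =>
        macAddress.Replace(":", "_").Replace("-", "_");

    // Server name = machine name + "." + queue name, as in the BackgroundJobServerOptions code.
    public static string ServerNameFor(string machineName, string queueName) =>
        machineName + "." + queueName;

    // Returns the server whose name ends with "." + queueName, or null if none does.
    // Assumes serverNames holds the ServerName values exactly as configured.
    public static string FindServerForQueue(IEnumerable<string> serverNames, string queueName) =>
        serverNames.FirstOrDefault(s => s.EndsWith("." + queueName, StringComparison.Ordinal));
}
```

Including the "." in the suffix check keeps a queue such as aa_bb from matching a server named WIN.x_aa_bb, which would otherwise also end with those characters.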

Code for BackgroundJobServerOptions property assignment:

macAddress = macAddress.Replace(":", "_"); // cannot have ":" in a queue name, so use "_"

// Max 20 chars for the queue name, so max 20 chars for the matching machine name suffix
string _jobProcessor = macAddress;
Startup.BackgroundServerPool = Startup.BackgroundServerPool ??
                               new Dictionary<int, Dictionary<BackgroundJobServerOptions, bool>>();
Startup.ServerNumber++;
Startup.BackgroundServerPool.Add(Startup.ServerNumber,
    new Dictionary<BackgroundJobServerOptions, bool>());
Startup.BackgroundServerPool[Startup.ServerNumber].Add(new BackgroundJobServerOptions
{
    ServerName = System.Environment.MachineName + "." + _jobProcessor,
    WorkerCount = 1,
    // Oct 22 2014 dbm. Add the DEFAULT queue as well, as a workaround for the bug in
    // Enqueued queue-name assignment. Was: Queues = new[] { _jobProcessor }
    Queues = new[] { _jobProcessor, "DEFAULT" }
}, false);

Code for Hangfire setup:
Startup.app.UseHangfire(config =>
{
    config.UseAuthorizationFilters();
    config.UseSqlServerStorage(Startup.SQLServerConnectionstring);
    if (!Startup.UseWebFarmForDocumentIndexing)
        config.UseServer(Startup.BackgroundServerPool[Startup.ServerNumber].LastOrDefault().Key);
});

Code for background job enqueuing:
BackgroundJob.Enqueue<KESConsoleRunner.KvmBackgroundJob>(m => m.SubmitJob(
    RunString,
    Startup.server.Instance.Address.ToString(), // MongoDB
    Startup.useSQLServer
        ? Startup.SQLServerConnectionstring
        : Startup.RedisConnectionString
));

@bain: Please see the below links regarding the distributed lock when using Redis:

Update:
Regarding the issue with applying the queue name to background jobs, have you tried the following method:

Tx for this. It saves me a lot of time, and I really appreciate the support. Your links will help me remove the DEFAULT queue and implement distributed locks when I have distributed Hangfire servers. I see Redis MQ in there; I also hope to find some job dependency / workflow code for Redis MQ in your links. Looking forward to learning more about developing with Redis…


Later: I now have REDIS working as a sequential queue the way I want it, with queue stacking. I have yet to resolve the DEFAULT queue requirement; I'll focus on that tomorrow. One thing to watch with queue names: when using SQLServer as the JobStore, queue names are case insensitive, but when using REDIS, they are case sensitive. This matters when the background app is on a different VM/machine.
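One way to sidestep the case-sensitivity difference is to pass every queue name through the same normalization on both the client (EnqueuedState.Queue) and the server (BackgroundJobServerOptions.Queues). Below is a minimal sketch, assuming lowercase letters, digits and underscores are a safe character set for both storages, and taking the 20-character limit from the "Max 20 char for QueueName" comment in the server setup code; QueueNames.Normalize is a hypothetical helper, not a Hangfire API.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of Hangfire: call it on the client and the server alike.
public static class QueueNames
{
    // Lowercases the name and replaces anything other than a-z and 0-9 with "_",
    // so SQL Server (case-insensitive) and REDIS (case-sensitive) see the same queue.
    public static string Normalize(string raw, int maxLength = 20)
    {
        var sb = new StringBuilder(raw.Length);
        foreach (char c in raw.Trim().ToLowerInvariant())
        {
            bool keep = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9');
            sb.Append(keep ? c : '_');
        }

        // The 20-char default follows the "Max 20 char for QueueName" comment in the thread.
        if (sb.Length == 0 || sb.Length > maxLength)
            throw new ArgumentException(
                $"Queue name '{raw}' normalizes to '{sb}', which is empty or longer than {maxLength} characters.",
                nameof(raw));
        return sb.ToString();
    }
}
```

With this, the server would use Queues = new[] { QueueNames.Normalize(macAddress), "default" } and the client new EnqueuedState { Queue = QueueNames.Normalize(macAddress) }, so "00:1C:42:AB:CD:EF" and "00-1c-42-ab-cd-ef" both end up as the same queue, "00_1c_42_ab_cd_ef".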

  • In my design, the backend server (a VMWare Fusion Windows VM), which can be on a separate machine from the client application (another VMWare Fusion Windows VM), automatically registers its server name as a Hangfire background job server in the host REDIS job store, which runs on OS X (Yosemite). Thus, when you spool up a new Windows Server VM under Yosemite, it automatically shows up (registers itself) in the REDIS job store that the client application points to. The client application does not create the server's entry in the REDIS JobStore, as it would with config.UseHangfire(…UseServer(…)…); this is done by the server app according to its config file. In fact, the client application knows only about the Hangfire server's JobStore address. Here's how this works:

ON SERVER
private BackgroundJobServer _server; // a field, so that OnStart can reach it

public Service()
{
    JobStorage.Current = new RedisStorage(RedisConnectionString, RedisDB);
    var options = new BackgroundJobServerOptions
    {
        ServerName = System.Environment.MachineName + "." + macAddress,
        WorkerCount = 1,
        Queues = new[] { macAddress, "default" }
    };
    _server = new BackgroundJobServer(options, JobStorage.Current);
}

protected override void OnStart(string[] args)
{
    _server.Start();
}

ON CLIENT APP

// "default" is case sensitive in REDIS, but not in SQLServer. "default" will be removed
var options = new BackgroundJobServerOptions
{
    ServerName = System.Environment.MachineName + "." + macAddress,
    WorkerCount = 1,
    Queues = new[] { macAddress, "default" }
};

// Set the current storage before UseHangfire reads JobStorage.Current
JobStorage.Current = new RedisStorage(RedisConnectionString, RedisDB);

app.UseHangfire(config =>
{
    config.UseAuthorizationFilters();
    config.UseStorage(JobStorage.Current);
});

app.MapHangfireDashboard(
    "/hangfire-redis",
    new Hangfire.Dashboard.IAuthorizationFilter[0],
    new RedisStorage(DashboardConnectionString, RedisDB)
);

// ASYNC CALL
Startup.app.UseHangfire(config =>
{
    config.UseAuthorizationFilters();
    config.UseRedisStorage(Startup.RedisConnectionString, Startup.RedisDB);
});

BackgroundJob.Enqueue<KESConsoleRunner.KvmBackgroundJob>(m => m.SubmitJob(
    RunString,
    Startup.server.Instance.Address.ToString(), // MongoDB
    Startup.RedisConnectionString
));

To run the app:
localhost:4142

To run the Dashboard:
localhost:4142/hangfire

Left to do is create/config the WebFarm layer for parallel job distribution. Then interconnect a network of Hangfire servers. Of course, I’ll need to clean up security. Later on, Job dependencies and Job Workflow are planned. Hopefully there is REDIS code to facilitate that. Can anyone point me to examples?

Hopefully my ramblings offer others some guidance as I muddled through this.

As always, identification of any glaring errors is welcome. So far it's working as planned. I'll add more insights regarding Hangfire if and when they surface.

Tx to yngndrw for the links. And of course, big thanks to odinserj for authoring Hangfire.

Bain

=====

Later: Here's the code for submitting the job to a specific work queue. It replaces BackgroundJob.Enqueue<KESConsoleRunner.KvmBackgroundJob>(m => …):

           IBackgroundJobClient client = new BackgroundJobClient();
           client.Create<KESConsoleRunner.KvmBackgroundJob>(m => m.SubmitJob(
                   RunString,
                   Startup.server.Instance.Address.ToString(), //mongodb
                   Startup.useSQLServer
                       ? Startup.SQLServerConnectionstring
                       : Startup.RedisConnectionString
                   ), new EnqueuedState { Queue = JobQueueName });

Note:

  1. Through a Web.config setting, the code allows you to set SQLServer or REDIS as your JobStore.
  2. The solution does not (yet) use a distributed lock. It will be implemented if and when it is needed.

Later:
Adding to the evolution of the Hangfire solution, I now have VMs lighting up around the network, processing transactions submitted from the client MVC5 SignalR application to Hangfire. This is very cool!!! SignalR is very fast, efficient, flexible and powerful, facilitating event-driven integration with arbitrary network applications. And Hangfire's “fire and forget” design facilitates this process, so that client applications can focus on what they need to do while consuming network services, playing the part of a respected network application citizen within the overall network application ecosystem. All this is great, but I was not able to use Hangfire statistics to create an optimal cost-based, constraint-driven transaction distribution algorithm, since I needed more granularity to calculate the next-best server to assign a background transaction job to.

Reading REDIS: To achieve this, I needed to read and parse the REDIS keys and values on OS X from the Windows VM client application at transaction submission time. As more servers lit up around the network, new key combinations surfaced, and each required code changes as I came to better understand how keys represent job processing state transitions for guaranteed transaction completion. I created a JSON model as the schema of this processing behaviour, which I will provide once I am sure I have caught all the combinations needed to derive the granular statistics I require for a cost-based network server computability algorithm.

Challenges Refining the Network Ecosystem Optimal Throughput Computability Algorithm: The algorithm, specific to the network application ecosystem, considers what's being processed on each of the network VM servers at transaction submission time, and what is committed (queued) to each server. Note that a job in the Enqueued state has not yet been picked up by a processing server, even though the client application submitted it to a queue assigned to a specific server. It's not until the enqueued transaction has been picked up that Hangfire records, through REDIS keys, which server the transaction is mapped to. I need this earlier, since the algorithm predicts the transaction completion time given all the dynamics happening in the network ecosystem. I want to avoid changing Hangfire's core statistics code, so that I can update the Hangfire NuGet packages on builds to pick up the latest Hangfire changes without breaking the Transaction Processing Network (TPN) service.

Preserving ‘Fire And Forget’: Recall that the client application submits the transaction to a named job queue, which is assigned to a server that has appended the queue name to its server name, ensuring the job is processed by the intended server. Once the job is assigned, Hangfire will requeue it if the server fails before the job completes. Since the background application uses incremental indexing with checkpoint-restart recovery, the application simply continues where it left off if a server fails. And since job results are stored in MongoDB on OS X, it's simply a matter of the background application jumping to the restart checkpoint to continue its incremental index processing on the transaction documents. Thus, Fire and Forget is preserved.
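The checkpoint-restart idea can be sketched as follows. This is a hypothetical illustration, not the author's indexer: ICheckpointStore, InMemoryCheckpointStore and IncrementalIndexer are made-up names, and an in-memory dictionary stands in for wherever the checkpoints are actually persisted (MongoDB in this thread).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical checkpoint store; a real one would persist to durable storage such as MongoDB.
public interface ICheckpointStore
{
    int GetLastCompleted(string jobId);              // -1 when nothing has been done yet
    void SetLastCompleted(string jobId, int index);
}

// In-memory stand-in used only for this sketch.
public sealed class InMemoryCheckpointStore : ICheckpointStore
{
    private readonly Dictionary<string, int> _checkpoints = new();

    public int GetLastCompleted(string jobId) =>
        _checkpoints.TryGetValue(jobId, out var index) ? index : -1;

    public void SetLastCompleted(string jobId, int index) => _checkpoints[jobId] = index;
}

public static class IncrementalIndexer
{
    // Processes documents in order and records a checkpoint after each one.
    // When the job is run again (e.g. requeued after a server failure), it resumes
    // right after the last checkpoint instead of starting over.
    public static void Run(string jobId, IReadOnlyList<string> documents,
                           ICheckpointStore store, Action<string> indexDocument)
    {
        for (int i = store.GetLastCompleted(jobId) + 1; i < documents.Count; i++)
        {
            indexDocument(documents[i]);
            store.SetLastCompleted(jobId, i);
        }
    }
}
```

Because the checkpoint is written after each document, a crash between indexDocument and SetLastCompleted means that one document is indexed again on restart, so indexDocument should be idempotent.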

The Devil Is in the Details: However, there is quite a bit that needs to be considered for the application to create an optimal cost-based transaction network distribution algorithm, including where the source documents are located (minimizing network saturation) and VM performance (number of processors, processor type, memory amount and speed, VM host disk performance, and so forth). Due to incremental indexing, there is also the opportunity to dynamically split large jobs into self-partitioned parallel jobs that take advantage of the available network VMs to service transaction partitions, and to recombine the partitions when completed (it will be good when job dependency is added to Hangfire). Furthermore, since VMs auto-register in Hangfire when turned on and disappear when turned off, the network can expand itself dynamically, adapting to meet service demand over time according to cost constraints. (I'll also need to add the ability to optionally intercept Hangfire jobs if a VM is dead, and move them to the next-best server, to respect global and local optima constraints within the parallel distributed Transaction Processing Network (TPN) ecosystem.) Thus, an application service layer intercepts and mediates application transaction submissions on behalf of client applications, so that optimal transaction partitioning and distribution across the TPN is determined based on current and committed compute resource consumption.
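As a much simpler stand-in for the cost-based distribution algorithm, the sketch below picks the server with the earliest predicted completion time, using only committed work (running plus queued jobs), worker count and an average job duration. It ignores document location, VM performance and partitioning. ServerSnapshot and NextBestServer are hypothetical, and the per-server Processing and Enqueued figures are assumed to come from the kind of REDIS key parsing described above, since the monitoring API's ProcessingCount() returns a single total rather than a per-server figure.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical per-server snapshot taken at submission time.
public sealed record ServerSnapshot(
    string Name, int WorkerCount, int Processing, int Enqueued, double AvgJobSeconds);

public static class NextBestServer
{
    // Rough estimate of when a new job would finish on this server: the work already
    // committed to it (running + queued) spread over its workers, plus one job's
    // duration for the new job itself.
    public static double PredictedCompletionSeconds(ServerSnapshot s) =>
        (s.Processing + s.Enqueued) * s.AvgJobSeconds / Math.Max(1, s.WorkerCount)
        + s.AvgJobSeconds;

    // Lowest predicted completion wins; ties are broken by server name for determinism.
    public static ServerSnapshot Pick(IEnumerable<ServerSnapshot> servers) =>
        servers.OrderBy(s => PredictedCompletionSeconds(s))
               .ThenBy(s => s.Name, StringComparer.Ordinal)
               .First();
}
```

With WorkerCount = 1, as used throughout this thread, the estimate reduces to (Processing + Enqueued + 1) * AvgJobSeconds, so a lightly loaded slow server can still lose to a busier fast one.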

MRP Global and Local Optima: Eventually, the TPN optimal throughput distribution algorithm will also consider promises such as recurring calendar-scheduled jobs, and corporate priorities according to corporate business plans, when slotting transaction partitions across the TPN for optimal cost-sensitive throughput. In effect, the TPN Optimization Service takes on the role of an MRP and operational planning service for client applications, facilitating both Global Optima (TPN corporate objectives are met and overall priorities respected) and Local Optima (transaction throughput needs are respected for the best turnaround within global optima constraints).

Customized Ecosystem Throughput Optimization: Most of this is customized to the application mix, and is probably noise to those who do not need to consider such a complex set of constraints, so I won't bore readers with unnecessary detail. I will provide the JSON behavioural schema, since I think that probably provides value to others. If one already exists, please feel free to share it so I don't ramble on with unnecessary minutiae.