RecurringJob Stuck Enqueued using MSMQ but not in SQL Server

I created a Hangfire (v1.3.4) Windows Service that uses MSMQ. Sometimes a recurring job gets enqueued but is never processed. Because I want each job to have only a single running instance, every subsequent job is enqueued and left unprocessed until the stuck job completes.

Once I took out MSMQ, the jobs worked fine and were processed correctly. I am wondering whether this is a race condition: the Hangfire service enqueues the job but does not update the job in SQL in time for the worker to run it.

Two questions: what are the pros and cons of using MSMQ/Redis vs SQL Server, and is this a known bug that others have experienced?

Related issue?:

Do you have any extension filters? What are you doing to ensure only one running instance?

Job storage based on Redis is meant for systems with heavy background job processing; simple benchmarks show over 4x improvement. SQL Server queues vs MSMQ queues are described here.

I have a single recurring task, and I want only one instance of this job running at a time. For example, if I run the job manually and the recurring job then starts, I want the recurring run to be skipped.

I did implement IElectStateFilter to hook into state changes so I can keep track of jobs as they move from enqueued to processing to a finished state. I have a master table of ScheduledTasks that needs to be updated with the state. My code looks like this:

    public void OnStateElection(ElectStateContext context)
    {
        ITransaction tran = null;
        try
        {
            var task = GetTask(context.JobId, ref tran);
            SetLogStatus(task, context, ref tran);
            if (task != null && (context.CandidateState is SucceededState || context.CandidateState is FailedState))
            {
                tran = _tranFactory.BuildITransaction("GetTask", IsolationLevel.ReadCommitted);
                task = GetTask(context.JobId, ref tran);
                if (context.CandidateState is EnqueuedState)
                {
                    var e = (EnqueuedState)context.CandidateState;
                    if (e.Reason != null && e.Reason.Contains("scheduler") && !(task.RunningJobId == null))
                    {
                        task.RunningJobId = context.JobId;
                        task.CurrentStatus = (byte)ScheduledTaskStatus.Queued;
                        _repo.Save(task, ref tran);                            
                    }
                }
                else if (context.CandidateState is ProcessingState)
                {
                    if (task.CurrentStatus == (byte)ScheduledTaskStatus.Running)
                    {
                        tran.Rollback();
                        BackgroundJob.Delete(context.JobId);
                    }
                    else
                    {
                        task.CurrentStatus = (byte)ScheduledTaskStatus.Running;
                        _repo.Save(task, ref tran);                            
                    }
                }
                else if (context.CandidateState is SucceededState)
                {
                    task.CurrentStatus = (byte)ScheduledTaskStatus.Ready;
                    task.LastRunDateTime = SystemTime.UtcNow();
                    task.RunningJobId = null;
                    task.NextRunDateTime = task.IsEnabled ? _cronSvc.GetNextScheduledOccurance(task.CronExpression, SystemTime.UtcNow()) : null;
                    task.LastStatus = "Successfully Executed";
                    _repo.Save(task, ref tran);                        
                }
                else if (context.CandidateState is FailedState)
                {
                    var failedState = (FailedState)context.CandidateState;
                    task.CurrentStatus = (byte)ScheduledTaskStatus.Ready;
                    task.LastRunDateTime = SystemTime.UtcNow();
                    task.RunningJobId = null;
                    task.LastStatus = string.Format("Job failed due to exception '{0}'", failedState.Exception);
                    task.NextRunDateTime = task.IsEnabled ? _cronSvc.GetNextScheduledOccurance(task.CronExpression, SystemTime.UtcNow()) : null;
                    _repo.Save(task, ref tran);                        
                }
                tran.Commit();
            }
        }
        finally
        {
            if (tran != null) tran.Dispose();
        }
    }

However, the job never reaches the state where my code deletes it, because this state election is never triggered. The job is initialized and nothing happens to it; the jobs just sit there until old jobs get purged. I let it sit overnight and the queue was full of jobs from the recurring task (which runs every 15 minutes).

There are some unreachable branches in your code:

if (task != null && (context.CandidateState is SucceededState || context.CandidateState is FailedState))
{
    // ...
    if (context.CandidateState is EnqueuedState)
    {
        // Unreachable branch: context.CandidateState is SucceededState || context.CandidateState is FailedState
    }
    else if (context.CandidateState is ProcessingState)
    {
        // Unreachable too, BackgroundJob.Delete(context.JobId) will never be called
    }
}

Thank you, I will remove the check for CandidateState in the if statement and see if that fixes the issue.

I changed the code to account for this. Now, when I change the configuration to storage.UseMsmqQueues, the scheduled jobs get enqueued but do not run. Subsequent jobs are then enqueued and never executed until I manually requeue the job, at which point it proceeds with processing.

Please post your configuration logic here; it seems no server is listening to the multipleworker queue.

var storage = new SqlServerStorage(ConfigurationSettings.AppSettings["strConn"]);
storage.UseMsmqQueues(ConfigurationSettings.AppSettings["QueueBasePath"], "multipleworker");
var options = new BackgroundJobServerOptions()
{
    Queues = new[] { "multipleworker" },
    WorkerCount = int.Parse(ConfigurationManager.AppSettings["WorkersToEnable"]),
    ServerName = String.Format("{0}-{1}", Environment.MachineName, "multipleworker")
};
_server = new BackgroundJobServer(options, storage);
_server.Start();

Letting the task scheduler run overnight, a few more were queued and not executed:

And the server is set up to process new jobs.

If I enqueue a task manually, it runs.

Also, some enqueued jobs are missing from this page. Note that the listed job IDs end at 112; however, if I open a job's detail page and change the URL to job ID 113, it displays as Enqueued. There are over 50 jobs stuck in the Enqueued state.

Additionally, there are two “multipleworker” queues displayed on the dashboard, so I’m not sure what is going on there either.

I’ve created a gist to see if there’s something I’m missing in the configuration of the Windows service/website.

Wow, impressive example; it will take a lot of time to parse. Before I start, can you try updating Hangfire to 1.4.x? I fixed an error there where workers couldn’t process enqueued jobs because of an orphaned background job (created partly by abnormal process termination, such as killing a debug session).

These are only the next 5 background jobs; we should add something to indicate that on this dashboard page.

I’ve upgraded to 1.4.1 and still cannot get the service to process jobs in my multipleworker queue. I also notice that two MultipleWorker queues are listed, and when I click through to the detail of each of them, no jobs are listed.

This makes sense, thanks. The StartProcessing method does not know anything about MSMQ and thus uses the JobQueue SQL Server table instead of MSMQ. So your servers listen to MSMQ and don’t know anything about the enqueued jobs.

JobStorage.Current = new SqlServerStorage(ConfigurationSettings.AppSettings["strConn"]);

var storage = new SqlServerStorage(ConfigurationSettings.AppSettings["strConn"]);
storage.UseMsmqQueues(ConfigurationSettings.AppSettings["QueueBasePath"], "assignment");

Please specify all the queues when calling the UseMsmqQueues method:

JobStorage.Current = new SqlServerStorage(ConfigurationSettings.AppSettings["strConn"])
    .UseMsmqQueues(ConfigurationSettings.AppSettings["QueueBasePath"], "assignment", "groupdpa", "multipleworker");

And you don’t have to pass a separate storage instance to your background job servers explicitly; use the JobStorage.Current instance in all of them. A BackgroundJobServer listens only to the queues specified in the BackgroundJobServerOptions.Queues property.
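For reference, the advice above could be put together roughly as follows. This is only a sketch, assuming the Hangfire 1.4.x API used elsewhere in this thread and the Hangfire.SqlServer.Msmq package; the class and method names (HangfireBootstrap, StartMultipleWorkerServer) are made up for illustration, and the app setting keys are the ones from the earlier posts.

```csharp
using System;
using System.Configuration;
using Hangfire;
using Hangfire.SqlServer;

// Hypothetical bootstrap class; the name is an assumption for this sketch.
public static class HangfireBootstrap
{
    public static BackgroundJobServer StartMultipleWorkerServer()
    {
        // Configure the shared storage once and register every MSMQ-backed queue,
        // so that enqueuing and processing both go through MSMQ.
        JobStorage.Current = new SqlServerStorage(ConfigurationManager.AppSettings["strConn"])
            .UseMsmqQueues(ConfigurationManager.AppSettings["QueueBasePath"], "assignment", "groupdpa", "multipleworker");

        var options = new BackgroundJobServerOptions
        {
            // This server only listens to the queues listed here.
            Queues = new[] { "multipleworker" },
            WorkerCount = int.Parse(ConfigurationManager.AppSettings["WorkersToEnable"]),
            ServerName = String.Format("{0}-{1}", Environment.MachineName, "multipleworker")
        };

        // No storage argument: the server uses JobStorage.Current.
        var server = new BackgroundJobServer(options);
        server.Start();
        return server;
    }
}
```

The point of the design is that there is a single storage instance that knows about all MSMQ queues; each server then differs only in its options.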

I removed the definition of the storage when creating each server and used only the options parameter and the jobs are now processing correctly.

Thank you so much for your help