Disable Concurrent Execution Per Queue

I came up with this filter attribute to disable concurrent execution per queue. Looking for feedback on the approach, I was trying to work within the current design without forking the source.

using System;
using System.Collections;
using Hangfire;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.States;

namespace Hangfire
{
    public class DisableConcurrentExecutionPerQueueAttribute : JobFilterAttribute, IElectStateFilter, IServerFilter
    {
        private readonly int _timeoutInSeconds;
        private Hashtable Queues { get; }

        public DisableConcurrentExecutionPerQueueAttribute(int timeoutInSeconds)
        {
            if (timeoutInSeconds < 0) throw new ArgumentException("Timeout argument value should be greater that zero.");

            _timeoutInSeconds = timeoutInSeconds;

            Queues = new Hashtable();
        }

        public void OnStateElection(ElectStateContext context)
        {
            var enqueuedState = context.CandidateState as EnqueuedState;
            if (enqueuedState != null)
            {
                if (Queues.ContainsKey(context.BackgroundJob.Id))
                {
                    Queues[context.BackgroundJob.Id] = enqueuedState.Queue;
                }
                else
                {
                    Queues.Add(context.BackgroundJob.Id, enqueuedState.Queue);
                }
            }
        }

        public void OnPerforming(PerformingContext filterContext)
        {
            var resource = GetResource(filterContext.BackgroundJob);

            var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);

            var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeout);
            filterContext.Items["DistributedLock"] = distributedLock;
        }

        public void OnPerformed(PerformedContext filterContext)
        {
            if (!filterContext.Items.ContainsKey("DistributedLock"))
            {
                throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
            }

            var distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
            distributedLock.Dispose();

            Queues.Remove(filterContext.BackgroundJob.Id);
        }

        private string GetResource(BackgroundJob backgroundJob)
        {
            var resource = $"{backgroundJob.Job.Type.ToGenericTypeString()}.{backgroundJob.Job.Method.Name}";
            var queue = Queues[backgroundJob.Id];
            if (!string.IsNullOrWhiteSpace(queue?.ToString()))
            {
                resource += $".{queue}";
            }
            return resource;
        }
    }
}
2 Likes

I like this idea and I currently use it in the same way. However I’d really appreciate a solution where “queues” would really work like regular queues, not priority queues. I mean queues where second job in the queue would wait in scheduled section until first one is finished. In this implementation both jobs in the queue are in processing section, occupy two worker threads and race for the same distribution lock. When enqueing a job you could specify a queue in which the job waits for execution. I would like to have a separate queue for each customer where different jobs in the same queue would be processes one-by-one but concurrently with other queues.

Now I can create a priority queue for each customer and set-up a separate server with WorkerCount = 1 for each of this queue but this is not a good solution. Second option would be to use ContinueJobWith and enqueue the second job after first one. However I would like to execute second one even when first one fails and this doesn’t happen at the moment.

Is there a way how to achieve this behavior using Hangfire and don’t rely on race for acquiring distributed lock and occupying worker threads when waiting for execution?

1 Like