I came up with this filter attribute to disable concurrent execution per queue. I'm looking for feedback on the approach; I was trying to work within the current design without forking the source.
using System;
using System.Collections;
using System.Collections.Concurrent;
using Hangfire;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.States;
namespace Hangfire
{
/// <summary>
/// Job filter that takes a distributed lock around job execution, where the lock's
/// resource name is built from the job type, the job method and (when known) the
/// queue the job was enqueued to. Two executions therefore only exclude each other
/// when they share type, method and queue.
/// </summary>
/// <remarks>
/// The queue name is captured in <see cref="OnStateElection"/> when the candidate state
/// is an <see cref="EnqueuedState"/> and is kept in an in-memory map keyed by job id.
/// NOTE(review): the map lives only inside this filter instance. If a job is elected to
/// the enqueued state by a different process (or before a restart), no queue is found
/// here and the lock resource silently falls back to "type.method" without the queue
/// suffix. Confirm whether that fallback is acceptable for your deployment.
/// </remarks>
public class DisableConcurrentExecutionPerQueueAttribute : JobFilterAttribute, IElectStateFilter, IServerFilter
{
    /// <summary>Key under which the acquired lock is stashed in the perform context items.</summary>
    private const string LockItemKey = "DistributedLock";

    private readonly int _timeoutInSeconds;

    // Job id -> queue name. A concurrent map is used because the callbacks below mutate it
    // without any other synchronization; this assumes one filter instance can be invoked
    // from several worker threads at once (TODO confirm against Hangfire's filter caching).
    private readonly ConcurrentDictionary<string, string> _queues =
        new ConcurrentDictionary<string, string>();

    /// <summary>
    /// Creates the filter.
    /// </summary>
    /// <param name="timeoutInSeconds">
    /// Timeout, in seconds, passed to the distributed lock acquisition. Must not be negative.
    /// </param>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="timeoutInSeconds"/> is negative.
    /// </exception>
    public DisableConcurrentExecutionPerQueueAttribute(int timeoutInSeconds)
    {
        if (timeoutInSeconds < 0)
        {
            // Zero is accepted by the check, so the message says "greater than or equal to".
            throw new ArgumentException(
                "Timeout argument value should be greater than or equal to zero.",
                nameof(timeoutInSeconds));
        }

        _timeoutInSeconds = timeoutInSeconds;
    }

    /// <summary>
    /// Remembers the target queue of a job whenever its candidate state is
    /// <see cref="EnqueuedState"/>; other candidate states are ignored.
    /// </summary>
    /// <param name="context">State election context supplied by Hangfire.</param>
    public void OnStateElection(ElectStateContext context)
    {
        var enqueuedState = context.CandidateState as EnqueuedState;
        if (enqueuedState == null)
        {
            return;
        }

        // Indexer assignment adds or overwrites in one step.
        _queues[context.BackgroundJob.Id] = enqueuedState.Queue;
    }

    /// <summary>
    /// Acquires the distributed lock for the job's resource name and stores it in the
    /// context items so <see cref="OnPerformed"/> can release it.
    /// </summary>
    /// <param name="filterContext">Performing context supplied by Hangfire.</param>
    public void OnPerforming(PerformingContext filterContext)
    {
        var resource = GetResource(filterContext.BackgroundJob);
        var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);

        var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeout);
        filterContext.Items[LockItemKey] = distributedLock;
    }

    /// <summary>
    /// Releases the distributed lock stored by <see cref="OnPerforming"/> and forgets the
    /// job's queue mapping.
    /// </summary>
    /// <param name="filterContext">Performed context supplied by Hangfire.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when no lock was stored in the context items.
    /// </exception>
    public void OnPerformed(PerformedContext filterContext)
    {
        try
        {
            object stored;
            if (!filterContext.Items.TryGetValue(LockItemKey, out stored))
            {
                throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
            }

            ((IDisposable)stored).Dispose();
        }
        finally
        {
            // Always drop the mapping, even when the lock is missing or Dispose throws,
            // so entries for finished jobs do not accumulate.
            string removed;
            _queues.TryRemove(filterContext.BackgroundJob.Id, out removed);
        }
    }

    /// <summary>
    /// Builds the lock resource name: "type.method", with ".queue" appended when a
    /// non-blank queue name has been recorded for the job.
    /// </summary>
    /// <param name="backgroundJob">The job being performed.</param>
    /// <returns>The resource name to lock on.</returns>
    private string GetResource(BackgroundJob backgroundJob)
    {
        var resource = $"{backgroundJob.Job.Type.ToGenericTypeString()}.{backgroundJob.Job.Method.Name}";

        string queue;
        if (_queues.TryGetValue(backgroundJob.Id, out queue) && !string.IsNullOrWhiteSpace(queue))
        {
            resource += $".{queue}";
        }

        return resource;
    }
}
}