DisableConcurrentExecution for Job Groups

/// <summary>
/// This attribute is a copy of the original Hangfire DisableConcurrentExecutionAttribute, with some adaptations.
///
/// It allows us to define a scope, so the attribute can be used for different jobs in one chain.
/// Instead of throwing an exception, it skips the execution if a job is already running in the same scope; see
/// http://hangfire.discourse.group/t/job-reentrancy-avoidance-proposal/607/6
/// </summary>
public class SkipConcurrentExecutionInScopeAttribute : JobFilterAttribute, IServerFilter
{
    /// <summary>Upper bound for the lock acquisition timeout accepted by the constructor.</summary>
    private const int MaxTimeoutInMilliseconds = 3000;

    private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();

    private readonly int timeoutInMilliseconds;

    /// <summary>Name of the distributed lock resource shared by all jobs of the same scope.</summary>
    private readonly string scope;

    /// <summary>
    /// Key under which the acquired lock handle is stored in the filter context items.
    /// It is derived from the scope so it does not collide with other filters that also
    /// store a lock handle under the generic "DistributedLock" key.
    /// </summary>
    private readonly string lockItemKey;

    /// <summary>
    /// Creates the filter for the given scope.
    /// </summary>
    /// <param name="timeoutInMilliseconds">How long to wait for the lock; must be between 0 and 3000 (inclusive).</param>
    /// <param name="scope">Value identifying the scope; its string representation becomes part of the lock name.</param>
    /// <exception cref="ArgumentException">
    /// The timeout is negative or above 3000 ms, or the scope is null or has a blank string representation.
    /// </exception>
    public SkipConcurrentExecutionInScopeAttribute(int timeoutInMilliseconds, object scope)
    {
        // Validate everything before touching the fields, so a null scope is reported
        // as an ArgumentException instead of surfacing as a NullReferenceException.
        if (timeoutInMilliseconds < 0)
        {
            throw new ArgumentException("Timeout argument value should be greater than or equal to zero.", nameof(timeoutInMilliseconds));
        }

        if (timeoutInMilliseconds > MaxTimeoutInMilliseconds)
        {
            throw new ArgumentException("Max timeout of 3 second is allowed to avoid Database Locking", nameof(timeoutInMilliseconds));
        }

        string scopeName = scope?.ToString();
        if (string.IsNullOrWhiteSpace(scopeName))
        {
            throw new ArgumentException("Scope argument value has to have some reasonable value.", nameof(scope));
        }

        this.timeoutInMilliseconds = timeoutInMilliseconds;
        this.AllowRetry = false;

        // The resource name must stay stable: it is what different jobs/servers contend on.
        this.scope = "SkipConcurrentExecutionInScopeAttribute:" + scopeName;
        this.lockItemKey = "DistributedLock:" + this.scope;
    }

    /// <summary>
    /// Defines whether a timeout while acquiring the lock throws an error to allow the retry policy of Hangfire.
    /// If there are retries configured, a warning is logged and a retry is triggered. If not, an error is logged and the job is reported as failed.
    /// Default value is false, which means that a timeout leads to
    /// skipping the job without any error; the job is reported as cancelled and appears as succeeded in the Hangfire dashboard.
    /// </summary>
    public bool AllowRetry { get; set; }

    /// <summary>
    /// Tries to acquire the distributed lock for the scope before the job runs.
    /// On success the lock handle is stored in the context items for <see cref="OnPerformed"/>.
    /// On timeout the job is either cancelled (default) or failed with an exception (<see cref="AllowRetry"/>).
    /// </summary>
    /// <param name="filterContext">Context of the job that is about to be performed.</param>
    public void OnPerforming(PerformingContext filterContext)
    {
        TimeSpan timeout = TimeSpan.FromMilliseconds(this.timeoutInMilliseconds);

        try
        {
            IDisposable distributedLock = filterContext.Connection.AcquireDistributedLock(this.scope, timeout);
            filterContext.Items[this.lockItemKey] = distributedLock;
        }
        catch (DistributedLockTimeoutException e)
        {
            string message = $"Job {filterContext.BackgroundJob.Job.Type.FullName}.{filterContext.BackgroundJob.Job.Method.Name} " +
                             $"was cancelled because Lock for the scope could not be aquired in defined timeout of {this.timeoutInMilliseconds} milliseconds.";

            if (this.AllowRetry)
            {
                // Wrap and throw - this makes the job fail and therefore allows retries.
                throw new Exception(message, e);
            }

            // Skip the job silently: mark it cancelled and only leave a trace in the log.
            filterContext.Canceled = true;
            Logger.Warn(message + $" The errror message was: {e.Message}");
        }
    }

    /// <summary>
    /// Releases the distributed lock that <see cref="OnPerforming"/> stored in the context items.
    /// </summary>
    /// <param name="filterContext">Context of the job that has been performed.</param>
    /// <exception cref="InvalidOperationException">No lock handle is stored for this scope.</exception>
    public void OnPerformed(PerformedContext filterContext)
    {
        object storedLock;
        if (!filterContext.Items.TryGetValue(this.lockItemKey, out storedLock))
        {
            throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
        }

        ((IDisposable)storedLock).Dispose();
    }
}
1 Like