DisableConcurrentExecution for Job Groups

Hi, I am trying to create job chains and make sure that two jobs from the same chain do not run at once. Is this feature perhaps available in the Pro version with Batches?

Example: I have job1, job2, and job3. I am chaining them into one chain using ContinueWith (Enqueue(job1), ContinueWith(job2), ContinueWith(job3)). The chain is started by a RecurringJob that triggers it each hour, as suggested here. So far so good.
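
A minimal sketch of that setup, assuming Hangfire 1.6-style APIs (the MyJobs class, its method names, and the recurring job id are hypothetical; newer Hangfire versions rename ContinueWith to ContinueJobWith):

using Hangfire;

public class MyJobs
{
    // Hypothetical job methods standing in for job1..job3.
    public void Job1() { /* ... */ }
    public void Job2() { /* ... */ }
    public void Job3() { /* ... */ }

    // Enqueues the whole chain: job2 starts only after job1
    // has finished, and job3 only after job2.
    public void StartChain()
    {
        string id1 = BackgroundJob.Enqueue<MyJobs>(x => x.Job1());
        string id2 = BackgroundJob.ContinueWith<MyJobs>(id1, x => x.Job2());
        BackgroundJob.ContinueWith<MyJobs>(id2, x => x.Job3());
    }
}

public static class JobSetup
{
    public static void Configure()
    {
        // Registered once at startup: triggers the whole chain every hour.
        RecurringJob.AddOrUpdate<MyJobs>("hourly-chain", x => x.StartChain(), Cron.Hourly());
    }
}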

Now I am trying to figure out how to make sure that job1 and job2 or job3 are not running at the same time (in the scenario where the chain is triggered faster than it finishes). Using DisableConcurrentExecution does not seem to help, as it is bound to a single job only. Any ideas how to manage job chains in more detail?

Thx a lot for your suggestions…

I found a nice solution for this: I created my own attribute, which is very similar to the original DisableConcurrentExecution. I called it SkipConcurrentExecutionInScopeAttribute, and its constructor accepts two parameters: timeoutInMilliseconds and scope.

Another improvement was to swallow the DistributedLockTimeoutException to avoid retries and exceptions in the log. I can post the code of this attribute if someone is interested.

I’d be interested in this code.

using System;
using Hangfire.Common;
using Hangfire.Logging;
using Hangfire.Server;
using Hangfire.Storage;

/// <summary>
/// This attribute is a copy of the original Hangfire DisableConcurrentExecutionAttribute, with some adaptations:
///
/// It allows us to define a scope, so the attribute can be shared by different jobs in one chain.
/// Instead of throwing an exception, it skips the execution if a job in the same scope is already running,
/// as proposed in http://hangfire.discourse.group/t/job-reentrancy-avoidance-proposal/607/6
/// </summary>
public class SkipConcurrentExecutionInScopeAttribute : JobFilterAttribute, IServerFilter
{
    private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();

    private readonly int timeoutInMilliseconds;

    private readonly string scope;

    public SkipConcurrentExecutionInScopeAttribute(int timeoutInMilliseconds, object scope)
    {
        if (timeoutInMilliseconds < 0)
        {
            throw new ArgumentException("Timeout argument value must not be negative.");
        }

        if (timeoutInMilliseconds > 3000)
        {
            throw new ArgumentException("A maximum timeout of 3 seconds is allowed to avoid database locking.");
        }

        if (scope == null || string.IsNullOrWhiteSpace(scope.ToString()))
        {
            throw new ArgumentException("Scope argument must have a non-empty value.");
        }

        this.timeoutInMilliseconds = timeoutInMilliseconds;
        this.AllowRetry = false;

        // All jobs decorated with the same scope compete for one distributed lock.
        this.scope = "SkipConcurrentExecutionInScopeAttribute:" + scope;
    }

    /// <summary>
    /// Defines whether a timeout while acquiring the lock throws an exception, enabling Hangfire's retry policy.
    /// If retries are configured, a warning is logged and a retry is triggered; if not, an error is logged and the job is reported as failed.
    /// The default value is false, which means a timeout skips the job without any error: the job is cancelled
    /// and appears as succeeded in the Hangfire dashboard.
    /// </summary>
    public bool AllowRetry { get; set; }

    public void OnPerforming(PerformingContext filterContext)
    {
        TimeSpan timeout = TimeSpan.FromMilliseconds(this.timeoutInMilliseconds);

        try
        {
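            // Keep the lock handle in the context items so that
            // OnPerformed can release it after the job has finished.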
            IDisposable distributedLock = filterContext.Connection.AcquireDistributedLock(this.scope, timeout);
            filterContext.Items["DistributedLock"] = distributedLock;
        }
        catch (DistributedLockTimeoutException e)
        {
            string message = $"Job {filterContext.BackgroundJob.Job.Type.FullName}.{filterContext.BackgroundJob.Job.Method.Name} " +
                             $"was cancelled because Lock for the scope could not be aquired in defined timeout of {this.timeoutInMilliseconds} milliseconds.";

            if (this.AllowRetry)
            {
                // Rethrow as a new exception - this reports an error and allows retries.
                throw new Exception(message, e);
            }
            else
            {
                filterContext.Canceled = true;
                Logger.Warn(message + $" The error message was: {e.Message}");
            }
        }
    }

    public void OnPerformed(PerformedContext filterContext)
    {
        if (!filterContext.Items.ContainsKey("DistributedLock"))
        {
            throw new InvalidOperationException("Cannot release a distributed lock: it was not acquired.");
        }

        IDisposable distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
        distributedLock.Dispose();
    }
}
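
For usage, here is a minimal sketch (the ChainJobs class, method bodies, and scope string are made up): decorating several jobs with the same scope value makes them compete for one lock, so no two of them run at the same time, no matter where in the chain they sit.

public class ChainJobs
{
    // Both jobs share the scope "my-chain", so at most one of them
    // holds the distributed lock (and therefore runs) at any time.
    [SkipConcurrentExecutionInScope(1000, "my-chain")]
    public void Job1() { /* ... */ }

    [SkipConcurrentExecutionInScope(1000, "my-chain")]
    public void Job2() { /* ... */ }
}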

Thanks, I’ll have a look.

I noticed that you had to come up with your own scope, given that job.Type.ToGenericTypeString() is not accessible… https://github.com/HangfireIO/Hangfire/blob/v1.6.14/src/Hangfire.Core/DisableConcurrentExecutionAttribute.cs#L57

I wanted the exception-swallowing “Skip” attribute to be as close as possible to the “Disable” one, without having to manage scopes manually… would that be possible?

Sure, you can inherit from DisableConcurrentExecutionAttribute and override only the OnPerforming method (catching the exception thrown by the base method).
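
One way that could look (an untested sketch; since OnPerforming on the base class is not virtual, the derived class re-implements IServerFilter instead of overriding):

using Hangfire;
using Hangfire.Server;
using Hangfire.Storage;

public class SkipConcurrentExecutionAttribute : DisableConcurrentExecutionAttribute, IServerFilter
{
    public SkipConcurrentExecutionAttribute(int timeoutInSeconds)
        : base(timeoutInSeconds)
    {
    }

    // Re-implementing IServerFilter routes the interface call here,
    // even though the base method is not virtual.
    void IServerFilter.OnPerforming(PerformingContext filterContext)
    {
        try
        {
            // Let the base class build its resource name and acquire the lock.
            base.OnPerforming(filterContext);
        }
        catch (DistributedLockTimeoutException)
        {
            // Skip the job instead of failing it; Hangfire does not call
            // OnPerformed for a filter whose OnPerforming cancelled the job.
            filterContext.Canceled = true;
        }
    }
}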

GetResource() is private, so I can’t use it from the new class…

Ok, sorry, I misunderstood your question… that would then be a change request for the author - providing a parameter in the constructor, or making the method protected… you can try :wink: (or create a new pull request)