Job reentrancy avoidance proposal

I have recurring long-running tasks, so I needed a way to avoid reentrancy. I looked at DisableConcurrentExecutionAttribute, which seems to do the job, but because filterContext.Connection.AcquireDistributedLock throws an SqlServerDistributedLockException, the default AutomaticRetry functionality kicks in and reschedules the task. I would like to dismiss these reentrant tasks silently.
What happens if I set the Canceled property of the filterContext (the PerformingContext parameter in OnPerforming()) to true?
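For reference, this is the stock usage that exhibits the problem (the job method is hypothetical):

// Stock attribute: waits up to 60 seconds for the distributed lock, then
// throws, which makes the default AutomaticRetry filter reschedule the job.
[DisableConcurrentExecution(timeoutInSeconds: 60)]
public void SyncCatalog()
{
    // long-running work
}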

I have modified DisableConcurrentExecutionAttribute with a bool parameter to swallow the distributed lock exception. Obviously you could check for the exact SqlServerDistributedLockException, but since that type is internal I didn't bother to reflect it or check for magic strings.

This approach seems to work well. Reentrant tasks simply finish with success status after not being able to acquire the distributed lock.
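Applied to a job method, the modified attribute from below looks like this (the job method is hypothetical):

// retryOnLockTimeout: false makes a reentrant run cancel itself instead of
// being rescheduled by the default AutomaticRetry filter.
[DisableConcurrentExecution(timeoutInSeconds: 0, retryOnLockTimeout: false)]
public void RebuildReports()
{
    // long-running work
}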

Questions:

  • are there any side effects to setting the Canceled property this way?
  • is this the right approach?

Pastie: http://pastie.org/private/dc4viibzxnzyfuwgwmtmg#6,13,31,33,39
Or inline:

public class DisableConcurrentExecutionAttribute : JobFilterAttribute, IServerFilter
{
    private static readonly Logger logger = LogManager.GetCurrentClassLogger();

    private readonly int _timeoutInSeconds;
    private readonly bool _retryOnLockTimeout;

    public DisableConcurrentExecutionAttribute(int timeoutInSeconds, bool retryOnLockTimeout = true)
    {
        if (timeoutInSeconds < 0) throw new ArgumentException("Timeout argument value should be greater than or equal to zero.");

        _timeoutInSeconds = timeoutInSeconds;
        _retryOnLockTimeout = retryOnLockTimeout;
    }


    public void OnPerforming(PerformingContext filterContext)
    {
        var resource = String.Format(
            "{0}.{1}",
            filterContext.Job.Type.FullName,
            filterContext.Job.Method.Name);

        var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);

        try
        {
            var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeout);
            filterContext.Items["DistributedLock"] = distributedLock;
        }
        catch (Exception)
        {
            if (_retryOnLockTimeout)
            {
                throw;
            }
            else
            {
                filterContext.Canceled = true;
                logger.Warn("Cancelling run for {0} job, id: {1}", resource, filterContext.JobId);
            }
        }

    }

    public void OnPerformed(PerformedContext filterContext)
    {
        if (!filterContext.Items.ContainsKey("DistributedLock"))
        {
            throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
        }

        var distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
        distributedLock.Dispose();
    }
}

Thank you for the library.


Interesting solution. Yes, if you set the Canceled property of a PerformingContext, the job will not be performed and will be silently moved to the succeeded state. Can you create a pull request for this feature?

I will try to find time in the next couple of days.

Okay, will wait for it. Thank you in advance!

I had some unusual behaviour with DisableConcurrentExecutionAttribute's distributed lock. It was not really working with timeouts greater than 0. I have a major deadline coming in the next few weeks, so I don't have time right now to work out the details, but after that I will take a look and, if still needed, create a pull request.

Hi @Domonkos, were you already able to work on that improvement? We're currently looking into avoiding concurrent execution, and your proposal, which avoids queuing altogether, looks more promising to us.

@odinserj Could you incorporate this change?

In case someone else has the same problem, we went ahead and implemented our own JobFilterAttribute, which we use instead of DisableConcurrentExecutionAttribute. The code is mostly the same as proposed by @Domonkos (thx!):

/// <summary>
/// Attribute to skip a job execution if the same job is already running.
/// Mostly taken from: http://discuss.hangfire.io/t/job-reentrancy-avoidance-proposal/607
/// </summary>
public class SkipConcurrentExecutionAttribute : JobFilterAttribute, IServerFilter
{
	private static readonly Logger logger = LogManager.GetCurrentClassLogger();

	private readonly int _timeoutInSeconds;

	public SkipConcurrentExecutionAttribute(int timeoutInSeconds)
	{
		if (timeoutInSeconds < 0) throw new ArgumentException("Timeout argument value should be greater than or equal to zero.");

		_timeoutInSeconds = timeoutInSeconds;
	}


	public void OnPerforming(PerformingContext filterContext)
	{
		var resource = String.Format(
							 "{0}.{1}",
							filterContext.Job.Type.FullName,
							filterContext.Job.Method.Name);

		var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);

		try
		{
			var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeout);
			filterContext.Items["DistributedLock"] = distributedLock;
		}
		catch (Exception)
		{
			filterContext.Canceled = true;
			logger.Warn("Cancelling run for {0} job, id: {1}", resource, filterContext.JobId);
		}
	}

	public void OnPerformed(PerformedContext filterContext)
	{
		if (!filterContext.Items.ContainsKey("DistributedLock"))
		{
			throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
		}

		var distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
		distributedLock.Dispose();
	}
}
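
For completeness, a usage sketch (the job class and method are hypothetical):

public class ImportJobs
{
    // With a zero timeout, a second concurrent run fails to acquire the
    // lock immediately, is cancelled by the filter, and ends up Succeeded.
    [SkipConcurrentExecution(timeoutInSeconds: 0)]
    public void ImportOrders()
    {
        // long-running work
    }
}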

Thanks for posting this; I had a similar goal and your code was very helpful in implementing my own attribute to prevent similar instances of the task from running.

Guys, I need some help.

What is that timeout about?

After some testing, as far as I can tell, the timeout is how long the request waits to acquire the lock before it gives up (and raises an SqlServerDistributedLockException). Am I right?
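That understanding can be illustrated directly against the storage connection (the resource name is made up, and the concrete exception type depends on the storage package):

// The lock is held for the duration of the using block. A concurrent
// AcquireDistributedLock call for the same resource waits up to its own
// timeout and then throws; with TimeSpan.Zero it gives up immediately.
using (var connection = JobStorage.Current.GetConnection())
using (connection.AcquireDistributedLock("my-resource", TimeSpan.Zero))
{
    // critical section
}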

This is my scenario:

  • Dashboard in a console application
  • Server in another console application
  • Class library to share interfaces between both projects (the attribute should decorate the method here)

I set the timeout to 0 and the job is enqueued but not processed, which is the idea behind SkipConcurrentExecutionAttribute. Right?

So, my question is: how can I either change it to another status like “Skipped”, or set a description somewhere to show that this one was skipped?

And another question: is it possible to prevent the job from even being enqueued if another one is already enqueued or processing?

Thanks

The biggest problem with DisableConcurrentExecutionAttribute and this solution is that it does not provide a way to keep up with the other overloads of the job registration methods.

There is no guarantee that “Type.Method” is the key for your jobs when they are recurring, as you can specify your own id.

Currently, as far as I know, retrieving the parent recurring job id is a manual process (get all recurring jobs and compare LastJobId to the current job attempting to execute; see the sketch below).

The recurring job id should be easy to retrieve, and it could be associated with the job at creation (since it is already associated via LastJobId anyway); that way the actual job id could be used for filtering regardless of whether it happens to be the default “Type.Method” or a user-provided id.

Currently, filtering only by “Type.Method” does not allow for cases where you have multiple recurring jobs using the same method for different purposes.
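A sketch of that manual lookup as it could sit inside a server filter (the helper name is mine; GetRecurringJobs is the extension method from Hangfire.Storage, and System.Linq is assumed):

// Scans all recurring jobs and returns the id of the one whose LastJobId
// matches the job currently being performed, or null if none matches.
// Note: this is O(number of recurring jobs) per execution.
private static string GetRecurringJobId(IStorageConnection connection, string currentJobId)
{
    return connection.GetRecurringJobs()
        .FirstOrDefault(r => r.LastJobId == currentJobId)?.Id;
}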

Is ‘DisableConcurrentExecutionAttribute’ implemented? Could you link / post an example?

I’ve implemented DisableConcurrentExecutionAttribute and the lock name in the DB is correct (I don’t want to run multiple instances of my method with THE SAME VALUES OF ITS PARAMETERS).
However, it is still ‘perfectly’ triggering from multiple threads - it looks like the attribute is ignored :(

public class DisableConcurrentBatchExecutionAttribute : JobFilterAttribute, IServerFilter
{
    private readonly int _timeoutInSeconds;

    public DisableConcurrentBatchExecutionAttribute(int timeoutInSeconds)
    {
        if (timeoutInSeconds < 0) throw new ArgumentException("Timeout argument value should be greater than or equal to zero.");

        _timeoutInSeconds = timeoutInSeconds;
    }

    public void OnPerforming(PerformingContext filterContext)
    {
        var resource = GetResource(filterContext.BackgroundJob.Job);

        var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);

        var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeout);
        filterContext.Items["DistributedLock"] = distributedLock;
    }

    public void OnPerformed(PerformedContext filterContext)
    {
        if (!filterContext.Items.ContainsKey("DistributedLock"))
        {
            throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
        }

        var distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
        distributedLock.Dispose();
    }

    // Builds a lock resource that includes the job's argument values, so only
    // jobs with the same method AND the same argument values share a lock.
    private static string GetResource(Job job)
    {
        var argsSalt = new StringBuilder();
        if (job.Args != null && job.Args.Count > 0)
        {
            foreach (var arg in job.Args)
            {
                if (arg == null) continue;

                var type = arg.GetType();
                if (type.IsPrimitive || arg is string || arg is decimal)
                {
                    // Primitives and strings expose no useful public properties
                    // (a string only has Length), so append the value itself.
                    argsSalt.Append($".{arg}");
                }
                else
                {
                    foreach (var property in type.GetProperties(BindingFlags.Instance | BindingFlags.Public))
                    {
                        argsSalt.Append($".{property.GetValue(arg, null)}");
                    }
                }
            }
        }

        return $"{job.Type.ToGenericTypeString()}.{job.Method.Name}{argsSalt}";
    }
}
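
As a side note on the args-based resource name above: because argument values are part of the lock resource, recurring jobs that share a method but differ in arguments no longer block each other. A usage sketch (the method and argument are hypothetical):

// "eu" and "us" batches produce different lock resources and may run
// concurrently; a reentrant run of the SAME batch waits up to 30 seconds
// for the lock and then throws.
[DisableConcurrentBatchExecution(timeoutInSeconds: 30)]
public void ImportBatch(string batchName)
{
    // long-running work
}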