/// <summary>
/// This attribute is a copy from origianl Hangfire DisableConcurrentExecutionAttribute but with some adaptions.
///
/// It allows us to define a scope, so attribute can be used for different jobs in one chain
/// Instead of throwing exception, it skips the execution if job is already running http://hangfire.discourse.group/t/job-reentrancy-avoidance-proposal/607/6
/// </summary>
public class SkipConcurrentExecutionInScopeAttribute : JobFilterAttribute, IServerFilter
{
private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();
private readonly int timeoutInMilliseconds;
private readonly string scope;
public SkipConcurrentExecutionInScopeAttribute(int timeoutInMilliseconds, object scope)
{
this.timeoutInMilliseconds = timeoutInMilliseconds;
this.AllowRetry = false;
this.scope = "SkipConcurrentExecutionInScopeAttribute:" + scope;
if (timeoutInMilliseconds < 0)
{
throw new ArgumentException("Timeout argument value should be greater that zero.");
}
if (timeoutInMilliseconds > 3000)
{
throw new ArgumentException("Max timeout of 3 second is allowed to avoid Database Locking");
}
if (string.IsNullOrWhiteSpace(scope.ToString()))
{
throw new ArgumentException("Scope argument value has to have some reasonable value.");
}
}
/// <summary>
/// Defines if timeout by acquiring lock throws an error to allow retry policy by hangfire.
/// If there are Retries configured, warning is logged and retry is triggered. If not, error is logged and job is reported as failed.
/// Default value is false, which means, that timeout leads to
/// skipping the job without any error and job is reported as cancelled and appears as succeded in hangfire dashboard.
/// </summary>
public bool AllowRetry { get; set; }
public void OnPerforming(PerformingContext filterContext)
{
TimeSpan timeout = TimeSpan.FromMilliseconds(this.timeoutInMilliseconds);
try
{
IDisposable distributedLock = filterContext.Connection.AcquireDistributedLock(this.scope, timeout);
filterContext.Items["DistributedLock"] = distributedLock;
}
catch (DistributedLockTimeoutException e)
{
string message = $"Job {filterContext.BackgroundJob.Job.Type.FullName}.{filterContext.BackgroundJob.Job.Method.Name} " +
$"was cancelled because Lock for the scope could not be aquired in defined timeout of {this.timeoutInMilliseconds} milliseconds.";
if (this.AllowRetry)
{
// rethrow exception - this creates an error and allows retrys.
throw new Exception(message, e);
}
else
{
filterContext.Canceled = true;
Logger.Warn(message + $" The errror message was: {e.Message}");
}
}
}
public void OnPerformed(PerformedContext filterContext)
{
if (!filterContext.Items.ContainsKey("DistributedLock"))
{
throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
}
IDisposable distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
distributedLock.Dispose();
}
}
1 Like