If I go to recurring jobs tab and trigger job myself OnCreating
method is never called. If it gets triggered normally everything is fine.
First event that I get is public void OnStateElection(ElectStateContext context)
with states going from enqueued -> processing.
Same thing happens if I enqueue a job trough code. Only instance where OnCreating
is called when the recurring job gets triggered by itself. OnPerforming
is the only one I can rely on calling for every job.
public class DisableMultipleQueuedItemsAttribute : JobFilterAttribute, IClientFilter, IServerFilter, IElectStateFilter
{
private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(10);
private static readonly List<string> ReleaseStates = new List<string> { DeletedState.StateName, FailedState.StateName };
public void OnCreating(CreatingContext filterContext)
{
Console.WriteLine($"Creating {filterContext.Job.Method.Name}");
}
public void OnCreated(CreatedContext filterContext)
{
Console.WriteLine($"Created {filterContext.BackgroundJob.Id}");
}
public void OnPerforming(PerformingContext filterContext)
{
Console.WriteLine($"Performing ({filterContext.BackgroundJob.Id}) {filterContext.BackgroundJob.Job.Method.Name}");
if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.BackgroundJob))
{
filterContext.Canceled = true;
}
}
public void OnPerformed(PerformedContext filterContext)
{
Console.WriteLine($"Performed {filterContext.BackgroundJob.Id}");
// If job finished with success we remove lock.
if (filterContext.Exception == null || filterContext.ExceptionHandled)
{
RemoveFingerprint(filterContext.Connection, filterContext.BackgroundJob);
}
}
public void OnStateElection(ElectStateContext context)
{
Console.WriteLine($"State change ({context.BackgroundJob.Id}) {context.CurrentState} -> {context.CandidateState.Name}");
// If job is going to release state and it isn't in failed state we release the lock.
// if job is already in failed state we don't release lock because lock can be from a running job, failed job already had it's lock removed.
if (ReleaseStates.Contains(context.CandidateState.Name) && context.CurrentState != FailedState.StateName)
{
RemoveFingerprint(context.Connection, context.BackgroundJob);
}
}
private static bool AddFingerprintIfNotExists(IStorageConnection connection, BackgroundJob job)
{
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
{
var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));
if (fingerprint != null && fingerprint.ContainsKey("Timestamp"))
{
// Actual fingerprint found, returning.
return false;
}
// Fingerprint does not exist, it is invalid (no `Timestamp` key),
// or it is not actual (timeout expired).
connection.SetRangeInHash(
GetFingerprintKey(job),
new Dictionary<string, string>
{
{ "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
});
return true;
}
}
private static string GetFingerprint(BackgroundJob job)
{
string parameters = string.Empty;
if (job.Job.Args != null)
{
parameters = string.Join(".", job.Job.Args);
}
if (job.Job.Type == null || job.Job.Method == null)
{
return string.Empty;
}
var fingerprint = $"{job.Job.Type.Name}.{job.Job.Method.Name}.{parameters}";
return fingerprint;
}
private static string GetFingerprintKey(BackgroundJob job)
{
return $"Fingerprint:{GetFingerprint(job)}";
}
private static string GetFingerprintLockKey(BackgroundJob job)
{
return $"{GetFingerprintKey(job)}:lock";
}
private static void RemoveFingerprint(IStorageConnection connection, BackgroundJob job)
{
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
{
using (var transaction = connection.CreateWriteTransaction())
{
transaction.RemoveHash(GetFingerprintKey(job));
transaction.Commit();
}
}
}
}