Recurring jobs: don't enqueue when already queued

I'm trying to make it possible to enqueue jobs only when they aren't already queued.

To put it this way: I've got 32 recurring tasks and process 4 at a time. The interval between runs is 5 minutes. A task normally runs in 10 seconds to 1 minute, but sometimes a single job takes 5 minutes.

After 3 days of scheduling this, it has queued more tasks than it can ever handle. So I wanted to change this by not enqueuing tasks that are already in the queue.

You can deal with this in a couple of ways:

  • IElectStateFilter: set the job to Failed (or some other state)
  • IClientFilter: cancel on creating (OnCreating)
  • IServerFilter: cancel on performing (OnPerforming)

The only problem I'm facing is that RecurringJobScheduler doesn't pass the recurringJobId through, so I have no way of telling which jobs are queued and which are not, other than passing an identifying argument in the method call and reading it back.

Any ideas?


You can check by method name. It’s not the prettiest but it gets the job done.

That was fast! The thing is, they all use the same method (in this case). It does a task for multiple customers, only with a different set of arguments telling it where to write the data.

http://imgur.com/EM4I53m

This is how it currently looks… As you can see, the cancellation also makes it impossible to tell whether the job has run or not. But besides that, why not expose the RecurringJobId?

Or make it possible to pass arguments from a RecurringJob to a Job.
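Since every customer job calls the same method, a key built from the method name alone cannot tell them apart; including the arguments can. The sketch below is a hypothetical helper (not a Hangfire API) that builds a key with the same shape as the fingerprint used by the DisableMultipleQueuedItemsAttribute later in this thread: type name, method name, then the arguments joined with dots.

```csharp
using System;

public static class FingerprintSketch
{
    // Hypothetical helper: joins type name, method name and arguments into one
    // key. Because the arguments are part of the key, the same method called
    // for different customers produces different keys.
    public static string Fingerprint(string typeName, string methodName, params object[] args)
    {
        // Arguments are converted with ToString(), so only arguments with a
        // meaningful string form (strings, numbers, ids) distinguish jobs.
        var parameters = args == null ? string.Empty : string.Join(".", args);
        return $"{typeName}.{methodName}.{parameters}";
    }
}
```

For example, `Fingerprint("ExportJob", "Run", "customerA", 42)` yields `ExportJob.Run.customerA.42`, while the same call for `customerB` yields a different key, so each customer's job is deduplicated separately.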

Having the job ID would be nice, but you can also use the method name to limit the number of queued jobs with the same name to 4, or any other number.

using System;
using System.Collections.Generic;
using System.Globalization;
using Hangfire;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.States;
using Hangfire.Storage;

public class DisableMultipleQueuedItemsAttribute : JobFilterAttribute, IClientFilter, IServerFilter, IElectStateFilter
{
    private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(1);
    private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(10);

    // Not needed: nothing to do after the job has been created.
    void IClientFilter.OnCreated(CreatedContext filterContext)
    {
    }

    // Not needed: nothing to do right before the job runs.
    void IServerFilter.OnPerforming(PerformingContext filterContext)
    {
    }

    // Cancel creation when a job with the same fingerprint is already queued.
    public void OnCreating(CreatingContext filterContext)
    {
        if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
        {
            filterContext.Canceled = true;
        }
    }

    // Release the fingerprint once the job has been performed.
    public void OnPerformed(PerformedContext filterContext)
    {
        RemoveFingerprint(filterContext.Connection, filterContext.BackgroundJob);
    }

    public void OnStateElection(ElectStateContext context)
    {
        // If for any reason we delete a job from the queue, we release the lock.
        if (context.CandidateState.Name == DeletedState.StateName)
        {
            RemoveFingerprint(context.Connection, context.BackgroundJob);
        }
    }

    private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
    {
        using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
        {
            var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));

            DateTimeOffset timestamp;

            if (fingerprint != null &&
                fingerprint.ContainsKey("Timestamp") &&
                DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out timestamp) &&
                DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
            {
                // Actual fingerprint found, returning.
                return false;
            }

            // Fingerprint does not exist, it is invalid (no `Timestamp` key),
            // or it is not actual (timeout expired).
            connection.SetRangeInHash(
                GetFingerprintKey(job),
                new Dictionary<string, string>
                {
                    { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
                });

            return true;
        }
    }

    // Type name + method name + arguments, so the same method with different
    // arguments gets a different fingerprint.
    private static string GetFingerprint(Job job)
    {
        string parameters = string.Empty;
        if (job.Args != null)
        {
            parameters = string.Join(".", job.Args);
        }

        if (job.Type == null || job.Method == null)
        {
            return string.Empty;
        }

        var fingerprint = $"{job.Type.Name}.{job.Method.Name}.{parameters}";

        return fingerprint;
    }

    private static string GetFingerprintKey(Job job)
    {
        return $"Fingerprint:{GetFingerprint(job)}";
    }

    private static string GetFingerprintLockKey(Job job)
    {
        return $"{GetFingerprintKey(job)}:lock";
    }

    private static void RemoveFingerprint(IStorageConnection connection, BackgroundJob job)
    {
        RemoveFingerprint(connection, job.Job);
    }

    private static void RemoveFingerprint(IStorageConnection connection, Job job)
    {
        using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
        {
            using (var transaction = connection.CreateWriteTransaction())
            {
                transaction.RemoveHash(GetFingerprintKey(job));
                transaction.Commit();
            }
        }
    }
}

You can modify AddFingerprintIfNotExists to allow up to 4 jobs to be queued at the same time.
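One way such a modification could look, as a minimal in-memory sketch: instead of a single `Timestamp`, keep one timestamp per queued job and refuse creation once the limit is reached. The class name `CountedFingerprints` is hypothetical, and a plain `Dictionary` stands in for Hangfire storage; a real filter would have to keep these entries in storage and update them under the distributed lock, as the attribute above does.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of "at most N queued jobs per fingerprint".
// Assumption: in a real filter these timestamps live in Hangfire storage and
// are read and written under AcquireDistributedLock.
public class CountedFingerprints
{
    private readonly Dictionary<string, List<DateTimeOffset>> _entries =
        new Dictionary<string, List<DateTimeOffset>>();
    private readonly int _maxQueued;
    private readonly TimeSpan _timeout;

    public CountedFingerprints(int maxQueued, TimeSpan timeout)
    {
        _maxQueued = maxQueued;
        _timeout = timeout;
    }

    // Returns true (let the job be created) while fewer than _maxQueued live
    // entries exist for this fingerprint; otherwise false (cancel creation).
    public bool TryAdd(string fingerprint, DateTimeOffset now)
    {
        if (!_entries.TryGetValue(fingerprint, out var stamps))
        {
            stamps = new List<DateTimeOffset>();
            _entries[fingerprint] = stamps;
        }

        // Same expiry rule as the single-slot version: an entry older than
        // the timeout no longer counts.
        stamps.RemoveAll(t => now > t.Add(_timeout));

        if (stamps.Count >= _maxQueued)
        {
            return false;
        }

        stamps.Add(now);
        return true;
    }

    // Called when a job is performed or deleted: frees one slot.
    public void Release(string fingerprint)
    {
        if (_entries.TryGetValue(fingerprint, out var stamps) && stamps.Count > 0)
        {
            stamps.RemoveAt(0);
        }
    }
}
```

With `maxQueued` set to 4, the fifth `TryAdd` for the same fingerprint returns false until one job calls `Release` or the oldest entries pass the timeout.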


As you can see in the screenshot, my current solution already looks through a list to see whether the job is already scheduled: if so, it cancels the job; if not, it lets it pass through.

This one saves its state in storage (the database), though, so it's better than my in-memory list solution. Thanks for that. (I had also forgotten to handle the “Deleted” state, in case a job is deleted while executing.)

Still, in my opinion it would be easier (and cheaper) to include the RecurringJobId in CreatingContext/CreatedContext/CreateContext, or to create a RecurringCreateContext : CreateContext. The latter doesn't play nicely with how a new CreatedContext is created from it, though, because you lose the extra properties.

Now the job gets cancelled.

Even better would be to create a filter here:

The RecurringJob knows its lastJobId, so you could check whether that job has finished before scheduling a new one. This way the new job isn't even created.
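The check itself is small. A minimal sketch, assuming the scheduler can look up the state name of the recurring job's last created job (for example via `IStorageConnection.GetJobData(lastJobId).State`) and passes `null` when no job has been created yet. Which states count as "finished" is an assumption here: this sketch treats Succeeded, Failed and Deleted as done, so a job that is Enqueued, Processing or Scheduled for a retry blocks a new one. `LastJobCheck` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class LastJobCheck
{
    // Assumption: the previous job is no longer waiting or running once it
    // has reached one of these states.
    private static readonly HashSet<string> DoneStates =
        new HashSet<string>(StringComparer.Ordinal) { "Succeeded", "Failed", "Deleted" };

    // lastJobState is the state name of the recurring job's last created job,
    // or null if the recurring job has never created one.
    public static bool ShouldSchedule(string lastJobState)
    {
        return lastJobState == null || DoneStates.Contains(lastJobState);
    }
}
```

With this rule, a customer whose previous export is still `Enqueued` or `Processing` is skipped on this tick instead of getting a second job.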

Being able to give IDs to jobs manually and get them in the context would be really nice; not sure if there's a deeper reason why it isn't implemented.

First of all, thanks for your suggested attribute; I'm using it for now.

I'm willing to create a pull request (I don't know how active the developers are over here). Basic idea:

interface IRecurringFilter
{
    void OnCreatingJob(RecurringCreatingContext filterContext);
    void OnCreatedJob(RecurringCreatedContext filterContext);
}

// Basically the same as the client/server filter contexts (member signatures only)
public class RecurringCreatingContext : CreateContext
{
    public bool Canceled { get; set; }
    public T GetJobParameter<T>(string name);
    public void SetJobParameter(string name, object value);
}

public class RecurringCreatedContext : CreateContext
{
    public string CreatedJobId { get; set; } // Points to the job created for this recurring task
    public bool Canceled { get; set; }       // Not sure this one is needed
    public T GetCreatedJobParameter<T>(string name);
    public void SetCreatedJobParameter(string name, object value);
    public T GetJobParameter<T>(string name);
    public void SetJobParameter(string name, object value);
}

And then modify RecurringJobScheduler to apply this filter; Canceled in this case means the job doesn't get scheduled.

Using this you could write filters that:

  • use LastJobId to handle cases like "if the last job is still in progress, kill it and start again" (useful for long-running mail observers that should be restarted every day)
  • use LastJobId to check whether a new job needs to be scheduled at all, or one is already in the queue
  • pass arguments from a RecurringJob to its Job (using OnCreatedJob)
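To illustrate how the proposed pipeline would behave, here is a self-contained model of it. None of these types exist in Hangfire: `RecurringCreatingContextModel`, `IRecurringFilterModel`, `SkipIfLastJobActive` and `SchedulerModel` are hypothetical stand-ins for the proposal, and the "is this job still active?" lookup is passed in as a delegate instead of querying storage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the proposed RecurringCreatingContext.
public class RecurringCreatingContextModel
{
    public string RecurringJobId { get; set; }
    public string LastJobId { get; set; }
    public bool Canceled { get; set; }
}

// Hypothetical stand-in for the proposed IRecurringFilter (creating side only).
public interface IRecurringFilterModel
{
    void OnCreatingJob(RecurringCreatingContextModel context);
}

// Example filter: cancel scheduling while the last job is still active.
public class SkipIfLastJobActive : IRecurringFilterModel
{
    private readonly Func<string, bool> _isActive;

    public SkipIfLastJobActive(Func<string, bool> isActive)
    {
        _isActive = isActive;
    }

    public void OnCreatingJob(RecurringCreatingContextModel context)
    {
        if (context.LastJobId != null && _isActive(context.LastJobId))
        {
            context.Canceled = true;
        }
    }
}

public static class SchedulerModel
{
    // Runs the filters in order; returns false as soon as one cancels, which
    // in the proposal means the scheduler creates no job on this tick.
    public static bool TrySchedule(RecurringCreatingContextModel context, IEnumerable<IRecurringFilterModel> filters)
    {
        foreach (var filter in filters)
        {
            filter.OnCreatingJob(context);
            if (context.Canceled)
            {
                return false;
            }
        }
        return true;
    }
}
```

Because the decision happens before any job is created, a skipped tick leaves no cancelled job behind, which also avoids the "can't see whether it has run" problem from the screenshot.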

Is there any way to make this filter fully transactional? Right now, if the server fails mid-job, the fingerprint is never removed. When the server comes back up the fingerprint still exists, so the job won't start again until the timeout period has passed.
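The timeout check in AddFingerprintIfNotExists is what bounds this: an orphaned fingerprint behaves like a lease that blocks new jobs only until `Timestamp + FingerprintTimeout`. The sketch below isolates that check (same parse and comparison as the attribute; `FingerprintLease` is a hypothetical name). Shortening `FingerprintTimeout` shortens the blockage after a crash, but if it is shorter than a job's real run time, a duplicate can be queued while the first job is still running.

```csharp
using System;
using System.Globalization;

public static class FingerprintLease
{
    // Same rule as AddFingerprintIfNotExists: the stored fingerprint still
    // blocks new jobs only if it parses and has not passed its timeout.
    public static bool IsStillHeld(string storedTimestamp, DateTimeOffset now, TimeSpan timeout)
    {
        return DateTimeOffset.TryParse(storedTimestamp, null, DateTimeStyles.RoundtripKind, out var timestamp)
            && now <= timestamp.Add(timeout);
    }
}
```

With the 1-hour timeout from the attribute, a fingerprint written at 12:00 by a server that then crashed still blocks at 12:30 but not at 13:01.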

Edit: never mind - I see what you mean about enqueuing an already scheduled job vs adding a recurring job.

Perhaps I'm missing something, but you can name recurring job IDs manually. I do this for recurring jobs that run against different directories, where I only want one per directory.

RecurringJob.AddOrUpdate(
    string.Format("rjscan-{0}", srcdir.Replace(Path.DirectorySeparatorChar, '-')),
    () => MyNamespace.MyMethod(srcdir, "*.tif"), Cron.Minutely()
);
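The ID expression from the snippet above can be pulled out into a helper to show why it gives one recurring job per directory: the same directory always maps to the same ID, so calling AddOrUpdate again updates that recurring job instead of adding a second one. `ScanJobIds.ForDirectory` is a hypothetical name. Note that this only deduplicates the recurring job definitions; on its own it does not stop a new run from being enqueued while the previous run is still queued.

```csharp
using System;
using System.IO;

public static class ScanJobIds
{
    // Same format as the snippet above: "rjscan-" plus the directory path with
    // the platform's separator replaced by '-'.
    public static string ForDirectory(string srcdir)
    {
        return string.Format("rjscan-{0}", srcdir.Replace(Path.DirectorySeparatorChar, '-'));
    }
}
```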