Description
Apologies in advance for how long this description is.
I am using the Batches / Batch Continuations feature of Hangfire.Pro to control a polling-based system that monitors changes in another system. The general construct that I have is this:
public class TransactionLogDetector
{
    IRepository _repository;

    public void DetectChanges()
    {
        // determine what has changed since last DetectChanges
        var lastChecked = _repository.LastChecked;
        var allChanges = _repository.ChangesSince(lastChecked);

        // create a batch so we do nothing again until the batch is finished
        var current = BatchJob.StartNew(x =>
        {
            allChanges.ForEach(change => x.Enqueue<ChangeHandler>(job => job.Handle(change)));
        });

        // schedule the next DetectChanges after the current batch is finished
        BatchJob.ContinueWith(current, x =>
            x.Schedule<TransactionLogDetector>(job => job.DetectChanges()),
            options: BatchContinuationOptions.OnAnyFinishedState);

        _repository.SaveLastChecked(lastChecked);
    }
}
In my application startup I have:
private static void Main()
{
    using (WebApp.Start<Startup>("http://localhost:12345"))
    {
        BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());

        Console.WriteLine("Press <enter> to exit");
        Console.ReadLine();
    }
}
The gist of this is that I want to trigger the initial DetectChanges when my application starts, and from then on the continuation handles scheduling the next DetectChanges ad infinitum.
Obviously there is a problem if I run my application a second time, because there are already either enqueued jobs or a batch awaiting its continuation.
In order to determine whether I needed to queue up the initial DetectChanges, I decided the only way to do this was to interrogate Hangfire through the IMonitoringApi interface. Here is what I attempted:
private static void Main()
{
    using (WebApp.Start<Startup>("http://localhost:12345"))
    {
        if (!IsOutThere<TransactionLogDetector>())
        {
            BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());
        }

        Console.WriteLine("Press <enter> to exit");
        Console.ReadLine();
    }
}

private static bool IsOutThere<TJob>()
{
    var api = JobStorage.Current.GetMonitoringApi();

    var isEnqueued = api.EnqueuedJobs("default", 0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
    var isFetched = api.FetchedJobs("default", 0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
    var isScheduled = api.ScheduledJobs(0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
    var isProcessing = api.ProcessingJobs(0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));

    return isEnqueued || isFetched || isScheduled || isProcessing;
}
This works pretty decently for determining whether I need to start. The caveat, however, is when the application was stopped in the middle of processing a large batch of transactions and is then started again.
In this situation, IMonitoringApi seems to block reading the current job states because workers have already started processing the queue, and since I'm using batches / batch continuations, what seems to happen is this (a possible startup-time workaround is sketched after the list):
- Try to check IsOutThere<TransactionLogDetector>() - this hangs until the workers are pretty much through the batch.
- For a brief instant I'm able to read the current job states, but it doesn't find anything because the continuation job isn't quite anywhere just yet.
- IsOutThere<TransactionLogDetector>() returns false, so another DetectChanges gets fired off.
- The continuation now gets fired, and the original DetectChanges is out there, plus the new one we just fired off.
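One way to sidestep this particular race at startup (a sketch only; the hash key "transaction-log-detector" and the "started" field are made-up names, and it assumes the batch / continuation chain never breaks once it has been kicked off) would be to record a marker in job storage the first time the loop is started, and check that marker instead of interrogating IMonitoringApi:

private static void Main()
{
    using (WebApp.Start<Startup>("http://localhost:12345"))
    {
        using (var connection = JobStorage.Current.GetConnection())
        // Storage-level lock so two starting instances cannot both pass the check below.
        using (connection.AcquireDistributedLock("transaction-log-detector", TimeSpan.FromSeconds(30)))
        {
            var marker = connection.GetAllEntriesFromHash("transaction-log-detector");

            if (marker == null || !marker.ContainsKey("started"))
            {
                // First ever start: kick off the polling loop and remember that we did.
                BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());

                connection.SetRangeInHash("transaction-log-detector", new[]
                {
                    new KeyValuePair<string, string>("started", DateTime.UtcNow.ToString("o"))
                });
            }
        }

        Console.WriteLine("Press <enter> to exit");
        Console.ReadLine();
    }
}

The distributed lock and the hash are both part of the core IStorageConnection API, so this does not depend on how the Pro batches store their continuations.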
Question
- Where is the continuation job for that split second? I can't seem to find it via IMonitoringApi - my theory is that it's somewhere in the batches.
- Is there a way to look into the continuations via the JobStorageConnection to see if there is a continuation with a type of TransactionLogDetector?
Thanks in advance for any information that you can provide me.
An alternative is to use a long-running background process that waits for a signal, and a continuation that sets the signal. When the background process receives the signal, it scans for the new records, creates a new batch (to process the records), adds a continuation (that sets the signal), and resets the signal. This solution is much cleaner IMO, but it requires some locking to prevent race conditions (for example, the continuation being fired before the process resets the signal).
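Below is a minimal sketch of that signal-based approach, assuming a single server process, an in-process ManualResetEventSlim as the signal, and made-up type names (TransactionLogWatcher, BatchFinishedSignal) that are not part of Hangfire's API; with multiple servers the signal would have to live in job storage instead (usings omitted, as in the snippets above):

public class TransactionLogWatcher : IBackgroundProcess
{
    // The "signal": set when the previous batch has finished. It starts set so the
    // first Execute call performs an initial scan on startup.
    private static readonly ManualResetEventSlim Signal = new ManualResetEventSlim(true);

    private readonly IRepository _repository;

    public TransactionLogWatcher(IRepository repository)
    {
        _repository = repository;
    }

    public static void SetSignal()
    {
        Signal.Set();
    }

    public void Execute(BackgroundProcessContext context)
    {
        // Wait until the continuation of the previous batch signals completion.
        Signal.Wait(context.CancellationToken);

        // Reset *before* creating the next batch, so a continuation that fires later
        // can never be lost between Wait and Reset (the race mentioned above).
        Signal.Reset();

        var lastChecked = _repository.LastChecked;
        var allChanges = _repository.ChangesSince(lastChecked);

        if (allChanges.Count == 0)
        {
            // Nothing to process, so no batch (and no continuation) gets created.
            // Wait for the polling interval, then re-arm the signal and try again.
            context.CancellationToken.WaitHandle.WaitOne(TimeSpan.FromSeconds(15));
            Signal.Set();
            return;
        }

        var batchId = BatchJob.StartNew(x =>
        {
            allChanges.ForEach(change => x.Enqueue<ChangeHandler>(job => job.Handle(change)));
        });

        // Once every job in the batch has reached a finished state, set the signal again.
        BatchJob.ContinueWith(batchId, x =>
            x.Enqueue<BatchFinishedSignal>(s => s.Set()),
            options: BatchContinuationOptions.OnAnyFinishedState);

        _repository.SaveLastChecked(lastChecked);
    }
}

public class BatchFinishedSignal
{
    // Enqueued as the batch continuation; it runs on a worker in the same process.
    public void Set()
    {
        TransactionLogWatcher.SetSignal();
    }
}

The watcher instance would be registered as an additional process when the server starts (for example via a UseHangfireServer overload that accepts IBackgroundProcess instances), replacing the initial BackgroundJob.Enqueue call in Main; resetting the signal before the batch is created is what avoids the race described above.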