Description
Apologies in advance for how long this description is.
I am using the Batches / Batch Continuations feature of Hangfire.Pro to control a polling-based system that monitors changes in another system. The general construct that I have is this:
    public class TransactionLogDetector
    {
        IRepository _repository;
        public void DetectChanges()
        {
            // determine what has changed since last DetectChanges
            var lastChecked = _repository.LastChecked;
            var allChanges = _repository.ChangesSince(lastChecked);
            // create a batch so we do nothing again until the batch is finished
            var current = BatchJob.StartNew(x =>
            {
                allChanges.ForEach(change => x.Enqueue<ChangeHandler>(job => job.Handle(change)));
            });
            // schedule the next DetectChanges after the current batch is finished
            BatchJob.ContinueWith(current, x =>
                x.Schedule<TransactionLogDetector>(job => job.DetectChanges())
                , options: BatchContinuationOptions.OnAnyFinishedState);
            _repository.SaveLastChecked(lastChecked);
        }
    }
In my application startup I have:
    // Main must be static to serve as the entry point
    private static void Main()
    {
        using (WebApp.Start<Startup>("http://localhost:12345"))
        {
            BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
        }
    }
The gist of this is that I want to trigger the initial DetectChanges when my application starts, and it handles scheduling the next DetectChanges ad infinitum.
Obviously, this is a problem if I run my application a second time, because there are already either enqueued jobs or a batch awaiting.
To determine whether I needed to queue up the initial DetectChanges, I decided that the only way to do this would be to interrogate Hangfire using the IMonitoringApi interface. Here is what I attempted to do:
    // Main must be static to serve as the entry point and to call the static helper below
    private static void Main()
    {
        using (WebApp.Start<Startup>("http://localhost:12345"))
        {
            // only kick off the first DetectChanges if none is already known to Hangfire
            if (!IsOutThere<TransactionLogDetector>())
            {
                BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());
            }
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
        }
    }
    private static bool IsOutThere<TJob>()
    {
        var api = JobStorage.Current.GetMonitoringApi();
        var isEnqueued = api.EnqueuedJobs("default", 0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
        var isFetched = api.FetchedJobs("default", 0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
        var isScheduled = api.ScheduledJobs(0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
        var isProcessing = api.ProcessingJobs(0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
        return isEnqueued || isFetched || isScheduled || isProcessing;
    }
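As an aside, the check above reads every job in one call and dereferences `x.Value.Job` without a null check. A more defensive variant might look like the sketch below. It is only a sketch under assumptions: the `JobProbe` class, the `AnyPage` helper, and the page size of 500 are made up for illustration, and it assumes that a page shorter than the requested size means the end of the list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Hangfire;
using Hangfire.Common;
using Hangfire.Storage;

public static class JobProbe
{
    // Hypothetical page size; tune for your storage.
    private const int PageSize = 500;

    public static bool IsOutThere<TJob>()
    {
        IMonitoringApi api = JobStorage.Current.GetMonitoringApi();
        Type jobType = typeof(TJob);

        // Enqueued and fetched jobs are listed per queue, so walk every known queue
        // instead of assuming everything lives in "default".
        foreach (var queue in api.Queues())
        {
            string name = queue.Name;
            if (AnyPage(offset => api.EnqueuedJobs(name, offset, PageSize).Select(x => x.Value?.Job), jobType) ||
                AnyPage(offset => api.FetchedJobs(name, offset, PageSize).Select(x => x.Value?.Job), jobType))
            {
                return true;
            }
        }

        // Scheduled and processing jobs are listed across all queues.
        return AnyPage(offset => api.ScheduledJobs(offset, PageSize).Select(x => x.Value?.Job), jobType)
            || AnyPage(offset => api.ProcessingJobs(offset, PageSize).Select(x => x.Value?.Job), jobType);
    }

    // Reads one page at a time and stops at the first match.
    // Assumption: a page shorter than PageSize marks the end of the list.
    private static bool AnyPage(Func<int, IEnumerable<Job>> fetchPage, Type jobType)
    {
        for (int offset = 0; ; offset += PageSize)
        {
            List<Job> page = fetchPage(offset).ToList();
            // Entries can be null when a job could not be loaded, so guard before comparing.
            if (page.Any(job => job != null && job.Type == jobType))
            {
                return true;
            }
            if (page.Count < PageSize)
            {
                return false;
            }
        }
    }
}
```

This is still a check-then-act sequence, so a job that is not in one of these four states at the moment of the scan will not be seen, and the pages are not read as one atomic snapshot.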
This works reasonably well for determining whether I need to start. The caveat is the case where, for whatever reason, the application was stopped in the middle of processing a large batch of transactions and is then started again.
In this situation, the IMonitoringApi seems to block when reading the current job states, because workers have already started processing the queue. Since I’m using batches / batch continuations, what seems to happen is this:
- Try to check IsOutThere<TransactionLogDetector>() - this hangs until the workers are pretty much through the batch
- For a brief instant I’m able to read the current job states, but it doesn’t find anything because the continuation job isn’t quite anywhere just yet. IsOutThere<TransactionLogDetector>() returns false, then another DetectChanges gets fired off
- The continuation now gets fired and the original DetectChanges is out there, plus the new one we just fired off
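To see what the monitoring API reports during that window, one option is to log the storage-wide counters right before the check. The following is a minimal diagnostic sketch, not a fix: the `StorageSnapshot` class name is made up for illustration, and the call may block in the same way the other monitoring reads do.

```csharp
using System;
using Hangfire;
using Hangfire.Storage;
using Hangfire.Storage.Monitoring;

public static class StorageSnapshot
{
    // Prints the counters that the monitoring API exposes for the whole storage.
    public static void Print()
    {
        IMonitoringApi api = JobStorage.Current.GetMonitoringApi();
        StatisticsDto stats = api.GetStatistics();

        Console.WriteLine(
            $"{DateTime.UtcNow:O} enqueued={stats.Enqueued} scheduled={stats.Scheduled} " +
            $"processing={stats.Processing} failed={stats.Failed}");
    }
}
```

Calling `StorageSnapshot.Print()` just before `IsOutThere<TransactionLogDetector>()` would show whether all four counters are zero at the moment the check returns false, which would be consistent with the continuation sitting in a state these counters do not cover.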
Question
- Where is the continuation job for that split second? I can’t seem to find it via IMonitoringApi - my theory is that it’s somewhere in the batches
- Is there a way to look into the continuations via the JobStorageConnection to see if there is a continuation with a type of TransactionLogDetector?
Thanks in advance for any information that you can provide me.