Description
Apologies in advance for how long this description is.
I am using the Batches / Batch Continuations feature of Hangfire.Pro to control a polling-based system that monitors changes in another system. The general construct that I have is this:
public class TransactionLogDetector
{
    IRepository _repository;

    public void DetectChanges()
    {
        // determine what has changed since last DetectChanges
        var lastChecked = _repository.LastChecked;
        var allChanges = _repository.ChangesSince(lastChecked);

        // create a batch so we do nothing again until the batch is finished
        var current = BatchJob.StartNew(x =>
        {
            allChanges.ForEach(change => x.Enqueue<ChangeHandler>(job => job.Handle(change)));
        });

        // schedule the next DetectChanges after the current batch is finished
        BatchJob.ContinueWith(current, x =>
            x.Schedule<TransactionLogDetector>(job => job.DetectChanges())
            , options: BatchContinuationOptions.OnAnyFinishedState);

        _repository.SaveLastChecked(lastChecked);
    }
}
In my application startup I have:
private void Main()
{
    using (WebApp.Start<Startup>("http://localhost:12345"))
    {
        BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());
        Console.WriteLine("Press <enter> to exit");
        Console.ReadLine();
    }
}
The gist of this is that I want to trigger the initial DetectChanges when my application starts, and it handles scheduling the next DetectChanges ad infinitum.
Obviously there is a problem if I run my application a second time, because by then there are already either enqueued jobs or a batch awaiting its continuation, and the startup code enqueues yet another DetectChanges on top of them.
To determine whether I need to queue up the initial DetectChanges, I decided that the only way to do this would be to interrogate Hangfire using the IMonitoringApi interface. Here is what I attempted to do:
private void Main()
{
    using (WebApp.Start<Startup>("http://localhost:12345"))
    {
        // only enqueue the initial DetectChanges if none is already known to Hangfire
        if (!IsOutThere<TransactionLogDetector>())
        {
            BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());
        }
        Console.WriteLine("Press <enter> to exit");
        Console.ReadLine();
    }
}

// requires: using System.Linq;
private static bool IsOutThere<TJob>()
{
    var api = JobStorage.Current.GetMonitoringApi();
    var isEnqueued = api.EnqueuedJobs("default", 0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
    var isFetched = api.FetchedJobs("default", 0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
    var isScheduled = api.ScheduledJobs(0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
    var isProcessing = api.ProcessingJobs(0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
    return isEnqueued || isFetched || isScheduled || isProcessing;
}
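As an aside on the check above: each monitoring call asks for up to int.MaxValue entries before looking at any of them. Whether that contributes to the hang described below is only a guess, but the same check can be done page by page so that it stops at the first match. The following is a minimal sketch, not a Hangfire API: PagedJobSearch, AnyPaged and the page size of 100 are hypothetical names and values chosen for illustration, and the helper is deliberately written against a plain delegate so it does not depend on any Hangfire type.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PagedJobSearch
{
    // Hypothetical helper (not part of Hangfire): walks a paged source and
    // returns true as soon as one item satisfies the predicate.
    // fetchPage(offset, count) is assumed to return at most `count` items
    // starting at `offset`, and an empty page once the source is exhausted.
    public static bool AnyPaged<T>(
        Func<int, int, IReadOnlyList<T>> fetchPage,
        Func<T, bool> predicate,
        int pageSize = 100)
    {
        if (pageSize <= 0) throw new ArgumentOutOfRangeException(nameof(pageSize));

        for (var offset = 0; ; offset += pageSize)
        {
            var page = fetchPage(offset, pageSize);
            if (page.Count == 0) return false;        // nothing left to inspect
            if (page.Any(predicate)) return true;     // stop at the first match
            if (page.Count < pageSize) return false;  // a short page is the last page
        }
    }
}
```

Assuming the monitoring calls keep the (from, count) shape used above, the scheduled-jobs check could then be written as PagedJobSearch.AnyPaged((offset, count) => api.ScheduledJobs(offset, count), x => x.Value.Job?.Type == typeof(TJob)). The null-conditional on Job is there because a job entry that cannot be deserialized may have no Job instance. This only changes how the job lists are read; it does not make the continuation visible, which is the actual question below.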
This works pretty decently for determining whether I need to start. However, the caveat is the case where, for whatever reason, the application was stopped in the middle of processing a large batch of transactions and is then started again.
In this situation, the IMonitoringApi seems to block the read of the current job states, since workers have already started processing the queue... and since I'm using batches / batch continuations, what seems to happen is this:
- Try to check IsOutThere<TransactionLogDetector>()
  - this hangs until the workers are pretty much through the batch
- For a brief instant I'm able to read the current job states, but it doesn't find anything because the continuation job isn't quite anywhere just yet. IsOutThere<TransactionLogDetector>() returns false, then another DetectChanges gets fired off
- The continuation now gets fired and the original DetectChanges is out there, plus the new one we just fired off
Question
- Where is the continuation job for that split second? I can't seem to find it via IMonitoringApi
  - my theory is that it's somewhere in the batches
- Is there a way to look into the continuations via the JobStorageConnection to see if there is a continuation with a type of TransactionLogDetector?
Thanks in advance for any information that you can provide me.