Interrogating for the existence of a particular job type

Description

Apologies in advance for how long this description is :frowning:

I am using the Batches / Batch Continuations feature of Hangfire.Pro to control a polling-based system that monitors changes in another system. The general construct that I have is this:

public class TransactionLogDetector
{
  IRepository _repository;

  public void DetectChanges()
  {
    // determine what has changed since last DetectChanges
    var lastChecked = _repository.LastChecked;
    var allChanges = _repository.ChangesSince(lastChecked);

    // create a batch so we do nothing again until the batch is finished
    var current = BatchJob.StartNew(x =>
    {
      allChanges.ForEach(change => x.Enqueue<ChangeHandler>(job => job.Handle(change)));
    });

    // schedule the next DetectChanges after the current batch is finished
    BatchJob.ContinueWith(current, x =>
      x.Schedule<TransactionLogDetector>(job => job.DetectChanges())
    , options: BatchContinuationOptions.OnAnyFinishedState);
    _repository.SaveLastChecked(lastChecked);
  }
}

In my application startup I have:

private void Main()
{
  using(WebApp.Start<Startup>("http://localhost:12345"))
  {
    BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());

    Console.WriteLine("Press <enter> to exit");
    Console.ReadLine();
  }
}

The gist of this is that I want to trigger the initial DetectChanges when my application starts, and it handles scheduling the next DetectChanges ad infinitum.

Obviously there is a problem if I run my application a second time, because there are already either enqueued jobs or a batch awaiting continuation.

In order to determine whether I needed to queue up the initial DetectChanges, I decided that the only way I could do this would be to interrogate Hangfire using the IMonitoringApi interface. Here is what I attempted to do:

private void Main()
{
  using(WebApp.Start<Startup>("http://localhost:12345"))
  {
    if(!IsOutThere<TransactionLogDetector>())
    {
      BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());
    }

    Console.WriteLine("Press <enter> to exit");
    Console.ReadLine();
  }
}

private static bool IsOutThere<TJob>()
{
  var api = JobStorage.Current.GetMonitoringApi();
  var isEnqueued = api.EnqueuedJobs("default", 0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
  var isFetched = api.FetchedJobs("default", 0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
  var isScheduled = api.ScheduledJobs(0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));
  var isProcessing = api.ProcessingJobs(0, int.MaxValue).Any(x => x.Value.Job.Type == typeof(TJob));

  return isEnqueued || isFetched || isScheduled || isProcessing;
}

This works out pretty decently for determining whether I need to start. The caveat, however, is when the application was stopped in the middle of processing a large batch of transactions and is then started again.

In this situation, the IMonitoringApi seems to block the read of the current job states, since workers have already started processing the queue... and since I'm using batches / batch continuations, what seems to happen is this:

  • Try to check IsOutThere<TransactionLogDetector>()
    • this hangs until the workers are pretty much through the batch
  • For a brief instant I'm able to read the current job states, but it doesn't find anything because the Continuation job isn't quite anywhere just yet.
  • IsOutThere<TransactionLogDetector>() returns false, then another DetectChanges gets fired off
  • The continuation now gets fired and the original DetectChanges is out there, plus the new one we just fired off

Question

  • Where is the continuation job for that split second? I can't seem to find it via IMonitoringApi
    • my theory is that it's somewhere in the batches
    • Is there a way to look into the continuations via the JobStorageConnection to see if there is a continuation with a type of TransactionLogDetector?

Thanks in advance for any information that you can provide me.

Ok, I think that I found out how to get these:

var continuationJobTypes = jobStorage.GetAllItemsFromSet($"batch:{batchId}:created").Select(batch => jobStorage.GetJobData(batch).Job.Type);
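
For context, the one-liner above presumably runs against a JobStorageConnection. Below is a slightly expanded sketch (not from the original post) with the connection obtained explicitly and null-safe handling of jobs whose type can no longer be resolved; the batch:{batchId}:created set name comes from the finding above and is an internal detail of the Batches feature, not a documented API:

// Sketch: list the types of the jobs created inside a given batch.
// Assumes batchId is already known.
using (var connection = (JobStorageConnection) JobStorage.Current.GetConnection())
{
    var continuationJobTypes = connection
        .GetAllItemsFromSet($"batch:{batchId}:created")
        .Select(jobId => connection.GetJobData(jobId)?.Job?.Type)
        .Where(type => type != null)
        .ToList();
}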

There has to be a better way to ask "are there any jobs of TJobType, in any state, anywhere in the system?" Am I missing something?

The IsOutThere method will kill the performance of background job processing once you have other background jobs to process. It's better to avoid using batches and continuations here at all; consider using background processes instead.

A background process will check your repository for changes and create background jobs. Depending on your backing storage, it's better to use either its own locking mechanisms (for an RDBMS) or the distributed locks built into Hangfire to prevent multiple process instances from doing the same work.

Don't worry about the number of background jobs it will create; you'll need to process all of them anyway.

public class DetectChangesProcess : IBackgroundProcess
{
    // Assumed to be the same IRepository used by TransactionLogDetector above.
    private readonly IRepository _repository;

    public void Execute(BackgroundProcessContext context)
    {
        using (var connection = context.Storage.GetConnection())
        using (connection.AcquireDistributedLock("myapp:detect-changes", TimeSpan.FromMinutes(1)))
        {
            // It's better to use regular transactions here instead
            // of distributed locks.

            var lastChecked = _repository.LastChecked;
            var allChanges = _repository.ChangesSince(lastChecked);

            allChanges.ForEach(change => BackgroundJob.Enqueue<ChangeHandler>(job => job.Handle(change.Id)));

            _repository.SaveLastChecked(lastChecked);
        }

        context.Wait(TimeSpan.FromSeconds(1));
    }
}

Then add the process to your processing server:

app.UseHangfireServer(new DetectChangesProcess());

Instead of preventing background jobs from being created, it's better to make the ChangeHandler.Handle method atomic and idempotent, and let the background process create as many background jobs as it wants. Atomicity and idempotency will prevent your jobs from processing the same entities multiple times.

The simplest way to make the method idempotent is to add an IsProcessed property to the Change entity and check it before doing any processing. The simplest way to make it atomic is to use transactions, if you are using an RDBMS:

public void Handle(int changeId)
{
    using (var transaction = CreateTransaction(IsolationLevel.RepeatableRead))
    {
        // When using SQL Server, it's better to add the UPDLOCK hint
        // to the following query to avoid deadlocks.
        var change = GetChange(changeId); 
        if (change.IsProcessed) return;

        // Processing

        SetProcessed(change);

        // Assuming CreateTransaction returns an ADO.NET-style transaction,
        // commit it before the using block disposes it.
        transaction.Commit();
    }
}

The caveat is that what this is doing is syncing data between two systems. Here is a scenario I think the background process may not cover:

  1. An entry is added
  2. the background process picks it up and enqueues BackgroundJob.Enqueue<Added>(x => x.Handle(thingAdded)) as job id 123
  3. this job fails the first time for whatever reason and Hangfire re-queues it
  4. In the meantime, the next set of changes is detected and determines the same thing has now been removed, so it enqueues BackgroundJob.Enqueue<Deleted>(x => x.Handle(sameThingDeleted)) as job id 234
  5. The Deleted job succeeds before the Added one, so now my system does not have that entity
  6. The Added job now comes through, but doesn't know the Deleted happened

I know that's confusing, but that was the purpose of my using the continuations... so I could guarantee that I'm only processing one set of changes at a time, and that I don't even check again until the last changeset has been processed.

Does that make sense?

Just one other note: the IsOutThere<TJob> check only happens once, at application startup. It does not happen at any other time beyond that initial check when the application starts up.

Do I understand correctly that all the Change entities should be processed in a strict order?

Yes, that is effectively what I was trying to achieve.

I know that Hangfire can't guarantee order, which was why I was trying to maintain that with batch continuations.

But all the background jobs inside a batch may be processed in a random order anyway. Most of the time the order will be correct, but you can't stay with this solution; it is not robust. The only solution is to use background job continuations (not batch ones), or to process changes in a background process.

Sorry I wasn't clear about that; within an individual batch, I am fine. The jobs can be processed in any order whatsoever; I am already handling that by what I put in the batch. It is the subsequent checks that need to be somewhat serialized.

Not a problem if you are happy with unordered execution inside a batch. But I don't recommend using the IsOutThere method anyway, because it uses indirect checks to answer the "are there any jobs" question. New states, new features, etc. may break it in the future.

Instead, you can use a background process (yep, again :smile:) that waits for a signal, and a continuation that sets the signal. When the background process receives the signal, it scans for new records, creates a new batch (to process the records), adds a continuation (that sets the signal), and resets the signal. This solution is much cleaner IMO, but it requires some locking to prevent race conditions (for example, the continuation being fired before the process resets the signal).

I'm not opposed to that at all; I am just unclear on:

  • what sets the initial signal? Do you still not have the issue of not knowing that something is out there, about to set the signal?
  • what is something to use as a signal within Hangfire? Or is it assumed that all workers run on the same box? I don't anticipate running on multiple machines, but I'm just curious what a good default choice for signaling would be (semaphore? named mutex?)

To be clear, I'm not super happy with my solution, but it's working. My next step would be to just add a user setting for my application with an IsWatchingChanges flag that starts out true, and settle on that.

To be a correct solution, the signal should be persistent, so that it works after process restarts. You already have the LastChecked property on a repository; perhaps put the signal there? But I don't know what storage you are using. The initial signal should be set by a migration or by a user, not by the code itself.
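
Putting those two replies together, here is a minimal sketch (not from the thread itself) of what such a signal-based process might look like. The detect-changes:signal hash key and the SignalledDetectChangesProcess and SignalSetter names are invented for this illustration; the IRepository, ChangeHandler and batch calls mirror the code earlier in the thread, and the signal is kept in Hangfire's own storage so that it survives restarts (the repository would work just as well):

public class SignalledDetectChangesProcess : IBackgroundProcess
{
    private readonly IRepository _repository;

    public SignalledDetectChangesProcess(IRepository repository)
    {
        _repository = repository;
    }

    public void Execute(BackgroundProcessContext context)
    {
        using (var connection = context.Storage.GetConnection())
        using (connection.AcquireDistributedLock("myapp:detect-changes", TimeSpan.FromMinutes(1)))
        {
            // Only do work when the previous batch's continuation has set the signal.
            // The initial signal has to be seeded once, e.g. by a migration or by
            // calling SignalSetter.SetSignal() manually, as suggested above.
            var signal = connection.GetAllEntriesFromHash("detect-changes:signal");
            if (signal != null && signal.ContainsKey("ready"))
            {
                // Reset the signal first, while the lock is still held, so a fast
                // continuation cannot be wiped out by a late reset.
                using (var transaction = connection.CreateWriteTransaction())
                {
                    transaction.RemoveHash("detect-changes:signal");
                    transaction.Commit();
                }

                var lastChecked = _repository.LastChecked;
                var allChanges = _repository.ChangesSince(lastChecked);

                var batchId = BatchJob.StartNew(x =>
                    allChanges.ForEach(change => x.Enqueue<ChangeHandler>(job => job.Handle(change.Id))));

                // The continuation sets the signal again once the whole batch has finished.
                BatchJob.ContinueWith(batchId, x =>
                    x.Enqueue<SignalSetter>(job => job.SetSignal()),
                    options: BatchContinuationOptions.OnAnyFinishedState);

                _repository.SaveLastChecked(lastChecked);
            }
        }

        context.Wait(TimeSpan.FromSeconds(1));
    }
}

public class SignalSetter
{
    public void SetSignal()
    {
        using (var connection = JobStorage.Current.GetConnection())
        using (connection.AcquireDistributedLock("myapp:detect-changes", TimeSpan.FromMinutes(1)))
        {
            connection.SetRangeInHash("detect-changes:signal",
                new[] { new KeyValuePair<string, string>("ready", "1") });
        }
    }
}

Taking the same distributed lock in SignalSetter is what prevents the race mentioned above: the continuation cannot set the signal while the process is between reading and resetting it. The process would be registered with app.UseHangfireServer(new SignalledDetectChangesProcess(repository)), like the earlier DetectChangesProcess.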

Currently I'm only using application settings storage (as user settings, so I can write to them, leveraging Settings.Upgrade() if necessary for when file version information changes), as I wanted to keep it lightweight.

I will likely move to this model, then. Is the concept of an IsOutThere<TJob> ever going to be a viable / generally applicable idea in Hangfire? My only motivation for this is that Hangfire is the only source of truth for things like this.

Thanks for the feedback.

Speaking generally, the concept of IsOutThere<TJob> may be viable if you teach Hangfire how to perform this logic. One way is to create an IApplyStateFilter that runs when a background job is started or completed. You can even use Hangfire's storage to perform the tracking:

public class TrackBackgroundJobsAttribute : JobFilterAttribute, IApplyStateFilter
{
    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        if (context.NewState.IsFinal)
        {
            // Background job is completed: Succeeded or Deleted
            context.Transaction.RemoveFromSet(GetSetKey(context.BackgroundJob.Job.Type), context.BackgroundJob.Id);
        }
        else
        {
            // Background job is pending: Enqueued, Scheduled, Failed, Processing and so on
            context.Transaction.AddToSet(GetSetKey(context.BackgroundJob.Job.Type), context.BackgroundJob.Id);
        }
    }

    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
    }

    private static string GetSetKey(Type type)
    {
        return $"pending:{type.Name}";
    }
}

Then you mark background jobs using this attribute, and query that set:

using (var connection = JobStorage.Current.GetConnection())
{
    var storageConnection = (JobStorageConnection) connection; // Oh, backwards compatibility, you are so annoying...
    var pendingJobCount = storageConnection.GetSetCount(GetSetKey(typeof(Services)));
}

But the problem is that this data is highly volatile, and there may be problems; for example, a user may retry a background job just after you have queried GetSetCount, so more locking is needed to prevent race conditions.
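
For illustration only, here is a minimal sketch of one way to narrow that window: serialize the check and the subsequent enqueue behind a Hangfire distributed lock. The lock name and the pending:TransactionLogDetector key follow the conventions used in this thread and are not part of Hangfire's API:

using (var connection = (JobStorageConnection) JobStorage.Current.GetConnection())
using (connection.AcquireDistributedLock("myapp:startup-check", TimeSpan.FromSeconds(30)))
{
    // Only enqueue the initial DetectChanges if nothing of that type is pending.
    var pendingCount = connection.GetSetCount("pending:TransactionLogDetector");
    if (pendingCount == 0)
    {
        BackgroundJob.Enqueue<TransactionLogDetector>(x => x.DetectChanges());
    }
}

This only guards against other writers that take the same lock; a dashboard retry happening in parallel would not, which is exactly the volatility warned about above.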


@odinserj thank you! I think this is a viable solution for what I am trying to achieve. This fulfills my desire to have Hangfire be the source of truth in this case. Thanks for all of the suggestions / insight!

@odinserj I think that this is working out exactly as I expected. The only additional thing I had to add was the IClientFilter, because sometimes the job I'm looking for is queued up inside a batch as a ScheduledJob, and in that circumstance the filter was missing that it was actually queued up and ready to go, so I am additionally adding to the set when the job is created.

In OnCreated I had to create my own IWriteOnlyTransaction and Commit() it at the end, whereas it looks like in the IApplyStateFilter the transaction is committed for you within Hangfire. I'm also not sure under which conditions you'd get an OnCreated with context.Canceled set to true, but I assumed I could use that flag to know whether to add to or remove from the set.

Can you verify that my implementation seems sound? Thanks again for all of the help.

Note that the Chain property is there in case I want to group particular job types as part of the same "chain". That is usually when I have JobA :arrow_right: JobB :arrow_right: JobC and then restart back at JobA; I want to group all of those as SomeChainGroup, so I can decorate each of those jobs with [TrackBackgroundJobs(Chain = typeof(SomeChainGroup))] and they will all use that same key for the set.

public class TrackBackgroundJobsAttribute : JobFilterAttribute, IApplyStateFilter, IClientFilter
{
    private readonly ILog _logger = LogProvider.GetCurrentClassLogger();

    public Type Chain { get; set; }

    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction _)
    {
        var isRemovingJob = context.NewState.IsFinal;
        HandleStateChange(context.BackgroundJob, context.Transaction, isRemovingJob, context.NewState.Name, context.OldStateName);
    }

    public void OnCreated(CreatedContext context)
    {
        using (var transaction = context.Connection.CreateWriteTransaction())
        {
            HandleStateChange(context.BackgroundJob, transaction, context.Canceled, "Created");
            transaction.Commit();
        }
    }

    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
    }

    public void OnCreating(CreatingContext filterContext)
    {
    }

    private void HandleStateChange(BackgroundJob backgroundJob, IWriteOnlyTransaction transaction, bool isRemovingJob, string newState, string oldState = "")
    {
        var transition = $"({oldState} --> {newState})";
        var key = PendingKeyNameFrom(backgroundJob);
        var jobId = backgroundJob.Id;

        if (isRemovingJob)
        {
            _logger.Debug($"Removing set {key} / {jobId} {transition}");
            transaction.RemoveFromSet(key, jobId);
            return;
        }

        _logger.Debug($"Adding set {key} / {jobId} {transition}");
        transaction.AddToSet(key, jobId);
    }

    private string PendingKeyNameFrom(BackgroundJob backgroundJob)
    {
        return (Chain ?? backgroundJob.Job.Type).PendingKeyName();
    }
}

public class JobQueueInformation
{
    private readonly ILog _logger;
    private readonly JobStorageConnectionFactory _jobStorageConnectionFactory;

    public JobQueueInformation(ILog logger, JobStorageConnectionFactory jobStorageConnectionFactory)
    {
        _logger = logger;
        _jobStorageConnectionFactory = jobStorageConnectionFactory;
    }

    public bool HasInThePipeline<TJob>()
    {
        ValidateTracksBackgroundJobs<TJob>();

        using (var connection = _jobStorageConnectionFactory.GetConnection())
        {
            var pendingKeyName = typeof (TJob).PendingKeyName();
            var count = connection.GetSetCount(pendingKeyName);
            _logger.Debug($"{pendingKeyName} count is {count}");
            return count > 0;
        }
    }

    private static void ValidateTracksBackgroundJobs<TJob>()
    {
        if (!Attribute.IsDefined(typeof(TJob), typeof(TrackBackgroundJobsAttribute)))
        {
            throw new ArgumentException($"{typeof(TJob)} is not decorated with {typeof(TrackBackgroundJobsAttribute)}");
        }
    }
}
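
The PendingKeyName() extension method referenced above is not shown in the post; here is a minimal sketch of what it presumably looks like, assuming it mirrors the pending:{type.Name} convention from the earlier GetSetKey example:

public static class TypeExtensions
{
    // Assumed implementation: mirrors the "pending:{type.Name}" key convention
    // used by GetSetKey in the earlier TrackBackgroundJobsAttribute example.
    public static string PendingKeyName(this Type type)
    {
        return $"pending:{type.Name}";
    }
}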