We want to fine-tune the degree of concurrency depending on parameter values.
Given many buckets, each containing the same multiple segments:
- Bucket 1
  - Segment A
    - Item 1A-1
    - Item 1A-2
  - Segment C
    - Item 1C-1
  - Segment A
- Bucket 2
  - Segment B
    - Item 2B-1
  - Segment C
    - Item 2C-1
    - Item 2C-2
  - Segment B
Users can add new buckets, while the segments are fixed (hard-coded as an enum). Segments are updated often and can be large, so we only want to update a subset of the segments per bucket. But to avoid conflicting updates, no parallel updates of the same bucket should run. The update is done by a job:
```csharp
Task UpdateList(Guid bucketId, Segment[] segments)
```
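(For completeness, Segment is just a plain enum; the member names below are made up:)

```csharp
// hard-coded segments; users can add buckets, but not segments
public enum Segment
{
    A,
    B,
    C
}
```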
So what is the best way to allow multiple UpdateList jobs to run for different buckets, but not for the same bucket? Hangfire has some tools, but they haven't worked for us:
DisableConcurrentExecution attribute
Does not scale. When processing a big load of jobs for the same bucket, we observed that out of 20 workers only 1 was processing; the other 19 were waiting for the lock while blocking the processing of other jobs. Furthermore, DisableConcurrentExecution prevents concurrent execution even for different buckets and thus prevents scaling.
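For reference, this is roughly what we had (ListUpdater is a placeholder for our actual job class; the constructor argument is the lock timeout in seconds):

```csharp
using System;
using System.Threading.Tasks;
using Hangfire;

public class ListUpdater
{
    // Hangfire's built-in filter takes one distributed lock per job method,
    // so all UpdateList jobs serialize globally, regardless of bucketId
    [DisableConcurrentExecution(60)]
    public Task UpdateList(Guid bucketId, Segment[] segments)
    {
        // actual update logic omitted
        return Task.CompletedTask;
    }
}
```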
Mutexes
We have not tried this out, but are not confident that it works. We have use cases where 10-100k updates are queued, and we have observed that the whole system was overwhelmed by this number. The note that "mutexes aren't suitable for heavy workloads", and the concept of just re-scheduling jobs that don't get a chance to process, make it seem very likely that mutexes will not prevent this overwhelming: for a long time there would be 10k jobs competing for the mutex.
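If we read the throttling docs correctly, the usage would look roughly like this (untested by us; the resource format string referencing the bucketId argument is our assumption):

```csharp
using System;
using System.Threading.Tasks;
using Hangfire;

public class ListUpdater
{
    // Mutex from the throttling package: jobs sharing the same resource string
    // run one at a time, so keying it on bucketId would only block per bucket
    [Mutex("update-list:{0}")]
    public Task UpdateList(Guid bucketId, Segment[] segments)
    {
        // actual update logic omitted
        return Task.CompletedTask;
    }
}
```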
We already pay for Hangfire.Pro and would be willing to pay for Ace if it solved our use case.
Custom state
So we tried implementing a custom state (see example for this concept). Only one job per bucket is allowed in the states [enqueued, processing]; other jobs wait in the custom state BlockedByConcurrent. This nicely prevents the competing for locks, but involves heavy customisation. With the available extension points it is tricky to prevent stalled jobs for all combinations of concurrent events (pulling the next job from blocked while another job gets enqueued). We looked into forking Hangfire.SqlServer, but that is where we stopped and asked ourselves whether we had gone too far.
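To give an idea of the customisation, the state class itself is the easy part; a minimal sketch follows (the state election filter and the logic that releases blocked jobs are the tricky bits and are not shown):

```csharp
using System.Collections.Generic;
using Hangfire.States;

// jobs for a bucket that already has a job enqueued/processing are moved here
// instead of competing for a lock
public class BlockedByConcurrentState : IState
{
    public const string StateName = "BlockedByConcurrent";

    public string Name => StateName;
    public string Reason => "Another job for the same bucket is enqueued or processing";
    public bool IsFinal => false;
    public bool IgnoreJobLoadException => false;

    public Dictionary<string, string> SerializeData() => new Dictionary<string, string>();
}
```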
Chaining with ContinueWith()
Hangfire allows serial processing of jobs with ContinueWith(). So we tried to simply chain together the jobs that must not be processed concurrently. This works OK, but it does not feel perfect. The idea of ContinueWith() is rather to chain tasks within one business workflow, e.g. after the mail was generated, send it. But maybe it works anyway? JobContinuationOptions.OnAnyFinishedState allows the continuation to run even if the previous job fails, and if a job in the chain gets deleted, the chain still continues with the following job. So this approach is currently our best hope.
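Roughly, the chaining looks like this per bucket (GetLastJobId/SaveLastJobId are hypothetical helpers for our own bookkeeping, e.g. a column on the bucket row, since Hangfire does not track a "last job per resource" for us; ListUpdater again stands for the class hosting UpdateList, and in recent Hangfire versions ContinueWith() is named ContinueJobWith()):

```csharp
string lastJobId = GetLastJobId(bucketId);   // hypothetical helper

string newJobId = lastJobId == null
    // no pending job for this bucket yet, so start a new chain
    ? BackgroundJob.Enqueue<ListUpdater>(x => x.UpdateList(bucketId, segments))
    // otherwise append to the chain; OnAnyFinishedState keeps the chain going
    // even if the previous job failed or was deleted
    : BackgroundJob.ContinueJobWith<ListUpdater>(
        lastJobId,
        x => x.UpdateList(bucketId, segments),
        JobContinuationOptions.OnAnyFinishedState);

SaveLastJobId(bucketId, newJobId);           // hypothetical helper
```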
Still the question lurks: are we missing something? Is there a better (out-of-the-box) solution? This setting, while not standard, does not seem to be too unique.
Btw, we are using the mediator pattern; the example above is adapted to be more standard. In our code it actually looks like this:
```csharp
// this is the class whose method a Hangfire worker executes
// (simplified; it only dispatches the command through the mediator)
public class JobExecuter<TCommand> where TCommand : IRequest
{
    private readonly IMediator _mediator;

    public JobExecuter(IMediator mediator) => _mediator = mediator;

    public Task Execute(TCommand command) => _mediator.Send(command);
}

// command to be executed
public class UpdateListCommand : IRequest
{
    public Guid BucketId { get; set; }
    public Segment[] Segments { get; set; }
}

// command gets enqueued (simplified)
var command = new UpdateListCommand { /* ... */ };
BackgroundJob.Enqueue<JobExecuter<UpdateListCommand>>(jex => jex.Execute(command));
```