Hangfire Pro with Redis: long-running batch jobs executing multiple times


#1

I have a batch job with continuations for sending emails. The issue is that the jobs sometimes run multiple times when the task is long-running. Please review the code below:

```csharp
// Prepare
var prepareJob = BatchJob.StartNew(c =>
{
    c.Enqueue(() => PrepareEmail(model, accountId, domain));
});

// Send emails
var sendMailJob = BatchJob.ContinueWith(prepareJob, c =>
{
    c.Enqueue(() => SendEmail(model, accountId, emailLayout, domain));
});

// Check email delivery status 1, 2, 3
var statusJob1 = BatchJob.ContinueWith(sendMailJob, c =>
{
    c.Schedule(() => GetMessageStatus(model.CampaignId, domain), TimeSpan.FromMinutes(2));
});

var statusJob2 = BatchJob.ContinueWith(statusJob1, c =>
{
    c.Schedule(() => GetMessageStatus(model.CampaignId, domain), TimeSpan.FromMinutes(7));
});

var statusJob3 = BatchJob.ContinueWith(statusJob2, c =>
{
    c.Schedule(() => GetMessageStatus(model.CampaignId, domain), TimeSpan.FromMinutes(20));
});
```

#2

How long can your long-running background job take to complete? Hangfire.Pro.Redis has an `InvisibilityTimeout` option that may cause background jobs taking longer than 30 minutes to be re-run.
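If jobs legitimately run longer than the invisibility timeout, the timeout can be raised when configuring storage. The sketch below assumes the `UseRedisStorage` extension and the `RedisStorageOptions.InvisibilityTimeout` property as exposed by the Hangfire.Pro.Redis version discussed in this thread (check the documentation for your version, as storage options change between releases). The connection string and the three-hour value are placeholders, not recommendations.

```csharp
using System;
using Hangfire;
using Hangfire.Pro.Redis;

// Placeholder connection string; replace with your own Redis endpoint.
GlobalConfiguration.Configuration.UseRedisStorage(
    "localhost:6379",
    new RedisStorageOptions
    {
        // Assumed option: jobs still running after this interval may be treated
        // as abandoned and fetched again, so keep it above the longest job duration.
        InvisibilityTimeout = TimeSpan.FromHours(3)
    });
```

Raising the timeout only reduces one source of re-runs; the other causes of repeated execution remain, so idempotent jobs are still needed.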

Please also keep in mind that there are rare cases in which Hangfire may perform your background job several times, so consider making your background jobs idempotent.


#3

Hi @odinserj, we are experiencing the same issue and the UI doesn’t explain why the batch job is started multiple times.

In the UI we just see that there are multiple instances of the job being processed, but no details about why.
Our batches have upwards of 100k items.

We also have one continuation job, which we never see get queued when we queue all the jobs in a single batch. If we reduce the batch size to 50k, there doesn't seem to be any issue, but of course we have more than 50k items.

How can we diagnose this?


#4

For example, @odinserj:

```csharp
public void GenerateTargetSuggestionsForAllQualifiedAthletes(int? maxItems)
{
    var generateSuggestionsForProfileIds = new List<int>();
    this.athleteTargetSuggestionGeneratorQueryStore.GetListOfProfileIds(generateSuggestionsForProfileIds, maxItems); // 140k items returned

    this.athleteTargetSuggestionGeneratorRepository.EmptyTempTablesForAthleteTargetSuggestionsByCategory();

    // Enqueue one job per profile in a single batch.
    var batchId = BatchJob.StartNew(
        batch =>
        {
            foreach (var profileId in generateSuggestionsForProfileIds)
            {
                batch.Enqueue<GenerateTargetSuggestionForSingleAthleteJob>(y => y.Perform(profileId, true));
            }
        });

    // Run the cleanup job once every job in the batch has reached a final state.
    BatchJob.AwaitBatch(
        batchId,
        x =>
        {
            x.Enqueue<AthleteTargetSuggestionsDatabaseCleanupJob>(y => y.Perform());
        }, null, BatchContinuationOptions.OnAnyFinishedState);
}
```

If a large number of items get queued up, we never see the `AwaitBatch` continuation get queued. When that happens, the parent job shown here never completes; it just keeps running. However, we do see the jobs in the batch begin processing, so we believe the batch itself is being queued correctly.

Often we will also see “Started” show up multiple times in the UI for a batch job, but with no details about what happened. Why would it start over and over again?


#5

PS @odinserj, we are using Hangfire.Pro.Redis.


#6

Hi Mark,

Since background processing in Hangfire follows at-least-once semantics, any of your jobs can be performed two or more times due to machine restarts, process termination or shutdown, connection errors, or the invisibility timeout. The invisibility timeout can also be triggered by unsynchronized clocks across your machines. Some of these retries are shown in the dashboard; others appear only in the logs, because a retry may happen early, before the background job is even fetched.

So you should make all of your background methods idempotent, so that they can safely run multiple times: check whether the work has already been performed and, if so, return early (with the previous result). You can achieve this by storing the results in a database.
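A minimal sketch of the check-and-return pattern described above, in plain C# with no Hangfire dependency. All names here (`IResultStore`, `InMemoryResultStore`, `IdempotentEmailJob`) are hypothetical, and the in-memory dictionary stands in for the database table where results would actually be stored. The sketch only guards against sequential re-runs; closing the gap between the check and the write under concurrent execution would need something like a unique key or a transaction in the real database.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical result store; in practice this would be a database table keyed by a business id.
public interface IResultStore
{
    bool TryGet(string key, out string result);
    bool TryAdd(string key, string result);
}

public sealed class InMemoryResultStore : IResultStore
{
    private readonly ConcurrentDictionary<string, string> _results =
        new ConcurrentDictionary<string, string>();

    public bool TryGet(string key, out string result) => _results.TryGetValue(key, out result);

    public bool TryAdd(string key, string result) => _results.TryAdd(key, result);
}

// Hypothetical job whose Perform method may be invoked more than once for the same input.
public sealed class IdempotentEmailJob
{
    private readonly IResultStore _store;

    // Counts how many times the real side effect ran (for demonstration only).
    public int SideEffectCount { get; private set; }

    public IdempotentEmailJob(IResultStore store)
    {
        _store = store;
    }

    public string Perform(int campaignId, string recipient)
    {
        // The key identifies the unit of work, not the Hangfire job id,
        // so a re-run of the same work maps to the same key.
        var key = $"send-email:{campaignId}:{recipient}";

        if (_store.TryGet(key, out var previous))
        {
            // Already performed on an earlier attempt: return the stored result.
            return previous;
        }

        SideEffectCount++; // stands in for actually sending the email
        var result = $"sent to {recipient}";
        _store.TryAdd(key, result);
        return result;
    }
}
```

Keying on the campaign and recipient rather than on the job id is the design choice that matters: a re-run of the same work lands on the same key.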

What we should investigate here is why the batch continuation isn't being fired. Do you see the continuation batch in the dashboard? And what are your invisibility timeout and the approximate execution time of the `GenerateTargetSuggestionsForAllQualifiedAthletes` job? It's worth increasing the `syncTimeout` setting in your connection string to several minutes, since your batches are quite heavy.
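As an illustration of the `syncTimeout` suggestion, here is a sketch of a StackExchange.Redis-style connection string passed to Hangfire.Pro.Redis, assuming the `UseRedisStorage(string)` overload and that your package version accepts StackExchange.Redis configuration options. `syncTimeout` is expressed in milliseconds, so `300000` is five minutes; the host name, access key, and exact value are placeholders to tune against your own batch sizes.

```csharp
using Hangfire;
using Hangfire.Pro.Redis;

// Placeholder host and access key; syncTimeout is in milliseconds (300000 ms = 5 minutes).
const string redisConnection =
    "example.redis.cache.windows.net:6380,password=<access-key>,ssl=True,abortConnect=False,syncTimeout=300000";

GlobalConfiguration.Configuration.UseRedisStorage(redisConnection);
```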


#7

@odinserj Re: idempotency, that is something we will work on. Thank you.

We got it to work, but we don't understand why Hangfire handles the solution below better.

Basically, instead of populating the batch with all 140k jobs up front, we start the batch with 10,000 jobs, then loop through the rest, attaching them to the batch in segments of 10,000, and finally add the await continuation. And this works!

For some reason Hangfire didn’t like us pushing in all 140k jobs to start with.

FYI, we are using an Azure C3 Redis instance, which seems to have plenty of memory.

Any thoughts as to why this works? It's not intuitive to have to do this, and it doesn't make sense to us, but it does work.

```csharp
var generateSuggestionsForProfileIds = new List<int>();
this.athleteTargetSuggestionGeneratorQueryStore.GetListOfProfileIds(generateSuggestionsForProfileIds, maxItems);

GlobalExceptionHandler.LogWarning($"Recommendations:Fetched {generateSuggestionsForProfileIds.Count} athletes");

this.athleteTargetSuggestionGeneratorRepository.EmptyTempTablesForAthleteTargetSuggestionsByCategory();

GlobalExceptionHandler.LogWarning("Recommendations:Emptied tables");

const int BatchSize = 10000;
var athleteCount = generateSuggestionsForProfileIds.Count;
int estimatedBatchCount = (int)Math.Ceiling(athleteCount / (decimal)BatchSize);

GlobalExceptionHandler.LogWarning("Recommendations:Starting batch queueing");

// Start the batch with the first segment only.
var batchId = BatchJob.StartNew(
    batch =>
    {
        foreach (var profileId in generateSuggestionsForProfileIds.Take(BatchSize))
        {
            batch.Enqueue<GenerateTargetSuggestionForSingleAthleteJob>(y => y.Perform(null, profileId, true));
        }
    });

// Attach the remaining segments to the same batch, BatchSize jobs at a time.
for (var i = 1; i <= estimatedBatchCount; i++)
{
    var startIndex = BatchSize * i;

    GlobalExceptionHandler.LogWarning($"Recommendations:Starting loop at {startIndex}");

    // ">=" (rather than ">") avoids attaching an empty segment
    // when the athlete count is an exact multiple of BatchSize.
    if (startIndex >= generateSuggestionsForProfileIds.Count)
    {
        break;
    }

    BatchJob.Attach(batchId, batch =>
    {
        foreach (var profileId in generateSuggestionsForProfileIds.Skip(startIndex).Take(BatchSize))
        {
            batch.Enqueue<GenerateTargetSuggestionForSingleAthleteJob>(y => y.Perform(null, profileId, true));
        }
    });
}

GlobalExceptionHandler.LogWarning("Recommendations:All athletes have been queued");

// Queue the cleanup job once the whole batch has completed.
BatchJob.AwaitBatch(
    batchId,
    batch =>
    {
        batch.Enqueue<AthleteTargetSuggestionsDatabaseCleanupJob>(y => y.Perform());
    });

GlobalExceptionHandler.LogWarning("Recommendations:GenerateTargetSuggestionsForAllQualifiedAthletes done");
```
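The segment arithmetic in the loop above can be isolated into a small helper and checked on its own. The sketch below is plain C# with no Hangfire dependency, and the `Segmenter` name is hypothetical. The first segment would go to `BatchJob.StartNew` and each later segment to `BatchJob.Attach`, as in the code above. It makes no claim about why smaller segments help Hangfire; it only produces segment boundaries that never yield an empty segment.

```csharp
using System;
using System.Collections.Generic;

public static class Segmenter
{
    // Splits items into consecutive segments of at most segmentSize elements.
    // No empty segment is produced, even when items.Count is an exact multiple of segmentSize.
    public static List<List<T>> Split<T>(IReadOnlyList<T> items, int segmentSize)
    {
        if (segmentSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(segmentSize));
        }

        var segments = new List<List<T>>();
        for (var start = 0; start < items.Count; start += segmentSize)
        {
            // The last segment may be shorter than segmentSize.
            var count = Math.Min(segmentSize, items.Count - start);
            var segment = new List<T>(count);
            for (var i = start; i < start + count; i++)
            {
                segment.Add(items[i]);
            }
            segments.Add(segment);
        }
        return segments;
    }
}
```

With 140,000 ids and a segment size of 10,000 this yields exactly 14 full segments; with 140,001 ids it yields 15, the last holding a single id.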