Hangfire ContinueWith with multiple sources

I’ve already posted to StackOverflow: Ping Pong Jobs

Basically, if I could, from inside the job, put the job on hold until an outside stimulus told Hangfire to continue it, I would be golden. I have not yet cracked how to accomplish this, though.

Modify your job to take a PerformContext in addition to whatever other parameters it takes. Set it to null when queueing the job; Hangfire will fill it in for you within the job. Assuming the parameter is called cxt, the following will give you the job ID. Using it, you should be able to reschedule the job to a later time and quit. Your outside stimulus would then just need to reschedule it again to a more immediate time.
cxt.BackgroundJob.Id
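A minimal sketch of that shape (the job class, payload, and the external call are placeholders; only the PerformContext injection and the ScheduledState reschedule are real Hangfire API):

```csharp
using System;
using Hangfire;
using Hangfire.Server;
using Hangfire.States;

public class MyJobs  // hypothetical class holding the job method
{
	// Hangfire injects PerformContext at execution time; pass null when enqueueing.
	public void LongRunningJob(string payload, PerformContext cxt)
	{
		var jobId = cxt.BackgroundJob.Id;

		// ...kick off the external work here, handing it jobId so the
		// outside stimulus can reschedule this job later...

		// Park this job by pushing it well into the future.
		var client = new BackgroundJobClient();
		client.ChangeState(jobId, new ScheduledState(TimeSpan.FromHours(6)));
	}
}

// Queueing it - note the null for the context parameter:
// BackgroundJob.Enqueue<MyJobs>(x => x.LongRunningJob("payload", null));
```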

Hope that helps.

On the surface, that makes sense! Let me play with it today and tomorrow and I’ll let you know what I find.

Questions:

  1. My biggest concern is maintaining the (non-)concurrency of the individual job.
  2. Can the outside stimulus pass in updated parameters?

As long as I can get by those 2 hurdles, I can make this work.

Not having as much luck as I would have liked. The closest I can get is a ContinueWith() option. Requeue only takes a job ID, so I cannot pass in the result of the outside work. I could store the result in a DB, but there is nothing in Requeue to tell the job to look in the DB; either that, or I always look in the DB. <meh/>

What I am now (trying) to do is this flow:

  1. Job runs strategy “Initialize” - gets the job ID, does other (minor) setup, reads from the DB to populate data classes, etc.
  2. Job runs strategy “Processing” - calls the outside web service, then sets delayed execution for 6 hours in the future (have not yet figured this out 100%)
  3. External job finishes, sends a JSON packet to the job endpoint handler
  4. If a Hangfire ID is present, add a ContinueWith() to the original job, then requeue the original job (which just completes)
  5. “New” job runs strategy “ExtComplete” - sends email, finalizes the DB, etc.
  6. “New” job runs strategy “Complete” - ends the job; the original job is ended as well
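Steps 3-4 might look roughly like this as an endpoint handler (the controller, ExternalResult, and JobRunner.RunExtComplete names are all assumed; ContinueJobWith and EnqueuedState are Hangfire API):

```csharp
using Hangfire;
using Hangfire.States;
using Microsoft.AspNetCore.Mvc;

public class JobCallbackController : ControllerBase  // hypothetical endpoint
{
	[HttpPost("job-callback")]
	public IActionResult JobCallback([FromBody] ExternalResult result)
	{
		if (!string.IsNullOrEmpty(result.HangfireJobId))
		{
			// Chain the post-processing onto the original job...
			BackgroundJob.ContinueJobWith(result.HangfireJobId,
				() => JobRunner.RunExtComplete(result.HangfireJobId));

			// ...then requeue the original job so it can complete.
			new BackgroundJobClient().ChangeState(
				result.HangfireJobId, new EnqueuedState());
		}
		return Ok();
	}
}
```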

Each Job can be in any one of these statuses (no jobs use all statuses!):

public enum JobStatus
{
	Initialize,
	Processing,
	Handoff,
	Complete,
	Done,       // terminal status the executor loop checks for
	Warning,
	Error,
	ExtWarning,
	ExtProcessing,
	ExtComplete,
	ExtError,
}

Each status has a strategy handler class backing it up. In essence, the job will keep calling a strategy handler until it is “Done”:

public string Execute(IServerFilter jobContext, string payload, IJobCancellationToken cancellationToken)
{
	while (Payload.JobInfo.JobStatus != JobStatus.Done)
	{
		cancellationToken?.ThrowIfCancellationRequested();
		var jobStrategy = new JobExecutorStrategy(Executors);
		Payload = jobStrategy.Execute(Payload);
	}
	return PayloadAsString;
}
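For context, one possible shape for JobExecutorStrategy - the status-to-handler dispatch is inferred from the snippets here, not taken from real code:

```csharp
using System.Collections.Generic;

// Assumed dispatcher: looks up the handler registered for the payload's
// current status and runs it; handlers advance JobStatus themselves.
public class JobExecutorStrategy
{
	private readonly IDictionary<JobStatus, IJobExecutor<PayloadModel>> _executors;

	public JobExecutorStrategy(IDictionary<JobStatus, IJobExecutor<PayloadModel>> executors)
	{
		_executors = executors;
	}

	public PayloadModel Execute(PayloadModel payload)
	{
		if (_executors.TryGetValue(payload.JobInfo.JobStatus, out var executor))
			executor.Handle();  // the handler mutates payload.JobInfo.JobStatus
		else
			payload.JobInfo.JobStatus = JobStatus.Done;  // no handler: treat as finished
		return payload;
	}
}
```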

Note: I’ve been looking through the Hangfire code base. I somewhat understand what the various job state classes are kind of doing. It is not plain (to me) how I could add a few additions to the server code:

  • New State Classes (OnHold, Resume)
  • New State Engine handlers for new state classes
  • BackgroundJob.Resume(jobId, [params passed in], [optional] existingState = [default] any)
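For the wish list above: custom states in Hangfire implement IState (from Hangfire.States), so a bare-bones OnHold state could look like the sketch below. Wiring it into the state machine (an IStateHandler registered at startup) is the part this doesn't show:

```csharp
using System;
using System.Collections.Generic;
using Hangfire.States;

// Sketch of a custom state; Hangfire's IState requires exactly these members.
public class OnHoldState : IState
{
	public string Name => "OnHold";
	public string Reason => "Waiting on an outside stimulus";
	public bool IsFinal => false;               // the job is expected to continue
	public bool IgnoreJobLoadException => false;

	// Arbitrary key/value data stored alongside the state in job storage.
	public Dictionary<string, string> SerializeData() =>
		new Dictionary<string, string>
		{
			{ "HeldAt", DateTime.UtcNow.ToString("o") }
		};
}
```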

OK. I have figured out a hack to get this to work.

As I stated earlier, I am using a strategy pattern to run different parts of each job. I have a JobStatus called Handoff and now do this:

public class Processing : BaseJobExecutor<PayloadModel>, IJobExecutor<PayloadModel>
{
	public Processing(JobPingPong job) : base(job, JobStatus.Processing) {}

	public void Handle()
	{
		JobInfo.JobStatus = JobStatus.ExtProcessing;
		JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
		Payload.PostToQueueText(@"http://localhost:8080/aws/clone");

		// Pause the current job (this is the parent job) so the outside web service has a chance to complete...
		var enqueuedIn = new TimeSpan(0, 6, 0, 0);  // 6 hours out...
		JobPutOnHold(JobInfo.HangfireJobId, enqueuedIn);

		// The next status to be executed upon hydration...
		JobInfo.JobStatus = JobStatus.Complete;
		Job.CachePut();

		// Signal the job executor that this job is "done" due to an outside process needing to run...
		JobInfo.JobStatus = JobStatus.Handoff;
	}
}
public void JobPutOnHold(string jobId, TimeSpan enqueuedIn)
{
	var jobClient = new BackgroundJobClient();
	jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
}

Now, in the strategy executor I can do this:

public string Execute(IServerFilter jobContext, IJobCancellationToken cancellationToken)
{
	while (Payload.JobInfo.JobStatus != JobStatus.Done)
	{
		cancellationToken?.ThrowIfCancellationRequested();
		var jobStrategy = new JobExecutorStrategy<TPayload>(Executors);
		Payload = jobStrategy.Execute(Payload);

		if (Payload.JobInfo.JobStatus == JobStatus.Handoff)
			break;
	}
	return PayloadAsString;
}

The second part of the job fires off the same as the first part, but comes in from the outside service with an ExtComplete status, which allows the job to execute the post-processing based on the results from the outside world (stored in the DB). Like this:

public class ExtComplete : BaseJobExecutor<PayloadModel>, IJobExecutor<PayloadModel>
{
	public ExtComplete(JobPingPong job) : base(job, JobStatus.ExtComplete) { }

	public void Handle()
	{
		// do post processing here...
		Payload.Tokens = null;
		JobInfo.JobStatus = JobStatus.Complete;
		if (JobInfo.HangfireJobId != JobContext.JobId || JobInfo.HangfireParentJobId == JobInfo.HangfireJobId)
		{
			JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
			JobInfo.HangfireJobId = JobContext.JobId;
		}

		// Enqueue the previous (parent) job so it can complete...
		JobExecuteNow(JobInfo.HangfireParentJobId); //, JobInfo.JobQueueName);
	}
}
public void JobExecuteNow(string jobId)
{
	var enqueuedIn = new TimeSpan(0, 0, 0, 15);
	var jobClient = new BackgroundJobClient();
	jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
}

Eventually, the timing will be config driven, but for now I am setting it to have the 1st job pick up execution in 15 seconds.
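If the 15-second delay is only there to get the parent job running “now”, EnqueuedState (also in Hangfire.States) should skip the scheduler polling entirely - a variant under that assumption:

```csharp
using Hangfire;
using Hangfire.States;

public void JobExecuteNow(string jobId)
{
	// EnqueuedState places the job directly on a queue instead of waiting
	// for the scheduler to pick it up from ScheduledState.
	var jobClient = new BackgroundJobClient();
	jobClient.ChangeState(jobId, new EnqueuedState());  // or new EnqueuedState("my-queue")
}
```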

The only challenge I faced with this approach is that the job payload that comes in is the original payload, before any processing happened. That is why you see the “caching” up above. When the job restarts, I check to see if a cache exists for that Hangfire JobId; if it does, I load up the last known payload from cache, then allow the executor to go on its merry way.
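CachePut and the restart check are custom helpers, but the round trip is roughly this (an in-memory dictionary as a stand-in, where the real thing would be the DB or a distributed cache):

```csharp
using System.Collections.Concurrent;

// Minimal stand-in for the payload cache, keyed by Hangfire JobId.
public static class JobPayloadCache
{
	private static readonly ConcurrentDictionary<string, string> Cache =
		new ConcurrentDictionary<string, string>();

	public static void Put(string jobId, string payloadJson) =>
		Cache[jobId] = payloadJson;

	public static string GetOrDefault(string jobId) =>
		Cache.TryGetValue(jobId, out var payload) ? payload : null;
}

// On restart: prefer the cached payload over the stale one Hangfire
// serialized when the job was first enqueued.
// var json = JobPayloadCache.GetOrDefault(jobId) ?? originalPayloadJson;
```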

Works very well so far.

NOTE: I am still trying to learn how to alter/inject the Chain of Command and State objects in Hangfire to make this more internal to Hangfire. We have one job that makes a dozen or more outside calls. Currently, it takes about 12 hours to run.

@odinserj - do you have any sample code on how to add new States, alter the pipeline, and then inject it correctly? It would also be nice to be able to alter the call within Hangfire to reflect changes to the params originally passed in. TIA!