Job is rescheduled to the default queue even though it was assigned to another queue

Tags: queues, recurring

#1

Hi
In version 1.3.3.0 I am experiencing a strange behaviour.
I have two separate physical services, and each of them listens to its own queue.
Everything works fine when there is no exception. However, when an exception happens, the system reschedules the job to the default queue, which neither service is listening to.

I would be thankful if you could let me know whether this is a bug in that version or whether it can be handled somehow.
Because the queue name varies, I have tried JobFilterAttribute and other interceptors, but the job is still assigned to the default queue.


#2

I have also tested the scenario on the latest version, and the same problem occurs.


#3

Do you happen to use BackgroundJobClient.Create() rather than BackgroundJobClient.Enqueue()? I had the same problem, and I found that Hangfire respects the original queue name during a retry ONLY when you use Enqueue.


#4

Unfortunately, once a job fails it is requeued to the default queue, even though I don't have such a queue.
This is the code I am using, and it still causes the same problem:
var client = new BackgroundJobClient();
var state = new EnqueuedState(queueName);
var result = client.Create(() => JobProcessor.Execute(value), state);


#5

Do this instead.

var result = client.Enqueue(() => JobProcessor.Execute(value));

If you are concerned that you cannot pass the queue name this way, you can set it with an attribute on JobProcessor.Execute:

[Queue("specificqueue")]
public static void Execute(object value){
    ....
}
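To show how the attribute and the listening service fit together, here is a minimal sketch rather than a definitive setup. It assumes the Hangfire NuGet package is referenced and that job storage has already been configured elsewhere; the queue name "specificqueue", the string parameter, and the QueueExample class are illustrative assumptions. As far as I understand, QueueAttribute is a state election filter, so it should set the queue each time the job is about to enter the Enqueued state, including after a retry.

```csharp
using System;
using Hangfire;

public static class JobProcessor
{
    // The attribute pins this method to one queue whenever the job is enqueued.
    [Queue("specificqueue")]
    public static void Execute(string value)
    {
        Console.WriteLine($"Processing {value}");
    }
}

public static class QueueExample
{
    // Assumption: job storage was configured before this method is called,
    // e.g. with GlobalConfiguration.Configuration.UseSqlServerStorage(...).
    public static void StartWorkerAndEnqueue()
    {
        var options = new BackgroundJobServerOptions
        {
            // This server only picks up jobs from "specificqueue".
            Queues = new[] { "specificqueue" }
        };

        using (var server = new BackgroundJobServer(options))
        {
            // No queue name is passed here; the attribute on Execute supplies it.
            var jobId = BackgroundJob.Enqueue(() => JobProcessor.Execute("42"));
            Console.WriteLine($"Enqueued job {jobId}");

            Console.WriteLine("Press Enter to stop the server.");
            Console.ReadLine();
        }
    }
}
```

The trade-off is that the queue name is fixed at compile time, so this does not help when the queue name varies at run time.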

#6

I don’t like having to decorate my methods with attributes, so I have looked at using job parameters instead.

Using a job filter, detect when the job is being enqueued and store the queue name in a job parameter.
When a retry occurs the job is enqueued again, so we can restore the queue name at that point.

In your filter, implement Hangfire.States.IElectStateFilter:

public void OnStateElection(Hangfire.States.ElectStateContext context)
{
    // Only act when the job is about to be enqueued.
    var enqueuedState = context.CandidateState as Hangfire.States.EnqueuedState;
    if (enqueuedState != null)
    {
        var qn = context.GetJobParameter<string>("QueueName");
        if (!String.IsNullOrWhiteSpace(qn))
        {
            // A queue name was stored earlier (e.g. before a retry): restore it.
            enqueuedState.Queue = qn;
        }
        else
        {
            // First time the job is enqueued: remember its queue name.
            context.SetJobParameter("QueueName", enqueuedState.Queue);
        }
    }
}

I am still learning Hangfire, so I would be interested to know of any drawbacks to this approach.
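For anyone who wants to try the idea above, here is a minimal sketch of the method wrapped in a complete filter class and registered globally. The class names RememberQueueAttribute and HangfireFilterSetup are hypothetical, and the parameter name "QueueName" is taken from the snippet above. It assumes the Hangfire NuGet package is referenced.

```csharp
using Hangfire;
using Hangfire.Common;
using Hangfire.States;

// Hypothetical filter name; the logic mirrors the OnStateElection method above.
public class RememberQueueAttribute : JobFilterAttribute, IElectStateFilter
{
    private const string QueueParameter = "QueueName";

    public void OnStateElection(ElectStateContext context)
    {
        // Ignore every transition except the one into the Enqueued state.
        if (!(context.CandidateState is EnqueuedState enqueuedState))
        {
            return;
        }

        var storedQueue = context.GetJobParameter<string>(QueueParameter);
        if (!string.IsNullOrWhiteSpace(storedQueue))
        {
            // The job was enqueued before: put it back on the remembered queue.
            enqueuedState.Queue = storedQueue;
        }
        else
        {
            // First enqueue: remember the queue for later retries.
            context.SetJobParameter(QueueParameter, enqueuedState.Queue);
        }
    }
}

public static class HangfireFilterSetup
{
    // Call once at startup. Assumption: this runs in every process that
    // creates or processes jobs, since state election can happen in either.
    public static void Register()
    {
        GlobalJobFilters.Filters.Add(new RememberQueueAttribute());
    }
}
```

One consequence that follows from the logic: once the parameter is stored it always wins, so a later attempt to move the job to a different queue would be overridden by the remembered one.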


#7

I fixed the problem with a few code changes.

Please refer to my branch (requeue-to-initial-queue):


#8

Is there a pull request to get this back into the main repo? I’m having the same problem, and this is definitely a bug in my eyes.

Thanks


#9

Never mind, there is one: https://github.com/HangfireIO/Hangfire/pull/502

Thanks


#10

Similar behavior seems to occur for us whenever .ContinueWith(…) throws an exception. Has anyone else experienced this?


#11

Has there been any official update on this issue in the main software?

We have had so much trouble setting up queues, only for jobs to go back to the default queue when they fail and retry.

I have seen a number of discussions online about this, but is there an official fix? I see this issue is almost 5 years old now.


#12

Though this thread is pretty old, I am adding the solution I found for Hangfire with .NET Core, in case it helps somebody.

If you are using ASP.NET Core 3.1, add the filter below for Hangfire queues:

public class PreserveOriginalQueueAttribute : JobFilterAttribute, IApplyStateFilter, IElectStateFilter
{
    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        var id = context.BackgroundJob.Id;
        var state = context.Connection.GetStateData(id);
        var initialQueue = context.Connection.GetJobParameter(id, JobParam.InitialQueue);
        state.Data.TryGetValue(Constants.Queue, out string name);          
        if (string.IsNullOrEmpty(initialQueue))
        {
            context.Connection.SetJobParameter(id, JobParam.InitialQueue, name);           
        }
        if(state.Name.Equals(States.Scheduled) && state.Reason.StartsWith(Reason.Retry))
        {
            //FailedState
            if(context.NewState is EnqueuedState newState)
            {
                newState.Queue = initialQueue;
                context.Connection.SetJobParameter(context.BackgroundJob.Id, JobParam.QueueReason, Reason.Retry);
            }
        }           
        if(context.NewState is ProcessingState processingState)
        {
            //Remove QueueReason Param for subsequent manual triggers.

            context.Connection.SetJobParameter(context.BackgroundJob.Id, JobParam.QueueReason, null);
        }
    }
    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
    }
  
    /// For changing state from default to original queue for manual trigger
  
    public void OnStateElection(ElectStateContext context)
    {  
        var initialQueue = context.Connection.GetJobParameter(context.BackgroundJob.Id, JobParam.InitialQueue);
        var queueReason = context.Connection.GetJobParameter(context.BackgroundJob.Id, JobParam.QueueReason);
        if (context.CurrentState.Equals(States.Enqueued) &&  !string.IsNullOrEmpty(initialQueue) && string.IsNullOrEmpty(queueReason))
        {
            context.Connection.SetJobParameter(context.BackgroundJob.Id,JobParam.QueueReason, Reason.Trigger);
            context.CandidateState = new EnqueuedState(initialQueue);
        }
    }
}
internal static class States
{
    public static readonly string Enqueued = "Enqueued";
    public static readonly string Scheduled = "Scheduled";
}
internal static class JobParam
{
    public static readonly string InitialQueue = "InitialQueue";
    public static readonly string QueueReason = "QueueReason";
}
internal static class Reason
{
    public static readonly string Trigger = "Trigger";
    public static readonly string Retry = "Retry";
}

You can add this filter in Startup.cs:
GlobalConfiguration.Configuration.UseActivator(new CustomHangfireActivator(serviceProvider))
.UseFilter(new PreserveOriginalQueueAttribute());

This switches failed jobs back to their original queues. When a job is triggered from the dashboard, it is first enqueued to the default queue and then immediately moved to its original queue.


#13

I don't see a definition of Constants.Queue. I assume it is a string constant with the value "Queue"?

I also assume that the built-in activator, which uses the ASP.NET Core DI system, can be used as well, meaning that

GlobalJobFilters.Filters.Add(new PreserveOriginalQueueAttribute());

should also work?
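One way to check the first assumption is the small sketch below. The Constants class here is a guess at the missing definition, not the original author's code; it relies on EnqueuedState.SerializeData() exposing the queue name in the state data, and the queue name "critical" and the ConstantsCheck class are illustrative only. It assumes the Hangfire NuGet package is referenced.

```csharp
using System;
using Hangfire.States;

// Assumed definition of the class missing from the filter above.
internal static class Constants
{
    // Assumption: the key under which the Enqueued state stores its queue name.
    public const string Queue = "Queue";
}

public static class ConstantsCheck
{
    public static void Main()
    {
        // Serialize an Enqueued state and look the queue name up by the assumed key.
        var data = new EnqueuedState("critical").SerializeData();

        Console.WriteLine(
            data.TryGetValue(Constants.Queue, out var queue)
                ? $"Queue stored under \"{Constants.Queue}\": {queue}"
                : $"No \"{Constants.Queue}\" key found in the state data");
    }
}
```

On the second point, as far as I can tell UseFilter simply adds the filter to GlobalJobFilters.Filters, so the two registrations should be equivalent, but that is worth confirming against the Hangfire version in use.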