Different Queues having different Worker counts


#1

Morning All.

I have a project i am looking at using Hang Fire for, which has some “interesting” stipulations. There are 3 parts of the process and they have different limits on the number of threads that can be kicked off on each. The first part generates a document, which i seem to be able to kick off 8 or so threads for. the next part does document signing, which i can reliably kick off about 10 threads on. finally, we send them by email, which has no defined limit, but should have a random wait time between sending and should probably be less than 120 per min, give or take.

I have the second part does, the document signing, and Hang Fire has the worker roles set to 10, but if i wanted to create a second and third queue, each with different worker counts, do i need to create new servers? This will be running as a Windows Service, by the way.

Also, on a related note, is it possible to “queue jump”? If we get a high priority request for a signed document, how would we go about getting that processed ASAP, and not go to the back of the queue?

Thanks in advance.

–Tiernan


#2

Hello, @tiernano, and sorry for the delay.

Yes, stipulations are very interesting. As a disclaimer, I should inform you that you can use multiple queues to perform the prioritization:

app.UseHangfire(cfg => cfg.UseServer("high", "normal", "low"));

Currently you are able to specify the exact amount of worker by using multiple Hangfire servers (you should also give them unique names) in one process:

var documentOptions = new BackgroundJobServerOptions
{
    ServerName = String.Format("{0}:documents", Environment.MachineName),
    WorkerCount = 8,
    Queues = new [] { "documents" } 
};

cfg.UseServer(options);

var signingOptions = new BackgroundJobServerOptions
{
    ServerName = /* {0}:signing */,
    WorkerCount = 10,
    Queues = new [] { "signing" }
};

cfg.UseServer(signingOptions);

// Other servers

Email throttling is an interesting issue, but I think it can be done using waits in the method itself. This is a naive implementation:

public void SendEmail()
{
    if (Database.GetSentEmailThisMinuteCount > MaxEmailsPerMinute)
    {
        Thread.Sleep(/* some amount of time */);
    }
}

Queue jump is impossible now. You can split high priority requests to multiple queues: document-high, document-normal and pass them to the server.


#3

So when creating a job to the queue “signing” as described here the server that handles this queue will automatically be called?


#4

Yep, if you put background job to the signing queue, it will be handled by the signing server when using this configuration.


#5

ok, I get “Cannot create an instance of an inteface” when trying to do that… Do I need to set any properties of the EnqueuedState object?
Here is the code i use:

EnqueuedState queueState = new Hangfire.States.EnqueuedState(param1.ToLower());    
_jobClient.Create<IFoo>(x => x.Bar(param1, param1, param3), queueState);

I am using autofac and it has registerred the IFoo interface for sure since I used _jobClient.Enqueue successfully just before changing to _jobClient.Create…

GlobalConfiguration.Configuration
                .UseSqlServerStorage(connstring)
                .UseAutofacActivator(container);
            JobActivator.Current = new AutofacJobActivator(container);

#6

I solved the issue using a jobfilter as described in the following post. This way retries are put to the correct queue also. One queue for the whole farm, and one queue by server


#7

ok, I’m running into problems here… how can I use the autofac container already registerred? Please see code below. This code fails because It needs a default constructor…

…I tried this and it works, question is if this is the best way…

(ICalculationEngineWorkQueueProcessor)System.Web.Http.GlobalConfiguration.Configuration.DependencyResolver.GetService(typeof(ICalculationEngineWorkQueueProcessor));

class to handle jobs

public class HangfireJobWrapper 
    {
        private readonly ICalculationEngineWorkQueueProcessor _calculationEngineWorkQueueProcessor;
        
        public HangfireJobWrapper(ICalculationEngineWorkQueueProcessor calculationEngineWorkQueueProcessor)
        {
            _calculationEngineWorkQueueProcessor = calculationEngineWorkQueueProcessor;
        }

        [UseQueueFromParameter(0)] 
        public void ProcessUserBudgetLineJob(string queuename, string userId, int budgetId, long executionId, CalcLineType calcLineType)
        {
            _calculationEngineWorkQueueProcessor.ProcessQueue(userId,budgetId,executionId,calcLineType);
        }

        [UseQueueFromParameter(0)]
        public void ProcessBudgetLineJob(string queueName, EntityType entity, int budgetId, long executionId, CalcLineType calcLineType)
        {
            _calculationEngineWorkQueueProcessor.ProcessQueue(entity, budgetId, executionId, calcLineType);
        }

        [UseQueueFromParameter(0)]
        public void ProcessBulkItem(string queueName, int workQueueItem)
        {
            _calculationEngineWorkQueueProcessor.ProcessBulkWorkQueuItem(workQueueItem);
        }
    }

and this is what I do from my web api controller

_jobClient.Enqueue<HangfireJobWrapper>(x => x.ProcessUserBudgetLineJob(
                        userId, 
                        userId,
                        budgetId,
                        executionId,
                        CalcLineType.Regular));

#8

I have created an extension to try and simplify this sort of problem and allow you to have many different worker counts for different jobs. Once installed you just add an attribute to the job:

public class ExampleJob
{
    [MaximumConcurrentExecutions(3)]
    public void MethodName(int arg1)
    {
       ....
    }
}

See https://github.com/alastairtree/Hangfire.MaximumConcurrentExecutions for more info.