Jobs stuck in Enqueued state; most never run

Tags: #<Tag:0x00007f186089bd48>
`
[assembly: OwinStartup(typeof(Poc.QueueApp.Web.Startup))]
namespace Poc.QueueApp.Web
{
	/// <summary>
	/// OWIN startup class for the web application; it is the type named by the
	/// assembly-level <c>OwinStartup</c> attribute. Declared <c>partial</c>:
	/// <c>ConfigureAuth</c> is not defined in this part of the class.
	/// </summary>
	public partial class Startup
	{
		/// <summary>
		/// Runs the application's registrations in a fixed order, then wires
		/// authentication and Hangfire into the OWIN pipeline.
		/// </summary>
		/// <param name="app">
		/// The OWIN application builder; passed on to <c>ConfigureAuth</c> and
		/// <c>HangfireConfig.RegisterHangfire</c>.
		/// </param>
		public void Configuration(IAppBuilder app)
		{
			// Framework, MVC / Web API, bundling and Unity registrations.
			// NOTE(review): GlobalConfiguration here takes WebApiConfig.Register, while
			// HangfireConfig uses the same simple name with UseSqlServerStorage — these
			// look like two different types; confirm the using directives disambiguate them.
			Framework.Initialize();
			AreaRegistration.RegisterAllAreas();
			GlobalConfiguration.Configure(WebApiConfig.Register);
			FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
			RouteConfig.RegisterRoutes(RouteTable.Routes);
			BundleConfig.RegisterBundles(BundleTable.Bundles);
			UnityConfig.RegisterUnity();


			// Pipeline set-up. Hangfire is registered last, after Unity has been registered above.
			ConfigureAuth(app);
			HangfireConfig.RegisterHangfire(app);
		}
	}
}

namespace Poc.QueueApp.Web.App_Start
{
	/// <summary>
	/// Configures Hangfire for the web application: SQL Server storage, the
	/// dashboard, and the background job server.
	/// </summary>
	public class HangfireConfig
	{
		// Queue that Hangfire places a job in when no other queue was selected for it.
		private const string DefaultQueueName = "default";

		/// <summary>
		/// Points Hangfire at the database in <c>Globals.DatabaseHangfire</c>, enables
		/// the dashboard and starts a background job server.
		/// </summary>
		/// <param name="app">The OWIN application builder the dashboard and server are attached to.</param>
		public static void RegisterHangfire(IAppBuilder app)
		{
			GlobalConfiguration.Configuration.UseSqlServerStorage(Globals.DatabaseHangfire);
			app.UseHangfireDashboard();

			// A server only fetches jobs from the queues it is given. Without "default"
			// in the list, any job that ends up in the default queue stays in the
			// Enqueued state forever because no worker ever polls that queue.
			var options = new BackgroundJobServerOptions { Queues = WithDefaultQueue(Globals.QueueNames) };
			app.UseHangfireServer(options);
		}

		/// <summary>
		/// Returns <paramref name="configured"/> with the default queue appended,
		/// unless it is already present.
		/// </summary>
		/// <param name="configured">The application's own queue names.</param>
		/// <returns>The queue names the server should listen on.</returns>
		private static string[] WithDefaultQueue(string[] configured)
		{
			if (System.Array.IndexOf(configured, DefaultQueueName) >= 0)
			{
				return configured;
			}

			// Appended last so the application's own queues keep their positions.
			var queues = new string[configured.Length + 1];
			configured.CopyTo(queues, 0);
			queues[configured.Length] = DefaultQueueName;
			return queues;
		}
	}
}

namespace Poc.QueueApp.Common
{
	/// <summary>
	/// Application-wide constants and queue-name lookups.
	/// </summary>
	public static class Globals
	{
		/// <summary>Base address constant.</summary>
		public const string BaseAddress = "http://localhost:9900/";

		/// <summary>SQL Server connection string for the Hangfire database.</summary>
		public const string DatabaseHangfire = "Server=kbarrow1-vm;Database=Hangfire;Integrated Security=true";

		/// <summary>
		/// Queue names keyed by integer; <see cref="QueueName(QueueName)"/> indexes
		/// this table with the enum value cast to <c>int</c>.
		/// </summary>
		public static readonly Dictionary<int, string> Queues = new Dictionary<int, string>
		{
			[0] = "critical",
			[1] = "job_queue",
			[2] = "adhoc",
		};

		/// <summary>
		/// The values of <see cref="Queues"/> as a new array; rebuilt on every access.
		/// </summary>
		public static string[] QueueNames
		{
			get { return (from entry in Queues select entry.Value).ToArray(); }
		}

		/// <summary>
		/// Looks up the queue name registered for <paramref name="name"/>.
		/// </summary>
		/// <param name="name">The queue to look up; its integer value is the dictionary key.</param>
		/// <returns>The queue name stored in <see cref="Queues"/>.</returns>
		public static string QueueName(QueueName name) => Queues[(int) name];
	}
}

namespace Poc.QueueApp.Common.Interfaces
{
	/// <summary>
	/// Contract implemented by every job; instances are resolved from the
	/// Unity container by name as <c>IJob</c>.
	/// </summary>
	public interface IJob
	{
		/// <summary>
		/// Runs the job.
		/// </summary>
		/// <param name="payload">
		/// The job's input as a string; the implementation in this file
		/// deserializes it as JSON.
		/// </param>
		/// <returns>A boolean result; its meaning is defined by the implementation.</returns>
		bool Execute(string payload);
	}
	/// <summary>
	/// Minimum set of properties every job payload carries.
	/// </summary>
	public interface IJobPayload
	{
		/// <summary>Name of the job; used as the registration name when resolving an <c>IJob</c> from Unity.</summary>
		string JobName { get; set; }
		/// <summary>Name of the queue the job is meant for.</summary>
		string JobQueueName { get; set; }
		/// <summary>Integer identifier carried with the payload.</summary>
		int Id { get; set; }
		/// <summary>Kind of scheduling requested; the queue manager switches on this value.</summary>
		JobType JobType { get; set; }
		/// <summary>Cron expression carried with the payload.</summary>
		CronExpression Cron { get; set; }
	}
}

namespace Poc.QueueApp.Web.Jobs
{
	/// <summary>
	/// Test job that records, as a text file under <c>c:\logs</c>, whether its
	/// payload could be processed.
	/// </summary>
	public class CriticalJob : IJob
	{
		/// <summary>
		/// Deserializes <paramref name="payload"/> and writes a "Success" file
		/// containing it. If anything in that step throws, writes a "Failure"
		/// file with the exception message and stack trace instead.
		/// </summary>
		/// <param name="payload">JSON text expected to deserialize into <see cref="Payload"/>.</param>
		/// <returns>Always <c>true</c>, including when the failure file was written.</returns>
		[Queue("critical")]
		public bool Execute(string payload)
		{
			try
			{
				// The deserialized object is not used; the call only has to succeed.
				JsonConvert.DeserializeObject<Payload>(payload);

				var successPath = $"c:\\logs\\CriticalJob-CriticalQueue-Success-{DateTime.Now:yy-MMM-dd-HH-mm-ss}.txt";
				var successText = $"In Queue: {Globals.QueueName(QueueName.Critical)}\r\npublic class CriticalJob\r\n{payload}";
				File.WriteAllText(successPath, successText);
			}
			catch (Exception failure)
			{
				var failurePath = $"c:\\logs\\CriticalJob-CriticalQueue-Failure-{DateTime.Now:yy-MMM-dd-HH-mm-ss}.txt";
				var failureText = $"In Queue: {Globals.QueueName(QueueName.Critical)}\r\npublic class CriticalJob\r\n{failure.Message}\r\n{failure.StackTrace}";
				File.WriteAllText(failurePath, failureText);
			}

			return true;
		}

		/// <summary>
		/// Payload accepted by <see cref="CriticalJob"/>: the common
		/// <c>IJobPayload</c> properties plus one job-specific field.
		/// </summary>
		public class Payload : IJobPayload
		{
			/// <summary>Name of the job.</summary>
			public string JobName { get; set; }

			/// <summary>Name of the queue the job is meant for.</summary>
			public string JobQueueName { get; set; }

			/// <summary>Integer identifier carried with the payload.</summary>
			public int Id { get; set; }

			/// <summary>Kind of scheduling requested.</summary>
			public JobType JobType { get; set; }

			/// <summary>Cron expression carried with the payload.</summary>
			public CronExpression Cron { get; set; }

			//---
			/// <summary>Free-form text specific to this job.</summary>
			public string ExtraData { get; set; }
		}
	}
}

// Home page portion (for testing) - runs 3 jobs in 3 different ways
namespace Poc.QueueApp.Web.Controllers
{
	/// <summary>
	/// Home page controller used for testing: enqueues the same critical job
	/// in three different ways and reports each result through the ViewBag.
	/// </summary>
	public class HomeController : Controller
	{
		/// <summary>
		/// Enqueues a <c>CriticalJob</c> directly, via a Unity-resolved <c>IJob</c>,
		/// and via <c>PostJobToQueue</c>, storing each outcome in the ViewBag.
		/// </summary>
		/// <returns>The default view for this action.</returns>
		public ActionResult Index()
		{
			ViewBag.Title = "Home Page";

			// Check the status and reachability of hangfire...
			// NOTE(review): ClassName() is not defined in this file; presumably it
			// returns the type name that the job is registered under in Unity — confirm.
			var job = new CriticalJob();
			var payload = new CriticalJob.Payload
			{
				JobType = JobType.AdHoc,
				JobQueueName = Globals.QueueName(QueueName.Critical),
				Id = 999,
				JobName = job.ClassName(),
				Cron = new CronExpression(0, 0, 1, "1", "0", 2016),
				ExtraData = "This is a test job to make sure the Critical Queue is up and running when this web app first launches!",
			};
			var payloadString = JsonConvert.SerializeObject(payload);

			// Test directly...
			// Enqueues through a locally constructed CriticalJob instance.
			var id = BackgroundJob.Enqueue(() => job.Execute(payloadString));
			ViewBag.DirectResponse = $"ID: {id}";

			// Test by pulling the JOB from Unity...
			// The payload is re-serialized after ExtraData changes, then round-tripped
			// only to read JobName back for the container lookup.
			payload.ExtraData = "This is a test job to make sure the Critical Queue is up and running and reachable by using UNITY when this web app first launches!";
			payloadString = JsonConvert.SerializeObject(payload);
			var container = UnityConfig.GetConfiguredContainer();
			var command = JsonConvert.DeserializeObject<CriticalJob.Payload>(payloadString);
			var job2 = container.Resolve<IJob>(command.JobName);
			id = BackgroundJob.Enqueue(() => job2.Execute(payloadString));  // <== ENQUEUES IN THE DEFAULT QUEUE BUT DOES NOT RUN
			ViewBag.UnityResponse = $"ID: {id}";

			// Test via REST service call...
			// payloadString is not rebuilt here; PostJobToQueue is called on the payload
			// object itself. NOTE(review): PostJobToQueue is not defined in this file —
			// presumably it serializes the payload and POSTs it to the queue endpoint; confirm.
			payload.ExtraData = "This is a test job to make sure the Critical Queue is up and running and reachable by a REST call when this web app first launches!";
			var response = payload.PostJobToQueue();  // <== ENQUEUES IN THE DEFAULT QUEUE BUT DOES NOT RUN
			ViewBag.RestResult = $"REST Response: {response}";
			// .Result blocks this request thread until the response body has been read.
			ViewBag.RestResponse = $"ID: {response.Content.ReadAsStringAsync().Result}";

			return View();
		}
	}
}

// REST Handler...
namespace Poc.QueueApp.Web.Controllers
{
	/// <summary>
	/// REST endpoint that accepts a job payload and hands it to the queue manager.
	/// </summary>
	public class QueueController : BatcApiController
	{
		/// <summary>
		/// Reads the raw request body and passes it to <c>Queue.Execute</c>.
		/// </summary>
		/// <returns>The string returned by <c>Queue.Execute</c> for the posted body.</returns>
		[HttpPost]
		public string Post()
		{
			// Blocks until the whole request body has been read.
			var pendingBody = Request.Content.ReadAsStringAsync();
			var body = pendingBody.Result;

			return new Queue().Execute(body);
		}
	}
}

namespace Poc.QueueApp.Web.Managers
{
	/// <summary>
	/// Resolves the job named in a JSON payload from the Unity container and
	/// schedules it with Hangfire according to the payload's job type.
	/// </summary>
	public class Queue
	{
		/// <summary>
		/// Deserializes <paramref name="payload"/>, resolves the <c>IJob</c>
		/// registered under its <c>JobName</c>, and schedules it.
		/// </summary>
		/// <param name="payload">JSON text carrying at least the <c>IJobPayload</c> properties.</param>
		/// <returns>
		/// The id returned by <c>BackgroundJob.Enqueue</c> for ad-hoc jobs;
		/// <c>null</c> when the selected branch does not assign one.
		/// </returns>
		public string Execute(string payload)
		{
			var container = UnityConfig.GetConfiguredContainer();
			var command = JsonConvert.DeserializeObject<Payload>(payload);
			var job = container.Resolve<IJob>(command.JobName);

			// Initialised so the variable is definitely assigned on every path through
			// the switch; otherwise the final return does not compile (CS0165) for
			// branches that never set it.
			string id = null;

			switch (command.JobType)
			{
				case JobType.AdHoc:
					id = BackgroundJob.Enqueue(() => job.Execute(payload));
					break;
				case JobType.Continuation:
					// removed for clarity...
					break;
				case JobType.Delayed:
					// removed for clarity...
					break;
				default:
					// removed for clarity...
					break;
			}
			return id;
		}

		// A generic (base) Payload object to allow JsonConvert to work:
		// only the common IJobPayload properties are read from the JSON.
		private class Payload : IJobPayload
		{
			public string JobName { get; set; }
			public string JobQueueName { get; set; }
			public int Id { get; set; }
			public JobType JobType { get; set; }
			public CronExpression Cron { get; set; }
		}
	}
}
 `

What am I doing wrong?

Just found another (bug?) and commented on that one. Cross linking:

Hello? Anyone here? It appears dead!

@KeithBarrows, if you are using instance methods, always use the .Enqueue<T> method overloads:

// This is for instance methods
/// Place [Queue] attribute to the IJob.Execute method
BackgroundJob.Enqueue<IJob>(x => x.Execute("payload"));

// This is for static methods
BackgroundJob.Enqueue(() => Program.Main());

Never use the following construct: it does not even make sense, because it creates an instance of the Job class unnecessarily.

// Don't do this at home
var job = new Job();
BackgroundJob.Enqueue(() => job.Execute());
1 Like

I think I am missing something here. I did as you suggested and now nothing runs.

How do you get the concrete class from this? What I currently see is the Interface definition is being spun up as if it were a class; which just does not work!

Hangfire uses the JobActivator abstraction to create class instances, based on the type that was passed to a background job, just before the job is performed. There are integrations with popular IoC containers; you can search for Hangfire.Unity on NuGet and read its documentation on GitHub.

Yea - I am lost. In the weeds.

If I follow the (very slim) example correctly: I create a service class that will execute the actual concrete object for me. However, the concrete job is decorated with which queue I want to send it to, the Service class is not. Therefore, every single job I queue up is done so in the DEFAULT queue only.

My approach:

interface IJob defines a single method ==> Execute(string payload)
interface IJobPayload defines a minimum set of properties (UserId, JobName, Id, JobType, Cron)

[Queue("critical")] class Job1 : IJob {...}
[Queue("doors")] class Job2 : IJob {...}
[Queue("doors")] class Job3 : IJob {...}
[Queue("lights")] class Job4 : IJob {...}
[Queue("lights")] class Job5 : IJob {...}
[Queue("adhoc")] class Job6 : IJob {...}
...
[Queue("critical")] class JobN : IJob {...}

All of these jobs are registered with Unity. Each concrete Job has a Payload sub class that inherits from IJobPayload and defines additional data for the job if needed. Now, I created a Service object that will be enqueued:

    /// <summary>
    /// Service that is enqueued in place of the concrete job: it resolves the
    /// job named in the payload from Unity and runs it.
    /// </summary>
    public class JobService
    {
        private readonly IUnityContainer _container = UnityConfig.GetConfiguredContainer();

        /// <summary>
        /// Reads <c>JobName</c> from <paramref name="payload"/>, resolves the
        /// matching <c>IJob</c> and executes it with the same payload.
        /// </summary>
        /// <param name="payload">JSON text carrying at least a <c>JobName</c>.</param>
        /// <returns>The value returned by the resolved job's <c>Execute</c>.</returns>
        public bool Execute(string payload)
        {
            var jobName = JsonConvert.DeserializeObject<PayloadStub>(payload).JobName;
            return _container.Resolve<IJob>(jobName).Execute(payload);
        }

        // Minimal payload shape used only as a deserialization target.
        // ReSharper disable once ClassNeverInstantiated.Local
        private class PayloadStub : IJobPayload
        {
            public string UserId { get; set; }

            public string JobName { get; set; }

            public string JobQueueName { get; set; }

            public int Id { get; set; }

            public JobType JobType { get; set; }

            public CronExpression Cron { get; set; }
        }
    }

Now, when a POST call comes to my ApiController I am doing this:

/// <summary>
/// Reads the raw request body and passes it to <c>QueueHandler.Load</c>.
/// </summary>
/// <returns>The string returned by <c>QueueHandler.Load</c> for the posted body.</returns>
[HttpPost]
public string Post()
{
    // Blocks until the whole request body has been read.
    var body = Request.Content.ReadAsStringAsync().Result;
    return new QueueHandler().Load(body);
}

The controller calls the QueueHandler and Enqueues the job via the JobService object:

/// <summary>
/// Enqueues a <c>JobService</c> call for a posted payload, choosing the
/// scheduling call from the payload's job type.
/// </summary>
public class QueueHandler
{
    /// <summary>
    /// Deserializes <paramref name="payload"/> to read its <c>JobType</c> and
    /// enqueues <c>JobService.Execute</c> with the original payload string.
    /// </summary>
    /// <param name="payload">JSON text carrying at least a <c>JobType</c>.</param>
    /// <returns>The id assigned in the selected branch of the switch.</returns>
    public string Load(string payload)
    {
        // NOTE(review): PayloadStub is declared as a private nested class of
        // JobService in this thread; confirm an accessible type exists here.
        var command = JsonConvert.DeserializeObject<PayloadStub>(payload);
        string id;

        switch (command.JobType)
        {
            case JobType.AdHoc:
                // The job is expressed against JobService, not the concrete job type.
                id = BackgroundJob.Enqueue<JobService>(x => x.Execute(payload));
                break;
            // Remaining cases are elided in the original post.
            ...
        }
        return id;
    }
}

Still getting the same results and have the added drawback of losing the target Queue.

How would you do this?

Through all of this, the only job that SUCCEEDS is the job setup the way you said don’t do this!

// Don't do this at home
var job = new Job();
BackgroundJob.Enqueue(() => job.Execute());

@KeithBarrows, what is the purpose of the IJob interface? Why not make your jobs much simpler and enqueue specific classes instead of creating a bunch of unnecessary types (perhaps you are moving from another background processing solution)?

BackgroundJob.Enqueue<Job1>(x => x.Execute(userId: 12, id: 12341));
BackgroundJob.Enqueue<Job2>(x => x.Execute(userId: 42, id: 613));
BackgroundJob.Enqueue<JobN>(x => x.Execute(userId: 64, id: 5331));

The () => job.Execute() semantics are misleading, because they create the impression that the whole object is serialized or somehow passed to the background. This is not true: only the fully qualified type name (with assembly), the method name and the actual arguments are serialized.

Nevertheless, these semantics are already enabled, and we can’t remove them without introducing a breaking change. And since they lead to inconsistent behavior (the wrong queue when the background job is created, and the correct queue afterwards), we should fix this.

Bug report is moved to GitHub, please use GitHub issues to discuss the bug, leaving this topic for general discussion.

Hi @odinserj.

Yes, we are moving (investigating really) from a home grown job system. The direction we want to go is to use the IJob as the blueprint for all jobs. I did find a problem with our approach last night: The Execute definition is defined as bool Execute(string payload) and should be defined as void Execute(string payload) instead since your Enqueue method is defined as Action. Both of your Action based methods do not return a value. One of them does not accept a value while the other accepts a single value. The 2 Func based methods you define both return a Task (awaitable, async).

I switched up my IJob definition to void and most are running now. I have not tested the queue destination part yet. However, I am still having to do:

id = BackgroundJob.Enqueue(() => job.Execute(payload));
BackgroundJob.Requeue(id);

For now, this is a workaround. Not pretty, but it works.

To further explain why IJob:

We have several Line of Business apps. Most have a SQL or Oracle store for working with transitional data. All have several SOAP/REST based endpoints also treated as data (repositories). These are based on several large COTS apps purchased over the years for inventory management, assembly management, etc. The jobs that we define all work within this ecosystem. Since we have a few score of supporting apps we find ourselves re-adding just the parts we need in a (small) monolithic way.

IoC containers allow one way to scan a directory for multiple DLLs and, as you know, we can filter the matches with interfaces. Interfaces like: IJob, IWebRepository, ISqlRepository, IOracleRepository, IService, etc. With this approach, we are slowly pulling common code out of multiple apps and encapsulating each in their own project. IJob turns out to be a pool of several score projects. Because the Job collection is a growing concern (pretty much any action that takes more than 10 seconds) using a switch statement is not maintainable. So, we are leaning on IoC Containers (Unity in this case) to be able to get a concrete implementation on the fly rather than for DI use. (And we do use the DI aspect within our apps as well.)

The payload we are working on is not cemented down yet. We’ve used JSON to move data around in other areas and it seemed to be a good fit here as well. Yes, it contains an ID, UserName, JobName (Job’s Class Name), and a couple other bits. Most times, the working data is stored in a DB and the job will use the payload to get the key information to hydrate any objects we need.

I hope this helps clarify why the IJob approach.