Architecture: Background Processing
So we have a bunch of models and are doing stuff with them in service objects. The next thing we might need is to run some of that work in the background.
Not everything can be done inline from the API request. For example, we might need to geocode a user’s postal code when they change it in their account. Or when an invoice is created, we want to charge it 24 hours later.
When working with background jobs, we default to the following practices:
- Workers are enqueued with a dictionary of inputs
- These inputs should be used to fetch data from the source of truth
- Workers know how to check if they still need to run
- Locking schemes should protect parallel execution
Enqueue
When we enqueue a worker, we have found that it’s quite helpful to always use a dictionary (hash) of key/value pairs. Resque and Sidekiq both take a list of arguments like so:
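The original listing didn't survive extraction; here is a sketch of the positional style that Resque uses (the worker name and its parameters are illustrative):

```ruby
# Positional-argument style: the job payload is an array of arguments,
# and the worker's class-level `perform` receives them positionally.
class IndexWorker
  @queue = :default

  def self.perform(user_id, membership_level)
    # look the user up and do the work...
    [user_id, membership_level]
  end
end

# Enqueued as: Resque.enqueue(IndexWorker, 123, "gold")
# which serializes the payload roughly as:
payload = { "class" => "IndexWorker", "args" => [123, "gold"] }
```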
This has proved to be problematic when adding new parameters or having optional parameters. For example, if we add a new (third) input parameter, there may still be jobs in the queue enqueued with the old two. When the new code gets deployed, those jobs will throw an ‘invalid number of arguments’ type of error. When using a hash, we can give the new key a default, fail gracefully, or do whatever we like on a class-by-class basis.
So to provide better change management and optional arguments, we always do it like so:
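The original listing is lost. TResque (covered below) wraps this pattern up; here is the idea sketched in plain Resque style, with illustrative names and a default for the newly added key:

```ruby
# Hash-argument style: a single dictionary of inputs. A new or optional
# key can be given a default, so old payloads already in the queue keep
# working after a deploy.
class IndexWorker
  @queue = :default

  DEFAULTS = { "membership_level" => "standard" }

  def self.perform(options = {})
    options = DEFAULTS.merge(options)
    user_id = options.fetch("user_id")
    level   = options["membership_level"]
    # look the user up and do the work...
    [user_id, level]
  end
end

# Enqueued as: Resque.enqueue(IndexWorker, "user_id" => 123)
```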
Source of Truth
Let’s say we want to update a search index every time a user record is changed. We need to write their first name, last name, etc to Elasticsearch.
We could do something like this:
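The original listing is gone; here is a self-contained sketch of the naive approach, with an in-memory array standing in for the queue and a hash standing in for Elasticsearch (all names are illustrative):

```ruby
QUEUE = []   # stands in for Redis/Resque
INDEX = {}   # stands in for Elasticsearch

class User
  attr_accessor :id, :first_name, :last_name

  def initialize(id, first_name, last_name)
    @id, @first_name, @last_name = id, first_name, last_name
  end

  # Naive: snapshot all the attributes at enqueue time.
  def update_search_index
    QUEUE << { "id" => id, "first_name" => first_name, "last_name" => last_name }
  end
end

# The worker writes exactly what it was handed, even if it is stale by now.
def run_index_jobs
  while (job = QUEUE.shift)
    INDEX[job["id"]] = { "first_name" => job["first_name"], "last_name" => job["last_name"] }
  end
end
```

Note the flaw: if the record changes between enqueue and run, the index receives the old values.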
This certainly would work, but it is not best practice. It is better to be idempotent: pass the minimal information to the background worker and have it look up the source of truth. That way, if there is any delay between when the job is enqueued and when it runs, it will still send the correct information.
The better approach would look like this:
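Again the listing is lost; a self-contained sketch of the source-of-truth version, with a hash standing in for the database (names are illustrative):

```ruby
QUEUE = []   # stands in for Redis/Resque
INDEX = {}   # stands in for Elasticsearch
USERS = {}   # stands in for the database: the source of truth

class User
  attr_accessor :id, :first_name, :last_name

  def initialize(id, first_name, last_name)
    @id, @first_name, @last_name = id, first_name, last_name
    USERS[id] = self
  end

  # Better: enqueue only the id; the worker looks everything else up at run time.
  def update_search_index
    QUEUE << { "user_id" => id }
  end
end

def run_index_jobs
  while (job = QUEUE.shift)
    user = USERS[job["user_id"]]   # fetch fresh data from the source of truth
    INDEX[user.id] = { "first_name" => user.first_name, "last_name" => user.last_name }
  end
end
```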
In the same vein, the worker should be in charge of whether or not it needs to do anything in the first place. For example, when an Invoice is created, we can enqueue a worker to run 24 hours later. If, at that time, the Invoice still should be charged, then it charges it.
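The listing here is lost as well; a minimal sketch of the idea. In the real system the worker would receive an `invoice_id` and load the record; here it takes the object directly to stay self-contained:

```ruby
class Invoice
  attr_accessor :state

  def initialize
    @state = "pending"
  end

  # The worker asks the source of truth whether it still needs to run.
  def should_charge?
    state == "pending"
  end

  def charge!
    @state = "charged"
  end
end

class ChargeWorker
  def self.perform(invoice)
    # Return immediately if the job is no longer relevant,
    # e.g. the invoice was paid some other way in the meantime.
    return :skipped unless invoice.should_charge?
    invoice.charge!
    :charged
  end
end
```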
This is another example of single source of truth. Even for jobs that are run immediately, this check is something we always put in place: return immediately if the worker is no longer relevant.
Mutual Exclusion
Let’s say the User object sometimes changes a few times in rapid succession. The “source of truth” approach will make sure the right thing always gets indexed. So that’s great. But it is pretty silly to index the same data two or more times, right?
In this case, we add a queue lock. The effect is that if something is in the queue and waiting to be processed and you try to enqueue another one with the same inputs, then it will be a no-op. It looks like this:
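The original listing (a few lines of TResque's `queue_lock` DSL, presumably) is lost; here is a self-contained sketch of the mechanism, with a hash standing in for lock keys kept in Redis:

```ruby
QUEUE = []
QUEUE_LOCKS = {}   # stands in for lock keys kept in Redis

# Enqueue is a no-op if a job with the same worker and inputs is already
# waiting. The lock key is namespaced by the worker class name.
def enqueue_with_queue_lock(worker_class, inputs)
  key = [worker_class.name, inputs].inspect
  return false if QUEUE_LOCKS[key]   # already queued: no-op
  QUEUE_LOCKS[key] = true            # released when the job runs
  QUEUE << [worker_class, inputs]
  true
end

class IndexWorker; end
```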
Another case that often arises is mutual exclusion at runtime. Maybe weird things happen in the payment service if two invoices for the same user are being charged at the same time.
In this case, we add a worker lock. The effect is that if something is in the queue and about to start running and there is another running at that moment, then it will re-enqueue itself to run later. It looks like this:
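This listing is lost too; a self-contained sketch of a worker lock keyed on a calculated value (the invoice's user), which is the pattern the next paragraph refers back to. The invoice is modeled as a plain hash to keep the sketch runnable:

```ruby
RUNNING = {}   # lock keys for jobs currently executing
REQUEUED = []  # jobs pushed back to run later

class ChargeWorker
  # Lock on a calculated value: the invoice's user, namespaced under
  # "payments" so that other payment workers could share the same lock.
  def self.lock_key(invoice)
    ["payments", invoice[:user_id]]
  end

  def self.perform(invoice)
    key = lock_key(invoice)
    if RUNNING[key]
      REQUEUED << invoice   # another charge for this user is running now
      return :requeued
    end
    RUNNING[key] = true
    begin
      # charge the invoice...
      :charged
    ensure
      RUNNING.delete(key)   # always release the lock
    end
  end
end
```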
For either type of lock, you don’t have to lock on all of the inputs, and you can (as shown in the last example) lock on calculated values. The namespace of the lock defaults to the worker class name. You can also set the namespace explicitly to allow locking between different workers.
Message Bus
Our message bus and our use of background processes have a lot in common. In fact, the message bus is built on top of the same background processing infrastructure. The question that arises is this: when should something be enqueued directly and when should it publish and respond to a bus subscription?
The first note is that you should always be publishing (ABP). It doesn’t hurt anything to give other systems (optional) visibility into what is happening, or to use the bus as a logging framework.
Just publishing, however, doesn’t mean we have to use the bus to do the work in the background. We can both publish and enqueue a background worker. We enqueue a worker when the background work is essential to the correct operation of the use case at hand.
One example to enqueue directly would be the geocoding worker I mentioned earlier: when the user gives a new postal code, figure out where that is. It’s key to the account management system.
The search example I’ve been using might not actually be the best one, because we would have the search system subscribe to changes in the account system. What I didn’t show is that the enqueue call might actually happen from within a subscription.
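The original three-line listing is lost; presumably it showed a bus subscription whose only job is to enqueue the worker. A self-contained sketch, with a toy subscribe/publish pair and an array standing in for the queue (the DSL and event name are illustrative):

```ruby
SUBSCRIPTIONS = Hash.new { |hash, event| hash[event] = [] }
QUEUE = []

def subscribe(event, &handler)
  SUBSCRIPTIONS[event] << handler
end

def publish(event, payload)
  SUBSCRIPTIONS[event].each { |handler| handler.call(payload) }
end

# The subscription does no work itself; it just enqueues the worker.
subscribe "user_changed" do |payload|
  QUEUE << { "worker" => "IndexWorker", "user_id" => payload["id"] }
end
```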
So these two concepts can work together. Why not just index it right in the subscription, though? A primary reason might be to use some of the locking mechanisms as the bus does not have that. It also might be the case that the worker is enqueued from other locations and this keeps things DRY. The worker is also easier to unit test.
TResque
We use Resque as a foundation and build on top of it with an abstraction layer called TResque. That’s TR (TaskRabbit) Resque. Get it? It puts all of these practices into place, as well as adding an abstraction layer for the inevitable, but as yet unprioritized, move to Sidekiq.
I don’t necessarily expect anyone to use this, but it doesn’t hurt to make it available as an example of how we are using these tools.
You define a worker and enqueue things as shown in the examples above. The only layer left is prioritization. You can give a queue name to a worker and then register what priority those queues have. If no queue is given, it is assumed to be the default queue.
Then when you run Resque, you can use these registrations to process the queues in the right order.
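The original TResque listings are lost, and its real DSL may differ from what's shown here. A toy sketch of the registration idea: workers name a queue (defaulting to `default`), each system registers its queues with a priority, and the runner works the queues from highest to lowest priority.

```ruby
REGISTRY = {}   # queue name => priority, registered centrally

def register_queue(name, priority)
  REGISTRY[name] = priority
end

class BaseWorker
  # Workers may name a queue; otherwise they land on :default.
  def self.queue(name = nil)
    @queue = name if name
    @queue || :default
  end
end

class ImportWorker < BaseWorker
  queue :import   # low-priority bulk work
end

class IndexWorker < BaseWorker
end

# Each system (engine) registers its own queues and priorities.
register_queue :default, 100
register_queue :import, 10

# The runner processes queues from highest to lowest priority.
def queue_order
  REGISTRY.sort_by { |_name, priority| -priority }.map(&:first)
end
```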
This registration layer allows each of the systems (engines) to work independently and still have centralized background processing.