Architecture: Background Processing

So we have a bunch of models and are doing stuff with them in service objects. The next thing we might need is to run some code in the background.

Not everything can be done inline from the API request. For example, we might need to geocode a user’s postal code when they change it in their account. Or when an invoice is created, we want to charge it 24 hours later.

When working with background jobs, we default to the following practices:

  • Workers are enqueued with a dictionary of inputs
  • These inputs should be used to fetch data from the source of truth
  • Workers know how to check if they still need to run
  • Locking schemes should protect against unwanted parallel execution

Enqueue

When we enqueue a worker, we have found that it’s quite helpful to always use a dictionary (hash) of key/value pairs. Resque and Sidekiq both take a list of arguments like so:

class HardWorker
  include Sidekiq::Worker
  def perform(name, count)
    # do something with name, count
  end
end

# enqueue
HardWorker.perform_async('bob', 5)

This has proved to be problematic when adding new parameters or supporting optional ones. For example, if we add a new (third) input parameter, there might be jobs in the queue enqueued with the old two. When the new code gets deployed, those jobs will throw an ‘invalid number of arguments’ type of error. When using a hash, we can give the new key a default, fail gracefully, or do whatever we like on a class-by-class basis.
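To make that failure mode concrete, here is a sketch in plain Ruby (no queue library involved; the class names are hypothetical) of how an old two-argument payload behaves against new worker code:

```ruby
# Old payloads in the queue look like ['bob', 5]. If the deployed worker
# now requires a third positional argument, replaying them fails:
class NewPositionalWorker
  def perform(name, count, priority)
    "#{name}:#{count}:#{priority}"
  end
end

begin
  NewPositionalWorker.new.perform('bob', 5) # old payload, new code
rescue ArgumentError => e
  puts "old payload failed: #{e.message}"
end

# With a single hash argument, the new code can choose a default and the
# old payload keeps working:
class NewHashWorker
  def perform(options)
    priority = options.fetch('priority', 'normal')
    "#{options['name']}:#{options['count']}:#{priority}"
  end
end

puts NewHashWorker.new.perform('name' => 'bob', 'count' => 5)
# => bob:5:normal
```

The hash costs a little verbosity at the call site but makes every future parameter change backward compatible with whatever is already sitting in the queue.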

So to provide better change management and optional arguments, we always do it like so:

class HardWorker
  include TResque::Worker
  inputs :name, :count

  def work
    # do something with self.name, self.count
  end
end

# enqueue
HardWorker.enqueue(name: 'bob', count: 5)

Source of Truth

Let’s say we want to update a search index every time a user record is changed. We need to write their first name, last name, etc. to Elasticsearch.

We could do something like this:

class UserIndexWorker
  include TResque::Worker
  inputs :id, :first_name, :last_name, :etc

  def work
    Elasticsearch.index('users').write(id, id: id, first_name: first_name, last_name: last_name, etc: etc)
  end
end

# When user changes
UserIndexWorker.enqueue(user.attributes.slice(:id, :first_name, :last_name, :etc))

This certainly would work, but it is not considered best practice. It is better to be idempotent: pass only the minimal information to the background worker, which then looks up everything it writes from the source of truth. That way, if there is any delay between when the job is enqueued and when it runs, it will still send the correct information.

The better approach would look like this:

class UserIndexWorker
  include TResque::Worker
  inputs :user_id

  def work
    Elasticsearch.index('users').write(user.attributes.slice(:id, :first_name, :last_name, :etc))
  end

  def user
    @user ||= User.find(user_id)
  end
end

# When user changes
UserIndexWorker.enqueue(user_id: user.id)

In the same vein, the worker should be in charge of whether or not it needs to do anything in the first place. For example, when an invoice is created, we can enqueue a worker to run later. If, at that time, the invoice still should be charged, then the worker charges it.

class InvoiceChargeWorker
  include TResque::Worker
  inputs :invoice_id

  def work
    return unless needed?
    invoice.charge!
  end

  def needed?
    invoice.pending?
  end

  def invoice
    @invoice ||= Invoice.find(invoice_id)
  end
end

# When invoice is created
InvoiceChargeWorker.enqueue_at(24.hours.from_now, invoice_id: invoice.id)

This is another example of single source of truth. Even for jobs that are run immediately, this check is something we always put in place: return immediately if the worker is no longer relevant.

Mutual Exclusion

Let’s say the User object can sometimes change a few times rapidly. The “source of truth” approach will make sure the right thing always gets indexed. So that’s great. But it is pretty silly to index the same data two or more times, right?

In this case, we add a queue lock. The effect is that if something is in the queue and waiting to be processed and you try to enqueue another one with the same inputs, then it will be a no-op. It looks like this:

class UserIndexWorker
  include TResque::Worker
  inputs :user_id

  queue_lock :user_id
end
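One way to implement such a queue lock (this is a simplified sketch of the idea, not the actual TResque internals) is to derive a deterministic key from the worker class and the locked inputs, and make enqueue a no-op while that key is held. Here an in-memory set stands in for Redis:

```ruby
require 'digest'
require 'set'

LOCKS = Set.new # stand-in for Redis; a real implementation would use SETNX

def queue_lock_key(worker_class, locked_inputs)
  # Deterministic key: worker class name plus the sorted locked inputs.
  payload = locked_inputs.sort.map { |k, v| "#{k}=#{v}" }.join('&')
  "lock:#{worker_class}:#{Digest::MD5.hexdigest(payload)}"
end

def enqueue_with_queue_lock(worker_class, inputs, lock_attrs)
  key = queue_lock_key(worker_class, inputs.select { |k, _| lock_attrs.include?(k) })
  return false if LOCKS.include?(key) # same inputs already queued: no-op
  LOCKS.add(key) # released when the worker actually runs
  true           # enqueued
end

enqueue_with_queue_lock('UserIndexWorker', { user_id: 42 }, [:user_id]) # => true
enqueue_with_queue_lock('UserIndexWorker', { user_id: 42 }, [:user_id]) # => false (no-op)
enqueue_with_queue_lock('UserIndexWorker', { user_id: 43 }, [:user_id]) # => true
```

Releasing the key when the job starts (rather than when it finishes) is what gives the “still waiting in the queue” semantics: a change that arrives while the job is running enqueues a fresh one.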

Another case that often arises is mutual exclusion at runtime. Maybe weird things happen in the payment service if two invoices for the same user are charged at the same time.

In this case, we add a worker lock. The effect is that if something is in the queue and about to start running and there is another running at that moment, then it will re-enqueue itself to run later. It looks like this:

class InvoiceChargeWorker
  include TResque::Worker
  inputs :invoice_id

  worker_lock :to_id

  def work
    return unless needed?
    invoice.charge!
  end

  def to_id
    invoice.to_id
  end

  def needed?
    invoice.pending?
  end

  def invoice
    @invoice ||= Invoice.find(invoice_id)
  end
end

For either type of lock, you don’t have to lock on all of the inputs, and (as shown in the last example) you can lock on calculated values. The namespace of the lock defaults to the worker class name. You can also set the namespace explicitly to allow locking across different workers.
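The difference from the queue lock is when the check happens: at run time rather than enqueue time. A simplified sketch of the behavior (an in-memory set stands in for the shared lock store; this is not the actual TResque code):

```ruby
require 'set'

RUNNING = Set.new # stand-in for a shared (e.g. Redis-backed) lock store

# If another job holds the same lock key, re-enqueue instead of running.
# The key here is built from the class name plus the locked value; a shared
# namespace would let different worker classes contend for the same key.
def run_with_worker_lock(lock_key, requeue)
  if RUNNING.include?(lock_key)
    requeue << lock_key # try again later
    return :requeued
  end
  RUNNING.add(lock_key)
  begin
    :ran # the real work would happen here
  ensure
    RUNNING.delete(lock_key)
  end
end

requeue = []
RUNNING.add('InvoiceChargeWorker:to_id=7') # simulate a job mid-run
run_with_worker_lock('InvoiceChargeWorker:to_id=7', requeue) # => :requeued
run_with_worker_lock('InvoiceChargeWorker:to_id=8', requeue) # => :ran
```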

Message Bus

Our message bus and our use of background processes have a lot in common. In fact, the message bus is built on top of the same background processing infrastructure. The question that arises is this: when should something be enqueued directly and when should it publish and respond to a bus subscription?

The first note is that you should always be publishing (ABP). It doesn’t hurt anything to give (optional) visibility to other systems into what is happening, or to use the bus as a logging framework.

Just publishing, however, doesn’t mean we have to use the bus to do the work in the background. We can both publish and enqueue a background worker. We enqueue a worker when the work in the background is essential to the correct operation of the use case at hand.

One example to enqueue directly would be the geocoding worker I mentioned earlier: when the user gives a new postal code, figure out where that is. It’s key to the account management system.

The search example I’ve been using might not actually be the best one, because we would have the search system subscribed to changes in the account system. What I didn’t show is that the enqueue call might actually happen from within a subscription.

subscribe "user_changed" do |attributes|
  UserIndexWorker.enqueue(user_id: attributes['id'])
end

So these two concepts can work together. Why not just index it right in the subscription, though? A primary reason might be to use some of the locking mechanisms, since the bus does not have those. It also might be the case that the worker is enqueued from other locations, and this keeps things DRY. The worker is also easier to unit test.
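On that last point: because the worker looks up its own data and decides whether to run, the guard logic can be unit tested in isolation. A sketch with plain Ruby stand-ins (the real worker mixes in TResque::Worker and loads a real Invoice record):

```ruby
# Stand-in invoice with just the state the guard needs.
Invoice = Struct.new(:status) do
  def pending?
    status == :pending
  end
end

# The guard logic from InvoiceChargeWorker, isolated for testing.
class InvoiceGuard
  attr_accessor :invoice

  def needed?
    invoice.pending?
  end
end

guard = InvoiceGuard.new
guard.invoice = Invoice.new(:pending)
puts guard.needed? # true
guard.invoice = Invoice.new(:paid)
puts guard.needed? # false
```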

TResque

We use Resque as a foundation and built on top of it with an abstraction layer called TResque. That’s TR (TaskRabbit) Resque. Get it? It puts all of these practices into place, as well as adding an abstraction layer for the inevitable, but as yet unprioritized, move to Sidekiq.

I don’t necessarily expect anyone to use this, but it doesn’t hurt to make it available as an example of how we are using these tools.

You define a worker and enqueue things as shown in the examples above. The only layer left is prioritization. You can give a queue name to a worker and then register what priority those workers have. If no queue is given, the default queue is assumed.

require 'tresque'

module Account
  class RegularWorker
    include ::TResque::Worker
    # defaults to account_default queue
  end
end

module Account
  class RefreshWorker
    include ::TResque::Worker
    queue :refresh # lower priority account_refresh queue
  end
end

TResque.register("account") do
  queue :default, 100
  queue :refresh, -5000
end
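Under the hood, the registry presumably just sorts the registered queues by descending priority to produce the worker’s queue order. A minimal sketch of that idea (hypothetical names, not the actual TResque implementation):

```ruby
# Each system registers queues with a priority; higher priority runs first.
REGISTRY = Hash.new { |h, k| h[k] = {} }

def register(app_key, queues)
  queues.each { |name, priority| REGISTRY[app_key][name] = priority }
end

def queue_list
  REGISTRY.flat_map do |app_key, queues|
    queues.map { |name, priority| ["#{app_key}_#{name}", priority] }
  end.sort_by { |_, priority| -priority }.map(&:first)
end

register('account', default: 100, refresh: -5000)
register('search',  default: 50)
puts queue_list.join(',')
# => account_default,search_default,account_refresh
```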

Then when you run Resque, you can use these registrations to process the queues in the right order.

require 'resque/tasks'
require 'resque_scheduler/tasks'
require "resque_bus/tasks"

namespace :resque do
  task :setup => [:environment] do
    require 'resque_scheduler'
    require 'resque/scheduler'
    require 'tresque'
  end

  task :queues => [:setup] do
    queues = ::TResque::Registry.queues
    ENV["QUEUES"] = queues.join(",")
    puts "TResque: #{ENV["QUEUES"]}"
  end
end
  $ bundle exec rake resque:queues resque:work
  TResque: account_default,account_refresh

This registration layer allows each of the systems (engines) to work independently and still have centralized background processing.

Copyright © 2017 Brian Leonard