Architecture: Consider Kron

The last post in our architecture series discussed background processing. One special type of background processing deserves a quick note: work that needs to be done periodically or otherwise on a schedule.

In our internal speak, we call this a “kron” job. If you are familiar with cron jobs, it’s the same idea. A product manager misspelled it once and it stuck! We don’t actually use regular cron infrastructure, so the spelling nuance is helpful.

The specifics of how we implement it involve our message bus infrastructure, but I think the concept and the decisions involved could apply to many other implementations.

When to use it

Let’s take the job from the previous article. The “charge an invoice 24 hours later” case is an interesting one. The system certainly supports delaying that code to run at an arbitrary time, but that’s not always the best idea.

class InvoiceChargeWorker
  include TResque::Worker
  inputs :invoice_id

  worker_lock :invoice_id

  def work
    return unless needed?
    invoice.charge!
  end

  def to_id
    invoice.to_id
  end

  def needed?
    invoice.pending?
  end

  def invoice
    @invoice ||= Invoice.find(invoice_id)
  end
end

# When invoice is created
InvoiceChargeWorker.enqueue_at(24.hours.from_now, invoice_id: invoice.id)

One reason would be memory. When there are a lot of invoices (woot!), we still have to save the notion of what should be done somewhere until it gets processed. In this case, the Redis instance will have it stored in memory. The memory could fill up, and adding more workers won’t help because the jobs are delayed and cannot be processed any sooner.

The second reason is stability. This is important stuff and Redis could have issues and lose the data. We made everything idempotent and could recreate everything, but it would certainly be a huge hassle.

So when enqueueing something to run in the future, especially if it is important or a long time from now (more than a few minutes), we consider kron.

Batch mode

If we were going to accomplish the same thing but on a schedule, the code would have to change in some way. I like the existing worker because it already has the good stuff from the last article: source of truth, knowing whether or not it still needs to be run, and mutual exclusion. When batch processing, I believe it’s also good to still operate on items one at a time where the count (memory for Redis) is low or the risk of issues is high. Both are the case here.

To turn it into a batch processor we need to know what needs to be processed at any given moment. This is easy to determine because we have the needed? method. It looks to be invoices that are in the pending state. Sometimes we need to add a state column or other piece of data to know what needs to be in the batch but in this case we are good to go.

From there we can decide if we are going to update the class as-is or make a batch worker. A batch worker is its own worker and would look like this:

class InvoiceChargeBatchWorker
  include TResque::Worker

  worker_lock :all
  queue_lock  :all

  def work
    Invoice.where(state: 'pending').find_each do |invoice|
      InvoiceChargeWorker.enqueue(invoice_id: invoice.id)
    end
  end
end

# process all pending invoices
InvoiceChargeBatchWorker.enqueue

That’s it. Because of the worker lock on InvoiceChargeWorker and the state checking, it would be ok even if we were to enqueue it twice or something. Making a custom batch worker also prevents us from running this code twice.
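To see why a double enqueue is harmless, here is a minimal, self-contained sketch of the state check on its own. `FakeInvoice` and `ChargeOnce` are hypothetical stand-ins for the real model and worker (no TResque, no database); only the pending guard mirrors the worker above.

```ruby
# Hypothetical in-memory stand-in for the Invoice model.
class FakeInvoice
  attr_reader :state, :charge_count

  def initialize
    @state = 'pending'
    @charge_count = 0
  end

  def pending?
    @state == 'pending'
  end

  # Charging moves the invoice out of the pending state.
  def charge!
    @charge_count += 1
    @state = 'paid'
  end
end

# Hypothetical stand-in for the worker: same guard, no queue involved.
class ChargeOnce
  def initialize(invoice)
    @invoice = invoice
  end

  def work
    return unless @invoice.pending?
    @invoice.charge!
  end
end

invoice = FakeInvoice.new
2.times { ChargeOnce.new(invoice).work } # simulate the job being enqueued twice
puts invoice.charge_count
```

The second run returns early because the invoice is no longer pending, so it is charged once. This sketch runs sequentially; guarding against two workers running at the same moment is the job of the worker lock.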

We could also stick it as a class method on the original:

class InvoiceChargeWorker
  include TResque::Worker
  inputs :invoice_id

  worker_lock :invoice_id

  def self.process_all!
    Invoice.where(state: 'pending').find_each do |invoice|
      self.enqueue(invoice_id: invoice.id)
    end
  end

  def work
    return unless needed?
    invoice.charge!
  end

  def needed?
    invoice.pending?
  end

  def invoice
    @invoice ||= Invoice.find(invoice_id)
  end
end

# process all pending invoices
InvoiceChargeWorker.process_all!

How it works

Again, in any given architecture there is probably a best way to do it. For example, maybe this is a good way to do it on top of Mesos.

The challenge is running something on a schedule. In this case, process all invoices that need to be paid. That is what regular cron is made to do. However, we do not want to run that on every box. If we did that, we would have serious race conditions and might pay an invoice twice. Rather, we want to run it once globally across the entire infrastructure or at least per service.

We could probably do this by noting in the devops setup that one of the servers is special. It should get the cron setup. We could use something like the whenever gem to say what to do and we would only run that on one box per system. It needs to be per system because it has to be able to know what worker to enqueue or, in general, what code to run.

What we do instead is have a single service that has a process that sends out a heartbeat on the message bus. Every minute, it publishes an event that looks like this.

  # for Tue, 11 Apr 2017 00:25:00 UTC +00:00
  # epoch time: 1491870300

  QueueBus.publish("heartbeat_minutes", {
    "epoch_seconds"=>1491870300,
    "epoch_minutes"=>24864505,
    "epoch_hours"=>414408,
    "epoch_days"=>17267,
    "minute"=>25,
    "hour"=>0, 
    "day"=>11,
    "month"=>4,
    "year"=>2017,
    "yday"=>101,
    "wday"=>2
  })
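As a rough illustration of where those numbers come from, the sketch below derives the same attributes from a plain Ruby `Time`. The `heartbeat_attributes` helper is a hypothetical name for this post, not the code inside queue-bus, and it assumes all fields are computed in UTC.

```ruby
# Hypothetical helper: build heartbeat-style attributes for a given time.
# Not the queue-bus implementation, just the same arithmetic.
def heartbeat_attributes(time)
  time = time.getutc # work on a UTC copy, leaving the argument untouched
  seconds = time.to_i
  {
    "epoch_seconds" => seconds,
    "epoch_minutes" => seconds / 60,     # integer division rounds down
    "epoch_hours"   => seconds / 3600,
    "epoch_days"    => seconds / 86_400,
    "minute"        => time.min,
    "hour"          => time.hour,
    "day"           => time.day,
    "month"         => time.month,
    "year"          => time.year,
    "yday"          => time.yday,        # day of the year, starting at 1
    "wday"          => time.wday         # day of the week, 0 is Sunday
  }
end

attributes = heartbeat_attributes(Time.at(1491870300))
puts attributes["epoch_minutes"] # 24864505
puts attributes["wday"]          # 2 (Tuesday)
```

Having both the epoch counters and the calendar fields in the event means a subscriber can match on whichever is more convenient.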

The current code for the process is already checked into queue-bus and ready to use.

Resque bus supports this using the resque-scheduler gem. It is set up by calling QueueBus.heartbeat!. We make sure it’s set up every time we start up Resque.

namespace :resque do
  task :setup => [:environment] do
    require 'resque_scheduler'
    require 'resque/scheduler'
    require 'tresque'

    QueueBus.heartbeat!
  end
end

This setup is automatically called every time Resque starts.

Usage

So now we can subscribe to this event to run something every minute, hour, day, Monday, month, whatever.

# every minute
subscribe "every_minute", 'bus_event_type' => 'heartbeat_minutes' do |attributes|
  InvoiceChargeWorker.process_all!
end

# every hour: 4:22, 5:22, 6:22, etc
subscribe "once_an_hour", 'bus_event_type' => 'heartbeat_minutes', 'minute' => 22 do |attributes|
  InvoiceChargeWorker.process_all!
end

# every day at 12:05 am
subscribe "once_a_day", 'bus_event_type' => 'heartbeat_minutes', 'hour' => 0, 'minute' => 5 do |attributes|
  InvoiceChargeWorker.process_all!
end

# every monday at 1:52 am
subscribe "early_monday_morning", 'bus_event_type' => 'heartbeat_minutes', 'wday' => 1, 'hour' => 1, 'minute' => 52 do |attributes|
  InvoiceChargeWorker.process_all!
end

# the 3rd of every month at 2:10 am
subscribe "once_a_month", 'bus_event_type' => 'heartbeat_minutes', 'day' => 3, 'hour' => 2, 'minute' => 10 do |attributes|
  InvoiceChargeWorker.process_all!
end

# every 5 minutes: 4:00, 4:05, 4:10, etc
subscribe "every_5_minutes", 'bus_event_type' => 'heartbeat_minutes' do |attributes|
  # if it doesn't fit the subscribe pattern, just subscribe to every minute and use ruby
  next unless attributes['minute'] % 5 == 0
  InvoiceChargeWorker.process_all!
end
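The subscriptions above can be read as filters over the heartbeat attributes. Here is a minimal sketch of that idea, assuming a subscription fires only when every key in its filter equals the matching attribute of the event. The `matches?` helper is hypothetical, and the real matcher in queue-bus may support more than exact equality. The sketch also assumes the event carries its type under `bus_event_type`.

```ruby
# Hypothetical matcher: true when every key/value pair in the filter
# is present with the same value in the event attributes.
def matches?(filter, attributes)
  filter.all? { |key, value| attributes[key] == value }
end

# Subset of the heartbeat for Tue, 11 Apr 2017 00:25:00 UTC
heartbeat = {
  'bus_event_type' => 'heartbeat_minutes',
  'minute' => 25, 'hour' => 0, 'day' => 11, 'month' => 4, 'wday' => 2
}

every_minute = { 'bus_event_type' => 'heartbeat_minutes' }
once_an_hour = { 'bus_event_type' => 'heartbeat_minutes', 'minute' => 22 }

puts matches?(every_minute, heartbeat) # true
puts matches?(once_an_hour, heartbeat) # false, the minute is 25 rather than 22
```

Anything that cannot be expressed as equality on a field, like "every 5 minutes", falls back to subscribing to every heartbeat and checking in Ruby.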

Summary

So that is how “kron” works.

Over time, we have decided this is a much more reliable way to process items in the background when a delay is acceptable. By setting up some sort of centralized architecture for this, many services can subscribe in a way that is familiar and unsurprising. We have found a lot of value in that.

Copyright © 2017 Brian Leonard