Horizontally scaling consumers with Eventide

(This is part of a larger series on scaling autonomous components. If you like, you can start at the beginning.)

Yesterday we went through Message DB’s capabilities for supporting horizontally scaled consumers. If you want to write your own Message DB client, that will help you understand how to enable horizontal scaling.

Today we go through how to use The Eventide Project to leverage those capabilities.

Starting your consumer

In a typical Eventide component, you’ll have a start.rb file in your component’s folder. For example, here is the account-component’s:

module AccountComponent
  module Start
    def self.call
      Consumers::Commands.start('account:command')
      Consumers::Commands::Transactions.start('accountTransaction')
    end
  end
end

Those start calls can take additional arguments. Three of them are interesting to us right now, namely group_member, group_size, and identifier.

group_member and group_size are passed directly as the consumer_group_member and consumer_group_size arguments to Message DB’s get_category_messages function, which was the main topic of yesterday’s missive.

So if you have 3 total instances, you could start instance 0 as follows (the module lines are omitted for space):

def self.call
  Consumers::Commands.start('account:command', group_member: 0, group_size: 3)
  Consumers::Commands::Transactions.start('accountTransaction', group_member: 0, group_size: 3)
end

You would do the same thing with instances 1 and 2, only the group_member value would change accordingly for those instances.

If you remember from yesterday, an instance gets the messages where the hash of the stream_name modulo group_size is equal to that instance’s group_member. With all this combined, hopefully a couple of things stand out:

  1. All the instances need to agree on the group_size, and
  2. If the group_size changes, then a given instance is likely to become responsible for streams it wasn’t responsible for before
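
To make the second point concrete, here is a minimal sketch of modulo-based assignment. The group_member_for helper is hypothetical, and it uses CRC32 purely as a stand-in hash; Message DB computes its own hash inside the database, so the actual assignments will differ. The shape of the problem is the same, though: change the divisor and the owners are likely to shift.

```ruby
require 'zlib'

# Hypothetical stand-in for Message DB's server-side hash. CRC32 is used
# here only because it is deterministic and ships with Ruby.
def group_member_for(stream_name, group_size)
  Zlib.crc32(stream_name) % group_size
end

streams = (1..8).map { |n| "account:command-#{n}" }

# Which member owns each stream with 3 members, and then with 4
with_three = streams.to_h { |s| [s, group_member_for(s, 3)] }
with_four  = streams.to_h { |s| [s, group_member_for(s, 4)] }

# Streams whose owner differs between the two group sizes
moved = streams.reject { |s| with_three[s] == with_four[s] }

puts "Owner changed for #{moved.size} of #{streams.size} streams"
```

Any stream that lands in moved is one that some instance was not responsible for before the resize.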

You can handle the first point easily enough by supplying the group_size as an environment variable. Other means would work too, of course, but that’s one way to handle it.
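
As a rough sketch of the environment variable approach, the hypothetical helper below reads the settings and fails fast if they don’t make sense. The variable names group_size and group_member are assumptions, as is the choice to read the member number the same way. Environment values always arrive as strings, so the helper converts them to integers.

```ruby
# Hypothetical helper, not part of Eventide. It reads consumer group
# settings from ENV (or any hash-like object, which makes it easy to test).
def consumer_group_settings(env = ENV)
  # ENV values are strings; Integer() raises if they are not numeric
  group_size = Integer(env.fetch('group_size'))
  group_member = Integer(env.fetch('group_member'))

  raise ArgumentError, 'group_size must be positive' unless group_size.positive?

  unless (0...group_size).cover?(group_member)
    raise ArgumentError, "group_member must be between 0 and #{group_size - 1}"
  end

  { group_size: group_size, group_member: group_member }
end

settings = consumer_group_settings({ 'group_size' => '3', 'group_member' => '0' })
puts settings[:group_size]   # 3
puts settings[:group_member] # 0
```

Validating up front means a misconfigured instance stops at startup rather than quietly reading the wrong slice of streams.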

This second point leads into the third parameter that matters, identifier.

identifier affects where consumer positions are stored

Eventide consumers periodically make a record of the last message they’ve processed. By default, this is every 100 messages (incidentally controlled by another one of the arguments you pass to start).

Also by default, components store these positions in streams. In the account-component example, using Consumers::Commands.start('account:command') to start a consumer results in a position stream named account:command+position. It takes the consumer’s category, and adds the position type modifier.

Adding group_member and group_size does not change this!

If we started all three of our instances as written above, they’d all write their position updates to the same stream. In so doing, they would clobber one another and miss messages. Instance 0 might say it processed through global_position 300, and instance 1 could pick that up as its starting point on a restart, even though it had only processed through global_position 200. They’re looking in the same place for their starting point.
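
Here is that scenario as a tiny simulation. The hash is a hypothetical in-memory stand-in for position storage, not how Eventide actually stores positions; it only shows what happens when two instances share one position stream.

```ruby
# Hypothetical in-memory stand-in for position storage, keyed by stream name
positions = {}
shared_stream = 'account:command+position'

# Instance 0 records that it has processed through global_position 300
positions[shared_stream] = 300

# Instance 1 has only processed through 200, but on restart it reads the
# shared stream and resumes from instance 0's recorded position
instance_1_processed = 200
instance_1_resume = positions[shared_stream]

skipped = instance_1_resume - instance_1_processed
puts "Instance 1 resumes after #{instance_1_resume}, skipping up to #{skipped} positions"
```

Any of instance 1’s messages that fall in that skipped range never get handled.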

Enter the identifier parameter. It will be added to the position stream’s name as, wait for it, the identifier portion of the stream_name. If you started a consumer with Consumers::Commands.start('account:command', identifier: 'hello'), position updates would go to the account:command+position-hello stream.
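
The naming rule can be sketched in a few lines. The position_stream_name helper below is hypothetical and is not Eventide’s API; it only mirrors how the category, the position type, and the identifier combine in the examples in this message.

```ruby
# Hypothetical helper that mirrors the naming rule described above
def position_stream_name(category, identifier = nil)
  # A category that already has a type (the part after ':') gets
  # '+position'; otherwise 'position' becomes its first type
  separator = category.include?(':') ? '+' : ':'
  name = "#{category}#{separator}position"

  identifier ? "#{name}-#{identifier}" : name
end

puts position_stream_name('account:command')             # account:command+position
puts position_stream_name('account:command', 'hello')    # account:command+position-hello
puts position_stream_name('accountTransaction', 'hello') # accountTransaction:position-hello
```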

Bringing it together

Okay, so what do we put in our identifier? Each of our instances needs its position updates to go to a different stream. So a starting point might be:

def self.call
  # ENV values are strings, so convert them to integers before passing them on
  group_size = Integer(ENV['group_size'])
  group_member = Integer(ENV['group_member'])
  identifier = group_member.to_s

  Consumers::Commands.start('account:command', group_member:, group_size:, identifier:)
  Consumers::Commands::Transactions.start('accountTransaction', group_member:, group_size:, identifier:)
end

We’re just passing the group_member as the identifier. This gives account:command+position-0 as the position stream for instance 0.

Provided your group_size never changes, this would work, but refer back to the bulleted list above, particularly that second point. If the group size changes, then a consumer becomes responsible for different streams. We really need them to start over to make sure they didn’t miss anything. So we want the group_size to be part of the identifier.

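def self.call
  # ENV values are strings, so convert them to integers before passing them on
  group_size = Integer(ENV['group_size'])
  group_member = Integer(ENV['group_member'])
  # Including group_size means a resized group starts on fresh position streams
  identifier = "#{group_size}+#{group_member}"

  Consumers::Commands.start('account:command', group_member:, group_size:, identifier:)
  Consumers::Commands::Transactions.start('accountTransaction', group_member:, group_size:, identifier:)
end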

With our working example of 3 instances, this would give position streams account:command+position-3+0 and accountTransaction:position-3+0 to instance 0 in our consumer group.

And just like that, if the group_size changes, the consumers will pick that up and use a stream specific to their member number and group size for position updates. If you ever changed the size and then changed it back, it would still work because we now have position streams dedicated to group_members at particular group_sizes. They might reprocess some messages in this case, but these position updates are a performance optimization and are not used to guarantee exactly-once semantics.
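
A quick sketch shows why resizing can’t collide with old positions. The position_identifier helper is hypothetical and simply reproduces the group_size+group_member format used in this message.

```ruby
# Hypothetical helper matching the identifier format used above
def position_identifier(group_size, group_member)
  "#{group_size}+#{group_member}"
end

ids_for_three = (0...3).map { |member| position_identifier(3, member) }
ids_for_four  = (0...4).map { |member| position_identifier(4, member) }

p ids_for_three # ["3+0", "3+1", "3+2"]
p ids_for_four  # ["4+0", "4+1", "4+2", "4+3"]

# No identifier is shared between the two group sizes
puts((ids_for_three & ids_for_four).empty?) # true
```

Going from 3 to 4 and back to 3 reuses the original three identifiers, which is why the earlier positions are still there to resume from.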

For next time

At least 2 questions remain open at this time.

The first is how you would supply group_size and group_member to your instances. We’ll look at how you might handle that with Kubernetes tomorrow, although by no means is Kubernetes required to deploy Eventide components. I just think it’s interesting, and this is my list 🤪.

The second is to call out that there are other uses for the identifier parameter, so please don’t take from today’s message that you can only use it for horizontal scaling.

Until next time, happy coding.

