(This is part of a larger series on scaling autonomous components. If you like, you can start at the beginning.)
Yesterday we went through Message DB’s capabilities for supporting horizontally scaled consumers. If you want to write your own Message DB client, that post will help you understand how to enable horizontal scaling.
Today we go through how to use The Eventide Project to leverage those capabilities.
Starting your consumer
In a typical Eventide component, you’ll have a start.rb file in your component’s folder. For example, here is the account-component’s:
```ruby
module AccountComponent
  module Start
    def self.call
      Consumers::Commands.start('account:command')
      Consumers::Commands::Transactions.start('accountTransaction')
    end
  end
end
```
Those start calls can take additional arguments. Three of them are interesting to us right now, namely group_member, group_size, and identifier.
group_member and group_size are passed directly as the consumer_group_member and consumer_group_size arguments to Message DB’s get_category_messages function, which was the main topic of yesterday’s missive.
So if you have 3 total instances, you could start instance 0 as follows (the module lines are omitted for space):
```ruby
def self.call
  Consumers::Commands.start('account:command', group_member: 0, group_size: 3)
  Consumers::Commands::Transactions.start('accountTransaction', group_member: 0, group_size: 3)
end
```
You would do the same thing for instances 1 and 2; only the group_member value would change for each of them.
If you remember from yesterday, an instance gets the messages where the hash of the stream_name (strictly, of the stream’s cardinal ID) modulo group_size is equal to that instance’s group_member. With all this combined, hopefully a couple of things stand out:
- All the instances need to agree on the group_size, and
- If the group_size changes, then a given instance is likely to become responsible for streams it wasn’t responsible for before.
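To see why the second point matters, here is a minimal sketch of the assignment rule in Ruby. It assumes a stand-in hash: Message DB computes its own 64-bit hash of the stream’s cardinal ID inside Postgres, while this sketch uses Zlib.crc32 only so it runs on its own, so the specific member numbers will not match Message DB’s. The helper names stand_in_hash, cardinal_id, and member_for are hypothetical.

```ruby
require 'zlib'

# Hypothetical stand-in: Message DB hashes the cardinal ID with its own
# 64-bit hash in Postgres. CRC32 is used only so this sketch runs anywhere.
def stand_in_hash(value)
  Zlib.crc32(value)
end

# The ID is everything after the first '-'; the cardinal ID is the part of
# the ID before any '+' (e.g. "account-123+abc" has cardinal ID "123").
def cardinal_id(stream_name)
  id = stream_name.split('-', 2)[1]
  id&.split('+', 2)&.first
end

# An instance handles a stream when hash(cardinal ID) mod group_size
# equals its group_member.
def member_for(stream_name, group_size)
  stand_in_hash(cardinal_id(stream_name)) % group_size
end

streams = (1..1000).map { |n| "account-#{n}" }

moved = streams.count { |stream| member_for(stream, 3) != member_for(stream, 4) }
puts "#{moved} of #{streams.size} streams change group_member when group_size goes from 3 to 4"
```

If the hash is well distributed, a stream keeps the same group_member when the size goes from 3 to 4 only when its hash leaves the same remainder modulo 3 and modulo 4. That holds for 3 of the 12 possible remainders modulo 12 (0, 1, and 2), so roughly three quarters of the streams change hands.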
You can handle the first point easily enough by supplying the group_size to every instance as an environment variable. You can choose another means, of course, but that’s one way to handle it.
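One way to apply the environment-variable approach is sketched below. The helper name consumer_group_settings is hypothetical, and the variable names group_size and group_member simply mirror the ones used later in this post. It converts the values to integers (ENV only holds strings) and refuses to start with a group_member outside 0...group_size, since such an instance would never receive any messages.

```ruby
# Hypothetical helper: reads the consumer group settings from the environment
# and checks them before any consumer is started.
def consumer_group_settings(env = ENV)
  # fetch raises KeyError if a variable is missing; Integer raises
  # ArgumentError if the value is not a whole number.
  group_size = Integer(env.fetch('group_size'))
  group_member = Integer(env.fetch('group_member'))

  unless group_size.positive? && (0...group_size).cover?(group_member)
    raise ArgumentError,
          "group_member must be in 0...#{group_size}, got #{group_member}"
  end

  { group_size: group_size, group_member: group_member }
end

# Usage with an explicit hash instead of the real ENV:
settings = consumer_group_settings('group_size' => '3', 'group_member' => '0')
p settings
```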
This second point leads into the third parameter that matters, identifier.
identifier affects where consumer positions are stored
Eventide consumers periodically make a record of the last message they’ve processed. By default, this is every 100 messages (incidentally controlled by another one of the arguments you pass to start).
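The cost of recording positions only periodically can be modeled with a small simulation. This is a hypothetical sketch, not Eventide’s implementation: the run method and its interval parameter are invented for illustration, and it pretends the global positions are simply 1 through 250.

```ruby
# Hypothetical model, not Eventide's implementation: record the last
# processed position after every `interval` handled messages.
POSITION_UPDATE_INTERVAL = 100

def run(positions, start_after:, interval: POSITION_UPDATE_INTERVAL)
  recorded = start_after
  handled = []

  positions.each do |position|
    next if position <= start_after # already covered by the recorded position

    handled << position
    recorded = position if (handled.size % interval).zero?
  end

  { handled: handled, recorded: recorded }
end

messages = (1..250).to_a

first = run(messages, start_after: 0)
# Suppose the process dies here: only first[:recorded] survives.
second = run(messages, start_after: first[:recorded])

reprocessed = first[:handled] & second[:handled]
puts "recorded position: #{first[:recorded]}"      # recorded position: 200
puts "reprocessed on restart: #{reprocessed.size}" # reprocessed on restart: 50
```

With an interval of 100, a restart can repeat up to 99 messages, so handlers have to tolerate seeing a message more than once.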
Also by default, components store these positions in streams. In the account-component example, using Consumers::Commands.start('account:command') to start a consumer results in a position stream named account:command+position. It takes the consumer’s category and adds the position type modifier.
Adding group_member and group_size does not change this!
If we started all three of our instances as written above, they’d all write their position updates to the same stream. In so doing, they would clobber one another and miss messages. Instance 0 might say it processed through global_position 300, and instance 1 could pick that up as its starting point on a restart, even though it had only processed through global_position 200. They’re all looking in the same place for their starting point.
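The clobbering problem can be modeled in a few lines. This sketch is hypothetical: a Ruby Hash stands in for the position streams, and odd global positions stand in for the messages that hash to instance 1.

```ruby
# Hypothetical stand-in for position streams: stream name => last recorded
# global position. It models the problem only; it is not Eventide's API.
positions = {}
shared_stream = 'account:command+position'

# Pretend odd global positions belong to instance 1 (a stand-in for hashing).
instance_1_messages = (1..300).select(&:odd?)

positions[shared_stream] = 200 # instance 1 has handled its messages through 200
positions[shared_stream] = 300 # instance 0 then records 300, overwriting it

# Instance 1 restarts and reads "its" starting point from the shared stream.
resume_after = positions.fetch(shared_stream, 0)

missed = instance_1_messages.select { |p| p > 200 && p <= resume_after }
puts "instance 1 resumes after #{resume_after} and misses #{missed.size} of its messages"
```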
Enter the identifier parameter. It will be added to the position stream’s name as, wait for it, the identifier portion of the stream_name. If you started a consumer with Consumers::Commands.start('account:command', identifier: 'hello'), position updates would go to the account:command+position-hello stream.
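The naming rule described above can be written down as a small function. This is a hypothetical re-creation for illustration (position_stream_name is not an Eventide method); it only encodes the examples in this post: append a position type to the category, using + if the category already has a type and : otherwise, then append -identifier when one is given.

```ruby
# Hypothetical helper (not an Eventide API) encoding the naming examples above:
# <category><+ or :>position[-<identifier>]
def position_stream_name(category, identifier = nil)
  # A category that already has a type (e.g. "account:command") gets "+";
  # a plain category (e.g. "accountTransaction") gets ":".
  type_separator = category.include?(':') ? '+' : ':'
  name = "#{category}#{type_separator}position"
  identifier.nil? ? name : "#{name}-#{identifier}"
end

puts position_stream_name('account:command')          # account:command+position
puts position_stream_name('account:command', 'hello') # account:command+position-hello
puts position_stream_name('accountTransaction')       # accountTransaction:position
```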
Bringing it together
Okay, so what do we put in our identifier? Each of our instances needs its position updates to go to a different stream. So a starting point might be:
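```ruby
def self.call
  # ENV values are strings, so convert them to integers for the consumer group
  group_size = Integer(ENV.fetch('group_size'))
  group_member = Integer(ENV.fetch('group_member'))
  identifier = group_member.to_s

  # The `key:` shorthand below requires Ruby 3.1 or newer
  Consumers::Commands.start('account:command', group_member:, group_size:, identifier:)
  Consumers::Commands::Transactions.start('accountTransaction', group_member:, group_size:, identifier:)
end
```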
We’re just passing the group_member as the identifier. This gives account:command+position-0 as the position stream for instance 0.
Provided your group_size never changes, this would work, but refer back to the bulleted list above, particularly that second point. If the group size changes, then a consumer becomes responsible for different streams. We really need them to start over to make sure they didn’t miss anything. So we want the group_size to be part of the identifier.
```ruby
def self.call
  group_size = Integer(ENV.fetch('group_size'))
  group_member = Integer(ENV.fetch('group_member'))
  # Include the group size so a resized group gets fresh position streams
  identifier = "#{group_size}+#{group_member}"

  Consumers::Commands.start('account:command', group_member:, group_size:, identifier:)
  Consumers::Commands::Transactions.start('accountTransaction', group_member:, group_size:, identifier:)
end
```
With our working example (a group_size of 3), this would give instance 0 in our consumer group the position streams account:command+position-3+0 and accountTransaction:position-3+0.
And just like that, if the group_size changes, the consumers pick up on it and use a position stream specific to both their member number and the group size. If you ever changed the size and then changed it back, it would still work, because we now have position streams dedicated to each group_member at each particular group_size. The consumers might reprocess some messages in that case, since each would resume from the position it recorded before the first change, but that’s acceptable: position updates are a performance optimization, not a mechanism for guaranteeing exactly-once semantics.
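To see why the group size belongs in the identifier, the sketch below compares both identifier schemes from this post for instance 0 while the group grows from 3 to 4 and then shrinks back. The method names are hypothetical, and the stream prefix follows the account:command+position naming used above.

```ruby
# Hypothetical comparison of the two identifier schemes in this post.
POSITION_PREFIX = 'account:command+position'

def member_only_identifier(_group_size, group_member)
  group_member.to_s
end

def size_and_member_identifier(group_size, group_member)
  "#{group_size}+#{group_member}"
end

sizes = [3, 4, 3] # grow the group, then shrink it back

member_only = sizes.map { |size| "#{POSITION_PREFIX}-#{member_only_identifier(size, 0)}" }
size_and_member = sizes.map { |size| "#{POSITION_PREFIX}-#{size_and_member_identifier(size, 0)}" }

sizes.each_with_index do |size, i|
  puts "size #{size}: #{member_only[i]} vs #{size_and_member[i]}"
end
```

Under the member-only scheme, member 0 keeps resuming from one stream even though the set of streams it owns changed, which is how messages get skipped. Under the size-and-member scheme, returning to a size of 3 reuses account:command+position-3+0, so member 0 resumes from an older position and reprocesses messages rather than missing them.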
For next time
At least 2 questions remain open at this time.
The first is how you would supply group_size and group_member to your instances. We’ll look at how you might handle that with Kubernetes tomorrow, although Kubernetes is by no means required to deploy Eventide components. I just think it’s interesting, and this is my list 🤪.
The second is to call out that there are other uses for the identifier parameter, so please don’t take from today’s post that you can only use it for horizontal scaling.
Until next time, happy coding.