Horizontally scaling consumers with Message DB

Yesterday we went through why scaling a stateful component isn’t as simple as just starting another instance of it: the instances will end up trying to do the same work.

Today we go through how to get more than one instance working on different work with Message DB. The Eventide documentation refers to this as Consumer Groups.

While it is very possible to have additional instances working together, it is critical that all the messages in a given stream are processed by the same instance. There are things in computer programming that are art, where there’s room for opinion and nuance. This is not one of them.

For example, stream account:command-123 must be processed by the same instance. Otherwise, commands for account 123 could be processed out of order, and whether a deposit lands before a withdrawal matters a great deal when tracking an account balance.

So that’s the guiding principle: all messages with the same stream_name have to go through the same instance. So, we need to find some way of predictably partitioning the possible stream_name space.

We could just take the first character of the identifier portion of a stream_name, the first character to the right of the first dash. That would require running one instance per possible character. In the case of a UUID, whose identifiers are hexadecimal, we’d need a minimum of 16 instances. Surely we can do better.
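That naive approach can be sketched in a few lines of Python (the stream name here is just an illustration):

```python
def naive_partition(stream_name: str) -> str:
    """Route by the first character of the stream's identifier,
    i.e. the first character to the right of the first dash."""
    identifier = stream_name.split('-', 1)[1]
    return identifier[0]

print(naive_partition('account:command-123'))  # '1'
```

With hexadecimal UUID identifiers, that first character has 16 possible values, so we’d be stuck running 16 instances.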

Modulo to the rescue

Enter modulo, the operation that gives you the remainder of a division operation. Classic programming 101 stuff uses modulo to tell if an integer is even or odd. if x % 2 equals 0 (% being the modulo operator in several languages), a number is even. Otherwise it’s odd.

Modulo also helps in partitioning spaces beyond even and odd. If we knew how many instances of a component we were running—which we do because we’re not auto-scaling—and we had a stable identifier for each instance—which we can have because we write the code—we have the makings of a solution.

Even/odd partitions numbers into two separate groups, and that’s why we wrote the check as x % 2, with group 0 being even and group 1 being odd. So the general check is x % <group size> == <group member>. If we could turn our stream_name into that x, we’d have everything we need.
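That general check can be written out as a tiny Python sketch, just to show that the groups it produces are stable and disjoint:

```python
def belongs_to(x: int, group_size: int, group_member: int) -> bool:
    """A number x belongs to group member m out of group_size members
    exactly when x % group_size == m."""
    return x % group_size == group_member

# Three instances split the numbers 0..8 into stable, disjoint groups.
groups = {m: [x for x in range(9) if belongs_to(x, 3, m)] for m in range(3)}
print(groups)  # {0: [0, 3, 6], 1: [1, 4, 7], 2: [2, 5, 8]}
```

Every number lands in exactly one group, which is exactly the property we need for streams.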

Did you know hashes are numbers?

You’ve probably seen text representations of hashes, but hashes, like anything in a computer, are just numbers. If you concatenated all of Microsoft Windows’s code, you’d just have a single number. A rather large one, to be sure, but still a single number.

So, we could hash the stream_name column to get our x. Combined with knowledge of how many instances we’re running and which member a given instance is, we can tell whether a given instance should process a given message.
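Here’s a sketch of that idea in Python. Note that md5 is purely a stand-in: Message DB uses its own hash_64 function server-side, so this is the shape of the technique, not the actual hash.

```python
import hashlib

def stream_hash(stream_name: str) -> int:
    """Turn a stream name into a stable integer.
    (Illustrative stand-in for Message DB's hash_64.)"""
    digest = hashlib.md5(stream_name.encode('utf-8')).digest()
    return int.from_bytes(digest[:8], byteorder='big')

# The same stream name always hashes to the same x, so it always
# maps to the same consumer group member.
group_size = 2
x = stream_hash('account:command-123')
print(x % group_size)  # the member id responsible for this stream
```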

This is exactly how Message DB handles it. Here is the signature of the function used to retrieve messages for processing:

CREATE OR REPLACE FUNCTION message_store.get_category_messages(
  category varchar,
  "position" bigint DEFAULT 1,
  batch_size bigint DEFAULT 1000,
  correlation varchar DEFAULT NULL,
  consumer_group_member bigint DEFAULT NULL,
  consumer_group_size bigint DEFAULT NULL,
  condition varchar DEFAULT NULL
)

Notice the consumer_group_member and consumer_group_size parameters. If you use one, you must use the other, and consumer_group_member must be less than consumer_group_size. The reason for that is, well, I’m going to assume you can count. consumer_group_size must be greater than 1, and consumer_group_member must be greater than or equal to 0. Remember that x mod y gives a result from 0 to y minus 1.
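Those preconditions can be spelled out as a small Python sketch (a hypothetical helper mirroring the constraints described above, not part of Message DB):

```python
def validate_consumer_group(group_member, group_size):
    """Enforce the consumer group preconditions described above."""
    # If you use one parameter, you must use the other.
    if (group_member is None) != (group_size is None):
        raise ValueError('member and size must be supplied together')
    if group_size is not None:
        if group_size <= 1:
            raise ValueError('group size must be greater than 1')
        if not 0 <= group_member < group_size:
            raise ValueError('member must be in the range 0 to size - 1')
```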

Later in the body of this function, assuming those preconditions hold, get_category_messages hashes the stream_name and adds a check to the query: does modding that hash by consumer_group_size yield the given consumer_group_member?

_command := _command || ' AND
      MOD(@hash_64(cardinal_id(stream_name)), $6) = $5';

This function returns to each consumer only the messages that it has the right to process.
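The effect of that predicate can be sketched in Python: each group member sees a disjoint slice of the streams, and together the members cover all of them. (Again, md5 is an illustrative stand-in for Message DB’s hash_64.)

```python
import hashlib

def stream_hash(stream_name: str) -> int:
    """Illustrative stand-in for Message DB's hash_64."""
    return int.from_bytes(hashlib.md5(stream_name.encode()).digest()[:8], 'big')

def streams_for_member(streams, group_size, group_member):
    """Equivalent of: MOD(hash(stream_name), group_size) = group_member."""
    return [s for s in streams
            if stream_hash(s) % group_size == group_member]

streams = [f'account:command-{n}' for n in range(10)]
partitions = [streams_for_member(streams, 3, m) for m in range(3)]
```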

What about hash collisions?

Even if a hash collision occurred here, that wouldn’t be an issue. We’re not trying to say that each consumer must process only a single stream. We’re saying that a single stream must be processed by a single consumer. So, if there’s a collision, and two streams go to the same consumer, that’s fine. A given consumer is already processing more than one stream.

What if the consumer_group_size changes?

You’re right to notice this. It would mean that a stream that was being processed by one instance could start getting processed by a different instance. From the Message DB perspective this is a non-issue, provided that all the instances restart together. You then won’t have different instances processing the same messages at the same time.

From the Eventide perspective, this does require some care, and since this email is pushing the boundary of our 2-minute-read promise, we’ll save that for tomorrow.


Like this message? I send out a short email each day to help software development leaders build organizations that deliver value. Join us!


Get the book!

Ready to learn how to build an autonomous, event-sourced microservices-based system? Practical Microservices is the hands-on guidance you've been looking for.

Roll up your sleeves and get ready to build Video Tutorials, the next-gen web-based learning platform. You'll build it as a collection of loosely-coupled autonomous services, developing a message store interface along the way.

When you're done, you'll be ready to contribute to microservices-based projects.

In ebook or in print.