BlazingMQ: Clustering and Message Flow

3 Jul 2024 at 9:08AM in Software
 | 
Photo by Andy Pearce (via Midjourney)
 | 

BlazingMQ is a message queue middleware system developed at Bloomberg which was open sourced around a year ago. I think it has a lot of very useful features for some use-cases, so this series of articles looks at it in a bit more detail. Having looked at an overview of the features in the previous article, I’m going to drill into clustering and take a high-level look at the way messages flow through the system.

This is the 2nd of the 2 articles that currently make up the “BlazingMQ” series.

flaming mq

In the previous article in this series, I took a first look at BlazingMQ and ran through the primary features. If you haven’t read that one already, I strongly suggest you at least skim it, because this article might be a little trickier to understand without the concepts I covered in it.

In this article I’m going to take a look at some of the slightly more detailed aspects, such as clustering of brokers and the way that messages flow through the system. First let’s look at how you might typically deploy BlazingMQ brokers in a cluster.

Clustering

As with a lot of middleware solutions, resilience is a primary requirement of BlazingMQ. As a result, it’s typically run in a cluster where multiple machines all host their own copy of the broker. Some sort of clustering or automated failover to another machine is essential, or the system will be vulnerable to the failure of a single machine.

BlazingMQ is typically deployed in a cluster of 3-7 machines, with sizes outside this possible but not recommended. The assumption is that this cluster is a set of machines separate from those on which the applications using BlazingMQ are running, and the documentation recommends 5 machines as an optimal cluster size.

In the rest of this section we’ll look in more detail at how this clustering works.

Queue Coordination and Storage

Each queue is allocated to one of the nodes in the cluster, and this is known as the primary node for that queue. The primary node is responsible for managing the queue and replicating data for it to other nodes—all the other nodes are replicas for that queue. We’ll see how queues get assigned to primary nodes later in this article.

Applications, whether producers or consumers, can connect to any node—they do not need to be connected to the primary. The replica to which a producer is connected will forward messages to the primary node, and the primary will forward messages to any replica which has a consumer for that queue connected.

BlazingMQ Cluster

A queue is also assigned to a storage shard, which is also known as a partition. There are typically many queues per shard, and per-shard is the granularity that’s assigned to primary nodes—each shard has exactly one primary node, and each node is primary for zero or more shards.

Every node has a copy of every storage shard, but the primary node for a shard is the only one which ever modifies it. All the nodes other than the primary just receive a constantly updated copy of the shard from the primary, whilst concurrently sending copies of any shard for which they are the primary to other nodes.

The shards are all replicated in isolation from each other using a stream of updates, each of which has a sequence number so replicas can detect reorderings or gaps in the stream and make sure their copy of the shard is complete and up to date. However, nodes only maintain a single TCP connection with each other, so updates for all shards are performed across this single connection.
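To make the gap detection a little more concrete, here’s a minimal sketch of how a replica might track sequence numbers for a single shard. The class and method names are my own invention for illustration, not BlazingMQ’s actual code, and a real broker would trigger some form of resynchronisation rather than just returning a string.

```python
from dataclasses import dataclass, field


@dataclass
class ShardReplica:
    """Hypothetical replica-side view of one shard's update stream."""

    next_seq: int = 1                      # sequence number we expect next
    applied: list = field(default_factory=list)

    def on_update(self, seq: int, update: bytes) -> str:
        if seq == self.next_seq:
            # In-order update: apply it and advance.
            self.applied.append(update)
            self.next_seq += 1
            return "applied"
        if seq < self.next_seq:
            # Already applied, so this is a repeat or a reordering.
            return "stale"
        # A higher number than expected means we missed something.
        return "gap"


replica = ShardReplica()
results = [replica.on_update(seq, b"update") for seq in (1, 2, 4, 2, 3)]
```

Because each shard has its own stream, each replica would hold one of these per shard, even though all the updates arrive over the same TCP connection.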

The number of shards is statically configured for a given BlazingMQ cluster, as is the storage capacity of each shard. These values must always be consistent between all nodes, and changing them requires all the brokers to be restarted. Selecting the number and size of shards requires some advance knowledge of the total number and capacity of queues that this cluster is expected to handle.

One consequence of the fact that every node has a copy of every storage shard is that adding new nodes improves the resilience and availability, and may improve the ability to handle a big fan-out, but doesn’t actually improve the storage capacity of the cluster.

Storage Shards

A shard is represented on disk by two files:

Data file
Contains the payloads of messages stored on queues in the shard. It takes the form of a series of records of variable length, each representing the payload of one message. Records are addressed by their offset within this file.
Journal file
Contains a journal of events, such as the addition of a message to a queue, confirmation of a message by a consumer, creation or deletion of a queue, and so on. These records are fixed-length structures, each being 60 bytes. For records pertaining to messages, the offset of the payload within the data file is stored within the record.

So when a message arrives from a producer, the sequence of events is:

  1. Route the message to the primary node for the queue in question.
  2. The message is handed to the thread or subsystem handling the appropriate shard.
  3. Append the message payload to the data file, recording the offset.
  4. Append a MessageRecord to the journal file, containing the offset in the data file.
  5. At some point soon after, the shard is replicated to other nodes, which all update their copies of the data and journal files for that shard.
  6. The primary node sends acknowledgement back to the producer that the message is accepted, potentially after waiting for confirmation of replication if strong consistency has been chosen for the domain.
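Steps 3 and 4 above can be sketched with a pair of in-memory files. The only details taken from BlazingMQ here are that journal records are a fixed 60 bytes and that a message record stores the payload’s offset in the data file. The field layout, the record type constant and the decision to store the payload length in the journal are all assumptions of mine to keep the example self-contained.

```python
import io
import struct
import uuid
import zlib

# Hypothetical layout: type, GUID, data-file offset, payload length,
# checksum, then padding to bring the record up to 60 bytes.
JOURNAL_RECORD = struct.Struct("<B16sQQI23x")
MESSAGE_RECORD = 1


class Shard:
    def __init__(self):
        self.data = io.BytesIO()      # stands in for the data file
        self.journal = io.BytesIO()   # stands in for the journal file

    def append_message(self, guid: bytes, payload: bytes) -> int:
        # Step 3: append the payload to the data file, noting its offset.
        offset = self.data.seek(0, io.SEEK_END)
        self.data.write(payload)
        # Step 4: append a fixed-size record pointing at that offset.
        self.journal.seek(0, io.SEEK_END)
        self.journal.write(JOURNAL_RECORD.pack(
            MESSAGE_RECORD, guid, offset, len(payload), zlib.crc32(payload)))
        return offset

    def read_message(self, index: int) -> bytes:
        # Fixed-size records make the journal trivially indexable.
        self.journal.seek(index * JOURNAL_RECORD.size)
        _, _, offset, length, _ = JOURNAL_RECORD.unpack(
            self.journal.read(JOURNAL_RECORD.size))
        self.data.seek(offset)
        return self.data.read(length)


shard = Shard()
first = shard.append_message(uuid.uuid4().bytes, b"hello")
second = shard.append_message(uuid.uuid4().bytes, b"world!")
```

The appeal of this split is that the journal stays small and regular, so scanning it is cheap, whilst the bulky variable-length payloads live elsewhere.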

The types of records which can reside in the journal file are:

MessageRecord
Represents a message on the queue, and contains fields such as the message GUID, queue name, arrival time, checksum, and the offset of the payload in the data file.
ConfirmRecord
Represents a confirmation from a consumer, and contains fields such as the consumer ID, message GUID, queue name, and the time that the confirmation was received.
DeletionRecord
Represents the deletion of a message from the queue, and contains fields that identify the message, and also the reason for the deletion. Commonly the reason will be that it was confirmed by all applicable consumers, but it could also be because the message exceeded its TTL or it reached its retransmission limit for poison pill detection.
QueueOpRecord
Represents a change to the set of queues in this shard, such as a queue addition, deletion or modification.
JournalOpRecord
This is a meta record which contains information about the primary node and the replication process. This is used for things like validating the replication process.

Cluster Coordination

Now that we’ve seen how queues are stored, and how their replication is managed by the primary node for their storage shard, we’ll take a step up and see how queues are assigned to storage shards and shards are assigned to primary nodes.

Clusters use a quorum-based approach where one of the nodes is elected a leader, with the rest of the nodes being followers. The leader maintains cluster level state and replicates it to the followers, and also decides which nodes should be primary for which storage shards.

The leader/follower status of a node is unrelated to being the primary for any shards, and the assignment strategy can be controlled with the masterAssignment configuration setting—a value of E_LEADER_IS_MASTER_ALL means the leader will make itself primary for all shards, whereas E_LEAST_ASSIGNED tries to balance shards equally across the whole cluster.
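As a rough illustration of the difference between the two strategies, here’s a sketch of how a leader might compute the shard-to-primary mapping. The two strategy names come from the masterAssignment setting, but the function itself, its arguments and the tie-breaking behaviour are hypothetical and not taken from the broker’s code.

```python
def assign_primaries(shards, nodes, leader, strategy):
    """Return a dict mapping each shard to its primary node."""
    if strategy == "E_LEADER_IS_MASTER_ALL":
        # The leader simply takes every shard for itself.
        return {shard: leader for shard in shards}
    if strategy == "E_LEAST_ASSIGNED":
        # Hand each shard to whichever node currently has the fewest.
        counts = {node: 0 for node in nodes}
        assignment = {}
        for shard in shards:
            node = min(nodes, key=lambda n: counts[n])
            assignment[shard] = node
            counts[node] += 1
        return assignment
    raise ValueError(f"unknown strategy: {strategy}")


nodes = ["A", "B", "C"]
balanced = assign_primaries(range(4), nodes, "B", "E_LEAST_ASSIGNED")
leader_only = assign_primaries(range(4), nodes, "B", "E_LEADER_IS_MASTER_ALL")
```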

The cluster-level state that the leader owns and replicates comprises at least the following:

  • Cluster membership and health status
  • A list of the queues currently hosted across all shards
  • Details about the current set of storage shards across the cluster
  • The current mapping of storage shard to primary node
  • The current mapping of queues to storage shards

The number of shards is known in advance, since that’s fixed by the cluster configuration and cannot change at runtime, as mentioned in the previous section. So once the leader is elected it can immediately assign shards to nodes, and it can keep this mapping maintained as nodes leave and rejoin the cluster, ensuring that every shard has an active primary node.

Queues are not known in advance and are created dynamically, so the leader needs a strategy to assign them to shards. From what I can tell, queues are always assigned to the shard which has a primary node and the fewest current queues, which seems reasonable enough in practice. Just be aware of this if you’re planning, say, just a single queue with a huge amount of traffic: you’d be better off spreading your traffic across multiple queues to make best use of the capacity of the cluster.
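My reading of that queue assignment rule can be expressed in a few lines. To be clear, this is a sketch of my understanding rather than the broker’s actual logic, and the function and argument names are made up for the example.

```python
def pick_shard(queue_counts, primaries):
    """Choose a shard for a new queue.

    queue_counts maps shard -> number of queues currently on it,
    primaries maps shard -> primary node, or None if it has none.
    """
    candidates = [s for s in queue_counts if primaries.get(s) is not None]
    if not candidates:
        raise RuntimeError("no shard currently has a primary node")
    return min(candidates, key=lambda s: queue_counts[s])


chosen = pick_shard(
    queue_counts={0: 12, 1: 3, 2: 7},
    primaries={0: "A", 1: None, 2: "C"},
)
```

Note that the count is of queues, not of traffic, which is why one very busy queue can leave a shard, and hence its primary node, doing a disproportionate share of the work.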

To summarise all this, the diagram below attempts to convey the different responsibilities that we’ve covered in the last two sections. For simplicity I’ve just shown two storage shards, but in reality there would likely be more.

BlazingMQ Cluster Details

Leader Election

Now we know what the leader does, let’s see how one gets selected. Whenever a follower node detects the absence of a leader, which it does by missing a certain number of heartbeats, it triggers an election to select a new one. Leader election is an in-depth topic which I’m not going to dive into here, so I’ll just briefly describe the algorithm that BlazingMQ uses currently and a few brief critiques. The consensus algorithm used is inspired by, but not identical to, Raft.

The issue that overly simplistic leader election algorithms suffer is generally a lack of priority: you tend to have several nodes all triggering elections at similar times, and they have no way to differentiate themselves to reliably select a consistent leader. To resolve this, BlazingMQ’s algorithm uses a term counter, which is a monotonically increasing integer that’s tracked separately on each node and is used to settle ties between two or more candidates[1]. On every node the term counter starts at zero.

Bearing this in mind, the algorithm runs as follows:

  1. Any node detecting the absence of a leader waits a random time interval[2] in the order of a few seconds, and then increments its own term and sends out an ElectionProposal frame with this new term.
  2. When a peer receives the proposal:
    • It responds yes if the proposed term is higher than its own current term value, and also sets its own term value to the proposed value.
    • If not, it responds no and leaves its own term value unmodified.
  3. If the proposing node gets more yes votes than no votes, then it sets its own term to the proposed term and assumes leadership.
  4. The new leader starts sending periodic Heartbeat frames to all other peers, which contain its own term value.
  5. Whenever followers receive a Heartbeat frame, they confirm that the contained term value matches their own.
  6. If a configurable number of Heartbeat frames are missed, the node triggers a new election.
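The voting rules in steps 1 to 5 are simple enough to sketch in code. This toy version ignores the random delay, timeouts and networking entirely, and the class and function names are hypothetical rather than anything from BlazingMQ itself.

```python
class Node:
    """Hypothetical node tracking only the state the election needs."""

    def __init__(self, name):
        self.name = name
        self.term = 0          # every node starts at term zero
        self.leader = None

    def on_election_proposal(self, proposed_term):
        # Step 2: vote yes only for a strictly higher term, and adopt it.
        if proposed_term > self.term:
            self.term = proposed_term
            return True
        return False

    def on_heartbeat(self, leader_name, leader_term):
        # Step 5: only accept heartbeats whose term matches our own.
        if leader_term == self.term:
            self.leader = leader_name
            return True
        return False


def run_election(candidate, peers):
    # Step 1 (minus the random delay): bump our term and propose it.
    candidate.term += 1
    votes = [peer.on_election_proposal(candidate.term) for peer in peers]
    yes = sum(votes)
    # Step 3: more yes votes than no votes wins.
    if yes > len(votes) - yes:
        candidate.leader = candidate.name
        # Step 4: the first heartbeat announces the new leader.
        for peer in peers:
            peer.on_heartbeat(candidate.name, candidate.term)
        return True
    return False


a, b, c = Node("a"), Node("b"), Node("c")
won = run_election(a, [b, c])
```

After this runs, all three nodes are at term 1, so a competing proposal from another node which also uses term 1 would be rejected by everyone.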

The state of nodes during this process is summarised in the following state diagram:

Leader election state diagram

The leader can also combine its current term with another per-node monotonic counter to create a leader sequence number which can be used to create uniquely sequenced application messages. When a new leader is elected then the term will be higher, so the leader sequence numbers from the new leader will always be higher than those from the previous leader, which is helpful if the old leader has gone stale but is still sending messages to some peers.
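One simple way to picture a leader sequence number is as a pair of the term and the counter, compared term first. The sketch below uses a Python tuple for that, which is my own choice of representation and not necessarily how BlazingMQ encodes it.

```python
from itertools import count


class LeaderSequence:
    """Hypothetical generator of (term, counter) sequence numbers."""

    def __init__(self, term):
        self.term = term
        self._counter = count(1)

    def next(self):
        return (self.term, next(self._counter))


old_leader = LeaderSequence(term=3)
new_leader = LeaderSequence(term=4)

old_numbers = [old_leader.next() for _ in range(100)]
first_new = new_leader.next()
```

Tuples compare element by element, so the very first number from the new leader is greater than anything the old leader has issued, however far its counter had advanced.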

The algorithm above does have some issues. For example, if a node is excluded from a cluster temporarily then it will lose heartbeats and eventually become a new candidate with a higher term—when it rejoins the cluster it will usurp the leadership, leading to unnecessary leadership changes.

Leader election disconnection

Also, if a node gets partitioned away from the current leader but is still in contact with other nodes, this can also lead to issues: the existing leader will not see the new leader, but will have its heartbeats rejected due to its term being too low. This will cause it to trigger a new election, and leadership will keep alternating between the nodes that can’t see each other.

Leader election flip-flop

To mitigate these concerns, BlazingMQ has introduced an ElectionScouting request, which is sent with the term that would be used in a prospective ElectionProposal request. In other words, the value is the current term plus one, but the node doesn’t (yet) increment its actual term. Peers respond to this message with a yes response only if they are not aware of a current leader (of any term) and the proposed term value is greater than their own.

The addition of ElectionScouting allows nodes to non-disruptively check whether they’d have the support of a majority of the cluster in a leader election, so they only trigger one if they do. This does a good job of avoiding the scenarios of unnecessary leadership changes described above.
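Here’s a small sketch of the scouting check, using the rejoining-node scenario from earlier. The names are hypothetical, and I’ve assumed the same “more yes than no” rule as the election itself to decide whether there’s enough support.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Peer:
    term: int
    known_leader: Optional[str] = None

    def on_election_scouting(self, proposed_term: int) -> bool:
        # Yes only if we know of no leader at all AND the term is higher.
        # Crucially, nothing about the peer's state is modified here.
        return self.known_leader is None and proposed_term > self.term


def should_propose(own_term: int, peers) -> bool:
    proposed = own_term + 1   # what we would use, without committing to it
    yes = sum(peer.on_election_scouting(proposed) for peer in peers)
    return yes > len(peers) - yes


# A node rejoins with an inflated term, but its peers already have a leader.
healthy_peers = [Peer(term=3, known_leader="A"), Peer(term=3, known_leader="A")]
rejoin_disrupts = should_propose(own_term=5, peers=healthy_peers)

# The leader really has gone, so the peers are willing to support a change.
leaderless_peers = [Peer(term=3), Peer(term=3)]
genuine_election = should_propose(own_term=3, peers=leaderless_peers)
```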

Well, that feels like a deeper dive into the leader election system than I originally intended to write, but it’s quite critical to the performance and reliability of the cluster, so I think it’s useful to see how it works in some detail. In general it looks fairly robust to me, although I do wonder how well it would work with very high latency links between nodes, which might spoil the effect of the random delay before triggering an election. This delay is only 0-3 seconds, so a random latency above this could upset things. That said, if your nodes are connected with links whose latencies frequently stray into seconds, then you’ve probably got bigger problems around replication anyway.

Proxies

It’s also possible to run a BlazingMQ agent in a replica-only mode, where it doesn’t take on primary responsibility for any queues but only forwards messages to other agents—in this mode it’s known as a proxy. Proxies are entirely optional, and they do introduce an extra hop into message flows which can slightly increase latencies—however, they can be useful in some particular situations.

Proxies

Some of the ways in which proxies can be helpful are:

Hiding cluster changes
Applications can just connect to a local proxy and not worry about the details of the cluster itself. For example if the cluster goes away—let’s say all instances need to be upgraded and restarted—then the proxy can buffer messages and save applications from having to handle being disconnected.
Reduce TCP connections
If there are a large number of producers and consumers, that makes a lot of applications with active TCP connections to each node in the cluster. A local proxy only needs one connection to each node, so focusing many connections through a proxy can keep the total number of TCP connections manageable in these larger-scale cases.
One potentially subtle benefit of this is that in failure cases, each application having to handle reconnection individually potentially extends recovery time, not to mention the overhead of re-establishing a large number of connections. A single proxy only has to re-establish a small number of connections, however, which is faster and more efficient.
Local fan-out
In cases where you have a large fan-out ratio—particularly using broadcast mode—the cluster having to send a copy of the same message multiple times is quite inefficient. This overhead becomes more painful the further hosts are from each other—if consumers are geographically spread across data centres and there are a lot of them, this duplication could waste a lot of bandwidth.
If you use a proxy, however, the message is only sent once to the proxy and that performs the fan-out across local network infrastructure where traffic is faster and cheaper.

Proxies for fan out

My sense is that proxies are probably only worth using in larger deployments of BlazingMQ, with large numbers of producers and consumers expected. The nice thing is that you can easily set up a cluster which doesn’t use them but then add them later if the need arises as things scale or usage patterns change.

For those larger cases, however, this layered hierarchy allows almost unlimited scalability of the fan-out—the primary node can send through multiple replicas, each replica can send through multiple proxies, and actually it’s also possible to introduce further layers of proxies beyond this if required.

Large fan out ratio

Message Flow

Having looked in more detail at the physical architecture, now it’s time to shift our focus to the temporal and see the flow of events during message transmission.

There are four network frame types involved in message delivery:

  • A PUT frame is sent by the producer to create a new message on the queue, and is routed to the primary node for the destination queue.
  • An ACK frame is sent by the primary node back to the producer to confirm whether the message was accepted.
  • A PUSH frame is sent by the primary node towards the consumer to deliver a message to them.
  • A CONFIRM frame is sent back from the consumer to confirm that a message has been handled, and is routed to the primary node.

As mentioned earlier, all of these can go through proxies and replicas before being routed to their eventual destination. Also, multiple PUSH frames will be needed in the case of fan-out mode, as multiple groups of consumers need to receive the messages. Ignoring proxies and fan-out, however, the sequence diagram in the simple case looks like this.

Message flow

This diagram doesn’t show the replication of the storage shard, which happens asynchronously in the background of this process. The PUSH message contains the payload of the message being pushed, so replicas don’t need an updated storage shard on initial send—in order to retransmit after a delay or failure, however, they might. I’ll be honest, I haven’t yet been through the code in enough detail to quite derive the precise intricacies of how that works.

Message Resilience

At this point it’s worth discussing how BlazingMQ provides protection against node failures along this path, as this is a key part of the resilience that a message queue provides.

Retransmissions

Let’s start with the flow of the PUT messages—you might want to refer to the diagram above for this section. At each stage, these are held in an in-memory buffer until the corresponding ACK is received from the next node in the chain, confirming that the message has reached the primary node and been persisted into the storage shard and, in the case of strong consistency mode, also guaranteed to be replicated to a majority of the replicas.

Since this resilience is added at each node, whether proxy or replica, in practice this can add a good amount of resilience despite the buffer being only in-memory. Of course, no system is perfect and there is some potential for loss if multiple nodes fail at the same time, but the time from PUT to ACK is expected to be very short under normal circumstances, so the window for these failures should be very small.
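The per-hop buffering can be pictured as a small map of outstanding messages. This is a deliberately simplified sketch with hypothetical names, where the upstream link is just a callback, but it shows the essential behaviour: nothing is forgotten until its ACK comes back.

```python
class HopBuffer:
    """Hypothetical in-memory buffer of un-ACKed PUTs at a single hop."""

    def __init__(self, send_upstream):
        self._send = send_upstream
        self.pending = {}             # GUID -> payload, in arrival order

    def on_put(self, guid, payload):
        # Remember the message before forwarding it towards the primary.
        self.pending[guid] = payload
        self._send(guid, payload)

    def on_ack(self, guid):
        # The primary has persisted it, so we can let go of our copy.
        self.pending.pop(guid, None)

    def on_upstream_reconnected(self):
        # A new path to the primary exists: resend everything outstanding.
        for guid, payload in self.pending.items():
            self._send(guid, payload)


sent = []
hop = HopBuffer(lambda guid, payload: sent.append(guid))
hop.on_put("g1", b"first")
hop.on_put("g2", b"second")
hop.on_ack("g1")
hop.on_upstream_reconnected()
```

Here only g2 is resent after the reconnection, since g1 had already been acknowledged.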

Earlier we discussed the benefits of using a proxy close to consumers for large fan-out ratios—this setup illustrates the benefit of using a proxy close to producers, as any failure involving the whole BlazingMQ cluster but not the producers is unlikely to affect a proxy if that proxy is physically colocated with the producers. This means the proxy could hold PUT messages in its buffer for, say, a few minutes while the entire BlazingMQ cluster restarts. This is another example of the resilience benefits of proxies.

Once the message has reached the primary and been stored in the storage shard, and replicated in the case of strong consistency, then there’s no more need for in-memory buffers—the replicated shard already acts as a retransmit buffer. This allows replicas to retransmit messages independently without that needing to be triggered from the primary node.

Message Deduplication

The fact that multiple hops along the inbound path will retransmit if needed is great for adding resilience, but does introduce potential for another issue which is message duplication. Let’s say a producer sends a PUT to a proxy, which itself sends to a replica, and that replica successfully sends to the primary, which persists the message. Let’s further suppose, however, that the replica immediately fails after this and the returning ACK from the primary is missed.

In this instance, the proxy will assume that the PUT was not received, since it never received an ACK, and will retransmit it as soon as it has a new path to the primary. Since the initial PUT did actually get through, this will lead to a duplicate message being added to the queue.

To avoid this issue, BlazingMQ generates the GUID of a message within the SDK in the producer, so the message has a consistent GUID across its lifetime which survives any retransmissions. This allows the primary node to keep a history of recent GUIDs that it’s already seen. These are held for a configurable period which defaults to 5 minutes, but can be overridden with the deduplicationTimeMs setting within a domain.

If a PUT arrives which matches a GUID already in this history, an ACK is sent back but no duplicate message is added to the queue.
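A time-limited GUID history like this can be sketched with an ordered dictionary. The class below is hypothetical and much simpler than whatever the broker really does, but it captures the behaviour: duplicates inside the window are spotted, and old entries are purged so the history doesn’t grow without bound. The clock is injectable purely so the example can be exercised without waiting five minutes.

```python
import time
from collections import OrderedDict


class DedupHistory:
    """Hypothetical history of recently seen message GUIDs."""

    def __init__(self, window_ms=300_000, clock=time.monotonic):
        self._window = window_ms / 1000.0
        self._clock = clock
        self._seen = OrderedDict()    # GUID -> time first seen, oldest first

    def is_duplicate(self, guid):
        now = self._clock()
        # Drop entries which have aged out of the window.
        while self._seen:
            _, seen_at = next(iter(self._seen.items()))
            if now - seen_at < self._window:
                break
            self._seen.popitem(last=False)
        if guid in self._seen:
            return True               # ACK it, but don't store it again
        self._seen[guid] = now
        return False


fake_now = [0.0]
history = DedupHistory(window_ms=1000, clock=lambda: fake_now[0])
first = history.is_duplicate("guid-1")
retransmit = history.is_duplicate("guid-1")
fake_now[0] = 2.0
after_expiry = history.is_duplicate("guid-1")
```

The last call shows the limit of the approach: a retransmission which arrives after the window has passed will be treated as a new message.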

PUT frame deduplication

It’s worth noting that although BlazingMQ makes a best effort to avoid these duplicates within the queue itself, it’s quite possible for later PUSH messages to be duplicated. For example, if a consumer commits some work to an application database, but then immediately fails before it can send a CONFIRM back to BlazingMQ, then the PUSH will be retried to another consumer which will then go ahead and presumably try the same action. This is just one of those windows of failure which is almost impossible to close entirely—however, since the GUID of the message is consistent then it’s usually possible for consumer applications to use this to avoid duplication on their side as well. For example, if every update to the database includes a write of the GUID to a table with a unique constraint on that column, then the transaction will fail if the message has already been processed.
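As a sketch of that unique constraint approach, here’s a consumer handler using SQLite from the Python standard library. The table names and the handle_message() function are made up for illustration, and in a real consumer the GUID would come from the received message.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (guid TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT)")


def handle_message(guid, item):
    """Apply the message's effect at most once; return False if repeated."""
    try:
        # Both writes share one transaction: it commits if the block
        # succeeds and rolls back if anything inside it raises.
        with conn:
            conn.execute("INSERT INTO processed (guid) VALUES (?)", (guid,))
            conn.execute("INSERT INTO orders (item) VALUES (?)", (item,))
        return True
    except sqlite3.IntegrityError:
        # The GUID is already recorded, so this is a redelivery.
        return False


first_delivery = handle_message("guid-1", "widget")
redelivery = handle_message("guid-1", "widget")
order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

Either way the consumer should still send a CONFIRM afterwards, so the broker knows it can stop redelivering.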

Graceful Shutdown

Also under the broad umbrella of “resilience features” is the process of gracefully shutting down a BlazingMQ node.

When a node is shutting down, it sends a StopRequest frame to its peers, both upstream and downstream, which instructs the peers to cease sending any new PUSH and PUT frames. This prevents the peer taking on any new work, although ACK and CONFIRM frames are still sent to allow existing active messages to drain out of the system.

Once CONFIRM or ACK frames have been received for every pending message within a given peer, that peer then sends a StopResponse frame to indicate that this link is now idle—peers should not send any more frames after StopResponse. Each peer also has a timeout which causes them to send StopResponse even if they’re still waiting for CONFIRMs or ACKs.

The choice of this timeout is a delicate balance between the risk of creating duplicate PUSH messages by dropping CONFIRMs, and the risk of taking longer to shut down and fail over to another node. In different use-cases and situations consumers may take quite different amounts of time to process messages and send back CONFIRMs, so the choice of this timeout will likely differ between clusters.

Once StopResponse has been received from all peers, the node is free to shut down fully.
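From the point of view of one of the peers, the drain logic might look something like the sketch below. The frame names come from BlazingMQ, but the class, its methods and the way time is passed in are all hypothetical simplifications on my part.

```python
class PeerLink:
    """Hypothetical peer-side state for a link to a node shutting down."""

    def __init__(self, timeout_s):
        self.timeout_s = timeout_s
        self.pending = set()          # GUIDs awaiting an ACK or CONFIRM
        self.stopping_since = None
        self.sent_stop_response = False

    def can_send_new_work(self):
        # No new PUT or PUSH frames once a StopRequest has arrived.
        return self.stopping_since is None

    def on_stop_request(self, now):
        self.stopping_since = now
        self._maybe_respond(now)

    def on_ack_or_confirm(self, guid, now):
        self.pending.discard(guid)
        self._maybe_respond(now)

    def on_tick(self, now):
        self._maybe_respond(now)

    def _maybe_respond(self, now):
        if self.stopping_since is None or self.sent_stop_response:
            return
        drained = not self.pending
        timed_out = now - self.stopping_since >= self.timeout_s
        if drained or timed_out:
            # This is where the StopResponse frame would be sent.
            self.sent_stop_response = True


link = PeerLink(timeout_s=5.0)
link.pending.add("g1")
link.on_stop_request(now=0.0)
still_draining = not link.sent_stop_response
link.on_ack_or_confirm("g1", now=1.0)
```

The timeout branch is the one that embodies the trade-off: fire it too early and outstanding CONFIRMs are lost, fire it too late and the shutdown drags on.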

Client SDK

The measures described above work well to add resilience against nodes failing within the cluster, but what about producers and consumers? If the node to which they’re connected goes down, they also need some level of resilience, and this is non-trivial to implement.

Thankfully, the BlazingMQ SDK embedded in the applications has features which support this resilience, removing the need for every application to implement it themselves.

Once a queue has been opened within the SDK, it will buffer any PUT frames sent by a producer, even if the connection itself goes down temporarily, and these buffered messages are released on receipt of the corresponding ACK frame. Whilst the connection is down these buffered frames collect within the SDK and are eventually transmitted when the connection is re-established. Even if some PUTs did get through earlier, the message deduplication described above should handle this.

As well as PUT frames, requests to open and configure queues are also buffered. CONFIRM frames, however, are not buffered. This is because when a BlazingMQ consumer is disconnected, the brokers will typically resend messages to a different consumer, which will presumably then go on to confirm them. This is something the SDK can’t fully handle for the consumer, so consumer authors need to decide whether CONFIRMs are sent back before or after initiating the side-effects of message receipt, depending on the desired behaviour in failure cases.


  1. The name term counter comes from Raft, but other consensus protocols have related concepts, such as the proposal number in Paxos and the round number in ZooKeeper’s Fast Leader Election algorithm. 

  2. The random time interval is very important, because it’s what prevents nodes all starting a new election at the same moment, which could lead to elections which take a long time to settle down to a stable state, if all nodes are operating at very similar speeds and network latencies. 

This is the most recent article in the “BlazingMQ” series, which started with BlazingMQ: Introduction
Thu 27 Jun, 2024