BlazingMQ is a message queue middleware system developed at Bloomberg which was open sourced around a year ago. I think it has a lot of very useful features for some use-cases, so this series of articles looks at it in a bit more detail. Having looked at an overview of the features in the previous article, in this one I’m going to drill into clustering and take a high-level look at the way messages flow through the system.
This is the 2nd of the 4 articles that currently make up the “BlazingMQ” series.
In the previous article in this series, I took a first look at BlazingMQ and ran through the primary features. If you haven’t read that one already, I strongly suggest you at least skim it, because this article might be a little trickier to understand without the concepts I covered in it.
In this article I’m going to take a look at some of the slightly more detailed aspects, such as clustering of brokers and the way that messages flow through the system. First let’s look at how you might typically deploy BlazingMQ brokers in a cluster.
As with a lot of middleware solutions, resilience is a primary requirement of BlazingMQ. As a result, it’s typically run in a cluster where multiple machines all host their own copy of the broker. Some sort of clustering or automated failover to another machine is essential, or the system will be vulnerable to the failure of a single machine.
BlazingMQ is typically deployed in a cluster of 3-7 machines; sizes outside this range are possible but not recommended, and the documentation suggests 5 machines as an optimal cluster size. The assumption is that this cluster is a set of machines separate from those on which the applications using BlazingMQ are running.
In the rest of this section we’ll look in more detail at how this clustering works.
Each queue is allocated to one of the nodes in the cluster, and this is known as the primary node for that queue. The primary node is responsible for managing the queue and replicating data for it to other nodes—all the other nodes are replicas for that queue. We’ll see how queues get assigned to primary nodes later in this article.
Applications, whether producers or consumers, can connect to any node—they do not need to be connected to the primary. The replica to which a producer is connected will forward messages to the primary node, and the primary will forward messages to any replica which has a consumer for that queue connected.
A queue is also assigned to a storage shard, which is also known as a partition. There are typically many queues per shard, and the shard is the granularity at which primaries are assigned—each shard has exactly one primary node, and each node is primary for zero or more shards.
Every node has a copy of every storage shard, but the primary node for a shard is the only one which ever modifies it. All the nodes other than the primary just receive a constantly updated copy of the shard from the primary, whilst concurrently sending copies of any shard for which they are the primary to other nodes.
The shards are all replicated in isolation from each other using a stream of updates, each of which has a sequence number so that replicas can detect reorderings or gaps in the stream and make sure their copy of the shard is complete and up to date. However, nodes only maintain a single TCP connection with each other, so updates for all shards are carried across this single connection.
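To make the sequence number checks concrete, here’s a minimal sketch of how a replica might validate the update stream for one shard. The class and method names are my own invention for illustration, not BlazingMQ’s actual implementation.

```python
class ShardReplica:
    """Illustrative replica-side view of one storage shard's update stream."""

    def __init__(self, shard_id):
        self.shard_id = shard_id
        self.last_seq = 0          # sequence number of the last update applied
        self.records = []          # simplified stand-in for the on-disk shard

    def apply_update(self, seq, record):
        """Apply an update from the primary, checking for gaps or reordering."""
        if seq <= self.last_seq:
            # Duplicate or reordered update: already applied, safe to ignore.
            return
        if seq != self.last_seq + 1:
            # A gap means we missed an update; the replica needs the primary
            # to resend everything after the last sequence number it has.
            raise MissingUpdatesError(self.shard_id, self.last_seq, seq)
        self.records.append(record)
        self.last_seq = seq


class MissingUpdatesError(Exception):
    def __init__(self, shard_id, have, got):
        super().__init__(
            f"shard {shard_id}: have updates up to {have}, received {got}"
        )
```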
The number of shards is statically configured for a given BlazingMQ cluster, as is the storage capacity of each shard—these must always be consistent between all nodes, and changing these values requires all the brokers to be restarted. Selecting the number and size of shards therefore requires some advance knowledge of the total number and capacity of queues that the cluster is expected to handle.
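As a rough illustration of that kind of capacity planning, here’s a back-of-the-envelope calculation; all of the figures are invented, and the variable names aren’t BlazingMQ configuration settings.

```python
# Back-of-the-envelope shard sizing. All figures below are invented for
# illustration; the point is only that shard count and size have to be
# chosen up front from expected queue counts and backlogs.
expected_queues = 200
peak_backlog_per_queue_gb = 2       # worst-case unconsumed data per queue
headroom = 1.5                      # safety margin for bursts

num_shards = 8                      # fixed in the cluster configuration
required_total_gb = expected_queues * peak_backlog_per_queue_gb * headroom
shard_capacity_gb = required_total_gb / num_shards

print(f"Each of the {num_shards} shards needs ~{shard_capacity_gb:.0f} GB, "
      f"so every node needs ~{required_total_gb:.0f} GB of disk in total.")
```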
One consequence of the fact that every node has a copy of every storage shard is that adding new nodes improves the resilience and availability, and may improve the ability to handle a big fan-out, but doesn’t actually improve the storage capacity of the cluster.
A shard is represented on disk by two files: a journal file, which holds a sequence of fixed-size records describing events on the queues in that shard, and a data file, which holds the message payloads themselves.
So when a message arrives from a producer, the sequence of events is:
The types of records which can reside in the journal file are:
Now that we’ve seen how queue storage and replication are managed by the primary node for each storage shard, we’ll take a step up and see how queues are assigned to storage shards, and shards to primary nodes.
Clusters use a quorum-based approach where one of the nodes is elected as the leader, with the rest of the nodes being followers. The leader maintains cluster-level state and replicates it to the followers, and also decides which nodes should be primary for which storage shards.
The leader/follower status of a node is unrelated to being the primary for any shards, and the assignment strategy can be controlled with the masterAssignment configuration setting—a value of E_LEADER_IS_MASTER_ALL means the leader will make itself primary for all shards, whereas E_LEAST_ASSIGNED tries to balance shards equally across the whole cluster.
The cluster-level state that the leader owns and replicates comprises at least the following:
The number of shards is known in advance, since it’s fixed by the cluster configuration and cannot change at runtime, as mentioned in the previous section. So once the leader is elected it can immediately assign shards to nodes, and it can keep this mapping maintained as nodes leave and rejoin the cluster, ensuring that every shard has an active primary node.
Queues are not known in advance and are created dynamically, so the leader needs a strategy to assign them to shards. From what I can tell, queues are always assigned to the shard which has an active primary node and the fewest current queues, which seems reasonable enough in practice. Just be aware of this if you’re planning, say, just a single queue with a huge amount of traffic—you’d be better off spreading your traffic across multiple queues to make best use of the capacity of the cluster.
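Here’s a small sketch of that assignment strategy as I understand it; the data structures are purely illustrative rather than taken from the BlazingMQ source.

```python
def assign_queue_to_shard(shards):
    """Pick a shard for a newly created queue.

    `shards` maps shard id -> {"primary": node_or_None, "queue_count": int}.
    This mirrors the behaviour described above: only shards with an active
    primary are considered, and the one with the fewest queues wins.
    """
    candidates = {
        shard_id: info
        for shard_id, info in shards.items()
        if info["primary"] is not None
    }
    if not candidates:
        raise RuntimeError("no shard currently has an active primary")
    shard_id = min(candidates, key=lambda s: candidates[s]["queue_count"])
    candidates[shard_id]["queue_count"] += 1
    return shard_id


# Example: shard 2 has no primary, so the new queue lands on shard 1.
shards = {
    0: {"primary": "node-a", "queue_count": 12},
    1: {"primary": "node-b", "queue_count": 7},
    2: {"primary": None,     "queue_count": 3},
}
print(assign_queue_to_shard(shards))  # -> 1
```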
To summarise all this, the diagram below attempts to convey the different responsibilities that we’ve covered in the last two sections. For simplicity I’ve just shown two storage shards, but in reality there would likely be more.
Now we know what the leader does, let’s see how one gets selected. Whenever a follower node detects the absence of a leader, which it does by missing a certain number of heartbeats, it triggers an election to select a new one. Leader election is an in-depth topic which I’m not going to dive into here, so I’ll just briefly describe the algorithm that BlazingMQ currently uses, along with a few brief critiques. The consensus algorithm used is inspired by, but not identical to, Raft.
The issue that overly simplistic leader election algorithms suffer from is generally a lack of any way to prioritise between candidates—you tend to have several nodes all triggering elections at similar times, and they have no way to differentiate themselves to reliably select a consistent leader. To resolve this, BlazingMQ’s algorithm uses a term counter—this is a monotonically increasing integer that’s tracked separately on each node, and is used to settle ties between two or more candidates1. On every node the term counter starts at zero.
Bearing this in mind, the algorithm runs as follows:

1. When a node detects the absence of a leader, it waits a short random time interval2 and then becomes a candidate: it increments its term counter and sends an ElectionProposal frame with this new term.
2. If a majority of the other nodes support the proposal, the candidate becomes the leader and starts sending regular Heartbeat frames to all other peers, which contain its own term value.
3. When followers receive a Heartbeat frame, they confirm that the contained term value matches their own.
4. If a certain number of consecutive Heartbeat frames are missed, the node triggers a new election.

The state of nodes during this process is summarised in the following state diagram:
The leader can also combine its current term with another per-node monotonic counter to form a leader sequence number, which can be used to uniquely sequence application messages. When a new leader is elected the term will be higher, so the leader sequence numbers from the new leader will always be higher than those from the previous leader, which is helpful if the old leader has gone stale but is still sending messages to some peers.
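A tiny sketch may help show why this works: if the sequence number is treated as a (term, counter) pair compared lexicographically, anything from a newly elected leader orders after everything the stale leader could have produced. This is just an illustration of the idea, not BlazingMQ’s actual representation.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class LeaderSequenceNumber:
    """(term, counter) pair; ordering is lexicographic, so any sequence
    number carrying a higher term sorts after every sequence number the
    previous leader could ever have produced."""
    term: int
    counter: int


stale = LeaderSequenceNumber(term=4, counter=981_233)   # old leader, far along
fresh = LeaderSequenceNumber(term=5, counter=1)         # newly elected leader

assert fresh > stale   # the term dominates, regardless of the counter values
```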
The algorithm above does have some issues. For example, if a node is excluded from the cluster temporarily then it will miss heartbeats and eventually become a new candidate with a higher term—when it rejoins the cluster it will usurp the leadership, leading to unnecessary leadership changes.
Also, if a node gets partitioned away from the current leader but is still in contact with other nodes, this can also lead to issues: the partitioned node will trigger an election with a higher term and may win it, but the existing leader will not see the new leader—it will simply have its heartbeats rejected due to its term being too low, which will cause it to trigger a new election, and leadership will keep alternating between the nodes that can’t see each other.
To mitigate these concerns, BlazingMQ has introduced an ElectionScouting request, which is sent with the term that would be used in a prospective ElectionProposal request—i.e. the value is the current term plus one, but the node doesn’t (yet) increment its actual term. Peers respond to this message with a yes response only if they are not aware of a current leader (of any term) and the proposed term value is greater than their own.
The addition of ElectionScouting allows nodes to non-disruptively check whether they’d have the support of a majority of the cluster in a leader election, so they only trigger one if they would. This does a good job of avoiding the scenarios of unnecessary leadership changes described above.
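Here’s a rough sketch of how that scouting check might look from a candidate’s point of view; the peer interface and function names are assumptions for illustration, not the real BlazingMQ code.

```python
def should_start_election(my_term, peers, quorum):
    """Scouting round: ask peers whether they'd support an election at
    my_term + 1 *without* actually bumping my_term yet.

    `peers` is a list of objects with a `scout(proposed_term)` method that
    returns True only if the peer knows of no current leader and the
    proposed term is greater than its own (as described above). These
    names are illustrative, not BlazingMQ's real interfaces.
    """
    proposed_term = my_term + 1          # what an ElectionProposal would carry
    support = 1                          # a node implicitly supports itself
    for peer in peers:
        if peer.scout(proposed_term):
            support += 1
    return support >= quorum             # only disturb the cluster if we'd win
```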
Well, that feels like a deeper dive into the leader election system than I originally intended to write, but it’s quite critical to the performance and reliability of the cluster, so I think it’s useful to see how it works in some detail. In general it looks fairly robust to me, although I do wonder how well it would work with very high latency links between nodes, which might spoil the effect of the random delay before triggering an election—this delay is only 0-3 seconds, so a network latency above this could upset things. That said, if your nodes are connected with links whose latencies frequently stray into seconds, then you’ve probably got bigger problems around replication anyway.
It’s also possible to run a BlazingMQ broker in a replica-only mode, where it doesn’t take on primary responsibility for any queues but only forwards messages to other nodes—in this mode it’s known as a proxy. Proxies are entirely optional, and they do introduce an extra hop into message flows which can slightly increase latencies—however, they can be useful in some particular situations.
Some of the ways in which proxies can be helpful are:
My sense is that proxies are probably only worth using in larger deployments of BlazingMQ, with large numbers of producers and consumers expected. The nice thing is that you can easily set up a cluster which doesn’t use them but then add them later if the need arises as things scale or usage patterns change.
For those larger cases, however, this layered hierarchy allows almost unlimited scalability of the fan-out—the primary node can send through multiple replicas, each replica can send through multiple proxies, and it’s also possible to introduce further layers of proxies beyond this if required.
Having looked in more detail at the physical architecture, now it’s time to shift our focus to the temporal and see the flow of events during message transmission.
There are four network frame types involved in message delivery: PUT, sent by a producer to add a message to a queue; ACK, returned to the producer to confirm that the message has been accepted and stored; PUSH, which delivers the message to a consumer; and CONFIRM, sent back by the consumer once it has finished processing the message.
As mentioned earlier, all of these can go through proxies and replicas before being routed to their eventual destination. Also, multiple PUSH frames will be needed in the case of fan-out mode, as multiple groups of consumers need to receive the messages. Ignoring proxies and fan-out, however, the sequence diagram in the simple case looks like this.
This diagram doesn’t show the replication of the storage shard, which happens asynchronously in the background of this process. The PUSH message contains the payload of the message being pushed, so replicas don’t need an updated storage shard for the initial send—in order to retransmit after a delay or failure, however, they might. I’ll be honest, I haven’t yet been through the code in enough detail to work out the precise intricacies of how that works.
At this point it’s worth discussing how BlazingMQ provides protection against node failures along this path, as this is a key part of the resilience that a message queue provides.
Let’s start with the flow of the PUT messages—you might want to refer to the diagram above for this section. At each stage, these are held in an in-memory buffer until the corresponding ACK is received from the next node in the chain, confirming that the message has reached the primary node and been persisted into the storage shard and, in the case of strong consistency mode, also guaranteed to be replicated to a majority of the replicas.
Since this resilience is added at each node, whether proxy or replica, in practice this can add a good amount of protection despite the buffer being only in-memory. Of course, no system is perfect and there is some potential for loss if multiple nodes fail at the same time—but the time from PUT to ACK is expected to be very short under normal circumstances, so the window for these failures should be very small.
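As an illustration of that per-hop buffering, here’s a minimal sketch of a buffer which holds PUTs until the matching ACK arrives and retransmits them if it doesn’t; the names and the retransmit policy are my own assumptions, not BlazingMQ’s.

```python
import time


class PendingPutBuffer:
    """Per-hop in-memory buffer: a PUT is kept until the matching ACK
    arrives from the next node upstream, and can be retransmitted if it
    does not. Purely illustrative of the mechanism described above."""

    def __init__(self, send_upstream, retransmit_after_s=2.0):
        self._send = send_upstream
        self._retransmit_after = retransmit_after_s
        self._pending = {}               # guid -> (payload, last_sent_at)

    def on_put(self, guid, payload):
        self._pending[guid] = (payload, time.monotonic())
        self._send(guid, payload)

    def on_ack(self, guid):
        # The ACK confirms the message reached the primary and was persisted,
        # so the buffered copy is no longer needed.
        self._pending.pop(guid, None)

    def retransmit_stale(self):
        now = time.monotonic()
        for guid, (payload, sent_at) in list(self._pending.items()):
            if now - sent_at > self._retransmit_after:
                self._pending[guid] = (payload, now)
                self._send(guid, payload)
```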
Earlier we discussed the benefits of using a proxy close to consumers for large fan-out ratios—this setup illustrates the benefit of using a proxy close to producers: a failure involving the whole BlazingMQ cluster but not the producers is unlikely to affect a proxy that is physically colocated with those producers. This means the proxy could hold PUT messages in its buffer for, say, a few minutes while the entire BlazingMQ cluster restarts. This is another example of the resilience benefits of proxies.
Once the message has reached the primary and been stored in the storage shard, and replicated in the case of strong consistency, then there’s no more need for in-memory buffers—the replicated shard already acts as a retransmit buffer. This allows replicas to retransmit messages independently without that needing to be triggered from the primary node.
The fact that multiple hops along the inbound path will retransmit if needed is great for adding resilience, but it does introduce the potential for another issue: message duplication. Let’s say a producer sends a PUT to a proxy, which itself sends it to a replica, and that replica successfully sends it to the primary, which persists the message. Let’s further suppose, however, that the replica immediately fails after this and the returning ACK from the primary is missed.
In this instance, the proxy will assume that the PUT was not received, since it never received an ACK, and will retransmit it as soon as it has a new path to the primary. Since the initial PUT did actually get through, this will lead to a duplicate message being added to the queue.
To avoid this issue, BlazingMQ generates the unique GUID of a message within the SDK in the producer, so the message has a consistent GUID across its lifetime which is durable across any retransmissions. This allows the primary node to keep a history of recent GUIDs that it’s already seen—these are held for a configurable period which defaults to 5 minutes, but can be overridden with the deduplicationTimeMs setting in the domain configuration. If a PUT arrives which matches a GUID already in this history, an ACK is sent back but no duplicate message is added to the queue.
It’s worth noting that although BlazingMQ makes a best effort to avoid these duplicates within the queue itself, it’s quite possible for later PUSH messages to be duplicated. For example, if a consumer commits some work to an application database, but then immediately fails before it can send a CONFIRM back to BlazingMQ, then the PUSH will be retried to another consumer, which will then go ahead and presumably try the same action. This is just one of those windows of failure which is almost impossible to close entirely—however, since the GUID of the message is consistent, it’s usually possible for consumer applications to use it to avoid duplication on their side as well. For example, if every update to the database includes a write of the GUID to a table with a unique constraint on that column, then the transaction will fail if the message has already been processed.
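As a concrete example of that consumer-side pattern, here’s a sketch using SQLite: the GUID is written to a table with a uniqueness constraint in the same transaction as the application’s own update, so a redelivered message simply fails the insert. The table and function names are of course hypothetical.

```python
import sqlite3

db = sqlite3.connect("orders.db")
db.execute("CREATE TABLE IF NOT EXISTS processed_messages (guid TEXT PRIMARY KEY)")
db.execute("CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY, body TEXT)")


def handle_message(guid, body):
    """Apply the message's side-effect and record its GUID in one transaction;
    a repeat delivery of the same GUID makes the whole transaction fail."""
    try:
        with db:  # single transaction: both inserts commit or neither does
            db.execute("INSERT INTO processed_messages (guid) VALUES (?)", (guid,))
            db.execute("INSERT INTO orders (body) VALUES (?)", (body,))
        return True          # work done, safe to send CONFIRM
    except sqlite3.IntegrityError:
        return True          # duplicate: already processed, just CONFIRM again


handle_message("guid-0001", "example payload")   # does the work
handle_message("guid-0001", "example payload")   # no-op, but still confirmable
```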
Also under the broad umbrella of “resilience features” is the process of gracefully shutting down a BlazingMQ node.
When a node is shutting down, it sends a StopRequest frame to its peers, both upstream and downstream, which instructs the peers to cease sending any new PUSH and PUT frames. This prevents the shutting-down node from taking on any new work, although ACK and CONFIRM frames are still sent to allow existing active messages to drain out of the system.
Once CONFIRM or ACK frames have been received for every pending message within a given peer, that peer then sends a StopResponse frame to indicate that this link is now idle—peers should not send any more frames after StopResponse. Each peer also has a timeout which causes them to send StopResponse even if they’re still waiting for CONFIRMs or ACKs.
The choice of this timeout is a delicate balance between the risk of causing duplicate PUSH deliveries by dropping CONFIRMs, and the risk of taking longer to shut down and fail over to another node. In different use-cases and situations consumers may take quite different amounts of time to process messages and send back CONFIRMs, so the choice of this timeout will likely differ between clusters.
Once StopResponse has been received from all peers, the node is free to shut down fully.
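Pulling the shutdown handshake together, here’s a sketch from the point of view of the node that is stopping; the peer methods used are invented for illustration, and the real broker is event-driven rather than polling.

```python
import time


def graceful_shutdown(peers, poll_interval_s=0.1):
    """Sketch of the shutdown handshake described above, as seen by the node
    that is stopping. The peer methods are illustrative assumptions, not
    BlazingMQ's real interfaces."""
    for peer in peers:
        peer.send_stop_request()          # peers stop sending new PUTs/PUSHes

    # Each peer drains its in-flight messages (or hits its own timeout) and
    # then replies with a StopResponse; we simply wait for all of them.
    waiting = set(peers)
    while waiting:
        waiting = {p for p in waiting if not p.received_stop_response()}
        if waiting:
            time.sleep(poll_interval_s)

    # All links are idle: the node is free to shut down fully.
```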
The measures described above work well to add resilience against nodes failing within the cluster, but what about producers and consumers? If the node to which they’re connected goes down, they also need some level of resilience, and this is non-trivial to implement.
Thankfully, the BlazingMQ SDK embedded in the applications has features which support this resilience, removing the need for every application to implement it themselves.
Once a queue has been opened within the SDK, it will buffer any PUT frames sent by a producer, even if the connection itself goes down temporarily—these buffered messages are only released on receipt of the corresponding ACK frame. Whilst the connection is down these buffered frames collect within the SDK, and they are eventually transmitted when the connection is re-established. Even if some PUTs did get through earlier, the message deduplication described above should handle this.
As well as PUT frames, requests to open and configure queues are also buffered. CONFIRM frames, however, are not buffered—this is because when a BlazingMQ consumer is disconnected, the brokers will typically resend its messages to a different consumer, which will presumably then go on to confirm them. This is something the SDK can’t fully handle for the consumer, and the design of consumers needs to make a decision on whether CONFIRMs are sent back before or after initiating the side-effects of message receipt, depending on the desired behaviour in failure cases.
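To illustrate that design decision, here are the two obvious orderings a consumer might choose between; process() and the confirm callback are hypothetical stand-ins rather than the real SDK API.

```python
def process(payload):
    """Stand-in for the application's real work (e.g. a database update)."""
    print(f"processing {payload!r}")


def handle_confirm_last(msg, confirm):
    """At-least-once flavour: do the work first, send CONFIRM afterwards.
    A crash between the two means the broker redelivers the message and the
    work may be repeated, so GUID-based de-duplication (above) is needed."""
    process(msg["payload"])
    confirm(msg)


def handle_confirm_first(msg, confirm):
    """At-most-once flavour: CONFIRM immediately, then do the work. A crash
    after the CONFIRM means the message is never redelivered and the work is
    simply lost, which some use-cases may prefer to duplication."""
    confirm(msg)
    process(msg["payload"])


# Example wiring with a trivial stand-in for the SDK's confirm call.
handle_confirm_last({"guid": "guid-0002", "payload": b"hello"},
                    confirm=lambda m: print(f"CONFIRM {m['guid']}"))
```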
The name term counter comes from Raft, but other consensus protocols have related concepts, such as the proposal number in Paxos and the round number in ZooKeeper’s Fast Leader Election algorithm. ↩
The random time interval is very important, because it’s what prevents nodes all starting a new election at the same moment, which could lead to elections which take a long time to settle down to a stable state, if all nodes are operating at very similar speeds and network latencies. ↩