BlazingMQ is a message queue middleware system developed at Bloomberg which was open sourced around a year ago. I think it has a lot of very useful features for some use-cases, so this series of articles looks at it in a bit more detail.
This is the 1st of the 2 articles that currently make up the “BlazingMQ” series.
A brief disclaimer: this post discusses a piece of middleware that was developed at Bloomberg, and later released as open source, and I’d like to be transparent and mention that I’m currently a Bloomberg employee. I wrote this article because I thought it was an interesting technology, and it was entirely my own idea to do so. I work in a quite different team to that which developed this software and at time of writing have had no part at all in its development. Opinions and assertions herein are my own, as is responsibility for any errors.
When you’re dealing with distributed systems, it’s more or less always essential to have some sort of reliable means for the various components to communicate. There are various approaches, but quite often this takes the form of realtime or near-realtime message passing, where message sending and message delivery are somewhat decoupled, as this turns out to be quite useful for building resilient systems. Various forms of middleware have been created to serve this need.
In this series of articles I’m going to take a look at a comparatively new such middleware called BlazingMQ, which was developed at Bloomberg and later released as open source. I think it has some interesting properties that make it particularly applicable to certain use-cases, and technologically interesting.
Before we get into that, though, let’s just look at some of the other options already available, so we know what we’re comparing against.
I’ve listed some of the popular message-oriented middleware (MOM) that I’m aware of below, for context to help understand how BlazingMQ fits in. You can skip this section if you’re either already familiar or uninterested.
Note that there are a lot of these types of systems out there, and this list barely scratches the surface—these are just the ones that I happen to have heard of.
So, now that we know a little about the other systems, let’s take a look at BlazingMQ. In the authors’ own words, it is a distributed message queuing platform with a focus on efficiency, reliability and a rich feature set for modern day workflows.
Thinking back to the earlier list of other MOMs, I would say if you need an immediate mental picture of what BlazingMQ is, then think of it as most similar to RabbitMQ. It’s a queue-based system, where queues are lightweight and dynamically created and removed, as with Rabbit, and messages are removed once consumed by a client, rather than Kafka’s long-term persistence.
BlazingMQ does offer some features which set it apart from Rabbit, however, particularly around cases with large fan-out ratios. It supports proxies which can be used to build hierarchies of fan-out, for particularly large ratios. It also offers a broadcast strategy for a non-persisted publish/subscribe model. As opposed to Rabbit’s implementation in Erlang, BlazingMQ is written in C++, which also yields some performance benefits on like-for-like hardware.
We’ll go through some of these features a little later, but for now let’s start with the basics. A queue is the basic stream of messages between producers and consumers, and queues are grouped into domains. Each domain specifies a configuration which specifies how the queues within that domain will work. Some examples include:
A queue is identified using a URI such as bmq://some.domain.name/a.queue where some.domain.name is the domain, and a.queue is the name of the queue within that domain. The name of a queue must be pre-shared by the producers and consumers, and this URI is how they connect to that queue.
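To make the URI structure concrete, here is a minimal sketch that splits a queue URI into its domain and queue name using only the Python standard library. The helper name parse_queue_uri is my own invention for illustration and is not part of any BlazingMQ SDK.

```python
from urllib.parse import urlsplit


def parse_queue_uri(uri):
    """Split a bmq:// URI into (domain, queue_name).

    Hypothetical helper for illustration; not part of the BlazingMQ SDK.
    """
    parts = urlsplit(uri)
    if parts.scheme != "bmq" or not parts.netloc:
        raise ValueError(f"not a BlazingMQ queue URI: {uri!r}")
    queue_name = parts.path.lstrip("/")
    if not queue_name:
        raise ValueError(f"URI has no queue name: {uri!r}")
    return parts.netloc, queue_name


domain, queue_name = parse_queue_uri("bmq://some.domain.name/a.queue")
print(domain)      # some.domain.name
print(queue_name)  # a.queue
```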
As with Rabbit, queues are automatically deleted once they contain no messages and have no applications connected to them, and creating a queue is idempotent: it will be created when opened if it doesn’t already exist. Unlike with Rabbit, this creation isn’t a separate function; it happens transparently when a queue is opened. The Python code to open a queue in Rabbit might look like this (using the pika library):
Compare this with the Python BlazingMQ library:
Messages in BlazingMQ have three main components:
For performance reasons, operations within BlazingMQ are asynchronous, so when a producer has published a message to the broker, the producer cannot be certain that the operation was successful until an asynchronous acknowledgement is sent back, which includes the message’s assigned ID. This can also be a negative acknowledgement, indicating the message was not accepted, although BlazingMQ tries to avoid this as much as possible.
When a consumer has accepted a message, it sends back a confirmation, which is also asynchronous. This notifies the broker that the message has been successfully processed—the message is persisted until this is received, so it can be delivered to a new consumer if, for example, the existing consumer crashes.
Now we’ve looked at the fundamental concepts in BlazingMQ, let’s drill into a bit more detail on the ways that messages can be routed from producers to consumers. This is determined by the mode of a domain, which applies to all queues within that domain—of course, there’s nothing to stop you using multiple domains to mix-and-match different operational modes.
The three modes are covered in the sections below.
The first case, priority mode, is a standard “competing consumers” setup, where a queue has one or more producing applications putting messages on the queue, and one or more consumers who retrieve them. This can be a simple work queue where messages are sent to all consumers in a round-robin fashion, but consumers can also be assigned a priority—messages are round-robined only between the set of consumers with the joint-highest priority.
Because the consumer priority is set per-queue, you could have a setup with multiple queues carrying different streams of work, each one having a set of “primary” consumers. The consumers from the other queues could then be attached at a lower priority, so they can take on the additional work of the other queues if all the primary consumers go down for some reason.
Often processing particular work items will involve fetching other resources and holding them in local memory or disk, so focusing a particular stream of related items onto one or a small number of consumers can have performance benefits, as these resources can be fetched by a smaller number of hosts, and stay hot in caches. The ability to use lower-priority consumers as a fallback allows this performance boost with the resilience of alternative options in case of failures.
One thing that’s worth noting, however, is that the consumer priority is rigorously respected—if there is a single highest-priority consumer which gets overloaded, the BlazingMQ broker will still continue to send every message to that consumer. The priority can be dynamically updated at runtime, however, so there wouldn’t be anything to stop you dynamically reducing a consumer’s priority in line with its current load, for example.
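The routing rule for priority mode can be sketched as a small in-memory model. This is only an illustration of the behaviour described above, not how the broker is implemented, and the class and method names are hypothetical.

```python
class PriorityQueueModel:
    """Toy model: round-robin between the joint-highest priority consumers."""

    def __init__(self):
        self._consumers = {}  # consumer name -> priority
        self._counter = 0     # number of messages routed so far

    def set_priority(self, name, priority):
        # Also used to change the priority of a connected consumer at runtime.
        self._consumers[name] = priority

    def remove(self, name):
        self._consumers.pop(name, None)

    def route(self):
        """Return the consumer that would receive the next message."""
        if not self._consumers:
            return None  # the message would simply wait on the queue
        top = max(self._consumers.values())
        eligible = sorted(n for n, p in self._consumers.items() if p == top)
        choice = eligible[self._counter % len(eligible)]
        self._counter += 1
        return choice


model = PriorityQueueModel()
model.set_priority("primary-1", 10)
model.set_priority("primary-2", 10)
model.set_priority("fallback", 1)
print([model.route() for _ in range(4)])  # alternates between the primaries

# Only once both primaries are gone does the fallback receive anything.
model.remove("primary-1")
model.remove("primary-2")
print(model.route())  # fallback
```

Note that the lower-priority consumer receives nothing at all while any higher-priority consumer is connected, however busy that consumer is.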
The second mode of operation is fan-out. This operates in a somewhat similar way to the priority mode described above, except that in fan-out a message may go to multiple consumers, whereas in priority mode it only ever goes to one.
In this mode, consumers need to identify themselves with an App ID string, and the list of possible App IDs needs to be configured for the BlazingMQ domain up front. This is required because the broker needs to maintain the message within its queue until it has a confirmation from a consumer in every App ID group. Within each group, consumer priority applies in the same way as for priority mode.
Unlike with Rabbit, there’s still only one message in the queue under the hood, so even if there are a large number of consumer groups (i.e. a high fan-out ratio), you don’t have the overhead of duplicating messages across a large number of queues.
However, the requirement to configure the list of App IDs up front, and the need for each group to be regularly consuming messages, does limit the flexibility of this mode somewhat to cases where the classes of consumer are known up front when the system is configured.
That said, it’s possible to configure a short message TTL for a domain, so any message not consumed by a particular App ID will be freed fairly quickly. Of course, this also harms the resilience of a system, since messages will be lost if no consumer is active for this amount of time. Also, in the long term there is a planned enhancement to allow messages to spill over to lower priority consumers when the highest-priority ones are at capacity, but I’m going to assume that’s not happening any time soon.
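The key bookkeeping in fan-out mode is that a single stored copy of each message is retained until every configured App ID has confirmed it. A minimal sketch of that idea, with hypothetical names and with TTL expiry deliberately left out, might look like this:

```python
class FanoutQueueModel:
    """Toy model: one stored copy per message, released once all App IDs confirm."""

    def __init__(self, app_ids):
        # The App IDs are fixed up front, as in the domain configuration.
        self._app_ids = frozenset(app_ids)
        self._payloads = {}  # message id -> payload (stored once)
        self._waiting = {}   # message id -> App IDs yet to confirm

    def post(self, message_id, payload):
        self._payloads[message_id] = payload
        self._waiting[message_id] = set(self._app_ids)

    def confirm(self, message_id, app_id):
        if app_id not in self._app_ids:
            raise ValueError(f"unknown App ID: {app_id!r}")
        waiting = self._waiting[message_id]
        waiting.discard(app_id)
        if not waiting:
            # Every group has confirmed, so the message can be freed.
            del self._waiting[message_id]
            del self._payloads[message_id]

    def stored_messages(self):
        return len(self._payloads)


queue = FanoutQueueModel(["billing", "audit"])
queue.post("m1", b"order created")
queue.confirm("m1", "billing")
print(queue.stored_messages())  # 1, still waiting for "audit"
queue.confirm("m1", "audit")
print(queue.stored_messages())  # 0
```

This also shows why an App ID group that stops consuming is a problem: its entry never leaves the waiting set, so the message is never freed.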
The previous two modes are clearly quite similar to each other, where priority mode is effectively just a special case of fan-out mode with only a single implicit App ID. Broadcast mode, however, operates in quite a different way.
In this mode, messages are not persisted—upon arrival, they are immediately dispatched to waiting consumers and then discarded. In other words, this mode is a purist publish/subscribe model, where consumers are sent just the messages being broadcast whilst they are connected. This mode provides an at most once guarantee on message delivery.
This mode still supports a consumer priority, and at the point at which a message arrives, it will be distributed to all connected consumers with the joint-highest priority at that moment.
This mode does allow dynamic runtime changes in the group of consumers, which fan-out mode with its pre-configured App IDs does not. However, this is at the expense of the resilience provided by persisting messages until a consumer has confirmed them.
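For contrast with the two persistent modes, here is a toy model of broadcast delivery as described above: nothing is stored, and a message reaches only the joint-highest priority consumers connected at the moment it arrives. All names are hypothetical and the model ignores networking entirely.

```python
class BroadcastModel:
    """Toy model of at-most-once broadcast delivery with no persistence."""

    def __init__(self):
        self._consumers = {}  # consumer name -> (priority, inbox list)

    def connect(self, name, priority=0):
        inbox = []
        self._consumers[name] = (priority, inbox)
        return inbox

    def disconnect(self, name):
        self._consumers.pop(name, None)

    def publish(self, message):
        """Deliver to current top-priority consumers; return how many got it."""
        if not self._consumers:
            return 0  # nobody is listening, so the message is discarded
        top = max(priority for priority, _ in self._consumers.values())
        receivers = [inbox for priority, inbox in self._consumers.values()
                     if priority == top]
        for inbox in receivers:
            inbox.append(message)
        return len(receivers)


bus = BroadcastModel()
print(bus.publish("price=100"))  # 0, dropped because nobody is connected
early = bus.connect("early")
bus.publish("price=101")
late = bus.connect("late")
bus.publish("price=102")
print(early)  # ['price=101', 'price=102']
print(late)   # ['price=102']
```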
This mode is useful for cases where updates are frequent and only the most recent update is of interest—for example, updates on stock prices, or the number of tickets still available for a popular event, or the current location of a moving public transport vehicle.
The value of having this mode included in BlazingMQ is that you can go through the effort of setting up your middleware ecosystem once, getting hosts installed with the brokers etc., and then use it for both your resilient “at least once” persistent message queues and your near-realtime “at most once” event buses.
Now we’ve seen the core message routing modes, it’s time to look at one of BlazingMQ’s most powerful features which is subscriptions. These can be used with any of the modes listed above, and provide a layer of filtering to allow a consumer to declare that they only want to receive a defined subset of messages which are produced.
When a message is available, only those consumers whose subscriptions match the message will be considered. If no such consumers are available, the message is held on the queue to wait until there is one, just as if no consumers were available normally. If multiple subscriptions match, then the first match is used, although priority is respected as we’ll see in a moment.
One thing that’s important to note is that subscriptions don’t change the semantics of the routing type—so if you’re using priority mode then still only one consumer will be given a message, or one per App ID if you’re using fan-out mode. This might be different to other messaging systems, such as Rabbit’s topic exchange which duplicates a message across all queues matching a pattern. I don’t think either of these choices is objectively better or worse, but they’re certainly different and you need to be aware of this behaviour when setting things up.
The filters in a subscription are specified by an expression which is matched against the message properties, so use of this feature depends on producers to set properties which are of interest to consumers.
Note that properties are more flexible than some other systems, such as Rabbit’s topic exchange, where the message key typically has a fixed structure. In BlazingMQ, properties may be optional and only specified on some messages.
For example, consider a message with the following properties:
org = 1102
user = andy123
request = "DELETE"
file = "/index.html"
The following subscription expressions would match that message:
user == "andy123"
org > 1000
(request == "DELETE" || request == "RENAME") && file == "/index.html"
But the following subscriptions would not match that message:
request == "CREATE"
user == "andy123" && org < 1000
file == "/index.html" && request != "DELETE"
As you can see from those examples, the expression language is fairly flexible, supporting integers, strings and booleans, and allowing ordering comparisons as well as simple equality, and compositing multiple comparisons with boolean operators and brackets.
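To check my understanding of the examples above, here is a toy evaluator for this style of expression, built on Python’s own parser. It is emphatically not BlazingMQ’s expression engine: it handles only comparisons, && and ||, and my choice to treat a reference to a missing property as “no match” is an assumption rather than documented behaviour.

```python
import ast
import operator

_COMPARATORS = {
    ast.Eq: operator.eq, ast.NotEq: operator.ne,
    ast.Lt: operator.lt, ast.LtE: operator.le,
    ast.Gt: operator.gt, ast.GtE: operator.ge,
}


def matches(expression, properties):
    """Return True if the toy expression matches the message properties."""
    # Map the C-style boolean operators onto Python's before parsing.
    source = expression.replace("&&", " and ").replace("||", " or ")
    tree = ast.parse(source, mode="eval")
    try:
        return bool(_evaluate(tree.body, properties))
    except KeyError:
        return False  # assumption: a missing property means no match


def _evaluate(node, properties):
    if isinstance(node, ast.BoolOp):
        results = (_evaluate(value, properties) for value in node.values)
        return all(results) if isinstance(node.op, ast.And) else any(results)
    if isinstance(node, ast.Compare) and len(node.ops) == 1:
        compare = _COMPARATORS.get(type(node.ops[0]))
        if compare is None:
            raise ValueError("unsupported comparison operator")
        left = _evaluate(node.left, properties)
        right = _evaluate(node.comparators[0], properties)
        return compare(left, right)
    if isinstance(node, ast.Name):
        return properties[node.id]  # KeyError if the property is absent
    if isinstance(node, ast.Constant):
        return node.value
    raise ValueError(f"unsupported syntax: {type(node).__name__}")


message = {"org": 1102, "user": "andy123",
           "request": "DELETE", "file": "/index.html"}

MATCHING = [
    'user == "andy123"',
    'org > 1000',
    '(request == "DELETE" || request == "RENAME") && file == "/index.html"',
]
NOT_MATCHING = [
    'request == "CREATE"',
    'user == "andy123" && org < 1000',
    'file == "/index.html" && request != "DELETE"',
]

for expression in MATCHING + NOT_MATCHING:
    print(matches(expression, message), expression)
```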
There are limits to the expressions, however, and in particular no form of regular expression string matching is supported. This is pure speculation on my part, but this may be for performance reasons: doing simple comparisons on these pre-defined types is very fast, whereas a regular expression library is going to be orders of magnitude slower. Since evaluating these expressions is a key part of message routing, it’s critical that the broker can do it quickly for high volume throughputs.
Now we’ve seen how subscription filters work, how does this feature interact with priority? Well, if you consider subscriptions to be a way of filtering available consumers, and then priority is respected within the filtered set, then you’d be fairly close, but there are some nuances which at first surprised me.
The first thing to note is that subscriptions are evaluated in decreasing order of priority, so the highest priority consumer with a matching subscription will be chosen. Indeed, priority can be set on a per-subscription basis, as can some other options. So a given consumer may advertise a very specific subscription with a high priority as primary handler of that message type, but also advertise a more general subscription with a lower priority to act as a fallback consumer for other primaries if they go down.
Within a priority level, you might imagine BlazingMQ would round-robin between matching consumers. However, it turns out this only holds true if the subscriptions are the same[3]. So if you had two consumers with the same priority and different, but still both matching, subscriptions, then BlazingMQ will pick one of the subscriptions arbitrarily and use that consumer exclusively.
In practice I don’t think this is a particular problem, as long as you keep it in mind, since in reality these filter expressions will almost invariably be derived in code so making them identical shouldn’t be an issue.
One other nuance that’s worth being aware of is that if you have multiple consumers with the same subscription expressions (lexicographically) then they will be effectively merged into a group of consumers within the BlazingMQ backend. This involves merging attributes such as capacity—so the group of consumers will be treated as a single unit having the capacity of the sum of all the individual consumer capacities. This merging makes sure that load balancing works as you’d expect across these consumers.
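The interaction between subscriptions and priority is easier to see in a small model. The sketch below encodes the behaviour as I’ve described it: the highest matching priority wins, one expression is picked at that level, and round-robin happens only between consumers sharing that exact expression text. The names are hypothetical, predicates are plain Python callables rather than parsed expressions, and “first in configured order” stands in for the broker’s arbitrary choice.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Subscription:
    consumer: str
    expression: str                    # text used to group identical subscriptions
    predicate: Callable[[dict], bool]  # stand-in for evaluating the expression
    priority: int


class SubscriptionRouter:
    def __init__(self, subscriptions):
        self._subscriptions = list(subscriptions)
        self._turns = {}  # expression text -> messages routed to that group

    def route(self, properties):
        matching = [s for s in self._subscriptions if s.predicate(properties)]
        if not matching:
            return None  # the message waits on the queue
        top = max(s.priority for s in matching)
        candidates = [s for s in matching if s.priority == top]
        # Assumption: the first expression stands in for an arbitrary choice.
        chosen = candidates[0].expression
        group = [s.consumer for s in candidates if s.expression == chosen]
        turn = self._turns.get(chosen, 0)
        self._turns[chosen] = turn + 1
        return group[turn % len(group)]


def is_delete(props):
    return props.get("request") == "DELETE"


router = SubscriptionRouter([
    Subscription("a", 'request == "DELETE"', is_delete, 10),
    Subscription("b", 'request == "DELETE"', is_delete, 10),
    Subscription("c", "org > 1000", lambda p: p.get("org", 0) > 1000, 10),
    Subscription("d", "fallback", lambda p: True, 1),
])

delete_message = {"request": "DELETE", "org": 1102}
print([router.route(delete_message) for _ in range(4)])
print(router.route({"request": "CREATE", "org": 5}))
```

In this model consumer c never receives the DELETE messages even though its subscription matches at the same priority, which is the surprising behaviour described above.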
I’ve covered what I feel to be a decent high-level overview of BlazingMQ’s features, although I haven’t got into the clustering and resilience features—I plan to cover these in the next article, along with some more details on the client/broker communication.
However, there are a few more useful features which I’m just going to mention at a high level.
We all know that software has bugs, and one particularly annoying case of this is where your code crashes because of, say, a malformed message. A resilient broker like BlazingMQ ironically makes this worse, because as each consumer crashes, it dutifully delivers this malformed message (the so-called poison pill) to each consumer in turn, and before you know it every service instance is down and you have a major outage.
To prevent this failure case, BlazingMQ imposes a configurable limit on the number of retransmissions of a given message—once it’s failed this many times, it is dropped from the queue and its payload is dumped into a file for later inspection to aid diagnosis of the fault.
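The retransmission limit can be illustrated with a short simulation. This is a sketch of the idea only: a raised exception stands in for a consumer crash, a list stands in for the dump file, and the function and parameter names are hypothetical.

```python
from collections import deque


def deliver_with_retry_limit(messages, handler, max_delivery_attempts):
    """Deliver each message, dropping any that fails too many times.

    Returns (processed, dumped). A raised exception models a consumer
    crashing before it could confirm the message.
    """
    pending = deque((message, 0) for message in messages)
    processed, dumped = [], []
    while pending:
        message, attempts = pending.popleft()
        try:
            handler(message)
        except Exception:
            attempts += 1
            if attempts >= max_delivery_attempts:
                dumped.append(message)  # set aside for later inspection
            else:
                pending.appendleft((message, attempts))  # redeliver
        else:
            processed.append(message)
    return processed, dumped


def fragile_handler(message):
    if message == b"malformed":
        raise RuntimeError("consumer crashed")


processed, dumped = deliver_with_retry_limit(
    [b"ok-1", b"malformed", b"ok-2"], fragile_handler, max_delivery_attempts=3)
print(processed)  # [b'ok-1', b'ok-2']
print(dumped)     # [b'malformed']
```

Without the limit, the malformed message would be redelivered forever and the messages queued behind it would never be processed.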
The BlazingMQ SDK (i.e. the client library) provides transparent compression as an option. This is handy, but doesn’t impact the broker at all since the broker never peeks inside the payload of messages. As a result, it’s a convenient feature, but doesn’t really affect the architecture and isn’t something that would be too hard to implement yourself. Still, it’s nice not to have to.
The SDK only takes a request to compress as a hint, and typically won’t compress smaller payloads under 1KB or so, as the CPU cost of compression is wasted effort in these cases. Consumer applications automatically detect whether a payload is compressed on a given message, so consuming applications don’t have to worry about the choice that the producer made—this is one of those fiddly aspects that makes this a convenient feature to have in the SDK directly.
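To illustrate why this is convenient to have in the SDK, here is roughly what you would otherwise write yourself. This is a hedged sketch and not the SDK’s implementation: the 1024-byte threshold is my reading of “1KB or so”, the dictionary envelope is invented for illustration, and zlib is simply the compressor available in the standard library.

```python
import zlib

COMPRESSION_THRESHOLD_BYTES = 1024  # assumption based on "1KB or so"


def encode(payload, compress_hint):
    """Producer side: treat the request to compress as a hint only."""
    if compress_hint and len(payload) >= COMPRESSION_THRESHOLD_BYTES:
        return {"compressed": True, "body": zlib.compress(payload)}
    return {"compressed": False, "body": payload}


def decode(message):
    """Consumer side: detect compression per message, whatever was requested."""
    if message["compressed"]:
        return zlib.decompress(message["body"])
    return message["body"]


small = b"tiny payload"
large = b"A" * 4096

small_message = encode(small, compress_hint=True)
large_message = encode(large, compress_hint=True)
print(small_message["compressed"])  # False, too small to be worth compressing
print(large_message["compressed"])  # True
print(decode(small_message) == small and decode(large_message) == large)  # True
```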
BlazingMQ’s push model gets messages into client applications as quickly as possible, to reduce the overall latency of processing. This works well whilst consumers are keeping up with the input stream, but if there are spikes in load this can cause messages to pile up in consumers. If one particular consumer from a group gets slow this can cause an issue similar to head-of-line blocking, where messages that pile up in the slow consumer could have been processed if held back and sent to other consumers processing faster.
To an extent this is an inherent risk of the “push” model, and one of the trade-offs you make for the reduced latency. Often this effect can be tolerated in small doses, but if the backlog becomes large then it can cause serious issues. Generally the solution to this is to limit the size of this backlog in any one consumer, and BMQ supports this approach.
Consumer applications can use the SDK to specify limits on the number of messages and/or the total bytes of pending messages which they are capable of handling at one time. These limits can be set on connection, or even on a per-subscription basis, and they can also be updated dynamically. This ability to change them dynamically means that there is the potential for consumers to adjust their limits continuously in response to their own performance, which could be fiddly to implement[4], but also quite powerful.
A consumer can specify zero for either limit to effectively pause new messages temporarily, without the impact of already sent messages being reassigned to another consumer, which would happen if the connection was dropped. This could be useful whilst executing a graceful shutdown, for example. Consumers can also specify a limit of one message to effectively use something like a “pull” model, although of course this would lose you the performance gains which motivate the choice of a “push” model in the first place.
The broker uses the unconfirmed messages left in the queue but held by a consumer to decide whether it’s at its limit, so there’s no need for the consumer to send explicit “source quench” messages back to alert the broker to its overloaded status. As consumers confirm messages, the broker sends more to fill the space.
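The flow control described above can be modelled in a few lines. This is a simplified sketch with hypothetical names: it tracks only a message-count limit (not bytes), and it hands each message to the first consumer with spare capacity instead of applying the real routing rules.

```python
from collections import deque


class ConsumerModel:
    def __init__(self, name, max_unconfirmed_messages):
        self.name = name
        self.max_unconfirmed_messages = max_unconfirmed_messages
        self.unconfirmed = []  # delivered but not yet confirmed

    def has_capacity(self):
        return len(self.unconfirmed) < self.max_unconfirmed_messages


class BrokerModel:
    def __init__(self, consumers):
        self.consumers = list(consumers)
        self.backlog = deque()  # messages held back on the queue

    def post(self, message):
        self.backlog.append(message)
        self._dispatch()

    def confirm(self, consumer, message):
        # A confirmation frees a slot, so the broker can push more.
        consumer.unconfirmed.remove(message)
        self._dispatch()

    def _dispatch(self):
        while self.backlog:
            target = next((c for c in self.consumers if c.has_capacity()), None)
            if target is None:
                return  # everyone is at their limit; hold the rest back
            target.unconfirmed.append(self.backlog.popleft())


slow = ConsumerModel("slow", max_unconfirmed_messages=2)
fast = ConsumerModel("fast", max_unconfirmed_messages=2)
broker = BrokerModel([slow, fast])
for number in range(1, 6):
    broker.post(f"m{number}")

print(slow.unconfirmed, fast.unconfirmed, list(broker.backlog))
broker.confirm(fast, "m3")
print(fast.unconfirmed, list(broker.backlog))
```

Setting a consumer’s limit to zero in this model means has_capacity() is never true, which is the “pause without disconnecting” behaviour described above.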
Another handy feature that BlazingMQ provides is the ability to suspend reading from queues if the host or application becomes unhealthy—application authors write their own code to define what “unhealthy” means to them.
The way this is done depends somewhat on the language of the SDK in use, but I’ll use the core C++ SDK as an example. Application code defines a class derived from the abstract HostHealthMonitor base class, and this must implement the two methods defined:
observeHostHealth()
hostState()
Applications should add whatever other methods and structure they need to ensure that the health status is tracked and any callbacks invoked—typically I suspect this would involve a separate thread which periodically performs various checks.
There are three health states that the callback will accept and that the hostState() function can return:
e_HEALTHY
e_UNHEALTHY
e_UNKNOWN
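To show the general shape of such a monitor, here is a Python analogue of the pattern. It is not the C++ interface itself: I’m assuming that observeHostHealth() registers a callback to be told about state changes and that hostState() reports the current state, and the set_state() method is my own addition standing in for whatever periodic checks an application performs.

```python
import enum


class HostHealthState(enum.Enum):
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


class SimpleHostHealthMonitor:
    """Python analogue of a host health monitor (names are hypothetical)."""

    def __init__(self):
        self._state = HostHealthState.UNKNOWN
        self._observers = []

    def observe_host_health(self, callback):
        """Register a callback invoked with the new state on each change."""
        self._observers.append(callback)

    def host_state(self):
        """Report the most recently recorded state."""
        return self._state

    def set_state(self, state):
        """Called by the application's own checks, e.g. from a timer thread."""
        if state is self._state:
            return  # only notify on genuine transitions
        self._state = state
        for callback in list(self._observers):
            callback(state)


monitor = SimpleHostHealthMonitor()
transitions = []
monitor.observe_host_health(transitions.append)

monitor.set_state(HostHealthState.UNHEALTHY)
monitor.set_state(HostHealthState.UNHEALTHY)  # no change, so no callback
monitor.set_state(HostHealthState.HEALTHY)
print(transitions)
print(monitor.host_state())
```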
If you install these, you will receive some additional session events in your consumer code which you can use to add whatever additional handling you wish to unhealthy conditions:
e_HOST_UNHEALTHY when a host moves from healthy to unhealthy.
e_HOST_HEALTH_RESTORED when a host recovers and all queues have been resumed.
There are also queue events:
e_QUEUE_SUSPENDED when a queue is suspended due to an unhealthy host.
e_QUEUE_RESUMED when the queue is re-enabled.
These events are probably of primary interest to producers, which may need to take other action to stop new work items being created or sent to them. Consumers, in general, will just need to passively wait until the queues are re-enabled and they can resume their work.
Note that only queues with the health monitoring option set will respect this health and be suspended—other queues registered without this option will continue to function, so it’s entirely within the application author’s control which queues are affected.
Given that these host health checks are entirely optional, there’s no downside to them being present as an available feature, and in some cases they could be very useful indeed. It’s likely that a good proportion of applications using queues will depend on them heavily for events or work items to process, with the main event loop being the core of the whole application. Therefore it makes sense to integrate health monitoring into this loop, and BlazingMQ makes this comparatively straightforward.
The final detail I’d like to mention is that when configuring a domain, you have a choice of two options for the consistency guarantees you’d like BlazingMQ to make. To understand this fully probably requires an understanding of how a series of brokers act as a cluster, and that’s something I’m covering in the next article, but for now it suffices to know that each queue is assigned to a single primary node in a cluster, and messages on that queue are duplicated across the other nodes for resilience.
There are some optimisations to the implementation which makes it a little more efficient than these diagrams appear—for example, the replication of messages is sent in batches rather than individually—but the principle should be clear.
Note that even in the strong consistency model there is the outside possibility of some slightly odd edge cases in the presence of network issues. For example, the primary node has a timeout on waiting for replication acknowledgements from replicas, and if it hits this timeout then it will send a NACK (negative ACK) to the producer indicating that it couldn’t accept the message. It would normally not deliver the message to consumers at this point, but it’s not impossible that some replica received the replicated message, and then the primary crashes and that replica becomes the primary—at this point, it would have no knowledge of the timeout having been reached and would attempt to deliver the message to a consumer.
Such edge cases are probably vanishingly rare, but it just demonstrates that it’s never possible to quite close all the holes, so your applications need to be somewhat resilient against these edge cases if the consequences of message loss and duplication are severe.
That’s all I’m going to cover in this article. I hope it’s given you a good understanding of what BlazingMQ is, a set of its core features and how they differentiate it from other message-oriented middleware.
Personally I think it’s a really useful design for certain situations where your goal is to get as close to “exactly once” delivery as you can, and you want great resilience—but you also want to have your own load distribution schemes or other complexities that other messaging systems just can’t quite deliver. The subscription model in particular is quite powerful, and particularly the way that it interacts with priorities—this allows selecting processing on particular nodes for better locality performance gains, but with fallback options in cases of partial failures to keep the system running without manual intervention.
The fact that the code is also written in pure C++ with no dependency on any other frameworks or middleware is a nice plus—the fact that I’m quite able to run a broker and clients in Docker on my laptop indicates that its footprint under modest loads is quite small. If you’re interested, the team behind BlazingMQ have published some benchmarks, and rates of around 100,000 messages per second or more seem very achievable, although of course this will depend heavily on the configuration and specifics of the load. Mind you, many use-cases don’t come anywhere close to this messaging rate, so I suspect for the vast majority of people the messaging rate is simply a lot higher than you’ll need.
I should also mention that the project’s documentation is really excellent, and where I got the majority of the information for this article. It also goes into more detail than I’ve been able to here, so do check it out if you want to know more. The getting started guide in particular is great if you want to play with it—it walks you through getting a Docker image of the broker and clients set up, and it was really helpful for understanding some of the nuances around the interactions between subscriptions and priorities in particular.
I plan to follow up soon with another article in this series where I’ll look at how a cluster of BlazingMQ brokers work together, and also some more details on the client/broker messaging. Until then, I hope this article has been of interest!
[1] AMQP is an open standard protocol for MOMs to communicate, and which imposes some consistent core semantics on the participants. It was developed in 2003, and is supported by a number of different messaging systems.
[2] This used to be assigned by the BlazingMQ broker and returned in an acknowledgement, but this was changed to allow more effective message deduplication. This will be covered in the next article along with other resilience features.
[3] I think the definition of “the same” here is that the subscription expressions compare lexicographically equal, but that’s based on very limited experimentation so the actual behaviour may be subtly different in ways of which I’m not aware right now.
[4] As an aside, if you do fancy implementing this sort of continuously adjusting throttling then the factor that you’ll want to spend effort on getting right is the damping. If you adjust the throttle too quickly, the value will seesaw back and forth violently in a way which can kill performance, and if you adjust it too slowly then a large backlog can still build up while you’re slowly ramping down the flow in the face of a sudden load spike. When I was in a team faced with this situation in the past, the approach we found worked best was to set up a simple simulation which ran the load shifting heuristic under a range of different load profiles and accelerated time, and found the parameter values which maximised the throughput under the greatest variety of expected loads.