☑ BlazingMQ: Client/Broker Protocol

20 Oct 2024 at 5:45PM in Software
 | 
Photo by Andy Pearce (via Midjourney)
 | 

BlazingMQ is a message queue middleware system developed at Bloomberg which was opened sourced around a year ago. I think it has a lot of very useful features for some use-cases, so this series of articles looks at it in a bit more detail. In this fourth article I’m going to discuss the low-level details of the network protocol between clients and brokers.

This is the 4th of the 4 articles that currently make up the “BlazingMQ” series.

blazingmq bricks

In this article in this series on the BlazingMQ middleware, I’d like to drill into the details of the client/broker protocol. Mostly this is because I was curious—I come from a background of writing network stacks on embedded devices, so I’m always curious about protocols. In terms of utility, however, this might be useful for anyone planning to implement a BlazingMQ client library in a new language.

Before I start, I should say that the official documentation has an excellent page on the protocol, which contains a lot of great details. I’m still writing this, partly because I find writing up an article is a great way to learn something, and partly because I’m quite a visual person and I wanted to produce a few diagrams to illustrate the concepts. How much value I’m adding for other people on top of that page is hard to say, but sometimes having something explained a different way can be helpful to cement your understanding so this might still be a useful article.

A note on terminology: in keeping with the BlazingMQ documentation, I’ve adopted the conventions here that a word is a 32-bit value and a halfword is a 16-bit value, although I’ve also tried to avoid ambiguity when using them as well.

Packet Structure

Before we look at how packets are exchanged during handshake and sending and receiving messages, first let’s look at their basic structure.

Packet Header

All BlazingMQ packets start with the same header, which is shown in the diagram below.

BlazingMQ protocol header structure

The fields in the header are all transmitted in typical network byte order, i.e. big-endian1. The fields in the header are:

Fragment
This is a single bit which is always 0 in the current versions of the protocol.
Length
The remaining 31 bits represent the number of bytes in the packet, including this header.
Protocol version
There’s a protocol version field, which should always be 01 for now. Since it’s only two bits, the assumption is that protocol version changes will be extremely rare.
Type
The type of the payload of this packet. Packets may sometimes contain multiple individual messages, but they must all be of the same type. Note that the list of types in the diagram above also contains messages only sent between brokers, so we’re only going to be looking at types 1-5 in this article.
Header words
The number of 4-byte words in the header. Currently always 2, but this allows the header to be expanded without breaking older clients who don’t support the extension—they can just ignore the additional bytes.
Type specific
The interpretation of this field depends on the type of the message.
Reserved
The final byte of the header is currently reserved for future use, and all current clients must set it to 0.

Packet Types

The message types fall into two basic types: binary and schema.

Binary message payloads are binary structures with a pre-defined structure. These are used for the very common message-passing operations, and these message types are the ones which support packing multiple messages into a single packet. The message types which use a binary format are PUT, ACK, PUSH and CONFIRM.

All the remaining message types are schema messages, and typically these are much less frequently, being used during initial handshake and to subscribe and unsubscribe from queues. These messages use the message type 1 (CONTROL) in the header. These messages are never batched, so every packet either contains one or more binary messages, or a single schema message. The protocol supports multiple encodings, including JSON, BER and XML, but JSON is recommended.

All the remaining message types in the diagram above are used for communication between brokers, and are outside the scope of this article.

The control messages exchanged between client and broker are:

NegotiationMessage
This is used for the initial handshake on a new connection from client to broker. It has two flavours, one used by the client and one uses as a response by the broker. We’ll look at this negotiation in more detail a little later.
OpenQueue and OpenQueueResponse
Used by the client to attach to a queue, either for sending or receiving messages. The response is sent back by the broker.
ConfigureQueueStream and ConfigureQueueStreamResponse
After opening a queue, the client sends this to configure its parameters, such as priority and flow control. It can send additional instances later to change these parameters. The response is sent back by the broker.
CloseQueue
Sent by the client to unsubcribe from a previously opened queue.
Disconnect and DisconnectResponse
Sent by the client to close the entire session with the broker. The response is sent back by the broker.

Protocol Handshake

Putting all this together, here’s an illustration of the initial handshake between a client and broker. Against each message I’ve listed what I understand to be the attributes which are sent, although some of those are potentially optional. I don’t plan to talk through all these in detail, but I think it’s helpful to see the kinds of information that is exchanged.

BlazingMQ client/broker handshake

Based on this, let’s run through these messages in more detail.

As an aside, to look at the content of the packets, I started off using tcpdump to capture network traffic and dump it to a file, and Wireshark to view them. This worked to an extent, but the default use of BER encoding was a bit of a pain to decode—I did enough with the asn1 library in Python to convince myself things were as I’d expect, and then I just fellback on using bmqtool which produces detailed logging of packet contents.

This is probably why the authors recommend using JSON for schema messages for new code, since it’s going to be a lot less hassle than getting the BER encoding working, and you get the human readability (useful for debugging) without the verbosity of XML2.

Negotiation » ClientIdentity

The Negotiation message has two flavours, the first being ClientIdentity which is the first packet sent by the client after it has set up the TCP connection. Here are the attributes set by bmqtool when I ran it:

protocolVersion = 1
sdkVersion = 999999
clientType = E_TCPCLIENT
processName = "/Users/andy/src/misc/blazingmq/build/blazingmq/src/applications/bmqtool/bmqtool.tsk"
pid = 67171
sessionId = 1
hostName = ""
features = "PROTOCOL_ENCODING:BER,JSON;
            MPS:MESSAGE_PROPERTIES_EX"
clusterName = ""
clusterNodeId = -1
sdkLanguage = E_CPP
guidInfo = [
    clientId = "C28F4E06E47E"
    nanoSecondsFromEpoch = 1728746082922639000
]

It’s worth noting that the processName, hostName, features and clusterName fields were the only ones that seemed to be represented by literal strings in the BER encoding—all the others were either numeric or nested types.

Many of these fields seem useful for diagnostics, with features in particular being of more importance as it may presumably affect how the broker interacts with it. The clientId is used as part of generating unique message GUIDs, I believe, and I’m assuming that nanoSecondsFromEpoch is sent just to allow the broker to resolve the timestamp in the GUIDs generated by client SDK back into an absolute time—but that’s spectulation on my part and I don’t know how critical this is.

Note that since I was just using a single broker, I suspect the cluster-related fields won’t be populated in any of these messages. But in this case, I don’t think the client would populate these anyway—I think they’ll be used primarily for broker/broker messaging, such as a proxy communicating with members of a cluster.

Negotiation » BrokerResponse

The broker then responds with the other variant, BrokerResponse. The response to the bmqtool request outlined above was as follows:

result = [
    category = E_SUCCESS
    code = 0
    message = ""
]
protocolVersion = 1
brokerVersion = 999999
isDeprecatedSdk = false
brokerIdentity = [
    protocolVersion = 1
    sdkVersion = 999999
    clientType = E_TCPBROKER
    processName = "/usr/local/bin/bmqbrkr"
    pid = 1
    sessionId = 4
    hostName = "earth"
    features = "PROTOCOL_ENCODING:BER,JSON;
                HA:GRACEFUL_SHUTDOWN,GRACEFUL_SHUTDOWN_V2,BROADCAST_TO_PROXIES;
                MPS:MESSAGE_PROPERTIES_EX;
                SUBSCRIPTIONS:CONFIGURE_STREAM"
    clusterName = ""
    clusterNodeId = -1
    sdkLanguage = E_CPP
    guidInfo = [
        clientId = ""
        nanoSecondsFromEpoch = 0
    ]
]

Once again, much of this looks primarily for diagnostic purposes, but there are some key fields. The most important one is, of course, result, which indicates whether the client’s request to connect to the broker was successful. I’m guessing if this was anything other than E_SUCCESS then the TCP connection would be broken immediately after sending the response.

I believe isDeprecatedSdk is used for the broker to reject clients using versions of the SDK which are presumably still compatible, as this would be caught by the protocolVersion, but which are otherwise disallowed. For example, perhaps particular builds of the SDK might be found to have critical security flaws which mean they should be disallowed.

The features field in the brokerIdentity structure is the fairly obvious analogue of the same field in the client request, and much of the rest is self-explanatory. I was surprised to see the guidInfo field still present, because it’s clearly not been populated with anything useful, but I’m going to assume this is just to keep the schemas more consistent between the two variants of the Negotiation message—if it ever is populated, I’m not quite sure what that would mean.

OpenQueue

After the session is created, the client application will then typically proceed to open one or more queues. The OpenQueue control message is sent for this purpose.

handleParameters = [
    uri = "bmq://bmq.test.persistent.priority/somequeue"
    qId = 0
    subIdInfo = NULL
    flags = 2
    readCount = 1
    writeCount = 0
    adminCount = 0
]

The key fields here are the uri of the queue to open, and then qId to identify the handle to use for this client—we saw this used in the API in the previous blog post. The use of an integer ID avoids having to pass the full queue URI in each subsequent message, and it only needs to be unique between a given client and the broker.

The other important field is flags which indicates how the client is opening the queue. This is a bitfield with the following possible values:

Name Value Meaning
ADMIN 1 Open in admin mode (where permitted)
READ 2 Open queue for consuming messages
WRITE 4 Open queue for posting messages
ACK 8 Receive ACK events for posted messages

Most clients will be some combination of READ and WRITE, and I’m not quite sure what ADMIN is for but I’m guessing it’s for tasks who can open the queue in a privileged mode to make administrative updates at runtime.

The subIdInfo is used for fan-out consumers. As we saw in the first article in this series, these consumers supply an App ID string, which functions rather like a consumer group in Kafka. The subIdInfo contains the App ID for this consumer, but also a sub-queue ID which is an integer field. I’m not quite sure what this integer field is for, or whether it’s allocated by the client or the broker, but I don’t think this detail is too critical for the level of this article.

Finally, the readCount, writeCount and adminCount seem to be either zero or one depending on whether the corresponding flag was set. I’m guessing this is primarily of use for broker/broker communications, such as with proxies, which presumably send the total count of readers, writers and admin clients connected to that broker. Since an individual client only represents itself, each of these counts is either zero or one.

OpenQueueResponse

The response to an OpenQueue message contains just the original request, unsurprisingly in a field called originalRequest, and a few other fields on top. I didn’t bother including the original request, since we saw it above:

originalRequest = [
    handleParameters = [ ... ]
]
routingConfiguration = [
    flags = 2
]
deduplicationTimeMs = 300000

The flags field in the routingConfiguration structure indicates how messages will be routed to consumers on this queue, and can take the following bitfield values:

Name Value Meaning
AT_MOST_ONCE 1 At most once semantics
DELIVER_CONSUMER_PRIORITY 2 Deliver only to highest-priority consumers
DELIVER_ALL 4 Deliver to all available consumers
HAS_MULTIPLE_SUB_STREAMS 8 Consider multiple consumers as destination

As far as I can tell, a queue in broadcast mode has DELIVER_ALL and AT_MOST_ONCE set, and fan-out queues have HAS_MULTIPLE_SUB_STREAMS set. I would expect DELIVER_CONSUMER_PRIORITY to always be set for non-broadcast queues, but I’m guessing this setting may exist from a time before BlazingMQ supported consumer priority, and some queues could be declared as not supporting it.

The deduplicationTimeMs is the number of milliseconds to maintain a record of message GUIDs for the purposes of deduplication, as discussed in the second article of this series. You can see in the message above, the default of 300,000 milliseconds, or 5 minutes, was specified.

ConfigureQueueStream

After the client has received OpenQueueResponse, they should then send the ConfigureQueueStream request. The documentation says that both producers and consumers need to send this, but the values are generally only applicable to consumers.

However, when I was using bmqtool I didn’t see any log trace indicating this had been sent—but I definitely did see such trace when I opened the queue for reading. It’s possible that it was still sent under the hood, of course, and there’s just no log trace for this since it would presumably be sent with just some dummy values.

Anyway, the message reproduced below was taken from the bmqtool log trace, so I’m not 100% sure how closely this matches what gets sent on the wire. In particular, I didn’t specify any subscriptions, so I was a little surprised to see some dummy entries in the expression field within subscriptions:

qId = 0
streamParameters = [
    appId = "__default"
    subscriptions = [
        [
            sId = 1
            expression = [
                version = E_UNDEFINED
                text = ""
            ]
            consumers = [
                [
                    maxUnconfirmedMessages = 1024
                    maxUnconfirmedBytes = 33554432
                    consumerPriority = 0
                    consumerPriorityCount = 1
                ]
            ]
        ]
    ]
]

It looks like if you’re not using fan-out, a dummy App ID of "__default" is used, and the consumers section in subscriptions looks like it contains the flow control parameters, which we looked at in the first article. I didn’t specify values for these, so they’ll be whatever default bmqtool uses.

For comparison, here’s another example where I specified a subscription using the expression xyz > 123:

qId = 0
streamParameters = [
    appId = "__default"
    subscriptions = [
        [ 
            sId = 1
            expression = [
                version = E_VERSION_1
                text = "xyz > 123"
            ]
            consumers = [
                [
                    maxUnconfirmedMessages = 1024
                    maxUnconfirmedBytes = 33554432
                    consumerPriority = 0
                    consumerPriorityCount = 1
                ]
            ]
        ]
    ]
]

You can see that the dummy subscription entry isn’t sent, and instead we just see the single subscription that I specified.

ConfigureQueueSteamResponse

Finally, we get the response to our configure request:

request = [ ... ]

As you can see, this simply reflects back the request that was sent as confirmation. Not much else to say here.

Sending and Receiving messages

So we’ve seen the exchange of packets when a client connects to a broker, and opens a queue for reading or writing or both. Now let’s see the flow of packets when a message is sent to the broker by a producer application.

To refresh your memory on the basic flow, here’s a diagram showing the flow of messages from producer to broker to consumer, which was included in the second article in this series:

BlazingMQ message flow

As you can see there are four primary message types involved in this flow:

  • The PUT adds the message to the queue
  • The ACK confirms back to the producer whether the message was queued
  • The PUSH sends the message to one or more consumers
  • The CONFIRM is sent by the consumer to indicate it’s processed the message.

As mentioned earlier, all of these message types are sent in binary format. In addition, each packet may contain multiple such messages. In each of the sections below we’ll look at the structure of each of these payloads.

In all of these cases, the packet as a whole has the common header that we saw earlier, and if multiple messages are included in a single packet, these are simply concatenated together.

PUT Message

The PUT message has quite a few different parts to it, as you’ll see in the detailed diagram I drew of the full message structure below. But first it’s useful to look at the high-level structure of this packet. After the event header that all packets share, the structure is as follows:

BlazingMQ PUT message structure summary

So, there’s an additional PUT header which is specific to PUT messages and found at the start of each message within the packet. After this there are, in principle, zero or more options—as per the documentation, however, PUT messages don’t currently contain options. This may change in future, however, so any client code should at least tolerate options being included, even if it just ignores them.

Following the options, there’s an optional message properties section—a flag within the PUT header indicates whether properties are included. If so, there is an initial message properties header, which is then followed by a message property header for each included property. Finally, each property name and value is included.

Following any message properties section, there is finally the message payload itself.

With all that in mind, hopefully this more detailed diagram of the actual packet structure is now comprehensible:

BlazingMQ PUT message structure details

This is a lot to take in, but taken one part at a time it’s all fairly straightforward.

PUT Header

First let’s look through the fields in the PUT header:

Ack Requested
The header starts with four bits of flags, the first two of which are currently defined and the other two are reserved for future use. The first flag indicates whether an ACK should be sent back to the producer for this PUT or not.
Message Properties
The second flag indicates whether message properties are included in this message.
Message Words
The remaining 28 bits of the first word indicate the number of 32-bit words which make up the entirety of this PUT message, including this header, all the other headers, plus any options or properties, plus the payload and any padding required to align things with a word boundary.
Options Words
A 24-bit value indicating the number of 32-bit words comprising the options, including any padding. For the moment, clients cannot support any options since none are defined, so they should just skip this many words of the packet following the header. This may change in the future.
Compression Algorithm Type
A 3-bit value indicating the compression algorithm, if any, used for the message payload. The only defined values at present, as far as I can tell, are 0 for no compression, or 1 for zlib’s DEFLATE algorithm.
Header Words
A 5-bit value indicating the number of 32-bit words in this header.
Queue ID
This is the numeric queue identifier that was assigned when the queue was opened.
Message GUID or Correlation ID

This is one aspect I’m slightly hazy about. As far as I can tell, this is either a message GUID, if the client is choosing that; or a correlation ID which is used to link this request with asynchronous responses, if the broker is going to pick the message GUID. The code says the interpretation varies “depending on the context”, so I don’t know if it’s something that just predates the SDK creating the message GUID, or if there are still cases where the correlation ID is used.

The correlation ID is limited to 24-bits, stored in bytes 1-3 of the field with the remaining bits zeroed. The message GUID, however, occupies a full 32 bytes (i.e. four words).

CRC32C
A 32-bit checksum calculated using the CRC32C algorithm. This is a fairly well-used checksum used not only in iSCSI, but also filesystems such as Btrfs, ext4 and Ceph, and many platforms provide hardware acceleration of its calculation. The checksum is calculated over the “application data”, which consists of the part of the message containing the properties and the payload collectively.
Schema ID

A unique identifier for the sequence of message property names (not values) included in this message. At first I thought this was some sort of hash, but from a glance at the code it looks like it’s essentially a memoisation cache that just sequetially assigns IDs to each unique set of property names. Once the size limit has been reached, IDs are recycled in on a least-recently-used basis.

I must confess I’m still a little hazy on the importance of this schema ID, but I’m assuming it’s used to avoid having to do so many string comparisons against property names all over the place.

Reserved
The final two bytes of the header are reserved for future use, and should always be set to zero.

Message Properties

As mentioned earlier, no options are yet defined for PUT messages, so the next section we’ll look at is the message properties. As noted above, these are only present if the appropriate flag is set in the header.

If present, the first item is always the message properties header which just occurs once in the PUT message. Let’s look at the fields included in this:

Reserved
The first two bytes are reserved and always zero.
Property Header Size
A 3-bit field specifying the size of the per-property header structures in 16-bit halfwords. The reason for using 16-bit values is that these headers are half-word aligned and hence may not be a multiple of 32-bit words. The maximum size of these headers is therefore 14 bytes.
Properties Header Size
A 3-bit field specifying he size of this header in 16-bit halfwords.
Properties Area Words
The next three bytes store the total size of the message properties area, including this header, the property headers and the property names and values. This value is in units of 32-bit words, since the entire area must be word-aligned. The 24-bit value is still in network byte order, so the second byte in the header stores the upper 8 bits of the value, the next byte the middle 8 bits and finally the low-order bits.
Reserved
The next byte is also reserved, and should be all zeroes.
Number of Properties
The final byte stores the number of properties for this message, leaving the theoretical maximum as being 255 properties per message. This would seem to me to be more than enough, but one should always be aware that such assertions can come back to bite one!

Immediately following this are a series of property header structures, one for each included property. The fields in each property header are:

Reserved:
The first byte is reserved and should always be 0.
Property Type

This is a 5-bit value indicating the data type of the property value, using the following values:

Value Type
1 bool
2 char
3 int16_t
4 int32_t
5 int64_t
6 string
7 char[]

I’ve used C/C++ types here, but the meaning should be fairly obvious from the names, I think.

Property Value Length
A 26-bit value indicating the length, in bytes, of the property value.
Property Name Length
A 12-bit value indicating the length, in bytes, of the property name.

The part which is perhaps a little counter-intuitive is that the headers for all the properties occur first, before the actual names and values. This makes sense when you think about it, however, as it means all of these headers remain halfword-aligned. The names and values can be arbitrary length, and so are only byte-aligned—this means leaving them to the end makes the serialising and parsing quite a lot easier.

So, after all these headers all that’s left is the names and values in pairs, just packed in with no particular alignment. They don’t need any delimiters or structure, because the property headers already indicated the length of each one.

Finally, if the entire message properties area isn’t word-aligned then sufficient zero bytes are added to round out the total length to a 32-bit word boundary.

Message Payload

After all this, the message payload itself is added, which is just a blob of bytes. The length of this is determined by subtracting the size of the message properties, options and headers from the message words value in the header.

ACK Message

OK, so the structure of the PUT message was a bit of a marathon. Thankfully, the structure of the ACK is quite a lot simpler.

Essentially the structure here is a single header for the whole packet, followed by a section for each message being acknowledged. This is different to the PUT message, where each separate PUT has its own header—in this case the header really one per entire packet.

BlazingMQ ACK message structure

The fields in the ACK header are as follows:

Header Words
The number of 32-bit words in this header.
Per Message Words
The number of 32-bit words in each ACK message following this header.
Flags
The next byte consists of 1-bit flags, but as far as I can tell none are defined yet.
Reserved:
The final two bytes are reserved for future use and should be zeroed.

Following this simple header are one or more ACK messages which comprise the acknowledgements themselves. Each one of these has the following fields:

Reserved
The first 4 bits are reserved and should be zeroed.
Status

The status of accepting this message. The following values are currently defined:

Value Name Purpose
0 SUCCESS Succesfully enqueued
1 LIMIT_MESSAGES Queue has reached messages limit
2 LIMIT_BYTES Queue has reached byte limit
6 STORAGE_FAILURE Disk full on broker
7 NOT_READY Was not expecting PUT
Correlation ID
I’m assuming that this is the correlation ID from the PUT message. Once again, I’m a little hazy on exactly what is specified here, given that the PUT message specifies either this or the message GUID. These ACK messages specify both, however.
Message GUID
This will be the message GUID of the message created, whoever allocated it.
Queue ID
The same queue ID as was in the corresponding PUT message.

PUSH Message

We’ve covered the two types used by producers, now we look at the pair that are used by consumers. First the PUSH message, which is used by the broker to send new messages to consumers.

Perhaps unsurprisingly the structure of this is very similar to that of the PUT message we looked at above—each PUSH message in the package consists of a header followed by zero or more options, then optionally some properties, and finally the message payload. The header is also very similar to that of the PUT message. There are a few notable differences, however:

  • PUSH messages do currently have one defined option, which we’ll look at in a moment
  • There is no CRC32C field in a PUSH message
  • The header always contains the message GUID, never a correlation ID

That said, let’s take a look at the message structure:

BlazingMQ PUSH message structure

PUSH Header

The fields in the header are very similar to the PUT header, as I said, but there are some differences in the flags, so let’s run through them:

Implicit Payload
This flag seems to indicate that the payload is not included in this message. What I’ve not been able to determine is under what circumstances this might happen, however—it may be as simple as when a producer explicitly sends an empty message, just as a way to share message properties, or there may be some more subtle circumstances in which this is used.
Message Properties
As with PUT messages, this indicates whether this message has any properties specified.
Out of Order
I’m speculating that this is set when the broker knows that it’s send the message after one that was logically earlier in the stream, but I’m not quite sure of this or whether this has some additional implications that I’m missing.

PUSH Options

Let’s first talk about the general structure of options, and then look the sole option currently available in PUSH messages.

Each option consists of an option header which may be followed by some content. Here are the fields in the header:

Option Type

This specifies the type of this option. Although the documentation claims that only a single option type is defined, SUB_QUEUE_INFOS, the code actually defines three types:

Value Name
1 SUB_QUEUE_IDS_OLD
2 MSG_GROUP_ID
3 SUB_QUEUE_INFOS

Based on the name, I’m assuming the first is an older equivalent of SUB_QUEUE_INFOS that was deprecated as the feature evolved. The second option looks more promising, but it’s possible that it’s either also old, or perhaps an experimental feature which isn’t properly supported yet.

Either way, I’m going to look at just the third option in a moment.

Packed & Option Words

This 1-bit flag indicates the meaning of the following OptionWords field, and whether there is any option content following this header. If this flag is zero, then the OptionWords field contains the number of 32-bit words which comprise the content for this option following the header.

If, however, this flag is set then the OptionWords itself is used for additional type-specific content and there will be no option content following the header.

Type Specific
The interpretation of this 4-bit field depends on the option type, as the name implies.

Sub Queue Infos Option

So now we’ve looked at the generic structure of how options are specified, how is the only defined option, SUB_QUEUE_INFOS used?

As you’re probably getting tired of me saying, I’ve pieced this together from code and comments, so I may have some details wrong, but there’s the gist of what I’ve figured out. Hopefully the developers will update the official documentation to fill in some of these gaps at some point.

There are two basic cases here, one of fanout mode and one for non-fanout modes. Let’s look at non-fanout modes first.

In the non-fanout case, it seems that the option uses packed mode and Option Words field is used to store two pieces of information:

  • Sub-queue ID
  • RDA counter

The RDA counter appears to be effectively a TTL on the message—I’m speculating that RDA stands for “redelivery attempts”. This probably relates to the poison pill detection feature that we looked at in the first article. I could only speculate on why it needs to be included in the PUSH message as opposed to just being tracked in the broker, but it may be due to the heavily distributed nature of BlazingMQ.

All this said, I’m not even sure that packed mode is ever currently used with this option at the moment.

In the fan-out case, there are typically multiple App IDs, and it seems to me that each of these is represented as its own sub-queue. As a result in this case the Option Words field contains the number of 32-bit words in the option content, and this content will form an array of SubQueueInfo structures. The Type Specific field in this case indicates the size of each item in the array (not sure if this is in words or bytes).

Each item in this array appears to encode the same two fields, a subqueue ID and an RDA counter. So my suspicion is that the purpose of this option is to track these values separately for each App ID, since each one acts as a separate group of consumers. The subqueue ID is, I believe, something that the SDK chooses just to uniquely identify that particular App ID between that client and broker, in the same way that a queue ID is used as a more efficient representation of a queue.

Once again, though, I realise none of this is a particularly satisfying explanation of why these things must be communicated in the PUSH message as opposed to just being handled internally within the broker. If I get some more conclusive info on this and the other areas I’m hazy on, I’ll post a short follow-up article and edit this one to link to it.

Message Properties and Payload

The remainder of the PUSH message works just like the PUT message we’ve already covered in some detail, so I won’t repeat it here.

CONFIRM Message

The last message that I’m going to look at this article is the CONFIRM, which the consumer sends back to the broker to indicate that it has processed a message and it can be removed permanently from the queue. Until this confirmation is received by the broker, the message will be regarded as still live and may be delivered to other consumers if the current consumer disconnects.

Similar to the ACK message, these consist of a single CONFIRM header at the start, and then one or more CONFIRM messages which are the actual confirmations.

BlazingMQ CONFIRM message structure

The header is the same as for the ACK message we saw earlier, the only difference being that there isn’t a byte reserved for flags, and instead all three remaining bytes are just reserved. This difference isn’t particularly meaningful, however, since you’ll recall that there aren’t any flags reserved for ACK messages anyway yet. This may change in future, of course.

The Queue ID field we’ve already seen many times before, as with the Message GUID field.

The SubQueue ID is new, and the documentation mentions that this ID is only unique between the client and broker for this particular session and queue, and also that it’s chosen by the SDK. This more or less concurs with my speculation earlier around the SUB_QUEUE_INFOS option, which is that the subqueue ID is just an efficient handle attached to a particular App ID on a particular queue within the lifetime of a particular session, just as a queue ID is an efficient handle on a queue within a domain.

If fan-out isn’t in use, then I’m going to totally guess that everyone just passes the subqueue ID as zero everywhere, since there’s only ever one subqueue, but that’s speculation on my part. If I ever get around to writing a client library at some point, I’m sure I’ll find out.

Closing Queues and Sessions

One thing I decided not to drill into in detail was the way that clients can close a queue or the entire session as needed. However, I thought I’d briefly mention the messages involved, even if I didn’t choose to go through their detailed structure.

Closing Queues

Closing a queue is the mirror image of opening it. First, a ConfigureQueueStream packet is sent with empty flow control and subscription parameters. This indicates to the broker that the client is about to detach from the queue, and presumably stops further messages from being sent if it’s a consumer.

After this, a CloseQueue packet is sent. This contains the same queue handle parameters as the previous OpenQueue packet, but also contains an additional isFinal field which, if set to true, indicates that this client doesn’t intend to reconnect to this queue in the near future. From a brief look at the code, this seems to mostly be used as a hint as to whether the broker should keep resources around or free them, but I don’t have enough familiarity to be much more specific than that.

Closing Sessions

When an application wants to stop using BlazingMQ entirely, which would typically be at shutdown, it can close the session entirely. The documentation doesn’t say so explicitly, but I’m assuming that it’s normal to first close any open queues before doing this—it’s possible that I’m being overly cautious and that would be wasted effort. It would be useful to know when implementing a client, because closing queues would add that bit of extra delay to shutting down an application, which you typically want to be both fast and safe.

To initiate the disconnection the client sends the Disconnect method, and then waits for a DisconnectResponse to come back. The idea is that if anything’s still in-flight, it’ll be flushed down to the client to be handled before DisconnectResponse is sent, so once that arrives the client application knows it’s safe to close the TCP connection without risk of dropping any messages.

Since the SDK uses callbacks to handle events, the implication here is that the application must ensure that these callbacks can still work correctly until the SDK has come back to indicate that the session has been closed—i.e. the DisconnectResponse has been recieved.

Of course, for resilience the application should set some sort of time limit and kill things hard if it’s exceeded, but typically service management layers (e.g. systemd, SysV init, etc.) will do this anyway with a SIGKILL if a services takes too long to shut down—still, this has a tendency to create other annoyances because people don’t always write their code to cope gracefully if it starts up after an unclean shutdown. Maximising the chances that your application can shut down quickly and safely every time usually makes your life easier.

State Machines

That was more or less what I wanted to cover in this article, but in the course of browsing through the code I did spot something else of interest which I think is quite helpful for understanding the behaviour of the code: state machines.

I do love myself a finite state machine, as I find them great ways to understand how a system responds to external events, and the BlazingMQ protocol handling actually implements formalised state machines so it was fairly easy to convert them to diagrammatic form.

Well, easy in principle, although I did struggle to find a clear represetnation given the number of transitions—but they’re the best I could manage.

Session State Machine

First up, we have the state machine for the session as a whole. In terms of the states this is fairly simple, but the number of transition events is really the interesting point here. The authors have clearly considered how all the possible failure modes should be handled, even if, in some cases, the handling of them doesn’t actually affect the current state.

BlazingMQ session state machine

I’m not going to drill into this in detail, partly because I haven’t gone through the code in enough detail to offer much more insight that the state machine itself, partly because I don’t think a deep understanding adds all that much unless you’re literally planning to work on the code, and partly because this article is already longer than I intended.

One thing I will say, however, is a quick note on terminology that’s used in the code (and on my diagram). As far as I can tell, the channel referred to is the underlying TCP connection (or potentially other transport channels, I guess), and the session is what’s established once the initial Negotiation messages are exchanged.

So, a CHANNEL_DOWN event would be when the underlying TCP connection is dropped for some reason, whereas a SESSION_DOWN event would be receipt of the DisconnectResponse message from the broker.

Queue State Machine

The other state machine I found was for a queue, and this is significnatly more complicated. This makes sense, because to initialise a queue takes multiple steps, and the state machine not only has to track progress through these steps, but then also cope with all the different failure modes gracefully in all these states.

BlazingMQ session state machine

This was the diagram I really struggled to make readable, but I’m hoping it’s somewhat legible. You can see the happy path on the left-hand side, where all the messages get through and are correct—all of the other states are around error handling, reconnecting and closing the queue.

One thing I thought was particularly interesting was that the channel going down whilst opened isn’t just handled as a disconnection—instead there is some process of reopening the existing open queue. I find the code in the broker a little hard to follow—it’s that sort of non-linear style with callbacks and overrides in quite a few places—so I’m not quite sure how a client handles reconnection. The naive way would be as if starting again from scratch, but if the broker has some persistent state then it’s possible the client should behave differently between establishing a new session, and reestablishing an existing one after the channel has dropped.

Still, these are quite deep nuances which only those building robust client libraries from scratch would need to worry about, so I wouldn’t concern youself deeply with them. I’m just the sort of masochist who loves digging into these details.

Anyhow, I’m not going to examine these state machines any more closely than that, but having drawn out these diagrams I do feel I’ve got a little more insight into how things work at runtime, particularly in the presence of failures and edge cases.

Conclusions

That’s it for this article. It’s been quite the deep dive into BlazingMQ internals, but I’ve found it interesting. I do maintain hopes of implementing a client library in Go at some point, as I feel it’s an increasingly popular language and it would probably be useful. For that, these sorts of details will come in useful. But it certainly wouldn’t be the first project I set myself and then just hopelessly failed to make time for—I’m still hoping to build a HTTP/3 server at some point, preferably before everyone moves on to whatever it’s superceded by.

For those just using BlazingMQ using its existing libraries in C++, Java and Python, however, this sort of detail is purely for interest. It might still be peripherally helpful, if you’re trying to diagnose the source of problems, but I wouldn’t want anyone to be put off using BlazingMQ by the assumption that the details in this post are in any way required to use it. They’re emphatically not essential!

I’ll close off just by summarising, largely for my own benefit, the question marks I still have over the operation of the client and the broker:

  1. In the PUT messages, when is correlation ID specified, and when message GUID?
  2. What is the benefit of specifying the schema ID for message properties?
  3. Why does the ACK include both correlation ID and message GUID? Are both always guaranteed to be populated?
  4. When would the implicit payload option be specified in a PUSH message?
  5. When is out of order set in PUSH messages, and does that change the behaviour in the SDK or is it just passed straight to the application code?
  6. Are “subqueues” only used in fan-out mode when there are multiple App IDs, or are there other uses of them besides this?
  7. How does the RDA counter and other fields in the SubQueueInfo field affect how a PUSH message is handled, both in the broker and the client?

If I get hold of the answers to somr or all of these questions, I’ll write a quick follow-up article. But for now I think this one has more than enough content.

Hopefully that’s been of interest!


  1. Worth noting that not all network protcols use big-endian as network byte order—for example, I believe SMB uses little-endian. But the IETF protocols generally use big-endian representations, so it’s fairly safe to treat the terms “network byte order” and “big-endian” as interchangable in most contexts. 

  2. I mean, you’d struggle to find an encoding that’s more verbose than XML, without being some April 1st RFC that was specifically designed for inefficiency. XML certainly has its virtues, but concision is not, in my view at least, among them. 

This is the most recent article in the “BlazingMQ” series, which started with BlazingMQ: Introduction
Thu 27 Jun, 2024
20 Oct 2024 at 5:45PM in Software
 | 
Photo by Andy Pearce (via Midjourney)
 |