BlazingMQ is a message queue middleware system developed at Bloomberg which was open sourced around a year ago. I think it has a lot of very useful features for some use-cases, so this series of articles looks at it in a bit more detail. In this third article I’m going to try to get a basic producer and consumer working with the broker, to get some basic exposure to the APIs in C++ and Python.
This is the 3rd of the 4 articles that currently make up the “BlazingMQ” series.
In the previous two articles in this series, we’ve taken a look at the features of BlazingMQ, and some details about the more operational considerations of how a cluster of brokers work together. In this article I’m going to actually try writing my own clients and get them working with the broker running in Docker[1].
Before jumping into this, I ran through the BlazingMQ in Action section of their documentation, and then the follow-up More Fun with BlazingMQ—I strongly recommend you do the same before working on any code.
The instructions for these tutorials are based purely on Docker and use the command-line bmqtool that they provide to send and receive messages, so you don’t need to write or compile any code. This lets you play with different routing strategies and subscriptions, and also see how priorities interact with these things. It only takes maybe 20 minutes to run through it, and I found it very helpful in confirming my understanding.
Before worrying about writing a client, I wanted to make sure I could get the broker up and running in Docker, such that I could have a client running on the host machine and connect to the broker in the Docker container.
Since I already had a clone of the BlazingMQ repository, and I’d run through the Getting Started guide using the Docker Compose setups from it, I knew that there was a Dockerfile in there I could use myself. To keep things simple I didn’t want to use Docker Compose, however, so I just used the Dockerfile directly to build my own image for the broker. I built it with this command, from the root of the repository:
docker build --platform=linux/amd64 -t bmq-broker -f docker/Dockerfile .
Note that I’m using an Apple silicon (i.e. ARM-based) laptop, so if you’re using Linux on an x86 processor, you can probably skip --platform=linux/amd64.
Then I needed to run the broker container, which I did with the command below:
docker run --platform linux/amd64 -p 127.0.0.1:30114:30114 -h mybroker \
--mount=type=bind,src=./docker/single-node/config,dst=/etc/local/bmq,readonly \
bmq-broker /usr/local/bin/bmqbrkr /etc/local/bmq
For those who, like me, may not be Docker experts, let me break this down for you—if you’re experienced with Docker, this is going to be largely stating the obvious.
- --platform linux/amd64: Because my host platform is linux/arm64/v8 and the image’s platform is linux/amd64, the run would fail unless I repeat the platform declaration here. You shouldn’t need this if you’re using Linux.
- -p 127.0.0.1:30114:30114: This publishes port 30114, which is the standard port for BlazingMQ, on the 127.0.0.1 localhost address on the host machine. This is what allows me to run a local client on my Mac and have it connect to the broker running inside Docker.
- -h mybroker: This sets the hostname of the container to mybroker. Mostly I wanted to just set it to something different from that used in docker-compose.yaml. I expected it not to make a difference in the single node cluster case[2], and wanted to confirm that.
- --mount=type=bind,src=./docker/single-node/config,dst=/etc/local/bmq,readonly: The broker configuration lives in docker/single-node/config within the repository, and we want it to be mounted at /etc/local/bmq in the container. We mount it read-only as a precaution, as there’s no reason for the broker to make changes to this directory.
- bmq-broker: The name of the image to run, as built by the docker build command above.
- /usr/local/bin/bmqbrkr /etc/local/bmq: The command to run inside the container, which is the broker binary followed by the configuration directory mounted above.
At this point I had the broker up and running in one window. You could run detached if you like with docker run -d ..., but I quite like to see the log output while I’m doing other things.
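If you want a quick check from the host that the published port is reachable before involving any BlazingMQ tooling, a plain TCP connection attempt will do. This is only a sketch using Python's standard library: the helper name port_is_open is my own invention, and a successful connection only shows that something is listening on the port, not that it's a healthy broker.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # 30114 is the port published by the docker run command above.
    print("broker port reachable:", port_is_open("127.0.0.1", 30114))
```

If this prints False while the container is running, the -p mapping is the first thing I'd check.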
As an aside, the configuration contained within docker/single-node/config in the repository specifies a lot of global settings, which mostly we can ignore for the purposes of this article. It also specifies a set of domains within a subdirectory called domains, which is more important because you need to use the name of a valid domain when creating a queue.
In this article I’m going to use the domain bmq.test.persistent.priority, and the configuration for it is contained within a file of that name in the domains directory, with the extension .json added. I’ve reproduced the configuration below, mostly as an example of what these files look like, but really the only important section for the simple usage contained herein is the one which declares the mode as priority.
{
  "definition": {
    "location": "local",
    "parameters": {
      "maxDeliveryAttempts": 0,
      "deduplicationTimeMs": 300000,
      "consistency": {
        "eventual": {}
      },
      "storage": {
        "config": {
          "fileBacked": {}
        },
        "domainLimits": {
          "bytes": 2097152,
          "messages": 2000,
          "bytesWatermarkRatio": 0.8,
          "messagesWatermarkRatio": 0.8
        },
        "queueLimits": {
          "bytes": 1048576,
          "messages": 1000,
          "bytesWatermarkRatio": 0.8,
          "messagesWatermarkRatio": 0.8
        }
      },
      "messageTtl": 300,
      "maxProducers": 0,
      "maxConsumers": 0,
      "maxQueues": 0,
      "maxIdleTime": 0,
      "mode": {
        "priority": {}
      }
    }
  }
}
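To make it a bit more concrete which parts of this file matter for this article, here's a small sketch that pulls out the routing mode and the per-queue limits. It works on a trimmed copy of the JSON above, and the function name summarise_domain is hypothetical. It also assumes that the mode object always has exactly one key naming the mode, which is true of this file but isn't something I've checked against the broker's schema.

```python
import json

# Trimmed copy of the domain definition shown above; only the fields
# inspected below are kept.
DOMAIN_CONFIG = """
{
  "definition": {
    "location": "local",
    "parameters": {
      "storage": {
        "queueLimits": {"bytes": 1048576, "messages": 1000}
      },
      "messageTtl": 300,
      "mode": {"priority": {}}
    }
  }
}
"""


def summarise_domain(config_text: str) -> dict:
    """Extract the routing mode and per-queue limits from a domain config."""
    params = json.loads(config_text)["definition"]["parameters"]
    # Assumption: the "mode" object has a single key naming the routing mode.
    (mode,) = params["mode"].keys()
    limits = params["storage"]["queueLimits"]
    return {
        "mode": mode,
        "queue_max_messages": limits["messages"],
        "queue_max_bytes": limits["bytes"],
        "message_ttl": params["messageTtl"],
    }


if __name__ == "__main__":
    print(summarise_domain(DOMAIN_CONFIG))
```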
At this point I wanted to test things before writing any of my own code, to make sure the broker was contactable from the host, so I built bmqtool so I could run it locally and check that it could connect.
BlazingMQ does have quite a few dependencies, but thankfully there’s a handy bin/build-darwin.sh script which builds the broker and tools for macOS. If you’re using Ubuntu, bin/build-ubuntu.sh should do the job, not that I’ve tested that myself. For non-Debian-based distributions, you’ll probably need to roll your own, since that script first installs a bunch of prerequisites using the apt command-line utility.
This went very smoothly, with only one slight platform-specific hiccup[3] which wasn’t related to BlazingMQ. The build had quite a few warnings, as always seems to be the case in cross-platform code, but the binaries were all generated successfully. What problems I did have seemed to be resolved by starting again from a clean repository, so there may have been some half-fetched dependencies or similar that were causing problems.
The binary gets built in a build directory which has subdirectories mirroring the source layout, so you need to look in src/applications/bmqtool for the binary bmqtool.tsk. By default the application attempts to connect to port 30114 on localhost, which is exactly what we need here. If you want it to connect to a different host or port, you can use the -b command-line option.
Having started an instance of bmqtool, I then ran the following commands at its command prompt to open the connection and open a queue called somequeue for writing:
start
open uri="bmq://bmq.test.persistent.priority/somequeue" flags="write"
I’m not going to reproduce the logging that occurs here, because it’s very chatty, but that’s actually very helpful in making sure everything is working as expected.
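The queue URI used in the open command combines the domain name from the broker configuration with a queue name of your choosing. As a rough illustration of that structure, here's a sketch that splits such a URI using Python's standard urllib. The helper split_queue_uri is hypothetical and only handles the simple domain/queue form used in this article.

```python
from urllib.parse import urlsplit


def split_queue_uri(uri: str) -> tuple[str, str]:
    """Split a bmq:// queue URI into (domain, queue name)."""
    parts = urlsplit(uri)
    if parts.scheme != "bmq" or not parts.netloc:
        raise ValueError(f"not a bmq:// URI: {uri!r}")
    queue = parts.path.lstrip("/")
    if not queue:
        raise ValueError(f"missing queue name in {uri!r}")
    # The "host" part of the URI is the domain, the path is the queue name.
    return parts.netloc, queue


if __name__ == "__main__":
    print(split_queue_uri("bmq://bmq.test.persistent.priority/somequeue"))
```

The domain part has to match one of the files in the domains directory of the broker configuration, whereas the queue name here is just one I made up.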
In another terminal window, I fired up a second instance of bmqtool and ran the following commands to connect and open the same queue for reading:
start
open uri="bmq://bmq.test.persistent.priority/somequeue" flags="read"
Then it was time to try sending a message:
post uri="bmq://bmq.test.persistent.priority/somequeue" payload=["test message"]
The success of this can be seen on the consumer straight away, as the following log line appears:
INFO m_bmqtool_application.cpp:617 ==> EVENT received:
[ type = "MESSAGE" rawEventType = PUSH ]
By the way, throughout this article I’ve trimmed and reformatted some of the output for brevity—for example, I’ve removed timestamps and wrapped long lines. All the important details should be intact, just be aware it may not exactly match what you see if you follow along.
Running the list command on the consumer also confirms that an unconfirmed message has been delivered. Note that I’ve split the output over multiple lines, whereas in reality it’s on one:
# 1 [400000000027F2BD56EF935754E5E32C]
Queue: '[ uri = bmq://bmq.test.persistent.priority/somequeue
correlationId = [ autoValue = 2 ] ]' = 'test message'
The only thing left to do was confirm the message, to have it removed from the queue again:
confirm uri="bmq://bmq.test.persistent.priority/somequeue" guid="400000000027F2BD56EF935754E5E32C"
That was all simple enough, and doesn’t explain anything that’s not already covered in the Getting Started guide on the BlazingMQ site—but it was a handy way to confirm that the broker was running in Docker and clients on the host were successfully able to connect to it.
One tip that I did glean by playing with this is that if you need a quick way to confirm all messages on the queue with bmqtool you can do this:
confirm uri="bmq://bmq.test.persistent.priority/somequeue" guid="*"
The handy thing about bmqtool is that it also allows you to test out your applications one at a time. In my case I could work on the producer application first, and check whether messages had hit the queue successfully without having written the consumer.
Now it was time to write the first client application, a producer to send messages to the queue. Once this was written, I could test it using bmqtool. Since I’m not a fan of Java, I thought I’d aim to write the producer in C++, and then for the consumer I’d give the Python SDK a try.
Let’s start with the producer in C++, then. I’m going to run through the source code of the application I used to just produce some fixed messages. Because I’m just playing around to get things working, this is absolutely drastically bare-bones minimal—it just produces a fixed set of three messages every time it’s run.
I couldn’t find a great deal of specific documentation about actually writing a C++ client application, so I’ve winged it a little by copying some of the sample client code. I’m sure I’m not following some best practices here, so I wouldn’t take this as a good example—but it does seem to compile and work!
By the way, if you want to try this out yourself, I’ve put the source code into this Gist on GitHub, which is probably easier than pasting these into a source file yourself.
Let’s look through the code. First, as an amuse-bouche, there are a few includes, a constant which defines the queue which I’m using (this is the same as the queue I used earlier in this article, as I’m using the same broker configuration) and also some prototypes for the functions I’m going to define.
[Code listing: producer source, lines 1-13 (includes, queue constant and function prototypes)]
Probably it would be better practice to refer to the include files for every class I’m mentioning, so things don’t break if the includes within the project itself should change in future, but this minimal set seems to work for right now. The title of this blog is “That’s Good Enough”, not “A Categorical List of C++ Best Practices”, after all[4].
Right, on to the starters. This simple main() function just creates and starts a session and, assuming that worked, calls into runProducer() to actually produce some messages. The main() function also tries to stop the session afterwards.
[Code listing: producer source, lines 15-33 (the main() function)]
Now the main course: the runProducer() function opens the specified queue and sends three messages to it. Each message is sent using sendMessage(), which we’ll look at in a moment.
Perhaps a noteworthy detail here is the QueueId which, as the name suggests, identifies an open queue. I’m passing an integer here to construct it, which seems to be some sort of context which I could use to disambiguate references to queues in case my application has several open at once.
This is passed to openQueueSync() along with the URI of the queue to open and the flags, which in this case indicate that I want to write to the queue. Note that there’s also an openQueueAsync() method which is a non-blocking call that reports the result asynchronously through a callback that you provide. This would be useful if you don’t want to dedicate a whole thread to your queue, or if you need to keep your application quick to respond to shutdown requests or similar changes.
As an aside, there’s also a method simply called openQueue() which has a few minor differences in the interface, but from the documentation this would appear to be deprecated in favour of the other two. My advice is definitely to read through the C++ API documentation carefully before starting on production-grade coding with it.
[Code listing: producer source, lines 35-61 (the runProducer() function)]
Finally, a tasty dessert: the sendMessage() function that actually does the work of sending each message.
The main takeaway here is that you can’t just pass a string directly to the Session::post() method, you have to construct a MessageEvent, which can contain one or more individual messages. You have to use loadMessageEventBuilder to initialise a MessageEventBuilder object, and then you can use that to load messages. For each message you want to send, you need to:
- Call startMessage() to return a new, empty Message instance.
- Point the message at your payload using setDataRef().
- Call packMessage() on the builder to add the message to the event.
Note that it doesn’t take a copy of the data until you call packMessage(), which makes things quite efficient but does mean there’s the potential for irritating race conditions if you’re not careful about this.
In this simple test we’re sending each message as a separate event, but in reality things will probably be more efficient if you pack multiple messages into a single event, if that’s possible in the context of your application.
Anyway, once the messages are packed into the builder, the messageEvent() method returns an event which can be passed into the post() method of the session to actually send the messages.
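The point about the data not being copied until packMessage() is easier to see with a toy model. The sketch below is plain Python and deliberately not the BlazingMQ API: ToyEventBuilder and its methods are hypothetical names which only mimic the "hold a reference, copy at pack time" behaviour described above.

```python
class ToyEventBuilder:
    """Toy stand-in for a message event builder (not the BlazingMQ API)."""

    def __init__(self) -> None:
        self._pending = None  # reference to the caller's buffer
        self._packed = []  # copies taken at pack time

    def start_message(self) -> None:
        self._pending = None

    def set_data_ref(self, buffer: bytearray) -> None:
        # Only a reference is stored here; no bytes are copied yet.
        self._pending = buffer

    def pack_message(self) -> None:
        if self._pending is None:
            raise ValueError("no data set for the current message")
        # The copy happens here, so later changes to the buffer are harmless.
        self._packed.append(bytes(self._pending))
        self._pending = None

    def message_event(self) -> list[bytes]:
        return list(self._packed)


if __name__ == "__main__":
    builder = ToyEventBuilder()
    buffer = bytearray(b"Message one")

    builder.start_message()
    builder.set_data_ref(buffer)
    buffer[8:] = b"two"  # modified before packing: this is what gets sent
    builder.pack_message()
    buffer[8:] = b"six"  # modified after packing: no effect on the event

    print(builder.message_event())
```

In this model a change made to the buffer between setting the reference and packing ends up in the event, which is exactly the sort of surprise you could get if another thread owns the buffer.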
[Code listing: producer source, lines 63-88 (the sendMessage() function)]
And that’s it, my first BlazingMQ application! There were a few interesting details along the way, but all in all things were fairly painless.
Now that I had my code, the next job was to compile it. This is where things got a little sticky, since I couldn’t find any clear instructions on which header files and libraries I needed to link.
Full disclosure, there’s quite a lot to read through, so it’s entirely possible I’ve missed something in the documentation. But it might be handy to have clear instructions on which libraries need to be linked into a client application, or ideally perhaps to prelink everything into a single libbmqclient.a file which has everything you need.
Anyway, in the end I just fiddled around a little and examined the symbols exported by various libraries in the repository, after I’d run the build-darwin.sh script, and fairly quickly figured out a set of libraries and include paths which got my code to compile.
I’ve included my simple Makefile below for reference, but do be aware that it may be much more complicated than it needs to be! I was in a bit of a rush just to get something working, so in a production system I’d take more care over the build, especially as you’re likely to be using a lot more of the functionality of the client library than my little application here.
[Code listing: Makefile, lines 1-26]
So now the moment of truth! I’d got the code to build, but did it work?
I fired up the broker in Docker, as I outlined earlier in this article, and I also ran bmqtool as a consumer. At this point I ran my new producer code, and based on the output it seemed to work:
Starting session
Trying to send messages
Opening queue
Failed to post message msg=Message one status=0
Failed to post message msg=Message two status=0
Failed to post message msg=Message three status=0
Closing queue
Stopping session
More importantly, the logging in the consumer indicated that messages were available:
INFO m_bmqtool_application.cpp:617 ==> EVENT received:
[ type = "MESSAGE" rawEventType = PUSH ]
And the list command confirmed this:
> list
INFO m_bmqtool_interactive.cpp:663 Unconfirmed message listing: 3 messages
# 1 [4000000000000185711A5D7A7819555B]
Queue: '[ uri = bmq://bmq.test.persistent.priority/somequeue
correlationId = [ autoValue = 2 ] ]' = 'Message one'
# 2 [400001000000018619E25D7A7819555B]
Queue: '[ uri = bmq://bmq.test.persistent.priority/somequeue
correlationId = [ autoValue = 2 ] ]' = 'Message two'
# 3 [4000020000000186927D5D7A7819555B]
Queue: '[ uri = bmq://bmq.test.persistent.priority/somequeue
correlationId = [ autoValue = 2 ] ]' = 'Message three'
Looking good! So, I confirmed these messages using bmqtool and then moved on to building the consumer.
Having got the producer working, my next and final step was to write a very simple consumer in Python, so I could see both the C++ and Python client libraries working.
I didn’t have much luck with pip install blazingmq, unfortunately, as I got errors about a missing bmq library, which I assume is the underlying C++ library. This may be because I missed some prerequisite steps, or it may be because I’m on macOS and that’s a little less well tested, or it may just be because this library was open-sourced quite recently and the maintainers are still ironing out some glitches.
I suspect these issues will be resolved, or perhaps just the documentation improved. I just wanted to try things out for now, so I thought the easiest solution was just to build the Python library myself. This was very much a trial-and-error approach, because I’m not at all familiar with the BlazingMQ installation procedures, or indeed with pkg-config, so it was a bit of a learning experience. I’ve captured here the steps I followed in case it helps someone else, but I make no claims at all that this is optimal!
First I checked out the Python SDK repository, created myself a venv and used pip to install the basic build dependencies. You may already have these on your system, but I always like to keep my global packages very minimal. For reference, here’s the output of pip freeze in my venv:
Cython==3.0.11
pkgconfig==1.5.5
setuptools==75.1.0
I then ran the bin/build-macos-universal.sh script from within the repo, which does some more git checkouts and builds some dependencies, in particular the underlying C++ BlazingMQ client library. The pkg-config metadata files (with extension .pc) are stored in install/lib/pkgconfig by the end of this process, so then I set PKG_CONFIG_PATH to this value for a run of make build:
PKG_CONFIG_PATH=./install/lib/pkgconfig make build
At this point I didn’t want to mess around with any more installation-related annoyances due to my lack of familiarity, so I took a gamble: I just changed into the src/ directory, where the blazingmq directory is located, and ran my code from there. Since the current directory is typically the first thing in sys.path, this worked fine. Clearly not a suitable solution for a production environment, but hopefully at some point in the future pip install blazingmq will just do the job.
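If you'd rather not change directory, an equivalent trick is to put the src directory at the front of sys.path yourself before importing the package. This is only a sketch of that idea, no less of a hack than the approach above: the helper prepend_to_sys_path is hypothetical, and the "src" path assumes you run it from the root of the Python SDK checkout.

```python
import sys
from pathlib import Path


def prepend_to_sys_path(directory: str) -> str:
    """Put directory at the front of sys.path (once) and return its absolute path."""
    resolved = str(Path(directory).resolve())
    if resolved in sys.path:
        sys.path.remove(resolved)
    sys.path.insert(0, resolved)
    return resolved


if __name__ == "__main__":
    # "src" is the directory in the Python SDK checkout that holds the
    # blazingmq package; adjust it to wherever your checkout lives.
    print(prepend_to_sys_path("src"))
```

Setting the PYTHONPATH environment variable to the same directory achieves much the same thing without touching the script.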
Once I’d done an import in the interactive Python interpreter to check everything was working, I used the script below to actually test things. This is stolen almost verbatim from the BlazingMQ Python API documentation.
After the imports, we create a threading.Event() to use to terminate the later event loop, and we use signal.signal() to install a handler which just flags the event. Nothing BlazingMQ-specific here.
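Since this part is pure standard library, here's a minimal standalone sketch of the pattern. The names EXITING and request_exit are my own rather than anything taken from the BlazingMQ documentation, and to make the example terminate on its own it raises SIGINT at itself instead of waiting for a real Ctrl-C.

```python
import signal
import threading

# Set by the signal handler, waited on by the main thread.
EXITING = threading.Event()


def request_exit(signum, frame) -> None:
    """Signal handler: just flag the event so the main thread can wind down."""
    EXITING.set()


def install_handler() -> None:
    # Must be called from the main thread, as signal.signal() requires.
    signal.signal(signal.SIGINT, request_exit)


if __name__ == "__main__":
    install_handler()
    # Simulate Ctrl-C so the example terminates on its own.
    signal.raise_signal(signal.SIGINT)
    EXITING.wait(timeout=5)
    print("exit requested:", EXITING.is_set())
```

In a real consumer the main thread would simply block in EXITING.wait() with no timeout, and everything interesting would happen in the message callback.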
[Code listing: consumer.py, lines 1-14 (imports, the threading.Event and the signal handler)]
Then we define a simple callback function, which will be called when a message arrives. For our purpose here this just prints the content, and confirms the message off the queue, but clearly in a real application you’d process it however was appropriate.
[Code listing: consumer.py, lines 17-19 (the message callback)]
Finally, the code that does the real work—as you might expect from Python, this is much more concise than the C++ version.
We create a Session object as a context manager, passing our on_message_callback() defined earlier as the on_message parameter. Within the context block we then call open_queue() on the Session object, and then just wait, since the BlazingMQ infrastructure will invoke our callback for any messages which arrive. As you might expect, the callback is invoked within threads owned by the BlazingMQ libraries. This shouldn’t be surprising, but it’s worth noting that you’ll need to protect any shared structures from concurrent access, etc.
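To illustrate the point about protecting shared structures, here's a sketch in which ordinary Python threads stand in for the library's callback threads. Nothing here touches BlazingMQ: MessageStats and simulate_callbacks are hypothetical names, and the only real technique on show is guarding shared counters with a threading.Lock.

```python
import threading


class MessageStats:
    """Shared state updated from callback threads, guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.count = 0
        self.total_bytes = 0

    def record(self, payload: bytes) -> None:
        # Hold the lock while touching the shared counters.
        with self._lock:
            self.count += 1
            self.total_bytes += len(payload)


def simulate_callbacks(stats: MessageStats, threads: int, per_thread: int) -> None:
    """Call stats.record() from several threads, as a library might."""

    def worker() -> None:
        for _ in range(per_thread):
            stats.record(b"Message")

    pool = [threading.Thread(target=worker) for _ in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()


if __name__ == "__main__":
    stats = MessageStats()
    simulate_callbacks(stats, threads=4, per_thread=1000)
    print(stats.count, stats.total_bytes)
```

In the real consumer, the message callback would call something like stats.record() with the message payload, and any other thread reading the counters would want to take the same lock.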
[Code listing: consumer.py, lines 22-35 (creating the Session and opening the queue)]
I fired up the consumer application, which seemed to connect fine, and then ran my producer from earlier. Lo and behold, it all worked!
$ python consumer.py
---- Session open
---- Queue open
Received message: b'Message one'
Received message: b'Message two'
Received message: b'Message three'
^C---- Terminating
---- Closing queue
Not much else to say here, this is about as simple as it gets—but it’s nice to see it working!
So after all that, what have I learned?
My first take-away is that the experience of using BlazingMQ’s open source distribution as a developer still has quite a few rough edges. My own personal development has typically been with languages with a more formalised distribution model than C++ (e.g. Python, Go), so it’s entirely possible that I’m unaware of conventions in projects which would have simplified things.
Unless I just missed how to do it, it would be helpful to have all the BlazingMQ libraries pulled into a single .a file, for example, to make it easier to build client applications. This approach may be complicated by the fact that this library also depends on the Bloomberg BDE library, and pulling that in as well might conflict with client code, but at least some documentation on this would be helpful.
Similarly with the Python library: I find that adoption of libraries is always better if pip install X just works, although I realise this can be quite problematic for extension modules with lots of dependencies.
I make no claims at all that these are easy problems to solve, I’m just highlighting that they would be useful. BlazingMQ is an in-house developed solution which has then been open-sourced, so it’s very much expected that the processes for building and distributing it will need a lot of work—the in-house equivalents can’t afford to be broken, and that’s always a tricky challenge! This is why I was keen to solve these problems using only code and documentation publicly available.
All that said, however, getting my simple applications working didn’t prove too problematic with a little trial and error. The Python library in particular is simplicity itself to use in code as a consumer, and I’m sure writing a producer in it wouldn’t prove too tricky either. The C++ code is more verbose, as you’d expect, but once you gain a little familiarity it’s all quite logical. Just do remember to check the documentation and avoid any deprecated methods!
So that’s it for this article. I was pleased that I got everything working that I needed to, and I’m hoping this might be useful feedback for those maintaining the project too. If I get any updates on anything I did wrong, I shall come back and update this article (or perhaps write a new one, if the updates are extensive), but I’ll leave all the original content in this article intact either way. Even if I got some things horribly wrong, I think it can be useful for others to see them and avoid the same.
When I get the chance, I’d like to drill into the communication protocols that BlazingMQ uses in more detail, and I’ll add another article to this series if so. If at all possible, I’d like to try writing a client library in Go, as that’s currently not supported, but it’s quite an undertaking so who knows if and when I’ll get the time to do that!
[1] I should clarify that I’m no expert in BlazingMQ (that’s why I’m writing these articles, as a learning experience), but I’m also no expert in Docker. I know just about enough to muddle through, but just be aware that all I’m doing in this article is explaining how I personally got it working. There may be better ways to get things working, and on other platforms you may need to do things differently.
[2] In the multiple node cluster’s docker-compose.yaml, I notice that the hostnames are passed as a command-line parameter to each broker, so in that case I think the hostname I chose would need to match the command-line. In the single node case, however, my assumption was that this wasn’t important, and that seems to be confirmed by the fact that I changed the hostname without it causing any issues.
[3] Specifically, the m4 macro processor wasn’t installed on my Mac. When the script attempted to invoke it for the first time, macOS triggered an automatic install, but this didn’t do the job properly. I just went to the App Store and installed Xcode, rebooted for good measure (probably not required) and when I reran the build it all went through fine.
[4] In particular I’ve used std::endl copiously, which I can practically guarantee will randomly enrage someone reading this. They’ll argue there’s never a reason to use it due to the inefficiency of the implicit flush operation. However, as with any “never use X” declaration, this is missing the nuances, and people who oversimplify this way are a bit of a pet peeve of mine.
Yes, it’s true that output to interactive terminals (i.e. where isatty() returns true) is typically line-buffered by default, but it’s not too uncommon to pipe output to a file and use tail -f to watch that file. Without the flush implicit in std::endl, the resultant block-buffering would lead to irritating delays in the case of long pauses in output. So when it’s used for diagnostic output, as in this case, there are legitimate reasons to prefer it.
Just one example of why you never just say “don’t do X”: you explain the implications and respect people enough to make their own choices.