ØMQ Lightweight Messaging Kernel (v0.6)

WARNING: This text is deprecated and refers to an old version of ØMQ. It remains here for historical interest. DO NOT USE THIS TO LEARN ØMQ.

HISTORICAL WHITEPAPER

Introduction

Most importantly, version 0.6 of ØMQ introduces a load-balancing style of messaging and on-disk offload for queues (swapping). Support for OpenVMS and Mono is also of interest.

We have already discussed some of the concepts and algorithms in the design documents for previous versions of ØMQ.

Load balancing

Until now, the only messaging style supported by ØMQ was "data distribution", i.e. each message is passed to everyone who has subscribed to the particular message feed.

In the current version, support for "load balancing" was added. Load balancing means that each message is sent to exactly one receiver, with messages distributed in a fair fashion among the receivers connected at the moment.

The picture below summarises both messaging styles. The numbers indicate which messages are passed to which receivers:

[lb1.png: data distribution vs. load balancing]

To choose the messaging style, use the appropriate constant when creating an exchange.
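
To make the distinction concrete, here is a minimal sketch - plain C++, not the actual ØMQ/0.6 API; the receiver names and the round-robin policy are illustrative assumptions only - showing where each message ends up under the two styles:

//  Sketch only: contrasts the two dispatch strategies an exchange can use.
//  This is not the ØMQ/0.6 API; names and policy are assumptions.
#include <cstdio>
#include <string>
#include <vector>

enum style_t {style_data_distribution, style_load_balancing};

struct exchange_t
{
    style_t style;
    std::vector<std::string> receivers;   //  Receivers connected at the moment.
    size_t next;                          //  Round-robin cursor.

    void send (int msg)
    {
        if (style == style_data_distribution) {
            //  Data distribution: every connected receiver gets the message.
            for (const std::string &r : receivers)
                std::printf ("message %d -> %s\n", msg, r.c_str ());
        }
        else {
            //  Load balancing: exactly one receiver gets the message,
            //  chosen in a fair, round-robin fashion.
            std::printf ("message %d -> %s\n", msg,
                receivers [next++ % receivers.size ()].c_str ());
        }
    }
};

int main ()
{
    exchange_t ex = {style_load_balancing,
        {"receiver-1", "receiver-2", "receiver-3"}, 0};
    for (int msg = 1; msg <= 6; msg++)
        ex.send (msg);
    return 0;
}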

Swapping

The RabbitMQ team has done a nice job of measuring how their messaging server behaves when it runs out of physical memory and starts swapping. Have a look at their results here. Although the measurement was done for RabbitMQ, you should expect similar results for any messaging server that runs out of memory.

The obvious problem with OS swapping is that it has no knowledge of individual queues. Thus, swapping caused by one queue that has run out of memory can cause data from another queue to be swapped out as well, inflicting a performance penalty even on applications that have nothing to do with the overflowing queue.

To solve this problem, ØMQ/0.6 introduces the concept of a per-queue swap. When creating a queue, the size of the swap can be defined. Once the queue exceeds its memory limit (defined by the high watermark), subsequent messages are written to a file. When the memory usage of the queue drops below the low watermark, data are retrieved from the swap and placed back in memory.

Note that until the memory limit is reached, no disk I/O operations are involved. Thus, unless there's congestion in the system, ØMQ runs in memory, avoiding the higher latency and lower throughput caused by disk I/O. This behaviour is exactly what one would expect: if there's no congestion, everything runs very fast. Once there is congestion - i.e. the business logic is not able to keep pace with the data - latency grows anyway (messages have to wait in the queue until the business logic is able to process them), so the additional latency introduced by disk I/O doesn't really matter. However, you can still be sure that your process won't crash from running out of memory - the swap can be defined to be as large as hundreds of gigabytes.

It should also be noted that the I/O operations are done in batches, making the swap as fast as it can possibly be.
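
The sketch below illustrates the watermark mechanism described above. It is not the ØMQ/0.6 implementation: the watermark values are arbitrary and an in-memory container stands in for the swap file, but the control flow - no disk activity until the high watermark is hit, refilling memory once usage drops below the low watermark - follows the description:

//  Sketch only: a FIFO queue with high/low watermarks. A std::deque stands
//  in for the on-disk swap file to keep the example short.
#include <cstdio>
#include <deque>
#include <string>

class swapped_queue_t
{
public:
    swapped_queue_t (size_t hwm_, size_t lwm_) : hwm (hwm_), lwm (lwm_) {}

    void push (const std::string &msg)
    {
        //  Until the high watermark is hit (and while the swap is empty,
        //  so that ordering is preserved) messages stay in memory.
        if (swap.empty () && memory.size () < hwm)
            memory.push_back (msg);
        else
            swap.push_back (msg);       //  Would be a batched file write.
    }

    bool pop (std::string &msg)
    {
        if (memory.empty ())
            return false;
        msg = memory.front ();
        memory.pop_front ();

        //  Once memory usage drops below the low watermark, refill it
        //  from the swap (would be a batched file read).
        while (memory.size () < lwm && !swap.empty ()) {
            memory.push_back (swap.front ());
            swap.pop_front ();
        }
        return true;
    }

private:
    size_t hwm, lwm;
    std::deque<std::string> memory;     //  In-memory part of the queue.
    std::deque<std::string> swap;       //  Stand-in for the swap file.
};

int main ()
{
    swapped_queue_t queue (4, 2);       //  hwm = 4 messages, lwm = 2 messages.
    for (int i = 0; i != 10; i++)
        queue.push ("message " + std::to_string (i));
    std::string msg;
    while (queue.pop (msg))
        std::printf ("%s\n", msg.c_str ());
    return 0;
}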

Enhanced language bindings

The APIs for different languages have changed slightly since version 0.5. The goal was to sacrifice some backward compatibility to make the APIs expose as much of ØMQ's functionality as possible. The additions are:

  • setting notification mask and receiving notifications
  • retrieving queue ID when receiving a message
  • non-blocking versions of the send and receive functions
  • memory limits for queues (high watermark and low watermark)
  • on-disk swap limit for queues
  • messaging style for exchanges (data distribution vs. load balancing)

Additionally, all the language bindings now conform to a single set of guidelines, meaning that the exposed functionality is the same in all languages and that the names of functions, constants and parameters are the same as well. This kind of unified API makes it easy for a programmer to switch from using ØMQ in one language to using it in another. It also makes communication between programmers working on different components in different languages less difficult.

OpenVMS port

ØMQ has been ported to OpenVMS. The port includes the C++, C, Java and native VMS APIs. The native API can be used to access ØMQ from languages like COBOL, Pascal, BASIC, etc. Have a look at the Fortran performance tests in /perf/tests/zmq for an example. Additionally, the port includes performance tests for the different language bindings and example applications (chat, exchange, butterfly).

The port is particularly useful when connecting applications running on OpenVMS with applications running on other operating systems (Linux, Windows).

Support for Mono

In previous versions, the .NET extension was usable only on the Win32 platform.

In version 0.6 it has been renamed to the CLR extension and can also be used on top of the Mono framework, which in turn runs on Linux, Mac OS X and Windows. Mono support is particularly helpful when migrating your C# or VB projects from Windows to Linux. During such a migration, you can move individual applications one by one from .NET/Win32 to Mono/Linux while still keeping the system functional at any given moment.

Centralised management (experimental)

There are two basic use cases for ØMQ. Firstly, there's the enterprise deployment with hundreds of applications running on the network. Secondly, there's simple point-to-point communication. The former requires centralised management and monitoring of the ecosystem; the latter is much simpler, more of a glorified socket than a messaging system.

ØMQ/0.6 takes a few preliminary steps towards supporting both use cases. While it still supports the old on-the-fly registration of global objects, which is important for the simple use case, it adds support for complex deployments.

With large enterprise-scale deployments, the main problem of a distributed system becomes its manageability. With hundreds or thousands of applications running on the network, it's inherently hard to keep track of all the dataflows, analyse the dependencies or make changes to the system.

The only way to manage such a system properly is to keep all the relevant information centralised in a single repository (directory service) which can be browsed, analysed or modified in an automated fashion.

Although there was such a directory service in ØMQ (zmq_server), the drawback was that individual applications registered themselves on the fly rather than being configured in advance by the administrator. Because of that, the directory service was aware only of the applications running at a given moment and didn't allow for exhaustive analysis/management of the system as a whole, including the applications that happened to be offline at the moment.

Starting with version 0.6, ØMQ moves in the direction of an "administered environment" model in addition to the simple configure-as-you-go model.

To give it a try, have a look at the chatroom example. Currently, when starting a chatroom, you have to supply it with the network interface and the port to use. Actually, you have to provide a network interface and port for both incoming and outgoing messages:

$ ./chatroom localhost cheese 192.168.0.115:5555 192.168.0.115:5556

As long as there are just a couple of chatrooms running on your system, it's possible to manage them this way. However, imagine the administrative nightmare of managing hundreds of chatrooms located on several servers, each using a particular network interface and specific ports.

Moreover, there is a reliability problem with the register-as-you-go approach. Try running a display component without running the chatroom first. What you'll get is an error. The problem is that without centralised configuration ØMQ cannot distinguish between the case where the application you are trying to connect to is just temporarily offline (in which case regular attempts to reconnect are appropriate) and the case where the application doesn't exist at all (in which case raising an error is appropriate).

Now, let's try to do the same thing in a different way. First, you have to determine which global objects (exchanges or queues) are used. A brief examination of the code shows that each chatroom exposes two global objects: one global queue for incoming traffic and one global exchange for outgoing traffic. The names of the objects are derived from the chatroom name: the queue is called Q_<chatroom-name> and the exchange is called E_<chatroom-name>. So, if our chatroom's name is "cheese", the objects will be named Q_cheese and E_cheese respectively.

Create a configuration file named config.xml (or any other name):

<root>
    <node name = "E_cheese" location = "zmq.tcp://192.168.0.115:5555" />
    <node name = "Q_cheese" location = "zmq.tcp://192.168.0.115:5556" />
</root>

Then start the zmq_server supplying it with the name of the configuration file:

$ zmq_server --config-file config.xml

At this point, you don't have to pass the configuration info to the application. Instead, the application will retrieve the configuration from the zmq_server:

$ ./chatroom localhost cheese "" ""

Now imagine you have several hundred chatrooms running in your datacenter. You can administer all of them by editing the config.xml file. Without centralised administration, you would have to change the scripts that start the chatrooms on the individual servers.
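
For example, a single configuration file could declare the global objects of every chatroom in the deployment; following the Q_/E_ naming convention above, a second chatroom called "wine" (the name and addresses below are just placeholder values) would simply add two more entries:

<root>
    <node name = "E_cheese" location = "zmq.tcp://192.168.0.115:5555" />
    <node name = "Q_cheese" location = "zmq.tcp://192.168.0.115:5556" />
    <node name = "E_wine" location = "zmq.tcp://192.168.0.116:5555" />
    <node name = "Q_wine" location = "zmq.tcp://192.168.0.116:5556" />
</root>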

As for reliability, start the zmq_server with config.xml, then start the display and prompt applications. As the chatroom is not running, messages cannot pass from one to the other. However, the applications keep running, trying to connect to the chatroom. You can even type a message into the prompt and, once the chatroom goes online, it will be passed to the display.

Conclusion

ØMQ/0.6 opens up a whole new set of use cases based on a load-balancing architecture. It is also more reliable, as it adds support for on-disk offload - preventing out-of-memory problems - and centralised management.
