ØMQ Lightweight Messaging Kernel (v0.3)

WARNING: This text is deprecated and refers to an old version of ØMQ. It remains here for historical interest. DO NOT USE THIS TO LEARN ØMQ.

HISTORICAL WHITEPAPER

Introduction

ØMQ/0.3 includes many new features over ØMQ/0.2. Most importantly for application developers, it now provides the AMQP "exchange-queue-binding" semantics for message routing. Exchanges and queues are a superior way of designing loosely-coupled distributed applications, and make ØMQ/0.3 more familiar to users of AMQP messaging products such as OpenAMQ and RabbitMQ.

We have already discussed some of the concepts and algorithms used here in the design documents for previous versions of ØMQ.

What's new?

Run-time management of I/O connections

In ØMQ/0.2 you had to open all network connections in advance; there was no way to add new connections or get rid of old ones while the software was running.

Solving this problem would be pretty straightforward (using dynamic lists of connections rather than preallocated arrays) if it were not for the negative impact of such dynamic lists on the overall performance of the system. The basic problem is that we would have to make the connection list thread-safe (one thread may be inserting a new connection while another one is using the list to poll for incoming data), and such synchronisation (locking and unlocking mutexes) placed on the critical path of the system would decrease performance to well below 1,000,000 messages a second.

Our solution is to place the connection list under the exclusive ownership of the I/O thread that handles the connections, thus avoiding synchronisation altogether. When a different thread wants to add a connection to the list or remove one from it, it asks the I/O thread to do so by sending it an asynchronous message, also known as a command.

Commands

Commands are asynchronous messages used to communicate between individual threads. Each command type requests a specific service from the destination thread: start receiving messages from a specified pipe, start writing messages to a specific pipe, release a pipe, wake up (when the destination thread is sleeping), register or unregister a specific connection/engine with the thread, etc.

From the implementation point of view, commands are passed between threads using exactly the same mechanisms as messages (the y-suite). The process is thus very efficient, requiring only a fraction of an atomic operation per command on average.
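
To make the mechanism concrete, here is a minimal sketch (the types and names are ours, not ØMQ's actual declarations) of a command and of a bounded single-producer/single-consumer pipe it could travel through. With exactly one writing thread and one reading thread per pipe, a release/acquire pair of atomic operations replaces mutex locking on the critical path:

    #include <atomic>
    #include <cstddef>

    struct command_t {
        enum type_t { register_engine, unregister_engine,
                      attach_pipe, detach_pipe, wake_up } type;
        void *arg;                               // pipe/engine the command refers to
    };

    template <typename T, size_t N>
    class spsc_pipe_t {
        T buf[N];
        std::atomic<size_t> head{0}, tail{0};
    public:
        bool write(const T &v) {                 // called by the sending thread only
            size_t t = tail.load(std::memory_order_relaxed);
            size_t next = (t + 1) % N;
            if (next == head.load(std::memory_order_acquire))
                return false;                    // pipe full
            buf[t] = v;
            tail.store(next, std::memory_order_release);
            return true;
        }
        bool read(T *v) {                        // called by the receiving thread only
            size_t h = head.load(std::memory_order_relaxed);
            if (h == tail.load(std::memory_order_acquire))
                return false;                    // no command pending
            *v = buf[h];
            head.store((h + 1) % N, std::memory_order_release);
            return true;
        }
    };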

Individual threads have to periodically poll for incoming commands. Given that polling can be pretty expensive (especially with the POSIX poll function used by I/O threads; ØMQ's ypollset used by application threads is much better in this respect), it is crucial not to poll all the time, but rather once every 100-1000 microseconds. That way we get very efficient message transfer, with the trade-off of a little delay in processing the commands. And as commands are processed mainly when the client application is starting up and initialising its ØMQ infrastructure, a millisecond delay is not harmful in any way.

There are several algorithms used to eliminate excessive polling:

  • In the I/O thread, polling is done after all the connections have been handled. Handling a connection means that a batch of data is read from or written to the socket. Given that each data batch consists of a large number of messages, polling happens only once in a while.
  • The application thread performs polling in the send, flush and receive functions (see the sketch below):
    • In send and flush, the current time is checked and polling is done only if at least 1 millisecond has elapsed since the previous poll.
    • In receive, polling is done once per 100 messages when messages are arriving at full rate (no sleeping is going on). However, if there is a gap between two subsequent messages (the thread would go to sleep), polling is done immediately.
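
The following sketch (with hypothetical names; the real functions are internal to ØMQ) shows how an application thread could throttle its polling along the lines described above:

    #include <chrono>

    class api_thread_sketch_t {
        std::chrono::steady_clock::time_point last_poll =
            std::chrono::steady_clock::now();
        int messages_since_poll = 0;

        void process_commands() { /* drain pending commands; not shown */ }

    public:
        void on_send_or_flush() {
            // Poll at most once per millisecond on the send/flush path.
            auto now = std::chrono::steady_clock::now();
            if (now - last_poll >= std::chrono::milliseconds(1)) {
                process_commands();
                last_poll = now;
            }
        }

        void on_receive(bool about_to_sleep) {
            // Poll immediately if the thread would go to sleep, otherwise
            // only once per 100 received messages.
            if (about_to_sleep || ++messages_since_poll >= 100) {
                process_commands();
                messages_since_poll = 0;
            }
        }
    };

The clock check makes send/flush pay for at most one poll per millisecond, while the receive path amortises the cost of a poll over 100 messages unless the thread is about to sleep.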

Multiple engines per thread

While ØMQ/0.2 allowed only a single engine per thread, ØMQ/0.3 allows a single thread to handle an unlimited number of engines.

Exchanges and Queues

ØMQ/0.2 offered a way to build your messaging infrastructure using different engines connected to the dispatcher. While that is sufficient, it's certainly not very pleasant to use, and referring to endpoints using IP addresses rather than logical names makes managing the connections a bit of a nightmare. Therefore, ØMQ/0.3 introduces a new layer on top of the dispatcher/engine layer, based on the concept of exchanges and queues.

The idea is fairly simple:

  • An exchange is an entity you can send messages to.
  • A queue is an entity you can receive messages from.
  • An exchange routes messages to the appropriate queues.
  • Queues store messages.
  • A binding is a relationship between an exchange and a queue, meaning that messages should be routed from the exchange to the queue.

From the user's perspective, the following rules apply:

  • The message-producing application creates an exchange.
  • The message-consuming application creates a queue.
  • One of the applications establishes a binding between the exchange and the queue.
  • The message-producing application sends messages to the exchange.
  • The message-consuming application retrieves messages from the queue.
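
To illustrate these semantics (this is a toy in-process model, not the actual ØMQ API; the function names below are invented for the example), here an exchange routes every sent message to all queues bound to it, and queues simply store messages:

    #include <deque>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    std::map<std::string, std::deque<std::string>> queues;
    std::map<std::string, std::vector<std::string>> bindings;  // exchange -> queues

    void create_exchange(const std::string &e) { bindings[e]; }
    void create_queue(const std::string &q)    { queues[q]; }
    void bind(const std::string &e, const std::string &q) {
        bindings[e].push_back(q);
    }
    void send(const std::string &e, const std::string &msg) {
        for (const auto &q : bindings[e])        // route to every bound queue
            queues[q].push_back(msg);
    }
    bool receive(const std::string &q, std::string *msg) {
        if (queues[q].empty()) return false;
        *msg = queues[q].front();
        queues[q].pop_front();
        return true;
    }

    int main() {
        create_exchange("E");                    // producer side
        create_queue("Q");                       // consumer side
        bind("E", "Q");                          // either side may bind
        send("E", "hello");
        std::string m;
        if (receive("Q", &m)) std::cout << m << std::endl;  // prints "hello"
    }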

The following diagram shows an example of an exchange and queue layout on the network, with boxes being exchanges and queues, and arrows being bindings:

wiring0.png

The important point is that to transfer a message you need both an exchange and a queue. There's no way to transfer messages using an exchange or a queue alone. The following diagram shows a more complex message flow arrangement:

wiring1.png

A few things to note:

  • There can be several queues and exchanges in a single process/application.
  • Bindings can span processes and machines; however, they can also be fully contained within a single process.
  • An exchange can route messages to several queues.
  • A queue can receive messages from several exchanges.

From the implementation point of view, you can think of exchanges and queues as engines in the application thread, the same way as there are wire protocol engines in the I/O thread. The following picture shows how the wiring from the diagram above would be rephrased in terms of threads and engines:

wiring2.png

Locators

Given that we are going to use symbolic names (queue/exchange names) to refer to components distributed over the network rather than raw IP addresses, it's clear we'll need some kind of directory service to translate the symbolic names into actual IP addresses and port numbers.

The directory service is called the locator in ØMQ terminology, and its scheme can be seen in the diagram below:

locators.png

As can be seen, the directory has three separate layers: the local locator (locates objects within a thread), the process locator (locates objects within a process) and the global locator (locates objects anywhere on the network). Thus if you are searching for an object, you ask your local locator to find it. If it doesn't know about the object, it asks the process locator to locate it. If the process locator fails to find it, it forwards the request to the global locator. If even the global locator is not aware of the object, the search fails.
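
A minimal sketch of this lookup chain (the names are ours, not ØMQ's actual declarations): each locator consults its own registry and falls back to its parent, with the global locator at the top having no parent:

    #include <map>
    #include <string>

    class locator_t {
        std::map<std::string, std::string> registry;  // name -> endpoint
        locator_t *parent;                            // nullptr for the global locator
    public:
        explicit locator_t(locator_t *parent_ = nullptr) : parent(parent_) {}

        void register_object(const std::string &name, const std::string &endpoint) {
            registry[name] = endpoint;
        }

        // Found at this layer, or delegated upwards; the search fails only
        // if even the global locator does not know the name.
        bool locate(const std::string &name, std::string *endpoint) {
            auto it = registry.find(name);
            if (it != registry.end()) { *endpoint = it->second; return true; }
            return parent ? parent->locate(name, endpoint) : false;
        }
    };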

Additionally, when creating an exchange or a queue, you can specify the scope of the object. Local scope means that the object will be registered with the local locator, but the registration won't be passed on to the process and global locators. That way the object will be invisible to threads other than its own. An object declared with process scope will be registered with the local locator and the process locator, but not with the global locator. Consequently it will be visible only within its process. An object with global scope will be registered with all three locators and will thus be visible all over the network.

The local locator is created automatically with every thread you create. The process locator is explicitly created by the user when initialising the ØMQ infrastructure at program startup. The global locator is incorporated in the zmq_server application, which you have to start before any other ØMQ-enabled component on the network.

Using local scope for your private exchanges and queues is a convenient way to avoid possible name clashes with exchanges and queues located elsewhere on the network.

The wire protocol between the process locator and the global locator is a simple ad hoc protocol at the moment. In the future we may choose one of the standardised protocols instead (LDAP, SIP, modified AMQP, etc.).

Congestion control

Messaging systems often experience problems when there's an excessive number of messages produced by a single application (or a group of applications). In such a case, the message load may virtually block all the remaining applications. In the worst case, even the administrative application that should in theory be used to kill the misbehaving application would block.

To prevent such behaviour, we've based our design on John Nagle's fair queueing algorithm (RFC 970). The algorithm itself is pretty old (1985), but as far as we are aware it is not commonly used in business messaging implementations.

The algorithm is fairly simple. Instead of keeping the messages belonging to one queue in a single linked list, we keep the messages from each producer in a separate list. When the consumer asks for a message, it gets one in round-robin fashion. The consequence is that even if one producer has produced a million messages and another one later produces only a single message, the consumer will get that single message within at most two receives (because there are two message producers).
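
Here is a compact sketch of the idea (a toy model, not ØMQ's internal code): one list per producer, consumed round-robin:

    #include <cstddef>
    #include <deque>
    #include <string>
    #include <vector>

    class fair_queue_t {
        std::vector<std::deque<std::string>> pipes;  // one list per producer
        size_t current = 0;                          // round-robin position
    public:
        size_t add_producer() { pipes.emplace_back(); return pipes.size() - 1; }
        void push(size_t producer, const std::string &msg) {
            pipes[producer].push_back(msg);
        }
        // Scans at most one full round, so a producer with a single queued
        // message is served within pipes.size() receives, regardless of
        // how many messages the other producers have accumulated.
        bool pop(std::string *msg) {
            if (pipes.empty()) return false;
            for (size_t i = 0; i < pipes.size(); i++) {
                auto &p = pipes[current];
                current = (current + 1) % pipes.size();
                if (!p.empty()) {
                    *msg = p.front();
                    p.pop_front();
                    return true;
                }
            }
            return false;                            // all pipes empty
        }
    };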

The following picture shows how the fair queueing is implemented:

congestion1.png

As opposed to packet switching (the domain RFC 970 refers to), we have two layers at which messages are aggregated:

  1. An individual queue aggregates messages from several message producers.
  2. The consumer application aggregates incoming messages from several queues.

Thus we have two levels of fair queueing. The diagram above shows how messages are aggregated in a single queue; the diagram below shows how messages are aggregated in a consumer application:

congestion2.png

This way, even if your application is consuming from two queues, excessive load on one queue won't prevent it from getting messages from the other queue.

Disconnection handling

When a connection between nodes breaks, the applications on both sides are notified about the fact via a callback function. The name of the object (exchange or queue) the connection belongs to is passed to the function, so that if there are multiple connections you can tell which one of them actually broke. After handling the disconnection you can decide either to ignore the error or to cause the application to crash.
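
The shape of such a callback might look like this (a hypothetical sketch; the actual registration call and signature in ØMQ/0.3 may differ, and the queue name is invented for the example):

    #include <cstdio>
    #include <cstring>

    // Invoked when a connection breaks; receives the name of the exchange
    // or queue the connection belongs to. Returning true ignores the error
    // and keeps the application running; returning false brings it down.
    bool my_error_handler(const char *object_name) {
        std::fprintf(stderr, "connection belonging to '%s' broke\n", object_name);
        return std::strcmp(object_name, "critical-queue") != 0;
    }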

Memory management

With ØMQ/0.2, each message required two malloc/free pairs on the sender side and two malloc/free pairs on the receiver side. Given the uneven performance of malloc/free and the scaling problems caused by inter-thread synchronisation within these functions, it is preferable to use them as little as possible.

One malloc/free pair (on both the sender and the receiver) in ØMQ/0.2 was used to add the message to the queue. The queue is actually a linked list, and the allocation was used to create an item of the list. To get rid of the excessive malloc/free calls we've written our own implementation of the linked list that allocates items in chunks rather than one by one (yqueue). Thus malloc/free is now done once per 256 messages, meaning that each message requires on average only 1/256 (about 0.004) of a malloc/free pair.
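
The following is a simplified sketch of such a chunk-based queue (in the spirit of yqueue; the real implementation differs in details, for instance in how freed chunks are reused):

    #include <cstddef>

    template <typename T, size_t CHUNK = 256>
    class chunked_queue_t {
        struct chunk_t { T items[CHUNK]; chunk_t *next = nullptr; };
        chunk_t *head, *tail;
        size_t head_pos = 0, tail_pos = 0;   // positions within the end chunks
    public:
        chunked_queue_t() { head = tail = new chunk_t; }
        ~chunked_queue_t() {
            while (head) { chunk_t *n = head->next; delete head; head = n; }
        }
        void push(const T &v) {
            tail->items[tail_pos++] = v;
            if (tail_pos == CHUNK) {         // one allocation per 256 pushes
                tail->next = new chunk_t;
                tail = tail->next;
                tail_pos = 0;
            }
        }
        bool pop(T *v) {
            if (head == tail && head_pos == tail_pos)
                return false;                // queue empty
            *v = head->items[head_pos++];
            if (head_pos == CHUNK) {         // one deallocation per 256 pops
                chunk_t *old = head;
                head = head->next;
                delete old;
                head_pos = 0;
            }
            return true;
        }
    };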

The second malloc/free pair in ØMQ/0.2 was used to allocate and deallocate the actual message body. This cannot be avoided completely; however, we've implemented several optimisations here as well:

  • If you have the body allocated already (say you are porting legacy software that hands you a send buffer allocated by whatever legacy allocation mechanism), there's no point in allocating it anew and copying the data. Therefore, ØMQ/0.3 allows you to create a message from an existing buffer. When creating the message, you hand it a deallocation function and stop caring about the cleanup. When ØMQ is done processing the message, it deallocates the buffer itself.
  • We've introduced the concept of very small messages (VSMs). VSMs are so small that it's preferable to copy them to whoever needs them rather than to allocate them dynamically and share a reference afterwards. For VSMs there's actually NO dynamic memory allocation on the path from the sender to the receiver.
  • If a message is sent to several destinations, it isn't copied for each destination (unless it is a VSM). Instead, such messages are reference counted. When the last reference to the message is released, the message body gets deallocated. Keep in mind that reference counting is not free: as the references may be held by different threads, the reference counting has to be thread-safe, so atomic operations are required. On the other hand, for messages sent to a single destination the algorithm is optimised to avoid reference counting (and thus atomic operations) altogether. A sketch of all three cases follows this list.
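
The following sketch pulls the three bullets together (field and constant names are illustrative assumptions, not ØMQ's actual declarations): a message is either a VSM with its content stored inline, an exclusively owned body, or a shared, reference-counted body; an adopted buffer carries the user's deallocation function with it:

    #include <atomic>
    #include <cstddef>
    #include <cstdlib>

    const size_t VSM_SIZE = 30;        // "very small" threshold; value assumed

    struct body_t {
        std::atomic<int> refs;         // used only once the body is shared
        void (*ffn)(void *data);       // user-supplied deallocator, or nullptr
        void *data;
        size_t size;
    };

    struct msg_t {
        enum { vsm, exclusive, shared } kind;
        unsigned char vsm_data[VSM_SIZE];  // VSM content lives inline, no heap
        size_t vsm_size;
        body_t *body;                      // used by exclusive/shared kinds
    };

    // Releasing a message: only the shared case pays for atomic operations.
    void msg_close(msg_t *m) {
        if (m->kind == msg_t::vsm)
            return;                        // nothing to free, data was inline
        if (m->kind == msg_t::exclusive ||
            m->body->refs.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            // Sole owner or last reference released: run the user's
            // deallocation function if the body was adopted from an
            // existing buffer, otherwise free it ourselves.
            if (m->body->ffn)
                m->body->ffn(m->body->data);
            else
                free(m->body->data);
            delete m->body;
        }
    }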

Examples

We've written a few example applications on top of ØMQ to give at least some impression of what can be done and what the techniques to do it are.

Instant messaging

Chat is a simple instant messaging application. It consists of a "chatroom" application (the IM server), a "display" application that shows the discussion going on in a particular chatroom, and a "prompt" application that allows you to take part in the discussion.

chat.png

This example is described in detail in the ØMQ tutorial.

Videoconferencing

The videoconferencing example is similar in functionality to the example above. However, as opposed to the chat example, which actually builds a central server (the chatroom) on top of ØMQ's brokerless architecture, the videoconferencing example works in a fully decentralised manner. There is no single bottleneck on the network and no single point of failure.

videoconferencing.png

Note: The videoconferencing example depends on Linux-based video capture libraries and thus cannot be built on other operating systems.

For a detailed description of the videoconferencing example, have a look here.

Stock exchange

As ØMQ is primarily intended to power the stock trading business, we've created a sample application that simulates the internal workings of a stock exchange. The primary focus of the example is to demonstrate ØMQ's performance in a real-world-like scenario. You can start the scenario in your environment and check the performance monitoring console (see the picture below) for actual latencies and message rates.

exchange2.png

For a detailed description of the stock exchange example, have a look here.

What's missing?

  • Complex routing algorithms;
  • shared exchanges and queues;
  • load balancing of messages.

Conclusion

ØMQ/0.3 introduces new features that make it much more usable. While the high-performance components of ØMQ/0.2 remain as they were (or have become even more efficient), there's a new layer above them that provides the user with a simple and elegant way to define message flows on the network.
