0MQ Termination

Mike Pearce (eb.weak|ekim#eb.weak|ekim)

This paper gives an overview of the message passing and internal thread structure of 0MQ to facilitate its termination.

To give POSIX-like non-blocking behavior to the 0MQ socket close command an additional thread was created inside 0MQ to handle the shutdown of each socket. This thread was given the name 'reaper' and its job is to continue running to manage the closure of all the system sockets that 0MQ created on your behalf. This means that your application is able to initiate closure via this non-blocking call to zmq_close() and then to continue with its other duties. To ensure that your application does not terminate before the reaper completes its duties, the zmq_term command does a forever block until the reaper confirms to it that all sockets have closed. This will present a problem to you if you fail to call the zmq_close() command for every zmq_socket() command that you did.

Command Sequence and Threading

Chances are you will want your application to have one or more dedicated threads to block on the various socket activities such as waiting for messages to arrive for your application (e.g. zmq_recv). If you read the 0MQ reference guide or man pages for the 0MQ socket commands you will learn that these are not thread safe and so each thread will have to remain responsible for the open and closure of its own socket. A parent thread will then remain responsible for creating the zmq_context, and perhaps for sending messages. If the latter then it will also have sockets to close prior to calling zmq_term.

Internal Messaging.

The following diagram shows us an example of an application with two threads – The main thread which has been used to manage the zmq context (zmq_init and zmq_term) and a reader thread which is blocking on zmq_recv awaiting work.


In order for the application to exit we need to unblock the reader thread and get it to close its open socket. In order to unblock the reader thread the main thread needs to terminate the 0MQ context and it does this via a call to zmq_term.

As mentioned in the 0MQ design paper a reap message is sent to the 0MQ reaper thread via the internal pipes used for 0MQ intercommunication. But what is more interesting for us is the exit message that is dispatched to release our blocking reader thread. Once our reader thread is free to terminate it can initiate the closure of its socket. Remember the 0MQ socket is not thread safe and so we must not close this socket from threads other than the one who opened and did the blocking read on that thread.

As this socket closes it sends a reap message to the reaper who takes over the job of closing that socket. The applications reader thread is now free to terminate itself.

The reaper thread has a list of open sockets and will remain blocked forever until this list is empty. As reap commands arrive for each of the open sockets, the reaper will send pipe_term commands to the 0MQ io_thread. The io_thread is created within 0MQ to manage the sockets IO.

The io_thread must now clean up its pending messages for that socket and in due time it sends a pipe_term_ack command which is intercepted by the reaper thread who then does the real closure call and then ticks it off his list of open sockets.

Eventually the reaper thread will get his empty list and will send a 'done' command to the thread that initiated the zmq_term. The reaper thread then dies.

The zmq_term command is now free to send a final exit command to free up the io_thread and then everything is able to terminate cleanly.

The consequences of not respecting 0MQ Thread Safety

So whats this 'not thread safe' disclaimer all about on these 0MQ sockets then? Well, if you call the 0MQ commands for a single socket from multiple threads you will get race conditions. If you find yourself with the assertion of 'nbytes<>sizeof(commant_t)' and after some debug you discover that nbytes=0 then you will eventually discover that you closed the socket whilst another thread was awaiting data and the OS socket informed you of this by returning 0 bytes but 0MQ is not designed to be awaiting input from closing sockets and interprets 0 bytes as an error. This is not your only race condition. Lets look at another .

In the following diagram we see a race condition where the main thread was used to terminate the socket for which the reader thread is responsible. On this occasion the pipe_term_ack that is supposed to get intercepted by the reaper is instead caught by the reader thread and lost.

When the zmq_term is called it gets to free the reader thread which is then able to exit but zmq_term will then get stuck awaiting the pipe_term_ack and your application will be unable to terminate itself and will leave the system socket open.


Termination via Class Destructors (C++)

There exists a header file that defines a set of class wrappers around the 0MQ interface. The functions used to interface 0MQ then become encapsulated inside instances of context_t, socket_t and message_t. The advantage of this is that the destructor of each class cleans up by calling the appropriate 0MQ function. For example the socket_t constructor calls zmq_socket and its destructor calls zmq_close -

Class Constructor Destructor
context_t zmq_init zmq_term
socket_t zmq_socket zmq_close
message_t zmq_msg_init zmq_msg_close

This idea is good until applied to a multithreaded solution. In a multi threaded application that distributes the 0MQ socket usage across multiple threads you need to be very careful of how you scope the declaration of these new class objects. If not then you can get into quite a pickle.

The complexity stems from the fact that the 0MQ socket class is not thread safe and if you are not thinking on your feet then you can easily implement a socket_t class solution that violates this. How? Well, lets return the the example of before where you have created a separate thread to allow you to block on the zmq_recv. You then decide to implement this as a new class. This reader class will now interface to 0MQ via socket_t. Your first instinct may be to define an instance of socket_t as private data for your new reader thread class and give it class scope. This will mean that the destructor for socket_t will execute when your reader class destructs and its destructor will execute in the thread that instantiated your reader class.


What you have achieved inadvertantly is that the zmq_socket and zmq_close will execute from inside the parent class address space and the zmq_recv will execute within the derived thread space. To avoid this problem you must always declare instances of socket_t as local data to the workerFunc that gets executed as the new thread.

Another problem is where you declare your instance of context_t. Remember that its destructor is responsible for calling zmq_term. Lets say that you decide to declare this as class scoped data within some parent class. If you bind its destruction to the parent class destruction then you have another fine mess. The problem is that any derived reader thread class will get destructed prior to its parent class but the reader thread will not terminate itself until the zmq_term call has released its blocked recv call. This will not happen until the parent class destructor is called but by then the derived class will have already been destructed.


What this shows is the execution sequence in time. So the controlling class destructor chains the derived class destructor which will call the socket_t destructor which will make the call to close the socket. This is the good news. The destructor of the derived class will exit and the derived class will have been destructed. This is the bad news as it means that the WorkerFunc has also been destroyed as part of the classes destruction.

To avoid this you need to ensure that your instance of context_t can be destructed prior to the parent class destructor. So what you want to implement is a shutdown sequence that stops 0MQ prior to exiting your application and the job of this stop is to call zmq_term.

The following diagram shows a working solution for this. The parent class now has a stop function that first calls the reader class to set a flag that will allow its forever loop to exit. It then destructs its instance of the context_t. The destructor for this will call zmq_term. We already learn't that this will send exit messages to unblock all 0MQ socket blocked calls and so our workerFunc will unblock and terminate. In doing this it will call the destructor for the socket_t and this will call zmq_close for the socket. This will notify the reaper thread within 0MQ who will close the socket and release the block on the zmq_term(). Thus the parent stop function will exit. When the application itself exits, it will call the chain of destructors and the destructor for the reader thread can call the pthread_join() in order to complete the termination of the thread.


The moral of this story is that it is perhaps better to change the definition of these 0MQ wrapper classes so that they provide a stop() interface rather then using their destructor's. If you use the wrapper classes in the current form then there is a hidden danger that someone might get the scoping wrong on the declaration of them. This adds a minefield of future misery for the code maintenance crew.

By adding an explicit stop() to the code that is independent of any destructor call gives you an opportunity to separate the two termination sequences. If you forget to call stop then you have a problem but you can check for that within the wrapper classes and give a good failure message.

This does not solve the problem of class scoped data within a thread encapsulating class but this is a problem that exists for all class scoped data of such classes and so you have to learn this lesson independent of 0MQ.

Comments: 13

Add a New Comment