Mike Pearce (eb.weak|ekim#eb.weak|ekim)
(http://www.linkedin.com/pub/mike-pearce/0/76b/610)
This paper gives an overview of the internal message passing and thread structure of 0MQ, with the aim of helping you terminate it cleanly.
To give POSIX-like non-blocking behavior to the 0MQ socket close command, an additional thread was created inside 0MQ to handle the shutdown of each socket. This thread is called the 'reaper', and its job is to keep running and manage the closure of all the system sockets that 0MQ created on your behalf. Your application can therefore initiate closure with a non-blocking call to zmq_close() and then continue with its other duties. To ensure that your application does not terminate before the reaper completes its work, zmq_term() blocks indefinitely until the reaper confirms that all sockets have closed. This becomes a problem if you fail to call zmq_close() for every zmq_socket() call that you made.
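As a minimal hedged sketch of that pairing (the socket type and endpoint here are illustrative):

```cpp
// Every zmq_socket() must be matched by a zmq_close() before zmq_term() is
// called, otherwise zmq_term() blocks forever waiting for the reaper.
#include <zmq.h>
#include <cassert>

int main()
{
    void *context = zmq_init(1);            // one io_thread
    void *socket  = zmq_socket(context, ZMQ_REP);
    int rc = zmq_bind(socket, "tcp://*:5555");
    assert(rc == 0);

    // ... application work ...

    rc = zmq_close(socket);                 // non-blocking: hands the socket to the reaper
    assert(rc == 0);
    rc = zmq_term(context);                 // blocks until the reaper reports all sockets closed
    assert(rc == 0);
    return 0;
}
```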
Command Sequence and Threading
Chances are you will want your application to have one or more dedicated threads that block on the various socket activities, such as waiting for messages to arrive for your application (e.g. zmq_recv). If you read the 0MQ reference guide or the man pages for the 0MQ socket commands, you will learn that these are not thread safe, so each thread must remain responsible for opening and closing its own socket. A parent thread then remains responsible for creating the 0MQ context, and perhaps for sending messages. If it also sends messages, it too will have sockets to close before calling zmq_term.
Internal Messaging
The following diagram shows an example of an application with two threads: the main thread, which manages the 0MQ context (zmq_init and zmq_term), and a reader thread, which blocks on zmq_recv awaiting work.
For the application to exit we need to unblock the reader thread and get it to close its open socket. To unblock the reader thread, the main thread must terminate the 0MQ context, which it does with a call to zmq_term.
As mentioned in the 0MQ design paper, a reap message is sent to the 0MQ reaper thread via the internal pipes used for 0MQ intercommunication. But what is more interesting for us is the exit message that is dispatched to release our blocking reader thread. Once our reader thread is free to terminate, it can initiate the closure of its socket. Remember that the 0MQ socket is not thread safe, so we must not close this socket from any thread other than the one that opened it and performed the blocking read on it.
As this socket closes, it sends a reap message to the reaper, which takes over the job of closing that socket. The application's reader thread is now free to terminate itself.
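Here is a hedged sketch of that two-thread arrangement, using the C API from C++. std::thread, the socket type and the endpoint are illustrative, and the buffer form of zmq_recv() used here is the libzmq 3.x+ signature:

```cpp
// The reader thread owns its socket from creation to closure; the main
// thread owns the context and initiates shutdown via zmq_term().
#include <zmq.h>
#include <thread>

static void readerThread(void *context)
{
    void *socket = zmq_socket(context, ZMQ_PULL);   // created on this thread only
    zmq_bind(socket, "inproc://work");

    while (true) {
        char buf[256];
        int rc = zmq_recv(socket, buf, sizeof buf, 0);   // blocks awaiting work
        if (rc == -1 && zmq_errno() == ETERM)
            break;                                       // zmq_term() was called: time to go
        // ... process the message ...
    }
    zmq_close(socket);   // close on the owning thread; the reaper finishes the job
}

int main()
{
    void *context = zmq_init(1);
    std::thread reader(readerThread, context);

    // ... main thread goes about its other duties ...

    zmq_term(context);   // unblocks the reader, then waits for it to zmq_close()
    reader.join();
    return 0;
}
```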
The reaper thread has a list of open sockets and remains blocked until this list is empty. As reap commands arrive for each of the open sockets, the reaper sends pipe_term commands to the 0MQ io_thread. The io_thread is created within 0MQ to manage the sockets' I/O.
The io_thread must now clean up its pending messages for that socket, and in due time it sends a pipe_term_ack command, which is intercepted by the reaper thread. The reaper then performs the real closure call and ticks the socket off its list of open sockets.
Eventually the reaper thread's list becomes empty and it sends a 'done' command to the thread that initiated the zmq_term. The reaper thread then dies.
zmq_term is now free to send a final exit command to release the io_thread, and everything can then terminate cleanly.
The Consequences of Not Respecting 0MQ Thread Safety
So what's this 'not thread safe' disclaimer on 0MQ sockets all about? Well, if you call 0MQ commands for a single socket from multiple threads you will get race conditions. If you hit the assertion 'nbytes != sizeof(command_t)' and, after some debugging, discover that nbytes is 0, you will eventually find that you closed the socket while another thread was awaiting data; the OS socket reported this by returning 0 bytes, but 0MQ is not designed to be awaiting input from closing sockets and interprets 0 bytes as an error. This is not your only race condition. Let's look at another.
In the following diagram we see a race condition where the main thread was used to close the socket for which the reader thread is responsible. On this occasion the pipe_term_ack that is supposed to be intercepted by the reaper is instead caught by the reader thread and lost.
When zmq_term is called it frees the reader thread, which is then able to exit, but zmq_term then gets stuck awaiting the pipe_term_ack. Your application will be unable to terminate and will leave the system socket open.
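To make the hazard concrete, here is a hedged sketch of the kind of code that produces this race. It is the mistake being described, not a recommended pattern; the socket type, endpoint and use of std::thread are illustrative.

```cpp
// ANTI-PATTERN: do not do this. The main thread closes a socket while the
// reader thread is still blocked inside zmq_recv() on the same socket.
// Depending on timing this trips internal assertions (e.g. the
// "nbytes != sizeof(command_t)" case above) or loses the pipe_term_ack.
#include <zmq.h>
#include <thread>

int main()
{
    void *context = zmq_init(1);
    void *socket  = zmq_socket(context, ZMQ_PULL);   // created on the main thread
    zmq_bind(socket, "inproc://work");

    std::thread reader([socket]() {
        char buf[256];
        zmq_recv(socket, buf, sizeof buf, 0);        // blocked on a socket it does not own
    });

    zmq_close(socket);     // WRONG: closing from a different thread races with the recv above
    zmq_term(context);     // may now hang waiting for an ack that was intercepted and lost
    reader.join();
    return 0;
}
```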
Termination via Class Destructors (C++)
There is a header file (zmq.hpp) that defines a set of class wrappers around the 0MQ interface. The functions used to interface with 0MQ then become encapsulated inside instances of context_t, socket_t and message_t. The advantage of this is that the destructor of each class cleans up by calling the appropriate 0MQ function. For example, the socket_t constructor calls zmq_socket and its destructor calls zmq_close:
| Class | Constructor | Destructor |
|---|---|---|
| context_t | zmq_init | zmq_term |
| socket_t | zmq_socket | zmq_close |
| message_t | zmq_msg_init | zmq_msg_close |
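As a hedged illustration of this RAII style in a single-threaded program, using the older cppzmq API as it existed when this page was written (socket type and endpoint are illustrative):

```cpp
// Each destructor calls the matching 0MQ teardown function as per the table
// above, so the message, socket and context are released automatically when
// they go out of scope, in reverse order of declaration.
#include <zmq.hpp>

int main()
{
    zmq::context_t context(1);                 // zmq_init
    zmq::socket_t socket(context, ZMQ_REP);    // zmq_socket
    socket.bind("tcp://*:5555");

    zmq::message_t request;                    // zmq_msg_init
    socket.recv(&request);                     // blocks for one request
    // ... build and send a reply ...

    return 0;   // ~message_t, then ~socket_t (zmq_close), then ~context_t (zmq_term)
}
```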
This idea works well until it is applied to a multithreaded solution. In a multithreaded application that distributes 0MQ socket usage across multiple threads, you need to be very careful about how you scope the declarations of these class objects. If not, you can get into quite a pickle.
The complexity stems from the fact that the 0MQ socket class is not thread safe, and if you are not thinking on your feet you can easily implement a socket_t based solution that violates this. How? Well, let's return to the earlier example where you created a separate thread so that you could block on zmq_recv. You then decide to implement this as a new class. This reader class will interface with 0MQ via socket_t. Your first instinct may be to define an instance of socket_t as private data for your new reader thread class and give it class scope. This means that the destructor for socket_t will execute when your reader class destructs, and that destructor will execute in the thread that instantiated your reader class.
What you have inadvertently achieved is that zmq_socket and zmq_close will execute in the parent thread while zmq_recv executes in the worker thread. To avoid this problem you must always declare instances of socket_t as local data within the workerFunc that is executed as the new thread, as sketched below.
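A hedged sketch of that scoping rule, with a hypothetical Reader class and an illustrative socket type and endpoint:

```cpp
// The socket_t lives on the worker thread's stack inside workerFunc(), so
// zmq_socket, zmq_recv and zmq_close all execute on that one thread.
// Only the context_t is shared.
#include <zmq.hpp>

class Reader
{
public:
    explicit Reader(zmq::context_t &context) : context_(context) {}

    // Run as the body of the worker thread.
    void workerFunc()
    {
        zmq::socket_t socket(context_, ZMQ_PULL);   // local: created on the worker thread
        socket.bind("inproc://work");               // illustrative endpoint
        // ... blocking recv loop ...
    }                                               // ~socket_t (zmq_close) also runs here

private:
    zmq::context_t &context_;   // reference only; do NOT make socket_t a member
};
```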
Another problem is where you declare your instance of context_t. Remember that its destructor is responsible for calling zmq_term. Let's say that you decide to declare it as class-scoped data within some parent class. If you bind its destruction to the parent class destruction then you have another fine mess. The problem is that any derived reader thread class will be destructed before its parent class, but the reader thread will not terminate itself until the zmq_term call has released its blocked recv call. That will not happen until the parent class destructor is called, and by then the derived class will already have been destructed.
What this shows is the execution sequence in time. The controlling class destructor chains into the derived class destructor, which calls the socket_t destructor, which makes the call to close the socket. That is the good news. The destructor of the derived class then exits and the derived class has been destructed. That is the bad news, because it means the workerFunc has also been destroyed as part of the class's destruction.
To avoid this you need to ensure that your instance of context_t can be destructed before the parent class destructor runs. What you want to implement is a shutdown sequence that stops 0MQ before your application exits, and the job of this stop is to call zmq_term.
The following diagram shows a working solution. The parent class now has a stop function that first tells the reader class to set a flag that allows its forever loop to exit. It then destructs its instance of context_t, whose destructor calls zmq_term. We have already learned that this sends exit messages to unblock all blocked 0MQ socket calls, so our workerFunc will unblock and terminate. In doing so it calls the destructor of the socket_t, which calls zmq_close for the socket. This notifies the reaper thread within 0MQ, which closes the socket and releases the block on zmq_term(). Thus the parent stop function can return. When the application itself exits, it calls the chain of destructors, and the destructor of the reader thread class can call pthread_join() to complete the termination of the thread.
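A hedged sketch of that sequence follows. The Parent and Reader class names, socket type and endpoint are invented for illustration, std::thread and std::atomic stand in for the pthread calls mentioned above, and the older cppzmq behaviour of recv() throwing zmq::error_t with ETERM is assumed.

```cpp
#include <zmq.hpp>
#include <atomic>
#include <memory>
#include <thread>

// Hypothetical Reader and Parent classes sketching the stop() sequence above.
class Reader
{
public:
    explicit Reader(zmq::context_t &context)
        : context_(context), thread_(&Reader::workerFunc, this) {}
    ~Reader() { thread_.join(); }          // safe only after stop() has run

    void stop() { stopping_ = true; }      // flag that lets the forever loop exit

private:
    void workerFunc()
    {
        zmq::socket_t socket(context_, ZMQ_PULL);   // local: lives and dies on this thread
        socket.bind("inproc://work");               // illustrative endpoint
        while (!stopping_) {
            zmq::message_t msg;
            try {
                socket.recv(&msg);                  // blocks awaiting work
            } catch (const zmq::error_t &e) {
                if (e.num() == ETERM) break;        // zmq_term was called: leave the loop
                throw;
            }
            // ... process msg ...
        }
    }                                               // ~socket_t -> zmq_close; reaper takes over

    zmq::context_t      &context_;
    std::atomic<bool>    stopping_{false};
    std::thread          thread_;
};

class Parent
{
public:
    Parent() : context_(new zmq::context_t(1)), reader_(*context_) {}

    // Must be called before the destructors run, as described in the text.
    void stop()
    {
        reader_.stop();     // allow the worker loop to exit once it unblocks
        context_.reset();   // ~context_t -> zmq_term: unblocks recv, waits for zmq_close
    }

private:
    std::unique_ptr<zmq::context_t> context_;
    Reader                          reader_;
};
```

Calling Parent::stop() before the destructors run is essential here; without it the join in ~Reader blocks forever, exactly as described above.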
The moral of this story is that it is perhaps better to change the definition of these 0MQ wrapper classes so that they provide a stop() interface rather than relying on their destructors. If you use the wrapper classes in their current form, there is a hidden danger that someone will get the scoping of their declarations wrong. This lays a minefield of future misery for the code maintenance crew.
Adding an explicit stop() that is independent of any destructor call gives you the opportunity to separate the two termination sequences. If you forget to call stop you still have a problem, but you can check for that within the wrapper classes and give a clear failure message.
This does not solve the problem of class-scoped data within a thread-encapsulating class, but that problem exists for all class-scoped data of such classes, so it is a lesson you have to learn independently of 0MQ.
Can you please provide working sample code for properly closing/deleting the socket and context, for a single-threaded environment as well as for multithreaded cases?
Thanks in advance.
It's explained in the Guide, and you can also see how this is done in CZMQ.
@pieterh - can you please be more specific?
Set linger to zero on the socket before closing it, and make sure you close all sockets before terminating the context.
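A minimal hedged sketch of that advice, using the C API with an illustrative socket type and endpoint:

```cpp
// Zero linger means zmq_close() discards unsent messages instead of waiting
// to flush them, so zmq_term() cannot hang on a dead peer.
#include <zmq.h>

int main()
{
    void *context = zmq_init(1);
    void *socket  = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(socket, "tcp://localhost:5555");

    // ... zmq_send() calls ...

    int linger = 0;
    zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof linger);
    zmq_close(socket);     // returns immediately; pending messages are dropped
    zmq_term(context);     // no longer blocked by unsent messages
    return 0;
}
```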
I thought the whole idea was to delete the context before closing associated sockets, so blocking recv()s would be unblocked.
BTW, I solved my issue once I realized that in C++, terminating the context causes recv() to throw with error code 'ETERM'. Come to think of it, this fact makes the 'stop' flag described here completely redundant.
Thanks Mike, learned some valuable stuff here.
I tried following your recommendations, but for some reason, recv() keeps blocking after zmq_term() has been called.
Any ideas?
Thanks,
Boaz
I don't have enough Karma to post a link to the related question I posted in Stack-Overflow, so here is the code listing:
I do have the same problem. The destruction of the context does not unblock the socket.
It's pretty straightforward. To terminate, you call term on context from main thread. This call is blocking. This causes all your blocking socket threads (blocking reads, pollers, etc) to get unblocked. You proceed to close your sockets on their respective threads. Once all sockets are closed the call to term will unblock. This works every time even if you are sending thousands of messages while terminating. Clean shutdown.
The Java JeroMQ's ZContext has a nasty bug in it. If you call close or destroy on the context, it loops over its sockets and destroys them first, then it calls term. This is bad: you get race conditions from trying to close on the wrong threads. It could be that not everyone is bothered by this, but in my view clean shutdown is important, and it makes running tests a lot easier.
Sergey
I have written a demo Windows service using the POCO library.
As per the POCO API, the function waitForTerminationRequest() waits for a service termination request.
Now, in this POCO based Windows service, I want to start a ZeroMQ based message queue proxy to implement an XSUB/XPUB message queue.
From this service I will be running a xsub-xpub proxy.
More can be learnt here zguide.zeromq.org page:all.
For this I wrote another class ZeroMQProxy, which starts the proxy in the service's main function.
My aim was that when I start the service, zmq::proxy() should start the proxy, and when I stop the service, the proxy should be closed along with its sockets.
The problem is that zmq::proxy() does not return, so I am not able to stop the service.
Even if I run net stop <service name>, waitForTerminationRequest() does not receive the termination request because zmq::proxy() is still blocking.
I tried calling proxy() in a Poco runnable thread, but it did not help.
Then called the thread as
What should I do to stop/close the proxy when I stop the service?
Anyone?
How do I terminate zmq_proxy?
Bump. I do have the exact same problem and I hope someone will answer. Thanks.
I had the same problem and came here to say, I don't think you can. Ultimately I used zmq_proxy_steerable, and I set up another PUB/SUB pair as they did in their sample. Then I sent TERMINATE to it, and the proxy returned and shut down. Then I closed the other connections and shut down the context. At this point it's a clean termination and you can start a new context or exit your app.
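For what it's worth, here is a hedged sketch of that approach, assuming a libzmq recent enough to provide zmq_proxy_steerable(). A PAIR socket is used for the control channel here, rather than the PUB/SUB of the guide sample, to keep the sketch free of subscription timing concerns; names and endpoints are illustrative.

```cpp
// Run the XSUB/XPUB proxy via zmq_proxy_steerable() and terminate it by
// sending "TERMINATE" on the control socket, so the proxy call returns and
// the sockets and context can be shut down cleanly.
#include <zmq.h>
#include <thread>

int main()
{
    void *context  = zmq_ctx_new();
    void *frontend = zmq_socket(context, ZMQ_XSUB);
    void *backend  = zmq_socket(context, ZMQ_XPUB);
    void *control  = zmq_socket(context, ZMQ_PAIR);
    zmq_bind(frontend, "tcp://*:5559");
    zmq_bind(backend,  "tcp://*:5560");
    zmq_bind(control,  "inproc://proxy-control");

    // Run the proxy on its own thread; it blocks until TERMINATE arrives,
    // then that same thread closes the sockets it was using.
    std::thread proxy([&]() {
        zmq_proxy_steerable(frontend, backend, nullptr, control);
        zmq_close(frontend);
        zmq_close(backend);
        zmq_close(control);
    });

    // ... later, when the service receives its termination request ...
    void *stopper = zmq_socket(context, ZMQ_PAIR);
    zmq_connect(stopper, "inproc://proxy-control");
    zmq_send(stopper, "TERMINATE", 9, 0);   // makes zmq_proxy_steerable() return
    zmq_close(stopper);

    proxy.join();
    zmq_ctx_term(context);                  // all sockets are closed, so this returns
    return 0;
}
```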
Here is what I've learned in dealing with this issue:
Finally, see the various answers that address this issue of "infinite wait":
stackoverflow[dot]com/questions/7538988/zeromq-how-to-prevent-infinite-wait