Butterfly example



This tutorial explores strategies for breaking a single monolithic application into a set of components that form a distributed computational environment. The main focus is on the overall performance of the system and on scaling issues.


Monolithic application

An application is a software component that is fed with input data ("requests") and produces output data ("replies") in the process:


The life cycle of a single transaction consists of an input phase (fetching the request to process), the processing phase itself and an output phase (posting the reply). In this example the three phases take 4, 8 and 4 microseconds respectively:


The throughput of an application is measured by the number of requests it can process in a fixed time interval. For a monolithic application, the throughput is equal to 1/x, where x is the overall time needed to process a single request (16 microseconds in this case).

To compute the throughput, we'll use the following equation:

\begin{align} throughput = {{microseconds \textrm{-} per \textrm{-} second \over {input \textrm{-} time + processing \textrm{-} time + output \textrm{-} time}}} \end{align}

The equation yields 1,000,000 / (4 + 8 + 4) = 62,500 requests processed per second.
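The equation is simple enough to check in code. The following C++ helper is a sketch for reproducing the numbers in this tutorial, not part of ØMQ; its name and the use of microseconds as the unit for every phase are our own assumptions.

```cpp
#include <cassert>

//  Transactions per second for a component that handles one transaction
//  at a time. All phase durations are given in microseconds.
double throughput_per_second (double input_us, double processing_us,
    double output_us)
{
    const double total_us = input_us + processing_us + output_us;
    assert (total_us > 0.0);
    return 1000000.0 / total_us;
}
```

For the monolithic application, throughput_per_second (4, 8, 4) returns 62500.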


To improve the throughput, we can divide the processing into two consecutive steps (named "component1" and "component2") and run each of them on a separate box:


The following diagram shows what processing looks like in such a pipelined architecture. Let's suppose that of the 8 microseconds our monolithic application needs to process a transaction, component1 handles the first 6 microseconds while component2 takes care of the remaining 2 microseconds.

Moreover, we have to pass the output from component1 to component2. Let's say it takes 2 microseconds to send the message from component1 and another 2 microseconds to receive it in component2:


Using the equation above, we can work out the throughput of component1 and component2 (in transactions per second):

Component1: 1,000,000 / (4 + 6 + 2) = 83,333

Component2: 1,000,000 / (2 + 2 + 4) = 125,000

Component2 is able to process 125,000 transactions per second; however, it receives only 83,333 transactions per second from component1, so it sits idle part of the time and the overall throughput of the system is 83,333 transactions per second.
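The "slowest step wins" rule can be expressed as a minimum over the per-step throughputs. This C++ sketch is a hypothetical helper (the names step_t and pipeline_throughput are ours); each step's cost includes the time to receive its input and send its output, exactly as in the calculation above.

```cpp
#include <algorithm>
#include <cassert>
#include <limits>
#include <vector>

//  Per-transaction cost of one pipeline step, in microseconds.
struct step_t
{
    double input_us;       //  receiving the request or the previous step's output
    double processing_us;  //  the actual work
    double output_us;      //  sending the result onwards
};

//  A pipeline cannot run faster than its slowest step, so the throughput
//  of the whole system is the minimum of the per-step throughputs.
double pipeline_throughput (const std::vector <step_t> &steps)
{
    assert (!steps.empty ());
    double result = std::numeric_limits <double>::infinity ();
    for (const step_t &step : steps) {
        const double step_tps = 1000000.0 /
            (step.input_us + step.processing_us + step.output_us);
        result = std::min (result, step_tps);
    }
    return result;
}
```

For the two components above, pipeline_throughput ({{4, 6, 2}, {2, 2, 4}}) returns roughly 83,333, the throughput of component1.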

We've used two computers to speed up our application. We would naively expect that with twice the processing power we would get twice the throughput of the original application. Instead, the throughput rose by only 33% (from 62,500 to 83,333 transactions per second). What's the problem here?

Firstly, in a pipelined architecture the weakest link determines the performance of the system as a whole. Here, the fast component2 wasn't able to improve the performance of the distributed application, because the bottleneck was the slow component1. Later on we'll describe how this kind of problem can be solved using parallelisation.

Secondly, the actual boost from pipelining depends heavily on the ratio between the processing time and the time needed to send and receive the messages. If the send/receive time is negligible compared to the processing time, the throughput of your application scales linearly with the number of steps in your pipeline. For example, if the overall processing time of the original application was 1 second per transaction, splitting the work evenly between two pipelined steps would increase the throughput by more than 99.99% (almost exactly doubling it).
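To see how the ratio between processing time and messaging overhead drives the speed-up, the same model can be evaluated for different processing times. This is a sketch under the assumptions of the example above: 4 microseconds to fetch a request, 4 to post a reply, and 2 each to send and receive between the steps. The function name two_step_speedup is hypothetical.

```cpp
#include <algorithm>
#include <cassert>

//  Speed-up of a two-step pipeline over the monolithic application.
//  Assumed costs (microseconds): 4 to fetch a request, 4 to post a reply,
//  2 to send and 2 to receive between the steps.
//  first_share is the fraction of the processing done by component1.
double two_step_speedup (double processing_us, double first_share)
{
    assert (processing_us > 0.0 && first_share > 0.0 && first_share < 1.0);
    const double monolithic = 1000000.0 / (4 + processing_us + 4);
    const double step1 = 1000000.0 / (4 + processing_us * first_share + 2);
    const double step2 =
        1000000.0 / (2 + processing_us * (1 - first_share) + 4);
    //  The pipeline runs at the pace of its slower step.
    return std::min (step1, step2) / monolithic;
}
```

two_step_speedup (8, 0.75) gives about 1.33, the 33% boost computed above, whereas two_step_speedup (1000000, 0.5), a 1-second transaction split evenly, gives about 1.99999: the 4 microseconds of extra messaging per step are negligible next to half a second of work.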

At this point it should be noted that send/receive times in ØMQ are extremely low (fractions of a microsecond), so pipelining components that do as little as several tens of microseconds of work each can still improve performance.


An alternative approach to distributing the application is parallelisation. Basically, it means that instead of partitioning the work done within a single transaction, we process several transactions at the same time. This architecture is also known as load balancing:


Throughput scales linearly with the number of parallel application instances:

\begin{align} throughput = throughput \textrm{-} per \textrm{-} application \times number \textrm{-} of \textrm{-} application \textrm{-} instances \end{align}

In our case, with three instances, it'll be 62,500 × 3 = 187,500 transactions per second.
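In code, the parallel rule is a plain multiplication. This hypothetical C++ helper models the ideal case only: every instance processes complete transactions independently, and contention for shared resources is ignored.

```cpp
#include <cassert>

//  Ideal throughput of N load-balanced instances of the whole application.
//  Each instance handles complete transactions on its own, so the
//  capacities simply add up. Contention for shared resources is ignored.
double parallel_throughput (double per_instance_tps, int instances)
{
    assert (per_instance_tps > 0.0 && instances > 0);
    return per_instance_tps * instances;
}
```

parallel_throughput (62500, 3) returns 187500.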

Pipelining vs. Parallelisation

Pipelining pros:

  • Pipelined workflow often closely matches business processes in the enterprise (a task is worked on in department A, then transferred to department B, etc.)
  • Pipelining preserves transaction ordering. The reply to the first request arrives first, the reply to the second request arrives second, etc.

Pipelining cons:

  • Pipelining doesn't scale - if you partition your application into 4 steps, you can never scale it up to 10 boxes without rewriting the code.
  • Each step in the pipeline adds latency to the transaction - the time needed to pass the output from step N to step N+1. The more steps in the pipeline, the worse the latency.

Parallelisation pros:

  • Parallelisation scales. You can always throw more boxes at the problem if the current setup is not able to handle the load.
  • Latency overhead is constant. Irrespective of how many parallel instances are running, latency stays the same.

Parallelisation cons:

  • Parallelisation doesn't preserve ordering - you may get the reply to request 2 before the reply to request 1.
  • If parallel instances contend for a shared resource, adding more instances may not improve the performance in any way.

Conclusion: Use parallelisation wherever possible. Use pipelining only where parallelism won't work.

Combining pipelining and parallelisation

In real-world environments you will rarely use either of the two architectures exclusively. In most cases you'll encounter a mix of both.

Recall the pipelined architecture with component1 and component2. Although component2 was one and a half times as fast as component1 (125,000 vs. 83,333 transactions per second), the overall throughput was held down by the slow component1. The obvious solution is to run two instances of component1 per component2:


Each instance of component1 processes 83,333 transactions per second, giving a total of 166,666 transactions per second for two instances. The single instance of component2 processes 125,000 transactions per second. The throughput of a pipelined system is given by the throughput of its weakest link, so we get min (166,666, 125,000) = 125,000: the system as a whole processes 125,000 transactions per second.
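Combining the two rules gives a small capacity model: each stage's capacity is the sum over its parallel instances, and the pipeline runs at the pace of its weakest stage. The C++ sketch below is illustrative; stage_t and system_throughput are hypothetical names, and the model ignores any overhead of distributing messages between instances.

```cpp
#include <algorithm>
#include <cassert>
#include <limits>
#include <vector>

//  One pipeline step, run as several identical load-balanced instances.
struct stage_t
{
    double per_instance_tps;  //  throughput of a single instance
    int instances;            //  number of instances sharing the load
};

//  A stage's capacity is the sum of its instances' throughputs (the
//  parallel rule); the pipeline is limited by its weakest stage.
double system_throughput (const std::vector <stage_t> &stages)
{
    assert (!stages.empty ());
    double result = std::numeric_limits <double>::infinity ();
    for (const stage_t &stage : stages)
        result = std::min (result, stage.per_instance_tps * stage.instances);
    return result;
}
```

With two component1 instances and one component2 instance, system_throughput ({{83333, 2}, {125000, 1}}) returns 125000, the figure derived above.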

The "butterfly" example

The butterfly example (found in the examples/butterfly directory) implements a pipelined system consisting of two steps, where each step is parallelised so that you can run an arbitrary number of component instances to handle the load:


The workload of the example is waiting rather than doing any real work. In other words, the example allows you to wait much faster than you would be able to wait in a monolithic application. Although this sounds funny, the goal is to demonstrate the boost you can get from a distributed application without having to worry about CPU exhaustion. This way you can run the test even on a single laptop. If the workload were CPU-bound, you would have to test it in a real distributed environment with multiple boxes.

So, when running a worker component (component1 or component2), you can specify the time (in milliseconds) to wait for each transaction to simulate actual processing.

Once all the transactions are processed, the receive_replies component sends a notification to the send_requests component (dotted line on the left). The send_requests component then computes the overall time it took to run the test and prints out the resulting throughput.

The code

The architecture of the example involves three global singleton components (send_requests, intermediate and receive_replies) and an arbitrary number of instances of two worker components (component1 and component2).

Singleton components declare global queues and global exchanges (see the diagram above). Worker components declare local queues and local exchanges and bind them to the appropriate global objects exposed by the singleton components. This way you can add as many worker component instances to the system as needed.

First, let's have a look at how the global objects are created.

The send_requests component creates a single global exchange to distribute the messages to the first phase of processing (instances of component1):

int eid = api->create_exchange ("SEND_REQUESTS_OUT",
    scope_global, out_interface, io, 1, &io, style_load_balancing);

Note that the exchange is created with the style_load_balancing flag. This means that messages are distributed among the bound queues in a round-robin manner, rather than each message being delivered to every queue.
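The difference between the two delivery styles can be illustrated with a toy model. This C++ sketch is hypothetical and does not reflect ØMQ's internal implementation; it only counts how many messages each bound queue would receive under round-robin load balancing versus delivery to every queue.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

//  Toy model of an exchange delivering messages to its bound queues
//  (hypothetical, not ØMQ's implementation). With load balancing, each
//  message goes to exactly one queue, chosen in round-robin order;
//  otherwise every bound queue receives its own copy of each message.
//  Returns how many messages each queue ends up with.
std::vector <int> deliveries_per_queue (std::size_t queues, int messages,
    bool load_balancing)
{
    assert (queues > 0 && messages >= 0);
    std::vector <int> received (queues, 0);
    std::size_t next = 0;
    for (int i = 0; i != messages; i++) {
        if (load_balancing) {
            received [next]++;
            next = (next + 1) % queues;
        }
        else {
            for (int &count : received)
                count++;
        }
    }
    return received;
}
```

With three component1 queues bound and seven requests, deliveries_per_queue (3, 7, true) returns {3, 2, 2}, so no instance gets more than one message ahead of the others, while deliveries_per_queue (3, 7, false) returns {7, 7, 7}.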

Once the user hits a key, the test starts and the send_requests component issues N requests (dummy messages 100 bytes long):

for (int counter = 0; counter != transaction_count; counter ++) {
    //  Each request is a dummy 100-byte message.
    message_t msg (100);
    api->send (eid, msg);
}

The intermediate component creates both a global queue and a global exchange to gather messages from the component1 instances and distribute them in round-robin fashion (style_load_balancing) to the component2 instances:

api->create_queue ("INTERMEDIATE_IN", scope_global, in_interface,
    io, 1, &io);
int eid = api->create_exchange ("INTERMEDIATE_OUT", scope_global,
    out_interface, io, 1, &io, style_load_balancing);

Here's the intermediate component's message loop. It simply fetches a message from one of the component1 instances and forwards it to one of the component2 instances:

while (true) {
    message_t msg;
    api->receive (&msg);
    //  Forward the message to one of the component2 instances.
    api->send (eid, msg);
}

Finally, the receive_replies component creates a global queue to collect the replies from all the component2 instances:

api->create_queue ("RECEIVE_REPLIES_IN", scope_global, in_interface,
    io, 1, &io);

In the message loop, all the messages are received and discarded straight away:

for (int counter = 0; counter != transaction_count; counter ++) {
    //  The reply is only counted; its content is never inspected.
    message_t msg;
    api->receive (&msg);
}

At this point we are done with all the singletons. We'll examine the worker components now.

component1 creates a local queue and binds it to the global exchange exposed by the send_requests component. The queue is where new transaction requests are fetched from:

api->create_queue ("COMPONENT1_IN");

component1 creates a local exchange as well and binds it to the global queue exposed by the intermediate component. The exchange is used to push transaction replies further down the pipeline:

int eid_dest = api->create_exchange ("COMPONENT1_OUT");

The message loop of component1 fetches a request, processes it (actually waits for a specified time to simulate the processing) and forwards the reply to the next component (intermediate):

while (true) {
    message_t msg;
    api->receive (&msg);
    //  Simulate processing: processing_time is in milliseconds,
    //  usleep expects microseconds.
    usleep (processing_time * 1000);
    api->send (eid_dest, msg);
}

component2 works exactly the same way as component1. The only differences are that requests are fetched from the intermediate component rather than from the send_requests component, and that replies are sent to the receive_replies component rather than to the intermediate component.
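The butterfly example needs a running ØMQ installation. To experiment with the "waiting faster" idea in isolation, the same topology can be mimicked inside a single process. This is a hypothetical sketch that does not use the ØMQ API: threads stand in for component instances, one mutex-protected queue shared by all instances of a stage stands in for the load-balancing exchange, and the names blocking_queue_t and simulate_butterfly are made up for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

//  Minimal thread-safe FIFO. All instances of a stage pop from the same
//  queue, so every message is taken by exactly one of them.
class blocking_queue_t
{
public:
    void push (int value)
    {
        {
            std::lock_guard <std::mutex> lock (mtx);
            items.push_back (value);
        }
        cv.notify_one ();
    }

    int pop ()
    {
        std::unique_lock <std::mutex> lock (mtx);
        cv.wait (lock, [this] { return !items.empty (); });
        const int value = items.front ();
        items.pop_front ();
        return value;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    std::deque <int> items;
};

//  Pushes `count` dummy transactions through two stages: n1 "component1"
//  threads sleeping ms1 milliseconds per transaction, then n2 "component2"
//  threads sleeping ms2. Returns the measured transactions per second.
double simulate_butterfly (int count, int n1, int ms1, int n2, int ms2)
{
    assert (count > 0 && n1 > 0 && n2 > 0);
    blocking_queue_t requests, intermediate, replies;

    //  Worker loop: fetch, "process" by sleeping, forward.
    //  A negative message tells the worker to exit.
    auto worker = [] (blocking_queue_t *in, blocking_queue_t *out, int ms) {
        while (true) {
            const int msg = in->pop ();
            if (msg < 0)
                return;
            std::this_thread::sleep_for (std::chrono::milliseconds (ms));
            out->push (msg);
        }
    };

    std::vector <std::thread> threads;
    for (int i = 0; i != n1; i++)
        threads.emplace_back (worker, &requests, &intermediate, ms1);
    for (int i = 0; i != n2; i++)
        threads.emplace_back (worker, &intermediate, &replies, ms2);

    const auto start = std::chrono::steady_clock::now ();
    for (int i = 0; i != count; i++)
        requests.push (i);
    for (int i = 0; i != count; i++)
        replies.pop ();
    const std::chrono::duration <double> elapsed =
        std::chrono::steady_clock::now () - start;

    //  Every reply has arrived, so both stages are idle; shut them down.
    for (int i = 0; i != n1; i++)
        requests.push (-1);
    for (int i = 0; i != n2; i++)
        intermediate.push (-1);
    for (std::thread &t : threads)
        t.join ();

    return count / elapsed.count ();
}
```

For example, simulate_butterfly (20, 2, 10, 2, 10) should report somewhat under 200 transactions/second, roughly twice the result of simulate_butterfly (20, 1, 10, 1, 10). Because all instances of a stage compete for one shared queue, whichever worker is free takes the next message; with identical workers this behaves much like round-robin distribution.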

Building it

The ØMQ build system can build the butterfly example out of the box:

$ ./configure --with-butterfly
$ make

Running it

First, start zmq_server, say on box SVR001:


Once zmq_server is running, start the singleton components: send_requests, intermediate and receive_replies. Let's suppose send_requests is to be run on box A, intermediate on box B and receive_replies on box C. The test will consist of processing 100 transactions.

Run send_requests on box A:

$ send_requests SVR001 100

Run intermediate on box B:

$ intermediate SVR001

Run receive_replies on box C:

$ receive_replies SVR001 100

Test results

Now that we have all the global services running, we can start an arbitrary number of component1 and component2 instances.

For now, we are going to start just a single component1 and a single component2.

We'll assume that our monolithic application required 1 second to process a transaction and that the workload is split evenly between component1 and component2. Thus, component1 processes the first half of a transaction in 1/2 second (500 milliseconds) and component2 processes the second half in another 1/2 second:

$ component1 SVR001 500
$ component2 SVR001 500

Hit a key in send_requests window to start the test:

$ send_requests SVR001 100
Hit ENTER to start!
Throughput: 1.928 transactions/second.

The result is consistent with the theory introduced above: the original monolithic application was able to process 1 transaction/second, and splitting the work into two equally demanding pipelined steps yields almost twice the throughput.

Now let's try the same thing with two instances of component1 and two instances of component2:

$ send_requests SVR001 100
Hit ENTER to start!
Throughput: 3.780 transactions/second.

As expected, we get approximately four times the throughput of the original monolithic application.

With three instances of component1 and three instances of component2, we would expect approximately six transactions a second:

$ send_requests SVR001 100
Hit ENTER to start!
Throughput: 5.584 transactions/second.

Now, let's test some sub-optimal setups.

With two instances of component1 and a single instance of component2, we would expect the bottleneck of passing all the transactions through a single component2 to cancel out the advantage of having two instances of component1:

$ send_requests SVR001 100
Hit ENTER to start!
Throughput: 1.972 transactions/second.

We were right. The throughput of this setup is almost the same as that of a single component1 and a single component2 (1.928 transactions/second).

Next, let's split the original workload unevenly. Let the ratio be 25/75 rather than the 50/50 used so far. Start a single instance of component1 with the processing time set to 250 milliseconds:

$ component1 SVR001 250

Start a single instance of component2 with the processing time set to 750 milliseconds:

$ component2 SVR001 750

Run the test:

$ send_requests SVR001 100
Hit ENTER to start!
Throughput: 1.297 transactions/second.

As expected, the throughput increase is much lower than with the evenly split workload (1.928 transactions/second).
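The test results can be compared with the capacity model used earlier in this tutorial. The following C++ function is a hypothetical helper, not part of the example: each stage's capacity is its number of instances divided by its per-transaction wait time, and the system runs at the pace of the slower stage. Messaging overhead and the time needed to fill and drain the pipeline are ignored.

```cpp
#include <algorithm>
#include <cassert>

//  Ideal throughput (transactions/second) of the butterfly test, given
//  n1 component1 instances waiting ms1 milliseconds per transaction and
//  n2 component2 instances waiting ms2 milliseconds per transaction.
double butterfly_ideal_tps (int n1, double ms1, int n2, double ms2)
{
    assert (n1 > 0 && n2 > 0 && ms1 > 0.0 && ms2 > 0.0);
    const double stage1 = n1 * 1000.0 / ms1;  //  capacity of the component1 stage
    const double stage2 = n2 * 1000.0 / ms2;  //  capacity of the component2 stage
    return std::min (stage1, stage2);
}
```

For the five setups tested above, the model predicts 2, 4, 6, 2 and about 1.33 transactions/second, while the measured values were 1.928, 3.780, 5.584, 1.972 and 1.297. The measurements stay a few percent below the ideal figures, which is plausible given that the model leaves out messaging overhead and pipeline fill time.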


This tutorial is in fact a brief introduction to basic supercomputing. ØMQ provides capabilities similar to those of high-performance computing tools like MPICH; however, its interface is intentionally modeled to resemble standard business messaging brokers (a.k.a. queueing systems). This way ØMQ offers supercomputing-like functionality with a gentle learning curve for enterprise developers.

Written: 2009.02.14
Revised: 2011.12.12
