NORM Transport Notes
A new transport option is available in libzmq. The "norm_engine.hpp" and "norm_engine.cpp" files provide an implementation of a NACK-Oriented Reliable Multicast (NORM) transport protocol option for ZeroMQ. NORM is an IETF open standards protocol specified in RFC 5740 and supporting documents. The Naval Research Laboratory (NRL) provides an open source reference implementation that is hosted at http://www.nrl.navy.mil/itd/ncs/products/norm.
NORM supports reliable data delivery over IP multicast but also supports unicast (point-to-point) data transfers. NORM operates on top of the User Datagram Protocol (UDP) and provides reliability via a NACK-based Automatic Repeat Request (ARQ) scheme that uses packet erasure coding for very efficient group communication. NORM also provides automated TCP-friendly congestion control and mechanisms to support end-to-end flow control. The NRL NORM implementation can also be configured to provide basic UDP-like best-effort transport service (with no receiver feedback), and this can be enhanced by adding an application-settable amount of proactive forward error correction (FEC) packets to the transmission. I.e., by default NORM only sends "reactive" FEC repair packets in response to NACKs, but it can also be configured to proactively send added repair packets for a level of reliability without any feedback from the receivers. In addition to its TCP-friendly congestion control, NORM can also be configured for fixed-rate operation, and the NRL implementation supports some additional automated congestion control options suitable for use in bit-error-prone wireless communication environments. While its reliable ARQ operation is principally NACK-based (negative acknowledgement when packet loss is detected), it also supports optional positive acknowledgment (ACK) from receivers that can be used for delivery confirmation and explicit flow control.
By keeping the end-to-end transport aspects of reliability, congestion control, and flow control as separable components of protocol operation, NORM can be configured to meet a wide variety of application data delivery needs. This ranges from "object-based" delivery of bulky content items like files and large memory segments to messaging and even real-time, reliable streaming. NORM has been applied in experimental applications for file delivery, messaging (including serverless multicast chat and file/image sharing), TCP flow proxying, and real-time video and even Voice over IP (VoIP) streaming.
The NRL implementation provides a procedural API compatible with C/C++ and also provides Java and Python (pynorm) language bindings. The NORM code can be built on Linux, BSD/Mac OS X, Windows, iOS, Android, and several other Unix operating system variants. NORM can be built as a static or shared library (e.g. libnorm.so, libnorm.dylib, norm.dll, etc.) and a "normApi.h" header file is provided for the C/C++ API. The "norm_engine" addition to libzmq uses this API. A comprehensive NORM developer's reference guide and a growing number of simple examples are included in the source code distribution. NRL also hosts a NORM developer's mailing list.
This initial use of NORM within ZeroMQ supports the ZMQ_PUB/ZMQ_SUB and ZMQ_XPUB/ZMQ_XSUB socket types in a similar manner to the Pragmatic General Multicast (PGM) transport option that was already present in the ZeroMQ code. A notable difference is NORM's support of automated TCP-friendly congestion control, although NORM can also be configured for fixed-rate (or bounded-rate) operation if desired. Unicast addresses can also be used for NORM session addresses, and "many-to-one" sender-to-receiver relationships can be established, with a single NORM ZeroMQ subscriber receiving unicast messages from multiple senders. The many additional transport options that NORM provides (e.g. UDP-like service, etc.) could also be supported by extending the ZeroMQ socket options or establishing multiple NORM protocol profiles. It is hoped that the ZeroMQ community will be interested in exploring these options.
The current NORM code has been tested with the Python pyzmq API binding. The basic transport endpoint format for the "norm_engine" is similar to that used for PGM, consisting of:
norm://[<interfaceName>;]<addr>:<port>
where the optional <interfaceName> field provides a means to specify a specific host network interface for the IP multicast group join and packet transmissions. The <addr> can be an IP unicast or multicast address (NORM supports IPv4 and IPv6). A ZeroMQ subscriber can use its loopback address (e.g. 127.0.0.1) for unicast sessions, and multiple ZeroMQ publishers can "connect" to that subscriber. In fact, it is even possible for a NORM multicast subscriber to also terminate unicast connections for mixed-mode (unicast and multicast) sessions.
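For example (the interface name and addresses below are illustrative placeholders):
norm://224.1.2.3:5556 (multicast group, default interface)
norm://eth0;224.1.2.3:5556 (multicast group joined via the "eth0" interface)
norm://127.0.0.1:5556 (unicast to the local host)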
One aspect of the NORM protocol is that NORM session participants must use unique "NormNodeId" identifiers for the NORM transport. The default behavior of the NORM implementation is to auto-select a NormNodeId based on the host IP address. However, if multiple NORM nodes are co-located on the same host, this can be problematic. As an initial means to resolve this issue, the "norm_engine" transport endpoint specifier also has an optional <nodeId> field that can be inserted, with a comma delimiter, as the first field. Thus, the full endpoint specification format is:
norm://[<nodeId>,][<interfaceName>;]<addr>:<port>
where both the <nodeId> and <interfaceName> are optional fields. The NormNodeId is a 32-bit identifier. Its purpose is similar to the Real-time Transport Protocol (RTP) synchronization source (SSRC) identifier: it provides an address-independent means to identify the multicast source in potentially multi-homed or asymmetric network environments. A future version of the "norm_engine" may use a hash of the ZeroMQ Identity or other means to establish unique NormNodeId values for multicast participants.
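As a minimal sketch of the "many-to-one" unicast arrangement described above (the nodeId values, loopback address, and port are illustrative, and error handling is omitted), a single subscriber can receive from multiple publishers as long as each session participant uses a distinct NormNodeId:

#Subscriber binds to its own loopback address with NormNodeId 1
import zmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.bind("norm://1,127.0.0.1:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"")
#Each publisher "connects" to the subscriber; since the publishers may be
#co-located on the same host, each is given its own NormNodeId (2 and 3)
publisher_a = context.socket(zmq.PUB)
publisher_a.connect("norm://2,127.0.0.1:5556")
publisher_b = context.socket(zmq.PUB)
publisher_b.connect("norm://3,127.0.0.1:5556")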
The following snippets of Python "pyzmq" code illustrate simple NORM-based ZeroMQ subscription and publication:
#Subscriber
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind("norm://2,224.1.2.3:5556")
socket.setsockopt(zmq.SUBSCRIBE, b"ZMQ-Test")
while True:
    string = socket.recv()
    print(string)
#Publisher
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("norm://1,224.1.2.3:5556")
i = 1
while True:
    topic = "ZMQ-Test"
    message = "Hello, NORM " + str(i) + " ..."
    socket.send(("%s %s" % (topic, message)).encode())
    i += 1
    time.sleep(1)
Note that, as with the PGM transport option, the ZeroMQ socket "connect()" and "bind()" methods are interchangeable, since NORM is a "connection-free" protocol. I.e., it is not really "connectionless", since receivers do establish state on a per-sender basis, but no explicit connection setup handshake is required to begin reliable data transfer.
Regarding reliable transfer state establishment, NORM receivers (ZeroMQ subscribers in this case) "synchronize" to NORM sender (ZeroMQ publisher) transmissions upon reception of NORM packets. Because it was designed for IP multicast, "late-joining" receivers can synchronize to a NORM sender that is already in progress. NORM supports multiple receiver "synchronization policies". The current "norm_engine" is hard-coded to use the NORM_SYNC_STREAM policy. This policy allows a newly joining receiver to request retransmission of all content the sender has buffered/cached (e.g. possibly back to the beginning of the stream). Another relevant NORM sync policy is NORM_SYNC_CURRENT, where the receiver attempts reliable reception only for current and newer content the sender is transmitting, and does not request retransmission of older content. Synchronization policy choice is something that future versions of the ZeroMQ "norm_engine" may wish to expose. Similarly, sender/receiver buffer and cache sizes (including UDP socket buffer sizes) are configurable, along with many other details of NORM transport operation. The NORM_OBJECT_STREAM transport mode was selected because it enforces in-order delivery of transmitted content from the sender to the receiver(s). NORM also supports other "bulk" object data delivery models that can provide out-of-order (but still reliable) message/item delivery depending on network conditions. This can allow lower-latency delivery of some messages if message order is not a requirement.
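To make the NORM_SYNC_STREAM behavior concrete, here is a minimal subscriber-side sketch (pairing with the publisher example above) that can be started after the publisher has already been transmitting for a while; how much earlier content is actually recovered depends on the sender's buffer/cache sizes:

#Late-joining subscriber (nodeId 3 is illustrative, distinct from the others)
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind("norm://3,224.1.2.3:5556")
socket.setsockopt(zmq.SUBSCRIBE, b"ZMQ-Test")
#Under NORM_SYNC_STREAM this receiver may first deliver messages sent
#before it joined, retransmitted from the sender's buffer/cache
while True:
    print(socket.recv())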
The following list summarizes some of the key features of NORM and this "norm_engine" extension to libzmq:
- NORM supports reliable multicast and unicast via a highly efficient NACK-based ARQ protocol that uses FEC-based packet erasure coding for repair transmissions in response to negative acknowledgments. NORM rides on top of UDP. It can also provide UDP-like "best effort" service, and this can be supplemented with added FEC packet erasure coding packets for something "better than best effort" at the expense of modest transmission overhead.
- NORM supports fixed-rate and automated, TCP-friendly congestion control operation. NORM also supports some experimental congestion control options useful for wireless network environments. The automated congestion control operation can also be rate-bounded (min/max rate excursion) if desired.
- NORM's separable reliability, congestion control, and flow control aspects offer support for a wide range of application data delivery needs. This has included everything from bulk file delivery via IP multicast to unicast TCP stream proxying and even real-time quasi-reliable video and VoIP delivery.
- The current "norm_engine" assumes a fairly specific form of NORM transport options. It may be desirable to expose additional NORM features and transport modalities via the ZeroMQ API. Some examples include:
- Different NORM receiver sync policies and sender/receiver buffering and cache sizes
- Fixed-rate vs. automated congestion control options
- Specifying the NORM receiver set to the sender to allow for explicit positive acknowledgment (ACK) based flow control and delivery confirmation. While the current "norm_engine" default NORM delivery model is highly reliable, at more extreme bandwidth-delay products and packet loss rates, ACK-based flow control provides a stronger guarantee of data delivery. The ACK mechanism NORM uses scales well to multicast groups with hundreds of nodes.
- NORM's ability to alternatively provide UDP-like "best effort" and "better than best effort" (using packet erasure coding) delivery services for applications. This would include control of the NORM FEC encoding parameters.
- NORM file caching, bulk file delivery options
- Consideration of NORM use for other ZMQ socket types in addition to pub/sub.
The current NORM API is fairly "low level", leaving some of the likely useful "use patterns" (e.g. positive acknowledgment management) to the application programmer to implement. However, the NORM API will continue to evolve to expose additional features as needed and to provide more convenience functions for implementing common "use patterns" in a simpler manner. The ZeroMQ API provides a simple means to make use of NORM for different purposes. As the ZeroMQ community considers and explores NORM use, the evolution of the NORM API and its ZeroMQ adaptation can be expanded.
Build Notes:
To build libzmq with the NORM protocol extension, the NORM source code tree is needed: the "normApi.h" header file is required to compile the libzmq code, and the shared NORM library must be built for linkage. The current NORM source code tree is available at http://downloads.pf.itd.nrl.navy.mil/norm/. Source code versions 1.5b3 and higher support the functions used by the current ZeroMQ "norm_engine".
Additionally, NORM source code "nightly snapshots" located at http://downloads.pf.itd.nrl.navy.mil/norm/nightly_snapshots/ are available when updated source code tarballs are not yet available. The NORM nightly snapshot code also requires the current Protolib "nightly snapshot" available at http://downloads.pf.itd.nrl.navy.mil/protolib/nightly_snapshots/. Protolib is a cross-platform support library used by NORM that provides an abstraction layer for operating-system-specific calls. Normally, the "protolib" source tree is included as part of the "norm" source tree release, but it is not included in the nightly snapshots. The "protolib" source directory tree should be placed in the top level of the "norm" source directory, or a symbolic link to it provided there.
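For example (with /path/to/protolib standing in for wherever the Protolib snapshot was unpacked):
cd norm
ln -s /path/to/protolib protolib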
The NORM library can be built with platform-specific Makefiles in the "norm/makefiles" directory or with the "waf" build tool included in the "norm" distribution. There are also Windows Visual Studio "solution" (.sln) IDE files in "norm/makefiles/win32" and Android NDK build files in "norm/makefiles/android". For example, for a Linux build:
cd norm/makefiles
make -f Makefile.linux libnorm.so
Finally, for libzmq with the NORM extension (after using the "autogen.sh" script to generate the "configure" script and build files), the commands:
./configure --with-norm=<norm source tree path>
make
will build the library. The shared NORM library (libnorm.so, libnorm.dylib, etc.) also needs to be placed in a system location like /usr/local/lib/ so it can be found at run time. Simpler NORM build and installation options will be provided in the future.
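For example, on Linux (the exact location of the built libnorm.so depends on the build method used, and the ldconfig step varies by platform):
sudo cp libnorm.so /usr/local/lib/
sudo ldconfig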
Comments:
Cannot get this to work. I always get the following error when using norm or pgm:
zmq.error.ZMQError: Protocol not supported
Hey, you need to build ZeroMQ manually with ./configure --with-norm=<norm source tree path>
Hi. I built ZeroMQ manually as prescribed above. I pointed it to the norm source tree path, and the make step shows that it builds the norm_engine files. However, I still get a "Protocol not supported" error when trying to run my app. Am I missing something? Also, in the Qt Creator .pro file, how do I add the NORM library? (e.g. LIBS += -lzmq for ZeroMQ)
Hi,
I am planning to support reliable multicast for bulk data transfer in my desktop application by using the ZMQ norm_engine. Before that, I need clarification: does NORM require any network hardware dependency or a NORM-enabled router to make it a reliable transport, apart from the NORM-based sender and receiver?
If the routers do not support the NORM or PGM protocol, is it still useful to integrate the ZMQ PGM or ZMQ NORM library?
From my analysis, PGM is a transport-layer protocol and it requires a PGM-enabled transport layer in router devices. Also, PGM is supported only in Cisco and Juniper network routers.
Please clarify.
ZeroMQ (versions zeromq-4.1.6 and zeromq-4.2.1) PGM multicast packet reception gets stuck partway through, even while the sender is still sending packets without any issue.
If we restart the receiver, the application receives packets again, but that is not a real solution. I tried various ZMQ_RATE values on both the sender and receiver sides.
Issue:
The sender sends almost 300,000 packets with the following socket options, but the receiver gets stuck partway through and does not receive all the packets.
Environment setup:
(Sender and receiver are connected within a single subnet using a D-Link switch. Media speed is 1 Gbps.)
Code snippets
JZMQ sender (ZMQ C library, OpenPGM):
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PUB);
socket.setRate(80000);
socket.setRecoveryInterval(60*60);
socket.setSendTimeOut(-1);
socket.setSendBufferSize(1024*64);
socket.bind("pgm://local_IP;239.255.0.20:30001");
byte[] bytesToSend = new byte[1024];
int count = 0;
while (count < 300000) {
    socket.send(bytesToSend, 0);
    count++;
}
// Receiver: ZMQ C++ (ZMQ C library, OpenPGM)
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"

int main(int argc, char* argv[])
{
    try {
        zmq::context_t context(1);
        // Socket to talk to server
        printf("Connecting to server...");
        zmq::socket_t *s1 = new zmq::socket_t(context, ZMQ_SUB);
        int recvTimeout = 3000;
        s1->setsockopt(ZMQ_RCVTIMEO, &recvTimeout, sizeof(int));
        int recvRate = 80000;
        s1->setsockopt(ZMQ_RATE, &recvRate, sizeof(int));
        int recvHwMark = 5000;
        s1->setsockopt(ZMQ_RCVHWM, &recvHwMark, sizeof(int));
        int recsec = 60 * 60;
        // s1->setsockopt(ZMQ_RECOVERY_IVL, &recsec, sizeof(recsec));
        s1->connect("pgm://local_IP;239.255.0.20:30001");
        s1->setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
        printf("done.\n");
        int seq = 0;
        while (true) {
            zmq::message_t msgbuff;
            int ret = s1->recv(&msgbuff, 0);
            if (!ret) {
                printf("Receive timed out\n");
                continue;
            }
            printf("Seq(%d) Received data size=%d\n", seq, (int)msgbuff.size());
            ++seq;
        }
        // Socket close to be handled
    }
    catch (zmq::error_t &e) {
        printf("An error occurred: %s\n", e.what());
        return 1;
    }
    catch (std::exception &e) {
        printf("An error occurred: %s\n", e.what());
        return 1;
    }
    return 0;
}
What can be the issue?
Maybe due to the mismatch in IP addresses used in the sender and receiver. The sender seems to be using .2, while the receiver connects to .20.
@Marc F Sorry :-) that was a typo… Both sender and receiver use the same multicast IP address, 239.255.0.20.
Regarding src/norm_engine.cpp: is there any information on which platforms don't support ZMQ_MULTICAST_HOPS very well?
I'm using ZMQ with NORM but I'm stuck at an error. Here is my C++ code:
PUB sender:
string sendHost = "norm://2,127.0.0.1:5556"; // <nodeId>,<addr>:<port>
string tag = "MyTag";
string sentMessage = "HelloWorld";
string fullMessage = tag + sentMessage;
zmq::context_t *context = new zmq::context_t( 20 );
zmq::socket_t publisher( *context, ZMQ_PUB );
zmq_connect( publisher, sendHost.c_str() );
zmq_send( publisher, fullMessage.c_str(), fullMessage.size(), 0 );
SUB receiver:
char message[256];
string receiveHost = "norm://1,127.0.0.1:5556"; // <nodeId>,<addr>:<port>
string tag = "MyTag";
zmq::context_t *context = new zmq::context_t( 20 );
zmq::socket_t subscriber( *context, ZMQ_SUB );
zmq_bind( subscriber, receiveHost.c_str() );
zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, tag.c_str(), tag.size() );
int bytesReceived = zmq_recv( subscriber, message, 256, 0 );
cout << bytesReceived << endl;
cout << message << endl;
When I run the above code in a multi-threaded environment, I get the following error printed to my console:
Proto Error: ProtoSocket::Bind() bind() error: Address already in use
Proto Fatal: NormSession::Open() error: rx_socket.Bind() error
Address already in use (src/session_base.cpp:692)
Aborted (core dumped)
I added some debugging statements. First I ran the receiver, which bound successfully. After that I ran the sender, which failed after the connect statement. I don't understand why the error says bind failed when it was supposed to be just a connect request, not a bind. Any help?