Cannot get this work. Always the following error when using norm or pgm:
zmq.error.ZMQError: Protocol not supported
Hi. I built ZeroMQ manually as prescribed above. I point it to the norm source tree path, and on the make steps it shows that it builds the norm_file. However, I still get a "protocol not supported" issue when trying to run my app. Am I missing something? Also, in the Qt Creator .pro file, how do I add the NORM library? (e.g. LIBS += -lzmq for ZeroMQ)
Hi,
I am planning to support "Reliable Multicast" for Bulk data transfer in my Desktop application by using ZMQ NORM_Engine. Before that i need clarification like "Is NORM require any network hardware dependency or NORM enabled router to make it as reliable transport apart from NORM based Sender & receiver.
If router doesn't support NORM or PGM protocol, Is it useful to integrate ZMQ PGM or ZMQ NORM library.
As from my analysis, PGM is transport layer protocol and it requires PGM enabled Transport layer in router devices. Also PGM supported only in cisco, Juniper network routers.
Please clarify.
ZeroMQ ( version - zeromq-4.1.6 & zeromq-4.2.1) PGM multicast packet receive stuck in between, even Sender still sending the packets without any issue.
If we restart the Receiver, application now receives the packets, but it won't be a solution. I tried with various ZMQ_RATE in both Sender & Receiver side.
Issue:
Sender sends almost 300,000 packets with following socket options, but Receiver stuck in between & not receiving all the packets.
Environment Setup:
( Sender & Receiver connected within the single subnet using D-Link switch. Media speed is 1Gbps )
Code Snippet
Sender: JZMQ ( ZMQ C library, openPGM )
JZMQ Sender:
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PUB);
socket.setRate(80000);
socket.setRecoveryInterval(60*60);
socket.setSendTimeOut(-1);
socket.setSendBufferSize(1024*64);
socket.bind("pgm:local_IP;239.255.0.20:30001");
byte[] bytesToSend = new byte[1024];
int count = 0;
while(count < 300000) {
socket.send(bytesToSend, 0);
count++;
}
//Receiver: ZMQ C++ ( ZMQ C library, openPGM )
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"
int main(int argc, char* argv[]) {
try {
zmq::context_t context(1);
// Socket to talk to server
printf ("Connecting to server…");
zmq::socket_t *s1 = new zmq::socket_t(context, ZMQ_SUB);
int recvTimeout = 3000;
s1->setsockopt(ZMQ_RCVTIMEO,&recvTimeout,sizeof(int));
int recvRate = 80000;
s1->setsockopt(ZMQ_RATE,&recvRate,sizeof(int));
int recvHwMark = 5000;
s1->setsockopt ( ZMQ_RCVHWM, &recvHwMark, sizeof(int) );
int recsec = 60 * 60;
// s1->setsockopt(ZMQ_RECOVERY_IVL,&recsec,sizeof(recsec));
s1->connect("pgm://local_IP;239.255.0.20:30001");
s1->setsockopt (ZMQ_SUBSCRIBE, NULL, 0);
printf ("done. \n");
int seq=0;
while(true) {
zmq::message_t msgbuff;
int ret = s1->recv(&msgbuff,0);
if(!ret)
{
printf ("Received not received timeout\n");
continue;
}
printf ("Seq(%d) Received data size=%d\n",seq,msgbuff.size());
++seq;
}
//Socket close to be handled
}
catch( zmq::error_t &e ) {
printf ("An error occurred: %s\n", e.what());
return 1;
}
catch( std::exception &e ) {
printf ("An error occurred: %s\n", e.what());
return 1;
}
return 0;
}
What can be the issue?
Maybe due to the mismatch in IP addresses used in the sender and receiver. The sender seems to be using .2, while the receiver connects to .20.
@Marc F Sorry:-) that is typo…Both sender and receiver using same multicast IP address as 239.255.0.20.
From src/norm_engine.cpp:
//NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1
NormSetTTL(norm_session, 255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
Is there any information on which platforms don't support ZMQ_MULTICAST_HOPS very well?
I'm using ZMQ with NORM but I'm stuck at an error. Here is my C++ code:
PUB Sender :
string sendHost = "norm:2,127.0.0.1:5556"; <NormNodeId>,<addr:port>
string tag = "MyTag";
string sentMessage = "HelloWorld";
string fullMessage = tag + sentMessage;
zmq::context_t *context = new zmq::context_t( 20 );
zmq::socket_t publisher( *context, ZMQ_PUB );
zmq_connect( publisher, sendHost.c_str() );
zmq_send( publisher,
fullMessage.c_str(),
fullMessage.size(),
0
);
SUB receiver:
char message[256];
string receiveHost = "norm:1,127.0.0.1:5556"; <NormNodeId>,<addr:port>
string tag = "MyTag";
zmq::context_t *context = new zmq::context_t( 20 );
zmq::socket_t subscriber( *context, ZMQ_SUB );
zmq_bind( subscriber, receiveHost.c_str() );
zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, tag.c_str(), tag.size() );
zmq_recv( subscriber,
message,
256,
0
);
cout « bytesReceived « endl;
cout « message « endl;
When I run the above code on a multi-threaded environment I get the following error printed to my console:
Proto Error: ProtoSocket::Bind() bind() error: Address already in use
Proto Fatal: NormSession::Open() error: rx_socket.Bind() error
Address already in use (src/session_base.cpp:692)
Aborted (core dumped)
I added some debugging statements. First I ran the receiver which binded successfully. After that I ran the sender which failed after the connect statement. I don't why is the error that bind failed while it was supposed to be just a connect request, not a bind one. Any help?