ZeroMQ synchronized PUB-SUB: Subscriber hangs with synchronization but works normally otherwise
I'm using ZeroMQ to establish a publisher/subscriber communication model.
The publisher creates a zmq context and then opens a socket with the PUB
communication pattern. It then binds to a port, as the TCP transport is used. For synchronization, a separate socket is opened with the REP communication pattern and bound to a different port. The program cannot continue until a synchronization request is received in msg = syncservice.recv(). It then does some rudimentary work and starts over again. Here's the code for the publisher:
    import pickle, zmq, random, string

    # Wait for 1 subscriber
    SUBSCRIBERS_EXPECTED = 1

    def randomword(length):
        letters = string.ascii_lowercase
        return ''.join(random.choice(letters) for i in range(length))

    while True:
        try:
            arguments = {}
            data = {}
            context = zmq.Context()

            # Socket to talk to clients
            publisher = context.socket(zmq.PUB)
            # set SNDHWM, in case of slow subscribers
            publisher.sndhwm = 1100000
            publisher.bind('tcp://*:5561')

            # Socket to receive signals
            syncservice = context.socket(zmq.REP)
            syncservice.bind('tcp://*:5562')

            # Get synchronization from subscribers
            subscribers = 0
            while subscribers < SUBSCRIBERS_EXPECTED:
                # wait for synchronization request
                msg = syncservice.recv()
                # send synchronization reply
                syncservice.send(b'')
                subscribers += 1

            for n in range(1000):
                for i in range(random.randrange(1, 6)):
                    arguments[i] = randomword(random.randrange(2, 10))
                data['func_name_' + str(n)] = randomword(8)
                data['arguments_' + str(n)] = arguments

            data_string = pickle.dumps(data)
            publisher.send(data_string)
        except KeyboardInterrupt:
            print("Interrupt received, stopping...")
            break
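For reference, the payload the publisher builds can be reproduced without ZeroMQ. The sketch below is not part of the original code: it repeats the same loop with a smaller, hypothetical count (`N_ENTRIES = 3` instead of 1000) and checks what survives a pickle round trip. It assumes `arguments` and `data` start out as empty dicts. One thing it makes visible is that every `arguments_<n>` key refers to the same `arguments` dict, so earlier entries change whenever a later iteration overwrites its keys.

```python
import pickle
import random
import string

N_ENTRIES = 3  # hypothetical; the publisher above uses 1000


def randomword(length):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for _ in range(length))


def build_payload(n_entries):
    # Assumption: both containers start as empty dicts.
    arguments = {}
    data = {}
    for n in range(n_entries):
        for i in range(random.randrange(1, 6)):
            arguments[i] = randomword(random.randrange(2, 10))
        data['func_name_' + str(n)] = randomword(8)
        # Stores a reference to the one shared dict, not a copy.
        data['arguments_' + str(n)] = arguments
    return data


payload = build_payload(N_ENTRIES)
restored = pickle.loads(pickle.dumps(payload))

# Two keys per entry: func_name_<n> and arguments_<n>.
print(len(restored))
# Every arguments_<n> value is the same dict object, before and after pickling.
print(payload['arguments_0'] is payload['arguments_2'])
print(restored['arguments_0'] is restored['arguments_2'])
```

If independent argument sets per entry are intended, storing `dict(arguments)` or creating a fresh dict inside the outer loop would be one way to get them.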
The subscriber functions pretty much the same way the publisher does, albeit
from a subscriber perspective. Here's the code for the subscriber:
    import pickle, zmq, pprint, time

    context = zmq.Context()

    # Connect the subscriber socket
    subscriber = context.socket(zmq.SUB)
    subscriber.connect('tcp://localhost:5561')
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    time.sleep(1)

    # Synchronize with publisher
    syncclient = context.socket(zmq.REQ)
    syncclient.connect('tcp://localhost:5562')

    # Initialize poll set
    poller = zmq.Poller()
    poller.register(syncclient, zmq.POLLIN)
    poller.register(subscriber, zmq.POLLIN)

    # send a synchronization request
    syncclient.send(b'')

    while True:
        try:
            socks = dict(poller.poll())
        except KeyboardInterrupt:
            print("Interrupt received, stopping...")
            break
        # wait for synchronization reply
        if syncclient in socks:
            syncclient.recv()
            print('Sync')
        if subscriber in socks:
            msg = subscriber.recv()
            data = pickle.loads(msg)
            pprint.pprint(data)
            syncclient.send(b'')
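The subscriber calls `syncclient.send(b'')` after every published message. A REQ socket only allows a send once the reply to its previous request has been received, and the sync reply and the published message travel over separate connections, so their arrival order is not guaranteed. The toy model below is not pyzmq code: `ReqModel` and `run_subscriber` are hypothetical stand-ins that mimic only this send/recv alternation, to show which event orderings in the poll loop stay legal. In pyzmq itself, an out-of-turn send on a REQ socket raises `zmq.ZMQError` with errno `zmq.EFSM`.

```python
class ReqModel:
    """Hypothetical stand-in for a REQ socket's send/recv alternation."""

    def __init__(self):
        self.awaiting_reply = False

    def send(self):
        if self.awaiting_reply:
            raise RuntimeError("send() while a reply is still outstanding")
        self.awaiting_reply = True

    def recv(self):
        if not self.awaiting_reply:
            raise RuntimeError("recv() without a pending request")
        self.awaiting_reply = False


def run_subscriber(events):
    """Replay poll results: 'sync' = reply readable, 'data' = message readable."""
    req = ReqModel()
    req.send()  # initial synchronization request
    for step, event in enumerate(events):
        try:
            if event == 'sync':
                req.recv()
            elif event == 'data':
                req.send()  # next synchronization request
        except RuntimeError as exc:
            return "step {}: {}".format(step, exc)
    return "ok"


# The reply is seen before each published message: the alternation holds.
in_order = run_subscriber(['sync', 'data', 'sync', 'data'])
# The published message is seen before the reply: the second send is out of turn.
out_of_order = run_subscriber(['data', 'sync'])
print(in_order)
print(out_of_order)
```

This is only a simplified picture of the loop: it replays one event per poll and does not model lost messages or reconnects.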
The desired result would be for the publisher to publish endlessly, while the
subscriber continually receives and prints everything. If I remove the
synchronization part, everything runs as expected. If I keep the synchronization
part, the subscriber hangs after a number of transmissions. The interesting thing
is that if I send a keyboard interrupt (Ctrl-C) and then restart the subscriber,
it receives a couple of transmissions again, hangs again, and so on.
I have tried different high-water mark settings, but it didn't make any difference. I have tried closing the sockets and terminating the context after every loop. I've tested whether the overhead from printing or pickling (serializing) was too much, but that wasn't it either. I have also modified the suicidal snail example to work in this case, but the subscriber didn't die. What am I missing? (Python 3 is used for every example.)
python zeromq pyzmq
edited Nov 10 at 21:44 by Benyamin Jafari
asked Nov 2 at 11:30 by Pxcel