ZeroMQ synchronized PUB-SUB: Subscriber hangs with synchronization but works normally otherwise









up vote
1
down vote

favorite
1












I'm using ZeroMQ to establish a publisher/subscriber communication model.

The publisher creates a zmq context and then opens a socket with the PUB

communication pattern. It then binds to a port, as the TCP transport protocol is used. For the synchronization a separate socket is opened with the REP communication pattern that binds in a different path. Unless a synchronization request in msg = syncservice.recv() is received, the program cannot continue. It then does some rudimentary work and starts over again. Here's the code for the publisher:



import pickle, zmq, random, string

# Wait for 1 subscriber
SUBSCRIBERS_EXPECTED = 1

def randomword(length):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))

while True:
try:
arguments =
data =
context = zmq.Context()

# Socket to talk to clients
publisher = context.socket(zmq.PUB)
# set SNDHWM, in case of slow subscribers
publisher.sndhwm = 1100000
publisher.bind('tcp://*:5561')

# Socket to receive signals
syncservice = context.socket(zmq.REP)
syncservice.bind('tcp://*:5562')

# Get synchronization from subscribers
subscribers = 0
while subscribers < SUBSCRIBERS_EXPECTED:
# wait for synchronization request
msg = syncservice.recv()
# send synchronization reply
syncservice.send(b'')
subscribers += 1

for n in range(1000):
for i in range(random.randrange(1, 6)):
arguments[i] = randomword(random.randrange(2, 10))

data['func_name_' + str(n)] = randomword(8)
data['arguments_' + str(n)] = arguments

data_string = pickle.dumps(data)
publisher.send(data_string)

except KeyboardInterrupt:
print("Interrupt received, stopping...")
break


The subscriber functions pretty much the same way the publisher does, albeit
from a subscriber perspective. Here's the code for the subscriber:



import pickle, zmq, pprint, time

context = zmq.Context()

# Connect the subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5561')
subscriber.setsockopt(zmq.SUBSCRIBE, b'')

time.sleep(1)

# Synchronize with publisher
syncclient = context.socket(zmq.REQ)
syncclient.connect('tcp://localhost:5562')

# Initialize poll set
poller = zmq.Poller()
poller.register(syncclient, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

# send a synchronization request
syncclient.send(b'')

while True:
try:
socks = dict(poller.poll())

except KeyboardInterrupt:
print("Interrupt received, stopping...")
break

# wait for synchronization reply
if syncclient in socks:
syncclient.recv()
print('Sync')

if subscriber in socks:
msg = subscriber.recv()
data = pickle.loads(msg)
pprint.pprint(data)
syncclient.send(b'')


The desired result would be for the publisher to publish endlessly, while the
subscriber continually receives and prints everything. If I remove the
synchronization part, everything runs as expected. If I keep the synchronization
part the subscriber hangs after a number of transmissions. The interesting thing
is that if I send a keyboard interrupt (Ctrl-C) and then restart the subscriber,
it will receive a couple of transmissions again and hang again and so on and so
forth.



I have tried different high-watermark settings, but it didn't make any difference. I have tried closing the sockets and terminating the context after every loop. I've tested if the overhead from printing or pickling (serializing) was too much, but it wasn't it either. I have also modified the suicidal snail example to work in this case, but the subscriber didn't die. What am I missing? (Python 3 is used for every example)










share|improve this question



























    up vote
    1
    down vote

    favorite
    1












    I'm using ZeroMQ to establish a publisher/subscriber communication model.

    The publisher creates a zmq context and then opens a socket with the PUB

    communication pattern. It then binds to a port, as the TCP transport protocol is used. For the synchronization a separate socket is opened with the REP communication pattern that binds in a different path. Unless a synchronization request in msg = syncservice.recv() is received, the program cannot continue. It then does some rudimentary work and starts over again. Here's the code for the publisher:



    import pickle, zmq, random, string

    # Wait for 1 subscriber
    SUBSCRIBERS_EXPECTED = 1

    def randomword(length):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(length))

    while True:
    try:
    arguments =
    data =
    context = zmq.Context()

    # Socket to talk to clients
    publisher = context.socket(zmq.PUB)
    # set SNDHWM, in case of slow subscribers
    publisher.sndhwm = 1100000
    publisher.bind('tcp://*:5561')

    # Socket to receive signals
    syncservice = context.socket(zmq.REP)
    syncservice.bind('tcp://*:5562')

    # Get synchronization from subscribers
    subscribers = 0
    while subscribers < SUBSCRIBERS_EXPECTED:
    # wait for synchronization request
    msg = syncservice.recv()
    # send synchronization reply
    syncservice.send(b'')
    subscribers += 1

    for n in range(1000):
    for i in range(random.randrange(1, 6)):
    arguments[i] = randomword(random.randrange(2, 10))

    data['func_name_' + str(n)] = randomword(8)
    data['arguments_' + str(n)] = arguments

    data_string = pickle.dumps(data)
    publisher.send(data_string)

    except KeyboardInterrupt:
    print("Interrupt received, stopping...")
    break


    The subscriber functions pretty much the same way the publisher does, albeit
    from a subscriber perspective. Here's the code for the subscriber:



    import pickle, zmq, pprint, time

    context = zmq.Context()

    # Connect the subscriber socket
    subscriber = context.socket(zmq.SUB)
    subscriber.connect('tcp://localhost:5561')
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')

    time.sleep(1)

    # Synchronize with publisher
    syncclient = context.socket(zmq.REQ)
    syncclient.connect('tcp://localhost:5562')

    # Initialize poll set
    poller = zmq.Poller()
    poller.register(syncclient, zmq.POLLIN)
    poller.register(subscriber, zmq.POLLIN)

    # send a synchronization request
    syncclient.send(b'')

    while True:
    try:
    socks = dict(poller.poll())

    except KeyboardInterrupt:
    print("Interrupt received, stopping...")
    break

    # wait for synchronization reply
    if syncclient in socks:
    syncclient.recv()
    print('Sync')

    if subscriber in socks:
    msg = subscriber.recv()
    data = pickle.loads(msg)
    pprint.pprint(data)
    syncclient.send(b'')


    The desired result would be for the publisher to publish endlessly, while the
    subscriber continually receives and prints everything. If I remove the
    synchronization part, everything runs as expected. If I keep the synchronization
    part the subscriber hangs after a number of transmissions. The interesting thing
    is that if I send a keyboard interrupt (Ctrl-C) and then restart the subscriber,
    it will receive a couple of transmissions again and hang again and so on and so
    forth.



    I have tried different high-watermark settings, but it didn't make any difference. I have tried closing the sockets and terminating the context after every loop. I've tested if the overhead from printing or pickling (serializing) was too much, but it wasn't it either. I have also modified the suicidal snail example to work in this case, but the subscriber didn't die. What am I missing? (Python 3 is used for every example)










    share|improve this question

























      up vote
      1
      down vote

      favorite
      1









      up vote
      1
      down vote

      favorite
      1






      1





      I'm using ZeroMQ to establish a publisher/subscriber communication model.

      The publisher creates a zmq context and then opens a socket with the PUB

      communication pattern. It then binds to a port, as the TCP transport protocol is used. For the synchronization a separate socket is opened with the REP communication pattern that binds in a different path. Unless a synchronization request in msg = syncservice.recv() is received, the program cannot continue. It then does some rudimentary work and starts over again. Here's the code for the publisher:



      import pickle, zmq, random, string

      # Wait for 1 subscriber
      SUBSCRIBERS_EXPECTED = 1

      def randomword(length):
      letters = string.ascii_lowercase
      return ''.join(random.choice(letters) for i in range(length))

      while True:
      try:
      arguments =
      data =
      context = zmq.Context()

      # Socket to talk to clients
      publisher = context.socket(zmq.PUB)
      # set SNDHWM, in case of slow subscribers
      publisher.sndhwm = 1100000
      publisher.bind('tcp://*:5561')

      # Socket to receive signals
      syncservice = context.socket(zmq.REP)
      syncservice.bind('tcp://*:5562')

      # Get synchronization from subscribers
      subscribers = 0
      while subscribers < SUBSCRIBERS_EXPECTED:
      # wait for synchronization request
      msg = syncservice.recv()
      # send synchronization reply
      syncservice.send(b'')
      subscribers += 1

      for n in range(1000):
      for i in range(random.randrange(1, 6)):
      arguments[i] = randomword(random.randrange(2, 10))

      data['func_name_' + str(n)] = randomword(8)
      data['arguments_' + str(n)] = arguments

      data_string = pickle.dumps(data)
      publisher.send(data_string)

      except KeyboardInterrupt:
      print("Interrupt received, stopping...")
      break


      The subscriber functions pretty much the same way the publisher does, albeit
      from a subscriber perspective. Here's the code for the subscriber:



      import pickle, zmq, pprint, time

      context = zmq.Context()

      # Connect the subscriber socket
      subscriber = context.socket(zmq.SUB)
      subscriber.connect('tcp://localhost:5561')
      subscriber.setsockopt(zmq.SUBSCRIBE, b'')

      time.sleep(1)

      # Synchronize with publisher
      syncclient = context.socket(zmq.REQ)
      syncclient.connect('tcp://localhost:5562')

      # Initialize poll set
      poller = zmq.Poller()
      poller.register(syncclient, zmq.POLLIN)
      poller.register(subscriber, zmq.POLLIN)

      # send a synchronization request
      syncclient.send(b'')

      while True:
      try:
      socks = dict(poller.poll())

      except KeyboardInterrupt:
      print("Interrupt received, stopping...")
      break

      # wait for synchronization reply
      if syncclient in socks:
      syncclient.recv()
      print('Sync')

      if subscriber in socks:
      msg = subscriber.recv()
      data = pickle.loads(msg)
      pprint.pprint(data)
      syncclient.send(b'')


      The desired result would be for the publisher to publish endlessly, while the
      subscriber continually receives and prints everything. If I remove the
      synchronization part, everything runs as expected. If I keep the synchronization
      part the subscriber hangs after a number of transmissions. The interesting thing
      is that if I send a keyboard interrupt (Ctrl-C) and then restart the subscriber,
      it will receive a couple of transmissions again and hang again and so on and so
      forth.



      I have tried different high-watermark settings, but it didn't make any difference. I have tried closing the sockets and terminating the context after every loop. I've tested if the overhead from printing or pickling (serializing) was too much, but it wasn't it either. I have also modified the suicidal snail example to work in this case, but the subscriber didn't die. What am I missing? (Python 3 is used for every example)










      share|improve this question















      I'm using ZeroMQ to establish a publisher/subscriber communication model.

      The publisher creates a zmq context and then opens a socket with the PUB

      communication pattern. It then binds to a port, as the TCP transport protocol is used. For the synchronization a separate socket is opened with the REP communication pattern that binds in a different path. Unless a synchronization request in msg = syncservice.recv() is received, the program cannot continue. It then does some rudimentary work and starts over again. Here's the code for the publisher:



      import pickle, zmq, random, string

      # Wait for 1 subscriber
      SUBSCRIBERS_EXPECTED = 1

      def randomword(length):
      letters = string.ascii_lowercase
      return ''.join(random.choice(letters) for i in range(length))

      while True:
      try:
      arguments =
      data =
      context = zmq.Context()

      # Socket to talk to clients
      publisher = context.socket(zmq.PUB)
      # set SNDHWM, in case of slow subscribers
      publisher.sndhwm = 1100000
      publisher.bind('tcp://*:5561')

      # Socket to receive signals
      syncservice = context.socket(zmq.REP)
      syncservice.bind('tcp://*:5562')

      # Get synchronization from subscribers
      subscribers = 0
      while subscribers < SUBSCRIBERS_EXPECTED:
      # wait for synchronization request
      msg = syncservice.recv()
      # send synchronization reply
      syncservice.send(b'')
      subscribers += 1

      for n in range(1000):
      for i in range(random.randrange(1, 6)):
      arguments[i] = randomword(random.randrange(2, 10))

      data['func_name_' + str(n)] = randomword(8)
      data['arguments_' + str(n)] = arguments

      data_string = pickle.dumps(data)
      publisher.send(data_string)

      except KeyboardInterrupt:
      print("Interrupt received, stopping...")
      break


      The subscriber functions pretty much the same way the publisher does, albeit
      from a subscriber perspective. Here's the code for the subscriber:



      import pickle, zmq, pprint, time

      context = zmq.Context()

      # Connect the subscriber socket
      subscriber = context.socket(zmq.SUB)
      subscriber.connect('tcp://localhost:5561')
      subscriber.setsockopt(zmq.SUBSCRIBE, b'')

      time.sleep(1)

      # Synchronize with publisher
      syncclient = context.socket(zmq.REQ)
      syncclient.connect('tcp://localhost:5562')

      # Initialize poll set
      poller = zmq.Poller()
      poller.register(syncclient, zmq.POLLIN)
      poller.register(subscriber, zmq.POLLIN)

      # send a synchronization request
      syncclient.send(b'')

      while True:
      try:
      socks = dict(poller.poll())

      except KeyboardInterrupt:
      print("Interrupt received, stopping...")
      break

      # wait for synchronization reply
      if syncclient in socks:
      syncclient.recv()
      print('Sync')

      if subscriber in socks:
      msg = subscriber.recv()
      data = pickle.loads(msg)
      pprint.pprint(data)
      syncclient.send(b'')


      The desired result would be for the publisher to publish endlessly, while the
      subscriber continually receives and prints everything. If I remove the
      synchronization part, everything runs as expected. If I keep the synchronization
      part the subscriber hangs after a number of transmissions. The interesting thing
      is that if I send a keyboard interrupt (Ctrl-C) and then restart the subscriber,
      it will receive a couple of transmissions again and hang again and so on and so
      forth.



      I have tried different high-watermark settings, but it didn't make any difference. I have tried closing the sockets and terminating the context after every loop. I've tested if the overhead from printing or pickling (serializing) was too much, but it wasn't it either. I have also modified the suicidal snail example to work in this case, but the subscriber didn't die. What am I missing? (Python 3 is used for every example)







      python zeromq pyzmq






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 10 at 21:44









      Benyamin Jafari

      2,58531833




      2,58531833










      asked Nov 2 at 11:30









      Pxcel

      156




      156



























          active

          oldest

          votes











          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );













          draft saved

          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53117782%2fzeromq-synchronized-pub-sub-subscriber-hangs-with-synchronization-but-works-nor%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown






























          active

          oldest

          votes













          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes















          draft saved

          draft discarded
















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid


          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.

          To learn more, see our tips on writing great answers.





          Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


          Please pay close attention to the following guidance:


          • Please be sure to answer the question. Provide details and share your research!

          But avoid


          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.

          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53117782%2fzeromq-synchronized-pub-sub-subscriber-hangs-with-synchronization-but-works-nor%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Kleinkühnau

          Makov (Slowakei)

          Deutsches Schauspielhaus