• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python zmq.zmq_version函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zmq.zmq_version函数的典型用法代码示例。如果您正苦于以下问题：Python zmq_version函数的具体用法？Python zmq_version怎么用？Python zmq_version使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了zmq_version函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_single_socket_forwarder_bind

 def test_single_socket_forwarder_bind(self):
     # Runs the same round twice: start a one-socket QUEUE device bound to a
     # freshly chosen port, send one request through it and expect the same
     # payload back, then tear the device and the client down again.
     libzmq = zmq.zmq_version()
     if libzmq in ('4.1.1', '4.0.6'):
         raise SkipTest("libzmq-%s broke single-socket devices" % libzmq)
     for payload in (b'hello', b'hello again'):
         dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
         # Pick a random free port with a throwaway socket, then release it
         # so the device can bind the same port.
         probe = self.context.socket(zmq.REQ)
         port = probe.bind_to_random_port('tcp://127.0.0.1')
         probe.close()
         time.sleep(0.1)
         url = 'tcp://127.0.0.1:%i' % port
         req = self.context.socket(zmq.REQ)
         req.connect(url)
         dev.bind_in(url)
         dev.start()
         time.sleep(.25)
         req.send(payload)
         self.assertEqual(payload, self.recv(req))
         del dev
         req.close()
开发者ID:326029212,项目名称:pyzmq,代码行数:35,代码来源:test_device.py


示例2: test_unicode_sockopts

 def test_unicode_sockopts(self):
     """test setting/getting sockopts with unicode strings"""
     # Compare the libzmq version numerically; comparing the raw strings
     # mis-orders multi-digit components (e.g. '10.0.0' sorts before '4.0.0').
     libzmq = tuple(int(part) for part in zmq.zmq_version().split('.'))
     has_identity = libzmq < (4, 0, 0)
     topic = "tést"
     if str is not unicode:
         topic = topic.decode('utf8')
     p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
     self.assertEqual(s.send_unicode, s.send_unicode)
     self.assertEqual(p.recv_unicode, p.recv_unicode)
     # Plain setsockopt must reject unicode values ...
     self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
     if has_identity:
         self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
         s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
     self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
     # ... while the *_unicode variants accept them for string options only.
     s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
     self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
     self.assertRaises(TypeError, s.getsockopt_unicode, zmq.SUBSCRIBE)
     if not has_identity:
         # skip the rest on 4.0, because IDENTITY was removed
         return
     st = s.getsockopt(zmq.IDENTITY)
     self.assertEqual(st.decode('utf16'), s.getsockopt_unicode(zmq.IDENTITY, 'utf16'))
     time.sleep(0.1) # wait for connection/subscription
     p.send_unicode(topic,zmq.SNDMORE)
     p.send_unicode(topic*2, encoding='latin-1')
     self.assertEqual(topic, s.recv_unicode())
     self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
开发者ID:hoov,项目名称:pyzmq,代码行数:26,代码来源:test_socket.py


示例3: run

    def run(self):
        """Run the tests with nose when it is available, otherwise with unittest.

        If ``zmq`` cannot be imported, the traceback is printed, a fatal
        message explaining the in-place build requirement is reported, and
        the process exits with status 1.
        """
        # crude check for inplace build:
        try:
            import zmq
        except ImportError:
            print_exc()
            hints = [
                "Could not import zmq!",
                "You must build pyzmq with 'python setup.py build_ext --inplace' for 'python setup.py test' to work.",
                "If you did build pyzmq in-place, then this is a real error.",
            ]
            fatal("\n       ".join(hints))
            sys.exit(1)

        versions = (zmq.pyzmq_version(), zmq.zmq_version())
        info("Testing pyzmq-%s with libzmq-%s" % versions)

        if nose is not None:
            return self.run_nose()
        warn("nose unavailable, falling back on unittest. Skipped tests will appear as ERRORs.")
        return self.run_unittest()
开发者ID:underrun,项目名称:pyzmq,代码行数:25,代码来源:setup.py


示例4: _printheader

    def _printheader(self, log):
        """Emit the Amp banner and environment report, one line per ``log`` call.

        ``log`` is called with a single string for each line. The layout is
        inspired by the header printed by GPAW.
        """
        ruler = '=' * 70
        banner = (
            logo,
            'Amp: Atomistic Machine-learning Package',
            'Developed by Andrew Peterson, Alireza Khorshidi, and others,',
            'Brown University.',
            ' PI Website: http://brown.edu/go/catalyst',
            ' Official repository: http://bitbucket.org/andrewpeterson/amp',
            ' Official documentation: http://amp.readthedocs.org/',
            ' Citation:',
            '  Khorshidi & Peterson, Computer Physics Communications',
            '  doi:10.1016/j.cpc.2016.05.010 (2016)',
            ruler,
        )
        for line in banner:
            log(line)

        # Who / where / when.
        log('User: %s' % getuser())
        log('Hostname: %s' % gethostname())
        log('Date: %s' % now(with_utc=True))
        machine = platform.uname()[4]
        log('Architecture: %s' % machine)
        log('PID: %s' % os.getpid())

        # Amp itself.
        log('Amp version: %s' % 'NOT NUMBERED YET.')  # FIXME/ap. Look at GPAW
        here = os.path.dirname(os.path.abspath(__file__))
        log('Amp directory: %s' % here)
        commithash, commitdate = get_git_commit(here)
        log(' Last commit: %s' % commithash)
        log(' Last commit date: %s' % commitdate)

        # Interpreter and required packages.
        python_line = 'Python: v{0}.{1}.{2}: %s'.format(*sys.version_info[:3])
        log(python_line % sys.executable)
        ase_home = os.path.dirname(ase.__file__)
        log('ASE v%s: %s' % (aseversion, ase_home))
        numpy_home = os.path.dirname(np.__file__)
        log('NumPy v%s: %s' % (np.version.version, numpy_home))

        # SciPy is not a strict dependency.
        try:
            import scipy
            scipy_home = os.path.dirname(scipy.__file__)
            log('SciPy v%s: %s' % (scipy.version.version, scipy_home))
        except ImportError:
            log('SciPy: not available')

        # ZMQ and pxssh are only necessary for parallel calculations.
        try:
            import zmq
            zmq_home = os.path.dirname(zmq.__file__)
            log('ZMQ/PyZMQ v%s/v%s: %s' %
                (zmq.zmq_version(), zmq.pyzmq_version(), zmq_home))
        except ImportError:
            log('ZMQ: not available')

        # pxssh may be a standalone module or bundled inside pexpect.
        try:
            import pxssh
            log('pxssh: %s' % os.path.dirname(pxssh.__file__))
        except ImportError:
            log('pxssh: Not available from pxssh.')
            try:
                from pexpect import pxssh
            except ImportError:
                log('pxssh: Not available from pexpect.')
            else:
                import pexpect
                log('pxssh (via pexpect v%s): %s' %
                    (pexpect.__version__, pxssh.__file__))

        log(ruler)
开发者ID:AkshayTharval,项目名称:Atomistic-Machine-Learning-Potentials,代码行数:60,代码来源:__init__.py


示例5: enable_monitor

    def enable_monitor(self, events=None):
        """Attach a monitor connection to the underlying socket (coroutine).

        ``events`` is the event mask passed to the socket's ``monitor``
        call; a falsy value selects ``zmq.EVENT_ALL``. Does nothing when a
        monitor is already attached.

        Raises NotImplementedError when libzmq is older than 4 or pyzmq is
        older than 14.4.
        """
        # The standard approach of binding and then connecting does not
        # work in this specific case. The event loop does not properly
        # detect messages on the inproc transport which means that event
        # messages get missed.
        # pyzmq's 'get_monitor_socket' method can't be used because this
        # performs the actions in the wrong order for use with an event
        # loop.
        # For more information on this issue see:
        # http://lists.zeromq.org/pipermail/zeromq-dev/2015-July/029181.html

        unsupported = (zmq.zmq_version_info() < (4,) or
                       zmq.pyzmq_version_info() < (14, 4,))
        if unsupported:
            template = (
                "Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
                "have libzmq:{}, pyzmq:{}")
            raise NotImplementedError(
                template.format(zmq.zmq_version(), zmq.pyzmq_version()))

        if self._monitor is not None:
            return

        addr = "inproc://monitor.s-{}".format(self._zmq_sock.FD)
        events = events or zmq.EVENT_ALL

        def protocol_factory():
            return _ZmqEventProtocol(self._loop, self._protocol)

        _, self._monitor = yield from create_zmq_connection(
            protocol_factory, zmq.PAIR, connect=addr, loop=self._loop)
        # bind must come after connect
        self._zmq_sock.monitor(addr, events)
        yield from self._monitor.wait_ready
开发者ID:TadLeonard,项目名称:aiozmq,代码行数:28,代码来源:core.py


示例6: test_router_router

 def test_router_router(self):
     """test router-router MQ devices"""
     # Compare the libzmq version numerically; comparing the raw strings
     # mis-orders multi-digit components (e.g. '10.0.0' sorts before '4.0.0').
     libzmq = tuple(int(part) for part in zmq.zmq_version().split('.'))
     if libzmq >= (4, 0, 0):
         raise SkipTest("Only for libzmq < 4")
     dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, asbytes('in'), asbytes('out'))
     dev.setsockopt_in(zmq.LINGER, 0)
     dev.setsockopt_out(zmq.LINGER, 0)
     dev.setsockopt_mon(zmq.LINGER, 0)

     # Reserve two random ports with a throwaway socket, then release them
     # so the device can bind them below.
     binder = self.context.socket(zmq.DEALER)
     porta = binder.bind_to_random_port('tcp://127.0.0.1')
     portb = binder.bind_to_random_port('tcp://127.0.0.1')
     binder.close()
     time.sleep(0.1)
     a = self.context.socket(zmq.DEALER)
     a.identity = asbytes('a')
     b = self.context.socket(zmq.DEALER)
     b.identity = asbytes('b')

     a.connect('tcp://127.0.0.1:%i'%porta)
     dev.bind_in('tcp://127.0.0.1:%i'%porta)
     b.connect('tcp://127.0.0.1:%i'%portb)
     dev.bind_out('tcp://127.0.0.1:%i'%portb)
     dev.start()
     time.sleep(0.2)
     msg = [ asbytes(m) for m in ('hello', 'there')]
     # a addresses b by identity; b should see the message prefixed with
     # a's identity, and the reply should arrive at a prefixed with b's.
     a.send_multipart([asbytes('b')]+msg)
     bmsg = self.recv_multipart(b)
     self.assertEqual(bmsg, [asbytes('a')]+msg)
     b.send_multipart(bmsg)
     amsg = self.recv_multipart(a)
     self.assertEqual(amsg, [asbytes('b')]+msg)
开发者ID:actank,项目名称:zmon,代码行数:32,代码来源:test_monqueue.py


示例7: __init__

 def __init__(self):
   """Set up the ZMQ sockets, initial state, and the background sensing thread.

   Calls Tool.__init__ first, then reads self.sub_address, self.sub_topic,
   self.sub_socket_linger, self.pub_address and self.position_offset.
   NOTE(review): those attributes are not assigned here - presumably they
   are class attributes or set by Tool.__init__; confirm.
   """
   Tool.__init__(self)
   
   # * Initialize ZMQ context and open subscriber, publisher sockets
   self.logger.debug("ZMQ version: {}, PyZMQ version: {}".format(zmq.zmq_version(), zmq.pyzmq_version()))
   # ** Context
   self.zmqContext = zmq.Context()
   # ** Subscriber (connects to sub_address)
   self.subSocket = self.zmqContext.socket(zmq.SUB)
   self.subSocket.connect(self.sub_address)
   time.sleep(0.005)  # mandatory sleep for ZMQ backend
   self.logger.debug("[sub] Connected to {}".format(self.sub_address))
   # ** Subscriber topics for input messages
   self.subSocket.setsockopt(zmq.SUBSCRIBE, self.sub_topic)
   self.subSocket.setsockopt(zmq.LINGER, self.sub_socket_linger)
   self.logger.debug("[sub]Subscribed to topic \"{}\"".format(self.sub_topic))
   time.sleep(0.005)  # mandatory sleep for ZMQ backend
   # ** Publisher (binds pub_address)
   self.pubSocket = self.zmqContext.socket(zmq.PUB)
   self.pubSocket.bind(self.pub_address)
   time.sleep(0.005)  # mandatory sleep for ZMQ backend
   self.logger.debug("[pub] Bound to {}".format(self.pub_address))
   
   # * Initialize other members
   self.valid = False
   self.buttons = [0, 0]  # primary, secondary
   # Start from an identity transform translated by position_offset.
   self.transform = hm.translation(hm.identity(), self.position_offset)
   #self.position = self.position_offset
   self.loop = True  # TODO ensure this is properly shared across threads
   
   # * Start sensing loop (self.senseLoop runs on its own thread)
   self.senseThread = Thread(target=self.senseLoop)
   self.senseThread.daemon = True  # to prevent indefinite wait on recv()
   self.senseThread.start()
   time.sleep(0.005)  # sleep to allow child thread to run
开发者ID:Pallavistar,项目名称:pyTANG,代码行数:35,代码来源:HapticPointer.py


示例8: test_prefix

 def test_prefix(self):
     # Checks that multipart receive on XREP/XREQ yields a (prefix, message)
     # pair and that send_multipart accepts a ``prefix`` argument.
     #
     # Compare the libzmq version numerically; comparing the raw strings
     # mis-orders multi-digit components (e.g. '10.0.0' sorts before '3.0.0').
     libzmq = tuple(int(part) for part in zmq.zmq_version().split('.'))
     if libzmq < (3, 0, 0):
         raise SkipTest("Only applies to libzmq >= 3.0")
     xrep, xreq = self.create_bound_pair(zmq.XREP, zmq.XREQ)
     msg = [ asbytes(p) for p in 'hi there'.split() ]
     xreq.send_multipart(msg)
     recvd = xrep.recv_multipart()
     self.assertTrue(isinstance(recvd, tuple))
     self.assertEqual(len(recvd), 2)
     prefix, real = recvd
     self.assertTrue(isinstance(prefix, list))
     self.assertEqual(len(prefix), 1)
     self.assertEqual(real, msg)
     # Reply using the received prefix; the requester should get the bare
     # message back as a list.
     xrep.send_multipart(real, prefix=prefix)
     echo = xreq.recv_multipart()
     self.assertTrue(isinstance(echo, list))
     self.assertEqual(echo, real)
     # With an extra prefix element appended, the requester should see that
     # extra element as the prefix of what it receives.
     extra = [asbytes('pre')]
     xrep.send_multipart(msg, prefix=prefix+extra)
     recvd = xreq.recv_multipart()
     self.assertTrue(isinstance(recvd, tuple))
     self.assertEqual(len(recvd), 2)
     prefix, real = recvd
     self.assertTrue(isinstance(prefix, list))
     self.assertEqual(len(prefix), 1)
     self.assertEqual(prefix, extra)
     self.assertEqual(real, msg)
开发者ID:thurday,项目名称:pyzmq,代码行数:27,代码来源:test_socket.py


示例9: test_labels

 def test_labels(self):
     """test device support for SNDLABEL"""
     # Compare the libzmq version numerically; comparing the raw strings
     # mis-orders multi-digit components (e.g. '10.0.0' sorts before '3.0.0').
     libzmq = tuple(int(part) for part in zmq.zmq_version().split('.'))
     if libzmq < (3, 0, 0):
         raise SkipTest("Only for libzmq 3")
     dev = devices.ThreadDevice(zmq.QUEUE, zmq.XREP, -1)
     # select random port:
     binder = self.context.socket(zmq.XREQ)
     port = binder.bind_to_random_port('tcp://127.0.0.1')
     binder.close()
     time.sleep(0.1)
     req = self.context.socket(zmq.REQ)
     req.connect('tcp://127.0.0.1:%i'%port)
     dev.bind_in('tcp://127.0.0.1:%i'%port)
     dev.start()
     time.sleep(.25)
     msg = asbytes('hello')
     # Send one label part, one "more" part and a final part ...
     req.send(msg, zmq.SNDLABEL)
     req.send(msg, zmq.SNDMORE)
     req.send(msg)

     # ... and expect them back in order with the matching flags set.
     self.assertEqual(msg, self.recv(req))
     self.assertTrue(req.rcvlabel)
     self.assertEqual(msg, self.recv(req))
     self.assertTrue(req.rcvmore)
     self.assertEqual(msg, self.recv(req))
     del dev
     req.close()
开发者ID:actank,项目名称:zmon,代码行数:27,代码来源:test_device.py


示例10: test_router_router

    def test_router_router(self):
        """test router-router MQ devices"""
        # Compare the libzmq version numerically; comparing the raw strings
        # mis-orders multi-digit components (e.g. "10.0.0" sorts before "4.0.0").
        libzmq = tuple(int(part) for part in zmq.zmq_version().split("."))
        if libzmq >= (4, 0, 0):
            raise SkipTest("Only for libzmq < 4")
        # Every frame and prefix goes through asbytes(), matching the
        # identities and payload below, so no str is mixed with bytes.
        dev = devices.ThreadMonitoredQueue(
            zmq.ROUTER, zmq.ROUTER, zmq.PUB, asbytes("in"), asbytes("out")
        )
        dev.setsockopt_in(zmq.LINGER, 0)
        dev.setsockopt_out(zmq.LINGER, 0)
        dev.setsockopt_mon(zmq.LINGER, 0)

        # Reserve two random ports with a throwaway socket, then release
        # them so the device can bind them below.
        binder = self.context.socket(zmq.DEALER)
        porta = binder.bind_to_random_port("tcp://127.0.0.1")
        portb = binder.bind_to_random_port("tcp://127.0.0.1")
        binder.close()
        time.sleep(0.1)
        a = self.context.socket(zmq.DEALER)
        a.identity = asbytes("a")
        b = self.context.socket(zmq.DEALER)
        b.identity = asbytes("b")

        a.connect("tcp://127.0.0.1:%i" % porta)
        dev.bind_in("tcp://127.0.0.1:%i" % porta)
        b.connect("tcp://127.0.0.1:%i" % portb)
        dev.bind_out("tcp://127.0.0.1:%i" % portb)
        dev.start()
        time.sleep(0.2)
        msg = [asbytes(m) for m in ("hello", "there")]
        a.send_multipart([asbytes("b")] + msg)
        bmsg = self.recv_multipart(b)
        self.assertEqual(bmsg, [asbytes("a")] + msg)
        b.send_multipart(bmsg)
        amsg = self.recv_multipart(a)
        self.assertEqual(amsg, [asbytes("b")] + msg)
开发者ID:thurday,项目名称:pyzmq,代码行数:32,代码来源:test_monqueue.py


示例11: check_for_zmq

def check_for_zmq(minimum_version, module='IPython.zmq'):
    """Verify that a recent enough pyzmq is importable and patch old aliases.

    Parameters
    ----------
    minimum_version : str
        Dotted, all-numeric minimum pyzmq version, e.g. ``'2.1.4'``.
    module : str
        Name of the requiring module, used only in error messages.

    Raises
    ------
    ImportError
        If pyzmq cannot be imported, or if its version is below
        ``minimum_version`` (versions containing ``'dev'`` are never
        rejected).

    Side effects: adds ``zmq.DEALER`` / ``zmq.ROUTER`` when they are missing,
    and emits a RuntimeWarning when libzmq is 4.0.0 or newer.
    """
    min_vlist = [int(n) for n in minimum_version.split('.')]

    try:
        import zmq
    except ImportError:
        raise ImportError("%s requires pyzmq >= %s"%(module, minimum_version))

    pyzmq_version = zmq.__version__
    vlist = [int(n) for n in re.findall(r'\d+', pyzmq_version)]

    if 'dev' not in pyzmq_version and vlist < min_vlist:
        raise ImportError("%s requires pyzmq >= %s, but you have %s"%(
                        module, minimum_version, pyzmq_version))

    # fix missing DEALER/ROUTER aliases in pyzmq < 2.1.9
    if not hasattr(zmq, 'DEALER'):
        zmq.DEALER = zmq.XREQ
    if not hasattr(zmq, 'ROUTER'):
        zmq.ROUTER = zmq.XREP

    # Compare the libzmq version numerically; comparing the raw strings
    # mis-orders multi-digit components (e.g. '10.0.0' sorts before '4.0.0').
    libzmq_vlist = [int(n) for n in zmq.zmq_version().split('.')]
    if libzmq_vlist >= [4, 0, 0]:
        warnings.warn("""libzmq 4 detected.
        It is unlikely that IPython's zmq code will work properly.
        Please install libzmq stable, which is 2.1.x or 2.2.x""",
        RuntimeWarning)
开发者ID:adgaudio,项目名称:ipython,代码行数:26,代码来源:__init__.py


示例12: zmqversion

def zmqversion():
    '''
    Return a one-entry dict holding the zeromq version under 'zmqversion'
    '''
    # Provides:
    #   zmqversion
    from zmq import zmq_version
    return dict(zmqversion=zmq_version())
开发者ID:Anbcorp,项目名称:salt,代码行数:8,代码来源:core.py


示例13: __init__

 def __init__(self, min_version, msg='Feature'):
     # The libzmq version string is looked up on first use and cached in the
     # module-level _zmq_version, so later instances reuse the cached value.
     global _zmq_version
     if _zmq_version is None:
         from zmq import zmq_version as _query_libzmq_version
         _zmq_version = _query_libzmq_version()
     self.min_version = min_version
     self.msg = msg
     self.version = _zmq_version
开发者ID:326029212,项目名称:pyzmq,代码行数:8,代码来源:error.py


示例14: print_info

def print_info():
    """Log the Python, zeromq, pyzmq and track versions at INFO level."""
    python_version = '.'.join(map(str, sys.version_info))
    log.info("Python version: %s, %s", python_version, sys.executable)

    libzmq_version = zmq.zmq_version()
    log.info("zeromq version: %s", libzmq_version)
    binding_version = zmq.pyzmq_version()
    log.info("pyzmq version:  %s", binding_version)
    track_version = str(track_base.version_info)
    log.info("track version:  %s", track_version)
开发者ID:onlyone0001,项目名称:track,代码行数:8,代码来源:track_server.py


示例15: log_versions

def log_versions(logger):
    """Log one multi-line INFO record listing the component versions.

    The record covers PyZMQ, libzmq, Tornado and SockJS-ZMQProxy;
    SockJS-Tornado is always reported as "Not Available".
    """
    template = (
        'Versions:\n'
        '  PyZMQ: %s\n'
        '  libzmq: %s\n'
        '  Tornado: %s\n'
        '  SockJS-Tornado: Not Available\n'  # TODO find the version
        '  SockJS-ZMQProxy: %s\n'
    )
    pyzmq_release = zmq.pyzmq_version()
    libzmq_release = zmq.zmq_version()
    logger.info(template, pyzmq_release, libzmq_release, tornado_version, version)
开发者ID:njoyce,项目名称:sockjs-zmqproxy,代码行数:13,代码来源:zmqproxy.py


示例16: test_bad_send_recv

    def test_bad_send_recv(self):
        # Out-of-order calls on a REQ/REP pair (recv on REQ, send on REP)
        # must fail with EFSM; afterwards a normal ping-pong must still work.
        s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)

        # this doesn't work on 2.1.8
        efsm_checks_apply = zmq.zmq_version() != "2.1.8"
        if efsm_checks_apply:
            for copy in (True, False):
                self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
                self.assertRaisesErrno(zmq.EFSM, s2.send, b"asdf", copy=copy)

        # I have to have this or we die on an Abort trap.
        sent = b"asdf"
        echoed = self.ping_pong(s1, s2, sent)
        self.assertEqual(sent, echoed)
开发者ID:hitej,项目名称:meta-core,代码行数:13,代码来源:test_reqrep.py


示例17: test_router_dealer

    def test_router_dealer(self):
        # A single frame sent by the DEALER should arrive at the ROUTER as
        # two frames: a leading frame (with rcvmore set), then the payload.
        #
        # Compare the libzmq version numerically; comparing the raw strings
        # mis-orders multi-digit components (e.g. '10.0.0' sorts before '4.0.0').
        libzmq = tuple(int(part) for part in zmq.zmq_version().split('.'))
        if libzmq >= (4, 0, 0):
            raise SkipTest("ROUTER/DEALER change in 4.0")
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

        msg1 = asbytes('message1')
        dealer.send(msg1)
        # The first frame is only consumed; its content is not checked.
        ident = self.recv(router)
        more = router.rcvmore
        self.assertEqual(more, True)
        msg2 = self.recv(router)
        self.assertEqual(msg1, msg2)
        more = router.rcvmore
        self.assertEqual(more, False)
开发者ID:actank,项目名称:zmon,代码行数:14,代码来源:test_multipart.py


示例18: test_router_dealer

    def test_router_dealer(self):
        # A single frame sent by the DEALER should arrive at the ROUTER as
        # two frames: a leading frame (with rcvmore() true), then the payload.
        #
        # Compare the libzmq version numerically; comparing the raw strings
        # mis-orders multi-digit components (e.g. '10.0.0' sorts before '3.0.0').
        libzmq = tuple(int(part) for part in zmq.zmq_version().split('.'))
        if libzmq >= (3, 0, 0):
            raise SkipTest("Known bug in libzmq 3.0.0, see https://zeromq.jira.com/browse/LIBZMQ-232")
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

        msg1 = asbytes('message1')
        dealer.send(msg1)
        # The first frame is only consumed; its content is not checked.
        ident = router.recv()
        more = router.rcvmore()
        self.assertEqual(more, True)
        msg2 = router.recv()
        self.assertEqual(msg1, msg2)
        more = router.rcvmore()
        self.assertEqual(more, False)
开发者ID:hoov,项目名称:pyzmq,代码行数:14,代码来源:test_multipart.py


示例19: test_int_sockopts

 def test_int_sockopts(self):
     "test non-uint64 sockopts"
     # Compare the libzmq version numerically; comparing the raw strings
     # mis-orders multi-digit components (e.g. '2.10.0' sorts before '2.2').
     v = tuple(int(part) for part in zmq.zmq_version().split('.'))
     if v < (2, 1):
         raise SkipTest("only on libzmq >= 2.1")
     elif v < (3, 0):
         hwm = zmq.HWM
         default_hwm = 0
     else:
         hwm = zmq.SNDHWM
         default_hwm = 1000
     p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
     p.setsockopt(zmq.LINGER, 0)
     self.assertEqual(p.getsockopt(zmq.LINGER), 0)
     p.setsockopt(zmq.LINGER, -1)
     self.assertEqual(p.getsockopt(zmq.LINGER), -1)
     self.assertEqual(p.getsockopt(hwm), default_hwm)
     p.setsockopt(hwm, 11)
     self.assertEqual(p.getsockopt(hwm), 11)
     # p.setsockopt(zmq.EVENTS, zmq.POLLIN)
     self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
     self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
     self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
     self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
     self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
     self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)

     # check for overflow / wrong type:
     errors = []
     # Map constant values back to their names for readable failure messages.
     backref = {}
     constants = zmq.core.constants
     for name in constants.__all__:
         value = getattr(constants, name)
         if isinstance(value, int):
             backref[value] = name
     for opt in constants.int_sockopts+constants.int64_sockopts:
         sopt = backref[opt]
         try:
             n = p.getsockopt(opt)
         except zmq.ZMQError:
             e = sys.exc_info()[1]
             errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
         else:
             if n > 2**31:
                 errors.append("getsockopt(zmq.%s) returned a ridiculous value."
                                 " It is probably the wrong type."%sopt)
     # Report every failing option at once rather than stopping at the first.
     if errors:
         self.fail('\n'.join(errors))
开发者ID:actank,项目名称:zmon,代码行数:48,代码来源:test_socket.py


示例20: test_closing_full

def test_closing_full():
    """Check multipart delivery and RCVMORE behaviour between XREQ and XREP.

    Binds an XREQ socket on the module-level ``port``, connects an XREP
    socket to it, and increments ``port`` for the next test. Both sockets
    and the context are released in the ``finally`` clause.
    """
    global port
    ctx = zmq.Context()
    # Reduce e.g. "2.1.11" to "2.1". Slicing the first three characters
    # would truncate versions with multi-digit components.
    ver = ".".join(zmq.zmq_version().split(".")[:2])

    out = ctx.socket(zmq.XREQ)
    out.bind("tcp://*:%d" % port)

    inc = ctx.socket(zmq.XREP)
    inc.connect("tcp://localhost:%d" % port)
    port += 1

    try:
        # Test that ..._multipart and zmq.RCVMORE behave as we think.
        # Frames are sent as bytes so no str is passed to send_multipart.
        out.send_multipart([b'', b'TEST1'])
        msg = inc.recv_multipart()
        if ver in ("2.1", "3.1"):
            assert len(msg) == 3
        elif ver == "3.0":
            # Was `ver in ("3.0")`, a substring test against a plain string.
            assert isinstance(msg, tuple)
            assert len(msg[0]) == 1
            assert len(msg[1]) == 2
        else:
            # Was `assert False and "..."`, which discarded the message.
            assert False, "Unknown zmq version: %s" % ver

        out.send_multipart([b'', b'TEST2'])
        msg = []
        poller = zmq.Poller()
        poller.register(inc, zmq.POLLIN)
        # Collect frames one at a time until RCVMORE reports the last one,
        # or until a 100 ms poll comes back empty.
        while poller.poll(timeout=100):
            msg.append(inc.recv())
            if not inc.getsockopt(zmq.RCVMORE):
                break
        assert len(msg) == 3

        ## Add another stage; see if we can intercept and re-route between
        ## inc/nxt
        #nxt = ctx.socket( zmq.XREQ )
        #nxt.connect( "tcp://localhost:%d" % port )
        #
        #lst = ctx.socket( zmq.XREP )
        #nxt.bind( "tcp://*:%d" % port )
        #port += 1

    finally:
        out.close()
        inc.close()
        ctx.term()
开发者ID:dyim42,项目名称:pyzmq-demos,代码行数:48,代码来源:assigner_test.py



注:本文中的zmq.zmq_version函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python zmq.zmq_version_info函数代码示例发布时间:2022-05-26
下一篇:
Python zmq.select函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap