• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python deque.BlockingDeque类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 slimta.util.deque.BlockingDeque 的典型用法代码示例。如果您正苦于以下问题：Python BlockingDeque 类的具体用法？Python BlockingDeque 怎么用？Python BlockingDeque 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 BlockingDeque 类的 13 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_run_multiple

 def test_run_multiple(self):
     """Run the client against two queued envelopes in one scripted session.

     Two ``(AsyncResult, Envelope)`` pairs are queued, the expected socket
     traffic is recorded on ``self.sock`` in order, and after ``_run()``
     both results must hold a ``{recipient: Reply('250', 'Ok')}`` mapping.
     """
     result1 = AsyncResult()
     result2 = AsyncResult()
     env1 = Envelope('[email protected]', ['[email protected]'])
     env1.parse(b'From: [email protected]\r\n\r\ntest test\r\n')
     env2 = Envelope('[email protected]', ['[email protected]'])
     env2.parse(b'From: [email protected]\r\n\r\ntest test\r\n')
     # Both deliveries are queued before the client starts.
     queue = BlockingDeque()
     queue.append((result1, env1))
     queue.append((result2, env2))
     # Recorded expectations: banner, then EHLO advertising PIPELINING.
     self.sock.recv(IsA(int)).AndReturn(b'220 Welcome\r\n')
     self.sock.sendall(b'EHLO test\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'250-Hello\r\n250 PIPELINING\r\n')
     # First message: MAIL/RCPT/DATA sent in a single write, then the body.
     self.sock.sendall(b'MAIL FROM:<[email protected]>\r\nRCPT TO:<[email protected]>\r\nDATA\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n250 Ok\r\n354 Go ahead\r\n')
     self.sock.sendall(b'From: [email protected]\r\n\r\ntest test\r\n.\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n')
     # Second message: the same exchange is expected again before QUIT.
     self.sock.sendall(b'MAIL FROM:<[email protected]>\r\nRCPT TO:<[email protected]>\r\nDATA\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n250 Ok\r\n354 Go ahead\r\n')
     self.sock.sendall(b'From: [email protected]\r\n\r\ntest test\r\n.\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n')
     self.sock.sendall(b'QUIT\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'221 Goodbye\r\n')
     self.sock.close()
     # Switch the mocks from recording to replay before exercising the client.
     self.mox.ReplayAll()
     # NOTE(review): idle_timeout=0.0 presumably makes the client quit as soon
     # as the queue is drained -- confirm against SmtpRelayClient.
     client = SmtpRelayClient('addr', queue, socket_creator=self._socket_creator, ehlo_as='test', idle_timeout=0.0)
     client._run()
     self.assertEqual({'[email protected]': Reply('250', 'Ok')}, result1.get_nowait())
     self.assertEqual({'[email protected]': Reply('250', 'Ok')}, result2.get_nowait())
开发者ID:thestick613,项目名称:python-slimta,代码行数:29,代码来源:test_slimta_relay_smtp_client.py


示例2: test_run_multiple

 def test_run_multiple(self):
     """Run the client against two queued envelopes using mocked results.

     The ``AsyncResult`` objects are mox mocks, so the expected
     ``set([None])`` calls are recorded in sequence with the socket traffic.
     """
     result1 = self.mox.CreateMock(AsyncResult)
     result2 = self.mox.CreateMock(AsyncResult)
     env1 = Envelope('[email protected]', ['[email protected]'])
     env1.parse('From: [email protected]\r\n\r\ntest test\r\n')
     env2 = Envelope('[email protected]', ['[email protected]'])
     env2.parse('From: [email protected]\r\n\r\ntest test\r\n')
     # Both deliveries are queued before the client starts.
     queue = BlockingDeque()
     queue.append((result1, env1))
     queue.append((result2, env2))
     # Recorded expectations: banner, then EHLO advertising PIPELINING.
     self.sock.recv(IsA(int)).AndReturn('220 Welcome\r\n')
     self.sock.sendall('EHLO test\r\n')
     self.sock.recv(IsA(int)).AndReturn('250-Hello\r\n250 PIPELINING\r\n')
     # First message, after which result1 is expected to be set.
     self.sock.sendall('MAIL FROM:<[email protected]>\r\nRCPT TO:<[email protected]>\r\nDATA\r\n')
     self.sock.recv(IsA(int)).AndReturn('250 Ok\r\n250 Ok\r\n354 Go ahead\r\n')
     self.sock.sendall('From: [email protected]\r\n\r\ntest test\r\n.\r\n')
     self.sock.recv(IsA(int)).AndReturn('250 Ok\r\n')
     result1.set([None])
     # Second message, after which result2 is expected to be set.
     self.sock.sendall('MAIL FROM:<[email protected]>\r\nRCPT TO:<[email protected]>\r\nDATA\r\n')
     self.sock.recv(IsA(int)).AndReturn('250 Ok\r\n250 Ok\r\n354 Go ahead\r\n')
     self.sock.sendall('From: [email protected]\r\n\r\ntest test\r\n.\r\n')
     self.sock.recv(IsA(int)).AndReturn('250 Ok\r\n')
     result2.set([None])
     self.sock.sendall('QUIT\r\n')
     self.sock.recv(IsA(int)).AndReturn('221 Goodbye\r\n')
     self.sock.close()
     # Switch the mocks from recording to replay before exercising the client.
     self.mox.ReplayAll()
     # NOTE(review): idle_timeout=0.0 presumably makes the client quit as soon
     # as the queue is drained -- confirm against SmtpRelayClient.
     client = SmtpRelayClient(None, queue, socket_creator=self._socket_creator, ehlo_as='test', idle_timeout=0.0)
     client._run()
开发者ID:MichaelEvanchik,项目名称:python-slimta,代码行数:29,代码来源:test_slimta_relay_smtp_client.py


示例3: RelayPool

class RelayPool(Relay):
    """Base class that inherits |Relay| to add the ability to create bounded,
    cached pools of outbound clients. It maintains a queue of messages to be
    delivered such that idle clients in the pool pick them up.

    :param pool_size: At most this many simultaneous connections will be open
                      to a destination. If this limit is reached and no
                      connections are idle, new attempts will block.

    """

    def __init__(self, pool_size=None):
        super(RelayPool, self).__init__()
        self.pool = set()
        self.pool_size = pool_size

        #: This attribute holds the queue object for providing delivery
        #: requests to idle clients in the pool.
        self.queue = BlockingDeque()

    def kill(self):
        """Call ``kill()`` on every client currently in the pool."""
        # Iterate over a snapshot: each client is linked to _remove_client,
        # which mutates self.pool. If that callback fires while this loop is
        # still running, iterating the live set would raise RuntimeError.
        for client in list(self.pool):
            client.kill()

    def _remove_client(self, client):
        """Drop *client* from the pool; registered via ``client.link()``.

        If requests are still queued and the pool is now empty, a
        replacement client is started so the queue keeps being serviced.
        """
        self.pool.remove(client)
        if len(self.queue) > 0 and not self.pool:
            self._add_client()

    def _add_client(self):
        """Create a client with :meth:`add_client`, start it and track it."""
        client = self.add_client()
        client.queue = self.queue
        client.start()
        client.link(self._remove_client)
        self.pool.add(client)

    def _check_idle(self):
        """Start a new client unless one is idle or the pool is full.

        A falsy ``pool_size`` (``None`` or ``0``) means the pool is unbounded.
        """
        if any(client.idle for client in self.pool):
            return
        if not self.pool_size or len(self.pool) < self.pool_size:
            self._add_client()

    def add_client(self):
        """Sub-classes must override this method to create and return a new
        :class:`RelayPoolClient` object that will poll for delivery requests.

        :rtype: :class:`RelayPoolClient`

        """
        raise NotImplementedError()

    def attempt(self, envelope, attempts):
        """Queue *envelope* for delivery and wait for its result.

        :param envelope: The message to hand to a pool client.
        :param attempts: Not used by this method.
        :returns: Whatever ``result.get()`` yields once a client has set the
                  queued result.

        """
        self._check_idle()
        result = AsyncResult()
        self.queue.append((result, envelope))
        return result.get()
开发者ID:slimta,项目名称:python-slimta,代码行数:57,代码来源:pool.py


示例4: test_run_banner_failure

 def test_run_banner_failure(self):
     """Expect a permanent failure when the server banner is ``520``.

     The mocked result must receive ``set_exception`` with a
     ``PermanentRelayError``, after which the client is expected to QUIT
     and close the socket.
     """
     result = self.mox.CreateMock(AsyncResult)
     env = Envelope('[email protected]', ['[email protected]'])
     env.parse('From: [email protected]\r\n\r\ntest test\r\n')
     queue = BlockingDeque()
     queue.append((result, env))
     # Recorded expectations: rejecting banner, failure reported, then QUIT.
     self.sock.recv(IsA(int)).AndReturn('520 Not Welcome\r\n')
     result.set_exception(IsA(PermanentRelayError))
     self.sock.sendall('QUIT\r\n')
     self.sock.recv(IsA(int)).AndReturn('221 Goodbye\r\n')
     self.sock.close()
     # Switch the mocks from recording to replay before exercising the client.
     self.mox.ReplayAll()
     client = SmtpRelayClient(None, queue, socket_creator=self._socket_creator, ehlo_as='test')
     client._run()
开发者ID:MichaelEvanchik,项目名称:python-slimta,代码行数:14,代码来源:test_slimta_relay_smtp_client.py


示例5: test_run_banner_failure

 def test_run_banner_failure(self):
     """Expect a permanent failure when the server banner is ``520``.

     A real ``AsyncResult`` is used here, so the failure is checked by
     asserting that ``result.get_nowait()`` raises ``PermanentRelayError``.
     """
     result = AsyncResult()
     env = Envelope('[email protected]', ['[email protected]'])
     env.parse(b'From: [email protected]\r\n\r\ntest test\r\n')
     queue = BlockingDeque()
     queue.append((result, env))
     # Recorded expectations: rejecting banner, then QUIT and close.
     self.sock.recv(IsA(int)).AndReturn(b'520 Not Welcome\r\n')
     self.sock.sendall(b'QUIT\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'221 Goodbye\r\n')
     self.sock.close()
     # Switch the mocks from recording to replay before exercising the client.
     self.mox.ReplayAll()
     client = SmtpRelayClient('addr', queue, socket_creator=self._socket_creator, ehlo_as='test')
     client._run()
     with self.assertRaises(PermanentRelayError):
         result.get_nowait()
开发者ID:thestick613,项目名称:python-slimta,代码行数:15,代码来源:test_slimta_relay_smtp_client.py


示例6: test_run_timeout

 def test_run_timeout(self):
     """Expect a transient failure when the first ``recv`` raises Timeout.

     The mocked result reports ``ready()`` as False and must then receive
     ``set_exception`` with a ``TransientRelayError``; the client is still
     expected to QUIT and close the socket.
     """
     result = self.mox.CreateMock(AsyncResult)
     env = Envelope('[email protected]', ['[email protected]'])
     env.parse('From: [email protected]\r\n\r\ntest test\r\n')
     queue = BlockingDeque()
     queue.append((result, env))
     # Recorded expectations: timeout while reading the banner.
     self.sock.recv(IsA(int)).AndRaise(Timeout(0.0))
     result.ready().AndReturn(False)
     result.set_exception(IsA(TransientRelayError))
     self.sock.sendall('QUIT\r\n')
     self.sock.recv(IsA(int)).AndReturn('221 Goodbye\r\n')
     self.sock.close()
     # Switch the mocks from recording to replay before exercising the client.
     self.mox.ReplayAll()
     client = SmtpRelayClient(None, queue, socket_creator=self._socket_creator, ehlo_as='test')
     client._run()
开发者ID:MichaelEvanchik,项目名称:python-slimta,代码行数:15,代码来源:test_slimta_relay_smtp_client.py


示例7: test_run_smtperror

 def test_run_smtperror(self):
     """Expect a transient failure when the first ``recv`` raises SmtpError.

     The mocked result reports ``ready()`` as False and must then receive
     ``set_exception`` with a ``TransientRelayError``; the client is still
     expected to QUIT and close the socket.
     """
     result = self.mox.CreateMock(AsyncResult)
     env = Envelope("[email protected]", ["[email protected]"])
     env.parse("From: [email protected]\r\n\r\ntest test\r\n")
     queue = BlockingDeque()
     queue.append((result, env))
     # Recorded expectations: SMTP error while reading the banner.
     self.sock.recv(IsA(int)).AndRaise(SmtpError("test error"))
     result.ready().AndReturn(False)
     result.set_exception(IsA(TransientRelayError))
     self.sock.sendall("QUIT\r\n")
     self.sock.recv(IsA(int)).AndReturn("221 Goodbye\r\n")
     self.sock.close()
     # Switch the mocks from recording to replay before exercising the client.
     self.mox.ReplayAll()
     client = SmtpRelayClient(None, queue, socket_creator=self._socket_creator, ehlo_as="test")
     client._run()
开发者ID:rafaelnovello,项目名称:python-slimta,代码行数:15,代码来源:test_slimta_relay_smtp_client.py


示例8: test_run_random_exception

 def test_run_random_exception(self):
     """Expect an unexpected exception to propagate and reach the result.

     The first ``recv`` raises ``ValueError``. The test asserts that
     ``_run()`` re-raises it and that ``result.get_nowait()`` raises it too,
     while the socket is still sent QUIT and closed.
     """
     result = AsyncResult()
     env = Envelope('[email protected]', ['[email protected]'])
     env.parse(b'From: [email protected]\r\n\r\ntest test\r\n')
     queue = BlockingDeque()
     queue.append((result, env))
     # Recorded expectations: arbitrary error while reading the banner.
     self.sock.recv(IsA(int)).AndRaise(ValueError('test error'))
     self.sock.sendall(b'QUIT\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'221 Goodbye\r\n')
     self.sock.close()
     # Switch the mocks from recording to replay before exercising the client.
     self.mox.ReplayAll()
     client = SmtpRelayClient('addr', queue, socket_creator=self._socket_creator, ehlo_as='test')
     with self.assertRaises(ValueError):
         client._run()
     with self.assertRaises(ValueError):
         result.get_nowait()
开发者ID:thestick613,项目名称:python-slimta,代码行数:16,代码来源:test_slimta_relay_smtp_client.py


示例9: __init__

    def __init__(self, pool_size=None):
        """Set up an empty client pool and its shared delivery queue.

        :param pool_size: Stored as-is on ``self.pool_size``; defaults to
                          ``None``.

        """
        super(RelayPool, self).__init__()

        #: This attribute holds the queue object for providing delivery
        #: requests to idle clients in the pool.
        self.queue = BlockingDeque()

        # Pool bookkeeping: the size limit and the set of tracked clients.
        self.pool_size = pool_size
        self.pool = set()
开发者ID:slimta,项目名称:python-slimta,代码行数:8,代码来源:pool.py


示例10: test_run

 def test_run(self):
     """Run the client against one queued envelope in a scripted session.

     The mocked result is expected to receive ``set(True)`` after the
     message body is accepted, followed by QUIT and socket close.
     """
     result = self.mox.CreateMock(AsyncResult)
     env = Envelope('[email protected]', ['[email protected]'])
     env.parse('From: [email protected]\r\n\r\ntest test\r\n')
     queue = BlockingDeque()
     queue.append((result, env))
     # Recorded expectations: banner, then EHLO advertising PIPELINING.
     self.sock.recv(IsA(int)).AndReturn('220 Welcome\r\n')
     self.sock.sendall('EHLO test\r\n')
     self.sock.recv(IsA(int)).AndReturn('250-Hello\r\n250 PIPELINING\r\n')
     # MAIL/RCPT/DATA sent in a single write, then the message body.
     self.sock.sendall('MAIL FROM:<[email protected]>\r\nRCPT TO:<[email protected]>\r\nDATA\r\n')
     self.sock.recv(IsA(int)).AndReturn('250 Ok\r\n250 Ok\r\n354 Go ahead\r\n')
     self.sock.sendall('From: [email protected]\r\n\r\ntest test\r\n.\r\n')
     self.sock.recv(IsA(int)).AndReturn('250 Ok\r\n')
     result.set(True)
     self.sock.sendall('QUIT\r\n')
     self.sock.recv(IsA(int)).AndReturn('221 Goodbye\r\n')
     self.sock.close()
     # Switch the mocks from recording to replay before exercising the client.
     self.mox.ReplayAll()
     client = SmtpRelayClient(None, queue, socket_creator=self._socket_creator, ehlo_as='test')
     client._run()
开发者ID:pombredanne,项目名称:python-slimta,代码行数:20,代码来源:test_slimta_relay_smtp_client.py


示例11: TestBlockingDeque

class TestBlockingDeque(MoxTestBase, unittest.TestCase):
    """Check which semaphore calls each BlockingDeque operation makes.

    The deque's ``sema`` attribute is swapped for a mox mock in ``setUp``;
    every test records the expected calls, replays, then runs the operation.
    """

    def setUp(self):
        super(TestBlockingDeque, self).setUp()
        blocking = BlockingDeque()
        blocking.sema = self.mox.CreateMockAnything()
        self.deque = blocking

    def test_append(self):
        """A single append is expected to release the semaphore once."""
        sema = self.deque.sema
        sema.release()
        self.mox.ReplayAll()
        self.deque.append(True)

    def test_appendleft(self):
        """A single appendleft is expected to release the semaphore once."""
        sema = self.deque.sema
        sema.release()
        self.mox.ReplayAll()
        self.deque.appendleft(True)

    def test_clear(self):
        """clear() is expected to poll locked() and acquire without blocking."""
        sema = self.deque.sema
        # Three appends -> three releases.
        for _ in range(3):
            sema.release()
        # Three unlocked polls, each followed by a non-blocking acquire.
        for _ in range(3):
            sema.locked().AndReturn(False)
            sema.acquire(blocking=False)
        # The final poll reports the semaphore as locked.
        sema.locked().AndReturn(True)
        self.mox.ReplayAll()
        for _ in range(3):
            self.deque.append(True)
        self.deque.clear()

    def test_extend(self):
        """Extending with three items is expected to release three times."""
        sema = self.deque.sema
        for _ in range(3):
            sema.release()
        self.mox.ReplayAll()
        self.deque.extend([1, 2, 3])

    def test_extendleft(self):
        """Left-extending with three items is expected to release three times."""
        sema = self.deque.sema
        for _ in range(3):
            sema.release()
        self.mox.ReplayAll()
        self.deque.extendleft([1, 2, 3])

    def test_pop(self):
        """pop() is expected to acquire once and return the newest item."""
        sema = self.deque.sema
        for _ in range(2):
            sema.release()
        sema.acquire()
        self.mox.ReplayAll()
        for item in (4, 5):
            self.deque.append(item)
        popped = self.deque.pop()
        self.assertEqual(5, popped)

    def test_popleft(self):
        """popleft() is expected to acquire once and return the oldest item."""
        sema = self.deque.sema
        for _ in range(2):
            sema.release()
        sema.acquire()
        self.mox.ReplayAll()
        for item in (4, 5):
            self.deque.append(item)
        popped = self.deque.popleft()
        self.assertEqual(4, popped)

    def test_remove(self):
        """Removing a present item is expected to acquire once."""
        sema = self.deque.sema
        for _ in range(2):
            sema.release()
        sema.acquire()
        self.mox.ReplayAll()
        for item in (4, 5):
            self.deque.append(item)
        self.deque.remove(4)

    def test_remove_notfound(self):
        """Removing a missing item raises ValueError with no acquire recorded."""
        sema = self.deque.sema
        for _ in range(2):
            sema.release()
        self.mox.ReplayAll()
        for item in (4, 5):
            self.deque.append(item)
        self.assertRaises(ValueError, self.deque.remove, 6)
开发者ID:slimta,项目名称:python-slimta,代码行数:79,代码来源:test_slimta_util_deque.py


示例12: setUp

 def setUp(self):
     """Build a BlockingDeque whose ``sema`` is a mox catch-all mock."""
     super(TestBlockingDeque, self).setUp()
     blocking = BlockingDeque()
     blocking.sema = self.mox.CreateMockAnything()
     self.deque = blocking
开发者ID:slimta,项目名称:python-slimta,代码行数:4,代码来源:test_slimta_util_deque.py


示例13: setUp

 def setUp(self):
     """Build a BlockingDeque whose ``sema`` is a mox mock of Semaphore."""
     super(TestBlockingDeque, self).setUp()
     blocking = BlockingDeque()
     blocking.sema = self.mox.CreateMock(Semaphore)
     self.deque = blocking
开发者ID:madhugb,项目名称:python-slimta,代码行数:4,代码来源:test_slimta_util_deque.py



注:本文中的slimta.util.deque.BlockingDeque类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python dns.DNSResolver类代码示例发布时间:2022-05-27
下一篇:
Python server.Server类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap