• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python select.error函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中select.error函数的典型用法代码示例。如果您正苦于以下问题:Python error函数的具体用法?Python error怎么用?Python error使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了error函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: select

 def select(cls, rlist, wlist, xlist, timeout=5):
     the_time = time.time()
     if timeout is not None:
         tstop = the_time + timeout
     else:
         tstop = 0
     while timeout is None or the_time <= tstop:
         # we must always go around at least once
         rs = []
         ws = []
         for r in rlist:
             try:
                 if r.recv_pipe.canread():
                     rs.append(r)
             except IOError:
                 # raise a socket error
                 raise select.error(errno.EPIPE, os.strerror(errno.EPIPE))
         for w in wlist:
             try:
                 if w.send_pipe.canwrite():
                     ws.append(w)
             except IOError:
                 # raise a socket error
                 raise select.error(errno.EPIPE, os.strerror(errno.EPIPE))
         if rs or ws:
             return rs, ws, []
         else:
             time.sleep(1)
             the_time = time.time()
     return [], [], []
开发者ID:damycra,项目名称:pyslet,代码行数:30,代码来源:test_http_server.py


示例2: test_select_eintr

 def test_select_eintr(self):
     # EINTR is supposed to be ignored
     with mock.patch('pyftpdlib.ioloop.select.select',
                     side_effect=select.error()) as m:
         m.side_effect.errno = errno.EINTR
         s, rd, wr = self.test_register()
         s.poll(0)
     # ...but just that
     with mock.patch('pyftpdlib.ioloop.select.select',
                     side_effect=select.error()) as m:
         m.side_effect.errno = errno.EBADF
         s, rd, wr = self.test_register()
         self.assertRaises(select.error, s.poll, 0)
开发者ID:BroLsd999,项目名称:pyftpdlib,代码行数:13,代码来源:test_ioloop.py


示例3: test_invalidate_connection

 def test_invalidate_connection(slef, queued_pool):
     msg = Message.generate()
     with pytest.raises(select.error):
         with queued_pool.acquire() as cxn:
             fairy = cxn.fairy
             raise select.error(9, 'Bad file descriptor')
     assert fairy.cxn.is_closed
开发者ID:LyuGGang,项目名称:pika-pool,代码行数:7,代码来源:test.py


示例4: __call__

 def __call__(self, *args):
     self.called += 1
     if self.called == 1:
         # raise the exception on first call
         raise select.error(errno.EINTR, os.strerror(errno.EINTR))
     else:
         # Return real select value for consecutive calls
         return old_select(*args)
开发者ID:Kelauni22,项目名称:Meeple,代码行数:8,代码来源:test_socketserver.py


示例5: test_nofailure_with_errno_EINTR

 def test_nofailure_with_errno_EINTR(self):
     """checks no exception is raised if errno.EINTR is raised
     while it's selecting"""
     self.__call_count = 0
     select.select = lambda r, w, x, t: self.__faked_select(
         select.error(errno.EINTR))
     self.stats_httpd = MyStatsHttpd(get_availaddr())
     self.stats_httpd.start() # shouldn't leak the exception
     self.assertFalse(self.stats_httpd.running)
     self.assertIsNone(self.stats_httpd.mccs)
开发者ID:Distrotech,项目名称:bind10,代码行数:10,代码来源:stats-httpd_test.py


示例6: test_ppl

    def test_ppl(self, read_mock, select_mock):
        # Simulate the two files
        stdout = mock.Mock(name='pipe.stdout')
        stdout.fileno.return_value = 65
        stderr = mock.Mock(name='pipe.stderr')
        stderr.fileno.return_value = 66

        # Recipients for results
        out_list = []
        err_list = []

        # StreamLineProcessors
        out_proc = StreamLineProcessor(stdout, out_list.append)
        err_proc = StreamLineProcessor(stderr, err_list.append)

        # The select call always returns all the streams
        select_mock.side_effect = [
            [[out_proc, err_proc], [], []],
            select.error(errno.EINTR),  # Test interrupted system call
            [[out_proc, err_proc], [], []],
            [[out_proc, err_proc], [], []],
        ]

        # The read calls return out and err interleaved
        # Lines are split in various ways, to test all the code paths
        read_mock.side_effect = ['line1\nl'.encode('utf-8'),
                                 'err'.encode('utf-8'),
                                 'ine2'.encode('utf-8'),
                                 '1\nerr2\n'.encode('utf-8'),
                                 '', '',
                                 Exception]  # Make sure it terminates

        command_wrappers.Command.pipe_processor_loop([out_proc, err_proc])

        # Check the calls order and the output
        assert read_mock.mock_calls == [
            mock.call(65, 4096),
            mock.call(66, 4096),
            mock.call(65, 4096),
            mock.call(66, 4096),
            mock.call(65, 4096),
            mock.call(66, 4096),
        ]
        assert out_list == ['line1', 'line2']
        assert err_list == ['err1', 'err2', '']
开发者ID:damnski,项目名称:barman,代码行数:45,代码来源:test_command_wrappers.py


示例7: poll

 def poll(self, timeout):
     if self.error:
         raise select.error(self.error)
     return self.result
开发者ID:alexsilva,项目名称:supervisor,代码行数:4,代码来源:test_poller.py


示例8: select

 def select(self, r, w, x, timeout):
     if self.error:
         raise select.error(self.error)
     return self.readables, self.writables, []
开发者ID:alexsilva,项目名称:supervisor,代码行数:4,代码来源:test_poller.py


示例9: select

 def select(self, r, w, x, timeout):
     import select
     if self.select_error:
         raise select.error(self.select_error)
     return self.select_result
开发者ID:fedosov,项目名称:supervisor,代码行数:5,代码来源:base.py


示例10: test_ppl_select_failure

    def test_ppl_select_failure(self, select_mock):
        # Test if select errors are passed through
        select_mock.side_effect = select.error('not good')

        with pytest.raises(select.error):
            command_wrappers.Command.pipe_processor_loop([None])
开发者ID:damnski,项目名称:barman,代码行数:6,代码来源:test_command_wrappers.py


示例11: raise_eintr_once

 def raise_eintr_once(*args):
     select_call.side_effect = None
     raise select.error(4, "Interrupted system call")
开发者ID:kkhalasi,项目名称:stompest,代码行数:3,代码来源:transport_test.py


示例12: test_read_select_err

 def test_read_select_err(self):
     '''Recovers from select errors'''
     with mock.patch('nsq.client.select.select') as mock_select:
         mock_select.side_effect = select.error(errno.EBADF)
         # This test passes if no exception is raised
         self.client.read()
开发者ID:life360,项目名称:nsq-py,代码行数:6,代码来源:test_client.py


示例13: test_internal_utils

 def test_internal_utils(self):
     err = select.error(errno.EINTR, "interrupted")
     assert omcache._select_errno(err) == errno.EINTR
开发者ID:CrowdStrike,项目名称:omcache,代码行数:3,代码来源:test_omcache.py


示例14: raise_eintr

 def raise_eintr():
     raise select.error(4, 'Interrupted system call')
开发者ID:nikipore,项目名称:stompest,代码行数:2,代码来源:transport_test.py


示例15: raise_select_except

 def raise_select_except(*args):
     raise select.error('dummy error')
开发者ID:Distrotech,项目名称:bind10,代码行数:2,代码来源:stats-httpd_test.py



注:本文中的select.error函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python select.kqueue函数代码示例发布时间:2022-05-27
下一篇:
Python select.epoll函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap