• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.run_briefly函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.test_asyncio.utils.run_briefly函数的典型用法代码示例。如果您正苦于以下问题：Python run_briefly函数的具体用法？Python run_briefly怎么用？Python run_briefly使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了run_briefly函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: check_future_exception_never_retrieved

    def check_future_exception_never_retrieved(self, debug, m_log):
        # Shared check: a future that holds an exception and is dropped
        # without the exception ever being retrieved must trigger exactly
        # one error log call carrying that exception's exc_info.
        #
        # debug -- value forwarded to self.loop.set_debug()
        # m_log -- mock standing in for the logger
        #          (NOTE(review): presumably injected by a mock.patch
        #          decorator on the calling test -- confirm)
        self.loop.set_debug(debug)

        def memory_error():
            # Raise and catch so the returned instance carries a
            # __traceback__, which is compared against exc_info below.
            try:
                raise MemoryError()
            except BaseException as exc:
                return exc
        exc = memory_error()

        future = self._new_future(loop=self.loop)
        future.set_exception(exc)
        # Drop the only reference held here so the future can be finalized.
        future = None
        test_utils.run_briefly(self.loop)
        support.gc_collect()

        # No interpreter-version branch is needed: the f-string below already
        # requires Python 3.6+, so a "< 3.4" fallback could never execute.
        regex = f'^{self.cls.__name__} exception was never retrieved\n'
        exc_info = (type(exc), exc, exc.__traceback__)
        m_log.error.assert_called_once_with(mock.ANY, exc_info=exc_info)
        message = m_log.error.call_args[0][0]
        self.assertRegex(message, re.compile(regex, re.DOTALL))
开发者ID:guptabandhu25,项目名称:cpython,代码行数:25,代码来源:test_futures.py


示例2: test_ctor

 def test_ctor(self):
     # Build a transport with a waiter future, let the loop run briefly,
     # and check the waiter resolved to None before verifying the
     # recv_into() call made on the proactor.
     waiter = asyncio.Future(loop=self.loop)
     transport = self.socket_transport(waiter=waiter)
     test_utils.run_briefly(self.loop)
     self.assertIsNone(waiter.result())
     self.protocol.connection_made(transport)
     self.proactor.recv_into.assert_called_with(self.sock, self.buf)
开发者ID:davidam,项目名称:cpython,代码行数:7,代码来源:test_proactor_events.py


示例3: test_del_stream_before_sock_closing

    def test_del_stream_before_sock_closing(self):
        # Dropping the reader/writer pair without closing it must report
        # exactly one message through the loop's exception handler and
        # leave the underlying socket with fileno() == -1.
        messages = []

        def record_context(loop, ctx):
            messages.append(ctx)

        self.loop.set_exception_handler(record_context)

        with test_utils.run_test_server() as httpd:
            reader, writer = self.loop.run_until_complete(
                asyncio.open_connection(*httpd.address, loop=self.loop))
            sock = writer.get_extra_info('socket')
            self.assertNotEqual(sock.fileno(), -1)

            writer.write(b'GET / HTTP/1.0\r\n\r\n')
            pending_line = reader.readline()
            status_line = self.loop.run_until_complete(pending_line)
            self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')

            # Release both stream objects so they become collectable.
            del reader, writer
            gc.collect()
            # Give the loop one brief pass so the socket can be closed.
            test_utils.run_briefly(self.loop)

            self.assertEqual(1, len(messages))
            self.assertEqual(sock.fileno(), -1)

        # Still exactly one report after the test server has shut down.
        self.assertEqual(1, len(messages))
        expected = ('An open stream object is being garbage '
                    'collected; call "stream.close()" explicitly.')
        self.assertEqual(expected, messages[0]['message'])
开发者ID:amanmehara,项目名称:cpython,代码行数:29,代码来源:test_streams.py


示例4: test_eof_received_waiter

 def test_eof_received_waiter(self):
     # After eof_received() and a brief loop run, the waiter future must
     # hold a ConnectionResetError.
     waiter_fut = asyncio.Future(loop=self.loop)
     proto = self.ssl_protocol(waiter_fut)
     self.connection_made(proto)
     proto.eof_received()
     test_utils.run_briefly(self.loop)
     error = waiter_fut.exception()
     self.assertIsInstance(error, ConnectionResetError)
开发者ID:ken860418,项目名称:cpython,代码行数:7,代码来源:test_sslproto.py


示例5: test_notify_all

    def test_notify_all(self):
        # Two tasks block in cond.wait(); one notify_all() must wake both,
        # and each appends its own tag to the shared result list.
        cond = asyncio.Condition(loop=self.loop)
        result = []

        async def wait_then_record(tag):
            await cond.acquire()
            if await cond.wait():
                result.append(tag)
                cond.release()
            return True

        # Tasks are created in tag order.
        tasks = [asyncio.Task(wait_then_record(tag), loop=self.loop)
                 for tag in (1, 2)]

        # Nothing has been notified yet, so nothing may have been recorded.
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # notify_all() is called while the condition's lock is held.
        self.loop.run_until_complete(cond.acquire())
        cond.notify_all()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        for task in tasks:
            self.assertTrue(task.done())
            self.assertTrue(task.result())
开发者ID:1st1,项目名称:cpython,代码行数:35,代码来源:test_locks.py


示例6: test_acquire

    def test_acquire(self):
        # Exercise Semaphore.acquire() with more contenders than permits and
        # check the internal counter, the waiter queue and which tasks
        # finished.
        sem = asyncio.Semaphore(3, loop=self.loop)
        result = []

        # Take two of the three permits up front; the semaphore is not yet
        # locked.
        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertFalse(sem.locked())

        async def c1(result):
            await sem.acquire()
            result.append(1)
            return True

        async def c2(result):
            await sem.acquire()
            result.append(2)
            return True

        async def c3(result):
            await sem.acquire()
            result.append(3)
            return True

        async def c4(result):
            await sem.acquire()
            result.append(4)
            return True

        t1 = asyncio.Task(c1(result), loop=self.loop)
        t2 = asyncio.Task(c2(result), loop=self.loop)
        t3 = asyncio.Task(c3(result), loop=self.loop)

        # Only t1 gets through; the other two tasks are left waiting.
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(sem.locked())
        self.assertEqual(2, len(sem._waiters))
        self.assertEqual(0, sem._value)

        t4 = asyncio.Task(c4(result), loop=self.loop)

        sem.release()
        sem.release()
        self.assertEqual(2, sem._value)

        # Both released permits are consumed again; one contender is still
        # waiting afterwards.
        test_utils.run_briefly(self.loop)
        self.assertEqual(0, sem._value)
        self.assertEqual(3, len(result))
        self.assertTrue(sem.locked())
        self.assertEqual(1, len(sem._waiters))

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        race_tasks = [t2, t3, t4]
        done_tasks = [t for t in race_tasks if t.done() and t.result()]
        # assertEqual, not assertTrue: assertTrue(2, msg) treats its second
        # argument as the failure message and can never fail.
        self.assertEqual(2, len(done_tasks))

        # cleanup locked semaphore
        sem.release()
        self.loop.run_until_complete(asyncio.gather(*race_tasks))
开发者ID:1st1,项目名称:cpython,代码行数:60,代码来源:test_locks.py


示例7: tearDown

    def tearDown(self):
        loop = self.loop
        # Run one brief pass first in case transport close callbacks are
        # still pending.
        test_utils.run_briefly(loop)

        loop.close()
        gc.collect()
        super().tearDown()
开发者ID:Apoorvadabhere,项目名称:cpython,代码行数:7,代码来源:test_streams.py


示例8: test_clear_with_waiters

    def test_clear_with_waiters(self):
        # A task waiting on the event must record its result exactly once
        # after a set()/clear() cycle followed by two more set() calls.
        ev = asyncio.Event(loop=self.loop)
        result = []

        async def wait_and_record():
            if await ev.wait():
                result.append(1)
            return True

        waiter_task = asyncio.Task(wait_and_record(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        ev.set()
        ev.clear()
        self.assertFalse(ev.is_set())

        # Two consecutive set() calls; one waiter is still registered
        # until the loop gets to run.
        ev.set()
        ev.set()
        self.assertEqual(1, len(ev._waiters))

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertEqual(0, len(ev._waiters))

        self.assertTrue(waiter_task.done())
        self.assertTrue(waiter_task.result())
开发者ID:1st1,项目名称:cpython,代码行数:27,代码来源:test_locks.py


示例9: test_proto_type_switch

    def test_proto_type_switch(self):
        # Feed one completed read to a transport bound to a plain Protocol,
        # then swap in a BufferedProtocol and check that the next completed
        # read goes through recv_into()/buffer_updated() instead of
        # recv()/data_received().
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        # NOTE(review): `tr` is rebound to a second transport a few lines
        # below before this one is used -- confirm whether this first
        # construction is intentional.
        tr = self.socket_transport()

        # A future that is already resolved with the bytes "read".
        res = asyncio.Future(loop=self.loop)
        res.set_result(b'data')

        tr = self.socket_transport()
        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
        self.protocol.data_received.assert_called_with(b'data')

        # switch protocol to a BufferedProtocol

        buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
        buf = bytearray(4)
        # get_buffer() always hands back the same 4-byte buffer.
        buf_proto.get_buffer.side_effect = lambda hint: buf

        tr.set_protocol(buf_proto)
        test_utils.run_briefly(self.loop)
        # This time the completed read reports a byte count, not the data.
        res = asyncio.Future(loop=self.loop)
        res.set_result(4)

        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
        buf_proto.buffer_updated.assert_called_with(4)
开发者ID:FFMG,项目名称:myoddweb.piger,代码行数:28,代码来源:test_proactor_events.py


示例10: test_wrap_future_cancel

 def test_wrap_future_cancel(self):
     # Cancelling the asyncio wrapper must also leave the wrapped
     # concurrent.futures.Future cancelled once the loop has run briefly.
     inner = concurrent.futures.Future()
     wrapper = asyncio.wrap_future(inner, loop=self.loop)
     wrapper.cancel()
     test_utils.run_briefly(self.loop)
     for fut in (inner, wrapper):
         self.assertTrue(fut.cancelled())
开发者ID:guptabandhu25,项目名称:cpython,代码行数:7,代码来源:test_futures.py


示例11: test_close_kill_running

    def test_close_kill_running(self):

        async def kill_running():
            # Start the blocked program and wrap the process's kill() so
            # the test can tell whether transport.close() invoked it.
            transport, _protocol = await self.loop.subprocess_exec(
                asyncio.SubprocessProtocol, *PROGRAM_BLOCKED)

            proc = transport.get_extra_info('subprocess')
            real_kill = proc.kill
            kill_seen = False

            def spy_kill():
                nonlocal kill_seen
                kill_seen = True
                real_kill()

            proc.kill = spy_kill
            # Sample the return code before closing the transport.
            code = transport.get_returncode()
            transport.close()
            await transport._wait()
            return (code, kill_seen)

        # Ignore "Close running child process: kill ..." log
        with test_utils.disable_logger():
            returncode, killed = self.loop.run_until_complete(kill_running())
        self.assertIsNone(returncode)

        # transport.close() must kill the process if it is still running
        self.assertTrue(killed)
        test_utils.run_briefly(self.loop)
开发者ID:1st1,项目名称:cpython,代码行数:29,代码来源:test_subprocess.py


示例12: test_tb_logger_exception_unretrieved

 def test_tb_logger_exception_unretrieved(self, m_log):
     # A future whose exception is never retrieved must cause an error
     # log call once the future has been dropped and collected.
     doomed = self._new_future(loop=self.loop)
     doomed.set_exception(RuntimeError('boom'))
     # Remove the only name bound to the future.
     del doomed
     test_utils.run_briefly(self.loop)
     support.gc_collect()
     error_logger = m_log.error
     self.assertTrue(error_logger.called)
开发者ID:guptabandhu25,项目名称:cpython,代码行数:7,代码来源:test_futures.py


示例13: test_get_with_waiting_putters

 def test_get_with_waiting_putters(self):
     # Two put() tasks target a queue of maxsize 1; get() must then
     # return the items in the order the tasks were created.
     q = asyncio.Queue(loop=self.loop, maxsize=1)
     for item in ('a', 'b'):
         asyncio.Task(q.put(item), loop=self.loop)
     test_utils.run_briefly(self.loop)
     for expected in ('a', 'b'):
         self.assertEqual(self.loop.run_until_complete(q.get()), expected)
开发者ID:Eyepea,项目名称:cpython,代码行数:7,代码来源:test_queues.py


示例14: test_fatal_error_2

    def test_fatal_error_2(self):
        # Force-closing a transport that still has buffered data must
        # report connection_lost(None) and leave the buffer set to None.
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr._force_close(None)

        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        # assertIsNone checks identity with None and produces a clearer
        # failure message than assertEqual(None, ...).
        self.assertIsNone(tr._buffer)
开发者ID:davidam,项目名称:cpython,代码行数:8,代码来源:test_proactor_events.py


示例15: test_ctor

    def test_ctor(self):
        # Wait for the constructor's waiter, then check the reader
        # registration and the connection_made() notification.
        ctor_waiter = asyncio.Future(loop=self.loop)
        transport = self.socket_transport(waiter=ctor_waiter)
        self.loop.run_until_complete(ctor_waiter)

        self.loop.assert_reader(7, transport._read_ready)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_made.assert_called_with(transport)
开发者ID:Eyepea,项目名称:cpython,代码行数:8,代码来源:test_selector_events.py


示例16: test_close_write_buffer

    def test_close_write_buffer(self):
        # Closing while data is still buffered leaves no readers
        # registered, and connection_lost() must not have been called
        # after a brief loop run.
        transport = self.create_transport()
        transport._buffer.extend(b'data')
        transport.close()

        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        lost_callback = self.protocol.connection_lost
        self.assertFalse(lost_callback.called)
开发者ID:Eyepea,项目名称:cpython,代码行数:8,代码来源:test_selector_events.py


示例17: tearDown

    def tearDown(self):
        loop = self.loop
        # Only a loop that is still open is run; the brief pass is there in
        # case transport close callbacks are pending.
        if not loop.is_closed():
            test_utils.run_briefly(loop)

        self.doCleanups()
        support.gc_collect()
        super().tearDown()
开发者ID:amanmehara,项目名称:cpython,代码行数:8,代码来源:test_sock_lowlevel.py


示例18: _basetest_open_connection_error

 def _basetest_open_connection_error(self, open_connection_fut):
     # Shared body: once the protocol is told the connection was lost
     # with a ZeroDivisionError, running a read must raise that error.
     reader, writer = self.loop.run_until_complete(open_connection_fut)
     writer._protocol.connection_lost(ZeroDivisionError())
     pending_read = reader.read()
     self.assertRaises(ZeroDivisionError,
                       self.loop.run_until_complete, pending_read)
     writer.close()
     test_utils.run_briefly(self.loop)
开发者ID:Apoorvadabhere,项目名称:cpython,代码行数:8,代码来源:test_streams.py


示例19: test_wrap_future_cancel2

 def test_wrap_future_cancel2(self):
     # If the wrapped future already holds a result, cancelling the
     # wrapper must cancel only the wrapper and keep that result intact.
     inner = concurrent.futures.Future()
     wrapper = asyncio.wrap_future(inner, loop=self.loop)
     inner.set_result(42)
     wrapper.cancel()
     test_utils.run_briefly(self.loop)
     self.assertFalse(inner.cancelled())
     self.assertEqual(inner.result(), 42)
     self.assertTrue(wrapper.cancelled())
开发者ID:guptabandhu25,项目名称:cpython,代码行数:9,代码来源:test_futures.py


示例20: test_connection_lost

 def test_connection_lost(self):
     # From issue #472: waiting on the waiter used to hang when
     # connection_lost() had been called.  The waiter must instead end up
     # holding a ConnectionAbortedError.
     waiter_fut = asyncio.Future(loop=self.loop)
     proto = self.ssl_protocol(waiter_fut)
     self.connection_made(proto)
     proto.connection_lost(ConnectionAbortedError)
     test_utils.run_briefly(self.loop)
     error = waiter_fut.exception()
     self.assertIsInstance(error, ConnectionAbortedError)
开发者ID:ken860418,项目名称:cpython,代码行数:9,代码来源:test_sslproto.py



注:本文中的test.test_asyncio.utils.run_briefly函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test_client.get_client函数代码示例发布时间:2022-05-27
下一篇:
Python test_helper.request_mock函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap