• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test_utils.run_briefly函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tulip.test_utils.run_briefly 函数的典型用法代码示例。如果您正苦于以下问题:Python run_briefly 函数的具体用法?Python run_briefly 怎么用?Python run_briefly 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 run_briefly 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_shield_effect

    def test_shield_effect(self):
        """Cancelling the outer task must not cancel the shielded inner one.

        ``proof`` gains 1 when ``inner`` finishes and 100 when ``outer``
        finishes; a final value of 1 shows that only ``inner`` completed.
        """
        proof = 0
        waiter = futures.Future(loop=self.loop)

        @tasks.coroutine
        def inner():
            nonlocal proof
            yield from waiter
            proof += 1

        @tasks.coroutine
        def outer():
            nonlocal proof
            yield from tasks.shield(inner(), loop=self.loop)
            proof += 100

        # ``async`` is a reserved keyword since Python 3.7, so the spelling
        # ``tasks.async(...)`` no longer parses.  Wrap the coroutine in a
        # Task explicitly instead.
        f = tasks.Task(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        with self.assertRaises(futures.CancelledError):
            self.loop.run_until_complete(f)
        # Completing the waiter afterwards lets inner() finish on its own.
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertEqual(proof, 1)
开发者ID:nerodong,项目名称:tulip,代码行数:25,代码来源:tasks_test.py


示例2: test_cancellation_broadcast

    def test_cancellation_broadcast(self):
        """Cancelling the task that waits on gather() cancels all children.

        ``proof`` must stay 0: neither ``inner`` nor ``outer`` may get past
        its ``yield from`` once the cancellation has been delivered.
        """
        proof = 0
        waiter = futures.Future(loop=self.one_loop)

        @tasks.coroutine
        def inner():
            nonlocal proof
            yield from waiter
            proof += 1

        # ``async`` is a reserved keyword since Python 3.7, so the spelling
        # ``tasks.async(...)`` no longer parses.  Create the Task objects
        # explicitly instead.
        child1 = tasks.Task(inner(), loop=self.one_loop)
        child2 = tasks.Task(inner(), loop=self.one_loop)
        gatherer = None

        @tasks.coroutine
        def outer():
            nonlocal proof, gatherer
            gatherer = tasks.gather(child1, child2, loop=self.one_loop)
            yield from gatherer
            proof += 100

        f = tasks.Task(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(f.cancel())
        with self.assertRaises(futures.CancelledError):
            self.one_loop.run_until_complete(f)
        # The gather future is already finished, so a second cancel is a no-op.
        self.assertFalse(gatherer.cancel())
        self.assertTrue(waiter.cancelled())
        self.assertTrue(child1.cancelled())
        self.assertTrue(child2.cancelled())
        test_utils.run_briefly(self.one_loop)
        self.assertEqual(proof, 0)
开发者ID:nerodong,项目名称:tulip,代码行数:33,代码来源:tasks_test.py


示例3: test_step_result_future

    def test_step_result_future(self):
        """A task yielding a future waits on it and resumes with its result."""

        class CallbackTrackingFuture(futures.Future):
            """Future that records whether a done-callback was ever added."""

            def __init__(self, *args, **kwds):
                self.cb_added = False
                super().__init__(*args, **kwds)

            def add_done_callback(self, fn):
                self.cb_added = True
                super().add_done_callback(fn)

        tracked = CallbackTrackingFuture(loop=self.loop)
        received = None

        @tasks.coroutine
        def wait_for_future():
            nonlocal received
            received = yield from tracked

        task = tasks.Task(wait_for_future(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        # After one pass the task must have subscribed to the future.
        self.assertTrue(tracked.cb_added)

        sentinel = object()
        tracked.set_result(sentinel)
        test_utils.run_briefly(self.loop)
        # The coroutine saw exactly the object stored in the future ...
        self.assertIs(sentinel, received)
        # ... and, having no return statement, finished with None.
        self.assertTrue(task.done())
        self.assertIsNone(task.result())
开发者ID:nerodong,项目名称:tulip,代码行数:30,代码来源:tasks_test.py


示例4: test_shield_cancel

 def test_shield_cancel(self):
     """Cancelling the inner future cancels the future returned by shield()."""
     protected = futures.Future(loop=self.loop)
     wrapper = tasks.shield(protected)
     test_utils.run_briefly(self.loop)

     protected.cancel()
     test_utils.run_briefly(self.loop)

     self.assertTrue(wrapper.cancelled())
开发者ID:nerodong,项目名称:tulip,代码行数:7,代码来源:tasks_test.py


示例5: test_yield_future_passes_cancel

    def test_yield_future_passes_cancel(self):
        """Cancelling outer() cancels inner(), which cancels the waiter.

        ``proof`` gains 1 when ``inner`` sees the CancelledError and 100
        when ``outer`` sees it, so the expected total is 101.
        """
        proof = 0
        waiter = futures.Future(loop=self.loop)

        @tasks.coroutine
        def inner():
            nonlocal proof
            try:
                yield from waiter
            except futures.CancelledError:
                proof += 1
                raise
            else:
                self.fail('got past sleep() in inner()')

        @tasks.coroutine
        def outer():
            nonlocal proof
            try:
                yield from inner()
            except futures.CancelledError:
                proof += 100  # Expect this path.
            else:
                proof += 10

        # ``async`` is a reserved keyword since Python 3.7, so the spelling
        # ``tasks.async(...)`` no longer parses.  Wrap the coroutine in a
        # Task explicitly instead.
        f = tasks.Task(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        # outer() swallows the CancelledError, so the task finishes normally.
        self.loop.run_until_complete(f)
        self.assertEqual(proof, 101)
        self.assertTrue(waiter.cancelled())
开发者ID:nerodong,项目名称:tulip,代码行数:32,代码来源:tasks_test.py


示例6: test_yield_wait_does_not_shield_cancel

    def test_yield_wait_does_not_shield_cancel(self):
        """Cancelling outer() aborts its wait() but leaves inner() running.

        ``proof`` gains 1 when ``inner`` finishes and 100 when ``outer``
        finishes; a final value of 1 shows that only ``inner`` completed.
        """
        proof = 0
        waiter = futures.Future(loop=self.loop)

        @tasks.coroutine
        def inner():
            nonlocal proof
            yield from waiter
            proof += 1

        @tasks.coroutine
        def outer():
            nonlocal proof
            # The (done, pending) pair is unpacked but intentionally unused.
            _done, _pending = yield from tasks.wait([inner()], loop=self.loop)
            proof += 100

        # ``async`` is a reserved keyword since Python 3.7, so the spelling
        # ``tasks.async(...)`` no longer parses.  Wrap the coroutine in a
        # Task explicitly instead.
        f = tasks.Task(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        self.assertRaises(
            futures.CancelledError, self.loop.run_until_complete, f)
        # inner() is still alive; completing the waiter lets it finish.
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertEqual(proof, 1)
开发者ID:nerodong,项目名称:tulip,代码行数:26,代码来源:tasks_test.py


示例7: tearDown

    def tearDown(self):
        """Drain the loop briefly, close it, collect garbage, then chain up."""
        # Run the loop once more in case transport close callbacks are
        # still pending.
        test_utils.run_briefly(self.loop)

        self.loop.close()
        gc.collect()
        super().tearDown()
开发者ID:sah,项目名称:tulip,代码行数:7,代码来源:events_test.py


示例8: test_baseexception_during_cancel

    def test_baseexception_during_cancel(self):
        """A BaseException raised while handling cancellation ends the task.

        The task must finish as done (not cancelled) and expose the raised
        exception object, and the exception must escape the loop run.
        """

        def time_script():
            # The loop reports the scheduled deadline; expect the 10s sleep.
            deadline = yield
            self.assertAlmostEqual(10.0, deadline)
            yield 0

        loop = test_utils.TestLoop(time_script)
        self.addCleanup(loop.close)

        raised = BaseException()

        @tasks.coroutine
        def sleeper():
            yield from tasks.sleep(10, loop=loop)

        @tasks.coroutine
        def rethrower():
            try:
                yield from sleeper()
            except futures.CancelledError:
                raise raised

        task = tasks.Task(rethrower(), loop=loop)
        test_utils.run_briefly(loop)

        task.cancel()
        self.assertFalse(task.done())

        with self.assertRaises(BaseException):
            test_utils.run_briefly(loop)

        self.assertTrue(task.done())
        self.assertFalse(task.cancelled())
        self.assertIs(task.exception(), raised)
开发者ID:nerodong,项目名称:tulip,代码行数:34,代码来源:tasks_test.py


示例9: test_read_pipe

    def test_read_pipe(self):
        """Data written to an OS pipe reaches the protocol via the loop.

        Checks the byte count after each write and the protocol state
        sequence after the write end is closed.
        """
        proto = None

        read_fd, write_fd = os.pipe()
        reader = io.open(read_fd, 'rb', 1024)

        @tasks.task
        def connect():
            nonlocal proto
            transport = yield from self.loop.connect_read_pipe(reader)
            proto = MyReadPipeProto(transport, create_future=True)
            self.assertIs(transport, proto.transport)
            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
            self.assertEqual(0, proto.nbytes)

        self.loop.run_until_complete(connect())

        # First write: a single byte.
        os.write(write_fd, b'1')
        test_utils.run_briefly(self.loop)
        self.assertEqual(1, proto.nbytes)

        # Second write: four more bytes, state must be unchanged.
        os.write(write_fd, b'2345')
        test_utils.run_briefly(self.loop)
        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
        self.assertEqual(5, proto.nbytes)

        # Closing the write end lets the protocol's done future complete.
        os.close(write_fd)
        self.loop.run_until_complete(proto.done)
        expected_states = ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED']
        self.assertEqual(expected_states, proto.state)
        # extra info is available
        pipe_info = proto.transport.get_extra_info('pipe')
        self.assertIsNotNone(pipe_info)
开发者ID:sah,项目名称:tulip,代码行数:32,代码来源:events_test.py


示例10: test_cancel_done_future

    def test_cancel_done_future(self):
        """Cancel a task right after the future it is waiting on completes.

        The first future keeps its result, while the two later futures and
        the task itself must end up cancelled.
        """
        first = futures.Future()
        second = futures.Future()
        third = futures.Future()

        @tasks.task
        def chain():
            yield from first
            try:
                yield from second
            except futures.CancelledError:
                pass
            yield from third

        running = chain()
        test_utils.run_briefly(self.loop)
        first.set_result(None)
        running.cancel()

        # process fut1 result, delay cancel
        test_utils.run_once(self.loop)
        self.assertFalse(running.done())

        # cancel fut2, but coro still alive
        test_utils.run_once(self.loop)
        self.assertFalse(running.done())

        # cancel fut3
        test_utils.run_briefly(self.loop)
        self.assertTrue(running.done())

        self.assertEqual(first.result(), None)
        self.assertTrue(second.cancelled())
        self.assertTrue(third.cancelled())
        self.assertTrue(running.cancelled())
开发者ID:sah,项目名称:tulip,代码行数:29,代码来源:tasks_test.py


示例11: test_ctor_with_waiter

 def test_ctor_with_waiter(self):
     """Constructing the write-pipe transport with a waiter resolves it.

     Also checks that ``_read_ready`` is registered as a reader under
     key 5 (presumably the pipe's file descriptor).
     """
     waiter = futures.Future(loop=self.loop)
     transport = unix_events._UnixWritePipeTransport(
         self.loop, self.pipe, self.protocol, waiter)

     self.loop.assert_reader(5, transport._read_ready)
     test_utils.run_briefly(self.loop)
     self.assertEqual(None, waiter.result())
开发者ID:nerodong,项目名称:tulip,代码行数:7,代码来源:unix_events_test.py


示例12: test_get_with_waiting_putters

 def test_get_with_waiting_putters(self):
     """Items from queued put() calls come back from get() in put order."""
     queue = queues.Queue(loop=self.loop, maxsize=1)
     # Two puts against a queue of size one.
     tasks.Task(queue.put('a'), loop=self.loop)
     tasks.Task(queue.put('b'), loop=self.loop)
     test_utils.run_briefly(self.loop)

     first = self.loop.run_until_complete(queue.get())
     self.assertEqual(first, 'a')
     second = self.loop.run_until_complete(queue.get())
     self.assertEqual(second, 'b')
开发者ID:nerodong,项目名称:tulip,代码行数:7,代码来源:queues_test.py


示例13: test_sleep_cancel

    def test_sleep_cancel(self):
        """Cancelling a sleeping task cancels the timer handle it scheduled.

        ``loop.call_later`` is wrapped so the handle created for the sleep
        can be captured and its ``_cancelled`` flag inspected.
        """

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = test_utils.TestLoop(gen)
        self.addCleanup(loop.close)

        t = tasks.Task(tasks.sleep(10.0, 'yeah', loop=loop),
                       loop=loop)

        handle = None
        orig_call_later = loop.call_later

        # The wrapper is stored as an instance attribute, so it is called
        # unbound: it must NOT declare ``self``.  With a spurious ``self``
        # parameter every argument is shifted by one and a call without
        # extra positional arguments raises TypeError.
        def call_later(delay, callback, *args):
            nonlocal handle
            handle = orig_call_later(delay, callback, *args)
            return handle

        loop.call_later = call_later
        test_utils.run_briefly(loop)

        self.assertFalse(handle._cancelled)

        t.cancel()
        test_utils.run_briefly(loop)
        self.assertTrue(handle._cancelled)
开发者ID:nerodong,项目名称:tulip,代码行数:29,代码来源:tasks_test.py


示例14: test_clear_with_waiters

    def test_clear_with_waiters(self):
        """A waiter registered before set()/clear() is woken by a later set()."""
        event = locks.EventWaiter(loop=self.loop)
        collected = []

        @tasks.coroutine
        def wait_and_record(sink):
            woke = yield from event.wait()
            if woke:
                sink.append(1)
            return True

        task = tasks.Task(wait_and_record(collected), loop=self.loop)
        run_briefly(self.loop)
        self.assertEqual([], collected)

        # Set then immediately clear: the event must read as unset.
        event.set()
        event.clear()
        self.assertFalse(event.is_set())

        # Setting twice in a row must leave exactly one pending waiter.
        event.set()
        event.set()
        self.assertEqual(1, len(event._waiters))

        run_briefly(self.loop)
        self.assertEqual([1], collected)
        self.assertEqual(0, len(event._waiters))

        self.assertTrue(task.done())
        self.assertTrue(task.result())
开发者ID:concordusapps,项目名称:tulip,代码行数:28,代码来源:locks_test.py


示例15: test_ctor

 def test_ctor(self):
     """Constructing the proactor transport resolves the waiter and reads.

     After one loop pass the waiter holds None and the proactor was asked
     to ``recv`` on the socket with a 4096 argument.
     """
     waiter = tulip.Future(loop=self.loop)
     transport = _ProactorSocketTransport(
         self.loop, self.sock, self.protocol, waiter)
     test_utils.run_briefly(self.loop)

     self.assertIsNone(waiter.result())
     self.protocol.connection_made(transport)
     self.proactor.recv.assert_called_with(self.sock, 4096)
开发者ID:nerodong,项目名称:tulip,代码行数:8,代码来源:proactor_events_test.py


示例16: test_shield_exception

 def test_shield_exception(self):
     """An exception set on the inner future shows up on shield()'s future."""
     protected = futures.Future(loop=self.loop)
     wrapper = tasks.shield(protected)
     test_utils.run_briefly(self.loop)

     failure = RuntimeError('expected')
     protected.set_exception(failure)
     test_utils.run_briefly(self.loop)

     # The very same exception object must be propagated.
     self.assertIs(wrapper.exception(), failure)
开发者ID:nerodong,项目名称:tulip,代码行数:8,代码来源:tasks_test.py


示例17: test_fatal_error_2

    def test_fatal_error_2(self):
        """_force_close(None) empties the buffer and reports connection_lost(None)."""
        transport = _ProactorSocketTransport(
            self.loop, self.sock, self.protocol)
        transport._buffer = [b'data']
        transport._force_close(None)

        # connection_lost is only checked after one pass of the loop.
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertEqual([], transport._buffer)
开发者ID:nerodong,项目名称:tulip,代码行数:8,代码来源:proactor_events_test.py


示例18: test_write_eof

    def test_write_eof(self):
        """write_eof() marks the transport closing and reports connection_lost(None)."""
        transport = unix_events._UnixWritePipeTransport(
            self.loop, self.pipe, self.protocol)

        transport.write_eof()

        self.assertTrue(transport._closing)
        self.assertFalse(self.loop.readers)
        # connection_lost is only checked after one pass of the loop.
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
开发者ID:nerodong,项目名称:tulip,代码行数:9,代码来源:unix_events_test.py


示例19: test__read_ready_blocked

    def test__read_ready_blocked(self, m_read):
        """A BlockingIOError from the read delivers no data to the protocol."""
        transport = unix_events._UnixReadPipeTransport(
            self.loop, self.pipe, self.protocol)
        # Make the patched read raise as if no data were available yet.
        m_read.side_effect = BlockingIOError

        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.data_received.called)
开发者ID:nerodong,项目名称:tulip,代码行数:9,代码来源:unix_events_test.py


示例20: test__close

    def test__close(self, m_read):
        """_close(err) marks the transport closing and forwards err to connection_lost."""
        transport = unix_events._UnixReadPipeTransport(
            self.loop, self.pipe, self.protocol)

        # A unique sentinel object stands in for the error.
        error = object()
        transport._close(error)

        self.assertTrue(transport._closing)
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(error)
开发者ID:nerodong,项目名称:tulip,代码行数:10,代码来源:unix_events_test.py



注:本文中的tulip.test_utils.run_briefly函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python unix_events._UnixWritePipeTransport函数代码示例发布时间:2022-05-27
下一篇:
Python synth.synthesize函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap