本文整理汇总了Python中rx.testing.TestScheduler类的典型用法代码示例。如果您正苦于以下问题：Python TestScheduler类的具体用法？Python TestScheduler怎么用？Python TestScheduler使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TestScheduler类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_throttle_first_error
def test_throttle_first_error(self):
    """Check throttle_first(200) against a hot source that errors at 410.

    The test expects only on_next(210, 2) followed by the source's
    on_error(410), and a single source subscription from 200 to 410.
    """
    scheduler = TestScheduler()
    failure = RxException()
    source = scheduler.create_hot_observable(
        on_next(150, 1),
        on_next(210, 2),
        on_next(250, 3),
        on_next(310, 4),
        on_next(350, 5),
        on_error(410, failure),
        on_next(450, 7),
        on_completed(500)
    )

    outcome = scheduler.start(
        create=lambda: source.throttle_first(200, scheduler))

    outcome.messages.assert_equal(on_next(210, 2), on_error(410, failure))
    source.subscriptions.assert_equal(subscribe(200, 410))
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:29,代码来源:test_throttlefirst.py
示例2: test_interval_timespan_negative
def test_interval_timespan_negative(self):
    """Check Observable.interval(-1) when the subscription is disposed at 210.

    The expected output is the values 0..8 at virtual times 201..209.
    """
    scheduler = TestScheduler()

    outcome = scheduler.start(
        lambda: Observable.interval(-1, scheduler=scheduler),
        disposed=210)

    # One tick per virtual time unit: value i is expected at time 201 + i.
    expected = [on_next(201 + tick, tick) for tick in range(9)]
    outcome.messages.assert_equal(*expected)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:7,代码来源:test_interval.py
示例3: test_timeout_duration_simple_timeoutbycompletion
def test_timeout_duration_simple_timeoutbycompletion(self):
    """Check timeout_with_selector when the duration selector raises.

    The selector returns ``zs`` for values below 3 and raises for the value
    3 (emitted at 420). The test expects all three values followed by
    on_error(420), plus the subscription windows asserted below.
    """
    ex = 'ex'
    scheduler = TestScheduler()
    xs = scheduler.create_hot_observable(
        on_next(310, 1),
        on_next(350, 2),
        on_next(420, 3),
        on_completed(450),
    )
    ys = scheduler.create_cold_observable()
    zs = scheduler.create_cold_observable()

    def selector(x):
        if x < 3:
            return zs
        raise Exception(ex)

    def create():
        return xs.timeout_with_selector(ys, selector)

    results = scheduler.start(create)

    results.messages.assert_equal(
        on_next(310, 1),
        on_next(350, 2),
        on_next(420, 3),
        on_error(420, ex)
    )
    xs.subscriptions.assert_equal(subscribe(200, 420))
    ys.subscriptions.assert_equal(subscribe(200, 310))
    zs.subscriptions.assert_equal(subscribe(310, 350), subscribe(350, 420))
开发者ID:mvschaik,项目名称:RxPY,代码行数:27,代码来源:test_timeoutwithselector.py
示例4: test_min_by_comparer_return
def test_min_by_comparer_return(self):
    """Check ops.min_by with a key selector and a reversed comparer.

    The test expects exactly two messages: an on_next carrying a
    one-element list holding the ``key == 2`` item, then completion at 250.
    """
    scheduler = TestScheduler()
    xs = scheduler.create_hot_observable([
        on_next(150, {"key": 1, "value": 'z'}),
        on_next(210, {"key": 2, "value": 'a'}),
        on_completed(250),
    ])

    def reverse_comparer(a, b):
        # Inverted ordering: a larger left operand compares as "smaller".
        if a > b:
            return -1
        return 0 if a == b else 1

    def create():
        return xs.pipe(ops.min_by(lambda item: item["key"], reverse_comparer))

    res = scheduler.start(create=create).messages

    self.assertEqual(2, len(res))
    first, last = res

    assert first.value.kind == 'N'
    picked = first.value.value
    self.assertEqual(1, len(picked))
    self.assertEqual(2, picked[0]["key"])
    self.assertEqual('a', picked[0]["value"])

    assert last.value.kind == 'C'
    assert last.time == 250
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:32,代码来源:test_minby.py
示例5: test_min_by_comparer_throws
def test_min_by_comparer_throws(self):
    """Check ops.min_by when the comparer raises.

    The comparer always raises via ``_raise(ex)``; the test expects a
    single on_error at 220.
    """
    ex = 'ex'
    scheduler = TestScheduler()

    # (time, key, value) for every on_next in the source script.
    script = ((150, 1, 'z'), (210, 3, 'b'), (220, 2, 'c'), (230, 4, 'a'))
    msgs = [on_next(time, {"key": key, "value": value})
            for time, key, value in script]
    msgs.append(on_completed(250))

    def reverse_comparer(a, b):
        _raise(ex)

    xs = scheduler.create_hot_observable(msgs)

    def create():
        return xs.pipe(ops.min_by(lambda item: item["key"], reverse_comparer))

    res = scheduler.start(create=create).messages
    assert res == [on_error(220, ex)]
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:29,代码来源:test_minby.py
示例6: test_pluck_attr_completed
def test_pluck_attr_completed(self):
    """Check ops.pluck_attr('prop') against a source that completes at 400.

    The test expects the ``prop`` values of the items emitted between 210
    and 350, then on_completed(400). Entries scripted after 400 are not
    expected in the output, and the source subscription spans 200-400.
    """
    scheduler = TestScheduler()

    class DummyClass:
        def __init__(self, prop):
            self.prop = prop

    source = scheduler.create_hot_observable(
        on_next(180, DummyClass(1)),
        on_next(210, DummyClass(2)),
        on_next(240, DummyClass(3)),
        on_next(290, DummyClass(4)),
        on_next(350, DummyClass(5)),
        on_completed(400),
        on_next(410, DummyClass(-1)),
        on_completed(420),
        on_error(430, Exception('ex'))
    )

    def create():
        return source.pipe(ops.pluck_attr('prop'))

    outcome = scheduler.start(create=create)

    expected = [
        on_next(210, 2),
        on_next(240, 3),
        on_next(290, 4),
        on_next(350, 5),
        on_completed(400),
    ]
    assert outcome.messages == expected
    assert source.subscriptions == [subscribe(200, 400)]
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:28,代码来源:test_pluck.py
示例7: test_many_select_error
def test_many_select_error(self):
    """Check many_select(...).merge_all() on a source that errors at 500.

    The test expects each post-subscription value and the error one tick
    after its source time (221, 271, 411, 501), with the source subscribed
    from 200 to 500.
    """
    scheduler = TestScheduler()
    ex = Exception()
    source = scheduler.create_hot_observable(
        on_next(100, 1),
        on_next(220, 2),
        on_next(270, 3),
        on_next(410, 4),
        on_error(500, ex)
    )

    def first_of(inner):
        return inner.first()

    def create():
        selected = source.many_select(first_of, scheduler)
        return selected.merge_all()

    outcome = scheduler.start(create)

    outcome.messages.assert_equal(
        on_next(221, 2),
        on_next(271, 3),
        on_next(411, 4),
        on_error(501, ex)
    )
    source.subscriptions.assert_equal(subscribe(200, 500))
开发者ID:AlexMost,项目名称:RxPY,代码行数:28,代码来源:test_manyselect.py
示例8: test_paused_with_observable_controller_and_pause_and_unpause_after_end
def test_paused_with_observable_controller_and_pause_and_unpause_after_end(self):
    """Check pausable_buffered when the controller resumes after the source ends.

    The controller opens at 201, pauses at 300 and only resumes at 600,
    after the source completed at 500. The test expects the values seen
    while open (210, 230) at their own times, and every value emitted
    during the pause, plus the completion, at time 600.
    """
    scheduler = TestScheduler()
    # NOTE: the original also assigned `results = scheduler.create_observer()`
    # here; that value was overwritten by scheduler.start() below before any
    # use, so the dead assignment has been dropped.
    xs = scheduler.create_hot_observable(
        on_next(150, 1),
        on_next(210, 2),
        on_next(230, 3),
        on_next(301, 4),
        on_next(350, 5),
        on_next(399, 6),
        on_next(450, 7),
        on_next(470, 8),
        on_completed(500),
    )
    controller = scheduler.create_hot_observable(
        on_next(201, True),
        on_next(300, False),
        on_next(600, True),
    )

    def create():
        return xs.pausable_buffered(controller)

    results = scheduler.start(create)

    results.messages.assert_equal(
        on_next(210, 2),
        on_next(230, 3),
        on_next(600, 4),
        on_next(600, 5),
        on_next(600, 6),
        on_next(600, 7),
        on_next(600, 8),
        on_completed(600),
    )
开发者ID:michaelandersen,项目名称:RxPY,代码行数:34,代码来源:test_pausablebuffered.py
示例9: test_pairwise_error
def test_pairwise_error(self):
    """Check pairwise() on a source that errors at 290.

    The test expects one pair, (4, 3) at 240, followed by on_error(290),
    with the source subscribed from 200 to 290.
    """
    scheduler = TestScheduler()
    failure = Exception()
    source = scheduler.create_hot_observable(
        on_next(180, 5),
        on_next(210, 4),
        on_next(240, 3),
        on_error(290, failure),
        on_next(350, 1),
        on_completed(360)
    )

    outcome = scheduler.start(lambda: source.pairwise())

    outcome.messages.assert_equal(
        on_next(240, (4, 3)),
        on_error(290, failure),
    )
    source.subscriptions.assert_equal(subscribe(200, 290))
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:27,代码来源:test_pairwise.py
示例10: test_ignore_values_completed
def test_ignore_values_completed(self):
    """Check ignore_elements() on a source that completes at 610.

    The test expects only on_completed(610) in the output, with the source
    subscribed from 200 to 610.
    """
    scheduler = TestScheduler()
    source = scheduler.create_hot_observable(
        on_next(180, 1),
        on_next(210, 2),
        on_next(250, 3),
        on_next(270, 4),
        on_next(310, 5),
        on_next(360, 6),
        on_next(380, 7),
        on_next(410, 8),
        on_next(590, 9),
        on_completed(610),
    )

    def create():
        return source.ignore_elements()

    outcome = scheduler.start(create=create)

    outcome.messages.assert_equal(on_completed(610))
    source.subscriptions.assert_equal(subscribe(200, 610))
开发者ID:AlexMost,项目名称:RxPY,代码行数:7,代码来源:test_ignoreelements.py
示例11: test_maxby_comparerthrows
def test_maxby_comparerthrows(self):
    """Check max_by when the comparer raises.

    The comparer unconditionally raises ``Exception(ex)``; the test expects
    a single on_error at 220.
    """
    ex = 'ex'
    scheduler = TestScheduler()

    def item(key, value):
        # Build one source payload; the keys are the ones max_by selects on.
        return {"key": key, "value": value}

    xs = scheduler.create_hot_observable([
        on_next(150, item(1, 'z')),
        on_next(210, item(3, 'b')),
        on_next(220, item(2, 'c')),
        on_next(230, item(4, 'a')),
        on_completed(250),
    ])

    def reverse_comparer(a, b):
        raise Exception(ex)

    def create():
        return xs.max_by(lambda x: x["key"], reverse_comparer)

    messages = scheduler.start(create=create).messages
    messages.assert_equal(on_error(220, ex))
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:28,代码来源:test_maxby.py
示例12: test_using_error
def test_using_error(self):
    """Check rx.using when the inner observable terminates with an error.

    The test asserts that the resource factory and the observable factory
    each ran once, that the observable factory received the created
    resource, that the output is on_next(300, 200) then on_error(400), and
    that the resource recorded ``disposes == [200, 400]``.
    """
    scheduler = TestScheduler()
    ex = 'ex'

    # State captured from the factories; rebound through `nonlocal`.
    resource_factory_calls = 0
    observable_factory_calls = 0
    resource = None
    received = None
    inner = None

    def create():
        def create_resource():
            nonlocal resource_factory_calls, resource
            resource_factory_calls += 1
            resource = MockDisposable(scheduler)
            return resource

        def create_observable(d):
            nonlocal received, observable_factory_calls, inner
            received = d
            observable_factory_calls += 1
            inner = scheduler.create_cold_observable(
                on_next(100, scheduler.clock), on_error(200, ex))
            return inner

        return rx.using(create_resource, create_observable)

    results = scheduler.start(create)

    assert resource == received
    assert results.messages == [on_next(300, 200), on_error(400, ex)]
    assert observable_factory_calls == 1
    assert resource_factory_calls == 1
    assert inner.subscriptions == [subscribe(200, 400)]
    assert resource.disposes == [200, 400]
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:30,代码来源:test_using.py
示例13: test_using_null
def test_using_null(self):
    """Check rx.using when the resource factory returns None.

    The test asserts that both factories ran once, that the observable
    factory received None, and that the output is on_next(300, 200) then
    on_completed(400).
    """
    scheduler = TestScheduler()

    # State captured from the factories; rebound through `nonlocal`.
    resource_factory_calls = 0
    observable_factory_calls = 0
    resource = None
    received = None
    inner = None

    def create():
        def create_resources():
            nonlocal resource_factory_calls, resource
            resource_factory_calls += 1
            resource = None
            return resource

        def create_observable(d):
            nonlocal received, observable_factory_calls, inner
            received = d
            observable_factory_calls += 1
            inner = scheduler.create_cold_observable(
                on_next(100, scheduler.clock), on_completed(200))
            return inner

        return rx.using(create_resources, create_observable)

    results = scheduler.start(create)

    assert resource == received
    assert results.messages == [on_next(300, 200), on_completed(400)]
    assert 1 == observable_factory_calls
    assert 1 == resource_factory_calls
    assert inner.subscriptions == [subscribe(200, 400)]
    assert resource is None
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:31,代码来源:test_using.py
示例14: test_catch_nested_outer_catches
def test_catch_nested_outer_catches(self):
    """Check two chained ops.catch handlers on a source that errors at 215.

    The test asserts that the first handler ran and the second did not,
    and that the output continues with the first handler's replacement
    sequence: on_next(220, 3) then on_completed(225).
    """
    ex = 'ex'
    scheduler = TestScheduler()
    first_handler_called = False
    second_handler_called = False

    o1 = scheduler.create_hot_observable(
        [on_next(150, 1), on_next(210, 2), on_error(215, ex)])
    o2 = scheduler.create_hot_observable(
        [on_next(220, 3), on_completed(225)])
    o3 = scheduler.create_hot_observable(
        [on_next(220, 4), on_completed(225)])

    def handler1(e, source):
        nonlocal first_handler_called
        first_handler_called = True
        return o2

    def handler2(e, source):
        nonlocal second_handler_called
        second_handler_called = True
        return o3

    def create():
        return o1.pipe(ops.catch(handler1), ops.catch(handler2))

    results = scheduler.start(create)

    expected = [on_next(210, 2), on_next(220, 3), on_completed(225)]
    assert results.messages == expected
    assert first_handler_called
    assert not second_handler_called
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:26,代码来源:test_catch.py
示例15: test_retry_observable_error
def test_retry_observable_error(self):
    """Check retry() with no count on a cold source that always errors.

    The source emits 1, 2, 3 and then fails 250 ticks after each
    subscription. With disposal at 1100 the test expects the values to
    repeat on every resubscription, and four back-to-back subscriptions
    (200-450, 450-700, 700-950, 950-1100).
    """
    ex = 'ex'
    scheduler = TestScheduler()
    xs = scheduler.create_cold_observable(
        on_next(100, 1),
        on_next(150, 2),
        on_next(200, 3),
        on_error(250, ex),
    )

    results = scheduler.start(lambda: xs.retry(), disposed=1100)

    results.messages.assert_equal(
        on_next(300, 1), on_next(350, 2), on_next(400, 3),
        on_next(550, 1), on_next(600, 2), on_next(650, 3),
        on_next(800, 1), on_next(850, 2), on_next(900, 3),
        on_next(1050, 1),
    )
    # The original `return`ed this call. Test methods should return None;
    # test runners warn when a test returns a value.
    xs.subscriptions.assert_equal(
        subscribe(200, 450),
        subscribe(450, 700),
        subscribe(700, 950),
        subscribe(950, 1100),
    )
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:7,代码来源:test_retry.py
示例16: test_pairwise_not_completed
def test_pairwise_not_completed(self):
    """Check pairwise() on a hot source that never terminates.

    The test expects the pairs (4, 3), (3, 2) and (2, 1) at the time of
    each pair's second element, and a source subscription from 200 to 1000.
    """
    scheduler = TestScheduler()
    source = scheduler.create_hot_observable(
        on_next(180, 5),
        on_next(210, 4),
        on_next(240, 3),
        on_next(290, 2),
        on_next(350, 1)
    )

    outcome = scheduler.start(lambda: source.pairwise())

    expected_pairs = (
        on_next(240, (4, 3)),
        on_next(290, (3, 2)),
        on_next(350, (2, 1)),
    )
    outcome.messages.assert_equal(*expected_pairs)
    source.subscriptions.assert_equal(subscribe(200, 1000))
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:26,代码来源:test_pairwise.py
示例17: test_retry_observable_retry_count_dispose
def test_retry_observable_retry_count_dispose(self):
    """Check retry(3) when the subscription is disposed at 231.

    The cold source errors 20 ticks after each subscription. The test
    expects the first full pass (205, 210, 215), the first two values of
    the second pass (225, 230), and subscriptions 200-220 and 220-231.
    """
    ex = 'ex'
    scheduler = TestScheduler()
    source = scheduler.create_cold_observable(
        on_next(5, 1),
        on_next(10, 2),
        on_next(15, 3),
        on_error(20, ex),
    )

    def create():
        return source.retry(3)

    outcome = scheduler.start(create, disposed=231)

    outcome.messages.assert_equal(
        on_next(205, 1),
        on_next(210, 2),
        on_next(215, 3),
        on_next(225, 1),
        on_next(230, 2),
    )
    source.subscriptions.assert_equal(
        subscribe(200, 220),
        subscribe(220, 231),
    )
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:7,代码来源:test_retry.py
示例18: test_to_dict_completed
def test_to_dict_completed(self):
    """Check to_dict with key and element mappers on a completing source.

    Keys are ``x * 2`` and values ``x * 4``. The test expects a single dict
    built from the values emitted after subscription (2..5), delivered at
    the source's completion time 660, followed by on_completed(660).
    """
    scheduler = TestScheduler()
    xs = scheduler.create_hot_observable(
        on_next(110, 1),
        on_next(220, 2),
        on_next(330, 3),
        on_next(440, 4),
        on_next(550, 5),
        on_completed(660)
    )

    def create():
        return xs.to_dict(lambda x: x * 2, lambda x: x * 4)

    res = scheduler.start(create)

    # The leftover debug `print(res.messages)` from the original was
    # removed; a failing assert_equal is what should report mismatches.
    res.messages.assert_equal(
        on_next(660, {4: 8, 6: 12, 8: 16, 10: 20}),
        on_completed(660)
    )
    xs.subscriptions.assert_equal(
        subscribe(200, 660)
    )
开发者ID:riccardomarotti,项目名称:RxPY,代码行数:25,代码来源:test_todict.py
示例19: test_create_with_disposable_dispose
def test_create_with_disposable_dispose(self):
    """Check Observable.create_with_disposable with delayed, guarded emissions.

    The subscribe function emits 1 and 2 immediately and schedules 3, 4, 5
    and 6 at relative times 600, 700, 900 and 1100. Each delayed emission
    is skipped once the returned BooleanDisposable is disposed. The test
    expects only 1, 2 (at 200), 3 (at 800) and 4 (at 900); presumably the
    later two fall after the scheduler's default disposal time (TODO
    confirm against TestScheduler.start).
    """
    scheduler = TestScheduler()

    # (relative due time, value) for each delayed emission.
    delayed = ((600, 3), (700, 4), (900, 5), (1100, 6))

    def guarded_emit(observer, guard, value):
        """Build an action that emits `value` unless `guard` is disposed."""
        def action(scheduler, state):
            if not guard.is_disposed:
                observer.on_next(value)
        return action

    def create():
        def subscribe(o):
            d = BooleanDisposable()
            o.on_next(1)
            o.on_next(2)
            for due, value in delayed:
                scheduler.schedule_relative(due, guarded_emit(o, d, value))
            return d

        return Observable.create_with_disposable(subscribe)

    results = scheduler.start(create)
    results.messages.assert_equal(
        on_next(200, 1),
        on_next(200, 2),
        on_next(800, 3),
        on_next(900, 4),
    )
开发者ID:mvschaik,项目名称:RxPY,代码行数:35,代码来源:test_create.py
示例20: test_to_dict_error
def test_to_dict_error(self):
    """Check to_dict on a source that errors at 660.

    The test expects only on_error(660) in the output, no dictionary, and
    a source subscription from 200 to 660.
    """
    scheduler = TestScheduler()
    ex = Exception()
    source = scheduler.create_hot_observable(
        on_next(110, 1),
        on_next(220, 2),
        on_next(330, 3),
        on_next(440, 4),
        on_next(550, 5),
        on_error(660, ex)
    )

    def key_of(x):
        return x * 2

    def element_of(x):
        return x * 4

    outcome = scheduler.start(lambda: source.to_dict(key_of, element_of))

    outcome.messages.assert_equal(on_error(660, ex))
    source.subscriptions.assert_equal(subscribe(200, 660))
开发者ID:riccardomarotti,项目名称:RxPY,代码行数:26,代码来源:test_todict.py
注：本文中的rx.testing.TestScheduler类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论