本文整理汇总了Python中rx.disposables.CompositeDisposable类的典型用法代码示例。如果您正苦于以下问题：Python CompositeDisposable类的具体用法？Python CompositeDisposable怎么用？Python CompositeDisposable使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了CompositeDisposable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: subscribe
def subscribe(observer):
    """Forward the values of every inner source emitted by ``sources``.

    ``sources`` comes from the enclosing scope. Each item it emits is
    subscribed to in turn, and that inner source's values and errors are
    passed straight to ``observer``.

    Args:
        observer: Receiver of the merged values, errors and completion.

    Returns:
        The ``CompositeDisposable`` holding the outer subscription and all
        inner subscriptions that are still registered.
    """
    outer_subscription = SingleAssignmentDisposable()
    group = CompositeDisposable()
    group.add(outer_subscription)
    is_stopped = False

    def on_next(inner_source):
        inner_subscription = SingleAssignmentDisposable()
        group.add(inner_subscription)

        def on_inner_completed():
            group.remove(inner_subscription)
            # A length of 1 means only the outer subscription is left.
            if is_stopped and group.length == 1:
                observer.on_completed()

        inner_subscription.disposable = inner_source.subscribe(
            observer.on_next,
            observer.on_error,
            on_inner_completed)

    def on_outer_completed():
        nonlocal is_stopped
        is_stopped = True
        if group.length == 1:
            observer.on_completed()

    outer_subscription.disposable = sources.subscribe(
        on_next, observer.on_error, on_outer_completed)
    return group
开发者ID:Reactive-Extensions,项目名称:RxPy,代码行数:33,代码来源:observable_multiple.py
示例2: subscribe
def subscribe(observer):
    """Drop values from ``source`` until ``other`` has emitted a value.

    ``source`` and ``other`` come from the enclosing scope. Values and the
    completion of ``source`` reach ``observer`` only after the gate has
    been opened by the first value from ``other``. Errors from either
    side are always forwarded.

    Args:
        observer: Receiver of the gated notifications.

    Returns:
        A ``CompositeDisposable`` containing both subscriptions.
    """
    gate_open = [False]

    def forward_next(left):
        if gate_open[0]:
            observer.on_next(left)

    def forward_completed():
        if gate_open[0]:
            observer.on_completed()

    source_subscription = source.subscribe(
        forward_next, observer.on_error, forward_completed)
    disposables = CompositeDisposable(source_subscription)

    other_subscription = SingleAssignmentDisposable()
    disposables.add(other_subscription)

    def open_gate(x):
        gate_open[0] = True
        # The first value is all that is needed from ``other``.
        other_subscription.dispose()

    def on_other_completed():
        other_subscription.dispose()

    other_subscription.disposable = other.subscribe(
        open_gate, observer.on_error, on_other_completed)
    return disposables
开发者ID:phaufe,项目名称:RxPY,代码行数:26,代码来源:skipuntil.py
示例3: subscribe
def subscribe(observer):
    """Activate every plan in ``plans`` and wire the results to ``observer``.

    ``plans`` comes from the enclosing scope. Each plan is activated with
    a shared ``external_subscriptions`` mapping, an observer that fans
    errors out to every join observer, and a ``deactivate`` callback.

    Args:
        observer: Receiver of the values produced by the plans.

    Returns:
        A ``CompositeDisposable`` containing every join observer found in
        ``external_subscriptions`` after activation.
    """
    active_plans = []
    external_subscriptions = {}

    def on_error(err):
        # Notify every join observer before the downstream observer.
        for join_observer in external_subscriptions.values():
            join_observer.on_error(err)
        observer.on_error(err)

    out_observer = AnonymousObserver(
        observer.on_next, on_error, observer.on_completed)

    def deactivate(active_plan):
        active_plans.remove(active_plan)
        if active_plans:
            return
        # The last active plan is gone, so the result is complete.
        observer.on_completed()

    try:
        for plan in plans:
            activated = plan.activate(
                external_subscriptions, out_observer, deactivate)
            active_plans.append(activated)
    except Exception as ex:
        Observable.throw(ex).subscribe(observer)

    group = CompositeDisposable()
    for join_observer in external_subscriptions.values():
        join_observer.subscribe()
        group.add(join_observer)
    return group
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:28,代码来源:when.py
示例4: subscribe
def subscribe(self, on_next, on_error=None, on_completed=None):
    """Record a subscription and schedule every stored message for it.

    Args:
        on_next: Either an ``AbstractObserver`` instance, which is used
            as the observer directly, or the on-next callback.
        on_error: Error callback, used only when ``on_next`` is not an
            observer.
        on_completed: Completion callback, used only when ``on_next`` is
            not an observer.

    Returns:
        A ``Disposable`` that, when disposed, stamps the end time on the
        recorded subscription and disposes the scheduled messages.
    """
    print("ColdObservable:subscribe()")
    observer = (on_next if isinstance(on_next, AbstractObserver)
                else Observer(on_next, on_error, on_completed))

    self.subscriptions.append(Subscription(self.scheduler.clock))
    index = len(self.subscriptions) - 1
    scheduled = CompositeDisposable()

    def make_action(notification):
        # A factory is used so each action captures its own notification
        # rather than the loop variable.
        def action(scheduler, state):
            notification.accept(observer)
            return Disposable.empty()
        return action

    for message in self.messages:
        notification = message.value
        print("Notification: ", notification)
        action = make_action(notification)
        scheduled.add(self.scheduler.schedule_relative(message.time, action))

    def dispose():
        print("ColdObservable:dispose()")
        # Replace the open-ended record with one that carries an end time.
        self.subscriptions[index] = Subscription(
            self.subscriptions[index].subscribe, self.scheduler.clock)
        scheduled.dispose()

    return Disposable(dispose)
开发者ID:Reactive-Extensions,项目名称:RxPy,代码行数:34,代码来源:coldobservable.py
示例5: test_groupdisposable_clear
def test_groupdisposable_clear():
    """Check that ``clear`` disposes the members and empties the group.

    Also checks that the group accepts a new member afterwards without
    disposing it.
    """
    disposed = {"d1": False, "d2": False, "d3": False}

    def mark(name):
        def action():
            disposed[name] = True
        return action

    d1 = Disposable.create(mark("d1"))
    d2 = Disposable.create(mark("d2"))
    group = CompositeDisposable(d1, d2)
    assert group.length == 2

    group.clear()
    assert disposed["d1"]
    assert disposed["d2"]
    assert not group.length

    d3 = Disposable.create(mark("d3"))
    group.add(d3)
    assert not disposed["d3"]
    assert group.length == 1
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:26,代码来源:test_disposable.py
示例6: subscribe
def subscribe(observer):
    """Forward the values of every inner source emitted by ``sources``.

    ``sources`` comes from the enclosing scope. Each emitted item is
    passed through ``Observable.from_future`` before being subscribed to.

    Args:
        observer: Receiver of the merged values, errors and completion.

    Returns:
        The ``CompositeDisposable`` holding the outer subscription and all
        inner subscriptions that are still registered.
    """
    outer_subscription = SingleAssignmentDisposable()
    group = CompositeDisposable()
    group.add(outer_subscription)
    # One-element list so the nested callbacks can flip the flag.
    is_stopped = [False]

    def on_next(inner_source):
        inner_subscription = SingleAssignmentDisposable()
        group.add(inner_subscription)
        inner_observable = Observable.from_future(inner_source)

        def on_inner_completed():
            group.remove(inner_subscription)
            # A length of 1 means only the outer subscription is left.
            if is_stopped[0] and group.length == 1:
                observer.on_completed()

        inner_subscription.disposable = inner_observable.subscribe(
            observer.on_next,
            observer.on_error,
            on_inner_completed)

    def on_outer_completed():
        is_stopped[0] = True
        if group.length == 1:
            observer.on_completed()

    outer_subscription.disposable = sources.subscribe(
        on_next, observer.on_error, on_outer_completed)
    return group
开发者ID:mvschaik,项目名称:RxPY,代码行数:31,代码来源:merge.py
示例7: subscribe
def subscribe(observer):
    """Split ``source`` into windows delimited by ``window_boundaries``.

    ``source`` and ``window_boundaries`` come from the enclosing scope.
    A first window is emitted immediately. Every value from
    ``window_boundaries`` completes the current window and emits a fresh
    one. Values from ``source`` go into whichever window is current.

    Args:
        observer: Receiver of the window observables.

    Returns:
        The ``RefCountDisposable`` wrapping both upstream subscriptions.
    """
    window = Subject()
    d = CompositeDisposable()
    r = RefCountDisposable(d)

    observer.on_next(add_ref(window, r))

    def on_next_window(x):
        window.on_next(x)

    def on_error(err):
        window.on_error(err)
        observer.on_error(err)

    def on_completed():
        window.on_completed()
        observer.on_completed()

    d.add(source.subscribe(on_next_window, on_error, on_completed))

    def on_next_observer(w):
        # ``window`` is rebound below. Without ``nonlocal`` the name would
        # be local to this function and the first line would raise
        # UnboundLocalError.
        nonlocal window
        window.on_completed()
        window = Subject()
        observer.on_next(add_ref(window, r))

    # The original passed the misspelled name ``on_copleted`` here, which
    # raised NameError on every subscription.
    d.add(window_boundaries.subscribe(on_next_observer, on_error, on_completed))
    return r
开发者ID:Reactive-Extensions,项目名称:RxPy,代码行数:27,代码来源:observable_coincidence.py
示例8: test_groupdisposable_clear
def test_groupdisposable_clear():
    """Check that ``clear`` disposes the members and empties the group.

    Also checks that the group accepts a new member afterwards without
    disposing it.
    """
    flags = [False, False, False]

    def flag_setter(position):
        def action():
            flags[position] = True
        return action

    d1 = Disposable(flag_setter(0))
    d2 = Disposable(flag_setter(1))
    group = CompositeDisposable(d1, d2)
    assert group.length == 2

    group.clear()
    assert flags[0]
    assert flags[1]
    assert not group.length

    d3 = Disposable(flag_setter(2))
    group.add(d3)
    assert not flags[2]
    assert group.length == 1
开发者ID:Reactive-Extensions,项目名称:RxPy,代码行数:29,代码来源:test_disposable.py
示例9: _subscribe_core
def _subscribe_core(self, observer):
    """Record a subscription and schedule every stored message for it.

    Args:
        observer: Target that each stored notification is applied to via
            ``notification.accept``.

    Returns:
        A disposable that, when disposed, stamps the end time on the
        recorded subscription and disposes the scheduled messages.
    """
    subscribed_at = self.scheduler.to_relative(self.scheduler.now)
    self.subscriptions.append(Subscription(subscribed_at))
    index = len(self.subscriptions) - 1
    scheduled = CompositeDisposable()

    def make_action(notification):
        # A factory is used so each action captures its own notification
        # rather than the loop variable.
        def action(scheduler, state):
            notification.accept(observer)
            return Disposable.empty()
        return action

    for message in self.messages:
        action = make_action(message.value)
        scheduled.add(self.scheduler.schedule_relative(message.time, action))

    def dispose():
        # Replace the open-ended record with one that carries an end time.
        started = self.subscriptions[index].subscribe
        ended = self.scheduler.to_relative(self.scheduler.now)
        self.subscriptions[index] = Subscription(started, ended)
        scheduled.dispose()

    return Disposable.create(dispose)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:26,代码来源:coldobservable.py
示例10: subscribe
def subscribe(observer):
    """Split ``source`` into windows closed by a timer or an item count.

    ``source``, ``timespan``, ``count`` and ``scheduler`` come from the
    enclosing scope. A window is closed and a new one opened when the
    timer for the current window fires, or when the current window has
    received ``count`` items. Each new window restarts the timer.

    Args:
        observer: Receiver of the window observables.

    Returns:
        The ``RefCountDisposable`` wrapping the timer and the source
        subscription.
    """
    # One-element lists so the nested callbacks can update the state.
    item_count = [0]
    current = [None]
    window_id = [0]
    timer_d = SerialDisposable()
    group_disposable = CompositeDisposable(timer_d)
    ref_count_disposable = RefCountDisposable(group_disposable)

    def open_window():
        current[0] = Subject()
        observer.on_next(add_ref(current[0], ref_count_disposable))

    def roll_window():
        # Close the current window, open the next, and return its id.
        item_count[0] = 0
        window_id[0] += 1
        next_id = window_id[0]
        current[0].on_completed()
        open_window()
        return next_id

    def create_timer(_id):
        m = SingleAssignmentDisposable()
        timer_d.disposable = m

        def action(scheduler, state):
            # Ignore a timer that belongs to an already-closed window.
            if _id == window_id[0]:
                create_timer(roll_window())

        m.disposable = scheduler.schedule_relative(timespan, action)

    open_window()
    create_timer(0)

    def on_next(x):
        current[0].on_next(x)
        item_count[0] += 1
        if item_count[0] == count:
            create_timer(roll_window())

    def on_error(e):
        current[0].on_error(e)
        observer.on_error(e)

    def on_completed():
        current[0].on_completed()
        observer.on_completed()

    group_disposable.add(source.subscribe(on_next, on_error, on_completed))
    return ref_count_disposable
开发者ID:mvschaik,项目名称:RxPY,代码行数:58,代码来源:windowwithtimeorcount.py
示例11: test_groupdisposable_contains
def test_groupdisposable_contains():
    """Check that a group reports the disposables it was built with."""
    members = [Disposable.empty(), Disposable.empty()]
    group = CompositeDisposable(*members)

    assert group.length == 2
    for member in members:
        assert group.contains(member)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:9,代码来源:test_disposable.py
示例12: _subscribe
def _subscribe(self, observer):
    """Schedule emission of ``self.head`` followed by the merged tail.

    Args:
        observer: Receiver of the head value and of everything produced
            by ``self.tail.merge_observable()``.

    Returns:
        A ``CompositeDisposable`` holding the scheduled action and, once
        the action has run, the tail subscription.
    """
    group = CompositeDisposable()

    def emit_head_then_tail(scheduler, state):
        observer.on_next(self.head)
        tail_subscription = self.tail.merge_observable().subscribe(observer)
        group.add(tail_subscription)

    scheduled = current_thread_scheduler.schedule(emit_head_then_tail)
    group.add(scheduled)
    return group
开发者ID:jesonjn,项目名称:RxPY,代码行数:9,代码来源:manyselect.py
示例13: test_groupdisposable_remove
def test_groupdisposable_remove():
    """Check that ``remove`` disposes a member and drops it from the group.

    Removing a disposable that is not a member must return a falsy value
    and must not dispose it.
    """
    fired = {}

    def tracked(name):
        fired[name] = False

        def action():
            fired[name] = True
        return Disposable.create(action)

    d1 = tracked("d1")
    d2 = tracked("d2")
    group = CompositeDisposable(d1, d2)
    assert group.length == 2
    assert group.contains(d1)
    assert group.contains(d2)

    assert group.remove(d1)
    assert group.length == 1
    assert not group.contains(d1)
    assert group.contains(d2)
    assert fired["d1"]

    assert group.remove(d2)
    assert not group.contains(d1)
    assert not group.contains(d2)
    assert fired["d2"]

    d3 = tracked("d3")
    assert not group.remove(d3)
    assert not fired["d3"]
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:34,代码来源:test_disposable.py
示例14: create_observer
def create_observer(observer):
    """Pipe each observable into its same-named subject.

    ``observables``, ``subjects`` and ``log_error`` come from the
    enclosing scope. Subjects that are already disposed are skipped. The
    composite subscription is emitted to ``observer`` once all pipes are
    set up.

    Args:
        observer: Receiver of the composite subscription.

    Returns:
        A callable that disposes the composite subscription and then every
        subject.
    """
    subscription = CompositeDisposable()
    for name, observable in observables.items():
        subject = subjects[name]
        if not subject.is_disposed:
            subscription.add(observable.subscribe(observer=subject, on_error=log_error))
    observer.on_next(subscription)

    def dispose():
        subscription.dispose()
        # The original guarded this with ``hasattr(subjects, x)``, which
        # tests attributes rather than keys. For a plain mapping that is
        # false, so the subjects were never disposed. Iterating the
        # container already yields only its own keys.
        for name in subjects:
            subjects[name].dispose()

    return dispose
开发者ID:TylorS,项目名称:cyclepy,代码行数:14,代码来源:cycle.py
示例15: subscribe
def subscribe(observer):
    """Zip the values of ``sources`` into lists, one list per round.

    ``sources`` comes from the enclosing scope. Each source's values are
    queued. Whenever every queue holds at least one value, the head of
    each queue is popped and the resulting list is emitted.

    Args:
        observer: Receiver of the zipped lists, errors and completion.

    Returns:
        A ``CompositeDisposable`` holding every source subscription plus
        an action that empties the queues on disposal.
    """
    n = len(sources)
    queues = [[] for _ in range(n)]
    is_done = [False] * n

    def emit_if_ready(i):
        if all(queues):
            observer.on_next([queue.pop(0) for queue in queues])
        elif all(finished for j, finished in enumerate(is_done) if j != i):
            # Some queue is empty and every other source has finished.
            observer.on_completed()

    def done(i):
        is_done[i] = True
        if all(is_done):
            observer.on_completed()

    subscriptions = [None] * n

    def subscribe_source(i):
        # A function per index so each callback captures its own ``i``.
        subscriptions[i] = SingleAssignmentDisposable()

        def on_next(x):
            queues[i].append(x)
            emit_if_ready(i)

        subscriptions[i].disposable = sources[i].subscribe(
            on_next, observer.on_error, lambda: done(i))

    for idx in range(n):
        subscribe_source(idx)

    composite_disposable = CompositeDisposable(subscriptions)

    def clear_queues():
        # The original assigned ``queues[n]``, which is always out of
        # range and raised IndexError on disposal.
        for i in range(n):
            queues[i] = []

    composite_disposable.add(Disposable.create(clear_queues))
    return composite_disposable
开发者ID:Huskyeder,项目名称:RxPY,代码行数:41,代码来源:ziparray.py
示例16: init
def init(self):
    """Log startup and subscribe to every exchange command stream.

    The subscriptions are created in the order listed and stored together
    in ``self._subscription`` as one ``CompositeDisposable``.
    """
    logger.info('Starting %s', self)
    command_subscriptions = (
        self._subscribe_for_get_server_time_command(),
        self._subscribe_for_get_price_command(),
        self._subscribe_for_get_balance_command(),
        self._subscribe_for_get_active_orders_command(),
        self._subscribe_for_get_completed_orders_command(),
        self._subscribe_for_create_sell_order_command(),
        self._subscribe_for_create_buy_order_command(),
        self._subscribe_for_cancel_order_command(),
    )
    self._subscription = CompositeDisposable(*command_subscriptions)
开发者ID:micdm,项目名称:btce-trader,代码行数:12,代码来源:exchange.py
示例17: test_groupdisposable_addafterdispose
def test_groupdisposable_addafterdispose():
    """Check that adding to a disposed group disposes the new member.

    The group must stay empty after such an add.
    """
    disposed = []
    d1 = Disposable.create(lambda: disposed.append("d1"))
    d2 = Disposable.create(lambda: disposed.append("d2"))

    group = CompositeDisposable(d1)
    assert group.length == 1

    group.dispose()
    assert "d1" in disposed
    assert group.length == 0

    group.add(d2)
    assert "d2" in disposed
    assert group.length == 0
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:22,代码来源:test_disposable.py
示例18: subscribe
def subscribe(observer):
    """Merge inner sources with at most ``max_concurrent`` active at once.

    ``sources`` and ``max_concurrent`` come from the enclosing scope.
    Inner sources that arrive while the limit is reached are queued and
    started in arrival order as running ones complete.

    Args:
        observer: Receiver of the merged values, errors and completion.

    Returns:
        The ``CompositeDisposable`` holding the outer subscription and the
        inner subscriptions that are still registered.
    """
    group = CompositeDisposable()
    # One-element lists so the nested callbacks can update the state.
    active_count = [0]
    is_stopped = [False]
    pending = []

    def subscribe_inner(xs):
        subscription = SingleAssignmentDisposable()
        group.add(subscription)

        def on_inner_completed():
            group.remove(subscription)
            if len(pending):
                # Hand the freed slot to the next queued source, so the
                # active count does not change.
                subscribe_inner(pending.pop(0))
                return
            active_count[0] -= 1
            if is_stopped[0] and active_count[0] == 0:
                observer.on_completed()

        subscription.disposable = xs.subscribe(
            observer.on_next, observer.on_error, on_inner_completed)

    def on_next(inner_source):
        if active_count[0] < max_concurrent:
            active_count[0] += 1
            subscribe_inner(inner_source)
            return
        pending.append(inner_source)

    def on_outer_completed():
        is_stopped[0] = True
        if active_count[0] == 0:
            observer.on_completed()

    outer_subscription = sources.subscribe(
        on_next, observer.on_error, on_outer_completed)
    group.add(outer_subscription)
    return group
开发者ID:jesonjn,项目名称:RxPY,代码行数:39,代码来源:merge.py
示例19: init
def init(self):
    """Log startup and create every polling and reaction subscription.

    The subscriptions are created in the order listed and stored together
    in ``self._subscription`` as one ``CompositeDisposable``.
    """
    logger.info('Starting %s', self)
    handles = [
        self._subscribe_for_poll_server_time(),
        self._subscribe_for_poll_price(),
        self._subscribe_for_poll_balance(),
        self._subscribe_for_poll_active_orders(),
        self._subscribe_for_poll_completed_orders(),
        self._subscribe_for_time_and_price(),
        self._subscribe_for_balance(),
        self._subscribe_for_active_orders(),
        self._subscribe_for_completed_orders(),
        self._subscribe_for_jumping_price(),
    ]
    self._subscription = CompositeDisposable(*handles)
开发者ID:micdm,项目名称:btce-trader,代码行数:14,代码来源:trader.py
示例20: test_groupdisposable_addafterdispose
def test_groupdisposable_addafterdispose():
    """Check that adding to a disposed group disposes the new member.

    The group must stay empty after such an add.
    """
    disposed = set()

    def action1():
        disposed.add("d1")

    def action2():
        disposed.add("d2")

    d1 = Disposable(action1)
    d2 = Disposable(action2)

    group = CompositeDisposable(d1)
    assert group.length == 1

    group.dispose()
    assert "d1" in disposed
    assert group.length == 0

    group.add(d2)
    assert "d2" in disposed
    assert group.length == 0
开发者ID:Reactive-Extensions,项目名称:RxPy,代码行数:24,代码来源:test_disposable.py
注：本文中的rx.disposables.CompositeDisposable类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论