本文整理汇总了Python中rx.disposable.CompositeDisposable类的典型用法代码示例。如果您正苦于以下问题:Python CompositeDisposable类的具体用法?Python CompositeDisposable怎么用?Python CompositeDisposable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了CompositeDisposable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: subscribe
def subscribe(observer, scheduler=None):
group = CompositeDisposable()
is_stopped = [False]
m = SingleAssignmentDisposable()
group.add(m)
def on_next(inner_source):
inner_subscription = SingleAssignmentDisposable()
group.add(inner_subscription)
inner_source = from_future(inner_source) if is_future(inner_source) else inner_source
@synchronized(source.lock)
def on_completed():
group.remove(inner_subscription)
if is_stopped[0] and len(group) == 1:
observer.on_completed()
on_next = synchronized(source.lock)(observer.on_next)
on_error = synchronized(source.lock)(observer.on_error)
subscription = inner_source.subscribe_(on_next, on_error, on_completed, scheduler)
inner_subscription.disposable = subscription
def on_completed():
is_stopped[0] = True
if len(group) == 1:
observer.on_completed()
m.disposable = source.subscribe_(on_next, observer.on_error, on_completed, scheduler)
return group
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:30,代码来源:merge.py
示例2: _subscribe_core
def _subscribe_core(self, observer=None, scheduler=None) -> typing.Disposable:
clock = self.scheduler.to_seconds(self.scheduler.now)
self.subscriptions.append(Subscription(clock))
index = len(self.subscriptions) - 1
disp = CompositeDisposable()
def get_action(notification):
def action(scheduler, state):
notification.accept(observer)
return Disposable()
return action
for message in self.messages:
notification = message.value
# Don't make closures within a loop
action = get_action(notification)
disp.add(self.scheduler.schedule_relative(message.time, action))
def dispose() -> None:
start = self.subscriptions[index].subscribe
end = self.scheduler.to_seconds(self.scheduler.now)
self.subscriptions[index] = Subscription(start, end)
disp.dispose()
return Disposable(dispose)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:26,代码来源:coldobservable.py
示例3: test_groupdisposable_clear
def test_groupdisposable_clear():
disp1 = [False]
disp2 = [False]
def action1():
disp1[0] = True
d1 = Disposable(action1)
def action2():
disp2[0] = True
d2 = Disposable(action2)
g = CompositeDisposable(d1, d2)
assert g.length == 2
g.clear()
assert disp1[0]
assert disp2[0]
assert not g.length
disp3 = [False]
def action3():
disp3[0] = True
d3 = Disposable(action3)
g.add(d3);
assert not disp3[0]
assert g.length == 1
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:26,代码来源:test_disposable.py
示例4: subscribe
def subscribe(observer, scheduler=None):
is_open = [False]
def on_next(left):
if is_open[0]:
observer.on_next(left)
def on_completed():
if is_open[0]:
observer.on_completed()
subs = source.subscribe_(on_next, observer.on_error, on_completed, scheduler)
subscriptions = CompositeDisposable(subs)
right_subscription = SingleAssignmentDisposable()
subscriptions.add(right_subscription)
def on_next2(x):
is_open[0] = True
right_subscription.dispose()
def on_completed2():
right_subscription.dispose()
right_subscription.disposable = other.subscribe_(on_next2, observer.on_error, on_completed2, scheduler)
return subscriptions
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:27,代码来源:skipuntil.py
示例5: subscribe
def subscribe(observer, scheduler=None):
window = [Subject()]
d = CompositeDisposable()
r = RefCountDisposable(d)
observer.on_next(add_ref(window[0], r))
def on_next_window(x):
window[0].on_next(x)
def on_error(err):
window[0].on_error(err)
observer.on_error(err)
def on_completed():
window[0].on_completed()
observer.on_completed()
d.add(source.subscribe_(on_next_window, on_error, on_completed, scheduler))
def on_next_observer(w):
window[0].on_completed()
window[0] = Subject()
observer.on_next(add_ref(window[0], r))
d.add(window_boundaries.subscribe_(on_next_observer, on_error, on_completed, scheduler))
return r
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:27,代码来源:window.py
示例6: subscribe
def subscribe(observer, scheduler=None):
was_invoked = [False]
def on_completed():
observer.on_completed()
try:
if not was_invoked[0]:
finally_action()
was_invoked[0] = True
except Exception as err: # pylint: disable=broad-except
observer.on_error(err)
def on_error(exception):
observer.on_error(exception)
try:
if not was_invoked[0]:
finally_action()
was_invoked[0] = True
except Exception as err: # pylint: disable=broad-except
observer.on_error(err)
composite_disposable = CompositeDisposable()
composite_disposable.add(OnDispose(was_invoked))
subscription = source.subscribe_(observer.on_next, on_error, on_completed, scheduler)
composite_disposable.add(subscription)
return composite_disposable
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:28,代码来源:do.py
示例7: run
def run(self):
srcs = list(self.parent.sources)
N = len(srcs)
self.queues = [None] * N
self.isDone = [False] * N
self.subscriptions = [None] * N
self.gate = RLock()
for i in range(0, N):
self.queues[i] = deque()
# Loop twice because subscribing could already yield
# a value before all queues are initialized
for i in range(0, N):
d = SingleAssignmentDisposable()
self.subscriptions[i] = d
o = self.O(self, i)
d.disposable = srcs[i].subscribeSafe(o)
c = CompositeDisposable(self.subscriptions)
def dispose():
for q in self.queues:
q.clear()
c.add(Disposable.create(dispose))
return c
开发者ID:aguil,项目名称:RxPython,代码行数:31,代码来源:zip.py
示例8: subscribe
def subscribe(observer, scheduler_=None):
_scheduler = scheduler or scheduler_ or timeout_scheduler
n = [0]
s = [None]
timer_d = SerialDisposable()
window_id = [0]
group_disposable = CompositeDisposable(timer_d)
ref_count_disposable = RefCountDisposable(group_disposable)
def create_timer(_id):
m = SingleAssignmentDisposable()
timer_d.disposable = m
def action(scheduler, state):
if _id != window_id[0]:
return
n[0] = 0
window_id[0] += 1
new_id = window_id[0]
s[0].on_completed()
s[0] = Subject()
observer.on_next(add_ref(s[0], ref_count_disposable))
create_timer(new_id)
m.disposable = _scheduler.schedule_relative(timespan, action)
s[0] = Subject()
observer.on_next(add_ref(s[0], ref_count_disposable))
create_timer(0)
def on_next(x):
new_window = False
new_id = 0
s[0].on_next(x)
n[0] += 1
if n[0] == count:
new_window = True
n[0] = 0
window_id[0] += 1
new_id = window_id[0]
s[0].on_completed()
s[0] = Subject()
observer.on_next(add_ref(s[0], ref_count_disposable))
if new_window:
create_timer(new_id)
def on_error(e):
s[0].on_error(e)
observer.on_error(e)
def on_completed():
s[0].on_completed()
observer.on_completed()
group_disposable.add(source.subscribe_(on_next, on_error, on_completed, scheduler_))
return ref_count_disposable
开发者ID:ReactiveX,项目名称:RxPY,代码行数:60,代码来源:windowwithtimeorcount.py
示例9: test_groupdisposable_contains
def test_groupdisposable_contains():
d1 = Disposable()
d2 = Disposable()
g = CompositeDisposable(d1, d2)
assert g.length == 2
assert g.contains(d1)
assert g.contains(d2)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:9,代码来源:test_disposable.py
示例10: run
def run(self):
self.gate = RLock()
groupDisposable = CompositeDisposable()
self.refCountDisposable = RefCountDisposable(groupDisposable)
self.createWindow()
groupDisposable.add(self.parent.scheduler.schedulePeriodic(self.parent.timeSpan, self.tick))
groupDisposable.add(self.parent.source.subscribeSafe(self))
return self.refCountDisposable
开发者ID:aguil,项目名称:RxPython,代码行数:12,代码来源:window.py
示例11: test_groupdisposable_remove
def test_groupdisposable_remove():
disp1 = [False]
disp2 = [False]
def action1():
disp1[0] = True
d1 = Disposable(action1)
def action2():
disp2[0] = True
d2 = Disposable(action2)
g = CompositeDisposable(d1, d2)
assert g.length == 2
assert g.contains(d1)
assert g.contains(d2)
assert g.remove(d1)
assert g.length == 1
assert not g.contains(d1)
assert g.contains(d2)
assert disp1[0]
assert g.remove(d2)
assert not g.contains(d1)
assert not g.contains(d2)
assert disp2[0]
disp3 = [False]
def action3():
disp3[0] = True
d3 = Disposable(action3)
assert not g.remove(d3)
assert not disp3[0]
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:34,代码来源:test_disposable.py
示例12: RecursiveScheduledFunction
class RecursiveScheduledFunction(object):
def __init__(self, action, scheduler, method = None):
self.action = action
self.group = CompositeDisposable()
self.lock = RLock()
if method == None:
self.schedule = scheduler.scheduleWithState
else:
self.schedule = getattr(scheduler, method)
def run(self, state):
self.action(state, self.actionCallback)
def actionCallback(self, newState, dueTime = None):
self.isDone = False
self.isAdded = False
if dueTime == None:
self.cancel = self.schedule(
newState,
self.schedulerCallback
)
else:
self.cancel = self.schedule(
newState,
dueTime,
self.schedulerCallback
)
with self.lock:
if not self.isDone:
self.group.add(self.cancel)
self.isAdded = True
def schedulerCallback(self, scheduler, state):
with self.lock:
if self.isAdded:
self.group.remove(self.cancel)
else:
self.isDone = True
self.run(state)
return Disposable.empty()
开发者ID:aguil,项目名称:RxPython,代码行数:45,代码来源:scheduler.py
示例13: run
def run(self, observer, cancel, setSink):
self.groupDisposable = CompositeDisposable()
self.refCountDisposable = RefCountDisposable(self.groupDisposable)
sink = self.Sink(self, observer, cancel)
setSink(sink)
self.groupDisposable.add(self.source.subscribeSafe(sink))
return self.refCountDisposable
开发者ID:aguil,项目名称:RxPython,代码行数:9,代码来源:groupByUntil.py
示例14: __init__
def __init__(self, action, scheduler, method = None):
self.action = action
self.group = CompositeDisposable()
self.lock = RLock()
if method == None:
self.schedule = scheduler.scheduleWithState
else:
self.schedule = getattr(scheduler, method)
开发者ID:aguil,项目名称:RxPython,代码行数:9,代码来源:scheduler.py
示例15: run
def run(self):
self.gate = RLock()
self.isStopped = False
self.group = CompositeDisposable()
self.sourceSubscription = SingleAssignmentDisposable()
self.group.add(self.sourceSubscription)
self.sourceSubscription.disposable = self.parent.sources.subscribeSafe(self)
return self.group
开发者ID:aguil,项目名称:RxPython,代码行数:10,代码来源:merge.py
示例16: run
def run(self):
self.delays = CompositeDisposable()
self.gate = RLock()
self.atEnd = False
self.subscription = SerialDisposable()
if self.parent.subscriptionDelay == None:
self.start()
else:
self.subscription.disposable = self.parent.subscriptionDelay.subscribeSafe(self.Sigma(self))
return CompositeDisposable(self.subscription, self.delays)
开发者ID:aguil,项目名称:RxPython,代码行数:12,代码来源:delay.py
示例17: test_groupdisposable_addafterdispose
def test_groupdisposable_addafterdispose():
disp1 = [False]
disp2 = [False]
def action1():
disp1[0] = True
d1 = Disposable(action1)
def action2():
disp2[0] = True
d2 = Disposable(action2)
g = CompositeDisposable(d1)
assert g.length == 1
g.dispose()
assert disp1[0]
assert g.length == 0
g.add(d2)
assert disp2[0]
assert g.length == 0
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:22,代码来源:test_disposable.py
示例18: subscribe
def subscribe(observer, scheduler=None):
has_current = [False]
is_stopped = [False]
m = SingleAssignmentDisposable()
g = CompositeDisposable()
g.add(m)
def on_next(inner_source):
if not has_current[0]:
has_current[0] = True
inner_source = rx.from_future(inner_source) if is_future(inner_source) else inner_source
inner_subscription = SingleAssignmentDisposable()
g.add(inner_subscription)
def on_completed_inner():
g.remove(inner_subscription)
has_current[0] = False
if is_stopped[0] and len(g) == 1:
observer.on_completed()
inner_subscription.disposable = inner_source.subscribe_(
observer.on_next,
observer.on_error,
on_completed_inner,
scheduler
)
def on_completed():
is_stopped[0] = True
if not has_current[0] and len(g) == 1:
observer.on_completed()
m.disposable = source.subscribe_(on_next, observer.on_error, on_completed, scheduler)
return g
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:37,代码来源:exclusive.py
示例19: run
def run(self):
self.gate = RLock()
self.group = CompositeDisposable()
self.refCount = RefCountDisposable(self.group)
leftSubscription = SingleAssignmentDisposable()
self.group.add(leftSubscription)
self.leftID = 0
self.leftMap = {}
rightSubscription = SingleAssignmentDisposable()
self.group.add(rightSubscription)
self.rightID = 0
self.rightMap = {}
leftSubscription.disposable = self.parent.left.subscribeSafe(self.Left(self, leftSubscription))
rightSubscription.disposable = self.parent.right.subscribeSafe(self.Right(self, rightSubscription))
return self.refCount
开发者ID:aguil,项目名称:RxPython,代码行数:19,代码来源:groupJoin.py
示例20: ConcurrentSink
class ConcurrentSink(rx.linq.sink.Sink):
def __init__(self, parent, observer, cancel):
super(Merge.ConcurrentSink, self).__init__(observer, cancel)
self.parent = parent
def run(self):
self.gate = RLock()
self.q = Queue()
self.isStopped = False
self.activeCount = 0
self.group = CompositeDisposable()
self.sourceSubscription = SingleAssignmentDisposable()
self.group.add(self.sourceSubscription)
self.sourceSubscription.disposable = self.parent.sources.subscribeSafe(self)
return self.group
def onNext(self, value):
with self.gate:
if self.activeCount < self.parent.maxConcurrency:
self.activeCount += 1
self.subscribe(value)
else:
self.q.put_nowait(value)
def onError(self, exception):
with self.gate:
self.observer.onError(exception)
self.dispose()
def onCompleted(self):
with self.gate:
self.isStopped = True
if self.activeCount == 0:
self.observer.onCompleted()
self.dispose()
else:
self.sourceSubscription.dispose()
def subscribe(self, innerSource):
subscription = SingleAssignmentDisposable()
self.group.add(subscription)
subscription.disposable = innerSource.subscribeSafe(self.LockObserver(self, subscription))
class LockObserver(Observer):
def __init__(self, parent, subscription):
self.parent = parent
self.subscription = subscription
def onNext(self, value):
with self.parent.gate:
self.parent.observer.onNext(value)
def onError(self, exception):
with self.parent.gate:
self.parent.observer.onError(exception)
self.parent.dispose()
def onCompleted(self):
self.parent.group.remove(self.subscription)
with self.parent.gate:
if self.parent.q.qsize() > 0:
s = self.q.get()
self.parent.subscribe(s)
else:
self.parent.activeCount -= 1
if self.parent.isStopped and self.parent.activeCount == 0:
self.parent.observer.onCompleted()
self.parent.dispose()
开发者ID:aguil,项目名称:RxPython,代码行数:73,代码来源:merge.py
注:本文中的rx.disposable.CompositeDisposable类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论