本文整理汇总了Python中rx.core.Observable类的典型用法代码示例。如果您正苦于以下问题：Python Observable类的具体用法？Python Observable怎么用？Python Observable使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Observable类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_repeat_observable_repeat_count_throws
def test_repeat_observable_repeat_count_throws(self):
    """Verify that exceptions raised by callbacks of a repeated sequence propagate.

    Four scenarios are exercised: a raising value callback, a raising
    error callback, a raising completion callback on a subscription that
    is disposed early, and a raising subscribe function.
    """
    # Scenario 1: the value callback raises while the scheduler runs.
    value_scheduler = TestScheduler()
    repeated_value = Observable.return_value(1, value_scheduler).repeat(3)
    repeated_value.subscribe(lambda x: _raise('ex'))
    self.assertRaises(RxException, value_scheduler.start)

    # Scenario 2: the error callback itself raises.
    error_scheduler = TestScheduler()
    repeated_error = Observable.throw_exception('ex1', error_scheduler).repeat(3)
    repeated_error.subscribe(on_error=lambda ex: _raise('ex2'))
    self.assertRaises(RxException, error_scheduler.start)

    # Scenario 3: the subscription is disposed at virtual time 10; no
    # exception is expected to escape from start() here.
    dispose_scheduler = TestScheduler()
    long_repeat = Observable.return_value(1, dispose_scheduler).repeat(100)
    subscription = long_repeat.subscribe(on_completed=lambda: _raise('ex3'))
    dispose_scheduler.schedule_absolute(10, lambda sc, st: subscription.dispose())
    dispose_scheduler.start()

    # Scenario 4: the subscribe function raises as soon as subscribe() is called.
    failing_subscribe = Observable.create(lambda o: _raise('ex4')).repeat(3)
    self.assertRaises(RxException, failing_subscribe.subscribe)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:25,代码来源:test_repeat.py
示例2: to_future
def to_future(source: Observable) -> Future:
    """Feed the outcome of an observable sequence into a future.

    The ``future`` object is not created here; it is captured from the
    enclosing scope.

    Example:
        future = rx.return_value(42).pipe(ops.to_future(asyncio.Future))

    Args:
        source: Observable sequence to subscribe to.

    Returns:
        The captured future. It receives the last value emitted by the
        source once the source completes, or the error if the source
        fails. If the source completes without emitting any value the
        future is left unresolved.
    """
    has_value = False
    last_value = None

    def on_next(value):
        nonlocal has_value, last_value
        # Only the most recent value is ever delivered, so keep just that
        # one instead of accumulating every emission in a list.
        has_value = True
        last_value = value

    def on_error(err):
        future.set_exception(err)

    def on_completed():
        if has_value:
            future.set_result(last_value)

    source.subscribe_(on_next, on_error, on_completed)

    # No cancellation can be done
    return future
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:29,代码来源:tofuture.py
示例3: subscribe
def subscribe(observer):
    """Invoke the factory and subscribe ``observer`` to whatever it yields.

    ``observable_factory`` comes from the enclosing scope. If calling it
    raises, the exception is delivered to ``observer`` through an
    observable built with ``Observable.throw_exception``; otherwise the
    factory result is passed through ``Observable.from_future`` before
    subscribing.

    Returns the value returned by the underlying ``subscribe`` call.
    """
    try:
        produced = observable_factory()
    except Exception as ex:
        # Deliberately broad: any factory failure is forwarded to the observer.
        failure = Observable.throw_exception(ex)
        return failure.subscribe(observer)
    else:
        return Observable.from_future(produced).subscribe(observer)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:8,代码来源:defer.py
示例4: mode
def mode(source: Observable) -> Observable:
    """Emit the value(s) that occur most often in ``source``.

    When several values share the highest occurrence count, each of them
    is emitted. The sequence must be finite.
    """
    def tally(grp):
        # Pair every group's key with the number of items in that group.
        return grp.count().map(lambda ct: (grp.key, ct))

    def leaders(pairs):
        # ``pairs`` is sorted by count, highest first; keep entries while
        # their count equals the count of the first entry.
        return Observable.from_(pairs).take_while(lambda pair: pair[1] == pairs[0][1])

    counted = source.group_by(lambda v: v).flat_map(tally)
    ranked = counted.to_sorted_list(lambda pair: pair[1], reverse=True)
    return ranked.flat_map(leaders).map(lambda pair: pair[0])
开发者ID:ReactiveX,项目名称:RxPY,代码行数:10,代码来源:statistics.py
示例5: test_zip_never_never
def test_zip_never_never(self):
    """Zipping two never-emitting observables must record no messages."""
    scheduler = TestScheduler()
    left = Observable.never()
    right = Observable.never()

    def add(x, y):
        return x + y

    # The zipped observable is built lazily by the factory passed to start().
    results = scheduler.start(lambda: left.zip(right, add))
    results.messages.assert_equal()
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:10,代码来源:test_zip.py
示例6: create
def create():
    """Build a ``while_do`` observable that repeats a one-item source.

    The counter ``n`` is a one-element list taken from the enclosing
    scope; the loop condition increments it on every check and holds
    while it stays below 1000.
    """
    import sys
    sys.setrecursionlimit(1000)

    def keep_going(x):
        n[0] += 1
        return n[0] < 1000

    def emit_once(observer):
        # Emit a single value, complete, and hand back a no-op disposer.
        observer.on_next(1)
        observer.on_completed()
        return lambda: None

    body = Observable.create(emit_once)
    return Observable.while_do(keep_going, body)
开发者ID:riccardomarotti,项目名称:RxPY,代码行数:11,代码来源:test_while.py
示例7: test_for_each_index_some_data
def test_for_each_index_some_data(self):
    """Blocking ``for_each`` must hand each value and its index to the action.

    ``Observable.range(10, 10)`` is expected to yield 10..19 with
    indices 0..9.
    """
    seen_values = []
    seen_indices = []

    def action(x, i):
        seen_values.append(x)
        seen_indices.append(i)

    blocking = Observable.range(10, 10).to_blocking()
    blocking.for_each(action)

    assert seen_values == list(range(10, 20))
    assert seen_indices == list(range(10))
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:11,代码来源:test_foreach.py
示例8: test_for_each_index_return
def test_for_each_index_return(self):
    """Blocking ``for_each`` over a single-value observable sees (42, 0)."""
    seen_values = []
    seen_indices = []

    def action(x, i):
        seen_values.append(x)
        seen_indices.append(i)

    blocking = Observable.return_value(42).to_blocking()
    blocking.for_each(action)

    assert seen_values == [42]
    assert seen_indices == [0]
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:11,代码来源:test_foreach.py
示例9: variance
def variance(source: ObservableBase) -> ObservableBase:
    """Compute the statistical variance of the numerical emissions.

    The result is the sum of squared deviations from the average divided
    by ``count - 1``. The sequence must be finite.
    """
    def deviations(values):
        # Average the collected values, then emit each value's distance
        # from that average.
        def centre(avg):
            return Observable.from_(values).map(lambda item: item - avg)

        return Observable.from_(values).average().flat_map(centre)

    # Published and shared between the two consumers below (sum and count);
    # NOTE(review): auto_connect(2) presumably connects once both have
    # subscribed -- confirm against the library.
    squared_values = (
        source.to_list()
        .flat_map(deviations)
        .map(lambda dev: dev * dev)
        .publish()
        .auto_connect(2)
    )

    return Observable.zip(
        squared_values.sum(),
        squared_values.count(),
        lambda total, ct: total / (ct - 1),
    )
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:12,代码来源:statistics.py
示例10: test_when_never_never
def test_when_never_never(self):
    """A ``when`` plan joining two never-emitting observables records nothing."""
    scheduler = TestScheduler()
    left = Observable.never()
    right = Observable.never()

    def add(x, y):
        return x + y

    def create():
        # The join pattern is assembled only when the scheduler invokes the factory.
        plan = left.and_(right).then_do(add)
        return Observable.when(plan)

    results = scheduler.start(create)
    results.messages.assert_equal()
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:12,代码来源:test_when.py
示例11: test_return_observer_throws
def test_return_observer_throws(self):
    """Exceptions raised by observer callbacks must escape ``scheduler.start``."""
    # The first callback raises.
    first_scheduler = TestScheduler()
    first = Observable.return_value(1, first_scheduler)
    first.subscribe(lambda x: _raise('ex'))
    with self.assertRaises(RxException):
        first_scheduler.start()

    # Only the third callback raises; the first two are pass-throughs.
    second_scheduler = TestScheduler()
    second = Observable.return_value(1, second_scheduler)
    second.subscribe(lambda x: x, lambda ex: ex, lambda: _raise('ex'))
    with self.assertRaises(RxException):
        second_scheduler.start()
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:12,代码来源:test_returnvalue.py
示例12: do_while
def do_while(self, condition):
    """Emulate a do-while loop over this observable.

    The source is concatenated with ``Observable.while_do(condition,
    self)``, so it appears once ahead of the condition-controlled
    repetition.

    Keyword arguments:
    condition -- {Function} The condition which determines if the source
        will be repeated.

    Returns an observable {Observable} sequence which is repeated as long
    as the condition holds.
    """
    repeated_tail = Observable.while_do(condition, self)
    return Observable.concat([self, repeated_tail])
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:12,代码来源:dowhile.py
示例13: repeat
def repeat(self, repeat_count=None):
    """Repeat this observable sequence a given number of times.

    When no repeat count is supplied the sequence repeats indefinitely.

    1 - repeated = source.repeat()
    2 - repeated = source.repeat(42)

    Keyword arguments:
    repeat_count -- Number of times to repeat the sequence. If not
        provided, repeats the sequence indefinitely.

    Returns the observable sequence producing the elements of the given
    sequence repeatedly."""
    def build():
        # Evaluated by Observable.defer rather than when repeat() is called.
        copies = Enumerable.repeat(self, repeat_count)
        return Observable.concat(copies)

    return Observable.defer(build)
开发者ID:cobain,项目名称:ipython,代码行数:15,代码来源:repeat.py
示例14: test_many_select_law_1
def test_many_select_law_1(self):
    """``many_select`` with ``first`` as selector should equal the source."""
    source = Observable.range(1, 0)
    extended = source.many_select(lambda inner: inner.first())
    comparison = extended.sequence_equal(source)
    comparison.first().subscribe(self.assertTrue)
开发者ID:JohnWowUs,项目名称:RxPY,代码行数:7,代码来源:test_manyselect.py
示例15: partition
def partition(source: Observable) -> List[Observable]:
    """The partially applied `partition` operator.

    Splits the source into two observables using ``predicate``, which is
    taken from the enclosing scope. The source is shared between both
    branches via ``publish`` and ``ref_count``.

    Args:
        source: Source observable to partition.

    Returns:
        A list of two observables. The first carries the values for
        which the predicate returns True, the second carries the values
        for which it returns False.
    """
    shared = source.pipe(ops.publish(), ops.ref_count())

    def rejected(x):
        return not predicate(x)

    accepted_branch = shared.pipe(ops.filter(predicate))
    rejected_branch = shared.pipe(ops.filter(rejected))
    return [accepted_branch, rejected_branch]
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:29,代码来源:partition.py
示例16: create
def create():
    """Return an observable that emits 1 and 2 and never signals completion."""
    def emit_two(observer):
        for value in (1, 2):
            observer.on_next(value)
        # Disposal is a no-op.
        return lambda: None

    return Observable.create(emit_two)
开发者ID:JohnWowUs,项目名称:RxPY,代码行数:7,代码来源:test_create.py
示例17: go
async def go():
nonlocal result
source = Observable.throw(error)
try:
result = await source
except Exception as ex:
result = ex
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:7,代码来源:py3_tofuture.py
示例18: on_next
def on_next(inner_source: Observable):
nonlocal source
d = SingleAssignmentDisposable()
with source.lock:
latest[0] += 1
_id = latest[0]
has_latest[0] = True
inner_subscription.disposable = d
# Check if Future or Observable
inner_source = from_future(inner_source) if is_future(inner_source) else inner_source
def on_next(x: Any) -> None:
if latest[0] == _id:
observer.on_next(x)
def on_error(e: Exception) -> None:
if latest[0] == _id:
observer.on_error(e)
def on_completed() -> None:
if latest[0] == _id:
has_latest[0] = False
if is_stopped[0]:
observer.on_completed()
d.disposable = inner_source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:28,代码来源:switchlatest.py
示例19: time_interval
def time_interval(self, scheduler=None):
    """Records the time interval between consecutive values in an
    observable sequence.

    1 - res = source.time_interval()
    2 - res = source.time_interval(Scheduler.timeout)

    Keyword arguments:
    scheduler -- [Optional] Scheduler used to compute time intervals. If
        not specified (or falsy), the timeout scheduler is used.

    Returns an observable sequence of TimeInterval items, each holding
    the value and the span since the previous reading of the scheduler's
    clock.
    """
    source = self
    # The parameter defaults to None so the documented no-argument call
    # works; the fallback below then selects the timeout scheduler.
    scheduler = scheduler or timeout_scheduler

    def defer():
        # One-element list so the nested selector can update the last
        # clock reading; the first reading is taken when defer() runs.
        last = [scheduler.now]

        def selector(x):
            now = scheduler.now
            span = now - last[0]
            last[0] = now
            return TimeInterval(value=x, interval=span)

        return source.map(selector)

    return Observable.defer(defer)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:28,代码来源:timeinterval.py
示例20: last
def last(source: Observable) -> Observable:
    """Partially applied last operator.

    ``predicate`` is taken from the enclosing scope. When it is truthy
    the source is filtered by it before the last element is taken;
    otherwise the call is delegated to
    ``last_or_default_async(source, False)``.

    Examples:
        >>> res = last(source)

    Args:
        source: Source observable to get last item from.

    Returns:
        An observable sequence containing the last element of the source
        that satisfies the predicate, or simply the last element when no
        predicate is set.
    """
    if not predicate:
        return last_or_default_async(source, False)

    matching_only = operators.filter(predicate)
    return source.pipe(matching_only, operators.last())
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:26,代码来源:last.py
注：本文中的rx.core.Observable类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论