• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python core.Observable类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中rx.core.Observable的典型用法代码示例。如果您正苦于以下问题:Python Observable类的具体用法?Python Observable怎么用?Python Observable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Observable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_repeat_observable_repeat_count_throws

    def test_repeat_observable_repeat_count_throws(self):
        scheduler1 = TestScheduler()
        xs = Observable.return_value(1, scheduler1).repeat(3)
        xs.subscribe(lambda x: _raise('ex'))

        with self.assertRaises(RxException):
            scheduler1.start()

        scheduler2 = TestScheduler()
        ys = Observable.throw_exception('ex1', scheduler2).repeat(3)
        ys.subscribe(on_error=lambda ex: _raise('ex2'))

        with self.assertRaises(RxException):
            scheduler2.start()

        scheduler3 = TestScheduler()
        zs = Observable.return_value(1, scheduler3).repeat(100)
        d = zs.subscribe(on_completed=lambda: _raise('ex3'))

        scheduler3.schedule_absolute(10, lambda sc, st: d.dispose())
        scheduler3.start()

        xss = Observable.create(lambda o: _raise('ex4')).repeat(3)
        with self.assertRaises(RxException):
            xss.subscribe()
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:25,代码来源:test_repeat.py


示例2: to_future

    def to_future(source: Observable) -> Future:
        """Converts an existing observable sequence to a Future.

        Example:
            future = rx.return_value(42).pipe(ops.to_future(asyncio.Future))

        Args:
            future_ctor: [Optional] The constructor of the future.

        Returns:
            A future with the last value from the observable sequence.
        """

        has_value = []

        def on_next(value):
            has_value.append(value)

        def on_error(err):
            future.set_exception(err)

        def on_completed():
            if has_value:
                future.set_result(has_value.pop())

        source.subscribe_(on_next, on_error, on_completed)

        # No cancellation can be done
        return future
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:29,代码来源:tofuture.py


示例3: subscribe

    def subscribe(observer):
        try:
            result = observable_factory()
        except Exception as ex:
            return Observable.throw_exception(ex).subscribe(observer)

        result = Observable.from_future(result)
        return result.subscribe(observer)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:8,代码来源:defer.py


示例4: mode

def mode(source: Observable) -> Observable:
    """
    Returns the most frequently emitted value (or "values" if they have the same number of occurrences).
    The sequence must be finite.
    """
    return source.group_by(lambda v: v) \
        .flat_map(lambda grp: grp.count().map(lambda ct: (grp.key, ct))) \
        .to_sorted_list(lambda t: t[1], reverse=True) \
        .flat_map(lambda l: Observable.from_(l).take_while(lambda t: t[1] == l[0][1])) \
        .map(lambda t: t[0])
开发者ID:ReactiveX,项目名称:RxPY,代码行数:10,代码来源:statistics.py


示例5: test_zip_never_never

    def test_zip_never_never(self):
        scheduler = TestScheduler()
        o1 = Observable.never()
        o2 = Observable.never()

        def create():
            return o1.zip(o2, lambda x, y: x + y)

        results = scheduler.start(create)
        results.messages.assert_equal()
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:10,代码来源:test_zip.py


示例6: create

 def create():
     import sys
     sys.setrecursionlimit(1000)
     def predicate(x):
         n[0] += 1
         return n[0] < 1000
     def subscribe(o):
         o.on_next(1)
         o.on_completed()
         return lambda: None
     return Observable.while_do(predicate, Observable.create(subscribe))
开发者ID:riccardomarotti,项目名称:RxPY,代码行数:11,代码来源:test_while.py


示例7: test_for_each_index_some_data

    def test_for_each_index_some_data(self):
        lstX = []
        lstI = []

        def action(x, i):
            lstX.append(x)
            lstI.append(i)

        Observable.range(10, 10).to_blocking().for_each(action)
        assert(lstX == [x for x in range(10, 20)])
        assert(lstI == [x for x in range(10)])
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:11,代码来源:test_foreach.py


示例8: test_for_each_index_return

    def test_for_each_index_return(self):
        lstX = []
        lstI = []

        def action(x, i):
            lstX.append(x)
            lstI.append(i)

        Observable.return_value(42).to_blocking().for_each(action)
        assert(lstX == [42])
        assert(lstI == [0])
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:11,代码来源:test_foreach.py


示例9: variance

def variance(source: ObservableBase) -> ObservableBase:
    """
    Returns the statistical variance of the numerical emissions.
    The sequence must be finite.
    """
    squared_values = source.to_list() \
        .flat_map(lambda l: Observable.from_(l).average().flat_map(lambda avg: Observable.from_(l).map(lambda i: i - avg))) \
        .map(lambda i: i * i) \
        .publish() \
        .auto_connect(2)

    return Observable.zip(squared_values.sum(), squared_values.count(), lambda sum, ct: sum / (ct - 1))
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:12,代码来源:statistics.py


示例10: test_when_never_never

    def test_when_never_never(self):
        scheduler = TestScheduler()
        xs = Observable.never()
        ys = Observable.never()

        def create():
            def selector(x, y):
                return x + y
            return Observable.when(xs.and_(ys).then_do(selector))

        results = scheduler.start(create)
        results.messages.assert_equal()
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:12,代码来源:test_when.py


示例11: test_return_observer_throws

    def test_return_observer_throws(self):
        scheduler1 = TestScheduler()
        xs = Observable.return_value(1, scheduler1)
        xs.subscribe(lambda x: _raise('ex'))

        self.assertRaises(RxException, scheduler1.start)

        scheduler2 = TestScheduler()
        ys = Observable.return_value(1, scheduler2)
        ys.subscribe(lambda x: x, lambda ex: ex, lambda: _raise('ex'))

        self.assertRaises(RxException, scheduler2.start)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:12,代码来源:test_returnvalue.py


示例12: do_while

def do_while(self, condition):
    """Repeats source as long as condition holds emulating a do while loop.

    Keyword arguments:
    condition -- {Function} The condition which determines if the source
        will be repeated.

    Returns an observable {Observable} sequence which is repeated as long
    as the condition holds.
    """

    return Observable.concat([self, Observable.while_do(condition, self)])
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:12,代码来源:dowhile.py


示例13: repeat

def repeat(self, repeat_count=None):
    """Repeats the observable sequence a specified number of times. If the
    repeat count is not specified, the sequence repeats indefinitely.

    1 - repeated = source.repeat()
    2 - repeated = source.repeat(42)

    Keyword arguments:
    repeat_count -- Number of times to repeat the sequence. If not
        provided, repeats the sequence indefinitely.

    Returns the observable sequence producing the elements of the given
    sequence repeatedly."""

    return Observable.defer(lambda: Observable.concat(Enumerable.repeat(self, repeat_count)))
开发者ID:cobain,项目名称:ipython,代码行数:15,代码来源:repeat.py


示例14: test_many_select_law_1

    def test_many_select_law_1(self):
        xs = Observable.range(1, 0)

        left = xs.many_select(lambda x: x.first())
        right = xs

        left.sequence_equal(right).first().subscribe(self.assertTrue)
开发者ID:JohnWowUs,项目名称:RxPY,代码行数:7,代码来源:test_manyselect.py


示例15: partition

    def partition(source: Observable) -> List[Observable]:
        """The partially applied `partition` operator.

        Returns two observables which partition the observations of the
        source by the given function. The first will trigger
        observations for those values for which the predicate returns
        true. The second will trigger observations for those values
        where the predicate returns false. The predicate is executed
        once for each subscribed observer. Both also propagate all
        error observations arising from the source and each completes
        when the source completes.

        Args:
            source: Source obserable to partition.

        Returns:
            A list of observables. The first triggers when the
            predicate returns True, and the second triggers when the
            predicate returns False.
        """

        published = source.pipe(
            ops.publish(),
            ops.ref_count()
        )
        return [
            published.pipe(ops.filter(predicate)),
            published.pipe(ops.filter(lambda x: not predicate(x)))
        ]
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:29,代码来源:partition.py


示例16: create

        def create():
            def subscribe(o):
                o.on_next(1)
                o.on_next(2)
                return lambda: None

            return Observable.create(subscribe)
开发者ID:JohnWowUs,项目名称:RxPY,代码行数:7,代码来源:test_create.py


示例17: go

 async def go():
     nonlocal result
     source = Observable.throw(error)
     try:
         result = await source
     except Exception as ex:
         result = ex
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:7,代码来源:py3_tofuture.py


示例18: on_next

            def on_next(inner_source: Observable):
                nonlocal source

                d = SingleAssignmentDisposable()
                with source.lock:
                    latest[0] += 1
                    _id = latest[0]
                has_latest[0] = True
                inner_subscription.disposable = d

                # Check if Future or Observable
                inner_source = from_future(inner_source) if is_future(inner_source) else inner_source

                def on_next(x: Any) -> None:
                    if latest[0] == _id:
                        observer.on_next(x)

                def on_error(e: Exception) -> None:
                    if latest[0] == _id:
                        observer.on_error(e)

                def on_completed() -> None:
                    if latest[0] == _id:
                        has_latest[0] = False
                        if is_stopped[0]:
                            observer.on_completed()

                d.disposable = inner_source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:28,代码来源:switchlatest.py


示例19: time_interval

def time_interval(self, scheduler):
    """Records the time interval between consecutive values in an
    observable sequence.

    1 - res = source.time_interval();
    2 - res = source.time_interval(Scheduler.timeout)

    Keyword arguments:
    scheduler -- [Optional] Scheduler used to compute time intervals. If
        not specified, the timeout scheduler is used.

    Return An observable sequence with time interval information on values.
    """

    source = self
    scheduler = scheduler or timeout_scheduler

    def defer():
        last = [scheduler.now]

        def selector(x):
            now = scheduler.now
            span = now - last[0]
            last[0] = now
            return TimeInterval(value=x, interval=span)

        return source.map(selector)
    return Observable.defer(defer)
开发者ID:ESSL-CQQ,项目名称:RxPY,代码行数:28,代码来源:timeinterval.py


示例20: last

    def last(source: Observable) -> Observable:
        """Partially applied last operator.

        Returns the last element of an observable sequence that
        satisfies the condition in the predicate if specified, else
        the last element.

        Examples:
            >>> res = last(source)

        Args:
            source: Source observable to get last item from.

        Returns:
            An observable sequence containing the last element in the
            observable sequence that satisfies the condition in the
            predicate.
        """

        if predicate:
            return source.pipe(
                operators.filter(predicate),
                operators.last()
            )

        return last_or_default_async(source, False)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:26,代码来源:last.py



注:本文中的rx.core.Observable类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python disposable.CompositeDisposable类代码示例发布时间:2022-05-27
下一篇:
Python move.create_move函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap