• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python operators.map函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中rx.operators.map函数的典型用法代码示例。如果您正苦于以下问题:Python map函数的具体用法?Python map怎么用?Python map使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了map函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_map_disposeinsidemapper

    def test_map_disposeinsidemapper(self):
        scheduler = TestScheduler()
        xs = scheduler.create_hot_observable(on_next(100, 1), on_next(200, 2), on_next(500, 3), on_next(600, 4))
        results = scheduler.create_observer()
        d = SerialDisposable()
        invoked = [0]

        def projection(x, *args, **kw):
            invoked[0] += 1

            if scheduler.clock > 400:
                d.dispose()
            return x

        d.disposable = xs.pipe(
            map(projection)
        ).subscribe(results, scheduler)

        def action(scheduler, state):
            return d.dispose()

        scheduler.schedule_absolute(ReactiveTest.disposed, action)
        scheduler.start()

        assert results.messages == [on_next(100, 1), on_next(200, 2)]
        assert xs.subscriptions == [ReactiveTest.subscribe(0, 500)]

        assert invoked[0] == 3
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:28,代码来源:test_map.py


示例2: open

    def open(self):
        scheduler = AsyncIOScheduler()

        print("WebSocket opened")

        # A Subject is both an observable and observer, so we can both subscribe
        # to it and also feed (send) it with new values
        self.subject = Subject()

        # Get all distinct key up events from the input and only fire if long enough and distinct
        searcher = self.subject.pipe(
            ops.map(lambda x: x["term"]),
            ops.filter(lambda text: len(text) > 2),  # Only if the text is longer than 2 characters
            ops.debounce(0.750),                     # Pause for 750ms
            ops.distinct_until_changed(),            # Only if the value has changed
            ops.flat_map_latest(search_wikipedia)
        )

        def send_response(x):
            self.write_message(x.body)

        def on_error(ex):
            print(ex)

        searcher.subscribe(send_response, on_error, scheduler=scheduler)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:25,代码来源:autocomplete_asyncio.py


示例3: main

def main():
    app = QApplication(sys.argv)
    scheduler = QtScheduler(QtCore)

    window = Window()
    window.show()

    text = 'TIME FLIES LIKE AN ARROW'

    def on_next(info):
        label, (x, y), i = info
        label.move(x + i*12 + 15, y)
        label.show()

    def handle_label(label, i):
        delayer = ops.delay(i * 0.100)
        mapper = ops.map(lambda xy: (label, xy, i))

        return window.mousemove.pipe(
            delayer,
            mapper,
            )

    labeler = ops.flat_map_indexed(handle_label)
    mapper = ops.map(lambda c: QLabel(c, window))

    rx.from_(text).pipe(
        mapper,
        labeler,
    ).subscribe(on_next, on_error=print, scheduler=scheduler)

    sys.exit(app.exec_())
开发者ID:ReactiveX,项目名称:RxPY,代码行数:32,代码来源:timeflies_qt.py


示例4: handle_label

    def handle_label(label, i):
        delayer = ops.delay(i*0.100)
        mapper = ops.map(lambda xy: (label, xy, i))

        return window.mousemove.pipe(
            delayer,
            mapper,
        )
开发者ID:ReactiveX,项目名称:RxPY,代码行数:8,代码来源:timeflies_gtk.py


示例5: create

        def create():
            def mapper(x):
                return Timestamp(x.value, x.timestamp)

            return xs.pipe(
                ops.timestamp(),
                ops.map(mapper),
            )
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:8,代码来源:test_timestamp.py


示例6: create

        def create():
            def mapper(x):
                return TimeInterval(x.value, x.interval)

            return xs.pipe(
                ops.time_interval(),
                ops.map(mapper),
            )
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:8,代码来源:test_timeinterval.py


示例7: factory

        def factory():
            def projection(x):
                invoked[0] += 1
                if invoked[0] == 3:
                    raise Exception(ex)

                return x + 1
            return xs.pipe(map(projection))
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:8,代码来源:test_map.py


示例8: factory

        def factory():
            def key_mapper(x):
                key_invoked[0] += 1
                return x.lower().strip()

            return xs.pipe(
                    ops.group_by(key_mapper, lambda x: x),
                    ops.map(lambda g: g.key),
                    )
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:9,代码来源:test_groupby.py


示例9: handle_label

    def handle_label(label, i):
        label.config(dict(borderwidth=0, padx=0, pady=0))

        mapper = ops.map(lambda ev: (label, ev, i))
        delayer = ops.delay(i*0.1)

        return mousemove.pipe(
            delayer,
            mapper
        )
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:10,代码来源:timeflies_tkinter.py


示例10: _all

def _all(predicate: Predicate) -> Callable[[Observable], Observable]:

    filtering = ops.filter(lambda v: not predicate(v))
    mapping = ops.map(lambda b: not b)
    some = ops.some()

    return pipe(
        filtering,
        some,
        mapping
    )
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:11,代码来源:all.py


示例11: _pluck

def _pluck(key: Any) -> Callable[[Observable], Observable]:
    """Retrieves the value of a specified key using dict-like access (as in
    element[key]) from all elements in the Observable sequence.

    Args:
        key: The key to pluck.

    Returns a new Observable {Observable} sequence of key values.

    To pluck an attribute of each element, use pluck_attr.
    """

    return ops.map(lambda x: x[key])
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:13,代码来源:pluck.py


示例12: average

    def average(source: Observable) -> Observable:
        """Partially applied average operator.

        Computes the average of an observable sequence of values that
        are in the sequence or obtained by invoking a transform
        function on each element of the input sequence if present.

        Examples:
            >>> res = average(source)

        Args:
            source: Source observable to average.

        Returns:
            An observable sequence containing a single element with the
            average of the sequence of values.
        """

        if key_mapper:
            return source.pipe(
                operators.map(key_mapper),
                operators.average()
            )

        def accumulator(prev, cur):
            return AverageValue(sum=prev.sum+cur, count=prev.count+1)

        def mapper(s):
            if s.count == 0:
                raise Exception('The input sequence was empty')

            return s.sum / float(s.count)

        seed = AverageValue(sum=0, count=0)
        return source.pipe(
            operators.scan(accumulator, seed),
            operators.last(),
            operators.map(mapper)
        )
开发者ID:ReactiveX,项目名称:RxPY,代码行数:39,代码来源:average.py


示例13: create

        def create():
            def mapper(xy):
                x, y = xy
                return "{}{}".format(x.value, y.value)

            return xs.pipe(
                ops.join(
                    ys,
                    lambda x: rx.timer(x.interval),
                    lambda y: rx.timer(y.interval).pipe(ops.flat_map(rx.throw(ex) if y.value == "tin" else rx.empty())),
                    ),
                ops.map(mapper),
                )
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:13,代码来源:test_join.py


示例14: subscribe

        def subscribe(observer, scheduler_):
            _scheduler = scheduler or scheduler_ or timeout_scheduler
            last = _scheduler.now

            def mapper(value):
                nonlocal last

                now = _scheduler.now
                span = now - last
                last = now
                return TimeInterval(value=value, interval=span)

            return source.pipe(ops.map(mapper)).subscribe(observer, scheduler_)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:13,代码来源:timeinterval.py


示例15: observable_window_with_openings

def observable_window_with_openings(self, window_openings, window_closing_mapper):

    def mapper(args):
        _, window = args
        return window

    return window_openings.pipe(
        ops.group_join(
            self,
            window_closing_mapper,
            lambda _: empty(),
            ),
        ops.map(mapper),
        )
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:14,代码来源:window.py


示例16: _pluck_attr

def _pluck_attr(prop: str) -> Callable[[Observable], Observable]:
    """Retrieves the value of a specified property (using getattr) from
    all elements in the Observable sequence.

    Args:
        property: The property to pluck.

    Returns a new Observable {Observable} sequence of property values.

    To pluck values using dict-like access (as in element[key]) on each
    element, use pluck.
    """

    return ops.map(lambda x: getattr(x, prop))
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:14,代码来源:pluck.py


示例17: test_map_throws

    def test_map_throws(self):
        mapper = map(lambda x: x)
        with self.assertRaises(RxException):
            return_value(1).pipe(
                mapper
            ).subscribe(lambda x: _raise("ex"))

        with self.assertRaises(RxException):
            throw('ex').pipe(
                mapper
            ).subscribe(on_error=lambda ex: _raise(ex))

        with self.assertRaises(RxException):
            empty().pipe(
                mapper
            ).subscribe(lambda x: x, lambda ex: ex, lambda: _raise('ex'))

        def subscribe(observer, scheduler=None):
            _raise('ex')

        with self.assertRaises(RxException):
            create(subscribe).pipe(
                map(lambda x: x)
            ).subscribe()
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:24,代码来源:test_map.py


示例18: factory

        def factory(scheduler):
            nonlocal source

            has_accumulation = [False]
            accumulation = [None]

            def projection(x):
                if has_accumulation[0]:
                    accumulation[0] = accumulator(accumulation[0], x)
                else:
                    accumulation[0] = accumulator(seed, x) if has_seed else x
                    has_accumulation[0] = True

                return accumulation[0]
            return source.pipe(ops.map(projection))
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:15,代码来源:scan.py


示例19: factory

        def factory(scheduler):
            has_accumulation = False
            accumulation = None

            def projection(x):
                nonlocal has_accumulation
                nonlocal accumulation

                if has_accumulation:
                    accumulation = accumulator(accumulation, x)
                else:
                    accumulation = accumulator(seed, x) if has_seed else x
                    has_accumulation = True

                return accumulation
            return source.pipe(ops.map(projection))
开发者ID:ReactiveX,项目名称:RxPY,代码行数:16,代码来源:scan.py


示例20: main

def main():
    scheduler = GtkScheduler()
    scrolled_window = Gtk.ScrolledWindow()

    window = Window()
    window.connect("delete-event", Gtk.main_quit)

    container = Gtk.Fixed()

    scrolled_window.add(container)
    window.add(scrolled_window)
    text = 'TIME FLIES LIKE AN ARROW'

    def on_next(info):
        label, (x, y), i = info
        container.move(label, x + i*12 + 15, y)
        label.show()

    def handle_label(label, i):
        delayer = ops.delay(i*0.100)
        mapper = ops.map(lambda xy: (label, xy, i))

        return window.mousemove.pipe(
            delayer,
            mapper,
        )

    def make_label(char):
        label = Gtk.Label(label=char)
        container.put(label, 0, 0)
        label.hide()
        return label

    mapper = ops.map(make_label)
    labeler = ops.flat_map_indexed(handle_label)

    rx.from_(text).pipe(
        mapper,
        labeler,
    ).subscribe(on_next, on_error=print, scheduler=scheduler)

    window.show_all()

    Gtk.main()
开发者ID:ReactiveX,项目名称:RxPY,代码行数:44,代码来源:timeflies_gtk.py



注:本文中的rx.operators.map函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python subjects.Subject类代码示例发布时间:2022-05-27
下一篇:
Python observable.Observable类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap