本文整理汇总了Python中rx.operators.map函数的典型用法代码示例。如果您正苦于以下问题:Python map函数的具体用法?Python map怎么用?Python map使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了map函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_map_disposeinsidemapper
def test_map_disposeinsidemapper(self):
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(on_next(100, 1), on_next(200, 2), on_next(500, 3), on_next(600, 4))
results = scheduler.create_observer()
d = SerialDisposable()
invoked = [0]
def projection(x, *args, **kw):
invoked[0] += 1
if scheduler.clock > 400:
d.dispose()
return x
d.disposable = xs.pipe(
map(projection)
).subscribe(results, scheduler)
def action(scheduler, state):
return d.dispose()
scheduler.schedule_absolute(ReactiveTest.disposed, action)
scheduler.start()
assert results.messages == [on_next(100, 1), on_next(200, 2)]
assert xs.subscriptions == [ReactiveTest.subscribe(0, 500)]
assert invoked[0] == 3
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:28,代码来源:test_map.py
示例2: open
def open(self):
scheduler = AsyncIOScheduler()
print("WebSocket opened")
# A Subject is both an observable and observer, so we can both subscribe
# to it and also feed (send) it with new values
self.subject = Subject()
# Get all distinct key up events from the input and only fire if long enough and distinct
searcher = self.subject.pipe(
ops.map(lambda x: x["term"]),
ops.filter(lambda text: len(text) > 2), # Only if the text is longer than 2 characters
ops.debounce(0.750), # Pause for 750ms
ops.distinct_until_changed(), # Only if the value has changed
ops.flat_map_latest(search_wikipedia)
)
def send_response(x):
self.write_message(x.body)
def on_error(ex):
print(ex)
searcher.subscribe(send_response, on_error, scheduler=scheduler)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:25,代码来源:autocomplete_asyncio.py
示例3: main
def main():
app = QApplication(sys.argv)
scheduler = QtScheduler(QtCore)
window = Window()
window.show()
text = 'TIME FLIES LIKE AN ARROW'
def on_next(info):
label, (x, y), i = info
label.move(x + i*12 + 15, y)
label.show()
def handle_label(label, i):
delayer = ops.delay(i * 0.100)
mapper = ops.map(lambda xy: (label, xy, i))
return window.mousemove.pipe(
delayer,
mapper,
)
labeler = ops.flat_map_indexed(handle_label)
mapper = ops.map(lambda c: QLabel(c, window))
rx.from_(text).pipe(
mapper,
labeler,
).subscribe(on_next, on_error=print, scheduler=scheduler)
sys.exit(app.exec_())
开发者ID:ReactiveX,项目名称:RxPY,代码行数:32,代码来源:timeflies_qt.py
示例4: handle_label
def handle_label(label, i):
delayer = ops.delay(i*0.100)
mapper = ops.map(lambda xy: (label, xy, i))
return window.mousemove.pipe(
delayer,
mapper,
)
开发者ID:ReactiveX,项目名称:RxPY,代码行数:8,代码来源:timeflies_gtk.py
示例5: create
def create():
def mapper(x):
return Timestamp(x.value, x.timestamp)
return xs.pipe(
ops.timestamp(),
ops.map(mapper),
)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:8,代码来源:test_timestamp.py
示例6: create
def create():
def mapper(x):
return TimeInterval(x.value, x.interval)
return xs.pipe(
ops.time_interval(),
ops.map(mapper),
)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:8,代码来源:test_timeinterval.py
示例7: factory
def factory():
def projection(x):
invoked[0] += 1
if invoked[0] == 3:
raise Exception(ex)
return x + 1
return xs.pipe(map(projection))
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:8,代码来源:test_map.py
示例8: factory
def factory():
def key_mapper(x):
key_invoked[0] += 1
return x.lower().strip()
return xs.pipe(
ops.group_by(key_mapper, lambda x: x),
ops.map(lambda g: g.key),
)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:9,代码来源:test_groupby.py
示例9: handle_label
def handle_label(label, i):
label.config(dict(borderwidth=0, padx=0, pady=0))
mapper = ops.map(lambda ev: (label, ev, i))
delayer = ops.delay(i*0.1)
return mousemove.pipe(
delayer,
mapper
)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:10,代码来源:timeflies_tkinter.py
示例10: _all
def _all(predicate: Predicate) -> Callable[[Observable], Observable]:
filtering = ops.filter(lambda v: not predicate(v))
mapping = ops.map(lambda b: not b)
some = ops.some()
return pipe(
filtering,
some,
mapping
)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:11,代码来源:all.py
示例11: _pluck
def _pluck(key: Any) -> Callable[[Observable], Observable]:
"""Retrieves the value of a specified key using dict-like access (as in
element[key]) from all elements in the Observable sequence.
Args:
key: The key to pluck.
Returns a new Observable {Observable} sequence of key values.
To pluck an attribute of each element, use pluck_attr.
"""
return ops.map(lambda x: x[key])
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:13,代码来源:pluck.py
示例12: average
def average(source: Observable) -> Observable:
"""Partially applied average operator.
Computes the average of an observable sequence of values that
are in the sequence or obtained by invoking a transform
function on each element of the input sequence if present.
Examples:
>>> res = average(source)
Args:
source: Source observable to average.
Returns:
An observable sequence containing a single element with the
average of the sequence of values.
"""
if key_mapper:
return source.pipe(
operators.map(key_mapper),
operators.average()
)
def accumulator(prev, cur):
return AverageValue(sum=prev.sum+cur, count=prev.count+1)
def mapper(s):
if s.count == 0:
raise Exception('The input sequence was empty')
return s.sum / float(s.count)
seed = AverageValue(sum=0, count=0)
return source.pipe(
operators.scan(accumulator, seed),
operators.last(),
operators.map(mapper)
)
开发者ID:ReactiveX,项目名称:RxPY,代码行数:39,代码来源:average.py
示例13: create
def create():
def mapper(xy):
x, y = xy
return "{}{}".format(x.value, y.value)
return xs.pipe(
ops.join(
ys,
lambda x: rx.timer(x.interval),
lambda y: rx.timer(y.interval).pipe(ops.flat_map(rx.throw(ex) if y.value == "tin" else rx.empty())),
),
ops.map(mapper),
)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:13,代码来源:test_join.py
示例14: subscribe
def subscribe(observer, scheduler_):
_scheduler = scheduler or scheduler_ or timeout_scheduler
last = _scheduler.now
def mapper(value):
nonlocal last
now = _scheduler.now
span = now - last
last = now
return TimeInterval(value=value, interval=span)
return source.pipe(ops.map(mapper)).subscribe(observer, scheduler_)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:13,代码来源:timeinterval.py
示例15: observable_window_with_openings
def observable_window_with_openings(self, window_openings, window_closing_mapper):
def mapper(args):
_, window = args
return window
return window_openings.pipe(
ops.group_join(
self,
window_closing_mapper,
lambda _: empty(),
),
ops.map(mapper),
)
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:14,代码来源:window.py
示例16: _pluck_attr
def _pluck_attr(prop: str) -> Callable[[Observable], Observable]:
"""Retrieves the value of a specified property (using getattr) from
all elements in the Observable sequence.
Args:
property: The property to pluck.
Returns a new Observable {Observable} sequence of property values.
To pluck values using dict-like access (as in element[key]) on each
element, use pluck.
"""
return ops.map(lambda x: getattr(x, prop))
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:14,代码来源:pluck.py
示例17: test_map_throws
def test_map_throws(self):
mapper = map(lambda x: x)
with self.assertRaises(RxException):
return_value(1).pipe(
mapper
).subscribe(lambda x: _raise("ex"))
with self.assertRaises(RxException):
throw('ex').pipe(
mapper
).subscribe(on_error=lambda ex: _raise(ex))
with self.assertRaises(RxException):
empty().pipe(
mapper
).subscribe(lambda x: x, lambda ex: ex, lambda: _raise('ex'))
def subscribe(observer, scheduler=None):
_raise('ex')
with self.assertRaises(RxException):
create(subscribe).pipe(
map(lambda x: x)
).subscribe()
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:24,代码来源:test_map.py
示例18: factory
def factory(scheduler):
nonlocal source
has_accumulation = [False]
accumulation = [None]
def projection(x):
if has_accumulation[0]:
accumulation[0] = accumulator(accumulation[0], x)
else:
accumulation[0] = accumulator(seed, x) if has_seed else x
has_accumulation[0] = True
return accumulation[0]
return source.pipe(ops.map(projection))
开发者ID:MichaelSchneeberger,项目名称:RxPY,代码行数:15,代码来源:scan.py
示例19: factory
def factory(scheduler):
has_accumulation = False
accumulation = None
def projection(x):
nonlocal has_accumulation
nonlocal accumulation
if has_accumulation:
accumulation = accumulator(accumulation, x)
else:
accumulation = accumulator(seed, x) if has_seed else x
has_accumulation = True
return accumulation
return source.pipe(ops.map(projection))
开发者ID:ReactiveX,项目名称:RxPY,代码行数:16,代码来源:scan.py
示例20: main
def main():
scheduler = GtkScheduler()
scrolled_window = Gtk.ScrolledWindow()
window = Window()
window.connect("delete-event", Gtk.main_quit)
container = Gtk.Fixed()
scrolled_window.add(container)
window.add(scrolled_window)
text = 'TIME FLIES LIKE AN ARROW'
def on_next(info):
label, (x, y), i = info
container.move(label, x + i*12 + 15, y)
label.show()
def handle_label(label, i):
delayer = ops.delay(i*0.100)
mapper = ops.map(lambda xy: (label, xy, i))
return window.mousemove.pipe(
delayer,
mapper,
)
def make_label(char):
label = Gtk.Label(label=char)
container.put(label, 0, 0)
label.hide()
return label
mapper = ops.map(make_label)
labeler = ops.flat_map_indexed(handle_label)
rx.from_(text).pipe(
mapper,
labeler,
).subscribe(on_next, on_error=print, scheduler=scheduler)
window.show_all()
Gtk.main()
开发者ID:ReactiveX,项目名称:RxPY,代码行数:44,代码来源:timeflies_gtk.py
注:本文中的rx.operators.map函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论