Project name: akarnokd/kotlin-flow-extensions
Project URL: https://github.com/akarnokd/kotlin-flow-extensions
Programming language: Kotlin 100.0%
Introduction:

kotlin-flow-extensions

Extensions to the Kotlin Flow library.

dependency

dependencies {
    implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.14"
}

Features

PublishSubject

Multicasts values to one or more flow collectors in a coordinated fashion, awaiting each collector to be ready to receive the next item or termination.

import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {
    val publishSubject = PublishSubject<Int>()

    val job = launch(Dispatchers.IO) {
        publishSubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!publishSubject.hasCollectors()) {
        delay(1)
    }

    publishSubject.emit(1)
    publishSubject.complete()
    job.join()
}

ReplaySubject

Caches and replays some or all items to collectors. Constructors for size-bound, time-bound, and both size-and-time-bound replays are available. An additional constructor with a [...]

import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {
    val replaySubject = ReplaySubject<Int>()

    val job = launch(Dispatchers.IO) {
        replaySubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!replaySubject.hasCollectors()) {
        delay(1)
    }

    replaySubject.emit(1)
    replaySubject.emit(2)
    replaySubject.emit(3)
    replaySubject.complete()
    job.join()

    // a late collector still receives the cached items
    replaySubject.collect {
        println(it)
    }
    println("Done 2")
}

BehaviorSubject

Caches the last item received and multicasts it and subsequent items (continuously) to collectors, awaiting each collector to be ready to receive the next item or termination. It is possible to set an initial value to be sent to fresh collectors via a constructor.

import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {
    val behaviorSubject = BehaviorSubject<Int>()
    behaviorSubject.emit(1)

    // OR
    // val behaviorSubject = BehaviorSubject<Int>(1)

    val job = launch(Dispatchers.IO) {
        behaviorSubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!behaviorSubject.hasCollectors()) {
        delay(1)
    }

    behaviorSubject.emit(2)
    behaviorSubject.emit(3)
    behaviorSubject.complete()
    job.join()
}

Flow.flatMapDrop

Maps the upstream value into a Flow [...]

import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.map

range(1, 10)
    .map {
        delay(100)
        it
    }
    .flatMapDrop {
        range(it * 100, 5)
            .map {
                delay(30)
                it
            }
    }
    .assertResult(
        100, 101, 102, 103, 104,
        300, 301, 302, 303, 304,
        500, 501, 502, 503, 504,
        700, 701, 702, 703, 704,
        900, 901, 902, 903, 904
    )

Flow.publish

Shares a single connection to the upstream source, which can be consumed by many collectors inside a [...]

Effectively, one collector to the output [...]

Unfortunately, the suspending nature of coroutines/[...]

Example:

range(1, 5)
    .publish(2) { shared ->
        merge(shared.filter { it % 2 == 0 }, shared.filter { it % 2 != 0 })
    }
    .assertResult(1, 2, 3, 4, 5)

In the example, it is known [...]

UnicastSubject

Buffers items until a single collector starts collecting items. Use [...]

Note that the subject uses an unbounded inner buffer and does not suspend its input side if the collector never arrives or can't keep up.

val us = UnicastSubject<Int>()

// launch needs a coroutine scope, e.g. run this inside runBlocking
launch(Dispatchers.IO) {
    for (i in 1..200) {
        println("Emitting $i")
        us.emit(i)
        delay(1)
    }
    us.complete()
}

// collector arrives late for some reason
delay(100)

us.collect { println("Collecting $it") }

UnicastWorkSubject

Buffers items until, and in between, a single collector is able to collect items. If the current collector cancels, the next collector will receive the subsequent items.

Note that the subject uses an unbounded inner buffer and does not suspend its input side if the collector never arrives or can't keep up.

val uws = UnicastWorkSubject<Int>()

// generateInts is a helper not defined in this snippet; judging by the
// output below, it feeds the values 1 to 15 into the subject
generateInts(uws, 1, 15)

// prints lines 1..5
uws.take(5).collect { println(it) }

// prints lines 6..10
uws.take(5).collect { println(it) }

// prints lines 11..15
uws.take(5).collect { println(it) }

concatArrayEager

Launches all sources at once and emits all items from a source before items of the next are emitted.

For example, given two sources, if the first is slow, the items of the second won't be emitted until the first has finished emitting its items. This operator allows all sources to generate items in parallel but still emits those items in the order of their respective sources.

Note that each source is consumed in an unbounded manner and thus, depending on the speed of the current source and the collector, the operator may retain items longer and may use more memory during its execution.

concatArrayEager(
    range(1, 5).onStart { delay(200) },
    range(6, 5)
)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Flow.concatMapEager

Maps the upstream values into Flows and launches them all at once, then emits items from a source before items of the next are emitted.

For example, given two inner sources, if the first is slow, the items of the second won't be emitted until the first has finished emitting its items. This operator allows all sources to generate items in parallel but still emits those items in the order of their respective sources.

Note that the upstream and each source are consumed in an unbounded manner and thus, depending on the speed of the current source and the collector, the operator may retain items longer and may use more memory during its execution.

range(1, 5)
    .concatMapEager {
        range(it * 10, 5).onEach { delay(100) }
    }
    .assertResult(
        10, 11, 12, 13, 14,
        20, 21, 22, 23, 24,
        30, 31, 32, 33, 34,
        40, 41, 42, 43, 44,
        50, 51, 52, 53, 54
    )

Flow.amb

Starts collecting all source Flows and relays the items of the first one to emit an item, cancelling the rest.

amb(
    range(1, 5).onStart { delay(1000) },
    range(6, 5).onStart { delay(100) }
)
.assertResult(6, 7, 8, 9, 10)
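
To make the "first one to emit wins" behaviour more concrete, here is a minimal sketch of the same idea built only on kotlinx.coroutines. It is not the library's implementation: `ambSketch` and `ambDemo` are hypothetical names introduced for illustration, and the sketch assumes that only emitted items (not completion or errors) decide the winner.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper for illustration; not part of kotlin-flow-extensions.
fun <T> ambSketch(vararg sources: Flow<T>): Flow<T> = channelFlow {
    // index of the winning source, -1 while no source has emitted yet
    val winner = AtomicInteger(-1)
    var jobs = emptyList<Job>()

    jobs = sources.mapIndexed { index, source ->
        // start lazily so the complete job list exists before any collector runs
        launch(start = CoroutineStart.LAZY) {
            source.collect { value ->
                if (winner.compareAndSet(-1, index)) {
                    // first item overall: cancel every other collector
                    jobs.forEachIndexed { i, job -> if (i != index) job.cancel() }
                }
                // only the winning source is relayed downstream
                if (winner.get() == index) send(value)
            }
        }
    }
    jobs.forEach { it.start() }
}

// Two plain flows: the second one emits first and therefore wins.
fun ambDemo(): List<Int> = runBlocking {
    ambSketch(
        flow<Int> { delay(1000); emit(1); emit(2) },
        flow<Int> { delay(100); emit(6); emit(7) }
    ).toList()
}
```

Calling `ambDemo()` returns `[6, 7]`: the slower flow is cancelled while it is still waiting in `delay(1000)`, so its items are never produced. The lazy start is a design choice of this sketch so that a fast source cannot win before the other collectors are registered for cancellation.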