本文整理汇总了Java中io.reactivex.parallel.ParallelFlowable类的典型用法代码示例。如果您正苦于以下问题:Java ParallelFlowable类的具体用法?Java ParallelFlowable怎么用?Java ParallelFlowable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ParallelFlowable类属于io.reactivex.parallel包,在下文中一共展示了ParallelFlowable类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
public static void main(String[] args) {
int numberOfRails = 4; // can query #processors with parallelism()
ParallelFlowable
.from(Flowable.range(1, 10), numberOfRails)
.runOn(Schedulers.computation())
.map(i -> i * i)
.filter(i -> i % 3 == 0)
.sequential()
.subscribe(System.out::println);
}
开发者ID:yfain,项目名称:rxjava2,代码行数:13,代码来源:ParallelFlowableRange.java
示例2: ParallelOrderedMerge
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
ParallelOrderedMerge(ParallelFlowable<T> source,
Comparator<? super T> comparator,
boolean delayErrors, int prefetch) {
this.source = source;
this.comparator = comparator;
this.delayErrors = delayErrors;
this.prefetch = prefetch;
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:9,代码来源:ParallelOrderedMerge.java
示例3: apply
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
@Override public ParallelFlowableSubscribeProxy<T> apply(final ParallelFlowable<T> upstream) {
return new ParallelFlowableSubscribeProxy<T>() {
@Override public void subscribe(Subscriber<? super T>[] subscribers) {
new AutoDisposeParallelFlowable<>(upstream, scope()).subscribe(subscribers);
}
};
}
开发者ID:uber,项目名称:AutoDispose,代码行数:8,代码来源:ParallelFlowableScoper.java
示例4: setup
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
@Setup
public void setup() {
flowable = ParallelFlowable.from(Flowable.range(0, count)).runOn(Schedulers.computation())
.filter(v -> { Blackhole.consumeCPU(cost); return false; })
.sequential();
flowableFJ = ParallelFlowable.from(Flowable.range(0, count))
.runOn(Schedulers.from(ForkJoinPool.commonPool()))
.filter(v -> { Blackhole.consumeCPU(cost); return false; })
.sequential();
}
开发者ID:akarnokd,项目名称:akarnokd-misc,代码行数:12,代码来源:ParallelPerf.java
示例5: parallel
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
@CheckReturnValue
@Beta
public ParallelFlowable<T> parallel() {
return boxed.parallel();
}
开发者ID:aol,项目名称:cyclops,代码行数:8,代码来源:FlowableKind.java
示例6: ParallelFlowableOnAssembly
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
ParallelFlowableOnAssembly(ParallelFlowable<T> source) {
this.source = source;
this.assembled = new RxJavaAssemblyException();
}
开发者ID:akaita,项目名称:RxJava2Debug,代码行数:5,代码来源:ParallelFlowableOnAssembly.java
示例7: createParallelFlowable
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
static ParallelFlowable<Integer> createParallelFlowable() {
return Flowable.range(1, 5).concatWith(Flowable.<Integer>error(new IOException())).parallel();
}
开发者ID:akaita,项目名称:RxJava2Debug,代码行数:4,代码来源:RxJava2AssemblyTrackingTest.java
示例8: ParallelFlowableValidator
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
ParallelFlowableValidator(ParallelFlowable<T> source, PlainConsumer<ProtocolNonConformanceException> onViolation) {
this.source = source;
this.onViolation = onViolation;
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:5,代码来源:ParallelFlowableValidator.java
示例9: ParallelSumInteger
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
ParallelSumInteger(ParallelFlowable<? extends Number> source) {
this.source = source;
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:4,代码来源:ParallelSumInteger.java
示例10: apply
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
@Override
public ParallelFlowable<Integer> apply(ParallelFlowable<T> t) {
return new ParallelSumInteger<T>(t);
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:5,代码来源:ParallelSumInteger.java
示例11: ParallelSumLong
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
ParallelSumLong(ParallelFlowable<? extends Number> source) {
this.source = source;
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:4,代码来源:ParallelSumLong.java
示例12: apply
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
@Override
public ParallelFlowable<Long> apply(ParallelFlowable<T> t) {
return new ParallelSumLong<T>(t);
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:5,代码来源:ParallelSumLong.java
示例13: ParallelSumDouble
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
ParallelSumDouble(ParallelFlowable<? extends Number> source) {
this.source = source;
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:4,代码来源:ParallelSumDouble.java
示例14: apply
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
@Override
public ParallelFlowable<Double> apply(ParallelFlowable<T> t) {
return new ParallelSumDouble<T>(t);
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:5,代码来源:ParallelSumDouble.java
示例15: subscribe
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
public void subscribe(ParallelFlowable<T> source) {
source.subscribe(subscribers);
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:4,代码来源:BasicMergeSubscription.java
示例16: parallelFlowable
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void parallelFlowable() {
ParallelFlowable<Integer> source = new ParallelFlowable<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer>[] s) {
validate(s);
s[0].onComplete();
s[0].onError(null);
s[0].onError(new IOException());
s[0].onNext(null);
s[0].onNext(1);
s[0].onSubscribe(null);
s[0].onSubscribe(new BooleanSubscription());
s[0].onSubscribe(new BooleanSubscription());
s[0].onComplete();
s[0].onNext(2);
}
@Override
public int parallelism() {
return 1;
}
};
RxJavaProtocolValidator.setOnViolationHandler(this);
Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());
SavedHooks h = RxJavaProtocolValidator.enableAndChain();
Assert.assertTrue(RxJavaProtocolValidator.isEnabled());
try {
Flowable.just(1).publish().autoConnect().test().assertResult(1);
Flowable.empty().publish().autoConnect().test().assertResult();
Flowable.error(new IOException()).test().assertFailure(IOException.class);
ParallelFlowable<Integer> c = RxJavaPlugins.onAssembly(source);
c.subscribe(new Subscriber[] { new TestSubscriber<Integer>(0) });
Assert.assertEquals(15, errors.size());
TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
Assert.assertTrue("" + errors.get(4).getCause(), errors.get(4).getCause() instanceof IOException);
TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
} finally {
h.restore();
RxJavaProtocolValidator.setOnViolationHandler(null);
}
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:64,代码来源:RxJavaProtocolValidatorTest.java
示例17: AutoDisposeParallelFlowable
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
AutoDisposeParallelFlowable(ParallelFlowable<T> source, Maybe<?> scope) {
this.source = source;
this.scope = scope;
}
开发者ID:uber,项目名称:AutoDispose,代码行数:5,代码来源:ParallelFlowableScoper.java
示例18: orderedMerge
import io.reactivex.parallel.ParallelFlowable; //导入依赖的package包/类
/**
* Merges the source ParallelFlowable rails in an ordered fashion picking the smallest of the available value from
* them (determined by the Comparator), allows delaying any error they may signal and sets the prefetch
* amount when requesting from these Publishers.
* @param <T> the value type of all sources
* @param source the source ParallelFlowable
* @param comparator the comparator to use for comparing items;
* it is called with the last known smallest in its first argument
* @param delayErrors if true, source errors are delayed until all sources terminate in some way
* @param prefetch the number of items to prefetch from the sources
* @return the new Flowable instance
* @since 0.17.9
*/
public static <T> Flowable<T> orderedMerge(ParallelFlowable<T> source, Comparator<? super T> comparator, boolean delayErrors, int prefetch) {
ObjectHelper.requireNonNull(comparator, "comparator is null");
ObjectHelper.requireNonNull(source, "sources is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelOrderedMerge<T>(source, comparator, delayErrors, prefetch));
}
开发者ID:akarnokd,项目名称:RxJava2Extensions,代码行数:20,代码来源:ParallelTransformers.java
注:本文中的io.reactivex.parallel.ParallelFlowable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论