Java Scheduler Class Code Examples

This article collects typical usage examples of reactor.core.scheduler.Scheduler in Java. If you are wondering how the Scheduler class is used in practice, or are looking for working examples, the selected snippets below may help.

The Scheduler class belongs to the reactor.core.scheduler package. The 20 code examples below are sorted by popularity by default.

Example 1: readEmployeesByDescAge

import reactor.core.scheduler.Scheduler; // import the required package/class
@Override
public Flux<Employee> readEmployeesByDescAge() {
	Scheduler subWorker = Schedulers.newSingle("sub-thread");
	Scheduler pubWorker = Schedulers.newSingle("pub-thread");
	Supplier<Flux<Employee>> deferredTask = ()->{
		System.out.println("flux:defer task executor: "+ Thread.currentThread().getName());
		System.out.println("flux:defer task executor login: " + SecurityContextHolder.getContext().getAuthentication().getPrincipal());
		return Flux.fromIterable(employeeDaoImpl.getEmployees());
	};
	Comparator<Employee> descAge = (e1, e2) -> {
		System.out.println("flux:sort task executor: " + Thread.currentThread().getName());
		System.out.println("flux:sort task executor login: " + SecurityContextHolder.getContext().getAuthentication().getPrincipal());
		if(e1.getAge().compareTo(e2.getAge()) == 0){
			return 0;
		} else if(e1.getAge().compareTo(e2.getAge()) > 0){
			return -1;
		} else return 1;
	};
	Flux<Employee> deferred = Flux.defer(deferredTask).sort(descAge).subscribeOn(subWorker).publishOn(pubWorker);
	return deferred;
}
 
Developer: PacktPublishing, Project: Spring-5.0-Cookbook, Lines of code: 22, Source: EmployeeServiceImpl.java
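
The three-branch comparator in the example above orders employees by age, oldest first. As a minimal JDK-only sketch of the same ordering, the comparator could also be built with Comparator.comparing(...).reversed(). The Employee class here is a hypothetical stand-in with only the two getters the sketch needs; it is not the entity from the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class DescAgeComparatorSketch {

    // Hypothetical stand-in for the Employee entity used in the example.
    static final class Employee {
        private final String lastName;
        private final Integer age;

        Employee(String lastName, Integer age) {
            this.lastName = lastName;
            this.age = age;
        }

        String getLastName() { return lastName; }
        Integer getAge() { return age; }
    }

    // Descending by age: the same ordering as the three-branch comparator.
    static final Comparator<Employee> DESC_AGE =
            Comparator.comparing(Employee::getAge).reversed();

    // Sorts a copy of the input and returns the last names in sorted order.
    static List<String> lastNamesByDescAge(List<Employee> employees) {
        List<Employee> sorted = new ArrayList<>(employees);
        sorted.sort(DESC_AGE);
        List<String> names = new ArrayList<>();
        for (Employee e : sorted) {
            names.add(e.getLastName());
        }
        return names;
    }

    public static void main(String[] args) {
        List<Employee> staff = Arrays.asList(
                new Employee("Lee", 31),
                new Employee("Cruz", 45),
                new Employee("Tan", 27));
        System.out.println(lastNamesByDescAge(staff)); // [Cruz, Lee, Tan]
    }
}
```

A comparator built this way could be passed to Flux.sort in place of the hand-written lambda, at the cost of losing the per-comparison logging the original prints.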


Example 2: sampleZipTest3

import reactor.core.scheduler.Scheduler; // import the required package/class
@Test
public void sampleZipTest3() throws Exception {
	int elements = 1;
	CountDownLatch latch = new CountDownLatch(elements + 1);
	EmitterProcessor<SensorData> sensorDataProcessor = EmitterProcessor.create();
	Scheduler scheduler = Schedulers.single();

	sensorDataProcessor.publishOn(scheduler)
	                   .subscribe(d -> latch.countDown(), null, latch::countDown);

	Flux.zip(Flux.just(new SensorData(2L, 12.0f)), Flux.just(new SensorData(1L, 14.0f)), this::computeMin)
	    .log("zip3")
	    .subscribe(sensorDataProcessor);

	awaitLatch(null, latch);
	scheduler.dispose();
}
 
Developer: reactor, Project: reactor-core, Lines of code: 18, Source: CombinationTests.java


Example 3: readEmployeesFlux

import reactor.core.scheduler.Scheduler; // import the required package/class
@Override
public Flux<Employee> readEmployeesFlux(int age) {
	Scheduler subWorker = Schedulers.newSingle("sub-thread");
	Scheduler pubWorker = Schedulers.newSingle("pub-thread");
	Predicate<Employee> validAge = (e) -> {
		System.out.println("flux:filter task executor: " + Thread.currentThread().getName());
		System.out.println("flux:filter task executor login: " + SecurityContextHolder.getContext().getAuthentication().getPrincipal());
		return e.getAge() > age;
	};
	Supplier<Flux<Employee>> deferredTask = ()->{
		System.out.println("flux:defer task executor: " + Thread.currentThread().getName());
		System.out.println("flux:defer task executor login: " + SecurityContextHolder.getContext().getAuthentication().getPrincipal());
		return Flux.fromIterable(employeeDaoImpl.getEmployees());
	};
	Flux<Employee> deferred = Flux.defer(deferredTask).filter(validAge).subscribeOn(subWorker).publishOn(pubWorker);
	return deferred;
}
 
Developer: PacktPublishing, Project: Spring-5.0-Cookbook, Lines of code: 18, Source: EmployeeServiceImpl.java


Example 4: readEmployeesByAscLastName

import reactor.core.scheduler.Scheduler; // import the required package/class
@Override
public Flux<Employee> readEmployeesByAscLastName() {
	Scheduler subWorker = Schedulers.newSingle("sub-thread");
	Scheduler pubWorker = Schedulers.newSingle("pub-thread");
	Supplier<Flux<Employee>> deferredTask = ()->{
		System.out.println("flux:defer task executor: " + Thread.currentThread().getName());
		System.out.println("flux:defer task executor login: " + SecurityContextHolder.getContext().getAuthentication().getPrincipal());
		return Flux.fromIterable(employeeDaoImpl.getEmployees());
	};
	// Ascending by last name (renamed from descLName, which did not match the ordering).
	Comparator<Employee> ascLName = (e1, e2) -> {
		System.out.println("flux:sort task executor: " + Thread.currentThread().getName());
		System.out.println("flux:sort task executor login: " + SecurityContextHolder.getContext().getAuthentication().getPrincipal());
		return e1.getLastName().compareTo(e2.getLastName());
	};
	Flux<Employee> deferred = Flux.defer(deferredTask).sort(ascLName).subscribeOn(subWorker).publishOn(pubWorker);
	return deferred;
}
 
Developer: PacktPublishing, Project: Spring-5.0-Cookbook, Lines of code: 18, Source: EmployeeServiceImpl.java


Example 5: assertNextWithSubscribeOnDirectProcessor

import reactor.core.scheduler.Scheduler; // import the required package/class
@Test
public void assertNextWithSubscribeOnDirectProcessor() {
	Scheduler scheduler = Schedulers.newElastic("test");
	DirectProcessor<Integer> processor = DirectProcessor.create();
	Mono<Integer> doAction = Mono.fromSupplier(() -> 22)
	                             .doOnNext(processor::onNext)
	                             .subscribeOn(scheduler);

	assertThatExceptionOfType(AssertionError.class)
			.isThrownBy(
					StepVerifier.create(processor)
					            .then(doAction::subscribe)
					            .assertNext(v -> assertThat(v).isEqualTo(23))
					            .thenCancel()
							::verify);
}
 
Developer: reactor, Project: reactor-core, Lines of code: 17, Source: StepVerifierTests.java


Example 6: FluxPublishOn

import reactor.core.scheduler.Scheduler; // import the required package/class
FluxPublishOn(Flux<? extends T> source,
		Scheduler scheduler,
		boolean delayError,
		int prefetch,
		int lowTide,
		Supplier<? extends Queue<T>> queueSupplier) {
	super(source);
	if (prefetch <= 0) {
		throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
	}
	this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
	this.delayError = delayError;
	this.prefetch = prefetch;
	this.lowTide = lowTide;
	this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
}
 
Developer: reactor, Project: reactor-core, Lines of code: 17, Source: FluxPublishOn.java


Example 7: subscribeOn

import reactor.core.scheduler.Scheduler; // import the required package/class
/**
 * Run subscribe, onSubscribe and request on a specified {@link Scheduler}'s {@link Worker}.
 * As such, placing this operator anywhere in the chain will also impact the execution
 * context of onNext/onError/onComplete signals from the beginning of the chain up to
 * the next occurrence of a {@link #publishOn(Scheduler) publishOn}.
 * <p>
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/subscribeon1.png" alt="">
 *
 * <blockquote><pre>
 * {@code mono.subscribeOn(Schedulers.parallel()).subscribe() }
 * </pre></blockquote>
 *
 * @param scheduler a {@link Scheduler} providing the {@link Worker} where to subscribe
 *
 * @return a {@link Mono} requesting asynchronously
 * @see #publishOn(Scheduler)
 */
public final Mono<T> subscribeOn(Scheduler scheduler) {
	if(this instanceof Callable) {
		if (this instanceof Fuseable.ScalarCallable) {
			try {
				T value = block();
				return onAssembly(new MonoSubscribeOnValue<>(value, scheduler));
			}
			catch (Throwable t) {
				//leave MonoSubscribeOnCallable defer error
			}
		}
		@SuppressWarnings("unchecked")
		Callable<T> c = (Callable<T>)this;
		return onAssembly(new MonoSubscribeOnCallable<>(c,
				scheduler));
	}
	return onAssembly(new MonoSubscribeOn<>(this, scheduler));
}
 
Developer: reactor, Project: reactor-core, Lines of code: 36, Source: Mono.java
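
The Javadoc above says that subscribeOn decides where the start of the chain runs, and that publishOn switches the thread for everything downstream of it. As a rough JDK-only analogy (this is not Reactor's implementation, and it uses CompletableFuture rather than Mono), the same hop between a "sub-thread" and a "pub-thread" can be sketched with two single-thread executors. The class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {

    // Returns "<thread that produced the value> -> <thread that consumed it>".
    static String run() {
        // Each executor owns exactly one named thread, like Schedulers.newSingle(name).
        ExecutorService sub = Executors.newSingleThreadExecutor(r -> new Thread(r, "sub-thread"));
        ExecutorService pub = Executors.newSingleThreadExecutor(r -> new Thread(r, "pub-thread"));
        try {
            return CompletableFuture
                    // Analogous to subscribeOn: the source is evaluated on "sub-thread".
                    .supplyAsync(() -> Thread.currentThread().getName(), sub)
                    // Analogous to publishOn: the downstream step runs on "pub-thread".
                    .thenApplyAsync(producer -> producer + " -> " + Thread.currentThread().getName(), pub)
                    .join();
        } finally {
            // Release the threads, comparable to calling dispose() on a Scheduler.
            sub.shutdown();
            pub.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // sub-thread -> pub-thread
    }
}
```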


Example 8: testBufferSize1Created

import reactor.core.scheduler.Scheduler; // import the required package/class
@Test(timeout = 5_000)
public void testBufferSize1Created() throws Exception {
	TopicProcessor<String> broadcast = TopicProcessor.<String>builder().name("share-name").bufferSize(1).autoCancel(true).build();

	int simultaneousSubscribers = 3000;
	CountDownLatch latch = new CountDownLatch(simultaneousSubscribers);
	Scheduler scheduler = Schedulers.single();

	FluxSink<String> sink = broadcast.sink();
	Flux<String> flux = broadcast.filter(Objects::nonNull)
	                             .publishOn(scheduler)
	                             .cache(1);

	for (int i = 0; i < simultaneousSubscribers; i++) {
		flux.subscribe(s -> latch.countDown());
	}
	sink.next("data");

	assertThat(latch.await(4, TimeUnit.SECONDS))
			.overridingErrorMessage("Data not received")
			.isTrue();
}
 
Developer: reactor, Project: reactor-core, Lines of code: 23, Source: TopicProcessorTest.java


Example 9: FluxBufferTimeout

import reactor.core.scheduler.Scheduler; // import the required package/class
FluxBufferTimeout(Flux<T> source,
		int maxSize,
		long timespan,
		Scheduler timer,
		Supplier<C> bufferSupplier) {
	super(source);
	if (timespan <= 0) {
		throw new IllegalArgumentException("Timeout period must be strictly positive");
	}
	if (maxSize <= 0) {
		throw new IllegalArgumentException("maxSize must be strictly positive");
	}
	this.timer = Objects.requireNonNull(timer, "Timer");
	this.timespan = timespan;
	this.batchSize = maxSize;
	this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier");
}
 
Developer: reactor, Project: reactor-core, Lines of code: 18, Source: FluxBufferTimeout.java


Example 10: publishOn

import reactor.core.scheduler.Scheduler; // import the required package/class
final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
	if (this instanceof Callable) {
		if (this instanceof Fuseable.ScalarCallable) {
			@SuppressWarnings("unchecked")
			Fuseable.ScalarCallable<T> s = (Fuseable.ScalarCallable<T>) this;
			try {
				return onAssembly(new FluxSubscribeOnValue<>(s.call(), scheduler));
			}
			catch (Exception e) {
				//leave FluxSubscribeOnCallable defer exception call
			}
		}
		@SuppressWarnings("unchecked")
		Callable<T> c = (Callable<T>)this;
		return onAssembly(new FluxSubscribeOnCallable<>(c, scheduler));
	}

	return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
}
 
Developer: reactor, Project: reactor-core, Lines of code: 20, Source: Flux.java


Example 11: testSubmitSession

import reactor.core.scheduler.Scheduler; // import the required package/class
@Test
public void testSubmitSession() throws Exception {
	FluxProcessor<Integer, Integer> processor = EmitterProcessor.create();
	AtomicInteger count = new AtomicInteger();
	CountDownLatch latch = new CountDownLatch(1);
	Scheduler scheduler = Schedulers.parallel();
	processor.publishOn(scheduler)
	         .delaySubscription(Duration.ofMillis(1000))
	         .limitRate(1)
	         .subscribe(d -> {
		         count.incrementAndGet();
		         latch.countDown();
	         });

	FluxSink<Integer> session = processor.sink();
	session.next(1);
	//System.out.println(emission);
	session.complete();

	latch.await(5, TimeUnit.SECONDS);
	Assert.assertTrue("latch : " + count, count.get() == 1);
	scheduler.dispose();
}
 
Developer: reactor, Project: reactor-core, Lines of code: 24, Source: FluxProcessorTest.java
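
Several of the tests here use a CountDownLatch to wait for work that runs on another thread. The test above ignores the boolean returned by latch.await and relies on the counter assertion instead. A minimal JDK-only sketch of the same waiting pattern, with the timeout result checked explicitly, might look like this. It uses a plain ExecutorService rather than a Reactor Scheduler, and the names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchAwaitSketch {

    // Runs `tasks` jobs on one worker thread and waits for all of them.
    // Returns the number of completed jobs, or -1 if the wait timed out.
    static int runAndAwait(int tasks) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(tasks);
        AtomicInteger count = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                worker.execute(() -> {
                    count.incrementAndGet();
                    latch.countDown();
                });
            }
            // await returns false on timeout, so a slow run is reported
            // separately from a wrong count.
            boolean finished = latch.await(5, TimeUnit.SECONDS);
            return finished ? count.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(3)); // 3
    }
}
```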


Example 12: testParallelWithJava8StreamsInput

import reactor.core.scheduler.Scheduler; // import the required package/class
/**
 * https://gist.github.com/nithril/444d8373ce67f0a8b853 Contribution by Nicolas Labrot
 * @throws InterruptedException on interrupt
 */
@Test
public void testParallelWithJava8StreamsInput() throws InterruptedException {
	Scheduler supplier = Schedulers.newParallel("test-p", 2);

	int max = ThreadLocalRandom.current()
	                           .nextInt(100, 300);
	CountDownLatch countDownLatch = new CountDownLatch(max);

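	// asyncGroup is not declared in this excerpt; presumably a Scheduler field of the enclosing test class.
	Flux<Integer> worker = Flux.range(0, max)
	                           .publishOn(asyncGroup);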
	worker.parallel(2)
	      .runOn(supplier)
	      .map(v -> v)
	      .subscribe(v -> countDownLatch.countDown());

	countDownLatch.await(10, TimeUnit.SECONDS);
	Assert.assertEquals(0, countDownLatch.getCount());
}
 
Developer: reactor, Project: reactor-core, Lines of code: 23, Source: FluxTests.java


Example 13: consistentMultithreadingWithPartition

import reactor.core.scheduler.Scheduler; // import the required package/class
@Test
public void consistentMultithreadingWithPartition() throws InterruptedException {
	Scheduler supplier1 = Schedulers.newParallel("groupByPool", 2);
	Scheduler supplier2 = Schedulers.newParallel("partitionPool", 5);

	CountDownLatch latch = new CountDownLatch(10);

	/*Disposable c = */Flux.range(1, 10)
	                     .groupBy(n -> n % 2 == 0)
	                     .flatMap(stream -> stream.publishOn(supplier1)
	                                            .log("groupBy-" + stream.key()))
	                     .parallel(5)
	                     .runOn(supplier2)
	                     .sequential()
	                     .publishOn(asyncGroup)
	                     .log("join")
	                     .subscribe(t -> {
		                   latch.countDown();
	                   });


	latch.await(30, TimeUnit.SECONDS);
	assertThat("Not totally dispatched: " + latch.getCount(), latch.getCount() == 0);
	supplier1.dispose();
	supplier2.dispose();
}
 
Developer: reactor, Project: reactor-core, Lines of code: 27, Source: FluxTests.java


Example 14: crossRangePerfDefault

import reactor.core.scheduler.Scheduler; // import the required package/class
@Test
public void crossRangePerfDefault() {
	AssertSubscriber<Integer> ts = AssertSubscriber.create();

	Scheduler scheduler = Schedulers.fromExecutorService(exec);

	int count = 1000;

	Flux<Integer> source = Flux.range(1, count)
	                           .flatMap(v -> Flux.range(v, 2), false, 128, 32);

	source.publishOn(scheduler)
	      .subscribe(ts);

	if (!ts.await(Duration.ofSeconds(10))
	       .isTerminated()) {
		ts.cancel();
	}

	ts.assertValueCount(count * 2)
	  .assertNoError()
	  .assertComplete();
}
 
Developer: reactor, Project: reactor-core, Lines of code: 24, Source: FluxPublishOnTest.java


Example 15: onNextOnDisposedSchedulerThrows

import reactor.core.scheduler.Scheduler; // import the required package/class
@Test
public void onNextOnDisposedSchedulerThrows() {
	Scheduler scheduler = Schedulers.newSingle("onNextOnDisposedSchedulerThrows");
	scheduler.dispose();
	Mono<String> source = Mono.just("foo").hide();

	try {
		StepVerifier.create(new MonoDelayElement<>(source, 2, TimeUnit.SECONDS, scheduler))
		            .expectSubscription()
		            .verifyComplete(); //complete not relevant
		fail("expected exception here");
	}
	catch (Throwable e) {
		Throwable t = Exceptions.unwrap(e);

		assertThat(t).isEqualTo(e)
	                 .isInstanceOf(RejectedExecutionException.class)
	                 .hasMessage("Scheduler unavailable");

		assertThat(e).satisfies(Exceptions::isBubbling);
	}
}
 
Developer: reactor, Project: reactor-core, Lines of code: 23, Source: MonoDelayElementTest.java
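
The test above checks that scheduling work on a disposed Scheduler fails with a RejectedExecutionException. The JDK's ExecutorService behaves in a comparable way once it has been shut down, which the following sketch demonstrates. This is an analogy built on java.util.concurrent only, not the Reactor code path exercised by the test, and the class name is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

final class RejectedAfterShutdownSketch {

    // Returns true if submitting to a shut-down executor is rejected.
    static boolean rejectsAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // comparable to scheduler.dispose() in the test
        try {
            executor.execute(() -> { });
            return false; // the task was accepted, which is not expected
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsAfterShutdown()); // true
    }
}
```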


Example 16: transformFlux

import reactor.core.scheduler.Scheduler; // import the required package/class
@Override
Flux<Integer> transformFlux(Flux<Integer> f) {
	Flux<String> otherStream = Flux.just("test", "test2", "test3");
//	System.out.println("Providing new downstream");

	Scheduler asyncGroup = Schedulers.newParallel("flux-p-tck", 2);

	BiFunction<Integer, String, Integer> combinator = (t1, t2) -> t1;

	return f.publishOn(sharedGroup)
	        .parallel(2)
	        .groups()
	        .flatMap(stream -> stream.publishOn(asyncGroup)
			                          .doOnNext(this::monitorThreadUse)
			                          .scan((prev, next) -> next)
			                          .map(integer -> -integer)
			                          .filter(integer -> integer <= 0)
			                          .map(integer -> -integer)
			                          .bufferTimeout(batch, Duration.ofMillis(50))
			                          .flatMap(Flux::fromIterable)
			                          .flatMap(i -> Flux.zip(Flux.just(i), otherStream, combinator))
			 )
	        .publishOn(sharedGroup)
	        .doAfterTerminate(asyncGroup::dispose)
	        .doOnError(Throwable::printStackTrace);
}
 
Developer: reactor, Project: reactor-core, Lines of code: 27, Source: FluxBlackboxProcessorVerification.java


Example 17: mergeSequentialLargeUnorderedEach100

import reactor.core.scheduler.Scheduler; // import the required package/class
@Test
public void mergeSequentialLargeUnorderedEach100() {
	Scheduler scheduler = Schedulers.elastic();
	AtomicBoolean comparisonFailure = new AtomicBoolean();
	long count = Flux.range(0, 500)
	                 .flatMapSequential(i -> {
		                 //ensure each pack of 100 is delayed in inverse order
		                 Duration sleep = Duration.ofMillis(600 - i % 100);
		                 return Mono.delay(sleep)
		                            .then(Mono.just(i))
		                            .subscribeOn(scheduler);
	                 })
	                 .zipWith(Flux.range(0, Integer.MAX_VALUE))
	                 .doOnNext(i -> {
		                 if (!Objects.equals(i.getT1(), i.getT2())) {
//			                 System.out.println(i);
			                 comparisonFailure.set(true);
		                 }
	                 })
	                 .count().block();

	assertEquals(500L, count);
	assertFalse(comparisonFailure.get());
}
 
Developer: reactor, Project: reactor-core, Lines of code: 25, Source: FluxMergeSequentialTest.java
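
The test above delays inner sources in inverse order and then asserts that the results still arrive in the original order. The idea of running work concurrently while emitting results in submission order can be sketched with the JDK alone: start every task, then join the futures in the order they were created. This is only an analogy for the ordering guarantee; it does not reproduce how flatMapSequential is implemented, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OrderedJoinSketch {

    // Starts n tasks that finish in roughly inverse order, then collects
    // their results in submission order.
    static List<Integer> inOrder(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int value = i;
                final long sleepMillis = 10L * (n - i); // earlier items sleep longer
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return value;
                }, pool));
            }
            // Joining in creation order preserves the source order,
            // regardless of which task completed first.
            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(inOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```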


Example 18: elasticFlow

import reactor.core.scheduler.Scheduler; // import the required package/class
@Override
public Flux<Employee> elasticFlow() {
	
	Scheduler elastic = Schedulers.newElastic("elastic-runner");
	Predicate<Employee> validAge = (e) -> {
		System.out.println("filter thread " +Thread.currentThread().getName());
		return e.getAge() > 25;
	};
	Supplier<Flux<Employee>> deferredTask = ()->{
		System.out.println("defer thread "+Thread.currentThread().getName());
		return Flux.fromIterable(employeeDaoImpl.getEmployees());
	};
	Flux<Employee> deferred = Flux.defer(deferredTask).filter(validAge).subscribeOn(elastic);
	return deferred;
}
 
Developer: PacktPublishing, Project: Spring-5.0-Cookbook, Lines of code: 16, Source: EmployeeScheduledStreamServiceImpl.java


Example 19: createPublisherThread

import reactor.core.scheduler.Scheduler; // import the required package/class
@Override
public Flux<Employee> createPublisherThread() {
	Scheduler pubWorker = Schedulers.newSingle("pub-thread");
	Predicate<Employee> validAge = (e) -> {
		System.out.println("filter thread " +Thread.currentThread().getName());
		return e.getAge() > 25;
	};
	Supplier<Flux<Employee>> deferredTask = ()->{
		System.out.println("defer thread "+Thread.currentThread().getName());
		return Flux.fromIterable(employeeDaoImpl.getEmployees());
	};
	Flux<Employee> deferred = Flux.defer(deferredTask).filter(validAge).publishOn(pubWorker);
	return deferred;
}
 
Developer: PacktPublishing, Project: Spring-5.0-Cookbook, Lines of code: 15, Source: EmployeeScheduledStreamServiceImpl.java


Example 20: createSubscriberThread

import reactor.core.scheduler.Scheduler; // import the required package/class
@Override
public Flux<Employee> createSubscriberThread() {
	Scheduler subWorker = Schedulers.newSingle("sub-thread");
	Predicate<Employee> validAge = (e) -> {
		System.out.println("filter thread " +Thread.currentThread().getName());
		return e.getAge() > 25;
	};
	Supplier<Flux<Employee>> deferredTask = ()->{
		System.out.println("defer thread "+Thread.currentThread().getName());
		return Flux.fromIterable(employeeDaoImpl.getEmployees());
	};
	Flux<Employee> deferred = Flux.defer(deferredTask).filter(validAge).subscribeOn(subWorker);
	return deferred;
}
 
Developer: PacktPublishing, Project: Spring-5.0-Cookbook, Lines of code: 15, Source: EmployeeScheduledStreamServiceImpl.java



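Note: The reactor.core.scheduler.Scheduler examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to each project's license. Do not reproduce without permission.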

