Java MonoProcessor Class Code Examples

This article collects typical usage examples of reactor.core.publisher.MonoProcessor in Java. If you are wondering how the MonoProcessor class is used in practice, the selected examples below may help.

The MonoProcessor class belongs to the reactor.core.publisher package. The 20 code examples below are sorted by popularity by default.

Example 1: monoProcessorGetEmployee

import java.util.function.Consumer;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor; // import the required package/class
@Override
public void monoProcessorGetEmployee(Integer id) {
	MonoProcessor<Integer> future = MonoProcessor.create();
	Consumer<Integer> checkEmp = (rowId) ->{
		if(employeeDaoImpl.getEmployee(rowId) == null){
			System.out.println("Employee with id: " + rowId + " does not exist.");
		}else{
			System.out.println("Employee with id: " + rowId + " exists.");
		}
	};
	
	Mono<Integer> engine = future
		    .doOnNext(checkEmp)
	     	.doOnSuccess(emp -> {
				System.out.println("Employee's age is " + employeeDaoImpl.getEmployee(emp).getAge());
				System.out.println("Employee's dept is: " + employeeDaoImpl.getEmployee(emp).getDeptId());
			})
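	        // ex is null when the stream terminates without an error, so guard before calling getMessage()
	        .doOnTerminate((sup, ex) -> System.out.println(ex == null
	                ? "Transaction terminated successfully"
	                : "Transaction terminated with error: " + ex.getMessage()))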
	        .doOnError(ex -> System.out.println("Error: " + ex.getMessage()));
	
	engine.subscribe(System.out::println);
	
	future.onNext(id);
	int valStream = future.block();
	System.out.println("Employee's ID again is: " + valStream);
}
 
Developer ID: PacktPublishing, Project: Spring-5.0-Cookbook, Lines of code: 27, Source: EmployeeHotStreamServiceImpl.java


Example 2: promiseErrorCountCannotExceedOne

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
public void promiseErrorCountCannotExceedOne() {
	MonoProcessor<Object> deferred = MonoProcessor.create();
	Throwable error = new IOException("foo");

	StepVerifier.create(deferred)
	            .then(() -> {
		            deferred.onError(error);
		            deferred.onNext(error);
	            })
	            .expectErrorMessage("foo")
	            .verifyThenAssertThat()
	            .hasDroppedExactly(error);

	Assertions.assertThat(deferred.getError()).isSameAs(error);
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 17, Source: FluxTests.java


Example 3: promiseAcceptCountAndErrorCountCannotExceedOneInTotal

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
public void promiseAcceptCountAndErrorCountCannotExceedOneInTotal() {
	MonoProcessor<Object> deferred = MonoProcessor.create();
	Throwable error = new IOException("foo");

	StepVerifier.create(deferred)
	            .then(() -> {
		            deferred.onError(error);
		            deferred.onNext("alpha");
	            })
	            .expectErrorMessage("foo")
	            .verifyThenAssertThat()
	            .hasDroppedExactly("alpha");

	Assertions.assertThat(deferred.getError()).isSameAs(error);
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 17, Source: FluxTests.java


Example 4: streamValuesCanBeExploded

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void streamValuesCanBeExploded() {
//		Stream's values can be exploded
//			given: "a source composable with a mapMany function"
		EmitterProcessor<Integer> source = EmitterProcessor.create();
		Flux<Integer> mapped = source
				.log()
				.publishOn(Schedulers.parallel())
				.log()
				.flatMap(v -> Flux.just(v * 2))
				.doOnError(Throwable::printStackTrace);

//			when: "the source accepts a value"
		MonoProcessor<Integer> value = mapped.next()
		                                     .toProcessor();
		value.subscribe();
		source.sink().next(1);

//		then: "the value is mapped"
		int result = value.block(Duration.ofSeconds(5));
		assertThat(result).isEqualTo(2);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 23, Source: FluxSpecTests.java


Example 5: streamCanBeCounted

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void streamCanBeCounted() {
//		"Stream can be counted"
//		given: "source composables to count and tap"
		EmitterProcessor<Integer> source = EmitterProcessor.create();
		MonoProcessor<Long> tap = source.count()
		                                .subscribeWith(MonoProcessor.create());

//		when: "the sources accept a value"
		source.onNext(1);
		source.onNext(2);
		source.onNext(3);
		source.onComplete();

//		then: "the count value matches the number of accept"
		assertThat(tap.peek()).isEqualTo(3);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 18, Source: FluxSpecTests.java


Example 6: knownNumberOfValuesCanBeReduced

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void knownNumberOfValuesCanBeReduced() {
//		"A known number of values can be reduced"
//		given: "a composable that will accept 5 values and a reduce function"
		EmitterProcessor<Integer> source = EmitterProcessor.create();
		Mono<Integer> reduced = source.reduce(new Reduction());
		MonoProcessor<Integer> value = reduced.subscribeWith(MonoProcessor.create());

//		when: "the expected number of values is accepted"
		source.onNext(1);
		source.onNext(2);
		source.onNext(3);
		source.onNext(4);
		source.onNext(5);
		source.onComplete();

//		then: "the reduced composable holds the reduced value"
		assertThat(value.peek()).isEqualTo(120);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 20, Source: FluxSpecTests.java


Example 7: whenKnownNumberOfValuesIsReducedOnlyFinalValueMadeAvailable

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void whenKnownNumberOfValuesIsReducedOnlyFinalValueMadeAvailable() {
//		"When a known number of values is being reduced, only the final value is made available"
//		given: "a composable that will accept 2 values and a reduce function"
		EmitterProcessor<Integer> source = EmitterProcessor.create();
		MonoProcessor<Integer> value = source.reduce(new Reduction())
		                                     .subscribeWith(MonoProcessor.create());

//		when: "the first value is accepted"
		source.onNext(1);

//		then: "the reduced value is unknown"
		assertThat(value.peek()).isNull();

//		when: "the second value is accepted"
		source.onNext(2);
		source.onComplete();

//		then: "the reduced value is known"
		assertThat(value.peek()).isEqualTo(2);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 22, Source: FluxSpecTests.java


Example 8: reduceWillAccumulateListOfAcceptedValues

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void reduceWillAccumulateListOfAcceptedValues() {
//		"Reduce will accumulate a list of accepted values"
//		given: "a composable"
		FluxProcessor<Integer, Integer> source =
				EmitterProcessor.create();
		Mono<List<Integer>> reduced = source.collectList();
		MonoProcessor<List<Integer>> value = reduced.toProcessor();
		value.subscribe();

//		when: "the first value is accepted"
		source.onNext(1);
		source.onComplete();

//		then: "the list contains the first element"
		assertThat(value.block()).containsExactly(1);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 18, Source: FluxSpecTests.java


Example 9: whenUnknownNumberOfValuesReducedEachReductionPassedToConsumerOnWindow

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void whenUnknownNumberOfValuesReducedEachReductionPassedToConsumerOnWindow() {
//		"When an unknown number of values is being reduced, each reduction is passed to a consumer on window"
//		given: "a composable with a reduce function"
		FluxProcessor<Integer, Integer> source =
				EmitterProcessor.create();
		Flux<Integer> reduced = source.window(2)
		                              .log()
		                              .flatMap(it -> it.log("lol")
		                                               .reduce(new Reduction()));
		MonoProcessor<Integer> value = reduced.subscribeWith(MonoProcessor.create());

//		when: "the first value is accepted"
		source.onNext(1);

//		then: "the reduction is not available"
		assertThat(value.peek()).isNull();

//		when: "the second value is accepted and flushed"
		source.onNext(2);

//		then: "the updated reduction is available"
		assertThat(value.peek()).isEqualTo(2);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 25, Source: FluxSpecTests.java


Example 10: subProtocol

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
public void subProtocol() throws Exception {
	String protocol = "echo-v1";
	AtomicReference<HandshakeInfo> infoRef = new AtomicReference<>();
	MonoProcessor<Object> output = MonoProcessor.create();

	client.execute(getUrl("/sub-protocol"),
			new WebSocketHandler() {
				@Override
				public List<String> getSubProtocols() {
					return Collections.singletonList(protocol);
				}
				@Override
				public Mono<Void> handle(WebSocketSession session) {
					infoRef.set(session.getHandshakeInfo());
					return session.receive()
							.map(WebSocketMessage::getPayloadAsText)
							.subscribeWith(output)
							.then();
				}
			})
			.block(Duration.ofMillis(5000));

	HandshakeInfo info = infoRef.get();
	assertThat(info.getHeaders().getFirst("Upgrade"), Matchers.equalToIgnoringCase("websocket"));
	assertEquals(protocol, info.getHeaders().getFirst("Sec-WebSocket-Protocol"));
	assertEquals("Wrong protocol accepted", protocol, info.getSubProtocol());
	assertEquals("Wrong protocol detected on the server side", protocol, output.block(Duration.ofMillis(5000)));
}
 
Developer ID: spring-cloud, Project: spring-cloud-gateway, Lines of code: 30, Source: WebSocketIntegrationTests.java


Example 11: customHeader

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
public void customHeader() throws Exception {
	HttpHeaders headers = new HttpHeaders();
	headers.add("my-header", "my-value");
	MonoProcessor<Object> output = MonoProcessor.create();

	client.execute(getUrl("/custom-header"), headers,
			session -> session.receive()
					.map(WebSocketMessage::getPayloadAsText)
					.subscribeWith(output)
					.then())
			.block(Duration.ofMillis(5000));

	assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000)));
}
 
Developer ID: spring-cloud, Project: spring-cloud-gateway, Lines of code: 16, Source: WebSocketIntegrationTests.java


Example 12: testMono

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
public void testMono() throws Exception {
	MonoProcessor<String> promise = MonoProcessor.create();
	promise.onNext("test");
	final CountDownLatch successCountDownLatch = new CountDownLatch(1);
	promise.subscribe(v -> successCountDownLatch.countDown());
	assertThat("Failed", successCountDownLatch.await(10, TimeUnit.SECONDS));
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 9, Source: MonoTests.java


Example 13: promiseAcceptCountCannotExceedOne

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
public void promiseAcceptCountCannotExceedOne() {
	MonoProcessor<Object> deferred = MonoProcessor.create();
	deferred.onNext("alpha");
	try {
		deferred.onNext("bravo");
	}
	catch (Exception e) {
		if(!Exceptions.isCancel(e)) {
			throw e;
		}
	}
	// JUnit convention: expected value first, actual value second
	assertEquals("alpha", deferred.block());
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 15, Source: FluxTests.java
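
The three promise tests above (Examples 2, 3 and 13) all rely on the same rule: a single-value promise accepts only its first terminal signal, and anything sent afterwards is rejected. As a rough, dependency-free analogy, the sketch below reproduces that rule with the JDK's CompletableFuture rather than with Reactor itself. The class name FirstSignalWins and its two methods are hypothetical names introduced only for this illustration. One difference to keep in mind: CompletableFuture reports a rejected signal through a boolean return value, whereas the tests above observe it through StepVerifier's dropped-signal assertions.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration class; not part of Reactor or of the projects quoted above.
public class FirstSignalWins {

    // Value first: the second completion attempt is rejected and the first value is kept.
    static String valueThenValue() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        boolean first = promise.complete("alpha");   // promise was still pending
        boolean second = promise.complete("bravo");  // promise already completed
        return first + "," + second + "," + promise.join();
    }

    // Error first: a later value is rejected and the original error is retained.
    static String errorThenValue() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.completeExceptionally(new IOException("foo"));
        boolean accepted = promise.complete("alpha"); // rejected, an error is already stored
        try {
            promise.join();
            return accepted + ",no error";
        } catch (CompletionException e) {
            // join() wraps the stored exception; the IOException is the cause
            return accepted + "," + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueThenValue());
        System.out.println(errorThenValue());
    }
}
```

The analogy only covers the "first signal wins" behaviour; it says nothing about subscription, backpressure or scheduling, which are specific to Reactor.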


Example 14: fluxCanBeEnforcedToDispatchValuesDistinctFromPredecessors

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void fluxCanBeEnforcedToDispatchValuesDistinctFromPredecessors() {
//		"A Flux can be enforced to dispatch values distinct from their immediate predecessors"
//		given:"a composable with values 1 to 3 with duplicates"
		Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 1, 2, 2, 3));

//		when:"the values are filtered and result is collected"
		MonoProcessor<List<Integer>> tap = s.distinctUntilChanged()
		                                    .collectList()
		                                    .toProcessor();
		tap.subscribe();

//		then:"collected must remove duplicates"
		assertThat(tap.block()).containsExactly(1, 2, 3);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 16, Source: FluxSpecTests.java


Example 15: fluxCanBeEnforcedToDispatchValuesWithKeysDistinctFromPredecessors

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void fluxCanBeEnforcedToDispatchValuesWithKeysDistinctFromPredecessors() {
//		"A Flux can be enforced to dispatch values with keys distinct from their immediate predecessors keys"
//		given:"a composable with values 1 to 5 with duplicate keys"
		Flux<Integer> s = Flux.fromIterable(Arrays.asList(2, 4, 3, 5, 2, 5));

//		when:"the values are filtered and result is collected"
		MonoProcessor<List<Integer>> tap = s.distinctUntilChanged(it -> it % 2 == 0)
		                                    .collectList()
		                                    .toProcessor();

//		then:"collected must remove duplicates"
		assertThat(tap.block()).containsExactly(2, 3, 2, 5);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 15, Source: FluxSpecTests.java


Example 16: fluxCanBeEnforcedToDispatchDistinctValues

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void fluxCanBeEnforcedToDispatchDistinctValues() {
//		"A Flux can be enforced to dispatch distinct values"
//		given:"a composable with values 1 to 4 with duplicates"
		Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));

//		when:"the values are filtered and result is collected"
		MonoProcessor<List<Integer>> tap = s.distinct()
		                                    .collectList()
		                                    .toProcessor();
		tap.subscribe();

//		then:"collected should be without duplicates"
		assertThat(tap.block()).containsExactly(1, 2, 3, 4);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 16, Source: FluxSpecTests.java


Example 17: fluxCanBeEnforcedToDispatchValuesHavingDistinctKeys

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
	public void fluxCanBeEnforcedToDispatchValuesHavingDistinctKeys() {
//		"A Flux can be enforced to dispatch values having distinct keys"
//		given: "a composable with values 1 to 4 with duplicate keys"
		Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));

//		when: "the values are filtered and result is collected"
		MonoProcessor<List<Integer>> tap = s.distinct(it -> it % 3)
		                                    .collectList()
		                                    .toProcessor();
		tap.subscribe();

//		then: "collected should be without duplicates"
		assertThat(tap.block()).containsExactly(1, 2, 3);
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 16, Source: FluxSpecTests.java


Example 18: send

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Override
public Mono<Void> send(Flux<?> flux) {
	MonoProcessor<Void> sendResult = MonoProcessor.create();
	// add error handling and reconnect in the event of an error
	this.disposable = flux
			.doOnError(e -> this.log.error("Error during processing: ", e))
			.retry()
			.subscribe(
					this.consumer,
					sendResult::onError,
					sendResult::onComplete);
	return sendResult;
}
 
Developer ID: spring-cloud, Project: spring-cloud-stream, Lines of code: 14, Source: DefaultFluxSender.java


Example 19: sendFileSecure

import reactor.core.publisher.MonoProcessor; // import the required package/class
@Test
public void sendFileSecure()
		throws CertificateException, SSLException, InterruptedException, URISyntaxException {
	Path largeFile = Paths.get(getClass().getResource("/largeFile.txt").toURI());
	SelfSignedCertificate ssc = new SelfSignedCertificate();
	SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
	SslContext sslClient = SslContextBuilder.forClient().trustManager(ssc.cert()).build();

	NettyContext context =
			TcpServer.create(opt -> opt.sslContext(sslServer))
			         .newHandler((in, out) ->
					         in.receive()
					           .asString()
					           .flatMap(word -> "GOGOGO".equals(word) ?
							           out.sendFile(largeFile).then() :
							           out.sendString(Mono.just("NOPE"))
					           )
			         )
			         .block();

	MonoProcessor<String> m1 = MonoProcessor.create();
	MonoProcessor<String> m2 = MonoProcessor.create();

	NettyContext client1 =
			TcpClient.create(opt -> opt.port(context.address().getPort())
			                           .sslContext(sslClient))
			         .newHandler((in, out) -> {
				         in.receive()
				           .asString()
				           .log("-----------------CLIENT1")
				           .subscribe(m1::onNext);

				         return out.sendString(Mono.just("gogogo"))
						         .neverComplete();
			         })
			         .block();

	NettyContext client2 =
			TcpClient.create(opt -> opt.port(context.address().getPort())
			                           .sslContext(sslClient))
			         .newHandler((in, out) -> {
				         in.receive()
				           .asString(StandardCharsets.UTF_8)
				           .take(2)
				           .reduceWith(String::new, String::concat)
				           .log("-----------------CLIENT2")
				           .subscribe(m2::onNext);

				         return out.sendString(Mono.just("GOGOGO"))
				                   .neverComplete();
			         })
			         .block();

	String client1Response = m1.block();
	String client2Response = m2.block();

	client1.dispose();
	client1.onClose().block();

	client2.dispose();
	client2.onClose().block();

	context.dispose();
	context.onClose().block();

	Assertions.assertThat(client1Response).isEqualTo("NOPE");

	Assertions.assertThat(client2Response)
	          .startsWith("This is an UTF-8 file that is larger than 1024 bytes. " + "It contains accents like é.")
	          .contains("1024 mark here ->")
	          .contains("<- 1024 mark here")
	          .endsWith("End of File");
}
 
Developer ID: reactor, Project: reactor-netty, Lines of code: 74, Source: TcpServerTests.java


Example 20: assertSendFile

import reactor.core.publisher.MonoProcessor; // import the required package/class
private void assertSendFile(Function<NettyOutbound, NettyOutbound> fn)
		throws InterruptedException {

	NettyContext context =
			TcpServer.create()
			         .newHandler((in, out) ->
					         in.receive()
					           .asString()
					           .flatMap(word -> "GOGOGO".equals(word) ?
							           fn.apply(out).then() :
							           out.sendString(Mono.just("NOPE"))
					           )
			         )
			         .block();

	MonoProcessor<String> m1 = MonoProcessor.create();
	MonoProcessor<String> m2 = MonoProcessor.create();

	NettyContext client1 =
			TcpClient.create(opt -> opt.port(context.address().getPort()))
			         .newHandler((in, out) -> {
				         in.receive()
				           .asString()
				           .log("-----------------CLIENT1")
				           .subscribe(m1::onNext);

				         return out.sendString(Mono.just("gogogo"))
				                   .neverComplete();
			         })
			         .block();

	NettyContext client2 =
			TcpClient.create(opt -> opt.port(context.address().getPort()))
			         .newHandler((in, out) -> {
				         in.receive()
				           .asString(StandardCharsets.UTF_8)
				           .take(2)
				           .reduceWith(String::new, String::concat)
				           .log("-----------------CLIENT2")
				           .subscribe(m2::onNext);

				         return out.sendString(Mono.just("GOGOGO"))
				                   .neverComplete();
			         })
			         .block();

	String client1Response = m1.block();
	String client2Response = m2.block();

	client1.dispose();
	client1.onClose().block();

	client2.dispose();
	client2.onClose().block();

	context.dispose();
	context.onClose().block();

	Assertions.assertThat(client1Response).isEqualTo("NOPE");

	Assertions.assertThat(client2Response)
	          .startsWith("This is an UTF-8 file that is larger than 1024 bytes. " + "It contains accents like é.")
	          .contains("1024 mark here ->")
	          .contains("<- 1024 mark here")
	          .endsWith("End of File");
}
 
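Developer ID: reactor, Project: reactor-netty, Lines of code: 69, Source: TcpServerTests.java



Note: The reactor.core.publisher.MonoProcessor examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by their respective developers, and copyright of the source code belongs to the original authors. For distribution and use, refer to each project's License. Do not reproduce without permission.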

