本文整理汇总了Java中reactor.core.publisher.MonoProcessor类的典型用法代码示例。如果您正苦于以下问题:Java MonoProcessor类的具体用法?Java MonoProcessor怎么用?Java MonoProcessor使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
MonoProcessor类属于reactor.core.publisher包,在下文中一共展示了MonoProcessor类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: monoProcessorGetEmployee
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Override
public void monoProcessorGetEmployee(Integer id) {
MonoProcessor<Integer> future = MonoProcessor.create();
Consumer<Integer> checkEmp = (rowId) ->{
if(employeeDaoImpl.getEmployee(rowId) == null){
System.out.println("Employee with id: " + rowId + " does not exists.");
}else{
System.out.println("Employee with id: " + rowId + " exists.");
}
};
Mono<Integer> engine = future
.doOnNext(checkEmp)
.doOnSuccess(emp -> {
System.out.println("Employee's age is " + employeeDaoImpl.getEmployee(emp).getAge());
System.out.println("Employee's dept is: " + employeeDaoImpl.getEmployee(emp).getDeptId());
})
.doOnTerminate((sup, ex) -> System.out.println("Transaction terminated with error: " +ex.getMessage()))
.doOnError(ex -> System.out.println("Error: " + ex.getMessage()));
engine.subscribe(System.out::println);
future.onNext(id);
int valStream = future.block();
System.out.println("Employee's ID again is: " + valStream);
}
开发者ID:PacktPublishing,项目名称:Spring-5.0-Cookbook,代码行数:27,代码来源:EmployeeHotStreamServiceImpl.java
示例2: promiseErrorCountCannotExceedOne
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void promiseErrorCountCannotExceedOne() {
MonoProcessor<Object> deferred = MonoProcessor.create();
Throwable error = new IOException("foo");
StepVerifier.create(deferred)
.then(() -> {
deferred.onError(error);
deferred.onNext(error);
})
.expectErrorMessage("foo")
.verifyThenAssertThat()
.hasDroppedExactly(error);
Assertions.assertThat(deferred.getError()).isSameAs(error);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:17,代码来源:FluxTests.java
示例3: promiseAcceptCountAndErrorCountCannotExceedOneInTotal
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void promiseAcceptCountAndErrorCountCannotExceedOneInTotal() {
MonoProcessor<Object> deferred = MonoProcessor.create();
Throwable error = new IOException("foo");
StepVerifier.create(deferred)
.then(() -> {
deferred.onError(error);
deferred.onNext("alpha");
})
.expectErrorMessage("foo")
.verifyThenAssertThat()
.hasDroppedExactly("alpha");
Assertions.assertThat(deferred.getError()).isSameAs(error);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:17,代码来源:FluxTests.java
示例4: streamValuesCanBeExploded
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void streamValuesCanBeExploded() {
// Stream"s values can be exploded
// given: "a source composable with a mapMany function"
EmitterProcessor<Integer> source = EmitterProcessor.create();
Flux<Integer> mapped = source
.log()
.publishOn(Schedulers.parallel())
.log()
.flatMap(v -> Flux.just(v * 2))
.doOnError(Throwable::printStackTrace);
// when: "the source accepts a value"
MonoProcessor<Integer> value = mapped.next()
.toProcessor();
value.subscribe();
source.sink().next(1);
// then: "the value is mapped"
int result = value.block(Duration.ofSeconds(5));
assertThat(result).isEqualTo(2);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:23,代码来源:FluxSpecTests.java
示例5: streamCanBeCounted
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void streamCanBeCounted() {
// "Stream can be counted"
// given: "source composables to count and tap"
EmitterProcessor<Integer> source = EmitterProcessor.create();
MonoProcessor<Long> tap = source.count()
.subscribeWith(MonoProcessor.create());
// when: "the sources accept a value"
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onComplete();
// then: "the count value matches the number of accept"
assertThat(tap.peek()).isEqualTo(3);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:18,代码来源:FluxSpecTests.java
示例6: knownNumberOfValuesCanBeReduced
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void knownNumberOfValuesCanBeReduced() {
// "A known number of values can be reduced"
// given: "a composable that will accept 5 values and a reduce function"
EmitterProcessor<Integer> source = EmitterProcessor.create();
Mono<Integer> reduced = source.reduce(new Reduction());
MonoProcessor<Integer> value = reduced.subscribeWith(MonoProcessor.create());
// when: "the expected number of values is accepted"
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onNext(5);
source.onComplete();
// then: "the reduced composable holds the reduced value"
assertThat(value.peek()).isEqualTo(120);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:20,代码来源:FluxSpecTests.java
示例7: whenKnownNumberOfValuesIsReducedOnlyFinalValueMadeAvailable
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void whenKnownNumberOfValuesIsReducedOnlyFinalValueMadeAvailable() {
// "When a known number of values is being reduced, only the final value is made available"
// given: "a composable that will accept 2 values and a reduce function"
EmitterProcessor<Integer> source = EmitterProcessor.create();
MonoProcessor<Integer> value = source.reduce(new Reduction())
.subscribeWith(MonoProcessor.create());
// when: "the first value is accepted"
source.onNext(1);
// then: "the reduced value is unknown"
assertThat(value.peek()).isNull();
// when: "the second value is accepted"
source.onNext(2);
source.onComplete();
// then: "the reduced value is known"
assertThat(value.peek()).isEqualTo(2);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:22,代码来源:FluxSpecTests.java
示例8: reduceWillAccumulateListOfAcceptedValues
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void reduceWillAccumulateListOfAcceptedValues() {
// "Reduce will accumulate a list of accepted values"
// given: "a composable"
FluxProcessor<Integer, Integer> source =
EmitterProcessor.create();
Mono<List<Integer>> reduced = source.collectList();
MonoProcessor<List<Integer>> value = reduced.toProcessor();
value.subscribe();
// when: "the first value is accepted"
source.onNext(1);
source.onComplete();
// then: "the list contains the first element"
assertThat(value.block()).containsExactly(1);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:18,代码来源:FluxSpecTests.java
示例9: whenUnknownNumberOfValuesReducedEachReductionPassedToConsumerOnWindow
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void whenUnknownNumberOfValuesReducedEachReductionPassedToConsumerOnWindow() {
// "When an unknown number of values is being reduced, each reduction is passed to a consumer on window"
// given: "a composable with a reduce function"
FluxProcessor<Integer, Integer> source =
EmitterProcessor.create();
Flux<Integer> reduced = source.window(2)
.log()
.flatMap(it -> it.log("lol")
.reduce(new Reduction()));
MonoProcessor<Integer> value = reduced.subscribeWith(MonoProcessor.create());
// when: "the first value is accepted"
source.onNext(1);
// then: "the reduction is not available"
assertThat(value.peek()).isNull();
// when: "the second value is accepted and flushed"
source.onNext(2);
// then: "the updated reduction is available"
assertThat(value.peek()).isEqualTo(2);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:25,代码来源:FluxSpecTests.java
示例10: subProtocol
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void subProtocol() throws Exception {
String protocol = "echo-v1";
AtomicReference<HandshakeInfo> infoRef = new AtomicReference<>();
MonoProcessor<Object> output = MonoProcessor.create();
client.execute(getUrl("/sub-protocol"),
new WebSocketHandler() {
@Override
public List<String> getSubProtocols() {
return Collections.singletonList(protocol);
}
@Override
public Mono<Void> handle(WebSocketSession session) {
infoRef.set(session.getHandshakeInfo());
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.subscribeWith(output)
.then();
}
})
.block(Duration.ofMillis(5000));
HandshakeInfo info = infoRef.get();
assertThat(info.getHeaders().getFirst("Upgrade"), Matchers.equalToIgnoringCase("websocket"));
assertEquals(protocol, info.getHeaders().getFirst("Sec-WebSocket-Protocol"));
assertEquals("Wrong protocol accepted", protocol, info.getSubProtocol());
assertEquals("Wrong protocol detected on the server side", protocol, output.block(Duration.ofMillis(5000)));
}
开发者ID:spring-cloud,项目名称:spring-cloud-gateway,代码行数:30,代码来源:WebSocketIntegrationTests.java
示例11: customHeader
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void customHeader() throws Exception {
HttpHeaders headers = new HttpHeaders();
headers.add("my-header", "my-value");
MonoProcessor<Object> output = MonoProcessor.create();
client.execute(getUrl("/custom-header"), headers,
session -> session.receive()
.map(WebSocketMessage::getPayloadAsText)
.subscribeWith(output)
.then())
.block(Duration.ofMillis(5000));
assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000)));
}
开发者ID:spring-cloud,项目名称:spring-cloud-gateway,代码行数:16,代码来源:WebSocketIntegrationTests.java
示例12: testMono
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void testMono() throws Exception {
MonoProcessor<String> promise = MonoProcessor.create();
promise.onNext("test");
final CountDownLatch successCountDownLatch = new CountDownLatch(1);
promise.subscribe(v -> successCountDownLatch.countDown());
assertThat("Failed", successCountDownLatch.await(10, TimeUnit.SECONDS));
}
开发者ID:reactor,项目名称:reactor-core,代码行数:9,代码来源:MonoTests.java
示例13: promiseAcceptCountCannotExceedOne
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void promiseAcceptCountCannotExceedOne() {
MonoProcessor<Object> deferred = MonoProcessor.create();
deferred.onNext("alpha");
try {
deferred.onNext("bravo");
}
catch (Exception e) {
if(!Exceptions.isCancel(e)) {
throw e;
}
}
assertEquals(deferred.block(), "alpha");
}
开发者ID:reactor,项目名称:reactor-core,代码行数:15,代码来源:FluxTests.java
示例14: fluxCanBeEnforcedToDispatchValuesDistinctFromPredecessors
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void fluxCanBeEnforcedToDispatchValuesDistinctFromPredecessors() {
// "A Flux can be enforced to dispatch values distinct from their immediate predecessors"
// given:"a composable with values 1 to 3 with duplicates"
Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 1, 2, 2, 3));
// when:"the values are filtered and result is collected"
MonoProcessor<List<Integer>> tap = s.distinctUntilChanged()
.collectList()
.toProcessor();
tap.subscribe();
// then:"collected must remove duplicates"
assertThat(tap.block()).containsExactly(1, 2, 3);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:16,代码来源:FluxSpecTests.java
示例15: fluxCanBeEnforcedToDispatchValuesWithKeysDistinctFromPredecessors
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void fluxCanBeEnforcedToDispatchValuesWithKeysDistinctFromPredecessors() {
// "A Flux can be enforced to dispatch values with keys distinct from their immediate predecessors keys"
// given:"a composable with values 1 to 5 with duplicate keys"
Flux<Integer> s = Flux.fromIterable(Arrays.asList(2, 4, 3, 5, 2, 5));
// when:"the values are filtered and result is collected"
MonoProcessor<List<Integer>> tap = s.distinctUntilChanged(it -> it % 2 == 0)
.collectList()
.toProcessor();
// then:"collected must remove duplicates"
assertThat(tap.block()).containsExactly(2, 3, 2, 5);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:15,代码来源:FluxSpecTests.java
示例16: fluxCanBeEnforcedToDispatchDistinctValues
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void fluxCanBeEnforcedToDispatchDistinctValues() {
// "A Flux can be enforced to dispatch distinct values"
// given:"a composable with values 1 to 4 with duplicates"
Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));
// when:"the values are filtered and result is collected"
MonoProcessor<List<Integer>> tap = s.distinct()
.collectList()
.toProcessor();
tap.subscribe();
// then:"collected should be without duplicates"
assertThat(tap.block()).containsExactly(1, 2, 3, 4);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:16,代码来源:FluxSpecTests.java
示例17: fluxCanBeEnforcedToDispatchValuesHavingDistinctKeys
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void fluxCanBeEnforcedToDispatchValuesHavingDistinctKeys() {
// "A Flux can be enforced to dispatch values having distinct keys"
// given: "a composable with values 1 to 4 with duplicate keys"
Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));
// when: "the values are filtered and result is collected"
MonoProcessor<List<Integer>> tap = s.distinct(it -> it % 3)
.collectList()
.toProcessor();
tap.subscribe();
// then: "collected should be without duplicates"
assertThat(tap.block()).containsExactly(1, 2, 3);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:16,代码来源:FluxSpecTests.java
示例18: send
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Override
public Mono<Void> send(Flux<?> flux) {
MonoProcessor<Void> sendResult = MonoProcessor.create();
// add error handling and reconnect in the event of an error
this.disposable = flux
.doOnError(e -> this.log.error("Error during processing: ", e))
.retry()
.subscribe(
this.consumer,
sendResult::onError,
sendResult::onComplete);
return sendResult;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:14,代码来源:DefaultFluxSender.java
示例19: sendFileSecure
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
@Test
public void sendFileSecure()
throws CertificateException, SSLException, InterruptedException, URISyntaxException {
Path largeFile = Paths.get(getClass().getResource("/largeFile.txt").toURI());
SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
SslContext sslClient = SslContextBuilder.forClient().trustManager(ssc.cert()).build();
NettyContext context =
TcpServer.create(opt -> opt.sslContext(sslServer))
.newHandler((in, out) ->
in.receive()
.asString()
.flatMap(word -> "GOGOGO".equals(word) ?
out.sendFile(largeFile).then() :
out.sendString(Mono.just("NOPE"))
)
)
.block();
MonoProcessor<String> m1 = MonoProcessor.create();
MonoProcessor<String> m2 = MonoProcessor.create();
NettyContext client1 =
TcpClient.create(opt -> opt.port(context.address().getPort())
.sslContext(sslClient))
.newHandler((in, out) -> {
in.receive()
.asString()
.log("-----------------CLIENT1")
.subscribe(m1::onNext);
return out.sendString(Mono.just("gogogo"))
.neverComplete();
})
.block();
NettyContext client2 =
TcpClient.create(opt -> opt.port(context.address().getPort())
.sslContext(sslClient))
.newHandler((in, out) -> {
in.receive()
.asString(StandardCharsets.UTF_8)
.take(2)
.reduceWith(String::new, String::concat)
.log("-----------------CLIENT2")
.subscribe(m2::onNext);
return out.sendString(Mono.just("GOGOGO"))
.neverComplete();
})
.block();
String client1Response = m1.block();
String client2Response = m2.block();
client1.dispose();
client1.onClose().block();
client2.dispose();
client2.onClose().block();
context.dispose();
context.onClose().block();
Assertions.assertThat(client1Response).isEqualTo("NOPE");
Assertions.assertThat(client2Response)
.startsWith("This is an UTF-8 file that is larger than 1024 bytes. " + "It contains accents like é.")
.contains("1024 mark here ->")
.contains("<- 1024 mark here")
.endsWith("End of File");
}
开发者ID:reactor,项目名称:reactor-netty,代码行数:74,代码来源:TcpServerTests.java
示例20: assertSendFile
import reactor.core.publisher.MonoProcessor; //导入依赖的package包/类
private void assertSendFile(Function<NettyOutbound, NettyOutbound> fn)
throws InterruptedException {
NettyContext context =
TcpServer.create()
.newHandler((in, out) ->
in.receive()
.asString()
.flatMap(word -> "GOGOGO".equals(word) ?
fn.apply(out).then() :
out.sendString(Mono.just("NOPE"))
)
)
.block();
MonoProcessor<String> m1 = MonoProcessor.create();
MonoProcessor<String> m2 = MonoProcessor.create();
NettyContext client1 =
TcpClient.create(opt -> opt.port(context.address().getPort()))
.newHandler((in, out) -> {
in.receive()
.asString()
.log("-----------------CLIENT1")
.subscribe(m1::onNext);
return out.sendString(Mono.just("gogogo"))
.neverComplete();
})
.block();
NettyContext client2 =
TcpClient.create(opt -> opt.port(context.address().getPort()))
.newHandler((in, out) -> {
in.receive()
.asString(StandardCharsets.UTF_8)
.take(2)
.reduceWith(String::new, String::concat)
.log("-----------------CLIENT2")
.subscribe(m2::onNext);
return out.sendString(Mono.just("GOGOGO"))
.neverComplete();
})
.block();
String client1Response = m1.block();
String client2Response = m2.block();
client1.dispose();
client1.onClose().block();
client2.dispose();
client2.onClose().block();
context.dispose();
context.onClose().block();
Assertions.assertThat(client1Response).isEqualTo("NOPE");
Assertions.assertThat(client2Response)
.startsWith("This is an UTF-8 file that is larger than 1024 bytes. " + "It contains accents like é.")
.contains("1024 mark here ->")
.contains("<- 1024 mark here")
.endsWith("End of File");
}
开发者ID:reactor,项目名称:reactor-netty,代码行数:69,代码来源:TcpServerTests.java
注:本文中的reactor.core.publisher.MonoProcessor类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论