本文整理汇总了Java中reactor.core.publisher.Operators类的典型用法代码示例。如果您正苦于以下问题:Java Operators类的具体用法?Java Operators怎么用?Java Operators使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Operators类属于reactor.core.publisher包,在下文中一共展示了Operators类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: manyToOne
import reactor.core.publisher.Operators; //导入依赖的package包/类
/**
* Implements a stream -> unary call as {@link Flux} -> {@link Mono}, where the client transits a stream of
* messages.
*/
public static <TRequest, TResponse> Mono<TResponse> manyToOne(
Flux<TRequest> rxRequest,
Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) {
try {
return Mono
.<TResponse>create(emitter -> {
ReactiveProducerStreamObserver<TRequest, TResponse> reactiveProducerStreamObserver = new ReactiveProducerStreamObserver<>(
rxRequest,
emitter::success,
emitter::error,
Runnables.doNothing());
delegate.apply(
new CancellableStreamObserver<>(reactiveProducerStreamObserver,
reactiveProducerStreamObserver::cancel));
reactiveProducerStreamObserver.rxSubscribe();
})
.transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
} catch (Throwable throwable) {
return Mono.error(throwable);
}
}
开发者ID:salesforce,项目名称:reactive-grpc,代码行数:26,代码来源:ClientCalls.java
示例2: timed
import reactor.core.publisher.Operators; //导入依赖的package包/类
public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> timed(MeterRegistry registry, String name, Iterable<Tag> tags) {
Counter success = Counter.builder(name + ".request")
.tags("status", "success")
.tags(tags)
.register(registry);
Counter error = Counter.builder(name + ".request")
.tags("status", "error")
.tags(tags)
.register(registry);
Counter cancelled = Counter.builder(name + ".request")
.tags("status", "cancelled")
.tags(tags)
.register(registry);
Timer timer = Timer.builder(name + ".latency")
.publishPercentileHistogram()
.tags(tags)
.register(registry);
return Operators.lift((scannable, subscriber) ->
new ProteusMetricsSubscriber<>(subscriber, success, error, cancelled, timer)
);
}
开发者ID:netifi,项目名称:proteus-java,代码行数:23,代码来源:ProteusMetrics.java
示例3: normalRequest
import reactor.core.publisher.Operators; //导入依赖的package包/类
protected final void normalRequest(long n) {
Subscription a = s;
if (a != null) {
a.request(n);
} else {
Operators.addCap(REQUESTED, this, n);
a = s;
if (a != null) {
long r = REQUESTED.getAndSet(this, 0L);
if (r != 0L) {
a.request(r);
}
}
}
}
开发者ID:apptik,项目名称:RHub,代码行数:19,代码来源:AssertSubscriber.java
示例4: setWithoutRequesting
import reactor.core.publisher.Operators; //导入依赖的package包/类
/**
* Sets the Subscription once but does not request anything.
* @param s the Subscription to push
* @return true if successful, false if the current subscription is not null
*/
protected final boolean setWithoutRequesting(Subscription s) {
Objects.requireNonNull(s, "s");
for (;;) {
Subscription a = this.s;
if (a == Operators.cancelledSubscription()) {
s.cancel();
return false;
}
if (a != null) {
s.cancel();
Operators.reportSubscriptionSet();
return false;
}
if (S.compareAndSet(this, null, s)) {
return true;
}
}
}
开发者ID:apptik,项目名称:RHub,代码行数:25,代码来源:AssertSubscriber.java
示例5: subscribe
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Override
public final void subscribe(final CoreSubscriber<? super Void> s) {
if(future.isDone()){
if(future.isSuccess()){
Operators.complete(s);
}
else{
Operators.error(s, future.cause());
}
return;
}
FutureSubscription<F> fs = new FutureSubscription<>(future, s);
s.onSubscribe(fs);
future.addListener(fs);
}
开发者ID:reactor,项目名称:reactor-netty,代码行数:17,代码来源:FutureMono.java
示例6: onHandlerTerminate
import reactor.core.publisher.Operators; //导入依赖的package包/类
/**
* Final release/close (last packet)
*/
protected final void onHandlerTerminate() {
if (replace(null)) {
if(log.isTraceEnabled()){
log.trace("{} Disposing ChannelOperation from a channel", channel(), new Exception
("ChannelOperation terminal stack"));
}
try {
Operators.terminate(OUTBOUND_CLOSE, this);
onInactive.onComplete(); //signal senders and other interests
// Do not call directly inbound.onInboundComplete()
// HttpClientOperations need to notify with error
// when there is no response state
onInboundComplete();
}
finally {
channel.pipeline()
.fireUserEventTriggered(NettyPipeline.handlerTerminatedEvent());
}
}
}
开发者ID:reactor,项目名称:reactor-netty,代码行数:24,代码来源:ChannelOperations.java
示例7: request
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Override
public void request(long n) {
if (Operators.validate(n)) {
for (; ; ) {
int s = state;
if (s == SDS_HAS_REQUEST_NO_VALUE || s == SDS_HAS_REQUEST_HAS_VALUE) {
return;
}
if (s == SDS_NO_REQUEST_HAS_VALUE) {
if (STATE.compareAndSet(this, SDS_NO_REQUEST_HAS_VALUE, SDS_HAS_REQUEST_HAS_VALUE)) {
Subscriber<? super O> a = subscriber;
a.onNext(value);
a.onComplete();
}
return;
}
if (STATE.compareAndSet(this, SDS_NO_REQUEST_NO_VALUE, SDS_HAS_REQUEST_NO_VALUE)) {
return;
}
}
}
}
开发者ID:akarnokd,项目名称:akarnokd-misc,代码行数:23,代码来源:DeferredScalarSubscriber.java
示例8: updateRequested
import reactor.core.publisher.Operators; //导入依赖的package包/类
private void updateRequested(Event<?> event) {
RequestEvent requestEvent = null;
if (event instanceof RequestEvent) requestEvent = (RequestEvent) event;
else if (event instanceof SubscriptionTaskEvent) {
SubscriptionTaskEvent ste = (SubscriptionTaskEvent) event;
if (ste.delegate instanceof RequestEvent) {
requestEvent = (RequestEvent) ste.delegate;
}
}
if (requestEvent == null) {
return;
}
else if (requestEvent.isBounded()) {
Operators.addCap(REQUESTED, this, requestEvent.getRequestAmount());
}
else {
REQUESTED.set(this, Long.MAX_VALUE);
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:22,代码来源:DefaultStepVerifierBuilder.java
示例9: assertDroppedElementsAllPass
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedElementsAllPass() {
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onNext("foo");
s.onComplete();
s.onNext("bar");
s.onNext("baz");
}).take(3))
.expectNext("foo")
.expectComplete()
.verifyThenAssertThat()
.hasDroppedElements()
.hasDropped("baz")
.hasDroppedExactly("baz", "bar");
}
开发者ID:reactor,项目名称:reactor-core,代码行数:17,代码来源:StepVerifierAssertionsTests.java
示例10: assertNotDroppedElementsFailureOneDrop
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertNotDroppedElementsFailureOneDrop() {
try {
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onNext("foo");
s.onComplete();
s.onNext("bar");
}).take(2))
.expectNext("foo")
.expectComplete()
.verifyThenAssertThat()
.hasNotDroppedElements();
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected no dropped elements, found <[bar]>.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:20,代码来源:StepVerifierAssertionsTests.java
示例11: assertDroppedElementsFailureOneExtra
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedElementsFailureOneExtra() {
try {
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onNext("foo");
s.onComplete();
s.onNext("bar");
s.onNext("baz");
}).take(3))
.expectNext("foo")
.expectComplete()
.verifyThenAssertThat()
.hasDropped("foo");
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected dropped elements to contain <[foo]>, was <[bar, baz]>.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:21,代码来源:StepVerifierAssertionsTests.java
示例12: assertDroppedElementsFailureOneMissing
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedElementsFailureOneMissing() {
try {
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onNext("foo");
s.onComplete();
s.onNext("bar");
s.onNext("baz");
}).take(3))
.expectNext("foo")
.expectComplete()
.verifyThenAssertThat()
.hasDroppedExactly("baz");
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected dropped elements to contain exactly <[baz]>, was <[bar, baz]>.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:21,代码来源:StepVerifierAssertionsTests.java
示例13: assertDroppedErrorAllPass
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedErrorAllPass() {
Throwable err1 = new IllegalStateException("boom1");
Throwable err2 = new IllegalStateException("boom2");
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onError(err1);
s.onError(err2);
}).buffer(1))
.expectError()
.verifyThenAssertThat()
.hasDroppedErrors()
.hasDroppedErrors(1)
.hasDroppedErrorOfType(IllegalStateException.class)
.hasDroppedErrorWithMessageContaining("boom")
.hasDroppedErrorWithMessage("boom2")
.hasDroppedErrorMatching(t -> t instanceof IllegalStateException && "boom2".equals(t.getMessage()));
}
开发者ID:reactor,项目名称:reactor-core,代码行数:19,代码来源:StepVerifierAssertionsTests.java
示例14: assertNotDroppedErrorsFailureOneDrop
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertNotDroppedErrorsFailureOneDrop() {
try {
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onNext("foo");
s.onComplete();
s.onError(new IllegalStateException("boom"));
}).take(2))
.expectNext("foo")
.expectComplete()
.verifyThenAssertThat()
.hasNotDroppedErrors();
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected no dropped errors, found <[java.lang.IllegalStateException: boom]>.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:20,代码来源:StepVerifierAssertionsTests.java
示例15: assertDroppedErrorFailureWrongType
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedErrorFailureWrongType() {
try {
Throwable err1 = new IllegalStateException("boom1");
Throwable err2 = new IllegalStateException("boom2");
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onError(err1);
s.onError(err2);
}).buffer(1))
.expectError()
.verifyThenAssertThat()
.hasDroppedErrorOfType(IllegalArgumentException.class);
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected dropped error to be of type java.lang.IllegalArgumentException, was java.lang.IllegalStateException.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:20,代码来源:StepVerifierAssertionsTests.java
示例16: assertDroppedErrorFailureWrongContains
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedErrorFailureWrongContains() {
try {
Throwable err1 = new IllegalStateException("boom1");
Throwable err2 = new IllegalStateException("boom2");
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onError(err1);
s.onError(err2);
}).buffer(1))
.expectError()
.verifyThenAssertThat()
.hasDroppedErrorWithMessageContaining("foo");
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected dropped error with message containing <\"foo\">, was <\"boom2\">.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:20,代码来源:StepVerifierAssertionsTests.java
示例17: assertDroppedErrorFailureWrongMessage
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedErrorFailureWrongMessage() {
try {
Throwable err1 = new IllegalStateException("boom1");
Throwable err2 = new IllegalStateException("boom2");
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onError(err1);
s.onError(err2);
}).buffer(1))
.expectError()
.verifyThenAssertThat()
.hasDroppedErrorWithMessage("boom1");
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected dropped error with message <\"boom1\">, was <\"boom2\">.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:20,代码来源:StepVerifierAssertionsTests.java
示例18: assertDroppedErrorFailureWrongMatch
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedErrorFailureWrongMatch() {
try {
Throwable err1 = new IllegalStateException("boom1");
Throwable err2 = new IllegalStateException("boom2");
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onError(err1);
s.onError(err2);
}).buffer(1))
.expectError()
.verifyThenAssertThat()
.hasDroppedErrorMatching(t -> t instanceof IllegalStateException && "foo".equals(t.getMessage()));
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected dropped error matching the given predicate, did not match: <java.lang.IllegalStateException: boom2>.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:20,代码来源:StepVerifierAssertionsTests.java
示例19: assertDroppedErrorsFailureWrongCount
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedErrorsFailureWrongCount() {
Throwable err1 = new IllegalStateException("boom1");
Throwable err2 = new IllegalStateException("boom2");
Throwable err3 = new IllegalStateException("boom3");
try {
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onError(err1);
s.onError(err2);
s.onError(err3);
}).buffer(1))
.expectError()
.verifyThenAssertThat()
.hasDroppedErrors()
.hasDroppedErrors(3);
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessage("Expected exactly 3 dropped errors, 2 found.");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:23,代码来源:StepVerifierAssertionsTests.java
示例20: assertDroppedErrorsNotSatisfying
import reactor.core.publisher.Operators; //导入依赖的package包/类
@Test
public void assertDroppedErrorsNotSatisfying() {
Throwable err1 = new IllegalStateException("boom1");
Throwable err2 = new IllegalStateException("boom2");
Throwable err3 = new IllegalStateException("boom3");
try {
StepVerifier.create(Flux.from(s -> {
s.onSubscribe(Operators.emptySubscription());
s.onError(err1);
s.onError(err2);
s.onError(err3);
}).buffer(1))
.expectError()
.verifyThenAssertThat()
.hasDroppedErrorsSatisfying(c -> assertThat(c).hasSize(3));
fail("expected an AssertionError");
}
catch (AssertionError ae) {
assertThat(ae).hasMessageContaining("Expected size:<3> but was:<2> in:");
}
}
开发者ID:reactor,项目名称:reactor-core,代码行数:22,代码来源:StepVerifierAssertionsTests.java
注:本文中的reactor.core.publisher.Operators类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论