本文整理汇总了Java中rx.internal.operators.BackpressureUtils类的典型用法代码示例。如果您正苦于以下问题：Java BackpressureUtils类的具体用法？Java BackpressureUtils怎么用？Java BackpressureUtils使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
BackpressureUtils类属于rx.internal.operators包，在下文中一共展示了BackpressureUtils类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Registers additional downstream demand and, when this call is the one that
 * moves the demand counter away from zero, emits the value {@code v} to the
 * subscriber until the outstanding demand has been served.
 *
 * @param n number of additional items requested; {@code 0} is a no-op
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("request must be >=0");
    }
    if (n == 0) {
        return;
    }
    // Only the call that finds the counter at zero runs the emission loop;
    // any other call has merely added its amount to the counter.
    if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
        return;
    }
    long requested = n;
    long emitted;
    do {
        // Remember the size of this batch so it can be subtracted afterwards.
        emitted = requested;
        while (requested-- > 0 && !subscriber.isUnsubscribed()) {
            subscriber.onNext(v);
        }
        // Subtract the batch; a positive remainder is demand that was added
        // while this loop was running.
    } while ((requested = this.addAndGet(-emitted)) > 0);
}
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:18,代码来源:OnSubscribeRepeating.java
示例2: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Adds {@code n} to this counter with a capped addition, retrying until the
 * compare-and-set succeeds. A counter value of {@code Long.MIN_VALUE} is left
 * untouched (presumably a terminal marker; confirm against the rest of the class).
 *
 * @param n the additional amount requested; checked by {@code BackpressureUtils.validate}
 */
@Override
public void request(long n) {
    if (!BackpressureUtils.validate(n)) {
        return;
    }
    long current;
    do {
        current = get();
        if (current == Long.MIN_VALUE) {
            return;
        }
    } while (!compareAndSet(current, BackpressureUtils.addCap(current, n)));
}
开发者ID:josesamuel,项目名称:RxRemote,代码行数:16,代码来源:RemoteSubject.java
示例3: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Records {@code n} additional requested items and then runs {@code drain()}.
 *
 * @param n number of additional items requested; {@code 0} is a no-op
 * @throws IllegalArgumentException if {@code n} is negative
 */
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required");
    }
    if (n == 0) {
        return;
    }
    BackpressureUtils.getAndAddRequest(this, n);
    drain();
}
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:9,代码来源:QueuedProducer.java
示例4: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Adds {@code n} to the demand counter; when the counter was previously zero,
 * dispatches to {@code fastpath()} for an unbounded request
 * ({@code Long.MAX_VALUE}) or to {@code slowPath(n)} otherwise.
 *
 * @param n number of additional items requested; non-positive values are ignored
 */
public void request(long n) {
    if (n <= 0) {
        return;
    }
    if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
        return;
    }
    if (n != Long.MAX_VALUE) {
        slowPath(n);
        return;
    }
    fastpath();
}
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:10,代码来源:SyncOnSubscribe.java
示例5: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Records {@code n} additional requested items and then runs {@code drain()}.
 *
 * @param n number of additional items requested; {@code 0} is a no-op
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(final long n) {
    if (n < 0) {
        throw new IllegalArgumentException("Invalid requested amount.");
    }
    if (n == 0) {
        return;
    }
    BackpressureUtils.getAndAddRequest(this, n);
    drain();
}
开发者ID:upday,项目名称:RxProxy,代码行数:12,代码来源:ProxyProducer.java
示例6: toObservable
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Convert this cursor to an observable.
 *
 * <p>Items are fetched one at a time via {@code next(...)} and only while the
 * subscriber has outstanding demand. Non-positive request amounts are ignored.</p>
 *
 * @return an observable emitting the chunks from the cursor and their
 * respective path in the store (retrieved via {@link #getChunkPath()})
 */
public Observable<Pair<ChunkMeta, String>> toObservable() {
    return Observable.unsafeCreate(s -> {
        s.setProducer(new Producer() {
            private final AtomicLong requested = new AtomicLong();

            @Override
            public void request(long n) {
                // Only the request that moves the counter away from zero
                // starts a drain chain.
                if (n > 0 && !s.isUnsubscribed() &&
                        BackpressureUtils.getAndAddRequest(requested, n) == 0) {
                    drain();
                }
            }

            private void drain() {
                if (requested.get() <= 0) {
                    return;
                }
                if (!hasNext()) {
                    if (!s.isUnsubscribed()) {
                        s.onCompleted();
                    }
                    return;
                }
                next(ar -> {
                    if (s.isUnsubscribed()) {
                        return;
                    }
                    if (ar.failed()) {
                        s.onError(ar.cause());
                        return;
                    }
                    s.onNext(Pair.of(ar.result(), getChunkPath()));
                    // Continue only if demand remains after this item. If the
                    // counter hit zero, the next request() that sees zero
                    // restarts the chain; deciding on the decrement's result
                    // (instead of re-reading the counter) prevents two drain
                    // chains from running at the same time.
                    if (requested.decrementAndGet() > 0) {
                        // NOTE(review): this recurses if next(...) invokes its
                        // handler synchronously; confirm it is asynchronous.
                        drain();
                    }
                });
            }
        });
    });
}
开发者ID:georocket,项目名称:georocket,代码行数:45,代码来源:RxStoreCursor.java
示例7: toObservable
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Convert this cursor to an observable.
 *
 * <p>Items are fetched one at a time via {@code next(...)} and only while the
 * subscriber has outstanding demand. Non-positive request amounts are ignored.</p>
 *
 * @return an observable emitting the items from the cursor
 */
public Observable<T> toObservable() {
    return Observable.unsafeCreate(s -> {
        s.setProducer(new Producer() {
            private final AtomicLong requested = new AtomicLong();

            @Override
            public void request(long n) {
                // Only the request that moves the counter away from zero
                // starts a drain chain.
                if (n > 0 && !s.isUnsubscribed() &&
                        BackpressureUtils.getAndAddRequest(requested, n) == 0) {
                    drain();
                }
            }

            private void drain() {
                if (requested.get() <= 0) {
                    return;
                }
                if (!hasNext()) {
                    if (!s.isUnsubscribed()) {
                        s.onCompleted();
                    }
                    return;
                }
                next(ar -> {
                    if (s.isUnsubscribed()) {
                        return;
                    }
                    if (ar.failed()) {
                        s.onError(ar.cause());
                        return;
                    }
                    s.onNext(ar.result());
                    // Continue only if demand remains after this item. If the
                    // counter hit zero, the next request() that sees zero
                    // restarts the chain; deciding on the decrement's result
                    // (instead of re-reading the counter) prevents two drain
                    // chains from running at the same time.
                    if (requested.decrementAndGet() > 0) {
                        // NOTE(review): this recurses if next(...) invokes its
                        // handler synchronously; confirm it is asynchronous.
                        drain();
                    }
                });
            }
        });
    });
}
开发者ID:georocket,项目名称:georocket,代码行数:44,代码来源:RxAsyncCursor.java
示例8: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Records {@code n} additional requested items and, when the configured
 * strategy is {@code BUFFER}, runs {@code drain()}.
 *
 * @param n number of additional items requested; {@code 0} is a no-op
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException();
    }
    if (n == 0) {
        return;
    }
    BackpressureUtils.getAndAddRequest(requested, n);
    if (strategy != BUFFER) {
        return;
    }
    drain();
}
开发者ID:jszczygiel,项目名称:android-common,代码行数:13,代码来源:PublishSubject.java
示例9: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Emits query results in response to downstream demand.
 *
 * <p>An initial unbounded request ({@code Long.MAX_VALUE} while nothing is
 * outstanding) streams the whole result through a single iterator. Any other
 * positive request that finds the demand counter at zero emits in batches,
 * each using a fresh iterator positioned at the number of items already
 * emitted and limited to the demand currently outstanding.</p>
 *
 * @param n number of additional items requested; non-positive values are ignored
 */
@Override
public void request(long n) {
    if (n == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)) {
        // emitting all elements
        try (CloseableIterator<T> iterator = result.iterator()) {
            while (!subscriber.isUnsubscribed()) {
                if (iterator.hasNext()) {
                    subscriber.onNext(iterator.next());
                    emitted.incrementAndGet();
                } else {
                    subscriber.onCompleted();
                    break;
                }
            }
        }
    } else if (n > 0 && BackpressureUtils.getAndAddRequest(requested, n) == 0) {
        // emitting with limit/offset
        long remaining = n;
        while (remaining > 0) {
            // Size the batch from the demand that is outstanding now, not from
            // this call's n: later passes serve demand accumulated by other
            // request() calls, and a limit of n could end the iterator early and
            // be mistaken for the end of the result. Clamp instead of casting
            // so a large long cannot wrap into a bogus int limit.
            int batch = (int) Math.min(remaining, (long) Integer.MAX_VALUE);
            long delivered = 0;
            try (CloseableIterator<T> iterator =
                     result.iterator(emitted.intValue(), batch)) {
                while (delivered < batch
                        && !subscriber.isUnsubscribed()
                        && iterator.hasNext()) {
                    subscriber.onNext(iterator.next());
                    delivered++;
                }
            }
            emitted.addAndGet(delivered);
            if (subscriber.isUnsubscribed()) {
                // Nobody is listening any more; do not open further iterators.
                return;
            }
            if (delivered < batch) {
                // no more items
                subscriber.onCompleted();
                return;
            }
            // Subtract the served batch; a positive remainder is demand that
            // arrived (or was left over) while this batch was being emitted.
            remaining = requested.addAndGet(-batch);
        }
    }
}
开发者ID:requery,项目名称:requery,代码行数:41,代码来源:OnSubscribeFromQuery.java
示例10: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Adds {@code n} to the demand counter and runs {@code drain()} when
 * {@code BackpressureUtils.validate(n)} accepts the amount.
 *
 * @param n number of additional items requested
 */
@Override
public void request(long n) {
    if (!BackpressureUtils.validate(n)) {
        return;
    }
    BackpressureUtils.getAndAddRequest(requested, n);
    drain();
}
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:8,代码来源:OnSubscribeMatch.java
示例11: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Adds a positive {@code n} to the demand counter and runs {@code drain()}.
 *
 * @param n number of additional items requested; non-positive values are ignored
 */
@Override
public void request(long n) {
    if (n <= 0) {
        return;
    }
    BackpressureUtils.getAndAddRequest(this, n);
    drain();
}
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:8,代码来源:OperatorBufferToFile.java
示例12: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(final long n) {
if (n == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)
|| n > 0 && BackpressureUtils.getAndAddRequest(requested, n) == 0) {
// emitting all elements
modelQueriable.queryResults().subscribe(new CursorResultAction(n));
}
}
开发者ID:Raizlabs,项目名称:DBFlow,代码行数:9,代码来源:CursorResultSubscriber.java
示例13: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Adds {@code n} to the demand counter and schedules {@code produce} via
 * {@code execute} when the counter was zero before this call.
 *
 * @param n number of additional items requested; {@code 0} is a no-op
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException();
    }
    boolean startsProduction = n != 0
            && BackpressureUtils.getAndAddRequest(requested, n) == 0;
    if (startsProduction) {
        execute(this::produce);
    }
}
开发者ID:hawkular,项目名称:hawkular-metrics,代码行数:14,代码来源:ResultSetToRowsTransformer.java
示例14: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Adds {@code n} to this demand counter and then calls {@code merge.tick()}.
 *
 * @param n number of additional items requested
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
    // Reject negative amounts up front: the capped addition used by
    // getAndAddRequest treats a negative sum as overflow, so a negative n
    // could turn the counter into an unbounded request.
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required but it was " + n);
    }
    BackpressureUtils.getAndAddRequest(this, n);
    // try and claim emission if no other threads are doing so
    merge.tick();
}
开发者ID:ybayk,项目名称:rxjava-recipes,代码行数:7,代码来源:OperatorMergeSorted.java
示例15: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Handles a downstream request for {@code n} items: adds the amount to
 * {@code currentRequest} and, when that counter was previously zero, processes
 * buffered upstream path events, calling {@code emit(...)} while demand remains.
 *
 * @param n number of additional items requested
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException();
}
// A non-zero previous value ends this call here; presumably another invocation
// is already running the loop below and will pick the amount up — confirm.
if (BackpressureUtils.getAndAddRequest(currentRequest, n) != 0) {
return;
}
// Combine the new amount with the backlog (the backlog is written below when
// processing stops while demand is still outstanding).
long r = BackpressureUtils.addCap(backlog.get(), n);
backlog.set(r);
currentRequest.set(r);
if (r == 0) {
return;
}
if (upstream.completed() && upstream.buffer.isEmpty()) {
complete();
return;
}
// Upstream not started: keep the demand in backlog and reset currentRequest
// to zero so that a later request() call can get past the guard above.
if (!upstream.started()) {
backlog.set(r);
currentRequest.set(0);
return;
}
// loop so that additional requests are processed in sequence
for (;;) {
if (downstream.isUnsubscribed()) {
return;
}
// e counts the emit(...) calls that returned true during this pass.
int e = 0;
if (upstream.buffer.isEmpty()) {
upstream.requestMore();
}
// Still nothing buffered after asking for more: complete if upstream is
// done, otherwise store the demand in backlog and stop.
if (upstream.buffer.isEmpty()) {
if (upstream.completed() && !downstream.isUnsubscribed()) {
complete();
return;
}
backlog.set(r);
currentRequest.set(0);
return;
}
while (r > 0 && !upstream.buffer.isEmpty()) {
JsonPathEvent pathEvent = upstream.buffer.poll();
if (isDocumentEnd(pathEvent)) {
// At a document end, emit(null) is attempted only when tokenBuffer holds tokens.
if (!tokenBuffer.isEmpty()) {
if (emit(null)) {
++e;
--r;
}
}
} else {
// When newObject(...) is true, emit(...) is called with the matched path
// fragment before the current token is added to tokenBuffer.
if (newObject(pathEvent)) {
if (emit(pathEvent.getMatchedPathFragment())) {
++e;
--r;
}
}
tokenBuffer.add(pathEvent.getTokenEvent());
}
if (downstream.isUnsubscribed()) {
return;
}
// An empty tokenBuffer ends this inner pass (or completes, if upstream is done).
if (tokenBuffer.isEmpty()) {
if (upstream.completed() && !downstream.isUnsubscribed()) {
complete();
return;
}
break;
}
}
// check for more requests
// Subtract this pass's emissions from both counters; r then also reflects
// any amount added to currentRequest by request() calls made meanwhile.
r = currentRequest.addAndGet(-e);
backlog.addAndGet(-e);
if (r == 0) {
return;
}
}
}
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:81,代码来源:OperatorCollectObjects.java
示例16: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Handles a downstream request for {@code n} items: adds the amount to
 * {@code currentRequest} and, when that counter was previously zero, forwards
 * items polled from {@code upstream} to {@code downstream} while demand remains.
 *
 * @param n number of additional items requested
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException();
}
// A non-zero previous value ends this call here; presumably another invocation
// is already running the loop below and will pick the amount up — confirm.
if (BackpressureUtils.getAndAddRequest(currentRequest, n) != 0) {
return;
}
// Combine the new amount with the backlog (the backlog is written below when
// processing stops while demand is still outstanding).
long r = BackpressureUtils.addCap(backlog.get(), n);
backlog.set(r);
currentRequest.set(r);
if (r == 0) {
return;
}
if (upstream.completed() && upstream.isEmpty()) {
complete();
return;
}
// Upstream not started: keep the demand in backlog and reset currentRequest
// to zero so that a later request() call can get past the guard above.
if (!upstream.started()) {
backlog.set(r);
currentRequest.set(0);
return;
}
// loop so that additional requests are processed in sequence
for (;;) {
if (downstream.isUnsubscribed()) {
return;
}
// e counts the items passed to downstream.onNext during this pass.
int e = 0;
if (upstream.isEmpty()) {
upstream.requestMore();
}
// Still nothing available after asking for more: complete if upstream is
// done, otherwise store the demand in backlog and stop.
if (upstream.isEmpty()) {
if (upstream.completed() && !downstream.isUnsubscribed()) {
complete();
return;
}
backlog.set(r);
currentRequest.set(0);
return;
}
// Forward items while there is both demand and data.
while (r > 0 && !upstream.isEmpty()) {
downstream.onNext(upstream.poll());
if (downstream.isUnsubscribed()) {
return;
}
++e;
--r;
}
if (upstream.isEmpty() && upstream.completed() && !downstream.isUnsubscribed()) {
complete();
return;
}
// check for more requests
// Subtract this pass's emissions from both counters; r then also reflects
// any amount added to currentRequest by request() calls made meanwhile.
r = currentRequest.addAndGet(-e);
backlog.addAndGet(-e);
if (r == 0) {
return;
}
}
}
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:64,代码来源:OperatorJsonToken.java
示例17: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Emits the characters of {@code source} one by one in response to downstream
 * demand and completes the subscriber after the last character. An empty
 * {@code source} completes on the first positive request.
 *
 * @param n number of additional characters requested; {@code 0} is a no-op
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException();
    }
    if (n == 0) {
        return;
    }
    // Only the call that finds the counter at zero runs the emission loop.
    if (BackpressureUtils.getAndAddRequest(requested, n) != 0) {
        return;
    }
    long r = n;
    // loop so that additional requests are processed in sequence
    for (; ; ) {
        if (subscriber.isUnsubscribed()) {
            return;
        }
        int i = index;
        // Nothing left to emit (e.g. an empty source). Without this check the
        // inner loop never runs, nothing is subtracted from the counter and
        // the outer loop spins forever without ever completing the subscriber.
        if (i >= source.length()) {
            subscriber.onCompleted();
            return;
        }
        int e = 0;
        while (r > 0 && i < source.length()) {
            subscriber.onNext(source.charAt(i));
            if (subscriber.isUnsubscribed()) {
                return;
            }
            ++i;
            ++e;
            --r;
            if (i == source.length()) {
                subscriber.onCompleted();
                return;
            }
        }
        index = i;
        // check for more requests
        r = requested.addAndGet(-e);
        if (r == 0) {
            return;
        }
    }
}
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:42,代码来源:OnSubscribeStringToChar.java
示例18: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Handles a downstream request for {@code n} items: adds the amount to
 * {@code currentRequest} and, when that counter was previously zero, emits the
 * characters of the strings held in {@code upstream.buffer} while demand remains.
 *
 * @param n number of additional characters requested
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException();
}
// A non-zero previous value ends this call here; presumably another invocation
// is already running the loop below and will pick the amount up — confirm.
if (BackpressureUtils.getAndAddRequest(currentRequest, n) != 0) {
return;
}
// Combine the new amount with the backlog (the backlog is written below when
// processing stops while demand is still outstanding).
long r = BackpressureUtils.addCap(backlog.get(), n);
backlog.set(r);
currentRequest.set(r);
if (r == 0) {
return;
}
if (upstream.completed() && upstream.buffer.isEmpty()) {
complete();
return;
}
// Upstream not started: keep the demand in backlog and reset currentRequest
// to zero so that a later request() call can get past the guard above.
if (!upstream.started()) {
backlog.set(r);
currentRequest.set(0);
return;
}
// loop so that additional requests are processed in sequence
for (; ; ) {
if (downstream.isUnsubscribed()) {
return;
}
// i is the read position inside the string at the head of the buffer;
// e counts the characters emitted during this pass.
int i = index;
int e = 0;
if (upstream.buffer.isEmpty()) {
upstream.requestMore();
}
// Discard empty strings at the head of the buffer, asking for more after each.
while (!upstream.buffer.isEmpty() && upstream.buffer.peek().isEmpty()) {
upstream.buffer.poll();
upstream.requestMore();
}
// Still nothing buffered: complete if upstream is done, otherwise store the
// demand in backlog and stop.
if (upstream.buffer.isEmpty()) {
if (upstream.completed() && !downstream.isUnsubscribed()) {
complete();
return;
}
backlog.set(r);
currentRequest.set(0);
return;
}
String s = upstream.buffer.peek();
while (r > 0 && i < s.length()) {
downstream.onNext(s.charAt(i));
if (downstream.isUnsubscribed()) {
return;
}
++i;
++e;
--r;
// End of the current string: complete if upstream is done, otherwise
// drop the string from the buffer and restart at position 0.
// NOTE(review): this completes without checking whether further strings
// remain in upstream.buffer — confirm completed() implies none are left.
if (i == s.length()) {
if (upstream.completed() && !downstream.isUnsubscribed()) {
complete();
return;
}
upstream.buffer.poll();
i = 0;
break;
}
}
index = i;
// check for more requests
// Subtract this pass's emissions from both counters; r then also reflects
// any amount added to currentRequest by request() calls made meanwhile.
r = currentRequest.addAndGet(-e);
backlog.addAndGet(-e);
if (r == 0) {
return;
}
}
}
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:77,代码来源:OperatorStringToChar.java
示例19: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Handles a downstream request for {@code n} items: adds the amount to
 * {@code currentRequest} and, when that counter was previously zero, calls
 * {@code consumeAndEmit()} on buffered upstream data while demand remains.
 *
 * @param n number of additional items requested
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException();
}
// A non-zero previous value ends this call here; presumably another invocation
// is already running the loop below and will pick the amount up — confirm.
if (BackpressureUtils.getAndAddRequest(currentRequest, n) != 0) {
return;
}
// Combine the new amount with the backlog (the backlog is written below when
// processing stops while demand is still outstanding).
long r = BackpressureUtils.addCap(backlog.get(), n);
backlog.set(r);
currentRequest.set(r);
if (r == 0) {
return;
}
if (upstream.completed() && upstream.buffer.isEmpty()) {
complete();
return;
}
// Upstream not started: keep the demand in backlog and reset currentRequest
// to zero so that a later request() call can get past the guard above.
if (!upstream.started()) {
backlog.set(r);
currentRequest.set(0);
return;
}
// loop so that additional requests are processed in sequence
for (;;) {
if (downstream.isUnsubscribed()) {
return;
}
// e counts the consumeAndEmit() calls that returned true during this pass.
int e = 0;
if (upstream.buffer.isEmpty()) {
upstream.requestMore();
}
// Still nothing buffered after asking for more: complete if upstream is
// done, otherwise store the demand in backlog and stop.
if (upstream.buffer.isEmpty()) {
if (upstream.completed() && !downstream.isUnsubscribed()) {
complete();
return;
}
backlog.set(r);
currentRequest.set(0);
return;
}
while (r > 0 && !upstream.buffer.isEmpty()) {
// Only a true result from consumeAndEmit() is counted against the demand.
if (consumeAndEmit()) {
++e;
--r;
}
if (downstream.isUnsubscribed()) {
return;
}
// Buffer drained: complete if upstream is done, otherwise leave the inner loop.
if (upstream.buffer.isEmpty()) {
if (upstream.completed() && !downstream.isUnsubscribed()) {
complete();
return;
}
break;
}
}
// check for more requests
// Subtract this pass's emissions from both counters; r then also reflects
// any amount added to currentRequest by request() calls made meanwhile.
r = currentRequest.addAndGet(-e);
backlog.addAndGet(-e);
if (r == 0) {
return;
}
}
}
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:68,代码来源:OperatorJsonGson.java
示例20: request
import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Adds {@code n} to this demand counter and then calls {@code emit()}.
 *
 * @param n number of additional items requested
 * @throws IllegalArgumentException if {@code n} is negative
 */
@Override
public void request(long n) {
    // Reject negative amounts up front: the capped addition used by
    // getAndAddRequest treats a negative sum as overflow, so a negative n
    // could turn the counter into an unbounded request.
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required but it was " + n);
    }
    BackpressureUtils.getAndAddRequest(this, n);
    emit();
}
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:6,代码来源:OrderedMerge.java
注：本文中的rx.internal.operators.BackpressureUtils类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论