• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java BackpressureUtils类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中rx.internal.operators.BackpressureUtils的典型用法代码示例。如果您正苦于以下问题:Java BackpressureUtils类的具体用法?Java BackpressureUtils怎么用?Java BackpressureUtils使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



BackpressureUtils类属于rx.internal.operators包,在下文中一共展示了BackpressureUtils类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("reuest must be >=0");
    } else if (n == 0) {
        return;
    } else if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
        long requested = n;
        long emitted = 0;
        do {
            emitted = requested;
            while (requested-- > 0 && !subscriber.isUnsubscribed()) {
                subscriber.onNext(v);
            }
        } while ((requested = this.addAndGet(-emitted)) > 0);
    }
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:18,代码来源:OnSubscribeRepeating.java


示例2: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
    if (BackpressureUtils.validate(n)) {
        for (; ; ) {
            long r = get();
            if (r == Long.MIN_VALUE) {
                return;
            }
            long u = BackpressureUtils.addCap(r, n);
            if (compareAndSet(r, u)) {
                return;
            }
        }
    }
}
 
开发者ID:josesamuel,项目名称:RxRemote,代码行数:16,代码来源:RemoteSubject.java


示例3: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required");
    } else if (n > 0) {
        BackpressureUtils.getAndAddRequest(this, n);
        drain();
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:9,代码来源:QueuedProducer.java


示例4: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
public void request(long n) {
    if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0) {
        if (n == Long.MAX_VALUE) {
            fastpath();
        } else {
            slowPath(n);
        }
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:10,代码来源:SyncOnSubscribe.java


示例5: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(final long n) {
    if (n < 0) {
        throw new IllegalArgumentException("Invalid requested amount.");
    }

    if (n > 0) {
        BackpressureUtils.getAndAddRequest(this, n);
        drain();
    }
}
 
开发者ID:upday,项目名称:RxProxy,代码行数:12,代码来源:ProxyProducer.java


示例6: toObservable

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Convert this cursor to an observable
 * @return an observable emitting the chunks from the cursor and their
 * respective path in the store (retrieved via {@link #getChunkPath()})
 */
public Observable<Pair<ChunkMeta, String>> toObservable() {
  return Observable.unsafeCreate(s -> {
    s.setProducer(new Producer() {
      private AtomicLong requested = new AtomicLong();
      
      @Override
      public void request(long n) {
        if (n > 0 && !s.isUnsubscribed() &&
            BackpressureUtils.getAndAddRequest(requested, n) == 0) {
          drain();
        }
      }
      
      private void drain() {
        if (requested.get() > 0) {
          if (!hasNext()) {
            if (!s.isUnsubscribed()) {
              s.onCompleted();
            }
            return;
          }
          
          next(ar -> {
            if (s.isUnsubscribed()) {
              return;
            }
            if (ar.failed()) {
              s.onError(ar.cause());
            } else {
              s.onNext(Pair.of(ar.result(), getChunkPath()));
              requested.decrementAndGet();
              drain();
            }
          });
        }
      }
    });
  });
}
 
开发者ID:georocket,项目名称:georocket,代码行数:45,代码来源:RxStoreCursor.java


示例7: toObservable

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
/**
 * Convert this cursor to an observable
 * @return an observable emitting the items from the cursor
 */
public Observable<T> toObservable() {
  return Observable.unsafeCreate(s -> {
    s.setProducer(new Producer() {
      private AtomicLong requested = new AtomicLong();

      @Override
      public void request(long n) {
        if (n > 0 && !s.isUnsubscribed() &&
          BackpressureUtils.getAndAddRequest(requested, n) == 0) {
          drain();
        }
      }

      private void drain() {
        if (requested.get() > 0) {
          if (!hasNext()) {
            if (!s.isUnsubscribed()) {
              s.onCompleted();
            }
            return;
          }

          next(ar -> {
            if (s.isUnsubscribed()) {
              return;
            }
            if (ar.failed()) {
              s.onError(ar.cause());
            } else {
              s.onNext(ar.result());
              requested.decrementAndGet();
              drain();
            }
          });
        }
      }
    });
  });
}
 
开发者ID:georocket,项目名称:georocket,代码行数:44,代码来源:RxAsyncCursor.java


示例8: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
  if (n < 0) {
    throw new IllegalArgumentException();
  }
  if (n > 0) {
    BackpressureUtils.getAndAddRequest(requested, n);
    if (strategy == BUFFER) {
      drain();
    }
  }
}
 
开发者ID:jszczygiel,项目名称:android-common,代码行数:13,代码来源:PublishSubject.java


示例9: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
    if (n == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)) {
        // emitting all elements
        try (CloseableIterator<T> iterator = result.iterator()) {
            while (!subscriber.isUnsubscribed()) {
                if (iterator.hasNext()) {
                    subscriber.onNext(iterator.next());
                    emitted.incrementAndGet();
                } else {
                    subscriber.onCompleted();
                    break;
                }
            }
        }
    } else if (n > 0 && BackpressureUtils.getAndAddRequest(requested, n) == 0) {
        // emitting with limit/offset
        long count = n;
        while (count > 0) {
            try (CloseableIterator<T> iterator =
                     result.iterator(emitted.intValue(), (int) n)) {
                long i = 0;
                while (!subscriber.isUnsubscribed() && iterator.hasNext()) {
                    if (i++ < count) {
                        subscriber.onNext(iterator.next());
                    } else {
                        break;
                    }
                }
                emitted.addAndGet(i);
                // no more items
                if (!subscriber.isUnsubscribed() && i < count) {
                    subscriber.onCompleted();
                    break;
                }
                count = requested.addAndGet(-count);
            }
        }
    }
}
 
开发者ID:requery,项目名称:requery,代码行数:41,代码来源:OnSubscribeFromQuery.java


示例10: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
    if (BackpressureUtils.validate(n)) {
        BackpressureUtils.getAndAddRequest(requested, n);
        drain();
    }
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:8,代码来源:OnSubscribeMatch.java


示例11: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
    if (n > 0) {
        BackpressureUtils.getAndAddRequest(this, n);
        drain();
    }
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:8,代码来源:OperatorBufferToFile.java


示例12: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(final long n) {
    if (n == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)
        || n > 0 && BackpressureUtils.getAndAddRequest(requested, n) == 0) {
        // emitting all elements
        modelQueriable.queryResults().subscribe(new CursorResultAction(n));
    }
}
 
开发者ID:Raizlabs,项目名称:DBFlow,代码行数:9,代码来源:CursorResultSubscriber.java


示例13: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException();
    }
    if (n == 0) {
        return;
    }
    if (BackpressureUtils.getAndAddRequest(requested, n) != 0) {
        return;
    }
    execute(this::produce);
}
 
开发者ID:hawkular,项目名称:hawkular-metrics,代码行数:14,代码来源:ResultSetToRowsTransformer.java


示例14: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
  BackpressureUtils.getAndAddRequest(this, n);
  // try and claim emission if no other threads are doing so
  merge.tick();
}
 
开发者ID:ybayk,项目名称:rxjava-recipes,代码行数:7,代码来源:OperatorMergeSorted.java


示例15: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
  if (n < 0) {
    throw new IllegalArgumentException();
  }
  if (BackpressureUtils.getAndAddRequest(currentRequest, n) != 0) {
    return;
  }
  long r = BackpressureUtils.addCap(backlog.get(), n);
  backlog.set(r);
  currentRequest.set(r);
  if (r == 0) {
    return;
  }
  if (upstream.completed() && upstream.buffer.isEmpty()) {
    complete();
    return;
  }
  if (!upstream.started()) {
    backlog.set(r);
    currentRequest.set(0);
    return;
  }
  // loop so that additional requests are processed in sequence
  for (;;) {
    if (downstream.isUnsubscribed()) {
      return;
    }
    int e = 0;

    if (upstream.buffer.isEmpty()) {
      upstream.requestMore();
    }
    if (upstream.buffer.isEmpty()) {
      if (upstream.completed() && !downstream.isUnsubscribed()) {
        complete();
        return;
      }
      backlog.set(r);
      currentRequest.set(0);
      return;
    }
    while (r > 0 && !upstream.buffer.isEmpty()) {
      JsonPathEvent pathEvent = upstream.buffer.poll();
      if (isDocumentEnd(pathEvent)) {
        if (!tokenBuffer.isEmpty()) {
          if (emit(null)) {
            ++e;
            --r;
          }
        }
      } else {
        if (newObject(pathEvent)) {
          if (emit(pathEvent.getMatchedPathFragment())) {
            ++e;
            --r;
          }
        }
        tokenBuffer.add(pathEvent.getTokenEvent());
      }
      if (downstream.isUnsubscribed()) {
        return;
      }

      if (tokenBuffer.isEmpty()) {
        if (upstream.completed() && !downstream.isUnsubscribed()) {
          complete();
          return;
        }
        break;
      }
    }
    // check for more requests
    r = currentRequest.addAndGet(-e);
    backlog.addAndGet(-e);
    if (r == 0) {
      return;
    }
  }
}
 
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:81,代码来源:OperatorCollectObjects.java


示例16: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
  if (n < 0) {
    throw new IllegalArgumentException();
  }
  if (BackpressureUtils.getAndAddRequest(currentRequest, n) != 0) {
    return;
  }
  long r = BackpressureUtils.addCap(backlog.get(), n);
  backlog.set(r);
  currentRequest.set(r);
  if (r == 0) {
    return;
  }
  if (upstream.completed() && upstream.isEmpty()) {
    complete();
    return;
  }
  if (!upstream.started()) {
    backlog.set(r);
    currentRequest.set(0);
    return;
  }
  // loop so that additional requests are processed in sequence
  for (;;) {
    if (downstream.isUnsubscribed()) {
      return;
    }
    int e = 0;

    if (upstream.isEmpty()) {
      upstream.requestMore();
    }
    if (upstream.isEmpty()) {
      if (upstream.completed() && !downstream.isUnsubscribed()) {
        complete();
        return;
      }
      backlog.set(r);
      currentRequest.set(0);
      return;
    }
    while (r > 0 && !upstream.isEmpty()) {
      downstream.onNext(upstream.poll());
      if (downstream.isUnsubscribed()) {
        return;
      }

      ++e;
      --r;
    }
    if (upstream.isEmpty() && upstream.completed() && !downstream.isUnsubscribed()) {
      complete();
      return;
    }
    // check for more requests
    r = currentRequest.addAndGet(-e);
    backlog.addAndGet(-e);
    if (r == 0) {
      return;
    }
  }
}
 
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:64,代码来源:OperatorJsonToken.java


示例17: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
  if (n < 0) {
    throw new IllegalArgumentException();
  }
  if (n == 0) {
    return;
  }
  if (BackpressureUtils.getAndAddRequest(requested, n) != 0) {
    return;
  }
  long r = n;
  // loop so that additional requests are processed in sequence
  for (; ; ) {
    if (subscriber.isUnsubscribed()) {
      return;
    }
    int i = index;
    int e = 0;
    while (r > 0 && i < source.length()) {
      subscriber.onNext(source.charAt(i));
      if (subscriber.isUnsubscribed()) {
        return;
      }

      ++i;
      ++e;
      --r;
      if (i == source.length()) {
        subscriber.onCompleted();
        return;
      }
    }
    index = i;
    // check for more requests
    r = requested.addAndGet(-e);
    if (r == 0) {
      return;
    }
  }
}
 
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:42,代码来源:OnSubscribeStringToChar.java


示例18: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
  if (n < 0) {
    throw new IllegalArgumentException();
  }
  if (BackpressureUtils.getAndAddRequest(currentRequest, n) != 0) {
    return;
  }
  long r = BackpressureUtils.addCap(backlog.get(), n);
  backlog.set(r);
  currentRequest.set(r);
  if (r == 0) {
    return;
  }
  if (upstream.completed() && upstream.buffer.isEmpty()) {
    complete();
    return;
  }
  if (!upstream.started()) {
    backlog.set(r);
    currentRequest.set(0);
    return;
  }
  // loop so that additional requests are processed in sequence
  for (; ; ) {
    if (downstream.isUnsubscribed()) {
      return;
    }
    int i = index;
    int e = 0;

    if (upstream.buffer.isEmpty()) {
      upstream.requestMore();
    }
    while (!upstream.buffer.isEmpty() && upstream.buffer.peek().isEmpty()) {
      upstream.buffer.poll();
      upstream.requestMore();
    }
    if (upstream.buffer.isEmpty()) {
      if (upstream.completed() && !downstream.isUnsubscribed()) {
        complete();
        return;
      }
      backlog.set(r);
      currentRequest.set(0);
      return;
    }
    String s = upstream.buffer.peek();
    while (r > 0 && i < s.length()) {
      downstream.onNext(s.charAt(i));
      if (downstream.isUnsubscribed()) {
        return;
      }

      ++i;
      ++e;
      --r;
      if (i == s.length()) {
        if (upstream.completed() && !downstream.isUnsubscribed()) {
          complete();
          return;
        }
        upstream.buffer.poll();
        i = 0;
        break;
      }
    }
    index = i;
    // check for more requests
    r = currentRequest.addAndGet(-e);
    backlog.addAndGet(-e);
    if (r == 0) {
      return;
    }
  }
}
 
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:77,代码来源:OperatorStringToChar.java


示例19: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
  if (n < 0) {
    throw new IllegalArgumentException();
  }
  if (BackpressureUtils.getAndAddRequest(currentRequest, n) != 0) {
    return;
  }
  long r = BackpressureUtils.addCap(backlog.get(), n);
  backlog.set(r);
  currentRequest.set(r);
  if (r == 0) {
    return;
  }
  if (upstream.completed() && upstream.buffer.isEmpty()) {
    complete();
    return;
  }
  if (!upstream.started()) {
    backlog.set(r);
    currentRequest.set(0);
    return;
  }
  // loop so that additional requests are processed in sequence
  for (;;) {
    if (downstream.isUnsubscribed()) {
      return;
    }
    int e = 0;

    if (upstream.buffer.isEmpty()) {
      upstream.requestMore();
    }
    if (upstream.buffer.isEmpty()) {
      if (upstream.completed() && !downstream.isUnsubscribed()) {
        complete();
        return;
      }
      backlog.set(r);
      currentRequest.set(0);
      return;
    }
    while (r > 0 && !upstream.buffer.isEmpty()) {
      if (consumeAndEmit()) {
        ++e;
        --r;
      }
      if (downstream.isUnsubscribed()) {
        return;
      }

      if (upstream.buffer.isEmpty()) {
        if (upstream.completed() && !downstream.isUnsubscribed()) {
          complete();
          return;
        }
        break;
      }
    }
    // check for more requests
    r = currentRequest.addAndGet(-e);
    backlog.addAndGet(-e);
    if (r == 0) {
      return;
    }
  }
}
 
开发者ID:Trunkplatform,项目名称:rxjava-json,代码行数:68,代码来源:OperatorJsonGson.java


示例20: request

import rx.internal.operators.BackpressureUtils; //导入依赖的package包/类
@Override
public void request(long n) {
    BackpressureUtils.getAndAddRequest(this, n);
    emit();
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:6,代码来源:OrderedMerge.java



注:本文中的rx.internal.operators.BackpressureUtils类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java WorldArchetype类代码示例发布时间:2022-05-22
下一篇:
Java PortChangeEvent类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap