• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java RxRingBuffer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中rx.internal.util.RxRingBuffer的典型用法代码示例。如果您正苦于以下问题:Java RxRingBuffer类的具体用法?Java RxRingBuffer怎么用?Java RxRingBuffer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



RxRingBuffer类属于rx.internal.util包,在下文中一共展示了RxRingBuffer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: start

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
static void start() {
    Schedulers s = INSTANCE;
    synchronized (s) {
        if (s.computationScheduler instanceof SchedulerLifecycle) {
            ((SchedulerLifecycle) s.computationScheduler).start();
        }
        if (s.ioScheduler instanceof SchedulerLifecycle) {
            ((SchedulerLifecycle) s.ioScheduler).start();
        }
        if (s.newThreadScheduler instanceof SchedulerLifecycle) {
            ((SchedulerLifecycle) s.newThreadScheduler).start();
        }
        GenericScheduledExecutorService.INSTANCE.start();
        RxRingBuffer.SPSC_POOL.start();
        RxRingBuffer.SPMC_POOL.start();
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:18,代码来源:Schedulers.java


示例2: shutdown

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public static void shutdown() {
    Schedulers s = INSTANCE;
    synchronized (s) {
        if (s.computationScheduler instanceof SchedulerLifecycle) {
            ((SchedulerLifecycle) s.computationScheduler).shutdown();
        }
        if (s.ioScheduler instanceof SchedulerLifecycle) {
            ((SchedulerLifecycle) s.ioScheduler).shutdown();
        }
        if (s.newThreadScheduler instanceof SchedulerLifecycle) {
            ((SchedulerLifecycle) s.newThreadScheduler).shutdown();
        }
        GenericScheduledExecutorService.INSTANCE.shutdown();
        RxRingBuffer.SPSC_POOL.shutdown();
        RxRingBuffer.SPMC_POOL.shutdown();
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:18,代码来源:Schedulers.java


示例3: queueScalar

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
protected void queueScalar(InnerSubscriber<T> subscriber, T value) {
    RxRingBuffer q = subscriber.queue;
    if (q == null) {
        q = RxRingBuffer.getSpscInstance();
        subscriber.add(q);
        subscriber.queue = q;
    }
    try {
        q.onNext(this.nl.next(value));
        emit();
    } catch (MissingBackpressureException ex) {
        subscriber.unsubscribe();
        subscriber.onError(ex);
    } catch (IllegalStateException ex2) {
        if (!subscriber.isUnsubscribed()) {
            subscriber.unsubscribe();
            subscriber.onError(ex2);
        }
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:21,代码来源:OperatorMerge.java


示例4: request

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public void request(long n) {
    BackpressureUtils.getAndAddRequest(this.requested, n);
    if (!this.started.get() && this.started.compareAndSet(false, true)) {
        int sizePerSubscriber = RxRingBuffer.SIZE / this.sources.size();
        int leftOver = RxRingBuffer.SIZE % this.sources.size();
        for (int i = 0; i < this.sources.size(); i++) {
            Observable<? extends T> o = (Observable) this.sources.get(i);
            int toRequest = sizePerSubscriber;
            if (i == this.sources.size() - 1) {
                toRequest += leftOver;
            }
            MultiSourceRequestableSubscriber<T, R> s = new MultiSourceRequestableSubscriber(i, toRequest, this.child, this);
            this.subscribers[i] = s;
            o.unsafeSubscribe(s);
        }
    }
    tick();
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:19,代码来源:OnSubscribeCombineLatest.java


示例5: testBeyondBackpressureBuffer

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
@Test
public void testBeyondBackpressureBuffer() {
  int n = 100 * RxRingBuffer.SIZE;
  
  //    Similar to zip
  //    List<Observable<?>> os = new ArrayList<Observable<?>>();
  //    os.add(Observable.range(0, n));
  //    Iterator<Integer> iter = Observable.just(os.toArray(new Observable<?>[os.size()]))
  //        .lift(new OperatorZip<Integer>(args->(int)args[0]))
  Iterator<Integer> iter = Observable.just(Observable.range(0, n))
      .lift(new OperatorMergeSorted<Integer>(naturalComparator))
      .toBlocking().getIterator();
  for (int i = 0; i < n; i++) {
    Assert.assertTrue(iter.hasNext());
    int value = iter.next();
    Assert.assertEquals(i, value);
  }
  Assert.assertFalse(iter.hasNext());
}
 
开发者ID:ybayk,项目名称:rxjava-recipes,代码行数:20,代码来源:OperatorMergeSortedTest.java


示例6: testBackpressure

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
@Test
public void testBackpressure() {
  final int n = 100;
  final AtomicInteger max1 = new AtomicInteger();
  final AtomicInteger max2 = new AtomicInteger();
  Observable<Integer> o1 = Observable.range(0, RxRingBuffer.SIZE * n).delay(1, TimeUnit.SECONDS)
      .doOnNext(value->max1.set(value > max1.get()? value : max1.get()));
  Observable<Integer> o2 = Observable.range(RxRingBuffer.SIZE * n, RxRingBuffer.SIZE * n)
      .doOnNext(value->max2.set(value - RxRingBuffer.SIZE * n > max2.get()? value - RxRingBuffer.SIZE * n: max2.get()));
 
  Iterator<Integer> iter = Observable.just(o1, o2)
      .lift(new OperatorMergeSorted<Integer>(naturalComparator))
      .toBlocking().getIterator();

  for (int i = 0; i < RxRingBuffer.SIZE/2; i++) {
    Assert.assertTrue(iter.hasNext());
    Assert.assertEquals(i, (int)iter.next());
  }
  Assert.assertTrue(iter.hasNext());
  
  //make sure source observables are not fetched beyond double buffer size 
  //as toBlocking().getIterator() now supports backpressure
  Assert.assertTrue(max1.get() <= 2 * RxRingBuffer.SIZE);
  Assert.assertTrue(max2.get() <= 2 * RxRingBuffer.SIZE);
  
}
 
开发者ID:ybayk,项目名称:rxjava-recipes,代码行数:27,代码来源:OperatorMergeSortedTest.java


示例7: PublishSubscriber

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public PublishSubscriber(AtomicReference<PublishSubscriber<T>> current) {
    this.queue = UnsafeAccess.isUnsafeAvailable() ? new SpscArrayQueue(RxRingBuffer.SIZE) : new SynchronizedQueue(RxRingBuffer.SIZE);
    this.nl = NotificationLite.instance();
    this.producers = new AtomicReference(EMPTY);
    this.current = current;
    this.shouldConnect = new AtomicBoolean();
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:8,代码来源:OperatorPublish.java


示例8: requestMore

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public void requestMore(long n) {
    int r = this.outstanding - ((int) n);
    if (r > limit) {
        this.outstanding = r;
        return;
    }
    this.outstanding = RxRingBuffer.SIZE;
    int k = RxRingBuffer.SIZE - r;
    if (k > 0) {
        request((long) k);
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:13,代码来源:OperatorMerge.java


示例9: removeInner

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
void removeInner(InnerSubscriber<T> inner) {
    RxRingBuffer q = inner.queue;
    if (q != null) {
        q.release();
    }
    this.subscriptions.remove(inner);
    synchronized (this.innerGuard) {
        InnerSubscriber<?>[] a = this.innerSubscribers;
        int n = a.length;
        int j = -1;
        for (int i = 0; i < n; i++) {
            if (inner.equals(a[i])) {
                j = i;
                break;
            }
        }
        if (j < 0) {
        } else if (n == 1) {
            this.innerSubscribers = EMPTY;
        } else {
            InnerSubscriber<?>[] b = new InnerSubscriber[(n - 1)];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, (n - j) - 1);
            this.innerSubscribers = b;
        }
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:28,代码来源:OperatorMerge.java


示例10: OnSubscribeCombineLatest

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public OnSubscribeCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
    this.sources = sources;
    this.combinator = combinator;
    if (sources.size() > RxRingBuffer.SIZE) {
        throw new IllegalArgumentException("More than RxRingBuffer.SIZE sources to combineLatest is not supported.");
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:8,代码来源:OnSubscribeCombineLatest.java


示例11: ObserveOnSubscriber

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
    this.child = child;
    this.recursiveScheduler = scheduler.createWorker();
    if (UnsafeAccess.isUnsafeAvailable()) {
        this.queue = new SpscArrayQueue(RxRingBuffer.SIZE);
    } else {
        this.queue = new SynchronizedQueue(RxRingBuffer.SIZE);
    }
    this.scheduledUnsubscribe = new ScheduledUnsubscribe(this.recursiveScheduler);
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:11,代码来源:OperatorObserveOn.java


示例12: pollTopItem

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
/**
 * Polls a top item from the priority queue. Adds the top source back to
 * pending set. Not synchronized as accessed inside COUNTER_UPDATER block
 * 
 * @param pending
 *          set of source observers pending a value for the merge
 * @return true if we need a shutdown
 */
private boolean pollTopItem(Set<InnerSubscriber> pending) {
  // get a next item in order from the priority queue
  ValueSourcePair tuple = queue.poll();
  T value = tuple.value;

  InnerSubscriber observer = tuple.source;
  // mark the wining observer as pending again
  pending.add(observer);
  try {
    // emit the next item in order
    child.onNext(value);
    // we emitted so decrement the requested counter
    requested.decrementAndGet();
    observer.emitted++;
  } catch (Throwable e) {
    Exceptions.throwOrReport(e, child, value);
    return true;
  }
  // now remove
  RxRingBuffer buffer = observer.items;
  buffer.poll();
  // eagerly check if the next item on this queue is an onComplete
  if (buffer.isCompleted(buffer.peek())) {
    // we need to unsubscribe and remove from pending
    pending.remove(observer);
    childSubscription.remove(observer);
    if (!childSubscription.hasSubscriptions()) {
      // it is last upstream observer so onComplete so shut down
      child.onCompleted();
      return true;
    }
    return false; // dont request for this observer
  }
  if (observer.emitted > THRESHOLD) {
    observer.requestMore(observer.emitted);
    observer.emitted = 0;
  }
  return false;
}
 
开发者ID:ybayk,项目名称:rxjava-recipes,代码行数:48,代码来源:OperatorMergeSorted.java


示例13: combineLatestDelayError

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
    return create(new OnSubscribeCombineLatest(null, sources, combineFunction, RxRingBuffer.SIZE, true));
}
 
开发者ID:JackChan1999,项目名称:letv,代码行数:4,代码来源:Observable.java


示例14: concatMapIterable

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public final <R> Observable<R> concatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
    return OnSubscribeFlattenIterable.createFrom(this, collectionSelector, RxRingBuffer.SIZE);
}
 
开发者ID:JackChan1999,项目名称:letv,代码行数:4,代码来源:Observable.java


示例15: concatMapEager

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
@Experimental
public final <R> Observable<R> concatMapEager(Func1<? super T, ? extends Observable<? extends R>> mapper) {
    return concatMapEager(mapper, RxRingBuffer.SIZE);
}
 
开发者ID:JackChan1999,项目名称:letv,代码行数:5,代码来源:Observable.java


示例16: flatMapIterable

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
    return flatMapIterable((Func1) collectionSelector, RxRingBuffer.SIZE);
}
 
开发者ID:JackChan1999,项目名称:letv,代码行数:4,代码来源:Observable.java


示例17: observeOn

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}
 
开发者ID:JackChan1999,项目名称:letv,代码行数:4,代码来源:Observable.java


示例18: onStart

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public void onStart() {
    request((long) RxRingBuffer.SIZE);
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:4,代码来源:OperatorPublish.java


示例19: onStart

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
public void onStart() {
    this.outstanding = RxRingBuffer.SIZE;
    request((long) RxRingBuffer.SIZE);
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:5,代码来源:OperatorMerge.java


示例20: tick

import rx.internal.util.RxRingBuffer; //导入依赖的package包/类
void tick() {
    Object[] observers = this.observers;
    if (observers != null && getAndIncrement() == 0) {
        int length = observers.length;
        Observer<? super R> child = this.child;
        AtomicLong requested = this.requested;
        while (true) {
            RxRingBuffer buffer;
            Object[] vs = new Object[length];
            boolean allHaveValues = true;
            for (int i = 0; i < length; i++) {
                buffer = ((InnerSubscriber) observers[i]).items;
                Object n = buffer.peek();
                if (n == null) {
                    allHaveValues = false;
                } else if (buffer.isCompleted(n)) {
                    child.onCompleted();
                    this.childSubscription.unsubscribe();
                    return;
                } else {
                    vs[i] = buffer.getValue(n);
                }
            }
            if (requested.get() > 0 && allHaveValues) {
                try {
                    child.onNext(this.zipFunction.call(vs));
                    requested.decrementAndGet();
                    this.emitted++;
                    for (Object obj : observers) {
                        buffer = ((InnerSubscriber) obj).items;
                        buffer.poll();
                        if (buffer.isCompleted(buffer.peek())) {
                            child.onCompleted();
                            this.childSubscription.unsubscribe();
                            return;
                        }
                    }
                    if (this.emitted > THRESHOLD) {
                        for (Object obj2 : observers) {
                            ((InnerSubscriber) obj2).requestMore((long) this.emitted);
                        }
                        this.emitted = 0;
                    }
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, child, vs);
                    return;
                }
            } else if (decrementAndGet() <= 0) {
                return;
            }
        }
    }
}
 
开发者ID:JackChan1999,项目名称:boohee_v5.6,代码行数:54,代码来源:OperatorZip.java



注:本文中的rx.internal.util.RxRingBuffer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java BatchExecutor类代码示例发布时间:2022-05-22
下一篇:
Java Clazz类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap