• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Scala Observable类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中rx.lang.scala.Observable的典型用法代码示例。如果您正苦于以下问题:Scala Observable类的具体用法?Scala Observable怎么用?Scala Observable使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Observable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ObservableInPin

//设置package包名称以及导入依赖的类
package framboos.async

import rx.lang.scala.Observable
import rx.lang.scala.Subject
import rx.lang.scala.subjects._
import scala.concurrent.duration._
import framboos._

/** Factory that exposes the value of an input pin as an `Observable[Boolean]`. */
object ObservableInPin {

  /**
   * Samples the pin every 50 milliseconds and publishes the sampled value whenever it
   * differs from the previous sample.
   *
   * The result is a `BehaviorSubject` created with the value read when this method is
   * called. The polling subscription started here is not cancelled by this method.
   *
   * @param pinNumber number passed to `ReverseInPin` to select the pin
   * @return the subject that receives every detected change
   */
  def apply(pinNumber: Int): Observable[Boolean] = {
    val pin = ReverseInPin(pinNumber)
    var previous = pin.value
    val changes = BehaviorSubject(previous)

    Observable.interval(50.milliseconds).subscribe { _ =>
      val sampled = pin.value
      // TODO access Akka logging?
      // log.debug(s"value of in#$pinNumber changed to $sampled")
      if (sampled != previous) changes.onNext(sampled)
      // Remember the sample so the next tick compares against it.
      previous = sampled
    }

    changes
  }
}
开发者ID:nwswanson,项目名称:thermopi,代码行数:28,代码来源:ObservableInPin.scala


示例2: SynchronizedManualRxEmitter

//设置package包名称以及导入依赖的类
package org.nephtys.rxjavamanualemitter

import rx.lang.scala.Observable


/**
 * Wrapper around a `ManualRxEmitter` whose signalling methods each run while holding
 * this instance's monitor.
 */
class SynchronizedManualRxEmitter[T] {
  private val delegate = new ManualRxEmitter[T]

  /** Forwards `t` to the wrapped emitter while holding this instance's lock. */
  def emit(t : T) = synchronized(delegate.emit(t))

  /** Forwards completion to the wrapped emitter while holding this instance's lock. */
  def complete() = synchronized(delegate.complete())

  /** Forwards the error to the wrapped emitter while holding this instance's lock. */
  def err(e : Throwable) = synchronized(delegate.err(e))

  /** The wrapped emitter's observable; accessed without taking the lock. */
  def Observable : Observable[T] = delegate.Observable
}
开发者ID:n3phtys,项目名称:rxjava-manualemitter,代码行数:21,代码来源:SynchronizedManualRxEmitter.scala


示例3: emit

//设置package包名称以及导入依赖的类
package org.nephtys.rxjavamanualemitter

import java.util.concurrent.atomic.AtomicBoolean

import rx.lang.scala.{Observable, Subscriber, Subscription}

import scala.collection.parallel.mutable.{ParHashMap, ParHashSet}
import rx.lang.scala


  /**
   * Passes `t` to every element of `hashmap` that is not unsubscribed.
   * Does nothing when either the `completed` or the `errored` flag is set.
   */
  def emit(t : T) = {
    val terminated = completed.get() || errored.get()
    if (!terminated) {
      hashmap.foreach { subscriber =>
        if (!subscriber.isUnsubscribed) subscriber.onNext(t)
      }
    }
  }

  /**
   * Signals completion to every element of `hashmap` that is not unsubscribed, then sets
   * the `completed` flag and empties `hashmap`.
   * Does nothing when either the `completed` or the `errored` flag is already set.
   */
  def complete() = {
    val terminated = completed.get() || errored.get()
    if (!terminated) {
      hashmap.foreach { subscriber =>
        if (!subscriber.isUnsubscribed) subscriber.onCompleted()
      }
      completed.set(true)
      hashmap.clear()
    }
  }

  /**
   * Signals `e` to every element of `hashmap` that is not unsubscribed, then sets the
   * `errored` flag and empties `hashmap`.
   * Does nothing when either the `completed` or the `errored` flag is already set.
   */
  def err(e : Throwable) = {
    val terminated = completed.get() || errored.get()
    if (!terminated) {
      hashmap.foreach { subscriber =>
        if (!subscriber.isUnsubscribed) subscriber.onError(e)
      }
      errored.set(true)
      hashmap.clear()
    }
  }



  /** Adds `subscriber` to `hashmap` so that later signals are delivered to it. */
  private def subscribefunc(subscriber : Subscriber[T]) : Unit = {
    hashmap += subscriber
  }

  /** Observable whose subscribe action hands each new subscriber to `subscribefunc`. */
  val Observable : Observable[T] = scala.Observable[T](s => subscribefunc(s))

} 
开发者ID:n3phtys,项目名称:rxjava-manualemitter,代码行数:48,代码来源:ManualRxEmitter.scala


示例4: RunInterval

//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.component.core

import scala.concurrent.duration._

import akka.actor._
import rx.lang.scala.Observable

import net.ruippeixotog.scalafbp.component.ComponentActor.OnAllOutputPortsClosed
import net.ruippeixotog.scalafbp.component._

/**
 * Component that sends a unit signal on `out` periodically, using the period (in
 * milliseconds) received on `interval`, and stops when a packet arrives on `stop`.
 */
case object RunInterval extends Component {
  val name = "core/RunInterval"
  val description = "Sends a signal periodically"
  val icon = Some("clock-o")

  val intervalPort = InPort[Long]("interval", "Interval at which signals are emitted (ms)")
  val stopPort = InPort[Unit]("stop", "Stop the emission")
  val inPorts = List(intervalPort, stopPort)

  val outPort = OutPort[Unit]("out", "A signal sent at the given interval")
  val outPorts = List(outPort)

  val instanceProps = Props(new ComponentActor(this) {
    override val terminationPolicy = List(OnAllOutputPortsClosed)

    // Every value received on `interval` switches to a fresh ticker with that period.
    intervalPort.stream
      .switchMap { periodMs =>
        Observable.interval(periodMs.millis).map(_ => ())
      }
      .doOnCompleted(context.stop(self))
      .pipeTo(outPort)

    // Any packet on `stop` stops this actor.
    stopPort.stream.foreach { _ => context.stop(self) }
  })
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:34,代码来源:RunInterval.scala


示例5: RunTimeout

//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.component.core

import scala.concurrent.duration._

import akka.actor._
import rx.lang.scala.Observable

import net.ruippeixotog.scalafbp.component.ComponentActor.OnAllOutputPortsClosed
import net.ruippeixotog.scalafbp.component._

/**
 * Component that sends a single unit signal on `out` once the time (in milliseconds)
 * received on `time` has elapsed.
 */
case object RunTimeout extends Component {
  val name = "core/RunTimeout"
  val description = "Sends a signal after the given time"
  val icon = Some("clock-o")

  val timePort = InPort[Long]("time", "Time after which a signal will be sent (ms)")
  val inPorts = List(timePort)

  val outPort = OutPort[Unit]("out", "A signal sent after the given time")
  val outPorts = List(outPort)

  val instanceProps = Props(new ComponentActor(this) {
    override val terminationPolicy = List(OnAllOutputPortsClosed)

    // Only the first value received on `time` is used; the actor is stopped on completion.
    timePort.stream
      .take(1)
      .flatMap { delayMs =>
        Observable.timer(delayMs.millis).map(_ => ())
      }
      .doOnCompleted(context.stop(self))
      .pipeTo(outPort)
  })
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:31,代码来源:RunTimeout.scala


示例6: RepeatDelayed

//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.component.core

import scala.concurrent.duration._

import akka.actor.Props
import rx.lang.scala.Observable
import spray.json.JsValue

import net.ruippeixotog.scalafbp.component._

/**
 * Component that forwards every packet received on `in` to `out` after the delay
 * (in milliseconds) most recently received on `delay`.
 */
case object RepeatDelayed extends Component {
  val name = "core/RepeatDelayed"
  val description = "Forwards packets after a set delay"
  val icon = Some("clock-o")

  val inPort = InPort[JsValue]("in", "Packet to forward with a delay")
  val delayPort = InPort[Long]("delay", "Delay length (ms)")
  val inPorts = List(inPort, delayPort)

  val outPort = OutPort[JsValue]("out", "Forwarded packet")
  val outPorts = List(outPort)

  val instanceProps = Props(new ComponentActor(this) {
    // Pair each packet with the latest delay value, then re-emit it after that delay.
    val str = inPort.stream
      .withLatestFrom(delayPort.stream) { (packet, delayMs) => (packet, delayMs) }
      .flatMap { case (packet, delayMs) =>
        Observable.just(packet).delay(delayMs.millis)
      }
      .pipeTo(outPort)
  })
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:30,代码来源:RepeatDelayed.scala


示例7: MapConcat

//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.component.stream

import akka.actor.Props
import rx.lang.scala.Observable
import spray.json.{ JsArray, JsValue }

import net.ruippeixotog.scalafbp.component._
import net.ruippeixotog.scalafbp.util.NashornEngine

/**
 * Component that applies a function to every element received on `in` and emits each
 * element of the returned array on `out`.
 *
 * Until a function is received on `func`, every element is wrapped in a one-element
 * array, so it passes through unchanged.
 */
case object MapConcat extends Component {
  val name = "stream/MapConcat"
  // Fixed the misspelling "flatterns" in this user-facing description.
  val description = "Transforms the elements of a stream into arrays of elements and flattens them"
  val icon = Some("code")

  val inPort = InPort[JsValue]("in", "The stream to transform")
  val funcPort = InPort[String]("func", "The function with argument x to use for transformation. " +
    "Must return an array. While not defined, all elements pass untouched.")
  val inPorts = List(inPort, funcPort)

  val outPort = OutPort[JsValue]("out", "The transformed stream")
  val outPorts = List(outPort)

  val instanceProps = Props(new ComponentActor(this) with NashornEngine {
    // Default function: wrap the element in an array so flattening yields it unchanged.
    val defaultFunc = Observable.just[JsFunction](JsArray(_))

    // The default comes first, followed by every function received on `func`.
    val func = defaultFunc ++ funcPort.stream.map(JsFunction(_))
    inPort.stream.withLatestFrom(func) { (x, f) => f(x) }.flatMapIterable {
      case JsArray(elems) => elems
      // A result that is not an array is rejected with an exception.
      case js => throw new IllegalArgumentException(
        s"The value ${js.compactPrint} returned by the function is not an array")
    }.pipeTo(outPort)
  })
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:34,代码来源:MapConcat.scala


示例8: request

//设置package包名称以及导入依赖的类
package com.mishiranu.instantimage.async

import com.squareup.okhttp.{OkHttpClient, Request, Response}

import rx.lang.scala.Observable

import scala.language.implicitConversions

import java.util.concurrent.TimeUnit

/**
 * Mixin offering a small HTTP helper on top of OkHttp: `request` builds a request with a
 * fixed User-Agent, and the implicit `executable` conversion adds `execute()`, which
 * wraps the call in an `Observable`.
 */
trait RxHttp {
  import _root_.scala.util.control.NonFatal

  private val client = new OkHttpClient()

  // Apply the same 15 second timeout to connect, read and write.
  List(client.setConnectTimeout _, client.setReadTimeout _, client.setWriteTimeout _)
    .foreach(_(15000, TimeUnit.MILLISECONDS))

  private val userAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:51.0) Gecko/20100101 Firefox/51.0"

  /**
   * Starts a request builder for `url` with the User-Agent header preset.
   *
   * @param url address to request
   * @return the builder, which can be customised further before `execute()`
   */
  def request(url: String): Request.Builder = {
    new Request.Builder().url(url).header("User-Agent", userAgent)
  }

  /** Adds `execute()` to a `Request.Builder`. */
  class ExecutableBuilder(builder: Request.Builder) {
    /**
     * Builds and executes the request when the returned observable is subscribed to.
     *
     * @return an observable that emits the response and then completes, or signals the
     *         non-fatal error raised while executing the call
     */
    def execute(): Observable[Response] = Observable { subscriber =>
      try {
        subscriber.onNext(client.newCall(builder.build()).execute())
        subscriber.onCompleted()
      } catch {
        // Only non-fatal failures go to the subscriber; fatal JVM errors
        // (e.g. OutOfMemoryError) propagate instead of being delivered as stream errors.
        case NonFatal(error) => subscriber.onError(error)
      }
    }
  }

  /** Implicit conversion that makes `execute()` available on a `Request.Builder`. */
  implicit def executable(request: Request.Builder): ExecutableBuilder = new ExecutableBuilder(request)
}
开发者ID:Mishiranu,项目名称:Instant-Image,代码行数:36,代码来源:RxHttp.scala


示例9: TokenReader

//设置package包名称以及导入依赖的类
package com.piotrglazar.receiptlottery.core

import com.piotrglazar.receiptlottery.Token
import org.springframework.beans.factory.annotation.{Value, Autowired}
import org.springframework.stereotype.Component
import rx.lang.scala.Observable

import scala.io.Source
import scala.util.{Failure, Success, Try}

/**
 * Reads tokens from the classpath resource named by the `token.file` property and
 * exposes them as an `Observable`.
 */
@Component
class TokenReader @Autowired()(@Value("${token.file}") private val path: String) {

  /**
   * Loads the tokens from the resource.
   *
   * @return an observable emitting every token read, or an observable that fails with
   *         the exception raised while reading the resource
   */
  def readTokens(): Observable[Token] = {
    readContent() match {
      case Success(items) => Observable.from(items)
      case Failure(e) => Observable.error(e)
    }
  }

  /**
   * Reads the resource line by line, skips empty lines and wraps each remaining line in
   * a `Token`. Any exception is captured in the returned `Try`.
   */
  private def readContent(): Try[List[Token]] =
    Try {
      val source = Source.fromInputStream(getClass.getResourceAsStream(path))
      // `toList` materialises all lines before the source is closed; the original
      // never closed the source and so leaked the underlying stream.
      try {
        source.getLines()
          .filter(_.nonEmpty)
          .map(Token)
          .toList
      } finally {
        source.close()
      }
    }
}
开发者ID:piotrglazar,项目名称:receipt-lottery,代码行数:30,代码来源:TokenReader.scala


示例10: OfferMatcherLaunchTokensActor

//设置package包名称以及导入依赖的类
package mesosphere.marathon.core.flow.impl

import akka.actor.{ Actor, Cancellable, Props }
import mesosphere.marathon.core.flow.LaunchTokenConfig
import mesosphere.marathon.core.instance.update.{ InstanceChange, InstanceUpdated }
import mesosphere.marathon.core.matcher.manager.OfferMatcherManager
import mesosphere.marathon.core.task.bus.TaskChangeObservables
import rx.lang.scala.{ Observable, Subscription }

import scala.concurrent.duration._

/** Companion that builds the `Props` for the launch-tokens actor. */
private[flow] object OfferMatcherLaunchTokensActor {

  /**
   * @param conf                  supplies the number of launch tokens and the refresh interval
   * @param taskStatusObservables source of instance changes the actor subscribes to
   * @param offerMatcherManager   receiver of the launch-token updates
   * @return `Props` that create the actor with the given collaborators
   */
  def props(
    conf: LaunchTokenConfig,
    taskStatusObservables: TaskChangeObservables,
    offerMatcherManager: OfferMatcherManager): Props =
    Props(new OfferMatcherLaunchTokensActor(conf, taskStatusObservables, offerMatcherManager))
}


/**
 * Actor that periodically resets the launch tokens of the `OfferMatcherManager` and adds
 * one token for every `InstanceUpdated` message whose instance is running and not
 * reported as unhealthy.
 */
private class OfferMatcherLaunchTokensActor(
  conf: LaunchTokenConfig,
  taskStatusObservables: TaskChangeObservables, offerMatcherManager: OfferMatcherManager)
    extends Actor {
  // Both fields are assigned in preStart and remain null until then.
  var taskStatusUpdateSubscription: Subscription = _
  var periodicSetToken: Cancellable = _

  /** Subscribes to all instance changes and schedules the periodic token reset. */
  override def preStart(): Unit = {
    val all: Observable[InstanceChange] = taskStatusObservables.forAll
    // Every instance change is forwarded to this actor as a message.
    taskStatusUpdateSubscription = all.subscribe(self ! _)

    import context.dispatcher
    periodicSetToken = context.system.scheduler.schedule(0.seconds, conf.launchTokenRefreshInterval().millis)(
      offerMatcherManager.setLaunchTokens(conf.launchTokens())
    )
  }

  /** Releases the subscription and the scheduled task, if they were created. */
  override def postStop(): Unit = {
    // Either field is still null when preStart did not run to completion; guard against
    // that so stopping the actor does not fail with a NullPointerException.
    Option(taskStatusUpdateSubscription).foreach(_.unsubscribe())
    Option(periodicSetToken).foreach(_.cancel())
  }

  /** Adds one launch token for each running instance whose health is not reported as false. */
  override def receive: Receive = {
    case InstanceUpdated(instance, _, _) if instance.isRunning && instance.state.healthy.fold(true)(_ == true) =>
      offerMatcherManager.addLaunchTokens(1)
  }
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:49,代码来源:OfferMatcherLaunchTokensActor.scala


示例11: refillOfferMatcherManagerLaunchTokens

//设置package包名称以及导入依赖的类
package mesosphere.marathon.core.flow

import akka.event.EventStream
import mesosphere.marathon.MarathonSchedulerDriverHolder
import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.core.flow.impl.{ OfferReviverDelegate, OfferMatcherLaunchTokensActor, ReviveOffersActor }
import mesosphere.marathon.core.leadership.LeadershipModule
import mesosphere.marathon.core.matcher.manager.OfferMatcherManager
import mesosphere.marathon.core.task.bus.TaskChangeObservables
import org.slf4j.LoggerFactory
import rx.lang.scala.Observable


  /**
   * Hands the `Props` of the launch-tokens actor to `leadershipModule.startWhenLeader`
   * under the name "offerMatcherLaunchTokens".
   *
   * @param conf                  launch-token configuration passed to the actor
   * @param taskStatusObservables source of instance changes passed to the actor
   * @param offerMatcherManager   manager passed to the actor
   */
  def refillOfferMatcherManagerLaunchTokens(
    conf: LaunchTokenConfig,
    taskStatusObservables: TaskChangeObservables,
    offerMatcherManager: OfferMatcherManager): Unit = {
    lazy val launchTokensProps =
      OfferMatcherLaunchTokensActor.props(conf, taskStatusObservables, offerMatcherManager)

    leadershipModule.startWhenLeader(launchTokensProps, "offerMatcherLaunchTokens")
  }
} 
开发者ID:xiaozai512,项目名称:marathon,代码行数:26,代码来源:FlowModule.scala


示例12: TaskChangeObservablesImpl

//设置package包名称以及导入依赖的类
package mesosphere.marathon.core.task.bus.impl

import mesosphere.marathon.core.instance.update.InstanceChange
import mesosphere.marathon.core.task.bus.TaskChangeObservables
import mesosphere.marathon.state.PathId
import rx.lang.scala.{ Observable, Subscription }

/** `TaskChangeObservables` implementation backed by an `InternalTaskChangeEventStream`. */
private[bus] class TaskChangeObservablesImpl(eventStream: InternalTaskChangeEventStream)
    extends TaskChangeObservables {

  /** Instance changes for `PathId.empty`. */
  override def forAll: Observable[InstanceChange] = forRunSpecId(PathId.empty)

  /**
   * Observable that registers each subscriber with the event stream for `appId` and
   * removes that registration when the subscriber unsubscribes.
   *
   * @param appId run spec id passed to the event stream on subscribe and unsubscribe
   */
  override def forRunSpecId(appId: PathId): Observable[InstanceChange] =
    Observable { subscriber =>
      // Register the clean-up first, then subscribe to the event stream.
      val onUnsubscribe = Subscription(eventStream.unsubscribe(subscriber, appId))
      subscriber.add(onUnsubscribe)
      eventStream.subscribe(subscriber, appId)
    }
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:20,代码来源:TaskChangeObservablesImpl.scala


示例13: RxScala

//设置package包名称以及导入依赖的类
package org.zalando.benchmarks

import akka.actor.ActorSystem
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.{ComputationScheduler, ExecutionContextScheduler}

/** RxScala variant of the "computation followed by asynchronous publishing" benchmark. */
class RxScala(system: ActorSystem) {
  import ComputationFollowedByAsyncPublishing._

  /**
   * Creates `numTasks` jobs, computes each one, publishes the results with at most 1024
   * publications in flight, sums `computeResult` over them and prints the total.
   */
  def benchmark: Unit = {
    // looks nice, not sure if correct, blows up the heap
    val jobs = (1 to numTasks).map(Job)

    Observable
      .from(jobs)
      .subscribeOn(ComputationScheduler())
      .map(Computer.compute)
      .subscribeOn(ExecutionContextScheduler(system.dispatcher))
      .flatMap(1024, r => Observable.from(Publisher.publish(r, system))(system.dispatcher))
      .foldLeft(0) { (sum, result) => sum + computeResult(result) }
      .foreach(println)
  }
}
开发者ID:narayana-glassbeam,项目名称:scala-concurrency-playground,代码行数:22,代码来源:RxScala.scala


示例14: StatsReportingDao

//设置package包名称以及导入依赖的类
package com.flipkart.connekt.commons.dao

import com.couchbase.client.java.Bucket
import com.couchbase.client.java.document.StringDocument
import com.couchbase.client.java.query.Query
import com.flipkart.connekt.commons.factories.{ConnektLogger, LogFile}
import rx.lang.scala.Observable

import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt


/** DAO that stores, reads and increments string-encoded counters in a Couchbase bucket. */
class StatsReportingDao(bucket: Bucket) extends Dao {

  /** Expiry passed to upserts and counters: 15 days expressed in seconds. */
  val ttl = 15.days.toSeconds.toInt

  /**
   * Upserts every pair as a `StringDocument` with the `ttl` expiry and blocks until the
   * last upsert result is available.
   *
   * @param kv pairs of document id and value
   */
  def put(kv: List[(String, Long)]) = {
    val upserts = Observable.from(kv).flatMap { entry =>
      rx.lang.scala.JavaConversions.toScalaObservable(
        bucket.async().upsert(StringDocument.create(entry._1, ttl, entry._2.toString)))
    }
    upserts.last.toBlocking.single
  }

  /**
   * Fetches the documents for `keys` and parses their content as `Long`.
   *
   * @param keys document ids to look up
   * @return a map from key to parsed content for the documents that were emitted
   */
  def get(keys: List[String]): Predef.Map[String, Long] = {
    val found = Observable.from(keys).flatMap { key =>
      rx.lang.scala.JavaConversions
        .toScalaObservable(bucket.async().get(StringDocument.create(key)))
        .filter(_ != null)
        .map(document => key -> document.content().toLong)
    }
    found.toList.toBlocking.single.toMap
  }

  /**
   * Applies a counter operation to every pair, passing the pair's value as the second and
   * third argument of `counter` and `ttl` as the fourth, and blocks until the last
   * result is available.
   *
   * @param kvList pairs of counter id and value
   */
  def counter(kvList: List[(String, Long)]) = {
    val updates = Observable.from(kvList).flatMap { entry =>
      rx.lang.scala.JavaConversions.toScalaObservable(
        bucket.async().counter(entry._1, entry._2, entry._2, ttl))
    }
    updates.last.toBlocking.single
  }

  /**
   * Lists the ids of all documents whose id starts with `prefixString`.
   *
   * Failures are logged and rethrown.
   *
   * @param prefixString id prefix to search for
   */
  def prefix(prefixString: String): List[String] = {
    try {
      // NOTE(review): prefixString is interpolated into the query text unescaped;
      // confirm that callers never pass untrusted input here.
      val statement =
        s"SELECT META(${bucket.name()}).id FROM ${bucket.name()} WHERE META(${bucket.name()}).id LIKE '$prefixString%'"
      val rows = bucket.query(Query.simple(statement))

      rows.iterator().asScala.map(row => row.value().get("id").toString).toList
    } catch {
      case e: Exception =>
        ConnektLogger(LogFile.SERVICE).error("StatsReportingDao prefix search failure", e)
        throw e
    }
  }
}

/** Companion offering a factory for [[StatsReportingDao]]. */
object StatsReportingDao {
  /** Creates a DAO that operates on `bucket`. */
  def apply(bucket: Bucket): StatsReportingDao = new StatsReportingDao(bucket)
}
开发者ID:ayush03agarwal,项目名称:connekt,代码行数:57,代码来源:StatsReportingDao.scala


示例15: ObserveBlockingQueue

//设置package包名称以及导入依赖的类
package nl.knaw.dans.experiments.hazelcast

import java.util.concurrent.{BlockingQueue, TimeUnit}

import com.hazelcast.core.HazelcastInstanceNotActiveException
import rx.lang.scala.{Observable, Scheduler}
import rx.lang.scala.schedulers.NewThreadScheduler
import rx.lang.scala.subscriptions.CompositeSubscription

import scala.concurrent.duration.Duration

package object queue {

  /** Adds `observe` to any `BlockingQueue`, turning polled elements into an `Observable`. */
  implicit class ObserveBlockingQueue[T](val queue: BlockingQueue[T]) extends AnyVal {

    /**
     * Repeatedly polls the queue on a worker of `scheduler` and emits every element obtained.
     *
     * The stream completes when `running()` returns false or when a
     * `HazelcastInstanceNotActiveException` is thrown; any other throwable is printed and
     * passed to the subscriber as an error.
     *
     * @param timeout   maximum time to wait in a single poll
     * @param scheduler scheduler that provides the polling worker
     * @param running   checked before every poll
     */
    def observe(timeout: Duration, scheduler: Scheduler = NewThreadScheduler())(running: () => Boolean): Observable[T] =
      Observable { subscriber =>
        val worker = scheduler.createWorker
        val polling = worker.scheduleRec {
          try {
            if (!running()) subscriber.onCompleted()
            else {
              // poll returns null on timeout, which Option turns into None.
              val polled = Option(queue.poll(timeout.toMillis, TimeUnit.MILLISECONDS))
              println(s"received: $polled")
              polled.foreach(subscriber.onNext)
            }
          } catch {
            case _: HazelcastInstanceNotActiveException => subscriber.onCompleted()
            case e: Throwable =>
              println(s"  caught ${e.getClass.getSimpleName}: ${e.getMessage}")
              subscriber.onError(e)
          }
        }

        // Tie the scheduled task and the worker to the subscriber.
        subscriber.add(CompositeSubscription(polling, worker))
      }
  }
}
开发者ID:rvanheest-DANS-KNAW,项目名称:Hazelcast-experiments,代码行数:38,代码来源:package.scala


示例16: RxController

//设置package包名称以及导入依赖的类
package controllers

import javax.inject.Singleton
import javax.inject.Inject
import play.api.mvc._
import play.Logger
import models._
import controllers._
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
import rx.lang.scala.subjects.PublishSubject
import services._
import scala.concurrent.Future

/** Controller that returns the first price produced by the price service. */
@Singleton
class RxController @Inject()(priceService: IPriceService) extends Controller {

    /**
     * Subscribes to the price stream on the IO scheduler, prints the first value and
     * blocks the calling thread until that value is available.
     */
    private def firstSuggestedPrice() = {
        val sourceObservable = priceService.generatePrices
        Observable.create {
            sourceObservable.subscribe
        }.subscribeOn(IOScheduler())
         .take(1)
         .flatMap { x => println(x) ; Observable.just(x) }
         .toBlocking
         .first
    }

    /** Synchronous action: blocks the request until the first price is available. */
    def prices = Action { implicit request =>
        Logger.info("RX called.")
        Ok("RxScala Price suggested is = " + firstSuggestedPrice())
    }

    /**
     * Asynchronous action. The blocking wait now runs inside the `Future`; the original
     * waited for the price before the `Future` was created, so the action blocked its
     * caller just like `prices`.
     */
    def pricesAsync = Action.async { implicit request =>
        Logger.info("RX Async called.")
        import play.api.libs.concurrent.Execution.Implicits.defaultContext
        Future {
            // Mark the wait as blocking so the execution context can compensate for it.
            val price = scala.concurrent.blocking(firstSuggestedPrice())
            Ok("RxScala Price suggested is = " + price)
        }
    }
}
开发者ID:manojtc,项目名称:ReactiveWebStore,代码行数:45,代码来源:RxController.scala


示例17: generatePrices

//设置package包名称以及导入依赖的类
package services

import javax.inject.Singleton
import play.Logger
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
import rx.lang.scala.subjects.PublishSubject
import scala.concurrent.Future
import scala.util.Random.nextDouble

/** Service that provides prices as an observable stream. */
trait IPriceService {
    /** @return an observable of price values */
    def generatePrices: Observable[Double]
}

/**
 * Price service backed by a `PublishSubject` that is fed a random value in [0, 1000)
 * once per second from a background `Future`.
 */
@Singleton
class PriceService extends IPriceService {
    var doubleInfiniteStreamSubject = PublishSubject.apply[Double]()
    import scala.concurrent.ExecutionContext.Implicits.global
    // Endless producer: sleep one second, then publish the next random price.
    Future {
        Stream.continually(nextDouble * 1000.0).foreach { price =>
            Thread.sleep(1000)
            doubleInfiniteStreamSubject.onNext(price)
        }
    }

    /** Subscribes to the subject on the IO scheduler and adds 10 to every value. */
    private def shiftedPrices =
        Observable.create {
            doubleInfiniteStreamSubject.subscribe
        }.subscribeOn(IOScheduler())
         .flatMap { x => Observable.from(Iterable.fill(1)(x + 10)) }

    /**
     * Merges the shifted values whose integer part is odd with those whose integer part
     * is even, sums the first ten merged values and emits `sum - sum * 0.9`.
     */
    override def generatePrices: Observable[Double] = {
        val oddPrices = shiftedPrices.filter { x => x.toInt % 2 != 0 }
        val evenPrices = shiftedPrices.filter { x => x.toInt % 2 == 0 }

        val merged = Observable
            .empty
            .subscribeOn(IOScheduler())
            .merge(oddPrices)
            .merge(evenPrices)
            .take(10)
            .foldLeft(0.0)(_+_)
            .flatMap { x => Observable.just( x - (x * 0.9)) }
        merged
    }
}
开发者ID:manojtc,项目名称:ReactiveWebStore,代码行数:48,代码来源:PriceService.scala


示例18: PublishSubjectSpec

//设置package包名称以及导入依赖的类
package name.mikulskibartosz.rxscala.tests.subject

import name.mikulskibartosz.rxscala.tests.assert.Assertions._
import org.scalatest.{FlatSpec, Matchers}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.{Observable, Observer, Subscription}

/** Behaviour of `PublishSubject` when it is subscribed to one or more observables. */
class PublishSubjectSpec extends FlatSpec with Matchers {
  "A publish subject" should "emit all values emitted by the source after the subject subscribed to it" in {
    // Captured so that the test can push values after the subscription is set up.
    var capturedObserver: Observer[Int] = null
    val source = Observable.create[Int] { o =>
      capturedObserver = o //do not do it at home ;)
      o.onNext(1)
      o.onNext(2)
      Subscription()
    }

    when {
      val subject = PublishSubject[Int]()
      source.subscribe(subject)
      subject
    } assert { subscriber =>
      capturedObserver.onNext(3)
      capturedObserver.onNext(4)
      // 1 and 2 were emitted before the assertion subscribed, so only 3 and 4 are expected.
      subscriber.assertValues(3, 4)
    }
  }

  //NOTE every subject can subscribe to more than one observable, but this one does not do anything interesting, therefore it is easy to explain its behaviour
  it should "emit values from multiple sources" in {
    var firstObserver: Observer[Int] = null
    var secondObserver: Observer[Int] = null

    val firstObservable = Observable.create[Int] { o =>
      firstObserver = o
      Subscription()
    }
    val secondObservable = Observable.create[Int] { o =>
      secondObserver = o
      Subscription()
    }

    when {
      val subject = PublishSubject[Int]()
      firstObservable.subscribe(subject)
      secondObservable.subscribe(subject)
      subject
    } assert { subscriber =>
      firstObserver.onNext(1)
      secondObserver.onNext(2)
      subscriber.assertValues(1, 2)
    }
  }
}
开发者ID:mikulskibartosz,项目名称:RXScalaMeetup,代码行数:59,代码来源:PublishSubjectSpec.scala


示例19: AsyncSubjectSpec

//设置package包名称以及导入依赖的类
package name.mikulskibartosz.rxscala.tests.subject

import org.scalatest.{FlatSpec, Matchers}
import rx.lang.scala.{Subscription, Observable}
import rx.lang.scala.subjects.AsyncSubject

import name.mikulskibartosz.rxscala.tests.assert.Assertions._

/** Behaviour of `AsyncSubject` for completed, failed and never-completing sources. */
class AsyncSubjectSpec extends FlatSpec with Matchers {
  "An async subject" should "emit the last value emitted by the observable it observes" in {
    val source = Observable.from(Array(1, 2, 3))

    when {
      val subject = AsyncSubject[Int]()
      source.subscribe(subject)
      subject
    } assert { subscriber =>
      subscriber.assertValue(3)
    }
  }

  it should "emit a failure if the observable has failed" in {
    val failing = Observable.error(new IllegalStateException())

    when {
      val subject = AsyncSubject[Int]()
      failing.subscribe(subject)
      subject
    } assert { subscriber =>
      subscriber.assertError(classOf[IllegalStateException])
    }
  }

  it should "not emit a value if the observable has not completed" in {
    val neverCompleting = Observable.create[Int] { o =>
      o.onNext(1)
      o.onNext(2)
      o.onNext(3)
      //NOTE the function does not call the onCompleted method

      Subscription()
    }

    when {
      val subject = AsyncSubject[Int]()
      neverCompleting.subscribe(subject)
      subject
    } assert { subscriber =>
      subscriber.assertNoValues()
      subscriber.assertNotCompleted()
    }
  }
}
开发者ID:mikulskibartosz,项目名称:RXScalaMeetup,代码行数:56,代码来源:AsyncSubjectSpec.scala


示例20: ReplaySubjectSpec

//设置package包名称以及导入依赖的类
package name.mikulskibartosz.rxscala.tests.subject

import name.mikulskibartosz.rxscala.tests.assert.Assertions._
import org.scalatest.{FlatSpec, Matchers}
import rx.lang.scala.subjects.ReplaySubject
import rx.lang.scala.{Observable, Observer, Subscription}

/** Behaviour of unbounded and size-bounded `ReplaySubject`s. */
class ReplaySubjectSpec extends FlatSpec with Matchers {
  "A replay subject" should "emit all value emitted by its source" in {
    // Captured so that the test can push values after the subscription is set up.
    var capturedObserver: Observer[Int] = null
    val source = Observable.create[Int] { o =>
      capturedObserver = o //do not do it at home ;)
      o.onNext(1)
      o.onNext(2)
      Subscription()
    }

    when {
      val subject = ReplaySubject[Int]()
      source.subscribe(subject)
      subject
    } assert { subscriber =>
      capturedObserver.onNext(3)
      capturedObserver.onNext(4)
      // Values emitted before and after the assertion subscribed are all expected.
      subscriber.assertValues(1, 2, 3, 4)
    }
  }

  it should "cache a specified number of items emitted before subscribing and emit all added afterwards" in {
    var capturedObserver: Observer[Int] = null
    val source = Observable.create[Int] { o =>
      capturedObserver = o //do not do it at home ;)
      o.onNext(1)
      o.onNext(2)
      o.onNext(3)
      o.onNext(4)
      Subscription()
    }

    when {
      val subject = ReplaySubject.withSize[Int](3)
      source.subscribe(subject)
      subject
    } assert { subscriber =>
      capturedObserver.onNext(5)
      capturedObserver.onNext(6)
      // With size 3, only the last three earlier values (2, 3, 4) are expected before 5 and 6.
      subscriber.assertValues(2, 3, 4, 5, 6)
    }
  }
}
开发者ID:mikulskibartosz,项目名称:RXScalaMeetup,代码行数:55,代码来源:ReplaySubjectSpec.scala



注:本文中的rx.lang.scala.Observable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Redirect类代码示例发布时间:2022-05-23
下一篇:
Scala HikariDataSource类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap