• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Scala Receiver类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.spark.streaming.receiver.Receiver的典型用法代码示例。如果您正苦于以下问题:Scala Receiver类的具体用法?Scala Receiver怎么用?Scala Receiver使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Receiver类的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: FqueueStreamingReceiver

//设置package包名称以及导入依赖的类
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import Fqueue.FqueueReceiver
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver


class FqueueStreamingReceiver(val address: String, val connectionPoolSize: Int, val timeOut: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
  private var receiver: Option[FqueueReceiver] = None

  def onStart() {
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop(): Unit = {
    receiver foreach { _.stop() }
  }

  private def receive(): Unit = {
    val fqueueReceiver = new FqueueReceiver(address, connectionPoolSize, timeOut)
    receiver = Some(fqueueReceiver)
    receiver foreach { _.connect() }

    try
    {
      var stop = false
      while (!isStopped() && !stop) {
        val data = fqueueReceiver.deQueue("track_BOdao2015*")
        data match {
          case Some(str) => store(str)
          case None => Thread.sleep(1000)//stop = true
        }
      }
      receiver foreach { _.stop() }
    } catch {
      case e: Exception =>
        println("get data from fqueue err! pleace sure the server is live")
        println(e.getMessage)
        println(e.getStackTraceString)
        receiver foreach { _.stop() }
    }
  }
} 
开发者ID:TopSpoofer,项目名称:FqueueStreamingReceiver,代码行数:49,代码来源:FqueueStreamingReceiver.scala


示例2: JMSInputDStream

//设置package包名称以及导入依赖的类
package com.redhat.spark.streaming.jms

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class JMSInputDStream(
                       @transient ssc_ : StreamingContext,
                       brokerURL: String,
                       username: String,
                       password: String,
                       queuename: String,
                       selector: String,
                       storageLevel: StorageLevel
                       ) extends ReceiverInputDStream[JMSEvent](ssc_) {

  override def getReceiver(): Receiver[JMSEvent] = {
    new JMSReceiver(brokerURL, username, password, queuename, selector, storageLevel)
  }
} 
开发者ID:xiaoJacky,项目名称:sparkLearning,代码行数:23,代码来源:JMSInputDStream.scala


示例3: TwitterInputDStream

//设置package包名称以及导入依赖的类
package com.aluxian.tweeather.streaming

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import twitter4j.{FilterQuery, Status}


class TwitterInputDStream(@transient ssc: StreamingContext,
                          twitterAuth: Option[Authorization],
                          filterQuery: Option[FilterQuery],
                          storageLevel: StorageLevel
                         ) extends ReceiverInputDStream[Status](ssc) {

  private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())

  private def createOAuthAuthorization(): Authorization = {
    new OAuthAuthorization(new ConfigurationBuilder().build())
  }

  override def getReceiver(): Receiver[Status] = {
    new TwitterReceiver(authorization, filterQuery, storageLevel)
  }

} 
开发者ID:cnajeefa,项目名称:Tourism-Sentiment-Analysis,代码行数:29,代码来源:TwitterInputDStream.scala


示例4: FacebookPostReceiver

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.facebook

import java.util.Date

import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class FacebookPostReceiver(
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[FacebookPost](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  @volatile private var lastIngestedDate: Option[Date] = None

  override protected def poll(): Unit = {
    clients.par.foreach(_
      .loadNewFacebookPosts(lastIngestedDate)
      .filter(x => {
        logDebug(s"Got facebook ${x.post.getPermalinkUrl} from page ${x.pageId} time ${x.post.getCreatedTime}")
        isNew(x)
      })
      .foreach(x => {
        logInfo(s"Storing facebook ${x.post.getPermalinkUrl}")
        store(x)
        markStored(x)
      })
    )
  }

  private def isNew(item: FacebookPost) = {
    lastIngestedDate.isEmpty || item.post.getCreatedTime.after(lastIngestedDate.get)
  }

  private def markStored(item: FacebookPost): Unit = {
    if (isNew(item)) {
      lastIngestedDate = Some(item.post.getCreatedTime)
      logDebug(s"Updating last ingested date to ${item.post.getCreatedTime}")
    }
  }
}

class FacebookPostInputDStream(
  ssc: StreamingContext,
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  pollingWorkers: Int,
  storageLevel: StorageLevel
) extends ReceiverInputDStream[FacebookPost](ssc) {

  override def getReceiver(): Receiver[FacebookPost] = {
    logDebug("Creating facebook receiver")
    new FacebookPostReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-facebook,代码行数:62,代码来源:FacebookPostInputDStream.scala


示例5: FacebookCommentsReceiver

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.facebook

import java.util.Date

import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookComment
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class FacebookCommentsReceiver(
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[FacebookComment](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  @volatile private var lastIngestedDate: Option[Date] = None

  override protected def poll(): Unit = {
    clients.par.foreach(_
      .loadNewFacebookComments(lastIngestedDate)
      .filter(x => {
        logDebug(s"Got comment with id ${x.comment.getId} from page ${x.pageId}")
        isNew(x)
      })
      .foreach(x => {
        logInfo(s"Storing comment ${x.comment.getId} from page ${x.pageId}")
        store(x)
        markStored(x)
      })
    )
  }

  private def isNew(item: FacebookComment) = {
    lastIngestedDate.isEmpty || item.comment.getCreatedTime.after(lastIngestedDate.get)
  }

  private def markStored(item: FacebookComment): Unit = {
    if (isNew(item)) {
      lastIngestedDate = Some(item.comment.getCreatedTime)
      logDebug(s"Updating last ingested date to ${lastIngestedDate.get}")
    }
  }
}

class FacebookCommentsInputDStream(
  ssc: StreamingContext,
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  pollingWorkers: Int,
  storageLevel: StorageLevel
) extends ReceiverInputDStream[FacebookComment](ssc) {

  override def getReceiver(): Receiver[FacebookComment] = {
    logDebug("Creating facebook receiver")
    new FacebookCommentsReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-facebook,代码行数:62,代码来源:FacebookCommentsInputDStream.scala


示例6: PollingSchedule

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming

import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

case class PollingSchedule(interval: Long, unit: TimeUnit, initialDelay: Long = 1)

// Taken from https://github.com/CatalystCode/streaming-instagram/blob/3873a197212ba5929dd54ec4949f3d1ac10ffc1f/src/main/scala/com/github/catalystcode/fortis/spark/streaming/PollingReceiver.scala
// Put this into a shared library at some point
abstract class PollingReceiver[T](
 pollingSchedule: PollingSchedule,
 pollingWorkers: Int,
 storageLevel: StorageLevel
) extends Receiver[T](storageLevel) {

  private var threadPool: ScheduledThreadPoolExecutor = _

  def onStart(): Unit = {
    threadPool = new ScheduledThreadPoolExecutor(pollingWorkers)

    val pollingThread = new Thread("Polling thread") {
      override def run(): Unit = {
        poll()
      }
    }

    threadPool.scheduleAtFixedRate(
      pollingThread, pollingSchedule.initialDelay,
      pollingSchedule.interval, pollingSchedule.unit)
  }

  def onStop(): Unit = {
    if (threadPool != null) {
      threadPool.shutdown()
    }
  }

  protected def poll(): Unit
} 
开发者ID:CatalystCode,项目名称:streaming-facebook,代码行数:42,代码来源:PollingReceiver.scala


示例7: PollingSchedule

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming

import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

case class PollingSchedule(interval: Long, unit: TimeUnit, initialDelay: Long = 1)

abstract class PollingReceiver[T](
 pollingSchedule: PollingSchedule,
 pollingWorkers: Int,
 storageLevel: StorageLevel
) extends Receiver[T](storageLevel) {

  private var threadPool: ScheduledThreadPoolExecutor = _

  def onStart(): Unit = {
    threadPool = new ScheduledThreadPoolExecutor(pollingWorkers)

    val pollingThread = new Thread("Polling thread") {
      override def run(): Unit = {
        poll()
      }
    }

    threadPool.scheduleAtFixedRate(
      pollingThread, pollingSchedule.initialDelay,
      pollingSchedule.interval, pollingSchedule.unit)
  }

  def onStop(): Unit = {
    if (threadPool != null) {
      threadPool.shutdown()
    }
  }

  protected def poll(): Unit
} 
开发者ID:CatalystCode,项目名称:streaming-instagram,代码行数:40,代码来源:PollingReceiver.scala


示例8: InstagramReceiver

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.instagram

import com.github.catalystcode.fortis.spark.streaming.instagram.client.InstagramClient
import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class InstagramReceiver(
  client: InstagramClient,
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[InstagramItem](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  @volatile private var lastIngestedEpoch = Long.MinValue

  override protected def poll(): Unit = {
    client
      .loadNewInstagrams()
      .filter(x => {
        val createdAt = x.created_time.toLong
        logDebug(s"Got instagram ${x.link} from time $createdAt")
        createdAt > lastIngestedEpoch
      })
      .foreach(x => {
        logInfo(s"Storing instagram ${x.link}")
        store(x)
        markStored(x)
      })
  }

  private def markStored(item: InstagramItem): Unit = {
    val itemCreatedAt = item.created_time.toLong
    if (itemCreatedAt > lastIngestedEpoch) {
      lastIngestedEpoch = itemCreatedAt
      logDebug(s"Updating last ingested epoch to $itemCreatedAt")
    }
  }
}

class InstagramInputDStream(
  ssc: StreamingContext,
  client: InstagramClient,
  pollingSchedule: PollingSchedule,
  pollingWorkers: Int,
  storageLevel: StorageLevel
) extends ReceiverInputDStream[InstagramItem](ssc) {

  override def getReceiver(): Receiver[InstagramItem] = {
    logDebug("Creating instagram receiver")
    new InstagramReceiver(client, pollingSchedule, storageLevel, pollingWorkers)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-instagram,代码行数:57,代码来源:InstagramInputDStream.scala


示例9: CustomReceiver

//设置package包名称以及导入依赖的类
package com.knoldus.receiver


import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CustomReceiver extends App {

  val sparkConf = new SparkConf().setAppName("CustomReceiver")
  val ssc = new StreamingContext(sparkConf, Seconds(1))

  val lines = ssc.receiverStream(new CustomReceiver(args(0)))
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc.start()
  ssc.awaitTermination()
}


class CustomReceiver(path: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    new Thread("File Reader") {
      override def run() {
        receive()
      }
    }.start()
  }

  def onStop() {}

  private def receive() =
    try {
      println("Reading file " + path)
      val reader = new BufferedReader(
        new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))
      var userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      println("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case ex: Exception =>
        restart("Error reading file " + path, ex)
    }

} 
开发者ID:knoldus,项目名称:spark-streaming-meetup,代码行数:58,代码来源:CustomReceiver.scala


示例10: TranscriptionReceiver

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamwrappers.radio

import java.io.InputStream
import java.net.URL
import java.util.Locale
import java.util.function.Consumer

import com.github.catalystcode.fortis.speechtotext.Transcriber
import com.github.catalystcode.fortis.speechtotext.config.{OutputFormat, SpeechServiceConfig, SpeechType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class TranscriptionReceiver(
  radioUrl: String,
  audioType: String,
  locale: String,
  subscriptionKey: String,
  speechType: String,
  outputFormat: String,
  storageLevel: StorageLevel
) extends Receiver[RadioTranscription](storageLevel) {

  private val language = new Locale(locale).getLanguage
  private var audioStream: InputStream = _
  private var transcriber: Transcriber = _

  private val onTranscription = new Consumer[String] {
    override def accept(text: String): Unit = {
      val transcription = RadioTranscription(text = text, language = language, radioUrl = radioUrl)
      store(transcription)
    }
  }

  private val onHypothesis = new Consumer[String] {
    override def accept(hypothesis: String): Unit = {
      // do nothing
    }
  }

  override def onStart(): Unit = {
    val config = new SpeechServiceConfig(
      subscriptionKey,
      SpeechType.valueOf(speechType),
      OutputFormat.valueOf(outputFormat),
      new Locale(locale))

    transcriber = Transcriber.create(audioType, config)
    audioStream = new URL(radioUrl).openConnection.getInputStream
    transcriber.transcribe(audioStream, onTranscription, onHypothesis)
  }

  override def onStop(): Unit = {
    if (audioStream != null) audioStream.close()
    if (transcriber != null) transcriber = null
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:57,代码来源:TranscriptionReceiver.scala


示例11: RadioInputDStream

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamwrappers.radio

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class RadioInputDStream(
  ssc: StreamingContext,
  radioUrl: String,
  audioType: String,
  locale: String,
  subscriptionKey: String,
  speechType: String,
  outputFormat: String,
  storageLevel: StorageLevel
) extends ReceiverInputDStream[RadioTranscription](ssc) {
  override def getReceiver(): Receiver[RadioTranscription] = {
    logDebug("Creating radio transcription receiver")
    new TranscriptionReceiver(radioUrl, audioType, locale, subscriptionKey, speechType, outputFormat, storageLevel)
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:23,代码来源:RadioInputDStream.scala


示例12: PollingSource

//设置package包名称以及导入依赖的类
package com.github.btmorr
package tutorials.spark.stream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class PollingSource(sleepSeconds: Int, uri: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Dummy Source") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself isStopped() returns false
  }

  
  private def receive() {
    while(!isStopped()) {
      store(uri) // this just takes whatever the uri is and emits it over and over
      Thread.sleep( sleepSeconds * 1000 )
    }
  }
} 
开发者ID:btmorr,项目名称:scala-boost,代码行数:29,代码来源:PollingSource.scala


示例13: BingReceiver

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.bing

import com.github.catalystcode.fortis.spark.streaming.bing.client.BingClient
import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class BingReceiver(
                            client: BingClient,
                            pollingSchedule: PollingSchedule,
                            storageLevel: StorageLevel,
                            pollingWorkers: Int
) extends PollingReceiver[BingPost](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  @volatile private var lastIngestedDate = Long.MinValue

  override protected def poll(): Unit = {
    client
      .loadNewPostings
      .filter(x => {
        logDebug(s"Received Bing result ${x.name} from time ${x.dateLastCrawled}")
        isNew(x)
      })
      .foreach(x => {
        logInfo(s"Storing bing result ${x.url}")
        store(x)
        markStored(x)
      })
  }

  private def isNew(item: BingPost) = {
    val createdAt = item.dateLastCrawled.toLong
    createdAt > lastIngestedDate
  }

  private def markStored(item: BingPost): Unit = {
    val itemCreatedAt = item.dateLastCrawled.toLong

    if (isNew(item)) {
      lastIngestedDate = itemCreatedAt
      logDebug(s"Updating last ingested date to ${item.dateLastCrawled}")
    }
  }
}

class BingInputDStream(
                        ssc: StreamingContext,
                        client: BingClient,
                        pollingSchedule: PollingSchedule,
                        pollingWorkers: Int,
                        storageLevel: StorageLevel
) extends ReceiverInputDStream[BingPost](ssc) {

  override def getReceiver(): Receiver[BingPost] = {
    logDebug("Creating bing receiver")
    new BingReceiver(client, pollingSchedule, storageLevel, pollingWorkers)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-bing,代码行数:62,代码来源:BingInputDStream.scala



注:本文中的org.apache.spark.streaming.receiver.Receiver类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ZkUtils类代码示例发布时间:2022-05-23
下一篇:
Scala ListBuffer类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap