• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Scala ReceiverInputDStream类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.spark.streaming.dstream.ReceiverInputDStream的典型用法代码示例。如果您正苦于以下问题:Scala ReceiverInputDStream类的具体用法?Scala ReceiverInputDStream怎么用?Scala ReceiverInputDStream使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ReceiverInputDStream类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: RedditUtils

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.reddit

import com.github.catalystcode.fortis.spark.streaming.reddit.client.RedditClient
import com.github.catalystcode.fortis.spark.streaming.reddit.dto.RedditObject
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object RedditUtils {
  def createPageStream(redditAuth: RedditAuth,
                       keywords: Seq[String],
                       ssc: StreamingContext,
                       storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
                       pollingPeriodInSeconds: Int = 3,
                       subredit: Option[String] = None,
                       searchLimit: Int = 25,
                       searchResultType: Option[String] = Option("link")
  ): ReceiverInputDStream[RedditObject] = {
    return new RedditInputDStream(
      client = new RedditClient(redditAuth.applicationId, redditAuth.secret),
      keywords = keywords,
      ssc = ssc,
      storageLevel = storageLevel,
      subredit = subredit,
      searchLimit = searchLimit,
      searchResultType = searchResultType,
      pollingPeriodInSeconds = pollingPeriodInSeconds)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-reddit,代码行数:30,代码来源:RedditUtils.scala


示例2: TwitterInputDStream

//设置package包名称以及导入依赖的类
package com.aluxian.tweeather.streaming

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import twitter4j.{FilterQuery, Status}


class TwitterInputDStream(@transient ssc: StreamingContext,
                          twitterAuth: Option[Authorization],
                          filterQuery: Option[FilterQuery],
                          storageLevel: StorageLevel
                         ) extends ReceiverInputDStream[Status](ssc) {

  private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())

  private def createOAuthAuthorization(): Authorization = {
    new OAuthAuthorization(new ConfigurationBuilder().build())
  }

  override def getReceiver(): Receiver[Status] = {
    new TwitterReceiver(authorization, filterQuery, storageLevel)
  }

} 
开发者ID:cnajeefa,项目名称:Tourism-Sentiment-Analysis,代码行数:29,代码来源:TwitterInputDStream.scala


示例3: FacebookPostReceiver

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.facebook

import java.util.Date

import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class FacebookPostReceiver(
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[FacebookPost](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  @volatile private var lastIngestedDate: Option[Date] = None

  override protected def poll(): Unit = {
    clients.par.foreach(_
      .loadNewFacebookPosts(lastIngestedDate)
      .filter(x => {
        logDebug(s"Got facebook ${x.post.getPermalinkUrl} from page ${x.pageId} time ${x.post.getCreatedTime}")
        isNew(x)
      })
      .foreach(x => {
        logInfo(s"Storing facebook ${x.post.getPermalinkUrl}")
        store(x)
        markStored(x)
      })
    )
  }

  private def isNew(item: FacebookPost) = {
    lastIngestedDate.isEmpty || item.post.getCreatedTime.after(lastIngestedDate.get)
  }

  private def markStored(item: FacebookPost): Unit = {
    if (isNew(item)) {
      lastIngestedDate = Some(item.post.getCreatedTime)
      logDebug(s"Updating last ingested date to ${item.post.getCreatedTime}")
    }
  }
}

class FacebookPostInputDStream(
  ssc: StreamingContext,
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  pollingWorkers: Int,
  storageLevel: StorageLevel
) extends ReceiverInputDStream[FacebookPost](ssc) {

  override def getReceiver(): Receiver[FacebookPost] = {
    logDebug("Creating facebook receiver")
    new FacebookPostReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-facebook,代码行数:62,代码来源:FacebookPostInputDStream.scala


示例4: FacebookCommentsReceiver

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.facebook

import java.util.Date

import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookComment
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class FacebookCommentsReceiver(
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[FacebookComment](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  @volatile private var lastIngestedDate: Option[Date] = None

  override protected def poll(): Unit = {
    clients.par.foreach(_
      .loadNewFacebookComments(lastIngestedDate)
      .filter(x => {
        logDebug(s"Got comment with id ${x.comment.getId} from page ${x.pageId}")
        isNew(x)
      })
      .foreach(x => {
        logInfo(s"Storing comment ${x.comment.getId} from page ${x.pageId}")
        store(x)
        markStored(x)
      })
    )
  }

  private def isNew(item: FacebookComment) = {
    lastIngestedDate.isEmpty || item.comment.getCreatedTime.after(lastIngestedDate.get)
  }

  private def markStored(item: FacebookComment): Unit = {
    if (isNew(item)) {
      lastIngestedDate = Some(item.comment.getCreatedTime)
      logDebug(s"Updating last ingested date to ${lastIngestedDate.get}")
    }
  }
}

class FacebookCommentsInputDStream(
  ssc: StreamingContext,
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  pollingWorkers: Int,
  storageLevel: StorageLevel
) extends ReceiverInputDStream[FacebookComment](ssc) {

  override def getReceiver(): Receiver[FacebookComment] = {
    logDebug("Creating facebook receiver")
    new FacebookCommentsReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-facebook,代码行数:62,代码来源:FacebookCommentsInputDStream.scala


示例5: InstagramReceiver

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.instagram

import com.github.catalystcode.fortis.spark.streaming.instagram.client.InstagramClient
import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class InstagramReceiver(
  client: InstagramClient,
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[InstagramItem](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  @volatile private var lastIngestedEpoch = Long.MinValue

  override protected def poll(): Unit = {
    client
      .loadNewInstagrams()
      .filter(x => {
        val createdAt = x.created_time.toLong
        logDebug(s"Got instagram ${x.link} from time $createdAt")
        createdAt > lastIngestedEpoch
      })
      .foreach(x => {
        logInfo(s"Storing instagram ${x.link}")
        store(x)
        markStored(x)
      })
  }

  private def markStored(item: InstagramItem): Unit = {
    val itemCreatedAt = item.created_time.toLong
    if (itemCreatedAt > lastIngestedEpoch) {
      lastIngestedEpoch = itemCreatedAt
      logDebug(s"Updating last ingested epoch to $itemCreatedAt")
    }
  }
}

class InstagramInputDStream(
  ssc: StreamingContext,
  client: InstagramClient,
  pollingSchedule: PollingSchedule,
  pollingWorkers: Int,
  storageLevel: StorageLevel
) extends ReceiverInputDStream[InstagramItem](ssc) {

  override def getReceiver(): Receiver[InstagramItem] = {
    logDebug("Creating instagram receiver")
    new InstagramReceiver(client, pollingSchedule, storageLevel, pollingWorkers)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-instagram,代码行数:57,代码来源:InstagramInputDStream.scala


示例6: RadioInputDStream

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamwrappers.radio

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class RadioInputDStream(
  ssc: StreamingContext,
  radioUrl: String,
  audioType: String,
  locale: String,
  subscriptionKey: String,
  speechType: String,
  outputFormat: String,
  storageLevel: StorageLevel
) extends ReceiverInputDStream[RadioTranscription](ssc) {
  override def getReceiver(): Receiver[RadioTranscription] = {
    logDebug("Creating radio transcription receiver")
    new TranscriptionReceiver(radioUrl, audioType, locale, subscriptionKey, speechType, outputFormat, storageLevel)
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:23,代码来源:RadioInputDStream.scala


示例7: BingUtils

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.bing

import java.util.concurrent.TimeUnit
import com.github.catalystcode.fortis.spark.streaming.PollingSchedule
import com.github.catalystcode.fortis.spark.streaming.bing.client.BingCustomSearchClient
import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object BingUtils {
  def createPageStream(
                        ssc: StreamingContext,
                        auth: BingAuth,
                        searchInstanceId: String,
                        keywords: Seq[String],
                        pollingSchedule: PollingSchedule = PollingSchedule(30, TimeUnit.SECONDS),
                        pollingWorkers: Int = 1,
                        storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): ReceiverInputDStream[BingPost] = {
    new BingInputDStream(
      ssc = ssc,
      client = new BingCustomSearchClient(
        searchInstanceId = searchInstanceId,
        keywords = keywords,
        auth = auth),
      pollingSchedule = pollingSchedule,
      pollingWorkers = pollingWorkers,
      storageLevel = storageLevel)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-bing,代码行数:32,代码来源:BingUtils.scala


示例8: BingReceiver

//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.bing

import com.github.catalystcode.fortis.spark.streaming.bing.client.BingClient
import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class BingReceiver(
                            client: BingClient,
                            pollingSchedule: PollingSchedule,
                            storageLevel: StorageLevel,
                            pollingWorkers: Int
) extends PollingReceiver[BingPost](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  @volatile private var lastIngestedDate = Long.MinValue

  override protected def poll(): Unit = {
    client
      .loadNewPostings
      .filter(x => {
        logDebug(s"Received Bing result ${x.name} from time ${x.dateLastCrawled}")
        isNew(x)
      })
      .foreach(x => {
        logInfo(s"Storing bing result ${x.url}")
        store(x)
        markStored(x)
      })
  }

  private def isNew(item: BingPost) = {
    val createdAt = item.dateLastCrawled.toLong
    createdAt > lastIngestedDate
  }

  private def markStored(item: BingPost): Unit = {
    val itemCreatedAt = item.dateLastCrawled.toLong

    if (isNew(item)) {
      lastIngestedDate = itemCreatedAt
      logDebug(s"Updating last ingested date to ${item.dateLastCrawled}")
    }
  }
}

class BingInputDStream(
                        ssc: StreamingContext,
                        client: BingClient,
                        pollingSchedule: PollingSchedule,
                        pollingWorkers: Int,
                        storageLevel: StorageLevel
) extends ReceiverInputDStream[BingPost](ssc) {

  override def getReceiver(): Receiver[BingPost] = {
    logDebug("Creating bing receiver")
    new BingReceiver(client, pollingSchedule, storageLevel, pollingWorkers)
  }
} 
开发者ID:CatalystCode,项目名称:streaming-bing,代码行数:62,代码来源:BingInputDStream.scala



注:本文中的org.apache.spark.streaming.dstream.ReceiverInputDStream类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala NaiveBayes类代码示例发布时间:2022-05-23
下一篇:
Scala ParseException类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap