
Python streaming.DStream Class Code Examples


This article collects typical usage examples of pyspark.streaming.DStream in Python. If you are wondering how the DStream class is used in practice, the selected examples below may help.



Ten code examples of the DStream class follow, sorted by popularity by default.

Example 1: createStream

    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
        """
        Create an input stream that pulls messages from a Kafka Broker.

        :param ssc:  StreamingContext object
        :param zkQuorum:  Zookeeper quorum (hostname:port,hostname:port,..).
        :param groupId:  The group id for this consumer.
        :param topics:  Dict of (topic_name -> numPartitions) to consume.
                        Each partition is consumed in its own thread.
        :param kafkaParams: Additional params for Kafka
        :param storageLevel:  RDD storage level.
        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
        :return: A DStream object
        """
        if kafkaParams is None:
            kafkaParams = dict()
        kafkaParams.update({
            "zookeeper.connect": zkQuorum,
            "group.id": groupId,
            "zookeeper.connection.timeout.ms": "10000",
        })
        if not isinstance(topics, dict):
            raise TypeError("topics should be dict")
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        helper = KafkaUtils._get_helper(ssc._sc)
        jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
Developer ID: AnddyWang, Project: spark, Lines of code: 32, Source: kafka.py
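
The last line of `createStream` maps every raw `(key, value)` pair through the two decoders. The step can be sketched in plain Python without Spark. Here `utf8_decoder` is a local stand-in that is assumed to behave like the default decoder (UTF-8 decode, with `None` passed through), and `decode_pair` is a hypothetical helper name used only for illustration.

```python
def utf8_decoder(raw):
    """Local stand-in (assumption): decode bytes as UTF-8, pass None through."""
    if raw is None:
        return None
    return raw.decode("utf-8")


def decode_pair(k_v, key_decoder=utf8_decoder, value_decoder=utf8_decoder):
    """Apply the decoders to one (key, value) record, as the final map() does."""
    return (key_decoder(k_v[0]), value_decoder(k_v[1]))


# Records roughly as they might arrive: raw bytes, with an optional (None) key.
records = [(b"user-1", b"login"), (None, "caf\u00e9".encode("utf-8"))]
decoded = [decode_pair(r) for r in records]
print(decoded)
```

Passing a different `key_decoder` or `value_decoder` (for example one that parses JSON) changes only this mapping step, not how the stream is created.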


Example 2: createStream

    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
        """
        Create an input stream that pulls messages from a Kafka Broker.

        :param ssc:  StreamingContext object
        :param zkQuorum:  Zookeeper quorum (hostname:port,hostname:port,..).
        :param groupId:  The group id for this consumer.
        :param topics:  Dict of (topic_name -> numPartitions) to consume.
                        Each partition is consumed in its own thread.
        :param kafkaParams: Additional params for Kafka
        :param storageLevel:  RDD storage level.
        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
        :return: A DStream object
        """
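        # Copy first so that neither the shared default dict nor the caller's dict is mutated.
        kafkaParams = dict(kafkaParams)
        kafkaParams.update({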
            "zookeeper.connect": zkQuorum,
            "group.id": groupId,
            "zookeeper.connection.timeout.ms": "10000",
        })
        if not isinstance(topics, dict):
            raise TypeError("topics should be dict")
        jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
        jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

        try:
            # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
            helper = helperClass.newInstance()
            jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
        except Py4JJavaError as e:
            # TODO: use --jar once it also works on the driver
            if 'ClassNotFoundException' in str(e.java_exception):
                print("""
________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...

________________________________________________________________________________________________

""" % (ssc.sparkContext.version, ssc.sparkContext.version))
            raise e
        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
Developer ID: 31z4, Project: spark, Lines of code: 60, Source: kafka.py
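
Example 2 declares `kafkaParams={}` as a default argument. Python evaluates a default value once, when the function is defined, so updating such a default in place leaks settings from one call into the next. The Spark-free sketch below illustrates the effect; `build_params_shared` and `build_params_fresh` are hypothetical names, not part of PySpark.

```python
def build_params_shared(zk_quorum, kafka_params={}):
    """Mirrors the signature above: the default dict is created only once."""
    kafka_params.update({"zookeeper.connect": zk_quorum})
    return kafka_params


def build_params_fresh(zk_quorum, kafka_params=None):
    """Safer variant: copy the caller's dict, or start from a new one."""
    params = dict(kafka_params or {})
    params.update({"zookeeper.connect": zk_quorum})
    return params


first = build_params_shared("zk1:2181")
second = build_params_shared("zk2:2181")
# Both names refer to the same default dict, so the first result was overwritten.
print(first is second, first["zookeeper.connect"])

a = build_params_fresh("zk1:2181")
b = build_params_fresh("zk2:2181")
# Each call gets its own dict, so earlier results are left alone.
print(a is b, a["zookeeper.connect"])
```

Examples 1 and 7 avoid the shared default by using `kafkaParams=None` and creating the dict inside the function.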


Example 3: createStream

    def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
                     initialPositionInStream, checkpointInterval,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder,
                     stsAssumeRoleArn=None, stsSessionName=None, stsExternalId=None):
        """
        Create an input stream that pulls messages from a Kinesis stream. This uses the
        Kinesis Client Library (KCL) to pull messages from Kinesis.

        .. note:: The given AWS credentials will get saved in DStream checkpoints if checkpointing
            is enabled. Make sure that your checkpoint directory is secure.

        :param ssc:  StreamingContext object
        :param kinesisAppName:  Kinesis application name used by the Kinesis Client Library (KCL) to
                                update DynamoDB
        :param streamName:  Kinesis stream name
        :param endpointUrl:  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
        :param regionName:  Name of region used by the Kinesis Client Library (KCL) to update
                            DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
        :param initialPositionInStream:  In the absence of Kinesis checkpoint info, this is the
                                         worker's initial starting position in the stream. The
                                         values are either the beginning of the stream per Kinesis'
                                         limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or
                                         the tip of the stream (InitialPositionInStream.LATEST).
        :param checkpointInterval:  Checkpoint interval for Kinesis checkpointing. See the Kinesis
                                    Spark Streaming documentation for more details on the different
                                    types of checkpoints.
        :param storageLevel:  Storage level to use for storing the received objects (default is
                              StorageLevel.MEMORY_AND_DISK_2)
        :param awsAccessKeyId:  AWS AccessKeyId (default is None. If None, will use
                                DefaultAWSCredentialsProviderChain)
        :param awsSecretKey:  AWS SecretKey (default is None. If None, will use
                              DefaultAWSCredentialsProviderChain)
        :param decoder:  A function used to decode value (default is utf8_decoder)
        :param stsAssumeRoleArn: ARN of IAM role to assume when using STS sessions to read from
                                 the Kinesis stream (default is None).
        :param stsSessionName: Name to uniquely identify STS sessions used to read from Kinesis
                               stream, if STS is being used (default is None).
        :param stsExternalId: External ID that can be used to validate against the assumed IAM
                              role's trust policy, if STS is being used (default is None).
        :return: A DStream object
        """
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        jduration = ssc._jduration(checkpointInterval)

        try:
            # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
            helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
        except TypeError as e:
            if str(e) == "'JavaPackage' object is not callable":
                KinesisUtils._printErrorMsg(ssc.sparkContext)
            raise
        jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
                                      regionName, initialPositionInStream, jduration, jlevel,
                                      awsAccessKeyId, awsSecretKey, stsAssumeRoleArn,
                                      stsSessionName, stsExternalId)
        stream = DStream(jstream, ssc, NoOpSerializer())
        return stream.map(lambda v: decoder(v))
Developer ID: CodingCat, Project: spark, Lines of code: 58, Source: kinesis.py


Example 4: createStream

    def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
                     initialPositionInStream, checkpointInterval,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder):
        """
        Create an input stream that pulls messages from a Kinesis stream. This uses the
        Kinesis Client Library (KCL) to pull messages from Kinesis.

        Note: The given AWS credentials will get saved in DStream checkpoints if checkpointing is
        enabled. Make sure that your checkpoint directory is secure.

        :param ssc:  StreamingContext object
        :param kinesisAppName:  Kinesis application name used by the Kinesis Client Library (KCL) to
                                update DynamoDB
        :param streamName:  Kinesis stream name
        :param endpointUrl:  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
        :param regionName:  Name of region used by the Kinesis Client Library (KCL) to update
                            DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
        :param initialPositionInStream:  In the absence of Kinesis checkpoint info, this is the
                                         worker's initial starting position in the stream. The
                                         values are either the beginning of the stream per Kinesis'
                                         limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or
                                         the tip of the stream (InitialPositionInStream.LATEST).
        :param checkpointInterval:  Checkpoint interval for Kinesis checkpointing. See the Kinesis
                                    Spark Streaming documentation for more details on the different
                                    types of checkpoints.
        :param storageLevel:  Storage level to use for storing the received objects (default is
                              StorageLevel.MEMORY_AND_DISK_2)
        :param awsAccessKeyId:  AWS AccessKeyId (default is None. If None, will use
                                DefaultAWSCredentialsProviderChain)
        :param awsSecretKey:  AWS SecretKey (default is None. If None, will use
                              DefaultAWSCredentialsProviderChain)
        :param decoder:  A function used to decode value (default is utf8_decoder)
        :return: A DStream object
        """
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        jduration = ssc._jduration(checkpointInterval)

        try:
            # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
                .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
            helper = helperClass.newInstance()
            jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
                                          regionName, initialPositionInStream, jduration, jlevel,
                                          awsAccessKeyId, awsSecretKey)
        except Py4JJavaError as e:
            if 'ClassNotFoundException' in str(e.java_exception):
                KinesisUtils._printErrorMsg(ssc.sparkContext)
            raise e
        stream = DStream(jstream, ssc, NoOpSerializer())
        return stream.map(lambda v: decoder(v))
Developer ID: 0xqq, Project: spark, Lines of code: 52, Source: kinesis.py


Example 5: createDirectStream

    def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
        """
        .. note:: Experimental

        Create an input stream that directly pulls messages from a Kafka Broker and specific offset.

        This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
        in each batch duration and processed without storing.

        This does not use Zookeeper to store offsets. The consumed offsets are tracked
        by the stream itself. For interoperability with Kafka monitoring tools that depend on
        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
        You can access the offsets used in each batch from the generated RDDs (see

        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
        The information on consumed offset can be recovered from the checkpoint.
        See the programming guide for details (constraints, etc.).

        :param ssc:  StreamingContext object.
        :param topics:  list of topic_name to consume.
        :param kafkaParams: Additional params for Kafka.
        :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
                            point of the stream.
        :param keyDecoder:  A function used to decode key (default is utf8_decoder).
        :param valueDecoder:  A function used to decode value (default is utf8_decoder).
        :return: A DStream object
        """
        if not isinstance(topics, list):
            raise TypeError("topics should be list")
        if not isinstance(kafkaParams, dict):
            raise TypeError("kafkaParams should be dict")

        try:
            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
            helper = helperClass.newInstance()

            jfromOffsets = dict([(k._jTopicAndPartition(helper),
                                  v) for (k, v) in fromOffsets.items()])
            jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
        except Py4JJavaError as e:
            if 'ClassNotFoundException' in str(e.java_exception):
                KafkaUtils._printErrorMsg(ssc.sparkContext)
            raise e

        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
Developer ID: 308306362, Project: spark, Lines of code: 49, Source: kafka.py


Example 6: _toPythonDStream

    def _toPythonDStream(ssc, jstream, bodyDecoder):
        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)

        def func(event):
            headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
            headers = {}
            strSer = UTF8Deserializer()
            for i in range(0, read_int(headersBytes)):
                key = strSer.loads(headersBytes)
                value = strSer.loads(headersBytes)
                headers[key] = value
            body = bodyDecoder(event[1])
            return (headers, body)
        return stream.map(func)
Developer ID: FavioVazquez, Project: spark, Lines of code: 15, Source: flume.py
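
The inner `func` in Example 6 reads a header count and then alternating key and value strings from a byte buffer. The following is a minimal stand-alone sketch of that parsing loop. The framing is an assumption made for illustration: every integer is a 4-byte big-endian signed value and every string is a length prefix followed by UTF-8 bytes. The helpers `read_utf8`, `write_utf8`, and `parse_headers` are hypothetical names, and `read_int` here is a local function rather than the PySpark one.

```python
import struct
from io import BytesIO


def read_int(stream):
    """Read one 4-byte big-endian signed integer (assumed framing)."""
    return struct.unpack("!i", stream.read(4))[0]


def read_utf8(stream):
    """Read a length-prefixed UTF-8 string (assumed framing)."""
    length = read_int(stream)
    return stream.read(length).decode("utf-8")


def write_utf8(text):
    """Encode a string in the same length-prefixed form, to build test input."""
    data = text.encode("utf-8")
    return struct.pack("!i", len(data)) + data


def parse_headers(raw):
    """Parse `count` followed by `count` key/value string pairs into a dict."""
    stream = BytesIO(raw)
    headers = {}
    for _ in range(read_int(stream)):
        key = read_utf8(stream)
        headers[key] = read_utf8(stream)
    return headers


# Two headers: host=web-1 and level=info.
raw = struct.pack("!i", 2) + b"".join(
    write_utf8(s) for s in ["host", "web-1", "level", "info"]
)
print(parse_headers(raw))
```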


Example 7: createStream

    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
        """
        Create an input stream that pulls messages from a Kafka Broker.

        :param ssc:  StreamingContext object
        :param zkQuorum:  Zookeeper quorum (hostname:port,hostname:port,..).
        :param groupId:  The group id for this consumer.
        :param topics:  Dict of (topic_name -> numPartitions) to consume.
                        Each partition is consumed in its own thread.
        :param kafkaParams: Additional params for Kafka
        :param storageLevel:  RDD storage level.
        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
        :return: A DStream object
        """
        if kafkaParams is None:
            kafkaParams = dict()
        kafkaParams.update({
            "zookeeper.connect": zkQuorum,
            "group.id": groupId,
            "zookeeper.connection.timeout.ms": "10000",
        })
        if not isinstance(topics, dict):
            raise TypeError("topics should be dict")
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

        try:
            # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
            helper = helperClass.newInstance()
            jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
        except Py4JJavaError as e:
            # TODO: use --jar once it also works on the driver
            if 'ClassNotFoundException' in str(e.java_exception):
                KafkaUtils._printErrorMsg(ssc.sparkContext)
            raise e
        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
Developer ID: BeforeRain, Project: spark, Lines of code: 42, Source: kafka.py


Example 8: __init__

    def __init__(self, jdstream, ssc, jrdd_deserializer):
        DStream.__init__(self, jdstream, ssc, jrdd_deserializer)
Developer ID: BeforeRain, Project: spark, Lines of code: 2, Source: kafka.py


Example 9: __init__

    def __init__(self, jdstream, ssc, jrdd_deserializer):
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        DStream.__init__(self, jdstream, ssc, jrdd_deserializer)
Developer ID: BaiBenny, Project: spark, Lines of code: 6, Source: kafka.py
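
The constructor in Example 9 emits a `DeprecationWarning` before delegating to the parent class. A small Spark-free sketch of the same pattern, and of how a caller can observe the warning, might look like this. `BaseStream` and `LegacyStream` are hypothetical stand-ins for `DStream` and its Kafka subclass, and the `stacklevel=2` argument is an addition that is not in the original code.

```python
import warnings


class BaseStream:
    """Hypothetical stand-in for DStream; it only stores its arguments."""

    def __init__(self, jdstream, ssc, jrdd_deserializer):
        self.args = (jdstream, ssc, jrdd_deserializer)


class LegacyStream(BaseStream):
    """Hypothetical subclass that warns before delegating to the parent."""

    def __init__(self, jdstream, ssc, jrdd_deserializer):
        # stacklevel=2 attributes the warning to the caller's line.
        warnings.warn("LegacyStream is deprecated.", DeprecationWarning, stacklevel=2)
        BaseStream.__init__(self, jdstream, ssc, jrdd_deserializer)


with warnings.catch_warnings(record=True) as caught:
    # Make sure the warning is recorded regardless of the active filters.
    warnings.simplefilter("always")
    stream = LegacyStream("jdstream", "ssc", "ser")

print([w.category.__name__ for w in caught])
```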


Example 10: SparkContext

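from pyspark import SparkContext, StorageLevel
from pyspark.serializers import NoOpSerializer, PairDeserializer
from pyspark.streaming import StreamingContext
from pyspark.streaming.dstream import DStream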

sc = SparkContext("local[2]", appName="jms py")
ssc = StreamingContext(sc, 5)

helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass("com.redhat.spark.streaming.jms.JMSUtilsPythonHelper")
helper = helperClass.newInstance()

jbrokerURL = "amqp://127.0.0.1:5672"
jqueuename = "default"
jlevel = ssc._sc._getJavaStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
jstream = helper.createStream(ssc._jssc, jbrokerURL, jqueuename, jlevel)

ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
utf8_decoder = lambda s: s and s.decode('utf-8')
keyDecoder = utf8_decoder
valueDecoder = utf8_decoder
# Tuple-unpacking lambda parameters are not valid in Python 3, so index the pair instead.
a = stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))

def process(rdd):
   print(rdd.count())

def protect(func):
   def _protect(rdd):
     if rdd.take(1):
       func(rdd)
   return _protect

a.foreachRDD(protect(process))
Developer ID: mattf, Project: spark-streaming-jms, Lines of code: 30, Source: example.py
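
The `protect` wrapper in Example 10 skips empty batches by calling `take(1)` before running the real handler. Its behavior can be checked without a cluster. In the sketch below, `FakeRDD` is a hypothetical stand-in that implements only the two RDD methods the example uses (`take` and `count`).

```python
class FakeRDD:
    """Hypothetical stand-in exposing only the two RDD methods used above."""

    def __init__(self, items):
        self._items = list(items)

    def take(self, n):
        return self._items[:n]

    def count(self):
        return len(self._items)


def protect(func):
    """Wrap a per-batch handler so it runs only when the batch is non-empty."""
    def _protect(rdd):
        if rdd.take(1):
            func(rdd)
    return _protect


counts = []
guarded = protect(lambda rdd: counts.append(rdd.count()))
for batch in ([], ["a", "b"], [], ["c"]):
    guarded(FakeRDD(batch))
print(counts)
```

Only the non-empty batches reach the handler, so `counts` ends up with one entry per batch that actually carried data.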



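Note: The pyspark.streaming.DStream class examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to each project's License. Do not reproduce without permission.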

