Python kafka.KafkaUtils Class Code Examples


This article collects typical usage examples of the pyspark.streaming.kafka.KafkaUtils class in Python. If you have been wondering what KafkaUtils is for, how to use it, or where to find working examples, the curated class examples below should help.



Twenty code examples of the KafkaUtils class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your votes help the system recommend better Python code examples.
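Before the individual examples, here is a minimal, self-contained sketch of the two entry points that almost every snippet below relies on: the receiver-based createStream (ZooKeeper quorum plus consumer group) and the receiver-less createDirectStream (broker list). The addresses and the topic name "demo" are placeholders of our own, not values taken from any of the projects below.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaUtilsMinimalDemo")
ssc = StreamingContext(sc, 5)  # 5-second batches

# Receiver-based stream: ZooKeeper quorum, consumer group, {topic: partitions per receiver}
receiver_stream = KafkaUtils.createStream(
    ssc, "localhost:2181", "demo-consumer-group", {"demo": 1})

# Direct (receiver-less) stream: list of topics plus Kafka broker parameters
direct_stream = KafkaUtils.createDirectStream(
    ssc, ["demo"], {"metadata.broker.list": "localhost:9092"})

# Both streams yield (key, value) pairs; the message body is the second element.
direct_stream.map(lambda kv: kv[1]).pprint()

ssc.start()
ssc.awaitTermination()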

Example 1: stream

def stream(ssc):

    zkQuorum = "localhost:2181"
    topic = "topic1"
    # receiver-based stream: (key, value) pairs consumed via ZooKeeper
    tweets = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    # direct (receiver-less) stream against the broker; created here but not used below
    kstream = KafkaUtils.createDirectStream(ssc, topics=['topic1'], kafkaParams={"metadata.broker.list": "localhost:9092"})

    tweets = tweets.map(lambda x: x[1].encode("ascii","ignore"))
    return tweets
Author: dataapplabTerm5, Project: SparkStreaming, Lines: 9, Source: ts.py


Example 2: ss_direct_kafka_bucket_counter

def ss_direct_kafka_bucket_counter(brokers, topic, bucket_interval, output_msg, message_parse, valueDecoder=None):
    """Starts a Spark Streaming job from a Kafka input and parses message time

    WARNING!! This function only works for spark 1.4.0+

    Args:
        brokers: the kafka broker that we look at for the topic
        topic: the kafka topic for input
        bucket_interval: the time interval in seconds (int) that the job will
            bucket
        output_msg: factory taking (sc, ssc) and returning the callback passed to foreachRDD
        message_parse: maps a raw message to the bucket key it is counted under
        valueDecoder: optional decoder applied to the raw Kafka message value

    Returns:
        None

    """
    sc = SparkContext(appName="PythonKafkaBucketCounter")
    ssc = StreamingContext(sc, bucket_interval + 5)

    if valueDecoder:
        kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=valueDecoder)
    else:
        kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    lines = kvs.map(lambda x: x[1])
    interval_counts = lines.map(lambda line: (message_parse(line), 1)).reduceByKey(lambda a, b: a + b)

    output_msg_func = output_msg(sc, ssc)

    interval_counts.foreachRDD(output_msg_func)

    ssc.start()
    ssc.awaitTermination()
Author: kelvinfann, Project: spark-streaming-kafka-bucket-counter, Lines: 32, Source: spark-streaming-kafka-bucket-counter.py
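A hypothetical invocation of the function above, shown only to illustrate the argument shapes; the broker address, topic name and the two callback helpers are assumptions of ours, not part of the original project:

def parse_minute(line):
    # assumed helper: map a raw message to the bucket key it is counted under
    return line.split(",")[0]

def make_output_msg(sc, ssc):
    # assumed factory: returns the callback that foreachRDD will call per batch
    def emit(time, rdd):
        print(time, rdd.collect())
    return emit

ss_direct_kafka_bucket_counter(
    brokers="localhost:9092",
    topic="events",
    bucket_interval=60,
    output_msg=make_output_msg,
    message_parse=parse_minute)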


Example 3: get_kafka_stream

    def get_kafka_stream(topic, streaming_context):
        offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
        app_name = streaming_context.sparkContext.appName
        saved_offset_spec = offset_specifications.get_kafka_offsets(app_name)
        if len(saved_offset_spec) < 1:

            MonMetricsKafkaProcessor.log_debug(
                "No saved offsets available..."
                "connecting to kafka without specifying offsets")
            kvs = KafkaUtils.createDirectStream(
                streaming_context, [topic],
                {"metadata.broker.list": cfg.CONF.messaging.brokers})

            return kvs

        else:
            from_offsets = {}
            for key, value in saved_offset_spec.items():
                if key.startswith("%s_%s" % (app_name, topic)):
                    # spec_app_name = value.get_app_name()
                    spec_topic = value.get_topic()
                    spec_partition = int(value.get_partition())
                    # spec_from_offset = value.get_from_offset()
                    spec_until_offset = value.get_until_offset()
                    # composite_key = "%s_%s_%s" % (spec_app_name,
                    #                               spec_topic,
                    #                               spec_partition)
                    # partition = saved_offset_spec[composite_key]
                    from_offsets[
                        TopicAndPartition(spec_topic, spec_partition)
                    ] = long(spec_until_offset)

            MonMetricsKafkaProcessor.log_debug(
                "get_kafka_stream: calling createDirectStream :"
                " topic:{%s} : start " % topic)
            for key, value in from_offsets.items():
                MonMetricsKafkaProcessor.log_debug(
                    "get_kafka_stream: calling createDirectStream : "
                    "offsets : TopicAndPartition:{%s,%s}, value:{%s}" %
                    (str(key._topic), str(key._partition), str(value)))
            MonMetricsKafkaProcessor.log_debug(
                "get_kafka_stream: calling createDirectStream : "
                "topic:{%s} : done" % topic)

            kvs = KafkaUtils.createDirectStream(
                streaming_context, [topic],
                {"metadata.broker.list": cfg.CONF.messaging.brokers},
                from_offsets)
            return kvs
Author: bigluster, Project: monasca-transform, Lines: 49, Source: mon_metrics_kafka.py
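The snippet above only restores previously saved offsets when the stream starts; the matching save side is not shown in this excerpt. A rough sketch of how it is typically wired, following the offsetRanges() pattern from Examples 10 and 14 — the offsets_store object and its save_kafka_offsets method are assumptions, not the monasca-transform API:

def save_offsets(rdd, app_name, offsets_store):
    # the RDDs of a direct Kafka stream carry their offset metadata
    for o in rdd.offsetRanges():
        # topic/partition/fromOffset/untilOffset are exposed on recent PySpark
        # OffsetRange objects; older releases kept them behind underscore names
        offsets_store.save_kafka_offsets(app_name, o.topic, o.partition,
                                         o.fromOffset, o.untilOffset)
    return rdd

# typical wiring, mirroring the transform() pattern used in Examples 10 and 14:
# kvs.transform(lambda rdd: save_offsets(rdd, app_name, offset_specifications)) \
#    .foreachRDD(process_batch)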


Example 4: start

def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 2)

    brokers = "192.192.0.27:9092"
    topics = ['topic7']

    kafkaStreams_lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})

    lines1 = kafkaStreams_lines.map(lambda x: x[1])  # note: the second element of the tuple is the received Kafka message value

    words = lines1.flatMap(lambda line: line.split(" "))

    pairs = words.map(lambda word: (word, 1))

    wordcounts = pairs.reduceByKey(lambda x, y: x + y)

    wordcounts.saveAsTextFiles("/var/lib/hadoop-hdfs/spark-libin/kafka")

    wordcounts.pprint()
    # tally the distribution of the generated random numbers
    ssc.start()  # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
Author: blair1, Project: hadoop-spark, Lines: 25, Source: kafka_streaming_direct.py


Example 5: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii","ignore"))
    tweets = tweets.flatMap(lambda line: line.split(" "))
    words = tweets.flatMap(lambda line: line.split(" "))
    tweets = tweets.filter(lambda x: x in pwords or x in nwords)
    tweets = tweets.map(lambda x: ("positive",1) if x in pwords else ("negative",1))
    tweets = tweets.reduceByKey(lambda x,y: x+y)
    tweets = tweets.updateStateByKey(updateFunction)
    tweets.pprint()

    pds = words.filter(lambda x: x in pwords)
    nds = words.filter(lambda x: x in nwords)

    plist=[]
    nlist=[]

    pds.foreachRDD(lambda t,rdd: plist.append(rdd.count()))    
    nds.foreachRDD(lambda t,rdd: nlist.append(rdd.count()))

    counts = []
  
    ssc.start()                         # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)

    for i in range(0,len(plist)):
        counts.append((plist[i],nlist[i]))

    return counts
Author: ParinSanghavi, Project: Music-Recommender-System-using-Apache-Spark-and-Python, Lines: 30, Source: twitterStream.py


Example 6: bro_parse

def bro_parse(zk,topic,db,db_table,num_of_workers):
    
    app_name = "ONI-INGEST-{0}".format(topic)
    wrks = int(num_of_workers)

    # create spark context
    sc = SparkContext(appName=app_name)
    ssc = StreamingContext(sc,1)
    sqc = HiveContext(sc)

    # create DStream for each topic partition.
    topic_dstreams = [ KafkaUtils.createStream(ssc, zk, app_name, {topic: 1}, keyDecoder=oni_decoder, valueDecoder=oni_decoder) for _ in range (wrks)  ] 
    tp_stream = ssc.union(*topic_dstreams)

    # Parallelism in Data Processing
    #processingDStream = tp_stream(wrks)

    # parse the RDD content.
    proxy_logs = tp_stream.map(lambda x: proxy_parser(x[1]))

    # save RDD into hive .
    proxy_logs.foreachRDD(lambda x: save_to_hive(x,sqc,db,db_table,topic))

    ssc.start()
    ssc.awaitTermination()
Author: Open-Network-Insight, Project: oni-ingest, Lines: 25, Source: bro_parser.py


Example 7: main

def main():
    if len(sys.argv) not in (3, 4):
        print("Usage: kafka_wordcount.py <zk> <topic> [timeout]",
              file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    timeout = None
    if len(sys.argv) == 4:
        zk, topic, timeout = sys.argv[1:]
        timeout = int(timeout)
    else:
        zk, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(
        ssc, zk, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    kwargs = {}
    if timeout:
        kwargs['timeout'] = timeout
    ssc.start()
    ssc.awaitTermination(**kwargs)
Author: butterfy76, Project: sahara, Lines: 26, Source: spark-kafka-example.py


Example 8: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii","ignore"))

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # YOUR CODE HERE
   
    words=tweets.flatMap(lambda x: x.split(" ")).filter(lambda x: x in pwords or x in nwords) 
    wordPairs=words.map(lambda x: ("positive",1) if x in pwords else ("negative",1))

    wordCount=wordPairs.reduceByKey(lambda x, y: x + y)
    
    runningCounts = wordPairs.updateStateByKey(updateFunction)

    runningCounts.pprint()
    

    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    wordCount.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))
    
    ssc.start()                         # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)

    return counts
Author: PragatiV, Project: Sentiment_Analysis_Spark, Lines: 32, Source: twitterStream.py


Example 9: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii","ignore"))


    # Print the first ten elements of each RDD generated in this DStream to the console
    #tweets.pprint()
    words = tweets.flatMap(lambda line: line.split(" "))

    posNegPairs = words.map(lambda word: myMapping(word, pwords, nwords))
    filteredPairs = posNegPairs.filter(lambda x: x[0] != "na")
    posNegCounts = filteredPairs.reduceByKey(lambda x, y: x + y)


    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).

    cumulativeCounts = posNegCounts.updateStateByKey(myRunningUpdate)
    cumulativeCounts.pprint()    
    
    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    posNegCounts.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))
    
    ssc.start()                         # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    return counts
Author: himangshunits, Project: TinyProjectsOnMachineLeanring, Lines: 33, Source: twitterStream.py


Example 10: test_kafka_direct_stream_transform_get_offsetRanges

    def test_kafka_direct_stream_transform_get_offsetRanges(self):
        """Test the Python direct Kafka stream transform get offsetRanges."""
        topic = self._randomTopic()
        sendData = {"a": 1, "b": 2, "c": 3}
        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
                       "auto.offset.reset": "smallest"}

        self._kafkaTestUtils.createTopic(topic)
        self._kafkaTestUtils.sendMessages(topic, sendData)

        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)

        offsetRanges = []

        def transformWithOffsetRanges(rdd):
            for o in rdd.offsetRanges():
                offsetRanges.append(o)
            return rdd

        # Test whether it is ok mixing KafkaTransformedDStream and TransformedDStream together,
        # only the TransformedDstreams can be folded together.
        stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint()
        self.ssc.start()
        self.wait_for(offsetRanges, 1)

        self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
Author: anitatailor, Project: spark, Lines: 26, Source: tests.py


Example 11: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list":'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii","ignore"))

    
    # Each element of tweets will be the text of a tweet.
    # Need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    pnTweets = tweets.flatMap(lambda line: line.split(" "))
    pnTweetsPairs = pnTweets.map(lambda x: determine(x,pwords,nwords))
    wordCounts = pnTweetsPairs.reduceByKey(lambda x, y: x + y)
    
    totalCounts = pnTweetsPairs.updateStateByKey(updateFunction)
    totalCounts.pprint()
    # Let the counts variable hold the word counts for all time steps
    # Need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    wordCounts.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))
    
    
    
    ssc.start()                         # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    # because counts also includes pairs that are neither positive nor negative, filter them out below
    
    newCounts = []
    for count in counts:
        newCount = [item for item in count if item[0] == "positive" or item[0] =="negative"]
        newCounts.insert(len(newCounts),newCount)
    
    return newCounts
Author: sxue2, Project: csc591, Lines: 34, Source: twitterStream.py


Example 12: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii","ignore"))

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # YOUR CODE HERE
    words = tweets.flatMap(lambda line: line.split(' ')) \
            .map(lambda word: ('positive', 1) if word in pwords else ('negative', 1) if word in nwords else ('none', 1)) \
            .filter(lambda x: x[0]=='positive' or x[0]=='negative') \
            .reduceByKey(lambda x, y: x + y)
    # Print the first ten elements of each RDD generated in this DStream to the console
    def updateValues(values, count):
        if count is None:
            count = 0
        return sum(values, count)

    updatedWords = words.updateStateByKey(updateValues)
    updatedWords.pprint()
    
    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    # YOURDSTREAMOBJECT.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))
    words.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))

    ssc.start()                         # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)

    return counts
Author: mandgerohit, Project: BI-CSC591, Lines: 35, Source: twitterStream.py


Example 13: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii","ignore"))
    #print "HELOKOJOJEORUBEORUBOUBEROUBNOUONEROJOEJRNOJENROJENFOJEFOEJFNOEFUNOEUFN"
    #tweets.pprint()
    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # YOUR CODE HERE
    words = tweets.flatMap(lambda line: line.split(" "))
    pairs = words.map(classifier).map(lambda word: (word, 1)).filter(lambda x: x[0] != 'none').reduceByKey(lambda a,b: a+b)
    runningCounts = pairs.updateStateByKey(updateFunction)
    runningCounts.pprint()
    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    pairs.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))
    
    ssc.start()                         # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    #print counts
    return counts
Author: chintanpanchamia, Project: CSC591BusinessIntelligenceProjectSuite, Lines: 26, Source: twitterStream-cpancha.py


Example 14: start

def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 2)

    brokers = "localhost:9092"
    topics = ['test']

    kafkaStreams_lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})

    lines1 = kafkaStreams_lines.map(lambda x: x[1])  # note: the second element of the tuple is the received Kafka message value

    words = lines1.flatMap(lambda line: line.split(" "))

    pairs = words.map(lambda word: (word, 1))

    wordcounts = pairs.reduceByKey(lambda x, y: x + y)

    print(wordcounts)

    kafkaStreams_lines.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)

    wordcounts.pprint()
    # tally the distribution of the generated random numbers
    ssc.start()  # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
Author: blair1, Project: hadoop-spark, Lines: 27, Source: kafka-direct.py


Example 15: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list": 'localhost:9092'})

    tweets = kstream.map(lambda x: x[1].encode("ascii","ignore"))

    pword_rdd=tweets.flatMap(lambda line: line.split(" ")).map(lambda word: ("positive",1) if word in pwords else ("positive",0)).reduceByKey(lambda a,b:a+b)
    nword_rdd=tweets.flatMap(lambda line: line.split(" ")).map(lambda word: ("negative",1) if word in nwords else ("negative",0)).reduceByKey(lambda a,b:a+b)

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # make the plot on this rdd -combined_rdd

    combined_rdd=pword_rdd.union(nword_rdd)
    running_counts=combined_rdd.updateStateByKey(updateFunction)
    
    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]

    counts = []
    combined_rdd.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))
    
    # print "printing dstream"
    running_counts.pprint()

    # Start the computation
    ssc.start()
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)

    return counts
Author: prutha28, Project: Twitter-Sentiment-Analysis, Lines: 33, Source: twitterStream.py


Example 16: start_spark

def start_spark(timeout=None, max_items_per_rdd_sent=None):
    sc = SparkContext("local[4]", "twitter.trending")
    ssc = StreamingContext(sc, 5)

    ssc.checkpoint('hdfs://localhost:9000/user/spark/checkpoint/')

    kafka_params = {
        'zookeeper.connect': config.get('zookeeper', 'host'),
        'group.id': config.get('kafka', 'group_id'),
        'metadata.broker.list': config.get('kafka', 'hosts')
    }

    ksc = KafkaUtils.createDirectStream(ssc,
                                        [config.get('kafka', 'topic')],
                                        kafka_params)

    hashtag_counts = get_word_counts(ksc)
    filtered_tweet_count = filter_tweets(hashtag_counts)
    send_dstream_data(filtered_tweet_count, max_items_per_rdd_sent)
    ssc.start()
    if timeout:
        ssc.awaitTermination(timeout)
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
    else:
        ssc.awaitTermination()
Author: joychugh, Project: learning-kafka, Lines: 25, Source: spark_example.py


Example 17: main

def main():
    sc = SparkContext(appName="IntrusionDetector")
    ssc = StreamingContext(sc, batch_durations)

    kvs = KafkaUtils.createDirectStream(ssc, [input_topic], {"metadata.broker.list": broker})
    kvs.foreachRDD(processRDD)
    ssc.start()
    ssc.awaitTermination()
Author: dfeldman, Project: intrusion-detector, Lines: 8, Source: processor.py


Example 18: readSource

    def readSource(ssc, di_in_conf_with_ds_conf, app_conf):
        sourceType = di_in_conf_with_ds_conf['source.type']

        if sourceType == 'kafka':
            kafkaSimpleConsumerApiUsed = app_conf.get('kafka.simple.consumer.api.used', True)
            if kafkaSimpleConsumerApiUsed:
                topics = di_in_conf_with_ds_conf['topics']
                if not isinstance(topics, list):
                    raise TypeError("topic should be list")

                brokers = di_in_conf_with_ds_conf['metadata.broker.list']
                kafkaParams = {"metadata.broker.list": brokers}
                stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams).map(lambda x: x[1])
            else:
                zkConnect = di_in_conf_with_ds_conf['zookeeper.connect']
                groupId = app_conf['group.id']
                numReceivers = app_conf.get('num.receivers', 1)
                numConsumerFetchers = app_conf.get('num.consumer.fetchers')
                topics = di_in_conf_with_ds_conf['topics']
                topic_map = dict(zip(topics, numConsumerFetchers))
                # streams = reduce(lambda x, y: x.union(y),
                #                  map(KafkaUtils.createStream(ssc, zkConnect, groupId, topic_map),
                #                      range(0, numReceivers)))
                streams = [KafkaUtils.createStream(ssc, zkConnect, groupId, topic_map) for i in range(0, numReceivers)]
                stream = ssc.union(streams).map(lambda x: x[1])
        elif sourceType == 'hdfs':
            path = di_in_conf_with_ds_conf['fs.defaultFS'] + '/' + di_in_conf_with_ds_conf['path']
            stream = ssc.textFileStream(path)
        else:
            raise Exception('Error: unsupported source.type = ' + sourceType)

        num_repartition = app_conf.get('dataInterface.stream.repatition.partitions')
        if num_repartition is None or not isinstance(num_repartition, int):
            stream2 = stream
        else:
            stream2 = stream.repartition(num_repartition)

        # whether to apply a formatter plugin class to the stream
        format_class_path = di_in_conf_with_ds_conf.get('format.class', '')
        if format_class_path.strip() == '':
            stream3 = stream2
        else:
            format_class_obj = get_class_obj(format_class_path)
            stream3 = format_class_obj.format(stream2)

        return stream3
Author: tsingfu, Project: xuetangx-streaming-app, Lines: 46, Source: streaming_app_platform.py


Example 19: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii","ignore"))

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    

    #tweets.pprint()
    words = tweets.flatMap(lambda tweet:tweet.split(" "))
    #words.pprint()

    positive = words.filter(lambda x: (x in pwords))
    negative = words.filter(lambda x: (x in nwords))

    #positive.pprint()
    #negative.pprint()

    ppairs = positive.map(lambda p: ('positive', 1))
    npairs = negative.map(lambda n: ('negative', 1))

    pwordCounts = ppairs.reduceByKey(lambda x, y: x + y)
    nwordCounts = npairs.reduceByKey(lambda x, y: x + y)

    count = pwordCounts.union(nwordCounts)
    #count.pprint()
    #pwordCounts.pprint()
    #nwordCounts.pprint()

    def updateFunction(newValues, runningCount):
        if runningCount is None:
           runningCount = 0
        return sum(newValues, runningCount)

    prunningCounts = pwordCounts.updateStateByKey(updateFunction)
    nrunningCounts = nwordCounts.updateStateByKey(updateFunction)

    #prunningCounts.pprint()
    #nrunningCounts.pprint()

    total = prunningCounts.union(nrunningCounts)
    total.pprint()

    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    count.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))

    ssc.start()                         # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)

    return counts
Author: alokrk, Project: twitterSentimentAnalysis, Lines: 57, Source: twitterStream.py


Example 20: kafka_spark_streaming_sql_main

def kafka_spark_streaming_sql_main(app_name, brokers, topic, interval_seconds, sql_function):
    sc = SparkContext(appName=app_name)
    sqlContext = SQLContext(sc)
    # ssc = StreamingContext(sc, interval_seconds)
    ssc = StreamingContext(sc, 10)
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(sql_function)
    ssc.start()
    ssc.awaitTermination()
Author: clearclouds-spark, Project: spark-sql-py, Lines: 9, Source: http_util.py
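Example 20 leaves sql_function abstract; per batch it receives one RDD of (key, value) pairs from foreachRDD. A minimal sketch of what such a callback might look like — the table name "events" and the assumption that message values are plain text are ours, not from the original project:

from pyspark.sql import SQLContext, Row

def example_sql_function(rdd):
    # hypothetical callback for kvs.foreachRDD(...); skip empty batches
    if rdd.isEmpty():
        return
    sql_context = SQLContext(rdd.context)
    rows = rdd.map(lambda kv: Row(value=kv[1]))  # kv = (kafka key, message value)
    df = sql_context.createDataFrame(rows)
    df.registerTempTable("events")
    sql_context.sql("SELECT COUNT(*) AS n FROM events").show()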



Note: The pyspark.streaming.kafka.KafkaUtils class examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The snippets are drawn from open-source projects contributed by various developers; copyright remains with the original authors, and any redistribution or use should follow the corresponding project's license. Do not reproduce without permission.

