This article collects typical usage examples of the Python class pyspark.streaming.kafka.KafkaUtils. If you have been wondering what KafkaUtils does and how to use it, the curated class examples below may help.
Twenty code examples of the KafkaUtils class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
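Before the examples, here is a minimal sketch of the two entry points the snippets below rely on: the receiver-based KafkaUtils.createStream (which connects through ZooKeeper) and the receiver-less KafkaUtils.createDirectStream (which connects to the brokers directly). Host names, the topic name, and the batch interval are placeholders, not values from any of the projects cited below.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaUtilsDemo")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    # Receiver-based stream: (ZooKeeper quorum, consumer group id, {topic: partitions per receiver})
    receiver_stream = KafkaUtils.createStream(ssc, "localhost:2181", "demo-group", {"demo-topic": 1})

    # Direct (receiver-less) stream: list of topics plus Kafka parameters with the broker list
    direct_stream = KafkaUtils.createDirectStream(ssc, ["demo-topic"],
                                                  {"metadata.broker.list": "localhost:9092"})

    # Both yield DStreams of (key, value) pairs; the message payload is element [1].
    direct_stream.map(lambda kv: kv[1]).pprint()

    ssc.start()
    ssc.awaitTermination()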
Example 1: stream

def stream(ssc):
    zkQuorum = "localhost:2181"
    topic = "topic1"
    # Receiver-based stream via ZooKeeper, plus a direct stream via the broker list
    # (kstream is created here but not used further in this snippet).
    tweets = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    kstream = KafkaUtils.createDirectStream(ssc, topics=['topic1'],
                                            kafkaParams={"metadata.broker.list": "localhost:9092"})
    tweets = tweets.map(lambda x: x[1].encode("ascii", "ignore"))
    return tweets

Author: dataapplabTerm5, Project: SparkStreaming, Lines: 9, Source: ts.py
Example 2: ss_direct_kafka_bucket_counter

def ss_direct_kafka_bucket_counter(brokers, topic, bucket_interval, output_msg, message_parse, valueDecoder=None):
    """Starts a Spark Streaming job from a Kafka input and parses message time.

    WARNING: this function only works for Spark 1.4.0+.

    Args:
        brokers: the Kafka broker list to read the topic from
        topic: the Kafka topic used as input
        bucket_interval: the time interval in seconds (int) that the job will
            bucket by
    Returns:
        None
    """
    sc = SparkContext(appName="PythonKafkaBucketCounter")
    ssc = StreamingContext(sc, bucket_interval + 5)

    if valueDecoder:
        kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers},
                                            valueDecoder=valueDecoder)
    else:
        kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    lines = kvs.map(lambda x: x[1])
    interval_counts = lines.map(lambda line: (message_parse(line), 1)).reduceByKey(lambda a, b: a + b)

    output_msg_func = output_msg(sc, ssc)
    interval_counts.foreachRDD(output_msg_func)

    ssc.start()
    ssc.awaitTermination()

Author: kelvinfann, Project: spark-streaming-kafka-bucket-counter, Lines: 32, Source: spark-streaming-kafka-bucket-counter.py
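The function above expects two callables that the snippet does not define: message_parse, which maps a raw message to a bucket key, and output_msg, a factory that returns a foreachRDD handler. A minimal sketch of what such callables might look like; the names, JSON format, and per-minute bucketing are assumptions, not the project's actual implementation:

    import json
    from datetime import datetime

    def message_parse(line):
        # Hypothetical: bucket each message by the minute of its embedded timestamp.
        ts = json.loads(line).get("timestamp", 0)
        return datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M")

    def output_msg(sc, ssc):
        # Hypothetical factory: return a handler that prints each interval's counts.
        def handler(rdd):
            for bucket, count in rdd.collect():
                print("%s -> %d" % (bucket, count))
        return handler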
Example 3: get_kafka_stream

def get_kafka_stream(topic, streaming_context):
    offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
    app_name = streaming_context.sparkContext.appName
    saved_offset_spec = offset_specifications.get_kafka_offsets(app_name)
    if len(saved_offset_spec) < 1:
        MonMetricsKafkaProcessor.log_debug(
            "No saved offsets available..."
            "connecting to kafka without specifying offsets")
        kvs = KafkaUtils.createDirectStream(
            streaming_context, [topic],
            {"metadata.broker.list": cfg.CONF.messaging.brokers})
        return kvs
    else:
        from_offsets = {}
        for key, value in saved_offset_spec.items():
            if key.startswith("%s_%s" % (app_name, topic)):
                # spec_app_name = value.get_app_name()
                spec_topic = value.get_topic()
                spec_partition = int(value.get_partition())
                # spec_from_offset = value.get_from_offset()
                spec_until_offset = value.get_until_offset()
                # composite_key = "%s_%s_%s" % (spec_app_name,
                #                               spec_topic,
                #                               spec_partition)
                # partition = saved_offset_spec[composite_key]
                from_offsets[
                    TopicAndPartition(spec_topic, spec_partition)
                ] = long(spec_until_offset)

        MonMetricsKafkaProcessor.log_debug(
            "get_kafka_stream: calling createDirectStream :"
            " topic:{%s} : start " % topic)
        for key, value in from_offsets.items():
            MonMetricsKafkaProcessor.log_debug(
                "get_kafka_stream: calling createDirectStream : "
                "offsets : TopicAndPartition:{%s,%s}, value:{%s}" %
                (str(key._topic), str(key._partition), str(value)))
        MonMetricsKafkaProcessor.log_debug(
            "get_kafka_stream: calling createDirectStream : "
            "topic:{%s} : done" % topic)

        kvs = KafkaUtils.createDirectStream(
            streaming_context, [topic],
            {"metadata.broker.list": cfg.CONF.messaging.brokers},
            from_offsets)
        return kvs

Author: bigluster, Project: monasca-transform, Lines: 49, Source: mon_metrics_kafka.py
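Example 3 resumes a direct stream from offsets saved by a previous run; the complementary half, recording which offsets each batch actually covered so they can be persisted, is not part of this snippet. A minimal sketch of that step, which simply prints the ranges; a real job would write them to the offset store, and process_batch is a hypothetical handler, not monasca-transform code:

    def record_offsets(rdd):
        # transform() must be applied before other operations so the RDD is still a KafkaRDD.
        for o in rdd.offsetRanges():
            print("batch covered %s[%d] offsets %d..%d"
                  % (o.topic, o.partition, o.fromOffset, o.untilOffset))
        return rdd

    # Usage sketch:
    # kvs = get_kafka_stream("metrics", ssc)
    # kvs.transform(record_offsets).foreachRDD(process_batch)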
Example 4: start

def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 2)

    brokers = "192.192.0.27:9092"
    topics = ['topic7']
    kafkaStreams_lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})

    lines1 = kafkaStreams_lines.map(lambda x: x[1])  # Note: the second element of the tuple is the received Kafka payload

    words = lines1.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordcounts = pairs.reduceByKey(lambda x, y: x + y)

    wordcounts.saveAsTextFiles("/var/lib/hadoop-hdfs/spark-libin/kafka")
    wordcounts.pprint()

    # Summarize the distribution of the generated random numbers
    ssc.start()  # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

Author: blair1, Project: hadoop-spark, Lines: 25, Source: kafka_streaming_direct.py
Example 5: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(ssc, topics=['twitterstream'],
                                            kafkaParams={"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))

    tweets = tweets.flatMap(lambda line: line.split(" "))
    words = tweets.flatMap(lambda line: line.split(" "))
    tweets = tweets.filter(lambda x: x in pwords or x in nwords)
    tweets = tweets.map(lambda x: ("positive", 1) if x in pwords else ("negative", 1))
    tweets = tweets.reduceByKey(lambda x, y: x + y)
    tweets = tweets.updateStateByKey(updateFunction)
    tweets.pprint()

    pds = words.filter(lambda x: x in pwords)
    nds = words.filter(lambda x: x in nwords)
    plist = []
    nlist = []
    pds.foreachRDD(lambda t, rdd: plist.append(rdd.count()))
    nds.foreachRDD(lambda t, rdd: nlist.append(rdd.count()))

    counts = []
    ssc.start()  # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)

    for i in range(0, len(plist)):
        counts.append((plist[i], nlist[i]))
    return counts

Author: ParinSanghavi, Project: Music-Recommender-System-using-Apache-Spark-and-Python, Lines: 30, Source: twitterStream.py
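Several of the sentiment-counting examples (this one and Examples 8, 11 and 13) call updateStateByKey(updateFunction) without showing the update function, and updateStateByKey also requires a checkpoint directory on the StreamingContext. A minimal sketch of such a function, assuming it keeps a running sum per key (it matches the version that Example 19 defines inline, but the other projects' implementations may differ):

    def updateFunction(newValues, runningCount):
        # Add this batch's counts to the running total; start at 0 for new keys.
        if runningCount is None:
            runningCount = 0
        return sum(newValues, runningCount)

    # updateStateByKey needs checkpointing enabled, e.g.:
    # ssc.checkpoint("checkpoint")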
Example 6: bro_parse

def bro_parse(zk, topic, db, db_table, num_of_workers):
    app_name = "ONI-INGEST-{0}".format(topic)
    wrks = int(num_of_workers)

    # create spark context
    sc = SparkContext(appName=app_name)
    ssc = StreamingContext(sc, 1)
    sqc = HiveContext(sc)

    # create a DStream for each topic partition
    topic_dstreams = [
        KafkaUtils.createStream(ssc, zk, app_name, {topic: 1},
                                keyDecoder=oni_decoder, valueDecoder=oni_decoder)
        for _ in range(wrks)
    ]
    tp_stream = ssc.union(*topic_dstreams)

    # Parallelism in data processing
    # processingDStream = tp_stream(wrks)

    # parse the RDD content
    proxy_logs = tp_stream.map(lambda x: proxy_parser(x[1]))

    # save the RDD into Hive
    proxy_logs.foreachRDD(lambda x: save_to_hive(x, sqc, db, db_table, topic))

    ssc.start()
    ssc.awaitTermination()

Author: Open-Network-Insight, Project: oni-ingest, Lines: 25, Source: bro_parser.py
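createStream accepts keyDecoder and valueDecoder callables that turn raw Kafka bytes into Python objects; the oni_decoder referenced above is not included in this snippet. A minimal sketch of such a decoder, which is an assumption and not the project's actual implementation:

    def oni_decoder(raw):
        # Decode raw Kafka bytes to text, tolerating malformed input and null keys.
        if raw is None:
            return None
        return raw.decode('utf-8', 'ignore')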
Example 7: main

def main():
    if len(sys.argv) != 4:
        print("Usage: kafka_wordcount.py <zk> <topic> <timeout>",
              file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    timeout = None
    if len(sys.argv) == 4:
        zk, topic, timeout = sys.argv[1:]
        timeout = int(timeout)
    else:
        zk, topic = sys.argv[1:]

    kvs = KafkaUtils.createStream(
        ssc, zk, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    kwargs = {}
    if timeout:
        kwargs['timeout'] = timeout

    ssc.start()
    ssc.awaitTermination(**kwargs)

Author: butterfy76, Project: sahara, Lines: 26, Source: spark-kafka-example.py
Example 8: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics=['twitterstream'], kafkaParams={"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # YOUR CODE HERE
    words = tweets.flatMap(lambda x: x.split(" ")).filter(lambda x: x in pwords or x in nwords)
    wordPairs = words.map(lambda x: ("positive", 1) if x in pwords else ("negative", 1))
    wordCount = wordPairs.reduceByKey(lambda x, y: x + y)

    runningCounts = wordPairs.updateStateByKey(updateFunction)
    runningCounts.pprint()

    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    wordCount.foreachRDD(lambda t, rdd: counts.append(rdd.collect()))

    ssc.start()  # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    return counts

Author: PragatiV, Project: Sentiment_Analysis_Spark, Lines: 32, Source: twitterStream.py
Example 9: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics=['twitterstream'], kafkaParams={"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))

    # Print the first ten elements of each RDD generated in this DStream to the console
    # tweets.pprint()
    words = tweets.flatMap(lambda line: line.split(" "))
    posNegPairs = words.map(lambda word: myMapping(word, pwords, nwords))
    filteredPairs = posNegPairs.filter(lambda x: x[0] != "na")
    posNegCounts = filteredPairs.reduceByKey(lambda x, y: x + y)

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    cumulativeCounts = posNegCounts.updateStateByKey(myRunningUpdate)
    cumulativeCounts.pprint()

    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    posNegCounts.foreachRDD(lambda t, rdd: counts.append(rdd.collect()))

    ssc.start()  # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    return counts

Author: himangshunits, Project: TinyProjectsOnMachineLeanring, Lines: 33, Source: twitterStream.py
Example 10: test_kafka_direct_stream_transform_get_offsetRanges

def test_kafka_direct_stream_transform_get_offsetRanges(self):
    """Test the Python direct Kafka stream transform get offsetRanges."""
    topic = self._randomTopic()
    sendData = {"a": 1, "b": 2, "c": 3}
    kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
                   "auto.offset.reset": "smallest"}

    self._kafkaTestUtils.createTopic(topic)
    self._kafkaTestUtils.sendMessages(topic, sendData)

    stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)

    offsetRanges = []

    def transformWithOffsetRanges(rdd):
        for o in rdd.offsetRanges():
            offsetRanges.append(o)
        return rdd

    # Test whether it is ok mixing KafkaTransformedDStream and TransformedDStream together;
    # only the TransformedDStreams can be folded together.
    stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint()
    self.ssc.start()
    self.wait_for(offsetRanges, 1)

    self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])

Author: anitatailor, Project: spark, Lines: 26, Source: tests.py
Example 11: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(ssc, topics=['twitterstream'],
                                            kafkaParams={"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))

    # Each element of tweets will be the text of a tweet.
    # Need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    pnTweets = tweets.flatMap(lambda line: line.split(" "))
    pnTweetsPairs = pnTweets.map(lambda x: determine(x, pwords, nwords))
    wordCounts = pnTweetsPairs.reduceByKey(lambda x, y: x + y)

    totalCounts = pnTweetsPairs.updateStateByKey(updateFunction)
    totalCounts.pprint()

    # Let the counts variable hold the word counts for all time steps
    # Need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    wordCounts.foreachRDD(lambda t, rdd: counts.append(rdd.collect()))

    ssc.start()  # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)

    # because counts also includes words that are neither positive nor negative
    newCounts = []
    for count in counts:
        newCount = [item for item in count if item[0] == "positive" or item[0] == "negative"]
        newCounts.insert(len(newCounts), newCount)
    return newCounts

Author: sxue2, Project: csc591, Lines: 34, Source: twitterStream.py
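The determine helper used above is not part of the snippet. Given that the result is later filtered down to "positive" and "negative" keys, a plausible minimal sketch looks like the following; the name of the third category is an assumption, not the project's actual code:

    def determine(word, pwords, nwords):
        # Hypothetical classifier: tag each word as positive, negative, or neither.
        if word in pwords:
            return ("positive", 1)
        elif word in nwords:
            return ("negative", 1)
        else:
            return ("neither", 1)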
Example 12: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics=['twitterstream'], kafkaParams={"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # YOUR CODE HERE
    words = tweets.flatMap(lambda line: line.split(' ')) \
        .map(lambda word: ('positive', 1) if word in pwords else ('negative', 1) if word in nwords else ('none', 1)) \
        .filter(lambda x: x[0] == 'positive' or x[0] == 'negative') \
        .reduceByKey(lambda x, y: x + y)

    # Print the first ten elements of each RDD generated in this DStream to the console
    def updateValues(values, count):
        if count is None:
            count = 0
        return sum(values, count)

    updatedWords = words.updateStateByKey(updateValues)
    updatedWords.pprint()

    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    # YOURDSTREAMOBJECT.foreachRDD(lambda t,rdd: counts.append(rdd.collect()))
    words.foreachRDD(lambda t, rdd: counts.append(rdd.collect()))

    ssc.start()  # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    return counts

Author: mandgerohit, Project: BI-CSC591, Lines: 35, Source: twitterStream.py
Example 13: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics=['twitterstream'], kafkaParams={"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))
    # tweets.pprint()

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # YOUR CODE HERE
    words = tweets.flatMap(lambda line: line.split(" "))
    pairs = words.map(classifier).map(lambda word: (word, 1)).filter(lambda x: x[0] != 'none').reduceByKey(lambda a, b: a + b)
    runningCounts = pairs.updateStateByKey(updateFunction)
    runningCounts.pprint()

    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    pairs.foreachRDD(lambda t, rdd: counts.append(rdd.collect()))

    ssc.start()  # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    # print counts
    return counts

Author: chintanpanchamia, Project: CSC591BusinessIntelligenceProjectSuite, Lines: 26, Source: twitterStream-cpancha.py
Example 14: start

def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 2)

    brokers = "localhost:9092"
    topics = ['test']
    kafkaStreams_lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})

    lines1 = kafkaStreams_lines.map(lambda x: x[1])  # Note: the second element of the tuple is the received Kafka payload

    words = lines1.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordcounts = pairs.reduceByKey(lambda x, y: x + y)
    print(wordcounts)

    kafkaStreams_lines.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)

    wordcounts.pprint()
    # Summarize the distribution of the generated random numbers
    ssc.start()  # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

Author: blair1, Project: hadoop-spark, Lines: 27, Source: kafka-direct.py
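The example references storeOffsetRanges and printOffsetRanges without including them. The standard pattern from the Spark Streaming + Kafka integration guide looks like the sketch below; the project's actual helpers may differ in detail:

    offsetRanges = []

    def storeOffsetRanges(rdd):
        # Capture the offset ranges of this batch's KafkaRDD; this must run
        # before any other transformation, while the RDD is still a KafkaRDD.
        global offsetRanges
        offsetRanges = rdd.offsetRanges()
        return rdd

    def printOffsetRanges(rdd):
        for o in offsetRanges:
            print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))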
Example 15: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(ssc, topics=['twitterstream'],
                                            kafkaParams={"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))

    pword_rdd = tweets.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: ("positive", 1) if word in pwords else ("positive", 0)) \
        .reduceByKey(lambda a, b: a + b)
    nword_rdd = tweets.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: ("negative", 1) if word in nwords else ("negative", 0)) \
        .reduceByKey(lambda a, b: a + b)

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # make the plot on this rdd - combined_rdd
    combined_rdd = pword_rdd.union(nword_rdd)
    running_counts = combined_rdd.updateStateByKey(updateFunction)

    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    combined_rdd.foreachRDD(lambda t, rdd: counts.append(rdd.collect()))

    # print "printing dstream"
    running_counts.pprint()

    # Start the computation
    ssc.start()
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    return counts

Author: prutha28, Project: Twitter-Sentiment-Analysis, Lines: 33, Source: twitterStream.py
Example 16: start_spark

def start_spark(timeout=None, max_items_per_rdd_sent=None):
    sc = SparkContext("local[4]", "twitter.trending")
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint('hdfs://localhost:9000/user/spark/checkpoint/')

    kafka_params = {
        'zookeeper.connect': config.get('zookeeper', 'host'),
        'group.id': config.get('kafka', 'group_id'),
        'metadata.broker.list': config.get('kafka', 'hosts')
    }

    ksc = KafkaUtils.createDirectStream(ssc,
                                        [config.get('kafka', 'topic')],
                                        kafka_params)

    hashtag_counts = get_word_counts(ksc)
    filtered_tweet_count = filter_tweets(hashtag_counts)
    send_dstream_data(filtered_tweet_count, max_items_per_rdd_sent)

    ssc.start()
    if timeout:
        ssc.awaitTermination(timeout)
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
    else:
        ssc.awaitTermination()

Author: joychugh, Project: learning-kafka, Lines: 25, Source: spark_example.py
Example 17: main

def main():
    sc = SparkContext(appName="IntrusionDetector")
    ssc = StreamingContext(sc, batch_durations)

    kvs = KafkaUtils.createDirectStream(ssc, [input_topic], {"metadata.broker.list": broker})
    kvs.foreachRDD(processRDD)

    ssc.start()
    ssc.awaitTermination()

Author: dfeldman, Project: intrusion-detector, Lines: 8, Source: processor.py
Example 18: readSource

def readSource(ssc, di_in_conf_with_ds_conf, app_conf):
    sourceType = di_in_conf_with_ds_conf['source.type']
    if sourceType == 'kafka':
        kafkaSimpleConsumerApiUsed = app_conf.get('kafka.simple.consumer.api.used', True)
        if kafkaSimpleConsumerApiUsed:
            topics = di_in_conf_with_ds_conf['topics']
            if not isinstance(topics, list):
                raise TypeError("topics should be a list")
            brokers = di_in_conf_with_ds_conf['metadata.broker.list']
            kafkaParams = {"metadata.broker.list": brokers}
            stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams).map(lambda x: x[1])
        else:
            zkConnect = di_in_conf_with_ds_conf['zookeeper.connect']
            groupId = app_conf['group.id']
            numReceivers = app_conf.get('num.receivers', 1)
            numConsumerFetchers = app_conf.get('num.consumer.fetchers')
            topics = di_in_conf_with_ds_conf['topics']
            topic_map = dict(zip(topics, numConsumerFetchers))
            # streams = reduce(lambda x, y: x.union(y),
            #                  map(KafkaUtils.createStream(ssc, zkConnect, groupId, topic_map),
            #                      range(0, numReceivers)))
            streams = [KafkaUtils.createStream(ssc, zkConnect, groupId, topic_map) for i in range(0, numReceivers)]
            stream = ssc.union(*streams).map(lambda x: x[1])
    elif sourceType == 'hdfs':
        path = di_in_conf_with_ds_conf['fs.defaultFS'] + '/' + di_in_conf_with_ds_conf['path']
        stream = ssc.textFileStream(path)
    else:
        raise Exception('Error: unsupported source.type = ' + sourceType)

    num_repartition = app_conf.get('dataInterface.stream.repatition.partitions')
    if num_repartition is None or not isinstance(num_repartition, int):
        stream2 = stream
    else:
        stream2 = stream.repartition(num_repartition)

    # Whether to format the stream with a formatting plugin class
    format_class_path = di_in_conf_with_ds_conf.get('format.class', '')
    if format_class_path.strip() == '':
        stream3 = stream2
    else:
        format_class_obj = get_class_obj(format_class_path)
        stream3 = format_class_obj.format(stream2)
    return stream3

Author: tsingfu, Project: xuetangx-streaming-app, Lines: 46, Source: streaming_app_platform.py
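The last step hands the stream to a formatting plugin loaded by class path; all the snippet tells us is that the plugin exposes a format(dstream) method that returns another DStream. A minimal sketch of such a plugin; the class name and JSON parsing are hypothetical, not the project's real implementation:

    import json

    class JsonFormatPlugin(object):
        # Hypothetical formatting plugin: parse each record as JSON, dropping bad lines.
        def format(self, dstream):
            def safe_parse(line):
                try:
                    return json.loads(line)
                except ValueError:
                    return None
            return dstream.map(safe_parse).filter(lambda rec: rec is not None)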
Example 19: stream

def stream(ssc, pwords, nwords, duration):
    kstream = KafkaUtils.createDirectStream(
        ssc, topics=['twitterstream'], kafkaParams={"metadata.broker.list": 'localhost:9092'})
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))

    # Each element of tweets will be the text of a tweet.
    # You need to find the count of all the positive and negative words in these tweets.
    # Keep track of a running total counts and print this at every time step (use the pprint function).
    # tweets.pprint()
    words = tweets.flatMap(lambda tweet: tweet.split(" "))
    # words.pprint()
    positive = words.filter(lambda x: (x in pwords))
    negative = words.filter(lambda x: (x in nwords))
    # positive.pprint()
    # negative.pprint()
    ppairs = positive.map(lambda p: ('positive', 1))
    npairs = negative.map(lambda n: ('negative', 1))

    pwordCounts = ppairs.reduceByKey(lambda x, y: x + y)
    nwordCounts = npairs.reduceByKey(lambda x, y: x + y)
    count = pwordCounts.union(nwordCounts)
    # count.pprint()
    # pwordCounts.pprint()
    # nwordCounts.pprint()

    def updateFunction(newValues, runningCount):
        if runningCount is None:
            runningCount = 0
        return sum(newValues, runningCount)

    prunningCounts = pwordCounts.updateStateByKey(updateFunction)
    nrunningCounts = nwordCounts.updateStateByKey(updateFunction)
    # prunningCounts.pprint()
    # nrunningCounts.pprint()

    total = prunningCounts.union(nrunningCounts)
    total.pprint()

    # Let the counts variable hold the word counts for all time steps
    # You will need to use the foreachRDD function.
    # For our implementation, counts looked like:
    #   [[("positive", 100), ("negative", 50)], [("positive", 80), ("negative", 60)], ...]
    counts = []
    count.foreachRDD(lambda t, rdd: counts.append(rdd.collect()))

    ssc.start()  # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)
    return counts

Author: alokrk, Project: twitterSentimentAnalysis, Lines: 57, Source: twitterStream.py
Example 20: kafka_spark_streaming_sql_main

def kafka_spark_streaming_sql_main(app_name, brokers, topic, interval_seconds, sql_function):
    sc = SparkContext(appName=app_name)
    sqlContext = SQLContext(sc)
    # ssc = StreamingContext(sc, interval_seconds)
    ssc = StreamingContext(sc, 10)  # batch interval hard-coded to 10s; interval_seconds is unused here

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(sql_function)

    ssc.start()
    ssc.awaitTermination()

Author: clearclouds-spark, Project: spark-sql-py, Lines: 9, Source: http_util.py
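kafka_spark_streaming_sql_main takes a sql_function that foreachRDD applies to each batch. A minimal sketch of such a function, assuming Spark 1.6+ (for SQLContext.getOrCreate), JSON-encoded message values, and an illustrative query; none of these details come from the source project:

    import json
    from pyspark.sql import SQLContext

    def sql_function(rdd):
        # Hypothetical per-batch handler: treat each Kafka value as a JSON record
        # and run a simple aggregation over the batch.
        if rdd.isEmpty():
            return
        sql_ctx = SQLContext.getOrCreate(rdd.context)
        df = sql_ctx.createDataFrame(rdd.map(lambda kv: json.loads(kv[1])))
        df.registerTempTable("events")
        sql_ctx.sql("SELECT COUNT(*) AS cnt FROM events").show()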
Note: the pyspark.streaming.kafka.KafkaUtils class examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors. Follow the corresponding projects' licenses when distributing or using this code, and do not repost without permission.