本文整理汇总了Python中pyspark.streaming.StreamingContext类的典型用法代码示例。如果您正苦于以下问题：Python StreamingContext类的具体用法？Python StreamingContext怎么用？Python StreamingContext使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了StreamingContext类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: createContext
def createContext(host, port, outputPath):
    """Build a new StreamingContext that aggregates socket records per key.

    Each line read from ``host:port`` is passed through ``splitLine`` (defined
    elsewhere in the file). The result is indexed as ``x[0]`` (key) and
    ``x[1]`` (value); per key, the job reduces to ``(record_count, value_sum)``.
    Every batch result is printed and appended to ``outputPath``.

    Args:
        host: host name or IP of the socket text source.
        port: port of the socket text source.
        outputPath: file the per-batch results are appended to; an existing
            file at this path is removed first.

    Returns:
        The configured, not yet started, StreamingContext.
    """
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the new checkpoint
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 120)

    # Create a socket stream on target ip:port and aggregate the
    # \n delimited records of the input stream (eg. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    print('\n\n\nconnectionMade\n\n\n')
    addresses = lines.map(splitLine)
    # (key, (1, value)) pairs reduce to (key, (count, sum of values)).
    transcationsum = addresses.map(lambda x: (x[0], (1, x[1]))).reduceByKey(
        lambda x, y: (x[0] + y[0], x[1] + y[1]))

    def echo(time, rdd):
        """Print one batch's collected result and append it to outputPath."""
        counts = "Counts at time %s %s" % (time, rdd.collect())
        print(counts)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    transcationsum.foreachRDD(echo)
    return ssc
开发者ID:samchorlton,项目名称:python,代码行数:25,代码来源:bitcoin_ip_count.py
示例2: ss_direct_kafka_bucket_counter
def ss_direct_kafka_bucket_counter(brokers, topic, bucket_interval, output_msg, message_parse, valueDecoder=None):
    """Starts a Spark Streaming job from a Kafka input and parses message time

    WARNING!! This function only works for spark 1.4.0+

    Args:
        brokers: the kafka broker that we look at for the topic
        topic: the kafka topic for input
        bucket_interval: the time interval in seconds (int) that the job will
            bucket; the streaming batch duration is this value plus 5
        output_msg: callable invoked once as ``output_msg(sc, ssc)``; the
            object it returns is registered with ``foreachRDD``
        message_parse: callable applied to every message value; its result is
            the key that gets counted
        valueDecoder: optional decoder forwarded to
            ``KafkaUtils.createDirectStream`` when truthy

    Returns:
        None (the call blocks in ``awaitTermination``)
    """
    sc = SparkContext(appName="PythonKafkaBucketCounter")
    # The original body read an undefined name `timeinterval`; the interval
    # is supplied through the `bucket_interval` parameter.
    ssc = StreamingContext(sc, bucket_interval + 5)

    kafka_params = {"metadata.broker.list": brokers}
    if valueDecoder:
        kvs = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, valueDecoder=valueDecoder)
    else:
        kvs = KafkaUtils.createDirectStream(ssc, [topic], kafka_params)

    # Each record is indexed with [1] to obtain the message value.
    lines = kvs.map(lambda x: x[1])
    interval_counts = lines.map(lambda line: (message_parse(line), 1)).reduceByKey(lambda a, b: a + b)

    output_msg_func = output_msg(sc, ssc)
    interval_counts.foreachRDD(output_msg_func)

    ssc.start()
    ssc.awaitTermination()
开发者ID:kelvinfann,项目名称:spark-streaming-kafka-bucket-counter,代码行数:32,代码来源:spark-streaming-kafka-bucket-counter.py
示例3: BaseStreamingTestCase
class BaseStreamingTestCase(unittest.TestCase):
    """Base test case for DStream tests.

    Adapted from https://github.com/apache/spark/blob/
    master/python/pyspark/streaming/tests.py
    """

    timeout = 10  # seconds to wait for results before giving up
    duration = .5  # value passed to StreamingContext as the batch duration

    def setUp(self):
        """Create a StreamingContext on the module-level ``sc`` for each test."""
        self.ssc = StreamingContext(sc, self.duration)

    def tearDown(self):
        """Stop the streaming context; ``False`` is passed as stop's first argument."""
        self.ssc.stop(False)

    def wait_for(self, result, n):
        """Poll until ``result`` holds at least ``n`` items or ``timeout`` elapses.

        Prints a timeout notice when fewer than ``n`` items were gathered.
        """
        began = time.time()
        while True:
            if len(result) >= n:
                break
            if time.time() - began >= self.timeout:
                break
            time.sleep(0.01)
        if len(result) < n:
            print("timeout after", self.timeout)

    def _collect(self, dstream, n):
        """Start the context and gather up to ``n`` non-empty batches of ``dstream``.

        Returns the list of collected batches (possibly shorter than ``n`` if
        the wait timed out).
        """
        result = []

        def gather(_, rdd):
            # Skip falsy RDDs and stop recording once n batches are stored.
            if not rdd or len(result) >= n:
                return
            batch = rdd.collect()
            if batch:
                result.append(batch)

        dstream.foreachRDD(gather)
        self.ssc.start()
        self.wait_for(result, n)
        return result
开发者ID:Fighting-Toghter,项目名称:scrapybook,代码行数:34,代码来源:boostwords.py
示例4: main
def main():
    """Set up the "Streamer" streaming job and hand it to ``stream``.

    Builds a local[2] Spark context, a StreamingContext with a 10 second
    batch interval and a checkpoint directory named "checkpoint", then calls
    ``stream`` (defined elsewhere) with the context, a Nominatim instance
    and the value 100.
    """
    local_conf = SparkConf().setMaster("local[2]")
    spark_conf = local_conf.setAppName("Streamer")
    spark = SparkContext(conf=spark_conf)

    # Streaming context with a batch interval of 10 sec.
    streaming = StreamingContext(spark, 10)
    streaming.checkpoint("checkpoint")

    locator = Nominatim()
    stream(streaming, locator, 100)
开发者ID:HackerPack,项目名称:disasterManagement,代码行数:7,代码来源:twitterStream.py
示例5: start
def start():
    """Run a Kafka direct-stream word count and block until it terminates.

    Reads topic "topic7" from the broker at 192.192.0.27:9092 in 2 second
    batches, counts space-separated words per batch, saves the counts with
    ``saveAsTextFiles`` and also pretty-prints them.
    """
    spark_conf = SparkConf()
    spark_conf.set('spark.cores.max', 2)
    context = SparkContext(appName='KafkaDirectWordCount', conf=spark_conf)
    streaming = StreamingContext(context, 2)

    broker_list = "192.192.0.27:9092"
    topic_names = ['topic7']
    kafka_stream = KafkaUtils.createDirectStream(
        streaming, topic_names, kafkaParams={"metadata.broker.list": broker_list})

    # Note: the second element of each tuple is the received Kafka message.
    messages = kafka_stream.map(lambda record: record[1])
    tokens = messages.flatMap(lambda text: text.split(" "))
    ones = tokens.map(lambda token: (token, 1))
    totals = ones.reduceByKey(lambda left, right: left + right)

    totals.saveAsTextFiles("/var/lib/hadoop-hdfs/spark-libin/kafka")
    totals.pprint()

    streaming.start()  # Start the computation
    streaming.awaitTermination()  # Wait for the computation to terminate
开发者ID:blair1,项目名称:hadoop-spark,代码行数:25,代码来源:kafka_streaming_direct.py
示例6: start
def start():
    """Run a Kafka direct-stream word count that also reports offset ranges.

    Reads topic "test" from localhost:9092 in 2 second batches, counts
    space-separated words per batch and pretty-prints them. The raw stream is
    additionally passed through ``storeOffsetRanges`` and
    ``printOffsetRanges`` (both defined elsewhere). Blocks until termination.
    """
    spark_conf = SparkConf()
    spark_conf.set('spark.cores.max', 2)
    context = SparkContext(appName='KafkaDirectWordCount', conf=spark_conf)
    streaming = StreamingContext(context, 2)

    broker_list = "localhost:9092"
    topic_names = ['test']
    kafka_stream = KafkaUtils.createDirectStream(
        streaming, topic_names, kafkaParams={"metadata.broker.list": broker_list})

    # Note: the second element of each tuple is the received Kafka message.
    messages = kafka_stream.map(lambda record: record[1])
    tokens = messages.flatMap(lambda text: text.split(" "))
    ones = tokens.map(lambda token: (token, 1))
    totals = ones.reduceByKey(lambda left, right: left + right)

    # Prints the stream object itself; the per-batch counts come from pprint().
    print(totals)

    offsets_tracked = kafka_stream.transform(storeOffsetRanges)
    offsets_tracked.foreachRDD(printOffsetRanges)
    totals.pprint()

    streaming.start()  # Start the computation
    streaming.awaitTermination()  # Wait for the computation to terminate
开发者ID:blair1,项目名称:hadoop-spark,代码行数:27,代码来源:kafka-direct.py
示例7: main
def main():
    """Count words arriving on a Kafka topic, optionally for a limited time.

    Command line: ``kafka_wordcount.py <zk> <topic> [timeout]``

    * ``zk``      -- ZooKeeper quorum passed to ``KafkaUtils.createStream``.
    * ``topic``   -- Kafka topic to consume (one consumer thread).
    * ``timeout`` -- optional integer forwarded to ``awaitTermination`` as its
      ``timeout`` keyword; when omitted (or 0) the job waits indefinitely.

    Exits with status -1 and a usage message on a wrong argument count.
    Raises ValueError if ``timeout`` is not an integer.
    """
    # The original guard (`!= 4`) made the "no timeout" branch unreachable;
    # accept both forms so the optional timeout actually is optional.
    if len(sys.argv) not in (3, 4):
        print("Usage: kafka_wordcount.py <zk> <topic> [timeout]",
              file=sys.stderr)
        sys.exit(-1)

    timeout = None
    if len(sys.argv) == 4:
        zk, topic, timeout = sys.argv[1:]
        timeout = int(timeout)
    else:
        zk, topic = sys.argv[1:]

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    kvs = KafkaUtils.createStream(
        ssc, zk, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    # map/reduceByKey must be chained on the DStream. The original had them
    # inside the flatMap lambda, where they were called on the list returned
    # by str.split and failed at runtime.
    counts = (lines.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    kwargs = {}
    if timeout:
        kwargs['timeout'] = timeout

    ssc.start()
    ssc.awaitTermination(**kwargs)
开发者ID:butterfy76,项目名称:sahara,代码行数:26,代码来源:spark-kafka-example.py
示例8: createStreamingContext
def createStreamingContext():
    """Assemble the "GlutenTweet" streaming pipeline and return its context.

    The context connects to ``spark://MASTER_NAME:7077`` and is created with
    a batch duration of 2. Raw text from ``MASTER_IP:9999`` is turned into
    ``Tweet`` models, stored, enriched with keywords, and analysed over
    windows created with ``window(20*60, 30)``. Checkpointing is configured
    with ``CHECKPOINT_DIR``. The returned context has not been started.
    """
    master_url = "spark://%s:7077" % MASTER_NAME
    spark = SparkContext(master_url, appName="GlutenTweet", pyFiles=PYFILES)
    streaming = StreamingContext(spark, 2)

    # DStream of raw data read from the socket.
    raw_stream = streaming.socketTextStream(MASTER_IP, 9999)

    # Convert each raw record into a Tweet model and store the models.
    tweet_stream = raw_stream.map(lambda r: Tweet(raw_json=r))
    tweet_stream.foreachRDD(storeTweetsRDD)

    # Sliding-window hashtag analysis.
    hashtag_window = tweet_stream.window(20*60, 30)
    hashtag_counts = analysisHahtagCount(hashtag_window)
    top_hashtags = streamTop(hashtag_counts)
    top_hashtags.pprint()

    # Keyword extraction produces a new stream; tweet_stream itself is not modified.
    keyword_stream = tweet_stream.map(lambda t: keywordExtraction(t))
    # Update the stored models with the extracted keywords.
    keyword_stream.foreachRDD(updateTweetsRDD)

    # Sliding-window keyword analysis.
    keyword_window = keyword_stream.window(20*60, 30)
    keyword_counts = analysisKeywordCount(keyword_window)
    top_keywords = streamTop(keyword_counts)
    top_keywords.pprint()

    streaming.checkpoint(CHECKPOINT_DIR)
    return streaming
开发者ID:ecesena,项目名称:spark-tutorial,代码行数:33,代码来源:app.py
示例9: main
def main():
    """Run the "IntrusionDetector" streaming job until it terminates.

    Consumes ``input_topic`` from the Kafka broker(s) in ``broker`` using a
    direct stream and passes every batch RDD to ``processRDD``. The batch
    duration comes from the module-level ``batch_durations``.
    """
    spark = SparkContext(appName="IntrusionDetector")
    streaming = StreamingContext(spark, batch_durations)

    topic_names = [input_topic]
    kafka_params = {"metadata.broker.list": broker}
    records = KafkaUtils.createDirectStream(streaming, topic_names, kafka_params)
    records.foreachRDD(processRDD)

    streaming.start()
    streaming.awaitTermination()
开发者ID:dfeldman,项目名称:intrusion-detector,代码行数:8,代码来源:processor.py
示例10: kafka_spark_streaming_sql_main
def kafka_spark_streaming_sql_main(app_name, brokers, topic, interval_seconds, sql_function):
    """Run a Kafka direct-stream job that feeds every batch to ``sql_function``.

    Args:
        app_name: Spark application name.
        brokers: value for the Kafka ``metadata.broker.list`` setting.
        topic: Kafka topic to consume.
        interval_seconds: streaming batch duration passed to StreamingContext.
        sql_function: callable registered with ``foreachRDD``.

    Returns:
        None (the call blocks in ``awaitTermination``).
    """
    sc = SparkContext(appName=app_name)
    # Not referenced again in this function; kept because constructing it may
    # matter to sql_function -- NOTE(review): confirm before removing.
    sqlContext = SQLContext(sc)
    # Honour the caller's interval; the original hard-coded 10 here and left
    # the parameter unused.
    ssc = StreamingContext(sc, interval_seconds)
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(sql_function)
    ssc.start()
    ssc.awaitTermination()
开发者ID:clearclouds-spark,项目名称:spark-sql-py,代码行数:9,代码来源:http_util.py
示例11: main
def main():
    """Run the "Streamer" sentiment job and plot its result.

    Builds a local[2] Spark context and a StreamingContext with a 10 second
    batch interval and a "checkpoint" checkpoint directory. Word lists are
    loaded from positive.txt and negative.txt, then ``stream`` is called with
    them and the value 100; its return value is passed to ``make_plot``.
    """
    local_conf = SparkConf().setMaster("local[2]")
    spark_conf = local_conf.setAppName("Streamer")
    spark = SparkContext(conf=spark_conf)

    # Streaming context with a batch interval of 10 sec.
    streaming = StreamingContext(spark, 10)
    streaming.checkpoint("checkpoint")

    positive_words = load_wordlist("positive.txt")
    negative_words = load_wordlist("negative.txt")

    sentiment_counts = stream(streaming, positive_words, negative_words, 100)
    make_plot(sentiment_counts)
开发者ID:prutha28,项目名称:Twitter-Sentiment-Analysis,代码行数:9,代码来源:twitterStream.py
示例12: read_tweets
def read_tweets():
    """Consume the "test" Kafka topic and pass every batch to ``create_format``.

    Uses the broker at localhost:9092 and blocks until the streaming context
    terminates.
    """
    spark = SparkContext(appName="sentimentProducer")
    # Batch duration is 600; the original note mentioned 60 seconds for testing.
    streaming = StreamingContext(spark, 600)

    broker_list = "localhost:9092"
    kafka_params = {"metadata.broker.list": broker_list}
    records = KafkaUtils.createDirectStream(streaming, ["test"], kafka_params)
    records.foreachRDD(create_format)

    # NOTE(review): flush() runs once here, before the stream is started --
    # confirm that this is the intended place for it.
    producer.flush()

    streaming.start()
    streaming.awaitTermination()
开发者ID:lrsolorzano,项目名称:BigDataProject3,代码行数:10,代码来源:sentimentProducer.py
示例13: functionToCreateContext
def functionToCreateContext():
    """Create the stateful Kafka word-count StreamingContext.

    Reads "mytopic" through a direct stream, splits each message on spaces
    and keeps per-word state via ``updateStateByKey(updateFunction)``. The
    context uses a batch duration of 10 and checkpoints to "checkpoint".

    Returns:
        The configured, not yet started, StreamingContext.
    """
    spark = SparkContext(appName="StreamingExampleWithKafka")
    streaming = StreamingContext(spark, 10)
    streaming.checkpoint("checkpoint")

    kafka_params = {"metadata.broker.list": "node1.example.com:6667,node2.example.com:6667"}
    records = KafkaUtils.createDirectStream(streaming, ["mytopic"], kafka_params)

    # Index 1 of each record holds the message text.
    messages = records.map(lambda x: x[1])
    tokens = messages.flatMap(lambda line: line.split(" "))
    ones = tokens.map(lambda word: (word, 1))
    running_counts = ones.updateStateByKey(updateFunction)
    running_counts.pprint()

    return streaming
开发者ID:bithu30,项目名称:myRepo,代码行数:10,代码来源:streamingWordCountWithState.py
示例14: setup
def setup():
    """Create the StreamingContext used by a checkpoint-recovery style test.

    Relies on ``self``, ``inputd``, ``outputd`` and ``updater`` from an
    enclosing scope that is not part of this snippet. Sets
    ``self.setupCalled`` to True so the caller can tell that a fresh context
    was built.

    Returns:
        The configured, not yet started, StreamingContext.
    """
    spark_conf = SparkConf().set("spark.default.parallelism", 1)
    spark = SparkContext(conf=spark_conf)
    streaming = StreamingContext(spark, 2)

    file_lines = streaming.textFileStream(inputd)
    pairs = file_lines.map(lambda x: (x, 1))
    word_state = pairs.updateStateByKey(updater)

    formatted = word_state.map(lambda x: "%s,%d" % x)
    formatted.saveAsTextFiles(outputd + "test")
    word_state.checkpoint(2)

    self.setupCalled = True
    return streaming
开发者ID:JingchengDu,项目名称:spark,代码行数:10,代码来源:test_dstream.py
示例15: invoke
def invoke():
    """Run the "mon_metrics_kafka" streaming job until it terminates.

    Initialises configuration through ``ConfigInitializer.basic_config()``,
    builds the Spark and streaming contexts, wires the Kafka stream into
    ``MonMetricsKafkaProcessor.transform_to_recordstore`` and starts
    processing. If waiting for termination raises, the error is logged and
    the saved Kafka offsets are reset.
    """
    ConfigInitializer.basic_config()

    # app name
    app_name = "mon_metrics_kafka"

    spark_conf = SparkConf().setAppName(app_name)
    spark = SparkContext(conf=spark_conf)

    # read at the configured interval
    streaming = StreamingContext(spark, cfg.CONF.service.stream_interval)

    source_stream = MonMetricsKafkaProcessor.get_kafka_stream(
        cfg.CONF.messaging.topic, streaming)

    # transform to recordstore
    MonMetricsKafkaProcessor.transform_to_recordstore(source_stream)

    # NOTE: no SIGINT handler is installed here, so the streaming context is
    # not stopped gracefully on interrupt.

    # start processing
    streaming.start()

    # FIXME: stop spark context to relinquish resources
    # FIXME: specify cores, so as not to use all the resources on the cluster.
    # FIXME: HA deploy multiple masters, may be one on each control node

    try:
        # Wait for the Spark driver to "finish"
        streaming.awaitTermination()
    except Exception as err:
        MonMetricsKafkaProcessor.log_debug(
            "Exception raised during Spark execution : " + str(err))

        # One exception that can occur here is the result of the saved
        # kafka offsets being obsolete/out of range. Delete the saved
        # offsets to improve the chance of success on the next execution.
        # TODO(someone) prevent deleting all offsets for an application,
        # but just the latest revision
        MonMetricsKafkaProcessor.log_debug(
            "Deleting saved offsets for chance of success on next execution")
        MonMetricsKafkaProcessor.reset_kafka_offsets(app_name)

        # delete pre hourly processor offsets
        pre_hourly_enabled = cfg.CONF.stage_processors.pre_hourly_processor_enabled
        if pre_hourly_enabled:
            PreHourlyProcessor.reset_kafka_offsets()
开发者ID:openstack,项目名称:monasca-transform,代码行数:54,代码来源:mon_metrics_kafka.py
示例16: main
def main():
    """Consume the "splash_json" Kafka topic and pass each batch to ``process_rdd``.

    Connects through ZooKeeper at localhost:2181 as consumer group
    "spark-streaming-consumer" with 2 threads for the topic, using a batch
    duration of 1. Blocks until the streaming context terminates.
    """
    conf = SparkConf().setAppName("kafka_source_mongo_sink_pymongo_filtered")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    try:
        kafka_streams = KafkaUtils.createStream(
            ssc, "localhost:2181", "spark-streaming-consumer", {"splash_json": 2})
        kafka_streams.foreachRDD(process_rdd)
    except Exception as e:
        # Best effort: report the setup failure and still attempt to start.
        # Call form of print so the module also parses on Python 3.
        print(e)
    ssc.start()
    ssc.awaitTermination()
开发者ID:petergdoyle,项目名称:StreamWorks,代码行数:11,代码来源:kafka_source_mongo_sink_pymongo_filtered.py
示例17: MLLibStreamingTestCase
class MLLibStreamingTestCase(unittest.TestCase):
    """Base test case that gives each test a fresh StreamingContext.

    The context is built on the module-level ``sc`` with a batch duration
    of 1.0 and is stopped again in ``tearDown``.
    """

    def setUp(self):
        """Create the StreamingContext for the test."""
        self.sc = sc
        self.ssc = StreamingContext(self.sc, 1.0)

    def tearDown(self):
        """Stop the streaming context; ``False`` is passed as stop's first argument."""
        self.ssc.stop(False)

    @staticmethod
    def _ssc_wait(start_time, end_time, sleep_time):
        """Block until ``end_time`` seconds have passed since ``start_time``.

        Args:
            start_time: reference timestamp as returned by ``time()``.
            end_time: number of seconds after ``start_time`` to wait for.
            sleep_time: pause, in seconds, between successive clock checks.
        """
        while time() - start_time < end_time:
            # Use the caller-supplied polling interval; the original ignored
            # this parameter and always slept 0.01 seconds.
            sleep(sleep_time)
开发者ID:HodaAlemi,项目名称:spark,代码行数:12,代码来源:tests.py
示例18: createContext
def createContext(conf):
    """Build the StreamingContext described by ``conf`` and wire up processing.

    Args:
        conf: project configuration object. This function calls its
            ``getSparkConf()`` and ``getReader(ssc)`` methods and reads its
            ``INTERVAL``, ``REMEMBER`` and ``WINDOW`` attributes.

    Returns:
        The configured, not yet started, StreamingContext.
    """
    spark = SparkContext(conf=conf.getSparkConf())
    streaming = StreamingContext(spark, conf.INTERVAL)
    streaming.remember(conf.REMEMBER)

    # get reader
    raw_lines = conf.getReader(streaming)

    # use window: the same value serves as both arguments of window()
    windowed = raw_lines.window(conf.WINDOW, conf.WINDOW)
    decoded = windowed.map(lambda line: jsonDecode(line))

    deal(decoded, conf)
    return streaming
开发者ID:sekaiamber,项目名称:KSE-Sample,代码行数:12,代码来源:submit.py
示例19: createContext
def createContext():
    """Create the "StreamingKafka" StreamingContext with checkpointing enabled.

    Returns:
        A StreamingContext with a batch duration of 10 that checkpoints to
        ``CHECKPOINT_DIR``. It has not been started.
    """
    batch_interval = 10

    spark = SparkContext(SPARK_MASTER, appName="StreamingKafka")
    # The objects returned by broadcast() are not kept here.
    spark.broadcast(batchUserPostDict)
    spark.broadcast(batchPostUserDict)

    streaming = StreamingContext(spark, batch_interval)
    # set checkpoint directory in HDFS
    streaming.checkpoint(CHECKPOINT_DIR)
    return streaming


# Passes the checkpoint directory and the factory above to getOrCreate.
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, createContext)
开发者ID:lingding0,项目名称:HottestTopicOnReddit,代码行数:13,代码来源:hotred_stream.py
示例20: main
def main():
    """Configure the "TopAirports" streaming job and hand it to ``stream_kafka``.

    Creates a StreamingContext with a 1 second batch interval and a
    "checkpoint" checkpoint directory, truncates the table named by
    ``top_airports_table`` and then calls ``stream_kafka`` with the context.
    """
    spark_conf = SparkConf()
    spark_conf.setAppName("TopAirports")
    spark_conf.set("spark.streaming.kafka.maxRatePerPartition", "0")
    spark_conf.set("spark.dynamicAllocation.enabled", "true")

    spark = SparkContext(conf=spark_conf)
    # Stream every 1 second
    streaming = StreamingContext(spark, 1)
    streaming.checkpoint("checkpoint")

    # Clear the cassandra table
    session = init_cassandra()
    session.execute('TRUNCATE {}'.format(top_airports_table))

    stream_kafka(streaming)
开发者ID:karthikBG,项目名称:AviationAnalytics,代码行数:13,代码来源:2.2.TopDestinationsByAirport.py
注：本文中的pyspark.streaming.StreamingContext类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论