Python streaming.StreamingContext Class Code Examples


This article collects typical usage examples of the pyspark.streaming.StreamingContext class in Python. If you are asking what the StreamingContext class does, how to use it, or where to find concrete examples, the curated class code examples below should help.



Twenty code examples of the StreamingContext class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
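Before the collected examples, here is a minimal self-contained sketch of typical StreamingContext usage, modeled on the classic socket word count from the Spark Streaming programming guide; the host and port are placeholders.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Two local worker threads and a 1-second batch interval
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Count words arriving on a TCP socket (e.g. fed by `nc -lk 9999`)
lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()             # start the computation
ssc.awaitTermination()  # wait until it finishes or is stopped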

Example 1: createContext

def createContext(host, port, outputPath):
    # If you do not see this printed, it means the StreamingContext has been
    # loaded from an existing checkpoint rather than created from scratch.
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 120)

    # Create a socket stream on the target ip:port and count/sum, per address,
    # the transactions in the input stream of \n-delimited text (e.g. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    print('\n\n\nconnectionMade\n\n\n')
    addresses = lines.map(splitLine)
    transactionsum = addresses.map(lambda x: (x[0], (1, x[1]))).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

    def echo(time, rdd):
        counts = "Counts at time %s %s" % (time, rdd.collect())
        print(counts)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    transactionsum.foreachRDD(echo)
    return ssc
Author: samchorlton, Project: python, Lines: 25, Source: bitcoin_ip_count.py
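The excerpt omits the splitLine helper and the driver that pairs createContext with checkpoint recovery. A hedged sketch under assumed semantics: splitLine is hypothetical and assumes each line is "<address> <amount>", and the checkpoint directory, host, port and output path are placeholders.

# Hypothetical helper: parse "<address> <amount>" into an (address, amount) pair.
def splitLine(line):
    address, amount = line.split(" ", 1)
    return address, float(amount)

checkpoint_dir = "/tmp/bitcoin_ip_count_checkpoint"

def create():
    ssc = createContext("localhost", 9999, "/tmp/out.txt")
    ssc.checkpoint(checkpoint_dir)  # enable recovery for a freshly built context
    return ssc

# getOrCreate rebuilds the context from the checkpoint if one exists,
# otherwise it calls create() to build a new one.
ssc = StreamingContext.getOrCreate(checkpoint_dir, create)
ssc.start()
ssc.awaitTermination()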


Example 2: ss_direct_kafka_bucket_counter

def ss_direct_kafka_bucket_counter(brokers, topic, bucket_interval, output_msg, message_parse, valueDecoder=None):
    """Starts a Spark Streaming job from a Kafka input and parses message time.

    WARNING: this function only works for Spark 1.4.0+.

    Args:
        brokers: the Kafka broker(s) to read the topic from
        topic: the Kafka topic used as input
        bucket_interval: the time interval in seconds (int) that the job will
            bucket
        output_msg: called with (sc, ssc); must return a foreachRDD callback
        message_parse: maps a raw message to the bucket key it is counted under
        valueDecoder: optional decoder for the Kafka message values

    Returns:
        None
    """
    sc = SparkContext(appName="PythonKafkaBucketCounter")
    ssc = StreamingContext(sc, bucket_interval + 5)

    if valueDecoder:
        kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=valueDecoder)
    else:
        kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    lines = kvs.map(lambda x: x[1])
    interval_counts = lines.map(lambda line: (message_parse(line), 1)).reduceByKey(lambda a, b: a + b)

    output_msg_func = output_msg(sc, ssc)

    interval_counts.foreachRDD(output_msg_func)

    ssc.start()
    ssc.awaitTermination()
Author: kelvinfann, Project: spark-streaming-kafka-bucket-counter, Lines: 32, Source: spark-streaming-kafka-bucket-counter.py
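A hypothetical invocation showing the shape of the message_parse and output_msg arguments the function expects; the broker address, topic name and timestamp format are assumptions, not part of the original project.

# Hypothetical bucket key: the minute portion of an ISO-8601 timestamp
# in the first comma-separated field of each message.
def parse_minute(line):
    return line.split(",", 1)[0][:16]

# output_msg receives (sc, ssc) and must return a foreachRDD callback.
def make_output(sc, ssc):
    def emit(time, rdd):
        print("batch %s -> %s" % (time, rdd.collect()))
    return emit

ss_direct_kafka_bucket_counter("localhost:9092", "events", 55, make_output, parse_minute)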


Example 3: BaseStreamingTestCase

class BaseStreamingTestCase(unittest.TestCase):
    """ From https://github.com/apache/spark/blob/
    master/python/pyspark/streaming/tests.py """

    timeout = 10  # seconds
    duration = .5

    def setUp(self):
        self.ssc = StreamingContext(sc, self.duration)

    def tearDown(self):
        self.ssc.stop(False)

    def wait_for(self, result, n):
        start_time = time.time()
        while len(result) < n and time.time() - start_time < self.timeout:
            time.sleep(0.01)
        if len(result) < n:
            print("timeout after", self.timeout)

    def _collect(self, dstream, n):
        result = []

        def get_output(_, rdd):
            if rdd and len(result) < n:
                r = rdd.collect()
                if r:
                    result.append(r)

        dstream.foreachRDD(get_output)

        self.ssc.start()
        self.wait_for(result, n)
        return result
Author: Fighting-Toghter, Project: scrapybook, Lines: 34, Source: boostwords.py
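A hedged usage sketch building on the class above: a concrete test that feeds a fixed RDD through queueStream so the assertion is deterministic. The module-level sc is assumed, as in the excerpt; WordCountTest is hypothetical.

class WordCountTest(BaseStreamingTestCase):
    def test_word_count(self):
        # queueStream turns a list of RDDs into a DStream, one RDD per batch
        rdd = sc.parallelize(["a b", "b c"])
        dstream = self.ssc.queueStream([rdd])
        counts = (dstream.flatMap(lambda line: line.split(" "))
                         .map(lambda word: (word, 1))
                         .reduceByKey(lambda a, b: a + b))
        result = self._collect(counts, 1)
        self.assertEqual(dict(result[0]), {"a": 1, "b": 2, "c": 1})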


Example 4: main

def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)   # Create a streaming context with batch interval of 10 sec
    ssc.checkpoint("checkpoint")
    geolocator = Nominatim()
    stream(ssc,geolocator,100) 
Author: HackerPack, Project: disasterManagement, Lines: 7, Source: twitterStream.py


Example 5: start

def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 2)

    brokers = "192.192.0.27:9092"
    topics = ['topic7']

    kafkaStreams_lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})

    lines1 = kafkaStreams_lines.map(lambda x: x[1])  # Note: the second element of the tuple is the received Kafka message value

    words = lines1.flatMap(lambda line: line.split(" "))

    pairs = words.map(lambda word: (word, 1))

    wordcounts = pairs.reduceByKey(lambda x, y: x + y)

    wordcounts.saveAsTextFiles("/var/lib/hadoop-hdfs/spark-libin/kafka")

    wordcounts.pprint()
    # Tally the distribution of the generated random numbers
    ssc.start()  # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
Author: blair1, Project: hadoop-spark, Lines: 25, Source: kafka_streaming_direct.py


Example 6: start

def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 2)

    brokers = "localhost:9092"
    topics = ['test']

    kafkaStreams_lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})

    lines1 = kafkaStreams_lines.map(lambda x: x[1])  # Note: the second element of the tuple is the received Kafka message value

    words = lines1.flatMap(lambda line: line.split(" "))

    pairs = words.map(lambda word: (word, 1))

    wordcounts = pairs.reduceByKey(lambda x, y: x + y)

    print(wordcounts)

    kafkaStreams_lines.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)

    wordcounts.pprint()
    # Tally the distribution of the generated random numbers
    ssc.start()  # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
Author: blair1, Project: hadoop-spark, Lines: 27, Source: kafka-direct.py
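storeOffsetRanges and printOffsetRanges are not shown in the excerpt. A sketch following the offset-tracking pattern from the Spark Streaming + Kafka integration guide (the module-level variable is the pattern used there, not necessarily this project's code):

offsetRanges = []

def storeOffsetRanges(rdd):
    # Capture the Kafka offset ranges of this batch; this must be the first
    # transformation applied, before the KafkaRDD type is lost.
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))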


Example 7: main

def main():
    if len(sys.argv) not in (3, 4):
        print("Usage: kafka_wordcount.py <zk> <topic> [timeout]",
              file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    timeout = None
    if len(sys.argv) == 4:
        zk, topic, timeout = sys.argv[1:]
        timeout = int(timeout)
    else:
        zk, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(
        ssc, zk, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    kwargs = {}
    if timeout:
        kwargs['timeout'] = timeout
    ssc.start()
    ssc.awaitTermination(**kwargs)
Author: butterfy76, Project: sahara, Lines: 26, Source: spark-kafka-example.py


Example 8: createStreamingContext

def createStreamingContext():

    # Create a local StreamingContext with two working thread and batch interval of 1 second
    sc = SparkContext("spark://%s:7077" % MASTER_NAME, appName="GlutenTweet", pyFiles=PYFILES)
    ssc = StreamingContext(sc, 2)

    # Create a DStream of raw data
    raw = ssc.socketTextStream(MASTER_IP, 9999)

    # Convert into models
    tweets = raw.map(lambda r: Tweet(raw_json=r))

    # Store models
    tweets.foreachRDD(storeTweetsRDD)

    # Sliding window analysis
    window = tweets.window(20*60, 30)
    hashtagCounts = analysisHahtagCount(window)
    streamTop(hashtagCounts).pprint()

    # Keyword extraction - note tweets is immutable
    tweetsKeyword = tweets.map(lambda t: keywordExtraction(t))

    # Update models
    tweetsKeyword.foreachRDD(updateTweetsRDD)

    # Sliding window analysis
    window2 = tweetsKeyword.window(20*60, 30)
    keywordCounts = analysisKeywordCount(window2)
    streamTop(keywordCounts).pprint()

    ssc.checkpoint(CHECKPOINT_DIR)
    return ssc
Author: ecesena, Project: spark-tutorial, Lines: 33, Source: app.py


Example 9: main

def main():
    sc = SparkContext(appName="IntrusionDetector")
    ssc = StreamingContext(sc, batch_durations)

    kvs = KafkaUtils.createDirectStream(ssc, [input_topic], {"metadata.broker.list": broker})
    kvs.foreachRDD(processRDD)
    ssc.start()
    ssc.awaitTermination()
Author: dfeldman, Project: intrusion-detector, Lines: 8, Source: processor.py
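processRDD is project-specific and not shown. A minimal hypothetical placeholder illustrating the callback shape foreachRDD accepts (either rdd alone, or time and rdd); the real intrusion-detection logic would replace the body.

def processRDD(time, rdd):
    # Each element from createDirectStream is a (key, message) pair.
    for key, value in rdd.take(10):
        print("%s key=%s value=%s" % (time, key, value))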


Example 10: kafka_spark_streaming_sql_main

def kafka_spark_streaming_sql_main(app_name, brokers, topic, interval_seconds, sql_function):
    sc = SparkContext(appName=app_name)
    sqlContext = SQLContext(sc)
    ssc = StreamingContext(sc, interval_seconds)
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(sql_function)
    ssc.start()
    ssc.awaitTermination()
Author: clearclouds-spark, Project: spark-sql-py, Lines: 9, Source: http_util.py


Example 11: main

def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)   # Create a streaming context with batch interval of 10 sec
    ssc.checkpoint("checkpoint")
    pwords = load_wordlist("positive.txt")
    nwords = load_wordlist("negative.txt")
    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)
Author: prutha28, Project: Twitter-Sentiment-Analysis, Lines: 9, Source: twitterStream.py
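load_wordlist, stream, and make_plot live elsewhere in the project. A plausible sketch of load_wordlist only, assuming the word lists use one word per line:

def load_wordlist(filename):
    # Hypothetical helper: read one word per line into a set for O(1) lookups.
    with open(filename) as f:
        return set(line.strip() for line in f if line.strip())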


Example 12: read_tweets

def read_tweets():

    sc = SparkContext(appName="sentimentProducer")
    ssc = StreamingContext(sc, 600)  # batch interval in seconds (comment in source says "test 60 seconds")
    brokers = "localhost:9092"
    kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": brokers})
    kvs.foreachRDD(create_format)
    producer.flush()
    ssc.start()
    ssc.awaitTermination()
Author: lrsolorzano, Project: BigDataProject3, Lines: 10, Source: sentimentProducer.py


Example 13: functionToCreateContext

def functionToCreateContext():
    sc = SparkContext(appName="StreamingExampleWithKafka")
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("checkpoint")
    opts = {"metadata.broker.list": "node1.example.com:6667,node2.example.com:6667"}
    kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], opts)
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).updateStateByKey(updateFunction)
    counts.pprint()
    return ssc
Author: bithu30, Project: myRepo, Lines: 10, Source: streamingWordCountWithState.py
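updateFunction is not part of the excerpt. The standard running-count updater for updateStateByKey, as in the Spark docs, would look like the sketch below; the project's actual version may differ.

def updateFunction(newValues, runningCount):
    # newValues: counts for this key in the current batch; runningCount: previous state or None.
    return sum(newValues) + (runningCount or 0)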


Example 14: setup

# Nested inside a test method in test_dstream.py: 'self', 'inputd', 'outputd'
# and 'updater' come from the enclosing scope.
def setup():
    conf = SparkConf().set("spark.default.parallelism", 1)
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)
    dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1))
    wc = dstream.updateStateByKey(updater)
    wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test")
    wc.checkpoint(2)
    self.setupCalled = True
    return ssc
Author: JingchengDu, Project: spark, Lines: 10, Source: test_dstream.py


Example 15: invoke

def invoke():
    # object to keep track of offsets
    ConfigInitializer.basic_config()

    # app name
    application_name = "mon_metrics_kafka"

    my_spark_conf = SparkConf().setAppName(application_name)

    spark_context = SparkContext(conf=my_spark_conf)

    # read at the configured interval
    spark_streaming_context = \
        StreamingContext(spark_context, cfg.CONF.service.stream_interval)

    kafka_stream = MonMetricsKafkaProcessor.get_kafka_stream(
        cfg.CONF.messaging.topic,
        spark_streaming_context)

    # transform to recordstore
    MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream)

    # catch interrupt, stop streaming context gracefully
    # signal.signal(signal.SIGINT, signal_handler)

    # start processing
    spark_streaming_context.start()

    # FIXME: stop spark context to relinquish resources

    # FIXME: specify cores, so as not to use all the resources on the cluster.

    # FIXME: HA deploy multiple masters, may be one on each control node

    try:
        # Wait for the Spark driver to "finish"
        spark_streaming_context.awaitTermination()
    except Exception as e:
        MonMetricsKafkaProcessor.log_debug(
            "Exception raised during Spark execution : " + str(e))
        # One exception that can occur here is the result of the saved
        # kafka offsets being obsolete/out of range.  Delete the saved
        # offsets to improve the chance of success on the next execution.

        # TODO(someone) prevent deleting all offsets for an application,
        # but just the latest revision
        MonMetricsKafkaProcessor.log_debug(
            "Deleting saved offsets for chance of success on next execution")

        MonMetricsKafkaProcessor.reset_kafka_offsets(application_name)

        # delete pre hourly processor offsets
        if cfg.CONF.stage_processors.pre_hourly_processor_enabled:
            PreHourlyProcessor.reset_kafka_offsets()
Author: openstack, Project: monasca-transform, Lines: 54, Source: mon_metrics_kafka.py


Example 16: main

def main():
    conf = SparkConf().setAppName("kafka_source_mongo_sink_pymongo_filtered")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    try:
        kafka_streams = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"splash_json": 2})
        kafka_streams.foreachRDD(process_rdd)
    except Exception as e:
        print(e)
    ssc.start()
    ssc.awaitTermination()
Author: petergdoyle, Project: StreamWorks, Lines: 11, Source: kafka_source_mongo_sink_pymongo_filtered.py


Example 17: MLLibStreamingTestCase

class MLLibStreamingTestCase(unittest.TestCase):
    def setUp(self):
        self.sc = sc
        self.ssc = StreamingContext(self.sc, 1.0)

    def tearDown(self):
        self.ssc.stop(False)

    @staticmethod
    def _ssc_wait(start_time, end_time, sleep_time):
        while time() - start_time < end_time:
            sleep(0.01)
Author: HodaAlemi, Project: spark, Lines: 12, Source: tests.py


Example 18: createContext

def createContext(conf):
    spConf = conf.getSparkConf()
    sc = SparkContext(conf=spConf)
    ssc = StreamingContext(sc, conf.INTERVAL)
    ssc.remember(conf.REMEMBER)
    # get reader
    lines = conf.getReader(ssc)
    # use window
    lines = lines.window(conf.WINDOW, conf.WINDOW)
    lines = lines.map(lambda line: jsonDecode(line))
    deal(lines, conf)
    return ssc
Author: sekaiamber, Project: KSE-Sample, Lines: 12, Source: submit.py


Example 19: createContext

        def createContext():
            uBATCH_INTERVAL = 10
            sc = SparkContext(SPARK_MASTER, appName="StreamingKafka")
            sc.broadcast(batchUserPostDict)
            sc.broadcast(batchPostUserDict)
            #sc = SparkContext("local[*]", appName="StreamingKafka")
            # streaming batch interval of 5 sec first, and reduce later to 1 sec or lower
            ssc = StreamingContext(sc, uBATCH_INTERVAL)
            ssc.checkpoint(CHECKPOINT_DIR)   # set checkpoint directory in HDFS
            #ssc.checkpoint(10 * uBATCH_INTERVAL)
            return ssc

        ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, createContext)
Author: lingding0, Project: HottestTopicOnReddit, Lines: 13, Source: hotred_stream.py
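The excerpt ends once the context has been built or recovered. A minimal hedged continuation of the driver, assuming ssc is the value returned by getOrCreate above:

# Hypothetical continuation: start the recovered or freshly built context and block.
ssc.start()
ssc.awaitTermination()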


Example 20: main

def main():
    conf = SparkConf()
    conf.setAppName("TopAirports")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "0")
    conf.set("spark.dynamicAllocation.enabled", "true")
    sc = SparkContext(conf = conf)
    ssc = StreamingContext(sc, 1) # Stream every 1 second
    ssc.checkpoint("checkpoint")

    # Clear the cassandra table
    init_cassandra().execute('TRUNCATE {}'.format(top_airports_table))

    stream_kafka(ssc)
Author: karthikBG, Project: AviationAnalytics, Lines: 13, Source: 2.2.TopDestinationsByAirport.py



Note: the pyspark.streaming.StreamingContext class examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as GitHub and MSDocs. The code snippets are selected from open-source projects contributed by their authors, and copyright remains with the original authors. Please consult each project's license before redistributing or using the code; do not reproduce without permission.

