• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python context.StreamingContext类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyspark.streaming.context.StreamingContext类的典型用法代码示例。如果您想了解StreamingContext类的具体用法、调用方式或实际使用示例，这里精选的代码示例或许能为您提供帮助。



在下文中一共展示了StreamingContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _writeAndVerify

    def _writeAndVerify(self, ports):
        """Poll Flume agents on ``ports`` and check every sent event arrives.

        A fresh StreamingContext is built on ``self.sc``. It is always stopped on
        exit with ``stop(False)``, i.e. without stopping the SparkContext, even
        when an assertion fails.
        """
        ssc = StreamingContext(self.sc, self.duration)
        try:
            stream = FlumeUtils.createPollingStream(
                ssc,
                [("localhost", p) for p in ports],
                maxBatchSize=self._utils.eventsPerBatch(),
                parallelism=5)
            received = []

            def collect_batch(_, rdd):
                # Pull every event of the batch back to the driver for inspection.
                received.extend(rdd.collect())

            stream.foreachRDD(collect_batch)
            ssc.start()
            self._utils.sendDatAndEnsureAllDataHasBeenReceived()

            self.wait_for(received, self._utils.getTotalEvents())
            # Each event is indexable as (headers, body).
            headers = [ev[0] for ev in received]
            bodies = [ev[1] for ev in received]
            self._utils.assertOutput(headers, bodies)
        finally:
            ssc.stop(False)
开发者ID:anitatailor,项目名称:spark,代码行数:26,代码来源:tests.py


示例2: setup

 def setup():
     """Build a stateful line-count StreamingContext over ``inputd``.

     ``inputd``, ``outputd`` and ``updater`` are free names that an enclosing
     scope must provide. Counts are written as "key,count" text files under
     ``outputd``, and the state stream is checkpointed every 0.5s.
     """
     spark_conf = SparkConf().set("spark.default.parallelism", 1)
     ssc = StreamingContext(SparkContext(conf=spark_conf), 0.5)
     pairs = ssc.textFileStream(inputd).map(lambda line: (line, 1))
     counts = pairs.updateStateByKey(updater)
     formatted = counts.map(lambda kv: "%s,%d" % kv)
     formatted.saveAsTextFiles(outputd + "test")
     counts.checkpoint(0.5)
     return ssc
开发者ID:LakeCarrot,项目名称:EC2_Initializing,代码行数:9,代码来源:tests.py


示例3: test_get_or_create

    def test_get_or_create(self):
        """StreamingContext.getOrCreate builds a context, then recovers it from its checkpoint.

        A stateful line count over files dropped into ``inputd`` is started via
        ``getOrCreate``, fed two files and stopped. A second ``getOrCreate`` on
        the same checkpoint directory must resume the running counts, so the
        third file yields count 3 for every key.
        """
        inputd = tempfile.mkdtemp()
        outputd = tempfile.mkdtemp() + "/"

        # State update for updateStateByKey: add this batch's counts (vs) to the
        # previous state (s), treating a missing previous state (None) as 0.
        def updater(vs, s):
            return sum(vs, s or 0)

        # Factory that getOrCreate calls when the checkpoint directory has no data.
        # It counts the lines of new files in inputd, writes "key,count" text output
        # under outputd, and checkpoints the state stream every 0.5s.
        def setup():
            conf = SparkConf().set("spark.default.parallelism", 1)
            sc = SparkContext(conf=conf)
            ssc = StreamingContext(sc, 0.5)
            dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1))
            wc = dstream.updateStateByKey(updater)
            wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test")
            wc.checkpoint(0.5)
            return ssc

        cpd = tempfile.mkdtemp("test_streaming_cps")
        # First run: cpd is a fresh empty directory, so setup() builds the context.
        ssc = StreamingContext.getOrCreate(cpd, setup)
        ssc.start()

        # Write input file "n" (lines 0..9), then wait until the latest output batch
        # shows all 10 keys with count n. The enclosing `ssc` is read at call time,
        # so after the rebinding below this follows the recovered context.
        def check_output(n):
            # Block until the stream has produced at least one output directory.
            while not os.listdir(outputd):
                time.sleep(0.01)
            time.sleep(1)  # make sure mtime is larger than the previous one
            with open(os.path.join(inputd, str(n)), "w") as f:
                f.writelines(["%d\n" % i for i in range(10)])

            # Poll the lexicographically greatest output directory (presumably the
            # newest batch; TODO confirm naming) until it holds the expected counts.
            while True:
                p = os.path.join(outputd, max(os.listdir(outputd)))
                if "_SUCCESS" not in os.listdir(p):
                    # not finished
                    time.sleep(0.01)
                    continue
                ordd = ssc.sparkContext.textFile(p).map(lambda line: line.split(","))
                d = ordd.values().map(int).collect()
                if not d:
                    time.sleep(0.01)
                    continue
                self.assertEqual(10, len(d))
                s = set(d)
                self.assertEqual(1, len(s))
                m = s.pop()
                # A batch written before file n was processed still shows a lower count; keep polling.
                if n > m:
                    continue
                self.assertEqual(n, m)
                break

        check_output(1)
        check_output(2)
        # stop(stopSparkContext=True, stopGraceFully=True): tear everything down so the
        # next getOrCreate has to rebuild from the checkpoint.
        ssc.stop(True, True)

        time.sleep(1)
        # Second run: recovered from the checkpoint in cpd, so the counts continue at 3.
        ssc = StreamingContext.getOrCreate(cpd, setup)
        ssc.start()
        check_output(3)
        ssc.stop(True, True)
开发者ID:LakeCarrot,项目名称:EC2_Initializing,代码行数:57,代码来源:tests.py


示例4: setup

        def setup():
            """Build a context whose output action deliberately captures ``sc``.

            The foreachRDD callback refers to the SparkContext, which cannot be
            serialized, so serializing that callback is expected to fail.
            """
            sc = SparkContext(conf=SparkConf().set("spark.default.parallelism", 1))
            ssc = StreamingContext(sc, 0.5)

            # Referencing sc inside the callback is intentional: it makes it unserializable.
            def process(time, rdd):
                sc.parallelize(range(1, 10))

            ssc.textFileStream(inputd).foreachRDD(process)
            return ssc
开发者ID:ahnqirage,项目名称:spark,代码行数:11,代码来源:tests.py


示例5: test_transform_function_serializer_failure

    def test_transform_function_serializer_failure(self):
        """Starting a context whose callback captures SparkContext must fail.

        The failure's traceback must contain the "reference SparkContext" error
        text. If ``start()`` succeeds, the test fails.
        """
        inputd = tempfile.mkdtemp()
        self.cpd = tempfile.mkdtemp("test_transform_function_serializer_failure")

        def setup():
            conf = SparkConf().set("spark.default.parallelism", 1)
            sc = SparkContext(conf=conf)
            ssc = StreamingContext(sc, 0.5)

            # A function that cannot be serialized
            def process(time, rdd):
                sc.parallelize(range(1, 10))

            ssc.textFileStream(inputd).foreachRDD(process)
            return ssc

        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        try:
            self.ssc.start()
        except Exception:
            # Catch Exception rather than using a bare except, so KeyboardInterrupt and
            # SystemExit still propagate. The serializer message may be wrapped by another
            # error (presumably a JVM/Py4J one), so search the full formatted traceback,
            # including chained causes.
            import traceback
            failure = traceback.format_exc()
            self.assertIn(
                "It appears that you are attempting to reference SparkContext", failure)
        else:
            self.fail("using SparkContext in process should fail because it's not Serializable")
开发者ID:ahnqirage,项目名称:spark,代码行数:27,代码来源:tests.py


示例6: test_slice

    def test_slice(self):
        """DStream.slice returns the RDDs whose batch times fall in [begin, end] (inclusive)."""
        import datetime as dt
        self.ssc = StreamingContext(self.sc, 1.0)
        # Keep generated RDDs around long enough for slice() to still find them.
        self.ssc.remember(4.0)
        batches = [[1], [2], [3], [4]]
        stream = self.ssc.queueStream([self.sc.parallelize(b, 1) for b in batches])

        batch_times = []

        def record_time(t, rdd):
            # Note the batch time of each batch that has an RDD, up to len(batches) of them.
            if rdd and len(batch_times) < len(batches):
                batch_times.append(t)

        stream.foreachRDD(record_time)

        self.ssc.start()
        self.wait_for(batch_times, 4)
        first = batch_times[0]

        def sliced_values(begin_offset, end_offset):
            # Flatten the contents of every RDD in [first + begin, first + end].
            begin = first + dt.timedelta(seconds=begin_offset)
            end = first + dt.timedelta(seconds=end_offset)
            values = []
            for rdd in stream.slice(begin, end):
                values.extend(rdd.collect())
            return values

        expectations = [
            ((0, 0), {1}),
            ((1, 2), {2, 3}),
            ((1, 4), {2, 3, 4}),
            ((0, 4), {1, 2, 3, 4}),
        ]
        for (lo, hi), expected in expectations:
            self.assertEqual(expected, set(sliced_values(lo, hi)))
开发者ID:ahnqirage,项目名称:spark,代码行数:31,代码来源:tests.py


示例7: test_stop_multiple_times

 def test_stop_multiple_times(self):
     """Calling stop() twice on a started StreamingContext must not raise."""
     self.ssc = StreamingContext(
         master=self.master, appName=self.appName, duration=self.batachDuration)
     self._addInputStream(self.ssc)
     self.ssc.start()
     for _ in range(2):
         self.ssc.stop()
开发者ID:giworld,项目名称:spark,代码行数:7,代码来源:tests.py


示例8: test_stop_only_streaming_context

 def test_stop_only_streaming_context(self):
     """Stopping only the StreamingContext leaves its SparkContext usable."""
     self.sc = SparkContext(master=self.master, appName=self.appName)
     self.ssc = StreamingContext(sparkContext=self.sc, duration=self.batachDuration)
     self._addInputStream(self.ssc)
     self.ssc.start()
     self.ssc.stop(False)  # False: leave the SparkContext running
     partitions = self.sc.parallelize(range(5), 5).glom().collect()
     self.assertEqual(len(partitions), 5)
开发者ID:giworld,项目名称:spark,代码行数:7,代码来源:tests.py


示例9: test_from_conf_with_settings

 def test_from_conf_with_settings(self):
     """Settings on the SparkConf given to StreamingContext reach its SparkContext."""
     conf = SparkConf()
     conf.set("spark.cleaner.ttl", "10")
     conf.setMaster(self.master)
     conf.setAppName(self.appName)
     self.ssc = StreamingContext(conf=conf, duration=self.batachDuration)
     ttl = self.ssc.sparkContext._conf.get("spark.cleaner.ttl")
     self.assertEqual(int(ttl), 10)
开发者ID:giworld,项目名称:spark,代码行数:7,代码来源:tests.py


示例10: setUp

 def setUp(self):
     """Create a fresh SparkContext (named after the test class) and StreamingContext."""
     spark_conf = SparkConf().set("spark.default.parallelism", 1)
     self.sc = SparkContext(appName=self.__class__.__name__, conf=spark_conf)
     self.sc.setCheckpointDir("/tmp")
     # TODO: decrease duration to speed up tests
     self.ssc = StreamingContext(self.sc, self.duration)
开发者ID:31z4,项目名称:spark,代码行数:7,代码来源:tests.py


示例11: test_from_no_conf_constructor

 def test_from_no_conf_constructor(self):
     """Master and app name passed to StreamingContext end up in the SparkContext conf."""
     self.ssc = StreamingContext(
         master=self.master, appName=self.appName, duration=self.batachDuration)
     # ssc.sparkContext.master would also work; reading the conf keeps this close to Scala.
     spark_conf = self.ssc.sparkContext._conf
     for key, expected in (("spark.master", self.master),
                           ("spark.app.name", self.appName)):
         self.assertEqual(spark_conf.get(key), expected)
开发者ID:giworld,项目名称:spark,代码行数:7,代码来源:tests.py


示例12: createSSC

def createSSC():
    """Build a checkpointed StreamingContext that keeps per-word state.

    Text arrives on a socket at 127.0.0.1:9000 in 3-second batches. Each line
    is split on spaces into (word, 1) pairs. The per-key state is maintained
    by ``updateFunc`` (defined elsewhere) and printed every batch.
    Checkpoints go to ./checkPoints/checkPointSample/Python.
    """
    sc = SparkContext(master="local[*]", appName="CheckpointSample", conf=SparkConf())
    ssc = StreamingContext(sc, 3)

    words = ssc.socketTextStream("127.0.0.1", 9000).flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    pairs.updateStateByKey(updateFunc).pprint()

    # Stateful operations such as updateStateByKey need a checkpoint directory.
    ssc.checkpoint("./checkPoints/checkPointSample/Python")
    return ssc
开发者ID:oopchoi,项目名称:spark,代码行数:18,代码来源:checkpoint_sample.py


示例13: test_text_file_stream

 def test_text_file_stream(self):
     """textFileStream emits the lines of each new file written into the watched directory."""
     watched_dir = tempfile.mkdtemp()
     self.ssc = StreamingContext(self.sc, self.duration)
     numbers = self.ssc.textFileStream(watched_dir).map(int)
     result = self._collect(numbers, 2, block=False)
     self.ssc.start()
     file_lines = ["%d\n" % i for i in range(10)]
     for name in ('a', 'b'):
         # Space the files out in time, presumably so each gets a newer mtime.
         time.sleep(1)
         with open(os.path.join(watched_dir, name), "w") as handle:
             handle.writelines(file_lines)
     self.wait_for(result, 2)
     self.assertEqual([list(range(10))] * 2, result)
开发者ID:anitatailor,项目名称:spark,代码行数:12,代码来源:tests.py


示例14: test_binary_records_stream

 def test_binary_records_stream(self):
     """binaryRecordsStream splits each new file into fixed 10-byte records."""
     watched_dir = tempfile.mkdtemp()
     self.ssc = StreamingContext(self.sc, self.duration)
     records = self.ssc.binaryRecordsStream(watched_dir, 10)
     decoded = records.map(lambda record: struct.unpack("10b", bytes(record)))
     result = self._collect(decoded, 2, block=False)
     self.ssc.start()
     payload = bytearray(range(10))
     for name in ("a", "b"):
         time.sleep(1)
         with open(os.path.join(watched_dir, name), "wb") as handle:
             handle.write(payload)
     self.wait_for(result, 2)
     # Compare the first decoded record of each collected batch.
     first_records = [list(batch[0]) for batch in result]
     self.assertEqual([list(range(10))] * 2, first_records)
开发者ID:LakeCarrot,项目名称:EC2_Initializing,代码行数:12,代码来源:tests.py


示例15: test_get_active_or_create

    def test_get_active_or_create(self):
        """getActiveOrCreate() reuses an active context and otherwise calls setupFunc.

        The checkpoint path is always None here. self.setupCalled records whether
        setupFunc ran for each call.
        """
        # Test StreamingContext.getActiveOrCreate() without checkpoint data
        # See CheckpointTests for tests with checkpoint data
        self.ssc = None
        self.assertEqual(StreamingContext.getActive(), None)

        # Factory: builds a context with one output operation and flags that it was called.
        def setupFunc():
            ssc = StreamingContext(self.sc, self.duration)
            ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
            self.setupCalled = True
            return ssc

        # Verify that getActiveOrCreate() (w/o checkpoint) calls setupFunc when no context is active
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)

        # Verify that getActiveOrCreate() returns active context and does not call the setupFunc
        self.ssc.start()
        self.setupCalled = False
        self.assertEqual(StreamingContext.getActiveOrCreate(None, setupFunc), self.ssc)
        self.assertFalse(self.setupCalled)

        # Verify that getActiveOrCreate() calls setupFunc after active context is stopped
        self.ssc.stop(False)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)

        # Verify that if only the Java context is stopped, getActiveOrCreate() no longer
        # treats the Python context as active and calls setupFunc again
        self.ssc = StreamingContext(self.sc, self.duration)
        self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
        self.ssc.start()
        self.assertEqual(StreamingContext.getActive(), self.ssc)
        self.ssc._jssc.stop(False)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)
开发者ID:anitatailor,项目名称:spark,代码行数:38,代码来源:tests.py


示例16: test_get_active

    def test_get_active(self):
        """getActive() follows start/stop of both the Python and the Java context."""

        def start_with_dummy_output():
            # Register an output operation so the context has work to run, then start it.
            self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
            self.ssc.start()

        self.assertEqual(StreamingContext.getActive(), None)

        # While started, getActive() hands back this very context.
        start_with_dummy_output()
        self.assertEqual(StreamingContext.getActive(), self.ssc)

        # After a Python-side stop there is no active context.
        self.ssc.stop(False)
        self.assertEqual(StreamingContext.getActive(), None)

        # Stopping only the Java context must also make getActive() return None.
        self.ssc = StreamingContext(self.sc, self.duration)
        start_with_dummy_output()
        self.assertEqual(StreamingContext.getActive(), self.ssc)
        self.ssc._jssc.stop(False)
        self.assertEqual(StreamingContext.getActive(), None)
开发者ID:anitatailor,项目名称:spark,代码行数:19,代码来源:tests.py


示例17: PySparkStreamingTestCase

class PySparkStreamingTestCase(unittest.TestCase):

    timeout = 10  # seconds
    duration = .5

    @classmethod
    def setUpClass(cls):
        """Start one SparkContext, shared by every test in the class."""
        spark_conf = SparkConf().set("spark.default.parallelism", 1)
        cls.sc = SparkContext(appName=cls.__name__, conf=spark_conf)
        cls.sc.setCheckpointDir("/tmp")

    @classmethod
    def tearDownClass(cls):
        """Stop the shared SparkContext, then any JVM SparkContext left behind."""
        cls.sc.stop()
        # Safety net in case the Python API failed to stop the JVM-side context.
        # NOTE(review): confirm the JVM SparkContext object exposes get(); if it does
        # not, this call raises and tearDownClass errors.
        leftover = SparkContext._jvm.SparkContext.get()
        if leftover.nonEmpty():
            leftover.get().stop()

    def setUp(self):
        """Give each test its own StreamingContext on the shared SparkContext."""
        ssc = StreamingContext(self.sc, self.duration)
        self.ssc = ssc

    def tearDown(self):
        """Stop the per-test StreamingContext while keeping the shared SparkContext.

        Also stops any StreamingContext still active on the JVM side, in case the
        Python API failed to stop it.
        """
        if self.ssc is not None:
            self.ssc.stop(False)
        # Clean up in the JVM just in case there has been some issues in Python API.
        # Ask the JVM *StreamingContext* companion (as pyspark's own getActive does).
        # The SparkContext companion would instead return the class-wide SparkContext,
        # which must stay alive until tearDownClass. The stop(False) call below is
        # StreamingContext.stop(stopSparkContext=False).
        jStreamingContextOption = StreamingContext._jvm.StreamingContext.getActive()
        if jStreamingContextOption.nonEmpty():
            jStreamingContextOption.get().stop(False)

    def wait_for(self, result, n):
        start_time = time.time()
        while len(result) < n and time.time() - start_time < self.timeout:
            time.sleep(0.01)
        if len(result) < n:
            print("timeout after", self.timeout)

    def _take(self, dstream, n):
        """Start ``self.ssc`` and gather up to the first ``n`` elements of ``dstream``.

        The context is started here but not stopped (tearDown stops it).
        Returns whatever arrived before ``wait_for`` gave up, which may be fewer
        than ``n`` elements.
        """
        taken = []

        def take_some(_, rdd):
            missing = n - len(taken)
            if rdd and missing > 0:
                taken.extend(rdd.take(missing))

        dstream.foreachRDD(take_some)
        self.ssc.start()
        self.wait_for(taken, n)
        return taken

    def _collect(self, dstream, n, block=True):
        """Register an output on ``dstream`` that gathers up to ``n`` non-empty batches.

        Each gathered batch is the list returned by ``rdd.collect()``. With
        ``block=True`` this starts ``self.ssc`` and waits for ``n`` batches.
        Otherwise the (still filling) list is returned at once, and the caller
        must start the context.

        :return: list, which will have the collected items.
        """
        batches = []

        def gather(_, rdd):
            if not rdd or len(batches) >= n:
                return
            items = rdd.collect()
            if items:
                batches.append(items)

        dstream.foreachRDD(gather)
        if block:
            self.ssc.start()
            self.wait_for(batches, n)
        return batches

    def _test_func(self, input, func, expected, sort=False, input2=None):
        """
        @param input: dataset for the test. This should be list of lists.
        @param func: wrapped function. This function should return PythonDStream object.
        @param expected: expected output for this testcase.
        """
        if not isinstance(input[0], RDD):
            input = [self.sc.parallelize(d, 1) for d in input]
        input_stream = self.ssc.queueStream(input)
        if input2 and not isinstance(input2[0], RDD):
            input2 = [self.sc.parallelize(d, 1) for d in input2]
        input_stream2 = self.ssc.queueStream(input2) if input2 is not None else None

        # Apply test function to stream.
        if input2:
            stream = func(input_stream, input_stream2)
        else:
            stream = func(input_stream)

        result = self._collect(stream, len(expected))
        if sort:
            self._sort_result_based_on_key(result)
            self._sort_result_based_on_key(expected)
#.........这里部分代码省略.........
开发者ID:anitatailor,项目名称:spark,代码行数:101,代码来源:tests.py


示例18: StreamingContextTests

class StreamingContextTests(PySparkStreamingTestCase):

    duration = 0.1
    setupCalled = False

    def _add_input_stream(self):
        """Attach a 101-batch queue stream with a non-blocking collect output to ``self.ssc``."""
        batches = [range(1, size) for size in range(101)]
        self._collect(self.ssc.queueStream(batches), 1, block=False)

    def test_stop_only_streaming_context(self):
        """stop(False) halts streaming but leaves the shared SparkContext usable."""
        self._add_input_stream()
        self.ssc.start()
        self.ssc.stop(False)
        glommed = self.sc.parallelize(range(5), 5).glom().collect()
        self.assertEqual(len(glommed), 5)

    def test_stop_multiple_times(self):
        """Calling stop(False) again on a stopped context must not raise."""
        self._add_input_stream()
        self.ssc.start()
        for _attempt in range(2):
            self.ssc.stop(False)

    def test_queue_stream(self):
        """queueStream emits each queued list as one batch, in order."""
        batches = [list(range(size)) for size in range(1, 4)]
        result = self._collect(self.ssc.queueStream(batches), 3)
        self.assertEqual(batches, result)

    def test_text_file_stream(self):
        """textFileStream emits the lines of each new file written into the watched directory."""
        watched_dir = tempfile.mkdtemp()
        self.ssc = StreamingContext(self.sc, self.duration)
        numbers = self.ssc.textFileStream(watched_dir).map(int)
        result = self._collect(numbers, 2, block=False)
        self.ssc.start()
        file_lines = ["%d\n" % i for i in range(10)]
        for name in ('a', 'b'):
            # Space the files out in time, presumably so each gets a newer mtime.
            time.sleep(1)
            with open(os.path.join(watched_dir, name), "w") as handle:
                handle.writelines(file_lines)
        self.wait_for(result, 2)
        self.assertEqual([list(range(10))] * 2, result)

    def test_binary_records_stream(self):
        """binaryRecordsStream splits each new file into fixed 10-byte records."""
        watched_dir = tempfile.mkdtemp()
        self.ssc = StreamingContext(self.sc, self.duration)
        decoded = self.ssc.binaryRecordsStream(watched_dir, 10).map(
            lambda record: struct.unpack("10b", bytes(record)))
        result = self._collect(decoded, 2, block=False)
        self.ssc.start()
        payload = bytearray(range(10))
        for name in ('a', 'b'):
            time.sleep(1)
            with open(os.path.join(watched_dir, name), "wb") as handle:
                handle.write(payload)
        self.wait_for(result, 2)
        # Compare the first decoded record of each collected batch.
        first_records = [list(batch[0]) for batch in result]
        self.assertEqual([list(range(10))] * 2, first_records)

    def test_union(self):
        """ssc.union merges the batches of two streams position by position."""
        batches = [list(range(size)) for size in range(1, 4)]
        left = self.ssc.queueStream(batches)
        right = self.ssc.queueStream(batches)
        result = self._collect(self.ssc.union(left, right), 3)
        self.assertEqual([batch + batch for batch in batches], result)

    def test_transform(self):
        """ssc.transform passes the RDDs of several streams to one function, in order."""
        streams = [self.ssc.queueStream([[value]]) for value in (1, 2, 3)]

        def reorder(rdds):
            first, second, third = rdds
            return second.union(third).union(first)

        combined = self.ssc.transform(streams, reorder)
        self.assertEqual([2, 3, 1], self._take(combined, 3))

    def test_get_active(self):
        """getActive() follows start/stop of both the Python and the Java context."""

        def start_with_dummy_output():
            # Register an output operation so the context has work to run, then start it.
            self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
            self.ssc.start()

        self.assertEqual(StreamingContext.getActive(), None)

        # While started, getActive() hands back this very context.
        start_with_dummy_output()
        self.assertEqual(StreamingContext.getActive(), self.ssc)

        # After a Python-side stop there is no active context.
        self.ssc.stop(False)
        self.assertEqual(StreamingContext.getActive(), None)

        # Stopping only the Java context must also make getActive() return None.
        self.ssc = StreamingContext(self.sc, self.duration)
        start_with_dummy_output()
        self.assertEqual(StreamingContext.getActive(), self.ssc)
        self.ssc._jssc.stop(False)
        self.assertEqual(StreamingContext.getActive(), None)

    def test_get_active_or_create(self):
        # Test StreamingContext.getActiveOrCreate() without checkpoint data
        # See CheckpointTests for tests with checkpoint data
#.........这里部分代码省略.........
开发者ID:anitatailor,项目名称:spark,代码行数:101,代码来源:tests.py


示例19: SparkConf

# Section 6.2.3: feeding a DStream from a queue of RDDs.

from pyspark import SparkContext, SparkConf
from pyspark.streaming.context import StreamingContext

sc = SparkContext(master="local[*]", appName="QueueSample", conf=SparkConf())
# 3-second batch interval.
ssc = StreamingContext(sc, 3)

# The RDDs are consumed from this list by queueStream (one per batch with the
# default oneAtATime=True).
queue = [sc.parallelize(letters) for letters in (["a", "b", "c"], ["c", "d", "e"])]

ssc.queueStream(queue).pprint()

ssc.start()
ssc.awaitTermination()
开发者ID:oopchoi,项目名称:spark,代码行数:20,代码来源:queue_sample.py


示例20: setUp

 def setUp(self):
     """Create a new StreamingContext on ``self.sc`` before every test."""
     ssc = StreamingContext(self.sc, self.duration)
     self.ssc = ssc
开发者ID:anitatailor,项目名称:spark,代码行数:2,代码来源:tests.py



注:本文中的pyspark.streaming.context.StreamingContext类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python kafka.KafkaUtils类代码示例发布时间:2022-05-27
下一篇:
Python streaming.StreamingContext类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap