• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python pyspark.SparkConf类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyspark.SparkConf的典型用法代码示例。如果您正苦于以下问题:Python SparkConf类的具体用法?Python SparkConf怎么用?Python SparkConf使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SparkConf类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: getSparkContext

 def getSparkContext(self, appName, master):
     """Build a SparkContext pinned to the loopback address.

     Prints the application name and the master URL, then creates a
     SparkContext whose ``spark.local.ip`` and ``spark.driver.host``
     settings are both ``127.0.0.1``.

     :param appName: value handed to ``SparkConf.setAppName``
     :param master: value handed to ``SparkConf.setMaster``
     :return: the newly created SparkContext
     """
     print(appName)
     print(master)
     spark_conf = SparkConf()
     spark_conf.setAppName(appName)
     spark_conf.setMaster(master)
     # Both network-related settings point at the same loopback address.
     for loopback_key in ("spark.local.ip", "spark.driver.host"):
         spark_conf.set(loopback_key, "127.0.0.1")
     return SparkContext(conf=spark_conf)
开发者ID:seoeun25,项目名称:spark-app,代码行数:7,代码来源:WordCount2.py


示例2: main

def main():
    """Preprocess the input log files and save them in an intermediate format.

    Reads ``data_files`` with ``use_unicode=False``, expands every line
    through ``ll_mapper`` and writes the result to ``out_dir`` using the
    compression codec named by ``codec``.

    The gap-detection stage after the early ``return`` is currently
    disabled; it is kept so it can be re-enabled by deleting that ``return``.
    """
    conf = SparkConf()
    conf.set("spark.default.parallelism", "24")
    sc = SparkContext(appName="PhoneLab Preprocessing", conf=conf)

    lines = sc.textFile(data_files, use_unicode=False)

    # Create LogLine objects and filter out empty lines
    logs = lines.flatMap(ll_mapper)

    # Save in an intermediate format
    logs.saveAsTextFile(out_dir, compressionCodecClass=codec)
    # NOTE(review): early exit -- everything below is unreachable until this
    # ``return`` is removed. Confirm whether gap detection is still wanted.
    return

    # Gap detection
    keyed = logs.map(ll_gap_map)
    merged = keyed.groupByKey()

    # At this point we have ((boot_id, date), [line_num]) tuples. The last
    # step is to find all the gaps within each key/tuple.
    result = merged.flatMap(find_gaps)
    gaps = result.collect()

    # Context manager so the handle is flushed and closed even if the
    # write fails (the previous bare open() was never closed).
    with open("/spark/gaps.json", 'w') as fd:
        fd.write(json.dumps(gaps, indent=4))
开发者ID:gurupras,项目名称:phonelab-postprocessing,代码行数:25,代码来源:preprocess.py


示例3: configureSpark

def configureSpark(app_name, master):
    """Create a SparkContext for the given application name and master.

    :param app_name: value handed to ``SparkConf.setAppName``
    :param master: value handed to ``SparkConf.setMaster``
    :return: the newly created SparkContext
    """
    spark_conf = SparkConf()
    spark_conf.setAppName(app_name)
    spark_conf.setMaster(master)
    return SparkContext(conf=spark_conf)
开发者ID:ashishsjsu,项目名称:Spark101,代码行数:7,代码来源:explore.py


示例4: main

def main():
    """Vectorize the saved data file and run one parallel GA pass over it.

    Creates a local (``local[*]``) SparkContext using the Kryo serializer,
    loads ``/home/fatemeh/Data/saveData.txt``, converts every line with
    ``toVector`` and passes the resulting RDD to ``GA.parallel_GA_main``.
    The collected results are printed. The SparkContext is stopped even
    when one of the steps raises.
    """
    spark_conf = SparkConf().setAppName("Different-Sampling data").setMaster('local[*]')
    spark_conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sc = SparkContext(conf=spark_conf)
    try:
        GA.logInConsole(0, "input file read!")
        rdd = sc.textFile("/home/fatemeh/Data/saveData.txt", minPartitions=500, use_unicode=False)
        # NOTE(review): the RDD is never persisted in this function, so this
        # call looks redundant -- kept to preserve behavior; confirm and drop.
        rdd.unpersist()
        vectorRDD = rdd.map(lambda line: toVector(line, splitter=' '))

        GA.logInConsole(0, "Data Vectorized!")
        ss = []
        GA.logInConsole(-1, 'Start the ensemble')
        # NOTE(review): the message says "range 3" but the call below passes 5.
        GA.logInConsole(-10, "GA with range 3")
        ss.append(GA.parallel_GA_main(vectorRDD, sc, 5))
        print(ss)
        GA.logInConsole(100, "\nend of program")
    finally:
        # Always release the context, including on failure.
        sc.stop()
开发者ID:fchgithub,项目名称:OriginPySparkRepository,代码行数:27,代码来源:ODHD.py


示例5: main

def main(args):
    """Print the total number of response codes and the HTTP 200 success rate.

    :param args: sequence whose first item is the Spark master URL and whose
        second item is the data file name, resolved relative to ``CURR_DIR``.
        The process exits with status 1 when fewer than two items are given.
    """
    if len(args) < 2:
        sys.exit(1)

    # Setting the cluster configuration parameters
    spark_master = args[0]
    spark_data_file_name = args[1]
    file_path = CURR_DIR + "/" + spark_data_file_name

    conf = SparkConf()
    conf.setMaster(spark_master)
    conf.setAppName("Log Scanner")

    # Creating a Spark Context with conf file
    sc = SparkContext(conf=conf)

    txt_logs = sc.textFile(file_path).filter(lambda line: check(line))
    access_logs = txt_logs.map(lambda line: AccessLog(line))

    # Getting response_codes from log objects and caching it, because the
    # RDD is used twice below (count and per-code aggregation).
    response_codes = access_logs.map(lambda log: log.get_status()).cache()
    log_count = response_codes.count()
    print("Total Resonse Codes: " + str(log_count))
    cnt = response_codes.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # Tuple-unpacking lambda parameters (``lambda (x, y): y``) are a syntax
    # error on Python 3, so the pair is indexed instead.
    response200 = cnt.filter(lambda kv: kv[0] == "200").map(lambda kv: kv[1]).collect()

    # No "200" entries yields an empty list; an empty log yields a zero
    # denominator. Report 0 % in both cases instead of crashing.
    success_count = int(response200[0]) if response200 else 0
    # Floor division keeps the integer percentage on Python 2 and Python 3.
    success_rate = success_count * 100 // log_count if log_count else 0

    print("###########################")
    print("##  Success Rate : " + str(success_rate) + " %  ##")
    print("###########################")
开发者ID:alt-code,项目名称:AutoSpark,代码行数:29,代码来源:log_scanner.py


示例6: get_default_spark_conf

def get_default_spark_conf():
    """Return the SparkConf used as the default for the pyunit tests.

    The configuration uses the app name ``pyunit-test`` and the master
    ``local-cluster[3,1,2048]``, followed by a fixed list of Spark and H2O
    settings. When ``ExternalClusterTestHelper.tests_in_external_mode()``
    is true, the H2O client IP and the number of H2O nodes are set as well.

    :return: the populated SparkConf
    """
    conf = SparkConf()
    conf.setAppName("pyunit-test")
    conf.setMaster("local-cluster[3,1,2048]")

    # Applied in order; the values are evaluated after the conf exists.
    settings = [
        ("spark.ext.h2o.disable.ga", "true"),
        ("spark.driver.memory", "2g"),
        ("spark.executor.memory", "2g"),
        ("spark.ext.h2o.client.log.level", "DEBUG"),
        ("spark.ext.h2o.repl.enabled", "false"),
        ("spark.task.maxFailures", "1"),
        ("spark.rpc.numRetries", "1"),
        ("spark.deploy.maxExecutorRetries", "1"),
        ("spark.network.timeout", "360s"),
        ("spark.worker.timeout", "360"),
        ("spark.ext.h2o.backend.cluster.mode", ExternalClusterTestHelper.cluster_mode()),
        ("spark.ext.h2o.cloud.name", ExternalClusterTestHelper.unique_cloud_name("test")),
        ("spark.ext.h2o.external.start.mode", os.getenv("spark.ext.h2o.external.start.mode", "manual")),
        ("spark.sql.warehouse.dir", "file:" + os.path.join(os.getcwd(), "spark-warehouse")),
    ]
    for setting_key, setting_value in settings:
        conf.set(setting_key, setting_value)

    if ExternalClusterTestHelper.tests_in_external_mode():
        conf.set("spark.ext.h2o.client.ip", ExternalClusterTestHelper.local_ip())
        conf.set("spark.ext.h2o.external.cluster.num.h2o.nodes", "2")

    return conf
开发者ID:seuwangcy,项目名称:sparkling-water,代码行数:25,代码来源:test_utils.py


示例7: start

def start():
    """Run a streaming word count over a Kafka direct stream.

    Builds a SparkContext named ``KafkaDirectWordCount`` with
    ``spark.cores.max`` set to 2, wraps it in a StreamingContext created
    with the argument 2, and subscribes to ``topic7`` on the broker
    ``192.192.0.27:9092``. Word counts are saved as text files and also
    pretty-printed. The call blocks until the streaming job terminates.
    """
    spark_conf = SparkConf()
    spark_conf.set('spark.cores.max', 2)
    spark_context = SparkContext(appName='KafkaDirectWordCount', conf=spark_conf)
    streaming_context = StreamingContext(spark_context, 2)

    kafka_params = {"metadata.broker.list": "192.192.0.27:9092"}
    kafka_stream = KafkaUtils.createDirectStream(streaming_context, ['topic7'], kafkaParams=kafka_params)

    # Each stream element is a tuple; its second item is the received message.
    messages = kafka_stream.map(lambda record: record[1])

    tokens = messages.flatMap(lambda text: text.split(" "))
    token_pairs = tokens.map(lambda token: (token, 1))
    word_totals = token_pairs.reduceByKey(lambda left, right: left + right)

    word_totals.saveAsTextFiles("/var/lib/hadoop-hdfs/spark-libin/kafka")
    word_totals.pprint()

    streaming_context.start()  # Start the computation
    streaming_context.awaitTermination()  # Wait for the computation to terminate
开发者ID:blair1,项目名称:hadoop-spark,代码行数:25,代码来源:kafka_streaming_direct.py


示例8: __call__

    def __call__(self):
        """Convert the wiki dump at ``self.wk_dump_path`` into gzipped JSON lines.

        An existing directory at ``self.output_path`` is deleted first.
        Pages are read through ``wikispark``, split into articles and
        redirects, optionally rewritten with
        ``wikispark.redirect_article_links`` when ``self.redirect_links``
        is truthy, and finally serialized with ``self.article_to_json`` and
        ``json.dumps`` into ``self.output_path``.
        """
        log.info("Processing wiki dump: %s ...", self.wk_dump_path)
        c = SparkConf().setAppName("Wikijson")

        log.info("Using spark master: %s", c.get("spark.master"))
        sc = SparkContext(conf=c)

        if os.path.isdir(self.output_path):
            # ``warn`` is a deprecated alias of ``warning`` in the logging
            # module. NOTE(review): assumes ``log`` is a stdlib logger.
            log.warning("Writing over output path: %s", self.output_path)
            shutil.rmtree(self.output_path)

        # rdd of tuples: (title, namespace, id, redirect, content)
        pages = wikispark.get_pages_from_wikidump(sc, self.wk_dump_path)
        # Cached because both the article and the redirect extraction read it.
        pages.cache()

        articles = wikispark.get_articles_from_pages(pages)
        redirects = wikispark.get_redirects_from_pages(pages)

        if self.redirect_links:
            articles = wikispark.redirect_article_links(articles, redirects)

        articles.map(self.article_to_json).map(json.dumps).saveAsTextFile(
            self.output_path, "org.apache.hadoop.io.compress.GzipCodec"
        )

        log.info("Done.")
开发者ID:pombredanne,项目名称:wikijson,代码行数:26,代码来源:process.py


示例9: main

def main():
    """
    Main entry point of the application.

    Creates a SparkContext that ships ``preprocessing.py`` (located next to
    this file) to the workers, copies the S3 credentials from the
    ``AWS_ACCESS_KEY`` and ``AWS_SECRET_KEY`` environment variables into the
    Hadoop configuration and runs ``fast_predict``. The SparkContext is
    stopped even when a step fails.

    Raises KeyError when either environment variable is missing.
    """

    # Create spark configuration and spark context
    include_path = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'preprocessing.py'))
    conf = SparkConf()
    conf.set('spark.executor.memory', '1500m')
    conf.setAppName("Generating predictions")
    sc = SparkContext(conf=conf, pyFiles=[include_path])

    try:
        # Set S3 configuration
        hadoop_conf = sc._jsc.hadoopConfiguration()
        hadoop_conf.set("fs.s3n.awsAccessKeyId", os.environ['AWS_ACCESS_KEY'])
        hadoop_conf.set("fs.s3n.awsSecretAccessKey", os.environ['AWS_SECRET_KEY'])

        # Single-pass predictions
        fast_predict(sc, file_input="s3n://twitter-stream-data/twitter-*",
                     file_output="s3n://twitter-stream-predictions/final",
                     sports_model="PyTwitterNews/models/sports.model",
                     politics_model="PyTwitterNews/models/politics.model",
                     technology_model="PyTwitterNews/models/technology.model")
    finally:
        # Stop application, also after a failure, so the context is released.
        sc.stop()
开发者ID:alialavia,项目名称:TwitterNews,代码行数:25,代码来源:predict.py


示例10: main

def main():
    """Parse the command line and start the streaming log processor.

    Required options are ``--mongo`` and ``--rest``; ``--port``,
    ``--appname``, ``--master`` and ``--socket`` are optional. Text read
    from ``--socket`` / ``--port`` is handed, one RDD at a time, to
    ``process_generic`` together with the mongo and rest URLs. The call
    blocks until the streaming job terminates.
    """
    parser = argparse.ArgumentParser(
        description='process some log messages, storing them and signaling a rest server')

    # Declared in a table; the order determines the order in --help output.
    option_specs = [
        ('--mongo', dict(help='the mongodb url', required=True)),
        ('--rest', dict(help='the rest endpoint to signal', required=True)),
        ('--port', dict(help='the port to receive from (default: 1984)',
                        default=1984, type=int)),
        ('--appname', dict(help='the name of the spark application '
                                '(default: SparkharaLogCounter)',
                           default='SparkharaLogCounter')),
        ('--master', dict(help='the master url for the spark cluster')),
        ('--socket', dict(help='the socket to attach for streaming text data '
                               '(default: caravan-pathfinder)',
                          default='caravan-pathfinder')),
    ]
    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    args = parser.parse_args()
    mongo_url, rest_url = args.mongo, args.rest

    spark_conf = SparkConf()
    spark_conf.setAppName(args.appname)
    if args.master:
        spark_conf.setMaster(args.master)
    streaming_context = StreamingContext(SparkContext(conf=spark_conf), 1)

    def handle_batch(rdd):
        return process_generic(rdd, mongo_url, rest_url)

    text_stream = streaming_context.socketTextStream(args.socket, args.port)
    text_stream.foreachRDD(handle_batch)

    streaming_context.start()
    streaming_context.awaitTermination()
开发者ID:mattf,项目名称:sparkhara-sources,代码行数:35,代码来源:caravan_master.py


示例11: setUpClass

    def setUpClass(cls):
        """Create the shared SparkContext and logger for the test class.

        The Spark application is named after the test class. Every option
        in the ``spark_conf_test_generic`` section of ``test_config.cfg`` is
        copied into the SparkConf and also stored on
        ``cls.spark_test_configs``. ``cls.sc.pythonExec`` is taken from the
        ``PYSPARK_DRIVER_PYTHON`` option when present, else ``python2.7``.
        """

        class_name = cls.__name__
        conf = SparkConf()
        # Use the real class name; the quoted literal 'class_name' was set
        # before, so every test class got the same application name.
        conf.set('spark.app.name', class_name)

        # Read the spark configuration and update the spark conf
        test_spark_config = ConfigParser.ConfigParser()
        test_spark_config.read('test_config.cfg')
        configs = dict(test_spark_config.items('spark_conf_test_generic'))
        for k, v in configs.items():
            conf.set(k, v)
        cls.spark_test_configs = configs
        # Create the spark context
        cls.sc = SparkContext(conf=conf)
        # ConfigParser lower-cases option names by default, so the upper-case
        # key alone can never match; accept both spellings.
        cls.sc.pythonExec = configs.get(
            'PYSPARK_DRIVER_PYTHON',
            configs.get('pyspark_driver_python', 'python2.7'))

        # Silence the JVM-side "org" and "akka" loggers below ERROR level.
        logger = cls.sc._jvm.org.apache.log4j
        logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
        logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

        logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s: %(message)s')
        cls.logger = logging.getLogger(__name__)
        cls.logger.setLevel(logging.DEBUG)
开发者ID:bossjones,项目名称:sparkonda,代码行数:28,代码来源:test_sparkonda.py


示例12: stackexchange_xml_spark_job

def stackexchange_xml_spark_job():
    """Split the StackExchange XML file into question and answer JSON folders.

    All HDFS addresses are built from settings on ``bluebook_conf``. Each
    line of the XML file goes through ``stackexchange_xml_mapper``; records
    that carry a ``posttypeid`` key are kept, routed by its value and
    converted with ``jsoner`` before being saved as text.
    """
    hdfs_root = "hdfs://" + bluebook_conf.HDFS_FQDN + "/"
    spark_conf = SparkConf()

    xml_file_address = (hdfs_root
                        + bluebook_conf.STACKEXCHANGE_XML_FOLDER_NAME
                        + bluebook_conf.STACKEXCHANGE_XML_FILE_NAME)
    json_ques_folder_address = hdfs_root + bluebook_conf.STACKEXCHANGE_JSON_QUES_FOLDER_NAME
    json_ans_folder_address = hdfs_root + bluebook_conf.STACKEXCHANGE_JSON_ANS_FOLDER_NAME

    spark_conf.setAppName('stackexchange_xml_spark_job')
    spark_context = SparkContext(conf=spark_conf)

    xml_lines = spark_context.textFile(xml_file_address)

    def posts_of_type(post_type_id):
        # Same pipeline for both post types; only the id compared differs.
        return (xml_lines.map(stackexchange_xml_mapper)
                .filter(lambda post: 'posttypeid' in post.keys())
                .filter(lambda post: post['posttypeid'] == post_type_id)
                .map(lambda post: jsoner(post)))

    # Questions and answers are stored separately depending on 'posttypeid':
    # questions have posttypeid == '1', answers have posttypeid == '2'.
    questions = posts_of_type('1')
    answers = posts_of_type('2')
    questions.saveAsTextFile(json_ques_folder_address)
    answers.saveAsTextFile(json_ans_folder_address)
开发者ID:nave91,项目名称:rebot,代码行数:31,代码来源:parser_app.py


示例13: start

def start():
    """Run a streaming word count over the local ``test`` Kafka topic.

    Builds a SparkContext named ``KafkaDirectWordCount`` with
    ``spark.cores.max`` set to 2, wraps it in a StreamingContext created
    with the argument 2, and subscribes to ``test`` on ``localhost:9092``.
    The raw stream is also passed through ``storeOffsetRanges`` and each
    resulting RDD is handed to ``printOffsetRanges``. The call blocks until
    the streaming job terminates.
    """
    spark_conf = SparkConf()
    spark_conf.set('spark.cores.max', 2)
    spark_context = SparkContext(appName='KafkaDirectWordCount', conf=spark_conf)
    streaming_context = StreamingContext(spark_context, 2)

    kafka_params = {"metadata.broker.list": "localhost:9092"}
    kafka_stream = KafkaUtils.createDirectStream(streaming_context, ['test'], kafkaParams=kafka_params)

    # Each stream element is a tuple; its second item is the received message.
    messages = kafka_stream.map(lambda record: record[1])

    tokens = messages.flatMap(lambda text: text.split(" "))
    token_pairs = tokens.map(lambda token: (token, 1))
    word_totals = token_pairs.reduceByKey(lambda left, right: left + right)

    # Prints the stream object itself, not the counted words.
    print(word_totals)

    offset_tracked = kafka_stream.transform(storeOffsetRanges)
    offset_tracked.foreachRDD(printOffsetRanges)

    word_totals.pprint()

    streaming_context.start()  # Start the computation
    streaming_context.awaitTermination()  # Wait for the computation to terminate
开发者ID:blair1,项目名称:hadoop-spark,代码行数:27,代码来源:kafka-direct.py


示例14: setUp

 def setUp(self):
     """Create the Spark session and the fixture rows used by the tests.

     The session runs on ``local[2]`` with the driver host set to
     ``localhost`` and console progress output switched off.
     ``self.test_data`` holds (name, department, number) tuples.
     """
     spark_conf = SparkConf()
     spark_conf.setAppName('testing')
     spark_conf.setMaster('local[2]')
     spark_conf.set('spark.driver.host', 'localhost')
     spark_conf.set('spark.ui.showConsoleProgress', False)
     session_builder = SparkSession.builder.config(conf=spark_conf)
     self.session = session_builder.getOrCreate()
     self.test_data = [
         ('Ricardo', 'engineering', 2),
         ('Tisa', 'sales', 3),
         ('Sheree', 'marketing', 4),
         ('Chantelle', 'engineering', 5),
         ('Kylee', 'finance', 2),
         ('Tamatha', 'marketing', 5),
         ('Trena', 'engineering', 2),
         ('Arica', 'engineering', 1),
         ('Santina', 'finance', 2),
         ('Daria', 'marketing', 1),
         ('Magnolia', 'sales', 2),
         ('Antonina', 'finance', 1),
         ('Sumiko', 'engineering', 1),
         ('Carmen', 'sales', 2),
         ('Delois', 'engineering', 1),
         ('Luetta', 'marketing', 3),
         ('Yessenia', 'sales', 1),
         ('Petra', 'engineering', 3),
         ('Charisse', 'engineering', 4),
         ('Lillian', 'engineering', 3),
         ('Wei', 'engineering', 2),
         ('Lahoma', 'sales', 2),
         ('Lucilla', 'marketing', 1),
         ('Stephaine', 'finance', 2),
     ]
开发者ID:bfemiano,项目名称:misc_scripts,代码行数:30,代码来源:test_analysis_teacher.py


示例15: configureSpark

def configureSpark():
    """Create a ``local[*]`` SparkContext named ``a`` with fixed tuning options.

    :return: the newly created SparkContext
    """
    spark_conf = SparkConf()
    spark_conf.setAppName("a")
    spark_conf.setMaster("local[*]")

    # Applied in the listed order.
    tuning_options = (
        ("spark.executor.memory", "2g"),
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        ("spark.kryoserializer.buffer", "256"),
        ("spark.akka.frameSize", "500"),
        ("spark.rpc.askTimeout", "30"),
        ('spark.executor.cores', '4'),
        ('spark.driver.memory', '2g'),
    )
    for option_name, option_value in tuning_options:
        spark_conf.set(option_name, option_value)

    return SparkContext(conf=spark_conf)
开发者ID:ashishsjsu,项目名称:Spark101,代码行数:8,代码来源:extraction2.py


示例16: read_conf

def read_conf():
    """
    Build the Spark configuration for a local run.

    Returns a SparkConf with master ``local[*]`` and app name ``Testing``;
    no SparkContext is created here.
    """
    return SparkConf().setMaster("local[*]").setAppName("Testing")
开发者ID:Ather23,项目名称:machine_learning,代码行数:8,代码来源:testing_spark.py


示例17: _test_broadcast_on_driver

 def _test_broadcast_on_driver(self, *extra_confs):
     """Check that a broadcast value can be read back on the driver.

     :param extra_confs: ``(key, value)`` pairs applied to the SparkConf
         before the context is created on ``local-cluster[2,1,1024]``
     """
     spark_conf = SparkConf()
     for conf_key, conf_value in extra_confs:
         spark_conf.set(conf_key, conf_value)
     spark_conf.setMaster("local-cluster[2,1,1024]")
     self.sc = SparkContext(conf=spark_conf)
     broadcast_var = self.sc.broadcast(value=5)
     self.assertEqual(5, broadcast_var.value)
开发者ID:Brett-A,项目名称:spark,代码行数:8,代码来源:test_broadcast.py


示例18: OWSparkContext

class OWSparkContext(SharedSparkContext, widget.OWWidget):
    """Widget that creates the shared Spark (sc) and Hive (hc) contexts.

    The control area shows one ``GuiParam`` per Spark setting. Pressing
    "Submit" copies the current parameter values into the SparkConf and
    creates the contexts.
    """
    priority = 0
    name = "Context"
    description = "Create a shared Spark (sc) and Hive (hc) Contexts"
    icon = "../icons/spark.png"

    want_main_area = False
    resizing_enabled = True

    # SparkConf edited through the GUI; assigned in __init__.
    conf = None

    def __init__(self):
        """Build the parameter box from the current SparkConf plus defaults."""
        super().__init__()

        # The main label of the Control's GUI.
        # gui.label(self.controlArea, self, "Spark Context")

        self.conf = SparkConf()
        all_predefined = dict(self.conf.getAll())
        # Create parameters Box.
        box = gui.widgetBox(self.controlArea, "Spark Application", addSpace=True)

        self.gui_parameters = OrderedDict()

        # Settings that are always shown, with their fallback values.
        main_parameters = OrderedDict()
        main_parameters['spark.app.name'] = 'OrangeSpark'
        main_parameters['spark.master'] = 'yarn-client'
        main_parameters["spark.executor.instances"] = "8"
        main_parameters["spark.executor.cores"] = "4"
        main_parameters["spark.executor.memory"] = "8g"
        main_parameters["spark.driver.cores"] = "4"
        main_parameters["spark.driver.memory"] = "2g"
        main_parameters["spark.logConf"] = "false"
        main_parameters["spark.app.id"] = "dummy"

        for k, v in main_parameters.items():
            # Prefer a value already present in the SparkConf over the
            # hard-coded fallback (the fallback was always shown before).
            # pop() also removes the key so the loop below does not add a
            # second widget for it.
            default_value = all_predefined.pop(k, v)
            self.gui_parameters[k] = GuiParam(parent_widget=box, label=k, default_value=default_value)

        # Remaining predefined settings that are not main parameters.
        for k, v in all_predefined.items():
            self.gui_parameters[k] = GuiParam(parent_widget=box, label=k, default_value=str(v))

        action_box = gui.widgetBox(box)
        # Action Button
        self.create_sc_btn = gui.button(action_box, self, label='Submit', callback=self.create_context)

    def onDeleteWidget(self):
        """Stop the Spark context, if one exists, when the widget is deleted."""
        if self.sc:
            self.sc.stop()

    def create_context(self):
        """Apply the GUI parameter values to the conf and create sc and hc."""

        for key, parameter in self.gui_parameters.items():
            self.conf.set(key, parameter.get_value())

        self.sc = SparkContext(conf=self.conf)
        self.hc = HiveContext(self.sc)
开发者ID:kernc,项目名称:Orange3-Spark,代码行数:58,代码来源:spark_context.py


示例19: main

def main():
    """Load the data set and run the parallel GA on it.

    Creates a SparkContext that uses the Kryo serializer, loads the data
    with ``load_data``, prints the number of partitions and calls
    ``parallel_GA_main``. The SparkContext is stopped even when a step
    raises.
    """
    spark_conf = SparkConf().setAppName("Different-Sampling data")
    spark_conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sc = SparkContext(conf=spark_conf)
    try:
        rdd = load_data(sc)
        print(rdd.getNumPartitions())
        parallel_GA_main(sc, rdd, 5)
    finally:
        # Always release the context, including on failure.
        sc.stop()
开发者ID:fchgithub,项目名称:OriginPySparkRepository,代码行数:9,代码来源:ParallelGADisributed.py


示例20: create_sc

def create_sc(master=None,
              py_files=None,
              spark_home=None,
              sparktk_home=None,
              pyspark_submit_args=None,
              app_name="sparktk",
              extra_conf=None):
    """
    Creates a SparkContext with sparktk defaults

    Many parameters can be overwritten

    :param master: spark master setting; ``default_spark_master`` is used when not given
    :param py_files: list of str of paths to python dependencies; Note the current python
    package will be freshly zipped up and put in a tmp folder for shipping by spark, and then removed.
    The list passed in is not modified.
    :param spark_home: override $SPARK_HOME
    :param sparktk_home: override $SPARKTK_HOME
    :param pyspark_submit_args: passed on to ``set_env_for_sparktk`` together with the two home settings
    :param app_name: name of spark app
    :param extra_conf: dict for any extra spark conf settings, for ex. {"spark.hadoop.fs.default.name": "file:///"}
    :return: pyspark SparkContext
    """

    set_env_for_sparktk(spark_home, sparktk_home, pyspark_submit_args)

    # bug/behavior of PYSPARK_SUBMIT_ARGS requires 'pyspark-shell' on the end --check in future spark versions
    set_env('PYSPARK_SUBMIT_ARGS', ' '.join([os.environ['PYSPARK_SUBMIT_ARGS'], 'pyspark-shell']))

    if not master:
        master = default_spark_master
        logger.info("sparktk.create_sc() master not specified, setting to %s", master)

    conf = SparkConf().setMaster(master).setAppName(app_name)
    if extra_conf:
        for k, v in extra_conf.items():
            conf = conf.set(k, v)

    # Work on a copy so the append below never mutates the caller's list.
    py_files = list(py_files) if py_files else []

    # zip up the relevant pieces of sparktk and put it in the py_files...
    path = zip_sparktk()
    tmp_dir = os.path.dirname(path)
    logger.info("sparkconf created tmp dir for sparktk.zip %s", tmp_dir)
    atexit.register(shutil.rmtree, tmp_dir)  # make python delete this folder when it shuts down

    py_files.append(path)

    msg = '\n'.join(["=" * 80,
                     "Creating SparkContext with the following SparkConf",
                     "pyFiles=%s" % str(py_files),
                     conf.toDebugString(),
                     "=" * 80])
    logger.info(msg)

    sc = SparkContext(conf=conf, pyFiles=py_files)

    return sc
开发者ID:AayushiD,项目名称:spark-tk,代码行数:57,代码来源:sparkconf.py



注:本文中的pyspark.SparkConf类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pyspark.SparkContext类代码示例发布时间:2022-05-26
下一篇:
Python pyspark.SQLContext类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap