This article collects typical usage examples of the Python class pyspark.SparkConf. If you have been wondering what exactly SparkConf is for and how to use it, the curated class examples here may help.
Below are 20 code examples of the SparkConf class, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps our system recommend better Python code examples.
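All of the snippets below are excerpts from larger files, so they assume the standard imports (from pyspark import SparkConf, SparkContext, plus whatever project-specific helpers they reference). As a minimal, self-contained sketch of the pattern every example shares — build a conf, set key/value options, hand it to a context:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("demo").setMaster("local[2]")
conf.set("spark.executor.memory", "1g")   # any spark.* key/value pair
print(conf.get("spark.app.name"))         # read a setting back: "demo"
sc = SparkContext(conf=conf)              # the conf is frozen once the context starts
print(sc.parallelize(range(10)).sum())    # 45
sc.stop()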
Example 1: getSparkContext
def getSparkContext(self, appName, master):
    print(appName)
    print(master)
    conf = SparkConf().setAppName(appName).setMaster(master)
    # Pin networking to loopback so the driver runs without a resolvable hostname
    conf.set("spark.local.ip", "127.0.0.1")
    conf.set("spark.driver.host", "127.0.0.1")
    return SparkContext(conf=conf)
Author: seoeun25 | Project: spark-app | Lines: 7 | Source: WordCount2.py
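A hedged usage sketch for the method above (the helper object name and input path are illustrative, not from the original project):

# Illustrative only: run a word count against a loopback-bound local context
sc = helper.getSparkContext("WordCount2", "local[*]")
counts = sc.textFile("input.txt") \
           .flatMap(lambda line: line.split()) \
           .map(lambda w: (w, 1)) \
           .reduceByKey(lambda a, b: a + b)
print(counts.take(5))
sc.stop()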
Example 2: main
def main():
    conf = SparkConf()
    conf.set("spark.default.parallelism", "24")
    sc = SparkContext(appName="PhoneLab Preprocessing", conf=conf)
    lines = sc.textFile(data_files, use_unicode=False)
    # Create LogLine objects and filter out empty lines
    logs = lines.flatMap(ll_mapper)
    # Save in an intermediate format
    logs.saveAsTextFile(out_dir, compressionCodecClass=codec)
    return  # NOTE: this early return leaves the gap detection below unreachable
    # Gap detection
    keyed = logs.map(ll_gap_map)
    merged = keyed.groupByKey()
    # At this point we have ((boot_id, date), [line_num]) tuples. The last
    # step is to find all the gaps within each key.
    result = merged.flatMap(find_gaps)
    gaps = result.collect()
    with open("/spark/gaps.json", 'w') as fd:
        fd.write(json.dumps(gaps, indent=4))
Author: gurupras | Project: phonelab-postprocessing | Lines: 25 | Source: preprocess.py
Example 3: configureSpark
def configureSpark(app_name, master):
    # Configure Spark
    conf = SparkConf().setAppName(app_name)
    conf = conf.setMaster(master)
    spark_context = SparkContext(conf=conf)
    return spark_context
Author: ashishsjsu | Project: Spark101 | Lines: 7 | Source: explore.py
Example 4: main
def main():
    spark_conf = SparkConf().setAppName("Different-Sampling data").setMaster('local[*]')
    spark_conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sc = SparkContext(conf=spark_conf)
    GA.logInConsole(0, "input file read!")
    rdd = sc.textFile("/home/fatemeh/Data/saveData.txt", minPartitions=500, use_unicode=False)
    rdd.unpersist()
    # print('\nNumber of Partitions for this run: ', rdd.getNumPartitions())
    vectorRDD = rdd.map(lambda line: toVector(line, splitter=' '))
    GA.logInConsole(0, "Data Vectorized!")
    ss = list()
    GA.logInConsole(-1, 'Start the ensemble')
    GA.logInConsole(-10, "GA with range 3")
    ss.append(GA.parallel_GA_main(vectorRDD, sc, 5))
    # GA.logInConsole(-10, "GA with range 4")
    # ss.append(GA.parallel_GA_main(norm, sc, 4))
    # GA.logInConsole(-10, "GA with range 5")
    # ss.append(GA.parallel_GA_main(norm, sc, 5))
    # GA.logInConsole(-10, "GA with range 3 and Sampled data set")
    # sampleRDD = norm.sample(False, 0.6, seed=10)
    # ss.append(GA.parallel_GA_main(sampleRDD, sc, 3))
    print(ss)
    # selectedSS = voted_subsapces(ss)
    # SSD.outlierDetection(vectorRDD, ss)
    GA.logInConsole(100, "\nend of program")
    sc.stop()
Author: fchgithub | Project: OriginPySparkRepository | Lines: 27 | Source: ODHD.py
Example 5: main
def main(args):
    if len(args) < 2:
        sys.exit(1)
    # Set the cluster configuration parameters
    spark_master = args[0]
    spark_data_file_name = args[1]
    file_path = CURR_DIR + "/" + spark_data_file_name
    conf = SparkConf()
    conf.setMaster(spark_master)
    conf.setAppName("Log Scanner")
    # Create a Spark context from the conf
    sc = SparkContext(conf=conf)
    txt_logs = sc.textFile(file_path).filter(lambda line: check(line))
    access_logs = txt_logs.map(lambda line: AccessLog(line))
    # Extract response codes from the log objects and cache them
    response_codes = access_logs.map(lambda log: log.get_status()).cache()
    log_count = response_codes.count()
    print("Total Response Codes: " + str(log_count))
    cnt = response_codes.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # Tuple-unpacking lambdas are Python 2 only; index into the pair instead
    response200 = cnt.filter(lambda x: x[0] == "200").map(lambda kv: kv[1]).collect()
    print("###########################")
    print("## Success Rate : " + str(int(response200[0])*100/log_count) + " % ##")
    print("###########################")
Author: alt-code | Project: AutoSpark | Lines: 29 | Source: log_scanner.py
Example 6: get_default_spark_conf
def get_default_spark_conf():
    conf = SparkConf(). \
        setAppName("pyunit-test"). \
        setMaster("local-cluster[3,1,2048]"). \
        set("spark.ext.h2o.disable.ga", "true"). \
        set("spark.driver.memory", "2g"). \
        set("spark.executor.memory", "2g"). \
        set("spark.ext.h2o.client.log.level", "DEBUG"). \
        set("spark.ext.h2o.repl.enabled", "false"). \
        set("spark.task.maxFailures", "1"). \
        set("spark.rpc.numRetries", "1"). \
        set("spark.deploy.maxExecutorRetries", "1"). \
        set("spark.network.timeout", "360s"). \
        set("spark.worker.timeout", "360"). \
        set("spark.ext.h2o.backend.cluster.mode", ExternalClusterTestHelper.cluster_mode()). \
        set("spark.ext.h2o.cloud.name", ExternalClusterTestHelper.unique_cloud_name("test")). \
        set("spark.ext.h2o.external.start.mode", os.getenv("spark.ext.h2o.external.start.mode", "manual")). \
        set("spark.sql.warehouse.dir", "file:" + os.path.join(os.getcwd(), "spark-warehouse"))
    if ExternalClusterTestHelper.tests_in_external_mode():
        conf.set("spark.ext.h2o.client.ip", ExternalClusterTestHelper.local_ip())
        conf.set("spark.ext.h2o.external.cluster.num.h2o.nodes", "2")
    return conf
Author: seuwangcy | Project: sparkling-water | Lines: 25 | Source: test_utils.py
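Since this helper only builds the conf, a quick sanity check with the standard SparkConf API can confirm what will be submitted:

conf = get_default_spark_conf()
print(conf.get("spark.ext.h2o.repl.enabled"))  # "false"
print(conf.toDebugString())                    # dumps every key=value pair, one per line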
Example 7: start
def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 2)
    brokers = "192.192.0.27:9092"
    topics = ['topic7']
    kafkaStreams_lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})
    lines1 = kafkaStreams_lines.map(lambda x: x[1])  # Note: the second element of each tuple is the received Kafka payload
    words = lines1.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordcounts = pairs.reduceByKey(lambda x, y: x + y)
    wordcounts.saveAsTextFiles("/var/lib/hadoop-hdfs/spark-libin/kafka")
    wordcounts.pprint()
    # Tally the distribution of the generated random numbers
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
Author: blair1 | Project: hadoop-spark | Lines: 25 | Source: kafka_streaming_direct.py
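One caveat worth hedging: KafkaUtils.createDirectStream needs the external spark-streaming-kafka artifact on the classpath, which plain pyspark does not ship. A common approach (the artifact coordinates below are an assumption and must match your Spark/Scala versions) is to set PYSPARK_SUBMIT_ARGS before pyspark is imported:

import os
# Assumption: pick the spark-streaming-kafka version matching your cluster
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 "
    "pyspark-shell"
)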
Example 8: __call__
def __call__(self):
    log.info("Processing wiki dump: %s ...", self.wk_dump_path)
    c = SparkConf().setAppName("Wikijson")
    log.info("Using spark master: %s", c.get("spark.master"))
    sc = SparkContext(conf=c)
    if os.path.isdir(self.output_path):
        log.warn("Writing over output path: %s", self.output_path)
        shutil.rmtree(self.output_path)
    # RDD of tuples: (title, namespace, id, redirect, content)
    pages = wikispark.get_pages_from_wikidump(sc, self.wk_dump_path)
    pages.cache()
    articles = wikispark.get_articles_from_pages(pages)
    redirects = wikispark.get_redirects_from_pages(pages)
    if self.redirect_links:
        articles = wikispark.redirect_article_links(articles, redirects)
    articles.map(self.article_to_json).map(json.dumps).saveAsTextFile(
        self.output_path, "org.apache.hadoop.io.compress.GzipCodec"
    )
    log.info("Done.")
Author: pombredanne | Project: wikijson | Lines: 26 | Source: process.py
Example 9: main
def main():
    """
    Main entry point of the application
    """
    # Create spark configuration and spark context
    include_path = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'preprocessing.py'))
    conf = SparkConf()
    conf.set('spark.executor.memory', '1500m')
    conf.setAppName("Generating predictions")
    sc = SparkContext(conf=conf, pyFiles=[include_path])
    # Set S3 configuration
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", os.environ['AWS_ACCESS_KEY'])
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", os.environ['AWS_SECRET_KEY'])
    # Single-pass predictions
    fast_predict(sc, file_input="s3n://twitter-stream-data/twitter-*",
                 file_output="s3n://twitter-stream-predictions/final",
                 sports_model="PyTwitterNews/models/sports.model",
                 politics_model="PyTwitterNews/models/politics.model",
                 technology_model="PyTwitterNews/models/technology.model")
    # Stop application
    sc.stop()
Author: alialavia | Project: TwitterNews | Lines: 25 | Source: predict.py
Example 10: main
def main():
    parser = argparse.ArgumentParser(
        description='process some log messages, storing them and signaling '
                    'a rest server')
    parser.add_argument('--mongo', help='the mongodb url',
                        required=True)
    parser.add_argument('--rest', help='the rest endpoint to signal',
                        required=True)
    parser.add_argument('--port', help='the port to receive from '
                        '(default: 1984)',
                        default=1984, type=int)
    parser.add_argument('--appname', help='the name of the spark application '
                        '(default: SparkharaLogCounter)',
                        default='SparkharaLogCounter')
    parser.add_argument('--master',
                        help='the master url for the spark cluster')
    parser.add_argument('--socket',
                        help='the socket to attach for streaming text data '
                        '(default: caravan-pathfinder)',
                        default='caravan-pathfinder')
    args = parser.parse_args()
    mongo_url = args.mongo
    rest_url = args.rest
    sconf = SparkConf().setAppName(args.appname)
    if args.master:
        sconf.setMaster(args.master)
    sc = SparkContext(conf=sconf)
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(args.socket, args.port)
    lines.foreachRDD(lambda rdd: process_generic(rdd, mongo_url, rest_url))
    ssc.start()
    ssc.awaitTermination()
Author: mattf | Project: sparkhara-sources | Lines: 35 | Source: caravan_master.py
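As a usage sketch (all endpoint values are illustrative): python caravan_master.py --mongo mongodb://localhost:27017/sparkhara --rest http://localhost:5000/count --master spark://master:7077 would stream text from the caravan-pathfinder socket on port 1984 and hand each one-second batch RDD to process_generic along with the two URLs.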
Example 11: setUpClass
def setUpClass(cls):
    class_name = cls.__name__
    conf = SparkConf()
    conf.set('spark.app.name', class_name)  # use the computed class name, not the literal string
    # Read the spark configuration and update the spark conf
    test_spark_config = ConfigParser.ConfigParser()
    test_spark_config.read('test_config.cfg')
    test_spark_config.sections()
    configs = dict(test_spark_config.items('spark_conf_test_generic'))
    for k, v in configs.items():
        conf.set(k, v)
    cls.spark_test_configs = configs
    # Create the spark context
    cls.sc = SparkContext(conf=conf)
    if 'PYSPARK_DRIVER_PYTHON' in configs.keys():
        cls.sc.pythonExec = configs['PYSPARK_DRIVER_PYTHON']
    else:
        cls.sc.pythonExec = 'python2.7'
    logger = cls.sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
    logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s: %(message)s')
    cls.logger = logging.getLogger(__name__)
    cls.logger.setLevel(logging.DEBUG)
Author: bossjones | Project: sparkonda | Lines: 28 | Source: test_sparkonda.py
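For illustration, a hypothetical test_config.cfg the snippet above could read — the section name comes from the code, the keys are assumptions:

[spark_conf_test_generic]
spark.master = local[2]
spark.executor.memory = 1g
PYSPARK_DRIVER_PYTHON = python2.7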
Example 12: stackexchange_xml_spark_job
def stackexchange_xml_spark_job():
    server = bluebook_conf.HDFS_FQDN
    conf = SparkConf()
    xml_file_address = "hdfs://" + server + "/" + \
        bluebook_conf.STACKEXCHANGE_XML_FOLDER_NAME + \
        bluebook_conf.STACKEXCHANGE_XML_FILE_NAME
    json_ques_folder_address = "hdfs://" + server + "/" + \
        bluebook_conf.STACKEXCHANGE_JSON_QUES_FOLDER_NAME
    json_ans_folder_address = "hdfs://" + server + "/" + \
        bluebook_conf.STACKEXCHANGE_JSON_ANS_FOLDER_NAME
    conf.setAppName('stackexchange_xml_spark_job')
    spark_context = SparkContext(conf=conf)
    file = spark_context.textFile(xml_file_address)
    # Question and answer posts are stored separately depending on their 'posttypeid'
    # Questions -> posttypeid == 1
    # Answers   -> posttypeid == 2
    ques = file.map(stackexchange_xml_mapper)\
               .filter(lambda dic: 'posttypeid' in dic.keys())\
               .filter(lambda dic: dic['posttypeid'] == '1')\
               .map(lambda d: jsoner(d))
    ans = file.map(stackexchange_xml_mapper)\
              .filter(lambda dic: 'posttypeid' in dic.keys())\
              .filter(lambda dic: dic['posttypeid'] == '2')\
              .map(lambda d: jsoner(d))
    ques.saveAsTextFile(json_ques_folder_address)
    ans.saveAsTextFile(json_ans_folder_address)
Author: nave91 | Project: rebot | Lines: 31 | Source: parser_app.py
Example 13: start
def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 2)
    brokers = "localhost:9092"
    topics = ['test']
    kafkaStreams_lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})
    lines1 = kafkaStreams_lines.map(lambda x: x[1])  # Note: the second element of each tuple is the received Kafka payload
    words = lines1.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordcounts = pairs.reduceByKey(lambda x, y: x + y)
    print(wordcounts)
    kafkaStreams_lines.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
    wordcounts.pprint()
    # Tally the distribution of the generated random numbers
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
Author: blair1 | Project: hadoop-spark | Lines: 27 | Source: kafka-direct.py
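storeOffsetRanges and printOffsetRanges are not part of this excerpt; the Spark Streaming Kafka integration guide sketches them roughly like this (a sketch, not necessarily the author's exact code):

offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()  # only valid on RDDs coming directly from the Kafka stream
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))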
Example 14: setUp
def setUp(self):
    conf = SparkConf().setAppName('testing').setMaster('local[2]').set('spark.driver.host', 'localhost')
    conf.set('spark.ui.showConsoleProgress', False)
    self.session = SparkSession.builder.config(conf=conf).getOrCreate()
    self.test_data = [
        ('Ricardo', 'engineering', 2),
        ('Tisa', 'sales', 3),
        ('Sheree', 'marketing', 4),
        ('Chantelle', 'engineering', 5),
        ('Kylee', 'finance', 2),
        ('Tamatha', 'marketing', 5),
        ('Trena', 'engineering', 2),
        ('Arica', 'engineering', 1),
        ('Santina', 'finance', 2),
        ('Daria', 'marketing', 1),
        ('Magnolia', 'sales', 2),
        ('Antonina', 'finance', 1),
        ('Sumiko', 'engineering', 1),
        ('Carmen', 'sales', 2),
        ('Delois', 'engineering', 1),
        ('Luetta', 'marketing', 3),
        ('Yessenia', 'sales', 1),
        ('Petra', 'engineering', 3),
        ('Charisse', 'engineering', 4),
        ('Lillian', 'engineering', 3),
        ('Wei', 'engineering', 2),
        ('Lahoma', 'sales', 2),
        ('Lucilla', 'marketing', 1),
        ('Stephaine', 'finance', 2),
    ]
Author: bfemiano | Project: misc_scripts | Lines: 30 | Source: test_analysis_teacher.py
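A hedged example of a test this fixture could back (the column names are assumptions; the real suite may differ):

def test_engineering_headcount(self):
    # Assumed schema: (name, department, rating)
    df = self.session.createDataFrame(self.test_data, ['name', 'department', 'rating'])
    counts = {row['department']: row['count']
              for row in df.groupBy('department').count().collect()}
    self.assertEqual(counts['engineering'], 10)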
Example 15: configureSpark
def configureSpark():
    # Configure Spark
    conf = SparkConf().setAppName("a")
    conf = conf.setMaster("local[*]")
    conf = conf.set("spark.executor.memory", "2g") \
               .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
               .set("spark.kryoserializer.buffer", "256") \
               .set("spark.akka.frameSize", "500") \
               .set("spark.rpc.askTimeout", "30") \
               .set('spark.executor.cores', '4') \
               .set('spark.driver.memory', '2g')
    sc = SparkContext(conf=conf)
    return sc
Author: ashishsjsu | Project: Spark101 | Lines: 8 | Source: extraction2.py
Example 16: read_conf
def read_conf():
    """
    Set up the Spark configuration
    """
    conf = SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("Testing")
    return conf
Author: Ather23 | Project: machine_learning | Lines: 8 | Source: testing_spark.py
Example 17: _test_broadcast_on_driver
def _test_broadcast_on_driver(self, *extra_confs):
    conf = SparkConf()
    for key, value in extra_confs:
        conf.set(key, value)
    conf.setMaster("local-cluster[2,1,1024]")
    self.sc = SparkContext(conf=conf)
    bs = self.sc.broadcast(value=5)
    self.assertEqual(5, bs.value)
Author: Brett-A | Project: spark | Lines: 8 | Source: test_broadcast.py
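In Spark's own test suite this helper is exercised both with no extra confs and, presumably for the encrypted code path, with conf pairs such as ("spark.io.encryption.enabled", "true") passed as extra_confs.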
Example 18: OWSparkContext
class OWSparkContext(SharedSparkContext, widget.OWWidget):
    priority = 0
    name = "Context"
    description = "Create a shared Spark (sc) and Hive (hc) Contexts"
    icon = "../icons/spark.png"
    want_main_area = False
    resizing_enabled = True
    conf = None

    def __init__(self):
        super().__init__()
        # The main label of the Control's GUI.
        # gui.label(self.controlArea, self, "Spark Context")
        self.conf = SparkConf()
        all_predefined = dict(self.conf.getAll())
        # Create the parameters box
        box = gui.widgetBox(self.controlArea, "Spark Application", addSpace=True)
        self.gui_parameters = OrderedDict()
        main_parameters = OrderedDict()
        main_parameters['spark.app.name'] = 'OrangeSpark'
        main_parameters['spark.master'] = 'yarn-client'
        main_parameters["spark.executor.instances"] = "8"
        main_parameters["spark.executor.cores"] = "4"
        main_parameters["spark.executor.memory"] = "8g"
        main_parameters["spark.driver.cores"] = "4"
        main_parameters["spark.driver.memory"] = "2g"
        main_parameters["spark.logConf"] = "false"
        main_parameters["spark.app.id"] = "dummy"
        for k, v in main_parameters.items():
            default_value = all_predefined.setdefault(k, v)  # NOTE: computed but unused below, as written
            self.gui_parameters[k] = GuiParam(parent_widget=box, label=k, default_value=v)
            all_predefined.pop(k)
        for k, v in all_predefined.items():
            self.gui_parameters[k] = GuiParam(parent_widget=box, label=k, default_value=str(v))
        action_box = gui.widgetBox(box)
        # Action button
        self.create_sc_btn = gui.button(action_box, self, label='Submit', callback=self.create_context)

    def onDeleteWidget(self):
        if self.sc:
            self.sc.stop()

    def create_context(self):
        for key, parameter in self.gui_parameters.items():
            self.conf.set(key, parameter.get_value())
        self.sc = SparkContext(conf=self.conf)
        self.hc = HiveContext(self.sc)
Author: kernc | Project: Orange3-Spark | Lines: 58 | Source: spark_context.py
Example 19: main
def main():
    spark_conf = SparkConf().setAppName("Different-Sampling data")
    spark_conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sc = SparkContext(conf=spark_conf)
    rdd = load_data(sc)
    print(rdd.getNumPartitions())
    parallel_GA_main(sc, rdd, 5)
    sc.stop()
Author: fchgithub | Project: OriginPySparkRepository | Lines: 9 | Source: ParallelGADisributed.py
Example 20: create_sc
def create_sc(master=None,
              py_files=None,
              spark_home=None,
              sparktk_home=None,
              pyspark_submit_args=None,
              app_name="sparktk",
              extra_conf=None):
    """
    Creates a SparkContext with sparktk defaults

    Many parameters can be overwritten

    :param master: spark master setting
    :param py_files: list of str of paths to python dependencies; Note that the current python
        package will be freshly zipped up and put in a tmp folder for shipping by spark, and then removed
    :param spark_home: override $SPARK_HOME
    :param sparktk_home: override $SPARKTK_HOME
    :param pyspark_submit_args: extra arguments appended to $PYSPARK_SUBMIT_ARGS
    :param app_name: name of spark app
    :param extra_conf: dict for any extra spark conf settings, for ex. {"spark.hadoop.fs.default.name": "file:///"}
    :return: pyspark SparkContext
    """
    set_env_for_sparktk(spark_home, sparktk_home, pyspark_submit_args)
    # bug/behavior of PYSPARK_SUBMIT_ARGS requires 'pyspark-shell' on the end -- check in future spark versions
    set_env('PYSPARK_SUBMIT_ARGS', ' '.join([os.environ['PYSPARK_SUBMIT_ARGS'], 'pyspark-shell']))
    if not master:
        master = default_spark_master
        logger.info("sparktk.create_sc() master not specified, setting to %s", master)
    conf = SparkConf().setMaster(master).setAppName(app_name)
    if extra_conf:
        for k, v in extra_conf.items():
            conf = conf.set(k, v)
    if not py_files:
        py_files = []
    # zip up the relevant pieces of sparktk and put it in the py_files...
    path = zip_sparktk()
    tmp_dir = os.path.dirname(path)
    logger.info("sparkconf created tmp dir for sparktk.zip %s" % tmp_dir)
    atexit.register(shutil.rmtree, tmp_dir)  # make python delete this folder when it shuts down
    py_files.append(path)
    msg = '\n'.join(["=" * 80,
                     "Creating SparkContext with the following SparkConf",
                     "pyFiles=%s" % str(py_files),
                     conf.toDebugString(),
                     "=" * 80])
    logger.info(msg)
    sc = SparkContext(conf=conf, pyFiles=py_files)
    return sc
Author: AayushiD | Project: spark-tk | Lines: 57 | Source: sparkconf.py
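For example, a local invocation might look like this (the extra_conf value is taken from the docstring above; the master URL is an assumption):

sc = create_sc(master="local[2]",
               app_name="sparktk-demo",
               extra_conf={"spark.hadoop.fs.default.name": "file:///"})
print(sc.version)
sc.stop()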
Note: The pyspark.SparkConf class examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as GitHub/MSDocs. The code fragments were selected from open-source projects contributed by various developers; copyright in the source code belongs to the original authors, and redistribution and use should follow the corresponding project's license. Do not reproduce without permission.