Python pyspark.SparkContext Class Code Examples


This article collects and summarizes typical usage examples of pyspark.SparkContext in Python. If you are wondering how exactly to use the Python SparkContext class, how it is applied in practice, or what real-world examples look like, the curated class code examples below may help.



The following presents 20 code examples of the SparkContext class, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
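
Before the collected examples, here is a minimal, self-contained sketch of the basic SparkContext lifecycle (configure, create, run an action, stop). It is not taken from any of the projects listed below; the app name, master URL, and input data are illustrative placeholders only.

from pyspark import SparkConf, SparkContext

# Illustrative configuration: run locally with two worker threads.
conf = SparkConf().setAppName("sparkcontext-demo").setMaster("local[2]")
sc = SparkContext(conf=conf)

# Build an RDD from an in-memory sequence and run a simple action on it.
squares = sc.parallelize(range(10)).map(lambda x: x * x).collect()
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Stop the context so the driver releases its resources cleanly.
sc.stop()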

Example 1: run

 def run(self):
     sc = SparkContext("local", "gender")
     sqlContext = SQLContext(sc)
     #StringType =(str, unicode)
     _out = self.output().open('w')
     #lines = sc.textFile("myUser.csv")
     #fobj = self.input().open("r")
     #lines = sc.textFile(fobj.name)
     print(type(self.required_tasks['insert_source'].output()))
     print(self.required_tasks['insert_source'])
     #print(self.input()['insert_source'].input())
     lines = sc.textFile("myUser.csv")
     parts = lines.map(lambda l: l.split(","))
     users = parts.map(lambda p: (p[0], p[1],p[2],p[3],p[4],p[5],p[6],p[7],
         p[8],p[9],p[10],p[11],p[12],p[13],p[14],p[15],p[16],p[17],p[18],p[19]))
     schemaString = "userId lmsUserId lmsName orgName name gender registrationDate emailId mothertounge highestEduDegree goals city state active firstAccesDate lastAccessDate allowCert yearOfBirth pincode aadharId"
     print(schemaString)
     _out.write(schemaString )
     fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
     schema = StructType(fields)
     #schemaUser = sqlContext.createDataFrame(users, schema)
     schemaUser = sqlContext.applySchema(users, schema)
     schemaUser.registerTempTable("users")
     results = sqlContext.sql("SELECT gender FROM users")
     genders = results.map(lambda p : (p,1))
     counts = genders.reduceByKey(lambda a, b: a + b) #.map(lambda t : ("Gender " + t(0) + " No " + t(1))).collect()
     for name in counts.collect():
         _out.write(str(name))
     _out.close()
Author: Zarana-Parekh, Project: analytics, Lines: 29, Source: genderTask.py


Example 2: main

def main():
	inputs = sys.argv[1]
	output = sys.argv[2] 

	conf = SparkConf().setAppName('scalable multiplication')
	sc = SparkContext(conf=conf)
	assert sc.version >= '1.5.1'

	text = sc.textFile(inputs)

	# sbaronia - Split the row to get individual numbers
	row = text.map(lambda line: line.split())
	
	# sbaronia - calling element_wise_product on individual line 
	# and then adding all the returned 10x10 matrix to get
	# final matrix
	sub = row.map(element_wise_product).reduce(add_tuples)

	# sbaronia - writing formatted output to a file in 
	# a 10x10 matrix
	result = open(output, 'w')

	count = 0
	for i in range(len(sub)):
		result.write(str(sub[i]) + " ")
		count += 1
		if (count == 10):
			result.write("\n")
			count = 0

	result.close()
Author: gitofsid, Project: MyBigDataCode, Lines: 31, Source: matrix_multiply.py


Example 3: do_all

def do_all(f_path,out_name):
	sc = SparkContext()
	data = sc.textFile(f_path)

	data = data.map(parseKeepD).filter(lambda p: p[0] != None)

	# Scale Features
	features = data.map(lambda x: x[0].features)
	summary = Statistics.colStats(features)
	global means
	global varis
	means = summary.mean()
	varis = summary.variance()

	#scale the points
	data = data.map(lambda y: (conv_label_pt(y[0]),y[1]))

	#train model
	model = LinearRegressionWithSGD().train(data.map(lambda x: x[0]), intercept=True, regType='none')

	#calculate disparity
	disparity = data.map(lambda p: (p[0].label, model.predict(p[0].features), p[1]))  

	#calculate SSR for later
	ssr = disparity.map(lambda x: (x[0] - x[1])**2).sum()

	#keep N
	N = disparity.count()
	#shut down SC
	MSE = ssr/float(N)
	se = std_errors(data,MSE,N)
	disparity.saveAsTextFile(out_loc + out_name)

	sc.stop()
	return model.intercept,model.weights,se,disparity, ssr, N
Author: ssz225, Project: bigdata_final, Lines: 35, Source: spark_reg_local.py


Example 4: KMeansModel

def KMeansModel(dataPath, label, k, character, master):
    sc = SparkContext(master)
    data = sc.textFile(dataPath).map(lambda line: line.replace(character, ','))

    if label == 0:
        label_sum = data.map(lambda line: line.split(',')).map(lambda data: (float(data[0]), 1)).reduceByKey(add).collect()
        label = data.map(lambda line: line.split(',')).map(lambda data: float(data[0])).collect()        
        train_data = data.map(lambda line: line.split(',')).map(lambda x: map(lambda part: float(part), x[1:len(x)]))
    else:
        label_sum = data.map(lambda line: line.split(',')).map(lambda data: (float(data[-1]), 1)).reduceByKey(add).collect()
        label = data.map(lambda line: line.split(',')).map(lambda data: float(data[-1])).collect()        
        train_data = data.map(lambda line: line.split(',')).map(lambda x: map(lambda part: float(part) if part is not None else '', x[:len(x) - 1]))
    model = km.train(train_data, k)
    predict_data = train_data.collect()
    train = len(predict_data)
    acc = 0
    
    for i in range(len(label_sum)):
        ksum = np.zeros(k, dtype = int)
        cur_label = label_sum[i][0]
        for j in range(train):
            if label[j] == cur_label:
                ksum[model.predict(predict_data[j])] += 1
        acc += max(ksum)

    string = "KMeans Result: \n"
    center = model.centers
    for i in range(k):
        cur = str(i) + ":" + str(center[i]) + '\n'
        string += cur  
    string = string + "Acc: " + str((float(acc)/train) * 100) + "%"    
    sc.stop()
    return string
Author: Tomlong, Project: MLlib-UI, Lines: 33, Source: mlKmeans.py


Example 5: main

def main():
    cleanup()

    sc = SparkContext()
    spark = SparkSession(sc)
    path = os.path.join(mysql_export_dir, "name_string_indices.tsv")

    df = spark.read.csv(path, header=True, inferSchema=True, sep='\t', nullValue='NULL')

    names = df.select('name').rdd.map(lambda r: r['name'])
    names_json = parse_spark(sc, names) \
        .map(json.loads) \
        .zip(df.rdd)

    synonym_names = names_json.filter(lambda n: is_synonym(n))
    accepted_names = names_json.filter(lambda n: not is_synonym(n))

    synonym_names_with_accepted_columns = synonym_names \
        .map(to_key_value) \
        .leftOuterJoin(accepted_names.map(to_key_value)) \
        .map(add_accepted_data_to_synonym_name)
    accepted_names_with_accepted_columns = accepted_names \
        .map(add_accepted_data_accepted_name)
    sc.union([synonym_names_with_accepted_columns, accepted_names_with_accepted_columns]) \
        .map(join_fields) \
        .saveAsTextFile(output_dir_name_string_indices)
Author: GlobalNamesArchitecture, Project: gnharvester, Lines: 26, Source: name_string_indices.py


Example 6: stackexchange_json_spark_job

def stackexchange_json_spark_job():
    """
    Spark job to convert json data from hdfs into ques and ans.
    Result is written into elasticsearch for text based search from user.
    """
    server = bluebook_conf.HDFS_FQDN
    conf = SparkConf().setAppName("stackexchange_json_spark_job")
    spark_context = SparkContext(conf=conf)    
    json_ques_folder_address = "hdfs://" + server + "/" +\
                              bluebook_conf.STACKEXCHANGE_JSON_QUES_FOLDER_NAME +\
                              "/part-*"
    json_ans_folder_address = "hdfs://" + server + "/" +\
                              bluebook_conf.STACKEXCHANGE_JSON_ANS_FOLDER_NAME +\
                              "/part-*"
    
    # Ques and ans files are seperately read from hdfs
    ques_file = spark_context.textFile(json_ques_folder_address)
    ans_file = spark_context.textFile(json_ans_folder_address)
    ques_tups = ques_file.map(lambda line: stackexchange_json_mapper(line, 'ques'))
    ans_tups = ans_file.map(lambda line: stackexchange_json_mapper(line, 'ans'))

    # Join accepted answers with their respective questions
    ques_ans = ques_tups.join(ans_tups).map(lambda x: (x[0], {'ques': x[1][0], 'ans': x[1][1]}))
    ques_ans.saveAsNewAPIHadoopFile(
        path='-', 
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable", 
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
        conf=stackoverflow_es_write_conf)
Author: nave91, Project: rebot, Lines: 29, Source: joiner_app.py


Example 7: TestWordCounter

class TestWordCounter(unittest.TestCase):

    def setUp(self):
        conf = SparkConf().setAppName("appTest").setMaster("local[*]")
        self.sc = SparkContext(conf=conf)
        self.counter = WordCounter()

    def tearDown(self):
        self.sc.stop()

    def test_when_exist_one_movie_and_counter(self):
        movieList = ["1993::Toy Story Toy (1995)::Animation|Children's|Comedy",
                     "1993::ToyA StoryA ToyA (1995)::Animation|Children's|Comedy"]
        result = (('ToyA', ['::ToyA StoryA ToyA (1995)::']),
                  ('Toy', ['::Toy Story Toy (1995)::']))
        movies = self.sc.parallelize(movieList)
        self.assertEqual(self.counter.getMaxValues(movies), result)

    def test_when_exist_one_movie_and_counter_moreMovies(self):
        movieList = ["1993::Toy Story Toy (1995)::Animation|Children's|Comedy",
                     "1993::ToyA StoryB ToyA (1995)::Animation|Children's|Comedy",
                     "1993::ToyA StoryA ToyA (1995)::Animation|Children's|Comedy"]
        result = (('ToyA', ['::ToyA StoryB ToyA (1995)::', '::ToyA StoryA ToyA (1995)::']))
        movies = self.sc.parallelize(movieList)
        self.assertEqual(self.counter.getMaxValues(movies), result)
Author: cpedrero, Project: BigDataGroup, Lines: 28, Source: word_counter_tests.py


Example 8: main

def main(name, divide):

    """
    old_g = pickle.load(open("/net/data/facebook/facebook-ucsb/Facebook_2008/"+name +"/original_pickles/"+name +".pickle", 'r'))
    new_g = networkx.Graph()
    for node, friends in old_g.adj.iteritems():
        if node not in new_g.nodes():
            new_g.add_node(node)
        for friend in friends.iterkeys():
            new_g.add_node(friend)
            new_g.add_edge(node, friend)
            """
    # serialize the networkx graph as text files of edgelist
    # into a text file for workers to read

    #   networkx.write_edgelist(new_g, "edgelist/"+name, data=False)
    #   subprocess.check_call("hdfs dfs -put edgelist/"+name+ " edgelist/", shell=True)

    new_g = networkx.read_adjlist(name + "_list.txt")  # Egypt_list is an edge list
    sc = SparkContext(appName="Sorted_removal")

    dataG = json_graph.node_link_data(new_g)
    stringG = json.dumps(dataG)
    originalG = sc.broadcast(stringG)
    edges = sc.textFile("hdfs://scrapper/user/xiaofeng/edgelist/" + name, 192 * 4 * int(divide))
    costs = edges.map(lambda line: line.split(" ")).map(lambda edge: edge_to_cost(edge, originalG.value))
    costs.saveAsTextFile("hdfs://scrapper/user/xiaofeng/costs_" + name)
    sc.stop()
    subprocess.check_call("hdfs dfs -get costs_" + name + " /home/xiaofeng/facebook/FacebookProject/costs/", shell=True)
    Reformat("/home/xiaofeng/facebook/FacebookProject/costs/costs_" + name + "/", name)
Author: yangxiaoxiaoo, Project: cs281sec09, Lines: 30, Source: Simulation_sparkigraph.py


Example 9: stackexchange_xml_spark_job

def stackexchange_xml_spark_job():
    server = bluebook_conf.HDFS_FQDN
    conf = SparkConf()

    xml_file_address = "hdfs://" + server + "/" +\
                       bluebook_conf.STACKEXCHANGE_XML_FOLDER_NAME +\
                       bluebook_conf.STACKEXCHANGE_XML_FILE_NAME
                         
    json_ques_folder_address = "hdfs://" + server + "/" +\
                               bluebook_conf.STACKEXCHANGE_JSON_QUES_FOLDER_NAME
    json_ans_folder_address = "hdfs://" + server + "/" +\
                              bluebook_conf.STACKEXCHANGE_JSON_ANS_FOLDER_NAME
        
    conf.setAppName('stackexchange_xml_spark_job')
    spark_context = SparkContext(conf=conf)
        
    file = spark_context.textFile(xml_file_address)

    # Ques and Ans files are stored seperately depending of their 'posttypeid'
    # Ques -> posttypeid == 1
    # Ans -> posttypeid == 2
    ques = file.map(stackexchange_xml_mapper)\
               .filter(lambda dic: 'posttypeid' in dic.keys())\
               .filter(lambda dic: dic['posttypeid'] == '1')\
               .map(lambda d: jsoner(d))
    ans = file.map(stackexchange_xml_mapper)\
               .filter(lambda dic: 'posttypeid' in dic.keys())\
               .filter(lambda dic: dic['posttypeid'] == '2')\
               .map(lambda d: jsoner(d))
    ques.saveAsTextFile(json_ques_folder_address)
    ans.saveAsTextFile(json_ans_folder_address)
Author: nave91, Project: rebot, Lines: 31, Source: parser_app.py


Example 10: count_triangles

def count_triangles(data, master="local[2]"):
    """
    @brief: Count triangles using Spark
    @param data: The data location for the input files
    @param master: The master URL as defined at
    https://spark.apache.org/docs/1.1.0/submitting-applications.html#master-urls
    """

    #################  NO EDITS HERE ###################
    assert not os.path.exists("triangles.out"), "File: triangles.out \
    already exists"
    sc = SparkContext(master, "Triangle Count")
    start = time()
    ###############  END NO EDITS HERE  ################
    # TODO: Your code goes here!
    people = sc.textFile(data)
    triad = people.flatMap(GetTriad).reduceByKey(add).filter(lambda x: x[1]>1)
    #triadCount = triad.map(lambda x: (x,1))
    #triadSum = triadCount.reduceByKey(add)
    #triangles = triadSum.filter(lambda x: x[1]>1)
    #output = triangles.collect()
    output = triad.collect()
    #triangles.saveAsTextFile("test1")
    #################  NO EDITS HERE  ###################
    print "\n\n*****************************************"
    print "\nTotal algorithm time: %.4f sec \n" % (time()-start)
    print "*****************************************\n\n""" 
    ###############  END NO EDITS HERE ################
    with open("triangles.out", "wb") as f:
        for friends in output:
            f.write(friends[0]+"\n") # TODO: Loop with f to write your result to file serially
        pass
Author: dengdetian0603, Project: Parallel_Programming, Lines: 32, Source: triangle_count_old.py


Example 11: bmRun

    def bmRun(self):
        """
        Connect DB from Spark and Run/Profile Query
        """
        #create output file for results
        print "Create benchmark output file for recoring..."
        file_out = open("/Users/mira67/Downloads/benchmark_output.txt", "w")
        print "start query evaluation, load tables from DB and register tables in Spark..."

        #load data with Spark
        with Timer() as tm:
            sc = SparkContext("local","penguin")
            #sc = SparkContext(master=local[2])
            sqlContext = SQLContext(sc)
             
            #queries test here, depends on queries to load table in memory
            df1 =sqlContext.read.jdbc(url=self.url, table = self.tbName[0],lowerBound = 0, upperBound = 350, numPartitions=200)#dbtable is variable
            df1.registerTempTable(self.tbName[0])

            df2 =sqlContext.read.jdbc(url=self.url, table = self.tbName[1],lowerBound = 0, upperBound = 350, numPartitions=200)#dbtable is variable
            df2.registerTempTable(self.tbName[1])

            #register helper functions for SQL
            sqlContext.registerFunction("MONTH", lambda x: x[5:7], StringType())#grab Month
            sqlContext.registerFunction("YEAR", lambda x: x[0:4], StringType())
            sqlContext.registerFunction("DAY", lambda x: x[8:10], StringType())

            rdf1 = sqlContext.sql("SELECT * FROM "+self.tbName[0])
            rdf2 = sqlContext.sql("SELECT * FROM " + self.tbName[1])
            sqlContext.registerDataFrameAsTable(rdf1, self.mtb[0])
            sqlContext.registerDataFrameAsTable(rdf2, self.mtb[1])

        mem_use = self.memory_usage_psutil()
        print "memory_use_load %s" %mem_use
        print "=> elasped load data: %s ms" % (tm.secs * 1000)

        #Query with Spark
        with Timer() as tm:
            #query
            rdf = sqlContext.sql(self.sqlStm)
#need register as table first
            print "Data schema from query:"
            rdf.printSchema()
            #hist of BT values
            #Todo
        mem_use = self.memory_usage_psutil()
        print "memory_use_load %s" %mem_use
        print "=> elasped: %s ms" % (tm.secs * 1000)

        file_out.write("Query Time %s Memory %s\n" % (str(tm.secs * 1000),str(mem_use))) 
                
        #example enabled
        day1 = sqlContext.sql("SELECT * FROM ssmi t1, map t2 WHERE t1.DATE BETWEEN '1990-01-01' AND '1990-01-01' AND t1.LOCID = t2.ID ORDER BY t1.LOCID")
        #call plot
        demoplt = qplt.queryPlot()
        demoplt.qMapDemo(day1)

        
        #stop sparkcontext
        sc.stop()
Author: mira67, Project: condense_engine, Lines: 60, Source: penguin_main.py


Example 12: main

def main(argv=None):
    '''this is called if run from command line'''

    parser = argparse.ArgumentParser()
    parser.add_argument('-i','--input', help="Seq input file on cluster.", required=True)
    parser.add_argument('-o','--output', help="UTF-8 output file on cluster.", required=False)
    parser.add_argument('-p','--printToLog', help="Print results to log.", required=False, action='store_true')
    args = parser.parse_args()

    sc = SparkContext()
    global goodJsonRecords, badJsonRecords
    goodJsonRecords = sc.accumulator(0)
    badJsonRecords = sc.accumulator(0)
    data = sc.sequenceFile(args.input, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")
    tagCounts = data.values().flatMap(getTokens).countByValue()

    # So far, this code isn't useful.  The output fiile is written by the
    # master node into an isolated folder, and I don't know of a way to
    # retrieve it.
    if args.output != None:
        with codecs.open(args.output, 'wb', 'utf-8') as f:
            for k in sorted(tagCounts):
                f.write(k + " " + str(tagCounts[k]) + "\n")

    print "========================================"
    print "goodJsonRecords = %d" % goodJsonRecords.value
    print "badJsonRecords = %d" % badJsonRecords.value
    if args.printToLog:
        for k in sorted(tagCounts):
            print json.dumps(k), tagCounts[k]
    print "========================================"
Author: cjsanjay, Project: dig-crf, Lines: 31, Source: countCrfResultTokens.py


Example 13: main

def main(arglist):

    with open("log_file_v.txt", "a") as f:
        f.write("Start time of validation...... %s\n" % datetime.datetime.now())

    print("Start time of validation...... %s" % datetime.datetime.now())

    # mapreduce params
    output = arglist[0]
    minPartitions = int(arglist[1])

    # initialize
    sc = SparkContext(appName="PythonValidate")

    # rdd = sc.textFile(output_file_name, minPartitions=minPartitions)
    rdd = sc.wholeTextFiles(output, minPartitions=minPartitions)
    print('partitions', rdd.getNumPartitions())
    error_count = rdd.mapPartitions(separateBlocks).sum()

    sc.stop()

    print("End time of validation...... %s" % datetime.datetime.now())
    with open("log_file_v.txt", "a") as f:
        f.write("End time of validation...... %s\n" % datetime.datetime.now())
        f.write("Error count of sorted file...... %s" % error_count)

    f.close()
Author: 1enemyleft, Project: Hadoop-Sort, Lines: 27, Source: validate.py


Example 14: __init__

    def __init__(self, file_path, train_file, test_file, real_file=None):
        """
        file_path: the folder where data files reside
        train_file: (user, item, rating) quote records
        test_file: (user, item) records, preferences to be predicted
        real_file: (user, option, value) real purchase records, can be none if it doesn't exist
        For this specific project:
        item here is the combination of options with their values,
            e.g. item 10 denotes option A with choice 0; item 21 denotes option B with choice 1
        rating is the number of quotes for a certain item by a user
        """
        self.file_path = file_path
        config = SparkConf().setMaster("local").setAppName("Kaggle")\
            .set("spark.executor.memory", "2g")\
            .set("spark.storage.memoryFraction", "1")

        sc = SparkContext(conf=config)

        self.train_data = sc.textFile("file:" + self.file_path + train_file).cache()\
            .map(lambda line: array([float(x) for x in line.split(',')]))

        self.test_data = sc.textFile("file:" + self.file_path + test_file).cache()\
            .map(lambda line: [float(x) for x in line.split(',')])

        if real_file:
            self.real_data = sc.textFile("file:" + self.file_path + real_file).cache()\
                .map(lambda line: [float(x) for x in line.split(',')]).map(lambda r: ((r[0], r[1]), r[2]))
Author: farcryzry, Project: spala, Lines: 27, Source: MatrixFactorization.py


Example 15: run

def run():
#if __name__ == "__main__":
    sc = SparkContext(master = spark_addr, appName= app_name)
    rdd = sc.textFile(hdfs_addr + file_path, 2).map(lambda line:format_list(line)).cache()
#    rdd = sc.parallelize(test_list,4).cache()
#********create rules************
    supp = float(rdd.count())*supp_rate
    item = create_item(rdd)   #create one item
    item = freq(rdd,item,supp)
    one_item = item
    freq_items = item
    while item.count() > 0:
        more_item = item_plus(sc,item)
        item = freq(rdd,more_item,supp)
        freq_items = freq_items.union(item)

    #result freq_items is key_value,key's type is frozenset
  #  rules = produce_rule(freq_items,one_item)
#    rule_result = rules.collect()
    freq_result = freq_items.collect()
#    one_result = one_item.keys().collect()
    one_result = one_item.keys().collect()
    dict_rule = produce_rule(freq_result,one_result)
    out,total = probability(rdd,dict_rule,0.5)
    out1 =out.collect()
    print "$$$$$$$$$$$$$$$$$$$$$$$out=",out1,"all=",total

#****************************
    
    sc.stop()
    return freq_result,dict_rule
Author: optimus2014, Project: pyspark, Lines: 31, Source: ap1.py


Example 16: solve_puzzle

def solve_puzzle(master, output, height, width, slaves):
    global HEIGHT, WIDTH, level
    HEIGHT=height
    WIDTH=width
    level = 0

    sc = SparkContext(master, "python")

    """ YOUR CODE HERE """
    """ YOUR MAP REDUCE PROCESSING CODE HERE """
    solution=Sliding.solution(WIDTH, HEIGHT)
    sol = Sliding.board_to_hash(WIDTH, HEIGHT, solution)
    data = sc.parallelize([(sol,level),])
    counter = 0
    curLen = 1 
    while(counter < curLen):
        level += 1
        data = data.flatMap(bfs_flat_map)
        

        if (level% 12 == 0):
            data = data.partitionBy(PARTITION_COUNT)
        data = data.reduceByKey(bfs_reduce)
        if (level% 6 == 0):
            counter = curLen
            curLen = data.count()
        
        
    """ YOUR OUTPUT CODE HERE """
    data.coalesce(slaves).saveAsTextFile(output)
    sc.stop()
Author: VictoriaSnow, Project: CS-Projects, Lines: 31, Source: SlidingBfsSpark.py


Example 17: recom

def recom(matrix_file_name, user_file_name, output="re.out"):
    sc = SparkContext("local[8]", "Recommendation")
    """ Reads in a sequence file FILE_NAME to be manipulated """
    matrix = sc.sequenceFile(matrix_file_name)
    user = sc.sequenceFile(user_file_name)

    """
    - flatMap takes in a function that will take one input and outputs 0 or more
      items
    - map takes in a function that will take one input and outputs a single item
    - reduceByKey takes in a function, groups the dataset by keys and aggregates
      the values of each key
    """
    user_tuples = user.flatMap(flat_user) \
                 .map(map_user) \
                 .sortByKey(keyfunc=lambda k: int(k))

    keys = user_tuples.keys().collect()

    matrix_tuples = matrix.flatMap(flat_matrix) \
                          .map(map_matrix) \
                          .filter(lambda x: x[0] in keys)
    global mt 
    mt = matrix_tuples.collectAsMap()

    recm = user_tuples.flatMap(flat_recom) \
                      .reduceByKey(reduce_recom) \
                      .filter(lambda x: x[0] not in keys) \
                      .sortBy(lambda (key, value): int(value))
 
    """ Takes the dataset stored in counts and writes everything out to OUTPUT """
    recm.coalesce(1).saveAsTextFile(output)
Author: ShenghanGao, Project: Temp, Lines: 32, Source: recom.py


Example 18: main

def main():
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)
    status = sc.statusTracker()
    while result.empty():
        ids = status.getJobIdsForGroup()
        for id in ids:
            job = status.getJobInfo(id)
            print "Job", id, "status: ", job.status
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print "Stage %d: %d tasks total (%d active, %d complete)" % \
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks)
        time.sleep(1)

    print "Job results are:", result.get()
    sc.stop()
Author: Amir-Github, Project: spark, Lines: 25, Source: status_api_demo.py


Example 19: main

def main():

	
    input = sys.argv[1]
    output = sys.argv[2]

    conf = SparkConf().setAppName('Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    row = sc.textFile(input).map(lambda row: row.split(' ')).cache()
    ncol = len(row.take(1)[0])
    intermediateResult = row.map(permutation).reduce(add_tuples)

    outputFile = open(output, 'w')

    result = [intermediateResult[x:x+3] for x in range(0, len(intermediateResult), ncol)]

    for row in result:
        for element in row:
            outputFile.write(str(element) + ' ')
        outputFile.write('\n')

    outputFile.close()
Author: Abhishek-Arora, Project: Scalable-Matrix-Multiplication-on-Apache-Spark, Lines: 30, Source: matrix_multiply.py


Example 20: solve_puzzle

def solve_puzzle(master, output, height, width, slaves):
    global HEIGHT, WIDTH, level
    HEIGHT=height
    WIDTH=width
    level = 0

    sc = SparkContext(master, "python")

    """ YOUR CODE HERE """
    NUM_WORKERS = slaves

    sol = Sliding.solution(WIDTH, HEIGHT)
    """ MAP REDUCE PROCESSING CODE HERE """
    level_pos = sc.parallelize((make_state(level, sol),))
    prev_size, size = 0, 1

    while prev_size != size:
        level += 1
        if level % 10 == 0:
            level_pos = level_pos.partitionBy(PARTITION_COUNT)
        level_pos = level_pos.flatMap(bfs_flat_map).reduceByKey(bfs_reduce)
        prev_size = size
        size = level_pos.count()

    """ OUTPUT CODE HERE """
    level_pos = level_pos.map(unhash_board)
    level_pos.coalesce(NUM_WORKERS).saveAsTextFile(output)

    sc.stop()
Author: hansongcal, Project: CS61C, Lines: 29, Source: SlidingBfsSpark.py



Note: The pyspark.SparkContext class examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their respective authors, and copyright remains with the original authors; please refer to each project's License before distributing or using the code, and do not republish without permission.


