
Python sql.HiveContext Class Code Examples


This article collects and summarizes typical usage examples of the pyspark.sql.HiveContext class in Python. If you are wondering what the HiveContext class is for, or how to use HiveContext in practice, the hand-picked class code examples below may help.



A total of 20 code examples of the HiveContext class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
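Before the individual examples, here is a minimal sketch of the pattern most of them share: create a SparkContext, wrap it in a HiveContext, and run HiveQL through sql() or table(). This is only an illustrative sketch; the database and table name below are hypothetical.

# Minimal HiveContext usage sketch (Spark 1.x-style API; "my_db.my_table" is a hypothetical table).
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="hive_context_demo")
hc = HiveContext(sc)                                    # entry point for Hive-backed SQL

df = hc.sql("SELECT * FROM my_db.my_table LIMIT 10")    # run a HiveQL query
df.show()                                               # inspect the result
hc.table("my_db.my_table").printSchema()                # or load a table directly and print its schema

sc.stop()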

Example 1: table_schema_from_spark

def table_schema_from_spark(hcat_table_name):
    #returns schema of table with this database.name in hcatalog
    #   (spark-workaround as long as hcatweb api is not available...)
    # initialize spark
    import findspark
    findspark.init()
     
    import pyspark
    from pyspark.sql import HiveContext
    
    sc_conf = pyspark.SparkConf()
    #sc_conf.set('spark.executor.extraClassPath','/opt/cloudera/parcels/CDH/lib/hive/lib/*')
    #sc_conf.set('spark.master','yarn-client')
    
    sc = pyspark.SparkContext(appName = 'ade_get_table_schema', conf=sc_conf)
    hc = HiveContext(sc)
    
    hive_schema = hc.table(hcat_table_name).schema.jsonValue()
    
    print(hive_schema)
    
    sc.stop()
    
    table_schema = {'columns':{}}
    
    col_sequence = 0
    for field in hive_schema['fields']:
        table_schema['columns'][field['name']] = {'col_sequence': col_sequence, 'type':field['type']}
        col_sequence += 1
    
    return table_schema
Author: heuvel, Project: den, Lines of code: 31, Source: den_hadoop.py
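For reference, a hedged usage sketch of table_schema_from_spark above; the table name and column layout are hypothetical and only show the expected shape of the returned dict.

# Hypothetical call; assumes a Hive table "mydb.users" with columns id (bigint) and name (string).
schema = table_schema_from_spark("mydb.users")
# schema would then look like:
# {'columns': {'id':   {'col_sequence': 0, 'type': 'long'},
#              'name': {'col_sequence': 1, 'type': 'string'}}}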


Example 2: get_context_test

def get_context_test():
    conf = SparkConf()
    sc = SparkContext('local[1]', conf=conf)
    sql_context = HiveContext(sc)
    sql_context.sql("""use fex_test""")
    sql_context.setConf("spark.sql.shuffle.partitions", "1")
    return sc, sql_context
Author: hongbin0908, Project: bintrade, Lines of code: 7, Source: index.py


Example 3: get_context

def get_context():
    conf = SparkConf()
    conf.set("spark.executor.instances", "4")
    conf.set("spark.executor.cores", "4")
    conf.set("spark.executor.memory", "8g")
    sc = SparkContext(appName="__file__", conf=conf)
    sql_context = HiveContext(sc)
    sql_context.sql("""use fex""")
    sql_context.setConf("spark.sql.shuffle.partitions", "32")
    return sc, sql_context
Author: hongbin0908, Project: bintrade, Lines of code: 10, Source: index.py


Example 4: main

def main():
    sc = SparkContext()
    hc = HiveContext(sc)

    df = hc.sql("""{{sql}}""")
    df_writer = DataFrameWriter(df)
    df_writer.saveAsTable(name='{{tableName}}',
                          format='json',
                          mode='overwrite',
                          path='s3://data/{{tableName}}')
Author: tgknight, Project: aws-sdk-hands-on, Lines of code: 10, Source: job.py


Example 5: gen_report_table

 def gen_report_table(hc,curUnixDay):
     rows_indoor=sc.textFile("/data/indoor/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),seconds=int(p[4]),utoday=int(p[5]),ufirstday=int(p[6])))
     HiveContext.createDataFrame(hc,rows_indoor).registerTempTable("df_indoor")
     #ClientMac|etime|ltime|seconds|utoday|ENTITYID|UFIRSTDAY 
     sql="select entityid,clientmac,utoday,UFIRSTDAY,seconds,"
     sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  2505600 preceding) as day_30," # 2505600 is 29 days
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  518400 preceding)  as day_7," #518400 is 6 days
     sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY  range 1 preceding) as pre_mon "
     sql=sql+"from df_indoor order by entityid,clientmac,utoday" 
     df_id_stat=hc.sql(sql)
     df_id_mm=df_id_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
     # df_id_mm is the min/max df, used to calculate first arrival and last arrival
     df_id_stat_distinct=df_id_stat.drop("seconds").drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
     #distinct df is for lag function to work
     df_id_prepremon=df_id_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
     
     cond_id = [df_id_mm.clientmac == df_id_prepremon.clientmac, df_id_mm.entityid == df_id_prepremon.entityid, df_id_mm.UFIRSTDAY==df_id_prepremon.UFIRSTDAY]
     df_indoor_fin_tmp=df_id_mm.join(df_id_prepremon, cond_id, 'outer').select(df_id_mm.entityid,df_id_mm.clientmac,df_id_mm.utoday,df_id_mm.UFIRSTDAY,df_id_mm.seconds,df_id_mm.day_30,df_id_mm.day_7,df_id_mm.min,df_id_mm.max,df_id_mm.total_cnt,df_id_prepremon.prepre_mon)
     df_indoor_fin_tmp=df_indoor_fin_tmp.selectExpr("entityid as entityid","clientmac as  clientmac","utoday as utoday","UFIRSTDAY as ufirstday","seconds as secondsbyday","day_30 as indoors30","day_7 as indoors7","min as FirstIndoor","max as LastIndoor","total_cnt as indoors","prepre_mon as indoorsPrevMonth")
     
     #newly added part for indoors7 and indoors30 based on current date
     df_indoor_fin_tmp1= df_indoor_fin_tmp.withColumn("r_day_7", func.when((curUnixDay- df_indoor_fin_tmp.utoday)/86400<7 , 1).otherwise(0))
     df_indoor_fin_tmp2=df_indoor_fin_tmp1.withColumn("r_day_30", func.when((curUnixDay- df_indoor_fin_tmp1.utoday)/86400<30 , 1).otherwise(0))
     df_indoor_fin_tmp3=df_indoor_fin_tmp2.withColumn("r_indoors7",func.sum("r_day_7").over(Window.partitionBy("entityid","clientmac")))
     df_indoor_fin_tmp4=df_indoor_fin_tmp3.withColumn("r_indoors30",func.sum("r_day_30").over(Window.partitionBy("entityid","clientmac")))
     df_indoor_fin=df_indoor_fin_tmp4.drop("r_day_7").drop("r_day_30")
     hc.sql("drop table if exists df_indoor_fin")
     df_indoor_fin.write.saveAsTable("df_indoor_fin")
     
     rows_flow=sc.textFile("/data/flow/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),utoday=int(p[4]),ufirstday=int(p[5])))
     HiveContext.createDataFrame(hc,rows_flow).registerTempTable("df_flow")
     
     # ClientMac|ENTITYID|UFIRSTDAY|etime|ltime|utoday
     sql="select entityid,clientmac,utoday,UFIRSTDAY,"
     sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  2505600 preceding) as day_30," # 2505600 is 29 days
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  518400 preceding)  as day_7," #518400 is 6 days
     sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY  range 1 preceding) as pre_mon "
     sql=sql+"from df_flow order by entityid,clientmac,utoday" 
     df_fl_stat=hc.sql(sql)
     df_fl_mm=df_fl_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
     # df_fl_mm is the min/max df, used to calculate first arrival and last arrival
     df_fl_stat_distinct=df_fl_stat.drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
     #distinct df is for lag function to work
     df_fl_prepremon=df_fl_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
     
     cond_fl = [df_fl_mm.clientmac == df_fl_prepremon.clientmac, df_fl_mm.entityid == df_fl_prepremon.entityid, df_fl_mm.UFIRSTDAY==df_fl_prepremon.UFIRSTDAY]
     df_flow_fin=df_fl_mm.join(df_fl_prepremon, cond_fl, 'outer').select(df_fl_mm.entityid,df_fl_mm.clientmac,df_fl_mm.utoday,df_fl_mm.UFIRSTDAY,df_fl_mm.day_30,df_fl_mm.day_7,df_fl_mm.min,df_fl_mm.max,df_fl_mm.total_cnt,df_fl_prepremon.prepre_mon)
     df_flow_fin=df_flow_fin.selectExpr("entityid as entityid","clientmac as  clientmac","utoday as utoday","UFIRSTDAY as ufirstday","day_30 as visits30","day_7 as visits7","min as FirstVisit","max as LastVisit","total_cnt as visits","prepre_mon as visitsPrevMonth")
     hc.sql("drop table if exists df_flow_fin")
     df_flow_fin.write.saveAsTable("df_flow_fin") 
Author: dalinqin, Project: src, Lines of code: 52, Source: main_report.py


Example 6: query12_no

def query12_no(query_name, conf=None):
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)

    # SQL statements can be run by using the sql methods provided by sqlContext
    sql = "use tpcds_text_db_1_50"
    _ = sqlContext.sql(sql)

    output = execute_sql(query_name, sqlContext)
    output['describe'] = output['output'].describe().show()

    sc.stop()
    return output
Author: fmacias64, Project: big-data-system, Lines of code: 13, Source: question2_pyspark.py


Example 7: query12_input

def query12_input(query_name, conf=None, output_persist=False):
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)

    # SQL statements can be run by using the sql methods provided by sqlContext
    sql = "use tpcds_text_db_1_50"
    _ = sqlContext.sql(sql)

#    web_sales_sql = "select * from web_sales"
#    web_sales = sqlContext.sql(web_sales_sql)
#    web_sales.persist()
#    web_sales.registerAsTable("web_sales")
#    item_sql = "select * from item"
#    item = sqlContext.sql(item_sql)
#    item.persist()
#    item.registerAsTable("item")
#    date_dim_sql = "select * from date_dim"
#    date_dim = sqlContext.sql(date_dim_sql)
#    date_dim.persist()
#    date_dim.registerAsTable("date_dim")
    sqlContext.cacheTable("web_sales")
    sqlContext.cacheTable("item")
    sqlContext.cacheTable("date_dim")

    # discard the first query
    output = execute_sql(query_name, sqlContext, output_persist)
    # check the re-run statistics
    output = execute_sql(query_name, sqlContext)
    output['describe'] = output['output'].describe().show()

    sc.stop()
    return output
Author: fmacias64, Project: big-data-system, Lines of code: 32, Source: question2_pyspark.py


Example 8: _test

def _test():
    import doctest
    import os
    import tempfile
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SQLContext, HiveContext
    import pyspark.sql.readwriter

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.readwriter.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')

    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    globs['hiveContext'] = HiveContext._createForTesting(sc)
    globs['df'] = \
        globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.readwriter, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
Author: 15652101501, Project: spark, Lines of code: 27, Source: streaming.py


Example 9: run

    def run(self):
        sc = SparkContext("local", "Course Activity")
        # sqlHC is the HiveContext
        sqlHC = HiveContext(sc)

        lines = sqlHC.sql("""select courseName, lmsUserId, createDateTime,
                             eventType, eventName, eventNo from logdata where
                             eventType not in ('enrollment', 'instructor', 'admin')
                             and lmsUserId is not NULL
                             and courseName is not NULL
                             and eventNo is not NULL limit 10""")

        maplvl1 = lines.flatMap(lambda p: mapp(p[0], str(p[1]), p[2].strftime('%Y-%m-%d'), p[4]))
        reduceRDD = maplvl1.reduceByKey(lambda a, b: a + b)
        with self.output().open('w') as out_file:
            for line in reduceRDD.collect():
                out_file.write(line[0][0] + "\x01" + line[0][1] + "\x01" + line[0][2] +
                               "\x01" + line[0][3] + "\x01" + str(line[1]) + "\n")
Author: Zarana-Parekh, Project: analytics, Lines of code: 18, Source: course_activity.py


Example 10: __init__

 def __init__(self, sc, debug=False):
     self.export_path = os.environ['COOPERHEWITT_ROOT'] + "/export/"
     self.sc = sc
     # hive requires writable permissions: ~/ephemeral-hdfs/bin/hadoop fs -chmod 777 /tmp/hive
     self.hive_cxt = HiveContext(sc)
     self.sql_cxt  = SQLContext(sc)
     if debug:
         print "{0}\n{1}\n{2}\n".format(sc.master, self.hive_cxt, self.sql_cxt)
         print sc._conf.getAll()
Author: akamlani, Project: cooperhewitt, Lines of code: 9, Source: ch_spark.py


Example 11: ch9_sql

def ch9_sql():
    # Import Spark SQL
    from pyspark.sql import HiveContext, Row
    # Or if you can't include the hive requirements 
    from pyspark.sql import SQLContext, Row

    hiveCtx = HiveContext(sc)

    input_file = hiveCtx.read.json("testweet.json")
    # Register the input_file schema RDD 
    input_file.registerTempTable("tweets")
    # Select tweets based on the retweetCount
    topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM
      tweets ORDER BY retweetCount LIMIT 10""")

    topTweetText = topTweets.map(lambda row: row.text)  
    topTweetText.collect()

    topTweets.schema
    hiveCtx.cacheTable("tweets")
Author: jichen3000, Project: codes, Lines of code: 20, Source: interactive.py


Example 12: run

def run(inpath, outpath, mode='append'):
    
    gc.disable()
    print("===== Checking if Log Exists =====")
    check_log(inpath)
    print("===== Pass Log Checking =====")
    
    # initialize SparkContext
    conf = SparkConf().setAppName("Forgate Log Parser")
    sc = SparkContext(conf=conf)
    sqlCtx = HiveContext(sc)
    start_time = time.time()
    print("===== INPUT FILE PATH: %s =====" % (str(inpath)))
    print("===== OUTPUT FILE PATH: %s =====" % (str(outpath)))
    print("===== %s Reading Data From HDFS" % (now()))
    distFile = sc.textFile(inpath)
    cnt_raw = distFile.count()
    print("===== Count of Input Data: %s =====" % (str(cnt_raw)))
    
    print("===== %s Parsing Data" % (now()))
    parsedData = parse_data(sc, distFile)
    print("===== Count of Parsed Data: %s =====" % (str(parsedData.count())))
    
    print("===== %s Saving Data" % (now()))
    jsonData = sqlCtx.jsonRDD(parsedData)
    old_col=['time','date']
    new_col=['time_','dt']
    jsonData = rename_column(jsonData, old_col, new_col)
    jsonData.write.partitionBy('dt').parquet(outpath, mode=mode)
    
    print("===== %s Checking Data" % (now()))
    confirm_row(sqlCtx, outpath)
    write_log(inpath)
    print("---Total took %s seconds ---" % (time.time() - start_time))
    
    sc.stop()
    gc.enable()
Author: bryanyang0528, Project: fortigate-log-parser-in-spark, Lines of code: 37, Source: logparser_spark.py


Example 13: float

                w.blowing_out = self.isBlowingOut(w.windBearing, float(game.center_azimuth))
                try:
                    w.ozone = val.ozone
                except Exception:
                    w.ozone = self.domeVal['ozone']
                print "w=", w
            updates.append(w)

        self.saveWeather(updates)
        print "updates=", updates

if __name__ == '__main__':
    print "Starting.", datetime.now()
    sc = SparkContext()
    #sqlContext = SQLContext(sc)
    sqlContext = HiveContext(sc)

    games = sqlContext.read.parquet(CreateStatsRDD.rddDir + "/" + Games.table_name + ".parquet")
    games.registerTempTable("games")
    games.cache()
    print "games=", games
    print games.take(2)

    stadium = sqlContext.load(source="com.databricks.spark.csv", header="true", path = SRCDIR + "/stadium.csv")
    stadium.registerTempTable("stadium")
    stadium.cache()
    print "stadium=", stadium.take(2)
    
    weather = UpdateWeather(sc, sqlContext, games, stadium)
    weather.update()
Author: xiaokekehaha, Project: mlb_stats_spark, Lines of code: 30, Source: UpdateWeather.py


Example 14: print

	
	print(""" 
		Error: This program takes 2 arguments
		Usage: bin/spark-submit --master <spark-master> nrem.py <input dir> <output dir>
	""")
	sys.exit(1)

#Parallelism
partitions = 1
#Output Directories
matched_output = os.path.join(sys.argv[2],"matched")
eliminated_output = os.path.join(sys.argv[2], "eliminated")

conf = SparkConf().setAppName("Non Redundant Entity Matching")
sc = SparkContext(conf=conf)
sqlCtx = HiveContext(sc)

def attr_key(l):
	"""
		[obj, attr1, attr2, attr3 ...] -> [(attr1, obj), (attr2, obj), (attr3, obj) ...]
	"""
	a = []
	for attr in l[1:]:
		a.append((attr, l[0]))
	return a

"""
	Assuming input file(s) to be tsv, and first field to be object and rest of the fields as attributes 
"""
#Read input
inRDD = sc.textFile(sys.argv[1], partitions)
Author: ParineethaT, Project: BlackBox, Lines of code: 30, Source: nrem.py


Example 15: SparkConf

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# Initialize Spark
SparkContext.setSystemProperty("spark.executor.memory", "4g")
conf = SparkConf()
conf.set("spark.executor.instances", 20)
sc = SparkContext("yarn-client", "kdd99", conf=conf)
hc = HiveContext(sc)

kdd = hc.table("kdd99")

(trainData, testData) = kdd.randomSplit([0.7, 0.3], seed=42)
trainData.cache()
services = trainData.withColumnRenamed("service", "srvc").select("srvc").distinct()
testData = testData.join(services, testData.service == services.srvc)
# filter out any rows with a service not trained upon
testData.cache()

print "training set has " + str(trainData.count()) + " instances"
print "test set has " + str(testData.count()) + " instances"

# Build model
inx1 = StringIndexer(inputCol="protocol", outputCol="protocol-cat")
inx2 = StringIndexer(inputCol="service", outputCol="service-cat")
inx3 = StringIndexer(inputCol="flag", outputCol="flag-cat")
inx4 = StringIndexer(inputCol="is_anomaly", outputCol="label")
ohe2 = OneHotEncoder(inputCol="service-cat", outputCol="service-ohe")
Author: ofermend, Project: data-science-with-hadoop-book, Lines of code: 31, Source: anomaly.py


Example 16: len

# A simple Hive demo. If you do not have a table to load from, run MakeHiveTable.py first.
from pyspark import SparkContext
from pyspark.sql import HiveContext
import json
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print "Error usage: LoadHive [sparkmaster] [inputtable]"
        sys.exit(-1)
    master = sys.argv[1]
    inputTable = sys.argv[2]
    sc = SparkContext(master, "LoadHive")
    hiveCtx = HiveContext(sc)
    # Query hive
    input = hiveCtx.hql("FROM " + inputTable + " SELECT key, value")
    data = input.map(lambda x: x['key'] * x['key'])
    result = data.collect()
    for element in result:
        print "Got data " + str(element)
    sc.stop()
    print "Done!"
Author: IChocolateKapa, Project: learning-spark, Lines of code: 22, Source: LoadHive.py


Example 17: SparkContext

    return str_time




if __name__ == '__main__':
    if (len(sys.argv) != 1):
        print "Usage: spark-submit <Python Code File>"
        sys.exit(1)

    #App name which shows up in the Spark UI
    sc = SparkContext(appName='User Recommendation')


    #Context provides connection to Hive metastore
    sqlContext = HiveContext(sc)
    

    '''
    Pulling data out of Hive. I created a replication of the 'watson_bisum_purchases' table locally to test.
    '''
    
    rdd = sqlContext.sql("SELECT person_id,deal_id,aasm_state FROM watson_bisum_purchases")


    '''
    Creating datasets. Formatting the data and also creating sample datasets in order to build and test the model.
    '''
    
    #Formatting all the data using the 'parse_rating' method above
    all_data = rdd.map(parse_rating)
Author: rodyou, Project: Spark_Examples, Lines of code: 31, Source: collaborative_filtering.py


Example 18: SparkContext

"""transform.py"""

from pyspark import SparkContext
sc = SparkContext("local", "Transform")
from pyspark.sql import HiveContext

from pyspark.sql.types import *
import numpy as np

sqlContext = HiveContext(sc)

##########
# Util functions

def saveAsHiveTable(df, name):
    # Save it this way so that native HIVE can read the table as well
    df.registerTempTable("df")
    sqlContext.sql("drop table if exists " + name)
    sqlContext.sql("CREATE TABLE " + name + " AS SELECT * FROM df")

def getFloat(s):
    if s is None:
        return None
    try:
        f = float(s)
        return f
    except ValueError:
        return None

##########
Author: patng323, Project: w205-ex1, Lines of code: 30, Source: transform.py


Example 19: main

def main():
    # set up the logger
    logging.basicConfig(filename=os.path.join(config.mrqos_logging, 'mpg_cluster.log'),
                            level=logging.INFO,
                            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                            datefmt='%m/%d/%Y %H:%M:%S')
    logger = logging.getLogger(__name__)

    # NSJOIN dayidx # only partitioned by DAY
    day_idx = beeline.get_last_partitions('mapper.nsjoin').split('=')[1]
    # BAREBONES dayidx # only partitioned by DAY
    day_bb = [x for x in beeline.show_partitions('mapper.barebones').split('\n') if '=%s' % (day_idx) in x]
    # MAPPOINTS dayidx # partitioned by DAY and UUID (pick the last uuid)
    mappoints_data = sorted([x for x in beeline.show_partitions('mapper.mappoints').split('\n') if '=%s' % (day_idx) in x])[-1].split('/')
    [day_mps, uuid_idx] = [x.split('=')[1] for x in mappoints_data]

    if day_idx != day_mps:
        logger.error('mapper.mappoints and mapper.nsjoin different day, possible data missing in the source.')
        return

    if len(day_bb) == 0:
        logger.warning('mapper.barebone data missing for this particular day.')
        #return

    logger.info('Processing data in day=%s, uuid=%s' % (day_idx, uuid_idx))

    logger.info('begin spark process.')
    getting_mappoint_data = ''' select b1.mpgid mpgid, b1.lat lat, b1.lon lon, b1.country country, b1.mpgload mpgload, b1.allowed_private_regions allowed_private_regions, b2.asnum asnum, b2.ip ip from (select mpgid, lat, lon, country, mpgload, allowed_private_regions from mapper.mappoints where day=%s and uuid="%s" and lat is not NULL and lon is not NULL and ghostonly=0 ) b1 left outer join (select collect_set(ns_ip) ip, collect_set(asnum) asnum, mpgid from (select ns_ip, mpd_uuid, mpgid, asnum, demand, day from mapper.nsjoin where day=%s and mpd_uuid="%s" and demand>0.01 order by demand desc) a group by mpgid) b2 on b2.mpgid=b1.mpgid ''' % (day_idx, uuid_idx, day_idx, uuid_idx)
    geo_total_cap_query = ''' select * from (select country, network, sum(peak_bitcap_mbps) peak_bitcap_mbps, sum(peak_flitcap_mfps) peak_flitcap_mfps, sum(numvips) numvips from mapper.regioncapday where day=%s and network in ('freeflow', 'essl') and prp='private' group by country, network) a ''' % day_idx
    geo_total_cap_public_query = ''' select * from (select country, network, sum(peak_bitcap_mbps) peak_bitcap_mbps, sum(peak_flitcap_mfps) peak_flitcap_mfps, sum(numvips) numvips from mapper.regioncapday where day=%s and network in ('freeflow', 'essl') and prp='public' group by country, network) a ''' % day_idx

    sc = SparkContext()
    hiveCtx = HiveContext(sc)

    rows = hiveCtx.sql(getting_mappoint_data)

    regInfoRows = hiveCtx.sql('select * from mapper.regioncapday where day=%s and peak_bitcap_mbps is not null and peak_flitcap_mfps is not null' % (day_idx))
    geo_total_cap = hiveCtx.sql(geo_total_cap_query)
    geo_total_cap_p = hiveCtx.sql(geo_total_cap_public_query)


    # rdd format: [regionid, [mpgid, mpg-lat, mpg-lon, mpg-country, mpg-load, mpg-asnum, mpg-nsip]]
    region_mpginfo_pair = rows.map(lambda x: [[x.mpgid,
                                               x.lat,
                                               x.lon,
                                               x.country,
                                               x.mpgload,
                                               x.asnum,
                                               x.ip], x.allowed_private_regions])\
                                .flatMapValues(lambda x: x).map(lambda x: [x[1], x[0]])

    #region_mpginfo_pair.first()

    # rdd format: [regionid, [reg-lat, reg-lon, reg-capacity(bit mbps), reg-capacity(bit mfps), reg-country, reg-numvips, reg-service, reg-prp]]
    # ps. prp=1: private, prp=0: public
    region_latlon = regInfoRows.map(lambda x: [x.region, [x.latitude,
                                                          x.longitude,
                                                          x.peak_bitcap_mbps,
                                                          x.peak_flitcap_mfps,
                                                          x.country,
                                                          x.numvips,
                                                          'W' if x.network=='freeflow' else ('S' if x.network=='essl' else 'O'),
                                                          1 if x.prp=='private' else 0]])\
                                .filter(lambda x: x[1][6]=='W' or x[1][6]=='S')

    region_public_list = region_latlon\
        .filter(lambda x: x[1][7] == 0)\
        .map(lambda x: ('all', [[x[0]]]))\
        .reduceByKey(lambda a, b: [a[0]+b[0]])\
        .map(lambda x: x[1][0]).collect()

    region_public_list = [0] + sorted(region_public_list[0])

    # dummy region
    rdd2 = sc.parallelize([([0, [0, 0, 0.0, 0.0, 'US', 0, 'W', 1]])])
    region_latlon = region_latlon.union(rdd2)

    # perform the join into tuple of (K, (V1, V2):
    # (regionid, ([mpgid, mpg-lat, mpg-lon, mpg-country, mpg-load], [reg-lat, reg-lon, reg-cap, reg-country, reg-numvips, reg-service]))
    # rdd  = (mpgid, regionid, [lat1, lon1, lat2, lon2, distance],
    #               reg-cap-bit(gbps), reg-cap-flit(gbps), reg-country, reg-numvips, reg-services,
    #               mpg-country, mpg-load, mpg-asnum, mpg-nsip,
    #               mpg-lat, mpg-lon)
    mpgid_reg_geo = region_mpginfo_pair.join(region_latlon).map(lambda x: [x[1][0][0],
                                                                           x[0],
                                                                           geodesic_distance(x[1][0][1],
                                                                                             x[1][0][2],
                                                                                             x[1][1][0],
                                                                                             x[1][1][1]),
                                                                           round(float(x[1][1][2])/1000.0, 3),
                                                                           round(float(x[1][1][3])/1000.0, 3),
                                                                           x[1][1][4], # reg-country
                                                                           x[1][1][5], # reg-numvips
                                                                           x[1][1][6], # reg-services
                                                                           x[1][0][3],
                                                                           x[1][0][4],
                                                                           x[1][0][5],
                                                                           x[1][0][6],
                                                                           x[1][0][1],
                                                                           x[1][0][2]])
#......... remaining code omitted .........
Author: YuTengChang, Project: akam_mrqos, Lines of code: 101, Source: mpgCluster_spark.py


Example 20: len

        without this change they would appear as u'Foo' and 'Foo' in
        the final key string. Although python doesn't care about this
        difference, hadoop does, and will bucket the values
        separately. Which is not what we want.
        """
        # TODO: refactor this into a utility function and update jobs
        # to always UTF8 encode mapper keys.
        if len(values) > 1:
            return tuple([value.encode('utf8') for value in values])
        else:
            return values[0].encode('utf8')

    
sc = SparkContext("local", "Course Activity")
	#sqlHC is the SQLHiveContext        
sqlHC = HiveContext(sc)

lines=sqlHC.sql(""" select courseName,lmsUserId,createDateTime,
		            eventType,eventName,eventNo from logdata where 
			    eventType not in ('enrollment','instructor','admin') 
			    and lmsUserId is not NULL 
   			    and courseName is not NULL 
			    and eventNo is not NULL limit 100""")


maplvl1=lines.flatMap(lambda p: mapp(p[0],str(p[1]),p[2].strftime('%Y-%m-%d'),p[4]))
for linet in maplvl1.collect():
    print(linet)

reduceRDD = maplvl1.reduceByKey(lambda a, b : a + b)
Author: Zarana-Parekh, Project: analytics, Lines of code: 30, Source: test.py



Note: The pyspark.sql.HiveContext class examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The code snippets were selected from open-source projects contributed by various developers, and copyright remains with the original authors. For redistribution and use, please refer to the corresponding project's license. Do not reproduce without permission.

