本文整理汇总了Python中pyspark.sql.HiveContext类的典型用法代码示例。如果您正苦于以下问题:Python HiveContext类的具体用法?Python HiveContext怎么用?Python HiveContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HiveContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: table_schema_from_spark
def table_schema_from_spark(hcat_table_name):
#returns schema of table with this database.name in hcatalog
# (spark-workaround as long as hcatweb api is not available...)
# initialize spark
import findspark
findspark.init()
import pyspark
from pyspark.sql import HiveContext
sc_conf = pyspark.SparkConf()
#sc_conf.set('spark.executor.extraClassPath','/opt/cloudera/parcels/CDH/lib/hive/lib/*')
#sc_conf.set('spark.master','yarn-client')
sc = pyspark.SparkContext(appName = 'ade_get_table_schema', conf=sc_conf)
hc = HiveContext(sc)
hive_schema = hc.table(hcat_table_name).schema.jsonValue()
print hive_schema
sc.stop()
table_schema = {'columns':{}}
col_sequence = 0
for field in hive_schema['fields']:
table_schema['columns'][field['name']] = {'col_sequence': col_sequence, 'type':field['type']}
col_sequence += 1
return table_schema
开发者ID:heuvel,项目名称:den,代码行数:31,代码来源:den_hadoop.py
示例2: get_context_test
def get_context_test():
conf = SparkConf()
sc = SparkContext('local[1]', conf=conf)
sql_context = HiveContext(sc)
sql_context.sql("""use fex_test""")
sql_context.setConf("spark.sql.shuffle.partitions", "1")
return sc, sql_context
开发者ID:hongbin0908,项目名称:bintrade,代码行数:7,代码来源:index.py
示例3: get_context
def get_context():
conf = SparkConf()
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "4")
conf.set("spark.executor.memory", "8g")
sc = SparkContext(appName="__file__", conf=conf)
sql_context = HiveContext(sc)
sql_context.sql("""use fex""")
sql_context.setConf("spark.sql.shuffle.partitions", "32")
return sc, sql_context
开发者ID:hongbin0908,项目名称:bintrade,代码行数:10,代码来源:index.py
示例4: main
def main():
sc = SparkContext()
hc = HiveContext(sc)
df = hc.sql("""{{sql}}""")
df_writer = DataFrameWriter(df)
df_writer.saveAsTable(name='{{tableName}}',
format='json',
mode='overwrite',
path='s3://data/{{tableName}}')
开发者ID:tgknight,项目名称:aws-sdk-hands-on,代码行数:10,代码来源:job.py
示例5: gen_report_table
def gen_report_table(hc,curUnixDay):
rows_indoor=sc.textFile("/data/indoor/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),seconds=int(p[4]),utoday=int(p[5]),ufirstday=int(p[6])))
HiveContext.createDataFrame(hc,rows_indoor).registerTempTable("df_indoor")
#ClientMac|etime|ltime|seconds|utoday|ENTITYID|UFIRSTDAY
sql="select entityid,clientmac,utoday,UFIRSTDAY,seconds,"
sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range 2505600 preceding) as day_30," # 2505600 is 29 days
sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range 518400 preceding) as day_7," #518400 is 6 days
sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY range 1 preceding) as pre_mon "
sql=sql+"from df_indoor order by entityid,clientmac,utoday"
df_id_stat=hc.sql(sql)
df_id_mm=df_id_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
#df_id_mm df_min_max ,to caculate firtarrival and last arrival
df_id_stat_distinct=df_id_stat.drop("seconds").drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
#distinct df is for lag function to work
df_id_prepremon=df_id_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
cond_id = [df_id_mm.clientmac == df_id_prepremon.clientmac, df_id_mm.entityid == df_id_prepremon.entityid, df_id_mm.UFIRSTDAY==df_id_prepremon.UFIRSTDAY]
df_indoor_fin_tmp=df_id_mm.join(df_id_prepremon, cond_id, 'outer').select(df_id_mm.entityid,df_id_mm.clientmac,df_id_mm.utoday,df_id_mm.UFIRSTDAY,df_id_mm.seconds,df_id_mm.day_30,df_id_mm.day_7,df_id_mm.min,df_id_mm.max,df_id_mm.total_cnt,df_id_prepremon.prepre_mon)
df_indoor_fin_tmp=df_indoor_fin_tmp.selectExpr("entityid as entityid","clientmac as clientmac","utoday as utoday","UFIRSTDAY as ufirstday","seconds as secondsbyday","day_30 as indoors30","day_7 as indoors7","min as FirstIndoor","max as LastIndoor","total_cnt as indoors","prepre_mon as indoorsPrevMonth")
#newly added part for indoors7 and indoors30 based on current date
df_indoor_fin_tmp1= df_indoor_fin_tmp.withColumn("r_day_7", func.when((curUnixDay- df_indoor_fin_tmp.utoday)/86400<7 , 1).otherwise(0))
df_indoor_fin_tmp2=df_indoor_fin_tmp1.withColumn("r_day_30", func.when((curUnixDay- df_indoor_fin_tmp1.utoday)/86400<30 , 1).otherwise(0))
df_indoor_fin_tmp3=df_indoor_fin_tmp2.withColumn("r_indoors7",func.sum("r_day_7").over(Window.partitionBy("entityid","clientmac")))
df_indoor_fin_tmp4=df_indoor_fin_tmp3.withColumn("r_indoors30",func.sum("r_day_30").over(Window.partitionBy("entityid","clientmac")))
df_indoor_fin=df_indoor_fin_tmp4.drop("r_day_7").drop("r_day_30")
hc.sql("drop table if exists df_indoor_fin")
df_indoor_fin.write.saveAsTable("df_indoor_fin")
rows_flow=sc.textFile("/data/flow/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),utoday=int(p[4]),ufirstday=int(p[5])))
HiveContext.createDataFrame(hc,rows_flow).registerTempTable("df_flow")
# ClientMac|ENTITYID|UFIRSTDAY|etime|ltime|utoday
sql="select entityid,clientmac,utoday,UFIRSTDAY,"
sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range 2505600 preceding) as day_30," # 2505600 is 29 days
sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range 518400 preceding) as day_7," #518400 is 6 days
sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY range 1 preceding) as pre_mon "
sql=sql+"from df_flow order by entityid,clientmac,utoday"
df_fl_stat=hc.sql(sql)
df_fl_mm=df_fl_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
#df_fl_mm df_min_max ,to caculate firtarrival and last arrival
df_fl_stat_distinct=df_fl_stat.drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
#distinct df is for lag function to work
df_fl_prepremon=df_fl_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
cond_fl = [df_fl_mm.clientmac == df_fl_prepremon.clientmac, df_fl_mm.entityid == df_fl_prepremon.entityid, df_fl_mm.UFIRSTDAY==df_fl_prepremon.UFIRSTDAY]
df_flow_fin=df_fl_mm.join(df_fl_prepremon, cond_fl, 'outer').select(df_fl_mm.entityid,df_fl_mm.clientmac,df_fl_mm.utoday,df_fl_mm.UFIRSTDAY,df_fl_mm.day_30,df_fl_mm.day_7,df_fl_mm.min,df_fl_mm.max,df_fl_mm.total_cnt,df_fl_prepremon.prepre_mon)
df_flow_fin=df_flow_fin.selectExpr("entityid as entityid","clientmac as clientmac","utoday as utoday","UFIRSTDAY as ufirstday","day_30 as visits30","day_7 as visits7","min as FirstVisit","max as LastVisit","total_cnt as visits","prepre_mon as visitsPrevMonth")
hc.sql("drop table if exists df_flow_fin")
df_flow_fin.write.saveAsTable("df_flow_fin")
开发者ID:dalinqin,项目名称:src,代码行数:52,代码来源:main_report.py
示例6: query12_no
def query12_no(query_name, conf=None):
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
# SQL statements can be run by using the sql methods provided by sqlContext
sql = "use tpcds_text_db_1_50"
_ = sqlContext.sql(sql)
output = execute_sql(query_name, sqlContext)
output['describe'] = output['output'].describe().show()
sc.stop()
return output
开发者ID:fmacias64,项目名称:big-data-system,代码行数:13,代码来源:question2_pyspark.py
示例7: query12_input
def query12_input(query_name, conf=None, output_persist=False):
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
# SQL statements can be run by using the sql methods provided by sqlContext
sql = "use tpcds_text_db_1_50"
_ = sqlContext.sql(sql)
# web_sales_sql = "select * from web_sales"
# web_sales = sqlContext.sql(web_sales_sql)
# web_sales.persist()
# web_sales.registerAsTable("web_sales")
# item_sql = "select * from item"
# item = sqlContext.sql(item_sql)
# item.persist()
# item.registerAsTable("item")
# date_dim_sql = "select * from date_dim"
# date_dim = sqlContext.sql(date_dim_sql)
# date_dim.persist()
# date_dim.registerAsTable("date_dim")
sqlContext.cacheTable("web_sales")
sqlContext.cacheTable("item")
sqlContext.cacheTable("date_dim")
# discard the first query
output = execute_sql(query_name, sqlContext, output_persist)
# check the re-run statistics
output = execute_sql(query_name, sqlContext)
output['describe'] = output['output'].describe().show()
sc.stop()
return output
开发者ID:fmacias64,项目名称:big-data-system,代码行数:32,代码来源:question2_pyspark.py
示例8: _test
def _test():
import doctest
import os
import tempfile
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext, HiveContext
import pyspark.sql.readwriter
os.chdir(os.environ["SPARK_HOME"])
globs = pyspark.sql.readwriter.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['tempfile'] = tempfile
globs['os'] = os
globs['sc'] = sc
globs['sqlContext'] = SQLContext(sc)
globs['hiveContext'] = HiveContext._createForTesting(sc)
globs['df'] = \
globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
globs['sc'].stop()
if failure_count:
exit(-1)
开发者ID:15652101501,项目名称:spark,代码行数:27,代码来源:streaming.py
示例9: run
def run(self):
sc = SparkContext("local", "Course Activity")
#sqlHC is the SQLHiveContext
sqlHC = HiveContext(sc)
lines=sqlHC.sql(""" select courseName,lmsUserId,createDateTime,
eventType,eventName,eventNo from logdata where
eventType not in ('enrollment','instructor','admin')
and lmsUserId is not NULL
and courseName is not NULL
and eventNo is not NULL limit 10""")
maplvl1=lines.flatMap(lambda p: mapp(p[0],str(p[1]),p[2].strftime('%Y-%m-%d'),p[4]))
reduceRDD=maplvl1.reduceByKey(lambda a,b : a+b)
with self.output().open('w') as out_file:
for line in reduceRDD.collect():
out_file.write(line[0][0]+"\x01"+line[0][1]+"\x01"+line[0][2]+"\x01"+line[0][3]+"\x01"+str(line[1])+"\n")
开发者ID:Zarana-Parekh,项目名称:analytics,代码行数:18,代码来源:course_activity.py
示例10: __init__
def __init__(self, sc, debug=False):
self.export_path = os.environ['COOPERHEWITT_ROOT'] + "/export/"
self.sc = sc
# hive requires writable permissions: ~/ephemeral-hdfs/bin/hadoop fs -chmod 777 /tmp/hive
self.hive_cxt = HiveContext(sc)
self.sql_cxt = SQLContext(sc)
if debug:
print "{0}\n{1}\n{2}\n".format(sc.master, self.hive_cxt, self.sql_cxt)
print sc._conf.getAll()
开发者ID:akamlani,项目名称:cooperhewitt,代码行数:9,代码来源:ch_spark.py
示例11: ch9_sql
def ch9_sql():
# Import Spark SQL
from pyspark.sql import HiveContext, Row
# Or if you can't include the hive requirements
from pyspark.sql import SQLContext, Row
hiveCtx = HiveContext(sc)
input_file = hiveCtx.read.json("testweet.json")
# Register the input_file schema RDD
input_file.registerTempTable("tweets")
# Select tweets based on the retweetCount
topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM
tweets ORDER BY retweetCount LIMIT 10""")
topTweetText = topTweets.map(lambda row: row.text)
topTweetText.collect()
topTweets.schema
hiveCtx.cacheTable("tweets")
开发者ID:jichen3000,项目名称:codes,代码行数:20,代码来源:interactive.py
示例12: run
def run(inpath, outpath, mode='append'):
gc.disable()
print("===== Checking if Log Exists =====")
check_log(inpath)
print("===== Pass Log Checking =====")
# initial SparkContext
conf = SparkConf().setAppName("Forgate Log Parser")
sc = SparkContext(conf=conf)
sqlCtx = HiveContext(sc)
start_time = time.time()
print("===== INPUT FILE PATH: %s =====" % (str(inpath)))
print("===== OUTPUT FILE PATH: %s =====" % (str(outpath)))
print("===== %s Reading Data From HDFS" % (now()))
distFile = sc.textFile(inpath)
cnt_raw = distFile.count()
print("===== Count of Input Data: %s =====" % (str(cnt_raw)))
print("===== %s Parsing Data" % (now()))
parsedData = parse_data(sc, distFile)
print("===== Count of Parsed Data: %s =====" % (str(parsedData.count())))
print("===== %s Saving Data" % (now()))
jsonData = sqlCtx.jsonRDD(parsedData)
old_col=['time','date']
new_col=['time_','dt']
jsonData = rename_column(jsonData, old_col, new_col)
jsonData.write.partitionBy('dt').parquet(outpath, mode=mode)
print("===== %s Checking Data" % (now()))
confirm_row(sqlCtx, outpath)
write_log(inpath)
print("---Total took %s seconds ---" % (time.time() - start_time))
sc.stop()
gc.enable()
开发者ID:bryanyang0528,项目名称:fortigate-log-parser-in-spark,代码行数:37,代码来源:logparser_spark.py
示例13: float
w.blowing_out = self.isBlowingOut(w.windBearing, float(game.center_azimuth))
try:
w.ozone = val.ozone
except Exception:
w.ozone = self.domeVal['ozone']
print "w=", w
updates.append(w)
self.saveWeather(updates)
print "updates=", updates
if __name__ == '__main__':
print "Starting.", datetime.now()
sc = SparkContext()
#sqlContext = SQLContext(sc)
sqlContext = HiveContext(sc)
games = sqlContext.read.parquet(CreateStatsRDD.rddDir + "/" + Games.table_name + ".parquet")
games.registerTempTable("games")
games.cache()
print "games=", games
print games.take(2)
stadium = sqlContext.load(source="com.databricks.spark.csv", header="true", path = SRCDIR + "/stadium.csv")
stadium.registerTempTable("stadium")
stadium.cache()
print "stadium=", stadium.take(2)
weather = UpdateWeather(sc, sqlContext, games, stadium)
weather.update()
开发者ID:xiaokekehaha,项目名称:mlb_stats_spark,代码行数:30,代码来源:UpdateWeather.py
示例14: print
print("""
Error: This program takes 2 arguments
Usage: bin/spark-submit --master <spark-master> nrem.py <input dir> <output dir>
""")
sys.exit(1)
#Parallelism
partitions = 1
#Output Directories
matched_output = os.path.join(sys.argv[2],"matched")
eliminated_output = os.path.join(sys.argv[2], "eliminated")
conf = SparkConf().setAppName("Non Redundant Entity Matching")
sc = SparkContext(conf=conf)
sqlCtx = HiveContext(sc)
def attr_key(l):
"""
[obj, attr1, attr2, attr3 ...] -> [(attr1, obj), (attr2, obj), (attr3, obj) ...]
"""
a = []
for attr in l[1:]:
a.append((attr, l[0]))
return a
"""
Assuming input file(s) to be tsv, and first field to be object and rest of the fields as attributes
"""
#Read input
inRDD = sc.textFile(sys.argv[1], partitions)
开发者ID:ParineethaT,项目名称:BlackBox,代码行数:30,代码来源:nrem.py
示例15: SparkConf
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
# Initialize Spark
SparkContext.setSystemProperty("spark.executor.memory", "4g")
conf = SparkConf()
conf.set("spark.executor.instances", 20)
sc = SparkContext("yarn-client", "kdd99", conf=conf)
hc = HiveContext(sc)
kdd = hc.table("kdd99")
(trainData, testData) = kdd.randomSplit([0.7, 0.3], seed=42)
trainData.cache()
services = trainData.withColumnRenamed("service", "srvc").select("srvc").distinct()
testData = testData.join(services, testData.service == services.srvc)
# filter out any rows with a service not trained upon
testData.cache()
print "training set has " + str(trainData.count()) + " instances"
print "test set has " + str(testData.count()) + " instances"
# Build model
inx1 = StringIndexer(inputCol="protocol", outputCol="protocol-cat")
inx2 = StringIndexer(inputCol="service", outputCol="service-cat")
inx3 = StringIndexer(inputCol="flag", outputCol="flag-cat")
inx4 = StringIndexer(inputCol="is_anomaly", outputCol="label")
ohe2 = OneHotEncoder(inputCol="service-cat", outputCol="service-ohe")
开发者ID:ofermend,项目名称:data-science-with-hadoop-book,代码行数:31,代码来源:anomaly.py
示例16: len
# A simple hive demo. If you do not have a table to load from look run MakeHiveTable.py
from pyspark import SparkContext
from pyspark.sql import HiveContext
import json
import sys
if __name__ == "__main__":
if len(sys.argv) != 3:
print "Error usage: LoadHive [sparkmaster] [inputtable]"
sys.exit(-1)
master = sys.argv[1]
inputTable = sys.argv[2]
sc = SparkContext(master, "LoadHive")
hiveCtx = HiveContext(sc)
# Query hive
input = hiveCtx.hql("FROM " + inputTable + " SELECT key, value")
data = input.map(lambda x: x['key'] * x['key'])
result = data.collect()
for element in result:
print "Got data " + str(element)
sc.stop()
print "Done!"
开发者ID:IChocolateKapa,项目名称:learning-spark,代码行数:22,代码来源:LoadHive.py
示例17: SparkContext
return str_time
if __name__ == '__main__':
if (len(sys.argv) != 1):
print "Usage: spark-submit <Python Code File>"
sys.exit(1)
#App name which shows up in the Spark UI
sc = SparkContext(appName='User Recommendation')
#Context provides connection to Hive metastore
sqlContext = HiveContext(sc)
'''
Pulling data out of Hive. I created a relication of 'watson_bisum_purchases' table locally to test.
'''
rdd = sqlContext.sql("SELECT person_id,deal_id,aasm_state FROM watson_bisum_purchases")
'''
Creating datasets. Formating the data and also creating sample datasets in order to create and test the model.
'''
#Formating all the data using the 'parse_rating' method above
all_data = rdd.map(parse_rating)
开发者ID:rodyou,项目名称:Spark_Examples,代码行数:31,代码来源:collaborative_filtering.py
示例18: SparkContext
"""transform.py"""
from pyspark import SparkContext
sc = SparkContext("local", "Transform")
from pyspark.sql import HiveContext
from pyspark.sql.types import *
import numpy as np
sqlContext = HiveContext(sc)
##########
# Util functions
def saveAsHiveTable(df, name):
# Save it this way so that native HIVE can read the table as well
df.registerTempTable("df")
sqlContext.sql("drop table if exists " + name)
sqlContext.sql("CREATE TABLE " + name + " AS SELECT * FROM df")
def getFloat(s):
if s is None:
return None
try:
f = float(s)
return f
except ValueError:
return None
##########
开发者ID:patng323,项目名称:w205-ex1,代码行数:30,代码来源:transform.py
示例19: main
def main():
# set up the logger
logging.basicConfig(filename=os.path.join(config.mrqos_logging, 'mpg_cluster.log'),
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%m/%d/%Y %H:%M:%S')
logger = logging.getLogger(__name__)
# NSJOIN dayidx # only partitioned by DAY
day_idx = beeline.get_last_partitions('mapper.nsjoin').split('=')[1]
# BAREBONES dayidx # only partitioned by DAY
day_bb = [x for x in beeline.show_partitions('mapper.barebones').split('\n') if '=%s' % (day_idx) in x]
# MAPPOINTS dayidx # partitioned by DAY and UUID (pick the last uuid)
mappoints_data = sorted([x for x in beeline.show_partitions('mapper.mappoints').split('\n') if '=%s' % (day_idx) in x])[-1].split('/')
[day_mps, uuid_idx] = [x.split('=')[1] for x in mappoints_data]
if day_idx != day_mps:
logger.error('mapper.mappoints and mapper.nsjoin different day, possible data missing in the source.')
return
if len(day_bb) == 0:
logger.warning('mapper.barebone data missing for this particular day.')
#return
logger.info('Processing data in day=%s, uuid=%s' % (day_idx, uuid_idx))
logger.info('begin spark process.')
getting_mappoint_data = ''' select b1.mpgid mpgid, b1.lat lat, b1.lon lon, b1.country country, b1.mpgload mpgload, b1.allowed_private_regions allowed_private_regions, b2.asnum asnum, b2.ip ip from (select mpgid, lat, lon, country, mpgload, allowed_private_regions from mapper.mappoints where day=%s and uuid="%s" and lat is not NULL and lon is not NULL and ghostonly=0 ) b1 left outer join (select collect_set(ns_ip) ip, collect_set(asnum) asnum, mpgid from (select ns_ip, mpd_uuid, mpgid, asnum, demand, day from mapper.nsjoin where day=%s and mpd_uuid="%s" and demand>0.01 order by demand desc) a group by mpgid) b2 on b2.mpgid=b1.mpgid ''' % (day_idx, uuid_idx, day_idx, uuid_idx)
geo_total_cap_query = ''' select * from (select country, network, sum(peak_bitcap_mbps) peak_bitcap_mbps, sum(peak_flitcap_mfps) peak_flitcap_mfps, sum(numvips) numvips from mapper.regioncapday where day=%s and network in ('freeflow', 'essl') and prp='private' group by country, network) a ''' % day_idx
geo_total_cap_public_query = ''' select * from (select country, network, sum(peak_bitcap_mbps) peak_bitcap_mbps, sum(peak_flitcap_mfps) peak_flitcap_mfps, sum(numvips) numvips from mapper.regioncapday where day=%s and network in ('freeflow', 'essl') and prp='public' group by country, network) a ''' % day_idx
sc = SparkContext()
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql(getting_mappoint_data)
regInfoRows = hiveCtx.sql('select * from mapper.regioncapday where day=%s and peak_bitcap_mbps is not null and peak_flitcap_mfps is not null' % (day_idx))
geo_total_cap = hiveCtx.sql(geo_total_cap_query)
geo_total_cap_p = hiveCtx.sql(geo_total_cap_public_query)
# rdd format: [regionid, [mpgid, mpg-lat, mpg-lon, mpg-country, mpg-load, mpg-asnum, mpg-nsip]]
region_mpginfo_pair = rows.map(lambda x: [[x.mpgid,
x.lat,
x.lon,
x.country,
x.mpgload,
x.asnum,
x.ip], x.allowed_private_regions])\
.flatMapValues(lambda x: x).map(lambda x: [x[1], x[0]])
#region_mpginfo_pair.first()
# rdd format: [regionid, [reg-lat, reg-lon, reg-capacity(bit mbps), reg-capacity(bit mfps), reg-country, reg-numvips, reg-service, reg-prp]]
# ps. prp=1: private, prp=0: public
region_latlon = regInfoRows.map(lambda x: [x.region, [x.latitude,
x.longitude,
x.peak_bitcap_mbps,
x.peak_flitcap_mfps,
x.country,
x.numvips,
'W' if x.network=='freeflow' else ('S' if x.network=='essl' else 'O'),
1 if x.prp=='private' else 0]])\
.filter(lambda x: x[1][6]=='W' or x[1][6]=='S')
region_public_list = region_latlon\
.filter(lambda x: x[1][7] == 0)\
.map(lambda x: ('all', [[x[0]]]))\
.reduceByKey(lambda a, b: [a[0]+b[0]])\
.map(lambda x: x[1][0]).collect()
region_public_list = [0] + sorted(region_public_list[0])
# dummy region
rdd2 = sc.parallelize([([0, [0, 0, 0.0, 0.0, 'US', 0, 'W', 1]])])
region_latlon = region_latlon.union(rdd2)
# perform the join into tuple of (K, (V1, V2):
# (regionid, ([mpgid, mpg-lat, mpg-lon, mpg-country, mpg-load], [reg-lat, reg-lon, reg-cap, reg-country, reg-numvips, reg-service]))
# rdd = (mpgid, regionid, [lat1, lon1, lat2, lon2, distance],
# reg-cap-bit(gbps), reg-cap-flit(gbps), reg-country, reg-numvips, reg-services,
# mpg-country, mpg-load, mpg-asnum, mpg-nsip,
# mpg-lat, mpg-lon)
mpgid_reg_geo = region_mpginfo_pair.join(region_latlon).map(lambda x: [x[1][0][0],
x[0],
geodesic_distance(x[1][0][1],
x[1][0][2],
x[1][1][0],
x[1][1][1]),
round(float(x[1][1][2])/1000.0, 3),
round(float(x[1][1][3])/1000.0, 3),
x[1][1][4], # reg-country
x[1][1][5], # reg-numvips
x[1][1][6], # reg-services
x[1][0][3],
x[1][0][4],
x[1][0][5],
x[1][0][6],
x[1][0][1],
x[1][0][2]])
#.........这里部分代码省略.........
开发者ID:YuTengChang,项目名称:akam_mrqos,代码行数:101,代码来源:mpgCluster_spark.py
示例20: len
without this change they would appear as u'Foo' and 'Foo' in
the final key string. Although python doesn't care about this
difference, hadoop does, and will bucket the values
separately. Which is not what we want.
"""
# TODO: refactor this into a utility function and update jobs
# to always UTF8 encode mapper keys.
if len(values) > 1:
return tuple([value.encode('utf8') for value in values])
else:
return values[0].encode('utf8')
sc = SparkContext("local", "Course Activity")
#sqlHC is the SQLHiveContext
sqlHC = HiveContext(sc)
lines=sqlHC.sql(""" select courseName,lmsUserId,createDateTime,
eventType,eventName,eventNo from logdata where
eventType not in ('enrollment','instructor','admin')
and lmsUserId is not NULL
and courseName is not NULL
and eventNo is not NULL limit 100""")
maplvl1=lines.flatMap(lambda p: mapp(p[0],str(p[1]),p[2].strftime('%Y-%m-%d'),p[4]))
for linet in maplvl1.collect():
print linet
reduceRDD = maplvl1.reduceByKey(lambda a, b : a + b)
开发者ID:Zarana-Parekh,项目名称:analytics,代码行数:30,代码来源:test.py
注:本文中的pyspark.sql.HiveContext类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论