本文整理汇总了Python中pyspark.SQLContext类的典型用法代码示例。如果您正苦于以下问题:Python SQLContext类的具体用法?Python SQLContext怎么用?Python SQLContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SQLContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: hash_rating
def hash_rating(author_subreddit_rating_rdd, sc):
    """Map author and subreddit names to compact numeric ids.

    Parameters:
        author_subreddit_rating_rdd -- RDD of (author, subreddit, rating) tuples
        sc -- SparkContext used to create a SQLContext for the joins

    Returns:
        (aid_rdd, sid_rdd, aid_sid_r_rdd) where
        aid_rdd       -- RDD of (author, author_id) pairs
        sid_rdd       -- RDD of (subreddit, subreddit_id) pairs
        aid_sid_r_rdd -- RDD of (author_id, subreddit_id, rating) tuples
    """
    sql_context = SQLContext(sc)
    author_sub_schema = StructType([
        StructField("author", StringType(), True),
        StructField("subreddit", StringType(), True),
        StructField("rating", LongType(), True)
    ])
    asr_df = sql_context.createDataFrame(author_subreddit_rating_rdd, author_sub_schema)
    # FIX: use subscripting instead of Python-2-only tuple parameter
    # unpacking in lambdas (removed in Python 3 by PEP 3113).
    author_rdd = author_subreddit_rating_rdd.map(lambda asr: asr[0])
    aid_rdd = author_rdd.distinct().zipWithUniqueId().cache()
    author_id_schema = StructType([
        StructField("author", StringType(), True),
        StructField("author_id", LongType(), True)
    ])
    aid_df = sql_context.createDataFrame(aid_rdd, author_id_schema)
    # Swap the author name column for its numeric author_id.
    aid_s_r_df = aid_df.join(asr_df, on='author').drop('author').cache()
    subreddit_rdd = author_subreddit_rating_rdd.map(lambda asr: asr[1])
    sid_rdd = subreddit_rdd.distinct().zipWithUniqueId().cache()
    subreddit_id_schema = StructType([
        StructField("subreddit", StringType(), True),
        StructField("subreddit_id", LongType(), True)
    ])
    sid_df = sql_context.createDataFrame(sid_rdd, subreddit_id_schema)
    # Likewise swap the subreddit name for its subreddit_id.
    aid_sid_r_df = sid_df.join(aid_s_r_df, on='subreddit').drop('subreddit').cache()
    row_aid_sid_r_rdd = aid_sid_r_df.rdd
    aid_sid_r_rdd = row_aid_sid_r_rdd.map(lambda row: (row.author_id, row.subreddit_id, row.rating))
    return aid_rdd, sid_rdd, aid_sid_r_rdd
开发者ID:wmaciel,项目名称:redditDataAnalysis,代码行数:31,代码来源:sub_recommender.py
示例2: _get_data
def _get_data(self):
    """Return a one-row DataFrame with a text column and a sparse feature vector."""
    ctx = SQLContext(self.sc)
    sentence = "I dont know why people think this is such a bad movie."
    features = Vectors.sparse(3, {1: 1.0, 2: 1.0, 3: 1.0})
    rows = [(sentence, features)]
    return ctx.createDataFrame(rows, ['text', 'features'])
开发者ID:ngarneau,项目名称:sentiment-analysis,代码行数:9,代码来源:transformers.py
示例3: _get_train_data
def _get_train_data(self):
    """Return four labelled training rows with identical dense features.

    Ids 1..4 carry alternating labels 1.0, 0.0, 1.0, 0.0.
    """
    ctx = SQLContext(self.sc)
    feature_vec = Vectors.dense([1, 2, 3])
    rows = [(row_id, feature_vec, float(row_id % 2)) for row_id in (1, 2, 3, 4)]
    return ctx.createDataFrame(rows, ['id', 'features', 'label'])
开发者ID:ngarneau,项目名称:sentiment-analysis,代码行数:9,代码来源:pipelines.py
示例4: main
def main():
    """Load the Million Song CSV, keep rows with a known year and a real
    artist familiarity value, and write the result as parquet.

    Command line: argv[1] -- input CSV path, argv[2] -- output parquet path.
    """
    conf = SparkConf().setAppName('artist_career')
    sc = SparkContext(conf=conf)
    # FIX: compare version components numerically; the original string
    # comparison mis-orders versions such as '1.10.0' < '1.5.1'.
    version_tuple = tuple(int(part) for part in sc.version.split('-')[0].split('.')[:3])
    assert version_tuple >= (1, 5, 1)
    sqlContext = SQLContext(sc)
    inputs = sys.argv[1]
    output = sys.argv[2]
    # Every column is a non-nullable string, so build the 26-field schema
    # from the column list instead of 26 hand-written StructFields.
    column_names = ['SongNumber', 'SongID', 'AlbumID', 'AlbumName', 'ArtistID',
                    'ArtistLatitude', 'ArtistLocation', 'ArtistLongitude',
                    'ArtistName', 'Danceability', 'Duration', 'KeySignature',
                    'KeySignatureConfidence', 'Tempo', 'TimeSignature',
                    'TimeSignatureConfidence', 'Title', 'Year', 'Energy',
                    'ArtistFamiliarity', 'ArtistMbid', 'SongHotttnesss',
                    'Loudness', 'StartOfFadeOut', 'EndOfFadeIn', 'ModeConfidence']
    customSchema = StructType([StructField(name, StringType(), False) for name in column_names])
    df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load(inputs, schema=customSchema)
    df.registerTempTable('artist_data')
    million_song = sqlContext.sql("SELECT SongNumber,SongID,AlbumID,AlbumName,ArtistID,ArtistLatitude,ArtistLocation,ArtistLongitude,ArtistName,Danceability,Duration,KeySignature,KeySignatureConfidence,Tempo,TimeSignature,TimeSignatureConfidence,Title,Year,Energy,ArtistFamiliarity,ArtistMbid,SongHotttnesss,Loudness,StartOfFadeOut,EndOfFadeIn,ModeConfidence from artist_data where Year!=0 AND ArtistFamiliarity!='nan'")
    million_song.write.format('parquet').save(output)
开发者ID:BandeepSingh,项目名称:Million-Song-DataSet-Analysis,代码行数:15,代码来源:artist_parquet.py
示例5: __init__
def __init__(self):
    """Set up Spark (capped at 2 cores, MySQL JDBC driver on the executor
    classpath), a SQLContext, and a MySQL helper for the 'core' schema."""
    jdbc_jar = '/usr/local/env/lib/mysql-connector-java-5.1.38-bin.jar'
    conf = SparkConf()
    conf.setAppName("BandCard")
    conf.set("spark.cores.max", "2")
    conf.set('spark.executor.extraClassPath', jdbc_jar)
    self.conf = conf
    self.sc = SparkContext(conf=self.conf)
    self.sqlctx = SQLContext(self.sc)
    self.mysql_helper = MySQLHelper('core', host='10.9.29.212')
开发者ID:summer-apple,项目名称:spark,代码行数:9,代码来源:band_card.py
示例6: ALS_fit
def ALS_fit():
    """Flask endpoint: train the ALS beer-recommender for one user and
    cache that user's predictions to pickle files.

    Query params:
        usern -- user name; must exist in mt3ratings with appdata = 1
        key   -- simple shared secret; training only runs when key == 'abcd'

    Returns JSON: an error message for an unknown user, or a completion
    message after training.
    NOTE(review): when key != 'abcd' the view falls through and returns
    None (a 500 in Flask) -- confirm that is intended.
    """
    usern = request.args.get('usern')
    users_df = pd.read_sql_query('''SELECT DISTINCT mt3ratings.user, user_id FROM mt3ratings WHERE appdata = 1''', engine)
    if usern not in users_df['user'].values:
        return_str = "can't find user"
        return jsonify(result = return_str)
    user_id = users_df.user_id[users_df.user == usern].values[0]
    # BUG FIX: request.args.get() returns None for a missing key -- it never
    # raises NameError -- so the old try/except fallback was dead code.
    # The default argument is the supported way to supply a fallback.
    key = request.args.get('key', 'e')
    if key == 'abcd':
        # start spark
        try:
            conf = SparkConf().setAppName("BeerSleuthALS").set("spark.executor.memory", "4g")
            sc = SparkContext(conf=conf)
        except ValueError:
            # A SparkContext already exists in this process; reuse it.
            pass
        sqlContext = SQLContext(sc)
        ratings_sqldf = modeling.get_item_user_rev_from_pg(engine, sqlContext)
        sqlContext.registerDataFrameAsTable(ratings_sqldf, "ratings")
        print('fitting model')
        model = modeling.fit_final_model(ratings_sqldf)
        beer_ids = beer_dict.values()
        # Score every beer for this user, plus the filtered "top 20" pool.
        to_predict = zip([user_id]*len(beer_ids), beer_ids)
        to_predict_top20 = zip([user_id]*len(beer_id_filt), beer_id_filt)
        user_preds = model.predictAll(sc.parallelize(to_predict)).collect()
        user_preds_top20 = model.predictAll(sc.parallelize(to_predict_top20)).collect()
        print('got preds')
        # Counter keyed by beer id -> predicted rating.
        preds = Counter({x[1]: x[2] for x in user_preds})
        preds_top20 = Counter({x[1]: x[2] for x in user_preds_top20})
        with open('%s%s_preds.pkl'%(pred_path, user_id),'wb') as f:
            pickle.dump(preds, f)
        with open('%s%s_preds_top20.pkl'%(pred_path, user_id),'wb') as f:
            pickle.dump(preds_top20, f)
        print('done')
        sc.stop()
        return jsonify(result="Model training complete, you may now get predictions")
开发者ID:JohnRenshaw,项目名称:BeerSleuth,代码行数:37,代码来源:beersleuth_web.py
示例7: SparkContext
import os
import sys
from pyspark import SQLContext
from pyspark import SparkContext
#os.environ["SPARK_HOME"] = "/opt/spark-1.6.1-bin-hadoop2.6"
#os.environ["HADOOP_HOME"] = "/opt/hadoop"
#os.environ["HADOOP_PREFIX"] = "/opt/hadoop"
#os.environ["HIVE_HOME"] = "/opt/hive"
# Single-worker local Spark context; one shuffle partition keeps small
# test queries from fanning out into many near-empty tasks.
sc = SparkContext('local[1]')
sql_context = SQLContext(sc)
sql_context.setConf( "spark.sql.shuffle.partitions", "1")
# Switch to the test database -- presumably a Hive metastore is
# configured for this context; TODO confirm.
sql_context.sql(""" use fex_test """)
开发者ID:hongbin0908,项目名称:bintrade,代码行数:17,代码来源:sparklib.py
示例8: main
def main():
    """Batch job: load raw Reddit comment JSON from S3, write enriched
    comment rows to Cassandra, and build a user co-commenting graph.

    Relies on module-level names defined elsewhere in this file:
    SPARK_ADDRESS, urlTitlePool, AWS credentials, RAW_JSON_REDDIT_BUCKET,
    smallBatch, FROM_YEAR_MONTH, insert_into_cassandra, insert_graph,
    makeAscOrder and randint.
    """
    sc = SparkContext(SPARK_ADDRESS, appName="RedditBatchLayer")
    #sc = SparkContext("local[*]", appName="RedditBatchLayer")
    # Broadcast the (title, url) pool so every executor can sample from it.
    bcURL = sc.broadcast(urlTitlePool)
    sqlContext = SQLContext(sc)

    conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    #conn = boto.connect_s3(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    bucket = conn.get_bucket(RAW_JSON_REDDIT_BUCKET)

    def addTitleURL(cmtTuple):
        # Attach a randomly sampled (title, url) pair to a comment tuple.
        # 150,000/ 3000 = avg 50 comments/topic
        onePst = bcURL.value[randint(0, 3000)]
        return cmtTuple + (onePst[0], onePst[1]) # adding title and url

    if (smallBatch):
        # Single fixed month -- useful for local testing.
        logFile = 's3a://reddit-comments/2007/RC_2007-10'
        #df = sqlContext.read.json(logFile)
        # NOTE(review): jsonFile is deprecated in newer Spark; read.json
        # (commented line above) is the replacement.
        df = sqlContext.jsonFile(logFile)
        users_rdd = df.filter(df['author'] != '[deleted]')
        year = 2007
        month = 12
        users_row = users_rdd.map(lambda json: (json.author, '{0}_{1}'.format(year, month), json.created_utc, json.subreddit, json.id, json.body, json.score, json.ups, json.controversiality))\
                             .map(addTitleURL)
                             #.repartition(REPARTITION_SIZE)
        users_row.foreachPartition(insert_into_cassandra)

        # calculate user relationship graph
        # (URL, user) tuple -- index 10 is the url added by addTitleURL
        post2user = users_row.map(lambda x: (x[10], x[0]))
        #graph = post2user.join(post2user)\  # self join to find user relationship by posts
        #        .filter(lambda x: x[1][0] != x[1][1])\  # remove all self linked relationship
        #        .map(makeAscOrder)\  # make to asc order by user name
        #        .distinct()\  # remove duplicated user pairs, because the relationship is mutual
        #        .map(lambda x: (x[1], 1))\  # ready tho count number of common edges
        #        .reduceByKey(lambda x, y: x+y)\  # count total number for every edge/relationship
        #        .map(lambda x: (x[0][0], x[1], x[0][1]))  # flatten and ready to write table
        graph = post2user.join(post2user)\
                .filter(lambda x: x[1][0] != x[1][1])\
                .map(makeAscOrder)\
                .distinct()\
                .map(lambda x: (x[1], 1))\
                .reduceByKey(lambda x, y: x+y)\
                .map(lambda x: (x[0][0], x[1], x[0][1]))
        graph.foreachPartition(insert_graph)
    else:
        # Full run: walk every monthly dump in the bucket.
        for key in bucket.list():
            if '-' not in key.name.encode('utf-8'): # filter out folders and _SUCCESS
                continue
            logFile = 's3a://{0}/{1}'.format(RAW_JSON_REDDIT_BUCKET, key.name.encode('utf-8'))
            # Derive year/month from names like '.../RC_2007-10'.
            year = logFile.split('-')[1][-4:]
            month = logFile.split('-')[2]
            from_year = FROM_YEAR_MONTH.split('_')[0]
            from_month = FROM_YEAR_MONTH.split('_')[1]
            # Skip months before the configured starting point.
            if int(year) < int(from_year) or (int(year) == int(from_year) and int(month) < int(from_month)):
                continue
            #df = sqlContext.read.json(logFile)
            df = sqlContext.jsonFile(logFile)
            users_rdd = df.filter(df['author'] != '[deleted]')
            # tuple indexes: 0..8 from the JSON record, 9 (title), 10 (url)
            users_row = users_rdd.map(lambda json: (json.author, '{0}_{1}'.format(year, month), json.created_utc, json.subreddit, json.id, json.body, json.score, json.ups, json.controversiality))\
                                 .map(addTitleURL)
                                 #.repartition(REPARTITION_SIZE)
            users_row.foreachPartition(insert_into_cassandra)

            # calculate user relationship graph
            # (URL, user) tuple
            post2user = users_row.map(lambda x: (x[10], x[0]))
            #graph = post2user.join(post2user)\  # self join to find user relationship by posts
            #        .filter(lambda x: x[1][0] != x[1][1])\  # remove all self linked relationship
            #        .map(makeAscOrder)\  # make to asc order by user name
            #        .distinct()\  # remove duplicated user pairs, because the relationship is mutual
            #        .map(lambda x: (x[1], 1))\  # ready tho count number of common edges
            #        .reduceByKey(lambda x, y: x+y)\  # count total number for every edge/relationship
            #        .map(lambda x: (x[0][0], x[1], x[0][1]))  # flatten and ready to write table
            graph = post2user.join(post2user)\
                    .filter(lambda x: x[1][0] != x[1][1])\
                    .map(makeAscOrder)\
                    .distinct()\
                    .map(lambda x: (x[1], 1))\
                    .reduceByKey(lambda x, y: x+y)\
                    .map(lambda x: (x[0][0], x[1], x[0][1]))
                    #.repartition(REPARTITION_SIZE)
            graph.foreachPartition(insert_graph)
    sc.stop()
开发者ID:europelee,项目名称:HottestTopicOnReddit,代码行数:90,代码来源:reddit2cassandra.py
示例9: SparkContext
'''
DAY_OFFSET=1
#--set datetime
now =datetime.datetime.now()
pro_time=now-datetime.timedelta(days=DAY_OFFSET)
dest_time_str=pro_time.strftime("%Y%m%d")
'''
master = "spark://master:7077"
sep = "\t"
app_name = 'user_sign_in_app'
'''
spark_home='/opt/cloud/spark'
os.environ['SPARK_HOME']=spark_home
'''
sc = SparkContext(master, app_name)
sql_context = SQLContext(sc)
# NOTE(review): `input` is expected to be defined earlier in this file
# (it shadows the builtin); verify before reuse.
lines = sc.textFile(input)
# Keep only well-formed records: exactly 18 tab-separated fields.
parts = lines.map(lambda l: l.split(sep)).filter(lambda x: len(x) == 18)
'''
portal id(*) gw_id user_id user_name
login_time logout_time(*) mac ip user_agent
download_flow(*) upload_flow(*) os browser ratio
batch_no user_type supp_id
'''
# Reorder the raw columns into the target layout
# (id, gw_id, supp_id, user_id, user_type, user_name, login_time, ...).
user_login = parts.map(lambda p: (p[1].strip(), p[2].strip(),p[17].strip(),p[3].strip(),p[16].strip(),
                                  p[4].strip(),p[5].strip(),p[6].strip(),p[7].strip(),p[8].strip(),
                                  p[9].strip(),p[10].strip(),p[11].strip(),p[12].strip(),p[13].strip(),
                                  p[14].strip(),p[15].strip()))
schema_string = "id gw_id supp_id user_id user_type " \
"user_name login_time logout_time mac ip " \
开发者ID:wangcunxin,项目名称:spark_py,代码行数:31,代码来源:user_sign_in.py
示例10: SparkConf
if __name__ == '__main__':
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    datadir = "/Users/eyalbenivri/Developer/projects/spark-workshop/data/"
    # sudo dpkg --configure -a
    # sudo apt-get install python-setuptools
    # sudo easy_install dateutils
    # Download pyspark_csv.py from https://github.com/seahboonsiew/pyspark-csv
    sys.path.append('/Users/eyalbenivri/Developer/libs/pyspark_libs') # replace as necessary
    import pyspark_csv
    # Ship the module to the executors so the csv parser is importable there.
    sc.addFile('/Users/eyalbenivri/Developer/libs/pyspark_libs/pyspark_csv.py') # ditto
    sqlContext = SQLContext(sc)
    # Task 1: load the prop-prices.csv file as an RDD, and use the csvToDataFrame function from the pyspark_csv module
    # to create a DataFrame and register it as a temporary table so that you can run SQL queries:
    print("------- ******* Task 1 ******* -------")
    # UK Land Registry price-paid column layout.
    columns = ['id', 'price', 'date', 'zip', 'type', 'new', 'duration', 'PAON',
               'SAON', 'street', 'locality', 'town', 'district', 'county', 'ppd',
               'status']
    rdd = sc.textFile(datadir + "prop-prices.csv")
    df = pyspark_csv.csvToDataFrame(sqlContext, rdd, columns=columns)
    df.registerTempTable("properties")
    # Cache: the table is queried repeatedly in the tasks that follow.
    df.persist()
    # Task 2: let's do some basic analysis on the data.
    # Find how many records we have per year, and print them out sorted by year.
开发者ID:eyalbenivri,项目名称:sdp-spark-workshop,代码行数:31,代码来源:lab4-solution.py
示例11: SparkContext
if __name__ == '__main__':
    # --set datetime: process yesterday's partition (dat=YYYYMMDD).
    DAY_OFFSET = 1
    now = datetime.datetime.now()
    pro_time = now - datetime.timedelta(days=DAY_OFFSET)
    day = pro_time.strftime("%Y%m%d")
    master = "spark://hadoop:7077"
    appName = "spark_pageflow_outflow"
    input = "/impala/parquet/site/site-pageflowv1/dat=%s" % day
    spark_home = '/opt/cloud/spark'
    os.environ['SPARK_HOME'] = spark_home
    sc = SparkContext(master, appName)
    sql_context = SQLContext(sc)
    # UDFs: mill_date_str converts a timestamp to a day string and
    # bytearray_str decodes parquet binary columns (defined elsewhere in
    # this file).
    sql_context.registerFunction("to_day", lambda x: mill_date_str(x), StringType())
    sql_context.registerFunction("to_str", lambda x: bytearray_str(x), StringType())
    parquet_df = sql_context.read.parquet(input)
    sql_context.registerDataFrameAsTable(parquet_df, "site_pageflowv1")
    # Per-URL, per-day page views (pv) and unique visitors (uv) for
    # 'outflow' events only.
    _sql = "select to_str(url),to_day(createtime) day,count(1) pv,count(distinct to_str(guuid)) uv " \
           "from site_pageflowv1 where dat= %s and to_str(name)='outflow' " \
           "group by to_str(url),to_day(createtime)" % day
    rs_df = sql_context.sql(_sql)
    rs = rs_df.collect()
    logger.info("---->" + str(len(rs)))
    # NOTE(review): `list` shadows the builtin; rename when next touching this code.
    list = []
开发者ID:wangcunxin,项目名称:spark_py,代码行数:31,代码来源:main.py
示例12: SparkContext
# NOTE(review): `pro_time` is defined earlier in the file (not shown here).
ymd = pro_time.strftime("%Y%m%d")
master = "spark://hadoop:7077"
appName = "spark_loginflowlog"
#input = "/impala/parquet/back/back-portal-loginflowlog/dat=%s*" % ym
input = '/input/loginfowlog/*'
spark_home = '/opt/cloud/spark'
os.environ['SPARK_HOME'] = spark_home
# binaryAsString makes parquet binary columns come back as strings.
conf = (SparkConf()
        .setMaster(master)
        .setAppName(appName)
        .set("spark.sql.parquet.binaryAsString","true")
        )
sc = SparkContext(conf = conf)
sql_context = SQLContext(sc)
# normal_mac (defined elsewhere in this file) canonicalises MAC strings.
sql_context.registerFunction("to_mac", lambda x: normal_mac(x), StringType())
parquet_df = sql_context.read.parquet(input)
sql_context.registerDataFrameAsTable(parquet_df, "loginflowlog")
#_sql = "select to_mac(upper(usermac)),count(distinct dat) days from loginflowlog group by to_mac(upper(usermac))"
# Count distinct login times per normalised MAC address.
_sql = "select to_mac(upper(usermac)),count(distinct logtime) days from loginflowlog group by to_mac(upper(usermac))"
rs_df = sql_context.sql(_sql)
rs = rs_df.collect()
logger.info("---->" + str(len(rs)))
lists = []
for r in rs:
usermac = r[0]
days = r[1]
t = (usermac,days)
开发者ID:wangcunxin,项目名称:spark_py,代码行数:31,代码来源:login_days_main.py
示例13: SparkContext
logger.info(day)
'''
#
spark_home = '/opt/cloud/spark'
os.environ['SPARK_HOME'] = spark_home
#master = "spark://hadoop:7077"
master = "local[1]"
app_name = "spark_transferdata"
sep = "\t"
#input = "/data/140301_150731.csv"
input = "/input/loginlog/2015"
output = "/output/loginlog/2015"
sc = SparkContext(master, app_name)
sqlContext = SQLContext(sc)
# load
lines = sc.textFile(input)
# Keep only 11-field rows; convert the numeric columns
# (indexes 3, 5, 6 and 10) -- to_long is defined elsewhere in this file.
rdd = lines.map(lambda l: l.split(sep))\
    .filter(lambda l:len(l)==11)\
    .map(lambda l:(l[0],l[1],l[2],to_long(l[3]),l[4],
                   long(l[5]),long(l[6]),l[7],l[8],l[9],
                   to_long(l[10])))
# uid,adid,guuid,guuidctime,url,referer,hosid,gwid,ua,ip,createtime
# uid,adid,guuid,createtime
fields = [
StructField('uid', StringType(), True),
StructField('adid', StringType(), True),
StructField('guuid', StringType(), True),
StructField('guuidctime', LongType(), True),
StructField('url', StringType(), True),
开发者ID:wangcunxin,项目名称:spark_py,代码行数:31,代码来源:transfer_advload.py
示例14: get_date
# Raw per-login rows for the window, ordered so repeat logins by the same
# (day, hos_id, mac) end up adjacent.
select_userlogin_repeat = "select get_date(login_time) as day,hos_id,mac,login_time from wxcity_userlogin_info where login_time >= '%s 00:00:00' and login_time <='%s 23:59:59' order by day,hos_id,mac,login_time"
# Aggregated repeat-login buckets (2/5/10/30/60 minute windows) per day and hos_id.
select_userlogin_repeat_sta = "select day,hos_id,sum(t2),sum(t5),sum(t10),sum(t30),sum(t60) from repeat_login_list group by day,hos_id"
if __name__ == '__main__':
    if len(sys.argv) != 5:
        print("Usage: spark_streaming.py <master> <begin> <end> <input>", file=sys.stderr)
        exit(-1)
    master, time_begin, time_end, input = sys.argv[1:]
    input_path = input + '/' + time_begin + '.csv'
    logger.info("--->" + master + " " + input_path)
    sc = SparkContext(master, 'wxcity_userlogin_repeat_app')
    sql_context = SQLContext(sc)
    # Read (offset, line) pairs via the old-style Hadoop text input format.
    lines = sc.hadoopFile(input,
                          'org.apache.hadoop.mapred.TextInputFormat',
                          'org.apache.hadoop.io.LongWritable',
                          'org.apache.hadoop.io.Text'
                          )
    # Build a gateway-id -> hos-id lookup from MySQL configuration.
    rs_tuples = MysqlDao().findWithQuery(ConfigPortalSql.select_mysql_hos_gw_sup)
    gwid_hosid_dict = {}
    for r in rs_tuples:
        hos_id = str(r[0])
        gw_id = r[1]
        gwid_hosid_dict[gw_id] = hos_id
    logger.debug('-->gwid_hosid:' + str(gwid_hosid_dict.__len__()))
users = lines.map(lambda x: x[1].split(',')).filter(lambda x: len(x) == 17) \
开发者ID:wangcunxin,项目名称:spark_py,代码行数:31,代码来源:userlogin_repeat.py
示例15: SparkContext
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 21 09:31:37 2016

@author: rsk
"""
from pyspark import SparkContext
from pyspark import SQLContext
sc = SparkContext("local","recommendationEngineApp")
sqlContext = SQLContext(sc)
from pyspark.sql import SQLContext,Row
#from pyspark.sql import Functions as F

# MovieLens 100k data: users and movies are pipe-separated, ratings are
# tab-separated.
dataDir = "/home/rsk/Documents/Spark"
userData = sc.textFile(dataDir+"/ml-100k/u.user").map(lambda x : x.split("|"))
movieData = sc.textFile(dataDir+"/ml-100k/u.item").map(lambda x : x.split("|"))
ratingData = sc.textFile(dataDir+"/ml-100k/u.data").map(lambda x : x.split("\t"))
#%%
# Ratings rows -> typed Row objects -> DataFrame.
ratingDataDF = ratingData.map(lambda x : Row(userID = int(x[0]),
                                             movieID = int(x[1]),
                                             rating=float(x[2]),
                                             timestamp = int(x[3])))
ratingDataDF = sqlContext.createDataFrame(ratingDataDF)
userDataDF = userData.map(lambda x : Row(userID=int(x[0]),
age = int(x[1]),
gender = x[2],
开发者ID:rsk2327,项目名称:RecommendationEngine,代码行数:31,代码来源:PySpark_code.py
示例16: SparkContext
from pyspark import SQLContext, SparkConf, SparkContext
from pyspark.sql.types import *
#setting the configurations for the SparkConf object here
conf = (SparkConf()
        .setMaster("local[4]")
        .setAppName("convert.py")
        .set("spark.executor.memory", "1g"))
#creating the SparkConf object here
sc = SparkContext(conf = conf)
#creating the sqlContext that will be used
sqlContext = SQLContext(sc)
#reading the parquet file
#Change this line to be the directory where the parquet file exists
parquetFile = sqlContext.read.parquet('data/test2')
parquetFile.registerTempTable("parquetFile")
#Queries are made from the base + command.
#base SELECTS elements of what you are interested from WHERE
base = "SELECT * FROM parquetFile WHERE"
#command is the query you make.
command = ' ip_len >= 1500'
# Full query: "SELECT * FROM parquetFile WHERE ip_len >= 1500"
test = sqlContext.sql(base + command)
开发者ID:any027,项目名称:BinaryToParquet,代码行数:31,代码来源:parquetRead.py
示例17: SparkContext
"""
Given an RDD of dictionaries and a column to decile,
add decile of column to each row's dictionary by
pre-computing decile thresholds to avoid out of memory errors
when sorting an RDD with too many columns.
"""
from pyspark import SparkContext, SQLContext#, HiveContext
from datetime import datetime
sc = SparkContext()
sqlContext = SQLContext(sc)
# hiveContext = HiveContext(sc)
sc.setLogLevel("FATAL")
#
# setup
#
import numpy as np
# create random data: n rows of {price, date, group} -- prices roughly in
# [5, 505), dates spread over 2000-2015, ~100 distinct groups.
n = 52
prices = [float(list(5 + abs(np.random.randn(1)) * 100)[0])
          for i in range(n)]
dates = [datetime(year=np.random.randint(2000, 2016),
                  month=np.random.randint(1, 12),
                  day=np.random.randint(1, 28)).date() for i in range(n)]
groups = [np.random.randint(1, 100) for i in range(n)]
data = [{"price": price, "date": _date, "group": group}
        for price, _date, group in zip(prices, dates, groups)]
df = sqlContext.createDataFrame(data)
开发者ID:nightengalen,项目名称:spark-garbage,代码行数:31,代码来源:deciles.py
示例18: SQLContext
# Databricks notebook source
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.functions import *
# NOTE(review): 'databse_name' looks like a typo for 'database_name' --
# confirm against the actual metastore before changing the query string.
target_query="SELECT * from databse_name.byod_dbfs_table"
sparkContext = SparkContext.getOrCreate()
sqlContext = SQLContext(sparkContext)
dataframe = sqlContext.sql(target_query)
# repartition(1) forces a single CSV part file under the S3 prefix.
dataframe.repartition(1).write.format('com.databricks.spark.csv').options(delimiter=",").save("s3n://amgen-edl-acux-aaaa123-bkt/BYOD/", header="true", mode="overwrite")
# COMMAND ----------
print "test2"
# COMMAND ----------
print "test3"
开发者ID:batCoder95,项目名称:IntelligentDataMining,代码行数:17,代码来源:BYOD_POC.py
示例19: print
# Path for pyspark and py4j: make a local Spark 1.6.1 install importable
# without a pip-installed pyspark.
sys.path.append("/Users/dustinchen/Documents/APP/spark-1.6.1-bin-hadoop2.6/python")
sys.path.append("/Users/dustinchen/Documents/APP/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip")
try:
    from pyspark import SparkConf, SparkContext, SQLContext
    from pyspark.sql.functions import regexp_extract
    from pyspark.sql import Row
except ImportError as e:
    # Report and continue -- the script will fail later if Spark is truly missing.
    print ("Can not import Spark Modules", e)
if __name__ == "__main__":
conf = SparkConf().setAppName("GISAPP").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
nyc_shapefile = shapefile.Reader("/Users/dustinchen/Documents/APP/Resources/NY_counties_clip/NY_counties_clip.shp")
"""
0 ('deletionflag', 'c', 1, 0)
1 ['objectid', 'n', 9, 0]
2 ['statefp', 'c', 2, 0]
3 ['countyfp', 'c', 3, 0]
4 ['countyns', 'c', 8, 0]
5 ['geoid', 'c', 5, 0]
6 ['name', 'c', 100, 0]
['namelsad', 'c', 100, 0]
['lsad', 'c', 2, 0]
['classfp', 'c', 2, 0]
['mtfcc', 'c', 5, 0]
['csafp', 'c', 3, 0]
['cbsafp', 'c', 5, 0]
开发者ID:DustinChen0,项目名称:GISProcessOnSpark_su2,代码行数:31,代码来源:APP.py
示例20: SparkContext
'''
day = "20151212"
master = "local[*]"
spark_home = '/opt/cloud/spark'
os.environ['SPARK_HOME'] = spark_home
# logFile = 'hdfs://master:8020/impala/parquet/back/back-portal-loginflowlog/dat=' + day
logFile = "/input/loginfowlog/02*"
conf = (SparkConf()
        .setMaster(master)
        .setAppName("loginflowlog2mysql")
        # .set("spark.kryoserializer.buffer.mb", "256")
        # binaryAsString decodes parquet binary columns as strings.
        .set("spark.sql.parquet.binaryAsString", "true"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# longTime2str (defined elsewhere in this file) formats an epoch value.
sqlContext.registerFunction("to_datestr", lambda x: longTime2str(x), StringType())
df = sqlContext.read.parquet(logFile)
rdd = df.select('logintype', 'logtype', 'hosid', 'suppid', 'logtime', 'usermac')
# Explicit schema for the selected columns; logtime is a long.
fields = [
    StructField('logintype', StringType(), True),
    StructField('logtype', StringType(), True),
    StructField('hosid', StringType(), True),
    StructField('suppid', StringType(), True),
    StructField('logtime', LongType(), True),
    StructField('usermac', StringType(), True)
]
开发者ID:wangcunxin,项目名称:spark_py,代码行数:31,代码来源:loginflowlog2mysql_update.py
注:本文中的pyspark.SQLContext类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论