def _if_later(data1, data2):
"""Helper function to test if records in data1 are earlier than that in data2.
Returns:
bool: True or False indicating if data1 is earlier than data2.
"""
x = (data1.select(DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL)
.groupBy(DEFAULT_USER_COL)
.agg(F.max(DEFAULT_TIMESTAMP_COL).cast('long').alias('max'))
.collect())
max_times = {row[DEFAULT_USER_COL]: row['max'] for row in x}
y = (data2.select(DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL)
.groupBy(DEFAULT_USER_COL)
.agg(F.min(DEFAULT_TIMESTAMP_COL).cast('long').alias('min'))
.collect())
min_times = {row[DEFAULT_USER_COL]: row['min'] for row in y}
# data1 is "earlier" only if, for every user, the earliest timestamp in data2 is at
# or after that user's latest timestamp in data1.
return all(min_times[user] >= max_time for user, max_time in max_times.items())
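# A minimal usage sketch (not part of the original source), assuming a SparkSession
# named `spark` and the DEFAULT_USER_COL / DEFAULT_TIMESTAMP_COL constants used above.
# It checks that a chronological split put each user's earlier events in `train`.
train = spark.createDataFrame([(1, 100), (2, 150)], [DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL])
test = spark.createDataFrame([(1, 200), (2, 160)], [DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL])
assert _if_later(train, test)  # every test timestamp is at or after the train maximum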
'''
Needed in order to use the to_date function
'''
from pyspark.sql.functions import *
df.select("*")\
.where((to_date(df.CreationDate) ==
df.select(
min(
to_date("CreationDate"))\
.alias("min"))\
.collect()[0].min) | (
to_date(df.CreationDate) ==
df.select(
max(to_date("CreationDate"))\
.alias("max"))\
.collect()[0].max))\
.orderBy(to_date("CreationDate"))\
.show()
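'''
A single-pass variant (a sketch, not part of the original script): collect the minimum
and maximum creation dates in one aggregation instead of two separate collect() calls.
'''
bounds = df.agg(min(to_date("CreationDate")).alias("first"),
                max(to_date("CreationDate")).alias("last")).collect()[0]
df.where(to_date(df.CreationDate).isin([bounds["first"], bounds["last"]]))\
    .orderBy(to_date("CreationDate"))\
    .show()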
''' Comparing the dates down to the millisecond '''
'''
Oldest user
'''
df.sort("CreationDate", ascending=False)\
.limit(1)\
.show()
'''
Most recent user
'''
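'''
(The query for this case is missing from the extracted snippet; a likely completion,
mirroring the previous query with the opposite sort order:)
'''
df.sort("CreationDate", ascending=True)\
    .limit(1)\
    .show()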
Developer: jiep | Project: ABD | Lines: 31 | Source: queries.py
Example 15: SparkContext
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import rand, randn, mean, min, max

conf = SparkConf()  # the snippet did not define conf; a default configuration is assumed
sc = SparkContext(conf=conf)
sqlcontext = SQLContext(sc)
# 1. Create a DataFrame with one int column and 10 rows.
df = sqlcontext.range(0, 10)
df.show()
# Generate two other columns using uniform distribution and normal distribution.
df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df.show()
# 2. Summary and Descriptive Statistics
df = sqlcontext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27))
df.describe('uniform', 'normal').show()
df.select([mean('uniform'), min('uniform'), max('uniform')]).show()
# 3. Sample covariance and correlation
# Covariance is a measure of how two variables change with respect to each other.
# A positive number would mean that there is a tendency that as one variable increases,
# the other increases as well.
# A negative number would mean that as one variable increases,
# the other variable has a tendency to decrease.
df = sqlcontext.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))
print(df.stat.cov('rand1', 'rand2'))
print(df.stat.cov('id', 'id'))
# Correlation is a normalized measure of covariance that is easier to understand,
# as it provides quantitative measurements of the statistical dependence between two random variables.
print(df.stat.corr('rand1', 'rand2'))
print(df.stat.corr('id', 'id'))
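# As a sketch (not part of the original example), the same statistics are also available
# as aggregate functions (Spark 2.0+), returning a one-row DataFrame instead of a float:
from pyspark.sql.functions import corr, covar_samp
df.agg(covar_samp('rand1', 'rand2').alias('cov'), corr('rand1', 'rand2').alias('corr')).show()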
def gen_report_table(hc,curUnixDay):
# Build the indoor and flow report tables (df_indoor_fin / df_flow_fin) from the raw
# CSV files under /data, using Hive window queries to derive rolling visit counts.
rows_indoor=sc.textFile("/data/indoor/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),seconds=int(p[4]),utoday=int(p[5]),ufirstday=int(p[6])))
hc.createDataFrame(rows_indoor).registerTempTable("df_indoor")
#ClientMac|etime|ltime|seconds|utoday|ENTITYID|UFIRSTDAY
sql="select entityid,clientmac,utoday,UFIRSTDAY,seconds,"
sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range 2505600 preceding) as day_30," # 2505600 is 29 days
sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range 518400 preceding) as day_7," #518400 is 6 days
sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY range 1 preceding) as pre_mon "
sql=sql+"from df_indoor order by entityid,clientmac,utoday"
df_id_stat=hc.sql(sql)
df_id_mm=df_id_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
# df_id_mm holds the per-(entityid, clientmac) min/max of utoday, used to calculate first and last arrival
df_id_stat_distinct=df_id_stat.drop("seconds").drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
# deduplicate so the lag function sees one row per (entityid, clientmac, UFIRSTDAY)
df_id_prepremon=df_id_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
cond_id = [df_id_mm.clientmac == df_id_prepremon.clientmac, df_id_mm.entityid == df_id_prepremon.entityid, df_id_mm.UFIRSTDAY==df_id_prepremon.UFIRSTDAY]
df_indoor_fin_tmp=df_id_mm.join(df_id_prepremon, cond_id, 'outer').select(df_id_mm.entityid,df_id_mm.clientmac,df_id_mm.utoday,df_id_mm.UFIRSTDAY,df_id_mm.seconds,df_id_mm.day_30,df_id_mm.day_7,df_id_mm.min,df_id_mm.max,df_id_mm.total_cnt,df_id_prepremon.prepre_mon)
df_indoor_fin_tmp=df_indoor_fin_tmp.selectExpr("entityid as entityid","clientmac as clientmac","utoday as utoday","UFIRSTDAY as ufirstday","seconds as secondsbyday","day_30 as indoors30","day_7 as indoors7","min as FirstIndoor","max as LastIndoor","total_cnt as indoors","prepre_mon as indoorsPrevMonth")
# recompute indoors7 and indoors30 relative to the current date (curUnixDay)
df_indoor_fin_tmp1= df_indoor_fin_tmp.withColumn("r_day_7", func.when((curUnixDay- df_indoor_fin_tmp.utoday)/86400<7 , 1).otherwise(0))
df_indoor_fin_tmp2=df_indoor_fin_tmp1.withColumn("r_day_30", func.when((curUnixDay- df_indoor_fin_tmp1.utoday)/86400<30 , 1).otherwise(0))
df_indoor_fin_tmp3=df_indoor_fin_tmp2.withColumn("r_indoors7",func.sum("r_day_7").over(Window.partitionBy("entityid","clientmac")))
df_indoor_fin_tmp4=df_indoor_fin_tmp3.withColumn("r_indoors30",func.sum("r_day_30").over(Window.partitionBy("entityid","clientmac")))
df_indoor_fin=df_indoor_fin_tmp4.drop("r_day_7").drop("r_day_30")
hc.sql("drop table if exists df_indoor_fin")
df_indoor_fin.write.saveAsTable("df_indoor_fin")
rows_flow=sc.textFile("/data/flow/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),utoday=int(p[4]),ufirstday=int(p[5])))
hc.createDataFrame(rows_flow).registerTempTable("df_flow")
# ClientMac|ENTITYID|UFIRSTDAY|etime|ltime|utoday
sql="select entityid,clientmac,utoday,UFIRSTDAY,"
sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range 2505600 preceding) as day_30," # 2505600 is 29 days
sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range 518400 preceding) as day_7," #518400 is 6 days
sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY range 1 preceding) as pre_mon "
sql=sql+"from df_flow order by entityid,clientmac,utoday"
df_fl_stat=hc.sql(sql)
df_fl_mm=df_fl_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
# df_fl_mm holds the per-(entityid, clientmac) min/max of utoday, used to calculate first and last visit
df_fl_stat_distinct=df_fl_stat.drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
# deduplicate so the lag function sees one row per (entityid, clientmac, UFIRSTDAY)
df_fl_prepremon=df_fl_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
cond_fl = [df_fl_mm.clientmac == df_fl_prepremon.clientmac, df_fl_mm.entityid == df_fl_prepremon.entityid, df_fl_mm.UFIRSTDAY==df_fl_prepremon.UFIRSTDAY]
df_flow_fin=df_fl_mm.join(df_fl_prepremon, cond_fl, 'outer').select(df_fl_mm.entityid,df_fl_mm.clientmac,df_fl_mm.utoday,df_fl_mm.UFIRSTDAY,df_fl_mm.day_30,df_fl_mm.day_7,df_fl_mm.min,df_fl_mm.max,df_fl_mm.total_cnt,df_fl_prepremon.prepre_mon)
df_flow_fin=df_flow_fin.selectExpr("entityid as entityid","clientmac as clientmac","utoday as utoday","UFIRSTDAY as ufirstday","day_30 as visits30","day_7 as visits7","min as FirstVisit","max as LastVisit","total_cnt as visits","prepre_mon as visitsPrevMonth")
hc.sql("drop table if exists df_flow_fin")
df_flow_fin.write.saveAsTable("df_flow_fin")
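# A DataFrame-API sketch of the rolling counts computed by the SQL above (not part of
# the original script), assuming utoday is a unix timestamp in seconds and that hc is
# the same HiveContext passed to gen_report_table; a range frame of -2505600..0 covers
# the preceding 29 days plus the current day (30 in total), and -518400..0 the
# preceding 6 days plus the current day (7 in total).
w_30 = Window.partitionBy("entityid", "clientmac").orderBy("utoday").rangeBetween(-2505600, 0)
w_7 = Window.partitionBy("entityid", "clientmac").orderBy("utoday").rangeBetween(-518400, 0)
df_indoor_roll = hc.table("df_indoor").select(
    "entityid", "clientmac", "utoday",
    func.count(func.lit(1)).over(w_30).alias("day_30"),
    func.count(func.lit(1)).over(w_7).alias("day_7"))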
# MAGIC %md ### Question: What is the difference between the revenue of a product and the revenue of the best selling product in the same category as this product?
# COMMAND ----------
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
# Create a dataframe based on the productRevenue table
dataFrame = sqlContext.table("productRevenue")
# Window function partitioned by category and ordered by revenue (descending),
# with an unbounded frame so max() sees every row in the category
windowSpec = \
Window \
.partitionBy(dataFrame['category']) \
.orderBy(dataFrame['revenue'].desc()) \
.rangeBetween(-sys.maxsize, sys.maxsize)
# Calculate the revenue difference to the best-selling product in the same category
revenue_difference = \
(func.max(dataFrame['revenue']).over(windowSpec) - dataFrame['revenue'])
# Generate a new dataframe (original dataframe and the revenue difference)
revenue_diff = dataFrame.select(
dataFrame['product'],
dataFrame['category'],
dataFrame['revenue'],
revenue_difference.alias("revenue_difference"))
# Display revenue_diff
display(revenue_diff)
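# COMMAND ----------
# A sketch of an equivalent frame specification (not part of the original notebook):
# on Spark 2.1+ the unbounded frame can be written without sys.maxsize.
windowSpecUnbounded = \
Window \
.partitionBy(dataFrame['category']) \
.orderBy(dataFrame['revenue'].desc()) \
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)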