This article collects typical usage examples of the Python function pyspark.sql.functions.min. If you have been wondering what min does, how to call it, or what real-world usage looks like, the curated code examples below may help.
The article presents 20 code examples of the min function, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Python code examples.
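Before the examples: pyspark.sql.functions.min is an aggregate function that returns the smallest value of a column, and it can also be evaluated over a window. The following is a minimal, hedged sketch (not taken from any of the projects below); the SparkSession and the toy data are assumptions for demonstration only.

# Minimal usage sketch for pyspark.sql.functions.min (assumed toy data).
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").appName("min_demo").getOrCreate()
df = spark.createDataFrame([("a", 3), ("a", 1), ("b", 5)], ["key", "v"])

# 1) As a grouped aggregate: one row per key with the smallest v.
df.groupBy("key").agg(F.min("v").alias("min_v")).show()

# 2) As a window function: every row keeps the per-key minimum alongside it.
w = Window.partitionBy("key")
df.withColumn("min_v", F.min("v").over(w)).show()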
Example 1: test_mixed_sql_and_udf
def test_mixed_sql_and_udf(self):
    df = self.data
    w = self.unbounded_window
    ow = self.ordered_window
    max_udf = self.pandas_agg_max_udf
    min_udf = self.pandas_agg_min_udf

    result1 = df.withColumn('v_diff', max_udf(df['v']).over(w) - min_udf(df['v']).over(w))
    expected1 = df.withColumn('v_diff', max(df['v']).over(w) - min(df['v']).over(w))

    # Test mixing sql window function and window udf in the same expression
    result2 = df.withColumn('v_diff', max_udf(df['v']).over(w) - min(df['v']).over(w))
    expected2 = expected1

    # Test chaining sql aggregate function and udf
    result3 = df.withColumn('max_v', max_udf(df['v']).over(w)) \
        .withColumn('min_v', min(df['v']).over(w)) \
        .withColumn('v_diff', col('max_v') - col('min_v')) \
        .drop('max_v', 'min_v')
    expected3 = expected1

    # Test mixing sql window function and udf
    result4 = df.withColumn('max_v', max_udf(df['v']).over(w)) \
        .withColumn('rank', rank().over(ow))
    expected4 = df.withColumn('max_v', max(df['v']).over(w)) \
        .withColumn('rank', rank().over(ow))

    self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
    self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
    self.assertPandasEqual(expected3.toPandas(), result3.toPandas())
    self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
Author: git-prodigy, Project: spark, Lines: 31, Source: test_pandas_udf_window.py
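The test above relies on class fixtures (self.data, self.unbounded_window, self.pandas_agg_min_udf, and so on) defined elsewhere in Spark's test suite, and the unqualified max, min, col, and rank are presumably imported from pyspark.sql.functions in the surrounding module. As a rough, hedged sketch of what a grouped-aggregate pandas UDF like the min fixture could look like (PySpark 2.3/2.4-era API, not the actual fixture code):

# Hypothetical sketch of a grouped-aggregate pandas UDF comparable to the
# pandas_agg_min_udf fixture assumed above.
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def pandas_min(v):
    # v arrives as a pandas.Series holding the values of one group/window frame
    return v.min()

# It is then used like the built-in min, e.g.:
# df.withColumn('min_v', pandas_min(df['v']).over(w))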
Example 2: gen_report_table
def gen_report_table(hc, curUnixDay):
    rows_indoor = sc.textFile("/data/indoor/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]), etime=int(p[2]), ltime=int(p[3]), seconds=int(p[4]), utoday=int(p[5]), ufirstday=int(p[6])))
    HiveContext.createDataFrame(hc, rows_indoor).registerTempTable("df_indoor")
    # ClientMac|etime|ltime|seconds|utoday|ENTITYID|UFIRSTDAY
    sql = "select entityid,clientmac,utoday,UFIRSTDAY,seconds,"
    sql = sql + "count(1) over(partition by entityid,clientmac) as total_cnt,"
    sql = sql + "count(1) over (partition by entityid,clientmac order by utoday range 2505600 preceding) as day_30,"  # 2505600 seconds = 29 days
    sql = sql + "count(1) over (partition by entityid,clientmac order by utoday range 518400 preceding) as day_7,"  # 518400 seconds = 6 days
    sql = sql + "count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY range 1 preceding) as pre_mon "
    sql = sql + "from df_indoor order by entityid,clientmac,utoday"
    df_id_stat = hc.sql(sql)
    df_id_mm = df_id_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid", "clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid", "clientmac")))
    # df_id_mm holds the per-client min/max of utoday, used to calculate first arrival and last arrival
    df_id_stat_distinct = df_id_stat.drop("seconds").drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
    # the distinct df is needed for the lag function to work
    df_id_prepremon = df_id_stat_distinct.withColumn("prepre_mon", func.lag("pre_mon").over(Window.partitionBy("entityid", "clientmac").orderBy("entityid", "clientmac", "UFIRSTDAY"))).drop("pre_mon").na.fill(0)
    cond_id = [df_id_mm.clientmac == df_id_prepremon.clientmac, df_id_mm.entityid == df_id_prepremon.entityid, df_id_mm.UFIRSTDAY == df_id_prepremon.UFIRSTDAY]
    df_indoor_fin_tmp = df_id_mm.join(df_id_prepremon, cond_id, 'outer').select(df_id_mm.entityid, df_id_mm.clientmac, df_id_mm.utoday, df_id_mm.UFIRSTDAY, df_id_mm.seconds, df_id_mm.day_30, df_id_mm.day_7, df_id_mm.min, df_id_mm.max, df_id_mm.total_cnt, df_id_prepremon.prepre_mon)
    df_indoor_fin_tmp = df_indoor_fin_tmp.selectExpr("entityid as entityid", "clientmac as clientmac", "utoday as utoday", "UFIRSTDAY as ufirstday", "seconds as secondsbyday", "day_30 as indoors30", "day_7 as indoors7", "min as FirstIndoor", "max as LastIndoor", "total_cnt as indoors", "prepre_mon as indoorsPrevMonth")

    # newly added part: indoors7 and indoors30 relative to the current date
    df_indoor_fin_tmp1 = df_indoor_fin_tmp.withColumn("r_day_7", func.when((curUnixDay - df_indoor_fin_tmp.utoday) / 86400 < 7, 1).otherwise(0))
    df_indoor_fin_tmp2 = df_indoor_fin_tmp1.withColumn("r_day_30", func.when((curUnixDay - df_indoor_fin_tmp1.utoday) / 86400 < 30, 1).otherwise(0))
    df_indoor_fin_tmp3 = df_indoor_fin_tmp2.withColumn("r_indoors7", func.sum("r_day_7").over(Window.partitionBy("entityid", "clientmac")))
    df_indoor_fin_tmp4 = df_indoor_fin_tmp3.withColumn("r_indoors30", func.sum("r_day_30").over(Window.partitionBy("entityid", "clientmac")))
    df_indoor_fin = df_indoor_fin_tmp4.drop("r_day_7").drop("r_day_30")
    hc.sql("drop table if exists df_indoor_fin")
    df_indoor_fin.write.saveAsTable("df_indoor_fin")

    rows_flow = sc.textFile("/data/flow/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]), etime=int(p[2]), ltime=int(p[3]), utoday=int(p[4]), ufirstday=int(p[5])))
    HiveContext.createDataFrame(hc, rows_flow).registerTempTable("df_flow")
    # ClientMac|ENTITYID|UFIRSTDAY|etime|ltime|utoday
    sql = "select entityid,clientmac,utoday,UFIRSTDAY,"
    sql = sql + "count(1) over(partition by entityid,clientmac) as total_cnt,"
    sql = sql + "count(1) over (partition by entityid,clientmac order by utoday range 2505600 preceding) as day_30,"  # 2505600 seconds = 29 days
    sql = sql + "count(1) over (partition by entityid,clientmac order by utoday range 518400 preceding) as day_7,"  # 518400 seconds = 6 days
    sql = sql + "count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY range 1 preceding) as pre_mon "
    sql = sql + "from df_flow order by entityid,clientmac,utoday"
    df_fl_stat = hc.sql(sql)
    df_fl_mm = df_fl_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid", "clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid", "clientmac")))
    # df_fl_mm holds the per-client min/max of utoday, used to calculate first arrival and last arrival
    df_fl_stat_distinct = df_fl_stat.drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
    # the distinct df is needed for the lag function to work
    df_fl_prepremon = df_fl_stat_distinct.withColumn("prepre_mon", func.lag("pre_mon").over(Window.partitionBy("entityid", "clientmac").orderBy("entityid", "clientmac", "UFIRSTDAY"))).drop("pre_mon").na.fill(0)
    cond_fl = [df_fl_mm.clientmac == df_fl_prepremon.clientmac, df_fl_mm.entityid == df_fl_prepremon.entityid, df_fl_mm.UFIRSTDAY == df_fl_prepremon.UFIRSTDAY]
    df_flow_fin = df_fl_mm.join(df_fl_prepremon, cond_fl, 'outer').select(df_fl_mm.entityid, df_fl_mm.clientmac, df_fl_mm.utoday, df_fl_mm.UFIRSTDAY, df_fl_mm.day_30, df_fl_mm.day_7, df_fl_mm.min, df_fl_mm.max, df_fl_mm.total_cnt, df_fl_prepremon.prepre_mon)
    df_flow_fin = df_flow_fin.selectExpr("entityid as entityid", "clientmac as clientmac", "utoday as utoday", "UFIRSTDAY as ufirstday", "day_30 as visits30", "day_7 as visits7", "min as FirstVisit", "max as LastVisit", "total_cnt as visits", "prepre_mon as visitsPrevMonth")
    hc.sql("drop table if exists df_flow_fin")
    df_flow_fin.write.saveAsTable("df_flow_fin")
Author: dalinqin, Project: src, Lines: 52, Source: main_report.py
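For reference, the rolling counts that gen_report_table builds with SQL window frames ("range 2505600 preceding" for roughly 29 days, "range 518400 preceding" for roughly 6 days) and the func.min/func.max first/last-arrival columns can also be written purely with the DataFrame window API. The sketch below is a hedged re-expression of that pattern, not part of the original script; it assumes hc is the HiveContext from the example and that utoday holds epoch seconds.

# Hedged sketch: the same rolling-count and first/last-arrival pattern via the DataFrame API.
from pyspark.sql import Window
from pyspark.sql import functions as func

df_indoor = hc.table("df_indoor")  # the temp table registered above (assumption)
by_client = Window.partitionBy("entityid", "clientmac")
last_30d = by_client.orderBy("utoday").rangeBetween(-2505600, Window.currentRow)  # ~29 days back
last_7d = by_client.orderBy("utoday").rangeBetween(-518400, Window.currentRow)    # ~6 days back

df_stat = (df_indoor
           .withColumn("day_30", func.count(func.lit(1)).over(last_30d))
           .withColumn("day_7", func.count(func.lit(1)).over(last_7d))
           .withColumn("min", func.min("utoday").over(by_client))    # first arrival
           .withColumn("max", func.max("utoday").over(by_client)))   # last arrival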
Example 3: getValueFieldValueLists
def getValueFieldValueLists(self, handlerId, keyFields, valueFields):
    df = self.entity.groupBy(keyFields)
    agg = self.options.get("aggregation", self.getDefaultAggregation(handlerId))
    maxRows = int(self.options.get("rowCount", "100"))
    numRows = min(maxRows, df.count())
    valueLists = []
    for valueField in valueFields:
        valueDf = None
        if agg == "SUM":
            valueDf = df.agg(F.sum(valueField).alias("agg"))
        elif agg == "AVG":
            valueDf = df.agg(F.avg(valueField).alias("agg"))
        elif agg == "MIN":
            valueDf = df.agg(F.min(valueField).alias("agg"))
        elif agg == "MAX":
            valueDf = df.agg(F.max(valueField).alias("agg"))
        else:
            valueDf = df.agg(F.count(valueField).alias("agg"))
        for keyField in keyFields:
            valueDf = valueDf.sort(F.col(keyField).asc())
        valueDf = valueDf.dropna()
        rows = valueDf.select("agg").take(numRows)
        valueList = []
        for row in rows:
            valueList.append(row["agg"])
        valueLists.append(valueList)
    return valueLists
Author: ygoverdhan, Project: pixiedust, Lines: 27, Source: mpld3ChartDisplay.py
Example 4: test_window_functions
def test_window_functions(self):
    df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
    w = Window.partitionBy("value").orderBy("key")
    from pyspark.sql import functions as F
    sel = df.select(
        df.value,
        df.key,
        F.max("key").over(w.rowsBetween(0, 1)),
        F.min("key").over(w.rowsBetween(0, 1)),
        F.count("key").over(w.rowsBetween(float("-inf"), float("inf"))),
        F.rowNumber().over(w),
        F.rank().over(w),
        F.denseRank().over(w),
        F.ntile(2).over(w),
    )
    rs = sorted(sel.collect())
    expected = [
        ("1", 1, 1, 1, 1, 1, 1, 1, 1),
        ("2", 1, 1, 1, 3, 1, 1, 1, 1),
        ("2", 1, 2, 1, 3, 2, 1, 1, 1),
        ("2", 2, 2, 2, 3, 3, 3, 2, 2),
    ]
    for r, ex in zip(rs, expected):
        self.assertEqual(tuple(r), ex[: len(r)])
Author: kmarquardsen, Project: spark, Lines: 25, Source: tests.py
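This test targets an older PySpark API: F.rowNumber() and F.denseRank() were later renamed to row_number() and dense_rank(). A minimal, hedged sketch of the same min-over-window call with the current names follows; the SparkSession named spark is an assumption, not part of the original test.

# Hedged sketch using the newer function names (row_number / dense_rank).
from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
w = Window.partitionBy("value").orderBy("key")
df.select(
    df.value,
    df.key,
    F.min("key").over(w.rowsBetween(0, 1)).alias("min_key"),
    F.row_number().over(w).alias("row_number"),
    F.dense_rank().over(w).alias("dense_rank"),
).show()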
Example 5: reduce_to_ohlc
def reduce_to_ohlc(time, rdd):
    row_rdd = rdd.map(lambda row: row.split(',')) \
        .filter(lambda row: len(row) == 3) \
        .map(lambda row: Row(
            symbol=row[0],
            tx_time=datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S.%f'),
            price=float(row[1])
        ))
    sql_context = get_sql_context_instance(rdd.context)
    data = sql_context.createDataFrame(row_rdd)
    data.cache()
    data.write.format('org.apache.spark.sql.cassandra') \
        .options(table='transactions2', keyspace='stock', cluster='Test Cluster') \
        .mode('append') \
        .save()

    ohlc = data.select('symbol', truncate_min(data.tx_time).alias('batch_time'), 'price', 'tx_time') \
        .orderBy('tx_time') \
        .groupBy('symbol', 'batch_time') \
        .agg(
            F.first(data.price).alias('open'),
            F.max(data.price).alias('high'),
            F.min(data.price).alias('low'),
            F.last(data.price).alias('close'),
            F.first(data.tx_time).alias('open_time'),
            F.last(data.tx_time).alias('close_time')
        )

    existing_ohlc = sql_context.read.format('org.apache.spark.sql.cassandra') \
        .options(table='ohlc_1_min2', keyspace='stock', cluster='Test Cluster') \
        .load() \
        .select('symbol', 'batch_time', 'open', 'open_time', 'high', 'low', 'close', 'close_time')

    merged_ohlc = ohlc.join(existing_ohlc,
                            (ohlc.symbol == existing_ohlc.symbol) &
                            (ohlc.batch_time == existing_ohlc.batch_time),
                            'left')

    merged_ohlc = merged_ohlc.select(
        ohlc.symbol.alias('symbol'),
        ohlc.batch_time.alias('batch_time'),
        F.when(existing_ohlc.open_time < ohlc.open_time, existing_ohlc.open).otherwise(ohlc.open).alias('open'),
        F.when(existing_ohlc.open_time < ohlc.open_time, existing_ohlc.open_time).otherwise(ohlc.open_time).alias('open_time'),
        F.when(existing_ohlc.close_time > ohlc.close_time, existing_ohlc.close).otherwise(ohlc.close).alias('close'),
        F.when(existing_ohlc.close_time > ohlc.close_time, existing_ohlc.close_time).otherwise(ohlc.close_time).alias('close_time'),
        F.when(existing_ohlc.low < ohlc.low, existing_ohlc.low).otherwise(ohlc.low).alias('low'),
        F.when(existing_ohlc.high > ohlc.high, existing_ohlc.high).otherwise(ohlc.high).alias('high')
    )
    merged_ohlc.write.format('org.apache.spark.sql.cassandra') \
        .options(table='ohlc_1_min2', keyspace='stock', cluster='Test Cluster') \
        .mode('append') \
        .save()
Author: tansinee, Project: poc01, Lines: 53, Source: stream_ohlc.py
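The snippet calls a truncate_min() helper that is defined elsewhere in the project and not shown here. As a hedged guess at its intent (truncating each transaction timestamp to the start of its minute so trades can be grouped into one-minute OHLC bars), a comparable helper could look like the following; both the name and the implementation are assumptions rather than the project's actual code, and date_trunc requires Spark 2.3+.

# Hypothetical stand-in for the truncate_min() helper used above:
# round a timestamp column down to the start of its minute.
from pyspark.sql import functions as F

def truncate_min(ts_col):
    # date_trunc('minute', ...) returns the timestamp truncated to the minute
    return F.date_trunc('minute', ts_col)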
Example 6: test_multiple_udfs
def test_multiple_udfs(self):
    df = self.data
    w = self.unbounded_window

    result1 = df.withColumn('mean_v', self.pandas_agg_mean_udf(df['v']).over(w)) \
        .withColumn('max_v', self.pandas_agg_max_udf(df['v']).over(w)) \
        .withColumn('min_w', self.pandas_agg_min_udf(df['w']).over(w))

    expected1 = df.withColumn('mean_v', mean(df['v']).over(w)) \
        .withColumn('max_v', max(df['v']).over(w)) \
        .withColumn('min_w', min(df['w']).over(w))

    self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
Author: git-prodigy, Project: spark, Lines: 13, Source: test_pandas_udf_window.py
Example 7: test_timestamp_splitter
def test_timestamp_splitter(test_specs, spark_dataset):
    dfs_rating = spark_dataset.withColumn(DEFAULT_TIMESTAMP_COL, col(DEFAULT_TIMESTAMP_COL).cast("float"))

    splits = spark_timestamp_split(
        dfs_rating, ratio=test_specs["ratio"], col_timestamp=DEFAULT_TIMESTAMP_COL
    )

    assert splits[0].count() / test_specs["number_of_rows"] == pytest.approx(
        test_specs["ratio"], test_specs["tolerance"]
    )
    assert splits[1].count() / test_specs["number_of_rows"] == pytest.approx(
        1 - test_specs["ratio"], test_specs["tolerance"]
    )

    max_split0 = splits[0].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
    min_split1 = splits[1].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
    assert max_split0 <= min_split1

    # Test multi split
    splits = spark_timestamp_split(dfs_rating, ratio=test_specs["ratios"])

    assert splits[0].count() / test_specs["number_of_rows"] == pytest.approx(
        test_specs["ratios"][0], test_specs["tolerance"]
    )
    assert splits[1].count() / test_specs["number_of_rows"] == pytest.approx(
        test_specs["ratios"][1], test_specs["tolerance"]
    )
    assert splits[2].count() / test_specs["number_of_rows"] == pytest.approx(
        test_specs["ratios"][2], test_specs["tolerance"]
    )

    max_split0 = splits[0].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
    min_split1 = splits[1].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
    assert max_split0 <= min_split1

    max_split1 = splits[1].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
    min_split2 = splits[2].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
    assert max_split1 <= min_split2
Author: David-Li-L, Project: recommenders, Lines: 38, Source: test_spark_splitter.py
Example 8: handleUIOptions
def handleUIOptions(self, displayColName):
    agg = self.options.get("aggregation")
    valFields = self.options.get("valueFields")

    if agg == 'COUNT':
        return self.entity.groupBy(displayColName).agg(F.count(displayColName).alias("agg")).toPandas()
    elif agg == 'SUM':
        return self.entity.groupBy(displayColName).agg(F.sum(valFields).alias("agg")).toPandas()
    elif agg == 'AVG':
        return self.entity.groupBy(displayColName).agg(F.avg(valFields).alias("agg")).toPandas()
    elif agg == 'MIN':
        return self.entity.groupBy(displayColName).agg(F.min(valFields).alias("agg")).toPandas()
    elif agg == 'MAX':
        return self.entity.groupBy(displayColName).agg(F.max(valFields).alias("agg")).toPandas()
    elif agg == 'MEAN':
        return self.entity.groupBy(displayColName).agg(F.mean(valFields).alias("agg")).toPandas()
    else:
        return self.entity.groupBy(displayColName).agg(F.count(displayColName).alias("agg")).toPandas()
Author: ygoverdhan, Project: pixiedust, Lines: 18, Source: pieChartDisplay.py
Example 9: do_something_only_once
def do_something_only_once():
    # the command I use to run this script:
    # ~/spark-1.6.1/bin/spark-submit --packages=com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.4.0 server.py
    global topdis, meta, dic, towo, cluto, doctopdat, maxdate, mindate, lda

    ## Loading of data
    sc = SparkContext(appName='Simple App')  # "local"
    sqlContext = SQLContext(sc)

    # Load metadata avro
    reader = sqlContext.read.format('com.databricks.spark.avro')
    meta = reader.load('data/spark_metadata.avro')

    # Load topic distributions
    topdisFile = 'data/spark_output.tuples'
    csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    topdis = csvLoader.options(delimiter=',', header='false', inferschema='true').load(topdisFile)
    strip_first_col_int = udf(lambda row: int(row[1:]), IntegerType())
    topdis = topdis.withColumn('C0', strip_first_col_int(topdis['C0']))
    strip_first_col_float = udf(lambda row: float(row[1:]), FloatType())
    topdis = topdis.withColumn('C1', strip_first_col_float(topdis['C1']))
    strip_last_col = udf(lambda row: float(row[:-2]), FloatType())
    topdis = topdis.withColumn('C20', strip_last_col(topdis['C20']))

    # Load dictionary CSV
    dicFile = 'data/spark_dic.csv'
    csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    dic = csvLoader.options(delimiter='\t', header='false', inferschema='true').load(dicFile)
    dic = dic.select(dic['C0'].alias('id'), dic['C1'].alias('word'), dic['C2'].alias('count'))
    ldaFile = 'data/spark_lda.csv'
    csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    lda = csvLoader.options(delimiter='\t', header='false', inferschema='true').load(ldaFile)
    lda = lda.select(rowNumber().alias('id'), lda.columns).join(dic, dic.id == lda.id, 'inner').cache()
    # dic = dic.select(dic['C0'].alias('id'), dic['C1'].alias('word'), dic['C2'].alias('count'))

    # # Load clustertopics CSV
    # clutoFile = 'enron_small_clustertopics.csv'
    # csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    # cluto = csvLoader.options(delimiter=',', header='false', inferschema='true').load(clutoFile)

    # # Load topicswords CSV
    # towoFile = 'enron_small_lda_transposed.csv'
    # csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    # towo = csvLoader.options(delimiter=',', header='false', inferschema='true').load(towoFile)

    # Merge topdis, which carries the document id, with metadata on that document id
    metasmall = meta.select('id', unix_timestamp(meta['date'], "yyyy-MM-dd'T'HH:mm:ssX").alias("timestamp"))
    doctopdat = topdis.join(metasmall, metasmall.id == topdis.C0, 'inner').cache()
    maxdate = doctopdat.select(max('timestamp').alias('maxtimestamp')).collect()[0]['maxtimestamp']
    mindate = doctopdat.select(min('timestamp').alias('mintimestamp')).collect()[0]['mintimestamp']
Author: nlesc-sherlock, Project: corpora-vis, Lines: 43, Source: server.py
Example 10: _if_later
def _if_later(data1, data2):
    """Helper function to test if records in data1 are earlier than those in data2.

    Returns:
        bool: True or False indicating if data1 is earlier than data2.
    """
    x = (data1.select(DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL)
         .groupBy(DEFAULT_USER_COL)
         .agg(F.max(DEFAULT_TIMESTAMP_COL).cast('long').alias('max'))
         .collect())
    max_times = {row[DEFAULT_USER_COL]: row['max'] for row in x}

    y = (data2.select(DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL)
         .groupBy(DEFAULT_USER_COL)
         .agg(F.min(DEFAULT_TIMESTAMP_COL).cast('long').alias('min'))
         .collect())
    min_times = {row[DEFAULT_USER_COL]: row['min'] for row in y}

    result = True
    for user, max_time in max_times.items():
        result = result and min_times[user] >= max_time

    return result
Author: David-Li-L, Project: recommenders, Lines: 22, Source: test_spark_splitter.py
Example 11: test_bounded_simple
def test_bounded_simple(self):
    from pyspark.sql.functions import mean, max, min, count

    df = self.data
    w1 = self.sliding_row_window
    w2 = self.shrinking_range_window

    plus_one = self.python_plus_one
    count_udf = self.pandas_agg_count_udf
    mean_udf = self.pandas_agg_mean_udf
    max_udf = self.pandas_agg_max_udf
    min_udf = self.pandas_agg_min_udf

    result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1)) \
        .withColumn('count_v', count_udf(df['v']).over(w2)) \
        .withColumn('max_v', max_udf(df['v']).over(w2)) \
        .withColumn('min_v', min_udf(df['v']).over(w1))

    expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1)) \
        .withColumn('count_v', count(df['v']).over(w2)) \
        .withColumn('max_v', max(df['v']).over(w2)) \
        .withColumn('min_v', min(df['v']).over(w1))

    self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
Author: Brett-A, Project: spark, Lines: 24, Source: test_pandas_udf_window.py
Example 12:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == "__main__":
    spark = SparkSession.builder.master("local").appName("pyspark homework").getOrCreate()

    file_path = "hdfs:///dataset/bank-data.csv"
    df = spark.read.csv(path=file_path, header=True, inferSchema=True)

    df.groupBy("sex").agg(F.min("income"), F.max("income"), F.mean("income")).show()
    df.groupBy("region").agg({"income": "mean"}).show()
Author: ShuochengWang, Project: Homework, Lines: 11, Source: main.py
Example 13: StructField
# load stores ... I'm lazy and use string ids
stores_rdd = sc.textFile(base_path + "stores.txt").map(lambda x: x.split('\t'))  # id | name
stores_fields = [
    StructField('id', StringType(), False),  # name, type, nullable
    StructField('name', StringType(), False),
]
stores = sqlCtx.createDataFrame(stores_rdd, StructType(stores_fields))

products_rdd = sc.textFile(base_path + "products.txt").map(lambda x: x.split('\t'))  # id | name | category
products_fields = [
    StructField('id', StringType(), False),  # name, type, nullable
    StructField('name', StringType(), False),
    StructField('category', StringType(), True),
]
products = sqlCtx.createDataFrame(products_rdd, StructType(products_fields))

sqlCtx.registerDataFrameAsTable(sales, "sales")
sqlCtx.registerDataFrameAsTable(stores, "stores")
sqlCtx.registerDataFrameAsTable(products, "products")

# can do SQL, including joins and GroupBy !!
sqlCtx.sql("SELECT * FROM sales sa join stores st on sa.store=st.id").show()
sqlCtx.sql("SELECT * FROM sales s join products p on s.product=p.id").show()

# .explain
# .agg
from pyspark.sql import functions as funcs
sales.groupBy('day').agg(funcs.min('store').alias('MinStore'), funcs.max('quantity').alias('MaxQty')).show()
Author: Spicysheep, Project: spark-pycon15, Lines: 29, Source: sales_datatable.py
Example 14: MAX
trunc_df = yelp_df.filter("review_count >= 10 and open = 'True'").groupBy("state").count()
trunc_df.orderBy(desc("count")).collect()

###################
# /usr/lib/hue/apps/search/examples/collections/solr_configs_log_analytics_demo/index_data.csv
logs_df = sqlCtx.load(source="com.databricks.spark.csv", header='true', inferSchema='true', path='index_data_http.csv')
sc._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '\r\n')

from pyspark.sql.functions import asc, desc
logs_df.groupBy("code").count().orderBy(desc("count")).show()
logs_df.groupBy("code").avg("bytes").show()

import pyspark.sql.functions as F
logs_df.groupBy("code").agg(logs_df.code, F.avg(logs_df.bytes), F.min(logs_df.bytes), F.max(logs_df.bytes)).show()

###########################################
yelp_df = sqlCtx.load(source='com.databricks.spark.csv', header='true', inferSchema='true', path='index_data.csv')
yelp_df.registerTempTable("yelp")
filtered_yelp = sqlCtx.sql("SELECT * FROM yelp WHERE useful >= 1")
filtered_yelp.count()
sqlCtx.sql("SELECT MAX(useful) AS max_useful FROM yelp").collect()
useful_perc_data.join(yelp_df, yelp_df.id == useful_perc_data.uid, "inner").select(useful_perc_data.uid, "useful_perc", "review_count")
useful_perc_data.registerTempTable("useful_perc_data")
sqlCtx.sql(
    """SELECT useful_perc_data.uid, useful_perc,
    review_count
    FROM useful_perc_data
Author: samidh15, Project: spark_misc, Lines: 30, Source: test_pyspark.py
Example 15: doRenderMpld3
def doRenderMpld3(self, handlerId, figure, axes, keyFields, keyFieldValues, keyFieldLabels, valueFields, valueFieldValues):
    allNumericCols = self.getNumericalFieldNames()
    if len(allNumericCols) == 0:
        self._addHTML("Unable to find a numerical column in the dataframe")
        return

    keyFields = self.options.get("keyFields")
    valueField = self.options.get("valueFields")
    if keyFields is None and valueField is None:
        keyFields = self.getFirstStringColInfo()
        valueField = self.getFirstNumericalColInfo()
    else:
        keyFields = keyFields.split(',')
        valueField = valueField.split(',')
        if len(valueField) > 1:
            self._addHTML("You can only have one value field for Bar Charts (2-D): " + str(len(valueField)))
            return
        keyFields = keyFields[0]
        valueField = valueField[0]
    # if(len(valueFields>)):

    # init
    fig = figure
    ax = axes
    # fig, ax = plt.subplots()
    # fig = plt.figure()
    params = plt.gcf()
    plSize = params.get_size_inches()
    params.set_size_inches((plSize[0] * 2, plSize[1] * 2))

    agg = self.options.get("aggregation")
    groupByCol = self.options.get("groupByCol")

    if agg == "None" or agg is None:
        colLabel = keyFields
        y = self.entity.select(valueField).toPandas()[valueField].dropna().tolist()
        x_intv = np.arange(len(y))
        labels = self.entity.select(keyFields).toPandas()[keyFields].dropna().tolist()
        plt.xticks(x_intv, labels)
        plt.xlabel(keyFields, fontsize=18)
        plt.ylabel(valueField, fontsize=18)
    elif agg == 'AVG':
        y1 = self.entity.groupBy(keyFields).agg(F.avg(valueField).alias("avg")).toPandas().sort_values(by=keyFields)
        y = y1["avg"].dropna().tolist()
        x_intv = np.arange(len(y))
        labels = y1[keyFields].dropna().tolist()
        plt.xticks(x_intv, labels)
        plt.xlabel(keyFields, fontsize=18)
        plt.ylabel("Average " + valueField, fontsize=18)
    elif agg == 'SUM':
        y1 = self.entity.groupBy(keyFields).agg(F.sum(valueField).alias("sum")).toPandas().sort_values(by=keyFields)
        y = y1["sum"].dropna().tolist()
        x_intv = np.arange(len(y))
        labels = y1[keyFields].dropna().tolist()
        plt.xticks(x_intv, labels)
        plt.xlabel(keyFields, fontsize=18)
        plt.ylabel("sum " + valueField, fontsize=18)
    elif agg == 'MAX':
        y1 = self.entity.groupBy(keyFields).agg(F.max(valueField).alias("max")).toPandas().sort_values(by=keyFields)
        y = y1["max"].dropna().tolist()
        x_intv = np.arange(len(y))
        labels = y1[keyFields].dropna().tolist()
        plt.xticks(x_intv, labels)
        plt.xlabel(keyFields, fontsize=18)
        plt.ylabel("max " + valueField, fontsize=18)
    elif agg == 'MIN':
        y1 = self.entity.groupBy(keyFields).agg(F.min(valueField).alias("min")).toPandas().sort_values(by=keyFields)
        y = y1["min"].dropna().tolist()
        x_intv = np.arange(len(y))
        labels = y1[keyFields].dropna().tolist()
        plt.xticks(x_intv, labels)
        plt.xlabel(keyFields, fontsize=18)
        plt.ylabel("min " + valueField, fontsize=18)
    elif agg == 'COUNT':
        y1 = self.entity.groupBy(keyFields).agg(F.count(valueField).alias("count")).toPandas().sort_values(by=keyFields)
        y = y1["count"].dropna().tolist()
        x_intv = np.arange(len(y))
        labels = y1[keyFields].dropna().tolist()
        plt.xticks(x_intv, labels)
        plt.xlabel(keyFields, fontsize=18)
        plt.ylabel("count " + valueField, fontsize=18)

    mpld3.enable_notebook()
    plt.bar(x_intv, y, color="blue", alpha=0.5)
    ax_fmt = BarChart(labels)
    mpld3.plugins.connect(fig, ax_fmt)
Author: ygoverdhan, Project: pixiedust, Lines: 96, Source: barChartDisplay.py
Example 16: min
'''
Query d)
The users with the oldest and the most recent
creation dates, respectively
'''
'''
Required in order to use the to_date function
'''
from pyspark.sql.functions import *

df.select("*")\
    .where((to_date(df.CreationDate) ==
            df.select(
                min(to_date("CreationDate"))
                .alias("min"))
            .collect()[0].min) | (
            to_date(df.CreationDate) ==
            df.select(
                max(to_date("CreationDate"))
                .alias("max"))
            .collect()[0].max))\
    .orderBy(to_date("CreationDate"))\
    .show()

''' Comparing dates down to the millisecond '''
'''
Oldest user
'''
Author: jiep, Project: ABD, Lines: 31, Source: queries.py
Example 17: SparkContext
sc = SparkContext(conf = conf)
sqlcontext = SQLContext(sc)
# 1. Create a DataFrame with one int column and 10 rows.
df = sqlcontext.range(0, 10)
df.show()
# Generate two other columns using uniform distribution and normal distribution.
df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df.show()
# 2. Summary and Descriptive Statistics
df = sqlcontext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27))
df.describe('uniform', 'normal').show()
df.select([mean('uniform'), min('uniform'), max('uniform')]).show()
# 3. Sample covariance and correlation
# Covariance is a measure of how two variables change with respect to each other.
# A positive number would mean that there is a tendency that as one variable increases,
# the other increases as well.
# A negative number would mean that as one variable increases,
# the other variable has a tendency to decrease.
df = sqlcontext.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))
df.stat.cov('rand1', 'rand2')
df.stat.cov('id', 'id')
# Correlation is a normalized measure of covariance that is easier to understand,
# as it provides quantitative measurements of the statistical dependence between two random variables.
df.stat.corr('rand1', 'rand2')
df.stat.corr('id', 'id')
Author: xialei, Project: sparkme, Lines: 31, Source: sparkDataFrame.py
Example 18: last
# COMMAND ----------
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() # 3364
# COMMAND ----------
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()
# COMMAND ----------
from pyspark.sql.functions import min, max
df.select(min("Quantity"), max("Quantity")).show()
# COMMAND ----------
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show() # 5176450
# COMMAND ----------
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show() # 29310
# COMMAND ----------
Author: yehonatc, Project: Spark-The-Definitive-Guide, Lines: 31, Source: Structured_APIs-Chapter_7_Aggregations.py
Example 19: rand
# A slightly different way to generate the two random columns
df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27))
#df.describe().show()
display(df.describe())
# COMMAND ----------
#df.describe('uniform', 'normal').show()
display(df.describe('uniform', 'normal'))
# COMMAND ----------
from pyspark.sql.functions import mean, min, max
#df.select([mean('uniform'), min('uniform'), max('uniform')]).show()
display(df.select([mean('uniform'), min('uniform'), max('uniform')]))
# COMMAND ----------
# MAGIC %md ### Sample covariance and correlation
# MAGIC
# MAGIC Covariance is a measure of how two variables change with respect to each other. A positive number would mean that there is a tendency that as one variable increases, the other increases as well. A negative number would mean that as one variable increases, the other variable has a tendency to decrease. The sample covariance of two columns of a DataFrame can be calculated as follows:
# COMMAND ----------
from pyspark.sql.functions import rand
df = sqlContext.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))
# COMMAND ----------
Author: dennyglee, Project: databricks, Lines: 30, Source: Statistical+and+Mathematical+Functions+with+DataFrames+in+Spark.py
Example 20: int
#set time variables for date filtering
time = datetime.datetime.now()
epochtime = int(time.strftime("%s"))
start_time = epochtime - 86400
compare_time = datetime.datetime.fromtimestamp(start_time)
#create a dataframe from the raw metrics
rawmetrics = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="raw_metrics", keyspace="metrics").load()
#filter metrics to those in last 24 hours
last_day = rawmetrics.where(rawmetrics.metric_time > compare_time)
#aggregates
averages = last_day.groupby('device_id').agg(func.avg('metric_value').alias('metric_avg'))
maximums = last_day.groupby('device_id').agg(func.max('metric_value').alias('metric_max'))
minimums = last_day.groupby('device_id').agg(func.min('metric_value').alias('metric_min'))
#rename id columns for uniqueness
averages_a = averages.withColumnRenamed("device_id", "id")
maximums_a = maximums.withColumnRenamed("device_id", "maxid")
minimums_a = minimums.withColumnRenamed("device_id", "minid")
#join the tables above
temp = averages_a.join(maximums_a, averages_a.id == maximums_a.maxid)
aggs = temp.join(minimums, temp.id == minimums.device_id).select('id','metric_min','metric_max','metric_avg')
#add columns to format for cassandra
addday = aggs.withColumn("metric_day", lit(time))
addname = addday.withColumn("metric_name",lit("KWH"))
inserts = addname.withColumnRenamed("id","device_id")
Author: pbayliss, Project: energy_iot, Lines: 30, Source: batch.py
Note: The pyspark.sql.functions.min examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects contributed by many developers; copyright of the source code belongs to the original authors. For redistribution and use, please refer to the license of the corresponding project; do not reproduce without permission.