本文整理汇总了Python中pyspark.sql.window.Window类的典型用法代码示例。如果您正苦于以下问题:Python Window类的具体用法?Python Window怎么用?Python Window使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Window类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_window_functions_cumulative_sum
def test_window_functions_cumulative_sum(self):
df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
from pyspark.sql import functions as F
# Test cumulative sum
sel = df.select(
df.key,
F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 0)))
rs = sorted(sel.collect())
expected = [("one", 1), ("two", 3)]
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[:len(r)])
# Test boundary values less than JVM's Long.MinValue and make sure we don't overflow
sel = df.select(
df.key,
F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding - 1, 0)))
rs = sorted(sel.collect())
expected = [("one", 1), ("two", 3)]
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[:len(r)])
# Test boundary values greater than JVM's Long.MaxValue and make sure we don't overflow
frame_end = Window.unboundedFollowing + 1
sel = df.select(
df.key,
F.sum(df.value).over(Window.rowsBetween(Window.currentRow, frame_end)))
rs = sorted(sel.collect())
expected = [("one", 3), ("two", 2)]
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[:len(r)])
开发者ID:JingchengDu,项目名称:spark,代码行数:31,代码来源:test_context.py
示例2: test_window_functions
def test_window_functions(self):
df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
w = Window.partitionBy("value").orderBy("key")
from pyspark.sql import functions as F
sel = df.select(
df.value,
df.key,
F.max("key").over(w.rowsBetween(0, 1)),
F.min("key").over(w.rowsBetween(0, 1)),
F.count("key").over(w.rowsBetween(float("-inf"), float("inf"))),
F.rowNumber().over(w),
F.rank().over(w),
F.denseRank().over(w),
F.ntile(2).over(w),
)
rs = sorted(sel.collect())
expected = [
("1", 1, 1, 1, 1, 1, 1, 1, 1),
("2", 1, 1, 1, 3, 1, 1, 1, 1),
("2", 1, 2, 1, 3, 2, 1, 1, 1),
("2", 2, 2, 2, 3, 3, 3, 2, 2),
]
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[: len(r)])
开发者ID:kmarquardsen,项目名称:spark,代码行数:25,代码来源:tests.py
示例3: train
def train(self, df, featureCols):
ntiles = []
for col in featureCols:
w = Window.partitionBy().orderBy(col)
aux = df.select(F.ntile(self._n).over(w).alias('ntile'),col)
ntiles.append(list(aux.groupby('ntile').max(col).collect()))
self.ntiles_ = np.array(ntiles)
self.columns_ = map(str,featureCols)
self._is_trained = True
开发者ID:elmi-gemini,项目名称:pyspark_utils,代码行数:11,代码来源:outlier_remover.py
示例4: compute
def compute(day):
# On veut les jours day-30 à day-1
sums = wikipediadata.where(
(wikipediadata.day >= day-30) & (wikipediadata.day <= day-1))
# Sous-ensemble de test
#sums = sums.where((sums.page == 'Cadillac_Brougham') | ((sums.page == 'Roald_Dahl') & (sums.projectcode == 'fr')))
# On somme les heures de la journées
sums = sums.groupby('projectcode', 'page', 'day').sum('views')
# On cache pour plus tard
sums.cache()
# on définit une windows := jour precedent
window_spec = Window.partitionBy(sums.projectcode, sums.page) \
.orderBy(sums.day.asc()).rowsBetween(-1, -1)
# on calcule la différence entre views(d) - views(d-1)
diffs = sums.withColumn('diff', sums.views - F.sum(sums.views) \
.over(window_spec))
# on calcule les coefs à appliquer à chaque jour
coefs = pd.DataFrame({'day': range(day-30, day)})
coefs['coef'] = 1. / (day - coefs.day)
coefs = hc.createDataFrame(coefs)
diffs = diffs.join(coefs, 'day')
# on calcul le score de chaque jour
diffs = diffs.withColumn('sub_score', diffs.diff * diffs.coef)
totals = diffs.groupby('projectcode', 'page').sum('views', 'sub_score')
# on normalise par la racine de la somme des views
totals = totals.withColumn('score',
totals['SUM(sub_score)'] / F.sqrt(totals['SUM(views)'])) \
.orderBy(F.desc('score')) \
.withColumnRenamed('SUM(views)', 'total_views') \
.limit(10)
views = sums.select('projectcode', 'page', 'day', 'views') \
.join(totals.select('projectcode', 'page', 'total_views', 'score'),
(totals.projectcode == sums.projectcode) & (totals.page == sums.page), 'right_outer')
df = totals.select('projectcode', 'page', 'total_views', 'score').toPandas()
df2 = views.toPandas()
df2 = df2.iloc[:, 2:]
df2 = df2.pivot_table(values='views', columns=['day'], index=['projectcode', 'page'], fill_value=0)
df = df.merge(df2, left_on=['projectcode', 'page'], right_index=True)
df.to_csv(filename(day), index=False)
# on vide le cache
hc.clearCache()
开发者ID:cygilbert,项目名称:projetnosql,代码行数:52,代码来源:Req30j.py
示例5: runOtherFunctions
def runOtherFunctions(spark, personDf):
df = spark.createDataFrame([("v1", "v2", "v3")], ["c1", "c2", "c3"]);
# array
df.select(df.c1, df.c2, df.c3, array("c1", "c2", "c3").alias("newCol")).show(truncate=False)
# desc, asc
personDf.show()
personDf.sort(functions.desc("age"), functions.asc("name")).show()
# pyspark 2.1.0 버전은 desc_nulls_first, desc_nulls_last, asc_nulls_first, asc_nulls_last 지원하지 않음
# split, length (pyspark에서 컬럼은 df["col"] 또는 df.col 형태로 사용 가능)
df2 = spark.createDataFrame([("Splits str around pattern",)], ['value'])
df2.select(df2.value, split(df2.value, " "), length(df2.value)).show(truncate=False)
# rownum, rank
f1 = StructField("date", StringType(), True)
f2 = StructField("product", StringType(), True)
f3 = StructField("amount", IntegerType(), True)
schema = StructType([f1, f2, f3])
p1 = ("2017-12-25 12:01:00", "note", 1000)
p2 = ("2017-12-25 12:01:10", "pencil", 3500)
p3 = ("2017-12-25 12:03:20", "pencil", 23000)
p4 = ("2017-12-25 12:05:00", "note", 1500)
p5 = ("2017-12-25 12:05:07", "note", 2000)
p6 = ("2017-12-25 12:06:25", "note", 1000)
p7 = ("2017-12-25 12:08:00", "pencil", 500)
p8 = ("2017-12-25 12:09:45", "note", 30000)
dd = spark.createDataFrame([p1, p2, p3, p4, p5, p6, p7, p8], schema)
w1 = Window.partitionBy("product").orderBy("amount")
w2 = Window.orderBy("amount")
dd.select(dd.product, dd.amount, functions.row_number().over(w1).alias("rownum"),
functions.rank().over(w2).alias("rank")).show()
开发者ID:oopchoi,项目名称:spark,代码行数:36,代码来源:dataframe_sample.py
示例6: unpartitioned_window
def unpartitioned_window(self):
return Window.partitionBy()
开发者ID:git-prodigy,项目名称:spark,代码行数:2,代码来源:test_pandas_udf_window.py
示例7: ordered_window
def ordered_window(self):
return Window.partitionBy('id').orderBy('v')
开发者ID:git-prodigy,项目名称:spark,代码行数:2,代码来源:test_pandas_udf_window.py
示例8: unbounded_window
def unbounded_window(self):
return Window.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
开发者ID:git-prodigy,项目名称:spark,代码行数:3,代码来源:test_pandas_udf_window.py
示例9: cal_mat_window
def cal_mat_window(sc, sqlContext, dfSC, window):
windowSpec = Window.partitionBy("symbol").orderBy("date").rangeBetween(-1 * window+1,1)
mat = func.avg("close").over(windowSpec)
dfSC = dfSC.select(dfSC.symbol, dfSC.date, dfSC.close, mat )
print dfSC.collect()
开发者ID:hongbin0908,项目名称:bintrade,代码行数:5,代码来源:mat_close.py
示例10: sliding_range_window
def sliding_range_window(self):
return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4)
开发者ID:Brett-A,项目名称:spark,代码行数:2,代码来源:test_pandas_udf_window.py
示例11: main
def main():
"Main function"
optmgr = OptionParser()
opts = optmgr.parser.parse_args()
# setup spark/sql context to be used for communication with HDFS
sc = SparkContext(appName="phedex_br")
if not opts.yarn:
sc.setLogLevel("ERROR")
sqlContext = HiveContext(sc)
schema_def = schema()
# read given file(s) into RDD
if opts.fname:
pdf = sqlContext.read.format('com.databricks.spark.csv')\
.options(treatEmptyValuesAsNulls='true', nullValue='null')\
.load(opts.fname, schema = schema_def)
elif opts.basedir:
fromdate, todate = defDates(opts.fromdate, opts.todate)
files = getFileList(opts.basedir, fromdate, todate)
msg = "Between dates %s and %s found %d directories" % (fromdate, todate, len(files))
print msg
if not files:
return
pdf = unionAll([sqlContext.read.format('com.databricks.spark.csv')
.options(treatEmptyValuesAsNulls='true', nullValue='null')\
.load(file_path, schema = schema_def) \
for file_path in files])
else:
raise ValueError("File or directory not specified. Specify fname or basedir parameters.")
# parsing additional data (to given data adding: group name, node kind, acquisition era, data tier, now date)
groupdic, nodedic = getJoinDic()
acquisition_era_reg = r"^/[^/]*/([^/^-]*)-[^/]*/[^/]*$"
data_tier_reg = r"^/[^/]*/[^/^-]*-[^/]*/([^/]*)$"
groupf = udf(lambda x: groupdic[x], StringType())
nodef = udf(lambda x: nodedic[x], StringType())
ndf = pdf.withColumn("br_user_group", groupf(pdf.br_user_group_id)) \
.withColumn("node_kind", nodef(pdf.node_id)) \
.withColumn("now", from_unixtime(pdf.now_sec, "YYYY-MM-dd")) \
.withColumn("acquisition_era", when(regexp_extract(pdf.dataset_name, acquisition_era_reg, 1) == "",\
lit("null")).otherwise(regexp_extract(pdf.dataset_name, acquisition_era_reg, 1))) \
.withColumn("data_tier", when(regexp_extract(pdf.dataset_name, data_tier_reg, 1) == "",\
lit("null")).otherwise(regexp_extract(pdf.dataset_name, data_tier_reg, 1)))
# print dataframe schema
if opts.verbose:
ndf.show()
print("pdf data type", type(ndf))
ndf.printSchema()
# process aggregation parameters
keys = [key.lower().strip() for key in opts.keys.split(',')]
results = [result.lower().strip() for result in opts.results.split(',')]
aggregations = [agg.strip() for agg in opts.aggregations.split(',')]
order = [orde.strip() for orde in opts.order.split(',')] if opts.order else []
asc = [asce.strip() for asce in opts.asc.split(',')] if opts.order else []
filtc, filtv = opts.filt.split(":") if opts.filt else (None,None)
validateAggregationParams(keys, results, aggregations, order, filtc)
if filtc and filtv:
ndf = ndf.filter(getattr(ndf, filtc) == filtv)
# if delta aggregation is used
if DELTA in aggregations:
validateDeltaParam(opts.interval, results)
result = results[0]
#1 for all dates generate interval group dictionary
datedic = generateDateDict(fromdate, todate, opts.interval)
boundic = generateBoundDict(datedic)
max_interval = max(datedic.values())
interval_group = udf(lambda x: datedic[x], IntegerType())
interval_start = udf(lambda x: boundic[x][0], StringType())
interval_end = udf(lambda x: boundic[x][1], StringType())
#2 group data by block, node, interval and last result in the interval
ndf = ndf.select(ndf.block_name, ndf.node_name, ndf.now, getattr(ndf, result))
idf = ndf.withColumn("interval_group", interval_group(ndf.now))
win = Window.partitionBy(idf.block_name, idf.node_name, idf.interval_group).orderBy(idf.now.desc())
idf = idf.withColumn("row_number", rowNumber().over(win))
rdf = idf.where((idf.row_number == 1) & (idf.interval_group != 0))\
.withColumn(result, when(idf.now == interval_end(idf.interval_group), getattr(idf, result)).otherwise(lit(0)))
rdf = rdf.select(rdf.block_name, rdf.node_name, rdf.interval_group, getattr(rdf, result))
rdf.cache()
#3 create intervals that not exist but has minus delta
win = Window.partitionBy(idf.block_name, idf.node_name).orderBy(idf.interval_group)
adf = rdf.withColumn("interval_group_aft", lead(rdf.interval_group, 1, 0).over(win))
hdf = adf.filter(((adf.interval_group + 1) != adf.interval_group_aft) & (adf.interval_group != max_interval))\
.withColumn("interval_group", adf.interval_group + 1)\
.withColumn(result, lit(0))\
.drop(adf.interval_group_aft)
#4 join data frames
#.........这里部分代码省略.........
开发者ID:aurimasrep,项目名称:PhEDEx-replicamonitoring,代码行数:101,代码来源:pbr.py
示例12: shrinking_range_window
def shrinking_range_window(self):
return Window.partitionBy('id').orderBy('v') \
.rangeBetween(-3, Window.unboundedFollowing)
开发者ID:Brett-A,项目名称:spark,代码行数:3,代码来源:test_pandas_udf_window.py
示例13: shrinking_row_window
def shrinking_row_window(self):
return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing)
开发者ID:Brett-A,项目名称:spark,代码行数:2,代码来源:test_pandas_udf_window.py
示例14: growing_range_window
def growing_range_window(self):
return Window.partitionBy('id').orderBy('v') \
.rangeBetween(Window.unboundedPreceding, 4)
开发者ID:Brett-A,项目名称:spark,代码行数:3,代码来源:test_pandas_udf_window.py
示例15: growing_row_window
def growing_row_window(self):
return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
开发者ID:Brett-A,项目名称:spark,代码行数:2,代码来源:test_pandas_udf_window.py
示例16: with_window_column
def with_window_column(df):
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank
windowSpec = Window.partitionBy(df['id']).orderBy(df['forecast'])
return df.withColumn("r", percent_rank().over(windowSpec))
开发者ID:mattomatic,项目名称:flint,代码行数:5,代码来源:test_partition_preserve.py
示例17: sliding_row_window
def sliding_row_window(self):
return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
开发者ID:Brett-A,项目名称:spark,代码行数:2,代码来源:test_pandas_udf_window.py
示例18: collect_numeric_metric
def collect_numeric_metric(metric, df, population):
cdf = df.select(df[metric['src']])
cdf = cdf.dropna(subset=metric['src'])
cdf = cdf.select(cdf[metric['src']].cast('float').alias('bucket'))
total_count = cdf.count()
num_partitions = total_count / 500
ws = Window.orderBy('bucket')
cdf = cdf.select(
cdf['bucket'],
cume_dist().over(ws).alias('c'),
row_number().over(ws).alias('i'))
cdf = cdf.filter("i = 1 OR i %% %d = 0" % num_partitions)
cdf = cdf.collect()
# Collapse rows with duplicate buckets.
collapsed_data = []
prev = None
for d in cdf:
if not collapsed_data:
collapsed_data.append(d) # Always keep first record.
continue
if prev and prev['bucket'] == d['bucket']:
collapsed_data.pop()
collapsed_data.append(d)
prev = d
# Calculate `p` from `c`.
data = []
prev = None
for i, d in enumerate(collapsed_data):
p = d['c'] - prev['c'] if prev else d['c']
data.append({
'bucket': d['bucket'],
'c': d['c'],
'p': p,
})
prev = d
"""
Example of what `data` looks like now::
[{'bucket': 0.0, 'c': 0.00126056, 'p': 0.00126056},
{'bucket': 3.0, 'c': 0.00372313, 'p': 0.00246256},
{'bucket': 4.0, 'c': 0.00430616, 'p': 0.0005830290622683026},
{'bucket': 6.13319683, 'c': 0.00599801, 'p': 0.00169184},
{'bucket': 8.0, 'c': 0.08114486, 'p': 0.07514685},
{'bucket': 8.23087882, 'c': 0.08197282, 'p': 0.00082795},
...]
"""
# Push data to database.
sql = ("INSERT INTO api_numericcollection "
"(num_observations, population, metric_id, dataset_id) "
"VALUES (%s, %s, %s, %s) "
"RETURNING id")
params = [total_count, population, metric['id'], dataset_id]
if DEBUG_SQL:
collection_id = 0
print sql, params
else:
cursor.execute(sql, params)
conn.commit()
collection_id = cursor.fetchone()[0]
for d in data:
sql = ("INSERT INTO api_numericpoint "
"(bucket, proportion, collection_id) "
"VALUES (%s, %s, %s)")
params = [d['bucket'], d['p'], collection_id]
if DEBUG_SQL:
print sql, params
else:
cursor.execute(sql, params)
if not DEBUG_SQL:
conn.commit()
开发者ID:mozilla,项目名称:distribution-viewer,代码行数:75,代码来源:aggregate-and-import.py
注:本文中的pyspark.sql.window.Window类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论