
Python window.Window Class Code Examples


This article collects typical usage examples of the Python class pyspark.sql.window.Window. If you have been wondering what the Window class does, how to use it, or what real code that uses it looks like, the curated class examples below should help.



The following presents 18 code examples of the Window class, sorted by popularity by default. You can upvote any example you find useful; those ratings help the system surface better Python code examples.
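
Before the examples, here is a minimal, self-contained sketch of how the Window API fits together (the local SparkSession and the toy sales DataFrame below are illustrative assumptions, not taken from any of the cited projects): a window specification is built with partitionBy/orderBy, optionally given a frame via rowsBetween or rangeBetween, and is then passed to an aggregate or ranking function through .over().

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[1]").appName("window_demo").getOrCreate()

# Toy data: one row per sale (product, amount) -- purely illustrative.
df = spark.createDataFrame(
    [("note", 1000), ("note", 1500), ("pencil", 3500), ("pencil", 500)],
    ["product", "amount"])

# Partition by product, order by amount, frame = start of partition .. current row.
w = (Window.partitionBy("product")
           .orderBy("amount")
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df.select(
    "product", "amount",
    F.row_number().over(w).alias("rn"),            # position within the partition
    F.sum("amount").over(w).alias("running_sum"),  # running total per product
).show()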

Example 1: test_window_functions_cumulative_sum

    def test_window_functions_cumulative_sum(self):
        df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
        from pyspark.sql import functions as F

        # Test cumulative sum
        sel = df.select(
            df.key,
            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 0)))
        rs = sorted(sel.collect())
        expected = [("one", 1), ("two", 3)]
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[:len(r)])

        # Test boundary values less than JVM's Long.MinValue and make sure we don't overflow
        sel = df.select(
            df.key,
            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding - 1, 0)))
        rs = sorted(sel.collect())
        expected = [("one", 1), ("two", 3)]
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[:len(r)])

        # Test boundary values greater than JVM's Long.MaxValue and make sure we don't overflow
        frame_end = Window.unboundedFollowing + 1
        sel = df.select(
            df.key,
            F.sum(df.value).over(Window.rowsBetween(Window.currentRow, frame_end)))
        rs = sorted(sel.collect())
        expected = [("one", 3), ("two", 2)]
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[:len(r)])
Developer: JingchengDu | Project: spark | Lines: 31 | Source: test_context.py


Example 2: test_window_functions

    def test_window_functions(self):
        df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
        w = Window.partitionBy("value").orderBy("key")
        from pyspark.sql import functions as F

        sel = df.select(
            df.value,
            df.key,
            F.max("key").over(w.rowsBetween(0, 1)),
            F.min("key").over(w.rowsBetween(0, 1)),
            F.count("key").over(w.rowsBetween(float("-inf"), float("inf"))),
            F.rowNumber().over(w),
            F.rank().over(w),
            F.denseRank().over(w),
            F.ntile(2).over(w),
        )
        rs = sorted(sel.collect())
        expected = [
            ("1", 1, 1, 1, 1, 1, 1, 1, 1),
            ("2", 1, 1, 1, 3, 1, 1, 1, 1),
            ("2", 1, 2, 1, 3, 2, 1, 1, 1),
            ("2", 2, 2, 2, 3, 3, 3, 2, 2),
        ]
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[: len(r)])
Developer: kmarquardsen | Project: spark | Lines: 25 | Source: tests.py
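
A note on Example 2: it targets an older PySpark API in which the ranking helpers were exposed as F.rowNumber() and F.denseRank() (and the test uses a SQLContext). Since Spark 1.6 these are named row_number() and dense_rank(). A minimal sketch of the same window functions under the current names, rebuilding the example's DataFrame (the local SparkSession here is an assumption):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])

w = Window.partitionBy("value").orderBy("key")

df.select(
    df.value,
    df.key,
    F.row_number().over(w).alias("row_number"),  # formerly F.rowNumber()
    F.rank().over(w).alias("rank"),
    F.dense_rank().over(w).alias("dense_rank"),  # formerly F.denseRank()
    F.ntile(2).over(w).alias("ntile"),
).show()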


Example 3: train

 def train(self, df, featureCols):

     # For each feature column, compute the upper bound (max value) of each
     # n-tile bucket using a global window ordered by that column.
     ntiles = []
     for col in featureCols:
         w = Window.partitionBy().orderBy(col)
         aux = df.select(F.ntile(self._n).over(w).alias('ntile'), col)
         ntiles.append(list(aux.groupby('ntile').max(col).collect()))

     self.ntiles_ = np.array(ntiles)
     self.columns_ = map(str, featureCols)
     self._is_trained = True
Developer: elmi-gemini | Project: pyspark_utils | Lines: 11 | Source: outlier_remover.py


Example 4: compute

def compute(day):
    # We want days day-30 through day-1
    sums = wikipediadata.where(
            (wikipediadata.day >= day-30) & (wikipediadata.day <= day-1))

    # Test subset
    #sums = sums.where((sums.page == 'Cadillac_Brougham') | ((sums.page == 'Roald_Dahl') & (sums.projectcode == 'fr')))

    # Sum the hourly counts over each day
    sums = sums.groupby('projectcode', 'page', 'day').sum('views')
    # Cache for later use
    sums.cache()

    # Define a window := the previous day
    window_spec =  Window.partitionBy(sums.projectcode, sums.page) \
            .orderBy(sums.day.asc()).rowsBetween(-1, -1)

    # Compute the difference views(d) - views(d-1)
    diffs = sums.withColumn('diff', sums.views - F.sum(sums.views) \
            .over(window_spec))

    # Compute the coefficient to apply to each day
    coefs = pd.DataFrame({'day': range(day-30, day)})
    coefs['coef'] = 1. / (day - coefs.day)

    coefs = hc.createDataFrame(coefs)
    diffs = diffs.join(coefs, 'day')

    # Compute each day's score
    diffs = diffs.withColumn('sub_score', diffs.diff * diffs.coef)

    totals = diffs.groupby('projectcode', 'page').sum('views', 'sub_score')
    # Normalize by the square root of the total views
    totals = totals.withColumn('score',
            totals['SUM(sub_score)'] / F.sqrt(totals['SUM(views)'])) \
            .orderBy(F.desc('score')) \
            .withColumnRenamed('SUM(views)', 'total_views') \
            .limit(10)

    views = sums.select('projectcode', 'page', 'day', 'views') \
           .join(totals.select('projectcode', 'page', 'total_views', 'score'), 
                  (totals.projectcode == sums.projectcode) & (totals.page == sums.page), 'right_outer')

    df = totals.select('projectcode', 'page', 'total_views', 'score').toPandas()
    df2 = views.toPandas()
    df2 = df2.iloc[:, 2:]
    df2 = df2.pivot_table(values='views', columns=['day'], index=['projectcode', 'page'], fill_value=0)
    df = df.merge(df2, left_on=['projectcode', 'page'], right_index=True)
    df.to_csv(filename(day), index=False)
    
    # Clear the cache
    hc.clearCache()
Developer: cygilbert | Project: projetnosql | Lines: 52 | Source: Req30j.py


Example 5: runOtherFunctions

def runOtherFunctions(spark, personDf):
    df = spark.createDataFrame([("v1", "v2", "v3")], ["c1", "c2", "c3"]);

    # array
    df.select(df.c1, df.c2, df.c3, array("c1", "c2", "c3").alias("newCol")).show(truncate=False)

    # desc, asc
    personDf.show()
    personDf.sort(functions.desc("age"), functions.asc("name")).show()

    # pyspark 2.1.0 does not support desc_nulls_first, desc_nulls_last, asc_nulls_first, asc_nulls_last

    # split, length (in pyspark, a column can be referenced as df["col"] or df.col)
    df2 = spark.createDataFrame([("Splits str around pattern",)], ['value'])
    df2.select(df2.value, split(df2.value, " "), length(df2.value)).show(truncate=False)

    # rownum, rank
    f1 = StructField("date", StringType(), True)
    f2 = StructField("product", StringType(), True)
    f3 = StructField("amount", IntegerType(), True)
    schema = StructType([f1, f2, f3])

    p1 = ("2017-12-25 12:01:00", "note", 1000)
    p2 = ("2017-12-25 12:01:10", "pencil", 3500)
    p3 = ("2017-12-25 12:03:20", "pencil", 23000)
    p4 = ("2017-12-25 12:05:00", "note", 1500)
    p5 = ("2017-12-25 12:05:07", "note", 2000)
    p6 = ("2017-12-25 12:06:25", "note", 1000)
    p7 = ("2017-12-25 12:08:00", "pencil", 500)
    p8 = ("2017-12-25 12:09:45", "note", 30000)

    dd = spark.createDataFrame([p1, p2, p3, p4, p5, p6, p7, p8], schema)
    w1 = Window.partitionBy("product").orderBy("amount")
    w2 = Window.orderBy("amount")
    dd.select(dd.product, dd.amount, functions.row_number().over(w1).alias("rownum"),
              functions.rank().over(w2).alias("rank")).show()
Developer: oopchoi | Project: spark | Lines: 36 | Source: dataframe_sample.py


Example 6: unpartitioned_window

 def unpartitioned_window(self):
     return Window.partitionBy()
Developer: git-prodigy | Project: spark | Lines: 2 | Source: test_pandas_udf_window.py


Example 7: ordered_window

 def ordered_window(self):
     return Window.partitionBy('id').orderBy('v')
Developer: git-prodigy | Project: spark | Lines: 2 | Source: test_pandas_udf_window.py


Example 8: unbounded_window

 def unbounded_window(self):
     return Window.partitionBy('id') \
         .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
Developer: git-prodigy | Project: spark | Lines: 3 | Source: test_pandas_udf_window.py
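
Examples 6 through 8 (and several fixtures below) come from Spark's pandas UDF window tests and only construct WindowSpec objects. As a rough sketch of how such a spec is consumed, a grouped-aggregate pandas UDF can be applied over it with .over(); this assumes PySpark 2.4+ with pyarrow installed, and the id/v DataFrame below is an illustrative stand-in for the fixtures' data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ["id", "v"])

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    # v arrives as a pandas Series containing the values inside the window frame.
    return v.mean()

w = Window.partitionBy("id") \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("mean_v", mean_udf(df["v"]).over(w)).show()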


Example 9: cal_mat_window

def cal_mat_window(sc, sqlContext, dfSC, window):
    # Per-symbol moving average of `close` over a date range frame.
    windowSpec = Window.partitionBy("symbol").orderBy("date").rangeBetween(-1 * window + 1, 1)
    mat = func.avg("close").over(windowSpec)
    dfSC = dfSC.select(dfSC.symbol, dfSC.date, dfSC.close, mat)
    print dfSC.collect()
Developer: hongbin0908 | Project: bintrade | Lines: 5 | Source: mat_close.py


Example 10: sliding_range_window

 def sliding_range_window(self):
     return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4)
Developer: Brett-A | Project: spark | Lines: 2 | Source: test_pandas_udf_window.py


Example 11: main

def main():
    "Main function"
    optmgr  = OptionParser()
    opts = optmgr.parser.parse_args()

    # setup spark/sql context to be used for communication with HDFS
    sc = SparkContext(appName="phedex_br")
    if not opts.yarn:
        sc.setLogLevel("ERROR")
    sqlContext = HiveContext(sc)

    schema_def = schema()

    # read given file(s) into RDD
    if opts.fname:
        pdf = sqlContext.read.format('com.databricks.spark.csv')\
                        .options(treatEmptyValuesAsNulls='true', nullValue='null')\
                        .load(opts.fname, schema = schema_def)
    elif opts.basedir:
        fromdate, todate = defDates(opts.fromdate, opts.todate)
        files = getFileList(opts.basedir, fromdate, todate)
        msg = "Between dates %s and %s found %d directories" % (fromdate, todate, len(files))
        print msg

        if not files:
            return
        pdf = unionAll([sqlContext.read.format('com.databricks.spark.csv')
                        .options(treatEmptyValuesAsNulls='true', nullValue='null')\
                        .load(file_path, schema = schema_def) \
                        for file_path in files])
    else:
        raise ValueError("File or directory not specified. Specify fname or basedir parameters.")

    # parsing additional data (to given data adding: group name, node kind, acquisition era, data tier, now date)
    groupdic, nodedic = getJoinDic()
    acquisition_era_reg = r"^/[^/]*/([^/^-]*)-[^/]*/[^/]*$"	
    data_tier_reg = r"^/[^/]*/[^/^-]*-[^/]*/([^/]*)$"
    groupf = udf(lambda x: groupdic[x], StringType())
    nodef = udf(lambda x: nodedic[x], StringType())

    ndf = pdf.withColumn("br_user_group", groupf(pdf.br_user_group_id)) \
         .withColumn("node_kind", nodef(pdf.node_id)) \
         .withColumn("now", from_unixtime(pdf.now_sec, "YYYY-MM-dd")) \
         .withColumn("acquisition_era", when(regexp_extract(pdf.dataset_name, acquisition_era_reg, 1) == "",\
                    lit("null")).otherwise(regexp_extract(pdf.dataset_name, acquisition_era_reg, 1))) \
        .withColumn("data_tier", when(regexp_extract(pdf.dataset_name, data_tier_reg, 1) == "",\
                    lit("null")).otherwise(regexp_extract(pdf.dataset_name, data_tier_reg, 1)))

    # print dataframe schema
    if opts.verbose:
        ndf.show()
        print("pdf data type", type(ndf))
        ndf.printSchema()

    # process aggregation parameters
    keys = [key.lower().strip() for key in opts.keys.split(',')]
    results = [result.lower().strip() for result in opts.results.split(',')]
    aggregations = [agg.strip() for agg in opts.aggregations.split(',')]
    order = [orde.strip() for orde in opts.order.split(',')] if opts.order else []
    asc = [asce.strip() for asce in opts.asc.split(',')] if opts.order else []
    filtc, filtv = opts.filt.split(":") if opts.filt else (None,None)

    validateAggregationParams(keys, results, aggregations, order, filtc)

    if filtc and filtv:
        ndf = ndf.filter(getattr(ndf, filtc) == filtv)

    # if delta aggregation is used
    if DELTA in aggregations:
        validateDeltaParam(opts.interval, results)			
        result = results[0]

        #1 for all dates generate interval group dictionary
        datedic = generateDateDict(fromdate, todate, opts.interval)
        boundic = generateBoundDict(datedic)
        max_interval = max(datedic.values())

        interval_group = udf(lambda x: datedic[x], IntegerType())
        interval_start = udf(lambda x: boundic[x][0], StringType())		
        interval_end = udf(lambda x: boundic[x][1], StringType())

        #2 group data by block, node, interval and last result in the interval
        ndf = ndf.select(ndf.block_name, ndf.node_name, ndf.now, getattr(ndf, result))
        idf = ndf.withColumn("interval_group", interval_group(ndf.now))
        win = Window.partitionBy(idf.block_name, idf.node_name, idf.interval_group).orderBy(idf.now.desc())	
        idf = idf.withColumn("row_number", rowNumber().over(win))
        rdf = idf.where((idf.row_number == 1) & (idf.interval_group != 0))\
                 .withColumn(result, when(idf.now == interval_end(idf.interval_group), getattr(idf, result)).otherwise(lit(0)))
        rdf = rdf.select(rdf.block_name, rdf.node_name, rdf.interval_group, getattr(rdf, result))
        rdf.cache()

        #3 create intervals that not exist but has minus delta
        win = Window.partitionBy(idf.block_name, idf.node_name).orderBy(idf.interval_group)
        adf = rdf.withColumn("interval_group_aft", lead(rdf.interval_group, 1, 0).over(win))
        hdf = adf.filter(((adf.interval_group + 1) != adf.interval_group_aft) & (adf.interval_group != max_interval))\
                 .withColumn("interval_group", adf.interval_group + 1)\
                 .withColumn(result, lit(0))\
                 .drop(adf.interval_group_aft)

        #4 join data frames
#......... the rest of this function is omitted here .........
Developer: aurimasrep | Project: PhEDEx-replicamonitoring | Lines: 101 | Source: pbr.py


Example 12: shrinking_range_window

 def shrinking_range_window(self):
     return Window.partitionBy('id').orderBy('v') \
         .rangeBetween(-3, Window.unboundedFollowing)
Developer: Brett-A | Project: spark | Lines: 3 | Source: test_pandas_udf_window.py


Example 13: shrinking_row_window

 def shrinking_row_window(self):
     return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing)
Developer: Brett-A | Project: spark | Lines: 2 | Source: test_pandas_udf_window.py


Example 14: growing_range_window

 def growing_range_window(self):
     return Window.partitionBy('id').orderBy('v') \
         .rangeBetween(Window.unboundedPreceding, 4)
Developer: Brett-A | Project: spark | Lines: 3 | Source: test_pandas_udf_window.py


Example 15: growing_row_window

 def growing_row_window(self):
     return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
Developer: Brett-A | Project: spark | Lines: 2 | Source: test_pandas_udf_window.py


Example 16: with_window_column

 def with_window_column(df):
     from pyspark.sql.window import Window
     from pyspark.sql.functions import percent_rank
     windowSpec = Window.partitionBy(df['id']).orderBy(df['forecast'])
     return df.withColumn("r", percent_rank().over(windowSpec))
Developer: mattomatic | Project: flint | Lines: 5 | Source: test_partition_preserve.py


Example 17: sliding_row_window

 def sliding_row_window(self):
     return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
Developer: Brett-A | Project: spark | Lines: 2 | Source: test_pandas_udf_window.py


Example 18: collect_numeric_metric

def collect_numeric_metric(metric, df, population):
    cdf = df.select(df[metric['src']])
    cdf = cdf.dropna(subset=metric['src'])
    cdf = cdf.select(cdf[metric['src']].cast('float').alias('bucket'))

    total_count = cdf.count()
    num_partitions = total_count / 500
    ws = Window.orderBy('bucket')
    cdf = cdf.select(
        cdf['bucket'],
        cume_dist().over(ws).alias('c'),
        row_number().over(ws).alias('i'))
    cdf = cdf.filter("i = 1 OR i %% %d = 0" % num_partitions)
    cdf = cdf.collect()

    # Collapse rows with duplicate buckets.
    collapsed_data = []
    prev = None
    for d in cdf:
        if not collapsed_data:
            collapsed_data.append(d)  # Always keep first record.
            continue
        if prev and prev['bucket'] == d['bucket']:
            collapsed_data.pop()
        collapsed_data.append(d)
        prev = d

    # Calculate `p` from `c`.
    data = []
    prev = None
    for i, d in enumerate(collapsed_data):
        p = d['c'] - prev['c'] if prev else d['c']
        data.append({
            'bucket': d['bucket'],
            'c': d['c'],
            'p': p,
        })
        prev = d
    """
    Example of what `data` looks like now::

        [{'bucket': 0.0,        'c': 0.00126056, 'p': 0.00126056},
         {'bucket': 3.0,        'c': 0.00372313, 'p': 0.00246256},
         {'bucket': 4.0,        'c': 0.00430616, 'p': 0.0005830290622683026},
         {'bucket': 6.13319683, 'c': 0.00599801, 'p': 0.00169184},
         {'bucket': 8.0,        'c': 0.08114486, 'p': 0.07514685},
         {'bucket': 8.23087882, 'c': 0.08197282, 'p': 0.00082795},
         ...]
    """
    # Push data to database.
    sql = ("INSERT INTO api_numericcollection "
           "(num_observations, population, metric_id, dataset_id) "
           "VALUES (%s, %s, %s, %s) "
           "RETURNING id")
    params = [total_count, population, metric['id'], dataset_id]
    if DEBUG_SQL:
        collection_id = 0
        print sql, params
    else:
        cursor.execute(sql, params)
        conn.commit()
        collection_id = cursor.fetchone()[0]

    for d in data:
        sql = ("INSERT INTO api_numericpoint "
               "(bucket, proportion, collection_id) "
               "VALUES (%s, %s, %s)")
        params = [d['bucket'], d['p'], collection_id]
        if DEBUG_SQL:
            print sql, params
        else:
            cursor.execute(sql, params)

    if not DEBUG_SQL:
        conn.commit()
Developer: mozilla | Project: distribution-viewer | Lines: 75 | Source: aggregate-and-import.py


Note: The pyspark.sql.window.Window class examples in this article were compiled by 纯净天空 from source code and documentation hosted on platforms such as GitHub and MSDocs. The snippets are selected from open-source projects contributed by many developers; copyright remains with the original authors, and redistribution or use should follow each project's license. Do not reproduce without permission.

