Python functions.max code examples

This post collects and organizes typical usage examples of the pyspark.sql.functions.max function in Python. If you have been wondering exactly what max does, how to call it, or what real code that uses it looks like, the hand-picked examples below should help.

The following lists 20 code examples of the max function, sorted by popularity by default.
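Before the examples, here is a minimal, self-contained sketch of the two ways F.max shows up throughout this page: as a grouped aggregate and as a window function. The DataFrame, column names, and app name below are made up purely for illustration.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("max_demo").getOrCreate()

# Hypothetical data: (category, amount)
df = spark.createDataFrame(
    [("a", 1.0), ("a", 3.0), ("b", 2.0)],
    ["category", "amount"])

# 1. Grouped aggregate: the largest amount per category
df.groupBy("category").agg(F.max("amount").alias("max_amount")).show()

# 2. Window function: attach each category's maximum to every row
w = Window.partitionBy("category")
df.withColumn("max_amount", F.max("amount").over(w)).show()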

Example 1: test_mixed_sql_and_udf

    def test_mixed_sql_and_udf(self):
        df = self.data
        w = self.unbounded_window
        ow = self.ordered_window
        max_udf = self.pandas_agg_max_udf
        min_udf = self.pandas_agg_min_udf

        result1 = df.withColumn('v_diff', max_udf(df['v']).over(w) - min_udf(df['v']).over(w))
        expected1 = df.withColumn('v_diff', max(df['v']).over(w) - min(df['v']).over(w))

        # Test mixing sql window function and window udf in the same expression
        result2 = df.withColumn('v_diff', max_udf(df['v']).over(w) - min(df['v']).over(w))
        expected2 = expected1

        # Test chaining sql aggregate function and udf
        result3 = df.withColumn('max_v', max_udf(df['v']).over(w)) \
                    .withColumn('min_v', min(df['v']).over(w)) \
                    .withColumn('v_diff', col('max_v') - col('min_v')) \
                    .drop('max_v', 'min_v')
        expected3 = expected1

        # Test mixing sql window function and udf
        result4 = df.withColumn('max_v', max_udf(df['v']).over(w)) \
                    .withColumn('rank', rank().over(ow))
        expected4 = df.withColumn('max_v', max(df['v']).over(w)) \
                      .withColumn('rank', rank().over(ow))

        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
        self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
        self.assertPandasEqual(expected3.toPandas(), result3.toPandas())
        self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
Author: git-prodigy, Project: spark, Lines: 31, Source: test_pandas_udf_window.py


Example 2: test_window_functions

    def test_window_functions(self):
        df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
        w = Window.partitionBy("value").orderBy("key")
        from pyspark.sql import functions as F

        sel = df.select(
            df.value,
            df.key,
            F.max("key").over(w.rowsBetween(0, 1)),
            F.min("key").over(w.rowsBetween(0, 1)),
            F.count("key").over(w.rowsBetween(float("-inf"), float("inf"))),
            F.rowNumber().over(w),
            F.rank().over(w),
            F.denseRank().over(w),
            F.ntile(2).over(w),
        )
        rs = sorted(sel.collect())
        expected = [
            ("1", 1, 1, 1, 1, 1, 1, 1, 1),
            ("2", 1, 1, 1, 3, 1, 1, 1, 1),
            ("2", 1, 2, 1, 3, 2, 1, 1, 1),
            ("2", 2, 2, 2, 3, 3, 3, 2, 2),
        ]
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[: len(r)])
Author: kmarquardsen, Project: spark, Lines: 25, Source: tests.py


Example 3: getValueFieldValueLists

 def getValueFieldValueLists(self, handlerId, keyFields, valueFields):
     df = self.entity.groupBy(keyFields)
     agg = self.options.get("aggregation",self.getDefaultAggregation(handlerId))
     maxRows = int(self.options.get("rowCount","100"))
     numRows = min(maxRows,df.count())
     valueLists = []
     for valueField in valueFields:
         valueDf = None
         if agg == "SUM":
             valueDf = df.agg(F.sum(valueField).alias("agg"))
         elif agg == "AVG":
             valueDf = df.agg(F.avg(valueField).alias("agg"))
         elif agg == "MIN":
             valueDf = df.agg(F.min(valueField).alias("agg"))
         elif agg == "MAX":
             valueDf = df.agg(F.max(valueField).alias("agg"))
         else:
             valueDf = df.agg(F.count(valueField).alias("agg"))
         for keyField in keyFields:
             valueDf = valueDf.sort(F.col(keyField).asc())
         valueDf = valueDf.dropna()
         rows = valueDf.select("agg").take(numRows)
         valueList = []
         for row in rows:
             valueList.append(row["agg"])
         valueLists.append(valueList)
     return valueLists   
Author: ygoverdhan, Project: pixiedust, Lines: 27, Source: mpld3ChartDisplay.py


Example 4: process_file

def process_file(date_update):
    """Process downloaded MEDLINE folder to parquet file"""
    print("Process MEDLINE file to parquet")
    # remove if folder still exist
    if glob(os.path.join(save_dir, 'medline_*.parquet')):
        subprocess.call(['rm', '-rf', 'medline_*.parquet'])

    date_update_str = date_update.strftime("%Y_%m_%d")
    path_rdd = sc.parallelize(glob(os.path.join(download_dir, 'medline*.xml.gz')), numSlices=1000)
    parse_results_rdd = path_rdd.\
        flatMap(lambda x: [Row(file_name=os.path.basename(x), **publication_dict)
                           for publication_dict in pp.parse_medline_xml(x)])
    medline_df = parse_results_rdd.toDF()
    medline_df.write.parquet(os.path.join(save_dir, 'medline_raw_%s.parquet' % date_update_str),
                             mode='overwrite')

    window = Window.partitionBy(['pmid']).orderBy(desc('file_name'))
    windowed_df = medline_df.select(
        max('delete').over(window).alias('is_deleted'),
        rank().over(window).alias('pos'),
        '*')
    windowed_df.\
        where('is_deleted = False and pos = 1').\
        write.parquet(os.path.join(save_dir, 'medline_lastview_%s.parquet' % date_update_str),
                      mode='overwrite')

    # parse grant database
    parse_grant_rdd = path_rdd.flatMap(lambda x: pp.parse_medline_grant_id(x))\
        .filter(lambda x: x is not None)\
        .map(lambda x: Row(**x))
    grant_df = parse_grant_rdd.toDF()
    grant_df.write.parquet(os.path.join(save_dir, 'medline_grant_%s.parquet' % date_update_str),
                           mode='overwrite')
Author: H-Plus-Time, Project: pubmed_parser, Lines: 33, Source: medline_spark.py


Example 5: reduce_to_ohlc

def reduce_to_ohlc(time, rdd):
    row_rdd = rdd.map(lambda row: row.split(',')) \
                 .filter(lambda row: len(row) == 3) \
                 .map(lambda row: Row(
                       symbol=row[0],
                       tx_time=datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S.%f'),
                       price=float(row[1])
                 ))
    sql_context = get_sql_context_instance(rdd.context)
    data = sql_context.createDataFrame(row_rdd)
    data.cache()
    data.write.format('org.apache.spark.sql.cassandra') \
            .options(table='transactions2', keyspace='stock', cluster='Test Cluster') \
            .mode('append') \
            .save()

    ohlc = data.select('symbol', truncate_min(data.tx_time).alias('batch_time'), 'price', 'tx_time') \
                .orderBy('tx_time') \
                .groupBy('symbol', 'batch_time') \
                .agg(
                   F.first(data.price).alias('open'),
                   F.max(data.price).alias('high'),
                   F.min(data.price).alias('low'),
                   F.last(data.price).alias('close'),
                   F.first(data.tx_time).alias('open_time'),
                   F.last(data.tx_time).alias('close_time')
                )

    existing_ohlc = sql_context.read.format('org.apache.spark.sql.cassandra') \
            .options(table='ohlc_1_min2', keyspace='stock', cluster='Test Cluster') \
            .load() \
            .select('symbol', 'batch_time', 'open', 'open_time', 'high', 'low', 'close', 'close_time')

    merged_ohlc = ohlc.join(existing_ohlc,
                             (ohlc.symbol == existing_ohlc.symbol) &
                             (ohlc.batch_time == existing_ohlc.batch_time),
                             'left'
                           )

    merged_ohlc = merged_ohlc.select(
        ohlc.symbol.alias('symbol'),
        ohlc.batch_time.alias('batch_time'),
        F.when(existing_ohlc.open_time < ohlc.open_time, existing_ohlc.open).otherwise(ohlc.open).alias('open'),
        F.when(existing_ohlc.open_time < ohlc.open_time, existing_ohlc.open_time).otherwise(ohlc.open_time).alias('open_time'),
        F.when(existing_ohlc.close_time > ohlc.close_time, existing_ohlc.close).otherwise(ohlc.close).alias('close'),
        F.when(existing_ohlc.close_time > ohlc.close_time, existing_ohlc.close_time).otherwise(ohlc.close_time).alias('close_time'),
        F.when(existing_ohlc.low < ohlc.low, existing_ohlc.low).otherwise(ohlc.low).alias('low'),
        F.when(existing_ohlc.high > ohlc.high, existing_ohlc.high).otherwise(ohlc.high).alias('high')
    )
    merged_ohlc.write.format('org.apache.spark.sql.cassandra') \
                .options(table='ohlc_1_min2', keyspace='stock', cluster='Test Cluster') \
                .mode('append') \
                .save()
Author: tansinee, Project: poc01, Lines: 53, Source: stream_ohlc.py


Example 6: test_multiple_udfs

    def test_multiple_udfs(self):
        df = self.data
        w = self.unbounded_window

        result1 = df.withColumn('mean_v', self.pandas_agg_mean_udf(df['v']).over(w)) \
                    .withColumn('max_v', self.pandas_agg_max_udf(df['v']).over(w)) \
                    .withColumn('min_w', self.pandas_agg_min_udf(df['w']).over(w))

        expected1 = df.withColumn('mean_v', mean(df['v']).over(w)) \
                      .withColumn('max_v', max(df['v']).over(w)) \
                      .withColumn('min_w', min(df['w']).over(w))

        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
Author: git-prodigy, Project: spark, Lines: 13, Source: test_pandas_udf_window.py


Example 7: test_timestamp_splitter

def test_timestamp_splitter(test_specs, spark_dataset):
    dfs_rating = spark_dataset.withColumn(DEFAULT_TIMESTAMP_COL, col(DEFAULT_TIMESTAMP_COL).cast("float"))

    splits = spark_timestamp_split(
        dfs_rating, ratio=test_specs["ratio"], col_timestamp=DEFAULT_TIMESTAMP_COL
    )

    assert splits[0].count() / test_specs["number_of_rows"] == pytest.approx(
        test_specs["ratio"], test_specs["tolerance"]
    )
    assert splits[1].count() / test_specs["number_of_rows"] == pytest.approx(
        1 - test_specs["ratio"], test_specs["tolerance"]
    )

    max_split0 = splits[0].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
    min_split1 = splits[1].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
    assert(max_split0 <= min_split1)

    # Test multi split
    splits = spark_timestamp_split(dfs_rating, ratio=test_specs["ratios"])

    assert splits[0].count() / test_specs["number_of_rows"] == pytest.approx(
        test_specs["ratios"][0], test_specs["tolerance"]
    )
    assert splits[1].count() / test_specs["number_of_rows"] == pytest.approx(
        test_specs["ratios"][1], test_specs["tolerance"]
    )
    assert splits[2].count() / test_specs["number_of_rows"] == pytest.approx(
        test_specs["ratios"][2], test_specs["tolerance"]
    )

    max_split0 = splits[0].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
    min_split1 = splits[1].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
    assert(max_split0 <= min_split1)

    max_split1 = splits[1].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
    min_split2 = splits[2].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
    assert(max_split1 <= min_split2)
Author: David-Li-L, Project: recommenders, Lines: 38, Source: test_spark_splitter.py


Example 8: handleUIOptions

    def handleUIOptions(self, displayColName):
        agg = self.options.get("aggregation")
        valFields = self.options.get("valueFields")

        if agg == 'COUNT':
            return self.entity.groupBy(displayColName).agg(F.count(displayColName).alias("agg")).toPandas()
        elif agg == 'SUM':
            return self.entity.groupBy(displayColName).agg(F.sum(valFields).alias("agg")).toPandas()
        elif agg == 'AVG':
            return self.entity.groupBy(displayColName).agg(F.avg(valFields).alias("agg")).toPandas()
        elif agg == 'MIN':
            return self.entity.groupBy(displayColName).agg(F.min(valFields).alias("agg")).toPandas()
        elif agg == 'MAX':
            return self.entity.groupBy(displayColName).agg(F.max(valFields).alias("agg")).toPandas()
        elif agg == 'MEAN':
            return self.entity.groupBy(displayColName).agg(F.mean(valFields).alias("agg")).toPandas()
        else:
            return self.entity.groupBy(displayColName).agg(F.count(displayColName).alias("agg")).toPandas()
Author: ygoverdhan, Project: pixiedust, Lines: 18, Source: pieChartDisplay.py


Example 9: do_something_only_once

def do_something_only_once():
    # the command I use to run this script:
    #~/spark-1.6.1/bin/spark-submit --packages=com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.4.0 server.py
    global topdis, meta, dic, towo, cluto, doctopdat, maxdate, mindate, lda
    ## Loading of data
    sc = SparkContext(appName='Simple App') #"local"
    sqlContext = SQLContext(sc)
    # Load metadata avro
    reader = sqlContext.read.format('com.databricks.spark.avro')
    meta = reader.load('data/spark_metadata.avro')
    # # Loading topic distributions
    topdisFile = 'data/spark_output.tuples'
    csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    topdis = csvLoader.options(delimiter=',',header='false', inferschema='true').load(topdisFile)
    strip_first_col_int = udf(lambda row: int(row[1:]), IntegerType())
    topdis = topdis.withColumn('C0',strip_first_col_int(topdis['C0']))
    strip_first_col_float = udf(lambda row: float(row[1:]), FloatType())
    topdis = topdis.withColumn('C1',strip_first_col_float(topdis['C1']))
    strip_last_col = udf(lambda row: float(row[:-2]), FloatType())
    topdis = topdis.withColumn('C20',strip_last_col(topdis['C20']))
    # # Load dictionary CSV
    dicFile = 'data/spark_dic.csv'
    csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    dic = csvLoader.options(delimiter='\t', header='false', inferschema='true').load(dicFile)
    dic = dic.select(dic['C0'].alias('id'), dic['C1'].alias('word'), dic['C2'].alias('count'))
    ldaFile = 'data/spark_lda.csv'
    csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    lda = csvLoader.options(delimiter='\t', header='false', inferschema='true').load(ldaFile)
    lda = lda.select(rowNumber().alias('id'), lda.columns).join(dic, dic.id == lda.id, 'inner').cache()
    # dic = dic.select(dic['C0'].alias('id'), dic['C1'].alias('word'), dic['C2'].alias('count'))
    # # # Load clustertopics CSV
    # clutoFile = 'enron_small_clustertopics.csv'
    # csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    # cluto = csvLoader.options(delimiter=',', header='false', inferschema='true').load(clutoFile)
    # # # Load topicswords CSV
    # towoFile = 'enron_small_lda_transposed.csv'
    # csvLoader = sqlContext.read.format('com.databricks.spark.csv')
    # towo = csvLoader.options(delimiter=',', header='false', inferschema='true').load(towoFile)
    # # Merge topdis which has document id and with metadata, based on document id
    metasmall = meta.select('id',unix_timestamp(meta['date'],"yyyy-MM-dd'T'HH:mm:ssX").alias("timestamp"))
    doctopdat = topdis.join(metasmall, metasmall.id == topdis.C0,'inner').cache()
    maxdate = doctopdat.select(max('timestamp').alias('maxtimestamp')).collect()[0]['maxtimestamp']
    mindate = doctopdat.select(min('timestamp').alias('mintimestamp')).collect()[0]['mintimestamp']
Author: nlesc-sherlock, Project: corpora-vis, Lines: 43, Source: server.py


Example 10: test_bounded_mixed

    def test_bounded_mixed(self):
        from pyspark.sql.functions import mean, max

        df = self.data
        w1 = self.sliding_row_window
        w2 = self.unbounded_window

        mean_udf = self.pandas_agg_mean_udf
        max_udf = self.pandas_agg_max_udf

        result1 = df.withColumn('mean_v', mean_udf(df['v']).over(w1)) \
            .withColumn('max_v', max_udf(df['v']).over(w2)) \
            .withColumn('mean_unbounded_v', mean_udf(df['v']).over(w1))

        expected1 = df.withColumn('mean_v', mean(df['v']).over(w1)) \
            .withColumn('max_v', max(df['v']).over(w2)) \
            .withColumn('mean_unbounded_v', mean(df['v']).over(w1))

        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
Author: Brett-A, Project: spark, Lines: 19, Source: test_pandas_udf_window.py


Example 11: _if_later

def _if_later(data1, data2):
    """Helper function to test if records in data1 are earlier than that in data2.
    Returns:
        bool: True or False indicating if data1 is earlier than data2.
    """
    x = (data1.select(DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL)
         .groupBy(DEFAULT_USER_COL)
         .agg(F.max(DEFAULT_TIMESTAMP_COL).cast('long').alias('max'))
         .collect())
    max_times = {row[DEFAULT_USER_COL]: row['max'] for row in x}

    y = (data2.select(DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL)
         .groupBy(DEFAULT_USER_COL)
         .agg(F.min(DEFAULT_TIMESTAMP_COL).cast('long').alias('min'))
         .collect())
    min_times = {row[DEFAULT_USER_COL]: row['min'] for row in y}

    result = True
    for user, max_time in max_times.items():
        result = result and min_times[user] >= max_time

    return result
Author: David-Li-L, Project: recommenders, Lines: 22, Source: test_spark_splitter.py


Example 12: test_bounded_simple

    def test_bounded_simple(self):
        from pyspark.sql.functions import mean, max, min, count

        df = self.data
        w1 = self.sliding_row_window
        w2 = self.shrinking_range_window

        plus_one = self.python_plus_one
        count_udf = self.pandas_agg_count_udf
        mean_udf = self.pandas_agg_mean_udf
        max_udf = self.pandas_agg_max_udf
        min_udf = self.pandas_agg_min_udf

        result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1)) \
            .withColumn('count_v', count_udf(df['v']).over(w2)) \
            .withColumn('max_v',  max_udf(df['v']).over(w2)) \
            .withColumn('min_v', min_udf(df['v']).over(w1))

        expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1)) \
            .withColumn('count_v', count(df['v']).over(w2)) \
            .withColumn('max_v', max(df['v']).over(w2)) \
            .withColumn('min_v', min(df['v']).over(w1))

        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
Author: Brett-A, Project: spark, Lines: 24, Source: test_pandas_udf_window.py


Example 13: main

def main():
    # set up the logger
    logging.basicConfig(filename=os.path.join(config.mrqos_logging, 'ra_summary.log'),
                        level=logging.INFO,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        datefmt='%m/%d/%Y %H:%M:%S')
    logger = logging.getLogger(__name__)
    # table nsjoin (day, uuid)
    # table mapmon (day, uuid)
    datenow = str(datetime.date.today()-datetime.timedelta(1))
    day_idx = datenow[0:4]+datenow[5:7]+datenow[8:10]
    uuid_list = [x.split('=')[-1] for x in beeline.show_partitions('mrqos.mapmon_sum').split('\n') if day_idx in x]
    sc = SparkContext()
    hiveCtx = HiveContext(sc)
    post_partition_n = 1000

    for uuid_idx in uuid_list:
        # ns_ip, demand, asnum ns_asnum, ns_country, ns_continent, ns_lat, ns_lon, ns_mpgid, mpgload
        nsjoin_query = """ select ns_ip, demand, asnum ns_asnum, country_code ns_country, continent ns_continent, round(latitude,3) ns_lat, round(longitude,3) ns_lon, mpgid ns_mpgid, mpgload from mapper.nsjoin where day={} and mpd_uuid='{}' and longitude is not NULL and latitude is not NULL and demand > 1""".format(day_idx,
                                                                                                                                                                                                                                                                                                                            uuid_idx)

        # mpgid, mrid, mpg_type, region, link, min_s, max_s, min_r, max_r, ping, local, cont_fb, mpd_dftime, ecor, continent, country, latitude, longitude, prp
        mapmon_query = """ select mpgid, mrid, mpg_type, region, link, min_s, max_s, min_r, max_r, ping, local, cont_fb, mpd_dftime, ecor, continent, country, latitude, longitude, prp from mrqos.mapmon_sum where day={} and mpd_uuid='{}' and longitude is not NULL and latitude is not NULL""".format(day_idx,
                                                                                                                                                                                                                                                                                                          uuid_idx)
        logger.info('Processing data in day=%s, uuid=%s' % (day_idx, uuid_idx))

        nsjoin = hiveCtx.sql(nsjoin_query)
        nsjoin_rows = nsjoin.repartition(post_partition_n).cache()
        data = hiveCtx.sql(mapmon_query)
        data_rows = data.repartition(post_partition_n).cache()

        col = ['mpgid', 'mrid', 'mpg_type', 'region', 'link', 'min_s', 'max_s', 'min_r', 'max_r',
               'ping', 'local', 'cont_fb', 'mpd_dftime', 'ecor', 'continent', 'country', 'latitude', 'longitude', 'prp',
               'ns_ip', 'demand', 'ns_asnum', 'ns_country', 'ns_continent', 'ns_lat', 'ns_lon', 'mpgload']

        cols_appended = ['nsip', 'mrid', 'ns_demand', 'ns_asnum', 'ns_country', 'ns_continent', 'ns_lat', 'ns_lon',
                         'mpgid', 'mpg_type', 'mpg_load', 'regions', 'region_links', 'dftime_ratio', 'ecors',
                         'list_min_s', 'list_max_s', 'list_min_r', 'list_max_r',
                         'region_lats', 'region_lons', 'min_s', 'max_s', 'min_r', 'max_r', 'ping_ratio', 'local_ratio',
                         'cont_fb_ratio', 'in_cont_ratio', 'in_country_ratio', 'private_ratio', 'avg_distance',
                         'num_region_mapped', 'mapping_entropy', 'sum_dftime']

        df = nsjoin_rows.join(data_rows, data_rows.mpgid == nsjoin_rows.ns_mpgid, 'inner')[col].cache()
        row1 = data_rows.agg(F.max(data_rows.mpd_dftime)).collect()[0]
        max_dftime = row1[0]

        df2 = df.map(lambda x: x + Row(geodesic_distance_weighted(x.ns_lat,
                                                                  x.ns_lon,
                                                                  x.latitude,
                                                                  x.longitude,
                                                                  x.mpd_dftime)))\
                .map(lambda x: ((   x[19], # nsip
                                    x[20], # demand
                                    x[21], # ns_asnum
                                    x[22], # ns_country
                                    x[23], # ns_continent
                                    round(x[24], 3), # ns_lat & ns_lon
                                    round(x[25], 3),
                                    x[0], # mpgid
                                    x[1], # mrid
                                    x[2], # mpg type
                                    x[26], # mpg load
                                    ),
                               [   [int(x[3])], # region
                                   [str(int(x[3])) + "_" + str(int(x[4]))], # region_link
                                   x[5]/max_dftime, # min_s
                                   x[6]/max_dftime, # max_s
                                   x[7]/max_dftime, # min_r
                                   x[8]/max_dftime, # max_r
                                   x[9]/max_dftime, # ping ratio
                                   x[10]/max_dftime, # local ratio
                                   x[11]/max_dftime, # cont_fb ratio
                                   [round(x[12]/max_dftime, 3)], # mpd_dftime/max_dftime (time ratio)
                                   [int(x[13])], # ecor
                                   x[12]/max_dftime * [0, 1][x[14] == x[23]], # mapping in-continent ratio
                                   x[12]/max_dftime * [0, 1][x[15] == x[22]], # mapping in-country ratio
                                   [round(x[16], 3)], # lat
                                   [round(x[17], 3)], # lon
                                   x[18]/max_dftime, # prp
                                   x[27]/max_dftime, # w_distance
                                   x[12],
                                   [round(x[5]/x[12], 2)], # min_s list
                                   [round(x[6]/x[12], 2)], # max_s list
                                   [round(x[7]/x[12], 2)], # min_r list
                                   [round(x[8]/x[12], 2)], # max_r list
                               ]))\
                .reduceByKey(lambda a, b: [x+y for x, y in zip(a, b)])\
                .map(lambda x: [x[0][0], # nsip
                                x[0][8], # mrid
                                x[0][1], # demand
                                x[0][2], # ns_asnum
                                x[0][3], # ns_country
                                x[0][4], # ns_continent
                                x[0][5], # ns_lat
                                x[0][6], # ns_lon
                                x[0][7], # mpgid
                                x[0][9], # mpg type
                                x[0][10], # mpg load
                                x[1][0], # list of region
                                x[1][1], # list of region_link
#......... rest of the code omitted .........
Author: YuTengChang, Project: akam_mrqos, Lines: 101, Source: raSummary_spark.py


Example 14: min

'''
Needed in order to use the to_date function
'''
from pyspark.sql.functions import *

df.select("*")\
  .where((to_date(df.CreationDate) ==
    df.select(
      min(
        to_date("CreationDate"))\
        .alias("min"))\
        .collect()[0].min) | (
        to_date(df.CreationDate) ==
          df.select(
            max(to_date("CreationDate"))\
            .alias("max"))\
            .collect()[0].max))\
  .orderBy(to_date("CreationDate"))\
  .show()

''' Comparing dates down to the millisecond '''

'''
Oldest user
'''
df.sort("CreationDate", ascending=False)\
  .limit(1)\
  .show()
'''
Most recent user
'''
Author: jiep, Project: ABD, Lines: 31, Source: queries.py


Example 15: SparkContext

sc = SparkContext(conf = conf)
sqlcontext = SQLContext(sc)

# 1. Create a DataFrame with one int column and 10 rows.
df = sqlcontext.range(0, 10)
df.show()

# Generate two other columns using uniform distribution and normal distribution.
df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df.show()

# 2. Summary and Descriptive Statistics
df = sqlcontext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27))
df.describe('uniform', 'normal').show()

df.select([mean('uniform'), min('uniform'), max('uniform')]).show()

# 3. Sample covariance and correlation
# Covariance is a measure of how two variables change with respect to each other. 
# A positive number would mean that there is a tendency that as one variable increases, 
# the other increases as well. 
# A negative number would mean that as one variable increases, 
# the other variable has a tendency to decrease.
df = sqlcontext.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))
df.stat.cov('rand1', 'rand2')
df.stat.cov('id', 'id')

# Correlation is a normalized measure of covariance that is easier to understand, 
# as it provides quantitative measurements of the statistical dependence between two random variables.
df.stat.corr('rand1', 'rand2')
df.stat.corr('id', 'id')
Author: xialei, Project: sparkme, Lines: 31, Source: sparkDataFrame.py


Example 16: last

# COMMAND ----------

from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() # 3364


# COMMAND ----------

from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()


# COMMAND ----------

from pyspark.sql.functions import min, max
df.select(min("Quantity"), max("Quantity")).show()


# COMMAND ----------

from pyspark.sql.functions import sum
df.select(sum("Quantity")).show() # 5176450


# COMMAND ----------

from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show() # 29310


# COMMAND ----------
Author: yehonatc, Project: Spark-The-Definitive-Guide, Lines: 31, Source: Structured_APIs-Chapter_7_Aggregations.py


Example 17: print(

    # konsum_user_agg=konsum_user.groupBy('a_user_key').agg(sqlfuncs.max('reg_date').alias('reg_date'),\
    #     sqlfuncs.avg('age').alias('age'), sqlfuncs.max('gender').alias('gender'),sqlfuncs.max('date').alias('last_consume'),\
    #     sqlfuncs.min('date').alias('first_consume')  )
    # konsum_user_agg.registerTempTable('user_agg')
    #
    #
    # print(konsum_user_agg.first())
    # konsum_user_agg.write.save('/home/erlenda/data/konsum/a_users_parquet')
    #
    #
    #
    # #reg_late=konsum_user.filter(konsum_user.reg_date<datetime.datetime(2015,11,16,0,0))
    #
    pvs=konsum_user.groupBy('a_virtual','a_user_key','timegroup','device').agg(sqlfuncs.sum(konsum_user.pv).alias("pvs"),\
                                                  sqlfuncs.sum(konsum_user.pv_bet).alias("pvs_bet"),\
                                                  sqlfuncs.max('date').alias('last_consume'),\
                                                  sqlfuncs.min('date').alias('first_consume'),\
                                                  sqlfuncs.sum(konsum_user.visits).alias("visits"))

    pprint(pvs.take(10))
    print()
    #print(pvs.take(100)[55])
    pvs_tot1=pvs.agg(sqlfuncs.sum(pvs.pvs)).first()
    print('Total after basic aggregation',pvs_tot1)

    pvs_mapped=pvs.rdd.map(lambda x:((x.a_user_key,x.a_virtual), (Counter({literal_eval(x.timegroup):x.pvs}),\
        Counter({literal_eval(x.timegroup):1}),\
        x.pvs,\
        x.pvs_bet,\
        Counter({x.device:x.pvs}) ) )  )
Author: Froskekongen, Project: content-consumption, Lines: 30, Source: consume_profiles_spark.py


Example 18: gen_report_table

 def gen_report_table(hc,curUnixDay):
     rows_indoor=sc.textFile("/data/indoor/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),seconds=int(p[4]),utoday=int(p[5]),ufirstday=int(p[6])))
     HiveContext.createDataFrame(hc,rows_indoor).registerTempTable("df_indoor")
     #ClientMac|etime|ltime|seconds|utoday|ENTITYID|UFIRSTDAY 
     sql="select entityid,clientmac,utoday,UFIRSTDAY,seconds,"
     sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  2505600 preceding) as day_30," # 2505600 is 29 days
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  518400 preceding)  as day_7," #518400 is 6 days
     sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY  range 1 preceding) as pre_mon "
     sql=sql+"from df_indoor order by entityid,clientmac,utoday" 
     df_id_stat=hc.sql(sql)
     df_id_mm=df_id_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
     #df_id_mm df_min_max ,to caculate firtarrival and last arrival 
     df_id_stat_distinct=df_id_stat.drop("seconds").drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
     #distinct df is for lag function to work
     df_id_prepremon=df_id_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
     
     cond_id = [df_id_mm.clientmac == df_id_prepremon.clientmac, df_id_mm.entityid == df_id_prepremon.entityid, df_id_mm.UFIRSTDAY==df_id_prepremon.UFIRSTDAY]
     df_indoor_fin_tmp=df_id_mm.join(df_id_prepremon, cond_id, 'outer').select(df_id_mm.entityid,df_id_mm.clientmac,df_id_mm.utoday,df_id_mm.UFIRSTDAY,df_id_mm.seconds,df_id_mm.day_30,df_id_mm.day_7,df_id_mm.min,df_id_mm.max,df_id_mm.total_cnt,df_id_prepremon.prepre_mon)
     df_indoor_fin_tmp=df_indoor_fin_tmp.selectExpr("entityid as entityid","clientmac as  clientmac","utoday as utoday","UFIRSTDAY as ufirstday","seconds as secondsbyday","day_30 as indoors30","day_7 as indoors7","min as FirstIndoor","max as LastIndoor","total_cnt as indoors","prepre_mon as indoorsPrevMonth")
     
     #newly added part for indoors7 and indoors30 based on current date
     df_indoor_fin_tmp1= df_indoor_fin_tmp.withColumn("r_day_7", func.when((curUnixDay- df_indoor_fin_tmp.utoday)/86400<7 , 1).otherwise(0))
     df_indoor_fin_tmp2=df_indoor_fin_tmp1.withColumn("r_day_30", func.when((curUnixDay- df_indoor_fin_tmp1.utoday)/86400<30 , 1).otherwise(0))
     df_indoor_fin_tmp3=df_indoor_fin_tmp2.withColumn("r_indoors7",func.sum("r_day_7").over(Window.partitionBy("entityid","clientmac")))
     df_indoor_fin_tmp4=df_indoor_fin_tmp3.withColumn("r_indoors30",func.sum("r_day_30").over(Window.partitionBy("entityid","clientmac")))
     df_indoor_fin=df_indoor_fin_tmp4.drop("r_day_7").drop("r_day_30")
     hc.sql("drop table if exists df_indoor_fin")
     df_indoor_fin.write.saveAsTable("df_indoor_fin")
     
     rows_flow=sc.textFile("/data/flow/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),utoday=int(p[4]),ufirstday=int(p[5])))
     HiveContext.createDataFrame(hc,rows_flow).registerTempTable("df_flow")
     
     # ClientMac|ENTITYID|UFIRSTDAY|etime|ltime|utoday
     sql="select entityid,clientmac,utoday,UFIRSTDAY,"
     sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  2505600 preceding) as day_30," # 2505600 is 29 days
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  518400 preceding)  as day_7," #518400 is 6 days
     sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY  range 1 preceding) as pre_mon "
     sql=sql+"from df_flow order by entityid,clientmac,utoday" 
     df_fl_stat=hc.sql(sql)
     df_fl_mm=df_fl_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
     #df_fl_mm df_min_max ,to caculate firtarrival and last arrival 
     df_fl_stat_distinct=df_fl_stat.drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
     #distinct df is for lag function to work
     df_fl_prepremon=df_fl_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
     
     cond_fl = [df_fl_mm.clientmac == df_fl_prepremon.clientmac, df_fl_mm.entityid == df_fl_prepremon.entityid, df_fl_mm.UFIRSTDAY==df_fl_prepremon.UFIRSTDAY]
     df_flow_fin=df_fl_mm.join(df_fl_prepremon, cond_fl, 'outer').select(df_fl_mm.entityid,df_fl_mm.clientmac,df_fl_mm.utoday,df_fl_mm.UFIRSTDAY,df_fl_mm.day_30,df_fl_mm.day_7,df_fl_mm.min,df_fl_mm.max,df_fl_mm.total_cnt,df_fl_prepremon.prepre_mon)
     df_flow_fin=df_flow_fin.selectExpr("entityid as entityid","clientmac as  clientmac","utoday as utoday","UFIRSTDAY as ufirstday","day_30 as visits30","day_7 as visits7","min as FirstVisit","max as LastVisit","total_cnt as visits","prepre_mon as visitsPrevMonth")
     hc.sql("drop table if exists df_flow_fin")
     df_flow_fin.write.saveAsTable("df_flow_fin") 
Author: dalinqin, Project: src, Lines: 52, Source: main_report.py


Example 19: dataframe

# MAGIC %md ### Question: What is the difference between the revenue of a product and the revenue of the best selling product in the same category as this product?

# COMMAND ----------

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Window function partioned by Category and ordered by Revenue
windowSpec = \
  Window \
    .partitionBy(df['category']) \
    .orderBy(df['revenue'].desc()) \
    .rangeBetween(-sys.maxsize, sys.maxsize)
    
# Create dataframe based on the productRevenue table    
dataFrame = sqlContext.table("productRevenue")

# Calculate the Revenue difference
revenue_difference = \
  (func.max(dataFrame['revenue']).over(windowSpec) - dataFrame['revenue'])
  
# Generate a new dataframe (original dataframe and the revenue difference)
revenue_diff = dataFrame.select(
  dataFrame['product'],
  dataFrame['category'],
  dataFrame['revenue'],
  revenue_difference.alias("revenue_difference"))

# Display revenue_diff
display(revenue_diff)
Author: dennyglee, Project: databricks, Lines: 31, Source: Introducing+Window+Functions+in+Spark+SQL+Notebook.py


Example 20:

