
Python functions.sum Code Examples


This article collects typical usage examples of the pyspark.sql.functions.sum function in Python. If you have been wondering what sum does, how to call it, or what it looks like in real code, the curated examples below may help.



The sections below present 20 code examples of the sum function, sorted by popularity.
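Before the examples, here is a minimal, self-contained sketch of the two most common sum patterns: a grouped aggregation and a windowed running total. The SparkSession setup, the DataFrame, and the column names (id, v) are illustrative assumptions only and are not taken from any of the projects listed below.

    # Minimal sketch; data and column names are assumed for illustration only.
    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[*]").appName("sum_demo").getOrCreate()
    df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["id", "v"])

    # Grouped aggregation: total of v per id.
    totals = df.groupBy("id").agg(F.sum("v").alias("total_v"))
    totals.show()

    # Windowed aggregation: running total of v within each id, ordered by v.
    w = (Window.partitionBy("id")
         .orderBy("v")
         .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    df.withColumn("running_total", F.sum("v").over(w)).show()

Note that importing sum by name from pyspark.sql.functions, as several examples below do, shadows Python's built-in sum in that scope; importing the module as F sidesteps the collision.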

Example 1: test_mixed_sql

    def test_mixed_sql(self):
        """
        Test mixing group aggregate pandas UDF with sql expression.
        """
        from pyspark.sql.functions import sum

        df = self.data
        sum_udf = self.pandas_agg_sum_udf

        # Mix group aggregate pandas UDF with sql expression
        result1 = (df.groupby('id')
                   .agg(sum_udf(df.v) + 1)
                   .sort('id'))
        expected1 = (df.groupby('id')
                     .agg(sum(df.v) + 1)
                     .sort('id'))

        # Mix group aggregate pandas UDF with sql expression (order swapped)
        result2 = (df.groupby('id')
                     .agg(sum_udf(df.v + 1))
                     .sort('id'))

        expected2 = (df.groupby('id')
                       .agg(sum(df.v + 1))
                       .sort('id'))

        # Wrap group aggregate pandas UDF with two sql expressions
        result3 = (df.groupby('id')
                   .agg(sum_udf(df.v + 1) + 2)
                   .sort('id'))
        expected3 = (df.groupby('id')
                     .agg(sum(df.v + 1) + 2)
                     .sort('id'))

        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
        self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
        self.assertPandasEqual(expected3.toPandas(), result3.toPandas())
Developer: Brett-A, Project: spark, Lines: 35, Source: test_pandas_udf_grouped_agg.py


Example 2: test_groupedData

    def test_groupedData(self):
        from pyspark.sql import DataFrame
        from pyspark.sql.functions import sum, pandas_udf, PandasUDFType
        from ts.flint import TimeSeriesGroupedData

        price = self.price()

        assert(type(price.groupBy('time')) is TimeSeriesGroupedData)
        assert(type(price.groupby('time')) is TimeSeriesGroupedData)

        result1 = price.groupBy('time').agg(sum(price['price'])).sort('time').toPandas()
        expected1 = DataFrame.groupBy(price, 'time').agg(sum(price['price'])).sort('time').toPandas()
        assert_same(result1, expected1)

        result2 = price.groupBy('time').pivot('id').sum('price').toPandas()
        expected2 = DataFrame.groupBy(price, 'time').pivot('id').sum('price').toPandas()
        assert_same(result2, expected2)

        @pandas_udf(price.schema, PandasUDFType.GROUPED_MAP)
        def foo(df):
            return df
        result3 = price.groupby('time').apply(foo).toPandas()
        expected3 = DataFrame.groupBy(price, 'time').apply(foo).toPandas()
        assert_same(result3, expected3)

        result4 = price.groupby('time').count().toPandas()
        expected4 = DataFrame.groupBy(price, 'time').count().toPandas()
        assert_same(result4, expected4)

        result5 = price.groupby('time').mean('price').toPandas()
        expected5 = DataFrame.groupBy(price, 'time').mean('price').toPandas()
        assert_same(result5, expected5)
Developer: mattomatic, Project: flint, Lines: 32, Source: test_dataframe_api.py


Example 3: test_window_functions_cumulative_sum

    def test_window_functions_cumulative_sum(self):
        df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
        from pyspark.sql import functions as F

        # Test cumulative sum
        sel = df.select(
            df.key,
            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 0)))
        rs = sorted(sel.collect())
        expected = [("one", 1), ("two", 3)]
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[:len(r)])

        # Test boundary values less than JVM's Long.MinValue and make sure we don't overflow
        sel = df.select(
            df.key,
            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding - 1, 0)))
        rs = sorted(sel.collect())
        expected = [("one", 1), ("two", 3)]
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[:len(r)])

        # Test boundary values greater than JVM's Long.MaxValue and make sure we don't overflow
        frame_end = Window.unboundedFollowing + 1
        sel = df.select(
            df.key,
            F.sum(df.value).over(Window.rowsBetween(Window.currentRow, frame_end)))
        rs = sorted(sel.collect())
        expected = [("one", 3), ("two", 2)]
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[:len(r)])
Developer: JingchengDu, Project: spark, Lines: 31, Source: test_context.py


Example 4: test_nondeterministic_vectorized_udf_in_aggregate

    def test_nondeterministic_vectorized_udf_in_aggregate(self):
        from pyspark.sql.functions import sum

        df = self.spark.range(10)
        random_udf = self.nondeterministic_vectorized_udf

        with QuietTest(self.sc):
            with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
            with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                df.agg(sum(random_udf(df.id))).collect()
Developer: q977734161, Project: spark, Lines: 9, Source: test_pandas_udf_scalar.py


Example 5: test_nondeterministic_udf_in_aggregate

    def test_nondeterministic_udf_in_aggregate(self):
        from pyspark.sql.functions import udf, sum
        import random
        udf_random_col = udf(lambda: int(100 * random.random()), 'int').asNondeterministic()
        df = self.spark.range(10)

        with QuietTest(self.sc):
            with self.assertRaisesRegexp(AnalysisException, "nondeterministic"):
                df.groupby('id').agg(sum(udf_random_col())).collect()
            with self.assertRaisesRegexp(AnalysisException, "nondeterministic"):
                df.agg(sum(udf_random_col())).collect()
Developer: drewrobb, Project: spark, Lines: 11, Source: test_udf.py


Example 6: gen_report_table

 def gen_report_table(hc,curUnixDay):
     rows_indoor=sc.textFile("/data/indoor/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),seconds=int(p[4]),utoday=int(p[5]),ufirstday=int(p[6])))
     HiveContext.createDataFrame(hc,rows_indoor).registerTempTable("df_indoor")
     #ClientMac|etime|ltime|seconds|utoday|ENTITYID|UFIRSTDAY 
     sql="select entityid,clientmac,utoday,UFIRSTDAY,seconds,"
     sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  2505600 preceding) as day_30," # 2505600 is 29 days
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  518400 preceding)  as day_7," #518400 is 6 days
     sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY  range 1 preceding) as pre_mon "
     sql=sql+"from df_indoor order by entityid,clientmac,utoday" 
     df_id_stat=hc.sql(sql)
     df_id_mm=df_id_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
     # df_id_mm holds the per-(entityid, clientmac) min/max of utoday, used to calculate first and last arrival
     df_id_stat_distinct=df_id_stat.drop("seconds").drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
     # a distinct DataFrame is required for the lag function to work
     df_id_prepremon=df_id_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
     
     cond_id = [df_id_mm.clientmac == df_id_prepremon.clientmac, df_id_mm.entityid == df_id_prepremon.entityid, df_id_mm.UFIRSTDAY==df_id_prepremon.UFIRSTDAY]
     df_indoor_fin_tmp=df_id_mm.join(df_id_prepremon, cond_id, 'outer').select(df_id_mm.entityid,df_id_mm.clientmac,df_id_mm.utoday,df_id_mm.UFIRSTDAY,df_id_mm.seconds,df_id_mm.day_30,df_id_mm.day_7,df_id_mm.min,df_id_mm.max,df_id_mm.total_cnt,df_id_prepremon.prepre_mon)
     df_indoor_fin_tmp=df_indoor_fin_tmp.selectExpr("entityid as entityid","clientmac as  clientmac","utoday as utoday","UFIRSTDAY as ufirstday","seconds as secondsbyday","day_30 as indoors30","day_7 as indoors7","min as FirstIndoor","max as LastIndoor","total_cnt as indoors","prepre_mon as indoorsPrevMonth")
     
     #newly added part for indoors7 and indoors30 based on current date
     df_indoor_fin_tmp1= df_indoor_fin_tmp.withColumn("r_day_7", func.when((curUnixDay- df_indoor_fin_tmp.utoday)/86400<7 , 1).otherwise(0))
     df_indoor_fin_tmp2=df_indoor_fin_tmp1.withColumn("r_day_30", func.when((curUnixDay- df_indoor_fin_tmp1.utoday)/86400<30 , 1).otherwise(0))
     df_indoor_fin_tmp3=df_indoor_fin_tmp2.withColumn("r_indoors7",func.sum("r_day_7").over(Window.partitionBy("entityid","clientmac")))
     df_indoor_fin_tmp4=df_indoor_fin_tmp3.withColumn("r_indoors30",func.sum("r_day_30").over(Window.partitionBy("entityid","clientmac")))
     df_indoor_fin=df_indoor_fin_tmp4.drop("r_day_7").drop("r_day_30")
     hc.sql("drop table if exists df_indoor_fin")
     df_indoor_fin.write.saveAsTable("df_indoor_fin")
     
     rows_flow=sc.textFile("/data/flow/*/*").map(lambda r: r.split(",")).map(lambda p: Row(clientmac=p[0], entityid=int(p[1]),etime=int(p[2]),ltime=int(p[3]),utoday=int(p[4]),ufirstday=int(p[5])))
     HiveContext.createDataFrame(hc,rows_flow).registerTempTable("df_flow")
     
     # ClientMac|ENTITYID|UFIRSTDAY|etime|ltime|utoday
     sql="select entityid,clientmac,utoday,UFIRSTDAY,"
     sql=sql+"count(1) over(partition by entityid,clientmac) as total_cnt,"
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  2505600 preceding) as day_30," # 2505600 is 29 days
     sql=sql+"count(1) over (partition by entityid,clientmac order by utoday range  518400 preceding)  as day_7," #518400 is 6 days
     sql=sql+"count(1) over (partition by entityid,clientmac,UFIRSTDAY order by UFIRSTDAY  range 1 preceding) as pre_mon "
     sql=sql+"from df_flow order by entityid,clientmac,utoday" 
     df_fl_stat=hc.sql(sql)
     df_fl_mm=df_fl_stat.withColumn("min", func.min("utoday").over(Window.partitionBy("entityid","clientmac"))).withColumn("max", func.max("utoday").over(Window.partitionBy("entityid","clientmac")))
     # df_fl_mm holds the per-(entityid, clientmac) min/max of utoday, used to calculate first and last arrival
     df_fl_stat_distinct=df_fl_stat.drop("day_30").drop("day_7").drop("utoday").drop("total_cnt").distinct()
     # a distinct DataFrame is required for the lag function to work
     df_fl_prepremon=df_fl_stat_distinct.withColumn("prepre_mon",func.lag("pre_mon").over(Window.partitionBy("entityid","clientmac").orderBy("entityid","clientmac","UFIRSTDAY"))).drop("pre_mon").na.fill(0)
     
     cond_fl = [df_fl_mm.clientmac == df_fl_prepremon.clientmac, df_fl_mm.entityid == df_fl_prepremon.entityid, df_fl_mm.UFIRSTDAY==df_fl_prepremon.UFIRSTDAY]
     df_flow_fin=df_fl_mm.join(df_fl_prepremon, cond_fl, 'outer').select(df_fl_mm.entityid,df_fl_mm.clientmac,df_fl_mm.utoday,df_fl_mm.UFIRSTDAY,df_fl_mm.day_30,df_fl_mm.day_7,df_fl_mm.min,df_fl_mm.max,df_fl_mm.total_cnt,df_fl_prepremon.prepre_mon)
     df_flow_fin=df_flow_fin.selectExpr("entityid as entityid","clientmac as  clientmac","utoday as utoday","UFIRSTDAY as ufirstday","day_30 as visits30","day_7 as visits7","min as FirstVisit","max as LastVisit","total_cnt as visits","prepre_mon as visitsPrevMonth")
     hc.sql("drop table if exists df_flow_fin")
     df_flow_fin.write.saveAsTable("df_flow_fin") 
Developer: dalinqin, Project: src, Lines: 52, Source: main_report.py


Example 7: test_multiple_udfs

    def test_multiple_udfs(self):
        """
        Test multiple group aggregate pandas UDFs in one agg function.
        """
        from pyspark.sql.functions import sum, mean

        df = self.data
        mean_udf = self.pandas_agg_mean_udf
        sum_udf = self.pandas_agg_sum_udf
        weighted_mean_udf = self.pandas_agg_weighted_mean_udf

        result1 = (df.groupBy('id')
                   .agg(mean_udf(df.v),
                        sum_udf(df.v),
                        weighted_mean_udf(df.v, df.w))
                   .sort('id')
                   .toPandas())
        expected1 = (df.groupBy('id')
                     .agg(mean(df.v),
                          sum(df.v),
                          mean(df.v).alias('weighted_mean(v, w)'))
                     .sort('id')
                     .toPandas())

        self.assertPandasEqual(expected1, result1)
Developer: JingchengDu, Project: spark, Lines: 25, Source: test_pandas_udf_grouped_agg.py


Example 8: getValueFieldValueLists

 def getValueFieldValueLists(self, handlerId, keyFields, valueFields):
     df = self.entity.groupBy(keyFields)
     agg = self.options.get("aggregation",self.getDefaultAggregation(handlerId))
     maxRows = int(self.options.get("rowCount","100"))
     numRows = min(maxRows,df.count())
     valueLists = []
     for valueField in valueFields:
         valueDf = None
         if agg == "SUM":
             valueDf = df.agg(F.sum(valueField).alias("agg"))
         elif agg == "AVG":
             valueDf = df.agg(F.avg(valueField).alias("agg"))
         elif agg == "MIN":
             valueDf = df.agg(F.min(valueField).alias("agg"))
         elif agg == "MAX":
             valueDf = df.agg(F.max(valueField).alias("agg"))
         else:
             valueDf = df.agg(F.count(valueField).alias("agg"))
         for keyField in keyFields:
             valueDf = valueDf.sort(F.col(keyField).asc())
         valueDf = valueDf.dropna()
         rows = valueDf.select("agg").take(numRows)
         valueList = []
         for row in rows:
             valueList.append(row["agg"])
         valueLists.append(valueList)
     return valueLists   
Developer: ygoverdhan, Project: pixiedust, Lines: 27, Source: mpld3ChartDisplay.py


Example 9: run_benchmarks

def run_benchmarks(base_path):
    print("=========================================================================================")
    print("Loading data for: ")
    print(base_path)
    print("=========================================================================================")

    start=time.time()
    df=hive_context.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(base_path)
    #print(df)
    #print(df.printSchema())
    print(df.count())
    df.cache()

    print("Time taken for groupBy on DataFrame column C followed by sum aggregate: ")
    start_task=time.time()
    df_groupby_C=df.groupBy('C').agg(F.sum(df.id))
    print(df_groupby_C.count())
    end_task=time.time()
    end=time.time()

    x=[base_path, end-start, end_task-start_task]
    print("=========================================================================================")
    print("OUTPUT")
    print(x)
    print("=========================================================================================")
    return x
Developer: MadStreetDen-OpenSource, Project: datatool_benchmarks, Lines: 26, Source: spark_df_groupby_C_benchmark.py


Example 10: sum_aggregations

def sum_aggregations(category, hours=None):
    actual_suffix = ''
    new_suffix = '_%s' % category
    if hours:
        actual_suffix = '_%s' % category
        new_suffix += '_%sh' % hours

    return [func.sum(column + actual_suffix).alias(column + new_suffix) for column in ['Pickup_Count', 'Dropoff_Count']]
Developer: georgwiese, Project: MMDS-sose16, Lines: 8, Source: build_feature_vector.py


Example 11: test_retain_group_columns

    def test_retain_group_columns(self):
        from pyspark.sql.functions import sum

        with self.sql_conf({"spark.sql.retainGroupColumns": False}):
            df = self.data
            sum_udf = self.pandas_agg_sum_udf

            result1 = df.groupby(df.id).agg(sum_udf(df.v))
            expected1 = df.groupby(df.id).agg(sum(df.v))
            self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
Developer: Brett-A, Project: spark, Lines: 8, Source: test_pandas_udf_grouped_agg.py


Example 12: formatItens

def formatItens(firstTime):
    #format itenary data
    global itens
    itens = itens.withColumn("ORIGIN_AIRPORT_ID",toInt("ORIGIN_AIRPORT_ID"))
    itens = itens.withColumn("DEST_AIRPORT_ID",toInt("DEST_AIRPORT_ID"))
    itens = itens.withColumn("MARKET_MILES_FLOWN",toKm("MARKET_MILES_FLOWN"))
    itens = itens.withColumn("PASSENGERS",toInt("PASSENGERS"))
    if firstTime:
        aggArg = (sum("PASSENGERS").alias("PASSENGERS"),
                  mean("MARKET_MILES_FLOWN").alias("MARKET_KMS_FLOWN"))
        itens = itens.groupBy("ORIGIN_AIRPORT_ID","DEST_AIRPORT_ID").agg(*aggArg).cache()
Developer: RobGeada, Project: FlightOptimize, Lines: 10, Source: FlightGUI.py


Example 13: runBPwithGraphFrames

    def runBPwithGraphFrames(cls, g, numIter):
        """Run Belief Propagation using GraphFrame.

        This implementation of BP shows how to use GraphFrame's aggregateMessages method.
        """
        # choose colors for vertices for BP scheduling
        colorG = cls._colorGraph(g)
        numColors = colorG.vertices.select('color').distinct().count()

        # TODO: handle vertices without any edges

        # initialize vertex beliefs at 0.0
        gx = GraphFrame(colorG.vertices.withColumn('belief', sqlfunctions.lit(0.0)), colorG.edges)

        # run BP for numIter iterations
        for iter_ in range(numIter):
            # for each color, have that color receive messages from neighbors
            for color in range(numColors):
                # Send messages to vertices of the current color.
                # We may send to source or destination since edges are treated as undirected.
                msgForSrc = sqlfunctions.when(
                    AM.src['color'] == color,
                    AM.edge['b'] * AM.dst['belief'])
                msgForDst = sqlfunctions.when(
                    AM.dst['color'] == color,
                    AM.edge['b'] * AM.src['belief'])
                # numerically stable sigmoid
                logistic = sqlfunctions.udf(cls._sigmoid, returnType=types.DoubleType())
                aggregates = gx.aggregateMessages(
                    sqlfunctions.sum(AM.msg).alias("aggMess"),
                    sendToSrc=msgForSrc,
                    sendToDst=msgForDst)
                v = gx.vertices
                # receive messages and update beliefs for vertices of the current color
                newBeliefCol = sqlfunctions.when(
                    (v['color'] == color) & (aggregates['aggMess'].isNotNull()),
                    logistic(aggregates['aggMess'] + v['a'])
                ).otherwise(v['belief'])  # keep old beliefs for other colors
                newVertices = (v
                    .join(aggregates, on=(v['id'] == aggregates['id']), how='left_outer')
                    .drop(aggregates['id'])  # drop duplicate ID column (from outer join)
                    .withColumn('newBelief', newBeliefCol)  # compute new beliefs
                    .drop('aggMess')  # drop messages
                    .drop('belief')  # drop old beliefs
                    .withColumnRenamed('newBelief', 'belief')
                )
                # cache new vertices using workaround for SPARK-1334
                cachedNewVertices = AM.getCachedDataFrame(newVertices)
                gx = GraphFrame(cachedNewVertices, gx.edges)

        # Drop the "color" column from vertices
        return GraphFrame(gx.vertices.drop('color'), gx.edges)
Developer: mengxr, Project: graphframes, Lines: 52, Source: belief_propagation.py


Example 14: compute

def compute(day):
    # We want days day-30 through day-1
    sums = wikipediadata.where(
            (wikipediadata.day >= day-30) & (wikipediadata.day <= day-1))

    # Test subset
    #sums = sums.where((sums.page == 'Cadillac_Brougham') | ((sums.page == 'Roald_Dahl') & (sums.projectcode == 'fr')))

    # Sum the hourly view counts for each day
    sums = sums.groupby('projectcode', 'page', 'day').sum('views')
    # Cache for later use
    sums.cache()

    # define a window := the previous day
    window_spec =  Window.partitionBy(sums.projectcode, sums.page) \
            .orderBy(sums.day.asc()).rowsBetween(-1, -1)

    # compute the difference views(d) - views(d-1)
    diffs = sums.withColumn('diff', sums.views - F.sum(sums.views) \
            .over(window_spec))

    # compute the coefficients to apply to each day
    coefs = pd.DataFrame({'day': range(day-30, day)})
    coefs['coef'] = 1. / (day - coefs.day)

    coefs = hc.createDataFrame(coefs)
    diffs = diffs.join(coefs, 'day')

    # compute each day's score
    diffs = diffs.withColumn('sub_score', diffs.diff * diffs.coef)

    totals = diffs.groupby('projectcode', 'page').sum('views', 'sub_score')
    # normalize by the square root of the total views
    totals = totals.withColumn('score',
            totals['SUM(sub_score)'] / F.sqrt(totals['SUM(views)'])) \
            .orderBy(F.desc('score')) \
            .withColumnRenamed('SUM(views)', 'total_views') \
            .limit(10)

    views = sums.select('projectcode', 'page', 'day', 'views') \
           .join(totals.select('projectcode', 'page', 'total_views', 'score'), 
                  (totals.projectcode == sums.projectcode) & (totals.page == sums.page), 'right_outer')

    df = totals.select('projectcode', 'page', 'total_views', 'score').toPandas()
    df2 = views.toPandas()
    df2 = df2.iloc[:, 2:]
    df2 = df2.pivot_table(values='views', columns=['day'], index=['projectcode', 'page'], fill_value=0)
    df = df.merge(df2, left_on=['projectcode', 'page'], right_index=True)
    df.to_csv(filename(day), index=False)
    
    # clear the cache
    hc.clearCache()
Developer: cygilbert, Project: projetnosql, Lines: 52, Source: Req30j.py


Example 15: makeMapping

def makeMapping(firstTime):
    global routes
    grpString = "ORIGIN_AIRPORT_ID","ORIGIN_CITY_NAME","ORIGIN","DEST_AIRPORT_ID","DEST_CITY_NAME","DEST","UNIQUE_CARRIER_NAME"
    if firstTime:
        routes = routes.groupBy(*grpString).agg(sum("PASSENGERS").alias("PASSENGERS"),sum("DEPARTURES_PERFORMED").alias("DEPARTURES_PERFORMED"),mean("RAMP_TO_RAMP").alias("RAMP_TO_RAMP"))
    for i in routes.collect():
        if not dictAir.get("Airport{}".format(i[0])):
            initNode(i[0],(i[1],i[2]),i[8])
        if not dictAir.get("Airport{}".format(i[3])):
            initNode(i[3],(i[4],i[5]),0)
        if (i[9]!=9876543.21):
            tripTime =i[9]
            getApt(i[0])['depts'] += i[8]
            sourceCNX = getApt(i[0])['cnx']
            sourceCNX.append((int(i[3]),tripTime,i[6]))
Developer: RobGeada, Project: FlightOptimize, Lines: 15, Source: FlightGUI.py


Example 16: test_udf_with_aggregate_function

    def test_udf_with_aggregate_function(self):
        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
        from pyspark.sql.functions import udf, col, sum
        from pyspark.sql.types import BooleanType

        my_filter = udf(lambda a: a == 1, BooleanType())
        sel = df.select(col("key")).distinct().filter(my_filter(col("key")))
        self.assertEqual(sel.collect(), [Row(key=1)])

        my_copy = udf(lambda x: x, IntegerType())
        my_add = udf(lambda a, b: int(a + b), IntegerType())
        my_strlen = udf(lambda x: len(x), IntegerType())
        sel = df.groupBy(my_copy(col("key")).alias("k"))\
            .agg(sum(my_strlen(col("value"))).alias("s"))\
            .select(my_add(col("k"), col("s")).alias("t"))
        self.assertEqual(sel.collect(), [Row(t=4), Row(t=3)])
Developer: drewrobb, Project: spark, Lines: 16, Source: test_udf.py


Example 17: handleUIOptions

    def handleUIOptions(self, displayColName):
        agg = self.options.get("aggregation")
        valFields = self.options.get("valueFields")

        if agg == 'COUNT':
            return self.entity.groupBy(displayColName).agg(F.count(displayColName).alias("agg")).toPandas()
        elif agg == 'SUM':
            return self.entity.groupBy(displayColName).agg(F.sum(valFields).alias("agg")).toPandas()
        elif agg == 'AVG':
            return self.entity.groupBy(displayColName).agg(F.avg(valFields).alias("agg")).toPandas()
        elif agg == 'MIN':
            return self.entity.groupBy(displayColName).agg(F.min(valFields).alias("agg")).toPandas()
        elif agg == 'MAX':
            return self.entity.groupBy(displayColName).agg(F.max(valFields).alias("agg")).toPandas()
        elif agg == 'MEAN':
            return self.entity.groupBy(displayColName).agg(F.mean(valFields).alias("agg")).toPandas()
        else:
            return self.entity.groupBy(displayColName).agg(F.count(displayColName).alias("agg")).toPandas()
Developer: ygoverdhan, Project: pixiedust, Lines: 18, Source: pieChartDisplay.py


Example 18: test_aggregate_messages

 def test_aggregate_messages(self):
     g = self._graph("friends")
     # For each user, sum the ages of the adjacent users,
     # plus 1 for the src's sum if the edge is "friend".
     sendToSrc = (
         AM.dst['age'] +
         sqlfunctions.when(
             AM.edge['relationship'] == 'friend',
             sqlfunctions.lit(1)
         ).otherwise(0))
     sendToDst = AM.src['age']
     agg = g.aggregateMessages(
         sqlfunctions.sum(AM.msg).alias('summedAges'),
         sendToSrc=sendToSrc,
         sendToDst=sendToDst)
     # Run the aggregation again providing SQL expressions as String instead.
     agg2 = g.aggregateMessages(
         "sum(MSG) AS `summedAges`",
         sendToSrc="(dst['age'] + CASE WHEN (edge['relationship'] = 'friend') THEN 1 ELSE 0 END)",
         sendToDst="src['age']")
     # Convert agg and agg2 to a mapping from id to the aggregated message.
     aggMap = {id_: s for id_, s in agg.select('id', 'summedAges').collect()}
     agg2Map = {id_: s for id_, s in agg2.select('id', 'summedAges').collect()}
     # Compute the truth via brute force.
     user2age = {id_: age for id_, age in g.vertices.select('id', 'age').collect()}
     trueAgg = {}
     for src, dst, rel in g.edges.select("src", "dst", "relationship").collect():
         trueAgg[src] = trueAgg.get(src, 0) + user2age[dst] + (1 if rel == 'friend' else 0)
         trueAgg[dst] = trueAgg.get(dst, 0) + user2age[src]
     # Compare if the agg mappings match the brute force mapping
     self.assertEqual(aggMap, trueAgg)
     self.assertEqual(agg2Map, trueAgg)
     # Check that TypeError is raised for messages of the wrong type
     with self.assertRaises(TypeError):
         g.aggregateMessages(
             "sum(MSG) AS `summedAges`",
             sendToSrc=object(),
             sendToDst="src['age']")
     with self.assertRaises(TypeError):
         g.aggregateMessages(
             "sum(MSG) AS `summedAges`",
             sendToSrc=AM.dst['age'],
             sendToDst=object())
Developer: mengxr, Project: graphframes, Lines: 43, Source: tests.py


Example 19: test_wrong_args

    def test_wrong_args(self):
        df = self.data

        with QuietTest(self.sc):
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(lambda x: x)
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(udf(lambda x: x, DoubleType()))
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(sum(df.v))
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(df.v + 1)
            with self.assertRaisesRegexp(ValueError, 'Invalid function'):
                df.groupby('id').apply(
                    pandas_udf(lambda: 1, StructType([StructField("d", DoubleType())])))
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(pandas_udf(lambda x, y: x, DoubleType()))
            with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUPED_MAP'):
                df.groupby('id').apply(
                    pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))
Developer: CodingCat, Project: spark, Lines: 20, Source: test_pandas_udf_grouped_map.py


Example 20: test_complex_groupby

    def test_complex_groupby(self):
        from pyspark.sql.functions import sum

        df = self.data
        sum_udf = self.pandas_agg_sum_udf
        plus_one = self.python_plus_one
        plus_two = self.pandas_scalar_plus_two

        # groupby one expression
        result1 = df.groupby(df.v % 2).agg(sum_udf(df.v))
        expected1 = df.groupby(df.v % 2).agg(sum(df.v))

        # empty groupby
        result2 = df.groupby().agg(sum_udf(df.v))
        expected2 = df.groupby().agg(sum(df.v))

        # groupby one column and one sql expression
        result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
        expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)).orderBy(df.id, df.v % 2)

        # groupby one python UDF
        result4 = df.groupby(plus_one(df.id)).agg(sum_udf(df.v))
        expected4 = df.groupby(plus_one(df.id)).agg(sum(df.v))

        # groupby one scalar pandas UDF
        result5 = df.groupby(plus_two(df.id)).agg(sum_udf(df.v))
        expected5 = df.groupby(plus_two(df.id)).agg(sum(df.v))

        # groupby one expression and one python UDF
        result6 = df.groupby(df.v % 2, plus_one(df.id)).agg(sum_udf(df.v))
        expected6 = df.groupby(df.v % 2, plus_one(df.id)).agg(sum(df.v))

        # groupby one expression and one scalar pandas UDF
        result7 = df.groupby(df.v % 2, plus_two(df.id)).agg(sum_udf(df.v)).sort('sum(v)')
        expected7 = df.groupby(df.v % 2, plus_two(df.id)).agg(sum(df.v)).sort('sum(v)')

        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
        self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
        self.assertPandasEqual(expected3.toPandas(), result3.toPandas())
        self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
        self.assertPandasEqual(expected5.toPandas(), result5.toPandas())
        self.assertPandasEqual(expected6.toPandas(), result6.toPandas())
        self.assertPandasEqual(expected7.toPandas(), result7.toPandas())
Developer: JingchengDu, Project: spark, Lines: 43, Source: test_pandas_udf_grouped_agg.py



Note: The pyspark.sql.functions.sum examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The snippets are selected from open-source projects contributed by their respective developers; copyright remains with the original authors, and any distribution or use should follow the corresponding project's license. Do not reproduce without permission.

