Python functions.lit code examples


This article collects typical usage examples of the Python function pyspark.sql.functions.lit. If you are wondering how lit is used in practice, what it is for, or what real code that calls it looks like, the hand-picked examples below may help.



Twenty code examples of the lit function are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
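Before turning to the collected examples, here is a minimal, self-contained sketch written for this article (not taken from any of the projects below) that shows what lit does: it wraps a plain Python value in a Column expression, so constants can be combined with other columns, passed to UDFs, or used in withColumn. The SparkSession setup and column names are illustrative only.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.master("local[1]").appName("lit_demo").getOrCreate()

df = spark.range(3)                      # a DataFrame with a single 'id' column: 0, 1, 2
df = df.withColumn("constant", lit(42)) \
       .withColumn("label", lit("x"))    # constant bigint and string columns

df.show()                                # every row carries the same literal values
spark.stop()

Most of the examples below use exactly this pattern: adding a constant column, passing a literal argument to a UDF, or initializing a column before further processing.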

Example 1: data

 def data(self):
     from pyspark.sql.functions import array, explode, col, lit
     return self.spark.range(10).toDF('id') \
         .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
         .withColumn("v", explode(col('vs'))) \
         .drop('vs') \
         .withColumn('w', lit(1.0))
Developer: JingchengDu, Project: spark, Lines of code: 7, Source file: test_pandas_udf_grouped_agg.py


Example 2: test_basic

    def test_basic(self):
        df = self.data
        weighted_mean_udf = self.pandas_agg_weighted_mean_udf

        # Groupby one column and aggregate one UDF with literal
        result1 = df.groupby('id').agg(weighted_mean_udf(df.v, lit(1.0))).sort('id')
        expected1 = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v, 1.0)')).sort('id')
        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

        # Groupby one expression and aggregate one UDF with literal
        result2 = df.groupby((col('id') + 1)).agg(weighted_mean_udf(df.v, lit(1.0)))\
            .sort(df.id + 1)
        expected2 = df.groupby((col('id') + 1))\
            .agg(mean(df.v).alias('weighted_mean(v, 1.0)')).sort(df.id + 1)
        self.assertPandasEqual(expected2.toPandas(), result2.toPandas())

        # Groupby one column and aggregate one UDF without literal
        result3 = df.groupby('id').agg(weighted_mean_udf(df.v, df.w)).sort('id')
        expected3 = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v, w)')).sort('id')
        self.assertPandasEqual(expected3.toPandas(), result3.toPandas())

        # Groupby one expression and aggregate one UDF without literal
        result4 = df.groupby((col('id') + 1).alias('id'))\
            .agg(weighted_mean_udf(df.v, df.w))\
            .sort('id')
        expected4 = df.groupby((col('id') + 1).alias('id'))\
            .agg(mean(df.v).alias('weighted_mean(v, w)'))\
            .sort('id')
        self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
Developer: Brett-A, Project: spark, Lines of code: 29, Source file: test_pandas_udf_grouped_agg.py


Example 3: make_prediction

def make_prediction(event, df):
    event_timestamp, event_dayofweek, pickup_lat, pickup_lon, dropoff_lat, dropoff_lon, event_passenger_count = event[0], event[1], event[2], event[3], event[4], event[5], event[6]
    udf_diff_timeofday=udf(utils.diff_timeofday, IntegerType())
    udf_shortest_distance=udf(utils.shortest_distance, FloatType())
    df = df.withColumn("diff_timeofday", udf_diff_timeofday(df.pickup, lit(event_timestamp))).filter("`diff_timeofday` < 30")
    df = df.withColumn("event_sum_distance",
        udf_shortest_distance(df.pick_lat, df.pick_lon, lit(pickup_lat), lit(pickup_lon))+udf_shortest_distance(df.drop_lat, df.drop_lon, lit(dropoff_lat), lit(dropoff_lon))).filter("`event_sum_distance` < 2")
    df = df.sort('event_sum_distance')
    if df.count() < 10:
        return [0,0]
    a = pd.DataFrame(df.take(50))
    a.columns = df.columns

    speed_array = a.as_matrix(["avg_speed"])
    dist_sf_array = a.as_matrix(["dist_sf"])
    distance_array = a["trip_distance"].tolist()
    fare_array = a["total_notip"].tolist()
    time_array = a["trip_time_in_secs"].tolist()

    #set initial parameter values
    x0 = [0.5, 0.5, 3.0, 3.0]
    bnds = ((0.25, 0.75), (0.25, 0.75), (0.1,20), (0,10))
    
    #perform the fit
    res = optimize.minimize(func_to_optimize, x0, args=(distance_array, time_array, fare_array), method='TNC', bounds=bnds)
    grid_dist = utils.grid_distance(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon)

    #get the predictions
    time_pred = utils.time_prediction(speed_array.mean(), grid_dist, dist_sf_array.mean())
    fare_pred = utils.fare_prediction(res.x[0], grid_dist, dist_sf_array.mean(), res.x[1], res.x[2], res.x[3])
    if res.success == True:
        return [fare_pred, time_pred]
    else:
        return [0,0]
Developer: jgran, Project: TaxiPredict, Lines of code: 34, Source file: local_fare.py


Example 4: test_udf_with_decorator

    def test_udf_with_decorator(self):
        from pyspark.sql.functions import lit, udf
        from pyspark.sql.types import IntegerType, DoubleType

        @udf(IntegerType())
        def add_one(x):
            if x is not None:
                return x + 1

        @udf(returnType=DoubleType())
        def add_two(x):
            if x is not None:
                return float(x + 2)

        @udf
        def to_upper(x):
            if x is not None:
                return x.upper()

        @udf()
        def to_lower(x):
            if x is not None:
                return x.lower()

        @udf
        def substr(x, start, end):
            if x is not None:
                return x[start:end]

        @udf("long")
        def trunc(x):
            return int(x)

        @udf(returnType="double")
        def as_double(x):
            return float(x)

        df = (
            self.spark
                .createDataFrame(
                    [(1, "Foo", "foobar", 3.0)], ("one", "Foo", "foobar", "float"))
                .select(
                    add_one("one"), add_two("one"),
                    to_upper("Foo"), to_lower("Foo"),
                    substr("foobar", lit(0), lit(3)),
                    trunc("float"), as_double("one")))

        self.assertListEqual(
            [tpe for _, tpe in df.dtypes],
            ["int", "double", "string", "string", "string", "bigint", "double"]
        )

        self.assertListEqual(
            list(df.first()),
            [2, 3.0, "FOO", "foo", "foo", 3, 1.0]
        )
Developer: drewrobb, Project: spark, Lines of code: 56, Source file: test_udf.py


Example 5: _reduce_for_stat_function

    def _reduce_for_stat_function(self, sfun, only_numeric):
        groupkeys = self._groupkeys
        groupkey_cols = [s._scol.alias('__index_level_{}__'.format(i))
                         for i, s in enumerate(groupkeys)]
        sdf = self._kdf._sdf

        data_columns = []
        if len(self._agg_columns) > 0:
            stat_exprs = []
            for ks in self._agg_columns:
                spark_type = ks.spark_type
                # TODO: we should have a function that takes dataframes and converts the numeric
                # types. Converting the NaNs is used in a few places, it should be in utils.
                # Special handle floating point types because Spark's count treats nan as a valid
                # value, whereas Pandas count doesn't include nan.
                if isinstance(spark_type, DoubleType) or isinstance(spark_type, FloatType):
                    stat_exprs.append(sfun(F.nanvl(ks._scol, F.lit(None))).alias(ks.name))
                    data_columns.append(ks.name)
                elif isinstance(spark_type, NumericType) or not only_numeric:
                    stat_exprs.append(sfun(ks._scol).alias(ks.name))
                    data_columns.append(ks.name)
            sdf = sdf.groupby(*groupkey_cols).agg(*stat_exprs)
        else:
            sdf = sdf.select(*groupkey_cols).distinct()
        sdf = sdf.sort(*groupkey_cols)
        metadata = Metadata(data_columns=data_columns,
                            index_map=[('__index_level_{}__'.format(i), s.name)
                                       for i, s in enumerate(groupkeys)])
        return DataFrame(sdf, metadata)
Developer: zhang01GA, Project: koalas, Lines of code: 29, Source file: groupby.py


Example 6: Dijkstra

def Dijkstra(graph, start, end = None):
    # dist = sqlContext.createDataFrame(sc.emptyRDD(), StructType([]))
    field = [StructField("weight", FloatType(), True)]
    schema = StructType(field)
    
    dist = sqlContext.createDataFrame(sc.emptyRDD(), schema)
    # prev = sqlContext.createDataFrame(sc.emptyRDD(), StructType([]))
    prev = sqlContext.createDataFrame(sc.emptyRDD(), schema)
    
    queue = sqlContext.createDataFrame(sc.emptyRDD(), StructType([]))

    queue = queue.withColumn("weight", lit(0))
    
    for v in queue.collect():  # show() only prints and returns None; collect() returns the rows
        dist_length = dist.filter(dist.weight == v)
        queue_length = queue.filter(queue.weight == v)
        dist_length = queue_length
        if dist_length == end:
            break 
        
        for w in graph.edges.filter("src = " + str(v)):
            vw_length = dist_length + graph.edges.filter("src = " + str(v) + "and dst = " + str(w))
            curr_w = queue.filter("weight = " + str(w))
            if dist.filter("weight = " + str(w)) != "":
                print "Dijkstra: found better path to already-final vertex"
            elif curr_w != "" or vw_length < curr_w:
                queue = queue.replace(queue.filter("weight = " + str(w)), vw_length)
                prev = prev.replace(prev.filter("weight = " + str(w)), v)                
    return dist
Developer: Entelodont, Project: TheOracle, Lines of code: 29, Source file: dijkstra_pyspark.py


Example 7: test_udf

    def test_udf(self):
        from ts.flint import udf
        import pyspark.sql.functions as F
        from pyspark.sql.types import LongType

        vol = self.vol()

        @udf(LongType())
        def foo(v, w):
            return v*2

        result1 = vol.withColumn("volume", foo(vol['volume'], F.lit(42))).toPandas()
        result2 = vol.withColumn("volume", udf(lambda v, w: v*2, LongType())(vol['volume'], F.lit(42))).toPandas()

        expected_pdf1 = make_pdf([
            (1000, 7, 200,),
            (1000, 3, 400,),
            (1050, 3, 600,),
            (1050, 7, 800,),
            (1100, 3, 1000,),
            (1100, 7, 1200,),
            (1150, 3, 1400,),
            (1150, 7, 1600,),
            (1200, 3, 1800,),
            (1200, 7, 2000,),
            (1250, 3, 2200,),
            (1250, 7, 2400,)
        ], ['time', 'id', 'volume'])

        assert_same(result1, expected_pdf1)
        assert_same(result2, expected_pdf1)
Developer: mattomatic, Project: flint, Lines of code: 31, Source file: test_dataframe_api.py


Example 8: derive_schema

    def derive_schema(self):
        '''
        Loads all data in self.path and derives the schema, saves with pickle to "schema.p"
        '''

        df = self.hiveContext.read.format('com.databricks.spark.xml') \
                    .options(rowTag='trkpt') \
                    .load(self.path+'gpx/*')

        df = df.withColumn('athlete',lit(None).cast(StringType())) \
               .withColumn('activity_type',lit(None).cast(StringType()))

        df.printSchema()
        pickle.dump(df.schema, open("schema.p", "wb"))

        pass
Developer: larsbkrogvig, Project: strava-spark, Lines of code: 16, Source file: classes.py


Example 9: processRdd

def processRdd(rdd):
	
	try:
		print('processRDD')
		# convert the RDD to a DataFrame
		
		printOnConsole('Started Processing the streams')

		#desiredCol = ['c-ip','cs-uri-stem','c-user-agent','customer-id','x-ec_custom-1']
		if rdd.count() > 0:
			df = pycsv.csvToDataFrame(sqlContext, rdd, columns=COLUMNS, colTypes=COLUMN_TYPES)
			#df = df.select(desiredCol)
			
			#startTime
			endTime = getCurrentTimeStamp()
			startTime = endTime - SPARK_STREAM_BATCH
			
			endTime = getDateTimeFormat(endTime)
			startTime = getDateTimeFormat(startTime)
			df = df.withColumn(COL_STARTTIME, lit(startTime))
			
			#endTime
			df = df.withColumn(COL_ENDTIME, lit(endTime))

			df.registerTempTable("tempTable")
			query = ('select' + 
					' startTime,' +  																				#startTime
					' endTime,' +  																					#endTime				
					' \'\' as ' +  COL_CUSTOMERID +  ',' +															#customerid				
					' setProjectId(`projectid`) as ' +  COL_PROJECTID + ',' +														#projectid					 	
					' \'\' as ' +  COL_FONTTYPE +  ',' + 															#FontType
					' \'\' as ' +  COL_FONTID +  ',' + 																#FontId
					' getDomainName(`referrer`) as ' +  COL_DOMAINNAME +  ',' + 											#DomainName
					' getBrowser(`useragent`) as ' + COL_USERAGENT +  ',' + 										#UserAgent
					' setIpaddress(`ip`) as ' +  COL_IPADDRESS + 																	#customer ipaddress   
					' from tempTable')

			df = sqlContext.sql(query)
			
			type =  PAGEVIEW_TYPE | PAGEVIEWGEO_TYPE
			processForTable(df, type)
		else:
			printOnConsole('Nothing to process')
	
	except Exception as ex:
		printOnConsole('There was an error...')
		print(ex)
Developer: ibnipun10, Project: Projects, Lines of code: 47, Source file: kinesisStream.py


Example 10: process_writeable_df

def process_writeable_df(joined_df, date_format="yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"):
    """
    Prepares the dataframe for writing to mongo
    :param joined_df:
    :param date_format:
    :return:
    """
    df_with_parsed_dates = parse_dates(joined_df, date_format)
    df_with_id = df_with_parsed_dates.withColumn("id", f.concat(f.col('account_id'), f.lit("_"), f.col("unix_ts")))
    return df_with_id.na.drop()
Developer: faizana, Project: rbc_challenge, Lines of code: 10, Source file: etl.py


Example 11: test_datasource_with_udf

    def test_datasource_with_udf(self):
        # Same as SQLTests.test_datasource_with_udf, but with Pandas UDF
        # This needs to a separate test because Arrow dependency is optional
        import pandas as pd
        import numpy as np
        from pyspark.sql.functions import pandas_udf, lit, col

        path = tempfile.mkdtemp()
        shutil.rmtree(path)

        try:
            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
            filesource_df = self.spark.read.option('inferSchema', True).csv(path).toDF('i')
            datasource_df = self.spark.read \
                .format("org.apache.spark.sql.sources.SimpleScanSource") \
                .option('from', 0).option('to', 1).load().toDF('i')
            datasource_v2_df = self.spark.read \
                .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
                .load().toDF('i', 'j')

            c1 = pandas_udf(lambda x: x + 1, 'int')(lit(1))
            c2 = pandas_udf(lambda x: x + 1, 'int')(col('i'))

            f1 = pandas_udf(lambda x: pd.Series(np.repeat(False, len(x))), 'boolean')(lit(1))
            f2 = pandas_udf(lambda x: pd.Series(np.repeat(False, len(x))), 'boolean')(col('i'))

            for df in [filesource_df, datasource_df, datasource_v2_df]:
                result = df.withColumn('c', c1)
                expected = df.withColumn('c', lit(2))
                self.assertEquals(expected.collect(), result.collect())

            for df in [filesource_df, datasource_df, datasource_v2_df]:
                result = df.withColumn('c', c2)
                expected = df.withColumn('c', col('i') + 1)
                self.assertEquals(expected.collect(), result.collect())

            for df in [filesource_df, datasource_df, datasource_v2_df]:
                for f in [f1, f2]:
                    result = df.filter(f)
                    self.assertEquals(0, result.count())
        finally:
            shutil.rmtree(path)
Developer: JingchengDu, Project: spark, Lines of code: 42, Source file: test_pandas_udf_scalar.py


Example 12: test_string_functions

 def test_string_functions(self):
     from pyspark.sql.functions import col, lit
     df = self.spark.createDataFrame([['nick']], schema=['name'])
     self.assertRaisesRegexp(
         TypeError,
         "must be the same type",
         lambda: df.select(col('name').substr(0, lit(1))))
     if sys.version_info.major == 2:
         self.assertRaises(
             TypeError,
             lambda: df.select(col('name').substr(long(0), long(1))))
Developer: Brett-A, Project: spark, Lines of code: 11, Source file: test_functions.py


Example 13: runBPwithGraphFrames

    def runBPwithGraphFrames(cls, g, numIter):
        """Run Belief Propagation using GraphFrame.

        This implementation of BP shows how to use GraphFrame's aggregateMessages method.
        """
        # choose colors for vertices for BP scheduling
        colorG = cls._colorGraph(g)
        numColors = colorG.vertices.select('color').distinct().count()

        # TODO: handle vertices without any edges

        # initialize vertex beliefs at 0.0
        gx = GraphFrame(colorG.vertices.withColumn('belief', sqlfunctions.lit(0.0)), colorG.edges)

        # run BP for numIter iterations
        for iter_ in range(numIter):
            # for each color, have that color receive messages from neighbors
            for color in range(numColors):
                # Send messages to vertices of the current color.
                # We may send to source or destination since edges are treated as undirected.
                msgForSrc = sqlfunctions.when(
                    AM.src['color'] == color,
                    AM.edge['b'] * AM.dst['belief'])
                msgForDst = sqlfunctions.when(
                    AM.dst['color'] == color,
                    AM.edge['b'] * AM.src['belief'])
                # numerically stable sigmoid
                logistic = sqlfunctions.udf(cls._sigmoid, returnType=types.DoubleType())
                aggregates = gx.aggregateMessages(
                    sqlfunctions.sum(AM.msg).alias("aggMess"),
                    sendToSrc=msgForSrc,
                    sendToDst=msgForDst)
                v = gx.vertices
                # receive messages and update beliefs for vertices of the current color
                newBeliefCol = sqlfunctions.when(
                    (v['color'] == color) & (aggregates['aggMess'].isNotNull()),
                    logistic(aggregates['aggMess'] + v['a'])
                ).otherwise(v['belief'])  # keep old beliefs for other colors
                newVertices = (v
                    .join(aggregates, on=(v['id'] == aggregates['id']), how='left_outer')
                    .drop(aggregates['id'])  # drop duplicate ID column (from outer join)
                    .withColumn('newBelief', newBeliefCol)  # compute new beliefs
                    .drop('aggMess')  # drop messages
                    .drop('belief')  # drop old beliefs
                    .withColumnRenamed('newBelief', 'belief')
                )
                # cache new vertices using workaround for SPARK-1334
                cachedNewVertices = AM.getCachedDataFrame(newVertices)
                gx = GraphFrame(cachedNewVertices, gx.edges)

        # Drop the "color" column from vertices
        return GraphFrame(gx.vertices.drop('color'), gx.edges)
Developer: mengxr, Project: graphframes, Lines of code: 52, Source file: belief_propagation.py


Example 14: test_vectorized_udf_struct_with_empty_partition

    def test_vectorized_udf_struct_with_empty_partition(self):
        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))\
            .withColumn('name', lit('John Doe'))

        @pandas_udf("first string, last string")
        def split_expand(n):
            return n.str.split(expand=True)

        result = df.select(split_expand('name')).collect()
        self.assertEqual(1, len(result))
        row = result[0]
        self.assertEqual('John', row[0]['first'])
        self.assertEqual('Doe', row[0]['last'])
Developer: q977734161, Project: spark, Lines of code: 13, Source file: test_pandas_udf_scalar.py


Example 15: test_datasource_with_udf

    def test_datasource_with_udf(self):
        from pyspark.sql.functions import udf, lit, col

        path = tempfile.mkdtemp()
        shutil.rmtree(path)

        try:
            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
            filesource_df = self.spark.read.option('inferSchema', True).csv(path).toDF('i')
            datasource_df = self.spark.read \
                .format("org.apache.spark.sql.sources.SimpleScanSource") \
                .option('from', 0).option('to', 1).load().toDF('i')
            datasource_v2_df = self.spark.read \
                .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
                .load().toDF('i', 'j')

            c1 = udf(lambda x: x + 1, 'int')(lit(1))
            c2 = udf(lambda x: x + 1, 'int')(col('i'))

            f1 = udf(lambda x: False, 'boolean')(lit(1))
            f2 = udf(lambda x: False, 'boolean')(col('i'))

            for df in [filesource_df, datasource_df, datasource_v2_df]:
                result = df.withColumn('c', c1)
                expected = df.withColumn('c', lit(2))
                self.assertEquals(expected.collect(), result.collect())

            for df in [filesource_df, datasource_df, datasource_v2_df]:
                result = df.withColumn('c', c2)
                expected = df.withColumn('c', col('i') + 1)
                self.assertEquals(expected.collect(), result.collect())

            for df in [filesource_df, datasource_df, datasource_v2_df]:
                for f in [f1, f2]:
                    result = df.filter(f)
                    self.assertEquals(0, result.count())
        finally:
            shutil.rmtree(path)
Developer: drewrobb, Project: spark, Lines of code: 38, Source file: test_udf.py


Example 16: labelRDDs

def labelRDDs(driver, path, sc):
    sqlContext = SQLContext(sc)

    target = str(driver) + '.json'
    driver_pool = list(all_drivers)
    driver_pool.remove(target)

    sample_drivers = np.random.choice(driver_pool, K, replace=False)
    sample_drivers_paths = [path + i for i in sample_drivers]
    sampled = sqlContext.read.json(sample_drivers_paths)
    orig = sqlContext.read.json(path + target)
    samples = sampled.sample(False, .0055)
    samples = samples.withColumn('label', lit(0))
    orig = orig.withColumn('label', lit(1))
    rawdata = samples.unionAll(orig)
    rawdata = rawdata.select(rawdata['driver'],
                             rawdata['trip'],
                             rawdata['x'],
                             rawdata['y'],
                             rawdata['step'],
                             rawdata['label'])
    rawRDD = rawdata.rdd
    return rawRDD.map(maketups)
Developer: USF-ML2, Project: SKYNET-, Lines of code: 23, Source file: sampling_improved.py


Example 17: test_summary_weighted_covariance

    def test_summary_weighted_covariance(self):
        import pyspark.sql.functions as F
        from ts.flint import summarizers

        price = self.price()
        forecast = self.forecast()

        expected_pdf = make_pdf([
            (0, -1.96590909091,)
        ], ["time", "price_forecast_weight_weightedCovariance"])

        joined = price.leftJoin(forecast, key="id").withColumn('weight', F.lit(2.0))
        result = joined.summarize(summarizers.weighted_covariance("price", "forecast", "weight")).toPandas()
        pdt.assert_frame_equal(result, expected_pdf)
Developer: mattomatic, Project: flint, Lines of code: 14, Source file: test_summarizer.py


Example 18: _load_dataset

    def _load_dataset(self):
        '''
        Loads strava activities from source to DataFrame self.df
        '''

        # Get athlete list if not already set
        if not self.athletes:
            self._get_athlete_directories()

        # Initialize empty dataset
        self.df = self.hiveContext.createDataFrame(
            self.sc.emptyRDD(),
            self.schema
        )

        for athlete in self.athletes:
            for activity_type in self.activity_types:
        
                # Check that there are files of that type (or else .load fails)
                if self._activities_exist(athlete, activity_type):

                    # Read data
                    dfadd = self.hiveContext.read.format('com.databricks.spark.xml') \
                                    .options(rowTag='trkpt', treatEmptyValuesAsNulls=False) \
                                    .schema(self.schema) \
                                    .load(self.path+'gpx/%s/*%s.gpx' % (athlete, activity_type))
                
                    dfadd = dfadd.withColumn('athlete', lit(athlete)) \
                                 .withColumn('activity_type', lit(activity_type))
                
                    self.df = self.df.unionAll(dfadd)

        if self.filter_bug_inducing_rows:
            self.df = self.df.filter(self.df['extensions.gpxtpx:TrackPointExtension.#VALUE'].isNull())

        pass
Developer: larsbkrogvig, Project: strava-spark, Lines of code: 36, Source file: classes.py


Example 19: _minimize_query

 def _minimize_query(self):
     # From the temporal table, we need minimize the location (multiple locations) to the appropriate sample timestamp
     tb_samples = self.hive_cxt.sql("""
         SELECT *
         FROM (
             SELECT *,
             MIN(delta)   OVER ( PARTITION BY refers_to_object_id, created) AS min_delta,
             row_number() OVER ( PARTITION BY refers_to_object_id, created) AS ranks
             FROM samplestemporal st
             ORDER BY refers_to_object_id
         ) query
         where query.ranks = 1
     """)
     tb_samples = tb_samples.withColumn("meta_store", lit(1))
     tb_samples.registerTempTable('minimizedsamples')
     self.hive_cxt.cacheTable('minimizedsamples')
     return tb_samples
Developer: akamlani, Project: cooperhewitt, Lines of code: 17, Source file: ch_spark.py


Example 20: test_string_functions

    def test_string_functions(self):
        from pyspark.sql import functions
        from pyspark.sql.functions import col, lit, _string_functions
        df = self.spark.createDataFrame([['nick']], schema=['name'])
        self.assertRaisesRegexp(
            TypeError,
            "must be the same type",
            lambda: df.select(col('name').substr(0, lit(1))))
        if sys.version_info.major == 2:
            self.assertRaises(
                TypeError,
                lambda: df.select(col('name').substr(long(0), long(1))))

        for name in _string_functions.keys():
            self.assertEqual(
                df.select(getattr(functions, name)("name")).first()[0],
                df.select(getattr(functions, name)(col("name"))).first()[0])
Developer: apache, Project: spark, Lines of code: 17, Source file: test_functions.py



Note: The pyspark.sql.functions.lit examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors. Please consult each project's license before redistributing or reusing the code. Do not reproduce this article without permission.

