
Python functions.explode Code Examples


This article collects typical usage examples of the pyspark.sql.functions.explode function in Python. If you are wondering exactly how to use explode, how it behaves, or what working examples look like, the curated code samples below should help.



A total of 20 code examples of the explode function are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
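
As a quick orientation before the collected examples, here is a minimal sketch (not drawn from any of the projects below) of what explode does with an array column and a map column. The DataFrame, column names, and local SparkSession are invented for illustration, and explode_outer assumes Spark 2.3 or later.

# Minimal explode demo (illustrative only; all names below are made up).
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode, explode_outer

spark = SparkSession.builder.master("local[1]").appName("explode-demo").getOrCreate()

df = spark.createDataFrame([
    Row(id=1, tags=["a", "b"], attrs={"k": "v"}),
    Row(id=2, tags=[], attrs={}),
])

# explode(array): one output row per element; rows with empty or null arrays are dropped.
df.select("id", explode("tags").alias("tag")).show()

# explode(map): one output row per entry, split into key and value columns.
df.select("id", explode("attrs").alias("key", "value")).show()

# explode_outer keeps rows whose array is empty or null, emitting a null element instead.
df.select("id", explode_outer("tags").alias("tag")).show()

spark.stop()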

Example 1: test_explode

    def test_explode(self):
        from pyspark.sql.functions import explode, explode_outer, posexplode_outer
        d = [
            Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"}),
            Row(a=1, intlist=[], mapfield={}),
            Row(a=1, intlist=None, mapfield=None),
        ]
        rdd = self.sc.parallelize(d)
        data = self.spark.createDataFrame(rdd)

        result = data.select(explode(data.intlist).alias("a")).select("a").collect()
        self.assertEqual(result[0][0], 1)
        self.assertEqual(result[1][0], 2)
        self.assertEqual(result[2][0], 3)

        result = data.select(explode(data.mapfield).alias("a", "b")).select("a", "b").collect()
        self.assertEqual(result[0][0], "a")
        self.assertEqual(result[0][1], "b")

        result = [tuple(x) for x in data.select(posexplode_outer("intlist")).collect()]
        self.assertEqual(result, [(0, 1), (1, 2), (2, 3), (None, None), (None, None)])

        result = [tuple(x) for x in data.select(posexplode_outer("mapfield")).collect()]
        self.assertEqual(result, [(0, 'a', 'b'), (None, None, None), (None, None, None)])

        result = [x[0] for x in data.select(explode_outer("intlist")).collect()]
        self.assertEqual(result, [1, 2, 3, None, None])

        result = [tuple(x) for x in data.select(explode_outer("mapfield")).collect()]
        self.assertEqual(result, [('a', 'b'), (None, None), (None, None)])
Developer: apache, Project: spark, Lines: 30, Source: test_functions.py


Example 2: process

def process(rdd):
  print(">>>> BEGIN CASS")
  wonbids = getSqlContextInstance(rdd.context).createDataFrame(rdd) 
  wonbids.registerTempTable("wonbids")
  wonbids.write.format("org.apache.spark.sql.cassandra").\
           options(keyspace="text_bids", table="bidswon").\
           save(mode="append")
  #sqlContext.cacheTable('wonbids')
  # wonbids.show()
 
  symptoms = wonbids.select(wonbids.id,wonbids.created_utc,explode(wonbids.symptomtags).alias('symptom'))
  symptoms.registerTempTable("symptoms")
  symptoms.write.format("org.apache.spark.sql.cassandra").\
         options(keyspace="text_bids", table="symptoms").\
         save(mode="append")
  # symptoms.show()

  conditions = wonbids.select(wonbids.id,wonbids.created_utc,explode(wonbids.conditiontags).alias('condition'))
  conditions.registerTempTable("conditions")
  conditions.write.format("org.apache.spark.sql.cassandra").\
         options(keyspace="text_bids", table="conditions").\
         save(mode="append")
  # conditions.show()
  
  # send back to master to process
  for w in wonbids.collect():
    event.Event('toES', {'id':w.id,'pharmatag':w.pharmatag,'price':w.price,'created_utc':w.created_utc,'symptomtags':w.symptomtags,'conditiontags':w.conditiontags})
  print(">>>> END CASS")
Developer: holajoyce, Project: twitter.rx, Lines: 28, Source: twitter_stream.py


Example 3: test_udf_in_generate

    def test_udf_in_generate(self):
        from pyspark.sql.functions import udf, explode
        df = self.spark.range(5)
        f = udf(lambda x: list(range(x)), ArrayType(LongType()))
        row = df.select(explode(f(*df))).groupBy().sum().first()
        self.assertEqual(row[0], 10)

        df = self.spark.range(3)
        res = df.select("id", explode(f(df.id))).collect()
        self.assertEqual(res[0][0], 1)
        self.assertEqual(res[0][1], 0)
        self.assertEqual(res[1][0], 2)
        self.assertEqual(res[1][1], 0)
        self.assertEqual(res[2][0], 2)
        self.assertEqual(res[2][1], 1)

        range_udf = udf(lambda value: list(range(value - 1, value + 1)), ArrayType(IntegerType()))
        res = df.select("id", explode(range_udf(df.id))).collect()
        self.assertEqual(res[0][0], 0)
        self.assertEqual(res[0][1], -1)
        self.assertEqual(res[1][0], 0)
        self.assertEqual(res[1][1], 0)
        self.assertEqual(res[2][0], 1)
        self.assertEqual(res[2][1], 0)
        self.assertEqual(res[3][0], 1)
        self.assertEqual(res[3][1], 1)
Developer: drewrobb, Project: spark, Lines: 26, Source: test_udf.py


Example 4: test_explode

    def test_explode(self):
        from pyspark.sql.functions import explode
        d = [Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"})]
        rdd = self.sc.parallelize(d)
        data = self.sqlCtx.createDataFrame(rdd)

        result = data.select(explode(data.intlist).alias("a")).select("a").collect()
        self.assertEqual(result[0][0], 1)
        self.assertEqual(result[1][0], 2)
        self.assertEqual(result[2][0], 3)

        result = data.select(explode(data.mapfield).alias("a", "b")).select("a", "b").collect()
        self.assertEqual(result[0][0], "a")
        self.assertEqual(result[0][1], "b")
Developer: uncleGen, Project: ps-on-spark, Lines: 14, Source: tests.py


Example 5: data

 def data(self):
     from pyspark.sql.functions import array, explode, col, lit
     return self.spark.range(10).toDF('id') \
         .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
         .withColumn("v", explode(col('vs'))) \
         .drop('vs') \
         .withColumn('w', lit(1.0))
Developer: JingchengDu, Project: spark, Lines: 7, Source: test_pandas_udf_grouped_agg.py


Example 6: create_one_hot_dict

	def create_one_hot_dict(self,input_df):
		"""Creates a one-hot-encoder dictionary based on the input data.

		Args:
			input_df (DataFrame with 'features' column): A DataFrame where each row contains a list of
				(featureID, value) tuples.

		Returns:
			dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
				unique integers.
		"""
		distinct_feats = input_df.select(explode(input_df.features)).distinct()
		#print distinct_feats.take(1)[0]
		return distinct_feats.rdd.map(lambda x: tuple(x[0])).zipWithIndex().collectAsMap()
Developer: yingcuhk, Project: SparkPractice, Lines: 14, Source: Encoding.py


Example 7: get_average_expected_change

def get_average_expected_change(timeframe, partner, purchase):
    """
    returns the average expected change of day 1 to day 7
    input: a period (string), a partner (string), a spark dataframe
    output: a dictionary with key=day and value=average_expected_change
    """
    keys = [1,2,3,4,5,6,7]
    result = {}
    result = {key: 0 for key in keys}
    timeframe_is = get_date(timeframe)
    unlisted = purchase.select(explode(purchase.prediction.days).alias("test")).collect()
    # print 'length of unlisted:', len(unlisted)
    total = len(unlisted) / len(keys)
    # print total
    for i in range(len(unlisted)):
        result[i%7+1] += unlisted[i][0].expected_change / total
    return result
Developer: burgosf, Project: spark_df, Lines: 17, Source: test_spark.py


Example 8: elements

# MAGIC 
# MAGIC Before we can use the `wordcount()` function, we have to address two issues with the format of the DataFrame:
# MAGIC   + The first issue is that we need to split each line by its spaces.
# MAGIC   + The second issue is that we need to filter out empty lines or words.
# MAGIC 
# MAGIC Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row.  To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).
# MAGIC 
# MAGIC Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.
# MAGIC 
# MAGIC > Note that `shakeWordsDF` should be a DataFrame with one column named `word`.

# COMMAND ----------

# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF.select(explode(split(shakespeareDF[0],"\s+")).alias("word"))).where("length(word) > 0")

shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount

# COMMAND ----------

# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")

# COMMAND ----------

# MAGIC %md
# MAGIC ** (4e) Count the words **
Developer: Sun-shan, Project: edx-cs105x-spark-intro, Lines: 31, Source: cs105_lab1b_word_count.py


Example 9: drop

# drop() is like the opposite of select(): Instead of selecting specific columns from a DataFrame, it drops a specified column from a DataFrame
dataDF.drop('occupation').drop('age').show()

# the sample() transformation returns a new DataFrame with a random sample
sampledDF = dataDF.sample(withReplacement=False, fraction=0.10)
print sampledDF.count()
sampledDF.show()

# split() and explode() transformations
from pyspark.sql.functions import split, explode

shakeWordsSplit = (shakespeareDF
                .select(split(shakespeareDF.word,' ').alias('word'))) # split() breaks each sentence at spaces, producing an array of words per row
				
shakeWordsExplode = (shakeWordsSplit
                    .select(explode(shakeWordsSplit.word).alias('word'))) # explode() Returns a new row for each element in the given array
					
shakeWordsDF = shakeWordsExplode.filter(shakeWordsExplode.word != '') # removes all the blanks

shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount

###############################################################
###															###
###															###
###   						GROUP BY						###
###															###
###															###
###############################################################
Developer: aroonjham, Project: CodeRepository, Lines: 30, Source: Python_SparkSQL.py


Example 10: Tokens

from lib.pos_tags import PosTags
from lib.chunks import Chunks

t = Tokens()
p = PosTags()
c = Chunks()

c.train(c.load_training_data("../data/chunker_training_50_fixed.json"))


def pipeline(s):
    """
    Given a string, return a list of relations
    """
    return c.assemble(c.tag(p.tag(t.tokenize(s))))


pipeline_udf = sql.udf(pipeline, types.ArrayType(types.MapType(types.StringType(), types.StringType())))


phrases = (
    notes.withColumn("phrases", pipeline_udf(notes["document"]))
    .select(sql.explode(sql.col("phrases")).alias("text"))
    .filter(sql.col("text")["tag"] == "NP")
    .select(sql.lower(sql.col("text")["phrase"]).alias("phrase"))
    .groupBy(sql.col("phrase"))
    .count()
)

phrases.write.parquet("../data/idigbio_phrases.parquet")
Developer: mjcollin, Project: 2016spr, Lines: 30, Source: find_phrases_job.py


Example 11: rdd_to_recordstore

    def rdd_to_recordstore(rdd_transform_context_rdd):

        if rdd_transform_context_rdd.isEmpty():
            MonMetricsKafkaProcessor.log_debug(
                "rdd_to_recordstore: nothing to process...")
        else:

            sql_context = SQLContext(rdd_transform_context_rdd.context)
            data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
                get_data_driven_specs_repo()
            pre_transform_specs_df = data_driven_specs_repo.\
                get_data_driven_specs(
                    sql_context=sql_context,
                    data_driven_spec_type=DataDrivenSpecsRepo.
                    pre_transform_specs_type)

            #
            # extract second column containing raw metric data
            #
            raw_mon_metrics = rdd_transform_context_rdd.map(
                lambda nt: nt.rdd_info[1])

            #
            # convert raw metric data rdd to dataframe rdd
            #
            raw_mon_metrics_df = \
                MonMetricUtils.create_mon_metrics_df_from_json_rdd(
                    sql_context,
                    raw_mon_metrics)

            #
            # filter out unwanted metrics and keep metrics we are interested in
            #
            cond = [
                raw_mon_metrics_df.metric.name ==
                pre_transform_specs_df.event_type]
            filtered_metrics_df = raw_mon_metrics_df.join(
                pre_transform_specs_df, cond)

            #
            # validate filtered metrics to check if required fields
            # are present and not empty
            # In order to be able to apply filter function had to convert
            # data frame rdd to normal rdd. After validation the rdd is
            # converted back to dataframe rdd
            #
            # FIXME: find a way to apply filter function on dataframe rdd data
            validated_mon_metrics_rdd = filtered_metrics_df.rdd.filter(
                MonMetricsKafkaProcessor._validate_raw_mon_metrics)
            validated_mon_metrics_df = sql_context.createDataFrame(
                validated_mon_metrics_rdd, filtered_metrics_df.schema)

            #
            # record generator
            # generate a new intermediate metric record if a given metric
            # metric_id_list, in pre_transform_specs table has several
            # intermediate metrics defined.
            # intermediate metrics are used as a convenient way to
            # process (aggregated) metric in multiple ways by making a copy
            # of the source data for each processing
            #
            gen_mon_metrics_df = validated_mon_metrics_df.select(
                validated_mon_metrics_df.meta,
                validated_mon_metrics_df.metric,
                validated_mon_metrics_df.event_processing_params,
                validated_mon_metrics_df.event_type,
                explode(validated_mon_metrics_df.metric_id_list).alias(
                    "this_metric_id"),
                validated_mon_metrics_df.service_id)

            #
            # transform metrics data to record_store format
            # record store format is the common format which will serve as
            # source to aggregation processing.
            # converting the metric to common standard format helps in writing
            # generic aggregation routines driven by configuration parameters
            #  and can be reused
            #
            record_store_df = gen_mon_metrics_df.select(
                (gen_mon_metrics_df.metric.timestamp / 1000).alias(
                    "event_timestamp_unix"),
                from_unixtime(
                    gen_mon_metrics_df.metric.timestamp / 1000).alias(
                    "event_timestamp_string"),
                gen_mon_metrics_df.event_type.alias("event_type"),
                gen_mon_metrics_df.event_type.alias("event_quantity_name"),
                (gen_mon_metrics_df.metric.value / 1.0).alias(
                    "event_quantity"),
                when(gen_mon_metrics_df.metric.dimensions.state != '',
                     gen_mon_metrics_df.metric.dimensions.state).otherwise(
                    'NA').alias("event_status"),
                lit('1.0').alias('event_version'),
                lit('metrics').alias("record_type"),

                # resource_uuid
                when(gen_mon_metrics_df.metric.dimensions.instanceId != '',
                     gen_mon_metrics_df.metric.dimensions.instanceId).when(
                    gen_mon_metrics_df.metric.dimensions.resource_id != '',
                    gen_mon_metrics_df.metric.dimensions.resource_id).
                otherwise('NA').alias("resource_uuid"),
#......... (part of the code omitted) .........
Developer: bigluster, Project: monasca-transform, Lines: 101, Source: mon_metrics_kafka.py


Example 12: Row

#linesOut.saveAsTextFile("hdfs:///tmp/fact_icd9_encounter_08242016_supP2_rdd_.csv")

# linesOut.saveAsTextFile('hdfs:///tmp/fact_icd9_encounter_08242016_supP2_rdd.csv')

linesOut.saveAsTextFile("/Users/jayurbain/Dropbox/MCW/fact_icd9_encounter_08242016_supP2_rdd.txt")

linesOut.reduce( lambda k,v: (k))

################

from pyspark.sql import Row
from pyspark.sql.functions import explode

df = sqlContext.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9]), Row(a=2, b=[4,5,6],c=[10,11,12])])
df1 = df.select(df.a,explode(df.b).alias("b"),df.c)
df2 = df1.select(df1.a,df1.b,explode(df1.c).alias("c"))



###################################

# fpgrowth example
itemsets = parts.map(lambda p: ( p[3].strip().split(',') ) )
itemsets.getNumPartitions()
model_fp = FPGrowth.train(itemsets, minSupport=0.005, numPartitions=10)
result = model_fp.freqItemsets().collect()
for i in sorted(result, key=operator.itemgetter(1), reverse=True):
    print '(', ', '.join(i.items), ')', 'freq=', str(i.freq)

Developer: jayurbain, Project: machine-learning, Lines: 28, Source: subsequence_mining.py


Example 13: elements

# MAGIC 
# MAGIC Before we can use the `wordcount()` function, we have to address two issues with the format of the DataFrame:
# MAGIC   + The first issue is that we need to split each line by its spaces.
# MAGIC   + The second issue is that we need to filter out empty lines or words.
# MAGIC 
# MAGIC Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row.  To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).
# MAGIC 
# MAGIC Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.
# MAGIC 
# MAGIC > Note that `shakeWordsDF` should be a DataFrame with one column named `word`.

# COMMAND ----------

# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF.select(explode(split(col('sentence'),' ')).alias('word')))

shakeWordsDF = shakeWordsDF.filter(col('word') != '')

shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount

# COMMAND ----------

# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")

# COMMAND ----------
Developer: HappyDash, Project: apacheSpark, Lines: 30, Source: cs105_lab1b_word_count.py


Example 14: data

 def data(self):
     return self.spark.range(10).toDF('id') \
         .withColumn("vs", array([lit(i) for i in range(20, 30)])) \
         .withColumn("v", explode(col('vs'))).drop('vs')
Developer: CodingCat, Project: spark, Lines: 4, Source: test_pandas_udf_grouped_map.py


Example 15: author_pub_simu

        casSession = cascluster.connect('test2')
        for aggItem in agg:
            if aggItem[0] != "":
                    casSession.execute('INSERT INTO author_pub_simu (author, pub) VALUES (%s, %s)', (str(aggItem[0]), str(aggItem[1])))
        casSession.shutdown()
        cascluster.shutdown()

sc = SparkContext("spark://ip-172-31-2-40:7077", "2016_test")
sqlContext = SQLContext(sc)

# read in data from HDFS and select columns
df1 = sqlContext.read.json("hdfs://ec2-52-34-128-244.us-west-2.compute.amazonaws.com:9000//simulated/fake_data_p1*.json").dropna()
df_sel = df1.select('recid', 'authors','co-authors','references', 'creation_date').withColumnRenamed('co-authors', 'co_authors').persist(StorageLevel.MEMORY_AND_DISK)

# explode references list and group by citation id to calcualte the number of times that one publication has been cited
df_references = df_sel.select('recid', explode('references')).withColumnRenamed('_c0','cited_id').groupBy('cited_id').count().withColumnRenamed('count','num_cited')

# combine author and co-author list to generate a total list of authors and convert rdd into dataframe
rdd_authors = df_sel.rdd.map(lambda x:{'recid':x.recid, 'authors': append_author(x.authors, x.co_authors), 'creation_date': fetch_year(x.creation_date)})
df_authors = sqlContext.createDataFrame(rdd_authors)

# join citation and author dataframes
df_join = df_references.join(df_authors, df_references.cited_id == df_authors.recid, 'inner').drop(df_authors.recid)

# explode author and save to Cassandra database
df_explode_author = df_join.select('cited_id', 'num_cited', explode('authors'), 'creation_date').withColumnRenamed('_c0', 'author')
df_explode_author.persist(StorageLevel.MEMORY_AND_DISK)
df_sel.unpersist()
df_explode_author.rdd.foreachPartition(aggToCassandra2)

# combine each author publication list,  group by author and calculate H-index for each author
Developer: qnsosusong, Project: Hindex, Lines: 31, Source: spark_hindex_simu.py


Example 16: gapply


#......... (part of the code omitted) .........
    ...                       Row(course="dotNET", year=2012, earnings=5000),
    ...                       Row(course="dotNET", year=2013, earnings=48000),
    ...                       Row(course="Java",   year=2013, earnings=30000)])
    ...     .select("course", "year", "earnings"))
    >>> def yearlyMedian(_, vals):
    ...     all_years = set(vals['year'])
    ...     # Note that interpolation is performed, so we need to cast back to int.
    ...     yearly_median = [(year, int(vals['earnings'][vals['year'] == year].median()))
    ...                      for year in all_years]
    ...     return pd.DataFrame.from_records(yearly_median)
    >>> newSchema = StructType().add("year", LongType()).add("median_earnings", LongType())
    >>> gapply(df.groupBy("course"), yearlyMedian, newSchema).orderBy("median_earnings").show()
    +------+----+---------------+
    |course|year|median_earnings|
    +------+----+---------------+
    |dotNET|2012|           7500|
    |  Java|2012|          20000|
    |  Java|2013|          30000|
    |dotNET|2013|          48000|
    +------+----+---------------+
    <BLANKLINE>
    >>> def twoKeyYearlyMedian(_, vals):
    ...     return pd.DataFrame.from_records([(int(vals["earnings"].median()),)])
    >>> newSchema = StructType([df.schema["earnings"]])
    >>> gapply(df.groupBy("course", "year"), twoKeyYearlyMedian, newSchema, "earnings").orderBy(
    ...     "earnings").show()
    +------+----+--------+
    |course|year|earnings|
    +------+----+--------+
    |dotNET|2012|    7500|
    |  Java|2012|   20000|
    |  Java|2013|   30000|
    |dotNET|2013|   48000|
    +------+----+--------+
    <BLANKLINE>
    >>> spark.stop(); SparkSession._instantiatedContext = None
    """
    import pandas as pd
    minPandasVersion = '0.7.1'
    if LooseVersion(pd.__version__) < LooseVersion(minPandasVersion):
        raise ImportError('Pandas installed but version is {}, {} required'
                          .format(pd.__version__, minPandasVersion))

    # Do a null aggregation to retrieve the keys first (should be no computation)
    # Also consistent with spark.sql.retainGroupColumns
    keySchema = grouped_data.agg({}).schema
    keyCols = grouped_data.agg({}).columns

    if not cols:
        # Extract the full column list with the parent df
        javaDFName = "org$apache$spark$sql$RelationalGroupedDataset$$df"
        parentDF = java_gateway.get_field(grouped_data._jgd, javaDFName)
        allCols = DataFrame(parentDF, None).columns
        keyColsSet = set(keyCols)
        cols = [col for col in allCols if col not in keyColsSet]

    if "*" in cols:
        raise ValueError("cols expected to contain only singular columns")

    if len(set(cols)) < len(cols):
        raise ValueError("cols expected not to contain duplicate columns")

    if not isinstance(schema, StructType):
        raise ValueError("output schema should be a StructType")

    inputAggDF = grouped_data.agg({col: 'collect_list' for col in cols})
    # Recover canonical order (aggregation may change column order)
    canonicalOrder = chain(keyCols, [inputAggDF['collect_list(' + col + ')'] for col in cols])
    inputAggDF = inputAggDF.select(*canonicalOrder)

    # Wraps the user-provided function with another python function, which prepares the
    # input in the form specified by the documentation. Then, once the function completes,
    # this wrapper prepends the keys to the output values and converts from pandas.
    def pandasWrappedFunc(*args):
        nvals = len(cols)
        keys, collectedCols = args[:-nvals], args[-nvals:]
        paramKeys = tuple(keys)
        if len(paramKeys) == 1:
            paramKeys = paramKeys[0]
        valuesDF = pd.DataFrame.from_dict(dict(zip(cols, collectedCols)))
        valuesDF = valuesDF[list(cols)]  # reorder to canonical
        outputDF = func(paramKeys, valuesDF)
        valCols = outputDF.columns.tolist()
        for key, keyName in zip(keys, keyCols):
            outputDF[keyName] = key
        outputDF = outputDF[keyCols + valCols]  # reorder to canonical
        # To recover native python types for serialization, we need
        # to convert the pandas dataframe to a numpy array, then to a
        # native list (can't go straight to native, since pandas will
        # attempt to preserve the numpy type).
        return outputDF.values.tolist()

    keyPrependedSchema = StructType(list(chain(keySchema, schema)))
    outputAggSchema = ArrayType(keyPrependedSchema, containsNull=False)
    pandasUDF = udf(pandasWrappedFunc, outputAggSchema)
    outputAggDF = inputAggDF.select(pandasUDF(*inputAggDF))

    explodedDF = outputAggDF.select(explode(*outputAggDF).alias("gapply"))
    # automatically retrieves nested schema column names
    return explodedDF.select("gapply.*")
Developer: dominguezus, Project: spark-sklearn, Lines: 101, Source: group_apply.py


Example 17: elements

# MAGIC Before we can use the `wordcount()` function, we have to address two issues with the format of the DataFrame:
# MAGIC   + The first issue is that we need to split each line by its spaces.
# MAGIC   + The second issue is that we need to filter out empty lines or words.
# MAGIC 
# MAGIC Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row.  To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).
# MAGIC 
# MAGIC Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.
# MAGIC 
# MAGIC > Note that `shakeWordsDF` should be a DataFrame with one column named `word`.

# COMMAND ----------

# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF
                .select(explode(split(shakespeareDF.sentence, ' '))
                .alias("word"))
                .where("word != ''"))

shakeWordsDF.show(truncate=False)
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount


# COMMAND ----------

# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")

# COMMAND ----------
Developer: VaniKandhasamy, Project: BerkeleyX-Apache-Spark-Labs, Lines: 31, Source: cs105_lab1b_word_count.py


Example 18: with_explode_column

 def with_explode_column(df):
     import pyspark.sql.functions as F
     df2 = df.withColumn('values', F.array(F.lit(1), F.lit(2)))
     df2 = df2.withColumn('value', F.explode(df2.values))
     return df2
Developer: mattomatic, Project: flint, Lines: 5, Source: test_partition_preserve.py


Example 19: elements

# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF
                <FILL IN>)

shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount

# COMMAND ----------

# ANSWER
from pyspark.sql.functions import split, size, explode
shakeWordsDF = (shakespeareDF
                .select(split('sentence', '\s+').alias('words'))
                .select(explode('words').alias('word'))
                .where(col('word') != ''))

shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount

# COMMAND ----------

# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")

# COMMAND ----------

# PRIVATE_TEST Remove empty elements (4d)
Developer: ANTPHAM, Project: mooc-setup, Lines: 31, Source: cs105_lab1b_word_count.py


Example 20: elements

# MAGIC Before we can use the `wordcount()` function, we have to address two issues with the format of the DataFrame:
# MAGIC   + The first issue is that we need to split each line by its spaces.
# MAGIC   + The second issue is that we need to filter out empty lines or words.
# MAGIC
# MAGIC Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row.  To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).
# MAGIC
# MAGIC Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.
# MAGIC
# MAGIC > Note that `shakeWordsDF` should be a DataFrame with one column named `word`.

# COMMAND ----------

# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF
                .select(explode(split(shakespeareDF.value , ' ')).alias('word'))
                .filter("word<>''"))
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount

# COMMAND ----------

# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")

# COMMAND ----------

# MAGIC %md
# MAGIC ** (4e) Count the words **
Developer: Teitlax, Project: ApacheSpark, Lines: 31, Source: cs105_lab1b_word_count.py



Note: The pyspark.sql.functions.explode examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The code snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors, and any distribution or use should follow the corresponding project's license. Please do not reproduce without permission.

