This article collects typical usage examples of the Python function pyspark.sql.functions.explode. If you are wondering what explode does, how to call it, or what real-world usages of it look like, the curated code samples below should help.
A total of 20 code examples of the explode function are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
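Before the collected examples, here is a minimal, self-contained sketch of explode on an array column and a map column. It assumes a local SparkSession; the DataFrame, column names, and data are invented purely for illustration.

# Minimal sketch, assuming a local Spark installation; names and data below are illustrative only.
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode, explode_outer

spark = SparkSession.builder.master("local[*]").appName("explode_demo").getOrCreate()
df = spark.createDataFrame([
    Row(id=1, tags=["a", "b"], attrs={"k": "v"}),
    Row(id=2, tags=[], attrs=None),
])

# explode: one output row per array element; rows with empty or NULL arrays are dropped
df.select("id", explode("tags").alias("tag")).show()

# explode on a map column yields two output columns (key, value)
df.select("id", explode("attrs").alias("key", "value")).show()

# explode_outer keeps rows whose array is empty or NULL, emitting a single NULL row for them
df.select("id", explode_outer("tags").alias("tag")).show()

spark.stop()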
Example 1: test_explode
def test_explode(self):
    from pyspark.sql.functions import explode, explode_outer, posexplode_outer

    d = [
        Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"}),
        Row(a=1, intlist=[], mapfield={}),
        Row(a=1, intlist=None, mapfield=None),
    ]
    rdd = self.sc.parallelize(d)
    data = self.spark.createDataFrame(rdd)

    result = data.select(explode(data.intlist).alias("a")).select("a").collect()
    self.assertEqual(result[0][0], 1)
    self.assertEqual(result[1][0], 2)
    self.assertEqual(result[2][0], 3)

    result = data.select(explode(data.mapfield).alias("a", "b")).select("a", "b").collect()
    self.assertEqual(result[0][0], "a")
    self.assertEqual(result[0][1], "b")

    result = [tuple(x) for x in data.select(posexplode_outer("intlist")).collect()]
    self.assertEqual(result, [(0, 1), (1, 2), (2, 3), (None, None), (None, None)])
    result = [tuple(x) for x in data.select(posexplode_outer("mapfield")).collect()]
    self.assertEqual(result, [(0, 'a', 'b'), (None, None, None), (None, None, None)])
    result = [x[0] for x in data.select(explode_outer("intlist")).collect()]
    self.assertEqual(result, [1, 2, 3, None, None])
    result = [tuple(x) for x in data.select(explode_outer("mapfield")).collect()]
    self.assertEqual(result, [('a', 'b'), (None, None), (None, None)])
Developer ID: apache, Project: spark, Lines of code: 30, Source: test_functions.py
Example 2: process
def process(rdd):
    print(">>>> BEGIN CASS")
    wonbids = getSqlContextInstance(rdd.context).createDataFrame(rdd)
    wonbids.registerTempTable("wonbids")
    wonbids.write.format("org.apache.spark.sql.cassandra").\
        options(keyspace="text_bids", table="bidswon").\
        save(mode="append")
    # sqlContext.cacheTable('wonbids')
    # wonbids.show()

    symptoms = wonbids.select(wonbids.id, wonbids.created_utc, explode(wonbids.symptomtags).alias('symptom'))
    symptoms.registerTempTable("symptoms")
    symptoms.write.format("org.apache.spark.sql.cassandra").\
        options(keyspace="text_bids", table="symptoms").\
        save(mode="append")
    # symptoms.show()

    conditions = wonbids.select(wonbids.id, wonbids.created_utc, explode(wonbids.conditiontags).alias('condition'))
    conditions.registerTempTable("conditions")
    conditions.write.format("org.apache.spark.sql.cassandra").\
        options(keyspace="text_bids", table="conditions").\
        save(mode="append")
    # conditions.show()

    # send back to master to process
    for w in wonbids.collect():
        event.Event('toES', {'id': w.id, 'pharmatag': w.pharmatag, 'price': w.price, 'created_utc': w.created_utc, 'symptomtags': w.symptomtags, 'conditiontags': w.conditiontags})
    print(">>>> END CASS")
Developer ID: holajoyce, Project: twitter.rx, Lines of code: 28, Source: twitter_stream.py
Example 3: test_udf_in_generate
def test_udf_in_generate(self):
    from pyspark.sql.functions import udf, explode

    df = self.spark.range(5)
    f = udf(lambda x: list(range(x)), ArrayType(LongType()))
    row = df.select(explode(f(*df))).groupBy().sum().first()
    self.assertEqual(row[0], 10)

    df = self.spark.range(3)
    res = df.select("id", explode(f(df.id))).collect()
    self.assertEqual(res[0][0], 1)
    self.assertEqual(res[0][1], 0)
    self.assertEqual(res[1][0], 2)
    self.assertEqual(res[1][1], 0)
    self.assertEqual(res[2][0], 2)
    self.assertEqual(res[2][1], 1)

    range_udf = udf(lambda value: list(range(value - 1, value + 1)), ArrayType(IntegerType()))
    res = df.select("id", explode(range_udf(df.id))).collect()
    self.assertEqual(res[0][0], 0)
    self.assertEqual(res[0][1], -1)
    self.assertEqual(res[1][0], 0)
    self.assertEqual(res[1][1], 0)
    self.assertEqual(res[2][0], 1)
    self.assertEqual(res[2][1], 0)
    self.assertEqual(res[3][0], 1)
    self.assertEqual(res[3][1], 1)
Developer ID: drewrobb, Project: spark, Lines of code: 26, Source: test_udf.py
Example 4: test_explode
def test_explode(self):
    from pyspark.sql.functions import explode

    d = [Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"})]
    rdd = self.sc.parallelize(d)
    data = self.sqlCtx.createDataFrame(rdd)

    result = data.select(explode(data.intlist).alias("a")).select("a").collect()
    self.assertEqual(result[0][0], 1)
    self.assertEqual(result[1][0], 2)
    self.assertEqual(result[2][0], 3)

    result = data.select(explode(data.mapfield).alias("a", "b")).select("a", "b").collect()
    self.assertEqual(result[0][0], "a")
    self.assertEqual(result[0][1], "b")
Developer ID: uncleGen, Project: ps-on-spark, Lines of code: 14, Source: tests.py
Example 5: data
def data(self):
    from pyspark.sql.functions import array, explode, col, lit
    return self.spark.range(10).toDF('id') \
        .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
        .withColumn("v", explode(col('vs'))) \
        .drop('vs') \
        .withColumn('w', lit(1.0))
Developer ID: JingchengDu, Project: spark, Lines of code: 7, Source: test_pandas_udf_grouped_agg.py
Example 6: create_one_hot_dict
def create_one_hot_dict(self, input_df):
    """Creates a one-hot-encoder dictionary based on the input data.

    Args:
        input_df (DataFrame with 'features' column): A DataFrame where each row contains a list of
            (featureID, value) tuples.

    Returns:
        dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
            unique integers.
    """
    distinct_feats = input_df.select(explode(input_df.features)).distinct()
    # print distinct_feats.take(1)[0]
    return distinct_feats.rdd.map(lambda x: tuple(x[0])).zipWithIndex().collectAsMap()
Developer ID: yingcuhk, Project: SparkPractice, Lines of code: 14, Source: Encoding.py
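As a hedged illustration of how a method like the one in Example 6 might be exercised, the sketch below builds a tiny DataFrame of (featureID, value) tuples and calls the same logic. The wrapper class OHEHelper and the sample data are assumptions made for this example and are not part of the source project.

# Illustrative only: the wrapper class and sample data are assumptions, not from the source repo.
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode

class OHEHelper(object):
    def create_one_hot_dict(self, input_df):
        # Same logic as the excerpt above: distinct exploded features mapped to unique integers.
        distinct_feats = input_df.select(explode(input_df.features)).distinct()
        return distinct_feats.rdd.map(lambda x: tuple(x[0])).zipWithIndex().collectAsMap()

spark = SparkSession.builder.master("local[*]").getOrCreate()
sample_df = spark.createDataFrame([
    Row(features=[(0, "mouse"), (1, "black")]),
    Row(features=[(0, "cat"), (1, "tabby"), (2, "mouse")]),
])
ohe_dict = OHEHelper().create_one_hot_dict(sample_df)
print(ohe_dict)  # e.g. {(0, 'mouse'): 0, (1, 'black'): 1, ...}; integer assignment may vary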
Example 7: get_average_expected_change
def get_average_expected_change(timeframe, partner, purchase):
    """
    returns the average expected change of day 1 to day 7
    input: a period (string), a partner (string), a spark dataframe
    output: a dictionary with key=day and value=average_expected_change
    """
    keys = [1, 2, 3, 4, 5, 6, 7]
    result = {key: 0 for key in keys}
    timeframe_is = get_date(timeframe)
    unlisted = purchase.select(explode(purchase.prediction.days).alias("test")).collect()
    # print 'length of unlisted:', len(unlisted)
    total = len(unlisted) / len(keys)
    # print total
    for i in range(len(unlisted)):
        result[i % 7 + 1] += unlisted[i][0].expected_change / total
    return result
Developer ID: burgosf, Project: spark_df, Lines of code: 17, Source: test_spark.py
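Example 7 relies on exploding a nested array of structs (purchase.prediction.days) and then reading a field (expected_change) off each exploded element. A minimal, self-contained sketch of that pattern follows; the schema, column names, and numbers are invented for illustration and are not the project's real data.

# Hedged sketch only: the nested schema and values are made up to show the explode-then-field-access pattern.
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode

spark = SparkSession.builder.master("local[*]").getOrCreate()
purchase = spark.createDataFrame([
    Row(prediction=Row(days=[Row(day=1, expected_change=0.10),
                             Row(day=2, expected_change=-0.05)])),
    Row(prediction=Row(days=[Row(day=1, expected_change=0.20),
                             Row(day=2, expected_change=0.15)])),
])

# One output row per day entry; each exploded value is a struct whose fields stay addressable.
days = purchase.select(explode(purchase.prediction.days).alias("d"))
days.select("d.day", "d.expected_change").show()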
Example 8: elements
# MAGIC
# MAGIC Before we can use the `wordcount()` function, we have to address two issues with the format of the DataFrame:
# MAGIC + The first issue is that we need to split each line by its spaces.
# MAGIC + The second issue is we need to filter out empty lines or words.
# MAGIC
# MAGIC Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row. To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).
# MAGIC
# MAGIC Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.
# MAGIC
# MAGIC > Note that `shakeWordsDF` should be a DataFrame with one column named `word`.
# COMMAND ----------
# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF.select(explode(split(shakespeareDF[0],"\s+")).alias("word"))).where("length(word) > 0")
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount
# COMMAND ----------
# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")
# COMMAND ----------
# MAGIC %md
# MAGIC ** (4e) Count the words **
Developer ID: Sun-shan, Project: edx-cs105x-spark-intro, Lines of code: 31, Source: cs105_lab1b_word_count.py
Example 9: drop
# drop() is like the opposite of select(): Instead of selecting specific columns from a DataFrame, it drops a specified column from a DataFrame
dataDF.drop('occupation').drop('age').show()
# the sample() transformation returns a new DataFrame with a random sample
sampledDF = dataDF.sample(withReplacement=False, fraction=0.10)
print sampledDF.count()
sampledDF.show()
# split() and explode() transformations
from pyspark.sql.functions import split, explode
shakeWordsSplit = (shakespeareDF
                   .select(split(shakespeareDF.word, ' ').alias('word')))  # here split(DF,' ') splits the sentence at a space and returns each word in a single row
shakeWordsExplode = (shakeWordsSplit
                     .select(explode(shakeWordsSplit.word).alias('word')))  # explode() Returns a new row for each element in the given array
shakeWordsDF = shakeWordsExplode.filter(shakeWordsExplode.word != '') # removes all the blanks
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount
###############################################################
### ###
### ###
### GROUP BY ###
### ###
### ###
###############################################################
Developer ID: aroonjham, Project: CodeRepository, Lines of code: 30, Source: Python_SparkSQL.py
Example 10: Tokens
from lib.pos_tags import PosTags
from lib.chunks import Chunks
t = Tokens()
p = PosTags()
c = Chunks()
c.train(c.load_training_data("../data/chunker_training_50_fixed.json"))
def pipeline(s):
    """
    Given a string, return a list of relations
    """
    return c.assemble(c.tag(p.tag(t.tokenize(s))))

pipeline_udf = sql.udf(pipeline, types.ArrayType(types.MapType(types.StringType(), types.StringType())))

phrases = (
    notes.withColumn("phrases", pipeline_udf(notes["document"]))
    .select(sql.explode(sql.col("phrases")).alias("text"))
    .filter(sql.col("text")["tag"] == "NP")
    .select(sql.lower(sql.col("text")["phrase"]).alias("phrase"))
    .groupBy(sql.col("phrase"))
    .count()
)
phrases.write.parquet("../data/idigbio_phrases.parquet")
Developer ID: mjcollin, Project: 2016spr, Lines of code: 30, Source: find_phrases_job.py
Example 11: rdd_to_recordstore
def rdd_to_recordstore(rdd_transform_context_rdd):

    if rdd_transform_context_rdd.isEmpty():
        MonMetricsKafkaProcessor.log_debug(
            "rdd_to_recordstore: nothing to process...")
    else:
        sql_context = SQLContext(rdd_transform_context_rdd.context)
        data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
            get_data_driven_specs_repo()
        pre_transform_specs_df = data_driven_specs_repo.\
            get_data_driven_specs(
                sql_context=sql_context,
                data_driven_spec_type=DataDrivenSpecsRepo.
                pre_transform_specs_type)

        #
        # extract second column containing raw metric data
        #
        raw_mon_metrics = rdd_transform_context_rdd.map(
            lambda nt: nt.rdd_info[1])

        #
        # convert raw metric data rdd to dataframe rdd
        #
        raw_mon_metrics_df = \
            MonMetricUtils.create_mon_metrics_df_from_json_rdd(
                sql_context,
                raw_mon_metrics)

        #
        # filter out unwanted metrics and keep metrics we are interested in
        #
        cond = [
            raw_mon_metrics_df.metric.name ==
            pre_transform_specs_df.event_type]
        filtered_metrics_df = raw_mon_metrics_df.join(
            pre_transform_specs_df, cond)

        #
        # validate filtered metrics to check if required fields
        # are present and not empty
        # In order to be able to apply filter function had to convert
        # data frame rdd to normal rdd. After validation the rdd is
        # converted back to dataframe rdd
        #
        # FIXME: find a way to apply filter function on dataframe rdd data
        validated_mon_metrics_rdd = filtered_metrics_df.rdd.filter(
            MonMetricsKafkaProcessor._validate_raw_mon_metrics)
        validated_mon_metrics_df = sql_context.createDataFrame(
            validated_mon_metrics_rdd, filtered_metrics_df.schema)

        #
        # record generator
        # generate a new intermediate metric record if a given metric
        # metric_id_list, in pre_transform_specs table has several
        # intermediate metrics defined.
        # intermediate metrics are used as a convenient way to
        # process (aggregated) metric in multiple ways by making a copy
        # of the source data for each processing
        #
        gen_mon_metrics_df = validated_mon_metrics_df.select(
            validated_mon_metrics_df.meta,
            validated_mon_metrics_df.metric,
            validated_mon_metrics_df.event_processing_params,
            validated_mon_metrics_df.event_type,
            explode(validated_mon_metrics_df.metric_id_list).alias(
                "this_metric_id"),
            validated_mon_metrics_df.service_id)

        #
        # transform metrics data to record_store format
        # record store format is the common format which will serve as
        # source to aggregation processing.
        # converting the metric to common standard format helps in writing
        # generic aggregation routines driven by configuration parameters
        # and can be reused
        #
        record_store_df = gen_mon_metrics_df.select(
            (gen_mon_metrics_df.metric.timestamp / 1000).alias(
                "event_timestamp_unix"),
            from_unixtime(
                gen_mon_metrics_df.metric.timestamp / 1000).alias(
                "event_timestamp_string"),
            gen_mon_metrics_df.event_type.alias("event_type"),
            gen_mon_metrics_df.event_type.alias("event_quantity_name"),
            (gen_mon_metrics_df.metric.value / 1.0).alias(
                "event_quantity"),
            when(gen_mon_metrics_df.metric.dimensions.state != '',
                 gen_mon_metrics_df.metric.dimensions.state).otherwise(
                'NA').alias("event_status"),
            lit('1.0').alias('event_version'),
            lit('metrics').alias("record_type"),

            # resource_uuid
            when(gen_mon_metrics_df.metric.dimensions.instanceId != '',
                 gen_mon_metrics_df.metric.dimensions.instanceId).when(
                gen_mon_metrics_df.metric.dimensions.resource_id != '',
                gen_mon_metrics_df.metric.dimensions.resource_id).
            otherwise('NA').alias("resource_uuid"),
#......... (rest of this code omitted) .........
Developer ID: bigluster, Project: monasca-transform, Lines of code: 101, Source: mon_metrics_kafka.py
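The key explode step in Example 11 fans a single metric row out into one row per entry of its metric_id_list, so the same source record can then be aggregated in several independent ways. A stripped-down, hedged sketch of that fan-out pattern is shown below; the column names and sample data are invented and are not from monasca-transform.

# Hedged sketch: column names and sample data are invented to show the fan-out pattern only.
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode

spark = SparkSession.builder.master("local[*]").getOrCreate()
metrics = spark.createDataFrame([
    Row(event_type="cpu.total", value=42.0, metric_id_list=["cpu_total_all", "cpu_total_host"]),
    Row(event_type="mem.usable", value=512.0, metric_id_list=["mem_usable_all"]),
])

# Each source row is duplicated once per intermediate metric id.
fanned_out = metrics.select(
    "event_type",
    "value",
    explode("metric_id_list").alias("this_metric_id"))
fanned_out.show(truncate=False)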
Example 12: Row
#linesOut.saveAsTextFile("hdfs:///tmp/fact_icd9_encounter_08242016_supP2_rdd_.csv")
# linesOut.saveAsTextFile('hdfs:///tmp/fact_icd9_encounter_08242016_supP2_rdd.csv')
linesOut.saveAsTextFile("/Users/jayurbain/Dropbox/MCW/fact_icd9_encounter_08242016_supP2_rdd.txt")
linesOut.reduce( lambda k,v: (k))
################
from pyspark.sql import Row
from pyspark.sql.functions import explode
df = sqlContext.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9]), Row(a=2, b=[4,5,6],c=[10,11,12])])
df1 = df.select(df.a,explode(df.b).alias("b"),df.c)
df2 = df1.select(df1.a,df1.b,explode(df1.c).alias("c"))
###################################
# fpgrowth example
itemsets = parts.map(lambda p: ( p[3].strip().split(',') ) )
itemsets.getNumPartitions()
model_fp = FPGrowth.train(itemsets, minSupport=0.005, numPartitions=10)
result = model_fp.freqItemsets().collect()
for i in sorted(result, key=operator.itemgetter(1), reverse=True):
    print '(', ', '.join(i.items), ')', 'freq=', str(i.freq)
Developer ID: jayurbain, Project: machine-learning, Lines of code: 28, Source: subsequence_mining.py
Example 13: elements
# MAGIC
# MAGIC Before we can use the `wordcount()` function, we have to address two issues with the format of the DataFrame:
# MAGIC + The first issue is that we need to split each line by its spaces.
# MAGIC + The second issue is we need to filter out empty lines or words.
# MAGIC
# MAGIC Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row. To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).
# MAGIC
# MAGIC Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.
# MAGIC
# MAGIC > Note that `shakeWordsDF` should be a DataFrame with one column named `word`.
# COMMAND ----------
# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF.select(explode(split(col('sentence'),' ')).alias('word')))
shakeWordsDF = shakeWordsDF.filter(col('word') != '')
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount
# COMMAND ----------
# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")
# COMMAND ----------
Developer ID: HappyDash, Project: apacheSpark, Lines of code: 30, Source: cs105_lab1b_word_count.py
Example 14: data
def data(self):
    return self.spark.range(10).toDF('id') \
        .withColumn("vs", array([lit(i) for i in range(20, 30)])) \
        .withColumn("v", explode(col('vs'))).drop('vs')
Developer ID: CodingCat, Project: spark, Lines of code: 4, Source: test_pandas_udf_grouped_map.py
Example 15: author_pub_simu
casSession = cascluster.connect('test2')
for aggItem in agg:
    if aggItem[0] != "":
        casSession.execute('INSERT INTO author_pub_simu (author, pub) VALUES (%s, %s)', (str(aggItem[0]), str(aggItem[1])))
casSession.shutdown()
cascluster.shutdown()
sc = SparkContext("spark://ip-172-31-2-40:7077", "2016_test")
sqlContext = SQLContext(sc)
# read in data from HDFS and select columns
df1 = sqlContext.read.json("hdfs://ec2-52-34-128-244.us-west-2.compute.amazonaws.com:9000//simulated/fake_data_p1*.json").dropna()
df_sel = df1.select('recid', 'authors','co-authors','references', 'creation_date').withColumnRenamed('co-authors', 'co_authors').persist(StorageLevel.MEMORY_AND_DISK)
# explode references list and group by citation id to calcualte the number of times that one publication has been cited
df_references = df_sel.select('recid', explode('references')).withColumnRenamed('_c0','cited_id').groupBy('cited_id').count().withColumnRenamed('count','num_cited')
# combine author and co-author list to generate a total list of authors and convert rdd into dataframe
rdd_authors = df_sel.rdd.map(lambda x:{'recid':x.recid, 'authors': append_author(x.authors, x.co_authors), 'creation_date': fetch_year(x.creation_date)})
df_authors = sqlContext.createDataFrame(rdd_authors)
# join citation and author dataframes
df_join = df_references.join(df_authors, df_references.cited_id == df_authors.recid, 'inner').drop(df_authors.recid)
# explode author and save to Cassandra database
df_explode_author = df_join.select('cited_id', 'num_cited', explode('authors'), 'creation_date').withColumnRenamed('_c0', 'author')
df_explode_author.persist(StorageLevel.MEMORY_AND_DISK)
df_sel.unpersist()
df_explode_author.rdd.foreachPartition(aggToCassandra2)
# combine each author publication list, group by author and calculate H-index for each author
Developer ID: qnsosusong, Project: Hindex, Lines of code: 31, Source: spark_hindex_simu.py
Example 16: gapply
#......... (earlier part of this code omitted) .........
... Row(course="dotNET", year=2012, earnings=5000),
... Row(course="dotNET", year=2013, earnings=48000),
... Row(course="Java", year=2013, earnings=30000)])
... .select("course", "year", "earnings"))
>>> def yearlyMedian(_, vals):
... all_years = set(vals['year'])
... # Note that interpolation is performed, so we need to cast back to int.
... yearly_median = [(year, int(vals['earnings'][vals['year'] == year].median()))
... for year in all_years]
... return pd.DataFrame.from_records(yearly_median)
>>> newSchema = StructType().add("year", LongType()).add("median_earnings", LongType())
>>> gapply(df.groupBy("course"), yearlyMedian, newSchema).orderBy("median_earnings").show()
+------+----+---------------+
|course|year|median_earnings|
+------+----+---------------+
|dotNET|2012| 7500|
| Java|2012| 20000|
| Java|2013| 30000|
|dotNET|2013| 48000|
+------+----+---------------+
<BLANKLINE>
>>> def twoKeyYearlyMedian(_, vals):
... return pd.DataFrame.from_records([(int(vals["earnings"].median()),)])
>>> newSchema = StructType([df.schema["earnings"]])
>>> gapply(df.groupBy("course", "year"), twoKeyYearlyMedian, newSchema, "earnings").orderBy(
... "earnings").show()
+------+----+--------+
|course|year|earnings|
+------+----+--------+
|dotNET|2012| 7500|
| Java|2012| 20000|
| Java|2013| 30000|
|dotNET|2013| 48000|
+------+----+--------+
<BLANKLINE>
>>> spark.stop(); SparkSession._instantiatedContext = None
"""
import pandas as pd
minPandasVersion = '0.7.1'
if LooseVersion(pd.__version__) < LooseVersion(minPandasVersion):
    raise ImportError('Pandas installed but version is {}, {} required'
                      .format(pd.__version__, minPandasVersion))

# Do a null aggregation to retrieve the keys first (should be no computation)
# Also consistent with spark.sql.retainGroupColumns
keySchema = grouped_data.agg({}).schema
keyCols = grouped_data.agg({}).columns

if not cols:
    # Extract the full column list with the parent df
    javaDFName = "org$apache$spark$sql$RelationalGroupedDataset$$df"
    parentDF = java_gateway.get_field(grouped_data._jgd, javaDFName)
    allCols = DataFrame(parentDF, None).columns
    keyColsSet = set(keyCols)
    cols = [col for col in allCols if col not in keyColsSet]

if "*" in cols:
    raise ValueError("cols expected to contain only singular columns")
if len(set(cols)) < len(cols):
    raise ValueError("cols expected not to contain duplicate columns")
if not isinstance(schema, StructType):
    raise ValueError("output schema should be a StructType")

inputAggDF = grouped_data.agg({col: 'collect_list' for col in cols})
# Recover canonical order (aggregation may change column order)
canonicalOrder = chain(keyCols, [inputAggDF['collect_list(' + col + ')'] for col in cols])
inputAggDF = inputAggDF.select(*canonicalOrder)

# Wraps the user-provided function with another python function, which prepares the
# input in the form specified by the documentation. Then, once the function completes,
# this wrapper prepends the keys to the output values and converts from pandas.
def pandasWrappedFunc(*args):
    nvals = len(cols)
    keys, collectedCols = args[:-nvals], args[-nvals:]
    paramKeys = tuple(keys)
    if len(paramKeys) == 1:
        paramKeys = paramKeys[0]
    valuesDF = pd.DataFrame.from_dict(dict(zip(cols, collectedCols)))
    valuesDF = valuesDF[list(cols)]  # reorder to canonical
    outputDF = func(paramKeys, valuesDF)
    valCols = outputDF.columns.tolist()
    for key, keyName in zip(keys, keyCols):
        outputDF[keyName] = key
    outputDF = outputDF[keyCols + valCols]  # reorder to canonical
    # To recover native python types for serialization, we need
    # to convert the pandas dataframe to a numpy array, then to a
    # native list (can't go straight to native, since pandas will
    # attempt to perserve the numpy type).
    return outputDF.values.tolist()

keyPrependedSchema = StructType(list(chain(keySchema, schema)))
outputAggSchema = ArrayType(keyPrependedSchema, containsNull=False)
pandasUDF = udf(pandasWrappedFunc, outputAggSchema)
outputAggDF = inputAggDF.select(pandasUDF(*inputAggDF))
explodedDF = outputAggDF.select(explode(*outputAggDF).alias("gapply"))
# automatically retrieves nested schema column names
return explodedDF.select("gapply.*")
Developer ID: dominguezus, Project: spark-sklearn, Lines of code: 101, Source: group_apply.py
Example 17: elements
# MAGIC Before we can use the `wordcount()` function, we have to address two issues with the format of the DataFrame:
# MAGIC + The first issue is that we need to split each line by its spaces.
# MAGIC + The second issue is we need to filter out empty lines or words.
# MAGIC
# MAGIC Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row. To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).
# MAGIC
# MAGIC Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.
# MAGIC
# MAGIC > Note that `shakeWordsDF` should be a DataFrame with one column named `word`.
# COMMAND ----------
# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF
                .select(explode(split(shakespeareDF.sentence, ' '))
                        .alias("word"))
                .where("word != ''"))
shakeWordsDF.show(truncate=False)
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount
# COMMAND ----------
# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")
# COMMAND ----------
Developer ID: VaniKandhasamy, Project: BerkeleyX-Apache-Spark-Labs, Lines of code: 31, Source: cs105_lab1b_word_count.py
Example 18: with_explode_column
def with_explode_column(df):
    import pyspark.sql.functions as F
    df2 = df.withColumn('values', F.array(F.lit(1), F.lit(2)))
    df2 = df2.withColumn('value', F.explode(df2.values))
    return df2
Developer ID: mattomatic, Project: flint, Lines of code: 5, Source: test_partition_preserve.py
Example 19: elements
# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF
                <FILL IN>)
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount
# COMMAND ----------
# ANSWER
from pyspark.sql.functions import split, size, explode
shakeWordsDF = (shakespeareDF
                .select(split('sentence', '\s+').alias('words'))
                .select(explode('words').alias('word'))
                .where(col('word') != ''))
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount
# COMMAND ----------
# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")
# COMMAND ----------
# PRIVATE_TEST Remove empty elements (4d)
Developer ID: ANTPHAM, Project: mooc-setup, Lines of code: 31, Source: cs105_lab1b_word_count.py
Example 20: elements
# MAGIC Before we can use the `wordcount()` function, we have to address two issues with the format of the DataFrame:
# MAGIC + The first issue is that we need to split each line by its spaces.
# MAGIC + The second issue is we need to filter out empty lines or words.
# MAGIC
# MAGIC Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row. To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).
# MAGIC
# MAGIC Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.
# MAGIC
# MAGIC > Note that `shakeWordsDF` should be a DataFrame with one column named `word`.
# COMMAND ----------
# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode
shakeWordsDF = (shakespeareDF
                .select(explode(split(shakespeareDF.value, ' ')).alias('word'))
                .filter("word<>''"))
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount
# COMMAND ----------
# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")
# COMMAND ----------
# MAGIC %md
# MAGIC ** (4e) Count the words **
Developer ID: Teitlax, Project: ApacheSpark, Lines of code: 31, Source: cs105_lab1b_word_count.py
Note: The pyspark.sql.functions.explode examples in this article were compiled by 纯净天空 from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by various developers; copyright of the source code belongs to the original authors, and any distribution or use should follow the corresponding project's license. Do not reproduce without permission.