Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


scala - Spark UDF for StructType / Row

I have a StructType column in a Spark DataFrame that has an array and a string as sub-fields. I'd like to modify the array and return a new column of the same type. Can I process it with a UDF? If not, what are the alternatives?

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil)
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil)
val data = Seq(Row(Row(Array(1,2),"eb")),  Row(Row(Array(3,2,1), "dsf")) )
val rd = sc.parallelize(data)
val df = spark.createDataFrame(rd, schema)
df.printSchema

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)

It seems that I need a UDF of the type Row, something like

val u =  udf((x:Row) => x)
       >> Schema for type org.apache.spark.sql.Row is not supported

This makes sense, since Spark cannot infer a schema for a return type of Row. Unfortunately, spark.udf.register fails too:

spark.udf.register("foo", (x:Row)=> Row, sub_schema)
     <console>:30: error: overloaded method value register with alternatives: ...

1 Reply


It turns out you can pass the result schema as a second argument to udf:

val u =  udf((x:Row) => x, sub_schema)
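
To make this concrete, here is a minimal sketch of how the two-argument udf could be used to actually change the array inside the struct. It assumes the df and sub_schema from the question and a spark-shell session. The helper incrementAll, the names bump and bumpTyped, and the case class SubTable are hypothetical and only there for illustration. As far as I know, on Spark 3.x the untyped udf(f, dataType) overload is rejected by default unless spark.sql.legacy.allowUntypedScalaUDF is set to true, so the second variant, which returns a case class and lets Spark derive the schema, may be the safer alternative there.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

// Same struct schema as in the question.
val sub_schema = StructType(
  StructField("col1", ArrayType(IntegerType, false), true) ::
  StructField("col2", StringType, true) :: Nil)

// Hypothetical transformation, for illustration only: add 1 to every element.
def incrementAll(xs: Seq[Int]): Seq[Int] = xs.map(_ + 1)

// Variant 1: untyped UDF. The struct arrives as a Row; the returned Row must
// match sub_schema field by field. A null struct is passed through as null.
val bump = udf(
  (x: Row) =>
    if (x == null) null
    else Row(incrementAll(x.getSeq[Int](0)), x.getString(1)),
  sub_schema)

// Variant 2 (assumption: a case class mirroring the struct is acceptable).
// Spark derives the result schema from the case class, so no explicit
// schema argument is needed.
case class SubTable(col1: Seq[Int], col2: String)

val bumpTyped = udf(
  (x: Row) =>
    if (x == null) null
    else SubTable(incrementAll(x.getSeq[Int](0)), x.getString(1)))

// df is the DataFrame built in the question.
val result = df.withColumn("subtable", bump(col("subtable")))
val resultTyped = df.withColumn("subtable", bumpTyped(col("subtable")))
result.printSchema
```

Fields are read by position here (getSeq[Int](0), getString(1)); x.getAs[Seq[Int]]("col1") should work as well, since the Row passed to the UDF carries the struct's schema.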
