
scala - What's the performance impact of converting between `DataFrame`, `RDD` and back?

While my first instinct is to use DataFrames for everything, it's just not possible -- some operations are clearly easier and/or perform better as RDD operations, not to mention that certain APIs, like GraphX, only work on RDDs.

I seem to be spending a lot of time these days converting back and forth between DataFrames and RDDs -- so what's the performance hit? Take RDD.checkpoint -- there's no DataFrame equivalent, so what happens under the hood when I do:

val df = Seq((1,2),(3,4)).toDF("key","value")
val rdd = df.rdd.map(...)
val newDf = rdd.map(r => (r.getInt(0), r.getInt(1))).toDF("key","value")

Obviously, this is a trivially small example, but it would be great to know what happens behind the scenes in the conversion.
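For concreteness, the kind of round-trip I have in mind looks something like this (assuming a SparkSession named spark with its implicits imported):

import spark.implicits._

// Checkpointing only exists on RDDs, so drop down and come back up:
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val df = Seq((1, 2), (3, 4)).toDF("key", "value")
val rdd = df.rdd                 // DataFrame -> RDD[Row]
rdd.checkpoint()
rdd.count()                      // run an action so the checkpoint is actually written
val newDf = rdd.map(r => (r.getInt(0), r.getInt(1))).toDF("key", "value")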


1 Reply


Let's look at df.rdd first. This is defined as:

lazy val rdd: RDD[Row] = {
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
  val schema = this.schema
  queryExecution.toRdd.mapPartitions { rows =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    rows.map(converter(_).asInstanceOf[Row])
  }
}

So first it runs queryExecution.toRdd, which prepares the execution plan based on the operators used to build up the DataFrame and computes an RDD[InternalRow] that represents the outcome of that plan.
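If you want to see that boundary for yourself, queryExecution is exposed as a developer API, so a rough way to poke at it (using the df from the question) is:

// queryExecution is a developer API -- used here only for inspection.
df.explain(true)                          // prints the analyzed, optimized and physical plans

val internal = df.queryExecution.toRdd    // RDD[InternalRow], what df.rdd wraps
val external = df.rdd                     // RDD[Row], after the mapPartitions conversion above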

Next, the InternalRows of that RDD (which, as the name suggests, are only for internal use) will be mapped to normal Rows. For each row this entails the following:

override def toScala(row: InternalRow): Row = {
  if (row == null) {
    null
  } else {
    val ar = new Array[Any](row.numFields)
    var idx = 0
    while (idx < row.numFields) {
      ar(idx) = converters(idx).toScala(row, idx)
      idx += 1
    }
    new GenericRowWithSchema(ar, structType)
  }
}

So it loops over all elements, converts them to 'Scala' space (from Catalyst space), and creates the final row from them. toDF will pretty much do these things in reverse.
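That reverse direction is what you hit with toDF or createDataFrame: each external object is converted back into Catalyst's internal representation. A rough sketch, assuming a SparkSession named spark:

import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val pairs = spark.sparkContext.parallelize(Seq((1, 2), (3, 4)))

// RDD of tuples -> DataFrame: an encoder turns each tuple into an InternalRow.
val fromTuples = pairs.toDF("key", "value")

// RDD of Rows -> DataFrame: createDataFrame converts each external Row into an
// InternalRow against the given schema -- the mirror image of df.rdd above.
val schema = StructType(Seq(StructField("key", IntegerType), StructField("value", IntegerType)))
val rowRdd = pairs.map { case (k, v) => Row(k, v) }
val fromRows = spark.createDataFrame(rowRdd, schema)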

All of this will indeed have some impact on your performance. How much depends on how complex these conversions are compared to the things you actually do with the data. The bigger potential impact, however, is that Spark's Catalyst optimizer can only optimize the operations between the conversions to and from RDDs, rather than the execution plan as a whole.

It would be interesting to see which operations you have trouble with; I find most things can be done using basic expressions or UDFs. Using modules that only work on RDDs is a very valid use case though!
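To illustrate the expressions/UDFs point: a simple per-column transformation usually doesn't need the RDD detour at all. A rough sketch, again assuming the df from the question and spark.implicits._ in scope:

import org.apache.spark.sql.functions.{col, udf}

// RDD detour: Catalyst sees two disconnected plans with an opaque map in between.
val viaRdd = df.rdd
  .map(r => (r.getInt(0), r.getInt(1) + 1))
  .toDF("key", "value")

// Built-in expression: everything stays in a single plan Catalyst can optimize.
val viaExpr = df.withColumn("value", col("value") + 1)

// UDF: the function body is still a black box to Catalyst, but there is no full
// Row conversion and the plan stays in one piece.
val plusOne = udf((v: Int) => v + 1)
val viaUdf = df.withColumn("value", plusOne(col("value")))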

