
scala - How to calculate the best numberOfPartitions for coalesce?

So, I understand that in general one should use coalesce() when:

the number of partitions decreases due to a filter or some other operation that may result in reducing the original dataset (RDD, DF). coalesce() is useful for running operations more efficiently after filtering down a large dataset.

I also understand that it is less expensive than repartition, since it reduces shuffling by moving data only where necessary. My problem is how to define the parameter that coalesce takes (idealPartionionNo). I am working on a project that was handed over to me by another engineer, and he was using the calculation below to compute the value of that parameter.

// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)

val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR

This is then used with a partitioner object:

val partitioner = new HashPartitioner(idealPartionionNo)

but also used with:

RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)

Is this the right approach? What is the main idea behind the idealPartionionNo value computation? What is the REPARTITION_FACTOR? How do I generally work to define that?

Also, since YARN is responsible for identifying the available executors on the fly, is there a way of getting that number (AVAILABLE_EXECUTOR_INSTANCES) on the fly and using it to compute idealPartionionNo (i.e. replacing NO_OF_EXECUTOR_INSTANCES with AVAILABLE_EXECUTOR_INSTANCES)?
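
One possible way to do that (a minimal sketch, not from the original code): SparkContext.getExecutorMemoryStatus returns one entry per block manager, including the driver, so subtracting one gives the number of executors currently registered. With dynamic allocation this is only a snapshot that changes over time, and REPARTITION_FACTOR is assumed to be defined elsewhere, as in the question.

// Sketch: take a snapshot of the executors that are currently registered.
// getExecutorMemoryStatus has one entry per block manager, including the
// driver, hence the "- 1". With dynamic allocation this changes over time.
val AVAILABLE_EXECUTOR_INSTANCES =
  math.max(sc.getExecutorMemoryStatus.size - 1, 1)

// Same formula as above, with the static setting swapped for the snapshot.
val dynamicPartitionNo =
  AVAILABLE_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR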

Ideally, some actual examples of the form:

  • Here's a dataset (size).
  • Here's a number of transformations and possible reuses of an RDD/DF.
  • Here's where you should repartition/coalesce.
  • Assume you have n executors with m cores and a partition factor equal to k

then:

  • The ideal number of partitions would be ==> ??? (a purely illustrative evaluation of the formula above follows this list)
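
Purely as an illustration of the formula in the inherited code (the numbers here are made up, not part of the question): with n = 5 executors, m = 2 cores each, and a factor k = 2, the formula simply multiplies the three together.

// Illustration only: plugging hypothetical numbers into the formula above.
val n = 5   // executors
val m = 2   // cores per executor
val k = 2   // REPARTITION_FACTOR

val partitions = n * m * k   // 5 * 2 * 2 = 20 partitions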

Also, if you can refer me to a good blog post that explains these issues, I would really appreciate it.


1 Reply


In practice, the optimal number of partitions depends more on the data you have, the transformations you use, and the overall configuration than on the available resources.

  • If the number of partitions is too low, you'll experience long GC pauses, different kinds of memory issues, and, lastly, suboptimal resource utilization.
  • If the number of partitions is too high, the maintenance cost can easily exceed the processing cost. Moreover, if you use non-distributed reducing operations (like reduce, as opposed to treeReduce), a large number of partitions results in a higher load on the driver (a minimal sketch of this follows the list).
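
A minimal sketch of that last point (the dataset and partition count are made up): reduce ships one partial result per partition straight to the driver, whereas treeReduce combines partial results on the executors first.

// Sketch: with ~2000 partitions, reduce makes the driver merge ~2000
// partial results, while treeReduce first combines them on the executors
// in a tree of the given depth.
val manyPartitions = sc.parallelize(1L to 10000000L, numSlices = 2000)

val viaReduce     = manyPartitions.reduce(_ + _)
val viaTreeReduce = manyPartitions.treeReduce(_ + _, depth = 3)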

You can find a number of rules of thumb that suggest oversubscribing partitions relative to the number of cores (a factor of 2 or 3 seems to be common) or keeping partitions at a certain size, but none of this takes your own code into account:

  • If your code allocates a lot, you can expect long GC pauses, and it is probably better to go with smaller partitions.
  • If a certain piece of code is expensive, then the shuffle cost can be amortized by higher concurrency.
  • If you have a filter, you can adjust the number of partitions based on the discriminative power of the predicate (you'll make different decisions if you expect to retain 5% of the data versus 99% of it); see the sketch after this list.
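
A rough sketch of the filter case (expectedSelectivity is a placeholder estimate from sampling or domain knowledge, and rdd stands for the RDD being filtered; neither comes from the original answer):

// Sketch: shrink the partition count roughly in proportion to the
// fraction of rows the filter is expected to keep.
val expectedSelectivity = 0.05   // expect to retain ~5% of the rows
val targetPartitions =
  math.max((rdd.getNumPartitions * expectedSelectivity).toInt, 1)

val filtered = rdd.filter(x => x._3 < 30).coalesce(targetPartitions)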

In my opinion:

  • With one-off jobs, keep a higher number of partitions to stay on the safe side (slower is better than failing).
  • With reusable jobs, start with a conservative configuration, then execute, monitor, adjust the configuration, and repeat.
  • Don't try to use a fixed number of partitions based on the number of executors or cores. First understand your data and code, then adjust the configuration to reflect your understanding.

    Usually, it is relatively easy to determine the amount of raw data per partition for which your cluster exhibits stable behavior (in my experience it is somewhere in the range of a few hundred megabytes, depending on the format, the data structure you use to load the data, and the configuration). This is the "magic number" you're looking for; a sketch of how to use it follows below.
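
A minimal sketch of how that magic number can be used (the input size and the 256 MB target are assumptions, not universal values):

// Sketch: derive the partition count from the raw input size and the
// per-partition size the cluster handles comfortably.
val totalInputBytes         = 512L * 1024 * 1024 * 1024   // e.g. 512 GB of raw input
val targetBytesPerPartition = 256L * 1024 * 1024          // e.g. 256 MB per partition

val numPartitions =
  math.max((totalInputBytes.toDouble / targetBytesPerPartition).ceil.toInt, 1)   // 2048 here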

Some things you have to remember in general:

  • The number of partitions doesn't necessarily reflect the data distribution. Any operation that requires a shuffle (*byKey, join, RDD.partitionBy, Dataset.repartition) can result in a non-uniform data distribution. Always monitor your jobs for symptoms of significant data skew (a quick check is sketched after this list).
  • The number of partitions is in general not constant. Any operation with multiple dependencies (union, coGroup, join) can affect the number of partitions.
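
A quick way to check for skew (a sketch; shuffled stands for any RDD produced by one of the wide operations above):

// Sketch: count records per partition; collecting one count per
// partition is cheap even when the data itself is large.
val countsPerPartition = shuffled
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()

countsPerPartition.sortBy(-_._2).take(10).foreach(println)   // largest partitions first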
