
apache spark - PySpark: fully cleaning checkpoints

According to the documentation, it is possible to tell Spark to keep track of "out of scope" checkpoints (those that are no longer needed) and clean them from disk.

SparkSession.builder
  ...
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()

It appears to do so; the problem, however, is that the last checkpointed RDDs are never deleted.
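
A minimal sketch of the setup in question (the checkpoint path and data are illustrative): after spark.stop(), the per-application checkpoint subdirectory is still on disk.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
         .getOrCreate())
sc = spark.sparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpoint")  # illustrative path

rdd = sc.parallelize(range(1000))
rdd.checkpoint()
rdd.count()   # the action materializes the checkpoint on disk

spark.stop()  # the last checkpointed RDD's files survive this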

Question

  • Is there any configuration I am missing to perform a full cleanup?
  • If there isn't: is there any way to get the name of the temporary folder created for a particular application, so I can programmatically delete it? I.e. get 0c514fb8-498c-4455-b147-aff242bd7381 from SparkContext the same way you can get the applicationId.


1 Reply


I know it's an old question, but I was recently exploring checkpointing and ran into similar problems, so I'd like to share my findings.

Question: Is there any configuration I am missing to perform a full cleanup?

Setting spark.cleaner.referenceTracking.cleanCheckpoints=true works sometimes, but it is hard to rely on. The official documentation says that setting this property will

clean checkpoint files if the reference is out of scope

I don't know exactly what that means; my understanding was that once the Spark session/context is stopped, everything should be cleaned up.
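
For what it's worth, "out of scope" appears to refer to the RDD reference being garbage-collected on the driver while the context is still running; checkpoints whose RDDs are still referenced when the application stops are never cleaned. A hedged sketch, continuing the setup from the question (whether the files actually disappear depends on the JVM-side cleaner, which runs asynchronously):

# Assumes `sc` already has a checkpoint directory set and
# spark.cleaner.referenceTracking.cleanCheckpoints=true, as in the question.
import gc

rdd = sc.parallelize(range(100))
rdd.checkpoint()
rdd.count()   # the action materializes the checkpoint files on disk

del rdd       # drop the driver-side reference so it can go "out of scope"
gc.collect()  # Python GC releases the JVM handle; the cleaner may then
              # delete the files asynchronously, while the context is alive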

However, I did find an answer to your second question:

If there isn't: is there any way to get the name of the temporary folder created for a particular application, so I can programmatically delete it? I.e. get 0c514fb8-498c-4455-b147-aff242bd7381 from SparkContext the same way you can get the applicationId.

Yes, we can get the checkpoint directory as shown below:

Scala:

// Set the checkpoint directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")

scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3

// It returns a String, so we can use org.apache.hadoop.fs to delete the path

PySpark:

# Set the checkpoint directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'

# Note the leading 'u': the call returns a unicode object (Python 2).
# Below, we get a Hadoop FileSystem object through the JVM gateway and delete the path.

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
>>> fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)), True)  # True = recursive delete
True
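
Putting this together, a small helper (the function name is mine, not a Spark API) that resolves the application's checkpoint directory and removes it recursively, e.g. right before stopping the session:

def delete_checkpoint_dir(spark):
    """Best-effort recursive removal of this application's checkpoint directory."""
    sc = spark.sparkContext
    opt = sc._jsc.sc().getCheckpointDir()  # a Scala Option[String]
    if not opt.isDefined():
        return False                       # no checkpoint directory was ever set
    path = sc._jvm.org.apache.hadoop.fs.Path(opt.get())
    # Resolve the FileSystem from the path itself, so HDFS, S3, local, etc. all work
    fs = path.getFileSystem(sc._jsc.hadoopConfiguration())
    return fs.delete(path, True)           # True = recursive

# Usage at the end of the application:
delete_checkpoint_dir(spark)
spark.stop()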
