Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

pyspark - Airflow spark submit operator

I currently run spark2-submit through an SSHOperator with this command:

    value = Variable.get('value')
    cmd = """
                    spark2-submit --master yarn --deploy-mode cluster 
                    --driver-memory=10G 
                    --conf spark.dynamicAllocation.minExecutors=5
                    --conf spark.dynamicAllocation.maxExecutors=10 
                    --queue test 
                    --executor-memory=10G
                    --executor-cores=2
                    --conf spark.yarn.driver.memoryOverhead=5120
                    --conf spark.driver.maxResultSize=2G 
                    --conf spark.yarn.executor.memoryOverhead=5120 
                    --conf spark.kryoserializer.buffer.max=1000m 
                    --conf spark.executor.extraJavaOptions=-XX:+UseG1GC 
                    --conf spark.network.timeout=15000s
                    --conf spark.executor.heartbeatInterval=1500s 
                    --conf spark.task.maxDirectResultSize=8G 
                    --principal test-host@test
                    --keytab /home/test-host.keytab 
                    --conf spark.ui.view.acls="*" 
                    /home/test/test.py {0}
                    """.format(value)

    test = SSHOperator(task_id='TEST',
                   ssh_conn_id='test-conn',
                   command=cmd
                   )

I want to convert this to a SparkSubmitOperator, and it still needs to run spark2-submit.

How can I do that? So far I have tried:

    SparkSubmitOperator(task_id='TEST',
                        conn_id='test-conn',
                        application=f'/home/test/test.py {0}'.format(value),
                        executor_cores=2,
                        executor_memory='10g',
                        )



1 Reply


The options that the SparkSubmitOperator in Airflow requires can be passed in a dictionary. Keep in mind that the keys in the dictionary must be the same as the operator's parameter names (for example executor_memory with an underscore, not the hyphenated spark-submit flag).
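To make the naming rule concrete, here is a minimal sketch that checks a dictionary's keys against a callable's parameter names before unpacking it. `submit_stub` is a hypothetical stand-in that only mimics a few keyword names, not the real operator, so the snippet runs without Airflow installed. `unknown_keys` is likewise a made-up helper name.

```python
import inspect


def submit_stub(task_id, conn_id, application, executor_memory=None,
                driver_memory=None, executor_cores=None, conf=None):
    """Hypothetical stand-in that mimics a few keyword names; not Airflow code."""
    return {"task_id": task_id, "executor_memory": executor_memory}


def unknown_keys(func, config):
    """Return the config keys that are not parameter names of func."""
    accepted = set(inspect.signature(func).parameters)
    return sorted(key for key in config if key not in accepted)


# A key spelled like the spark-submit flag (hyphen) is not a valid keyword name.
hyphenated = {"task_id": "TEST", "conn_id": "test-conn",
              "application": "/home/test/test.py", "executor-memory": "10G"}

# Rewrite only the keys; values such as "test-conn" are left untouched.
underscored = {key.replace("-", "_"): val for key, val in hyphenated.items()}

bad_keys = unknown_keys(submit_stub, hyphenated)    # the hyphenated key is reported
good_keys = unknown_keys(submit_stub, underscored)  # every key matches a parameter
result = submit_stub(**underscored)                 # unpacking now succeeds
```

Unpacking the hyphenated dictionary directly would fail with a TypeError about an unexpected keyword argument, which is why the keys need the underscore spelling.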

Create the following two dictionaries:

base_config = {
    "task_id": "TEST",
    "conn_id": "test-conn",
    "application": "/home/test/test.py",
    # Keys must match the operator's keyword names, so use underscores.
    "executor_memory": "10G",
    "driver_memory": "10G",
    "executor_cores": 2,
    "principal": "test-host@test",
    "keytab": "/home/test-host.keytab",
    # Environment variable values should be strings.
    "env_vars": {"SPARK_MAJOR_VERSION": "2"},
}

spark_config = {
    "spark.master": "yarn",
    "spark.submit.deployMode": "client",
    "spark.yarn.queue":"test",
    "spark.dynamicAllocation.minExecutors":5,
    "spark.dynamicAllocation.maxExecutors":10, 
    "spark.yarn.driver.memoryOverhead":5120,
    "spark.driver.maxResultSize":"2G",
    "spark.yarn.executor.memoryOverhead":5120,
    "spark.kryoserializer.buffer.max":"1000m",
    "spark.executor.extraJavaOptions":"-XX:+UseG1GC",
    "spark.network.timeout":"15000s",
    "spark.executor.heartbeatInterval":"1500s",
    "spark.task.maxDirectResultSize":"8G",
    "spark.ui.view.acls":"*"
}

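# Import path for Airflow 2 with the apache-spark provider installed.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

test = SparkSubmitOperator(**base_config, conf=spark_config)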

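This keeps the task configuration-driven: operator arguments live in base_config and Spark properties in spark_config. Note that spark_config sets the deploy mode to client, whereas the original command used cluster; change it if you need cluster mode.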
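The original command also passed `value` to the script as a command-line argument, which the dictionaries above leave out. Below is a minimal sketch of one way to add it, assuming the operator's `application_args` and `spark_binary` parameters are available in your Airflow version. The helper name `build_submit_kwargs` and the sample value are made up for illustration. Only the keyword dictionary is built here so the snippet runs without Airflow; the operator call is shown as a comment.

```python
def build_submit_kwargs(value):
    """Build keyword arguments for SparkSubmitOperator (hypothetical helper)."""
    return {
        "task_id": "TEST",
        "conn_id": "test-conn",
        "application": "/home/test/test.py",
        # Script arguments go in a list, not inside the application path.
        "application_args": [str(value)],
        # Assumption: this binary is on the PATH of the Airflow worker.
        "spark_binary": "spark2-submit",
        "executor_memory": "10G",
        "driver_memory": "10G",
        "executor_cores": 2,
        "principal": "test-host@test",
        "keytab": "/home/test-host.keytab",
    }


submit_kwargs = build_submit_kwargs("2021-01-01")  # sample value, made up

# In a DAG file, with Airflow and the Spark provider installed:
#   test = SparkSubmitOperator(**submit_kwargs, conf=spark_config)
```

Unlike the SSHOperator version, the SparkSubmitOperator launches the submit binary on the Airflow worker itself, so the binary, the script path, and the keytab need to be reachable from that machine.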

