Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Airflow 2.0 - Scheduler is unable to find serialized DAG in the serialized_dag table

I have two files inside the dags directory: dag_1.py and dag_2.py.

dag_1.py creates a static DAG, and dag_2.py creates dynamic DAGs based on external JSON files at some location.

The static DAG (created by dag_1.py) contains a later task that generates some of these input JSON files for dag_2.py, which is how the dynamic DAGs get created.
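
A minimal sketch of this file-based handoff, using only the standard library. The JSON schema (a `dag_id` and a `command` key), the config directory and both function names are hypothetical. The step that turns each config into an Airflow DAG object is only indicated in a comment, because it depends on which operators you use.

```python
import json
from pathlib import Path


def write_dag_config(config_dir: Path, dag_id: str, command: str) -> Path:
    """Roughly what the late task in the static DAG (dag_1.py) might do."""
    config_dir.mkdir(parents=True, exist_ok=True)
    path = config_dir / f"{dag_id}.json"
    # Write under a temporary name, then rename, so a parse that runs
    # mid-write does not pick up a half-written JSON file.
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps({"dag_id": dag_id, "command": command}))
    tmp.replace(path)
    return path


def load_dag_configs(config_dir: Path) -> dict:
    """Roughly what dag_2.py might do on every parse: one config per DAG."""
    configs = {}
    for path in sorted(config_dir.glob("*.json")):
        conf = json.loads(path.read_text())
        configs[conf["dag_id"]] = conf
    return configs


# In dag_2.py each config would then become a module-level DAG object, e.g.
#   for dag_id, conf in load_dag_configs(Path("/hypothetical/config/dir")).items():
#       globals()[dag_id] = build_dag(conf)  # build_dag: hypothetical factory
```

The temporary-file-and-rename step is a design choice, not something the question describes. It keeps the DAG processor from parsing a file that the generating task has only partly written.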

This used to work fine with Airflow 1.x, where DAG serialization was not used. With Airflow 2.0, however, DAG serialization is mandatory, and sometimes I get the following exception in the scheduler when the dynamic DAGs are spawned:

[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table

After this, the scheduler terminates, which is expected. However, when I check the table manually after the error, the DAG entry is there.

The issue does not reproduce every time. What could be the cause? Is there an Airflow configuration setting I should try tweaking?


He who fights with dragons too long becomes a dragon himself; gaze too long into the abyss, and the abyss will gaze back into you…

1 Reply


We had the same issue after upgrading in the following order:

  1. 1.10.12 -> 1.10.14
  2. 1.10.14 -> 2.0.0

I followed their upgrade guide all the way through, and we had no issues until, at some random point a few hours later, the scheduler started crashing with errors about random DAGs not being found in the database.

Our deployment procedure involves clearing out the /opt/airflow/dags folder and doing a clean install every time (we store DAGs and supporting code in Python packages).

So every now and then on 1.10.x, the scheduler parsed an empty folder and wiped the serialized DAGs from the database, but it was always able to restore them on the next parse.

Apparently in 2.0, as part of the effort to make the scheduler highly available (HA), they fully separated the DAG processor from the scheduler. This leads to a race condition:

  • if the scheduler job queries the database before the DAG processor has updated the serialized_dag table, it finds nothing and crashes
  • if luck is on your side, this does not happen and you won't see the exception
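
The race can be illustrated with a toy model in plain Python. None of this is Airflow's API. A dict stands in for the serialized_dag table, the exception class only mimics `airflow.exceptions.SerializedDagNotFound`, and `schedule_once` is a hypothetical scheduler pass. In the traceback in the question, the real exception escapes `_create_dag_runs` and stops the scheduler. The toy pass instead skips the missing DAG and picks it up once the processor has written it, which is one way a scheduler could tolerate the race.

```python
class SerializedDagNotFound(Exception):
    """Stand-in for airflow.exceptions.SerializedDagNotFound."""


class ToySerializedDagTable:
    """In-memory stand-in for the serialized_dag table."""

    def __init__(self):
        self._rows = {}

    def write(self, dag_id, data):
        # The DAG processor's side: store a DAG after parsing its file.
        self._rows[dag_id] = data

    def get(self, dag_id):
        # The scheduler's side: fetch the DAG it wants to create a run for.
        if dag_id not in self._rows:
            raise SerializedDagNotFound(
                f"DAG '{dag_id}' not found in serialized_dag table")
        return self._rows[dag_id]


def schedule_once(table, dag_ids):
    """One scheduler pass that skips DAGs with no serialized row yet,
    instead of letting the exception escape and stop the loop."""
    scheduled, skipped = [], []
    for dag_id in dag_ids:
        try:
            table.get(dag_id)
        except SerializedDagNotFound:
            skipped.append(dag_id)  # try again on a later pass
            continue
        scheduled.append(dag_id)
    return scheduled, skipped


table = ToySerializedDagTable()
table.write("static_dag", {})
# dynamic_dag_1 is already known to the scheduler, but the processor
# has not written its serialized row yet: this is the race window.
print(schedule_once(table, ["static_dag", "dynamic_dag_1"]))
# → (['static_dag'], ['dynamic_dag_1'])
table.write("dynamic_dag_1", {})
print(schedule_once(table, ["static_dag", "dynamic_dag_1"]))
# → (['static_dag', 'dynamic_dag_1'], [])
```

Without the try/except in `schedule_once`, the first pass would raise exactly like the traceback. Whether it does depends only on which side reaches the table first, so the crash appears intermittent.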

To get rid of the problem, I disabled scheduling of all DAGs by setting is_paused in the database, restarted the scheduler, and once it had generated the serialized DAGs, turned all DAGs back on.
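
The workaround can be scripted roughly as follows, assuming direct SQL access to the metadata database. As far as I know, the tables and columns used (`dag.dag_id`, `dag.is_paused`, `serialized_dag.dag_id`) are the ones in Airflow 2.0's metadata schema. The helper names, the timeout, and the `?` placeholder style (specific to Python's sqlite3 module) are assumptions. Try it against a throwaway SQLite copy of those two tables before pointing it at a real database. For a handful of DAGs, the Airflow 2.0 CLI commands `airflow dags pause <dag_id>` and `airflow dags unpause <dag_id>` do the same job without touching SQL.

```python
import sqlite3
import time


def pause_all(conn):
    """Pause every unpaused DAG; return the IDs that were unpaused."""
    rows = conn.execute(
        "SELECT dag_id FROM dag WHERE is_paused = ? ORDER BY dag_id", (False,)
    ).fetchall()
    dag_ids = [row[0] for row in rows]
    conn.executemany(
        "UPDATE dag SET is_paused = ? WHERE dag_id = ?",
        [(True, dag_id) for dag_id in dag_ids],
    )
    conn.commit()
    return dag_ids


def missing_serialized(conn, dag_ids):
    """IDs from dag_ids that have no row in serialized_dag yet."""
    present = {row[0] for row in conn.execute("SELECT dag_id FROM serialized_dag")}
    return [dag_id for dag_id in dag_ids if dag_id not in present]


def wait_for_serialization(conn, dag_ids, timeout=300.0, poll=5.0):
    """Poll until every DAG is serialized or the timeout expires;
    return whatever is still missing."""
    deadline = time.monotonic() + timeout
    while True:
        missing = missing_serialized(conn, dag_ids)
        if not missing or time.monotonic() >= deadline:
            return missing
        time.sleep(poll)


def unpause(conn, dag_ids):
    """Switch back on only the DAGs that pause_all() turned off."""
    conn.executemany(
        "UPDATE dag SET is_paused = ? WHERE dag_id = ?",
        [(False, dag_id) for dag_id in dag_ids],
    )
    conn.commit()


# Outline (conn = sqlite3.connect(...) or another DB-API connection):
#   ids = pause_all(conn)
#   ... restart the scheduler ...
#   if not wait_for_serialization(conn, ids):
#       unpause(conn, ids)
```

Recording which DAGs were unpaused before pausing means that DAGs someone had deliberately paused stay paused afterwards. This is a small refinement on switching everything back on.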



...