I have two files in the dags directory: dag_1.py and dag_2.py.
dag_1.py creates a static DAG, and dag_2.py creates dynamic DAGs based on external JSON files at some location.
The static DAG (created by dag_1.py) contains a task at a later stage that generates some of these input JSON files for dag_2.py, which is how the dynamic DAGs get created.
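To make the file hand-off between the two DAG files concrete, here is a minimal sketch. It assumes one JSON file per dynamic DAG in a shared directory. The helper names `write_dag_config` and `load_dag_configs`, the `<dag_id>.json` naming, and the config keys are hypothetical and do not come from the original code. The writer goes through a temporary file and `os.replace`, so a reader never parses a half-written file. That is an assumption about how the generating task could write its output, not a confirmed cause of or fix for the error.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Dict


def write_dag_config(config_dir: str, dag_id: str, config: dict) -> Path:
    """Write one dynamic-DAG config atomically (hypothetical helper for the dag_1.py task).

    The JSON goes to a temporary ".tmp" file in the same directory and is then
    renamed with os.replace, so readers see either the old file or the complete new one.
    """
    target = Path(config_dir) / f"{dag_id}.json"
    fd, tmp_path = tempfile.mkstemp(dir=config_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fh:
            json.dump({"dag_id": dag_id, **config}, fh)
        os.replace(tmp_path, target)  # atomic rename when both paths are on the same filesystem
    except BaseException:
        os.unlink(tmp_path)  # clean up the partial temp file on failure
        raise
    return target


def load_dag_configs(config_dir: str) -> Dict[str, dict]:
    """Collect every *.json config in config_dir, keyed by dag_id (hypothetical helper for dag_2.py)."""
    configs: Dict[str, dict] = {}
    for path in sorted(Path(config_dir).glob("*.json")):  # ".tmp" files are not matched
        try:
            cfg = json.loads(path.read_text())
        except (OSError, json.JSONDecodeError):
            continue  # skip unreadable or malformed files instead of failing the whole parse
        if not isinstance(cfg, dict):
            continue  # ignore JSON that is not an object
        configs[cfg.get("dag_id", path.stem)] = cfg
    return configs


# In dag_2.py, each entry would then typically become a module-level DAG object, e.g.
#   for dag_id, cfg in load_dag_configs(CONFIG_DIR).items():
#       globals()[dag_id] = DAG(dag_id, ...)
# where CONFIG_DIR and the DAG arguments are placeholders.

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        write_dag_config(d, "dynamic_dag_1", {"schedule": "@daily"})
        print(sorted(load_dag_configs(d)))  # ['dynamic_dag_1']
```

Skipping malformed files keeps one bad config from breaking the parse of the whole dag_2.py. However, a DAG whose file is skipped simply disappears from that parse, which is worth keeping in mind when reasoning about serialization timing.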
This worked fine with Airflow 1.x, where DAG serialization was not used. With Airflow 2.0, however, DAG serialization has become mandatory, and sometimes I get the following exception in the scheduler when dynamic DAGs are spawned:
[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table
After this, the scheduler terminates, which is expected.
When I check the serialized_dag table manually after this error, I can see the DAG entry in it.
The issue is not reproducible every time. What could be the probable cause? Is there any Airflow configuration I should try tweaking?