Skip to content

Commit 22754cf

Browse files
jason810496uranusjrLee-W
authored andcommitted
Replace external_trigger check with DagRunType (apache#45961)
* Replace external_trigger check with DagRunType - Fix test_handle_multiple_columns_unique_constraint_error - Resolve ashb code review - Add newsfragment - Replace external_trigger is False logic with run_type is SCHEDULED - Fix _emit_true_scheduling_delay_stats_for_finished_state - Fix migrations fix after rebasing to latest main Fix test_exceptions and static check after rebasing Fix test_dag_run and erd static check * fixup! Fix run_type logic, remove unused externally_triggered_type * fixup! Restore external_trigger value when downgrade * Remove externally_trigger reference in frontend The 'triggered_by' field used to only be shown when the run is externally triggered. Rather than using run_type to maintain the conditional, I decided to render the field unconditionally instead, since the value could be useful for scheduler-created runs too (for example, distinguishing whether a run is asset-triggered or time-scheduled). * docs(newsfragments): add migration rules --------- Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com> Co-authored-by: Wei Lee <weilee.rx@gmail.com>
1 parent fbe0fa7 commit 22754cf

65 files changed

Lines changed: 1313 additions & 1357 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

airflow/api/client/local_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ def trigger_dag(
5454
"data_interval_start": dag_run.data_interval_start,
5555
"data_interval_end": dag_run.data_interval_end,
5656
"end_date": dag_run.end_date,
57-
"external_trigger": dag_run.external_trigger,
5857
"last_scheduling_decision": dag_run.last_scheduling_decision,
5958
"logical_date": dag_run.logical_date,
6059
"run_type": dag_run.run_type,

airflow/api/common/mark_tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
145145
if current_dagrun.logical_date is None:
146146
return [run_id]
147147

148-
last_dagrun = dag.get_last_dagrun(include_externally_triggered=True, session=session)
148+
last_dagrun = dag.get_last_dagrun(include_manually_triggered=True, session=session)
149149
first_dagrun = session.scalar(
150150
select(DagRun)
151151
.where(DagRun.dag_id == dag.dag_id, DagRun.logical_date.is_not(None))

airflow/api/common/trigger_dag.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ def _trigger_dag(
113113
conf=run_conf,
114114
run_type=DagRunType.MANUAL,
115115
triggered_by=triggered_by,
116-
external_trigger=True,
117116
dag_version=dag_version,
118117
state=DagRunState.QUEUED,
119118
session=session,

airflow/api_connexion/endpoints/dag_run_endpoint.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ def _fetch_dag_runs(
185185
"start_date",
186186
"end_date",
187187
"updated_at",
188-
"external_trigger",
189188
"conf",
190189
]
191190
query = apply_sorting(query, order_by, to_replace, allowed_sort_attrs)
@@ -365,7 +364,6 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
365364
conf=post_body.get("conf"),
366365
run_type=DagRunType.MANUAL,
367366
triggered_by=DagRunTriggeredByType.REST_API,
368-
external_trigger=True,
369367
dag_version=DagVersion.get_latest_version(dag.dag_id),
370368
state=DagRunState.QUEUED,
371369
session=session,

airflow/api_connexion/schemas/dag_run_schema.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ class Meta:
6868
start_date = auto_field(dump_only=True)
6969
end_date = auto_field(dump_only=True)
7070
state = DagStateField(dump_only=True)
71-
external_trigger = auto_field(dump_default=True, dump_only=True)
7271
conf = ConfObject()
7372
data_interval_start = auto_field(validate=validate_istimezone)
7473
data_interval_end = auto_field(validate=validate_istimezone)

airflow/api_fastapi/core_api/datamodels/dag_run.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ class DAGRunResponse(BaseModel):
7171
last_scheduling_decision: datetime | None
7272
run_type: DagRunType
7373
state: DagRunState
74-
external_trigger: bool
7574
triggered_by: DagRunTriggeredByType
7675
conf: dict
7776
note: str | None

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8527,9 +8527,6 @@ components:
85278527
$ref: '#/components/schemas/DagRunType'
85288528
state:
85298529
$ref: '#/components/schemas/DagRunState'
8530-
external_trigger:
8531-
type: boolean
8532-
title: External Trigger
85338530
triggered_by:
85348531
$ref: '#/components/schemas/DagRunTriggeredByType'
85358532
conf:
@@ -8554,7 +8551,6 @@ components:
85548551
- last_scheduling_decision
85558552
- run_type
85568553
- state
8557-
- external_trigger
85588554
- triggered_by
85598555
- conf
85608556
- note

airflow/api_fastapi/core_api/routes/public/assets.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ def materialize_asset(
297297
run_after=run_after,
298298
run_type=DagRunType.MANUAL,
299299
triggered_by=DagRunTriggeredByType.REST_API,
300-
external_trigger=True,
301300
dag_version=DagVersion.get_latest_version(dag_id, session=session),
302301
state=DagRunState.QUEUED,
303302
session=session,

airflow/api_fastapi/core_api/routes/public/dag_run.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,6 @@ def get_dag_runs(
286286
"start_date",
287287
"end_date",
288288
"updated_at",
289-
"external_trigger",
290289
"conf",
291290
],
292291
DagRun,
@@ -367,7 +366,6 @@ def trigger_dag_run(
367366
conf=params["conf"],
368367
run_type=DagRunType.MANUAL,
369368
triggered_by=DagRunTriggeredByType.REST_API,
370-
external_trigger=True,
371369
dag_version=DagVersion.get_latest_version(dag.dag_id),
372370
state=DagRunState.QUEUED,
373371
session=session,
@@ -421,7 +419,6 @@ def get_list_dag_runs_batch(
421419
"start_date",
422420
"end_date",
423421
"updated_at",
424-
"external_trigger",
425422
"conf",
426423
],
427424
DagRun,

airflow/api_fastapi/execution_api/datamodels/taskinstance.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,6 @@ class DagRun(StrictBaseModel):
230230
clear_number: int
231231
run_type: DagRunType
232232
conf: Annotated[dict[str, Any], Field(default_factory=dict)]
233-
external_trigger: bool = False
234233

235234

236235
class TIRunContext(BaseModel):

0 commit comments

Comments
 (0)