Skip to content

Commit 0a4d03a

Browse files
Dev-iL and claude committed
Add AIR304: flag runtime-varying values in DAG/task constructor arguments
Using runtime-varying values (like `datetime.now()`) as arguments to Airflow DAG or task constructors causes the serialized DAG hash to change on every parse, creating infinite DAG versions in the database. This rule detects such calls in DAG constructors, @dag decorators, operator/sensor constructors, and @task decorators, recursively checking through binary ops, dicts, lists, sets, tuples, and f-strings. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f8d1d29 commit 0a4d03a

8 files changed

Lines changed: 403 additions & 0 deletions

File tree

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import random
2+
import time
3+
import uuid
4+
from datetime import date, datetime, timedelta
5+
6+
import pendulum
7+
from airflow import DAG, dag
8+
from airflow.decorators import task
9+
from airflow.operators.bash import BashOperator
10+
from airflow.providers.standard.sensors.python import PythonSensor
11+
12+
13+
# Violations
14+
15+
DAG(dag_id="a", start_date=datetime.now())
16+
DAG(dag_id="b", start_date=datetime.now() - timedelta(days=1))
17+
DAG(dag_id="c", default_args={"start_date": datetime.now()})
18+
DAG(dag_id="d", start_date=datetime.utcnow())
19+
DAG(dag_id="e", start_date=datetime.today())
20+
DAG(dag_id="f", start_date=date.today())
21+
DAG(dag_id="g", tags=[f"v{random.randint(1, 9)}"])
22+
DAG(dag_id="h", start_date=pendulum.now())
23+
DAG(dag_id="i", start_date=pendulum.yesterday())
24+
DAG(dag_id="j", start_date=pendulum.tomorrow())
25+
DAG(dag_id="k", start_date=pendulum.today())
26+
DAG(dag_id="l", owner=f"team-{uuid.uuid4()}")
27+
DAG(dag_id="m", description=f"built at {time.time()}")
28+
29+
30+
@dag(start_date=pendulum.now())
31+
def my_dag():
32+
pass
33+
34+
35+
BashOperator(task_id="t", bash_command="echo hi", start_date=datetime.today())
36+
37+
PythonSensor(task_id="s", start_date=datetime.now())
38+
39+
40+
@task(start_date=datetime.utcnow())
41+
def my_task():
42+
pass
43+
44+
45+
# Non-violations
46+
47+
DAG(dag_id="ok_a", start_date=datetime(2024, 1, 1))
48+
DAG(dag_id="ok_b", start_date=pendulum.datetime(2024, 1, 1))
49+
DAG(dag_id="ok_c", schedule=timedelta(hours=1))
50+
DAG(dag_id="ok_d", default_args={"retries": 3})
51+
52+
BashOperator(task_id="t_ok", bash_command="echo hello")
53+
54+
55+
@task(retries=3)
56+
def my_static_task():
57+
pass
58+
59+
60+
# Non-airflow function call with dynamic arg — should not trigger
61+
def not_airflow(start_date):
62+
pass
63+
64+
65+
not_airflow(start_date=datetime.now())

crates/ruff_linter/src/checkers/ast/analyze/expression.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,9 @@ pub(crate) fn expression(expr: &Expr, checker: &Checker) {
12841284
if checker.is_rule_enabled(Rule::Airflow3IncompatibleFunctionSignature) {
12851285
airflow::rules::airflow_3_incompatible_function_signature(checker, expr);
12861286
}
1287+
if checker.is_rule_enabled(Rule::Airflow3DagDynamicValue) {
1288+
airflow::rules::airflow_3_dag_dynamic_value(checker, call);
1289+
}
12871290
if checker.is_rule_enabled(Rule::UnnecessaryCastToInt) {
12881291
ruff::rules::unnecessary_cast_to_int(checker, call);
12891292
}

crates/ruff_linter/src/codes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,6 +1130,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> {
11301130
(Airflow, "301") => rules::airflow::rules::Airflow3Removal,
11311131
(Airflow, "302") => rules::airflow::rules::Airflow3MovedToProvider,
11321132
(Airflow, "303") => rules::airflow::rules::Airflow3IncompatibleFunctionSignature,
1133+
(Airflow, "304") => rules::airflow::rules::Airflow3DagDynamicValue,
11331134
(Airflow, "311") => rules::airflow::rules::Airflow3SuggestedUpdate,
11341135
(Airflow, "312") => rules::airflow::rules::Airflow3SuggestedToMoveToProvider,
11351136
(Airflow, "321") => rules::airflow::rules::Airflow31Moved,

crates/ruff_linter/src/rules/airflow/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ mod tests {
5050
#[test_case(Rule::Airflow3MovedToProvider, Path::new("AIR302_standard.py"))]
5151
#[test_case(Rule::Airflow3MovedToProvider, Path::new("AIR302_try.py"))]
5252
#[test_case(Rule::Airflow3IncompatibleFunctionSignature, Path::new("AIR303.py"))]
53+
#[test_case(Rule::Airflow3DagDynamicValue, Path::new("AIR304.py"))]
5354
#[test_case(Rule::Airflow3SuggestedUpdate, Path::new("AIR311_args.py"))]
5455
#[test_case(Rule::Airflow3SuggestedUpdate, Path::new("AIR311_names.py"))]
5556
#[test_case(Rule::Airflow3SuggestedUpdate, Path::new("AIR311_try.py"))]

crates/ruff_linter/src/rules/airflow/rules/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub(crate) use function_signature_change_in_3::*;
33
pub(crate) use moved_in_3_1::*;
44
pub(crate) use moved_to_provider_in_3::*;
55
pub(crate) use removal_in_3::*;
6+
pub(crate) use runtime_value_in_dag_or_task::*;
67
pub(crate) use suggested_to_move_to_provider_in_3::*;
78
pub(crate) use suggested_to_update_3_0::*;
89
pub(crate) use task_variable_name::*;
@@ -12,6 +13,7 @@ mod function_signature_change_in_3;
1213
mod moved_in_3_1;
1314
mod moved_to_provider_in_3;
1415
mod removal_in_3;
16+
mod runtime_value_in_dag_or_task;
1517
mod suggested_to_move_to_provider_in_3;
1618
mod suggested_to_update_3_0;
1719
mod task_variable_name;
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
use ruff_macros::{ViolationMetadata, derive_message_formats};
2+
use ruff_python_ast::name::QualifiedName;
3+
use ruff_python_ast::{self as ast, Expr, ExprCall, InterpolatedStringElement};
4+
use ruff_python_semantic::{Modules, SemanticModel};
5+
use ruff_text_size::Ranged;
6+
7+
use crate::checkers::ast::Checker;
8+
use crate::rules::airflow::helpers::is_airflow_builtin_or_provider;
9+
use crate::{FixAvailability, Violation};
10+
11+
/// ## What it does
12+
/// Checks for calls to runtime-varying functions (such as `datetime.now()`)
13+
/// used as arguments in Airflow DAG or task constructors.
14+
///
15+
/// ## Why is this bad?
16+
/// Using runtime-varying values as arguments to DAG or task constructors
17+
/// causes the serialized DAG hash to change on every parse, creating
18+
/// infinite DAG versions in the `dag_version` and `serialized_dag` tables.
19+
/// This leads to unbounded database growth and can eventually cause
20+
/// out-of-memory conditions.
21+
///
22+
/// ## Example
23+
/// ```python
24+
/// from datetime import datetime
25+
///
26+
/// from airflow import DAG
27+
///
28+
/// dag = DAG(dag_id="my_dag", start_date=datetime.now())
29+
/// ```
30+
///
31+
/// Use instead:
32+
/// ```python
33+
/// from datetime import datetime
34+
///
35+
/// from airflow import DAG
36+
///
37+
/// dag = DAG(dag_id="my_dag", start_date=datetime(2024, 1, 1))
38+
/// ```
39+
#[derive(ViolationMetadata)]
40+
#[violation_metadata(preview_since = "0.14.11")]
41+
pub(crate) struct Airflow3DagDynamicValue {
42+
function_name: String,
43+
}
44+
45+
impl Violation for Airflow3DagDynamicValue {
46+
const FIX_AVAILABILITY: FixAvailability = FixAvailability::None;
47+
48+
#[derive_message_formats]
49+
fn message(&self) -> String {
50+
let Airflow3DagDynamicValue { function_name } = self;
51+
format!(
52+
"`{function_name}()` produces a value that changes at runtime; using it in a DAG or task argument causes infinite DAG version creation"
53+
)
54+
}
55+
}
56+
57+
/// AIR304
58+
pub(crate) fn airflow_3_dag_dynamic_value(checker: &Checker, call: &ExprCall) {
59+
if !checker.semantic().seen_module(Modules::AIRFLOW) {
60+
return;
61+
}
62+
63+
let Some(qualified_name) = checker.semantic().resolve_qualified_name(&call.func) else {
64+
return;
65+
};
66+
67+
if !is_dag_or_task_constructor(&qualified_name) {
68+
return;
69+
}
70+
71+
for keyword in &call.arguments.keywords {
72+
if let Some((expr, name)) = find_runtime_varying_call(&keyword.value, checker.semantic()) {
73+
checker.report_diagnostic(
74+
Airflow3DagDynamicValue {
75+
function_name: name.to_string(),
76+
},
77+
expr.range(),
78+
);
79+
}
80+
}
81+
}
82+
83+
/// Check if the qualified name refers to a DAG constructor, `@dag` decorator,
84+
/// operator, sensor, or `@task` decorator.
85+
fn is_dag_or_task_constructor(qualified_name: &QualifiedName) -> bool {
86+
let segments = qualified_name.segments();
87+
matches!(segments, ["airflow", .., "DAG" | "dag"])
88+
|| matches!(segments, ["airflow", "decorators" | "sdk", "task"])
89+
|| is_airflow_builtin_or_provider(segments, "operators", "Operator")
90+
|| is_airflow_builtin_or_provider(segments, "sensors", "Sensor")
91+
}
92+
93+
/// Recursively check an expression for calls to known runtime-varying functions.
94+
/// Returns the call expression and a display name (e.g., `"datetime.now"`) if found.
95+
fn find_runtime_varying_call<'a>(
96+
expr: &'a Expr,
97+
semantic: &SemanticModel,
98+
) -> Option<(&'a Expr, &'static str)> {
99+
match expr {
100+
Expr::Call(ExprCall { func, .. }) => {
101+
if let Some(qualified_name) = semantic.resolve_qualified_name(func) {
102+
let name = match qualified_name.segments() {
103+
["datetime", "datetime", "now"] => Some("datetime.now"),
104+
["datetime", "datetime", "utcnow"] => Some("datetime.utcnow"),
105+
["datetime", "datetime", "today"] => Some("datetime.today"),
106+
["datetime", "date", "today"] => Some("date.today"),
107+
["pendulum", "now"] => Some("pendulum.now"),
108+
["pendulum", "today"] => Some("pendulum.today"),
109+
["pendulum", "yesterday"] => Some("pendulum.yesterday"),
110+
["pendulum", "tomorrow"] => Some("pendulum.tomorrow"),
111+
["time", "time"] => Some("time.time"),
112+
["uuid", "uuid1"] => Some("uuid.uuid1"),
113+
["uuid", "uuid4"] => Some("uuid.uuid4"),
114+
["random", "random"] => Some("random.random"),
115+
["random", "randint"] => Some("random.randint"),
116+
["random", "choice"] => Some("random.choice"),
117+
["random", "uniform"] => Some("random.uniform"),
118+
["random", "randrange"] => Some("random.randrange"),
119+
["random", "sample"] => Some("random.sample"),
120+
["random", "getrandbits"] => Some("random.getrandbits"),
121+
_ => None,
122+
};
123+
if let Some(name) = name {
124+
return Some((expr, name));
125+
}
126+
}
127+
None
128+
}
129+
Expr::BinOp(ast::ExprBinOp { left, right, .. }) => {
130+
find_runtime_varying_call(left, semantic)
131+
.or_else(|| find_runtime_varying_call(right, semantic))
132+
}
133+
Expr::UnaryOp(ast::ExprUnaryOp { operand, .. }) => {
134+
find_runtime_varying_call(operand, semantic)
135+
}
136+
Expr::Dict(ast::ExprDict { items, .. }) => items
137+
.iter()
138+
.find_map(|item| find_runtime_varying_call(&item.value, semantic)),
139+
Expr::List(ast::ExprList { elts, .. })
140+
| Expr::Tuple(ast::ExprTuple { elts, .. })
141+
| Expr::Set(ast::ExprSet { elts, .. }) => elts
142+
.iter()
143+
.find_map(|elt| find_runtime_varying_call(elt, semantic)),
144+
Expr::FString(ast::ExprFString { value, .. }) => value.elements().find_map(|element| {
145+
if let InterpolatedStringElement::Interpolation(interpolation) = element {
146+
find_runtime_varying_call(&interpolation.expression, semantic)
147+
} else {
148+
None
149+
}
150+
}),
151+
_ => None,
152+
}
153+
}

0 commit comments

Comments
 (0)