Commit 89223c9

tatiana, romsharon98
authored and committed
Change dataset URI validation to raise warning instead of error in Airflow 2.9 (apache#39670)
Closes: apache#39486

# Context

Valid DAGs that worked in Airflow 2.8.x and had tasks with outlets with specific URIs, such as `Dataset("postgres://postgres:5432/postgres.dbt.stg_customers")`, stopped working in Airflow 2.9.0 & Airflow 2.9.1, after apache#37005 was merged.

This was a breaking change in an Airflow minor version. We should avoid this.

Airflow < 3.0 should raise a warning, and from Airflow 3.0, we can make errors by default. We can have a feature flag to allow users who want to see this in advance to enable errors in Airflow 2.x, but this should not be the default behaviour.

The DAGs should continue working on Airflow 2.x minor/micro releases without errors (unless the user opts in via configuration).

# How to reproduce

Running the following DAG with `apache-airflow==2.9.1` and `apache-airflow-providers-postgres==5.11.0`, as an example:

```
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

with DAG(dag_id='empty_operator_example', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    task1 = EmptyOperator(
        task_id='empty_task1',
        dag=dag,
        outlets=[Dataset("postgres://postgres:5432/postgres.dbt.stg_customers")]
    )
    task2 = EmptyOperator(
        task_id='empty_task2',
        dag=dag
    )
    task1 >> task2
```

causes this exception:

```
Broken DAG: [/usr/local/airflow/dags/example_issue.py]
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri
    parsed = normalizer(parsed)
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri
    raise ValueError("URI format postgres:// must contain database, schema, and table names")
ValueError: URI format postgres:// must contain database, schema, and table names
```

# About the changes introduced

This PR introduces the following:

1. A boolean configuration within `[core]`, named `strict_dataset_uri_validation`, which should be `False` by default.
2. When this configuration is `False`, Airflow should raise a warning saying:
   ```
   From Airflow 3, Airflow will be more strict with Dataset URIs, and the URI xx will no longer be valid. Please, follow the expected standard as documented in XX.
   ```
3. If this configuration is `True`, Airflow should raise the exception, as it does now in Airflow 2.9.0 and 2.9.1.
4. From Airflow 3.0, we change this configuration to be `True` by default.

1 parent d013cdc commit 89223c9

3 files changed

Lines changed: 55 additions & 2 deletions

airflow/config_templates/config.yml

Lines changed: 9 additions & 0 deletions
@@ -464,6 +464,15 @@ core:
       sensitive: true
       default: ~
       example: '{"some_param": "some_value"}'
+    strict_dataset_uri_validation:
+      description: |
+        Dataset URI validation should raise an exception if it is not compliant with AIP-60.
+        By default this configuration is false, meaning that Airflow 2.x only warns the user.
+        In Airflow 3, this configuration will be enabled by default.
+      default: "False"
+      example: ~
+      version_added: 2.9.2
+      type: boolean
     database_access_isolation:
       description: (experimental) Whether components should use Airflow Internal API for DB connectivity.
       version_added: 2.6.0
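
The new option is read with a `fallback=False` lookup, so an installation that never sets it behaves as non-strict. The following is a minimal sketch of that lookup semantics only, using the standard library's `configparser` as a stand-in for Airflow's `conf` object (an assumption: Airflow's own configuration class also layers defaults and environment variables on top). The helper name `is_strict` is hypothetical.

```python
import configparser


def is_strict(cfg_text: str) -> bool:
    """Return the strict-validation flag, defaulting to False when unset.

    Hypothetical helper; stands in for Airflow's conf.getboolean call.
    """
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # fallback=False applies when the section or the option is missing.
    return parser.getboolean("core", "strict_dataset_uri_validation", fallback=False)


unset = "[core]\n"
enabled = "[core]\nstrict_dataset_uri_validation = True\n"

print(is_strict(unset))    # option missing, so the fallback applies
print(is_strict(enabled))  # explicit opt-in
```

Keeping the default non-strict means existing deployments need no configuration change to keep loading their DAGs; only users who opt in see the exception.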

airflow/datasets/__init__.py

Lines changed: 15 additions & 1 deletion
@@ -27,6 +27,9 @@
 if TYPE_CHECKING:
     from urllib.parse import SplitResult

+
+from airflow.configuration import conf
+
 __all__ = ["Dataset", "DatasetAll", "DatasetAny"]


@@ -87,7 +90,18 @@ def _sanitize_uri(uri: str) -> str:
         fragment="",  # Ignore any fragments.
     )
     if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
-        parsed = normalizer(parsed)
+        try:
+            parsed = normalizer(parsed)
+        except ValueError as exception:
+            if conf.getboolean("core", "strict_dataset_uri_validation", fallback=False):
+                raise exception
+            else:
+                warnings.warn(
+                    f"The dataset URI {uri} is not AIP-60 compliant. "
+                    f"In Airflow 3, this will raise an exception. More information: {repr(exception)}",
+                    UserWarning,
+                    stacklevel=3,
+                )
     return urllib.parse.urlunsplit(parsed)
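
The warn-or-raise pattern in this hunk can be tried without an Airflow installation. The sketch below is illustrative only: `strict_normalizer` is a hypothetical stand-in that requires three slash-separated path segments (it is not the provider's normalizer), `sanitize` is a hypothetical name, and a plain `strict` argument replaces the configuration lookup.

```python
import warnings
from urllib.parse import SplitResult, urlsplit, urlunsplit


def strict_normalizer(parsed: SplitResult) -> SplitResult:
    """Hypothetical normalizer: expects a /database/schema/table path."""
    # A leading slash plus three segments splits into four parts.
    if len(parsed.path.split("/")) != 4:
        raise ValueError("URI must contain database, schema, and table names")
    return parsed


def sanitize(uri: str, strict: bool = False) -> str:
    """Normalize a URI; warn on invalid input unless strict is set."""
    parsed = urlsplit(uri)
    try:
        parsed = strict_normalizer(parsed)
    except ValueError as exc:
        if strict:
            raise  # strict mode: propagate the validation error
        warnings.warn(
            f"The dataset URI {uri} is not compliant: {exc!r}",
            UserWarning,
            stacklevel=2,  # attribute the warning to the caller of sanitize()
        )
    # In non-strict mode the un-normalized URI is returned unchanged.
    return urlunsplit(parsed)


uri = "postgres://postgres:5432/postgres.dbt.stg_customers"

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = sanitize(uri)
print(result, len(caught))

try:
    sanitize(uri, strict=True)
except ValueError as exc:
    print("strict mode raised:", exc)
```

In non-strict mode the URI is passed through as given, which is what lets DAGs written for Airflow 2.8.x keep loading.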

tests/datasets/test_dataset.py

Lines changed: 31 additions & 1 deletion
@@ -20,15 +20,17 @@
 import os
 from collections import defaultdict
 from typing import Callable
+from unittest.mock import patch

 import pytest
 from sqlalchemy.sql import select

-from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny
+from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny, _sanitize_uri
 from airflow.models.dataset import DatasetDagRunQueue, DatasetModel
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.empty import EmptyOperator
 from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG
+from tests.test_utils.config import conf_vars


 @pytest.fixture
@@ -441,3 +443,31 @@ def test_datasets_expression_error(expression: Callable[[], None], error: str) -
     with pytest.raises(TypeError) as info:
         expression()
     assert str(info.value) == error
+
+
+def mock_get_uri_normalizer(normalized_scheme):
+    def normalizer(uri):
+        raise ValueError("Incorrect URI format")
+
+    return normalizer
+
+
+@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer)
+@patch("airflow.datasets.warnings.warn")
+def test__sanitize_uri_raises_warning(mock_warn):
+    _sanitize_uri("postgres://localhost:5432/database.schema.table")
+    msg = mock_warn.call_args.args[0]
+    assert "The dataset URI postgres://localhost:5432/database.schema.table is not AIP-60 compliant." in msg
+    assert (
+        "In Airflow 3, this will raise an exception. More information: ValueError('Incorrect URI format')"
+        in msg
+    )
+
+
+@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer)
+@conf_vars({("core", "strict_dataset_uri_validation"): "True"})
+def test__sanitize_uri_raises_exception():
+    with pytest.raises(ValueError) as e_info:
+        _sanitize_uri("postgres://localhost:5432/database.schema.table")
+    assert isinstance(e_info.value, ValueError)
+    assert str(e_info.value) == "Incorrect URI format"
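
The warning test above patches `warnings.warn` and inspects the call arguments. An alternative, sketched here under stated assumptions, is to let the real warning fire and capture it with the standard library. `sanitize_stub` is a hypothetical stand-in for `_sanitize_uri` in non-strict mode and `capture_warning_messages` is a hypothetical helper, so the example runs without Airflow.

```python
import warnings


def sanitize_stub(uri: str) -> str:
    """Hypothetical stand-in for _sanitize_uri in non-strict mode."""
    warnings.warn(
        f"The dataset URI {uri} is not AIP-60 compliant. "
        "In Airflow 3, this will raise an exception.",
        UserWarning,
        stacklevel=2,
    )
    return uri


def capture_warning_messages(func, *args):
    """Call func(*args) and return (result, list of warning message strings)."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # make sure no filter hides the warning
        result = func(*args)
    return result, [str(w.message) for w in caught]


result, messages = capture_warning_messages(
    sanitize_stub, "postgres://localhost:5432/database.schema.table"
)
print(result)
print(messages)
```

Capturing the real warning also checks its category and that it is actually emitted, not just that `warn` was called. In a pytest suite, `pytest.warns(UserWarning, match=...)` serves the same purpose.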
