
(feat) use Incremental instances as Relation filters #3750

@rudolfix

Background

Incremental instances are used to filter source data in sql_database (via SQLAlchemy WHERE), rest_api (via URL parameter substitution), and other sources. This ticket extends the same concept to Relation — generating SQLGlot WHERE clauses from Incremental state.

This ticket does not cover ModelIncremental (pluggable transform for automatic incremental on models) or common time sources for end_value — those are separate follow-ups.

Requires: PR #3590 (magic joins)

Public Interface

Dataset.table() extension

def table(
    self,
    table_name: str,
    *,
    load_ids: Optional[Collection[str]] = None,
    incremental: Optional[Incremental] = None,  # NEW
    **kwargs: Any,
) -> dlt.Relation:

When incremental is provided, the call is equivalent to dataset.table(table_name).incremental(incremental).

Relation.incremental()

def incremental(self, incremental: Incremental) -> Self:
    """Apply incremental filtering as a WHERE clause on this relation.

    Generates a SQL WHERE condition from the Incremental's cursor state
    (last_value, end_value, range boundaries). Only supports `max` and
    `min` as `last_value_func` — custom functions raise ValueError.

    The cursor_path is interpreted as:
    - Simple name ("created_at") — WHERE on this relation's column
    - Dotted name ("_dlt_loads.inserted_at") — auto-join the table
      (via schema references from PR #3590), then WHERE on the
      joined column

    Args:
        incremental: Incremental instance with cursor configuration.

    Returns:
        New Relation with the WHERE clause applied.
    """

Relation.is_incremental

@property
def is_incremental(self) -> bool:
    """Returns True if this relation has an incremental WHERE clause."""

The property walks the SQLGlot AST looking for nodes with meta.get("dlt_incremental"). No flag is needed on the Relation instance, since SQLGlot .meta survives .copy() and embedding in larger ASTs.

Cursor on another table

The user can define the incremental column as "table.column". This triggers an auto-join (PR #3590) on "table" and builds the WHERE clause on "column".

Example — incremental on _dlt_loads.inserted_at:

dataset.table("orders", incremental=dlt.sources.incremental("_dlt_loads.inserted_at"))

This joins orders._dlt_load_id == _dlt_loads.load_id (discovered from schema references) and filters on _dlt_loads."inserted_at".

Requirements:

  • Auto-join must succeed: the schema must contain an explicit table reference or nested table relationship between the base table and the join target.
  • If the target table is already joined (from a prior .join() call), reuse the existing join — don't re-join.
  • The auto-join must not modify the projection — no columns from the joined table are added to the SELECT list. Only the JOIN and WHERE are added.
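
The requirements above amount to a query of roughly the following shape. This is a minimal sketch using Python's built-in sqlite3 rather than dlt: the table contents, the cutoff value, and the assumption that _dlt_loads is keyed by load_id are illustrative, and the SQL is hand-written, not output captured from Relation.incremental().

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE _dlt_loads (load_id TEXT, inserted_at TEXT);
    CREATE TABLE orders (id INTEGER, amount REAL, _dlt_load_id TEXT);
    INSERT INTO _dlt_loads VALUES ('load_1', '2024-01-01'), ('load_2', '2024-02-01');
    INSERT INTO orders VALUES (1, 10.0, 'load_1'), (2, 20.0, 'load_2'), (3, 30.0, 'load_2');
    """
)

# Only the base table's columns are projected; the join exists solely
# to make the cursor column available to the WHERE clause.
query = """
    SELECT orders.id, orders.amount, orders._dlt_load_id
    FROM orders
    JOIN _dlt_loads ON orders._dlt_load_id = _dlt_loads.load_id
    WHERE _dlt_loads.inserted_at > ?
    ORDER BY orders.id
"""
cursor = conn.execute(query, ("2024-01-15",))
column_names = [description[0] for description in cursor.description]
rows = cursor.fetchall()

print(column_names)
print(rows)
```

The result carries no column from _dlt_loads even though the filter depends on it, which is the behaviour the third requirement asks for.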

Implementation

Relation.incremental() method

File: dlt/dataset/relation.py

def incremental(self, inc: Incremental) -> Self:
    table_name, cursor_column = _parse_incremental_cursor_path(inc.cursor_path)

    rel = self
    if table_name:
        # check if already joined
        qualifier_map = _extract_joined_table_aliases(rel.sqlglot_expression)
        if table_name not in qualifier_map:
            # auto-join without projection
            join_params, target_table, target_qualifier = _discover_join_params(rel, table_name)
            rel = _apply_join(
                rel, join_params,
                target_table=target_table,
                target_qualifier=target_qualifier,
                projection_prefix=table_name,
                project=False,  # NEW flag — skip _apply_join_projection
            )
            table_qualifier = target_qualifier
        else:
            table_qualifier = qualifier_map[table_name]

        column_ref = sge.Column(
            this=sge.to_identifier(cursor_column, quoted=True),
            table=sge.to_identifier(table_qualifier, quoted=False),
        )
    else:
        column_ref = sge.Column(this=sge.to_identifier(cursor_column, quoted=True))

    condition = _build_incremental_condition(inc, column_ref)
    condition.meta["dlt_incremental"] = True
    return rel.where(condition)

_apply_join extension — project flag

File: dlt/dataset/relation.py (in PR #3590's _apply_join function)

Add project: bool = True parameter. When False, skip the _apply_join_projection() call — the JOIN is added to the query but no columns from the joined table appear in the SELECT list.

def _apply_join(
    relation: _TRelation,
    join_params: list[_JoinParams],
    *,
    target_table: str,
    target_qualifier: str,
    projection_prefix: str,
    kind: TJoinType = "inner",
    project: bool = True,  # NEW
) -> _TRelation:
    query = relation.sqlglot_expression.copy()
    # ... build JOIN expressions (unchanged) ...

    if project:
        _apply_join_projection(...)

    rel = relation.__copy__()
    rel._sqlglot_expression = query
    return rel

WHERE generation — _build_incremental_condition

File: dlt/dataset/relation.py

Port of sql_database/helpers.py:172-196 logic from SQLAlchemy to SQLGlot:

def _build_incremental_condition(
    inc: Incremental,
    column_ref: sge.Column,
) -> sge.Expression:
    """Build SQLGlot WHERE condition from Incremental state.

    Only supports max and min as last_value_func.
    """
    last_value_func = inc.last_value_func
    if last_value_func not in (min, max):
        raise ValueError(
            "Incremental WHERE generation only supports min/max last_value_func,"
            f" got {last_value_func}"
        )

    start_value = inc.last_value  # reads from state, applies lag
    end_value = inc.end_value

    if last_value_func is max:
        start_op = sge.GTE if inc.range_start == "closed" else sge.GT
        end_op = sge.LT if inc.range_end == "open" else sge.LTE
    else:  # min
        start_op = sge.LTE if inc.range_start == "closed" else sge.LT
        end_op = sge.GT if inc.range_end == "open" else sge.GTE

    conditions = []
    if start_value is not None:
        conditions.append(start_op(this=column_ref, expression=_to_typed_literal(start_value)))
        # the upper bound is only applied when a lower bound exists
        if end_value is not None:
            conditions.append(end_op(this=column_ref, expression=_to_typed_literal(end_value)))
        # on_cursor_value_missing="include" is handled after the AND combination below

    if inc.on_cursor_value_missing == "exclude":
        conditions.append(sge.Not(this=sge.Is(this=column_ref, expression=sge.Null())))

    if not conditions:
        raise ValueError("Incremental has no last_value and no on_cursor_value_missing='exclude'")

    result = conditions[0]
    for c in conditions[1:]:
        result = sge.And(this=result, expression=c)

    # wrap with OR IS NULL for "include" mode
    if inc.on_cursor_value_missing == "include" and start_value is not None:
        result = sge.Or(this=result, expression=sge.Is(this=column_ref, expression=sge.Null()))

    return result

Value conversion (_to_typed_literal in the sketch above) uses build_typed_literal() (already in dlt/dataset/relation.py) for type-safe SQLGlot literals.
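
To make the operator rules easier to check by eye, the sketch below renders the same decisions as SQL text instead of SQLGlot nodes. render_incremental_where is a hypothetical helper written for this illustration, the parameter defaults are assumptions, and values are formatted with repr() rather than through typed literals.

```python
from typing import Any, Callable, Optional


def render_incremental_where(
    column: str,
    start_value: Optional[Any],
    end_value: Optional[Any] = None,
    last_value_func: Callable[..., Any] = max,
    range_start: str = "closed",
    range_end: str = "open",
    on_cursor_value_missing: str = "raise",
) -> str:
    """Render the WHERE text implied by the operator rules (illustration only)."""
    if last_value_func not in (min, max):
        raise ValueError(f"only min/max are supported, got {last_value_func}")

    if last_value_func is max:
        start_op = ">=" if range_start == "closed" else ">"
        end_op = "<" if range_end == "open" else "<="
    else:  # min: the cursor moves downwards, so both operators flip
        start_op = "<=" if range_start == "closed" else "<"
        end_op = ">" if range_end == "open" else ">="

    conditions = []
    if start_value is not None:
        conditions.append(f"{column} {start_op} {start_value!r}")
        # the upper bound is only emitted together with a lower bound
        if end_value is not None:
            conditions.append(f"{column} {end_op} {end_value!r}")
    if on_cursor_value_missing == "exclude":
        conditions.append(f"{column} IS NOT NULL")
    if not conditions:
        raise ValueError("nothing to filter on")

    result = " AND ".join(conditions)
    if on_cursor_value_missing == "include" and start_value is not None:
        # the NULL check wraps the whole AND group
        result = f"({result}) OR {column} IS NULL"
    return result


print(render_incremental_where("id", 10))
print(render_incremental_where("id", 10, end_value=20))
print(render_incremental_where("id", 10, range_start="open", on_cursor_value_missing="include"))
print(render_incremental_where("id", 10, end_value=0, last_value_func=min, range_end="closed"))
```

Note that with no start value the only condition that can be produced is the IS NOT NULL check from "exclude" mode; every other combination raises.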

Cursor path parsing — _parse_incremental_cursor_path

def _parse_incremental_cursor_path(cursor_path: str) -> tuple[Optional[str], str]:
    """Parse 'table.column' or 'column' from incremental cursor_path."""
    if "." in cursor_path:
        table_name, column_name = cursor_path.rsplit(".", 1)
        return table_name, column_name
    return None, cursor_path
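
A few inputs show how the split behaves. The function below repeats the same rule under a hypothetical name so the snippet runs on its own; the multi-dot input is included only to show what rsplit does with it, not to suggest such paths are supported.

```python
from typing import Optional, Tuple


def parse_cursor_path(cursor_path: str) -> Tuple[Optional[str], str]:
    # Same rule as above: split on the last dot only.
    if "." in cursor_path:
        table_name, column_name = cursor_path.rsplit(".", 1)
        return table_name, column_name
    return None, cursor_path


simple = parse_cursor_path("created_at")
dotted = parse_cursor_path("_dlt_loads.inserted_at")
multi_dot = parse_cursor_path("a.b.c")

print(simple)
print(dotted)
print(multi_dot)
```

Because the split is on the last dot, everything before it is treated as the table name, so a path with several dots yields a table name that itself contains a dot and would then be passed to the join lookup as-is.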

is_incremental property

@property
def is_incremental(self) -> bool:
    for node in self.sqlglot_expression.walk():
        if node.meta.get("dlt_incremental"):
            return True
    return False

This works because SQLGlot .meta dict survives .copy() and embedding in larger ASTs.
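
The idea of tagging the condition node instead of the relation can be illustrated without SQLGlot. In the sketch below, Node is a hypothetical stand-in for an expression node with a meta dict and a walk() method, and copy.deepcopy stands in for .copy(); it shows the detection pattern only and makes no claim about SQLGlot internals.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List


@dataclass
class Node:
    """Hypothetical stand-in for an AST node carrying a meta dict."""

    name: str
    children: List["Node"] = field(default_factory=list)
    meta: Dict[str, Any] = field(default_factory=dict)

    def walk(self) -> Iterator["Node"]:
        # Depth-first traversal: the node itself, then each subtree.
        yield self
        for child in self.children:
            yield from child.walk()


def is_incremental(root: Node) -> bool:
    return any(node.meta.get("dlt_incremental") for node in root.walk())


condition = Node("GTE")
condition.meta["dlt_incremental"] = True

plain = Node("Select", [Node("Where", [Node("EQ")])])
tagged = Node("Select", [Node("Where", [condition])])
# A later .where() would nest the tagged condition under an AND node.
chained = Node("Select", [Node("Where", [Node("And", [condition, Node("EQ")])])])
# Embedding a copy in an outer query keeps the marker reachable.
outer = Node("Select", [Node("Subquery", [copy.deepcopy(tagged)])])

print(is_incremental(plain), is_incremental(tagged), is_incremental(chained), is_incremental(outer))
```

Since the marker lives on the condition itself, any query that still contains that condition is detected, and a query built only from regular filters is not.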

Tests

Column incremental with various data types

  • datetime cursor: incremental("created_at", initial_value=pendulum.parse("2024-01-01"))
  • bigint cursor: incremental("id", initial_value=0)
  • Verify generated WHERE clause matches expected SQL for each type

Incremental parameters

  • range_start="open" vs "closed" → > vs >=
  • range_end="open" vs "closed" → < vs <=
  • last_value_func=min → reversed operators
  • end_value set → adds upper bound condition
  • on_cursor_value_missing="include" → OR col IS NULL
  • on_cursor_value_missing="exclude" → AND col IS NOT NULL
  • lag applied → verify last_value is adjusted
  • Custom last_value_func → raises ValueError

Failure modes

  • Column not found in schema → error from Relation.where() / column type lookup
  • last_value_func is not min or max → ValueError
  • Dotted cursor path with unknown table → auto-join fails with schema reference error
  • Dotted cursor path with no reference chain → auto-join fails

Incremental on table level vs query

  • dataset.table("orders", incremental=...) → same as dataset.table("orders").incremental(...)
  • dataset.query("SELECT ...").incremental(...) → works on arbitrary queries (cursor column must be in projection)

Auto-join tests

  • Cursor on un-joined table → auto-join added, no projection pollution, WHERE on join qualifier
  • Cursor on already-joined table → no re-join, WHERE uses existing join qualifier
  • Auto-join wrapped in subsequent .select() / .where() → verify qualification still works
  • _dlt_loads.inserted_at example end-to-end with duckdb

is_incremental detection

  • relation.is_incremental is False before .incremental()
  • relation.is_incremental is True after .incremental()
  • Survives chaining: relation.incremental(...).select("id").is_incremental is True
  • Regular .where() does not set is_incremental

Note: project=False does not prevent access to joined columns

When the auto-join uses project=False, the joined table's columns are not in the SELECT list but are accessible in the FROM/JOIN clause. If the user later calls .select("id", "amount", "inserted_at"), the inserted_at column resolves to _dlt_loads.inserted_at at the database level.

This works both with and without merge_subqueries:

  • With merge: .select() collapses the subquery, and the SELECT sits directly on the JOIN — columns from both tables are visible.
  • Without merge: the inner SELECT * passes through all columns from both sides of the JOIN, so the outer SELECT can reference them.

The project=False flag keeps the default projection clean (only the base table's columns), but does not restrict what the user can explicitly select afterwards.
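
Both cases can be reproduced at the database level with Python's built-in sqlite3. This is a sketch under assumptions: the tables, the load_id join key, and the hand-written SQL are illustrative and are not what dlt or merge_subqueries actually emit.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE _dlt_loads (load_id TEXT, inserted_at TEXT);
    CREATE TABLE orders (id INTEGER, amount REAL, _dlt_load_id TEXT);
    INSERT INTO _dlt_loads VALUES ('load_1', '2024-01-01'), ('load_2', '2024-02-01');
    INSERT INTO orders VALUES (1, 10.0, 'load_1'), (2, 20.0, 'load_2');
    """
)

join_sql = "orders JOIN _dlt_loads ON orders._dlt_load_id = _dlt_loads.load_id"

# "With merge": the explicit SELECT sits directly on the JOIN.
merged = conn.execute(
    f"SELECT id, amount, inserted_at FROM {join_sql} ORDER BY id"
).fetchall()

# "Without merge": an inner SELECT * exposes both sides to the outer SELECT.
nested = conn.execute(
    f"SELECT id, amount, inserted_at FROM (SELECT * FROM {join_sql}) ORDER BY id"
).fetchall()

print(merged)
print(nested)
```

In both forms inserted_at resolves to the joined table's column even though it was never part of a default projection.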
