insert_at step ordering silently destroyed by resource clone during pipeline.run() #3838

@AyushPatel101

Description

dlt version

1.24.0

Describe the problem

When a user positions a pipe step (via `add_map`, `add_filter`, `add_metrics`, or `add_step` with `insert_at`) after the `Incremental` step, `pipeline.run()` silently reverses the ordering. The step ends up running before `Incremental`, producing incorrect results with no error or warning.

The step reordering happens in both cases, but the observable impact differs:

| Incremental type | Steps reordered? | Data corruption? | Why? |
| --- | --- | --- | --- |
| Hints-based (`apply_hints`) | Yes | Yes | `Incremental` is a pure pipe step; step order = execution order |
| Signature-based (cursor param) | Yes | No | `IncrementalResourceWrapper.wrap()` filters at the generator level; the pipe step is redundant |
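Why the hints-based case corrupts data can be illustrated with a stdlib-only model of a resource pipe (this is not dlt code; `incremental_step` and `metrics_step` are hypothetical stand-ins for the real pipe steps):

```python
# Toy model of a resource pipe: each step is a callable applied to the
# item batch in list order, so step order IS execution order.

def incremental_step(rows):
    # stand-in for the hints-based Incremental step: keep updated_at >= 2
    return [r for r in rows if r["updated_at"] >= 2]

metrics = {}

def metrics_step(rows):
    # stand-in for MetricsItem: count the rows that reach this step
    metrics["row_count"] = metrics.get("row_count", 0) + len(rows)
    return rows

def run_pipe(steps, rows):
    for step in steps:
        rows = step(rows)
    return rows

rows = [{"id": i, "updated_at": i} for i in (1, 2, 3)]

# Intended order: metrics AFTER incremental, so only surviving rows are counted
metrics.clear()
run_pipe([incremental_step, metrics_step], list(rows))
print(metrics["row_count"])  # 2

# Reordered (the bug): metrics BEFORE incremental, so all rows are counted
metrics.clear()
run_pipe([metrics_step, incremental_step], list(rows))
print(metrics["row_count"])  # 3
```

Because a pure pipe step has no other filtering mechanism, swapping its position in the list changes the observable output, which matches the `row_count` discrepancy in the repro below.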

When the bug does NOT trigger

Calling the resource directly (`r = my_resource()`) sets `_args_bound = True`, causing `_eject_config` to skip the eject/inject cycle. The pipe ordering is preserved. This masks the bug in simple test cases.
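The masking condition can be sketched as a hypothetical model of the clone behavior described above (not dlt source code; the `affinity` values and `clone_pipe` helper are illustrative assumptions based on the step comments in the repro):

```python
# Hypothetical model of the resource clone: the eject/inject cycle that
# rebuilds the pipe only runs when args are NOT bound, and the rebuild
# restores steps in default "affinity" order, discarding insert_at placement.

def clone_pipe(steps, args_bound):
    if args_bound:
        # eject/inject skipped: user ordering preserved
        return list(steps)
    # rebuild pushes higher-affinity steps (e.g. Incremental) later,
    # moving a user-placed step that followed Incremental in front of it
    return sorted(steps, key=lambda s: s["affinity"])

steps = [
    {"name": "gen", "affinity": -1},          # affinity values illustrative
    {"name": "Incremental", "affinity": 1},
    {"name": "MetricsItem", "affinity": 0},   # placed via insert_at
]

print([s["name"] for s in clone_pipe(steps, args_bound=True)])
# ['gen', 'Incremental', 'MetricsItem']  (order preserved)
print([s["name"] for s in clone_pipe(steps, args_bound=False)])
# ['gen', 'MetricsItem', 'Incremental']  (MetricsItem silently moved first)
```

Under this model, directly bound resources take the `args_bound=True` path, which is why simple tests that call the resource never observe the reorder.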

I am willing to submit a PR about this request

Expected behavior

`insert_at` positioning should be preserved across `pipeline.run()`. A `MetricsItem` placed after `Incremental` should only see rows that survived filtering:

```text
Steps: [gen, Incremental, MetricsItem]  — before and after clone
Output: row_count == 2  (only rows with updated_at >= 2)
```

Steps to reproduce

```python
import dlt

@dlt.source
def my_source():
    @dlt.resource
    def events():
        yield [
            {"id": 1, "updated_at": 1},
            {"id": 2, "updated_at": 2},
            {"id": 3, "updated_at": 3},
        ]
    return events

def count_rows(items, meta, metrics):
    if isinstance(items, list):
        metrics["row_count"] = metrics.get("row_count", 0) + len(items)

source = my_source()
r = source.events  # _args_bound is False

r.apply_hints(incremental=dlt.sources.incremental("updated_at", initial_value=2))
r.add_metrics(count_rows, insert_at=len(r._pipe))
# Steps: [gen, Incremental(affinity=1), MetricsItem(affinity=0)]

pipeline = dlt.pipeline(pipeline_name="test", destination="duckdb", dev_mode=True)
pipeline.run(r)

for step in pipeline.last_trace.steps:
    if step.step == "extract":
        for load_id, metrics_list in step.step_info.metrics.items():
            for m in metrics_list:
                for resource_name, resource_m in m["resource_metrics"].items():
                    if resource_m.custom_metrics:
                        print(f"{resource_name}: {resource_m.custom_metrics}")

# Expected: row_count == 2  (2 rows pass incremental: updated_at >= 2)
# Actual:   row_count == 3  (all rows — MetricsItem ran BEFORE incremental)
```

Output

```text
events: {'row_count': 3, 'unfiltered_items_count': 3, 'unfiltered_batches_count': 1, ...}
```

Operating system

Linux

Runtime environment

Local

Python version

3.13

dlt data source

No response

dlt destination

No response

Other deployment details

No response

Additional information

No response

Metadata

Labels: bug (Something isn't working)

Status: In Progress