5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -37,8 +37,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- **`staged_upload` finalize() ran the full upload/trigger/poll lifecycle on empty input** (PR for #340-adjacent Step 2e contract): a transient empty source — where every `stage()` call received `[]` — caused `StagedUploadDestination.finalize()` to serialize a 0-byte file, then POST it to the configured `stage.url`, then POST again to the configured `trigger.url`, then (if configured) poll. The result was a wasted upload + trigger + a zero-row job whose lifecycle still allocated quotas on the third-party API. `SalesforceBulkDestination.finalize()` already had the right guard (`if not self._records: return SyncResult(rows_extracted=0)` at the top); `StagedUploadDestination.finalize()` was missing it. Added the same short-circuit immediately after `record_count = len(self._records)` so no auth / upload / trigger / poll work runs when nothing was staged. Surfaced by the new contract test in this PR — the contract assertion against `httpx.Client.send` caught a single POST to `https://upload.example.com/files` during finalize() on empty input.

### Changed (Internal)

- **Destination contract tests — Step 2e: `StagedDestination` Protocol (staged_upload + salesforce_bulk)** (final follow-up to Step 2c PR [#604](https://github.com/drt-hub/drt/pull/604) / Step 2d PR [#605](https://github.com/drt-hub/drt/pull/605)): closes out the empty-batch contract suite with a new `tests/contracts/test_destination_staged_empty_batch.py` module covering the two destinations using the `StagedDestination` Protocol (`stage()` + `finalize()` shape) — `staged_upload` (generic three-phase upload) and `salesforce_bulk` (Salesforce Bulk API 2.0). Three contracts: Protocol satisfaction (`isinstance(dest, StagedDestination)`), empty `SyncResult` after only empty `stage([])` call(s) + `finalize()`, and zero `httpx.Client.send` calls during `finalize()` when nothing was staged. The third contract is load-bearing — the engine calls `finalize()` regardless of whether any batch accumulated records, so without the empty-source guard the destination wastes auth + upload + trigger + poll on a zero-row job. **The contract surfaced a real bug in `staged_upload`** (see the `Fixed` entry above); `salesforce_bulk` already passed because it had the guard. Same record-then-assert tripwire pattern as #604 (records into a captured list rather than raising, so broad `except Exception` row-error handlers in destinations can't swallow the AssertionError and mask the bug). 6 tests (2 destinations × 3 contracts). **Empty-batch contract suite is now complete: 24 of 24 registered destinations** (#593 HTTP × 4 + #594 file × 3 + parquet + #595 SQL × 4 + #604 API × 11 + #605 special-transport × 2 + this PR × 2 stage-shape).
- **Destination contract tests — Step 2d: special-transport destinations (email_smtp + google_sheets)** (follow-up to Step 2c PR [#604](https://github.com/drt-hub/drt/pull/604)): extends the empty-batch contract suite with a new `tests/contracts/test_destination_special_empty_batch.py` module covering the two destinations that use neither `httpx` nor the filesystem. `email_smtp` (stdlib `smtplib`) is asserted via an `smtplib.SMTP.__init__` / `smtplib.SMTP_SSL.__init__` tripwire that records every constructor call — the `smtplib.SMTP(host, port, ...)` constructor opens the TCP connection synchronously, so any connection attempt on empty input would show up in the captured list. `google_sheets` (`googleapiclient.discovery` via the optional `[sheets]` extra) uses the implicit "no driver was imported" contract pattern from #595's SQL module — CI's minimal install does not include `[sheets]`, so a regression that lost the empty-records short-circuit would surface as `ModuleNotFoundError` immediately when `_build_sheets_service` (the lazy-import wrapper) is reached. An explicit `monkeypatch` on `_build_sheets_service` complements the implicit check. 6 tests (2 destinations × 3 contracts), all passing on first run. After this PR the empty-batch contract suite covers **24 of 25 destinations** (#593 HTTP × 4 + #594 file × 3 + parquet + #595 SQL × 4 + #604 API × 11 + this PR × 2). Remaining: `salesforce_bulk` + `staged_upload`, both of which use the `StagedDestination` Protocol with `stage()` instead of `load()` — different contract shape, separate follow-up.
- **Destination contract tests — Step 2c: hardcoded-endpoint API destinations** (follow-up to [#593](https://github.com/drt-hub/drt/pull/593) / [#594](https://github.com/drt-hub/drt/pull/594) / [#595](https://github.com/drt-hub/drt/pull/595)): extends the empty-batch contract suite with a new `tests/contracts/test_destination_api_empty_batch.py` module covering 11 HTTP destinations that target **hardcoded** third-party API endpoints (HubSpot / Jira / Linear / Notion / Twilio / Amplitude / Zendesk / Google Ads / SendGrid / Intercom / GitHub Actions) — where `pytest_httpserver` can't be used because the destination doesn't read the URL from config. Same three contracts as the sibling modules (Protocol satisfaction, empty `SyncResult`, no network I/O) but the no-network-I/O assertion uses a `httpx.Client.send` / `httpx.AsyncClient.send` monkeypatch tripwire that records any attempted call into a captured list, asserted at test level. Records-then-asserts at test level instead of raising inside the patch — some destinations have broad `except Exception` row-error handlers that would swallow an `AssertionError` raised from the tripwire, masking the bug. 33 tests (11 destinations × 3 contracts), all passing on first run — every covered destination already short-circuits on empty input. Combined with #593 (HTTP × 4) + #594 (file × 3 + parquet) + #595 (SQL × 4), this brings the empty-batch contract coverage to 22 of 25 destinations. Remaining: `email_smtp` (uses `smtplib`, different transport), `google_sheets` (uses `googleapiclient.discovery`), `salesforce_bulk` + `staged_upload` (use the `StagedDestination` Protocol with `stage()` rather than `load()` — different contract shape) — those land in a follow-up.
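The record-then-assert tripwire described in the Step 2c and Step 2e entries can be illustrated with a minimal, dependency-free sketch. Everything here is hypothetical and for illustration only: `FakeClient`, `load_rows`, and both tripwire functions are stand-ins, not drt or `httpx` code. It relies only on the fact that `AssertionError` is a subclass of `Exception`, so a broad `except Exception` handler catches it.

```python
from typing import Any


class FakeClient:
    """Hypothetical stand-in for an HTTP client (not httpx)."""

    def send(self, request: str) -> str:
        return "ok"


def load_rows(client: FakeClient, rows: list[str]) -> int:
    """Hypothetical destination with a broad row-error handler.

    Deliberately buggy: it sends one request even when rows is empty.
    """
    failed = 0
    for row in rows or ["<empty payload>"]:
        try:
            client.send(row)
        except Exception:  # AssertionError is an Exception, so it is caught too
            failed += 1
    return failed


def raising_tripwire(self: Any, request: str) -> str:
    raise AssertionError("unexpected HTTP call")


captured: list[str] = []


def recording_tripwire(self: Any, request: str) -> str:
    captured.append(request)
    return "ok"


# Raising inside the patch: the handler swallows the AssertionError and the
# call only shows up as a "failed row", so a test would not fail on its own.
FakeClient.send = raising_tripwire  # type: ignore[method-assign]
swallowed_failures = load_rows(FakeClient(), [])

# Recording inside the patch: the attempted call is visible to the test,
# which can then assert on `captured` outside the destination's handlers.
FakeClient.send = recording_tripwire  # type: ignore[method-assign]
load_rows(FakeClient(), [])
```

In the sketch, a test would assert that `captured` is empty after calling the destination with no rows; with the deliberate bug above, that assertion would fail and expose the stray request.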

9 changes: 9 additions & 0 deletions drt/destinations/staged_upload.py
@@ -103,6 +103,15 @@ def finalize(
        context: dict[str, str] = {}
        record_count = len(self._records)

        # Empty-source short-circuit — no auth, no upload, no trigger, no
        # poll. Mirrors the same guard at the top of
        # SalesforceBulkDestination.finalize(). The engine calls
        # finalize() regardless of whether stage() ever received records,
        # so without this guard a transient empty source produces a
        # zero-row upload + job that wastes the trigger / poll cycle.
        if record_count == 0:
            return result

        try:
            # Phase 1: Stage — serialize and upload file
            file_bytes = self._serialize(config.format)
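The guard in the hunk above can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the actual `StagedUploadDestination`: `SketchResult` and `SketchStagedDestination` are hypothetical names, the method signatures are simplified, and the `calls` list merely stands in for the real upload, trigger, and poll requests.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchResult:
    """Hypothetical stand-in for drt's SyncResult."""

    success: int = 0
    failed: int = 0
    skipped: int = 0


@dataclass
class SketchStagedDestination:
    """Hypothetical destination; `calls` stands in for real HTTP work."""

    calls: list[str] = field(default_factory=list)
    _records: list[dict[str, Any]] = field(default_factory=list)

    def stage(self, records: list[dict[str, Any]]) -> None:
        # Accumulate only; no I/O happens during stage().
        self._records.extend(records)

    def finalize(self) -> SketchResult:
        result = SketchResult()
        # Empty-source short-circuit: return before any upload work,
        # because finalize() is called even if nothing was staged.
        if not self._records:
            return result
        self.calls.extend(["upload", "trigger", "poll"])
        result.success = len(self._records)
        return result


# Only empty batches were staged: finalize() does no work.
empty = SketchStagedDestination()
empty.stage([])
empty.stage([])
empty_result = empty.finalize()

# Records were staged: the full lifecycle runs.
busy = SketchStagedDestination()
busy.stage([{"id": 1}, {"id": 2}])
busy_result = busy.finalize()
```

Placing the check inside `finalize()` rather than in the caller matters here because, per the comment in the hunk, the engine calls `finalize()` whether or not `stage()` ever received records.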
192 changes: 192 additions & 0 deletions tests/contracts/test_destination_staged_empty_batch.py
@@ -0,0 +1,192 @@
"""Empty-batch contract for the ``StagedDestination`` Protocol.

Sibling to:

- ``test_destination_empty_batch.py`` (HTTP webhooks via
  ``pytest_httpserver``)
- ``test_destination_file_empty_batch.py`` (filesystem)
- ``test_destination_sql_empty_batch.py`` (SQL drivers, lazy import)
- ``test_destination_api_empty_batch.py`` (HTTP API via ``httpx``)
- ``test_destination_special_empty_batch.py`` (smtplib + googleapiclient)

The ``StagedDestination`` Protocol
(:class:`drt.destinations.base.StagedDestination`) is the
"accumulate + upload" shape used by destinations that talk to async
bulk-upload APIs:

- ``salesforce_bulk`` — Salesforce Bulk API 2.0 (auth → create job →
  upload → close → poll → fetch errors)
- ``staged_upload`` — generic three-phase upload (stage upload →
  trigger job → poll for completion)

The shape is:

- ``stage(records, config, opts) -> None`` — accumulate records, no
  HTTP allowed
- ``finalize(config, opts) -> SyncResult`` — upload + trigger + poll

Contracts under test:

1. The destination satisfies the ``StagedDestination`` Protocol.
2. After only empty ``stage([])`` call(s), ``finalize()`` returns
   ``SyncResult(success=0, failed=0, skipped=0)``.
3. After only empty ``stage([])`` call(s), ``finalize()`` never calls
   ``httpx.Client.send`` — the empty-source short-circuit must live
   in ``finalize()`` itself, because the engine calls ``finalize()``
   regardless of whether any batch accumulated records.

Why the third contract is load-bearing: a StagedDestination that
attempted the auth → upload → trigger → poll dance on empty input
would burn API quota and create a zero-row job (e.g. a Salesforce
Bulk job whose ``numberRecordsProcessed`` is zero), wasting an OAuth
token refresh and the job-id allocation. The contract pins the
short-circuit behaviour as required, not optional.

Tripwire mechanism is the same record-then-assert pattern as #604:
the patch captures attempted calls into a list rather than raising,
so broad ``except Exception`` row-error handlers in destinations
can't swallow the AssertionError and mask the bug the contract is
meant to catch.
"""

from __future__ import annotations

from collections.abc import Callable
from typing import Any

import pytest

from drt.config.models import (
    SalesforceBulkDestinationConfig,
    StagedUploadDestinationConfig,
    StagedUploadPhaseConfig,
    SyncOptions,
)
from drt.destinations.base import StagedDestination, SyncResult
from drt.destinations.salesforce_bulk import SalesforceBulkDestination
from drt.destinations.staged_upload import StagedUploadDestination

# Each entry: (destination class, factory → valid config).
STAGED_DESTINATIONS: list[Any] = [
    pytest.param(
        StagedUploadDestination,
        lambda: StagedUploadDestinationConfig(
            type="staged_upload",
            format="csv",
            stage=StagedUploadPhaseConfig(
                url="https://upload.example.com/files",
                method="POST",
            ),
            trigger=StagedUploadPhaseConfig(
                url="https://api.example.com/jobs",
                method="POST",
            ),
            poll=None,
        ),
        id="staged_upload",
    ),
    pytest.param(
        SalesforceBulkDestination,
        lambda: SalesforceBulkDestinationConfig(
            type="salesforce_bulk",
            instance_url="https://example.my.salesforce.com",
            object_name="Contact",
            operation="upsert",
            external_id_field="External_Id__c",
            client_id_env="SF_CLIENT_ID_TEST_UNSET",
            client_secret_env="SF_CLIENT_SECRET_TEST_UNSET",
            username_env="SF_USERNAME_TEST_UNSET",
            password_env="SF_PASSWORD_TEST_UNSET",
        ),
        id="salesforce_bulk",
    ),
]


@pytest.fixture
def block_httpx(monkeypatch: pytest.MonkeyPatch) -> list[tuple[Any, ...]]:
    """Patch ``httpx.Client.send`` and ``httpx.AsyncClient.send`` with a
    tripwire that records any attempted HTTP call as a captured tuple
    instead of raising.

    See module docstring for why we record-then-assert at test level
    rather than raising inside the patch.
    """
    captured: list[tuple[Any, ...]] = []

    def _record(self: Any, request: Any, *args: Any, **kwargs: Any) -> Any:
        captured.append((self, request, args, kwargs))
        return None

    monkeypatch.setattr("httpx.Client.send", _record)
    monkeypatch.setattr("httpx.AsyncClient.send", _record)
    return captured


@pytest.fixture
def empty_sync_options() -> SyncOptions:
    """Minimal SyncOptions — staged destinations don't read rate-limit."""
    return SyncOptions(mode="full", batch_size=100, on_error="skip")


@pytest.mark.parametrize("destination_class, config_factory", STAGED_DESTINATIONS)
def test_satisfies_staged_destination_protocol(
    destination_class: type,
    config_factory: Callable[[], Any],
) -> None:
    """Every staged destination satisfies the ``StagedDestination`` Protocol."""
    dest = destination_class()
    assert isinstance(dest, StagedDestination)


@pytest.mark.parametrize("destination_class, config_factory", STAGED_DESTINATIONS)
def test_empty_stage_then_finalize_returns_empty_sync_result(
    destination_class: type,
    config_factory: Callable[[], Any],
    empty_sync_options: SyncOptions,
    block_httpx: list[tuple[Any, ...]],
) -> None:
    """``stage([])`` then ``finalize()`` returns ``SyncResult(0, 0, 0)``.

    Multiple empty stage calls are exercised because the engine may
    invoke stage() many times if the source produced empty batches
    across windows — the contract holds across the whole shape, not
    just a single call.
    """
    dest = destination_class()
    config = config_factory()

    dest.stage([], config, empty_sync_options)
    dest.stage([], config, empty_sync_options)
    result = dest.finalize(config, empty_sync_options)

    assert isinstance(result, SyncResult)
    assert result.success == 0
    assert result.failed == 0
    assert result.skipped == 0


@pytest.mark.parametrize("destination_class, config_factory", STAGED_DESTINATIONS)
def test_empty_stage_then_finalize_makes_no_http_request(
    destination_class: type,
    config_factory: Callable[[], Any],
    empty_sync_options: SyncOptions,
    block_httpx: list[tuple[Any, ...]],
) -> None:
    """``finalize()`` after only empty ``stage([])`` never calls
    ``httpx.Client.send``.

    The load-bearing contract: a StagedDestination must short-circuit
    in ``finalize()`` when nothing was staged, rather than running the
    full auth → upload → trigger → poll lifecycle on an empty payload.
    """
    dest = destination_class()
    config = config_factory()

    dest.stage([], config, empty_sync_options)
    dest.finalize(config, empty_sync_options)

    assert len(block_httpx) == 0, (
        f"{destination_class.__name__} made {len(block_httpx)} HTTP "
        "request(s) during finalize() after only empty stage([]) call(s); "
        "StagedDestinations must short-circuit when nothing was staged."
    )
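
A note on the first contract: `isinstance(dest, StagedDestination)` only works at all if the Protocol is decorated with `typing.runtime_checkable`, and such a check verifies that the methods exist, not that their signatures match. The sketch below illustrates that behaviour with hypothetical classes (`StagedLike`, `Complete`, `MissingFinalize`, `WrongSignature`); it does not reproduce the real `StagedDestination` definition, whose exact signatures are not shown in this diff.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class StagedLike(Protocol):
    """Hypothetical, simplified stand-in for a stage()/finalize() Protocol."""

    def stage(self, records: list[dict[str, Any]]) -> None: ...

    def finalize(self) -> int: ...


class Complete:
    def stage(self, records: list[dict[str, Any]]) -> None:
        pass

    def finalize(self) -> int:
        return 0


class MissingFinalize:
    def stage(self, records: list[dict[str, Any]]) -> None:
        pass


class WrongSignature:
    # Both method names are present, but the signatures do not match.
    def stage(self) -> None:
        pass

    def finalize(self, a: int, b: int) -> str:
        return ""


complete_ok = isinstance(Complete(), StagedLike)
missing_ok = isinstance(MissingFinalize(), StagedLike)
# The runtime check looks only at attribute presence, so this passes.
wrong_signature_ok = isinstance(WrongSignature(), StagedLike)
```

This is why the Protocol-satisfaction test is paired with the two behavioural contracts: the `isinstance` check alone would not catch a destination whose `finalize()` has the right name but the wrong behaviour on empty input.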