Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **`sync.mode: mirror` — Step 3: ClickHouse support** ([#340](https://github.com/drt-hub/drt/issues/340) Step 3, follow-up to [#596](https://github.com/drt-hub/drt/pull/596) + [#597](https://github.com/drt-hub/drt/pull/597)): ClickHouse destination now supports `sync.mode: mirror` with the same application-side diff semantics as Postgres / MySQL — accumulate `upsert_key` tuples seen across all batches in `load()`, then issue a single mutation from `finalize_sync()` that removes destination rows whose key was not observed. ClickHouse uses `ALTER TABLE ... DELETE WHERE key NOT IN (collected)` with `mutations_sync=1` so the call blocks until the mutation completes. Unlike Postgres / MySQL where the placeholder shape had to be assembled manually, clickhouse_connect's native `{name:Type}` parameter substitution accepts `Array(String)` (single column) and `Array(Tuple(String, ...))` (composite) directly — so the call site is one parameter dict, not a flat placeholder list. Both column references and parameter values are coerced via `toString()` so the comparison works regardless of source column type (cost: the comparison can't use a column index on `upsert_key` — mirror mode is intended for small/medium reference tables, not high-volume fact tables; the temp-table strategy is a planned follow-up for high-cardinality cases). The mutation rewrites affected parts, which is expensive in ClickHouse — the new docstring notes this explicitly so misuse is hard. `ClickHouseDestinationConfig.upsert_key` is `list[str] | None` (it's informational only for the existing INSERT path, where dedup is handled by `ReplacingMergeTree` at merge time), so the runtime guard in `load()` raises `ValueError` early when mirror mode is requested without a populated key — fail-fast before any INSERT touches the table. Backtick-quoting for database-qualified table identifiers (`db.table` → `` `db`.`table` ``) added via a new `_quote_ident` helper, matching the v0.7.4-hardened MySQL pattern. New `tests/unit/test_clickhouse_mirror_mode.py` ships 12 tests covering key accumulation, dedupe across overlapping batches, database-qualified DELETE shape, single + composite key DELETE structure, the empty-source safety path, state reset, the missing-`upsert_key` ValueError, row-error skip path, and coexistence with the existing `EXCHANGE TABLES` swap-finalize path. Snowflake follows in the next PR.
- **`sync.mode: mirror` — Step 2: MySQL support** ([#340](https://github.com/drt-hub/drt/issues/340) Step 2, follow-up to [#596](https://github.com/drt-hub/drt/pull/596)): MySQL destination now supports `sync.mode: mirror` with the same application-side diff semantics as Postgres — accumulate `upsert_key` tuples seen across all batches in `load()`, then issue a single `DELETE WHERE key NOT IN (collected)` from `finalize_sync()`. Same safety paths as Step 1: empty-source short-circuit (`_mirror_keys` never populated → no DELETE issued, so a transient empty source can't wipe the table), state reset after `finalize_sync` runs, ignores rows that failed during upsert (only successfully-loaded keys count as "source state"), and a `ValueError` at load time when `upsert_key` is empty. Single-column and composite `upsert_key` both supported. Because pymysql does not auto-expand a tuple-of-tuples into a `NOT IN %s` parameter the way psycopg2 does, the DELETE is built with explicit `%s` placeholders — single-column form `WHERE \`c\` NOT IN (%s, %s, ...)` with a flat values list, composite form `WHERE (\`c1\`, \`c2\`) NOT IN ((%s, %s), (%s, %s), ...)` with the values flattened in row-major order. Backtick-quoting + schema-qualified table handling reuses the existing `_quote_ident` helper that v0.7.4 hardened. New `tests/unit/test_mysql_mirror_mode.py` ships 10 tests covering key accumulation, dedupe across overlapping batches, schema-qualified DELETE shape, single + composite key DELETE structure, the empty-source safety path, state reset, the missing-`upsert_key` ValueError, and coexistence with the existing swap-finalize path. ClickHouse and Snowflake follow in subsequent PRs.
- **`sync.mode: mirror` — differential delete for Postgres + CI test coverage** ([#340](https://github.com/drt-hub/drt/issues/340), PR [#596](https://github.com/drt-hub/drt/pull/596)): new sync mode that **upserts** every source row (same as `full`) and then **DELETEs** destination rows whose `upsert_key` tuple was not observed in the source — without the TRUNCATE / re-insert overhead of `replace` mode. Strategy: application-side diff (collect `upsert_key` tuples in memory during `load()`, then issue a single `DELETE WHERE key NOT IN (collected)` from `finalize_sync()`). Memory-bound to the source key cardinality; a temp-table strategy is a planned follow-up for tables larger than a few million rows. Safety: when the source produces no batches with records, `_mirror_keys` stays empty and `finalize_sync` skips the DELETE — a transient empty source won't wipe the destination. Single-column and composite `upsert_key` both supported (composite uses `(c1, c2) NOT IN ((v1a, v2a), …)` shape). Postgres only in Step 1; MySQL / ClickHouse / Snowflake follow in subsequent PRs. 10 unit tests cover key accumulation, dedupe across overlapping batches, single + composite key DELETE shape, the empty-source safety path, state reset after finalize, and coexistence with the existing swap-finalize path. Also extends CI's install line to include `[postgres,mysql,clickhouse]` so the new mirror tests + the existing 43 postgres + 33 mysql + 26 clickhouse tests actually **run** in CI rather than being silently skipped by their `pytest.importorskip` guards — the pre-existing coverage gap that this PR's codecov delta surfaced.
- **Destination contract tests — Step 2b: SQL destinations + empty-batch invariant** (PR [#595](https://github.com/drt-hub/drt/pull/595), follow-up to [#594](https://github.com/drt-hub/drt/pull/594)): closes out the empty-batch contract suite with the four SQL destinations (`postgres` / `mysql` / `clickhouse` / `snowflake`). Two parametrised contracts × 4 destinations = **8 tests**. Notable: no driver mocking required — CI's minimal install (`[dev,mcp,duckdb]`) deliberately excludes the SQL extras, and each destination uses lazy driver imports inside `load()` (after the `if not records: return SyncResult()` short-circuit). The implicit third contract is "no driver was ever imported": if a destination ever reaches its driver import on empty input, the test crashes with `ModuleNotFoundError` and surfaces the bug immediately. Combined with #593 (HTTP × 4) and #594 (file × 3 + parquet conditional), this locks empty-batch behaviour for 12 of drt's 24 destinations. Remaining (15 destinations, including the `StagedDestination` Protocol ones like Salesforce Bulk + Amazon Marketing Cloud) will follow once their per-protocol fixture shapes are designed.
Expand Down
145 changes: 140 additions & 5 deletions drt/destinations/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@
``CREATE TABLE ... AS ...``, INSERT into the shadow, then atomically
``EXCHANGE TABLES`` in :meth:`finalize_sync`).

Also supports ``sync.mode: mirror`` (#340 Step 3): INSERT every source
row, then in :meth:`finalize_sync` issue a single ``ALTER TABLE ...
DELETE WHERE <upsert_key> NOT IN (<observed>)`` mutation that removes
destination rows whose key was not in the source. The mutation runs
with ``mutations_sync=1`` so it completes before the call returns.
Mutations rewrite affected parts and are expensive — mirror mode is
appropriate for small/medium reference tables, not for high-volume
fact tables.

Requires: pip install drt-core[clickhouse]

Example sync YAML:
Expand Down Expand Up @@ -46,6 +55,12 @@ def __init__(self) -> None:
self._replace_truncated: bool = False
self._swap_shadow_created: bool = False
self._swap_table: str | None = None
# sync.mode: mirror (#340 Step 3) — accumulates upsert_key tuples seen
# across batches so finalize_sync can DELETE missing rows.
# ``None`` means mirror mode hasn't engaged yet (no batch with
# records); finalize_sync treats that as "skip DELETE" — safety
# against deleting everything when the source produced no data.
self._mirror_keys: list[tuple[Any, ...]] | None = None

def load(
self,
Expand Down Expand Up @@ -79,6 +94,15 @@ def load(
client.command(f"TRUNCATE TABLE {config.table}")
self._replace_truncated = True

# sync.mode: mirror (#340 Step 3) — validate upsert_key
# before any INSERT so a misconfigured sync fails fast
# rather than after partially populating the table.
if sync_options.mode == "mirror" and not config.upsert_key:
raise ValueError(
"sync.mode: mirror requires destination.upsert_key "
"(needed to identify which rows to DELETE)."
)

# TODO: batch insert with fallback to row-by-row on error
for i, record in enumerate(records):
try:
Expand All @@ -98,6 +122,24 @@ def load(
if sync_options.on_error == "fail":
return result
continue

# sync.mode: mirror (#340 Step 3) — accumulate upsert_key
# tuples for the finalize_sync DELETE pass. Only keys from
# successfully-loaded records are tracked (failed records
# don't count as "source state").
if sync_options.mode == "mirror":
assert config.upsert_key # guarded above
if self._mirror_keys is None:
self._mirror_keys = []
failed_indices = {
re.batch_index for re in result.row_errors
}
for idx, record in enumerate(records):
if idx in failed_indices:
continue
self._mirror_keys.append(
tuple(record.get(k) for k in config.upsert_key)
)
finally:
client.close()

Expand Down Expand Up @@ -161,12 +203,24 @@ def finalize_sync(
config: DestinationConfig,
sync_options: SyncOptions,
) -> SyncResult | None:
"""Atomic EXCHANGE: shadow contents become live; old data dropped.

After ``EXCHANGE TABLES original AND shadow``, the shadow table now
holds the OLD data, so we drop it. ``EXCHANGE TABLES`` is atomic in
ClickHouse 21.8+.
"""End-of-sync hook: EXCHANGE for swap-replace, ALTER DELETE for mirror.

- ``mode=replace, replace_strategy=swap``: atomic ``EXCHANGE TABLES``
(existing behaviour). After the exchange the shadow table holds the
OLD data, so we drop it. ``EXCHANGE TABLES`` is atomic in
ClickHouse 21.8+.
- ``mode=mirror`` (#340 Step 3): ``ALTER TABLE ... DELETE WHERE
<upsert_key> NOT IN (<observed>)`` mutation that removes
destination rows whose key was not in the source. Skipped if the
source produced no batches with records — treats "no observation"
as "don't delete anything" for safety.
"""
if sync_options.mode == "mirror":
result = self._finalize_mirror(config, sync_options)
# Reset mirror state regardless of result so a re-run starts fresh.
self._mirror_keys = None
return result

if not self._swap_shadow_created or self._swap_table is None:
return None

Expand All @@ -186,6 +240,87 @@ def finalize_sync(

return SyncResult()

def _finalize_mirror(
self,
config: DestinationConfig,
sync_options: SyncOptions,
) -> SyncResult | None:
"""``sync.mode: mirror`` end-of-sync DELETE pass (#340 Step 3).

Deletes destination rows whose ``upsert_key`` tuple is not in the
set of keys observed across all batches via an ``ALTER TABLE ...
DELETE`` mutation. Runs with ``mutations_sync=1`` so the call
blocks until the mutation finishes.

Uses clickhouse_connect's native ``{name:Type}`` parameter
substitution with ``Array(String)`` (single column) or
``Array(Tuple(String, ...))`` (composite). Both column references
and parameter values are coerced with ``toString()`` so the
comparison works regardless of the source column type — at the
cost of skipping any index on the upsert_key column. Mirror mode
is intended for small/medium reference tables where this is
acceptable; the temp-table strategy (#340 follow-up) targets the
high-cardinality case.

Returns ``None`` when ``_mirror_keys`` is empty or ``None`` —
treats "no batch with records was ever observed" as a signal to
skip the DELETE entirely, so a transient empty source doesn't
wipe the destination.
"""
assert isinstance(config, ClickHouseDestinationConfig)
if not self._mirror_keys:
return None

upsert_cols = config.upsert_key
assert upsert_cols # guarded in load()

# Dedupe to keep the IN list compact when batches overlap.
keys = list({tuple(k) for k in self._mirror_keys})
table_q = self._quote_ident(config.table)

client = self._connect(config)
try:
if len(upsert_cols) == 1:
col_q = f"`{upsert_cols[0]}`"
sql = (
f"ALTER TABLE {table_q} DELETE "
f"WHERE toString({col_q}) NOT IN {{keys:Array(String)}}"
)
params: dict[str, Any] = {
"keys": [str(k[0]) for k in keys]
}
else:
col_tuple = (
"(" + ", ".join(f"toString(`{c}`)" for c in upsert_cols) + ")"
)
tuple_type = "Tuple(" + ", ".join(["String"] * len(upsert_cols)) + ")"
sql = (
f"ALTER TABLE {table_q} DELETE "
f"WHERE {col_tuple} NOT IN {{keys:Array({tuple_type})}}"
)
params = {
"keys": [tuple(str(v) for v in k) for k in keys]
}
client.command(sql, parameters=params, settings={"mutations_sync": 1})
finally:
client.close()

# SyncResult has no dedicated `deleted` field; future work tracks
# this separately. Returning a bare SyncResult signals "finalize
# ran successfully" to the engine without inflating success/failed.
return SyncResult()

@staticmethod
def _quote_ident(table: str) -> str:
"""Backtick-quote a (possibly database-qualified) identifier.

``mydb.scores`` -> ``\\`mydb\\`.\\`scores\\```
``scores`` -> ``\\`scores\\```
"""
if "." in table:
return "`" + "`.`".join(table.split(".")) + "`"
return f"`{table}`"

def get_row_count(self, config: DestinationConfig) -> int:
"""Get the current row count from the destination table.

Expand Down
Loading
Loading