From 84ebf4febed297b08e55da417de9641458a44786 Mon Sep 17 00:00:00 2001 From: Godzilaa Date: Fri, 15 May 2026 00:06:09 +0530 Subject: [PATCH 1/2] fix(mysql): apply _quote_ident to all SQL paths Replace raw backtick-quoted table names with _quote_ident() in _load_replace, _build_insert_sql, _build_upsert_sql, and get_row_count so schema-qualified table names (e.g. mydb.scores) work in all modes. Closes #511 --- drt/destinations/mysql.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/drt/destinations/mysql.py b/drt/destinations/mysql.py index 706dabee..909c1cdb 100644 --- a/drt/destinations/mysql.py +++ b/drt/destinations/mysql.py @@ -140,13 +140,7 @@ def get_row_count(self, config: DestinationConfig) -> int: conn = self._connect(config) try: cur = conn.cursor() - # Escape table name with backticks for safety - escaped_table = ( - "`.`".join(config.table.split(".")) - if "." in config.table - else config.table - ) - cur.execute(f"SELECT COUNT(*) FROM `{escaped_table}`") + cur.execute(f"SELECT COUNT(*) FROM {self._quote_ident(config.table)}") row = cur.fetchone() return row[0] if row else 0 finally: @@ -187,7 +181,7 @@ def _load_replace( result = SyncResult() if not self._replace_truncated: - cur.execute(f"TRUNCATE TABLE `{table}`") + cur.execute(f"TRUNCATE TABLE {self._quote_ident(table)}") self._replace_truncated = True sql = self._build_insert_sql(table, columns) @@ -350,7 +344,7 @@ def _build_insert_sql(table: str, columns: list[str]) -> str: """Build plain INSERT SQL (no conflict handling).""" cols_str = ", ".join(f"`{c}`" for c in columns) placeholders = ", ".join(["%s"] * len(columns)) - return f"INSERT INTO `{table}` ({cols_str}) VALUES ({placeholders})" + return f"INSERT INTO {MySQLDestination._quote_ident(table)} ({cols_str}) VALUES ({placeholders})" @staticmethod def _build_upsert_sql( @@ -365,11 +359,11 @@ def _build_upsert_sql( if update_cols: set_clause = ", ".join(f"`{c}` = VALUES(`{c}`)" for c in update_cols) return ( - f"INSERT INTO `{table}` ({cols_str}) VALUES ({placeholders}) " + f"INSERT INTO {MySQLDestination._quote_ident(table)} ({cols_str}) VALUES ({placeholders}) " f"ON DUPLICATE KEY UPDATE {set_clause}" ) # All columns are part of the key — just ignore duplicates - return f"INSERT IGNORE INTO `{table}` ({cols_str}) VALUES ({placeholders})" + return f"INSERT IGNORE INTO {MySQLDestination._quote_ident(table)} ({cols_str}) VALUES ({placeholders})" @staticmethod def _connect(config: MySQLDestinationConfig) -> Any: From b36fe8151b6b35a77d11c4c3ebddd64d51d1756d Mon Sep 17 00:00:00 2001 From: Pratik v Patil Date: Sun, 17 May 2026 14:31:51 +0530 Subject: [PATCH 2/2] Address review feedback: ruff line-length fix, regression tests, CHANGELOG --- CHANGELOG.md | 4 ++ drt/destinations/mysql.py | 8 ++-- tests/unit/test_mysql_destination.py | 62 ++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a56b94b9..8c47a7a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **`ConnectionTestable` destination protocol** (#495, PR #497): New runtime-checkable `Protocol` in `drt/destinations/base.py` formalizing the `test_connection()` method introduced by #484. `drt validate --check-connection` now dispatches via `isinstance(dest, ConnectionTestable)` instead of dynamic `getattr`, giving destination implementers (including third-party ones) a typed contract. PostgreSQL / MySQL / ClickHouse / Snowflake destinations are declared `ConnectionTestable`; non-SQL destinations correctly fall through to the skip path. Contributed by @Photon101. - **Postgres: qualified `schema.table` identifiers now safely composed** (#442, PR #498): follow-up to PR #452 / PR #485. The row-count, replace, swap, finalize, insert, and upsert SQL paths previously passed `f"{schema}.{table}"` style strings through `psycopg2.sql.Identifier()`, which double-quoted the entire dotted name into a single identifier (`"marketing.email_events"`). The fix splits qualified names into separate schema and relation `Identifier()` components, while keeping swap shadow/old suffixes attached to the relation name only (so `marketing.email_events` becomes `marketing."email_events__drt_swap"`, not a single quoted identifier). Contributed by @Photon101. +### Fixed + +- **MySQL destination correctly quotes schema-qualified table names** (#511): `mydb.scores` now produces `` `mydb`.`scores` `` across replace, insert, and upsert paths (was treated as a single identifier). + ## [0.7.2] - 2026-05-11 **Theme: Production Ready follow-up #2.** Opt-in anonymous telemetry, deprecation warnings in `drt validate`, Postgres `psycopg2.sql` hardening — closing out the v0.7 cycle items that didn't make v0.7.1. diff --git a/drt/destinations/mysql.py b/drt/destinations/mysql.py index 909c1cdb..e1c26923 100644 --- a/drt/destinations/mysql.py +++ b/drt/destinations/mysql.py @@ -344,7 +344,8 @@ def _build_insert_sql(table: str, columns: list[str]) -> str: """Build plain INSERT SQL (no conflict handling).""" cols_str = ", ".join(f"`{c}`" for c in columns) placeholders = ", ".join(["%s"] * len(columns)) - return f"INSERT INTO {MySQLDestination._quote_ident(table)} ({cols_str}) VALUES ({placeholders})" + table_q = MySQLDestination._quote_ident(table) + return f"INSERT INTO {table_q} ({cols_str}) VALUES ({placeholders})" @staticmethod def _build_upsert_sql( @@ -355,15 +356,16 @@ def _build_upsert_sql( """Build INSERT ... ON DUPLICATE KEY UPDATE SQL.""" cols_str = ", ".join(f"`{c}`" for c in columns) placeholders = ", ".join(["%s"] * len(columns)) + table_q = MySQLDestination._quote_ident(table) if update_cols: set_clause = ", ".join(f"`{c}` = VALUES(`{c}`)" for c in update_cols) return ( - f"INSERT INTO {MySQLDestination._quote_ident(table)} ({cols_str}) VALUES ({placeholders}) " + f"INSERT INTO {table_q} ({cols_str}) VALUES ({placeholders}) " f"ON DUPLICATE KEY UPDATE {set_clause}" ) # All columns are part of the key — just ignore duplicates - return f"INSERT IGNORE INTO {MySQLDestination._quote_ident(table)} ({cols_str}) VALUES ({placeholders})" + return f"INSERT IGNORE INTO {table_q} ({cols_str}) VALUES ({placeholders})" @staticmethod def _connect(config: MySQLDestinationConfig) -> Any: diff --git a/tests/unit/test_mysql_destination.py b/tests/unit/test_mysql_destination.py index 1d6d2728..246272cd 100644 --- a/tests/unit/test_mysql_destination.py +++ b/tests/unit/test_mysql_destination.py @@ -102,6 +102,24 @@ def test_all_columns_are_key_uses_insert_ignore(self) -> None: assert "INSERT IGNORE INTO" in sql assert "ON DUPLICATE KEY" not in sql + def test_qualified_table_on_duplicate_key(self) -> None: + sql = MySQLDestination._build_upsert_sql( + table="mydb.scores", + columns=["id", "score"], + update_cols=["score"], + ) + assert "INSERT INTO `mydb`.`scores`" in sql + assert "`mydb.scores`" not in sql + + def test_qualified_table_all_key_uses_insert_ignore(self) -> None: + sql = MySQLDestination._build_upsert_sql( + table="mydb.lookup", + columns=["id"], + update_cols=[], + ) + assert "INSERT IGNORE INTO `mydb`.`lookup`" in sql + assert "`mydb.lookup`" not in sql + # --------------------------------------------------------------------------- # Load behavior @@ -204,6 +222,24 @@ def test_dict_and_list_values_are_json_serialized(self, mock_connect: MagicMock) assert values[3] == '["a", "b"]' assert values[4] == 0.9 + @patch("drt.destinations.mysql.MySQLDestination._connect") + def test_get_row_count_with_qualified_table( + self, mock_connect: MagicMock + ) -> None: + conn = _fake_connection() + cur = conn.cursor() + cur.fetchone.return_value = (42,) + mock_connect.return_value = conn + + count = MySQLDestination().get_row_count( + _config(table="mydb.learning_profiles") + ) + + assert count == 42 + sql = cur.execute.call_args.args[0] + assert "`mydb`.`learning_profiles`" in sql + assert "`mydb.learning_profiles`" not in sql + @patch("drt.destinations.mysql.MySQLDestination._connect") def test_connection_closed_on_error(self, mock_connect: MagicMock) -> None: conn = _fake_connection() @@ -233,6 +269,14 @@ def test_basic_insert(self) -> None: assert "ON DUPLICATE KEY" not in sql assert "VALUES (%s, %s, %s)" in sql + def test_qualified_insert_sql(self) -> None: + sql = MySQLDestination._build_insert_sql( + table="mydb.scores", + columns=["id", "score"], + ) + assert "INSERT INTO `mydb`.`scores`" in sql + assert "`mydb.scores`" not in sql + class TestMySQLReplaceMode: @patch("drt.destinations.mysql.MySQLDestination._connect") @@ -295,6 +339,24 @@ def test_replace_uses_plain_insert(self, mock_connect: MagicMock) -> None: assert "ON DUPLICATE KEY" not in insert_sql assert "INSERT INTO" in insert_sql + @patch("drt.destinations.mysql.MySQLDestination._connect") + def test_replace_uses_qualified_identifier( + self, mock_connect: MagicMock + ) -> None: + conn = _fake_connection() + cur = conn.cursor() + mock_connect.return_value = conn + + MySQLDestination().load( + [{"user_id": 1, "company_id": 5, "score": 0.5}], + _config(table="mydb.learning_profiles"), + _options(mode="replace"), + ) + + sqls = [c[0][0] for c in cur.execute.call_args_list] + assert any("TRUNCATE TABLE `mydb`.`learning_profiles`" in s for s in sqls) + assert any("INSERT INTO `mydb`.`learning_profiles`" in s for s in sqls) + @patch("drt.destinations.mysql.MySQLDestination._connect") def test_replace_serializes_json_values(self, mock_connect: MagicMock) -> None: conn = _fake_connection()