Skip to content

Commit 8c5244d

Browse files
authored
master(dm): support extra SHOW FULL TABLES columns from PolarDB-X (pingcap#12481)
close pingcap#12569
1 parent abbaf8d commit 8c5244d

File tree

12 files changed

+231
-26
lines changed

12 files changed

+231
-26
lines changed

dm/checker/check_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -863,7 +863,10 @@ func initMockDB(t *testing.T) sqlmock.Sqlmock {
863863
mock, err := conn.MockDefaultDBProvider()
864864
require.NoError(t, err)
865865
mock.ExpectQuery("SHOW DATABASES").WillReturnRows(sqlmock.NewRows([]string{"DATABASE"}).AddRow(schema))
866-
mock.ExpectQuery("SHOW FULL TABLES").WillReturnRows(sqlmock.NewRows([]string{"Tables_in_" + schema, "Table_type"}).AddRow(tb1, "BASE TABLE").AddRow(tb2, "BASE TABLE"))
866+
mock.ExpectQuery("SHOW FULL TABLES").WillReturnRows(
867+
sqlmock.NewRows([]string{"Tables_in_" + schema, "Table_type", "Auto_partition", "Table_group"}).
868+
AddRow(tb1, "BASE TABLE", "NO", "single_tg").
869+
AddRow(tb2, "BASE TABLE", "NO", "single_tg"))
867870
return mock
868871
}
869872

dm/master/openapi_view.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ func (s *Server) DMAPIGetSourceTableList(c *gin.Context, sourceName string, sche
391391
return
392392
}
393393
defer baseDB.Close()
394-
tableList, err := dbutil.GetTables(c.Request.Context(), baseDB.DB, schemaName)
394+
tableList, err := conn.GetTables(c.Request.Context(), baseDB.DB, schemaName)
395395
if err != nil {
396396
_ = c.Error(err)
397397
return

dm/master/openapi_view_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,8 @@ func (s *OpenAPIViewSuite) TestSourceAPI() {
706706
s.NoError(err)
707707
tableName := "CHARACTER_SETS"
708708
mockDB.ExpectQuery("SHOW FULL TABLES IN `information_schema` WHERE Table_Type != 'VIEW';").WillReturnRows(
709-
sqlmock.NewRows([]string{"Tables_in_information_schema", "Table_type"}).AddRow(tableName, "BASE TABLE"))
709+
sqlmock.NewRows([]string{"Tables_in_information_schema", "Table_type", "Auto_partition", "Table_group"}).
710+
AddRow(tableName, "BASE TABLE", "NO", "single_tg"))
710711
tableURL := fmt.Sprintf("%s/%s/schemas/%s", baseURL, source1.SourceName, schemaName)
711712
result = testutil.NewRequest().Get(tableURL).GoWithHTTPHandler(s.T(), s1.openapiHandles)
712713
s.Equal(http.StatusOK, result.Code())

dm/pkg/checker/onlineddl.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import (
1717
"context"
1818
"database/sql"
1919

20-
"github.com/pingcap/tidb/pkg/util/dbutil"
2120
"github.com/pingcap/tidb/pkg/util/filter"
21+
"github.com/pingcap/tiflow/dm/pkg/conn"
2222
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
2323
)
2424

@@ -47,7 +47,7 @@ func (c *OnlineDDLChecker) Check(ctx context.Context) *Result {
4747
}
4848

4949
for schema := range c.checkSchemas {
50-
tableList, err := dbutil.GetTables(ctx, c.db, schema)
50+
tableList, err := conn.GetTables(ctx, c.db, schema)
5151
if err != nil {
5252
markCheckError(r, err)
5353
return r

dm/pkg/conn/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ func FetchAllDoTables(ctx context.Context, db *BaseDB, bw *filter.Filter) (map[s
483483
schemaToTables := make(map[string][]string)
484484
for _, ftSchema := range ftSchemas {
485485
schema := ftSchema.Schema
486-
tables, err := dbutil.GetTables(ctx, db.DB, schema)
486+
tables, err := GetTables(ctx, db.DB, schema)
487487
if err != nil {
488488
return nil, terror.DBErrorAdapt(err, db.Scope, terror.ErrDBDriverError)
489489
}

dm/pkg/conn/db_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,55 @@ func TestGetRandomServerID(t *testing.T) {
8080
require.NotEqual(t, 101, serverID)
8181
}
8282

83+
func TestGetTables(t *testing.T) {
84+
t.Parallel()
85+
86+
db, mock, err := sqlmock.New()
87+
require.NoError(t, err)
88+
89+
schema := "test_db"
90+
tables := []string{"tbl1", "tbl2"}
91+
92+
rows := sqlmock.NewRows([]string{fmt.Sprintf("Tables_in_%s", schema), "Table_type"})
93+
addRowsForTables(rows, tables)
94+
mock.ExpectQuery(fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type != 'VIEW'", schema)).WillReturnRows(rows)
95+
96+
got, err := GetTables(context.Background(), db, schema)
97+
require.NoError(t, err)
98+
require.Equal(t, tables, got)
99+
require.NoError(t, mock.ExpectationsWereMet())
100+
101+
rows = sqlmock.NewRows([]string{fmt.Sprintf("Tables_in_%s", schema), "Table_type", "Auto_partition", "Table_group"})
102+
addRowsForPolarDBXTables(rows, tables)
103+
mock.ExpectQuery(fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type != 'VIEW'", schema)).WillReturnRows(rows)
104+
105+
got, err = GetTables(context.Background(), db, schema)
106+
require.NoError(t, err)
107+
require.Equal(t, tables, got)
108+
require.NoError(t, mock.ExpectationsWereMet())
109+
}
110+
111+
func TestGetTablesErrors(t *testing.T) {
112+
t.Parallel()
113+
114+
db, mock, err := sqlmock.New()
115+
require.NoError(t, err)
116+
117+
schema := "test_db"
118+
query := fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type != 'VIEW'", schema)
119+
120+
mock.ExpectQuery(query).WillReturnError(errors.New("query failed"))
121+
_, err = GetTables(context.Background(), db, schema)
122+
require.ErrorContains(t, err, "query failed")
123+
require.NoError(t, mock.ExpectationsWereMet())
124+
125+
rows := sqlmock.NewRows([]string{fmt.Sprintf("Tables_in_%s", schema)}).AddRow("tbl1")
126+
mock.ExpectQuery(query).WillReturnRows(rows)
127+
_, err = GetTables(context.Background(), db, schema)
128+
require.ErrorContains(t, err, "unexpected SHOW FULL TABLES result column count 1")
129+
require.NoError(t, mock.ExpectationsWereMet())
130+
}
131+
83132
func TestGetMariaDBGtidDomainID(t *testing.T) {
84133
t.Parallel()
85134

@@ -571,6 +620,18 @@ func TestFetchAllDoTables(t *testing.T) {
571620
require.Len(t, got, 1)
572621
require.Equal(t, []string{"tbl1", "tbl2"}, got[doSchema])
573622
require.NoError(t, mock.ExpectationsWereMet())
623+
624+
rows = sqlmock.NewRows([]string{"Database"})
625+
addRowsForSchemas(rows, schemas)
626+
mock.ExpectQuery(`SHOW DATABASES`).WillReturnRows(rows)
627+
rows = sqlmock.NewRows([]string{fmt.Sprintf("Tables_in_%s", doSchema), "Table_type", "Auto_partition", "Table_group"})
628+
addRowsForPolarDBXTables(rows, tables)
629+
mock.ExpectQuery(fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type != 'VIEW'", doSchema)).WillReturnRows(rows)
630+
got, err = FetchAllDoTables(context.Background(), NewBaseDBForTest(db), ba)
631+
require.NoError(t, err)
632+
require.Len(t, got, 1)
633+
require.Equal(t, []string{"tbl1", "tbl2"}, got[doSchema])
634+
require.NoError(t, mock.ExpectationsWereMet())
574635
}
575636

576637
func TestFetchTargetDoTables(t *testing.T) {
@@ -647,6 +708,12 @@ func addRowsForTables(rows *sqlmock.Rows, tables []string) {
647708
}
648709
}
649710

711+
func addRowsForPolarDBXTables(rows *sqlmock.Rows, tables []string) {
712+
for _, table := range tables {
713+
rows.AddRow(table, "BASE TABLE", "NO", "single_tg")
714+
}
715+
}
716+
650717
func TestGetGTIDMode(t *testing.T) {
651718
t.Parallel()
652719

dm/pkg/conn/table.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package conn
15+
16+
import (
17+
"context"
18+
"database/sql"
19+
"fmt"
20+
"strings"
21+
22+
"github.com/pingcap/errors"
23+
"github.com/pingcap/tidb/pkg/util/dbutil"
24+
)
25+
26+
// GetTables returns names of all non-view tables in the specified schema.
27+
// It is compatible with upstreams like PolarDB-X, whose SHOW FULL TABLES
28+
// may return extra columns after the standard table name and table type.
29+
func GetTables(ctx context.Context, db dbutil.QueryExecutor, schemaName string) ([]string, error) {
30+
query := fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type != 'VIEW';", escapeName(schemaName))
31+
return queryTables(ctx, db, query)
32+
}
33+
34+
func queryTables(ctx context.Context, db dbutil.QueryExecutor, query string) ([]string, error) {
35+
rows, err := db.QueryContext(ctx, query)
36+
if err != nil {
37+
return nil, errors.Trace(err)
38+
}
39+
defer rows.Close()
40+
41+
columns, err := rows.Columns()
42+
if err != nil {
43+
return nil, errors.Trace(err)
44+
}
45+
if len(columns) < 2 {
46+
return nil, errors.Errorf("unexpected SHOW FULL TABLES result column count %d, expected at least 2", len(columns))
47+
}
48+
49+
tables := make([]string, 0, 8)
50+
for rows.Next() {
51+
var table, tableType sql.NullString
52+
values := make([]any, len(columns))
53+
values[0] = &table
54+
values[1] = &tableType
55+
for i := 2; i < len(values); i++ {
56+
values[i] = new(sql.RawBytes)
57+
}
58+
59+
if err := rows.Scan(values...); err != nil {
60+
return nil, errors.Trace(err)
61+
}
62+
if !table.Valid || !tableType.Valid {
63+
continue
64+
}
65+
66+
tables = append(tables, table.String)
67+
}
68+
69+
return tables, errors.Trace(rows.Err())
70+
}
71+
72+
func escapeName(name string) string {
73+
return strings.ReplaceAll(name, "`", "``")
74+
}

sync_diff_inspector/source/mysql_shard.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/pingcap/tidb/pkg/util/dbutil"
2828
"github.com/pingcap/tidb/pkg/util/filter"
2929
tableFilter "github.com/pingcap/tidb/pkg/util/table-filter"
30+
"github.com/pingcap/tiflow/dm/pkg/conn"
3031
"github.com/pingcap/tiflow/sync_diff_inspector/config"
3132
"github.com/pingcap/tiflow/sync_diff_inspector/source/common"
3233
"github.com/pingcap/tiflow/sync_diff_inspector/splitter"
@@ -351,7 +352,7 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*
351352
if filter.IsSystemSchema(strings.ToLower(schema)) {
352353
continue
353354
}
354-
allTables, err := dbutil.GetTables(ctx, sourceDB.Conn, schema)
355+
allTables, err := conn.GetTables(ctx, sourceDB.Conn, schema)
355356
if err != nil {
356357
return nil, errors.Annotatef(err, "get tables from %d source %s", i, schema)
357358
}

sync_diff_inspector/source/source.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/pingcap/tidb/pkg/util/filter"
2929
tableFilter "github.com/pingcap/tidb/pkg/util/table-filter"
3030
router "github.com/pingcap/tidb/pkg/util/table-router"
31+
"github.com/pingcap/tiflow/dm/pkg/conn"
3132
"github.com/pingcap/tiflow/sync_diff_inspector/config"
3233
"github.com/pingcap/tiflow/sync_diff_inspector/source/common"
3334
"github.com/pingcap/tiflow/sync_diff_inspector/splitter"
@@ -327,7 +328,7 @@ func initTables(ctx context.Context, cfg *config.Config) (cfgTables []*config.Ta
327328
if filter.IsSystemSchema(strings.ToLower(schema)) {
328329
continue
329330
}
330-
allTables, err := dbutil.GetTables(ctx, downStreamConn, schema)
331+
allTables, err := conn.GetTables(ctx, downStreamConn, schema)
331332
if err != nil {
332333
return nil, errors.Annotatef(err, "get tables from target source %s", schema)
333334
}

0 commit comments

Comments
 (0)