Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/importsdk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_test(
"//pkg/util/table-filter",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_ngaut_pools//:pools",
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
Expand Down
41 changes: 22 additions & 19 deletions pkg/importsdk/file_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,25 @@ type FileScanner interface {
}

// fileScanner is the concrete value NewFileScanner returns as a FileScanner.
// redactedSourcePath is set from ast.RedactURL(sourcePath) and is the form
// used in this file's error annotations in place of the raw sourcePath.
type fileScanner struct {
sourcePath string
db *sql.DB
store storeapi.Storage
loader *mydump.MDLoader
logger log.Logger
config *SDKConfig
sourcePath string
Comment thread
GMHDBJD marked this conversation as resolved.
Outdated
redactedSourcePath string
db *sql.DB
store storeapi.Storage
loader *mydump.MDLoader
logger log.Logger
config *SDKConfig
}

// NewFileScanner creates a new FileScanner
func NewFileScanner(ctx context.Context, sourcePath string, db *sql.DB, cfg *SDKConfig) (FileScanner, error) {
redactedSourcePath := ast.RedactURL(sourcePath)
Comment thread
GMHDBJD marked this conversation as resolved.
u, err := objstore.ParseBackend(sourcePath, nil)
if err != nil {
return nil, errors.Annotatef(ErrParseStorageURL, "source=%s, err=%v", sourcePath, err)
return nil, errors.Annotatef(ErrParseStorageURL, "source=%s", redactedSourcePath)
Comment thread
GMHDBJD marked this conversation as resolved.
Outdated
}
store, err := objstore.New(ctx, u, &storeapi.Options{})
if err != nil {
return nil, errors.Annotatef(ErrCreateExternalStorage, "source=%s, err=%v", sourcePath, err)
return nil, errors.Annotatef(ErrCreateExternalStorage, "source=%s, err=%v", redactedSourcePath, err)
}

ldrCfg := mydump.LoaderConfig{
Expand All @@ -90,24 +92,25 @@ func NewFileScanner(ctx context.Context, sourcePath string, db *sql.DB, cfg *SDK
loader, err := mydump.NewLoaderWithStore(ctx, ldrCfg, store, loaderOptions...)
if err != nil {
if loader == nil || !errors.ErrorEqual(err, common.ErrTooManySourceFiles) {
return nil, errors.Annotatef(ErrCreateLoader, "source=%s, charset=%s, err=%v", sourcePath, cfg.charset, err)
return nil, errors.Annotatef(ErrCreateLoader, "source=%s, charset=%s, err=%v", redactedSourcePath, cfg.charset, err)
}
}

return &fileScanner{
sourcePath: sourcePath,
db: db,
store: store,
loader: loader,
logger: cfg.logger,
config: cfg,
sourcePath: sourcePath,
redactedSourcePath: redactedSourcePath,
db: db,
store: store,
loader: loader,
logger: cfg.logger,
config: cfg,
}, nil
}

func (s *fileScanner) CreateSchemasAndTables(ctx context.Context) error {
dbMetas := s.loader.GetDatabases()
if len(dbMetas) == 0 {
return errors.Annotatef(ErrNoDatabasesFound, "source=%s", s.sourcePath)
return errors.Annotatef(ErrNoDatabasesFound, "source=%s", s.redactedSourcePath)
}

// Create all schemas and tables
Expand All @@ -121,7 +124,7 @@ func (s *fileScanner) CreateSchemasAndTables(ctx context.Context) error {

err := importer.Run(ctx, dbMetas)
if err != nil {
return errors.Annotatef(ErrCreateSchema, "source=%s, db_count=%d, err=%v", s.sourcePath, len(dbMetas), err)
return errors.Annotatef(ErrCreateSchema, "source=%s, db_count=%d, err=%v", s.redactedSourcePath, len(dbMetas), err)
}

return nil
Expand Down Expand Up @@ -155,7 +158,7 @@ func (s *fileScanner) CreateSchemaAndTableByName(ctx context.Context, schema, ta
Tables: []*mydump.MDTableMeta{tblMeta},
}})
if err != nil {
return errors.Annotatef(ErrCreateSchema, "source=%s, schema=%s, table=%s, err=%v", s.sourcePath, schema, table, err)
return errors.Annotatef(ErrCreateSchema, "source=%s, schema=%s, table=%s, err=%v", s.redactedSourcePath, schema, table, err)
}

return nil
Expand Down Expand Up @@ -202,7 +205,7 @@ func (s *fileScanner) GetTotalSize(ctx context.Context) int64 {
func (s *fileScanner) EstimateImportDataSize(ctx context.Context) (*ImportDataSizeEstimate, error) {
dbMetas := s.loader.GetDatabases()
if len(dbMetas) == 0 {
return nil, errors.Annotatef(ErrNoDatabasesFound, "source=%s", s.sourcePath)
return nil, errors.Annotatef(ErrNoDatabasesFound, "source=%s", s.redactedSourcePath)
}

result := &ImportDataSizeEstimate{
Expand Down
57 changes: 57 additions & 0 deletions pkg/importsdk/file_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/parser/ast"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -117,6 +120,60 @@ func TestFileScanner(t *testing.T) {
require.NoError(t, mock.ExpectationsWereMet())
})

// Checks that an error returned by NewFileScanner itself reports the source
// URL with its credential query parameters masked.
t.Run("NewFileScannerRedactsSensitiveSourcePathInInitErrors", func(t *testing.T) {
Comment thread
GMHDBJD marked this conversation as resolved.
Outdated
_, err := NewFileScanner(
ctx,
// S3 URL with an empty bucket and three credential parameters.
"s3://?access-key=ak&secret-access-key=sk&session-token=token",
db,
cfg,
)
require.Error(t, err)
// Each credential parameter must appear with the xxxxxx placeholder.
// NOTE(review): "access-key=xxxxxx" is a substring of
// "secret-access-key=xxxxxx", so the first check cannot fail on its own
// when the second one passes.
require.ErrorContains(t, err, "access-key=xxxxxx")
require.ErrorContains(t, err, "secret-access-key=xxxxxx")
require.ErrorContains(t, err, "session-token=xxxxxx")
// The raw credential values must not appear anywhere in the message.
require.NotContains(t, err.Error(), "access-key=ak")
require.NotContains(t, err.Error(), "secret-access-key=sk")
require.NotContains(t, err.Error(), "session-token=token")
})

// Checks that an error returned by CreateSchemasAndTables reports the redacted
// source path while still carrying the underlying failure text.
t.Run("CreateSchemasAndTablesRedactsSensitiveSourcePathOnError", func(t *testing.T) {
// Dump directory with a database schema file and a table schema file whose
// column list ends in a trailing comma.
invalidDir := t.TempDir()
require.NoError(t, os.WriteFile(filepath.Join(invalidDir, "db1-schema-create.sql"), []byte("CREATE DATABASE db1;"), 0o644))
require.NoError(t, os.WriteFile(
filepath.Join(invalidDir, "db1.t1-schema.sql"),
[]byte("CREATE TABLE t1 (id INT,);"),
0o644,
))

// Separate mock database so these expectations do not mix with the ones set
// on the mock used by the other subtests.
invalidDB, invalidMock, err := sqlmock.New()
require.NoError(t, err)
defer invalidDB.Close()

invalidScanner, err := NewFileScanner(ctx, "file://"+invalidDir, invalidDB, defaultSDKConfig())
require.NoError(t, err)
defer invalidScanner.Close()

// The scanner was built from the local file:// directory; overwrite both
// path fields with a credential-bearing S3 URL so the error annotation has
// something to redact.
fs := invalidScanner.(*fileScanner)
sourcePath := "s3://bucket/path?access-key=ak&secret-access-key=sk&session-token=token"
fs.sourcePath = sourcePath
fs.redactedSourcePath = ast.RedactURL(sourcePath)

// Mocked SQL: the schema listing returns no rows, CREATE DATABASE succeeds,
// and SHOW CREATE TABLE fails with ErrNoSuchTable.
invalidMock.ExpectQuery("SELECT SCHEMA_NAME FROM information_schema.SCHEMATA.*").WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"}))
invalidMock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS `db1`")).WillReturnResult(sqlmock.NewResult(0, 0))
invalidMock.ExpectQuery("SHOW CREATE TABLE `db1`.`t1`").WillReturnError(&dmysql.MySQLError{Number: tmysql.ErrNoSuchTable})

err = invalidScanner.CreateSchemasAndTables(ctx)
require.Error(t, err)
// Each credential parameter must appear with the xxxxxx placeholder.
// NOTE(review): "access-key=xxxxxx" is a substring of
// "secret-access-key=xxxxxx", so the first check cannot fail on its own
// when the second one passes.
require.ErrorContains(t, err, "access-key=xxxxxx")
require.ErrorContains(t, err, "secret-access-key=xxxxxx")
require.ErrorContains(t, err, "session-token=xxxxxx")
// The raw credential values must not appear anywhere in the message.
require.NotContains(t, err.Error(), "access-key=ak")
require.NotContains(t, err.Error(), "secret-access-key=sk")
require.NotContains(t, err.Error(), "session-token=token")
// The underlying cause is expected to remain visible alongside the source.
require.ErrorContains(t, err, "invalid schema statement")
require.NoError(t, invalidMock.ExpectationsWereMet())
})

t.Run("CreateSchemaAndTableByName", func(t *testing.T) {
mock.ExpectQuery("SELECT SCHEMA_NAME FROM information_schema.SCHEMATA.*").WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"}))
mock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS `db1`")).WillReturnResult(sqlmock.NewResult(0, 0))
Expand Down
Loading