lightning: enhance import into backend for dm #65473
ti-chi-bot[bot] merged 13 commits into pingcap:master
Hi @GMHDBJD. Thanks for your PR.
Codecov Report
Additional details and impacted files:
@@ Coverage Diff @@
## master #65473 +/- ##
================================================
+ Coverage 77.8399% 78.3966% +0.5567%
================================================
Files 1983 1914 -69
Lines 542760 532288 -10472
================================================
- Hits 422484 417296 -5188
+ Misses 118616 114556 -4060
+ Partials 1660 436 -1224
/hold Progress regression detected.
…riod and progress tracking
- Added cancellation grace period and polling interval for job cancellation by group key.
- Updated JobOrchestrator to handle job submission errors gracefully, ensuring submitted jobs are cancelled if submission fails.
- Introduced job progress estimator to calculate job progress based on processed and total sizes.
- Implemented logging redaction for sensitive information in SQL commands during job submission.
- Added tests for job progress estimation, job submission error handling, and job cancellation scenarios.
- Created a new ProgressUpdater interface to facilitate progress updates during job execution.
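As a rough illustration of estimating progress from processed and total sizes, the sketch below clamps the ratio to the range [0, 1]. The function name `estimateProgress` and the handling of a non-positive total are assumptions for illustration, not the PR's actual implementation.

```go
package main

import "fmt"

// estimateProgress returns processed/total clamped to [0, 1].
// Hypothetical helper: a zero or negative total is treated as "unknown" and reported as 0.
func estimateProgress(processed, total int64) float64 {
	if total <= 0 {
		return 0
	}
	p := float64(processed) / float64(total)
	if p < 0 {
		return 0
	}
	if p > 1 {
		return 1
	}
	return p
}

func main() {
	fmt.Printf("%.2f\n", estimateProgress(512, 2048))
	fmt.Printf("%.2f\n", estimateProgress(4096, 2048)) // processed can overshoot an estimated total
}
```

Clamping matters because the total is often an estimate, so the processed size can briefly exceed it.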
}

if err := o.recordSubmission(egCtx, job); err != nil {
    cpCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cancelTimeout)
What's the difference between WithoutCancel and a background ctx?
And why do we have to add a timeout for recording the submission?
WithoutCancel retains context values such as logging and tracing fields. Changed to a background context to avoid confusion.
The timeout keeps the import from hanging on network, DB, or lock problems, so I added one here.
    return &jobProgressEstimator{logger: logger}
}

func (e *jobProgressEstimator) parseHumanSize(jobID int64, sizeText string, warnMsg string) (int64, bool) {
We can avoid this parsing if SHOW RAW IMPORT JOB returns the number directly as a JSON field.
We can enhance this later.
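For context on why the parsing step is fragile, the sketch below converts a human-readable size string back into bytes. The function `parseSizeText`, the accepted unit suffixes, and the input format (for example `1.5GiB`) are assumptions for illustration; the PR's `parseHumanSize` may behave differently.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// unitBytes maps binary unit suffixes to multipliers (assumed set of units).
var unitBytes = map[string]float64{
	"B": 1, "KiB": 1 << 10, "MiB": 1 << 20, "GiB": 1 << 30, "TiB": 1 << 40,
}

// parseSizeText converts text such as "1.5GiB" into a byte count.
// It returns false when the number or the unit cannot be recognized.
func parseSizeText(text string) (int64, bool) {
	text = strings.TrimSpace(text)
	// Find the first character that is not part of the decimal number.
	i := strings.IndexFunc(text, func(r rune) bool {
		return (r < '0' || r > '9') && r != '.'
	})
	if i <= 0 {
		return 0, false // no number, or no unit suffix
	}
	num, err := strconv.ParseFloat(text[:i], 64)
	if err != nil {
		return 0, false
	}
	mult, ok := unitBytes[strings.TrimSpace(text[i:])]
	if !ok {
		return 0, false
	}
	return int64(num * mult), true
}

func main() {
	fmt.Println(parseSizeText("1.5GiB"))
	fmt.Println(parseSizeText("unknown"))
}
```

Rounding in the displayed text means the recovered byte count is only approximate, which is one more reason a numeric JSON field would be preferable.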
Pull request overview
This PR enhances the import-into backend for DM (Data Migration) by adding robust job management capabilities. It introduces progress tracking, secure logging with credential redaction, graceful error handling during job submission, and failover-aware cancellation support.
Changes:
- Added job progress estimator to calculate and report import progress based on job phases and step completion
- Implemented SQL redaction to hide sensitive credentials (access keys, secret keys) in logs
- Enhanced job orchestrator with graceful cancellation including grace period and polling for late-appearing jobs
- Added failover-aware cancellation that preserves running jobs during DM worker transitions
- Fixed MySQL checkpoint schema bug by removing redundant db_name column and using composite table_name
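The credential redaction described above could look roughly like the following. The parameter names (`access-key`, `secret-access-key`, `session-token`), the mask string, and the function name `redactSQL` are assumptions for illustration; the regex actually used in job_submitter.go may differ.

```go
package main

import (
	"fmt"
	"regexp"
)

// secretParam matches URL query parameters assumed to carry credentials.
// The value runs until the next '&', quote, or whitespace character.
var secretParam = regexp.MustCompile(`(?i)\b(access-key|secret-access-key|session-token)=[^&'"\s]+`)

// redactSQL masks credential values so the statement can be logged safely.
func redactSQL(sql string) string {
	return secretParam.ReplaceAllString(sql, "${1}=xxxxxx")
}

func main() {
	sql := "IMPORT INTO t FROM 's3://bucket/path?access-key=AKIA123&secret-access-key=abc/def'"
	fmt.Println(redactSQL(sql))
}
```

Only the log line should be redacted; the statement sent to the server still needs the real values.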
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| lightning/pkg/importinto/job_progress.go | Implements progress estimation logic for global-sort and non-global-sort import phases |
| lightning/pkg/importinto/job_progress_test.go | Tests for progress calculation with various job states and phase transitions |
| lightning/pkg/server/lightning.go | Integrates progress adapter to bridge LightningStatus with ProgressUpdater interface |
| lightning/pkg/importinto/job_submitter.go | Adds SQL redaction using regex to mask credentials in cloud storage URLs |
| lightning/pkg/importinto/job_submitter_test.go | Validates that sensitive credentials are properly redacted in logs |
| lightning/pkg/importinto/job_orchestrator.go | Enhances error handling with automatic cancellation of submitted jobs on failure and adds grace period polling |
| lightning/pkg/importinto/job_orchestrator_test.go | Tests cancellation scenarios including late-appearing jobs and submission errors |
| lightning/pkg/importinto/job_monitor.go | Integrates progress estimator and removes inline cancellation logic (delegated to orchestrator) |
| lightning/pkg/importinto/job_monitor_test.go | Updates tests to verify progress tracking and non-rollback behavior |
| lightning/pkg/importinto/importer.go | Adds failover cancellation support via ErrFailoverCancel to preserve jobs during transitions |
| lightning/pkg/importinto/importer_test.go | Tests both user-initiated and failover-triggered cancellation paths |
| lightning/pkg/importinto/checkpoint.go | Fixes schema by removing db_name column and making table_name the sole primary key |
| lightning/pkg/importinto/BUILD.bazel | Adds new dependencies and increases test shard count for new test files |
| lightning/pkg/importinto/mock/import_mock.go | Adds MockProgressUpdater for testing progress tracking |
| Makefile | Updates mock generation command to include ProgressUpdater interface |
  createTableSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (
-     db_name VARCHAR(64) NOT NULL,
-     table_name VARCHAR(64) NOT NULL,
+     table_name VARCHAR(256) NOT NULL,
      job_id BIGINT NOT NULL,
      status TINYINT NOT NULL,
      message TEXT,
      group_key VARCHAR(128),
      update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-     PRIMARY KEY (db_name, table_name)
+     PRIMARY KEY (table_name)
  )`, common.EscapeIdentifier(m.schemaName), common.EscapeIdentifier(m.tableName))
The checkpoint table schema has been changed from having separate db_name and table_name columns with a composite primary key to a single table_name column. However, the CREATE TABLE IF NOT EXISTS statement won't alter existing tables. If users upgrade from a previous version with the old checkpoint schema, they'll encounter issues because the code expects the new schema but the existing table has the old schema. Consider adding migration logic or documenting this as a breaking change that requires manual checkpoint table recreation.
Replace local clamp01 helper with mathutil.Clamp.
  createTableSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (
-     db_name VARCHAR(64) NOT NULL,
-     table_name VARCHAR(64) NOT NULL,
+     table_name VARCHAR(256) NOT NULL,
The table_name column is being changed from VARCHAR(64) to VARCHAR(256), which may not be sufficient for all cases. The UniqueTable function formats table names as backtick-quoted identifiers in the form `schema`.`table`.
In MySQL, identifiers can be up to 64 characters each, so a fully qualified table name with backticks is at most `{64 chars}`.`{64 chars}` = 64 + 1 + 64 + 4 backticks = 133 characters when nothing needs escaping. However, backticks within identifiers are escaped as double backticks, so in the worst case (all 64 characters are backticks), you could have: `{128 chars}`.`{128 chars}` = 128 + 1 + 128 + 4 = 261 characters, which exceeds VARCHAR(256).
Consider increasing the column size to VARCHAR(300) or VARCHAR(512) to safely accommodate escaped identifiers.
-     table_name VARCHAR(256) NOT NULL,
+     table_name VARCHAR(512) NOT NULL,
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: Benjamin2037, D3Hunter, joechenrh.
/unhold
What problem does this PR solve?
Issue Number: ref #65092
Problem Summary:
What changed and how does it work?