oss: add client/logging/retry/credential-refresh related logic #65630
ti-chi-bot[bot] merged 9 commits into pingcap:master from
Conversation
Hi @D3Hunter. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Pull request overview
This PR adds foundational infrastructure for Alibaba Cloud OSS (Object Storage Service) support in TiDB's object storage layer. It introduces client implementation, logging integration, retry logic, and credential refresh mechanisms, preparing the codebase for full OSS store functionality in subsequent PRs.
Changes:
- Added OSS client implementation with support for basic object operations (get, put, delete, list, copy)
- Implemented credential refresher for handling temporary credentials with automatic refresh
- Integrated OSS SDK logging with TiDB's logging framework
- Added retry logic and mock interfaces for testing
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/objstore/ossstore/client.go | Implements OSS client with object operations and multipart upload support |
| pkg/objstore/ossstore/client_test.go | Unit tests for client operations |
| pkg/objstore/ossstore/credential.go | Credential refresher for safe concurrent credential access |
| pkg/objstore/ossstore/credential_test.go | Tests for credential refresh logic |
| pkg/objstore/ossstore/interface.go | API interface definition for OSS operations |
| pkg/objstore/ossstore/logger.go | Logger integration between OSS SDK and TiDB logging |
| pkg/objstore/ossstore/retry.go | Retry configuration for OSS operations |
| pkg/objstore/ossstore/mock/*.go | Generated mock files for testing |
| pkg/objstore/ossstore/BUILD.bazel | Build configuration for ossstore package |
| pkg/objstore/storeapi/storage.go | Minor comment improvements mentioning OSS alongside S3 and GCS |
| pkg/objstore/s3like/interface.go | Added OSS and KS3 provider constants |
| Makefile | Added mock generation commands for OSS interfaces |
```go
	require.NoError(t, err)
	require.GreaterOrEqual(t, getAKTimeFn(cred), start)
	require.NoError(t, refersher.startRefresh())
	time.Sleep(time.Minute + 5*time.Second)
```
The test uses time.Sleep(time.Minute + 5*time.Second) which would cause tests to run for over a minute. While this is wrapped in synctest.Test which should handle simulated time, consider using a smaller time increment or ticker-based approach with controlled time advancement for faster test execution.
Suggested change:

```diff
-	time.Sleep(time.Minute + 5*time.Second)
+	step := 100 * time.Millisecond
+	steps := int((time.Minute + 5*time.Second) / step)
+	for i := 0; i <= steps; i++ {
+		time.Sleep(step)
+	}
```
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@             Coverage Diff             @@
##              master    #65630      +/- ##
================================================
- Coverage   77.7563%   77.7006%   -0.0558%
================================================
  Files          1984       1913        -71
  Lines        543041     533257      -9784
================================================
- Hits         422249     414344      -7905
+ Misses       119133     118897       -236
+ Partials       1659         16      -1643
================================================
```

Flags with carried forward coverage won't be shown.
```go
	time.Sleep(time.Minute + 5*time.Second)
	cred2, err := refresher.GetCredentials(ctx)
	require.NoError(t, err)
```
The test uses time.Sleep with a hardcoded value of 1 minute and 5 seconds to wait for credential refresh. This makes the test unnecessarily slow and fragile. Consider using a mock time source or a much shorter refresh interval for testing purposes to make the test faster and more reliable.
Suggested change:

```diff
-	time.Sleep(time.Minute + 5*time.Second)
-	cred2, err := refresher.GetCredentials(ctx)
-	require.NoError(t, err)
+	deadline := time.Now().Add(5 * time.Second)
+	var cred2 credentials.Credentials
+	for {
+		if time.Now().After(deadline) {
+			t.Fatalf("credential was not refreshed within the expected timeout")
+		}
+		c, err := refresher.GetCredentials(ctx)
+		require.NoError(t, err)
+		cred2 = c
+		if getAKTimeFn(cred2) >= start+time.Minute.Nanoseconds() {
+			break
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
```
```go
func (*retryer) GetRetryToken(context.Context, error) (releaseToken func(error) error, err error) {
	return func(err error) error {
		return nil
	}, nil
}

func (*retryer) GetInitialToken() (releaseToken func(error) error) {
	return func(err error) error {
		return nil
	}
}
```
The GetRetryToken and GetInitialToken methods return no-op functions that always return nil. While this may be intentional to satisfy an interface, there's no documentation explaining why these are no-ops or what implications this has for retry behavior. Consider adding a comment explaining that these are intentionally no-ops and why.
```go
	return &multipartWriter{
		svc:           c.svc,
		createOutput:  resp,
		completeParts: make([]oss.UploadPart, 0, 128),
```
Magic number 128 used for initial capacity of completeParts slice without explanation. Consider defining this as a named constant with documentation explaining the rationale for this initial capacity (e.g., estimated typical number of parts in a multipart upload).
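A named constant could look like the following minimal sketch; the constant name and the capacity rationale in the comment are hypothetical, chosen only to illustrate the documentation style being asked for:

```go
package main

import "fmt"

// defaultCompletePartsCap pre-sizes the completed-parts slice for multipart
// uploads. (Hypothetical rationale: with a typical part size, this many
// parts covers most uploads without reallocating; larger uploads simply
// grow the slice.)
const defaultCompletePartsCap = 128

// uploadPart is a simplified stand-in for oss.UploadPart.
type uploadPart struct {
	ETag       string
	PartNumber int32
}

func main() {
	parts := make([]uploadPart, 0, defaultCompletePartsCap)
	fmt.Println(len(parts), cap(parts))
}
```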
```go
	logFn(msg)
}
```
When the OSS SDK log level string doesn't match any of the expected cases (ERROR, WARNING, INFO, DEBUG), the function silently does nothing, which could hide logging issues. Consider emitting a warning when logFn is still nil after the switch, to help debug cases where the OSS SDK changes its log level format.
Suggested change:

```diff
-	logFn(msg)
-}
+	if logFn != nil {
+		logFn(msg)
+		return
+	}
+	// Log unexpected log level strings from OSS SDK to aid debugging.
+	l.logger.Warn("unexpected OSS log level", zap.String("level", levelStr), zap.String("msg", msg))
+}
```
```go
// multipartWriter does multi-part upload.
type multipartWriter struct {
	svc           API
	createOutput  *oss.InitiateMultipartUploadResult
	completeParts []oss.UploadPart
}

// UploadPart updates partial data.
// the size of each part except the last part must >= oss.MinPartSize, and <=
// oss.MaxPartSize
func (u *multipartWriter) Write(ctx context.Context, data []byte) (int, error) {
	req := &oss.UploadPartRequest{
		Body:          bytes.NewReader(data),
		Bucket:        u.createOutput.Bucket,
		Key:           u.createOutput.Key,
		PartNumber:    int32(len(u.completeParts) + 1),
		UploadId:      u.createOutput.UploadId,
		ContentLength: oss.Ptr(int64(len(data))),
	}

	uploadResult, err := u.svc.UploadPart(ctx, req)
	if err != nil {
		return 0, errors.Trace(err)
	}
	u.completeParts = append(u.completeParts, oss.UploadPart{
		ETag:       uploadResult.ETag,
		PartNumber: req.PartNumber,
	})
	return len(data), nil
}

// Close completes the multi-part upload request.
func (u *multipartWriter) Close(ctx context.Context) error {
	req := &oss.CompleteMultipartUploadRequest{
		Bucket:   u.createOutput.Bucket,
		Key:      u.createOutput.Key,
		UploadId: u.createOutput.UploadId,
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: u.completeParts,
		},
	}
	_, err := u.svc.CompleteMultipartUpload(ctx, req)
	return errors.Trace(err)
}
```
The multipartWriter lacks an abort mechanism for failed uploads. While the API interface includes AbortMultipartUpload, the multipartWriter struct doesn't implement a way to abort/cleanup incomplete multipart uploads when errors occur. This could lead to orphaned incomplete uploads accumulating in OSS. Consider adding an abort method or cleanup mechanism to handle error cases.
```go
func (c *client) MultipartWriter(ctx context.Context, name string) (objectio.Writer, error) {
	key := c.ObjectKey(name)
	input := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(c.Bucket),
		Key:    oss.Ptr(key),
	}
	if c.options.Sse != "" {
		input.ServerSideEncryption = oss.Ptr(c.options.Sse)
	}
	if c.options.SseKmsKeyId != "" {
		input.ServerSideEncryptionKeyId = oss.Ptr(c.options.SseKmsKeyId)
	}
	if c.options.StorageClass != "" {
		input.StorageClass = oss.StorageClassType(c.options.StorageClass)
	}

	resp, err := c.svc.InitiateMultipartUpload(ctx, input)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return &multipartWriter{
		svc:           c.svc,
		createOutput:  resp,
		completeParts: make([]oss.UploadPart, 0, 128),
	}, nil
}

func (c *client) MultipartUploader(name string, partSize int64, concurrency int) s3like.Uploader {
	up := oss.NewUploader(c.svc, func(u *oss.UploaderOptions) {
		u.PartSize = partSize
		u.ParallelNum = concurrency
	})
	return &multipartUploader{
		uploader:     up,
		BucketPrefix: c.BucketPrefix,
		key:          c.ObjectKey(name),
	}
}

// multipartWriter does multi-part upload.
type multipartWriter struct {
	svc           API
	createOutput  *oss.InitiateMultipartUploadResult
	completeParts []oss.UploadPart
}

// UploadPart updates partial data.
// the size of each part except the last part must >= oss.MinPartSize, and <=
// oss.MaxPartSize
func (u *multipartWriter) Write(ctx context.Context, data []byte) (int, error) {
	req := &oss.UploadPartRequest{
		Body:          bytes.NewReader(data),
		Bucket:        u.createOutput.Bucket,
		Key:           u.createOutput.Key,
		PartNumber:    int32(len(u.completeParts) + 1),
		UploadId:      u.createOutput.UploadId,
		ContentLength: oss.Ptr(int64(len(data))),
	}

	uploadResult, err := u.svc.UploadPart(ctx, req)
	if err != nil {
		return 0, errors.Trace(err)
	}
	u.completeParts = append(u.completeParts, oss.UploadPart{
		ETag:       uploadResult.ETag,
		PartNumber: req.PartNumber,
	})
	return len(data), nil
}

// Close completes the multi-part upload request.
func (u *multipartWriter) Close(ctx context.Context) error {
	req := &oss.CompleteMultipartUploadRequest{
		Bucket:   u.createOutput.Bucket,
		Key:      u.createOutput.Key,
		UploadId: u.createOutput.UploadId,
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: u.completeParts,
		},
	}
	_, err := u.svc.CompleteMultipartUpload(ctx, req)
	return errors.Trace(err)
}

type multipartUploader struct {
	uploader *oss.Uploader
	storeapi.BucketPrefix
	key string
}

func (u *multipartUploader) Upload(ctx context.Context, rd io.Reader) error {
	upParams := &oss.PutObjectRequest{
		Bucket: oss.Ptr(u.Bucket),
		Key:    oss.Ptr(u.key),
	}
	_, err := u.uploader.UploadFrom(ctx, upParams, rd)
	return errors.Trace(err)
}
```
The multipart upload functionality (MultipartWriter and MultipartUploader methods) lacks test coverage. Given that these are critical methods for handling large file uploads, they should have comprehensive unit tests to ensure correct behavior, especially for error cases and multipart upload completion/abort scenarios.
```go
	for {
		select {
		case <-r.ctx.Done():
			return
		case <-time.After(refreshInterval):
		}
		if err := r.refreshOnce(); err != nil {
			r.logger.Warn("failed to refresh OSS credentials", zap.Error(err))
		}
```
Using time.After in a loop can cause a memory leak because it creates a new timer on each iteration that won't be garbage collected until it fires. Consider using time.NewTicker instead, or time.NewTimer with Reset. For example, replace the select statement with a ticker pattern that can be properly stopped.
Suggested change:

```diff
-	for {
-		select {
-		case <-r.ctx.Done():
-			return
-		case <-time.After(refreshInterval):
-		}
-		if err := r.refreshOnce(); err != nil {
-			r.logger.Warn("failed to refresh OSS credentials", zap.Error(err))
-		}
+	ticker := time.NewTicker(refreshInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-r.ctx.Done():
+			return
+		case <-ticker.C:
+			if err := r.refreshOnce(); err != nil {
+				r.logger.Warn("failed to refresh OSS credentials", zap.Error(err))
+			}
+		}
+	}
```
/retest
@D3Hunter: PRs from untrusted users cannot be marked as trusted with

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
[APPROVAL NOTIFIER] This PR is APPROVED. This pull request has been approved by: joechenrh, Leavrth. The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing
What problem does this PR solve?
Issue Number: ref #65461
Problem Summary:
What changed and how does it work?
As the title says, this is a preparation PR for the OSS store; the store itself will be added in a later PR.
Summary
- `pkg/objstore/ossstore/client.go` (permission checks, get/put/delete/list/copy, multipart upload).
- `pkg/objstore/ossstore/credential.go`.
- `pkg/objstore/ossstore/logger.go`.
- `pkg/objstore/ossstore/retry.go`.
- `pkg/objstore/s3like/interface.go`.
- `pkg/objstore/storeapi/storage.go`.

Build/Test Wiring

- `pkg/objstore/ossstore/BUILD.bazel` and `pkg/objstore/ossstore/mock/BUILD.bazel`.
- `Makefile`.
- `pkg/objstore/ossstore/client_test.go`, `pkg/objstore/ossstore/credential_test.go`.
- `pkg/objstore/ossstore/mock/api_mock.go`, `pkg/objstore/ossstore/mock/provider_mock.go`.

Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.