Skip to content

Commit 5817ce3

Browse files
committed
oss store
1 parent 9a5b4bc commit 5817ce3

File tree

5 files changed

+557
-3
lines changed

5 files changed

+557
-3
lines changed

pkg/objstore/ossstore/store.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ossstore
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"net/http"
21+
"strings"
22+
"time"
23+
24+
"github.com/alibabacloud-go/tea/tea"
25+
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
26+
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
27+
"github.com/aliyun/credentials-go/credentials/providers"
28+
"github.com/pingcap/errors"
29+
backuppb "github.com/pingcap/kvproto/pkg/brpb"
30+
"github.com/pingcap/log"
31+
berrors "github.com/pingcap/tidb/br/pkg/errors"
32+
"github.com/pingcap/tidb/pkg/objstore/recording"
33+
"github.com/pingcap/tidb/pkg/objstore/s3like"
34+
"github.com/pingcap/tidb/pkg/objstore/storeapi"
35+
"go.uber.org/zap"
36+
)
37+
38+
const defaultRegion = "cn-hangzhou"
39+
40+
// OSSStore is the OSS storage implementation.
41+
type OSSStore struct {
42+
*s3like.Storage
43+
credRefresher *credentialRefresher
44+
}
45+
46+
// Close implements storeapi.Storage.
47+
func (s *OSSStore) Close() {
48+
s.Storage.Close()
49+
if s.credRefresher != nil {
50+
s.credRefresher.close()
51+
}
52+
}
53+
54+
// NewOSSStorage creates a OSS storage client.
55+
//
56+
// permissions required to create the client:
57+
// - GetBucketLocation
58+
//
59+
// permissions required to r/w data:
60+
// - GetBucketLocation (used to get bucket region info)
61+
// - GetBucketAcl (used to check AccessBuckets permission)
62+
// - ListObjectsV2
63+
// - GetObject
64+
// - PutObject
65+
// - DeleteObject
66+
func NewOSSStorage(ctx context.Context, backend *backuppb.S3, opts *storeapi.Options) (obj *OSSStore, errRet error) {
67+
qs := *backend
68+
69+
// TODO changing the input backend is a side effect, it shouldn't be part of
70+
// the NewXXX, but we have to do it here to keep compatibility now.
71+
//
72+
// OSS credential through assume role need refresh periodically, if we do
73+
// send them out to TiKV, they also need to be refreshed, not sure how this
74+
// works for BR now, we can add it later.
75+
if !opts.SendCredentials {
76+
backend.AccessKey, backend.SecretAccessKey, backend.SessionToken = "", "", ""
77+
} else {
78+
return nil, errors.New("sending OSS credentials to TiKV is not supported")
79+
}
80+
81+
var ossOptFns []func(*oss.Options)
82+
if qs.ForcePathStyle {
83+
// in doc of ossutil and the SDK code, it states path-style addressing
84+
// is allowed, but in "Differences between OSS and S3", it states that
85+
// "For security reasons, OSS supports only the virtual-hosted style".
86+
// anyway, we don't support it now.
87+
log.Warn("force-path-style is not supported on OSS")
88+
}
89+
90+
ossCfg := oss.NewConfig().
91+
WithRetryer(newRetryer()).
92+
WithLogLevel(getOSSLogLevel()).
93+
WithLogPrinter(newLogPrinter(
94+
zap.String("bucket", qs.GetBucket()),
95+
zap.String("prefix", qs.GetPrefix()),
96+
zap.String("context", "oss"),
97+
))
98+
99+
// TODO OSS charges for traffic, consider auto use internal endpoint when
100+
// not specified explicitly and the bucket is in the same region with the
101+
// client.
102+
if len(qs.Endpoint) != 0 {
103+
ossCfg = ossCfg.WithEndpoint(qs.Endpoint)
104+
}
105+
var credRefresher *credentialRefresher
106+
if qs.AccessKey != "" && qs.SecretAccessKey != "" {
107+
ossCfg = ossCfg.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(qs.AccessKey, qs.SecretAccessKey, qs.SessionToken))
108+
} else {
109+
var provider providers.CredentialsProvider = providers.NewDefaultCredentialsProvider()
110+
if qs.RoleArn != "" {
111+
var err2 error
112+
provider, err2 = providers.NewRAMRoleARNCredentialsProviderBuilder().
113+
WithCredentialsProvider(provider).
114+
WithRoleArn(qs.RoleArn).
115+
WithExternalId(qs.ExternalId).
116+
WithHttpOptions(&providers.HttpOptions{
117+
ReadTimeout: int(30 * time.Second.Milliseconds()),
118+
ConnectTimeout: int(30 * time.Second.Milliseconds()),
119+
}).
120+
Build()
121+
if err2 != nil {
122+
return nil, errors.Trace(err2)
123+
}
124+
}
125+
credRefresher = newCredentialRefresher(provider, log.L().With(
126+
zap.String("bucket", qs.GetBucket()),
127+
zap.String("prefix", qs.GetPrefix()),
128+
))
129+
if err := credRefresher.refreshOnce(); err != nil {
130+
return nil, errors.Annotatef(err, "failed to get initial OSS credentials")
131+
}
132+
ossCfg = ossCfg.WithCredentialsProvider(credRefresher)
133+
}
134+
135+
if opts.AccessRecording != nil {
136+
ossOptFns = append(ossOptFns, func(o *oss.Options) {
137+
o.ResponseHandlers = append(o.ResponseHandlers, func(resp *http.Response) error {
138+
opts.AccessRecording.RecRequest(resp.Request)
139+
return nil
140+
})
141+
})
142+
}
143+
144+
// get bucket location or check the specified region is correct
145+
getLocCfg := &(*ossCfg)
146+
if qs.Region == "" {
147+
getLocCfg = getLocCfg.WithRegion(defaultRegion)
148+
} else {
149+
getLocCfg = getLocCfg.WithRegion(qs.Region)
150+
}
151+
ossCli := oss.NewClient(getLocCfg, ossOptFns...)
152+
resp, err := ossCli.GetBucketLocation(ctx, &oss.GetBucketLocationRequest{Bucket: oss.Ptr(qs.Bucket)})
153+
if err != nil {
154+
return nil, errors.Annotatef(err, "failed to get location of bucket %s", qs.Bucket)
155+
}
156+
157+
detectedRegion := trimOSSRegionID(tea.StringValue(resp.LocationConstraint))
158+
if qs.Region != "" && detectedRegion != qs.Region {
159+
return nil, errors.Trace(fmt.Errorf("bucket and region are not matched, bucket=%s, input region=%s, real region=%s",
160+
qs.Bucket, qs.Region, detectedRegion))
161+
}
162+
163+
log.Info("succeed to get bucket region", zap.String("region", detectedRegion))
164+
165+
qs.Prefix = storeapi.NewPrefix(qs.Prefix).String()
166+
bucketPrefix := storeapi.NewBucketPrefix(qs.Bucket, qs.Prefix)
167+
ossCfg = ossCfg.WithRegion(detectedRegion)
168+
169+
cli := &client{
170+
svc: oss.NewClient(ossCfg, ossOptFns...),
171+
BucketPrefix: bucketPrefix,
172+
options: &qs,
173+
}
174+
if err := s3like.CheckPermissions(ctx, cli, opts.CheckPermissions); err != nil {
175+
return nil, errors.Annotatef(berrors.ErrStorageInvalidPermission, "check permission failed due to %v", err)
176+
}
177+
178+
if credRefresher != nil {
179+
if err = credRefresher.startRefresh(); err != nil {
180+
return nil, errors.Annotatef(err, "failed to start OSS credential refresher")
181+
}
182+
}
183+
184+
return &OSSStore{
185+
Storage: s3like.NewStorage(cli, bucketPrefix, &qs, opts.AccessRecording),
186+
credRefresher: credRefresher,
187+
}, nil
188+
}
189+
190+
func newOSSStorageForTest(svc API, options *backuppb.S3, accessRec *recording.AccessStats) *s3like.Storage {
191+
bucketPrefix := storeapi.NewBucketPrefix(options.Bucket, options.Prefix)
192+
return s3like.NewStorage(
193+
&client{
194+
svc: svc,
195+
BucketPrefix: bucketPrefix,
196+
options: options,
197+
},
198+
bucketPrefix,
199+
options,
200+
accessRec,
201+
)
202+
}
203+
204+
// OSS has `oss-` prefix in their region ID, but even its own SDK don't use it, 😑.
205+
func trimOSSRegionID(region string) string {
206+
if strings.HasPrefix(region, "oss-") {
207+
return strings.TrimPrefix(region, "oss-")
208+
}
209+
return region
210+
}

0 commit comments

Comments
 (0)