Skip to content

Commit 370e0ed

Browse files
authored
feat(storage): integrate parallel composite upload (PCU) into storage writer (#14232)
1 parent 269dd2d commit 370e0ed

6 files changed

Lines changed: 640 additions & 91 deletions

File tree

storage/grpc_client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,9 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec
10061006
dstObjPb.Name = req.dstObject.name
10071007

10081008
if req.sendCRC32C {
1009+
if dstObjPb.Checksums == nil {
1010+
dstObjPb.Checksums = &storagepb.ObjectChecksums{}
1011+
}
10091012
dstObjPb.Checksums.Crc32C = &req.dstObject.attrs.CRC32C
10101013
}
10111014

storage/integration_test.go

Lines changed: 249 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2281,7 +2281,11 @@ func TestIntegration_WriterCRC32CValidation(t *testing.T) {
22812281
t.Fatalf("ReadAll failed: %v", err)
22822282
}
22832283
if !bytes.Equal(gotContent, tc.content) {
2284-
t.Errorf("content mismatch: got %d bytes, want %d bytes", len(gotContent), len(tc.content))
2284+
if len(gotContent) == len(tc.content) {
2285+
t.Errorf("content mismatch: lengths match (%d bytes) but bytes differ", len(gotContent))
2286+
} else {
2287+
t.Errorf("content mismatch: got %d bytes, want %d bytes", len(gotContent), len(tc.content))
2288+
}
22852289
}
22862290
})
22872291
}
@@ -8716,3 +8720,247 @@ func setUpRequesterPaysBucket(ctx context.Context, t *testing.T, bucket, object
87168720
// crc32c computes the CRC-32 checksum of b using the Castagnoli
// polynomial, the checksum variant used for object integrity checks.
func crc32c(b []byte) uint32 {
	table := crc32.MakeTable(crc32.Castagnoli)
	return crc32.Checksum(b, table)
}
8723+
8724+
func TestIntegration_ParallelUpload(t *testing.T) {
8725+
ctx := skipExtraReadAPIs(skipHTTP("PCU only supported for gRPC"), "no reads in test")
8726+
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
8727+
h := testHelper{t}
8728+
8729+
testCases := []struct {
8730+
name string
8731+
content []byte
8732+
config ParallelUploadConfig
8733+
expected int64
8734+
}{
8735+
{
8736+
name: "small object",
8737+
content: bytes.Repeat([]byte("a"), 1<<20), // 1 MiB
8738+
config: ParallelUploadConfig{PartSize: 5 << 20, MaxConcurrency: 2},
8739+
expected: 1 << 20,
8740+
},
8741+
{
8742+
name: "exact part size",
8743+
content: bytes.Repeat([]byte("b"), 5<<20), // 5 MiB
8744+
config: ParallelUploadConfig{PartSize: 5 << 20, MaxConcurrency: 2},
8745+
expected: 5 << 20,
8746+
},
8747+
{
8748+
name: "large object",
8749+
content: bytes.Repeat([]byte("c"), 12<<20), // 12 MiB
8750+
config: ParallelUploadConfig{PartSize: 5 << 20, MaxConcurrency: 4},
8751+
expected: 12 << 20,
8752+
},
8753+
{
8754+
name: "recursive composition logic",
8755+
content: bytes.Repeat([]byte("d"), 165<<20), // 165 MiB = 33 parts of 5 MiB each (tests recursive compose)
8756+
config: ParallelUploadConfig{PartSize: 5 << 20, MaxConcurrency: 16},
8757+
expected: 165 << 20,
8758+
},
8759+
}
8760+
8761+
for _, tc := range testCases {
8762+
t.Run(tc.name, func(t *testing.T) {
8763+
objName := "pcu-" + uidSpaceObjects.New()
8764+
obj := client.Bucket(bucket).Object(objName)
8765+
t.Cleanup(func() {
8766+
h.mustDeleteObject(obj)
8767+
})
8768+
8769+
w := obj.NewWriter(ctx)
8770+
w.EnableParallelUpload = true
8771+
w.ParallelUploadConfig = tc.config
8772+
8773+
if _, err := w.Write(tc.content); err != nil {
8774+
t.Fatalf("Writer.Write: %v", err)
8775+
}
8776+
if err := w.Close(); err != nil {
8777+
t.Fatalf("Writer.Close: %v", err)
8778+
}
8779+
8780+
// Verify object size and existence.
8781+
attrs, err := obj.Attrs(ctx)
8782+
if err != nil {
8783+
t.Fatalf("obj.Attrs: %v", err)
8784+
}
8785+
if attrs.Size != tc.expected {
8786+
t.Errorf("Object size mismatch: got %d, want %d", attrs.Size, tc.expected)
8787+
}
8788+
8789+
// Verify contents.
8790+
r, err := obj.NewReader(ctx)
8791+
if err != nil {
8792+
t.Fatalf("NewReader failed: %v", err)
8793+
}
8794+
defer r.Close()
8795+
gotContent, err := io.ReadAll(r)
8796+
if err != nil {
8797+
t.Fatalf("ReadAll failed: %v", err)
8798+
}
8799+
if !bytes.Equal(gotContent, tc.content) {
8800+
if len(gotContent) == len(tc.content) {
8801+
t.Errorf("content mismatch: lengths match (%d bytes) but bytes differ", len(gotContent))
8802+
} else {
8803+
t.Errorf("content mismatch: got %d bytes, want %d bytes", len(gotContent), len(tc.content))
8804+
}
8805+
}
8806+
8807+
// Verify cleanup of intermediate/part objects.
8808+
// Since cleanup runs in the background, we retry for a few seconds.
8809+
var count int
8810+
err = retry(ctx, func() error {
8811+
it := client.Bucket(bucket).Objects(ctx, &Query{Prefix: tmpObjectPrefix})
8812+
count = 0
8813+
for {
8814+
attrsObj, err := it.Next()
8815+
if err == iterator.Done {
8816+
break
8817+
}
8818+
if err != nil {
8819+
return err
8820+
}
8821+
// Only count temporary chunks belonging to this specific test object to avoid test flakes.
8822+
if strings.Contains(attrsObj.Name, objName) {
8823+
count++
8824+
}
8825+
}
8826+
if count != 0 {
8827+
return fmt.Errorf("found %d temporary objects after Parallel Upload, expected 0", count)
8828+
}
8829+
return nil
8830+
}, func() error {
8831+
return nil
8832+
})
8833+
8834+
if err != nil {
8835+
t.Error(err)
8836+
}
8837+
})
8838+
}
8839+
})
8840+
}
8841+
8842+
// TestIntegration_ParallelUploadConcurrency verifies that multiple parallel uploads
8843+
// can occur simultaneously to different objects without resource interference or leaks.
8844+
func TestIntegration_ParallelUploadConcurrency(t *testing.T) {
8845+
ctx := skipExtraReadAPIs(skipHTTP("PCU only supported for gRPC"), "no reads in test")
8846+
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
8847+
h := testHelper{t}
8848+
8849+
content := bytes.Repeat([]byte("z"), 15<<20) // 15 MiB
8850+
config := ParallelUploadConfig{PartSize: 5 << 20, MaxConcurrency: 4}
8851+
8852+
var wg sync.WaitGroup
8853+
numUploads := 5
8854+
errs := make(chan error, numUploads)
8855+
8856+
for i := 0; i < numUploads; i++ {
8857+
wg.Add(1)
8858+
go func(idx int) {
8859+
defer wg.Done()
8860+
objName := fmt.Sprintf("pcu-concurrent-%d-%s", idx, uidSpaceObjects.New())
8861+
obj := client.Bucket(bucket).Object(objName)
8862+
defer h.mustDeleteObject(obj)
8863+
8864+
w := obj.NewWriter(ctx)
8865+
w.EnableParallelUpload = true
8866+
w.ParallelUploadConfig = config
8867+
8868+
if _, err := w.Write(content); err != nil {
8869+
errs <- fmt.Errorf("Writer.Write failed: %v", err)
8870+
return
8871+
}
8872+
if err := w.Close(); err != nil {
8873+
errs <- fmt.Errorf("Writer.Close failed: %v", err)
8874+
return
8875+
}
8876+
8877+
// Verify object size.
8878+
attrs, err := obj.Attrs(ctx)
8879+
if err != nil {
8880+
errs <- fmt.Errorf("obj.Attrs failed: %v", err)
8881+
return
8882+
}
8883+
if attrs.Size != int64(len(content)) {
8884+
errs <- fmt.Errorf("Object size mismatch: got %d, want %d", attrs.Size, len(content))
8885+
return
8886+
}
8887+
}(i)
8888+
}
8889+
8890+
wg.Wait()
8891+
close(errs)
8892+
8893+
for err := range errs {
8894+
t.Errorf("Concurrent upload error: %v", err)
8895+
}
8896+
})
8897+
}
8898+
8899+
func TestIntegration_ParallelUpload_ChecksumValidation(t *testing.T) {
8900+
ctx := skipExtraReadAPIs(skipHTTP("PCU only supported for gRPC"), "no reads in test")
8901+
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
8902+
h := testHelper{t}
8903+
8904+
content := bytes.Repeat([]byte("z"), 12<<20) // 12 MiB
8905+
correctCRC := crc32.Checksum(content, crc32cTable)
8906+
8907+
testCases := []struct {
8908+
name string
8909+
crc32c uint32
8910+
expectedErr string
8911+
}{
8912+
{
8913+
name: "correct checksum",
8914+
crc32c: correctCRC,
8915+
expectedErr: "",
8916+
},
8917+
{
8918+
name: "incorrect checksum",
8919+
crc32c: correctCRC + 1,
8920+
expectedErr: "does not match the expected CRC32C",
8921+
},
8922+
}
8923+
8924+
for _, tc := range testCases {
8925+
t.Run(tc.name, func(t *testing.T) {
8926+
objName := "pcu-crc32c-" + uidSpaceObjects.New()
8927+
obj := client.Bucket(bucket).Object(objName)
8928+
t.Cleanup(func() {
8929+
h.mustDeleteObject(obj)
8930+
})
8931+
8932+
w := obj.NewWriter(ctx)
8933+
w.EnableParallelUpload = true
8934+
w.ParallelUploadConfig = ParallelUploadConfig{PartSize: 5 << 20, MaxConcurrency: 2}
8935+
w.SendCRC32C = true
8936+
w.CRC32C = tc.crc32c
8937+
8938+
if _, err := w.Write(content); err != nil {
8939+
t.Fatalf("Writer.Write: %v", err)
8940+
}
8941+
8942+
err := w.Close()
8943+
if tc.expectedErr != "" {
8944+
if err == nil {
8945+
t.Fatalf("expected error containing %q, got nil", tc.expectedErr)
8946+
}
8947+
if !strings.Contains(err.Error(), tc.expectedErr) {
8948+
t.Fatalf("expected error containing %q, but got %v", tc.expectedErr, err)
8949+
}
8950+
} else {
8951+
if err != nil {
8952+
t.Fatalf("Writer.Close: %v", err)
8953+
}
8954+
8955+
attrs, err := obj.Attrs(ctx)
8956+
if err != nil {
8957+
t.Fatalf("obj.Attrs: %v", err)
8958+
}
8959+
if attrs.CRC32C != tc.crc32c {
8960+
t.Errorf("Object CRC32C mismatch: got %d, want %d", attrs.CRC32C, tc.crc32c)
8961+
}
8962+
}
8963+
})
8964+
}
8965+
})
8966+
}

0 commit comments

Comments
 (0)