Skip to content

Commit f9b729c

Browse files
s3: accumulate object count across role passes
scanBuckets runs once per configured role and previously reset its object counter on every pass, so the final progress message reflected only the last role's count. As a result, multi-role scans could report 0 objects scanned even when earlier roles had scanned objects.
1 parent 46204d5 commit f9b729c

2 files changed

Lines changed: 28 additions & 4 deletions

File tree

pkg/sources/s3/s3.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,17 +285,21 @@ func determineResumePosition(ctx context.Context, tracker *Checkpointer, buckets
285285
}
286286
}
287287

288+
// scanBuckets scans the given buckets using the given role and adds the number
289+
// of objects it scanned to totalObjectCount. The counter is owned by Chunks and
290+
// shared across role passes so that the completion message reflects the whole
291+
// scan, not just the last role's pass.
288292
func (s *Source) scanBuckets(
289293
ctx context.Context,
290294
client *s3.Client,
291295
role string,
292296
bucketsToScan []string,
293297
chunksChan chan *sources.Chunk,
298+
totalObjectCount *uint64,
294299
) {
295300
if role != "" {
296301
ctx = context.WithValue(ctx, "role", role)
297302
}
298-
var totalObjectCount uint64
299303

300304
checkpointer := NewCheckpointer(ctx, &s.Progress, false)
301305
pos := determineResumePosition(ctx, checkpointer, bucketsToScan)
@@ -340,13 +344,13 @@ func (s *Source) scanBuckets(
340344
}
341345

342346
objectCount := s.scanBucket(ctx, client, role, bucket, sources.ChanReporter{Ch: chunksChan}, startAfter, checkpointer)
343-
totalObjectCount += objectCount
347+
*totalObjectCount += objectCount
344348
}
345349

346350
s.SetProgressComplete(
347351
len(bucketsToScan),
348352
len(bucketsToScan),
349-
fmt.Sprintf("Completed scanning source %s. %d objects scanned.", s.name, totalObjectCount),
353+
fmt.Sprintf("Completed scanning source %s. %d objects scanned.", s.name, *totalObjectCount),
350354
"",
351355
)
352356
}
@@ -435,8 +439,9 @@ func (s *Source) listErrorsAreExpected(role string) bool {
435439

436440
// Chunks emits chunks of bytes over a channel.
437441
func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ ...sources.ChunkingTarget) error {
442+
var totalObjectCount uint64
438443
visitor := func(c context.Context, defaultRegionClient *s3.Client, roleArn string, buckets []string) error {
439-
s.scanBuckets(c, defaultRegionClient, roleArn, buckets, chunksChan)
444+
s.scanBuckets(c, defaultRegionClient, roleArn, buckets, chunksChan, &totalObjectCount)
440445
return nil
441446
}
442447

pkg/sources/s3/s3_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,25 @@ func TestSource_ListErrorsAreExpected(t *testing.T) {
8989
}
9090
}
9191

92+
func TestSource_ScanBucketsReportsCumulativeObjectCount(t *testing.T) {
93+
conn, err := anypb.New(&sourcespb.S3{
94+
Credential: &sourcespb.S3_Unauthenticated{},
95+
})
96+
require.NoError(t, err)
97+
98+
s := Source{}
99+
require.NoError(t, s.Init(context.Background(), "s3 test source", 0, 0, false, conn, 1))
100+
101+
// Simulate a later role pass after an earlier pass already scanned three
102+
// objects. The pass below scans no buckets, so the completion message must
103+
// still report the cumulative total rather than resetting to zero.
104+
totalObjectCount := uint64(3)
105+
s.scanBuckets(context.Background(), nil, "", nil, make(chan *sources.Chunk, 1), &totalObjectCount)
106+
107+
assert.Equal(t, uint64(3), totalObjectCount)
108+
assert.Contains(t, s.Message, "3 objects scanned")
109+
}
110+
92111
func TestSource_Chunks(t *testing.T) {
93112
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
94113
defer cancel()

0 commit comments

Comments
 (0)