Skip to content

Commit 3553da6

Browse files
fix(firestore): bubble up errors in BulkWriter (#14481)
This PR addresses an issue where the Firestore `BulkWriter` could silently fail to persist documents without notifying the caller. This behavior was primarily observed when the `BulkWriter`'s context was canceled or when internal limits were reached.

#### Root Causes

1. **Dropped Batches on Cancellation:** In the background `send` function, if the context was canceled, the current batch of jobs was dropped immediately without notifying the individual job result channels.
2. **Ignored Queuing Errors:** The `write` function and the retry logic ignored errors returned by the underlying bundler when attempting to queue a write.
3. **Ignored Rate Limit Errors:** Context errors occurring during the rate limiter's wait period were not checked or propagated.

#### Changes

- Updated the internal `write` method and its public callers (`Create`, `Delete`, `Set`, and `Update`) to return errors from the rate limiter and the bundler.
- Modified the `send` function to iterate through and notify all jobs in a batch with a context error if the transmission is aborted due to cancellation.
- Added error checking to the retry logic within `send` to ensure failures to re-queue a job are surfaced.

These changes ensure that any failure to queue or send a document is explicitly reported through the `BulkWriterJob` results or as an immediate return value, preventing silent data loss.

See #11422.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 84d2146 commit 3553da6

2 files changed

Lines changed: 66 additions & 13 deletions

File tree

firestore/bulkwriter.go

Lines changed: 32 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -186,7 +186,10 @@ func (bw *BulkWriter) Create(doc *DocumentRef, datum interface{}) (*BulkWriterJo
186186
return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter")
187187
}
188188

189-
j := bw.write(w[0])
189+
j, err := bw.write(w[0])
190+
if err != nil {
191+
return nil, err
192+
}
190193
return j, nil
191194
}
192195

@@ -209,7 +212,10 @@ func (bw *BulkWriter) Delete(doc *DocumentRef, preconds ...Precondition) (*BulkW
209212
return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter")
210213
}
211214

212-
j := bw.write(w[0])
215+
j, err := bw.write(w[0])
216+
if err != nil {
217+
return nil, err
218+
}
213219
return j, nil
214220
}
215221

@@ -232,7 +238,10 @@ func (bw *BulkWriter) Set(doc *DocumentRef, datum interface{}, opts ...SetOption
232238
return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter")
233239
}
234240

235-
j := bw.write(w[0])
241+
j, err := bw.write(w[0])
242+
if err != nil {
243+
return nil, err
244+
}
236245
return j, nil
237246
}
238247

@@ -255,7 +264,10 @@ func (bw *BulkWriter) Update(doc *DocumentRef, updates []Update, preconds ...Pre
255264
return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter")
256265
}
257266

258-
j := bw.write(w[0])
267+
j, err := bw.write(w[0])
268+
if err != nil {
269+
return nil, err
270+
}
259271
return j, nil
260272
}
261273

@@ -284,20 +296,23 @@ func (bw *BulkWriter) checkWriteConditions(doc *DocumentRef) error {
284296
}
285297

286298
// write packages up write requests into bulkWriterJob objects.
287-
func (bw *BulkWriter) write(w *pb.Write) *BulkWriterJob {
299+
func (bw *BulkWriter) write(w *pb.Write) (*BulkWriterJob, error) {
288300

289301
j := &BulkWriterJob{
290302
resultChan: make(chan bulkWriterResult, 1),
291303
write: w,
292304
ctx: bw.ctx,
293305
}
294306

295-
bw.limiter.Wait(bw.ctx)
296-
// ignore operation size constraints and related errors; can't be inferred at compile time
297-
// Bundler is set to accept an unlimited amount of bytes
298-
_ = bw.bundler.Add(j, 0)
307+
if err := bw.limiter.Wait(bw.ctx); err != nil {
308+
return nil, err
309+
}
310+
err := bw.bundler.Add(j, 0)
311+
if err != nil {
312+
return nil, err
313+
}
299314

300-
return j
315+
return j, nil
301316
}
302317

303318
// send transmits writes to the service and matches response results to job channels.
@@ -321,6 +336,9 @@ func (bw *BulkWriter) send(i interface{}) {
321336

322337
select {
323338
case <-bw.ctx.Done():
339+
for _, j := range bwj {
340+
j.setError(bw.ctx.Err())
341+
}
324342
return
325343
default:
326344
resp, err := bw.vc.BatchWrite(bw.ctx, bwr)
@@ -342,9 +360,10 @@ func (bw *BulkWriter) send(i interface{}) {
342360
// Do we need separate retry bundler?
343361
_, isRetryable := batchWriteRetryCodes[codes.Code(s.Code)]
344362
if j.attempts < maxRetryAttempts && isRetryable {
345-
// ignore operation size constraints and related errors; job size can't be inferred at compile time
346-
// Bundler is set to accept an unlimited amount of bytes
347-
_ = bw.bundler.Add(j, 0)
363+
err := bw.bundler.Add(j, 0)
364+
if err != nil {
365+
j.setError(fmt.Errorf("firestore: bulk write retry failed %w original error %v", err, status.Error(codes.Code(s.Code), s.Message)))
366+
}
348367
} else {
349368
j.setError(status.Error(codes.Code(s.Code), s.Message))
350369
}

firestore/bulkwriter_test.go

Lines changed: 34 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -245,6 +245,15 @@ func TestBulkWriterErrors(t *testing.T) {
245245
return b.Delete(c.Doc("C/b"))
246246
},
247247
},
248+
{
249+
name: "cannot write with cancelled context",
250+
test: func(ignored *BulkWriter) (*BulkWriterJob, error) {
251+
ctx, cancel := context.WithCancel(context.Background())
252+
bw := c.BulkWriter(ctx)
253+
cancel()
254+
return bw.Delete(c.Doc("C/c"))
255+
},
256+
},
248257
}
249258

250259
for _, tc := range tcs {
@@ -479,3 +488,28 @@ func TestBulkWriterConcurrent(t *testing.T) {
479488
}
480489
}
481490
}
491+
492+
func TestBulkWriterSendCancelled(t *testing.T) {
493+
c, _, cleanup := newMock(t)
494+
defer cleanup()
495+
496+
ctx, cancel := context.WithCancel(context.Background())
497+
bw := c.BulkWriter(ctx)
498+
499+
j, err := bw.Create(c.Doc("C/a"), testData)
500+
if err != nil {
501+
t.Fatalf("Create failed: %v", err)
502+
}
503+
504+
cancel() // Cancel context before Flush/send can complete successfully
505+
506+
bw.Flush()
507+
508+
_, err = j.Results()
509+
if err == nil {
510+
t.Fatal("wanted error, got nil")
511+
}
512+
if err != context.Canceled {
513+
t.Errorf("want context.Canceled, got %v", err)
514+
}
515+
}

0 commit comments

Comments
 (0)