Skip to content

Commit 820d0a2

Browse files
authored
fix(firestore): enforce backpressure in BulkWriter (#12938)
This PR addresses a critical issue where the Firestore `BulkWriter` could **silently drop document writes** without notifying the caller, particularly under high load or when the process context was canceled. ## 🐛 Issue and Root Causes Investigation revealed that the current implementation of `BulkWriter` bypassed the client's built-in safety and resource management mechanisms: 1. **Disabled Backpressure:** All document writes were enqueued with a size of `0`, effectively disabling the `BufferedByteLimit` (https://pkg.go.dev/google.golang.org/api/support/bundler#Bundler) enforcement. This allowed the internal buffer to grow without bound, leading to memory pressure and potential Out-of-Memory (OOM) crashes. 2. **Ignored Queuing Errors:** The internal `write` function ignored return values from `bundler.Add`, meaning queuing failures were never reported to the user. ## ✅ Proposed Fix The fix moves `BulkWriter` to a managed resource model that respects backpressure and ensures loud failures: * **Runtime Size Calculation:** Computes the actual serialized size of each write using `proto.Size(w)`. * **Enforced Backpressure:** Replaces `Add(j, 0)` with `AddWait(ctx, j, estimatedSize)`. This ensures that the producer (application code) blocks if the internal 1GB buffer limit is reached, preventing unbounded memory growth. ## 📌 Benefits * **Data Integrity:** Guarantees that documents are either successfully queued or returned with an explicit error. * **System Stability:** Prevents OOM crashes by capping memory usage and slowing down producers that outpace the network. * **Alignment:** Brings the Go SDK into parity with the backpressure behavior found in other Firestore SDKs like Java and Node.js. #### Java Implementation The Java SDK uses an asynchronous "task" model to manage writes. * **Concurrency:** It leverages async threads (BulkCommitBatch) to handle parallel requests. * **Backpressure:** It implements a buffer limit on the number of pending operations to prevent memory exhaustion. When this limit is reached, subsequent attempts to queue writes will block the producer until space is available. #### Node.js Implementation Node.js follows a similar pattern but is optimized for its event-driven architecture. * **Buffering:** It automatically buffers writes into batches and ensures they are sent in order. * **Memory Management:** Similar to Java, it uses an internal buffer limit to impose backpressure on the event loop, preventing an unbounded queue of pending promises. #### Python Implementation The Python SDK is designed to be user-friendly by hiding the complexities of asynchronous execution. * **Parallelization:** It uses a `ThreadPoolExecutor` to send batches in parallel. This allows users to gain performance benefits without manually managing an event loop or using `asyncio`. * **Rate Limiting:** It includes a dedicated `RateLimiter` class to manage the ramp-up of write traffic. ## Impact Analysis The "breaking" change here is that Create might now block. However: * If the user's load is within normal limits, they won't notice a difference (the 1GB buffer is large). * If the user's load is excessive, they are already experiencing silent failures or OOMs. Blocking is the correct "fail-safe" state for their application's stability. * The BulkWriter methods already return an error. Returning a "context deadline exceeded" error from a blocking Create call is a valid and much more helpful response than returning nil and dropping the write. Fixes #11422
1 parent 69993a5 commit 820d0a2

1 file changed

Lines changed: 14 additions & 4 deletions

File tree

firestore/bulkwriter.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"google.golang.org/api/support/bundler"
2828
"google.golang.org/grpc/codes"
2929
"google.golang.org/grpc/status"
30+
"google.golang.org/protobuf/proto"
3031
)
3132

3233
const (
@@ -297,7 +298,6 @@ func (bw *BulkWriter) checkWriteConditions(doc *DocumentRef) error {
297298

298299
// write packages up write requests into bulkWriterJob objects.
299300
func (bw *BulkWriter) write(w *pb.Write) (*BulkWriterJob, error) {
300-
301301
j := &BulkWriterJob{
302302
resultChan: make(chan bulkWriterResult, 1),
303303
write: w,
@@ -307,8 +307,9 @@ func (bw *BulkWriter) write(w *pb.Write) (*BulkWriterJob, error) {
307307
if err := bw.limiter.Wait(bw.ctx); err != nil {
308308
return nil, err
309309
}
310-
err := bw.bundler.Add(j, 0)
311-
if err != nil {
310+
311+
estimatedSize := proto.Size(w)
312+
if err := bw.bundler.AddWait(bw.ctx, j, estimatedSize); err != nil {
312313
return nil, err
313314
}
314315

@@ -360,7 +361,16 @@ func (bw *BulkWriter) send(i interface{}) {
360361
// Do we need separate retry bundler?
361362
_, isRetryable := batchWriteRetryCodes[codes.Code(s.Code)]
362363
if j.attempts < maxRetryAttempts && isRetryable {
363-
err := bw.bundler.Add(j, 0)
364+
// Re-queue the job for retry. We use a size of 0 here for two reasons:
365+
// 1. Consistency: Since the BulkWriter uses AddWait for backpressure,
366+
// we must continue using AddWait to avoid a "mixed methods" error from
367+
// the bundler.
368+
// 2. Deadlock Prevention: The send() function runs within the bundler's
369+
// handler. The memory for this job was already accounted for during the
370+
// initial write() and will not be released until this handler returns.
371+
// Attempting to acquire additional weight here could cause a deadlock
372+
// if the buffer is full.
373+
err := bw.bundler.AddWait(bw.ctx, j, 0)
364374
if err != nil {
365375
j.setError(fmt.Errorf("firestore: bulk write retry failed %w original error %v", err, status.Error(codes.Code(s.Code), s.Message)))
366376
}

0 commit comments

Comments
 (0)