Skip to content

Commit 7ae2e33

Browse files
fix(compute): fix data race and memory leak in concurrent is_in kernel (#712)
by cleaning up SetLookupState via per-invocation ctx.State instead of shared kernel.Data fix: apache/iceberg-go/issues/489 ### Rationale for this change fix: apache/iceberg-go/issues/489 (flaky test in iceberg-go) ### What changes are included in this PR? fix data race and memory leak in concurrent is_in kernel ### Are these changes tested? yes, added a test to repro memory leak (3/10 runs fails) ### Are there any user-facing changes?
1 parent 5bf10f2 commit 7ae2e33

3 files changed

Lines changed: 41 additions & 2 deletions

File tree

arrow/compute/executor.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,12 @@ func (s *scalarExecutor) WrapResults(ctx context.Context, out <-chan Datum, hasC
581581

582582
func (s *scalarExecutor) executeSpans(data chan<- Datum) (err error) {
583583
defer func() {
584-
err = errors.Join(err, s.kernel.Cleanup())
584+
// Clean up per-invocation kernel state using ctx.State rather than
585+
// the shared kernel.Data field, which races when multiple goroutines
586+
// execute the same kernel concurrently.
587+
if sk, ok := s.kernel.(*exec.ScalarKernel); ok && sk.CleanupFn != nil && s.ctx.State != nil {
588+
err = errors.Join(err, sk.CleanupFn(s.ctx.State))
589+
}
585590
}()
586591

587592
var (

arrow/compute/scalar_set_lookup.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ type setLookupState interface {
143143

144144
func execIsIn(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
145145
state := ctx.State.(setLookupState)
146-
ctx.Kernel.(*exec.ScalarKernel).Data = state
147146
in := batch.Values[0]
148147

149148
if !arrow.TypeEqual(in.Type(), state.ValueType()) {

arrow/compute/scalar_set_lookup_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package compute_test
1919
import (
2020
"context"
2121
"strings"
22+
"sync"
2223
"testing"
2324

2425
"github.com/apache/arrow-go/v18/arrow"
@@ -597,6 +598,40 @@ func (ss *ScalarSetLookupSuite) TestIsInChunked() {
597598
ss.checkIsInChunked(input, valueSet, expected, compute.NullMatchingInconclusive)
598599
}
599600

601+
// TestIsInConcurrentNoLeak verifies that concurrent is_in executions do not
602+
// leak BinaryMemoTable buffers. The is_in kernel stores per-invocation
603+
// SetLookupState in the shared kernel.Data field, which races when multiple
604+
// goroutines execute simultaneously — earlier states get overwritten and
605+
// their buffers (192 bytes each) leak. TearDownTest's AssertSize catches this.
606+
func (ss *ScalarSetLookupSuite) TestIsInConcurrentNoLeak() {
607+
input := ss.getArr(arrow.BinaryTypes.String, `["alpha", "beta", "gamma", "delta"]`)
608+
defer input.Release()
609+
610+
valueSet := ss.getArr(arrow.BinaryTypes.String, `["alpha", "gamma"]`)
611+
defer valueSet.Release()
612+
613+
const workers = 4
614+
var wg sync.WaitGroup
615+
wg.Add(workers)
616+
for range workers {
617+
go func() {
618+
defer wg.Done()
619+
defer func() {
620+
recover() // race can cause panic in BinaryMemoTable.lookup
621+
}()
622+
623+
result, err := compute.IsIn(ss.ctx, compute.SetOptions{
624+
ValueSet: compute.NewDatumWithoutOwning(valueSet),
625+
}, compute.NewDatumWithoutOwning(input))
626+
if err != nil {
627+
return
628+
}
629+
result.Release()
630+
}()
631+
}
632+
wg.Wait()
633+
}
634+
600635
func (ss *ScalarSetLookupSuite) TearDownTest() {
601636
ss.mem.AssertSize(ss.T(), 0)
602637
}

0 commit comments

Comments
 (0)