Skip to content

Commit 3af6b39

Browse files
authored
fix: eliminate per-element heap allocs in is_in kernel for binary types (#745)
## Summary - Add `BinaryMemoTable.ExistsDirect` that inlines the hash table probe loop, avoiding the closure in `HashTable.Lookup` that causes `val []byte` to escape to the heap - Add `isInBinaryDirect` specialized kernel path that bypasses the `visitBinary` → `VisitBitBlocksShort` closure chain by directly iterating with `OptionalBitBlockCounter` - Route `BinaryDataType` dispatch in `DispatchIsIn` to the new direct path (handles both int32 and int64 offsets) ## Motivation The `is_in` kernel for binary types allocated once per input element because the `[]byte` value escaped to the heap through a closure chain: 1. `visitBinary` slices `rawBytes[offsets[pos]:offsets[pos+1]]` and passes to a callback 2. The callback calls `BinaryMemoTable.Exists(v)` 3. `Exists` calls `lookup` which creates a closure capturing `val` 4. The closure is passed to `HashTable.Lookup`, causing escape analysis to move `val` to the heap Closes #736 ## Benchmark (100k rows, 10-element value set) | Metric | Before | After | Improvement | |--------|--------|-------|-------------| | ns/op | 4,133,679 | 923,565 | **4.5x faster** | | B/op | 2,435,327 | 33,092 | **73x less memory** | | allocs/op | 100,075 | 70 | **1,430x fewer allocs** | All existing `TestIsInBinary` subtests pass (binary, large\_binary, utf8, large\_utf8 × all null matching behaviors).
1 parent 5383f8f commit 3af6b39

3 files changed

Lines changed: 165 additions & 1 deletion

File tree

arrow/compute/internal/kernels/scalar_set_lookup.go

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,12 @@ func DispatchIsIn(state lookupState, in *exec.ArraySpan, out *exec.ExecResult) e
235235

236236
switch ty := inType.(type) {
237237
case arrow.BinaryDataType:
238-
return isInKernelExec(state.(*SetLookupState[[]byte]), in, out)
238+
switch ty.Layout().Buffers[1].ByteWidth {
239+
case 8:
240+
return isInBinaryDirect[int64](state.(*SetLookupState[[]byte]), in, out)
241+
default:
242+
return isInBinaryDirect[int32](state.(*SetLookupState[[]byte]), in, out)
243+
}
239244
case arrow.FixedWidthDataType:
240245
switch ty.Bytes() {
241246
case 1:
@@ -254,6 +259,98 @@ func DispatchIsIn(state lookupState, in *exec.ArraySpan, out *exec.ExecResult) e
254259
}
255260
}
256261

262+
// isInBinaryDirect is a specialized is_in path for binary/string types
263+
// that avoids the nested closure chain (isInKernelExec -> visitBinary ->
264+
// VisitBitBlocksShort) which causes per-element heap allocations due to
265+
// closure escape analysis. It inlines the bit block iteration and calls
266+
// BinaryMemoTable.ExistsDirect which also avoids closure-based lookup.
267+
func isInBinaryDirect[OffsetT int32 | int64](state *SetLookupState[[]byte], in *exec.ArraySpan, out *exec.ExecResult) error {
268+
if in.Len == 0 {
269+
return nil
270+
}
271+
272+
writerBool := bitutil.NewBitmapWriter(out.Buffers[1].Buf, int(out.Offset), int(out.Len))
273+
defer writerBool.Finish()
274+
writerNulls := bitutil.NewBitmapWriter(out.Buffers[0].Buf, int(out.Offset), int(out.Len))
275+
defer writerNulls.Finish()
276+
277+
valueSetHasNull := state.NullIndex != -1
278+
rawBytes := in.Buffers[2].Buf
279+
offsets := exec.GetSpanOffsets[OffsetT](in, 1)
280+
lookup := state.Lookup.(*hashing.BinaryMemoTable)
281+
282+
bitmap := in.Buffers[0].Buf
283+
counter := bitutils.NewOptionalBitBlockCounter(bitmap, in.Offset, in.Len)
284+
pos := int64(0)
285+
for pos < in.Len {
286+
block := counter.NextBlock()
287+
if block.AllSet() {
288+
for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
289+
val := rawBytes[offsets[pos]:offsets[pos+1]]
290+
if lookup.ExistsDirect(val) {
291+
writerBool.Set()
292+
writerNulls.Set()
293+
} else if state.NullBehavior == NullMatchingInconclusive && valueSetHasNull {
294+
writerBool.Clear()
295+
writerNulls.Clear()
296+
} else {
297+
writerBool.Clear()
298+
writerNulls.Set()
299+
}
300+
writerBool.Next()
301+
writerNulls.Next()
302+
}
303+
} else if block.NoneSet() {
304+
for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
305+
switch {
306+
case state.NullBehavior == NullMatchingMatch && valueSetHasNull:
307+
writerBool.Set()
308+
writerNulls.Set()
309+
case state.NullBehavior == NullMatchingSkip || (!valueSetHasNull && state.NullBehavior == NullMatchingMatch):
310+
writerBool.Clear()
311+
writerNulls.Set()
312+
default:
313+
writerBool.Clear()
314+
writerNulls.Clear()
315+
}
316+
writerBool.Next()
317+
writerNulls.Next()
318+
}
319+
} else {
320+
for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
321+
if bitutil.BitIsSet(bitmap, int(in.Offset+pos)) {
322+
val := rawBytes[offsets[pos]:offsets[pos+1]]
323+
if lookup.ExistsDirect(val) {
324+
writerBool.Set()
325+
writerNulls.Set()
326+
} else if state.NullBehavior == NullMatchingInconclusive && valueSetHasNull {
327+
writerBool.Clear()
328+
writerNulls.Clear()
329+
} else {
330+
writerBool.Clear()
331+
writerNulls.Set()
332+
}
333+
} else {
334+
switch {
335+
case state.NullBehavior == NullMatchingMatch && valueSetHasNull:
336+
writerBool.Set()
337+
writerNulls.Set()
338+
case state.NullBehavior == NullMatchingSkip || (!valueSetHasNull && state.NullBehavior == NullMatchingMatch):
339+
writerBool.Clear()
340+
writerNulls.Set()
341+
default:
342+
writerBool.Clear()
343+
writerNulls.Clear()
344+
}
345+
}
346+
writerBool.Next()
347+
writerNulls.Next()
348+
}
349+
}
350+
}
351+
return nil
352+
}
353+
257354
func isInKernelExec[T hashing.MemoTypes](state *SetLookupState[T], in *exec.ArraySpan, out *exec.ExecResult) error {
258355
writerBool := bitutil.NewBitmapWriter(out.Buffers[1].Buf, int(out.Offset), int(out.Len))
259356
defer writerBool.Finish()

arrow/compute/scalar_set_lookup_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package compute_test
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"strings"
2223
"sync"
2324
"testing"
@@ -639,3 +640,43 @@ func (ss *ScalarSetLookupSuite) TearDownTest() {
639640
func TestScalarSetLookup(t *testing.T) {
640641
suite.Run(t, new(ScalarSetLookupSuite))
641642
}
643+
644+
func BenchmarkIsInBinary(b *testing.B) {
645+
const numRows = 100_000
646+
const valueSetSize = 10
647+
648+
mem := memory.DefaultAllocator
649+
ctx := compute.WithAllocator(context.TODO(), mem)
650+
651+
bldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)
652+
defer bldr.Release()
653+
for i := range numRows {
654+
v := []byte(fmt.Sprintf("value-%08d", i%1000))
655+
bldr.Append(v)
656+
}
657+
input := bldr.NewArray()
658+
defer input.Release()
659+
660+
vsBldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)
661+
defer vsBldr.Release()
662+
for i := range valueSetSize {
663+
v := []byte(fmt.Sprintf("value-%08d", i))
664+
vsBldr.Append(v)
665+
}
666+
valueSet := vsBldr.NewArray()
667+
defer valueSet.Release()
668+
669+
opts := compute.SetOptions{
670+
ValueSet: compute.NewDatumWithoutOwning(valueSet),
671+
}
672+
673+
b.ResetTimer()
674+
b.ReportAllocs()
675+
for range b.N {
676+
result, err := compute.IsIn(ctx, opts, compute.NewDatumWithoutOwning(input))
677+
if err != nil {
678+
b.Fatal(err)
679+
}
680+
result.Release()
681+
}
682+
}

internal/hashing/xxh3_memo_table.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,32 @@ func (b *BinaryMemoTable) Exists(val []byte) bool {
225225
return ok
226226
}
227227

228+
// ExistsDirect checks if val exists in the table by inlining the hash
229+
// table probe loop directly. This avoids the closure allocation that
230+
// occurs in the normal Exists -> lookup -> HashTable.Lookup path,
231+
// where the comparison closure captures val and causes it to escape
232+
// to the heap.
233+
func (b *BinaryMemoTable) ExistsDirect(val []byte) bool {
234+
const perturbShift uint8 = 5
235+
236+
h := Hash(val, 0)
237+
v := b.tbl.fixHash(h)
238+
idx := v & b.tbl.capMask
239+
perturb := (v >> uint64(perturbShift)) + 1
240+
241+
for {
242+
e := &b.tbl.entries[idx]
243+
if e.h == v && bytes.Equal(val, b.builder.Value(int(e.payload.val))) {
244+
return true
245+
}
246+
if e.h == sentinel {
247+
return false
248+
}
249+
idx = (idx + perturb) & b.tbl.capMask
250+
perturb = (perturb >> uint64(perturbShift)) + 1
251+
}
252+
}
253+
228254
// Get returns the index of the specified value in the table or KeyNotFound,
229255
// and a boolean indicating whether it was found in the table.
230256
func (b *BinaryMemoTable) Get(val interface{}) (int, bool) {

0 commit comments

Comments
 (0)