
Commit e78b931

feat(arrow/compute): sort support (#749)
## Summary

Implements stable `sort_indices` (and `sort` via `take`) for arrays, chunked arrays, record batches, and tables using logical row indices over `Chunked` data **without concatenating chunks**. The control flow and ordering rules are modeled on Apache Arrow C++ **`vector_sort.cc` / `vector_sort_internal.h`**, with a few **Go- and performance-driven** differences called out below.

## Parity with Arrow C++ (`vector_sort.cc` / `vector_sort_internal.h`)

**Same overall structure**

- **Single sort key, one column**
  - **Multiple chunks:** per-chunk sort, then **pairwise merge** of sorted spans (the C++ **ChunkedArraySorter** / **ChunkedMergeImpl** idea).
  - **Single chunk, no validity nulls and no null-likes:** direct stable sort on indices (C++ skips null partitioning when `null_count == 0` and there are no null-likes).
  - Otherwise: **partition validity nulls**, **partition float null-likes (NaN)**, stable-sort the finite values, then **VisitConstantRanges**-style handling of ties (`vector_sort_internal.go`).
- **Multiple sort keys**
  - **`len(keys) <= kMaxRadixSortKeys` (8):** **MSD radix** path per record-batch range (`radixRecordBatchSortRange` ↔ **ConcreteRecordBatchColumnSorter::SortRange**).
  - **More than 8 keys:** **MultipleKeyRecordBatchSorter**-style global stable sort with a lexicographic compare across keys (`multipleKeyRecordBatchSortRange`).
- **Aligned chunk boundaries** across all keyed columns (the typical table case): sort **each chunk slice** with the same strategy, then **merge spans** like the C++ **TableSorter** batch merge.

**Same ordering semantics (intended match to C++)**

- Per-key **ascending / descending** and **null placement** (including **NaN** as null-like for floats).
- **Stable** ordering: merge and `slices.SortStableFunc` are used so tie-breaking matches the C++ "left before right" stable merge behavior where documented in code.
**Same "column comparator" role**

- Go **`columnComparator`** interface ↔ C++ **`ColumnComparator`**: `compareRowsForKey`, null / null-like metadata, `columnHasValidityNulls` (skip **PartitionNullsOnly** when there are no validity nulls).

**Physical types**

- One **monomorphic** comparator type per supported physical pattern in **`vector_sort_physical.go`**, analogous to C++ **`ConcreteColumnComparator<T>`** (a concrete `*array.T` plus direct `Value` / `Cmp` calls, with special cases for bool and intervals).

## Intentional differences and rationale

| Area | C++ | This Go port |
| --- | --- | --- |
| **Resolving logical row → (chunk, offset)** | Chunk / resolver machinery | **Dense `logicalRowMap`**: one `rowMapCell{chunk, local}` per logical row when `len(chunks) > 1`; **`pair(i, j)`** resolves two rows in one shot. **Why:** random compares during sort/merge need O(1) resolution; a flat table with co-located fields beats repeated resolver work and improves locality vs. separate `chunk` / `local` slices. |
| **`physicalColumnBase` methods** | N/A (different language) | **Pointer receivers** on `pair` / `isNullAtGlobal` / `cell`. **Why:** value receivers would copy slice headers (and map state) on every compare. |
| **Stable sort primitive** | `std::stable_sort` | **`slices.SortStableFunc`** (Go 1.21+). **Why:** library primitive; semantics aligned with the stable weak ordering used elsewhere in the port. |
| **Column dispatch at runtime** | Templates + virtuals | **`columnComparator` interface** for "which column" in multi-key and merge loops. **Why:** idiomatic Go; per-type work stays in concrete `compareRowsForKey` implementations. |
| **Chunked merge with null-likes (e.g. float)** | C++ can **split** the merge into null-like vs. non-null-like regions (**ChunkedMergeImpl**) | **Single `less` over the full row order** after per-chunk partitioning/sort. **Why:** simpler merge that preserves order as long as the per-chunk phases match C++; documented in `vector_sort.go` comments. |
| **Generics for physical columns** | Templates instantiate fully | **Explicit monomorphs only** for the hot compare path. **Why:** measured regression vs. Go generics on this hot path (inlining / assertions); verbosity traded for performance. |

## File Layout

- `arrow/compute/vector_sort.go` — `sort_indices` / `sort` registration and datum dispatch.
- `arrow/compute/vector_sort_test.go` — functional tests.
- `arrow/compute/internal/kernels/vector_sort.go` — orchestration, merge, `SortIndices` kernel.
- `arrow/compute/internal/kernels/vector_sort_internal.go` — null partitions, radix / multi-key batch sort.
- `arrow/compute/internal/kernels/vector_sort_support.go` — `logicalRowMap` and ordering helpers.
- `arrow/compute/internal/kernels/vector_sort_physical.go` — per-type column comparators.
- `arrow/compute/internal/kernels/vector_sort_bench_test.go` — benchmarks.

## Testing

- `go test ./arrow/compute -run TestSort -count=1`
- Benchmarks: `go test ./arrow/compute/internal/kernels -bench=BenchmarkSortIndices -benchmem`

## References

- Arrow C++: `cpp/src/arrow/compute/kernels/vector_sort.cc` and `vector_sort_internal.h` (and related comparators).
- https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/kernels/vector_sort.cc
- https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/kernels/vector_sort_internal.h

## Related Issues

- Closes #66
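The dense row-map idea from the table above (one precomputed `{chunk, local}` cell per logical row, so a compare resolves both rows in O(1)) can be sketched like this. The `column` type and `buildLogicalRowMap` helper are illustrative stand-ins, not the port's exact identifiers:

```go
package main

import "fmt"

// rowMapCell locates one logical row: the chunk it lives in and the
// offset inside that chunk, mirroring the rowMapCell{chunk, local}
// layout described above.
type rowMapCell struct {
	chunk int32
	local int32
}

// logicalRowMap is a flat table with one cell per logical row, built once
// up front so random compares during sort/merge resolve in O(1), with both
// fields co-located for cache locality.
type logicalRowMap []rowMapCell

func buildLogicalRowMap(chunkLens []int) logicalRowMap {
	total := 0
	for _, n := range chunkLens {
		total += n
	}
	m := make(logicalRowMap, 0, total)
	for c, n := range chunkLens {
		for i := 0; i < n; i++ {
			m = append(m, rowMapCell{chunk: int32(c), local: int32(i)})
		}
	}
	return m
}

// column is a hypothetical stand-in for the port's physical column state.
type column struct {
	rows logicalRowMap
}

// pair resolves both rows of a compare in one call. The pointer receiver
// echoes the rationale in the table: a value receiver would copy the
// struct (and its slice header) on every compare.
func (c *column) pair(i, j int) (rowMapCell, rowMapCell) {
	return c.rows[i], c.rows[j]
}

func main() {
	// Three chunks of lengths 2, 3, and 1 cover logical rows 0..5.
	col := column{rows: buildLogicalRowMap([]int{2, 3, 1})}
	a, b := col.pair(1, 4)
	fmt.Println(a, b) // logical row 1 -> chunk 0 offset 1; row 4 -> chunk 1 offset 2
}
```

The trade-off is memory: the map costs 8 bytes per logical row, paid only when `len(chunks) > 1`, in exchange for constant-time resolution inside the comparison hot loop.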
1 parent ea84305 commit e78b931

9 files changed

Lines changed: 4207 additions & 1 deletion

arrow/compute/internal/kernels/vector_sort.go

Lines changed: 481 additions & 0 deletions
arrow/compute/internal/kernels/vector_sort_bench_test.go

Lines changed: 175 additions & 0 deletions
```go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.22

package kernels

import (
	"context"
	"fmt"
	"testing"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/compute/exec"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

// Benchmarks target kernels.SortIndices (chunked comparators + stable sort) without compute
// registry or CallFunction overhead. Use e.g.:
//
//	go test -bench=BenchmarkSortIndices -benchmem -cpuprofile=cpu.prof ./arrow/compute/internal/kernels/
//	go tool pprof -http=:8080 cpu.prof

func newBenchKernelCtx(tb testing.TB) (*exec.KernelCtx, memory.Allocator) {
	tb.Helper()
	mem := memory.NewGoAllocator()
	ctx := &exec.KernelCtx{Ctx: exec.WithAllocator(context.Background(), mem)}
	return ctx, mem
}

// makeChunkedInt64Split returns n int64 rows in numChunks contiguous arrays. Values are a
// deterministic function of global row index so the sort does non-trivial work.
func makeChunkedInt64Split(tb testing.TB, mem memory.Allocator, n, numChunks int) *arrow.Chunked {
	tb.Helper()
	if numChunks < 1 {
		numChunks = 1
	}
	if n < numChunks {
		numChunks = n
	}
	base := n / numChunks
	rem := n % numChunks
	chunks := make([]arrow.Array, 0, numChunks)
	global := 0
	for c := range numChunks {
		sz := base
		if c < rem {
			sz++
		}
		bld := array.NewInt64Builder(mem)
		for i := range sz {
			x := int64(global + i)
			bld.Append((x * 6364136223846793005) ^ (x >> 12))
		}
		arr := bld.NewArray()
		chunks = append(chunks, arr)
		global += sz
	}
	ch := arrow.NewChunked(arrow.PrimitiveTypes.Int64, chunks)
	tb.Cleanup(func() { ch.Release() })
	return ch
}

func BenchmarkSortIndices_Int64(b *testing.B) {
	const rows = 65536
	for _, numChunks := range []int{1, 16, 128} {
		b.Run(fmt.Sprintf("rows=%d/chunks=%d", rows, numChunks), func(b *testing.B) {
			ctx, mem := newBenchKernelCtx(b)
			col := makeChunkedInt64Split(b, mem, rows, numChunks)
			keys := []SortKey{{ColumnIndex: 0, Order: Ascending, NullPlacement: NullsAtEnd}}
			columns := []*arrow.Chunked{col}

			b.ReportAllocs()
			b.ResetTimer()
			for range b.N {
				res, err := SortIndices(ctx, columns, keys)
				if err != nil {
					b.Fatal(err)
				}
				res.Release()
			}
		})
	}
}

func BenchmarkSortIndices_Int64_TwoKeys(b *testing.B) {
	const rows = 65536
	const numChunks = 64
	ctx, mem := newBenchKernelCtx(b)
	colA := makeChunkedInt64Split(b, mem, rows, numChunks)
	colB := makeChunkedInt64Split(b, mem, rows, numChunks)
	keys := []SortKey{
		{ColumnIndex: 0, Order: Ascending, NullPlacement: NullsAtEnd},
		{ColumnIndex: 1, Order: Descending, NullPlacement: NullsAtStart},
	}
	columns := []*arrow.Chunked{colA, colB}

	b.ReportAllocs()
	b.ResetTimer()
	for range b.N {
		res, err := SortIndices(ctx, columns, keys)
		if err != nil {
			b.Fatal(err)
		}
		res.Release()
	}
}

func makeChunkedStringSplit(tb testing.TB, mem memory.Allocator, n, numChunks int) *arrow.Chunked {
	tb.Helper()
	if numChunks < 1 {
		numChunks = 1
	}
	if n < numChunks {
		numChunks = n
	}
	base := n / numChunks
	rem := n % numChunks
	chunks := make([]arrow.Array, 0, numChunks)
	global := 0
	for c := range numChunks {
		sz := base
		if c < rem {
			sz++
		}
		bld := array.NewStringBuilder(mem)
		for i := range sz {
			x := global + i
			v := (x * 6364136223846793005) ^ (x >> 12)
			bld.Append(fmt.Sprintf("%016x", v))
		}
		arr := bld.NewArray()
		chunks = append(chunks, arr)
		global += sz
	}
	ch := arrow.NewChunked(arrow.BinaryTypes.String, chunks)
	tb.Cleanup(func() { ch.Release() })
	return ch
}

func BenchmarkSortIndices_String(b *testing.B) {
	const rows = 65536
	for _, numChunks := range []int{1, 32} {
		b.Run(fmt.Sprintf("rows=%d/chunks=%d", rows, numChunks), func(b *testing.B) {
			ctx, mem := newBenchKernelCtx(b)
			col := makeChunkedStringSplit(b, mem, rows, numChunks)
			keys := []SortKey{{ColumnIndex: 0, Order: Ascending, NullPlacement: NullsAtEnd}}
			columns := []*arrow.Chunked{col}

			b.ReportAllocs()
			b.ResetTimer()
			for range b.N {
				res, err := SortIndices(ctx, columns, keys)
				if err != nil {
					b.Fatal(err)
				}
				res.Release()
			}
		})
	}
}
```
