Skip to content

Commit e2c2b2c

Browse files
Merge branch 'apache:main' into feature/optimize-bss
2 parents 2afbb90 + 970ca6b commit e2c2b2c

6 files changed

Lines changed: 705 additions & 19 deletions

File tree

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ require (
3737
github.com/pterm/pterm v0.12.82
3838
github.com/stoewer/go-strcase v1.3.1
3939
github.com/stretchr/testify v1.11.1
40-
github.com/substrait-io/substrait-go/v7 v7.2.2
40+
github.com/substrait-io/substrait-go/v7 v7.3.0
4141
github.com/substrait-io/substrait-protobuf/go v0.79.0
4242
github.com/tidwall/sjson v1.2.5
4343
github.com/zeebo/xxh3 v1.1.0
4444
golang.org/x/exp v0.0.0-20260112195511-716be5621a96
4545
golang.org/x/sync v0.19.0
46-
golang.org/x/sys v0.40.0
46+
golang.org/x/sys v0.41.0
4747
golang.org/x/tools v0.41.0
4848
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da
4949
gonum.org/v1/gonum v0.17.0
@@ -78,7 +78,7 @@ require (
7878
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
7979
github.com/rivo/uniseg v0.4.7 // indirect
8080
github.com/stretchr/objx v0.5.2 // indirect
81-
github.com/substrait-io/substrait v0.78.1 // indirect
81+
github.com/substrait-io/substrait v0.79.0 // indirect
8282
github.com/tidwall/gjson v1.14.2 // indirect
8383
github.com/tidwall/match v1.1.1 // indirect
8484
github.com/tidwall/pretty v1.2.0 // indirect

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,10 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
150150
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
151151
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
152152
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
153-
github.com/substrait-io/substrait v0.78.1 h1:Dsn+kvFQdC2k/2XRVE2+aD88WNbE4miWed2AZTtkBoQ=
154-
github.com/substrait-io/substrait v0.78.1/go.mod h1:MPFNw6sToJgpD5Z2rj0rQrdP/Oq8HG7Z2t3CAEHtkHw=
155-
github.com/substrait-io/substrait-go/v7 v7.2.2 h1:cp51u9ikUW6YjKZSV3tu6Pxeu48pkfcraYR9yG06chY=
156-
github.com/substrait-io/substrait-go/v7 v7.2.2/go.mod h1:FVQ38NeDorflB3ogd8F9tjh9S1y8RDwwfSFm24/u9HY=
153+
github.com/substrait-io/substrait v0.79.0 h1:32poPdhuLmKa9J1FedCnNgNiueeDvO65SPDeRnP7epA=
154+
github.com/substrait-io/substrait v0.79.0/go.mod h1:MPFNw6sToJgpD5Z2rj0rQrdP/Oq8HG7Z2t3CAEHtkHw=
155+
github.com/substrait-io/substrait-go/v7 v7.3.0 h1:012lEWWwN/dz+xzG1pIqXZKpv0Er3LmdoqH+LaFqJjQ=
156+
github.com/substrait-io/substrait-go/v7 v7.3.0/go.mod h1:fZXyRTKuuP3wpBZghS/OjywWgskq5FWPUkO8UTFu0dE=
157157
github.com/substrait-io/substrait-protobuf/go v0.79.0 h1:pjgYJN26jD6P1NgIrJwAQ1GGhhtBftYZzWdwyZEpAUM=
158158
github.com/substrait-io/substrait-protobuf/go v0.79.0/go.mod h1:hn+Szm1NmZZc91FwWK9EXD/lmuGBSRTJ5IvHhlG1YnQ=
159159
github.com/tidwall/gjson v1.14.2 h1:6BBkirS0rAHjumnjHF6qgy5d2YAJ1TLIaFE2lzfOLqo=
@@ -217,8 +217,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
217217
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
218218
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
219219
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
220-
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
221-
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
220+
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
221+
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
222222
golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2 h1:O1cMQHRfwNpDfDJerqRoE2oD+AFlyid87D40L/OkkJo=
223223
golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2/go.mod h1:b7fPSJ0pKZ3ccUh8gnTONJxhn3c/PS6tyzQvyqw4iA8=
224224
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

parquet/file/column_writer_types.gen.go

Lines changed: 158 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

parquet/file/column_writer_types.gen.go.tmpl

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,58 @@ func (w *{{.Name}}ColumnChunkWriter) WriteDictIndices(indices arrow.Array, defLe
185185
}
186186

187187
func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls int64) {
188+
{{- if or (eq .Name "ByteArray") (eq .Name "FixedLenByteArray")}}
189+
// For variable-length types, we need to check buffer size to prevent int32 overflow
190+
// For small values (<1MB), checking frequently adds negligible overhead
191+
// For large values (>1MB), we MUST check before each value
192+
const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
193+
const largeValueThreshold = 1.0 * 1024 * 1024 // 1MB
194+
195+
encoder := w.currentEncoder.(encoding.{{.Name}}Encoder)
196+
currentSize := w.currentEncoder.EstimatedDataEncodedSize()
197+
198+
// Batch process small values, check individually for large values
199+
batchStart := 0
200+
for i := 0; i < len(values); i++ {
201+
{{- if eq .Name "ByteArray"}}
202+
valueSize := int64(len(values[i]))
203+
{{- else}}
204+
valueSize := int64(w.descr.TypeLength())
205+
{{- end}}
206+
207+
// If this value might cause overflow, flush first
208+
if currentSize + valueSize >= maxSafeBufferSize {
209+
// Add accumulated batch before flushing
210+
if i > batchStart {
211+
encoder.Put(values[batchStart:i])
212+
currentSize = w.currentEncoder.EstimatedDataEncodedSize()
213+
}
214+
// Flush the page
215+
if err := w.FlushCurrentPage(); err != nil {
216+
panic(err)
217+
}
218+
batchStart = i
219+
currentSize = 0
220+
}
221+
222+
// Track size estimate
223+
currentSize += valueSize + 4 // +4 for length prefix
224+
225+
// For large values, add and flush immediately if needed
226+
if valueSize >= largeValueThreshold {
227+
encoder.Put(values[i:i+1])
228+
batchStart = i + 1
229+
currentSize = w.currentEncoder.EstimatedDataEncodedSize()
230+
}
231+
}
232+
233+
// Add remaining batch
234+
if batchStart < len(values) {
235+
encoder.Put(values[batchStart:])
236+
}
237+
{{- else}}
188238
w.currentEncoder.(encoding.{{.Name}}Encoder).Put(values)
239+
{{- end}}
189240
if w.pageStatistics != nil {
190241
{{- if ne .Name "FixedLenByteArray"}}
191242
w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls)
@@ -204,11 +255,54 @@ func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls in
204255
}
205256

206257
func (w *{{.Name}}ColumnChunkWriter) writeValuesSpaced(spacedValues []{{.name}}, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
258+
{{- if or (eq .Name "ByteArray") (eq .Name "FixedLenByteArray")}}
259+
// For variable-length types, we need to check buffer size to prevent int32 overflow
260+
// For small values (<1MB), checking frequently adds negligible overhead
261+
// For large values (>1MB), we MUST check before each value
262+
const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
263+
const largeValueThreshold = 1.0 * 1024 * 1024 // 1MB
264+
265+
encoder := w.currentEncoder.(encoding.{{.Name}}Encoder)
266+
currentSize := w.currentEncoder.EstimatedDataEncodedSize()
267+
268+
for i := 0; i < len(spacedValues); i++ {
269+
{{- if eq .Name "ByteArray"}}
270+
valueSize := int64(len(spacedValues[i]))
271+
{{- else}}
272+
valueSize := int64(w.descr.TypeLength())
273+
{{- end}}
274+
275+
// If this value might cause overflow, flush first
276+
if currentSize + valueSize >= maxSafeBufferSize {
277+
if err := w.FlushCurrentPage(); err != nil {
278+
// If flush fails, panic will be caught by WriteBatch's defer recover
279+
panic(err)
280+
}
281+
currentSize = 0
282+
}
283+
284+
// Add the value
285+
chunk := spacedValues[i:i+1]
286+
if len(spacedValues) != int(numRead) && validBits != nil {
287+
encoder.PutSpaced(chunk, validBits, validBitsOffset+int64(i))
288+
} else {
289+
encoder.Put(chunk)
290+
}
291+
292+
// Track size estimate (only update for large values or every 100 values)
293+
if valueSize >= largeValueThreshold || i % 100 == 0 {
294+
currentSize = w.currentEncoder.EstimatedDataEncodedSize()
295+
} else {
296+
currentSize += valueSize + 4 // +4 for length prefix
297+
}
298+
}
299+
{{- else}}
207300
if len(spacedValues) != int(numRead) {
208301
w.currentEncoder.(encoding.{{.Name}}Encoder).PutSpaced(spacedValues, validBits, validBitsOffset)
209302
} else {
210303
w.currentEncoder.(encoding.{{.Name}}Encoder).Put(spacedValues)
211304
}
305+
{{- end}}
212306
if w.pageStatistics != nil {
213307
nulls := numValues - numRead
214308
{{- if ne .Name "FixedLenByteArray"}}

0 commit comments

Comments
 (0)