Skip to content

Commit b9b8bf2

Browse files
committed
Optimize Values() and MarshalJSON() for REE
Smartly iterate over offsets if they're run-end encoded instead of doing a binary search at every iteration. This makes the loops O(n) instead of O(n*logn).
1 parent 7a83790 commit b9b8bf2

1 file changed

Lines changed: 65 additions & 14 deletions

File tree

arrow/extensions/timestamp_with_offset.go

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -285,13 +285,70 @@ func (a *TimestampWithOffsetArray) Value(i int) time.Time {
285285
return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
286286
}
287287

288+
// Iterates over the array and calls the callback with the timestamp at each position. If it's null,
289+
// the timestamp will be nil.
290+
//
291+
// This will iterate using the fastest method given the underlying storage array
292+
func (a* TimestampWithOffsetArray) iterValues(callback func(i int, utcTimestamp *time.Time)) {
293+
structs := a.Storage().(*array.Struct)
294+
offsets := structs.Field(1)
295+
if reeOffsets, isRee := offsets.(*array.RunEndEncoded); isRee {
296+
timestampField := structs.Field(0)
297+
timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit
298+
timestamps := timestampField.(*array.Timestamp)
299+
300+
offsetValues := reeOffsets.Values().(*array.Int16)
301+
offsetPhysicalIdx := 0
302+
303+
var getRunEnd (func(int) int)
304+
switch arr := reeOffsets.RunEndsArr().(type) {
305+
case *array.Int16:
306+
getRunEnd = func(idx int) int { return int(arr.Value(idx)) }
307+
case *array.Int32:
308+
getRunEnd = func(idx int) int { return int(arr.Value(idx)) }
309+
case *array.Int64:
310+
getRunEnd = func(idx int) int { return int(arr.Value(idx)) }
311+
}
312+
313+
for i := 0; i < a.Len(); i++ {
314+
if i >= getRunEnd(offsetPhysicalIdx) {
315+
offsetPhysicalIdx += 1
316+
}
317+
318+
timestamp := (*time.Time)(nil)
319+
if a.IsValid(i) {
320+
utcTimestamp := timestamps.Value(i)
321+
offsetMinutes := offsetValues.Value(offsetPhysicalIdx)
322+
v := timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
323+
timestamp = &v
324+
}
325+
326+
callback(i, timestamp)
327+
}
328+
} else {
329+
for i := 0; i < a.Len(); i++ {
330+
timestamp := (*time.Time)(nil)
331+
if a.IsValid(i) {
332+
utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
333+
v := timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
334+
timestamp = &v
335+
}
336+
337+
callback(i, timestamp)
338+
}
339+
}
340+
}
341+
342+
288343
func (a *TimestampWithOffsetArray) Values() []time.Time {
289344
values := make([]time.Time, a.Len())
290-
// TODO: optimize for run-end encoding
291-
for i := range a.Len() {
292-
val := a.Value(i)
293-
values[i] = val
294-
}
345+
a.iterValues(func(i int, timestamp *time.Time) {
346+
if timestamp == nil {
347+
values[i] = time.Unix(0, 0)
348+
} else {
349+
values[i] = *timestamp
350+
}
351+
})
295352
return values
296353
}
297354

@@ -306,15 +363,9 @@ func (a *TimestampWithOffsetArray) ValueStr(i int) string {
306363

307364
func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) {
308365
values := make([]interface{}, a.Len())
309-
// TODO: optimize for run-end encoding
310-
for i := 0; i < a.Len(); i++ {
311-
if a.IsValid(i) {
312-
utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
313-
values[i] = timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
314-
} else {
315-
values[i] = nil
316-
}
317-
}
366+
a.iterValues(func(i int, timestamp *time.Time) {
367+
values[i] = timestamp
368+
})
318369
return json.Marshal(values)
319370
}
320371

0 commit comments

Comments
 (0)