Skip to content

Commit 7a83790

Browse files
committed
Allow run-end encoding of TimestampWithOffset
1 parent 7156421 commit 7a83790

2 files changed

Lines changed: 126 additions & 5 deletions

File tree

arrow/extensions/timestamp_with_offset.go

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package extensions
1919
import (
2020
"errors"
2121
"fmt"
22+
"math"
2223
"reflect"
2324
"strings"
2425
"time"
@@ -41,6 +42,13 @@ func isOffsetTypeOk(offsetType arrow.DataType) bool {
4142
return true
4243
case *arrow.DictionaryType:
4344
return arrow.IsInteger(offsetType.IndexType.ID()) && arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
45+
case *arrow.RunEndEncodedType:
46+
return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
47+
arrow.TypeEqual(offsetType.Encoded(), arrow.PrimitiveTypes.Int16)
48+
// FIXME: Technically this should be non-nullable, but Arrow IPC does not deserialize
49+
// ValueNullable properly, so enforcing this here would always fail when reading from an IPC
50+
// stream
51+
// !offsetType.ValueNullable
4452
default:
4553
return false
4654
}
@@ -140,6 +148,21 @@ func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index arro
140148
return NewTimestampWithOffsetType(unit, &offsetType)
141149
}
142150

151+
// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the underlying storage type set correctly to
152+
// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, Int16)), where T is any TimeUnit and E is a
153+
// valid run-ends type.
154+
//
155+
// The error will be populated if runEnds is not a valid run-end encoding run-ends type.
156+
func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds arrow.DataType) (*TimestampWithOffsetType, error) {
157+
offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
158+
if !offsetType.ValidRunEndsType(runEnds) {
159+
return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s", runEnds))
160+
}
161+
162+
return NewTimestampWithOffsetType(unit, offsetType)
163+
}
164+
165+
143166
// ArrayType returns the reflect.Type of TimestampWithOffsetArray, the array type associated with this extension
// type.
func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
	arrayType := reflect.TypeOf(TimestampWithOffsetArray{})
	return arrayType
}
@@ -247,6 +270,8 @@ func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, int16
247270
offsetMinutes = offsets.Value(i)
248271
case *array.Dictionary:
249272
offsetMinutes = offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i))
273+
case *array.RunEndEncoded:
274+
offsetMinutes = offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i))
250275
}
251276

252277
return utcTimestamp, offsetMinutes, timeUnit
@@ -262,6 +287,7 @@ func (a *TimestampWithOffsetArray) Value(i int) time.Time {
262287

263288
func (a *TimestampWithOffsetArray) Values() []time.Time {
264289
values := make([]time.Time, a.Len())
290+
// TODO: optimize for run-end encoding
265291
for i := range a.Len() {
266292
val := a.Value(i)
267293
values[i] = val
@@ -280,6 +306,7 @@ func (a *TimestampWithOffsetArray) ValueStr(i int) string {
280306

281307
func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) {
282308
values := make([]interface{}, a.Len())
309+
// TODO: optimize for run-end encoding
283310
for i := 0; i < a.Len(); i++ {
284311
if a.IsValid(i) {
285312
utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
@@ -307,6 +334,8 @@ type TimestampWithOffsetBuilder struct {
307334
Layout string
308335
unit arrow.TimeUnit
309336
offsetType arrow.DataType
337+
// lastOffset is only used to determine when to start new runs with run-end encoded offsets
338+
lastOffset int16
310339
}
311340

312341
// NewTimestampWithOffsetBuilder creates a new TimestampWithOffsetBuilder, exposing a convenient and efficient interface
@@ -320,38 +349,60 @@ func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit, of
320349
return &TimestampWithOffsetBuilder{
321350
unit: unit,
322351
offsetType: offsetType,
352+
lastOffset: math.MaxInt16,
323353
Layout: time.RFC3339,
324354
ExtensionBuilder: array.NewExtensionBuilder(mem, dataType),
325355
}, nil
326356
}
327357

328358
func (b *TimestampWithOffsetBuilder) Append(v time.Time) {
329359
timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit)
360+
offsetMinutes16 := int16(offsetMinutes)
330361
structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder)
331362

332363
structBuilder.Append(true)
333364
structBuilder.FieldBuilder(0).(*array.TimestampBuilder).Append(timestamp)
334365

335366
switch offsets := structBuilder.FieldBuilder(1).(type) {
336367
case *array.Int16Builder:
337-
offsets.Append(int16(offsetMinutes))
368+
offsets.Append(offsetMinutes16)
338369
case *array.Int16DictionaryBuilder:
339-
offsets.Append(int16(offsetMinutes))
370+
offsets.Append(offsetMinutes16)
371+
case *array.RunEndEncodedBuilder:
372+
if offsetMinutes != b.lastOffset {
373+
offsets.Append(1)
374+
offsets.ValueBuilder().(*array.Int16Builder).Append(offsetMinutes16)
375+
} else {
376+
offsets.ContinueRun(1)
377+
}
378+
379+
b.lastOffset = offsetMinutes16
340380
}
381+
341382
}
342383

343384
func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) {
344385
timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit)
386+
offsetMinutes16 := int16(offsetMinutes)
345387
structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder)
346388

347389
structBuilder.Append(true)
348390
structBuilder.FieldBuilder(0).(*array.TimestampBuilder).UnsafeAppend(timestamp)
349391

350392
switch offsets := structBuilder.FieldBuilder(1).(type) {
351393
case *array.Int16Builder:
352-
offsets.UnsafeAppend(int16(offsetMinutes))
394+
offsets.UnsafeAppend(offsetMinutes16)
353395
case *array.Int16DictionaryBuilder:
354-
offsets.Append(int16(offsetMinutes))
396+
offsets.Append(offsetMinutes16)
397+
case *array.RunEndEncodedBuilder:
398+
if offsetMinutes != b.lastOffset {
399+
offsets.Append(1)
400+
offsets.ValueBuilder().(*array.Int16Builder).Append(offsetMinutes16)
401+
} else {
402+
offsets.ContinueRun(1)
403+
}
404+
405+
b.lastOffset = offsetMinutes16
355406
}
356407
}
357408

@@ -399,13 +450,31 @@ func (b *TimestampWithOffsetBuilder) AppendValues(values []time.Time, valids []b
399450
timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit)
400451
if valids[i] {
401452
timestamps.UnsafeAppend(timestamp)
402-
// TODO: I was here, this needs to be equivalent to UnsafeAppend
403453
offsets.Append(offsetMinutes)
404454
} else {
405455
timestamps.UnsafeAppendBoolToBitmap(false)
406456
offsets.UnsafeAppendBoolToBitmap(false)
407457
}
408458
}
459+
case *array.RunEndEncodedBuilder:
460+
offsetValuesBuilder := offsets.ValueBuilder().(*array.Int16Builder)
461+
for i, v := range values {
462+
timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit)
463+
if valids[i] {
464+
timestamps.UnsafeAppend(timestamp)
465+
offsetMinutes16 := int16(offsetMinutes)
466+
if offsetMinutes != b.lastOffset {
467+
offsets.Append(1)
468+
offsetValuesBuilder.Append(offsetMinutes16)
469+
} else {
470+
offsets.ContinueRun(1)
471+
}
472+
b.lastOffset = offsetMinutes16
473+
} else {
474+
timestamps.UnsafeAppendBoolToBitmap(false)
475+
offsets.UnsafeAppendBoolToBitmap(false)
476+
}
477+
}
409478
}
410479
}
411480

arrow/extensions/timestamp_with_offset_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ func dict(index arrow.DataType) arrow.DataType {
5050
}
5151
}
5252

53+
func ree(runEnds arrow.DataType) arrow.DataType {
54+
v := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
55+
v.ValueNullable = false
56+
return v
57+
}
58+
5359
// All tests use this in a for loop to make sure everything works for every possible
5460
// encoding of offsets (primitive, dictionary, run-end)
5561
var allAllowedOffsetTypes = []arrow.DataType{
@@ -65,6 +71,11 @@ var allAllowedOffsetTypes = []arrow.DataType{
6571
dict(arrow.PrimitiveTypes.Int16),
6672
dict(arrow.PrimitiveTypes.Int32),
6773
dict(arrow.PrimitiveTypes.Int64),
74+
75+
// run-end encoded offsetType
76+
ree(arrow.PrimitiveTypes.Int16),
77+
ree(arrow.PrimitiveTypes.Int32),
78+
ree(arrow.PrimitiveTypes.Int64),
6879
}
6980

7081
func TestTimestampWithOffsetTypePrimitiveBasics(t *testing.T) {
@@ -141,6 +152,47 @@ func TestTimestampWithOffsetTypeDictionaryEncodedBasics(t *testing.T) {
141152
}
142153
}
143154

155+
func TestTimestampWithOffsetTypeRunEndEncodedBasics(t *testing.T) {
156+
invalidRunEndsType := arrow.PrimitiveTypes.Float32
157+
_, err := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, invalidRunEndsType)
158+
assert.True(t, err != nil, "Err should not be nil if run ends type is invalid")
159+
160+
runEndsTypes := []arrow.DataType{
161+
arrow.PrimitiveTypes.Int16,
162+
arrow.PrimitiveTypes.Int32,
163+
arrow.PrimitiveTypes.Int64,
164+
};
165+
166+
for _, indexType := range runEndsTypes {
167+
typ, err := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, indexType)
168+
assert.True(t, err == nil, "Err should be nil")
169+
170+
assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName())
171+
assert.True(t, typ.ExtensionEquals(typ))
172+
173+
assert.True(t, arrow.TypeEqual(typ, typ))
174+
assert.True(t, arrow.TypeEqual(
175+
arrow.StructOf(
176+
arrow.Field{
177+
Name: "timestamp",
178+
Type: &arrow.TimestampType{
179+
Unit: testTimeUnit,
180+
TimeZone: "UTC",
181+
},
182+
Nullable: false,
183+
},
184+
arrow.Field{
185+
Name: "offset_minutes",
186+
Type: ree(indexType),
187+
Nullable: false,
188+
},
189+
),
190+
typ.StorageType()))
191+
192+
assert.Equal(t, "extension<arrow.timestamp_with_offset>", typ.String())
193+
}
194+
}
195+
144196
func TestTimestampWithOffsetExtensionBuilder(t *testing.T) {
145197
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
146198
defer mem.AssertSize(t, 0)

0 commit comments

Comments
 (0)