Skip to content

Commit 1474ede

Browse files
authored
feat(engine): More aggregations for the new query engine (#19350)
1 parent ba5f308 commit 1474ede

17 files changed

+546
-128
lines changed

pkg/engine/compat.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,11 +304,11 @@ func collectSamplesFromRow(builder *labels.Builder, rec arrow.Record, i int) (pr
304304
return promql.Sample{}, false
305305
}
306306

307-
col, ok := col.(*array.Int64)
307+
col, ok := col.(*array.Float64)
308308
if !ok {
309309
return promql.Sample{}, false
310310
}
311-
sample.F = float64(col.Value(i))
311+
sample.F = col.Value(i)
312312
default:
313313
// allow any string columns
314314
if colDataType == datatype.Loki.String.String() {

pkg/engine/compat_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -181,17 +181,17 @@ func TestVectorResultBuilder(t *testing.T) {
181181
schema := arrow.NewSchema(
182182
[]arrow.Field{
183183
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
184-
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Integer)},
184+
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Float64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Float)},
185185
{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
186186
{Name: "job", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
187187
},
188188
nil,
189189
)
190190

191191
rows := arrowtest.Rows{
192-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: int64(42), "instance": "localhost:9090", "job": "prometheus"},
193-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: int64(23), "instance": "localhost:9100", "job": "node-exporter"},
194-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: int64(15), "instance": "localhost:9100", "job": "prometheus"},
192+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(42), "instance": "localhost:9090", "job": "prometheus"},
193+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(23), "instance": "localhost:9100", "job": "node-exporter"},
194+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(15), "instance": "localhost:9100", "job": "prometheus"},
195195
}
196196

197197
record := rows.Record(alloc, schema)
@@ -236,14 +236,14 @@ func TestVectorResultBuilder(t *testing.T) {
236236
schema := arrow.NewSchema(
237237
[]arrow.Field{
238238
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
239-
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Integer)},
239+
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Float64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Float)},
240240
{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
241241
},
242242
nil,
243243
)
244244

245245
rows := arrowtest.Rows{
246-
{types.ColumnNameBuiltinTimestamp: nil, types.ColumnNameGeneratedValue: int64(42), "instance": "localhost:9090"},
246+
{types.ColumnNameBuiltinTimestamp: nil, types.ColumnNameGeneratedValue: float64(42), "instance": "localhost:9090"},
247247
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: nil, "instance": "localhost:9100"},
248248
}
249249

@@ -276,20 +276,20 @@ func TestMatrixResultBuilder(t *testing.T) {
276276
schema := arrow.NewSchema(
277277
[]arrow.Field{
278278
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
279-
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Integer)},
279+
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Float64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Float)},
280280
{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
281281
{Name: "job", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
282282
},
283283
nil,
284284
)
285285

286286
rows := arrowtest.Rows{
287-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: int64(42), "instance": "localhost:9090", "job": "prometheus"},
288-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000001000000000).UTC(), types.ColumnNameGeneratedValue: int64(43), "instance": "localhost:9090", "job": "prometheus"},
289-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000002000000000).UTC(), types.ColumnNameGeneratedValue: int64(44), "instance": "localhost:9090", "job": "prometheus"},
290-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: int64(23), "instance": "localhost:9100", "job": "node-exporter"},
291-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000001000000000).UTC(), types.ColumnNameGeneratedValue: int64(24), "instance": "localhost:9100", "job": "node-exporter"},
292-
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000002000000000).UTC(), types.ColumnNameGeneratedValue: int64(25), "instance": "localhost:9100", "job": "node-exporter"},
287+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(42), "instance": "localhost:9090", "job": "prometheus"},
288+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000001000000000).UTC(), types.ColumnNameGeneratedValue: float64(43), "instance": "localhost:9090", "job": "prometheus"},
289+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000002000000000).UTC(), types.ColumnNameGeneratedValue: float64(44), "instance": "localhost:9090", "job": "prometheus"},
290+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(23), "instance": "localhost:9100", "job": "node-exporter"},
291+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000001000000000).UTC(), types.ColumnNameGeneratedValue: float64(24), "instance": "localhost:9100", "job": "node-exporter"},
292+
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000002000000000).UTC(), types.ColumnNameGeneratedValue: float64(25), "instance": "localhost:9100", "job": "node-exporter"},
293293
}
294294

295295
record := rows.Record(alloc, schema)

pkg/engine/executor/aggregator.go

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,33 @@ import (
1818
)
1919

2020
type groupState struct {
21-
value int64 // aggregated value
21+
value float64 // aggregated value
2222
labelValues []string // grouping label values
2323
}
2424

25+
type aggregationOperation int
26+
27+
const (
28+
aggregationOperationSum aggregationOperation = iota
29+
aggregationOperationMax
30+
aggregationOperationMin
31+
aggregationOperationCount
32+
)
33+
2534
// aggregator is used to aggregate sample values by a set of grouping keys for each point in time.
26-
// Currently it only supports SUM operation, but can be extended to support other operations like COUNT, AVG, etc.
2735
type aggregator struct {
28-
groupBy []physical.ColumnExpression // columns to group by
29-
points map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series
30-
digest *xxhash.Digest // used to compute key for each group
36+
groupBy []physical.ColumnExpression // columns to group by
37+
points map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series
38+
digest *xxhash.Digest // used to compute key for each group
39+
operation aggregationOperation // aggregation type
3140
}
3241

3342
// newAggregator creates a new aggregator with the specified groupBy columns.
34-
func newAggregator(groupBy []physical.ColumnExpression, pointsSizeHint int) *aggregator {
43+
func newAggregator(groupBy []physical.ColumnExpression, pointsSizeHint int, operation aggregationOperation) *aggregator {
3544
a := aggregator{
36-
groupBy: groupBy,
37-
digest: xxhash.New(),
45+
groupBy: groupBy,
46+
digest: xxhash.New(),
47+
operation: operation,
3848
}
3949

4050
if pointsSizeHint > 0 {
@@ -48,7 +58,7 @@ func newAggregator(groupBy []physical.ColumnExpression, pointsSizeHint int) *agg
4858

4959
// Add adds a new sample value to the aggregation for the given timestamp and grouping label values.
5060
// It expects labelValues to be in the same order as the groupBy columns.
51-
func (a *aggregator) Add(ts time.Time, value int64, labelValues []string) {
61+
func (a *aggregator) Add(ts time.Time, value float64, labelValues []string) {
5262
point, ok := a.points[ts]
5363
if !ok {
5464
point = make(map[uint64]*groupState)
@@ -67,7 +77,22 @@ func (a *aggregator) Add(ts time.Time, value int64, labelValues []string) {
6777

6878
if state, ok := point[key]; ok {
6979
// TODO: handle hash collisions
70-
state.value += value
80+
81+
// accumulate value based on aggregation type
82+
switch a.operation {
83+
case aggregationOperationSum:
84+
state.value += value
85+
case aggregationOperationMax:
86+
if value > state.value {
87+
state.value = value
88+
}
89+
case aggregationOperationMin:
90+
if value < state.value {
91+
state.value = value
92+
}
93+
case aggregationOperationCount:
94+
state.value = state.value + 1
95+
}
7196
} else {
7297
// create a new slice since labelValues is reused by the calling code
7398
labelValuesCopy := make([]string, len(labelValues))
@@ -78,11 +103,20 @@ func (a *aggregator) Add(ts time.Time, value int64, labelValues []string) {
78103
labelValuesCopy[i] = strings.Clone(v)
79104
}
80105

81-
// TODO: add limits on number of groups
82-
point[key] = &groupState{
106+
state = &groupState{
83107
labelValues: labelValuesCopy,
84-
value: value,
85108
}
109+
110+
// set initial values based on aggregation type
111+
switch a.operation {
112+
case aggregationOperationSum, aggregationOperationMax, aggregationOperationMin:
113+
state.value = value
114+
case aggregationOperationCount:
115+
state.value = 1
116+
}
117+
118+
// TODO: add limits on number of groups
119+
point[key] = state
86120
}
87121
}
88122

@@ -97,9 +131,9 @@ func (a *aggregator) BuildRecord() (arrow.Record, error) {
97131
},
98132
arrow.Field{
99133
Name: types.ColumnNameGeneratedValue,
100-
Type: datatype.Arrow.Integer,
134+
Type: datatype.Arrow.Float,
101135
Nullable: false,
102-
Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Integer),
136+
Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Float),
103137
},
104138
)
105139

@@ -127,7 +161,7 @@ func (a *aggregator) BuildRecord() (arrow.Record, error) {
127161

128162
for _, entry := range a.points[ts] {
129163
rb.Field(0).(*array.TimestampBuilder).Append(tsValue)
130-
rb.Field(1).(*array.Int64Builder).Append(entry.value)
164+
rb.Field(1).(*array.Float64Builder).Append(entry.value)
131165

132166
for col, val := range entry.labelValues {
133167
builder := rb.Field(col + 2) // offset by 2 as the first 2 fields are timestamp and value

0 commit comments

Comments (0)