Skip to content

Commit 75e5193

Browse files
authored
chore(engine): expose new scheduler and worker (grafana#19633)
Signed-off-by: Robert Fratto <robertfratto@gmail.com>
1 parent 8b39d4d commit 75e5193

File tree

16 files changed

+948
-248
lines changed

16 files changed

+948
-248
lines changed

docs/sources/shared/configuration.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5118,6 +5118,11 @@ engine_v2:
51185118
# CLI flag: -querier.engine-v2.range-reads.min-range-size
51195119
[min_range_size: <int> | default = 1048576]
51205120
5121+
# Experimental: Number of worker threads to spawn. Each worker thread runs one
5122+
# task at a time. 0 means to use GOMAXPROCS value.
5123+
# CLI flag: -querier.engine-v2.worker-threads
5124+
[worker_threads: <int> | default = 0]
5125+
51215126
# The maximum number of queries that can be simultaneously processed by the
51225127
# querier.
51235128
# CLI flag: -querier.max-concurrent

pkg/engine/basic_engine.go

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
package engine
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
"github.com/pkg/errors"
12+
"github.com/prometheus/client_golang/prometheus"
13+
"github.com/thanos-io/objstore"
14+
"go.opentelemetry.io/otel"
15+
"go.opentelemetry.io/otel/attribute"
16+
"go.opentelemetry.io/otel/codes"
17+
"go.opentelemetry.io/otel/trace"
18+
19+
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
20+
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
21+
"github.com/grafana/loki/v3/pkg/engine/internal/planner/logical"
22+
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
23+
"github.com/grafana/loki/v3/pkg/logql"
24+
"github.com/grafana/loki/v3/pkg/logql/syntax"
25+
"github.com/grafana/loki/v3/pkg/logqlmodel"
26+
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
27+
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
28+
"github.com/grafana/loki/v3/pkg/util/httpreq"
29+
utillog "github.com/grafana/loki/v3/pkg/util/log"
30+
"github.com/grafana/loki/v3/pkg/util/rangeio"
31+
)
32+
33+
var tracer = otel.Tracer("pkg/engine")
34+
35+
var ErrNotSupported = errors.New("feature not supported in new query engine")
36+
37+
// NewBasic creates a new instance of the basic query engine that implements the
38+
// [logql.Engine] interface. The basic engine executes plans sequentially with
39+
// no local or distributed parallelism.
40+
func NewBasic(cfg ExecutorConfig, metastoreCfg metastore.Config, bucket objstore.Bucket, limits logql.Limits, reg prometheus.Registerer, logger log.Logger) *Basic {
41+
var ms metastore.Metastore
42+
if bucket != nil {
43+
indexBucket := bucket
44+
if metastoreCfg.IndexStoragePrefix != "" {
45+
indexBucket = objstore.NewPrefixedBucket(bucket, metastoreCfg.IndexStoragePrefix)
46+
}
47+
ms = metastore.NewObjectMetastore(indexBucket, logger, reg)
48+
}
49+
50+
if cfg.BatchSize <= 0 {
51+
panic(fmt.Sprintf("invalid batch size for query engine. must be greater than 0, got %d", cfg.BatchSize))
52+
}
53+
if cfg.RangeConfig.IsZero() {
54+
cfg.RangeConfig = rangeio.DefaultConfig
55+
}
56+
57+
return &Basic{
58+
logger: logger,
59+
metrics: newMetrics(reg),
60+
limits: limits,
61+
metastore: ms,
62+
bucket: bucket,
63+
cfg: cfg,
64+
}
65+
}
66+
67+
// Basic is a basic LogQL evaluation engine. Evaluation is performed
68+
// sequentially, with no local or distributed parallelism.
69+
type Basic struct {
70+
logger log.Logger
71+
metrics *metrics
72+
limits logql.Limits
73+
metastore metastore.Metastore
74+
bucket objstore.Bucket
75+
cfg ExecutorConfig
76+
}
77+
78+
// Query implements [logql.Engine].
79+
func (e *Basic) Query(params logql.Params) logql.Query {
80+
return &queryAdapter{
81+
engine: e,
82+
params: params,
83+
}
84+
}
85+
86+
// Execute executes a LogQL query and returns its results or alternatively an error.
87+
// The execution is done in three steps:
88+
// 1. Create a logical plan from the provided query parameters.
89+
// 2. Create a physical plan from the logical plan using information from the catalog.
90+
// 3. Evaluate the physical plan with the executor.
91+
func (e *Basic) Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error) {
92+
ctx, span := tracer.Start(ctx, "QueryEngine.Execute", trace.WithAttributes(
93+
attribute.String("type", string(logql.GetRangeType(params))),
94+
attribute.String("query", params.QueryString()),
95+
attribute.Stringer("start", params.Start()),
96+
attribute.Stringer("end", params.Start()),
97+
attribute.Stringer("step", params.Step()),
98+
attribute.Stringer("length", params.End().Sub(params.Start())),
99+
attribute.StringSlice("shards", params.Shards()),
100+
))
101+
defer span.End()
102+
103+
var (
104+
startTime = time.Now()
105+
106+
durLogicalPlanning time.Duration
107+
durPhysicalPlanning time.Duration
108+
durExecution time.Duration
109+
)
110+
111+
statsCtx, ctx := stats.NewContext(ctx)
112+
metadataCtx, ctx := metadata.NewContext(ctx)
113+
114+
statsCtx.SetQueryUsedV2Engine()
115+
116+
logger := utillog.WithContext(ctx, e.logger)
117+
logger = log.With(logger, "query", params.QueryString(), "shard", strings.Join(params.Shards(), ","), "engine", "v2")
118+
119+
// Inject the range config into the context for any calls to
120+
// [rangeio.ReadRanges] to make use of.
121+
ctx = rangeio.WithConfig(ctx, &e.cfg.RangeConfig)
122+
123+
logicalPlan, err := func() (*logical.Plan, error) {
124+
_, span := tracer.Start(ctx, "QueryEngine.Execute.logicalPlan")
125+
defer span.End()
126+
127+
timer := prometheus.NewTimer(e.metrics.logicalPlanning)
128+
logicalPlan, err := logical.BuildPlan(params)
129+
if err != nil {
130+
level.Warn(logger).Log("msg", "failed to create logical plan", "err", err)
131+
e.metrics.subqueries.WithLabelValues(statusNotImplemented).Inc()
132+
span.RecordError(err)
133+
span.SetStatus(codes.Error, "failed to create logical plan")
134+
return nil, ErrNotSupported
135+
}
136+
137+
durLogicalPlanning = timer.ObserveDuration()
138+
level.Info(logger).Log(
139+
"msg", "finished logical planning",
140+
"plan", logicalPlan.String(),
141+
"duration", durLogicalPlanning.String(),
142+
)
143+
span.SetStatus(codes.Ok, "")
144+
return logicalPlan, nil
145+
}()
146+
if err != nil {
147+
span.RecordError(err)
148+
span.SetStatus(codes.Error, "failed to create logical plan")
149+
return logqlmodel.Result{}, err
150+
}
151+
152+
physicalPlan, err := func() (*physical.Plan, error) {
153+
ctx, span := tracer.Start(ctx, "QueryEngine.Execute.physicalPlan")
154+
defer span.End()
155+
156+
timer := prometheus.NewTimer(e.metrics.physicalPlanning)
157+
158+
catalog := physical.NewMetastoreCatalog(ctx, e.metastore)
159+
planner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), catalog)
160+
plan, err := planner.Build(logicalPlan)
161+
if err != nil {
162+
level.Warn(logger).Log("msg", "failed to create physical plan", "err", err)
163+
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
164+
span.RecordError(err)
165+
span.SetStatus(codes.Error, "failed to create physical plan")
166+
return nil, ErrNotSupported
167+
}
168+
169+
plan, err = planner.Optimize(plan)
170+
if err != nil {
171+
level.Warn(logger).Log("msg", "failed to optimize physical plan", "err", err)
172+
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
173+
span.RecordError(err)
174+
span.SetStatus(codes.Error, "failed to optimize physical plan")
175+
return nil, ErrNotSupported
176+
}
177+
178+
durPhysicalPlanning = timer.ObserveDuration()
179+
level.Info(logger).Log(
180+
"msg", "finished physical planning",
181+
"plan", physical.PrintAsTree(plan),
182+
"duration", durPhysicalPlanning.String(),
183+
)
184+
span.SetStatus(codes.Ok, "")
185+
return plan, nil
186+
}()
187+
if err != nil {
188+
span.RecordError(err)
189+
span.SetStatus(codes.Error, "failed to create physical plan")
190+
return logqlmodel.Result{}, err
191+
}
192+
193+
builder, err := func() (ResultBuilder, error) {
194+
ctx, span := tracer.Start(ctx, "QueryEngine.Execute.Process")
195+
defer span.End()
196+
197+
level.Info(logger).Log("msg", "start executing query with new engine")
198+
199+
timer := prometheus.NewTimer(e.metrics.execution)
200+
201+
cfg := executor.Config{
202+
BatchSize: int64(e.cfg.BatchSize),
203+
MergePrefetchCount: e.cfg.MergePrefetchCount,
204+
Bucket: e.bucket,
205+
}
206+
pipeline := executor.Run(ctx, cfg, physicalPlan, logger)
207+
defer pipeline.Close()
208+
209+
var builder ResultBuilder
210+
switch params.GetExpression().(type) {
211+
case syntax.LogSelectorExpr:
212+
builder = newStreamsResultBuilder()
213+
case syntax.SampleExpr:
214+
if params.Step() > 0 {
215+
builder = newMatrixResultBuilder()
216+
} else {
217+
builder = newVectorResultBuilder()
218+
}
219+
default:
220+
// should never happen as we already check the expression type in the logical planner
221+
panic(fmt.Sprintf("failed to execute. Invalid exprression type (%T)", params.GetExpression()))
222+
}
223+
224+
if err := collectResult(ctx, pipeline, builder); err != nil {
225+
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
226+
span.RecordError(err)
227+
span.SetStatus(codes.Error, "failed to build results")
228+
return nil, err
229+
}
230+
231+
durExecution = timer.ObserveDuration()
232+
e.metrics.subqueries.WithLabelValues(statusSuccess).Inc()
233+
span.SetStatus(codes.Ok, "")
234+
return builder, nil
235+
}()
236+
if err != nil {
237+
span.RecordError(err)
238+
span.SetStatus(codes.Error, "failed to build results")
239+
return logqlmodel.Result{}, err
240+
}
241+
242+
durFull := time.Since(startTime)
243+
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
244+
stats := statsCtx.Result(durFull, queueTime, builder.Len())
245+
246+
level.Debug(logger).Log(
247+
"msg", "finished executing with new engine",
248+
"duration_logical_planning", durLogicalPlanning,
249+
"duration_physical_planning", durPhysicalPlanning,
250+
"duration_execution", durExecution,
251+
"duration_full", durFull,
252+
)
253+
254+
metadataCtx.AddWarning("Query was executed using the new experimental query engine and dataobj storage.")
255+
span.SetStatus(codes.Ok, "")
256+
return builder.Build(stats, metadataCtx), nil
257+
}
258+
259+
func IsQuerySupported(params logql.Params) bool {
260+
_, err := logical.BuildPlan(params)
261+
return err == nil
262+
}
263+
264+
func collectResult(ctx context.Context, pipeline executor.Pipeline, builder ResultBuilder) error {
265+
for {
266+
rec, err := pipeline.Read(ctx)
267+
if err != nil {
268+
if errors.Is(err, executor.EOF) {
269+
break
270+
}
271+
return err
272+
}
273+
274+
builder.CollectRecord(rec)
275+
}
276+
return nil
277+
}
278+
279+
// Compile-time check that *Basic implements [logql.Engine].
var _ logql.Engine = (*Basic)(nil)
280+
281+
// queryAdapter dispatches query execution to the wrapped engine.
282+
type queryAdapter struct {
283+
params logql.Params
284+
engine *Basic
285+
}
286+
287+
// Exec implements [logql.Query].
288+
func (q *queryAdapter) Exec(ctx context.Context) (logqlmodel.Result, error) {
289+
return q.engine.Execute(ctx, q.params)
290+
}
291+
292+
// Compile-time check that *queryAdapter implements [logql.Query].
var _ logql.Query = (*queryAdapter)(nil)

pkg/engine/config.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package engine
2+
3+
import "flag"
4+
5+
// Config holds the configuration options to use with the next generation Loki
6+
// Query Engine.
7+
type Config struct {
8+
// Enable the next generation Loki Query Engine for supported queries.
9+
Enable bool `yaml:"enable" category:"experimental"`
10+
11+
Executor ExecutorConfig `yaml:",inline"`
12+
Worker WorkerConfig `yaml:",inline"`
13+
}
14+
15+
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
16+
f.BoolVar(&cfg.Enable, prefix+"enable", false, "Experimental: Enable next generation query engine for supported queries.")
17+
18+
cfg.Executor.RegisterFlagsWithPrefix(prefix, f)
19+
cfg.Worker.RegisterFlagsWithPrefix(prefix, f)
20+
}

0 commit comments

Comments
 (0)