@@ -5,13 +5,11 @@ package main
55
66import (
77 "fmt"
8- "math"
98 "net/http"
109 "strings"
1110 "time"
1211
1312 extflag "github.com/efficientgo/tools/extkingpin"
14-
1513 "github.com/go-kit/log"
1614 "github.com/go-kit/log/level"
1715 grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
@@ -22,8 +20,6 @@ import (
2220 "github.com/prometheus/common/route"
2321 "github.com/prometheus/prometheus/model/labels"
2422 "github.com/prometheus/prometheus/promql"
25- "github.com/thanos-io/promql-engine/api"
26- "github.com/thanos-io/promql-engine/engine"
2723
2824 apiv1 "github.com/thanos-io/thanos/pkg/api/query"
2925 "github.com/thanos-io/thanos/pkg/api/query/querypb"
@@ -61,13 +57,6 @@ const (
6157 queryPushdown = "query-pushdown"
6258)
6359
64- type queryMode string
65-
66- const (
67- queryModeLocal queryMode = "local"
68- queryModeDistributed queryMode = "distributed"
69- )
70-
7160// registerQuery registers a query command.
7261func registerQuery (app * extkingpin.App ) {
7362 comp := component .Query
@@ -93,8 +82,8 @@ func registerQuery(app *extkingpin.App) {
9382 Enum (string (apiv1 .PromqlEnginePrometheus ), string (apiv1 .PromqlEngineThanos ))
9483 extendedFunctionsEnabled := cmd .Flag ("query.enable-x-functions" , "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine." ).Default ("false" ).Bool ()
9584 promqlQueryMode := cmd .Flag ("query.mode" , "PromQL query mode. One of: local, distributed." ).
96- Default (string (queryModeLocal )).
97- Enum (string (queryModeLocal ), string (queryModeDistributed ))
85+ Default (string (apiv1 . PromqlQueryModeLocal )).
86+ Enum (string (apiv1 . PromqlQueryModeLocal ), string (apiv1 . PromqlQueryModeDistributed ))
9887
9988 maxConcurrentQueries := cmd .Flag ("query.max-concurrent" , "Maximum number of queries processed concurrently by query node." ).
10089 Default ("20" ).Int ()
@@ -257,6 +246,11 @@ func registerQuery(app *extkingpin.App) {
257246 return err
258247 }
259248
249+ if * promqlQueryMode != string (apiv1 .PromqlQueryModeLocal ) {
250+ level .Info (logger ).Log ("msg" , "Distributed query mode enabled, using Thanos as the default query engine." )
251+ * defaultEngine = string (apiv1 .PromqlEngineThanos )
252+ }
253+
260254 endpointSet , err := setupEndpointSet (
261255 g ,
262256 comp ,
@@ -325,11 +319,11 @@ func registerQuery(app *extkingpin.App) {
325319 * queryTelemetryDurationQuantiles ,
326320 * queryTelemetrySamplesQuantiles ,
327321 * queryTelemetrySeriesQuantiles ,
328- * defaultEngine ,
329322 storeRateLimits ,
330323 * extendedFunctionsEnabled ,
331324 store .NewTSDBSelector (tsdbSelector ),
332- queryMode (* promqlQueryMode ),
325+ apiv1 .PromqlEngineType (* defaultEngine ),
326+ apiv1 .PromqlQueryMode (* promqlQueryMode ),
333327 * tenantHeader ,
334328 * defaultTenant ,
335329 * tenantCertField ,
@@ -386,11 +380,11 @@ func runQuery(
386380 queryTelemetryDurationQuantiles []float64 ,
387381 queryTelemetrySamplesQuantiles []float64 ,
388382 queryTelemetrySeriesQuantiles []float64 ,
389- defaultEngine string ,
390383 storeRateLimits store.SeriesSelectLimits ,
391384 extendedFunctionsEnabled bool ,
392385 tsdbSelector * store.TSDBSelector ,
393- queryMode queryMode ,
386+ defaultEngine apiv1.PromqlEngineType ,
387+ queryMode apiv1.PromqlQueryMode ,
394388 tenantHeader string ,
395389 defaultTenant string ,
396390 tenantCertField string ,
@@ -429,6 +423,14 @@ func runQuery(
429423 maxConcurrentSelects ,
430424 queryTimeout ,
431425 )
426+ remoteEndpointsCreator = query .NewRemoteEndpointsCreator (
427+ logger ,
428+ endpointSet .GetQueryAPIClients ,
429+ queryPartitionLabels ,
430+ queryTimeout ,
431+ queryDistributedWithOverlappingInterval ,
432+ enableAutodownsampling ,
433+ )
432434 )
433435
434436 grpcProbe := prober .NewGRPC ()
@@ -439,48 +441,25 @@ func runQuery(
439441 prober .NewInstrumentation (comp , logger , extprom .WrapRegistererWithPrefix ("thanos_" , reg )),
440442 )
441443
442- engineOpts := engine.Opts {
443- EngineOpts : promql.EngineOpts {
444- Logger : logutil .GoKitLogToSlog (logger ),
445- Reg : reg ,
446- // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
447- MaxSamples : math .MaxInt32 ,
448- Timeout : queryTimeout ,
449- LookbackDelta : lookbackDelta ,
450- NoStepSubqueryIntervalFn : func (int64 ) int64 {
451- return defaultEvaluationInterval .Milliseconds ()
452- },
453- EnableNegativeOffset : true ,
454- EnableAtModifier : true ,
455- },
456- EnablePartialResponses : enableQueryPartialResponse ,
457- EnableXFunctions : extendedFunctionsEnabled ,
458- EnableAnalysis : true ,
459- }
460-
461444 // An active query tracker will be added only if the user specifies a non-default path.
462445 // Otherwise, the nil active query tracker from existing engine options will be used.
446+ var activeQueryTracker * promql.ActiveQueryTracker
463447 if activeQueryDir != "" {
464- engineOpts . ActiveQueryTracker = promql .NewActiveQueryTracker (activeQueryDir , maxConcurrentQueries , logutil .GoKitLogToSlog (logger ))
448+ activeQueryTracker = promql .NewActiveQueryTracker (activeQueryDir , maxConcurrentQueries , logutil .GoKitLogToSlog (logger ))
465449 }
466450
467- var remoteEngineEndpoints api.RemoteEndpoints
468- if queryMode != queryModeLocal {
469- level .Info (logger ).Log ("msg" , "Distributed query mode enabled, using Thanos as the default query engine." )
470- defaultEngine = string (apiv1 .PromqlEngineThanos )
471- remoteEngineEndpoints = query .NewRemoteEndpoints (logger , endpointSet .GetQueryAPIClients , query.Opts {
472- AutoDownsample : enableAutodownsampling ,
473- ReplicaLabels : queryReplicaLabels ,
474- PartitionLabels : queryPartitionLabels ,
475- Timeout : queryTimeout ,
476- EnablePartialResponse : enableQueryPartialResponse ,
477- QueryDistributedWithOverlappingInterval : queryDistributedWithOverlappingInterval ,
478- })
479- }
480-
481- engineFactory := apiv1 .NewQueryFactory (engineOpts , remoteEngineEndpoints )
451+ queryCreator := apiv1 .NewQueryFactory (
452+ reg ,
453+ logger ,
454+ queryTimeout ,
455+ lookbackDelta ,
456+ defaultEvaluationInterval ,
457+ extendedFunctionsEnabled ,
458+ activeQueryTracker ,
459+ queryMode ,
460+ )
482461
483- lookbackDeltaCreator := LookbackDeltaFactory (engineOpts . EngineOpts , dynamicLookbackDelta )
462+ lookbackDeltaCreator := LookbackDeltaFactory (lookbackDelta , dynamicLookbackDelta )
484463
485464 // Start query API + UI HTTP server.
486465 {
@@ -510,10 +489,11 @@ func runQuery(
510489 api := apiv1 .NewQueryAPI (
511490 logger ,
512491 endpointSet .GetEndpointStatus ,
513- engineFactory ,
492+ queryCreator ,
514493 apiv1 .PromqlEngineType (defaultEngine ),
515494 lookbackDeltaCreator ,
516495 queryableCreator ,
496+ remoteEndpointsCreator ,
517497 // NOTE: Will share the same replica label as the query for now.
518498 rules .NewGRPCClientWithDedup (rulesProxy , queryReplicaLabels ),
519499 targets .NewGRPCClientWithDedup (targetsProxy , queryReplicaLabels ),
@@ -600,8 +580,8 @@ func runQuery(
600580 info .WithQueryAPIInfoFunc (),
601581 )
602582
603- defaultEngineType := querypb .EngineType (querypb .EngineType_value [defaultEngine ])
604- grpcAPI := apiv1 .NewGRPCAPI (time .Now , queryReplicaLabels , queryableCreator , engineFactory , defaultEngineType , lookbackDeltaCreator , instantDefaultMaxSourceResolution )
583+ defaultEngineType := querypb .EngineType (querypb .EngineType_value [string ( defaultEngine ) ])
584+ grpcAPI := apiv1 .NewGRPCAPI (time .Now , queryReplicaLabels , queryableCreator , remoteEndpointsCreator , queryCreator , defaultEngineType , lookbackDeltaCreator , instantDefaultMaxSourceResolution )
605585 s := grpcserver .New (logger , reg , tracer , grpcLogOpts , logFilterMethods , comp , grpcProbe ,
606586 grpcserver .WithServer (apiv1 .RegisterQueryServer (grpcAPI )),
607587 grpcserver .WithServer (store .RegisterStoreServer (seriesProxy , logger )),
@@ -634,7 +614,7 @@ func runQuery(
634614// dynamicLookbackDelta and eo.LookbackDelta and returns a function
635615// that returns appropriate lookback delta for given maxSourceResolutionMillis.
636616func LookbackDeltaFactory (
637- eo promql. EngineOpts ,
617+ lookbackDelta time. Duration ,
638618 dynamicLookbackDelta bool ,
639619) func (int64 ) time.Duration {
640620 resolutions := []int64 {downsample .ResLevel0 }
@@ -643,10 +623,9 @@ func LookbackDeltaFactory(
643623 }
644624 var (
645625 lds = make ([]time.Duration , len (resolutions ))
646- ld = eo . LookbackDelta .Milliseconds ()
626+ ld = lookbackDelta .Milliseconds ()
647627 )
648628
649- lookbackDelta := eo .LookbackDelta
650629 for i , r := range resolutions {
651630 if ld < r {
652631 lookbackDelta = time .Duration (r ) * time .Millisecond
0 commit comments