Skip to content

Commit ce21ca7

Browse files
committed
refactor: create a new prometheus registry for every server
1 parent 1d5d2c8 commit ce21ca7

20 files changed

Lines changed: 148 additions & 167 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
2222
- Tracing: When server is shutting down, flush traces. Also, elide the need for setting `OTEL_EXPORTER_OTLP_ENDPOINT`. (https://github.com/authzed/spicedb/pull/3108)
2323
- Fixed a LookupSubjects issue in the query planner around the handling of wildcards in compound permissions (https://github.com/authzed/spicedb/pull/3140)
2424
- MySQL: identifiers (object/subject IDs and relationship counter names) are now stored with a case-sensitive (binary) collation, matching the Postgres, CockroachDB, and Spanner datastores. Previously, identifiers differing only in letter case (e.g. `Foo` and `foo`) incorrectly collided in unique indexes and lookups. ⚠️ The migration rebuilds the `relation_tuple` table in place via `ALTER TABLE`, which can hold a metadata/table lock for a long time on large datasets — run the upgrade in a low-traffic window, or apply it with an online schema-change tool (e.g. gh-ost). (https://github.com/authzed/spicedb/pull/3161)
25-
- `server.NewConfigWithOptionsAndDefaults` now populates `Config` and its embedded structs with the same defaults as the CLI flags, fixing zero-value behavior when embedding SpiceDB as a library. (https://github.com/authzed/spicedb/pull/3156)
25+
- `server.NewConfigWithOptionsAndDefaults` now populates `Config` and its embedded structs with the same defaults as the CLI flags, fixing zero-value behavior when embedding SpiceDB as a library. (https://github.com/authzed/spicedb/pull/3156, https://github.com/authzed/spicedb/pull/3170)
2626

2727
## [1.53.0] - 2026-05-13
2828
### Added

internal/datastore/proxy/schemacaching/watchingcache_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ func TestWatchingCachingProxyUnwrap(t *testing.T) {
4040
wcache := NewWatchingCacheProxy(fakeDS, NewDefinitionCachingProxy(fakeDS, nil), 1*time.Hour, 100*time.Millisecond)
4141
unwrapped := wcache.Unwrap()
4242
require.Equal(t, fakeDS, unwrapped)
43+
t.Cleanup(func() {
44+
wcache.Close()
45+
})
4346
}
4447

4548
func TestOldWatchingCacheBasicOperation(t *testing.T) {

internal/dispatch/caching/caching.go

Lines changed: 20 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package caching
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"maps"
78
"sync"
@@ -55,93 +56,73 @@ func DispatchTestCache(t testing.TB) cache.Cache[keys.DispatchCacheKey, any] {
5556

5657
// NewCachingDispatcher creates a new dispatch.Dispatcher which delegates
5758
// dispatch requests and caches the responses when possible and desirable.
58-
func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], metricsEnabled bool, prometheusSubsystem string, keyHandler keys.Handler) (*Dispatcher, error) {
59+
func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], prometheusmetrics dispatch.MetricsOptions, keyHandler keys.Handler) (*Dispatcher, error) {
5960
if cacheInst == nil {
6061
cacheInst = cache.NoopCache[keys.DispatchCacheKey, any]()
6162
}
6263

6364
checkTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
6465
Namespace: prometheusNamespace,
65-
Subsystem: prometheusSubsystem,
66+
Subsystem: prometheusmetrics.PrometheusSubsystem,
6667
Name: "check_total",
6768
Help: "Total number of CheckPermission dispatch requests processed.",
6869
})
6970
checkFromCacheCounter := prometheus.NewCounter(prometheus.CounterOpts{
7071
Namespace: prometheusNamespace,
71-
Subsystem: prometheusSubsystem,
72+
Subsystem: prometheusmetrics.PrometheusSubsystem,
7273
Name: "check_from_cache_total",
7374
Help: "Total number of CheckPermission dispatch requests served directly from the dispatch cache, avoiding re-computation.",
7475
})
7576

7677
lookupResourcesTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
7778
Namespace: prometheusNamespace,
78-
Subsystem: prometheusSubsystem,
79+
Subsystem: prometheusmetrics.PrometheusSubsystem,
7980
Name: "lookup_resources_total",
8081
Help: "Total number of LookupResources dispatch requests processed.",
8182
})
8283
lookupResourcesFromCacheCounter := prometheus.NewCounter(prometheus.CounterOpts{
8384
Namespace: prometheusNamespace,
84-
Subsystem: prometheusSubsystem,
85+
Subsystem: prometheusmetrics.PrometheusSubsystem,
8586
Name: "lookup_resources_from_cache_total",
8687
Help: "Total number of LookupResources dispatch requests served directly from the dispatch cache.",
8788
})
8889

8990
lookupSubjectsTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
9091
Namespace: prometheusNamespace,
91-
Subsystem: prometheusSubsystem,
92+
Subsystem: prometheusmetrics.PrometheusSubsystem,
9293
Name: "lookup_subjects_total",
9394
Help: "Total number of LookupSubjects dispatch requests processed.",
9495
})
9596
lookupSubjectsFromCacheCounter := prometheus.NewCounter(prometheus.CounterOpts{
9697
Namespace: prometheusNamespace,
97-
Subsystem: prometheusSubsystem,
98+
Subsystem: prometheusmetrics.PrometheusSubsystem,
9899
Name: "lookup_subjects_from_cache_total",
99100
Help: "Total number of LookupSubjects dispatch requests served directly from the dispatch cache.",
100101
})
101102

102103
queryPlanTotalCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
103104
Namespace: prometheusNamespace,
104-
Subsystem: prometheusSubsystem,
105+
Subsystem: prometheusmetrics.PrometheusSubsystem,
105106
Name: "query_plan_total",
106107
Help: "Total number of DispatchQueryPlan requests processed, labelled by plan operation.",
107108
}, []string{"operation"})
108109
queryPlanFromCacheCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
109110
Namespace: prometheusNamespace,
110-
Subsystem: prometheusSubsystem,
111+
Subsystem: prometheusmetrics.PrometheusSubsystem,
111112
Name: "query_plan_from_cache_total",
112113
Help: "Total number of DispatchQueryPlan requests served directly from the dispatch cache, labelled by plan operation.",
113114
}, []string{"operation"})
114115

115-
if metricsEnabled && prometheusSubsystem != "" {
116-
err := prometheus.Register(checkTotalCounter)
117-
if err != nil {
118-
return nil, fmt.Errorf(errCachingInitialization, err)
119-
}
120-
err = prometheus.Register(checkFromCacheCounter)
121-
if err != nil {
122-
return nil, fmt.Errorf(errCachingInitialization, err)
123-
}
124-
err = prometheus.Register(lookupResourcesTotalCounter)
125-
if err != nil {
126-
return nil, fmt.Errorf(errCachingInitialization, err)
127-
}
128-
err = prometheus.Register(lookupResourcesFromCacheCounter)
129-
if err != nil {
130-
return nil, fmt.Errorf(errCachingInitialization, err)
131-
}
132-
err = prometheus.Register(lookupSubjectsTotalCounter)
133-
if err != nil {
134-
return nil, fmt.Errorf(errCachingInitialization, err)
135-
}
136-
err = prometheus.Register(lookupSubjectsFromCacheCounter)
137-
if err != nil {
138-
return nil, fmt.Errorf(errCachingInitialization, err)
139-
}
140-
err = prometheus.Register(queryPlanTotalCounter)
141-
if err != nil {
142-
return nil, fmt.Errorf(errCachingInitialization, err)
143-
}
144-
err = prometheus.Register(queryPlanFromCacheCounter)
116+
if prometheusmetrics.Enabled() {
117+
err1 := prometheusmetrics.PrometheusRegistry.Register(checkTotalCounter)
118+
err2 := prometheusmetrics.PrometheusRegistry.Register(checkFromCacheCounter)
119+
err3 := prometheusmetrics.PrometheusRegistry.Register(lookupResourcesTotalCounter)
120+
err4 := prometheusmetrics.PrometheusRegistry.Register(lookupResourcesFromCacheCounter)
121+
err5 := prometheusmetrics.PrometheusRegistry.Register(lookupSubjectsTotalCounter)
122+
err6 := prometheusmetrics.PrometheusRegistry.Register(lookupSubjectsFromCacheCounter)
123+
err7 := prometheusmetrics.PrometheusRegistry.Register(queryPlanTotalCounter)
124+
err8 := prometheusmetrics.PrometheusRegistry.Register(queryPlanFromCacheCounter)
125+
err := errors.Join(err1, err2, err3, err4, err5, err6, err7, err8)
145126
if err != nil {
146127
return nil, fmt.Errorf(errCachingInitialization, err)
147128
}

internal/dispatch/caching/cachingdispatch_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func TestMaxDepthCaching(t *testing.T) {
125125
}
126126
}
127127

128-
dispatch, err := NewCachingDispatcher(DispatchTestCache(t), false, "", nil)
128+
dispatch, err := NewCachingDispatcher(DispatchTestCache(t), dispatch.MetricsOptions{}, nil)
129129
dispatch.SetDelegate(delegate)
130130
require.NoError(err)
131131
defer dispatch.Close()
@@ -177,7 +177,7 @@ func TestConcurrentDebugInfoAccess(t *testing.T) {
177177
},
178178
}, nil)
179179

180-
dispatcher, err := NewCachingDispatcher(DispatchTestCache(t), false, "", nil)
180+
dispatcher, err := NewCachingDispatcher(DispatchTestCache(t), dispatch.MetricsOptions{}, nil)
181181
require.NoError(err)
182182
dispatcher.SetDelegate(delegate)
183183
t.Cleanup(func() {
@@ -294,8 +294,9 @@ func TestDispatchQueryPlanRecordsCachingMetrics(t *testing.T) {
294294
// The unique subsystem keeps the metric names distinct per run; isolation
295295
// from other tests comes from the dedicated registry (reg) created below.
296296
subsystem := fmt.Sprintf("test_caching_%d", time.Now().UnixNano())
297+
reg := prometheus.NewRegistry()
297298

298-
dispatcher, err := NewCachingDispatcher(DispatchTestCache(t), true, subsystem, nil)
299+
dispatcher, err := NewCachingDispatcher(DispatchTestCache(t), dispatch.MetricsOptions{PrometheusSubsystem: subsystem, PrometheusRegistry: reg}, nil)
299300
require.NoError(t, err)
300301
t.Cleanup(func() { _ = dispatcher.Close() })
301302

@@ -326,16 +327,14 @@ func TestDispatchQueryPlanRecordsCachingMetrics(t *testing.T) {
326327
require.NoError(t, dispatcher.DispatchQueryPlan(req, stream2))
327328
require.Len(t, stream2.Results(), 1)
328329

329-
gatherer := prometheus.DefaultGatherer
330-
331-
totalCheck := sumOperationCounter(t, gatherer, "spicedb_"+subsystem+"_query_plan_total", "check")
330+
totalCheck := sumOperationCounter(t, reg, "spicedb_"+subsystem+"_query_plan_total", "check")
332331
require.InEpsilon(t, float64(2), totalCheck, 1e-9, "query_plan_total{operation=check} should bump for each plan dispatch")
333332

334-
fromCacheCheck := sumOperationCounter(t, gatherer, "spicedb_"+subsystem+"_query_plan_from_cache_total", "check")
333+
fromCacheCheck := sumOperationCounter(t, reg, "spicedb_"+subsystem+"_query_plan_from_cache_total", "check")
335334
require.InEpsilon(t, float64(1), fromCacheCheck, 1e-9, "query_plan_from_cache_total{operation=check} should bump only on cache hit")
336335

337336
// Ensure the plan path no longer inflates the classic DispatchCheck counter.
338-
checkTotal := sumPlainCounter(t, gatherer, "spicedb_"+subsystem+"_check_total")
337+
checkTotal := sumPlainCounter(t, reg, "spicedb_"+subsystem+"_check_total")
339338
require.Zero(t, checkTotal, "check_total must not be incremented from the plan path")
340339

341340
// The delegate is only consulted on the cache miss.

internal/dispatch/cluster/cluster.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import (
1818
type Option func(*optionState)
1919

2020
type optionState struct {
21-
metricsEnabled bool
22-
prometheusSubsystem string
21+
metrics dispatch.MetricsOptions
22+
2323
cache cache.Cache[keys.DispatchCacheKey, any]
2424
concurrencyLimits graph.ConcurrencyLimits
2525
remoteDispatchTimeout time.Duration
@@ -38,17 +38,10 @@ func QueryPlanMetadata(m *query.QueryPlanMetadata) Option {
3838
}
3939
}
4040

41-
// MetricsEnabled enables issuing prometheus metrics
42-
func MetricsEnabled(enabled bool) Option {
43-
return func(state *optionState) {
44-
state.metricsEnabled = enabled
45-
}
46-
}
47-
48-
// PrometheusSubsystem sets the subsystem name for the prometheus metrics
49-
func PrometheusSubsystem(name string) Option {
41+
// Metrics sets the Prometheus metrics options (subsystem and registry) for the dispatcher.
42+
func Metrics(reg dispatch.MetricsOptions) Option {
5043
return func(state *optionState) {
51-
state.prometheusSubsystem = name
44+
state.metrics = reg
5245
}
5346
}
5447

@@ -155,11 +148,11 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp
155148
return nil, fmt.Errorf("failed to create cluster dispatcher: %w", err)
156149
}
157150

158-
if opts.prometheusSubsystem == "" {
159-
opts.prometheusSubsystem = "dispatch"
151+
if opts.metrics.PrometheusSubsystem == "" {
152+
opts.metrics.PrometheusSubsystem = "dispatch"
160153
}
161154

162-
cachingClusterDispatch, err := caching.NewCachingDispatcher(opts.cache, opts.metricsEnabled, opts.prometheusSubsystem, &keys.CanonicalKeyHandler{})
155+
cachingClusterDispatch, err := caching.NewCachingDispatcher(opts.cache, opts.metrics, &keys.CanonicalKeyHandler{})
163156
if err != nil {
164157
return nil, err
165158
}

internal/dispatch/combined/combined.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ import (
2626
type Option func(*optionState)
2727

2828
type optionState struct {
29-
metricsEnabled bool
30-
prometheusSubsystem string
29+
metrics dispatch.MetricsOptions
3130
upstreamAddr string
3231
upstreamCAPath string
3332
grpcPresharedKey string
@@ -55,17 +54,10 @@ func QueryPlanMetadata(m *query.QueryPlanMetadata) Option {
5554
}
5655
}
5756

58-
// MetricsEnabled enables issuing prometheus metrics
59-
func MetricsEnabled(enabled bool) Option {
57+
// Metrics sets the Prometheus metrics options (subsystem and registry) for the dispatcher.
58+
func Metrics(reg dispatch.MetricsOptions) Option {
6059
return func(state *optionState) {
61-
state.metricsEnabled = enabled
62-
}
63-
}
64-
65-
// PrometheusSubsystem sets the subsystem name for the prometheus metrics
66-
func PrometheusSubsystem(name string) Option {
67-
return func(state *optionState) {
68-
state.prometheusSubsystem = name
60+
state.metrics = reg
6961
}
7062
}
7163

@@ -202,11 +194,11 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
202194
opts := newOptions(options...)
203195
log.Debug().Str("upstream", opts.upstreamAddr).Msg("configured combined dispatcher")
204196

205-
if opts.prometheusSubsystem == "" {
206-
opts.prometheusSubsystem = "dispatch_client"
197+
if opts.metrics.PrometheusSubsystem == "" {
198+
opts.metrics.PrometheusSubsystem = "dispatch_client"
207199
}
208200

209-
cachingRedispatch, err := caching.NewCachingDispatcher(opts.cache, opts.metricsEnabled, opts.prometheusSubsystem, &keys.CanonicalKeyHandler{})
201+
cachingRedispatch, err := caching.NewCachingDispatcher(opts.cache, opts.metrics, &keys.CanonicalKeyHandler{})
210202
if err != nil {
211203
return nil, err
212204
}

internal/dispatch/combined/combined_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import (
44
"testing"
55
"time"
66

7+
"github.com/prometheus/client_golang/prometheus"
78
"github.com/stretchr/testify/require"
89

910
"github.com/authzed/spicedb/internal/datastore/dsfortesting"
1011
"github.com/authzed/spicedb/internal/datastore/memdb"
12+
"github.com/authzed/spicedb/internal/dispatch"
1113
"github.com/authzed/spicedb/internal/dispatch/graph"
1214
"github.com/authzed/spicedb/internal/dispatch/keys"
1315
"github.com/authzed/spicedb/internal/testfixtures"
@@ -84,9 +86,9 @@ func TestNewDispatcher_AppliesAllOptions_NoUpstream(t *testing.T) {
8486

8587
concurrencyLimits := graph.ConcurrencyLimits{Check: 10, LookupResources: 5}
8688

89+
reg := prometheus.NewRegistry()
8790
options := []Option{
88-
MetricsEnabled(true),
89-
PrometheusSubsystem("test_subsystem"),
91+
Metrics(dispatch.MetricsOptions{PrometheusSubsystem: "test_subsystem", PrometheusRegistry: reg}),
9092
DispatchChunkSize(50),
9193
RelationshipChunkCacheConfig(cacheConfig),
9294
CaveatTypeSet(caveattypes.Default.TypeSet),
@@ -96,8 +98,8 @@ func TestNewDispatcher_AppliesAllOptions_NoUpstream(t *testing.T) {
9698

9799
// Field-level assertions: each setter wrote to the right slot.
98100
opts := newOptions(options...)
99-
require.True(t, opts.metricsEnabled)
100-
require.Equal(t, "test_subsystem", opts.prometheusSubsystem)
101+
require.Equal(t, reg, opts.metrics.PrometheusRegistry)
102+
require.Equal(t, "test_subsystem", opts.metrics.PrometheusSubsystem)
101103
require.Equal(t, uint16(50), opts.dispatchChunkSize)
102104
require.Same(t, cacheConfig, opts.relationshipChunkCacheConfig)
103105
require.Same(t, caveattypes.Default.TypeSet, opts.caveatTypeSet)

internal/dispatch/graph/check_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2068,15 +2068,15 @@ func newLocalDispatcher(t testing.TB) (context.Context, dispatch.Dispatcher, dat
20682068

20692069
ds, revision := testfixtures.StandardDatastoreWithData(t, rawDS)
20702070

2071-
dispatch, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
2071+
dispatcher, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
20722072
require.NoError(t, err)
20732073
t.Cleanup(func() {
2074-
_ = dispatch.Close()
2074+
_ = dispatcher.Close()
20752075
})
20762076

2077-
cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
2077+
cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), dispatch.MetricsOptions{}, &keys.CanonicalKeyHandler{})
20782078
require.NoError(t, err)
2079-
cachingDispatcher.SetDelegate(dispatch)
2079+
cachingDispatcher.SetDelegate(dispatcher)
20802080
t.Cleanup(func() {
20812081
cachingDispatcher.Close()
20822082
})
@@ -2093,15 +2093,15 @@ func newLocalDispatcherWithSchemaAndRels(t testing.TB, schema string, rels []tup
20932093

20942094
ds, revision := testfixtures.DatastoreFromSchemaAndTestRelationships(t, rawDS, schema, rels)
20952095

2096-
dispatch, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
2096+
dispatcher, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
20972097
require.NoError(t, err)
20982098
t.Cleanup(func() {
2099-
dispatch.Close()
2099+
dispatcher.Close()
21002100
})
21012101

2102-
cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
2102+
cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), dispatch.MetricsOptions{}, &keys.CanonicalKeyHandler{})
21032103
require.NoError(t, err)
2104-
cachingDispatcher.SetDelegate(dispatch)
2104+
cachingDispatcher.SetDelegate(dispatcher)
21052105
t.Cleanup(func() {
21062106
cachingDispatcher.Close()
21072107
})

internal/dispatch/metrics.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package dispatch
2+
3+
import "github.com/prometheus/client_golang/prometheus"
4+
5+
// MetricsOptions configures how a dispatcher reports Prometheus metrics.
6+
//
7+
// Metrics are only emitted when both fields are set.
8+
// The zero value disables metrics entirely.
9+
type MetricsOptions struct {
10+
// PrometheusSubsystem is the subsystem name prepended to every metric
11+
// emitted by the dispatcher (e.g. "dispatch" or "dispatch_client"). It
12+
// namespaces the dispatcher's metrics so they don't collide with those of
13+
// other components registered to the same registry.
14+
PrometheusSubsystem string
15+
16+
// PrometheusRegistry is the registry the dispatcher's metrics are registered with.
17+
PrometheusRegistry prometheus.Registerer
18+
}
19+
20+
// Enabled reports whether metrics should be emitted, i.e. whether both a subsystem and a registry are set.
21+
func (m *MetricsOptions) Enabled() bool {
22+
return m.PrometheusSubsystem != "" && m.PrometheusRegistry != nil
23+
}

internal/services/integrationtesting/benchmark_dispatchqueryplan_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func (h *dispatchQueryPlanHandle) newCachingDispatcher(b *testing.B) *caching.Di
181181
c, err := cache.NewStandardCache[keys.DispatchCacheKey, any](cacheConfig)
182182
require.NoError(b, err)
183183

184-
cd, err := caching.NewCachingDispatcher(c, false, "bench", &keys.DirectKeyHandler{})
184+
cd, err := caching.NewCachingDispatcher(c, dispatch.MetricsOptions{}, &keys.DirectKeyHandler{})
185185
require.NoError(b, err)
186186

187187
lpd := &localQueryPlanDispatcher{handle: h}

0 commit comments

Comments
 (0)