Skip to content

Commit e427764

Browse files
authored
Mansi/rate limiter (#23274)
* intial changes for adding global rate limit * Adding controller for updating limiter on config-entry update * code refactoring and writing test cases * pipeline fix * reverting changes not required * testcase fix * changing config entry type * adding ConfigEntry.get rpc to default exemption for global rate as for consul k8s its required * updating return condition when global rate is enabled * Adding changelog * Review comment * Adding consistant alias * Fixing tests * Adding nil check
1 parent 7e3ad26 commit e427764

26 files changed

Lines changed: 2157 additions & 41 deletions

.changelog/23274.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:feature
2+
Global Rate Limiter: a new "rate-limit" config entry kind that enables dynamic, cluster-wide RPC rate limiting stored in Raft and automatically replicated to all servers. This allows operators to apply or adjust global rate limits at runtime without restarting Consul servers — a critical capability for emergency scenarios where the cluster is under excessive load.
3+
```

agent/consul/fsm/decode_downgrade.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,8 @@ func MakeShadowConfigEntry(kind, name string) (structs.ConfigEntry, error) {
593593
switch kind {
594594
case structs.RateLimitIPConfig:
595595
return nil, ErrDroppingTenantedReq
596+
case structs.RateLimit:
597+
return &ShadowGlobalRateLimitConfigEntry{GlobalRateLimitConfigEntry: &structs.GlobalRateLimitConfigEntry{Name: name}}, nil
596598
case structs.ServiceDefaults:
597599
return &ShadowServiceConfigEntry{ServiceConfigEntry: &structs.ServiceConfigEntry{Name: name}}, nil
598600
case structs.ProxyDefaults:
@@ -1020,3 +1022,12 @@ type ShadowJWTProviderConfigEntry struct {
10201022
func (s ShadowJWTProviderConfigEntry) GetRealConfigEntry() structs.ConfigEntry {
10211023
return s.JWTProviderConfigEntry
10221024
}
1025+
1026+
type ShadowGlobalRateLimitConfigEntry struct {
1027+
ShadowBase
1028+
*structs.GlobalRateLimitConfigEntry
1029+
}
1030+
1031+
func (s ShadowGlobalRateLimitConfigEntry) GetRealConfigEntry() structs.ConfigEntry {
1032+
return s.GlobalRateLimitConfigEntry
1033+
}

agent/consul/fsm/fsm.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,11 @@ func (c *FSM) registerStreamSnapshotHandlers() {
433433
}, true)
434434
panicIfErr(err)
435435

436+
err = c.deps.Publisher.RegisterHandler(state.EventTopicGlobalRateLimit, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
437+
return c.State().GlobalRateLimiterSnapshot(req, buf)
438+
}, true)
439+
panicIfErr(err)
440+
436441
err = c.deps.Publisher.RegisterHandler(state.EventTopicSamenessGroup, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
437442
return c.State().SamenessGroupSnapshot(req, buf)
438443
}, true)
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright (c) HashiCorp, Inc.
2+
// SPDX-License-Identifier: BUSL-1.1
3+
4+
package controller
5+
6+
import (
7+
"context"
8+
9+
"github.com/hashicorp/go-hclog"
10+
11+
"github.com/hashicorp/consul/agent/consul/controller"
12+
"github.com/hashicorp/consul/agent/consul/state"
13+
"github.com/hashicorp/consul/agent/consul/stream"
14+
"github.com/hashicorp/consul/agent/structs"
15+
)
16+
17+
//go:generate mockery --name ceReadEntryFunc --inpackage --filename mock_ceReadEntry.go
18+
type ceReadEntryFunc func(k string, n string) (uint64, structs.ConfigEntry, error)
19+
20+
//go:generate mockery --name ceUpdater --inpackage --filename mock_ceUpdater.go
21+
type ceUpdater interface {
22+
UpdateGlobalRateLimitConfig(cfg *structs.GlobalRateLimitConfigEntry)
23+
}
24+
25+
type rateLimiterReconciler struct {
26+
readEntry ceReadEntryFunc
27+
logger hclog.Logger
28+
controller controller.Controller
29+
updater ceUpdater
30+
}
31+
32+
func (r rateLimiterReconciler) Reconcile(ctx context.Context, req controller.Request) error {
33+
switch req.Kind {
34+
case structs.RateLimit:
35+
return reconcileEntry(r.readEntry, requestLogger(r.logger, req), ctx, req, r.updater)
36+
default:
37+
return nil
38+
}
39+
}
40+
41+
func reconcileEntry(readEntry ceReadEntryFunc, logger hclog.Logger, _ context.Context, req controller.Request, updater ceUpdater) error {
42+
_, entry, err := readEntry(req.Kind, req.Name)
43+
if err != nil {
44+
logger.Warn("error fetching config entry for reconciliation request", "error", err)
45+
return err
46+
}
47+
48+
// Entry is deleted — reset to empty config
49+
if entry == nil {
50+
updater.UpdateGlobalRateLimitConfig(nil)
51+
return nil
52+
}
53+
54+
// Update with the actual config entry when it exists
55+
cfg, ok := entry.(*structs.GlobalRateLimitConfigEntry)
56+
if !ok {
57+
logger.Error("failed to cast config entry to GlobalRateLimitConfigEntry",
58+
"entry_type", entry.GetKind())
59+
return nil
60+
}
61+
updater.UpdateGlobalRateLimitConfig(cfg)
62+
return nil
63+
}
64+
65+
// NewRateLimiterController initializes a controller that reconciles rate limiter config
66+
func NewRateLimiterController(readEntry ceReadEntryFunc, publisher state.EventPublisher, logger hclog.Logger, updater ceUpdater) controller.Controller {
67+
reconciler := &rateLimiterReconciler{
68+
readEntry: readEntry,
69+
logger: logger,
70+
updater: updater,
71+
}
72+
reconciler.controller = controller.New(publisher, reconciler).
73+
WithLogger(logger.With("controller", "rateLimiterController"))
74+
return reconciler.controller.Subscribe(
75+
&stream.SubscribeRequest{
76+
Topic: state.EventTopicGlobalRateLimit,
77+
Subject: stream.SubjectWildcard,
78+
},
79+
)
80+
}
81+
82+
// requestLogger returns a logger with request-specific fields.
83+
func requestLogger(logger hclog.Logger, request controller.Request) hclog.Logger {
84+
return logger.With("kind", request.Kind, "name", request.Name)
85+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright (c) HashiCorp, Inc.
2+
// SPDX-License-Identifier: BUSL-1.1
3+
4+
//go:build !consulent
5+
6+
package controller
7+
8+
import (
9+
"context"
10+
"errors"
11+
"testing"
12+
"time"
13+
14+
"github.com/hashicorp/go-hclog"
15+
"github.com/stretchr/testify/mock"
16+
"github.com/stretchr/testify/require"
17+
18+
"github.com/hashicorp/consul/agent/consul/controller"
19+
"github.com/hashicorp/consul/agent/consul/stream"
20+
"github.com/hashicorp/consul/agent/structs"
21+
)
22+
23+
// TestReconcileEntry_Success tests the successful reconciliation of a rate limit entry.
24+
func TestReconcileEntry_Success(t *testing.T) {
25+
logger := hclog.NewNullLogger()
26+
27+
// Create a sample rate limit config entry
28+
cfg := &structs.GlobalRateLimitConfigEntry{
29+
Kind: structs.RateLimit,
30+
Name: "global",
31+
}
32+
33+
mockReadEntry := NewMockceReadEntryFunc()
34+
mockReadEntry.On("Execute", structs.RateLimit, "global").
35+
Return(uint64(1), cfg, nil)
36+
37+
// Mock the updater
38+
mockUpdater := NewMockceUpdater()
39+
mockUpdater.On("UpdateGlobalRateLimitConfig", cfg).Return()
40+
41+
req := controller.Request{
42+
Kind: structs.RateLimit,
43+
Name: "global",
44+
}
45+
46+
// Call reconcileEntry
47+
err := reconcileEntry(mockReadEntry.Execute, logger, context.Background(), req, mockUpdater)
48+
49+
require.NoError(t, err)
50+
mockReadEntry.AssertCalled(t, "Execute", structs.RateLimit, "global")
51+
mockUpdater.AssertCalled(t, "UpdateGlobalRateLimitConfig", cfg)
52+
mockUpdater.AssertNumberOfCalls(t, "UpdateGlobalRateLimitConfig", 1)
53+
}
54+
55+
// TestReconcileEntry_EntryNotFound tests reconciliation when the config entry is not found.
56+
func TestReconcileEntry_EntryNotFound(t *testing.T) {
57+
logger := hclog.NewNullLogger()
58+
59+
// Mock the readEntry function returning nil (entry not found)
60+
mockReadEntry := NewMockceReadEntryFunc()
61+
mockReadEntry.On("Execute", structs.RateLimit, "non-existent").
62+
Return(uint64(0), nil, nil)
63+
64+
// Mock the updater
65+
mockUpdater := NewMockceUpdater()
66+
mockUpdater.On("UpdateGlobalRateLimitConfig", mock.MatchedBy(func(cfg *structs.GlobalRateLimitConfigEntry) bool {
67+
return cfg == nil
68+
})).Return()
69+
70+
req := controller.Request{
71+
Kind: structs.RateLimit,
72+
Name: "non-existent",
73+
}
74+
75+
// Call reconcileEntry
76+
err := reconcileEntry(mockReadEntry.Execute, logger, context.Background(), req, mockUpdater)
77+
78+
require.NoError(t, err)
79+
mockReadEntry.AssertCalled(t, "Execute", structs.RateLimit, "non-existent")
80+
mockUpdater.AssertNumberOfCalls(t, "UpdateGlobalRateLimitConfig", 1)
81+
}
82+
83+
// TestReconcileEntry_ReadError tests reconciliation when reading the config entry fails.
84+
func TestReconcileEntry_ReadError(t *testing.T) {
85+
logger := hclog.NewNullLogger()
86+
87+
// Mock the readEntry function returning an error
88+
expectedErr := errors.New("failed to read from store")
89+
90+
mockReadEntry := NewMockceReadEntryFunc()
91+
mockReadEntry.On("Execute", structs.RateLimit, "global").
92+
Return(uint64(0), nil, expectedErr)
93+
94+
// Mock the updater - should not be called
95+
mockUpdater := NewMockceUpdater()
96+
97+
req := controller.Request{
98+
Kind: structs.RateLimit,
99+
Name: "global",
100+
}
101+
102+
// Call reconcileEntry
103+
err := reconcileEntry(mockReadEntry.Execute, logger, context.Background(), req, mockUpdater)
104+
105+
require.Error(t, err)
106+
require.Equal(t, expectedErr, err)
107+
mockReadEntry.AssertCalled(t, "Execute", structs.RateLimit, "global")
108+
mockUpdater.AssertNotCalled(t, "UpdateGlobalRateLimitConfig")
109+
}
110+
111+
// TestReconcileEntry_InvalidCast tests reconciliation when the entry cannot be cast to GlobalRateLimitConfigEntry.
112+
func TestReconcileEntry_InvalidCast(t *testing.T) {
113+
logger := hclog.NewNullLogger()
114+
115+
// Create a different type of config entry (not GlobalRateLimitConfigEntry)
116+
// We can use a simple string to simulate a non-matching type
117+
invalidEntry := &structs.ProxyConfigEntry{} // Different config entry type
118+
119+
mockReadEntry := NewMockceReadEntryFunc()
120+
mockReadEntry.On("Execute", structs.RateLimit, "global").
121+
Return(uint64(1), invalidEntry, nil)
122+
123+
// Mock the updater - should not be called when cast fails
124+
mockUpdater := NewMockceUpdater()
125+
126+
req := controller.Request{
127+
Kind: structs.RateLimit,
128+
Name: "global",
129+
}
130+
131+
// Call reconcileEntry
132+
err := reconcileEntry(mockReadEntry.Execute, logger, context.Background(), req, mockUpdater)
133+
134+
require.NoError(t, err)
135+
mockReadEntry.AssertCalled(t, "Execute", structs.RateLimit, "global")
136+
mockUpdater.AssertNotCalled(t, "UpdateGlobalRateLimitConfig")
137+
}
138+
139+
// TestReconcilerReconcile_UnknownKind tests the Reconcile method with an unknown kind (should return nil).
140+
func TestReconcilerReconcile_UnknownKind(t *testing.T) {
141+
mockReadEntry := NewMockceReadEntryFunc()
142+
mockUpdater := NewMockceUpdater()
143+
144+
reconciler := &rateLimiterReconciler{
145+
readEntry: mockReadEntry.Execute,
146+
logger: hclog.NewNullLogger(),
147+
updater: mockUpdater,
148+
}
149+
150+
req := controller.Request{
151+
Kind: "unknown-kind",
152+
Name: "test-config",
153+
}
154+
155+
err := reconciler.Reconcile(context.Background(), req)
156+
157+
require.NoError(t, err)
158+
mockReadEntry.AssertNotCalled(t, "Execute")
159+
mockUpdater.AssertNotCalled(t, "UpdateGlobalRateLimitConfig")
160+
}
161+
162+
// TestNewRateLimiterController_InitializesController tests that NewRateLimiterController properly initializes the controller.
163+
func TestNewRateLimiterController_InitializesController(t *testing.T) {
164+
mockReadEntry := NewMockceReadEntryFunc()
165+
publisher := stream.NewEventPublisher(1 * time.Millisecond)
166+
logger := hclog.NewNullLogger()
167+
mockUpdater := NewMockceUpdater()
168+
169+
ctrl := NewRateLimiterController(mockReadEntry.Execute, publisher, logger, mockUpdater)
170+
171+
// Verify that the controller is not nil
172+
require.NotNil(t, ctrl)
173+
}

agent/consul/rate/controller/mock_ceReadEntry.go

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/consul/rate/controller/mock_ceUpdater.go

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)