1515package handlers
1616
1717import (
18+ "bytes"
19+ "context"
20+ "fmt"
1821 "net/http"
22+ "sort"
1923
2024 "github.com/gin-gonic/gin"
25+ "go.etcd.io/etcd/api/v3/etcdserverpb"
26+ "go.uber.org/zap"
2127
28+ "github.com/pingcap/errors"
2229 "github.com/pingcap/failpoint"
30+ "github.com/pingcap/log"
2331
2432 "github.com/tikv/pd/pkg/storage"
2533 "github.com/tikv/pd/server"
2634 "github.com/tikv/pd/server/apiv2/middlewares"
2735)
2836
37+ // RegisterReadyCheck registers the ready check handlers.
38+ func RegisterReadyCheck (r * gin.RouterGroup ) {
39+ // --- Liveness Checks ---
40+ livezRegistry := NewCheckRegistry ()
41+ livezRegistry .register ("etcd-serializable-read" , checkEtcdSerializableRead )
42+ livezRegistry .installHandlers (r , "/livez" )
43+
44+ // --- Readiness Checks ---
45+ readyzRegistry := NewCheckRegistry ()
46+ readyzRegistry .register ("leader-promotion" , checkLeaderPromotion )
47+ readyzRegistry .register ("etcd-data-corruption" , checkEtcdDataCorruption )
48+ readyzRegistry .register ("etcd-serializable-read" , checkEtcdSerializableRead )
49+ readyzRegistry .register ("etcd-linearizable-read" , checkEtcdLinearizableRead )
50+ readyzRegistry .register ("etcd-non-learner" , checkEtcdNonLearner )
51+ readyzRegistry .installHandlers (r , "/readyz" )
52+
53+ // --- Backward Compatibility ---
54+ // The old /ready endpoint is now an alias for the leader-promotion readiness check.
55+ r .GET ("ready" , Ready )
56+ }
57+
58+ // checkFunction defines the function signature for a single check.
59+ // It returns an error if the check fails.
60+ type checkFunction func (ctx context.Context , svr * server.Server ) error
61+
62+ // checkRegistry manages a set of checks for a specific endpoint (e.g., "livez", "readyz").
63+ type checkRegistry struct {
64+ checks map [string ]checkFunction
65+ }
66+
67+ // NewCheckRegistry creates a new registry.
68+ func NewCheckRegistry () * checkRegistry {
69+ return & checkRegistry {
70+ checks : make (map [string ]checkFunction ),
71+ }
72+ }
73+
74+ // register adds a new check to the registry.
75+ func (reg * checkRegistry ) register (name string , check checkFunction ) {
76+ if _ , exists := reg .checks [name ]; exists {
77+ log .Warn ("check function is already registered" , zap .String ("name" , name ))
78+ }
79+ reg .checks [name ] = check
80+ }
81+
82+ // installHandlers registers the HTTP handlers for the root endpoint and all individual checks.
83+ func (reg * checkRegistry ) installHandlers (r * gin.RouterGroup , basePath string ) {
84+ checkNames := make ([]string , 0 , len (reg .checks ))
85+ for name := range reg .checks {
86+ checkNames = append (checkNames , name )
87+ }
88+ sort .Strings (checkNames ) // ensure consistent order
89+
90+ rootHandler := reg .createRootHandler (checkNames )
91+ r .GET (basePath , rootHandler )
92+
93+ for _ , name := range checkNames {
94+ checkName := name
95+ checkFunc := reg .checks [checkName ]
96+ fullCheckPath := basePath + "/" + checkName // e.g., "/readyz/leader-promotion"
97+ r .GET (fullCheckPath , func (c * gin.Context ) {
98+ svr := c .MustGet (middlewares .ServerContextKey ).(* server.Server )
99+ if err := checkFunc (c .Request .Context (), svr ); err != nil {
100+ c .String (http .StatusServiceUnavailable , "[-]%s failed: %v\n " , checkName , err )
101+ return
102+ }
103+ c .Status (http .StatusOK )
104+ if _ , verbose := c .GetQuery ("verbose" ); verbose {
105+ fmt .Fprintf (c .Writer , "[+]%s ok\n " , checkName )
106+ }
107+ fmt .Fprintf (c .Writer , "ok\n " )
108+ })
109+ }
110+ }
111+
112+ // createRootHandler creates the main handler that runs multiple checks.
113+ func (reg * checkRegistry ) createRootHandler (allChecks []string ) gin.HandlerFunc {
114+ return func (c * gin.Context ) {
115+ svr := c .MustGet (middlewares .ServerContextKey ).(* server.Server )
116+ excludeSet := make (map [string ]struct {})
117+ if excluded , ok := c .GetQueryArray ("exclude" ); ok {
118+ for _ , item := range excluded {
119+ excludeSet [item ] = struct {}{}
120+ }
121+ }
122+
123+ checksToRun := make ([]string , 0 , len (allChecks ))
124+ for _ , name := range allChecks {
125+ if _ , excluded := excludeSet [name ]; ! excluded {
126+ checksToRun = append (checksToRun , name )
127+ }
128+ }
129+
130+ var overallStatus = http .StatusOK
131+ var output bytes.Buffer
132+
133+ for _ , name := range checksToRun {
134+ check := reg .checks [name ]
135+ if err := check (c .Request .Context (), svr ); err != nil {
136+ overallStatus = http .StatusServiceUnavailable
137+ fmt .Fprintf (& output , "[-]%s failed: %v\n " , name , err )
138+ } else {
139+ fmt .Fprintf (& output , "[+]%s ok\n " , name )
140+ }
141+ }
142+
143+ if overallStatus != http .StatusOK {
144+ c .String (overallStatus , output .String ())
145+ return
146+ }
147+
148+ c .Status (http .StatusOK )
149+ if _ , verbose := c .GetQuery ("verbose" ); verbose {
150+ fmt .Fprint (c .Writer , output .String ())
151+ }
152+ fmt .Fprintf (c .Writer , "ok\n " )
153+ }
154+ }
155+
156+ // checkLeaderPromotion checks if the pd follower is ready to become leader.
157+ func checkLeaderPromotion (_ context.Context , svr * server.Server ) error {
158+ s := svr .GetStorage ()
159+ if ! storage .IsBootstrapped (s ) {
160+ // TODO: Do we still need it after introducing readyz?
161+ // Not bootstrapped is considered ready for this specific check.
162+ return nil
163+ }
164+
165+ regionLoaded := storage .AreRegionsLoaded (s )
166+ failpoint .Inject ("loadRegionSlow" , func (val failpoint.Value ) {
167+ if addr , ok := val .(string ); ok && svr .GetAddr () == addr {
168+ regionLoaded = false
169+ }
170+ })
171+
172+ if ! regionLoaded {
173+ return errors .New ("regions not loaded" )
174+ }
175+ return nil
176+ }
177+
178+ // checkEtcdSerializableRead checks if a local etcd read is ok.
179+ func checkEtcdSerializableRead (ctx context.Context , svr * server.Server ) error {
180+ failpoint .Inject ("etcdSerializableReadError" , func (val failpoint.Value ) {
181+ if addr , ok := val .(string ); ok && addr == svr .GetAddr () {
182+ failpoint .Return (errors .New ("injected serializable read error" ))
183+ }
184+ })
185+ // Ref: https://github.com/etcd-io/etcd/blob/9a5533382d84999e4e79642e1ec0f8bfa9b70ba8/server/etcdserver/api/etcdhttp/health.go#L454
186+ etcdServer := svr .GetMember ().Etcd ().Server
187+ _ , err := etcdServer .Range (ctx , & etcdserverpb.RangeRequest {KeysOnly : true , Limit : 1 , Serializable : true })
188+ if err != nil {
189+ return errors .New ("etcd serializable read failed: " + err .Error ())
190+ }
191+ return nil
192+ }
193+
194+ // checkEtcdLinearizableRead checks if there is consensus in the cluster.
195+ func checkEtcdLinearizableRead (ctx context.Context , svr * server.Server ) error {
196+ failpoint .Inject ("etcdLinearizableReadError" , func (val failpoint.Value ) {
197+ if addr , ok := val .(string ); ok && addr == svr .GetAddr () {
198+ failpoint .Return (errors .New ("injected linearizable read error" ))
199+ }
200+ })
201+ // Ref: https://github.com/etcd-io/etcd/blob/9a5533382d84999e4e79642e1ec0f8bfa9b70ba8/server/etcdserver/api/etcdhttp/health.go#L454
202+ etcdServer := svr .GetMember ().Etcd ().Server
203+ _ , err := etcdServer .Range (ctx , & etcdserverpb.RangeRequest {KeysOnly : true , Limit : 1 , Serializable : false })
204+ if err != nil {
205+ return errors .New ("etcd linearizable read failed: " + err .Error ())
206+ }
207+ return nil
208+ }
209+
210+ // checkEtcdDataCorruption checks if there is an active data corruption alarm.
211+ func checkEtcdDataCorruption (_ context.Context , svr * server.Server ) error {
212+ failpoint .Inject ("etcdDataCorruptionAlarm" , func (val failpoint.Value ) {
213+ if addr , ok := val .(string ); ok && addr == svr .GetAddr () {
214+ failpoint .Return (errors .New ("injected data corruption alarm" ))
215+ }
216+ })
217+ // Ref: https://github.com/etcd-io/etcd/blob/9a5533382d84999e4e79642e1ec0f8bfa9b70ba8/server/etcdserver/api/etcdhttp/health.go#L284
218+ etcdServer := svr .GetMember ().Etcd ().Server
219+ for _ , alarm := range etcdServer .Alarms () {
220+ if alarm .Alarm == etcdserverpb .AlarmType_CORRUPT {
221+ return errors .New ("etcd data corruption alarm is active" )
222+ }
223+ }
224+ return nil
225+ }
226+
227+ // checkEtcdNonLearner checks if the member is not a learner.
228+ func checkEtcdNonLearner (_ context.Context , svr * server.Server ) error {
229+ failpoint .Inject ("etcdIsLearner" , func (val failpoint.Value ) {
230+ if addr , ok := val .(string ); ok && addr == svr .GetAddr () {
231+ failpoint .Return (errors .New ("injected learner state" ))
232+ }
233+ })
234+ // Ref: https://github.com/etcd-io/etcd/commit/989c556645115201fdfcb4ba3026867f03709975
235+ etcdServer := svr .GetMember ().Etcd ().Server
236+ if etcdServer .IsLearner () {
237+ return errors .New ("member is a learner" )
238+ }
239+ return nil
240+ }
241+
29242// ReadyStatus reflects the cluster's ready status.
30243// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
31244type ReadyStatus struct {
@@ -34,39 +247,30 @@ type ReadyStatus struct {
34247}
35248
36249// Ready checks if the region is loaded.
250+ // @Description It will return whether pd follower is ready to became leader.
251+ // @Deprecated This endpoint is deprecated. Please use /readyz/leader-promotion instead.
37252// @Summary It will return whether pd follower is ready to became leader. This request is always served by the instance that receives it and never forwarded to the leader.
38253// @Router /ready [get]
39254// @Param verbose query bool false "Whether to return details."
40255// @Success 200
41256// @Failure 500
42257func Ready (c * gin.Context ) {
43258 svr := c .MustGet (middlewares .ServerContextKey ).(* server.Server )
44- s := svr . GetStorage ( )
259+ err := checkLeaderPromotion ( c . Request . Context (), svr )
45260
46- var regionLoaded bool
47- isBootstrapped := storage .IsBootstrapped (s )
48- if isBootstrapped {
49- regionLoaded = storage .AreRegionsLoaded (s )
50- failpoint .Inject ("loadRegionSlow" , func (val failpoint.Value ) {
51- if s , ok := val .(string ); ok {
52- if svr .GetAddr () == s {
53- regionLoaded = false
54- }
55- }
56- })
57- if regionLoaded {
58- c .Status (http .StatusOK )
59- } else {
261+ if _ , ok := c .GetQuery ("verbose" ); ! ok {
262+ if err != nil {
60263 c .Status (http .StatusInternalServerError )
264+ } else {
265+ c .Status (http .StatusOK )
61266 }
62- } else {
63- // If the cluster is not bootstrapped, we consider it as ready.
64- c .Status (http .StatusOK )
65- }
66267
67- if _ , ok := c .GetQuery ("verbose" ); ! ok {
68268 return
69269 }
270+
271+ s := svr .GetStorage ()
272+ isBootstrapped := storage .IsBootstrapped (s )
273+ regionLoaded := isBootstrapped && err == nil
70274 resp := & ReadyStatus {
71275 RegionLoaded : regionLoaded ,
72276 IsBootstrapped : isBootstrapped ,
0 commit comments