Skip to content

Commit 3677770

Browse files
Merge pull request #811 from jeremycw/require-consistent-flag
Add command line flag to toggle required consistency on consul reads
2 parents d73417b + ff15d97 commit 3677770

6 files changed

Lines changed: 40 additions & 30 deletions

File tree

config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ type Consul struct {
166166
ServiceMonitors int
167167
TLS ConsulTlS
168168
PollInterval time.Duration
169+
RequireConsistent bool
170+
AllowStale bool
169171
}
170172

171173
type Custom struct {

config/default.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,21 +56,23 @@ var defaultConfig = &Config{
5656
Registry: Registry{
5757
Backend: "consul",
5858
Consul: Consul{
59-
Addr: "localhost:8500",
60-
Scheme: "http",
61-
KVPath: "/fabio/config",
62-
NoRouteHTMLPath: "/fabio/noroute.html",
63-
TagPrefix: "urlprefix-",
64-
Register: true,
65-
ServiceAddr: ":9998",
66-
ServiceName: "fabio",
67-
ServiceStatus: []string{"passing"},
68-
ServiceMonitors: 1,
69-
CheckInterval: time.Second,
70-
CheckTimeout: 3 * time.Second,
71-
CheckScheme: "http",
72-
ChecksRequired: "one",
73-
PollInterval: 0,
59+
Addr: "localhost:8500",
60+
Scheme: "http",
61+
KVPath: "/fabio/config",
62+
NoRouteHTMLPath: "/fabio/noroute.html",
63+
TagPrefix: "urlprefix-",
64+
Register: true,
65+
ServiceAddr: ":9998",
66+
ServiceName: "fabio",
67+
ServiceStatus: []string{"passing"},
68+
ServiceMonitors: 1,
69+
CheckInterval: time.Second,
70+
CheckTimeout: 3 * time.Second,
71+
CheckScheme: "http",
72+
ChecksRequired: "one",
73+
PollInterval: 0,
74+
RequireConsistent: true,
75+
AllowStale: false,
7476
},
7577
Custom: Custom{
7678
Host: "",

config/load.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
197197
f.StringVar(&cfg.Registry.Consul.ChecksRequired, "registry.consul.checksRequired", defaultConfig.Registry.Consul.ChecksRequired, "number of checks which must pass: one or all")
198198
f.IntVar(&cfg.Registry.Consul.ServiceMonitors, "registry.consul.serviceMonitors", defaultConfig.Registry.Consul.ServiceMonitors, "concurrency for route updates")
199199
f.DurationVar(&cfg.Registry.Consul.PollInterval, "registry.consul.pollinterval", defaultConfig.Registry.Consul.PollInterval, "poll interval for route updates")
200+
f.BoolVar(&cfg.Registry.Consul.RequireConsistent, "registry.consul.requireConsistent", defaultConfig.Registry.Consul.RequireConsistent, "is consistent read mode on consul queries required")
201+
f.BoolVar(&cfg.Registry.Consul.AllowStale, "registry.consul.allowStale", defaultConfig.Registry.Consul.AllowStale, "is stale read mode on consul queries allowed")
200202
f.IntVar(&cfg.Runtime.GOGC, "runtime.gogc", defaultConfig.Runtime.GOGC, "sets runtime.GOGC")
201203
f.IntVar(&cfg.Runtime.GOMAXPROCS, "runtime.gomaxprocs", defaultConfig.Runtime.GOMAXPROCS, "sets runtime.GOMAXPROCS")
202204
f.StringVar(&cfg.UI.Access, "ui.access", defaultConfig.UI.Access, "access mode, one of [ro, rw]")
@@ -327,6 +329,10 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
327329
return nil, fmt.Errorf("proxy.noroutestatus must be between 100 and 999")
328330
}
329331

332+
if cfg.Registry.Consul.AllowStale && cfg.Registry.Consul.RequireConsistent {
333+
return nil, fmt.Errorf("registry.consul.allowStale and registry.consul.requireConsistent cannot both be true")
334+
}
335+
330336
// handle deprecations
331337
deprecate := func(name, msg string) {
332338
if f.IsSet(name) {

registry/consul/backend.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,14 @@ func (b *be) DeregisterAll() error {
110110
}
111111

112112
func (b *be) ManualPaths() ([]string, error) {
113-
keys, _, err := listKeys(b.c, b.cfg.KVPath, 0)
113+
keys, _, err := listKeys(b.c, b.cfg.KVPath, 0, b.cfg.RequireConsistent, b.cfg.AllowStale)
114114
return keys, err
115115
}
116116

117117
func (b *be) ReadManual(path string) (value string, version uint64, err error) {
118118
// we cannot rely on the value provided by WatchManual() since
119119
// someone has to call that method first to kick off the go routine.
120-
return getKV(b.c, b.cfg.KVPath+path, 0)
120+
return getKV(b.c, b.cfg.KVPath+path, 0, b.cfg.RequireConsistent, b.cfg.AllowStale)
121121
}
122122

123123
func (b *be) WriteManual(path string, value string, version uint64) (ok bool, err error) {
@@ -144,15 +144,15 @@ func (b *be) WatchManual() chan string {
144144
log.Printf("[INFO] consul: Watching KV path %q", b.cfg.KVPath)
145145

146146
kv := make(chan string)
147-
go watchKV(b.c, b.cfg.KVPath, kv, true)
147+
go watchKV(b.c, b.cfg.KVPath, kv, true, b.cfg.RequireConsistent, b.cfg.AllowStale)
148148
return kv
149149
}
150150

151151
func (b *be) WatchNoRouteHTML() chan string {
152152
log.Printf("[INFO] consul: Watching KV path %q", b.cfg.NoRouteHTMLPath)
153153

154154
html := make(chan string)
155-
go watchKV(b.c, b.cfg.NoRouteHTMLPath, html, false)
155+
go watchKV(b.c, b.cfg.NoRouteHTMLPath, html, false, b.cfg.RequireConsistent, b.cfg.AllowStale)
156156
return html
157157
}
158158

registry/consul/kv.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010

1111
// watchKV monitors a key in the KV store for changes.
1212
// The intended use case is to add additional route commands to the routing table.
13-
func watchKV(client *api.Client, path string, config chan string, separator bool) {
13+
func watchKV(client *api.Client, path string, config chan string, separator bool, requireConsistent bool, allowStale bool) {
1414
var lastIndex uint64
1515
var lastValue string
1616

1717
for {
18-
value, index, err := listKV(client, path, lastIndex, separator)
18+
value, index, err := listKV(client, path, lastIndex, separator, requireConsistent, allowStale)
1919
if err != nil {
2020
log.Printf("[WARN] consul: Error fetching config from %s. %v", path, err)
2121
time.Sleep(time.Second)
@@ -30,8 +30,8 @@ func watchKV(client *api.Client, path string, config chan string, separator bool
3030
}
3131
}
3232

33-
func listKeys(client *api.Client, path string, waitIndex uint64) ([]string, uint64, error) {
34-
q := &api.QueryOptions{RequireConsistent: true, WaitIndex: waitIndex}
33+
func listKeys(client *api.Client, path string, waitIndex uint64, requireConsistent bool, allowStale bool) ([]string, uint64, error) {
34+
q := &api.QueryOptions{RequireConsistent: requireConsistent, AllowStale: allowStale, WaitIndex: waitIndex}
3535
kvpairs, meta, err := client.KV().List(path, q)
3636
if err != nil {
3737
return nil, 0, err
@@ -46,8 +46,8 @@ func listKeys(client *api.Client, path string, waitIndex uint64) ([]string, uint
4646
return keys, meta.LastIndex, nil
4747
}
4848

49-
func listKV(client *api.Client, path string, waitIndex uint64, separator bool) (string, uint64, error) {
50-
q := &api.QueryOptions{RequireConsistent: true, WaitIndex: waitIndex}
49+
func listKV(client *api.Client, path string, waitIndex uint64, separator bool, requireConsistent bool, allowStale bool) (string, uint64, error) {
50+
q := &api.QueryOptions{RequireConsistent: requireConsistent, AllowStale: allowStale, WaitIndex: waitIndex}
5151
kvpairs, meta, err := client.KV().List(path, q)
5252
if err != nil {
5353
return "", 0, err
@@ -66,8 +66,8 @@ func listKV(client *api.Client, path string, waitIndex uint64, separator bool) (
6666
return strings.Join(s, "\n\n"), meta.LastIndex, nil
6767
}
6868

69-
func getKV(client *api.Client, key string, waitIndex uint64) (string, uint64, error) {
70-
q := &api.QueryOptions{RequireConsistent: true, WaitIndex: waitIndex}
69+
func getKV(client *api.Client, key string, waitIndex uint64, requireConsistent bool, allowStale bool) (string, uint64, error) {
70+
q := &api.QueryOptions{RequireConsistent: requireConsistent, AllowStale: allowStale, WaitIndex: waitIndex}
7171
kvpair, meta, err := client.KV().Get(key, q)
7272
if err != nil {
7373
return "", 0, err

registry/consul/service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ func (w *ServiceMonitor) Watch(updates chan string) {
3535
var q *api.QueryOptions
3636
for {
3737
if w.config.PollInterval != 0 {
38-
q = &api.QueryOptions{RequireConsistent: true}
38+
q = &api.QueryOptions{RequireConsistent: w.config.RequireConsistent, AllowStale: w.config.AllowStale}
3939
time.Sleep(w.config.PollInterval)
4040
} else {
41-
q = &api.QueryOptions{RequireConsistent: true, WaitIndex: lastIndex}
41+
q = &api.QueryOptions{RequireConsistent: w.config.RequireConsistent, AllowStale: w.config.AllowStale, WaitIndex: lastIndex}
4242
}
4343
checks, meta, err := w.client.Health().State("any", q)
4444
if err != nil {
@@ -110,7 +110,7 @@ func (w *ServiceMonitor) serviceConfig(name string, passing map[string]bool) (co
110110
return nil
111111
}
112112

113-
q := &api.QueryOptions{RequireConsistent: true}
113+
q := &api.QueryOptions{RequireConsistent: w.config.RequireConsistent, AllowStale: w.config.AllowStale}
114114
svcs, _, err := w.client.Catalog().Service(name, "", q)
115115
if err != nil {
116116
log.Printf("[WARN] consul: Error getting catalog service %s. %v", name, err)

0 commit comments

Comments
 (0)