4040import java .util .ArrayList ;
4141import java .util .Map ;
4242import java .util .Objects ;
43+ import java .util .concurrent .TimeUnit ;
44+ import java .util .concurrent .Executors ;
4345import java .util .concurrent .ConcurrentHashMap ;
4446import java .util .concurrent .ConcurrentMap ;
4547import java .util .concurrent .ExecutorService ;
48+ import java .util .concurrent .ScheduledExecutorService ;
4649import java .util .stream .Collectors ;
4750
4851import static java .util .concurrent .Executors .newCachedThreadPool ;
@@ -57,31 +60,34 @@ public class ConsulRegistry extends FailbackRegistry {
5760 private static final String SERVICE_TAG = "dubbo" ;
5861 private static final String URL_META_KEY = "url" ;
5962 private static final String WATCH_TIMEOUT = "consul-watch-timeout" ;
60- private static final String CHECK_INTERVAL = "consul-check-interval" ;
61- private static final String CHECK_TIMEOUT = "consul-check-timeout" ;
63+ private static final String CHECK_PASS_INTERVAL = "consul-check-pass-interval" ;
6264 private static final String DEREGISTER_AFTER = "consul-deregister-critical-service-after" ;
6365
6466 private static final int DEFAULT_PORT = 8500 ;
6567 // default watch timeout in millisecond
6668 private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000 ;
67- // default tcp check interval
68- private static final String DEFAULT_CHECK_INTERVAL = "10s" ;
69- // default tcp check timeout
70- private static final String DEFAULT_CHECK_TIMEOUT = "1s" ;
69+ // default time-to-live in millisecond
70+ private static final long DEFAULT_CHECK_PASS_INTERVAL = 16000L ;
7171 // default deregister critical server after
7272 private static final String DEFAULT_DEREGISTER_TIME = "20s" ;
7373
7474 private ConsulClient client ;
75-
75+ private long checkPassInterval ;
7676 private ExecutorService notifierExecutor = newCachedThreadPool (
7777 new NamedThreadFactory ("dubbo-consul-notifier" , true ));
7878 private ConcurrentMap <URL , ConsulNotifier > notifiers = new ConcurrentHashMap <>();
79+ private ScheduledExecutorService ttlConsulCheckExecutor ;
80+
7981
8082 public ConsulRegistry (URL url ) {
8183 super (url );
8284 String host = url .getHost ();
8385 int port = url .getPort () != 0 ? url .getPort () : DEFAULT_PORT ;
8486 client = new ConsulClient (host , port );
87+ checkPassInterval = url .getParameter (CHECK_PASS_INTERVAL , DEFAULT_CHECK_PASS_INTERVAL );
88+ ttlConsulCheckExecutor = Executors .newSingleThreadScheduledExecutor ();
89+ ttlConsulCheckExecutor .scheduleAtFixedRate (this ::checkPass , checkPassInterval / 8 ,
90+ checkPassInterval / 8 , TimeUnit .MILLISECONDS );
8591 }
8692
8793 @ Override
@@ -164,7 +170,7 @@ public List<URL> lookup(URL url) {
164170 }
165171 try {
166172 String service = url .getServiceKey ();
167- Response <List <HealthService >> result = client . getHealthServices (service , HealthServicesRequest . newBuilder (). setTag ( SERVICE_TAG ). build ( ));
173+ Response <List <HealthService >> result = getHealthServices (service , - 1 , buildWatchTimeout ( url ));
168174 if (result == null || result .getValue () == null || result .getValue ().isEmpty ()) {
169175 return new ArrayList <>();
170176 } else {
@@ -184,6 +190,21 @@ public boolean isAvailable() {
184190 public void destroy () {
185191 super .destroy ();
186192 notifierExecutor .shutdown ();
193+ ttlConsulCheckExecutor .shutdown ();
194+ }
195+
196+ private void checkPass () {
197+ for (URL url : getRegistered ()) {
198+ String checkId = buildId (url );
199+ try {
200+ client .agentCheckPass ("service:" + checkId );
201+ if (logger .isDebugEnabled ()) {
202+ logger .debug ("check pass for url: " + url + " with check id: " + checkId );
203+ }
204+ } catch (Throwable t ) {
205+ logger .warn ("fail to check pass for url: " + url + ", check id is: " + checkId );
206+ }
207+ }
187208 }
188209
189210 private Response <List <HealthService >> getHealthServices (String service , long index , int watchTimeout ) {
@@ -258,9 +279,7 @@ private String buildId(URL url) {
258279
259280 private NewService .Check buildCheck (URL url ) {
260281 NewService .Check check = new NewService .Check ();
261- check .setTcp (url .getAddress ());
262- check .setInterval (url .getParameter (CHECK_INTERVAL , DEFAULT_CHECK_INTERVAL ));
263- check .setTimeout (url .getParameter (CHECK_TIMEOUT , DEFAULT_CHECK_TIMEOUT ));
282+ check .setTtl ((checkPassInterval / 1000 ) + "s" );
264283 check .setDeregisterCriticalServiceAfter (url .getParameter (DEREGISTER_AFTER , DEFAULT_DEREGISTER_TIME ));
265284 return check ;
266285 }
0 commit comments