3333import java .util .concurrent .TimeUnit ;
3434import java .util .concurrent .atomic .AtomicInteger ;
3535
36+ import com .alibaba .dubbo .common .utils .StringUtils ;
3637import org .apache .commons .pool .impl .GenericObjectPool ;
3738
3839import redis .clients .jedis .Jedis ;
4849import com .alibaba .dubbo .registry .NotifyListener ;
4950import com .alibaba .dubbo .registry .support .FailbackRegistry ;
5051import com .alibaba .dubbo .rpc .RpcException ;
52+ import redis .clients .jedis .exceptions .JedisConnectionException ;
5153
5254/**
5355 * RedisRegistry
54- *
56+ *
5557 * @author william.liangf
5658 */
5759public class RedisRegistry extends FailbackRegistry {
@@ -65,26 +67,26 @@ public class RedisRegistry extends FailbackRegistry {
6567 private final ScheduledExecutorService expireExecutor = Executors .newScheduledThreadPool (1 , new NamedThreadFactory ("DubboRegistryExpireTimer" , true ));
6668
6769 private final ScheduledFuture <?> expireFuture ;
68-
70+
6971 private final String root ;
7072
7173 private final Map <String , JedisPool > jedisPools = new ConcurrentHashMap <String , JedisPool >();
7274
7375 private final ConcurrentMap <String , Notifier > notifiers = new ConcurrentHashMap <String , Notifier >();
74-
76+
7577 private final int reconnectPeriod ;
7678
7779 private final int expirePeriod ;
78-
80+
7981 private volatile boolean admin = false ;
80-
82+
8183 private boolean replicate ;
8284
8385 public RedisRegistry (URL url ) {
8486 super (url );
8587 if (url .isAnyHost ()) {
86- throw new IllegalStateException ("registry address == null" );
87- }
88+ throw new IllegalStateException ("registry address == null" );
89+ }
8890 GenericObjectPool .Config config = new GenericObjectPool .Config ();
8991 config .testOnBorrow = url .getParameter ("test.on.borrow" , true );
9092 config .testOnReturn = url .getParameter ("test.on.return" , false );
@@ -103,19 +105,22 @@ public RedisRegistry(URL url) {
103105 config .timeBetweenEvictionRunsMillis = url .getParameter ("time.between.eviction.runs.millis" , 0 );
104106 if (url .getParameter ("min.evictable.idle.time.millis" , 0 ) > 0 )
105107 config .minEvictableIdleTimeMillis = url .getParameter ("min.evictable.idle.time.millis" , 0 );
106-
108+
107109 String cluster = url .getParameter ("cluster" , "failover" );
108110 if (! "failover" .equals (cluster ) && ! "replicate" .equals (cluster )) {
109- throw new IllegalArgumentException ("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate." );
111+ throw new IllegalArgumentException ("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate." );
110112 }
111113 replicate = "replicate" .equals (cluster );
112-
114+
113115 List <String > addresses = new ArrayList <String >();
114116 addresses .add (url .getAddress ());
115117 String [] backups = url .getParameter (Constants .BACKUP_KEY , new String [0 ]);
116118 if (backups != null && backups .length > 0 ) {
117119 addresses .addAll (Arrays .asList (backups ));
118120 }
121+
122+ // 增加Redis密码支持
123+ String password = url .getPassword ();
119124 for (String address : addresses ) {
120125 int i = address .indexOf (':' );
121126 String host ;
@@ -127,10 +132,16 @@ public RedisRegistry(URL url) {
127132 host = address ;
128133 port = DEFAULT_REDIS_PORT ;
129134 }
130- this .jedisPools .put (address , new JedisPool (config , host , port ,
131- url .getParameter (Constants .TIMEOUT_KEY , Constants .DEFAULT_TIMEOUT )));
135+ if (StringUtils .isEmpty (password )) {
136+ this .jedisPools .put (address , new JedisPool (config , host , port ,
137+ url .getParameter (Constants .TIMEOUT_KEY , Constants .DEFAULT_TIMEOUT )));
138+ } else {
139+ // 使用密码连接。 此处要求备用redis与主要redis使用相同的密码
140+ this .jedisPools .put (address , new JedisPool (config , host , port ,
141+ url .getParameter (Constants .TIMEOUT_KEY , Constants .DEFAULT_TIMEOUT ), password ));
142+ }
132143 }
133-
144+
134145 this .reconnectPeriod = url .getParameter (Constants .REGISTRY_RECONNECT_PERIOD_KEY , Constants .DEFAULT_REGISTRY_RECONNECT_PERIOD );
135146 String group = url .getParameter (Constants .GROUP_KEY , DEFAULT_ROOT );
136147 if (! group .startsWith (Constants .PATH_SEPARATOR )) {
@@ -140,7 +151,7 @@ public RedisRegistry(URL url) {
140151 group = group + Constants .PATH_SEPARATOR ;
141152 }
142153 this .root = group ;
143-
154+
144155 this .expirePeriod = url .getParameter (Constants .SESSION_TIMEOUT_KEY , Constants .DEFAULT_SESSION_TIMEOUT );
145156 this .expireFuture = expireExecutor .scheduleWithFixedDelay (new Runnable () {
146157 public void run () {
@@ -152,10 +163,11 @@ public void run() {
152163 }
153164 }, expirePeriod / 2 , expirePeriod / 2 , TimeUnit .MILLISECONDS );
154165 }
155-
166+
156167 private void deferExpired () {
157168 for (Map .Entry <String , JedisPool > entry : jedisPools .entrySet ()) {
158169 JedisPool jedisPool = entry .getValue ();
170+ boolean isBroken = false ;
159171 try {
160172 Jedis jedis = jedisPool .getResource ();
161173 try {
@@ -170,18 +182,24 @@ private void deferExpired() {
170182 if (admin ) {
171183 clean (jedis );
172184 }
173- if (! replicate ) {
174- break ;// 如果服务器端已同步数据,只需写入单台机器
185+ if (!replicate ) {
186+ break ;// 如果服务器端已同步数据,只需写入单台机器
175187 }
188+ } catch (JedisConnectionException e ){
189+ isBroken = true ;
176190 } finally {
177- jedisPool .returnResource (jedis );
191+ if (isBroken ){
192+ jedisPool .returnBrokenResource (jedis );
193+ } else {
194+ jedisPool .returnResource (jedis );
195+ }
178196 }
179197 } catch (Throwable t ) {
180198 logger .warn ("Failed to write provider heartbeat to redis registry. registry: " + entry .getKey () + ", cause: " + t .getMessage (), t );
181199 }
182200 }
183201 }
184-
202+
185203 // 监控中心负责删除过期脏数据
186204 private void clean (Jedis jedis ) {
187205 Set <String > keys = jedis .keys (root + Constants .ANY_VALUE );
@@ -202,7 +220,7 @@ private void clean(Jedis jedis) {
202220 logger .warn ("Delete expired key: " + key + " -> value: " + entry .getKey () + ", expire: " + new Date (expire ) + ", now: " + new Date (now ));
203221 }
204222 }
205- }
223+ }
206224 }
207225 if (delete ) {
208226 jedis .publish (key , Constants .UNREGISTER );
@@ -214,16 +232,20 @@ private void clean(Jedis jedis) {
214232
215233 public boolean isAvailable () {
216234 for (JedisPool jedisPool : jedisPools .values ()) {
235+ Jedis jedis = jedisPool .getResource ();
236+ boolean isBroken = false ;
217237 try {
218- Jedis jedis = jedisPool .getResource ();
219- try {
220- if (jedis .isConnected ()) {
221- return true ; // 至少需单台机器可用
222- }
223- } finally {
238+ if (jedis .isConnected ()) {
239+ return true ; // 至少需单台机器可用
240+ }
241+ } catch (JedisConnectionException e ) {
242+ isBroken = true ;
243+ } finally {
244+ if (isBroken ) {
245+ jedisPool .returnBrokenResource (jedis );
246+ } else {
224247 jedisPool .returnResource (jedis );
225248 }
226- } catch (Throwable t ) {
227249 }
228250 }
229251 return false ;
@@ -265,15 +287,22 @@ public void doRegister(URL url) {
265287 JedisPool jedisPool = entry .getValue ();
266288 try {
267289 Jedis jedis = jedisPool .getResource ();
290+ boolean isBroken = false ;
268291 try {
269292 jedis .hset (key , value , expire );
270293 jedis .publish (key , Constants .REGISTER );
271294 success = true ;
272295 if (! replicate ) {
273- break ; // 如果服务器端已同步数据,只需写入单台机器
296+ break ; // 如果服务器端已同步数据,只需写入单台机器
274297 }
298+ } catch (JedisConnectionException e ){
299+ isBroken = true ;
275300 } finally {
276- jedisPool .returnResource (jedis );
301+ if (isBroken ){
302+ jedisPool .returnBrokenResource (jedis );
303+ } else {
304+ jedisPool .returnResource (jedis );
305+ }
277306 }
278307 } catch (Throwable t ) {
279308 exception = new RpcException ("Failed to register service to redis registry. registry: " + entry .getKey () + ", service: " + url + ", cause: " + t .getMessage (), t );
@@ -298,15 +327,22 @@ public void doUnregister(URL url) {
298327 JedisPool jedisPool = entry .getValue ();
299328 try {
300329 Jedis jedis = jedisPool .getResource ();
330+ boolean isBroken = false ;
301331 try {
302332 jedis .hdel (key , value );
303333 jedis .publish (key , Constants .UNREGISTER );
304334 success = true ;
305335 if (! replicate ) {
306- break ; // 如果服务器端已同步数据,只需写入单台机器
336+ break ; // 如果服务器端已同步数据,只需写入单台机器
307337 }
338+ } catch (JedisConnectionException e ){
339+ isBroken = true ;
308340 } finally {
309- jedisPool .returnResource (jedis );
341+ if (isBroken ){
342+ jedisPool .returnBrokenResource (jedis );
343+ } else {
344+ jedisPool .returnResource (jedis );
345+ }
310346 }
311347 } catch (Throwable t ) {
312348 exception = new RpcException ("Failed to unregister service to redis registry. registry: " + entry .getKey () + ", service: " + url + ", cause: " + t .getMessage (), t );
@@ -320,7 +356,7 @@ public void doUnregister(URL url) {
320356 }
321357 }
322358 }
323-
359+
324360 @ Override
325361 public void doSubscribe (final URL url , final NotifyListener listener ) {
326362 String service = toServicePath (url );
@@ -339,6 +375,7 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
339375 JedisPool jedisPool = entry .getValue ();
340376 try {
341377 Jedis jedis = jedisPool .getResource ();
378+ boolean isBroken = false ;
342379 try {
343380 if (service .endsWith (Constants .ANY_VALUE )) {
344381 admin = true ;
@@ -363,8 +400,14 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
363400 }
364401 success = true ;
365402 break ; // 只需读一个服务器的数据
403+ } catch (JedisConnectionException e ){
404+ isBroken = true ;
366405 } finally {
367- jedisPool .returnResource (jedis );
406+ if (isBroken ){
407+ jedisPool .returnBrokenResource (jedis );
408+ } else {
409+ jedisPool .returnResource (jedis );
410+ }
368411 }
369412 } catch (Throwable t ) { // 尝试下一个服务器
370413 exception = new RpcException ("Failed to subscribe service from redis registry. registry: " + entry .getKey () + ", service: " + url + ", cause: " + t .getMessage (), t );
@@ -470,7 +513,7 @@ private String toCategoryPath(URL url) {
470513 }
471514
472515 private class NotifySub extends JedisPubSub {
473-
516+
474517 private final JedisPool jedisPool ;
475518
476519 public NotifySub (JedisPool jedisPool ) {
@@ -482,14 +525,21 @@ public void onMessage(String key, String msg) {
482525 if (logger .isInfoEnabled ()) {
483526 logger .info ("redis event: " + key + " = " + msg );
484527 }
485- if (msg .equals (Constants .REGISTER )
528+ if (msg .equals (Constants .REGISTER )
486529 || msg .equals (Constants .UNREGISTER )) {
487530 try {
488531 Jedis jedis = jedisPool .getResource ();
532+ boolean isBroken = false ;
489533 try {
490534 doNotify (jedis , key );
535+ } catch (JedisConnectionException e ){
536+ isBroken = true ;
491537 } finally {
492- jedisPool .returnResource (jedis );
538+ if (isBroken ){
539+ jedisPool .returnBrokenResource (jedis );
540+ } else {
541+ jedisPool .returnResource (jedis );
542+ }
493543 }
494544 } catch (Throwable t ) { // TODO 通知失败没有恢复机制保障
495545 logger .error (t .getMessage (), t );
@@ -527,23 +577,23 @@ private class Notifier extends Thread {
527577 private volatile Jedis jedis ;
528578
529579 private volatile boolean first = true ;
530-
580+
531581 private volatile boolean running = true ;
532-
582+
533583 private final AtomicInteger connectSkip = new AtomicInteger ();
534584
535585 private final AtomicInteger connectSkiped = new AtomicInteger ();
536586
537587 private final Random random = new Random ();
538-
588+
539589 private volatile int connectRandom ;
540590
541591 private void resetSkip () {
542592 connectSkip .set (0 );
543593 connectSkiped .set (0 );
544594 connectRandom = 0 ;
545595 }
546-
596+
547597 private boolean isSkip () {
548598 int skip = connectSkip .get (); // 跳过次数增长
549599 if (skip >= 10 ) { // 如果跳过次数增长超过10,取随机数
@@ -560,13 +610,13 @@ private boolean isSkip() {
560610 connectRandom = 0 ;
561611 return false ;
562612 }
563-
613+
564614 public Notifier (String service ) {
565615 super .setDaemon (true );
566616 super .setName ("DubboRedisSubscribe" );
567617 this .service = service ;
568618 }
569-
619+
570620 @ Override
571621 public void run () {
572622 while (running ) {
@@ -618,7 +668,7 @@ public void run() {
618668 }
619669 }
620670 }
621-
671+
622672 public void shutdown () {
623673 try {
624674 running = false ;
@@ -627,7 +677,7 @@ public void shutdown() {
627677 logger .warn (t .getMessage (), t );
628678 }
629679 }
630-
680+
631681 }
632682
633- }
683+ }
0 commit comments