Skip to content

Commit 74ff1b6

Browse files
authored
Merge pull request #790 from chickenlj:bugfix_zookeeper_block
Optimize zookeeper block logic on app startup
1 parent 1420cb4 commit 74ff1b6

File tree

8 files changed

+155
-10
lines changed

8 files changed

+155
-10
lines changed

dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22

33
import com.alibaba.dubbo.common.logger.Logger;
44
import com.alibaba.dubbo.common.logger.LoggerFactory;
5+
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
56

67
import java.util.concurrent.Executor;
8+
import java.util.concurrent.SynchronousQueue;
9+
import java.util.concurrent.ThreadPoolExecutor;
10+
import java.util.concurrent.TimeUnit;
711

812
/**
913
* <p>A list of listeners, each with an associated {@code Executor}, that
@@ -35,6 +39,8 @@ public final class ExecutionList {
3539

3640
private boolean executed;
3741

42+
private static final Executor DEFAULT_EXECUTOR = new ThreadPoolExecutor(1, 10, 60000L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("DubboFutureCallbackDefault", true));
43+
3844
/**
3945
* Creates a new, empty {@link ExecutionList}.
4046
*/
@@ -63,10 +69,13 @@ public void add(Runnable runnable, Executor executor) {
6369
// Fail fast on a null. We throw NPE here because the contract of
6470
// Executor states that it throws NPE on null listener, so we propagate
6571
// that contract up into the add method as well.
66-
if (runnable == null || executor == null) {
67-
throw new NullPointerException("Both Runnable and Executor can not be null!");
72+
if (runnable == null) {
73+
throw new NullPointerException("Runnable can not be null!");
74+
}
75+
if (executor == null) {
76+
logger.info("Executor for listenablefuture is null, will use default executor!");
77+
executor = DEFAULT_EXECUTOR;
6878
}
69-
7079
// Lock while we check state. We must maintain the lock while adding the
7180
// new pair so that another thread can't run the list out from under us.
7281
// We only add to the list if we have not yet started execution.

dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,6 @@ public interface ListenableFuture<V> extends Future<V> {
116116
* immediately but the executor rejected it.
117117
*/
118118
void addListener(Runnable listener, Executor executor);
119+
120+
void addListener(Runnable listener);
119121
}

dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public void addListener(Runnable listener, Executor exec) {
6565
executionList.add(listener, exec);
6666
}
6767

68+
@Override
69+
public void addListener(Runnable listener) {
70+
executionList.add(listener, null);
71+
}
72+
6873
/**
6974
* Internal implementation detail used to invoke the listeners.
7075
*/

dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,10 @@ public static void notNull(Object obj, String message) {
3030
}
3131
}
3232

33+
public static void notNull(Object obj, RuntimeException exeception) {
34+
if (obj == null) {
35+
throw exeception;
36+
}
37+
}
38+
3339
}

dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Monitor getMonitor(URL url) {
8080

8181
final URL monitorUrl = url;
8282
final ListenableFutureTask<Monitor> listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl));
83-
listenableFutureTask.addListener(new MonitorListener(key), executor);
83+
listenableFutureTask.addListener(new MonitorListener(key));
8484
executor.execute(listenableFutureTask);
8585
FUTURES.put(key, listenableFutureTask);
8686

dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public CuratorZookeeperClient(URL url) {
2727
try {
2828
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
2929
.connectString(url.getBackupAddress())
30-
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
30+
.retryPolicy(new RetryNTimes(1, 1000))
3131
.connectionTimeoutMs(5000);
3232
String authority = url.getAuthority();
3333
if (authority != null && authority.length() > 0) {
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package com.alibaba.dubbo.remoting.zookeeper.zkclient;
2+
3+
import com.alibaba.dubbo.common.concurrent.ListenableFutureTask;
4+
import com.alibaba.dubbo.common.logger.Logger;
5+
import com.alibaba.dubbo.common.logger.LoggerFactory;
6+
import com.alibaba.dubbo.common.utils.Assert;
7+
8+
import org.I0Itec.zkclient.IZkChildListener;
9+
import org.I0Itec.zkclient.IZkStateListener;
10+
import org.I0Itec.zkclient.ZkClient;
11+
import org.apache.zookeeper.Watcher.Event.KeeperState;
12+
13+
import java.util.List;
14+
import java.util.concurrent.Callable;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.TimeUnit;
17+
18+
/**
19+
* 连接超时后,能自动监听连接状态的zkclient包装类
20+
* 也为和curator在使用上总体保持一致
21+
* @author ken.lj
22+
* @date 2017/10/29
23+
*/
24+
public class ZkClientWrapper {
25+
Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class);
26+
27+
private long timeout;
28+
private ZkClient client;
29+
private volatile KeeperState state;
30+
private ListenableFutureTask<ZkClient> listenableFutureTask;
31+
private volatile boolean started = false;
32+
33+
34+
public ZkClientWrapper(final String serverAddr, long timeout) {
35+
this.timeout = timeout;
36+
listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
37+
@Override
38+
public ZkClient call() throws Exception {
39+
return new ZkClient(serverAddr, Integer.MAX_VALUE);
40+
}
41+
});
42+
}
43+
44+
public void start() {
45+
if (!started) {
46+
Thread connectThread = new Thread(listenableFutureTask);
47+
connectThread.setName("DubboZkclientConnector");
48+
connectThread.setDaemon(true);
49+
connectThread.start();
50+
try {
51+
client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
52+
} catch (Throwable t) {
53+
logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
54+
}
55+
started = true;
56+
} else {
57+
logger.warn("Zkclient has already been started!");
58+
}
59+
}
60+
61+
public void addListener(final IZkStateListener listener) {
62+
listenableFutureTask.addListener(new Runnable() {
63+
@Override
64+
public void run() {
65+
try {
66+
client = listenableFutureTask.get();
67+
client.subscribeStateChanges(listener);
68+
} catch (InterruptedException e) {
69+
logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!");
70+
} catch (ExecutionException e) {
71+
logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
72+
}
73+
}
74+
});
75+
}
76+
77+
public boolean isConnected() {
78+
return client != null && state == KeeperState.SyncConnected;
79+
}
80+
81+
public void createPersistent(String path) {
82+
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
83+
client.createPersistent(path, true);
84+
}
85+
86+
public void createEphemeral(String path) {
87+
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
88+
client.createEphemeral(path);
89+
}
90+
91+
public void delete(String path) {
92+
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
93+
client.delete(path);
94+
}
95+
96+
public List<String> getChildren(String path) {
97+
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
98+
return client.getChildren(path);
99+
}
100+
101+
public boolean exists(String path) {
102+
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
103+
return client.exists(path);
104+
}
105+
106+
public void close() {
107+
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
108+
client.close();
109+
}
110+
111+
public List<String> subscribeChildChanges(String path, final IZkChildListener listener) {
112+
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
113+
return client.subscribeChildChanges(path, listener);
114+
}
115+
116+
public void unsubscribeChildChanges(String path, IZkChildListener listener) {
117+
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
118+
client.unsubscribeChildChanges(path, listener);
119+
}
120+
121+
122+
}

dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.I0Itec.zkclient.IZkChildListener;
99
import org.I0Itec.zkclient.IZkStateListener;
10-
import org.I0Itec.zkclient.ZkClient;
1110
import org.I0Itec.zkclient.exception.ZkNoNodeException;
1211
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
1312
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -16,14 +15,14 @@
1615

1716
public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {
1817

19-
private final ZkClient client;
18+
private final ZkClientWrapper client;
2019

2120
private volatile KeeperState state = KeeperState.SyncConnected;
2221

2322
public ZkclientZookeeperClient(URL url) {
2423
super(url);
25-
client = new ZkClient(url.getBackupAddress());
26-
client.subscribeStateChanges(new IZkStateListener() {
24+
client = new ZkClientWrapper(url.getBackupAddress(), 30000);
25+
client.addListener(new IZkStateListener() {
2726
public void handleStateChanged(KeeperState state) throws Exception {
2827
ZkclientZookeeperClient.this.state = state;
2928
if (state == KeeperState.Disconnected) {
@@ -37,11 +36,13 @@ public void handleNewSession() throws Exception {
3736
stateChanged(StateListener.RECONNECTED);
3837
}
3938
});
39+
client.start();
4040
}
4141

42+
4243
public void createPersistent(String path) {
4344
try {
44-
client.createPersistent(path, true);
45+
client.createPersistent(path);
4546
} catch (ZkNodeExistsException e) {
4647
}
4748
}

0 commit comments

Comments
 (0)