diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java
index 2e4b039287a5..5a1448581dda 100644
--- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java
@@ -2,8 +2,12 @@
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
+import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
*
A list of listeners, each with an associated {@code Executor}, that
@@ -35,6 +39,8 @@ public final class ExecutionList {
private boolean executed;
+ private static final Executor DEFAULT_EXECUTOR = new ThreadPoolExecutor(1, 10, 60000L, TimeUnit.MILLISECONDS, new SynchronousQueue(), new NamedThreadFactory("DubboFutureCallbackDefault", true));
+
/**
* Creates a new, empty {@link ExecutionList}.
*/
@@ -63,10 +69,13 @@ public void add(Runnable runnable, Executor executor) {
// Fail fast on a null. We throw NPE here because the contract of
// Executor states that it throws NPE on null listener, so we propagate
// that contract up into the add method as well.
- if (runnable == null || executor == null) {
- throw new NullPointerException("Both Runnable and Executor can not be null!");
+ if (runnable == null) {
+ throw new NullPointerException("Runnable can not be null!");
+ }
+ if (executor == null) {
+ logger.info("Executor for listenablefuture is null, will use default executor!");
+ executor = DEFAULT_EXECUTOR;
}
-
// Lock while we check state. We must maintain the lock while adding the
// new pair so that another thread can't run the list out from under us.
// We only add to the list if we have not yet started execution.
diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java
index 7b1ff0e9d77f..1600b3007985 100644
--- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java
@@ -116,4 +116,6 @@ public interface ListenableFuture extends Future {
* immediately but the executor rejected it.
*/
void addListener(Runnable listener, Executor executor);
+
+ void addListener(Runnable listener);
}
diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java
index b77d173b6b70..538d05c983e9 100644
--- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java
@@ -65,6 +65,11 @@ public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}
+ @Override
+ public void addListener(Runnable listener) {
+ executionList.add(listener, null);
+ }
+
/**
* Internal implementation detail used to invoke the listeners.
*/
diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java
index bda7c4b60cc9..333f3e7d7d88 100644
--- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java
@@ -30,4 +30,10 @@ public static void notNull(Object obj, String message) {
}
}
+ public static void notNull(Object obj, RuntimeException exeception) {
+ if (obj == null) {
+ throw exeception;
+ }
+ }
+
}
diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java
index d4bda27fe540..8e91ab3f8c38 100644
--- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java
+++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java
@@ -80,7 +80,7 @@ public Monitor getMonitor(URL url) {
final URL monitorUrl = url;
final ListenableFutureTask listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl));
- listenableFutureTask.addListener(new MonitorListener(key), executor);
+ listenableFutureTask.addListener(new MonitorListener(key));
executor.execute(listenableFutureTask);
FUTURES.put(key, listenableFutureTask);
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
index de3b3fbff3e5..f211a3e02a35 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
@@ -27,7 +27,7 @@ public CuratorZookeeperClient(URL url) {
try {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
- .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
+ .retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java
new file mode 100644
index 000000000000..318443ae53c8
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java
@@ -0,0 +1,122 @@
+package com.alibaba.dubbo.remoting.zookeeper.zkclient;
+
+import com.alibaba.dubbo.common.concurrent.ListenableFutureTask;
+import com.alibaba.dubbo.common.logger.Logger;
+import com.alibaba.dubbo.common.logger.LoggerFactory;
+import com.alibaba.dubbo.common.utils.Assert;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 连接超时后,能自动监听连接状态的zkclient包装类
+ * 也为和curator在使用上总体保持一致
+ * @author ken.lj
+ * @date 2017/10/29
+ */
+public class ZkClientWrapper {
+ Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class);
+
+ private long timeout;
+ private ZkClient client;
+ private volatile KeeperState state;
+ private ListenableFutureTask listenableFutureTask;
+ private volatile boolean started = false;
+
+
+ public ZkClientWrapper(final String serverAddr, long timeout) {
+ this.timeout = timeout;
+ listenableFutureTask = ListenableFutureTask.create(new Callable() {
+ @Override
+ public ZkClient call() throws Exception {
+ return new ZkClient(serverAddr, Integer.MAX_VALUE);
+ }
+ });
+ }
+
+ public void start() {
+ if (!started) {
+ Thread connectThread = new Thread(listenableFutureTask);
+ connectThread.setName("DubboZkclientConnector");
+ connectThread.setDaemon(true);
+ connectThread.start();
+ try {
+ client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
+ } catch (Throwable t) {
+ logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
+ }
+ started = true;
+ } else {
+ logger.warn("Zkclient has already been started!");
+ }
+ }
+
+ public void addListener(final IZkStateListener listener) {
+ listenableFutureTask.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ client = listenableFutureTask.get();
+ client.subscribeStateChanges(listener);
+ } catch (InterruptedException e) {
+ logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!");
+ } catch (ExecutionException e) {
+ logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
+ }
+ }
+ });
+ }
+
+ public boolean isConnected() {
+ return client != null && state == KeeperState.SyncConnected;
+ }
+
+ public void createPersistent(String path) {
+ Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
+ client.createPersistent(path, true);
+ }
+
+ public void createEphemeral(String path) {
+ Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
+ client.createEphemeral(path);
+ }
+
+ public void delete(String path) {
+ Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
+ client.delete(path);
+ }
+
+ public List getChildren(String path) {
+ Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
+ return client.getChildren(path);
+ }
+
+ public boolean exists(String path) {
+ Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
+ return client.exists(path);
+ }
+
+ public void close() {
+ Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
+ client.close();
+ }
+
+ public List subscribeChildChanges(String path, final IZkChildListener listener) {
+ Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
+ return client.subscribeChildChanges(path, listener);
+ }
+
+ public void unsubscribeChildChanges(String path, IZkChildListener listener) {
+ Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
+ client.unsubscribeChildChanges(path, listener);
+ }
+
+
+}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java
index dec4da12542c..498f43158b95 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java
@@ -7,7 +7,6 @@
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -16,14 +15,14 @@
public class ZkclientZookeeperClient extends AbstractZookeeperClient {
- private final ZkClient client;
+ private final ZkClientWrapper client;
private volatile KeeperState state = KeeperState.SyncConnected;
public ZkclientZookeeperClient(URL url) {
super(url);
- client = new ZkClient(url.getBackupAddress());
- client.subscribeStateChanges(new IZkStateListener() {
+ client = new ZkClientWrapper(url.getBackupAddress(), 30000);
+ client.addListener(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
@@ -37,11 +36,13 @@ public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
+ client.start();
}
+
public void createPersistent(String path) {
try {
- client.createPersistent(path, true);
+ client.createPersistent(path);
} catch (ZkNodeExistsException e) {
}
}