Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
* <p>A list of listeners, each with an associated {@code Executor}, that
Expand Down Expand Up @@ -35,6 +37,8 @@ public final class ExecutionList {

private boolean executed;

private static final Executor DEFAULT_EXECUTOR = Executors.newSingleThreadExecutor(new NamedThreadFactory("DubboFutureCallbackDefaultExecutor", true));

/**
* Creates a new, empty {@link ExecutionList}.
*/
Expand Down Expand Up @@ -63,9 +67,12 @@ public void add(Runnable runnable, Executor executor) {
// Fail fast on a null. We throw NPE here because the contract of
// Executor states that it throws NPE on null listener, so we propagate
// that contract up into the add method as well.
if (runnable == null || executor == null) {
if (runnable == null) {
throw new NullPointerException("Both Runnable and Executor can not be null!");
}
if (executor == null) {
executor = DEFAULT_EXECUTOR;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方与之前的行为不一致,建议是传参数的时候,显示的传递 DEFAULT_EXECUTOR

}

// Lock while we check state. We must maintain the lock while adding the
// new pair so that another thread can't run the list out from under us.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,6 @@ public interface ListenableFuture<V> extends Future<V> {
* immediately but the executor rejected it.
*/
void addListener(Runnable listener, Executor executor);

void addListener(Runnable listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}

@Override
public void addListener(Runnable listener) {
executionList.add(listener, null);
}

/**
* Internal implementation detail used to invoke the listeners.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ public static void notNull(Object obj, String message) {
}
}

public static void notNull(Object obj, RuntimeException exeception) {
if (obj == null) {
throw exeception;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Monitor getMonitor(URL url) {

final URL monitorUrl = url;
final ListenableFutureTask<Monitor> listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl));
listenableFutureTask.addListener(new MonitorListener(key), executor);
listenableFutureTask.addListener(new MonitorListener(key));
executor.execute(listenableFutureTask);
FUTURES.put(key, listenableFutureTask);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public CuratorZookeeperClient(URL url) {
try {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.alibaba.dubbo.remoting.zookeeper.zkclient;

import com.alibaba.dubbo.common.concurrent.ListenableFutureTask;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.Assert;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.Watcher.Event.KeeperState;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* 连接超时后,能自动监听连接状态的zkclient包装类
* 也为和curator在使用上总体保持一致
* @author ken.lj
* @date 2017/10/29
*/
public class ZkClientWrapper {
Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class);

private long timeout;
private ZkClient client;
private volatile KeeperState state;
private ListenableFutureTask<ZkClient> listenableFutureTask;
private volatile boolean started = false;


public ZkClientWrapper(final String serverAddr, long timeout) {
this.timeout = timeout;
listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
@Override
public ZkClient call() throws Exception {
return new ZkClient(serverAddr, Integer.MAX_VALUE);
}
});
}

public void start() {
if (!started) {
Thread connectThread = new Thread(listenableFutureTask);
connectThread.setName("DubboZkclientConnector");
connectThread.setDaemon(true);
connectThread.start();
try {
client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
}
started = true;
} else {
throw new IllegalStateException("Zkclient has already been started!");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

重复初始化直接跳过应该会比较好一些

}
}

public void addListener(final IZkStateListener listener) {
listenableFutureTask.addListener(new Runnable() {
@Override
public void run() {
try {
client = listenableFutureTask.get();
client.subscribeStateChanges(listener);
} catch (InterruptedException e) {
e.printStackTrace();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方要用 logger 输出

} catch (ExecutionException e) {
e.printStackTrace();
}
}
});
}

public boolean isConnected() {
return client != null && state == KeeperState.SyncConnected;
}

public void createPersistent(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!"));
client.createPersistent(path, true);
}

public void createEphemeral(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!"));
client.createEphemeral(path);
}

public void delete(String path) {
if (client == null) {
throw new IllegalStateException("Zookeeper is not connected!");
}
client.delete(path);
}

public List<String> getChildren(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!"));
return client.getChildren(path);
}

public boolean exists(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!"));
return client.exists(path);
}

public void close() {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!"));
client.close();
}

public List<String> subscribeChildChanges(String path, final IZkChildListener listener) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!"));
return client.subscribeChildChanges(path, listener);
}

public void unsubscribeChildChanges(String path, IZkChildListener listener) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!"));
client.unsubscribeChildChanges(path, listener);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.Watcher.Event.KeeperState;
Expand All @@ -16,14 +15,14 @@

public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {

private final ZkClient client;
private final ZkClientWrapper client;

private volatile KeeperState state = KeeperState.SyncConnected;

public ZkclientZookeeperClient(URL url) {
super(url);
client = new ZkClient(url.getBackupAddress());
client.subscribeStateChanges(new IZkStateListener() {
client = new ZkClientWrapper(url.getBackupAddress(), 30000);
client.addListener(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
Expand All @@ -37,11 +36,13 @@ public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
client.start();
}


public void createPersistent(String path) {
try {
client.createPersistent(path, true);
client.createPersistent(path);
} catch (ZkNodeExistsException e) {
}
}
Expand Down