Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
Expand All @@ -33,11 +35,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* AbstractClusterInvoker
*
*/
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

Expand Down Expand Up @@ -92,10 +94,10 @@ public void destroy() {

/**
* Select a invoker using loadbalance policy.</br>
* a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
* a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
* if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
* b)Reslection, the validation rule for reselection: selected > available. This rule guarantees that
* the selected invoker has the minimum chance to be one in the previously selected list, and also
* the selected invoker has the minimum chance to be one in the previously selected list, and also
* guarantees this invoker is available.
*
* @param loadbalance load balance policy
Expand Down Expand Up @@ -225,6 +227,13 @@ private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}

List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
*
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
*
*/
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

Expand All @@ -57,50 +56,55 @@ public ForkingClusterInvoker(Directory<T> directory) {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;
Expand Down Expand Up @@ -132,6 +133,32 @@ protected Result doInvoke(Invocation invocation, List invokers, LoadBalance load

}


@Test
public void testBindingAttachment() {
final String attachKey = "attach";
final String attachValue = "value";

// setup attachment
RpcContext.getContext().setAttachment(attachKey, attachValue);
Map<String, String> attachments = RpcContext.getContext().getAttachments();
Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);

cluster = new AbstractClusterInvoker(dic) {
@Override
protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance)
throws RpcException {
// attachment will be bind to invocation
String value = invocation.getAttachment(attachKey);
Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue));
return null;
}
};

// invoke
cluster.invoke(invocation);
}

@Test
public void testSelect_Invokersize0() throws Exception {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void testNoInvoke() {
* then we should reselect from the latest invokers before retry.
*/
@Test
public void testInvokerDestoryAndReList() {
public void testInvokerDestroyAndReList() {
final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries);
RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION);
MockInvoker<Demo> invoker1 = new MockInvoker<Demo>(Demo.class, url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.Assert;
Expand All @@ -30,14 +31,14 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertFalse;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
* ForkingClusterInvokerTest
*
*/
@SuppressWarnings("unchecked")
public class ForkingClusterInvokerTest {
Expand Down Expand Up @@ -71,6 +72,7 @@ public void setUp() throws Exception {
invokers.add(invoker3);

}

private void resetInvokerToException() {
given(invoker1.invoke(invocation)).willThrow(new RuntimeException());
given(invoker1.getUrl()).willReturn(url);
Expand Down Expand Up @@ -106,7 +108,7 @@ private void resetInvokerToNoException() {
}

@Test
public void testInvokeExceptoin() {
public void testInvokeException() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
Expand All @@ -120,8 +122,32 @@ public void testInvokeExceptoin() {
}
}

@Test
public void testClearRpcContext() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);

String attachKey = "attach";
String attachValue = "value";

RpcContext.getContext().setAttachment(attachKey, attachValue);

Map<String, String> attachments = RpcContext.getContext().getAttachments();
Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);
try {
invoker.invoke(invocation);
Assert.fail();
} catch (RpcException expected) {
Assert.assertTrue("Successed to forking invoke provider !", expected.getMessage().contains("Failed to forking invoke provider"));
assertFalse(expected.getCause() instanceof RpcException);
}
Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
Assert.assertTrue("clear attachment failed!", afterInvoke != null && afterInvoke.size() == 0);
}

@Test()
public void testInvokeNoExceptoin() {
public void testInvokeNoException() {

resetInvokerToNoException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Result invoke(Invocation inv) throws RpcException {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null) {
if (contextAttachments != null && contextAttachments.size() != 0) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
Expand Down