Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand All @@ -27,7 +26,6 @@

/**
* LeastActiveLoadBalance
*
*/
public class LeastActiveLoadBalance extends AbstractLoadBalance {

Expand All @@ -39,26 +37,26 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeight = 0; // The sum of weights
int firstWeight = 0; // Initial value, used for comparision
int totalWeightAfterWarmUp = 0; // The sum of after warmup weights
int firstWeightAfterWarmUp = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
int afterWarmup = getWeight(invoker, invocation);
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexes[0] = i; // Reset
totalWeight = weight; // Reset
firstWeight = weight; // Record the weight the first invoker
totalWeightAfterWarmUp = afterWarmup; // Reset
firstWeightAfterWarmUp = afterWarmup; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexes[leastCount++] = i; // Record index number of this invoker
totalWeight += weight; // Add this invoker's weight to totalWeight.
totalWeightAfterWarmUp += afterWarmup; // Add this invoker's after warmup weight to totalWeightAfterWarmUp.
// If every invoker has the same weight?
if (sameWeight && i > 0
&& weight != firstWeight) {
&& afterWarmup != firstWeightAfterWarmUp) {
sameWeight = false;
}
}
Expand All @@ -68,9 +66,9 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
if (!sameWeight && totalWeightAfterWarmUp > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeightAfterWarmUp.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeightAfterWarmUp) + 1;
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
Expand All @@ -79,7 +77,7 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
return invokers.get(leastIndex);
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// If all invokers have the same weight value or totalWeightAfterWarmUp=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,24 @@
*/
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

public class LeastActiveBalanceTest extends LoadBalanceBaseTest {
@Ignore
@Test
Expand All @@ -39,4 +48,119 @@ public void testLeastActiveLoadBalance_select() {
}
}

private List<Invoker<LoadBalanceBaseTest>> invokers = new ArrayList<Invoker<LoadBalanceBaseTest>>();
private Invoker<LoadBalanceBaseTest> invoker1;
private Invoker<LoadBalanceBaseTest> invoker2;

@Before
public void before() throws Exception {
invoker1 = mock(Invoker.class);
invoker2 = mock(Invoker.class);
invoker3 = mock(Invoker.class);

URL url1 = URL.valueOf("test1://0:1/DemoService");
URL url2 = URL.valueOf("test2://0:9/DemoService");
URL url3 = URL.valueOf("test3://1:6/DemoService");

given(invoker1.isAvailable()).willReturn(true);
given(invoker1.getUrl()).willReturn(url1);

given(invoker2.isAvailable()).willReturn(true);
given(invoker2.getUrl()).willReturn(url2);

given(invoker3.isAvailable()).willReturn(true);
given(invoker3.getUrl()).willReturn(url3);

invokers.add(invoker1);
invokers.add(invoker2);
invokers.add(invoker3);
}

@Test
public void testSelect() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
for (int i = 0; i < 100000; i++) {
MyLeastActiveLoadBalance lb = new MyLeastActiveLoadBalance();
Invoker selected = lb.select(invokers, null, null);

if (selected.getUrl().getProtocol().equals("test1")) {
sumInvoker1++;
}

if (selected.getUrl().getProtocol().equals("test2")) {
sumInvoker2++;
}
// never select invoker3 because it's active is more than invoker1 and invoker2
Assert.assertTrue("select is not the least active one", !selected.getUrl().getProtocol().equals("test3"));
}

// the sumInvoker1 : sumInvoker2 approximately equal to 1: 9
System.out.println(sumInvoker1);
System.out.println(sumInvoker2);
}

class MyLeastActiveLoadBalance extends AbstractLoadBalance {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you reuse existing code?

org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveBalanceTest#testLeastActiveLoadBalance_select:

public void testLeastActiveLoadBalance_select() {
    ...
    Map<Invoker, AtomicLong> counter = getInvokeCounter(runs, LeastActiveLoadBalance.NAME);
    ...

Load your repaired code this way:

ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(LeastActiveLoadBalance.NAME)


private final Random random = new Random();

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeightAfterWarmUp = 0; // The sum of after warmup weights
int firstWeightAfterWarmUp = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);

// mock active is invoker's url.getHost
int active = Integer.valueOf(invoker.getUrl().getHost()); // Active number

// mock weight is invoker's url.getPort
int afterWarmup = invoker.getUrl().getPort();

if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexs[0] = i; // Reset
totalWeightAfterWarmUp = afterWarmup; // Reset
firstWeightAfterWarmUp = afterWarmup; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexs[leastCount++] = i; // Record index number of this invoker
totalWeightAfterWarmUp += afterWarmup; // Add this invoker's after warmup weight to totalWeightAfterWarmUp.
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeightAfterWarmUp) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexs[0]);
}
if (!sameWeight && totalWeightAfterWarmUp > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeightAfterWarmUp.
int offsetWeight = random.nextInt(totalWeightAfterWarmUp) + 1;
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];

// mock weight is invoker's url.getPort
offsetWeight -= invokers.get(leastIndex).getUrl().getPort();
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
// assert that at most loop 'leastCount' counts
Assert.assertTrue("leastCount is still > 0", leastCount < 0);
}
// If all invokers have the same weight value or totalWeightAfterWarmUp=0, return evenly.
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
}
}