Skip to content

Commit 65e2358

Browse files
committed
weight test for all the loadBalance.
1 parent 009ff4e commit 65e2358

File tree

5 files changed

+230
-47
lines changed

5 files changed

+230
-47
lines changed

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
3737
int leastActive = -1; // The least active value of all invokers
3838
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
3939
int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
40-
int totalWeightAfterWarmUp = 0; // The sum of after warmup weights
41-
int firstWeightAfterWarmUp = 0; // Initial value, used for comparision
40+
int taotalWeightWithWarmUp = 0; // The sum of with warmup weights
41+
int firstWeightWithWarmUp = 0; // Initial value, used for comparision
4242
boolean sameWeight = true; // Every invoker has the same weight value?
4343
for (int i = 0; i < length; i++) {
4444
Invoker<T> invoker = invokers.get(i);
@@ -48,15 +48,15 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
4848
leastActive = active; // Record the current least active value
4949
leastCount = 1; // Reset leastCount, count again based on current leastCount
5050
leastIndexs[0] = i; // Reset
51-
totalWeightAfterWarmUp = afterWarmup; // Reset
52-
firstWeightAfterWarmUp = afterWarmup; // Record the weight the first invoker
51+
taotalWeightWithWarmUp = afterWarmup; // Reset
52+
firstWeightWithWarmUp = afterWarmup; // Record the weight the first invoker
5353
sameWeight = true; // Reset, every invoker has the same weight value?
5454
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
5555
leastIndexs[leastCount++] = i; // Record index number of this invoker
56-
totalWeightAfterWarmUp += afterWarmup; // Add this invoker's after warmup weight to totalWeightAfterWarmUp.
56+
taotalWeightWithWarmUp += afterWarmup; // Add this invoker's with warmup weight to totalWeightWithWarmUp.
5757
// If every invoker has the same weight?
5858
if (sameWeight && i > 0
59-
&& afterWarmup != firstWeightAfterWarmUp) {
59+
&& afterWarmup != firstWeightWithWarmUp) {
6060
sameWeight = false;
6161
}
6262
}
@@ -66,9 +66,9 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
6666
// If we got exactly one invoker having the least active value, return this invoker directly.
6767
return invokers.get(leastIndexs[0]);
6868
}
69-
if (!sameWeight && totalWeightAfterWarmUp > 0) {
70-
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeightAfterWarmUp.
71-
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeightAfterWarmUp) + 1;
69+
if (!sameWeight && taotalWeightWithWarmUp > 0) {
70+
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeightWithWarmUp.
71+
int offsetWeight = ThreadLocalRandom.current().nextInt(taotalWeightWithWarmUp) + 1;
7272
// Return a invoker based on the random value.
7373
for (int i = 0; i < leastCount; i++) {
7474
int leastIndex = leastIndexs[i];
@@ -77,7 +77,7 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
7777
return invokers.get(leastIndex);
7878
}
7979
}
80-
// If all invokers have the same weight value or totalWeightAfterWarmUp=0, return evenly.
80+
// If all invokers have the same weight value or totalWeightWithWarmUp=0, return evenly.
8181
return invokers.get(leastIndexs[ThreadLocalRandom.current().nextInt(leastCount)]);
8282
}
8383
}

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,15 @@
1919
import org.apache.dubbo.common.URL;
2020
import org.apache.dubbo.rpc.Invocation;
2121
import org.apache.dubbo.rpc.Invoker;
22-
2322
import org.junit.Assert;
24-
import org.junit.Before;
2523
import org.junit.Ignore;
2624
import org.junit.Test;
2725

28-
import java.util.ArrayList;
2926
import java.util.List;
3027
import java.util.Map;
3128
import java.util.Random;
3229
import java.util.concurrent.atomic.AtomicLong;
3330

34-
import static org.mockito.BDDMockito.given;
35-
import static org.mockito.Mockito.mock;
36-
3731
public class LeastActiveBalanceTest extends LoadBalanceBaseTest {
3832
@Ignore
3933
@Test
@@ -48,41 +42,15 @@ public void testLeastActiveLoadBalance_select() {
4842
}
4943
}
5044

51-
private List<Invoker<LoadBalanceBaseTest>> invokers = new ArrayList<Invoker<LoadBalanceBaseTest>>();
52-
private Invoker<LoadBalanceBaseTest> invoker1;
53-
private Invoker<LoadBalanceBaseTest> invoker2;
54-
55-
@Before
56-
public void before() throws Exception {
57-
invoker1 = mock(Invoker.class);
58-
invoker2 = mock(Invoker.class);
59-
invoker3 = mock(Invoker.class);
60-
61-
URL url1 = URL.valueOf("test1://0:1/DemoService");
62-
URL url2 = URL.valueOf("test2://0:9/DemoService");
63-
URL url3 = URL.valueOf("test3://1:6/DemoService");
64-
65-
given(invoker1.isAvailable()).willReturn(true);
66-
given(invoker1.getUrl()).willReturn(url1);
67-
68-
given(invoker2.isAvailable()).willReturn(true);
69-
given(invoker2.getUrl()).willReturn(url2);
70-
71-
given(invoker3.isAvailable()).willReturn(true);
72-
given(invoker3.getUrl()).willReturn(url3);
73-
74-
invokers.add(invoker1);
75-
invokers.add(invoker2);
76-
invokers.add(invoker3);
77-
}
78-
7945
@Test
80-
public void testSelect() {
46+
public void testSelectByWeight() {
8147
int sumInvoker1 = 0;
8248
int sumInvoker2 = 0;
49+
int loop = 100000;
50+
51+
MyLeastActiveLoadBalance lb = new MyLeastActiveLoadBalance();
8352
for (int i = 0; i < 100000; i++) {
84-
MyLeastActiveLoadBalance lb = new MyLeastActiveLoadBalance();
85-
Invoker selected = lb.select(invokers, null, null);
53+
Invoker selected = lb.select(weightInvokers, null, null);
8654

8755
if (selected.getUrl().getProtocol().equals("test1")) {
8856
sumInvoker1++;
@@ -98,6 +66,8 @@ public void testSelect() {
9866
// the sumInvoker1 : sumInvoker2 approximately equal to 1: 9
9967
System.out.println(sumInvoker1);
10068
System.out.println(sumInvoker2);
69+
70+
Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2, loop);
10171
}
10272

10373
class MyLeastActiveLoadBalance extends AbstractLoadBalance {

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,34 @@ private static int calculateDefaultWarmupWeight(int uptime) {
145145
return AbstractLoadBalance.calculateWarmupWeight(uptime, Constants.DEFAULT_WARMUP, Constants.DEFAULT_WEIGHT);
146146
}
147147

148+
/*------------------------------------test invokers for weight---------------------------------------*/
149+
150+
protected List<Invoker<LoadBalanceBaseTest>> weightInvokers = new ArrayList<Invoker<LoadBalanceBaseTest>>();
151+
protected Invoker<LoadBalanceBaseTest> weightInvoker1;
152+
protected Invoker<LoadBalanceBaseTest> weightInvoker2;
153+
protected Invoker<LoadBalanceBaseTest> weightInvoker3;
154+
155+
@Before
156+
public void before() throws Exception {
157+
weightInvoker1 = mock(Invoker.class);
158+
weightInvoker2 = mock(Invoker.class);
159+
weightInvoker3 = mock(Invoker.class);
160+
161+
URL url1 = URL.valueOf("test1://0:1/DemoService");
162+
URL url2 = URL.valueOf("test2://0:9/DemoService");
163+
URL url3 = URL.valueOf("test3://1:6/DemoService");
164+
165+
given(weightInvoker1.isAvailable()).willReturn(true);
166+
given(weightInvoker1.getUrl()).willReturn(url1);
167+
168+
given(weightInvoker2.isAvailable()).willReturn(true);
169+
given(weightInvoker2.getUrl()).willReturn(url2);
170+
171+
given(weightInvoker3.isAvailable()).willReturn(true);
172+
given(weightInvoker3.getUrl()).willReturn(url3);
173+
174+
weightInvokers.add(weightInvoker1);
175+
weightInvokers.add(weightInvoker2);
176+
weightInvokers.add(weightInvoker3);
177+
}
148178
}

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
*/
1717
package org.apache.dubbo.rpc.cluster.loadbalance;
1818

19+
import org.apache.dubbo.common.URL;
20+
import org.apache.dubbo.rpc.Invocation;
1921
import org.apache.dubbo.rpc.Invoker;
2022
import org.apache.dubbo.rpc.RpcStatus;
2123

2224
import org.junit.Assert;
2325
import org.junit.Test;
2426

27+
import java.util.List;
2528
import java.util.Map;
29+
import java.util.Random;
2630
import java.util.concurrent.atomic.AtomicLong;
2731

2832
/**
@@ -54,4 +58,73 @@ public void testRandomLoadBalanceSelect() {
5458
Assert.assertEquals(0, counter.get(invoker5).intValue());
5559
}
5660

61+
@Test
62+
public void testSelectByWeight() {
63+
int sumInvoker1 = 0;
64+
int sumInvoker2 = 0;
65+
int sumInvoker3 = 0;
66+
int loop = 100000;
67+
68+
MyRandomLoadBalance lb = new MyRandomLoadBalance();
69+
for (int i = 0; i < loop; i++) {
70+
Invoker selected = lb.select(weightInvokers, null, null);
71+
72+
if (selected.getUrl().getProtocol().equals("test1")) {
73+
sumInvoker1++;
74+
}
75+
76+
if (selected.getUrl().getProtocol().equals("test2")) {
77+
sumInvoker2++;
78+
}
79+
80+
if (selected.getUrl().getProtocol().equals("test3")) {
81+
sumInvoker3++;
82+
}
83+
}
84+
85+
// 1 : 9 : 6
86+
System.out.println(sumInvoker1);
87+
System.out.println(sumInvoker2);
88+
System.out.println(sumInvoker3);
89+
Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2 + sumInvoker3, loop);
90+
}
91+
92+
class MyRandomLoadBalance extends AbstractLoadBalance {
93+
94+
public static final String NAME = "random";
95+
96+
private final Random random = new Random();
97+
98+
@Override
99+
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
100+
int length = invokers.size(); // Number of invokers
101+
int totalWeight = 0; // The sum of weights
102+
boolean sameWeight = true; // Every invoker has the same weight?
103+
for (int i = 0; i < length; i++) {
104+
105+
// mock weight
106+
int weight = invokers.get(i).getUrl().getPort();
107+
108+
totalWeight += weight; // Sum
109+
if (sameWeight && i > 0
110+
&& weight != invokers.get(i - 1).getUrl().getPort()) {
111+
sameWeight = false;
112+
}
113+
}
114+
if (totalWeight > 0 && !sameWeight) {
115+
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
116+
int offset = random.nextInt(totalWeight);
117+
// Return a invoker based on the random value.
118+
for (int i = 0; i < length; i++) {
119+
offset -= invokers.get(i).getUrl().getPort();
120+
if (offset < 0) {
121+
return invokers.get(i);
122+
}
123+
}
124+
}
125+
// If all invokers have the same weight value or totalWeight=0, return evenly.
126+
return invokers.get(random.nextInt(length));
127+
}
128+
}
129+
57130
}

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,19 @@
1616
*/
1717
package org.apache.dubbo.rpc.cluster.loadbalance;
1818

19+
import org.apache.dubbo.common.URL;
20+
import org.apache.dubbo.common.utils.AtomicPositiveInteger;
21+
import org.apache.dubbo.rpc.Invocation;
1922
import org.apache.dubbo.rpc.Invoker;
2023

2124
import org.junit.Assert;
2225
import org.junit.Test;
2326

27+
import java.util.LinkedHashMap;
28+
import java.util.List;
2429
import java.util.Map;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.ConcurrentMap;
2532
import java.util.concurrent.atomic.AtomicLong;
2633

2734
public class RoundRobinLoadBalanceTest extends LoadBalanceBaseTest {
@@ -34,4 +41,107 @@ public void testRoundRobinLoadBalanceSelect() {
3441
Assert.assertTrue("abs diff should < 1", Math.abs(count - runs / (0f + invokers.size())) < 1f);
3542
}
3643
}
44+
45+
@Test
46+
public void testSelectByWeight() {
47+
int sumInvoker1 = 0;
48+
int sumInvoker2 = 0;
49+
int sumInvoker3 = 0;
50+
int loop = 100000;
51+
52+
MyRoundRobinLoadBalance lb = new MyRoundRobinLoadBalance();
53+
for (int i = 0; i < loop; i++) {
54+
Invoker selected = lb.select(weightInvokers, null, null);
55+
56+
if (selected.getUrl().getProtocol().equals("test1")) {
57+
sumInvoker1++;
58+
}
59+
60+
if (selected.getUrl().getProtocol().equals("test2")) {
61+
sumInvoker2++;
62+
}
63+
64+
if (selected.getUrl().getProtocol().equals("test3")) {
65+
sumInvoker3++;
66+
}
67+
}
68+
69+
// 1 : 9 : 6
70+
System.out.println(sumInvoker1);
71+
System.out.println(sumInvoker2);
72+
System.out.println(sumInvoker3);
73+
Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2 + sumInvoker3, loop);
74+
}
75+
76+
class MyRoundRobinLoadBalance extends AbstractLoadBalance {
77+
78+
public static final String NAME = "roundrobin";
79+
80+
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
81+
82+
@Override
83+
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
84+
String key = "method1";
85+
int length = invokers.size(); // Number of invokers
86+
int maxWeight = 0; // The maximum weight
87+
int minWeight = Integer.MAX_VALUE; // The minimum weight
88+
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
89+
int weightSum = 0;
90+
for (int i = 0; i < length; i++) {
91+
92+
int weight = invokers.get(i).getUrl().getPort();
93+
94+
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
95+
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
96+
if (weight > 0) {
97+
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
98+
weightSum += weight;
99+
}
100+
}
101+
AtomicPositiveInteger sequence = sequences.get(key);
102+
if (sequence == null) {
103+
sequences.putIfAbsent(key, new AtomicPositiveInteger());
104+
sequence = sequences.get(key);
105+
}
106+
int currentSequence = sequence.getAndIncrement();
107+
if (maxWeight > 0 && minWeight < maxWeight) {
108+
int mod = currentSequence % weightSum;
109+
for (int i = 0; i < maxWeight; i++) {
110+
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
111+
final Invoker<T> k = each.getKey();
112+
final IntegerWrapper v = each.getValue();
113+
if (mod == 0 && v.getValue() > 0) {
114+
return k;
115+
}
116+
if (v.getValue() > 0) {
117+
v.decrement();
118+
mod--;
119+
}
120+
}
121+
}
122+
}
123+
// Round robin
124+
return invokers.get(currentSequence % length);
125+
}
126+
127+
private final class IntegerWrapper {
128+
private int value;
129+
130+
public IntegerWrapper(int value) {
131+
this.value = value;
132+
}
133+
134+
public int getValue() {
135+
return value;
136+
}
137+
138+
public void setValue(int value) {
139+
this.value = value;
140+
}
141+
142+
public void decrement() {
143+
this.value--;
144+
}
145+
}
146+
}
37147
}

0 commit comments

Comments
 (0)