Skip to content

Commit 2509c74

Browse files
jasonjoo2010jason-joo
authored andcommitted
Smooth Round Robin selection
1 parent 2e826a6 commit 2509c74

File tree

4 files changed

+333
-111
lines changed

4 files changed

+333
-111
lines changed

dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java

Lines changed: 110 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,87 +17,141 @@
1717
package com.alibaba.dubbo.rpc.cluster.loadbalance;
1818

1919
import com.alibaba.dubbo.common.URL;
20-
import com.alibaba.dubbo.common.utils.AtomicPositiveInteger;
2120
import com.alibaba.dubbo.rpc.Invocation;
2221
import com.alibaba.dubbo.rpc.Invoker;
2322

24-
import java.util.LinkedHashMap;
23+
import java.util.Collection;
24+
import java.util.Iterator;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Map.Entry;
2728
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.ConcurrentMap;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.concurrent.atomic.AtomicLong;
2932

3033
/**
3134
* Round robin load balance.
35+
*
36+
* Smoothly round robin's implementation @since 2.6.5
37+
* @author jason
3238
*
3339
*/
3440
public class RoundRobinLoadBalance extends AbstractLoadBalance {
35-
3641
public static final String NAME = "roundrobin";
42+
43+
private static int RECYCLE_PERIOD = 60000;
44+
45+
protected static class WeightedRoundRobin {
46+
private int weight;
47+
private AtomicLong current = new AtomicLong(0);
48+
private long lastUpdate;
49+
public int getWeight() {
50+
return weight;
51+
}
52+
public void setWeight(int weight) {
53+
this.weight = weight;
54+
current.set(0);
55+
}
56+
public long increaseCurrent() {
57+
return current.addAndGet(weight);
58+
}
59+
public void sel(int total) {
60+
current.addAndGet(-1 * total);
61+
}
62+
public long getLastUpdate() {
63+
return lastUpdate;
64+
}
65+
public void setLastUpdate(long lastUpdate) {
66+
this.lastUpdate = lastUpdate;
67+
}
68+
}
3769

38-
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
39-
70+
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
71+
private AtomicBoolean updateLock = new AtomicBoolean();
72+
73+
/**
74+
* get invoker addr list cached for specified invocation
75+
* <p>
76+
* <b>for unit test only</b>
77+
*
78+
* @param invokers
79+
* @param invocation
80+
* @return
81+
*/
82+
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
83+
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
84+
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
85+
if (map != null) {
86+
return map.keySet();
87+
}
88+
return null;
89+
}
90+
4091
@Override
4192
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
4293
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
43-
int length = invokers.size(); // Number of invokers
44-
int maxWeight = 0; // The maximum weight
45-
int minWeight = Integer.MAX_VALUE; // The minimum weight
46-
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
47-
int weightSum = 0;
48-
for (int i = 0; i < length; i++) {
49-
int weight = getWeight(invokers.get(i), invocation);
50-
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
51-
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
52-
if (weight > 0) {
53-
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
54-
weightSum += weight;
55-
}
94+
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
95+
if (map == null) {
96+
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
97+
map = methodWeightMap.get(key);
5698
}
57-
AtomicPositiveInteger sequence = sequences.get(key);
58-
if (sequence == null) {
59-
sequences.putIfAbsent(key, new AtomicPositiveInteger());
60-
sequence = sequences.get(key);
99+
int totalWeight = 0;
100+
long maxCurrent = Long.MIN_VALUE;
101+
long now = System.currentTimeMillis();
102+
Invoker<T> selectedInvoker = null;
103+
WeightedRoundRobin selectedWRR = null;
104+
for (Invoker<T> invoker : invokers) {
105+
String identifyString = invoker.getUrl().toIdentityString();
106+
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
107+
int weight = getWeight(invoker, invocation);
108+
if (weight < 0) {
109+
weight = 0;
110+
}
111+
if (weightedRoundRobin == null) {
112+
weightedRoundRobin = new WeightedRoundRobin();
113+
weightedRoundRobin.setWeight(weight);
114+
map.putIfAbsent(identifyString, weightedRoundRobin);
115+
weightedRoundRobin = map.get(identifyString);
116+
}
117+
if (weight != weightedRoundRobin.getWeight()) {
118+
//weight changed
119+
weightedRoundRobin.setWeight(weight);
120+
}
121+
long cur = weightedRoundRobin.increaseCurrent();
122+
weightedRoundRobin.setLastUpdate(now);
123+
if (cur > maxCurrent) {
124+
maxCurrent = cur;
125+
selectedInvoker = invoker;
126+
selectedWRR = weightedRoundRobin;
127+
}
128+
totalWeight += weight;
61129
}
62-
int currentSequence = sequence.getAndIncrement();
63-
if (maxWeight > 0 && minWeight < maxWeight) {
64-
int mod = currentSequence % weightSum;
65-
for (int i = 0; i < maxWeight; i++) {
66-
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
67-
final Invoker<T> k = each.getKey();
68-
final IntegerWrapper v = each.getValue();
69-
if (mod == 0 && v.getValue() > 0) {
70-
return k;
71-
}
72-
if (v.getValue() > 0) {
73-
v.decrement();
74-
mod--;
130+
if (!updateLock.get() && invokers.size() != map.size()) {
131+
if (updateLock.compareAndSet(false, true)) {
132+
try {
133+
// copy -> modify -> update reference
134+
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
135+
newMap.putAll(map);
136+
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
137+
while (it.hasNext()) {
138+
Entry<String, WeightedRoundRobin> item = it.next();
139+
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
140+
it.remove();
141+
}
75142
}
143+
methodWeightMap.put(key, newMap);
144+
} finally {
145+
updateLock.set(false);
76146
}
77147
}
78148
}
79-
// Round robin
80-
return invokers.get(currentSequence % length);
81-
}
82-
83-
private static final class IntegerWrapper {
84-
private int value;
85-
86-
public IntegerWrapper(int value) {
87-
this.value = value;
88-
}
89-
90-
public int getValue() {
91-
return value;
92-
}
93-
94-
public void setValue(int value) {
95-
this.value = value;
96-
}
97-
98-
public void decrement() {
99-
this.value--;
149+
if (selectedInvoker != null) {
150+
selectedWRR.sel(totalWeight);
151+
return selectedInvoker;
100152
}
153+
// should not happen here
154+
return invokers.get(0);
101155
}
102156

103157
}

dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/StickyTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ public int testSticky(String methodName, boolean check) {
114114

115115
given(invoker1.invoke(invocation)).willReturn(result);
116116
given(invoker1.isAvailable()).willReturn(true);
117-
given(invoker1.getUrl()).willReturn(url);
117+
given(invoker1.getUrl()).willReturn(url.setPort(1));
118118
given(invoker1.getInterface()).willReturn(StickyTest.class);
119119

120120
given(invoker2.invoke(invocation)).willReturn(result);
121121
given(invoker2.isAvailable()).willReturn(true);
122-
given(invoker2.getUrl()).willReturn(url);
122+
given(invoker2.getUrl()).willReturn(url.setPort(2));
123123
given(invoker2.getInterface()).willReturn(StickyTest.class);
124124

125125
invocation.setMethodName(methodName);
@@ -158,4 +158,4 @@ public Invoker<T> getSelectedInvoker() {
158158
return selectedInvoker;
159159
}
160160
}
161-
}
161+
}

0 commit comments

Comments
 (0)