Skip to content

Commit 437b4eb

Browse files
jasonjoo2010jason-joo
authored andcommitted
Smooth Round Robin selection
1 parent 2e826a6 commit 437b4eb

File tree

3 files changed

+296
-110
lines changed

3 files changed

+296
-110
lines changed

dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java

Lines changed: 119 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
package com.alibaba.dubbo.rpc.cluster.loadbalance;
1818

1919
import com.alibaba.dubbo.common.URL;
20-
import com.alibaba.dubbo.common.utils.AtomicPositiveInteger;
2120
import com.alibaba.dubbo.rpc.Invocation;
2221
import com.alibaba.dubbo.rpc.Invoker;
2322

24-
import java.util.LinkedHashMap;
23+
import java.util.Collection;
24+
import java.util.HashMap;
25+
import java.util.Iterator;
2526
import java.util.List;
2627
import java.util.Map;
28+
import java.util.Map.Entry;
2729
import java.util.concurrent.ConcurrentHashMap;
2830
import java.util.concurrent.ConcurrentMap;
2931

@@ -32,72 +34,131 @@
3234
*
3335
*/
3436
public class RoundRobinLoadBalance extends AbstractLoadBalance {
35-
3637
public static final String NAME = "roundrobin";
38+
39+
private static int RECYCLE_PERIOD = 60000;
40+
41+
protected static class WeightedRoundRobin {
42+
private int weight;
43+
private int current;
44+
private long lastRecycle;
45+
private long lastUpdate;
46+
public int getWeight() {
47+
return weight;
48+
}
49+
public void setWeight(int weight) {
50+
this.weight = weight;
51+
}
52+
public void setCurrent(int current) {
53+
this.current = current;
54+
}
55+
public int increaseWeight() {
56+
current += weight;
57+
return current;
58+
}
59+
public void sel(int total) {
60+
current -= total;
61+
}
62+
public long getLastRecycle() {
63+
return lastRecycle;
64+
}
65+
public void setLastRecycle(long lastRecycle) {
66+
this.lastRecycle = lastRecycle;
67+
}
68+
public long getLastUpdate() {
69+
return lastUpdate;
70+
}
71+
public void setLastUpdate(long lastUpdate) {
72+
this.lastUpdate = lastUpdate;
73+
}
74+
}
3775

38-
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
39-
76+
private ConcurrentMap<String, Map<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, Map<String, WeightedRoundRobin>>();
77+
78+
/**
79+
* get invoker addr list cached for specified invocation
80+
* <p>
81+
* <b>for unit test only</b>
82+
*
83+
* @param invokers
84+
* @param invocation
85+
* @return
86+
*/
87+
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
88+
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
89+
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
90+
if (map != null) {
91+
return map.keySet();
92+
}
93+
return null;
94+
}
95+
4096
@Override
4197
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
4298
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
43-
int length = invokers.size(); // Number of invokers
44-
int maxWeight = 0; // The maximum weight
45-
int minWeight = Integer.MAX_VALUE; // The minimum weight
46-
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
47-
int weightSum = 0;
48-
for (int i = 0; i < length; i++) {
49-
int weight = getWeight(invokers.get(i), invocation);
50-
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
51-
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
52-
if (weight > 0) {
53-
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
54-
weightSum += weight;
55-
}
99+
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
100+
if (map == null) {
101+
methodWeightMap.putIfAbsent(key, new HashMap<String, WeightedRoundRobin>());
102+
map = methodWeightMap.get(key);
56103
}
57-
AtomicPositiveInteger sequence = sequences.get(key);
58-
if (sequence == null) {
59-
sequences.putIfAbsent(key, new AtomicPositiveInteger());
60-
sequence = sequences.get(key);
61-
}
62-
int currentSequence = sequence.getAndIncrement();
63-
if (maxWeight > 0 && minWeight < maxWeight) {
64-
int mod = currentSequence % weightSum;
65-
for (int i = 0; i < maxWeight; i++) {
66-
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
67-
final Invoker<T> k = each.getKey();
68-
final IntegerWrapper v = each.getValue();
69-
if (mod == 0 && v.getValue() > 0) {
70-
return k;
71-
}
72-
if (v.getValue() > 0) {
73-
v.decrement();
74-
mod--;
104+
synchronized (map) {
105+
int totalWeight = 0;
106+
int maxCurrent = Integer.MIN_VALUE;
107+
long now = System.currentTimeMillis();
108+
boolean needRecycle = false;
109+
Invoker<T> selectedInvoker = null;
110+
WeightedRoundRobin selectedWRR = null;
111+
for (Invoker<T> invoker : invokers) {
112+
String identifyString = invoker.getUrl().toIdentityString();
113+
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
114+
int weight = getWeight(invoker, invocation);
115+
if (weight < 0) {
116+
weight = 0;
117+
}
118+
if (weightedRoundRobin == null) {
119+
weightedRoundRobin = new WeightedRoundRobin();
120+
weightedRoundRobin.setCurrent(0);
121+
weightedRoundRobin.setWeight(weight);
122+
weightedRoundRobin.setLastRecycle(now);
123+
map.put(identifyString, weightedRoundRobin);
124+
}
125+
if (weight != weightedRoundRobin.getWeight()) {
126+
//weight changed
127+
weightedRoundRobin.setCurrent(0);
128+
weightedRoundRobin.setWeight(weight);
129+
}
130+
int cur = weightedRoundRobin.increaseWeight();
131+
weightedRoundRobin.setLastUpdate(now);
132+
if (!needRecycle && now - weightedRoundRobin.getLastRecycle() > RECYCLE_PERIOD) {
133+
//try to recycle useless item every 60s if there is one outdated
134+
needRecycle = true;
135+
}
136+
if (cur > maxCurrent) {
137+
maxCurrent = cur;
138+
selectedInvoker = invoker;
139+
selectedWRR = weightedRoundRobin;
140+
}
141+
totalWeight += weight;
142+
}
143+
//recycle
144+
if (needRecycle) {
145+
Iterator<Entry<String, WeightedRoundRobin>> it = map.entrySet().iterator();
146+
while (it.hasNext()) {
147+
Entry<String, WeightedRoundRobin> item = it.next();
148+
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
149+
it.remove();
150+
} else {
151+
//pass
152+
item.getValue().setLastRecycle(now);
75153
}
76154
}
77155
}
156+
if (selectedInvoker != null) {
157+
selectedWRR.sel(totalWeight);
158+
return selectedInvoker;
159+
}
78160
}
79-
// Round robin
80-
return invokers.get(currentSequence % length);
81-
}
82-
83-
private static final class IntegerWrapper {
84-
private int value;
85-
86-
public IntegerWrapper(int value) {
87-
this.value = value;
88-
}
89-
90-
public int getValue() {
91-
return value;
92-
}
93-
94-
public void setValue(int value) {
95-
this.value = value;
96-
}
97-
98-
public void decrement() {
99-
this.value--;
100-
}
161+
return null;
101162
}
102163

103164
}

0 commit comments

Comments
 (0)