Skip to content

Commit cdbbd27

Browse files
author
Richard Park
committed
More TODOs
1 parent 61fa8ed commit cdbbd27

2 files changed

Lines changed: 57 additions & 41 deletions

File tree

sdk/messaging/azeventhubs/processor_load_balancer.go

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,10 @@ type loadBalancerInfo struct {
4646
// it contains _all_ the partitions for that particular consumer.
4747
aboveMax []Ownership
4848

49-
// maxAllowed is the maximum number of partitions a consumer should have
50-
// If partitions do not divide evenly this will be the "theoretical" max
51-
// with the assumption that this particular consumer will get an extra
52-
// partition.
53-
maxAllowed int
54-
55-
// extraPartitionPossible is true if the partitions cannot split up evenly
56-
// amongst all the known consumers.
57-
extraPartitionPossible bool
49+
// claimMorePartitions is true when we should try to claim more partitions
50+
// because we're under the limit, or we're in a situation where we could claim
51+
// one extra partition.
52+
claimMorePartitions bool
5853

5954
raw []Ownership
6055
}
@@ -68,33 +63,9 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
6863
return nil, err
6964
}
7065

71-
claimMorePartitions := true
72-
73-
if len(lbinfo.current) >= lbinfo.maxAllowed {
74-
// - I have _exactly_ the right amount
75-
// or
76-
// - I have too many. We expect to have some stolen from us, but we'll maintain
77-
// ownership for now.
78-
claimMorePartitions = false
79-
log.Writef(EventConsumer, "[%s] Owns %d/%d, no more needed", lb.details.ClientID, len(lbinfo.current), lbinfo.maxAllowed)
80-
} else if lbinfo.extraPartitionPossible && len(lbinfo.current) == lbinfo.maxAllowed-1 {
81-
// In the 'extraPartitionPossible' scenario, some consumers will have an extra partition
82-
// since things don't divide up evenly. We're one under the max, which means we _might_
83-
// be able to claim another one.
84-
//
85-
// We will attempt to grab _one_ more but only if there are free partitions available
86-
// or if one of the consumers has more than the max allowed.
87-
claimMorePartitions = len(lbinfo.unownedOrExpired) > 0 || len(lbinfo.aboveMax) > 0
88-
log.Writef(EventConsumer, "[%s] Unowned/expired: %d, above max: %d, need to claim: %t",
89-
lb.details.ClientID,
90-
len(lbinfo.unownedOrExpired),
91-
len(lbinfo.aboveMax),
92-
claimMorePartitions)
93-
}
94-
9566
ownerships := lbinfo.current
9667

97-
if claimMorePartitions {
68+
if lbinfo.claimMorePartitions {
9869
switch lb.strategy {
9970
case ProcessorStrategyGreedy:
10071
log.Writef(EventConsumer, "[%s] Using greedy strategy to claim partitions", lb.details.ClientID)
@@ -118,7 +89,7 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
11889
}
11990

12091
if log.Should(EventConsumer) {
121-
log.Writef(EventConsumer, "[%s] Asked for %s, got %s", lb.details.ClientID, partitionsForOwnerships(ownerships), partitionsForOwnerships(actual))
92+
log.Writef(EventConsumer, "[%0.5s] Asked for %s, got %s", lb.details.ClientID, partitionsForOwnerships(ownerships), partitionsForOwnerships(actual))
12293
}
12394

12495
return actual, nil
@@ -199,6 +170,8 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par
199170
// only allow owners to keep extra partitions if we've already met our minimum bar. Otherwise
200171
// above the minimum is fair game.
201172
if allowExtraPartition && len(groupedByOwner[lb.details.ClientID]) >= minRequired {
173+
// raise the waterline - we've got all we need so we don't need to steal from processors
174+
// that only have one extra partition (i.e., avoids thrash)
202175
maxAllowed += 1
203176
}
204177

@@ -214,13 +187,53 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par
214187
}
215188
}
216189

190+
191+
192+
// TODO: getAvailablePartitions() was exporting too much state.
193+
//
194+
// TODO: I've moved this logic into getAvailablePartitions, where it makes more sense
195+
// TODO: in context, and removed `lbinfo.maxAllowed` and `lbinfo.extraPartitionPossible` from the load balancer
196+
// TODO: info that's returned.
197+
//
198+
199+
200+
201+
claimMorePartitions := true
202+
current := groupedByOwner[lb.details.ClientID]
203+
204+
205+
206+
// TODO: this code needs to be checked for two variants - I am allowing an extra partition or
207+
// TODO: I'm not (based on the logic above that restricts it).
208+
209+
if len(current) >= maxAllowed {
210+
// - I have _exactly_ the right amount
211+
// or
212+
// - I have too many. We expect to have some stolen from us, but we'll maintain
213+
// ownership for now.
214+
claimMorePartitions = false
215+
log.Writef(EventConsumer, "[%s] Owns %d/%d, no more needed", lb.details.ClientID, len(current), maxAllowed)
216+
} else if allowExtraPartition && len(current) == maxAllowed-1 {
217+
// In the 'extraPartitionPossible' scenario, some consumers will have an extra partition
218+
// since things don't divide up evenly. We're one under the max, which means we _might_
219+
// be able to claim another one.
220+
//
221+
// We will attempt to grab _one_ more but only if there are free partitions available
222+
// or if one of the consumers has more than the max allowed.
223+
claimMorePartitions = len(unownedOrExpired) > 0 || len(aboveMax) > 0
224+
log.Writef(EventConsumer, "[%s] Unowned/expired: %d, above max: %d, need to claim: %t",
225+
lb.details.ClientID,
226+
len(unownedOrExpired),
227+
len(aboveMax),
228+
claimMorePartitions)
229+
}
230+
217231
return loadBalancerInfo{
218-
current: groupedByOwner[lb.details.ClientID],
219-
unownedOrExpired: unownedOrExpired,
220-
aboveMax: aboveMax,
221-
maxAllowed: maxAllowed,
222-
extraPartitionPossible: allowExtraPartition,
223-
raw: ownerships,
232+
current: current,
233+
unownedOrExpired: unownedOrExpired,
234+
aboveMax: aboveMax,
235+
claimMorePartitions: claimMorePartitions,
236+
raw: ownerships,
224237
}, nil
225238
}
226239

sdk/messaging/azeventhubs/processor_load_balancers_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,9 @@ func TestUnit_ProcessorLoadBalancer_allPartitionsAlreadyOwned(t *testing.T) {
410410
require.NoError(t, err)
411411
require.Empty(t, lbInfo.unownedOrExpired, "no partitions were expired in this load balancing round")
412412
require.Equal(t, td.AboveMax, ownersAsString(t, lbInfo.aboveMax, len(partitions)))
413+
414+
415+
// TODO: adjust this test to check for 'lbinfo.claimMorePartitions'
413416
})
414417
}
415418
}

0 commit comments

Comments (0)