Skip to content

Commit 770d2ae

Browse files
author
Richard Park
committed
Update changelog and remove client field formatting for client ID.
1 parent d988711 commit 770d2ae

3 files changed

Lines changed: 12 additions & 10 deletions

File tree

sdk/messaging/azeventhubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Processor wasn't distributing partitions optimally. (PR#22153)
12+
1113
### Other Changes
1214

1315
## 1.0.2 (2023-11-07)

sdk/messaging/azeventhubs/processor_load_balancer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
7676
// - I have too many. We expect to have some stolen from us, but we'll maintain
7777
// ownership for now.
7878
claimMorePartitions = false
79-
log.Writef(EventConsumer, "[%0.5s] Owns %d/%d, no more needed", lb.details.ClientID, len(lbinfo.current), lbinfo.maxAllowed)
79+
log.Writef(EventConsumer, "[%s] Owns %d/%d, no more needed", lb.details.ClientID, len(lbinfo.current), lbinfo.maxAllowed)
8080
} else if lbinfo.extraPartitionPossible && len(lbinfo.current) == lbinfo.maxAllowed-1 {
8181
// In the 'extraPartitionPossible' scenario, some consumers will have an extra partition
8282
// since things don't divide up evenly. We're one under the max, which means we _might_
@@ -85,7 +85,7 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
8585
// We will attempt to grab _one_ more but only if there are free partitions available
8686
// or if one of the consumers has more than the max allowed.
8787
claimMorePartitions = len(lbinfo.unownedOrExpired) > 0 || len(lbinfo.aboveMax) > 0
88-
log.Writef(EventConsumer, "[%0.5s] Unowned/expired: %d, above max: %d, need to claim: %t",
88+
log.Writef(EventConsumer, "[%s] Unowned/expired: %d, above max: %d, need to claim: %t",
8989
lb.details.ClientID,
9090
len(lbinfo.unownedOrExpired),
9191
len(lbinfo.aboveMax),
@@ -97,17 +97,17 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
9797
if claimMorePartitions {
9898
switch lb.strategy {
9999
case ProcessorStrategyGreedy:
100-
log.Writef(EventConsumer, "[%0.5s] Using greedy strategy to claim partitions", lb.details.ClientID)
100+
log.Writef(EventConsumer, "[%s] Using greedy strategy to claim partitions", lb.details.ClientID)
101101
ownerships = lb.greedyLoadBalancer(ctx, lbinfo)
102102
case ProcessorStrategyBalanced:
103-
log.Writef(EventConsumer, "[%0.5s] Using balanced strategy to claim partitions", lb.details.ClientID)
103+
log.Writef(EventConsumer, "[%s] Using balanced strategy to claim partitions", lb.details.ClientID)
104104
o := lb.balancedLoadBalancer(ctx, lbinfo)
105105

106106
if o != nil {
107107
ownerships = append(lbinfo.current, *o)
108108
}
109109
default:
110-
return nil, fmt.Errorf("[%0.5s] invalid load balancing strategy '%s'", lb.details.ClientID, lb.strategy)
110+
return nil, fmt.Errorf("[%s] invalid load balancing strategy '%s'", lb.details.ClientID, lb.strategy)
111111
}
112112
}
113113

@@ -118,7 +118,7 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
118118
}
119119

120120
if log.Should(EventConsumer) {
121-
log.Writef(EventConsumer, "[%0.5s] Asked for %s, got %s", lb.details.ClientID, partitionsForOwnerships(ownerships), partitionsForOwnerships(actual))
121+
log.Writef(EventConsumer, "[%s] Asked for %s, got %s", lb.details.ClientID, partitionsForOwnerships(ownerships), partitionsForOwnerships(actual))
122122
}
123123

124124
return actual, nil

sdk/messaging/azeventhubs/processor_load_balancers_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,15 +285,15 @@ func TestProcessorLoadBalancers_AnyStrategy_StealsToBalance(t *testing.T) {
285285
func TestProcessorLoadBalancers_InvalidStrategy(t *testing.T) {
286286
cps := newCheckpointStoreForTest()
287287

288-
lb := newProcessorLoadBalancer(cps, newTestConsumerDetails("does not matter"), "", time.Hour)
288+
lb := newProcessorLoadBalancer(cps, newTestConsumerDetails("clientid"), "", time.Hour)
289289
ownerships, err := lb.LoadBalance(context.Background(), []string{"0"})
290290
require.Nil(t, ownerships)
291-
require.EqualError(t, err, "invalid load balancing strategy ''")
291+
require.EqualError(t, err, "[clientid] invalid load balancing strategy ''")
292292

293-
lb = newProcessorLoadBalancer(cps, newTestConsumerDetails("does not matter"), "super-greedy", time.Hour)
293+
lb = newProcessorLoadBalancer(cps, newTestConsumerDetails("clientid"), "super-greedy", time.Hour)
294294
ownerships, err = lb.LoadBalance(context.Background(), []string{"0"})
295295
require.Nil(t, ownerships)
296-
require.EqualError(t, err, "invalid load balancing strategy 'super-greedy'")
296+
require.EqualError(t, err, "[clientid] invalid load balancing strategy 'super-greedy'")
297297
}
298298

299299
func TestProcessorLoadBalancers_AnyStrategy_GrabRelinquishedPartition(t *testing.T) {

0 commit comments

Comments
 (0)