Commit d988711

Author: Richard Park (committed)
Fixing an issue where a processor wouldn't grab partitions even when it didn't own any. This became especially bad in conditions where all the partitions were claimed.
- Adding a stress test (not enabled, but useful for debugging/manual testing)
- Adding many more unit tests around different ownership splits
1 parent: bfa1322

6 files changed

Lines changed: 443 additions & 16 deletions


sdk/messaging/azeventhubs/internal/eh/stress/stress-test-resources.json

Lines changed: 18 additions & 6 deletions
@@ -52,17 +52,25 @@
       "apiVersion": "[variables('apiVersion')]",
       "name": "[variables('authorizationName')]",
       "location": "[variables('location')]",
-      "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"],
+      "dependsOn": [
+        "[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"
+      ],
       "properties": {
-        "rights": ["Listen", "Manage", "Send"]
+        "rights": [
+          "Listen",
+          "Manage",
+          "Send"
+        ]
       }
     },
     {
       "type": "Microsoft.EventHub/namespaces/eventhubs",
       "apiVersion": "[variables('apiVersion')]",
       "name": "[variables('eventHubNameFull')]",
       "location": "[variables('location')]",
-      "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"],
+      "dependsOn": [
+        "[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"
+      ],
       "properties": {
         "messageRetentionInDays": 7,
         "partitionCount": 32
@@ -73,7 +81,9 @@
       "apiVersion": "[variables('apiVersion')]",
       "name": "[concat(variables('namespaceName'), '/default')]",
       "location": "[variables('location')]",
-      "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"],
+      "dependsOn": [
+        "[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"
+      ],
       "properties": {
         "defaultAction": "Deny",
         "virtualNetworkRules": [],
@@ -127,13 +137,15 @@
           "name": "[concat('default/', variables('containerName'))]",
           "type": "blobServices/containers",
           "apiVersion": "[variables('storageApiVersion')]",
-          "dependsOn": ["[variables('storageAccountName')]"]
+          "dependsOn": [
+            "[variables('storageAccountName')]"
+          ]
         }
       ]
     },
   ],
   "outputs": {
-    "EVENTHUB_NAME": {
+    "EVENTHUB_NAME_STRESS": {
       "type": "string",
       "value": "[variables('eventHubName')]"
     },

sdk/messaging/azeventhubs/internal/eh/stress/stress.go

Lines changed: 1 addition & 0 deletions
@@ -17,6 +17,7 @@ func main() {
         fn func(ctx context.Context) error
     }{
         {name: "batch", fn: tests.BatchStressTester},
+        {name: "balance", fn: tests.BalanceTester},
        {name: "processor", fn: tests.ProcessorStressTester},
    }
 

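With this registration, the new balance test is selected by name on the stress binary's command line, and BalanceTester then parses its own flags from os.Args[2:]. The surrounding main() is not part of this diff, so the dispatch below is only a sketch of the pattern the table implies; the variable name allTests, the import path, and the error handling are assumptions.

// Hypothetical sketch only; the real main() in stress.go is not shown in this diff.
package main

import (
    "context"
    "fmt"
    "os"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/eh/stress/tests"
)

func main() {
    allTests := []struct {
        name string
        fn   func(ctx context.Context) error
    }{
        {name: "batch", fn: tests.BatchStressTester},
        {name: "balance", fn: tests.BalanceTester},
        {name: "processor", fn: tests.ProcessorStressTester},
    }

    if len(os.Args) < 2 {
        fmt.Fprintln(os.Stderr, "usage: stress <test name> [test flags...]")
        os.Exit(1)
    }

    for _, test := range allTests {
        if test.name == os.Args[1] {
            // each test parses its own flags from os.Args[2:]
            if err := test.fn(context.Background()); err != nil {
                fmt.Fprintln(os.Stderr, err)
                os.Exit(1)
            }
            return
        }
    }

    fmt.Fprintf(os.Stderr, "unknown test %q\n", os.Args[1])
    os.Exit(1)
}

Given the flags defined in BalanceTester below, an invocation would presumably look like "stress balance -processors 32 -time 1m -strategy balanced".
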
Lines changed: 324 additions & 0 deletions (new file)
@@ -0,0 +1,324 @@
+package tests
+
+import (
+    "context"
+    "errors"
+    "flag"
+    "fmt"
+    "log"
+    "os"
+    "sort"
+    "strings"
+    "sync"
+    "time"
+
+    azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
+    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
+    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
+    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
+)
+
+const (
+    EventBalanceTest azlog.Event = "balance.test"
+)
+
+// BalanceTester checks that we can properly distribute partitions and
+// maintain it over time.
+func BalanceTester(ctx context.Context) error {
+    fs := flag.NewFlagSet("", flag.ContinueOnError)
+
+    numProcessors := fs.Int("processors", 32, "The # of processor instances to run")
+    waitTime := fs.Duration("time", time.Minute, "The amount of time to wait before exiting the test")
+    strategy := fs.String("strategy", string(azeventhubs.ProcessorStrategyBalanced), "The partition acquisition strategy to use (balanced, greedy)")
+
+    if err := fs.Parse(os.Args[2:]); err != nil {
+        return err
+    }
+
+    testData, err := newStressTestData("balancetester", false, map[string]string{
+        "processors": fmt.Sprintf("%d", *numProcessors),
+        "waitTime":   waitTime.String(),
+        "strategy":   *strategy,
+    })
+
+    if err != nil {
+        return err
+    }
+
+    containerName := testData.runID
+
+    wg := sync.WaitGroup{}
+    failuresChan := make(chan error, *numProcessors)
+
+    testCtx, cancelTest := context.WithTimeout(context.Background(), *waitTime)
+    defer cancelTest()
+
+    ownersMap := newOwners()
+
+    go func() {
+    Loop:
+        for {
+            select {
+            case <-ctx.Done():
+                break Loop
+            case <-time.After(30 * time.Second):
+                log.Printf("Current: %s", ownersMap)
+            }
+        }
+    }()
+
+    for i := 0; i < *numProcessors; i++ {
+        wg.Add(1)
+
+        go func(i int) {
+            defer wg.Done()
+
+            args := processArgs{
+                Name:          fmt.Sprintf("proc%02d", i),
+                Owners:        ownersMap,
+                CS:            testData.ConnectionString,
+                HubName:       testData.HubName,
+                StorageCS:     testData.StorageConnectionString,
+                ContainerName: containerName,
+                Strategy:      azeventhubs.ProcessorStrategy(*strategy),
+            }
+
+            if err := process(testCtx, args); err != nil {
+                failuresChan <- err
+                cancelTest()
+                return
+            }
+        }(i)
+    }
+
+    wg.Wait()
+    close(failuresChan)
+    cancelTest()
+
+    // any errors?
+    for err := range failuresChan {
+        testData.TC.TrackException(err)
+        fmt.Printf("ERROR: %s\n", err)
+        return err
+    }
+
+    // grab the # of partitions
+    err = func() error {
+        pc, err := azeventhubs.NewProducerClientFromConnectionString(testData.ConnectionString, testData.HubName, nil)
+
+        if err != nil {
+            return err
+        }
+
+        defer func() {
+            _ = pc.Close(ctx)
+        }()
+
+        ehProps, err := pc.GetEventHubProperties(ctx, nil)
+
+        if err != nil {
+            return err
+        }
+
+        // look through the dictionary now - how does everything look? Did all processors get at least
+        // one partition?
+        if len(ownersMap.byProcessor) != *numProcessors {
+            return errors.New("not all partition processors got a partition")
+        }
+
+        // now let's make sure everyone only took a fair share
+        min := len(ehProps.PartitionIDs) / *numProcessors
+
+        for _, owned := range ownersMap.byProcessor {
+            if owned < min {
+                return fmt.Errorf("a processor had %d partitions, but should have at least %d", owned, min)
+            }
+        }
+
+        return nil
+    }()
+
+    if err != nil {
+        err := fmt.Errorf("unbalanced owners, map\n%s: %w", ownersMap, err)
+        testData.TC.TrackException(err)
+        return err
+    }
+
+    log.Printf("BALANCED")
+    return nil
+}
+
+type owners struct {
+    mu          sync.Mutex
+    byPartition map[string]ownerValue
+    byProcessor map[string]int
+}
+
+type ownerValue struct {
+    When time.Time
+    Who  string
+}
+
+func (v ownerValue) String() string {
+    return fmt.Sprintf("%s: %s", v.Who, time.Since(v.When)/time.Second)
+}
+
+func newOwners() *owners {
+    o := &owners{
+        byPartition: map[string]ownerValue{},
+        byProcessor: map[string]int{},
+    }
+
+    return o
+}
+
+func partKey(partID string) string {
+    return fmt.Sprintf("%2s", partID)
+}
+
+func (o *owners) Set(processorName string, partitionID string) {
+    o.mu.Lock()
+    defer o.mu.Unlock()
+
+    partKey := partKey(partitionID)
+
+    o.byPartition[partKey] = ownerValue{
+        When: time.Now(),
+        Who:  processorName,
+    }
+
+    o.byProcessor[processorName]++
+    azlog.Writef(EventBalanceTest, "[gain] processor: %s, partition: %s, total: %d", processorName, partitionID, o.byProcessor[processorName])
+}
+
+func (o *owners) Remove(processorName string, partitionID string) {
+    o.mu.Lock()
+    defer o.mu.Unlock()
+
+    partKey := partKey(partitionID)
+
+    if o.byPartition[partKey].Who == processorName {
+        delete(o.byPartition, partKey)
+    }
+
+    o.byProcessor[processorName]--
+    azlog.Writef(EventBalanceTest, "[loss] processor: %s, partition: %s, total: %d", processorName, partitionID, o.byProcessor[processorName])
+}
+
+func stringize[T any](m map[string]T, stringer func(v T) string) string {
+    var keys []string
+
+    for k := range m {
+        keys = append(keys, k)
+    }
+
+    sort.Strings(keys)
+
+    sb := strings.Builder{}
+
+    for _, k := range keys {
+        sb.WriteString(fmt.Sprintf(" %s: %s\n", k, stringer(m[k])))
+    }
+
+    return sb.String()
+}
+
+func (o *owners) String() string {
+    o.mu.Lock()
+    defer o.mu.Unlock()
+
+    return fmt.Sprintf("By partition:\n%s\nBy processor:\n%s\n",
+        stringize(o.byPartition, func(v ownerValue) string { return v.String() }),
+        stringize(o.byProcessor, func(v int) string { return fmt.Sprintf("%d", v) }),
+    )
+}
+
+type processArgs struct {
+    Name   string
+    Owners *owners
+
+    CS      string
+    HubName string
+
+    StorageCS     string
+    ContainerName string
+
+    Strategy azeventhubs.ProcessorStrategy
+}
+
+func process(ctx context.Context, args processArgs) error {
+    client, err := azeventhubs.NewConsumerClientFromConnectionString(args.CS, args.HubName, azeventhubs.DefaultConsumerGroup, nil)
+
+    if err != nil {
+        return err
+    }
+
+    defer func() { _ = client.Close(ctx) }()
+
+    blobClient, err := azblob.NewClientFromConnectionString(args.StorageCS, nil)
+
+    if err != nil {
+        return err
+    }
+
+    containerClient := blobClient.ServiceClient().NewContainerClient(args.ContainerName)
+
+    if _, err := containerClient.Create(ctx, nil); err != nil {
+        if !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
+            return err
+        }
+    }
+
+    defer func() { _, _ = containerClient.Delete(ctx, nil) }()
+
+    blobStore, err := checkpoints.NewBlobStore(containerClient, nil)
+
+    if err != nil {
+        return err
+    }
+
+    processor, err := azeventhubs.NewProcessor(client, blobStore, &azeventhubs.ProcessorOptions{
+        LoadBalancingStrategy: args.Strategy,
+    })
+
+    if err != nil {
+        return err
+    }
+
+    go func() {
+        for {
+            pc := processor.NextPartitionClient(ctx)
+
+            if pc == nil {
+                break
+            }
+
+            args.Owners.Set(args.Name, pc.PartitionID())
+
+            go keepAlive(ctx, pc, args)
+        }
+    }()
+
+    return processor.Run(ctx)
+}
+
+func keepAlive(ctx context.Context, pc *azeventhubs.ProcessorPartitionClient, args processArgs) {
+ReceiveLoop:
+    for {
+        _, err := pc.ReceiveEvents(ctx, 1, nil)
+
+        var eventHubErr *azeventhubs.Error
+
+        switch {
+        case errors.Is(err, context.Canceled):
+            // test is over, we'll see how everything shook out.
+            break ReceiveLoop
+        case errors.As(err, &eventHubErr) && eventHubErr.Code == azeventhubs.ErrorCodeOwnershipLost:
+            // we've swapped ownership with _someone_. Record it in the map, but be careful of potential ordering
+            // and only delete if we're the one in the map!
+            args.Owners.Remove(args.Name, pc.PartitionID())
+            break ReceiveLoop
+        }
+    }
+}

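The pass/fail criterion in BalanceTester is an integer fair share: every processor must appear in the ownership map, and each must own at least len(PartitionIDs) / numProcessors partitions. With the 32-partition hub from the ARM template above and the default of 32 processors, that minimum is 1; with 8 processors it would be 4. A standalone restatement of the check follows, using hypothetical sample numbers rather than values from the test run:

// Illustrative restatement of the fair-share check above; the sample values are made up.
package main

import "fmt"

// isBalanced mirrors BalanceTester's final check: every processor is present
// and owns at least partitions/processors partitions.
func isBalanced(partitions int, processors int, owned map[string]int) bool {
    if len(owned) != processors {
        return false // some processor never acquired a partition
    }

    min := partitions / processors

    for _, n := range owned {
        if n < min {
            return false
        }
    }

    return true
}

func main() {
    // hypothetical sample: 32 partitions spread across 4 processors
    fmt.Println(isBalanced(32, 4, map[string]int{"proc00": 8, "proc01": 8, "proc02": 8, "proc03": 8}))  // true
    fmt.Println(isBalanced(32, 4, map[string]int{"proc00": 14, "proc01": 6, "proc02": 6, "proc03": 6})) // false, fair share is 8
}
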
sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ func newStressTestData(name string, verbose bool, baggage map[string]string) (*s
 
     variables := map[string]*string{
        "EVENTHUB_CONNECTION_STRING": &td.ConnectionString,
-       "EVENTHUB_NAME": &td.HubName,
+       "EVENTHUB_NAME_STRESS": &td.HubName,
        "CHECKPOINTSTORE_STORAGE_CONNECTION_STRING": &td.StorageConnectionString,
    }
 

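This rename keeps the stress harness in line with the EVENTHUB_NAME_STRESS output added to the ARM template above, so the hub name is now expected under the new environment variable name. How newStressTestData actually resolves the variables map is not shown in this diff; below is a minimal sketch of one way it could work, where the helper name and error message are assumptions rather than the SDK's code:

package tests

import (
    "fmt"
    "os"
)

// resolveVariables is a hypothetical sketch (not part of this diff) of how the
// variables map above could be filled in from the environment.
func resolveVariables(variables map[string]*string) error {
    for name, dest := range variables {
        value := os.Getenv(name)

        if value == "" {
            return fmt.Errorf("missing environment variable %s", name)
        }

        *dest = value
    }

    return nil
}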