Skip to content

Commit 855be7f

Browse files
authored
Add support for multiple filters in consumer grous (#40)
* Add support for multiple filters in static and elastic consumer group configurations Refactored filtering logic, added utility methods, extended validations, updated related tests, and enhanced handling of wildcard-based partitioning. * Add test to verify partitioning behavior with full-subject hash in elastic consumer groups. * Add compatibility notes for .NET-only multi-filter and [-1] sentinel features in consumer groups * Add server version guard in elastic consumer group tests for [-1] sentinel support. * Update compatibility notes for .NET-only multi-filter and [-1] sentinel features in consumer group configurations * Refactor elastic consumer group configuration to replace wildcards with `NatsPcgPartitioningFilter`, simplify validations, and update related tests. * Update partitioning filter in paritiy * fix build * tests: fix elastic PCGroups CI failures Use unique GUID-based subject prefixes in all elastic tests to prevent "subjects overlap with an existing stream" errors when multiple TFMs share the same NATS server in CI. Add SkipBelow212Async to the empty PartitioningFilters test since full-subject partitioning requires NATS 2.12+. Fix the work-queue stream name assertion (was incorrectly prefixed with "pcg-") and add proper cleanup in try/finally. * tests: add Go interop tests for elastic PCGroups Use GoHarness to verify cross-language interop with orbit.go pcgroups v0.2.0. Three tests cover .NET-creates/Go-consumes, Go-creates/.NET-consumes, and empty PartitioningFilters interop. * tests: skip Go interop consume tests on NATS < 2.11 ElasticConsume uses priority groups which require NATS 2.11+. The Go process exits with an error on 2.10, causing the test to fail on ReadLineAsync.
1 parent 4d64809 commit 855be7f

18 files changed

+1387
-223
lines changed

src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConfig.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,10 @@ public sealed record NatsPcgElasticConfig
1717
public required uint MaxMembers { get; init; }
1818

1919
/// <summary>
20-
/// Gets the subject filter for the consumer group.
20+
/// Gets the partitioning filters, each pairing a subject filter with its wildcard positions for partitioning.
2121
/// </summary>
22-
[JsonPropertyName("filter")]
23-
public required string Filter { get; init; }
24-
25-
/// <summary>
26-
/// Gets the wildcard positions used for partitioning (1-indexed).
27-
/// </summary>
28-
[JsonPropertyName("partitioning_wildcards")]
29-
public required int[] PartitioningWildcards { get; init; }
22+
[JsonPropertyName("partitioning_filters")]
23+
public required NatsPcgPartitioningFilter[] PartitioningFilters { get; init; }
3024

3125
/// <summary>
3226
/// Gets the optional maximum number of buffered messages.

src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,7 @@ private async Task CreateOrGetConsumerAsync(CancellationToken cancellationToken)
260260
config = _config;
261261
}
262262

263-
string[] filters = NatsPcgPartitionDistributor.GeneratePartitionFilters(
264-
config.Members,
265-
config.MaxMembers,
266-
config.MemberMappings,
267-
_memberName);
263+
string[] filters = GenerateFiltersForMember(config, _memberName);
268264

269265
_currentFilters = filters;
270266

@@ -313,11 +309,7 @@ private async Task RecreateConsumerAsync()
313309
}
314310

315311
// Recalculate filters
316-
string[] filters = NatsPcgPartitionDistributor.GeneratePartitionFilters(
317-
config.Members,
318-
config.MaxMembers,
319-
config.MemberMappings,
320-
_memberName);
312+
string[] filters = GenerateFiltersForMember(config, _memberName);
321313

322314
// Only recreate if filters changed
323315
if (FiltersEqual(filters, _currentFilters))
@@ -347,6 +339,34 @@ private async Task RecreateConsumerAsync()
347339
_consumer = await _js.CreateOrUpdateConsumerAsync(workQueueStreamName, consumerConfig, _cts.Token).ConfigureAwait(false);
348340
}
349341

342+
private static string[] GenerateFiltersForMember(NatsPcgElasticConfig config, string memberName)
343+
{
344+
var allFilters = new List<string>();
345+
if (config.PartitioningFilters.Length > 0)
346+
{
347+
foreach (var pf in config.PartitioningFilters)
348+
{
349+
allFilters.AddRange(NatsPcgPartitionDistributor.GeneratePartitionFilters(
350+
config.Members,
351+
config.MaxMembers,
352+
config.MemberMappings,
353+
memberName,
354+
pf.Filter));
355+
}
356+
}
357+
else
358+
{
359+
allFilters.AddRange(NatsPcgPartitionDistributor.GeneratePartitionFilters(
360+
config.Members,
361+
config.MaxMembers,
362+
config.MemberMappings,
363+
memberName,
364+
">"));
365+
}
366+
367+
return allFilters.ToArray();
368+
}
369+
350370
private async Task WatchConfigLoopAsync()
351371
{
352372
try

src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticExtensions.cs

Lines changed: 80 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ public static class NatsPcgElasticExtensions
2525
/// <param name="streamName">Name of the source stream.</param>
2626
/// <param name="consumerGroupName">Name of the consumer group.</param>
2727
/// <param name="maxNumMembers">Maximum number of members (also the number of partitions).</param>
28-
/// <param name="filter">Subject filter for the consumer group.</param>
29-
/// <param name="partitioningWildcards">Wildcard positions for partitioning (1-indexed).</param>
28+
/// <param name="partitioningFilters">Partitioning filters, each pairing a subject filter with wildcard positions.</param>
3029
/// <param name="maxBufferedMessages">Optional maximum number of buffered messages.</param>
3130
/// <param name="maxBufferedBytes">Optional maximum bytes of buffered messages.</param>
3231
/// <param name="cancellationToken">Cancellation token.</param>
@@ -36,19 +35,17 @@ public static async Task<NatsPcgElasticConfig> CreatePcgElasticAsync(
3635
string streamName,
3736
string consumerGroupName,
3837
uint maxNumMembers,
39-
string filter,
40-
int[] partitioningWildcards,
38+
NatsPcgPartitioningFilter[] partitioningFilters,
4139
long? maxBufferedMessages = null,
4240
long? maxBufferedBytes = null,
4341
CancellationToken cancellationToken = default)
4442
{
45-
ValidateConfig(maxNumMembers, filter, partitioningWildcards);
43+
ValidateConfig(maxNumMembers, partitioningFilters);
4644

4745
var config = new NatsPcgElasticConfig
4846
{
4947
MaxMembers = maxNumMembers,
50-
Filter = filter,
51-
PartitioningWildcards = partitioningWildcards,
48+
PartitioningFilters = partitioningFilters,
5249
MaxBufferedMsgs = maxBufferedMessages,
5350
MaxBufferedBytes = maxBufferedBytes,
5451
};
@@ -461,11 +458,30 @@ public static async Task DeletePcgElasticMemberMappingsAsync(
461458
/// <returns>Array of partition filters.</returns>
462459
public static string[] GetPcgElasticPartitionFilters(this NatsPcgElasticConfig config, string memberName)
463460
{
464-
return NatsPcgPartitionDistributor.GeneratePartitionFilters(
465-
config.Members,
466-
config.MaxMembers,
467-
config.MemberMappings,
468-
memberName);
461+
var filters = new List<string>();
462+
if (config.PartitioningFilters.Length > 0)
463+
{
464+
foreach (var pf in config.PartitioningFilters)
465+
{
466+
filters.AddRange(NatsPcgPartitionDistributor.GeneratePartitionFilters(
467+
config.Members,
468+
config.MaxMembers,
469+
config.MemberMappings,
470+
memberName,
471+
pf.Filter));
472+
}
473+
}
474+
else
475+
{
476+
filters.AddRange(NatsPcgPartitionDistributor.GeneratePartitionFilters(
477+
config.Members,
478+
config.MaxMembers,
479+
config.MemberMappings,
480+
memberName,
481+
">"));
482+
}
483+
484+
return filters.ToArray();
469485
}
470486

471487
/// <summary>
@@ -491,15 +507,15 @@ public static async Task PcgElasticMemberStepDownAsync(
491507
}
492508

493509
// ReSharper disable ParameterOnlyUsedForPreconditionCheck.Local
494-
private static void ValidateConfig(uint maxNumMembers, string filter, int[] partitioningWildcards)
510+
private static void ValidateConfig(uint maxNumMembers, NatsPcgPartitioningFilter[] partitioningFilters)
495511
{
496512
// ReSharper restore ParameterOnlyUsedForPreconditionCheck.Local
497513
if (maxNumMembers == 0)
498514
{
499515
throw new NatsPcgConfigurationException("maxNumMembers must be greater than 0");
500516
}
501517

502-
NatsPcgMemberMappingValidator.ValidateFilterAndWildcards(filter, partitioningWildcards);
518+
NatsPcgMemberMappingValidator.ValidatePartitioningFilters(partitioningFilters);
503519
}
504520

505521
private static async Task CreateWorkQueueStreamAsync(
@@ -511,37 +527,26 @@ private static async Task CreateWorkQueueStreamAsync(
511527
{
512528
string workQueueStreamName = GetWorkQueueStreamName(streamName, consumerGroupName);
513529

514-
// Build subject transform: add partition prefix based on wildcards
515-
// Transform syntax: {{Partition(numPartitions, wildcardIndexes...)}}
516-
// Replace * wildcards with {{wildcard(N)}} in the filter
517-
string wildcardStr = string.Join(",", config.PartitioningWildcards);
518-
var filterTokens = config.Filter.Split('.');
519-
int wildcardIndex = 1;
520-
for (int i = 0; i < filterTokens.Length; i++)
530+
var subjectTransforms = new List<SubjectTransform>();
531+
if (config.PartitioningFilters.Length > 0)
521532
{
522-
if (filterTokens[i] == "*")
533+
foreach (var pf in config.PartitioningFilters)
523534
{
524-
filterTokens[i] = $"{{{{wildcard({wildcardIndex})}}}}";
525-
wildcardIndex++;
535+
subjectTransforms.Add(BuildSubjectTransform(pf, config.MaxMembers));
526536
}
527537
}
528-
529-
string destFromFilter = string.Join(".", filterTokens);
530-
string subjectTransform = $"{{{{Partition({config.MaxMembers},{wildcardStr})}}}}.{destFromFilter}";
538+
else
539+
{
540+
var defaultFilter = new NatsPcgPartitioningFilter(">", Array.Empty<int>());
541+
subjectTransforms.Add(BuildSubjectTransform(defaultFilter, config.MaxMembers));
542+
}
531543

532544
var sources = new List<StreamSource>
533545
{
534546
new()
535547
{
536548
Name = streamName,
537-
SubjectTransforms = new List<SubjectTransform>
538-
{
539-
new()
540-
{
541-
Src = config.Filter,
542-
Dest = subjectTransform,
543-
},
544-
},
549+
SubjectTransforms = subjectTransforms,
545550
},
546551
};
547552

@@ -565,6 +570,46 @@ private static async Task CreateWorkQueueStreamAsync(
565570
await js.CreateStreamAsync(streamConfig, cancellationToken).ConfigureAwait(false);
566571
}
567572

573+
private static SubjectTransform BuildSubjectTransform(NatsPcgPartitioningFilter pf, uint maxMembers)
574+
{
575+
bool isFullSubject = NatsPcgMemberMappingValidator.IsPartitionByFullSubject(pf.PartitioningWildcards);
576+
577+
// Build subject transform: add partition prefix based on wildcards
578+
// Replace * wildcards with {{wildcard(N)}} in the filter
579+
var filterTokens = pf.Filter.Split('.');
580+
int wildcardIndex = 1;
581+
for (int i = 0; i < filterTokens.Length; i++)
582+
{
583+
if (filterTokens[i] == "*")
584+
{
585+
filterTokens[i] = $"{{{{wildcard({wildcardIndex})}}}}";
586+
wildcardIndex++;
587+
}
588+
}
589+
590+
string destFromFilter = string.Join(".", filterTokens);
591+
592+
string partitionFunction;
593+
if (isFullSubject)
594+
{
595+
// {{Partition(N)}} without wildcard positions hashes the entire subject
596+
partitionFunction = $"{{{{Partition({maxMembers})}}}}";
597+
}
598+
else
599+
{
600+
string wildcardStr = string.Join(",", pf.PartitioningWildcards);
601+
partitionFunction = $"{{{{Partition({maxMembers},{wildcardStr})}}}}";
602+
}
603+
604+
string subjectTransform = $"{partitionFunction}.{destFromFilter}";
605+
606+
return new SubjectTransform
607+
{
608+
Src = pf.Filter,
609+
Dest = subjectTransform,
610+
};
611+
}
612+
568613
private static async Task<string[]> UpdateMembersAsync(
569614
INatsJSContext js,
570615
string streamName,

src/Synadia.Orbit.PCGroups/NatsPcgMemberMappingValidator.cs

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ public static void Validate(NatsPcgMemberMapping[]? memberMappings, uint maxMemb
7777
}
7878

7979
/// <summary>
80-
/// Validates filter and partitioning wildcards for elastic consumer groups.
80+
/// Validates a partitioning filter for elastic consumer groups.
8181
/// </summary>
8282
/// <param name="filter">The subject filter.</param>
83-
/// <param name="partitioningWildcards">The partitioning wildcard positions (1-indexed).</param>
83+
/// <param name="partitioningWildcards">The partitioning wildcard positions (1-indexed). Empty array means partition by full subject.</param>
8484
/// <exception cref="NatsPcgConfigurationException">Thrown when validation fails.</exception>
8585
public static void ValidateFilterAndWildcards(string filter, int[] partitioningWildcards)
8686
{
@@ -89,25 +89,23 @@ public static void ValidateFilterAndWildcards(string filter, int[] partitioningW
8989
throw new NatsPcgConfigurationException("Filter is required for elastic consumer groups");
9090
}
9191

92-
// Count wildcards in filter
93-
var filterTokens = filter.Split('.');
94-
int numWildcards = 0;
95-
foreach (var token in filterTokens)
92+
if (partitioningWildcards == null)
9693
{
97-
if (token == "*")
98-
{
99-
numWildcards++;
100-
}
94+
throw new NatsPcgConfigurationException("PartitioningWildcards must not be null");
10195
}
10296

103-
if (numWildcards < 1)
97+
// Empty array means partition by full subject - no wildcard requirements
98+
if (IsPartitionByFullSubject(partitioningWildcards))
10499
{
105-
throw new NatsPcgConfigurationException("Filter must contain at least one '*' wildcard");
100+
return;
106101
}
107102

108-
if (partitioningWildcards == null || partitioningWildcards.Length == 0)
103+
// Count wildcards in filter
104+
int numWildcards = CountWildcards(filter);
105+
106+
if (numWildcards < 1)
109107
{
110-
throw new NatsPcgConfigurationException("PartitioningWildcards must contain at least one element");
108+
throw new NatsPcgConfigurationException("Filter must contain at least one '*' wildcard");
111109
}
112110

113111
if (partitioningWildcards.Length > numWildcards)
@@ -133,4 +131,57 @@ public static void ValidateFilterAndWildcards(string filter, int[] partitioningW
133131
}
134132
}
135133
}
134+
135+
/// <summary>
136+
/// Validates a partitioning filter record for elastic consumer groups.
137+
/// </summary>
138+
/// <param name="partitioningFilter">The partitioning filter to validate.</param>
139+
/// <exception cref="NatsPcgConfigurationException">Thrown when validation fails.</exception>
140+
public static void ValidatePartitioningFilter(NatsPcgPartitioningFilter partitioningFilter)
141+
{
142+
ValidateFilterAndWildcards(partitioningFilter.Filter, partitioningFilter.PartitioningWildcards);
143+
}
144+
145+
/// <summary>
146+
/// Validates multiple partitioning filters for elastic consumer groups.
147+
/// </summary>
148+
/// <param name="partitioningFilters">The partitioning filters to validate.</param>
149+
/// <exception cref="NatsPcgConfigurationException">Thrown when validation fails.</exception>
150+
public static void ValidatePartitioningFilters(NatsPcgPartitioningFilter[] partitioningFilters)
151+
{
152+
if (partitioningFilters == null)
153+
{
154+
return;
155+
}
156+
157+
foreach (var pf in partitioningFilters)
158+
{
159+
ValidatePartitioningFilter(pf);
160+
}
161+
}
162+
163+
/// <summary>
164+
/// Determines if the partitioning wildcards represent partition-by-full-subject mode.
165+
/// </summary>
166+
/// <param name="partitioningWildcards">The partitioning wildcard positions.</param>
167+
/// <returns>True if the array is empty (partition by full subject).</returns>
168+
public static bool IsPartitionByFullSubject(int[] partitioningWildcards)
169+
{
170+
return partitioningWildcards != null && partitioningWildcards.Length == 0;
171+
}
172+
173+
private static int CountWildcards(string filter)
174+
{
175+
var filterTokens = filter.Split('.');
176+
int numWildcards = 0;
177+
foreach (var token in filterTokens)
178+
{
179+
if (token == "*")
180+
{
181+
numWildcards++;
182+
}
183+
}
184+
185+
return numWildcards;
186+
}
136187
}

src/Synadia.Orbit.PCGroups/NatsPcgPartitionDistributor.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ public static class NatsPcgPartitionDistributor
1515
/// <param name="maxMembers">Maximum number of members (equals number of partitions).</param>
1616
/// <param name="memberMappings">Optional explicit member-to-partition mappings.</param>
1717
/// <param name="memberName">The member name to generate filters for.</param>
18-
/// <returns>Array of filters like ["0.>", "3.>", "6.>"].</returns>
18+
/// <param name="filter">The subject filter to embed in partition filters.</param>
19+
/// <returns>Array of filters like ["0.orders.*", "3.orders.*"].</returns>
1920
public static string[] GeneratePartitionFilters(
2021
string[]? members,
2122
uint maxMembers,
2223
NatsPcgMemberMapping[]? memberMappings,
23-
string memberName)
24+
string memberName,
25+
string filter)
2426
{
2527
int[] partitions;
2628

@@ -54,7 +56,7 @@ public static string[] GeneratePartitionFilters(
5456
string[] filters = new string[partitions.Length];
5557
for (int i = 0; i < partitions.Length; i++)
5658
{
57-
filters[i] = $"{partitions[i]}.>";
59+
filters[i] = $"{partitions[i]}.{filter}";
5860
}
5961

6062
return filters;

0 commit comments

Comments
 (0)