Skip to content

Commit a6ebfab

Browse files
authored
Fix producer max pending messages validation inconsistency (#284)
* Fix producer max pending messages validation inconsistency ProducerBuilder.Create validated MaxPendingMessages, but PulsarClient.CreateProducer accepted 0 through ProducerOptions. This allowed an invalid producer configuration to reach the internal send queue, making the behavior inconsistent depending on which API was used to create the producer. Add the missing validation to PulsarClient.CreateProducer and add unit tests for both creation paths. * moved producerbuildertest.cs to the internal namespace * moved the validation test to pulsarclienttests
1 parent 25214f8 commit a6ebfab

File tree

3 files changed

+72
-0
lines changed

3 files changed

+72
-0
lines changed

src/DotPulsar/PulsarClient.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> op
6060
{
6161
ThrowIfDisposed();
6262

63+
if (options.MaxPendingMessages == 0)
64+
throw new ConfigurationException("ProducerOptions.MaxPendingMessages must be greater than 0");
65+
6366
ICompressorFactory? compressorFactory = null;
6467
if (options.CompressionType != CompressionType.None)
6568
{
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
namespace DotPulsar.Tests.Internal;
16+
17+
using DotPulsar.Abstractions;
18+
using DotPulsar.Exceptions;
19+
using DotPulsar.Extensions;
20+
21+
[Trait("Category", "Unit")]
22+
public sealed class ProducerBuilderTests
23+
{
24+
[Fact]
25+
public async Task Create_GivenMaxPendingMessagesIsZero_ShouldThrowConfigurationException()
26+
{
27+
// Arrange
28+
await using var client = CreateClient();
29+
30+
// Act
31+
var act = () => client
32+
.NewProducer(Schema.String)
33+
.Topic("persistent://public/default/my-topic")
34+
.MaxPendingMessages(0)
35+
.Create();
36+
37+
// Assert
38+
var exception = act.ShouldThrow<ConfigurationException>();
39+
exception.Message.ShouldBe("ProducerOptions.MaxPendingMessages must be greater than 0");
40+
}
41+
42+
private static IPulsarClient CreateClient()
43+
=> PulsarClient
44+
.Builder()
45+
.ServiceUrl(new Uri("pulsar://localhost:6650"))
46+
.Build();
47+
}

tests/DotPulsar.Tests/PulsarClientTests.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,28 @@ public async Task TokenSupplier_WhenTokenSupplierReturnValidToken_ShouldStayConn
124124
state.ShouldBe(ProducerState.Connected);
125125
}
126126

127+
[Fact]
128+
public async Task CreateProducer_GivenMaxPendingMessagesIsZero_ShouldThrowConfigurationException()
129+
{
130+
// Arrange
131+
await using var client = PulsarClient
132+
.Builder()
133+
.ServiceUrl(new Uri("pulsar://localhost:6650"))
134+
.Build();
135+
136+
var options = new ProducerOptions<string>("persistent://public/default/my-topic", Schema.String)
137+
{
138+
MaxPendingMessages = 0
139+
};
140+
141+
// Act
142+
var act = () => client.CreateProducer(options);
143+
144+
// Assert
145+
var exception = act.ShouldThrow<ConfigurationException>();
146+
exception.Message.ShouldBe("ProducerOptions.MaxPendingMessages must be greater than 0");
147+
}
148+
127149
private IPulsarClient CreateClient(Func<CancellationToken, ValueTask<string>> tokenSupplier)
128150
=> PulsarClient
129151
.Builder()

0 commit comments

Comments
 (0)