[fix][broker] Unthrottle producers immediately when publish rate limiting is disabled#25502

Open
thetumbled wants to merge 1 commit into apache:master from thetumbled:fix/publish-rate-disable-immediate-unthrottle

Conversation

Member

@thetumbled thetumbled commented Apr 9, 2026

Motivation

When publish rate limiting is disabled (for example, by clearing the publish rate policy so that update(null) is called), the token buckets are nulled out immediately, but producers that were already throttled could remain blocked until a previously scheduled unthrottle task ran. That delayed task may have been scheduled with a very long wait time if the token bucket was deeply depleted, so disabling the limit in configuration did not restore traffic quickly. This change triggers the existing lock-free unthrottle path immediately after disablement, provided an executor was already cached from a prior throttling event on this limiter.

Modifications

  • In PublishRateLimiterImpl, cache the ScheduledExecutorService used when scheduling unthrottleQueuedProducers after the first throttling event on this limiter instance.
  • When update(PublishRate) is called with a null PublishRate (rate limiting disabled), call scheduleUnthrottling(cachedExecutor, 0) so queued producers are processed without waiting on the old delayed task.
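The two modifications above can be sketched as follows. This is a simplified, hypothetical rendering, not the actual PublishRateLimiterImpl: the field and method names (lastUnthrottleExecutor, scheduleUnthrottling, unthrottleQueuedProducers) follow the PR description, while the surrounding structure and the Object stand-in for PublishRate are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the change described above; not the actual Pulsar class.
class PublishRateLimiterSketch {
    // Cached after the first throttling event on this limiter instance.
    private volatile ScheduledExecutorService lastUnthrottleExecutor;
    final CountDownLatch unthrottled = new CountDownLatch(1);

    // Called on the first throttling event: cache the executor and schedule
    // the (possibly very long) delayed unthrottle task.
    void onFirstThrottle(ScheduledExecutorService executor, long delayNanos) {
        lastUnthrottleExecutor = executor;
        scheduleUnthrottling(executor, delayNanos);
    }

    // update(null) means "rate limiting disabled": trigger unthrottling now.
    void update(Object publishRate) {
        if (publishRate == null) {
            ScheduledExecutorService executor = lastUnthrottleExecutor;
            if (executor != null) {
                scheduleUnthrottling(executor, 0L);
            }
            // If executor is null, no producer was ever throttled here.
        }
    }

    private void scheduleUnthrottling(ScheduledExecutorService executor, long delayNanos) {
        executor.schedule(this::unthrottleQueuedProducers, delayNanos, TimeUnit.NANOSECONDS);
    }

    void unthrottleQueuedProducers() {
        unthrottled.countDown(); // stand-in for draining the throttled producers
    }
}
```

The key property is that the zero-delay task reuses the same unthrottle path as the delayed one, so no new synchronization is introduced; the old long-delay task becomes a harmless no-op once the queue is drained.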

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Contributor

Copilot AI left a comment


Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

This PR ensures producers are unthrottled immediately when publish rate limiting is disabled, avoiding long waits from previously scheduled delayed unthrottle tasks.

Changes:

  • Cache the ScheduledExecutorService used for unthrottling after the first throttling event.
  • When update(null) disables rate limiting, trigger unthrottling immediately (0 delay) using the cached executor.

Comment on lines +178 to +182
ScheduledExecutorService executor = lastUnthrottleExecutor;
if (executor != null) {
    // Wake the existing unthrottle path without waiting on an old delay.
    scheduleUnthrottling(executor, 0L);
}

Copilot AI Apr 9, 2026


lastUnthrottleExecutor can become stale (e.g., executor shutdown during broker/namespace lifecycle changes). If scheduleUnthrottling(...) ultimately calls ScheduledExecutorService#schedule, this may throw RejectedExecutionException and break the update path when disabling rate limiting. Consider handling task rejection (catch RejectedExecutionException, clear lastUnthrottleExecutor, and/or fall back to a known-live executor) so disabling limits is best-effort and resilient.
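One defensive shape for this suggestion could look like the sketch below. All names here are hypothetical (ResilientUnthrottle, cache, tryImmediateUnthrottle); only the catch-and-clear handling of RejectedExecutionException is the point.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of best-effort scheduling against a possibly-stale cached executor.
class ResilientUnthrottle {
    private volatile ScheduledExecutorService lastUnthrottleExecutor;
    private final Runnable unthrottleTask;

    ResilientUnthrottle(Runnable unthrottleTask) {
        this.unthrottleTask = unthrottleTask;
    }

    void cache(ScheduledExecutorService executor) {
        lastUnthrottleExecutor = executor;
    }

    // Returns true if the immediate unthrottle was scheduled.
    boolean tryImmediateUnthrottle() {
        ScheduledExecutorService executor = lastUnthrottleExecutor;
        if (executor == null) {
            return false; // nothing was ever throttled on this limiter
        }
        try {
            executor.schedule(unthrottleTask, 0L, TimeUnit.NANOSECONDS);
            return true;
        } catch (RejectedExecutionException e) {
            // Executor was shut down (e.g. broker/namespace lifecycle change):
            // drop the stale reference so disabling limits stays best-effort.
            lastUnthrottleExecutor = null;
            return false;
        }
    }
}
```

A ScheduledThreadPoolExecutor rejects tasks submitted after shutdown, so without the catch the update path that disables rate limiting would propagate the exception to the caller.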

Comment on lines 175 to 184
} else {
    tokenBucketOnMessage = null;
    tokenBucketOnByte = null;
    ScheduledExecutorService executor = lastUnthrottleExecutor;
    if (executor != null) {
        // Wake the existing unthrottle path without waiting on an old delay.
        scheduleUnthrottling(executor, 0L);
    }
    // If executor is null, no throttle has happened yet on this limiter
}

Copilot AI Apr 9, 2026


This adds new behavior (immediate unthrottling on update(null)) that is timing- and race-sensitive. Please add a test that (1) throttles at least one producer enough to schedule a long unthrottle delay, then (2) disables rate limiting via update(null), and (3) asserts queued producers are unthrottled promptly (without waiting for the originally scheduled delay).
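A minimal timing sketch of the three steps requested above, using plain java.util.concurrent rather than the broker test harness (the class and method names are hypothetical; a real test would drive the actual limiter and assert on producer state):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Demonstrates the race the requested test should pin down: a long-delay
// unthrottle task alone would keep producers blocked, while an additional
// zero-delay task (scheduled when limiting is disabled) releases them promptly.
class UnthrottleTimingSketch {
    static boolean disabledReleasesPromptly() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch released = new CountDownLatch(1);
            Runnable unthrottle = released::countDown;

            // (1) Deeply depleted bucket: unthrottle scheduled far in the future.
            executor.schedule(unthrottle, 1, TimeUnit.HOURS);

            // (2) Rate limiting disabled: same path scheduled with zero delay.
            executor.schedule(unthrottle, 0, TimeUnit.NANOSECONDS);

            // (3) Producers must be released promptly, not after the hour.
            return released.await(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }
}
```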

@@ -88,6 +89,7 @@ private void scheduleDecrementThrottleCount(Producer producer) {
    // this is to avoid scheduling unthrottling multiple times for concurrent producers
    if (throttledProducersCount.incrementAndGet() == 1) {
        ScheduledExecutorService executor = producer.getCnx().getBrokerService().executor().next();
Member


You should provide (and reuse) a dedicated Executor when creating the rate limiter, for example:

private final Executor executor;

public PublishRateLimiterImpl(MonotonicClock monotonicClock,
                              Consumer<Producer> throttleAction,
                              Consumer<Producer> unthrottleAction,
                              Executor executor) {
    this.monotonicClock = monotonicClock;
    this.throttleAction = throttleAction;
    this.unthrottleAction = unthrottleAction;
    this.executor = executor;
}

This ensures that all scheduling (e.g., unthrottling tasks) runs on a controlled and consistent execution context, rather than relying on ad-hoc or shared threads.

When publish rate limiting is disabled, you can immediately release any throttled producers by invoking:

scheduleUnthrottling(executor, 0L);

Using a delay of 0L guarantees that the unthrottling task is executed asynchronously but without delay, ensuring producers are promptly resumed without blocking the caller thread.

Member Author


WDYT, @lhotari?

@liangyepianzhou
Contributor

liangyepianzhou commented Apr 10, 2026

The scenario is valid, but the fix needs further evaluation. The root cause is that disabling rate limiting should **directly signal** throttled producers to be released, rather than waiting for the next scheduled cycle — i.e., the `disable` semantic itself should trigger an immediate (synchronous or async) unthrottle.

The current fix, via update(null), appears to add an operational workaround rather than addressing the underlying code defect. This may fail to resolve the fundamental issue and, worse, increases cognitive overhead for users who need to understand why an extra update(null) call is necessary.

Sorry, the code version I was looking at doesn't seem to be the latest.

Contributor


Suggested change
if (maxPublishRate != null) {
if (PublishRate.normalize(maxPublishRate) != null) {

Contributor


Based on the latest version of the code, this change appears to have a correctness issue. In the path where rate limiting is disabled, the value passed to the following method will never be null:

org.apache.pulsar.broker.service.PublishRateLimiter#update(org.apache.pulsar.common.policies.data.PublishRate)

Otherwise, the following method would throw an NPE:

org.apache.pulsar.broker.service.AbstractTopic#updatePublishRateLimiter.

Here it should fall back to the broker-level configuration, where the publishRate is never null.

@lhotari
Member

lhotari commented Apr 10, 2026

That delayed task may have been scheduled with a very long wait time when the token bucket was deeply depleted,

@thetumbled did the problem happen also with the fix in #25262 and #25269?
