[feat][broker] Add adaptive publish throttle controller (disabled by default)#25239
[feat][broker] Add adaptive publish throttle controller (disabled by default)#25239hemanth-19 wants to merge 1 commit intoapache:masterfrom
Conversation
|
@hemanth-19 Please add the following content to your PR description and select a checkbox: |
…default) Introduces an opt-in, broker-level adaptive publish throttle that dynamically reduces producer publish rates when JVM heap usage or per-topic backlog size approaches configurable watermarks. Key design points ----------------- - AdaptivePublishRateLimiter: per-topic PublishRateLimiter that is a complete no-op (zero overhead) when inactive. Asymmetric EWMA (α_up=0.30, α_down=0.05) tracks the producer's peak natural rate. - AdaptivePublishThrottleController: single-threaded broker-level scheduler; bounded-step rate changes (≤ 25 % of natural rate per cycle) prevent oscillation; hysteresis (activate on pressure > 0, deactivate only when pressure == 0) prevents rapid toggling. - observeOnly mode: compute + log + emit metrics without applying any throttling; togglable via dynamic config as an emergency circuit-breaker (no restart required). - Controller never dies silently: every evaluation cycle is wrapped in try-catch-finally; failures are counted and logged at ERROR. - ThrottleType.AdaptivePublishRate: dedicated reentrant enum constant appended at the end of ThrottleType to preserve existing ordinals. - OpenTelemetry: 3 always-on broker metrics + 3 controller health metrics (last-eval timestamp, duration, failure count) + 6 optional per-topic metrics (disabled by default). Disabled by default: adaptivePublisherThrottlingEnabled=false. A broker restart is required to enable it. All tuning parameters (watermarks, rate factors, observeOnly flag) are dynamic. Tests added ----------- - AdaptivePublishRateLimiterTest (13 unit tests) - AdaptivePublishThrottleControllerTest (30+ unit tests) - AdaptiveThrottleEndToEndTest (9 integration tests) - ThrottleTypeEnumTest (4 ordinal-stability guard tests) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1bb6e14 to
35b7d6a
Compare
Local build notes (for reviewers — not a code issue)tl;dr — the broker module compiles cleanly and Checkstyle is green. The failures below are all environment constraints on the dev machine, not problems with this PR. What succeeded
A fix was needed mid-way: the initial parallel build ( One genuine bug caught by the local build: three casts in Why the build stopped at
|
|
Doc label: This PR adds an experimental, disabled-by-default broker feature. It includes:
When this feature graduates from experimental (i.e. Label suggestion: |
|
@hemanth-19 Thanks for the contribution! In Apache Pulsar, we follow the PIP process to introduce new features. Please check https://github.com/apache/pulsar/tree/master/pip#pulsar-improvement-proposal-pip for more details. |
| Runtime runtime = Runtime.getRuntime(); | ||
| long maxMemory = runtime.maxMemory(); | ||
| if (maxMemory == Long.MAX_VALUE) { | ||
| // Unbounded heap — cannot compute pressure. | ||
| return 0.0; | ||
| } | ||
| long usedMemory = maxMemory - runtime.freeMemory(); | ||
| double usageFraction = (double) usedMemory / maxMemory; |
There was a problem hiding this comment.
Pulsar uses direct memory besides heap memory.
Here's one example of heap + direct memory usage calculation:
However, that example doesn't take it into account that Netty's allocator performs pooling and there would be more free direct memory available that the stats show in that example.
contains an example with details Netty allocator stats. It would be possible to calculate the exact allocation of direct memory by subtracting the free memory in directArenas->chunkLists->freeBytes
@hemanth-19 Please setup "Personal CI" in your fork and run Pulsar CI in that environment. It would be advisable to reduce the AI generated noise in PR description and comments so that you spend effort on your side validating what gets generated by AI. Thank you! |
|
Sharing some thoughts around publish throttling / backpressure / flow control: In Pulsar, there's already a solution to limit publishing with memory backpressure. One of the problems with existing publish throttling is that there's no explicit flow control in the Pulsar protocol and publish throttling happens by pausing reads on the TCP/IP connection. This solution impacts all producers and consumers sharing the same TCP/IP connection. One of the improvement directions would be to introduce explicit producer flow control. What is currently missing from Pulsar is the ability to explicitly prioritize tenants/namespaces/topics and have dynamic limits instead of fixed rate limits. This would be useful in cases where an overall broker limit is hit. Pulsar's "Resource Groups" (PIP-82) could be further developed to the direction what Confluent Kora has with the dynamic tenant-level quotas feature. Regarding this PR, I don't currently understand what essential benefit it brings over the existing |
Summary
adaptivePublisherThrottlingEnabled=false.adaptivePublisherThrottlingObserveOnly=true, toggleable at runtime) computes and logs decisions but never applies throttling — safe for validating in production and usable as an emergency circuit-breaker.try-catch-finally; failures increment a dedicated OTel counter.ThrottleType.AdaptivePublishRateis appended as the last enum constant to preserve existing ordinals 0–6. A guard test (ThrottleTypeEnumTest) fails immediately if the enum is accidentally mutated.Diff navigation
Configuration (reviewers: verify defaults, dynamic flags, and conf entry comments)
ServiceConfiguration.java@FieldContextfields —adaptivePublisherThrottling*conf/broker.confREADME.mdbroker.confsnippetBroker wiring (reviewers: focus on concurrency, lifecycle, and enum ordinal safety)
ServerCnxThrottleTracker.javaThrottleType.AdaptivePublishRateconstant (appended last, reentrant); class-level Javadoc on ordinal stabilityAbstractTopic.javaAdaptivePublishRateLimiterfield;handlePublishThrottling()delegation; usesThrottleType.AdaptivePublishRateBrokerService.javastartAdaptivePublishThrottleController()lifecycle;forEachPersistentTopic()helper;close()teardownAdaptivePublishRateLimiter.java(new)volatile boolean activefast path, asymmetric EWMA,activate()/deactivate()AdaptivePublishThrottleController.java(new)observeOnlyguard,try-catch-finallysafetyOpenTelemetryAdaptiveThrottleStats.java(new)Tests (reviewers: check the observeOnly safety tests and the enum guard tests)
AdaptivePublishRateLimiterTest.java(new)observeOnlynever changes channel autoreadAdaptivePublishThrottleControllerTest.java(new)linearPressure(), bounded-stepcomputeTargetRate(), hysteresis,observeOnlynever callsactivate()AdaptiveThrottleEndToEndTest.java(new)observeOnlyIO-thread safetyThrottleTypeEnumTest.java(new)AdaptivePublishRatepresent and last, declared reentrantDesign FAQ
Q: Why introduce a new
ThrottleType.AdaptivePublishRateinstead of reusingTopicPublishRate?ServerCnxThrottleTrackeruses reference-counting viastates[ThrottleType.ordinal()].If the adaptive limiter shared
TopicPublishRatewith the static rate limiter, a singleunmarkThrottled()call from the adaptive side could decrement the count that the staticlimiter had incremented — leaving the connection unthrottled even though the static limit is
still exceeded. A dedicated constant keeps the two signals fully independent and eliminates an
entire class of ordering bugs.
Q: Why does the controller not coordinate across brokers? Each broker throttles independently.
Adaptive throttling reacts to local signals — JVM heap on this JVM, backlog on topics owned
by this broker. Cross-broker coordination would require a consensus protocol, introduce network
latency on the hot publish path, and create a single point of failure. Local-only decisions
are simpler, faster, and fail-safe: if a network partition occurs, each broker still protects
itself independently. Cluster-wide load imbalance (e.g. one hot broker) is better addressed
by Pulsar's topic-migration and load-balancer machinery than by throttle coordination.
Q: What happens if the controller thread crashes?
The evaluation loop is wrapped in
try-catch-finally. A caught exception:evaluationFailureCount(surfaced as OTel counterpulsar.broker.adaptive.throttle.controller.evaluation.failure.count).ERRORlevel so it appears in broker logs immediately.ScheduledExecutorService— the scheduler reschedules the next cycle unconditionally.If the failure is persistent,
last.evaluation.timestampstops advancing whileevaluation.failure.countclimbs — a clear alert signal. A full controller stall (only possible via an uncheckedError) requires a broker restart; the OTel staleness alert will fire within3 × intervalMs.Test plan
mvn test -pl pulsar-broker -am -Dtest="AdaptivePublishRateLimiterTest,AdaptivePublishThrottleControllerTest,AdaptiveThrottleEndToEndTest,ThrottleTypeEnumTest" -DfailIfNoTests=false --no-transfer-progresspassesmvn test -pl pulsar-broker -am -Dtest="PublishRateLimiterTest" -DfailIfNoTests=false --no-transfer-progress(regression: static rate limiter unaffected)adaptivePublisherThrottlingEnabled=false(default): no controller thread, noAdaptivePublishRateLimiterallocated, no OTel instruments registeredobserveOnly=true: logs showOBSERVE-ONLY would-activate, zeroACTIVATEDlines,active.topic.countmetric stays at 0🤖 Generated with Claude Code