[python] move async wait/synchronization logic into client APIs and update docs/tests#5633
Conversation
Avoid test-only race workarounds and align test behavior with what Feldera client SDK users actually run. Added client-side start observation (observe_start) for start(wait=False) so callers can wait until the lifecycle leaves STOPPED. Removed deployment-status polling helper logic from platform tests and switched helpers/tests to rely on Feldera client lifecycle APIs. Polling for deployment-status transitions is now internal to FelderaClient (_wait_for_deployment_status) instead of test helper code.
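A minimal usage sketch, assuming the usual SDK setup; observe_start is the new flag described above, and everything else follows the SDK's documented pattern:

```python
from feldera import FelderaClient, PipelineBuilder

client = FelderaClient("http://localhost:8080")
pipeline = PipelineBuilder(client, name="demo", sql="CREATE TABLE t (x INT);").create_or_replace()

# Fire the async start request, then have the client confirm that the
# lifecycle actually left STOPPED before returning, instead of returning
# as soon as /start is accepted.
pipeline.start(wait=False, observe_start=True)
```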
…latform tests Use explicit condition polling instead of blind delays for logs, connector state changes, checkpoint completion, and datagen progress. This removes timing-based race masking, makes synchronization deterministic, and makes each test wait state explicit about what condition must be reached.
…age polling Replace test helper wait_for_cleared_storage() with FelderaClient.clear_storage(...). This moves storage lifecycle synchronization to the client path used by SDK users and removes duplicate test-side polling logic.
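A minimal sketch of the replacement call, reusing the client from the sketch above; the method name comes from this PR, the arguments are assumptions:

```python
# Clear the pipeline's storage through the client instead of a test-side
# wait_for_cleared_storage() polling helper. The timeout keyword name is
# an assumption for illustration.
client.clear_storage("demo", timeout_s=120)
```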
…rm tests Expose compilation waiting as a client API with optional expected program version, timeout, and poll interval. Useful for users who run async create/update flows (wait=False) and later need a deterministic compile barrier before start(), and for patch/update workflows that must wait for a specific new program version (not a stale Success). Unlike the deployment-oriented waiter wait_for_status(), this API tracks the program lifecycle (program_status/program_version) and fails fast on compile errors.
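A sketch of the compile-barrier pattern this enables, reusing the client setup from the first sketch and assuming an async create/update path; the keyword names passed to wait_for_program_success are assumptions:

```python
new_sql = "CREATE TABLE t (x INT); CREATE VIEW v AS SELECT * FROM t;"

# Create/update the program (assume this call does not itself block on
# compilation in the async flow described above).
pipeline = PipelineBuilder(client, name="demo", sql=new_sql).create_or_replace()

# Deterministic compile barrier before start(): wait for the *new*
# program version to reach Success, failing fast on compile errors
# rather than accepting a stale Success from the previous version.
client.wait_for_program_success(
    "demo",
    expected_version=2,    # assumed keyword: the program version to wait for
    timeout_s=300,         # assumed keyword
    poll_interval_s=1.0,   # assumed keyword
)

pipeline.start()
```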
Raw ingress tests queried too early after async ingestion, causing flaky failures. Wait on completion tokens for successful ingests and explicitly poll for CSV partial-ingest visibility in the BAD_REQUEST parse-error case.
Ensure waiters never block forever by normalizing None/infinite timeouts to safe defaults across client, pipeline, HTTP health recovery, and test helpers. Also standardize timeout measurements on time.monotonic() to avoid wall-clock jumps affecting elapsed-time logic. This prevents hidden hangs while keeping async operations deterministic under slow cluster conditions.
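A minimal sketch of the bounded-wait pattern described here (names are illustrative, not the client's internals):

```python
import time

DEFAULT_TIMEOUT_S = 300.0  # illustrative safe default


def wait_until(predicate, timeout_s=None, poll_interval_s=0.5):
    """Poll `predicate` until it returns True or a bounded timeout expires.

    None / infinite timeouts are normalized to a safe default so the waiter
    can never block forever, and elapsed time is measured with
    time.monotonic() so wall-clock jumps do not affect the deadline.
    """
    if timeout_s is None or timeout_s == float("inf"):
        timeout_s = DEFAULT_TIMEOUT_S
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout_s}s")
        time.sleep(poll_interval_s)
```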
Replace test _wait_token() polling with client wait_for_token(...) to align platform tests with feldera client behavior and remove duplicate completion-status wait logic.
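Illustrative usage, reusing the client from the earlier sketch; the argument names are assumptions:

```python
# `token` is a completion token returned by an async ingest request
# (placeholder value here). Tests previously polled completion status via
# a local _wait_token() helper; the client now owns that wait.
token = "<completion-token-from-ingest-response>"
client.wait_for_token("demo", token, timeout_s=60)  # keyword name assumed
```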
Switch timeout/elapsed polling paths in client and platform tests from time.time() to time.monotonic() to avoid wall-clock jumps affecting wait behavior. Keep time.time() in testutils_oidc.py because token expiry/cache timestamps are wall-clock/epoch semantics, and in checkpoint-sync random seeding where clock monotonicity is irrelevant.
Move generic condition polling from platform test helper into Feldera client as FelderaClient.wait_for_condition(...), including timeout/poll validation and usage docs. Keep test callsites stable by making helper a thin proxy to the new client API. Document usage with a new “Waiting for a Custom Condition” example in python/docs/examples.rst.
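A hedged example of the generic predicate wait, in the spirit of the new docs section and reusing the earlier pipeline/client setup; the stats accessor and keyword names are assumptions:

```python
# Wait for an arbitrary condition with the client's generic waiter instead
# of an ad-hoc sleep loop. Here: wait until the pipeline has processed at
# least 1000 records.
def enough_records_processed() -> bool:
    stats = pipeline.stats()  # accessor and field names below are assumptions
    return stats.global_metrics.total_processed_records >= 1000


client.wait_for_condition(
    enough_records_processed,
    timeout_s=120,           # assumed keyword
    poll_interval_s=1.0,     # assumed keyword
)
```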
This is another step in the wait-loop migration series to reduce async start/stop/checkpoint race flakes and keep test synchronization aligned with Feldera client behavior used by SDK users. Replace remaining manual loops with wait_for_condition-based waits and clarify helper usage: use the helper wrapper only when no Pipeline object exists; otherwise, call pipeline.client.wait_for_condition(...) directly.
Update Python and docs.feldera.com docs for new client-side wait APIs and lifecycle synchronization patterns:
- document wait_for_program_success and wait_for_condition
- add practical examples for async start and custom predicate waiting
- clarify lifecycle async behavior and recommend SDK built-in methods over custom synchronization loops
- fix typo: "predicate returns True when condition is met"
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
ryzhyk
left a comment
This looks much cleaner. My main concern is with the observe_start flag. It seems that we need better support for this in the backend (see inline comment).
Code context: `from feldera.enums import BootstrapPolicy` … `pipeline.start(`
This seems like something that shouldn't be needed. Starting a previously failed pipeline is a totally normal situation. The caller may not know or care that the pipeline previously failed. I wonder if observe_start=True should be the default (only?) behavior of start? Would this still be needed then?
yeah, it is a cumbersome heuristic currently. If only we could wait for a specific transaction ID to complete ;)
See comments below.
Code context: `if not wait:` … `if observe_start:`
Is my understanding correct that there is no way to do this well today? If we return immediately after /start, the pipeline can still be stopped during the next check. If we wait for the pipeline to not be stopped, we may wait forever, since it may have failed and gone back to the stopped state during startup.
It seems that the ideal solution would make sure that the pipeline leaves the stopped state before returning 202 from /start. @snkas, do you think this is possible to achieve?
Alternatively, can we use incarnation ids to distinguish between STOPPED states from the previous and current runs of the pipeline?
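A hypothetical sketch of what an incarnation-id-based client wait could look like; none of these fields or accessors exist today, it only illustrates the idea under discussion:

```python
# Hypothetical: /start returns (or the pipeline status exposes) an
# incarnation id for the run being started, so a STOPPED state left over
# from the previous run can never be mistaken for "not started yet".
resp = client.start_pipeline("demo", wait=False)       # hypothetical return payload
incarnation = resp["incarnation_id"]                    # hypothetical field


def left_stopped_or_failed() -> bool:
    status = client.get_pipeline("demo")                # hypothetical accessor
    if status["incarnation_id"] != incarnation:
        return False                                    # still reporting the old run
    return status["deployment_status"] != "Stopped" or bool(status.get("deployment_error"))


client.wait_for_condition(left_stopped_or_failed, timeout_s=120)
```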
Adding backend support for transaction or operation IDs would make the whole system easier to reason about, easier to debug, and much more deterministic for everyone. Our synchronizations stop being “guess from current state” and become “wait for this exact operation”, then just report state/error. That is a huge shift in clarity: no more fragile polling heuristics, fewer race-condition workarounds in tests.
I understand, although transaction ids would introduce a lot of backend complexity. One mechanism that may help in a similar way, specifically for the /start operation, is pipeline incarnation ids. Let's engage @snkas, who's the ultimate authority on this, in the discussion.
I don't fully understand all backend constraints and complexity, so I’m trying to propose the smallest possible change and keep the backend contract with the client as simple as possible. It would be nice to have a simple, universal transaction concept for async operations. From the client/test side this looks like the smallest and simplest way to make synchronization deterministic:
- each async request returns a transaction_id
- backend reports that transaction’s state (pending|succeeded|failed+error)
- client waits on that exact transaction_id.
This would reduce status-guessing heuristics, improve error attribution, and simplify both SDK and test wait logic with one consistent pattern across async endpoints.
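A hypothetical sketch of that pattern from the client side (no such endpoint or fields exist today; the wait_for_condition call reuses the client from the earlier sketches):

```python
# Hypothetical shape: every async request returns a transaction_id and the
# backend exposes its state as pending | succeeded | failed (+ error).
txn = client.start_pipeline("demo", wait=False)["transaction_id"]   # hypothetical


def transaction_settled() -> bool:
    state = client.get_transaction("demo", txn)        # hypothetical endpoint
    if state["status"] == "failed":
        raise RuntimeError(state["error"])              # precise error attribution
    return state["status"] == "succeeded"


# Wait on that exact operation instead of guessing from current pipeline state.
client.wait_for_condition(transaction_settled, timeout_s=120)
```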
Motivation
See discussion about design and implementation in #5597
Feldera lifecycle and ingestion paths are asynchronous (start/stop/clear/compile/ingest are accepted first, then converge). We had multiple test-local polling/sleep helpers to avoid races, which made tests pass while SDK users could still hit the same race windows.
This PR moves synchronization behavior into Python client SDK built-ins, makes wait behavior bounded and monotonic, and updates tests/docs to use the same APIs users rely on.
This PR is the first stage of improving async determinism in Python tests and the Feldera Python client. In this stage, we moved key wait/synchronization logic from test-local helpers into client SDK built-in methods, replaced ad-hoc
polling/sleeps in tests, and documented the new usage patterns. We plan to continue in follow-up stages with tighter wait_for_status fail-fast/error diagnostics, additional client wait APIs for connector/checkpoint/storage flows, and further consolidation of wait-loop internals.
Longer term, we should align this with backend support for transition/operation IDs on /start and /stop to eliminate stale-state ambiguity at the source. As far as I know, @snkas is already working on related backend improvements, so we can coordinate and combine efforts.
What changed
Client API improvements
- FelderaClient.wait_for_program_success(...).
- FelderaClient.wait_for_condition(...).
- start_pipeline(..., observe_start=True) and corresponding Pipeline.start(..., observe_start=True).
- ignore_deployment_error handling for expected start-recovery flows.
- time.monotonic() for elapsed/timeout logic.

Test changes
- Test helpers and callsites now rely on FelderaClient.wait_for_condition(...) instead of local polling/sleep loops.

Documentation updates
- Documented the new wait APIs and recommended SDK built-in methods instead of custom synchronization loops.
Outcome
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes