Skip to content

[python] move async wait/synchronization logic into client APIs and update docs/tests#5633

Draft
igorscs wants to merge 14 commits intomainfrom
feldera-python-client-5597
Draft

[python] move async wait/synchronization logic into client APIs and update docs/tests#5633
igorscs wants to merge 14 commits intomainfrom
feldera-python-client-5597

Conversation

@igorscs
Copy link
Contributor

@igorscs igorscs commented Feb 14, 2026

Motivation

See discussion about design and implementation in #5597

Feldera lifecycle and ingestion paths are asynchronous (start/stop/clear/compile/ingest are accepted first, then converge). We had multiple test-local polling/sleep helpers to avoid races, which made tests pass while SDK users could still hit the same race windows.

This PR moves synchronization behavior into Python client SDK built-ins, makes wait behavior bounded and monotonic, and updates tests/docs to use the same APIs users rely on.

This PR is the first stage of improving async determinism in Python tests and the Feldera Python client. In this stage, we moved key wait/synchronization logic from test-local helpers into client SDK built-in methods, replaced ad-hoc
polling/sleeps in tests, and documented the new usage patterns. We plan to continue in follow-up stages with tighter wait_for_status fail-fast/error diagnostics, additional client wait APIs for connector/checkpoint/storage flows, and further consolidation of wait-loop internals.

Longer term, we should align this with backend support for transition/operation IDs on /start and /stop to eliminate stale-state ambiguity at the source. As far as I know, @snkas is already working on related backend improvements, so we can coordinate and combine efforts.

What changed

Client API improvements

  • Added public FelderaClient.wait_for_program_success(...).
  • Added public FelderaClient.wait_for_condition(...).
  • Moved deployment-status waiting from test helper into client (_wait_for_deployment_status path).
  • Added lifecycle start observation support for non-blocking start:
    • start_pipeline(..., observe_start=True) and corresponding Pipeline.start(..., observe_start=True).
  • Added ignore_deployment_error handling for expected start-recovery flows.
  • Standardized wait loops to:
    • use finite/bounded timeouts,
    • validate/default invalid timeout inputs,
    • use time.monotonic() for elapsed/timeout logic.

Test changes

  • Replaced fixed sleeps and ad-hoc polling loops with client-backed waiters.
  • Switched completion-token polling in tests to TEST_CLIENT.wait_for_token(...).
  • Updated ingress tests to synchronize assertions with completion behavior.
  • Replaced helper storage polling with client clear_storage waiter behavior.
  • Unified remaining platform wait loops through client condition waiter FelderaClient.wait_for_condition(...).

Documentation updates

  • Python docs: documented new wait helpers and examples for:
    • waiting for compilation success,
    • non-blocking start observation,
    • custom predicate waits (wait_for_condition),
    • expected deployment-error recovery flow.
  • docs.feldera.com: added lifecycle synchronization guidance and async lifecycle notes; clarified using Python client
    SDK built-in methods instead of custom synchronization loops.

Outcome

  • Less duplicated race-handling logic in tests.
  • Deterministic, bounded wait semantics in client-facing APIs.
  • Better alignment between integration tests and real SDK usage.
  • Improved diagnostics and reliability for async lifecycle workflows.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

Igor Smolyar added 12 commits February 14, 2026 14:16
Avoid test-only race workarounds and align test behavior with what
feldera client SDK users actually run.

Added client-side start observation (observe_start) for start(wait=False)
so callers can wait until lifecycle leaves STOPPED.

Removed deployment-status polling helper logic from platform tests and
switched helpers/tests to rely on Feldera client lifecycle APIs.

Polling for deployment-status transitions is now internal to FelderaClient
(_wait_for_deployment_status) instead of test helper code.
…latform tests

Use explicit condition polling instead of blind delays for logs,
connector state changes, checkpoint completion, and datagen progress.

This removes timing-based race masking, makes synchronization deterministic,
and makes each test wait state explicit about what condition must be reached.
…age polling

Replace test helper wait_for_cleared_storage() with FelderaClient.clear_storage(...)
This moves storage lifecycle synchronization to the client path used
by SDK users and removes duplicate test-side polling logic.
…rm tests

Expose compilation waiting as a client API with optional expected
program version, timeout, and poll interval.

Useful for users who run async create/update flows (wait=False) and
later need a deterministic compile barrier before start(), and for
patch/update workflows that must wait for a specific new program
version (not stale Success).

Unlike deployment-oriented waiter wait_for_status(),
this API tracks program lifecycle (program_status/program_version)
and fails fast on compile errors.
Raw ingress tests queried too early after async ingestion, causing
flaky failures.

Wait on completion tokens for successful ingests and explicitly poll for
CSV partial-ingest visibility in the BAD_REQUEST parse-error case.
Ensure waiters never block forever by normalizing None/infinite timeouts to
safe defaults across client, pipeline, HTTP health recovery, and test helpers.
Also standardize timeout measurements on time.monotonic() to avoid wall-clock
jumps affecting elapsed-time logic. This prevents hidden hangs while keeping
async operations deterministic under slow cluster conditions.
Replace test _wait_token() polling with client wait_for_token(...)
to align platform tests with feldera client behavior and remove
duplicate completion-status wait logic.
Switch timeout/elapsed polling paths in client and platform tests from
time.time() to time.monotonic() to avoid wall-clock jumps affecting wait
behavior. Keep time.time() in testutils_oidc.py because token expiry/cache
timestamps are wall-clock/epoch semantics, and in checkpoint-sync random
seeding where clock monotonicity is irrelevant.
Move generic condition polling from platform test helper into Feldera client as
FelderaClient.wait_for_condition(...), including timeout/poll validation and
usage docs. Keep test callsites stable by making helper a thin proxy to the new
client API.

Document usage with a new “Waiting for a Custom Condition” example
in python/docs/examples.rst.
This is another step in the wait-loop migration series to reduce async
start/stop/checkpoint race flakes and keep test synchronization aligned with
Feldera client behavior used by SDK users.

Replace remaining manual loops with wait_for_condition-based waits and clarify
helper usage: use helper wrapper only when no Pipeline object exists; otherwise
call pipeline.client.wait_for_condition(...) directly.
Update Python and docs.feldera.com docs for new client-side wait APIs and
lifecycle synchronization patterns:
 - document wait_for_program_success and wait_for_condition
 - add practical examples for async start and custom predicate waiting
 - clarify lifecycle async behavior and recommend SDK built-in methods over
   custom synchronization loops
 - fix typo: "predicate returns True when condition is met"
@igorscs igorscs self-assigned this Feb 14, 2026
@igorscs igorscs added documentation Improvements or additions to documentation RFC Request for Comments python-sdk Issues related to the feldera python sdk QA Testing and quality assurance python Pull requests that update python code enterprise Issue related to Feldera Enterprise features. labels Feb 14, 2026
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
Copy link
Contributor

@ryzhyk ryzhyk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks much cleaner. My main concern is with the observe_start flag. It seems that we need better support for this in the backend (seen inline comment).


from feldera.enums import BootstrapPolicy

pipeline.start(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like something that shouldn't be needed. Starting a previously failed pipeline is a totally normal situation. The caller may not know or care that the pipeline previously failed. I wonder if observe_start=True should be the default (only?) behavior of start? Would this still be needed then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it is cumbersome heuristic currently. If only we could wait for specific transaction ID to complete ;)
See comments below.

)

if not wait:
if observe_start:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is my understanding correct that there is no way to do this well today: if we return immediately after /start, the pipeline can still be stopped during the next check. If we wait for the pipeline to not be stopped, we may wait forever, since it may have failed and gone back to the stopped state during startup.

It seems that the ideal solution would make sure that the pipeline leaves the stopped state before returning 202 from /start. @snkas , do you think this is possible to achieve.

Alternatively, can we use incarnation ids to distinguish between STOPPED states from the previous and current runs of the pipeline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding backend support for transaction or operation ID would make the whole system easier to reason about, easier to debug, and much more deterministic for everyone. Our synchronizations stop being “guess from current state” and become “wait for this exact operation” and then just report state/error. That is a huge shift in clarity, no more fragile polling heuristics, fewer race-condition workarounds in tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, although transaction ids would introduce a lot of backend complexity. One mechanism that may help in a similar way specifically for the /start operation is pipeline incarnation id's. Let's engage @snkas, who's the ultimate authority on this, in the discussion.

Copy link
Contributor Author

@igorscs igorscs Feb 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand all backend constraints and complexity, so I’m trying to propose the smallest possible change and keep the backend contract with client as simple as possible. It would be nice to have a simple universal transaction concept for async operations. From the client/test side this looks like the smallest and simplest way to make synchronization deterministic:

  1. each async request returns a transaction_id
  2. backend reports that transaction’s state (pending|succeeded|failed+error)
  3. client waits on that exact transaction_id .

This would reduce status-guessing heuristics, improve error attribution, and simplify both SDK and test wait logic with one consistent pattern across async endpoints.

@igorscs igorscs marked this pull request as draft February 15, 2026 08:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation enterprise Issue related to Feldera Enterprise features. python Pull requests that update python code python-sdk Issues related to the feldera python sdk QA Testing and quality assurance RFC Request for Comments

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants