URL: http://github.com/feldera/feldera/pull/5633


[python] move async wait/synchronization logic into client APIs and update docs/tests #5633

Draft

igorscs wants to merge 20 commits into main from feldera-python-client-5597

Conversation

@igorscs (Contributor) commented Feb 14, 2026

Motivation

See discussion about design and implementation in #5597

Feldera lifecycle and ingestion paths are asynchronous (start/stop/clear/compile/ingest are accepted first, then converge). We had multiple test-local polling/sleep helpers to avoid races, which made tests pass while SDK users could still hit the same race windows.

This PR moves synchronization behavior into the Python client SDK's built-in methods, makes wait behavior bounded and monotonic, and updates tests/docs to use the same APIs that users rely on.

This PR is the first stage of improving async determinism in Python tests and the Feldera Python client. In this stage, we moved key wait/synchronization logic from test-local helpers into client SDK built-in methods, replaced ad-hoc polling/sleeps in tests, and documented the new usage patterns. Follow-up stages will add tighter wait_for_status fail-fast/error diagnostics, additional client wait APIs for connector/checkpoint/storage flows, and further consolidation of wait-loop internals.

Longer term, we should align this with backend support for transition/operation IDs on /start and /stop to eliminate stale-state ambiguity at the source. As far as I know, @snkas is already working on related backend improvements, so we can coordinate and combine efforts.

What changed

Client API improvements

  • Added public FelderaClient.wait_for_program_success(...).
  • Added public FelderaClient.wait_for_condition(...).
  • Moved deployment-status waiting from test helper into client (_wait_for_deployment_status path).
  • Added lifecycle start observation support for non-blocking start:
    • start_pipeline(..., observe_start=True) and corresponding Pipeline.start(..., observe_start=True).
  • Added ignore_deployment_error handling for expected start-recovery flows.
  • Standardized wait loops to:
    • use finite/bounded timeouts,
    • validate/default invalid timeout inputs,
    • use time.monotonic() for elapsed/timeout logic (a sketch of this pattern follows below).
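
A minimal sketch of that bounded, monotonic wait-loop pattern, using only the standard library (the function name and default values here are illustrative, not the exact SDK internals):

    import time

    def wait_for(condition, timeout_s=300.0, poll_interval_s=1.0):
        """Poll `condition` until it returns True or the bounded timeout elapses."""
        # Validate/default the timeout so every wait is finite.
        if timeout_s is None or timeout_s <= 0 or timeout_s == float("inf"):
            timeout_s = 300.0
        # time.monotonic() is immune to wall-clock jumps (NTP, DST, manual changes).
        deadline = time.monotonic() + timeout_s
        while True:
            if condition():
                return
            if time.monotonic() >= deadline:
                raise TimeoutError(f"condition not met within {timeout_s} seconds")
            time.sleep(poll_interval_s)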

Test changes

  • Replaced fixed sleeps and ad-hoc polling loops with client-backed waiters.
  • Switched completion-token polling in tests to TEST_CLIENT.wait_for_token(...).
  • Updated ingress tests to synchronize assertions with completion behavior.
  • Replaced helper storage polling with client clear_storage waiter behavior.
  • Unified remaining platform wait loops through the client condition waiter FelderaClient.wait_for_condition(...); a before/after example follows below.
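
For example, a test that previously looped on a fixed sleep can lean on the client waiters added here; the waiter names come from this PR, while the keyword arguments and the check_running() predicate are placeholders:

    # Before: ad-hoc polling with fixed sleeps, which masks races and can hang.
    #     while not check_running():
    #         time.sleep(2)

    # After: a bounded, client-backed wait (keyword names are illustrative).
    TEST_CLIENT.wait_for_condition(
        lambda: check_running(),   # arbitrary test-supplied predicate
        timeout_s=120,             # finite timeout; the waiter never blocks forever
        poll_interval_s=1.0,
    )

    # Completion-token polling is likewise routed through the client.
    TEST_CLIENT.wait_for_token(token, timeout_s=60)  # token returned by an earlier ingest call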

Documentation updates

  • Python docs: documented new wait helpers and examples for:
    • waiting for compilation success,
    • non-blocking start observation,
    • custom predicate waits (wait_for_condition),
    • expected deployment-error recovery flow.
  • docs.feldera.com: added lifecycle synchronization guidance and async lifecycle notes; clarified that users should prefer Python client SDK built-in methods over custom synchronization loops (a combined example follows below).
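
To give a flavor of the documented patterns, a combined sketch (wait_for_program_success, observe_start, and ignore_deployment_error come from this PR; the import paths, the exact argument names, and their combination are assumptions):

    from feldera import FelderaClient, PipelineBuilder  # import path assumed

    client = FelderaClient("http://localhost:8080")
    pipeline = PipelineBuilder(client, name="demo", sql="CREATE TABLE t (x INT);").create_or_replace()

    # Deterministic compile barrier before starting; bounded and fails fast on compile errors.
    client.wait_for_program_success("demo", timeout_s=300)

    # Non-blocking start: observe that /start has actually taken effect instead of returning blindly,
    # and tolerate an expected deployment error left over from a prior failed run.
    pipeline.start(wait=False, observe_start=True, ignore_deployment_error=True)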

Outcome

  • Less duplicated race-handling logic in tests.
  • Deterministic, bounded wait semantics in client-facing APIs.
  • Better alignment between integration tests and real SDK usage.
  • Improved diagnostics and reliability for async lifecycle workflows.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

@igorscs igorscs self-assigned this Feb 14, 2026
@igorscs igorscs added the documentation, RFC, python-sdk, QA, python, and enterprise labels on Feb 14, 2026
@ryzhyk (Contributor) left a comment:

This looks much cleaner. My main concern is with the observe_start flag. It seems that we need better support for this in the backend (see inline comment).


from feldera.enums import BootstrapPolicy

pipeline.start(
Contributor:

This seems like something that shouldn't be needed. Starting a previously failed pipeline is a totally normal situation. The caller may not know or care that the pipeline previously failed. I wonder if observe_start=True should be the default (only?) behavior of start? Would this still be needed then?

Contributor Author:

Yeah, it is a cumbersome heuristic currently. If only we could wait for a specific transaction ID to complete ;)
See comments below.

Contributor:

If they do not care, then they would pass ignore_deployment_error=True (or the inverse if they do care). I think not ignoring should be the default.

Contributor Author:

We need some evidence that the pipeline /start operation is in progress, since status polling cannot rely on stale states left over from the previous run. We have to use this fragile heuristic.

)

if not wait:
    if observe_start:
Contributor:

Is my understanding correct that there is no way to do this well today: if we return immediately after /start, the pipeline can still be stopped during the next check. If we wait for the pipeline to not be stopped, we may wait forever, since it may have failed and gone back to the stopped state during startup.

It seems that the ideal solution would make sure that the pipeline leaves the stopped state before returning 202 from /start. @snkas, do you think this is possible to achieve?

Alternatively, can we use incarnation ids to distinguish between STOPPED states from the previous and current runs of the pipeline?

Contributor Author:

Adding backend support for a transaction or operation ID would make the whole system easier to reason about, easier to debug, and much more deterministic for everyone. Our synchronizations stop being “guess from current state” and become “wait for this exact operation”, and then we just report state/error. That is a huge shift in clarity: no more fragile polling heuristics, and fewer race-condition workarounds in tests.

Contributor:

I understand, although transaction IDs would introduce a lot of backend complexity. One mechanism that may help in a similar way, specifically for the /start operation, is pipeline incarnation IDs. Let's engage @snkas, who's the ultimate authority on this, in the discussion.

Contributor Author (@igorscs, Feb 15, 2026):

I don't fully understand all the backend constraints and complexity, so I'm trying to propose the smallest possible change and keep the backend contract with the client as simple as possible. It would be nice to have a simple, universal transaction concept for async operations. From the client/test side, this looks like the smallest and simplest way to make synchronization deterministic:

  1. each async request returns a transaction_id
  2. backend reports that transaction’s state (pending|succeeded|failed+error)
  3. client waits on that exact transaction_id.

This would reduce status-guessing heuristics, improve error attribution, and simplify both SDK and test wait logic with one consistent pattern across async endpoints.
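
Purely as a hypothetical illustration of that proposal (neither the transaction endpoint nor these method names exist today), the client-side wait would collapse into something like:

    import time

    def start_and_wait(client, pipeline_name, timeout_s=300.0):
        # 1. The async request would return a transaction_id (imagined return value).
        txn_id = client.start_pipeline(pipeline_name)
        # 2./3. The client waits on that exact transaction, not on guessed pipeline state.
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            txn = client.get_transaction(txn_id)  # imagined endpoint
            if txn.state == "succeeded":
                return
            if txn.state == "failed":
                raise RuntimeError(txn.error)
            time.sleep(1.0)
        raise TimeoutError(f"transaction {txn_id} still pending after {timeout_s}s")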

Contributor:

The reason this may not be straightforward is that we use the desired state architecture internally, i.e., we don't track the status and progress of individual state change requests. Such a request generally just updates the desired state of the system. The manager continuously tries to reconcile this desired state with the actual state by driving the pipeline automaton through various state transitions.

Contributor:

> We know that this won't necessarily hold true for our customers. They may mix client applications with UI-induced actions.

Yes, they may, and all individual API endpoints of the backend are atomic when it comes to pipeline management (i.e., the control plane). It would be the responsibility of the user to ensure that any concurrent interactions they perform do not interfere with each other. For example, looking at a pipeline in the Web Console should not have side effects. (This also ties into Gerd's comment.)

If we want to detect concurrent interaction, I would propose something along the lines of the dibs mechanism I described in my prior comment. It seems simpler than doing transaction/request-id tracking.

> I started this thread to address a specific problem: when starting a pipeline, we don't seem to have a good way to distinguish between a pipeline that (1) hasn't yet transitioned from Stopped to Provisioning and (2) a pipeline that transitioned to Provisioning or one of the later states and failed. In both cases the pipeline is Stopped.
>
> I was wondering if we could change the semantics of /start to return success only after the pipeline has moved from Stopped to any other state. In this case (1) is no longer possible.

This can already be distinguished by the deployment_error being set. (1) will not have it set, (2) will have it set. After pipeline.start() calls /start, it observes and if it encounters a deployment_error, it throws an error.

Contributor Author:

After it manages to issue one successful /start request, it just monitors until the pipeline reaches the initially desired status.

The problem @ryzhyk mentioned is that we have no idea whether we are reading a stale desired status from the previous run or the new, currently desired one.

We can see the current state "stopped" and the current desired state "stopped", so we bail out immediately. But these could be stale states from the previous run that the backend just hasn't updated yet.

Contributor:

When you call /start, the desired status gets set. I'm not sure what you mean by an old stale desired status?

Member:

Is this related to why I had to wait for the incarnation ID to change in https://github.com/feldera/cloud/commit/819b42bb7305795e7d0873be596d65e94435a67e?

Contributor Author:

> When you call /start, the desired status gets set. I'm not sure what you mean by an old stale desired status?

My question is "when it gets set". Is it set before /start returns? Or asynchronously after /start returns?

@igorscs igorscs marked this pull request as draft February 15, 2026 08:09
Comment on lines +293 to +297
Wait until pipeline 'pipeline_name' has 'desired' deployment status:

- If 'desired' is a string, until that is the status.
- If 'desired' is a function, until it returns true when passed
the deployment status.
Member:

FWIW, I only introduced the ability to pass a function to be able to wait for "not stopped", and that's the only current use. It seems like that's an important special case based on the other commits, so if it's better to special case it here, too, that's OK.
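
For instance, the two call forms described in the docstring excerpt above might look like this (the function name and its exposure are illustrative; only the string vs. predicate distinction comes from the excerpt):

    # String form: wait until the pipeline reports exactly this deployment status.
    wait_for_deployment_status("my_pipeline", "Running")

    # Predicate form, currently used only to wait for "anything but Stopped".
    wait_for_deployment_status("my_pipeline", lambda status: status != "Stopped")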

# Create/update asynchronously.
client.create_or_update_pipeline(pipeline, wait=False)

# Wait for compile success of at least program version 2.
Contributor:

Why not exactly program version 2?

Contributor:

Is it because of any automatic upgrade mechanism?

Contributor Author (@igorscs, Feb 16, 2026):

> Is it because of any automatic upgrade mechanism?

Not sure; this was the original implementation. I tried not to change the behavior of what we already have and already use in our tests.

Contributor:

This function does become part of the public API, right? We should consider what makes the most sense for a user; intuitively, I feel like it would be an exact version. I can probably give more detailed feedback after trying out the PR locally to get a feel for the new API functions.



@igorscs (Contributor Author) commented Feb 16, 2026

> I started this thread to address a specific problem: when starting a pipeline, we don't seem to have a good way to distinguish between a pipeline that (1) hasn't yet transitioned from Stopped to Provisioning and (2) a pipeline that transitioned to Provisioning or one of the later states and failed. In both cases the pipeline is Stopped.
>
> I was wondering if we could change the semantics of /start to return success only after the pipeline has moved from Stopped to any other state. In this case (1) is no longer possible.
>
> This can already be distinguished by the deployment_error being set. (1) will not have it set, (2) will have it set. After pipeline.start() calls /start, it observes and if it encounters a deployment_error, it throws an error.

The problem is that the client doesn't know whether the error is a stale error from a previous run or a new one. Currently we have no way to distinguish between old states and new states, so we have to use heuristics like "start observed".

@ryzhyk (Contributor) commented Feb 16, 2026

> I started this thread to address a specific problem: when starting a pipeline, we don't seem to have a good way to distinguish between a pipeline that (1) hasn't yet transitioned from Stopped to Provisioning and (2) a pipeline that transitioned to Provisioning or one of the later states and failed. In both cases the pipeline is Stopped.
>
> I was wondering if we could change the semantics of /start to return success only after the pipeline has moved from Stopped to any other state. In this case (1) is no longer possible.
>
> This can already be distinguished by the deployment_error being set. (1) will not have it set, (2) will have it set. After pipeline.start() calls /start, it observes and if it encounters a deployment_error, it throws an error.
>
> The problem is that the client doesn't know whether the error is a stale error from a previous run or a new one. Currently we have no way to distinguish between old states and new states, so we have to use heuristics like "start observed".

Right, this is my main concern. Maybe we can at least clear the error before returning from /start.

@snkas (Contributor) commented Feb 17, 2026

This is a bug that will be fixed: calling /start will clear the deployment_error.

Igor Smolyar added 5 commits February 17, 2026 15:02
Avoid test-only race workarounds and align test behavior with what
feldera client SDK users actually run.

Added client-side start observation (observe_start) for start(wait=False)
so callers can wait until lifecycle leaves STOPPED.

Removed deployment-status polling helper logic from platform tests and
switched helpers/tests to rely on Feldera client lifecycle APIs.

Polling for deployment-status transitions is now internal to FelderaClient
(_wait_for_deployment_status) instead of test helper code.
…latform tests

Use explicit condition polling instead of blind delays for logs,
connector state changes, checkpoint completion, and datagen progress.

This removes timing-based race masking, makes synchronization deterministic,
and makes each test wait state explicit about what condition must be reached.
…age polling

Replace test helper wait_for_cleared_storage() with FelderaClient.clear_storage(...).
This moves storage lifecycle synchronization to the client path used
by SDK users and removes duplicate test-side polling logic.
…rm tests

Expose compilation waiting as a client API with optional expected
program version, timeout, and poll interval.

Useful for users who run async create/update flows (wait=False) and
later need a deterministic compile barrier before start(), and for
patch/update workflows that must wait for a specific new program
version (not stale Success).

Unlike deployment-oriented waiter wait_for_status(),
this API tracks program lifecycle (program_status/program_version)
and fails fast on compile errors.
Igor Smolyar and others added 13 commits February 17, 2026 15:02
Raw ingress tests queried too early after async ingestion, causing
flaky failures.

Wait on completion tokens for successful ingests and explicitly poll for
CSV partial-ingest visibility in the BAD_REQUEST parse-error case.
Ensure waiters never block forever by normalizing None/infinite timeouts to
safe defaults across client, pipeline, HTTP health recovery, and test helpers.
Also standardize timeout measurements on time.monotonic() to avoid wall-clock
jumps affecting elapsed-time logic. This prevents hidden hangs while keeping
async operations deterministic under slow cluster conditions.
Replace test _wait_token() polling with client wait_for_token(...)
to align platform tests with feldera client behavior and remove
duplicate completion-status wait logic.
Switch timeout/elapsed polling paths in client and platform tests from
time.time() to time.monotonic() to avoid wall-clock jumps affecting wait
behavior. Keep time.time() in testutils_oidc.py because token expiry/cache
timestamps are wall-clock/epoch semantics, and in checkpoint-sync random
seeding where clock monotonicity is irrelevant.
Move generic condition polling from platform test helper into Feldera client as
FelderaClient.wait_for_condition(...), including timeout/poll validation and
usage docs. Keep test callsites stable by making helper a thin proxy to the new
client API.

Document usage with a new “Waiting for a Custom Condition” example
in python/docs/examples.rst.
This is another step in the wait-loop migration series to reduce async
start/stop/checkpoint race flakes and keep test synchronization aligned with
Feldera client behavior used by SDK users.

Replace remaining manual loops with wait_for_condition-based waits and clarify
helper usage: use helper wrapper only when no Pipeline object exists; otherwise
call pipeline.client.wait_for_condition(...) directly.
Update Python and docs.feldera.com docs for new client-side wait APIs and
lifecycle synchronization patterns:
 - document wait_for_program_success and wait_for_condition
 - add practical examples for async start and custom predicate waiting
 - clarify lifecycle async behavior and recommend SDK built-in methods over
   custom synchronization loops
 - fix typo: "predicate returns True when condition is met"
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@igorscs igorscs force-pushed the feldera-python-client-5597 branch from 2f1810c to 1ebcbb5 on February 17, 2026 23:04
Igor Smolyar and others added 2 commits February 17, 2026 17:25
Use logger.debug for waiter progress so tests can enable diagnostics
while production runs stay quiet at default log levels.
Signed-off-by: feldera-bot <feldera-bot@feldera.com>

Labels

documentation, enterprise, python, python-sdk, QA, RFC


7 participants
