[python] move async wait/synchronization logic into client APIs and update docs/tests#5633
[python] move async wait/synchronization logic into client APIs and update docs/tests#5633
Conversation
ryzhyk
left a comment
There was a problem hiding this comment.
This looks much cleaner. My main concern is with the observe_start flag. It seems that we need better support for this in the backend (seen inline comment).
|
|
||
| from feldera.enums import BootstrapPolicy | ||
|
|
||
| pipeline.start( |
There was a problem hiding this comment.
This seems like something that shouldn't be needed. Starting a previously failed pipeline is a totally normal situation. The caller may not know or care that the pipeline previously failed. I wonder if observe_start=True should be the default (only?) behavior of start? Would this still be needed then?
There was a problem hiding this comment.
yeah, it is cumbersome heuristic currently. If only we could wait for specific transaction ID to complete ;)
See comments below.
There was a problem hiding this comment.
If they do not care, then they would pass ignore_deployment_error=True (or do care, the inverse). I think we want to not ignore to be default.
There was a problem hiding this comment.
we need some evidence that pipeline /start operation is in progress, as we cannot rely for status polling on stalled states from the previous run. We have to use this fragile heuristic.
| ) | ||
|
|
||
| if not wait: | ||
| if observe_start: |
There was a problem hiding this comment.
Is my understanding correct that there is no way to do this well today: if we return immediately after /start, the pipeline can still be stopped during the next check. If we wait for the pipeline to not be stopped, we may wait forever, since it may have failed and gone back to the stopped state during startup.
It seems that the ideal solution would make sure that the pipeline leaves the stopped state before returning 202 from /start. @snkas , do you think this is possible to achieve.
Alternatively, can we use incarnation ids to distinguish between STOPPED states from the previous and current runs of the pipeline?
There was a problem hiding this comment.
Adding backend support for transaction or operation ID would make the whole system easier to reason about, easier to debug, and much more deterministic for everyone. Our synchronizations stop being “guess from current state” and become “wait for this exact operation” and then just report state/error. That is a huge shift in clarity, no more fragile polling heuristics, fewer race-condition workarounds in tests.
There was a problem hiding this comment.
I understand, although transaction ids would introduce a lot of backend complexity. One mechanism that may help in a similar way specifically for the /start operation is pipeline incarnation id's. Let's engage @snkas, who's the ultimate authority on this, in the discussion.
There was a problem hiding this comment.
I don't fully understand all backend constraints and complexity, so I’m trying to propose the smallest possible change and keep the backend contract with client as simple as possible. It would be nice to have a simple universal transaction concept for async operations. From the client/test side this looks like the smallest and simplest way to make synchronization deterministic:
- each async request returns a transaction_id
- backend reports that transaction’s state (pending|succeeded|failed+error)
- client waits on that exact transaction_id .
This would reduce status-guessing heuristics, improve error attribution, and simplify both SDK and test wait logic with one consistent pattern across async endpoints.
There was a problem hiding this comment.
The reason this may not be straightforward is that we use the desired state architecture internally, i.e., we don't track the status and progress of individual state change requests. Such a request generally just updates the desired state of the system. The manager continuously tries to reconcile this desired state with the actual state by driving the pipeline automaton through various state transitions.
There was a problem hiding this comment.
We know that this won't necessarily hold true for our customers. They may mix client applications with UI-induced actions.
Yes, they may, and all individual API endpoints of the backend are atomic when it comes to pipeline management (i.e., control plane). It would be the responsibility of the user to ensure that any concurrent interactions they do, do not interfere with each other. For example, looking in the Web Console at a pipeline should not have side effects. (This also ties into Gerd's comment)
If we want to detect concurrent interaction, I would propose something along the lines of the dibs mechanism I described in my prior comment. It seems simpler than doing transaction/request-id tracking.
I started this thread to address a specific problem: when starting a pipeline, we don't seem to have a good way to distinguish between a pipeline that (1) hasn't yet transitioned from Stopped to Provisioning and (2) a pipeline that transitioned to Provisioning or one of the later states and failed. In both cases the pipeline is Stopped.
I was wondering if we could change the semantics of /start to return success only after the pipeline has moved from Stopped to any other state. In this case (1) is no longer possible.
This can already be distinguished by the deployment_error being set. (1) will not have it set, (2) will have it set. After pipeline.start() calls /start, it observes and if it encounters a deployment_error, it throws an error.
There was a problem hiding this comment.
After it was able to do one successful /start request, it just monitors until it achieves to become the initial desired status.
The problem that @ryzhyk mentioned, we have no idea if we are reading old staled desired status or a new one, the currently desired.
We can see current state "stopped" and current desired state "stopped" so we will bailout immediately.
But these were staled states from the previous run, the backend just didn't update them yet.
There was a problem hiding this comment.
When you call /start, the desired status gets set. Not sure what you mean with old staled desired status?
There was a problem hiding this comment.
Is this related to why I had to wait for the incarnation ID to change in https://github.com/feldera/cloud/commit/819b42bb7305795e7d0873be596d65e94435a67e?
There was a problem hiding this comment.
When you call
/start, the desired status gets set. Not sure what you mean with old staled desired status?
My question is "when it gets set". Is it set before /start returns? Or asynchronously after /start returns?
| Wait until pipeline 'pipeline_name' has 'desired' deployment status: | ||
|
|
||
| - If 'desired' is a string, until that is the status. | ||
| - If 'desired' is a function, until it returns true when passed | ||
| the deployment status. |
There was a problem hiding this comment.
FWIW, I only introduced the ability to pass a function to be able to wait for "not stopped", and that's the only current use. It seems like that's an important special case based on the other commits, so if it's better to special case it here, too, that's OK.
| # Create/update asynchronously. | ||
| client.create_or_update_pipeline(pipeline, wait=False) | ||
|
|
||
| # Wait for compile success of at least program version 2. |
There was a problem hiding this comment.
Why not exactly program version 2?
There was a problem hiding this comment.
Is it because of any automatic upgrade mechanism?
There was a problem hiding this comment.
Is it because of any automatic upgrade mechanism?
not sure, this was the orig implementation, I tried not to change behavior of what we already have and already use in our tests
There was a problem hiding this comment.
This function does become part of the public API right? We should consider what makes most sense for a user, intuitively I feel like it would be an exact version? I can probably give more detailed feedback after having tried out the PR locally to get a feel for the new API functions
|
|
||
| from feldera.enums import BootstrapPolicy | ||
|
|
||
| pipeline.start( |
There was a problem hiding this comment.
If they do not care, then they would pass ignore_deployment_error=True (or do care, the inverse). I think we want to not ignore to be default.
| ) | ||
|
|
||
| if not wait: | ||
| if observe_start: |
There was a problem hiding this comment.
We know that this won't necessarily hold true for our customers. They may mix client applications with UI-induced actions.
Yes, they may, and all individual API endpoints of the backend are atomic when it comes to pipeline management (i.e., control plane). It would be the responsibility of the user to ensure that any concurrent interactions they do, do not interfere with each other. For example, looking in the Web Console at a pipeline should not have side effects. (This also ties into Gerd's comment)
If we want to detect concurrent interaction, I would propose something along the lines of the dibs mechanism I described in my prior comment. It seems simpler than doing transaction/request-id tracking.
I started this thread to address a specific problem: when starting a pipeline, we don't seem to have a good way to distinguish between a pipeline that (1) hasn't yet transitioned from Stopped to Provisioning and (2) a pipeline that transitioned to Provisioning or one of the later states and failed. In both cases the pipeline is Stopped.
I was wondering if we could change the semantics of /start to return success only after the pipeline has moved from Stopped to any other state. In this case (1) is no longer possible.
This can already be distinguished by the deployment_error being set. (1) will not have it set, (2) will have it set. After pipeline.start() calls /start, it observes and if it encounters a deployment_error, it throws an error.
The problem that the client doesn't know if the error is a stale error from previous run or this is a new error. |
Right, this is my main concern. Maybe we can at least clear the error before returning from |
|
This is a bug that will be fixed -- by calling |
Avoid test-only race workarounds and align test behavior with what feldera client SDK users actually run. Added client-side start observation (observe_start) for start(wait=False) so callers can wait until lifecycle leaves STOPPED. Removed deployment-status polling helper logic from platform tests and switched helpers/tests to rely on Feldera client lifecycle APIs. Polling for deployment-status transitions is now internal to FelderaClient (_wait_for_deployment_status) instead of test helper code.
…latform tests Use explicit condition polling instead of blind delays for logs, connector state changes, checkpoint completion, and datagen progress. This removes timing-based race masking, makes synchronization deterministic, and makes each test wait state explicit about what condition must be reached.
…age polling Replace test helper wait_for_cleared_storage() with FelderaClient.clear_storage(...) This moves storage lifecycle synchronization to the client path used by SDK users and removes duplicate test-side polling logic.
…rm tests Expose compilation waiting as a client API with optional expected program version, timeout, and poll interval. Useful for users who run async create/update flows (wait=False) and later need a deterministic compile barrier before start(), and for patch/update workflows that must wait for a specific new program version (not stale Success). Unlike deployment-oriented waiter wait_for_status(), this API tracks program lifecycle (program_status/program_version) and fails fast on compile errors.
Raw ingress tests queried too early after async ingestion, causing flaky failures. Wait on completion tokens for successful ingests and explicitly poll for CSV partial-ingest visibility in the BAD_REQUEST parse-error case.
Ensure waiters never block forever by normalizing None/infinite timeouts to safe defaults across client, pipeline, HTTP health recovery, and test helpers. Also standardize timeout measurements on time.monotonic() to avoid wall-clock jumps affecting elapsed-time logic. This prevents hidden hangs while keeping async operations deterministic under slow cluster conditions.
Replace test _wait_token() polling with client wait_for_token(...) to align platform tests with feldera client behavior and remove duplicate completion-status wait logic.
Switch timeout/elapsed polling paths in client and platform tests from time.time() to time.monotonic() to avoid wall-clock jumps affecting wait behavior. Keep time.time() in testutils_oidc.py because token expiry/cache timestamps are wall-clock/epoch semantics, and in checkpoint-sync random seeding where clock monotonicity is irrelevant.
Move generic condition polling from platform test helper into Feldera client as FelderaClient.wait_for_condition(...), including timeout/poll validation and usage docs. Keep test callsites stable by making helper a thin proxy to the new client API. Document usage with a new “Waiting for a Custom Condition” example in python/docs/examples.rst.
This is another step in the wait-loop migration series to reduce async start/stop/checkpoint race flakes and keep test synchronization aligned with Feldera client behavior used by SDK users. Replace remaining manual loops with wait_for_condition-based waits and clarify helper usage: use helper wrapper only when no Pipeline object exists; otherwise call pipeline.client.wait_for_condition(...) directly.
Update Python and docs.feldera.com docs for new client-side wait APIs and lifecycle synchronization patterns: - document wait_for_program_success and wait_for_condition - add practical examples for async start and custom predicate waiting - clarify lifecycle async behavior and recommend SDK built-in methods over custom synchronization loops - fix typo: "predicate returns True when condition is met"
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
2f1810c to
1ebcbb5
Compare
Use logger.debug for waiter progress so tests can enable diagnostics while production runs stay quiet at default log levels.
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
Motivation
See discussion about design and implementation in #5597
Feldera lifecycle and ingestion paths are asynchronous (start/stop/clear/compile/ingest are accepted first, then converge). We had multiple test-local polling/sleep helpers to avoid races, which made tests pass while SDK users could still hit the same race windows.
This PR moves synchronization behavior into Python client SDK built-ins, makes wait behavior bounded and monotonic, and updates tests/docs to use the same APIs users rely on.
This PR is the first stage of improving async determinism in Python tests and the Feldera Python client. In this stage, we moved key wait/synchronization logic from test-local helpers into client SDK built-in methods, replaced ad-hoc
polling/sleeps in tests, and documented the new usage patterns. We plan to continue in follow-up stages with tighter wait_for_status fail-fast/error diagnostics, additional client wait APIs for connector/checkpoint/storage flows, and further consolidation of wait-loop internals.
Longer term, we should align this with backend support for transition/operation IDs on /start and /stop to eliminate stale-state ambiguity at the source. As far as I know, @snkas is already working on related backend improvements, so we can coordinate and combine efforts.
What changed
Client API improvements
FelderaClient.wait_for_program_success(...).FelderaClient.wait_for_condition(...).start_pipeline(..., observe_start=True)and correspondingPipeline.start(..., observe_start=True).ignore_deployment_errorhandling for expected start-recovery flows.time.monotonic()for elapsed/timeout logic.Test changes
FelderaClient.wait_for_condition(...).Documentation updates
SDK built-in methods instead of custom synchronization loops.
Outcome
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes