[adapters] Fix connector-initiated multihost transactions.#5542
Merged
[adapters] Fix connector-initiated multihost transactions.#5542
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR refactors transaction management logic to improve clarity and fixes a bug where connector-initiated transaction requests were being prematurely cleared in multihost configurations.
Changes:
- Renamed and clarified transaction state checking methods with improved documentation
- Added
is_active()method to distinguish between active and ongoing transactions - Refactored
advance_transaction_state()from a single match statement into explicit if-else branches with detailed comments
crates/adapters/src/controller.rs
Outdated
| // here; instead, we'd be ensuring that a transaction is | ||
| // running. | ||
| // | ||
| // - They cannot be all be committed, because |
There was a problem hiding this comment.
Corrected spelling of 'all be' to 'all'.
Suggested change
| // - They cannot be all be committed, because | |
| // - They cannot all be committed, because |
This commit is primarily intended to clarify the transaction code. I found it somewhat puzzling before partly because is_ongoing() and is_ready_to_commit() seemed a little confusing, but also because advance_transaction_state() was one `match` statement that I found obscure. This commit rewrites these functions so that I can understand them better, adding more comments and more explicit logic. However, some of the changes are bug fixes for the case where a connector initiates a transaction when multihost is enabled. In this case, what would happen before was something like this: 1. Connector requests starting a transaction. 2. Coordinator runs a step before it receives the connector's request. 3. Coordinator receives the connector's request. 4. Coordinator starts a transaction. Unfortunately, as a side effect, step 2 would discard the connector's request entirely because advance_transaction_state(), when it was called if there was no ongoing transaction and none to initiate, always called transaction_info.initiators.clear(), which clears all the transaction requests including those from connectors. This commit changes that case to instead just assert that there's no active API transaction (in the multihost case), instead of clearing all of them, which as a side effect fixes the bug. Issue: feldera/cloud#1372 Signed-off-by: Ben Pfaff <blp@feldera.com>
swanandx
approved these changes
Feb 2, 2026
Contributor
swanandx
left a comment
There was a problem hiding this comment.
thanks for the fix!
assuming all hosts logging Starting transaction .. / Committing.. is intended.
[txns host=0] 2026-02-02T07:51:34.217334Z INFO dbsp_adapters::controller: Starting transaction 1
[txns host=1] 2026-02-02T07:51:34.217560Z INFO dbsp_adapters::controller: Starting transaction 1
[txns host=1] 2026-02-02T07:51:34.218593Z WARN dbsp::storage::backend::posixio_impl: /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=1] 2026-02-02T07:51:34.218681Z WARN dbsp_adapters::server: status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[txns host=0] 2026-02-02T07:51:34.219036Z WARN dbsp::storage::backend::posixio_impl: /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=0] 2026-02-02T07:51:34.219080Z WARN dbsp_adapters::server: status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.228504Z INFO stream{path="/coordination/transaction/status" ordinal=0}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(true), requests: {} }
[txns host=1] 2026-02-02T07:51:34.231961Z INFO dbsp_adapters::server: pause: Transitioning from Running to Paused
[txns host=0] 2026-02-02T07:51:34.231982Z INFO dbsp_adapters::server: pause: Transitioning from Running to Paused
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232245Z INFO stream{path="/coordination/status" ordinal=0}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Paused, runtime_status_details: String(""), runtime_desired_status: Paused }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232266Z INFO stream{path="/coordination/status" ordinal=1}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Paused, runtime_status_details: String(""), runtime_desired_status: Paused }
[txns host=1] 2026-02-02T07:51:34.232669Z INFO dbsp_adapters::server: start: Transitioning from Paused to Running
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232519Z INFO stream{path="/coordination/transaction/status" ordinal=1}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(false), requests: {} }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232533Z INFO stream{path="/coordination/transaction/status" ordinal=0}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(false), requests: {} }
[txns host=0] 2026-02-02T07:51:34.232629Z INFO dbsp_adapters::server: start: Transitioning from Paused to Running
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232811Z INFO stream{path="/coordination/status" ordinal=1}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Running, runtime_status_details: String(""), runtime_desired_status: Running }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232919Z INFO stream{path="/coordination/status" ordinal=0}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Running, runtime_status_details: String(""), runtime_desired_status: Running }
[txns host=0] 2026-02-02T07:51:34.232949Z INFO dbsp_adapters::controller: Committing transaction 1
[txns host=1] 2026-02-02T07:51:34.233003Z INFO dbsp_adapters::controller: Committing transaction 1
[txns host=1] 2026-02-02T07:51:34.233817Z INFO dbsp_adapters::controller: Transaction 1 committed
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.233933Z INFO stream{path="/coordination/transaction/status" ordinal=1}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: None, requests: {} }
[txns host=1] 2026-02-02T07:51:34.234302Z WARN dbsp::storage::backend::posixio_impl: /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=1] 2026-02-02T07:51:34.234318Z WARN dbsp_adapters::server: status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[txns host=0] 2026-02-02T07:51:34.233986Z INFO dbsp_adapters::controller: Transaction 1 committed
Member
Author
Yes (I think we might need more logging from the coordinator itself). |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This commit is primarily intended to clarify the transaction code. I found it somewhat puzzling before partly because is_ongoing() and is_ready_to_commit() seemed a little confusing, but also because advance_transaction_state() was one
matchstatement that I found obscure. This commit rewrites these functions so that I can understand them better, adding more comments and more explicit logic.However, some of the changes are bug fixes for the case where a connector initiates a transaction when multihost is enabled. In this case, what would happen before was something like this:
Unfortunately, as a side effect, step 2 would discard the connector's request entirely because advance_transaction_state(), when it was called if there was no ongoing transaction and none to initiate, always called transaction_info.initiators.clear(), which clears all the transaction requests including those from connectors.
This commit changes that case to instead just assert that there's no active API transaction (in the multihost case), instead of clearing all of them, which as a side effect fixes the bug.
Issue: https://github.com/feldera/cloud/issues/1372