Skip to content

Synchronize partitions#5629

Open
blp wants to merge 6 commits intomainfrom
synchronize-partitions
Open

Synchronize partitions#5629
blp wants to merge 6 commits intomainfrom
synchronize-partitions

Conversation

@blp
Copy link
Member

@blp blp commented Feb 14, 2026

Describe Manual Test Plan

I didn't test this manually but I did write and run a new unit test for the feature.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

blp added 5 commits February 13, 2026 10:11
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
This prepares for adding another configuration setting.

Signed-off-by: Ben Pfaff <blp@feldera.com>
The following commit will add another way to stage records.  This commit
makes that one easier to understand.

This commit should not change any behavior.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Issue: #5607

Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp requested a review from ryzhyk February 14, 2026 00:03
@blp blp self-assigned this Feb 14, 2026
@blp blp added connectors Issues related to the adapters/connectors crate rust Pull requests that update Rust code user-reported Reported by a user or customer labels Feb 14, 2026
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
Copy link
Contributor

@ryzhyk ryzhyk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks clean. One thing I couldn't figure out is how we make sure that some of the partition receivers don't buffer unbounded messages while waiting for stragglers.


- If one or a few partitions have timestamps far behind the others, only
those partitions will be processed until all the old events are
processed. (This is the flip side of the previous pitfall.)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flink has a feature called source idleness detection to deal with this problem. We might need something similar.

@blp
Copy link
Member Author

blp commented Feb 15, 2026

This looks clean. One thing I couldn't figure out is how we make sure that some of the partition receivers don't buffer unbounded messages while waiting for stragglers.

Hmm, I thought I had that figured out, but now that I look again, I was wrong.

I'll add a per-partition queuing limit.

@ryzhyk
Copy link
Contributor

ryzhyk commented Feb 15, 2026

This looks clean. One thing I couldn't figure out is how we make sure that some of the partition receivers don't buffer unbounded messages while waiting for stragglers.

Hmm, I thought I had that figured out, but now that I look again, I was wrong.

I'll add a per-partition queuing limit.

Can this affect the connector even without this new feature, in the sense that there's nothing preventing partition readers from accumulating unbounded data?

@blp
Copy link
Member Author

blp commented Feb 15, 2026

This looks clean. One thing I couldn't figure out is how we make sure that some of the partition receivers don't buffer unbounded messages while waiting for stragglers.

Hmm, I thought I had that figured out, but now that I look again, I was wrong.
I'll add a per-partition queuing limit.

Can this affect the connector even without this new feature, in the sense that there's nothing preventing partition readers from accumulating unbounded data?

I took another look.

We account data as buffered as soon as it comes in on the partition reader thread, so the total amount buffered is limited by the buffer limit plus however long it takes the controller to tell us to pause. So this will limit the amount buffered in either case.

However, that means there's another complication: we need to make sure that every partition can buffer at least one record, even if the overall buffers are filled, because otherwise we can be full of buffered records that can't be input to the circuit.

Also, I think that the idea of records that are buffered but can't be input to the circuit will cause livelock in the controller, which will continuously try to start a step since we've told it that there is data buffered.

So, there might need to be further distinction between the modes:

  • In synchronized mode, we need to do per-partition backpressure and we need to (somehow) account records as buffered only when they are available to be input to the circuit.
  • In non-synchronized mode, we can use the existing strategies.

I'll continue to think about this today. Thanks for asking probing questions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connectors Issues related to the adapters/connectors crate rust Pull requests that update Rust code user-reported Reported by a user or customer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants