
Concurrent encoders #5361

Merged
abhizer merged 7 commits into main from issue5340 on Feb 9, 2026

Conversation

@ryzhyk ryzhyk commented Jan 2, 2026

@ryzhyk ryzhyk added the connectors (Issues related to the adapters/connectors crate) label Jan 2, 2026
@gz gz added the marketing (Relevant for marketing content) label Jan 20, 2026
In preparation for adding support for parallel encoding, wrap the batch passed
to `Encoder::encode` in `Arc`, so it can later be cloned and sent to multiple
workers.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
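
For context, a minimal sketch of the sharing pattern an `Arc`-wrapped batch enables; the `Batch` stand-in below is illustrative, not the real `SerBatchReader` type:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for a read-only batch; the real type would be `dyn SerBatchReader`.
struct Batch {
    rows: Vec<(String, i64)>,
}

fn main() {
    let batch = Arc::new(Batch {
        rows: vec![("a".into(), 1), ("b".into(), -1)],
    });

    // Each worker gets a cheap clone of the Arc; the batch itself is shared, not copied.
    let handles: Vec<_> = (0..4)
        .map(|worker| {
            let batch = Arc::clone(&batch);
            thread::spawn(move || {
                // A real encoder would serialize its share of `batch` here.
                println!("worker {worker} sees {} rows", batch.rows.len());
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}
```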
Extend trait SerBatchReader to support parallel encoders:
- SerBatchReader::partition_keys - partitions the batch into approximately equal-sized chunks.
- SerCursor::seek_key_exact - can be used to move the cursor to the start of a partition.
- SerCursor::key() - can be used to compare the current key under the cursor with the end
  of the partition to stop iteration.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
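
A rough, self-contained illustration of how these three methods are meant to combine, using simplified stand-in types rather than the real `SerCursor` API:

```rust
// Illustrative only: a simplified stand-in for the SerCursor-style API,
// showing how partition bounds drive per-worker iteration.
struct Cursor<'a> {
    keys: &'a [u64],
    pos: usize,
}

impl<'a> Cursor<'a> {
    // Simplified analogue of seek_key_exact: position at the partition's first key.
    fn seek_key(&mut self, key: u64) {
        self.pos = self.keys.partition_point(|k| *k < key);
    }
    // Analogue of key(): the key currently under the cursor, if any.
    fn key(&self) -> Option<u64> {
        self.keys.get(self.pos).copied()
    }
    fn step(&mut self) {
        self.pos += 1;
    }
}

fn main() {
    let keys: Vec<u64> = (0..20).collect();
    // Analogue of partition_keys(): boundaries of roughly equal-sized chunks.
    let bounds = [0u64, 7, 14];

    for (i, &start) in bounds.iter().enumerate() {
        let end = bounds.get(i + 1).copied();
        let mut cursor = Cursor { keys: &keys, pos: 0 };
        cursor.seek_key(start);
        let mut count = 0;
        // Stop once the current key reaches the next partition's bound.
        while let Some(k) = cursor.key() {
            if end.is_some_and(|e| k >= e) {
                break;
            }
            count += 1;
            cursor.step();
        }
        println!("partition {i}: {count} keys");
    }
}
```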
All batches are now Send and Sync, so we eliminate the SyncSerBatchReader trait
and add Send + Sync bounds to SerBatchReader.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@abhizer abhizer force-pushed the issue5340 branch 2 times, most recently from 135be2f to 2e36d67 Compare February 2, 2026 16:05
@ryzhyk ryzhyk left a comment


  • I feel that this framework is still too opinionated. Please consider removing workerpool and using the threadpool crate or something similar.
  • Transports like Kafka support concurrent senders. I think we need to support that in the consumer API. I'll try to come up with something.
  • We need to figure out how to add a benchmark to this or the next PR.

}
}

pub struct SplitCursorBuilder {
Contributor Author:
Why do we need the builder type? Can't next_split return a cursor?

pub struct BatchSplitter {
    batch: Arc<dyn SerBatchReader>,
    bounds: Box<DynVec<DynData>>,
    position: AtomicUsize,
Contributor Author:

Why does this need to be an atomic? I don't think BatchSplitter can be used from multiple threads.

In fact, I'm not sure why we need the BatchSplitter type at all. I haven't read the rest of the PR yet, but the simplest API would be just a function that creates a list of cursors based on a bounds array. Alternatively, if you need more flexibility, you may want to have a function that creates a cursor given start and end bounds. The advantage of that is you don't need a stateful object like BatchSplitter.
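
Sketching the stateless shape suggested here, with placeholder types standing in for the real batch and cursor:

```rust
use std::ops::Range;
use std::sync::Arc;

// Illustrative stand-ins; the real types would be SerBatchReader / SerCursor.
struct Batch {
    keys: Vec<u64>,
}

struct RangeCursor {
    batch: Arc<Batch>,
    range: Range<usize>,
}

// The stateless shape suggested above: no splitter object, just a function
// from (batch, start bound, optional end bound) to a per-partition cursor.
fn cursor_for_bounds(batch: Arc<Batch>, start: u64, end: Option<u64>) -> RangeCursor {
    let lo = batch.keys.partition_point(|k| *k < start);
    let hi = end.map_or(batch.keys.len(), |e| batch.keys.partition_point(|k| *k < e));
    RangeCursor { batch, range: lo..hi }
}

fn main() {
    let batch = Arc::new(Batch { keys: (0..10).collect() });
    let cursor = cursor_for_bounds(Arc::clone(&batch), 3, Some(7));
    println!("partition covers {} of {} keys", cursor.range.len(), cursor.batch.keys.len());
}
```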

Contributor:

Some of this is a remnant of the previous version.
Yes, we don't need the position to be atomic anymore.

Contributor Author:

In that case, let's keep it simple and eliminate this BatchSplitter thing, unless there's something I'm missing.

Implements parallel encoders for the Avro output encoder.
Defines cursor types `SplitCursor<'_>` and `SplitCursorBuilder`.

`SplitCursorBuilder` can be built from a list of bounds for a partition
(typically created by the `partition_keys` method) and the index of the
partition we want a cursor for. This builder type is required to be able
to send cursors safely between threads. The `SplitCursor<'_>` type
requires a reference to the batch, and therefore is not thread safe.
To create a builder, use the method
`SplitCursorBuilder::from_bounds(batch, bounds, idx, format)`; it returns
None if the `idx` partition cannot be created from the given bounds.

`SplitCursor<'_>` can only be created by calling
`SplitCursorBuilder::build()`. The resulting cursor is only valid for its
partition.

Additionally, we create an `AvroParallelEncoder` for each
threadpool worker we want to run. This parallel encoder then sends the
encoded batches back to the main thread via a channel.

All encoding errors, if any, are gathered and returned when the
`AvroEncoder::encode_indexed` method returns.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
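
A self-contained sketch of this pattern: the builder owns an `Arc` plus bounds and can cross threads, the cursor is built only inside the worker, and results flow back over a channel. All names below are illustrative placeholders, not the actual `SplitCursorBuilder`/`AvroParallelEncoder` API.

```rust
use std::sync::{mpsc, Arc};
use std::thread;

// Illustrative stand-in for the batch being partitioned.
struct Batch {
    keys: Vec<u64>,
}

// Owns only an Arc plus bounds, so it is Send and can cross threads.
struct PartitionBuilder {
    batch: Arc<Batch>,
    start: usize,
    end: usize,
}

impl PartitionBuilder {
    // The "cursor" (here just a slice) is built only inside the worker thread.
    fn build(&self) -> &[u64] {
        &self.batch.keys[self.start..self.end]
    }
}

fn main() {
    let batch = Arc::new(Batch { keys: (0..100).collect() });
    let (tx, rx) = mpsc::channel();

    for worker in 0..4usize {
        let builder = PartitionBuilder {
            batch: Arc::clone(&batch),
            start: worker * 25,
            end: (worker + 1) * 25,
        };
        let tx = tx.clone();
        thread::spawn(move || {
            // "Encode" the partition and send the result back over the channel,
            // as the parallel encoder is described to do.
            let encoded: Vec<u8> = builder.build().iter().map(|k| *k as u8).collect();
            tx.send((worker, encoded)).unwrap();
        });
    }
    drop(tx);

    for (worker, encoded) in rx {
        println!("worker {worker} produced {} bytes", encoded.len());
    }
}
```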
Adds a benchmark for parallel encoders with the Avro format. Seems like the
sweet spot is 4 workers.

All the code here was vibe-coded with Claude Code.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
@abhizer abhizer marked this pull request as ready for review February 8, 2026 20:44
abhizer commented Feb 8, 2026

The sweet spot seems to be 4 workers.

[Screenshot: benchmark results, 2026-02-09 02:30]

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
@ryzhyk ryzhyk changed the title WIP: Concurrent encoders Concurrent encoders Feb 9, 2026
@ryzhyk ryzhyk enabled auto-merge February 9, 2026 18:36

@gz gz left a comment


can we name the threads in the threadpool?

overall still seems better to just take them 10k at a time and farm it out to the tokio runtime
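
On the thread-naming question: if the `threadpool` crate is used, its builder can name the pool's threads. A hedged sketch; the crate choice and names are assumptions, not what this PR does:

```rust
// Assumes `threadpool = "1.8"` in Cargo.toml.
use threadpool::Builder;

fn main() {
    let pool = Builder::new()
        .num_threads(4)
        .thread_name("avro-encoder".into())
        .build();

    for i in 0..4 {
        pool.execute(move || {
            // The name shows up in debuggers and thread dumps.
            println!("{:?} handling task {i}", std::thread::current().name());
        });
    }
    pool.join();
}
```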

IsNone,
)]
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
struct BenchTestStruct {
Contributor:

we have like 10 of these. can we just have one or two structs in the codebase for testing, but actually put all sqllib types in them?

}

while cursor.val_valid() {
    let w = cursor.weight();
Contributor:

it looks like we silently ignore things when weights are not -1 or 1

panic would seem appropriate here?

Contributor Author:

probably. Other weights should be filtered out by the indexed_operation_type() above
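
A tiny illustration of the stricter handling being suggested, panicking on any weight other than +1/-1 (purely illustrative, not the PR's code):

```rust
// Treat any weight other than +1/-1 as a bug instead of silently skipping it.
fn classify(weight: i64) -> &'static str {
    match weight {
        1 => "insert",
        -1 => "delete",
        w => panic!("unexpected weight {w}: expected +1 or -1"),
    }
}

fn main() {
    assert_eq!(classify(1), "insert");
    assert_eq!(classify(-1), "delete");
}
```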

};

match (operation_type, self.update_format.clone()) {
    (None, _) => (),
Contributor:

what does it mean when operation_type is none?

Contributor Author:

Means this is a no-op (nothing's changed).

&mut self.value_buffer,
)?;
}
_ => (),
Contributor:

would this indicate a problem in our code somewhere? does it need some logging?

Contributor Author:

It indicates that this record will be handled below in the w == -1 branch.

This code is not new, it was just moved around in this PR.

///
/// * `num_partitions` - number of partitions to create.
/// * `bounds` - output vector to store the partition boundaries.
fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<Self::Key>)
Contributor:

I don't understand why we need to sample keys for this; are there situations where an output view has many values per key?

}

fn data_factory(&self) -> &'static dyn Factory<DynData> {
    self.batch.inner().factories().key_factory()
Contributor:

this is called data_factory but it returns key_factory. is it the right factory?

Contributor:

Yes, it does return the correct factory type, but it might make more sense to rename it.

/// the number of workers to run in parallel.
/// Default: 4
#[serde(default = "default_encoder_workers")]
pub workers: usize,
Contributor:

can we just call this threads instead of workers? it's confusing because we already have workers for dbsp workers

@ryzhyk ryzhyk added this pull request to the merge queue Feb 9, 2026
@abhizer abhizer removed this pull request from the merge queue due to a manual request Feb 9, 2026
Adds documentation for the `workers` parameter on the Avro format
configuration. Default: 4.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
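
An illustrative sketch of how a serde default like this behaves; the struct name below is assumed, and only the `workers` field and `default_encoder_workers` come from the snippet above. Assumes serde (with derive) and serde_json as dependencies:

```rust
use serde::Deserialize;

fn default_encoder_workers() -> usize {
    4
}

#[derive(Deserialize, Debug)]
struct AvroEncoderConfig {
    // Falls back to the documented default of 4 when the field is omitted.
    #[serde(default = "default_encoder_workers")]
    workers: usize,
}

fn main() {
    let explicit: AvroEncoderConfig = serde_json::from_str(r#"{ "workers": 8 }"#).unwrap();
    let defaulted: AvroEncoderConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(explicit.workers, 8);
    assert_eq!(defaulted.workers, 4);
    println!("{explicit:?} {defaulted:?}");
}
```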
@abhizer abhizer enabled auto-merge February 9, 2026 20:21
@abhizer abhizer added this pull request to the merge queue Feb 9, 2026
Merged via the queue into main with commit 03b5144 Feb 9, 2026
1 check passed
@abhizer abhizer deleted the issue5340 branch February 9, 2026 21:39
