Conversation
In preparation for adding support for parallel encoding, wrap the batch passed to `Encoder::encode` in `Arc`, so it can later be cloned and sent to multiple workers. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Extend trait SerBatchReader to support parallel encoders:
- SerBatchReader::partition_keys - partitions the batch into approximately equal-sized chunks.
- SerCursor::seek_key_exact - can be used to move the cursor to the start of a partition.
- SerCursor::key() - can be used to compare the current key under the cursor with the end of the partition, to stop iteration.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
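The partition/seek scheme described above can be sketched in miniature. This is a hypothetical, self-contained model using a plain sorted `Vec<u32>` in place of the real `SerBatchReader`/`SerCursor` types; the function names `partition_keys` and `partition_slice` here only mimic the trait methods, they are not the actual API.

```rust
// Hypothetical model of the partitioning scheme: `partition_keys` picks
// boundary keys that split a sorted batch into roughly equal chunks; a
// worker then seeks to its start bound and iterates until it reaches the
// end bound (the role of seek_key_exact / key() in the real trait).

/// Pick `num_partitions - 1` boundary keys from a sorted, non-empty key slice.
fn partition_keys<K: Clone>(keys: &[K], num_partitions: usize) -> Vec<K> {
    let mut bounds = Vec::new();
    for i in 1..num_partitions {
        let idx = i * keys.len() / num_partitions;
        bounds.push(keys[idx].clone());
    }
    bounds
}

/// Return partition `idx`: start at its lower bound, stop at its upper bound.
fn partition_slice<'a, K: Ord>(keys: &'a [K], bounds: &[K], idx: usize) -> &'a [K] {
    let start = if idx == 0 {
        0
    } else {
        keys.partition_point(|k| *k < bounds[idx - 1])
    };
    let end = if idx == bounds.len() {
        keys.len()
    } else {
        keys.partition_point(|k| *k < bounds[idx])
    };
    &keys[start..end]
}

fn main() {
    let keys: Vec<u32> = (0..10).collect();
    let bounds = partition_keys(&keys, 4);
    // The four partitions cover the batch exactly once, in key order.
    let mut all = Vec::new();
    for idx in 0..4 {
        all.extend_from_slice(partition_slice(&keys, &bounds, idx));
    }
    assert_eq!(all, keys);
}
```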
All batches are now Send and Sync, so we eliminate the SyncSerBatchReader trait and add Send + Sync bounds to SerBatchReader. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Force-pushed from 135be2f to 2e36d67
ryzhyk
left a comment
- I feel that this framework is still too opinionated. Please consider removing workerpool and using the threadpool crate or something similar.
- Transports like Kafka support concurrent senders. I think we need to support that in the consumer API. I'll try to come up with something.
- We need to figure out how to add a benchmark to this or the next PR.
crates/adapterlib/src/catalog.rs (Outdated)

```rust
}
}

pub struct SplitCursorBuilder {
```
Why do we need the builder type? Can't next_split return a cursor?
crates/adapterlib/src/catalog.rs (Outdated)

```rust
pub struct BatchSplitter {
    batch: Arc<dyn SerBatchReader>,
    bounds: Box<DynVec<DynData>>,
    position: AtomicUsize,
```
Why does this need to be an atomic? I don't think BatchSplitter can be used from multiple threads.
In fact, I'm not sure why we need the BatchSplitter type at all. I haven't read the rest of the PR yet, but the simplest API would be just a function that creates a list of cursors based on a bounds array. Alternatively, if you need more flexibility, you may want to have a function that creates a cursor given start and end bounds. The advantage of that is you don't need a stateful object like BatchSplitter.
Some of this is a remnant of the previous version.
Yes, we don't need the position to be atomic anymore.
In that case, let's keep it simple and eliminate this BatchSplitter thing, unless there's something I'm missing.
Implements parallel encoders for the Avro output encoder.

Defines cursor types `SplitCursor<'_>` and `SplitCursorBuilder`. A `SplitCursorBuilder` is built from a list of partition bounds (typically created by the `partition_keys` method) and the index of the partition we want a cursor for. The builder type is required so that cursors can be sent safely between threads: the `SplitCursor<'_>` type holds a reference to the batch and is therefore not thread-safe. To create a builder, use `SplitCursorBuilder::from_bounds(batch, bounds, idx, format)`; it returns None if the `idx` partition cannot be created from the given bounds. A `SplitCursor<'_>` can only be created by calling `SplitCursorBuilder::build()` and is only valid for its partition.

Additionally, we create an `AvroParallelEncoder` for each threadpool worker we want to run. Each parallel encoder sends the encoded batches back to the main thread via a channel. All encoding errors, if any, are gathered and returned when the `AvroEncoder::encode_indexed` method returns.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
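The fan-out pattern the commit describes (one encoder per worker, results and errors flowing back over a channel) can be sketched with plain `std::thread` and `mpsc`. Everything here is a simplified stand-in: `encode_partition` fakes the Avro encoding, and the partitioning is a naive chunk split rather than the real bounds-based cursors.

```rust
// Sketch of the worker fan-out: the batch is shared via Arc (cheap clone,
// no copy), each worker encodes its partition and sends the bytes -- or an
// error -- back over a channel; all errors are gathered before returning.
use std::sync::{mpsc, Arc};
use std::thread;

/// Stand-in for the real Avro encoding of one partition.
fn encode_partition(part: &[u32]) -> Result<Vec<u8>, String> {
    Ok(part.iter().flat_map(|v| v.to_le_bytes()).collect())
}

fn parallel_encode(
    batch: Arc<Vec<u32>>,
    num_workers: usize,
) -> Result<Vec<Vec<u8>>, Vec<String>> {
    let (tx, rx) = mpsc::channel();
    let chunk = batch.len().div_ceil(num_workers);
    let mut handles = Vec::new();
    for w in 0..num_workers {
        let tx = tx.clone();
        let batch = Arc::clone(&batch); // share the batch, don't copy it
        handles.push(thread::spawn(move || {
            let start = (w * chunk).min(batch.len());
            let end = ((w + 1) * chunk).min(batch.len());
            let _ = tx.send((w, encode_partition(&batch[start..end])));
        }));
    }
    drop(tx); // close the channel once all workers hold their own sender

    let mut results = vec![Vec::new(); num_workers];
    let mut errors = Vec::new();
    for (w, res) in rx {
        match res {
            Ok(bytes) => results[w] = bytes,
            Err(e) => errors.push(e),
        }
    }
    for h in handles {
        h.join().unwrap();
    }
    if errors.is_empty() { Ok(results) } else { Err(errors) }
}

fn main() {
    let batch = Arc::new(vec![1u32, 2, 3, 4]);
    let encoded = parallel_encode(batch, 2).unwrap();
    // Each worker produced the little-endian bytes of its half of the batch.
    assert_eq!(encoded.len(), 2);
}
```

Collecting results keyed by worker index keeps the output deterministic even though workers finish in arbitrary order; the real implementation reportedly does the analogous gathering in `AvroEncoder::encode_indexed`.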
Adds a benchmark for parallel encoders with Avro format. Seems like the sweet spot is 4 workers. All the code here was vibe-coded with Claude code. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
gz
left a comment
can we name the threads in the threadpool?
overall still seems better to just take them 10k at a time and farm it out to the tokio runtime
```rust
    IsNone,
)]
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
struct BenchTestStruct {
```
we have like 10 of these. can we just have one or two structs in the codebase for testing, but actually put all sqllib types in them?
```rust
}

while cursor.val_valid() {
    let w = cursor.weight();
```
it looks like we silently ignore things when weights are not -1 or 1
panic would seem appropriate here?
probably. Other weights should be filtered out by the indexed_operation_type() above.
```rust
};

match (operation_type, self.update_format.clone()) {
    (None, _) => (),
```
what does it mean when operation_type is none?
Means this is a no-op (nothing's changed)
```rust
        &mut self.value_buffer,
    )?;
}
_ => (),
```
would this indicate a problem in our code somewhere? does it need some logging?
It indicates that this record will be handled below in the w == -1 branch.
This code is not new, it was just moved around in this PR.
```rust
///
/// * `num_partitions` - number of partitions to create.
/// * `bounds` - output vector to store the partition boundaries.
fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<Self::Key>)
```
I don't understand why we need to sample keys for this. Are there situations where an output view has many values per key?
```rust
}

fn data_factory(&self) -> &'static dyn Factory<DynData> {
    self.batch.inner().factories().key_factory()
```
this is called data_factory but it returns key_factory. is it the right factory?
Yes, it does return the correct factory type, but it might make more sense to rename it.
```rust
/// the number of workers to run in parallel.
/// Default: 4
#[serde(default = "default_encoder_workers")]
pub workers: usize,
```
can we just call this threads instead of workers? it's confusing because we already have workers for dbsp workers
Adds documentation for the `workers` parameter on the Avro format configuration. Default: 4. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

#5340