
Synchronization of Kafka input partitions #5607

@blp

Description


Feldera garbage collection (lateness) only produces correct output if input arrives approximately in order, with the bounds on "approximately" defining the lateness. The Feldera Kafka input connector can reorder input when there are multiple partitions:

  • If partitions start at different times, then reading all the partitions in parallel will naturally consume data out of order.
  • Even if they start at the same time, partitions might contain events at different rates.
  • Even if the partitions start at the same time and have the same number of events per unit time, if partitions are spread across brokers, different brokers may fetch data at different rates.
  • Even if all of the partitions are on a single broker, one cannot expect all of the partitions to naturally remain exactly in sync forever.

Therefore, one must have some way to synchronize consumption across partitions. Two strategies present themselves:

  • A generic mechanism: devise a way to synchronize arbitrary input connectors; Kafka partitions could then be broken apart into one connector per partition (either internally by Feldera or explicitly by the user) and synchronized as individual connectors.
  • Kafka-connector-specific synchronization, where the connector internally consumes from its partitions approximately in order (within specified bounds).

The generic approach is not planned for now. For the Kafka-specific approach, we have to decide on what basis to synchronize the streams. Again, two approaches come to mind:

  • Synchronize based on the Kafka message timestamp. This is the simplest to implement, but it is not clear whether timestamps within a partition increase monotonically, or whether different brokers' clocks are synchronized closely enough. If it works, it's easy.
  • Synchronize based on the column used for lateness. This is more complicated because the input connectors currently know nothing about the semantics of the data they parse.
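
As a sketch of the timestamp-based approach, the gating decision could look like the following (hypothetical Rust, not Feldera's actual connector code): a partition may be read only while its next message's timestamp is within the lateness bound of the low watermark across partitions, and an empty partition forces everything to wait because its next timestamp is unknown.

```rust
/// Hypothetical sketch: given the head (oldest unconsumed) Kafka message
/// timestamp of each partition and a lateness bound in the same units,
/// return the indices of the partitions that may be read next.
/// `None` means the partition currently has no unconsumed messages.
fn readable_partitions(heads: &[Option<u64>], lateness: u64) -> Vec<usize> {
    // An empty partition's next timestamp is unknown, so it could be
    // arbitrarily old; to respect the lateness bound we must wait.
    if heads.is_empty() || heads.iter().any(|h| h.is_none()) {
        return Vec::new();
    }
    // Low watermark: the smallest head timestamp across partitions.
    let low = *heads.iter().flatten().min().unwrap();
    heads
        .iter()
        .enumerate()
        // Read only partitions whose next event is within the bound;
        // partitions further ahead would be paused.
        .filter_map(|(i, h)| (h.unwrap() <= low + lateness).then_some(i))
        .collect()
}
```

With heads `[Some(100), Some(105), Some(500)]` and a lateness of 50, only the first two partitions are readable; the third is paused until the watermark catches up.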

Possible pitfalls:

  • This will require the input connector to sometimes pause reading partitions. It looks like, in the input connector implementation, Feldera can simply stop reading from a partition if its messages are "too late" and that this will do the right thing. librdkafka is remarkably hard to use, so we have to hope that this is true.
  • This could cause surprises if an event with a timestamp far in the future is added to a partition, because that event will never be processed.
  • This could cause performance surprises if one or a few partitions are far behind the others, because only those partitions will be processed until all the old events are processed. (This is the other side of the previous pitfall.)
  • One or more empty partitions will prevent any data from being processed at all, because there is no way to know the timestamp for the first event that will be added to that partition.
  • In a topic with N partitions, N-1 events will always be left unprocessed (one in each of N-1 partitions), because there is no way to know the timestamp for the next event to be added to the partition whose events have been completely processed.
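
The last two pitfalls can be illustrated with a small simulation (hypothetical Rust, with a lateness bound of zero for simplicity): consumption always takes the oldest available head event, and must stop as soon as any partition runs dry, because the drained partition's next timestamp is unknown. This strands one event in each of the remaining N-1 partitions.

```rust
use std::collections::VecDeque;

/// Hypothetical simulation of watermark-gated consumption. Each partition
/// is a queue of event timestamps. Repeatedly consume the event with the
/// smallest head timestamp, but stop as soon as any partition is empty,
/// since its next event could carry an arbitrarily old timestamp.
fn drain_with_gate(partitions: &mut [VecDeque<u64>]) -> Vec<u64> {
    let mut consumed = Vec::new();
    loop {
        // Find the partition with the smallest head timestamp.
        let mut oldest: Option<(usize, u64)> = None;
        for (i, p) in partitions.iter().enumerate() {
            match p.front() {
                // Empty partition: cannot safely advance any further.
                None => return consumed,
                Some(&t) => {
                    if oldest.map_or(true, |(_, best)| t < best) {
                        oldest = Some((i, t));
                    }
                }
            }
        }
        match oldest {
            Some((i, t)) => {
                partitions[i].pop_front();
                consumed.push(t);
            }
            // No partitions at all.
            None => return consumed,
        }
    }
}
```

For example, with three partitions holding timestamps `[1, 4]`, `[2, 5]`, and `[3, 6]`, the simulation consumes 1, 2, 3, 4, then stops when the first partition runs dry, leaving events 5 and 6 (one in each of the other two partitions) unprocessed.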

Metadata


Labels

connectors: Issues related to the adapters/connectors crate
user-reported: Reported by a user or customer
