Description
We propose introducing support for external tables as first-class objects in Feldera. Conceptually, an external table is a table in an external database or data warehouse that can be queried incrementally by a Feldera pipeline.
External tables take advantage of predicate pushdown to query the table without transferring its entire state to Feldera. The pipeline ingests only the records required to evaluate the views in the program. As the pipeline definition changes, it may ingest additional records.
We break up this feature into several components that can be refined independently.
Syntax
We propose adding an EXTERNAL keyword to table declarations. The compiler validates that the table uses one of the input connectors that support external table semantics, that is, connectors that support both predicate pushdown and snapshot-and-follow mode. At runtime, this annotation activates the mechanism described below.
Predicate pushdown in SQL
The SQL compiler performs static dataflow analysis to determine a set of predicates that describe the set of records that must be ingested from the external table. This is the standard filter pushdown optimization. I don't know how well Calcite implements it or how much additional work we need to do to propagate it across views (we can only propagate predicates through LOCAL views). In general, computing precise predicates is undecidable, so the compiler may have to settle for conservative predicates that select a superset of the required records.
In practice, the user may need to structure the pipeline in a particular way to take advantage of external tables, e.g., query such tables via intermediate views that filter out irrelevant records.
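To illustrate why the structure of the pipeline matters, here is a minimal sketch of how per-view filters might be combined into a single filter for the external table. The function name `table_pushdown_filter` and the representation of filters as SQL strings are assumptions made for this example only; they are not part of the proposal.

```python
from typing import Dict, Optional


def table_pushdown_filter(view_filters: Dict[str, Optional[str]]) -> Optional[str]:
    """Combine per-view filters into one filter for the external table.

    `view_filters` maps a view name to the filter it applies to the table,
    or to None if the view reads the table without a filter.
    Returns None when the entire table must be ingested.
    """
    if not view_filters:
        return "FALSE"  # no view reads the table: nothing to ingest
    if any(f is None for f in view_filters.values()):
        return None  # one unfiltered reader defeats pushdown for the table
    # A record is needed if any view needs it, so the filters are OR-ed.
    return " OR ".join(f"({f})" for f in view_filters.values())


filtered = table_pushdown_filter({
    "v1": "category = 'invoices'",
    "v2": "category = 'deposits'",
})
unfiltered = table_pushdown_filter({
    "v1": "category = 'invoices'",
    "v_all": None,  # e.g., a view defined as SELECT * FROM external_table
})
print(filtered)
print(unfiltered)
```

In this sketch a single view that reads the table without a filter forces the whole table to be ingested, which is why querying the table through filtering intermediate views helps.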
Predicates computed at this step are expressed using a formal language, e.g., a subset of SQL or a lower-level IR, and are stored as part of the dataflow graph of the pipeline.
Example predicates:
- Simple where clauses. For example: WHERE category = 'invoices'.
- Temporal filters. For example: WHERE ts >= NOW() - 6 months.
- Complicated: select records that participate in a join with another table or view: select * from external_table inner join other_table on .... I believe there are ways to achieve this, but this may be outside the scope of the RFC for now.
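As a rough illustration of what a lower-level predicate IR could look like, the sketch below models the first two kinds of predicates (equality and temporal filters) and evaluates them against a record. All class and function names (`Eq`, `NotOlderThan`, `And`, `Or`, `matches`) are hypothetical, and six months is approximated as 180 days to keep the example short.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Tuple, Union


@dataclass(frozen=True)
class Eq:
    """column = value"""
    column: str
    value: object


@dataclass(frozen=True)
class NotOlderThan:
    """column >= NOW() - max_age"""
    column: str
    max_age: timedelta


@dataclass(frozen=True)
class And:
    parts: Tuple["Predicate", ...]


@dataclass(frozen=True)
class Or:
    parts: Tuple["Predicate", ...]


Predicate = Union[Eq, NotOlderThan, And, Or]


def matches(pred: Predicate, record: dict, now: datetime) -> bool:
    """Return True if the record must be ingested under this predicate."""
    if isinstance(pred, Eq):
        return record.get(pred.column) == pred.value
    if isinstance(pred, NotOlderThan):
        ts = record.get(pred.column)
        # A missing (NULL) timestamp never satisfies the comparison.
        return ts is not None and ts >= now - pred.max_age
    if isinstance(pred, And):
        return all(matches(p, record, now) for p in pred.parts)
    if isinstance(pred, Or):
        return any(matches(p, record, now) for p in pred.parts)
    raise TypeError(f"unsupported predicate: {pred!r}")


# category = 'invoices' AND ts >= NOW() - 6 months (approximated as 180 days)
recent_invoices = And((
    Eq("category", "invoices"),
    NotOlderThan("ts", timedelta(days=180)),
))
now = datetime(2025, 1, 1)
new_record = {"id": 1, "category": "invoices", "ts": datetime(2024, 12, 1)}
old_record = {"id": 2, "category": "invoices", "ts": datetime(2023, 1, 1)}
print(matches(recent_invoices, new_record, now))
print(matches(recent_invoices, old_record, now))
```

Keeping the temporal filter relative to NOW() in the IR, rather than baking in a timestamp at compile time, reflects the fact that the set of matching records changes as time advances.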
Predicate pushdown to the external database
At runtime, the connector attached to the external table converts the predicates computed by the compiler into a query in the query language of the database it is connected to. For example, the Delta Lake and Iceberg connectors can use DataFusion SQL.
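The conversion step could look roughly like the sketch below, which renders a small predicate tree into a SQL WHERE clause. The tuple-based predicate encoding and the helper names (`quote_ident`, `sql_literal`, `to_sql`, `snapshot_query`) are assumptions for illustration. The output uses standard SQL quoting and a TIMESTAMP literal; whether a particular engine accepts exactly this text should be checked against its documentation.

```python
from datetime import datetime


def quote_ident(name: str) -> str:
    """Quote an identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def sql_literal(value) -> str:
    """Render a Python value as a SQL literal."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    if isinstance(value, datetime):
        return "TIMESTAMP '" + value.strftime("%Y-%m-%d %H:%M:%S") + "'"
    if isinstance(value, int) and not isinstance(value, bool):
        return str(value)
    raise TypeError(f"unsupported literal: {value!r}")


def to_sql(pred: tuple) -> str:
    """Render a predicate tree such as ("and", ("eq", col, val), ...)."""
    op = pred[0]
    if op in ("eq", "ge"):
        _, column, value = pred
        symbol = "=" if op == "eq" else ">="
        return f"{quote_ident(column)} {symbol} {sql_literal(value)}"
    if op in ("and", "or"):
        if len(pred) < 2:
            raise ValueError(f"'{op}' needs at least one operand")
        joiner = f" {op.upper()} "
        return "(" + joiner.join(to_sql(p) for p in pred[1:]) + ")"
    raise ValueError(f"unsupported operator: {op!r}")


def snapshot_query(table: str, pred: tuple) -> str:
    """Build the query a connector might issue to read the filtered table."""
    return f"SELECT * FROM {quote_ident(table)} WHERE {to_sql(pred)}"


predicate = (
    "and",
    ("eq", "category", "invoices"),
    # Assumes the connector resolved NOW() - 6 months to a concrete timestamp.
    ("ge", "ts", datetime(2024, 7, 1)),
)
query = snapshot_query("external_table", predicate)
print(query)
```

Literals are escaped rather than interpolated verbatim so that values containing quotes cannot change the shape of the generated query.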
Modifying and bootstrapping the pipeline
Modifying and bootstrapping a pipeline with an external table may require ingesting additional data from the external database to populate newly created views while keeping the state of the pipeline consistent with the external database.
This requires the external database connector to support snapshot-and-follow semantics. Specifically, it should be able to load a snapshot of the table at a given table version and follow the stream of changes starting from a specific version. Bootstrapping then proceeds in the following steps:
1. Checkpoint the pipeline, including the external table input connector. Let's assume the checkpointed table version is V.
2. During bootstrapping, disable the parts of the circuit that compute views unchanged in the new version of the pipeline, along with the pushdown filters associated with these views (let's call these filters F1).
3. Activate the parts of the pipeline that compute new and modified views, and their pushdown filters (F2).
4. Run the activated parts of the circuit, including running the input connector in snapshot mode to ingest version V of the table with pushdown filters F2.
5. Bootstrapping is complete. Activate the entire circuit, including activating the connector in follow mode from version V with filter F1 || F2.
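The steps above can be checked against a toy model. The sketch below uses a hypothetical in-memory `ToyVersionedTable` (insert-only, no deletions) in place of a real external database, and verifies that a snapshot at version V with F2 followed by a change stream from V with F1 || F2 leaves the pipeline with the same records as a filtered read of the latest table version.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]
Filter = Callable[[Record], bool]


class ToyVersionedTable:
    """Hypothetical stand-in for an external table with numbered versions."""

    def __init__(self) -> None:
        self.versions: List[Dict[object, Record]] = [{}]  # version 0 is empty

    def commit(self, upserts: List[Record]) -> int:
        """Apply upserts on top of the latest version; return the new version."""
        state = dict(self.versions[-1])
        for record in upserts:
            state[record["id"]] = record
        self.versions.append(state)
        return len(self.versions) - 1

    def snapshot(self, version: int, flt: Filter) -> List[Record]:
        """Snapshot mode: records of the given version that pass the filter."""
        return [r for r in self.versions[version].values() if flt(r)]

    def follow(self, from_version: int, flt: Filter) -> List[Record]:
        """Follow mode: records added or changed after from_version."""
        changes: List[Record] = []
        for v in range(from_version + 1, len(self.versions)):
            prev, cur = self.versions[v - 1], self.versions[v]
            changes += [r for key, r in cur.items() if prev.get(key) != r and flt(r)]
        return changes


def f1(r: Record) -> bool:  # filter of the unchanged view
    return r["category"] == "invoices"


def f2(r: Record) -> bool:  # filter of the newly added view
    return r["category"] == "deposits"


def f1_or_f2(r: Record) -> bool:
    return f1(r) or f2(r)


table = ToyVersionedTable()
table.commit([
    {"id": 1, "category": "invoices"},
    {"id": 2, "category": "deposits"},
    {"id": 3, "category": "other"},
])
# State of the old pipeline, which only needed records matching F1.
local_ids = {r["id"] for r in table.snapshot(1, f1)}

checkpoint_version = 1  # step 1: checkpoint at table version V
# The external table keeps changing while the pipeline is being modified.
latest = table.commit([
    {"id": 4, "category": "deposits"},
    {"id": 5, "category": "invoices"},
])

# Steps 2-4: backfill only what the new view needs, as of version V.
local_ids |= {r["id"] for r in table.snapshot(checkpoint_version, f2)}
# Step 5: follow changes after V with the combined filter.
local_ids |= {r["id"] for r in table.follow(checkpoint_version, f1_or_f2)}

expected_ids = {r["id"] for r in table.snapshot(latest, f1_or_f2)}
print(sorted(local_ids), sorted(expected_ids))
```

Using the same version V for both the snapshot and the start of the change stream is what prevents records from being missed or ingested from two inconsistent table states.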
Example
Original program:
create external table external_table (
    id bigint not null primary key,
    category string,
    ts timestamp
) with (
    'connectors' = '[{
        "transport": {
            "name": "delta_table_input",
            "config": {
                "uri": "s3://..."
            }
        }
    }]');

create view v1 as select * from external_table
where
    category = 'invoices' and
    ts >= NOW() - 6 months;

Pushdown filter F1:

category = 'invoices' and
ts >= NOW() - 6 months

The modified program introduces an additional view:

create view v2 as select * from external_table
where
    category = 'deposits' and
    ts >= NOW() - 12 months;

Pushdown filter F2 used during bootstrapping:

category = 'deposits' and
ts >= NOW() - 12 months

Combined pushdown filter used after bootstrapping:

(category = 'invoices' and ts >= NOW() - 6 months)
OR
(category = 'deposits' and ts >= NOW() - 12 months)

Open questions
- How do we run ad hoc queries over external tables? Do we forward them to the external DB by building a DataFusion table provider for it (already available for Delta Lake and Iceberg), or do we run them over the local copy of the state, which means that only locally cached state is queryable?
- Related to the previous question: do we materialize external tables locally? If we do, we may not have an easy way to delete no-longer-used data when some of the views are deleted or modified (not impossible, but tricky). If we don't, we cannot have locally enforceable primary keys and will have to rely on the connector for exactly-once delivery (which some of them actually implement already).