Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds compiler support for star joins, a new join operator variant introduced in #5517. Star joins are used to efficiently combine results from multiple aggregates over the same input collection. The implementation currently converts star joins to standard incremental joins at the end of compilation, with native DBSP support to be added in future work.
Changes:
- Introduced star join operators (
DBSPStarJoinOperatorandDBSPStarJoinIndexOperator) and their base class - Modified compilation pipeline to handle star join operators throughout transformation passes
- Refactored aggregate combination logic to use star joins instead of balanced binary join trees
- Added incremental/non-incremental interface markers to operators for better type safety
Reviewed changes
Copilot reviewed 63 out of 63 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| DBSPStarJoinOperator.java | New star join operator for unindexed results |
| DBSPStarJoinIndexOperator.java | New star join operator for indexed results |
| DBSPStarJoinBaseOperator.java | Base class for star join operators |
| ImplementJoins.java | Renamed from ExpandJoins.java, implements star joins using balanced tree of standard joins |
| CalciteToDBSPCompiler.java | Modified aggregate combination to use star joins instead of binary join trees |
| IIncremental.java | New interface to mark incremental operators |
| ILinear.java | Modified to extend IIncremental interface |
| RemoveUnusedFields.java | Added support for removing unused fields from star join operators |
| ExpandOperators.java | Added delta expansion logic for star join operators |
| OptimizeProjectionVisitor.java | Extended to push projections through star join operators |
| CircuitVisitor.java | Added visitor methods for star join operators |
| DBSPExpression.java | Modified field() to return simplified expressions automatically |
| Regression2Tests.java | Added test for issue 5520 (EXP overflow to infinity) |
Comments suppressed due to low confidence (4)
sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/unusedFields/RemoveUnusedFields.java:1
- The condition is inverted. The variable should be set to true when there ARE unused fields, not when there are no unused fields. This should be
if (map.hasUnusedFields(1)).
sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java:1 - The condition
true ||makes the rest of the expression unreachable. This appears to be a temporary placeholder that should be removed once the TODO is implemented.
sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/intern/RewriteInternedFields.java:1 - Corrected spelling of 'to' to 'do'.
sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustVisitor.java:1 - This loop adds a trailing comma after the last input. The comma should only be added between inputs, not after the last one.
...-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPStarJoinIndexOperator.java
Outdated
Show resolved
Hide resolved
|
This PR has been rebased on #5550 |
|
Claude finished @mihaibudiu's task in 2m 8s —— View job Star Join Implementation ReviewI've reviewed the PR changes implementing compiler support for star joins. Overall, this is a well-structured implementation that successfully introduces star join operators to the SQL compiler. Here's my detailed analysis: Todo List:
Architecture & Design ✅Strong Points:
Key Design Features:
Code Quality Review✅ Operator Implementations
✅ Visitor Pattern ExtensionsAll visitor classes properly extended:
✅ Optimization Support
Delta Expansion Analysis
// Delta expansion: d(A★B★C) = dA★I(B)★I(C) + I(A)★dB★I(C) + I(A)★I(B)★dC
for (int i = 0; i < inputs.size(); i++) {
List<OutputPort> joinInputs = new ArrayList<>(integratorOutputs);
joinInputs.set(i, inputs.get(i)); // Replace i-th integral with delta
// Create join with this combination
}This correctly implements the multivariate chain rule for N-way joins, ensuring incremental correctness. Issues & Recommendations
|
The macros that generate star join operators were not usable outside the DBSP crate, since they added new methods to the `Stream` type. We reimplement them to generate regular functions instead. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
…rs from trees of joins Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
Benchmark star join vs a chain of binary joins. ``` --- Binary joins (chain of join_index + join) --- Time: 68.670540125s (25568562 output tuples) Total storage size: 1.05 GiB Per step: 6.867054012s --- Star join (inner_star_join5) --- Time: 24.710012208s (25568562 output tuples) Total storage size: 48.01 MiB Per step: 2.47100122s ``` Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
This fixes a perf issue in the join operator and a similar but more impactful issue in `star_join`. When iterating over the trace cursor, we use map_times to enumerate all (time, weight) tuples associated with a value. Instead of computing their total weight and evaluating the join function once for the entire value, we evaluated the join function every time. When doing this in the context of a star join, we had to also enumerate all other trace (time, weight) tuples, leading to an exponential (in the number of joins) blow up in the amount of work per value if the trace contains the same value with different weights in multiple batches. This is probably not very common in practice (except for the case where the weights add up to 0, which we don't need to worry about here), but when it happens it's bad. The implementation introduces a new cursor method `weight_checked`, which is equiavalent, but more ergonomic and efficient than: ```rust let mut total_w = 0; cursor.map_times(|_t, w| total_w += **w); ``` Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
The star join operator failed to check for out-of-bound cursor. Coincidentally, this did not affect anything, but the issue was revealed by debug checks added by the previous commit. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Star joins are a new flavor of joins introduced in #5517
This PR adds compiler support for star joins.
They are used currently to combine the results of multiple aggregates over the same input collection. Prior to this PR a balanced tree of joins was synthesized.
Currently this PR converts the star joins at the end of compilation into trees of standard (incremental) joins, so it works even without support from DBSP. Once support for DBSP is merged, this conversion will be removed when rebasing on main.
This PR enables us to test that all other compiler transformations work correctly with star join operators.
I have introduced only star_join_index and star_join for now; I will add support for star_join_flatmap as well.