Skip to content

[SPARK-55510][SS] Fix deleteRange of Rocksdb state store to call changelogWriter#54298

Open
zeruibao wants to merge 3 commits intoapache:masterfrom
zeruibao:zeruibao/SPARK-55510-fix-delete-range-to-call-changlog-writer
Open

[SPARK-55510][SS] Fix deleteRange of Rocksdb state store to call changelogWriter#54298
zeruibao wants to merge 3 commits intoapache:masterfrom
zeruibao:zeruibao/SPARK-55510-fix-delete-range-to-call-changlog-writer

Conversation

@zeruibao
Copy link
Contributor

What changes were proposed in this pull request?

This PR adds changelog writer support for deleteRange in the RocksDB state store. Previously, deleteRange only performed the RocksDB native range deletion but did not record the operation in the changelog file. The changes include:

  • Added a new DELETE_RANGE_RECORD record type (byte 0x20) to the RecordType enum in StateStoreChangelog.scala
  • Added an abstract deleteRange(beginKey, endKey) method to StateStoreChangelogWriter, implemented in V2/V4 writers (V1/V3 throw UnsupportedOperationException, consistent with merge)
  • Updated StateStoreChangelogReaderV2 to parse DELETE_RANGE_RECORD entries
  • Updated RocksDB.deleteRange to write to the changelog after the native db.deleteRange call, with an includesPrefix parameter for replay correctness
  • Updated RocksDB.replayChangelog to handle DELETE_RANGE_RECORD by calling deleteRange during recovery
  • Updated RocksDBStateStoreChangeDataReader to skip DELETE_RANGE_RECORD with a warning, since range deletions cannot be expanded into individual key-value change records
  • Added a test verifying that deleteRange is properly recorded and replayed via changelog checkpointing

Why are the changes needed?

When changelog checkpointing is enabled, the state store recovers by replaying changelog files rather than loading full snapshots. Since deleteRange was not recorded in the changelog, any range deletions were silently lost during changelog-based recovery, leading to data inconsistency -- keys that should have been deleted would reappear after a restart.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

Was this patch authored or co-authored using generative AI tooling?

Yes, co-authored with Cursor

Copy link
Contributor

@liviazhu liviazhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too familiar with RocksDBStateStoreChangeDataReader, can you get @eason-yuchen-liu 's review?

}
val nextRecord = reader.next()
if (nextRecord._1 == RecordType.DELETE_RANGE_RECORD) {
logWarning(log"Skipping DELETE_RANGE_RECORD in state change data feed " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if a warning is enough. Should we not fail the query?

Copy link
Contributor Author

@zeruibao zeruibao Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have preference. you guys can make a call.

@zeruibao zeruibao requested a review from liviazhu February 14, 2026 00:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants