Pull request overview
Adds pipeline monitoring events to the backend and exposes them via new REST endpoints, along with Python/FDA client support and related tests/docs.
Changes:
- Added `/v0/pipelines/{pipeline_name}/events` and `/v0/pipelines/{pipeline_name}/events/{event_id}` endpoints plus OpenAPI schema updates.
- Implemented persistence, retention, and retrieval of pipeline monitor events in pipeline-manager (Postgres + model DB for proptests).
- Added Python tests/client methods and FDA CLI commands, plus documentation/changelog updates.
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| python/tests/platform/test_pipeline_events.py | Adds end-to-end tests for listing/getting pipeline events via REST and Python SDK. |
| python/tests/platform/helper.py | Adds helper wrappers to call the new events endpoints. |
| python/feldera/rest/feldera_client.py | Adds Python client methods to list/get pipeline events. |
| python/feldera/pipeline.py | Exposes new Pipeline.events() / Pipeline.event() convenience methods. |
| openapi.json | Documents the new endpoints and schemas in OpenAPI. |
| docs.feldera.com/docs/tutorials/rest_api/pipeline-monitoring.md | Adds user-facing REST tutorial for pipeline monitoring events. |
| docs.feldera.com/docs/changelog.md | Notes the new pipeline monitoring events feature. |
| crates/pipeline-manager/src/runner/pipeline_automata.rs | Adds a max interval to periodically persist events even without changes. |
| crates/pipeline-manager/src/db/types/monitor.rs | Introduces pipeline monitor event types and IDs. |
| crates/pipeline-manager/src/db/test.rs | Extends DB model/proptests to cover pipeline event operations. |
| crates/pipeline-manager/src/db/storage_postgres.rs | Implements storage methods for pipeline events in Postgres. |
| crates/pipeline-manager/src/db/storage.rs | Extends the storage trait with pipeline event methods. |
| crates/pipeline-manager/src/db/operations/pipeline_monitor.rs | Adds SQL ops for creating/listing/getting pipeline monitor events + retention enforcement. |
| crates/pipeline-manager/src/db/operations/pipeline.rs | Emits pipeline monitor events on pipeline state changes; exposes helper for monitoring lookup. |
| crates/pipeline-manager/src/db/operations.rs | Registers the new pipeline_monitor operations module. |
| crates/pipeline-manager/src/db/error.rs | Adds DB errors for missing pipeline monitor events. |
| crates/pipeline-manager/src/api/main.rs | Wires up new endpoints + schemas into the API server and OpenAPI generation. |
| crates/pipeline-manager/src/api/endpoints/pipeline_management/pipeline_events.rs | Implements REST endpoints + selector logic for pipeline events. |
| crates/pipeline-manager/src/api/endpoints/pipeline_management.rs | Exposes the new pipeline_events module. |
| crates/pipeline-manager/proptest-regressions/db/test.txt | Updates proptest regression cases due to new behaviors. |
| crates/pipeline-manager/migrations/V32__pipeline_monitor_event.sql | Adds the pipeline_monitor_event table migration. |
| crates/fda/test.bash | Adds smoke coverage for new fda events / fda event commands. |
| crates/fda/src/main.rs | Implements the new FDA CLI behaviors for listing/getting events. |
| crates/fda/src/cli.rs | Adds CLI subcommands/args for pipeline events. |
Store the status of the pipeline as an event in the database, both at a
regular interval and whenever it changes. At most 720 events are stored
for each pipeline. If nothing changes, a duplicate event is stored every
10 minutes. If only the details change, an event is stored at most once
every 10 seconds. As such, the retained events span approximately
between 2 hours and 5 days.
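The 2-hour and 5-day bounds follow from the event cap and the two intervals. A quick check, with constant names chosen here for illustration only (they are not taken from the code base):

```python
from datetime import timedelta

# Figures quoted in the commit message; the names are illustrative assumptions.
MAX_EVENTS_PER_PIPELINE = 720
DETAILS_MIN_INTERVAL = timedelta(seconds=10)    # fastest rate for details-only changes
UNCHANGED_MAX_INTERVAL = timedelta(minutes=10)  # rate when nothing changes


def retention_bounds():
    """Return the (shortest, longest) time span covered by a full set of events."""
    shortest = MAX_EVENTS_PER_PIPELINE * DETAILS_MIN_INTERVAL
    longest = MAX_EVENTS_PER_PIPELINE * UNCHANGED_MAX_INTERVAL
    return shortest, longest


shortest, longest = retention_bounds()
print(shortest)  # 2:00:00
print(longest)   # 5 days, 0:00:00
```

Status changes are stored immediately rather than rate-limited, so a pipeline whose status flips very often could retain less than 2 hours; the bounds are approximate.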
API endpoints:
- `GET /v0/pipelines/<pipeline>/events`
- `GET /v0/pipelines/<pipeline>/event/latest?selector=all/status`
- `GET /v0/pipelines/<pipeline>/event/<event-id>?selector=all/status`
The clients have been updated with corresponding functionality:
- CLI:
  - `fda events <pipeline>`
  - `fda event <pipeline> latest <selector>`
  - `fda event <pipeline> <event-id> <selector>`
- Python:
  - `pipeline.events()`
  - `pipeline.event("latest", selector)`
  - `pipeline.event(event_id, selector)`
Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
Passed CI and is ready for review!
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
Do we review one or both commits?
I suggest you review my commit only. Gerd would review Simon's commit.
Simon, do we want to register pipeline startup as an incident (orange, shows up in incidents list)? See getEventClassification() for the current mapping of events.
This PR will also have the storage status and pipeline tracking endpoint features added to it, as they are more coupled than initially expected. I'll update once the back-end support for them is in.
    @@ -0,0 +1,16 @@
    -- Regularly the status of the pipeline is observed and stored in this table.
    -- The rows in this table are regularly cleaned up to prevent it growing unbound.

    Pipeline monitoring: Feldera now monitors each pipeline health and stores these as events
    in the database. They are exposed via `/v0/pipelines/[pipeline]/events` and further details
    of a specific event can be retrieved via `/v0/pipelines/[pipeline]/events/[<id>|latest]`.
    All API clients support these endpoints. The Web Console will soon expose these
maybe this comment has to be updated given the second commit
    /// updated in the database, unless the status itself changed.
    const DETAILS_UPDATE_MIN_INTERVAL: Duration = Duration::from_secs(10);

    /// If nothing changes, this will create the same event.
I don't understand this comment.
    /// the `GET /v0/pipelines/<pipeline>/events/<event-id>` endpoint.
    ///
    /// Pipeline monitor events are collected at a periodic interval (every 10s), however only
    /// every 10 minutes or if the overall health changes, does it get inserted into the database
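Read together with the commit message, this doc comment suggests a three-way decision each time the status is observed. The following is only a sketch of that rule under those stated intervals, not the actual Rust implementation; `Observation`, `should_persist`, and `UNCHANGED_MAX_INTERVAL` are hypothetical names (only `DETAILS_UPDATE_MIN_INTERVAL` appears in the diff).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

# Intervals as described in the commit message.
DETAILS_UPDATE_MIN_INTERVAL = timedelta(seconds=10)
UNCHANGED_MAX_INTERVAL = timedelta(minutes=10)  # hypothetical name


@dataclass(frozen=True)
class Observation:
    """One observation of a pipeline (hypothetical shape)."""
    status: str    # overall status/health
    details: str   # additional details
    at: datetime   # when it was observed


def should_persist(last: Optional[Observation], new: Observation) -> bool:
    """Decide whether `new` is stored as an event, given the last stored one."""
    if last is None:
        return True  # nothing stored yet
    elapsed = new.at - last.at
    if new.status != last.status:
        return True  # a status change is always stored
    if new.details != last.details:
        # Details-only changes are rate-limited.
        return elapsed >= DETAILS_UPDATE_MIN_INTERVAL
    # Nothing changed: store a duplicate only after the maximum interval.
    return elapsed >= UNCHANGED_MAX_INTERVAL


t0 = datetime(2025, 1, 1)
stored = Observation("Running", "ok", t0)
print(should_persist(stored, Observation("Running", "ok", t0 + timedelta(minutes=9))))   # False
print(should_persist(stored, Observation("Running", "ok", t0 + timedelta(minutes=10))))  # True
```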
    use uuid::Uuid;

    /// All pipeline monitor event columns.
    const RETRIEVE_EXTENDED_PIPELINE_EVENT_COLUMNS: &str = "e.id, e.recorded_at,
    result = [v[0]]
    prev = v[0]
    for i in range(1, len(v)):
        if v[i] != prev:
does != do what you want for dict values? It may depend on the type of dict entries... Maybe you can document the restrictions on the dict type.
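For reference, a small sketch of how `!=` behaves on dicts in standard Python. The example values are made up and `Opaque` is a hypothetical class; the point is that the comparison is by value, recursively, and therefore only as good as the `__eq__` of the values stored in the dict.

```python
# Dicts compare by value: same keys, and each pair of values compares equal.
a = {"status": "Running", "details": {"errors": []}}
b = {"details": {"errors": []}, "status": "Running"}  # different insertion order
same = a == b  # True: key order does not matter, nested dicts/lists compare by value

# Numeric values compare across int and float.
numeric_equal = {"n": 1} == {"n": 1.0}  # True

# NaN never equals another NaN, so dicts holding distinct NaN objects differ.
nan_differs = {"x": float("nan")} != {"x": float("nan")}  # True


class Opaque:
    """No __eq__ defined, so instances compare by identity."""

    def __init__(self, value):
        self.value = value


# Two instances with the same content are still unequal.
opaque_differs = {"o": Opaque(1)} != {"o": Opaque(1)}  # True
```

So for JSON-like event payloads (strings, numbers, lists, nested dicts) `!=` should behave as expected; the restriction worth documenting is that values must have value-based equality and must not contain NaN.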
    prev_event = event

    # Test deduplication function
    assert remove_consecutive_duplicates([]) == []
From now on this looks like it should be a separate test
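A sketch of what the split-out test could look like. The body of `remove_consecutive_duplicates` is reconstructed from the lines visible in the diff (the empty-input guard and the `append` are assumptions), and the test name and cases other than the empty list are illustrative.

```python
def remove_consecutive_duplicates(v):
    """Collapse runs of equal adjacent elements, keeping the first of each run."""
    if not v:
        return []
    result = [v[0]]
    prev = v[0]
    for i in range(1, len(v)):
        if v[i] != prev:
            result.append(v[i])
            prev = v[i]
    return result


def test_remove_consecutive_duplicates():
    # Standalone unit test: needs no running pipeline.
    assert remove_consecutive_duplicates([]) == []
    assert remove_consecutive_duplicates([1]) == [1]
    assert remove_consecutive_duplicates([1, 1, 2, 2, 1]) == [1, 2, 1]
    # Dict elements are compared by value.
    events = [{"status": "Running"}, {"status": "Running"}, {"status": "Stopped"}]
    assert remove_consecutive_duplicates(events) == [
        {"status": "Running"},
        {"status": "Stopped"},
    ]


test_remove_consecutive_duplicates()
```

Keeping it separate means the helper is checked even when the end-to-end events test cannot run.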
    const api = usePipelineManager()
    let events: PipelineMonitorEventSelectedInfo[] | null = $state(null)

    useInterval(async () => {
what does this do?
does this run forever?
This runs every 60 seconds while the pipeline health tab is open
    selectedEvent = null
    selectedEventTimestamp = null
    }}
    onNavigatePrevious={canNavigatePrevious ? navigatePrevious : undefined}
what are the events for previous/next? how are they triggered?
These events are triggered when the user wants to navigate between adjacent incidents/hour slots (depending on what they have selected in the UI). The user can navigate by pressing the left and right arrow buttons in the UI or the arrow keys on the keyboard.
Where is this documented?
The UI in general is not yet documented, unfortunately.
As Lalith mentioned, the UX should be intuitive: the UI buttons are well-placed, and if the user instinctively tries the arrow keys on the keyboard, it'll work.
    }))
    .with({ resources_status: 'Provisioned', runtime_status: 'Coordination' }, () => ({
        type: 'unhealthy',
        description: 'Pipeline is coordinating.'
This description sounds a bit vague.
I wonder whether the descriptions could have links toward some documentation.
Simon will likely be doing a pass on the messaging for state descriptions
