Skip to content

pipeline-manager: pipeline monitoring events#5446

Open
snkas wants to merge 2 commits intomainfrom
pipeline-monitoring-events
Open

pipeline-manager: pipeline monitoring events#5446
snkas wants to merge 2 commits intomainfrom
pipeline-monitoring-events

Conversation

@snkas
Copy link
Contributor

@snkas snkas commented Jan 16, 2026

Both at a regular interval and whenever the status of the pipeline changes, store this as an event in the database. At most 720 events are stored for each pipeline. If there are no changes, every 10 minutes it stores a duplicate event. If there are only changes to the details, it only updates it every 10 seconds. As such, the retained events span approximately between 2 hours and 5 days.

API endpoints:

  • GET /v0/pipelines/<pipeline>/events
  • GET /v0/pipelines/<pipeline>/event/latest?selector=all/status
  • GET /v0/pipelines/<pipeline>/event/<event-id>?selector=all/status

The clients have been updated with corresponding functionality:

  • CLI:

    • fda events <pipeline>
    • fda event <pipeline> latest <selector>
    • fda event <pipeline> <event-id> <selector>
  • Python:

    • pipeline.events()
    • pipeline.event("latest", selector)
    • pipeline.event(event_id, selector)

Remaining tasks

  • More extensive API documentation
  • Use JOINs in the queries
  • Python tests
  • Database tests
  • Update documentation
  • Update changelog
  • Python client
  • fda client
  • Downstream adaptation if needed
  • Storage status extension
  • Pipeline tracking endpoint

PR information

  • No breaking changes

@snkas snkas force-pushed the pipeline-monitoring-events branch from 489a6a1 to c998ea7 Compare January 19, 2026 18:06
@snkas snkas force-pushed the pipeline-monitoring-events branch 2 times, most recently from fd696d8 to 5fcc887 Compare January 26, 2026 14:59
@snkas snkas force-pushed the pipeline-monitoring-events branch 3 times, most recently from 89e3b42 to aeb8b34 Compare February 3, 2026 11:18
@snkas snkas marked this pull request as ready for review February 3, 2026 11:18
Copilot AI review requested due to automatic review settings February 3, 2026 11:18
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds pipeline monitoring events to the backend and exposes them via new REST endpoints, along with Python/FDA client support and related tests/docs.

Changes:

  • Added /v0/pipelines/{pipeline_name}/events and /v0/pipelines/{pipeline_name}/events/{event_id} endpoints plus OpenAPI schema updates.
  • Implemented persistence, retention, and retrieval of pipeline monitor events in pipeline-manager (Postgres + model DB for proptests).
  • Added Python tests/client methods and FDA CLI commands, plus documentation/changelog updates.

Reviewed changes

Copilot reviewed 24 out of 24 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
python/tests/platform/test_pipeline_events.py Adds end-to-end tests for listing/getting pipeline events via REST and Python SDK.
python/tests/platform/helper.py Adds helper wrappers to call the new events endpoints.
python/feldera/rest/feldera_client.py Adds Python client methods to list/get pipeline events.
python/feldera/pipeline.py Exposes new Pipeline.events() / Pipeline.event() convenience methods.
openapi.json Documents the new endpoints and schemas in OpenAPI.
docs.feldera.com/docs/tutorials/rest_api/pipeline-monitoring.md Adds user-facing REST tutorial for pipeline monitoring events.
docs.feldera.com/docs/changelog.md Notes the new pipeline monitoring events feature.
crates/pipeline-manager/src/runner/pipeline_automata.rs Adds a max interval to periodically persist events even without changes.
crates/pipeline-manager/src/db/types/monitor.rs Introduces pipeline monitor event types and IDs.
crates/pipeline-manager/src/db/test.rs Extends DB model/proptests to cover pipeline event operations.
crates/pipeline-manager/src/db/storage_postgres.rs Implements storage methods for pipeline events in Postgres.
crates/pipeline-manager/src/db/storage.rs Extends the storage trait with pipeline event methods.
crates/pipeline-manager/src/db/operations/pipeline_monitor.rs Adds SQL ops for creating/listing/getting pipeline monitor events + retention enforcement.
crates/pipeline-manager/src/db/operations/pipeline.rs Emits pipeline monitor events on pipeline state changes; exposes helper for monitoring lookup.
crates/pipeline-manager/src/db/operations.rs Registers the new pipeline_monitor operations module.
crates/pipeline-manager/src/db/error.rs Adds DB errors for missing pipeline monitor events.
crates/pipeline-manager/src/api/main.rs Wires up new endpoints + schemas into the API server and OpenAPI generation.
crates/pipeline-manager/src/api/endpoints/pipeline_management/pipeline_events.rs Implements REST endpoints + selector logic for pipeline events.
crates/pipeline-manager/src/api/endpoints/pipeline_management.rs Exposes the new pipeline_events module.
crates/pipeline-manager/proptest-regressions/db/test.txt Updates proptest regression cases due to new behaviors.
crates/pipeline-manager/migrations/V32__pipeline_monitor_event.sql Adds the pipeline_monitor_event table migration.
crates/fda/test.bash Adds smoke coverage for new fda events / fda event commands.
crates/fda/src/main.rs Implements the new FDA CLI behaviors for listing/getting events.
crates/fda/src/cli.rs Adds CLI subcommands/args for pipeline events.

@snkas snkas force-pushed the pipeline-monitoring-events branch 2 times, most recently from ecd558e to 1c90b91 Compare February 3, 2026 11:56
Both at a regular interval and whenever the status of the pipeline
changes, store this as an event in the database. At most 720 events are
stored for each pipeline. If there are no changes, every 10 minutes it
stores a duplicate event. If there are only changes to the details, it
only updates it every 10 seconds. As such, the retained events span
approximately between 2 hours and 5 days.

API endpoints:
- `GET /v0/pipelines/<pipeline>/events`
- `GET /v0/pipelines/<pipeline>/event/latest?selector=all/status`
- `GET /v0/pipelines/<pipeline>/event/<event-id>?selector=all/status`

The clients have been updated with corresponding functionality:

- CLI:
  - `fda events <pipeline>`
  - `fda event <pipeline> latest <selector>`
  - `fda event <pipeline> <event-id> <selector>`

- Python:
  - `pipeline.events()`
  - `pipeline.event("latest", selector)`
  - `pipeline.event(event_id, selector)`

Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
@snkas snkas force-pushed the pipeline-monitoring-events branch from 1c90b91 to 8959912 Compare February 3, 2026 14:24
@snkas snkas requested a review from gz February 3, 2026 16:58
@snkas
Copy link
Contributor Author

snkas commented Feb 3, 2026

Passed CI and is ready for review!

@Karakatiza666
Copy link
Contributor

image

Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
@Karakatiza666 Karakatiza666 force-pushed the pipeline-monitoring-events branch from 3c3a278 to 431c197 Compare February 5, 2026 21:17
@mihaibudiu
Copy link
Contributor

Do we review one or both commits?

@Karakatiza666
Copy link
Contributor

I suggest you review my commit only. Gerd would review Simon's commit

@Karakatiza666
Copy link
Contributor

Simon, do we want to register pipeline startup as an incident (orange, shows up in incidents list)? See getEventClassification() for the current mapping of events.

@snkas
Copy link
Contributor Author

snkas commented Feb 6, 2026

This PR will also have the storage status and pipeline tracking endpoint features added to it, as they are more coupled than initially expected. I'll update once the back-end support for them is in.

Copy link
Contributor

@gz gz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@@ -0,0 +1,16 @@
-- Regularly the status of the pipeline is observed and stored in this table.
-- The rows in this table are regularly cleaned up to prevent it growing unbound.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unbounded

Pipeline monitoring: Feldera now monitors each pipeline health and stores these as events
in the database. They are exposed via `/v0/pipelines/[pipeline]/events` and further details
of a specific event can be retrieved via `/v0/pipelines/[pipeline]/events/[<id>|latest]`.
All API clients support these endpoints. The Web Console will soon expose these
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this comment has to be updated given the second commit

/// updated in the database, unless the status itself changed.
const DETAILS_UPDATE_MIN_INTERVAL: Duration = Duration::from_secs(10);

/// If nothing changes, this will create the same event.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this comment.

/// the `GET /v0/pipelines/<pipeline>/events/<event-id>` endpoint.
///
/// Pipeline monitor events are collected at a periodic interval (every 10s), however only
/// every 10 minutes or if the overall health changes, does it get inserted into the database
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it -> it is

use uuid::Uuid;

/// All pipeline monitor event columns.
const RETRIEVE_EXTENDED_PIPELINE_EVENT_COLUMNS: &str = "e.id, e.recorded_at,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is "e."?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not in an array?

result = [v[0]]
prev = v[0]
for i in range(1, len(v)):
if v[i] != prev:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does != do what you want for dict values? It may depend on the type of dict entries... Maybe you can document the restrictions on the dict type.

prev_event = event

# Test deduplication function
assert remove_consecutive_duplicates([]) == []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From now on this looks like it should be a separate test

const api = usePipelineManager()
let events: PipelineMonitorEventSelectedInfo[] | null = $state(null)

useInterval(async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this do?
does this run forever?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This runs every 60 seconds while the pipeline health tab is open

selectedEvent = null
selectedEventTimestamp = null
}}
onNavigatePrevious={canNavigatePrevious ? navigatePrevious : undefined}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the events for previous/next? how are they triggered?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These events are triggered when the user wants to navigate between adjacent incidents/hour slots (depending on what they have selected in the UI). User can navigate by pressing left and right arrow UI or keyboard buttons

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this documented?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UI in general is not yet documented, unfortunately.
As Lalith mentioned, the UX should be intuitive; the UI buttons are well-placed, and if the user tries to press the keyboard buttons intuitively - it'll work.

}))
.with({ resources_status: 'Provisioned', runtime_status: 'Coordination' }, () => ({
type: 'unhealthy',
description: 'Pipeline is coordinating.'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description sounds a bit vague.
I wonder whether the descriptions could have links toward some documentation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simon will likely be doing a pass on the messaging for state descriptions

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants