WebSocket API
Subscribe to live events for the catalog, the cluster, or any specific Flow. This page covers the subscription protocol, the full list of event types, and how include_state delivers the projected state before live events.
Connection
ws://localhost:8080/engine/ws
A connection starts idle. Send a subscribe message to receive events.
Subscribe
{
  "type": "subscribe",
  "data": {
    "sub_id": "flow-detail",
    "aggregate_ids": [["flow", "wf-123"]],
    "event_types": ["flow_started", "step_completed", "flow_completed"],
    "include_state": true
  }
}
| Field | Description |
|---|---|
| sub_id | Required client-defined subscription ID |
| aggregate_ids | Array of aggregate ID paths to follow |
| event_types | Optional event type filter |
| include_state | If true, server first sends the current projected state, then streams live events |
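As a minimal sketch of the fields above, the helper below assembles a subscribe message as a JSON string. The function name `build_subscribe` and its defaults are illustrative assumptions, not part of the engine's API; only the message shape comes from the example above.

```python
import json


def build_subscribe(sub_id, aggregate_ids, event_types=None, include_state=False):
    """Build a subscribe message (hypothetical helper) as a JSON string."""
    if not sub_id:
        raise ValueError("sub_id is required")
    data = {
        "sub_id": sub_id,
        # Each aggregate ID is a path such as ["flow", "wf-123"].
        "aggregate_ids": [list(path) for path in aggregate_ids],
        # Sent explicitly so the sketch does not rely on a server-side default.
        "include_state": bool(include_state),
    }
    # event_types is optional, so it is left out when no filter is given.
    if event_types is not None:
        data["event_types"] = list(event_types)
    return json.dumps({"type": "subscribe", "data": data})


message = build_subscribe(
    "flow-detail",
    [("flow", "wf-123")],
    event_types=["flow_started", "step_completed", "flow_completed"],
    include_state=True,
)
```

The resulting string can be sent as a text frame over whichever WebSocket client is in use.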
Aggregate IDs
| Path | Meaning |
|---|---|
| ["catalog"] | Step registry (registrations, updates) |
| ["cluster"] | Cluster state (per-step health across nodes) |
| ["flow", "flow-id"] | One flow’s execution events |
Subscribed Response
When include_state is true, the server sends the current projected state before any live events:
{
  "type": "subscribed",
  "sub_id": "flow-detail",
  "items": [
    {
      "id": ["flow", "wf-123"],
      "data": { "id": "wf-123", "status": "active" },
      "sequence": 42
    }
  ]
}
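One way a client might consume this message is to seed a local cache from `items`, keyed by aggregate ID path, and remember the sequence each snapshot was taken at. The function name `apply_subscribed` and the two dictionaries are assumptions made for illustration.

```python
import json


def apply_subscribed(raw, state, last_seq):
    """Seed local state from a subscribed message; return False for other messages."""
    msg = json.loads(raw)
    if msg.get("type") != "subscribed":
        return False
    for item in msg.get("items", []):
        # JSON arrays are not hashable, so the ID path becomes a tuple key.
        key = tuple(item["id"])
        state[key] = item["data"]
        last_seq[key] = item["sequence"]
    return True
```

Keeping the sequence next to the state lets later code tell which snapshot a cached value reflects.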
Event Messages
{
  "type": "flow_started",
  "data": {
    "flow_id": "wf-123",
    "init": { "customer_id": ["cust-456"] },
    "plan": { "...": "..." },
    "labels": { "customer": "cust-456" }
  },
  "id": ["flow", "wf-123"],
  "sub_id": "flow-detail",
  "timestamp": 1704067425000,
  "sequence": 42
}
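The envelope above could be parsed into a small typed record before being handed to application code. This is a sketch only: the `Event` class and `parse_event` function are hypothetical names, and reading `timestamp` as Unix epoch milliseconds is an assumption based on the magnitude of the sample value.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Event:
    type: str
    aggregate_id: tuple
    sub_id: str
    sequence: int
    timestamp: datetime
    data: dict


def parse_event(raw):
    """Parse one event message into an Event record."""
    msg = json.loads(raw)
    return Event(
        type=msg["type"],
        aggregate_id=tuple(msg["id"]),
        sub_id=msg["sub_id"],
        sequence=msg["sequence"],
        # Assumption: timestamp is milliseconds since the Unix epoch (UTC).
        timestamp=datetime.fromtimestamp(msg["timestamp"] / 1000, tz=timezone.utc),
        data=msg["data"],
    )
```

The `sub_id` field lets a client that holds several subscriptions on one connection route each event to the right consumer.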
Event Types
Catalog (aggregate ["catalog"])
step_registered, step_unregistered, step_updated
Cluster (aggregate ["cluster"])
step_health_changed
Flow (aggregate ["flow", "flow-id"])
flow_started, flow_completed, flow_failed, flow_deactivated, step_started, step_completed, step_failed, step_skipped, attribute_set, work_started, work_succeeded, work_failed, work_not_completed, retry_scheduled, dispatch_deferred, comp_started, comp_succeeded, comp_failed, comp_retry_scheduled
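The groups above can be written down as a lookup table, which makes it possible to check an event_types filter for typos before sending it. The names `EVENT_TYPES`, `aggregate_kind`, and `unknown_event_types` are illustrative; whether the engine itself rejects unknown types is not stated here, so this check is purely client-side.

```python
# Event types grouped by the aggregate that emits them, as listed above.
EVENT_TYPES = {
    "catalog": {"step_registered", "step_unregistered", "step_updated"},
    "cluster": {"step_health_changed"},
    "flow": {
        "flow_started", "flow_completed", "flow_failed", "flow_deactivated",
        "step_started", "step_completed", "step_failed", "step_skipped",
        "attribute_set",
        "work_started", "work_succeeded", "work_failed", "work_not_completed",
        "retry_scheduled", "dispatch_deferred",
        "comp_started", "comp_succeeded", "comp_failed", "comp_retry_scheduled",
    },
}


def aggregate_kind(event_type):
    """Return "catalog", "cluster", or "flow" for a known type, else None."""
    for kind, names in EVENT_TYPES.items():
        if event_type in names:
            return kind
    return None


def unknown_event_types(event_types):
    """Return the entries of a filter that are not in the list above."""
    return [t for t in event_types if aggregate_kind(t) is None]
```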
Unsubscribe
{
  "type": "unsubscribe",
  "data": { "sub_id": "flow-detail" }
}
Reconnection
On disconnect, reconnect and resubscribe with the same filters. The engine deduplicates the initial state batch by sequence, so events older than the projected sequence are suppressed.
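To resubscribe with the same filters, a client needs to remember what it subscribed to. A minimal sketch, assuming the client keeps its own registry: `SubscriptionRegistry` and its methods are hypothetical names, and the transport (connect, send, retry timing) is deliberately left out.

```python
import json


class SubscriptionRegistry:
    """Remembers active subscriptions so they can be replayed after a reconnect."""

    def __init__(self):
        self._subs = {}

    def add(self, sub_id, aggregate_ids, event_types=None, include_state=True):
        data = {
            "sub_id": sub_id,
            "aggregate_ids": [list(p) for p in aggregate_ids],
            "include_state": include_state,
        }
        if event_types is not None:
            data["event_types"] = list(event_types)
        self._subs[sub_id] = data

    def remove(self, sub_id):
        # Call this when an unsubscribe message is sent.
        self._subs.pop(sub_id, None)

    def resubscribe_messages(self):
        """Return one subscribe message per remembered subscription."""
        return [
            json.dumps({"type": "subscribe", "data": data})
            for data in self._subs.values()
        ]
```

After a new connection is established, sending every string from `resubscribe_messages()` restores the previous subscriptions with identical filters.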