WebSocket API

Subscribe to live events for the catalog, the cluster, or any specific Flow. This page covers the subscription protocol, the full event-type catalog, and how include_state delivers the projected state before live events.

Connection

ws://localhost:8080/engine/ws

A connection starts idle. Send a subscribe message to receive events.

Subscribe

{
  "type": "subscribe",
  "data": {
    "sub_id": "flow-detail",
    "aggregate_ids": [["flow", "wf-123"]],
    "event_types": ["flow_started", "step_completed", "flow_completed"],
    "include_state": true
  }
}

| Field | Description |
| --- | --- |
| sub_id | Required. Client-defined subscription ID |
| aggregate_ids | Array of aggregate ID paths to follow |
| event_types | Optional event type filter |
| include_state | If true, the server first sends the current projected state, then streams live events |
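
As a rough illustration of the fields above, the following sketch builds a subscribe message with Python's standard json module. The helper name build_subscribe is hypothetical and not part of the engine. The sketch assumes that leaving out event_types means no filtering, and it always sends include_state explicitly so it does not depend on a server default.

```python
import json
from typing import Any, Dict, Optional, Sequence


def build_subscribe(
    sub_id: str,
    aggregate_ids: Sequence[Sequence[str]],
    event_types: Optional[Sequence[str]] = None,
    include_state: bool = False,
) -> str:
    """Serialize a subscribe message shaped like the example above.

    Hypothetical client-side helper, not an engine API.
    """
    if not sub_id:
        raise ValueError("sub_id is required")
    data: Dict[str, Any] = {
        "sub_id": sub_id,
        "aggregate_ids": [list(path) for path in aggregate_ids],
        "include_state": include_state,
    }
    # event_types is optional; assumption: omitting it means "no filter".
    if event_types is not None:
        data["event_types"] = list(event_types)
    return json.dumps({"type": "subscribe", "data": data})


message = build_subscribe(
    "flow-detail",
    [["flow", "wf-123"]],
    event_types=["flow_started", "step_completed", "flow_completed"],
    include_state=True,
)
```

The resulting string can be sent as a text frame over whichever WebSocket client library is in use.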

Aggregate IDs

| Path | Meaning |
| --- | --- |
| ["catalog"] | Step registry (registrations, updates) |
| ["cluster"] | Cluster state (per-step health across nodes) |
| ["flow", "flow-id"] | One flow's execution events |

Subscribed Response

When include_state is true, the server sends the current projected state before any live events:

{
  "type": "subscribed",
  "sub_id": "flow-detail",
  "items": [
    {
      "id": ["flow", "wf-123"],
      "data": { "id": "wf-123", "status": "active" },
      "sequence": 42
    }
  ]
}
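
One way a client might consume this response is to index the items by aggregate ID path, so later events can be matched against the state they apply to. This is a minimal sketch based only on the message shape shown above; load_snapshot and AggregateKey are hypothetical names.

```python
import json
from typing import Any, Dict, Tuple

AggregateKey = Tuple[str, ...]


def load_snapshot(raw: str) -> Dict[AggregateKey, Dict[str, Any]]:
    """Index the items of a subscribed message by aggregate ID path.

    Hypothetical client-side helper, not an engine API.
    """
    message = json.loads(raw)
    if message.get("type") != "subscribed":
        raise ValueError("not a subscribed message")
    snapshot: Dict[AggregateKey, Dict[str, Any]] = {}
    for item in message.get("items", []):
        # JSON arrays decode to lists; tuples are hashable and can be dict keys.
        key = tuple(item["id"])
        snapshot[key] = {"data": item["data"], "sequence": item["sequence"]}
    return snapshot


raw = """
{
  "type": "subscribed",
  "sub_id": "flow-detail",
  "items": [
    {"id": ["flow", "wf-123"],
     "data": {"id": "wf-123", "status": "active"},
     "sequence": 42}
  ]
}
"""
snapshot = load_snapshot(raw)
```

Keeping the sequence next to each item lets the client tell which point in the event stream the projected state reflects.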

Event Messages

{
  "type": "flow_started",
  "data": {
    "flow_id": "wf-123",
    "init": { "customer_id": ["cust-456"] },
    "plan": { "...": "..." },
    "labels": { "customer": "cust-456" }
  },
  "id": ["flow", "wf-123"],
  "sub_id": "flow-detail",
  "timestamp": 1704067425000,
  "sequence": 42
}
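
Because every event message carries its event type in the type field, a client can route messages with a small dispatch table. The sketch below is one possible arrangement; EventRouter is a hypothetical class, and the sample message mirrors the example above.

```python
import json
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class EventRouter:
    """Route decoded event messages to handlers registered per event type.

    Hypothetical client-side class, not an engine API.
    """

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def on(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def dispatch(self, raw: str) -> int:
        """Decode one message, call matching handlers, return how many ran."""
        message = json.loads(raw)
        handlers = self._handlers.get(message.get("type", ""), [])
        for handler in handlers:
            handler(message)
        return len(handlers)


started: List[str] = []
router = EventRouter()
router.on("flow_started", lambda msg: started.append(msg["data"]["flow_id"]))

handled = router.dispatch(json.dumps({
    "type": "flow_started",
    "data": {"flow_id": "wf-123", "labels": {"customer": "cust-456"}},
    "id": ["flow", "wf-123"],
    "sub_id": "flow-detail",
    "timestamp": 1704067425000,
    "sequence": 42,
}))
```

Messages with no registered handler are ignored here; a real client may prefer to log them.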

Event Types

Catalog (aggregate ["catalog"])

step_registered, step_unregistered, step_updated

Cluster (aggregate ["cluster"])

step_health_changed

Flow (aggregate ["flow", "flow-id"])

flow_started, flow_completed, flow_failed, flow_deactivated, step_started, step_completed, step_failed, step_skipped, attribute_set, work_started, work_succeeded, work_failed, work_not_completed, retry_scheduled, dispatch_deferred, comp_started, comp_succeeded, comp_failed, comp_retry_scheduled
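
The catalog above can be kept as a lookup table on the client, for example to check an event_types filter for typos before subscribing. This is only a sketch: EVENT_TYPES, aggregate_kind, and unknown_event_types are hypothetical names, and the table is copied by hand from the lists above, so it would need updating if the engine adds event types.

```python
from typing import Dict, FrozenSet, Iterable, List, Optional

# Event types per aggregate kind, transcribed from the lists above.
EVENT_TYPES: Dict[str, FrozenSet[str]] = {
    "catalog": frozenset({"step_registered", "step_unregistered", "step_updated"}),
    "cluster": frozenset({"step_health_changed"}),
    "flow": frozenset({
        "flow_started", "flow_completed", "flow_failed", "flow_deactivated",
        "step_started", "step_completed", "step_failed", "step_skipped",
        "attribute_set",
        "work_started", "work_succeeded", "work_failed", "work_not_completed",
        "retry_scheduled", "dispatch_deferred",
        "comp_started", "comp_succeeded", "comp_failed", "comp_retry_scheduled",
    }),
}


def aggregate_kind(event_type: str) -> Optional[str]:
    """Return the aggregate kind that emits event_type, or None if unknown."""
    for kind, names in EVENT_TYPES.items():
        if event_type in names:
            return kind
    return None


def unknown_event_types(requested: Iterable[str]) -> List[str]:
    """Return the requested names that are not in the catalog, sorted."""
    known = frozenset().union(*EVENT_TYPES.values())
    return sorted(set(requested) - known)
```

For instance, unknown_event_types(["flow_started", "flow_paused"]) reports flow_paused, which does not appear in the catalog.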

Unsubscribe

{
  "type": "unsubscribe",
  "data": { "sub_id": "flow-detail" }
}

Reconnection

On disconnect, reconnect and resubscribe with the same filters. The engine deduplicates against the initial state batch by sequence, so events older than the sequence of the projected state are suppressed.
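
Resubscribing with the same filters is easier if the client remembers what it subscribed to. The sketch below shows one possible approach: a small registry that records each subscription, forgets it on unsubscribe, and produces the messages to resend on a new connection. SubscriptionRegistry is a hypothetical client-side class, and the actual sending is left to whichever WebSocket library is in use.

```python
import json
from typing import Any, Dict, List


class SubscriptionRegistry:
    """Remember active subscriptions so they can be replayed after a reconnect.

    Hypothetical client-side class, not an engine API.
    """

    def __init__(self) -> None:
        self._active: Dict[str, Dict[str, Any]] = {}

    def subscribe(self, data: Dict[str, Any]) -> str:
        """Record the subscription and return the message to send."""
        self._active[data["sub_id"]] = data
        return json.dumps({"type": "subscribe", "data": data})

    def unsubscribe(self, sub_id: str) -> str:
        """Forget the subscription and return the message to send."""
        self._active.pop(sub_id, None)
        return json.dumps({"type": "unsubscribe", "data": {"sub_id": sub_id}})

    def replay(self) -> List[str]:
        """Subscribe messages to resend, with the same filters, after reconnecting."""
        return [
            json.dumps({"type": "subscribe", "data": data})
            for data in self._active.values()
        ]


registry = SubscriptionRegistry()
registry.subscribe({
    "sub_id": "flow-detail",
    "aggregate_ids": [["flow", "wf-123"]],
    "include_state": True,
})
registry.subscribe({"sub_id": "catalog", "aggregate_ids": [["catalog"]]})
registry.unsubscribe("catalog")

to_resend = registry.replay()
```

After a reconnect, sending each string in to_resend restores the subscriptions that were still active when the connection dropped.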