Work Items

Mark an array input for_each: true and the engine expands it into one work item per element; work items can then be processed in parallel. This page covers the Cartesian product over multiple for_each inputs, parallelism control, output aggregation, and partial failure.

What Are Work Items?

When a Step has an input marked for_each: true and that input is an array, the engine expands it into one Work Item per element. Each Work Item executes independently with its own receipt token.

Basic Example

{
  "id": "process-item",
  "name": "Process Item",
  "type": "sync",
  "http": { "endpoint": "https://api.example.com/items/process", "timeout": 5000 },
  "attributes": {
    "items": {
      "role": "required",
      "type": "array",
      "required": { "for_each": true }
    },
    "processed": { "role": "output", "type": "number" }
  }
}

Flow init:

{ "items": [[ {"id": "item-1", "value": 10}, {"id": "item-2", "value": 20} ]] }

The engine creates two Work Items and calls the handler twice, once per element.

Cartesian Product

Multiple for_each Attributes produce the Cartesian product:

{
  "attributes": {
    "regions": { "role": "required", "type": "array", "required": { "for_each": true } },
    "products": { "role": "required", "type": "array", "required": { "for_each": true } }
  }
}

With regions: ["US", "EU"] and products: ["A", "B"], the engine creates 4 Work Items: US×A, US×B, EU×A, EU×B.
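
The expansion can be sketched in a few lines of Python. This is an illustration of the Cartesian product described above, not the engine's implementation; the function name expand_work_items and the choice to copy non-for_each inputs unchanged into every Work Item are assumptions made for the example.

```python
from itertools import product


def expand_work_items(inputs, for_each_names):
    """Return one dict of inputs per Work Item (hypothetical helper).

    Attributes listed in for_each_names are expanded element by element;
    every other input is copied unchanged into each Work Item.
    """
    fixed = {k: v for k, v in inputs.items() if k not in for_each_names}
    names = [n for n in for_each_names if n in inputs]
    # product() yields every combination, varying the last input fastest.
    combos = product(*(inputs[n] for n in names))
    return [{**fixed, **dict(zip(names, combo))} for combo in combos]


items = expand_work_items(
    {"regions": ["US", "EU"], "products": ["A", "B"]},
    ["regions", "products"],
)
for item in items:
    print(item)
```

With a single for_each input the same function degenerates to one Work Item per element, which matches the Basic Example.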

Parallelism

By default Work Items execute sequentially (parallelism defaults to 1). Use work_config to run them concurrently:

{
  "work_config": {
    "parallelism": 5,
    "max_retries": 3,
    "init_backoff": 100,
    "max_backoff": 5000,
    "backoff_type": "exponential"
  }
}

Match parallelism to your downstream service’s capacity. For I/O-bound Steps, values of 10–50 are typical; for CPU-bound Steps, keep it low.
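
To make the effect of these settings concrete, here is a rough Python sketch of bounded concurrency and a retry schedule. It is not how the engine is implemented. The names run_bounded and backoff_delays are hypothetical, and the backoff formula (double the delay on each retry, capped at max_backoff, values in milliseconds) is an assumed reading of "exponential" rather than documented behavior.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_bounded(work_items, handler, parallelism=1):
    """Run handler over work_items with at most `parallelism` in flight."""
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def tracked(item):
        nonlocal in_flight, peak
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        try:
            return handler(item)
        finally:
            with lock:
                in_flight -= 1

    # The pool size is the concurrency cap; map() preserves input order.
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        results = list(pool.map(tracked, work_items))
    return results, peak


def backoff_delays(max_retries, init_backoff, max_backoff):
    """Assumed schedule: init_backoff doubled per retry, capped at max_backoff."""
    return [min(init_backoff * 2 ** n, max_backoff) for n in range(max_retries)]


results, peak = run_bounded(range(10), lambda x: x * x, parallelism=5)
print(results, peak)
print(backoff_delays(3, 100, 5000))
```

Under that assumed formula, the configuration shown above would wait 100, 200, and 400 before its three retries.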

Output Aggregation

Step outputs are only available once all Work Items complete. Each aggregated output is an array, where each element includes the for_each input values alongside the output:

Work item 1 (item-1): processed = 100
Work item 2 (item-2): processed = 200

Aggregated "processed" attribute:
[
  { "items": {"id": "item-1", "value": 10}, "processed": 100 },
  { "items": {"id": "item-2", "value": 20}, "processed": 200 }
]

graph TD
  Array["items: [A, B, C]"]
  WI1["Work Item: A"]
  WI2["Work Item: B"]
  WI3["Work Item: C"]
  Agg["Aggregated output"]
  Array --> WI1
  Array --> WI2
  Array --> WI3
  WI1 --> Agg
  WI2 --> Agg
  WI3 --> Agg
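
The aggregated shape can be reproduced with a short Python sketch, which may help when writing a downstream Step that consumes it. The function name aggregate and the (inputs, outputs) pair representation are assumptions for illustration only.

```python
def aggregate(output_name, completed):
    """Build the aggregated array for one output attribute (hypothetical helper).

    `completed` is a list of (for_each_inputs, outputs) pairs, one per Work Item.
    """
    return [
        {**for_each_inputs, output_name: outputs[output_name]}
        for for_each_inputs, outputs in completed
    ]


completed = [
    ({"items": {"id": "item-1", "value": 10}}, {"processed": 100}),
    ({"items": {"id": "item-2", "value": 20}}, {"processed": 200}),
]
aggregated = aggregate("processed", completed)

# A downstream consumer reads the output back out of each element.
total = sum(entry["processed"] for entry in aggregated)
print(aggregated, total)
```

Because each element carries its for_each inputs, a consumer can tell which input produced which output without relying on array order.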

Idempotency

Each Work Item carries a unique Argyll-Receipt-Token header. The engine ignores duplicate completions for the same token and returns 200 in either case, so retrying your handler is safe.
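
If the handler itself has side effects, it can use the same token to avoid repeating them on a retry. The following Python sketch shows one way to do that; the IdempotentHandler class is hypothetical, and the in-memory dict stands in for whatever durable store a real service would need.

```python
class IdempotentHandler:
    """Wrap a processing function so each receipt token is processed once."""

    def __init__(self, process):
        self._process = process
        self._results = {}  # receipt token -> cached result (in-memory only)

    def handle(self, headers, payload):
        token = headers["Argyll-Receipt-Token"]
        if token not in self._results:
            # First delivery for this token: do the real work.
            self._results[token] = self._process(payload)
        # Retries with the same token get the cached result back.
        return self._results[token]


calls = []


def process(payload):
    calls.append(payload["id"])
    return {"processed": payload["value"] * 10}


handler = IdempotentHandler(process)
headers = {"Argyll-Receipt-Token": "token-abc"}
first = handler.handle(headers, {"id": "item-1", "value": 10})
retry = handler.handle(headers, {"id": "item-1", "value": 10})
print(first, retry, calls)
```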

Partial Failure

If any Work Item fails permanently, the Step fails immediately. Pending Work Items that have not yet started are abandoned; in-flight Work Items may still complete but their results are discarded.
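
The rule above can be expressed as a small state transition. This Python sketch is only a model of the described behavior; the state labels ("pending", "in_flight", "abandoned", "discarding") and the function name are invented for the example and are not engine terminology.

```python
def apply_permanent_failure(states, failed_id):
    """Return (step_status, new_states) after one Work Item fails permanently.

    State names are hypothetical labels used only in this sketch.
    """
    new_states = dict(states)
    new_states[failed_id] = "failed"
    for item_id, state in states.items():
        if item_id == failed_id:
            continue
        if state == "pending":
            new_states[item_id] = "abandoned"   # never started, never will
        elif state == "in_flight":
            new_states[item_id] = "discarding"  # may finish; result is dropped
    return "failed", new_states


status, states = apply_permanent_failure(
    {
        "wi-1": "succeeded",
        "wi-2": "in_flight",
        "wi-3": "in_flight",
        "wi-4": "pending",
    },
    failed_id="wi-2",
)
print(status, states)
```

Because in-flight Work Items may still run to completion, handlers with external side effects should not assume that a failed Step means none of its other Work Items executed.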

Interaction with Predicates

If a Step has both a Predicate and for_each, the Predicate is evaluated before initial scheduling and again before each pending or retried Work Item starts. If the Predicate evaluates to false, that Work Item does not run.