Python SDK

Register Steps, serve sync or async handlers, and start Flows from Python. The fluent builder mirrors the engine API, with typed enums for attribute roles, script languages, and backoff strategies.

Installation

pip install argyll-sdk

Client

from argyll import Client

client = Client("http://localhost:8080")

Register a Script Step

from argyll import Client, AttributeType

client = Client("http://localhost:8080")

client.new_step() \
    .with_id("price-calculator") \
    .with_name("Price Calculator") \
    .required("quantity", AttributeType.NUMBER) \
    .required("unit_price", AttributeType.NUMBER) \
    .output("total", AttributeType.NUMBER) \
    .with_script("{:total (* quantity unit_price)}") \
    .register()

with_script defaults to Ale. Use with_script_language(ScriptLanguage.LUA, ...) for Lua or with_script_language(ScriptLanguage.JPATH, ...) for JPath.

Serve a Sync Step

start() registers the step and runs an HTTP server that handles invocations:

from argyll import Client, StepContext, AttributeType

client = Client("http://localhost:8080")

def handle_greeting(ctx: StepContext, args: dict) -> dict:
    name = args.get("name", "World")
    return {"greeting": f"Hello, {name}!"}

client.new_step() \
    .with_id("greet") \
    .with_name("Greet") \
    .required("name", AttributeType.STRING) \
    .output("greeting", AttributeType.STRING) \
    .start(handle_greeting)

Serve an Async Step

import threading
from argyll import AsyncContext, AttributeType, Client, StepContext

client = Client("http://localhost:8080")

def handle_process(ctx: StepContext, args: dict) -> dict:
    async_ctx = AsyncContext(context=ctx, webhook_url=ctx.metadata["webhook_url"])

    def background():
        result = do_long_work(args)
        async_ctx.success({"result": result})

    threading.Thread(target=background).start()
    return {}  # return immediately

client.new_step() \
    .with_id("process") \
    .with_name("Process") \
    .with_async_execution() \
    .required("input", AttributeType.OBJECT) \
    .output("result", AttributeType.OBJECT) \
    .start(handle_process)

Serve a Step with Compensation

Use with_compensate_handler to register a handler that undoes a completed work item. start() auto-generates the compensate URL:

def handle_charge(ctx: StepContext, args: dict) -> dict:
    charge_id = process_charge(args["amount"])
    return {"charge_id": charge_id}

def handle_compensate(ctx: StepContext, inputs: dict, outputs: dict) -> None:
    refund_charge(outputs["charge_id"])

client.new_step() \
    .with_id("charge-card") \
    .with_name("Charge Card") \
    .required("amount", AttributeType.NUMBER) \
    .output("charge_id", AttributeType.STRING) \
    .with_compensate_handler(handle_compensate) \
    .start(handle_charge)

The compensate handler receives the original inputs and outputs for the work item being undone. Raise WorkNotCompletedError for a transient failure (triggers retry), or any other exception for a permanent failure.

Builder Methods

Identity

  • with_id(id), with_name(name), with_label(k, v), with_labels(labels)

Attributes

  • required(name, type): required input
  • optional(name, type, default): optional input with default value
  • const(name, type, value): constant — a fixed value baked into the step definition
  • meta(name, meta_key): metadata — injects a named metadata field (e.g. flow_id, webhook_url) at execution time
  • output(name, type): output attribute
  • with_for_each(name): mark an array input for parallel expansion

Execution Type

  • with_sync_execution(), with_async_execution()
  • with_script(...) implicitly makes the step a script step

HTTP

  • with_endpoint(url), with_method(method), with_timeout(ms), with_health_check(url)
  • with_compensate(url): set the compensate endpoint URL
  • with_compensate_handler(handler): register a compensation handler (auto-generates the URL when used with start())

Script

  • with_script(ale), with_script_language(lang, script) — lang: ScriptLanguage.ALE, ScriptLanguage.LUA, ScriptLanguage.JPATH

Predicate

  • with_predicate(lang, script)

Behavior

  • with_memoizable() , with_flow_goals(*goals) (for flow-type steps)

Lifecycle

  • register(): register the step definition only
  • update(): mark for update on next start()
  • start(handler): register and serve

List Steps

steps = client.list_steps()

Start a Flow

Use with_goals(*ids) to set all goals at once, or with_goal(id) to add one at a time:

from argyll import Client

client = Client("http://localhost:8080")

client.new_flow("order-123") \
    .with_goals("send-confirmation") \
    .with_initial_state({
        "customer_id": ["cust-456"],
        "order_amount": [99.99],
    }) \
    .start()

Initial values are arrays. Wrap each value in a list.

Query Flow State

flow = client.flow("order-123")
state = flow.get_state()

Imports

from argyll import (
    Client, StepBuilder, FlowBuilder, FlowClient,
    StepContext, AsyncContext,
    AttributeType, AttributeRole, InputCollect,
    BackoffType, ScriptLanguage,
)
from argyll.errors import (
    ArgyllError, ClientError, HTTPError, StepRegistrationError,
    StepValidationError, FlowError, WebhookError,
)