Python SDK
Register Steps, serve sync or async handlers, and start Flows from Python. The fluent builder mirrors the engine API, with typed enums for attribute roles, script languages, and backoff strategies.
Installation
pip install argyll-sdk
Client
from argyll import Client
client = Client("http://localhost:8080")
Register a Script Step
from argyll import Client, AttributeType
client = Client("http://localhost:8080")
client.new_step() \
.with_id("price-calculator") \
.with_name("Price Calculator") \
.required("quantity", AttributeType.NUMBER) \
.required("unit_price", AttributeType.NUMBER) \
.output("total", AttributeType.NUMBER) \
.with_script("{:total (* quantity unit_price)}") \
.register()
with_script defaults to Ale. Use with_script_language(ScriptLanguage.LUA, ...) for Lua or with_script_language(ScriptLanguage.JPATH, ...) for JPath.
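For readers unfamiliar with Ale, the script registered above can be read as the plain Python function below. This is only an illustrative equivalent, assuming `{:total ...}` builds a map with a `total` key and `(* a b)` multiplies its arguments; `price_calculator` is a hypothetical name and not part of the SDK.

```python
def price_calculator(quantity: float, unit_price: float) -> dict:
    # Mirrors the Ale expression {:total (* quantity unit_price)}:
    # multiply the two required inputs and return the product under "total".
    return {"total": quantity * unit_price}


print(price_calculator(3, 2.5))
```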
Serve a Sync Step
start() registers the step and runs an HTTP server that handles invocations:
from argyll import Client, StepContext, AttributeType
client = Client("http://localhost:8080")
def handle_greeting(ctx: StepContext, args: dict) -> dict:
    name = args.get("name", "World")
    return {"greeting": f"Hello, {name}!"}
client.new_step() \
.with_id("greet") \
.with_name("Greet") \
.required("name", AttributeType.STRING) \
.output("greeting", AttributeType.STRING) \
.start(handle_greeting)
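Because a handler is an ordinary function, it can be exercised without a running engine. A minimal sketch, assuming the handler never touches `ctx`, so `None` can stand in for a real `StepContext` (the type annotation is dropped here to keep the snippet free of SDK imports):

```python
def handle_greeting(ctx, args: dict) -> dict:
    # Same logic as the handler above; ctx is unused.
    name = args.get("name", "World")
    return {"greeting": f"Hello, {name}!"}


# Call the handler directly, passing None in place of a StepContext.
assert handle_greeting(None, {"name": "Ada"}) == {"greeting": "Hello, Ada!"}
# The fallback applies only when the key is absent from args.
assert handle_greeting(None, {}) == {"greeting": "Hello, World!"}
```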
Serve an Async Step
import threading
from argyll import AsyncContext, AttributeType, Client, StepContext
client = Client("http://localhost:8080")
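def handle_process(ctx: StepContext, args: dict) -> dict:
    # Bind the webhook URL from the step metadata so the result can be reported later.
    async_ctx = AsyncContext(context=ctx, webhook_url=ctx.metadata["webhook_url"])
    def background():
        # do_long_work is a placeholder for your own long-running function.
        result = do_long_work(args)
        async_ctx.success({"result": result})
    threading.Thread(target=background).start()
    return {}  # return immediately; the background thread reports the result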
client.new_step() \
.with_id("process") \
.with_name("Process") \
.with_async_execution() \
.required("input", AttributeType.OBJECT) \
.output("result", AttributeType.OBJECT) \
.start(handle_process)
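The hand-off pattern above can be tried locally with a stand-in for `AsyncContext`. In this sketch `RecordingAsyncContext` and `do_long_work` are hypothetical, and the handler takes the stand-in directly instead of building it from a `StepContext`. Only the `success(...)` call is taken from the example above; the real class presumably reports to the engine rather than recording in memory.

```python
import threading


class RecordingAsyncContext:
    """Hypothetical stand-in that records what success() was given."""

    def __init__(self) -> None:
        self.results = []
        self.done = threading.Event()

    def success(self, outputs: dict) -> None:
        self.results.append(outputs)
        self.done.set()


def do_long_work(args: dict) -> dict:
    # Placeholder for slow work; here it just echoes the input.
    return {"echo": args["input"]}


def handle_process(async_ctx: RecordingAsyncContext, args: dict) -> dict:
    def background() -> None:
        async_ctx.success({"result": do_long_work(args)})

    # Start the work and return immediately, as in the handler above.
    threading.Thread(target=background, daemon=True).start()
    return {}


async_ctx = RecordingAsyncContext()
immediate = handle_process(async_ctx, {"input": {"n": 1}})
async_ctx.done.wait(timeout=5)  # block until the background thread reports
```

The `threading.Event` is only there so the snippet can wait for the background thread; a served step would not block like this.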
Serve a Step with Compensation
Use with_compensate_handler to register a handler that undoes a completed work item. start() auto-generates the compensate URL:
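from argyll import AttributeType, Client, StepContext
client = Client("http://localhost:8080")
def handle_charge(ctx: StepContext, args: dict) -> dict:
    # process_charge is a placeholder for your own payment call.
    charge_id = process_charge(args["amount"])
    return {"charge_id": charge_id}
def handle_compensate(ctx: StepContext, inputs: dict, outputs: dict) -> None:
    # refund_charge is a placeholder for your own refund call.
    refund_charge(outputs["charge_id"])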
client.new_step() \
.with_id("charge-card") \
.with_name("Charge Card") \
.required("amount", AttributeType.NUMBER) \
.output("charge_id", AttributeType.STRING) \
.with_compensate_handler(handle_compensate) \
.start(handle_charge)
The compensate handler receives the original inputs and outputs for the work item being undone. Raise WorkNotCompletedError for a transient failure (triggers retry), or any other exception for a permanent failure.
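The two failure modes can be sketched as follows. The import path of `WorkNotCompletedError` is not shown in this document, so the sketch defines a local stand-in with the same name. `make_compensate_handler` and `flaky_refund` are hypothetical, and treating `ConnectionError` as transient and a missing `charge_id` as permanent is an assumption made for illustration.

```python
class WorkNotCompletedError(Exception):
    """Local stand-in for the SDK exception of the same name."""


def make_compensate_handler(refund):
    """Build a compensate handler around a refund callable (hypothetical)."""

    def handle_compensate(ctx, inputs: dict, outputs: dict) -> None:
        if "charge_id" not in outputs:
            # Permanent: there is nothing to refund, so retrying cannot help.
            raise ValueError("no charge_id recorded for this work item")
        try:
            refund(outputs["charge_id"])
        except ConnectionError as exc:
            # Transient: signal that the compensation should be retried.
            raise WorkNotCompletedError(str(exc)) from exc

    return handle_compensate


def flaky_refund(charge_id: str) -> None:
    # Simulates a payment gateway that is temporarily unreachable.
    raise ConnectionError("payment gateway unreachable")


handler = make_compensate_handler(flaky_refund)
try:
    handler(None, {"amount": 10}, {"charge_id": "ch_1"})
except WorkNotCompletedError as exc:
    print("would be retried:", exc)
```

Wrapping the refund call in a factory keeps the handler's three-argument signature intact while letting a test swap in a fake refund function.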
Builder Methods
Identity
with_id(id), with_name(name), with_label(k, v), with_labels(labels)
Attributes
required(name, type): required input
optional(name, type, default): optional input with a default value
const(name, type, value): constant, a fixed value baked into the step definition
meta(name, meta_key): metadata, injects a named metadata field (e.g. flow_id, webhook_url) at execution time
output(name, type): output attribute
with_for_each(name): mark an array input for parallel expansion
Execution Type
with_sync_execution(), with_async_execution()
with_script(...) implicitly makes the step a script step
HTTP
with_endpoint(url), with_method(method), with_timeout(ms), with_health_check(url)
with_compensate(url): set the compensate endpoint URL
with_compensate_handler(handler): register a compensation handler (auto-generates the URL when used with start())
Script
with_script(ale), with_script_language(lang, script), where lang is ScriptLanguage.ALE, ScriptLanguage.LUA, or ScriptLanguage.JPATH
Predicate
with_predicate(lang, script)
Behavior
with_memoizable(), with_flow_goals(*goals) (for flow-type steps)
Lifecycle
register(): register the step definition only
update(): mark for update on next start()
start(handler): register and serve
List Steps
steps = client.list_steps()
Start a Flow
Use with_goals(*ids) to set all goals at once, or with_goal(id) to add one at a time:
from argyll import Client
client = Client("http://localhost:8080")
client.new_flow("order-123") \
.with_goals("send-confirmation") \
.with_initial_state({
    "customer_id": ["cust-456"],
    "order_amount": [99.99],
}) \
.start()
Initial values are arrays. Wrap each value in a list.
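When the values start out as a plain dictionary, the wrapping can be done in one place. A minimal sketch; `wrap_initial_state` is a hypothetical helper, not part of the SDK:

```python
def wrap_initial_state(values: dict) -> dict:
    """Wrap each bare value in a one-element list, leaving lists untouched."""
    return {
        key: value if isinstance(value, list) else [value]
        for key, value in values.items()
    }


state = wrap_initial_state({"customer_id": "cust-456", "order_amount": 99.99})
print(state)
```

A value that is itself meant to be an array still has to be wrapped by hand, since the helper cannot tell it apart from a value that is already wrapped.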
Query Flow State
flow = client.flow("order-123")
state = flow.get_state()
Imports
from argyll import (
    Client, StepBuilder, FlowBuilder, FlowClient,
    StepContext, AsyncContext,
    AttributeType, AttributeRole, InputCollect,
    BackoffType, ScriptLanguage,
)
from argyll.errors import (
    ArgyllError, ClientError, HTTPError, StepRegistrationError,
    StepValidationError, FlowError, WebhookError,
)