Eight working xFlow recipes
Copy-pasteable code for the substrate-pluggable runtime. Quickstart, production xSync + S3WORM, browser + server multi-writer, Docker peer, multi-tenant definitions, federation with WDK, XState authoring, and migration from WDK directives.
Recipe 1
Quickstart with MemorySubstrate.
Smallest possible workflow run, no external services. Useful for tests, local prototyping, and convincing yourself the API works before you wire up xSync.
Install
Just core, runtime, substrate base, and the local node detector.
pnpm add \
@decoperations/xflow-core \
@decoperations/xflow-substrate \
@decoperations/xflow-runtime \
@decoperations/xflow-provider-local
Workflow + run
Two pure steps linked head-to-tail. The MemorySubstrate stores events in JS memory and computes the FlowRun via the reducer. No xSync, no S3, no servers.
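Before the full example, it may help to see the idea of "events in memory, run state via a reducer" in isolation. The sketch below is not the MemorySubstrate implementation. The event names, the RunState shape, and the InMemoryLog class are all hypothetical and only illustrate the pattern.

```typescript
// Hypothetical event and run shapes for illustration; not the xFlow types.
type FlowEvent =
  | { type: "run.started"; runId: string }
  | { type: "step.succeeded"; runId: string; stepId: string; output: unknown }
  | { type: "step.failed"; runId: string; stepId: string; error: string }
  | { type: "run.completed"; runId: string }

type StepState = { status: "succeeded" | "failed"; output?: unknown; error?: string }
type RunState = {
  status: "pending" | "running" | "succeeded" | "failed"
  steps: Record<string, StepState>
}

const initialRun: RunState = { status: "pending", steps: {} }

// Pure reducer: the run state is always derivable from the event list alone.
function reduceRun(state: RunState, event: FlowEvent): RunState {
  switch (event.type) {
    case "run.started":
      return { ...state, status: "running" }
    case "step.succeeded":
      return {
        ...state,
        steps: { ...state.steps, [event.stepId]: { status: "succeeded", output: event.output } },
      }
    case "step.failed":
      return {
        status: "failed",
        steps: { ...state.steps, [event.stepId]: { status: "failed", error: event.error } },
      }
    case "run.completed":
      return state.status === "failed" ? state : { ...state, status: "succeeded" }
  }
}

// In-memory, append-only log keyed by run id.
class InMemoryLog {
  private events = new Map<string, FlowEvent[]>()

  append(event: FlowEvent): void {
    const list = this.events.get(event.runId) ?? []
    list.push(event)
    this.events.set(event.runId, list)
  }

  // Recompute the snapshot by folding every event through the reducer.
  snapshot(runId: string): RunState {
    return (this.events.get(runId) ?? []).reduce(reduceRun, initialRun)
  }
}

const log = new InMemoryLog()
log.append({ type: "run.started", runId: "r1" })
log.append({ type: "step.succeeded", runId: "r1", stepId: "upper", output: "HELLO" })
log.append({ type: "step.succeeded", runId: "r1", stepId: "greet", output: "hello" })
log.append({ type: "run.completed", runId: "r1" })
console.log(log.snapshot("r1").status) // "succeeded"
```

Because the snapshot is recomputed from events, nothing but the log has to be stored, which is why the same reducer could in principle sit on top of a different store.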
// quickstart.ts
import {
defineWorkflow,
defineStep,
link,
} from "@decoperations/xflow-core"
import { MemorySubstrate } from "@decoperations/xflow-substrate"
import { createXFlowRuntime } from "@decoperations/xflow-runtime"
import { detectLocalNode } from "@decoperations/xflow-provider-local"
const workflow = defineWorkflow({
id: "demo.greet",
version: "1.0.0",
steps: {
upper: defineStep({
id: "upper",
type: "text.uppercase",
sideEffects: { kind: "pure", idempotencyRequired: false },
}),
greet: defineStep({
id: "greet",
type: "text.greet",
sideEffects: { kind: "pure", idempotencyRequired: false },
}),
},
links: [link("upper", "greet", { when: { type: "step.succeeded" } })],
})
const substrate = new MemorySubstrate({ id: "demo" })
const runtime = createXFlowRuntime({
node: detectLocalNode({ id: "main", capabilities: ["test"] }),
substrate,
})
runtime.register("text.uppercase", async () => "HELLO")
runtime.register("text.greet", async (_ctx, _input) => "👋 hello")
const run = await runtime.start({ workflow, input: "hello" })
for await (const snapshot of run.observe()) {
if (snapshot.status === "succeeded") {
console.log(snapshot.steps)
break
}
}
Recipe 2
Production xSync substrate with S3WORM-backed storage.
Run state lives in your own bucket. S3WORM gives you content-addressed, append-only persistence; xSync gives you signed events, a multi-writer log, and pluggable transports.
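To make "content-addressed, append-only" concrete, here is a minimal sketch of the idea. It is not S3WORM's API: the ContentAddressedStore class is hypothetical and a Map stands in for the bucket. The key of each object is the SHA-256 of its content, and there is deliberately no update or delete method.

```typescript
import { createHash } from "node:crypto"

// Hypothetical in-memory stand-in for a content-addressed, append-only bucket.
class ContentAddressedStore {
  private objects = new Map<string, string>()

  // The key is derived from the content, so identical content always maps to one key.
  put(content: string): string {
    const key = createHash("sha256").update(content).digest("hex")
    // Writing existing content is a no-op; nothing is ever overwritten.
    if (!this.objects.has(key)) this.objects.set(key, content)
    return key
  }

  get(key: string): string | undefined {
    return this.objects.get(key)
  }

  get size(): number {
    return this.objects.size
  }
}

const cas = new ContentAddressedStore()
const keyA = cas.put(JSON.stringify({ type: "step.succeeded", stepId: "draft" }))
const keyAgain = cas.put(JSON.stringify({ type: "step.succeeded", stepId: "draft" }))
const keyB = cas.put(JSON.stringify({ type: "step.succeeded", stepId: "publish" }))
console.log(keyA === keyAgain, keyA === keyB, cas.size)
```

A useful property of this layout is that concurrent writers storing the same event cannot conflict: both compute the same key for the same bytes.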
Install
pnpm add \
@decoperations/xflow-core \
@decoperations/xflow-substrate \
@decoperations/xflow-substrate-xsync \
@decoperations/xflow-runtime \
@decoperations/xflow-provider-local \
@decoperations/xflow-s3worm \
@decoperations/xsync-client
Server-side runtime
Wires xSync against an S3-compatible bucket (R2 / Storj / B2 / S3 itself), wraps it as a substrate, registers a Stripe charge executor with idempotency.
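Why a stable idempotency key matters can be shown without Stripe. The sketch below assumes the key is derived from the run id and step id only; how xFlow actually computes ctx.idempotencyKey is not documented here, so treat idempotencyKey() and FakeChargeApi as hypothetical.

```typescript
import { createHash } from "node:crypto"

// Assumption: the key depends only on run id and step id, so every retry
// of the same step in the same run produces the same key.
function idempotencyKey(runId: string, stepId: string): string {
  return createHash("sha256").update(`${runId}:${stepId}`).digest("hex").slice(0, 32)
}

type Charge = { chargeId: string; amountCents: number }

// Stand-in for an external API that deduplicates requests on the key.
class FakeChargeApi {
  private seen = new Map<string, Charge>()
  private next = 1

  charge(amountCents: number, key: string): Charge {
    const prior = this.seen.get(key)
    if (prior) return prior // retry: return the original result, charge nothing new
    const result = { chargeId: `ch_${this.next++}`, amountCents }
    this.seen.set(key, result)
    return result
  }
}

const api = new FakeChargeApi()
const chargeKey = idempotencyKey("run-1", "charge")
const firstAttempt = api.charge(500, chargeKey)
const retryAttempt = api.charge(500, chargeKey) // e.g. after a crash before the result was recorded
console.log(firstAttempt.chargeId === retryAttempt.chargeId) // true
```

If the key changed per attempt, the retry would create a second charge, which is the failure mode idempotencyRequired is meant to rule out.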
// server/runtime.ts (Node / Vercel function / Docker)
import { createXSync } from "@decoperations/xsync-client"
import { xflowS3WormStore } from "@decoperations/xflow-s3worm"
import { xsyncSubstrate, flowRunView } from "@decoperations/xflow-substrate-xsync"
import { createXFlowRuntime } from "@decoperations/xflow-runtime"
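import { detectLocalNode } from "@decoperations/xflow-provider-local"
import Stripe from "stripe"
// Assumption: the Stripe secret key is supplied via STRIPE_SECRET_KEY.
const stripe = new Stripe(process.env.STRIPE_SECRET_KEY!)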
const store = xflowS3WormStore({
bucket: process.env.S3_BUCKET!,
region: process.env.S3_REGION ?? "us-east-1",
endpoint: process.env.S3_ENDPOINT, // R2 / Storj / B2 endpoint
accessKeyId: process.env.S3_ACCESS_KEY!,
secretAccessKey: process.env.S3_SECRET!,
rootPrefix: "tenants/acme",
})
const xsync = await createXSync({
store,
views: { flowRun: flowRunView },
})
const substrate = xsyncSubstrate({ client: xsync })
const runtime = createXFlowRuntime({
node: detectLocalNode({
id: `server-${process.env.HOSTNAME ?? "default"}`,
capabilities: ["server", "stripe", "email"],
authority: { canStartWorkflows: true, canIssueArtifacts: true },
}),
substrate,
})
runtime.register("billing.charge", async (ctx, input) => {
// ctx.idempotencyKey is stable across retries.
// Stripe takes the idempotency key in the request options (second argument), not in the params.
return await stripe.charges.create(input, { idempotencyKey: ctx.idempotencyKey })
})
export { runtime, substrate, xsync }
Next.js App Router handlers
Substrate-agnostic: pass any factory that returns an XFlowSubstrate per request.
// app/api/xflow/[...xflow]/route.ts
import { createXFlowRouteHandlers } from "@decoperations/xflow-next"
import { runtime, substrate } from "@/server/runtime"
import { processVideo } from "@/server/workflows/process-video"
import { onboardUser } from "@/server/workflows/onboard-user"
export const { POST, GET } = createXFlowRouteHandlers({
runtime,
substrate: () => substrate,
workflows: { [processVideo.id]: processVideo, [onboardUser.id]: onboardUser },
authorize: async (req) => {
// getSession is your application's own session helper (not shown here).
const session = await getSession(req)
return Boolean(session)
},
})
Recipe 3
Browser tab + server peers writing to the same run.
Browser handles the human-in-the-loop confirm step; the server handles draft and publish. Same workflow definition; placement decides who runs what.
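As a rough mental model of "placement decides who runs what", the sketch below filters peers by capability. It assumes placement.required means every listed capability must be present on the node. The NodeInfo and Placement types and the canRun helper are hypothetical, not xFlow exports.

```typescript
// Hypothetical shapes, loosely mirroring the node and placement fields used in this recipe.
type NodeInfo = { id: string; capabilities: string[] }
type Placement = { required: string[] }

// Assumption: a node is eligible only if it has every required capability.
function canRun(node: NodeInfo, placement?: Placement): boolean {
  if (!placement) return true // no placement constraint: any peer may run the step
  return placement.required.every((cap) => node.capabilities.includes(cap))
}

function eligibleNodes(nodes: NodeInfo[], placement?: Placement): string[] {
  return nodes.filter((n) => canRun(n, placement)).map((n) => n.id)
}

const peers: NodeInfo[] = [
  { id: "server-prod", capabilities: ["server", "stripe", "email"] },
  { id: "browser-1", capabilities: ["browser-tab"] },
]

console.log(eligibleNodes(peers, { required: ["server"] })) // ["server-prod"]
console.log(eligibleNodes(peers, { required: ["browser-tab"] })) // ["browser-1"]
```

With this rule, the browser never competes for the draft or publish steps, and the server never competes for confirm, even though both peers load the same definition.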
Shared workflow definition
One file, three steps with different placement and claim policies. Authority-claimed server steps for irreversible side effects; optimistic-idempotent browser step for user input.
// workflows/review.ts — shared between browser and server
import { defineWorkflow, defineStep, link } from "@decoperations/xflow-core"
export const reviewWorkflow = defineWorkflow({
id: "review.flow",
version: "1.0.0",
steps: {
draft: defineStep({
id: "draft",
type: "review.draft",
placement: { required: ["server"] },
claim: { mode: "authority", authorityActorId: "server-prod" },
sideEffects: { kind: "external", idempotencyRequired: true },
}),
confirm: defineStep({
id: "confirm",
type: "review.confirm",
placement: { required: ["browser-tab"] },
claim: { mode: "optimistic-idempotent" },
sideEffects: { kind: "pure", idempotencyRequired: false },
}),
publish: defineStep({
id: "publish",
type: "review.publish",
placement: { required: ["server"] },
claim: { mode: "authority", authorityActorId: "server-prod" },
sideEffects: { kind: "irreversible", idempotencyRequired: true },
}),
},
links: [
link("draft", "confirm", { when: { type: "step.succeeded" } }),
link("confirm", "publish", { when: { type: "step.succeeded" } }),
],
})
Browser-side peer
IndexedDB store, WebSocket transport, runtime registers a confirm executor. The browser is a real peer, not just a stream consumer — it writes durable signed events.
// app/runs/[runId]/page.tsx — browser-side peer
"use client"
import { useEffect, useState } from "react"
import { createXSync } from "@decoperations/xsync-client"
import { indexedDb } from "@decoperations/xsync-store-indexeddb"
import { websocket } from "@decoperations/xsync-transport-ws"
import {
xsyncSubstrate,
flowRunView,
} from "@decoperations/xflow-substrate-xsync"
import { createXFlowRuntime, type RunHandle } from "@decoperations/xflow-runtime"
import { detectLocalNode } from "@decoperations/xflow-provider-local"
import { useFlowRun, useStep } from "@decoperations/xflow-react"
import { reviewWorkflow } from "@/workflows/review"
export default function ReviewSession({ params }: { params: { runId: string } }) {
const [handle, setHandle] = useState<RunHandle | null>(null)
useEffect(() => {
let h: RunHandle | null = null
void (async () => {
const xsync = await createXSync({
store: indexedDb({ database: "xflow" }),
transports: [websocket({ url: `/api/xsync` })],
views: { flowRun: flowRunView },
})
const substrate = xsyncSubstrate({ client: xsync })
const runtime = createXFlowRuntime({
node: detectLocalNode({
id: `browser-${crypto.randomUUID()}`,
capabilities: ["browser-tab"],
}),
substrate,
})
// The browser registers an executor for an interactive review step.
// Server peers won't compete because placement excludes them.
runtime.register("review.confirm", async (ctx, input) => {
const decision = await window.prompt("Approve?")
return { approved: decision === "yes", input }
})
h = await runtime.start({ workflow: reviewWorkflow, runId: params.runId })
setHandle(h)
})()
// Effect cleanups must return void, so discard whatever stop() returns.
return () => {
void h?.stop()
}
}, [params.runId])
const run = useFlowRun(handle)
const reviewStep = useStep(handle, "review.confirm")
return (
<div>
<h1>Run {params.runId}</h1>
<p>Status: {run?.status ?? "loading"}</p>
<p>Review step: {reviewStep?.status ?? "—"}</p>
</div>
)
}
Recipe 4
Docker / VPS peer for long-running compute.
ffmpeg, GPU inference, Playwright, large Rust builds — anything that doesn't fit in a serverless function. Lease-mode claims with TTL renewal.
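Lease-mode claims with TTL renewal can be pictured as follows. This is a sketch under assumptions, not xFlow's claim protocol: LeaseTable is a hypothetical in-memory structure, whereas a real substrate would record claims as events in the shared log. The clock is injectable so the behaviour can be checked deterministically.

```typescript
type Lease = { holder: string; expiresAt: number }

// Hypothetical in-memory lease table; a clock function is injected for testing.
class LeaseTable {
  private leases = new Map<string, Lease>()
  private now: () => number

  constructor(now: () => number = () => Date.now()) {
    this.now = now
  }

  // A claim succeeds if the step is unheld, the lease has expired,
  // or the same holder is claiming again.
  claim(stepId: string, holder: string, ttlMs: number): boolean {
    const current = this.leases.get(stepId)
    const t = this.now()
    if (current && current.expiresAt > t && current.holder !== holder) return false
    this.leases.set(stepId, { holder, expiresAt: t + ttlMs })
    return true
  }

  // Renewal only works for the current holder while the lease is still live.
  renew(stepId: string, holder: string, ttlMs: number): boolean {
    const current = this.leases.get(stepId)
    const t = this.now()
    if (!current || current.holder !== holder || current.expiresAt <= t) return false
    current.expiresAt = t + ttlMs
    return true
  }
}

let clock = 0
const table = new LeaseTable(() => clock)

const workerAClaimed = table.claim("transcode", "worker-a", 1_000) // true
const workerBBlocked = !table.claim("transcode", "worker-b", 1_000) // true: lease is live
clock = 500
const renewed = table.renew("transcode", "worker-a", 1_000) // true: now expires at 1500
clock = 1_600
const workerBTookOver = table.claim("transcode", "worker-b", 1_000) // true: lease expired
console.log(workerAClaimed, workerBBlocked, renewed, workerBTookOver)
```

A long-running executor would call renew on a timer well inside the TTL; if the worker dies, renewals stop and another peer can take over once the lease lapses.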
Long-running worker
Filesystem store for the hot path, S3WORM for cold replication, WebSocket transport to the rest of the cluster. Two executors for media processing — both run as long as they need to.
// docker-worker/index.ts — runs in a Docker / VPS / Fly / Railway container
import { createXSync } from "@decoperations/xsync-client"
import { fsStore } from "@decoperations/xsync-store-fs"
import { xflowS3WormStore } from "@decoperations/xflow-s3worm"
import { websocket } from "@decoperations/xsync-transport-ws"
import {
xsyncSubstrate,
flowRunView,
} from "@decoperations/xflow-substrate-xsync"
import { createXFlowRuntime } from "@decoperations/xflow-runtime"
const xsync = await createXSync({
// Local FS for hot path; S3WORM for cold replication
store: fsStore({ path: "/data/xsync" }),
cold: xflowS3WormStore({
bucket: "xflow-prod",
region: "us-east-1",
rootPrefix: "tenants/acme",
}),
transports: [websocket({ url: process.env.XSYNC_HUB_URL! })],
views: { flowRun: flowRunView },
})
const substrate = xsyncSubstrate({ client: xsync })
const runtime = createXFlowRuntime({
node: {
id: `render-${process.env.HOSTNAME ?? "1"}`,
kind: "docker",
capabilities: ["docker", "ffmpeg-binary", "gpu"],
resources: {
cpuScore: 16,
memoryMb: 32_000,
longRunning: true,
background: true,
persistentStorage: true,
},
authority: {
canStartWorkflows: false,
canIssueArtifacts: true,
},
trust: { level: "trusted" },
status: { online: true },
},
substrate,
})
runtime.register("media.transcode", async (ctx, input: { sourceUrl: string }) => {
await ctx.progress({ phase: "downloading", url: input.sourceUrl })
const local = await downloadToTemp(input.sourceUrl)
await ctx.progress({ phase: "transcoding" })
const out = await runFfmpeg(local, "/tmp/out.mp4")
await ctx.progress({ phase: "uploading" })
return { kind: "artifact", ref: await uploadToR2(out) }
})
runtime.register("media.thumbnail", async (ctx, input: { videoUrl: string }) => {
// Long-running step. No 800s function cap because we're not on a function.
return await extractThumbnails(input.videoUrl, { count: 24 })
})
console.log(`docker peer: ${runtime.registry.types().length} executors registered`)
Recipe 5
Tenant-provided workflow definitions.
Definitions are JSON-serialisable data. Persist them per-tenant; load and run on demand. AI-generated workflows are the same shape — just data going through defineWorkflow().
Upload, persist, run
POST a definition; defineWorkflow validates it; store it; later load and call runtime.start() with it. No deploy, no SWC plugin, no separate product layer.
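What "validates structurally" might involve can be sketched independently of xFlow. The checks below (string id and version, steps as an object, links that reference known steps) are assumptions about reasonable validation, not a description of what defineWorkflow does. The from and to field names on links are also hypothetical.

```typescript
// Hypothetical, simplified definition shape for illustration only.
type StepDef = { id: string; type: string }
type LinkDef = { from: string; to: string }
type Definition = {
  id: string
  version: string
  steps: Record<string, StepDef>
  links: LinkDef[]
}

function hasOwn(obj: object, key: string): boolean {
  return Object.prototype.hasOwnProperty.call(obj, key)
}

// Throws on malformed input; returns a typed definition otherwise.
function validateDefinition(raw: unknown): Definition {
  if (typeof raw !== "object" || raw === null) throw new Error("definition must be an object")
  const { id, version, steps, links } = raw as Partial<Definition>
  if (typeof id !== "string" || typeof version !== "string") {
    throw new Error("id and version must be strings")
  }
  if (typeof steps !== "object" || steps === null) throw new Error("steps must be an object")
  if (!Array.isArray(links)) throw new Error("links must be an array")
  for (const [key, step] of Object.entries(steps)) {
    if (!step || typeof step.type !== "string") throw new Error(`step "${key}" needs a type`)
  }
  for (const l of links) {
    if (!hasOwn(steps, l.from) || !hasOwn(steps, l.to)) {
      throw new Error(`link ${l.from} -> ${l.to} references an unknown step`)
    }
  }
  return { id, version, steps, links }
}

// Tenant-supplied JSON arrives as untyped data.
const uploaded: unknown = JSON.parse(`{
  "id": "tenant.summarise",
  "version": "1.0.0",
  "steps": {
    "fetch": { "id": "fetch", "type": "pdf.fetch" },
    "summarise": { "id": "summarise", "type": "llm.summarise" }
  },
  "links": [{ "from": "fetch", "to": "summarise" }]
}`)
const validated = validateDefinition(uploaded)
console.log(validated.id, Object.keys(validated.steps).length)
```

Validating at upload time keeps malformed tenant or AI-generated definitions out of storage, so the run endpoint can trust whatever it loads.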
// app/api/tenants/[tenant]/workflows/route.ts
import { defineWorkflow, type WorkflowDefinition } from "@decoperations/xflow-core"
import { db } from "@/server/db"
export async function POST(req: Request, { params }: { params: { tenant: string } }) {
const body = (await req.json()) as WorkflowDefinition
// defineWorkflow validates structurally; throws on bad shape.
const def = defineWorkflow(body)
await db.tenantWorkflows.put({
tenantId: params.tenant,
workflowId: def.id,
version: def.version,
definition: def,
createdAt: new Date().toISOString(),
})
return Response.json({ id: def.id, version: def.version })
}
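// app/api/tenants/[tenant]/runs/route.ts
// Separate route file, so the handler is exported as POST and needs its own db import.
import { db } from "@/server/db"
import { runtime, substrate } from "@/server/runtime"
export async function POST(req: Request, { params }: { params: { tenant: string } }) {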
const { workflowId, input } = (await req.json()) as { workflowId: string; input: unknown }
const def = await db.tenantWorkflows.find(params.tenant, workflowId)
if (!def) return Response.json({ error: "not-found" }, { status: 404 })
const run = await runtime.start({ workflow: def.definition, input, substrate })
return Response.json({ runId: run.id })
}
// AI-generated workflow definitions are just data; validate them the same way.
// (llm is a placeholder for your own model client.)
const generatedDef = defineWorkflow(
await llm.generateWorkflow({
goal: "summarise PDFs and email the user",
}),
)
await db.tenantWorkflows.put({
tenantId: "acme",
workflowId: generatedDef.id,
version: generatedDef.version,
definition: generatedDef,
createdAt: new Date().toISOString(),
})
Recipe 6
Federate with Workflow SDK (WDK as one executor of many).
xFlow owns the outer run, the audit log, and multi-peer coordination. A specific step's executor delegates to a WDK workflow when you want WDK's deterministic-replay semantics for that part of the work.
xFlow step → WDK workflow
No bridge package needed — a step executor can do anything, including calling WDK's start(). The xFlow run remains the source of truth for lifecycle and audit.
// app/server/workflows/wdk-federate.ts
//
// Pattern: an xFlow workflow has a step whose executor delegates
// the actual work to a WDK ("use workflow") workflow. xFlow owns the
// run identity, audit trail, multi-peer coordination, and browser view.
// WDK does what it's best at: deterministic-replay server-side execution.
import { start } from "workflow/api" // peer dep on the user's WDK install
import { defineWorkflow, defineStep, link } from "@decoperations/xflow-core"
import { runtime } from "@/server/runtime"
// 1) Author the WDK side as you would normally:
async function generateReport(input: { userId: string }) {
"use workflow"
const data = await fetchUserData(input.userId)
const draft = await draftReport(data)
return await renderPdf(draft)
}
async function fetchUserData(userId: string) {
"use step"
return await db.users.find(userId)
}
async function draftReport(data: unknown) {
"use step"
return await openai.responses.create({ input: data })
}
async function renderPdf(draft: unknown) {
"use step"
return await renderPdfBuffer(draft)
}
// 2) Wrap the WDK workflow as an xFlow step executor.
//
// The xFlow step has its own claim, lifecycle, and audit log.
// Its body just happens to drive a WDK workflow underneath.
runtime.register("billing.generate-report", async (ctx, input: { userId: string }) => {
await ctx.progress({ phase: "starting-wdk-run" })
const wdkRun = await start(generateReport, [input])
// Poll or await WDK's completion API depending on your WDK version.
const pdf = await wdkRun.output()
await ctx.progress({ phase: "wdk-run-completed" })
return { kind: "value", value: pdf }
})
// 3) Author the xFlow workflow that orchestrates the federation:
export const billingFlow = defineWorkflow({
id: "billing.monthly",
version: "1.0.0",
steps: {
fetchUsers: defineStep({
id: "fetchUsers",
type: "billing.fetch-users",
sideEffects: { kind: "external", idempotencyRequired: true },
}),
report: defineStep({
id: "report",
type: "billing.generate-report",
claim: { mode: "lease", ttlMs: 30 * 60_000 },
placement: { required: ["server"] },
sideEffects: { kind: "external", idempotencyRequired: true },
}),
email: defineStep({
id: "email",
type: "billing.email",
claim: { mode: "authority", authorityActorId: "server-prod" },
sideEffects: { kind: "irreversible", idempotencyRequired: true },
}),
},
links: [
link("fetchUsers", "report", { when: { type: "step.succeeded" } }),
link("report", "email", { when: { type: "step.succeeded" } }),
],
})
Why federation, not a substrate adapter?
A true xflow-substrate-wdk-world would implement the xFlow substrate contract against WDK's @workflow/world interface, putting xFlow events directly into the same store WDK uses. That is a real Phase 3 deliverable, but it requires reading WDK's World interface against its actual source and validating the event-type model against xFlow's lifecycle. It ships once that source review is done.
Federation gives you practical interop today: keep your WDK runs running, and give them an xFlow audit log and multi-peer wrapper. No special adapter and no fragile mapping, just an executor that drives a WDK run.
Recipe 7
Authoring with XState.
If you already model state with XState v5, fromXState() converts a machine config into an xFlow WorkflowDefinition: invoke.src becomes step.type, onDone/onError become success/failure links, and on.<event> becomes external-signal links.
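The mapping rules above can be sketched as a small standalone function. This is not the fromXState() implementation: toGraph, the simplified MachineConfig type, and the "step.failed" condition name are assumptions made for illustration, and the sketch only emits steps for states that have an invoke.

```typescript
// Simplified subset of an XState-style config; hypothetical types for illustration.
type StateNode = {
  invoke?: { src: string; onDone?: string; onError?: string }
  on?: Record<string, string>
  type?: "final"
}
type MachineConfig = { id: string; initial: string; states: Record<string, StateNode> }

type Step = { id: string; type: string }
type When =
  | { type: "step.succeeded" }
  | { type: "step.failed" } // assumed name for the failure condition
  | { type: "external-signal"; signal: string }
type Link = { from: string; to: string; when: When }

function toGraph(config: MachineConfig): { steps: Step[]; links: Link[] } {
  const steps: Step[] = []
  const links: Link[] = []
  for (const [name, node] of Object.entries(config.states)) {
    if (node.invoke) {
      // invoke.src becomes the step type; the state name becomes the step id.
      steps.push({ id: name, type: node.invoke.src })
      if (node.invoke.onDone) {
        links.push({ from: name, to: node.invoke.onDone, when: { type: "step.succeeded" } })
      }
      if (node.invoke.onError) {
        links.push({ from: name, to: node.invoke.onError, when: { type: "step.failed" } })
      }
    }
    // on.<event> becomes an external-signal link named after the event.
    for (const [signal, target] of Object.entries(node.on ?? {})) {
      links.push({ from: name, to: target, when: { type: "external-signal", signal } })
    }
  }
  return { steps, links }
}

const graph = toGraph({
  id: "onboard",
  initial: "verifyEmail",
  states: {
    verifyEmail: { invoke: { src: "auth.verify-email", onDone: "approval", onError: "failed" } },
    approval: { on: { approved: "done", rejected: "failed" } },
    done: { type: "final" },
    failed: { type: "final" },
  },
})
console.log(graph.steps.length, graph.links.length) // 1 4
```

How the real bridge represents signal-only and final states is not covered by this sketch; check the package documentation before relying on any particular output shape.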
XState → WorkflowDefinition
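The bridge itself needs no XState runtime: it takes the plain config object and produces an xFlow definition. The example below imports setup from xstate only to author and type the machine. Run the result with the same runtime.start() you would use for any other workflow.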
// workflows/onboard.ts
import { setup } from "xstate"
import { fromXState } from "@decoperations/xflow-xstate"
const onboardMachine = setup({
types: {} as { context: { userId: string }; events: { type: "approved" } | { type: "rejected" } },
}).createMachine({
id: "onboard",
initial: "verifyEmail",
states: {
verifyEmail: {
invoke: {
src: "auth.verify-email",
onDone: "kyc",
onError: "failed",
},
},
kyc: {
invoke: {
src: "compliance.kyc",
onDone: "approval",
onError: "failed",
},
},
approval: {
on: {
approved: "provision",
rejected: "failed",
},
},
provision: {
invoke: {
src: "billing.provision-account",
onDone: "done",
onError: "failed",
},
},
done: { type: "final" },
failed: { type: "final" },
},
})
// fromXState() converts the machine config into an xFlow WorkflowDefinition.
// invoke.src becomes step.type; onDone becomes step.succeeded link;
// on.approved becomes external-signal link with signal "approved".
export const onboardWorkflow = fromXState(onboardMachine.config, {
id: "onboard",
version: "1.0.0",
defaultSideEffects: { kind: "external", idempotencyRequired: true },
})
Recipe 8
Migrating from WDK directives.
Direct translations for the four most common WDK patterns: workflow body, sleep / waitForEvent, start trigger, and webhooks. There is also a wdk-compat package for teams who want to keep imperative authoring while moving to xFlow's substrate.
Translations
Side-by-side: WDK directive → xFlow declarative graph + executor registration.
// Pattern A — "use workflow" / "use step" ↔ defineWorkflow + defineStep + link
//
// Workflow SDK (WDK):
async function publishPost(input: PostInput) {
"use workflow"
const draft = await generateDraft(input)
const reviewed = await reviewDraft(draft)
return await publish(reviewed)
}
async function generateDraft(i) { "use step"; /* ... */ }
async function reviewDraft(d) { "use step"; /* ... */ }
async function publish(p) { "use step"; /* ... */ }
// xFlow:
import { defineWorkflow, defineStep, link } from "@decoperations/xflow-core"
const publishPost = defineWorkflow({
id: "post.publish",
version: "1.0.0",
steps: {
generateDraft: defineStep({
id: "generateDraft", type: "post.generate-draft",
sideEffects: { kind: "external", idempotencyRequired: true },
}),
reviewDraft: defineStep({
id: "reviewDraft", type: "post.review-draft",
sideEffects: { kind: "external", idempotencyRequired: true },
}),
publish: defineStep({
id: "publish", type: "post.publish",
claim: { mode: "authority", authorityActorId: "server-prod" },
sideEffects: { kind: "irreversible", idempotencyRequired: true },
}),
},
links: [
link("generateDraft", "reviewDraft"),
link("reviewDraft", "publish"),
],
})
// Step bodies become executors registered against the runtime:
runtime.register("post.generate-draft", async (ctx, input) => { /* ... */ })
runtime.register("post.review-draft", async (ctx, input) => { /* ... */ })
runtime.register("post.publish", async (ctx, input) => { /* ... */ })
// Pattern B — sleep / waitForEvent
//   WDK:                                  xFlow:
//   await sleep("1d")                     link(prev, next, { when: { type: "timer", afterMs: 86_400_000 } })
//   await ctx.waitForEvent("approved")    link(prev, next, { when: { type: "external-signal", signal: "approved" } })
// Pattern C — start() trigger
//   WDK:                                  xFlow:
//   await start(myWorkflow, [input])      await runtime.start({ workflow: myWorkflow, input })
// Pattern D — webhooks / hooks
//   WDK:                                  xFlow:
//   hook /workflow/v1/webhook/<token>     substrate.append(runId, "xflow.signal.received", { signal, ... })
Imperative authoring via xflow-wdk-compat
For teams that prefer the directive ergonomics, the wdk-compat package provides runImperative + step. Note: this uses Inngest-style step memoization (the function body re-runs and completed steps short-circuit), not WDK's deterministic-replay sandbox. The trade-offs are documented in the package README.
// Authoring style closer to WDK directives — for teams porting from WDK.
//
// xflow-wdk-compat uses the Inngest-style memoization model:
// the body re-runs from the top, completed steps short-circuit by returning
// their persisted output. NOT deterministic replay (xFlow has no sandbox).
//
// Use this if the directive ergonomics matter more than declarative graphs.
// For new code, prefer defineWorkflow + defineStep + link directly.
import { runImperative, step } from "@decoperations/xflow-wdk-compat"
import { runtime } from "@/server/runtime"
await runImperative(
{
workflowId: "post.publish",
version: "1.0.0",
runtime,
},
async (input: PostInput) => {
const draft = await step("generateDraft", { type: "post.generate-draft" }, async () => {
return await llm.generateDraft(input)
})
const reviewed = await step("reviewDraft", { type: "post.review-draft" }, async () => {
return await llm.review(draft)
})
const published = await step(
"publish",
{ type: "post.publish", claim: { mode: "authority", authorityActorId: "server-prod" } },
async () => {
return await social.post(reviewed)
},
)
return published
},
)
When to use: teams porting from WDK whose code reads naturally as a single async function. Easier onramp; same xFlow substrate underneath.
When to skip: new code. The declarative graph (defineWorkflow + defineStep + link) is more inspectable, easier to visualize, and avoids the subtleties of body re-runs.
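The "body re-runs, completed steps short-circuit" model is easiest to see in a toy version. The sketch below is not xflow-wdk-compat. makeStep and the journal Map are hypothetical, and steps are synchronous for brevity (the real package uses async step bodies).

```typescript
// Persisted outputs of completed steps, keyed by step id.
// A Map stands in for durable storage here.
type StepJournal = Map<string, unknown>

function makeStep(journal: StepJournal) {
  return function step<T>(id: string, fn: () => T): T {
    if (journal.has(id)) return journal.get(id) as T // completed earlier: short-circuit
    const output = fn() // if this throws, nothing is recorded and the step runs again next time
    journal.set(id, output)
    return output
  }
}

const journal: StepJournal = new Map()
const calls: string[] = []
let failPublishOnce = true

function body(step: ReturnType<typeof makeStep>): string {
  const draft = step("generateDraft", () => {
    calls.push("generateDraft")
    return "draft"
  })
  const reviewed = step("reviewDraft", () => {
    calls.push("reviewDraft")
    return `${draft}+reviewed`
  })
  return step("publish", () => {
    calls.push("publish")
    if (failPublishOnce) {
      failPublishOnce = false
      throw new Error("transient failure")
    }
    return `${reviewed}+published`
  })
}

// The first attempt fails inside "publish"; the body is then re-run from the top.
let result: string
try {
  result = body(makeStep(journal))
} catch {
  result = body(makeStep(journal))
}
console.log(result) // "draft+reviewed+published"
console.log(calls) // ["generateDraft", "reviewDraft", "publish", "publish"]
```

Note that any code placed outside a step() call runs again on every attempt. That is the main subtlety of body re-runs, and the reason side effects belong inside steps.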
Closing
Recipes are starting points, not ceilings.
The recipes above each cover one production-shaped concern. Most real systems combine several — e.g., recipe 2 (xSync + S3WORM) plus recipe 3 (browser peer) plus recipe 4 (Docker peer) for a full media pipeline. The substrate is shared; the workflow definition is shared; only the executor registrations differ per peer.
For the architectural background see the runtimes & worlds guide. For the pain points each recipe addresses, see the WDK gaps page.