Core Concepts
Workflows
A workflow is a durable function that breaks complex operations into discrete, resumable steps. Define workflows using the workflow() function:
const myWorkflow = workflow(
'workflow-id',
async ({ step, input }) => {
// Your workflow logic here
},
{
inputSchema: mySchema, // any Standard Schema-compliant schema
timeout: 60000, // milliseconds
retries: 3,
},
)
Steps
Steps are the building blocks of durable workflows. Each step is executed exactly once, even if the workflow is retried:
await step.run('step-id', async () => {
// This will only execute once — the result is persisted in Postgres
return { result: 'data' }
})
Step IDs must be unique within a workflow. When using loops, use dynamic IDs: step.run(\process-${item.id}`, …)`.
Event-Driven Steps
Wait for external events to pause and resume workflows without consuming resources:
const eventData = await step.waitFor('wait-step', {
eventName: 'payment-completed',
timeout: 5 * 60 * 1000, // 5 minutes
})
Send events from outside the workflow:
await engine.triggerEvent({
runId: run.id,
eventName: 'payment-completed',
data: { amount: 99 },
})
Scheduled & Delay Steps
Wait until a specific time, or delay for a duration (sugar over waitUntil). If the date is in the past, the step runs immediately.
// Wait until a specific date (Date, ISO string, or { date })
await step.waitUntil('scheduled-step', new Date('2025-06-01'))
await step.waitUntil('scheduled-step', '2025-06-01T12:00:00.000Z')
await step.waitUntil('scheduled-step', { date: new Date('2025-06-01') })
// Delay for a duration (string or object). sleep is an alias of delay.
await step.delay('cool-off', '3 days')
await step.delay('cool-off', { days: 3 })
await step.delay('ramp-up', '2 days 12 hours')
await step.sleep('backoff', '1 hour')
Polling Steps
Repeatedly check a condition until it returns a truthy value or a timeout expires:
const result = await step.poll(
'wait-for-payment',
async () => {
const payment = await getPaymentStatus(input.paymentId)
return payment.completed ? payment : false
},
{ interval: '1 minute', timeout: '24 hours' },
)
if (result.timedOut) {
return { status: 'expired' }
}
return { status: 'paid', payment: result.data }
conditionFn returns false to keep polling, or a truthy value to resolve the step. The minimum interval is 30s (default). If timeout is omitted the step polls indefinitely.
Child Workflows
Start a child workflow from inside a parent workflow and wait for it to complete without keeping a worker busy:
const parent = workflow('parent-workflow', async ({ step, input }) => {
const childOutput = await step.invokeChildWorkflow('run-child', childWorkflowRef, {
userId: input.userId,
});
return { childOutput };
});
step.invokeChildWorkflow is durable. Unlike startWorkflow(), which creates a top-level run and returns immediately, invokeChildWorkflow() is a child call: the child run is started once for the parent step, the parent pauses while the child runs, and the child output is cached on the parent timeline when it completes. If the child fails or is cancelled, the parent step throws and follows the parent workflow’s normal retry/failure behavior.
You can also invoke by workflow ID:
const result = await step.invokeChildWorkflow<{ ok: true }>('run-child', {
workflowId: 'child-workflow',
input: { userId: input.userId },
});
Behavioral notes
- Cancellation does not propagate to children. Cancelling a parent (via
cancelWorkflowor a parent timeout) does not cancel any in-flight child workflows started viainvokeChildWorkflow. Children run to their own terminal state; the wakeup event the child would normally send to the parent is dropped because the parent is no longer inpaused. The same applies if the parent reaches any other terminal state (failed or completed) while a child is in flight. - Manual resume and fast-forward do not skip child waits.
resumeWorkflow()andfastForwardWorkflow()are no-ops while a parent is paused onstep.invokeChildWorkflow(). The parent only moves forward when the child completes, fails, or is cancelled.
Resource ID
The optional resourceId associates a workflow run with an external entity in your application — a user, an order, a subscription, or any domain object the workflow operates on. It serves two purposes:
- Association — links each workflow run to the business entity it belongs to, so you can query all runs for a given resource.
- Scoping — when provided, all read and write operations (get, update, pause, resume, cancel, trigger events) include
resource_idin their database queries, ensuring you only access workflow runs that belong to that resource. Useful for enforcing tenant isolation or ownership checks.
resourceId is optional on every API method. If you don’t need to group or scope runs by an external entity, you can omit it entirely and use runId alone.
// Start a workflow scoped to a specific user
const run = await engine.startWorkflow({
workflowId: 'send-welcome-email',
resourceId: 'user-123', // ties this run to user-123
input: { email: 'user@example.com' },
})
// Later, list all workflow runs for that user
const { items } = await engine.getRuns({
resourceId: 'user-123',
})
Idempotency Key
Pass an optional idempotencyKey to startWorkflow() when the same logical start might be requested more than once (user double-clicks, API retries, or at-least-once webhooks). The engine stores the key on the run; a second startWorkflow with the same key returns the existing run and does not enqueue a second job.
Keys are globally unique in the database (up to 256 characters), not scoped per workflow or resource. Prefer stable, namespaced strings so different workflows never collide — for example send-welcome-email:order-123 instead of a bare order id.
const run = await engine.startWorkflow({
workflowId: 'send-welcome-email',
input: { email: 'user@example.com' },
idempotencyKey: 'send-welcome-email:checkout-session_cs_abc123',
})
// Idempotent: returns the same run and run.id as above
const again = await engine.startWorkflow({
workflowId: 'send-welcome-email',
input: { email: 'other@example.com' }, // ignored for deduplication
idempotencyKey: 'send-welcome-email:checkout-session_cs_abc123',
})
The returned WorkflowRun includes idempotencyKey (or null if omitted).
Pause and Resume
Manually pause a workflow and resume it later:
// Pause inside a workflow
await step.pause('pause-step')
// Resume from outside the workflow
await engine.resumeWorkflow({
runId: run.id,
resourceId: 'resource-123',
})
resumeWorkflow() does not force a parent past a step.invokeChildWorkflow() wait. Child workflow waits resume only when the child completes, fails, or is cancelled.
Fast-Forward
Skip the current waiting step and immediately resume execution. fastForwardWorkflow inspects the paused step and dispatches the right internal action — triggerEvent for waitFor, timeout triggers for delay/waitUntil, resume for pause, and direct output writes for poll. If the workflow is not paused or is paused on step.invokeChildWorkflow(), it’s a no-op.
Useful for testing, debugging, or manually advancing workflows past long waits.
// Fast-forward a waitFor step, providing mock event data
await engine.fastForwardWorkflow({
runId: run.id,
resourceId: 'user-123',
data: { approved: true, reviewer: 'admin' },
})
// Fast-forward a delay/waitUntil step (no data needed)
await engine.fastForwardWorkflow({
runId: run.id,
resourceId: 'user-123',
})
// Fast-forward a poll step with mock result data
await engine.fastForwardWorkflow({
runId: run.id,
resourceId: 'user-123',
data: { paymentId: 'pay_123', status: 'completed' },
})
| Paused step type | Behavior |
|---|---|
step.waitFor() |
Triggers the event with data (defaults to {}) |
step.delay() / step.waitUntil() |
Triggers the timeout event to skip the wait |
step.poll() |
Writes data as the poll result and triggers resolution |
step.pause() |
Delegates to resumeWorkflow() |
step.invokeChildWorkflow() |
No-op; child completion, failure, or cancellation controls the parent result |
Input Validation
pg-workflows supports any Standard Schema-compliant validation library for inputSchema — Zod, Valibot, ArkType, or any library that implements the spec. When a schema is provided, the workflow input is validated before execution and the handler’s input parameter is fully typed.
With Zod
import { workflow } from 'pg-workflows'
import { z } from 'zod'
const myWorkflow = workflow(
'user-onboarding',
async ({ step, input }) => {
// input is typed as { email: string; name: string }
await step.run('send-welcome', async () => {
return await sendEmail(input.email, `Welcome, ${input.name}!`)
})
},
{
inputSchema: z.object({
email: z.string().email(),
name: z.string(),
}),
},
)
With Valibot
import { workflow } from 'pg-workflows'
import * as v from 'valibot'
const myWorkflow = workflow(
'user-onboarding',
async ({ step, input }) => {
// input is typed as { email: string; name: string }
await step.run('send-welcome', async () => {
return await sendEmail(input.email, `Welcome, ${input.name}!`)
})
},
{
inputSchema: v.object({
email: v.pipe(v.string(), v.email()),
name: v.string(),
}),
},
)
Without a Schema
When no inputSchema is provided, input is not validated and input is typed as unknown. The engine has no guarantee about the shape of the data — it passes through whatever was provided to startWorkflow(). You are responsible for narrowing the type yourself:
import { workflow } from 'pg-workflows'
const myWorkflow = workflow(
'process-order',
async ({ step, input }) => {
// Option 1: Type assertion - you trust the caller
const { orderId, amount } = input as { orderId: string; amount: number }
await step.run('charge', async () => {
return await chargeOrder(orderId, amount)
})
},
)
const myDefensiveWorkflow = workflow(
'process-order-safe',
async ({ step, input }) => {
// Option 2: Runtime checks - you verify before using
if (typeof input !== 'object' || input === null) {
throw new Error('Expected input to be an object')
}
const { orderId, amount } = input as Record<string, unknown>
if (typeof orderId !== 'string' || typeof amount !== 'number') {
throw new Error('Invalid input shape')
}
await step.run('charge', async () => {
return await chargeOrder(orderId, amount)
})
},
)
Using an inputSchema is recommended — it validates input at the engine boundary before your handler runs, and gives you full type inference with no manual work.