API Reference

WorkflowEngine

The full engine: it registers workflows, runs workers, and executes steps. Use it in your worker service or in a single-service setup.

Constructor

// With connection string (engine creates and owns the pool)
const engine = new WorkflowEngine({
  connectionString: string,          // PostgreSQL connection string
  workflows?: WorkflowDefinition[],  // Optional: register workflows on init
  logger?: WorkflowLogger,           // Optional: custom logger
  boss?: PgBoss,                     // Optional: bring your own pg-boss instance
})

// With existing pool (you manage the pool lifecycle)
const engine = new WorkflowEngine({
  pool: pg.Pool,                     // Your pg.Pool instance
  workflows?: WorkflowDefinition[],
  logger?: WorkflowLogger,
  boss?: PgBoss,
})

Pass exactly one of connectionString or pool. When connectionString is used, the engine creates the pool internally and closes it on stop(). When pool is used, you manage the pool lifecycle yourself.

When boss is omitted, pg-boss is created automatically with an isolated schema (pgboss_v12_pgworkflow) to avoid conflicts with other pg-boss installations.
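The "exactly one of connectionString or pool" rule and the pool-ownership behavior can be sketched as below. This is a minimal illustration, not the engine's actual code: `PoolLike`, `ConnectionOptions`, `resolvePool`, and `createPool` are hypothetical names, and `PoolLike` stands in for `pg.Pool` so the sketch runs without a database.

```typescript
// Minimal stand-in for pg.Pool (hypothetical; only the method used here).
interface PoolLike {
  end(): Promise<void>
}

// Compile-time version of the rule: exactly one of the two fields may be set.
type ConnectionOptions =
  | { connectionString: string; pool?: never }
  | { pool: PoolLike; connectionString?: never }

interface ResolvedPool {
  pool: PoolLike
  ownsPool: boolean // true when the pool was created here and must be closed on stop()
}

// Hypothetical helper. `createPool` stands in for `new pg.Pool({ connectionString })`.
// The loose parameter type lets the runtime check also guard untyped callers.
function resolvePool(
  options: { connectionString?: string; pool?: PoolLike },
  createPool: (connectionString: string) => PoolLike,
): ResolvedPool {
  const hasString = options.connectionString !== undefined
  const hasPool = options.pool !== undefined
  if (hasString === hasPool) {
    throw new Error('Pass exactly one of connectionString or pool')
  }
  if (options.pool !== undefined) {
    // Caller-supplied pool: the caller keeps responsibility for closing it.
    return { pool: options.pool, ownsPool: false }
  }
  return { pool: createPool(options.connectionString as string), ownsPool: true }
}

const fakePool: PoolLike = { end: async () => {} }
const validOptions: ConnectionOptions = { connectionString: 'postgres://localhost/app' }

const owned = resolvePool(validOptions, () => fakePool)
const borrowed = resolvePool({ pool: fakePool }, () => fakePool)
```

Tracking an `ownsPool` flag is one simple way to decide at shutdown whether `pool.end()` should be called. The library may implement this differently.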

Methods

| Method | Description |
| --- | --- |
| start(asEngine?, options?) | Start the engine and workers |
| stop() | Stop the engine gracefully |
| registerWorkflow(definition) | Register a workflow definition |
| startWorkflow(ref, input, options?) | Start a top-level workflow run using a typed ref (see WorkflowRef) |
| startWorkflow({ workflowId, resourceId?, input, idempotencyKey?, options? }) | Start a top-level workflow run by ID. resourceId optionally ties the run to an external entity (see Resource ID). idempotencyKey optionally deduplicates starts (see Idempotency Key). |
| pauseWorkflow({ runId, resourceId? }) | Pause a running workflow |
| resumeWorkflow({ runId, resourceId?, options? }) | Resume a paused workflow. No-ops for step.invokeChildWorkflow() waits. |
| cancelWorkflow({ runId, resourceId? }) | Cancel a workflow |
| triggerEvent({ runId, resourceId?, eventName, data?, options? }) | Send an event to a workflow |
| fastForwardWorkflow({ runId, resourceId?, data? }) | Skip the current waiting step and resume execution. No-ops for step.invokeChildWorkflow() waits. |
| getRun({ runId, resourceId? }) | Get workflow run details |
| checkProgress({ runId, resourceId? }) | Get workflow progress |
| getRuns(filters) | List workflow runs with pagination |

WorkflowClient

A lightweight client for API services in a microservices setup. Starts and manages workflow runs without importing handler code. Import from pg-workflows/client.

Constructor

import { WorkflowClient } from 'pg-workflows/client'

const client = new WorkflowClient({
  connectionString: string,  // or pool: pg.Pool
  logger?: WorkflowLogger,
})

Methods

| Method | Description |
| --- | --- |
| start() | Connect to the database (called automatically on first use) |
| stop() | Close the connection |
| startWorkflow(ref, input, options?) | Start a top-level workflow run using a typed ref |
| startWorkflow({ workflowId, input, resourceId?, options? }) | Start a top-level workflow run by ID |
| pauseWorkflow({ runId, resourceId? }) | Pause a running workflow |
| resumeWorkflow({ runId, resourceId?, options? }) | Resume a paused workflow. No-ops for step.invokeChildWorkflow() waits. |
| cancelWorkflow({ runId, resourceId? }) | Cancel a workflow |
| triggerEvent({ runId, resourceId?, eventName, data?, options? }) | Send an event to a workflow |
| fastForwardWorkflow({ runId, resourceId?, data? }) | Skip the current waiting step. No-ops for step.invokeChildWorkflow() waits. |
| getRun({ runId, resourceId? }) | Get workflow run details |
| checkProgress({ runId, resourceId? }) | Get workflow progress |
| getRuns(filters) | List workflow runs with pagination |

WorkflowRef

A lightweight, callable reference that carries a workflow’s ID and input schema without any handler code. Created with createWorkflowRef() (importable from pg-workflows/client) or workflow.ref().

import { createWorkflowRef } from 'pg-workflows/client'
import { z } from 'zod'

// Create a ref — just an ID + schema, no handler
const myWorkflow = createWorkflowRef('my-workflow', {
  inputSchema: z.object({ email: z.string().email() }),
})

// Use in API service — type-safe input
await client.startWorkflow(myWorkflow, { email: 'user@example.com' })

// Use in worker service — call with a handler to get a full definition
const definition = myWorkflow(async ({ step, input }) => {
  await step.run('do-work', async () => {
    /* ... */
  })
})

Refs can also carry an output type for step.invokeChildWorkflow():

type ChildOutput = { ok: true }
const childWorkflow = createWorkflowRef<ChildOutput>('child-workflow')

const output = await step.invokeChildWorkflow('call-child', childWorkflow, {})
// output is ChildOutput
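To make the "callable reference" idea concrete, here is a rough sketch of how a function can carry an ID and schema as properties and return a full definition when called with a handler. It is not the library's implementation: `makeRef`, `Ref`, `Definition`, `Handler`, and the hand-written `Schema` interface are hypothetical names, and the schema is a plain object rather than zod so the block runs on its own.

```typescript
// Minimal schema shape (hypothetical); zod schemas expose a compatible `parse` method.
interface Schema<T> {
  parse(value: unknown): T
}

type Handler<I> = (ctx: { input: I }) => Promise<unknown>

interface Definition<I> {
  id: string
  inputSchema: Schema<I> | undefined
  handler: Handler<I>
}

// A ref is a function (handler -> definition) that also exposes `id` and `inputSchema`.
interface Ref<I> {
  (handler: Handler<I>): Definition<I>
  id: string
  inputSchema: Schema<I> | undefined
}

function makeRef<I>(id: string, options: { inputSchema?: Schema<I> } = {}): Ref<I> {
  const inputSchema = options.inputSchema
  const attach = (handler: Handler<I>): Definition<I> => ({ id, inputSchema, handler })
  // Object.assign copies the metadata onto the function object itself.
  return Object.assign(attach, { id, inputSchema })
}

// Hand-rolled schema used only for this sketch.
const emailSchema: Schema<{ email: string }> = {
  parse(value) {
    const email = (value as { email?: unknown } | null)?.email
    if (typeof email !== 'string' || !email.includes('@')) {
      throw new Error('invalid email')
    }
    return { email }
  },
}

const sketchRef = makeRef('my-workflow', { inputSchema: emailSchema })

// API side: only `id` and `inputSchema` are needed, no handler code.
const validated = sketchRef.inputSchema?.parse({ email: 'user@example.com' })

// Worker side: calling the ref with a handler yields a full definition.
const sketchDefinition = sketchRef(async ({ input }) => input.email.toUpperCase())
```

Because the ref itself holds no handler, an API service can import it without pulling in worker code, which is the property the section describes.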

workflow()

workflow<I extends Parameters>(
  id: string,
  handler: (context: WorkflowContext) => Promise<unknown>,
  options?: {
    inputSchema?: I,
    timeout?: number,
    retries?: number,
  }
): WorkflowDefinition<I>

WorkflowContext

The context object passed to workflow handlers:

{
  input: T,                          // Validated input data
  workflowId: string,                // Workflow ID
  runId: string,                     // Unique run ID
  timeline: Record<string, unknown>, // Step execution history
  logger: WorkflowLogger,            // Logger instance
  step: {
    run: <T>(stepId, handler) => Promise<T>,
    // without timeout: always returns event data T
    waitFor: <T>(stepId, { eventName, schema? }) => Promise<T>,
    // with timeout: returns event data T or undefined if timeout fires first
    waitFor: <T>(stepId, { eventName, timeout, schema? }) => Promise<T | undefined>,
    waitUntil: (stepId, date | dateString | { date }) => Promise<void>,
    delay: (stepId, duration) => Promise<void>,
    sleep: (stepId, duration) => Promise<void>,
    pause: (stepId) => Promise<void>,
    poll: <T>(stepId, conditionFn, { interval?, timeout? }) => Promise<{ timedOut: false; data: T } | { timedOut: true }>,
    // invokeChildWorkflow has two overloads:
    //   1) by typed `WorkflowRef<TInput, TOutput>` - return type is inferred
    invokeChildWorkflow: <TInput, TOutput>(stepId, ref: WorkflowRef<TInput, TOutput>, input, options?) => Promise<TOutput>,
    //   2) by workflow ID - explicit `<TOutput>` generic for the return
    invokeChildWorkflow: <TOutput>(stepId, { workflowId, input, resourceId?, idempotencyKey?, options? }) => Promise<TOutput>,
  }
}
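Two of the step return types above need narrowing before use: step.poll resolves to a union discriminated by timedOut, and step.waitFor with a timeout resolves to the event data or undefined. The sketch below shows that narrowing on standalone values. `PollResult`, `describePoll`, `describeWait`, and the payload shapes are illustrative names, not exports of the package.

```typescript
// Mirrors the return shape listed for step.poll (type name is hypothetical).
type PollResult<T> = { timedOut: false; data: T } | { timedOut: true }

function describePoll(result: PollResult<{ status: string }>): string {
  if (result.timedOut) {
    // In this branch `data` does not exist on the type.
    return 'gave up waiting'
  }
  // Here TypeScript has narrowed the union, so `data` is available.
  return `ready: ${result.data.status}`
}

// Mirrors step.waitFor with a timeout: event data, or undefined if the timeout fired first.
function describeWait(event: { approved: boolean } | undefined): string {
  if (event === undefined) {
    return 'timed out'
  }
  return event.approved ? 'approved' : 'rejected'
}

const pollMessages = [
  describePoll({ timedOut: false, data: { status: 'shipped' } }),
  describePoll({ timedOut: true }),
]
const waitMessages = [describeWait({ approved: true }), describeWait(undefined)]
```

Inside a handler, the same checks would apply to the values returned by `await step.poll(...)` and `await step.waitFor(...)`.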

startWorkflow() creates a top-level run and returns immediately. step.invokeChildWorkflow() starts a child run from inside a workflow, pauses the parent, and resolves with the child output when the child reaches a terminal state.

duration is either a string (e.g. '3 days', '2h') or an object ({ weeks?, days?, hours?, minutes?, seconds? }). See the Duration type and the parseDuration function exported by the package.
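As an illustration of the two duration forms, the following sketch converts either one to milliseconds. It is not the package's parseDuration: `toMilliseconds`, the unit aliases, and the accepted string grammar (a number followed by one unit) are assumptions made for this example, and the real parser may accept more or fewer formats.

```typescript
type DurationObject = {
  weeks?: number
  days?: number
  hours?: number
  minutes?: number
  seconds?: number
}
type Duration = string | DurationObject

const MS_PER_UNIT = {
  seconds: 1000,
  minutes: 60 * 1000,
  hours: 60 * 60 * 1000,
  days: 24 * 60 * 60 * 1000,
  weeks: 7 * 24 * 60 * 60 * 1000,
} as const
type Unit = keyof typeof MS_PER_UNIT

// Assumed spellings; a Map avoids accidental hits on Object.prototype keys.
const UNIT_ALIASES = new Map<string, Unit>([
  ['s', 'seconds'], ['sec', 'seconds'], ['second', 'seconds'], ['seconds', 'seconds'],
  ['m', 'minutes'], ['min', 'minutes'], ['minute', 'minutes'], ['minutes', 'minutes'],
  ['h', 'hours'], ['hour', 'hours'], ['hours', 'hours'],
  ['d', 'days'], ['day', 'days'], ['days', 'days'],
  ['w', 'weeks'], ['week', 'weeks'], ['weeks', 'weeks'],
])

// Hypothetical helper, not the library's parseDuration.
function toMilliseconds(duration: Duration): number {
  if (typeof duration === 'string') {
    // One number followed by one unit, with optional whitespace: '3 days', '2h'.
    const match = /^\s*(\d+(?:\.\d+)?)\s*([a-z]+)\s*$/i.exec(duration)
    const amount = match?.[1]
    const unit = UNIT_ALIASES.get((match?.[2] ?? '').toLowerCase())
    if (amount === undefined || unit === undefined) {
      throw new Error(`Unrecognized duration: ${duration}`)
    }
    return Number(amount) * MS_PER_UNIT[unit]
  }
  // Object form: sum every field that is present.
  return (Object.keys(MS_PER_UNIT) as Unit[]).reduce(
    (total, unit) => total + (duration[unit] ?? 0) * MS_PER_UNIT[unit],
    0,
  )
}

const threeDays = toMilliseconds('3 days')                  // 259200000
const twoHours = toMilliseconds('2h')                       // 7200000
const ninetyMinutes = toMilliseconds({ hours: 1, minutes: 30 }) // 5400000
```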

WorkflowStatus

enum WorkflowStatus {
  PENDING = 'pending',
  RUNNING = 'running',
  PAUSED = 'paused',
  COMPLETED = 'completed',
  FAILED = 'failed',
  CANCELLED = 'cancelled',
}
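When checking whether a run has finished (for example after getRun()), it can help to group the statuses. The sketch below assumes that completed, failed, and cancelled are the terminal states; this reference does not enumerate them, so treat that set as an assumption. `StatusValue`, `TERMINAL_STATUSES`, and `isTerminal` are hypothetical names, and the string union reuses the enum's values so the block runs standalone.

```typescript
// String values copied from the WorkflowStatus enum above.
type StatusValue = 'pending' | 'running' | 'paused' | 'completed' | 'failed' | 'cancelled'

// Assumption: these three statuses are final and a run never leaves them.
const TERMINAL_STATUSES: ReadonlySet<StatusValue> = new Set<StatusValue>([
  'completed',
  'failed',
  'cancelled',
])

function isTerminal(status: StatusValue): boolean {
  return TERMINAL_STATUSES.has(status)
}

const allStatuses: StatusValue[] = ['pending', 'running', 'paused', 'completed', 'failed', 'cancelled']
const stillActive = allStatuses.filter((status) => !isTerminal(status))
```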