The simplest Postgres workflow engine for TypeScript. Durable execution, event-driven orchestration, and automatic retries - powered entirely by PostgreSQL. No extra infrastructure. No vendor lock-in.
npm install pg-workflows
Most workflow engines ask you to adopt an entirely new platform - a new runtime, a new deployment target, a new bill. pg-workflows takes a different approach: if you already have PostgreSQL, you already have everything you need.
| Feature | pg-workflows | Temporal | Inngest | DBOS | pgflow |
|---|---|---|---|---|---|
| Runs on your existing Postgres | Yes | No | No | Partial | Supabase only |
| Zero extra infrastructure | Yes | No | No | No | No |
| Framework-agnostic | Yes | Yes | No | Yes | No |
| Event-driven pause/resume | Yes | Yes | Yes | No | No |
| Open source | MIT | MIT | ELv2 | MIT | Apache-2.0 |
| TypeScript-first | Yes | Via SDK | Yes | Via SDK | Yes |
If you need enterprise-grade features such as distributed tracing or complex DAG scheduling, or you plan to scale to millions of concurrent workflows, consider Temporal, Inngest, Trigger.dev, or DBOS.
Pause on step.waitFor() and resume automatically when signals arrive.

pg-workflows uses PostgreSQL as both the job queue and the state store. Under the hood:

- When a workflow reaches waitFor() or pause(), the workflow sleeps with zero resource consumption
- Execution continues when the awaited event arrives or resumeWorkflow() is called

All state lives in PostgreSQL. No Redis. No message broker. No external scheduler. Just Postgres.
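The pause/resume cycle can be pictured as a tiny state machine over the run record. The sketch below is an illustrative in-memory model, not the library's implementation — the real engine persists the equivalent state in Postgres:

```typescript
// Hypothetical in-memory model of a workflow run record.
type RunStatus = 'running' | 'paused' | 'completed';

interface RunRecord {
  id: string;
  status: RunStatus;
  awaitedEvent: string | null; // event name the run is waiting for
}

// Reaching waitFor(): persist the awaited event name and go to sleep.
function parkOnEvent(run: RunRecord, eventName: string): RunRecord {
  return { ...run, status: 'paused', awaitedEvent: eventName };
}

// An event arrives: wake the run only if it matches what it awaits.
function deliverEvent(run: RunRecord, eventName: string): RunRecord {
  if (run.status === 'paused' && run.awaitedEvent === eventName) {
    return { ...run, status: 'running', awaitedEvent: null };
  }
  return run; // unrelated events leave the run asleep
}

const parked = parkOnEvent(
  { id: 'r1', status: 'running', awaitedEvent: null },
  'user-confirmed',
);
const ignored = deliverEvent(parked, 'some-other-event');
const resumed = deliverEvent(parked, 'user-confirmed');
```

Because the record lives in the database rather than in process memory, a paused run survives restarts and consumes no worker while asleep.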
npm install pg-workflows pg-boss
# or
yarn add pg-workflows pg-boss
# or
bun add pg-workflows pg-boss
import { WorkflowEngine, workflow } from 'pg-workflows';
import PgBoss from 'pg-boss';
import { z } from 'zod';
// Define a durable workflow
const sendWelcomeEmail = workflow(
'send-welcome-email',
async ({ step, input }) => {
// Step 1: Create user record (runs exactly once)
const user = await step.run('create-user', async () => {
return { id: '123', email: input.email };
});
// Step 2: Send email (runs exactly once)
await step.run('send-email', async () => {
await sendEmail(user.email, 'Welcome!');
});
// Step 3: Wait for user confirmation (pauses the workflow)
const confirmation = await step.waitFor('wait-confirmation', {
eventName: 'user-confirmed',
timeout: 24 * 60 * 60 * 1000, // 24 hours
});
return { success: true, user, confirmation };
},
{
inputSchema: z.object({
email: z.string().email(),
}),
timeout: 48 * 60 * 60 * 1000, // 48 hours
retries: 3,
}
);
const boss = new PgBoss({
connectionString: process.env.DATABASE_URL,
});
const engine = new WorkflowEngine({
boss,
workflows: [sendWelcomeEmail],
});
await engine.start();
// Start a workflow run
const run = await engine.startWorkflow({
workflowId: 'send-welcome-email',
resourceId: 'user-123',
input: { email: 'user@example.com' },
});
// Send an event to resume the waiting workflow
await engine.triggerEvent({
runId: run.id,
resourceId: 'user-123',
eventName: 'user-confirmed',
data: { confirmedAt: new Date() },
});
// Check progress
const progress = await engine.checkProgress({
runId: run.id,
resourceId: 'user-123',
});
console.log(`Progress: ${progress.completionPercentage}%`);
A workflow is a durable function that breaks complex operations into discrete, resumable steps. Define workflows using the workflow() function:
const myWorkflow = workflow(
'workflow-id',
async ({ step, input }) => {
// Your workflow logic here
},
{
inputSchema: z.object({ /* ... */ }),
timeout: 60000, // milliseconds
retries: 3,
}
);
Steps are the building blocks of durable workflows. Each step is executed exactly once, even if the workflow is retried:
await step.run('step-id', async () => {
// This will only execute once - the result is persisted in Postgres
return { result: 'data' };
});
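Conceptually, a step behaves like a memoized call keyed by step id: on a retry, completed steps return their persisted result instead of re-executing. A minimal synchronous sketch of that idea (the real step.run is async and persists results in the workflow_runs table, not a Map):

```typescript
// Illustrative memoized step runner; `timeline` plays the role of the
// step results the engine persists in Postgres.
const timeline = new Map<string, unknown>();
let executions = 0;

function runStep<T>(stepId: string, handler: () => T): T {
  if (timeline.has(stepId)) {
    return timeline.get(stepId) as T; // replay: reuse the stored result
  }
  const result = handler(); // first run: execute and persist
  timeline.set(stepId, result);
  return result;
}

const first = runStep('create-user', () => {
  executions++;
  return { id: '123' };
});
// Simulated retry of the whole workflow: the handler does not run again,
// and the originally persisted result wins.
const second = runStep('create-user', () => {
  executions++;
  return { id: '456' };
});
```

This is why step handlers should contain the side effects and the surrounding workflow body should stay deterministic: on replay, only un-memoized steps actually execute.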
Wait for external events to pause and resume workflows without consuming resources:
const eventData = await step.waitFor('wait-step', {
eventName: 'payment-completed',
timeout: 5 * 60 * 1000, // 5 minutes
});
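Whether a waiting step resumes or times out depends only on whether the event lands before the deadline. The function below is a sketch of that semantics with millisecond timestamps — an illustration, not the engine's code:

```typescript
type WaitOutcome =
  | { kind: 'resumed'; data: unknown }
  | { kind: 'timed-out' };

// Decide the fate of a waiting step given when (or whether) its event arrived.
function settleWait(
  waitStartedAt: number,
  timeoutMs: number,
  eventArrivedAt: number | null,
  eventData?: unknown,
): WaitOutcome {
  const deadline = waitStartedAt + timeoutMs;
  if (eventArrivedAt !== null && eventArrivedAt <= deadline) {
    return { kind: 'resumed', data: eventData };
  }
  return { kind: 'timed-out' };
}

const fiveMinutes = 5 * 60 * 1000;
const inTime = settleWait(0, fiveMinutes, 60_000, { ok: true }); // event after 1 min
const tooLate = settleWait(0, fiveMinutes, 6 * 60 * 1000);       // event after 6 min
```

Either outcome wakes the workflow, so code after waitFor() should be prepared to handle a timeout as well as event data.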
Manually pause a workflow and resume it later:
// Pause inside a workflow
await step.pause('pause-step');
// Resume from outside the workflow
await engine.resumeWorkflow({
runId: run.id,
resourceId: 'resource-123',
});
const conditionalWorkflow = workflow('conditional', async ({ step }) => {
const data = await step.run('fetch-data', async () => {
return { isPremium: true };
});
if (data.isPremium) {
await step.run('premium-action', async () => {
// Only runs for premium users
});
}
});
const batchWorkflow = workflow('batch-process', async ({ step }) => {
const items = await step.run('get-items', async () => {
return [1, 2, 3, 4, 5];
});
for (const item of items) {
await step.run(`process-${item}`, async () => {
// Each item is processed durably
return processItem(item);
});
}
});
const resilientWorkflow = workflow('resilient', async ({ step }) => {
await step.run('risky-operation', async () => {
// Retries up to 3 times with exponential backoff
return await riskyApiCall();
});
}, {
retries: 3,
timeout: 60000,
});
const progress = await engine.checkProgress({
runId: run.id,
resourceId: 'resource-123',
});
console.log({
status: progress.status,
completionPercentage: progress.completionPercentage,
completedSteps: progress.completedSteps,
totalSteps: progress.totalSteps,
});
const engine = new WorkflowEngine({
boss: PgBoss, // Required: pg-boss instance
workflows: WorkflowDefinition[], // Optional: register workflows on init
logger: WorkflowLogger, // Optional: custom logger
});
| Method | Description |
|---|---|
| `start(asEngine?, options?)` | Start the engine and workers |
| `stop()` | Stop the engine gracefully |
| `registerWorkflow(definition)` | Register a workflow definition |
| `startWorkflow({ workflowId, resourceId?, input, options? })` | Start a new workflow run |
| `pauseWorkflow({ runId, resourceId? })` | Pause a running workflow |
| `resumeWorkflow({ runId, resourceId?, options? })` | Resume a paused workflow |
| `cancelWorkflow({ runId, resourceId? })` | Cancel a workflow |
| `triggerEvent({ runId, resourceId?, eventName, data?, options? })` | Send an event to a workflow |
| `getRun({ runId, resourceId? })` | Get workflow run details |
| `checkProgress({ runId, resourceId? })` | Get workflow progress |
| `getRuns(filters)` | List workflow runs with pagination |
workflow<I extends Parameters>(
id: string,
handler: (context: WorkflowContext) => Promise<unknown>,
options?: {
inputSchema?: I,
timeout?: number,
retries?: number,
}
): WorkflowDefinition<I>
The context object passed to workflow handlers:
{
input: T, // Validated input data
workflowId: string, // Workflow ID
runId: string, // Unique run ID
timeline: Record<string, unknown>, // Step execution history
logger: WorkflowLogger, // Logger instance
step: {
run: <T>(stepId, handler) => Promise<T>,
waitFor: <T>(stepId, { eventName, timeout?, schema? }) => Promise<T>,
waitUntil: (stepId, { date }) => Promise<void>,
pause: (stepId) => Promise<void>,
}
}
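Of the step helpers, waitUntil is the only one without an example in this guide. How the engine turns the target date into a sleep is internal; below is a hypothetical delay computation that clamps past dates to zero, meaning "resume immediately":

```typescript
// Hypothetical: compute how long a run parked by step.waitUntil
// should sleep before resuming. A target date in the past yields 0.
function delayUntil(target: Date, now: Date = new Date()): number {
  return Math.max(0, target.getTime() - now.getTime());
}

const reference = new Date('2024-01-01T00:00:00Z');
const oneMinuteAhead = delayUntil(new Date('2024-01-01T00:01:00Z'), reference);
const alreadyPassed = delayUntil(new Date('2023-12-31T23:59:00Z'), reference);
```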
enum WorkflowStatus {
PENDING = 'pending',
RUNNING = 'running',
PAUSED = 'paused',
COMPLETED = 'completed',
FAILED = 'failed',
CANCELLED = 'cancelled',
}
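A guard over status transitions can prevent invalid control calls (for example, resuming a completed run). The transition table below is an assumption inferred from the engine's API (pauseWorkflow, resumeWorkflow, cancelWorkflow) — the library may permit a different set:

```typescript
// Hypothetical transition table; terminal states allow no transitions.
const transitions: Record<string, string[]> = {
  pending: ['running', 'cancelled'],
  running: ['paused', 'completed', 'failed', 'cancelled'],
  paused: ['running', 'cancelled'],
  completed: [],
  failed: [],
  cancelled: [],
};

function canTransition(from: string, to: string): boolean {
  return (transitions[from] ?? []).includes(to);
}
```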
| Variable | Description | Default |
|---|---|---|
| `DATABASE_URL` | PostgreSQL connection string | required |
| `WORKFLOW_RUN_WORKERS` | Number of worker processes | 3 |
| `WORKFLOW_RUN_EXPIRE_IN_SECONDS` | Job expiration time in seconds | 300 |
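Applying the numeric defaults above takes only a small parser. The helper name here is illustrative, not part of the library:

```typescript
// Illustrative helper: read a numeric env var, falling back to the
// documented default when the variable is unset or not a number.
function intFromEnv(value: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(value ?? '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

const workers = intFromEnv(process.env.WORKFLOW_RUN_WORKERS, 3);
const expireInSeconds = intFromEnv(process.env.WORKFLOW_RUN_EXPIRE_IN_SECONDS, 300);
```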
The engine automatically runs migrations on startup to create the required tables:
- workflow_runs - Stores workflow execution state, step results, and timeline
- pgboss.* - pg-boss job queue tables for reliable task scheduling

As championed by postgresforeverything.com, PostgreSQL is one of the most reliable, feature-rich, and cost-effective databases ever built. pg-workflows embraces this philosophy.
If you’re already running Postgres (and you probably should be), adding durable workflows is as simple as:
npm install pg-workflows
Special thanks to the teams behind Temporal, Inngest, Trigger.dev, and DBOS for pioneering durable execution patterns and inspiring this project.
MIT