AI & Agent Workflows

AI agents and LLM pipelines are among the best use cases for durable execution. LLM calls are slow, expensive, and unreliable, which makes them exactly the kind of work that should never be repeated unnecessarily.

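pg-workflows gives you step-level persistence, automatic retries, and event waits that consume no resources, all backed by PostgreSQL.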

Why durable execution matters for AI

| Problem | Without pg-workflows | With pg-workflows |
| --- | --- | --- |
| Process crashes mid-pipeline | All LLM calls re-run from scratch | Resumes from the last completed step |
| LLM API returns 429/500 | Manual retry logic everywhere | Automatic retries with exponential backoff |
| Human review needed | Custom polling/webhook infrastructure | step.waitFor(), with zero resource consumption while waiting |
| Debugging failed agents | Lost intermediate state | Full timeline of every step’s input/output in PostgreSQL |
| Cost control | Repeated expensive LLM calls on failure | Each LLM call runs exactly once, result cached |
| Long-running pipelines | Timeout or lost connections | Runs for hours/days, state persisted in Postgres |
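To make "exponential backoff" concrete, here is a minimal sketch of one common schedule: the delay doubles on each attempt up to a cap. The helper name `backoffDelayMs`, the 1-second base, and the 60-second cap are assumptions for illustration; the schedule pg-workflows actually uses may differ.

```typescript
// Hypothetical helper, not part of pg-workflows: one common backoff schedule.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 60000): number {
  // attempt is 0-based: 0 -> base, 1 -> 2x base, 2 -> 4x base, capped at maxMs
  return Math.min(maxMs, baseMs * 2 ** attempt)
}

// Delays before each of the three retries of a workflow configured with retries: 3
const schedule = [0, 1, 2].map((attempt) => backoffDelayMs(attempt))
```

Spacing retries out like this gives a rate-limited API (HTTP 429) time to recover instead of hammering it immediately.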

Multi-Step AI Agent

const researchAgent = workflow(
  'research-agent',
  async ({ step, input }) => {
    // Step 1: Plan the research (persisted - never re-runs on retry)
    const plan = await step.run('create-plan', async () => {
      return await llm.chat({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: `Create a research plan for: ${input.topic}` }],
      })
    })

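    // Step 2: Execute each research task durably
    // Assumes llm.chat returns a parsed plan shaped like { tasks: [{ id, description }] };
    // if it returns raw text, parse it inside the 'create-plan' step.
    const findings = []
    for (const task of plan.tasks) {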
      const result = await step.run(`research-${task.id}`, async () => {
        return await llm.chat({
          model: 'gpt-4o',
          messages: [{ role: 'user', content: `Research: ${task.description}` }],
        })
      })
      findings.push(result)
    }

    // Step 3: Synthesize results
    const report = await step.run('synthesize', async () => {
      return await llm.chat({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: `Synthesize these findings: ${JSON.stringify(findings)}` }],
      })
    })

    return { plan, findings, report }
  },
  {
    retries: 3,
    timeout: 30 * 60 * 1000, // 30 minutes
  },
)

If the process crashes after completing 3 of 5 research tasks, the agent resumes from task 4. The plan and the first three results are read back from storage, so no LLM calls are wasted.
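The resume behavior can be modeled as memoization keyed by step ID. The sketch below is a toy, synchronous, in-memory model and not the pg-workflows implementation: `makeStep`, `fakeLlm`, and `researchRun` are hypothetical names, and a `Map` stands in for the PostgreSQL-backed step storage.

```typescript
// Toy model of step memoization. Hypothetical: not the pg-workflows implementation.
type StepStore = Map<string, unknown>

function makeStep(store: StepStore) {
  return {
    // Run fn only if no result is stored under this step ID; otherwise replay the stored result.
    run<T>(id: string, fn: () => T): T {
      if (store.has(id)) return store.get(id) as T
      const result = fn()
      store.set(id, result)
      return result
    },
  }
}

let llmCalls = 0
const fakeLlm = (prompt: string): string => {
  llmCalls += 1
  return `answer to: ${prompt}`
}

// Workflow body: five research tasks, optionally "crashing" once crashAfter tasks have completed.
function researchRun(store: StepStore, crashAfter?: number): string[] {
  const step = makeStep(store)
  const results: string[] = []
  for (let taskId = 1; taskId <= 5; taskId++) {
    if (crashAfter !== undefined && taskId > crashAfter) throw new Error('simulated crash')
    results.push(step.run(`research-${taskId}`, () => fakeLlm(`task ${taskId}`)))
  }
  return results
}

const store: StepStore = new Map() // stands in for the persisted step table
try {
  researchRun(store, 3) // first attempt dies after 3 of 5 tasks
} catch {
  // the store survives the crash
}
const callsBeforeResume = llmCalls
const resumedFindings = researchRun(store) // replays tasks 1-3 from the store, then executes 4 and 5
```

The model also shows why step IDs such as `research-${task.id}` need to be unique and stable across attempts: the ID is the only key used to match a stored result to a step.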

Human-in-the-Loop AI Pipeline

const contentPipeline = workflow(
  'ai-content-pipeline',
  async ({ step, input }) => {
    // Step 1: Generate draft with AI
    const draft = await step.run('generate-draft', async () => {
      return await llm.chat({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: `Write a blog post about: ${input.topic}` }],
      })
    })

    // Step 2: Pause for human review — costs nothing while waiting
    const review = await step.waitFor('human-review', {
      eventName: 'content-reviewed',
      timeout: 7 * 24 * 60 * 60 * 1000, // 7 days
    })

    // Step 3: Publish if approved; otherwise revise based on feedback
    if (review.approved) {
      return { status: 'published', content: draft }
    }

    const revision = await step.run('revise-draft', async () => {
      return await llm.chat({
        model: 'gpt-4o',
        messages: [
          { role: 'user', content: `Revise this draft based on feedback:\n\nDraft: ${draft}\n\nFeedback: ${review.feedback}` },
        ],
      })
    })

    return { status: 'revised', content: revision }
  },
  { retries: 3 },
)

// A reviewer approves or requests changes via your API.
// `run` is the workflow run started earlier; its ID routes the event to the paused run.
await engine.triggerEvent({
  runId: run.id,
  eventName: 'content-reviewed',
  data: { approved: false, feedback: 'Make the intro more engaging' },
})
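Because the event data comes from an external request, it may be worth validating it before handing it to `engine.triggerEvent`. The sketch below assumes a payload shape inferred from the fields the workflow reads (`approved`, `feedback`); `ReviewEvent` and `parseReviewEvent` are hypothetical names, not part of pg-workflows.

```typescript
// Hypothetical payload shape, inferred from the fields the workflow reads.
interface ReviewEvent {
  approved: boolean
  feedback?: string
}

// Validate untrusted request data before passing it as `data` to engine.triggerEvent.
function parseReviewEvent(data: unknown): ReviewEvent {
  if (typeof data !== 'object' || data === null) {
    throw new Error('review payload must be an object')
  }
  const record = data as Record<string, unknown>

  const approved = record.approved
  if (typeof approved !== 'boolean') {
    throw new Error('review.approved must be a boolean')
  }

  const rawFeedback = record.feedback
  let feedback: string | undefined
  if (typeof rawFeedback === 'string') {
    feedback = rawFeedback
  } else if (rawFeedback !== undefined) {
    throw new Error('review.feedback must be a string')
  }

  // The revise-draft step interpolates the feedback into its prompt, so a rejection needs some.
  if (!approved && !feedback) {
    throw new Error('a rejected review needs feedback')
  }
  return feedback === undefined ? { approved } : { approved, feedback }
}

const rejected = parseReviewEvent({ approved: false, feedback: 'Make the intro more engaging' })
const approvedOnly = parseReviewEvent({ approved: true })
```

Rejecting malformed payloads at the API boundary keeps a bad request from resuming the workflow with data the `revise-draft` prompt cannot use.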

RAG Pipeline with Tool Use

const ragAgent = workflow(
  'rag-agent',
  async ({ step, input }) => {
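    // Step 1: Generate embeddings (cached on retry)
    const embedding = await step.run('embed-query', async () => {
      const response = await openai.embeddings.create({
        model: 'text-embedding-3-small',
        input: input.query,
      })
      // Return only the vector; the response object also carries usage metadata
      return response.data[0].embedding
    })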

    // Step 2: Search vector store
    const documents = await step.run('search-docs', async () => {
      return await vectorStore.search(embedding, { topK: 10 })
    })

    // Step 3: Generate answer with context
    const answer = await step.run('generate-answer', async () => {
      return await llm.chat({
        model: 'gpt-4o',
        messages: [
          { role: 'system', content: `Answer using these documents:\n${documents.map((d) => d.text).join('\n')}` },
          { role: 'user', content: input.query },
        ],
      })
    })

    // Step 4: Validate and fact-check
    const validation = await step.run('fact-check', async () => {
      return await llm.chat({
        model: 'gpt-4o',
        messages: [
          { role: 'user', content: `Fact-check this answer against the source documents.\n\nDocuments:\n${documents.map((d) => d.text).join('\n')}\n\nAnswer: ${answer}` },
        ],
      })
    })

    return { answer, validation, sources: documents }
  },
  { retries: 3, timeout: 5 * 60 * 1000 },
)
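For readers unfamiliar with what the `search-docs` step does, here is a minimal in-memory sketch of a top-K search by cosine similarity. `Doc`, `cosineSimilarity`, and `searchTopK` are hypothetical stand-ins for `vectorStore.search`, and the two-dimensional embeddings are made up for illustration; production vector stores typically use approximate indexes rather than a full scan.

```typescript
// Hypothetical in-memory stand-in for vectorStore.search.
interface Doc {
  text: string
  embedding: number[]
}

function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error('embedding dimensions differ')
  let dot = 0
  let normA = 0
  let normB = 0
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i]
    normA += a[i] * a[i]
    normB += b[i] * b[i]
  }
  const denom = Math.sqrt(normA) * Math.sqrt(normB)
  return denom === 0 ? 0 : dot / denom
}

// Score every document against the query, sort by descending similarity, keep the best topK.
function searchTopK(query: number[], docs: Doc[], topK: number): Doc[] {
  return docs
    .map((doc) => ({ doc, score: cosineSimilarity(query, doc.embedding) }))
    .sort((x, y) => y.score - x.score)
    .slice(0, topK)
    .map((scored) => scored.doc)
}

const corpus: Doc[] = [
  { text: 'postgres durability', embedding: [1, 0] },
  { text: 'cooking pasta', embedding: [0, 1] },
  { text: 'workflow retries', embedding: [0.8, 0.6] },
]
const hits = searchTopK([1, 0], corpus, 2)
```

Wrapping the search in its own step means the retrieved documents are persisted, so a retry of `generate-answer` or `fact-check` sees the same sources as the first attempt.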