AI Agents

Resumable Streams

When building chat interfaces, network interruptions, page refreshes, or serverless function timeouts can break the connection to an in-progress agent.

Where a standard chat implementation would require the user to resend their message and wait for the entire response again, Workflow DevKit provides WorkflowChatTransport, a drop-in transport for the AI SDK that enables automatic stream reconnection.

Implementing stream resumption

Getting stream resumption requires both adding WorkflowChatTransport (to allow the client to reconnect to the stream) and some client-side state management, in order to know when and where to reconnect to the stream.

Return the Run ID from Your API

Modify your chat endpoint to include the workflow run ID in a response header:

app/api/chat/route.ts
import { createUIMessageStreamResponse, convertToModelMessages } from 'ai';
import { start } from 'workflow/api';
import { chatWorkflow } from './workflow';

export async function POST(req: Request) {
  const { messages, modelId } = await req.json();
  const modelMessages = convertToModelMessages(messages);

  const run = await start(chatWorkflow, [{ messages: modelMessages, modelId }]);

  return createUIMessageStreamResponse({
    stream: run.readable,
    headers: {
      'x-workflow-run-id': run.runId, 
    },
  });
}

Add a Stream Reconnection Endpoint

Create a new API route that returns the stream for an existing run:

app/api/chat/[id]/stream/route.ts
import { createUIMessageStreamResponse } from 'ai';
import { getRun } from 'workflow/api'; 

export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> }
) {
  const { id } = await params;
  const { searchParams } = new URL(request.url);

  // Client provides the last chunk index they received
  const startIndexParam = searchParams.get('startIndex'); 
  const startIndex = startIndexParam
    ? parseInt(startIndexParam, 10)
    : undefined;

  const run = getRun(id); 
  const stream = run.getReadable({ startIndex }); 

  return createUIMessageStreamResponse({ stream });
}

The startIndex parameter ensures the client only receives chunks it missed, avoiding duplicate data.

Use WorkflowChatTransport in the Client

Replace the default transport in useChat with WorkflowChatTransport, and store the latest run ID.

app/chat.tsx
'use client';

import { useChat } from '@ai-sdk/react';
import { WorkflowChatTransport } from '@workflow/ai'; 
import { useMemo, useState } from 'react';

export function Chat() {
  const [input, setInput] = useState('');

  // Check for an active workflow run on mount
  const activeRunId = useMemo(() => { 
    if (typeof window === 'undefined') return; 
    return localStorage.getItem('active-workflow-run-id') ?? undefined; 
  }, []); 

  const { messages, sendMessage, status } = useChat({
    resume: Boolean(activeRunId), 
    transport: new WorkflowChatTransport({ 
      api: '/api/chat',

      // Store the run ID when a new chat starts
      onChatSendMessage: (response, options) => { 
        const workflowRunId = response.headers.get('x-workflow-run-id'); 
        if (workflowRunId) { 
          localStorage.setItem('active-workflow-run-id', workflowRunId); 
        } 
      }, 

      // Clear the run ID when the chat completes
      onChatEnd: () => { 
        localStorage.removeItem('active-workflow-run-id'); 
      }, 

      // Use the stored run ID for reconnection
      prepareReconnectToStreamRequest: ({ api, ...rest }) => { 
        const runId = localStorage.getItem('active-workflow-run-id'); 
        if (!runId) throw new Error('No active workflow run ID found'); 
        return { 
          ...rest, 
          api: `/api/chat/${encodeURIComponent(runId)}/stream`, 
        }; 
      }, 

      maxConsecutiveErrors: 5,
    }), 
  });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          <strong>{m.role}:</strong> {m.content}
        </div>
      ))}
      <form
        onSubmit={(e) => {
          e.preventDefault();
          sendMessage({ text: input });
          setInput('');
        }}
      >
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Type a message..."
        />
      </form>
    </div>
  );
}

How It Works

  1. When the user sends a message, WorkflowChatTransport makes a POST to /api/chat
  2. The API starts a workflow and returns the run ID in the x-workflow-run-id header
  3. onChatSendMessage stores this run ID in localStorage
  4. If the stream is interrupted before receiving a "finish" chunk, the transport automatically reconnects
  5. prepareReconnectToStreamRequest builds the reconnection URL using the stored run ID
  6. The reconnection endpoint returns the stream from where the client left off
  7. When the stream completes, onChatEnd clears the stored run ID

This approach also handles page refreshes, as the client will automatically reconnect to the stream from the last known position when the UI loads with a stored run ID.