Resumable Streams
When building chat interfaces, network interruptions, page refreshes, or serverless function timeouts can break the connection to an in-progress agent.
Where a standard chat implementation would require the user to resend their message and wait for the entire response again, Workflow DevKit provides WorkflowChatTransport, a drop-in transport for the AI SDK that enables automatic stream reconnection.
Implementing stream resumption
Stream resumption requires two things: adding WorkflowChatTransport, which lets the client reconnect to the stream, and some client-side state management, so the client knows when and where to reconnect.
Return the Run ID from Your API
Modify your chat endpoint to include the workflow run ID in a response header:
import { createUIMessageStreamResponse, convertToModelMessages } from 'ai';
import { start } from 'workflow/api';
import { chatWorkflow } from './workflow';

export async function POST(req: Request) {
  const { messages, modelId } = await req.json();
  const modelMessages = convertToModelMessages(messages);

  const run = await start(chatWorkflow, [{ messages: modelMessages, modelId }]);

  return createUIMessageStreamResponse({
    stream: run.readable,
    headers: {
      'x-workflow-run-id': run.runId,
    },
  });
}

Add a Stream Reconnection Endpoint
Create a new API route that returns the stream for an existing run:
import { createUIMessageStreamResponse } from 'ai';
import { getRun } from 'workflow/api';

export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> }
) {
  const { id } = await params;
  const { searchParams } = new URL(request.url);

  // Client provides the last chunk index they received
  const startIndexParam = searchParams.get('startIndex');
  const startIndex = startIndexParam
    ? parseInt(startIndexParam, 10)
    : undefined;

  const run = getRun(id);
  const stream = run.getReadable({ startIndex });

  return createUIMessageStreamResponse({ stream });
}

The startIndex parameter ensures the client only receives chunks it missed, avoiding duplicate data.
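To make the effect of startIndex concrete, here is a minimal sketch that models a run's output as an in-memory array of chunks. The helpers parseStartIndex and chunksFrom are hypothetical and not part of the Workflow DevKit API. The sketch also assumes that startIndex is the index of the first chunk the client has not yet received; the exact semantics of getReadable may differ.

```typescript
// Hypothetical helper: parse the `startIndex` query parameter defensively.
// Returns undefined for missing, non-numeric, or negative values.
function parseStartIndex(param: string | null): number | undefined {
  if (param === null) return undefined;
  const n = Number.parseInt(param, 10);
  return Number.isNaN(n) || n < 0 ? undefined : n;
}

// Hypothetical model of resumption: replay only chunks at or after startIndex.
// Assumption: startIndex equals the number of chunks the client already holds.
function chunksFrom<T>(all: readonly T[], startIndex?: number): T[] {
  return all.slice(startIndex ?? 0);
}

// Everything the (simulated) run has produced so far.
const produced = ['Hel', 'lo, ', 'wor', 'ld'];

// The client received two chunks before the connection dropped.
const received = produced.slice(0, 2);

// On reconnect, the client sends how many chunks it already has.
const resumed = chunksFrom(produced, parseStartIndex(String(received.length)));

// Joining both parts yields the full response with no duplicated chunks.
const full = [...received, ...resumed].join('');
console.log(full); // "Hello, world"
```

An invalid or missing parameter falls back to replaying the whole stream in this sketch, which is a safe default when the client's position is unknown.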
Use WorkflowChatTransport in the Client
Replace the default transport in useChat with WorkflowChatTransport, and store the latest run ID.
'use client';

import { useChat } from '@ai-sdk/react';
import { WorkflowChatTransport } from '@workflow/ai';
import { useMemo, useState } from 'react';

export function Chat() {
  const [input, setInput] = useState('');

  // Check for an active workflow run on mount
  const activeRunId = useMemo(() => {
    if (typeof window === 'undefined') return;
    return localStorage.getItem('active-workflow-run-id') ?? undefined;
  }, []);

  const { messages, sendMessage, status } = useChat({
    resume: Boolean(activeRunId),
    transport: new WorkflowChatTransport({
      api: '/api/chat',

      // Store the run ID when a new chat starts
      onChatSendMessage: (response, options) => {
        const workflowRunId = response.headers.get('x-workflow-run-id');
        if (workflowRunId) {
          localStorage.setItem('active-workflow-run-id', workflowRunId);
        }
      },

      // Clear the run ID when the chat completes
      onChatEnd: () => {
        localStorage.removeItem('active-workflow-run-id');
      },

      // Use the stored run ID for reconnection
      prepareReconnectToStreamRequest: ({ api, ...rest }) => {
        const runId = localStorage.getItem('active-workflow-run-id');
        if (!runId) throw new Error('No active workflow run ID found');
        return {
          ...rest,
          api: `/api/chat/${encodeURIComponent(runId)}/stream`,
        };
      },

      maxConsecutiveErrors: 5,
    }),
  });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          <strong>{m.role}:</strong>{' '}
          {/* UI messages carry their content as typed parts; render the text parts */}
          {m.parts.map((part, i) =>
            part.type === 'text' ? <span key={i}>{part.text}</span> : null
          )}
        </div>
      ))}
      <form
        onSubmit={(e) => {
          e.preventDefault();
          sendMessage({ text: input });
          setInput('');
        }}
      >
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Type a message..."
        />
      </form>
    </div>
  );
}

How It Works
- When the user sends a message, WorkflowChatTransport makes a POST to /api/chat
- The API starts a workflow and returns the run ID in the x-workflow-run-id header
- onChatSendMessage stores this run ID in localStorage
- If the stream is interrupted before receiving a "finish" chunk, the transport automatically reconnects
- prepareReconnectToStreamRequest builds the reconnection URL using the stored run ID
- The reconnection endpoint returns the stream from where the client left off
- When the stream completes, onChatEnd clears the stored run ID
This approach also handles page refreshes, as the client will automatically reconnect to the stream from the last known position when the UI loads with a stored run ID.
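The run ID lifecycle described above can be sketched without a browser or the transport itself. This is an illustrative model only: KeyValueStore, createMemoryStore, rememberRun, reconnectUrl, and forgetRun are hypothetical names standing in for localStorage and the three transport callbacks, and the run ID run_123 is made up.

```typescript
// Minimal stand-in for the parts of localStorage the flow relies on.
interface KeyValueStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

// Minimal stand-in for response headers (only `get` is needed here).
interface HeaderReader {
  get(name: string): string | null;
}

const RUN_ID_KEY = 'active-workflow-run-id';

// In-memory store so the sketch runs outside a browser.
function createMemoryStore(): KeyValueStore {
  const data = new Map<string, string>();
  return {
    getItem: (key) => data.get(key) ?? null,
    setItem: (key, value) => { data.set(key, value); },
    removeItem: (key) => { data.delete(key); },
  };
}

// Mirrors onChatSendMessage: remember the run ID from the response header.
function rememberRun(store: KeyValueStore, headers: HeaderReader): void {
  const runId = headers.get('x-workflow-run-id');
  if (runId) store.setItem(RUN_ID_KEY, runId);
}

// Mirrors prepareReconnectToStreamRequest: build the reconnection URL.
function reconnectUrl(store: KeyValueStore): string {
  const runId = store.getItem(RUN_ID_KEY);
  if (!runId) throw new Error('No active workflow run ID found');
  return `/api/chat/${encodeURIComponent(runId)}/stream`;
}

// Mirrors onChatEnd: forget the run once the stream has finished.
function forgetRun(store: KeyValueStore): void {
  store.removeItem(RUN_ID_KEY);
}

const store = createMemoryStore();

// 1. A message is sent and the API responds with the run ID header.
rememberRun(store, { get: (name) => (name === 'x-workflow-run-id' ? 'run_123' : null) });

// 2. After an interruption or page refresh, a stored ID means "resume".
const shouldResume = store.getItem(RUN_ID_KEY) !== null;
const streamUrl = reconnectUrl(store); // "/api/chat/run_123/stream"

// 3. The stream completes, so the next page load starts fresh.
forgetRun(store);
const shouldResumeAfterEnd = store.getItem(RUN_ID_KEY) !== null;
```

Keeping the stored run ID as the single source of truth is what lets the same check drive both mid-stream reconnection and resumption after a refresh.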
Related Documentation
- WorkflowChatTransport API Reference - Full configuration options
- Streaming - Understanding workflow streams
- getRun() API Reference - Retrieving existing runs