AI Chat with HTTP Streaming
This article describes how to build a chat app with ChatGPT by streaming text from OpenAI to the Convex database and ultimately to clients with the app loaded. This provides a super responsive experience for everyone using the app, but it can require a lot of database bandwidth since we’re rewriting the document with the message on every streamed update we get from OpenAI.
In this article, we’ll go through an extension to this approach — using HTTP actions with streaming. The end result will be that we can get the responsive, nearly character by character streaming for the user ChatGPT is responding to, while every other client sees updates in larger chunks (and we save on bandwidth).
The full code for this is available here but we’ll walk through the most interesting parts below.
Above is a diagram showing how data flows in this app. Users are able to send messages using a mutation (send
) and read message using a query (list
).
When a user sends a message that needs a response from ChatGPT, the send
mutation returns a result that the client uses to call an HTTP endpoint /chat
. This endpoint talks to OpenAI, streaming a response from ChatGPT.
Here’s what the client portion of this looks like:
// src/App.tsx
// https://github.com/sshader/streaming-chat-gpt/blob/main/src/App.tsx#L84
async function handleGptResponse(
onUpdate: (update: string) => void,
requestBody: { messageId: Id<"messages">; messages: Doc<"messages">[] }
) {
const convexSiteUrl = import.meta.env.VITE_CONVEX_URL.replace(
/\.cloud$/,
".site"
);
const response = await fetch(`${convexSiteUrl}/chat`, {
method: "POST",
body: JSON.stringify(requestBody),
headers: { "Content-Type": "application/json" },
});
// Taken from https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
const responseBody = response.body;
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
onUpdate(new TextDecoder().decode(value));
return;
}
onUpdate(new TextDecoder().decode(value));
}
}
and when a user sends a message:
// src/App.tsx
// https://github.com/sshader/streaming-chat-gpt/blob/main/src/App.tsx#L53
<form
onSubmit={async (e) => {
e.preventDefault();
const result = await sendMessage({
body: newMessageText,
author: NAME,
});
setNewMessageText("");
// Kick off ChatGPT response + stream the result
if (result !== null) {
await handleGptResponse((text) => {
// TODO: make the streamed message appear to the user
console.log(text);
}, result);
}
}}
>
{ /* ... */ }
</form>
We stream every chunk of this response to the client in the Response
of our HTTP endpoint, and periodically update the database with everything we’ve streamed so far. This is adapted from this example using Cloudflare workers.
// convex/http.ts
// https://github.com/sshader/streaming-chat-gpt/blob/main/convex/http.ts#L20
http.route({
path: "/chat",
method: "POST",
handler: httpAction(async (ctx, request) => {
// Create a TransformStream to handle streaming data
let { readable, writable } = new TransformStream();
let writer = writable.getWriter();
const textEncoder = new TextEncoder();
const streamData = async () => {
let content = "";
const openai = new OpenAI();
const stream = await openai.chat.completions.create({
model: "gpt-3.5-turbo",
messages: [/* ... */],
stream: true,
});
for await (const part of stream) {
const text = part.choices[0]?.delta?.content || "";
content += text;
// write to this handler's response stream on every update
await writer.write(textEncoder.encode(text));
// write to the database periodically, like at the end of sentences
if (hasDelimeter(text)) {
await ctx.runMutation(internal.messages.update, {
messageId,
body: content,
isComplete: false,
});
}
}
// flush any last updates
await ctx.runMutation(internal.messages.update, {
messageId,
body: content,
isComplete: true,
});
await writer.close();
};
// kick off the request to OpenAI, but don't `await` it, so we can start sending
// the response. Convex will wait until `writer.close`.
void streamData();
// Send the readable back to the browser
return new Response(readable);
}),
});
Note: we additionally have to set up CORS to allow our browser to request our HTTP action. There’s an example of this in the repo, and Will it CORS? is a great resource for setting up CORS correctly.
To show the streamed response immediately on the client, we’ll essentially be building an optimistic update. We’ll store the ID and text of the message we’re receiving from ChatGPT via our HTTP endpoint, and show this text instead of the text returned by useQuery
. Once the message returned by useQuery
is complete, we’ll “drop” our optimistic update and start showing the text returned by useQuery
(which should be exactly the same, provided there were no errors).
Here’s what this looks like in code:
export default function App() {
// Hold state for a message we're streaming from ChatGPT via an HTTP endpoint,
// which we'll apply on top
const [streamedMessage, setStreamedMessage] = useState("");
const [streamedMessageId, setStreamedMessageId] = useState<Id<"messages"> | null>(null);
useEffect(() => {
const message = messages.find((m) => m._id === streamedMessageId);
if (message !== undefined && message.isComplete) {
// Clear what we streamed in favor of the complete message
setStreamedMessageId(null);
setStreamedMessage("");
}
}, [messages, setStreamedMessage, setStreamedMessageId]);
return <main>
{/* .... */}
{messages.map((message) => {
const messageText = streamedMessageId === message._id
? streamedMessage
: message.body;
return (
<article
key={message._id}
className={message.author === NAME ? "message-mine" : ""}>
<div>{message.author}</div>
<p>{messageText}</p>
</article>
);
})}
{/* ... */ }
</main>
}
// src/App.tsx
// https://github.com/sshader/streaming-chat-gpt/blob/main/src/App.tsx#L53
<form
onSubmit={async (e) => {
e.preventDefault();
const result = await sendMessage({
body: newMessageText,
author: NAME,
});
setNewMessageText("")
// Kick off ChatGPT response + stream the result
if (result !== null) {
setStreamedMessageId(result.messageId)
await handleGptResponse((text) => {
setStreamedMessageText((t) => t + text)
}, result);
}
}}
>
{/* ... */}
</form>
Summary
By leveraging HTTP actions with streaming, this chat app balances real-time responsiveness with efficient bandwidth usage. Users receive character-by-character updates to their own responses directly from ChatGPT, while other users see periodic updates, minimizing database bandwidth.
The full code for this app can be found below: