Sarah Shader
3 months ago

AI Chat with HTTP Streaming


A previous article describes how to build a chat app with ChatGPT by streaming text from OpenAI into the Convex database and, from there, to every client with the app loaded. This gives everyone using the app a very responsive experience, but it can use a lot of database bandwidth, since we’re rewriting the message document on every streamed update we get from OpenAI.

In this article, we’ll go through an extension to this approach: using HTTP actions with streaming. The end result is that the user ChatGPT is responding to gets responsive, nearly character-by-character streaming, while every other client sees updates in larger chunks (and we save on bandwidth).

The full code is available at https://github.com/sshader/streaming-chat-gpt, but we’ll walk through the most interesting parts below.

GIF showing two users using the chat app

Diagram showing data flow for this app

Above is a diagram showing how data flows in this app. Users send messages using a mutation (send) and read messages using a query (list).

When a user sends a message that needs a response from ChatGPT, the send mutation returns a result that the client uses to call an HTTP endpoint /chat. This endpoint talks to OpenAI, streaming a response from ChatGPT.

Here’s what the client portion of this looks like:

// src/App.tsx
// https://github.com/sshader/streaming-chat-gpt/blob/main/src/App.tsx#L84
async function handleGptResponse(
  onUpdate: (update: string) => void,
  requestBody: { messageId: Id<"messages">; messages: Doc<"messages">[] }
) {
  const convexSiteUrl = import.meta.env.VITE_CONVEX_URL.replace(
    /\.cloud$/,
    ".site"
  );
  const response = await fetch(`${convexSiteUrl}/chat`, {
    method: "POST",
    body: JSON.stringify(requestBody),
    headers: { "Content-Type": "application/json" },
  });
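  // Taken from https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
  if (response.body === null) {
    throw new Error("Response has no body to stream");
  }
  const reader = response.body.getReader();
  // Reuse one decoder so multi-byte characters split across chunks decode correctly
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      // Flush any bytes still buffered in the decoder
      const rest = decoder.decode();
      if (rest) onUpdate(rest);
      return;
    }
    onUpdate(decoder.decode(value, { stream: true }));
  }
}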
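
One detail worth keeping in mind when decoding a streamed body: network chunks are not guaranteed to line up with character boundaries, so a multi-byte UTF-8 character can arrive split across two reads. The sketch below illustrates the difference between a fresh decoder per chunk and one shared decoder using the `stream` option. `decodeChunks` is a hypothetical helper written for this illustration, not part of the app.

```typescript
// Hypothetical helper for illustration only: decode byte chunks with one
// shared TextDecoder so partial characters are buffered between chunks.
function decodeChunks(chunks: Uint8Array[]): string[] {
  const decoder = new TextDecoder();
  const pieces: string[] = [];
  for (const chunk of chunks) {
    // `stream: true` holds back an incomplete trailing byte sequence
    pieces.push(decoder.decode(chunk, { stream: true }));
  }
  // A final call with no input flushes anything still buffered
  pieces.push(decoder.decode());
  return pieces;
}

// "é" is two bytes in UTF-8, so splitting after byte 4 cuts it in half.
const bytes = new TextEncoder().encode("café");
const splitChunks = [bytes.slice(0, 4), bytes.slice(4)];

// Shared decoder: the split character is reassembled.
const sharedResult = decodeChunks(splitChunks).join("");

// One decoder per chunk: each half decodes to a replacement character.
const naiveResult = splitChunks
  .map((chunk) => new TextDecoder().decode(chunk))
  .join("");
```

With mostly ASCII output the two approaches behave the same, which is why the simpler per-chunk decoding often appears to work.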

and when a user sends a message:

// src/App.tsx
// https://github.com/sshader/streaming-chat-gpt/blob/main/src/App.tsx#L53
<form 
	onSubmit={async (e) => {
		e.preventDefault();
		const result = await sendMessage({
			body: newMessageText,
			author: NAME,
		});
		setNewMessageText("");
		// Kick off ChatGPT response + stream the result
		if (result !== null) {
			await handleGptResponse((text) => {
				// TODO: make the streamed message appear to the user
				console.log(text);
			}, result);
		}
	}}
	>
	{ /* ... */ }
</form>

On the server, the /chat endpoint streams every chunk of ChatGPT’s response to the client in the Response of our HTTP endpoint, and periodically updates the database with everything we’ve streamed so far. This is adapted from this example using Cloudflare Workers.

// convex/http.ts
// https://github.com/sshader/streaming-chat-gpt/blob/main/convex/http.ts#L20
http.route({
	path: "/chat",
	method: "POST",
	handler: httpAction(async (ctx, request) => {
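		// Create a TransformStream to handle streaming data
		let { readable, writable } = new TransformStream();
		let writer = writable.getWriter();
		const textEncoder = new TextEncoder();

		// The client sends { messageId, messages } as the JSON request body
		const { messageId } = await request.json();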
		
		const streamData = async () => {
			let content = "";
			const openai = new OpenAI();
			const stream = await openai.chat.completions.create({
				model: "gpt-3.5-turbo",
				messages: [/* ... */],
				stream: true,
			});
			
			for await (const part of stream) {
				const text = part.choices[0]?.delta?.content || "";
				content += text;
				
				// write to this handler's response stream on every update
				await writer.write(textEncoder.encode(text));
				// write to the database periodically, like at the end of sentences
				if (hasDelimeter(text)) {
					await ctx.runMutation(internal.messages.update, {
						messageId,
						body: content,
						isComplete: false,
					});
				}
			}
			
			// flush any last updates
			await ctx.runMutation(internal.messages.update, {
				messageId,
				body: content,
				isComplete: true,
			});
			await writer.close();
		};
		
		// kick off the request to OpenAI, but don't `await` it, so we can start sending
		// the response. Convex will wait until `writer.close`.
		void streamData();
		
		// Send the readable back to the browser
		return new Response(readable);
	}),
});
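
The handler above calls `hasDelimeter` without showing it. Here is a minimal sketch of what such a check might look like, plus a small helper that counts how many writes go to the HTTP stream versus the database for a given sequence of streamed parts. Both the delimiter set and the `countWrites` helper are assumptions made for illustration; the repo's actual definition may differ.

```typescript
// Hypothetical delimiter check (an assumption, not the repo's code):
// treat sentence-ending punctuation and newlines as good points to
// persist progress to the database.
function hasDelimeter(text: string): boolean {
  return /[.!?\n]/.test(text);
}

// Illustration only: every part is written to the HTTP response stream,
// but only parts containing a delimiter trigger a database update,
// plus one final flush once the stream ends.
function countWrites(parts: string[]): { streamWrites: number; dbWrites: number } {
  let streamWrites = 0;
  let dbWrites = 0;
  for (const part of parts) {
    streamWrites += 1;
    if (hasDelimeter(part)) {
      dbWrites += 1;
    }
  }
  dbWrites += 1; // final flush with isComplete: true
  return { streamWrites, dbWrites };
}

const exampleParts = ["Hel", "lo", " there", ".", " How", " are", " you", "?"];
const writeCounts = countWrites(exampleParts);
```

For this example input, eight parts reach the streaming client while the database is updated three times (two sentence endings plus the final flush), which is where the bandwidth saving comes from.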

Note: we additionally have to set up CORS to allow our browser to request our HTTP action. There’s an example of this in the repo, and Will it CORS? is a great resource for setting up CORS correctly.
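
As a rough sketch of what the CORS setup involves: because the client sends a POST with a JSON `Content-Type`, the browser first sends a preflight OPTIONS request, and both the preflight response and the streamed POST response need to carry an `Access-Control-Allow-Origin` header. The helper names and the origin below are assumptions for illustration; the repo's example is the reference for how this is wired into Convex routes.

```typescript
// Hypothetical helper: CORS headers for a single allowed origin.
// The header choices are assumptions for illustration.
function corsHeaders(allowedOrigin: string): Headers {
  return new Headers({
    "Access-Control-Allow-Origin": allowedOrigin,
    "Access-Control-Allow-Methods": "POST",
    "Access-Control-Allow-Headers": "Content-Type",
    Vary: "Origin",
  });
}

// Response to the browser's preflight OPTIONS request: no body, just headers.
function preflightResponse(allowedOrigin: string): Response {
  return new Response(null, {
    status: 204,
    headers: corsHeaders(allowedOrigin),
  });
}

// Assumed local dev origin; replace with wherever the client is served from.
const preflight = preflightResponse("http://localhost:5173");
```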

To show the streamed response immediately on the client, we’ll essentially be building an optimistic update. We’ll store the ID and text of the message we’re receiving from ChatGPT via our HTTP endpoint, and show this text instead of the text returned by useQuery. Once the message returned by useQuery is complete, we’ll “drop” our optimistic update and start showing the text returned by useQuery (which should be exactly the same, provided there were no errors).

Here’s what this looks like in code:

export default function App() {
	// Hold state for a message we're streaming from ChatGPT via an HTTP endpoint,
	// which we'll apply on top
	const [streamedMessage, setStreamedMessage] = useState("");
	const [streamedMessageId, setStreamedMessageId] = useState<Id<"messages"> | null>(null);
	// `messages` is the result of useQuery on the `list` query (elided here)
	useEffect(() => {
		const message = messages.find((m) => m._id === streamedMessageId);
		if (message !== undefined && message.isComplete) {
			// Clear what we streamed in favor of the complete message
			setStreamedMessageId(null);
			setStreamedMessage("");
		}
	}, [messages, streamedMessageId]);
	
	return (
		<main>
			{/* ... */}
			{messages.map((message) => {
				const messageText =
					streamedMessageId === message._id ? streamedMessage : message.body;
				return (
					<article
						key={message._id}
						className={message.author === NAME ? "message-mine" : ""}
					>
						<div>{message.author}</div>
						<p>{messageText}</p>
					</article>
				);
			})}
			{/* ... */}
		</main>
	);
}
  
// src/App.tsx
// https://github.com/sshader/streaming-chat-gpt/blob/main/src/App.tsx#L53
<form
	onSubmit={async (e) => {
		e.preventDefault();
		const result = await sendMessage({
			body: newMessageText,
			author: NAME,
		});
		setNewMessageText("");
		// Kick off ChatGPT response + stream the result
		if (result !== null) {
			setStreamedMessageId(result.messageId);
			await handleGptResponse((text) => {
				setStreamedMessage((t) => t + text);
			}, result);
		}
	}}
>
{/* ... */}
</form>
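
The overlay logic in the component can also be expressed as two small pure functions, which makes the behavior easier to reason about in isolation. This is a sketch under simplifying assumptions: the `Message` type is a minimal stand-in for the real document type, and `displayText` and `shouldDropOverlay` are hypothetical names, not functions from the repo.

```typescript
// Minimal stand-in for the message documents (an assumption; the real
// documents have more fields, such as the author).
type Message = { _id: string; body: string; isComplete: boolean };

// Show the locally streamed text for the message currently being streamed,
// and the database text for every other message.
function displayText(
  message: Message,
  streamedId: string | null,
  streamedText: string
): string {
  return message._id === streamedId ? streamedText : message.body;
}

// The overlay can be dropped once the database copy is marked complete.
function shouldDropOverlay(messages: Message[], streamedId: string | null): boolean {
  const message = messages.find((m) => m._id === streamedId);
  return message !== undefined && message.isComplete;
}

const inProgress: Message = { _id: "m1", body: "Hello.", isComplete: false };
const finished: Message = { _id: "m1", body: "Hello. How are you?", isComplete: true };

// While streaming, the client shows its own, more up-to-date text.
const shownWhileStreaming = displayText(inProgress, "m1", "Hello. How are");
// Once the stored message is complete, the overlay is cleared.
const dropNow = shouldDropOverlay([finished], "m1");
```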

Summary

By leveraging HTTP actions with streaming, this chat app balances real-time responsiveness with efficient bandwidth usage. Users receive character-by-character updates to their own responses directly from ChatGPT, while other users see periodic updates, minimizing database bandwidth.

The full code for this app is available at https://github.com/sshader/streaming-chat-gpt.