Ian Macartney
7 months ago

Background Job Management

Using a table to manage background jobs

What do you do when the work you want to perform should happen after a request?

How can you see incremental progress on workflows that may take a while? Or cancel a job scheduled for the future?

In this post, we’ll look at a pattern for managing asynchronous work that allows you to:

  1. Start a long-running job and respond quickly to a client.
  2. Track incremental progress on multi-stage tasks.
  3. Subscribe to the status and result of a background task from multiple clients.
  4. Cancel a request transactionally (i.e., without race conditions).
  5. Monitor job timeouts.
  6. Implement custom retry logic in the case of transient failures.

As an example of this pattern, I’ll be referencing a multiplayer game using Dall-E. While building it, I found that OpenAI’s image endpoint could sometimes take over 30 seconds, timing out and giving a bad user experience. Rather than have the client wait on a single request, I schedule the work to be run asynchronously using Convex’s function scheduling. You can see the code here.

Tracking status in a table

The high-level approach uses a table to keep track of a long-running task’s state. For my example, I made a “submissions” table to track generating an image using OpenAI based on a prompt:

// in convex/schema.ts
submissions: defineTable({
    prompt: v.string(),
    authorId: v.id("users"),
    result: v.union(
      v.object({
        status: v.literal("generating"),
        details: v.string(),
      }),
      v.object({
        status: v.literal("failed"),
        reason: v.string(),
        elapsedMs: v.number(),
      }),
      v.object({
        status: v.literal("saved"),
        imageStorageId: v.string(),
        elapsedMs: v.number(),
      })
    ),
  }),

Depending on the status of the work, we capture different information.
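On the TypeScript side, the union above behaves like a discriminated union keyed on `status`. Here is a minimal sketch of that idea; the type name `SubmissionResult` and the helper `describeResult` are hypothetical and only for illustration, not part of the game's code:

```typescript
// Hypothetical mirror of the `result` field from the schema above.
type SubmissionResult =
  | { status: "generating"; details: string }
  | { status: "failed"; reason: string; elapsedMs: number }
  | { status: "saved"; imageStorageId: string; elapsedMs: number };

// Checking `status` narrows the type, so each branch only sees its own fields.
function describeResult(result: SubmissionResult): string {
  switch (result.status) {
    case "generating":
      return `In progress: ${result.details}`;
    case "failed":
      return `Failed after ${result.elapsedMs} ms: ${result.reason}`;
    case "saved":
      return `Saved as ${result.imageStorageId} in ${result.elapsedMs} ms`;
  }
}
```

Because each variant carries only the fields that make sense for its status, code that reads `elapsedMs` on a "generating" result fails to type-check rather than failing at runtime.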

1. Starting a job without waiting for it

To start the process, the client calls a mutation, which creates the submission document, schedules the work to start immediately, and returns the ID:

// in convex/submissions.ts "start" mutation
const submissionId = await db.insert("submissions", {
  prompt,
  authorId: session.userId,
  result: {
    status: "generating",
    details: "Starting...",
  },
});
// Kick off createImage in the background
// so we don't block this request.
await scheduler.runAfter(0, internal.actions.createImage, { prompt, submissionId });
return submissionId;

Mutations are transactional, so we could have also checked if there was an ongoing submission for the user or a duplicate request. Importantly, we pass the submission ID to the client and the action that will update the submission.

2. Tracking incremental progress

Once the client receives the submission ID, it can update its UI reactively based on the submission status:

const Submission = ({ submissionId }) => {
  const result = useSessionQuery(api.submissions.get, { submissionId });
  switch (result?.status) {
    case "generating":
      return (
        <figure>
          <article aria-busy="true"></article>
          {result.details}
        </figure>
      );
    case "failed":
      return <p>{result.reason}</p>;
    case "saved":
      return (
        <figure>
          <img src={result.url} />
          Generated in {result.elapsedMs / 1000} seconds.
        </figure>
      );
  }
  return null;
};

As a reminder, Convex queries are re-run automatically whenever the underlying data changes. So as the submission is altered, this React component will re-render with the latest results.

On the server, the action can update the status by running the submissions:update mutation:

// in actions/createImage.ts
await runMutation(internal.submissions.update, {
  submissionId, 
  result: {
    status: "generating",
    details: "Generating image...",
  }
});

Which can be as simple as:

// in convex/submissions.ts
export const update = internalMutation(async (ctx, {submissionId, result}) => {
  await ctx.db.patch(submissionId, { result });
});

When the request is done, it’s up to you whether you write to the job table with the results or commit them elsewhere. In my case, I let the user decide whether they like the image before submitting it to the game, so the action is only responsible for generating it.

3. Subscribing from multiple clients

One nice side-effect of storing the data in a table and reactively querying it is that you can subscribe to the result from multiple clients. Anyone with the submissionId can wait for results. On a higher level, you can also see real-time statistics about the health of the jobs. Because Dall-E can be so slow, I decided to surface its status in the UI, to manage user expectations.

Here I query the latest five submissions and calculate their average time and average success rate:

// in submissions.ts
export const health = query(async (ctx) => {
  const latestSubmissions = await ctx.db
    .query("submissions")
    .order("desc")
    .filter((q) => q.neq(q.field("result.status"), "generating"))
    .take(5);
  let totalTime = 0;
  let successes = 0;
  for (const submission of latestSubmissions) {
    totalTime += submission.result.elapsedMs;
    if (submission.result.status === "saved") successes += 1;
  }
  const n = latestSubmissions.length;
  return [totalTime / n, successes / n];
});
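One edge case to keep in mind: if no submission has finished yet, `n` is 0 and both divisions produce `NaN`. Below is a sketch of the same calculation as a plain function that returns `null` in that case. The names `FinishedResult` and `summarizeHealth` are hypothetical, and the function assumes the "generating" submissions have already been filtered out:

```typescript
// Hypothetical type covering only the finished states of a submission.
type FinishedResult =
  | { status: "failed"; reason: string; elapsedMs: number }
  | { status: "saved"; imageStorageId: string; elapsedMs: number };

// Returns [average elapsed ms, success rate], or null when there is no data yet.
function summarizeHealth(results: FinishedResult[]): [number, number] | null {
  if (results.length === 0) return null;
  let totalTime = 0;
  let successes = 0;
  for (const result of results) {
    totalTime += result.elapsedMs;
    if (result.status === "saved") successes += 1;
  }
  return [totalTime / results.length, successes / results.length];
}
```

The UI can then show something like "no recent data" for `null` instead of rendering `NaN`.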

4. Cancelling a request safely

If you decide to cancel a request, a mutation can check whether the job has already started and, if it hasn’t, update its status to “canceled” in the table. When the job runs, it can query the table and either return early or mark the task as “started.” Because mutations are transactional with serializable isolation by default, you are guaranteed that either the mutation to cancel the job will see that it’s already “started” or the job will see “canceled”. You’ll never think that you canceled a task but find out it ran anyway.

For example, say you want to make a last-minute change to which email a user will get. It is important to send only one email. You can cancel the current pending email, and if you succeeded in canceling it, send an updated one instead.
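The handshake can be sketched as two state transitions. This is an in-memory model rather than Convex code: the names `JobStatus`, `Job`, `tryCancel`, and `tryStart` are hypothetical, and in a real app each function body would run inside its own mutation so that the read and the write happen in a single transaction.

```typescript
// Hypothetical job states for the cancellation handshake.
type JobStatus = "pending" | "started" | "canceled";

interface Job {
  status: JobStatus;
}

// Called on behalf of the client. Succeeds only if the job has not started
// (and has not already been canceled by someone else).
function tryCancel(job: Job): boolean {
  if (job.status !== "pending") return false;
  job.status = "canceled";
  return true;
}

// Called by the job itself before doing any work.
// Returns false if the job was canceled first, so the job should return early.
function tryStart(job: Job): boolean {
  if (job.status !== "pending") return false;
  job.status = "started";
  return true;
}
```

In the email example, the caller would schedule the updated email only when `tryCancel` returns `true`. Whichever transition commits first wins, so exactly one of the two emails gets sent.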

5. Monitoring timeouts

To mark a job as timed out, you can schedule a follow-up mutation when you’re scheduling the job.

// in submissions.ts "start" mutation:
// runAfter takes its delay in milliseconds, so 30 seconds is 30 * 1000.
await scheduler.runAfter(30 * 1000, internal.submissions.timeout, { submissionId });

The timeout could do something like:

export const timeout = internalMutation(async (ctx, { submissionId }) => {
  const submission = await ctx.db.get(submissionId);
  // Only fail the submission if it still exists and is still in progress.
  if (submission?.result.status === "generating") {
    await ctx.db.patch(submissionId, {
      result: { status: "failed", reason: "Timed out", elapsedMs: 30000 },
    });
  }
});

Depending on your application, you might not want your background job to save results if it ends up finishing anyway. In that case, it can check that the status isn’t already “failed” and commit its results in the same mutation, similar to canceling a request above.
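As a rough sketch of that check, the decision can be written as a pure function that a "save result" mutation would call after reading the current document. The names `Result` and `resultToCommit` are hypothetical and not part of the original code:

```typescript
// Hypothetical mirror of the `result` field in the submissions table.
type Result =
  | { status: "generating"; details: string }
  | { status: "failed"; reason: string; elapsedMs: number }
  | { status: "saved"; imageStorageId: string; elapsedMs: number };

// Decide what to store when the job finishes. Returns null if the submission
// has already left the "generating" state, for example because the timeout
// mutation marked it as failed.
function resultToCommit(
  current: Result,
  imageStorageId: string,
  elapsedMs: number
): Result | null {
  if (current.status !== "generating") return null;
  return { status: "saved", imageStorageId, elapsedMs };
}
```

Running the read and the conditional write in one mutation means the timeout and the late result cannot both win.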

6. Implementing retries

Convex functions give you different guarantees around failure and retries. To summarize, Convex automatically retries your queries and mutations, but cannot automatically retry actions since they may contain side effects that are not safe to repeat.

Sometimes, however, it does make sense to retry an action, such as when fetching resources from an external service that has transient failures. To do this, we can have our actions catch errors and retry themselves:

// in actions/createImage
if (!response.ok && retryCount < 3) {
  // call this same function again
  await scheduler.runAfter(
    calculateBackoff(retryCount),
    internal.actions.createImage,
    { prompt, submissionId, retryCount: retryCount + 1 }
  );
}

For your use case, calculateBackoff will likely implement some backoff and jitter to ensure you don’t exacerbate issues for the service you’re hitting.
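As one possible shape for `calculateBackoff`, here is a sketch of exponential backoff with "full jitter". The default values for `baseMs` and `maxMs` and the injectable `random` parameter are assumptions made for illustration, not what the game uses:

```typescript
// Hypothetical implementation: exponential backoff with full jitter.
// Returns a delay in milliseconds.
function calculateBackoff(
  retryCount: number,
  baseMs: number = 1000,
  maxMs: number = 30000,
  random: () => number = Math.random
): number {
  // Double the ceiling on each retry, but never exceed maxMs.
  const ceiling = Math.min(maxMs, baseMs * 2 ** retryCount);
  // Pick a uniformly random delay in [0, ceiling) so retries spread out.
  return Math.floor(random() * ceiling);
}
```

Randomizing across the whole interval, rather than adding a small offset to a fixed delay, keeps many failing jobs from retrying in lockstep. Passing `random` as a parameter makes the function deterministic in tests.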

Summary

In this post, we looked at some patterns for using a table to track scheduled functions to achieve a number of common behaviors for background tasks. As always, let us know what you think in our Discord!
