Reliable HTTP: Outsmarting the Two Generals with Webhooks

The Two Generals Problem is a mathematical result proving that no messaging protocol over an unreliable channel can ensure that two parties share the same state.

However, some approaches guarantee that two distributed systems will follow an acceptable state progression over time. That is, state changes progress linearly and deterministically: the system may halt or continue, but it never enters an unrecoverable state, such as permanently missing one message in a series.

One of the most common sources of problems in applications is the misuse of HTTP in machine-to-machine communication. In this article, we will examine system design patterns that significantly improve the reliability of HTTP communication between systems.

The first step to solving these problems is understanding the distinction between a delivery guarantee provided by message producers and a processing guarantee provided by message consumers.

Guarantees come in three flavours:

  • Atmost-once: the message arrives 0 or 1 times

  • Atleast-once: the message arrives 1 or more times

  • Exactly-once: the message arrives exactly 1 time

Delivery guarantees are either "atmost-once" or "atleast-once"; exactly-once delivery is impossible over an unreliable network (that is the Two Generals Problem again).
A processing guarantee can be any one of the three, depending on the delivery guarantee provided by the producer.

Next, we need to understand that every HTTP request/response cycle is TWO messages over one connection:

  1. The connection opens

  2. The requester writes their message

  3. The responder writes a reply

  4. The connection closes

Caveat: Steps 1 & 4
HTTP Keep-Alive means a connection may be reused for subsequent HTTP Request/Response cycles. However, this detail is irrelevant to the current discussion because the HTTP/1.1 spec still requires the two messages, regardless of the behaviour of the transport layer.

This means the requester can receive an acknowledgement of their message via the response, but critically, the responder does not know if the requester received and processed their reply.

In both scenarios, Service B does not receive a confirmation that the message was processed. At best, it may receive confirmation that the packets were received, but not confirmation that Service A successfully processed the message.

This lack of confirmation means sending important information via an HTTP Response is unreliable. If that response contains the result of the requester's operation to change the responding system's state, the requester cannot know for certain whether its internal representation of the responder's state is accurate.

That was wordy and woefully abstract, so let's look at a concrete example.

Implementing Exactly-Once Processing

In this scenario, our system must

  • Subscribe a user to a subscription plan, provided they meet the requirements for a subscription, e.g. verified email, active account, etc.

  • Update a 3rd party billing platform that manages the actual charges, invoicing, etc.

Let's also say that, in this scenario, the business logic dictates that a user can have zero, one, or many active subscriptions. As you can imagine, this makes it essential that every subscription in the third-party billing platform be recorded in our database.

The code below shows a naive implementation using a single HTTP Request+Response cycle.

const createSubscription = async (userId: number, plan: Plan) => {
  const user = await userDb.get(userId);
  const { outcome } = validateCreateSubscriptionOperation(user, plan);
  if (outcome === 'SUCCESS') {
    const result = await billingApi.post(
      `/plan/${plan.id}/subscribe`,
      { userId },
    );
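    // Danger: if the process dies right here, the billing provider has
    // created the subscription but our database will never record it.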
    await dbConnectionPool.query(`
        INSERT INTO subscriptions    
            (status, external_id, user_id, plan_id)
            VALUES ($1, $2, $3, $4)
        `,
        ['ACTIVE', result.subscription.id, userId, plan.id]
    );
  }
}

What happens if our server is terminated after the HTTP request to billingApi, but before updating the database? Our system won't know the subscription was created, and it won't have the subscription id created by the billing system. Ultimately, neither billingApi nor our system will be able to automatically detect and correct the issue.

For the initial subscription request sent by our system, we are the producer of that message, and we implicitly provide atleast-once delivery: if we don't get a response from billingApi, or we fail to process the response, then we (or the user) can try again until we do.

For the response, the billingApi is the producer and it only provides an atmost-once delivery guarantee.

This can cause a serious problem for our users. They might try to subscribe to a plan, receive an error, try again, and then be billed twice each month. When they look at their active subscriptions in our system, they would only see one subscription. If they cancel that subscription, they would continue being billed once a month. Even if our customer support team tried to help this user, they would not be able to see the extra subscription. Only someone with access to the third-party billing system, such as a member of finance or the billing provider's customer support team, would be able to find the problem. However, even they might not know what they are looking for; they would have to check every subscription in both systems, hunting for one that exists only in the billing service.

Most billing providers/payment platforms don't work this way
This started out as a completely hypothetical situation. I have never worked with a payment service with such a serious design flaw. However, while writing this article I decided to check Braintree and saw that they don't offer webhooks for the result of any POST request. Flabbergasted, I searched further until I found Stack Overflow answers from Braintree employees as well as potential customers (who decided to use Stripe instead) that confirmed what I saw in their documentation.

So how do we solve this problem? Luckily, the billingApi service also provides webhooks with an atleast-once delivery guarantee. Following every HTTP request we make to billingApi, they will send a webhook, an HTTP request, to our service with the result of our previous request. Each webhook they send to our service contains a unique messageId. If our service does not acknowledge the webhook by responding 200 OK within 5 seconds, they will resend the webhook with the same messageId until we do.
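In TypeScript terms, the webhook body for our fictional billingApi might look like this (an assumed shape, matching the webhook handler shown shortly):

type BillingWebhook = {
  messageId: string; // unique per message; identical across redeliveries
  subscription: {
    id: string; // the billing provider's subscription id
    userId: number;
    planId: number;
  };
};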

First, we need the createSubscription operation to reduce its scope to only dispatching the request. We will rename it dispatchSubscriptionRequest:

const dispatchSubscriptionRequest = async (userId: number, plan: Plan) => {
  const user = await userDb.get(userId);
  const result = validateDispatchSubscriptionRequestOperation(user, plan);
  if (result.outcome === 'SUCCESS') {
    await billingApi.post(
      `/plan/${plan.id}/subscribe`,
      { userId },
    );
  }
  return result;
}

Instead of our service receiving the subscription.id via the HTTP Response, the billing provider will send it via a webhook.

Here is a condensed example of how we could handle that:

api.post('/webhooks/billing', async (req, res) => {
  const {
    messageId,
    subscription: { id, userId, planId }
  } = req.body;
  try {
    await dbConnectionPool.query(`
      INSERT INTO billing_inbox
      (message_id, subscription_id, user_id, plan_id)
      VALUES ($1, $2, $3, $4)`,
      [messageId, id, userId, planId]
    );
    res.sendStatus(200);
    return;
  } catch (e) {
    const isExistingMessage = (
      e instanceof DatabaseError
      && e.code === DbErrorCodes.UniqueViolation
    );
    if (isExistingMessage) {
      // Already saved on a previous delivery; acknowledge again.
      res.sendStatus(200);
      return;
    }
    // Unexpected failure: respond non-2xx so the producer retries.
    res.sendStatus(500);
    return;
  }
});

Of course, this isn't production-grade code; for the sake of brevity it crosses many levels of abstraction in one function. The critical facts here are that

  • there is one database transaction per HTTP Request received by the webhook endpoint (a single statement is a single transaction).

  • we use a unique constraint on our idempotency key, the message_id, to ensure we only save a message once, even if it is sent multiple times (see the table sketch below).

  • we return 200 OK if we receive a message that we had previously saved to our inbox, allowing the webhook producer to resend a message until they successfully record our acknowledgment.
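The handler above assumes a billing_inbox table along these lines (a sketch; the exact column types are assumptions for this article's fictional provider):

CREATE TABLE "billing_inbox" (
  "message_id" TEXT PRIMARY KEY, -- idempotency key; the primary key provides the unique constraint
  "subscription_id" TEXT NOT NULL, -- the billing provider's subscription id
  "user_id" INT NOT NULL,
  "plan_id" INT NOT NULL,
  "processed" BOOLEAN NOT NULL DEFAULT FALSE
);

-- The processor below polls for unprocessed rows, so a partial index
-- keeps that lookup cheap as the table grows.
CREATE INDEX billing_inbox_unprocessed_idx
  ON "billing_inbox" ("processed") WHERE "processed" IS FALSE;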

Now we need to process the messages in our inbox, guaranteeing we process each message exactly once. For example:

function initialiseBillingInboxProcessor() {
  (async () => {
    while (lifecycle.isOpen()) {
      const client = await dbConnectionPool.connect();
      try {
        await client.query('BEGIN');
        const { rows: [message] } = await client.query(`
          SELECT * FROM billing_inbox
          WHERE processed IS FALSE
          LIMIT 1
          FOR UPDATE SKIP LOCKED`
        );
        if (!message) {
          // Nothing to do: close the empty transaction before polling
          // again, so we never release a client mid-transaction.
          await client.query('ROLLBACK');
          await wait(200);
          continue;
        }
        await client.query(`
          INSERT INTO subscriptions
            (status, external_id, user_id, plan_id)
            VALUES ($1, $2, $3, $4)
          `, [
          'ACTIVE',
          message.subscription_id,
          message.user_id,
          message.plan_id
        ]);

        await client.query(`
          UPDATE billing_inbox
          SET processed = true
          WHERE message_id = $1`,
          [message.message_id],
        );
        await client.query('COMMIT');
      } catch (error) {
        logger.error(
          'Unexpected error processing billing message',
          { error },
        );
        await client.query('ROLLBACK');
      } finally {
        client.release();
      }
    }
  })();
}

What's happening in this code snippet? First, we define a function that is synchronous, but contains an immediately invoked async function expression.

function initialiseBillingInboxProcessor() {
  (async () => {
    while (lifecycle.isOpen()) {

    }
  })();
}

This prevents other engineers from accidentally awaiting our infinite loop since it won't resolve until the server begins a graceful shutdown.

Next we begin our transaction and take an update lock on the row we select, while excluding rows that have been locked by another transaction.

const client = await dbConnectionPool.connect();
try {
  await client.query('BEGIN');
  const { rows: [ message ] } = await client.query(`
    SELECT * FROM billing_inbox
    WHERE processed IS FALSE
    LIMIT 1
    FOR UPDATE SKIP LOCKED`
  );

This allows us to run the inbox processor across multiple servers simultaneously while preventing any two servers from ever processing the same message at the same time.

If the query returned no results, we roll back the empty transaction and wait 200ms before returning to the top of the while loop and polling for new messages again.

if (!message) {
  await client.query('ROLLBACK');
  await wait(200);
  continue;
}
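The wait helper is not defined in this article; a one-liner like this (an assumption) does the job:

const wait = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));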

Next we process the message by creating a new subscription record, marking the message as processed, and committing our transaction.

await client.query(`
    INSERT INTO subscriptions    
    (status, external_id, user_id, plan_id)
    VALUES ($1, $2, $3, $4)
  `, [
  'ACTIVE', 
  message.subscription_id, 
  message.user_id, 
  message.plan_id
]);

await client.query(`
  UPDATE billing_inbox
  SET processed = true
  WHERE message_id = $1`,
  [message.message_id],
);

await client.query('COMMIT');

So in summary, this transaction has three steps:

  1. Retrieval: Get and lock an unprocessed message row, ensuring other servers don't process it.

  2. Processing: Insert the new subscription record.

  3. Commit: Mark the message as processed and commit the transaction.

Voila! Exactly-once processing for each message. If an error occurs during processing, e.g. the server is terminated unexpectedly or the database connection drops mid-transaction, the transaction will be rolled back and processing of the message will be retried automatically. Of course, step two can include many more steps, as long as every database query is part of the transaction.

In reality, a production-grade implementation is much cleaner
The example I gave explodes several layers of abstraction into one large function. At SKUTOPIA, we built a simple framework for this type of processing. Using it can be as simple as passing two functions to our process creator: One function to select a message, and another to process it. Our database layer automatically uses the transaction-bound connection when called within a withTransaction higher order function, so the transaction is managed by our process creator. Error handling, observability, metrics, retries, and alerting are managed by our inbox framework. We can even perform serial (and linear) processing or concurrent processing, depending on the use case.

Now we are much more reliable message consumers because we have implemented an exactly-once processing guarantee on top of the billing provider's webhooks, which provide an atleast-once delivery guarantee. There is still a flaw in our system design, which we will look at next.

Implementing Atleast-Once Delivery

We have learnt that when a Message Producer implements Atleast-Once Delivery with idempotency keys, we can then implement Exactly-Once Processing as a Message Consumer. But what about the messages we produce?

To implement Atleast-Once Delivery we will need to

  • Create an outbox supporting multiple destinations and providing a unique idempotency key per message

  • Add messages to the outbox instead of making API requests directly

  • Process messages in the outbox by sending them to third parties until we receive a response

In our example service, we send messages to the third-party billing provider when our users call the new subscription endpoint. Currently, a user could make a request, receive an error, then resubmit their request. However, the first request's error could be a false negative: the billing provider received the request, but something went wrong while sending the acknowledgement, either between their server and our server or between our server and our user.

Now that we receive the billing provider's response via a webhook and have implemented exactly-once processing, the user will be able to see and cancel the duplicate subscription. However, we can make this process much more reliable using an outbox that will

  • prevent the duplicate subscription from ever occurring

  • ensure that every successful call to the new subscription endpoint results in a new subscription in the billing provider's system

  • significantly increase throughput and reduce latency of the new subscription endpoint

Creating The Outbox

Let's start by defining the outbox table schema

CREATE TYPE "outbox_message_status" AS ENUM (
  'pending',
  'failed',
  'sent'
);

CREATE TABLE "outbox" (
  "id" UUID PRIMARY KEY DEFAULT gen_random_uuid(), -- WARNING: https://www.2ndquadrant.com/en/blog/sequential-uuid-generators/ 
  "status" "outbox_message_status" NOT NULL DEFAULT 'pending',
  "attempts" SMALLINT NOT NULL DEFAULT 0,
  "retryLimit" SMALLINT NOT NULL DEFAULT 0,
  "retryWaitPeriodMs" INT NOT NULL DEFAULT 1000,
  "createdAt" TIMESTAMP NOT NULL DEFAULT CLOCK_TIMESTAMP(),
  "updatedAt" TIMESTAMP,
  "destination" VARCHAR(255) NOT NULL,
  "payload" JSONB
);

CREATE INDEX outbox_created_at_pending_idx
  ON "outbox" ("status", "createdAt") WHERE "status" = 'pending';

For the sake of simplicity I have used a random UUID as both the primary key and the idempotency key; however, random UUID primary keys hurt insert performance. The alternatives are to either use a partially sequential UUID generator or use a BIGSERIAL primary key, as sketched below.
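For example, the BIGSERIAL route might look like this sketch, keeping a UUID purely as the idempotency key:

CREATE TABLE "outbox" (
  "id" BIGSERIAL PRIMARY KEY, -- cheap, strictly sequential
  "idempotencyKey" UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
  -- ...remaining columns as above
);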

This schema is, I hope, fairly intuitive. The novel decisions are:

  • All columns preceding payload are fixed length. I mostly ordered these columns for readability and intentionality, but I cannot help playing a little Column Tetris to optimise for storage. Please forgive me my vices 😅

  • using CLOCK_TIMESTAMP instead of NOW for createdAt so that two rows inserted in one transaction do not share a timestamp (see the query after this list).

  • using a partial index since we will only query for pending rows.

  • using "camelCase" for columns to make the database repository easier to write.

  • not indexing destination because in most systems it won't be very selective (not many distinct values) AND there won't be many pending rows. Even a backlog of 1,000 pending messages isn't worth indexing.
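On the CLOCK_TIMESTAMP point: NOW() is pinned to the start of the current transaction, while CLOCK_TIMESTAMP() reads the wall clock on every call, which a quick query demonstrates:

SELECT NOW() = NOW() AS now_is_stable, -- always true: both calls return the transaction timestamp
  CLOCK_TIMESTAMP() = CLOCK_TIMESTAMP() AS clock_ties; -- almost always false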

Here is how we might implement a database repository for this table:

type OutboxRepo = {
  add: (msg: OutboxInsert) => Promise<Outbox['id']>;
  getPendingMessage: (destination?: OutboxDestination) => Promise<Outbox | undefined>;
  incrementAttempts: (id: Outbox['id']) => Promise<number>;
  setStatus: (id: Outbox['id'], status: Outbox['status']) => Promise<void>;
};

type Outbox = {
  id: string;
  status: 'pending' | 'failed' | 'sent';
  attempts: number;
  retryLimit: number;
  retryWaitPeriodMs: number;
  createdAt: Date;
  updatedAt: Date | null;
  destination: string;
  payload: Record<string, unknown>;
};

type OutboxInsert = Pick<Outbox, 'destination' | 'payload'> & {
  id?: string; // for use cases where the client generates the msg id
  retryLimit?: number;
  retryWaitPeriodMs?: number;
}

export type OutboxDestination = 'billingProvider' | 'exampleProvider';

export const outboxRepo: OutboxRepo = {
  add: async (msg) => {
    const { keys, values } = Object.entries(msg)
      .reduce<EntriesOf<OutboxInsert>>((obj, [key, value]) => {
        if (value !== undefined) {
          obj.keys.push(`"${key}"`);
          obj.values.push(value);
        }
        return obj;
      }, {keys: [], values: []});
    const client = getTransactionAwareClient();
    const queryText = `
      INSERT INTO "outbox" 
      (${keys.join(', ')})
      VALUES
      (${keys.map((_, i) => `$${i + 1}`).join(', ')})
      RETURNING id
    `;
    const { rows: [row] } = await client.query<Pick<Outbox, 'id'>>(queryText, values);
    return row.id;
  },
  getPendingMessage: async (destination) => {
    const client = getTransactionAwareClient();
    const { rows } = await client.query<Outbox>(`
      SELECT * FROM "outbox"  
      WHERE status = 'pending'
        ${destination ? 'AND destination = $1' : ''}
      ORDER BY "createdAt"
      LIMIT 1
      FOR UPDATE SKIP LOCKED;
    `, destination ? [destination] : []);
    return rows.shift();
  },
  incrementAttempts: async (id) => {
    const client = getTransactionAwareClient();
    const { rows: [row] } = await client.query<Pick<Outbox, 'attempts'>>(`
      UPDATE "outbox"  
      SET attempts = attempts + 1
      WHERE id = $1
      RETURNING attempts
    `, [id]);
    return row.attempts;
  },
  setStatus: async (id, status) => {
    const client = getTransactionAwareClient();
    await client.query(`
    UPDATE "outbox"  
    SET status = $1
    WHERE id = $2
    `, [status, id]);
  },
}

The code above includes a few common utilities I always use:

export type ValueOf<T> = T[keyof T];
export type EntriesOf<T> = { keys: string[], values: ValueOf<T>[] };

Plus, getTransactionAwareClient represents a way for database repos to automatically use a transaction-bound database connection if the caller is currently mid-transaction. It is typically implemented using AsyncLocalStorage.

It's worth noting that incrementAttempts must use the same transaction-bound connection as getPendingMessage. It is tempting to commit the attempt counter separately and immediately on its own connection, but the FOR UPDATE lock taken by getPendingMessage would block that separate UPDATE until our transaction ends, deadlocking the processor against itself. The trade-off is that a rolled-back tick also rolls back its attempt increment; durable attempt accounting requires claiming the message in its own short transaction, similar to the sending status variant discussed at the end of this article.
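Here is a minimal sketch of how getTransactionAwareClient and withTransaction could be implemented with AsyncLocalStorage. This is an illustration with assumed details, not real framework code; note it follows the outbox processor's contract below, catching and logging errors rather than rethrowing:

import { AsyncLocalStorage } from 'node:async_hooks';
import { Pool, PoolClient } from 'pg';

const pool = new Pool();
const transactionStore = new AsyncLocalStorage<PoolClient>();

// Commits if fn resolves; logs and rolls back if it rejects.
export const withTransaction = async <T>(
  fn: () => Promise<T>,
): Promise<T | undefined> => {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    const result = await transactionStore.run(client, fn);
    await client.query('COMMIT');
    return result;
  } catch (error) {
    logger.error('Transaction rolled back', { error });
    await client.query('ROLLBACK');
    return undefined;
  } finally {
    client.release();
  }
};

// Returns the transaction-bound client when called inside withTransaction,
// otherwise falls back to the shared pool.
export const getTransactionAwareClient = (): Pool | PoolClient =>
  transactionStore.getStore() ?? pool;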

I made destination an optional parameter when calling getPendingMessage to support use cases where we must run outbox processors dedicated to particular destinations. Why? Assuming the number of concurrent outbound requests we can make is limited, there are a few possible reasons:

  • A destination has higher latency: We prevent our faster destinations being delayed by this slower one by running an outbox processor dedicated to the high latency destination.

  • A destination is more important to the business: We prevent messages to one destination being queued behind others by running an outbox processor dedicated to the high priority destination.

Adding Messages To The Outbox

While implementing Exactly Once Processing, we created a dispatchSubscriptionRequest function that validated the user's subscription request before sending it to the third-party billing provider. Now that we have an outbox, we will update dispatchSubscriptionRequest to add the request to the outbox:

const enqueueSubscriptionRequest = async (userId: number, plan: Plan) => {
  const user = await userDb.get(userId);
  const result = validateEnqueueSubscriptionRequest(user, plan);
  if (result.outcome === 'SUCCESS') {
    await outboxRepo.add({
      destination: 'billingProvider',
      payload: {
        path: `/plan/${plan.id}/subscribe`,
        body: { userId },
      },
    });
  }
  return result;
}

What have we changed?

  • We changed the verb in the function name from dispatch to enqueue so callers know this function will not immediately make a request to the provider.

  • We renamed validateEnqueueSubscriptionRequest to match, but it is otherwise unchanged.

  • We swapped the HTTP request for a call to outboxRepo.add

There were no other changes. Importantly, we still return the result from validateEnqueueSubscriptionRequest so that users are informed if their request does not satisfy our business rules.

Processing The Outbox

We've created our outbox and added messages to it, now let's start processing them. There are a few things to keep in mind while reading the next code snippet:

  • This is not production code: It crosses multiple layers of abstraction. In professional code we would break up this large function to improve readability, but an article does not allow you to CMD+Click a function to jump to its definition and back.

  • For concepts explained earlier in the article, I have encapsulated the details in functions: instead of writing raw SQL we call the outboxRepo, and instead of explicitly beginning, committing, and rolling back a transaction, I use a withTransaction higher-order function that commits when the promise resolves and rolls back when it rejects.

  • At 90 lines long it is our largest snippet so far, but don't worry, we will break it down piece by piece.

type Resolver = (v?: unknown) => void;

function initialiseOutboxProcessor() {
  const destinationApis: Record<OutboxDestination, Axios> = {
    billingProvider: axios.create({
      baseURL: 'https://api.fictional-payment-platform.com/',
      transformRequest: [(data, headers) => {
        headers['Authorization'] = `Bearer ${getBillingAuthToken()}`;
        return data;
      }],
      httpsAgent: new https.Agent({
        keepAlive: true,
        noDelay: true,
        timeout: 30_000, // milliseconds
      }),
    }),
    exampleProvider: axios.create({
      baseURL: 'https://fictional-service.com/api/',
      httpsAgent: new https.Agent({
        keepAlive: true,
        noDelay: true,
        timeout: 10_000, // milliseconds
      }),
    })
  } as const;

  (async () => {
    const concurrencyLimit = 20;
    const pollingIntervalMs = 200;
    let concurrentRequests = 0;

    while (lifecycle.isOpen()) {
      // Each execution of the while loop's code block is called a tick
      let resolveTickContinuationPromise: Resolver = () => {};
      const tickContinuationPromise = new Promise((resolver) => {
        resolveTickContinuationPromise = resolver;
      });
      // Tracks whether this tick claimed a concurrency slot, so the
      // finally block never decrements a counter it did not increment
      let startedRequest = false;

      // DO NOT AWAIT withTransaction!
      withTransaction(async () => {
        const isAtConcurrencyLimit = concurrentRequests >= concurrencyLimit;
        if (isAtConcurrencyLimit) {
          await wait(pollingIntervalMs);
          return;
        }

        const message = await outboxRepo.getPendingMessage();
        if (!message) {
          await wait(pollingIntervalMs);
          return;
        }

        const attempts = await outboxRepo.incrementAttempts(message.id);
        const isAtRetryLimit = attempts > message.retryLimit;
        if (isAtRetryLimit) {
          await outboxRepo.setStatus(message.id, 'failed');
          return;
        }

        concurrentRequests++;
        startedRequest = true;
        resolveTickContinuationPromise();
        // resolving here so the while loop can process another
        // message concurrently
        const externalApiClient = destinationApis[message.destination];
        const { status } = await externalApiClient.post(
          message.payload.path,
          message.payload.body,
          {
            headers: {
              'Idempotency-Key': message.id
            }
          }
        );
        const wasAcknowledged = status >= 200 && status < 300;
        if (wasAcknowledged) {
          await outboxRepo.setStatus(message.id, 'sent');
        }
        /* DO NOT ADD A CATCH BLOCK!
           If an error is thrown, withTransaction MUST catch, log,
           and rollback.
         */
      }).finally(() => {
        if (startedRequest) concurrentRequests--;
        resolveTickContinuationPromise();
      });

      await tickContinuationPromise;
      /*
        awaiting tickContinuationPromise instead of withTransaction
        allows the outbox processor to send HTTP requests concurrently
        because tickContinuationPromise resolves before starting the
        HTTP request while withTransaction resolves once the request
        and transaction have finished
      */
    }
  })();
}

Let's break this down!

function initialiseOutboxProcessor() {
  const destinationApis: Record<OutboxDestination, Axios> = {
    billingProvider: axios.create({
      baseURL: 'https://api.fictional-payment-platform.com/',
      transformRequest: [(data, headers) => {
        headers['Authorization'] = `Bearer ${getBillingAuthToken()}`;
        return data;
      }],
      httpsAgent: new https.Agent({
        keepAlive: true,
        noDelay: true,
        timeout: 30_000, // milliseconds
      }),
    }),
    exampleProvider: axios.create({
      baseURL: 'https://fictional-service.com/api/',
      httpsAgent: new https.Agent({
        keepAlive: true,
        noDelay: true,
        timeout: 10_000, // milliseconds
      }),
    })
  } as const;

First we declare our destination API map. By telling TypeScript that the value assigned to destinationApis must satisfy Record<OutboxDestination, Axios>, we guarantee that the code will not compile if someone adds a new OutboxDestination but forgets to add an axios instance here.

In our Axios instance configurations we

  • Set the baseURL of the destination

  • Add an Authorization header with a bearer token for the billing provider

  • Set some sensible defaults for the HTTPS agent:

    • keepAlive: true keeps the socket open between requests, as we discussed earlier. This improves performance if we make multiple requests to the destination within the timeout

    • noDelay: true turns off Nagle's algorithm, reducing latency for small writes

    • timeout: number determines how long the socket will stay open after the last data packet. Tweaking the timeout for different destinations can be beneficial depending on their behaviour; for example, setting a timeout shorter than the target's keep-alive timeout can help prevent ECONNRESET errors.

Then we finish the object instantiation with as const. A caveat: because we also gave the variable an explicit Record annotation, the annotation wins and the readonly markers from as const are effectively discarded; on TypeScript 4.9+, the satisfies operator would give us the exhaustiveness check while preserving the inferred readonly type.

  (async () => {
    const concurrencyLimit = 20;
    const pollingIntervalMs = 200;
    let concurrentRequests = 0;

We begin another immediately invoked function expression, like we did with our inbox, only this time we also instantiate a few constants and a counter of running requests. Each message being sent concurrently consumes a database connection, so limiting the number of concurrent requests prevents exhaustion of the connection pool.

while (lifecycle.isOpen()) {
  // Each execution of the while loop's code block is called a tick
  let resolveTickContinuationPromise: Resolver = () => {};
  const tickContinuationPromise = new Promise((resolver) => {
    resolveTickContinuationPromise = resolver;
  });
  let startedRequest = false;

We want our while loop to return to the start of its code block once we are about to start our HTTP request, but allow the processing of the result to continue in the background. This will allow us to send requests for multiple outbox messages concurrently.

However, our while loop is inside an async function, and the retrieval and processing of messages occur within an anonymous function passed to a withTransaction higher-order function. We therefore need an alternative to the continue statement: something that lets us continue the while loop from within the anonymous function without preventing that function from completing its remaining work. We solve this problem by inverting control: the while loop waits until the anonymous function tells it that it is ready.

In the snippet above, we create the promise the while loop will wait for, tickContinuationPromise, and we assign the promise resolver to a variable that will be in the anonymous function's closure. We also declare a startedRequest flag, which records whether this tick claimed a concurrency slot.

// DO NOT AWAIT withTransaction!
withTransaction(async () => {
  const isAtConcurrencyLimit = concurrentRequests >= concurrencyLimit;
  if (isAtConcurrencyLimit) {
    await wait(pollingIntervalMs);
    return;
  }

  const message = await outboxRepo.getPendingMessage();
  if (!message) {
    await wait(pollingIntervalMs);
    return;
  }

  const attempts = await outboxRepo.incrementAttempts(message.id);
  const isAtRetryLimit = attempts > message.retryLimit;
  if (isAtRetryLimit) {
     await outboxRepo.setStatus(message.id, 'failed');
     return;
  }

  concurrentRequests++;
  startedRequest = true;
  resolveTickContinuationPromise();
  // resolving here so the while loop can process another
  // message concurrently

In the snippet above we perform a few checks before sending the request. Just before we send the request, we increment the concurrent requests counter, set startedRequest so the finally block knows this tick claimed a slot, and resolve the continuation promise. This allows the while loop to return to the top of its code block while this function continues running in the background.

  const externalApiClient = destinationApis[message.destination];
  const { status } = await externalApiClient.post(
    message.payload.path,
    message.payload.body,
    {
      headers: {
        'Idempotency-Key': message.id
      }
    }
  );

  const wasAcknowledged = status >= 200 && status < 300;
  if (wasAcknowledged) {
    await outboxRepo.setStatus(message.id, 'sent');
  }
}).finally(() => {
  if (startedRequest) concurrentRequests--;
  resolveTickContinuationPromise();
});

Here we grab one of the clients we prepared earlier. Crucially, we include the message ID in the Idempotency-Key header when we make the request. That way the recipient, in this case the billing system, can detect whether we have sent this message previously.

Once we get a response, we check the status code, and if it is in the 2xx range we mark the message as sent to prevent sending it again. Then, inside the finally callback of the withTransaction promise, we decrement the concurrent request counter (guarded by startedRequest, so an early return can never drive the counter negative) and resolve the continuation promise in case an error or early return occurred.

      await tickContinuationPromise;
    }
  })();
}

And now we wrap everything up! We await the continuation promise as the final statement inside the while loop. Then we close the loop, close the immediately invoked function expression, and close the initialiseOutboxProcessor function definition.

Now we have a process that will send messages from our outbox. Of course, this is just one approach of many, and it has some trade-offs to be aware of. The biggest one is that it holds an open database transaction while making the request to the third party. This might be acceptable if the third party is a good citizen who writes our messages to an inbox before processing them.

However, if they attempt to process the message before responding with 200 OK, our transaction will be held open while their system processes our message. This makes our system extremely vulnerable to their behaviour, and database connections are not an infinite resource. In that case we would want to change the status of our message to sending and put a system in place to detect messages that have been stuck in the sending status for too long, so they can be resent. That approach has a few more failure cases, thereby trading simplicity for performance. I decided to keep it simple for this article.
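As a sketch of that variant: assuming we add a 'sending' value to outbox_message_status and set it before dispatching, a periodic recovery job could requeue stuck messages with a single statement. The Idempotency-Key makes the occasional double-send harmless:

UPDATE "outbox"
SET "status" = 'pending',
    "updatedAt" = CLOCK_TIMESTAMP()
WHERE "status" = 'sending'
  AND "updatedAt" < CLOCK_TIMESTAMP() - INTERVAL '5 minutes';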

The outbox in this article is unordered, but sometimes we need to send messages in a particular order. In that scenario we can use a cursor to track our progress through an ordered set of messages.
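As a sketch, an ordered processor could persist the createdAt of the last message it successfully sent and select the next message after that cursor; using CLOCK_TIMESTAMP for createdAt matters here, since it keeps the ordering meaningful even for rows inserted in the same transaction:

SELECT * FROM "outbox"
WHERE "destination" = $1
  AND "createdAt" > $2 -- the cursor: createdAt of the last sent message
ORDER BY "createdAt"
LIMIT 1;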

Summary

Using HTTP, producers can implement an atleast-once delivery guarantee by sending a message repeatedly, with a timeout on each attempt, until the producer receives an acknowledgement from the consumer via a 200 OK response.
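A minimal producer-side retry loop might look like this sketch (the retry cadence and lack of an attempt cap are assumptions for brevity):

import axios from 'axios';

const sendWithAtleastOnceDelivery = async (
  url: string,
  body: unknown,
  idempotencyKey: string,
) => {
  // Loop until the consumer acknowledges; a real implementation would
  // cap attempts and back off exponentially.
  while (true) {
    try {
      const { status } = await axios.post(url, body, {
        timeout: 10_000, // give up on this attempt if no ack within 10s
        headers: { 'Idempotency-Key': idempotencyKey },
      });
      if (status >= 200 && status < 300) return; // acknowledged
    } catch {
      // Timeout or network error: ambiguous. The consumer may or may not
      // have processed the message, so we resend with the same key.
    }
    await wait(1_000);
  }
};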

These relatively simple messaging guarantees can be built upon to power our more robust network protocols. Yet many services offering a REST API expect users to mutate data with an HTTP POST request and then receive updated state via the HTTP response. As we just learned, in this context an HTTP response is only useful for receiving acknowledgement of our request; it is an unreliable method for receiving the result of our request.

In HTTP, only the requester knows whether the message they sent was delivered. If you provide a REST API where other systems can issue changes to your system, always provide webhooks with an atleast-once delivery guarantee and an idempotency key.
