RabbitMQ Retries — The (new) Full Story

Implementing a semantic and powerful AMQP retry mechanism using a DLX and a Delayed Exchange.

Credits Alex Shute / Unsplash

When implementing event-driven architectures with AMQP, you inevitably end up racking your brain over message retries.

After reading a ton of documentation and running tests over the years with one strategy or another (particularly those from this post: RabbitMQ Retries — The Full Story), I often got frustrated with the solutions I came up with, which felt "hacky".

I finally came up with an elegant, simple and semantic pattern, which I'll try to describe here, from the simplest concepts to the more advanced ones.

TL;DR

Here's the full solution diagram.

AMQP retry architecture

Our use-case

Something I dislike about other guides is their use-cases. In my opinion, a great example use-case is minimalist, but realistic.

So let's take a brand new one: a notification service:

  • We're reading {some events} from {some source},
  • and dispatching them to several services. Let's say
    • email
    • and webhooks
Simple yet realistic queuing use-case

The base system features:

  • 1 exchange : notification-exchange
    — can be of any type for this example
  • 2 work queues : email-queue and webhook-queue
    — bound to the exchange
  • 2 consumers: workers, in our case
    — one consuming each queue

Our 2 workers will call external services to send emails and webhooks.

Option A — The naive nack(requeue)

Let's start with the simplest possible form of "retry": a plain nack-and-requeue.

Let's just nack and requeue?

Key concepts

  • nacking: the way of telling the broker "I failed to handle this message"
  • requeue: the last argument of nack (in amqplib: nack(message, allUpTo, requeue))
    — it will requeue the message back to the head of the queue

Some code

Here's a conceptual implementation example (TypeScript, amqplib).

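import amqp from 'amqplib'

// Connect and open a channel (adjust the URL to your broker)
const connection = await amqp.connect('amqp://localhost')
const amqpChannel = await connection.createChannel()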

// Create exchange and queues
const notifExchange = await amqpChannel.assertExchange(
  'notification-exchange',
  'fanout'
)
const [emailQueue, webhookQueue] = await Promise.all([
  amqpChannel.assertQueue('email-queue'),
  amqpChannel.assertQueue('webhook-queue')
])

// Bind queues
await Promise.all([
  amqpChannel.bindQueue('email-queue', 'notification-exchange', ''),
  amqpChannel.bindQueue('webhook-queue', 'notification-exchange', '')
])
migrate.ts
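import amqp from 'amqplib'

// emailService, webhookService, composeEmail and composeWebhook
// stand for your own (hypothetical) application code
const connection = await amqp.connect('amqp://localhost')
const amqpChannel = await connection.createChannel()

// Consume email
await amqpChannel.consume('email-queue', async (msg) => {
  if (msg === null) return // the broker cancelled the consumer
  try {
    await emailService.send(composeEmail(msg))
    amqpChannel.ack(msg)
  } catch (err) {
    console.error(err)
    amqpChannel.nack(msg, false, true) // requeue = true: Requeue!
  }
})

// Consume webhook
await amqpChannel.consume('webhook-queue', async (msg) => {
  if (msg === null) return // the broker cancelled the consumer
  try {
    await webhookService.send(composeWebhook(msg))
    amqpChannel.ack(msg)
  } catch (err) {
    console.error(err)
    amqpChannel.nack(msg, false, true) // requeue = true: Requeue!
  }
})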
consume.ts

Pros / cons

Pros

  • Simple and semantic: the code is clean, and so is the architecture

Cons

  • Requeued messages go back to the head of the queue, so repeated failures can lock the whole queue
  • Requeuing happens instantly, which causes an extreme retry rate, and thus an extreme number of calls to downstream services (like emailService.send())
  • Requeuing happens indefinitely: a permanent failure causes infinite retries, making the two points above even more dangerous
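To make the first con concrete, here is a toy in-memory simulation. No broker is involved. It is a deliberately simplified model that assumes a single consumer handling one message at a time (a prefetch of 1). RabbitMQ tries to put a requeued message back at its original position, which for a lone consumer is the head. The simulation compares that with requeuing at the tail, which is what we're after next. The simulate function and its parameters are made up for illustration.

```typescript
// Toy FIFO model of a queue with a single consumer (prefetch of 1).
// Not RabbitMQ itself: just enough to show where requeued messages land.
type RequeuePosition = 'head' | 'tail'

function simulate(
  initial: string[],
  isPoison: (msg: string) => boolean,
  position: RequeuePosition,
  maxDeliveries: number
): string[] {
  const queue = [...initial]
  const processed: string[] = []

  for (let delivery = 0; delivery < maxDeliveries && queue.length > 0; delivery++) {
    const msg = queue.shift()!
    if (!isPoison(msg)) {
      processed.push(msg) // handled successfully (acked)
      continue
    }
    // Handling failed: nack with requeue
    if (position === 'head') {
      queue.unshift(msg) // back in front: delivered again right away
    } else {
      queue.push(msg) // back at the end: other messages get their turn
    }
  }
  return processed
}

const messages = ['poison', 'a', 'b', 'c']
const isPoison = (msg: string) => msg === 'poison'

console.log(simulate(messages, isPoison, 'head', 10)) // []
console.log(simulate(messages, isPoison, 'tail', 10)) // [ 'a', 'b', 'c' ]
```

Even with tail requeuing, the poison message keeps cycling forever. That is the third con, which the requeue limit in Option B addresses.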

Simplicity comes at a cost. Read more about poison messages.

Option B — The savior (aka DLX)

Let's move on and tackle the issues above:

Requeuing infinitely at the head of a queue can lead to queue locking

So

  • How does one requeue at the tail of a queue (instead of the head)?
  • How does one limit the number of requeues?

We're going to use a component designed for exactly this: the well-known DLX, short for Dead Letter Exchange.

The savior: Dead Letter Exchange

Key concepts

  • A Dead Letter Exchange is just a standard exchange, set up as the queue's deadLetterExchange:
await amqpChannel.assertQueue('email-queue', {
  deadLetterExchange: 'error-exchange'
})
  • Any discarded message on the queue will be re-routed to the DLX
    - for any dead-lettering reason: rejection with nack (without requeue), expiration, queue length limit, ...
  • A requeue queue is bound to the Dead Letter Exchange
    - this is where we handle the requeuing (⚠️ see explanation below)
  • When dead-lettering a message, RabbitMQ adds some useful death headers to it (x-death, x-first-death-reason, x-first-death-queue, ...), which help us decide whether to requeue it.
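Here is a sketch of what those death headers look like and how one might read a reject count from them. The header names are the ones RabbitMQ sets. The XDeathEntry and DeathHeaders types are my own minimal, partial typing (not amqplib exports), and the header values are made up for illustration. Unlike the article's code, this rejectCount helper also matches on the queue name, so a message dead-lettered from several queues is counted per queue.

```typescript
// Minimal, hand-written typing of one x-death entry (partial: only the
// fields used here). RabbitMQ keeps one entry per (queue, reason) pair.
interface XDeathEntry {
  count: number
  reason: string // e.g. 'rejected', 'expired', 'maxlen'
  queue: string
  exchange: string
  'routing-keys': string[]
}

interface DeathHeaders {
  'x-death'?: XDeathEntry[]
  'x-first-death-reason'?: string
  'x-first-death-queue'?: string
}

// How many times was this message rejected (nack without requeue) from `queue`?
function rejectCount(headers: DeathHeaders | undefined, queue: string): number {
  const entry = headers?.['x-death']?.find(
    (d) => d.reason === 'rejected' && d.queue === queue
  )
  return entry?.count ?? 0
}

// Illustrative headers for a message rejected 3 times from email-queue
const headers: DeathHeaders = {
  'x-first-death-reason': 'rejected',
  'x-first-death-queue': 'email-queue',
  'x-death': [
    {
      count: 3,
      reason: 'rejected',
      queue: 'email-queue',
      exchange: 'notification-exchange',
      'routing-keys': ['']
    }
  ]
}

console.log(rejectCount(headers, 'email-queue')) // 3
console.log(rejectCount(headers, 'webhook-queue')) // 0
```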

Some code

First, we set the 'error-exchange' as the deadLetterExchange of the work queues, and we create a 'requeue-queue' bound to it.

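import amqp from 'amqplib'

// Connect and open a channel (adjust the URL to your broker)
const connection = await amqp.connect('amqp://localhost')
const amqpChannel = await connection.createChannel()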

// Create exchange and queues
const [notifExchange, errorExchange] = await Promise.all([
  amqpChannel.assertExchange('notification-exchange', 'fanout'),
  // Our brand new Exchange used as a DLX
  amqpChannel.assertExchange('error-exchange', 'fanout')
])
const [emailQueue, webhookQueue] = await Promise.all([
  amqpChannel.assertQueue('email-queue', {
    deadLetterExchange: 'error-exchange' // DLX Setup
  }),
  amqpChannel.assertQueue('webhook-queue', {
    deadLetterExchange: 'error-exchange' // DLX Setup
  }),
  // Our requeue handling queue
  amqpChannel.assertQueue('requeue-queue')
])

// Bind queues
await Promise.all([
  amqpChannel.bindQueue('email-queue', 'notification-exchange', ''),
  amqpChannel.bindQueue('webhook-queue', 'notification-exchange', ''),
  // Binding DLX to requeue queue
  amqpChannel.bindQueue('requeue-queue', 'error-exchange', '')
])
migrate.ts

Then, we consume:

  • upon errors on the work queues, we nack the message without requeuing
    — the rejected messages will go through the DLX
  • we consume the 'requeue-queue' and handle requeuing logic
    — using headers added by the DLX
import amqp from 'amqplib'

// emailService, webhookService, composeEmail and composeWebhook
// stand for your own (hypothetical) application code
const connection = await amqp.connect('amqp://localhost')
const amqpChannel = await connection.createChannel()

// Consume email
await amqpChannel.consume('email-queue', async (msg) => {
  if (msg === null) return // the broker cancelled the consumer
  try {
    await emailService.send(composeEmail(msg))
    amqpChannel.ack(msg)
  } catch (err) {
    console.error(err)
    amqpChannel.nack(msg, false, false) // requeue = false: Don't requeue!
  }
})

// Consume webhook
await amqpChannel.consume('webhook-queue', async (msg) => {
  if (msg === null) return // the broker cancelled the consumer
  try {
    await webhookService.send(composeWebhook(msg))
    amqpChannel.ack(msg)
  } catch (err) {
    console.error(err)
    amqpChannel.nack(msg, false, false) // requeue = false: Don't requeue!
  }
})

// Consume and handle requeue
await amqpChannel.consume('requeue-queue', (msg) => {
  if (msg === null) return
  const headers: Record<string, any> = msg.properties.headers ?? {}
  const {
    // The reason msg was discarded
    'x-first-death-reason': reason,
    // The queue where it happened
    'x-first-death-queue': originalQueue
  } = headers

  // Here we control which discard reasons
  // should lead to a requeue.
  // I'm only keeping 'rejected' (`nack`s) ones here
  if (originalQueue == null || reason !== 'rejected') {
    amqpChannel.ack(msg)
    return
  }

  // Get number of rejects from header
  const xDeaths: Array<{ reason: string; count: number }> =
    headers['x-death'] ?? []
  const rejectsCount =
    xDeaths.find((d) => d.reason === 'rejected')?.count ?? -1

  const MAX_REQUEUES = 10
  // Msg reached max requeue amount
  if (rejectsCount === -1 || rejectsCount >= MAX_REQUEUES) {
    amqpChannel.ack(msg)
    return
  }

  // Re-send the message to the tail of the queue
  // where it errored (via the default exchange)
  amqpChannel.sendToQueue(originalQueue, msg.content, msg.properties)

  // Don't forget to ack it from requeue queue
  amqpChannel.ack(msg)
})
consume.ts

Pros / cons

Pros

  • Upon repeated failures, the queue is no longer locked: messages are requeued at the tail of the original queue, which frees it up.
  • Still semantic and relatively simple

Cons

  • The requeues still happen almost instantly (modulo the messages already in the queue). So what's the problem with that?

Well, let's take a step back and ask ourselves:

When your partner doesn't want to get laid at the moment, do you retry him/her instantly 10 times in a row?

I hope not, and the same goes for our use-case: an email service is unlikely to accept a new request 12 ms after a failed one. It could be suffering an outage, or you may simply have hit an API rate limit. In most cases, retrying instantly many times is both dangerous and ineffective.

Option C — The panacea (aka Delayed Exchange)

So our solution works so far, but we've hit a new issue:

Requeuing almost instantly may be both dangerous and ineffective.

So how does one wait before requeuing?

We'll use a new component, a Delayed Exchange (exchange type x-delayed-message, provided by the rabbitmq_delayed_message_exchange plugin), as a "waiter" between retries. Check this out:

The panacea: DLX + delayed exchange

Pretty cool, huh ?

Key concepts

  • We're still using a DLX just like before
  • But this time we bind it to an intermediate queue, 'error-queue'
    - this one handles the delay logic
  • We introduce a new 'wait-exchange' of type 'x-delayed-message'
    - it will hold messages between retries
  • We move the 'requeue-queue' to the end of the chain
    - still responsible for the requeuing logic
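The delayed exchange only works once the rabbitmq_delayed_message_exchange plugin is enabled on the broker. As far as I know, it expects two things:

  • the exchange is declared with an x-delayed-type argument, which is the routing behaviour it applies once the delay is over
  • each message carries its delay, in milliseconds, in an x-delay header

Below is a minimal sketch of both, as plain objects you would hand to amqplib's assertExchange and publish. The PublishProps type and the withDelay helper are local, hypothetical simplifications, not amqplib's own.

```typescript
// Local, simplified stand-in for the message properties amqplib hands us
// (only the field we touch here)
interface PublishProps {
  headers?: Record<string, unknown>
  [key: string]: unknown
}

// Options for assertExchange('wait-exchange', 'x-delayed-message', ...):
// the plugin needs to know which exchange type to behave like after the delay
const waitExchangeOptions = {
  durable: true,
  arguments: { 'x-delayed-type': 'fanout' }
}

// Copy the original properties (without mutating them) and add the delay,
// in milliseconds, for which the delayed exchange should hold the message
function withDelay(props: PublishProps, delayMs: number): PublishProps {
  return {
    ...props,
    headers: { ...props.headers, 'x-delay': delayMs }
  }
}

const original: PublishProps = {
  contentType: 'application/json',
  headers: { 'x-first-death-queue': 'email-queue' }
}
const delayed = withDelay(original, 5000)

console.log(delayed.headers) // { 'x-first-death-queue': 'email-queue', 'x-delay': 5000 }
```

Copying the headers matters here: the requeue step later relies on x-first-death-queue still being present after the wait.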

Some code

First, we create the new 'error-queue' bound to the DLX. Then we add the Delayed 'wait-exchange' and bind the 'requeue-queue' to it.

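import amqp from 'amqplib'

// Connect and open a channel (adjust the URL to your broker)
const connection = await amqp.connect('amqp://localhost')
const amqpChannel = await connection.createChannel()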

// Create exchange and queues
const [notifExchange, errorExchange] = await Promise.all([
  amqpChannel.assertExchange('notification-exchange', 'fanout'),
  amqpChannel.assertExchange('error-exchange', 'fanout'),
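  // Our new delayed exchange (needs the rabbitmq_delayed_message_exchange
  // plugin); 'x-delayed-type' is the routing used once the delay is over
  amqpChannel.assertExchange('wait-exchange', 'x-delayed-message', {
    arguments: { 'x-delayed-type': 'fanout' }
  })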
])
const [emailQueue, webhookQueue] = await Promise.all([
  amqpChannel.assertQueue('email-queue', {
    deadLetterExchange: 'error-exchange'
  }),
  amqpChannel.assertQueue('webhook-queue', {
    deadLetterExchange: 'error-exchange'
  }),
  // Our new error handling queue
  amqpChannel.assertQueue('error-queue'),
  amqpChannel.assertQueue('requeue-queue')
])

// Bind queues
await Promise.all([
  amqpChannel.bindQueue('email-queue', 'notification-exchange', ''),
  amqpChannel.bindQueue('webhook-queue', 'notification-exchange', ''),
  // New bindings with intermediate error queue
  amqpChannel.bindQueue('error-queue', 'error-exchange', ''),
  amqpChannel.bindQueue('requeue-queue', 'wait-exchange', '')
])

Then, we consume:

import amqp from 'amqplib'

// emailService, webhookService, composeEmail and composeWebhook
// stand for your own (hypothetical) application code
const connection = await amqp.connect('amqp://localhost')
const amqpChannel = await connection.createChannel()

// Consume email
await amqpChannel.consume('email-queue', async (msg) => {
  if (msg === null) return // the broker cancelled the consumer
  try {
    await emailService.send(composeEmail(msg))
    amqpChannel.ack(msg)
  } catch (err) {
    console.error(err)
    amqpChannel.nack(msg, false, false) // requeue = false: Don't requeue!
  }
})

// Consume webhook
await amqpChannel.consume('webhook-queue', async (msg) => {
  if (msg === null) return // the broker cancelled the consumer
  try {
    await webhookService.send(composeWebhook(msg))
    amqpChannel.ack(msg)
  } catch (err) {
    console.error(err)
    amqpChannel.nack(msg, false, false) // requeue = false: Don't requeue!
  }
})

// Consume and handle error
await amqpChannel.consume('error-queue', (msg) => {
  if (msg === null) return
  const headers: Record<string, any> = msg.properties.headers ?? {}
  const reason = headers['x-first-death-reason']

  // Here we control which discard reasons
  // should lead to a requeue.
  // I'm only keeping 'rejected' (`nack`s) ones here
  if (reason !== 'rejected') {
    amqpChannel.ack(msg)
    return
  }

  // Get number of rejects from header
  const xDeaths: Array<{ reason: string; count: number }> =
    headers['x-death'] ?? []
  const rejectsCount =
    xDeaths.find((d) => d.reason === 'rejected')?.count ?? -1

  // Not a rejection
  if (rejectsCount <= 0) {
    amqpChannel.ack(msg)
    return
  }

  // Our requeue delays curve, in milliseconds
  const REQUEUE_DELAY_DURATIONS = [
    500, // 0.5 secs
    5 * 1000, // 5 secs
    60 * 1000, // 1 min
    5 * 60 * 1000 // 5 mins
  ]

  // Find duration to wait
  // before next requeue
  const nextRequeueIndex = rejectsCount - 1
  const nextRequeueDuration = REQUEUE_DELAY_DURATIONS[nextRequeueIndex] ?? null

  // Msg reached max requeue amount
  if (nextRequeueDuration == null) {
    amqpChannel.ack(msg)
    return
  }

  // Send message to the wait-exchange
  amqpChannel.publish('wait-exchange', '', msg.content, {
    ...msg.properties,
    headers: {
      ...headers,
      // Tell wait-exchange how long to hold the message (ms)
      'x-delay': nextRequeueDuration
    }
  })

  // Don't forget to ack it from error queue
  amqpChannel.ack(msg)
})

// Consume and handle requeue
await amqpChannel.consume('requeue-queue', (msg) => {
  if (msg === null) return
  const originalQueue = msg.properties.headers?.['x-first-death-queue']

  if (originalQueue == null) {
    amqpChannel.ack(msg)
    return
  }

  // Re-send the message to the tail of the queue
  // where it errored (via the default exchange)
  amqpChannel.sendToQueue(originalQueue, msg.content, msg.properties)

  // Don't forget to ack it from requeue queue
  amqpChannel.ack(msg)
})

Pros / cons

Pros

  • Upon repeated failures, we requeue messages with predefined, growing delays between retries, until a maximum is reached.
  • Still semantic and conceptually simple
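The delay curve above is a hand-written array. If you'd rather compute it, a common alternative is a capped exponential backoff. This is not the author's curve, just one option. The requeueDelay function and its default parameters are hypothetical. rejectsCount follows the same convention as the error-queue consumer: 1 after the first failure, 2 after the second, and so on.

```typescript
// Hypothetical helper: delay (ms) before the next requeue, or null to give up
function requeueDelay(
  rejectsCount: number,
  baseMs = 500, // first delay
  factor = 4, // growth between consecutive retries
  capMs = 5 * 60 * 1000, // never wait more than 5 minutes
  maxRequeues = 6 // give up after this many requeues
): number | null {
  if (rejectsCount < 1 || rejectsCount > maxRequeues) return null
  const delay = baseMs * factor ** (rejectsCount - 1)
  return Math.min(delay, capMs)
}

for (let n = 1; n <= 7; n++) {
  console.log(n, requeueDelay(n))
}
// 1 500
// 2 2000
// 3 8000
// 4 32000
// 5 128000
// 6 300000
// 7 null
```

requeueDelay(rejectsCount) could stand in for the REQUEUE_DELAY_DURATIONS lookup. Adding some random jitter is a common refinement when many messages fail at the same time.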

Cons

Thanks for reading 💚


Appendix

Quorum queues

To mitigate the "infinite requeue" issue in a simpler way, one can configure a delivery-limit on the queues (⚠️ quorum queues only).
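For reference, here is a sketch of such a queue declaration. With amqplib, the queue type and the delivery limit go into the queue arguments as x-queue-type and x-delivery-limit, and quorum queues must be durable. The limit of 10 is an arbitrary example value. Once a message goes over it, RabbitMQ drops it, or dead-letters it if a DLX is configured.

```typescript
// Options for amqpChannel.assertQueue('email-queue', emailQueueOptions).
// The delivery limit caps how many times a message can be redelivered
// (e.g. after nack with requeue) before it is dropped or dead-lettered.
const emailQueueOptions = {
  durable: true, // quorum queues are always durable
  deadLetterExchange: 'error-exchange',
  arguments: {
    'x-queue-type': 'quorum',
    'x-delivery-limit': 10 // arbitrary example value
  }
}

// Usage, given a channel (not run here):
// await amqpChannel.assertQueue('email-queue', emailQueueOptions)
console.log(emailQueueOptions.arguments)
```

An existing classic queue can't simply be redeclared with these arguments; the broker rejects inequivalent arguments, so a new quorum queue has to be created.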

Delay queues

While browsing the web, you'll land on a popular solution involving a wait-queue (instead of a delayed exchange) with a TTL set on it and its own DLX.

I advise against this practice because it has several drawbacks:

  • First, in the likely case you want to grow the delay between retries like we just did, you'd actually need multiple wait-queues (see the caveats of per-message TTL: expired messages are only dead-lettered once they reach the head of the queue, which forces us to use per-queue TTL).
  • Second, you're adding another DLX, which will update the x-death headers along the way.
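To illustrate the first point: with per-queue TTL, each delay step needs its own wait queue, each with its own TTL and a DLX. The sketch below only generates the queue names and amqplib assertQueue options for the delays used earlier. The wait-queue-<ms> names and 'retry-exchange' are hypothetical, and this is shown for comparison only.

```typescript
// One wait queue per delay step, since a queue has a single TTL.
// Names ('wait-queue-<ms>', 'retry-exchange') are hypothetical.
interface WaitQueueDeclaration {
  name: string
  options: { messageTtl: number; deadLetterExchange: string }
}

function waitQueuesFor(delaysMs: number[]): WaitQueueDeclaration[] {
  return delaysMs.map((ms) => ({
    name: `wait-queue-${ms}`,
    options: {
      messageTtl: ms, // messages expire after `ms` milliseconds...
      deadLetterExchange: 'retry-exchange' // ...and are dead-lettered ('expired') here
    }
  }))
}

const declarations = waitQueuesFor([500, 5 * 1000, 60 * 1000, 5 * 60 * 1000])
for (const { name, options } of declarations) {
  console.log(name, options.messageTtl)
}
// wait-queue-500 500
// wait-queue-5000 5000
// wait-queue-60000 60000
// wait-queue-300000 300000
```

Four delays means four queues to declare, bind and maintain. Every expiry also adds an 'expired' entry to the x-death header, which is the second point above.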