RabbitMQ Retries — The (new) Full Story
Implementing a semantic and powerful AMQP retry mechanism using DLX and Delayed Exchange.
When implementing event-driven architectures with AMQP, you inevitably end up racking your brain over message retries.
After reading a ton of documentation and testing one strategy or another over the years (particularly the one from this post: RabbitMQ Retries — The Full Story), I often got frustrated with the solutions I came up with, because they felt "hacky".
I finally came up with an elegant, simple and semantic pattern I'll try to describe here, from simplest concepts to advanced ones.
TL;DR
Here's the full solution diagram.
Our use-case
Something I dislike about other guides is their use-cases. In my opinion, a great example use-case is minimalist but realistic.
So let's take a brand new one: a notification service.
- We're reading {some events} from {some source},
- and dispatching them to several services. Let's say emails and webhooks.
The base system features:
- 1 exchange: notification-exchange (can be of any type for this example)
- 2 work queues: email-queue and webhook-queue (bound to the exchange)
- 2 consumers, here workers (one consuming each queue)
Our 2 workers will try and use external services to send emails and webhooks.
Option A — The naive nack(requeue)
Starting with the simplest possible form of "retry" : a plain nack-and-requeue.
Key concepts
- nacking: the way of telling AMQP "I failed handling the message"
- requeue: the last argument of nack; it will requeue the message back to the head of the queue
Some code
Here's a conceptual implementation example (TypeScript, amqplib).
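A minimal sketch of such a naive handler. The `QueueMessage` and `AckChannel` types are simplified stand-ins for amqplib's message and channel (an assumption made so the snippet compiles without a broker), and `send` is a hypothetical function wrapping the external service call.

```typescript
// Simplified stand-ins for amqplib's ConsumeMessage and Channel (assumption).
interface QueueMessage {
  content: Uint8Array
}

interface AckChannel {
  ack(msg: QueueMessage): void
  // Same argument order as amqplib: nack(message, allUpTo, requeue)
  nack(msg: QueueMessage, allUpTo?: boolean, requeue?: boolean): void
}

// Hypothetical sender, e.g. a wrapper around an email provider.
type Send = (msg: QueueMessage) => Promise<void>

// Builds the consumer callback for a work queue.
function naiveHandler(channel: AckChannel, send: Send) {
  return async (msg: QueueMessage | null): Promise<void> => {
    if (msg === null) return // the consumer was cancelled
    try {
      await send(msg)
      channel.ack(msg)
    } catch {
      // requeue = true: the broker puts the message back on the same queue
      channel.nack(msg, false, true)
    }
  }
}
```

With amqplib, the returned callback would typically be passed to `amqpChannel.consume('email-queue', ...)`, with `send` calling something like `emailService.send()`.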
Pros / cons
Pros
- Simple and semantic: the code is clean, and so is the architecture
Cons
- Requeuing happens at the head of the queue, so repeated failures can block the whole queue
- Requeuing is instant, which causes an extreme retry rate and therefore an extreme number of subsequent service calls, like emailService.send()
- Requeuing is unbounded: a permanent failure causes infinite retries, making the 2 points above even more dangerous.
Simplicity comes at a cost. Read more about poison-messages.
Option B — The savior (aka DLX)
Let's go on by tackling the issues above. So:
- How does one requeue at the tail of a queue (instead of the head)?
- How does one limit the number of requeues?
We're going to use a component actually designed for this : the well known DLX, or in its full name the Dead Letter Exchange.
Key concepts
- A Dead Letter Exchange is actually a standard exchange, but set up to be the queue's deadLetterExchange
await amqpChannel.assertQueue('email-queue', {
  deadLetterExchange: 'error-exchange'
})
- Any discarded message on the queue will be re-routed to the DLX (for any discard reason: rejection with nack, expiration, ...)
- A requeue queue is bound to the Dead Letter Exchange, in which we handle the requeuing (⚠️ see explanation below)
- The Dead Letter Exchange adds some useful death headers on the message to help with making decisions about requeuing.
Some code
First, we add the 'error-exchange' as deadLetterExchange of the work queues, and we create a 'requeue-queue' bound to it.
Then, we consume:
- upon error on work queues, we nack the message without requeuing (the rejected messages will go through the DLX)
- we consume the 'requeue-queue' and handle requeuing logic (using headers added by the DLX)
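The requeuing decision could be sketched as a small pure function reading the death headers. This is only an illustration: `MAX_REQUEUES`, `requeueTarget` and the `MessageHeaders` type are hypothetical names, and only the `x-death` fields used here are modelled.

```typescript
// One entry of the 'x-death' header array set by RabbitMQ
// when a message is dead-lettered (only the fields used here).
interface XDeath {
  queue: string
  reason: string // e.g. 'rejected' or 'expired'
  count: number
}

type MessageHeaders = Record<string, unknown>

// Hypothetical limit, chosen for the example.
const MAX_REQUEUES = 3

// Returns the queue the message should be sent back to, or null to drop it.
function requeueTarget(
  headers: MessageHeaders,
  maxRequeues: number = MAX_REQUEUES
): string | null {
  const queue = headers['x-first-death-queue']
  if (typeof queue !== 'string') return null

  // How many times the message has been rejected on its original queue
  const deaths = (headers['x-death'] as XDeath[] | undefined) ?? []
  let rejections = 0
  for (const death of deaths) {
    if (death.reason === 'rejected' && death.queue === queue) {
      rejections = death.count
    }
  }

  // Not a rejection, or the requeue budget is used up
  if (rejections === 0 || rejections > maxRequeues) return null
  return queue
}
```

In the 'requeue-queue' consumer, a non-null result would be passed to `amqpChannel.sendToQueue(target, msg.content, msg.properties)`, and the message would be acked from the requeue queue in both cases.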
Pros / cons
Pros
- Upon repeated failures, the queue is no longer blocked. Requeues go to the tail of the original queue, which frees it up.
- Still semantic and relatively simple
Cons
- The requeues are still made almost instantly (modulo existing messages on the queue). So what's the problem with that ?
Well, let's take a step back and ask ourselves
When your partner doesn't want to get laid at the moment, do you retry him/her instantly 10 times in a row ?
I hope not, and the same goes for our use-case: an email service is unlikely to accept a new query 12ms after a failed one. It could be experiencing an outage, or you could just have reached an API rate limit... In most cases, retrying instantly many times is both dangerous and ineffective.
Option C — The panacea (aka Delayed Exchange)
So our solution is working so far, but we encountered this new issue :
So how does one wait before requeueing ?
We're using a new component called a Delayed Exchange as a "waiter" between retries. Check this out :
Pretty cool, huh ?
Key concepts
- We're still using a DLX just like before
- But this time we bind it to an intermediate queue 'error-queue' (this one is here to handle the delay logic)
- We introduce a new 'wait-exchange' of type 'x-delayed-message' (it will store messages between retries)
- We move the 'requeue-queue' to the end of the chain (still responsible for the requeuing logic)
Some code
First, we create the new 'error-queue' bound to the DLX. Then we add the Delayed 'wait-exchange' and bind the 'requeue-queue' to it.
import * as amqp from 'amqplib'
// Connect and open a channel (the URL is an assumption, adjust it to your broker)
const connection = await amqp.connect('amqp://localhost')
const amqpChannel = await connection.createChannel()
// Create exchanges and queues
await Promise.all([
  amqpChannel.assertExchange('notification-exchange', 'fanout'),
  amqpChannel.assertExchange('error-exchange', 'fanout'),
  // Our new delayed exchange: the plugin needs 'x-delayed-type'
  // to know how to route messages once the delay has elapsed
  amqpChannel.assertExchange('wait-exchange', 'x-delayed-message', {
    arguments: { 'x-delayed-type': 'fanout' }
  })
])
await Promise.all([
  amqpChannel.assertQueue('email-queue', {
    deadLetterExchange: 'error-exchange'
  }),
  amqpChannel.assertQueue('webhook-queue', {
    deadLetterExchange: 'error-exchange'
  }),
  // Our new error handling queue
  amqpChannel.assertQueue('error-queue'),
  amqpChannel.assertQueue('requeue-queue')
])
// Bind queues
await Promise.all([
  amqpChannel.bindQueue('email-queue', 'notification-exchange', ''),
  amqpChannel.bindQueue('webhook-queue', 'notification-exchange', ''),
  // New bindings with intermediate error queue
  amqpChannel.bindQueue('error-queue', 'error-exchange', ''),
  amqpChannel.bindQueue('requeue-queue', 'wait-exchange', '')
])
Then, we consume:
// amqpChannel is the channel opened during the setup above
// Consume email
await amqpChannel.consume('email-queue', async (msg) => {
  if (msg == null) return // consumer cancelled by the broker
  try {
    await emailService.send(composeEmail(msg))
    amqpChannel.ack(msg)
  } catch (err) {
    console.error(err)
    amqpChannel.nack(msg, false, false)
    //                          ^ Don't requeue !
  }
})
// Consume webhook
await amqpChannel.consume('webhook-queue', async (msg) => {
  if (msg == null) return // consumer cancelled by the broker
  try {
    await webhookService.send(composeEmail(msg))
    amqpChannel.ack(msg)
  } catch (err) {
    console.error(err)
    amqpChannel.nack(msg, false, false)
    //                          ^ Don't requeue !
  }
})
// Consume and handle error
await amqpChannel.consume('error-queue', (msg) => {
  if (msg == null) return // consumer cancelled by the broker
  const headers = msg.properties.headers ?? {}
  const reason = headers['x-first-death-reason']
  // Here we can control the discard reasons
  // for which the message should be requeued
  // I'm only keeping 'rejected' (`nack`s) ones here
  if (reason !== 'rejected') {
    amqpChannel.ack(msg)
    return
  }
  // Get number of rejects from header
  const xDeaths = headers['x-death'] ?? []
  const rejectsCount = (
    xDeaths.find(d => d.reason === 'rejected')?.count ?? -1
  )
  // Not a rejection
  if (rejectsCount <= 0) {
    amqpChannel.ack(msg)
    return
  }
  // Our requeue delays curve (in milliseconds)
  const REQUEUE_DELAY_DURATIONS = [
    500, // 0.5 secs
    5 * 1000, // 5 secs
    60 * 1000, // 1 min
    5 * 60 * 1000, // 5 mins
  ]
  // Find duration to wait
  // before next requeue
  const nextRequeueIndex = rejectsCount - 1
  const nextRequeueDuration = REQUEUE_DELAY_DURATIONS[nextRequeueIndex] ?? null
  // Msg reached max requeue amount
  if (nextRequeueDuration == null) {
    amqpChannel.ack(msg)
    return
  }
  // Send message to the wait-exchange
  amqpChannel.publish(
    'wait-exchange',
    '',
    msg.content,
    {
      ...msg.properties,
      headers: {
        ...headers,
        // Tell wait-exchange to wait for this amount:
        // the delayed exchange plugin reads the 'x-delay' header (ms)
        'x-delay': nextRequeueDuration
      }
    }
  )
  // Don't forget to ack it from error queue
  amqpChannel.ack(msg)
})
// Consume and handle requeue
await amqpChannel.consume('requeue-queue', (msg) => {
  if (msg == null) return // consumer cancelled by the broker
  const originalQueue = msg.properties.headers?.['x-first-death-queue']
  if (originalQueue == null) {
    amqpChannel.ack(msg)
    return
  }
  // Re-send the message to the queue
  // where it errored
  amqpChannel.sendToQueue(
    originalQueue,
    msg.content,
    msg.properties
  )
  // Don't forget to ack it from requeue queue
  amqpChannel.ack(msg)
})
Pros / cons
Pros
- Upon repeated failures, we requeue the messages with predefined and growing delays between retries, until a maximum is reached.
- Still semantic and conceptually simple
Cons
- It's using a community plugin: Delayed Exchange (this is largely mitigated by the fact that it is a pretty stable and battle-tested plugin)
Thanks for reading 💚
Appendix
Quorum queues
To mitigate the "infinite requeue" issue more simply, one can configure a delivery-limit on the queues (⚠️ using quorum queues only).
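As a rough sketch, the queue options for such a quorum queue could be built like this. The helper name `quorumQueueOptions` and the `QueueOptions` type are hypothetical; the `x-queue-type` and `x-delivery-limit` keys are the queue arguments RabbitMQ uses for this.

```typescript
// Subset of amqplib's assertQueue options used here (assumption).
interface QueueOptions {
  durable: boolean
  deadLetterExchange?: string
  arguments: Record<string, unknown>
}

// Builds options for a quorum queue that stops redelivering
// a message once it has been delivered `limit` times.
function quorumQueueOptions(limit: number, deadLetterExchange?: string): QueueOptions {
  if (Math.floor(limit) !== limit || limit < 1) {
    throw new RangeError('limit must be a positive integer')
  }
  return {
    durable: true, // quorum queues must be durable
    ...(deadLetterExchange ? { deadLetterExchange } : {}),
    arguments: {
      'x-queue-type': 'quorum',
      'x-delivery-limit': limit,
    },
  }
}
```

The result would be passed as the second argument of `amqpChannel.assertQueue('email-queue', ...)`. Messages that exceed the limit are dropped, or dead-lettered when a DLX is configured on the queue.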
Delay queues
While browsing the web, you'll land on a popular solution involving a wait-queue (instead of a delayed exchange) with a TTL set up on it, bound to another DLX.
I discourage this practice because it has several drawbacks.
- Firstly, in the likely case you want to grow the delay between each retry like we just did, you'd actually need multiple wait-queues. (see the caveats of per-message TTL, forcing us to use per-queue TTL)
- Second, you're adding a new DLX, which will update the x-death headers on the fly.
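To illustrate the first point, here is a sketch of the queue declarations this alternative would need: one wait queue per delay, each with its own per-queue TTL. The queue names and the `waitQueueSpecs` helper are hypothetical; `messageTtl` and `deadLetterExchange` are amqplib `assertQueue` options.

```typescript
interface WaitQueueSpec {
  name: string
  options: {
    messageTtl: number // per-queue TTL, in milliseconds
    deadLetterExchange: string // where expired messages are routed
  }
}

// One wait queue per delay. A single queue with per-message TTL would not
// behave as expected, because messages only expire once they reach
// the head of the queue.
function waitQueueSpecs(delaysMs: number[], requeueExchange: string): WaitQueueSpec[] {
  return delaysMs.map((delay, index) => ({
    name: `wait-queue-${index + 1}`, // hypothetical naming scheme
    options: { messageTtl: delay, deadLetterExchange: requeueExchange },
  }))
}
```

With the four delays used in Option C, this means declaring and routing to four extra queues, where the delayed exchange needs a single exchange.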