software-design|August 23, 2022|4 min read

How to Implement Exponential Backoff in Rabbitmq Using AMQP in Node.js

TL;DR

Use per-retry dead-letter exchanges with increasing TTLs to implement exponential backoff in RabbitMQ. Each retry publishes to a delay queue that routes back to the main queue after the TTL expires.

How to Implement Exponential Backoff in Rabbitmq Using AMQP in Node.js

Exponential Backoff in Rabbitmq

Please make sure to read first, why we need the Exponential Backoff in event driven systems.

Nodejs Code using AMQP

Message Class

Lets first wrap our message,

'use strict';

class Message {
    constructor(channel, message) {
        this._channel = channel;
        this._message = message;
    }

    async ack() {
        await this._channel.ack(this._message);
    }

    async nack() {
        await this._channel.nack(this._message);
    }

    get properties() {
        return this._message.properties;
    }

    get content() {
        return this._message.content;
    }
}

module.exports = Message;

Rabbitmq Controller Class

Lets define our controller class, which will create connections and manage queues.

'use strict';

const amqp = require('amqp-connection-manager');
const Message = require('./message');

const QUEUE_DELAY = 'x-gyanblog-delay';
const QUEUE_TOTAL_DELAY = 'x-gyanblog-total-delay';
const QUEUE_TOTAL_RETRIES = 'x-gyanblog-total-retires';

function randomIntFromInterval(min, max) {
    return Math.floor(Math.random() * (max - min + 1) + min);
}

class Controller {
    init(config) {
        this.config = config;
        
        this._queueName = config.queueName;
        this._retrySettings = config.retry;
        this._prefetch = config.prefetch;
        this._retryOnException = config.retryOnException;

        this._connection_string = config.connection_string;

        return this.connect();
    }

    connect() {
        if (!this._queueName) {
            return Promise.reject(new Error('No queue name was specified'));
        }
        this._queueName = `gyanblog_${this._queueName}`;
        this.retryQueueName = `gyanblog_${this._queueName}_retry`;
        this.inputExchangeName = `snitch_exchange_${this._queueName}`;
        this.retryExchangeName = `snitch_exchange_${this.queueName}_retry`;
        return Promise.resolve()
            .then(() => {
                return amqp.connect(this._connection_string, {
                    connectionOptions: {frameMax: 0x10000}, heartbeatIntervalInSeconds: 60
                });
            })
            .then((conn) => {
                console.log('info', 'Connected to rabbitmq');
                //Try and open a channel
                return conn.createChannel({json: true,
                    setup: ((channel) => {
                        console.log('info', 'input channel created, creating exchange', this.inputExchangeName);
                        return channel.assertExchange(this.inputExchangeName, 'direct', {durable: true})
                            .then(async () => {
                                console.log('info', 'Lets create the input queue', this._queueName);
                                await channel.assertQueue(this._queueName, {
                                    durable: true,
                                    persistent: true,
                                    maxLength: 1000000
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Bind queue and exchange', this._queueName, this.inputExchangeName);
                                //Bind our queue to the earlier created exchange, the exchange will receive message and put them in the queue for us
                                await channel.bindQueue(this._queueName, this.inputExchangeName, '');
                            })
                            .then(async () => {
                                console.log('info', 'Create retry exchange', this.retryExchangeName);
                                //Lets create a dead letter and exponential backoff exchange
                                await channel.assertExchange(this.retryExchangeName, 'direct', {
                                    durable: true
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Lets create the retry queue with dead letter delivery to', this.retryQueueName, this.inputExchangeName);
                                //Create a another queue with an dead-letter delivery to our main exchange. We will expect messages to be put here with an expiry
                                // and will expire and then retired later from main queue
                                await channel.assertQueue(this.retryQueueName, {
                                    durable: true,
                                    persistent: true,
                                    deadLetterExchange: this.inputExchangeName,
                                    maxLength: 1000000
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Bind', this.retryQueueName, this.retryExchangeName, '');
                                await channel.bindQueue(this.retryQueueName, this.retryExchangeName, '');
                            })
                            .then(async () => {
                                console.log('info', `Starting queue consumer for queue: ${this._queueName}`);
                                channel.prefetch(this._prefetch);
                                
                                console.log('info', 'wait for the input to arrive on', this._queueName);
                                await channel.consume(this._queueName, this._consumeMessage.bind(this), {noAck: false});
                            });
                    })
                });
            })
            .then((channel) => {
                this._channel = channel;
                channel.on('disconnect', (err) => {
                    console.log('warn', err);
                });
            });
    }

    async _publishToExchange(message) {
        await this._channel.publish(this.inputExchangeName, '', message, {persistent: true})
            .then((response) => {
                if (!response) {
                    console.log('warn', 'cannot publish message; channel\'s buffer is full');
                    //Lets wait for drain event and send this message back into the queue
                    return new Promise((resolve) => {
                        this._channel._channel.once('drain', () => {
                            console.log('debug', 'Channel was successfully drained, so we can accept more data.');
                            resolve();
                        });
                    });
                }
                return response;
            });
    }

    async publishMessage(message) {
        await this._publishToExchange(message);
    }

    _consumeMessage(message) {
        let _message = new Message(this._channel, message);
        
        console.log('Message received');
        return new Promise((resolve, reject) => {
            try {
                const messageContent = JSON.parse(message.content);
                if (messageContent.op == 'error') {
                    return reject(new Error('My error'));
                }
                console.log(messageContent);
            } catch(error) {
                reject(error);
            }
            resolve();
        })
        .then(() => {
            _message.ack()
        })
        .catch((err) => {
            console.log('error', err);
            if (this._retryOnException) {
                return this._pushToDelayedQueue(message);
            }
            return _message.nack();
        });
    }

    _pushToDelayedQueue(message) {
        if (this._channel === null) {
            return;
        }
        //We will not nack this but instead it ack and put it in the retry later queue
        let expiration = 0;
        let totalDelay = 0;
        let totalRetries = 0;

        let config = this._retrySettings;

        //Sometimes these headers are missing and needs to be checked if they exists or not.
        if (message.properties.headers !== undefined) {
            expiration = message.properties.headers[QUEUE_DELAY] || expiration;
            totalDelay = message.properties.headers[QUEUE_TOTAL_DELAY] || totalDelay;
            totalRetries = message.properties.headers[QUEUE_TOTAL_RETRIES] || totalRetries;
        }
        if (expiration === 0) {
            expiration = config.initialWait;
        } else {
            if ((config.maximumWait !== -1 && totalDelay > config.maximumWait) ||
                (config.maxRetries !== -1 && totalRetries >= config.maxRetries)) {
                let object = JSON.parse(message.content.toString());
                console.log('warn', 'Rejecting task as it has between retried too beyond maximum attempts', object.id);
                return this._channel.ack(message);
            }
            expiration *= config.factor;
            expiration = Math.floor(expiration);
        }

        let randomizedValue = 0;
        if (config.randomizeBy > 0) {
            randomizedValue = randomIntFromInterval(0, config.randomizeBy);
        }

        let nextTotalDelay = totalDelay + expiration;
        if (config.maximumWaitCeil !== -1 && expiration > config.maximumWaitCeil) {
            expiration = config.maximumWaitCeil;
            nextTotalDelay = config.maximumWaitCeil;
        }

        console.log('expiration', expiration);
        //Lets send it to the retry exchange
        return this._channel.publish(this.retryExchangeName, '', JSON.parse(message.content), {
            expiration: expiration + randomizedValue,
            headers: {
                QUEUE_DELAY: expiration,
                QUEUE_TOTAL_DELAY: nextTotalDelay,
                QUEUE_TOTAL_RETRIES: totalRetries + 1
            }
        })
            .then(() => this._channel.ack(message));
    }
}

module.exports = Controller;

Main Runner Test Code

const RabbiqmqController = require('./rabbitmq_controller');

const config = {
    connection_string: 'amqp://guest:guest@localhost:5672',
    queueName: "test",
    retry: {
        factor: 1.2,
        initialWait: 5000,
        maximumWait: -1,
        randomizeBy: 2000,
        maxRetries: -1,
        maximumWaitCeil: -1
    },
    prefetch: 1,
    retryOnException: true
};

rabbitmqController = new RabbiqmqController();
rabbitmqController.init(config)
    .then(() => {

    })
    .catch((error) => {
        console.error(error);
    });

The code is self explanatory. We are just creating exchange/queues and retry queues. And, starting the consumer by default in this code. You might want to do something else.

Bonus

Docker-compose file for starting rabbitmq container:

version: '3.3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    hostname: "mynode"
    ports:
       - "15672:15672"
       - "5672:5672"
    environment:
       RABBITMQ_DEFAULT_USER: "guest"
       RABBITMQ_DEFAULT_PASS: "guest"
       RABBITMQ_NODENAME: "mynode"
    volumes:
      - ./data:/var/lib/rabbitmq

Thanks for reading.

Related Posts

Why Exponential Backoff in Rabbitmq or In Event-Driven Systems

Why Exponential Backoff in Rabbitmq or In Event-Driven Systems

Understanding Simple Message Workflow First, lets understand a simple workflow…

Nodejs - Json object schema validation with Joi

Nodejs - Json object schema validation with Joi

Introduction In this post, I will show how to validate your json schema…

Mongoose - Using CRUD operations in mongodb in nodejs

Mongoose - Using CRUD operations in mongodb in nodejs

MongoDB CRUD Operations Mongoose provides a simple schema based solution to…

How to check whether a website link has your URL backlink or not - NodeJs implementation

How to check whether a website link has your URL backlink or not - NodeJs implementation

Introduction I got my seo backlink work done from a freelancer. It was like 300…

How to connect to mysql from nodejs, with ES6 promise

How to connect to mysql from nodejs, with ES6 promise

Introduction I had to develop a small automation to query some old mysql data…

Deep Dive on Elasticsearch: A System Design Interview Perspective

Deep Dive on Elasticsearch: A System Design Interview Perspective

“If you’re searching, filtering, or aggregating over large volumes of semi…

Latest Posts

Deep Dive on Elasticsearch: A System Design Interview Perspective

Deep Dive on Elasticsearch: A System Design Interview Perspective

“If you’re searching, filtering, or aggregating over large volumes of semi…

Deep Dive on Apache Kafka: A System Design Interview Perspective

Deep Dive on Apache Kafka: A System Design Interview Perspective

“Kafka is not a message queue. It’s a distributed commit log that happens to be…

Deep Dive on Redis: Architecture, Data Structures, and Production Usage

Deep Dive on Redis: Architecture, Data Structures, and Production Usage

“Redis is not just a cache. It’s a data structure server that happens to be…

Deep Dive on API Gateway: A System Design Interview Perspective

Deep Dive on API Gateway: A System Design Interview Perspective

“An API Gateway is the front door to your microservices. Every request walks…

REST API Design: Pagination, Versioning, and Best Practices

REST API Design: Pagination, Versioning, and Best Practices

Every time two systems need to talk, someone has to design the contract between…

Efficient Data Modelling: A Practical Guide for Production Systems

Efficient Data Modelling: A Practical Guide for Production Systems

Most engineers learn data modelling backwards. They draw an ER diagram…