Exponential Backoff in Rabbitmq
Before you begin, please read why we need exponential backoff in event-driven systems.
Nodejs Code using AMQP
Message Class
Let's first wrap our message:
'use strict';
/**
 * Small wrapper that pairs a delivered message with the channel it arrived on,
 * so the message can be acknowledged or rejected without passing the channel
 * around separately.
 */
class Message {
  /**
   * @param {object} channel - Object exposing `ack(message)` and `nack(message)`.
   * @param {object} message - The raw delivered message (with `properties` and `content`).
   */
  constructor(channel, message) {
    this._channel = channel;
    this._message = message;
  }

  /** The raw payload of the wrapped message. */
  get content() {
    const { content } = this._message;
    return content;
  }

  /** The properties object of the wrapped message. */
  get properties() {
    const { properties } = this._message;
    return properties;
  }

  /**
   * Acknowledges the wrapped message on its channel.
   * @returns {Promise<void>}
   */
  async ack() {
    const raw = this._message;
    await this._channel.ack(raw);
  }

  /**
   * Negatively acknowledges the wrapped message on its channel.
   * @returns {Promise<void>}
   */
  async nack() {
    const raw = this._message;
    await this._channel.nack(raw);
  }
}
module.exports = Message;

Rabbitmq Controller Class
Let's define our controller class, which will create connections and manage queues.
'use strict';
const amqp = require('amqp-connection-manager');
const Message = require('./message');
// Names of the custom message headers that the retry logic looks up in
// `message.properties.headers` to recover the state of previous attempts.

// Header holding the delay value recorded for the previous retry.
const QUEUE_DELAY = 'x-gyanblog-delay';
// Header holding the running total of delays recorded so far.
const QUEUE_TOTAL_DELAY = 'x-gyanblog-total-delay';
// Header holding the number of retries recorded so far.
// NOTE(review): the value is spelled "retires" (presumably meant "retries").
// It is a runtime header name, so confirm nothing depends on it before renaming.
const QUEUE_TOTAL_RETRIES = 'x-gyanblog-total-retires';
/**
 * Picks a pseudo-random integer in the closed interval [min, max].
 *
 * @param {number} min - Lower bound (inclusive).
 * @param {number} max - Upper bound (inclusive).
 * @returns {number} An integer between min and max, both included.
 */
function randomIntFromInterval(min, max) {
  // +1 makes the upper bound reachable once the value is floored.
  const span = max - min + 1;
  const scaled = Math.random() * span;
  return Math.floor(scaled + min);
}
/**
 * Connects to RabbitMQ, declares an input exchange/queue plus a retry
 * exchange/queue, consumes from the input queue and re-publishes failed
 * messages to the retry queue with an exponentially growing expiration.
 *
 * The retry queue is declared with the input exchange as its dead-letter
 * exchange, so an expired retry message is routed back to the input queue.
 * Retry state (last delay, total delay, retry count) travels with the message
 * in the headers named by QUEUE_DELAY, QUEUE_TOTAL_DELAY and
 * QUEUE_TOTAL_RETRIES.
 */
class Controller {
  /**
   * Stores the configuration and opens the connection.
   *
   * @param {object} config
   * @param {string} config.queueName - Base name of the queue (required).
   * @param {object} config.retry - Backoff settings: `initialWait`, `factor`,
   *   `randomizeBy`, `maximumWait`, `maxRetries`, `maximumWaitCeil`
   *   (`-1` disables the three limits).
   * @param {number} config.prefetch - Value passed to `channel.prefetch`.
   * @param {boolean} config.retryOnException - When true, failed messages are
   *   sent to the retry queue; otherwise they are nack'ed.
   * @param {string} config.connection_string - AMQP URL.
   * @returns {Promise<void>} Settles once the channel wrapper has been created.
   */
  init(config) {
    this.config = config;
    this._queueName = config.queueName;
    this._retrySettings = config.retry;
    this._prefetch = config.prefetch;
    this._retryOnException = config.retryOnException;
    this._connection_string = config.connection_string;
    return this.connect();
  }

  /**
   * Derives the queue/exchange names, connects and creates the channel wrapper.
   *
   * @returns {Promise<void>} Rejects with an Error when no queue name is set.
   */
  async connect() {
    if (!this._queueName) {
      throw new Error('No queue name was specified');
    }

    this._queueName = `gyanblog_${this._queueName}`;
    this.retryQueueName = `gyanblog_${this._queueName}_retry`;
    this.inputExchangeName = `snitch_exchange_${this._queueName}`;
    // Must be derived from `_queueName`; `this.queueName` does not exist and
    // would give every controller the same "..._undefined_retry" exchange.
    this.retryExchangeName = `snitch_exchange_${this._queueName}_retry`;

    const connection = await amqp.connect(this._connection_string, {
      connectionOptions: {frameMax: 0x10000},
      heartbeatIntervalInSeconds: 60
    });
    console.log('info', 'Connected to rabbitmq');

    // `json: true` makes the wrapper serialise published payloads, which is
    // why objects (not buffers) are handed to publish() below.
    const channel = connection.createChannel({
      json: true,
      setup: (rawChannel) => this._setupChannel(rawChannel)
    });
    this._channel = channel;
    channel.on('disconnect', (err) => {
      console.log('warn', err);
    });
  }

  /**
   * Declares exchanges and queues on a channel and starts consuming.
   *
   * @param {object} channel - Channel handed to the `setup` callback.
   * @returns {Promise<void>}
   */
  async _setupChannel(channel) {
    console.log('info', 'input channel created, creating exchange', this.inputExchangeName);
    await channel.assertExchange(this.inputExchangeName, 'direct', {durable: true});

    console.log('info', 'Lets create the input queue', this._queueName);
    // NOTE(review): `persistent` is not a documented assertQueue option; kept
    // as in the original, presumably ignored by the client. Confirm.
    await channel.assertQueue(this._queueName, {
      durable: true,
      persistent: true,
      maxLength: 1000000
    });

    console.log('info', 'Bind queue and exchange', this._queueName, this.inputExchangeName);
    await channel.bindQueue(this._queueName, this.inputExchangeName, '');

    console.log('info', 'Create retry exchange', this.retryExchangeName);
    await channel.assertExchange(this.retryExchangeName, 'direct', {durable: true});

    console.log('info', 'Lets create the retry queue with dead letter delivery to', this.retryQueueName, this.inputExchangeName);
    // Nothing consumes the retry queue: messages sit there until their
    // expiration elapses and are then dead-lettered to the input exchange.
    await channel.assertQueue(this.retryQueueName, {
      durable: true,
      persistent: true,
      deadLetterExchange: this.inputExchangeName,
      maxLength: 1000000
    });

    console.log('info', 'Bind', this.retryQueueName, this.retryExchangeName, '');
    await channel.bindQueue(this.retryQueueName, this.retryExchangeName, '');

    console.log('info', `Starting queue consumer for queue: ${this._queueName}`);
    await channel.prefetch(this._prefetch);
    console.log('info', 'wait for the input to arrive on', this._queueName);
    await channel.consume(this._queueName, this._consumeMessage.bind(this), {noAck: false});
  }

  /**
   * Publishes a payload to the input exchange, waiting for a `drain` event
   * when the publish reports that the write buffer is full.
   *
   * @param {*} message - JSON-serialisable payload.
   * @returns {Promise<void>}
   */
  async _publishToExchange(message) {
    const accepted = await this._channel.publish(this.inputExchangeName, '', message, {persistent: true});
    if (accepted) {
      return;
    }
    console.log('warn', 'cannot publish message; channel\'s buffer is full');
    // NOTE(review): `_channel._channel` reaches into the wrapper's underlying
    // channel, which looks like a private field. Confirm it is safe to use.
    await new Promise((resolve) => {
      this._channel._channel.once('drain', () => {
        console.log('debug', 'Channel was successfully drained, so we can accept more data.');
        resolve();
      });
    });
  }

  /**
   * Public entry point for publishing a payload to the input exchange.
   *
   * @param {*} message - JSON-serialisable payload.
   * @returns {Promise<void>}
   */
  async publishMessage(message) {
    await this._publishToExchange(message);
  }

  /**
   * Consumer callback: parses the message, then acks on success. On failure
   * the message goes to the retry queue (when `retryOnException` is set) or is
   * nack'ed.
   *
   * @param {object|null} message - Delivered message.
   * @returns {Promise<void>}
   */
  async _consumeMessage(message) {
    if (message === null) {
      // amqplib invokes the callback with null when the broker cancels the consumer.
      console.log('warn', 'Consumer callback invoked without a message');
      return;
    }

    const wrapped = new Message(this._channel, message);
    console.log('Message received');
    try {
      const messageContent = JSON.parse(message.content);
      if (messageContent.op === 'error') {
        throw new Error('My error');
      }
      console.log(messageContent);
    } catch (err) {
      console.log('error', err);
      if (this._retryOnException) {
        return this._pushToDelayedQueue(message);
      }
      return wrapped.nack();
    }
    // Returned so an ack failure is not left as a floating promise.
    return wrapped.ack();
  }

  /**
   * Re-publishes a failed message to the retry exchange with the next backoff
   * delay and then acks the original delivery. When the retry limits are
   * exceeded, or the content cannot be parsed, the message is acked and dropped.
   *
   * @param {object} message - The delivered message that failed processing.
   * @returns {Promise<void>|undefined}
   */
  _pushToDelayedQueue(message) {
    // `== null` also covers a channel that was never assigned (undefined).
    if (this._channel == null) {
      return;
    }

    // The channel publishes JSON, so the content must be parseable. A message
    // that is not valid JSON can never be retried successfully; drop it
    // instead of throwing and leaving it unacked.
    let payload;
    try {
      payload = JSON.parse(message.content.toString());
    } catch (parseError) {
      console.log('warn', 'Dropping message whose content is not valid JSON', parseError);
      return this._channel.ack(message);
    }

    const config = this._retrySettings;
    // Headers may be absent on messages that were never retried.
    const headers = message.properties.headers || {};
    let expiration = headers[QUEUE_DELAY] || 0;
    const totalDelay = headers[QUEUE_TOTAL_DELAY] || 0;
    const totalRetries = headers[QUEUE_TOTAL_RETRIES] || 0;

    if (expiration === 0) {
      // First failure: start the backoff sequence.
      expiration = config.initialWait;
    } else {
      const waitedTooLong = config.maximumWait !== -1 && totalDelay > config.maximumWait;
      const retriedTooOften = config.maxRetries !== -1 && totalRetries >= config.maxRetries;
      if (waitedTooLong || retriedTooOften) {
        console.log('warn', 'Rejecting task as it has between retried too beyond maximum attempts', payload && payload.id);
        return this._channel.ack(message);
      }
      expiration = Math.floor(expiration * config.factor);
    }

    if (config.maximumWaitCeil !== -1 && expiration > config.maximumWaitCeil) {
      expiration = config.maximumWaitCeil;
    }
    // Accumulate after capping so the total keeps growing and `maximumWait`
    // can eventually trigger even when every delay hits the ceiling.
    const nextTotalDelay = totalDelay + expiration;

    // Jitter is added to the actual expiration only, not to the stored delay,
    // so the exponential sequence itself stays deterministic.
    const jitter = config.randomizeBy > 0 ? randomIntFromInterval(0, config.randomizeBy) : 0;

    console.log('expiration', expiration);
    return this._channel.publish(this.retryExchangeName, '', payload, {
      expiration: expiration + jitter,
      headers: {
        // Computed keys: these must be the same header names that are read above.
        [QUEUE_DELAY]: expiration,
        [QUEUE_TOTAL_DELAY]: nextTotalDelay,
        [QUEUE_TOTAL_RETRIES]: totalRetries + 1
      }
    })
      .then(() => this._channel.ack(message));
  }
}
module.exports = Controller;

Main Runner Test Code
const RabbiqmqController = require('./rabbitmq_controller');
const config = {
connection_string: 'amqp://guest:guest@localhost:5672',
queueName: "test",
retry: {
factor: 1.2,
initialWait: 5000,
maximumWait: -1,
randomizeBy: 2000,
maxRetries: -1,
maximumWaitCeil: -1
},
prefetch: 1,
retryOnException: true
};
rabbitmqController = new RabbiqmqController();
rabbitmqController.init(config)
.then(() => {
})
.catch((error) => {
console.error(error);
});The code is self explanatory. We are just creating exchange/queues and retry queues. And, starting the consumer by default in this code. You might want to do something else.
Bonus
Docker-compose file for starting rabbitmq container:
version: '3.3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    hostname: "mynode"
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      RABBITMQ_DEFAULT_USER: "guest"
      RABBITMQ_DEFAULT_PASS: "guest"
      RABBITMQ_NODENAME: "mynode"
    volumes:
      - ./data:/var/lib/rabbitmq

Thanks for reading.












