How to Download multiple Youtube Videos using Nodejs and Show a Progress Bar
Introduction I was trying to download some youtube videos for my kids. As I have…
August 23, 2022
Before continuing, please read the earlier post on why we need exponential backoff in event-driven systems.
Let's first wrap our message in a small helper class:
'use strict';
/**
 * Pairs a received message with the channel it was delivered on, so the
 * message can be acknowledged or rejected without passing the channel around.
 */
class Message {
  /**
   * @param {object} channel - Object exposing `ack(message)` and `nack(message)`.
   * @param {object} message - The delivered message; its `properties` and
   *   `content` fields are exposed through the getters below.
   */
  constructor(channel, message) {
    Object.assign(this, { _channel: channel, _message: message });
  }

  /**
   * Acknowledges the wrapped message on its channel.
   * @returns {Promise<void>}
   */
  async ack() {
    const { _channel: channel, _message: delivery } = this;
    await channel.ack(delivery);
  }

  /**
   * Negatively acknowledges the wrapped message on its channel.
   * @returns {Promise<void>}
   */
  async nack() {
    const { _channel: channel, _message: delivery } = this;
    await channel.nack(delivery);
  }

  /** The `properties` field of the wrapped message. */
  get properties() {
    const { properties } = this._message;
    return properties;
  }

  /** The `content` field of the wrapped message. */
  get content() {
    const { content } = this._message;
    return content;
  }
}
module.exports = Message;
Next, let's define our controller class, which creates the connection and manages the queues and exchanges.
'use strict';
const amqp = require('amqp-connection-manager');
const Message = require('./message');
// Names of the message headers the Controller looks up in
// `message.properties.headers` to recover a message's retry state.

// Delay used for the previous retry (presumably milliseconds, since the same
// value is used as the message `expiration` — confirm).
const QUEUE_DELAY = 'x-gyanblog-delay';
// Sum of the delays the message has accumulated so far.
const QUEUE_TOTAL_DELAY = 'x-gyanblog-total-delay';
// Number of times the message has been retried.
// NOTE(review): 'retires' looks like a typo for 'retries'; the value is a
// header name on the wire, so confirm nothing depends on it before renaming.
const QUEUE_TOTAL_RETRIES = 'x-gyanblog-total-retires';
/**
 * Returns a pseudo-random integer between `min` and `max`, both inclusive
 * (for integer bounds).
 *
 * @param {number} min - Lower bound.
 * @param {number} max - Upper bound.
 * @returns {number} Floor of a uniformly scaled `Math.random()` sample.
 */
function randomIntFromInterval(min, max) {
  // +1 so that `max` itself can be produced after flooring.
  const width = max - min + 1;
  const scaled = Math.random() * width + min;
  return Math.floor(scaled);
}
/**
 * Owns the RabbitMQ connection/channel for one work queue and implements
 * retry-with-exponential-backoff on top of a second "retry" queue:
 *
 *   input exchange -> input queue -> consumer
 *                         ^              | (processing failed)
 *                         |              v
 *        dead-letter on expiry <- retry queue <- retry exchange
 *
 * Failed messages are re-published to the retry exchange with a per-message
 * `expiration`; the retry queue is declared with `deadLetterExchange` set to
 * the input exchange. Retry state travels in the message headers named by
 * QUEUE_DELAY, QUEUE_TOTAL_DELAY and QUEUE_TOTAL_RETRIES.
 */
class Controller {
  /**
   * Stores the configuration and opens the connection.
   *
   * @param {object} config
   * @param {string} config.queueName - Base name used to derive queue/exchange names.
   * @param {object} config.retry - Backoff settings: `initialWait`, `factor`,
   *   `randomizeBy`, `maximumWait`, `maxRetries`, `maximumWaitCeil`
   *   (`-1` disables the three limits).
   * @param {number} config.prefetch - Value passed to `channel.prefetch`.
   * @param {boolean} config.retryOnException - When true, failed messages are
   *   re-queued with backoff; otherwise they are nack'ed.
   * @param {string} config.connection_string - AMQP URL.
   * @returns {Promise<void>} Settles when `connect()` settles.
   */
  init(config) {
    this.config = config;
    this._queueName = config.queueName;
    this._retrySettings = config.retry;
    this._prefetch = config.prefetch;
    this._retryOnException = config.retryOnException;
    this._connection_string = config.connection_string;
    return this.connect();
  }

  /**
   * Derives the queue/exchange names, creates the connection manager and the
   * channel wrapper, and registers the topology setup callback.
   *
   * @returns {Promise<void>} Rejects with an Error when no queue name was configured.
   */
  async connect() {
    if (!this._queueName) {
      throw new Error('No queue name was specified');
    }

    this._queueName = `gyanblog_${this._queueName}`;
    // NOTE(review): `_queueName` is already prefixed at this point, so the
    // retry queue ends up as `gyanblog_gyanblog_<name>_retry`. Kept as-is
    // because changing it would rename an existing durable queue.
    this.retryQueueName = `gyanblog_${this._queueName}_retry`;
    this.inputExchangeName = `snitch_exchange_${this._queueName}`;
    // Fixed: this used `this.queueName` (undefined), which produced the
    // exchange name `snitch_exchange_undefined_retry` for every queue.
    this.retryExchangeName = `snitch_exchange_${this._queueName}_retry`;

    const connection = await amqp.connect(this._connection_string, {
      connectionOptions: {frameMax: 0x10000},
      heartbeatIntervalInSeconds: 60
    });
    console.log('info', 'Connected to rabbitmq');

    // 'disconnect' is documented as an event of the connection manager, not
    // of the channel wrapper, so the listener is attached here.
    connection.on('disconnect', (err) => {
      console.log('warn', err);
    });

    //Try and open a channel
    this._channel = await connection.createChannel({
      json: true,
      setup: (channel) => this._setupTopology(channel)
    });
  }

  /**
   * Channel `setup` callback: declares both exchanges and both queues, binds
   * them, and starts consuming from the input queue. Steps run strictly in
   * order because each depends on the previous declaration.
   *
   * @param {object} channel - The channel handed to `setup` by the channel wrapper.
   * @returns {Promise<void>}
   */
  async _setupTopology(channel) {
    console.log('info', 'input channel created, creating exchange', this.inputExchangeName);
    await channel.assertExchange(this.inputExchangeName, 'direct', {durable: true});

    console.log('info', 'Lets create the input queue', this._queueName);
    await channel.assertQueue(this._queueName, {
      durable: true,
      persistent: true,
      maxLength: 1000000
    });

    console.log('info', 'Bind queue and exchange', this._queueName, this.inputExchangeName);
    //Bind our queue to the earlier created exchange, the exchange will receive message and put them in the queue for us
    await channel.bindQueue(this._queueName, this.inputExchangeName, '');

    console.log('info', 'Create retry exchange', this.retryExchangeName);
    //Lets create a dead letter and exponential backoff exchange
    await channel.assertExchange(this.retryExchangeName, 'direct', {
      durable: true
    });

    console.log('info', 'Lets create the retry queue with dead letter delivery to', this.retryQueueName, this.inputExchangeName);
    //Create another queue with a dead-letter delivery to our main exchange. Messages are put here with an expiry;
    // once they expire they are dead-lettered and retried from the main queue
    await channel.assertQueue(this.retryQueueName, {
      durable: true,
      persistent: true,
      deadLetterExchange: this.inputExchangeName,
      maxLength: 1000000
    });

    console.log('info', 'Bind', this.retryQueueName, this.retryExchangeName, '');
    await channel.bindQueue(this.retryQueueName, this.retryExchangeName, '');

    console.log('info', `Starting queue consumer for queue: ${this._queueName}`);
    // Awaited so the prefetch limit is in place before the consumer starts.
    await channel.prefetch(this._prefetch);
    console.log('info', 'wait for the input to arrive on', this._queueName);
    await channel.consume(this._queueName, (message) => this._consumeMessage(message), {noAck: false});
  }

  /**
   * Publishes `message` to the input exchange as a persistent message. When
   * the publish call reports a falsy result, waits for the next 'drain'
   * event before resolving.
   *
   * @param {*} message - Payload; the channel is created with `json: true`.
   * @returns {Promise<void>}
   */
  async _publishToExchange(message) {
    const accepted = await this._channel.publish(this.inputExchangeName, '', message, {persistent: true});
    if (accepted) {
      return;
    }

    console.log('warn', 'cannot publish message; channel\'s buffer is full');
    // NOTE(review): `_channel._channel` is a private field of the wrapper
    // (presumably the underlying amqplib channel) — confirm it is still
    // available in the installed amqp-connection-manager version.
    const underlying = this._channel._channel;
    if (!underlying) {
      // No underlying channel (e.g. currently disconnected): nothing to wait on.
      return;
    }
    //Lets wait for drain event before letting the caller continue
    await new Promise((resolve) => {
      underlying.once('drain', () => {
        console.log('debug', 'Channel was successfully drained, so we can accept more data.');
        resolve();
      });
    });
  }

  /**
   * Public entry point for publishing a message to the input exchange.
   *
   * @param {*} message - Payload to publish.
   * @returns {Promise<void>}
   */
  async publishMessage(message) {
    await this._publishToExchange(message);
  }

  /**
   * Consumer callback. Parses the content as JSON and treats a payload with
   * `op === 'error'` (or unparseable content) as a processing failure.
   * Success -> ack. Failure -> delayed retry when `retryOnException` is set,
   * otherwise nack.
   *
   * @param {object} message - Delivered message (`content`, `properties`).
   * @returns {Promise<void>} Settles once the message has been ack'ed,
   *   nack'ed or handed to the retry queue.
   */
  async _consumeMessage(message) {
    const wrapped = new Message(this._channel, message);
    console.log('Message received');

    try {
      const messageContent = JSON.parse(message.content);
      if (messageContent.op === 'error') {
        throw new Error('My error');
      }
      console.log(messageContent);
    } catch (err) {
      console.log('error', err);
      if (this._retryOnException) {
        return this._pushToDelayedQueue(message);
      }
      return wrapped.nack();
    }

    // Outside the try block on purpose: a failing ack must not send an
    // already-processed message into the retry queue. The promise is
    // returned so that a failing ack is not a floating rejection.
    return wrapped.ack();
  }

  /**
   * Re-publishes a failed message to the retry exchange with the next
   * backoff delay, then acks the original delivery. Messages that exceeded
   * `maximumWait`/`maxRetries`, or whose content is not valid JSON, are
   * acked and dropped.
   *
   * Delay progression: `initialWait`, then `floor(previous * factor)`, capped
   * at `maximumWaitCeil` (unless -1). A random jitter in `[0, randomizeBy]`
   * is added to the message expiration only, not to the stored delay.
   *
   * @param {object} message - The failed delivery.
   * @returns {Promise<void>}
   */
  _pushToDelayedQueue(message) {
    if (!this._channel) {
      return Promise.resolve();
    }

    const config = this._retrySettings;

    // The payload must be re-published as an object (the channel is created
    // with `json: true`). Content that cannot be parsed can never succeed on
    // retry, and throwing here would leave the delivery un-acked forever.
    let payload;
    try {
      payload = JSON.parse(message.content.toString());
    } catch (error) {
      console.log('warn', 'Dropping message whose content is not valid JSON', error.message);
      return Promise.resolve(this._channel.ack(message));
    }

    //Sometimes these headers are missing and need to be checked if they exist or not.
    const headers = (message.properties && message.properties.headers) || {};
    let expiration = headers[QUEUE_DELAY] || 0;
    const totalDelay = headers[QUEUE_TOTAL_DELAY] || 0;
    const totalRetries = headers[QUEUE_TOTAL_RETRIES] || 0;

    if (expiration === 0) {
      // First failure of this message.
      expiration = config.initialWait;
    } else {
      const waitedTooLong = config.maximumWait !== -1 && totalDelay > config.maximumWait;
      const retriedTooOften = config.maxRetries !== -1 && totalRetries >= config.maxRetries;
      if (waitedTooLong || retriedTooOften) {
        console.log('warn', 'Rejecting task as it has between retried too beyond maximum attempts', payload && payload.id);
        return Promise.resolve(this._channel.ack(message));
      }
      expiration = Math.floor(expiration * config.factor);
    }

    if (config.maximumWaitCeil !== -1 && expiration > config.maximumWaitCeil) {
      expiration = config.maximumWaitCeil;
    }
    // Accumulate AFTER capping. The total used to be reset to the ceiling
    // whenever the cap applied, so `maximumWait` could never be reached.
    const nextTotalDelay = totalDelay + expiration;

    const jitter = config.randomizeBy > 0 ? randomIntFromInterval(0, config.randomizeBy) : 0;

    console.log('expiration', expiration);
    //We will not nack this but instead ack it and put it in the retry later queue
    return this._channel.publish(this.retryExchangeName, '', payload, {
      expiration: expiration + jitter,
      // The retry queue is durable; keep the messages waiting in it durable too.
      persistent: true,
      headers: {
        // Computed keys: the literal keys `QUEUE_DELAY` etc. were written
        // before, so the state read back above was always missing.
        [QUEUE_DELAY]: expiration,
        [QUEUE_TOTAL_DELAY]: nextTotalDelay,
        [QUEUE_TOTAL_RETRIES]: totalRetries + 1
      }
    })
      .then(() => this._channel.ack(message));
  }
}
module.exports = Controller;
// Example entry point: builds a Controller for the "test" queue and starts it.
const RabbitmqController = require('./rabbitmq_controller');

const config = {
  connection_string: 'amqp://guest:guest@localhost:5672',
  queueName: "test",
  // Backoff settings consumed by the controller; -1 disables a limit.
  retry: {
    factor: 1.2,          // each retry waits 1.2x the previous delay
    initialWait: 5000,    // delay before the first retry
    maximumWait: -1,      // no cap on the accumulated delay
    randomizeBy: 2000,    // upper bound of the random jitter added per retry
    maxRetries: -1,       // retry forever
    maximumWaitCeil: -1   // no cap on a single delay
  },
  prefetch: 1,
  retryOnException: true
};

// Declared with `const`: the bare assignment created an implicit global,
// which is a ReferenceError under 'use strict'.
const rabbitmqController = new RabbitmqController();
rabbitmqController.init(config)
  .then(() => {
    console.log('info', 'RabbitMQ controller initialised');
  })
  .catch((error) => {
    console.error(error);
  });
The code is largely self-explanatory: it creates the input exchange and queue, creates the retry exchange and queue, and then starts the consumer by default. You may want to change that last step to suit your own use case.
Docker-compose file for starting rabbitmq container:
# Local RabbitMQ broker with the management UI enabled.
version: '3.3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    # Fixed hostname/node name so the data directory stays valid across restarts.
    hostname: "mynode"
    ports:
      - "15672:15672"   # management UI
      - "5672:5672"     # AMQP
    environment:
      # Default development credentials; change them outside local testing.
      RABBITMQ_DEFAULT_USER: "guest"
      RABBITMQ_DEFAULT_PASS: "guest"
      RABBITMQ_NODENAME: "mynode"
    volumes:
      - ./data:/var/lib/rabbitmq
Thanks for reading.
Introduction I was trying to download some youtube videos for my kids. As I have…
Understanding Simple Message Workflow First, lets understand a simple workflow…
What is a Singleton Pattern Following constraints are applied: Where we can…
MongoDB CRUD Operations Mongoose provides a simple schema based solution to…
Introduction Npm has a tool called: npm audit which reports if your packages or…
This library is ES6, promise compatible. Or, in your package.json file, include…
Introduction In this post we will see following: How to schedule a job on cron…
Introduction There are some cases, where I need another git repository while…
Introduction In this post, we will see how to fetch multiple credentials and…
Introduction I have an automation script, that I want to run on different…
Introduction I had to write a CICD system for one of our project. I had to…
Introduction Java log4j has many ways to initialize and append the desired…