/**
 * Runs every task in a queue with bounded concurrency, logging failures
 * instead of letting a failing task abort the run.
 *
 * Tasks are taken from the front of `taskQueue` with `shift()`, so the caller's
 * array is emptied as work is started. Tasks pushed onto `taskQueue` while the
 * run is still in progress are executed as well: they are picked up either when
 * a running task settles or on the next monitoring tick, whichever comes first.
 *
 * @param {Array<Function>} taskQueue An array of functions representing tasks to be executed.
 *   Each function is called with no arguments and may return a promise.
 * @param {number} [concurrency=1] The maximum number of tasks to run concurrently.
 *   Values that are not a number >= 1 fall back to 1.
 * @param {number} [interval=1000] The monitoring interval in milliseconds. On every
 *   tick, idle slots are refilled from the queue.
 * @returns {Promise<void>} Resolves once the queue is empty and every started task
 *   has settled. Rejects with a TypeError if `taskQueue` is not an array; task
 *   failures never reject it, they are only logged.
 */
async function monitorTaskQueue(taskQueue, concurrency = 1, interval = 1000) {
  if (!Array.isArray(taskQueue)) {
    throw new TypeError("taskQueue must be an array of task functions");
  }

  // A limit of 0, a negative number or NaN would stall the queue forever, so
  // clamp to at least one slot. Infinity is preserved (unbounded concurrency).
  const maxConcurrent = Math.max(1, Math.floor(Number(concurrency)) || 1);

  // Promises of the tasks currently in flight; its size is the running count.
  const taskPromises = new Set();

  /**
   * Executes a single task and handles errors.
   * The returned promise never rejects: both synchronous throws and
   * rejections (including calling a non-function entry) are caught and logged.
   * @param {Function} task
   */
  async function executeTask(task) {
    try {
      await task();
      console.log("Task completed successfully.");
    } catch (error) {
      // Graceful handling: log the failure and let the rest of the queue run.
      console.error("Task failed:", error);
    }
  }

  /**
   * Starts queued tasks until the concurrency limit is reached or the queue is empty.
   */
  function startTasks() {
    while (taskPromises.size < maxConcurrent && taskQueue.length > 0) {
      const task = taskQueue.shift();
      // The tracked promise removes itself before it settles, so anything
      // awaiting it observes an already-freed slot.
      const taskPromise = executeTask(task).finally(() => {
        taskPromises.delete(taskPromise);
      });
      taskPromises.add(taskPromise);
    }
  }

  // Regular monitoring tick: fills idle slots with tasks that were appended
  // while the remaining in-flight tasks are still running.
  const monitorTimer = setInterval(startTasks, interval);

  try {
    while (taskQueue.length > 0 || taskPromises.size > 0) {
      startTasks();
      // Wake up as soon as any running task settles so its slot can be refilled.
      // The set is never empty here: either something was already running or
      // startTasks() just launched at least one task.
      await Promise.race(taskPromises);
    }
  } finally {
    // Always release the timer so it cannot keep the process alive.
    clearInterval(monitorTimer);
  }
}
// Add your comment