diff --git a/README.md b/README.md index 49a45f6..3ee94c1 100644 --- a/README.md +++ b/README.md @@ -33,10 +33,13 @@ const doReq = async (firstName, lastName) => { return firstName + ' ' + lastName; }; -const users = [{firstName: 'Irene', lastName: 'Pullman'}, {firstName: 'Sean', lastName: 'Parr'}]; +const users = [ + {firstName: 'Irene', lastName: 'Pullman'}, + {firstName: 'Sean', lastName: 'Parr'}, +]; //Queue with functions to be run -const queue = users.map(user => () => doReq(user.firstName, user.lastName)); +const queue = users.map((user) => () => doReq(user.firstName, user.lastName)); //Default Throttle runs with 5 promises parallel. const formattedNames = await Throttle.all(queue); @@ -86,6 +89,7 @@ The `progressCallback` and the `Raw` will return a `Result` object with the foll | Property | Type | Start value | Definition | | :--------------------- | :------ | :---------- | :------------------------------------------------------------------------------------------- | +| lastCompletedIndex | Integer | -1 | last index of a task that is completed (either fulfilled or rejected) | | amountDone | Integer | 0 | amount of tasks which are finished | | amountStarted | Integer | 0 | amount of tasks which started | | amountResolved | Integer | 0 | amount of tasks which successfully resolved | diff --git a/package.json b/package.json index 0bec1b6..231f7c8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "promise-parallel-throttle", - "version": "3.4.1", + "version": "3.5.0", "keywords": [ "promise", "async", diff --git a/src/throttle.ts b/src/throttle.ts index 6b94879..4e147d9 100644 --- a/src/throttle.ts +++ b/src/throttle.ts @@ -1,203 +1,207 @@ -const DEFAULT_MAX = 5; - -export interface Result { - amountDone: number; - amountStarted: number; - amountResolved: number; - amountRejected: number; - amountNextCheckFalsey: number; - rejectedIndexes: number[]; - resolvedIndexes: number[]; - nextCheckFalseyIndexes: number[]; - taskResults: T[]; -} - -export interface Options { - maxInProgress?: number; - failFast?: boolean; - progressCallback?: (result: Result) => void; - nextCheck?: nextTaskCheck; - ignoreIsFunctionCheck?: boolean; -} - -/** - * Default checker which validates if a next task should begin. - * This can be overwritten to write own checks for example checking the amount - * of used ram and waiting till the ram is low enough for a next task. - * - * It should always resolve with a boolean, either `true` to start a next task - * or `false` to stop executing a new task. - * - * If this method rejects, the error will propagate to the caller - * @param status - * @param tasks - * @returns {Promise} - */ -const defaultNextTaskCheck: nextTaskCheck = (status: Result, tasks: Tasks): Promise => { - return Promise.resolve(status.amountStarted < tasks.length); -}; - -const DEFAULT_OPTIONS = { - maxInProgress: DEFAULT_MAX, - failFast: false, - nextCheck: defaultNextTaskCheck, - ignoreIsFunctionCheck: false -}; - -export type Task = () => Promise; -export type Tasks = Array>; -export type nextTaskCheck = (status: Result, tasks: Tasks) => Promise; - -/** - * Raw throttle function, which can return extra meta data. - * @param tasks required array of tasks to be executed - * @param options Options object - * @returns {Promise} - */ -export function raw(tasks: Tasks, options?: Options): Promise> { - return new Promise>((resolve, reject) => { - const myOptions = Object.assign({}, DEFAULT_OPTIONS, options); - const result: Result = { - amountDone: 0, - amountStarted: 0, - amountResolved: 0, - amountRejected: 0, - amountNextCheckFalsey: 0, - rejectedIndexes: [], - resolvedIndexes: [], - nextCheckFalseyIndexes: [], - taskResults: [] - }; - - if (tasks.length === 0) { - return resolve(result); - } - - let failedFast = false; - let currentTaskIndex = 0; - - const handleError = (error: T, index: number) => { - result.taskResults[index] = error; - result.rejectedIndexes.push(index); - result.amountRejected++; - if (myOptions.failFast === true) { - failedFast = true; - return reject(result); - } - taskDone(); - }; - - const executeTask = (index: number) => { - result.amountStarted++; - if (typeof tasks[index] === 'function') { - try { - tasks[index]().then( - taskResult => { - result.taskResults[index] = taskResult; - result.resolvedIndexes.push(index); - result.amountResolved++; - taskDone(); - }, - error => { - handleError(error, index); - } - ); - } catch (err) { - handleError(err as T, index); - } - } else if (myOptions.ignoreIsFunctionCheck === true) { - result.taskResults[index] = (tasks[index] as any) as T; - result.resolvedIndexes.push(index); - result.amountResolved++; - taskDone(); - } else { - failedFast = true; - return reject( - new Error('tasks[' + index + ']: ' + tasks[index] + ', is supposed to be of type function') - ); - } - }; - - const taskDone = () => { - //make sure no more tasks are spawned when we failedFast - if (failedFast === true) { - return; - } - - result.amountDone++; - if (typeof (myOptions as Options).progressCallback === 'function') { - (myOptions as any).progressCallback(result); - } - if (result.amountDone === tasks.length) { - return resolve(result); - } - if (currentTaskIndex < tasks.length) { - nextTask(currentTaskIndex++); - } - }; - - const nextTask = (index: number) => { - //check if we can execute the next task - myOptions.nextCheck(result, tasks).then(canExecuteNextTask => { - if (canExecuteNextTask === true) { - //execute it - executeTask(index); - } else { - result.amountNextCheckFalsey++; - result.nextCheckFalseyIndexes.push(index); - taskDone(); - } - }, reject); - }; - - //spawn the first X task - for (let i = 0; i < Math.min(myOptions.maxInProgress, tasks.length); i++) { - nextTask(currentTaskIndex++); - } - }); -} - -/** - * Executes the raw function, but only return the task array - * @param tasks - * @param options - * @returns {Promise} - */ -function executeRaw(tasks: Tasks, options: Options): Promise { - return new Promise((resolve, reject) => { - raw(tasks, options).then( - (result: Result) => { - resolve(result.taskResults); - }, - (error: Error | Result) => { - if (error instanceof Error) { - reject(error); - } else { - reject(error.taskResults[error.rejectedIndexes[0]]); - } - } - ); - }); -} - -/** - * Simply run all the promises after each other, so in synchronous manner - * @param tasks required array of tasks to be executed - * @param options Options object - * @returns {Promise} - */ -export function sync(tasks: Tasks, options?: Options): Promise { - const myOptions = Object.assign({}, {maxInProgress: 1, failFast: true}, options); - return executeRaw(tasks, myOptions); -} - -/** - * Exposes the same behaviour as Promise.All(), but throttled! - * @param tasks required array of tasks to be executed - * @param options Options object - * @returns {Promise} - */ -export function all(tasks: Tasks, options?: Options): Promise { - const myOptions = Object.assign({}, {failFast: true}, options); - return executeRaw(tasks, myOptions); -} +const DEFAULT_MAX = 5; + +export interface Result { + lastCompletedIndex: number; + amountDone: number; + amountStarted: number; + amountResolved: number; + amountRejected: number; + amountNextCheckFalsey: number; + rejectedIndexes: number[]; + resolvedIndexes: number[]; + nextCheckFalseyIndexes: number[]; + taskResults: T[]; +} + +export interface Options { + maxInProgress?: number; + failFast?: boolean; + progressCallback?: (result: Result) => void; + nextCheck?: nextTaskCheck; + ignoreIsFunctionCheck?: boolean; +} + +/** + * Default checker which validates if a next task should begin. + * This can be overwritten to write own checks for example checking the amount + * of used ram and waiting till the ram is low enough for a next task. + * + * It should always resolve with a boolean, either `true` to start a next task + * or `false` to stop executing a new task. + * + * If this method rejects, the error will propagate to the caller + * @param status + * @param tasks + * @returns {Promise} + */ +const defaultNextTaskCheck: nextTaskCheck = (status: Result, tasks: Tasks): Promise => { + return Promise.resolve(status.amountStarted < tasks.length); +}; + +const DEFAULT_OPTIONS = { + maxInProgress: DEFAULT_MAX, + failFast: false, + nextCheck: defaultNextTaskCheck, + ignoreIsFunctionCheck: false, +}; + +export type Task = () => Promise; +export type Tasks = Array>; +export type nextTaskCheck = (status: Result, tasks: Tasks) => Promise; + +/** + * Raw throttle function, which can return extra meta data. + * @param tasks required array of tasks to be executed + * @param options Options object + * @returns {Promise} + */ +export function raw(tasks: Tasks, options?: Options): Promise> { + return new Promise>((resolve, reject) => { + const myOptions = Object.assign({}, DEFAULT_OPTIONS, options); + const result: Result = { + lastCompletedIndex: -1, + amountDone: 0, + amountStarted: 0, + amountResolved: 0, + amountRejected: 0, + amountNextCheckFalsey: 0, + rejectedIndexes: [], + resolvedIndexes: [], + nextCheckFalseyIndexes: [], + taskResults: [], + }; + + if (tasks.length === 0) { + return resolve(result); + } + + let failedFast = false; + let currentTaskIndex = 0; + + const handleError = (error: T, index: number) => { + result.taskResults[index] = error; + result.rejectedIndexes.push(index); + result.amountRejected++; + if (myOptions.failFast === true) { + result.lastCompletedIndex = index; + failedFast = true; + return reject(result); + } + taskDone(index); + }; + + const executeTask = (index: number) => { + result.amountStarted++; + if (typeof tasks[index] === 'function') { + try { + tasks[index]().then( + (taskResult) => { + result.taskResults[index] = taskResult; + result.resolvedIndexes.push(index); + result.amountResolved++; + taskDone(index); + }, + (error) => { + handleError(error, index); + }, + ); + } catch (err) { + handleError(err as T, index); + } + } else if (myOptions.ignoreIsFunctionCheck === true) { + result.taskResults[index] = tasks[index] as any as T; + result.resolvedIndexes.push(index); + result.amountResolved++; + taskDone(index); + } else { + failedFast = true; + return reject( + new Error('tasks[' + index + ']: ' + tasks[index] + ', is supposed to be of type function'), + ); + } + }; + + const taskDone = (index: number) => { + //make sure no more tasks are spawned when we failedFast + if (failedFast === true) { + return; + } + + result.amountDone++; + result.lastCompletedIndex = index; + if (typeof myOptions.progressCallback === 'function') { + myOptions.progressCallback(result); + } + if (result.amountDone === tasks.length) { + return resolve(result); + } + if (currentTaskIndex < tasks.length) { + nextTask(currentTaskIndex++); + } + }; + + const nextTask = (index: number) => { + //check if we can execute the next task + myOptions.nextCheck(result, tasks).then((canExecuteNextTask) => { + if (canExecuteNextTask === true) { + //execute it + executeTask(index); + } else { + result.amountNextCheckFalsey++; + result.nextCheckFalseyIndexes.push(index); + taskDone(index); + } + }, reject); + }; + + //spawn the first X task + for (let i = 0; i < Math.min(myOptions.maxInProgress, tasks.length); i++) { + nextTask(currentTaskIndex++); + } + }); +} + +/** + * Executes the raw function, but only return the task array + * @param tasks + * @param options + * @returns {Promise} + */ +function executeRaw(tasks: Tasks, options: Options): Promise { + return new Promise((resolve, reject) => { + raw(tasks, options).then( + (result: Result) => { + resolve(result.taskResults); + }, + (error: Error | Result) => { + if (error instanceof Error) { + reject(error); + } else { + reject(error.taskResults[error.rejectedIndexes[0]]); + } + }, + ); + }); +} + +/** + * Simply run all the promises after each other, so in synchronous manner + * @param tasks required array of tasks to be executed + * @param options Options object + * @returns {Promise} + */ +export function sync(tasks: Tasks, options?: Options): Promise { + const myOptions = Object.assign({}, {maxInProgress: 1, failFast: true}, options); + return executeRaw(tasks, myOptions); +} + +/** + * Exposes the same behaviour as Promise.All(), but throttled! + * @param tasks required array of tasks to be executed + * @param options Options object + * @returns {Promise} + */ +export function all(tasks: Tasks, options?: Options): Promise { + const myOptions = Object.assign({}, {failFast: true}, options); + return executeRaw(tasks, myOptions); +} diff --git a/tests/throttle.test.ts b/tests/throttle.test.ts index 50353a6..80774e6 100644 --- a/tests/throttle.test.ts +++ b/tests/throttle.test.ts @@ -332,6 +332,35 @@ describe('Throttle test', function () { expect(progressCallback).toHaveBeenCalledTimes(5); }); + it('should call the callback function with the just completed index', async function () { + /* Given */ + const names: Person[] = [ + {firstName: 'Irene', lastName: 'Pullman'}, + {firstName: 'Sean', lastName: 'Parr'}, + {firstName: 'Joe', lastName: 'Slater'}, + {firstName: 'Karen', lastName: 'Turner'}, + {firstName: 'Tim', lastName: 'Black'}, + ]; + + const combineNames = (firstName: string, lastName: string): Promise => { + return new Promise((resolve, reject) => { + resolve(firstName + ' ' + lastName); + }); + }; + + //Create a array of functions to be run + const tasks = names.map((u) => () => combineNames(u.firstName, u.lastName)); + + /* When */ + const indexes = new Set(); + const {taskResults: formattedNames} = await Throttle.raw(tasks, { + progressCallback: (res) => indexes.add(res.lastCompletedIndex), + }); + + /* Then */ + expect(indexes).toEqual(new Set([0, 1, 2, 3, 4])); + }); + it('should eventually stop if the nextTaskCheck returns false', async function () { /* Given */ const names: Person[] = [