Skip to content

Commit b1e6535

Browse files
authored
feat: add option to set asyncmap/asyncfilter concurrency pool limit (#58)
* feat: add option to set asyncmap/asyncfilter concurrency pool limit * address copilot comments * try to reduce test flakiness * update mocha syntax * use p-limit to avoid reinventing the wheel * simplify with limitFunction * address review comments * address review comments * ensure it still works with sync inputs
1 parent be989b0 commit b1e6535

File tree

5 files changed

+110
-47
lines changed

5 files changed

+110
-47
lines changed

README.md

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,27 @@ Then in your async functions, you can do:
9090

9191
```js
9292
const items = [1, 2, 3, 4];
93-
const slowSquare = async (n) => { await sleep(5); return n * 2; };
94-
let newItems = await asyncmap(items, async (i) => { return await slowSquare(i); });
93+
const slowSquare = async (n) => { await sleep(5); return n * n; };
94+
let newItems = await asyncmap(items, slowSquare);
9595
console.log(newItems); // [1, 4, 9, 16];
9696

9797
const slowEven = async (n) => { await sleep(5); return n % 2 === 0; };
98-
newItems = await asyncfilter(items, async (i) => { return await slowEven(i); });
98+
newItems = await asyncfilter(items, slowEven);
9999
console.log(newItems); // [2, 4];
100100
```
101101

102-
By default, `asyncmap` and `asyncfilter` run their operations in parallel; you
103-
can pass `false` as a third argument to make sure it happens serially.
102+
By default, `asyncmap` and `asyncfilter` run their operations in parallel, but you
103+
can set the third argument to `false` to enforce sequential execution, or set a custom
104+
concurrency pool limit using `{concurrency: <number>}`:
105+
106+
```js
107+
const items = [1, 2, 3, 4];
108+
const slowSquare = async (n) => { await sleep(5); return n * n; };
109+
// this will run sequentially (~20ms)
110+
const newItemsSeq = await asyncmap(items, slowSquare, false);
111+
// this will handle 2 items at a time (~10ms)
112+
const newItemsMaxTwo = await asyncmap(items, slowSquare, {concurrency: 2});
113+
```
104114

105115
### waitForCondition
106116

lib/asyncbox.ts

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import type {LongSleepOptions, WaitForConditionOptions} from './types.js';
1+
import {limitFunction} from 'p-limit';
2+
import type {LongSleepOptions, MapFilterOptions, WaitForConditionOptions} from './types.js';
23

34
const LONG_SLEEP_THRESHOLD = 5000; // anything over 5000ms will turn into a spin
45

@@ -105,54 +106,66 @@ export async function retryInterval<T = any>(
105106
}
106107

107108
/**
108-
* Similar to `Array.prototype.map`; runs in serial or parallel
109+
* Similar to `Array.prototype.map`; runs in parallel, serial, or with custom concurrency pool
109110
* @param coll - The collection to map over
110111
* @param mapper - The function to apply to each element
111-
* @param runInParallel - Whether to run operations in parallel (default: true)
112+
* @param options - Options for controlling parallelism (default: true - fully parallel)
112113
*/
113114
export async function asyncmap<T, R>(
114115
coll: T[],
115116
mapper: (value: T) => R | Promise<R>,
116-
runInParallel = true,
117+
options: MapFilterOptions = true,
117118
): Promise<R[]> {
118-
if (runInParallel) {
119-
return Promise.all(coll.map(mapper));
119+
if (options === null) {
120+
throw new Error('Options cannot be null');
120121
}
121-
122-
const newColl: R[] = [];
123-
for (const item of coll) {
124-
newColl.push(await mapper(item));
122+
// limitFunction requires the mapper to always return a promise
123+
const mapperAsync = async (value: T): Promise<R> => mapper(value);
124+
if (options === false) {
125+
return coll.reduce<Promise<R[]>>(
126+
async (acc, item) => [...(await acc), await mapperAsync(item)],
127+
Promise.resolve([]),
128+
);
125129
}
126-
return newColl;
130+
const adjustedMapper =
131+
options === true ? mapperAsync : limitFunction(mapperAsync, {concurrency: options.concurrency});
132+
return Promise.all(coll.map(adjustedMapper));
127133
}
128134

129135
/**
130-
* Similar to `Array.prototype.filter`
136+
* Similar to `Array.prototype.filter`; runs in parallel, serial, or with custom concurrency pool
131137
* @param coll - The collection to filter
132138
* @param filter - The function to test each element
133-
* @param runInParallel - Whether to run operations in parallel (default: true)
139+
* @param options - Options for controlling parallelism (default: true - fully parallel)
134140
*/
135141
export async function asyncfilter<T>(
136142
coll: T[],
137143
filter: (value: T) => boolean | Promise<boolean>,
138-
runInParallel = true,
144+
options: MapFilterOptions = true,
139145
): Promise<T[]> {
140-
const newColl: T[] = [];
141-
if (runInParallel) {
142-
const bools = await Promise.all(coll.map(filter));
143-
for (let i = 0; i < coll.length; i++) {
144-
if (bools[i]) {
145-
newColl.push(coll[i]);
146-
}
147-
}
148-
} else {
149-
for (const item of coll) {
150-
if (await filter(item)) {
151-
newColl.push(item);
146+
if (options === null) {
147+
throw new Error('Options cannot be null');
148+
}
149+
// limitFunction requires the filter to always return a promise
150+
const filterAsync = async (value: T): Promise<boolean> => filter(value);
151+
if (options === false) {
152+
return coll.reduce<Promise<T[]>>(async (accP, item) => {
153+
const acc = await accP;
154+
if (await filterAsync(item)) {
155+
acc.push(item);
152156
}
153-
}
157+
return acc;
158+
}, Promise.resolve([]));
154159
}
155-
return newColl;
160+
const adjustedFilter =
161+
options === true ? filterAsync : limitFunction(filterAsync, {concurrency: options.concurrency});
162+
const bools = await Promise.all(coll.map(adjustedFilter));
163+
return coll.reduce<T[]>((acc, item, i) => {
164+
if (bools[i]) {
165+
acc.push(item);
166+
}
167+
return acc;
168+
}, []);
156169
}
157170

158171
/**

lib/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ export interface LongSleepOptions {
3131
progressCb?: ProgressCallback | null;
3232
}
3333

34+
/**
35+
* Options for {@link asyncmap} and {@link asyncfilter}
36+
*/
37+
export type MapFilterOptions = boolean | {concurrency: number};
38+
3439
/**
3540
* Options for {@link waitForCondition}
3641
*/

package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
"printWidth": 100,
4646
"singleQuote": true
4747
},
48+
"dependencies": {
49+
"p-limit": "^7.2.0"
50+
},
4851
"devDependencies": {
4952
"@appium/eslint-config-appium-ts": "^2.0.5",
5053
"@appium/tsconfig": "^1.0.0",
@@ -56,8 +59,8 @@
5659
"chai-as-promised": "^8.0.2",
5760
"conventional-changelog-conventionalcommits": "^9.0.0",
5861
"eslint": "^9.39.1",
59-
"prettier": "^3.0.0",
6062
"mocha": "^11.7.5",
63+
"prettier": "^3.0.0",
6164
"semantic-release": "^25.0.2",
6265
"ts-node": "^10.9.1",
6366
"tsx": "^4.21.0",

test/asyncbox-specs.ts

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -210,49 +210,81 @@ describe('asyncmap', function () {
210210
await sleep(10);
211211
return el * 2;
212212
};
213-
const coll = [1, 2, 3];
213+
const coll = [1, 2, 3, 4, 5];
214+
const newColl = [2, 4, 6, 8, 10];
214215
it('should map elements one at a time', async function () {
215216
const start = Date.now();
216-
expect(await asyncmap(coll, mapper, false)).to.eql([2, 4, 6]);
217-
expect(Date.now() - start).to.be.at.least(30);
217+
expect(await asyncmap(coll, mapper, false)).to.eql(newColl);
218+
expect(Date.now() - start).to.be.at.least(50);
218219
});
219220
it('should map elements in parallel', async function () {
220221
const start = Date.now();
221-
expect(await asyncmap(coll, mapper)).to.eql([2, 4, 6]);
222+
expect(await asyncmap(coll, mapper)).to.eql(newColl);
222223
expect(Date.now() - start).to.be.at.most(20);
223224
});
225+
it('should map elements with concurrency', async function () {
226+
const start = Date.now();
227+
expect(await asyncmap(coll, mapper, {concurrency: 2})).to.eql(newColl);
228+
expect(Date.now() - start).to.be.at.least(29);
229+
expect(Date.now() - start).to.be.at.most(40);
230+
});
224231
it('should handle an empty array', async function () {
225232
expect(await asyncmap([], mapper, false)).to.eql([]);
226233
});
227234
it('should handle an empty array in parallel', async function () {
228235
expect(await asyncmap([], mapper)).to.eql([]);
229236
});
237+
it('should work for a sync mapper function', async function () {
238+
const syncmapper = (el: number): number => el * 2;
239+
expect(await asyncmap(coll, syncmapper, false)).to.eql(newColl);
240+
expect(await asyncmap(coll, syncmapper)).to.eql(newColl);
241+
});
242+
it('should raise an error if options is null', async function () {
243+
// @ts-expect-error - testing invalid inputs
244+
await expect(asyncmap(coll, mapper, null)).to.be.rejectedWith(
245+
'Options cannot be null'
246+
);
247+
});
230248
});
231249

232250
describe('asyncfilter', function () {
233251
const filter = async function (el: number): Promise<boolean> {
234-
await sleep(5);
252+
await sleep(10);
235253
return el % 2 === 0;
236254
};
237255
const coll = [1, 2, 3, 4, 5];
256+
const newColl = [2, 4];
238257
it('should filter elements one at a time', async function () {
239258
const start = Date.now();
240-
expect(await asyncfilter(coll, filter, false)).to.eql([2, 4]);
241-
expect(Date.now() - start).to.be.at.least(19);
259+
expect(await asyncfilter(coll, filter, false)).to.eql(newColl);
260+
expect(Date.now() - start).to.be.at.least(50);
242261
});
243262
it('should filter elements in parallel', async function () {
244263
const start = Date.now();
245-
expect(await asyncfilter(coll, filter)).to.eql([2, 4]);
246-
expect(Date.now() - start).to.be.below(9);
264+
expect(await asyncfilter(coll, filter)).to.eql(newColl);
265+
expect(Date.now() - start).to.be.at.most(20);
247266
});
248-
it('should handle an empty array', async function () {
267+
it('should filter elements with concurrency', async function () {
249268
const start = Date.now();
269+
expect(await asyncfilter(coll, filter, {concurrency: 2})).to.eql(newColl);
270+
expect(Date.now() - start).to.be.at.least(29);
271+
expect(Date.now() - start).to.be.at.most(40);
272+
});
273+
it('should handle an empty array', async function () {
250274
expect(await asyncfilter([], filter, false)).to.eql([]);
251-
expect(Date.now() - start).to.be.below(9);
252275
});
253276
it('should handle an empty array in parallel', async function () {
254-
const start = Date.now();
255277
expect(await asyncfilter([], filter)).to.eql([]);
256-
expect(Date.now() - start).to.be.below(9);
278+
});
279+
it('should work for a sync filter function', async function () {
280+
const syncfilter = (el: number): boolean => el % 2 === 0;
281+
expect(await asyncfilter(coll, syncfilter, false)).to.eql(newColl);
282+
expect(await asyncfilter(coll, syncfilter)).to.eql(newColl);
283+
});
284+
it('should raise an error if options is null', async function () {
285+
// @ts-expect-error - testing invalid inputs
286+
await expect(asyncfilter(coll, filter, null)).to.be.rejectedWith(
287+
'Options cannot be null'
288+
);
257289
});
258290
});

0 commit comments

Comments
 (0)