Skip to content

Commit 5455571

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent 27f4d64 commit 5455571

File tree

2 files changed

+223
-8
lines changed

2 files changed

+223
-8
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, it } from 'mocha';
22

3+
import { invariant } from '../../jsutils/invariant';
34
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
45
import { parse } from '../../language/parser';
56

@@ -91,6 +92,36 @@ const query = new GraphQLObjectType({
9192
yield await Promise.resolve({});
9293
},
9394
},
95+
asyncIterableListDelayed: {
96+
type: new GraphQLList(friendType),
97+
async *resolve() {
98+
for (const friend of friends) {
99+
// pause an additional ms before yielding to allow time
100+
// for tests to return or throw before next value is processed.
101+
// eslint-disable-next-line no-await-in-loop
102+
await new Promise((r) => setTimeout(r, 1));
103+
yield friend;
104+
}
105+
},
106+
},
107+
asyncIterableListNoReturn: {
108+
type: new GraphQLList(friendType),
109+
resolve() {
110+
let i = 0;
111+
return {
112+
[Symbol.asyncIterator]: () => ({
113+
async next() {
114+
const friend = friends[i++];
115+
if (friend) {
116+
await new Promise((r) => setTimeout(r, 1));
117+
return { value: friend, done: false };
118+
}
119+
return { value: undefined, done: true };
120+
},
121+
}),
122+
};
123+
},
124+
},
94125
asyncIterableListDelayedClose: {
95126
type: new GraphQLList(friendType),
96127
async *resolve() {
@@ -869,4 +900,172 @@ describe('Execute: stream directive', () => {
869900
},
870901
]);
871902
});
903+
it('Returns underlying async iterables when dispatcher is returned', async () => {
904+
const document = parse(`
905+
query {
906+
asyncIterableListDelayed @stream(initialCount: 1) {
907+
name
908+
id
909+
}
910+
}
911+
`);
912+
const schema = new GraphQLSchema({ query });
913+
914+
const executeResult = await execute({ schema, document, rootValue: {} });
915+
invariant(isAsyncIterable(executeResult));
916+
const iterator = executeResult[Symbol.asyncIterator]();
917+
918+
const result1 = await iterator.next();
919+
expectJSON(result1).toDeepEqual({
920+
done: false,
921+
value: {
922+
data: {
923+
asyncIterableListDelayed: [
924+
{
925+
id: '1',
926+
name: 'Luke',
927+
},
928+
],
929+
},
930+
hasNext: true,
931+
},
932+
});
933+
934+
iterator.return?.();
935+
936+
// this result had started processing before return was called
937+
const result2 = await iterator.next();
938+
expectJSON(result2).toDeepEqual({
939+
done: false,
940+
value: {
941+
data: {
942+
id: '2',
943+
name: 'Han',
944+
},
945+
hasNext: true,
946+
path: ['asyncIterableListDelayed', 1],
947+
},
948+
});
949+
950+
// third result is not returned because async iterator has returned
951+
const result3 = await iterator.next();
952+
expectJSON(result3).toDeepEqual({
953+
done: false,
954+
value: {
955+
hasNext: false,
956+
},
957+
});
958+
});
959+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
960+
const document = parse(`
961+
query {
962+
asyncIterableListNoReturn @stream(initialCount: 1) {
963+
name
964+
id
965+
}
966+
}
967+
`);
968+
const schema = new GraphQLSchema({ query });
969+
970+
const executeResult = await execute({ schema, document, rootValue: {} });
971+
invariant(isAsyncIterable(executeResult));
972+
const iterator = executeResult[Symbol.asyncIterator]();
973+
974+
const result1 = await iterator.next();
975+
expectJSON(result1).toDeepEqual({
976+
done: false,
977+
value: {
978+
data: {
979+
asyncIterableListNoReturn: [
980+
{
981+
id: '1',
982+
name: 'Luke',
983+
},
984+
],
985+
},
986+
hasNext: true,
987+
},
988+
});
989+
990+
iterator.return?.();
991+
992+
// this result had started processing before return was called
993+
const result2 = await iterator.next();
994+
expectJSON(result2).toDeepEqual({
995+
done: false,
996+
value: {
997+
data: {
998+
id: '2',
999+
name: 'Han',
1000+
},
1001+
hasNext: true,
1002+
path: ['asyncIterableListNoReturn', 1],
1003+
},
1004+
});
1005+
1006+
// third result is not returned because async iterator has returned
1007+
const result3 = await iterator.next();
1008+
expectJSON(result3).toDeepEqual({
1009+
done: false,
1010+
value: {
1011+
hasNext: false,
1012+
},
1013+
});
1014+
});
1015+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1016+
const document = parse(`
1017+
query {
1018+
asyncIterableListDelayed @stream(initialCount: 1) {
1019+
name
1020+
id
1021+
}
1022+
}
1023+
`);
1024+
const schema = new GraphQLSchema({ query });
1025+
1026+
const executeResult = await execute({ schema, document, rootValue: {} });
1027+
invariant(isAsyncIterable(executeResult));
1028+
const iterator = executeResult[Symbol.asyncIterator]();
1029+
1030+
const result1 = await iterator.next();
1031+
expectJSON(result1).toDeepEqual({
1032+
done: false,
1033+
value: {
1034+
data: {
1035+
asyncIterableListDelayed: [
1036+
{
1037+
id: '1',
1038+
name: 'Luke',
1039+
},
1040+
],
1041+
},
1042+
hasNext: true,
1043+
},
1044+
});
1045+
1046+
iterator.throw?.(new Error('bad'));
1047+
1048+
// this result had started processing before return was called
1049+
const result2 = await iterator.next();
1050+
expectJSON(result2).toDeepEqual({
1051+
done: false,
1052+
value: {
1053+
data: {
1054+
id: '2',
1055+
name: 'Han',
1056+
},
1057+
hasNext: true,
1058+
path: ['asyncIterableListDelayed', 1],
1059+
},
1060+
});
1061+
1062+
// third result is not returned because async iterator has returned
1063+
const result3 = await iterator.next();
1064+
expectJSON(result3).toDeepEqual({
1065+
done: false,
1066+
value: {
1067+
hasNext: false,
1068+
},
1069+
});
1070+
});
8721071
});

src/execution/execute.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,10 +1410,14 @@ interface DispatcherResult {
14101410
export class Dispatcher {
14111411
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
14121412
_initialResult?: ExecutionResult;
1413+
_iterators: Array<AsyncIterator<unknown>>;
1414+
_isDone: boolean;
14131415
_hasReturnedInitialResult: boolean;
14141416

14151417
constructor() {
14161418
this._subsequentPayloads = [];
1419+
this._iterators = [];
1420+
this._isDone = false;
14171421
this._hasReturnedInitialResult = false;
14181422
}
14191423

@@ -1517,6 +1521,8 @@ export class Dispatcher {
15171521
label?: string,
15181522
): void {
15191523
const subsequentPayloads = this._subsequentPayloads;
1524+
const iterators = this._iterators;
1525+
iterators.push(iterator);
15201526
function next(index: number) {
15211527
const fieldPath = addPath(path, index, undefined);
15221528
const patchErrors: Array<GraphQLError> = [];
@@ -1526,6 +1532,7 @@ export class Dispatcher {
15261532
.then(
15271533
({ value: data, done }) => {
15281534
if (done) {
1535+
iterators.splice(iterators.indexOf(iterator), 1);
15291536
return { value: undefined, done: true };
15301537
}
15311538

@@ -1605,7 +1612,7 @@ export class Dispatcher {
16051612
}
16061613

16071614
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
1608-
if (this._subsequentPayloads.length === 0) {
1615+
if (this._subsequentPayloads.length === 0 || this._isDone) {
16091616
// async iterable resolver just finished and no more pending payloads
16101617
return Promise.resolve({
16111618
value: {
@@ -1666,6 +1673,20 @@ export class Dispatcher {
16661673
return this._race();
16671674
}
16681675

1676+
async _return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1677+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1678+
this._isDone = true;
1679+
return { value: undefined, done: true };
1680+
}
1681+
1682+
async _throw(
1683+
error?: unknown,
1684+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1685+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1686+
this._isDone = true;
1687+
return Promise.reject(error);
1688+
}
1689+
16691690
get(
16701691
initialResult: ExecutionResult,
16711692
): AsyncGenerator<AsyncExecutionResult, void, void> {
@@ -1675,13 +1696,8 @@ export class Dispatcher {
16751696
return this;
16761697
},
16771698
next: () => this._next(),
1678-
// TODO: implement return & throw
1679-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1680-
Promise.resolve({ value: undefined, done: true }),
1681-
1682-
throw: /* istanbul ignore next: will be covered in follow up */ (
1683-
error?: unknown,
1684-
) => Promise.reject(error),
1699+
return: () => this._return(),
1700+
throw: (error?: unknown) => this._throw(error),
16851701
};
16861702
}
16871703
}

0 commit comments

Comments
 (0)