Skip to content

Commit 637e36b

Browse files
committed
Return underlying AsyncIterators when execute result is returned (graphql#2843)
# Conflicts: # src/execution/execute.ts
1 parent 4fa3d37 commit 637e36b

File tree

2 files changed

+223
-8
lines changed

2 files changed

+223
-8
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, it } from 'mocha';
22

3+
import { invariant } from '../../jsutils/invariant';
34
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
45
import { parse } from '../../language/parser';
56

@@ -101,6 +102,36 @@ const query = new GraphQLObjectType({
101102
yield await Promise.resolve({});
102103
},
103104
},
105+
asyncIterableListDelayed: {
106+
type: new GraphQLList(friendType),
107+
async *resolve() {
108+
for (const friend of friends) {
109+
// pause an additional ms before yielding to allow time
110+
// for tests to return or throw before next value is processed.
111+
// eslint-disable-next-line no-await-in-loop
112+
await new Promise((r) => setTimeout(r, 1));
113+
yield friend;
114+
}
115+
},
116+
},
117+
asyncIterableListNoReturn: {
118+
type: new GraphQLList(friendType),
119+
resolve() {
120+
let i = 0;
121+
return {
122+
[Symbol.asyncIterator]: () => ({
123+
async next() {
124+
const friend = friends[i++];
125+
if (friend) {
126+
await new Promise((r) => setTimeout(r, 1));
127+
return { value: friend, done: false };
128+
}
129+
return { value: undefined, done: true };
130+
},
131+
}),
132+
};
133+
},
134+
},
104135
asyncIterableListDelayedClose: {
105136
type: new GraphQLList(friendType),
106137
async *resolve() {
@@ -995,4 +1026,172 @@ describe('Execute: stream directive', () => {
9951026
},
9961027
]);
9971028
});
1029+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1030+
const document = parse(`
1031+
query {
1032+
asyncIterableListDelayed @stream(initialCount: 1) {
1033+
name
1034+
id
1035+
}
1036+
}
1037+
`);
1038+
const schema = new GraphQLSchema({ query });
1039+
1040+
const executeResult = await execute({ schema, document, rootValue: {} });
1041+
invariant(isAsyncIterable(executeResult));
1042+
const iterator = executeResult[Symbol.asyncIterator]();
1043+
1044+
const result1 = await iterator.next();
1045+
expectJSON(result1).toDeepEqual({
1046+
done: false,
1047+
value: {
1048+
data: {
1049+
asyncIterableListDelayed: [
1050+
{
1051+
id: '1',
1052+
name: 'Luke',
1053+
},
1054+
],
1055+
},
1056+
hasNext: true,
1057+
},
1058+
});
1059+
1060+
iterator.return?.();
1061+
1062+
// this result had started processing before return was called
1063+
const result2 = await iterator.next();
1064+
expectJSON(result2).toDeepEqual({
1065+
done: false,
1066+
value: {
1067+
data: {
1068+
id: '2',
1069+
name: 'Han',
1070+
},
1071+
hasNext: true,
1072+
path: ['asyncIterableListDelayed', 1],
1073+
},
1074+
});
1075+
1076+
// third result is not returned because async iterator has returned
1077+
const result3 = await iterator.next();
1078+
expectJSON(result3).toDeepEqual({
1079+
done: false,
1080+
value: {
1081+
hasNext: false,
1082+
},
1083+
});
1084+
});
1085+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1086+
const document = parse(`
1087+
query {
1088+
asyncIterableListNoReturn @stream(initialCount: 1) {
1089+
name
1090+
id
1091+
}
1092+
}
1093+
`);
1094+
const schema = new GraphQLSchema({ query });
1095+
1096+
const executeResult = await execute({ schema, document, rootValue: {} });
1097+
invariant(isAsyncIterable(executeResult));
1098+
const iterator = executeResult[Symbol.asyncIterator]();
1099+
1100+
const result1 = await iterator.next();
1101+
expectJSON(result1).toDeepEqual({
1102+
done: false,
1103+
value: {
1104+
data: {
1105+
asyncIterableListNoReturn: [
1106+
{
1107+
id: '1',
1108+
name: 'Luke',
1109+
},
1110+
],
1111+
},
1112+
hasNext: true,
1113+
},
1114+
});
1115+
1116+
iterator.return?.();
1117+
1118+
// this result had started processing before return was called
1119+
const result2 = await iterator.next();
1120+
expectJSON(result2).toDeepEqual({
1121+
done: false,
1122+
value: {
1123+
data: {
1124+
id: '2',
1125+
name: 'Han',
1126+
},
1127+
hasNext: true,
1128+
path: ['asyncIterableListNoReturn', 1],
1129+
},
1130+
});
1131+
1132+
// third result is not returned because async iterator has returned
1133+
const result3 = await iterator.next();
1134+
expectJSON(result3).toDeepEqual({
1135+
done: false,
1136+
value: {
1137+
hasNext: false,
1138+
},
1139+
});
1140+
});
1141+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1142+
const document = parse(`
1143+
query {
1144+
asyncIterableListDelayed @stream(initialCount: 1) {
1145+
name
1146+
id
1147+
}
1148+
}
1149+
`);
1150+
const schema = new GraphQLSchema({ query });
1151+
1152+
const executeResult = await execute({ schema, document, rootValue: {} });
1153+
invariant(isAsyncIterable(executeResult));
1154+
const iterator = executeResult[Symbol.asyncIterator]();
1155+
1156+
const result1 = await iterator.next();
1157+
expectJSON(result1).toDeepEqual({
1158+
done: false,
1159+
value: {
1160+
data: {
1161+
asyncIterableListDelayed: [
1162+
{
1163+
id: '1',
1164+
name: 'Luke',
1165+
},
1166+
],
1167+
},
1168+
hasNext: true,
1169+
},
1170+
});
1171+
1172+
iterator.throw?.(new Error('bad'));
1173+
1174+
// this result had started processing before return was called
1175+
const result2 = await iterator.next();
1176+
expectJSON(result2).toDeepEqual({
1177+
done: false,
1178+
value: {
1179+
data: {
1180+
id: '2',
1181+
name: 'Han',
1182+
},
1183+
hasNext: true,
1184+
path: ['asyncIterableListDelayed', 1],
1185+
},
1186+
});
1187+
1188+
// third result is not returned because async iterator has returned
1189+
const result3 = await iterator.next();
1190+
expectJSON(result3).toDeepEqual({
1191+
done: false,
1192+
value: {
1193+
hasNext: false,
1194+
},
1195+
});
1196+
});
9981197
});

src/execution/execute.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,10 +1398,14 @@ interface DispatcherResult {
13981398
export class Dispatcher {
13991399
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
14001400
_initialResult?: ExecutionResult;
1401+
_iterators: Array<AsyncIterator<unknown>>;
1402+
_isDone: boolean;
14011403
_hasReturnedInitialResult: boolean;
14021404

14031405
constructor() {
14041406
this._subsequentPayloads = [];
1407+
this._iterators = [];
1408+
this._isDone = false;
14051409
this._hasReturnedInitialResult = false;
14061410
}
14071411

@@ -1545,6 +1549,8 @@ export class Dispatcher {
15451549
parentContext?: AsyncPayloadContext,
15461550
): void {
15471551
const subsequentPayloads = this._subsequentPayloads;
1552+
const iterators = this._iterators;
1553+
iterators.push(iterator);
15481554
function next(index: number) {
15491555
const fieldPath = addPath(path, index, undefined);
15501556
const asyncPayloadContext = new AsyncPayloadContext();
@@ -1553,6 +1559,7 @@ export class Dispatcher {
15531559
.then(
15541560
({ value: data, done }) => {
15551561
if (done) {
1562+
iterators.splice(iterators.indexOf(iterator), 1);
15561563
return { value: undefined, done: true };
15571564
}
15581565

@@ -1654,7 +1661,7 @@ export class Dispatcher {
16541661
}
16551662

16561663
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
1657-
if (this._subsequentPayloads.length === 0) {
1664+
if (this._subsequentPayloads.length === 0 || this._isDone) {
16581665
// async iterable resolver just finished and no more pending payloads
16591666
return Promise.resolve({
16601667
value: {
@@ -1715,6 +1722,20 @@ export class Dispatcher {
17151722
return this._race();
17161723
}
17171724

1725+
async _return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1726+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1727+
this._isDone = true;
1728+
return { value: undefined, done: true };
1729+
}
1730+
1731+
async _throw(
1732+
error?: unknown,
1733+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1734+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1735+
this._isDone = true;
1736+
return Promise.reject(error);
1737+
}
1738+
17181739
get(
17191740
initialResult: ExecutionResult,
17201741
): AsyncGenerator<AsyncExecutionResult, void, void> {
@@ -1724,13 +1745,8 @@ export class Dispatcher {
17241745
return this;
17251746
},
17261747
next: () => this._next(),
1727-
// TODO: implement return & throw
1728-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1729-
Promise.resolve({ value: undefined, done: true }),
1730-
1731-
throw: /* istanbul ignore next: will be covered in follow up */ (
1732-
error?: unknown,
1733-
) => Promise.reject(error),
1748+
return: () => this._return(),
1749+
throw: (error?: unknown) => this._throw(error),
17341750
};
17351751
}
17361752
}

0 commit comments

Comments
 (0)