Skip to content

Commit 5b12f8a

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent 1f1c82e commit 5b12f8a

File tree

2 files changed

+233
-8
lines changed

2 files changed

+233
-8
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, it } from 'mocha';
22

3+
import { invariant } from '../../jsutils/invariant';
34
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
45
import { parse } from '../../language/parser';
56

@@ -103,6 +104,37 @@ const query = new GraphQLObjectType({
103104
yield await Promise.resolve({});
104105
},
105106
},
107+
asyncIterableListDelayed: {
108+
type: new GraphQLList(friendType),
109+
async *resolve() {
110+
for (const friend of friends) {
111+
// pause an additional ms before yielding to allow time
112+
// for tests to return or throw before next value is processed.
113+
// eslint-disable-next-line no-await-in-loop
114+
await new Promise((r) => setTimeout(r, 1));
115+
yield friend; /* c8 ignore start */
116+
// Not reachable, early return
117+
}
118+
} /* c8 ignore stop */,
119+
},
120+
asyncIterableListNoReturn: {
121+
type: new GraphQLList(friendType),
122+
resolve() {
123+
let i = 0;
124+
return {
125+
[Symbol.asyncIterator]: () => ({
126+
async next() {
127+
const friend = friends[i++];
128+
if (friend) {
129+
await new Promise((r) => setTimeout(r, 1));
130+
return { value: friend, done: false };
131+
}
132+
return { value: undefined, done: true };
133+
},
134+
}),
135+
};
136+
},
137+
},
106138
asyncIterableListDelayedClose: {
107139
type: new GraphQLList(friendType),
108140
async *resolve() {
@@ -988,4 +1020,181 @@ describe('Execute: stream directive', () => {
9881020
},
9891021
]);
9901022
});
1023+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1024+
const document = parse(`
1025+
query {
1026+
asyncIterableListDelayed @stream(initialCount: 1) {
1027+
name
1028+
id
1029+
}
1030+
}
1031+
`);
1032+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1033+
1034+
const executeResult = await execute({ schema, document, rootValue: {} });
1035+
invariant(isAsyncIterable(executeResult));
1036+
const iterator = executeResult[Symbol.asyncIterator]();
1037+
1038+
const result1 = await iterator.next();
1039+
expectJSON(result1).toDeepEqual({
1040+
done: false,
1041+
value: {
1042+
data: {
1043+
asyncIterableListDelayed: [
1044+
{
1045+
id: '1',
1046+
name: 'Luke',
1047+
},
1048+
],
1049+
},
1050+
hasNext: true,
1051+
},
1052+
});
1053+
1054+
const returnPromise = iterator.return();
1055+
1056+
// this result had started processing before return was called
1057+
const result2 = await iterator.next();
1058+
expectJSON(result2).toDeepEqual({
1059+
done: false,
1060+
value: {
1061+
data: {
1062+
id: '2',
1063+
name: 'Han',
1064+
},
1065+
hasNext: true,
1066+
path: ['asyncIterableListDelayed', 1],
1067+
},
1068+
});
1069+
1070+
// third result is not returned because async iterator has returned
1071+
const result3 = await iterator.next();
1072+
expectJSON(result3).toDeepEqual({
1073+
done: false,
1074+
value: {
1075+
hasNext: false,
1076+
},
1077+
});
1078+
await returnPromise;
1079+
});
1080+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1081+
const document = parse(`
1082+
query {
1083+
asyncIterableListNoReturn @stream(initialCount: 1) {
1084+
name
1085+
id
1086+
}
1087+
}
1088+
`);
1089+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1090+
1091+
const executeResult = await execute({ schema, document, rootValue: {} });
1092+
invariant(isAsyncIterable(executeResult));
1093+
const iterator = executeResult[Symbol.asyncIterator]();
1094+
1095+
const result1 = await iterator.next();
1096+
expectJSON(result1).toDeepEqual({
1097+
done: false,
1098+
value: {
1099+
data: {
1100+
asyncIterableListNoReturn: [
1101+
{
1102+
id: '1',
1103+
name: 'Luke',
1104+
},
1105+
],
1106+
},
1107+
hasNext: true,
1108+
},
1109+
});
1110+
1111+
const returnPromise = iterator.return();
1112+
1113+
// this result had started processing before return was called
1114+
const result2 = await iterator.next();
1115+
expectJSON(result2).toDeepEqual({
1116+
done: false,
1117+
value: {
1118+
data: {
1119+
id: '2',
1120+
name: 'Han',
1121+
},
1122+
hasNext: true,
1123+
path: ['asyncIterableListNoReturn', 1],
1124+
},
1125+
});
1126+
1127+
// third result is not returned because async iterator has returned
1128+
const result3 = await iterator.next();
1129+
expectJSON(result3).toDeepEqual({
1130+
done: false,
1131+
value: {
1132+
hasNext: false,
1133+
},
1134+
});
1135+
await returnPromise;
1136+
});
1137+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1138+
const document = parse(`
1139+
query {
1140+
asyncIterableListDelayed @stream(initialCount: 1) {
1141+
name
1142+
id
1143+
}
1144+
}
1145+
`);
1146+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1147+
1148+
const executeResult = await execute({ schema, document, rootValue: {} });
1149+
invariant(isAsyncIterable(executeResult));
1150+
const iterator = executeResult[Symbol.asyncIterator]();
1151+
1152+
const result1 = await iterator.next();
1153+
expectJSON(result1).toDeepEqual({
1154+
done: false,
1155+
value: {
1156+
data: {
1157+
asyncIterableListDelayed: [
1158+
{
1159+
id: '1',
1160+
name: 'Luke',
1161+
},
1162+
],
1163+
},
1164+
hasNext: true,
1165+
},
1166+
});
1167+
1168+
const throwPromise = iterator.throw(new Error('bad'));
1169+
1170+
// this result had started processing before return was called
1171+
const result2 = await iterator.next();
1172+
expectJSON(result2).toDeepEqual({
1173+
done: false,
1174+
value: {
1175+
data: {
1176+
id: '2',
1177+
name: 'Han',
1178+
},
1179+
hasNext: true,
1180+
path: ['asyncIterableListDelayed', 1],
1181+
},
1182+
});
1183+
1184+
// third result is not returned because async iterator has returned
1185+
const result3 = await iterator.next();
1186+
expectJSON(result3).toDeepEqual({
1187+
done: false,
1188+
value: {
1189+
hasNext: false,
1190+
},
1191+
});
1192+
try {
1193+
await throwPromise; /* c8 ignore start */
1194+
// Not reachable, always throws
1195+
/* c8 ignore stop */
1196+
} catch (e) {
1197+
// ignore error
1198+
}
1199+
});
9911200
});

src/execution/execute.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,10 +1411,14 @@ interface DispatcherResult {
14111411
export class Dispatcher {
14121412
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
14131413
_initialResult?: ExecutionResult;
1414+
_iterators: Array<AsyncIterator<unknown>>;
1415+
_isDone: boolean;
14141416
_hasReturnedInitialResult: boolean;
14151417

14161418
constructor() {
14171419
this._subsequentPayloads = [];
1420+
this._iterators = [];
1421+
this._isDone = false;
14181422
this._hasReturnedInitialResult = false;
14191423
}
14201424

@@ -1538,6 +1542,8 @@ export class Dispatcher {
15381542
parentContext?: AsyncPayloadContext,
15391543
): void {
15401544
const subsequentPayloads = this._subsequentPayloads;
1545+
const iterators = this._iterators;
1546+
iterators.push(iterator);
15411547
function next(index: number) {
15421548
const fieldPath = addPath(path, index, undefined);
15431549
const asyncPayloadContext = new AsyncPayloadContext({
@@ -1549,6 +1555,7 @@ export class Dispatcher {
15491555
.then(
15501556
({ value: data, done }) => {
15511557
if (done) {
1558+
iterators.splice(iterators.indexOf(iterator), 1);
15521559
return { value: undefined, done: true };
15531560
}
15541561

@@ -1625,7 +1632,7 @@ export class Dispatcher {
16251632
}
16261633

16271634
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
1628-
if (this._subsequentPayloads.length === 0) {
1635+
if (this._subsequentPayloads.length === 0 || this._isDone) {
16291636
// async iterable resolver just finished and no more pending payloads
16301637
return Promise.resolve({
16311638
value: {
@@ -1686,6 +1693,20 @@ export class Dispatcher {
16861693
return this._race();
16871694
}
16881695

1696+
async _return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1697+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1698+
this._isDone = true;
1699+
return { value: undefined, done: true };
1700+
}
1701+
1702+
async _throw(
1703+
error?: unknown,
1704+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1705+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1706+
this._isDone = true;
1707+
return Promise.reject(error);
1708+
}
1709+
16891710
get(
16901711
initialResult: ExecutionResult,
16911712
): AsyncGenerator<AsyncExecutionResult, void, void> {
@@ -1695,13 +1716,8 @@ export class Dispatcher {
16951716
return this;
16961717
},
16971718
next: () => this._next(),
1698-
// TODO: implement return & throw
1699-
return: /* c8 ignore next: will be covered in follow up */ () =>
1700-
Promise.resolve({ value: undefined, done: true }),
1701-
1702-
throw: /* c8 ignore next: will be covered in follow up */ (
1703-
error?: unknown,
1704-
) => Promise.reject(error),
1719+
return: () => this._return(),
1720+
throw: (error?: unknown) => this._throw(error),
17051721
};
17061722
}
17071723
}

0 commit comments

Comments
 (0)