Skip to content

Commit d3e59b4

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent 4fedf70 commit d3e59b4

File tree

2 files changed

+238
-7
lines changed

2 files changed

+238
-7
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { describe, it } from 'mocha';
22

33
import { expectJSON } from '../../__testUtils__/expectJSON';
44

5+
import { invariant } from '../../jsutils/invariant';
56
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
67

78
import type { DocumentNode } from '../../language/ast';
@@ -104,6 +105,37 @@ const query = new GraphQLObjectType({
104105
yield await Promise.resolve({});
105106
},
106107
},
108+
asyncIterableListDelayed: {
109+
type: new GraphQLList(friendType),
110+
async *resolve() {
111+
for (const friend of friends) {
112+
// pause an additional ms before yielding to allow time
113+
// for tests to return or throw before next value is processed.
114+
// eslint-disable-next-line no-await-in-loop
115+
await new Promise((r) => setTimeout(r, 1));
116+
yield friend; /* c8 ignore start */
117+
// Not reachable, early return
118+
}
119+
} /* c8 ignore stop */,
120+
},
121+
asyncIterableListNoReturn: {
122+
type: new GraphQLList(friendType),
123+
resolve() {
124+
let i = 0;
125+
return {
126+
[Symbol.asyncIterator]: () => ({
127+
async next() {
128+
const friend = friends[i++];
129+
if (friend) {
130+
await new Promise((r) => setTimeout(r, 1));
131+
return { value: friend, done: false };
132+
}
133+
return { value: undefined, done: true };
134+
},
135+
}),
136+
};
137+
},
138+
},
107139
asyncIterableListDelayedClose: {
108140
type: new GraphQLList(friendType),
109141
async *resolve() {
@@ -989,4 +1021,181 @@ describe('Execute: stream directive', () => {
9891021
},
9901022
]);
9911023
});
1024+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1025+
const document = parse(`
1026+
query {
1027+
asyncIterableListDelayed @stream(initialCount: 1) {
1028+
name
1029+
id
1030+
}
1031+
}
1032+
`);
1033+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1034+
1035+
const executeResult = await execute({ schema, document, rootValue: {} });
1036+
invariant(isAsyncIterable(executeResult));
1037+
const iterator = executeResult[Symbol.asyncIterator]();
1038+
1039+
const result1 = await iterator.next();
1040+
expectJSON(result1).toDeepEqual({
1041+
done: false,
1042+
value: {
1043+
data: {
1044+
asyncIterableListDelayed: [
1045+
{
1046+
id: '1',
1047+
name: 'Luke',
1048+
},
1049+
],
1050+
},
1051+
hasNext: true,
1052+
},
1053+
});
1054+
1055+
const returnPromise = iterator.return();
1056+
1057+
// this result had started processing before return was called
1058+
const result2 = await iterator.next();
1059+
expectJSON(result2).toDeepEqual({
1060+
done: false,
1061+
value: {
1062+
data: {
1063+
id: '2',
1064+
name: 'Han',
1065+
},
1066+
hasNext: true,
1067+
path: ['asyncIterableListDelayed', 1],
1068+
},
1069+
});
1070+
1071+
// third result is not returned because async iterator has returned
1072+
const result3 = await iterator.next();
1073+
expectJSON(result3).toDeepEqual({
1074+
done: false,
1075+
value: {
1076+
hasNext: false,
1077+
},
1078+
});
1079+
await returnPromise;
1080+
});
1081+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1082+
const document = parse(`
1083+
query {
1084+
asyncIterableListNoReturn @stream(initialCount: 1) {
1085+
name
1086+
id
1087+
}
1088+
}
1089+
`);
1090+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1091+
1092+
const executeResult = await execute({ schema, document, rootValue: {} });
1093+
invariant(isAsyncIterable(executeResult));
1094+
const iterator = executeResult[Symbol.asyncIterator]();
1095+
1096+
const result1 = await iterator.next();
1097+
expectJSON(result1).toDeepEqual({
1098+
done: false,
1099+
value: {
1100+
data: {
1101+
asyncIterableListNoReturn: [
1102+
{
1103+
id: '1',
1104+
name: 'Luke',
1105+
},
1106+
],
1107+
},
1108+
hasNext: true,
1109+
},
1110+
});
1111+
1112+
const returnPromise = iterator.return();
1113+
1114+
// this result had started processing before return was called
1115+
const result2 = await iterator.next();
1116+
expectJSON(result2).toDeepEqual({
1117+
done: false,
1118+
value: {
1119+
data: {
1120+
id: '2',
1121+
name: 'Han',
1122+
},
1123+
hasNext: true,
1124+
path: ['asyncIterableListNoReturn', 1],
1125+
},
1126+
});
1127+
1128+
// third result is not returned because async iterator has returned
1129+
const result3 = await iterator.next();
1130+
expectJSON(result3).toDeepEqual({
1131+
done: false,
1132+
value: {
1133+
hasNext: false,
1134+
},
1135+
});
1136+
await returnPromise;
1137+
});
1138+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1139+
const document = parse(`
1140+
query {
1141+
asyncIterableListDelayed @stream(initialCount: 1) {
1142+
name
1143+
id
1144+
}
1145+
}
1146+
`);
1147+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1148+
1149+
const executeResult = await execute({ schema, document, rootValue: {} });
1150+
invariant(isAsyncIterable(executeResult));
1151+
const iterator = executeResult[Symbol.asyncIterator]();
1152+
1153+
const result1 = await iterator.next();
1154+
expectJSON(result1).toDeepEqual({
1155+
done: false,
1156+
value: {
1157+
data: {
1158+
asyncIterableListDelayed: [
1159+
{
1160+
id: '1',
1161+
name: 'Luke',
1162+
},
1163+
],
1164+
},
1165+
hasNext: true,
1166+
},
1167+
});
1168+
1169+
const throwPromise = iterator.throw(new Error('bad'));
1170+
1171+
// this result had started processing before return was called
1172+
const result2 = await iterator.next();
1173+
expectJSON(result2).toDeepEqual({
1174+
done: false,
1175+
value: {
1176+
data: {
1177+
id: '2',
1178+
name: 'Han',
1179+
},
1180+
hasNext: true,
1181+
path: ['asyncIterableListDelayed', 1],
1182+
},
1183+
});
1184+
1185+
// third result is not returned because async iterator has returned
1186+
const result3 = await iterator.next();
1187+
expectJSON(result3).toDeepEqual({
1188+
done: false,
1189+
value: {
1190+
hasNext: false,
1191+
},
1192+
});
1193+
try {
1194+
await throwPromise; /* c8 ignore start */
1195+
// Not reachable, always throws
1196+
/* c8 ignore stop */
1197+
} catch (e) {
1198+
// ignore error
1199+
}
1200+
});
9921201
});

src/execution/execute.ts

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,6 +1492,7 @@ function executeStreamIterator(
14921492
const asyncPayloadRecord = new AsyncPayloadRecord({
14931493
label,
14941494
path: fieldPath,
1495+
iterator,
14951496
});
14961497
const dataPromise: Promise<unknown> = iterator
14971498
.next()
@@ -1564,9 +1565,10 @@ function yieldSubsequentPayloads(
15641565
initialResult: ExecutionResult,
15651566
): AsyncGenerator<AsyncExecutionResult, void, void> {
15661567
let _hasReturnedInitialResult = false;
1568+
let isDone = false;
15671569

15681570
function race(): Promise<IteratorResult<AsyncExecutionResult>> {
1569-
if (exeContext.subsequentPayloads.length === 0) {
1571+
if (exeContext.subsequentPayloads.length === 0 || isDone) {
15701572
// async iterable resolver just finished and no more pending payloads
15711573
return Promise.resolve({
15721574
value: {
@@ -1637,12 +1639,26 @@ function yieldSubsequentPayloads(
16371639
}
16381640
return race();
16391641
},
1640-
// TODO: implement return & throw
1641-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1642-
Promise.resolve({ value: undefined, done: true }),
1643-
throw: /* istanbul ignore next: will be covered in follow up */ (
1642+
async return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1643+
await Promise.all(
1644+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1645+
asyncPayloadRecord.iterator?.return?.(),
1646+
),
1647+
);
1648+
isDone = true;
1649+
return { value: undefined, done: true };
1650+
},
1651+
async throw(
16441652
error?: unknown,
1645-
) => Promise.reject(error),
1653+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1654+
await Promise.all(
1655+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1656+
asyncPayloadRecord.iterator?.return?.(),
1657+
),
1658+
);
1659+
isDone = true;
1660+
return Promise.reject(error);
1661+
},
16461662
};
16471663
}
16481664

@@ -1651,10 +1667,16 @@ class AsyncPayloadRecord {
16511667
label?: string;
16521668
path?: Path;
16531669
dataPromise?: Promise<unknown | null | undefined>;
1670+
iterator?: AsyncIterator<unknown>;
16541671
isCompletedIterator?: boolean;
1655-
constructor(opts: { label?: string; path?: Path }) {
1672+
constructor(opts: {
1673+
label?: string;
1674+
path?: Path;
1675+
iterator?: AsyncIterator<unknown>;
1676+
}) {
16561677
this.label = opts.label;
16571678
this.path = opts.path;
1679+
this.iterator = opts.iterator;
16581680
this.errors = [];
16591681
}
16601682

0 commit comments

Comments
 (0)