Skip to content

Commit d582cbe

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent 8cc1503 commit d582cbe

File tree

2 files changed

+223
-8
lines changed

2 files changed

+223
-8
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, it } from 'mocha';
22

3+
import { invariant } from '../../jsutils/invariant';
34
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
45
import { parse } from '../../language/parser';
56

@@ -101,6 +102,36 @@ const query = new GraphQLObjectType({
101102
yield await Promise.resolve({});
102103
},
103104
},
105+
asyncIterableListDelayed: {
106+
type: new GraphQLList(friendType),
107+
async *resolve() {
108+
for (const friend of friends) {
109+
// pause an additional ms before yielding to allow time
110+
// for tests to return or throw before next value is processed.
111+
// eslint-disable-next-line no-await-in-loop
112+
await new Promise((r) => setTimeout(r, 1));
113+
yield friend;
114+
}
115+
},
116+
},
117+
asyncIterableListNoReturn: {
118+
type: new GraphQLList(friendType),
119+
resolve() {
120+
let i = 0;
121+
return {
122+
[Symbol.asyncIterator]: () => ({
123+
async next() {
124+
const friend = friends[i++];
125+
if (friend) {
126+
await new Promise((r) => setTimeout(r, 1));
127+
return { value: friend, done: false };
128+
}
129+
return { value: undefined, done: true };
130+
},
131+
}),
132+
};
133+
},
134+
},
104135
asyncIterableListDelayedClose: {
105136
type: new GraphQLList(friendType),
106137
async *resolve() {
@@ -1011,4 +1042,172 @@ describe('Execute: stream directive', () => {
10111042
},
10121043
]);
10131044
});
1045+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1046+
const document = parse(`
1047+
query {
1048+
asyncIterableListDelayed @stream(initialCount: 1) {
1049+
name
1050+
id
1051+
}
1052+
}
1053+
`);
1054+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1055+
1056+
const executeResult = await execute({ schema, document, rootValue: {} });
1057+
invariant(isAsyncIterable(executeResult));
1058+
const iterator = executeResult[Symbol.asyncIterator]();
1059+
1060+
const result1 = await iterator.next();
1061+
expectJSON(result1).toDeepEqual({
1062+
done: false,
1063+
value: {
1064+
data: {
1065+
asyncIterableListDelayed: [
1066+
{
1067+
id: '1',
1068+
name: 'Luke',
1069+
},
1070+
],
1071+
},
1072+
hasNext: true,
1073+
},
1074+
});
1075+
1076+
iterator.return?.();
1077+
1078+
// this result had started processing before return was called
1079+
const result2 = await iterator.next();
1080+
expectJSON(result2).toDeepEqual({
1081+
done: false,
1082+
value: {
1083+
data: {
1084+
id: '2',
1085+
name: 'Han',
1086+
},
1087+
hasNext: true,
1088+
path: ['asyncIterableListDelayed', 1],
1089+
},
1090+
});
1091+
1092+
// third result is not returned because async iterator has returned
1093+
const result3 = await iterator.next();
1094+
expectJSON(result3).toDeepEqual({
1095+
done: false,
1096+
value: {
1097+
hasNext: false,
1098+
},
1099+
});
1100+
});
1101+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1102+
const document = parse(`
1103+
query {
1104+
asyncIterableListNoReturn @stream(initialCount: 1) {
1105+
name
1106+
id
1107+
}
1108+
}
1109+
`);
1110+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1111+
1112+
const executeResult = await execute({ schema, document, rootValue: {} });
1113+
invariant(isAsyncIterable(executeResult));
1114+
const iterator = executeResult[Symbol.asyncIterator]();
1115+
1116+
const result1 = await iterator.next();
1117+
expectJSON(result1).toDeepEqual({
1118+
done: false,
1119+
value: {
1120+
data: {
1121+
asyncIterableListNoReturn: [
1122+
{
1123+
id: '1',
1124+
name: 'Luke',
1125+
},
1126+
],
1127+
},
1128+
hasNext: true,
1129+
},
1130+
});
1131+
1132+
iterator.return?.();
1133+
1134+
// this result had started processing before return was called
1135+
const result2 = await iterator.next();
1136+
expectJSON(result2).toDeepEqual({
1137+
done: false,
1138+
value: {
1139+
data: {
1140+
id: '2',
1141+
name: 'Han',
1142+
},
1143+
hasNext: true,
1144+
path: ['asyncIterableListNoReturn', 1],
1145+
},
1146+
});
1147+
1148+
// third result is not returned because async iterator has returned
1149+
const result3 = await iterator.next();
1150+
expectJSON(result3).toDeepEqual({
1151+
done: false,
1152+
value: {
1153+
hasNext: false,
1154+
},
1155+
});
1156+
});
1157+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1158+
const document = parse(`
1159+
query {
1160+
asyncIterableListDelayed @stream(initialCount: 1) {
1161+
name
1162+
id
1163+
}
1164+
}
1165+
`);
1166+
const schema = new GraphQLSchema({ query, enableDeferStream: true });
1167+
1168+
const executeResult = await execute({ schema, document, rootValue: {} });
1169+
invariant(isAsyncIterable(executeResult));
1170+
const iterator = executeResult[Symbol.asyncIterator]();
1171+
1172+
const result1 = await iterator.next();
1173+
expectJSON(result1).toDeepEqual({
1174+
done: false,
1175+
value: {
1176+
data: {
1177+
asyncIterableListDelayed: [
1178+
{
1179+
id: '1',
1180+
name: 'Luke',
1181+
},
1182+
],
1183+
},
1184+
hasNext: true,
1185+
},
1186+
});
1187+
1188+
iterator.throw?.(new Error('bad'));
1189+
1190+
// this result had started processing before return was called
1191+
const result2 = await iterator.next();
1192+
expectJSON(result2).toDeepEqual({
1193+
done: false,
1194+
value: {
1195+
data: {
1196+
id: '2',
1197+
name: 'Han',
1198+
},
1199+
hasNext: true,
1200+
path: ['asyncIterableListDelayed', 1],
1201+
},
1202+
});
1203+
1204+
// third result is not returned because async iterator has returned
1205+
const result3 = await iterator.next();
1206+
expectJSON(result3).toDeepEqual({
1207+
done: false,
1208+
value: {
1209+
hasNext: false,
1210+
},
1211+
});
1212+
});
10141213
});

src/execution/execute.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,10 +1401,14 @@ interface DispatcherResult {
14011401
export class Dispatcher {
14021402
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
14031403
_initialResult?: ExecutionResult;
1404+
_iterators: Array<AsyncIterator<unknown>>;
1405+
_isDone: boolean;
14041406
_hasReturnedInitialResult: boolean;
14051407

14061408
constructor() {
14071409
this._subsequentPayloads = [];
1410+
this._iterators = [];
1411+
this._isDone = false;
14081412
this._hasReturnedInitialResult = false;
14091413
}
14101414

@@ -1528,6 +1532,8 @@ export class Dispatcher {
15281532
parentContext?: AsyncPayloadContext,
15291533
): void {
15301534
const subsequentPayloads = this._subsequentPayloads;
1535+
const iterators = this._iterators;
1536+
iterators.push(iterator);
15311537
function next(index: number) {
15321538
const fieldPath = addPath(path, index, undefined);
15331539
const asyncPayloadContext = new AsyncPayloadContext({
@@ -1539,6 +1545,7 @@ export class Dispatcher {
15391545
.then(
15401546
({ value: data, done }) => {
15411547
if (done) {
1548+
iterators.splice(iterators.indexOf(iterator), 1);
15421549
return { value: undefined, done: true };
15431550
}
15441551

@@ -1615,7 +1622,7 @@ export class Dispatcher {
16151622
}
16161623

16171624
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
1618-
if (this._subsequentPayloads.length === 0) {
1625+
if (this._subsequentPayloads.length === 0 || this._isDone) {
16191626
// async iterable resolver just finished and no more pending payloads
16201627
return Promise.resolve({
16211628
value: {
@@ -1676,6 +1683,20 @@ export class Dispatcher {
16761683
return this._race();
16771684
}
16781685

1686+
async _return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1687+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1688+
this._isDone = true;
1689+
return { value: undefined, done: true };
1690+
}
1691+
1692+
async _throw(
1693+
error?: unknown,
1694+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1695+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1696+
this._isDone = true;
1697+
return Promise.reject(error);
1698+
}
1699+
16791700
get(
16801701
initialResult: ExecutionResult,
16811702
): AsyncGenerator<AsyncExecutionResult, void, void> {
@@ -1685,13 +1706,8 @@ export class Dispatcher {
16851706
return this;
16861707
},
16871708
next: () => this._next(),
1688-
// TODO: implement return & throw
1689-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1690-
Promise.resolve({ value: undefined, done: true }),
1691-
1692-
throw: /* istanbul ignore next: will be covered in follow up */ (
1693-
error?: unknown,
1694-
) => Promise.reject(error),
1709+
return: () => this._return(),
1710+
throw: (error?: unknown) => this._throw(error),
16951711
};
16961712
}
16971713
}

0 commit comments

Comments
 (0)