Skip to content

Commit a867bbc

Browse files
fix: forward operation directives to the subschema requests (#6420)
* fix: forward operation directives to the subschema requests * chore(dependencies): updated changesets for modified dependencies --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent b5849c4 commit a867bbc

File tree

12 files changed

+153
-23
lines changed

12 files changed

+153
-23
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@graphql-tools/delegate": patch
3+
---
4+
dependencies updates:
5+
- Added dependency [`@repeaterjs/repeater@^3.0.6` ↗︎](https://www.npmjs.com/package/@repeaterjs/repeater/v/3.0.6) (to `dependencies`)

.changeset/good-parents-push.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'@graphql-tools/delegate': patch
3+
---
4+
5+
Pass operation directives correctly to the subschema;
6+
```graphql
7+
query {
8+
hello @someDir
9+
}
10+
```

.changeset/nice-parrots-kick.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@graphql-tools/executor': patch
3+
'@graphql-tools/utils': patch
4+
---
5+
6+
`mapAsyncIterator` now accepts `AsyncIterable`

packages/delegate/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
"@graphql-tools/executor": "^1.3.0",
5555
"@graphql-tools/schema": "^10.0.4",
5656
"@graphql-tools/utils": "^10.2.3",
57+
"@repeaterjs/repeater": "^3.0.6",
5758
"dataloader": "^2.2.2",
5859
"tslib": "^2.5.0"
5960
},

packages/delegate/src/createRequest.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ export function createRequest({
107107
newVariables,
108108
);
109109
}
110-
111-
const rootFieldName = targetFieldName ?? fieldNodes?.[0]?.name.value;
110+
const fieldNode = fieldNodes?.[0];
111+
const rootFieldName = targetFieldName ?? fieldNode?.name.value;
112112

113113
if (rootFieldName === undefined) {
114114
throw new Error(`Either "targetFieldName" or a non empty "fieldNodes" array must be provided.`);
@@ -122,6 +122,7 @@ export function createRequest({
122122
value: rootFieldName,
123123
},
124124
selectionSet: newSelectionSet,
125+
directives: fieldNode?.directives,
125126
};
126127

127128
const operationName: NameNode | undefined = targetOperationName

packages/delegate/src/delegateToSchema.ts

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
FieldDefinitionNode,
44
GraphQLOutputType,
55
GraphQLSchema,
6+
isListType,
67
OperationTypeNode,
78
validate,
89
} from 'graphql';
@@ -21,6 +22,7 @@ import {
2122
MaybeAsyncIterable,
2223
memoize1,
2324
} from '@graphql-tools/utils';
25+
import { Repeater } from '@repeaterjs/repeater';
2426
import { applySchemaTransforms } from './applySchemaTransforms.js';
2527
import { createRequest, getDelegatingOperation } from './createRequest.js';
2628
import { Subschema } from './Subschema.js';
@@ -105,9 +107,44 @@ export function delegateRequest<
105107

106108
function handleExecutorResult(executorResult: MaybeAsyncIterable<ExecutionResult<any>>) {
107109
if (isAsyncIterable(executorResult)) {
108-
const iterator = executorResult[Symbol.asyncIterator]();
109-
// "subscribe" to the subscription result and map the result through the transforms
110-
return mapAsyncIterator(iterator, result => transformer.transformResult(result));
110+
// This might be a stream
111+
if (delegationContext.operation === 'query' && isListType(delegationContext.returnType)) {
112+
return new Repeater<ExecutionResult<any>>(async (push, stop) => {
113+
const pushed = new WeakSet();
114+
let stopped = false;
115+
stop.finally(() => {
116+
stopped = true;
117+
});
118+
try {
119+
for await (const result of executorResult) {
120+
if (stopped) {
121+
break;
122+
}
123+
const transformedResult = await transformer.transformResult(result);
124+
// @stream needs to get the results one by one
125+
if (Array.isArray(transformedResult)) {
126+
for (const individualResult$ of transformedResult) {
127+
if (stopped) {
128+
break;
129+
}
130+
const individualResult = await individualResult$;
131+
// Avoid pushing the same result multiple times
132+
if (!pushed.has(individualResult)) {
133+
pushed.add(individualResult);
134+
await push(individualResult);
135+
}
136+
}
137+
} else {
138+
await push(await transformedResult);
139+
}
140+
}
141+
stop();
142+
} catch (error) {
143+
stop(error);
144+
}
145+
});
146+
}
147+
return mapAsyncIterator(executorResult, result => transformer.transformResult(result));
111148
}
112149
return transformer.transformResult(executorResult);
113150
}

packages/executor/src/execution/execute.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1641,7 +1641,7 @@ function mapSourceToResponse(
16411641
// "ExecuteQuery" algorithm, for which `execute` is also used.
16421642
return flattenAsyncIterable(
16431643
mapAsyncIterator(
1644-
resultOrStream[Symbol.asyncIterator](),
1644+
resultOrStream,
16451645
async (payload: unknown) =>
16461646
ensureAsyncIterable(await executeImpl(buildPerEventExecutionContext(exeContext, payload))),
16471647
(error: Error) => {

packages/federation/test/__snapshots__/defer-stream.test.ts.snap

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ exports[`Defer/Stream streams: stream 1`] = `
291291
],
292292
},
293293
{
294-
"hasNext": false,
294+
"hasNext": true,
295295
"incremental": [
296296
{
297297
"items": [
@@ -317,5 +317,8 @@ exports[`Defer/Stream streams: stream 1`] = `
317317
},
318318
],
319319
},
320+
{
321+
"hasNext": false,
322+
},
320323
]
321324
`;

packages/federation/test/defer-stream.test.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import { inspect } from 'util';
22
import { GraphQLSchema, parse, print } from 'graphql';
3+
import { createYoga } from 'graphql-yoga';
34
import _ from 'lodash';
45
import { IntrospectAndCompose, LocalGraphQLDataSource } from '@apollo/gateway';
56
import { buildSubgraphSchema } from '@apollo/subgraph';
6-
import { createDefaultExecutor } from '@graphql-tools/delegate';
77
import { normalizedExecutor } from '@graphql-tools/executor';
8+
import { buildHTTPExecutor } from '@graphql-tools/executor-http';
89
import { asArray, ExecutionResult, mergeDeep } from '@graphql-tools/utils';
10+
import { useDeferStream } from '@graphql-yoga/plugin-defer-stream';
911
import { assertAsyncIterable } from '../../loaders/url/tests/test-utils';
1012
import { getStitchedSchemaFromSupergraphSdl } from '../src/supergraph';
1113

@@ -98,6 +100,10 @@ describe('Defer/Stream', () => {
98100
},
99101
},
100102
});
103+
const usersServer = createYoga({
104+
schema: usersSubgraph,
105+
plugins: [useDeferStream()],
106+
});
101107
const postsSubgraph = buildSubgraphSchema({
102108
typeDefs: parse(/* GraphQL */ `
103109
type Query {
@@ -141,6 +147,10 @@ describe('Defer/Stream', () => {
141147
},
142148
},
143149
});
150+
const postsServer = createYoga({
151+
schema: postsSubgraph,
152+
plugins: [useDeferStream()],
153+
});
144154
let schema: GraphQLSchema;
145155
let finalResult: ExecutionResult;
146156
beforeAll(async () => {
@@ -163,7 +173,10 @@ describe('Defer/Stream', () => {
163173
onSubschemaConfig(subschemaConfig) {
164174
const subgraphName = subschemaConfig.name.toLowerCase();
165175
if (subgraphName === 'users') {
166-
const origExecutor = createDefaultExecutor(usersSubgraph);
176+
const origExecutor = buildHTTPExecutor({
177+
endpoint: 'http://localhost:4001/graphql',
178+
fetch: usersServer.fetch,
179+
});
167180
subschemaConfig.executor = async function usersExecutor(execReq) {
168181
const result = await origExecutor(execReq);
169182
if (process.env['DEBUG']) {
@@ -176,7 +189,10 @@ describe('Defer/Stream', () => {
176189
return result;
177190
};
178191
} else if (subgraphName === 'posts') {
179-
const origExecutor = createDefaultExecutor(postsSubgraph);
192+
const origExecutor = buildHTTPExecutor({
193+
endpoint: 'http://localhost:4002/graphql',
194+
fetch: postsServer.fetch,
195+
});
180196
subschemaConfig.executor = async function postsExecutor(execReq) {
181197
const result = await origExecutor(execReq);
182198
if (process.env['DEBUG']) {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { parse, print } from 'graphql';
2+
import { normalizedExecutor } from '@graphql-tools/executor';
3+
import { makeExecutableSchema } from '@graphql-tools/schema';
4+
import { stitchSchemas } from '../src/stitchSchemas';
5+
6+
describe('Operation Directives', () => {
7+
it('sends the directives to the subschema operations', async () => {
8+
const schema = makeExecutableSchema({
9+
typeDefs: /* GraphQL */ `
10+
directive @strExpr on FIELD
11+
type Query {
12+
hello: String
13+
}
14+
`,
15+
resolvers: {
16+
Query: {
17+
hello: (_source, _args, _context, info) => {
18+
return print(info.operation);
19+
},
20+
},
21+
},
22+
});
23+
24+
const gateway = stitchSchemas({
25+
subschemas: [schema],
26+
mergeDirectives: true,
27+
});
28+
29+
const res = await normalizedExecutor({
30+
schema: gateway,
31+
document: parse(/* GraphQL */ `
32+
query getHello {
33+
hello @strExpr
34+
}
35+
`),
36+
});
37+
38+
expect(res).toEqual({
39+
data: {
40+
hello: /* GraphQL */ `
41+
query getHello {
42+
__typename
43+
hello @strExpr
44+
}`.trim(),
45+
},
46+
});
47+
});
48+
});

0 commit comments

Comments
 (0)