Skip to content

Commit 4087109

Browse files
feat(cache-redis): add OTEL traces (#8830)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent f19de07 commit 4087109

File tree

8 files changed

+362
-128
lines changed

8 files changed

+362
-128
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@graphql-mesh/cache-redis": patch
3+
---
4+
dependencies updates:
5+
- Added dependency [`@opentelemetry/api@^1.9.0` ↗︎](https://www.npmjs.com/package/@opentelemetry/api/v/1.9.0) (to `dependencies`)

.changeset/shiny-paths-flow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@graphql-mesh/cache-redis': minor
3+
---
4+
5+
Add support of OTEL tracing with spans for Get, Set, Delete and initialisation.

packages/cache/redis/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"@graphql-mesh/cross-helpers": "^0.4.10",
3939
"@graphql-mesh/string-interpolation": "0.5.9",
4040
"@graphql-mesh/types": "^0.104.14",
41+
"@opentelemetry/api": "^1.9.0",
4142
"@whatwg-node/disposablestack": "^0.0.6",
4243
"ioredis": "^5.3.2",
4344
"ioredis-mock": "^8.13.1",

packages/cache/redis/src/index.ts

Lines changed: 142 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
type MeshPubSub,
1212
type YamlConfig,
1313
} from '@graphql-mesh/types';
14+
import { trace, type Tracer } from '@opentelemetry/api';
1415
import { DisposableSymbols } from '@whatwg-node/disposablestack';
1516

1617
function interpolateStrWithEnv(str: string): string {
@@ -19,113 +20,123 @@ function interpolateStrWithEnv(str: string): string {
1920

2021
export default class RedisCache<V = string> implements KeyValueCache<V>, Disposable {
2122
private client: Redis | Cluster;
23+
private tracer: Tracer;
2224

2325
constructor(
2426
options: YamlConfig.Cache['redis'] & { pubsub?: MeshPubSub | HivePubSub; logger: Logger },
2527
) {
26-
const lazyConnect = options.lazyConnect !== false;
27-
if ('startupNodes' in options) {
28-
const parsedUsername =
29-
interpolateStrWithEnv(options.username?.toString()) || process.env.REDIS_USERNAME;
30-
const parsedPassword =
31-
interpolateStrWithEnv(options.password?.toString()) || process.env.REDIS_PASSWORD;
32-
const parsedDb = interpolateStrWithEnv(options.db?.toString()) || process.env.REDIS_DB;
33-
const numDb = parseInt(parsedDb);
34-
this.client = new Redis.Cluster(
35-
options.startupNodes.map(s => ({
36-
host: s.host && interpolateStrWithEnv(s.host),
37-
port: s.port && parseInt(interpolateStrWithEnv(s.port)),
38-
family: s.family && parseInt(interpolateStrWithEnv(s.family)),
39-
})),
40-
{
41-
dnsLookup: options.dnsLookupAsIs
42-
? (address, callback) => callback(null, address)
43-
: undefined,
44-
redisOptions: {
45-
username: parsedUsername,
46-
password: parsedPassword,
47-
db: isNaN(numDb) ? undefined : numDb,
28+
this.tracer = trace.getTracer('hive.cache.redis');
29+
this.tracer.startActiveSpan('hive.cache.redis.init', span => {
30+
try {
31+
const lazyConnect = options.lazyConnect !== false;
32+
if ('startupNodes' in options) {
33+
const parsedUsername =
34+
interpolateStrWithEnv(options.username?.toString()) || process.env.REDIS_USERNAME;
35+
const parsedPassword =
36+
interpolateStrWithEnv(options.password?.toString()) || process.env.REDIS_PASSWORD;
37+
const parsedDb = interpolateStrWithEnv(options.db?.toString()) || process.env.REDIS_DB;
38+
const numDb = parseInt(parsedDb);
39+
this.client = new Redis.Cluster(
40+
options.startupNodes.map(s => ({
41+
host: s.host && interpolateStrWithEnv(s.host),
42+
port: s.port && parseInt(interpolateStrWithEnv(s.port)),
43+
family: s.family && parseInt(interpolateStrWithEnv(s.family)),
44+
})),
45+
{
46+
dnsLookup: options.dnsLookupAsIs
47+
? (address, callback) => callback(null, address)
48+
: undefined,
49+
redisOptions: {
50+
username: parsedUsername,
51+
password: parsedPassword,
52+
db: isNaN(numDb) ? undefined : numDb,
53+
enableAutoPipelining: true,
54+
...(lazyConnect ? { lazyConnect: true } : {}),
55+
tls: options.tls ? {} : undefined,
56+
},
57+
enableAutoPipelining: true,
58+
enableOfflineQueue: true,
59+
...(lazyConnect ? { lazyConnect: true } : {}),
60+
},
61+
);
62+
} else if ('sentinels' in options) {
63+
this.client = new Redis({
64+
name: options.name,
65+
sentinelPassword:
66+
options.sentinelPassword && interpolateStrWithEnv(options.sentinelPassword),
67+
sentinels: options.sentinels.map(s => ({
68+
host: s.host && interpolateStrWithEnv(s.host),
69+
port: s.port && parseInt(interpolateStrWithEnv(s.port)),
70+
family: s.family && parseInt(interpolateStrWithEnv(s.family)),
71+
})),
72+
role: options.role,
73+
enableTLSForSentinelMode: options.enableTLSForSentinelMode,
4874
enableAutoPipelining: true,
49-
...(lazyConnect ? { lazyConnect: true } : {}),
50-
tls: options.tls ? {} : undefined,
51-
},
52-
enableAutoPipelining: true,
53-
enableOfflineQueue: true,
54-
...(lazyConnect ? { lazyConnect: true } : {}),
55-
},
56-
);
57-
} else if ('sentinels' in options) {
58-
this.client = new Redis({
59-
name: options.name,
60-
sentinelPassword:
61-
options.sentinelPassword && interpolateStrWithEnv(options.sentinelPassword),
62-
sentinels: options.sentinels.map(s => ({
63-
host: s.host && interpolateStrWithEnv(s.host),
64-
port: s.port && parseInt(interpolateStrWithEnv(s.port)),
65-
family: s.family && parseInt(interpolateStrWithEnv(s.family)),
66-
})),
67-
role: options.role,
68-
enableTLSForSentinelMode: options.enableTLSForSentinelMode,
69-
enableAutoPipelining: true,
70-
enableOfflineQueue: true,
71-
lazyConnect,
72-
});
73-
} else if (options.url) {
74-
const redisUrl = new URL(interpolateStrWithEnv(options.url));
75+
enableOfflineQueue: true,
76+
lazyConnect,
77+
});
78+
} else if (options.url) {
79+
const redisUrl = new URL(interpolateStrWithEnv(options.url));
7580

76-
if (!['redis:', 'rediss:'].includes(redisUrl.protocol)) {
77-
throw new Error('Redis URL must use either redis:// or rediss://');
78-
}
81+
if (!['redis:', 'rediss:'].includes(redisUrl.protocol)) {
82+
throw new Error('Redis URL must use either redis:// or rediss://');
83+
}
7984

80-
if (lazyConnect) {
81-
redisUrl.searchParams.set('lazyConnect', 'true');
82-
}
85+
if (lazyConnect) {
86+
redisUrl.searchParams.set('lazyConnect', 'true');
87+
}
8388

84-
redisUrl.searchParams.set('enableAutoPipelining', 'true');
85-
redisUrl.searchParams.set('enableOfflineQueue', 'true');
86-
const IPV6_REGEX =
87-
/^(?:(?:[a-fA-F\d]{1,4}:){7}(?:[a-fA-F\d]{1,4}|:)|(?:[a-fA-F\d]{1,4}:){6}(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|:[a-fA-F\d]{1,4}|:)|(?:[a-fA-F\d]{1,4}:){5}(?::(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,2}|:)|(?:[a-fA-F\d]{1,4}:){4}(?:(?::[a-fA-F\d]{1,4}){0,1}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,3}|:)|(?:[a-fA-F\d]{1,4}:){3}(?:(?::[a-fA-F\d]{1,4}){0,2}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,4}|:)|(?:[a-fA-F\d]{1,4}:){2}(?:(?::[a-fA-F\d]{1,4}){0,3}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,5}|:)|(?:[a-fA-F\d]{1,4}:){1}(?:(?::[a-fA-F\d]{1,4}){0,4}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,6}|:)|(?::(?:(?::[a-fA-F\d]{1,4}){0,5}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,7}|:)))(?:%[0-9a-zA-Z]{1,})?$/gm;
88-
if (IPV6_REGEX.test(redisUrl.hostname)) {
89-
redisUrl.searchParams.set('family', '6');
90-
}
91-
const urlStr = redisUrl.toString();
92-
options.logger.debug(`Connecting to Redis at ${urlStr}`);
93-
this.client = new Redis(urlStr);
94-
} else {
95-
const parsedHost = interpolateStrWithEnv(options.host?.toString()) || process.env.REDIS_HOST;
96-
const parsedPort = interpolateStrWithEnv(options.port?.toString()) || process.env.REDIS_PORT;
97-
const parsedUsername =
98-
interpolateStrWithEnv(options.username?.toString()) || process.env.REDIS_USERNAME;
99-
const parsedPassword =
100-
interpolateStrWithEnv(options.password?.toString()) || process.env.REDIS_PASSWORD;
101-
const parsedDb = interpolateStrWithEnv(options.db?.toString()) || process.env.REDIS_DB;
102-
const parsedFamily =
103-
interpolateStrWithEnv(options.family?.toString()) || process.env.REDIS_FAMILY;
104-
const numPort = parseInt(parsedPort);
105-
const numDb = parseInt(parsedDb);
106-
if (parsedHost) {
107-
options.logger.debug(`Connecting to Redis at ${parsedHost}:${parsedPort}`);
108-
this.client = new Redis({
109-
host: parsedHost,
110-
port: isNaN(numPort) ? undefined : numPort,
111-
username: parsedUsername,
112-
password: parsedPassword,
113-
db: isNaN(numDb) ? undefined : numDb,
114-
family: parsedFamily === '6' ? 6 : undefined,
115-
...(lazyConnect ? { lazyConnect: true } : {}),
116-
enableAutoPipelining: true,
117-
enableOfflineQueue: true,
89+
redisUrl.searchParams.set('enableAutoPipelining', 'true');
90+
redisUrl.searchParams.set('enableOfflineQueue', 'true');
91+
const IPV6_REGEX =
92+
/^(?:(?:[a-fA-F\d]{1,4}:){7}(?:[a-fA-F\d]{1,4}|:)|(?:[a-fA-F\d]{1,4}:){6}(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|:[a-fA-F\d]{1,4}|:)|(?:[a-fA-F\d]{1,4}:){5}(?::(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,2}|:)|(?:[a-fA-F\d]{1,4}:){4}(?:(?::[a-fA-F\d]{1,4}){0,1}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,3}|:)|(?:[a-fA-F\d]{1,4}:){3}(?:(?::[a-fA-F\d]{1,4}){0,2}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,4}|:)|(?:[a-fA-F\d]{1,4}:){2}(?:(?::[a-fA-F\d]{1,4}){0,3}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,5}|:)|(?:[a-fA-F\d]{1,4}:){1}(?:(?::[a-fA-F\d]{1,4}){0,4}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,6}|:)|(?::(?:(?::[a-fA-F\d]{1,4}){0,5}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,7}|:)))(?:%[0-9a-zA-Z]{1,})?$/gm;
93+
if (IPV6_REGEX.test(redisUrl.hostname)) {
94+
redisUrl.searchParams.set('family', '6');
95+
}
96+
const urlStr = redisUrl.toString();
97+
safelyLogURL(options.logger, urlStr);
98+
this.client = new Redis(urlStr);
99+
} else {
100+
const parsedHost =
101+
interpolateStrWithEnv(options.host?.toString()) || process.env.REDIS_HOST;
102+
const parsedPort =
103+
interpolateStrWithEnv(options.port?.toString()) || process.env.REDIS_PORT;
104+
const parsedUsername =
105+
interpolateStrWithEnv(options.username?.toString()) || process.env.REDIS_USERNAME;
106+
const parsedPassword =
107+
interpolateStrWithEnv(options.password?.toString()) || process.env.REDIS_PASSWORD;
108+
const parsedDb = interpolateStrWithEnv(options.db?.toString()) || process.env.REDIS_DB;
109+
const parsedFamily =
110+
interpolateStrWithEnv(options.family?.toString()) || process.env.REDIS_FAMILY;
111+
const numPort = parseInt(parsedPort);
112+
const numDb = parseInt(parsedDb);
113+
if (parsedHost) {
114+
options.logger.debug(`Connecting to Redis at ${parsedHost}:${parsedPort}`);
115+
this.client = new Redis({
116+
host: parsedHost,
117+
port: isNaN(numPort) ? undefined : numPort,
118+
username: parsedUsername,
119+
password: parsedPassword,
120+
db: isNaN(numDb) ? undefined : numDb,
121+
family: parsedFamily === '6' ? 6 : undefined,
122+
...(lazyConnect ? { lazyConnect: true } : {}),
123+
enableAutoPipelining: true,
124+
enableOfflineQueue: true,
125+
});
126+
} else {
127+
options.logger.debug(`Connecting to Redis mock`);
128+
this.client = new RedisMock();
129+
}
130+
}
131+
const pubsub = toMeshPubSub(options.pubsub);
132+
// TODO: PubSub.destroy will no longer be needed after v0
133+
const id = pubsub?.subscribe('destroy', () => {
134+
this.client.disconnect(false);
135+
pubsub.unsubscribe(id);
118136
});
119-
} else {
120-
options.logger.debug(`Connecting to Redis mock`);
121-
this.client = new RedisMock();
137+
} finally {
138+
span.end();
122139
}
123-
}
124-
const pubsub = toMeshPubSub(options.pubsub);
125-
// TODO: PubSub.destroy will no longer be needed after v0
126-
const id = pubsub?.subscribe('destroy', () => {
127-
this.client.disconnect(false);
128-
pubsub.unsubscribe(id);
129140
});
130141
}
131142

@@ -134,26 +145,42 @@ export default class RedisCache<V = string> implements KeyValueCache<V>, Disposa
134145
}
135146

136147
set(key: string, value: V, options?: KeyValueCacheSetOptions): Promise<any> {
137-
const stringifiedValue = JSON.stringify(value);
138-
if (options?.ttl && options.ttl > 0) {
139-
return this.client.set(key, stringifiedValue, 'PX', options.ttl * 1000);
140-
} else {
141-
return this.client.set(key, stringifiedValue);
142-
}
148+
return this.tracer.startActiveSpan('hive.cache.set', async span => {
149+
try {
150+
const stringifiedValue = JSON.stringify(value);
151+
if (options?.ttl && options.ttl > 0) {
152+
return await this.client.set(key, stringifiedValue, 'PX', options.ttl * 1000);
153+
} else {
154+
return await this.client.set(key, stringifiedValue);
155+
}
156+
} finally {
157+
span.end();
158+
}
159+
});
143160
}
144161

145162
get(key: string): Promise<V | undefined> {
146-
return this.client.get(key).then(value => (value != null ? JSON.parse(value) : undefined));
163+
return this.tracer.startActiveSpan('hive.cache.get', span =>
164+
this.client
165+
.get(key)
166+
.then(value => (value != null ? JSON.parse(value) : undefined))
167+
.finally(() => span.end()),
168+
);
147169
}
148170

149171
getKeysByPrefix(prefix: string): Promise<string[]> {
150172
return scanPatterns(this.client, `${prefix}*`);
151173
}
152174

153175
delete(key: string): Promise<boolean> {
154-
return this.client.del(key).then(
155-
value => value > 0,
156-
() => false,
176+
return this.tracer.startActiveSpan('hive.cache.delete', span =>
177+
this.client
178+
.del(key)
179+
.then(
180+
value => value > 0,
181+
() => false,
182+
)
183+
.finally(() => span.end()),
157184
);
158185
}
159186
}
@@ -172,3 +199,11 @@ function scanPatterns(
172199
return scanPatterns(redis, pattern, nextCursor, keys);
173200
});
174201
}
202+
203+
function safelyLogURL(log: Logger, url: string): void {
204+
const logURL = new URL(url);
205+
if (logURL.password) {
206+
logURL.password = '*'.repeat(logURL.password.length);
207+
}
208+
log.debug(`Connecting to Redis at ${logURL}`);
209+
}

packages/transports/grpc/src/index.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,7 @@ import {
1515
type Transport,
1616
} from '@graphql-mesh/transport-common';
1717
import type { Logger } from '@graphql-mesh/types';
18-
import {
19-
getDirective,
20-
getDirectives,
21-
getRootTypes,
22-
mapMaybePromise,
23-
type MaybePromise,
24-
} from '@graphql-tools/utils';
18+
import { getDirective, getDirectives, getRootTypes, type MaybePromise } from '@graphql-tools/utils';
2519
import type { ChannelCredentials } from '@grpc/grpc-js';
2620
import { credentials, loadPackageDefinition } from '@grpc/grpc-js';
2721
import type { ServiceClient } from '@grpc/grpc-js/build/src/make-client.js';
@@ -333,7 +327,7 @@ export class GrpcTransportHelper extends DisposableStack {
333327
}
334328
}
335329

336-
export default {
330+
const transport: Transport<gRPCTransportOptions> = {
337331
getSubgraphExecutor({ transportEntry, subgraph, cwd, logger }) {
338332
const transport = new GrpcTransportHelper(
339333
transportEntry.subgraph,
@@ -350,7 +344,9 @@ export default {
350344
},
351345
);
352346
},
353-
} satisfies Transport<gRPCTransportOptions>;
347+
};
348+
349+
export default transport;
354350

355351
function identityFn<T>(obj: T): T {
356352
return obj;
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createDefaultExecutor, type Transport } from '@graphql-mesh/transport-common';
22
import { processDirectives } from '@omnigraph/odata';
33

4-
export default {
4+
const transport: Transport = {
55
getSubgraphExecutor({ subgraph, fetch }) {
66
return createDefaultExecutor(
77
processDirectives({
@@ -10,4 +10,6 @@ export default {
1010
}),
1111
);
1212
},
13-
} satisfies Transport;
13+
};
14+
15+
export default transport;

packages/transports/rest/src/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export interface RESTTransportOptions {
66
queryParams?: Record<string, string>;
77
}
88

9-
export default {
9+
const transport: Transport<RESTTransportOptions> = {
1010
getSubgraphExecutor({ transportEntry, subgraph, fetch, pubsub, logger }) {
1111
const processDirectiveOpts: ProcessDirectiveArgs = {
1212
globalFetch: fetch,
@@ -18,8 +18,9 @@ export default {
1818
const executor = createDefaultExecutor(processedSchema);
1919
return executor;
2020
},
21-
} satisfies Transport<RESTTransportOptions>;
21+
};
2222

23+
export default transport;
2324
export { processDirectives } from './directives/process.js';
2425
export type { ProcessDirectiveArgs } from './directives/process.js';
2526
export { processScalarType } from './directives/scalars.js';

0 commit comments

Comments
 (0)