-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpubsub.ts
More file actions
147 lines (126 loc) · 4.4 KB
/
pubsub.ts
File metadata and controls
147 lines (126 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/**
* pubsub.ts — Kafka-to-AsyncIterator bridge for Apollo Server subscriptions.
*
* This module creates an EventEmitter-based PubSub that is fed by Kafka consumers.
* When a subscription resolver calls asyncIterator('NOTIFICATION_CREATED'),
* it receives events consumed from the Kafka `notification-events` topic.
*
* Why EventEmitter and not graphql-subscriptions PubSub?
* - The graphql-subscriptions PubSub is in-memory only and not recommended for production.
* - We need Kafka as the event source for multi-pod distribution.
* - This bridge consumes from Kafka → emits to local EventEmitter → AsyncIterator.
*/
import { EventEmitter } from "events";
import {
createConsumer,
TOPIC_NOTIFICATIONS,
TOPIC_SYSTEM_ALERTS,
} from "./kafka.js";
import type { Consumer } from "kafkajs";
// ── Local event bus ──
// Kafka messages are re-emitted on this in-process emitter. The listener limit
// is raised to 100 because each subscription adds a listener of its own.
const emitter = new EventEmitter().setMaxListeners(100);

// ── Event names (used by subscription resolvers) ──
export const NOTIFICATION_CREATED = "NOTIFICATION_CREATED";
export const SYSTEM_ALERT = "SYSTEM_ALERT";

// Kafka consumer handle; stays null until startKafkaBridge() assigns it.
let consumer: Consumer | null = null;
/**
 * Start consuming from Kafka topics and bridge events to the local EventEmitter.
 * Called once at server startup.
 *
 * Each message with a value is JSON-parsed and re-emitted locally:
 * - TOPIC_NOTIFICATIONS  → NOTIFICATION_CREATED
 * - TOPIC_SYSTEM_ALERTS  → SYSTEM_ALERT
 *
 * Messages without a value, with invalid JSON, or whose JSON is `null` are
 * skipped. Parse failures and subscriber failures are logged separately and
 * never propagate out of the message handler.
 *
 * @returns A promise that resolves once the consumer is connected, subscribed
 *          and `consumer.run` has returned.
 * @throws Whatever `connect`, `subscribe` or `run` reject with.
 */
export async function startKafkaBridge(): Promise<void> {
  const kafkaConsumer = createConsumer("notifications-subgraph-group");
  consumer = kafkaConsumer;

  await kafkaConsumer.connect();
  console.log("✅ Kafka consumer connected");

  await kafkaConsumer.subscribe({
    topics: [TOPIC_NOTIFICATIONS, TOPIC_SYSTEM_ALERTS],
    fromBeginning: false,
  });

  await kafkaConsumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      if (!message.value) return;

      // Step 1: decode. Only genuine JSON errors are reported as parse failures.
      let payload: unknown;
      try {
        payload = JSON.parse(message.value.toString());
      } catch (err) {
        console.error("❌ Failed to parse Kafka message:", err);
        return;
      }

      // A literal JSON `null` carries no event; reading `.id` on it would throw.
      if (payload === null) {
        console.error(
          `❌ Ignoring Kafka [${topic}] message with null payload (offset=${message.offset})`
        );
        return;
      }

      console.log(
        `📨 Kafka [${topic}] partition=${partition} offset=${message.offset}:`,
        (payload as { id?: unknown }).id
      );

      // Step 2: dispatch. EventEmitter.emit runs listeners synchronously, so a
      // throwing listener (e.g. a subscription filter) surfaces here; report it
      // as a subscriber failure rather than as a parse error.
      try {
        if (topic === TOPIC_NOTIFICATIONS) {
          emitter.emit(NOTIFICATION_CREATED, payload);
        } else if (topic === TOPIC_SYSTEM_ALERTS) {
          emitter.emit(SYSTEM_ALERT, payload);
        }
      } catch (err) {
        console.error(
          `❌ Subscriber failed while handling Kafka [${topic}] message:`,
          err
        );
      }
    },
  });

  console.log(
    `✅ Kafka bridge started — listening on [${TOPIC_NOTIFICATIONS}, ${TOPIC_SYSTEM_ALERTS}]`
  );
}
/**
 * Creates an AsyncIterableIterator that yields events for a given event name.
 * Used by subscription resolvers to return an async stream.
 *
 * A listener is registered on the local emitter immediately. Events that
 * arrive while no `next()` call is pending are buffered (the buffer is
 * unbounded) and handed out in arrival order; pending `next()` calls are
 * resolved in the order they were made.
 *
 * Both `return()` and `throw()` terminate the stream: the listener is removed,
 * buffered events are discarded, and every pending `next()` resolves with
 * `{ done: true }` so no consumer is left waiting.
 *
 * @param eventName - The event name to listen for (e.g., NOTIFICATION_CREATED)
 * @param filterFn - Optional filter predicate applied to each event; events
 *                   for which it returns false are dropped for this iterator
 * @returns An async iterator over the matching event payloads
 */
export function asyncIterator<T>(
  eventName: string,
  filterFn?: (payload: T) => boolean
): AsyncIterableIterator<T> {
  // Resolvers of next() calls that are waiting for an event.
  const pullQueue: Array<(value: IteratorResult<T>) => void> = [];
  // Events received while nobody was waiting.
  const pushQueue: T[] = [];
  let done = false;

  const finished = (): IteratorResult<T> => ({ value: undefined, done: true });

  const pushValue = (payload: T): void => {
    // Apply filter if provided
    if (filterFn && !filterFn(payload)) return;

    const resolve = pullQueue.shift();
    if (resolve) {
      // A consumer is waiting — resolve immediately
      resolve({ value: payload, done: false });
    } else {
      // Buffer the event until someone pulls
      pushQueue.push(payload);
    }
  };

  // Shared teardown for return() and throw(). Safe to call more than once.
  const close = (): void => {
    done = true;
    emitter.removeListener(eventName, pushValue);
    // Release any consumers still awaiting next(); otherwise they never settle.
    for (const resolve of pullQueue) {
      resolve(finished());
    }
    pullQueue.length = 0;
    pushQueue.length = 0;
  };

  emitter.on(eventName, pushValue);

  return {
    next(): Promise<IteratorResult<T>> {
      if (done) return Promise.resolve(finished());

      if (pushQueue.length > 0) {
        const value = pushQueue.shift() as T;
        return Promise.resolve({ value, done: false });
      }

      return new Promise((resolve) => pullQueue.push(resolve));
    },

    return(): Promise<IteratorResult<T>> {
      close();
      return Promise.resolve(finished());
    },

    throw(err: Error): Promise<IteratorResult<T>> {
      // Tear down exactly like return(), then surface the error to the caller.
      close();
      return Promise.reject(err);
    },

    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
/**
 * Stop the Kafka consumer. Called during graceful shutdown.
 *
 * Does nothing if no consumer is currently held. The module-level handle is
 * cleared before disconnecting, so a repeated or overlapping shutdown call
 * does not disconnect the same consumer twice. If the disconnect fails, the
 * handle is restored so a later call can retry.
 *
 * @throws Whatever `consumer.disconnect()` rejects with.
 */
export async function stopKafkaBridge(): Promise<void> {
  const active = consumer;
  if (!active) return;

  // Claim the handle up front: a second call made while the disconnect is
  // still in flight sees null and returns immediately.
  consumer = null;

  try {
    await active.disconnect();
  } catch (err) {
    // Restore the handle unless a new consumer was assigned in the meantime.
    if (consumer === null) consumer = active;
    throw err;
  }

  console.log("🔌 Kafka consumer disconnected");
}