Skip to content

Commit cad63ba

Browse files
committed
Redesign WhatsApp bridge as a stdio event sidecar
Inbound messages, QR codes, and connection state now stream as NDJSON events on the sidecar's stdout, consumed directly from the process pipe — no inbound HTTP, no long-polling, no message queue to overflow. The only network listener left is a loopback POST /send for the stateless send_message tool. Modes renamed from self-chat/bot to self/all; adapter status() now answers from the cached event stream without a network round-trip.
1 parent f494bd5 commit cad63ba

4 files changed

Lines changed: 274 additions & 270 deletions

File tree

platform/coworker/connectors/experimental/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@
3636
"mode",
3737
"Mode",
3838
required=False,
39-
help="self-chat (default): the agent lives in your message-yourself thread. bot: a dedicated number.",
40-
placeholder="self-chat",
39+
help="self (default): the agent lives in your message-yourself thread. all: every chat on the account.",
40+
placeholder="self",
4141
),
4242
Field(
4343
"bridge_port",
@@ -59,7 +59,7 @@
5959
"Use a SECONDARY phone number. Never pair your primary personal account.",
6060
"Connect here, then start the super-agent: the bridge starts and shows a QR code in the connector status.",
6161
"Scan it from WhatsApp → Settings → Linked devices → Link a device.",
62-
"In self-chat mode, talk to the agent in your own message-yourself thread.",
62+
"In self mode, talk to the agent in your own message-yourself thread.",
6363
],
6464
risk_notice=(
6565
"This connector speaks the unofficial WhatsApp Web protocol (Baileys). That violates "

platform/coworker/connectors/experimental/whatsapp_bridge/bridge.js

Lines changed: 125 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,20 @@
22
/**
33
* OpenCoworker WhatsApp Personal bridge — EXPERIMENTAL, use at your own risk.
44
*
5-
* Connects to WhatsApp as a personal account over the unofficial WhatsApp Web protocol
6-
* (Baileys) and exposes a localhost-only HTTP API for the Python adapter. Architecture
7-
* modeled on the Hermes Agent bridge (MIT, (c) 2025 Nous Research), reimplemented with a
8-
* smaller surface and no Express dependency.
5+
* Speaks the unofficial WhatsApp Web protocol via Baileys and talks to the Python
6+
* adapter over a stdio event stream: every inbound message, QR refresh, and state
7+
* change is one NDJSON line on stdout, so receiving needs no HTTP surface, no
8+
* polling, and no message queue to overflow. The only network listener is a
9+
* single localhost POST /send endpoint, kept so the stateless send_message tool
10+
* can deliver replies without holding a handle to this process.
911
*
10-
* Endpoints:
11-
* GET /health -> { ok, status, me } status: starting|pairing|open|closed
12-
* GET /qr -> { qr } raw QR string while pairing, else null
13-
* GET /messages -> { messages: [...] } drains the inbound queue (long-poll ~25s)
14-
* POST /send -> { ok, messageId } body: { chatId, text }
12+
* stdout events (one JSON object per line):
13+
* {"event":"ready","port":N} HTTP send endpoint is up
14+
* {"event":"state","state":S,"me":J} S: pairing|open|closed|reconnecting; J: own number (digits only) or null; startup/reconnect failures carry "error" instead of "me"
15+
* {"event":"qr","qr":"..."} pairing QR (refreshes periodically)
16+
* {"event":"message","id","chat","sender","name","group","text","ts"}
1517
*
16-
* Usage: node bridge.js --port 3941 --session <dir> --mode self-chat|bot
18+
* Usage: node bridge.js --port 3941 --session <dir> --mode self|all
1719
*/
1820

1921
import http from "node:http";
@@ -25,172 +27,140 @@ import makeWASocket, {
2527
DisconnectReason,
2628
} from "@whiskeysockets/baileys";
2729

28-
const args = process.argv.slice(2);
29-
const getArg = (name, dflt) => {
30-
const i = args.indexOf(`--${name}`);
31-
return i !== -1 && args[i + 1] ? args[i + 1] : dflt;
30+
const opt = (flag, fallback) => {
31+
const i = process.argv.indexOf(`--${flag}`);
32+
return i > 0 && process.argv[i + 1] ? process.argv[i + 1] : fallback;
3233
};
3334

34-
const PORT = parseInt(getArg("port", "3941"), 10);
35-
const SESSION_DIR = getArg("session", "./wa-session");
36-
const MODE = getArg("mode", "self-chat"); // "self-chat": only your message-yourself thread
37-
const MAX_TEXT = 4096;
38-
const QUEUE_CAP = 500;
39-
const LONG_POLL_MS = 25000;
35+
const PORT = Number(opt("port", "3941"));
36+
const SESSION_DIR = opt("session", "./wa-session");
37+
const MODE = opt("mode", "self") === "all" ? "all" : "self";
38+
const TEXT_LIMIT = 4096;
4039

41-
mkdirSync(SESSION_DIR, { recursive: true });
40+
/**
 * Write one event to stdout as a single NDJSON line (JSON text plus "\n").
 *
 * @param {unknown} obj - Value to serialise with JSON.stringify.
 * @returns {boolean} Whatever process.stdout.write returns for the line.
 */
const emit = (obj) => {
  const line = `${JSON.stringify(obj)}\n`;
  return process.stdout.write(line);
};
41+
/**
 * Reduce a JID to the part before its first ":" or "@".
 *
 * Example: "15551234567:12@s.whatsapp.net" -> "15551234567".
 * A single split replaces the previous regex-replace followed by a redundant
 * split(":"), and no longer depends on "." matching the rest of the string
 * (which failed when a line terminator followed the separator).
 *
 * @param {unknown} jid - JID string; null, undefined, or any falsy value is treated as "".
 * @returns {string} Text before the first ":" or "@", or "" for falsy input.
 */
const digits = (jid) => String(jid || "").split(/[:@]/, 1)[0];
4242

43-
let sock = null;
44-
let status = "starting";
45-
let qrString = null;
46-
let meJid = null;
47-
const inbound = []; // queued message dicts for the Python adapter
48-
const sentByBridge = new Set(); // ids of our own sends, to drop echoes in self-chat mode
49-
let waiters = []; // pending long-poll resolvers
43+
mkdirSync(SESSION_DIR, { recursive: true });
5044

51-
const bareJid = (jid) => String(jid || "").split(":")[0].split("@")[0];
45+
class WhatsAppLink {
46+
constructor() {
47+
this.sock = null;
48+
this.me = null;
49+
this.ownSends = [];
50+
}
5251

53-
function pushInbound(msg) {
54-
inbound.push(msg);
55-
if (inbound.length > QUEUE_CAP) inbound.shift();
56-
for (const w of waiters.splice(0)) w();
57-
}
52+
rememberSend(id) {
53+
if (!id) return;
54+
this.ownSends.push(id);
55+
if (this.ownSends.length > 1000) this.ownSends.shift();
56+
}
5857

59-
function mapMessage(m) {
60-
const text =
61-
m.message?.conversation ||
62-
m.message?.extendedTextMessage?.text ||
63-
m.message?.imageMessage?.caption ||
64-
"";
65-
if (!text) return null;
66-
const chatId = m.key.remoteJid || "";
67-
if (chatId === "status@broadcast") return null;
68-
const fromMe = Boolean(m.key.fromMe);
69-
const selfChat = bareJid(chatId) === bareJid(meJid);
70-
if (MODE === "self-chat" && !selfChat) return null;
71-
// In self-chat the user's own messages are fromMe — keep them, but never our own sends.
72-
if (fromMe && (!selfChat || sentByBridge.has(m.key.id))) return null;
73-
return {
74-
id: m.key.id || "",
75-
chatId,
76-
senderId: bareJid(fromMe ? meJid : m.key.participant || chatId),
77-
senderName: m.pushName || "",
78-
isGroup: chatId.endsWith("@g.us"),
79-
text,
80-
timestamp: Number(m.messageTimestamp) || 0,
81-
};
82-
}
58+
textOf(m) {
59+
const c = m.message || {};
60+
return (
61+
c.conversation ||
62+
c.extendedTextMessage?.text ||
63+
c.imageMessage?.caption ||
64+
c.videoMessage?.caption ||
65+
""
66+
);
67+
}
8368

84-
async function startSocket() {
85-
const { state, saveCreds } = await useMultiFileAuthState(SESSION_DIR);
86-
const { version } = await fetchLatestBaileysVersion().catch(() => ({ version: undefined }));
87-
sock = makeWASocket({ auth: state, version, printQRInTerminal: false });
88-
89-
sock.ev.on("creds.update", saveCreds);
90-
sock.ev.on("connection.update", ({ connection, lastDisconnect, qr }) => {
91-
if (qr) {
92-
qrString = qr;
93-
status = "pairing";
94-
console.log("[bridge] pairing required — fetch GET /qr and scan it in WhatsApp");
95-
}
96-
if (connection === "open") {
97-
qrString = null;
98-
status = "open";
99-
meJid = sock.user?.id || null;
100-
console.log(`[bridge] connected as ${meJid}`);
101-
}
102-
if (connection === "close") {
103-
const code = lastDisconnect?.error?.output?.statusCode;
104-
if (code === DisconnectReason.loggedOut) {
105-
status = "closed";
106-
console.error("[bridge] logged out — delete the session dir and re-pair");
107-
} else {
108-
status = "starting";
109-
console.log(`[bridge] connection closed (code ${code}) — reconnecting`);
110-
setTimeout(() => startSocket().catch((e) => console.error("[bridge]", e)), 2000);
111-
}
112-
}
113-
});
114-
sock.ev.on("messages.upsert", ({ messages, type }) => {
115-
if (type !== "notify") return;
116-
for (const m of messages) {
117-
const mapped = mapMessage(m);
118-
if (mapped) pushInbound(mapped);
69+
// mode "self": only the message-yourself thread; user's own messages are the input,
70+
// but anything this bridge itself sent must never loop back in.
71+
shouldForward(m) {
72+
const chat = m.key.remoteJid || "";
73+
if (!chat || chat === "status@broadcast") return false;
74+
const isSelfThread = digits(chat) === digits(this.me);
75+
if (MODE === "self" && !isSelfThread) return false;
76+
if (m.key.fromMe) {
77+
return isSelfThread && !this.ownSends.includes(m.key.id);
11978
}
120-
});
121-
}
122-
123-
const json = (res, code, body) => {
124-
res.writeHead(code, { "content-type": "application/json" });
125-
res.end(JSON.stringify(body));
126-
};
79+
return true;
80+
}
12781

128-
const readBody = (req) =>
129-
new Promise((resolve, reject) => {
130-
let data = "";
131-
req.on("data", (c) => {
132-
data += c;
133-
if (data.length > 1e6) reject(new Error("body too large"));
82+
async open() {
83+
const { state, saveCreds } = await useMultiFileAuthState(SESSION_DIR);
84+
const { version } = await fetchLatestBaileysVersion().catch(() => ({}));
85+
this.sock = makeWASocket({ auth: state, version, printQRInTerminal: false });
86+
this.sock.ev.on("creds.update", saveCreds);
87+
88+
this.sock.ev.on("connection.update", (u) => {
89+
if (u.qr) emit({ event: "qr", qr: u.qr }), emit({ event: "state", state: "pairing", me: null });
90+
if (u.connection === "open") {
91+
this.me = this.sock.user?.id || null;
92+
emit({ event: "state", state: "open", me: digits(this.me) });
93+
}
94+
if (u.connection === "close") {
95+
const code = u.lastDisconnect?.error?.output?.statusCode;
96+
if (code === DisconnectReason.loggedOut) {
97+
emit({ event: "state", state: "closed", me: null });
98+
} else {
99+
emit({ event: "state", state: "reconnecting", me: null });
100+
setTimeout(() => this.open().catch((e) => emit({ event: "state", state: "closed", error: String(e) })), 2500);
101+
}
102+
}
134103
});
135-
req.on("end", () => resolve(data));
136-
req.on("error", reject);
137-
});
138104

139-
const server = http.createServer(async (req, res) => {
140-
const url = new URL(req.url, `http://127.0.0.1:${PORT}`);
141-
try {
142-
if (req.method === "GET" && url.pathname === "/health") {
143-
return json(res, 200, { ok: true, status, me: bareJid(meJid) || null });
144-
}
145-
if (req.method === "GET" && url.pathname === "/qr") {
146-
return json(res, 200, { qr: qrString });
147-
}
148-
if (req.method === "GET" && url.pathname === "/messages") {
149-
if (!inbound.length) {
150-
await new Promise((resolve) => {
151-
const t = setTimeout(resolve, LONG_POLL_MS);
152-
waiters.push(() => {
153-
clearTimeout(t);
154-
resolve();
155-
});
105+
this.sock.ev.on("messages.upsert", ({ messages, type }) => {
106+
if (type !== "notify") return;
107+
for (const m of messages) {
108+
const text = this.textOf(m);
109+
if (!text || !this.shouldForward(m)) continue;
110+
emit({
111+
event: "message",
112+
id: m.key.id || "",
113+
chat: m.key.remoteJid || "",
114+
sender: digits(m.key.fromMe ? this.me : m.key.participant || m.key.remoteJid),
115+
name: m.pushName || "",
116+
group: String(m.key.remoteJid || "").endsWith("@g.us"),
117+
text,
118+
ts: Number(m.messageTimestamp) || 0,
156119
});
157120
}
158-
return json(res, 200, { messages: inbound.splice(0) });
159-
}
160-
if (req.method === "POST" && url.pathname === "/send") {
161-
const { chatId, text } = JSON.parse((await readBody(req)) || "{}");
162-
if (!chatId || !text) return json(res, 400, { ok: false, error: "chatId and text required" });
163-
if (status !== "open") return json(res, 503, { ok: false, error: `not connected (${status})` });
164-
const sent = await sock.sendMessage(String(chatId), { text: String(text).slice(0, MAX_TEXT) });
165-
const id = sent?.key?.id || "";
166-
if (id) {
167-
sentByBridge.add(id);
168-
if (sentByBridge.size > 1000) sentByBridge.delete(sentByBridge.values().next().value);
169-
}
170-
return json(res, 200, { ok: true, messageId: id });
171-
}
172-
return json(res, 404, { ok: false, error: "not found" });
173-
} catch (e) {
174-
return json(res, 500, { ok: false, error: String(e?.message || e) });
121+
});
122+
}
123+
124+
async deliver(to, body) {
125+
if (!this.sock || !this.me) throw new Error("not paired/connected yet");
126+
const sent = await this.sock.sendMessage(String(to), {
127+
text: String(body).slice(0, TEXT_LIMIT),
128+
});
129+
this.rememberSend(sent?.key?.id);
130+
return sent?.key?.id || "";
175131
}
132+
}
133+
134+
const link = new WhatsAppLink();
135+
136+
const server = http.createServer((req, res) => {
137+
const reply = (code, body) => {
138+
res.writeHead(code, { "content-type": "application/json" });
139+
res.end(JSON.stringify(body));
140+
};
141+
if (req.method !== "POST" || req.url !== "/send") return reply(404, { sent: false, error: "unknown route" });
142+
let raw = "";
143+
req.on("data", (chunk) => (raw += chunk));
144+
req.on("end", async () => {
145+
try {
146+
const { to, body } = JSON.parse(raw || "{}");
147+
if (!to || !body) return reply(400, { sent: false, error: "to and body required" });
148+
reply(200, { sent: true, id: await link.deliver(to, body) });
149+
} catch (e) {
150+
reply(500, { sent: false, error: String(e?.message || e) });
151+
}
152+
});
176153
});
177154

178-
// localhost only — never expose the bridge beyond the machine
155+
// loopback only: the send endpoint must never be reachable off-machine
179156
server.listen(PORT, "127.0.0.1", () => {
180-
console.log(`[bridge] listening on 127.0.0.1:${PORT} (mode=${MODE})`);
181-
startSocket().catch((e) => {
182-
console.error("[bridge] fatal:", e);
157+
emit({ event: "ready", port: PORT });
158+
link.open().catch((e) => {
159+
emit({ event: "state", state: "closed", error: String(e?.message || e) });
183160
process.exit(1);
184161
});
185162
});
186163

187164
for (const sig of ["SIGINT", "SIGTERM"]) {
188-
process.on(sig, () => {
189-
try {
190-
server.close();
191-
sock?.end?.(undefined);
192-
} finally {
193-
process.exit(0);
194-
}
195-
});
165+
process.on(sig, () => process.exit(0));
196166
}

0 commit comments

Comments
 (0)