From bdfdd52d3b59ade950b320a6e60775bac4aaecc3 Mon Sep 17 00:00:00 2001 From: Rohit P Date: Fri, 12 Jun 2026 14:40:57 -0700 Subject: [PATCH] Add experimental WhatsApp Personal connector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two-way WhatsApp on a personal account via a supervised Node.js sidecar built on Baileys (unofficial WhatsApp Web protocol). The sidecar streams inbound messages, pairing QRs, and connection state as NDJSON events on stdout — no inbound HTTP, no polling; its only listener is a loopback POST /send for the stateless send_message tool. Registration hooks (platforms, adapter factories, sender credentials) plug the experimental package into the existing gateway, which skips experimental platforms unless the opt-in setting is on. QR pairing via a new /v1/connectors/{name}/status endpoint; npm deps install into the state dir on first connect (never vendored); blunt account-ban risk notice gates connect; excluded from release builds like all experimental code. --- platform/.gitignore | 4 + platform/coworker/connectors/adapters.py | 12 +- platform/coworker/connectors/config.py | 24 +- .../connectors/experimental/__init__.py | 73 +++++- .../experimental/whatsapp_bridge/bridge.js | 166 +++++++++++++ .../experimental/whatsapp_bridge/package.json | 13 + .../experimental/whatsapp_personal.py | 230 ++++++++++++++++++ platform/coworker/connectors/senders.py | 21 ++ platform/coworker/connectors/tools.py | 9 +- platform/coworker/server/app.py | 5 + platform/coworker/server/manager.py | 17 ++ platform/packaging/coworker-server.spec | 12 + platform/pyproject.toml | 5 + platform/tests/test_connectors.py | 211 +++++++++++++++- 14 files changed, 785 insertions(+), 17 deletions(-) create mode 100644 platform/coworker/connectors/experimental/whatsapp_bridge/bridge.js create mode 100644 platform/coworker/connectors/experimental/whatsapp_bridge/package.json create mode 100644 platform/coworker/connectors/experimental/whatsapp_personal.py diff --git a/platform/.gitignore b/platform/.gitignore index df96f790..2dd9563b 100644 --- a/platform/.gitignore +++ b/platform/.gitignore @@ -6,3 +6,7 @@ __pycache__/ build/ dist/ .coverage + +# experimental WhatsApp bridge — deps are installed into the state dir, never vendored +coworker/connectors/experimental/whatsapp_bridge/node_modules/ +coworker/connectors/experimental/whatsapp_bridge/package-lock.json diff --git a/platform/coworker/connectors/adapters.py b/platform/coworker/connectors/adapters.py index c9d34a73..27a48a8e 100644 --- a/platform/coworker/connectors/adapters.py +++ b/platform/coworker/connectors/adapters.py @@ -174,10 +174,20 @@ async def send( return _send_slack(self.bot_token, chat_id, text, thread_id) +ADAPTER_FACTORIES: dict[str, Any] = {} + + +def register_adapter_factory(platform: str, factory: Any) -> None: + """Register a `profile -> Optional[BasePlatformAdapter]` factory for an extra + platform (used by the experimental package).""" + ADAPTER_FACTORIES[platform] = factory + + def make_adapter(platform: str, profile: dict) -> Optional[BasePlatformAdapter]: """Build the adapter for a connected platform from its SecretStore profile.""" if platform == "telegram" and profile.get("bot_token"): return TelegramAdapter(profile["bot_token"]) if platform == "slack" and profile.get("bot_token") and profile.get("app_token"): return SlackAdapter(profile["bot_token"], profile["app_token"]) - return None + factory = ADAPTER_FACTORIES.get(platform) + return factory(profile) if factory is not None else None diff --git a/platform/coworker/connectors/config.py b/platform/coworker/connectors/config.py index 0456e69b..e28eaffb 100644 --- a/platform/coworker/connectors/config.py +++ b/platform/coworker/connectors/config.py @@ -14,7 +14,24 @@ from ..secrets import SecretStore from .base import SessionSource -PLATFORMS = ("telegram", "slack") +PLATFORMS: list[str] = ["telegram", "slack"] + +# Per-platform credential key in the SecretStore profile that proves "connected enough to +# listen". None → the platform needs no stored credential (e.g. QR-paired bridges); a bare +# profile written by connect_connector is enough. +_CREDENTIAL_KEYS: dict[str, Optional[str]] = { + "telegram": "bot_token", + "slack": "bot_token", +} + + +def register_platform( + name: str, *, credential_key: Optional[str] = "bot_token" +) -> None: + """Register an extra two-way platform (used by the experimental package).""" + if name not in PLATFORMS: + PLATFORMS.append(name) + _CREDENTIAL_KEYS[name] = credential_key @dataclass @@ -49,13 +66,14 @@ def load_settings( out: dict[str, ConnectorSettings] = {} for platform in PLATFORMS: profile = secrets.get(f"{platform}:default") or {} - token = profile.get("bot_token") + cred_key = _CREDENTIAL_KEYS.get(platform, "bot_token") + has_cred = bool(profile.get(cred_key)) if cred_key else bool(profile) allowed = set(profile.get("allowed_users") or []) allowed |= _csv(os.environ.get(f"{platform.upper()}_ALLOWED_USERS")) allow_all = bool(profile.get("allow_all")) or os.environ.get( f"{platform.upper()}_ALLOW_ALL_USERS", "" ).lower() in ("1", "true", "yes") - enabled = bool(token) and profile.get("enabled", True) + enabled = has_cred and profile.get("enabled", True) out[platform] = ConnectorSettings( platform=platform, enabled=enabled, diff --git a/platform/coworker/connectors/experimental/__init__.py b/platform/coworker/connectors/experimental/__init__.py index d594e3db..5c823c66 100644 --- a/platform/coworker/connectors/experimental/__init__.py +++ b/platform/coworker/connectors/experimental/__init__.py @@ -6,13 +6,76 @@ them in a self-built binary). To add one: define a `ConnectorDescriptor` with a `risk_notice` that states the concrete -downside in plain language, append it to `EXPERIMENTAL_DESCRIPTORS`, and register its tools or -adapter the same way first-party connectors do. The `experimental` flag is forced on by the -loader in descriptors.py regardless of what the descriptor sets. +downside in plain language, append it to `EXPERIMENTAL_DESCRIPTORS`, and register its +platform/adapter/sender via the registries in config.py, adapters.py, and senders.py. The +`experimental` flag is forced on by the loader in descriptors.py regardless of what the +descriptor sets. """ from __future__ import annotations -from ..descriptors import ConnectorDescriptor +from ..adapters import register_adapter_factory +from ..config import register_platform +from ..descriptors import ConnectorDescriptor, Field +from ..senders import register_sender +from .whatsapp_personal import ( + PLATFORM as _WA_PLATFORM, + WhatsAppPersonalAdapter, + send_whatsapp_personal, +) -EXPERIMENTAL_DESCRIPTORS: list[ConnectorDescriptor] = [] +WHATSAPP_PERSONAL = ConnectorDescriptor( + name=_WA_PLATFORM, + title="WhatsApp Personal", + icon="◌", + blurb="Two-way WhatsApp on your personal account via the unofficial WhatsApp Web protocol.", + auth="qr", + two_way=True, + fields=[ + Field( + "mode", + "Mode", + required=False, + help="self (default): the agent lives in your message-yourself thread. all: every chat on the account.", + placeholder="self", + ), + Field( + "bridge_port", + "Bridge port", + required=False, + help="Local port for the bridge process. Default 3941.", + placeholder="3941", + ), + Field( + "allowed_users", + "Allowed user IDs", + required=False, + help="Comma-separated phone numbers (digits only) allowed to message the agent. Empty = nobody (self-chat mode only needs your own).", + placeholder="14155550123", + ), + ], + instructions=[ + "Requires Node.js on this machine — the bridge installs its own dependencies on first start.", + "Use a SECONDARY phone number. Never pair your primary personal account.", + "Connect here, then start the super-agent: the bridge starts and shows a QR code in the connector status.", + "Scan it from WhatsApp → Settings → Linked devices → Link a device.", + "In self mode, talk to the agent in your own message-yourself thread.", + ], + risk_notice=( + "This connector speaks the unofficial WhatsApp Web protocol (Baileys). That violates " + "WhatsApp's Terms of Service, and Meta detects and permanently bans accounts using it, " + "without warning and without appeal. Only use a secondary number you can afford to " + "lose. Nothing about this is endorsed by or affiliated with WhatsApp/Meta." + ), +) + +register_platform(_WA_PLATFORM, credential_key=None) +register_adapter_factory(_WA_PLATFORM, WhatsAppPersonalAdapter) +register_sender( + _WA_PLATFORM, + send_whatsapp_personal, + credential_key="bridge_port", + credential_required=False, +) + +EXPERIMENTAL_DESCRIPTORS: list[ConnectorDescriptor] = [WHATSAPP_PERSONAL] diff --git a/platform/coworker/connectors/experimental/whatsapp_bridge/bridge.js b/platform/coworker/connectors/experimental/whatsapp_bridge/bridge.js new file mode 100644 index 00000000..633fba1d --- /dev/null +++ b/platform/coworker/connectors/experimental/whatsapp_bridge/bridge.js @@ -0,0 +1,166 @@ +#!/usr/bin/env node +/** + * OpenCoworker WhatsApp Personal bridge — EXPERIMENTAL, use at your own risk. + * + * Speaks the unofficial WhatsApp Web protocol via Baileys and talks to the Python + * adapter over a stdio event stream: every inbound message, QR refresh, and state + * change is one NDJSON line on stdout, so there is no inbound HTTP surface, no + * polling, and no message queue to overflow. The only network listener is a + * single localhost POST /send endpoint, kept so the stateless send_message tool + * can deliver replies without holding a handle to this process. + * + * stdout events (one JSON object per line): + * {"event":"ready","port":N} HTTP send endpoint is up + * {"event":"state","state":S,"me":J} S: pairing|open|closed|reconnecting + * {"event":"qr","qr":"..."} pairing QR (refreshes periodically) + * {"event":"message","id","chat","sender","name","group","text","ts"} + * + * Usage: node bridge.js --port 3941 --session --mode self|all + */ + +import http from "node:http"; +import { mkdirSync } from "node:fs"; + +import makeWASocket, { + useMultiFileAuthState, + fetchLatestBaileysVersion, + DisconnectReason, +} from "@whiskeysockets/baileys"; + +const opt = (flag, fallback) => { + const i = process.argv.indexOf(`--${flag}`); + return i > 0 && process.argv[i + 1] ? process.argv[i + 1] : fallback; +}; + +const PORT = Number(opt("port", "3941")); +const SESSION_DIR = opt("session", "./wa-session"); +const MODE = opt("mode", "self") === "all" ? "all" : "self"; +const TEXT_LIMIT = 4096; + +const emit = (obj) => process.stdout.write(JSON.stringify(obj) + "\n"); +const digits = (jid) => String(jid || "").replace(/[:@].*$/, "").split(":")[0]; + +mkdirSync(SESSION_DIR, { recursive: true }); + +class WhatsAppLink { + constructor() { + this.sock = null; + this.me = null; + this.ownSends = []; + } + + rememberSend(id) { + if (!id) return; + this.ownSends.push(id); + if (this.ownSends.length > 1000) this.ownSends.shift(); + } + + textOf(m) { + const c = m.message || {}; + return ( + c.conversation || + c.extendedTextMessage?.text || + c.imageMessage?.caption || + c.videoMessage?.caption || + "" + ); + } + + // mode "self": only the message-yourself thread; user's own messages are the input, + // but anything this bridge itself sent must never loop back in. + shouldForward(m) { + const chat = m.key.remoteJid || ""; + if (!chat || chat === "status@broadcast") return false; + const isSelfThread = digits(chat) === digits(this.me); + if (MODE === "self" && !isSelfThread) return false; + if (m.key.fromMe) { + return isSelfThread && !this.ownSends.includes(m.key.id); + } + return true; + } + + async open() { + const { state, saveCreds } = await useMultiFileAuthState(SESSION_DIR); + const { version } = await fetchLatestBaileysVersion().catch(() => ({})); + this.sock = makeWASocket({ auth: state, version, printQRInTerminal: false }); + this.sock.ev.on("creds.update", saveCreds); + + this.sock.ev.on("connection.update", (u) => { + if (u.qr) emit({ event: "qr", qr: u.qr }), emit({ event: "state", state: "pairing", me: null }); + if (u.connection === "open") { + this.me = this.sock.user?.id || null; + emit({ event: "state", state: "open", me: digits(this.me) }); + } + if (u.connection === "close") { + const code = u.lastDisconnect?.error?.output?.statusCode; + if (code === DisconnectReason.loggedOut) { + emit({ event: "state", state: "closed", me: null }); + } else { + emit({ event: "state", state: "reconnecting", me: null }); + setTimeout(() => this.open().catch((e) => emit({ event: "state", state: "closed", error: String(e) })), 2500); + } + } + }); + + this.sock.ev.on("messages.upsert", ({ messages, type }) => { + if (type !== "notify") return; + for (const m of messages) { + const text = this.textOf(m); + if (!text || !this.shouldForward(m)) continue; + emit({ + event: "message", + id: m.key.id || "", + chat: m.key.remoteJid || "", + sender: digits(m.key.fromMe ? this.me : m.key.participant || m.key.remoteJid), + name: m.pushName || "", + group: String(m.key.remoteJid || "").endsWith("@g.us"), + text, + ts: Number(m.messageTimestamp) || 0, + }); + } + }); + } + + async deliver(to, body) { + if (!this.sock || !this.me) throw new Error("not paired/connected yet"); + const sent = await this.sock.sendMessage(String(to), { + text: String(body).slice(0, TEXT_LIMIT), + }); + this.rememberSend(sent?.key?.id); + return sent?.key?.id || ""; + } +} + +const link = new WhatsAppLink(); + +const server = http.createServer((req, res) => { + const reply = (code, body) => { + res.writeHead(code, { "content-type": "application/json" }); + res.end(JSON.stringify(body)); + }; + if (req.method !== "POST" || req.url !== "/send") return reply(404, { sent: false, error: "unknown route" }); + let raw = ""; + req.on("data", (chunk) => (raw += chunk)); + req.on("end", async () => { + try { + const { to, body } = JSON.parse(raw || "{}"); + if (!to || !body) return reply(400, { sent: false, error: "to and body required" }); + reply(200, { sent: true, id: await link.deliver(to, body) }); + } catch (e) { + reply(500, { sent: false, error: String(e?.message || e) }); + } + }); +}); + +// loopback only: the send endpoint must never be reachable off-machine +server.listen(PORT, "127.0.0.1", () => { + emit({ event: "ready", port: PORT }); + link.open().catch((e) => { + emit({ event: "state", state: "closed", error: String(e?.message || e) }); + process.exit(1); + }); +}); + +for (const sig of ["SIGINT", "SIGTERM"]) { + process.on(sig, () => process.exit(0)); +} diff --git a/platform/coworker/connectors/experimental/whatsapp_bridge/package.json b/platform/coworker/connectors/experimental/whatsapp_bridge/package.json new file mode 100644 index 00000000..45d86496 --- /dev/null +++ b/platform/coworker/connectors/experimental/whatsapp_bridge/package.json @@ -0,0 +1,13 @@ +{ + "name": "coworker-whatsapp-bridge", + "version": "0.1.0", + "description": "EXPERIMENTAL WhatsApp personal-account bridge for OpenCoworker (unofficial protocol — use at your own risk)", + "private": true, + "type": "module", + "scripts": { + "start": "node bridge.js" + }, + "dependencies": { + "@whiskeysockets/baileys": "^6.7.9" + } +} diff --git a/platform/coworker/connectors/experimental/whatsapp_personal.py b/platform/coworker/connectors/experimental/whatsapp_personal.py new file mode 100644 index 00000000..e974a83a --- /dev/null +++ b/platform/coworker/connectors/experimental/whatsapp_personal.py @@ -0,0 +1,230 @@ +"""WhatsApp Personal adapter — EXPERIMENTAL, unofficial protocol, use at your own risk. + +Two-way WhatsApp on a personal account through a supervised Node.js sidecar built on +Baileys (the unofficial WhatsApp Web protocol library). The sidecar pushes everything — +inbound messages, pairing QR codes, connection state — as NDJSON events on its stdout +pipe, which this adapter consumes directly; there is no inbound HTTP and no polling. +The sidecar's only listener is a loopback `POST /send`, kept so the stateless +`send_message` tool can deliver replies without a handle to the process. Pairing is by +QR code, so no credential is ever pasted, and npm dependencies install into the state +dir on first connect rather than shipping with the app. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import shutil +import subprocess +import sys +from importlib import resources +from typing import Any, Optional + +from ...secrets import state_dir +from ..base import BasePlatformAdapter, MessageEvent, SendResult, SessionSource + +logger = logging.getLogger("coworker.connectors.experimental") + +PLATFORM = "whatsapp_personal" +DEFAULT_PORT = 3941 +_BRIDGE_FILES = ("bridge.js", "package.json") +_NPM_INSTALL_TIMEOUT = 600 +_READY_TIMEOUT = 30.0 + + +# -- pure mapper (testable without the sidecar) ---------------------------------- +def bridge_event_to_message(event: dict) -> Optional[MessageEvent]: + """Map a sidecar `message` event to a MessageEvent; anything else → None.""" + if event.get("event") != "message": + return None + text, chat = event.get("text") or "", event.get("chat") or "" + if not text or not chat: + return None + source = SessionSource( + platform=PLATFORM, + chat_id=chat, + user_id=str(event.get("sender") or "") or None, + user_name=event.get("name") or None, + chat_type="group" if event.get("group") else "dm", + ) + return MessageEvent(text=text, source=source, message_id=str(event.get("id") or "")) + + +# -- stateless sender for the send_message tool ---------------------------------- +def send_whatsapp_personal( + port: str, chat_id: str, text: str, thread_id: Optional[str] = None +) -> SendResult: + """POST to the sidecar's loopback send endpoint. `port` comes from the profile's + bridge_port and may be empty (default port).""" + import httpx + + url = f"http://127.0.0.1:{int(port or DEFAULT_PORT)}/send" + try: + data = httpx.post(url, json={"to": chat_id, "body": text}, timeout=30.0).json() + except Exception as exc: + return SendResult(False, error=f"bridge unreachable ({exc})") + if data.get("sent"): + return SendResult(True, message_id=str(data.get("id") or "")) + return SendResult(False, error=data.get("error") or "whatsapp send failed") + + +# -- adapter --------------------------------------------------------------------- +class WhatsAppPersonalAdapter(BasePlatformAdapter): + """Owns the sidecar: spawn on connect, consume its stdout event stream, kill on + disconnect. Connection state and the pairing QR are cached from events, so + `status()` answers without any network round-trip.""" + + platform = PLATFORM + + def __init__(self, profile: Optional[dict] = None) -> None: + super().__init__() + profile = profile or {} + self.port = int(profile.get("bridge_port") or DEFAULT_PORT) + self.mode = "all" if str(profile.get("mode") or "").strip() == "all" else "self" + # The sidecar runs out of the state dir, not the package: PyInstaller bundles + # are ephemeral, and npm deps installed next to user state survive app updates. + self.bridge_home = state_dir() / PLATFORM / "bridge" + self.session_dir = state_dir() / PLATFORM / "session" + self._proc: Optional[subprocess.Popen] = None + self._reader_task: Optional[asyncio.Task] = None + self._ready = asyncio.Event() + self.state: dict[str, Any] = {"state": "down", "me": None, "qr": None} + + # -- event handling (pure-ish, unit-testable) -------------------------------- + async def handle_event_line(self, line: str) -> None: + try: + event = json.loads(line) + except (ValueError, TypeError): + return + kind = event.get("event") + if kind == "ready": + self._ready.set() + elif kind == "state": + self.state["state"] = event.get("state") + self.state["me"] = event.get("me") + if event.get("state") == "open": + self.state["qr"] = None + elif kind == "qr": + self.state["qr"] = event.get("qr") + elif kind == "message": + msg = bridge_event_to_message(event) + if msg is not None: + await self.handle_message(msg) + + # -- sidecar lifecycle -------------------------------------------------------- + @staticmethod + def node_path() -> Optional[str]: + return shutil.which("node") + + def _stage_bridge(self) -> None: + """Copy the sidecar sources from the package into the state dir (always + refreshed so app updates propagate; node_modules and the session persist).""" + self.bridge_home.mkdir(parents=True, exist_ok=True) + pkg = resources.files(__package__) / "whatsapp_bridge" + for name in _BRIDGE_FILES: + (self.bridge_home / name).write_bytes((pkg / name).read_bytes()) + + def _deps_installed(self) -> bool: + return (self.bridge_home / "node_modules" / "@whiskeysockets").is_dir() + + def _install_deps(self) -> bool: + npm = shutil.which("npm") + if not npm: + logger.error("[%s] npm not found — install Node.js first", PLATFORM) + return False + logger.info("[%s] installing sidecar dependencies (first run)…", PLATFORM) + result = subprocess.run( + [npm, "install", "--omit=dev", "--no-audit", "--no-fund"], + cwd=self.bridge_home, + capture_output=True, + timeout=_NPM_INSTALL_TIMEOUT, + ) + if result.returncode != 0: + logger.error( + "[%s] npm install failed: %s", + PLATFORM, + result.stderr.decode(errors="replace")[-500:], + ) + return result.returncode == 0 + + def _spawn(self, node: str) -> subprocess.Popen: + self.session_dir.mkdir(parents=True, exist_ok=True) + kwargs: dict[str, Any] = {} + if sys.platform != "win32": + kwargs["start_new_session"] = True # own process group → clean teardown + return subprocess.Popen( + [ + node, + str(self.bridge_home / "bridge.js"), + "--port", + str(self.port), + "--session", + str(self.session_dir), + "--mode", + self.mode, + ], + cwd=self.bridge_home, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + **kwargs, + ) + + async def _read_events(self) -> None: + """Consume the sidecar's stdout until it closes. Draining the pipe is also what + keeps the sidecar from blocking on a full buffer.""" + proc = self._proc + while proc is not None and proc.stdout is not None: + line = await asyncio.to_thread(proc.stdout.readline) + if not line: # EOF → sidecar exited + self.state.update({"state": "down", "qr": None}) + break + await self.handle_event_line(line.decode(errors="replace")) + + async def connect(self) -> bool: + node = self.node_path() + if not node: + logger.error("[%s] Node.js is required but not installed", PLATFORM) + return False + await asyncio.to_thread(self._stage_bridge) + if not self._deps_installed(): + if not await asyncio.to_thread(self._install_deps): + return False + self._ready.clear() + self._proc = self._spawn(node) + self._reader_task = asyncio.create_task(self._read_events()) + try: + await asyncio.wait_for(self._ready.wait(), timeout=_READY_TIMEOUT) + except asyncio.TimeoutError: + logger.error("[%s] sidecar did not become ready", PLATFORM) + await self.disconnect() + return False + logger.info( + "[%s] sidecar up on port %s (mode=%s)", PLATFORM, self.port, self.mode + ) + return True + + async def status(self) -> dict[str, Any]: + """Pairing/connection state for the UI, served from the cached event stream.""" + running = self._proc is not None and self._proc.poll() is None + return {"running": running, **self.state} + + async def send( + self, chat_id: str, text: str, *, thread_id: Optional[str] = None + ) -> SendResult: + return await asyncio.to_thread( + send_whatsapp_personal, str(self.port), chat_id, text, thread_id + ) + + async def disconnect(self) -> None: + if self._proc is not None: + self._proc.terminate() + try: + await asyncio.to_thread(self._proc.wait, 5) + except subprocess.TimeoutExpired: + self._proc.kill() + self._proc = None + if self._reader_task is not None: + self._reader_task.cancel() + self._reader_task = None + self.state.update({"state": "down", "qr": None}) diff --git a/platform/coworker/connectors/senders.py b/platform/coworker/connectors/senders.py index 318a1602..10bef0b2 100644 --- a/platform/coworker/connectors/senders.py +++ b/platform/coworker/connectors/senders.py @@ -75,3 +75,24 @@ def _send_slack( "telegram": _send_telegram, "slack": _send_slack, } + +# Which SecretStore profile key is passed to the sender as its first argument, and whether +# it must be present. ("bot_token", True) is the default; platforms whose sender needs no +# stored credential (e.g. a localhost bridge) register (key, False) and receive "" when the +# profile doesn't set the key. +SENDER_CREDENTIALS: dict[str, tuple[str, bool]] = { + "telegram": ("bot_token", True), + "slack": ("bot_token", True), +} + + +def register_sender( + platform: str, + sender: Sender, + *, + credential_key: str = "bot_token", + credential_required: bool = True, +) -> None: + """Register an outbound sender for an extra platform (used by the experimental package).""" + DEFAULT_SENDERS[platform] = sender + SENDER_CREDENTIALS[platform] = (credential_key, credential_required) diff --git a/platform/coworker/connectors/tools.py b/platform/coworker/connectors/tools.py index e4a3f10d..0d507ab3 100644 --- a/platform/coworker/connectors/tools.py +++ b/platform/coworker/connectors/tools.py @@ -13,7 +13,7 @@ from ..secrets import SecretStore from .base import parse_target -from .senders import DEFAULT_SENDERS, Sender +from .senders import DEFAULT_SENDERS, SENDER_CREDENTIALS, Sender _SCHEMA = { "type": "function", @@ -57,9 +57,10 @@ def send_message(target: str, text: str) -> dict[str, Any]: if sender is None: return {"error": f"unknown platform: {platform}"} creds = secrets.get(f"{platform}:default") or {} - token = creds.get("bot_token") - if not token: - return {"error": f"no bot token for {platform} — connect it first"} + cred_key, cred_required = SENDER_CREDENTIALS.get(platform, ("bot_token", True)) + token = str(creds.get(cred_key) or "") + if cred_required and not token: + return {"error": f"no {cred_key} for {platform} — connect it first"} result = sender(token, chat_id, text, thread_id) if result.ok: return {"ok": True, "message_id": result.message_id, "target": target} diff --git a/platform/coworker/server/app.py b/platform/coworker/server/app.py index f9533909..8c3ddda7 100644 --- a/platform/coworker/server/app.py +++ b/platform/coworker/server/app.py @@ -196,6 +196,11 @@ async def connector_connect(name: str, body: dict) -> dict[str, Any]: def connector_disconnect(name: str) -> dict[str, Any]: return manager.disconnect_connector(name) + @app.get("/v1/connectors/{name}/status") + async def connector_status(name: str) -> dict[str, Any]: + # live adapter state — e.g. the WhatsApp Personal pairing QR while it's scanning + return await manager.connector_status(name) + @app.patch("/v1/connectors/{name}/tools") def connector_tools_patch(name: str, body: dict) -> dict[str, Any]: enabled = (body or {}).get("enabled") diff --git a/platform/coworker/server/manager.py b/platform/coworker/server/manager.py index 281db752..9bfe672b 100644 --- a/platform/coworker/server/manager.py +++ b/platform/coworker/server/manager.py @@ -35,6 +35,7 @@ set_experimental_enabled, update_connector_tools, ) +from ..connectors.descriptors import get_descriptor as connector_descriptor from ..connectors.browser_automation import ( browser_close_session, browser_state, @@ -364,6 +365,14 @@ def connect_connector( def set_experimental_connectors(self, value: bool) -> dict[str, Any]: return set_experimental_enabled(self.secrets, value) + async def connector_status(self, name: str) -> dict[str, Any]: + """Live adapter status (pairing QR etc.) for connectors whose adapter exposes one.""" + adapter = self.gateway._adapters.get(name) if self.gateway is not None else None + status = getattr(adapter, "status", None) + if adapter is None or status is None: + return {"running": False, "status": "down"} + return await status() + def disconnect_connector(self, name: str) -> dict[str, Any]: return disconnect_connector(self.secrets, name) @@ -991,6 +1000,14 @@ async def start_gateway(self) -> list[str]: for platform, st in settings.items(): if not st.enabled: continue + # experimental platforms only listen while the opt-in setting is on + desc = connector_descriptor(platform) + if ( + desc is not None + and desc.experimental + and not experimental_enabled(self.secrets) + ): + continue profile = self.secrets.get(f"{platform}:default") or {} adapter = make_adapter(platform, profile) if adapter is not None: diff --git a/platform/packaging/coworker-server.spec b/platform/packaging/coworker-server.spec index 5b979d1c..c3906988 100644 --- a/platform/packaging/coworker-server.spec +++ b/platform/packaging/coworker-server.spec @@ -46,6 +46,18 @@ if not INCLUDE_EXPERIMENTAL: hiddenimports = [ m for m in hiddenimports if not m.startswith("coworker.connectors.experimental") ] +else: + # experimental builds also need the WhatsApp bridge sources as data files + _bridge = os.path.join( + PLATFORM, "coworker", "connectors", "experimental", "whatsapp_bridge" + ) + for _f in ("bridge.js", "package.json"): + datas.append( + ( + os.path.join(_bridge, _f), + "coworker/connectors/experimental/whatsapp_bridge", + ) + ) for pkg in ("uvicorn", "certifi", "anyio"): d, b, h = collect_all(pkg) diff --git a/platform/pyproject.toml b/platform/pyproject.toml index ac07b9eb..c2866456 100644 --- a/platform/pyproject.toml +++ b/platform/pyproject.toml @@ -43,6 +43,11 @@ coworker-connectors = "coworker.connectors.cli:main" where = ["."] include = ["coworker*"] +[tool.setuptools.package-data] +# The experimental WhatsApp bridge sources (staged into the state dir at first connect; +# npm dependencies are installed there, never vendored). +"coworker.connectors.experimental" = ["whatsapp_bridge/bridge.js", "whatsapp_bridge/package.json"] + [tool.pytest.ini_options] testpaths = ["tests"] asyncio_mode = "auto" diff --git a/platform/tests/test_connectors.py b/platform/tests/test_connectors.py index 5d7f1a93..193d8c74 100644 --- a/platform/tests/test_connectors.py +++ b/platform/tests/test_connectors.py @@ -1231,10 +1231,213 @@ def test_experimental_rest_roundtrip(tmp_path, monkeypatch, experimental_descrip assert r["ok"] is True -def test_experimental_package_loads_cleanly(): - """The experimental package import hook is a no-op when the package is empty or absent.""" +def test_experimental_package_contents(): + """Only the experimental package contributes experimental descriptors.""" from coworker.connectors.descriptors import DESCRIPTORS from coworker.connectors.experimental import EXPERIMENTAL_DESCRIPTORS - assert EXPERIMENTAL_DESCRIPTORS == [] - assert all(d.experimental is False for d in DESCRIPTORS if d.name != "dangerzone") + exp_names = {d.name for d in EXPERIMENTAL_DESCRIPTORS} + assert exp_names == {"whatsapp_personal"} + assert all(d.experimental is True for d in EXPERIMENTAL_DESCRIPTORS) + assert all( + d.experimental is False + for d in DESCRIPTORS + if d.name not in exp_names and d.name != "dangerzone" + ) + + +# -- whatsapp personal (experimental two-way via local bridge) ------------------- +def test_whatsapp_personal_registered(): + from coworker.connectors.adapters import make_adapter + from coworker.connectors.config import PLATFORMS + from coworker.connectors.experimental.whatsapp_personal import ( + WhatsAppPersonalAdapter, + ) + from coworker.connectors.senders import DEFAULT_SENDERS, SENDER_CREDENTIALS + + assert "whatsapp_personal" in PLATFORMS + assert "whatsapp_personal" in DEFAULT_SENDERS + assert SENDER_CREDENTIALS["whatsapp_personal"] == ("bridge_port", False) + adapter = make_adapter("whatsapp_personal", {"bridge_port": "4001", "mode": "all"}) + assert isinstance(adapter, WhatsAppPersonalAdapter) + assert adapter.port == 4001 and adapter.mode == "all" + assert make_adapter("whatsapp_personal", {}).mode == "self" # default + + +def test_whatsapp_personal_descriptor_gated(tmp_path): + from coworker.connectors import connector_list, set_experimental_enabled + from coworker.connectors.descriptors import get_descriptor + + desc = get_descriptor("whatsapp_personal") + assert desc is not None and desc.experimental is True and desc.two_way is True + assert "ban" in desc.risk_notice.lower() # the warning must be blunt + + secrets = SecretStore(tmp_path / "secrets.json") + assert "whatsapp_personal" not in {c["name"] for c in connector_list(secrets)} + set_experimental_enabled(secrets, True) + assert "whatsapp_personal" in {c["name"] for c in connector_list(secrets)} + + +def test_whatsapp_personal_load_settings(tmp_path): + from coworker.connectors.config import load_settings + + secrets = SecretStore(tmp_path / "secrets.json") + settings = load_settings(secrets) + assert settings["whatsapp_personal"].enabled is False # no profile yet + + # a bare profile (QR pairing, no stored credential) is enough to enable listening + secrets.put( + "whatsapp_personal:default", + {"type": "token", "enabled": True, "allowed_users": ["14155550123"]}, + ) + settings = load_settings(secrets) + assert settings["whatsapp_personal"].enabled is True + assert settings["whatsapp_personal"].allowed_users == {"14155550123"} + + +def test_bridge_event_mapper(): + from coworker.connectors.experimental.whatsapp_personal import ( + bridge_event_to_message, + ) + + ev = bridge_event_to_message( + { + "event": "message", + "id": "A1", + "chat": "1415@s.whatsapp.net", + "sender": "1415", + "name": "Ro", + "group": False, + "text": "hello", + "ts": 5, + } + ) + assert ev.text == "hello" and ev.source.platform == "whatsapp_personal" + assert ev.source.target == "whatsapp_personal:1415@s.whatsapp.net" + assert ev.source.chat_type == "dm" and ev.message_id == "A1" + + # non-message events and empty texts map to nothing + assert bridge_event_to_message({"event": "qr", "qr": "x"}) is None + assert ( + bridge_event_to_message({"event": "message", "chat": "x", "text": ""}) is None + ) + grp = bridge_event_to_message( + {"event": "message", "chat": "g1@g.us", "text": "t", "group": True} + ) + assert grp.source.chat_type == "group" + + +async def test_adapter_event_stream_handling(): + """The adapter's stdout-event handler drives state, QR, readiness, and inbound.""" + from coworker.connectors.experimental.whatsapp_personal import ( + WhatsAppPersonalAdapter, + ) + + adapter = WhatsAppPersonalAdapter({}) + received = [] + + async def handler(ev): + received.append(ev) + + adapter.set_message_handler(handler) + + await adapter.handle_event_line('{"event":"ready","port":3941}') + assert adapter._ready.is_set() + + await adapter.handle_event_line('{"event":"qr","qr":"QRDATA"}') + assert adapter.state["qr"] == "QRDATA" + + await adapter.handle_event_line('{"event":"state","state":"open","me":"1415"}') + assert adapter.state["state"] == "open" and adapter.state["me"] == "1415" + assert adapter.state["qr"] is None # QR cleared once paired + + await adapter.handle_event_line( + '{"event":"message","id":"M1","chat":"1415@s.whatsapp.net",' + '"sender":"1415","name":"Ro","group":false,"text":"hi","ts":1}' + ) + assert len(received) == 1 and received[0].text == "hi" + + await adapter.handle_event_line("not json at all") # noise must not raise + + status = await adapter.status() + assert status["running"] is False and status["state"] == "open" + + +def test_whatsapp_personal_sender(monkeypatch): + import httpx + + from coworker.connectors.experimental.whatsapp_personal import ( + send_whatsapp_personal, + ) + + calls = {} + + class _Resp: + def json(self): + return {"sent": True, "id": "M9"} + + def fake_post(url, json=None, timeout=None): + calls["url"], calls["json"] = url, json + return _Resp() + + monkeypatch.setattr(httpx, "post", fake_post) + + res = send_whatsapp_personal("", "1415@s.whatsapp.net", "hi") + assert res.ok and res.message_id == "M9" + assert calls["url"] == "http://127.0.0.1:3941/send" # default port when unset + assert calls["json"] == {"to": "1415@s.whatsapp.net", "body": "hi"} + + send_whatsapp_personal("4567", "c", "t") + assert calls["url"] == "http://127.0.0.1:4567/send" + + +def test_send_message_tool_no_credential_platform(tmp_path): + """send_message must not demand a bot token for credential-less platforms.""" + from coworker.connectors import make_send_message_tool + from coworker.connectors.base import SendResult as SR + + secrets = SecretStore(tmp_path / "secrets.json") + secrets.put("whatsapp_personal:default", {"type": "token", "enabled": True}) + record = [] + + def sender(token, chat_id, text, thread_id=None): + record.append((token, chat_id, text)) + return SR(True, message_id="1") + + tool = make_send_message_tool(secrets, senders={"whatsapp_personal": sender}) + out = tool(target="whatsapp_personal:1415@s.whatsapp.net", text="yo") + assert out["ok"] is True + assert record == [("", "1415@s.whatsapp.net", "yo")] + + +async def test_gateway_skips_experimental_until_opted_in(tmp_path, monkeypatch): + import coworker.server.manager as mgr + from coworker.connectors import FakeAdapter, set_experimental_enabled + + monkeypatch.setenv("COWORKER_STATE_DIR", str(tmp_path / "state")) + m = mgr.SessionManager(data_dir=tmp_path / "data", provider=_StubProvider()) + m.secrets.put("whatsapp_personal:default", {"type": "token", "enabled": True}) + m.secrets.put("telegram:default", {"bot_token": "T", "enabled": True}) + + built = [] + + def fake_make_adapter(platform, profile): + built.append(platform) + fake = FakeAdapter() + fake.platform = platform + return fake + + monkeypatch.setattr(mgr, "make_adapter", fake_make_adapter) + + live = await m.start_gateway() + assert "telegram" in built and "telegram" in live + assert "whatsapp_personal" not in built # experimental opt-in is off + await m.stop_gateway() + await m.scheduler.stop() + + set_experimental_enabled(m.secrets, True) + built.clear() + live = await m.start_gateway() + assert "whatsapp_personal" in built and "whatsapp_personal" in live + await m.stop_gateway() + await m.scheduler.stop()