Skip to content

Commit 5499afd

Browse files
authored
Slackbot: ignore edits while initial response is in progress (#1224)
* slackbot: ignore edits while initial response is in progress (in-process lock)\n\n- Add simple per-thread asyncio.Lock to gate concurrent handling\n- Send friendly notice for message_changed edits during processing\n- Avoid DB changes; keep example lightweight\n- Add subtype to SlackEvent model\n * slackbot: cross-process dedupe via tiny sqlite status\n\n- Add _internal.thread_status with try_acquire/get_status/mark_completed\n- Gate mention handling with DB-backed idempotency key (thread_ts)\n- Post friendly edit-ignored message when in_progress\n- Keep api/core lean; no sprawl * address copilot review: remove non-threadsafe schema flag; improve assert message for edit notice
1 parent 8dc1e91 commit 5499afd

4 files changed

Lines changed: 152 additions & 1 deletion

File tree

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""Thread status tracking for Slack events (cross-process safe).
2+
3+
This module encapsulates a tiny SQLite-backed status mechanism to avoid duplicate
4+
processing of the same Slack thread when users edit their original post or when
5+
Slack delivers multiple events. It keeps the rest of the app simple and avoids
6+
sprawling status logic across files.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from typing import Literal
12+
13+
from slackbot.core import Database
14+
15+
Status = Literal["in_progress", "completed"]
16+
17+
18+
async def ensure_schema(db: Database) -> None:
19+
"""Ensure the status table exists.
20+
21+
Using CREATE TABLE IF NOT EXISTS is idempotent and cheap enough to call
22+
whenever we touch the status table, avoiding module-level state or locks.
23+
"""
24+
25+
def _create() -> None:
26+
db.con.execute(
27+
"""
28+
CREATE TABLE IF NOT EXISTS slack_thread_status (
29+
thread_ts TEXT PRIMARY KEY,
30+
status TEXT NOT NULL CHECK (status IN ('in_progress','completed')),
31+
updated_at REAL NOT NULL
32+
);
33+
"""
34+
)
35+
db.con.commit()
36+
37+
await db.loop.run_in_executor(db.executor, _create)
38+
39+
40+
async def try_acquire(db: Database, thread_ts: str) -> bool:
41+
"""Attempt to mark a thread as in_progress; returns True if acquired.
42+
43+
Uses an atomic INSERT OR IGNORE on a PRIMARY KEY to prevent duplicates across
44+
concurrent processes or tasks.
45+
"""
46+
await ensure_schema(db)
47+
48+
def _insert() -> int:
49+
cur = db.con.cursor()
50+
cur.execute(
51+
"""
52+
INSERT OR IGNORE INTO slack_thread_status (thread_ts, status, updated_at)
53+
VALUES (?, 'in_progress', strftime('%s','now'))
54+
""",
55+
(thread_ts,),
56+
)
57+
db.con.commit()
58+
return cur.rowcount
59+
60+
rowcount: int = await db.loop.run_in_executor(db.executor, _insert)
61+
return rowcount == 1
62+
63+
64+
async def get_status(db: Database, thread_ts: str) -> Status | None:
65+
await ensure_schema(db)
66+
67+
def _query() -> Status | None:
68+
cur = db.con.cursor()
69+
cur.execute(
70+
"SELECT status FROM slack_thread_status WHERE thread_ts = ?",
71+
(thread_ts,),
72+
)
73+
row = cur.fetchone()
74+
return row[0] if row else None # type: ignore[return-value]
75+
76+
return await db.loop.run_in_executor(db.executor, _query)
77+
78+
79+
async def mark_completed(db: Database, thread_ts: str) -> None:
80+
await ensure_schema(db)
81+
82+
def _update() -> None:
83+
cur = db.con.cursor()
84+
cur.execute(
85+
"""
86+
UPDATE slack_thread_status
87+
SET status = 'completed', updated_at = strftime('%s','now')
88+
WHERE thread_ts = ?
89+
""",
90+
(thread_ts,),
91+
)
92+
db.con.commit()
93+
94+
await db.loop.run_in_executor(db.executor, _update)

examples/slackbot/src/slackbot/api.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717

1818
from slackbot._internal.constants import WORKSPACE_TO_CHANNEL_ID
1919
from slackbot._internal.templates import CHANNEL_REDIRECT_MESSAGE, WELCOME_MESSAGE
20+
from slackbot._internal.thread_status import (
21+
get_status as get_thread_status,
22+
)
23+
from slackbot._internal.thread_status import (
24+
mark_completed as mark_thread_completed,
25+
)
26+
from slackbot._internal.thread_status import (
27+
try_acquire as try_acquire_thread,
28+
)
2029
from slackbot.assets import summarize_thread
2130
from slackbot.core import (
2231
Database,
@@ -40,6 +49,9 @@
4049

4150
logger = get_logger(__name__)
4251

52+
# Duplicate handling is coordinated via a small SQLite table in
53+
# _internal.thread_status to work across processes.
54+
4355

4456
def get_designated_channel_for_workspace(team_id: str) -> str | None:
4557
"""Get the designated channel ID for a given workspace team ID."""
@@ -137,6 +149,37 @@ async def handle_message(payload: SlackPayload, db: Database):
137149
return Completed(message="Message too long", name="SKIPPED")
138150

139151
if re.search(BOT_MENTION, user_message) and payload.authorizations:
152+
# Use the root thread timestamp as our idempotency key
153+
root_ts = thread_ts
154+
155+
# Cross-process acquire; only one handler should proceed
156+
acquired = await try_acquire_thread(db, root_ts)
157+
if not acquired:
158+
status = await get_thread_status(db, root_ts)
159+
if status == "in_progress":
160+
assert event.channel is not None, (
161+
"Event channel is None when posting edit-ignored notice"
162+
)
163+
await post_slack_message(
164+
message=(
165+
"✋ I noticed you edited your original message. "
166+
"I'm already working on your first version — please add any "
167+
"clarifications as new messages in this thread so I don't lose track."
168+
),
169+
channel_id=event.channel,
170+
thread_ts=root_ts,
171+
)
172+
return Completed(
173+
message="Ignored edit while in progress",
174+
name="IGNORED_EDIT",
175+
data=dict(thread_ts=root_ts),
176+
)
177+
return Completed(
178+
message="Duplicate event after completion",
179+
name="SKIPPED_DUPLICATE",
180+
data=dict(thread_ts=root_ts),
181+
)
182+
140183
# Check if this is the designated channel
141184
team_id = payload.team_id or ""
142185
is_designated = check_if_designated_channel(event.channel, team_id)
@@ -188,7 +231,11 @@ async def handle_message(payload: SlackPayload, db: Database):
188231

189232
try:
190233
result = await run_agent(
191-
cleaned_message, conversation, user_context, event.channel, thread_ts
234+
cleaned_message,
235+
conversation,
236+
user_context,
237+
event.channel,
238+
thread_ts,
192239
) # type: ignore
193240

194241
await db.add_thread_messages(thread_ts, result.new_messages())
@@ -213,6 +260,12 @@ async def handle_message(payload: SlackPayload, db: Database):
213260
name="ERROR_HANDLED",
214261
data=dict(error=str(e), user_context=user_context),
215262
)
263+
finally:
264+
try:
265+
await mark_thread_completed(db, root_ts)
266+
except Exception:
267+
logger.warning("Failed to mark thread as completed")
268+
216269
return Completed(
217270
message="Responded to mention",
218271
data=dict(user_context=user_context, conversation=conversation),

examples/slackbot/src/slackbot/core.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ def _insert():
129129

130130
await self.loop.run_in_executor(self.executor, _insert)
131131

132+
# Note: Thread status tracking is handled in-process within api.py to keep
133+
# persistence minimal for the examples package.
134+
132135

133136
@task(task_run_name="build user context for {user_id}")
134137
def build_user_context(

examples/slackbot/src/slackbot/slack.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class EventBlock(BaseModel):
2929
class SlackEvent(BaseModel):
3030
client_msg_id: str | None = None
3131
type: str
32+
subtype: str | None = None
3233
text: str | None = None
3334
user: str | dict[str, Any] | None = None
3435
ts: str | None = None

0 commit comments

Comments
 (0)