Skip to content

Commit ca9ed02

Browse files
feat(notification): handle deduping for tokens
1 parent b41c0a5 commit ca9ed02

3 files changed

Lines changed: 119 additions & 6 deletions

File tree

raven_cloud/api/notification.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from frappe.utils.response import Response
99

1010
from raven_cloud.utils.fcm import get_app
11-
from raven_cloud.utils.notification import sanitize_fcm_data
11+
from raven_cloud.utils.notification import sanitize_fcm_data, get_background_job_id
1212
from raven_cloud.utils.rc_caching import get_push_tokens_for_user
1313

1414
Message = dict[str, Any]
@@ -58,12 +58,16 @@ def send(messages: str, site_name: str):
5858
if isinstance(messages, str):
5959
messages = json.loads(messages)
6060

61+
job_id = get_background_job_id(messages, site_name)
62+
6163
# Enqueue the job of sending notifications
6264
frappe.enqueue(
6365
_send,
6466
messages=messages,
6567
site_url=site_name,
6668
queue='short',
69+
job_id=job_id,
70+
deduplicate=True
6771
)
6872

6973
# TODO: Return the response from api itself.
@@ -92,6 +96,12 @@ def _send(messages: Messages, site_url: str):
9296

9397
try:
9498
for message in messages:
99+
# remove duplicate tokens to avoid sending duplicate notifications
100+
message_tokens = list(dict.fromkeys(message.get("tokens", [])))
101+
102+
if not message_tokens:
103+
continue
104+
95105
notification = None
96106
data = None
97107
webpush = None
@@ -138,7 +148,7 @@ def _send(messages: Messages, site_url: str):
138148
if message.get('data'):
139149
data = sanitize_fcm_data(message['data'])
140150

141-
for token in message.get('tokens', []):
151+
for token in message_tokens:
142152
fcm_message = messaging.Message(
143153
token=token,
144154
notification=notification,
@@ -224,8 +234,17 @@ def send_to_users(messages: str, site_name: str):
224234
if isinstance(messages, str):
225235
messages = json.loads(messages)
226236

237+
job_id = get_background_job_id(messages, site_name)
238+
227239
# enqueue the job of sending notifications
228-
frappe.enqueue(_send_to_users, messages=messages, site_url=site_name, queue='short')
240+
frappe.enqueue(
241+
_send_to_users,
242+
messages=messages,
243+
site_url=site_name,
244+
queue='short',
245+
job_id=job_id,
246+
deduplicate=True
247+
)
229248

230249
# TODO: Return the response from api itself.
231250
return
@@ -258,6 +277,9 @@ def _send_to_users(messages: Messages, site_url: str):
258277
for user in message.get("users", []):
259278
message_tokens.extend(get_push_tokens_for_user(user, site_url))
260279

280+
# remove duplicate tokens to avoid sending duplicate notifications - not using set() because it doesn't preserve order
281+
message_tokens = list(dict.fromkeys(message_tokens))
282+
261283
if not message_tokens:
262284
continue
263285

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
from types import SimpleNamespace
2+
from unittest.mock import patch
3+
4+
import frappe
5+
from frappe.tests import IntegrationTestCase
6+
7+
import raven_cloud.api.notification as notification_api
8+
9+
10+
class TestNotificationSendToUsers(IntegrationTestCase):
11+
def setUp(self):
12+
super().setUp()
13+
self.site_name = f"rc-push-repro-{frappe.generate_hash(length=8)}.test"
14+
frappe.get_doc({"doctype": "RC Site", "site": self.site_name}).insert(ignore_permissions=True)
15+
16+
def tearDown(self):
17+
frappe.db.rollback()
18+
super().tearDown()
19+
20+
def create_site_user_with_token(self, user_id: str, token: str):
21+
site_user = frappe.get_doc(
22+
{"doctype": "RC Site User", "site": self.site_name, "user_id": user_id}
23+
).insert(ignore_permissions=True)
24+
frappe.get_doc(
25+
{"doctype": "RC Site User Token", "user": site_user.name, "fcm_token": token}
26+
).insert(ignore_permissions=True)
27+
28+
def test_send_to_users_should_send_once_for_duplicate_users(self):
29+
self.create_site_user_with_token("prathamesh@example.com", "tok-prathamesh")
30+
31+
sent_tokens = []
32+
33+
with patch.object(notification_api, "get_app", return_value=object()), patch.object(
34+
notification_api.messaging,
35+
"Message",
36+
side_effect=lambda **kwargs: SimpleNamespace(token=kwargs["token"]),
37+
), patch.object(
38+
notification_api.messaging,
39+
"send_each",
40+
side_effect=lambda fcm_messages, app=None: (
41+
sent_tokens.extend(message.token for message in fcm_messages)
42+
or SimpleNamespace(failure_count=0, responses=[])
43+
),
44+
):
45+
notification_api._send_to_users(
46+
[{"users": ["prathamesh@example.com", "prathamesh@example.com", "prathamesh@example.com"]}],
47+
self.site_name,
48+
)
49+
50+
log = frappe.get_last_doc("RC Push Notification Log", filters={"site": self.site_name})
51+
52+
self.assertEqual(["tok-prathamesh"], sent_tokens)
53+
self.assertEqual(1, log.number_of_tokens)
54+
55+
def test_send_to_users_should_send_once_for_shared_token(self):
56+
self.create_site_user_with_token("prathamesh@example.com", "tok-shared")
57+
self.create_site_user_with_token("aditya@example.com", "tok-shared")
58+
59+
sent_tokens = []
60+
61+
with patch.object(notification_api, "get_app", return_value=object()), patch.object(
62+
notification_api.messaging,
63+
"Message",
64+
side_effect=lambda **kwargs: SimpleNamespace(token=kwargs["token"]),
65+
), patch.object(
66+
notification_api.messaging,
67+
"send_each",
68+
side_effect=lambda fcm_messages, app=None: (
69+
sent_tokens.extend(message.token for message in fcm_messages)
70+
or SimpleNamespace(failure_count=0, responses=[])
71+
),
72+
):
73+
notification_api._send_to_users(
74+
[{"users": ["prathamesh@example.com", "aditya@example.com"]}],
75+
self.site_name,
76+
)
77+
78+
log = frappe.get_last_doc("RC Push Notification Log", filters={"site": self.site_name})
79+
80+
self.assertEqual(["tok-shared"], sent_tokens)
81+
self.assertEqual(1, log.number_of_tokens)

raven_cloud/utils/notification.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# import frappe
1+
import frappe
22

33
def sanitize_fcm_data(data_dict):
44
"""
@@ -7,9 +7,19 @@ def sanitize_fcm_data(data_dict):
77
"""
88
if not data_dict:
99
return None
10-
10+
1111
sanitized = {}
1212
for key, value in data_dict.items():
1313
# convert None to empty string, everything else to string
1414
sanitized[key] = str(value) if value is not None else ""
15-
return sanitized
15+
return sanitized
16+
17+
18+
def get_background_job_id(messages, site_name: str) -> str:
19+
first = (messages or [{}])[0]
20+
data = first.get("data") or {}
21+
22+
channel_id = data.get("channel_id") or first.get("channel_id") or "no-channel"
23+
message_id = data.get("message_id") or first.get("message_id") or "no-message"
24+
25+
return f"rc_push_{frappe.scrub(site_name)}_{frappe.scrub(channel_id)}_{frappe.scrub(message_id)}"

0 commit comments

Comments
 (0)