-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessage_processor.py
More file actions
336 lines (266 loc) · 12.4 KB
/
message_processor.py
File metadata and controls
336 lines (266 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
"""
Asynchronous message processing for Coreflux MCP Server
This module provides non-blocking message processing capabilities
to improve server performance under high message loads.
"""
import asyncio
import threading
import queue
import logging
import time
import json
from typing import Dict, List, Optional, Callable, Any
from datetime import datetime, timedelta
from collections import defaultdict, deque
class MessageProcessor:
"""Asynchronous message processor for MQTT messages"""
def __init__(self, logger: logging.Logger, max_buffer_size: int = 10000):
self.logger = logger
self.max_buffer_size = max_buffer_size
# Message buffers
self.message_queue = queue.Queue(maxsize=max_buffer_size)
self.processed_messages = defaultdict(lambda: deque(maxlen=100)) # Keep last 100 per topic
# Processing statistics
self.stats = {
'messages_received': 0,
'messages_processed': 0,
'messages_dropped': 0,
'processing_errors': 0,
'start_time': datetime.now()
}
# Processing control
self.processing_active = False
self.processor_thread = None
self.message_handlers: Dict[str, Callable] = {}
# Rate limiting
self.rate_limits = defaultdict(lambda: deque(maxlen=100)) # Track message times per topic
self.rate_limit_threshold = 50 # Messages per minute per topic
def start_processing(self):
"""Start the message processing thread"""
if self.processing_active:
self.logger.warning("Message processor already running")
return
self.processing_active = True
self.processor_thread = threading.Thread(
target=self._process_messages_loop,
name="MessageProcessor",
daemon=True
)
self.processor_thread.start()
self.logger.info("Message processor started")
def stop_processing(self):
"""Stop the message processing thread"""
if not self.processing_active:
return
self.processing_active = False
if self.processor_thread and self.processor_thread.is_alive():
self.processor_thread.join(timeout=5.0)
self.logger.info("Message processor stopped")
def register_handler(self, topic_pattern: str, handler: Callable[[str, str, Dict], None]):
"""Register a message handler for a topic pattern"""
self.message_handlers[topic_pattern] = handler
self.logger.debug(f"Registered handler for topic pattern: {topic_pattern}")
def add_message(self, topic: str, payload: str, metadata: Optional[Dict] = None) -> bool:
"""
Add a message to the processing queue
Returns True if message was added, False if queue is full
"""
if not self._check_rate_limit(topic):
self.stats['messages_dropped'] += 1
self.logger.warning(f"Rate limit exceeded for topic: {topic}")
return False
message = {
'topic': topic,
'payload': payload,
'metadata': metadata or {},
'timestamp': datetime.now(),
'id': f"{topic}_{int(time.time() * 1000)}"
}
try:
self.message_queue.put_nowait(message)
self.stats['messages_received'] += 1
return True
except queue.Full:
self.stats['messages_dropped'] += 1
self.logger.warning(f"Message queue full, dropping message for topic: {topic}")
return False
def get_recent_messages(self, topic: str, limit: int = 10) -> List[Dict]:
"""Get recent messages for a topic"""
messages = list(self.processed_messages.get(topic, []))
return messages[-limit:] if messages else []
def get_all_recent_messages(self, limit: int = 50) -> List[Dict]:
"""Get recent messages from all topics"""
all_messages = []
for topic, messages in self.processed_messages.items():
all_messages.extend([{**msg, 'topic': topic} for msg in messages])
# Sort by timestamp and return most recent
all_messages.sort(key=lambda x: x.get('timestamp', datetime.min), reverse=True)
return all_messages[:limit]
def get_statistics(self) -> Dict[str, Any]:
"""Get processing statistics"""
uptime = datetime.now() - self.stats['start_time']
queue_size = self.message_queue.qsize()
return {
**self.stats,
'uptime_seconds': int(uptime.total_seconds()),
'queue_size': queue_size,
'queue_utilization': f"{(queue_size / self.max_buffer_size) * 100:.1f}%",
'processing_rate': self._calculate_processing_rate(),
'active_topics': len(self.processed_messages),
'registered_handlers': len(self.message_handlers)
}
def clear_buffers(self):
"""Clear all message buffers"""
# Clear processed messages
self.processed_messages.clear()
# Clear message queue
while not self.message_queue.empty():
try:
self.message_queue.get_nowait()
except queue.Empty:
break
# Reset statistics
self.stats.update({
'messages_received': 0,
'messages_processed': 0,
'messages_dropped': 0,
'processing_errors': 0,
'start_time': datetime.now()
})
self.logger.info("Message buffers cleared")
def _process_messages_loop(self):
"""Main message processing loop"""
self.logger.info("Message processing loop started")
while self.processing_active:
try:
# Get message with timeout to allow checking processing_active
message = self.message_queue.get(timeout=1.0)
self._process_single_message(message)
self.stats['messages_processed'] += 1
except queue.Empty:
continue # Timeout reached, check if we should continue
except Exception as e:
self.stats['processing_errors'] += 1
self.logger.error(f"Error in message processing loop: {str(e)}")
self.logger.info("Message processing loop stopped")
def _process_single_message(self, message: Dict):
"""Process a single message"""
topic = message['topic']
payload = message['payload']
metadata = message['metadata']
try:
# Store in processed messages buffer
processed_msg = {
'payload': payload,
'timestamp': message['timestamp'],
'id': message['id'],
'metadata': metadata
}
self.processed_messages[topic].append(processed_msg)
# Find and call appropriate handlers
handlers_called = 0
for pattern, handler in self.message_handlers.items():
if self._topic_matches_pattern(topic, pattern):
try:
handler(topic, payload, metadata)
handlers_called += 1
except Exception as e:
self.logger.error(f"Error in message handler for {pattern}: {str(e)}")
if handlers_called == 0:
self.logger.debug(f"No handlers found for topic: {topic}")
except Exception as e:
self.logger.error(f"Error processing message for topic {topic}: {str(e)}")
def _check_rate_limit(self, topic: str) -> bool:
"""Check if message rate limit is exceeded for a topic"""
now = datetime.now()
cutoff = now - timedelta(minutes=1)
# Clean old entries
topic_times = self.rate_limits[topic]
while topic_times and topic_times[0] < cutoff:
topic_times.popleft()
# Check rate limit
if len(topic_times) >= self.rate_limit_threshold:
return False
# Add current time
topic_times.append(now)
return True
def _calculate_processing_rate(self) -> float:
"""Calculate messages processed per second"""
uptime = datetime.now() - self.stats['start_time']
if uptime.total_seconds() == 0:
return 0.0
return self.stats['messages_processed'] / uptime.total_seconds()
def _topic_matches_pattern(self, topic: str, pattern: str) -> bool:
"""Check if a topic matches a pattern (supports MQTT wildcards)"""
# Simple wildcard matching for MQTT topics
# + matches single level, # matches multiple levels
if pattern == '#':
return True
topic_parts = topic.split('/')
pattern_parts = pattern.split('/')
# If pattern doesn't end with #, lengths must match
if not pattern.endswith('#') and len(topic_parts) != len(pattern_parts):
return False
for i, pattern_part in enumerate(pattern_parts):
if pattern_part == '#':
return True # # matches everything after this point
if i >= len(topic_parts):
return False
if pattern_part != '+' and pattern_part != topic_parts[i]:
return False
return True
class MessagePersistence:
"""Handle message persistence to prevent memory issues"""
def __init__(self, logger: logging.Logger, max_memory_messages: int = 1000):
self.logger = logger
self.max_memory_messages = max_memory_messages
self.memory_storage = defaultdict(lambda: deque(maxlen=100))
self.overflow_storage = {} # Could be implemented with SQLite or file storage
def store_message(self, topic: str, message: Dict) -> bool:
"""Store a message, using memory or persistence as needed"""
try:
self.memory_storage[topic].append(message)
# Check if we need to move old messages to persistent storage
total_messages = sum(len(deque_obj) for deque_obj in self.memory_storage.values())
if total_messages > self.max_memory_messages:
self._move_to_persistent_storage()
return True
except Exception as e:
self.logger.error(f"Error storing message: {str(e)}")
return False
def get_messages(self, topic: str, limit: int = 10) -> List[Dict]:
"""Retrieve messages for a topic"""
# Get from memory first
memory_messages = list(self.memory_storage.get(topic, []))
# If we need more and have persistent storage, get from there
if len(memory_messages) < limit and topic in self.overflow_storage:
# Implementation would retrieve from persistent storage
pass
return memory_messages[-limit:]
def _move_to_persistent_storage(self):
"""Move older messages to persistent storage"""
# This could be implemented with SQLite, files, or external storage
# For now, we just remove oldest messages
for topic, messages in self.memory_storage.items():
if len(messages) > 50: # Keep only recent 50 in memory
# In a full implementation, save the removed messages
removed_count = len(messages) - 50
for _ in range(removed_count):
if messages:
messages.popleft()
self.logger.debug("Moved older messages to make room in memory")
# Global message processor instance
_message_processor: Optional[MessageProcessor] = None
def get_message_processor(logger: logging.Logger) -> MessageProcessor:
"""Get or create the global message processor instance"""
global _message_processor
if _message_processor is None:
_message_processor = MessageProcessor(logger)
_message_processor.start_processing()
return _message_processor
def shutdown_message_processor():
"""Shutdown the global message processor"""
global _message_processor
if _message_processor is not None:
_message_processor.stop_processing()
_message_processor = None