-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Expand file tree
/
Copy pathfay_booter.py
More file actions
445 lines (388 loc) · 17.4 KB
/
fay_booter.py
File metadata and controls
445 lines (388 loc) · 17.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
#核心启动模块
import time
import os
import re
import pyaudio
import socket
import requests
from core.interact import Interact
from core.recorder import Recorder
from scheduler.thread_manager import MyThread
from utils import util, config_util, stream_util
from core.wsa_server import MyServer
from core import wsa_server
from core import socket_bridge_service
# from llm.nlp_cognitive_stream import save_agent_memory
# Global state shared by start()/stop() and the worker threads in this module.
feiFei = None  # core FeiFei instance, created by start()
recorderListener = None  # local-microphone RecorderListener; None until start() creates one
__running = False  # True between start() and stop(); polled by the service loops
deviceSocketServer = None  # listening socket for remote audio devices (bound to port 10001)
DeviceInputListenerDict = {}  # "ip:port" -> DeviceInputListener for each connected remote device
ngrok = None  # NOTE(review): never assigned anywhere visible in this file
socket_service_instance = None  # socket_bridge_service instance, created by start()
mcp_sse_server = None  # uvicorn.Server hosting the built-in MCP SSE app
mcp_sse_thread = None  # thread running mcp_sse_server.run


def _read_bool_env(name, default):
    """Return environment variable *name* interpreted as a boolean.

    Unset or blank -> *default*; "0", "false", "no", "off" (case-insensitive)
    -> False; any other value -> True.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    return raw.strip().lower() not in ("0", "false", "no", "off")


# Whether start() launches the built-in MCP SSE server. Enabled by default (as
# before); set FAY_MCP_SSE_ENABLE=0 to disable it, e.g. to avoid port/proxy
# conflicts. The original comment claimed "off by default" while the value was
# True, and the env var mentioned in start()'s log message was never read.
mcp_sse_enabled = _read_bool_env("FAY_MCP_SSE_ENABLE", True)
# Lazy accessor for the core.fay_core module.
def get_fay_core():
    """Import ``core.fay_core`` at call time and return the module object.

    The import is deferred instead of living at the top of this file.
    NOTE(review): presumably to avoid an import cycle or start-up cost -- confirm.
    Python caches imported modules, so repeated calls return the same object.
    """
    from core import fay_core as fay_core_module
    return fay_core_module
# Startup state.
def is_running():
    """Report whether the services are currently up.

    Returns the module-level ``__running`` flag: start() sets it to True and
    stop() sets it back to False.
    """
    current_state = __running
    return current_state
# Record microphone audio input and pass it to aliyun.
class RecorderListener(Recorder):
    """Recorder that captures audio from the system default input device via PyAudio.

    Recognized text (on_speaking) is forwarded to the module-level ``feiFei``
    core as a "mic" Interact.
    """

    def __init__(self, device, fei):
        """
        Args:
            device: stored in ``self.__device``; not read by any method shown here.
            fei: passed through to ``Recorder.__init__``.
        """
        self.__device = device
        self.__FORMAT = pyaudio.paInt16
        # Keeps the __pyaudio_clear thread alive; set to True once a stream is open.
        self.__running = False
        self.username = 'User'
        # Meant to be filled in by get_stream from the real device; currently only
        # channels is (the sample_rate assignment there is commented out).
        self.channels = None
        self.sample_rate = None
        # NOTE(review): the base initializer runs after the fields above, so it may
        # overwrite some of them (e.g. sample_rate) -- confirm in core.recorder.
        super().__init__(fei)

    def on_speaking(self, text):
        """Forward recognized text to the core when it is longer than one character."""
        if len(text) > 1:
            interact = Interact("mic", 1, {'user': 'User', 'msg': text})
            util.printInfo(3, "语音", '{}'.format(interact.data["msg"]), time.time())
            feiFei.on_interact(interact)

    def get_stream(self):
        """Wait until recording is enabled in the config, then open the default microphone.

        Returns:
            ``self.stream``, the PyAudio input stream.

        On any error the problem is logged and the method sleeps 10 s before
        returning ``self.stream``. NOTE(review): if ``self.stream`` was never
        assigned (here or by the base class), that return raises AttributeError.
        """
        try:
            # Reload the config every 100 ms until recording is switched on.
            while True:
                config_util.load_config()
                record = config_util.config['source']['record']
                if record['enabled']:
                    break
                time.sleep(0.1)
            self.paudio = pyaudio.PyAudio()
            # Query the default input device.
            default_device = self.paudio.get_default_input_device_info()
            self.channels = min(int(default_device.get('maxInputChannels', 1)), 2) # use at most 2 channels
            # self.sample_rate = int(default_device.get('defaultSampleRate', 16000))
            # NOTE(review): with the line above commented out, sample_rate is whatever
            # __init__ / the base class left it -- confirm it is valid before open().
            util.printInfo(1, "系统", f"默认麦克风信息 - 采样率: {self.sample_rate}Hz, 通道数: {self.channels}")
            # Open the system default microphone.
            self.stream = self.paudio.open(
                format=self.__FORMAT,
                channels=self.channels,
                rate=self.sample_rate,
                input=True,
                frames_per_buffer=1024
            )
            self.__running = True
            # Background thread that closes the stream once __running drops to False.
            MyThread(target=self.__pyaudio_clear).start()
        except Exception as e:
            util.log(1, f"打开麦克风时出错: {str(e)}")
            util.printInfo(1, self.username, "请检查录音设备是否有误,再重新启动!")
            time.sleep(10)
        return self.stream

    def __pyaudio_clear(self):
        """Idle while ``self.__running`` is True, then stop and close ``self.stream``.

        The flag is re-checked only every 30 s, so the close can lag stop() by
        up to about 30 s. Errors are logged, not raised.
        """
        try:
            while self.__running:
                time.sleep(30)
        except Exception as e:
            util.log(1, f"音频清理线程出错: {str(e)}")
        finally:
            if hasattr(self, 'stream') and self.stream:
                try:
                    self.stream.stop_stream()
                    self.stream.close()
                except Exception as e:
                    util.log(1, f"关闭音频流时出错: {str(e)}")

    def stop(self):
        """Stop recording and release the microphone.

        Stops the base recorder, clears ``self.__running`` (so __pyaudio_clear can
        finish), waits while ``self.is_reading`` is True, then closes the stream and
        terminates PyAudio. Any error is printed and logged rather than raised.
        NOTE(review): __pyaudio_clear may later close the same stream again; that
        second close is caught by its own try/except.
        """
        super().stop()
        self.__running = False
        time.sleep(0.1)  # give the cleanup thread a moment to react
        try:
            while self.is_reading:  # make sure the mic is not mid-read when we close it
                time.sleep(0.1)
            if self.stream is not None:
                self.stream.stop_stream()
                self.stream.close()
                self.paudio.terminate()
        except Exception as e:
            print(e)
            util.log(1, "请检查设备是否有误,再重新启动!")

    def is_remote(self):
        """Local microphone input, so never remote."""
        return False
# Edit by xszyou on 20230113: record audio input from a remote device and pass it to aliyun.
class DeviceInputListener(Recorder):
    """Recorder fed by a remote audio device over an accepted TCP socket.

    A background thread (run) reads raw bytes from the socket into a
    StreamCache, intercepting two in-band control tags:

    * ``<username>NAME</username>`` sets ``self.username``;
    * ``<output>True|False<output>`` (a closing ``</output>`` is accepted too)
      sets ``self.isOutput``.

    A chunk is written to the cache as audio when it carries no tag, or when a
    tag it carries cannot be parsed.
    """

    # Existing clients send "<output>VALUE<output>"; also accept the
    # well-formed "<output>VALUE</output>".
    _OUTPUT_TAG = re.compile(r"<output>(.*?)</?output>")
    _USERNAME_TAG = re.compile(r"<username>(.*?)</username>")

    def __init__(self, deviceConnector, fei):
        super().__init__(fei)
        self.__running = True
        self.streamCache = None
        # Assign everything run() reads BEFORE starting the thread. The original
        # started the thread first, so run() could race ahead of deviceConnector.
        self.username = 'User'
        self.isOutput = True
        self.deviceConnector = deviceConnector
        self.thread = MyThread(target=self.run)
        self.thread.start()  # start the remote-audio reader thread

    def run(self):
        """Reader loop: move socket bytes into the stream cache until stop() is called."""
        self.streamCache = stream_util.StreamCache(1024*1024*20)
        while self.__running:
            try:
                while self.__running and self.deviceConnector:
                    data = self.deviceConnector.recv(2048)
                    if not data:
                        # recv() returns b"" once the peer has closed the connection;
                        # leave instead of spinning on empty reads every 5 ms.
                        break
                    self.__handle_chunk(data)
                    time.sleep(0.005)
                self.streamCache.clear()
            except Exception as err:
                if self.__running:
                    util.log(1, f"远程音频读取异常({self.username}): {err}")
            time.sleep(1)

    def __handle_chunk(self, data):
        """Apply any control tags in *data*; cache it as audio if it is not a parsed tag."""
        has_username = b"<username>" in data
        has_output = b"<output>" in data
        write_as_audio = not (has_username or has_output)
        if has_username:
            # errors="ignore": a chunk may mix the ASCII tag with binary audio bytes.
            match = self._USERNAME_TAG.search(data.decode("utf-8", errors="ignore"))
            if match:
                self.username = match.group(1)
            else:
                write_as_audio = True
        if has_output:
            match = self._OUTPUT_TAG.search(data.decode("utf-8", errors="ignore"))
            if match:
                self.isOutput = (match.group(1) == "True")
            else:
                write_as_audio = True
        if write_as_audio:
            # Written at most once (the original could write a chunk twice).
            self.streamCache.write(data)

    def on_speaking(self, text):
        """Forward recognized text (longer than one character) to the core as a "socket" interact."""
        global feiFei
        if len(text) > 1:
            interact = Interact("socket", 1, {"user": self.username, "msg": text, "socket": self.deviceConnector})
            util.printInfo(3, "(" + self.username + ")远程音频输入", '{}'.format(interact.data["msg"]), time.time())
            feiFei.on_interact(interact)

    def get_stream(self):
        """Wait for a device socket and the stream cache, then return the cache.

        The base recorder waits for a non-empty stream before it starts
        recording. Waiting for the cache here as well avoids handing out the
        None that exists before run() creates it. The wait ends early if stop()
        is called.
        """
        while self.__running and (not self.deviceConnector or self.streamCache is None):
            time.sleep(0.1)
        return self.streamCache

    def stop(self):
        """Stop recording and tell the reader thread to exit.

        NOTE: a thread blocked in recv() only notices once that call returns.
        """
        super().stop()
        self.__running = False

    def is_remote(self):
        """Audio arrives from a remote device."""
        return True
# Check the connection state of remote audio devices.
def _notify_remote_audio_state(username, connected):
    """Push a remote-audio connection state to the web UI if *username* is connected there.

    Failures are logged, not raised, so a UI problem neither kills the heartbeat
    thread nor is mistaken for a device disconnect.
    """
    try:
        web_instance = wsa_server.get_web_instance()
        if web_instance.is_connected(username):
            web_instance.add_cmd({"remote_audio_connect": connected, "Username" : username})
    except Exception as err:
        util.log(1, f"推送远程音频连接状态失败: {err}")


def device_socket_keep_alive():
    """Heartbeat loop for remote audio devices; runs while ``__running`` is True.

    Every 10 s each connected device is sent a 9-byte heartbeat.
    - Devices whose send fails are removed from DeviceInputListenerDict and
      stopped, and reported as disconnected.
    - Devices whose send succeeds are reported as connected.
    Reports go to the web UI only when that user is connected there.
    """
    while __running:
        dead_keys = []
        # Iterate over a snapshot: the accept thread inserts into the dict
        # concurrently, and a dict resized mid-iteration raises RuntimeError.
        for key, listener in list(DeviceInputListenerDict.items()):
            try:
                listener.deviceConnector.send(b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8')  # heartbeat packet
            except Exception:
                util.printInfo(1, listener.username, "远程音频输入输出设备已经断开:{}".format(key))
                dead_keys.append(key)
                continue
            _notify_remote_audio_state(listener.username, True)
        # Handle every dead device this cycle (the original removed only one per 10 s).
        for key in dead_keys:
            listener = DeviceInputListenerDict.pop(key, None)
            if listener is None:
                continue  # already removed elsewhere, e.g. by stop()
            try:
                listener.stop()
            except Exception as err:
                util.log(1, f"停止远程音频设备监听异常: {err}")
            _notify_remote_audio_state(listener.username, False)
        time.sleep(10)
# Remote audio connections.
def accept_audio_device_output_connect():
    """Accept remote audio devices on TCP 0.0.0.0:10001 and attach a DeviceInputListener to each.

    Also starts the heartbeat thread (device_socket_keep_alive). Loops while
    ``__running`` is True. stop() closes ``deviceSocketServer``, which should
    make a blocked accept() raise so the loop can exit.
    """
    global deviceSocketServer
    deviceSocketServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    deviceSocketServer.bind(("0.0.0.0", 10001))
    deviceSocketServer.listen(1)
    MyThread(target=device_socket_keep_alive).start()  # start heartbeat checks
    while __running:
        try:
            # Accept a TCP connection: returns the new socket and the peer address.
            deviceConnector, addr = deviceSocketServer.accept()
        except Exception as err:
            if not __running:
                break  # listening socket was closed by stop()
            util.log(1, f"远程音频连接接收异常: {err}")
            time.sleep(1)  # avoid a busy spin if accept() keeps failing
            continue
        try:
            deviceInputListener = DeviceInputListener(deviceConnector, feiFei)  # remote device audio in/out
            deviceInputListener.start()
            # Key by the address accept() returned (same ip:port getpeername() reports),
            # so a client that hangs up at once cannot leave an untracked listener.
            peername = "{}:{}".format(addr[0], addr[1])
            DeviceInputListenerDict[peername] = deviceInputListener
            util.log(1,"远程音频{}输入输出设备连接上:{}".format(len(DeviceInputListenerDict), addr))
        except Exception as err:
            util.log(1, f"远程音频设备初始化异常({addr}): {err}")
# The digital-human side fetches the latest auto-play message; if the auto-play
# service is switched off, auto-play stops.
def start_auto_play_service(): #TODO evaluate whether this can be optimized
    """Poll the auto-play server and feed its items to the core as "auto_play" interacts.

    Returns immediately if ``automatic_player_url`` or ``automatic_player_status``
    is missing from the config. Otherwise it loops while ``__running`` is True:

    * It pauses while common wake-word mode has a wake-up match on the local
      recorder.
    * After a server error it waits 60 s before retrying.
    * It polls only while holding ``fay_core.auto_play_lock``, and only when all
      of these hold: auto-play is enabled, ``can_auto_play`` is True, and there
      is somewhere to play (``playSound``, or a connected client for the user).
      It then POSTs ``{"user": user}`` to ``<url>/get_auto_play_item`` and
      forwards the text/audio reply.

    ``can_auto_play`` is cleared before each request and restored here only on
    failure. NOTE(review): after a successful item it is presumably re-armed by
    fay_core once playback ends -- confirm.
    """
    if config_util.config['source'].get('automatic_player_url') is None or config_util.config['source'].get('automatic_player_status') is None:
        return
    url = f"{config_util.config['source']['automatic_player_url']}/get_auto_play_item"
    user = "User" #TODO hard-coded for now
    is_auto_server_error = False
    while __running:
        source_cfg = config_util.config['source']
        # recorderListener is None when local recording was never enabled; the
        # original dereferenced it unconditionally and crashed this thread.
        if (source_cfg['wake_word_enabled'] and source_cfg['wake_word_type'] == 'common'
                and recorderListener is not None and recorderListener.wakeup_matched == True):
            time.sleep(0.01)
            continue
        if is_auto_server_error:
            util.printInfo(1, user, '60s后重连自动播报服务器')
            time.sleep(60)
        # Query the auto-play server.
        fay_core = get_fay_core()
        with fay_core.auto_play_lock:
            source_cfg = config_util.config['source']  # re-read: the config may have been reloaded
            if (source_cfg['automatic_player_status'] and source_cfg['automatic_player_url'] is not None
                    and fay_core.can_auto_play == True
                    and (config_util.config["interact"]["playSound"] or wsa_server.get_instance().is_connected(user))):
                fay_core.can_auto_play = False
                post_data = {"user": user}
                try:
                    response = requests.post(url, json=post_data, timeout=5)
                    if response.status_code == 200:
                        is_auto_server_error = False
                        data = response.json()
                        audio_url = data.get('audio')
                        if not audio_url or audio_url.strip()[0:4] != "http":
                            audio_url = None
                        response_text = data.get('text')
                        if audio_url is None and (response_text is None or '' == response_text.strip()):
                            # Nothing to play. NOTE(review): can_auto_play stays False here,
                            # as in the original -- confirm fay_core re-arms it.
                            continue
                        interact = Interact("auto_play", 2, {'user': user, 'text': response_text, 'audio': audio_url})
                        util.printInfo(1, user, '自动播报:{},{}'.format(response_text, audio_url), time.time())
                        feiFei.on_interact(interact)
                    else:
                        is_auto_server_error = True
                        fay_core.can_auto_play = True
                        util.printInfo(1, user, '请求自动播报服务器失败,错误代码是:{}'.format(response.status_code))
                except (requests.exceptions.RequestException, ValueError) as e:
                    # ValueError covers a non-JSON body from response.json() on older
                    # requests versions, which previously killed this thread.
                    is_auto_server_error = True
                    fay_core.can_auto_play = True
                    util.printInfo(1, user, '请求自动播报服务器失败,错误信息是:{}'.format(e))
        time.sleep(0.01)
# Stop services.
def stop():
    """Shut down everything start() launched, on a best-effort basis.

    Each step is isolated, so one failure (e.g. a socket that was never
    created) no longer skips the remaining cleanup. Failures are logged.
    """
    global feiFei
    global recorderListener
    global __running
    global DeviceInputListenerDict
    global socket_service_instance
    global deviceSocketServer
    global mcp_sse_server
    global mcp_sse_thread
    util.log(1, '正在关闭服务...')
    __running = False  # the service loops poll this flag and exit
    # Shut down the MCP SSE server.
    try:
        if mcp_sse_server is not None:
            util.log(1, '正在关闭MCP SSE服务器...')
            try:
                mcp_sse_server.should_exit = True  # ask uvicorn to leave its serve loop
            except Exception:
                pass
            if mcp_sse_thread is not None and mcp_sse_thread.is_alive():
                mcp_sse_thread.join(timeout=2)
            util.log(1, 'MCP SSE服务器已关闭')
    except Exception as e:
        util.log(1, f'MCP SSE服务器关闭异常: {e}')
    # Disconnect all MCP service connections.
    util.log(1, '正在断开所有MCP服务连接...')
    try:
        from faymcp import mcp_service
        mcp_service.disconnect_all_mcp_servers()
        util.log(1, '所有MCP服务连接已断开')
    except Exception as e:
        util.log(1, f'断开MCP服务连接失败: {str(e)}')
    # Persist agent memory.
    util.log(1, '正在保存代理记忆...')
    try:
        from llm.nlp_cognitive_stream import save_agent_memory
        save_agent_memory()
        util.log(1, '代理记忆保存成功')
    except Exception as e:
        util.log(1, f'保存代理记忆失败: {str(e)}')
    if recorderListener is not None:
        util.log(1, '正在关闭录音服务...')
        try:
            recorderListener.stop()
        except Exception as e:
            util.log(1, f'关闭录音服务异常: {e}')
        time.sleep(0.1)
    util.log(1, '正在关闭远程音频输入输出服务...')
    for key in list(DeviceInputListenerDict.keys()):
        listener = DeviceInputListenerDict.pop(key, None)
        if listener is None:
            continue  # removed concurrently by the heartbeat thread
        try:
            listener.stop()
        except Exception as e:
            util.log(1, f'关闭远程音频设备异常({key}): {e}')
    if deviceSocketServer is not None:
        try:
            # close() from another thread is not guaranteed to wake a blocked
            # accept() (notably on Linux); shutdown() first makes it return.
            deviceSocketServer.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # some platforms reject shutdown() on a listening socket
        try:
            deviceSocketServer.close()
        except OSError as e:
            util.log(1, f'关闭远程音频监听端口异常: {e}')
    if socket_service_instance is not None:
        try:
            socket_service_instance.stop_server()
        except Exception as e:
            util.log(1, f'关闭socket桥接服务异常: {e}')
        socket_service_instance = None
    util.log(1, '正在关闭核心服务...')
    if feiFei is not None:
        feiFei.stop()
    util.log(1, '服务已关闭!')
# Start services.
def _warm_up_embedding_service():
    """Initialize the embedding service's dimension at startup rather than on the first message.

    Failures are logged, not raised.
    """
    try:
        util.log(1, '启动阶段预热 embedding 服务...')
        from simulation_engine.gpt_structure import get_text_embedding
        from utils.api_embedding_service import get_embedding_service
        service = get_embedding_service()
        # Skip the probe call when the service already knows its dimension.
        if hasattr(service, 'embedding_dim') and service.embedding_dim is not None:
            util.log(1, f'embedding 服务已初始化,维度: {service.embedding_dim}')
        else:
            util.log(1, '初始化 embedding 服务维度...')
            get_text_embedding("dimension_check")
            util.log(1, f'embedding 服务维度初始化完成: {service.embedding_dim}')
        util.log(1, 'embedding 服务预热完成')
    except Exception as e:
        util.log(1, f'embedding 服务预热失败: {str(e)}')


def _start_mcp_sse_server():
    """Start the built-in MCP SSE server (uvicorn) on a daemon thread.

    Returns:
        ``(server, thread)`` on success, or ``(None, None)`` if startup raised
        (the error is logged), so the caller never keeps a stale handle.
    """
    try:
        from faymcp import mcp_server as fay_mcp_server
        import uvicorn
        util.log(1, f"MCP SSE服务器启动中: http://{fay_mcp_server.HOST}:{fay_mcp_server.PORT}{fay_mcp_server.SSE_PATH}")
        config = uvicorn.Config(
            app=fay_mcp_server.app,
            host=fay_mcp_server.HOST,
            port=fay_mcp_server.PORT,
            log_level="info"
        )
        server = uvicorn.Server(config)
        thread = MyThread(target=server.run, daemon=True)
        thread.start()
        return server, thread
    except Exception as e:
        util.log(1, f"MCP SSE服务器启动异常: {e}")
        return None, None


def start():
    """Boot all services, in order:

    1. Load the config and warm up the embedding service.
    2. Start the core and the memory save/reflection scheduler.
    3. Start the local mic recorder (if recording is enabled in the config).
    4. Start the remote-audio socket server and the socket bridge.
    5. Start auto-play and, if ``mcp_sse_enabled``, the MCP SSE server.
    """
    global feiFei
    global recorderListener
    global __running
    global socket_service_instance
    global mcp_sse_server
    global mcp_sse_thread
    util.log(1, '开启服务...')
    __running = True
    # Load configuration.
    util.log(1, '读取配置...')
    config_util.load_config()
    _warm_up_embedding_service()
    # Core service.
    util.log(1, '开启核心服务...')
    feiFei = get_fay_core().FeiFei()
    feiFei.start()
    util.log(1, '初始化定时保存记忆及反思的任务...')
    from llm.nlp_cognitive_stream import init_memory_scheduler
    init_memory_scheduler()
    # Local microphone recording.
    record = config_util.config['source']['record']
    if record['enabled']:
        util.log(1, '开启录音服务...')
        recorderListener = RecorderListener('device', feiFei)  # listen to the microphone
        recorderListener.start()
    # Voice communication interfaces: remote audio devices and the socket bridge.
    util.log(1,'启动声音沟通接口服务...')
    MyThread(target=accept_audio_device_output_connect).start()
    socket_service_instance = socket_bridge_service.new_instance()
    MyThread(target=socket_service_instance.start_service).start()
    # Auto-play.
    util.log(1,'启动自动播报服务...')
    MyThread(target=start_auto_play_service).start()
    # Built-in MCP SSE server (controlled by mcp_sse_enabled).
    if mcp_sse_enabled:
        mcp_sse_server, mcp_sse_thread = _start_mcp_sse_server()
    else:
        util.log(1, 'MCP SSE服务器默认未开启,设 FAY_MCP_SSE_ENABLE=1 可启用')
    util.log(1, '服务启动完成!')
if __name__ == "__main__":
    # Script entry point: reset the module-level handles, then boot every service.
    ws_server: MyServer = None  # not referenced elsewhere in this file as shown
    # The annotation below is evaluated at module level, so this line already
    # imports core.fay_core via get_fay_core().
    feiFei: get_fay_core().FeiFei = None
    recorderListener: Recorder = None
    start()