-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_sse_client.py
More file actions
31 lines (29 loc) · 1005 Bytes
/
test_sse_client.py
File metadata and controls
31 lines (29 loc) · 1005 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import requests, time, json
url = 'http://127.0.0.1:5002/chat_sse?text=Streaming+audio+test+from+client'
print('Connecting to', url)
with requests.get(url, stream=True, timeout=60) as r:
r.raise_for_status()
start = time.time()
partial = None
for line in r.iter_lines(decode_unicode=True):
if time.time() - start > 12:
print('timed out'); break
if not line:
continue
line = line.strip()
if line.startswith('event:'):
ev = line[len('event:'):].strip()
print('EVENT', ev)
continue
if line.startswith('data:'):
data = line[len('data:'):].strip()
print('DATA', data)
try:
if ev == 'audio':
obj = json.loads(data)
print(' got audio obj keys', list(obj.keys()))
else:
pass
except Exception as e:
print(' parse error', e)
print('done')