forked from camwolff02/groundstation
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
126 lines (111 loc) · 3.82 KB
/
utils.py
File metadata and controls
126 lines (111 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""Boilerplate functions and classes provided by the documentation."""
import sys
import logging
from typing import Set, Type
from traceback import print_exception
from foxglove.websocket import (
ChannelView,
Client,
ClientChannel,
ServerListener,
)
try:
import google.protobuf.message
from google.protobuf.descriptor_pb2 import FileDescriptorSet
from google.protobuf.descriptor import FileDescriptor
except ImportError:
print_exception(*sys.exc_info())
print(
"Unable to import protobuf schemas; \
did you forget to run `pip install 'foxglove-websocket[examples]'`?",
)
sys.exit(1)
def build_file_descriptor_set(
    message_class: Type[google.protobuf.message.Message],
) -> FileDescriptorSet:
    """
    Gather the file descriptor of ``message_class`` and the descriptors of
    its transitive dependencies into one FileDescriptorSet.

    Dependencies are copied into the set before the file that depends on
    them, and a dependency whose name was already seen is skipped.
    """
    visited: Set[str] = set()
    result = FileDescriptorSet()

    def collect(descriptor: FileDescriptor):
        # Depth-first walk: handle every not-yet-seen dependency first,
        # then copy the descriptor itself into the result.
        for dependency in descriptor.dependencies:
            dep_name = dependency.name
            if dep_name in visited:
                continue
            visited.add(dep_name)
            collect(dependency)
        descriptor.CopyToProto(result.file.add())

    root = message_class.DESCRIPTOR.file
    collect(root)
    return result
class CustomListener(ServerListener):
    """
    Server listener that logs client activity and keeps track of which
    topics each client is currently subscribed to.
    """

    def __init__(self) -> None:
        # Map client id -> set of subscribed topics
        self.subscribers: dict[int, set[str]] = {}

    def has_subscribers(self) -> bool:
        """Return True if at least one client has a tracked subscription."""
        return bool(self.subscribers)

    def on_subscribe(
        self,
        client: Client,
        channel: ChannelView,
    ) -> None:
        """
        Called by the server when a client subscribes to a channel.

        We'll use this and on_unsubscribe to simply track if we have any
        subscribers at all.
        """
        logging.info(f"Client {client} subscribed to channel {channel.topic}")
        self.subscribers.setdefault(client.id, set()).add(channel.topic)

    def on_unsubscribe(
        self,
        client: Client,
        channel: ChannelView,
    ) -> None:
        """
        Called by the server when a client unsubscribes from a channel.

        An unsubscribe for a client or topic that is not currently tracked
        is ignored instead of raising KeyError.
        """
        logging.info(
            f"Client {client} unsubscribed from channel {channel.topic}"
        )
        topics = self.subscribers.get(client.id)
        if topics is None:
            # Nothing recorded for this client; nothing to remove.
            return
        topics.discard(channel.topic)
        if not topics:
            # Drop the empty entry so has_subscribers() stays accurate.
            del self.subscribers[client.id]

    def on_client_advertise(
        self,
        client: Client,
        channel: ClientChannel,
    ) -> None:
        """
        Called when a client advertises a new channel.

        Logs the channel id, topic, encoding and schema details.
        """
        logging.info(f"Client {client.id} advertised channel: {channel.id}")
        logging.info(f" Topic: {channel.topic}")
        logging.info(f" Encoding: {channel.encoding}")
        logging.info(f" Schema name: {channel.schema_name}")
        logging.info(f" Schema encoding: {channel.schema_encoding}")
        logging.info(f" Schema: {channel.schema!r}")

    def on_message_data(
        self,
        client: Client,
        client_channel_id: int,
        data: bytes,
    ) -> None:
        """
        This handler demonstrates receiving messages from the client.

        You can send messages from Foxglove app in the publish panel:
        https://docs.foxglove.dev/docs/visualization/panels/publish
        """
        logging.info(
            f"Message from client {client.id} on channel {client_channel_id}"
        )
        logging.info(f"Data: {data!r}")

    def on_client_unadvertise(
        self,
        client: Client,
        client_channel_id: int,
    ) -> None:
        """
        Called when a client unadvertises a channel.
        """
        logging.info(
            f"Client {client.id} unadvertised channel: {client_channel_id}"
        )