-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgatt_manager.py
More file actions
executable file
·250 lines (195 loc) · 7.28 KB
/
gatt_manager.py
File metadata and controls
executable file
·250 lines (195 loc) · 7.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#!/usr/bin/env python3
import argparse
import gatt
import pynmea2
import logging
import redis
from datetime import datetime
logger = logging.getLogger(__name__)
r = redis.Redis()
data_session = r.incr('sessioncounter')
class GenericDevice(gatt.Device):
def connect_succeeded(self):
super().connect_succeeded()
logger.info("Connected to [{}]".format(self.mac_address))
def connect_failed(self, error):
super().connect_failed(error)
logger.info("Connection failed [{}]: {}".format(self.mac_address, error))
def disconnect_succeeded(self):
super().disconnect_succeeded()
logger.info("Disconnected [{}]".format(self.mac_address))
def services_resolved(self):
super().services_resolved()
logger.info("Resolved services [{}]".format(self.mac_address))
for service in self.services:
logger.info("\t[{}] Service [{}]".format(self.mac_address, service.uuid))
for characteristic in service.characteristics:
logger.info("\t\tCharacteristic [{}]".format(characteristic.uuid))
def characteristic_enable_notifications_succeeded(self, characteristic):
logger.debug('Successfully enabled notifications for chrstc [{}]'.format(characteristic.uuid))
def characteristic_enable_notifications_failed(self, characteristic, error):
logger.debug('Failed to enabled notifications for chrstc [{}]: {}'.format(characteristic.uuid, error))
def find_service(self, uuid):
for service in self.services:
if service.uuid == uuid:
return service
return None
def find_characteristic(self, service, uuid):
for chrstc in service.characteristics:
if chrstc.uuid == uuid:
return chrstc
return None
class GPSDevice(GenericDevice):
SERVICE_UUID_UART = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
CHARACTERISTIC_UUID_TX = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
CHARACTERISTIC_UUID_RX = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
class NextPackage:
def __init__(self):
self.datestamp = None
self.timestamp = None
self.lat = None
self.lng = None
self.alt = None
self.gps_qual = 0
self.num_sats = 0
self.h_dil = None
self.speed = None
self.dir = None
def __repr__(self):
msg = '{}(datestamp={},timestamp={},lat={},lng={},alt={},gps_qual={},num_sats={},speed={},dir={})'
return msg.format(
type(self).__name__,
repr(self.datestamp),
repr(self.timestamp),
repr(self.lat),
repr(self.lng),
repr(self.alt),
repr(self.gps_qual),
repr(self.num_sats),
repr(self.speed),
repr(self.dir))
def is_valid(self):
return (self.datestamp is not None and
self.timestamp is not None and
self.lat is not None and
self.lng is not None and
self.alt is not None and
self.speed is not None and
self.dir is not None)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.buffer = ''
self.next_package = self.NextPackage()
def services_resolved(self):
super().services_resolved()
service = self.find_service(self.SERVICE_UUID_UART)
chrstc = self.find_characteristic(service, self.CHARACTERISTIC_UUID_RX)
chrstc.enable_notifications()
def characteristic_value_updated(self, characteristic, value):
self.buffer += value.decode()
split = self.buffer.splitlines()
# Fix buffer and split so that we don't have half lines
if len(split) >= 0:
if len(split[-1]) >= 3 and split[-1][-3] == '*':
self.buffer = ''
else:
self.buffer = split[-1]
split = split[0:-1]
for line in split:
try:
msg = pynmea2.parse(line)
logger.debug(line)
# Parse input
if isinstance(msg, pynmea2.GGA):
if msg.gps_qual == 0:
logger.debug('No lock')
logger.debug(repr(msg))
continue
self.next_package.timestamp = msg.timestamp
self.next_package.lat = msg.lat + msg.lat_dir
self.next_package.lng = msg.lon + msg.lon_dir
self.next_package.alt = '{:.2f}{}'.format(msg.altitude, msg.altitude_units)
self.next_package.gps_qual = msg.gps_qual
self.next_package.num_sats = msg.num_sats
elif isinstance(msg, pynmea2.RMC):
self.next_package.datestamp = msg.datestamp
self.next_package.speed = msg.spd_over_grnd * 1.852 # Convert to KM/H
self.next_package.dir = msg.true_course
if (self.next_package.is_valid()):
logger.debug(str(self.next_package))
# Do redis stuff here
keyid = datetime.combine(
self.next_package.datestamp,
self.next_package.timestamp
).isoformat()
r.rpush('gpsentries', keyid)
r.hmset('gpsdata:{}'.format(keyid),
{
'session' : data_session,
'latitude' : self.next_package.lat,
'longitude' : self.next_package.lng,
'altitude' : self.next_package.alt,
'speed' : self.next_package.speed,
'direction' : self.next_package.dir
})
self.next_package = self.NextPackage()
except pynmea2.ChecksumError as e:
((msg, line),) = e.args
logger.info('NMEA: {}: Skipping sentence'.format(msg))
logger.debug('\t{}'.format(line))
except pynmea2.ParseError as e:
((msg, line),) = e.args
logger.warn('NMEA: {}: Skipping sentence'.format(msg))
logger.debug('\t{}'.format(line))
class IMUDevice(GenericDevice):
SERVICE_UUID_UART = "0000ffe0-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_UART = "0000ffe1-0000-1000-8000-00805f9b34fb"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.buffer = ''
def services_resolved(self):
super().services_resolved()
service = self.find_service(self.SERVICE_UUID_UART)
chrstc = self.find_characteristic(service, self.CHARACTERISTIC_UUID_UART)
chrstc.enable_notifications()
def characteristic_value_updated(self, characteristic, value):
self.buffer += value.decode()
split = self.buffer.splitlines()
self.buffer = split[-1]
for i in split[0:-1]:
# Need to parse input and push accel x y z, gyro x y z, mag x y z and temp
pass
class AnyDeviceManager(gatt.DeviceManager):
# For now, device mac addresses are hard coded into the program. An enhancement for the
# future is a registration system, whereby nodes could be registered with this edge device
# to add their mac addresses to a database.
registered_device_macs = {
"3c:71:bf:84:b3:86" : GPSDevice
}
def device_discovered(self, device):
#logging.debug('Found device [{}] type "{}"'.format(device.mac_address, type(device)))
if type(device) is not gatt.Device and not device.is_connected():
logging.debug('Attempting connection with [{}]'.format(device.mac_address))
device.connect()
def make_device(self, mac_address):
if mac_address not in self.registered_device_macs:
return gatt.Device(manager=self, mac_address=mac_address)
return self.registered_device_macs[mac_address](manager=self, mac_address=mac_address)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='count', default=0)
args = parser.parse_args()
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
logging.basicConfig(level=levels[min(len(levels) - 1, args.verbose)])
logger.warning('Level WARNING')
logger.info('Level INFO')
logger.debug('Level DEBUG')
manager = AnyDeviceManager('hci0')
manager.start_discovery()
try:
manager.run()
except KeyboardInterrupt:
for device in manager.devices():
if device.is_connected():
device.disconnect()
manager.stop()