|
7 | 7 | from django.core.exceptions import ImproperlyConfigured |
8 | 8 | from django.test import TransactionTestCase, tag |
9 | 9 | from django.urls import reverse |
10 | | -from django.utils.timezone import now, timedelta |
11 | | -from freezegun import freeze_time |
| 10 | +from django.utils.timezone import now |
12 | 11 |
|
13 | 12 | from .. import settings as app_settings |
14 | 13 | from ..tasks import create_mesh_topology |
@@ -216,44 +215,6 @@ def test_mesh_id_changed(self): |
216 | 215 | 3, |
217 | 216 | ) |
218 | 217 |
|
219 | | - def test_discard_old_monitoring_data(self): |
220 | | - now_time = now() |
221 | | - with freeze_time(now_time - timedelta(minutes=20)): |
222 | | - devices, org = self._populate_mesh(SIMPLE_MESH_DATA) |
223 | | - self.assertEqual(Topology.objects.count(), 1) |
224 | | - topology = Topology.objects.first() |
225 | | - self.assertEqual(topology.node_set.count(), 3) |
226 | | - self.assertEqual(topology.link_set.filter(status='up').count(), 3) |
227 | | - |
228 | | - with freeze_time(now_time - timedelta(minutes=10)): |
229 | | - # Only two devices sent monitoring data |
230 | | - for device in devices[:2]: |
231 | | - response = self.client.post( |
232 | | - '{0}?key={1}&time={2}'.format( |
233 | | - reverse('monitoring:api_device_metric', args=[device.id]), |
234 | | - device.key, |
235 | | - now().utcnow().strftime('%d-%m-%Y_%H:%M:%S.%f'), |
236 | | - ), |
237 | | - data=json.dumps( |
238 | | - { |
239 | | - 'type': 'DeviceMonitoring', |
240 | | - 'interfaces': SIMPLE_MESH_DATA[device.mac_address], |
241 | | - } |
242 | | - ), |
243 | | - content_type='application/json', |
244 | | - ) |
245 | | - self.assertEqual(response.status_code, 200) |
246 | | - create_mesh_topology.delay(organization_ids=(org.id,)) |
247 | | - self.assertEqual(Topology.objects.count(), 1) |
248 | | - self.assertEqual(topology.node_set.count(), 3) |
249 | | - self.assertEqual(topology.link_set.filter(status='up').count(), 1) |
250 | | - |
251 | | - # No device is sending monitoring data |
252 | | - create_mesh_topology.delay(organization_ids=(org.id,)) |
253 | | - self.assertEqual(Topology.objects.count(), 1) |
254 | | - self.assertEqual(topology.node_set.count(), 3) |
255 | | - self.assertEqual(topology.link_set.filter(status='up').count(), 0) |
256 | | - |
257 | 218 | def test_topology_admin(self): |
258 | 219 | """ |
259 | 220 | Tests WifiMeshInlineAdmin is present in TopologyAdmin |
|
0 commit comments