|
22 | 22 | from monarch._src.actor.actor_mesh import ActorMesh |
23 | 23 | from monarch._src.actor.shape import Extent |
24 | 24 |
|
25 | | -from monarch.actor import Actor, endpoint, HostMesh, ProcMesh, this_host |
| 25 | +from monarch.actor import ( |
| 26 | + Actor, |
| 27 | + endpoint, |
| 28 | + HostMesh, |
| 29 | + ProcMesh, |
| 30 | + shutdown_context, |
| 31 | + this_host, |
| 32 | +) |
26 | 33 |
|
27 | 34 | from monarch.tools import commands |
28 | 35 | from monarch.utils import setup_env_for_distributed |
@@ -486,6 +493,21 @@ async def shutdown_all_allocations(self): |
486 | 493 | self._registered_actors.clear() |
487 | 494 | self._registered_services.clear() |
488 | 495 |
|
| 496 | + # -- HostMeshes (including the implicit local host) --- |
| 497 | + logger.info(f"Shutting down {len(self._host_mesh_map)} HostMesh(es)...") |
| 498 | + results = await asyncio.gather( |
| 499 | + *[host_mesh.shutdown() for host_mesh in self._host_mesh_map.values()], |
| 500 | + return_exceptions=True, |
| 501 | + ) |
| 502 | + for (name, _), result in zip(self._host_mesh_map.items(), results, strict=True): |
| 503 | + if isinstance(result, Exception): |
| 504 | + logger.warning(f"Failed to shutdown HostMesh {name}: {result}") |
| 505 | + self._host_mesh_map.clear() |
| 506 | + try: |
| 507 | + await shutdown_context() |
| 508 | + except Exception as e: |
| 509 | + logger.warning(f"Failed to shutdown context: {e}") |
| 510 | + |
489 | 511 | async def shutdown(self): |
490 | 512 | """Tears down all remaining remote allocations.""" |
491 | 513 | await self.shutdown_all_allocations() |
|
0 commit comments