@@ -476,6 +476,10 @@ def __init__(self) -> None:
         self._leader_ttl = settings.redis_leader_ttl
         self._leader_heartbeat_interval = settings.redis_leader_heartbeat_interval
         self._leader_heartbeat_task: Optional[asyncio.Task] = None
+        self._follower_election_task: Optional[asyncio.Task] = None
+
+        # Log instance mapping for debugging
+        logger.info(f"Instance started: instance_id={self._instance_id}, port={settings.port}, pid={os.getpid()}")
 
         # Always initialize file lock as fallback (used if Redis connection fails at runtime)
         if settings.cache_type != "none":
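For reference, the diff reads several knobs from `settings`. A minimal sketch of the shape those fields might take is below; the field names come from the diff, but the `Settings` class itself and its defaults are invented for illustration. The one real invariant is that the heartbeat interval must stay comfortably below the leader TTL, or the key can expire between refreshes.

```python
# Hypothetical sketch of the settings fields this diff relies on; the real
# project defines these elsewhere, and these defaults are invented.
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    redis_leader_ttl: int = 30                  # seconds before the leader key expires
    redis_leader_heartbeat_interval: int = 10   # refresh cadence; keep well below the TTL
    cache_type: str = "redis"                   # "none" skips the file-lock fallback
    port: int = 4444
    platform_admin_email: str = "admin@example.com"

settings = Settings()
```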
@@ -588,8 +592,12 @@ async def initialize(self) -> None:
                 logger.info("Acquired Redis leadership. Starting health check and heartbeat tasks.")
                 self._health_check_task = asyncio.create_task(self._run_health_checks(user_email))
                 self._leader_heartbeat_task = asyncio.create_task(self._run_leader_heartbeat())
+            else:
+                # Did not acquire leadership - start follower election loop
+                logger.info("Did not acquire leadership. Starting follower election loop.")
+                self._follower_election_task = asyncio.create_task(self._run_follower_election(user_email))
         else:
-            # Always create the health check task in filelock mode; leader check is handled inside.
+            # No Redis available - always create the health check task in filelock mode
             self._health_check_task = asyncio.create_task(self._run_health_checks(user_email))
     async def shutdown(self) -> None:
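The leadership check that `initialize()` branches on boils down to Redis's atomic `SET ... NX EX`: exactly one instance can create the key, and it expires after the TTL unless the leader refreshes it. A minimal, self-contained sketch of that primitive, assuming redis-py 5.x's asyncio client (the key name and TTL here are illustrative):

```python
import asyncio
import uuid

import redis.asyncio as redis  # assumes redis-py 5.x

async def try_acquire_leadership(client: redis.Redis, key: str, instance_id: str, ttl: int) -> bool:
    # SET with nx=True succeeds for exactly one caller; ex=ttl makes the
    # claim expire unless the leader keeps refreshing it.
    return bool(await client.set(key, instance_id, nx=True, ex=ttl))

async def main() -> None:
    client = redis.Redis(decode_responses=True)
    instance_id = str(uuid.uuid4())
    if await try_acquire_leadership(client, "gateway:leader", instance_id, ttl=30):
        print("acquired leadership")
    else:
        print("another instance holds the leader key")
    await client.aclose()

asyncio.run(main())
```

One detail worth noting: unless the client is created with `decode_responses=True`, `get()` returns bytes, and a comparison against a string instance id (as the heartbeat loop below does) would never match; presumably the project's client decodes responses.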
@@ -608,14 +616,25 @@ async def shutdown(self) -> None:
         >>> len(service._active_gateways)
         0
         """
+        # Cancel follower election FIRST to prevent it from spawning new
+        # health-check / heartbeat tasks while we are tearing down.
+        if getattr(self, "_follower_election_task", None):
+            self._follower_election_task.cancel()
+            try:
+                await self._follower_election_task
+            except asyncio.CancelledError:
+                pass
+
+        # Now safe to cancel health-check and heartbeat (handles may have been
+        # overwritten by follower election just before cancellation; that is fine,
+        # we always cancel whichever task the attribute currently points to).
         if self._health_check_task:
             self._health_check_task.cancel()
             try:
                 await self._health_check_task
             except asyncio.CancelledError:
                 pass
 
-        # Cancel leader heartbeat task if running
         if getattr(self, "_leader_heartbeat_task", None):
             self._leader_heartbeat_task.cancel()
             try:
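The ordering here (follower election first, then whatever the health-check and heartbeat handles currently point to) generalizes to any task that can spawn other tasks: cancel the spawner before its children, or it may recreate them mid-teardown. A toy sketch of the pattern; the names are illustrative, not the project's:

```python
import asyncio

async def spawner(state: dict) -> None:
    # Periodically (re)creates a worker task, like follower election
    # starting health-check / heartbeat tasks after winning leadership.
    while True:
        await asyncio.sleep(0.1)
        if "worker" not in state or state["worker"].done():
            state["worker"] = asyncio.create_task(asyncio.sleep(3600))

async def shutdown(state: dict) -> None:
    # Cancel the spawner first so it cannot create new tasks mid-teardown,
    # then cancel whatever handle remains.
    for name in ("spawner", "worker"):
        task = state.get(name)
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

async def main() -> None:
    state: dict = {}
    state["spawner"] = asyncio.create_task(spawner(state))
    await asyncio.sleep(0.25)  # give the spawner time to create a worker
    await shutdown(state)
    print("all tasks cancelled cleanly")

asyncio.run(main())
```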
@@ -3901,34 +3920,87 @@ def get_first_gateway_by_url(self, db: Session, url: str, team_id: Optional[str]
         return self.convert_gateway_to_read(result)
 
     async def _run_leader_heartbeat(self) -> None:
-        """Run leader heartbeat loop to keep leader key alive.
-
-        This runs independently from health checks to ensure the leader key
-        is refreshed frequently enough (every redis_leader_heartbeat_interval seconds)
-        to prevent expiration during long-running health check operations.
+        """Run leader heartbeat loop with Redis reconnection support.
 
-        The loop exits if this instance loses leadership.
+        Refreshes the leader key TTL every heartbeat interval. Exits and starts
+        follower election if leadership is lost or after consecutive failures.
         """
+        consecutive_failures = 0
+        max_failures = 3
+
         while True:
             try:
                 await asyncio.sleep(self._leader_heartbeat_interval)
 
                 if not self._redis_client:
-                    return
+                    logger.warning("Redis client unavailable in heartbeat")
+                    consecutive_failures += 1
+                    if consecutive_failures >= max_failures:
+                        logger.error("Lost Redis connection, stopping heartbeat")
+                        return
+                    continue
 
                 # Check if we're still the leader
                 current_leader = await self._redis_client.get(self._leader_key)
                 if current_leader != self._instance_id:
                     logger.info("Lost Redis leadership, stopping heartbeat")
+                    self._start_follower_election()
                     return
 
                 # Refresh the leader key TTL
                 await self._redis_client.expire(self._leader_key, self._leader_ttl)
                 logger.debug(f"Leader heartbeat: refreshed TTL to {self._leader_ttl}s")
+                consecutive_failures = 0
+
+            except Exception as e:
+                consecutive_failures += 1
+                logger.warning(f"Leader heartbeat error (failure {consecutive_failures}/{max_failures}): {e}")
+                if consecutive_failures >= max_failures:
+                    logger.error("Too many consecutive heartbeat failures, starting follower election")
+                    self._start_follower_election()
+                    return
+
+    def _start_follower_election(self) -> None:
+        """Start a follower election task if one is not already running."""
+        if self._follower_election_task is None or self._follower_election_task.done():
+            self._follower_election_task = asyncio.create_task(self._run_follower_election(settings.platform_admin_email))
+
+    async def _run_follower_election(self, user_email: str) -> None:
+        """Continuously attempt to acquire leadership when not the leader.
+
+        This runs on follower instances and polls Redis to claim leadership
+        when the current leader key expires or becomes available.
+
+        Args:
+            user_email: Email of the user for OAuth token lookup
+        """
+        retry_interval = max(1, self._leader_ttl // 3)  # Poll at 1/3 of the TTL
+
+        while True:
+            try:
+                await asyncio.sleep(retry_interval)
+
+                if not self._redis_client:
+                    logger.warning("Redis client unavailable, cannot attempt election.")
+                    continue
+
+                # Attempt to acquire leadership
+                is_leader = await self._redis_client.set(self._leader_key, self._instance_id, ex=self._leader_ttl, nx=True)
+
+                if is_leader:
+                    logger.info("Acquired Redis leadership via follower election. Starting health check and heartbeat.")
+                    # Cancel stale tasks from a previous leadership period to prevent
+                    # orphaned loops running alongside the new ones.
+                    if self._health_check_task and not self._health_check_task.done():
+                        self._health_check_task.cancel()
+                    if getattr(self, "_leader_heartbeat_task", None) and not self._leader_heartbeat_task.done():
+                        self._leader_heartbeat_task.cancel()
+                    self._health_check_task = asyncio.create_task(self._run_health_checks(user_email))
+                    self._leader_heartbeat_task = asyncio.create_task(self._run_leader_heartbeat())
+                    return  # Exit follower loop; now running as leader
 
             except Exception as e:
-                logger.warning(f"Leader heartbeat error: {e}")
-                # Continue trying - the main health check loop will handle leadership loss
+                logger.warning(f"Follower election error: {e}", exc_info=True)
 
     async def _run_health_checks(self, user_email: str) -> None:
         """Run health checks periodically,