
Phase 12.3 — CollaborationChannel: real-time pub/sub workspace for cooperating agents #358

@web3guru888

Phase 12.3 — CollaborationChannel: real-time shared workspace for cooperating agents

Parent Phase: Phase 12 — Multi-Agent Collaboration
Depends on: Phase 12.1 AgentRegistry (#352), Phase 12.2 NegotiationEngine (#355)
Next sub-phase: Phase 12.4 ConsensusVoting (planned)


Motivation

Once the NegotiationEngine assigns a task to a winning agent, that agent may need to collaborate with peers in real time: sharing intermediate results, broadcasting state snapshots, and requesting specialised help mid-execution. CollaborationChannel provides a pub/sub workspace scoped to a single collaboration session. It sits between the negotiation layer and the consensus/coalition layers.


Data model

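import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any

class MessageType(Enum):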
    STATE_SNAPSHOT   = auto()   # agent broadcasts its current internal state
    PARTIAL_RESULT   = auto()   # intermediate computation result
    HELP_REQUEST     = auto()   # agent requests assistance from peers
    HELP_RESPONSE    = auto()   # peer responds to a help request
    SYNC_BARRIER     = auto()   # all-or-nothing synchronisation point
    HEARTBEAT        = auto()   # keep-alive / liveness probe

class ChannelStatus(Enum):
    OPEN    = auto()   # accepting posts and subscribers
    CLOSED  = auto()   # no new posts; existing subscribers drain
    EXPIRED = auto()   # idle TTL elapsed; eligible for eviction

@dataclass(frozen=True)
class ChannelMessage:
    message_id:   str
    channel_id:   str
    sender_did:   str
    message_type: MessageType
    payload:      Any
    reply_to:     str | None = None
    timestamp:    float = field(default_factory=time.time)

@dataclass(frozen=True)
class ChannelConfig:
    max_subscribers:    int   = 32
    max_history:        int   = 256
    ttl_seconds:        float = 600.0
    heartbeat_interval: float = 10.0
    require_membership: bool  = True

@dataclass(frozen=True)
class ChannelInfo:
    channel_id:    str
    owner_did:     str
    topic:         str
    status:        ChannelStatus
    member_dids:   frozenset[str]
    message_count: int
    created_at:    float
    last_activity: float
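
The reply_to field is what links a HELP_RESPONSE back to the HELP_REQUEST it answers. A minimal sketch of that threading follows, assuming message IDs are UUID hex strings. The new_message helper and the did:example:* identifiers are hypothetical and not part of the spec above, and Optional[str] stands in for str | None only so the snippet also runs on older Python versions.

```python
import time
import uuid
from dataclasses import FrozenInstanceError, dataclass, field
from enum import Enum, auto
from typing import Any, Optional


class MessageType(Enum):
    STATE_SNAPSHOT = auto()
    PARTIAL_RESULT = auto()
    HELP_REQUEST = auto()
    HELP_RESPONSE = auto()
    SYNC_BARRIER = auto()
    HEARTBEAT = auto()


@dataclass(frozen=True)
class ChannelMessage:
    message_id: str
    channel_id: str
    sender_did: str
    message_type: MessageType
    payload: Any
    reply_to: Optional[str] = None
    timestamp: float = field(default_factory=time.time)


def new_message(channel_id, sender_did, message_type, payload, reply_to=None):
    """Hypothetical helper: allocate a message_id and build the message."""
    return ChannelMessage(
        message_id=uuid.uuid4().hex,  # assumption: IDs are UUID hex strings
        channel_id=channel_id,
        sender_did=sender_did,
        message_type=message_type,
        payload=payload,
        reply_to=reply_to,
    )


# A help request and the response that threads back to it via reply_to.
request = new_message(
    "ch-1", "did:example:alice", MessageType.HELP_REQUEST, {"need": "vision"}
)
response = new_message(
    "ch-1", "did:example:bob", MessageType.HELP_RESPONSE, {"offer": "ok"},
    reply_to=request.message_id,
)
```

Because the dataclass is frozen, assigning to a field after construction (for example request.payload = None) raises FrozenInstanceError, so a message can be handed to many subscribers without defensive copies of the envelope. The payload object itself is not frozen.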

Protocols

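from typing import AsyncIterator, Protocol

class CollaborationChannel(Protocol):
    async def post(self, sender_did, message_type, payload, reply_to=None) -> ChannelMessage: ...
    # Declared with plain `def`: an async-generator implementation returns the
    # iterator directly, so callers write `async for msg in ch.subscribe(did)`.
    def subscribe(self, subscriber_did) -> AsyncIterator[ChannelMessage]: ...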
    async def add_member(self, agent_did) -> None: ...
    async def remove_member(self, agent_did) -> None: ...
    async def close(self) -> None: ...
    async def info(self) -> ChannelInfo: ...

class ChannelManager(Protocol):
    async def create_channel(self, owner_did, topic, initial_members, config=None) -> CollaborationChannel: ...
    async def get_channel(self, channel_id) -> CollaborationChannel: ...
    async def list_channels(self, agent_did) -> list[ChannelInfo]: ...
    async def close_channel(self, channel_id) -> None: ...
    async def start(self) -> None: ...
    async def stop(self) -> None: ...

InMemoryCollaborationChannel

  • Per-subscriber asyncio.Queue(maxsize=512) — when a subscriber's queue is full, messages for that subscriber are dropped rather than blocking the sender
  • Ring-buffer deque(maxlen=max_history), 256 by default, for history replay on late join
  • Membership guard (require_membership=True) — non-members raise PermissionError
  • _heartbeat_loop asyncio task — posts HEARTBEAT every heartbeat_interval seconds
  • Channel auto-expires after ttl_seconds of inactivity (status → EXPIRED)
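
The queue and history bullets above can be sketched as a toy fan-out. This is an illustration under stated assumptions, not the proposed implementation: the FanOut class, its dropped counter, and the choice to drop the newest message on overflow are all hypothetical, since the issue does not say which message is discarded. Messages are plain strings here to keep the example short.

```python
import asyncio
from collections import deque


class FanOut:
    """Toy fan-out: one bounded queue per subscriber plus a history ring buffer."""

    def __init__(self, queue_size=512, max_history=256):
        self._queue_size = queue_size
        self._history = deque(maxlen=max_history)  # oldest entries fall off
        self._queues = {}
        self.dropped = 0  # hypothetical stand-in for a queue-drop counter

    def subscribe(self, subscriber_did):
        queue = asyncio.Queue(maxsize=self._queue_size)
        # Replay history for late joiners, newest entries that fit in the queue.
        for msg in list(self._history)[-self._queue_size:]:
            queue.put_nowait(msg)
        self._queues[subscriber_did] = queue
        return queue

    def post(self, msg):
        self._history.append(msg)
        for queue in self._queues.values():
            try:
                queue.put_nowait(msg)  # never awaits, so the sender cannot block
            except asyncio.QueueFull:
                # Assumption: the new message is dropped for this subscriber.
                self.dropped += 1


async def demo():
    hub = FanOut(queue_size=2, max_history=4)
    hub.post("m0")                             # posted before anyone subscribes
    slow = hub.subscribe("did:example:slow")   # late join: "m0" is replayed
    hub.post("m1")                             # queue now holds m0, m1 (full)
    hub.post("m2")                             # dropped for the slow subscriber
    received = [slow.get_nowait(), slow.get_nowait()]
    return received, hub.dropped


received, dropped = asyncio.run(demo())
```

Using put_nowait instead of await queue.put() is what keeps one stalled consumer from holding up delivery to the others. The cost is that a slow subscriber silently misses messages, which is why a drop counter is worth exposing.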

InMemoryChannelManager

  • create_channel() — allocates UUID, starts _heartbeat_loop
  • get_channel() / list_channels() — lookup by ID or DID membership
  • close_channel() — removes from registry and closes channel
  • _evict_loop() — background task cleaning CLOSED/EXPIRED channels every 60 s
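
One possible shape for the eviction task is sketched below. It assumes, for brevity, a registry that maps channel IDs directly to a ChannelStatus; a real manager would hold channel objects and query their status. The evict_once and evict_loop names are hypothetical.

```python
import asyncio
from enum import Enum, auto


class ChannelStatus(Enum):
    OPEN = auto()
    CLOSED = auto()
    EXPIRED = auto()


def evict_once(registry):
    """Remove CLOSED/EXPIRED entries in place; return the evicted channel IDs."""
    stale = [cid for cid, status in registry.items()
             if status is not ChannelStatus.OPEN]
    for cid in stale:
        del registry[cid]
    return stale


async def evict_loop(registry, interval=60.0):
    """Background task: sweep the registry every `interval` seconds."""
    while True:
        await asyncio.sleep(interval)
        evict_once(registry)


async def demo():
    registry = {
        "a": ChannelStatus.OPEN,
        "b": ChannelStatus.CLOSED,
        "c": ChannelStatus.EXPIRED,
    }
    # Short interval so the demo finishes quickly; the issue specifies 60 s.
    task = asyncio.create_task(evict_loop(registry, interval=0.01))
    await asyncio.sleep(0.05)
    task.cancel()  # what a manager's stop() would presumably do
    try:
        await task
    except asyncio.CancelledError:
        pass
    return registry


remaining = asyncio.run(demo())
```

Collecting the stale IDs first and deleting afterwards avoids mutating the dict while iterating over it. Keeping the sweep in a synchronous evict_once also makes it testable without waiting on the timer.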

CognitiveCycle integration

async def _open_collaboration_channel(
    self, task_id, winner_did, peer_dids
) -> None:
    ch = await self._channel_manager.create_channel(
        owner_did       = self.agent_did,
        topic           = f"task:{task_id}",
        initial_members = [winner_did, *peer_dids],  # accepts any iterable of DIDs
    )
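    self._active_channels[task_id] = ch
    # Read the ID through the public info() accessor rather than a private attribute.
    info = await ch.info()
    await self._federated_task_router.dispatch(
        task_id, winner_did, collaboration_channel_id=info.channel_id
    )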

Prometheus metrics (5)

Metric                              Type     Labels
asi_collab_channels_total           Counter  (none)
asi_collab_channels_evicted_total   Counter  (none)
asi_collab_messages_total           Counter  channel, type
asi_collab_active_subscribers       Gauge    channel
asi_collab_queue_drop_total         Counter  channel

Test targets (12)

  1. test_post_and_receive — single subscriber receives posted message
  2. test_late_join_history_replay — history deque replayed on subscribe
  3. test_require_membership_guard — non-member post raises PermissionError
  4. test_max_subscribers_cap — 33rd subscriber raises RuntimeError
  5. test_slow_subscriber_drop — full queue drops without blocking sender
  6. test_heartbeat_posted — heartbeat message delivered after interval
  7. test_ttl_expiry — channel transitions to EXPIRED after idle TTL
  8. test_close_drains_subscribers — close() terminates subscribe generators
  9. test_reply_threading — reply_to field chains messages correctly
  10. test_create_and_get_channel — manager round-trip via channel_id
  11. test_list_channels_by_member — list_channels() filters by DID membership
  12. test_evict_loop_removes_expired — eviction task cleans up expired channels
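
As a rough illustration of how targets 3 and 4 might be exercised, the sketch below runs both checks against a stand-in. StubChannel is hypothetical and implements only the two guards under test; its post and subscribe signatures are simplified relative to the protocol above, and the checks use plain asyncio rather than any particular test framework.

```python
import asyncio


class StubChannel:
    """Hypothetical stand-in implementing only the two guards under test."""

    def __init__(self, members, max_subscribers=32, require_membership=True):
        self._members = set(members)
        self._max_subscribers = max_subscribers
        self._require_membership = require_membership
        self._subscribers = set()

    async def post(self, sender_did, payload):
        if self._require_membership and sender_did not in self._members:
            raise PermissionError(f"{sender_did} is not a channel member")
        return payload

    async def subscribe(self, subscriber_did):
        if len(self._subscribers) >= self._max_subscribers:
            raise RuntimeError("max_subscribers reached")
        self._subscribers.add(subscriber_did)


async def check_membership_guard():
    """Target 3: a post from a non-member raises PermissionError."""
    ch = StubChannel(members={"did:example:alice"})
    try:
        await ch.post("did:example:mallory", "hi")
    except PermissionError:
        return True
    return False


async def check_max_subscribers_cap():
    """Target 4: with the default cap of 32, the 33rd subscriber is rejected."""
    ch = StubChannel(members=set(), max_subscribers=32)
    for i in range(32):
        await ch.subscribe(f"did:example:agent{i}")
    try:
        await ch.subscribe("did:example:agent32")
    except RuntimeError:
        return True
    return False


guard_ok = asyncio.run(check_membership_guard())
cap_ok = asyncio.run(check_max_subscribers_cap())
```

In a real suite these would presumably target InMemoryCollaborationChannel and assert on the raised exception with the project's test framework.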

Labels: enhancement, phase-12, multi-agent, collaboration
