Skip to content

Commit ba03fe0

Browse files
committed
feat(collaboration): implement lazy CRDT initialization with enhanced logging
Changes the CollaborationActor to initialize the Automerge document only after joining a room, enabling offline-first usage before peers connect. This improves the actor lifecycle and makes peer synchronization more predictable. Key changes: - Defer Automerge document initialization until roomJoined event - First peer in room initializes from current state, joining peers start empty - Add comprehensive logging throughout sync workflow for debugging - Fix race condition where sync messages could arrive before doc initialization - Make setState() return Promise to allow awaiting state commits - Update PeerMessagingActor to emit roomJoined event and handle existing peers correctly - Prevent duplicate WebRTC initiation when joining existing peers This addresses sync timing issues and provides better visibility into the collaboration flow.
1 parent 2bb16f2 commit ba03fe0

File tree

6 files changed

+139
-50
lines changed

6 files changed

+139
-50
lines changed

packages/collaboration/src/CollaborationActor.ts

Lines changed: 74 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as Automerge from '@automerge/automerge';
2-
import { Actor, effect, type IActorClient, type StateShape } from '@d-buckner/ensemble-core';
2+
import { Actor, effect, type IActorClient } from '@d-buckner/ensemble-core';
33
import type { PeerMessagingActor } from './PeerMessagingActor';
4-
import type { CollaborationEvents, AutomergeDoc, SyncState, MessagePayload } from './types';
4+
import type { CollaborationEvents, AutomergeDoc, SyncState, MessagePayload, RoomJoinedPayload } from './types';
55
import type { Draft } from 'mutative';
66

77
/**
@@ -44,49 +44,40 @@ export interface CollaborationDeps {
4444
*/
4545
export class CollaborationActor<TDoc extends Record<string, any> = Record<string, any>> extends Actor<TDoc, CollaborationEvents> {
4646
// Automerge internals (private, NOT in state)
47-
private automergeDoc: AutomergeDoc<TDoc>;
47+
// Null until roomJoined - allows offline-first usage before peers connect
48+
private automergeDoc: AutomergeDoc<TDoc> | null = null;
4849
private syncStates = new Map<string, SyncState>();
4950

5051
protected declare deps: CollaborationDeps;
5152

5253
/**
53-
* Create a new CollaborationActor with an initial document.
54-
* The document becomes the actor's state directly.
55-
*
56-
* @param initialDocument - Initial CRDT document (will be wrapped by Automerge)
57-
*/
58-
constructor(initialDocument: StateShape<TDoc>) {
59-
super(initialDocument);
60-
this.automergeDoc = Automerge.from(initialDocument as TDoc);
61-
}
62-
63-
/**
64-
* Override setState to route through Automerge CRDT.
65-
*
6654
* This enables transparent conflict resolution - users call setState()
6755
* just like any other actor, but changes are automatically synced
6856
* with peers and conflicts are resolved via Automerge.
6957
*
58+
* Before peers connect (automergeDoc is null), acts as normal Actor with local state only.
59+
*
7060
* @param updater - Function that mutates a draft of the current state
61+
* @returns Promise that resolves when the state update has been committed
7162
*/
72-
protected setState(updater: (draft: Draft<TDoc>) => void): void {
63+
protected setState(updater: (draft: Draft<TDoc>) => void): Promise<void> {
64+
// If no Automerge doc yet, just update local state (offline-first)
65+
if (!this.automergeDoc) {
66+
return super.setState(updater);
67+
}
68+
7369
// 1. Apply via Automerge (conflict resolution)
7470
const newDoc = Automerge.change(this.automergeDoc, updater as any);
7571
const changes = Automerge.getChanges(this.automergeDoc, newDoc);
7672
this.automergeDoc = newDoc;
7773

7874
// 2. Update actor state via parent (triggers state events)
7975
const jsDoc = Automerge.toJS(newDoc);
80-
super.setState(draft => {
81-
// Directly assign each property to ensure reactivity
82-
(Object.keys(jsDoc) as Array<keyof TDoc>).forEach(key => {
83-
(draft as TDoc)[key] = jsDoc[key];
84-
});
85-
});
8676

8777
// 3. Generate and send sync messages for peers
8878
if (changes.length > 0) {
8979
const peers = this.deps.connection.state.connectedPeers;
80+
console.log(`[CollaborationActor] 📝 Broadcasting ${changes.length} change(s) to ${peers.length} peer(s)`);
9081
for (const peerId of peers) {
9182
const syncMsg = this.generateSyncMessageForPeer(peerId);
9283
if (syncMsg) {
@@ -95,18 +86,51 @@ export class CollaborationActor<TDoc extends Record<string, any> = Record<string
9586
}
9687
}
9788
}
89+
90+
return super.setState(draft => {
91+
// Directly assign each property to ensure reactivity
92+
(Object.keys(jsDoc) as Array<keyof TDoc>).forEach(key => {
93+
(draft as TDoc)[key] = jsDoc[key];
94+
});
95+
});
9896
}
9997

10098
// ========================================
10199
// Effects: React to connection events
102100
// ========================================
103101

102+
/**
103+
* Handle room joined event to initialize Automerge document.
104+
* First peer in room creates doc from current state, others start empty.
105+
*/
106+
@effect('connection.roomJoined')
107+
private handleRoomJoined(payload: RoomJoinedPayload): void {
108+
// Joining peer - start empty, receive via sync
109+
if (payload.peerIds.length > 0) {
110+
this.automergeDoc = Automerge.init();
111+
console.log(`[CollaborationActor] 👋 Joining peer - initialized empty Automerge doc (${payload.peerIds.length} existing peer(s))`);
112+
return;
113+
}
114+
115+
// First peer - initialize from current state
116+
this.automergeDoc = Automerge.from(this.state as TDoc);
117+
console.log('[CollaborationActor] 🎉 First peer - initialized Automerge doc from current state');
118+
}
119+
104120
/**
105121
* Handle incoming CRDT sync messages from peers.
106122
* Applies remote changes and generates response if needed.
107123
*/
108124
@effect('connection.messageReceived')
109125
private handleIncomingMessage({ peerId, message }: MessagePayload): void {
126+
console.log(`[CollaborationActor] 📨 Received sync message from ${peerId} (${message.length} bytes)`);
127+
128+
// Guard: Automerge doc must be initialized before receiving sync messages
129+
if (!this.automergeDoc) {
130+
console.warn('[CollaborationActor] ⚠️ Received sync message before Automerge doc initialized - dropping message');
131+
return;
132+
}
133+
110134
const syncState = this.syncStates.get(peerId) || Automerge.initSyncState();
111135

112136
const [newDoc, newSyncState] = Automerge.receiveSyncMessage(
@@ -119,6 +143,7 @@ export class CollaborationActor<TDoc extends Record<string, any> = Record<string
119143

120144
// Document changed - update via parent setState (skip sync broadcast)
121145
if (newDoc !== this.automergeDoc) {
146+
console.log(`[CollaborationActor] ✅ Document updated from ${peerId}`);
122147
this.automergeDoc = newDoc;
123148
const jsDoc = Automerge.toJS(newDoc);
124149
super.setState(draft => {
@@ -127,6 +152,8 @@ export class CollaborationActor<TDoc extends Record<string, any> = Record<string
127152
(draft as TDoc)[key] = jsDoc[key];
128153
});
129154
});
155+
} else {
156+
console.log(`[CollaborationActor] ℹ️ No document changes from ${peerId}`);
130157
}
131158

132159
// Generate response if needed
@@ -136,9 +163,12 @@ export class CollaborationActor<TDoc extends Record<string, any> = Record<string
136163
);
137164

138165
if (responseMsg) {
166+
console.log(`[CollaborationActor] 📤 Sending sync response to ${peerId} (${responseMsg.length} bytes)`);
139167
this.syncStates.set(peerId, nextSyncState);
140168
// PeerMessagingActor handles routing
141169
this.deps.connection.actions.sendTo(peerId, responseMsg);
170+
} else {
171+
console.log(`[CollaborationActor] ℹ️ No sync response needed for ${peerId}`);
142172
}
143173
}
144174

@@ -148,17 +178,29 @@ export class CollaborationActor<TDoc extends Record<string, any> = Record<string
148178
*/
149179
@effect('connection.peerConnected')
150180
private initSyncWithPeer(peerId: string): void {
181+
console.log(`[CollaborationActor] 🔄 Peer connected, initiating sync with: ${peerId}`);
182+
183+
// Guard: Automerge doc must be initialized before syncing with peers
184+
if (!this.automergeDoc) {
185+
console.warn('[CollaborationActor] ⚠️ Peer connected before Automerge doc initialized - skipping sync');
186+
return;
187+
}
188+
151189
const syncState = Automerge.initSyncState();
152190
const [newSyncState, message] = Automerge.generateSyncMessage(
153191
this.automergeDoc,
154192
syncState
155193
);
156194

157-
if (message) {
158-
this.syncStates.set(peerId, newSyncState);
159-
// PeerMessagingActor handles routing
160-
this.deps.connection.actions.sendTo(peerId, message);
195+
if (!message) {
196+
console.log(`[CollaborationActor] ⚠️ No sync message generated for ${peerId}`);
197+
return;
161198
}
199+
200+
console.log(`[CollaborationActor] 📤 Sending initial sync message to ${peerId} (${message.length} bytes)`);
201+
this.syncStates.set(peerId, newSyncState);
202+
// PeerMessagingActor handles routing
203+
this.deps.connection.actions.sendTo(peerId, message);
162204
}
163205

164206
/**
@@ -181,6 +223,11 @@ export class CollaborationActor<TDoc extends Record<string, any> = Record<string
181223
* @returns Sync message or null if no sync needed
182224
*/
183225
private generateSyncMessageForPeer(peerId: string): Uint8Array | null {
226+
// Guard: Automerge doc must be initialized
227+
if (!this.automergeDoc) {
228+
return null;
229+
}
230+
184231
const syncState = this.syncStates.get(peerId) || Automerge.initSyncState();
185232
const [newSyncState, message] = Automerge.generateSyncMessage(
186233
this.automergeDoc,

packages/collaboration/src/PeerMessagingActor.ts

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
PeerMessagingEvents,
55
MessagePayload,
66
SignalingPayload,
7+
RoomJoinedPayload,
78
} from './types';
89
import type { WebRTCActor } from './WebRTCActor';
910
import type { WebSocketActor } from './WebSocketActor';
@@ -58,22 +59,29 @@ export class PeerMessagingActor extends Actor<PeerMessagingState, PeerMessagingE
5859
sendTo(peerId: string, message: Uint8Array): void {
5960
const transport = this.state.peerTransports[peerId];
6061

62+
console.log(`[PeerMessagingActor] 📨 sendTo called for peer: ${peerId}, transport: ${transport}, message: ${message.length} bytes`);
63+
6164
if (!transport) {
65+
console.warn(`[PeerMessagingActor] ⚠️ No transport found for peer: ${peerId} - message dropped!`);
6266
return;
6367
}
6468

6569
// Check WebRTC connection state before attempting to use it
6670
if (transport === 'webrtc') {
6771
const webrtcState = this.deps.webrtc.state.peerConnectionStates[peerId];
72+
console.log(`[PeerMessagingActor] 🔀 Transport is WebRTC, state: ${webrtcState}`);
6873
if (webrtcState === 'connected') {
74+
console.log(`[PeerMessagingActor] ⚡ Sending via WebRTC to ${peerId}`);
6975
this.deps.webrtc.actions.sendTo(peerId, message);
7076
return;
7177
}
7278
// WebRTC not connected, fallback to WebSocket
79+
console.log(`[PeerMessagingActor] 🔄 WebRTC not ready, falling back to WebSocket for ${peerId}`);
7380
this.deps.websocket.actions.sendTo(peerId, message);
7481
return;
7582
}
7683

84+
console.log(`[PeerMessagingActor] 🌐 Sending via WebSocket to ${peerId}`);
7785
this.deps.websocket.actions.sendTo(peerId, message);
7886
}
7987

@@ -93,25 +101,55 @@ export class PeerMessagingActor extends Actor<PeerMessagingState, PeerMessagingE
93101
// Effects: WebSocket events
94102
// ========================================
95103

104+
/**
105+
* Handle room joined via WebSocket.
106+
* Emits roomJoined event, then adds existing peers to state WITHOUT initiating WebRTC.
107+
* The existing peers will initiate the connection when they receive peer-joined.
108+
*/
109+
@effect('websocket.roomJoined')
110+
private async handleWebSocketRoomJoined(payload: RoomJoinedPayload): Promise<void> {
111+
console.log(`[PeerMessagingActor] 🏠 Joined room with ${payload.peerIds.length} existing peer(s) - NOT initiating WebRTC`);
112+
113+
// Emit roomJoined event for dependent actors
114+
this.emit('roomJoined', payload);
115+
116+
// Add all existing peers to state
117+
for (const peerId of payload.peerIds) {
118+
if (!this.state.connectedPeers.includes(peerId)) {
119+
await this.setState(draft => {
120+
draft.connectedPeers.push(peerId);
121+
draft.peerTransports[peerId] = 'websocket';
122+
});
123+
124+
// Emit after state is committed
125+
this.emit('peerConnected', peerId);
126+
}
127+
}
128+
// Do NOT initiate WebRTC - let the existing peers initiate to us
129+
}
130+
96131
/**
97132
* Handle peer joined via WebSocket.
98133
* Adds peer to state with WebSocket transport and emits peerConnected.
99134
* Also initiates WebRTC connection as the initiator.
100135
*/
101136
@effect('websocket.peerJoined')
102-
private handleWebSocketPeerJoined(peerId: string): void {
137+
private async handleWebSocketPeerJoined(peerId: string): Promise<void> {
103138
if (this.state.connectedPeers.includes(peerId)) {
104139
return;
105140
}
106141

107-
this.setState(draft => {
142+
console.log(`[PeerMessagingActor] 👋 New peer joined: ${peerId} - initiating WebRTC as existing peer`);
143+
144+
await this.setState(draft => {
108145
draft.connectedPeers.push(peerId);
109146
draft.peerTransports[peerId] = 'websocket';
110147
});
111148

149+
// Emit after state is committed
112150
this.emit('peerConnected', peerId);
113151

114-
// Initiate WebRTC connection
152+
// Initiate WebRTC connection as the initiator (we're the existing peer)
115153
this.deps.webrtc.actions.connectToPeer(peerId);
116154
}
117155

@@ -143,6 +181,7 @@ export class PeerMessagingActor extends Actor<PeerMessagingState, PeerMessagingE
143181
*/
144182
@effect('websocket.signalingMessage')
145183
private handleWebSocketSignaling(payload: SignalingPayload): void {
184+
console.log(`[PeerMessagingActor] 🔀 Forwarding WebSocket signaling to WebRTC for peer: ${payload.peerId}`);
146185
this.deps.webrtc.actions.handleSignaling(payload);
147186
}
148187

@@ -182,6 +221,8 @@ export class PeerMessagingActor extends Actor<PeerMessagingState, PeerMessagingE
182221

183222
const previousTransport = this.state.peerTransports[peerId];
184223

224+
console.log(`[PeerMessagingActor] 🚀 Upgrading transport to WebRTC for peer: ${peerId} (was: ${previousTransport})`);
225+
185226
this.setState(draft => {
186227
draft.peerTransports[peerId] = 'webrtc';
187228
});
@@ -219,6 +260,7 @@ export class PeerMessagingActor extends Actor<PeerMessagingState, PeerMessagingE
219260
*/
220261
@effect('webrtc.signalingData')
221262
private handleWebRTCSignaling(payload: SignalingPayload): void {
263+
console.log(`[PeerMessagingActor] 🔀 Forwarding WebRTC signaling to WebSocket for peer: ${payload.peerId}`);
222264
this.deps.websocket.actions.sendSignal(payload.peerId, payload.data);
223265
}
224266

packages/collaboration/src/WebRTCActor.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,13 @@ export class WebRTCActor extends Actor<WebRTCState, WebRTCEvents> {
164164

165165
// Connection closed
166166
peer.on('close', () => {
167+
console.log(`[WebRTCActor] 🔌 WebRTC connection closed for peer: ${peerId}`);
167168
this.handlePeerConnectionClosed(peerId);
168169
});
169170

170171
// Connection error
171-
peer.on('error', (_error: Error) => {
172+
peer.on('error', (error: Error) => {
173+
console.error(`[WebRTCActor] ❌ WebRTC error for peer ${peerId}:`, error);
172174
this.setState(draft => {
173175
draft.peerConnectionStates[peerId] = 'failed';
174176
});

packages/collaboration/src/WebSocketActor.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,12 @@ export class WebSocketActor extends Actor<WebSocketState, WebSocketEvents> {
135135
@action
136136
sendTo(peerId: string, message: Uint8Array): void {
137137
if (!this.socket?.connected) {
138+
console.warn('[WebSocketActor] ⚠️ Cannot send CRDT message - socket not connected');
138139
return;
139140
}
140141

142+
console.log(`[WebSocketActor] 📤 Sending CRDT sync message to peer: ${peerId} (${message.length} bytes)`);
143+
141144
this.socket.emit('sync-message', {
142145
to: peerId,
143146
message: Array.from(message), // Convert Uint8Array to regular array for JSON serialization
@@ -190,11 +193,6 @@ export class WebSocketActor extends Actor<WebSocketState, WebSocketEvents> {
190193
peerId: data.peerId,
191194
peerIds: data.peers,
192195
});
193-
194-
// Emit peerJoined for each existing peer
195-
for (const peerId of data.peers) {
196-
this.emit('peerJoined', peerId);
197-
}
198196
});
199197

200198
this.socket.on('peer-joined', (peerId: string) => {
@@ -217,6 +215,7 @@ export class WebSocketActor extends Actor<WebSocketState, WebSocketEvents> {
217215
this.socket.on('sync-message', (data: { from: string; message: number[] }) => {
218216
// Convert array back to Uint8Array
219217
const message = new Uint8Array(data.message);
218+
console.log(`[WebSocketActor] 📥 Received CRDT sync message from peer: ${data.from} (${message.length} bytes)`);
220219
this.emit('messageReceived', {
221220
peerId: data.from,
222221
message,

packages/collaboration/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export interface TransportChangedPayload {
3131
}
3232

3333
export interface PeerMessagingEvents {
34+
roomJoined: RoomJoinedPayload;
3435
peerConnected: string;
3536
peerDisconnected: string;
3637
transportChanged: TransportChangedPayload;

0 commit comments

Comments
 (0)