@@ -25,6 +25,9 @@ public class WebSocketTransport {
2525 private var messageQueue : [ Int : String ] = [ : ]
2626 private var connectingPayload : GraphQLMap ?
2727
28+ private let processingQueue = DispatchQueue ( label: " com.apollographql.websocket-processing " )
29+ private let lock = NSLock ( )
30+
2831 private var subscribers = [ String : ( JSONObject ? , Error ? ) -> Void ] ( )
2932 private var subscriptions : [ String : String ] = [ : ]
3033
@@ -63,14 +66,18 @@ public class WebSocketTransport {
6366 if let id = id {
6467 // remove the callback if NOT a subscription
6568 if subscriptions [ id] == nil {
69+ lock. lock ( )
6670 subscribers. removeValue ( forKey: id)
71+ lock. unlock ( )
6772 }
6873 } else {
6974 notifyErrorAllHandlers ( WebSocketError ( payload: payload, error: error, kind: . unprocessedMessage( text) ) )
7075 }
7176
7277 case . connectionAck:
78+ lock. lock ( )
7379 acked = true
80+ lock. unlock ( )
7481 processWriteQueue ( )
7582
7683 case . connectionKeepAlive:
@@ -89,10 +96,16 @@ public class WebSocketTransport {
8996 }
9097
9198 private func processWriteQueue( ) {
92- guard !self . messageQueue. isEmpty else { return }
99+ lock. lock ( )
100+ guard !self . messageQueue. isEmpty else {
101+ lock. unlock ( )
102+ return
103+ }
93104
94105 let queue = self . messageQueue. sorted ( by: { $0. 0 < $1. 0 } )
95106 self . messageQueue. removeAll ( )
107+ lock. unlock ( )
108+
96109 for (id, msg) in queue {
97110 write ( msg, id: id)
98111 }
@@ -154,10 +167,12 @@ public class WebSocketTransport {
154167 if let str = OperationMessage ( payload: body, id: sequenceNumber) . rawMessage {
155168 write ( str)
156169
170+ lock. lock ( )
157171 subscribers [ sequenceNumber] = resultHandler
158172 if operation. operationType == . subscription {
159173 subscriptions [ sequenceNumber] = str
160174 }
175+ lock. unlock ( )
161176
162177 return sequenceNumber
163178 }
@@ -180,8 +195,10 @@ public class WebSocketTransport {
180195 if let str = OperationMessage ( id: subscriptionId, type: . stop) . rawMessage {
181196 write ( str)
182197 }
198+ lock. lock ( )
183199 subscribers. removeValue ( forKey: subscriptionId)
184200 subscriptions. removeValue ( forKey: subscriptionId)
201+ lock. unlock ( )
185202 }
186203
187204 fileprivate final class WebSocketTask < Operation: GraphQLOperation > : Cancellable {
@@ -289,15 +306,17 @@ extension WebSocketTransport: NetworkTransport {
289306
290307extension WebSocketTransport : WebSocketDelegate {
291308 public func websocketDidConnect( socket: WebSocketClient ) {
292- initServer ( )
293- if reconnected {
294- // re-send the subscriptions whenever we are re-connected
295- // for the first connect, any subscriptions are already in queue
296- for (_, msg) in self . subscriptions {
297- write ( msg)
309+ processingQueue. async {
310+ self . initServer ( )
311+ if self . reconnected {
312+ // re-send the subscriptions whenever we are re-connected
313+ // for the first connect, any subscriptions are already in queue
314+ for (_, msg) in self . subscriptions {
315+ self . write ( msg)
316+ }
298317 }
318+ self . reconnected = true
299319 }
300- reconnected = true
301320 }
302321
303322 public func websocketDidDisconnect( socket: WebSocketClient , error: Error ? ) {
@@ -309,19 +328,27 @@ extension WebSocketTransport: WebSocketDelegate {
309328 }
310329 }
311330
312- acked = false // need new connect and ack before sending
331+ lock. lock ( )
332+ self . acked = false // need new connect and ack before sending
333+ lock. unlock ( )
313334
314- if ( reconnect) {
315- websocket. connect ( ) ;
335+ processingQueue. async {
336+ if self . reconnect {
337+ self . websocket. connect ( ) ;
338+ }
316339 }
317340 }
318341
319342 public func websocketDidReceiveMessage( socket: WebSocketClient , text: String ) {
320- processMessage ( socket: socket, text: text)
343+ processingQueue. async {
344+ self . processMessage ( socket: socket, text: text)
345+ }
321346 }
322347
323348 public func websocketDidReceiveData( socket: WebSocketClient , data: Data ) {
324- processMessage ( socket: socket, data: data)
349+ processingQueue. async {
350+ self . processMessage ( socket: socket, data: data)
351+ }
325352 }
326353}
327354
0 commit comments