forked from apollographql/apollo-ios
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWebSocketTransport.swift
More file actions
389 lines (321 loc) · 12.8 KB
/
WebSocketTransport.swift
File metadata and controls
389 lines (321 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
import Apollo
import Starscream
// To allow for alternative implementations supporting the same WebSocketClient protocol
public class ApolloWebSocket: WebSocket, ApolloWebSocketClient {
required public convenience init(request: URLRequest, protocols: [String]? = nil) {
self.init(request: request, protocols: protocols, stream: FoundationStream())
}
}
public protocol ApolloWebSocketClient: WebSocketClient {
init(request: URLRequest, protocols: [String]?)
}
public protocol WebSocketTransportDelegate: class {
func webSocketTransportDidConnect(_ webSocketTransport: WebSocketTransport)
func webSocketTransportDidReconnect(_ webSocketTransport: WebSocketTransport)
func webSocketTransport(_ webSocketTransport: WebSocketTransport, didDisconnectWithError error:Error?)
}
public extension WebSocketTransportDelegate {
func webSocketTransportDidConnect(_ webSocketTransport: WebSocketTransport) {}
func webSocketTransportDidReconnect(_ webSocketTransport: WebSocketTransport) {}
func webSocketTransport(_ webSocketTransport: WebSocketTransport, didDisconnectWithError error:Error?) {}
}
/// A network transport that uses web sockets requests to send GraphQL subscription operations to a server, and that uses the Starscream implementation of web sockets.
public class WebSocketTransport: NetworkTransport, WebSocketDelegate {
public static var provider : ApolloWebSocketClient.Type = ApolloWebSocket.self
public weak var delegate: WebSocketTransportDelegate?
var reconnect = false
var websocket: ApolloWebSocketClient
var error: Error? = nil
let serializationFormat = JSONSerializationFormat.self
private final let protocols = ["graphql-ws"]
private var acked = false
private var queue: [Int: String] = [:]
private var connectingPayload: GraphQLMap?
private var subscribers = [String: (JSONObject?, Error?) -> Void]()
private var subscriptions : [String: String] = [:]
private let sendOperationIdentifiers: Bool
private let reconnectionInterval: TimeInterval
fileprivate var sequenceNumber = 0
fileprivate var reconnected = false
public init(request: URLRequest, sendOperationIdentifiers: Bool = false, reconnectionInterval: TimeInterval = 0.5, connectingPayload: GraphQLMap? = [:]) {
self.connectingPayload = connectingPayload
self.sendOperationIdentifiers = sendOperationIdentifiers
self.reconnectionInterval = reconnectionInterval
self.websocket = WebSocketTransport.provider.init(request: request, protocols: protocols)
self.websocket.delegate = self
self.websocket.connect()
}
public func send<Operation>(operation: Operation, fetchOptions: FetchOptions, completionHandler: @escaping (_ response: GraphQLResponse<Operation>?, _ error: Error?) -> Void) -> Cancellable {
if let error = self.error {
completionHandler(nil,error)
}
return WebSocketTask(self,operation) { (body, error) in
if let body = body {
let response = GraphQLResponse(operation: operation, body: body)
completionHandler(response,error)
} else {
completionHandler(nil,error)
}
}
}
public func isConnected() -> Bool {
return websocket.isConnected
}
private func processMessage(socket: WebSocketClient, text: String) {
OperationMessage(serialized: text).parse { (type, id, payload, error) in
guard let type = type, let messageType = OperationMessage.Types(rawValue: type) else {
notifyErrorAllHandlers(WebSocketError(payload: payload, error: error, kind: .unprocessedMessage(text)))
return
}
switch(messageType) {
case .data, .error:
if let id = id, let responseHandler = subscribers[id] {
responseHandler(payload,error)
} else {
notifyErrorAllHandlers(WebSocketError(payload: payload, error: error, kind: .unprocessedMessage(text)))
}
case .complete:
if let id = id {
// remove the callback if NOT a subscription
if subscriptions[id] == nil {
subscribers.removeValue(forKey: id)
}
} else {
notifyErrorAllHandlers(WebSocketError(payload: payload, error: error, kind: .unprocessedMessage(text)))
}
case .connectionAck:
acked = true
writeQueue()
case .connectionKeepAlive:
writeQueue()
case .connectionInit, .connectionTerminate, .start, .stop, .connectionError:
notifyErrorAllHandlers(WebSocketError(payload: payload, error: error, kind: .unprocessedMessage(text)))
}
}
}
private func notifyErrorAllHandlers(_ error: Error) {
for (_, handler) in subscribers {
handler(nil,error)
}
}
private func writeQueue() {
guard !self.queue.isEmpty else {
return
}
let queue = self.queue.sorted(by: { $0.0 < $1.0 })
self.queue.removeAll()
for (id, msg) in queue {
self.write(msg,id: id)
}
}
private func processMessage(socket: WebSocketClient, data: Data) {
print("WebSocketTransport::unprocessed event \(data)")
}
public func websocketDidConnect(socket: WebSocketClient) {
self.error = nil
initServer()
if reconnected {
self.delegate?.webSocketTransportDidReconnect(self)
// re-send the subscriptions whenever we are re-connected
// for the first connect, any subscriptions are already in queue
for (_,msg) in self.subscriptions {
write(msg)
}
} else {
self.delegate?.webSocketTransportDidConnect(self)
}
reconnected = true
}
public func websocketDidDisconnect(socket: WebSocketClient, error: Error?) {
// report any error to all subscribers
if let error = error {
self.error = WebSocketError(payload: nil, error: error, kind: .networkError)
for (_, responseHandler) in subscribers {
responseHandler(nil,error)
}
} else {
self.error = nil
}
self.delegate?.webSocketTransport(self, didDisconnectWithError: self.error)
acked = false // need new connect and ack before sending
if reconnect {
DispatchQueue.main.asyncAfter(deadline: .now() + reconnectionInterval) {
self.websocket.connect();
}
}
}
public func websocketDidReceiveMessage(socket: WebSocketClient, text: String) {
processMessage(socket: socket, text: text)
}
public func websocketDidReceiveData(socket: WebSocketClient, data: Data) {
processMessage(socket: socket, data: data)
}
public func initServer(reconnect: Bool = true) {
self.reconnect = reconnect
self.acked = false
if let str = OperationMessage(payload: self.connectingPayload, type: .connectionInit).rawMessage {
write(str, force:true)
}
}
public func closeConnection() {
self.reconnect = false
if let str = OperationMessage(type: .connectionTerminate).rawMessage {
write(str)
}
self.queue.removeAll()
self.subscriptions.removeAll()
}
private func write(_ str: String, force forced: Bool = false, id: Int? = nil) {
if websocket.isConnected && (acked || forced) {
websocket.write(string: str)
} else {
// using sequence number to make sure that the queue is processed correctly
// either using the earlier assigned id or with the next higher key
if let id = id {
queue[id] = str
} else if let id = queue.keys.max() {
queue[id+1] = str
} else {
queue[1] = str
}
}
}
deinit {
websocket.disconnect()
websocket.delegate = nil
}
fileprivate func nextSequenceNumber() -> Int {
sequenceNumber += 1
return sequenceNumber
}
fileprivate func sendHelper<Operation: GraphQLOperation>(operation: Operation, resultHandler: @escaping (_ response: JSONObject?, _ error: Error?) -> Void) -> String? {
let body = requestBody(for: operation)
let sequenceNumber = "\(nextSequenceNumber())"
guard let message = OperationMessage(payload: body, id: sequenceNumber).rawMessage else {
return nil
}
write(message)
subscribers[sequenceNumber] = resultHandler
if operation.operationType == .subscription {
subscriptions[sequenceNumber] = message
}
return sequenceNumber
}
private func requestBody<Operation: GraphQLOperation>(for operation: Operation) -> GraphQLMap {
if sendOperationIdentifiers {
guard let operationIdentifier = operation.operationIdentifier else {
preconditionFailure("To send operation identifiers, Apollo types must be generated with operationIdentifiers")
}
return ["id": operationIdentifier, "variables": operation.variables]
}
return ["query": operation.queryDocument, "variables": operation.variables]
}
public func unsubscribe(_ subscriptionId: String) {
if let str = OperationMessage(id: subscriptionId, type: .stop).rawMessage {
write(str)
}
subscribers.removeValue(forKey: subscriptionId)
subscriptions.removeValue(forKey: subscriptionId)
}
fileprivate final class WebSocketTask<Operation: GraphQLOperation> : Cancellable {
let sequenceNumber : String?
let transport: WebSocketTransport
init(_ ws: WebSocketTransport, _ operation: Operation, _ completionHandler: @escaping (_ response: JSONObject?, _ error: Error?) -> Void) {
sequenceNumber = ws.sendHelper(operation: operation, resultHandler: completionHandler)
transport = ws
}
public func cancel() {
if let sequenceNumber = sequenceNumber {
transport.unsubscribe(sequenceNumber)
}
}
// unsubscribe same as cancel
public func unsubscribe() {
cancel()
}
}
fileprivate final class OperationMessage {
enum Types : String {
case connectionInit = "connection_init" // Client -> Server
case connectionTerminate = "connection_terminate" // Client -> Server
case start = "start" // Client -> Server
case stop = "stop" // Client -> Server
case connectionAck = "connection_ack" // Server -> Client
case connectionError = "connection_error" // Server -> Client
case connectionKeepAlive = "ka" // Server -> Client
case data = "data" // Server -> Client
case error = "error" // Server -> Client
case complete = "complete" // Server -> Client
}
let serializationFormat = JSONSerializationFormat.self
var message: GraphQLMap = [:]
var serialized: String?
var rawMessage : String? {
let serialized = try! serializationFormat.serialize(value: message)
if let str = String(data: serialized, encoding: .utf8) {
return str
} else {
return nil
}
}
init(payload: GraphQLMap? = nil, id: String? = nil, type: Types = .start) {
if let payload = payload {
message += ["payload": payload]
}
if let id = id {
message += ["id": id]
}
message += ["type": type.rawValue]
}
init(serialized: String) {
self.serialized = serialized
}
func parse(handler: (_ type: String?, _ id: String?, _ payload: JSONObject?, _ error: Error?) -> Void) {
guard let serialized = self.serialized else {
handler(nil, nil, nil, WebSocketError(payload: nil, error: nil, kind: .serializedMessageError))
return
}
guard let data = self.serialized?.data(using: (.utf8) ) else {
handler(nil, nil, nil, WebSocketError(payload: nil, error: nil, kind: .unprocessedMessage(serialized)))
return
}
var type : String?
var id : String?
var payload : JSONObject?
do {
let json = try JSONSerializationFormat.deserialize(data: data ) as? JSONObject
id = json?["id"] as? String
type = json?["type"] as? String
payload = json?["payload"] as? JSONObject
handler(type,id,payload,nil)
}
catch {
handler(type, id, payload, WebSocketError(payload: payload, error: error, kind: .unprocessedMessage(serialized)))
}
}
}
}
public struct WebSocketError: Error, LocalizedError {
public enum ErrorKind {
case errorResponse
case networkError
case unprocessedMessage(String)
case serializedMessageError
var description: String {
switch self {
case .errorResponse:
return "Received error response"
case .networkError:
return "Websocket network error"
case .unprocessedMessage(let message):
return "Websocket error: Unprocessed message \(message)"
case .serializedMessageError:
return "Websocket error: Serialized message not found"
}
}
}
/// The payload of the response.
public let payload: JSONObject?
public let error: Error?
public let kind: ErrorKind
}