-
Notifications
You must be signed in to change notification settings - Fork 749
Expand file tree
/
Copy pathWebSocketTransport.swift
More file actions
829 lines (714 loc) · 31.9 KB
/
WebSocketTransport.swift
File metadata and controls
829 lines (714 loc) · 31.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
@_spi(Execution) import Apollo
@_spi(Unsafe) import ApolloAPI
import Foundation
public actor WebSocketTransport: SubscriptionNetworkTransport, NetworkTransport {
typealias OperationID = String
/// Errors produced by ``WebSocketTransport`` while connecting or executing operations.
public enum Error: Swift.Error {
/// The received WebSocket message could not be parsed as a valid `graphql-transport-ws` message.
case unrecognizedMessage
/// The WebSocket connection closed, or the transport was paused, while a caller was still
/// waiting for the connection acknowledgement or had a one-shot operation in flight.
case connectionClosed
/// The server sent one or more GraphQL errors for an operation.
case graphQLErrors([GraphQLError])
}
/// Configuration options for ``WebSocketTransport``.
public struct Configuration: Sendable {
/// The interval to wait before attempting to reconnect after a disconnect.
///
/// - A value of `0` means reconnect immediately with no delay.
/// - A negative value (e.g. `-1`) disables auto-reconnection entirely.
///
/// When auto-reconnection is enabled and the connection drops while there are active
/// subscribers, the transport will automatically reconnect and re-subscribe all active
/// operations on the new connection. Subscriber streams are kept alive across the
/// reconnection — callers are unaware the disconnect happened.
///
/// Default: `-1` (disabled).
public var reconnectionInterval: TimeInterval
/// Whether auto-reconnection is enabled based on the configuration.
///
/// `true` when `reconnectionInterval` is zero or positive.
public var isReconnectEnabled: Bool {
reconnectionInterval >= 0
}
/// The request body creator used to build the JSON payload for `subscribe` messages.
///
/// This is always called with `sendQueryDocument: true` and `autoPersistQuery: false`.
/// The default ``DefaultRequestBodyCreator`` includes `clientAwarenessMetadata` in the
/// payload when available.
public var requestBodyCreator: any JSONRequestBodyCreator
/// The payload to send on connection. Defaults to `nil`.
public var connectingPayload: JSONEncodableDictionary?
/// The ``OperationMessageIdCreator`` used to generate a unique message identifier per
/// operation. Defaults to ``ApolloSequencedOperationMessageIdCreator``.
public var operationMessageIdCreator: any OperationMessageIdCreator
/// The interval at which the client sends ping messages to the server as a keepalive.
///
/// Some servers require clients to send periodic pings and will drop the connection
/// if they don't receive one within a certain timeframe.
///
/// - A positive value (e.g. `20`) sends a ping every 20 seconds while connected.
/// - A value of `nil` disables client-initiated pings (the default).
///
/// Pings are only sent after `connection_ack` is received. The timer is stopped on
/// disconnect or pause, and restarted on reconnect.
///
/// Default: `nil` (disabled).
public var pingInterval: TimeInterval?
/// Metadata used by GraphOS Studio's
/// [client awareness](https://www.apollographql.com/docs/graphos/platform/insights/client-segmentation)
/// feature.
///
/// When set, the client application name and version are sent as HTTP headers
/// (`apollographql-client-name`, `apollographql-client-version`) on the WebSocket
/// connection request.
///
/// Default: `ClientAwarenessMetadata()` (includes Apollo library awareness headers).
public var clientAwarenessMetadata: ClientAwarenessMetadata
/// Creates a configuration; every parameter has a default, so `Configuration()` is valid.
///
/// - Parameters:
///   - reconnectionInterval: Seconds to wait before reconnecting; negative disables
///     auto-reconnection. Defaults to `-1`.
///   - requestBodyCreator: Builds the JSON payload of `subscribe` messages.
///   - connectingPayload: Optional payload for the `connection_init` message.
///   - operationMessageIdCreator: Generates the identifier used for each operation.
///   - pingInterval: Seconds between client-initiated pings, or `nil` to disable them.
///   - clientAwarenessMetadata: Metadata applied as headers to the connection request.
public init(
reconnectionInterval: TimeInterval = -1,
requestBodyCreator: any JSONRequestBodyCreator = DefaultRequestBodyCreator(),
connectingPayload: JSONEncodableDictionary? = nil,
operationMessageIdCreator: any OperationMessageIdCreator = ApolloSequencedOperationMessageIdCreator(),
pingInterval: TimeInterval? = nil,
clientAwarenessMetadata: ClientAwarenessMetadata = ClientAwarenessMetadata()
) {
self.reconnectionInterval = reconnectionInterval
self.requestBodyCreator = requestBodyCreator
self.connectingPayload = connectingPayload
self.operationMessageIdCreator = operationMessageIdCreator
self.pingInterval = pingInterval
self.clientAwarenessMetadata = clientAwarenessMetadata
}
}
/// Header name and value used to request the `graphql-transport-ws` WebSocket subprotocol.
struct Constants {
/// Name of the HTTP header that carries the requested WebSocket subprotocol.
static let headerWSProtocolName = "Sec-WebSocket-Protocol"
/// Subprotocol identifier sent as the value of that header.
static let headerWSProtocolValue = "graphql-transport-ws"
}
/// Lifecycle state of the transport's WebSocket connection.
enum ConnectionState {
/// No connection attempt has been made yet.
case notStarted
/// A connection has been opened and the transport is waiting for `connection_ack`.
case connecting
/// `connection_ack` has been received on the current connection.
case connected
/// The receive loop for the current connection ended and the transport was not paused.
case disconnected
/// The connection was intentionally paused by the caller. The underlying WebSocket is
/// closed, but subscription streams remain alive for seamless resumption.
case paused
}
/// The delegate that receives lifecycle events from this transport.
///
/// The reference is held weakly. Delegate methods are invoked synchronously from within the
/// transport's own message and disconnection handling, so each call completes before the
/// transport processes the next incoming message.
public weak var delegate: (any WebSocketTransportDelegate)?
/// Sets the delegate that receives lifecycle events from this transport.
///
/// - Parameter delegate: The new delegate, or `nil` to remove the current one.
public func setDelegate(_ delegate: (any WebSocketTransportDelegate)?) {
self.delegate = delegate
}
/// The session used to create a WebSocket task for every new connection.
public let urlSession: WebSocketURLSession
/// The store used for cache reads and for publishing cache records from server responses.
public let store: ApolloStore
/// The active configuration. `connectingPayload` can be changed at runtime through
/// ``updateConnectingPayload(_:reconnectIfConnected:)``.
public private(set) var configuration: Configuration
/// The request used to create each WebSocket task. Its headers can be changed at runtime
/// through ``updateHeaderValues(_:reconnectIfConnected:)``.
private var request: URLRequest
/// The current connection. It is replaced with a new instance for every new connection
/// attempt; receive loops compare against it by identity to detect that they are stale.
private var connection: WebSocketConnection
/// The current lifecycle state of the connection.
var connectionState: ConnectionState = .notStarted
/// Tracks whether the transport has ever successfully connected. Used to distinguish
/// initial connection from reconnection for delegate callbacks.
private var hasBeenConnected = false
/// Tracks every registered operation together with the stream its payloads are delivered on.
private var subscriberRegistry: SubscriberRegistry
/// Callers currently suspended while waiting for `connection_ack`.
private var connectionWaiters = ConnectionWaiterQueue()
/// The task that periodically sends client-initiated ping messages.
/// Created after `connection_ack` and cancelled on disconnect/pause.
private var pingTimerTask: Task<Void, Never>?
/// The number of active subscribers. Exposed for test assertions.
var subscriberCount: Int { subscriberRegistry.count }
/// Creates a transport for the given endpoint. No connection is opened until an operation
/// is sent or ``resume()`` is called.
///
/// - Parameters:
///   - urlSession: The session that creates the underlying WebSocket tasks.
///   - store: The store used for cache reads and writes.
///   - endpointURL: The URL of the WebSocket endpoint.
///   - configuration: Transport options. Defaults to `Configuration()`.
/// - Throws: Any error thrown while building the connection request.
public init(
  urlSession: WebSocketURLSession,
  store: ApolloStore,
  endpointURL: URL,
  configuration: Configuration = Configuration()
) throws {
  // Build the handshake request up front so it can seed both the stored request and the
  // first connection.
  let handshakeRequest = try Self.createURLRequest(
    endpointURL: endpointURL,
    clientAwarenessMetadata: configuration.clientAwarenessMetadata
  )

  self.urlSession = urlSession
  self.store = store
  self.configuration = configuration
  self.request = handshakeRequest
  self.connection = WebSocketConnection(task: urlSession.webSocketTask(with: handshakeRequest))
  self.subscriberRegistry = SubscriberRegistry(
    operationMessageIdCreator: configuration.operationMessageIdCreator
  )
}
// MARK: - Request Setup
/// Builds the request used to open the WebSocket: the endpoint URL, the
/// `graphql-transport-ws` subprotocol header, and the client awareness headers.
private static func createURLRequest(
  endpointURL: URL,
  clientAwarenessMetadata: ClientAwarenessMetadata
) throws -> URLRequest {
  var handshake = URLRequest(url: endpointURL)
  handshake.setValue(
    Constants.headerWSProtocolValue,
    forHTTPHeaderField: Constants.headerWSProtocolName
  )
  clientAwarenessMetadata.applyHeaders(to: &handshake)
  return handshake
}
// MARK: - Connection Management
/// Returns once the connection has been acknowledged by the server, opening one if needed.
///
/// Behavior per state:
/// - `connected`: returns immediately.
/// - `connecting` / `paused`: only waits for `connection_ack` — an attempt is already in
///   flight, or ``resume()`` is expected to start one.
/// - `notStarted`: opens the existing connection, then waits for `connection_ack`.
/// - `disconnected`: replaces the connection with a fresh one, opens it, then waits.
///
/// - Throws: Whatever the connection waiter is resumed with.
private func ensureConnected() async throws {
  switch connectionState {
  case .connected:
    return
  case .connecting, .paused:
    try await waitForConnectionAck()
    return
  case .disconnected:
    // The previous connection is finished; a new attempt needs a new WebSocket task.
    connection = WebSocketConnection(task: urlSession.webSocketTask(with: request))
  case .notStarted:
    break
  }

  connectionState = .connecting
  startConnectionReceiveLoop()
  try await waitForConnectionAck()
}
/// Spawns a task that iterates the connection's message stream and routes incoming messages.
///
/// Every message is passed to `didReceive(message:)`. When the stream terminates (normally
/// or with an error) and this loop still belongs to the current connection, the outcome is
/// handed to `handleDisconnection(error:)`, which decides between reconnecting and
/// terminating subscriber streams.
private func startConnectionReceiveLoop() {
// Keeps a reference to the connection the receive loop was opened on. If a reconnect occurs,
// `self.connection` will be a new connection and we should ignore disconnection events for this loop.
let loopConnection = self.connection
let connectionStream = self.connection.openConnection(
connectingPayload: configuration.connectingPayload
)
Task {
do {
for try await message in connectionStream {
didReceive(message: message)
}
// Stream ended without error: only report it if this loop is not stale.
guard self.connection === loopConnection else { return }
await handleDisconnection()
} catch {
guard self.connection === loopConnection else { return }
// Use Task.isCancelled to distinguish genuine task cancellation from
// connection errors. The WebSocket task's receive() may throw errors
// (including CancellationError) when the connection closes, which should
// be treated as a disconnection — not as task cancellation.
await handleDisconnection(error: Task.isCancelled ? nil : error)
}
}
}
/// Reacts to the receive loop of the current connection ending.
///
/// Does nothing while the transport is paused. Otherwise it moves to `disconnected`, stops
/// the ping timer, notifies the delegate, and immediately terminates one-shot operations
/// (queries and mutations) — these are never replayed, since re-sending a mutation could
/// duplicate its side effects.
///
/// If the transport was `connected`, still has subscribers, and auto-reconnection is
/// enabled, the remaining subscriptions are marked as reconnecting and a reconnection is
/// attempted. Otherwise pending connection waiters are failed and every remaining subscriber
/// is finished.
///
/// - Parameter error: The error that ended the stream, or `nil` for a clean end. When `nil`,
///   `Error.connectionClosed` is used wherever an error value is required.
private func handleDisconnection(error: (any Swift.Error)? = nil) async {
  if connectionState == .paused { return }

  let wasConnected = connectionState == .connected
  connectionState = .disconnected
  stopPingTimer()
  delegate?.webSocketTransport(self, didDisconnectWithError: error)

  // One-shot operations end here regardless of the reconnection settings.
  let failure: any Swift.Error = error ?? Error.connectionClosed
  subscriberRegistry.finishNonSubscriptions(throwing: failure)

  let shouldReconnect =
    wasConnected && !subscriberRegistry.isEmpty && configuration.isReconnectEnabled
  guard shouldReconnect else {
    connectionWaiters.failAll(with: failure)
    switch error {
    case .some(let underlying):
      subscriberRegistry.finishAll(reason: .error(underlying))
    case .none:
      subscriberRegistry.finishAll(reason: .completed)
    }
    return
  }

  subscriberRegistry.markSubscriptionsReconnecting()
  await attemptReconnection()
}
/// Opens a replacement connection after a disconnect, keeping subscriber streams alive.
///
/// Waits for `reconnectionInterval` first when it is greater than zero. If that wait is
/// cancelled, all subscribers are finished as cancelled. After the wait, the reconnection
/// is abandoned when any of the following happened in the meantime:
/// - the connection was already replaced by someone else,
/// - no subscribers remain,
/// - the transport was paused.
///
/// A failure of the new connection attempt is reported through its own receive loop.
private func attemptReconnection() async {
  let connectionAtDisconnect = connection
  let delay = configuration.reconnectionInterval

  if delay > 0 {
    do {
      try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
    } catch {
      // The wait was cancelled — give up and terminate every subscriber.
      subscriberRegistry.finishAll(reason: .cancelled)
      return
    }
  }

  guard
    connection === connectionAtDisconnect,
    !subscriberRegistry.isEmpty,
    connectionState != .paused
  else { return }

  connection = WebSocketConnection(task: urlSession.webSocketTask(with: request))
  connectionState = .connecting
  startConnectionReceiveLoop()
}
/// Suspends the caller until the connection transitions to `connected`.
///
/// If the connection is already `connected` (e.g. because `connection_ack` was buffered and
/// processed before this method runs), returns immediately without suspending.
/// Responds to task cancellation by resuming the waiter with `CancellationError`.
///
/// - Throws: `CancellationError` if the calling task is already cancelled when the waiter
///   would be registered; otherwise whatever error the waiter queue resumes the waiter with.
private func waitForConnectionAck() async throws {
if connectionState == .connected { return }
// Identifies this waiter so the cancellation handler can remove exactly this one.
let waiterID = UUID()
try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Swift.Error>) in
// Never enqueue a waiter for a task that is already cancelled.
if Task.isCancelled {
continuation.resume(throwing: CancellationError())
} else {
connectionWaiters.add(id: waiterID, continuation: continuation)
}
}
} onCancel: {
// `onCancel` is synchronous, so hop back onto the actor in a new task.
Task { await self.cancelConnectionWaiter(id: waiterID) }
}
}
/// Cancels a single connection waiter identified by its UUID.
/// Called from the `onCancel` handler of `withTaskCancellationHandler` when the
/// waiting task is cancelled before `connection_ack` arrives.
///
/// - Parameter id: The identifier the waiter was registered under.
private func cancelConnectionWaiter(id: UUID) {
connectionWaiters.cancel(id: id)
}
// MARK: - Client-Initiated Ping Keepalive
/// Starts sending periodic `ping` messages when a positive `pingInterval` is configured.
///
/// Any previous timer is stopped first. The new task sleeps for the interval, then sends a
/// ping on the connection that was current when the timer started, and repeats until it is
/// cancelled (see ``stopPingTimer()``) or the transport has been deallocated. Called after
/// `connection_ack`.
private func startPingTimer() {
  guard let interval = configuration.pingInterval, interval > 0 else { return }
  stopPingTimer()

  // Pin the timer to the connection it was started for.
  let pingTarget = self.connection
  pingTimerTask = Task { [weak self] in
    let sleepNanoseconds = UInt64(interval * 1_000_000_000)
    while !Task.isCancelled {
      // A failed sleep means the task was cancelled.
      guard (try? await Task.sleep(nanoseconds: sleepNanoseconds)) != nil else { break }
      guard self != nil, !Task.isCancelled else { break }
      // If the ping cannot be encoded, skip this round and try again after the next interval.
      if let ping = try? Message.Outgoing.ping(payload: nil).toWebSocketMessage() {
        pingTarget.send(ping)
      }
    }
  }
}
/// Stops the periodic ping timer.
///
/// Cancels the running ping task, if any, and clears the stored reference. Safe to call when
/// no timer is running.
private func stopPingTimer() {
pingTimerTask?.cancel()
pingTimerTask = nil
}
// MARK: - Runtime Configuration Updates
/// Updates the HTTP headers used when creating new WebSocket connection requests.
///
/// Headers are applied to the stored `URLRequest` and take effect on the next connection
/// (including reconnections). The `Sec-WebSocket-Protocol` header is always preserved: an
/// entry for it in `headerValues` — whether a different value or a `nil` removal — is
/// ignored, because the transport only speaks `graphql-transport-ws`.
///
/// - Parameters:
///   - headerValues: A dictionary of header field names to values. A `nil` value removes
///     the header field.
///   - reconnectIfConnected: If `true` and the transport is currently connected,
///     disconnects and reconnects with the updated headers.
public func updateHeaderValues(
  _ headerValues: [String: String?],
  reconnectIfConnected: Bool = false
) {
  for (field, value) in headerValues {
    request.setValue(value, forHTTPHeaderField: field)
  }

  // Re-apply the subprotocol header last. Header field names are case-insensitive, so this
  // restores it no matter how a caller-supplied key was spelled.
  request.setValue(
    Constants.headerWSProtocolValue,
    forHTTPHeaderField: Constants.headerWSProtocolName
  )

  if reconnectIfConnected && connectionState == .connected {
    disconnectAndReconnect()
  }
}
/// Replaces the payload sent with the `connection_init` message.
///
/// The new payload is used by the next connection that is opened, including reconnections.
///
/// - Parameters:
///   - payload: The new connecting payload dictionary, or `nil` to clear it.
///   - reconnectIfConnected: If `true` and the transport is currently connected,
///     disconnects and reconnects with the updated payload.
public func updateConnectingPayload(
  _ payload: JSONEncodableDictionary?,
  reconnectIfConnected: Bool = false
) {
  configuration.connectingPayload = payload

  guard reconnectIfConnected, connectionState == .connected else { return }
  disconnectAndReconnect()
}
/// Disconnects the current WebSocket connection and immediately starts a new one.
///
/// The connection is replaced first, which invalidates the old receive loop (it will see
/// that `self.connection` no longer matches and exit without reporting a disconnection).
/// The replaced connection is then closed so its socket is released and it stops delivering
/// messages. Active subscribers will be resubscribed when the new `connection_ack` arrives.
private func disconnectAndReconnect() {
  // The ping task is bound to the outgoing connection; stop it rather than pinging a socket
  // that is being closed. It is started again on the next `connection_ack`.
  stopPingTimer()

  let replacedConnection = connection
  connection = WebSocketConnection(task: urlSession.webSocketTask(with: request))
  connectionState = .connecting

  // Close only after the swap, so the old receive loop finds itself stale when its stream
  // ends and does not trigger disconnection handling for the new connection.
  replacedConnection.close()

  startConnectionReceiveLoop()
}
// MARK: - Pause / Resume
/// Pauses the transport: closes the WebSocket but keeps subscription streams alive.
///
/// The transport moves to the `paused` state and the ping timer is stopped. In-flight
/// one-shot operations (queries and mutations) are terminated with
/// `Error.connectionClosed`, because replaying them later could duplicate side effects.
/// Remaining subscriptions are marked as paused and are re-subscribed once ``resume()`` has
/// re-established the connection. Auto-reconnection is suppressed while paused.
///
/// This method is a no-op if the transport has not yet started or is already paused.
///
/// Typical usage:
/// ```swift
/// // App entering background
/// await transport.pause()
///
/// // App returning to foreground
/// await transport.resume()
/// ```
public func pause() {
  let canPause = connectionState != .notStarted && connectionState != .paused
  guard canPause else { return }

  // Switch state before closing so the receive loop sees `.paused` when its stream ends.
  connectionState = .paused
  stopPingTimer()

  // One-shot operations cannot survive the interruption.
  subscriberRegistry.finishNonSubscriptions(throwing: Error.connectionClosed)
  // Let consumers of the surviving subscriptions observe the paused state.
  subscriberRegistry.markSubscriptionsPaused()

  // Ending the stream this way neither triggers auto-reconnection nor finishes
  // subscriber streams, because disconnection handling is skipped while paused.
  connection.close()
}
/// Starts (or restarts) the connection handshake.
///
/// - From `paused` or `disconnected`: creates a fresh WebSocket task and opens it.
/// - From `notStarted`: opens the connection created at initialization.
/// - From `connecting` or `connected`: does nothing.
///
/// Once the server sends `connection_ack`, all surviving subscription streams are
/// automatically re-subscribed on the new connection.
///
/// Can also be used to eagerly open a connection before any ``subscribe`` calls:
/// ```swift
/// let transport = try WebSocketTransport(...)
/// await transport.resume()
/// // Connection is now being established; subsequent subscribe() calls
/// // will use this connection without additional setup delay.
/// ```
public func resume() {
  switch connectionState {
  case .connecting, .connected:
    return
  case .paused, .disconnected:
    // The previous WebSocket task is finished; a new attempt needs a new one.
    connection = WebSocketConnection(task: urlSession.webSocketTask(with: request))
  case .notStarted:
    break
  }

  connectionState = .connecting
  startConnectionReceiveLoop()
}
// MARK: - Subscriber Management
/// Registers an operation with the subscriber registry.
///
/// - Parameters:
///   - operation: The operation to register.
///   - stateStorage: Optional lifecycle state storage, forwarded to the registry.
/// - Returns: The operation's identifier and the stream on which its payloads are delivered.
private func registerSubscriber(
for operation: any GraphQLOperation,
stateStorage: SubscriptionStateStorage? = nil
) -> (OperationID, AsyncThrowingStream<JSONObject, any Swift.Error>) {
subscriberRegistry.register(for: operation, stateStorage: stateStorage)
}
/// Cancels an operation from the client side.
///
/// Finishes the subscriber with a `.cancelled` reason and, if it was still registered,
/// sends a `complete` message to the server. Does nothing further when the subscriber had
/// already been removed (e.g. the server completed it first). A `complete` message that
/// cannot be encoded is silently dropped.
private func cancelSubscription(operationID: OperationID) {
  let wasRegistered = subscriberRegistry.finish(operationID, reason: .cancelled)
  guard wasRegistered else { return }

  guard
    let completeMessage = try? Message.Outgoing.complete(id: operationID).toWebSocketMessage()
  else { return }
  connection.send(completeMessage)
}
/// Removes a subscriber from the registry with a `.completed` finish reason without
/// sending a `complete` message to the server. Used when the connection is already
/// down (e.g. network-failure cache fallback) or when no server cleanup is needed.
///
/// - Parameter operationID: The identifier of the operation to remove.
private func cleanupSubscription(operationID: OperationID) {
subscriberRegistry.finish(operationID, reason: .completed)
}
/// Sends a `subscribe` message for an operation and marks it as subscribed.
///
/// The payload is built by the configured request body creator with the query document
/// included and automatic persisted queries disabled.
///
/// - Throws: Any error from encoding the message; in that case nothing is sent and the
///   operation is not marked as subscribed.
private func sendSubscribeMessage<Operation: GraphQLOperation>(
  operationID: OperationID,
  operation: Operation
) throws {
  let requestBody = configuration.requestBodyCreator.requestBody(
    for: operation,
    sendQueryDocument: true,
    autoPersistQuery: false
  )
  let frame = try Message.Outgoing
    .subscribe(id: operationID, payload: requestBody)
    .toWebSocketMessage()

  connection.send(frame)
  subscriberRegistry.markSubscribed(operationID)
}
/// Sends a fresh `subscribe` message for every entry in the registry's `activeSubscriptions`.
///
/// Called on `connection_ack`. Per the original design notes, only operations that had
/// already been subscribed are included; operations still pending send their own subscribe
/// message once `ensureConnected()` returns for them.
private func resubscribeActiveSubscribers() {
  for (operationID, activeOperation) in subscriberRegistry.activeSubscriptions {
    do {
      try sendSubscribeMessage(operationID: operationID, operation: activeOperation)
    } catch {
      // Not expected, since the first subscribe for this operation succeeded. If it does
      // happen, fail only this subscriber and keep going with the rest.
      subscriberRegistry.finish(operationID, reason: .error(error))
    }
  }
}
// MARK: - Processing Messages
/// Parses one incoming WebSocket message and dispatches it.
///
/// - `connection_ack`: marks the transport connected, resumes connection waiters,
///   re-subscribes active subscriptions, starts the ping timer and notifies the delegate
///   (reconnect vs. first connect).
/// - `next` / `error` / `complete`: forwarded to the subscriber with the matching id.
/// - `ping`: answered with a `pong`, then reported to the delegate.
/// - `pong`: reported to the delegate.
///
/// A message that cannot be parsed finishes every subscriber with
/// `Error.unrecognizedMessage`.
private func didReceive(message: URLSessionWebSocketTask.Message) {
  guard let incoming = try? Message.Incoming.from(message) else {
    subscriberRegistry.finishAll(reason: .error(Error.unrecognizedMessage))
    return
  }

  switch incoming {
  case .connectionAck:
    // Capture this before flipping the flag: it decides which delegate callback fires.
    let connectedBefore = hasBeenConnected
    connectionState = .connected
    hasBeenConnected = true

    connectionWaiters.resumeAll()
    resubscribeActiveSubscribers()
    startPingTimer()

    if connectedBefore {
      delegate?.webSocketTransportDidReconnect(self)
    } else {
      delegate?.webSocketTransportDidConnect(self)
    }

  case .next(let id, let payload):
    subscriberRegistry.yield(payload, for: id)

  case .error(let id, let errors):
    subscriberRegistry.finish(id, reason: .error(Error.graphQLErrors(errors)))

  case .complete(let id):
    subscriberRegistry.finish(id, reason: .completed)

  case .ping(let payload):
    // The graphql-transport-ws protocol asks for a pong "as soon as possible", so reply
    // before informing the delegate.
    if let pong = try? Message.Outgoing.pong(payload: nil).toWebSocketMessage() {
      connection.send(pong)
    }
    delegate?.webSocketTransport(self, didReceivePingWithPayload: payload)

  case .pong(let payload):
    delegate?.webSocketTransport(self, didReceivePongWithPayload: payload)
  }
}
// MARK: - Cache Helpers
/// Performs the optional cache read that precedes the network fetch and reports whether the
/// network fetch is still needed.
///
/// When `fetchBehavior.cacheRead` is `.beforeNetworkFetch`, the operation is loaded from the
/// store and, if data is found, yielded to `continuation`. A cache read error is rethrown
/// only when no network fetch will follow (`networkFetch == .never`); otherwise it is
/// ignored and the network fetch proceeds.
///
/// - Returns: `true` if the caller should proceed with the network (WebSocket) fetch,
///   `false` if the stream should be finished without opening a connection.
/// - Throws: The cache read error, in the case described above.
private func readCacheBeforeNetworkIfNeeded<Operation: GraphQLOperation>(
  operation: Operation,
  fetchBehavior: FetchBehavior,
  continuation: AsyncThrowingStream<GraphQLResponse<Operation>, any Swift.Error>.Continuation
) async throws -> Bool {
  var cacheHit = false

  if fetchBehavior.cacheRead == .beforeNetworkFetch {
    do {
      if let cached = try await store.load(operation) {
        continuation.yield(cached)
        cacheHit = true
      }
    } catch {
      // With a network fetch still to come the cache error is not fatal; without one it
      // is the only result the caller will ever get.
      guard fetchBehavior.networkFetch != .never else { throw error }
    }
  }

  switch fetchBehavior.networkFetch {
  case .always:
    return true
  case .onCacheMiss:
    return !cacheHit
  case .never:
    return false
  }
}
/// Turns a raw payload into a `GraphQLResponse` and, when configured, stores its records.
///
/// Cache records are requested from the parser and published to the store only if
/// `requestConfiguration.writeResultsToCache` is set; publishing additionally requires that
/// the parser produced records and that the result's source is `.server`.
///
/// - Throws: Errors from parsing the payload or from publishing to the store.
private func parseAndCacheResponse<Operation: GraphQLOperation>(
  payload: JSONObject,
  operation: Operation,
  requestConfiguration: RequestConfiguration
) async throws -> GraphQLResponse<Operation> {
  let parser = JSONResponseParser.SingleResponseExecutionHandler<Operation>(
    responseBody: payload,
    operationVariables: operation.__variables
  )
  let parsed = try await parser.execute(
    includeCacheRecords: requestConfiguration.writeResultsToCache
  )

  guard
    requestConfiguration.writeResultsToCache,
    let records = parsed.cacheRecords,
    parsed.result.source == .server
  else {
    return parsed.result
  }

  try await store.publish(records: records)
  return parsed.result
}
// MARK: - Operation Execution
/// Core implementation for executing any GraphQL operation over the WebSocket connection.
///
/// This is the shared implementation for queries, mutations, and subscriptions. All three
/// use the same `subscribe` message type in the `graphql-transport-ws` protocol. The server
/// replies with `next` (results) and `complete` (stream end) messages.
///
/// Cache behavior is controlled by `fetchBehavior` and `requestConfiguration`:
/// - `fetchBehavior.cacheRead` determines whether cached data is returned before or after
///   a network fetch, or not at all.
/// - `fetchBehavior.networkFetch` determines whether a WebSocket fetch is performed.
/// - `requestConfiguration.writeResultsToCache` determines whether server responses are
///   written to the normalized cache.
///
/// Cancelling the returned stream cancels the work behind it. Cancellation is never treated
/// as a network failure: no cache fallback is attempted, and the operation is cancelled
/// through `cancelSubscription(operationID:)` so the server receives a `complete` message
/// if the operation is still registered.
///
/// - Parameters:
///   - operation: The operation to execute.
///   - fetchBehavior: Controls cache reads and whether the network is used.
///   - requestConfiguration: Controls whether server results are written to the cache.
///   - stateStorage: When provided (for subscriptions), the storage is updated as
///     the operation moves through its lifecycle. When `nil` (for queries/mutations), no
///     state tracking is performed.
/// - Returns: A stream of responses that finishes when the operation completes, or throws
///   the error that ended it.
private nonisolated func executeOperation<Operation: GraphQLOperation>(
  operation: Operation,
  fetchBehavior: FetchBehavior,
  requestConfiguration: RequestConfiguration,
  stateStorage: SubscriptionStateStorage? = nil
) -> AsyncThrowingStream<GraphQLResponse<Operation>, any Swift.Error> {
  AsyncThrowingStream { continuation in
    let innerTask = Task {
      do {
        // Step 1: Cache read before network (if applicable)
        let shouldFetchFromNetwork = try await self.readCacheBeforeNetworkIfNeeded(
          operation: operation,
          fetchBehavior: fetchBehavior,
          continuation: continuation
        )
        guard shouldFetchFromNetwork else {
          stateStorage?.set(.finished(.completed))
          continuation.finish()
          return
        }

        // Step 2: WebSocket fetch
        let (operationID, payloadStream) = await self.registerSubscriber(
          for: operation,
          stateStorage: stateStorage
        )
        do {
          try await self.ensureConnected()
          try Task.checkCancellation()
          try await self.sendSubscribeMessage(operationID: operationID, operation: operation)
          for try await payload in payloadStream {
            let response = try await self.parseAndCacheResponse(
              payload: payload,
              operation: operation,
              requestConfiguration: requestConfiguration
            )
            continuation.yield(response)
          }
          // The payload stream can end because this task was cancelled rather than because
          // the server completed the operation; in that case tell the server to stop.
          if Task.isCancelled {
            await self.cancelSubscription(operationID: operationID)
          }
          continuation.finish()
        } catch {
          // Step 3: Cache read on transport failure (if applicable).
          // Only fall back to cache for transport/connection errors, not GraphQL
          // application errors (e.g. validation failures, resolver errors).
          let isGraphQLError: Bool
          if case .graphQLErrors = error as? Error { isGraphQLError = true }
          else { isGraphQLError = false }

          // This task is only ever cancelled by the stream's termination handler, so when
          // it is cancelled nobody is left to receive a cached response. Skipping the
          // fallback also ensures the cancel path below runs and notifies the server.
          // `Task.isCancelled` is used instead of inspecting the error type because a
          // closing connection may itself surface a `CancellationError`.
          let wasCancelled = Task.isCancelled

          if fetchBehavior.cacheRead == .onNetworkFailure, !isGraphQLError, !wasCancelled {
            if let cacheResponse = try? await self.store.load(operation) {
              continuation.yield(cacheResponse)
              await self.cleanupSubscription(operationID: operationID)
              continuation.finish()
              return
            }
          }
          await self.cancelSubscription(operationID: operationID)
          continuation.finish(throwing: error)
        }
      } catch {
        // Only the pre-network cache read can fail here, before anything was registered.
        stateStorage?.set(.finished(.error(error)))
        continuation.finish(throwing: error)
      }
    }

    continuation.onTermination = { @Sendable reason in
      // Normal completion needs no cleanup; only consumer cancellation must stop the work.
      guard case .cancelled = reason else { return }
      innerTask.cancel()
    }
  }
}
/// Executes a subscription and wraps its response stream in a ``SubscriptionStream``.
///
/// A new `SubscriptionStateStorage` is created for the subscription, passed to
/// `executeOperation` so it is updated during the operation's lifecycle, and read by the
/// returned stream's state provider.
private nonisolated func sendSubscription<Operation: GraphQLSubscription>(
  subscription: Operation,
  fetchBehavior: FetchBehavior,
  requestConfiguration: RequestConfiguration
) -> SubscriptionStream<GraphQLResponse<Operation>> {
  let lifecycle = SubscriptionStateStorage()
  return SubscriptionStream(
    stream: executeOperation(
      operation: subscription,
      fetchBehavior: fetchBehavior,
      requestConfiguration: requestConfiguration,
      stateStorage: lifecycle
    ),
    stateProvider: { lifecycle.state }
  )
}
// MARK: - Network Transport Protocol Conformance
/// Executes a query over the WebSocket connection.
///
/// Forwards to `executeOperation` without state tracking. This implementation performs no
/// throwing work itself; failures are delivered through the returned stream.
///
/// - Parameters:
///   - query: The query to execute.
///   - fetchBehavior: Controls cache reads and whether the network is used.
///   - requestConfiguration: Controls whether server results are written to the cache.
/// - Returns: A stream of responses for the query.
nonisolated public func send<Query: GraphQLQuery>(
query: Query,
fetchBehavior: FetchBehavior,
requestConfiguration: RequestConfiguration
) throws -> AsyncThrowingStream<GraphQLResponse<Query>, any Swift.Error> {
executeOperation(operation: query, fetchBehavior: fetchBehavior, requestConfiguration: requestConfiguration)
}
/// Executes a mutation over the WebSocket connection.
///
/// Forwards to `executeOperation` with the `.NetworkOnly` fetch behavior and without state
/// tracking. This implementation performs no throwing work itself; failures are delivered
/// through the returned stream.
///
/// - Parameters:
///   - mutation: The mutation to execute.
///   - requestConfiguration: Controls whether server results are written to the cache.
/// - Returns: A stream of responses for the mutation.
nonisolated public func send<Mutation: GraphQLMutation>(
mutation: Mutation,
requestConfiguration: RequestConfiguration
) throws -> AsyncThrowingStream<GraphQLResponse<Mutation>, any Swift.Error> {
executeOperation(operation: mutation, fetchBehavior: .NetworkOnly, requestConfiguration: requestConfiguration)
}
/// Starts a subscription over the WebSocket connection.
///
/// Forwards to `sendSubscription`, which returns a stream that also exposes the
/// subscription's lifecycle state. This implementation performs no throwing work itself;
/// failures are delivered through the returned stream.
///
/// - Parameters:
///   - subscription: The subscription to start.
///   - fetchBehavior: Controls cache reads and whether the network is used.
///   - requestConfiguration: Controls whether server results are written to the cache.
/// - Returns: A ``SubscriptionStream`` of responses for the subscription.
nonisolated public func send<Subscription: GraphQLSubscription>(
subscription: Subscription,
fetchBehavior: Apollo.FetchBehavior,
requestConfiguration: Apollo.RequestConfiguration
) throws -> SubscriptionStream<Apollo.GraphQLResponse<Subscription>> {
sendSubscription(subscription: subscription, fetchBehavior: fetchBehavior, requestConfiguration: requestConfiguration)
}
}