-
Notifications
You must be signed in to change notification settings - Fork 111
Expand file tree
/
Copy pathNode.hs
More file actions
169 lines (147 loc) · 5.35 KB
/
Copy pathNode.hs
File metadata and controls
169 lines (147 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE UndecidableInstances #-}
-- | Top-level module to run a single Hydra node.
module Hydra.Node where
import Cardano.Prelude hiding (STM, async, atomically, cancel, check, poll, threadDelay)
import Control.Monad.Class.MonadAsync (MonadAsync, async)
import Control.Monad.Class.MonadSTM (MonadSTM (STM), atomically, newTQueue, newTVar, readTQueue, stateTVar, writeTQueue)
import Control.Monad.Class.MonadThrow (MonadThrow)
import Control.Monad.Class.MonadTimer (MonadTimer, threadDelay)
import Hydra.HeadLogic (
ClientRequest (..),
ClientResponse (..),
Effect (..),
Environment (..),
Event (..),
HeadState (..),
LogicError (..),
NetworkEvent (..),
OnChainTx (..),
Outcome (..),
confirmedLedger,
)
import qualified Hydra.HeadLogic as Logic
import Hydra.Ledger
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (HydraNetwork (..))
-- ** Create and run a hydra node
-- | Collection of handles a single Hydra node operates on.
data HydraNode tx m = HydraNode
  { -- | Queue holding the 'Event's which are yet to be processed.
    eq :: EventQueue m (Event tx)
  , -- | Network handle, used to 'broadcast' messages.
    hn :: HydraNetwork tx m
  , -- | Handle onto the head state and its 'Ledger'.
    hh :: HydraHead tx m
  , -- | Chain handle, used to post transactions via 'postTx'.
    oc :: OnChain m
  , -- | Callback delivering a 'ClientResponse' to the client.
    sendResponse :: ClientResponse tx -> m ()
  , -- | Environment handed to 'Logic.update' on every event.
    env :: Environment
  }
-- | Messages traced by the node while it works through events and effects.
data HydraNodeLog tx
  = -- | Processing the given event resulted in the given 'LogicError'.
    ErrorHandlingEvent (Event tx) (LogicError tx)
  | -- | Traced right before an event is processed.
    ProcessingEvent (Event tx)
  | -- | Traced once all effects of an event have been processed.
    ProcessedEvent (Event tx)
  | -- | Traced right before an effect is executed.
    ProcessingEffect (Effect tx)
  | -- | Traced right after an effect has been executed.
    ProcessedEffect (Effect tx)
  deriving (Eq, Show)
-- | Put a client request onto the node's event queue, wrapped as a
-- 'ClientEvent'.
handleClientRequest :: HydraNode tx m -> ClientRequest tx -> m ()
handleClientRequest HydraNode{eq} request =
  putEvent eq (ClientEvent request)
-- | Put an observed on-chain transaction onto the node's event queue, wrapped
-- as an 'OnChainEvent'.
handleChainTx :: HydraNode tx m -> OnChainTx -> m ()
handleChainTx HydraNode{eq} chainTx =
  putEvent eq (OnChainEvent chainTx)
-- | Put a message received from the network onto the node's event queue,
-- wrapped as a 'NetworkEvent' carrying 'MessageReceived'.
handleMessage :: HydraNode tx m -> Logic.HydraMessage tx -> m ()
handleMessage HydraNode{eq} msg =
  putEvent eq (NetworkEvent (MessageReceived msg))
-- | Look up the confirmed ledger state of the node's head; see
-- 'getConfirmedLedger' for when this yields 'Nothing'.
queryLedgerState :: MonadSTM m => HydraNode tx m -> STM m (Maybe (LedgerState tx))
queryLedgerState node = getConfirmedLedger (hh node)
-- | Main loop of a node: forever take the next event off the queue, run it
-- through 'processNextEvent' and execute the resulting effects, tracing each
-- step. A 'LogicError' is only traced; the loop carries on with the next event.
runHydraNode ::
  MonadThrow m =>
  MonadAsync m =>
  MonadTimer m =>
  Tx tx =>
  Ord tx =>
  Tracer m (HydraNodeLog tx) ->
  HydraNode tx m ->
  m ()
runHydraNode tracer node =
  -- NOTE(SN): here we could introduce concurrent head processing, e.g. with
  -- something like 'forM_ [0..1] $ async'
  forever (nextEvent (eq node) >>= step)
 where
  -- Process a single event including all of its effects.
  step event = do
    traceWith tracer (ProcessingEvent event)
    outcome <- processNextEvent node event
    case outcome of
      Left err ->
        traceWith tracer (ErrorHandlingEvent event err)
      Right effects -> do
        forM_ effects (processEffect node tracer)
        traceWith tracer (ProcessedEvent event)
-- | Monadic wrapper around 'Logic.update': atomically runs the head logic on
-- the current head state and the given event.
--
-- Yields the effects of a 'NewState' outcome (storing the new state), or the
-- 'LogicError' of an 'Error' outcome (keeping the state as it was). The 'Wait'
-- outcome is not supported yet and panics.
--
-- NOTE(review): the panic sits inside the pure state transition, so it is
-- presumably only raised once the result (or the stored state) is forced -
-- confirm against the 'stateTVar' implementation in use.
processNextEvent ::
  Tx tx =>
  Ord tx =>
  MonadSTM m =>
  HydraNode tx m ->
  Event tx ->
  m (Either (LogicError tx) [Effect tx])
processNextEvent HydraNode{hh, env} event =
  atomically (modifyHeadState hh transition)
 where
  -- Pure state transition: result for the caller paired with the state to store.
  transition current =
    case Logic.update env (ledger hh) current event of
      Error err -> (Left err, current)
      NewState next effects -> (Right effects, next)
      Wait -> panic "TODO: wait and reschedule continuation"
-- | Execute a single 'Effect' using the handles of the node, tracing
-- 'ProcessingEffect' before and 'ProcessedEffect' after it.
--
-- A 'Delay' effect does not block: a new thread is spawned which waits for the
-- given time and then puts the event back onto the event queue.
processEffect ::
  MonadAsync m =>
  MonadTimer m =>
  MonadThrow m =>
  HydraNode tx m ->
  Tracer m (HydraNodeLog tx) ->
  Effect tx ->
  m ()
processEffect HydraNode{hn, oc, sendResponse, eq} tracer effect = do
  traceWith tracer (ProcessingEffect effect)
  dispatch
  traceWith tracer (ProcessedEffect effect)
 where
  -- The action corresponding to the effect at hand.
  dispatch =
    case effect of
      ClientEffect response ->
        sendResponse response
      NetworkEffect msg ->
        broadcast hn msg
      OnChainEffect chainTx ->
        postTx oc chainTx
      Delay after event ->
        void (async (threadDelay after >> putEvent eq event))
-- ** Some general event queue from which the Hydra head is "fed"
-- | The one queue of the system which "feeds" a hydra head with events.
--
-- NOTE(SN): this probably should be bounded and include proper logging
--
-- NOTE(SN): handle pattern, but likely not required as there is no need for an
-- alternative implementation
data EventQueue m e = EventQueue
  { -- | Add an event to the queue.
    putEvent :: e -> m ()
  , -- | Take the next event from the queue.
    nextEvent :: m e
  }
-- | Create an 'EventQueue' backed by a freshly created 'TQueue': 'putEvent'
-- writes to it and 'nextEvent' reads from it, each in its own transaction.
createEventQueue :: MonadSTM m => m (EventQueue m e)
createEventQueue = do
  queue <- atomically newTQueue
  let enqueue event = atomically (writeTQueue queue event)
      dequeue = atomically (readTQueue queue)
  pure EventQueue{putEvent = enqueue, nextEvent = dequeue}
-- ** HydraHead handle to manage a single hydra head state concurrently
-- | Handle giving access to the state of a Hydra Head and to its ledger.
data HydraHead tx m = HydraHead
  { -- | Apply a state transition to the head state within 'STM'; the first
    -- component of the produced pair is returned, the second one becomes the
    -- new 'HeadState'.
    modifyHeadState :: forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
  , -- | The 'Ledger' passed to the head logic.
    ledger :: Ledger tx
  }
-- | Read the confirmed ledger state of a head. Only a head whose state is a
-- 'Logic.OpenState' has one; 'Nothing' is returned for every other state.
getConfirmedLedger :: MonadSTM m => HydraHead tx m -> STM m (Maybe (LedgerState tx))
getConfirmedLedger hh =
  fmap confirmedOf (queryHeadState hh)
 where
  confirmedOf (HeadState _ (Logic.OpenState st)) = Just (confirmedLedger st)
  confirmedOf _ = Nothing
-- | Read the current 'HeadState' without changing it, expressed as a
-- 'modifyHeadState' which stores back the very state it was given.
queryHeadState :: HydraHead tx m -> STM m (HeadState tx)
queryHeadState hh =
  modifyHeadState hh (\current -> (current, current))
-- | Overwrite the 'HeadState' with the given one, ignoring the previous state.
putState :: HydraHead tx m -> HeadState tx -> STM m ()
putState hh new =
  modifyHeadState hh (\_ -> ((), new))
-- | Create a 'HydraHead' handle: the given initial state is stored in a new
-- 'TVar' which 'modifyHeadState' operates on via 'stateTVar'; the given ledger
-- is kept as is.
createHydraHead :: MonadSTM m => HeadState tx -> Ledger tx -> m (HydraHead tx m)
createHydraHead initialState ledger =
  atomically (newTVar initialState) <&> \tv ->
    HydraHead{modifyHeadState = stateTVar tv, ledger = ledger}
-- ** OnChain handle to abstract over chain access
-- | Exception type which 'postTx' is documented to throw.
data ChainError = ChainError
  deriving (Exception, Show)
-- | Handle to interface with the main chain network.
newtype OnChain m = OnChain
  { -- | Construct a transaction corresponding to the given 'OnChainTx' event
    -- and send it to the main chain.
    --
    -- May throw (at least) a 'ChainError'.
    postTx :: MonadThrow m => OnChainTx -> m ()
  }