1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use crate :: stages:: PallasPoint ;
1516use acto:: { AcTokio , ActoCell , ActoMsgSuper , ActoRef , ActoRuntime } ;
1617use amaru_consensus:: { consensus:: store:: ChainStore , IsHeader } ;
1718use amaru_kernel:: { Hash , Header } ;
1819use amaru_ledger:: BlockValidationResult ;
1920use client_protocol:: { client_protocols, ClientProtocolMsg } ;
20- use client_state:: { to_pallas_point, ClientOp } ;
2121use gasket:: framework:: * ;
2222use pallas_network:: {
2323 facades:: PeerServer ,
@@ -40,9 +40,6 @@ pub const EVENT_TARGET: &str = "amaru::consensus::chain_forward";
4040
4141/// Forwarding stage of the consensus where blocks are stored and made
4242/// available to downstream peers.
43- ///
44- /// TODO: currently does nothing, should store block, update chain state, and
45- /// forward new chain downstream
4643
4744#[ derive( Stage ) ]
4845#[ stage( name = "consensus.forward" , unit = "Unit" , worker = "Worker" ) ]
@@ -52,14 +49,16 @@ pub struct ForwardStage {
5249 pub network_magic : u64 ,
5350 pub runtime : AcTokio ,
5451 pub listen_address : String ,
55- pub downstream : Option < ActoRef < ForwardEvent > > ,
52+ pub downstream : ActoRef < ForwardEvent > ,
5653 pub max_peers : usize ,
54+ pub our_tip : Tip ,
5755}
5856
5957#[ derive( Debug , Clone ) ]
6058pub enum ForwardEvent {
6159 Listening ( u16 ) ,
6260 Forward ( Point ) ,
61+ Backward ( Point ) ,
6362}
6463
6564impl ForwardStage {
@@ -69,6 +68,7 @@ impl ForwardStage {
6968 network_magic : u64 ,
7069 listen_address : & str ,
7170 max_peers : usize ,
71+ our_tip : Tip ,
7272 ) -> Self {
7373 #[ allow( clippy:: expect_used) ]
7474 let runtime =
@@ -79,8 +79,9 @@ impl ForwardStage {
7979 network_magic,
8080 runtime,
8181 listen_address : listen_address. to_string ( ) ,
82- downstream,
82+ downstream : downstream . unwrap_or_else ( ActoRef :: blackhole ) ,
8383 max_peers,
84+ our_tip,
8485 }
8586 }
8687}
@@ -93,7 +94,7 @@ pub enum Unit {
9394pub struct Worker {
9495 server : JoinHandle < ( ) > ,
9596 incoming_peers : Receiver < PeerServer > ,
96- tip : Tip ,
97+ our_tip : Tip ,
9798 clients : ActoRef < ClientMsg > ,
9899}
99100
@@ -103,16 +104,46 @@ impl Drop for Worker {
103104 }
104105}
105106
107+ #[ allow( clippy:: large_enum_variant) ]
108+ #[ derive( Debug , Clone ) ]
109+ enum ClientOp {
110+ /// the tip to go back to
111+ Backward ( Tip ) ,
112+ /// the header to go forward to and the tip we will be at after sending this header
113+ Forward ( Header , Tip ) ,
114+ }
115+
116+ impl PartialEq for ClientOp {
117+ fn eq ( & self , other : & Self ) -> bool {
118+ match ( self , other) {
119+ ( Self :: Backward ( l0) , Self :: Backward ( r0) ) => ( & l0. 0 , l0. 1 ) == ( & r0. 0 , r0. 1 ) ,
120+ ( Self :: Forward ( l0, l1) , Self :: Forward ( r0, r1) ) => {
121+ l0 == r0 && ( & l1. 0 , l1. 1 ) == ( & r1. 0 , r1. 1 )
122+ }
123+ _ => false ,
124+ }
125+ }
126+ }
127+
128+ impl Eq for ClientOp { }
129+
130+ impl ClientOp {
131+ pub fn tip ( & self ) -> Tip {
132+ match self {
133+ ClientOp :: Backward ( tip) => tip. clone ( ) ,
134+ ClientOp :: Forward ( _, tip) => tip. clone ( ) ,
135+ }
136+ }
137+ }
138+
106139#[ async_trait:: async_trait( ?Send ) ]
107140impl gasket:: framework:: Worker < ForwardStage > for Worker {
108141 async fn bootstrap ( stage : & ForwardStage ) -> Result < Self , WorkerError > {
109142 let server = TcpListener :: bind ( & stage. listen_address ) . await . or_panic ( ) ?;
110- if let Some ( downstream) = & stage. downstream {
111- tracing:: debug!( "sending listening event" ) ;
112- downstream. send ( ForwardEvent :: Listening (
113- server. local_addr ( ) . or_panic ( ) ?. port ( ) ,
114- ) ) ;
115- }
143+ tracing:: debug!( "sending listening event" ) ;
144+ stage. downstream . send ( ForwardEvent :: Listening (
145+ server. local_addr ( ) . or_panic ( ) ?. port ( ) ,
146+ ) ) ;
116147
117148 let ( tx, incoming_peers) = mpsc:: channel ( 10 ) ;
118149
@@ -154,7 +185,7 @@ impl gasket::framework::Worker<ForwardStage> for Worker {
154185 Ok ( Self {
155186 server,
156187 incoming_peers,
157- tip : Tip ( Point :: Origin , 0 ) ,
188+ our_tip : stage . our_tip . clone ( ) ,
158189 clients,
159190 } )
160191 }
@@ -191,12 +222,25 @@ impl gasket::framework::Worker<ForwardStage> for Worker {
191222 // FIXME: block height should be part of BlockValidated message
192223 let store = stage. store . lock ( ) . await ;
193224 if let Some ( header) = store. load_header ( & Hash :: from ( point) ) {
194- self . tip = Tip ( to_pallas_point ( point) , header. block_height ( ) ) ;
195- self . clients . send ( ClientMsg :: Op ( ClientOp :: Forward ( header) ) ) ;
196- }
225+ // assert that the new tip is a direct successor of the old tip
226+ assert_eq ! ( header. block_height( ) , self . our_tip. 1 + 1 ) ;
227+ match header. parent ( ) {
228+ Some ( parent) => assert_eq ! (
229+ Point :: new( self . our_tip. 0 . slot_or_default( ) , parent. as_ref( ) . to_vec( ) ) ,
230+ self . our_tip. 0
231+ ) ,
232+ None => assert_eq ! ( self . our_tip. 0 , Point :: Origin ) ,
233+ }
197234
198- if let Some ( downstream) = & stage. downstream {
199- downstream. send ( ForwardEvent :: Forward ( to_pallas_point ( point) ) ) ;
235+ self . our_tip = Tip ( point. pallas_point ( ) , header. block_height ( ) ) ;
236+ self . clients . send ( ClientMsg :: Op ( ClientOp :: Forward (
237+ header,
238+ self . our_tip . clone ( ) ,
239+ ) ) ) ;
240+
241+ stage
242+ . downstream
243+ . send ( ForwardEvent :: Forward ( point. pallas_point ( ) ) ) ;
200244 }
201245
202246 Ok ( ( ) )
@@ -213,9 +257,13 @@ impl gasket::framework::Worker<ForwardStage> for Worker {
213257 // FIXME: block height should be part of BlockValidated message
214258 let store = stage. store . lock ( ) . await ;
215259 if let Some ( header) = store. load_header ( & Hash :: from ( point) ) {
216- self . tip = Tip ( to_pallas_point ( point) , header. block_height ( ) ) ;
260+ self . our_tip = Tip ( point. pallas_point ( ) , header. block_height ( ) ) ;
217261 self . clients
218- . send ( ClientMsg :: Op ( ClientOp :: Backward ( to_pallas_point ( point) ) ) ) ;
262+ . send ( ClientMsg :: Op ( ClientOp :: Backward ( self . our_tip . clone ( ) ) ) ) ;
263+
264+ stage
265+ . downstream
266+ . send ( ForwardEvent :: Backward ( point. pallas_point ( ) ) ) ;
219267 }
220268
221269 Ok ( ( ) )
@@ -235,7 +283,8 @@ impl gasket::framework::Worker<ForwardStage> for Worker {
235283 // FIXME: gasket design bug that we only get &Unit and thus cannot take values from it without internal mutability
236284 let peer = peer. borrow_mut ( ) . take ( ) ;
237285 if let Some ( peer) = peer {
238- self . clients . send ( ClientMsg :: Peer ( peer, self . tip . clone ( ) ) ) ;
286+ self . clients
287+ . send ( ClientMsg :: Peer ( peer, self . our_tip . clone ( ) ) ) ;
239288 } else {
240289 tracing:: error!( target: EVENT_TARGET , "Unit::Peer was empty in execute" ) ;
241290 }
@@ -247,29 +296,33 @@ impl gasket::framework::Worker<ForwardStage> for Worker {
247296
248297#[ allow( clippy:: large_enum_variant) ]
249298enum ClientMsg {
299+ /// A new peer has connected to us.
300+ ///
301+ /// Our tip is included to get the connection handlers started correctly.
250302 Peer ( PeerServer , Tip ) ,
303+ /// An operation to be executed on all clients.
251304 Op ( ClientOp ) ,
252305}
253306
254307async fn client_supervisor (
255- mut cell : ActoCell < ClientMsg , impl ActoRuntime , Result < ( ) , client_protocol :: ClientError > > ,
308+ mut cell : ActoCell < ClientMsg , impl ActoRuntime , anyhow :: Result < ( ) > > ,
256309 store : Arc < Mutex < dyn ChainStore < Header > > > ,
257310 max_peers : usize ,
258311) {
259312 let mut clients = HashMap :: new ( ) ;
260313 while let Some ( msg) = cell. recv ( ) . await . has_senders ( ) {
261314 match msg {
262315 ActoMsgSuper :: Message ( ClientMsg :: Peer ( peer, tip) ) => {
263- if clients. len ( ) >= max_peers {
264- tracing:: warn!( target: EVENT_TARGET , "max peers reached, dropping peer" ) ;
265- continue ;
266- }
267-
268316 let addr = peer
269317 . accepted_address ( )
270318 . map ( |a| a. to_string ( ) )
271319 . unwrap_or_default ( ) ;
272320
321+ if clients. len ( ) >= max_peers {
322+ tracing:: warn!( target: EVENT_TARGET , "max peers reached, dropping peer from {addr}" ) ;
323+ continue ;
324+ }
325+
273326 let client = cell. spawn_supervised ( & addr, {
274327 let store = store. clone ( ) ;
275328 move |cell| client_protocols ( cell, peer, store, tip)
@@ -294,3 +347,6 @@ mod client_state;
294347
295348#[ cfg( test) ]
296349mod tests;
350+
351+ #[ cfg( test) ]
352+ mod test_infra;
0 commit comments