11// Concord
22//
3- // Copyright (c) 2018, 2019 VMware, Inc. All Rights Reserved.
3+ // Copyright (c) 2023 VMware, Inc. All Rights Reserved.
44//
55// This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in
66// compliance with the Apache 2.0 License.
1313#include < functional>
1414#include < bitset>
1515
16- #include < bftengine/Replica.hpp>
17- #include < messages/StateTransferMsg.hpp>
16+ #include " bftengine/Replica.hpp"
17+ #include " messages/StateTransferMsg.hpp"
1818#include " FullNodeReplica.hpp"
1919
2020#include " log/logger.hpp"
3333#include " communication/StateControl.hpp"
3434
3535using concordUtil::Timers;
36+ using namespace std ::placeholders;
37+
38+ // Note : The changes in files are inclined with RO replica SateTransfer behavior, all the class functions are inherited
39+ // from ReadOnlyReplica. As we know for timebeing StateTransfer functionality is a temporary solution for FullNode,
40+ // until the ASP/BSP is implemented the functions in this class needs to be changed based on the required accordingly.
3641
3742namespace bftEngine ::impl {
3843
3944FullNodeReplica::FullNodeReplica (const ReplicaConfig &config,
40- std::shared_ptr<IRequestsHandler> requestsHandler ,
41- IStateTransfer *stateTransfer ,
42- std::shared_ptr<MsgsCommunicator> msgComm ,
43- std::shared_ptr<MsgHandlersRegistrator> msgHandlerReg ,
45+ std::shared_ptr<IRequestsHandler> requests_handler ,
46+ IStateTransfer *state_transfer ,
47+ std::shared_ptr<MsgsCommunicator> msg_comm ,
48+ std::shared_ptr<MsgHandlersRegistrator> msg_handler_reg ,
4449 concordUtil::Timers &timers,
45- MetadataStorage *metadataStorage )
46- : ReplicaForStateTransfer(config, requestsHandler, stateTransfer, msgComm, msgHandlerReg , true , timers),
50+ MetadataStorage *metadata_storage )
51+ : ReplicaForStateTransfer(config, requests_handler, state_transfer, msg_comm, msg_handler_reg , true , timers),
4752 fn_metrics_{metrics_.RegisterCounter (" receivedCheckpointMsgs" ),
4853 metrics_.RegisterCounter (" sentAskForCheckpointMsgs" ),
4954 metrics_.RegisterCounter (" receivedInvalidMsgs" ),
5055 metrics_.RegisterGauge (" lastExecutedSeqNum" , lastExecutedSeqNum)},
51- metadataStorage_{metadataStorage } {
56+ metadata_storage_{metadata_storage } {
5257 LOG_INFO (GL, " Initialising Full Node Replica" );
5358 repsInfo = new ReplicasInfo (config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs);
54- msgHandlers_->registerMsgHandler (
55- MsgCode::Checkpoint, std::bind (&FullNodeReplica::messageHandler<CheckpointMsg>, this , std::placeholders::_1));
56- msgHandlers_->registerMsgHandler (
57- MsgCode::ClientRequest,
58- std::bind (&FullNodeReplica::messageHandler<ClientRequestMsg>, this , std::placeholders::_1));
59- msgHandlers_->registerMsgHandler (
60- MsgCode::StateTransfer,
61- std::bind (&FullNodeReplica::messageHandler<StateTransferMsg>, this , std::placeholders::_1));
59+
60+ registerMsgHandlers ();
6261 metrics_.Register ();
6362
6463 SigManager::init (config_.replicaId ,
@@ -96,10 +95,8 @@ void FullNodeReplica::stop() {
9695}
9796
9897void FullNodeReplica::onTransferringCompleteImp (uint64_t newStateCheckpoint) {
99- lastExecutedSeqNum = newStateCheckpoint * checkpointWindowSize;
100-
101- fn_metrics_.last_executed_seq_num_ .Get ().Set (lastExecutedSeqNum);
102- last_executed_seq_num_ = lastExecutedSeqNum;
98+ last_executed_seq_num_ = newStateCheckpoint * checkpointWindowSize;
99+ fn_metrics_.last_executed_seq_num_ .Get ().Set (last_executed_seq_num_);
103100}
104101
105102void FullNodeReplica::onReportAboutInvalidMessage (MessageBase *msg, const char *reason) {
@@ -111,8 +108,8 @@ void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *
111108void FullNodeReplica::sendAskForCheckpointMsg () {
112109 fn_metrics_.sent_ask_for_checkpoint_msg_ ++;
113110 LOG_INFO (GL, " sending AskForCheckpointMsg" );
114- auto msg = std::make_unique<AskForCheckpointMsg>( config_.replicaId ) ;
115- for (auto id : repsInfo->idsOfPeerReplicas ()) send (msg. get () , id);
111+ AskForCheckpointMsg msg{ config_.replicaId } ;
112+ for (auto id : repsInfo->idsOfPeerReplicas ()) send (& msg, id);
116113}
117114
118115template <>
@@ -139,24 +136,24 @@ void FullNodeReplica::onMessage<CheckpointMsg>(std::unique_ptr<CheckpointMsg> ms
139136 msg->rvbDataDigest ()));
140137
141138 // Reconfiguration cmd block is synced to RO replica via reserved pages
142- EpochNum replicasLastKnownEpochVal = 0 ;
143- auto epochNumberFromResPages = ReconfigurationCmd::instance ().getReconfigurationCommandEpochNumber ();
144- if (epochNumberFromResPages .has_value ()) replicasLastKnownEpochVal = epochNumberFromResPages .value ();
139+ EpochNum replicas_last_known_epoch_val = 0 ;
140+ auto epoch_number_from_res_pages = ReconfigurationCmd::instance ().getReconfigurationCommandEpochNumber ();
141+ if (epoch_number_from_res_pages .has_value ()) replicas_last_known_epoch_val = epoch_number_from_res_pages .value ();
145142
146143 // not relevant
147144 if (!msg->isStableState () || msg->seqNumber () <= lastExecutedSeqNum ||
148- msg->epochNumber () < replicasLastKnownEpochVal ) {
145+ msg->epochNumber () < replicas_last_known_epoch_val ) {
149146 return ;
150147 }
151148 // no self certificate
152- static std::map<SeqNum, CheckpointInfo<false >> checkpointsInfo ;
153- const auto msgSeqNum = msg->seqNumber ();
154- const auto idOfGeneratedReplica = msg->idOfGeneratedReplica ();
155- checkpointsInfo[msgSeqNum ].addCheckpointMsg (msg.release (), idOfGeneratedReplica );
149+ static std::map<SeqNum, CheckpointInfo<false >> checkpoints_info ;
150+ const auto msg_seq_num = msg->seqNumber ();
151+ const auto id_of_generated_eplica = msg->idOfGeneratedReplica ();
152+ checkpoints_info[msg_seq_num ].addCheckpointMsg (msg.release (), id_of_generated_eplica );
156153 // if enough - invoke state transfer
157- if (checkpointsInfo[msgSeqNum ].isCheckpointCertificateComplete ()) {
158- persistCheckpointDescriptor (msgSeqNum, checkpointsInfo[msgSeqNum ]);
159- checkpointsInfo .clear ();
154+ if (checkpoints_info[msg_seq_num ].isCheckpointCertificateComplete ()) {
155+ persistCheckpointDescriptor (msg_seq_num, checkpoints_info[msg_seq_num ]);
156+ checkpoints_info .clear ();
160157 LOG_INFO (GL, " call to startCollectingState()" );
161158 stateTransfer->startCollectingState ();
162159 }
@@ -177,103 +174,37 @@ void FullNodeReplica::persistCheckpointDescriptor(const SeqNum &seqnum, const Ch
177174 m.second ->idOfGeneratedReplica ()));
178175 }
179176 DescriptorOfLastStableCheckpoint desc (ReplicaConfig::instance ().getnumReplicas (), msgs);
180- const size_t bufLen = DescriptorOfLastStableCheckpoint::maxSize (ReplicaConfig::instance ().getnumReplicas ());
181- concord::serialize::UniquePtrToChar descBuf (new char [bufLen ]);
182- char *descBufPtr = descBuf .get ();
183- size_t actualSize = 0 ;
184- desc.serialize (descBufPtr, bufLen, actualSize );
185- ConcordAssertNE (actualSize , 0 );
177+ const size_t buf_len = DescriptorOfLastStableCheckpoint::maxSize (ReplicaConfig::instance ().getnumReplicas ());
178+ concord::serialize::UniquePtrToChar desc_buf (new char [buf_len ]);
179+ char *desc_buf_ptr = desc_buf .get ();
180+ size_t actual_size = 0 ;
181+ desc.serialize (desc_buf_ptr, buf_len, actual_size );
182+ ConcordAssertNE (actual_size , 0 );
186183
187184 // TODO [TK] S3KeyGenerator
188185 // checkpoints/<BlockId>/<RepId>
189186 std::ostringstream oss;
190187 oss << " checkpoints/" << msgs[0 ]->state () << " /" << config_.replicaId ;
191- metadataStorage_ ->atomicWriteArbitraryObject (oss.str (), descBuf .get (), actualSize );
188+ metadata_storage_ ->atomicWriteArbitraryObject (oss.str (), desc_buf .get (), actual_size );
192189}
193190
194191template <>
195192void FullNodeReplica::onMessage<ClientRequestMsg>(std::unique_ptr<ClientRequestMsg> msg) {
196- const NodeIdType senderId = msg->senderId ();
197- const NodeIdType clientId = msg->clientProxyId ();
198- const bool reconfig_flag = (msg->flags () & MsgFlag::RECONFIG_FLAG) != 0 ;
199- const ReqId reqSeqNum = msg->requestSeqNum ();
193+ const NodeIdType sender_id = msg->senderId ();
194+ const NodeIdType client_id = msg->clientProxyId ();
195+ const ReqId req_seq_num = msg->requestSeqNum ();
200196 const uint64_t flags = msg->flags ();
201197
202198 SCOPED_MDC_CID (msg->getCid ());
203- LOG_DEBUG (CNSUS, KVLOG (clientId, reqSeqNum, senderId ) << " flags: " << std::bitset<sizeof (uint64_t ) * 8 >(flags));
199+ LOG_DEBUG (CNSUS, KVLOG (client_id, req_seq_num, sender_id ) << " flags: " << std::bitset<sizeof (uint64_t ) * 8 >(flags));
204200
205201 const auto &span_context = msg->spanContext <std::remove_pointer<ClientRequestMsg>::type>();
206202 auto span = concordUtils::startChildSpanFromContext (span_context, " bft_client_request" );
207203 span.setTag (" rid" , config_.getreplicaId ());
208204 span.setTag (" cid" , msg->getCid ());
209- span.setTag (" seq_num" , reqSeqNum);
210-
211- // A full node replica can handle only reconfiguration requests. Those requests are signed by the operator and
212- // the validation is done in the reconfiguration engine. Thus, we don't need to check the client validity as in
213- // the committers
205+ span.setTag (" seq_num" , req_seq_num);
214206
215- if (reconfig_flag) {
216- LOG_INFO (GL, " FN replica has received a reconfiguration request" );
217- executeReadOnlyRequest (span, *(msg.get ()));
218- return ;
219- }
220- }
221-
222- void FullNodeReplica::executeReadOnlyRequest (concordUtils::SpanWrapper &parent_span, const ClientRequestMsg &request) {
223- auto span = concordUtils::startChildSpan (" bft_execute_read_only_request" , parent_span);
224- // full node replica does not know who is the primary, so it always return 0. It is the client responsibility to treat
225- // the replies accordingly.
226- ClientReplyMsg reply (0 , request.requestSeqNum (), config_.getreplicaId ());
227- const uint16_t clientId = request.clientProxyId ();
228-
229- int executionResult = 0 ;
230- bftEngine::IRequestsHandler::ExecutionRequestsQueue accumulatedRequests;
231- accumulatedRequests.push_back (bftEngine::IRequestsHandler::ExecutionRequest{clientId,
232- static_cast <uint64_t >(lastExecutedSeqNum),
233- request.getCid (),
234- request.flags (),
235- request.requestLength (),
236- request.requestBuf (),
237- " " ,
238- reply.maxReplyLength (),
239- reply.replyBuf (),
240- request.requestSeqNum (),
241- request.requestIndexInBatch (),
242- request.result ()});
243- // DD: Do we need to take care of Time Service here?
244- bftRequestsHandler_->execute (accumulatedRequests, std::nullopt , request.getCid (), span);
245- IRequestsHandler::ExecutionRequest &single_request = accumulatedRequests.back ();
246- executionResult = single_request.outExecutionStatus ;
247- const uint32_t actualReplyLength = single_request.outActualReplySize ;
248- const uint32_t actualReplicaSpecificInfoLength = single_request.outReplicaSpecificInfoSize ;
249- LOG_DEBUG (GL,
250- " Executed full node request. " << KVLOG (clientId,
251- lastExecutedSeqNum,
252- request.requestLength (),
253- reply.maxReplyLength (),
254- actualReplyLength,
255- actualReplicaSpecificInfoLength,
256- executionResult));
257- // TODO(GG): TBD - how do we want to support empty replies? (actualReplyLength==0)
258- if (!executionResult) {
259- if (actualReplyLength > 0 ) {
260- reply.setReplyLength (actualReplyLength);
261- reply.setReplicaSpecificInfoLength (actualReplicaSpecificInfoLength);
262- send (&reply, clientId);
263- return ;
264- } else {
265- LOG_WARN (GL, " Received zero size response. " << KVLOG (clientId));
266- strcpy (single_request.outReply , " Executed data is empty" );
267- single_request.outActualReplySize = strlen (single_request.outReply );
268- executionResult = static_cast <uint32_t >(bftEngine::OperationResult::EXEC_DATA_EMPTY);
269- }
270-
271- } else {
272- LOG_ERROR (GL, " Received error while executing FN request. " << KVLOG (clientId, executionResult));
273- }
274- ClientReplyMsg replyMsg (
275- 0 , request.requestSeqNum (), single_request.outReply , single_request.outActualReplySize , executionResult);
276- send (&replyMsg, clientId);
207+ // TODO: handle reconfiguration request here, refer ReadOnlyReplica class
277208}
278209
279210void FullNodeReplica::registerStatusHandlers () {
@@ -292,4 +223,13 @@ void FullNodeReplica::registerStatusHandlers() {
292223 concord::diagnostics::RegistrarSingleton::getInstance ().status .registerHandler (h);
293224}
294225
226+ void FullNodeReplica::registerMsgHandlers () {
227+ msgHandlers_->registerMsgHandler (MsgCode::Checkpoint,
228+ std::bind (&FullNodeReplica::messageHandler<CheckpointMsg>, this , _1));
229+ msgHandlers_->registerMsgHandler (MsgCode::ClientRequest,
230+ std::bind (&FullNodeReplica::messageHandler<ClientRequestMsg>, this , _1));
231+ msgHandlers_->registerMsgHandler (MsgCode::StateTransfer,
232+ std::bind (&FullNodeReplica::messageHandler<StateTransferMsg>, this , _1));
233+ }
234+
295235} // namespace bftEngine::impl
0 commit comments