-
Notifications
You must be signed in to change notification settings - Fork 146
FullNode concord-bft changes #3001
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
bandatarunkumar
wants to merge
3
commits into
vmware:master
Choose a base branch
from
bandatarunkumar:full_node_st_changes
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,235 @@ | ||
| // Concord | ||
| // | ||
| // Copyright (c) 2023 VMware, Inc. All Rights Reserved. | ||
| // | ||
| // This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in | ||
| // compliance with the Apache 2.0 License. | ||
| // | ||
| // This product may include a number of subcomponents with separate copyright notices and license terms. Your use of | ||
| // these subcomponents is subject to the terms and conditions of the sub-component's license, as noted in the LICENSE | ||
| // file. | ||
|
|
||
| #include <optional> | ||
| #include <functional> | ||
| #include <bitset> | ||
|
|
||
| #include "bftengine/Replica.hpp" | ||
| #include "messages/StateTransferMsg.hpp" | ||
| #include "FullNodeReplica.hpp" | ||
|
bandatarunkumar marked this conversation as resolved.
|
||
|
|
||
| #include "log/logger.hpp" | ||
| #include "MsgHandlersRegistrator.hpp" | ||
| #include "messages/CheckpointMsg.hpp" | ||
| #include "messages/AskForCheckpointMsg.hpp" | ||
| #include "messages/ClientRequestMsg.hpp" | ||
| #include "messages/ClientReplyMsg.hpp" | ||
| #include "util/kvstream.h" | ||
| #include "PersistentStorage.hpp" | ||
| #include "MsgsCommunicator.hpp" | ||
| #include "SigManager.hpp" | ||
| #include "ReconfigurationCmd.hpp" | ||
| #include "util/json_output.hpp" | ||
| #include "SharedTypes.hpp" | ||
| #include "communication/StateControl.hpp" | ||
|
|
||
| using concordUtil::Timers; | ||
| using namespace std::placeholders; | ||
|
|
||
| // Note : The changes in files are inclined with RO replica SateTransfer behavior, all the class functions are inherited | ||
| // from ReadOnlyReplica. As we know for timebeing StateTransfer functionality is a temporary solution for FullNode, | ||
| // until the ASP/BSP is implemented the functions in this class needs to be changed based on the required accordingly. | ||
|
|
||
| namespace bftEngine::impl { | ||
|
|
||
| FullNodeReplica::FullNodeReplica(const ReplicaConfig &config, | ||
| std::shared_ptr<IRequestsHandler> requests_handler, | ||
| IStateTransfer *state_transfer, | ||
| std::shared_ptr<MsgsCommunicator> msg_comm, | ||
| std::shared_ptr<MsgHandlersRegistrator> msg_handler_reg, | ||
| concordUtil::Timers &timers, | ||
| MetadataStorage *metadata_storage) | ||
| : ReplicaForStateTransfer(config, requests_handler, state_transfer, msg_comm, msg_handler_reg, true, timers), | ||
| fn_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"), | ||
| metrics_.RegisterCounter("sentAskForCheckpointMsgs"), | ||
| metrics_.RegisterCounter("receivedInvalidMsgs"), | ||
| metrics_.RegisterGauge("lastExecutedSeqNum", lastExecutedSeqNum)}, | ||
| metadata_storage_{metadata_storage} { | ||
| LOG_INFO(GL, "Initialising Full Node Replica"); | ||
| repsInfo = new ReplicasInfo(config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs); | ||
|
|
||
| registerMsgHandlers(); | ||
| metrics_.Register(); | ||
|
|
||
| SigManager::init(config_.replicaId, | ||
| config_.replicaPrivateKey, | ||
| config_.publicKeysOfReplicas, | ||
| concord::crypto::KeyFormat::HexaDecimalStrippedFormat, | ||
| ReplicaConfig::instance().getPublicKeysOfClients(), | ||
| concord::crypto::KeyFormat::PemFormat, | ||
| {{repsInfo->getIdOfOperator(), | ||
| ReplicaConfig::instance().getOperatorPublicKey(), | ||
| concord::crypto::KeyFormat::PemFormat}}, | ||
| *repsInfo); | ||
|
|
||
| // Register status handler for Full Node replica | ||
| registerStatusHandlers(); | ||
| bft::communication::StateControl::instance().setGetPeerPubKeyMethod( | ||
| [&](uint32_t id) { return SigManager::instance()->getPublicKeyOfVerifier(id); }); | ||
| } | ||
|
|
||
| void FullNodeReplica::start() { | ||
| ReplicaForStateTransfer::start(); | ||
| size_t sendAskForCheckpointMsgPeriodSec = config_.get("concord.bft.ro.sendAskForCheckpointMsgPeriodSec", 30); | ||
| askForCheckpointMsgTimer_ = timers_.add( | ||
| std::chrono::seconds(sendAskForCheckpointMsgPeriodSec), Timers::Timer::RECURRING, [this](Timers::Handle) { | ||
| if (!this->isCollectingState()) { | ||
|
bandatarunkumar marked this conversation as resolved.
|
||
| sendAskForCheckpointMsg(); | ||
| } | ||
| }); | ||
| msgsCommunicator_->startMsgsProcessing(config_.replicaId); | ||
| } | ||
|
|
||
| void FullNodeReplica::stop() { | ||
| timers_.cancel(askForCheckpointMsgTimer_); | ||
| ReplicaForStateTransfer::stop(); | ||
| } | ||
|
|
||
| void FullNodeReplica::onTransferringCompleteImp(uint64_t newStateCheckpoint) { | ||
|
bandatarunkumar marked this conversation as resolved.
|
||
| last_executed_seq_num_ = newStateCheckpoint * checkpointWindowSize; | ||
| fn_metrics_.last_executed_seq_num_.Get().Set(last_executed_seq_num_); | ||
| } | ||
|
|
||
| void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *reason) { | ||
| fn_metrics_.received_invalid_msg_++; | ||
| LOG_WARN(GL, | ||
| "Node " << config_.replicaId << " received invalid message from Node " << msg->senderId() | ||
| << " type=" << msg->type() << " reason: " << reason); | ||
| } | ||
| void FullNodeReplica::sendAskForCheckpointMsg() { | ||
|
bandatarunkumar marked this conversation as resolved.
|
||
| fn_metrics_.sent_ask_for_checkpoint_msg_++; | ||
| LOG_INFO(GL, "sending AskForCheckpointMsg"); | ||
| AskForCheckpointMsg msg{config_.replicaId}; | ||
| for (auto id : repsInfo->idsOfPeerReplicas()) send(&msg, id); | ||
| } | ||
|
|
||
| template <> | ||
| void FullNodeReplica::onMessage<StateTransferMsg>(std::unique_ptr<StateTransferMsg> msg) { | ||
| ReplicaForStateTransfer::onMessage(move(msg)); | ||
| } | ||
|
|
||
| template <> | ||
|
bandatarunkumar marked this conversation as resolved.
|
||
| void FullNodeReplica::onMessage<CheckpointMsg>(std::unique_ptr<CheckpointMsg> msg) { | ||
| if (isCollectingState()) { | ||
| return; | ||
| } | ||
| fn_metrics_.received_checkpoint_msg_++; | ||
| LOG_INFO(GL, | ||
| KVLOG(msg->senderId(), | ||
| msg->idOfGeneratedReplica(), | ||
| msg->seqNumber(), | ||
| msg->epochNumber(), | ||
| msg->size(), | ||
| msg->isStableState(), | ||
| msg->state(), | ||
| msg->stateDigest(), | ||
| msg->reservedPagesDigest(), | ||
| msg->rvbDataDigest())); | ||
|
|
||
| // Reconfiguration cmd block is synced to RO replica via reserved pages | ||
| EpochNum replicas_last_known_epoch_val = 0; | ||
| auto epoch_number_from_res_pages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber(); | ||
| if (epoch_number_from_res_pages.has_value()) replicas_last_known_epoch_val = epoch_number_from_res_pages.value(); | ||
|
|
||
| // not relevant | ||
| if (!msg->isStableState() || msg->seqNumber() <= lastExecutedSeqNum || | ||
| msg->epochNumber() < replicas_last_known_epoch_val) { | ||
| return; | ||
| } | ||
| // no self certificate | ||
| static std::map<SeqNum, CheckpointInfo<false>> checkpoints_info; | ||
| const auto msg_seq_num = msg->seqNumber(); | ||
| const auto id_of_generated_eplica = msg->idOfGeneratedReplica(); | ||
| checkpoints_info[msg_seq_num].addCheckpointMsg(msg.release(), id_of_generated_eplica); | ||
| // if enough - invoke state transfer | ||
| if (checkpoints_info[msg_seq_num].isCheckpointCertificateComplete()) { | ||
| persistCheckpointDescriptor(msg_seq_num, checkpoints_info[msg_seq_num]); | ||
| checkpoints_info.clear(); | ||
| LOG_INFO(GL, "call to startCollectingState()"); | ||
| stateTransfer->startCollectingState(); | ||
| } | ||
| } | ||
|
|
||
| void FullNodeReplica::persistCheckpointDescriptor(const SeqNum &seqnum, const CheckpointInfo<false> &chckpinfo) { | ||
|
bandatarunkumar marked this conversation as resolved.
|
||
| std::vector<CheckpointMsg *> msgs; | ||
| msgs.reserve(chckpinfo.getAllCheckpointMsgs().size()); | ||
| for (const auto &m : chckpinfo.getAllCheckpointMsgs()) { | ||
| msgs.push_back(m.second); | ||
| LOG_INFO(GL, | ||
| KVLOG(m.second->seqNumber(), | ||
| m.second->epochNumber(), | ||
| m.second->state(), | ||
| m.second->stateDigest(), | ||
| m.second->reservedPagesDigest(), | ||
| m.second->rvbDataDigest(), | ||
| m.second->idOfGeneratedReplica())); | ||
| } | ||
| DescriptorOfLastStableCheckpoint desc(ReplicaConfig::instance().getnumReplicas(), msgs); | ||
| const size_t buf_len = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas()); | ||
| concord::serialize::UniquePtrToChar desc_buf(new char[buf_len]); | ||
| char *desc_buf_ptr = desc_buf.get(); | ||
| size_t actual_size = 0; | ||
| desc.serialize(desc_buf_ptr, buf_len, actual_size); | ||
| ConcordAssertNE(actual_size, 0); | ||
|
|
||
| // TODO [TK] S3KeyGenerator | ||
| // checkpoints/<BlockId>/<RepId> | ||
| std::ostringstream oss; | ||
| oss << "checkpoints/" << msgs[0]->state() << "/" << config_.replicaId; | ||
| metadata_storage_->atomicWriteArbitraryObject(oss.str(), desc_buf.get(), actual_size); | ||
| } | ||
|
|
||
| template <> | ||
| void FullNodeReplica::onMessage<ClientRequestMsg>(std::unique_ptr<ClientRequestMsg> msg) { | ||
|
bandatarunkumar marked this conversation as resolved.
|
||
| const NodeIdType sender_id = msg->senderId(); | ||
| const NodeIdType client_id = msg->clientProxyId(); | ||
| const ReqId req_seq_num = msg->requestSeqNum(); | ||
| const uint64_t flags = msg->flags(); | ||
|
|
||
| SCOPED_MDC_CID(msg->getCid()); | ||
| LOG_DEBUG(CNSUS, KVLOG(client_id, req_seq_num, sender_id) << " flags: " << std::bitset<sizeof(uint64_t) * 8>(flags)); | ||
|
|
||
| const auto &span_context = msg->spanContext<std::remove_pointer<ClientRequestMsg>::type>(); | ||
| auto span = concordUtils::startChildSpanFromContext(span_context, "bft_client_request"); | ||
| span.setTag("rid", config_.getreplicaId()); | ||
| span.setTag("cid", msg->getCid()); | ||
| span.setTag("seq_num", req_seq_num); | ||
|
|
||
| // TODO: handle reconfiguration request here, refer ReadOnlyReplica class | ||
| } | ||
|
|
||
| void FullNodeReplica::registerStatusHandlers() { | ||
| auto h = concord::diagnostics::StatusHandler( | ||
| "replica-sequence-numbers", "Last executed sequence number of the full node replica", [this]() { | ||
| concordUtils::BuildJson bj; | ||
|
|
||
| bj.startJson(); | ||
| bj.startNested("sequenceNumbers"); | ||
| bj.addKv("lastExecutedSeqNum", last_executed_seq_num_); | ||
| bj.endNested(); | ||
| bj.endJson(); | ||
|
|
||
| return bj.getJson(); | ||
| }); | ||
| concord::diagnostics::RegistrarSingleton::getInstance().status.registerHandler(h); | ||
| } | ||
|
|
||
| void FullNodeReplica::registerMsgHandlers() { | ||
| msgHandlers_->registerMsgHandler(MsgCode::Checkpoint, | ||
| std::bind(&FullNodeReplica::messageHandler<CheckpointMsg>, this, _1)); | ||
| msgHandlers_->registerMsgHandler(MsgCode::ClientRequest, | ||
| std::bind(&FullNodeReplica::messageHandler<ClientRequestMsg>, this, _1)); | ||
| msgHandlers_->registerMsgHandler(MsgCode::StateTransfer, | ||
| std::bind(&FullNodeReplica::messageHandler<StateTransferMsg>, this, _1)); | ||
| } | ||
|
|
||
| } // namespace bftEngine::impl | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.