Skip to content

Commit 2155259

Browse files
committed
Make replicas wait for quorum before publishing main key when starting
Move message validation to ReplicaBase. Update RO replicas' key state after ST via CRE (same mechanism as clients).
1 parent cf5caeb commit 2155259

21 files changed

+153
-142
lines changed

bftengine/include/bftengine/KeyExchangeManager.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,11 @@ class KeyExchangeManager {
3535
void generateConsensusKeyAndSendInternalClientMsg(const SeqNum& sn);
3636
// Send the current main public key of the replica to consensus
3737
void sendMainPublicKey();
38+
39+
void waitForQuorum(const ReplicaImp* repImpInstance);
40+
3841
// Waits for a quorum and calls generateConsensusKeyAndSendInternalClientMsg
39-
void waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum& = 0);
42+
void waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum s = 0);
4043
// The execution handler implementation that is called after a key exchange msg passed consensus.
4144
// The new key pair will be used from two checkpoints after kemsg.generated_sn
4245
std::string onKeyExchange(const KeyExchangeMsg& kemsg, const SeqNum& req_sn, const std::string& cid);

bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class ScalingReplicaHandler : public IStateHandler {
130130
}
131131
};
132132

133+
// TODO(yf): remove
133134
class MainKeyUpdateHandler : public IStateHandler {
134135
public:
135136
MainKeyUpdateHandler() { LOG_INFO(getLogger(), "Created StateTransfer CRE replica main key update handler"); }
@@ -186,8 +187,11 @@ std::shared_ptr<ClientReconfigurationEngine> CreFactory::create(
186187
IStateClient* pbc = new PollBasedStateClient(bftClient, cre_config.interval_timeout_ms_, 0, cre_config.id_);
187188
auto cre =
188189
std::make_shared<ClientReconfigurationEngine>(cre_config, pbc, std::make_shared<concordMetrics::Aggregator>());
189-
if (!bftEngine::ReplicaConfig::instance().isReadOnly) cre->registerHandler(std::make_shared<ScalingReplicaHandler>());
190-
cre->registerHandler(std::make_shared<MainKeyUpdateHandler>());
190+
if (bftEngine::ReplicaConfig::instance().isReadOnly) {
191+
cre->registerHandler(std::make_shared<MainKeyUpdateHandler>());
192+
} else {
193+
cre->registerHandler(std::make_shared<ScalingReplicaHandler>());
194+
}
191195
return cre;
192196
}
193197

bftengine/src/bftengine/KeyExchangeManager.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,14 +355,20 @@ void KeyExchangeManager::loadClientPublicKey(const std::string& key,
355355
if (saveToReservedPages) saveClientsPublicKeys(SigManager::instance()->getClientsPublicKeys());
356356
}
357357

358-
void KeyExchangeManager::waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum& s) {
359-
std::unique_lock<std::mutex> lock(startup_mutex_);
360-
SCOPED_MDC(MDC_REPLICA_ID_KEY, std::to_string(ReplicaConfig::instance().replicaId));
361-
if (!ReplicaConfig::instance().waitForFullCommOnStartup) {
358+
void KeyExchangeManager::waitForQuorum(const ReplicaImp* repImpInstance) {
359+
bool partialQuorum = !ReplicaConfig::instance().waitForFullCommOnStartup;
360+
LOG_INFO(KEY_EX_LOG, "Waiting for quorum" << KVLOG(partialQuorum));
361+
if (partialQuorum) {
362362
waitForLiveQuorum(repImpInstance);
363363
} else {
364364
waitForFullCommunication();
365365
}
366+
}
367+
368+
void KeyExchangeManager::waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum s) {
369+
std::unique_lock<std::mutex> lock(startup_mutex_);
370+
SCOPED_MDC(MDC_REPLICA_ID_KEY, std::to_string(ReplicaConfig::instance().replicaId));
371+
waitForQuorum(repImpInstance);
366372

367373
generateConsensusKeyAndSendInternalClientMsg(s);
368374
metrics_->sent_key_exchange_on_start_status.Get().Set("True");

bftengine/src/bftengine/ReadOnlyReplica.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,6 @@ ReadOnlyReplica::ReadOnlyReplica(const ReplicaConfig &config,
6060
msgHandlers_->registerMsgHandler(
6161
MsgCode::StateTransfer,
6262
std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
63-
msgHandlers_->registerMsgHandler(
64-
MsgCode::StateTransfer,
65-
std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
66-
msgHandlers_->registerMsgHandler(
67-
MsgCode::StateTransfer,
68-
std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
6963
metrics_.Register();
7064

7165
SigManager::init(config_.replicaId,

bftengine/src/bftengine/ReplicaBase.hpp

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ namespace bftEngine::impl {
2626
class MsgHandlersRegistrator;
2727
class MsgsCommunicator;
2828
class ReplicasInfo;
29+
class CheckpointMsg;
2930

3031
using concordMetrics::GaugeHandle;
3132
using concordMetrics::StatusHandle;
@@ -80,7 +81,25 @@ class ReplicaBase {
8081

8182
void sendRaw(MessageBase* m, NodeIdType dest);
8283

83-
bool validateMessage(MessageBase* msg) {
84+
template <typename MessageType>
85+
bool validateMessage(MessageType* msg) {
86+
if (config_.debugStatisticsEnabled) {
87+
DebugStatistics::onReceivedExMessage(msg->type());
88+
}
89+
try {
90+
if constexpr (std::is_same_v<MessageType, CheckpointMsg>) {
91+
msg->validate(*repsInfo, false);
92+
} else {
93+
msg->validate(*repsInfo);
94+
}
95+
return true;
96+
} catch (std::exception& e) {
97+
onReportAboutInvalidMessage(msg, e.what());
98+
return false;
99+
}
100+
}
101+
102+
/*bool validateMessage(MessageBase* msg) {
84103
try {
85104
if (config_.debugStatisticsEnabled) DebugStatistics::onReceivedExMessage(msg->type());
86105
@@ -90,7 +109,7 @@ class ReplicaBase {
90109
onReportAboutInvalidMessage(msg, e.what());
91110
return false;
92111
}
93-
}
112+
}*/
94113

95114
protected:
96115
static const uint16_t ALL_OTHER_REPLICAS = UINT16_MAX;

bftengine/src/bftengine/ReplicaForStateTransfer.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,23 +87,20 @@ void ReplicaForStateTransfer::start() {
8787
if (!config_.isReadOnly) {
8888
// Load the public keys of the other replicas from reserved pages
8989
// so that their responses can be validated
90-
cre_->halt();
9190
KeyExchangeManager::instance().loadPublicKeys();
91+
92+
// Make sure to sign the reconfiguration client messages using the key
93+
// other replicas expect
94+
SigManager::instance()->setReplicaLastExecutedSeq(checkpoint * checkpointWindowSize);
95+
9296
// Need to update private key to match the loaded public key in case they differ (key exchange was executed
9397
// on other replicas but not on this one, finishing ST does not mean that missed key exchanges are executed)
9498
// This can be done by iterating the saved cryptosystems and updating their private key if their
9599
// public key matches the candidate saved in KeyExchangeManager
96-
97-
// Clear old keys
98100
CryptoManager::instance().onCheckpoint(checkpoint);
99101
auto [priv, pub] = KeyExchangeManager::instance().getCandidateKeyPair();
100102
CryptoManager::instance().syncPrivateKeyAfterST(priv, pub);
101103

102-
// Make sure to sign the reconfiguration client messages using the key
103-
// other replicas expect
104-
SigManager::instance()->setReplicaLastExecutedSeq(checkpoint * checkpointWindowSize);
105-
cre_->resume();
106-
107104
// At this point, we are not going to have any more blocks in state transfer, so we can safely stop CRE.
108105
// if there is a reconfiguration state change that prevents us from starting another state transfer (i.e.
109106
// scaling) then CRE probably won't work as well.

bftengine/src/bftengine/ReplicaImp.cpp

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -260,24 +260,6 @@ void ReplicaImp::validatedMessageHandler(CarrierMesssage *msg) {
260260
}
261261
}
262262

263-
template <typename MessageType>
264-
bool ReplicaImp::validateMessage(MessageType *msg) {
265-
if (config_.debugStatisticsEnabled) {
266-
DebugStatistics::onReceivedExMessage(msg->type());
267-
}
268-
try {
269-
if constexpr (std::is_same_v<MessageType, CheckpointMsg>) {
270-
msg->validate(*repsInfo, false);
271-
} else {
272-
msg->validate(*repsInfo);
273-
}
274-
return true;
275-
} catch (std::exception &e) {
276-
onReportAboutInvalidMessage(msg, e.what());
277-
return false;
278-
}
279-
}
280-
281263
/**
282264
* asyncValidateMessage<T> This is a family of asynchronous message which just schedules
283265
* the validation in a thread bag and returns peacefully. This will also translate the message
@@ -4663,6 +4645,7 @@ void ReplicaImp::start() {
46634645
// If key exchange is disabled, first publish the replica's main key to clients
46644646
if (ReplicaConfig::instance().singleSignatureScheme ||
46654647
ReplicaConfig::instance().publishReplicasMasterKeyOnStartup) {
4648+
KeyExchangeManager::instance().waitForQuorum(this);
46664649
KeyExchangeManager::instance().sendMainPublicKey();
46674650
}
46684651
}

bftengine/src/bftengine/RequestHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ void RequestHandler::execute(IRequestsHandler::ExecutionRequestsQueue& requests,
161161
} else {
162162
// this replica has not reached stable seqNum yet to create snapshot at requested seqNum
163163
// add a callback to be called when seqNum is stable. We need to create snapshot on stable
164-
// seq num because checkpoint msg certificate is stored on stable seq num and is used for intergrity
164+
// seq num because checkpoint msg certificate is stored on stable seq num and is used for integrity
165165
// check of db snapshots
166166
const auto& seqNumToCreateSanpshot = createDbChkPtMsg.seqNum;
167167
DbCheckpointManager::instance().setCheckpointInProcess(true, *blockId);

bftengine/src/bftengine/SigManager.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,8 @@ bool SigManager::verifyOwnSignature(const concord::Byte* data,
325325
const concord::Byte* expectedSignature) const {
326326
std::vector<concord::Byte> sig(getMySigLength());
327327
if (ReplicaConfig::instance().singleSignatureScheme) {
328-
for (auto signer : CryptoManager::instance().getLatestSigners()) {
328+
auto signers = CryptoManager::instance().getLatestSigners();
329+
for (auto& signer : signers) {
329330
signer->signBuffer(data, dataLength, sig.data());
330331

331332
if (std::memcmp(sig.data(), expectedSignature, getMySigLength()) == 0) {

bftengine/src/preprocessor/tests/preprocessor_test.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ using namespace std;
3636
using namespace bft::communication;
3737
using namespace bftEngine;
3838
using namespace preprocessor;
39-
using concord::crypto::SignatureAlgorithm;
4039

4140
namespace {
4241

0 commit comments

Comments
 (0)