-
Notifications
You must be signed in to change notification settings - Fork 146
Expand file tree
/
Copy pathAsyncStateTransferCRE.cpp
More file actions
174 lines (165 loc) · 8.48 KB
/
AsyncStateTransferCRE.cpp
File metadata and controls
174 lines (165 loc) · 8.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Concord
//
// Copyright (c) 2021 VMware, Inc. All Rights Reserved.
//
// This product is licensed to you under the Apache 2.0 license (the "License").
// You may not use this product except in compliance with the Apache 2.0
// License.
//
// This product may include a number of subcomponents with separate copyright
// notices and license terms. Your use of these subcomponents is subject to the
// terms and conditions of the subcomponent's license, as noted in the LICENSE
// file.
#include "AsyncStateTransferCRE.hpp"

#include <fstream>
#include <sstream>
#include <string>

#include "bftengine/ReplicaConfig.hpp"
#include "bftengine/SigManager.hpp"
#include "bftengine/EpochManager.hpp"
#include "secrets/secrets_manager_plain.h"
#include "communication/StateControl.hpp"
#include "client/reconfiguration/poll_based_state_client.hpp"
#include "communication/ICommunication.hpp"
#include "bftclient/bft_client.h"
#include "ControlStateManager.hpp"
#include "reconfiguration/ireconfiguration.hpp"
namespace bftEngine::bcst::asyncCRE {
using namespace concord::client::reconfiguration;
using namespace bft::communication;
// ICommunication implementation backed by the replica's MsgsCommunicator.
// Outgoing messages are forwarded to the communicator (messages addressed to this replica itself are not sent),
// and incoming ClientReply messages are forwarded to the receiver installed via setReceiver().
class Communication : public ICommunication {
 public:
  // Stores the communicator, caches this replica's id and registers a ClientReply handler on `msgHandlers`.
  Communication(std::shared_ptr<MsgsCommunicator> msgsCommunicator, std::shared_ptr<MsgHandlersRegistrator> msgHandlers)
      : msgsCommunicator_{msgsCommunicator}, repId_{bftEngine::ReplicaConfig::instance().replicaId} {
    // The handler reads receiver_ through `this`, so this object must stay alive for as long as the handler may
    // be invoked.
    msgHandlers->registerMsgHandler(MsgCode::ClientReply, [this](std::unique_ptr<bftEngine::impl::MessageBase> reply) {
      if (!receiver_) return;  // nobody is listening yet
      receiver_->onNewMessage(reply->senderId(), reply->body(), reply->size());
    });
  }

  // Fixed upper bound on message size: 128KB.
  int getMaxMessageSize() override { return 128 * 1024; }

  // start()/stop() only toggle the running flag; both always return 0.
  int start() override {
    is_running_ = true;
    return 0;
  }
  int stop() override {
    is_running_ = false;
    return 0;
  }
  bool isRunning() const override { return is_running_; }

  // Every node is reported as Connected while running, and as Disconnected otherwise.
  ConnectionStatus getCurrentConnectionStatus(NodeNum node) override {
    return is_running_ ? ConnectionStatus::Connected : ConnectionStatus::Disconnected;
  }

  // Sends `msg` to a single destination. A message addressed to this replica is not sent;
  // its size is returned instead.
  int send(NodeNum destNode, std::vector<uint8_t>&& msg, NodeNum endpointNum) override {
    if (destNode != repId_) {
      return msgsCommunicator_->sendAsyncMessage(destNode, reinterpret_cast<char*>(msg.data()), msg.size());
    }
    return msg.size();
  }

  // Sends `msg` to every destination except this replica. The returned set is the full
  // requested destination set (including this replica, if it was requested).
  std::set<NodeNum> send(std::set<NodeNum> dests, std::vector<uint8_t>&& msg, NodeNum endpointNum) override {
    const auto requested = dests;
    dests.erase(repId_);
    msgsCommunicator_->send(dests, reinterpret_cast<char*>(msg.data()), msg.size());
    return requested;
  }

  // Installs the receiver for incoming ClientReply messages; `receiverNum` is ignored.
  void setReceiver(NodeNum receiverNum, IReceiver* receiver) override { receiver_ = receiver; }

  // Intentionally a no-op.
  void restartCommunication(NodeNum i) override {}

 private:
  std::shared_ptr<MsgsCommunicator> msgsCommunicator_;
  bool is_running_ = false;
  IReceiver* receiver_ = nullptr;  // non-owning; may be null until setReceiver() is called
  uint16_t repId_;                 // this replica's id, read once from ReplicaConfig
};
// ISigner implementation that delegates signing to the process-wide SigManager instance.
class InternalSigner : public concord::crypto::ISigner {
 public:
  // Signs `dataLen` bytes starting at `dataIn` into `sigOutBuffer` and returns whatever SigManager::sign returns.
  size_t signBuffer(const concord::Byte* dataIn, size_t dataLen, concord::Byte* sigOutBuffer) override {
    auto&& sigManager = bftEngine::impl::SigManager::instance();
    return sigManager->sign(dataIn, dataLen, sigOutBuffer);
  }

  // Length of a signature as reported by SigManager::getMySigLength().
  size_t signatureLength() const override {
    auto&& sigManager = bftEngine::impl::SigManager::instance();
    return sigManager->getMySigLength();
  }

  // This signer does not expose a private key; an empty string is always returned.
  std::string getPrivKey() const override { return ""; }
};
// A scaling command may break state transfer itself (if, for example, we scale from n1 to n2 < n1 replicas). For that
// we need a client-like mechanism which works asynchronously to the state itself. However, a committer replica may
// finish state transfer and get the scale command in the state before it was caught by CRE. In this case, the command
// is handled by the reconfiguration state transfer callback (see concordbft/kvbc/include/st_reconfiguraion_sm.hpp).
// The two mechanisms are synchronized via the configurations list. Note that we are unable to synchronize them based
// on epoch number, because CRE is (at the moment) unaware of epochs. Epochs are shared via reserved pages, which are
// updated at the end of state transfer.
// CRE state handler for ClientsAddRemoveExecuteCommand updates.
class ScalingReplicaHandler : public IStateHandler {
 public:
  ScalingReplicaHandler() {}

  // Decides whether `state` should be executed by this handler.
  // Returns true only when all of the following hold:
  //  - the reply's epoch is not older than this replica's own epoch,
  //  - the reply carries a ClientsAddRemoveExecuteCommand,
  //  - the per-replica configurations file can be opened, and
  //  - the command's config descriptor is not already recorded in that file.
  bool validate(const State& state) const override {
    concord::messages::ClientStateReply crep;
    concord::messages::deserialize(state.data, crep);
    if (crep.epoch < EpochManager::instance().getSelfEpochNumber()) return false;
    if (!std::holds_alternative<concord::messages::ClientsAddRemoveExecuteCommand>(crep.response)) return false;
    const auto& command = std::get<concord::messages::ClientsAddRemoveExecuteCommand>(crep.response);

    const std::string path = configurationsFilePath();
    {
      // Open in append mode first: this creates the file when it does not exist yet (so a first run is treated as
      // "no configuration recorded") and fails when the file cannot be opened for writing.
      std::ofstream touch(path, std::ios_base::app);
      if (!touch.good()) return false;
    }

    // The content has to be read through an input stream. A file buffer opened for output only cannot be read,
    // so reading it via an ofstream always yields an empty string and the duplicate check never triggers.
    std::ifstream configurations_file(path);
    if (!configurations_file.good()) return false;
    std::stringstream stream;
    stream << configurations_file.rdbuf();
    const std::string configs = stream.str();
    return configs.empty() || (configs.find(command.config_descriptor) == std::string::npos);
  }

  // Applies the scaling command carried by `state`:
  //  1. appends the config descriptor (one per line) to the per-replica configurations file,
  //  2. asks ControlStateManager to get the new configuration and to mark the metadata for removal,
  //  3. restarts the replica when the command requests it.
  // Always returns true. std::get throws std::bad_variant_access if the reply does not hold a
  // ClientsAddRemoveExecuteCommand; validate() checks for that alternative.
  bool execute(const State& state, WriteState&) override {
    LOG_INFO(getLogger(), "execute AddRemoveWithWedgeCommand");
    concord::messages::ClientStateReply crep;
    concord::messages::deserialize(state.data, crep);
    concord::messages::ClientsAddRemoveExecuteCommand command =
        std::get<concord::messages::ClientsAddRemoveExecuteCommand>(crep.response);

    std::ofstream configuration_file;
    configuration_file.open(configurationsFilePath(), std::ios_base::app);
    if (!configuration_file.good()) {
      // NOTE(review): execution continues after this log, as in the original code; confirm whether the
      // command should really be applied when the descriptor could not be recorded.
      LOG_FATAL(getLogger(), "unable to open the reconfigurations file");
    }
    configuration_file << (command.config_descriptor + "\n");
    configuration_file.close();

    LOG_INFO(getLogger(), "getting new configuration");
    bftEngine::ControlStateManager::instance().getNewConfiguration(command.config_descriptor, command.token);
    bftEngine::ControlStateManager::instance().markRemoveMetadata();
    LOG_INFO(getLogger(), "completed scaling procedure for " << command.config_descriptor << " restarting the replica");
    if (command.restart) bftEngine::ControlStateManager::instance().restart();
    return true;
  }

 private:
  // Path of this replica's configurations file: <configurationViewFilePath>/<configurationsFileName>.<replicaId>.
  static std::string configurationsFilePath() {
    return bftEngine::ReplicaConfig::instance().configurationViewFilePath + "/" +
           concord::reconfiguration::configurationsFileName + "." +
           std::to_string(bftEngine::ReplicaConfig::instance().replicaId);
  }

  logging::Logger getLogger() {
    static logging::Logger logger_(logging::getLogger("bftEngine::bcst::asyncCRE.ScalingReplicaHandler"));
    return logger_;
  }
};
std::shared_ptr<ClientReconfigurationEngine> CreFactory::create(std::shared_ptr<MsgsCommunicator> msgsCommunicator,
std::shared_ptr<MsgHandlersRegistrator> msgHandlers) {
bft::client::ClientConfig bftClientConf;
auto& repConfig = bftEngine::ReplicaConfig::instance();
bftClientConf.f_val = repConfig.fVal;
bftClientConf.c_val = repConfig.cVal;
bftClientConf.retry_timeout_config.initial_retry_timeout = bftClientConf.retry_timeout_config.min_retry_timeout =
bftClientConf.retry_timeout_config.max_retry_timeout = 1s;
bftClientConf.id = bft::client::ClientId{repConfig.replicaId};
for (uint16_t i = 0; i < repConfig.numReplicas; i++) {
bftClientConf.all_replicas.emplace(bft::client::ReplicaId{i});
}
for (uint16_t i = repConfig.numReplicas; i < repConfig.numReplicas + repConfig.numRoReplicas; i++) {
bftClientConf.ro_replicas.emplace(bft::client::ReplicaId{i});
}
bftClientConf.replicas_master_key_folder_path = std::nullopt;
std::unique_ptr<ICommunication> comm = std::make_unique<Communication>(msgsCommunicator, msgHandlers);
bft::client::Client* bftClient = new bft::client::Client(std::move(comm), bftClientConf);
bftClient->setTransactionSigner(new InternalSigner());
Config cre_config;
cre_config.id_ = repConfig.replicaId;
cre_config.interval_timeout_ms_ = 1000;
IStateClient* pbc = new PollBasedStateClient(bftClient, cre_config.interval_timeout_ms_, 0, cre_config.id_);
auto cre =
std::make_shared<ClientReconfigurationEngine>(cre_config, pbc, std::make_shared<concordMetrics::Aggregator>());
if (!bftEngine::ReplicaConfig::instance().isReadOnly) cre->registerHandler(std::make_shared<ScalingReplicaHandler>());
return cre;
}
} // namespace bftEngine::bcst::asyncCRE