Skip to content

Commit 03d5275

Browse files
authored
Merge branch 'master' into nkumar4/clientVerifiability-POC
2 parents 8712c7d + bc1c094 commit 03d5275

File tree

78 files changed

+2640
-861
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+2640
-861
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ option(USE_GRPC "Enable GRPC and Protobuf" ON)
4444
option(USE_OPENSSL "Enable use of OpenSSL" ON)
4545
option(BUILD_THIRDPARTY "Wheter to build third party librarie or use preinstalled ones" OFF)
4646
option(CODECOVERAGE "Enable Code Coverage Metrics in Clang" OFF)
47+
option(ENABLE_RESTART_RECOVERY_TESTS "Enable tests for restart recovery" OFF)
4748

4849
if(USE_OPENSSL AND NOT BUILD_THIRDPARTY)
4950
set(OPENSSL_ROOT_DIR /usr/local/ssl) # not to confuse with system ssl libs

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ CONCORD_BFT_CMAKE_ASAN?=FALSE
5656
CONCORD_BFT_CMAKE_TSAN?=FALSE
5757
CONCORD_BFT_CMAKE_CODECOVERAGE?=FALSE
5858
CONCORD_BFT_CMAKE_USE_FAKE_CLOCK_IN_TIME_SERVICE?=FALSE
59+
ENABLE_RESTART_RECOVERY_TESTS?=FALSE
5960
ifeq (${CONCORD_BFT_CMAKE_ASAN},TRUE)
6061
CONCORD_BFT_CMAKE_CXX_FLAGS_RELEASE='-O0 -g'
6162
else ifeq (${CONCORD_BFT_CMAKE_TSAN},TRUE)
@@ -88,7 +89,8 @@ CONCORD_BFT_CMAKE_FLAGS?= \
8889
-DLEAKCHECK=${CONCORD_BFT_CMAKE_ASAN} \
8990
-DTHREADCHECK=${CONCORD_BFT_CMAKE_TSAN} \
9091
-DCODECOVERAGE=${CONCORD_BFT_CMAKE_CODECOVERAGE} \
91-
-DTXN_SIGNING_ENABLED=${CONCORD_BFT_CMAKE_TRANSACTION_SIGNING_ENABLED}
92+
-DTXN_SIGNING_ENABLED=${CONCORD_BFT_CMAKE_TRANSACTION_SIGNING_ENABLED} \
93+
-DENABLE_RESTART_RECOVERY_TESTS=${ENABLE_RESTART_RECOVERY_TESTS}
9294

9395

9496
# The consistency parameter makes sense only at MacOS.

bftengine/include/bftengine/DbCheckpointManager.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ class DbCheckpointManager {
136136
}
137137
inline auto getLastDbCheckpointSeqNum() const { return lastCheckpointSeqNum_; }
138138
std::string getDiskUsageInfo();
139+
// return a map of pair<checkpoint_id, size_on_disk>
140+
// checkpoint_id = 0 indicates rocksdb size
141+
// only used for apollo test
142+
std::map<uint64_t, uint64_t> getDbSize();
139143

140144
private:
141145
logging::Logger getLogger() {

bftengine/src/bcstatetransfer/BCStateTran.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,8 +1481,12 @@ bool BCStateTran::onMessage(const AskForCheckpointSummariesMsg *m, uint32_t msgL
14811481
if (!psd_->hasCheckpointDesc(i)) continue;
14821482

14831483
DataStore::CheckpointDesc cpDesc = psd_->getCheckpointDesc(i);
1484-
std::unique_ptr<CheckpointSummaryMsg> msg =
1485-
std::unique_ptr<CheckpointSummaryMsg>(CheckpointSummaryMsg::create(cpDesc.rvbData.size()));
1484+
auto deleter = [](CheckpointSummaryMsg *msg) {
1485+
char *bytes = reinterpret_cast<char *>(msg);
1486+
delete[] bytes;
1487+
};
1488+
auto msg = std::unique_ptr<CheckpointSummaryMsg, decltype(deleter)>(
1489+
CheckpointSummaryMsg::create(cpDesc.rvbData.size()), deleter);
14861490

14871491
msg->checkpointNum = i;
14881492
msg->maxBlockId = cpDesc.maxBlockId;

bftengine/src/bftengine/DbCheckpointManager.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ uint64_t DbCheckpointManager::directorySize(const _fs::path& directory, const bo
148148
uint64_t size{0};
149149
try {
150150
if (_fs::exists(directory)) {
151-
for (const auto& entry : _fs::recursive_directory_iterator(directory)) {
151+
for (const auto& entry : _fs::directory_iterator(directory)) {
152152
if (_fs::is_regular_file(entry) && !_fs::is_symlink(entry)) {
153153
if (_fs::hard_link_count(entry) > 1 && excludeHardLinks) continue;
154154
size += _fs::file_size(entry);
@@ -348,7 +348,8 @@ void DbCheckpointManager::updateMetrics() {
348348
if (const auto it = dbCheckptMetadata_.dbCheckPoints_.crbegin(); it != dbCheckptMetadata_.dbCheckPoints_.crend()) {
349349
_fs::path path(checkpointDir);
350350
_fs::path chkptIdPath = path / std::to_string(it->first);
351-
auto lastDbCheckpointSize = directorySize(chkptIdPath, false, true);
351+
// db checkpoint directory does not have any sub-directory
352+
auto lastDbCheckpointSize = directorySize(chkptIdPath, false, false);
352353
lastDbCheckpointSizeInMb_.Get().Set(lastDbCheckpointSize / (1024 * 1024));
353354
metrics_.UpdateAggregator();
354355
LOG_INFO(getLogger(), "rocksdb check point id:" << it->first << ", size: " << HumanReadable{lastDbCheckpointSize});
@@ -398,4 +399,20 @@ std::string DbCheckpointManager::getDiskUsageInfo() {
398399
}
399400
return ss.str();
400401
}
402+
std::map<uint64_t, uint64_t> DbCheckpointManager::getDbSize() {
403+
auto find_db_size = [&](const std::string& path) -> uint64_t {
404+
_fs::path dbPath(path);
405+
return directorySize(dbPath, false, false);
406+
};
407+
auto dbSizeMap = std::map<uint64_t, uint64_t>{};
408+
dbSizeMap[0] = find_db_size(dbClient_->getPath());
409+
{
410+
std::scoped_lock lock(lock_);
411+
for (const auto& [db_chkpt_id, _] : dbCheckptMetadata_.dbCheckPoints_) {
412+
(void)_;
413+
dbSizeMap[db_chkpt_id] = find_db_size(dbClient_->getPathForCheckpoint(db_chkpt_id));
414+
}
415+
}
416+
return dbSizeMap;
417+
}
401418
} // namespace bftEngine::impl

bftengine/src/bftengine/KeyExchangeManager.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,12 @@ void KeyExchangeManager::loadPublicKeys() {
145145
void KeyExchangeManager::exchangeTlsKeys(const std::string& type, const SeqNum& bft_sn) {
146146
auto keys = concord::util::crypto::Crypto::instance().generateECDSAKeyPair(
147147
concord::util::crypto::KeyFormat::PemFormat, concord::util::crypto::CurveType::secp384r1);
148-
std::string root_path =
149-
bftEngine::ReplicaConfig::instance().certificatesRootPath + "/" + std::to_string(repID_) + "/" + type;
148+
bool use_unified_certs = bftEngine::ReplicaConfig::instance().useUnifiedCertificates;
149+
const std::string base_path =
150+
bftEngine::ReplicaConfig::instance().certificatesRootPath + "/" + std::to_string(repID_);
151+
std::string root_path = (use_unified_certs) ? base_path : base_path + "/" + type;
150152

151-
std::string cert_path = root_path + "/" + type + ".cert";
153+
std::string cert_path = (use_unified_certs) ? root_path + "/node.cert" : root_path + "/" + type + ".cert";
152154
std::string prev_key_pem = concord::util::crypto::Crypto::instance()
153155
.RsaHexToPem(std::make_pair(SigManager::instance()->getSelfPrivKey(), ""))
154156
.first;

bftengine/src/bftengine/ReplicaImp.cpp

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2448,6 +2448,7 @@ void ReplicaImp::startExecution(SeqNum seqNumber,
24482448
if (isCurrentPrimary()) {
24492449
metric_consensus_duration_.finishMeasurement(seqNumber);
24502450
metric_post_exe_duration_.addStartTimeStamp(seqNumber);
2451+
metric_consensus_end_to_core_exe_duration_.addStartTimeStamp(seqNumber);
24512452
}
24522453

24532454
consensus_times_.end(seqNumber);
@@ -4341,9 +4342,13 @@ ReplicaImp::ReplicaImp(bool firstTime,
43414342
metric_total_preexec_requests_executed_{metrics_.RegisterCounter("totalPreExecRequestsExecuted")},
43424343
metric_received_restart_ready_{metrics_.RegisterCounter("receivedRestartReadyMsg", 0)},
43434344
metric_received_restart_proof_{metrics_.RegisterCounter("receivedRestartProofMsg", 0)},
4344-
metric_consensus_duration_{metrics_, "consensusDuration", 1000, true},
4345-
metric_post_exe_duration_{metrics_, "postExeDuration", 1000, true},
4346-
metric_primary_batching_duration_{metrics_, "primaryBatchingDuration", 10000, true},
4345+
metric_consensus_duration_{metrics_, "consensusDuration", 1000, 100, true},
4346+
metric_post_exe_duration_{metrics_, "postExeDuration", 1000, 100, true},
4347+
metric_core_exe_func_duration_{metrics_, "postExeCoreFuncDuration", 1000, 100, true},
4348+
metric_consensus_end_to_core_exe_duration_{metrics_, "consensusEndToExeStartDuration", 1000, 100, true},
4349+
metric_post_exe_thread_idle_time_{metrics_, "PostExeThreadIdleDuration", 1000, 100, true},
4350+
metric_post_exe_thread_active_time_{metrics_, "PostExeThreadActiveDuration", 1000, 100, true},
4351+
metric_primary_batching_duration_{metrics_, "primaryBatchingDuration", 10000, 1000, true},
43474352
consensus_times_(histograms_.consensus),
43484353
checkpoint_times_(histograms_.checkpointFromCreationToStable),
43494354
time_in_active_view_(histograms_.timeInActiveView),
@@ -4817,6 +4822,11 @@ void ReplicaImp::startPrePrepareMsgExecution(PrePrepareMsg *ppMsg,
48174822
// send internal message that will call to finishExecutePrePrepareMsg
48184823
ConcordAssert(activeExecutions_ == 0);
48194824
activeExecutions_ = 1;
4825+
if (isCurrentPrimary()) {
4826+
metric_post_exe_thread_active_time_.addStartTimeStamp(0);
4827+
metric_post_exe_thread_idle_time_.finishMeasurement(0);
4828+
}
4829+
48204830
InternalMessage im = FinishPrePrepareExecutionInternalMsg{ppMsg, nullptr}; // TODO(GG): check....
48214831
getIncomingMsgsStorage().pushInternalMsg(std::move(im));
48224832
}
@@ -4908,6 +4918,10 @@ void ReplicaImp::executeAllPrePreparedRequests(bool allowParallelExecution,
49084918

49094919
ConcordAssert(activeExecutions_ == 0);
49104920
activeExecutions_ = 1;
4921+
if (isCurrentPrimary()) {
4922+
metric_post_exe_thread_active_time_.addStartTimeStamp(0);
4923+
metric_post_exe_thread_idle_time_.finishMeasurement(0);
4924+
}
49114925
if (shouldRunRequestsInParallel) {
49124926
PostExecJob *j = new PostExecJob(ppMsg, requestSet, time, *this);
49134927
postExecThread_.add(j);
@@ -5032,7 +5046,14 @@ void ReplicaImp::executeRequests(PrePrepareMsg *ppMsg, Bitmap &requestSet, Times
50325046
span.setTag("rid", config_.getreplicaId());
50335047
span.setTag("cid", ppMsg->getCid());
50345048
span.setTag("seq_num", ppMsg->seqNumber());
5049+
if (isCurrentPrimary()) {
5050+
metric_consensus_end_to_core_exe_duration_.finishMeasurement(ppMsg->seqNumber());
5051+
metric_core_exe_func_duration_.addStartTimeStamp(ppMsg->seqNumber());
5052+
}
50355053
bftRequestsHandler_->execute(*pAccumulatedRequests, time, ppMsg->getCid(), span);
5054+
if (isCurrentPrimary()) {
5055+
metric_core_exe_func_duration_.finishMeasurement(ppMsg->seqNumber());
5056+
}
50365057
}
50375058
} else {
50385059
LOG_INFO(
@@ -5067,6 +5088,10 @@ void ReplicaImp::executeRequests(PrePrepareMsg *ppMsg, Bitmap &requestSet, Times
50675088
void ReplicaImp::finishExecutePrePrepareMsg(PrePrepareMsg *ppMsg,
50685089
IRequestsHandler::ExecutionRequestsQueue *pAccumulatedRequests) {
50695090
activeExecutions_ = 0;
5091+
if (isCurrentPrimary()) {
5092+
metric_post_exe_thread_idle_time_.addStartTimeStamp(0);
5093+
metric_post_exe_thread_active_time_.finishMeasurement(0);
5094+
}
50705095

50715096
if (pAccumulatedRequests != nullptr) {
50725097
sendResponses(ppMsg, *pAccumulatedRequests);

bftengine/src/bftengine/ReplicaImp.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,10 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {
314314
CounterHandle metric_received_restart_proof_;
315315
PerfMetric<uint64_t> metric_consensus_duration_;
316316
PerfMetric<uint64_t> metric_post_exe_duration_;
317+
PerfMetric<uint64_t> metric_core_exe_func_duration_;
318+
PerfMetric<uint64_t> metric_consensus_end_to_core_exe_duration_;
319+
PerfMetric<uint64_t> metric_post_exe_thread_idle_time_;
320+
PerfMetric<uint64_t> metric_post_exe_thread_active_time_;
317321
PerfMetric<std::string> metric_primary_batching_duration_;
318322
//*****************************************************
319323
RollingAvgAndVar consensus_time_;

bftengine/src/preprocessor/PreProcessor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ PreProcessor::PreProcessor(shared_ptr<MsgsCommunicator> &msgsCommunicator,
363363
metricsComponent_.RegisterAtomicGauge("preProcessingTimeAvg", 0),
364364
metricsComponent_.RegisterAtomicGauge("launchAsyncPreProcessJobTimeAvg", 0),
365365
metricsComponent_.RegisterAtomicGauge("PreProcInFlyRequestsNum", 0)},
366-
metric_pre_exe_duration_{metricsComponent_, "metric_pre_exe_duration_", 10000, true},
366+
metric_pre_exe_duration_{metricsComponent_, "metric_pre_exe_duration_", 10000, 1000, true},
367367
totalPreProcessingTime_(true),
368368
launchAsyncJobTimeAvg_(true),
369369
preExecReqStatusCheckPeriodMilli_(myReplica_.getReplicaConfig().preExecReqStatusCheckTimerMillisec),

client/bftclient/include/bftclient/bft_client.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ typedef std::shared_ptr<bft::communication::ICommunication> SharedCommPtr;
3535

3636
class Client {
3737
public:
38-
Client(SharedCommPtr comm, const ClientConfig& config);
38+
Client(SharedCommPtr comm,
39+
const ClientConfig& config,
40+
std::shared_ptr<concordMetrics::Aggregator> aggregator = nullptr);
3941

4042
void setAggregator(const std::shared_ptr<concordMetrics::Aggregator>& aggregator) {
4143
metrics_.setAggregator(aggregator);

0 commit comments

Comments
 (0)