Skip to content

Commit 01db4f4

Browse files
authored
Merge pull request #2785 from nkumar04/BC-24107
Fix a bug in build db checkpoint metadata from persistence
2 parents bc85838 + 1e9d1fd commit 01db4f4

File tree

3 files changed

+55
-37
lines changed

3 files changed

+55
-37
lines changed

bftengine/include/bftengine/DbCheckpointManager.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ class DbCheckpointManager {
180180
void updateDbCheckpointMetadata();
181181
void updateLastCmdInfo(const SeqNum&, const std::optional<Timestamp>&);
182182
void removeDbCheckpointFuture(CheckpointId);
183+
void builMetadataFromFileSystem();
183184
void updateMetrics();
184185
InternalBftClient* client_{nullptr};
185186
std::atomic<bool> stopped_ = false;

bftengine/src/bftengine/DbCheckpointManager.cpp

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -119,42 +119,7 @@ void DbCheckpointManager::init() {
119119
// if any dbcheckpoints are present in the file system.
120120
if (isMetadataErased_) {
121121
// update metadata for dbcheckpoints from file system
122-
const auto& checkpointDir = dbClient_->getCheckpointPath();
123-
_fs::path path(checkpointDir);
124-
std::vector<_fs::directory_entry> allDBCheckpoints;
125-
126-
try {
127-
if (_fs::exists(path)) {
128-
uint64_t lastCreatedDbCheckpoint = 0;
129-
for (const auto& entry : _fs::directory_iterator(checkpointDir)) {
130-
const auto filenameStr = entry.path().filename().string();
131-
if (_fs::is_directory(entry)) {
132-
allDBCheckpoints.push_back(entry);
133-
const CheckpointId& checkPointId = std::stoi(filenameStr);
134-
const auto blockId = checkPointId;
135-
136-
_fs::file_time_type ftime = _fs::last_write_time(entry.path());
137-
std::time_t cftime = decltype(ftime)::clock::to_time_t(ftime);
138-
const std::chrono::seconds creationTime = std::chrono::seconds(cftime);
139-
{
140-
std::scoped_lock lock(lockLastDbCheckpointDesc_);
141-
lastCreatedCheckpointMetadata_.emplace(DbCheckpointMetadata::DbCheckPointDescriptor{
142-
checkPointId, creationTime, blockId, lastCheckpointSeqNum_});
143-
dbCheckptMetadata_.dbCheckPoints_.insert({checkPointId, lastCreatedCheckpointMetadata_.value()});
144-
}
145-
146-
// extract the last created BlockId from all created dbCheckpoints.
147-
lastCreatedDbCheckpoint = std::max(lastCreatedDbCheckpoint, checkPointId);
148-
numOfDbCheckpointsCreated_++;
149-
}
150-
}
151-
lastDbCheckpointBlockId_.Get().Set(lastCreatedDbCheckpoint);
152-
metrics_.UpdateAggregator();
153-
}
154-
updateMetrics();
155-
} catch (std::exception& e) {
156-
LOG_WARN(getLogger(), "Failed to update checkpoints metadata from checkpoint directory" << e.what());
157-
}
122+
builMetadataFromFileSystem();
158123
} else {
159124
// check if there is chkpt data in persistence
160125
loadCheckpointDataFromPersistence();
@@ -473,4 +438,48 @@ std::map<uint64_t, uint64_t> DbCheckpointManager::getDbSize() {
473438
}
474439
return dbSizeMap;
475440
}
441+
void DbCheckpointManager::builMetadataFromFileSystem() {
442+
// update metadata for dbcheckpoints from file system
443+
const auto& checkpointDir = dbClient_->getCheckpointPath();
444+
_fs::path path(checkpointDir);
445+
446+
try {
447+
if (_fs::exists(path)) {
448+
for (const auto& entry : _fs::directory_iterator(checkpointDir)) {
449+
const auto filenameStr = entry.path().filename().string();
450+
// directory name is last block id in the db checkpoint
451+
// and must be a valid numeric string
452+
if (filenameStr.find_first_not_of("0123456789") != string::npos) {
453+
LOG_WARN(getLogger(), "Invalid file or directory:" << filenameStr << " found in checkpoint folder");
454+
continue;
455+
}
456+
if (_fs::is_directory(entry)) {
457+
const CheckpointId& checkPointId = std::stoi(filenameStr);
458+
const auto blockId = checkPointId;
459+
_fs::file_time_type ftime = _fs::last_write_time(entry.path());
460+
std::time_t cftime = decltype(ftime)::clock::to_time_t(ftime);
461+
const std::chrono::seconds creationTime = std::chrono::seconds(cftime);
462+
{
463+
std::scoped_lock lock(lockLastDbCheckpointDesc_);
464+
const auto checkPointDescriptor = DbCheckpointMetadata::DbCheckPointDescriptor{
465+
checkPointId, creationTime, blockId, lastCheckpointSeqNum_};
466+
dbCheckptMetadata_.dbCheckPoints_.insert({checkPointId, checkPointDescriptor});
467+
}
468+
}
469+
}
470+
471+
if (auto it = dbCheckptMetadata_.dbCheckPoints_.rbegin(); it != dbCheckptMetadata_.dbCheckPoints_.rend()) {
472+
lastCreatedCheckpointMetadata_ = it->second;
473+
dbCheckptMetadata_.lastCmdTimestamp_ = it->second.creationTimeSinceEpoch_;
474+
lastCheckpointCreationTime_ = dbCheckptMetadata_.lastCmdTimestamp_;
475+
lastDbCheckpointBlockId_.Get().Set(it->second.lastBlockId_);
476+
metrics_.UpdateAggregator();
477+
updateMetrics();
478+
}
479+
updateDbCheckpointMetadata();
480+
}
481+
} catch (std::exception& e) {
482+
LOG_WARN(getLogger(), "Failed to update checkpoints metadata from checkpoint directory" << e.what());
483+
}
484+
}
476485
} // namespace bftEngine::impl

tests/apollo/test_skvbc_dbsnapshot.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -728,15 +728,22 @@ async def test_verify_dbcheckpoints_after_scale_with_restart(self, bft_network):
728728
# It will fetch all dbcheckpoints created before and after scale with restart.
729729
client = bft_network.random_client()
730730
op = operator.Operator(bft_network.config, client, bft_network.builddir)
731+
latest_checkpoint_id = 0
732+
snapshot_rep = await op.state_snapshot_req()
733+
resp = cmf_msgs.ReconfigurationResponse.deserialize(snapshot_rep)[0]
734+
snapshot_id = resp.response.data.snapshot_id
731735
rep = await op.get_dbcheckpoint_info_request(bft=False)
732736
data = cmf_msgs.ReconfigurationResponse.deserialize(rep)[0]
733-
self.assertTrue(data.success)
737+
self.assertTrue(data.success)
734738
for r in client.get_rsi_replies().values():
735739
res = cmf_msgs.ReconfigurationResponse.deserialize(r)
736740
self.assertEqual(len(res[0].response.db_checkpoint_info), 2)
737741
dbcheckpoint_info_list = res[0].response.db_checkpoint_info
738742
self.assertTrue(any(dbcheckpoint_info.block_id ==
739743
300 for dbcheckpoint_info in dbcheckpoint_info_list))
744+
for dbcheckpoint_info in dbcheckpoint_info_list:
745+
latest_checkpoint_id = max(latest_checkpoint_id, dbcheckpoint_info.block_id)
746+
assert(latest_checkpoint_id == snapshot_id)
740747

741748

742749
@with_trio
@@ -948,6 +955,7 @@ async def test_restore_from_snapshot_of_other(self, bft_network, tracker):
948955
await bft_network.wait_for_consensus_path(path_type=ConsensusPathType.OPTIMISTIC_FAST,
949956
run_ops=lambda: skvbc.send_n_kvs_sequentially(DB_CHECKPOINT_WIN_SIZE),
950957
threshold=5)
958+
951959

952960
async def validate_stop_on_wedge_point(self, bft_network, skvbc, fullWedge=False):
953961
with log.start_action(action_type="validate_stop_on_stable_checkpoint") as action:

0 commit comments

Comments
 (0)