Skip to content

Commit c45052c

Browse files
committed
Fixed client message replay execution after state transfer
1 parent 2155259 commit c45052c

File tree

5 files changed

+83
-30
lines changed

5 files changed

+83
-30
lines changed

bftengine/src/bftengine/ReplicaImp.cpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3073,6 +3073,17 @@ bool ReplicaImp::tryToEnterView() {
30733073
return enteredView;
30743074
}
30753075

3076+
size_t ReplicaImp::clearClientRequestQueue() {
3077+
size_t primaryCombinedReqSize = 0;
3078+
LOG_INFO(GL, "clearing client requests" << KVLOG(requestsQueueOfPrimary.size()));
3079+
// clear requestsQueueOfPrimary
3080+
while (!requestsQueueOfPrimary.empty()) {
3081+
primaryCombinedReqSize += requestsQueueOfPrimary.front()->size();
3082+
requestsQueueOfPrimary.pop();
3083+
}
3084+
return primaryCombinedReqSize;
3085+
}
3086+
30763087
void ReplicaImp::onNewView(const std::vector<PrePrepareMsg *> &prePreparesForNewView) {
30773088
SCOPED_MDC_SEQ_NUM(std::to_string(getCurrentView()));
30783089
SeqNum firstPPSeq = 0;
@@ -3199,11 +3210,7 @@ void ReplicaImp::onNewView(const std::vector<PrePrepareMsg *> &prePreparesForNew
31993210

32003211
requestsOfNonPrimary.clear();
32013212

3202-
// clear requestsQueueOfPrimary
3203-
while (!requestsQueueOfPrimary.empty()) {
3204-
primaryCombinedReqSize -= requestsQueueOfPrimary.front()->size();
3205-
requestsQueueOfPrimary.pop();
3206-
}
3213+
primaryCombinedReqSize -= clearClientRequestQueue();
32073214

32083215
primary_queue_size_.Get().Set(requestsQueueOfPrimary.size());
32093216

@@ -3296,6 +3303,8 @@ void ReplicaImp::onTransferringCompleteImp(uint64_t newStateCheckpoint) {
32963303
time_in_state_transfer_.end();
32973304
LOG_INFO(GL, KVLOG(newStateCheckpoint));
32983305
requestsOfNonPrimary.clear();
3306+
clearClientRequestQueue();
3307+
32993308
if (ps_) {
33003309
ps_->beginWriteTran();
33013310
}

bftengine/src/bftengine/ReplicaImp.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,7 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {
633633
void addTimers();
634634
void startConsensusProcess(PrePrepareMsgUPtr pp, bool isCreatedEarlier);
635635
void startConsensusProcess(PrePrepareMsgUPtr pp);
636+
void clearClientRequestQueue();
636637
/**
637638
* Updates both seqNumInfo and slow_path metric
638639
* @param seqNumInfo

tests/simpleKVBC/TesterReplica/internalCommandsHandler.cpp

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <unistd.h>
2121
#include <algorithm>
2222
#include <variant>
23+
#include <nlohmann/json.hpp>
2324
#include "ReplicaConfig.hpp"
2425
#include "kvbc_key_types.hpp"
2526

@@ -64,6 +65,34 @@ static const std::string &keyHashToCategory(const Hash &keyHash) {
6465

6566
static const std::string &keyToCategory(const std::string &key) { return keyHashToCategory(createHash(key)); }
6667

68+
InternalCommandsHandler::InternalCommandsHandler(concord::kvbc::IReader *storage,
69+
concord::kvbc::IBlockAdder *blocksAdder,
70+
concord::kvbc::IBlockMetadata *blockMetadata,
71+
logging::Logger &logger,
72+
bftEngine::IStateTransfer &st,
73+
bool addAllKeysAsPublic,
74+
concord::kvbc::adapter::ReplicaBlockchain *kvbc)
75+
: m_storage(storage),
76+
m_blockAdder(blocksAdder),
77+
m_blockMetadata(blockMetadata),
78+
m_logger(logger),
79+
m_addAllKeysAsPublic{addAllKeysAsPublic},
80+
m_kvbc{kvbc} {
81+
st.addOnTransferringCompleteCallback([this](uint64_t) {
82+
LOG_INFO(GL, "Synchronizing client execution state after state transfer");
83+
auto data = m_storage->getLatest(CLIENT_STATE_CAT_ID, {0x1});
84+
ConcordAssert(data.has_value());
85+
auto raw_json = std::get<concord::kvbc::categorization::VersionedValue>(data.value()).data;
86+
nlohmann::json json2 = nlohmann::json::parse(raw_json);
87+
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
88+
serializer.from_json(json2, m_clientToMaxExecutedReqId);
89+
LOG_INFO(GL, "raw client state: " << KVLOG(raw_json));
90+
});
91+
if (m_addAllKeysAsPublic) {
92+
ConcordAssertNE(m_kvbc, nullptr);
93+
}
94+
}
95+
6796
void InternalCommandsHandler::add(std::string &&key,
6897
std::string &&value,
6998
VersionedUpdates &verUpdates,
@@ -318,7 +347,15 @@ void InternalCommandsHandler::writeAccumulatedBlock(ExecutionRequestsQueue &bloc
318347
"SKVBCWrite message handled; writesCounter=" << m_writesCounter << " currBlock=" << write_rep.latest_block);
319348
}
320349
}
321-
addBlock(verUpdates, merkleUpdates, sn);
350+
351+
nlohmann::json json;
352+
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
353+
serializer.to_json(json, m_clientToMaxExecutedReqId);
354+
355+
VersionedUpdates clientStateUpdate;
356+
clientStateUpdate.addUpdate({0x1}, json.dump());
357+
358+
addBlock(verUpdates, merkleUpdates, clientStateUpdate, sn);
322359
}
323360

324361
OperationResult InternalCommandsHandler::verifyWriteCommand(uint32_t requestSize,
@@ -364,7 +401,10 @@ void InternalCommandsHandler::addKeys(const SKVBCWriteRequest &writeReq,
364401
addMetadataKeyValue(verUpdates, sequenceNum);
365402
}
366403

367-
void InternalCommandsHandler::addBlock(VersionedUpdates &verUpdates, BlockMerkleUpdates &merkleUpdates, uint64_t sn) {
404+
void InternalCommandsHandler::addBlock(VersionedUpdates &verUpdates,
405+
BlockMerkleUpdates &merkleUpdates,
406+
VersionedUpdates &clientStateVerUpdates,
407+
uint64_t sn) {
368408
BlockId currBlock = m_storage->getLastBlockId();
369409
Updates updates;
370410

@@ -396,6 +436,7 @@ void InternalCommandsHandler::addBlock(VersionedUpdates &verUpdates, BlockMerkle
396436
updates.add(kConcordInternalCategoryId, std::move(internal_updates));
397437
updates.add(VERSIONED_KV_CAT_ID, std::move(verUpdates));
398438
updates.add(BLOCK_MERKLE_CAT_ID, std::move(merkleUpdates));
439+
updates.add(CLIENT_STATE_CAT_ID, std::move(clientStateVerUpdates));
399440
const auto newBlockId = m_blockAdder->add(std::move(updates));
400441
ConcordAssert(newBlockId == currBlock + 1);
401442
}
@@ -483,29 +524,35 @@ OperationResult InternalCommandsHandler::executeWriteCommand(uint32_t requestSiz
483524
hasConflict = hasConflict || (!isFirstClientRequest && m_clientToMaxExecutedReqId[clientId] >= requestId);
484525
}
485526

527+
SKVBCReply reply;
528+
reply.reply = SKVBCWriteReply();
529+
SKVBCWriteReply &write_rep = std::get<SKVBCWriteReply>(reply.reply);
530+
write_rep.success = !hasConflict;
486531
if (!hasConflict) {
532+
write_rep.latest_block = currBlock + 1;
533+
auto [iter, isNew] = m_clientToMaxExecutedReqId.emplace(clientId, 0);
534+
UNUSED(iter);
535+
UNUSED(isNew);
536+
m_clientToMaxExecutedReqId[clientId] = std::max(m_clientToMaxExecutedReqId[clientId], batchCid);
537+
487538
if (isBlockAccumulationEnabled) {
488539
// If Block Accumulation is enabled then blocks are added after all requests are processed
489540
addKeys(write_req, sequenceNum, blockAccumulatedVerUpdates, blockAccumulatedMerkleUpdates);
490541
} else {
491542
// If Block Accumulation is not enabled then blocks are added after all requests are processed
492543
VersionedUpdates verUpdates;
493544
BlockMerkleUpdates merkleUpdates;
545+
VersionedUpdates clientVerUpdates;
546+
nlohmann::json json;
547+
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
548+
serializer.to_json(json, m_clientToMaxExecutedReqId);
549+
auto dump = json.dump();
550+
LOG_INFO(GL, KVLOG(dump));
551+
clientVerUpdates.addUpdate({0x1}, json.dump());
494552
addKeys(write_req, sequenceNum, verUpdates, merkleUpdates);
495-
addBlock(verUpdates, merkleUpdates, sequenceNum);
553+
addBlock(verUpdates, merkleUpdates, clientVerUpdates, sequenceNum);
496554
}
497-
}
498555

499-
SKVBCReply reply;
500-
reply.reply = SKVBCWriteReply();
501-
SKVBCWriteReply &write_rep = std::get<SKVBCWriteReply>(reply.reply);
502-
write_rep.success = !hasConflict;
503-
if (!hasConflict) {
504-
write_rep.latest_block = currBlock + 1;
505-
auto [iter, isNew] = m_clientToMaxExecutedReqId.emplace(clientId, 0);
506-
UNUSED(iter);
507-
UNUSED(isNew);
508-
m_clientToMaxExecutedReqId[clientId] = std::max(m_clientToMaxExecutedReqId[clientId], batchCid);
509556
} else {
510557
write_rep.latest_block = currBlock;
511558
}

tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,18 @@
3131

3232
static const std::string VERSIONED_KV_CAT_ID{concord::kvbc::categorization::kExecutionPrivateCategory};
3333
static const std::string BLOCK_MERKLE_CAT_ID{concord::kvbc::categorization::kExecutionProvableCategory};
34+
static constexpr const char *clientReplyStateCategory = "client_state";
35+
static const std::string CLIENT_STATE_CAT_ID{clientReplyStateCategory};
3436

3537
class InternalCommandsHandler : public concord::kvbc::ICommandsHandler {
3638
public:
3739
InternalCommandsHandler(concord::kvbc::IReader *storage,
3840
concord::kvbc::IBlockAdder *blocksAdder,
3941
concord::kvbc::IBlockMetadata *blockMetadata,
4042
logging::Logger &logger,
43+
bftEngine::IStateTransfer &st,
4144
bool addAllKeysAsPublic = false,
42-
concord::kvbc::adapter::ReplicaBlockchain *kvbc = nullptr)
43-
: m_storage(storage),
44-
m_blockAdder(blocksAdder),
45-
m_blockMetadata(blockMetadata),
46-
m_logger(logger),
47-
m_addAllKeysAsPublic{addAllKeysAsPublic},
48-
m_kvbc{kvbc} {
49-
if (m_addAllKeysAsPublic) {
50-
ConcordAssertNE(m_kvbc, nullptr);
51-
}
52-
}
45+
concord::kvbc::adapter::ReplicaBlockchain *kvbc = nullptr);
5346

5447
void execute(ExecutionRequestsQueue &requests,
5548
std::optional<bftEngine::Timestamp> timestamp,
@@ -146,6 +139,7 @@ class InternalCommandsHandler : public concord::kvbc::ICommandsHandler {
146139
uint64_t sn);
147140
void addBlock(concord::kvbc::categorization::VersionedUpdates &verUpdates,
148141
concord::kvbc::categorization::BlockMerkleUpdates &merkleUpdates,
142+
concord::kvbc::categorization::VersionedUpdates &clientStateUpdates,
149143
uint64_t sn);
150144
void addKeys(const skvbc::messages::SKVBCWriteRequest &writeReq,
151145
uint64_t sequenceNum,

tests/simpleKVBC/TesterReplica/main.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ void run_replica(int argc, char** argv) {
130130
setup->GetPerformanceManager(),
131131
std::map<std::string, categorization::CATEGORY_TYPE>{
132132
{VERSIONED_KV_CAT_ID, categorization::CATEGORY_TYPE::versioned_kv},
133+
{CLIENT_STATE_CAT_ID, categorization::CATEGORY_TYPE::versioned_kv},
133134
{categorization::kExecutionEventGroupLatestCategory, categorization::CATEGORY_TYPE::versioned_kv},
134135
{BLOCK_MERKLE_CAT_ID, categorization::CATEGORY_TYPE::block_merkle}},
135136
setup->GetSecretManager());
@@ -151,6 +152,7 @@ void run_replica(int argc, char** argv) {
151152
replica.get(),
152153
blockMetadata,
153154
logger,
155+
replica->getStateTransfer(),
154156
setup->AddAllKeysAsPublic(),
155157
replica->kvBlockchain() ? &replica->kvBlockchain().value() : nullptr);
156158
replica->set_command_handler(cmdHandler);

0 commit comments

Comments
 (0)