Skip to content

Commit a2b8a93

Browse files
feat: add payref background task (#7280)
Description --- Added background task to rebuild payref indexes based on a payref rebuild status in the metadata database, rather than doing it as a migration task at stsrtup. Motivation and Context --- Payref migration takes very long time to complete and is not essential to base node operation. How Has This Been Tested? --- System-level testing [**TBD**] What process can a PR reviewer use to test or verify this change? --- - Code review - System-level testing <!-- Checklist --> <!-- 1. Is the title of your PR in the form that would make nice release notes? The title, excluding the conventional commit tag, will be included exactly as is in the CHANGELOG, so please think about it carefully. --> Breaking Changes --- - [x] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [ ] Other - Please specify <!-- Does this include a breaking change? If so, include this line as a footer --> <!-- BREAKING CHANGE: Description what the user should do, e.g. delete a database, resync the chain --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added background rebuilding of PayRef indexes, improving index availability and tracking rebuild progress. * Introduced explicit status tracking for PayRef index rebuilds, with user-facing error reporting if indexes are unavailable. * **Bug Fixes** * Enhanced error messages to provide more context when PayRef indexes are not available. * **Chores** * Improved logging and internal management of PayRef index rebuilds. * Updated public exports to include PayRef rebuild status for easier access in other modules. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: SW van Heerden <swvheerden@gmail.com>
1 parent 4bbc859 commit a2b8a93

File tree

8 files changed

+346
-91
lines changed

8 files changed

+346
-91
lines changed

base_layer/core/src/chain_storage/blockchain_backend.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@ use tari_common_types::{
66
types::{BadBlock, CompressedCommitment, CompressedPublicKey, FixedHash, HashOutput, Signature},
77
};
88

9-
use super::{lmdb_db::lmdb_tree_reader::OwnedLmdbTreeReader, MinedInfo, TemplateRegistrationEntry};
9+
use super::{
10+
lmdb_db::lmdb_tree_reader::OwnedLmdbTreeReader,
11+
MinedInfo,
12+
PayrefRebuildStatus,
13+
TemplateRegistrationEntry,
14+
};
1015
use crate::{
1116
blocks::{Block, BlockAccumulatedData, BlockHeader, BlockHeaderAccumulatedData, ChainBlock, ChainHeader},
1217
chain_storage::{
@@ -34,7 +39,7 @@ use crate::{
3439
/// us to keep the reading and writing API extremely simple. Extending the types of data that the backends can handle
3540
/// will entail adding to those enums, and the backends, while this trait can remain unchanged.
3641
#[allow(clippy::ptr_arg)]
37-
pub trait BlockchainBackend: Send + Sync {
42+
pub trait BlockchainBackend: Send + Sync + 'static {
3843
/// Commit the transaction given to the backend. If there is an error, the transaction must be rolled back, and
3944
/// the error condition returned. On success, every operation in the transaction will have been committed, and
4045
/// the function will return `Ok(())`.
@@ -138,6 +143,16 @@ pub trait BlockchainBackend: Send + Sync {
138143
fn fetch_tip_header(&self) -> Result<ChainHeader, ChainStorageError>;
139144
/// Returns the stored chain metadata.
140145
fn fetch_chain_metadata(&self) -> Result<ChainMetadata, ChainStorageError>;
146+
/// Returns the stored payref rebuild status.
147+
fn fetch_payref_rebuild_status(&self) -> Result<PayrefRebuildStatus, ChainStorageError>;
148+
/// Builds the payref indexes for a given block height, with stats.
149+
fn build_payref_indexes_for_height(
150+
&self,
151+
height: u64,
152+
metadata_at_start: ChainMetadata,
153+
initialize_stats: Option<u64>,
154+
finalize: bool,
155+
) -> Result<PayrefRebuildStatus, ChainStorageError>;
141156
/// Returns the UTXO count
142157
fn utxo_count(&self) -> Result<usize, ChainStorageError>;
143158
/// Returns the kernel count

base_layer/core/src/chain_storage/blockchain_database.rs

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use tari_hashing::TransactionHashDomain;
5959
use tari_mmr::pruned_hashset::PrunedHashSet;
6060
use tari_utilities::{epoch_time::EpochTime, hex::Hex, ByteArray};
6161

62-
use super::{smt_hasher::SmtHasher, MinedInfo, TemplateRegistrationEntry};
62+
use super::{smt_hasher::SmtHasher, MinedInfo, PayrefRebuildStatus, TemplateRegistrationEntry};
6363
use crate::{
6464
block_output_mr_hash_from_pruned_mmr,
6565
blocks::{
@@ -352,6 +352,88 @@ where B: BlockchainBackend
352352
self.clear_all_reorgs()?;
353353
}
354354

355+
self.rebuild_payref_indexes_background_task()?;
356+
357+
Ok(())
358+
}
359+
360+
/// This function will rebuild the payref indexes in the background if they are not already rebuilt.
361+
pub fn rebuild_payref_indexes_background_task(&self) -> Result<(), ChainStorageError> {
362+
let initial_status = {
363+
let db = self.db_read_access()?;
364+
db.fetch_payref_rebuild_status()?
365+
};
366+
if initial_status.is_rebuilt {
367+
debug!(target: LOG_TARGET, "[PayRef] Payref indexes has already been rebuilt.");
368+
return Ok(());
369+
}
370+
371+
// If we had a previous start metadata, we will use that to continue the rebuild, otherwise we will use the
372+
// current chain metadata to set a new target rebuild height. All new or re-orged blocks added to the database
373+
// after this process started will have the correct payref indexes and therefor do not need to be processed.
374+
let metadata_at_start = if let Some(metadata) = initial_status.metadata_at_start.clone() {
375+
metadata
376+
} else {
377+
let db = self.db_read_access()?;
378+
db.fetch_chain_metadata()?
379+
};
380+
let db_rw_lock = self.db.clone();
381+
382+
tokio::task::spawn(async move {
383+
let start_height = initial_status.last_rebuild_height.unwrap_or_default();
384+
let mut last_status = initial_status.clone();
385+
debug!(
386+
target: LOG_TARGET,
387+
"[PayRef] Starting index rebuilding for heights {} to {}",
388+
start_height, metadata_at_start.best_block_height()
389+
);
390+
391+
let mut initialize_stats = Some(metadata_at_start.best_block_height());
392+
for height in start_height..=metadata_at_start.best_block_height() {
393+
let finalize = height == metadata_at_start.best_block_height();
394+
let metadata = metadata_at_start.clone();
395+
let db = db_rw_lock.clone();
396+
// We use `spawn_blocking` with `.await` here to ensure that the async spawned task will be able to
397+
// shut down when base node shutdown is triggered
398+
let res = tokio::task::spawn_blocking(move || {
399+
process_payref_for_height(db, height, metadata, initialize_stats, finalize)
400+
})
401+
.await;
402+
match res {
403+
Ok(Ok(current_status)) => {
404+
last_status = current_status;
405+
},
406+
Ok(Err(e)) => {
407+
error!(
408+
target: LOG_TARGET,
409+
"[PayRef] Index rebuilding failed. Initial status: {:?}. Last updated status: {:?} ({})",
410+
initial_status, last_status, e
411+
);
412+
break;
413+
},
414+
Err(e) => {
415+
error!(
416+
target: LOG_TARGET,
417+
"[PayRef] Index rebuilding failed. Initial status: {:?}. Last updated status: {:?} ({})",
418+
initial_status, last_status, e
419+
);
420+
break;
421+
},
422+
}
423+
if initialize_stats.is_some() {
424+
initialize_stats = None;
425+
}
426+
if finalize || last_status.is_rebuilt {
427+
debug!(
428+
target: LOG_TARGET,
429+
"[PayRef] Starting index rebuilding completed, Final status: {:?}",
430+
last_status
431+
);
432+
break;
433+
}
434+
}
435+
});
436+
355437
Ok(())
356438
}
357439

@@ -2651,6 +2733,34 @@ fn convert_to_option_bounds<T: RangeBounds<u64>>(bounds: T) -> (Option<u64>, Opt
26512733
(start, end)
26522734
}
26532735

2736+
/// Process a batch of outputs in one block for PayRef migration
2737+
fn process_payref_for_height<B: BlockchainBackend>(
2738+
db: Arc<RwLock<B>>,
2739+
height: u64,
2740+
metadata_at_start: ChainMetadata,
2741+
initialize_stats: Option<u64>,
2742+
finalize: bool,
2743+
) -> Result<PayrefRebuildStatus, ChainStorageError> {
2744+
debug!(target: LOG_TARGET, "[PayRef] Processing index rebuilding for height {}", height);
2745+
2746+
let write_txn = db
2747+
.write()
2748+
.map_err(|_e| ChainStorageError::AccessError("Write lock on blockchain backend failed".into()))?;
2749+
2750+
let status =
2751+
write_txn.build_payref_indexes_for_height(height, metadata_at_start.clone(), initialize_stats, finalize)?;
2752+
2753+
if finalize || status.is_rebuilt {
2754+
debug!(
2755+
target: LOG_TARGET,
2756+
"[PayRef] Finalized index rebuilding for heights {} to {}",
2757+
metadata_at_start.best_block_height(), height
2758+
);
2759+
}
2760+
2761+
Ok(status)
2762+
}
2763+
26542764
#[cfg(test)]
26552765
mod test {
26562766
use std::{collections::HashMap, sync};

base_layer/core/src/chain_storage/error.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ pub enum ChainStorageError {
6666
InsertError { table: &'static str, error: String },
6767
#[error("An invalid query was attempted: {0}")]
6868
InvalidQuery(String),
69+
#[error(
70+
"PayRef index not available: current `{current_height}`, start `{start_height}`, target `{target_height}`"
71+
)]
72+
PayRefIndexNotAvailable {
73+
current_height: u64,
74+
start_height: u64,
75+
target_height: u64,
76+
},
6977
#[error("Invalid argument `{arg}` in `{func}`: {message}")]
7078
InvalidArguments {
7179
func: &'static str,
@@ -204,7 +212,8 @@ impl ChainStorageError {
204212
_err @ ChainStorageError::InvalidChainMetaData(_) |
205213
_err @ ChainStorageError::OutOfRange |
206214
_err @ ChainStorageError::MrHashError(_) |
207-
_err @ ChainStorageError::JellyfishMerkleTreeError(_) => None,
215+
_err @ ChainStorageError::JellyfishMerkleTreeError(_) |
216+
_err @ ChainStorageError::PayRefIndexNotAvailable { .. } => None,
208217
}
209218
}
210219
}

base_layer/core/src/chain_storage/lmdb_db/lmdb.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,7 @@ where
6565
{
6666
let val_buf = serialize(val, None)?;
6767
match txn.access().put(db, key, &val_buf, put::NOOVERWRITE) {
68-
Ok(_) => {
69-
trace!(
70-
target: LOG_TARGET, "Inserted {} bytes with key '{}' into '{}'",
71-
val_buf.len(), to_hex(key.as_lmdb_bytes()), table_name
72-
);
73-
Ok(())
74-
},
68+
Ok(_) => Ok(()),
7569
err @ Err(lmdb_zero::Error::Code(lmdb_zero::error::KEYEXIST)) => {
7670
error!(
7771
target: LOG_TARGET, "Could not insert {} bytes with key '{}' into '{}' ({:?})",

0 commit comments

Comments
 (0)