Skip to content

Commit 0c9d7f6

Browse files
authored
fix: duplicate tx when importing completed tx (#7064)
Description --- When importing a completed transaction (not yet broadcasted), the wallet creates a duplicate of this transaction while scanning UTXOs. Motivation and Context --- Check if the corresponding transaction hasn’t been imported. Use existing tx's `tx_id` instead of creating a duplicate record. How Has This Been Tested? --- Manually What process can a PR reviewer use to test or verify this change? --- 1. Send one-sided tx via non-existing base node 2. Export tx 3. Imported tx to another wallet instance 4. Run this wallet instance and wait until tx is mined. <!-- Checklist --> <!-- 1. Is the title of your PR in the form that would make nice release notes? The title, excluding the conventional commit tag, will be included exactly as is in the CHANGELOG, so please think about it carefully. --> Breaking Changes --- - [x] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [ ] Other - Please specify <!-- Does this include a breaking change? If so, include this line as a footer --> <!-- BREAKING CHANGE: Description what the user should do, e.g. delete a database, resync the chain --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced wallet output recovery by linking recovered outputs to existing transactions using address-based transaction queries. - Added the ability to filter completed transactions by source and destination addresses in transaction history queries. - **Tests** - Introduced new tests for querying completed transactions filtered by addresses. - Updated test setups to include the transaction service handle for improved output recovery support. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent e82e5ff commit 0c9d7f6

File tree

10 files changed

+298
-51
lines changed

10 files changed

+298
-51
lines changed

base_layer/wallet/src/output_manager_service/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use crate::{
6262
service::OutputManagerService,
6363
storage::database::{OutputManagerBackend, OutputManagerDatabase},
6464
},
65+
transaction_service::handle::TransactionServiceHandle,
6566
utxo_scanner_service::handle::UtxoScannerHandle,
6667
};
6768

@@ -126,6 +127,7 @@ where
126127
let connectivity = handles.expect_handle::<WalletConnectivityHandle>();
127128
let key_manager = handles.expect_handle::<TKeyManagerInterface>();
128129
let utxo_scanner_handle = handles.expect_handle::<UtxoScannerHandle>();
130+
let transaction_service_handle = handles.expect_handle::<TransactionServiceHandle>();
129131

130132
let service = OutputManagerService::new(
131133
config,
@@ -140,6 +142,7 @@ where
140142
connectivity,
141143
key_manager,
142144
utxo_scanner_handle,
145+
transaction_service_handle,
143146
)
144147
.await
145148
.expect("Could not initialize Output Manager Service")

base_layer/wallet/src/output_manager_service/recovery/standard_outputs_recoverer.rs

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,34 +42,47 @@ use tari_crypto::keys::SecretKey;
4242
use tari_script::{inputs, script, ExecutionStack, Opcode, TariScript};
4343
use tari_utilities::hex::Hex;
4444

45-
use crate::output_manager_service::{
46-
error::{OutputManagerError, OutputManagerStorageError},
47-
handle::RecoveredOutput,
48-
storage::{
49-
database::{OutputManagerBackend, OutputManagerDatabase},
50-
models::{DbWalletOutput, KnownOneSidedPaymentScript},
51-
OutputSource,
45+
use crate::{
46+
output_manager_service::{
47+
error::{OutputManagerError, OutputManagerStorageError},
48+
handle::RecoveredOutput,
49+
storage::{
50+
database::{OutputManagerBackend, OutputManagerDatabase},
51+
models::{DbWalletOutput, KnownOneSidedPaymentScript},
52+
OutputSource,
53+
},
5254
},
55+
transaction_service::{handle::TransactionServiceHandle, storage::models::CompletedTransaction},
5356
};
5457

5558
const LOG_TARGET: &str = "wallet::output_manager_service::recovery";
5659

5760
pub(crate) struct StandardUtxoRecoverer<TBackend: OutputManagerBackend + 'static, TKeyManagerInterface> {
5861
master_key_manager: TKeyManagerInterface,
5962
db: OutputManagerDatabase<TBackend>,
63+
transaction_service_handle: TransactionServiceHandle,
6064
}
6165

6266
impl<TBackend, TKeyManagerInterface> StandardUtxoRecoverer<TBackend, TKeyManagerInterface>
6367
where
6468
TBackend: OutputManagerBackend + 'static,
6569
TKeyManagerInterface: TransactionKeyManagerInterface,
6670
{
67-
pub fn new(master_key_manager: TKeyManagerInterface, db: OutputManagerDatabase<TBackend>) -> Self {
68-
Self { master_key_manager, db }
71+
pub fn new(
72+
master_key_manager: TKeyManagerInterface,
73+
db: OutputManagerDatabase<TBackend>,
74+
transaction_service_handle: TransactionServiceHandle,
75+
) -> Self {
76+
Self {
77+
master_key_manager,
78+
db,
79+
transaction_service_handle,
80+
}
6981
}
7082

7183
/// Attempt to rewind all of the given transaction outputs into key_manager outputs. If they can be rewound then add
7284
/// them to the database and increment the key manager index
85+
#[allow(clippy::too_many_lines)]
7386
pub async fn scan_and_recover_outputs(
7487
&mut self,
7588
outputs: Vec<(TransactionOutput, Option<TxId>)>,
@@ -145,7 +158,33 @@ where
145158
.await?;
146159
let tx_id = match tx_id {
147160
Some(id) => *id,
148-
None => TxId::new_random(),
161+
None => {
162+
let mut related_txs: Vec<CompletedTransaction> = Vec::new();
163+
let (source_address, recipient_address) = match &db_output.payment_id {
164+
PaymentId::AddressAndData { sender_address, .. } => (Some(sender_address.clone()), None),
165+
PaymentId::TransactionInfo { recipient_address, .. } => (None, Some(recipient_address.clone())),
166+
_ => (None, None),
167+
};
168+
169+
if source_address.is_some() || recipient_address.is_some() {
170+
related_txs = self
171+
.transaction_service_handle
172+
.get_completed_transactions_by_addresses(source_address, recipient_address)
173+
.await
174+
.unwrap_or_default();
175+
}
176+
177+
let tx_id = related_txs.iter().find_map(|tx| {
178+
tx.transaction
179+
.body
180+
.outputs()
181+
.iter()
182+
.find(|tx| tx.commitment == db_output.commitment)
183+
.map(|_| tx.tx_id)
184+
});
185+
186+
tx_id.unwrap_or_else(TxId::new_random)
187+
},
149188
};
150189
let output_hex = db_output.commitment.to_hex();
151190
if let Err(e) = self.db.add_unspent_output_with_tx_id(tx_id, db_output) {

base_layer/wallet/src/output_manager_service/resources.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::{
3030
handle::OutputManagerEventSender,
3131
storage::database::OutputManagerDatabase,
3232
},
33+
transaction_service::handle::TransactionServiceHandle,
3334
utxo_scanner_service::handle::UtxoScannerHandle,
3435
};
3536

@@ -47,4 +48,5 @@ pub(crate) struct OutputManagerResources<TBackend, TWalletConnectivity, TKeyMana
4748
pub interactive_tari_address: TariAddress,
4849
pub one_sided_tari_address: TariAddress,
4950
pub utxo_scanner_handle: UtxoScannerHandle,
51+
pub transaction_service_handle: TransactionServiceHandle,
5052
}

base_layer/wallet/src/output_manager_service/service.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ use crate::{
115115
tasks::TxoValidationTask,
116116
TRANSACTION_INPUTS_LIMIT,
117117
},
118+
transaction_service::handle::TransactionServiceHandle,
118119
utxo_scanner_service::handle::{UtxoScannerEvent, UtxoScannerHandle},
119120
};
120121

@@ -156,6 +157,7 @@ where
156157
connectivity: TWalletConnectivity,
157158
key_manager: TKeyManagerInterface,
158159
utxo_scanner_handle: UtxoScannerHandle,
160+
transaction_service_handle: TransactionServiceHandle,
159161
) -> Result<Self, OutputManagerError> {
160162
let view_key = key_manager.get_view_key().await?;
161163
let spend_key = key_manager.get_spend_key().await?;
@@ -186,6 +188,7 @@ where
186188
one_sided_tari_address,
187189
interactive_tari_address,
188190
utxo_scanner_handle,
191+
transaction_service_handle,
189192
};
190193

191194
Ok(Self {
@@ -486,12 +489,14 @@ where
486489
.await
487490
.map(OutputManagerResponse::Transaction),
488491

489-
OutputManagerRequest::ScanForRecoverableOutputs(outputs) => {
490-
StandardUtxoRecoverer::new(self.resources.key_manager.clone(), self.resources.db.clone())
491-
.scan_and_recover_outputs(outputs)
492-
.await
493-
.map(OutputManagerResponse::RewoundOutputs)
494-
},
492+
OutputManagerRequest::ScanForRecoverableOutputs(outputs) => StandardUtxoRecoverer::new(
493+
self.resources.key_manager.clone(),
494+
self.resources.db.clone(),
495+
self.resources.transaction_service_handle.clone(),
496+
)
497+
.scan_and_recover_outputs(outputs)
498+
.await
499+
.map(OutputManagerResponse::RewoundOutputs),
495500
OutputManagerRequest::ScanOutputs(outputs) => self
496501
.scan_outputs_for_one_sided_payments(outputs)
497502
.await

base_layer/wallet/src/transaction_service/handle.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ pub enum TransactionServiceRequest {
8484
block_hash: Option<FixedHash>,
8585
block_height: Option<u64>,
8686
},
87+
GetCompletedTransactionsByAddresses {
88+
source_address: Option<TariAddress>,
89+
destination_address: Option<TariAddress>,
90+
},
8791
GetCancelledPendingInboundTransactions,
8892
GetCancelledPendingOutboundTransactions,
8993
GetCancelledCompletedTransactions,
@@ -213,6 +217,7 @@ impl fmt::Display for TransactionServiceRequest {
213217
Self::GetPendingInboundTransactions => write!(f, "GetPendingInboundTransactions"),
214218
Self::GetPendingOutboundTransactions => write!(f, "GetPendingOutboundTransactions"),
215219
Self::GetCompletedTransactions { .. } => write!(f, "GetCompletedTransactions"),
220+
Self::GetCompletedTransactionsByAddresses { .. } => write!(f, "GetCompletedTransactionsByAddresses"),
216221
Self::ImportTransaction(tx) => write!(f, "ImportTransaction: {:?}", tx),
217222
Self::GetCancelledPendingInboundTransactions => write!(f, "GetCancelledPendingInboundTransactions"),
218223
Self::GetCancelledPendingOutboundTransactions => write!(f, "GetCancelledPendingOutboundTransactions"),
@@ -984,6 +989,24 @@ impl TransactionServiceHandle {
984989
}
985990
}
986991

992+
pub async fn get_completed_transactions_by_addresses(
993+
&mut self,
994+
source_address: Option<TariAddress>,
995+
destination_address: Option<TariAddress>,
996+
) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
997+
match self
998+
.handle
999+
.call(TransactionServiceRequest::GetCompletedTransactionsByAddresses {
1000+
source_address,
1001+
destination_address,
1002+
})
1003+
.await??
1004+
{
1005+
TransactionServiceResponse::CompletedTransactions(c) => Ok(c),
1006+
_ => Err(TransactionServiceError::UnexpectedApiResponse),
1007+
}
1008+
}
1009+
9871010
pub async fn get_cancelled_completed_transactions(
9881011
&mut self,
9891012
) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {

base_layer/wallet/src/transaction_service/service.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,13 @@ where
899899
self.db
900900
.get_completed_transactions(payment_id, block_hash, block_height)?,
901901
)),
902+
TransactionServiceRequest::GetCompletedTransactionsByAddresses {
903+
source_address,
904+
destination_address,
905+
} => Ok(TransactionServiceResponse::CompletedTransactions(
906+
self.db
907+
.get_completed_transactions_by_addresses(source_address, destination_address)?,
908+
)),
902909
TransactionServiceRequest::GetCancelledPendingInboundTransactions => {
903910
Ok(TransactionServiceResponse::PendingInboundTransactions(
904911
self.db.get_cancelled_pending_inbound_transactions()?,

base_layer/wallet/src/transaction_service/storage/database.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,11 @@ pub trait TransactionBackend: Send + Sync + Clone {
162162
block_hash: Option<FixedHash>,
163163
block_height: Option<u64>,
164164
) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;
165+
fn find_completed_transactions_filter_addresses(
166+
&self,
167+
source_address: Option<TariAddress>,
168+
destination_address: Option<TariAddress>,
169+
) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;
165170
}
166171

167172
#[derive(Clone, PartialEq)]
@@ -611,6 +616,15 @@ where T: TransactionBackend + 'static
611616
self.get_completed_transactions_by_cancelled(payment_id, false, block_hash, block_height)
612617
}
613618

619+
pub fn get_completed_transactions_by_addresses(
620+
&self,
621+
source_address: Option<TariAddress>,
622+
destination_address: Option<TariAddress>,
623+
) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
624+
self.db
625+
.find_completed_transactions_filter_addresses(source_address, destination_address)
626+
}
627+
614628
pub fn get_cancelled_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
615629
self.get_completed_transactions_by_cancelled(None, true, None, None)
616630
}

base_layer/wallet/src/transaction_service/storage/sqlite_db.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,31 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
10481048
Ok(coinbases)
10491049
}
10501050

1051+
fn find_completed_transactions_filter_addresses(
1052+
&self,
1053+
source_address: Option<TariAddress>,
1054+
destination_address: Option<TariAddress>,
1055+
) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
1056+
let mut conn = self.database_connection.get_pooled_connection()?;
1057+
let cipher = acquire_read_lock!(self.cipher);
1058+
1059+
let mut query = completed_transactions::table.into_boxed();
1060+
if let Some(source_address) = source_address {
1061+
query = query.filter(completed_transactions::source_address.eq(source_address.to_vec()));
1062+
}
1063+
if let Some(destination_address) = destination_address {
1064+
query = query.filter(completed_transactions::destination_address.eq(destination_address.to_vec()));
1065+
}
1066+
1067+
query
1068+
.load::<CompletedTransactionSql>(&mut conn)?
1069+
.into_iter()
1070+
.map(|ct: CompletedTransactionSql| {
1071+
CompletedTransaction::try_from(ct, &cipher).map_err(TransactionStorageError::from)
1072+
})
1073+
.collect::<Result<Vec<CompletedTransaction>, TransactionStorageError>>()
1074+
}
1075+
10511076
fn find_completed_transactions_filter_payment_id_block_hash(
10521077
&self,
10531078
payment_id: Option<Vec<u8>>,

base_layer/wallet/tests/output_manager_service_tests/service.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ async fn setup_output_manager_service<T: OutputManagerBackend + 'static>(
182182
wallet_connectivity_mock.clone(),
183183
key_manager.clone(),
184184
scanner_handle,
185+
ts_handle.clone(),
185186
)
186187
.await
187188
.unwrap();
@@ -251,6 +252,7 @@ pub async fn setup_oms_with_bn_state<T: OutputManagerBackend + 'static>(
251252
connectivity,
252253
key_manager.clone(),
253254
scanner_handle,
255+
ts_handle.clone(),
254256
)
255257
.await
256258
.unwrap();

0 commit comments

Comments
 (0)