Skip to content

Commit b837f98

Browse files
committed
ExtensionAppender refactoring
1 parent af1b204 commit b837f98

File tree

8 files changed

+146
-116
lines changed

8 files changed

+146
-116
lines changed

node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import com.wavesplatform.utils.Schedulers
77
import monix.eval.Task
88
import monix.execution.schedulers.SchedulerService
99

10+
/** Executes signature verification in advance using parallel computation.
11+
* The result is cached in a lazy value: if precomputation completes in time,
12+
* the cached result is reused; otherwise, it is computed on demand.
13+
*/
1014
object ParSignatureChecker {
1115
implicit val sigverify: SchedulerService = Schedulers.fixedPool(4, "sigverify")
1216

node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala

Lines changed: 119 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package com.wavesplatform.state.appender
22

3+
import cats.data.OptionT
4+
import cats.syntax.all.*
5+
import com.wavesplatform.block.Block
6+
import com.wavesplatform.block.Block.BlockId
7+
import com.wavesplatform.common.state.ByteStr
38
import com.wavesplatform.common.utils.EitherExt2.*
49
import com.wavesplatform.consensus.PoSSelector
510
import com.wavesplatform.features.BlockchainFeatures
@@ -17,9 +22,11 @@ import monix.eval.Task
1722
import monix.execution.Scheduler
1823
import org.influxdb.dto.Point
1924

20-
import scala.util.{Left, Right}
25+
import scala.annotation.tailrec
26+
import scala.util.chaining.*
2127

2228
object ExtensionAppender extends ScorexLogging {
29+
private case class AppendData(newBlocks: Seq[Block], lastCommonBlockId: BlockId, lastCommonHeight: Int)
2330

2431
def apply(
2532
blockchainUpdater: BlockchainUpdater & Blockchain,
@@ -29,111 +36,125 @@ object ExtensionAppender extends ScorexLogging {
2936
invalidBlocks: InvalidBlockStorage,
3037
peerDatabase: PeerDatabase,
3138
scheduler: Scheduler
32-
)(ch: Channel, extensionBlocks: ExtensionBlocks): Task[Either[ValidationError, Option[BigInt]]] = {
33-
def appendExtension(extension: ExtensionBlocks): Either[ValidationError, Option[BigInt]] =
39+
)(ch: Channel, extension: ExtensionBlocks): Task[Either[ValidationError, Option[BigInt]]] = {
40+
type Result[A] = Either[ValidationError, A]
41+
def appendExtension(): OptionT[Result, BigInt] =
3442
if (extension.remoteScore <= blockchainUpdater.score) {
3543
log.trace(s"Ignoring extension $extension because declared remote was not greater than local score ${blockchainUpdater.score}")
36-
Right(None)
37-
} else {
38-
extension.blocks
39-
.collectFirst { case b if !b.signatureValid() => GenericError(s"Block $b has invalid signature") }
40-
.toLeft(extension)
41-
.flatMap { extensionWithValidSignatures =>
42-
val newBlocks = extensionWithValidSignatures.blocks.dropWhile(blockchainUpdater.contains)
43-
44-
newBlocks.headOption.map(_.header.reference) match {
45-
case Some(lastCommonBlockId) =>
46-
val initialHeight = blockchainUpdater.height
47-
48-
val droppedBlocksEi = for {
49-
commonBlockHeight <- blockchainUpdater.heightOf(lastCommonBlockId).toRight(GenericError("Fork contains no common parent"))
50-
droppedBlocks <- {
51-
if (commonBlockHeight < initialHeight)
52-
blockchainUpdater.removeAfter(lastCommonBlockId)
53-
else Right(Seq.empty)
54-
}
55-
} yield (commonBlockHeight, droppedBlocks)
56-
57-
droppedBlocksEi.flatMap { case (commonBlockHeight, droppedBlocks) =>
58-
newBlocks.zipWithIndex.foreach { case (block, idx) =>
59-
val rideV6Activated = blockchainUpdater.isFeatureActivated(BlockchainFeatures.RideV6, commonBlockHeight + idx + 1)
60-
ParSignatureChecker.checkTxSignatures(block.transactionData, rideV6Activated)
61-
}
62-
63-
val forkApplicationResultEi = {
64-
newBlocks.view
65-
.map { b =>
66-
b -> appendExtensionBlock(blockchainUpdater, pos, time, verify = true, txSignParCheck = false)(
67-
b,
68-
extension.snapshots.get(b.id())
69-
)
70-
.map {
71-
case (_: Applied, height) => BlockStats.applied(b, BlockStats.Source.Ext, height)
72-
case _ =>
73-
}
74-
}
75-
.zipWithIndex
76-
.collectFirst { case ((b, Left(e)), i) => (i, b, e) }
77-
.fold[Either[ValidationError, Unit]](Right(())) { case (i, declinedBlock, e) =>
78-
e match {
79-
case _: TxValidationError.BlockFromFuture =>
80-
case _ => invalidBlocks.add(declinedBlock.id(), e)
81-
}
82-
83-
newBlocks.view
84-
.dropWhile(_ != declinedBlock)
85-
.foreach(BlockStats.declined(_, BlockStats.Source.Ext))
86-
87-
if (i == 0) log.warn(s"Can't process fork starting with $lastCommonBlockId, error appending block $declinedBlock: $e")
88-
else
89-
log.warn(
90-
s"Processed only ${i + 1} of ${newBlocks.size} blocks from extension, error appending next block $declinedBlock: $e"
91-
)
92-
93-
Left(e)
94-
}
95-
}
96-
97-
forkApplicationResultEi match {
98-
case Left(e) =>
99-
blockchainUpdater.removeAfter(lastCommonBlockId).explicitGet()
100-
droppedBlocks.foreach { x =>
101-
blockchainUpdater.processBlock(x.block, x.hitSource, x.snapshot, x.generatorSet).explicitGet()
102-
}
103-
Left(e)
104-
105-
case Right(_) =>
106-
val depth = initialHeight - commonBlockHeight
107-
if (depth > 0) {
108-
Metrics.write(
109-
Point
110-
.measurement("rollback")
111-
.addField("depth", initialHeight - commonBlockHeight)
112-
.addField("txs", droppedBlocks.size)
113-
)
114-
}
115-
116-
val newTransactions = newBlocks.view.flatMap(_.transactionData).toSet
117-
utxStorage.removeAll(newTransactions)
118-
utxStorage.addAndScheduleCleanup(droppedBlocks.flatMap(_._1.transactionData).filterNot(newTransactions))
119-
Right(Some(blockchainUpdater.score))
120-
}
121-
}
122-
123-
case None =>
124-
log.debug("No new blocks found in extension")
125-
Right(None)
126-
}
44+
OptionT.none[Result, BigInt]
45+
} else
46+
for {
47+
_ <- OptionT.liftF(validateSignatures())
48+
appendData <- OptionT.fromOption[Result](dropCommonPrefix())
49+
_ <- OptionT.liftF(processNewBlocks(appendData))
50+
} yield blockchainUpdater.score
51+
52+
def validateSignatures(): Either[ValidationError, ExtensionBlocks] =
53+
extension.blocks
54+
.collectFirst { case b if !b.signatureValid() => GenericError(s"Block $b has invalid signature") }
55+
.toLeft(extension)
56+
57+
def dropCommonPrefix(): Option[AppendData] = {
58+
@tailrec def loop(last: AppendData): Option[AppendData] = last.newBlocks match {
59+
case Nil =>
60+
log.debug("No new blocks found in extension")
61+
None
62+
63+
case b +: rest =>
64+
blockchainUpdater.heightOf(b.id()) match {
65+
case None => last.some
66+
case Some(h) => loop(AppendData(rest, b.id(), h))
12767
}
12868
}
12969

130-
log.debug(s"${id(ch)} Attempting to append extension ${formatBlocks(extensionBlocks.blocks)}")
131-
Task(appendExtension(extensionBlocks)).executeOn(scheduler).map {
70+
loop(
71+
AppendData(
72+
extension.blocks,
73+
blockchainUpdater.lastBlockId.getOrElse(throw new RuntimeException("Empty blockchain")),
74+
blockchainUpdater.height
75+
)
76+
)
77+
}
78+
79+
def processNewBlocks(appendData: AppendData): Either[ValidationError, Unit] = {
80+
val originalForkHeight = blockchainUpdater.height
81+
for {
82+
discardedBlocks <-
83+
if (appendData.lastCommonHeight < originalForkHeight) blockchainUpdater.removeAfter(appendData.lastCommonBlockId)
84+
else Right(Seq.empty)
85+
_ = precheckSignatures(appendData)
86+
_ <- applyFork(appendData).tap {
87+
case Left(_) => restoreDiscardedBlocks(appendData.lastCommonBlockId, discardedBlocks)
88+
case Right(_) =>
89+
val depth = originalForkHeight - appendData.lastCommonHeight
90+
if (depth > 0)
91+
Metrics.write(
92+
Point
93+
.measurement("rollback")
94+
.addField("depth", depth)
95+
.addField("txs", discardedBlocks.size)
96+
)
97+
98+
val newTxs = appendData.newBlocks.flatMap(_.transactionData)
99+
val newTxIds = newTxs.view.map(_.id()).toSet
100+
utxStorage.removeIds(newTxIds)
101+
102+
val discardedTxs = discardedBlocks.flatMap(_._1.transactionData)
103+
utxStorage.addAndScheduleCleanup(discardedTxs.filterNot(tx => newTxIds.contains(tx.id()))) // In a case of re-appending issues
104+
}
105+
} yield ()
106+
}
107+
108+
def precheckSignatures(appendData: AppendData): Unit =
109+
appendData.newBlocks.zipWithIndex.foreach { case (block, idx) =>
110+
val rideV6Activated = blockchainUpdater.isFeatureActivated(BlockchainFeatures.RideV6, appendData.lastCommonHeight + idx + 1)
111+
ParSignatureChecker.checkTxSignatures(block.transactionData, rideV6Activated)
112+
}
113+
114+
def applyFork(appendData: AppendData): Either[ValidationError, Unit] = appendData.newBlocks.view
115+
.map { b =>
116+
val s = extension.snapshots.get(b.id())
117+
val r = appendExtensionBlock(blockchainUpdater, pos, time, verify = true, txSignParCheck = false)(b, s).map {
118+
case (_: Applied, height) => BlockStats.applied(b, BlockStats.Source.Ext, height)
119+
case _ =>
120+
}
121+
b -> r
122+
}
123+
.zipWithIndex
124+
.collectFirst { case ((b, Left(e)), i) => (i, b, e) }
125+
.fold(Either.unit[ValidationError]) { case (i, declinedBlock, e) =>
126+
e match {
127+
case _: TxValidationError.BlockFromFuture =>
128+
case _ => invalidBlocks.add(declinedBlock.id(), e)
129+
}
130+
131+
appendData.newBlocks.view
132+
.dropWhile(_ != declinedBlock)
133+
.foreach(BlockStats.declined(_, BlockStats.Source.Ext))
134+
135+
log.warn(
136+
if (i == 0) s"Can't process fork starting with ${appendData.lastCommonBlockId}, error appending block $declinedBlock: $e"
137+
else s"Processed only ${i + 1} of ${appendData.newBlocks.size} blocks from extension, error appending next block $declinedBlock: $e"
138+
)
139+
140+
Left(e)
141+
}
142+
143+
def restoreDiscardedBlocks(lastCommonBlockId: ByteStr, blocks: DiscardedBlocks): Unit = {
144+
blockchainUpdater.removeAfter(lastCommonBlockId).explicitGet()
145+
blocks.foreach { x =>
146+
blockchainUpdater.processBlock(x.block, x.hitSource, x.snapshot, x.generatorSet).explicitGet()
147+
}
148+
}
149+
150+
val formattedBlocksStr = formatBlocks(extension.blocks)
151+
log.debug(s"${id(ch)} Attempting to append extension $formattedBlocksStr")
152+
Task(appendExtension().value).executeOn(scheduler).map {
132153
case Right(maybeNewScore) =>
133-
log.debug(s"${id(ch)} Successfully appended extension ${formatBlocks(extensionBlocks.blocks)}")
154+
log.debug(s"${id(ch)} Successfully appended extension $formattedBlocksStr")
134155
Right(maybeNewScore)
135156
case Left(ve) =>
136-
val errorMessage = s"${id(ch)} Error appending extension ${formatBlocks(extensionBlocks.blocks)}: $ve"
157+
val errorMessage = s"${id(ch)} Error appending extension $formattedBlocksStr: $ve"
137158
log.warn(errorMessage)
138159
peerDatabase.blacklistAndClose(ch, errorMessage)
139160
Left(ve)

node/src/main/scala/com/wavesplatform/utx/UtxPool.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ trait UtxForAppender {
1616

1717
trait UtxPool extends UtxForAppender with AutoCloseable {
1818
def putIfNew(tx: Transaction, forceValidate: Boolean = false): TracedResult[ValidationError, Boolean]
19-
def removeAll(txs: Iterable[Transaction]): Unit
19+
def removeIds(txIds: Iterable[ByteStr]): Unit
2020
def all: Seq[Transaction]
2121
def size: Int
2222
def transactionById(transactionId: ByteStr): Option[Transaction]
@@ -40,4 +40,11 @@ object UtxPool {
4040
case class Estimate(time: FiniteDuration) extends PackStrategy
4141
case object Unlimited extends PackStrategy
4242
}
43+
44+
extension (self: UtxPool) {
45+
def removeAll(txs: Iterable[Transaction]): Unit = if (txs.nonEmpty) {
46+
val ids = txs.map(_.id()).toSet
47+
self.removeIds(ids)
48+
}
49+
}
4350
}

node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ import com.wavesplatform.state.TxStateSnapshotHashBuilder.TxStatusInfo
1616
import com.wavesplatform.state.diffs.BlockDiffer.CurrentBlockFeePart
1717
import com.wavesplatform.state.diffs.TransactionDiffer.TransactionValidationError
1818
import com.wavesplatform.state.diffs.{BlockDiffer, TransactionDiffer}
19-
import com.wavesplatform.state.SnapshotBlockchain
20-
import com.wavesplatform.state.{Blockchain, Portfolio, StateSnapshot, TxStateSnapshotHashBuilder}
19+
import com.wavesplatform.state.{Blockchain, Portfolio, SnapshotBlockchain, StateSnapshot, TxStateSnapshotHashBuilder}
2120
import com.wavesplatform.transaction.*
2221
import com.wavesplatform.transaction.TxValidationError.{AlreadyInTheState, GenericError, SenderIsBlacklisted, WithLog}
2322
import com.wavesplatform.transaction.assets.exchange.ExchangeTransaction
@@ -166,12 +165,6 @@ case class UtxPoolImpl(
166165
tracedIsNew
167166
}
168167

169-
override def removeAll(txs: Iterable[Transaction]): Unit = {
170-
if (txs.isEmpty) return
171-
val ids = txs.map(_.id()).toSet
172-
removeIds(ids)
173-
}
174-
175168
def setPrioritySnapshots(discSnapshots: Seq[StateSnapshot]): Unit =
176169
priorityPool.setPriorityDiffs(discSnapshots).foreach(addTransaction(_, verify = false))
177170

@@ -185,8 +178,8 @@ case class UtxPoolImpl(
185178
}
186179
}
187180

188-
private def removeIds(removed: Set[ByteStr]): Unit =
189-
removed.flatMap(id => removeFromOrdPool(id)).foreach(TxStateActions.removeMined(_))
181+
def removeIds(txIds: Iterable[ByteStr]): Unit =
182+
txIds.flatMap(id => removeFromOrdPool(id)).foreach(TxStateActions.removeMined(_))
190183

191184
private[utx] def addTransaction(
192185
tx: Transaction,

node/tests/src/test/scala/com/wavesplatform/finalization/conflict/ConflictEndorserBlocksNgSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import com.wavesplatform.transaction.TxHelpers
1414
import org.scalactic.source.Position
1515
import org.scalatest.Assertion
1616

17-
/** Blocks:
17+
/** Adding blocks:
1818
* 1. Genesis
1919
* 2. With commitments from two generators
2020
* 3. First block at period #1

node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ class MicroBlockMinerSpec extends FlatSpec with WithDomain {
123123
)
124124

125125
val utxPool = new UtxPool {
126-
127126
override def packUnconfirmed(
128127
rest: MultiDimensionalMiningConstraint,
129128
prevStateHash: Option[ByteStr],
@@ -140,7 +139,7 @@ class MicroBlockMinerSpec extends FlatSpec with WithDomain {
140139
}
141140

142141
override def putIfNew(tx: Transaction, forceValidate: Boolean) = inner.putIfNew(tx, forceValidate)
143-
override def removeAll(txs: Iterable[Transaction]): Unit = inner.removeAll(txs)
142+
override def removeIds(txIds: Iterable[ByteStr]): Unit = inner.removeIds(txIds)
144143
override def all = inner.all
145144
override def size = inner.size
146145
override def transactionById(transactionId: ByteStr) = inner.transactionById(transactionId)

node/tests/src/test/scala/com/wavesplatform/state/LightNodeTest.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,9 @@ class LightNodeTest extends PropSpec with WithDomain {
114114
DomainPresets.TransactionStateSnapshot.copy(enableLightMode = isLightMode),
115115
AddrWithBalance.enoughBalances(sender, TxHelpers.defaultSigner)
116116
) { d =>
117-
val chainSize = 3
118-
val genesisId = d.lastBlockId
117+
val chainSize = 3
118+
val genesisId = d.lastBlockId
119+
val genesisBlock = d.lastBlock
119120
val betterBlocks = (1 to chainSize).map { idx =>
120121
val txs =
121122
Seq(TxHelpers.transfer(sender, recipient, amount = (idx + 10).waves), TxHelpers.transfer(sender, recipient, amount = (idx + 11).waves))
@@ -125,17 +126,21 @@ class LightNodeTest extends PropSpec with WithDomain {
125126
block -> txSnapshots
126127
}
127128
val expectedStateHash = d.lastBlock.header.stateHash
129+
130+
log.debug("Rolling back")
128131
d.rollbackTo(genesisId)
129132

133+
log.debug("Appending new blocks")
130134
(1 to chainSize).foreach { idx =>
131135
val txs = Seq(TxHelpers.transfer(sender, recipient, amount = idx.waves), TxHelpers.transfer(sender, recipient, (idx + 1).waves))
132136
d.appendBlock(txs*)
133137
}
134138
val currentScore = d.blockchain.score
135139

140+
log.debug("Appending extension")
136141
val extensionBlocks = ExtensionBlocks(
137142
currentScore + 1,
138-
betterBlocks.map(_._1),
143+
genesisBlock +: betterBlocks.map(_._1),
139144
betterBlocks.collect { case (b, Some(snapshots)) =>
140145
b.id() -> BlockSnapshotResponse(b.id(), snapshots.map { case (s, m) => PBSnapshots.toProtobuf(s, m) })
141146
}.toMap

node/tests/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import monix.execution.Scheduler.Implicits.global
1414
class ExtensionAppenderSpec extends FlatSpec with WithDomain {
1515
"Extension appender" should "drop duplicate transactions from UTX" in
1616
withDomain(balances = AddrWithBalance.enoughBalances(TxHelpers.defaultSigner)) { d =>
17+
val genesisBlock = d.lastBlock
1718
val utx = new UtxPoolImpl(SystemTime, d.blockchain, d.settings.utxSettings, d.settings.maxTxErrorLogSize, d.settings.minerSettings.enable)
1819
val time = TestTime()
1920
val extensionAppender = ExtensionAppender(d.blockchain, utx, d.posSelector, time, InvalidBlockStorage.NoOp, PeerDatabase.NoOp, global)(null, _)
@@ -25,7 +26,7 @@ class ExtensionAppenderSpec extends FlatSpec with WithDomain {
2526
utx.all shouldBe Seq(tx)
2627

2728
time.setTime(block1.header.timestamp)
28-
extensionAppender(ExtensionBlocks(d.blockchain.score + block1.blockScore(), Seq(block1), Map.empty)).runSyncUnsafe().explicitGet()
29+
extensionAppender(ExtensionBlocks(d.blockchain.score + block1.blockScore(), Seq(genesisBlock, block1), Map.empty)).runSyncUnsafe().explicitGet()
2930
d.blockchain.height shouldBe 2
3031
utx.all shouldBe Nil
3132
utx.close()

0 commit comments

Comments
 (0)