diff --git a/build.sbt b/build.sbt index 1263b47f45..39e93a2e4c 100644 --- a/build.sbt +++ b/build.sbt @@ -242,16 +242,16 @@ buildRIDERunnerForDocker := { ) } -lazy val compilePRRaw = taskKey[Unit]("Compile the project") +lazy val compilePRRaw = + taskKey[Unit]("Incremental compilation (Compile + Test) without cleanup. Useful for fixing warnings/errors in a fast feedback loop.") compilePRRaw := Def .sequential( - clean.all(ScopeFilter(inAnyProject)), scalafmtCheck.all(ScopeFilter(inAnyProject, inConfigurations(Compile))), compile.all(ScopeFilter(inAnyProject, inConfigurations(Test))) ) .value -lazy val checkPRRaw = taskKey[Unit]("Compile the project and run unit tests") +lazy val checkPRRaw = taskKey[Unit]("Incremental compilation without cleanup and running unit tests. Useful for quick test-fix iterations.") checkPRRaw := Def .sequential( compilePRRaw, @@ -266,20 +266,28 @@ checkPRRaw := Def ) .value -def commandWithFatalWarnings(commandName: String, task: TaskKey[Unit]): Command = - Command.command(commandName) { state => +def commandWithCleanupAndFatalWarnings(commandName: String, task: TaskKey[Unit], help: Help): Command = + Command.command(commandName, help) { state => val extracted = Project.extract(state) val newState = extracted.appendWithoutSession( Seq(Global / scalacOptions ++= Seq("-Werror")), state ) - Project.extract(newState).runTask(task, newState) + Command.process("clean", newState, onParseError = _ => ()).unsafeRunTask(task) state } -def compilePR: Command = commandWithFatalWarnings("compilePR", compilePRRaw) -def checkPR: Command = commandWithFatalWarnings("checkPR", checkPRRaw) +def compilePR: Command = commandWithCleanupAndFatalWarnings( + "compilePR", + compilePRRaw, + Help.briefOnly(Seq("compilePR" -> "Compile with scalafmt.")) +) +def checkPR: Command = commandWithCleanupAndFatalWarnings( + "checkPR", + checkPRRaw, + Help.briefOnly(Seq("checkPR" -> "Compile with scalafmt, build JavaScript artifacts, run unit tests.")) +) commands += Command.command("buildDebPackages") { state => "set node / Debian / packageArchitecture := \"arm64\"" :: diff --git a/node/src/main/scala/com/wavesplatform/api/common/CommonGeneratorsApi.scala b/node/src/main/scala/com/wavesplatform/api/common/CommonGeneratorsApi.scala index b9386b0d6d..cbcc8a9dd9 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/CommonGeneratorsApi.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/CommonGeneratorsApi.scala @@ -93,17 +93,22 @@ object CommonGeneratorsApi { addresses .lazyZip(txIds) .lazyZip(Iterator.from(0).take(addresses.size).map(GeneratorIndex(_)).to(Iterable)) - .collect { case (Some(address), txnId, idx) => // TODO: address=None ? - val b = balances.get(idx) match { - case None if at.toInt <= blockchain.height => Some(0L) - case r => r - } + .flatMap { + case (Some(address), txnId, idx) => + val b = balances.get(idx) match { + case None if at.toInt <= blockchain.height => Some(0L) + case r => r + } + + Some(GeneratorEntry(address, b, txnId, conflict.heightOf(idx))) - GeneratorEntry(address, b, txnId, conflict.heightOf(idx)) + case (None, txnId, idx) => + log.warn(s"Can't find address, txnId=$txnId, idx=$idx. Contact with developers") + None } .toSeq } else { - log.warn(s"Different size: addresses=${addresses.size}, balances=${balances.size}, blsPks=${blsPks.size}") + log.warn(s"Different size: addresses=${addresses.size}, balances=${balances.size}, blsPks=${blsPks.size}. Contact with developers") Seq.empty } } diff --git a/node/src/main/scala/com/wavesplatform/consensus/PoSSelector.scala b/node/src/main/scala/com/wavesplatform/consensus/PoSSelector.scala index 3c2c8fb39e..7b9443c024 100644 --- a/node/src/main/scala/com/wavesplatform/consensus/PoSSelector.scala +++ b/node/src/main/scala/com/wavesplatform/consensus/PoSSelector.scala @@ -84,7 +84,6 @@ case class PoSSelector(blockchain: Blockchain, maxBaseTarget: Option[Long]) exte def validateGenerationSignature(block: Block): Either[ValidationError, ByteStr] = { val blockGenSig = block.header.generationSignature - // TODO: we already checked this blockchain.heightOf(block.header.reference).toRight(GenericError(s"Block reference ${block.header.reference} doesn't exist")).flatMap { height => if (vrfActivated(height + 1)) { getHitSource(height) diff --git a/node/src/main/scala/com/wavesplatform/database/Caches.scala b/node/src/main/scala/com/wavesplatform/database/Caches.scala index 01bd166179..d292a7f9b8 100644 --- a/node/src/main/scala/com/wavesplatform/database/Caches.scala +++ b/node/src/main/scala/com/wavesplatform/database/Caches.scala @@ -4,7 +4,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.google.common.collect.ArrayListMultimap import com.google.protobuf.ByteString import com.typesafe.scalalogging.StrictLogging -import com.wavesplatform.account.{Address, Alias, PublicKey} +import com.wavesplatform.account.{Address, Alias} import com.wavesplatform.block.{Block, SignedBlockHeader} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.common.utils.EitherExt2.* diff --git a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala index 5f58102fc8..dce45a344e 100644 --- a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala +++ b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala @@ -7,7 +7,7 @@ import com.google.common.hash.{BloomFilter, Funnels} import com.google.common.primitives.Ints import com.google.common.util.concurrent.MoreExecutors import com.typesafe.scalalogging.Logger -import com.wavesplatform.account.{Address, Alias, PublicKey} +import com.wavesplatform.account.{Address, Alias} import com.wavesplatform.api.common.WavesBalanceIterator import com.wavesplatform.block.Block.BlockId import com.wavesplatform.block.BlockSnapshot @@ -725,17 +725,13 @@ class RocksDBWriter( this.generationPeriodOf(h).foreach { currPeriod => // None checked in Caches if (nextCommittedGenerators.nonEmpty) { val nextPeriod = currPeriod.next - rw.put(Keys.committedGenerators(nextPeriod, h), Some(nextCommittedGenerators)) - - // TODO: Option to not store rw.put(Keys.commitmentTransactions(nextPeriod, h), commitmentTransactionIds) } if (conflictGenerators.nonEmpty) rw.put(Keys.conflictGenerators(currPeriod, h), conflictGenerators) } - // TODO: Option to not store rw.put(Keys.generatorBalances(h, rdb.apiHandle), Some(generatorSet.map(x => x.index -> x.balance))) // TODO: height @@ -1141,9 +1137,10 @@ class RocksDBWriter( rw.delete(Keys.transactionStateSnapshotAt(currentHeight, num, rdb.txSnapshotHandle)) } - rw.delete(Keys.generatorBalances(currentHeight, rdb.apiHandle)) + // Finality currentPeriod.foreach { currentPeriod => - rw.delete(Keys.conflictGenerators(currentPeriod, currentHeight)) // TODO: test + rw.delete(Keys.generatorBalances(currentHeight, rdb.apiHandle)) + rw.delete(Keys.conflictGenerators(currentPeriod, currentHeight)) val nextPeriod = currentPeriod.next rw.delete(Keys.committedGenerators(nextPeriod, currentHeight)) @@ -1192,7 +1189,7 @@ class RocksDBWriter( Some(BlockSnapshot(block.id(), loadTxStateSnapshotsWithStatus(currentHeight, rdb, block.transactionData))) } else None - DiscardedBlock(block, Caches.toHitSource(discardedMeta), snapshot, Seq.empty) // TODO: generatorBalances + DiscardedBlock(block, Caches.toHitSource(discardedMeta), snapshot) } balancesToInvalidate.result().foreach(discardBalance) @@ -1643,7 +1640,7 @@ class RocksDBWriter( override def resolveERC20Address(address: ERC20Address): Option[IssuedAsset] = readOnly(_.get(Keys.assetStaticInfo(address)).map(assetInfo => IssuedAsset(assetInfo.id.toByteStr))) - override def lastStateHash(refId: Option[ByteStr]): ByteStr = + override def lastStateHash(liquidBlockId: Option[ByteStr]): ByteStr = snapshotStateHash(height) def snapshotStateHash(height: Int): ByteStr = diff --git a/node/src/main/scala/com/wavesplatform/mining/Miner.scala b/node/src/main/scala/com/wavesplatform/mining/Miner.scala index 2656f5a47c..b8a70a9057 100644 --- a/node/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -204,7 +204,7 @@ class MinerImpl( } } yield balance - def retryReasons(balance: Long) = for { + def retryReasons(balance: Long): Either[String, ForgeAttemptResult] = for { _ <- checkQuorumAvailable() validBlockDelay <- pos .getValidBlockDelay(height, account, refBaseTarget, balance) @@ -317,18 +317,20 @@ class MinerImpl( f"Next attempt for acc=${account.toAddress} in ${offset.toUnit(SECONDS)}%.3f seconds (${LocalTime.now().plusNanos(offset.toNanos)})$waitBlockIdStr" ) + // We need to wait, because the mining scheduled for SnapshotBlockchain state in BlockchainUpdater + // If we need to forge a block immediately, we can't do this, because the parent block is not in the state yet val waitBlockAppendedTask = waitBlockId match { case Some(blockId) => def waitUntilBlockAppended(block: BlockId): Task[Unit] = - if (blockchainUpdater.contains(block)) Task.unit - else Task.defer(waitUntilBlockAppended(block)).delayExecution(1 seconds) + if (blockchainUpdater.lastBlockId.contains(block)) Task.unit + else Task.defer(waitUntilBlockAppended(block)).delayExecution(1.second) waitUntilBlockAppended(blockId) case None => Task.unit } - def appendTask(block: Block, totalConstraint: MiningConstraint) = // TODO: accept blockAppender instead all these dependencies? + def appendTask(block: Block, totalConstraint: MiningConstraint) = BlockAppender(blockchainUpdater, timeService, utx, pos, blockEndorser, appenderScheduler)(block, None).flatMap { case Left(BlockFromFuture(_, _)) => // Time was corrected, retry generateBlockTask(account, None) diff --git a/node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala b/node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala index 62a64046a9..7b4cbe6e4a 100644 --- a/node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala +++ b/node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala @@ -46,6 +46,6 @@ object PeerDatabase { override val detailedSuspended: Map[InetAddress, Long] = Map.empty - override def blacklistAndClose(channel: Channel, reason: String): Unit = channel.close() + override def blacklistAndClose(channel: Channel, reason: String): Unit = Option(channel).foreach(_.close()) } } diff --git a/node/src/main/scala/com/wavesplatform/network/RxExtensionLoader.scala b/node/src/main/scala/com/wavesplatform/network/RxExtensionLoader.scala index e087e3788f..48ee4295e5 100644 --- a/node/src/main/scala/com/wavesplatform/network/RxExtensionLoader.scala +++ b/node/src/main/scala/com/wavesplatform/network/RxExtensionLoader.scala @@ -22,6 +22,8 @@ import monix.reactive.{Observable, Observer} import java.util.concurrent.TimeUnit import scala.concurrent.duration.* +/** @param blocks The recent is last + */ case class ExtensionBlocks(remoteScore: BigInt, blocks: Seq[Block], snapshots: Map[BlockId, BlockSnapshotResponse]) { override def toString: String = s"ExtensionBlocks($remoteScore, ${formatSignatures(blocks.map(_.id()))}" } diff --git a/node/src/main/scala/com/wavesplatform/state/Blockchain.scala b/node/src/main/scala/com/wavesplatform/state/Blockchain.scala index cccffe5fff..2ef66c07d3 100644 --- a/node/src/main/scala/com/wavesplatform/state/Blockchain.scala +++ b/node/src/main/scala/com/wavesplatform/state/Blockchain.scala @@ -92,14 +92,15 @@ trait Blockchain { def effectiveBalanceBanHeights(address: Address): Seq[Int] - // TODO: named? def committedGenerators(at: GenerationPeriod): IndexedSeq[(Address, BlsPublicKey)] def conflictGenerators(at: GenerationPeriod): ConflictGenerators def resolveERC20Address(address: ERC20Address): Option[IssuedAsset] - def lastStateHash(refId: Option[ByteStr]): ByteStr + /** @note Works only for key block or one of liquid blocks + */ + def lastStateHash(liquidBlockId: Option[ByteStr]): ByteStr } object Blockchain { @@ -174,9 +175,6 @@ object Blockchain { generationDeposit = blockchain.generationDeposit(address) ) - // TODO: lock? - // TODO: not efficient? See RocksDBWriter.balanceSnapshots - // TODO: optimize def generationDeposit(address: Address, at: Height = Height(blockchain.height)): Long = blockchain.generationPeriodOf(at).fold(0L) { period => val committed = blockchain.committedGenerators(period) val conflict = blockchain.conflictGenerators(period) diff --git a/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala b/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala index 469d5e4a42..c1b2245dca 100644 --- a/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala +++ b/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala @@ -230,7 +230,26 @@ class BlockchainUpdaterImpl( challengedHitSource: Option[ByteStr] = None, verify: Boolean = true, txSignParCheck: Boolean = true - ): Either[ValidationError, BlockApplyResult] = + ): Either[ValidationError, BlockApplyResult] = { + def mkDiff(refBlockchain: SnapshotBlockchain, parentBlock: Option[Block]): Either[ValidationError, BlockDiffer.Result] = + BlockDiffer.fromBlock( + refBlockchain, + parentBlock, + block, + snapshot, + MiningConstraints(rocksdb, rocksdb.height).total, + hitSource, + challengedHitSource, + rocksdb.loadCacheData, + verify, + txSignParCheck = txSignParCheck + ) + + def scheduleMining(refBlockchain: Blockchain, differResult: BlockDiffer.Result): Unit = + miner.scheduleMining( + SnapshotBlockchain(refBlockchain, differResult.snapshot, block, hitSource, differResult.carry, reward = None, stateHash = None).some + ) + writeLock { val height = rocksdb.height val notImplementedFeatures: Set[Short] = rocksdb.activatedFeaturesAt(height).diff(BlockchainFeatures.implemented) @@ -241,61 +260,30 @@ class BlockchainUpdaterImpl( (), GenericError(s"UNIMPLEMENTED ${displayFeatures(notImplementedFeatures)} ACTIVATED ON BLOCKCHAIN, UPDATE THE NODE IMMEDIATELY") ) - .flatMap[ValidationError, BlockApplyResult](_ => - (ngState match { + .flatMap[ValidationError, BlockApplyResult] { _ => + val appendResult = ngState match { case None => rocksdb.lastBlockId match { - case Some(uniqueId) if uniqueId != block.header.reference => + case Some(lastBlockId) if lastBlockId != block.header.reference => val logDetails = s"The referenced block(${block.header.reference})" + s" ${if (rocksdb.contains(block.header.reference)) "exists, it's not last persisted" else "doesn't exist"}" Left(BlockAppendError(s"References incorrect or non-existing block: " + logDetails, block)) - case lastBlockId => - val height = lastBlockId.fold(0)(rocksdb.unsafeHeightOf) - val miningConstraints = MiningConstraints(rocksdb, height) - val reward = computeNextReward - - val referencedBlockchain = SnapshotBlockchain(rocksdb, reward) - BlockDiffer - .fromBlock( - referencedBlockchain, - rocksdb.lastBlock, - block, - snapshot, - miningConstraints.total, - hitSource, - challengedHitSource, - rocksdb.loadCacheData, - verify, - txSignParCheck = txSignParCheck - ) - .map { r => - val updatedBlockchain = SnapshotBlockchain(rocksdb, r.snapshot, block, hitSource, r.carry, reward, Some(r.computedStateHash)) - miner.scheduleMining(Some(updatedBlockchain)) - blockchainUpdateTriggers.onProcessBlock(block, r.keyBlockSnapshot, reward, hitSource, referencedBlockchain) + case _ => + val reward = computeNextReward + val withReward = SnapshotBlockchain(rocksdb, reward) + mkDiff(withReward, rocksdb.lastBlock).map { r => + scheduleMining(withReward, r) - Option((r, Nil, reward, hitSource)) - } + blockchainUpdateTriggers.onProcessBlock(block, r.keyBlockSnapshot, reward, hitSource, withReward) + + (r, Nil, reward).some + } } case Some(ng) => if (ng.base.header.reference == block.header.reference) { if (block.header.timestamp < ng.base.header.timestamp) { - val height = rocksdb.unsafeHeightOf(ng.base.header.reference) - val miningConstraints = MiningConstraints(rocksdb, height) - - val referencedBlockchain = SnapshotBlockchain(rocksdb, ng.reward) - BlockDiffer - .fromBlock( - referencedBlockchain, - rocksdb.lastBlock, - block, - snapshot, - miningConstraints.total, - hitSource, - challengedHitSource, - rocksdb.loadCacheData, - verify, - txSignParCheck = txSignParCheck - ) + val withReward = SnapshotBlockchain(rocksdb, ng.reward) + mkDiff(withReward, rocksdb.lastBlock) .map { r => log.trace( s"Better liquid block(timestamp=${block.header.timestamp}) received and applied instead of existing(timestamp=${ng.base.header.timestamp})" @@ -305,13 +293,12 @@ class BlockchainUpdaterImpl( val allSnapshots = ng.baseBlockSnapshot +: mbSnapshots log.trace(s"Discarded microblocks = $mbs, snapshots = ${allSnapshots.map(_.hashString)}") - val updatedBlockchain = SnapshotBlockchain(referencedBlockchain, r.snapshot, block, hitSource, r.carry, None, None) - miner.scheduleMining(Some(updatedBlockchain)) + scheduleMining(withReward, r) blockchainUpdateTriggers.onRollback(this, ng.base.header.reference, rocksdb.height) - blockchainUpdateTriggers.onProcessBlock(block, r.keyBlockSnapshot, ng.reward, hitSource, referencedBlockchain) + blockchainUpdateTriggers.onProcessBlock(block, r.keyBlockSnapshot, ng.reward, hitSource, withReward) - Some((r, allSnapshots, ng.reward, hitSource)) + (r, allSnapshots, ng.reward).some } } else if (areVersionsOfSameBlock(block, ng.base)) { // silently ignore @@ -329,16 +316,12 @@ class BlockchainUpdaterImpl( case Some(liquid) => // Block on a new height if (!verify || liquid.block.signatureValid()) { - val referencedForgedBlockParentHeight = Height(rocksdb.heightOf(liquid.block.header.reference).getOrElse(0)) - - val constraint = MiningConstraints(rocksdb, referencedForgedBlockParentHeight.toInt).total - val prevReward = ng.reward val reward = computeNextReward val prevHitSource = ng.hitSource val liquidSnapshotWithCancelledLeases = ng.cancelExpiredLeases(liquid.data.snapshot) - val referencedBlockchain = SnapshotBlockchain( + val refBlockchain = SnapshotBlockchain( rocksdb, liquidSnapshotWithCancelledLeases, liquid.block, @@ -346,33 +329,10 @@ class BlockchainUpdaterImpl( liquid.data.carryFee, reward, Some(liquid.data.liquidStateHash) - // TODO: generatorBalances? With this we can't remove a hacky fallback calculation ) - for { - differResult <- BlockDiffer.fromBlock( - referencedBlockchain, - Some(liquid.block), - block, - snapshot, - constraint, - hitSource, - challengedHitSource, - rocksdb.loadCacheData, - verify, - txSignParCheck = txSignParCheck - ) - } yield { - val extendedBlockchain = SnapshotBlockchain( - referencedBlockchain, - differResult.snapshot, - block, - hitSource, - differResult.carry, - None, - Some(differResult.computedStateHash) - ) - miner.scheduleMining(Some(extendedBlockchain)) + mkDiff(refBlockchain, Some(liquid.block)).map { r => + scheduleMining(refBlockchain, r) log.trace( s"Persisting block ${liquid.block.id()}, discarded microblock refs: ${liquid.discarded.map(_._1.reference).mkString("[", ",", "]")}" @@ -384,7 +344,7 @@ class BlockchainUpdaterImpl( metrics.microBlockForkHeightStats.record(liquid.discarded.size) } - // Careful! This affects referencedBlockchain and extendedBlockchain, e.g. height + // Careful! This affects refBlockchain and extendedBlockchain, e.g. height rocksdb.append( liquidSnapshotWithCancelledLeases, liquid.data.carryFee, @@ -398,13 +358,13 @@ class BlockchainUpdaterImpl( ) BlockStats.appended(liquid.block, liquid.data.snapshot.scriptsComplexity) TxsInBlockchainStats.record(ng.transactions.size) - blockchainUpdateTriggers.onProcessBlock(block, differResult.keyBlockSnapshot, reward, hitSource, rocksdb) + blockchainUpdateTriggers.onProcessBlock(block, r.keyBlockSnapshot, reward, hitSource, rocksdb) val (discardedMbs, discardedSnapshots) = liquid.discarded.unzip if (discardedMbs.nonEmpty) { log.trace(s"Discarded microblocks: $discardedMbs") } - Some((differResult, discardedSnapshots, reward, hitSource)) + (r, discardedSnapshots, reward).some } } else { val errorText = s"Forged block has invalid signature. Base: ${ng.base}, requested reference: ${block.header.reference}" @@ -412,63 +372,58 @@ class BlockchainUpdaterImpl( Left(BlockAppendError(errorText, block)) } } - }).map { - _ map { - // TODO: case class instead of tuple - case ( - BlockDiffer.Result(newBlockSnapshot, carry, totalFee, updatedTotalConstraint, _, computedStateHash), - discDiffs, - reward, - hitSource - ) => - val newHeight = Height(rocksdb.height + 1) - val currentFinalizedHeight = rocksdb.finalizedHeightAt(Height(rocksdb.height)) - - restTotalConstraint = updatedTotalConstraint - if ( - (block.header.timestamp > time.getTimestamp() - wavesSettings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed.toMillis) - || (newHeight.toInt % 100 == 0) - ) { - currentFinalizedHeight.foreach { h => - log.debug(s"Finalized height at ${rocksdb.height}: $h") - } - log.info(s"New height: $newHeight") - } + } - val blockchain = SnapshotBlockchain(rocksdb, newBlockSnapshot, block, hitSource, carry, reward, Some(computedStateHash)) - ngState = Some( - new NgState( + appendResult.map { + case None => Ignored + case Some((differResult, discarded, reward)) => + val newHeight = Height(rocksdb.height + 1) + val currentFinalizedHeight = rocksdb.finalizedHeightAt(Height(rocksdb.height)) + + restTotalConstraint = differResult.constraint + if ( + (block.header.timestamp > time.getTimestamp() - wavesSettings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed.toMillis) + || (newHeight.toInt % 100 == 0) + ) { + currentFinalizedHeight.foreach { h => log.debug(s"Finalized height at ${rocksdb.height}: $h") } + log.info(s"New height: $newHeight") + } + + val blockchain = + SnapshotBlockchain(rocksdb, differResult.snapshot, block, hitSource, differResult.carry, reward, Some(differResult.computedStateHash)) + ngState = Some( + new NgState( + block, + differResult.snapshot, + differResult.carry, + differResult.totalFee, + differResult.computedStateHash, + featuresApprovedWithBlock(block), + reward, + hitSource, + cancelLeases(collectLeasesToCancel(newHeight), newHeight), + finalizationState = FinalizationState.init( + generatorSet, + conflictGenerators = + this.generationPeriodOf(newHeight).fold(ConflictGenerators.empty)(blockchain.conflictGenerators).upTo(newHeight), block, - newBlockSnapshot, - carry, - totalFee, - computedStateHash, - featuresApprovedWithBlock(block), - reward, - hitSource, - cancelLeases(collectLeasesToCancel(newHeight), newHeight), - finalizationState = FinalizationState.init( - generatorSet, - conflictGenerators = - this.generationPeriodOf(newHeight).fold(ConflictGenerators.empty)(blockchain.conflictGenerators).upTo(newHeight), - block, - parentHeight = Height(rocksdb.height), - finalizedHeight = Blockchain.finalizedHeightOrFallback( - at = newHeight, - latestFinalized = currentFinalizedHeight, - maxRollbackLength = maxSyncRollbackLength - ) + parentHeight = Height(rocksdb.height), + finalizedHeight = Blockchain.finalizedHeightOrFallback( + at = newHeight, + latestFinalized = currentFinalizedHeight, + maxRollbackLength = maxSyncRollbackLength ) ) ) + ) - publishLastBlockInfo() + publishLastBlockInfo() - Applied(discDiffs, this.score, generatorSet) - } getOrElse Ignored + Applied(discarded, this.score, generatorSet) } - ) + } } + } private def collectLeasesToCancel(newHeight: Height): Map[ByteStr, LeaseDetails] = if (rocksdb.isFeatureActivated(BlockchainFeatures.LeaseExpiration, newHeight.toInt)) { @@ -533,7 +488,7 @@ class BlockchainUpdaterImpl( ) ) } else None - DiscardedBlock(block, ng.hitSource, snapshot, generatorSet = Seq.empty) + DiscardedBlock(block, ng.hitSource, snapshot) }.toSeq blocks ++ liquidBlockData } @@ -856,12 +811,12 @@ class BlockchainUpdaterImpl( snapshotBlockchain.resolveERC20Address(address) } - override def lastStateHash(refId: Option[ByteStr]): ByteStr = readLock { - ngState - .map { ng => - refId.filter(ng.contains).fold(ng.bestLiquidComputedStateHash)(id => ng.snapshotFor(id)._4) - } - .getOrElse(rocksdb.lastStateHash(None)) + override def lastStateHash(liquidBlockId: Option[ByteStr]): ByteStr = readLock { + (ngState, liquidBlockId) match { + case (Some(ngState), Some(refId)) if ngState.contains(refId) => ngState.snapshotFor(refId).liquidStateHash + case (Some(ngState), None) => ngState.bestLiquidComputedStateHash + case _ => rocksdb.lastStateHash(liquidBlockId) + } } override def committedGenerators(at: GenerationPeriod): IndexedSeq[(Address, BlsPublicKey)] = readLock { diff --git a/node/src/main/scala/com/wavesplatform/state/EndorsementStorage.scala b/node/src/main/scala/com/wavesplatform/state/EndorsementStorage.scala index 96c85d6ed3..8c4c7f6961 100644 --- a/node/src/main/scala/com/wavesplatform/state/EndorsementStorage.scala +++ b/node/src/main/scala/com/wavesplatform/state/EndorsementStorage.scala @@ -12,7 +12,6 @@ import com.wavesplatform.state.EndorsementStorage.InMemory.FinalizationResult import scala.collection.{immutable, mutable} -// TODO: .switch: use in appender when changed height trait EndorsementStorage { /** @return true, if it can be shared with neighbors diff --git a/node/src/main/scala/com/wavesplatform/state/FinalizationState.scala b/node/src/main/scala/com/wavesplatform/state/FinalizationState.scala index c9c35bb1de..254e55411e 100644 --- a/node/src/main/scala/com/wavesplatform/state/FinalizationState.scala +++ b/node/src/main/scala/com/wavesplatform/state/FinalizationState.scala @@ -89,8 +89,6 @@ object FinalizationState extends ScorexLogging { ) } - // TODO: add already known as conflict, or better: generator balances without conflict - // TODO: easier to create lambda? private def isParentFinalized( after: BlockId, generatorSet: GeneratorSet, diff --git a/node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala b/node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala index 8d9925f729..4014ddf3d5 100644 --- a/node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala +++ b/node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala @@ -7,6 +7,10 @@ import com.wavesplatform.utils.Schedulers import monix.eval.Task import monix.execution.schedulers.SchedulerService +/** Executes signature verification in advance using parallel computation. + * The result is cached in a lazy value: if precomputation completes in time, + * the cached result is reused; otherwise, it is computed on demand. + */ object ParSignatureChecker { implicit val sigverify: SchedulerService = Schedulers.fixedPool(4, "sigverify") diff --git a/node/src/main/scala/com/wavesplatform/state/SnapshotBlockchain.scala b/node/src/main/scala/com/wavesplatform/state/SnapshotBlockchain.scala index 3fa9f32aed..e397ebd6e2 100644 --- a/node/src/main/scala/com/wavesplatform/state/SnapshotBlockchain.scala +++ b/node/src/main/scala/com/wavesplatform/state/SnapshotBlockchain.scala @@ -1,7 +1,7 @@ package com.wavesplatform.state import cats.syntax.option.* -import com.wavesplatform.account.{Address, Alias, PublicKey} +import com.wavesplatform.account.{Address, Alias} import com.wavesplatform.block.Block.BlockId import com.wavesplatform.block.{Block, SignedBlockHeader} import com.wavesplatform.common.state.ByteStr @@ -18,7 +18,7 @@ import com.wavesplatform.transaction.{Asset, CommitToGenerationTransaction, ERC2 case class SnapshotBlockchain( inner: Blockchain, maybeSnapshot: Option[StateSnapshot] = None, - blockMeta: Option[(SignedBlockHeader, ByteStr)] = None, + blockMeta: Option[(signedHeader: SignedBlockHeader, hitSource: ByteStr)] = None, carry: Long = 0, reward: Option[Long] = None, stateHash: Option[ByteStr] = None, @@ -55,7 +55,7 @@ case class SnapshotBlockchain( } override def effectiveBalanceBanHeights(address: Address): Seq[Int] = { - val maybeLastBlockBan = blockMeta.flatMap(_._1.header.challengedHeader).map(_.generator.toAddress) match { + val maybeLastBlockBan = blockMeta.flatMap(_.signedHeader.header.challengedHeader).map(_.generator.toAddress) match { case Some(generator) if address == generator => Seq(height) case _ => Seq.empty } @@ -201,7 +201,7 @@ case class SnapshotBlockchain( override def carryFee(refId: Option[ByteStr]): Long = carry - override def score: BigInt = blockMeta.fold(BigInt(0))(_._1.header.score()) + inner.score + override def score: BigInt = blockMeta.fold(BigInt(0))(_.signedHeader.header.score()) + inner.score override def blockHeader(height: Int): Option[SignedBlockHeader] = blockMeta match { @@ -209,7 +209,8 @@ case class SnapshotBlockchain( case _ => inner.blockHeader(height) } - override def heightOf(blockId: ByteStr): Option[Int] = blockMeta.filter(_._1.id() == blockId).map(_ => height) orElse inner.heightOf(blockId) + override def heightOf(blockId: ByteStr): Option[Int] = + blockMeta.filter(_.signedHeader.id() == blockId).map(_ => height) orElse inner.heightOf(blockId) /** Features related */ override def approvedFeatures: Map[Short, Height] = inner.approvedFeatures @@ -241,7 +242,7 @@ case class SnapshotBlockchain( override def hitSource(height: Int): Option[ByteStr] = blockMeta - .collect { case (_, hitSource) if this.height == height => hitSource } + .collect { case x if this.height == height => x.hitSource } .orElse(inner.hitSource(height)) override def resolveERC20Address(address: ERC20Address): Option[IssuedAsset] = @@ -249,8 +250,8 @@ case class SnapshotBlockchain( .resolveERC20Address(address) .orElse(snapshot.erc20Addresses.get(address)) - override def lastStateHash(refId: Option[ByteStr]): BlockId = - stateHash.orElse(blockMeta.flatMap(_._1.header.stateHash)).getOrElse(inner.lastStateHash(refId)) + override def lastStateHash(liquidBlockId: Option[ByteStr]): BlockId = + stateHash.orElse(blockMeta.flatMap(_.signedHeader.header.stateHash)).getOrElse(inner.lastStateHash(liquidBlockId)) override def committedGenerators(at: GenerationPeriod): IndexedSeq[(Address, BlsPublicKey)] = { val base = inner.committedGenerators(at) @@ -265,9 +266,9 @@ case class SnapshotBlockchain( else if (at > currPeriod) ConflictGenerators.empty else { val extraConflictIndexes = for { - (blockMeta, _) <- blockMeta.toSeq - v <- blockMeta.header.finalizationVoting.toSeq - c <- v.conflict + blockMeta <- blockMeta.toSeq + v <- blockMeta.signedHeader.header.finalizationVoting.toSeq + c <- v.conflict } yield c.endorserIndex base.appendAll(Height(height), extraConflictIndexes*) diff --git a/node/src/main/scala/com/wavesplatform/state/appender/BlockAppender.scala b/node/src/main/scala/com/wavesplatform/state/appender/BlockAppender.scala index 64bd5d9639..5efe6dd61d 100644 --- a/node/src/main/scala/com/wavesplatform/state/appender/BlockAppender.scala +++ b/node/src/main/scala/com/wavesplatform/state/appender/BlockAppender.scala @@ -39,25 +39,35 @@ object BlockAppender extends ScorexLogging { verify: Boolean = true, txSignParCheck: Boolean = true )(newBlock: Block, snapshot: Option[BlockSnapshotResponse]): Task[Either[ValidationError, BlockApplyResult]] = - Task { - if (blockchainUpdater.isLastBlockId(newBlock.id())) Right(Ignored) // Cheap to test - else if ( - blockchainUpdater.isLastBlockId(newBlock.header.reference) || - blockchainUpdater.lastBlockHeader.exists(_.header.reference == newBlock.header.reference) - ) { - if (newBlock.header.challengedHeader.isDefined) { - appendChallengeBlock(blockchainUpdater, utxStorage, pos, time, log, verify, txSignParCheck)(newBlock, snapshot) - } else { - appendKeyBlock(blockchainUpdater, utxStorage, pos, time, log, verify, txSignParCheck)(newBlock, snapshot).tap { - case Right(Applied(generatorSet = gs)) => blockEndorser.vote(gs) - case _ => - } + Task(applySync(blockchainUpdater, time, utxStorage, pos, blockEndorser, verify, txSignParCheck)(newBlock, snapshot)) + .executeOn(scheduler) + + private[appender] def applySync( + blockchainUpdater: BlockchainUpdater & Blockchain, + time: Time, + utxStorage: UtxPool, + pos: PoSSelector, + blockEndorser: BlockEndorser, + verify: Boolean = true, + txSignParCheck: Boolean = true + )(newBlock: Block, snapshot: Option[BlockSnapshotResponse]): Either[ValidationError, BlockApplyResult] = + if (blockchainUpdater.isLastBlockId(newBlock.id())) Right(Ignored) // Cheap to test + else if ( + blockchainUpdater.isLastBlockId(newBlock.header.reference) || + blockchainUpdater.lastBlockHeader.exists(_.header.reference == newBlock.header.reference) + ) { + if (newBlock.header.challengedHeader.isDefined) { + appendChallengeBlock(blockchainUpdater, utxStorage, pos, time, log, verify, txSignParCheck)(newBlock, snapshot) + } else { + appendKeyBlock(blockchainUpdater, utxStorage, pos, time, log, verify, txSignParCheck)(newBlock, snapshot).tap { + case Right(Applied(generatorSet = gs)) => blockEndorser.vote(gs) + case _ => } - } else if (blockchainUpdater.contains(newBlock.id())) - Right(Ignored) - else - Left(BlockAppendError("Block is not a child of the last block or its parent", newBlock)) - }.executeOn(scheduler) + } + } else if (blockchainUpdater.contains(newBlock.id())) + Right(Ignored) + else + Left(BlockAppendError("Block is not a child of the last block or its parent", newBlock)) def apply( blockchainUpdater: BlockchainUpdater & Blockchain, diff --git a/node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala b/node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala index 52f59a9237..9276af03b1 100644 --- a/node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala +++ b/node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala @@ -1,11 +1,17 @@ package com.wavesplatform.state.appender +import cats.data.OptionT +import cats.syntax.all.* +import com.wavesplatform.block.Block +import com.wavesplatform.block.Block.BlockId +import com.wavesplatform.common.state.ByteStr import com.wavesplatform.common.utils.EitherExt2.* import com.wavesplatform.consensus.PoSSelector import com.wavesplatform.features.BlockchainFeatures import com.wavesplatform.lang.ValidationError import com.wavesplatform.metrics.{BlockStats, Metrics} -import com.wavesplatform.network.{ExtensionBlocks, InvalidBlockStorage, PeerDatabase, formatBlocks, id} +import com.wavesplatform.network.{BlockSnapshotResponse, ExtensionBlocks, InvalidBlockStorage, PeerDatabase, formatBlocks, id} +import com.wavesplatform.protobuf.PBSnapshots import com.wavesplatform.state.* import com.wavesplatform.state.BlockchainUpdaterImpl.BlockApplyResult.Applied import com.wavesplatform.transaction.* @@ -17,9 +23,11 @@ import monix.eval.Task import monix.execution.Scheduler import org.influxdb.dto.Point -import scala.util.{Left, Right} +import scala.annotation.tailrec +import scala.util.chaining.* object ExtensionAppender extends ScorexLogging { + private case class AppendData(newBlocks: Seq[Block], lastCommonBlockId: BlockId, lastCommonHeight: Int) def apply( blockchainUpdater: BlockchainUpdater & Blockchain, @@ -29,111 +37,133 @@ object ExtensionAppender extends ScorexLogging { invalidBlocks: InvalidBlockStorage, peerDatabase: PeerDatabase, scheduler: Scheduler - )(ch: Channel, extensionBlocks: ExtensionBlocks): Task[Either[ValidationError, Option[BigInt]]] = { - def appendExtension(extension: ExtensionBlocks): Either[ValidationError, Option[BigInt]] = + )(ch: Channel, extension: ExtensionBlocks): Task[Either[ValidationError, Option[BigInt]]] = { + type Result[A] = Either[ValidationError, A] + def appendExtension(): OptionT[Result, BigInt] = if (extension.remoteScore <= blockchainUpdater.score) { log.trace(s"Ignoring extension $extension because declared remote was not greater than local score ${blockchainUpdater.score}") - Right(None) - } else { - extension.blocks - .collectFirst { case b if !b.signatureValid() => GenericError(s"Block $b has invalid signature") } - .toLeft(extension) - .flatMap { extensionWithValidSignatures => - val newBlocks = extensionWithValidSignatures.blocks.dropWhile(blockchainUpdater.contains) - - newBlocks.headOption.map(_.header.reference) match { - case Some(lastCommonBlockId) => - val initialHeight = blockchainUpdater.height - - val droppedBlocksEi = for { - commonBlockHeight <- blockchainUpdater.heightOf(lastCommonBlockId).toRight(GenericError("Fork contains no common parent")) - droppedBlocks <- { - if (commonBlockHeight < initialHeight) - blockchainUpdater.removeAfter(lastCommonBlockId) - else Right(Seq.empty) - } - } yield (commonBlockHeight, droppedBlocks) - - droppedBlocksEi.flatMap { case (commonBlockHeight, droppedBlocks) => - newBlocks.zipWithIndex.foreach { case (block, idx) => - val rideV6Activated = blockchainUpdater.isFeatureActivated(BlockchainFeatures.RideV6, commonBlockHeight + idx + 1) - ParSignatureChecker.checkTxSignatures(block.transactionData, rideV6Activated) - } - - val forkApplicationResultEi = { - newBlocks.view - .map { b => - b -> appendExtensionBlock(blockchainUpdater, pos, time, verify = true, txSignParCheck = false)( - b, - extension.snapshots.get(b.id()) - ) - .map { - case (_: Applied, height) => BlockStats.applied(b, BlockStats.Source.Ext, height) - case _ => - } - } - .zipWithIndex - .collectFirst { case ((b, Left(e)), i) => (i, b, e) } - .fold[Either[ValidationError, Unit]](Right(())) { case (i, declinedBlock, e) => - e match { - case _: TxValidationError.BlockFromFuture => - case _ => invalidBlocks.add(declinedBlock.id(), e) - } - - newBlocks.view - .dropWhile(_ != declinedBlock) - .foreach(BlockStats.declined(_, BlockStats.Source.Ext)) - - if (i == 0) log.warn(s"Can't process fork starting with $lastCommonBlockId, error appending block $declinedBlock: $e") - else - log.warn( - s"Processed only ${i + 1} of ${newBlocks.size} blocks from extension, error appending next block $declinedBlock: $e" - ) - - Left(e) - } - } - - forkApplicationResultEi match { - case Left(e) => - blockchainUpdater.removeAfter(lastCommonBlockId).explicitGet() - droppedBlocks.foreach { x => - blockchainUpdater.processBlock(x.block, x.hitSource, x.snapshot, x.generatorSet).explicitGet() - } - Left(e) - - case Right(_) => - val depth = initialHeight - commonBlockHeight - if (depth > 0) { - Metrics.write( - Point - .measurement("rollback") - .addField("depth", initialHeight - commonBlockHeight) - .addField("txs", droppedBlocks.size) - ) - } - - val newTransactions = newBlocks.view.flatMap(_.transactionData).toSet - utxStorage.removeAll(newTransactions) - utxStorage.addAndScheduleCleanup(droppedBlocks.flatMap(_._1.transactionData).filterNot(newTransactions)) - Right(Some(blockchainUpdater.score)) - } - } - - case None => - log.debug("No new blocks found in extension") - Right(None) - } + OptionT.none[Result, BigInt] + } else + for { + _ <- OptionT.liftF(validateSignatures()) + appendData <- OptionT.fromOption[Result](dropCommonPrefix()) + _ <- OptionT.liftF(processNewBlocks(appendData)) + } yield blockchainUpdater.score + + def validateSignatures(): Either[ValidationError, ExtensionBlocks] = + extension.blocks + .collectFirst { case b if !b.signatureValid() => GenericError(s"Block $b has invalid signature") } + .toLeft(extension) + + def dropCommonPrefix(): Option[AppendData] = { + @tailrec def loop(last: AppendData): Option[AppendData] = last.newBlocks match { + case Nil => + log.debug("No new blocks found in extension") + None + + case b +: rest => + blockchainUpdater.heightOf(b.id()) match { + case None => last.some + case Some(h) => loop(AppendData(rest, b.id(), h)) } } - log.debug(s"${id(ch)} Attempting to append extension ${formatBlocks(extensionBlocks.blocks)}") - Task(appendExtension(extensionBlocks)).executeOn(scheduler).map { + loop( + AppendData( + extension.blocks, + blockchainUpdater.lastBlockId.getOrElse(throw new RuntimeException("Empty blockchain")), + blockchainUpdater.height + ) + ) + } + + def processNewBlocks(appendData: AppendData): Either[ValidationError, Unit] = { + val originalForkHeight = blockchainUpdater.height + for { + discardedBlocks <- + if (appendData.lastCommonHeight < originalForkHeight) blockchainUpdater.removeAfter(appendData.lastCommonBlockId) + else Right(Seq.empty) + _ = precheckSignatures(appendData) + _ <- applyFork(appendData).tap { + case Left(_) => restoreDiscardedBlocks(appendData.lastCommonBlockId, discardedBlocks) + case Right(_) => + val depth = originalForkHeight - appendData.lastCommonHeight + if (depth > 0) + Metrics.write( + Point + .measurement("rollback") + .addField("depth", depth) + .addField("txs", discardedBlocks.size) + ) + + val newTxs = appendData.newBlocks.flatMap(_.transactionData) + val newTxIds = newTxs.view.map(_.id()).toSet + utxStorage.removeIds(newTxIds) + + val discardedTxs = discardedBlocks.flatMap(_._1.transactionData) + utxStorage.addAndScheduleCleanup(discardedTxs.filterNot(tx => newTxIds.contains(tx.id()))) // In a case of re-appending issues + } + } yield () + } + + def precheckSignatures(appendData: AppendData): Unit = + appendData.newBlocks.zipWithIndex.foreach { case (block, idx) => + val rideV6Activated = blockchainUpdater.isFeatureActivated(BlockchainFeatures.RideV6, appendData.lastCommonHeight + idx + 1) + ParSignatureChecker.checkTxSignatures(block.transactionData, rideV6Activated) + } + + def applyFork(appendData: AppendData): Either[ValidationError, Unit] = { + val appendBlock = appendExtensionBlock(blockchainUpdater, pos, time, verify = true, txSignParCheck = false)(_, _) + appendData.newBlocks.view + .map { b => + val s = extension.snapshots.get(b.id()) + val r = appendBlock(b, s).map { + case (_: Applied, height) => BlockStats.applied(b, BlockStats.Source.Ext, height) + case _ => + } + b -> r + } + .zipWithIndex + .collectFirst { case ((b, Left(e)), i) => (i, b, e) } + .fold(Either.unit[ValidationError]) { case (i, declinedBlock, e) => + e match { + case _: TxValidationError.BlockFromFuture => + case _ => invalidBlocks.add(declinedBlock.id(), e) + } + + appendData.newBlocks.view + .dropWhile(_ != declinedBlock) + .foreach(BlockStats.declined(_, BlockStats.Source.Ext)) + + log.warn( + if (i == 0) s"Can't process fork starting with ${appendData.lastCommonBlockId}, error appending block $declinedBlock: $e" + else s"Processed only ${i + 1} of ${appendData.newBlocks.size} blocks from extension, error appending next block $declinedBlock: $e" + ) + + Left(e) + } + } + + def restoreDiscardedBlocks(lastCommonBlockId: ByteStr, blocks: DiscardedBlocks): Unit = { + blockchainUpdater.removeAfter(lastCommonBlockId).explicitGet() + val reAppend = BlockAppender.applySync(blockchainUpdater, time, utxStorage, pos, BlockEndorser.Disabled, txSignParCheck = false)(_, _) + blocks.foreach { x => + val blockSnapshotResponse = x.snapshot.map { x => + val txStateSnapshot = x.snapshots.map(PBSnapshots.toProtobuf) + BlockSnapshotResponse(x.blockId, txStateSnapshot) + } + reAppend(x.block, blockSnapshotResponse).explicitGet() + } + } + + val formattedBlocksStr = formatBlocks(extension.blocks) + log.debug(s"${id(ch)} Attempting to append extension $formattedBlocksStr") + Task(appendExtension().value).executeOn(scheduler).map { case Right(maybeNewScore) => - log.debug(s"${id(ch)} Successfully appended extension ${formatBlocks(extensionBlocks.blocks)}") + log.debug(s"${id(ch)} Successfully appended extension $formattedBlocksStr") Right(maybeNewScore) case Left(ve) => - val errorMessage = s"${id(ch)} Error appending extension ${formatBlocks(extensionBlocks.blocks)}: $ve" + val errorMessage = s"${id(ch)} Error appending extension $formattedBlocksStr: $ve" log.warn(errorMessage) peerDatabase.blacklistAndClose(ch, errorMessage) Left(ve) diff --git a/node/src/main/scala/com/wavesplatform/state/diffs/BlockDiffer.scala b/node/src/main/scala/com/wavesplatform/state/diffs/BlockDiffer.scala index a86f5bc0c9..bd6dae5d72 100644 --- a/node/src/main/scala/com/wavesplatform/state/diffs/BlockDiffer.scala +++ b/node/src/main/scala/com/wavesplatform/state/diffs/BlockDiffer.scala @@ -496,15 +496,16 @@ object BlockDiffer { private def computeTxFeeInfo(blockchain: Blockchain, tx: Transaction, hasNg: Boolean): TxFeeInfo = { val hasSponsorship = Height(blockchain.height) >= Sponsorship.sponsoredFeesSwitchHeight(blockchain) - val (feeAsset, feeAmount) = maybeApplySponsorship(blockchain, hasSponsorship, tx.assetFee) + val (feeAsset, feeAmount) = maybeApplySponsorship(blockchain, hasSponsorship, tx.assetFee) // In WAVES if hasSponsorship val currentBlockFee = CurrentBlockFeePart(feeAmount) - // carry is 60% of waves fees the next miner will get. obviously carry fee only makes sense when both - // NG and sponsorship is active. also if sponsorship is active, feeAsset can only be Waves - val carry = if (hasNg && hasSponsorship) feeAmount - currentBlockFee else 0 + // Carry is 60% of waves fees the next miner will get. + // Carry fee (in WAVES) only makes sense when both NG and sponsorship (fee paid in WAVES instead of asset) is active. + // Also, if sponsorship is active, feeAsset can only be Waves + val carryFee = if (hasNg && hasSponsorship) feeAmount - currentBlockFee else 0 val wavesFee = if (feeAsset == Waves) feeAmount else 0L - TxFeeInfo(feeAsset, feeAmount, carry, wavesFee) + TxFeeInfo(feeAsset, feeAmount, carryFee, wavesFee) } private def leasePatchesSnapshot(blockchain: Blockchain): StateSnapshot = diff --git a/node/src/main/scala/com/wavesplatform/state/diffs/FeeValidation.scala b/node/src/main/scala/com/wavesplatform/state/diffs/FeeValidation.scala index 3dc43f7612..a145414e29 100644 --- a/node/src/main/scala/com/wavesplatform/state/diffs/FeeValidation.scala +++ b/node/src/main/scala/com/wavesplatform/state/diffs/FeeValidation.scala @@ -44,7 +44,7 @@ object FeeValidation { TransactionType.UpdateAssetInfo -> 1, TransactionType.Ethereum -> 1, TransactionType.InvokeExpression -> 10, - TransactionType.CommitToGeneration -> 100 // TODO: decide + TransactionType.CommitToGeneration -> 100 ) def apply(blockchain: Blockchain, tx: Transaction): Either[ValidationError, Unit] = { diff --git a/node/src/main/scala/com/wavesplatform/transaction/package.scala b/node/src/main/scala/com/wavesplatform/transaction/package.scala index 3e1a3ddf99..ec70ccfccf 100644 --- a/node/src/main/scala/com/wavesplatform/transaction/package.scala +++ b/node/src/main/scala/com/wavesplatform/transaction/package.scala @@ -5,7 +5,7 @@ import com.wavesplatform.account.PrivateKey import com.wavesplatform.block.{Block, BlockSnapshot, MicroBlock} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.lang.ValidationError -import com.wavesplatform.state.{GeneratorSet, StateSnapshot} +import com.wavesplatform.state.StateSnapshot import com.wavesplatform.transaction.Asset.IssuedAsset import com.wavesplatform.transaction.assets.IssueTransaction import com.wavesplatform.transaction.assets.exchange.Order @@ -19,7 +19,7 @@ package object transaction { val AssetIdLength: Int = com.wavesplatform.crypto.DigestLength val AssetIdStringLength: Int = base58Length(AssetIdLength) - case class DiscardedBlock(block: Block, hitSource: ByteStr, snapshot: Option[BlockSnapshot], generatorSet: GeneratorSet) + case class DiscardedBlock(block: Block, hitSource: ByteStr, snapshot: Option[BlockSnapshot]) type DiscardedBlocks = Seq[DiscardedBlock] type DiscardedMicroBlocks = Seq[(MicroBlock, StateSnapshot)] type AuthorizedTransaction = Authorized & Transaction diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala index 863b524c39..573fd584ed 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala @@ -16,7 +16,7 @@ trait UtxForAppender { trait UtxPool extends UtxForAppender with AutoCloseable { def putIfNew(tx: Transaction, forceValidate: Boolean = false): TracedResult[ValidationError, Boolean] - def removeAll(txs: Iterable[Transaction]): Unit + def removeIds(txIds: Iterable[ByteStr]): Unit def all: Seq[Transaction] def size: Int def transactionById(transactionId: ByteStr): Option[Transaction] @@ -40,4 +40,11 @@ object UtxPool { case class Estimate(time: FiniteDuration) extends PackStrategy case object Unlimited extends PackStrategy } + + extension (self: UtxPool) { + def removeAll(txs: Iterable[Transaction]): Unit = if (txs.nonEmpty) { + val ids = txs.map(_.id()).toSet + self.removeIds(ids) + } + } } diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala index 1d3e1dec2c..3ea316e987 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala @@ -16,8 +16,7 @@ import com.wavesplatform.state.TxStateSnapshotHashBuilder.TxStatusInfo import com.wavesplatform.state.diffs.BlockDiffer.CurrentBlockFeePart import com.wavesplatform.state.diffs.TransactionDiffer.TransactionValidationError import com.wavesplatform.state.diffs.{BlockDiffer, TransactionDiffer} -import com.wavesplatform.state.SnapshotBlockchain -import com.wavesplatform.state.{Blockchain, Portfolio, StateSnapshot, TxStateSnapshotHashBuilder} +import com.wavesplatform.state.{Blockchain, Portfolio, SnapshotBlockchain, StateSnapshot, TxStateSnapshotHashBuilder} import com.wavesplatform.transaction.* import com.wavesplatform.transaction.TxValidationError.{AlreadyInTheState, GenericError, SenderIsBlacklisted, WithLog} import com.wavesplatform.transaction.assets.exchange.ExchangeTransaction @@ -166,12 +165,6 @@ case class UtxPoolImpl( tracedIsNew } - override def removeAll(txs: Iterable[Transaction]): Unit = { - if (txs.isEmpty) return - val ids = txs.map(_.id()).toSet - removeIds(ids) - } - def setPrioritySnapshots(discSnapshots: Seq[StateSnapshot]): Unit = priorityPool.setPriorityDiffs(discSnapshots).foreach(addTransaction(_, verify = false)) @@ -185,8 +178,8 @@ case class UtxPoolImpl( } } - private def removeIds(removed: Set[ByteStr]): Unit = - removed.flatMap(id => removeFromOrdPool(id)).foreach(TxStateActions.removeMined(_)) + def removeIds(txIds: Iterable[ByteStr]): Unit = + txIds.flatMap(id => removeFromOrdPool(id)).foreach(TxStateActions.removeMined(_)) private[utx] def addTransaction( tx: Transaction, diff --git a/node/testkit/src/main/scala/com/wavesplatform/history/Domain.scala b/node/testkit/src/main/scala/com/wavesplatform/history/Domain.scala index 8869cd23c7..2fc4ab1c48 100644 --- a/node/testkit/src/main/scala/com/wavesplatform/history/Domain.scala +++ b/node/testkit/src/main/scala/com/wavesplatform/history/Domain.scala @@ -432,13 +432,14 @@ case class Domain( for { resultTimestamp <- - if (blockchain.height > 0) { + if (blockchain.height <= 0) Right(testTime.getTimestamp() - (1 hour).toMillis) + else timestamp .map(Right(_)) .getOrElse( posSelector .getValidBlockDelay( - blockchain.height, + parentHeight, generator, parent.baseTarget, // HACK: 1e11 some generators in tests have less than minimum @@ -446,8 +447,6 @@ case class Domain( ) .map(_ + parent.timestamp) ) - } else - Right(testTime.getTimestamp() - (1 hour).toMillis) consensus <- if (blockchain.height > 0) posSelector @@ -462,9 +461,8 @@ case class Domain( ) else Right(NxtLikeConsensusBlockData(60, generationSignature)) resultBt = - if (blockchain.isFeatureActivated(BlockchainFeatures.FairPoS, parentHeight)) { - consensus.baseTarget - } else if (parentHeight % 2 != 0) parent.baseTarget + if (blockchain.isFeatureActivated(BlockchainFeatures.FairPoS, parentHeight)) consensus.baseTarget + else if (parentHeight % 2 != 0) parent.baseTarget else consensus.baseTarget.max(PoSCalculator.MinBaseTarget) blockWithoutStateHash <- Block .buildAndSign( diff --git a/node/testkit/src/main/scala/com/wavesplatform/utils/EmptyBlockchain.scala b/node/testkit/src/main/scala/com/wavesplatform/utils/EmptyBlockchain.scala index aa8cd0e249..bfd93aa1df 100644 --- a/node/testkit/src/main/scala/com/wavesplatform/utils/EmptyBlockchain.scala +++ b/node/testkit/src/main/scala/com/wavesplatform/utils/EmptyBlockchain.scala @@ -95,7 +95,7 @@ trait EmptyBlockchain extends Blockchain { override def resolveERC20Address(address: ERC20Address): Option[IssuedAsset] = None - override def lastStateHash(refId: Option[ByteStr]): ByteStr = TxStateSnapshotHashBuilder.InitStateHash + override def lastStateHash(liquidBlockId: Option[ByteStr]): ByteStr = TxStateSnapshotHashBuilder.InitStateHash override def committedGenerators(at: GenerationPeriod): IndexedSeq[(Address, BlsPublicKey)] = IndexedSeq.empty diff --git a/node/tests/src/test/scala/com/wavesplatform/finalization/ChallengingAfterFinalizationSuite.scala b/node/tests/src/test/scala/com/wavesplatform/finalization/ChallengingAfterFinalizationSuite.scala index af97dda4d7..a2d914dedc 100644 --- a/node/tests/src/test/scala/com/wavesplatform/finalization/ChallengingAfterFinalizationSuite.scala +++ b/node/tests/src/test/scala/com/wavesplatform/finalization/ChallengingAfterFinalizationSuite.scala @@ -7,12 +7,11 @@ import com.wavesplatform.features.BlockchainFeatures import com.wavesplatform.history.Domain import com.wavesplatform.state.* import com.wavesplatform.test.DomainPresets.WavesSettingsOps -import com.wavesplatform.test.TestSchedulerOps import com.wavesplatform.transaction.TxHelpers import com.wavesplatform.wallet.Wallet import org.scalatest.time.SpanSugar.convertLongToGrainOfTime -class ChallengingAfterFinalizationSuite extends BaseFinalizationSpec, TestSchedulerOps { +class ChallengingAfterFinalizationSuite extends BaseFinalizationSpec { private val thisNodeAcc = Wallet.generateNewAccount(Domain.DefaultWalletSeed, nonce = 0) private val committedGenerator = TxHelpers.defaultSigner diff --git a/node/tests/src/test/scala/com/wavesplatform/finalization/ExtensionAppenderAfterFinalizationSpec.scala b/node/tests/src/test/scala/com/wavesplatform/finalization/ExtensionAppenderAfterFinalizationSpec.scala new file mode 100644 index 0000000000..6f7e91cb69 --- /dev/null +++ b/node/tests/src/test/scala/com/wavesplatform/finalization/ExtensionAppenderAfterFinalizationSpec.scala @@ -0,0 +1,118 @@ +package com.wavesplatform.finalization + +import cats.syntax.option.* +import com.wavesplatform.api.common.CommonGeneratorsApi.GeneratorEntry +import com.wavesplatform.block.{Block, FinalizationVoting} +import com.wavesplatform.db.WithState.AddrWithBalance +import com.wavesplatform.features.BlockchainFeatures +import com.wavesplatform.network.{ExtensionBlocks, InvalidBlockStorage, PeerDatabase} +import com.wavesplatform.state.* +import com.wavesplatform.state.appender.ExtensionAppender +import com.wavesplatform.test.DomainPresets.WavesSettingsOps +import com.wavesplatform.test.produce +import com.wavesplatform.transaction.{Transaction, TxHelpers} +import monix.execution.Scheduler.Implicits.global +import org.scalactic.Prettifier + +class ExtensionAppenderAfterFinalizationSpec extends BaseFinalizationSpec { + private val defaultSettings = DomainPresets.DeterministicFinality + .addFeatures(BlockchainFeatures.SmallerMinimalGeneratingBalance) + .setFeaturesHeight(BlockchainFeatures.DeterministicFinality -> 3) + .configure( + _.copy( + generationPeriodLength = 2, + lightNodeBlockFieldsAbsenceInterval = 0 + ) + ) + + private val committedGenerator1 = TxHelpers.signer(0) + + private val committedGenerator2 = TxHelpers.signer(1) + private val committedGenerator2Idx = GeneratorIndex(1) + + private val committedGenerator3 = TxHelpers.signer(2) + private val committedGenerator3Idx = GeneratorIndex(2) + + private val allGenerators = Seq(committedGenerator1, committedGenerator2, committedGenerator3) + + "Should re-append blocks of original branch and preserve generators info if failed to append of better fork" in withDomain( + defaultSettings, + AddrWithBalance.enoughBalances(allGenerators*) + ) { d => + def mkCommitments(period: GenerationPeriod = d.blockchain.currentGenerationPeriod.value.next): Seq[Transaction] = + allGenerators.map(x => TxHelpers.commitToGeneration(period.start, x)) + + def appendBlock(txs: Seq[Transaction] = Nil): Block = { + val b = d.createBlock(txs, generator = committedGenerator1, strictTime = true) + d.appender.appendBlock(b) + b + } + + def appendMicroBlock(fv: FinalizationVoting): Unit = + d.appendMicroBlockE( + d.createMicroBlock(signer = committedGenerator1.some, finalizationVoting = fv.some)(TxHelpers.transfer(committedGenerator3)) + ) + + val genesisBlock = d.lastBlock + + log.debug("Append block 2 - the first common block") + val altChainBlock1 = d.createBlock(generator = committedGenerator2, strictTime = true) + appendBlock() + + log.debug("Append block 3 (activation height)") + val altChainBlock2 = d.createBlock(ref = altChainBlock1.id().some, generator = committedGenerator2, strictTime = true) + appendBlock() + + log.debug("Append block 4") + appendBlock() + + log.debug("Append block 5 with commitments") + appendBlock(mkCommitments()) + + log.debug("Period 1 with generators") + log.debug("Append block 6") + val endorsedBlockOfPeriod1 = appendBlock() + + log.debug("Append block 7 with commitments and conflicting endorsement") + appendBlock(mkCommitments()) + appendMicroBlock(mkFinalizationVoting().withConflict(committedGenerator3, committedGenerator3Idx, endorsedBlockOfPeriod1.id())) + + log.debug("Period 2 without generators") + log.debug("Append block 8") + appendBlock() + log.debug("Append block 9 with commitments") + appendBlock(mkCommitments()) + + log.debug("Period 3 with generators") + log.debug("Append block 10 with conflicting endorsement") + val endorsedBlockOfPeriod2 = appendBlock(mkCommitments(d.blockchain.generationPeriodOf(Height(13)).value)) + appendMicroBlock(mkFinalizationVoting().withConflict(committedGenerator2, committedGenerator2Idx, endorsedBlockOfPeriod2.id())) + val lastBlockId = d.lastBlockId + + def getGenerators = (1 to d.blockchain.height).map(i => d.generatorsApi.generators(Height(i))) + val mainChainBlockGenerators = getGenerators + + log.debug("Try to append an extension with a wrong block") + val extensionAppender = + ExtensionAppender(d.blockchain, d.utxPool, d.posSelector, d.testTime, InvalidBlockStorage.NoOp, PeerDatabase.NoOp, global)(null, _) + + val altChain = Seq(genesisBlock, altChainBlock1, altChainBlock2) + extensionAppender(ExtensionBlocks(d.blockchain.score + 1, altChain, Map.empty)).runSyncUnsafe() should produce("is invalid") + + log.debug("Checks") + withClue("Restored: ") { + d.lastBlockId shouldBe lastBlockId + } + + val mainChainBlockGeneratorsAfterRestore = getGenerators + + { + given Prettifier = { + case o: IndexedSeq[?] => o.mkString("\n") + case o => o.toString + } + + mainChainBlockGeneratorsAfterRestore shouldBe mainChainBlockGenerators + } + } +} diff --git a/node/tests/src/test/scala/com/wavesplatform/finalization/FinalizationActivationSuite.scala b/node/tests/src/test/scala/com/wavesplatform/finalization/FinalizationActivationSuite.scala new file mode 100644 index 0000000000..96f59f053e --- /dev/null +++ b/node/tests/src/test/scala/com/wavesplatform/finalization/FinalizationActivationSuite.scala @@ -0,0 +1,43 @@ +package com.wavesplatform.finalization + +import com.wavesplatform.db.WithState.AddrWithBalance +import com.wavesplatform.features.BlockchainFeatures +import com.wavesplatform.state.{GeneratorIndex, Height} +import com.wavesplatform.test.DomainPresets.WavesSettingsOps +import com.wavesplatform.transaction.TxHelpers + +class FinalizationActivationSuite extends BaseFinalizationSpec { + private val node0Acc = TxHelpers.signer(0) + private val node1Acc = TxHelpers.signer(1) + + private val defaultSettings = DomainPresets.DeterministicFinality + .addFeatures(BlockchainFeatures.SmallerMinimalGeneratingBalance) + .setFeaturesHeight(BlockchainFeatures.DeterministicFinality -> 5) + .configure(_.copy(generationPeriodLength = 3)) + + "activation from 5" in withDomain(defaultSettings, AddrWithBalance.enoughBalances(node0Acc, node1Acc)) { d => + val genesisBlockId = d.blockchain.lastBlockId.value + (2 to 4).foreach(_ => d.appendBlock()) + + log.debug("Append block 5 with commitments") + d.appendBlock(Seq(node0Acc, node1Acc).map(x => TxHelpers.commitToGeneration(Height(9), x))*) + (6 to 8).foreach(_ => d.appendBlock()) + + log.debug(s"Append block 9 with votes") + d.appender.appendBlock( + d.createBlock( + generator = node0Acc, + strictTime = true, + finalizationVoting = Some( + mkFinalizationVoting(valid = Seq(GeneratorIndex(1))) + .signed(endorsedId = d.lastBlockId, finalizedId = genesisBlockId, validEndorsers = node1Acc) + ) + ) + ) + d.allFinalizedHeightIs(1) + + log.debug("Append block 10") + d.appender.appendBlock(d.createBlock(generator = node1Acc, strictTime = true)) + d.allFinalizedHeightIs(8) + } +} diff --git a/node/tests/src/test/scala/com/wavesplatform/finalization/FinalizationSuite.scala b/node/tests/src/test/scala/com/wavesplatform/finalization/FinalizationSuite.scala index 2fe5959aa1..63ba7a47e7 100644 --- a/node/tests/src/test/scala/com/wavesplatform/finalization/FinalizationSuite.scala +++ b/node/tests/src/test/scala/com/wavesplatform/finalization/FinalizationSuite.scala @@ -17,7 +17,6 @@ class FinalizationSuite extends BaseFinalizationSpec { private val baseSettings = DomainPresets.DeterministicFinality.addFeatures(BlockchainFeatures.SmallerMinimalGeneratingBalance) private val defaultSettings = baseSettings - .copy(minerSettings = baseSettings.minerSettings.copy(quorum = 0)) .configure(_.copy(generationPeriodLength = 3, generationBalanceDepthFrom50To1000AfterHeight = 1000)) "finalized if got next block referenced votes in" - { diff --git a/node/tests/src/test/scala/com/wavesplatform/finalization/conflict/ConflictEndorserBlocksNgSuite.scala b/node/tests/src/test/scala/com/wavesplatform/finalization/conflict/ConflictEndorserBlocksNgSuite.scala index ea491d8883..bab2dd8545 100644 --- a/node/tests/src/test/scala/com/wavesplatform/finalization/conflict/ConflictEndorserBlocksNgSuite.scala +++ b/node/tests/src/test/scala/com/wavesplatform/finalization/conflict/ConflictEndorserBlocksNgSuite.scala @@ -14,7 +14,7 @@ import com.wavesplatform.transaction.TxHelpers import org.scalactic.source.Position import org.scalatest.Assertion -/** Blocks: +/** Adding blocks: * 1. Genesis * 2. With commitments from two generators * 3. First block at period #1 diff --git a/node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala b/node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala index 2db5cda1e5..190213bee1 100644 --- a/node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala +++ b/node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala @@ -123,7 +123,6 @@ class MicroBlockMinerSpec extends FlatSpec with WithDomain { ) val utxPool = new UtxPool { - override def packUnconfirmed( rest: MultiDimensionalMiningConstraint, prevStateHash: Option[ByteStr], @@ -140,7 +139,7 @@ class MicroBlockMinerSpec extends FlatSpec with WithDomain { } override def putIfNew(tx: Transaction, forceValidate: Boolean) = inner.putIfNew(tx, forceValidate) - override def removeAll(txs: Iterable[Transaction]): Unit = inner.removeAll(txs) + override def removeIds(txIds: Iterable[ByteStr]): Unit = inner.removeIds(txIds) override def all = inner.all override def size = inner.size override def transactionById(transactionId: ByteStr) = inner.transactionById(transactionId) diff --git a/node/tests/src/test/scala/com/wavesplatform/mining/MinerScheduleSuite.scala b/node/tests/src/test/scala/com/wavesplatform/mining/MinerScheduleSuite.scala new file mode 100644 index 0000000000..4d3ca40456 --- /dev/null +++ b/node/tests/src/test/scala/com/wavesplatform/mining/MinerScheduleSuite.scala @@ -0,0 +1,161 @@ +package com.wavesplatform.mining + +import cats.syntax.option.* +import com.wavesplatform.db.WithDomain +import com.wavesplatform.db.WithState.AddrWithBalance +import com.wavesplatform.history.Domain +import com.wavesplatform.state.* +import com.wavesplatform.test.{FreeSpec, TestSchedulerOps, WithResourceManager} +import com.wavesplatform.transaction.{Transaction, TxHelpers} +import com.wavesplatform.wallet.Wallet +import io.netty.channel.group.DefaultChannelGroup +import io.netty.util.concurrent.GlobalEventExecutor +import monix.execution.schedulers.TestScheduler +import monix.reactive.subjects.ConcurrentSubject +import org.scalatest.EitherValues +import org.scalatest.time.SpanSugar.convertLongToGrainOfTime + +import scala.util.Using + +class MinerScheduleSuite extends FreeSpec with WithDomain with TestSchedulerOps with WithResourceManager with EitherValues { + private val thisNodeAcc = Wallet.generateNewAccount(Domain.DefaultWalletSeed, nonce = 0) + private val otherNodeAcc = TxHelpers.defaultSigner + + private val microBlockInterval = 5.seconds + private val minMicroBlockAge = 3.seconds + + "After new block (no NG state)" in pending // Hard to do, because, we haven't yet emulated restarts in such tests + + "After better liquid block" in new BaseTest { + override def continue(d: Domain): Unit = { + log.debug("Prepare worse and better blocks") + val commonBlockId = d.lastBlockId + val worseBlock = d.createBlock( + generator = thisNodeAcc, + ref = commonBlockId.some, + strictTime = true, + timestamp = Some(d.nextBlockTime(otherNodeAcc) + 1L) + ) + val betterBlock = d.createBlock(generator = otherNodeAcc, ref = commonBlockId.some, strictTime = true) + + log.debug("Append worse block") + d.appender.appendBlockWithoutFallback(worseBlock) should beRight + appenderScheduler.tickNext("this-appender-1") + + log.debug("Append microBlock") + d.testTime.advance(microBlockInterval) + val microBlock = d.createMicroBlock(signer = thisNodeAcc.some)(mkTx()) + d.appendMicroBlock(microBlock) + appenderScheduler.tickNext("this-appender-2") + + log.debug("Append better liquid block") + d.appender.appendBlockWithoutFallback(betterBlock) should beRight + + log.debug("Trigger thisNode forging") + d.testTime.setTimeIfGreater(d.nextBlockTime(thisNodeAcc)) + d.kickUtx() + appenderScheduler.tickNext("this-appender-3") + minerScheduler.tickNext("this-miner-1") + appenderScheduler.tickNext("this-appender-4") + + withClue(s"lastBlock.id=${d.lastBlockId}, reference: ") { + val lbh = d.lastBlock.header + lbh.reference shouldBe betterBlock.id() + lbh.generator shouldBe thisNodeAcc.publicKey + } + } + }.run() + + "After key block" in new BaseTest { + override def continue(d: Domain): Unit = { + log.debug("Trigger thisNode forging") + val parentBlockId = d.lastBlockId + + d.testTime.setTimeIfGreater(d.nextBlockTime(thisNodeAcc)) + minerScheduler.tickNext("this-miner-1") + appenderScheduler.tickNext("this-appender-1") + + val lbh = d.lastBlock.header + lbh.reference shouldBe parentBlockId + lbh.generator shouldBe thisNodeAcc.publicKey + } + }.run() + + "After micro block" in new BaseTest { + override def continue(d: Domain): Unit = { + log.debug("Append block") + d.appender.appendBlockWithoutFallback(d.createBlock(generator = otherNodeAcc, strictTime = true)) should beRight + appenderScheduler.tickNext("this-appender-1") + + log.debug("Append micro block") + d.testTime.advance(microBlockInterval) + d.appendMicroBlock(d.createMicroBlock(signer = otherNodeAcc.some)(mkTx())) + appenderScheduler.tickNext("this-appender-2") + val parentBlockId = d.lastBlockId + + log.debug("Trigger thisNode forging") + d.testTime.setTimeIfGreater(d.nextBlockTime(thisNodeAcc)) + minerScheduler.tickNext("this-miner-1") + appenderScheduler.tickNext("this-appender-3") + + val lbh = d.lastBlock.header + lbh.reference shouldBe parentBlockId + lbh.generator shouldBe thisNodeAcc.publicKey + } + }.run() + + private trait BaseTest { + def baseSettings = DomainPresets.TransactionStateSnapshot + def defaultSettings = baseSettings.copy( + minerSettings = baseSettings.minerSettings.copy(quorum = 0, microBlockInterval = microBlockInterval, minMicroBlockAge = minMicroBlockAge) + ) + + val minerScheduler = TestScheduler() + val appenderScheduler = TestScheduler() + val utxEvents = ConcurrentSubject.publish[Unit](using minerScheduler) + + def continue(d: Domain): Unit + + def run(): Unit = Using.Manager { manager => + val channels = manager(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE))(using _.close()) + + var miner = Miner.StrictDisabledMiner + withDomain( + defaultSettings, + AddrWithBalance.enoughBalances(thisNodeAcc, otherNodeAcc), + miner = Miner.forwardTo(miner) + ) { d => + d.wallet.generateNewAccounts(1) + + miner = new MinerImpl( + channels, + d.blockchain, + d.settings, + d.testTime, + d.utxPool, + BlockEndorser.Disabled, + EndorsementStorage.Disabled, + d.wallet, + d.posSelector, + minerScheduler, + appenderScheduler, + utxEvents + ) + + d.appendBlock() + appenderScheduler.tickNext("this-appender-0") + + continue(d) + } + }.get + + def mkTx(): Transaction = TxHelpers.transfer(from = thisNodeAcc, to = otherNodeAcc.toAddress) + + extension (d: Domain) { + def kickUtx(): Unit = { + d.utxPool.putIfNew(mkTx()) + utxEvents.onNext(()) + } + } + } +} diff --git a/node/tests/src/test/scala/com/wavesplatform/mining/NgMinerWithSponsorshipSuite.scala b/node/tests/src/test/scala/com/wavesplatform/mining/NgMinerWithSponsorshipSuite.scala new file mode 100644 index 0000000000..d06cd9f284 --- /dev/null +++ b/node/tests/src/test/scala/com/wavesplatform/mining/NgMinerWithSponsorshipSuite.scala @@ -0,0 +1,96 @@ +package com.wavesplatform.mining + +import com.wavesplatform.db.WithDomain +import com.wavesplatform.db.WithState.AddrWithBalance +import com.wavesplatform.features.BlockchainFeatures +import com.wavesplatform.history.Domain +import com.wavesplatform.state.* +import com.wavesplatform.test.DomainPresets.WavesSettingsOps +import com.wavesplatform.test.{FreeSpec, NumericExt, TestSchedulerOps, WithResourceManager} +import com.wavesplatform.transaction.TxHelpers +import com.wavesplatform.wallet.Wallet +import io.netty.channel.group.DefaultChannelGroup +import io.netty.util.concurrent.GlobalEventExecutor +import monix.execution.schedulers.TestScheduler +import monix.reactive.subjects.ConcurrentSubject +import org.scalatest.EitherValues +import org.scalatest.time.SpanSugar.convertLongToGrainOfTime + +class NgMinerWithSponsorshipSuite extends FreeSpec with WithDomain with TestSchedulerOps with WithResourceManager with EitherValues { + private val thisNodeAcc = Wallet.generateNewAccount(Domain.DefaultWalletSeed, nonce = 0) + + "Correct block signature" ignore withManager { manager => + val minerScheduler = TestScheduler() + val appenderScheduler = TestScheduler() + + val channels = manager(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)) + var miner = Miner.StrictDisabledMiner + + val baseSettings = DomainPresets.TransactionStateSnapshot + .addFeatures(BlockchainFeatures.SmallerMinimalGeneratingBalance) + .setFeaturesHeight( + BlockchainFeatures.FeeSponsorship -> 0, // The root of issue, it works with -feature_period + BlockchainFeatures.BlockRewardDistribution -> 100 + ) + + val settings = baseSettings + .copy(minerSettings = baseSettings.minerSettings.copy(quorum = 0, microBlockInterval = 100.millis)) + .configure( + _.copy( + daoAddress = None, + xtnBuybackAddress = None, + lightNodeBlockFieldsAbsenceInterval = 2, + generationPeriodLength = 4 + ) + ) + + withDomain(settings, Seq(AddrWithBalance(thisNodeAcc.toAddress, 10_000.waves)), miner = Miner.forwardTo(miner)) { d => + d.wallet.generateNewAccounts(1) + + val endorsementStorage = EndorsementStorage.InMemory((blockId, h) => blockId == d.blockchain.blockId(h.toInt)) + val blockEndorser = BlockEndorser.InMemory(d.settings.synchronizationSettings.maxRollback, d.blockchain, d.wallet, endorsementStorage, channels) + val utxEvents = ConcurrentSubject.publish[Unit](using minerScheduler) + val minerImpl = new MinerImpl( + channels, + d.blockchain, + d.settings, + d.testTime, + d.utxPool, + blockEndorser, + endorsementStorage, + d.wallet, + d.posSelector, + minerScheduler, + appenderScheduler, + utxEvents + ) + miner = minerImpl + log.debug("Schedule mining") + minerImpl.scheduleMining() + + log.debug("Trigger forging block 2") + d.testTime.setTimeIfGreater(d.nextBlockTime(thisNodeAcc)) + appenderScheduler.tickNext("appender-1") + minerScheduler.tickNext("miner-1") + appenderScheduler.tickNext("appender-2") + + log.debug("Trigger forging microblock of 2") + d.utxPool.putIfNew(TxHelpers.transfer(from = thisNodeAcc, amount = 10.waves, fee = 1.waves)) + utxEvents.onNext(()) + minerScheduler.tickNext("miner-2") + appenderScheduler.tickNext("appender-3") + minerScheduler.tickNext("miner-1") + + val lastBlockId = d.lastBlockId + + log.debug("Trigger forging block 3") + d.testTime.setTimeIfGreater(d.nextBlockTime(thisNodeAcc)) + appenderScheduler.tickNext("appender-1") + minerScheduler.tickNext("miner-1") + appenderScheduler.tickNext("appender-2") + + // TODO: fails, because the miner has an unexpected WAVES balance + d.lastBlockId shouldNot be(lastBlockId) + } + } +} diff --git a/node/tests/src/test/scala/com/wavesplatform/state/LightNodeTest.scala b/node/tests/src/test/scala/com/wavesplatform/state/LightNodeTest.scala index f6f559a83d..66491db027 100644 --- a/node/tests/src/test/scala/com/wavesplatform/state/LightNodeTest.scala +++ b/node/tests/src/test/scala/com/wavesplatform/state/LightNodeTest.scala @@ -114,8 +114,9 @@ class LightNodeTest extends PropSpec with WithDomain { DomainPresets.TransactionStateSnapshot.copy(enableLightMode = isLightMode), AddrWithBalance.enoughBalances(sender, TxHelpers.defaultSigner) ) { d => - val chainSize = 3 - val genesisId = d.lastBlockId + val chainSize = 3 + val genesisId = d.lastBlockId + val genesisBlock = d.lastBlock val betterBlocks = (1 to chainSize).map { idx => val txs = Seq(TxHelpers.transfer(sender, recipient, amount = (idx + 10).waves), TxHelpers.transfer(sender, recipient, amount = (idx + 11).waves)) @@ -125,17 +126,21 @@ class LightNodeTest extends PropSpec with WithDomain { block -> txSnapshots } val expectedStateHash = d.lastBlock.header.stateHash + + log.debug("Rolling back") d.rollbackTo(genesisId) + log.debug("Appending new blocks") (1 to chainSize).foreach { idx => val txs = Seq(TxHelpers.transfer(sender, recipient, amount = idx.waves), TxHelpers.transfer(sender, recipient, (idx + 1).waves)) d.appendBlock(txs*) } val currentScore = d.blockchain.score + log.debug("Appending extension") val extensionBlocks = ExtensionBlocks( currentScore + 1, - betterBlocks.map(_._1), + genesisBlock +: betterBlocks.map(_._1), betterBlocks.collect { case (b, Some(snapshots)) => b.id() -> BlockSnapshotResponse(b.id(), snapshots.map { case (s, m) => PBSnapshots.toProtobuf(s, m) }) }.toMap diff --git a/node/tests/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala b/node/tests/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala index c7ff877e53..9ae23c23ad 100644 --- a/node/tests/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala +++ b/node/tests/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala @@ -14,6 +14,7 @@ import monix.execution.Scheduler.Implicits.global class ExtensionAppenderSpec extends FlatSpec with WithDomain { "Extension appender" should "drop duplicate transactions from UTX" in withDomain(balances = AddrWithBalance.enoughBalances(TxHelpers.defaultSigner)) { d => + val genesisBlock = d.lastBlock val utx = new UtxPoolImpl(SystemTime, d.blockchain, d.settings.utxSettings, d.settings.maxTxErrorLogSize, d.settings.minerSettings.enable) val time = TestTime() val extensionAppender = ExtensionAppender(d.blockchain, utx, d.posSelector, time, InvalidBlockStorage.NoOp, PeerDatabase.NoOp, global)(null, _) @@ -25,7 +26,7 @@ class ExtensionAppenderSpec extends FlatSpec with WithDomain { utx.all shouldBe Seq(tx) time.setTime(block1.header.timestamp) - extensionAppender(ExtensionBlocks(d.blockchain.score + block1.blockScore(), Seq(block1), Map.empty)).runSyncUnsafe().explicitGet() + extensionAppender(ExtensionBlocks(d.blockchain.score + block1.blockScore(), Seq(genesisBlock, block1), Map.empty)).runSyncUnsafe().explicitGet() d.blockchain.height shouldBe 2 utx.all shouldBe Nil utx.close() diff --git a/node/tests/src/test/scala/com/wavesplatform/state/diffs/ci/sync/SyncDAppPaymentTest.scala b/node/tests/src/test/scala/com/wavesplatform/state/diffs/ci/sync/SyncDAppPaymentTest.scala index 45865f3d24..feed85d856 100644 --- a/node/tests/src/test/scala/com/wavesplatform/state/diffs/ci/sync/SyncDAppPaymentTest.scala +++ b/node/tests/src/test/scala/com/wavesplatform/state/diffs/ci/sync/SyncDAppPaymentTest.scala @@ -10,12 +10,13 @@ import com.wavesplatform.lang.directives.values.V5 import com.wavesplatform.lang.script.Script import com.wavesplatform.lang.v1.compiler.TestCompiler import com.wavesplatform.lang.v1.evaluator.ctx.impl.GlobalValNames +import com.wavesplatform.state.Height import com.wavesplatform.state.diffs.{ENOUGH_AMT, produceRejectOrFailedDiff} import com.wavesplatform.test.* import com.wavesplatform.test.DomainPresets.* import com.wavesplatform.transaction.Asset.{IssuedAsset, Waves} import com.wavesplatform.transaction.smart.InvokeScriptTransaction -import com.wavesplatform.transaction.{Asset, Transaction, TxHelpers} +import com.wavesplatform.transaction.{Asset, CommitToGenerationTransaction, Transaction, TxHelpers} class SyncDAppPaymentTest extends PropSpec with WithDomain { @@ -241,6 +242,92 @@ class SyncDAppPaymentTest extends PropSpec with WithDomain { } } + property("can't use deposited funds for sync call payment when master has only deposit") { + val miner = TxHelpers.signer(0) + val invoker = TxHelpers.signer(1) + val masterDApp = TxHelpers.signer(2) + val serviceDApp = TxHelpers.signer(3) + + val settings = DeterministicFinality + .addFeatures(BlockchainFeatures.SmallerMinimalGeneratingBalance) + .configure(_.copy(generationPeriodLength = 2)) + + val initBalance = 2000.waves + val initBalances = Seq(miner, invoker, serviceDApp, masterDApp).map(kp => AddrWithBalance(kp.toAddress, initBalance)) + + withDomain(settings, initBalances) { d => + log.debug("Append block 2 with scripts and commitment") + val txFeeAmount = 1.waves + val available = initBalance - (CommitToGenerationTransaction.DepositInWavelets + 2 * txFeeAmount) + val transferAmount = available + 1 + + val setMasterScript = TxHelpers.setScript(masterDApp, invokerDAppScript(serviceDApp.toAddress, amount = transferAmount), txFeeAmount) + val setServiceScript = TxHelpers.setScript(serviceDApp, simpleDAppScript()) + val commitments = Seq(miner, masterDApp).map { kp => + TxHelpers.commitToGeneration(generationPeriodStart = Height(3), sender = kp, fee = txFeeAmount) + } + + d.appender.appendBlock( + d.createBlock( + txs = Seq(setMasterScript, setServiceScript) ++ commitments, + strictTime = true + ) + ) + + log.debug("Check master dApp balances") + val balanceDetails = d.accountsApi.balanceDetails(masterDApp.toAddress).explicitGet() + balanceDetails.available shouldBe available + + log.debug("Try to spend") + val invoke = TxHelpers.invoke(masterDApp.toAddress, invoker = invoker) + d.appendBlockE(invoke) should produce(s"${masterDApp.toAddress} -> trying to spend a deposit") + } + } + + property("can't use deposited funds for sync call payment when master has deposit and leasing") { + val miner = TxHelpers.signer(0) + val invoker = TxHelpers.signer(1) + val masterDApp = TxHelpers.signer(2) + val serviceDApp = TxHelpers.signer(3) + + val settings = DeterministicFinality + .addFeatures(BlockchainFeatures.SmallerMinimalGeneratingBalance) + .configure(_.copy(generationPeriodLength = 2)) + + val initBalance = 2000.waves + val initBalances = Seq(miner, invoker, serviceDApp, masterDApp).map(kp => AddrWithBalance(kp.toAddress, initBalance)) + + withDomain(settings, initBalances) { d => + log.debug("Append block 2 with scripts and commitment") + val txFeeAmount = 1.waves + val leaseAmount = 100.waves + val available = initBalance - (leaseAmount + CommitToGenerationTransaction.DepositInWavelets + 3 * txFeeAmount) + val transferAmount = available + 1 + + val setMasterScript = TxHelpers.setScript(masterDApp, invokerDAppScript(serviceDApp.toAddress, amount = transferAmount), txFeeAmount) + val setServiceScript = TxHelpers.setScript(serviceDApp, simpleDAppScript()) + val lease = TxHelpers.lease(masterDApp, invoker.toAddress, leaseAmount, txFeeAmount) + val commitments = Seq(miner, masterDApp).map { kp => + TxHelpers.commitToGeneration(generationPeriodStart = Height(3), sender = kp, fee = txFeeAmount) + } + + d.appender.appendBlock( + d.createBlock( + txs = Seq(setMasterScript, setServiceScript, lease) ++ commitments, + strictTime = true + ) + ) + + log.debug("Check master dApp balances") + val balanceDetails = d.accountsApi.balanceDetails(masterDApp.toAddress).explicitGet() + balanceDetails.available shouldBe available + + log.debug("Try to spend") + val invoke = TxHelpers.invoke(masterDApp.toAddress, invoker = invoker) + d.appendBlockE(invoke) should produce(s"${masterDApp.toAddress} -> trying to spend either a deposit or leased money") + } + } + property("should not allow payments overflow") { val invoker = TxHelpers.signer(1) val masterDApp = TxHelpers.signer(2) @@ -254,7 +341,7 @@ class SyncDAppPaymentTest extends PropSpec with WithDomain { d.appendBlock(setMasterScript, setServiceScript) - Long.MaxValue - d.balance(serviceDApp.toAddress) < paymentAmount + Long.MaxValue - d.balance(serviceDApp.toAddress) shouldBe <(paymentAmount) d.appendBlockE(invoke) should produce(s"Waves balance sum overflow") } diff --git a/node/tests/src/test/scala/com/wavesplatform/state/diffs/smart/predef/MatcherBlockchainTest.scala b/node/tests/src/test/scala/com/wavesplatform/state/diffs/smart/predef/MatcherBlockchainTest.scala index 73923518e5..2087f1ff91 100644 --- a/node/tests/src/test/scala/com/wavesplatform/state/diffs/smart/predef/MatcherBlockchainTest.scala +++ b/node/tests/src/test/scala/com/wavesplatform/state/diffs/smart/predef/MatcherBlockchainTest.scala @@ -63,7 +63,7 @@ class MatcherBlockchainTest extends PropSpec, WithDomain { override def wavesBalances(addresses: Seq[Address]): Map[Address, Long] = ??? override def effectiveBalanceBanHeights(address: Address): Seq[Int] = ??? override def resolveERC20Address(address: ERC20Address): Option[Asset.IssuedAsset] = ??? - override def lastStateHash(refId: Option[ByteStr]): BlockId = ??? + override def lastStateHash(liquidBlockId: Option[ByteStr]): BlockId = ??? override def committedGenerators(at: GenerationPeriod): IndexedSeq[(Address, BlsPublicKey)] = ??? override def conflictGenerators(at: GenerationPeriod): ConflictGenerators = ??? } diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/ImmutableBlockchain.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/ImmutableBlockchain.scala index 6f3aa9d91b..720228b09b 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/ImmutableBlockchain.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/ImmutableBlockchain.scala @@ -202,7 +202,7 @@ class ImmutableBlockchain(override val settings: BlockchainSettings, input: Ride override def conflictGenerators(at: GenerationPeriod): ConflictGenerators = ConflictGenerators.empty - override def lastStateHash(refId: Option[BlockId]): BlockId = ??? + override def lastStateHash(liquidBlockId: Option[BlockId]): BlockId = ??? // Ride: transferTransactionById override def transferById(id: ByteStr): Option[(Int, TransferTransactionLike)] = diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/LazyBlockchain.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/LazyBlockchain.scala index 7963fc6f09..2a4806b3d7 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/LazyBlockchain.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/LazyBlockchain.scala @@ -103,7 +103,7 @@ class LazyBlockchain[TagT] private ( override def effectiveBalanceBanHeights(address: Address): Seq[Int] = ??? - override def lastStateHash(refId: Option[BlockId]): BlockId = ??? + override def lastStateHash(liquidBlockId: Option[BlockId]): BlockId = ??? // Ride: blockInfoByHeight override def blockReward(height: Int): Option[Long] = blockHeaderWithVrf(Height(height)).map(_.blockReward)