diff options
Diffstat (limited to 'data/pools/transactionPool.go')
-rw-r--r-- | data/pools/transactionPool.go | 66 |
1 files changed, 52 insertions, 14 deletions
diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go index 35871e91a..4295e82ff 100644 --- a/data/pools/transactionPool.go +++ b/data/pools/transactionPool.go @@ -17,6 +17,7 @@ package pools import ( + "errors" "fmt" "sync" "sync/atomic" @@ -62,7 +63,7 @@ type TransactionPool struct { mu deadlock.Mutex cond sync.Cond expiredTxCount map[basics.Round]int - pendingBlockEvaluator *ledger.BlockEvaluator + pendingBlockEvaluator BlockEvaluator numPendingWholeBlocks basics.Round feeThresholdMultiplier uint64 statusCache *statusCache @@ -88,6 +89,20 @@ type TransactionPool struct { rememberedTxids map[transactions.Txid]transactions.SignedTxn log logging.Logger + + // proposalAssemblyTime is the ProposalAssemblyTime configured for this node. + proposalAssemblyTime time.Duration +} + +// BlockEvaluator defines the block evaluator interface exposed by the ledger package. +type BlockEvaluator interface { + TestTransactionGroup(txgroup []transactions.SignedTxn) error + Round() basics.Round + PaySetSize() int + TransactionGroup(txads []transactions.SignedTxnWithAD) error + Transaction(txn transactions.SignedTxn, ad transactions.ApplyData) error + GenerateBlock() (*ledgercore.ValidatedBlock, error) + ResetTxnBytes() } // MakeTransactionPool makes a transaction pool. @@ -105,6 +120,7 @@ func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Lo logAssembleStats: cfg.EnableAssembleStats, expFeeFactor: cfg.TxPoolExponentialIncreaseFactor, txPoolMaxSize: cfg.TxPoolSize, + proposalAssemblyTime: cfg.ProposalAssemblyTime, log: log, } pool.cond.L = &pool.mu @@ -119,7 +135,7 @@ type poolAsmResults struct { // the ok variable indicates whether the assembly for the block roundStartedEvaluating was complete ( i.e. ok == true ) or // whether it's still in-progress. ok bool - blk *ledger.ValidatedBlock + blk *ledgercore.ValidatedBlock stats telemetryspec.AssembleBlockMetrics err error // roundStartedEvaluating is the round which we were attempted to evaluate last. It's a good measure for @@ -154,6 +170,9 @@ var ErrStaleBlockAssemblyRequest = fmt.Errorf("AssembleBlock: requested block as // Reset resets the content of the transaction pool func (pool *TransactionPool) Reset() { + pool.mu.Lock() + defer pool.mu.Unlock() + defer pool.cond.Broadcast() pool.pendingTxids = make(map[transactions.Txid]transactions.SignedTxn) pool.pendingTxGroups = nil pool.rememberedTxids = make(map[transactions.Txid]transactions.SignedTxn) @@ -578,10 +597,10 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio stats.StopReason = telemetryspec.AssembleBlockAbandon pool.assemblyResults.stats = *stats pool.assemblyCond.Broadcast() - } else if err == ledger.ErrNoSpace || pool.isAssemblyTimedOut() { + } else if err == ledgercore.ErrNoSpace || pool.isAssemblyTimedOut() { pool.assemblyResults.ok = true pool.assemblyResults.assemblyCompletedOrAbandoned = true - if err == ledger.ErrNoSpace { + if err == ledgercore.ErrNoSpace { stats.StopReason = telemetryspec.AssembleBlockFull } else { stats.StopReason = telemetryspec.AssembleBlockTimeout @@ -610,7 +629,7 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup []transactions.SignedTxn, recomputing bool, stats *telemetryspec.AssembleBlockMetrics) error { err := pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats) - if err == ledger.ErrNoSpace { + if err == ledgercore.ErrNoSpace { pool.numPendingWholeBlocks++ pool.pendingBlockEvaluator.ResetTxnBytes() err = pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats) @@ -665,8 +684,19 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact if hint < 0 || int(knownCommitted) < 0 { hint = 0 } - pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint) + pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint, 0) if err != nil { + // The pendingBlockEvaluator is an interface, and in case of an evaluator error + // we want to remove the interface itself rather then keeping an interface + // to a nil. + pool.pendingBlockEvaluator = nil + var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval + if errors.As(err, &nonSeqBlockEval) { + if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound { + pool.log.Infof("TransactionPool.recomputeBlockEvaluator: skipped creating block evaluator for round %d since ledger already caught up with that round", nonSeqBlockEval.EvaluatorRound) + return + } + } pool.log.Warnf("TransactionPool.recomputeBlockEvaluator: cannot start evaluator: %v", err) return } @@ -718,7 +748,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact // assembly. We want to figure out how long have we spent before trying to evaluate the first transaction. // ( ideally it's near zero. The goal here is to see if we get to a near time-out situation before processing the // first transaction group ) - asmStats.TransactionsLoopStartTime = int64(firstTxnGrpTime.Sub(pool.assemblyDeadline.Add(-config.ProposalAssemblyTime))) + asmStats.TransactionsLoopStartTime = int64(firstTxnGrpTime.Sub(pool.assemblyDeadline.Add(-pool.proposalAssemblyTime))) } if !pool.assemblyResults.ok && pool.assemblyRound <= pool.pendingBlockEvaluator.Round() { @@ -743,7 +773,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact // AssembleBlock assembles a block for a given round, trying not to // take longer than deadline to finish. -func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Time) (assembled *ledger.ValidatedBlock, err error) { +func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Time) (assembled *ledgercore.ValidatedBlock, err error) { var stats telemetryspec.AssembleBlockMetrics if pool.logAssembleStats { @@ -853,7 +883,7 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim pool.log.Warnf("AssembleBlock: ran out of time for round %d", round) stats.StopReason = telemetryspec.AssembleBlockTimeout if emptyBlockErr != nil { - emptyBlockErr = fmt.Errorf("AssembleBlock: failed to construct empty block : %v", emptyBlockErr) + emptyBlockErr = fmt.Errorf("AssembleBlock: failed to construct empty block : %w", emptyBlockErr) } return emptyBlock, emptyBlockErr } @@ -884,7 +914,7 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim // assembleEmptyBlock construct a new block for the given round. Internally it's using the ledger database calls, so callers // need to be aware that it might take a while before it would return. -func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *ledger.ValidatedBlock, err error) { +func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *ledgercore.ValidatedBlock, err error) { prevRound := round - 1 prev, err := pool.ledger.BlockHdr(prevRound) if err != nil { @@ -892,16 +922,24 @@ func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled * return nil, err } next := bookkeeping.MakeBlock(prev) - blockEval, err := pool.ledger.StartEvaluator(next.BlockHeader, 0) + blockEval, err := pool.ledger.StartEvaluator(next.BlockHeader, 0, 0) if err != nil { - err = fmt.Errorf("TransactionPool.assembleEmptyBlock: cannot start evaluator for %d: %v", round, err) + var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval + if errors.As(err, &nonSeqBlockEval) { + if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound { + // in the case that the ledger have already moved beyond that round, just let the agreement know that + // we don't generate a block and it's perfectly fine. + return nil, ErrStaleBlockAssemblyRequest + } + } + err = fmt.Errorf("TransactionPool.assembleEmptyBlock: cannot start evaluator for %d: %w", round, err) return nil, err } return blockEval.GenerateBlock() } // AssembleDevModeBlock assemble a new block from the existing transaction pool. The pending evaluator is being -func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledger.ValidatedBlock, err error) { +func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.ValidatedBlock, err error) { pool.mu.Lock() defer pool.mu.Unlock() @@ -910,6 +948,6 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledger.Validated // The above was already pregenerating the entire block, // so there won't be any waiting on this call. - assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(config.ProposalAssemblyTime)) + assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(pool.proposalAssemblyTime)) return } |