summaryrefslogtreecommitdiff
path: root/data/pools/transactionPool.go
diff options
context:
space:
mode:
Diffstat (limited to 'data/pools/transactionPool.go')
-rw-r--r--data/pools/transactionPool.go66
1 files changed, 52 insertions, 14 deletions
diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go
index 35871e91a..4295e82ff 100644
--- a/data/pools/transactionPool.go
+++ b/data/pools/transactionPool.go
@@ -17,6 +17,7 @@
package pools
import (
+ "errors"
"fmt"
"sync"
"sync/atomic"
@@ -62,7 +63,7 @@ type TransactionPool struct {
mu deadlock.Mutex
cond sync.Cond
expiredTxCount map[basics.Round]int
- pendingBlockEvaluator *ledger.BlockEvaluator
+ pendingBlockEvaluator BlockEvaluator
numPendingWholeBlocks basics.Round
feeThresholdMultiplier uint64
statusCache *statusCache
@@ -88,6 +89,20 @@ type TransactionPool struct {
rememberedTxids map[transactions.Txid]transactions.SignedTxn
log logging.Logger
+
+ // proposalAssemblyTime is the ProposalAssemblyTime configured for this node.
+ proposalAssemblyTime time.Duration
+}
+
+// BlockEvaluator defines the block evaluator interface exposed by the ledger package.
+type BlockEvaluator interface {
+ TestTransactionGroup(txgroup []transactions.SignedTxn) error
+ Round() basics.Round
+ PaySetSize() int
+ TransactionGroup(txads []transactions.SignedTxnWithAD) error
+ Transaction(txn transactions.SignedTxn, ad transactions.ApplyData) error
+ GenerateBlock() (*ledgercore.ValidatedBlock, error)
+ ResetTxnBytes()
}
// MakeTransactionPool makes a transaction pool.
@@ -105,6 +120,7 @@ func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Lo
logAssembleStats: cfg.EnableAssembleStats,
expFeeFactor: cfg.TxPoolExponentialIncreaseFactor,
txPoolMaxSize: cfg.TxPoolSize,
+ proposalAssemblyTime: cfg.ProposalAssemblyTime,
log: log,
}
pool.cond.L = &pool.mu
@@ -119,7 +135,7 @@ type poolAsmResults struct {
// the ok variable indicates whether the assembly for the block roundStartedEvaluating was complete ( i.e. ok == true ) or
// whether it's still in-progress.
ok bool
- blk *ledger.ValidatedBlock
+ blk *ledgercore.ValidatedBlock
stats telemetryspec.AssembleBlockMetrics
err error
// roundStartedEvaluating is the round which we were attempted to evaluate last. It's a good measure for
@@ -154,6 +170,9 @@ var ErrStaleBlockAssemblyRequest = fmt.Errorf("AssembleBlock: requested block as
// Reset resets the content of the transaction pool
func (pool *TransactionPool) Reset() {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+ defer pool.cond.Broadcast()
pool.pendingTxids = make(map[transactions.Txid]transactions.SignedTxn)
pool.pendingTxGroups = nil
pool.rememberedTxids = make(map[transactions.Txid]transactions.SignedTxn)
@@ -578,10 +597,10 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio
stats.StopReason = telemetryspec.AssembleBlockAbandon
pool.assemblyResults.stats = *stats
pool.assemblyCond.Broadcast()
- } else if err == ledger.ErrNoSpace || pool.isAssemblyTimedOut() {
+ } else if err == ledgercore.ErrNoSpace || pool.isAssemblyTimedOut() {
pool.assemblyResults.ok = true
pool.assemblyResults.assemblyCompletedOrAbandoned = true
- if err == ledger.ErrNoSpace {
+ if err == ledgercore.ErrNoSpace {
stats.StopReason = telemetryspec.AssembleBlockFull
} else {
stats.StopReason = telemetryspec.AssembleBlockTimeout
@@ -610,7 +629,7 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio
func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup []transactions.SignedTxn, recomputing bool, stats *telemetryspec.AssembleBlockMetrics) error {
err := pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats)
- if err == ledger.ErrNoSpace {
+ if err == ledgercore.ErrNoSpace {
pool.numPendingWholeBlocks++
pool.pendingBlockEvaluator.ResetTxnBytes()
err = pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats)
@@ -665,8 +684,19 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
if hint < 0 || int(knownCommitted) < 0 {
hint = 0
}
- pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint)
+ pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint, 0)
if err != nil {
+ // The pendingBlockEvaluator is an interface, and in case of an evaluator error
+ // we want to remove the interface itself rather then keeping an interface
+ // to a nil.
+ pool.pendingBlockEvaluator = nil
+ var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval
+ if errors.As(err, &nonSeqBlockEval) {
+ if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound {
+ pool.log.Infof("TransactionPool.recomputeBlockEvaluator: skipped creating block evaluator for round %d since ledger already caught up with that round", nonSeqBlockEval.EvaluatorRound)
+ return
+ }
+ }
pool.log.Warnf("TransactionPool.recomputeBlockEvaluator: cannot start evaluator: %v", err)
return
}
@@ -718,7 +748,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
// assembly. We want to figure out how long have we spent before trying to evaluate the first transaction.
// ( ideally it's near zero. The goal here is to see if we get to a near time-out situation before processing the
// first transaction group )
- asmStats.TransactionsLoopStartTime = int64(firstTxnGrpTime.Sub(pool.assemblyDeadline.Add(-config.ProposalAssemblyTime)))
+ asmStats.TransactionsLoopStartTime = int64(firstTxnGrpTime.Sub(pool.assemblyDeadline.Add(-pool.proposalAssemblyTime)))
}
if !pool.assemblyResults.ok && pool.assemblyRound <= pool.pendingBlockEvaluator.Round() {
@@ -743,7 +773,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
// AssembleBlock assembles a block for a given round, trying not to
// take longer than deadline to finish.
-func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Time) (assembled *ledger.ValidatedBlock, err error) {
+func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Time) (assembled *ledgercore.ValidatedBlock, err error) {
var stats telemetryspec.AssembleBlockMetrics
if pool.logAssembleStats {
@@ -853,7 +883,7 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim
pool.log.Warnf("AssembleBlock: ran out of time for round %d", round)
stats.StopReason = telemetryspec.AssembleBlockTimeout
if emptyBlockErr != nil {
- emptyBlockErr = fmt.Errorf("AssembleBlock: failed to construct empty block : %v", emptyBlockErr)
+ emptyBlockErr = fmt.Errorf("AssembleBlock: failed to construct empty block : %w", emptyBlockErr)
}
return emptyBlock, emptyBlockErr
}
@@ -884,7 +914,7 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim
// assembleEmptyBlock construct a new block for the given round. Internally it's using the ledger database calls, so callers
// need to be aware that it might take a while before it would return.
-func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *ledger.ValidatedBlock, err error) {
+func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *ledgercore.ValidatedBlock, err error) {
prevRound := round - 1
prev, err := pool.ledger.BlockHdr(prevRound)
if err != nil {
@@ -892,16 +922,24 @@ func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *
return nil, err
}
next := bookkeeping.MakeBlock(prev)
- blockEval, err := pool.ledger.StartEvaluator(next.BlockHeader, 0)
+ blockEval, err := pool.ledger.StartEvaluator(next.BlockHeader, 0, 0)
if err != nil {
- err = fmt.Errorf("TransactionPool.assembleEmptyBlock: cannot start evaluator for %d: %v", round, err)
+ var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval
+ if errors.As(err, &nonSeqBlockEval) {
+ if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound {
+ // in the case that the ledger have already moved beyond that round, just let the agreement know that
+ // we don't generate a block and it's perfectly fine.
+ return nil, ErrStaleBlockAssemblyRequest
+ }
+ }
+ err = fmt.Errorf("TransactionPool.assembleEmptyBlock: cannot start evaluator for %d: %w", round, err)
return nil, err
}
return blockEval.GenerateBlock()
}
// AssembleDevModeBlock assemble a new block from the existing transaction pool. The pending evaluator is being
-func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledger.ValidatedBlock, err error) {
+func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.ValidatedBlock, err error) {
pool.mu.Lock()
defer pool.mu.Unlock()
@@ -910,6 +948,6 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledger.Validated
// The above was already pregenerating the entire block,
// so there won't be any waiting on this call.
- assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(config.ProposalAssemblyTime))
+ assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(pool.proposalAssemblyTime))
return
}