diff options
Diffstat (limited to 'data/pools/transactionPool.go')
-rw-r--r-- | data/pools/transactionPool.go | 235 |
1 files changed, 67 insertions, 168 deletions
diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go index 9477a380d..4295e82ff 100644 --- a/data/pools/transactionPool.go +++ b/data/pools/transactionPool.go @@ -28,7 +28,6 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/data/pooldata" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" @@ -54,11 +53,6 @@ type TransactionPool struct { // with atomic operations which require 64 bit alignment on arm. feePerByte uint64 - // latestMeasuredDataExchangeRate is the average data exchange rate, as measured by the transaction sync. - // we use the latestMeasuredDataExchangeRate in order to determine the desired proposal size, so that it - // won't create undesired network bottlenecks. - latestMeasuredDataExchangeRate uint64 - // const logProcessBlockStats bool logAssembleStats bool @@ -69,7 +63,7 @@ type TransactionPool struct { mu deadlock.Mutex cond sync.Cond expiredTxCount map[basics.Round]int - pendingBlockEvaluator *ledger.BlockEvaluator + pendingBlockEvaluator BlockEvaluator numPendingWholeBlocks basics.Round feeThresholdMultiplier uint64 statusCache *statusCache @@ -81,31 +75,34 @@ type TransactionPool struct { assemblyRound basics.Round assemblyResults poolAsmResults - // pendingMu protects pendingTxGroups, pendingTxids, pendingCounter and pendingLatestLocal - pendingMu deadlock.RWMutex - // pendingTxGroups is a slice of the pending transaction groups. - pendingTxGroups []pooldata.SignedTxGroup - // pendingTxids is a map of the pending *transaction ids* included in the pendingTxGroups array. - pendingTxids map[transactions.Txid]transactions.SignedTxn - // pendingCounter is a monotomic counter, indicating the next pending transaction group counter value. - pendingCounter uint64 - // pendingLatestLocal is the value of the last transaction group counter which is associated with a transaction that was - // locally originated ( i.e. posted to this node via the REST API ) - pendingLatestLocal uint64 + // pendingMu protects pendingTxGroups and pendingTxids + pendingMu deadlock.RWMutex + pendingTxGroups [][]transactions.SignedTxn + pendingTxids map[transactions.Txid]transactions.SignedTxn // Calls to remember() add transactions to rememberedTxGroups and // rememberedTxids. Calling rememberCommit() adds them to the // pendingTxGroups and pendingTxids. This allows us to batch the // changes in OnNewBlock() without preventing a concurrent call - // to PendingTxGroups(). - rememberedTxGroups []pooldata.SignedTxGroup + // to PendingTxGroups() or Verified(). + rememberedTxGroups [][]transactions.SignedTxn rememberedTxids map[transactions.Txid]transactions.SignedTxn - // rememberedLatestLocal is the value of the last transaction group counter which is associated with a transaction that was - // locally originated ( i.e. posted to this node via the REST API ). This variable is used when OnNewBlock is called and - // we filter out the pending transaction through the evaluator. - rememberedLatestLocal uint64 log logging.Logger + + // proposalAssemblyTime is the ProposalAssemblyTime configured for this node. + proposalAssemblyTime time.Duration +} + +// BlockEvaluator defines the block evaluator interface exposed by the ledger package. +type BlockEvaluator interface { + TestTransactionGroup(txgroup []transactions.SignedTxn) error + Round() basics.Round + PaySetSize() int + TransactionGroup(txads []transactions.SignedTxnWithAD) error + Transaction(txn transactions.SignedTxn, ad transactions.ApplyData) error + GenerateBlock() (*ledgercore.ValidatedBlock, error) + ResetTxnBytes() } // MakeTransactionPool makes a transaction pool. @@ -123,6 +120,7 @@ func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Lo logAssembleStats: cfg.EnableAssembleStats, expFeeFactor: cfg.TxPoolExponentialIncreaseFactor, txPoolMaxSize: cfg.TxPoolSize, + proposalAssemblyTime: cfg.ProposalAssemblyTime, log: log, } pool.cond.L = &pool.mu @@ -137,7 +135,7 @@ type poolAsmResults struct { // the ok variable indicates whether the assembly for the block roundStartedEvaluating was complete ( i.e. ok == true ) or // whether it's still in-progress. ok bool - blk *ledger.ValidatedBlock + blk *ledgercore.ValidatedBlock stats telemetryspec.AssembleBlockMetrics err error // roundStartedEvaluating is the round which we were attempted to evaluate last. It's a good measure for @@ -164,14 +162,6 @@ const ( // duration it would take to execute the GenerateBlock() function generateBlockBaseDuration = 2 * time.Millisecond generateBlockTransactionDuration = 2155 * time.Nanosecond - - // minMaxTxnBytesPerBlock is the minimal maximum block size that the evaluator would be asked to create, in case - // the local node doesn't have sufficient bandwidth to support higher throughputs. - // for example: a node that has a very low bandwidth of 10KB/s. If we will follow the block size calculations, we - // would get to an unrealistic block size of 20KB. This could be due to a temporary network bandwidth fluctuations - // or other measuring issue. In order to ensure we have some more realistic block sizes to - // work with, we clamp the block size to the range of [minMaxTxnBytesPerBlock .. proto.MaxTxnBytesPerBlock]. - minMaxTxnBytesPerBlock = 100 * 1024 ) // ErrStaleBlockAssemblyRequest returned by AssembleBlock when requested block number is older than the current transaction pool round @@ -180,9 +170,11 @@ var ErrStaleBlockAssemblyRequest = fmt.Errorf("AssembleBlock: requested block as // Reset resets the content of the transaction pool func (pool *TransactionPool) Reset() { + pool.mu.Lock() + defer pool.mu.Unlock() + defer pool.cond.Broadcast() pool.pendingTxids = make(map[transactions.Txid]transactions.SignedTxn) pool.pendingTxGroups = nil - pool.pendingLatestLocal = pooldata.InvalidSignedTxGroupCounter pool.rememberedTxids = make(map[transactions.Txid]transactions.SignedTxn) pool.rememberedTxGroups = nil pool.expiredTxCount = make(map[basics.Round]int) @@ -216,15 +208,14 @@ func (pool *TransactionPool) PendingTxIDs() []transactions.Txid { } // PendingTxGroups returns a list of transaction groups that should be proposed -// in the next block, in order. As the second return value, it returns the transaction -// group counter of the latest local generated transaction group. -func (pool *TransactionPool) PendingTxGroups() ([]pooldata.SignedTxGroup, uint64) { +// in the next block, in order. +func (pool *TransactionPool) PendingTxGroups() [][]transactions.SignedTxn { pool.pendingMu.RLock() defer pool.pendingMu.RUnlock() // note that this operation is safe for the sole reason that arrays in go are immutable. // if the underlaying array need to be expanded, the actual underlaying array would need // to be reallocated. - return pool.pendingTxGroups, pool.pendingLatestLocal + return pool.pendingTxGroups } // pendingTxIDsCount returns the number of pending transaction ids that are still waiting @@ -248,26 +239,8 @@ func (pool *TransactionPool) rememberCommit(flush bool) { if flush { pool.pendingTxGroups = pool.rememberedTxGroups pool.pendingTxids = pool.rememberedTxids - pool.pendingLatestLocal = pool.rememberedLatestLocal pool.ledger.VerifiedTransactionCache().UpdatePinned(pool.pendingTxids) } else { - // update the GroupCounter on all the transaction groups we're going to add. - // this would ensure that each transaction group has a unique monotonic GroupCounter - encodingBuf := protocol.GetEncodingBuf() - for i, txGroup := range pool.rememberedTxGroups { - pool.pendingCounter++ - txGroup.GroupCounter = pool.pendingCounter - txGroup.EncodedLength = 0 - for _, txn := range txGroup.Transactions { - encodingBuf = encodingBuf[:0] - txGroup.EncodedLength += len(txn.MarshalMsg(encodingBuf)) - } - pool.rememberedTxGroups[i] = txGroup - if txGroup.LocallyOriginated { - pool.pendingLatestLocal = txGroup.GroupCounter - } - } - protocol.PutEncodingBuf(encodingBuf) pool.pendingTxGroups = append(pool.pendingTxGroups, pool.rememberedTxGroups...) for txid, txn := range pool.rememberedTxids { @@ -275,15 +248,8 @@ func (pool *TransactionPool) rememberCommit(flush bool) { } } - pool.resetRememberedTransactionGroups() -} - -// resetRememberedTransactionGroups clears the remembered transaction groups. -// The caller is assumed to be holding pool.mu. -func (pool *TransactionPool) resetRememberedTransactionGroups() { pool.rememberedTxGroups = nil pool.rememberedTxids = make(map[transactions.Txid]transactions.SignedTxn) - pool.rememberedLatestLocal = pooldata.InvalidSignedTxGroupCounter } // PendingCount returns the number of transactions currently pending in the pool. @@ -298,7 +264,7 @@ func (pool *TransactionPool) PendingCount() int { func (pool *TransactionPool) pendingCountNoLock() int { var count int for _, txgroup := range pool.pendingTxGroups { - count += len(txgroup.Transactions) + count += len(txgroup) } return count } @@ -362,12 +328,12 @@ func (pool *TransactionPool) computeFeePerByte() uint64 { // checkSufficientFee take a set of signed transactions and verifies that each transaction has // sufficient fee to get into the transaction pool -func (pool *TransactionPool) checkSufficientFee(txgroup pooldata.SignedTxGroup) error { +func (pool *TransactionPool) checkSufficientFee(txgroup []transactions.SignedTxn) error { // Special case: the compact cert transaction, if issued from the // special compact-cert-sender address, in a singleton group, pays // no fee. - if len(txgroup.Transactions) == 1 { - t := txgroup.Transactions[0].Txn + if len(txgroup) == 1 { + t := txgroup[0].Txn if t.Type == protocol.CompactCertTx && t.Sender == transactions.CompactCertSender && t.Fee.IsZero() { return nil } @@ -376,7 +342,7 @@ func (pool *TransactionPool) checkSufficientFee(txgroup pooldata.SignedTxGroup) // get the current fee per byte feePerByte := pool.computeFeePerByte() - for _, t := range txgroup.Transactions { + for _, t := range txgroup { feeThreshold := feePerByte * uint64(t.GetEncodedLength()) if t.Txn.Fee.Raw < feeThreshold { return fmt.Errorf("fee %d below threshold %d (%d per byte * %d bytes)", @@ -410,7 +376,7 @@ type poolIngestParams struct { } // remember attempts to add a transaction group to the pool. -func (pool *TransactionPool) remember(txgroup pooldata.SignedTxGroup) error { +func (pool *TransactionPool) remember(txgroup []transactions.SignedTxn) error { params := poolIngestParams{ recomputing: false, } @@ -419,7 +385,7 @@ func (pool *TransactionPool) remember(txgroup pooldata.SignedTxGroup) error { // add tries to add the transaction group to the pool, bypassing the fee // priority checks. -func (pool *TransactionPool) add(txgroup pooldata.SignedTxGroup, stats *telemetryspec.AssembleBlockMetrics) error { +func (pool *TransactionPool) add(txgroup []transactions.SignedTxn, stats *telemetryspec.AssembleBlockMetrics) error { params := poolIngestParams{ recomputing: true, stats: stats, @@ -432,7 +398,7 @@ func (pool *TransactionPool) add(txgroup pooldata.SignedTxGroup, stats *telemetr // // ingest assumes that pool.mu is locked. It might release the lock // while it waits for OnNewBlock() to be called. -func (pool *TransactionPool) ingest(txgroup pooldata.SignedTxGroup, params poolIngestParams) error { +func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poolIngestParams) error { if pool.pendingBlockEvaluator == nil { return fmt.Errorf("TransactionPool.ingest: no pending block evaluator") } @@ -454,10 +420,6 @@ func (pool *TransactionPool) ingest(txgroup pooldata.SignedTxGroup, params poolI if err != nil { return err } - - // since this is the first time the transaction was added to the transaction pool, it would - // be a good time now to figure the group's ID. - txgroup.GroupTransactionID = txgroup.Transactions.ID() } err := pool.addToPendingBlockEvaluator(txgroup, params.recomputing, params.stats) @@ -466,19 +428,22 @@ func (pool *TransactionPool) ingest(txgroup pooldata.SignedTxGroup, params poolI } pool.rememberedTxGroups = append(pool.rememberedTxGroups, txgroup) - for _, t := range txgroup.Transactions { + for _, t := range txgroup { pool.rememberedTxids[t.ID()] = t } - return nil } +// RememberOne stores the provided transaction. +// Precondition: Only RememberOne() properly-signed and well-formed transactions (i.e., ensure t.WellFormed()) +func (pool *TransactionPool) RememberOne(t transactions.SignedTxn) error { + return pool.Remember([]transactions.SignedTxn{t}) +} + // Remember stores the provided transaction group. // Precondition: Only Remember() properly-signed and well-formed transactions (i.e., ensure t.WellFormed()) -// The function is called by the transaction handler ( i.e. txsync or gossip ) or by the node when -// transaction is coming from a REST API call. -func (pool *TransactionPool) Remember(txgroup pooldata.SignedTxGroup) error { - if err := pool.checkPendingQueueSize(len(txgroup.Transactions)); err != nil { +func (pool *TransactionPool) Remember(txgroup []transactions.SignedTxn) error { + if err := pool.checkPendingQueueSize(len(txgroup)); err != nil { return err } @@ -494,34 +459,6 @@ func (pool *TransactionPool) Remember(txgroup pooldata.SignedTxGroup) error { return nil } -// RememberArray stores the provided transaction group. -// Precondition: Only RememberArray() properly-signed and well-formed transactions (i.e., ensure t.WellFormed()) -// The function is called by the transaction handler ( i.e. txsync ) -func (pool *TransactionPool) RememberArray(txgroups []pooldata.SignedTxGroup) error { - totalSize := 0 - for _, txGroup := range txgroups { - totalSize += len(txGroup.Transactions) - } - if err := pool.checkPendingQueueSize(totalSize); err != nil { - return err - } - - pool.mu.Lock() - defer pool.mu.Unlock() - - for _, txGroup := range txgroups { - err := pool.remember(txGroup) - if err != nil { - // we need to explicitly clear the remembered transaction groups here, since we might have added the first one successfully and then failing on the second one. - pool.resetRememberedTransactionGroups() - return fmt.Errorf("TransactionPool.RememberArray: %w", err) - } - } - - pool.rememberCommit(false) - return nil -} - // Lookup returns the error associated with a transaction that used // to be in the pool. If no status information is available (e.g., because // it was too long ago, or the transaction committed successfully), then @@ -625,9 +562,9 @@ func (pool *TransactionPool) isAssemblyTimedOut() bool { return time.Now().After(pool.assemblyDeadline.Add(-generateBlockDuration)) } -func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup pooldata.SignedTxGroup, recomputing bool, stats *telemetryspec.AssembleBlockMetrics) error { +func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactions.SignedTxn, recomputing bool, stats *telemetryspec.AssembleBlockMetrics) error { r := pool.pendingBlockEvaluator.Round() + pool.numPendingWholeBlocks - for _, tx := range txgroup.Transactions { + for _, tx := range txgroup { if tx.Txn.LastValid < r { return transactions.TxnDeadError{ Round: r, @@ -637,7 +574,7 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup pooldata.Sig } } - txgroupad := transactions.WrapSignedTxnsWithAD(txgroup.Transactions) + txgroupad := transactions.WrapSignedTxnsWithAD(txgroup) transactionGroupStartsTime := time.Time{} if recomputing { @@ -660,10 +597,10 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup pooldata.Sig stats.StopReason = telemetryspec.AssembleBlockAbandon pool.assemblyResults.stats = *stats pool.assemblyCond.Broadcast() - } else if err == ledger.ErrNoSpace || pool.isAssemblyTimedOut() { + } else if err == ledgercore.ErrNoSpace || pool.isAssemblyTimedOut() { pool.assemblyResults.ok = true pool.assemblyResults.assemblyCompletedOrAbandoned = true - if err == ledger.ErrNoSpace { + if err == ledgercore.ErrNoSpace { stats.StopReason = telemetryspec.AssembleBlockFull } else { stats.StopReason = telemetryspec.AssembleBlockTimeout @@ -690,9 +627,9 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup pooldata.Sig return err } -func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup pooldata.SignedTxGroup, recomputing bool, stats *telemetryspec.AssembleBlockMetrics) error { +func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup []transactions.SignedTxn, recomputing bool, stats *telemetryspec.AssembleBlockMetrics) error { err := pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats) - if err == ledger.ErrNoSpace { + if err == ledgercore.ErrNoSpace { pool.numPendingWholeBlocks++ pool.pendingBlockEvaluator.ResetTxnBytes() err = pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats) @@ -747,8 +684,12 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact if hint < 0 || int(knownCommitted) < 0 { hint = 0 } - pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint, pool.calculateMaxTxnBytesPerBlock(next.BlockHeader.CurrentProtocol)) + pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint, 0) if err != nil { + // The pendingBlockEvaluator is an interface, and in case of an evaluator error + // we want to remove the interface itself rather then keeping an interface + // to a nil. + pool.pendingBlockEvaluator = nil var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval if errors.As(err, &nonSeqBlockEval) { if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound { @@ -768,17 +709,17 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact // Feed the transactions in order for _, txgroup := range txgroups { - if len(txgroup.Transactions) == 0 { + if len(txgroup) == 0 { asmStats.InvalidCount++ continue } - if _, alreadyCommitted := committedTxIds[txgroup.Transactions[0].ID()]; alreadyCommitted { + if _, alreadyCommitted := committedTxIds[txgroup[0].ID()]; alreadyCommitted { asmStats.EarlyCommittedCount++ continue } err := pool.add(txgroup, &asmStats) if err != nil { - for _, tx := range txgroup.Transactions { + for _, tx := range txgroup { pool.statusCache.put(tx, err.Error()) } @@ -798,8 +739,6 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact stats.RemovedInvalidCount++ pool.log.Warnf("Cannot re-add pending transaction to pool: %v", err) } - } else if txgroup.LocallyOriginated { - pool.rememberedLatestLocal = txgroup.GroupCounter } } @@ -809,7 +748,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact // assembly. We want to figure out how long have we spent before trying to evaluate the first transaction. // ( ideally it's near zero. The goal here is to see if we get to a near time-out situation before processing the // first transaction group ) - asmStats.TransactionsLoopStartTime = int64(firstTxnGrpTime.Sub(pool.assemblyDeadline.Add(-config.ProposalAssemblyTime))) + asmStats.TransactionsLoopStartTime = int64(firstTxnGrpTime.Sub(pool.assemblyDeadline.Add(-pool.proposalAssemblyTime))) } if !pool.assemblyResults.ok && pool.assemblyRound <= pool.pendingBlockEvaluator.Round() { @@ -834,7 +773,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact // AssembleBlock assembles a block for a given round, trying not to // take longer than deadline to finish. -func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Time) (assembled *ledger.ValidatedBlock, err error) { +func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Time) (assembled *ledgercore.ValidatedBlock, err error) { var stats telemetryspec.AssembleBlockMetrics if pool.logAssembleStats { @@ -975,7 +914,7 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim // assembleEmptyBlock construct a new block for the given round. Internally it's using the ledger database calls, so callers // need to be aware that it might take a while before it would return. -func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *ledger.ValidatedBlock, err error) { +func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *ledgercore.ValidatedBlock, err error) { prevRound := round - 1 prev, err := pool.ledger.BlockHdr(prevRound) if err != nil { @@ -983,7 +922,7 @@ func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled * return nil, err } next := bookkeeping.MakeBlock(prev) - blockEval, err := pool.ledger.StartEvaluator(next.BlockHeader, 0, pool.calculateMaxTxnBytesPerBlock(next.BlockHeader.CurrentProtocol)) + blockEval, err := pool.ledger.StartEvaluator(next.BlockHeader, 0, 0) if err != nil { var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval if errors.As(err, &nonSeqBlockEval) { @@ -999,48 +938,8 @@ func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled * return blockEval.GenerateBlock() } -// SetDataExchangeRate updates the data exchange rate this node is expected to have. -func (pool *TransactionPool) SetDataExchangeRate(dataExchangeRate uint64) { - atomic.StoreUint64(&pool.latestMeasuredDataExchangeRate, dataExchangeRate) -} - -// calculateMaxTxnBytesPerBlock computes the optimal block size for the current node, based -// on it's effective network capabilities. This number is bound by the protocol MaxTxnBytesPerBlock. -func (pool *TransactionPool) calculateMaxTxnBytesPerBlock(consensusVersion protocol.ConsensusVersion) int { - // get the latest data exchange rate we received from the transaction sync. - dataExchangeRate := atomic.LoadUint64(&pool.latestMeasuredDataExchangeRate) - - // if we never received an update from the transaction sync connector about the data exchange rate, - // just let the evaluator use the consensus's default value. - if dataExchangeRate == 0 { - return 0 - } - - // get the consensus parameters for the given consensus version. - proto, ok := config.Consensus[consensusVersion] - if !ok { - // if we can't figure out the consensus version, just return 0. - return 0 - } - - // calculate the amount of data we can send in half of the agreement period. - halfMaxBlockSize := int(time.Duration(dataExchangeRate)*proto.AgreementFilterTimeoutPeriod0/time.Second) / 2 - - // if the amount of data is too high, bound it by the consensus parameters. - if halfMaxBlockSize > proto.MaxTxnBytesPerBlock { - return proto.MaxTxnBytesPerBlock - } - - // if the amount of data is too low, use the low transaction bytes threshold. - if halfMaxBlockSize < minMaxTxnBytesPerBlock { - return minMaxTxnBytesPerBlock - } - - return halfMaxBlockSize -} - // AssembleDevModeBlock assemble a new block from the existing transaction pool. The pending evaluator is being -func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledger.ValidatedBlock, err error) { +func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.ValidatedBlock, err error) { pool.mu.Lock() defer pool.mu.Unlock() @@ -1049,6 +948,6 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledger.Validated // The above was already pregenerating the entire block, // so there won't be any waiting on this call. - assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(config.ProposalAssemblyTime)) + assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(pool.proposalAssemblyTime)) return } |