diff options
Diffstat (limited to 'catchup/service.go')
-rw-r--r-- | catchup/service.go | 37 |
1 files changed, 31 insertions, 6 deletions
diff --git a/catchup/service.go b/catchup/service.go index f8f92b9b6..27ce957ba 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -18,6 +18,7 @@ package catchup import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -28,7 +29,6 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/logging/telemetryspec" @@ -58,8 +58,8 @@ type Ledger interface { LastRound() basics.Round Block(basics.Round) (bookkeeping.Block, error) IsWritingCatchpointFile() bool - Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledger.ValidatedBlock, error) - AddValidatedBlock(vb ledger.ValidatedBlock, cert agreement.Certificate) error + Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error) + AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error } // Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network. @@ -156,8 +156,19 @@ func (s *Service) SynchronizingTime() time.Duration { return time.Duration(timeInNS - startNS) } +// errLedgerAlreadyHasBlock is returned by innerFetch in case the local ledger already has the requested block. +var errLedgerAlreadyHasBlock = errors.New("ledger already has block") + // function scope to make a bunch of defer statements better func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeeping.Block, cert *agreement.Certificate, ddur time.Duration, err error) { + ledgerWaitCh := s.ledger.Wait(r) + select { + case <-ledgerWaitCh: + // if our ledger already have this block, no need to attempt to fetch it. + return nil, nil, time.Duration(0), errLedgerAlreadyHasBlock + default: + } + ctx, cf := context.WithCancel(s.ctx) fetcher := makeUniversalBlockFetcher(s.log, s.net, s.cfg) defer cf() @@ -166,11 +177,21 @@ func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeepin go func() { select { case <-stopWaitingForLedgerRound: - case <-s.ledger.Wait(r): + case <-ledgerWaitCh: cf() } }() - return fetcher.fetchBlock(ctx, r, peer) + blk, cert, ddur, err = fetcher.fetchBlock(ctx, r, peer) + // check to see if we aborted due to ledger. + if err != nil { + select { + case <-ledgerWaitCh: + // yes, we aborted since the ledger received this round. + err = errLedgerAlreadyHasBlock + default: + } + } + return } // fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary. @@ -219,6 +240,10 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, block, cert, blockDownloadDuration, err := s.innerFetch(r, peer) if err != nil { + if err == errLedgerAlreadyHasBlock { + // ledger already has the block, no need to request this block from anyone. + return true + } s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i) peerSelector.rankPeer(psp, peerRankDownloadFailed) // we've just failed to retrieve a block; wait until the previous block is fetched before trying again @@ -307,7 +332,7 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, } if s.cfg.CatchupVerifyTransactionSignatures() || s.cfg.CatchupVerifyApplyData() { - var vb *ledger.ValidatedBlock + var vb *ledgercore.ValidatedBlock vb, err = s.ledger.Validate(s.ctx, *block, s.blockValidationPool) if err != nil { if s.ctx.Err() != nil { |