summaryrefslogtreecommitdiff
path: root/catchup/service.go
diff options
context:
space:
mode:
Diffstat (limited to 'catchup/service.go')
-rw-r--r--catchup/service.go37
1 files changed, 31 insertions, 6 deletions
diff --git a/catchup/service.go b/catchup/service.go
index f8f92b9b6..27ce957ba 100644
--- a/catchup/service.go
+++ b/catchup/service.go
@@ -18,6 +18,7 @@ package catchup
import (
"context"
+ "errors"
"fmt"
"sync"
"sync/atomic"
@@ -28,7 +29,6 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
- "github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
@@ -58,8 +58,8 @@ type Ledger interface {
LastRound() basics.Round
Block(basics.Round) (bookkeeping.Block, error)
IsWritingCatchpointFile() bool
- Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledger.ValidatedBlock, error)
- AddValidatedBlock(vb ledger.ValidatedBlock, cert agreement.Certificate) error
+ Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
+ AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
}
// Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network.
@@ -156,8 +156,19 @@ func (s *Service) SynchronizingTime() time.Duration {
return time.Duration(timeInNS - startNS)
}
+// errLedgerAlreadyHasBlock is returned by innerFetch in case the local ledger already has the requested block.
+var errLedgerAlreadyHasBlock = errors.New("ledger already has block")
+
// function scope to make a bunch of defer statements better
func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeeping.Block, cert *agreement.Certificate, ddur time.Duration, err error) {
+ ledgerWaitCh := s.ledger.Wait(r)
+ select {
+ case <-ledgerWaitCh:
+ // if our ledger already have this block, no need to attempt to fetch it.
+ return nil, nil, time.Duration(0), errLedgerAlreadyHasBlock
+ default:
+ }
+
ctx, cf := context.WithCancel(s.ctx)
fetcher := makeUniversalBlockFetcher(s.log, s.net, s.cfg)
defer cf()
@@ -166,11 +177,21 @@ func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeepin
go func() {
select {
case <-stopWaitingForLedgerRound:
- case <-s.ledger.Wait(r):
+ case <-ledgerWaitCh:
cf()
}
}()
- return fetcher.fetchBlock(ctx, r, peer)
+ blk, cert, ddur, err = fetcher.fetchBlock(ctx, r, peer)
+ // check to see if we aborted due to ledger.
+ if err != nil {
+ select {
+ case <-ledgerWaitCh:
+ // yes, we aborted since the ledger received this round.
+ err = errLedgerAlreadyHasBlock
+ default:
+ }
+ }
+ return
}
// fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary.
@@ -219,6 +240,10 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
block, cert, blockDownloadDuration, err := s.innerFetch(r, peer)
if err != nil {
+ if err == errLedgerAlreadyHasBlock {
+ // ledger already has the block, no need to request this block from anyone.
+ return true
+ }
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
// we've just failed to retrieve a block; wait until the previous block is fetched before trying again
@@ -307,7 +332,7 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
}
if s.cfg.CatchupVerifyTransactionSignatures() || s.cfg.CatchupVerifyApplyData() {
- var vb *ledger.ValidatedBlock
+ var vb *ledgercore.ValidatedBlock
vb, err = s.ledger.Validate(s.ctx, *block, s.blockValidationPool)
if err != nil {
if s.ctx.Err() != nil {