summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGary <982483+gmalouf@users.noreply.github.com>2024-02-21 15:33:35 -0500
committerGitHub <noreply@github.com>2024-02-21 15:33:35 -0500
commit52964edd9795c4cacc7d107ac9f41f73374fce71 (patch)
treed203ae6f91aa09a0ff9d47294f86b5269c2885a3
parentd8c825d96b90b4f1c84a5b3771987ab53bfc8568 (diff)
Network: Class-based Peer Selector (#5937)
-rw-r--r--catchup/catchpointService.go80
-rw-r--r--catchup/classBasedPeerSelector.go156
-rw-r--r--catchup/classBasedPeerSelector_test.go498
-rw-r--r--catchup/peerSelector.go34
-rw-r--r--catchup/peerSelector_test.go22
-rw-r--r--catchup/service.go108
-rw-r--r--catchup/service_test.go111
7 files changed, 789 insertions, 220 deletions
diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go
index 2c4f6dfc4..3c11d0db7 100644
--- a/catchup/catchpointService.go
+++ b/catchup/catchpointService.go
@@ -18,6 +18,7 @@ package catchup
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
@@ -69,7 +70,7 @@ type CatchpointCatchupStats struct {
type CatchpointCatchupService struct {
// stats is the statistics object, updated async while downloading the ledger
stats CatchpointCatchupStats
- // statsMu synchronizes access to stats, as we could attempt to update it while querying for it's current state
+ // statsMu synchronizes access to stats, as we could attempt to update it while querying for its current state
statsMu deadlock.Mutex
node CatchpointCatchupNodeServices
// ctx is the node cancellation context, used when the node is being stopped.
@@ -98,7 +99,7 @@ type CatchpointCatchupService struct {
abortCtx context.Context
abortCtxFunc context.CancelFunc
// blocksDownloadPeerSelector is the peer selector used for downloading blocks.
- blocksDownloadPeerSelector *peerSelector
+ blocksDownloadPeerSelector peerSelector
}
// MakeResumedCatchpointCatchupService creates a catchpoint catchup service for a node that is already in catchpoint catchup mode
@@ -280,51 +281,50 @@ func (cs *CatchpointCatchupService) processStageInactive() (err error) {
}
// processStageLedgerDownload is the second catchpoint catchup stage. It downloads the ledger.
-func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
+func (cs *CatchpointCatchupService) processStageLedgerDownload() error {
cs.statsMu.Lock()
label := cs.stats.CatchpointLabel
cs.statsMu.Unlock()
- round, _, err0 := ledgercore.ParseCatchpointLabel(label)
+ round, _, err := ledgercore.ParseCatchpointLabel(label)
- if err0 != nil {
- return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err0))
+ if err != nil {
+ return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err))
}
// download balances file.
- peerSelector := cs.makeCatchpointPeerSelector()
- ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
+ lf := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
attemptsCount := 0
for {
attemptsCount++
- err = cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
- if err != nil {
+ err0 := cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
+ if err0 != nil {
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
- return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err))
+ return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err0))
}
- psp, err := peerSelector.getNextPeer()
- if err != nil {
- err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
- return cs.abort(err)
+ psp, err0 := cs.blocksDownloadPeerSelector.getNextPeer()
+ if err0 != nil {
+ err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
+ return cs.abort(err0)
}
peer := psp.Peer
start := time.Now()
- err = ledgerFetcher.downloadLedger(cs.ctx, peer, round)
- if err == nil {
+ err0 = lf.downloadLedger(cs.ctx, peer, round)
+ if err0 == nil {
cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second)
start = time.Now()
- err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
- if err == nil {
+ err0 = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
+ if err0 == nil {
cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second)
break
}
// failed to build the merkle trie for the above catchpoint file.
- peerSelector.rankPeer(psp, peerRankInvalidDownload)
+ cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
} else {
- peerSelector.rankPeer(psp, peerRankDownloadFailed)
+ cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)
}
// instead of testing for err == cs.ctx.Err() , we'll check on the context itself.
@@ -335,10 +335,10 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
}
if attemptsCount >= cs.config.CatchupLedgerDownloadRetryAttempts {
- err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
- return cs.abort(err)
+ err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
+ return cs.abort(err0)
}
- cs.log.Warnf("unable to download ledger : %v", err)
+ cs.log.Warnf("unable to download ledger : %v", err0)
}
err = cs.updateStage(ledger.CatchpointCatchupStateLatestBlockDownload)
@@ -506,14 +506,14 @@ func lookbackForStateproofsSupport(topBlock *bookkeeping.Block) uint64 {
return uint64(topBlock.Round().SubSaturate(lowestStateProofRound))
}
-// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against it's predecessor.
+// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the remainder of the blocks, verifying each one of them against its predecessor.
func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
topBlock, err := cs.ledgerAccessor.EnsureFirstBlock(cs.ctx)
if err != nil {
return cs.abort(fmt.Errorf("processStageBlocksDownload failed, unable to ensure first block : %v", err))
}
- // pick the lookback with the greater of
+ // pick the lookback with the greatest of
// either (MaxTxnLife+DeeperBlockHeaderHistory+CatchpointLookback) or MaxBalLookback
// Explanation:
// 1. catchpoint snapshots accounts at round X-CatchpointLookback
@@ -531,13 +531,13 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
}
// in case the effective lookback is going before our rounds count, trim it there.
- // ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback..MaxTxnLife)
+ // ( a catchpoint is generated starting round MaxBalLookback, and this is possible in any round in the range of MaxBalLookback...MaxTxnLife)
if lookback >= uint64(topBlock.Round()) {
lookback = uint64(topBlock.Round() - 1)
}
cs.statsMu.Lock()
- cs.stats.TotalBlocks = uint64(lookback)
+ cs.stats.TotalBlocks = lookback
cs.stats.AcquiredBlocks = 0
cs.stats.VerifiedBlocks = 0
cs.statsMu.Unlock()
@@ -558,8 +558,9 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
blk = &ledgerBlock
cert = &ledgerCert
} else {
- switch err0.(type) {
- case ledgercore.ErrNoEntry:
+ var errNoEntry ledgercore.ErrNoEntry
+ switch {
+ case errors.As(err0, &errNoEntry):
// this is expected, ignore this one.
default:
cs.log.Warnf("processStageBlocksDownload encountered the following error when attempting to retrieve the block for round %d : %v", topBlock.Round()-basics.Round(blocksFetched), err0)
@@ -658,7 +659,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
if err != nil {
- if err == errPeerSelectorNoPeerPoolsAvailable {
+ if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {
cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
// this is a possible on startup, since the network package might have yet to retrieve the list of peers.
time.Sleep(noPeersAvailableSleepInterval)
@@ -718,7 +719,7 @@ func (cs *CatchpointCatchupService) processStageSwitch() (err error) {
// stopOrAbort is called when any of the stage processing function sees that cs.ctx has been canceled. It can be
// due to the end user attempting to abort the current catchpoint catchup operation or due to a node shutdown.
func (cs *CatchpointCatchupService) stopOrAbort() error {
- if cs.abortCtx.Err() == context.Canceled {
+ if errors.Is(cs.abortCtx.Err(), context.Canceled) {
return cs.abort(context.Canceled)
}
return nil
@@ -749,7 +750,7 @@ func (cs *CatchpointCatchupService) updateStage(newStage ledger.CatchpointCatchu
return nil
}
-// updateNodeCatchupMode requests the node to change it's operational mode from
+// updateNodeCatchupMode requests the node to change its operational mode from
// catchup mode to normal mode and vice versa.
func (cs *CatchpointCatchupService) updateNodeCatchupMode(catchupModeEnabled bool) {
newCtxCh := cs.node.SetCatchpointCatchupMode(catchupModeEnabled)
@@ -802,15 +803,7 @@ func (cs *CatchpointCatchupService) updateBlockRetrievalStatistics(acquiredBlock
}
func (cs *CatchpointCatchupService) initDownloadPeerSelector() {
- cs.blocksDownloadPeerSelector = cs.makeCatchpointPeerSelector()
-}
-
-func (cs *CatchpointCatchupService) makeCatchpointPeerSelector() *peerSelector {
- return makePeerSelector(
- cs.net,
- []peerClass{
- {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays},
- })
+ cs.blocksDownloadPeerSelector = makeCatchpointPeerSelector(cs.net)
}
// checkLedgerDownload sends a HEAD request to the ledger endpoint of peers to validate the catchpoint's availability
@@ -821,10 +814,9 @@ func (cs *CatchpointCatchupService) checkLedgerDownload() error {
if err != nil {
return fmt.Errorf("failed to parse catchpoint label : %v", err)
}
- peerSelector := cs.makeCatchpointPeerSelector()
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
for i := 0; i < cs.config.CatchupLedgerDownloadRetryAttempts; i++ {
- psp, peerError := peerSelector.getNextPeer()
+ psp, peerError := cs.blocksDownloadPeerSelector.getNextPeer()
if peerError != nil {
return err
}
diff --git a/catchup/classBasedPeerSelector.go b/catchup/classBasedPeerSelector.go
new file mode 100644
index 000000000..9ab9e6d71
--- /dev/null
+++ b/catchup/classBasedPeerSelector.go
@@ -0,0 +1,156 @@
+// Copyright (C) 2019-2024 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package catchup
+
+import (
+ "errors"
+ "github.com/algorand/go-algorand/network"
+ "github.com/algorand/go-deadlock"
+ "time"
+)
+
+// classBasedPeerSelector is a peerSelector that tracks and ranks classes of peers based on their response behavior.
+// It is used to select the most appropriate peers to download blocks from - this is most useful when catching up
+// and needing to figure out whether the blocks can be retrieved from relay nodes or require archive nodes.
+// The ordering of the peerSelectors directly determines the priority of the classes of peers.
+type classBasedPeerSelector struct {
+ mu deadlock.Mutex
+ peerSelectors []*wrappedPeerSelector
+}
+
+func makeClassBasedPeerSelector(peerSelectors []*wrappedPeerSelector) *classBasedPeerSelector {
+ return &classBasedPeerSelector{
+ peerSelectors: peerSelectors,
+ }
+}
+
+func (c *classBasedPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ oldRank, newRank := -1, -1
+ for _, wp := range c.peerSelectors {
+ // See if the peer is in the class, ranking it appropriately if so
+ if psp.peerClass != wp.peerClass {
+ continue
+ }
+
+ oldRank, newRank = wp.peerSelector.rankPeer(psp, rank)
+ if oldRank < 0 || newRank < 0 {
+ // Peer not found in this selector
+ continue
+ }
+
+ // Peer was in this class, if there was any kind of download issue, we increment the failure count
+ if rank >= peerRankNoBlockForRound {
+ wp.downloadFailures++
+ }
+
+ break
+ }
+
+ return oldRank, newRank
+}
+
+func (c *classBasedPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for _, wp := range c.peerSelectors {
+ rank = wp.peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
+ // If rank is peerRankInvalidDownload, we check the next class's rankPooledPeerSelector
+ if rank >= peerRankInvalidDownload {
+ continue
+ }
+ // Should be a legit ranking, we return it
+ return rank
+ }
+ // If we reached here, we have exhausted all classes without finding the peer
+ return peerRankInvalidDownload
+}
+
+func (c *classBasedPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ return c.internalGetNextPeer(0)
+}
+
+// internalGetNextPeer is a helper function that should be called with the lock held
+func (c *classBasedPeerSelector) internalGetNextPeer(recurseCount int8) (psp *peerSelectorPeer, err error) {
+ // Safety check to prevent infinite recursion
+ if recurseCount > 1 {
+ return nil, errPeerSelectorNoPeerPoolsAvailable
+ }
+ selectorDisabledCount := 0
+ for _, wp := range c.peerSelectors {
+ if wp.downloadFailures > wp.toleranceFactor {
+ // peerSelector is disabled for now, we move to the next one
+ selectorDisabledCount++
+ continue
+ }
+ psp, err = wp.peerSelector.getNextPeer()
+
+ if err != nil {
+ // This is mostly just future-proofing, as we don't expect any other errors from getNextPeer
+ if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {
+ // We penalize this class the equivalent of one download failure (in case this is transient)
+ wp.downloadFailures++
+ }
+ continue
+ }
+ return psp, nil
+ }
+ // If we reached here, we have exhausted all classes and still have no peers
+ // IFF all classes are disabled, we reset the downloadFailures for all classes and start over
+ if len(c.peerSelectors) != 0 && selectorDisabledCount == len(c.peerSelectors) {
+ for _, wp := range c.peerSelectors {
+ wp.downloadFailures = 0
+ }
+ // Recurse to try again, we should have at least one class enabled now
+ return c.internalGetNextPeer(recurseCount + 1)
+ }
+ // If we reached here, we have exhausted all classes without finding a peer, not due to all classes being disabled
+ return nil, errPeerSelectorNoPeerPoolsAvailable
+}
+
+type wrappedPeerSelector struct {
+ peerSelector peerSelector // The underlying peerSelector for this class
+ peerClass network.PeerOption // The class of peers the peerSelector is responsible for
+ toleranceFactor int // The number of times we can net fail for any reason before we move to the next class's rankPooledPeerSelector
+ downloadFailures int // The number of times we have failed to download a block from this class's rankPooledPeerSelector since it was last reset
+}
+
+// makeCatchpointPeerSelector returns a classBasedPeerSelector that selects peers based on their class and response behavior.
+// These are the preferred configurations for the catchpoint service.
+func makeCatchpointPeerSelector(net peersRetriever) peerSelector {
+ wrappedPeerSelectors := []*wrappedPeerSelector{
+ {
+ peerClass: network.PeersPhonebookRelays,
+ peerSelector: makeRankPooledPeerSelector(net,
+ []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays}}),
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookArchivalNodes,
+ peerSelector: makeRankPooledPeerSelector(net,
+ []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}),
+ toleranceFactor: 10,
+ },
+ }
+
+ return makeClassBasedPeerSelector(wrappedPeerSelectors)
+}
diff --git a/catchup/classBasedPeerSelector_test.go b/catchup/classBasedPeerSelector_test.go
new file mode 100644
index 000000000..0110663f8
--- /dev/null
+++ b/catchup/classBasedPeerSelector_test.go
@@ -0,0 +1,498 @@
+// Copyright (C) 2019-2024 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package catchup
+
+import (
+ "github.com/algorand/go-algorand/network"
+ "github.com/algorand/go-algorand/test/partitiontest"
+ "github.com/stretchr/testify/require"
+ "testing"
+ "time"
+)
+
+// Use to mock the wrapped peer selectors where warranted
+type mockPeerSelector struct {
+ mockRankPeer func(psp *peerSelectorPeer, rank int) (int, int)
+ mockPeerDownloadDurationToRank func(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int)
+ mockGetNextPeer func() (psp *peerSelectorPeer, err error)
+}
+
+func (m mockPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
+ return m.mockRankPeer(psp, rank)
+}
+
+func (m mockPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
+ return m.mockPeerDownloadDurationToRank(psp, blockDownloadDuration)
+}
+
+func (m mockPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
+ return m.mockGetNextPeer()
+}
+
+func TestClassBasedPeerSelector_makeClassBasedPeerSelector(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ wrappedPeerSelectors := []*wrappedPeerSelector{
+ {
+ peerClass: network.PeersPhonebookRelays,
+ peerSelector: mockPeerSelector{},
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersConnectedOut,
+ peerSelector: mockPeerSelector{},
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookArchivalNodes,
+ peerSelector: mockPeerSelector{},
+ toleranceFactor: 10,
+ },
+ }
+
+ cps := makeClassBasedPeerSelector(wrappedPeerSelectors)
+
+ // The selectors should be sorted by priority
+ require.Equal(t, 3, len(cps.peerSelectors))
+ require.Equal(t, network.PeersPhonebookRelays, cps.peerSelectors[0].peerClass)
+ require.Equal(t, network.PeersConnectedOut, cps.peerSelectors[1].peerClass)
+ require.Equal(t, network.PeersPhonebookArchivalNodes, cps.peerSelectors[2].peerClass)
+}
+
+func TestClassBasedPeerSelector_rankPeer(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ mockPeer := &peerSelectorPeer{
+ peerClass: network.PeersPhonebookRelays,
+ }
+
+ // Create a class based peer selector initially with the first wrapped peer selector not having the peer,
+ // second one having it, and a third one not having it
+ wrappedPeerSelectors := []*wrappedPeerSelector{
+ {
+ peerClass: network.PeersConnectedOut,
+ peerSelector: mockPeerSelector{
+ mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) {
+ return -1, -1
+ },
+ },
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookRelays,
+ peerSelector: mockPeerSelector{
+ mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) {
+ if psp == mockPeer {
+ return 10, rank
+ }
+ return -1, -1
+ },
+ },
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookArchivalNodes,
+ peerSelector: mockPeerSelector{
+ mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) {
+ return -1, -1
+ },
+ },
+ toleranceFactor: 3,
+ },
+ }
+ cps := makeClassBasedPeerSelector(wrappedPeerSelectors)
+
+ // Peer is found in second selector, rank is within range for a block found
+ oldRank, newRank := cps.rankPeer(mockPeer, 50)
+
+ require.Equal(t, 10, oldRank)
+ require.Equal(t, 50, newRank)
+ require.Equal(t, 0, cps.peerSelectors[1].downloadFailures)
+
+ // Peer is found in second selector, rank is >= peerRankNoBlockForRound
+ oldRank, newRank = cps.rankPeer(mockPeer, peerRankNoBlockForRound)
+
+ require.Equal(t, 10, oldRank)
+ require.Equal(t, peerRankNoBlockForRound, newRank)
+ require.Equal(t, 1, cps.peerSelectors[1].downloadFailures)
+
+ // We fail to find a block for round 3 more times, download failures should reflect that.
+ cps.rankPeer(mockPeer, peerRankNoBlockForRound)
+ oldRank, newRank = cps.rankPeer(mockPeer, peerRankNoBlockForRound)
+
+ require.Equal(t, 10, oldRank)
+ require.Equal(t, peerRankNoBlockForRound, newRank)
+ require.Equal(t, 3, cps.peerSelectors[1].downloadFailures)
+
+ oldRank, newRank = cps.rankPeer(mockPeer, peerRankNoBlockForRound)
+ require.Equal(t, 10, oldRank)
+ require.Equal(t, peerRankNoBlockForRound, newRank)
+ require.Equal(t, 4, cps.peerSelectors[1].downloadFailures)
+
+ // Now, feed peers that are not in any of the selectors - it should return -1, -1
+ mockPeer2 := &peerSelectorPeer{
+ peerClass: network.PeersConnectedIn,
+ }
+
+ oldRank, newRank = cps.rankPeer(mockPeer2, 50)
+ require.Equal(t, -1, oldRank)
+ require.Equal(t, -1, newRank)
+
+ // While this will match class, the selectors will not have it
+ mockPeer3 := &peerSelectorPeer{
+ peerClass: network.PeersConnectedOut,
+ }
+
+ oldRank, newRank = cps.rankPeer(mockPeer3, 50)
+ require.Equal(t, -1, oldRank)
+ require.Equal(t, -1, newRank)
+
+ // Last sanity check, we should have zero download failures for the first and third selectors
+ require.Equal(t, 0, cps.peerSelectors[0].downloadFailures)
+ require.Equal(t, 0, cps.peerSelectors[2].downloadFailures)
+}
+
+func TestClassBasedPeerSelector_peerDownloadDurationToRank(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ mockPeer := &peerSelectorPeer{}
+ testDuration := 50 * time.Millisecond
+
+ // Create a class based peer selector initially with the first wrapped peer selector not having the peer,
+ // second one having it, and a third one not having it
+ wrappedPeerSelectors := []*wrappedPeerSelector{
+ {
+ peerClass: network.PeersConnectedOut,
+ peerSelector: mockPeerSelector{
+ mockPeerDownloadDurationToRank: func(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
+ return peerRankInvalidDownload
+ },
+ },
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookRelays,
+ peerSelector: mockPeerSelector{
+ mockPeerDownloadDurationToRank: func(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
+ if psp == mockPeer && blockDownloadDuration == testDuration {
+ return peerRank0HighBlockTime
+ }
+ return peerRankInvalidDownload
+ },
+ },
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookArchivalNodes,
+ peerSelector: mockPeerSelector{
+ mockPeerDownloadDurationToRank: func(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
+ return peerRankInvalidDownload
+ },
+ },
+ toleranceFactor: 3,
+ },
+ }
+ cps := makeClassBasedPeerSelector(wrappedPeerSelectors)
+
+ // The peer is found in the second selector, so the rank should be peerRank0HighBlockTime
+ rank := cps.peerDownloadDurationToRank(mockPeer, testDuration)
+ require.Equal(t, peerRank0HighBlockTime, rank)
+
+ // The peer is not found in any of the selectors, so the rank should be peerRankInvalidDownload
+ mockPeer2 := &peerSelectorPeer{}
+
+ rank = cps.peerDownloadDurationToRank(mockPeer2, testDuration)
+ require.Equal(t, peerRankInvalidDownload, rank)
+}
+
+func TestClassBasedPeerSelector_getNextPeer(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ mockPeer := &peerSelectorPeer{
+ peerClass: network.PeersPhonebookRelays,
+ }
+
+ // Create a class based peer selector initially with the first wrapped peer selector not having any peers,
+ // second one having a peer, and a third one not having any peers
+ wrappedPeerSelectors := []*wrappedPeerSelector{
+ {
+ peerClass: network.PeersConnectedOut,
+ peerSelector: mockPeerSelector{
+ mockGetNextPeer: func() (psp *peerSelectorPeer, err error) {
+ return nil, errPeerSelectorNoPeerPoolsAvailable
+ },
+ },
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookRelays,
+ peerSelector: mockPeerSelector{
+ mockGetNextPeer: func() (psp *peerSelectorPeer, err error) {
+ return mockPeer, nil
+ },
+ },
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookArchivalNodes,
+ peerSelector: mockPeerSelector{
+ mockGetNextPeer: func() (psp *peerSelectorPeer, err error) {
+ return nil, errPeerSelectorNoPeerPoolsAvailable
+ },
+ },
+ toleranceFactor: 3,
+ },
+ }
+
+ cps := makeClassBasedPeerSelector(wrappedPeerSelectors)
+
+ peerResult, err := cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockPeer, peerResult)
+
+ // Update selector to not return any peers
+ wrappedPeerSelectors[1].peerSelector = mockPeerSelector{
+ mockGetNextPeer: func() (psp *peerSelectorPeer, err error) {
+ return nil, errPeerSelectorNoPeerPoolsAvailable
+ },
+ }
+
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, peerResult)
+ require.Equal(t, errPeerSelectorNoPeerPoolsAvailable, err)
+
+ // Create a class based peer selector initially with all wrapped peer selectors having peers.
+ // The peers should always come from the first one repeatedly since rankings are not changed.
+ mockPeer = &peerSelectorPeer{
+ peerClass: network.PeersConnectedOut,
+ }
+ mockPeer2 := &peerSelectorPeer{
+ peerClass: network.PeersPhonebookRelays,
+ }
+ mockPeer3 := &peerSelectorPeer{
+ peerClass: network.PeersPhonebookArchivalNodes,
+ }
+
+ wrappedPeerSelectors = []*wrappedPeerSelector{
+ {
+ peerClass: network.PeersConnectedOut,
+ peerSelector: mockPeerSelector{
+ mockGetNextPeer: func() (psp *peerSelectorPeer, err error) {
+ return mockPeer, nil
+ },
+ mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) {
+ if psp == mockPeer {
+ return 10, rank
+ }
+ return -1, -1
+ },
+ },
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookRelays,
+ peerSelector: mockPeerSelector{
+ mockGetNextPeer: func() (psp *peerSelectorPeer, err error) {
+ return mockPeer2, nil
+ },
+ mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) {
+ if psp == mockPeer2 {
+ return 10, rank
+ }
+ return -1, -1
+ },
+ },
+ toleranceFactor: 10,
+ },
+ {
+ peerClass: network.PeersPhonebookArchivalNodes,
+ peerSelector: mockPeerSelector{
+ mockGetNextPeer: func() (psp *peerSelectorPeer, err error) {
+ return mockPeer3, nil
+ },
+ mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) {
+ if psp == mockPeer3 {
+ return 10, rank
+ }
+ return -1, -1
+ },
+ },
+ toleranceFactor: 3,
+ },
+ }
+
+ cps = makeClassBasedPeerSelector(wrappedPeerSelectors)
+
+ // We should always get the peer from the top priority selector since rankings are not updated/list is not re-sorted.
+ for i := 0; i < 10; i++ {
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockPeer, peerResult)
+ }
+
+ // Okay, record enough download failures to disable the first selector
+ for i := 0; i < 4; i++ {
+ cps.rankPeer(mockPeer, peerRankNoBlockForRound)
+ }
+
+ // Now, we should get the peer from the second selector
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockPeer2, peerResult)
+
+ // Sanity check the download failures for each selector
+ require.Equal(t, 4, cps.peerSelectors[0].downloadFailures)
+ require.Equal(t, 0, cps.peerSelectors[1].downloadFailures)
+ require.Equal(t, 0, cps.peerSelectors[2].downloadFailures)
+
+ // Now, record download failures just up to the tolerance factor for the second selector
+ for i := 0; i < 10; i++ {
+ cps.rankPeer(mockPeer2, peerRankNoBlockForRound)
+ }
+
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockPeer2, peerResult)
+
+ // One more should push us to the third selector
+ cps.rankPeer(mockPeer2, peerRankNoBlockForRound)
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockPeer3, peerResult)
+
+ // Check the download failures for each selector
+ require.Equal(t, 4, cps.peerSelectors[0].downloadFailures)
+ require.Equal(t, 11, cps.peerSelectors[1].downloadFailures)
+ require.Equal(t, 0, cps.peerSelectors[2].downloadFailures)
+
+ // Now, record download failures just up to the tolerance factor for the third selector
+ for i := 0; i < 3; i++ {
+ cps.rankPeer(mockPeer3, peerRankNoBlockForRound)
+ }
+
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockPeer3, peerResult)
+
+ require.Equal(t, 4, cps.peerSelectors[0].downloadFailures)
+ require.Equal(t, 11, cps.peerSelectors[1].downloadFailures)
+ require.Equal(t, 3, cps.peerSelectors[2].downloadFailures)
+
+ // One more failure should reset ALL download failures (and grab a peer from the first selector)
+ cps.rankPeer(mockPeer3, peerRankNoBlockForRound)
+
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockPeer, peerResult)
+
+ // Check the download failures for each selector, should have been reset
+ require.Equal(t, 0, cps.peerSelectors[0].downloadFailures)
+ require.Equal(t, 0, cps.peerSelectors[1].downloadFailures)
+ require.Equal(t, 0, cps.peerSelectors[2].downloadFailures)
+}
+
+func TestClassBasedPeerSelector_integration(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ mockP1Peer := mockHTTPPeer{address: "p1"}
+ mockP2Peer := mockHTTPPeer{address: "p2"}
+
+ mockP1WrappedPeer := &peerSelectorPeer{&mockP1Peer, network.PeersPhonebookRelays}
+ mockP2WrappedPeer := &peerSelectorPeer{&mockP2Peer, network.PeersPhonebookArchivalNodes}
+
+ net := makePeersRetrieverStub(func(options ...network.PeerOption) []network.Peer {
+ if len(options) > 0 {
+ switch options[0] {
+ case network.PeersPhonebookRelays:
+ return []network.Peer{&mockP1Peer}
+ case network.PeersPhonebookArchivalNodes:
+ return []network.Peer{&mockP2Peer}
+ default:
+ return []network.Peer{&mockP1Peer, &mockP2Peer}
+ }
+ }
+ return nil
+ })
+ // Create a class based peer selector with a few wrapped peer selectors
+ cps := makeCatchpointPeerSelector(net).(*classBasedPeerSelector)
+
+ // We should get the peer from the first priority selector, PeersPhonebookRelays
+ peerResult, err := cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockP1WrappedPeer, peerResult)
+
+ // Normal expected usage: rank the peer
+ durationRank := cps.peerDownloadDurationToRank(mockP1WrappedPeer, 500)
+ oldRank, newRank := cps.rankPeer(mockP1WrappedPeer, durationRank)
+
+ require.Equal(t, 0, oldRank)
+ require.Equal(t, durationRank, newRank)
+
+ // Let's simulate a few download failures (not enough to disable the selector)
+ for i := 0; i < 3; i++ {
+ expectedOldRank := newRank
+ peerResult, err = cps.getNextPeer()
+
+ require.Nil(t, err)
+ require.Equal(t, mockP1WrappedPeer, peerResult)
+
+ oldRank, newRank = cps.rankPeer(mockP1WrappedPeer, peerRankNoBlockForRound)
+
+ require.Equal(t, expectedOldRank, oldRank)
+ // Should be increasing with no block penalties
+ require.True(t, newRank >= oldRank)
+ }
+
+ // Sanity check, still should be the same peer (from phonebook selector)
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockP1WrappedPeer, peerResult)
+
+ // Rank the peer to follow normal usage
+ durationRank = cps.peerDownloadDurationToRank(mockP1WrappedPeer, 500)
+ expectedOldRank := newRank
+ oldRank, newRank = cps.rankPeer(mockP1WrappedPeer, durationRank)
+
+ require.Equal(t, expectedOldRank, oldRank)
+ // Rank should not go up after successful download
+ require.True(t, newRank <= oldRank)
+
+ // Now, let's simulate enough download failures to disable the first selector
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockP1WrappedPeer, peerResult)
+ cps.rankPeer(mockP1WrappedPeer, peerRankNoBlockForRound)
+
+ peerResult, err = cps.getNextPeer()
+ require.Nil(t, err)
+ require.Equal(t, mockP2WrappedPeer, peerResult)
+
+ // Normal expected usage: rank the peer
+ durationRank = cps.peerDownloadDurationToRank(mockP2WrappedPeer, 500)
+ oldRank, newRank = cps.rankPeer(mockP2WrappedPeer, durationRank)
+
+ require.Equal(t, 0, oldRank)
+ require.Equal(t, durationRank, newRank)
+
+ require.Equal(t, 4, cps.peerSelectors[0].downloadFailures)
+ require.Equal(t, 0, cps.peerSelectors[1].downloadFailures)
+}
diff --git a/catchup/peerSelector.go b/catchup/peerSelector.go
index 4ceda8d42..148529558 100644
--- a/catchup/peerSelector.go
+++ b/catchup/peerSelector.go
@@ -88,7 +88,7 @@ type peerClass struct {
peerClass network.PeerOption
}
-// the peersRetriever is a subset of the network.GossipNode used to ensure that we can create an instance of the peerSelector
+// the peersRetriever is a subset of the network.GossipNode used to ensure that we can create an instance of the rankPooledPeerSelector
// for testing purposes, providing just the above function.
type peersRetriever interface {
// Get a list of Peers we could potentially send a direct message to.
@@ -109,14 +109,20 @@ type peerPool struct {
peers []peerPoolEntry
}
-// peerSelector is a helper struct used to select the next peer to try and connect to
+type peerSelector interface {
+ rankPeer(psp *peerSelectorPeer, rank int) (int, int)
+ peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int)
+ getNextPeer() (psp *peerSelectorPeer, err error)
+}
+
+// rankPooledPeerSelector is a helper struct used to select the next peer to try and connect to
// for various catchup purposes. Unlike the underlying network GetPeers(), it allows the
// client to provide feedback regarding the peer's performance, and to have the subsequent
// query(s) take advantage of that intel.
-type peerSelector struct {
+type rankPooledPeerSelector struct {
mu deadlock.Mutex
net peersRetriever
- // peerClasses is the list of peer classes we want to have in the peerSelector.
+ // peerClasses is the list of peer classes we want to have in the rankPooledPeerSelector.
peerClasses []peerClass
// pools is the list of peer pools, each pool contains a list of peers with the same rank.
pools []peerPool
@@ -284,9 +290,9 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera
return bounded
}
-// makePeerSelector creates a peerSelector, given a peersRetriever and peerClass array.
-func makePeerSelector(net peersRetriever, initialPeersClasses []peerClass) *peerSelector {
- selector := &peerSelector{
+// makeRankPooledPeerSelector creates a rankPooledPeerSelector, given a peersRetriever and peerClass array.
+func makeRankPooledPeerSelector(net peersRetriever, initialPeersClasses []peerClass) *rankPooledPeerSelector {
+ selector := &rankPooledPeerSelector{
net: net,
peerClasses: initialPeersClasses,
}
@@ -296,7 +302,7 @@ func makePeerSelector(net peersRetriever, initialPeersClasses []peerClass) *peer
// getNextPeer returns the next peer. It randomly selects a peer from a pool that has
// the lowest rank value. Given that the peers are grouped by their ranks, allow us to
// prioritize peers based on their class and/or performance.
-func (ps *peerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
+func (ps *rankPooledPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.refreshAvailablePeers()
@@ -317,7 +323,7 @@ func (ps *peerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
// rankPeer ranks a given peer.
// return the old value and the new updated value.
// updated value could be different from the input rank.
-func (ps *peerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
+func (ps *rankPooledPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
if psp == nil {
return -1, -1
}
@@ -384,7 +390,7 @@ func (ps *peerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
}
// peerDownloadDurationToRank calculates the rank for a peer given a peer and the block download time.
-func (ps *peerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
+func (ps *rankPooledPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
ps.mu.Lock()
defer ps.mu.Unlock()
poolIdx, peerIdx := ps.findPeer(psp)
@@ -409,7 +415,7 @@ func (ps *peerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockD
// addToPool adds a given peer to the correct group. If no group exists for that peer's rank,
// a new group is created.
// The method returns true if a new group was created ( suggesting that the pools list would need to be re-ordered ), or false otherwise.
-func (ps *peerSelector) addToPool(peer network.Peer, rank int, class peerClass, peerHistory *historicStats) bool {
+func (ps *rankPooledPeerSelector) addToPool(peer network.Peer, rank int, class peerClass, peerHistory *historicStats) bool {
// see if we already have a list with that rank:
for i, pool := range ps.pools {
if pool.rank == rank {
@@ -423,7 +429,7 @@ func (ps *peerSelector) addToPool(peer network.Peer, rank int, class peerClass,
}
// sort the pools array in an ascending order according to the rank of each pool.
-func (ps *peerSelector) sort() {
+func (ps *rankPooledPeerSelector) sort() {
sort.SliceStable(ps.pools, func(i, j int) bool {
return ps.pools[i].rank < ps.pools[j].rank
})
@@ -443,7 +449,7 @@ func peerAddress(peer network.Peer) string {
// refreshAvailablePeers reloads the available peers from the network package, adds new peers along with their
// corresponding initial rank, and deletes peers that have been dropped by the network package.
-func (ps *peerSelector) refreshAvailablePeers() {
+func (ps *rankPooledPeerSelector) refreshAvailablePeers() {
existingPeers := make(map[network.PeerOption]map[string]bool)
for _, pool := range ps.pools {
for _, localPeer := range pool.peers {
@@ -501,7 +507,7 @@ func (ps *peerSelector) refreshAvailablePeers() {
// findPeer look into the peer pool and find the given peer.
// The method returns the pool and peer indices if a peer was found, or (-1, -1) otherwise.
-func (ps *peerSelector) findPeer(psp *peerSelectorPeer) (poolIdx, peerIdx int) {
+func (ps *rankPooledPeerSelector) findPeer(psp *peerSelectorPeer) (poolIdx, peerIdx int) {
peerAddr := peerAddress(psp.Peer)
if peerAddr == "" {
return -1, -1
diff --git a/catchup/peerSelector_test.go b/catchup/peerSelector_test.go
index aa8d348d4..7aa373d28 100644
--- a/catchup/peerSelector_test.go
+++ b/catchup/peerSelector_test.go
@@ -131,7 +131,7 @@ func TestPeerSelector_RankPeer(t *testing.T) {
peers := []network.Peer{&mockHTTPPeer{address: "12345"}}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) []network.Peer {
return peers
}), []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}},
@@ -191,7 +191,7 @@ func TestPeerSelector_PeerDownloadRanking(t *testing.T) {
peers1 := []network.Peer{&mockHTTPPeer{address: "1234"}, &mockHTTPPeer{address: "5678"}}
peers2 := []network.Peer{&mockHTTPPeer{address: "abcd"}, &mockHTTPPeer{address: "efgh"}}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) {
for _, opt := range options {
if opt == network.PeersPhonebookArchivalNodes {
@@ -240,7 +240,7 @@ func TestPeerSelector_FindMissingPeer(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) []network.Peer {
return []network.Peer{}
}), []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}},
@@ -258,7 +258,7 @@ func TestPeerSelector_HistoricData(t *testing.T) {
peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}}
peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) {
for _, opt := range options {
if opt == network.PeersPhonebookArchivalNodes {
@@ -332,7 +332,7 @@ func TestPeerSelector_PeersDownloadFailed(t *testing.T) {
peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}}
peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) {
for _, opt := range options {
if opt == network.PeersPhonebookArchivalNodes {
@@ -408,7 +408,7 @@ func TestPeerSelector_Penalty(t *testing.T) {
peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}}
peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) {
for _, opt := range options {
if opt == network.PeersPhonebookArchivalNodes {
@@ -469,7 +469,7 @@ func TestPeerSelector_PeerDownloadDurationToRank(t *testing.T) {
peers3 := []network.Peer{&mockHTTPPeer{address: "c1"}, &mockHTTPPeer{address: "c2"}}
peers4 := []network.Peer{&mockHTTPPeer{address: "d1"}, &mockHTTPPeer{address: "b2"}}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) {
for _, opt := range options {
if opt == network.PeersPhonebookRelays {
@@ -574,7 +574,7 @@ func TestPeerSelector_ClassUpperBound(t *testing.T) {
peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}}
pClass := peerClass{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) {
for _, opt := range options {
if opt == network.PeersPhonebookArchivalNodes {
@@ -609,7 +609,7 @@ func TestPeerSelector_ClassLowerBound(t *testing.T) {
peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}}
pClass := peerClass{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) {
for _, opt := range options {
if opt == network.PeersPhonebookArchivalNodes {
@@ -639,7 +639,7 @@ func TestPeerSelector_EvictionAndUpgrade(t *testing.T) {
peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}}
peers2 := []network.Peer{&mockHTTPPeer{address: "a1"}}
- peerSelector := makePeerSelector(
+ peerSelector := makeRankPooledPeerSelector(
makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) {
for _, opt := range options {
if opt == network.PeersPhonebookArchivalNodes {
@@ -677,7 +677,7 @@ func TestPeerSelector_RefreshAvailablePeers(t *testing.T) {
// check new peers added to the pool
p1 := mockHTTPPeer{address: "p1"}
p2 := mockHTTPPeer{address: "p2"}
- ps := peerSelector{
+ ps := rankPooledPeerSelector{
peerClasses: []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes},
diff --git a/catchup/service.go b/catchup/service.go
index 58fc3ae6b..5c6609b23 100644
--- a/catchup/service.go
+++ b/catchup/service.go
@@ -38,10 +38,7 @@ import (
"github.com/algorand/go-algorand/util/execpool"
)
-const catchupPeersForSync = 10
-const blockQueryPeerLimit = 10
-
-// uncapParallelDownloadRate is a simple threshold to detect whether or not the node is caught up.
+// uncapParallelDownloadRate is a simple threshold to detect whether the node is caught up.
// If a block is downloaded in less than this duration, it's assumed that the node is not caught up
// and allow the block downloader to start N=parallelBlocks concurrent fetches.
const uncapParallelDownloadRate = time.Second
@@ -76,7 +73,7 @@ type Ledger interface {
WaitMem(r basics.Round) chan struct{}
}
-// Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network.
+// Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up-to-date with network.
type Service struct {
// disableSyncRound, provided externally, is the first round we will _not_ fetch from the network
// any round >= disableSyncRound will not be fetched. If set to 0, it will be disregarded.
@@ -266,7 +263,7 @@ const errNoBlockForRoundThreshold = 5
// - If we couldn't fetch the block (e.g. if there are no peers available, or we've reached the catchupRetryLimit)
// - If the block is already in the ledger (e.g. if agreement service has already written it)
// - If the retrieval of the previous block was unsuccessful
-func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCompleteChan chan struct{}, lookbackComplete chan struct{}, peerSelector *peerSelector) bool {
+func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCompleteChan chan struct{}, lookbackComplete chan struct{}, peerSelector peerSelector) bool {
// If sync-ing this round is not intended, don't fetch it
if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) {
return false
@@ -318,7 +315,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer)
if err != nil {
- if err == errLedgerAlreadyHasBlock {
+ if errors.Is(err, errLedgerAlreadyHasBlock) {
// ledger already has the block, no need to request this block.
// only the agreement could have added this block into the ledger, catchup is complete
s.log.Infof("fetchAndWrite(%d): the block is already in the ledger. The catchup is complete", r)
@@ -329,7 +326,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
if errors.As(err, &nbfe) {
failureRank = peerRankNoBlockForRound
// remote peer doesn't have the block, try another peer
- // quit if the the same peer peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
+ // quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
if s.followLatest {
// back off between retries to allow time for the next block to appear;
// this will provide 50s (catchupRetryLimit * followLatestBackoff) of
@@ -427,7 +424,8 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
// if the context expired, just exit.
return false
}
- if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
+ var errNSBE ledgercore.ErrNonSequentialBlockEval
+ if errors.As(err, &errNSBE) && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
// the block was added to the ledger from elsewhere after fetching it here
// only the agreement could have added this block into the ledger, catchup is complete
s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
@@ -442,16 +440,19 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
}
if err != nil {
- switch err.(type) {
- case ledgercore.ErrNonSequentialBlockEval:
+ var errNonSequentialBlockEval ledgercore.ErrNonSequentialBlockEval
+ var blockInLedgerError ledgercore.BlockInLedgerError
+ var protocolErr protocol.Error
+ switch {
+ case errors.As(err, &errNonSequentialBlockEval):
s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r)
return true
- case ledgercore.BlockInLedgerError:
+ case errors.As(err, &blockInLedgerError):
// the block was added to the ledger from elsewhere after fetching it here
// only the agreement could have added this block into the ledger, catchup is complete
s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
return false
- case protocol.Error:
+ case errors.As(err, &protocolErr):
if !s.protocolErrorLogged {
logging.Base().Errorf("fetchAndWrite(%v): unrecoverable protocol error detected: %v", r, err)
s.protocolErrorLogged = true
@@ -491,8 +492,8 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
}
}()
- peerSelector := createPeerSelector(s.net, s.cfg, true)
- if _, err := peerSelector.getNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable {
+ ps := createPeerSelector(s.net)
+ if _, err := ps.getNextPeer(); err != nil {
s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from")
return
}
@@ -527,7 +528,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
go func(r basics.Round) {
prev := s.ledger.WaitMem(r - 1)
seed := s.ledger.WaitMem(r.SubSaturate(basics.Round(seedLookback)))
- done <- s.fetchAndWrite(ctx, r, prev, seed, peerSelector)
+ done <- s.fetchAndWrite(ctx, r, prev, seed, ps)
wg.Done()
}(nextRound)
@@ -751,9 +752,9 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
peerErrors := map[network.Peer]int{}
blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
- peerSelector := createPeerSelector(s.net, s.cfg, false)
+ ps := createPeerSelector(s.net)
for s.ledger.LastRound() < cert.Round {
- psp, getPeerErr := peerSelector.getNextPeer()
+ psp, getPeerErr := ps.getNextPeer()
if getPeerErr != nil {
s.log.Debugf("fetchRound: was unable to obtain a peer to retrieve the block from")
s.net.RequestConnectOutgoing(true, s.ctx.Done())
@@ -783,19 +784,19 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
time.Sleep(50 * time.Millisecond)
}
if count > errNoBlockForRoundThreshold*10 {
- // for the low number of connected peers (like 2) the following scenatio is possible:
+ // for the low number of connected peers (like 2) the following scenario is possible:
// - both peers do not have the block
// - peer selector punishes one of the peers more than the other
- // - the punoshed peer gets the block, and the less punished peer stucks.
+ // - the punished peer gets the block, and the less punished peer gets stuck.
// In this case reset the peer selector to let it re-learn priorities.
- peerSelector = createPeerSelector(s.net, s.cfg, false)
+ ps = createPeerSelector(s.net)
}
}
peerErrors[peer]++
}
// remote peer doesn't have the block, try another peer
logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err)
- peerSelector.rankPeer(psp, failureRank)
+ ps.rankPeer(psp, failureRank)
continue
}
@@ -805,7 +806,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
}
// Otherwise, fetcher gave us the wrong block
logging.Base().Warnf("fetcher gave us bad/wrong block (for round %d): fetched hash %v; want hash %v", cert.Round, block.Hash(), blockHash)
- peerSelector.rankPeer(psp, peerRankInvalidDownload)
+ ps.rankPeer(psp, peerRankInvalidDownload)
// As a failsafe, if the cert we fetched is valid but for the wrong block, panic as loudly as possible
if cert.Round == fetchedCert.Round &&
@@ -866,38 +867,33 @@ func (s *Service) roundIsNotSupported(nextRound basics.Round) bool {
return true
}
-func createPeerSelector(net network.GossipNode, cfg config.Local, pipelineFetch bool) *peerSelector {
- var peerClasses []peerClass
- if pipelineFetch {
- if cfg.NetAddress != "" && cfg.EnableGossipService { // Relay node
- peerClasses = []peerClass{
- {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
- {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes},
- {initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
- {initialRank: peerRankInitialFourthPriority, peerClass: network.PeersConnectedIn},
- }
- } else {
- peerClasses = []peerClass{
- {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes},
- {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedOut},
- {initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
- }
- }
- } else {
- if cfg.NetAddress != "" && cfg.EnableGossipService { // Relay node
- peerClasses = []peerClass{
- {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
- {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
- {initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookArchivalNodes},
- {initialRank: peerRankInitialFourthPriority, peerClass: network.PeersPhonebookRelays},
- }
- } else {
- peerClasses = []peerClass{
- {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
- {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes},
- {initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
- }
- }
+func createPeerSelector(net network.GossipNode) peerSelector {
+ wrappedPeerSelectors := []*wrappedPeerSelector{
+ {
+ peerClass: network.PeersConnectedOut,
+ peerSelector: makeRankPooledPeerSelector(net,
+ []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut}}),
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookRelays,
+ peerSelector: makeRankPooledPeerSelector(net,
+ []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays}}),
+ toleranceFactor: 3,
+ },
+ {
+ peerClass: network.PeersPhonebookArchivalNodes,
+ peerSelector: makeRankPooledPeerSelector(net,
+ []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}),
+ toleranceFactor: 10,
+ },
+ {
+ peerClass: network.PeersConnectedIn,
+ peerSelector: makeRankPooledPeerSelector(net,
+ []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedIn}}),
+ toleranceFactor: 3,
+ },
}
- return makePeerSelector(net, peerClasses)
+
+ return makeClassBasedPeerSelector(wrappedPeerSelectors)
}
diff --git a/catchup/service_test.go b/catchup/service_test.go
index 8deb692b0..045a0438f 100644
--- a/catchup/service_test.go
+++ b/catchup/service_test.go
@@ -958,102 +958,23 @@ func TestCatchupUnmatchedCertificate(t *testing.T) {
func TestCreatePeerSelector(t *testing.T) {
partitiontest.PartitionTest(t)
- // Make Service
- cfg := defaultConfig
+ s := MakeService(logging.Base(), defaultConfig, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
+ ps := createPeerSelector(s.net)
+
+ cps, ok := ps.(*classBasedPeerSelector)
+ require.True(t, ok)
+
+ require.Equal(t, 4, len(cps.peerSelectors))
+
+ require.Equal(t, network.PeersConnectedOut, cps.peerSelectors[0].peerClass)
+ require.Equal(t, network.PeersPhonebookRelays, cps.peerSelectors[1].peerClass)
+ require.Equal(t, network.PeersPhonebookArchivalNodes, cps.peerSelectors[2].peerClass)
+ require.Equal(t, network.PeersConnectedIn, cps.peerSelectors[3].peerClass)
- // cfg.NetAddress != ""; cfg.EnableGossipService = true; pipelineFetch = true
- cfg.NetAddress = "someAddress"
- cfg.EnableGossipService = true
- s := MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps := createPeerSelector(s.net, s.cfg, true)
-
- require.Equal(t, 4, len(ps.peerClasses))
- require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
- require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
- require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank)
- require.Equal(t, peerRankInitialFourthPriority, ps.peerClasses[3].initialRank)
-
- require.Equal(t, network.PeersConnectedOut, ps.peerClasses[0].peerClass)
- require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[1].peerClass)
- require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass)
- require.Equal(t, network.PeersConnectedIn, ps.peerClasses[3].peerClass)
-
- // cfg.NetAddress == ""; cfg.EnableGossipService = true; pipelineFetch = true
- cfg.NetAddress = ""
- cfg.EnableGossipService = true
- s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = createPeerSelector(s.net, s.cfg, true)
-
- require.Equal(t, 3, len(ps.peerClasses))
- require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
- require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
- require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank)
-
- require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[0].peerClass)
- require.Equal(t, network.PeersConnectedOut, ps.peerClasses[1].peerClass)
- require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass)
-
- // cfg.NetAddress != ""; cfg.EnableGossipService = false; pipelineFetch = true
- cfg.NetAddress = "someAddress"
- cfg.EnableGossipService = false
- s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = createPeerSelector(s.net, s.cfg, true)
-
- require.Equal(t, 3, len(ps.peerClasses))
- require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
- require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
- require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank)
-
- require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[0].peerClass)
- require.Equal(t, network.PeersConnectedOut, ps.peerClasses[1].peerClass)
- require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass)
-
- // cfg.NetAddress != ""; cfg.EnableGossipService = true; pipelineFetch = false
- cfg.NetAddress = "someAddress"
- cfg.EnableGossipService = true
- s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = createPeerSelector(s.net, s.cfg, false)
-
- require.Equal(t, 4, len(ps.peerClasses))
- require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
- require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
- require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank)
- require.Equal(t, peerRankInitialFourthPriority, ps.peerClasses[3].initialRank)
-
- require.Equal(t, network.PeersConnectedOut, ps.peerClasses[0].peerClass)
- require.Equal(t, network.PeersConnectedIn, ps.peerClasses[1].peerClass)
- require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[2].peerClass)
- require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[3].peerClass)
-
- // cfg.NetAddress == ""; cfg.EnableGossipService = true; pipelineFetch = false
- cfg.NetAddress = ""
- cfg.EnableGossipService = true
- s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = createPeerSelector(s.net, s.cfg, false)
-
- require.Equal(t, 3, len(ps.peerClasses))
- require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
- require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
- require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank)
-
- require.Equal(t, network.PeersConnectedOut, ps.peerClasses[0].peerClass)
- require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[1].peerClass)
- require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass)
-
- // cfg.NetAddress != ""; cfg.EnableGossipService = false; pipelineFetch = false
- cfg.NetAddress = "someAddress"
- cfg.EnableGossipService = false
- s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = createPeerSelector(s.net, s.cfg, false)
-
- require.Equal(t, 3, len(ps.peerClasses))
- require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
- require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
- require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank)
-
- require.Equal(t, network.PeersConnectedOut, ps.peerClasses[0].peerClass)
- require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[1].peerClass)
- require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass)
+ require.Equal(t, 3, cps.peerSelectors[0].toleranceFactor)
+ require.Equal(t, 3, cps.peerSelectors[1].toleranceFactor)
+ require.Equal(t, 10, cps.peerSelectors[2].toleranceFactor)
+ require.Equal(t, 3, cps.peerSelectors[3].toleranceFactor)
}
func TestServiceStartStop(t *testing.T) {