diff options
author | Gary <982483+gmalouf@users.noreply.github.com> | 2024-02-21 15:33:35 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-21 15:33:35 -0500 |
commit | 52964edd9795c4cacc7d107ac9f41f73374fce71 (patch) | |
tree | d203ae6f91aa09a0ff9d47294f86b5269c2885a3 | |
parent | d8c825d96b90b4f1c84a5b3771987ab53bfc8568 (diff) |
Network: Class-based Peer Selector (#5937)
-rw-r--r-- | catchup/catchpointService.go | 80 | ||||
-rw-r--r-- | catchup/classBasedPeerSelector.go | 156 | ||||
-rw-r--r-- | catchup/classBasedPeerSelector_test.go | 498 | ||||
-rw-r--r-- | catchup/peerSelector.go | 34 | ||||
-rw-r--r-- | catchup/peerSelector_test.go | 22 | ||||
-rw-r--r-- | catchup/service.go | 108 | ||||
-rw-r--r-- | catchup/service_test.go | 111 |
7 files changed, 789 insertions, 220 deletions
diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go index 2c4f6dfc4..3c11d0db7 100644 --- a/catchup/catchpointService.go +++ b/catchup/catchpointService.go @@ -18,6 +18,7 @@ package catchup import ( "context" + "errors" "fmt" "sync" "time" @@ -69,7 +70,7 @@ type CatchpointCatchupStats struct { type CatchpointCatchupService struct { // stats is the statistics object, updated async while downloading the ledger stats CatchpointCatchupStats - // statsMu synchronizes access to stats, as we could attempt to update it while querying for it's current state + // statsMu synchronizes access to stats, as we could attempt to update it while querying for its current state statsMu deadlock.Mutex node CatchpointCatchupNodeServices // ctx is the node cancellation context, used when the node is being stopped. @@ -98,7 +99,7 @@ type CatchpointCatchupService struct { abortCtx context.Context abortCtxFunc context.CancelFunc // blocksDownloadPeerSelector is the peer selector used for downloading blocks. - blocksDownloadPeerSelector *peerSelector + blocksDownloadPeerSelector peerSelector } // MakeResumedCatchpointCatchupService creates a catchpoint catchup service for a node that is already in catchpoint catchup mode @@ -280,51 +281,50 @@ func (cs *CatchpointCatchupService) processStageInactive() (err error) { } // processStageLedgerDownload is the second catchpoint catchup stage. It downloads the ledger. -func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { +func (cs *CatchpointCatchupService) processStageLedgerDownload() error { cs.statsMu.Lock() label := cs.stats.CatchpointLabel cs.statsMu.Unlock() - round, _, err0 := ledgercore.ParseCatchpointLabel(label) + round, _, err := ledgercore.ParseCatchpointLabel(label) - if err0 != nil { - return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err0)) + if err != nil { + return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err)) } // download balances file. - peerSelector := cs.makeCatchpointPeerSelector() - ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config) + lf := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config) attemptsCount := 0 for { attemptsCount++ - err = cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true) - if err != nil { + err0 := cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true) + if err0 != nil { if cs.ctx.Err() != nil { return cs.stopOrAbort() } - return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err)) + return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err0)) } - psp, err := peerSelector.getNextPeer() - if err != nil { - err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from") - return cs.abort(err) + psp, err0 := cs.blocksDownloadPeerSelector.getNextPeer() + if err0 != nil { + err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from") + return cs.abort(err0) } peer := psp.Peer start := time.Now() - err = ledgerFetcher.downloadLedger(cs.ctx, peer, round) - if err == nil { + err0 = lf.downloadLedger(cs.ctx, peer, round) + if err0 == nil { cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second) start = time.Now() - err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts) - if err == nil { + err0 = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts) + if err0 == nil { cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second) break } // failed to build the merkle trie for the above catchpoint file. - peerSelector.rankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) } else { - peerSelector.rankPeer(psp, peerRankDownloadFailed) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed) } // instead of testing for err == cs.ctx.Err() , we'll check on the context itself. @@ -335,10 +335,10 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { } if attemptsCount >= cs.config.CatchupLedgerDownloadRetryAttempts { - err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger") - return cs.abort(err) + err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger") + return cs.abort(err0) } - cs.log.Warnf("unable to download ledger : %v", err) + cs.log.Warnf("unable to download ledger : %v", err0) } err = cs.updateStage(ledger.CatchpointCatchupStateLatestBlockDownload) @@ -506,14 +506,14 @@ func lookbackForStateproofsSupport(topBlock *bookkeeping.Block) uint64 { return uint64(topBlock.Round().SubSaturate(lowestStateProofRound)) } -// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against it's predecessor. +// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against its predecessor. func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { topBlock, err := cs.ledgerAccessor.EnsureFirstBlock(cs.ctx) if err != nil { return cs.abort(fmt.Errorf("processStageBlocksDownload failed, unable to ensure first block : %v", err)) } - // pick the lookback with the greater of + // pick the lookback with the greatest of // either (MaxTxnLife+DeeperBlockHeaderHistory+CatchpointLookback) or MaxBalLookback // Explanation: // 1. catchpoint snapshots accounts at round X-CatchpointLookback @@ -531,13 +531,13 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { } // in case the effective lookback is going before our rounds count, trim it there. - // ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback..MaxTxnLife) + // ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback...MaxTxnLife) if lookback >= uint64(topBlock.Round()) { lookback = uint64(topBlock.Round() - 1) } cs.statsMu.Lock() - cs.stats.TotalBlocks = uint64(lookback) + cs.stats.TotalBlocks = lookback cs.stats.AcquiredBlocks = 0 cs.stats.VerifiedBlocks = 0 cs.statsMu.Unlock() @@ -558,8 +558,9 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { blk = &ledgerBlock cert = &ledgerCert } else { - switch err0.(type) { - case ledgercore.ErrNoEntry: + var errNoEntry ledgercore.ErrNoEntry + switch { + case errors.As(err0, &errNoEntry): // this is expected, ignore this one. default: cs.log.Warnf("processStageBlocksDownload encountered the following error when attempting to retrieve the block for round %d : %v", topBlock.Round()-basics.Round(blocksFetched), err0) @@ -658,7 +659,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) { psp, err = cs.blocksDownloadPeerSelector.getNextPeer() if err != nil { - if err == errPeerSelectorNoPeerPoolsAvailable { + if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) { cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.") // this is a possible on startup, since the network package might have yet to retrieve the list of peers. time.Sleep(noPeersAvailableSleepInterval) @@ -718,7 +719,7 @@ func (cs *CatchpointCatchupService) processStageSwitch() (err error) { // stopOrAbort is called when any of the stage processing function sees that cs.ctx has been canceled. It can be // due to the end user attempting to abort the current catchpoint catchup operation or due to a node shutdown. func (cs *CatchpointCatchupService) stopOrAbort() error { - if cs.abortCtx.Err() == context.Canceled { + if errors.Is(cs.abortCtx.Err(), context.Canceled) { return cs.abort(context.Canceled) } return nil @@ -749,7 +750,7 @@ func (cs *CatchpointCatchupService) updateStage(newStage ledger.CatchpointCatchu return nil } -// updateNodeCatchupMode requests the node to change it's operational mode from +// updateNodeCatchupMode requests the node to change its operational mode from // catchup mode to normal mode and vice versa. func (cs *CatchpointCatchupService) updateNodeCatchupMode(catchupModeEnabled bool) { newCtxCh := cs.node.SetCatchpointCatchupMode(catchupModeEnabled) @@ -802,15 +803,7 @@ func (cs *CatchpointCatchupService) updateBlockRetrievalStatistics(acquiredBlock } func (cs *CatchpointCatchupService) initDownloadPeerSelector() { - cs.blocksDownloadPeerSelector = cs.makeCatchpointPeerSelector() -} - -func (cs *CatchpointCatchupService) makeCatchpointPeerSelector() *peerSelector { - return makePeerSelector( - cs.net, - []peerClass{ - {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays}, - }) + cs.blocksDownloadPeerSelector = makeCatchpointPeerSelector(cs.net) } // checkLedgerDownload sends a HEAD request to the ledger endpoint of peers to validate the catchpoint's availability @@ -821,10 +814,9 @@ func (cs *CatchpointCatchupService) checkLedgerDownload() error { if err != nil { return fmt.Errorf("failed to parse catchpoint label : %v", err) } - peerSelector := cs.makeCatchpointPeerSelector() ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config) for i := 0; i < cs.config.CatchupLedgerDownloadRetryAttempts; i++ { - psp, peerError := peerSelector.getNextPeer() + psp, peerError := cs.blocksDownloadPeerSelector.getNextPeer() if peerError != nil { return err } diff --git a/catchup/classBasedPeerSelector.go b/catchup/classBasedPeerSelector.go new file mode 100644 index 000000000..9ab9e6d71 --- /dev/null +++ b/catchup/classBasedPeerSelector.go @@ -0,0 +1,156 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package catchup + +import ( + "errors" + "github.com/algorand/go-algorand/network" + "github.com/algorand/go-deadlock" + "time" +) + +// classBasedPeerSelector is a rankPooledPeerSelector that tracks and ranks classes of peers based on their response behavior. +// It is used to select the most appropriate peers to download blocks from - this is most useful when catching up +// and needing to figure out whether the blocks can be retrieved from relay nodes or require archive nodes. +// The ordering of the peerSelectors directly determines the priority of the classes of peers. +type classBasedPeerSelector struct { + mu deadlock.Mutex + peerSelectors []*wrappedPeerSelector +} + +func makeClassBasedPeerSelector(peerSelectors []*wrappedPeerSelector) *classBasedPeerSelector { + return &classBasedPeerSelector{ + peerSelectors: peerSelectors, + } +} + +func (c *classBasedPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) { + c.mu.Lock() + defer c.mu.Unlock() + + oldRank, newRank := -1, -1 + for _, wp := range c.peerSelectors { + // See if the peer is in the class, ranking it appropriately if so + if psp.peerClass != wp.peerClass { + continue + } + + oldRank, newRank = wp.peerSelector.rankPeer(psp, rank) + if oldRank < 0 || newRank < 0 { + // Peer not found in this selector + continue + } + + // Peer was in this class, if there was any kind of download issue, we increment the failure count + if rank >= peerRankNoBlockForRound { + wp.downloadFailures++ + } + + break + } + + return oldRank, newRank +} + +func (c *classBasedPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, wp := range c.peerSelectors { + rank = wp.peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) + // If rank is peerRankInvalidDownload, we check the next class's rankPooledPeerSelector + if rank >= peerRankInvalidDownload { + continue + } + // Should be a legit ranking, we return it + return rank + } + // If we reached here, we have exhausted all classes without finding the peer + return peerRankInvalidDownload +} + +func (c *classBasedPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) { + c.mu.Lock() + defer c.mu.Unlock() + return c.internalGetNextPeer(0) +} + +// internalGetNextPeer is a helper function that should be called with the lock held +func (c *classBasedPeerSelector) internalGetNextPeer(recurseCount int8) (psp *peerSelectorPeer, err error) { + // Safety check to prevent infinite recursion + if recurseCount > 1 { + return nil, errPeerSelectorNoPeerPoolsAvailable + } + selectorDisabledCount := 0 + for _, wp := range c.peerSelectors { + if wp.downloadFailures > wp.toleranceFactor { + // peerSelector is disabled for now, we move to the next one + selectorDisabledCount++ + continue + } + psp, err = wp.peerSelector.getNextPeer() + + if err != nil { + // This is mostly just future-proofing, as we don't expect any other errors from getNextPeer + if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) { + // We penalize this class the equivalent of one download failure (in case this is transient) + wp.downloadFailures++ + } + continue + } + return psp, nil + } + // If we reached here, we have exhausted all classes and still have no peers + // IFF all classes are disabled, we reset the downloadFailures for all classes and start over + if len(c.peerSelectors) != 0 && selectorDisabledCount == len(c.peerSelectors) { + for _, wp := range c.peerSelectors { + wp.downloadFailures = 0 + } + // Recurse to try again, we should have at least one class enabled now + return c.internalGetNextPeer(recurseCount + 1) + } + // If we reached here, we have exhausted all classes without finding a peer, not due to all classes being disabled + return nil, errPeerSelectorNoPeerPoolsAvailable +} + +type wrappedPeerSelector struct { + peerSelector peerSelector // The underlying peerSelector for this class + peerClass network.PeerOption // The class of peers the peerSelector is responsible for + toleranceFactor int // The number of times we can net fail for any reason before we move to the next class's rankPooledPeerSelector + downloadFailures int // The number of times we have failed to download a block from this class's rankPooledPeerSelector since it was last reset +} + +// makeCatchpointPeerSelector returns a classBasedPeerSelector that selects peers based on their class and response behavior. +// These are the preferred configurations for the catchpoint service. +func makeCatchpointPeerSelector(net peersRetriever) peerSelector { + wrappedPeerSelectors := []*wrappedPeerSelector{ + { + peerClass: network.PeersPhonebookRelays, + peerSelector: makeRankPooledPeerSelector(net, + []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays}}), + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookArchivalNodes, + peerSelector: makeRankPooledPeerSelector(net, + []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}), + toleranceFactor: 10, + }, + } + + return makeClassBasedPeerSelector(wrappedPeerSelectors) +} diff --git a/catchup/classBasedPeerSelector_test.go b/catchup/classBasedPeerSelector_test.go new file mode 100644 index 000000000..0110663f8 --- /dev/null +++ b/catchup/classBasedPeerSelector_test.go @@ -0,0 +1,498 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package catchup + +import ( + "github.com/algorand/go-algorand/network" + "github.com/algorand/go-algorand/test/partitiontest" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +// Use to mock the wrapped peer selectors where warranted +type mockPeerSelector struct { + mockRankPeer func(psp *peerSelectorPeer, rank int) (int, int) + mockPeerDownloadDurationToRank func(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) + mockGetNextPeer func() (psp *peerSelectorPeer, err error) +} + +func (m mockPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) { + return m.mockRankPeer(psp, rank) +} + +func (m mockPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { + return m.mockPeerDownloadDurationToRank(psp, blockDownloadDuration) +} + +func (m mockPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) { + return m.mockGetNextPeer() +} + +func TestClassBasedPeerSelector_makeClassBasedPeerSelector(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + wrappedPeerSelectors := []*wrappedPeerSelector{ + { + peerClass: network.PeersPhonebookRelays, + peerSelector: mockPeerSelector{}, + toleranceFactor: 3, + }, + { + peerClass: network.PeersConnectedOut, + peerSelector: mockPeerSelector{}, + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookArchivalNodes, + peerSelector: mockPeerSelector{}, + toleranceFactor: 10, + }, + } + + cps := makeClassBasedPeerSelector(wrappedPeerSelectors) + + // The selectors should be sorted by priority + require.Equal(t, 3, len(cps.peerSelectors)) + require.Equal(t, network.PeersPhonebookRelays, cps.peerSelectors[0].peerClass) + require.Equal(t, network.PeersConnectedOut, cps.peerSelectors[1].peerClass) + require.Equal(t, network.PeersPhonebookArchivalNodes, cps.peerSelectors[2].peerClass) +} + +func TestClassBasedPeerSelector_rankPeer(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + mockPeer := &peerSelectorPeer{ + peerClass: network.PeersPhonebookRelays, + } + + // Create a class based peer selector initially with the first wrapped peer selector not having the peer, + // second one having it, and a third one not having it + wrappedPeerSelectors := []*wrappedPeerSelector{ + { + peerClass: network.PeersConnectedOut, + peerSelector: mockPeerSelector{ + mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) { + return -1, -1 + }, + }, + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookRelays, + peerSelector: mockPeerSelector{ + mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) { + if psp == mockPeer { + return 10, rank + } + return -1, -1 + }, + }, + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookArchivalNodes, + peerSelector: mockPeerSelector{ + mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) { + return -1, -1 + }, + }, + toleranceFactor: 3, + }, + } + cps := makeClassBasedPeerSelector(wrappedPeerSelectors) + + // Peer is found in second selector, rank is within range for a block found + oldRank, newRank := cps.rankPeer(mockPeer, 50) + + require.Equal(t, 10, oldRank) + require.Equal(t, 50, newRank) + require.Equal(t, 0, cps.peerSelectors[1].downloadFailures) + + // Peer is found in second selector, rank is >= peerRankNoBlockForRound + oldRank, newRank = cps.rankPeer(mockPeer, peerRankNoBlockForRound) + + require.Equal(t, 10, oldRank) + require.Equal(t, peerRankNoBlockForRound, newRank) + require.Equal(t, 1, cps.peerSelectors[1].downloadFailures) + + // We fail to find a block for round 3 more times, download failures should reflect that. + cps.rankPeer(mockPeer, peerRankNoBlockForRound) + oldRank, newRank = cps.rankPeer(mockPeer, peerRankNoBlockForRound) + + require.Equal(t, 10, oldRank) + require.Equal(t, peerRankNoBlockForRound, newRank) + require.Equal(t, 3, cps.peerSelectors[1].downloadFailures) + + oldRank, newRank = cps.rankPeer(mockPeer, peerRankNoBlockForRound) + require.Equal(t, 10, oldRank) + require.Equal(t, peerRankNoBlockForRound, newRank) + require.Equal(t, 4, cps.peerSelectors[1].downloadFailures) + + // Now, feed peers that are not in any of the selectors - it should return -1, -1 + mockPeer2 := &peerSelectorPeer{ + peerClass: network.PeersConnectedIn, + } + + oldRank, newRank = cps.rankPeer(mockPeer2, 50) + require.Equal(t, -1, oldRank) + require.Equal(t, -1, newRank) + + // While this will match class, the selectors will not have it + mockPeer3 := &peerSelectorPeer{ + peerClass: network.PeersConnectedOut, + } + + oldRank, newRank = cps.rankPeer(mockPeer3, 50) + require.Equal(t, -1, oldRank) + require.Equal(t, -1, newRank) + + // Last sanity check, we should have zero download failures for the first and third selectors + require.Equal(t, 0, cps.peerSelectors[0].downloadFailures) + require.Equal(t, 0, cps.peerSelectors[2].downloadFailures) +} + +func TestClassBasedPeerSelector_peerDownloadDurationToRank(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + mockPeer := &peerSelectorPeer{} + testDuration := 50 * time.Millisecond + + // Create a class based peer selector initially with the first wrapped peer selector not having the peer, + // second one having it, and a third one not having it + wrappedPeerSelectors := []*wrappedPeerSelector{ + { + peerClass: network.PeersConnectedOut, + peerSelector: mockPeerSelector{ + mockPeerDownloadDurationToRank: func(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { + return peerRankInvalidDownload + }, + }, + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookRelays, + peerSelector: mockPeerSelector{ + mockPeerDownloadDurationToRank: func(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { + if psp == mockPeer && blockDownloadDuration == testDuration { + return peerRank0HighBlockTime + } + return peerRankInvalidDownload + }, + }, + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookArchivalNodes, + peerSelector: mockPeerSelector{ + mockPeerDownloadDurationToRank: func(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { + return peerRankInvalidDownload + }, + }, + toleranceFactor: 3, + }, + } + cps := makeClassBasedPeerSelector(wrappedPeerSelectors) + + // The peer is found in the second selector, so the rank should be peerRank0HighBlockTime + rank := cps.peerDownloadDurationToRank(mockPeer, testDuration) + require.Equal(t, peerRank0HighBlockTime, rank) + + // The peer is not found in any of the selectors, so the rank should be peerRankInvalidDownload + mockPeer2 := &peerSelectorPeer{} + + rank = cps.peerDownloadDurationToRank(mockPeer2, testDuration) + require.Equal(t, peerRankInvalidDownload, rank) +} + +func TestClassBasedPeerSelector_getNextPeer(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + mockPeer := &peerSelectorPeer{ + peerClass: network.PeersPhonebookRelays, + } + + // Create a class based peer selector initially with the first wrapped peer selector not having any peers, + // second one having a peer, and a third one not having any peers + wrappedPeerSelectors := []*wrappedPeerSelector{ + { + peerClass: network.PeersConnectedOut, + peerSelector: mockPeerSelector{ + mockGetNextPeer: func() (psp *peerSelectorPeer, err error) { + return nil, errPeerSelectorNoPeerPoolsAvailable + }, + }, + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookRelays, + peerSelector: mockPeerSelector{ + mockGetNextPeer: func() (psp *peerSelectorPeer, err error) { + return mockPeer, nil + }, + }, + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookArchivalNodes, + peerSelector: mockPeerSelector{ + mockGetNextPeer: func() (psp *peerSelectorPeer, err error) { + return nil, errPeerSelectorNoPeerPoolsAvailable + }, + }, + toleranceFactor: 3, + }, + } + + cps := makeClassBasedPeerSelector(wrappedPeerSelectors) + + peerResult, err := cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockPeer, peerResult) + + // Update selector to not return any peers + wrappedPeerSelectors[1].peerSelector = mockPeerSelector{ + mockGetNextPeer: func() (psp *peerSelectorPeer, err error) { + return nil, errPeerSelectorNoPeerPoolsAvailable + }, + } + + peerResult, err = cps.getNextPeer() + require.Nil(t, peerResult) + require.Equal(t, errPeerSelectorNoPeerPoolsAvailable, err) + + // Create a class based peer selector initially with all wrapped peer selectors having peers. + // The peers should always come from the first one repeatedly since rankings are not changed. + mockPeer = &peerSelectorPeer{ + peerClass: network.PeersConnectedOut, + } + mockPeer2 := &peerSelectorPeer{ + peerClass: network.PeersPhonebookRelays, + } + mockPeer3 := &peerSelectorPeer{ + peerClass: network.PeersPhonebookArchivalNodes, + } + + wrappedPeerSelectors = []*wrappedPeerSelector{ + { + peerClass: network.PeersConnectedOut, + peerSelector: mockPeerSelector{ + mockGetNextPeer: func() (psp *peerSelectorPeer, err error) { + return mockPeer, nil + }, + mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) { + if psp == mockPeer { + return 10, rank + } + return -1, -1 + }, + }, + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookRelays, + peerSelector: mockPeerSelector{ + mockGetNextPeer: func() (psp *peerSelectorPeer, err error) { + return mockPeer2, nil + }, + mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) { + if psp == mockPeer2 { + return 10, rank + } + return -1, -1 + }, + }, + toleranceFactor: 10, + }, + { + peerClass: network.PeersPhonebookArchivalNodes, + peerSelector: mockPeerSelector{ + mockGetNextPeer: func() (psp *peerSelectorPeer, err error) { + return mockPeer3, nil + }, + mockRankPeer: func(psp *peerSelectorPeer, rank int) (int, int) { + if psp == mockPeer3 { + return 10, rank + } + return -1, -1 + }, + }, + toleranceFactor: 3, + }, + } + + cps = makeClassBasedPeerSelector(wrappedPeerSelectors) + + // We should always get the peer from the top priority selector since rankings are not updated/list is not re-sorted. + for i := 0; i < 10; i++ { + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockPeer, peerResult) + } + + // Okay, record enough download failures to disable the first selector + for i := 0; i < 4; i++ { + cps.rankPeer(mockPeer, peerRankNoBlockForRound) + } + + // Now, we should get the peer from the second selector + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockPeer2, peerResult) + + // Sanity check the download failures for each selector + require.Equal(t, 4, cps.peerSelectors[0].downloadFailures) + require.Equal(t, 0, cps.peerSelectors[1].downloadFailures) + require.Equal(t, 0, cps.peerSelectors[2].downloadFailures) + + // Now, record download failures just up to the tolerance factor for the second selector + for i := 0; i < 10; i++ { + cps.rankPeer(mockPeer2, peerRankNoBlockForRound) + } + + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockPeer2, peerResult) + + // One more should push us to the third selector + cps.rankPeer(mockPeer2, peerRankNoBlockForRound) + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockPeer3, peerResult) + + // Check of the download failures for each selector + require.Equal(t, 4, cps.peerSelectors[0].downloadFailures) + require.Equal(t, 11, cps.peerSelectors[1].downloadFailures) + require.Equal(t, 0, cps.peerSelectors[2].downloadFailures) + + // Now, record download failures just up to the tolerance factor for the third selector + for i := 0; i < 3; i++ { + cps.rankPeer(mockPeer3, peerRankNoBlockForRound) + } + + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockPeer3, peerResult) + + require.Equal(t, 4, cps.peerSelectors[0].downloadFailures) + require.Equal(t, 11, cps.peerSelectors[1].downloadFailures) + require.Equal(t, 3, cps.peerSelectors[2].downloadFailures) + + // One more failure should reset ALL download failures (and grab a peer from the first selector) + cps.rankPeer(mockPeer3, peerRankNoBlockForRound) + + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockPeer, peerResult) + + // Check of the download failures for each selector, should have been reset + require.Equal(t, 0, cps.peerSelectors[0].downloadFailures) + require.Equal(t, 0, cps.peerSelectors[1].downloadFailures) + require.Equal(t, 0, cps.peerSelectors[2].downloadFailures) +} + +func TestClassBasedPeerSelector_integration(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + mockP1Peer := mockHTTPPeer{address: "p1"} + mockP2Peer := mockHTTPPeer{address: "p2"} + + mockP1WrappedPeer := &peerSelectorPeer{&mockP1Peer, network.PeersPhonebookRelays} + mockP2WrappedPeer := &peerSelectorPeer{&mockP2Peer, network.PeersPhonebookArchivalNodes} + + net := makePeersRetrieverStub(func(options ...network.PeerOption) []network.Peer { + if len(options) > 0 { + switch options[0] { + case network.PeersPhonebookRelays: + return []network.Peer{&mockP1Peer} + case network.PeersPhonebookArchivalNodes: + return []network.Peer{&mockP2Peer} + default: + return []network.Peer{&mockP1Peer, &mockP2Peer} + } + } + return nil + }) + // Create a class based peer selector with a few wrapped peer selectors + cps := makeCatchpointPeerSelector(net).(*classBasedPeerSelector) + + // We should get the peer from the first priority selector, PeersPhonebookRelays + peerResult, err := cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockP1WrappedPeer, peerResult) + + // Normal expected usage: rank the peer + durationRank := cps.peerDownloadDurationToRank(mockP1WrappedPeer, 500) + oldRank, newRank := cps.rankPeer(mockP1WrappedPeer, durationRank) + + require.Equal(t, 0, oldRank) + require.Equal(t, durationRank, newRank) + + // Let's simulate a few download failures (not enough to disable the selector) + for i := 0; i < 3; i++ { + expectedOldRank := newRank + peerResult, err = cps.getNextPeer() + + require.Nil(t, err) + require.Equal(t, mockP1WrappedPeer, peerResult) + + oldRank, newRank = cps.rankPeer(mockP1WrappedPeer, peerRankNoBlockForRound) + + require.Equal(t, expectedOldRank, oldRank) + // Should be increasing with no block penalties + require.True(t, newRank >= oldRank) + } + + // Sanity check, still should be the same peer (from phonebook selector) + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockP1WrappedPeer, peerResult) + + // Rank the peer to follow normal usage + durationRank = cps.peerDownloadDurationToRank(mockP1WrappedPeer, 500) + expectedOldRank := newRank + oldRank, newRank = cps.rankPeer(mockP1WrappedPeer, durationRank) + + require.Equal(t, expectedOldRank, oldRank) + // Rank should not go up after successful download + require.True(t, newRank <= oldRank) + + // Now, let's simulate enough download failures to disable the first selector + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockP1WrappedPeer, peerResult) + cps.rankPeer(mockP1WrappedPeer, peerRankNoBlockForRound) + + peerResult, err = cps.getNextPeer() + require.Nil(t, err) + require.Equal(t, mockP2WrappedPeer, peerResult) + + // Normal expected usage: rank the peer + durationRank = cps.peerDownloadDurationToRank(mockP2WrappedPeer, 500) + oldRank, newRank = cps.rankPeer(mockP2WrappedPeer, durationRank) + + require.Equal(t, 0, oldRank) + require.Equal(t, durationRank, newRank) + + require.Equal(t, 4, cps.peerSelectors[0].downloadFailures) + require.Equal(t, 0, cps.peerSelectors[1].downloadFailures) +} diff --git a/catchup/peerSelector.go b/catchup/peerSelector.go index 4ceda8d42..148529558 100644 --- a/catchup/peerSelector.go +++ b/catchup/peerSelector.go @@ -88,7 +88,7 @@ type peerClass struct { peerClass network.PeerOption } -// the peersRetriever is a subset of the network.GossipNode used to ensure that we can create an instance of the peerSelector +// the peersRetriever is a subset of the network.GossipNode used to ensure that we can create an instance of the rankPooledPeerSelector // for testing purposes, providing just the above function. type peersRetriever interface { // Get a list of Peers we could potentially send a direct message to. @@ -109,14 +109,20 @@ type peerPool struct { peers []peerPoolEntry } -// peerSelector is a helper struct used to select the next peer to try and connect to +type peerSelector interface { + rankPeer(psp *peerSelectorPeer, rank int) (int, int) + peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) + getNextPeer() (psp *peerSelectorPeer, err error) +} + +// rankPooledPeerSelector is a helper struct used to select the next peer to try and connect to // for various catchup purposes. Unlike the underlying network GetPeers(), it allows the // client to provide feedback regarding the peer's performance, and to have the subsequent // query(s) take advantage of that intel. -type peerSelector struct { +type rankPooledPeerSelector struct { mu deadlock.Mutex net peersRetriever - // peerClasses is the list of peer classes we want to have in the peerSelector. + // peerClasses is the list of peer classes we want to have in the rankPooledPeerSelector. peerClasses []peerClass // pools is the list of peer pools, each pool contains a list of peers with the same rank. pools []peerPool @@ -284,9 +290,9 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera return bounded } -// makePeerSelector creates a peerSelector, given a peersRetriever and peerClass array. -func makePeerSelector(net peersRetriever, initialPeersClasses []peerClass) *peerSelector { - selector := &peerSelector{ +// makeRankPooledPeerSelector creates a rankPooledPeerSelector, given a peersRetriever and peerClass array. +func makeRankPooledPeerSelector(net peersRetriever, initialPeersClasses []peerClass) *rankPooledPeerSelector { + selector := &rankPooledPeerSelector{ net: net, peerClasses: initialPeersClasses, } @@ -296,7 +302,7 @@ func makePeerSelector(net peersRetriever, initialPeersClasses []peerClass) *peer // getNextPeer returns the next peer. It randomally selects a peer from a pool that has // the lowest rank value. Given that the peers are grouped by their ranks, allow us to // prioritize peers based on their class and/or performance. -func (ps *peerSelector) getNextPeer() (psp *peerSelectorPeer, err error) { +func (ps *rankPooledPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) { ps.mu.Lock() defer ps.mu.Unlock() ps.refreshAvailablePeers() @@ -317,7 +323,7 @@ func (ps *peerSelector) getNextPeer() (psp *peerSelectorPeer, err error) { // rankPeer ranks a given peer. // return the old value and the new updated value. // updated value could be different from the input rank. -func (ps *peerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) { +func (ps *rankPooledPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) { if psp == nil { return -1, -1 } @@ -384,7 +390,7 @@ func (ps *peerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) { } // peerDownloadDurationToRank calculates the rank for a peer given a peer and the block download time. -func (ps *peerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { +func (ps *rankPooledPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { ps.mu.Lock() defer ps.mu.Unlock() poolIdx, peerIdx := ps.findPeer(psp) @@ -409,7 +415,7 @@ func (ps *peerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockD // addToPool adds a given peer to the correct group. If no group exists for that peer's rank, // a new group is created. // The method return true if a new group was created ( suggesting that the pools list would need to be re-ordered ), or false otherwise. -func (ps *peerSelector) addToPool(peer network.Peer, rank int, class peerClass, peerHistory *historicStats) bool { +func (ps *rankPooledPeerSelector) addToPool(peer network.Peer, rank int, class peerClass, peerHistory *historicStats) bool { // see if we already have a list with that rank: for i, pool := range ps.pools { if pool.rank == rank { @@ -423,7 +429,7 @@ func (ps *peerSelector) addToPool(peer network.Peer, rank int, class peerClass, } // sort the pools array in an ascending order according to the rank of each pool. -func (ps *peerSelector) sort() { +func (ps *rankPooledPeerSelector) sort() { sort.SliceStable(ps.pools, func(i, j int) bool { return ps.pools[i].rank < ps.pools[j].rank }) @@ -443,7 +449,7 @@ func peerAddress(peer network.Peer) string { // refreshAvailablePeers reload the available peers from the network package, add new peers along with their // corresponding initial rank, and deletes peers that have been dropped by the network package. -func (ps *peerSelector) refreshAvailablePeers() { +func (ps *rankPooledPeerSelector) refreshAvailablePeers() { existingPeers := make(map[network.PeerOption]map[string]bool) for _, pool := range ps.pools { for _, localPeer := range pool.peers { @@ -501,7 +507,7 @@ func (ps *peerSelector) refreshAvailablePeers() { // findPeer look into the peer pool and find the given peer. // The method returns the pool and peer indices if a peer was found, or (-1, -1) otherwise. -func (ps *peerSelector) findPeer(psp *peerSelectorPeer) (poolIdx, peerIdx int) { +func (ps *rankPooledPeerSelector) findPeer(psp *peerSelectorPeer) (poolIdx, peerIdx int) { peerAddr := peerAddress(psp.Peer) if peerAddr == "" { return -1, -1 diff --git a/catchup/peerSelector_test.go b/catchup/peerSelector_test.go index aa8d348d4..7aa373d28 100644 --- a/catchup/peerSelector_test.go +++ b/catchup/peerSelector_test.go @@ -131,7 +131,7 @@ func TestPeerSelector_RankPeer(t *testing.T) { peers := []network.Peer{&mockHTTPPeer{address: "12345"}} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) []network.Peer { return peers }), []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}, @@ -191,7 +191,7 @@ func TestPeerSelector_PeerDownloadRanking(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "1234"}, &mockHTTPPeer{address: "5678"}} peers2 := []network.Peer{&mockHTTPPeer{address: "abcd"}, &mockHTTPPeer{address: "efgh"}} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { for _, opt := range options { if opt == network.PeersPhonebookArchivalNodes { @@ -240,7 +240,7 @@ func TestPeerSelector_FindMissingPeer(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) []network.Peer { return []network.Peer{} }), []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}, @@ -258,7 +258,7 @@ func TestPeerSelector_HistoricData(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}} peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { for _, opt := range options { if opt == network.PeersPhonebookArchivalNodes { @@ -332,7 +332,7 @@ func TestPeerSelector_PeersDownloadFailed(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}} peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { for _, opt := range options { if opt == network.PeersPhonebookArchivalNodes { @@ -408,7 +408,7 @@ func TestPeerSelector_Penalty(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}} peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { for _, opt := range options { if opt == network.PeersPhonebookArchivalNodes { @@ -469,7 +469,7 @@ func TestPeerSelector_PeerDownloadDurationToRank(t *testing.T) { peers3 := []network.Peer{&mockHTTPPeer{address: "c1"}, &mockHTTPPeer{address: "c2"}} peers4 := []network.Peer{&mockHTTPPeer{address: "d1"}, &mockHTTPPeer{address: "b2"}} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { for _, opt := range options { if opt == network.PeersPhonebookRelays { @@ -574,7 +574,7 @@ func TestPeerSelector_ClassUpperBound(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}} pClass := peerClass{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { for _, opt := range options { if opt == network.PeersPhonebookArchivalNodes { @@ -609,7 +609,7 @@ func TestPeerSelector_ClassLowerBound(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}} pClass := peerClass{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { for _, opt := range options { if opt == network.PeersPhonebookArchivalNodes { @@ -639,7 +639,7 @@ func TestPeerSelector_EvictionAndUpgrade(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}} peers2 := []network.Peer{&mockHTTPPeer{address: "a1"}} - peerSelector := makePeerSelector( + peerSelector := makeRankPooledPeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { for _, opt := range options { if opt == network.PeersPhonebookArchivalNodes { @@ -677,7 +677,7 @@ func TestPeerSelector_RefreshAvailablePeers(t *testing.T) { // check new peers added to the pool p1 := mockHTTPPeer{address: "p1"} p2 := mockHTTPPeer{address: "p2"} - ps := peerSelector{ + ps := rankPooledPeerSelector{ peerClasses: []peerClass{ {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut}, {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes}, diff --git a/catchup/service.go b/catchup/service.go index 58fc3ae6b..5c6609b23 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -38,10 +38,7 @@ import ( "github.com/algorand/go-algorand/util/execpool" ) -const catchupPeersForSync = 10 -const blockQueryPeerLimit = 10 - -// uncapParallelDownloadRate is a simple threshold to detect whether or not the node is caught up. +// uncapParallelDownloadRate is a simple threshold to detect whether the node is caught up. // If a block is downloaded in less than this duration, it's assumed that the node is not caught up // and allow the block downloader to start N=parallelBlocks concurrent fetches. const uncapParallelDownloadRate = time.Second @@ -76,7 +73,7 @@ type Ledger interface { WaitMem(r basics.Round) chan struct{} } -// Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network. +// Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up-to-date with network. type Service struct { // disableSyncRound, provided externally, is the first round we will _not_ fetch from the network // any round >= disableSyncRound will not be fetched. If set to 0, it will be disregarded. @@ -266,7 +263,7 @@ const errNoBlockForRoundThreshold = 5 // - If we couldn't fetch the block (e.g. if there are no peers available, or we've reached the catchupRetryLimit) // - If the block is already in the ledger (e.g. if agreement service has already written it) // - If the retrieval of the previous block was unsuccessful -func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCompleteChan chan struct{}, lookbackComplete chan struct{}, peerSelector *peerSelector) bool { +func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCompleteChan chan struct{}, lookbackComplete chan struct{}, peerSelector peerSelector) bool { // If sync-ing this round is not intended, don't fetch it if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) { return false @@ -318,7 +315,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer) if err != nil { - if err == errLedgerAlreadyHasBlock { + if errors.Is(err, errLedgerAlreadyHasBlock) { // ledger already has the block, no need to request this block. // only the agreement could have added this block into the ledger, catchup is complete s.log.Infof("fetchAndWrite(%d): the block is already in the ledger. The catchup is complete", r) @@ -329,7 +326,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo if errors.As(err, &nbfe) { failureRank = peerRankNoBlockForRound // remote peer doesn't have the block, try another peer - // quit if the the same peer peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times + // quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times if s.followLatest { // back off between retries to allow time for the next block to appear; // this will provide 50s (catchupRetryLimit * followLatestBackoff) of @@ -427,7 +424,8 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo // if the context expired, just exit. return false } - if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound { + var errNSBE ledgercore.ErrNonSequentialBlockEval + if errors.As(err, &errNSBE) && errNSBE.EvaluatorRound <= errNSBE.LatestRound { // the block was added to the ledger from elsewhere after fetching it here // only the agreement could have added this block into the ledger, catchup is complete s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r) @@ -442,16 +440,19 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo } if err != nil { - switch err.(type) { - case ledgercore.ErrNonSequentialBlockEval: + var errNonSequentialBlockEval ledgercore.ErrNonSequentialBlockEval + var blockInLedgerError ledgercore.BlockInLedgerError + var protocolErr protocol.Error + switch { + case errors.As(err, &errNonSequentialBlockEval): s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r) return true - case ledgercore.BlockInLedgerError: + case errors.As(err, &blockInLedgerError): // the block was added to the ledger from elsewhere after fetching it here // only the agreement could have added this block into the ledger, catchup is complete s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r) return false - case protocol.Error: + case errors.As(err, &protocolErr): if !s.protocolErrorLogged { logging.Base().Errorf("fetchAndWrite(%v): unrecoverable protocol error detected: %v", r, err) s.protocolErrorLogged = true @@ -491,8 +492,8 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { } }() - peerSelector := createPeerSelector(s.net, s.cfg, true) - if _, err := peerSelector.getNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable { + ps := createPeerSelector(s.net) + if _, err := ps.getNextPeer(); err != nil { s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from") return } @@ -527,7 +528,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { go func(r basics.Round) { prev := s.ledger.WaitMem(r - 1) seed := s.ledger.WaitMem(r.SubSaturate(basics.Round(seedLookback))) - done <- s.fetchAndWrite(ctx, r, prev, seed, peerSelector) + done <- s.fetchAndWrite(ctx, r, prev, seed, ps) wg.Done() }(nextRound) @@ -751,9 +752,9 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy peerErrors := map[network.Peer]int{} blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest - peerSelector := createPeerSelector(s.net, s.cfg, false) + ps := createPeerSelector(s.net) for s.ledger.LastRound() < cert.Round { - psp, getPeerErr := peerSelector.getNextPeer() + psp, getPeerErr := ps.getNextPeer() if getPeerErr != nil { s.log.Debugf("fetchRound: was unable to obtain a peer to retrieve the block from") s.net.RequestConnectOutgoing(true, s.ctx.Done()) @@ -783,19 +784,19 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy time.Sleep(50 * time.Millisecond) } if count > errNoBlockForRoundThreshold*10 { - // for the low number of connected peers (like 2) the following scenatio is possible: + // for the low number of connected peers (like 2) the following scenario is possible: // - both peers do not have the block // - peer selector punishes one of the peers more than the other - // - the punoshed peer gets the block, and the less punished peer stucks. + // - the punished peer gets the block, and the less punished peer stucks. // It this case reset the peer selector to let it re-learn priorities. - peerSelector = createPeerSelector(s.net, s.cfg, false) + ps = createPeerSelector(s.net) } } peerErrors[peer]++ } // remote peer doesn't have the block, try another peer logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err) - peerSelector.rankPeer(psp, failureRank) + ps.rankPeer(psp, failureRank) continue } @@ -805,7 +806,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy } // Otherwise, fetcher gave us the wrong block logging.Base().Warnf("fetcher gave us bad/wrong block (for round %d): fetched hash %v; want hash %v", cert.Round, block.Hash(), blockHash) - peerSelector.rankPeer(psp, peerRankInvalidDownload) + ps.rankPeer(psp, peerRankInvalidDownload) // As a failsafe, if the cert we fetched is valid but for the wrong block, panic as loudly as possible if cert.Round == fetchedCert.Round && @@ -866,38 +867,33 @@ func (s *Service) roundIsNotSupported(nextRound basics.Round) bool { return true } -func createPeerSelector(net network.GossipNode, cfg config.Local, pipelineFetch bool) *peerSelector { - var peerClasses []peerClass - if pipelineFetch { - if cfg.NetAddress != "" && cfg.EnableGossipService { // Relay node - peerClasses = []peerClass{ - {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut}, - {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes}, - {initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays}, - {initialRank: peerRankInitialFourthPriority, peerClass: network.PeersConnectedIn}, - } - } else { - peerClasses = []peerClass{ - {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}, - {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedOut}, - {initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays}, - } - } - } else { - if cfg.NetAddress != "" && cfg.EnableGossipService { // Relay node - peerClasses = []peerClass{ - {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut}, - {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn}, - {initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookArchivalNodes}, - {initialRank: peerRankInitialFourthPriority, peerClass: network.PeersPhonebookRelays}, - } - } else { - peerClasses = []peerClass{ - {initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut}, - {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivalNodes}, - {initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays}, - } - } +func createPeerSelector(net network.GossipNode) peerSelector { + wrappedPeerSelectors := []*wrappedPeerSelector{ + { + peerClass: network.PeersConnectedOut, + peerSelector: makeRankPooledPeerSelector(net, + []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut}}), + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookRelays, + peerSelector: makeRankPooledPeerSelector(net, + []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays}}), + toleranceFactor: 3, + }, + { + peerClass: network.PeersPhonebookArchivalNodes, + peerSelector: makeRankPooledPeerSelector(net, + []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}), + toleranceFactor: 10, + }, + { + peerClass: network.PeersConnectedIn, + peerSelector: makeRankPooledPeerSelector(net, + []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedIn}}), + toleranceFactor: 3, + }, } - return makePeerSelector(net, peerClasses) + + return makeClassBasedPeerSelector(wrappedPeerSelectors) } diff --git a/catchup/service_test.go b/catchup/service_test.go index 8deb692b0..045a0438f 100644 --- a/catchup/service_test.go +++ b/catchup/service_test.go @@ -958,102 +958,23 @@ func TestCatchupUnmatchedCertificate(t *testing.T) { func TestCreatePeerSelector(t *testing.T) { partitiontest.PartitionTest(t) - // Make Service - cfg := defaultConfig + s := MakeService(logging.Base(), defaultConfig, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil) + ps := createPeerSelector(s.net) + + cps, ok := ps.(*classBasedPeerSelector) + require.True(t, ok) + + require.Equal(t, 4, len(cps.peerSelectors)) + + require.Equal(t, network.PeersConnectedOut, cps.peerSelectors[0].peerClass) + require.Equal(t, network.PeersPhonebookRelays, cps.peerSelectors[1].peerClass) + require.Equal(t, network.PeersPhonebookArchivalNodes, cps.peerSelectors[2].peerClass) + require.Equal(t, network.PeersConnectedIn, cps.peerSelectors[3].peerClass) - // cfg.NetAddress != ""; cfg.EnableGossipService = true; pipelineFetch = true - cfg.NetAddress = "someAddress" - cfg.EnableGossipService = true - s := MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil) - ps := createPeerSelector(s.net, s.cfg, true) - - require.Equal(t, 4, len(ps.peerClasses)) - require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank) - require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank) - require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank) - require.Equal(t, peerRankInitialFourthPriority, ps.peerClasses[3].initialRank) - - require.Equal(t, network.PeersConnectedOut, ps.peerClasses[0].peerClass) - require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[1].peerClass) - require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass) - require.Equal(t, network.PeersConnectedIn, ps.peerClasses[3].peerClass) - - // cfg.NetAddress == ""; cfg.EnableGossipService = true; pipelineFetch = true - cfg.NetAddress = "" - cfg.EnableGossipService = true - s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil) - ps = createPeerSelector(s.net, s.cfg, true) - - require.Equal(t, 3, len(ps.peerClasses)) - require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank) - require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank) - require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank) - - require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[0].peerClass) - require.Equal(t, network.PeersConnectedOut, ps.peerClasses[1].peerClass) - require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass) - - // cfg.NetAddress != ""; cfg.EnableGossipService = false; pipelineFetch = true - cfg.NetAddress = "someAddress" - cfg.EnableGossipService = false - s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil) - ps = createPeerSelector(s.net, s.cfg, true) - - require.Equal(t, 3, len(ps.peerClasses)) - require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank) - require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank) - require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank) - - require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[0].peerClass) - require.Equal(t, network.PeersConnectedOut, ps.peerClasses[1].peerClass) - require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass) - - // cfg.NetAddress != ""; cfg.EnableGossipService = true; pipelineFetch = false - cfg.NetAddress = "someAddress" - cfg.EnableGossipService = true - s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil) - ps = createPeerSelector(s.net, s.cfg, false) - - require.Equal(t, 4, len(ps.peerClasses)) - require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank) - require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank) - require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank) - require.Equal(t, peerRankInitialFourthPriority, ps.peerClasses[3].initialRank) - - require.Equal(t, network.PeersConnectedOut, ps.peerClasses[0].peerClass) - require.Equal(t, network.PeersConnectedIn, ps.peerClasses[1].peerClass) - require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[2].peerClass) - require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[3].peerClass) - - // cfg.NetAddress == ""; cfg.EnableGossipService = true; pipelineFetch = false - cfg.NetAddress = "" - cfg.EnableGossipService = true - s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil) - ps = createPeerSelector(s.net, s.cfg, false) - - require.Equal(t, 3, len(ps.peerClasses)) - require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank) - require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank) - require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank) - - require.Equal(t, network.PeersConnectedOut, ps.peerClasses[0].peerClass) - require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[1].peerClass) - require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass) - - // cfg.NetAddress != ""; cfg.EnableGossipService = false; pipelineFetch = false - cfg.NetAddress = "someAddress" - cfg.EnableGossipService = false - s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil) - ps = createPeerSelector(s.net, s.cfg, false) - - require.Equal(t, 3, len(ps.peerClasses)) - require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank) - require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank) - require.Equal(t, peerRankInitialThirdPriority, ps.peerClasses[2].initialRank) - - require.Equal(t, network.PeersConnectedOut, ps.peerClasses[0].peerClass) - require.Equal(t, network.PeersPhonebookArchivalNodes, ps.peerClasses[1].peerClass) - require.Equal(t, network.PeersPhonebookRelays, ps.peerClasses[2].peerClass) + require.Equal(t, 3, cps.peerSelectors[0].toleranceFactor) + require.Equal(t, 3, cps.peerSelectors[1].toleranceFactor) + require.Equal(t, 10, cps.peerSelectors[2].toleranceFactor) + require.Equal(t, 3, cps.peerSelectors[3].toleranceFactor) } func TestServiceStartStop(t *testing.T) { |