summaryrefslogtreecommitdiff
path: root/txnsync/peerscheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'txnsync/peerscheduler.go')
-rw-r--r--txnsync/peerscheduler.go200
1 files changed, 0 insertions, 200 deletions
diff --git a/txnsync/peerscheduler.go b/txnsync/peerscheduler.go
deleted file mode 100644
index fc1481c85..000000000
--- a/txnsync/peerscheduler.go
+++ /dev/null
@@ -1,200 +0,0 @@
-// Copyright (C) 2019-2021 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
-
-package txnsync
-
-import (
- "container/heap"
- "sort"
- "time"
-)
-
-//msgp:ignore peerBuckets
-type peerBuckets []peerBucket
-
-type peerScheduler struct {
- peers peerBuckets
- nextPeers map[*Peer][]int // nextPeers holds an array of ordered indices where this Peer object is on the peers peerBuckets
- node NodeConnector
-}
-
-// makePeerScheduler initializes a peer scheduler object.
-func makePeerScheduler() peerScheduler {
- return peerScheduler{
- nextPeers: make(map[*Peer][]int),
- }
-}
-
-//msgp:ignore peerBucket
-type peerBucket struct {
- peer *Peer
- next time.Duration
-}
-
-// Push implements heap.Interface
-func (p *peerScheduler) Push(x interface{}) {
- entry := x.(peerBucket)
- p.peers = append(p.peers, entry)
- p.nextPeers[entry.peer] = append(p.nextPeers[entry.peer], len(p.peers)-1)
-
- if len(p.nextPeers[entry.peer]) > 1 {
- peerIndices := p.nextPeers[entry.peer]
- sort.Slice(peerIndices, func(i, j int) bool {
- return p.peers[peerIndices[i]].next < p.peers[peerIndices[j]].next
- })
- }
-
-}
-
-// Pop implements heap.Interface
-func (p *peerScheduler) Pop() interface{} {
- end := len(p.peers) - 1
- res := p.peers[end]
-
- // delete from the map only if it's the last entry
- peerIndices := p.nextPeers[res.peer]
-
- if peerIndices[0] != end {
- // this case is possible when the peer has two elements in p.peers.
- // and both have the same next value.
- for idx, x := range peerIndices {
- if x == end {
- peerIndices[0], peerIndices[idx] = peerIndices[idx], peerIndices[0]
- break
- }
- }
- }
- // the peer index must be the first entry.
- peerIndices = peerIndices[1:]
-
- // store if non-empty.
- if len(peerIndices) > 0 {
- p.nextPeers[res.peer] = peerIndices
- } else {
- delete(p.nextPeers, res.peer)
- }
-
- p.peers[end] = peerBucket{}
- p.peers = p.peers[0:end]
- return res
-}
-
-// Len implements heap.Interface
-func (p *peerScheduler) Len() int {
- return len(p.peers)
-}
-
-func (p *peerScheduler) replaceIndices(indices []int, i, j int) {
-
- for idx, x := range indices {
- if x == i {
- indices[idx] = j
- } else if x == j {
- indices[idx] = i
- }
- }
- sort.Slice(indices, func(i, j int) bool {
- return p.peers[indices[i]].next < p.peers[indices[j]].next
- })
-}
-
-// Swap implements heap.Interface
-func (p *peerScheduler) Swap(i, j int) {
- p.peers[i], p.peers[j] = p.peers[j], p.peers[i]
- if p.peers[i].peer == p.peers[j].peer {
- indices := p.nextPeers[p.peers[i].peer]
- sort.Slice(indices, func(x, y int) bool {
- return p.peers[indices[x]].next < p.peers[indices[y]].next
- })
- return
- }
- p.replaceIndices(p.nextPeers[p.peers[i].peer], i, j)
- p.replaceIndices(p.nextPeers[p.peers[j].peer], i, j)
-}
-
-// Less implements heap.Interface
-func (p *peerScheduler) Less(i, j int) bool {
- return p.peers[i].next < p.peers[j].next
-}
-
-// refresh the current schedule by creating new schedule for each of the peers.
-func (p *peerScheduler) scheduleNewRound(peers []*Peer) {
- // clear the existings peers list.
- p.peers = make(peerBuckets, 0, len(peers))
- p.nextPeers = make(map[*Peer][]int)
- for _, peer := range peers {
- peerEntry := peerBucket{peer: peer}
- peerEntry.next = kickoffTime + time.Duration(p.node.Random(uint64(randomRange)))
-
- p.peers = append(p.peers, peerEntry)
- p.nextPeers[peer] = []int{len(p.peers) - 1}
- }
- heap.Init(p)
-}
-
-func (p *peerScheduler) nextDuration() time.Duration {
- if len(p.peers) == 0 {
- return time.Duration(0)
- }
- return p.peers[0].next
-}
-
-func (p *peerScheduler) getNextPeers() (outPeers []*Peer) {
- next := p.nextDuration()
-
- // pull out of the heap all the entries that have next smaller or equal to the above next.
- for len(p.peers) > 0 && p.peers[0].next <= next {
- bucket := heap.Remove(p, 0).(peerBucket)
- outPeers = append(outPeers, bucket.peer)
- }
-
- // in many cases, we'll have only a single peer; however, in case we have multiple
- // ( which is more likely when we're "running late" ), we want to make sure to remove
- // duplicate ones.
- if len(outPeers) > 1 {
- // note that the algorithm here ensures that we retain the peer order from above
- // while dropping off recurring peers.
- peersMap := make(map[*Peer]bool, len(outPeers))
- offset := 0
- peersMap[outPeers[0]] = true
- for i := 1; i < len(outPeers); i++ {
- if peersMap[outPeers[i]] {
- // we already had this peer.
- offset++
- continue
- }
- // we haven't seen this peer.
- outPeers[i-offset] = outPeers[i]
- peersMap[outPeers[i]] = true
- }
- outPeers = outPeers[:len(outPeers)-offset]
- }
- return
-}
-
-func (p *peerScheduler) schedulePeer(peer *Peer, next time.Duration) {
- bucket := peerBucket{peer: peer, next: next}
- heap.Push(p, bucket)
-}
-
-func (p *peerScheduler) peerDuration(peer *Peer) time.Duration {
- peerIndices := p.nextPeers[peer]
- if len(peerIndices) == 0 {
- return time.Duration(0)
- }
- bucket := heap.Remove(p, peerIndices[0]).(peerBucket)
- return bucket.next
-}