summaryrefslogtreecommitdiff
path: root/network/netprio.go
blob: 5cb122c11fdd43b7c82af61c99a1b76610d13ac0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package network

import (
	"container/heap"

	"github.com/algorand/go-algorand/data/basics"
	"github.com/algorand/go-algorand/protocol"
)

// NetPrioScheme is an implementation of network connection priorities
// based on a challenge-response protocol.
type NetPrioScheme interface {
	NewPrioChallenge() string
	MakePrioResponse(challenge string) []byte
	VerifyPrioResponse(challenge string, response []byte) (basics.Address, error)
	GetPrioWeight(addr basics.Address) uint64
}

func prioResponseHandler(message IncomingMessage) OutgoingMessage {
	wn := message.Net.(*WebsocketNetwork)
	if wn.prioScheme == nil {
		return OutgoingMessage{}
	}

	peer := message.Sender.(*wsPeer)
	challenge := peer.prioChallenge
	if challenge == "" {
		return OutgoingMessage{}
	}

	addr, err := wn.prioScheme.VerifyPrioResponse(challenge, message.Data)
	if err != nil {
		wn.log.Warnf("prioScheme.VerifyPrioResponse from %s: %v", peer.rootURL, err)
	} else {
		weight := wn.prioScheme.GetPrioWeight(addr)

		wn.peersLock.Lock()
		defer wn.peersLock.Unlock()
		wn.prioTracker.setPriority(peer, addr, weight)
	}

	// For testing
	if wn.prioResponseChan != nil {
		wn.prioResponseChan <- peer
	}

	return OutgoingMessage{}
}

var prioHandlers = []TaggedMessageHandler{
	{protocol.NetPrioResponseTag, HandlerFunc(prioResponseHandler)},
}

// The prioTracker sorts active peers by priority, and ensures
// there's only one peer with weight per address.  The data
// structure is not thread-safe; it is protected by wn.peersLock.
type prioTracker struct {
	// If a peer has a non-zero prioWeight, it will be present in
	// this map under its peerAddress.
	peerByAddress map[basics.Address]*wsPeer

	wn *WebsocketNetwork
}

func newPrioTracker(wn *WebsocketNetwork) *prioTracker {
	return &prioTracker{
		peerByAddress: make(map[basics.Address]*wsPeer),
		wn:            wn,
	}
}

func (pt *prioTracker) setPriority(peer *wsPeer, addr basics.Address, weight uint64) {
	wn := pt.wn

	// Make sure this peer is currently in the peers slice
	if peer.peerIndex >= len(wn.peers) || wn.peers[peer.peerIndex] != peer {
		// The peer might be in the process of being added to wn.peers;
		// in this case, wn.addPeer() will call setPriority again and
		// we will finish setup in that call.
		peer.prioAddress = addr
		peer.prioWeight = weight
		return
	}

	// Evict old peer from same address, if present
	old, present := pt.peerByAddress[addr]
	if present {
		if old == peer {
			// No eviction necessary if it was already this peer
			if peer.prioAddress == addr && peer.prioWeight == weight {
				// Same address and weight, nothing to update
				return
			}
		} else if old.prioAddress == addr {
			old.prioWeight = 0
			if old.peerIndex < len(wn.peers) && wn.peers[old.peerIndex] == old {
				heap.Fix(peersHeap{wn}, old.peerIndex)
			}
		}
	}

	// Check if this peer was in peerByAddress[] under its old address,
	// and delete that mapping if so.
	if addr != peer.prioAddress && peer == pt.peerByAddress[peer.prioAddress] {
		delete(pt.peerByAddress, peer.prioAddress)
	}

	pt.peerByAddress[addr] = peer
	peer.prioAddress = addr
	peer.prioWeight = weight
	heap.Fix(peersHeap{wn}, peer.peerIndex)
	wn.peersChangeCounter.Add(1)
}

func (pt *prioTracker) removePeer(peer *wsPeer) {
	addr := peer.prioAddress
	old, present := pt.peerByAddress[addr]
	if present && old == peer {
		delete(pt.peerByAddress, addr)
	}
}