1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
package network
import (
"github.com/algorand/go-deadlock"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/protocol"
)
// messageFilter remembers the digests of recently seen messages so that
// duplicates can be detected. Digests are kept in a ring of buckets; access
// is serialized through the embedded mutex.
type messageFilter struct {
deadlock.Mutex
// buckets is the ring of digest sets. Only the bucket at currentTopBucket
// receives insertions.
buckets []map[crypto.Digest]struct{}
// maxBucketSize is the entry count at which the top bucket is considered
// full; the filter then moves to the next ring slot and replaces it with a
// fresh, empty bucket.
maxBucketSize int
// currentTopBucket is the index into buckets of the bucket that new
// digests are added to.
currentTopBucket int
// nonce is filled with random bytes at construction and is hashed ahead of
// the tag and message in CheckIncomingMessage.
nonce [16]byte
}
// makeMessageFilter builds a messageFilter with bucketsCount ring slots, each
// of which is treated as full once it holds maxBucketSize digests.
// Only the first bucket is allocated here; the remaining slots stay nil until
// the ring rotates onto them. bucketsCount must be at least 1, since the
// first bucket is indexed immediately.
func makeMessageFilter(bucketsCount, maxBucketSize int) *messageFilter {
	filter := new(messageFilter)
	filter.buckets = make([]map[crypto.Digest]struct{}, bucketsCount)
	filter.maxBucketSize = maxBucketSize
	filter.currentTopBucket = 0

	// Per-filter random salt used when hashing incoming messages.
	crypto.RandBytes(filter.nonce[:])

	top := filter.currentTopBucket
	filter.buckets[top] = make(map[crypto.Digest]struct{}, filter.maxBucketSize)
	return filter
}
// CheckIncomingMessage reports whether the given tag/msg pair was already in
// the collection before this call.
// The filter's own random nonce is hashed in front of the tag and message to
// make it hard to abuse hash collisions. The add and promote flags are passed
// through unchanged to CheckDigest.
func (f *messageFilter) CheckIncomingMessage(tag protocol.Tag, msg []byte, add bool, promote bool) bool {
	var salted crypto.Digest

	h := crypto.NewHash()
	h.Write(f.nonce[:])
	h.Write([]byte(tag))
	h.Write(msg)
	// Sum appends to the zero-length slice, which writes the hash into salted's backing array.
	h.Sum(salted[:0])

	return f.CheckDigest(salted, add, promote)
}
// CheckDigest reports whether msgHash was already in the collection before
// this call.
// It is used on outgoing messages, either given a hash from a peer notifying
// us of messages it doesn't need, or as we are about to send a message to see
// if we should send it.
//
// When add is false the collection is left untouched. When add is true, a
// missing digest is inserted into the top bucket; an existing digest is moved
// to the top bucket only if promote is true and it lives in another bucket.
// After either path, a top bucket that has reached maxBucketSize causes the
// ring to advance onto a freshly allocated bucket.
func (f *messageFilter) CheckDigest(msgHash crypto.Digest, add bool, promote bool) bool {
	f.Lock()
	defer f.Unlock()

	foundIdx, present := f.find(msgHash)
	if !add {
		return present
	}

	switch {
	case !present:
		// Unknown digest: record it in the top bucket.
		f.buckets[f.currentTopBucket][msgHash] = struct{}{}
	case promote && foundIdx != f.currentTopBucket:
		// Known digest in another bucket: move it to the top bucket.
		delete(f.buckets[foundIdx], msgHash)
		f.buckets[f.currentTopBucket][msgHash] = struct{}{}
	}

	// Rotate once the top bucket is full: step one slot backwards around the
	// ring and replace whatever was there with an empty bucket.
	if len(f.buckets[f.currentTopBucket]) >= f.maxBucketSize {
		ringLen := len(f.buckets)
		f.currentTopBucket = (f.currentTopBucket + ringLen - 1) % ringLen
		f.buckets[f.currentTopBucket] = make(map[crypto.Digest]struct{}, f.maxBucketSize)
	}

	return present
}
// generateMessageDigest hashes the tag bytes followed by the message bytes
// and returns the resulting digest. Unlike CheckIncomingMessage, no filter
// nonce is mixed in.
func generateMessageDigest(tag protocol.Tag, msg []byte) crypto.Digest {
	var result crypto.Digest

	h := crypto.NewHash()
	h.Write([]byte(tag))
	h.Write(msg)
	// Sum appends to the zero-length slice, which writes the hash into result's backing array.
	h.Sum(result[:0])

	return result
}
// find looks for digest in every bucket of the ring, starting at the current
// top bucket and then stepping backwards through the ring indices (with
// wrap-around). It returns the index of the bucket holding the digest and
// true, or -1 and false when no bucket contains it.
// The caller is expected to hold the filter's lock; find does not lock.
func (f *messageFilter) find(digest crypto.Digest) (idx int, found bool) {
	ringLen := len(f.buckets)
	for offset := 0; offset < ringLen; offset++ {
		// offset 0 maps to the top bucket; each further step moves one slot back.
		slot := (f.currentTopBucket + ringLen - offset) % ringLen
		if _, ok := f.buckets[slot][digest]; ok {
			return slot, true
		}
	}
	return -1, false
}
|