summary | refs | log | tree | commit | diff
path: root/network/messageFilter.go
blob: b098eddf7983f06b547297a20eec4a71014e3f39 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package network

import (
	"github.com/algorand/go-deadlock"

	"github.com/algorand/go-algorand/crypto"
	"github.com/algorand/go-algorand/protocol"
)

// messageFilter is a mutex-guarded set of message digests, spread over a ring
// of buckets, that is used to tell whether a message digest was seen before.
// New digests always go into the current top bucket; once that bucket reaches
// maxBucketSize entries the ring rotates and the slot it rotates onto is
// replaced with a fresh, empty bucket, dropping whatever that slot held.
type messageFilter struct {
	// embedded lock; CheckDigest holds it for the whole lookup/update.
	deadlock.Mutex
	// buckets is the ring of digest sets. Only the top slot is allocated at
	// construction; the other slots stay nil until the ring rotates onto them.
	buckets          []map[crypto.Digest]struct{}
	// maxBucketSize is the entry count at which the top bucket is considered
	// full; it is also used as the size hint when allocating a bucket.
	maxBucketSize    int
	// currentTopBucket is the index into buckets that receives new digests.
	currentTopBucket int
	// nonce is filled once at construction via crypto.RandBytes and is hashed
	// ahead of the tag and message in CheckIncomingMessage.
	nonce            [16]byte
}

// makeMessageFilter creates a messageFilter with bucketsCount bucket slots,
// each of which is rotated out once it holds maxBucketSize digests.
// Only the initial top bucket is allocated here; the filter's nonce is filled
// via crypto.RandBytes. bucketsCount must be at least 1, since the top slot is
// indexed unconditionally.
func makeMessageFilter(bucketsCount, maxBucketSize int) *messageFilter {
	filter := new(messageFilter)
	filter.buckets = make([]map[crypto.Digest]struct{}, bucketsCount)
	filter.maxBucketSize = maxBucketSize

	crypto.RandBytes(filter.nonce[:])

	// currentTopBucket starts at its zero value, so this allocates slot 0.
	filter.buckets[filter.currentTopBucket] = make(map[crypto.Digest]struct{}, maxBucketSize)
	return filter
}

// CheckIncomingMessage reports whether the given tag/msg pair was already in the collection before the call.
// The digest is computed over the filter's own random nonce followed by the tag and the message, which makes it
// hard to abuse hash collisions. The add and promote flags are handed to CheckDigest unchanged.
func (f *messageFilter) CheckIncomingMessage(tag protocol.Tag, msg []byte, add bool, promote bool) bool {
	var salted crypto.Digest

	h := crypto.NewHash()
	h.Write(f.nonce[:])
	h.Write([]byte(tag))
	h.Write(msg)
	// NOTE(review): relies on Sum appending into the zero-length slice backed by salted — confirm against crypto.NewHash.
	h.Sum(salted[:0])

	return f.CheckDigest(salted, add, promote)
}

// CheckDigest reports whether msgHash was already in the collection before the call.
// It is used on outgoing messages, either given a hash from a peer notifying us of messages it doesn't need,
// or as we are about to send a message to see if we should send it.
//
// When add is false this is a pure lookup. When add is true, a missing digest is inserted into the top
// bucket; an existing digest is moved into the top bucket only if promote is set and it lives in another
// bucket. If the top bucket is full afterwards, the ring rotates onto a fresh, empty bucket.
func (f *messageFilter) CheckDigest(msgHash crypto.Digest, add bool, promote bool) bool {
	f.Lock()
	defer f.Unlock()

	bucketIdx, seen := f.find(msgHash)
	if !add {
		return seen
	}

	top := f.buckets[f.currentTopBucket]
	switch {
	case !seen:
		// first sighting: record it in the top bucket.
		top[msgHash] = struct{}{}
	case promote && bucketIdx != f.currentTopBucket:
		// known digest sitting in another bucket: move it to the top one.
		delete(f.buckets[bucketIdx], msgHash)
		top[msgHash] = struct{}{}
	}

	// rotate once the top bucket has reached capacity; the slot we rotate
	// onto is replaced, which discards its previous content.
	if len(top) >= f.maxBucketSize {
		ringSize := len(f.buckets)
		f.currentTopBucket = (f.currentTopBucket + ringSize - 1) % ringSize
		f.buckets[f.currentTopBucket] = make(map[crypto.Digest]struct{}, f.maxBucketSize)
	}

	return seen
}

// generateMessageDigest hashes the tag followed by the message and returns the resulting digest.
// Unlike messageFilter.CheckIncomingMessage, no nonce is mixed in.
func generateMessageDigest(tag protocol.Tag, msg []byte) crypto.Digest {
	var out crypto.Digest

	h := crypto.NewHash()
	h.Write([]byte(tag))
	h.Write(msg)
	// NOTE(review): relies on Sum appending into the zero-length slice backed by out — confirm against crypto.NewHash.
	h.Sum(out[:0])

	return out
}

// find looks for digest in the buckets, starting at the current top bucket and then
// walking the ring towards lower indices (wrapping around). It returns the index of
// the bucket holding the digest and true, or -1 and false when no bucket has it.
// It does not lock the filter itself; CheckDigest calls it with the lock held.
func (f *messageFilter) find(digest crypto.Digest) (idx int, found bool) {
	ringSize := len(f.buckets)
	for step := 0; step < ringSize; step++ {
		bucketIdx := (f.currentTopBucket + ringSize - step) % ringSize
		// slots that were never rotated onto are nil maps; a lookup on them simply misses.
		if _, ok := f.buckets[bucketIdx][digest]; ok {
			return bucketIdx, true
		}
	}
	return -1, false
}