summaryrefslogtreecommitdiff
path: root/data/transactions/verify/verifiedTxnCache.go
blob: cb73a437113465a69f0e7fea2c06270a1f3178b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package verify

import (
	"errors"

	"github.com/algorand/go-deadlock"

	"github.com/algorand/go-algorand/data/transactions"
	"github.com/algorand/go-algorand/protocol"
)

// maxPinnedEntries is the upper bound used by Pin: a group is rejected with errTooManyPinnedEntries when the
// current number of pinned entries plus the size of the group exceeds this value.
const maxPinnedEntries = 500000

// VerifiedTxnCacheError helps to identify cache errors and differentiate them from general verification errors.
type VerifiedTxnCacheError struct {
	// inner is the wrapped error; it supplies the message returned by Error and is the value exposed by Unwrap.
	inner error
}

// Unwrap provides access to the underlying error.
// It returns nil when the receiver is nil or when no underlying error was set.
func (e *VerifiedTxnCacheError) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.inner
}

// Error formats the underlying error message.
// The zero value (the only value code outside this package can construct, since inner is unexported) and a nil
// receiver have no underlying error; a generic message is returned for them instead of dereferencing nil.
func (e *VerifiedTxnCacheError) Error() string {
	if e == nil || e.inner == nil {
		return "verified transaction cache error"
	}
	return e.inner.Error()
}

// errTooManyPinnedEntries is generated when we attempt to pin a transaction group while doing so would exceed the maximum number of
// pinned transactions allowed in the verification cache (maxPinnedEntries).
var errTooManyPinnedEntries = &VerifiedTxnCacheError{errors.New("Too many pinned entries")}

// errMissingPinnedEntry is generated when we're trying to pin a transaction that does not appear in the cache.
var errMissingPinnedEntry = &VerifiedTxnCacheError{errors.New("Missing pinned entry")}

// VerifiedTransactionCache provides a cached store of recently verified transactions. The cache is designed to have two separate "levels". On the
// bottom tier, the cache uses a cyclic buffer, where old transactions end up overridden by new ones. In order to support transactions
// that go into the transaction pool, we have a higher tier of pinned cache. Pinned transactions are not cycled away by new incoming transactions,
// and only get eliminated by updates to the transaction pool, which inform the cache of updates to the pinned items.
type VerifiedTransactionCache interface {
	// Add adds a given transaction group and its associated group context to the cache. If any of the transactions already appear
	// in the cache, the new entry overrides the old one.
	Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext)
	// AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts.
	// The i-th group context is associated with the i-th transaction group.
	AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext)
	// GetUnverifiedTransactionGroups compares the provided payset against the currently cached transactions and figures out which transaction groups aren't fully cached.
	// The returned slice holds the groups that still need to be verified.
	GetUnverifiedTransactionGroups(payset [][]transactions.SignedTxn, CurrSpecAddrs transactions.SpecialAddresses, CurrProto protocol.ConsensusVersion) [][]transactions.SignedTxn
	// UpdatePinned replaces the pinned entries with the ones provided in the pinnedTxns map. This is typically expected to be a subset of the
	// already-pinned transactions. If a transaction is not currently pinned and can't be found in the cache, an errMissingPinnedEntry error is generated.
	UpdatePinned(pinnedTxns map[transactions.Txid]transactions.SignedTxn) error
	// Pin marks the given transaction group as pinned.
	Pin(txgroup []transactions.SignedTxn) error
}

// verifiedTransactionCache provides an implementation of the VerifiedTransactionCache interface.
type verifiedTransactionCache struct {
	// entriesPerBucket is the number of entries each map (bucket) is sized for. add moves on to the next bucket
	// when inserting a group would push the current bucket past this count.
	entriesPerBucket int
	// bucketsLock is the lock for synchronizing access to the cache; the methods implementing
	// VerifiedTransactionCache hold it for their entire duration.
	bucketsLock deadlock.Mutex
	// buckets is the circular cache buckets buffer. Each bucket maps a transaction ID to the group context
	// that was supplied when the transaction was added.
	buckets []map[transactions.Txid]*GroupContext
	// pinned is the pinned transactions entries map. It is populated by Pin and replaced wholesale by UpdatePinned;
	// bucket rotation never touches it.
	pinned map[transactions.Txid]*GroupContext
	// base is the index into the buckets array of the bucket where the next transaction entry would be written.
	base int
}

// MakeVerifiedTransactionCache creates an instance of verifiedTransactionCache and returns it.
//
// The cyclic tier is made of three buckets, each pre-sized for half of cacheSize (rounded up); the pinned map is
// pre-sized for cacheSize entries. Writing starts at bucket zero.
// NOTE(review): three half-sized buckets presumably guarantee that about cacheSize recent entries survive a bucket
// rotation - confirm against the original design notes.
func MakeVerifiedTransactionCache(cacheSize int) VerifiedTransactionCache {
	const bucketCount = 3
	perBucket := (cacheSize + 1) / 2

	ring := make([]map[transactions.Txid]*GroupContext, bucketCount)
	for idx := range ring {
		ring[idx] = make(map[transactions.Txid]*GroupContext, perBucket)
	}

	return &verifiedTransactionCache{
		entriesPerBucket: perBucket,
		buckets:          ring,
		pinned:           make(map[transactions.Txid]*GroupContext, cacheSize),
	}
}

// Add adds a given transaction group and its associated group context to the cache. If any of the transactions already appear
// in the cache, the new entry overrides the old one.
// The cache lock is held for the duration of the call; the actual insertion is performed by add.
func (v *verifiedTransactionCache) Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext) {
	v.bucketsLock.Lock()
	defer v.bucketsLock.Unlock()
	v.add(txgroup, groupCtx)
}

// AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts.
// The group at position i is stored with groupCtxs[i], so groupCtxs must be at least as long as txgroup.
// The cache lock is taken once and held while all the groups are inserted.
func (v *verifiedTransactionCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) {
	v.bucketsLock.Lock()
	defer v.bucketsLock.Unlock()
	for groupIdx, group := range txgroup {
		v.add(group, groupCtxs[groupIdx])
	}
}

// GetUnverifiedTransactionGroups compares the provided payset against the currently cached transactions and figures out which
// transaction groups aren't fully cached.
//
// A group is treated as verified only when it is non-empty and every one of its transactions:
//   - is found in the pinned map or in one of the buckets,
//   - was cached with a group context that is Equal to one built from currSpecAddrs and currProto, and
//   - carries the same Sig, Msig, Lsig and AuthAddr as the transaction stored at the same position in the cached group context.
//
// Every other group (empty groups included) is returned, in the order in which it appears in txnGroups.
func (v *verifiedTransactionCache) GetUnverifiedTransactionGroups(txnGroups [][]transactions.SignedTxn, currSpecAddrs transactions.SpecialAddresses, currProto protocol.ConsensusVersion) (unverifiedGroups [][]transactions.SignedTxn) {
	v.bucketsLock.Lock()
	defer v.bucketsLock.Unlock()
	// groupCtx is the reference context that every cached entry is compared against.
	groupCtx := &GroupContext{
		specAddrs:        currSpecAddrs,
		consensusVersion: currProto,
	}
	unverifiedGroups = make([][]transactions.SignedTxn, 0, len(txnGroups))

	for txnGroupIndex := 0; txnGroupIndex < len(txnGroups); txnGroupIndex++ {
		signedTxnGroup := txnGroups[txnGroupIndex]
		verifiedTxn := 0

		// baseBucket is the bucket the search starts from. It begins at the write bucket and follows the bucket of the
		// last hit, so that the remaining transactions of the group are looked up there first.
		baseBucket := v.base
		for txnIdx := 0; txnIdx < len(signedTxnGroup); txnIdx++ {
			txn := &signedTxnGroup[txnIdx]
			id := txn.Txn.ID()
			// check pinned first
			entryGroup := v.pinned[id]
			// if not found in the pinned map, try to find in the verified buckets:
			if entryGroup == nil {
				// try to look in the previously verified buckets.
				// we use the (base + W) % W trick here so we can go backward and wrap around the zero.
				for offsetBucketIdx := baseBucket + len(v.buckets); offsetBucketIdx > baseBucket; offsetBucketIdx-- {
					bucketIdx := offsetBucketIdx % len(v.buckets)
					if params, has := v.buckets[bucketIdx][id]; has {
						entryGroup = params
						baseBucket = bucketIdx
						break
					}
				}
			}

			// the transaction isn't cached at all (or was cached with a nil context).
			if entryGroup == nil {
				break
			}

			// the transaction was cached under a different context than the one requested.
			if !entryGroup.Equal(groupCtx) {
				break
			}

			// The cached context is whatever was handed to Add/AddPayset, and nothing guarantees that it holds a
			// transaction at this position. Treat such an entry as unverified instead of indexing out of range.
			if txnIdx >= len(entryGroup.signedGroupTxns) {
				break
			}

			// the signature material must match what was verified when the entry was cached.
			cachedTxn := &entryGroup.signedGroupTxns[txnIdx]
			if cachedTxn.Sig != txn.Sig || (!cachedTxn.Msig.Equal(txn.Msig)) || (!cachedTxn.Lsig.Equal(&txn.Lsig)) || (cachedTxn.AuthAddr != txn.AuthAddr) {
				break
			}
			verifiedTxn++
		}
		// any early break above leaves verifiedTxn short of the group size; empty groups are reported as unverified too.
		if verifiedTxn != len(signedTxnGroup) || verifiedTxn == 0 {
			unverifiedGroups = append(unverifiedGroups, signedTxnGroup)
		}
	}
	return
}

// UpdatePinned replaces the pinned entries with the ones provided in the pinnedTxns map. This is typically expected to be a subset of the
// already-pinned transactions.
//
// Only the keys of pinnedTxns are used. For each transaction ID the existing pinned entry is carried over; when there is
// none, the buckets are searched (newest first) and the entry found there is copied into the new pinned map without being
// removed from its bucket. Transaction IDs that can't be found anywhere are left out and cause errMissingPinnedEntry to be
// returned; the pinned map is replaced regardless.
func (v *verifiedTransactionCache) UpdatePinned(pinnedTxns map[transactions.Txid]transactions.SignedTxn) (err error) {
	v.bucketsLock.Lock()
	defer v.bucketsLock.Unlock()

	bucketCount := len(v.buckets)
	replacement := make(map[transactions.Txid]*GroupContext, len(pinnedTxns))

nextTxn:
	for txID := range pinnedTxns {
		// already pinned: keep the existing context.
		if ctx, ok := v.pinned[txID]; ok {
			replacement[txID] = ctx
			continue
		}

		// not pinned yet; walk the buckets backward starting at the write bucket, wrapping around the zero.
		for step := 0; step < bucketCount; step++ {
			idx := (v.base + bucketCount - step) % bucketCount
			if ctx, ok := v.buckets[idx][txID]; ok {
				replacement[txID] = ctx
				continue nextTxn
			}
		}

		// the transaction is neither pinned nor cached.
		err = errMissingPinnedEntry
	}

	v.pinned = replacement
	return err
}

// Pin sets a given transaction group as pinned items, after they have already been verified.
//
// When the current number of pinned entries plus the group size exceeds maxPinnedEntries, nothing is pinned and
// errTooManyPinnedEntries is returned. Otherwise every transaction that isn't pinned yet is moved from the bucket holding
// it into the pinned map. If one or more transactions can't be found in any bucket, errMissingPinnedEntry is returned;
// the transactions that were found remain pinned.
func (v *verifiedTransactionCache) Pin(txgroup []transactions.SignedTxn) (err error) {
	v.bucketsLock.Lock()
	defer v.bucketsLock.Unlock()

	if len(v.pinned)+len(txgroup) > maxPinnedEntries {
		// reaching this number likely means that we have an issue not removing entries from the pinned map.
		// return an error ( which would get logged )
		return errTooManyPinnedEntries
	}

	bucketCount := len(v.buckets)
	// searchFrom follows the bucket of the last hit, so the rest of the group is looked up there first.
	searchFrom := v.base
	for _, stxn := range txgroup {
		txID := stxn.ID()
		if _, alreadyPinned := v.pinned[txID]; alreadyPinned {
			continue
		}

		// walk the buckets backward starting at searchFrom, wrapping around the zero.
		located := false
		for step := 0; step < bucketCount && !located; step++ {
			idx := (searchFrom + bucketCount - step) % bucketCount
			ctx, ok := v.buckets[idx][txID]
			if !ok {
				continue
			}
			// move the entry from its bucket to the pinned items.
			v.pinned[txID] = ctx
			delete(v.buckets[idx], txID)
			searchFrom = idx
			located = true
		}
		if !located {
			err = errMissingPinnedEntry
		}
	}
	return err
}

// add is the internal implementation of Add/AddPayset which adds a transaction group to the buffer.
// It does no locking of its own; both callers in this file hold bucketsLock while calling it.
// A group is never split across buckets: when it doesn't fit into the current bucket, the write position advances
// to the next bucket, whose previous content is discarded, and the whole group is written there.
func (v *verifiedTransactionCache) add(txgroup []transactions.SignedTxn, groupCtx *GroupContext) {
	if wouldOverflow := len(v.buckets[v.base])+len(txgroup) > v.entriesPerBucket; wouldOverflow {
		// advance (wrapping around) and start the next bucket from scratch.
		next := (v.base + 1) % len(v.buckets)
		v.buckets[next] = make(map[transactions.Txid]*GroupContext, v.entriesPerBucket)
		v.base = next
	}

	target := v.buckets[v.base]
	for _, stxn := range txgroup {
		target[stxn.ID()] = groupCtx
	}
}

// alwaysVerifiedCache and neverVerifiedCache are the two shared mocked cache instances returned by GetMockedCache.
var (
	alwaysVerifiedCache = mockedCache{alwaysVerified: true}
	neverVerifiedCache  = mockedCache{alwaysVerified: false}
)

// mockedCache is a mocked cache that stores nothing: it either reports every transaction group as verified
// or reports every transaction group as unverified.
type mockedCache struct {
	// alwaysVerified selects between the two behaviors of GetUnverifiedTransactionGroups.
	alwaysVerified bool
}

// Add does nothing.
func (m *mockedCache) Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext) {
}

// AddPayset does nothing.
func (m *mockedCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) {
}

// GetUnverifiedTransactionGroups returns txnGroups unchanged when the mock is configured as never-verified, and nil otherwise.
func (m *mockedCache) GetUnverifiedTransactionGroups(txnGroups [][]transactions.SignedTxn, currSpecAddrs transactions.SpecialAddresses, currProto protocol.ConsensusVersion) (unverifiedGroups [][]transactions.SignedTxn) {
	if !m.alwaysVerified {
		return txnGroups
	}
	return nil
}

// UpdatePinned does nothing and always succeeds.
func (m *mockedCache) UpdatePinned(pinnedTxns map[transactions.Txid]transactions.SignedTxn) (err error) {
	return nil
}

// Pin does nothing and always succeeds.
func (m *mockedCache) Pin(txgroup []transactions.SignedTxn) (err error) {
	return nil
}

// GetMockedCache returns a mocked transaction cache implementation.
// Depending on alwaysVerified, the returned cache reports either all or none of the transaction groups as verified.
// The returned value points at one of two package-level instances, so repeated calls with the same argument
// return the same cache.
func GetMockedCache(alwaysVerified bool) VerifiedTransactionCache {
	if !alwaysVerified {
		return &neverVerifiedCache
	}
	return &alwaysVerifiedCache
}