summaryrefslogtreecommitdiff
path: root/rpcs/txSyncer.go
blob: c724417ea36d9bd69452a25355a8e7572fbe814d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package rpcs

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/algorand/go-algorand/data"
	"github.com/algorand/go-algorand/data/transactions"
	"github.com/algorand/go-algorand/logging"
	"github.com/algorand/go-algorand/network"
	"github.com/algorand/go-algorand/util/bloom"
)

// PendingTxAggregate is a container of pending transactions.
// TxSyncer uses it to learn which transactions are already pending locally,
// so that it can tell peers which ones it does not need.
type PendingTxAggregate interface {
	// PendingTxIDs returns the IDs of the pending transactions.
	PendingTxIDs() []transactions.Txid
	// PendingTxGroups returns the pending transactions as a list of
	// transaction groups (each group being a slice of signed transactions).
	PendingTxGroups() [][]transactions.SignedTxn
}

// TxSyncClient abstracts sync-ing pending transactions from a peer.
type TxSyncClient interface {
	// Sync requests transaction groups from the peer. TxSyncer passes a bloom
	// filter populated with the IDs of its locally pending transactions; groups
	// returned by the peer are checked against that filter by the caller.
	Sync(ctx context.Context, bloom *bloom.Filter) (txns [][]transactions.SignedTxn, err error)
	// Address identifies the peer; TxSyncer uses it in log and error messages.
	Address() string
	// Close is called by TxSyncer after it rejects a response from the peer.
	Close() error
}

// TxSyncer fetches pending transactions that are missing from its pool, and feeds them to the handler
//
// Fields:
//   - pool: source of the locally pending transaction IDs used to build the bloom filter.
//   - clientSource: gossip node handed to makeHTTPSync when constructing httpSync.
//   - handler: receives every transaction group accepted from a peer.
//   - ctx, cancel: created by Start; cancel is invoked by Stop to end the background goroutine.
//   - syncInterval: delay the background goroutine waits before each sync attempt.
//   - syncTimeout: timeout applied to each individual sync request.
//   - counter: passed as the third argument to bloom.New and incremented on every sync,
//     so consecutive filters are constructed with different values.
//   - wg: tracks the background goroutine so Stop can wait for it to exit.
//   - log: logger used for sync progress and warnings.
//   - httpSync: the TxSyncClient used by sync.
type TxSyncer struct {
	pool         PendingTxAggregate
	clientSource network.GossipNode
	handler      data.SolicitedTxHandler
	ctx          context.Context
	cancel       context.CancelFunc
	syncInterval time.Duration
	syncTimeout  time.Duration
	counter      uint32
	wg           sync.WaitGroup
	log          logging.Logger
	httpSync     *HTTPTxSync
}

// MakeTxSyncer builds a TxSyncer that reads pending transaction IDs from pool,
// hands fetched transaction groups to txHandler, waits syncInterval between
// sync attempts and bounds every attempt by syncTimeout. The HTTP sync client
// is created from clientSource and serverResponseSize. The returned syncer is
// idle until Start is called.
func MakeTxSyncer(pool PendingTxAggregate, clientSource network.GossipNode, txHandler data.SolicitedTxHandler, syncInterval time.Duration, syncTimeout time.Duration, serverResponseSize int) *TxSyncer {
	syncer := new(TxSyncer)

	syncer.pool = pool
	syncer.clientSource = clientSource
	syncer.handler = txHandler

	syncer.syncInterval = syncInterval
	syncer.syncTimeout = syncTimeout

	syncer.log = logging.Base()
	syncer.httpSync = makeHTTPSync(clientSource, logging.Base(), uint64(serverResponseSize))

	return syncer
}

// Start launches the background goroutine that performs periodic syncing.
// The goroutine first blocks until the canStart channel signals that it may
// begin; after that it repeatedly waits for syncInterval and runs one sync,
// logging a warning whenever a sync fails. Cancelling the syncer's context
// ends the goroutine at either wait.
func (syncer *TxSyncer) Start(canStart chan struct{}) {
	syncer.wg.Add(1)
	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())

	go func() {
		defer syncer.wg.Done()

		// hold off until we are allowed to begin, unless we are stopped first.
		select {
		case <-canStart:
		case <-syncer.ctx.Done():
			return
		}

		for {
			// sleep for one interval; bail out if we are stopped meanwhile.
			select {
			case <-syncer.ctx.Done():
				return
			case <-time.After(syncer.syncInterval):
			}

			if syncErr := syncer.sync(); syncErr != nil {
				syncer.log.Warnf("problem syncing transactions %v", syncErr)
			}
		}
	}()
}

// Stop stops periodic syncing and blocks until the background goroutine
// launched by Start has returned.
//
// The cancel function is only assigned by Start, so it is nil on a syncer that
// was never started; in that case there is nothing to cancel and Stop returns
// without panicking.
func (syncer *TxSyncer) Stop() {
	if syncer.cancel != nil {
		syncer.cancel()
	}
	syncer.wg.Wait()
}

// sync runs a single sync round using the syncer's HTTP sync client and
// returns whatever error that round produced.
func (syncer *TxSyncer) sync() error {
	err := syncer.syncFromClient(syncer.httpSync)
	return err
}

// bloomFilterFalsePositiveRate is the false-positive rate passed to
// bloom.Optimal when sizing the bloom filter of pending transaction IDs.
const bloomFilterFalsePositiveRate = 0.01

// syncFromClient performs one sync round against the given client.
//
// It builds a bloom filter over the IDs of the locally pending transactions,
// asks the client for transaction groups (bounded by syncer.syncTimeout) and
// passes every returned group to the handler.
//
// A group is rejected when every one of its transactions is already pending
// locally, since the peer was told not to send those. On such a rejection, or
// when the handler refuses a group, the client is closed and an error is
// returned; groups handled before that point stay handled.
//
// An error returned by client.Sync is wrapped with %w, so callers can inspect
// the underlying cause with errors.Is / errors.As.
func (syncer *TxSyncer) syncFromClient(client TxSyncClient) error {
	syncer.log.Infof("TxSyncer.Sync: asking client %v for missing transactions", client.Address())

	pending := syncer.pool.PendingTxIDs()
	sizeBits, numHashes := bloom.Optimal(len(pending), bloomFilterFalsePositiveRate)
	filter := bloom.New(sizeBits, numHashes, syncer.counter)
	// use a different counter value for the next filter we construct.
	syncer.counter++
	for _, txid := range pending {
		filter.Set(txid[:])
	}

	ctx, cf := context.WithTimeout(syncer.ctx, syncer.syncTimeout)
	defer cf()
	txgroups, err := client.Sync(ctx, filter)
	if err != nil {
		return fmt.Errorf("TxSyncer.Sync: peer '%v' error '%w'", client.Address(), err)
	}

	// pendingTxidMap is built lazily: it is only needed once some received
	// transaction tests positive against the bloom filter.
	var pendingTxidMap map[transactions.Txid]struct{}
	// test to see if all the transactions that we've received honor the bloom filter constraints
	// that we've requested.
	for _, txgroup := range txgroups {
		var txnsInFilter int
		for i := range txgroup {
			txID := txgroup[i].ID()
			if filter.Test(txID[:]) {
				// having the transaction id tested here might still fall into the false-positive class, so we
				// need to perform explicit check. This is not too bad since we're doing this check only on the fail
				// cases.
				if pendingTxidMap == nil {
					// construct and initialize it.
					pendingTxidMap = make(map[transactions.Txid]struct{}, len(pending))
					for _, txid := range pending {
						pendingTxidMap[txid] = struct{}{}
					}
				}
				if _, has := pendingTxidMap[txID]; has {
					// we just found a transaction that shouldn't have been
					// included in the response.  maybe this is a false positive
					// and other transactions in the group aren't included in the
					// bloom filter, though.
					txnsInFilter++
				}
			}
		}

		// if the entire group was in the bloom filter, report an error.
		// (an empty group also satisfies this condition and is rejected.)
		if txnsInFilter == len(txgroup) {
			// the Close error is not propagated; the rejection below is what gets reported.
			client.Close()
			return fmt.Errorf("TxSyncer.Sync: peer %v sent a transaction group that was entirely included in the bloom filter", client.Address())
		}

		// send the transaction group to the transaction pool
		if syncer.handler.Handle(txgroup) != nil {
			// the Close error is not propagated; the rejection below is what gets reported.
			client.Close()
			return fmt.Errorf("TxSyncer.Sync: peer %v sent invalid transaction", client.Address())
		}
	}

	return nil
}