summaryrefslogtreecommitdiff
path: root/data/txHandler.go
diff options
context:
space:
mode:
Diffstat (limited to 'data/txHandler.go')
-rw-r--r--data/txHandler.go254
1 files changed, 20 insertions, 234 deletions
diff --git a/data/txHandler.go b/data/txHandler.go
index 0ecef433c..fa8bf250b 100644
--- a/data/txHandler.go
+++ b/data/txHandler.go
@@ -25,11 +25,9 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/bookkeeping"
- "github.com/algorand/go-algorand/data/pooldata"
"github.com/algorand/go-algorand/data/pools"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/verify"
- "github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
@@ -177,7 +175,7 @@ func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) {
verifiedTxGroup := wi.unverifiedTxGroup
// save the transaction, if it has high enough fee and not already in the cache
- err := handler.txPool.Remember(pooldata.SignedTxGroup{Transactions: verifiedTxGroup})
+ err := handler.txPool.Remember(verifiedTxGroup)
if err != nil {
logging.Base().Debugf("could not remember tx: %v", err)
return
@@ -265,6 +263,12 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
// Note that this also checks the consistency of the transaction's group hash,
// which is required for safe transaction signature caching behavior.
func (handler *TxHandler) checkAlreadyCommitted(tx *txBacklogMsg) (processingDone bool) {
+ txids := make([]transactions.Txid, len(tx.unverifiedTxGroup))
+ for i := range tx.unverifiedTxGroup {
+ txids[i] = tx.unverifiedTxGroup[i].ID()
+ }
+ logging.Base().Debugf("got a tx group with IDs %v", txids)
+
// do a quick test to check that this transaction could potentially be committed, to reject dup pending transactions
err := handler.txPool.Test(tx.unverifiedTxGroup)
if err != nil {
@@ -274,12 +278,12 @@ func (handler *TxHandler) checkAlreadyCommitted(tx *txBacklogMsg) (processingDon
return false
}
-func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.SignedTxn) (disconnect bool) {
+func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.SignedTxn) (outmsg network.OutgoingMessage, processingDone bool) {
tx := &txBacklogMsg{
unverifiedTxGroup: unverifiedTxGroup,
}
if handler.checkAlreadyCommitted(tx) {
- return false
+ return network.OutgoingMessage{}, true
}
// build the transaction verification context
@@ -287,7 +291,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed
latestHdr, err := handler.ledger.BlockHdr(latest)
if err != nil {
logging.Base().Warnf("Could not get header for previous block %v: %v", latest, err)
- return false
+ return network.OutgoingMessage{}, true
}
unverifiedTxnGroups := bookkeeping.SignedTxnsToGroups(unverifiedTxGroup)
@@ -295,7 +299,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed
if err != nil {
// transaction is invalid
logging.Base().Warnf("One or more transactions were malformed: %v", err)
- return true
+ return network.OutgoingMessage{Action: network.Disconnect}, true
}
// at this point, we've verified the transaction group,
@@ -303,10 +307,10 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed
verifiedTxGroup := unverifiedTxGroup
// save the transaction, if it has high enough fee and not already in the cache
- err = handler.txPool.Remember(pooldata.SignedTxGroup{Transactions: verifiedTxGroup})
+ err = handler.txPool.Remember(verifiedTxGroup)
if err != nil {
logging.Base().Debugf("could not remember tx: %v", err)
- return false
+ return network.OutgoingMessage{}, true
}
// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
@@ -315,98 +319,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed
logging.Base().Warnf("unable to pin transaction: %v", err)
}
- return false
-}
-
-// filterAlreadyCommitted scan the list of signed transaction groups, and filter out the ones that have already been included,
-// or that should not be added to the transaction pool.
-// the resulting slice is using the *same* underlying array as the input slice, and the caller must ensure that this would not
-// cause issue on the caller side. The hasError describe whether any of the removed transacation groups was
-// removed for a reason *other* than being duplicate ( for instance, malformed transaction )
-func (handler *TxHandler) filterAlreadyCommitted(unverifiedTxGroups []pooldata.SignedTxGroup) (filteredGroups []pooldata.SignedTxGroup, hasError bool) {
- remainedTxnsGroupOffset := 0
- for idx, utxng := range unverifiedTxGroups {
- err := handler.txPool.Test(utxng.Transactions)
- switch err.(type) {
- case nil:
- // no error was generated.
- if remainedTxnsGroupOffset != idx {
- unverifiedTxGroups[remainedTxnsGroupOffset] = utxng
- }
- remainedTxnsGroupOffset++
- case *ledgercore.TransactionInLedgerError:
- // this is a duplicate transaction group.
- default:
- // some non-duplicate error was reported on this group.
- hasError = true
- }
- }
- return unverifiedTxGroups[:remainedTxnsGroupOffset], hasError
-}
-
-// processDecodedArray receives a slice of transaction groups and attempt to add them to the transaction pool.
-// The processDecodedArray returns whether the node should be disconnecting from the source of these transactions ( in case a malicious transaction is found )
-// as well as whether all the provided transactions were included in the transaction pool or committed.
-func (handler *TxHandler) processDecodedArray(unverifiedTxGroups []pooldata.SignedTxGroup) (disconnect, allTransactionIncluded bool) {
- var hasError bool
- unverifiedTxGroups, hasError = handler.filterAlreadyCommitted(unverifiedTxGroups)
-
- if len(unverifiedTxGroups) == 0 {
- return false, !hasError
- }
-
- // build the transaction verification context
- latest := handler.ledger.Latest()
- latestHdr, err := handler.ledger.BlockHdr(latest)
- if err != nil {
- // being unable to retrieve the last's block header is not something a working node is expected to expirience ( ever ).
- logging.Base().Errorf("Could not get header for previous block %d: %v", latest, err)
- // returning a disconnect=true, would not fix the problem for the local node, but would force the remote node to pick a different
- // relay, which ( hopefully ! ) would not have the same issue as this one.
- return true, false
- }
-
- unverifiedTxnGroups := make([][]transactions.SignedTxn, len(unverifiedTxGroups))
- for i, unverifiedGroup := range unverifiedTxGroups {
- unverifiedTxnGroups[i] = unverifiedGroup.Transactions
- }
-
- err = verify.PaysetGroups(context.Background(), unverifiedTxnGroups, latestHdr, handler.txVerificationPool, handler.ledger.VerifiedTransactionCache())
- if err != nil {
- // transaction is invalid
- logging.Base().Warnf("One or more transactions were malformed: %v", err)
- return true, false
- }
-
- // at this point, we've verified the transaction group,
- // so we can safely treat the transaction as a verified transaction.
- verifiedTxGroups := unverifiedTxGroups
-
- // before calling RememberArray we should reallocate the individual remaining
- // signed transactions - these transactions were allocated in bulk by the
- // transaction sync. By re-allocating the backing storage, we would allow the
- // original backing storage ( which includes transactions that won't go into the
- // transaction pool ) to be garbge collected.
- for i, group := range verifiedTxGroups {
- copiedTransactions := make(pooldata.SignedTxnSlice, len(group.Transactions))
- copy(copiedTransactions, group.Transactions)
- verifiedTxGroups[i].Transactions = copiedTransactions
- }
-
- // save the transaction, if it has high enough fee and not already in the cache
- err = handler.txPool.RememberArray(verifiedTxGroups)
- if err != nil {
- logging.Base().Debugf("could not remember tx: %v", err)
- return false, false
- }
-
- // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
- err = handler.ledger.VerifiedTransactionCache().PinGroups(verifiedTxGroups)
- if err != nil {
- logging.Base().Warnf("unable to pin transaction: %v", err)
- }
-
- return false, !hasError
+ return network.OutgoingMessage{}, false
}
// SolicitedTxHandler handles messages received through channels other than the gossip network.
@@ -415,146 +328,19 @@ type SolicitedTxHandler interface {
Handle(txgroup []transactions.SignedTxn) error
}
-type solicitedTxHandler struct {
- txHandler *TxHandler
-}
-
// SolicitedTxHandler converts a transaction handler to a SolicitedTxHandler
func (handler *TxHandler) SolicitedTxHandler() SolicitedTxHandler {
return &solicitedTxHandler{txHandler: handler}
}
+type solicitedTxHandler struct {
+ txHandler *TxHandler
+}
+
func (handler *solicitedTxHandler) Handle(txgroup []transactions.SignedTxn) error {
- disconnect := handler.txHandler.processDecoded(txgroup)
- if disconnect {
+ outmsg, _ := handler.txHandler.processDecoded(txgroup)
+ if outmsg.Action == network.Disconnect {
return fmt.Errorf("invalid transaction")
}
return nil
}
-
-// SolicitedAsyncTxHandler handles slices of transaction groups received from the transaction sync.
-// It provides a non-blocking queueing for the processing of these transaction groups, which allows
-// the single-threaded transaction sync to keep processing other messages.
-type SolicitedAsyncTxHandler interface {
- // HandleTransactionGroups enqueues the given slice of transaction groups that came from the given network peer with
- // the given message sequence number. The provided acknowledgement channel provides a feedback for the transaction sync
- // that the entire transaction group slice was added ( or already included ) within the transaction pool. The method
- // return true if it's able to enqueue the processing task, or false if it's unable to enqueue the processing task.
- HandleTransactionGroups(networkPeer interface{}, ackCh chan uint64, messageSeq uint64, groups []pooldata.SignedTxGroup) bool
- Start()
- Stop()
-}
-
-type solicitedAsyncTxHandler struct {
- txHandler *TxHandler
- backlogGroups chan *txGroups
- stopped sync.WaitGroup
- stopCtxFunc context.CancelFunc
- // skipNextBacklogWarning is used to avoid repeated backlog full warning messages.
- skipNextBacklogWarning bool
-}
-
-type txGroups struct {
- // the network package opaque network peer
- networkPeer interface{}
- // the feedback channel, in case we've successfully added the transaction groups to the transaction pool.
- ackCh chan uint64
- // the message sequence number, which would be written back to the feedback channel
- messageSeq uint64
- // the transactions groups slice
- txGroups []pooldata.SignedTxGroup
-}
-
-// SolicitedAsyncTxHandler converts a transaction handler to a SolicitedTxHandler
-func (handler *TxHandler) SolicitedAsyncTxHandler() SolicitedAsyncTxHandler {
- return &solicitedAsyncTxHandler{
- txHandler: handler,
- backlogGroups: make(chan *txGroups, txBacklogSize),
- skipNextBacklogWarning: false,
- }
-}
-
-// HandleTransactionGroups implements the solicitedAsyncTxHandler.HandleTransactionGroups interface.
-// It enqueues the given slice of transaction groups that came from the given network peer with
-// the given message sequence number. The provided acknowledgement channel provides a feedback for the transaction sync
-// that the entire transaction group slice was added ( or already included ) within the transaction pool. The method
-// return true if it's able to enqueue the processing task, or false if it's unable to enqueue the processing task.
-func (handler *solicitedAsyncTxHandler) HandleTransactionGroups(networkPeer interface{}, ackCh chan uint64, messageSeq uint64, groups []pooldata.SignedTxGroup) (enqueued bool) {
- select {
- case handler.backlogGroups <- &txGroups{networkPeer: networkPeer, txGroups: groups, ackCh: ackCh, messageSeq: messageSeq}:
- // reset the skipNextBacklogWarning once the number of pending items on the backlogGroups channels goes to
- // less than half of it's capacity.
- if handler.skipNextBacklogWarning && (len(handler.backlogGroups)*2 < cap(handler.backlogGroups)) {
- handler.skipNextBacklogWarning = false
- }
- enqueued = true
- default:
- if !handler.skipNextBacklogWarning {
- logging.Base().Warnf("solicitedAsyncTxHandler exhusted groups backlog")
- handler.skipNextBacklogWarning = true
- }
- // if we failed here we want to increase the corresponding metric. It might suggest that we
- // want to increase the queue size.
- transactionMessagesDroppedFromBacklog.Inc(nil)
- }
- return
-}
-
-func (handler *solicitedAsyncTxHandler) Start() {
- if handler.stopCtxFunc == nil {
- handler.txHandler.Start()
- var ctx context.Context
- ctx, handler.stopCtxFunc = context.WithCancel(context.Background())
- handler.stopped.Add(1)
- go handler.loop(ctx)
- }
-}
-
-func (handler *solicitedAsyncTxHandler) Stop() {
- if handler.stopCtxFunc != nil {
- handler.stopCtxFunc()
- handler.stopped.Wait()
- handler.stopCtxFunc = nil
- handler.txHandler.Stop()
- }
-}
-
-func (handler *solicitedAsyncTxHandler) loop(ctx context.Context) {
- defer handler.stopped.Done()
- var groups *txGroups
- for {
- select {
- case <-ctx.Done():
- return
- case groups = <-handler.backlogGroups:
- }
- disconnect, allTransactionsIncluded := handler.txHandler.processDecodedArray(groups.txGroups)
- if disconnect {
- handler.txHandler.net.Disconnect(groups.networkPeer)
- handler.txHandler.net.RequestConnectOutgoing(false, make(chan struct{}))
- transactionMessagesDroppedFromPool.Inc(nil)
- } else if allTransactionsIncluded {
- for _, txnGroup := range groups.txGroups {
- // We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings
- err := handler.txHandler.net.Relay(ctx, protocol.TxnTag, reencode(txnGroup.Transactions), false, groups.networkPeer)
- if err != nil {
- logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v", err)
- break
- }
- }
- select {
- case groups.ackCh <- groups.messageSeq:
- // all good, write was successful.
- default:
- // unable to write since channel was full - log this:
- logging.Base().Warnf("solicitedAsyncTxHandler was unable to ack transaction groups inclusion since the acknowledgement channel was full")
- }
- // we've processed this message, so increase the counter.
- transactionMessagesHandled.Inc(nil)
- } else {
- transactionMessagesDroppedFromPool.Inc(nil)
- }
- // clear out the groups; that would allow the GC to collect the group's memory allocations while we wait for the next task.
- *groups = txGroups{}
- }
-}