diff options
Diffstat (limited to 'data/txHandler.go')
-rw-r--r-- | data/txHandler.go | 254 |
1 files changed, 20 insertions, 234 deletions
diff --git a/data/txHandler.go b/data/txHandler.go index 0ecef433c..fa8bf250b 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -25,11 +25,9 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/data/pooldata" "github.com/algorand/go-algorand/data/pools" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/verify" - "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" @@ -177,7 +175,7 @@ func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) { verifiedTxGroup := wi.unverifiedTxGroup // save the transaction, if it has high enough fee and not already in the cache - err := handler.txPool.Remember(pooldata.SignedTxGroup{Transactions: verifiedTxGroup}) + err := handler.txPool.Remember(verifiedTxGroup) if err != nil { logging.Base().Debugf("could not remember tx: %v", err) return @@ -265,6 +263,12 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net // Note that this also checks the consistency of the transaction's group hash, // which is required for safe transaction signature caching behavior. func (handler *TxHandler) checkAlreadyCommitted(tx *txBacklogMsg) (processingDone bool) { + txids := make([]transactions.Txid, len(tx.unverifiedTxGroup)) + for i := range tx.unverifiedTxGroup { + txids[i] = tx.unverifiedTxGroup[i].ID() + } + logging.Base().Debugf("got a tx group with IDs %v", txids) + // do a quick test to check that this transaction could potentially be committed, to reject dup pending transactions err := handler.txPool.Test(tx.unverifiedTxGroup) if err != nil { @@ -274,12 +278,12 @@ func (handler *TxHandler) checkAlreadyCommitted(tx *txBacklogMsg) (processingDon return false } -func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.SignedTxn) (disconnect bool) { +func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.SignedTxn) (outmsg network.OutgoingMessage, processingDone bool) { tx := &txBacklogMsg{ unverifiedTxGroup: unverifiedTxGroup, } if handler.checkAlreadyCommitted(tx) { - return false + return network.OutgoingMessage{}, true } // build the transaction verification context @@ -287,7 +291,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed latestHdr, err := handler.ledger.BlockHdr(latest) if err != nil { logging.Base().Warnf("Could not get header for previous block %v: %v", latest, err) - return false + return network.OutgoingMessage{}, true } unverifiedTxnGroups := bookkeeping.SignedTxnsToGroups(unverifiedTxGroup) @@ -295,7 +299,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed if err != nil { // transaction is invalid logging.Base().Warnf("One or more transactions were malformed: %v", err) - return true + return network.OutgoingMessage{Action: network.Disconnect}, true } // at this point, we've verified the transaction group, @@ -303,10 +307,10 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed verifiedTxGroup := unverifiedTxGroup // save the transaction, if it has high enough fee and not already in the cache - err = handler.txPool.Remember(pooldata.SignedTxGroup{Transactions: verifiedTxGroup}) + err = handler.txPool.Remember(verifiedTxGroup) if err != nil { logging.Base().Debugf("could not remember tx: %v", err) - return false + return network.OutgoingMessage{}, true } // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. @@ -315,98 +319,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed logging.Base().Warnf("unable to pin transaction: %v", err) } - return false -} - -// filterAlreadyCommitted scan the list of signed transaction groups, and filter out the ones that have already been included, -// or that should not be added to the transaction pool. -// the resulting slice is using the *same* underlying array as the input slice, and the caller must ensure that this would not -// cause issue on the caller side. The hasError describe whether any of the removed transacation groups was -// removed for a reason *other* than being duplicate ( for instance, malformed transaction ) -func (handler *TxHandler) filterAlreadyCommitted(unverifiedTxGroups []pooldata.SignedTxGroup) (filteredGroups []pooldata.SignedTxGroup, hasError bool) { - remainedTxnsGroupOffset := 0 - for idx, utxng := range unverifiedTxGroups { - err := handler.txPool.Test(utxng.Transactions) - switch err.(type) { - case nil: - // no error was generated. - if remainedTxnsGroupOffset != idx { - unverifiedTxGroups[remainedTxnsGroupOffset] = utxng - } - remainedTxnsGroupOffset++ - case *ledgercore.TransactionInLedgerError: - // this is a duplicate transaction group. - default: - // some non-duplicate error was reported on this group. - hasError = true - } - } - return unverifiedTxGroups[:remainedTxnsGroupOffset], hasError -} - -// processDecodedArray receives a slice of transaction groups and attempt to add them to the transaction pool. -// The processDecodedArray returns whether the node should be disconnecting from the source of these transactions ( in case a malicious transaction is found ) -// as well as whether all the provided transactions were included in the transaction pool or committed. -func (handler *TxHandler) processDecodedArray(unverifiedTxGroups []pooldata.SignedTxGroup) (disconnect, allTransactionIncluded bool) { - var hasError bool - unverifiedTxGroups, hasError = handler.filterAlreadyCommitted(unverifiedTxGroups) - - if len(unverifiedTxGroups) == 0 { - return false, !hasError - } - - // build the transaction verification context - latest := handler.ledger.Latest() - latestHdr, err := handler.ledger.BlockHdr(latest) - if err != nil { - // being unable to retrieve the last's block header is not something a working node is expected to expirience ( ever ). - logging.Base().Errorf("Could not get header for previous block %d: %v", latest, err) - // returning a disconnect=true, would not fix the problem for the local node, but would force the remote node to pick a different - // relay, which ( hopefully ! ) would not have the same issue as this one. - return true, false - } - - unverifiedTxnGroups := make([][]transactions.SignedTxn, len(unverifiedTxGroups)) - for i, unverifiedGroup := range unverifiedTxGroups { - unverifiedTxnGroups[i] = unverifiedGroup.Transactions - } - - err = verify.PaysetGroups(context.Background(), unverifiedTxnGroups, latestHdr, handler.txVerificationPool, handler.ledger.VerifiedTransactionCache()) - if err != nil { - // transaction is invalid - logging.Base().Warnf("One or more transactions were malformed: %v", err) - return true, false - } - - // at this point, we've verified the transaction group, - // so we can safely treat the transaction as a verified transaction. - verifiedTxGroups := unverifiedTxGroups - - // before calling RememberArray we should reallocate the individual remaining - // signed transactions - these transactions were allocated in bulk by the - // transaction sync. By re-allocating the backing storage, we would allow the - // original backing storage ( which includes transactions that won't go into the - // transaction pool ) to be garbge collected. - for i, group := range verifiedTxGroups { - copiedTransactions := make(pooldata.SignedTxnSlice, len(group.Transactions)) - copy(copiedTransactions, group.Transactions) - verifiedTxGroups[i].Transactions = copiedTransactions - } - - // save the transaction, if it has high enough fee and not already in the cache - err = handler.txPool.RememberArray(verifiedTxGroups) - if err != nil { - logging.Base().Debugf("could not remember tx: %v", err) - return false, false - } - - // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. - err = handler.ledger.VerifiedTransactionCache().PinGroups(verifiedTxGroups) - if err != nil { - logging.Base().Warnf("unable to pin transaction: %v", err) - } - - return false, !hasError + return network.OutgoingMessage{}, false } // SolicitedTxHandler handles messages received through channels other than the gossip network. @@ -415,146 +328,19 @@ type SolicitedTxHandler interface { Handle(txgroup []transactions.SignedTxn) error } -type solicitedTxHandler struct { - txHandler *TxHandler -} - // SolicitedTxHandler converts a transaction handler to a SolicitedTxHandler func (handler *TxHandler) SolicitedTxHandler() SolicitedTxHandler { return &solicitedTxHandler{txHandler: handler} } +type solicitedTxHandler struct { + txHandler *TxHandler +} + func (handler *solicitedTxHandler) Handle(txgroup []transactions.SignedTxn) error { - disconnect := handler.txHandler.processDecoded(txgroup) - if disconnect { + outmsg, _ := handler.txHandler.processDecoded(txgroup) + if outmsg.Action == network.Disconnect { return fmt.Errorf("invalid transaction") } return nil } - -// SolicitedAsyncTxHandler handles slices of transaction groups received from the transaction sync. -// It provides a non-blocking queueing for the processing of these transaction groups, which allows -// the single-threaded transaction sync to keep processing other messages. -type SolicitedAsyncTxHandler interface { - // HandleTransactionGroups enqueues the given slice of transaction groups that came from the given network peer with - // the given message sequence number. The provided acknowledgement channel provides a feedback for the transaction sync - // that the entire transaction group slice was added ( or already included ) within the transaction pool. The method - // return true if it's able to enqueue the processing task, or false if it's unable to enqueue the processing task. - HandleTransactionGroups(networkPeer interface{}, ackCh chan uint64, messageSeq uint64, groups []pooldata.SignedTxGroup) bool - Start() - Stop() -} - -type solicitedAsyncTxHandler struct { - txHandler *TxHandler - backlogGroups chan *txGroups - stopped sync.WaitGroup - stopCtxFunc context.CancelFunc - // skipNextBacklogWarning is used to avoid repeated backlog full warning messages. - skipNextBacklogWarning bool -} - -type txGroups struct { - // the network package opaque network peer - networkPeer interface{} - // the feedback channel, in case we've successfully added the transaction groups to the transaction pool. - ackCh chan uint64 - // the message sequence number, which would be written back to the feedback channel - messageSeq uint64 - // the transactions groups slice - txGroups []pooldata.SignedTxGroup -} - -// SolicitedAsyncTxHandler converts a transaction handler to a SolicitedTxHandler -func (handler *TxHandler) SolicitedAsyncTxHandler() SolicitedAsyncTxHandler { - return &solicitedAsyncTxHandler{ - txHandler: handler, - backlogGroups: make(chan *txGroups, txBacklogSize), - skipNextBacklogWarning: false, - } -} - -// HandleTransactionGroups implements the solicitedAsyncTxHandler.HandleTransactionGroups interface. -// It enqueues the given slice of transaction groups that came from the given network peer with -// the given message sequence number. The provided acknowledgement channel provides a feedback for the transaction sync -// that the entire transaction group slice was added ( or already included ) within the transaction pool. The method -// return true if it's able to enqueue the processing task, or false if it's unable to enqueue the processing task. -func (handler *solicitedAsyncTxHandler) HandleTransactionGroups(networkPeer interface{}, ackCh chan uint64, messageSeq uint64, groups []pooldata.SignedTxGroup) (enqueued bool) { - select { - case handler.backlogGroups <- &txGroups{networkPeer: networkPeer, txGroups: groups, ackCh: ackCh, messageSeq: messageSeq}: - // reset the skipNextBacklogWarning once the number of pending items on the backlogGroups channels goes to - // less than half of it's capacity. - if handler.skipNextBacklogWarning && (len(handler.backlogGroups)*2 < cap(handler.backlogGroups)) { - handler.skipNextBacklogWarning = false - } - enqueued = true - default: - if !handler.skipNextBacklogWarning { - logging.Base().Warnf("solicitedAsyncTxHandler exhusted groups backlog") - handler.skipNextBacklogWarning = true - } - // if we failed here we want to increase the corresponding metric. It might suggest that we - // want to increase the queue size. - transactionMessagesDroppedFromBacklog.Inc(nil) - } - return -} - -func (handler *solicitedAsyncTxHandler) Start() { - if handler.stopCtxFunc == nil { - handler.txHandler.Start() - var ctx context.Context - ctx, handler.stopCtxFunc = context.WithCancel(context.Background()) - handler.stopped.Add(1) - go handler.loop(ctx) - } -} - -func (handler *solicitedAsyncTxHandler) Stop() { - if handler.stopCtxFunc != nil { - handler.stopCtxFunc() - handler.stopped.Wait() - handler.stopCtxFunc = nil - handler.txHandler.Stop() - } -} - -func (handler *solicitedAsyncTxHandler) loop(ctx context.Context) { - defer handler.stopped.Done() - var groups *txGroups - for { - select { - case <-ctx.Done(): - return - case groups = <-handler.backlogGroups: - } - disconnect, allTransactionsIncluded := handler.txHandler.processDecodedArray(groups.txGroups) - if disconnect { - handler.txHandler.net.Disconnect(groups.networkPeer) - handler.txHandler.net.RequestConnectOutgoing(false, make(chan struct{})) - transactionMessagesDroppedFromPool.Inc(nil) - } else if allTransactionsIncluded { - for _, txnGroup := range groups.txGroups { - // We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings - err := handler.txHandler.net.Relay(ctx, protocol.TxnTag, reencode(txnGroup.Transactions), false, groups.networkPeer) - if err != nil { - logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v", err) - break - } - } - select { - case groups.ackCh <- groups.messageSeq: - // all good, write was successful. - default: - // unable to write since channel was full - log this: - logging.Base().Warnf("solicitedAsyncTxHandler was unable to ack transaction groups inclusion since the acknowledgement channel was full") - } - // we've processed this message, so increase the counter. - transactionMessagesHandled.Inc(nil) - } else { - transactionMessagesDroppedFromPool.Inc(nil) - } - // clear out the groups; that would allow the GC to collect the group's memory allocations while we wait for the next task. - *groups = txGroups{} - } -} |