diff options
author | Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> | 2024-01-17 13:21:16 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-17 13:21:16 -0500 |
commit | 5e5c16a7b4e6c40c9e57dcce65b51118755592ba (patch) | |
tree | 7e31dabd6c2abd998e79c69d810cf3f066a8d20b | |
parent | 2086425a2f9e0140c9a86631471be49d5df52dad (diff) |
api: optimize /transactions/pending/{txid} endpoint (#5891)
-rw-r--r-- | ledger/ledger.go | 9 | ||||
-rw-r--r-- | ledger/txtail.go | 50 | ||||
-rw-r--r-- | ledger/txtail_test.go | 80 | ||||
-rw-r--r-- | node/node.go | 22 |
4 files changed, 146 insertions, 15 deletions
diff --git a/ledger/ledger.go b/ledger/ledger.go index 9313f55e6..d5436ad5d 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -657,6 +657,15 @@ func (l *Ledger) CheckDup(currentProto config.ConsensusParams, current basics.Ro return l.txTail.checkDup(currentProto, current, firstValid, lastValid, txid, txl) } +// CheckConfirmedTail checks if a transaction txid happens to have LastValid greater than the current round at the time of calling and has been already committed to the ledger. +// If both conditions are met it returns true. +// This function could be used as filter to check if a transaction is committed to the ledger, and no extra checks needed if it says true. +// +// Note, this cannot be used to check if transaction happened or not in past MaxTxnLife rounds. +func (l *Ledger) CheckConfirmedTail(txid transactions.Txid) (basics.Round, bool) { + return l.txTail.checkConfirmed(txid) +} + // Latest returns the latest known block round added to the ledger. func (l *Ledger) Latest() basics.Round { return l.blockQ.latest() diff --git a/ledger/txtail.go b/ledger/txtail.go index e1e85fee6..129fbb398 100644 --- a/ledger/txtail.go +++ b/ledger/txtail.go @@ -80,7 +80,9 @@ type txTail struct { // lastValid, recent, lowWaterMark, roundTailHashes, roundTailSerializedDeltas and blockHeaderData. tailMu deadlock.RWMutex - lastValid map[basics.Round]map[transactions.Txid]struct{} // map tx.LastValid -> tx confirmed set + // lastValid allows looking up all of the transactions that expire in a given round. + // The map for an expiration round gives the round the transaction was originally confirmed, so it can be found for the /pending endpoint. + lastValid map[basics.Round]map[transactions.Txid]uint16 // map tx.LastValid -> tx confirmed map: txid -> (last valid - confirmed) delta // duplicate detection queries with LastValid before // lowWaterMark are not guaranteed to succeed @@ -115,14 +117,18 @@ func (t *txTail) loadFromDisk(l ledgerForTracker, dbRound basics.Round) error { } t.lowWaterMark = l.Latest() - t.lastValid = make(map[basics.Round]map[transactions.Txid]struct{}) + t.lastValid = make(map[basics.Round]map[transactions.Txid]uint16) t.recent = make(map[basics.Round]roundLeases) // the lastValid is a temporary map used during the execution of // loadFromDisk, allowing us to construct the lastValid maps in their // optimal size. This would ensure that upon startup, we don't preallocate // more memory than we truly need. - lastValid := make(map[basics.Round][]transactions.Txid) + type lastValidEntry struct { + rnd basics.Round + txid transactions.Txid + } + lastValid := make(map[basics.Round][]lastValidEntry) // the roundTailHashes and blockHeaderData need a single element to start with // in order to allow lookups on zero offsets when they are empty (new database) @@ -153,16 +159,16 @@ func (t *txTail) loadFromDisk(l ledgerForTracker, dbRound basics.Round) error { list := lastValid[txTailRound.LastValid[i]] // if the list reached capacity, resize. if len(list) == cap(list) { - var newList []transactions.Txid + var newList []lastValidEntry if cap(list) == 0 { - newList = make([]transactions.Txid, 0, initialLastValidArrayLen) + newList = make([]lastValidEntry, 0, initialLastValidArrayLen) } else { - newList = make([]transactions.Txid, len(list), len(list)*2) + newList = make([]lastValidEntry, len(list), len(list)*2) } copy(newList[:], list[:]) list = newList } - list = append(list, txTailRound.TxnIDs[i]) + list = append(list, lastValidEntry{txTailRound.Hdr.Round, txTailRound.TxnIDs[i]}) lastValid[txTailRound.LastValid[i]] = list } } @@ -173,11 +179,15 @@ func (t *txTail) loadFromDisk(l ledgerForTracker, dbRound basics.Round) error { // add all the entries in roundsLastValids to their corresponding map entry in t.lastValid for lastValid, list := range lastValid { - lastValueMap := make(map[transactions.Txid]struct{}, len(list)) - for _, id := range list { - lastValueMap[id] = struct{}{} + lastValidMap := make(map[transactions.Txid]uint16, len(list)) + for _, entry := range list { + if lastValid < entry.rnd { + return fmt.Errorf("txTail: invalid lastValid %d / rnd %d for txid %s", lastValid, entry.rnd, entry.txid) + } + deltaR := uint16(lastValid - entry.rnd) + lastValidMap[entry.txid] = deltaR } - t.lastValid[lastValid] = lastValueMap + t.lastValid[lastValid] = lastValidMap } if enableTxTailHashes { @@ -210,9 +220,10 @@ func (t *txTail) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { for txid, txnInc := range delta.Txids { if _, ok := t.lastValid[txnInc.LastValid]; !ok { - t.lastValid[txnInc.LastValid] = make(map[transactions.Txid]struct{}) + t.lastValid[txnInc.LastValid] = make(map[transactions.Txid]uint16) } - t.lastValid[txnInc.LastValid][txid] = struct{}{} + deltaR := uint16(txnInc.LastValid - blk.BlockHeader.Round) + t.lastValid[txnInc.LastValid][txid] = deltaR tail.TxnIDs[txnInc.Intra] = txid tail.LastValid[txnInc.Intra] = txnInc.LastValid @@ -381,6 +392,19 @@ func (t *txTail) checkDup(proto config.ConsensusParams, current basics.Round, fi return nil } +// checkConfirmed test to see if the given transaction id already exists. +func (t *txTail) checkConfirmed(txid transactions.Txid) (basics.Round, bool) { + t.tailMu.RLock() + defer t.tailMu.RUnlock() + + for lastValidRound, lastValid := range t.lastValid { + if deltaR, confirmed := lastValid[txid]; confirmed { + return lastValidRound - basics.Round(deltaR), true + } + } + return 0, false +} + func (t *txTail) recentTailHash(offset uint64, retainSize uint64) (crypto.Digest, error) { // prepare a buffer to hash. buffer := make([]byte, (retainSize)*crypto.DigestSize) diff --git a/ledger/txtail_test.go b/ledger/txtail_test.go index fa3473fca..8eb9d4990 100644 --- a/ledger/txtail_test.go +++ b/ledger/txtail_test.go @@ -112,13 +112,15 @@ func TestTxTailCheckdup(t *testing.T) { type txTailTestLedger struct { Ledger protoVersion protocol.ConsensusVersion + blocks map[basics.Round]bookkeeping.Block } const testTxTailValidityRange = 200 const testTxTailTxnPerRound = 150 +const testTxTailExtraRounds = 10 func (t *txTailTestLedger) Latest() basics.Round { - return basics.Round(config.Consensus[t.protoVersion].MaxTxnLife + 10) + return basics.Round(config.Consensus[t.protoVersion].MaxTxnLife + testTxTailExtraRounds) } func (t *txTailTestLedger) BlockHdr(r basics.Round) (bookkeeping.BlockHeader, error) { @@ -130,6 +132,10 @@ func (t *txTailTestLedger) BlockHdr(r basics.Round) (bookkeeping.BlockHeader, er } func (t *txTailTestLedger) Block(r basics.Round) (bookkeeping.Block, error) { + if bkl, found := t.blocks[r]; found { + return bkl, nil + } + blk := bookkeeping.Block{ BlockHeader: bookkeeping.BlockHeader{ UpgradeState: bookkeeping.UpgradeState{ @@ -142,6 +148,10 @@ func (t *txTailTestLedger) Block(r basics.Round) (bookkeeping.Block, error) { for i := range blk.Payset { blk.Payset[i] = makeTxTailTestTransaction(r, i) } + if t.blocks == nil { + t.blocks = make(map[basics.Round]bookkeeping.Block) + } + t.blocks[r] = blk return blk, nil } @@ -330,6 +340,74 @@ func TestTxTailDeltaTracking(t *testing.T) { } } +func TestTxTailCheckConfirmed(t *testing.T) { + partitiontest.PartitionTest(t) + + var ledger txTailTestLedger + txtail := txTail{} + protoVersion := protocol.ConsensusCurrentVersion + proto := config.Consensus[protoVersion] + require.NoError(t, ledger.initialize(t, protoVersion)) + require.NoError(t, txtail.loadFromDisk(&ledger, ledger.Latest())) + + // ensure block retrieval from txTailTestLedger works + startRound := ledger.Latest() - basics.Round(proto.MaxTxnLife) + 1 + b1, err := ledger.Block(startRound) + require.NoError(t, err) + b2, err := ledger.Block(startRound) + require.NoError(t, err) + require.Equal(t, b1, b2) + + // check all txids in blocks are in txTail as well + // note, txtail does not store txids for transactions with lastValid < ledger.Latest() + for i := ledger.Latest() - testTxTailValidityRange + 1; i < ledger.Latest(); i++ { + blk, err := ledger.Block(i) + require.NoError(t, err) + for _, txn := range blk.Payset { + confirmedAt, found := txtail.checkConfirmed(txn.Txn.ID()) + require.True(t, found, "failed to find txn at round %d (startRound=%d, latest=%d)", i, startRound, ledger.Latest()) + require.Equal(t, basics.Round(i), confirmedAt) + } + } + + rnd := ledger.Latest() + 1 + lv := basics.Round(rnd + 50) + blk := bookkeeping.Block{ + BlockHeader: bookkeeping.BlockHeader{ + Round: rnd, + TimeStamp: int64(rnd << 10), + UpgradeState: bookkeeping.UpgradeState{ + CurrentProtocol: protoVersion, + }, + }, + Payset: make(transactions.Payset, 1), + } + sender := &basics.Address{} + sender[0] = byte(rnd) + sender[1] = byte(rnd >> 8) + sender[2] = byte(rnd >> 16) + blk.Payset[0].Txn.Sender = *sender + blk.Payset[0].Txn.FirstValid = rnd + blk.Payset[0].Txn.LastValid = lv + deltas := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, 0, 0) + deltas.Txids[blk.Payset[0].Txn.ID()] = ledgercore.IncludedTransactions{ + LastValid: lv, + Intra: 0, + } + deltas.AddTxLease(ledgercore.Txlease{Sender: blk.Payset[0].Txn.Sender, Lease: blk.Payset[0].Txn.Lease}, basics.Round(rnd+50)) + + txtail.newBlock(blk, deltas) + txtail.committedUpTo(basics.Round(rnd)) + + confirmedAt, found := txtail.checkConfirmed(blk.Payset[0].Txn.ID()) + require.True(t, found) + require.Equal(t, basics.Round(rnd), confirmedAt) + + confirmedAt, found = txtail.checkConfirmed(transactions.Txid{}) + require.False(t, found) + require.Equal(t, basics.Round(0), confirmedAt) +} + // BenchmarkTxTailBlockHeaderCache adds 2M random blocks by calling // newBlock and postCommit on txTail tracker, and reports memory allocations func BenchmarkTxTailBlockHeaderCache(b *testing.B) { diff --git a/node/node.go b/node/node.go index 75d9bcfa2..f59bd67da 100644 --- a/node/node.go +++ b/node/node.go @@ -653,6 +653,25 @@ func (node *AlgorandFullNode) GetPendingTransaction(txID transactions.Txid) (res // Keep looking in the ledger. } + // quick check for confirmed transactions with LastValid in future + // this supposed to cover most of the cases where REST checks for the most recent txns + if r, confirmed := node.ledger.CheckConfirmedTail(txID); confirmed { + tx, foundBlk, err := node.ledger.LookupTxid(txID, r) + if err == nil && foundBlk { + return TxnWithStatus{ + Txn: tx.SignedTxn, + ConfirmedRound: r, + ApplyData: tx.ApplyData, + }, true + } + } + // if found in the pool and not in the tail then return without looking into blocks + // because the check appears to be too early + if found { + return res, found + } + + // fallback to blocks lookup var maxLife basics.Round latest := node.ledger.Latest() proto, err := node.ledger.ConsensusParams(latest) @@ -688,6 +707,7 @@ func (node *AlgorandFullNode) GetPendingTransaction(txID transactions.Txid) (res if err != nil || !found { continue } + return TxnWithStatus{ Txn: tx.SignedTxn, ConfirmedRound: r, @@ -696,7 +716,7 @@ func (node *AlgorandFullNode) GetPendingTransaction(txID transactions.Txid) (res } // Return whatever we found in the pool (if anything). - return + return res, found } // Status returns a StatusReport structure reporting our status as Active and with our ledger's LastRound |