diff options
author | Jason Paulos <jasonpaulos@users.noreply.github.com> | 2022-12-19 12:52:41 -0800 |
---|---|---|
committer | Jason Paulos <jasonpaulos@users.noreply.github.com> | 2022-12-19 12:52:41 -0800 |
commit | 6ad3ffcd6ca4631f97205593d1d5182d6aa4481c (patch) | |
tree | dc0255cc956c7caad41b3390cf28c1084b41860a | |
parent | 069a0a0c672edf54234ea3a1e517a1468036c8f1 (diff) | |
parent | 5979c090629f6f028bd7469598f905df23dd29af (diff) |
Merge branch 'master' into feature/simulate-endpointfeature/simulate-endpoint
-rw-r--r-- | data/accountManager_test.go | 11 | ||||
-rw-r--r-- | data/ledger_test.go | 6 | ||||
-rw-r--r-- | data/txHandler_test.go | 572 |
3 files changed, 494 insertions, 95 deletions
diff --git a/data/accountManager_test.go b/data/accountManager_test.go index 1fcfe56bf..17853f4bd 100644 --- a/data/accountManager_test.go +++ b/data/accountManager_test.go @@ -43,6 +43,11 @@ import ( func TestAccountManagerKeys(t *testing.T) { partitiontest.PartitionTest(t) + if testing.Short() { + t.Log("this is a long test and skipping for -short") + return + } + registry := &mocks.MockParticipationRegistry{} testAccountManagerKeys(t, registry, false) } @@ -85,6 +90,11 @@ func registryCloseTest(t testing.TB, registry account.ParticipationRegistry, dbf func TestAccountManagerKeysRegistry(t *testing.T) { partitiontest.PartitionTest(t) + if testing.Short() { + t.Log("this is a long test and skipping for -short") + return + } + registry, dbName := getRegistryImpl(t, false, true) defer registryCloseTest(t, registry, dbName) testAccountManagerKeys(t, registry, true) @@ -123,6 +133,7 @@ func testAccountManagerKeys(t *testing.T, registry account.ParticipationRegistry accessor, err := db.MakeErasableAccessor(partFilename) require.NoError(t, err) + defer accessor.Close() accessor.SetLogger(log) part, err := account.FillDBWithParticipationKeys(accessor, root.Address(), 0, 100, 10000) diff --git a/data/ledger_test.go b/data/ledger_test.go index b3cb326df..82782d6b9 100644 --- a/data/ledger_test.go +++ b/data/ledger_test.go @@ -327,6 +327,10 @@ func TestLedgerSeed(t *testing.T) { func TestConsensusVersion(t *testing.T) { partitiontest.PartitionTest(t) + if testing.Short() { + t.Log("this is a long test and skipping for -short") + return + } // find a consensus protocol that leads to ConsensusCurrentVersion var previousProtocol protocol.ConsensusVersion @@ -514,7 +518,7 @@ func TestLedgerErrorValidate(t *testing.T) { defer realLedger.Close() l := Ledger{Ledger: realLedger, log: log} - l.log.SetLevel(logging.Debug) + l.log.SetLevel(logging.Warn) require.NotNil(t, &l) totalsRound, _, err := realLedger.LatestTotals() diff --git a/data/txHandler_test.go b/data/txHandler_test.go index ad74a1a0e..dc1f7215a 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -33,6 +33,8 @@ import ( "github.com/stretchr/testify/require" + "github.com/algorand/go-deadlock" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/components/mocks" "github.com/algorand/go-algorand/config" @@ -42,6 +44,7 @@ import ( "github.com/algorand/go-algorand/data/pools" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/verify" + "github.com/algorand/go-algorand/data/txntest" realledger "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/logging" @@ -50,11 +53,9 @@ import ( "github.com/algorand/go-algorand/test/partitiontest" "github.com/algorand/go-algorand/util/execpool" "github.com/algorand/go-algorand/util/metrics" - - "github.com/algorand/go-deadlock" ) -func makeTestGenesisAccounts(tb require.TestingT, numUsers int) ([]basics.Address, []*crypto.SignatureSecrets, map[basics.Address]basics.AccountData) { +func makeTestGenesisAccounts(tb testing.TB, numUsers int) ([]basics.Address, []*crypto.SignatureSecrets, map[basics.Address]basics.AccountData) { addresses := make([]basics.Address, numUsers) secrets := make([]*crypto.SignatureSecrets, numUsers) genesis := make(map[basics.Address]basics.AccountData) @@ -91,12 +92,14 @@ func BenchmarkTxHandlerProcessing(b *testing.B) { cfg.Archival = true ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) require.NoError(b, err) + defer ledger.Close() l := ledger cfg.TxPoolSize = 75000 cfg.EnableProcessBlockStats = false txHandler := makeTestTxHandler(l, cfg) + defer txHandler.txVerificationPool.Shutdown() makeTxns := func(N int) [][]transactions.SignedTxn { ret := make([][]transactions.SignedTxn, 0, N) @@ -432,6 +435,45 @@ func BenchmarkTxHandlerProcessIncomingTxn16(b *testing.B) { finConsume() } +// BenchmarkTxHandlerProcessIncomingLogicTxn16 is similar to BenchmarkTxHandlerProcessIncomingTxn16 +// but with logicsig groups of 4 txns +func BenchmarkTxHandlerProcessIncomingLogicTxn16(b *testing.B) { + deadlockDisable := deadlock.Opts.Disable + deadlock.Opts.Disable = true + defer func() { + deadlock.Opts.Disable = deadlockDisable + }() + + const numSendThreads = 16 + handler := makeTestTxHandlerOrphaned(txBacklogSize) + + // prepare tx groups + blobs := make([][]byte, b.N) + stxns := make([][]transactions.SignedTxn, b.N) + for i := 0; i < b.N; i++ { + txns := txntest.CreateTinyManTxGroup(b, true) + stxns[i], _ = txntest.CreateTinyManSignedTxGroup(b, txns) + var blob []byte + for j := range stxns[i] { + encoded := protocol.Encode(&stxns[i][j]) + blob = append(blob, encoded...) + } + blobs[i] = blob + } + numTxnsPerGroup := len(stxns[0]) + + statsCh := make(chan [4]int, 1) + defer close(statsCh) + finConsume := benchTxHandlerProcessIncomingTxnConsume(b, handler, numTxnsPerGroup, 0, statsCh) + + // submit tx groups + b.ResetTimer() + finalizeSubmit := benchTxHandlerProcessIncomingTxnSubmit(b, handler, blobs, numSendThreads) + + finalizeSubmit() + finConsume() +} + // BenchmarkTxHandlerIncDeDup checks txn receiving with duplicates // simulating processing delay func BenchmarkTxHandlerIncDeDup(b *testing.B) { @@ -624,7 +666,7 @@ func TestTxHandlerProcessIncomingCensoring(t *testing.T) { } t.Run("single", func(t *testing.T) { - handler := makeTestTxHandlerOrphaned(txBacklogSize) + handler := makeTestTxHandlerOrphanedWithContext(context.Background(), txBacklogSize, txBacklogSize, txHandlerConfig{true, true}, 0) stxns, blob := makeRandomTransactions(1) stxn := stxns[0] action := handler.processIncomingTxn(network.IncomingMessage{Data: blob}) @@ -649,7 +691,7 @@ func TestTxHandlerProcessIncomingCensoring(t *testing.T) { }) t.Run("group", func(t *testing.T) { - handler := makeTestTxHandlerOrphaned(txBacklogSize) + handler := makeTestTxHandlerOrphanedWithContext(context.Background(), txBacklogSize, txBacklogSize, txHandlerConfig{true, true}, 0) num := rand.Intn(config.MaxTxGroupSize-1) + 2 // 2..config.MaxTxGroupSize require.LessOrEqual(t, num, config.MaxTxGroupSize) stxns, blob := makeRandomTransactions(num) @@ -719,10 +761,10 @@ func TestTxHandlerProcessIncomingCensoring(t *testing.T) { // makeTestTxHandlerOrphaned creates a tx handler without any backlog consumer. // It is caller responsibility to run a consumer thread. func makeTestTxHandlerOrphaned(backlogSize int) *TxHandler { - return makeTestTxHandlerOrphanedWithContext(context.Background(), txBacklogSize, txBacklogSize, 0) + return makeTestTxHandlerOrphanedWithContext(context.Background(), txBacklogSize, txBacklogSize, txHandlerConfig{true, false}, 0) } -func makeTestTxHandlerOrphanedWithContext(ctx context.Context, backlogSize int, cacheSize int, refreshInterval time.Duration) *TxHandler { +func makeTestTxHandlerOrphanedWithContext(ctx context.Context, backlogSize int, cacheSize int, txHandlerConfig txHandlerConfig, refreshInterval time.Duration) *TxHandler { if backlogSize <= 0 { backlogSize = txBacklogSize } @@ -733,7 +775,7 @@ func makeTestTxHandlerOrphanedWithContext(ctx context.Context, backlogSize int, backlogQueue: make(chan *txBacklogMsg, backlogSize), msgCache: makeSaltedCache(cacheSize), txCanonicalCache: makeDigestCache(cacheSize), - cacheConfig: txHandlerConfig{true, true}, + cacheConfig: txHandlerConfig, } handler.msgCache.start(ctx, refreshInterval) return handler @@ -839,7 +881,7 @@ func TestTxHandlerProcessIncomingCacheRotation(t *testing.T) { t.Run("scheduled", func(t *testing.T) { // double enqueue a single txn message, ensure it discarded ctx, cancelFunc := context.WithCancel(context.Background()) - handler := makeTestTxHandlerOrphanedWithContext(ctx, txBacklogSize, txBacklogSize, 10*time.Millisecond) + handler := makeTestTxHandlerOrphanedWithContext(ctx, txBacklogSize, txBacklogSize, txHandlerConfig{true, true}, 10*time.Millisecond) var action network.OutgoingMessage var msg *txBacklogMsg @@ -902,7 +944,7 @@ func TestTxHandlerProcessIncomingCacheBacklogDrop(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() - handler := makeTestTxHandlerOrphanedWithContext(context.Background(), 1, 20, 0) + handler := makeTestTxHandlerOrphanedWithContext(context.Background(), 1, 20, txHandlerConfig{true, true}, 0) stxns1, blob1 := makeRandomTransactions(1) require.Equal(t, 1, len(stxns1)) @@ -942,9 +984,11 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { cfg.TxIncomingFilteringFlags = 3 // txFilterRawMsg + txFilterCanonical ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) require.NoError(t, err) + defer ledger.Close() l := ledger handler := makeTestTxHandler(l, cfg) + defer handler.txVerificationPool.Shutdown() handler.postVerificationQueue = make(chan *txBacklogMsg) makeTxns := func(sendIdx, recvIdx int) ([]transactions.SignedTxn, []byte) { @@ -1066,6 +1110,9 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) }() + // reset the counters + transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) + transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) const numUsers = 100 log := logging.TestingLog(t) @@ -1079,9 +1126,10 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t cfg.Archival = true ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) require.NoError(t, err) + defer ledger.Close() - l := ledger - handler := makeTestTxHandler(l, cfg) + handler := makeTestTxHandler(ledger, cfg) + defer handler.txVerificationPool.Shutdown() // since Start is not called, set the context here handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) @@ -1210,15 +1258,31 @@ func getDropped() (droppedBacklog, droppedPool uint64) { return } -// makeSignedTxnGroups prepares N transaction groups of random (maxGroupSize) sizes with random -// invalid signatures of a given probability (invalidProb) -func makeSignedTxnGroups(N, numUsers, maxGroupSize int, invalidProb float32, addresses []basics.Address, - secrets []*crypto.SignatureSecrets) (ret [][]transactions.SignedTxn, - badTxnGroups map[uint64]interface{}) { - badTxnGroups = make(map[uint64]interface{}) +func getTransaction(sender, receiver basics.Address, u int) transactions.Transaction { + noteField := make([]byte, binary.MaxVarintLen64) + binary.PutUvarint(noteField, uint64(u)) + + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: sender, + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2}, + FirstValid: 0, + LastValid: basics.Round(proto.MaxTxnLife), + GenesisHash: genesisHash, + Note: noteField, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: receiver, + Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)}, + }, + } + return tx +} +func getTransactionGroups(N, numUsers, maxGroupSize int, addresses []basics.Address) [][]transactions.Transaction { + txnGrps := make([][]transactions.Transaction, N) protoMaxGrpSize := proto.MaxTxGroupSize - ret = make([][]transactions.SignedTxn, 0, N) for u := 0; u < N; u++ { grpSize := rand.Intn(protoMaxGrpSize-1) + 1 if grpSize > maxGroupSize { @@ -1228,72 +1292,189 @@ func makeSignedTxnGroups(N, numUsers, maxGroupSize int, invalidProb float32, add txns := make([]transactions.Transaction, 0, grpSize) for g := 0; g < grpSize; g++ { // generate transactions - noteField := make([]byte, binary.MaxVarintLen64) - binary.PutUvarint(noteField, uint64(u)) - tx := transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: addresses[(u+g)%numUsers], - Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2}, - FirstValid: 0, - LastValid: basics.Round(proto.MaxTxnLife), - GenesisHash: genesisHash, - Note: noteField, - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: addresses[(u+g+1)%numUsers], - Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)}, - }, - } + tx := getTransaction(addresses[(u+g)%numUsers], addresses[(u+g+1)%numUsers], u) if grpSize > 1 { txGroup.TxGroupHashes = append(txGroup.TxGroupHashes, crypto.Digest(tx.ID())) } txns = append(txns, tx) } - groupHash := crypto.HashObj(txGroup) - signedTxGroup := make([]transactions.SignedTxn, 0, grpSize) - for g, txn := range txns { - if grpSize > 1 { - txn.Group = groupHash + if grpSize > 1 { + groupHash := crypto.HashObj(txGroup) + for t := range txns { + txns[t].Group = groupHash } - signedTx := txn.Sign(secrets[(u+g)%numUsers]) - signedTx.Txn = txn + } + txnGrps[u] = txns + } + return txnGrps +} + +func signTransactionGroups(txnGroups [][]transactions.Transaction, secrets []*crypto.SignatureSecrets, invalidProb float32) ( + ret [][]transactions.SignedTxn, badTxnGroups map[uint64]interface{}) { + numUsers := len(secrets) + badTxnGroups = make(map[uint64]interface{}) + for tg := range txnGroups { + grpSize := len(txnGroups[tg]) + signedTxGroup := make([]transactions.SignedTxn, 0, grpSize) + for t := range txnGroups[tg] { + signedTx := txnGroups[tg][t].Sign(secrets[(tg+t)%numUsers]) + signedTx.Txn = txnGroups[tg][t] signedTxGroup = append(signedTxGroup, signedTx) } // randomly make bad signatures if rand.Float32() < invalidProb { tinGrp := rand.Intn(grpSize) signedTxGroup[tinGrp].Sig[0] = signedTxGroup[tinGrp].Sig[0] + 1 - badTxnGroups[uint64(u)] = struct{}{} + badTxnGroups[uint64(tg)] = struct{}{} } ret = append(ret, signedTxGroup) } return } +func signMSigTransactionGroups(txnGroups [][]transactions.Transaction, secrets []*crypto.SignatureSecrets, + invalidProb float32, msigSize int) (ret [][]transactions.SignedTxn, badTxnGroups map[uint64]interface{}, err error) { + ret = make([][]transactions.SignedTxn, len(txnGroups)) + numUsers := len(secrets) + badTxnGroups = make(map[uint64]interface{}) + badTxnGroupsMU := deadlock.Mutex{} + // process them using multiple threads + workers := make(chan interface{}, runtime.NumCPU()-1) + wg := sync.WaitGroup{} + errChan := make(chan error, 1) + for tg := range txnGroups { + wg.Add(1) + workers <- struct{}{} + go func(i int) { + defer func() { + wg.Done() + <-workers + }() + msigVer := uint8(1) + msigTHld := uint8(msigSize) + pks := make([]crypto.PublicKey, msigSize) + for x := 0; x < msigSize; x++ { + pks[x] = secrets[(i+x)%numUsers].SignatureVerifier + } + multiSigAddr, err := crypto.MultisigAddrGen(msigVer, msigTHld, pks) + if err != nil { + select { + case errChan <- err: + return + default: + return + } + } + grpSize := len(txnGroups[i]) + signedTxGroup := make([]transactions.SignedTxn, grpSize) + sigsForTxn := make([]crypto.MultisigSig, msigTHld) + + for t := range txnGroups[i] { + txnGroups[i][t].Sender = basics.Address(multiSigAddr) + for s := range sigsForTxn { + sig, err := crypto.MultisigSign(txnGroups[i][t], crypto.Digest(multiSigAddr), msigVer, msigTHld, pks, *secrets[(i+s)%numUsers]) + if err != nil { + select { + case errChan <- err: + return + default: + return + } + } + sigsForTxn[s] = sig + } + msig, err := crypto.MultisigAssemble(sigsForTxn) + if err != nil { + select { + case errChan <- err: + return + default: + return + } + } + signedTxGroup[t].Txn = txnGroups[i][t] + signedTxGroup[t].Msig = msig + } + // randomly make bad signatures + if rand.Float32() < invalidProb { + tinGrp := rand.Intn(grpSize) + tinMsig := rand.Intn(len(signedTxGroup[tinGrp].Msig.Subsigs)) + signedTxGroup[tinGrp].Msig.Subsigs[tinMsig].Sig[0] = signedTxGroup[tinGrp].Msig.Subsigs[tinMsig].Sig[0] + 1 + badTxnGroupsMU.Lock() + badTxnGroups[uint64(i)] = struct{}{} + badTxnGroupsMU.Unlock() + } + ret[i] = signedTxGroup + }(tg) + } + wg.Wait() + close(errChan) + err = <-errChan + return +} + +// makeSignedTxnGroups prepares N transaction groups of random (maxGroupSize) sizes with random +// invalid signatures of a given probability (invalidProb) +func makeSignedTxnGroups(N, numUsers, maxGroupSize int, invalidProb float32, addresses []basics.Address, + secrets []*crypto.SignatureSecrets) (ret [][]transactions.SignedTxn, + badTxnGroups map[uint64]interface{}) { + + txnGroups := getTransactionGroups(N, numUsers, maxGroupSize, addresses) + ret, badTxnGroups = signTransactionGroups(txnGroups, secrets, invalidProb) + return +} + +const numBenchUsers = 512 + // BenchmarkHandleTxns sends signed transactions directly to the verifier func BenchmarkHandleTxns(b *testing.B) { maxGroupSize := 1 - tpss := []int{6000000, 600000, 60000, 6000} invalidRates := []float32{0.5, 0.001} - for _, tps := range tpss { + for _, ivr := range invalidRates { + b.Run(fmt.Sprintf("inv_%.3f", ivr), func(b *testing.B) { + txGen := makeSigGenerator(b, numBenchUsers, maxGroupSize, ivr) + runHandlerBenchmarkWithBacklog(b, txGen, 0, false) + }) + } +} + +// BenchmarkHandleTxnGroups sends signed transaction groups directly to the verifier +func BenchmarkHandleTxnGroups(b *testing.B) { + maxGroupSize := proto.MaxTxGroupSize / 2 + invalidRates := []float32{0.5, 0.001} + for _, ivr := range invalidRates { + b.Run(fmt.Sprintf("inv_%.3f", ivr), func(b *testing.B) { + txGen := makeSigGenerator(b, numBenchUsers, maxGroupSize, ivr) + runHandlerBenchmarkWithBacklog(b, txGen, 0, false) + }) + } +} + +// BenchmarkHandleMsigTxns sends signed transactions directly to the verifier +func BenchmarkHandleMsigTxns(b *testing.B) { + maxGroupSize := 1 + msigSizes := []int{255, 64, 16} + invalidRates := []float32{0.5, 0.001} + for _, msigSize := range msigSizes { for _, ivr := range invalidRates { - b.Run(fmt.Sprintf("tps_%d_inv_%.3f", tps, ivr), func(b *testing.B) { - runHandlerBenchmarkWithBacklog(maxGroupSize, tps, ivr, b, false) + b.Run(fmt.Sprintf("msigSize_%d_inv_%.3f", msigSize, ivr), func(b *testing.B) { + txGen := makeMsigGenerator(b, numBenchUsers, maxGroupSize, ivr, msigSize) + runHandlerBenchmarkWithBacklog(b, txGen, 0, false) }) } } } // BenchmarkHandleTxnGroups sends signed transaction groups directly to the verifier -func BenchmarkHandleTxnGroups(b *testing.B) { +func BenchmarkHandleMsigTxnGroups(b *testing.B) { maxGroupSize := proto.MaxTxGroupSize / 2 - tpss := []int{6000000, 600000, 60000, 6000} + msigSizes := []int{255, 64, 16} invalidRates := []float32{0.5, 0.001} - for _, tps := range tpss { + for _, msigSize := range msigSizes { for _, ivr := range invalidRates { - b.Run(fmt.Sprintf("tps_%d_inv_%.3f", tps, ivr), func(b *testing.B) { - runHandlerBenchmarkWithBacklog(maxGroupSize, tps, ivr, b, false) + b.Run(fmt.Sprintf("msigSize_%d_inv_%.3f", msigSize, ivr), func(b *testing.B) { + txGen := makeMsigGenerator(b, numBenchUsers, maxGroupSize, ivr, msigSize) + runHandlerBenchmarkWithBacklog(b, txGen, 0, false) }) } } @@ -1308,7 +1489,8 @@ func BenchmarkHandleBLWTxns(b *testing.B) { for _, tps := range tpss { for _, ivr := range invalidRates { b.Run(fmt.Sprintf("tps_%d_inv_%.3f", tps, ivr), func(b *testing.B) { - runHandlerBenchmarkWithBacklog(maxGroupSize, tps, ivr, b, true) + txGen := makeSigGenerator(b, numBenchUsers, maxGroupSize, ivr) + runHandlerBenchmarkWithBacklog(b, txGen, tps, true) }) } } @@ -1323,37 +1505,155 @@ func BenchmarkHandleBLWTxnGroups(b *testing.B) { for _, tps := range tpss { for _, ivr := range invalidRates { b.Run(fmt.Sprintf("tps_%d_inv_%.3f", tps, ivr), func(b *testing.B) { - runHandlerBenchmarkWithBacklog(maxGroupSize, tps, ivr, b, true) + txGen := makeSigGenerator(b, numBenchUsers, maxGroupSize, ivr) + runHandlerBenchmarkWithBacklog(b, txGen, tps, true) }) } } } -// runHandlerBenchmarkWithBacklog benchmarks the number of transactions verfied or dropped -func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32, b *testing.B, useBacklogWorker bool) { +// BenchmarkHandleTxnGroups sends signed transaction groups directly to the verifier +func BenchmarkHandleLsigTxnGroups(b *testing.B) { + maxGroupSize := proto.MaxTxGroupSize / 2 + invalidRates := []float32{0.5, 0.001} + for _, ivr := range invalidRates { + b.Run(fmt.Sprintf("lsig-inv_%.3f", ivr), func(b *testing.B) { + txGen := makeLsigGenerator(b, numBenchUsers, maxGroupSize, ivr) + runHandlerBenchmarkWithBacklog(b, txGen, 0, false) + }) + } +} + +type txGenIf interface { + makeLedger(tb testing.TB, cfg config.Local, log logging.Logger, namePrefix string) *Ledger + createSignedTxGroups(tb testing.TB, txgCount int) ([][]transactions.SignedTxn, map[uint64]interface{}) +} + +type txGenerator struct { + numUsers int + maxGroupSize int + invalidRate float32 + + addresses []basics.Address + secrets []*crypto.SignatureSecrets + genesis map[basics.Address]basics.AccountData +} + +type sigGenerator struct { + txGenerator +} + +type msigGenerator struct { + txGenerator + msigSize int +} + +type lsigGenerator struct { + txGenerator +} + +func makeTxGenerator(tb testing.TB, numUsers, maxGroupSize int, invalidRate float32) *txGenerator { + addresses, secrets, genesis := makeTestGenesisAccounts(tb, numUsers) + return &txGenerator{ + numUsers: numUsers, + maxGroupSize: maxGroupSize, + invalidRate: invalidRate, + addresses: addresses, + secrets: secrets, + genesis: genesis, + } +} + +func (g *txGenerator) makeLedger(tb testing.TB, cfg config.Local, log logging.Logger, namePrefix string) *Ledger { + genBal := bookkeeping.MakeGenesisBalances(g.genesis, sinkAddr, poolAddr) + ivrString := strings.IndexAny(fmt.Sprintf("%f", g.invalidRate), "1") + ledgerName := fmt.Sprintf("%s-in_mem-w_inv=%d", namePrefix, ivrString) + ledgerName = strings.Replace(ledgerName, "#", "-", 1) + const inMem = true + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + require.NoError(tb, err) + return ledger +} + +func makeSigGenerator(tb testing.TB, numUsers, maxGroupSize int, invalidRate float32) *sigGenerator { + return &sigGenerator{ + txGenerator: *makeTxGenerator(tb, numUsers, maxGroupSize, invalidRate), + } +} + +func (g *sigGenerator) createSignedTxGroups(tb testing.TB, txgCount int) ([][]transactions.SignedTxn, map[uint64]interface{}) { + return makeSignedTxnGroups(txgCount, g.numUsers, g.maxGroupSize, g.invalidRate, g.addresses, g.secrets) +} + +func makeMsigGenerator(tb testing.TB, numUsers, maxGroupSize int, invalidRate float32, msigSize int) *msigGenerator { + return &msigGenerator{ + txGenerator: *makeTxGenerator(tb, numUsers, maxGroupSize, invalidRate), + msigSize: msigSize, + } +} + +func (g *msigGenerator) createSignedTxGroups(tb testing.TB, txgCount int) ([][]transactions.SignedTxn, map[uint64]interface{}) { + txnGroups := getTransactionGroups(txgCount, g.numUsers, g.maxGroupSize, g.addresses) + signedTransactionGroups, badTxnGroups, err := signMSigTransactionGroups(txnGroups, g.secrets, g.invalidRate, g.msigSize) + require.NoError(tb, err) + return signedTransactionGroups, badTxnGroups +} + +func makeLsigGenerator(tb testing.TB, numUsers, maxGroupSize int, invalidRate float32) *lsigGenerator { + return &lsigGenerator{ + txGenerator: *makeTxGenerator(tb, numUsers, maxGroupSize, invalidRate), + } +} + +func (g *lsigGenerator) createSignedTxGroups(tb testing.TB, txgCount int) ([][]transactions.SignedTxn, map[uint64]interface{}) { + stxns := make([][]transactions.SignedTxn, txgCount) + badTxnGroups := make(map[uint64]interface{}) + for i := 0; i < txgCount; i++ { + txns := txntest.CreateTinyManTxGroup(tb, true) + stxns[i], _ = txntest.CreateTinyManSignedTxGroup(tb, txns) + + // randomly make bad signatures + if rand.Float32() < g.invalidRate { + tinGrp := rand.Intn(len(txns)) + if stxns[i][tinGrp].Sig != (crypto.Signature{}) { + stxns[i][tinGrp].Sig[0] = stxns[i][tinGrp].Sig[0] + 1 + } else { + stxns[i][tinGrp].Lsig.Logic[0] = 255 + } + badTxnGroups[uint64(i)] = struct{}{} + } + } + return stxns, badTxnGroups +} + +// runHandlerBenchmarkWithBacklog benchmarks the number of transactions verified or dropped +func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBacklogWorker bool) { defer func() { // reset the counters transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) }() + // reset the counters + transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) + transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) - const numUsers = 100 log := logging.TestingLog(b) log.SetLevel(logging.Warn) - addresses, secrets, genesis := makeTestGenesisAccounts(b, numUsers) - genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr) - ivrString := strings.IndexAny(fmt.Sprintf("%f", invalidRate), "1") - ledgerName := fmt.Sprintf("%s-mem-%d-%d", b.Name(), b.N, ivrString) - ledgerName = strings.Replace(ledgerName, "#", "-", 1) - const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) - require.NoError(b, err) + ledger := txGen.makeLedger(b, cfg, log, fmt.Sprintf("%s-%d", b.Name(), b.N)) + defer ledger.Close() + handler := makeTestTxHandler(ledger, cfg) + defer handler.txVerificationPool.Shutdown() + + // The benchmark generates only 1000 txns, and reuses them. This is done for faster benchmark time and the + // ability to have long runs without being limited to the memory. The dedup will block the txns once the same + // ones are rotated again. If the purpose is to test dedup, then this can be changed by setting + // genTCount = b.N + handler.cacheConfig.enableFilteringRawMsg = false + handler.cacheConfig.enableFilteringCanonical = false - l := ledger - handler := makeTestTxHandler(l, cfg) // since Start is not called, set the context here handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) defer handler.ctxCancel() @@ -1407,24 +1707,30 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32, }() } - // Prepare the transactions - signedTransactionGroups, badTxnGroups := makeSignedTxnGroups(b.N, numUsers, maxGroupSize, invalidRate, addresses, secrets) - var encodedSignedTransactionGroups []network.IncomingMessage + // Prepare 1000 transactions + genTCount := 1000 + if b.N < genTCount { + genTCount = b.N + } + signedTransactionGroups, badTxnGroups := txGen.createSignedTxGroups(b, genTCount) + var encStxns []network.IncomingMessage if useBacklogWorker { - encodedSignedTransactionGroups = make([]network.IncomingMessage, 0, b.N) + encStxns = make([]network.IncomingMessage, 0, genTCount) for _, stxngrp := range signedTransactionGroups { data := make([]byte, 0) for _, stxn := range stxngrp { data = append(data, protocol.Encode(&stxn)...) } - encodedSignedTransactionGroups = - append(encodedSignedTransactionGroups, network.IncomingMessage{Data: data}) + encStxns = append(encStxns, network.IncomingMessage{Data: data}) } } var tt time.Time // Process the results and make sure they are correct - rateAdjuster := time.Second / time.Duration(tps) + var rateAdjuster time.Duration + if tps > 0 { + rateAdjuster = time.Second / time.Duration(tps) + } wg.Add(1) go func() { defer wg.Done() @@ -1433,21 +1739,31 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32, invalidCounter := 0 defer func() { if groupCounter > 1 { + timeSinceStart := time.Since(tt) droppedBacklog, droppedPool := getDropped() - b.Logf("Input T(grp)PS: %d (delay %f microsec)", tps, float64(rateAdjuster)/float64(time.Microsecond)) - b.Logf("Verified TPS: %d", uint64(txnCounter)*uint64(time.Second)/uint64(time.Since(tt))) - b.Logf("Time/txn: %d(microsec)", uint64((time.Since(tt)/time.Microsecond))/txnCounter) + if tps > 0 { + b.Logf("Input T(grp)PS: %d (delay %f microsec)", tps, float64(rateAdjuster)/float64(time.Microsecond)) + } + b.Logf("Verified TPS: %d T(grp)PS: %d", uint64(txnCounter)*uint64(time.Second)/uint64(timeSinceStart), + uint64(groupCounter)*uint64(time.Second)/uint64(timeSinceStart)) + b.Logf("Time/txn: %d(microsec)", uint64(timeSinceStart/time.Microsecond)/txnCounter) b.Logf("processed total: [%d groups (%d invalid)] [%d txns]", groupCounter, invalidCounter, txnCounter) b.Logf("dropped: [%d backlog] [%d pool]\n", droppedBacklog, droppedPool) } handler.Stop() // cancel the handler ctx }() + counterMutex := deadlock.Mutex{} stopChan := make(chan interface{}) + wg.Add(1) go func() { + defer wg.Done() for { time.Sleep(200 * time.Millisecond) droppedBacklog, droppedPool := getDropped() - if int(groupCounter+droppedBacklog+droppedPool) == len(signedTransactionGroups) { + counterMutex.Lock() + counters := groupCounter + droppedBacklog + droppedPool + counterMutex.Unlock() + if int(counters) == b.N { // all the benchmark txns processed close(stopChan) return @@ -1459,7 +1775,9 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32, select { case wi := <-testResultChan: txnCounter = txnCounter + uint64(len(wi.unverifiedTxGroup)) + counterMutex.Lock() groupCounter++ + counterMutex.Unlock() u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note) _, inBad := badTxnGroups[u] if wi.verificationErr == nil { @@ -1468,7 +1786,7 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32, invalidCounter++ require.True(b, inBad, "Error for good signature") } - if groupCounter == uint64(len(signedTransactionGroups)) { + if groupCounter == uint64(b.N) { // all the benchmark txns processed return } @@ -1478,18 +1796,30 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32, } }() + completed := false + c := 0 + ticker := &time.Ticker{} + if rateAdjuster > 0 { + ticker = time.NewTicker(rateAdjuster) + } + defer ticker.Stop() b.ResetTimer() tt = time.Now() - if useBacklogWorker { - for _, tg := range encodedSignedTransactionGroups { - handler.processIncomingTxn(tg) - time.Sleep(rateAdjuster) - } - } else { - for _, stxngrp := range signedTransactionGroups { - blm := txBacklogMsg{rawmsg: nil, unverifiedTxGroup: stxngrp} - handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, &blm, nil) - time.Sleep(rateAdjuster) + for !completed { + for i := range signedTransactionGroups { + if useBacklogWorker { + handler.processIncomingTxn(encStxns[i]) + <-ticker.C + } else { + stxngrp := signedTransactionGroups[i] + blm := txBacklogMsg{rawmsg: nil, unverifiedTxGroup: stxngrp} + handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, &blm, nil) + } + c++ + if c == b.N { + completed = true + break + } } } wg.Wait() @@ -1498,7 +1828,22 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32, func TestTxHandlerPostProcessError(t *testing.T) { partitiontest.PartitionTest(t) - t.Parallel() + + defer func() { + transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) + transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted) + transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee) + transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigNotWellFormed) + transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed) + transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig) + }() + + transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) + transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted) + transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee) + transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigNotWellFormed) + transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed) + transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig) collect := func() map[string]float64 { // collect all specific error reason metrics except TxGroupErrorReasonNotWellFormed, @@ -1555,7 +1900,11 @@ func TestTxHandlerPostProcessError(t *testing.T) { func TestTxHandlerPostProcessErrorWithVerify(t *testing.T) { partitiontest.PartitionTest(t) - t.Parallel() + + defer func() { + transactionMessagesTxnNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnNotWellFormed) + }() + transactionMessagesTxnNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnNotWellFormed) txn := transactions.Transaction{} stxn := transactions.SignedTxn{Txn: txn} @@ -1582,7 +1931,19 @@ func TestTxHandlerPostProcessErrorWithVerify(t *testing.T) { // TestTxHandlerRememberReportErrors checks Is and As statements work as expected func TestTxHandlerRememberReportErrors(t *testing.T) { partitiontest.PartitionTest(t) - t.Parallel() + + defer func() { + transactionMessageTxPoolRememberCounter = metrics.NewTagCounter( + "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}", + txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID, + txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric, + ) + }() + transactionMessageTxPoolRememberCounter = metrics.NewTagCounter( + "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}", + txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID, + txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric, + ) var txh TxHandler result := map[string]float64{} @@ -1643,7 +2004,28 @@ func (t *blockTicker) Wait() { func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { partitiontest.PartitionTest(t) - t.Parallel() + defer func() { + transactionMessageTxPoolRememberCounter = metrics.NewTagCounter( + "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}", + txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID, + txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric, + ) + transactionMessageTxPoolCheckCounter = metrics.NewTagCounter( + "algod_transaction_messages_txpool_check_err_{TAG}", "Number of transaction messages that didn't pass check by txpool b/c of {TAG}", + txPoolRememberTagTxnNotWellFormed, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID, + txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric, + ) + }() + transactionMessageTxPoolRememberCounter = metrics.NewTagCounter( + "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}", + txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID, + txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric, + ) + transactionMessageTxPoolCheckCounter = metrics.NewTagCounter( + "algod_transaction_messages_txpool_check_err_{TAG}", "Number of transaction messages that didn't pass check by txpool b/c of {TAG}", + txPoolRememberTagTxnNotWellFormed, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID, + txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric, + ) result := map[string]float64{} checkResult := map[string]float64{} @@ -1694,8 +2076,10 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { cfg.TxPoolSize = config.MaxTxGroupSize + 1 ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) require.NoError(t, err) + defer ledger.Close() handler := makeTestTxHandler(ledger, cfg) + defer handler.txVerificationPool.Shutdown() // since Start is not called, set the context here handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) defer handler.ctxCancel() |