summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Paulos <jasonpaulos@users.noreply.github.com>2022-12-19 12:52:41 -0800
committerJason Paulos <jasonpaulos@users.noreply.github.com>2022-12-19 12:52:41 -0800
commit6ad3ffcd6ca4631f97205593d1d5182d6aa4481c (patch)
treedc0255cc956c7caad41b3390cf28c1084b41860a
parent069a0a0c672edf54234ea3a1e517a1468036c8f1 (diff)
parent5979c090629f6f028bd7469598f905df23dd29af (diff)
Merge branch 'master' into feature/simulate-endpointfeature/simulate-endpoint
-rw-r--r--data/accountManager_test.go11
-rw-r--r--data/ledger_test.go6
-rw-r--r--data/txHandler_test.go572
3 files changed, 494 insertions, 95 deletions
diff --git a/data/accountManager_test.go b/data/accountManager_test.go
index 1fcfe56bf..17853f4bd 100644
--- a/data/accountManager_test.go
+++ b/data/accountManager_test.go
@@ -43,6 +43,11 @@ import (
func TestAccountManagerKeys(t *testing.T) {
partitiontest.PartitionTest(t)
+ if testing.Short() {
+ t.Log("this is a long test and skipping for -short")
+ return
+ }
+
registry := &mocks.MockParticipationRegistry{}
testAccountManagerKeys(t, registry, false)
}
@@ -85,6 +90,11 @@ func registryCloseTest(t testing.TB, registry account.ParticipationRegistry, dbf
func TestAccountManagerKeysRegistry(t *testing.T) {
partitiontest.PartitionTest(t)
+ if testing.Short() {
+ t.Log("this is a long test and skipping for -short")
+ return
+ }
+
registry, dbName := getRegistryImpl(t, false, true)
defer registryCloseTest(t, registry, dbName)
testAccountManagerKeys(t, registry, true)
@@ -123,6 +133,7 @@ func testAccountManagerKeys(t *testing.T, registry account.ParticipationRegistry
accessor, err := db.MakeErasableAccessor(partFilename)
require.NoError(t, err)
+ defer accessor.Close()
accessor.SetLogger(log)
part, err := account.FillDBWithParticipationKeys(accessor, root.Address(), 0, 100, 10000)
diff --git a/data/ledger_test.go b/data/ledger_test.go
index b3cb326df..82782d6b9 100644
--- a/data/ledger_test.go
+++ b/data/ledger_test.go
@@ -327,6 +327,10 @@ func TestLedgerSeed(t *testing.T) {
func TestConsensusVersion(t *testing.T) {
partitiontest.PartitionTest(t)
+ if testing.Short() {
+ t.Log("this is a long test and skipping for -short")
+ return
+ }
// find a consensus protocol that leads to ConsensusCurrentVersion
var previousProtocol protocol.ConsensusVersion
@@ -514,7 +518,7 @@ func TestLedgerErrorValidate(t *testing.T) {
defer realLedger.Close()
l := Ledger{Ledger: realLedger, log: log}
- l.log.SetLevel(logging.Debug)
+ l.log.SetLevel(logging.Warn)
require.NotNil(t, &l)
totalsRound, _, err := realLedger.LatestTotals()
diff --git a/data/txHandler_test.go b/data/txHandler_test.go
index ad74a1a0e..dc1f7215a 100644
--- a/data/txHandler_test.go
+++ b/data/txHandler_test.go
@@ -33,6 +33,8 @@ import (
"github.com/stretchr/testify/require"
+ "github.com/algorand/go-deadlock"
+
"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/components/mocks"
"github.com/algorand/go-algorand/config"
@@ -42,6 +44,7 @@ import (
"github.com/algorand/go-algorand/data/pools"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/verify"
+ "github.com/algorand/go-algorand/data/txntest"
realledger "github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
@@ -50,11 +53,9 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
-
- "github.com/algorand/go-deadlock"
)
-func makeTestGenesisAccounts(tb require.TestingT, numUsers int) ([]basics.Address, []*crypto.SignatureSecrets, map[basics.Address]basics.AccountData) {
+func makeTestGenesisAccounts(tb testing.TB, numUsers int) ([]basics.Address, []*crypto.SignatureSecrets, map[basics.Address]basics.AccountData) {
addresses := make([]basics.Address, numUsers)
secrets := make([]*crypto.SignatureSecrets, numUsers)
genesis := make(map[basics.Address]basics.AccountData)
@@ -91,12 +92,14 @@ func BenchmarkTxHandlerProcessing(b *testing.B) {
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
require.NoError(b, err)
+ defer ledger.Close()
l := ledger
cfg.TxPoolSize = 75000
cfg.EnableProcessBlockStats = false
txHandler := makeTestTxHandler(l, cfg)
+ defer txHandler.txVerificationPool.Shutdown()
makeTxns := func(N int) [][]transactions.SignedTxn {
ret := make([][]transactions.SignedTxn, 0, N)
@@ -432,6 +435,45 @@ func BenchmarkTxHandlerProcessIncomingTxn16(b *testing.B) {
finConsume()
}
+// BenchmarkTxHandlerProcessIncomingLogicTxn16 is similar to BenchmarkTxHandlerProcessIncomingTxn16
+// but with logicsig groups of 4 txns
+func BenchmarkTxHandlerProcessIncomingLogicTxn16(b *testing.B) {
+ deadlockDisable := deadlock.Opts.Disable
+ deadlock.Opts.Disable = true
+ defer func() {
+ deadlock.Opts.Disable = deadlockDisable
+ }()
+
+ const numSendThreads = 16
+ handler := makeTestTxHandlerOrphaned(txBacklogSize)
+
+ // prepare tx groups
+ blobs := make([][]byte, b.N)
+ stxns := make([][]transactions.SignedTxn, b.N)
+ for i := 0; i < b.N; i++ {
+ txns := txntest.CreateTinyManTxGroup(b, true)
+ stxns[i], _ = txntest.CreateTinyManSignedTxGroup(b, txns)
+ var blob []byte
+ for j := range stxns[i] {
+ encoded := protocol.Encode(&stxns[i][j])
+ blob = append(blob, encoded...)
+ }
+ blobs[i] = blob
+ }
+ numTxnsPerGroup := len(stxns[0])
+
+ statsCh := make(chan [4]int, 1)
+ defer close(statsCh)
+ finConsume := benchTxHandlerProcessIncomingTxnConsume(b, handler, numTxnsPerGroup, 0, statsCh)
+
+ // submit tx groups
+ b.ResetTimer()
+ finalizeSubmit := benchTxHandlerProcessIncomingTxnSubmit(b, handler, blobs, numSendThreads)
+
+ finalizeSubmit()
+ finConsume()
+}
+
// BenchmarkTxHandlerIncDeDup checks txn receiving with duplicates
// simulating processing delay
func BenchmarkTxHandlerIncDeDup(b *testing.B) {
@@ -624,7 +666,7 @@ func TestTxHandlerProcessIncomingCensoring(t *testing.T) {
}
t.Run("single", func(t *testing.T) {
- handler := makeTestTxHandlerOrphaned(txBacklogSize)
+ handler := makeTestTxHandlerOrphanedWithContext(context.Background(), txBacklogSize, txBacklogSize, txHandlerConfig{true, true}, 0)
stxns, blob := makeRandomTransactions(1)
stxn := stxns[0]
action := handler.processIncomingTxn(network.IncomingMessage{Data: blob})
@@ -649,7 +691,7 @@ func TestTxHandlerProcessIncomingCensoring(t *testing.T) {
})
t.Run("group", func(t *testing.T) {
- handler := makeTestTxHandlerOrphaned(txBacklogSize)
+ handler := makeTestTxHandlerOrphanedWithContext(context.Background(), txBacklogSize, txBacklogSize, txHandlerConfig{true, true}, 0)
num := rand.Intn(config.MaxTxGroupSize-1) + 2 // 2..config.MaxTxGroupSize
require.LessOrEqual(t, num, config.MaxTxGroupSize)
stxns, blob := makeRandomTransactions(num)
@@ -719,10 +761,10 @@ func TestTxHandlerProcessIncomingCensoring(t *testing.T) {
// makeTestTxHandlerOrphaned creates a tx handler without any backlog consumer.
// It is caller responsibility to run a consumer thread.
func makeTestTxHandlerOrphaned(backlogSize int) *TxHandler {
- return makeTestTxHandlerOrphanedWithContext(context.Background(), txBacklogSize, txBacklogSize, 0)
+ return makeTestTxHandlerOrphanedWithContext(context.Background(), txBacklogSize, txBacklogSize, txHandlerConfig{true, false}, 0)
}
-func makeTestTxHandlerOrphanedWithContext(ctx context.Context, backlogSize int, cacheSize int, refreshInterval time.Duration) *TxHandler {
+func makeTestTxHandlerOrphanedWithContext(ctx context.Context, backlogSize int, cacheSize int, txHandlerConfig txHandlerConfig, refreshInterval time.Duration) *TxHandler {
if backlogSize <= 0 {
backlogSize = txBacklogSize
}
@@ -733,7 +775,7 @@ func makeTestTxHandlerOrphanedWithContext(ctx context.Context, backlogSize int,
backlogQueue: make(chan *txBacklogMsg, backlogSize),
msgCache: makeSaltedCache(cacheSize),
txCanonicalCache: makeDigestCache(cacheSize),
- cacheConfig: txHandlerConfig{true, true},
+ cacheConfig: txHandlerConfig,
}
handler.msgCache.start(ctx, refreshInterval)
return handler
@@ -839,7 +881,7 @@ func TestTxHandlerProcessIncomingCacheRotation(t *testing.T) {
t.Run("scheduled", func(t *testing.T) {
// double enqueue a single txn message, ensure it discarded
ctx, cancelFunc := context.WithCancel(context.Background())
- handler := makeTestTxHandlerOrphanedWithContext(ctx, txBacklogSize, txBacklogSize, 10*time.Millisecond)
+ handler := makeTestTxHandlerOrphanedWithContext(ctx, txBacklogSize, txBacklogSize, txHandlerConfig{true, true}, 10*time.Millisecond)
var action network.OutgoingMessage
var msg *txBacklogMsg
@@ -902,7 +944,7 @@ func TestTxHandlerProcessIncomingCacheBacklogDrop(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()
- handler := makeTestTxHandlerOrphanedWithContext(context.Background(), 1, 20, 0)
+ handler := makeTestTxHandlerOrphanedWithContext(context.Background(), 1, 20, txHandlerConfig{true, true}, 0)
stxns1, blob1 := makeRandomTransactions(1)
require.Equal(t, 1, len(stxns1))
@@ -942,9 +984,11 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) {
cfg.TxIncomingFilteringFlags = 3 // txFilterRawMsg + txFilterCanonical
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
require.NoError(t, err)
+ defer ledger.Close()
l := ledger
handler := makeTestTxHandler(l, cfg)
+ defer handler.txVerificationPool.Shutdown()
handler.postVerificationQueue = make(chan *txBacklogMsg)
makeTxns := func(sendIdx, recvIdx int) ([]transactions.SignedTxn, []byte) {
@@ -1066,6 +1110,9 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t
transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)
}()
+ // reset the counters
+ transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
+ transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)
const numUsers = 100
log := logging.TestingLog(t)
@@ -1079,9 +1126,10 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
require.NoError(t, err)
+ defer ledger.Close()
- l := ledger
- handler := makeTestTxHandler(l, cfg)
+ handler := makeTestTxHandler(ledger, cfg)
+ defer handler.txVerificationPool.Shutdown()
// since Start is not called, set the context here
handler.ctx, handler.ctxCancel = context.WithCancel(context.Background())
@@ -1210,15 +1258,31 @@ func getDropped() (droppedBacklog, droppedPool uint64) {
return
}
-// makeSignedTxnGroups prepares N transaction groups of random (maxGroupSize) sizes with random
-// invalid signatures of a given probability (invalidProb)
-func makeSignedTxnGroups(N, numUsers, maxGroupSize int, invalidProb float32, addresses []basics.Address,
- secrets []*crypto.SignatureSecrets) (ret [][]transactions.SignedTxn,
- badTxnGroups map[uint64]interface{}) {
- badTxnGroups = make(map[uint64]interface{})
+func getTransaction(sender, receiver basics.Address, u int) transactions.Transaction {
+ noteField := make([]byte, binary.MaxVarintLen64)
+ binary.PutUvarint(noteField, uint64(u))
+
+ tx := transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: sender,
+ Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
+ FirstValid: 0,
+ LastValid: basics.Round(proto.MaxTxnLife),
+ GenesisHash: genesisHash,
+ Note: noteField,
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: receiver,
+ Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
+ },
+ }
+ return tx
+}
+func getTransactionGroups(N, numUsers, maxGroupSize int, addresses []basics.Address) [][]transactions.Transaction {
+ txnGrps := make([][]transactions.Transaction, N)
protoMaxGrpSize := proto.MaxTxGroupSize
- ret = make([][]transactions.SignedTxn, 0, N)
for u := 0; u < N; u++ {
grpSize := rand.Intn(protoMaxGrpSize-1) + 1
if grpSize > maxGroupSize {
@@ -1228,72 +1292,189 @@ func makeSignedTxnGroups(N, numUsers, maxGroupSize int, invalidProb float32, add
txns := make([]transactions.Transaction, 0, grpSize)
for g := 0; g < grpSize; g++ {
// generate transactions
- noteField := make([]byte, binary.MaxVarintLen64)
- binary.PutUvarint(noteField, uint64(u))
- tx := transactions.Transaction{
- Type: protocol.PaymentTx,
- Header: transactions.Header{
- Sender: addresses[(u+g)%numUsers],
- Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
- FirstValid: 0,
- LastValid: basics.Round(proto.MaxTxnLife),
- GenesisHash: genesisHash,
- Note: noteField,
- },
- PaymentTxnFields: transactions.PaymentTxnFields{
- Receiver: addresses[(u+g+1)%numUsers],
- Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
- },
- }
+ tx := getTransaction(addresses[(u+g)%numUsers], addresses[(u+g+1)%numUsers], u)
if grpSize > 1 {
txGroup.TxGroupHashes = append(txGroup.TxGroupHashes, crypto.Digest(tx.ID()))
}
txns = append(txns, tx)
}
- groupHash := crypto.HashObj(txGroup)
- signedTxGroup := make([]transactions.SignedTxn, 0, grpSize)
- for g, txn := range txns {
- if grpSize > 1 {
- txn.Group = groupHash
+ if grpSize > 1 {
+ groupHash := crypto.HashObj(txGroup)
+ for t := range txns {
+ txns[t].Group = groupHash
}
- signedTx := txn.Sign(secrets[(u+g)%numUsers])
- signedTx.Txn = txn
+ }
+ txnGrps[u] = txns
+ }
+ return txnGrps
+}
+
+func signTransactionGroups(txnGroups [][]transactions.Transaction, secrets []*crypto.SignatureSecrets, invalidProb float32) (
+ ret [][]transactions.SignedTxn, badTxnGroups map[uint64]interface{}) {
+ numUsers := len(secrets)
+ badTxnGroups = make(map[uint64]interface{})
+ for tg := range txnGroups {
+ grpSize := len(txnGroups[tg])
+ signedTxGroup := make([]transactions.SignedTxn, 0, grpSize)
+ for t := range txnGroups[tg] {
+ signedTx := txnGroups[tg][t].Sign(secrets[(tg+t)%numUsers])
+ signedTx.Txn = txnGroups[tg][t]
signedTxGroup = append(signedTxGroup, signedTx)
}
// randomly make bad signatures
if rand.Float32() < invalidProb {
tinGrp := rand.Intn(grpSize)
signedTxGroup[tinGrp].Sig[0] = signedTxGroup[tinGrp].Sig[0] + 1
- badTxnGroups[uint64(u)] = struct{}{}
+ badTxnGroups[uint64(tg)] = struct{}{}
}
ret = append(ret, signedTxGroup)
}
return
}
+func signMSigTransactionGroups(txnGroups [][]transactions.Transaction, secrets []*crypto.SignatureSecrets,
+ invalidProb float32, msigSize int) (ret [][]transactions.SignedTxn, badTxnGroups map[uint64]interface{}, err error) {
+ ret = make([][]transactions.SignedTxn, len(txnGroups))
+ numUsers := len(secrets)
+ badTxnGroups = make(map[uint64]interface{})
+ badTxnGroupsMU := deadlock.Mutex{}
+ // process them using multiple threads
+ workers := make(chan interface{}, runtime.NumCPU()-1)
+ wg := sync.WaitGroup{}
+ errChan := make(chan error, 1)
+ for tg := range txnGroups {
+ wg.Add(1)
+ workers <- struct{}{}
+ go func(i int) {
+ defer func() {
+ wg.Done()
+ <-workers
+ }()
+ msigVer := uint8(1)
+ msigTHld := uint8(msigSize)
+ pks := make([]crypto.PublicKey, msigSize)
+ for x := 0; x < msigSize; x++ {
+ pks[x] = secrets[(i+x)%numUsers].SignatureVerifier
+ }
+ multiSigAddr, err := crypto.MultisigAddrGen(msigVer, msigTHld, pks)
+ if err != nil {
+ select {
+ case errChan <- err:
+ return
+ default:
+ return
+ }
+ }
+ grpSize := len(txnGroups[i])
+ signedTxGroup := make([]transactions.SignedTxn, grpSize)
+ sigsForTxn := make([]crypto.MultisigSig, msigTHld)
+
+ for t := range txnGroups[i] {
+ txnGroups[i][t].Sender = basics.Address(multiSigAddr)
+ for s := range sigsForTxn {
+ sig, err := crypto.MultisigSign(txnGroups[i][t], crypto.Digest(multiSigAddr), msigVer, msigTHld, pks, *secrets[(i+s)%numUsers])
+ if err != nil {
+ select {
+ case errChan <- err:
+ return
+ default:
+ return
+ }
+ }
+ sigsForTxn[s] = sig
+ }
+ msig, err := crypto.MultisigAssemble(sigsForTxn)
+ if err != nil {
+ select {
+ case errChan <- err:
+ return
+ default:
+ return
+ }
+ }
+ signedTxGroup[t].Txn = txnGroups[i][t]
+ signedTxGroup[t].Msig = msig
+ }
+ // randomly make bad signatures
+ if rand.Float32() < invalidProb {
+ tinGrp := rand.Intn(grpSize)
+ tinMsig := rand.Intn(len(signedTxGroup[tinGrp].Msig.Subsigs))
+ signedTxGroup[tinGrp].Msig.Subsigs[tinMsig].Sig[0] = signedTxGroup[tinGrp].Msig.Subsigs[tinMsig].Sig[0] + 1
+ badTxnGroupsMU.Lock()
+ badTxnGroups[uint64(i)] = struct{}{}
+ badTxnGroupsMU.Unlock()
+ }
+ ret[i] = signedTxGroup
+ }(tg)
+ }
+ wg.Wait()
+ close(errChan)
+ err = <-errChan
+ return
+}
+
+// makeSignedTxnGroups prepares N transaction groups of random (maxGroupSize) sizes with random
+// invalid signatures of a given probability (invalidProb)
+func makeSignedTxnGroups(N, numUsers, maxGroupSize int, invalidProb float32, addresses []basics.Address,
+ secrets []*crypto.SignatureSecrets) (ret [][]transactions.SignedTxn,
+ badTxnGroups map[uint64]interface{}) {
+
+ txnGroups := getTransactionGroups(N, numUsers, maxGroupSize, addresses)
+ ret, badTxnGroups = signTransactionGroups(txnGroups, secrets, invalidProb)
+ return
+}
+
+const numBenchUsers = 512
+
// BenchmarkHandleTxns sends signed transactions directly to the verifier
func BenchmarkHandleTxns(b *testing.B) {
maxGroupSize := 1
- tpss := []int{6000000, 600000, 60000, 6000}
invalidRates := []float32{0.5, 0.001}
- for _, tps := range tpss {
+ for _, ivr := range invalidRates {
+ b.Run(fmt.Sprintf("inv_%.3f", ivr), func(b *testing.B) {
+ txGen := makeSigGenerator(b, numBenchUsers, maxGroupSize, ivr)
+ runHandlerBenchmarkWithBacklog(b, txGen, 0, false)
+ })
+ }
+}
+
+// BenchmarkHandleTxnGroups sends signed transaction groups directly to the verifier
+func BenchmarkHandleTxnGroups(b *testing.B) {
+ maxGroupSize := proto.MaxTxGroupSize / 2
+ invalidRates := []float32{0.5, 0.001}
+ for _, ivr := range invalidRates {
+ b.Run(fmt.Sprintf("inv_%.3f", ivr), func(b *testing.B) {
+ txGen := makeSigGenerator(b, numBenchUsers, maxGroupSize, ivr)
+ runHandlerBenchmarkWithBacklog(b, txGen, 0, false)
+ })
+ }
+}
+
+// BenchmarkHandleMsigTxns sends multisig-signed transactions directly to the verifier
+func BenchmarkHandleMsigTxns(b *testing.B) {
+ maxGroupSize := 1
+ msigSizes := []int{255, 64, 16}
+ invalidRates := []float32{0.5, 0.001}
+ for _, msigSize := range msigSizes {
for _, ivr := range invalidRates {
- b.Run(fmt.Sprintf("tps_%d_inv_%.3f", tps, ivr), func(b *testing.B) {
- runHandlerBenchmarkWithBacklog(maxGroupSize, tps, ivr, b, false)
+ b.Run(fmt.Sprintf("msigSize_%d_inv_%.3f", msigSize, ivr), func(b *testing.B) {
+ txGen := makeMsigGenerator(b, numBenchUsers, maxGroupSize, ivr, msigSize)
+ runHandlerBenchmarkWithBacklog(b, txGen, 0, false)
})
}
}
}
// BenchmarkHandleTxnGroups sends signed transaction groups directly to the verifier
-func BenchmarkHandleTxnGroups(b *testing.B) {
+func BenchmarkHandleMsigTxnGroups(b *testing.B) {
maxGroupSize := proto.MaxTxGroupSize / 2
- tpss := []int{6000000, 600000, 60000, 6000}
+ msigSizes := []int{255, 64, 16}
invalidRates := []float32{0.5, 0.001}
- for _, tps := range tpss {
+ for _, msigSize := range msigSizes {
for _, ivr := range invalidRates {
- b.Run(fmt.Sprintf("tps_%d_inv_%.3f", tps, ivr), func(b *testing.B) {
- runHandlerBenchmarkWithBacklog(maxGroupSize, tps, ivr, b, false)
+ b.Run(fmt.Sprintf("msigSize_%d_inv_%.3f", msigSize, ivr), func(b *testing.B) {
+ txGen := makeMsigGenerator(b, numBenchUsers, maxGroupSize, ivr, msigSize)
+ runHandlerBenchmarkWithBacklog(b, txGen, 0, false)
})
}
}
@@ -1308,7 +1489,8 @@ func BenchmarkHandleBLWTxns(b *testing.B) {
for _, tps := range tpss {
for _, ivr := range invalidRates {
b.Run(fmt.Sprintf("tps_%d_inv_%.3f", tps, ivr), func(b *testing.B) {
- runHandlerBenchmarkWithBacklog(maxGroupSize, tps, ivr, b, true)
+ txGen := makeSigGenerator(b, numBenchUsers, maxGroupSize, ivr)
+ runHandlerBenchmarkWithBacklog(b, txGen, tps, true)
})
}
}
@@ -1323,37 +1505,155 @@ func BenchmarkHandleBLWTxnGroups(b *testing.B) {
for _, tps := range tpss {
for _, ivr := range invalidRates {
b.Run(fmt.Sprintf("tps_%d_inv_%.3f", tps, ivr), func(b *testing.B) {
- runHandlerBenchmarkWithBacklog(maxGroupSize, tps, ivr, b, true)
+ txGen := makeSigGenerator(b, numBenchUsers, maxGroupSize, ivr)
+ runHandlerBenchmarkWithBacklog(b, txGen, tps, true)
})
}
}
}
-// runHandlerBenchmarkWithBacklog benchmarks the number of transactions verfied or dropped
-func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32, b *testing.B, useBacklogWorker bool) {
+// BenchmarkHandleLsigTxnGroups sends logicsig-signed transaction groups directly to the verifier
+func BenchmarkHandleLsigTxnGroups(b *testing.B) {
+ maxGroupSize := proto.MaxTxGroupSize / 2
+ invalidRates := []float32{0.5, 0.001}
+ for _, ivr := range invalidRates {
+ b.Run(fmt.Sprintf("lsig-inv_%.3f", ivr), func(b *testing.B) {
+ txGen := makeLsigGenerator(b, numBenchUsers, maxGroupSize, ivr)
+ runHandlerBenchmarkWithBacklog(b, txGen, 0, false)
+ })
+ }
+}
+
+type txGenIf interface {
+ makeLedger(tb testing.TB, cfg config.Local, log logging.Logger, namePrefix string) *Ledger
+ createSignedTxGroups(tb testing.TB, txgCount int) ([][]transactions.SignedTxn, map[uint64]interface{})
+}
+
+type txGenerator struct {
+ numUsers int
+ maxGroupSize int
+ invalidRate float32
+
+ addresses []basics.Address
+ secrets []*crypto.SignatureSecrets
+ genesis map[basics.Address]basics.AccountData
+}
+
+type sigGenerator struct {
+ txGenerator
+}
+
+type msigGenerator struct {
+ txGenerator
+ msigSize int
+}
+
+type lsigGenerator struct {
+ txGenerator
+}
+
+func makeTxGenerator(tb testing.TB, numUsers, maxGroupSize int, invalidRate float32) *txGenerator {
+ addresses, secrets, genesis := makeTestGenesisAccounts(tb, numUsers)
+ return &txGenerator{
+ numUsers: numUsers,
+ maxGroupSize: maxGroupSize,
+ invalidRate: invalidRate,
+ addresses: addresses,
+ secrets: secrets,
+ genesis: genesis,
+ }
+}
+
+func (g *txGenerator) makeLedger(tb testing.TB, cfg config.Local, log logging.Logger, namePrefix string) *Ledger {
+ genBal := bookkeeping.MakeGenesisBalances(g.genesis, sinkAddr, poolAddr)
+ ivrString := strings.IndexAny(fmt.Sprintf("%f", g.invalidRate), "1")
+ ledgerName := fmt.Sprintf("%s-in_mem-w_inv=%d", namePrefix, ivrString)
+ ledgerName = strings.Replace(ledgerName, "#", "-", 1)
+ const inMem = true
+ ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
+ require.NoError(tb, err)
+ return ledger
+}
+
+func makeSigGenerator(tb testing.TB, numUsers, maxGroupSize int, invalidRate float32) *sigGenerator {
+ return &sigGenerator{
+ txGenerator: *makeTxGenerator(tb, numUsers, maxGroupSize, invalidRate),
+ }
+}
+
+func (g *sigGenerator) createSignedTxGroups(tb testing.TB, txgCount int) ([][]transactions.SignedTxn, map[uint64]interface{}) {
+ return makeSignedTxnGroups(txgCount, g.numUsers, g.maxGroupSize, g.invalidRate, g.addresses, g.secrets)
+}
+
+func makeMsigGenerator(tb testing.TB, numUsers, maxGroupSize int, invalidRate float32, msigSize int) *msigGenerator {
+ return &msigGenerator{
+ txGenerator: *makeTxGenerator(tb, numUsers, maxGroupSize, invalidRate),
+ msigSize: msigSize,
+ }
+}
+
+func (g *msigGenerator) createSignedTxGroups(tb testing.TB, txgCount int) ([][]transactions.SignedTxn, map[uint64]interface{}) {
+ txnGroups := getTransactionGroups(txgCount, g.numUsers, g.maxGroupSize, g.addresses)
+ signedTransactionGroups, badTxnGroups, err := signMSigTransactionGroups(txnGroups, g.secrets, g.invalidRate, g.msigSize)
+ require.NoError(tb, err)
+ return signedTransactionGroups, badTxnGroups
+}
+
+func makeLsigGenerator(tb testing.TB, numUsers, maxGroupSize int, invalidRate float32) *lsigGenerator {
+ return &lsigGenerator{
+ txGenerator: *makeTxGenerator(tb, numUsers, maxGroupSize, invalidRate),
+ }
+}
+
+func (g *lsigGenerator) createSignedTxGroups(tb testing.TB, txgCount int) ([][]transactions.SignedTxn, map[uint64]interface{}) {
+ stxns := make([][]transactions.SignedTxn, txgCount)
+ badTxnGroups := make(map[uint64]interface{})
+ for i := 0; i < txgCount; i++ {
+ txns := txntest.CreateTinyManTxGroup(tb, true)
+ stxns[i], _ = txntest.CreateTinyManSignedTxGroup(tb, txns)
+
+ // randomly make bad signatures
+ if rand.Float32() < g.invalidRate {
+ tinGrp := rand.Intn(len(txns))
+ if stxns[i][tinGrp].Sig != (crypto.Signature{}) {
+ stxns[i][tinGrp].Sig[0] = stxns[i][tinGrp].Sig[0] + 1
+ } else {
+ stxns[i][tinGrp].Lsig.Logic[0] = 255
+ }
+ badTxnGroups[uint64(i)] = struct{}{}
+ }
+ }
+ return stxns, badTxnGroups
+}
+
+// runHandlerBenchmarkWithBacklog benchmarks the number of transactions verified or dropped
+func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBacklogWorker bool) {
defer func() {
// reset the counters
transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)
}()
+ // reset the counters
+ transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
+ transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)
- const numUsers = 100
log := logging.TestingLog(b)
log.SetLevel(logging.Warn)
- addresses, secrets, genesis := makeTestGenesisAccounts(b, numUsers)
- genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr)
- ivrString := strings.IndexAny(fmt.Sprintf("%f", invalidRate), "1")
- ledgerName := fmt.Sprintf("%s-mem-%d-%d", b.Name(), b.N, ivrString)
- ledgerName = strings.Replace(ledgerName, "#", "-", 1)
- const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
- ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
- require.NoError(b, err)
+ ledger := txGen.makeLedger(b, cfg, log, fmt.Sprintf("%s-%d", b.Name(), b.N))
+ defer ledger.Close()
+ handler := makeTestTxHandler(ledger, cfg)
+ defer handler.txVerificationPool.Shutdown()
+
+ // The benchmark generates only 1000 txns, and reuses them. This is done for faster benchmark time and the
+ // ability to have long runs without being limited to the memory. The dedup will block the txns once the same
+ // ones are rotated again. If the purpose is to test dedup, then this can be changed by setting
+ // genTCount = b.N
+ handler.cacheConfig.enableFilteringRawMsg = false
+ handler.cacheConfig.enableFilteringCanonical = false
- l := ledger
- handler := makeTestTxHandler(l, cfg)
// since Start is not called, set the context here
handler.ctx, handler.ctxCancel = context.WithCancel(context.Background())
defer handler.ctxCancel()
@@ -1407,24 +1707,30 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32,
}()
}
- // Prepare the transactions
- signedTransactionGroups, badTxnGroups := makeSignedTxnGroups(b.N, numUsers, maxGroupSize, invalidRate, addresses, secrets)
- var encodedSignedTransactionGroups []network.IncomingMessage
+ // Prepare 1000 transactions
+ genTCount := 1000
+ if b.N < genTCount {
+ genTCount = b.N
+ }
+ signedTransactionGroups, badTxnGroups := txGen.createSignedTxGroups(b, genTCount)
+ var encStxns []network.IncomingMessage
if useBacklogWorker {
- encodedSignedTransactionGroups = make([]network.IncomingMessage, 0, b.N)
+ encStxns = make([]network.IncomingMessage, 0, genTCount)
for _, stxngrp := range signedTransactionGroups {
data := make([]byte, 0)
for _, stxn := range stxngrp {
data = append(data, protocol.Encode(&stxn)...)
}
- encodedSignedTransactionGroups =
- append(encodedSignedTransactionGroups, network.IncomingMessage{Data: data})
+ encStxns = append(encStxns, network.IncomingMessage{Data: data})
}
}
var tt time.Time
// Process the results and make sure they are correct
- rateAdjuster := time.Second / time.Duration(tps)
+ var rateAdjuster time.Duration
+ if tps > 0 {
+ rateAdjuster = time.Second / time.Duration(tps)
+ }
wg.Add(1)
go func() {
defer wg.Done()
@@ -1433,21 +1739,31 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32,
invalidCounter := 0
defer func() {
if groupCounter > 1 {
+ timeSinceStart := time.Since(tt)
droppedBacklog, droppedPool := getDropped()
- b.Logf("Input T(grp)PS: %d (delay %f microsec)", tps, float64(rateAdjuster)/float64(time.Microsecond))
- b.Logf("Verified TPS: %d", uint64(txnCounter)*uint64(time.Second)/uint64(time.Since(tt)))
- b.Logf("Time/txn: %d(microsec)", uint64((time.Since(tt)/time.Microsecond))/txnCounter)
+ if tps > 0 {
+ b.Logf("Input T(grp)PS: %d (delay %f microsec)", tps, float64(rateAdjuster)/float64(time.Microsecond))
+ }
+ b.Logf("Verified TPS: %d T(grp)PS: %d", uint64(txnCounter)*uint64(time.Second)/uint64(timeSinceStart),
+ uint64(groupCounter)*uint64(time.Second)/uint64(timeSinceStart))
+ b.Logf("Time/txn: %d(microsec)", uint64(timeSinceStart/time.Microsecond)/txnCounter)
b.Logf("processed total: [%d groups (%d invalid)] [%d txns]", groupCounter, invalidCounter, txnCounter)
b.Logf("dropped: [%d backlog] [%d pool]\n", droppedBacklog, droppedPool)
}
handler.Stop() // cancel the handler ctx
}()
+ counterMutex := deadlock.Mutex{}
stopChan := make(chan interface{})
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
time.Sleep(200 * time.Millisecond)
droppedBacklog, droppedPool := getDropped()
- if int(groupCounter+droppedBacklog+droppedPool) == len(signedTransactionGroups) {
+ counterMutex.Lock()
+ counters := groupCounter + droppedBacklog + droppedPool
+ counterMutex.Unlock()
+ if int(counters) == b.N {
// all the benchmark txns processed
close(stopChan)
return
@@ -1459,7 +1775,9 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32,
select {
case wi := <-testResultChan:
txnCounter = txnCounter + uint64(len(wi.unverifiedTxGroup))
+ counterMutex.Lock()
groupCounter++
+ counterMutex.Unlock()
u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note)
_, inBad := badTxnGroups[u]
if wi.verificationErr == nil {
@@ -1468,7 +1786,7 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32,
invalidCounter++
require.True(b, inBad, "Error for good signature")
}
- if groupCounter == uint64(len(signedTransactionGroups)) {
+ if groupCounter == uint64(b.N) {
// all the benchmark txns processed
return
}
@@ -1478,18 +1796,30 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32,
}
}()
+ completed := false
+ c := 0
+ ticker := &time.Ticker{}
+ if rateAdjuster > 0 {
+ ticker = time.NewTicker(rateAdjuster)
+ }
+ defer ticker.Stop()
b.ResetTimer()
tt = time.Now()
- if useBacklogWorker {
- for _, tg := range encodedSignedTransactionGroups {
- handler.processIncomingTxn(tg)
- time.Sleep(rateAdjuster)
- }
- } else {
- for _, stxngrp := range signedTransactionGroups {
- blm := txBacklogMsg{rawmsg: nil, unverifiedTxGroup: stxngrp}
- handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, &blm, nil)
- time.Sleep(rateAdjuster)
+ for !completed {
+ for i := range signedTransactionGroups {
+ if useBacklogWorker {
+ handler.processIncomingTxn(encStxns[i])
+ <-ticker.C
+ } else {
+ stxngrp := signedTransactionGroups[i]
+ blm := txBacklogMsg{rawmsg: nil, unverifiedTxGroup: stxngrp}
+ handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, &blm, nil)
+ }
+ c++
+ if c == b.N {
+ completed = true
+ break
+ }
}
}
wg.Wait()
@@ -1498,7 +1828,22 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32,
func TestTxHandlerPostProcessError(t *testing.T) {
partitiontest.PartitionTest(t)
- t.Parallel()
+
+ defer func() {
+ transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed)
+ transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted)
+ transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee)
+ transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigNotWellFormed)
+ transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed)
+ transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig)
+ }()
+
+ transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed)
+ transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted)
+ transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee)
+ transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigNotWellFormed)
+ transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed)
+ transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig)
collect := func() map[string]float64 {
// collect all specific error reason metrics except TxGroupErrorReasonNotWellFormed,
@@ -1555,7 +1900,11 @@ func TestTxHandlerPostProcessError(t *testing.T) {
func TestTxHandlerPostProcessErrorWithVerify(t *testing.T) {
partitiontest.PartitionTest(t)
- t.Parallel()
+
+ defer func() {
+ transactionMessagesTxnNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnNotWellFormed)
+ }()
+ transactionMessagesTxnNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnNotWellFormed)
txn := transactions.Transaction{}
stxn := transactions.SignedTxn{Txn: txn}
@@ -1582,7 +1931,19 @@ func TestTxHandlerPostProcessErrorWithVerify(t *testing.T) {
// TestTxHandlerRememberReportErrors checks Is and As statements work as expected
func TestTxHandlerRememberReportErrors(t *testing.T) {
partitiontest.PartitionTest(t)
- t.Parallel()
+
+ defer func() {
+ transactionMessageTxPoolRememberCounter = metrics.NewTagCounter(
+ "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}",
+ txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID,
+ txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric,
+ )
+ }()
+ transactionMessageTxPoolRememberCounter = metrics.NewTagCounter(
+ "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}",
+ txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID,
+ txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric,
+ )
var txh TxHandler
result := map[string]float64{}
@@ -1643,7 +2004,28 @@ func (t *blockTicker) Wait() {
func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) {
partitiontest.PartitionTest(t)
- t.Parallel()
+ defer func() {
+ transactionMessageTxPoolRememberCounter = metrics.NewTagCounter(
+ "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}",
+ txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID,
+ txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric,
+ )
+ transactionMessageTxPoolCheckCounter = metrics.NewTagCounter(
+ "algod_transaction_messages_txpool_check_err_{TAG}", "Number of transaction messages that didn't pass check by txpool b/c of {TAG}",
+ txPoolRememberTagTxnNotWellFormed, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID,
+ txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric,
+ )
+ }()
+ transactionMessageTxPoolRememberCounter = metrics.NewTagCounter(
+ "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}",
+ txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID,
+ txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric,
+ )
+ transactionMessageTxPoolCheckCounter = metrics.NewTagCounter(
+ "algod_transaction_messages_txpool_check_err_{TAG}", "Number of transaction messages that didn't pass check by txpool b/c of {TAG}",
+ txPoolRememberTagTxnNotWellFormed, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID,
+ txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric,
+ )
result := map[string]float64{}
checkResult := map[string]float64{}
@@ -1694,8 +2076,10 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) {
cfg.TxPoolSize = config.MaxTxGroupSize + 1
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
require.NoError(t, err)
+ defer ledger.Close()
handler := makeTestTxHandler(ledger, cfg)
+ defer handler.txVerificationPool.Shutdown()
// since Start is not called, set the context here
handler.ctx, handler.ctxCancel = context.WithCancel(context.Background())
defer handler.ctxCancel()