diff options
Diffstat (limited to 'shared/pingpong/pingpong.go')
-rw-r--r-- | shared/pingpong/pingpong.go | 511 |
1 files changed, 419 insertions, 92 deletions
diff --git a/shared/pingpong/pingpong.go b/shared/pingpong/pingpong.go index eb774b5d0..6780d13b3 100644 --- a/shared/pingpong/pingpong.go +++ b/shared/pingpong/pingpong.go @@ -23,8 +23,11 @@ import ( "math" "math/rand" "os" + "strings" "time" + "github.com/algorand/go-deadlock" + "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" v1 "github.com/algorand/go-algorand/daemon/algod/api/spec/v1" @@ -32,6 +35,7 @@ import ( "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/logic" "github.com/algorand/go-algorand/libgoal" + "github.com/algorand/go-algorand/protocol" ) // CreatablesInfo has information about created assets, apps and opting in @@ -44,9 +48,30 @@ type CreatablesInfo struct { // pingPongAccount represents the account state for each account in the pingpong application // This includes the current balance and public/private keys tied to the account type pingPongAccount struct { - balance uint64 - sk *crypto.SignatureSecrets - pk basics.Address + deadlock.Mutex + sk *crypto.SignatureSecrets + pk basics.Address + + balance uint64 + balanceRound uint64 +} + +func (ppa *pingPongAccount) getBalance() uint64 { + ppa.Lock() + defer ppa.Unlock() + return ppa.balance +} + +func (ppa *pingPongAccount) setBalance(balance uint64) { + ppa.Lock() + defer ppa.Unlock() + ppa.balance = balance +} + +func (ppa *pingPongAccount) addBalance(offset int64) { + ppa.Lock() + defer ppa.Unlock() + ppa.balance = uint64(int64(ppa.balance) + offset) } // WorkerState object holds a running pingpong worker @@ -59,6 +84,10 @@ type WorkerState struct { localNftIndex uint64 nftHolders map[string]int incTransactionSalt uint64 + + muSuggestedParams deadlock.Mutex + suggestedParams v1.TransactionParams + pendingTxns v1.PendingTransactions } // PrepareAccounts to set up accounts and asset accounts required for Ping Pong run @@ -75,7 +104,7 @@ func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) { cfg.MaxAmt = 0 var assetAccounts map[string]*pingPongAccount - assetAccounts, err = pps.prepareNewAccounts(ac, cfg, pps.accounts) + assetAccounts, err = pps.prepareNewAccounts(ac) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "prepare new accounts failed: %v\n", err) return @@ -88,14 +117,15 @@ func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) { } if !cfg.Quiet { - for addr := range pps.accounts { - fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].balance) + for addr := range assetAccounts { + if addr != pps.cfg.SrcAccount { + fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].getBalance()) + } } } } else if cfg.NumApp > 0 { - var appAccounts map[string]*pingPongAccount - appAccounts, err = pps.prepareNewAccounts(ac, cfg, pps.accounts) + appAccounts, err = pps.prepareNewAccounts(ac) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "prepare new accounts failed: %v\n", err) return @@ -105,11 +135,24 @@ func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) { return } if !cfg.Quiet { - for addr := range pps.accounts { - fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].balance) + for addr := range appAccounts { + if addr != pps.cfg.SrcAccount { + fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].getBalance()) + } } } } else { + // If we have more accounts than requested, pick the top N (not including src) + if len(pps.accounts) > int(cfg.NumPartAccounts+1) { + fmt.Printf("Finding the richest %d accounts to use for transacting\n", cfg.NumPartAccounts) + pps.accounts = takeTopAccounts(pps.accounts, cfg.NumPartAccounts, cfg.SrcAccount) + } else { + // Not enough accounts yet (or just enough). Create more if needed + fmt.Printf("Not enough accounts - creating %d more\n", int(cfg.NumPartAccounts+1)-len(pps.accounts)) + generateAccounts(pps.accounts, cfg.NumPartAccounts) + } + go pps.roundMonitor(ac) + err = pps.fundAccounts(pps.accounts, ac, cfg) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "fund accounts failed %v\n", err) @@ -121,21 +164,18 @@ func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) { return } -func (pps *WorkerState) prepareNewAccounts(client libgoal.Client, cfg PpConfig, accounts map[string]*pingPongAccount) (newAccounts map[string]*pingPongAccount, err error) { - // remove existing accounts except for src account - for k := range accounts { - if k != cfg.SrcAccount { - delete(accounts, k) - } - } +func (pps *WorkerState) prepareNewAccounts(client libgoal.Client) (newAccounts map[string]*pingPongAccount, err error) { // create new accounts for testing newAccounts = make(map[string]*pingPongAccount) - newAccounts = generateAccounts(newAccounts, cfg.NumPartAccounts-1) - - for k := range newAccounts { - accounts[k] = newAccounts[k] + generateAccounts(newAccounts, pps.cfg.NumPartAccounts) + // copy the source account, as needed. + if srcAcct, has := pps.accounts[pps.cfg.SrcAccount]; has { + newAccounts[pps.cfg.SrcAccount] = srcAcct } - err = pps.fundAccounts(accounts, client, cfg) + pps.accounts = newAccounts + go pps.roundMonitor(client) + + err = pps.fundAccounts(newAccounts, client, pps.cfg) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "fund accounts failed %v\n", err) return @@ -145,7 +185,7 @@ func (pps *WorkerState) prepareNewAccounts(client libgoal.Client, cfg PpConfig, } // determine the min balance per participant account -func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBalance uint64, err error) { +func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (fundingRequiredBalance uint64, runningRequiredBalance uint64, err error) { proto, err := getProto(client) if err != nil { return @@ -153,11 +193,6 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBala minActiveAccountBalance := proto.MinBalance - if cfg.NumApp > 0 { - requiredBalance = (cfg.MinAccountFunds + (cfg.MaxAmt+cfg.MaxFee)*10) * 2 - fmt.Printf("required min balance for app accounts: %d\n", requiredBalance) - return - } var fee uint64 if cfg.MaxFee != 0 { fee = cfg.MaxFee @@ -167,8 +202,22 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBala if err != nil { return } + fee *= uint64(cfg.GroupSize) + } + + if cfg.NumApp > 0 { + amount := uint64(0) + + runningRequiredBalance = (amount + fee) * 10 * 2 + setupCost := uint64(proto.MaxTxGroupSize) * (uint64(proto.AppFlatParamsMinBalance*2) + fee) + // todo: add the cfg.NumAppOptIn to the setup cost. + fundingRequiredBalance = proto.MinBalance + cfg.MinAccountFunds + (amount+fee)*10*2*cfg.TxnPerSec*uint64(math.Ceil(cfg.RefreshTime.Seconds())) + setupCost + fmt.Printf("required min balance for app accounts: %d\n", fundingRequiredBalance) + return } - requiredBalance = minActiveAccountBalance + + fundingRequiredBalance = minActiveAccountBalance + runningRequiredBalance = minActiveAccountBalance // add cost of assets if cfg.NumAsset > 0 { @@ -176,7 +225,8 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBala (fee)*uint64(cfg.NumAsset) + // asset creations (fee)*uint64(cfg.NumAsset)*uint64(cfg.NumPartAccounts) + // asset opt-ins (fee)*uint64(cfg.NumAsset)*uint64(cfg.NumPartAccounts) // asset distributions - requiredBalance += assetCost + fundingRequiredBalance += assetCost + runningRequiredBalance += assetCost } if cfg.NumApp > 0 { creationCost := uint64(cfg.NumApp) * proto.AppFlatParamsMinBalance * uint64(proto.MaxAppsCreated) @@ -185,21 +235,25 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBala maxLocalSchema := basics.StateSchema{NumUint: proto.MaxLocalSchemaEntries, NumByteSlice: proto.MaxLocalSchemaEntries} schemaCost := uint64(cfg.NumApp) * (maxGlobalSchema.MinBalance(&proto).Raw*uint64(proto.MaxAppsCreated) + maxLocalSchema.MinBalance(&proto).Raw*uint64(proto.MaxAppsOptedIn)) - requiredBalance += creationCost + optInCost + schemaCost + fundingRequiredBalance += creationCost + optInCost + schemaCost + runningRequiredBalance += creationCost + optInCost + schemaCost } // add cost of transactions - requiredBalance += (cfg.MaxAmt + fee) * 2 * cfg.TxnPerSec * uint64(math.Ceil(cfg.RefreshTime.Seconds())) + fundingRequiredBalance += (cfg.MaxAmt + fee) * 2 * cfg.TxnPerSec * uint64(math.Ceil(cfg.RefreshTime.Seconds())) // override computed value if less than configured value - if cfg.MinAccountFunds > requiredBalance { - requiredBalance = cfg.MinAccountFunds + if cfg.MinAccountFunds > fundingRequiredBalance { + fundingRequiredBalance = cfg.MinAccountFunds } return } func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, client libgoal.Client, cfg PpConfig) error { - srcFunds, err := client.GetBalance(cfg.SrcAccount) + var srcFunds, minFund uint64 + var err error + var tx transactions.Transaction + srcFunds, err = client.GetBalance(cfg.SrcAccount) if err != nil { return err @@ -211,23 +265,27 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien // Fee of 0 will make cause the function to use the suggested one by network fee := uint64(0) - minFund, err := computeAccountMinBalance(client, cfg) + minFund, _, err = computeAccountMinBalance(client, cfg) if err != nil { return err } fmt.Printf("adjusting account balance to %d\n", minFund) - for addr, acct := range accounts { - - if addr == pps.cfg.SrcAccount { - continue - } + for { + accountsAdjusted := 0 + for addr, acct := range accounts { - if !cfg.Quiet { - fmt.Printf("adjusting balance of account %v\n", addr) - } - if acct.balance < minFund { - toSend := minFund - acct.balance + if addr == pps.cfg.SrcAccount { + continue + } + repeat: + if acct.getBalance() >= minFund { + continue + } + if !cfg.Quiet { + fmt.Printf("adjusting balance of account %v\n", addr) + } + toSend := minFund - acct.getBalance() if srcFunds <= toSend { return fmt.Errorf("source account %s has insufficient funds %d - needs %d", cfg.SrcAccount, srcFunds, toSend) } @@ -235,20 +293,37 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien if !cfg.Quiet { fmt.Printf("adjusting balance of account %v by %d\n ", addr, toSend) } - _, err := pps.sendPaymentFromSourceAccount(client, addr, fee, toSend) + + tx, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend) if err != nil { + if strings.Contains(err.Error(), "broadcast queue full") { + fmt.Printf("failed to send payment, broadcast queue full. sleeping & retrying.\n") + waitForNextRoundOrSleep(client, 500*time.Millisecond) + goto repeat + } return err } - accounts[addr].balance = minFund + srcFunds -= tx.Fee.Raw + accountsAdjusted++ if !cfg.Quiet { - fmt.Printf("account balance for key %s is %d\n", addr, accounts[addr].balance) + fmt.Printf("account balance for key %s will be %d\n", addr, minFund) } totalSent++ throttleTransactionRate(startTime, cfg, totalSent) } + accounts[cfg.SrcAccount].setBalance(srcFunds) + // wait until all the above transactions are sent, or that we have no more transactions + // in our pending transaction pool coming from the source account. + err = waitPendingTransactions(map[string]*pingPongAccount{cfg.SrcAccount: nil}, client) + if err != nil { + return err + } + if accountsAdjusted == 0 { + break + } } - return nil + return err } func (pps *WorkerState) sendPaymentFromSourceAccount(client libgoal.Client, to string, fee, amount uint64) (transactions.Transaction, error) { @@ -256,13 +331,16 @@ func (pps *WorkerState) sendPaymentFromSourceAccount(client libgoal.Client, to s note := pps.makeNextUniqueNoteField() from := pps.cfg.SrcAccount - tx, err := client.ConstructPayment(from, to, fee, amount, note[:], "", [32]byte{}, 0, 0) + var txn transactions.Transaction + var stxn transactions.SignedTxn + var err error + txn, err = client.ConstructPayment(from, to, fee, amount, note, "", [32]byte{}, 0, 0) if err != nil { return transactions.Transaction{}, err } - stxn, err := signTxn(from, tx, pps.accounts, pps.cfg) + stxn, err = signTxn(from, txn, pps.accounts, pps.cfg) if err != nil { return transactions.Transaction{}, err @@ -273,10 +351,45 @@ func (pps *WorkerState) sendPaymentFromSourceAccount(client libgoal.Client, to s return transactions.Transaction{}, err } - return tx, nil + return txn, nil +} + +// waitPendingTransactions waits until all the pending transactions coming from the given +// accounts map have been cleared out of the transaction pool. A prerequesite for this is that +// there is no other source who might be generating transactions that would come from these account +// addresses. +func waitPendingTransactions(accounts map[string]*pingPongAccount, client libgoal.Client) error { + for from := range accounts { + repeat: + pendingTxns, err := client.GetPendingTransactionsByAddress(from, 0) + if err != nil { + fmt.Printf("failed to check pending transaction pool status : %v\n", err) + return err + } + for _, txn := range pendingTxns.TruncatedTxns.Transactions { + if txn.From != from { + // we found a transaction where the receiver was the given account. We don't + // care about these. + continue + } + // the transaction is still in the transaction pool. + // this would wait for the next round, when we will perform the check again. + waitForNextRoundOrSleep(client, 500*time.Millisecond) + goto repeat + } + } + return nil } func (pps *WorkerState) refreshAccounts(accounts map[string]*pingPongAccount, client libgoal.Client, cfg PpConfig) error { + // wait until all the pending transactions have been sent; otherwise, getting the balance + // is pretty much meaningless. + fmt.Printf("waiting for all transactions to be accepted before refreshing accounts.\n") + err := waitPendingTransactions(accounts, client) + if err != nil { + return err + } + for addr := range accounts { amount, err := client.GetBalance(addr) if err != nil { @@ -284,7 +397,7 @@ func (pps *WorkerState) refreshAccounts(accounts map[string]*pingPongAccount, cl return err } - accounts[addr].balance = amount + accounts[addr].setBalance(amount) } return pps.fundAccounts(accounts, client, cfg) @@ -297,11 +410,11 @@ func listSufficientAccounts(accounts map[string]*pingPongAccount, minimumAmount if key == except { continue } - if value.balance >= minimumAmount { + if value.getBalance() >= minimumAmount { out = append(out, key) } } - rand.Shuffle(len(out), func(i, j int) { t := out[i]; out[i] = out[j]; out[j] = t }) + rand.Shuffle(len(out), func(i, j int) { out[i], out[j] = out[j], out[i] }) return out } @@ -413,7 +526,7 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { throttleTransactionRate(startTime, cfg, totalSent) } - timeDelta := time.Now().Sub(startTime) + timeDelta := time.Since(startTime) _, _ = fmt.Fprintf(os.Stdout, "Sent %d transactions (%d attempted) in %d seconds\n", totalSucceeded, totalSent, int(math.Round(timeDelta.Seconds()))) if cfg.RestTime > 0 { _, _ = fmt.Fprintf(os.Stdout, "Pausing %d seconds before sending more transactions\n", int(math.Round(cfg.RestTime.Seconds()))) @@ -427,7 +540,7 @@ func NewPingpong(cfg PpConfig) *WorkerState { return &WorkerState{cfg: cfg, nftHolders: make(map[string]int)} } -func getCreatableID(cfg PpConfig, cinfo CreatablesInfo) (aidx uint64) { +func randomizeCreatableID(cfg PpConfig, cinfo CreatablesInfo) (aidx uint64) { if cfg.NumAsset > 0 { rindex := rand.Intn(len(cinfo.AssetParams)) i := 0 @@ -463,6 +576,7 @@ func (pps *WorkerState) fee() uint64 { func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64, err error) { fee := pps.fee() + var srcCost uint64 if (len(pps.nftHolders) == 0) || ((float64(int(pps.cfg.NftAsaAccountInFlight)-len(pps.nftHolders)) / float64(pps.cfg.NftAsaAccountInFlight)) >= rand.Float64()) { var addr string @@ -486,14 +600,17 @@ func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64, // enough for the per-asa minbalance and more than enough for the txns to create them toSend := proto.MinBalance * uint64(pps.cfg.NftAsaPerAccount+1) * 2 pps.nftHolders[addr] = 0 - _, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend) + var tx transactions.Transaction + tx, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend) if err != nil { return } + srcCost += tx.Fee.Raw + toSend sentCount++ // we ran one txn above already to fund the new addr, // we'll run a second txn below } + pps.accounts[pps.cfg.SrcAccount].addBalance(-int64(srcCost)) // pick a random sender from nft holder sub accounts pick := rand.Intn(len(pps.nftHolders)) pos := 0 @@ -531,8 +648,12 @@ func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64, if err != nil { return } - sentCount++ + _, err = client.BroadcastTransaction(stxn) + if err != nil { + return + } + sentCount++ return } @@ -545,15 +666,37 @@ func (pps *WorkerState) sendFromTo( cfg := pps.cfg amt := cfg.MaxAmt + var minAccountRunningBalance uint64 + _, minAccountRunningBalance, err = computeAccountMinBalance(client, cfg) + if err != nil { + return 0, 0, err + } + belowMinBalanceAccounts := make(map[string] /*basics.Address*/ bool) assetsByCreator := make(map[string][]*v1.AssetParams) for _, p := range cinfo.AssetParams { c := p.Creator - assetsByCreator[c] = append(assetsByCreator[c], &p) + ap := &v1.AssetParams{} + *ap = p + assetsByCreator[c] = append(assetsByCreator[c], ap) } - for i, from := range fromList { + lastTransactionTime := time.Now() + timeCredit := time.Duration(0) + for i := 0; i < len(fromList); i = (i + 1) % len(fromList) { + from := fromList[i] + + // keep going until the balances of at least 20% of the accounts is too low. + if len(belowMinBalanceAccounts)*5 > len(fromList) { + fmt.Printf("quitting sendFromTo: too many accounts below threshold") + return + } + + if belowMinBalanceAccounts[from] { + continue + } + if cfg.RandomizeAmt { - amt = rand.Uint64()%cfg.MaxAmt + 1 + amt = ((rand.Uint64() % cfg.MaxAmt) + 1) % cfg.MaxAmt } fee := pps.fee() @@ -563,6 +706,14 @@ func (pps *WorkerState) sendFromTo( var addr basics.Address crypto.RandBytes(addr[:]) to = addr.String() + } else if len(belowMinBalanceAccounts) > 0 && (crypto.RandUint64()%100 < 50) { + // make 50% of the calls attempt to refund low-balanced accounts. + // ( if there is any ) + // pick the first low balance account + for acct := range belowMinBalanceAccounts { + to = acct + break + } } // Broadcast transaction @@ -571,11 +722,13 @@ func (pps *WorkerState) sendFromTo( toBalanceChange := int64(0) if cfg.NumAsset > 0 { amt = 1 + } else if cfg.NumApp > 0 { + amt = 0 } if cfg.GroupSize == 1 { // generate random assetID or appId if we send asset/app txns - aidx := getCreatableID(cfg, cinfo) + aidx := randomizeCreatableID(cfg, cinfo) var txn transactions.Transaction var consErr error // Construct single txn @@ -587,8 +740,9 @@ func (pps *WorkerState) sendFromTo( } // would we have enough money after taking into account the current updated fees ? - if accounts[from].balance <= (txn.Fee.Raw + amt + cfg.MinAccountFunds) { - _, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d : %s -> %s; Current cost too high.\n", amt, from, to) + if accounts[from].getBalance() <= (txn.Fee.Raw + amt + minAccountRunningBalance) { + _, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d: %s -> %s; Current cost too high(%d <= %d + %d + %d).\n", amt, from, to, accounts[from].getBalance(), txn.Fee.Raw, amt, minAccountRunningBalance) + belowMinBalanceAccounts[from] = true continue } @@ -605,9 +759,6 @@ func (pps *WorkerState) sendFromTo( sentCount++ _, sendErr = client.BroadcastTransaction(stxn) - if sendErr != nil { - fmt.Printf("Warning, cannot broadcast txn, %s\n", sendErr) - } } else { // Generate txn group @@ -631,16 +782,15 @@ func (pps *WorkerState) sendFromTo( toBalanceChange += int64(amt) signer = to } else { - txn, _, err = pps.constructTxn(to, from, fee, amt, 0, client) + txn, signer, err = pps.constructTxn(to, from, fee, amt, 0, client) toBalanceChange -= int64(txn.Fee.Raw + amt) fromBalanceChange += int64(amt) - signer = to } if err != nil { _, _ = fmt.Fprintf(os.Stderr, "group tx failed: %v\n", err) return } - if cfg.RandomizeAmt { + if cfg.RandomizeAmt && j%2 == 1 { amt = rand.Uint64()%cfg.MaxAmt + 1 } if cfg.Rekey { @@ -661,11 +811,11 @@ func (pps *WorkerState) sendFromTo( } // would we have enough money after taking into account the current updated fees ? - if int64(accounts[from].balance)+fromBalanceChange <= int64(cfg.MinAccountFunds) { + if int64(accounts[from].getBalance())+fromBalanceChange <= int64(cfg.MinAccountFunds) { _, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d : %s -> %s; Current cost too high.\n", amt, from, to) continue } - if int64(accounts[to].balance)+toBalanceChange <= int64(cfg.MinAccountFunds) { + if int64(accounts[to].getBalance())+toBalanceChange <= int64(cfg.MinAccountFunds) { _, _ = fmt.Fprintf(os.Stdout, "Skipping sending back %d : %s -> %s; Current cost too high.\n", amt, to, from) continue } @@ -682,15 +832,15 @@ func (pps *WorkerState) sendFromTo( } // Sign each transaction - var stxGroup []transactions.SignedTxn + stxGroup := make([]transactions.SignedTxn, len(txGroup)) + var signErr error for j, txn := range txGroup { txn.Group = gid - stxn, signErr := signTxn(txSigners[j], txn, pps.accounts, cfg) + stxGroup[j], signErr = signTxn(txSigners[j], txn, pps.accounts, cfg) if signErr != nil { err = signErr return } - stxGroup = append(stxGroup, stxn) } sentCount++ @@ -705,11 +855,33 @@ func (pps *WorkerState) sendFromTo( } successCount++ - accounts[from].balance = uint64(fromBalanceChange + int64(accounts[from].balance)) - accounts[to].balance = uint64(toBalanceChange + int64(accounts[to].balance)) + accounts[from].addBalance(fromBalanceChange) + // avoid updating the "to" account. + + // the logic here would sleep for the remaining of time to match the desired cfg.DelayBetweenTxn if cfg.DelayBetweenTxn > 0 { time.Sleep(cfg.DelayBetweenTxn) } + if cfg.TxnPerSec > 0 { + timeCredit += time.Second / time.Duration(cfg.TxnPerSec) + + now := time.Now() + took := now.Sub(lastTransactionTime) + timeCredit -= took + if timeCredit > 0 { + time.Sleep(timeCredit) + timeCredit = time.Duration(0) + } else if timeCredit < -1000*time.Millisecond { + // cap the "time debt" to 1000 ms. + timeCredit = -1000 * time.Millisecond + } + lastTransactionTime = time.Now() + + // since we just slept enough here, we can take it off the counters + sentCount-- + successCount-- + // fmt.Printf("itration took %v\n", took) + } } return } @@ -723,11 +895,62 @@ func (pps *WorkerState) nftSpamAssetName() string { } func (pps *WorkerState) makeNextUniqueNoteField() []byte { noteField := make([]byte, binary.MaxVarintLen64) - usedBytes := binary.PutUvarint(noteField[:], pps.incTransactionSalt) + usedBytes := binary.PutUvarint(noteField, pps.incTransactionSalt) pps.incTransactionSalt++ return noteField[:usedBytes] } +func (pps *WorkerState) roundMonitor(client libgoal.Client) { + var minFund uint64 + var err error + for { + minFund, _, err = computeAccountMinBalance(client, pps.cfg) + if err == nil { + break + } + } + var newBalance uint64 + for { + paramsResp, err := client.SuggestedParams() + if err != nil { + time.Sleep(5 * time.Millisecond) + continue + } + pendingTxns, err := client.GetPendingTransactions(0) + if err != nil { + time.Sleep(5 * time.Millisecond) + continue + } + pps.muSuggestedParams.Lock() + pps.suggestedParams = paramsResp + pps.pendingTxns = pendingTxns + pps.muSuggestedParams.Unlock() + + for _, acct := range pps.accounts { + acct.Lock() + needRefresh := acct.balance < minFund && acct.balanceRound < paramsResp.LastRound + acct.Unlock() + if needRefresh { + newBalance, err = client.GetBalance(acct.pk.String()) + if err == nil { + acct.Lock() + acct.balanceRound, acct.balance = paramsResp.LastRound, newBalance + acct.Unlock() + } + } + } + + // wait for the next round. + waitForNextRoundOrSleep(client, 200*time.Millisecond) + } +} + +func (pps *WorkerState) getSuggestedParams() v1.TransactionParams { + pps.muSuggestedParams.Lock() + defer pps.muSuggestedParams.Unlock() + return pps.suggestedParams +} + func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, client libgoal.Client) (txn transactions.Transaction, sender string, err error) { cfg := pps.cfg cinfo := pps.cinfo @@ -755,16 +978,33 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli if cfg.NumApp > 0 { // Construct app transaction // select opted-in accounts for Txn.Accounts field var accounts []string - if len(cinfo.OptIns[aidx]) > 0 { - indices := rand.Perm(len(cinfo.OptIns[aidx])) - limit := 4 + assetOptIns := cinfo.OptIns[aidx] + if len(assetOptIns) > 0 { + indices := rand.Perm(len(assetOptIns)) + limit := 5 if len(indices) < limit { limit = len(indices) } for i := 0; i < limit; i++ { idx := indices[i] - accounts = append(accounts, cinfo.OptIns[aidx][idx]) + accounts = append(accounts, assetOptIns[idx]) + } + if cinfo.AssetParams[aidx].Creator == from { + // if the application was created by the "from" account, then we don't need to worry about it being opted-in. + } else { + fromIsOptedIn := false + for i := 0; i < len(assetOptIns); i++ { + if assetOptIns[i] == from { + fromIsOptedIn = true + break + } + } + if !fromIsOptedIn { + sender = accounts[0] + from = sender + } } + accounts = accounts[1:] } txn, err = client.MakeUnsignedAppNoOpTx(aidx, nil, accounts, nil, nil) if err != nil { @@ -779,11 +1019,17 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli } else if cfg.NumAsset > 0 { // Construct asset transaction // select a pair of random opted-in accounts by aidx // use them as from/to addresses - if len(cinfo.OptIns[aidx]) > 0 { - indices := rand.Perm(len(cinfo.OptIns[aidx])) - from = cinfo.OptIns[aidx][indices[0]] - to = cinfo.OptIns[aidx][indices[1]] - sender = from + if from != to { + if len(cinfo.OptIns[aidx]) > 0 { + indices := rand.Perm(len(cinfo.OptIns[aidx])) + from = cinfo.OptIns[aidx][indices[0]] + to = cinfo.OptIns[aidx][indices[1]] + sender = from + } else { + err = fmt.Errorf("asset %d has not been opted in by any account", aidx) + _, _ = fmt.Fprintf(os.Stdout, "error constructing transaction - %v\n", err) + return + } } txn, err = client.MakeUnsignedAssetSendTx(aidx, amt, to, "", "") if err != nil { @@ -792,12 +1038,12 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli } txn.Note = noteField[:] txn.Lease = lease - txn, err = client.FillUnsignedTxTemplate(from, 0, 0, cfg.MaxFee, txn) + txn, err = client.FillUnsignedTxTemplate(sender, 0, 0, cfg.MaxFee, txn) if !cfg.Quiet { - _, _ = fmt.Fprintf(os.Stdout, "Sending %d asset %d: %s -> %s\n", amt, aidx, from, to) + _, _ = fmt.Fprintf(os.Stdout, "Sending %d asset %d: %s -> %s\n", amt, aidx, sender, to) } } else { - txn, err = client.ConstructPayment(from, to, fee, amt, noteField[:], "", lease, 0, 0) + txn, err = pps.constructPayment(from, to, fee, amt, noteField, "", lease) if !cfg.Quiet { _, _ = fmt.Fprintf(os.Stdout, "Sending %d : %s -> %s\n", amt, from, to) } @@ -825,6 +1071,87 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli return } +// ConstructPayment builds a payment transaction to be signed +// If the fee is 0, the function will use the suggested one form the network +// Although firstValid and lastValid come pre-computed in a normal flow, +// additional validation is done by computeValidityRounds: +// if the lastValid is 0, firstValid + maxTxnLifetime will be used +// if the firstValid is 0, lastRound + 1 will be used +func (pps *WorkerState) constructPayment(from, to string, fee, amount uint64, note []byte, closeTo string, lease [32]byte) (transactions.Transaction, error) { + fromAddr, err := basics.UnmarshalChecksumAddress(from) + if err != nil { + return transactions.Transaction{}, err + } + + var toAddr basics.Address + if to != "" { + toAddr, err = basics.UnmarshalChecksumAddress(to) + if err != nil { + return transactions.Transaction{}, err + } + } + + // Get current round, protocol, genesis ID + var params v1.TransactionParams + for params.LastRound == 0 { + params = pps.getSuggestedParams() + } + + cp, ok := config.Consensus[protocol.ConsensusVersion(params.ConsensusVersion)] + if !ok { + return transactions.Transaction{}, fmt.Errorf("ConstructPayment: unknown consensus protocol %s", params.ConsensusVersion) + } + fv := params.LastRound + 1 + lv := fv + cp.MaxTxnLife - 1 + + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: fromAddr, + Fee: basics.MicroAlgos{Raw: fee}, + FirstValid: basics.Round(fv), + LastValid: basics.Round(lv), + Lease: lease, + Note: note, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: toAddr, + Amount: basics.MicroAlgos{Raw: amount}, + }, + } + + // If requesting closing, put it in the transaction. The protocol might + // not support it, but in that case, better to fail the transaction, + // because the user explicitly asked for it, and it's not supported. + if closeTo != "" { + closeToAddr, err := basics.UnmarshalChecksumAddress(closeTo) + if err != nil { + return transactions.Transaction{}, err + } + + tx.PaymentTxnFields.CloseRemainderTo = closeToAddr + } + + tx.Header.GenesisID = params.GenesisID + + // Check if the protocol supports genesis hash + if cp.SupportGenesisHash { + copy(tx.Header.GenesisHash[:], params.GenesisHash) + } + + // Default to the suggested fee, if the caller didn't supply it + // Fee is tricky, should taken care last. We encode the final transaction to get the size post signing and encoding + // Then, we multiply it by the suggested fee per byte. + if fee == 0 { + tx.Fee = basics.MulAIntSaturate(basics.MicroAlgos{Raw: params.Fee}, tx.EstimateEncodedSize()) + } + if tx.Fee.Raw < cp.MinTxnFee { + tx.Fee.Raw = cp.MinTxnFee + } + + return tx, nil +} + func signTxn(signer string, txn transactions.Transaction, accounts map[string]*pingPongAccount, cfg PpConfig) (stxn transactions.SignedTxn, err error) { var psig crypto.Signature |