summaryrefslogtreecommitdiff
path: root/shared/pingpong/pingpong.go
diff options
context:
space:
mode:
Diffstat (limited to 'shared/pingpong/pingpong.go')
-rw-r--r--shared/pingpong/pingpong.go511
1 files changed, 419 insertions, 92 deletions
diff --git a/shared/pingpong/pingpong.go b/shared/pingpong/pingpong.go
index eb774b5d0..6780d13b3 100644
--- a/shared/pingpong/pingpong.go
+++ b/shared/pingpong/pingpong.go
@@ -23,8 +23,11 @@ import (
"math"
"math/rand"
"os"
+ "strings"
"time"
+ "github.com/algorand/go-deadlock"
+
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
v1 "github.com/algorand/go-algorand/daemon/algod/api/spec/v1"
@@ -32,6 +35,7 @@ import (
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/libgoal"
+ "github.com/algorand/go-algorand/protocol"
)
// CreatablesInfo has information about created assets, apps and opting in
@@ -44,9 +48,30 @@ type CreatablesInfo struct {
// pingPongAccount represents the account state for each account in the pingpong application
// This includes the current balance and public/private keys tied to the account
type pingPongAccount struct {
- balance uint64
- sk *crypto.SignatureSecrets
- pk basics.Address
+ deadlock.Mutex
+ sk *crypto.SignatureSecrets
+ pk basics.Address
+
+ balance uint64
+ balanceRound uint64
+}
+
+func (ppa *pingPongAccount) getBalance() uint64 {
+ ppa.Lock()
+ defer ppa.Unlock()
+ return ppa.balance
+}
+
+func (ppa *pingPongAccount) setBalance(balance uint64) {
+ ppa.Lock()
+ defer ppa.Unlock()
+ ppa.balance = balance
+}
+
+func (ppa *pingPongAccount) addBalance(offset int64) {
+ ppa.Lock()
+ defer ppa.Unlock()
+ ppa.balance = uint64(int64(ppa.balance) + offset)
}
// WorkerState object holds a running pingpong worker
@@ -59,6 +84,10 @@ type WorkerState struct {
localNftIndex uint64
nftHolders map[string]int
incTransactionSalt uint64
+
+ muSuggestedParams deadlock.Mutex
+ suggestedParams v1.TransactionParams
+ pendingTxns v1.PendingTransactions
}
// PrepareAccounts to set up accounts and asset accounts required for Ping Pong run
@@ -75,7 +104,7 @@ func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) {
cfg.MaxAmt = 0
var assetAccounts map[string]*pingPongAccount
- assetAccounts, err = pps.prepareNewAccounts(ac, cfg, pps.accounts)
+ assetAccounts, err = pps.prepareNewAccounts(ac)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "prepare new accounts failed: %v\n", err)
return
@@ -88,14 +117,15 @@ func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) {
}
if !cfg.Quiet {
- for addr := range pps.accounts {
- fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].balance)
+ for addr := range assetAccounts {
+ if addr != pps.cfg.SrcAccount {
+ fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].getBalance())
+ }
}
}
} else if cfg.NumApp > 0 {
-
var appAccounts map[string]*pingPongAccount
- appAccounts, err = pps.prepareNewAccounts(ac, cfg, pps.accounts)
+ appAccounts, err = pps.prepareNewAccounts(ac)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "prepare new accounts failed: %v\n", err)
return
@@ -105,11 +135,24 @@ func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) {
return
}
if !cfg.Quiet {
- for addr := range pps.accounts {
- fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].balance)
+ for addr := range appAccounts {
+ if addr != pps.cfg.SrcAccount {
+ fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].getBalance())
+ }
}
}
} else {
+ // If we have more accounts than requested, pick the top N (not including src)
+ if len(pps.accounts) > int(cfg.NumPartAccounts+1) {
+ fmt.Printf("Finding the richest %d accounts to use for transacting\n", cfg.NumPartAccounts)
+ pps.accounts = takeTopAccounts(pps.accounts, cfg.NumPartAccounts, cfg.SrcAccount)
+ } else {
+ // Not enough accounts yet (or just enough). Create more if needed
+ fmt.Printf("Not enough accounts - creating %d more\n", int(cfg.NumPartAccounts+1)-len(pps.accounts))
+ generateAccounts(pps.accounts, cfg.NumPartAccounts)
+ }
+ go pps.roundMonitor(ac)
+
err = pps.fundAccounts(pps.accounts, ac, cfg)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "fund accounts failed %v\n", err)
@@ -121,21 +164,18 @@ func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) {
return
}
-func (pps *WorkerState) prepareNewAccounts(client libgoal.Client, cfg PpConfig, accounts map[string]*pingPongAccount) (newAccounts map[string]*pingPongAccount, err error) {
- // remove existing accounts except for src account
- for k := range accounts {
- if k != cfg.SrcAccount {
- delete(accounts, k)
- }
- }
+func (pps *WorkerState) prepareNewAccounts(client libgoal.Client) (newAccounts map[string]*pingPongAccount, err error) {
// create new accounts for testing
newAccounts = make(map[string]*pingPongAccount)
- newAccounts = generateAccounts(newAccounts, cfg.NumPartAccounts-1)
-
- for k := range newAccounts {
- accounts[k] = newAccounts[k]
+ generateAccounts(newAccounts, pps.cfg.NumPartAccounts)
+ // copy the source account, as needed.
+ if srcAcct, has := pps.accounts[pps.cfg.SrcAccount]; has {
+ newAccounts[pps.cfg.SrcAccount] = srcAcct
}
- err = pps.fundAccounts(accounts, client, cfg)
+ pps.accounts = newAccounts
+ go pps.roundMonitor(client)
+
+ err = pps.fundAccounts(newAccounts, client, pps.cfg)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "fund accounts failed %v\n", err)
return
@@ -145,7 +185,7 @@ func (pps *WorkerState) prepareNewAccounts(client libgoal.Client, cfg PpConfig,
}
// determine the min balance per participant account
-func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBalance uint64, err error) {
+func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (fundingRequiredBalance uint64, runningRequiredBalance uint64, err error) {
proto, err := getProto(client)
if err != nil {
return
@@ -153,11 +193,6 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBala
minActiveAccountBalance := proto.MinBalance
- if cfg.NumApp > 0 {
- requiredBalance = (cfg.MinAccountFunds + (cfg.MaxAmt+cfg.MaxFee)*10) * 2
- fmt.Printf("required min balance for app accounts: %d\n", requiredBalance)
- return
- }
var fee uint64
if cfg.MaxFee != 0 {
fee = cfg.MaxFee
@@ -167,8 +202,22 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBala
if err != nil {
return
}
+ fee *= uint64(cfg.GroupSize)
+ }
+
+ if cfg.NumApp > 0 {
+ amount := uint64(0)
+
+ runningRequiredBalance = (amount + fee) * 10 * 2
+ setupCost := uint64(proto.MaxTxGroupSize) * (uint64(proto.AppFlatParamsMinBalance*2) + fee)
+ // todo: add the cfg.NumAppOptIn to the setup cost.
+ fundingRequiredBalance = proto.MinBalance + cfg.MinAccountFunds + (amount+fee)*10*2*cfg.TxnPerSec*uint64(math.Ceil(cfg.RefreshTime.Seconds())) + setupCost
+ fmt.Printf("required min balance for app accounts: %d\n", fundingRequiredBalance)
+ return
}
- requiredBalance = minActiveAccountBalance
+
+ fundingRequiredBalance = minActiveAccountBalance
+ runningRequiredBalance = minActiveAccountBalance
// add cost of assets
if cfg.NumAsset > 0 {
@@ -176,7 +225,8 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBala
(fee)*uint64(cfg.NumAsset) + // asset creations
(fee)*uint64(cfg.NumAsset)*uint64(cfg.NumPartAccounts) + // asset opt-ins
(fee)*uint64(cfg.NumAsset)*uint64(cfg.NumPartAccounts) // asset distributions
- requiredBalance += assetCost
+ fundingRequiredBalance += assetCost
+ runningRequiredBalance += assetCost
}
if cfg.NumApp > 0 {
creationCost := uint64(cfg.NumApp) * proto.AppFlatParamsMinBalance * uint64(proto.MaxAppsCreated)
@@ -185,21 +235,25 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (requiredBala
maxLocalSchema := basics.StateSchema{NumUint: proto.MaxLocalSchemaEntries, NumByteSlice: proto.MaxLocalSchemaEntries}
schemaCost := uint64(cfg.NumApp) * (maxGlobalSchema.MinBalance(&proto).Raw*uint64(proto.MaxAppsCreated) +
maxLocalSchema.MinBalance(&proto).Raw*uint64(proto.MaxAppsOptedIn))
- requiredBalance += creationCost + optInCost + schemaCost
+ fundingRequiredBalance += creationCost + optInCost + schemaCost
+ runningRequiredBalance += creationCost + optInCost + schemaCost
}
// add cost of transactions
- requiredBalance += (cfg.MaxAmt + fee) * 2 * cfg.TxnPerSec * uint64(math.Ceil(cfg.RefreshTime.Seconds()))
+ fundingRequiredBalance += (cfg.MaxAmt + fee) * 2 * cfg.TxnPerSec * uint64(math.Ceil(cfg.RefreshTime.Seconds()))
// override computed value if less than configured value
- if cfg.MinAccountFunds > requiredBalance {
- requiredBalance = cfg.MinAccountFunds
+ if cfg.MinAccountFunds > fundingRequiredBalance {
+ fundingRequiredBalance = cfg.MinAccountFunds
}
return
}
func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, client libgoal.Client, cfg PpConfig) error {
- srcFunds, err := client.GetBalance(cfg.SrcAccount)
+ var srcFunds, minFund uint64
+ var err error
+ var tx transactions.Transaction
+ srcFunds, err = client.GetBalance(cfg.SrcAccount)
if err != nil {
return err
@@ -211,23 +265,27 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien
// Fee of 0 will make cause the function to use the suggested one by network
fee := uint64(0)
- minFund, err := computeAccountMinBalance(client, cfg)
+ minFund, _, err = computeAccountMinBalance(client, cfg)
if err != nil {
return err
}
fmt.Printf("adjusting account balance to %d\n", minFund)
- for addr, acct := range accounts {
-
- if addr == pps.cfg.SrcAccount {
- continue
- }
+ for {
+ accountsAdjusted := 0
+ for addr, acct := range accounts {
- if !cfg.Quiet {
- fmt.Printf("adjusting balance of account %v\n", addr)
- }
- if acct.balance < minFund {
- toSend := minFund - acct.balance
+ if addr == pps.cfg.SrcAccount {
+ continue
+ }
+ repeat:
+ if acct.getBalance() >= minFund {
+ continue
+ }
+ if !cfg.Quiet {
+ fmt.Printf("adjusting balance of account %v\n", addr)
+ }
+ toSend := minFund - acct.getBalance()
if srcFunds <= toSend {
return fmt.Errorf("source account %s has insufficient funds %d - needs %d", cfg.SrcAccount, srcFunds, toSend)
}
@@ -235,20 +293,37 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien
if !cfg.Quiet {
fmt.Printf("adjusting balance of account %v by %d\n ", addr, toSend)
}
- _, err := pps.sendPaymentFromSourceAccount(client, addr, fee, toSend)
+
+ tx, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend)
if err != nil {
+ if strings.Contains(err.Error(), "broadcast queue full") {
+ fmt.Printf("failed to send payment, broadcast queue full. sleeping & retrying.\n")
+ waitForNextRoundOrSleep(client, 500*time.Millisecond)
+ goto repeat
+ }
return err
}
- accounts[addr].balance = minFund
+ srcFunds -= tx.Fee.Raw
+ accountsAdjusted++
if !cfg.Quiet {
- fmt.Printf("account balance for key %s is %d\n", addr, accounts[addr].balance)
+ fmt.Printf("account balance for key %s will be %d\n", addr, minFund)
}
totalSent++
throttleTransactionRate(startTime, cfg, totalSent)
}
+ accounts[cfg.SrcAccount].setBalance(srcFunds)
+ // wait until all the above transactions are sent, or that we have no more transactions
+ // in our pending transaction pool coming from the source account.
+ err = waitPendingTransactions(map[string]*pingPongAccount{cfg.SrcAccount: nil}, client)
+ if err != nil {
+ return err
+ }
+ if accountsAdjusted == 0 {
+ break
+ }
}
- return nil
+ return err
}
func (pps *WorkerState) sendPaymentFromSourceAccount(client libgoal.Client, to string, fee, amount uint64) (transactions.Transaction, error) {
@@ -256,13 +331,16 @@ func (pps *WorkerState) sendPaymentFromSourceAccount(client libgoal.Client, to s
note := pps.makeNextUniqueNoteField()
from := pps.cfg.SrcAccount
- tx, err := client.ConstructPayment(from, to, fee, amount, note[:], "", [32]byte{}, 0, 0)
+ var txn transactions.Transaction
+ var stxn transactions.SignedTxn
+ var err error
+ txn, err = client.ConstructPayment(from, to, fee, amount, note, "", [32]byte{}, 0, 0)
if err != nil {
return transactions.Transaction{}, err
}
- stxn, err := signTxn(from, tx, pps.accounts, pps.cfg)
+ stxn, err = signTxn(from, txn, pps.accounts, pps.cfg)
if err != nil {
return transactions.Transaction{}, err
@@ -273,10 +351,45 @@ func (pps *WorkerState) sendPaymentFromSourceAccount(client libgoal.Client, to s
return transactions.Transaction{}, err
}
- return tx, nil
+ return txn, nil
+}
+
+// waitPendingTransactions waits until all the pending transactions coming from the given
+// accounts map have been cleared out of the transaction pool. A prerequesite for this is that
+// there is no other source who might be generating transactions that would come from these account
+// addresses.
+func waitPendingTransactions(accounts map[string]*pingPongAccount, client libgoal.Client) error {
+ for from := range accounts {
+ repeat:
+ pendingTxns, err := client.GetPendingTransactionsByAddress(from, 0)
+ if err != nil {
+ fmt.Printf("failed to check pending transaction pool status : %v\n", err)
+ return err
+ }
+ for _, txn := range pendingTxns.TruncatedTxns.Transactions {
+ if txn.From != from {
+ // we found a transaction where the receiver was the given account. We don't
+ // care about these.
+ continue
+ }
+ // the transaction is still in the transaction pool.
+ // this would wait for the next round, when we will perform the check again.
+ waitForNextRoundOrSleep(client, 500*time.Millisecond)
+ goto repeat
+ }
+ }
+ return nil
}
func (pps *WorkerState) refreshAccounts(accounts map[string]*pingPongAccount, client libgoal.Client, cfg PpConfig) error {
+ // wait until all the pending transactions have been sent; otherwise, getting the balance
+ // is pretty much meaningless.
+ fmt.Printf("waiting for all transactions to be accepted before refreshing accounts.\n")
+ err := waitPendingTransactions(accounts, client)
+ if err != nil {
+ return err
+ }
+
for addr := range accounts {
amount, err := client.GetBalance(addr)
if err != nil {
@@ -284,7 +397,7 @@ func (pps *WorkerState) refreshAccounts(accounts map[string]*pingPongAccount, cl
return err
}
- accounts[addr].balance = amount
+ accounts[addr].setBalance(amount)
}
return pps.fundAccounts(accounts, client, cfg)
@@ -297,11 +410,11 @@ func listSufficientAccounts(accounts map[string]*pingPongAccount, minimumAmount
if key == except {
continue
}
- if value.balance >= minimumAmount {
+ if value.getBalance() >= minimumAmount {
out = append(out, key)
}
}
- rand.Shuffle(len(out), func(i, j int) { t := out[i]; out[i] = out[j]; out[j] = t })
+ rand.Shuffle(len(out), func(i, j int) { out[i], out[j] = out[j], out[i] })
return out
}
@@ -413,7 +526,7 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) {
throttleTransactionRate(startTime, cfg, totalSent)
}
- timeDelta := time.Now().Sub(startTime)
+ timeDelta := time.Since(startTime)
_, _ = fmt.Fprintf(os.Stdout, "Sent %d transactions (%d attempted) in %d seconds\n", totalSucceeded, totalSent, int(math.Round(timeDelta.Seconds())))
if cfg.RestTime > 0 {
_, _ = fmt.Fprintf(os.Stdout, "Pausing %d seconds before sending more transactions\n", int(math.Round(cfg.RestTime.Seconds())))
@@ -427,7 +540,7 @@ func NewPingpong(cfg PpConfig) *WorkerState {
return &WorkerState{cfg: cfg, nftHolders: make(map[string]int)}
}
-func getCreatableID(cfg PpConfig, cinfo CreatablesInfo) (aidx uint64) {
+func randomizeCreatableID(cfg PpConfig, cinfo CreatablesInfo) (aidx uint64) {
if cfg.NumAsset > 0 {
rindex := rand.Intn(len(cinfo.AssetParams))
i := 0
@@ -463,6 +576,7 @@ func (pps *WorkerState) fee() uint64 {
func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64, err error) {
fee := pps.fee()
+ var srcCost uint64
if (len(pps.nftHolders) == 0) || ((float64(int(pps.cfg.NftAsaAccountInFlight)-len(pps.nftHolders)) / float64(pps.cfg.NftAsaAccountInFlight)) >= rand.Float64()) {
var addr string
@@ -486,14 +600,17 @@ func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64,
// enough for the per-asa minbalance and more than enough for the txns to create them
toSend := proto.MinBalance * uint64(pps.cfg.NftAsaPerAccount+1) * 2
pps.nftHolders[addr] = 0
- _, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend)
+ var tx transactions.Transaction
+ tx, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend)
if err != nil {
return
}
+ srcCost += tx.Fee.Raw + toSend
sentCount++
// we ran one txn above already to fund the new addr,
// we'll run a second txn below
}
+ pps.accounts[pps.cfg.SrcAccount].addBalance(-int64(srcCost))
// pick a random sender from nft holder sub accounts
pick := rand.Intn(len(pps.nftHolders))
pos := 0
@@ -531,8 +648,12 @@ func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64,
if err != nil {
return
}
- sentCount++
+
_, err = client.BroadcastTransaction(stxn)
+ if err != nil {
+ return
+ }
+ sentCount++
return
}
@@ -545,15 +666,37 @@ func (pps *WorkerState) sendFromTo(
cfg := pps.cfg
amt := cfg.MaxAmt
+ var minAccountRunningBalance uint64
+ _, minAccountRunningBalance, err = computeAccountMinBalance(client, cfg)
+ if err != nil {
+ return 0, 0, err
+ }
+ belowMinBalanceAccounts := make(map[string] /*basics.Address*/ bool)
assetsByCreator := make(map[string][]*v1.AssetParams)
for _, p := range cinfo.AssetParams {
c := p.Creator
- assetsByCreator[c] = append(assetsByCreator[c], &p)
+ ap := &v1.AssetParams{}
+ *ap = p
+ assetsByCreator[c] = append(assetsByCreator[c], ap)
}
- for i, from := range fromList {
+ lastTransactionTime := time.Now()
+ timeCredit := time.Duration(0)
+ for i := 0; i < len(fromList); i = (i + 1) % len(fromList) {
+ from := fromList[i]
+
+ // keep going until the balances of at least 20% of the accounts is too low.
+ if len(belowMinBalanceAccounts)*5 > len(fromList) {
+ fmt.Printf("quitting sendFromTo: too many accounts below threshold")
+ return
+ }
+
+ if belowMinBalanceAccounts[from] {
+ continue
+ }
+
if cfg.RandomizeAmt {
- amt = rand.Uint64()%cfg.MaxAmt + 1
+ amt = ((rand.Uint64() % cfg.MaxAmt) + 1) % cfg.MaxAmt
}
fee := pps.fee()
@@ -563,6 +706,14 @@ func (pps *WorkerState) sendFromTo(
var addr basics.Address
crypto.RandBytes(addr[:])
to = addr.String()
+ } else if len(belowMinBalanceAccounts) > 0 && (crypto.RandUint64()%100 < 50) {
+ // make 50% of the calls attempt to refund low-balanced accounts.
+ // ( if there is any )
+ // pick the first low balance account
+ for acct := range belowMinBalanceAccounts {
+ to = acct
+ break
+ }
}
// Broadcast transaction
@@ -571,11 +722,13 @@ func (pps *WorkerState) sendFromTo(
toBalanceChange := int64(0)
if cfg.NumAsset > 0 {
amt = 1
+ } else if cfg.NumApp > 0 {
+ amt = 0
}
if cfg.GroupSize == 1 {
// generate random assetID or appId if we send asset/app txns
- aidx := getCreatableID(cfg, cinfo)
+ aidx := randomizeCreatableID(cfg, cinfo)
var txn transactions.Transaction
var consErr error
// Construct single txn
@@ -587,8 +740,9 @@ func (pps *WorkerState) sendFromTo(
}
// would we have enough money after taking into account the current updated fees ?
- if accounts[from].balance <= (txn.Fee.Raw + amt + cfg.MinAccountFunds) {
- _, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d : %s -> %s; Current cost too high.\n", amt, from, to)
+ if accounts[from].getBalance() <= (txn.Fee.Raw + amt + minAccountRunningBalance) {
+ _, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d: %s -> %s; Current cost too high(%d <= %d + %d + %d).\n", amt, from, to, accounts[from].getBalance(), txn.Fee.Raw, amt, minAccountRunningBalance)
+ belowMinBalanceAccounts[from] = true
continue
}
@@ -605,9 +759,6 @@ func (pps *WorkerState) sendFromTo(
sentCount++
_, sendErr = client.BroadcastTransaction(stxn)
- if sendErr != nil {
- fmt.Printf("Warning, cannot broadcast txn, %s\n", sendErr)
- }
} else {
// Generate txn group
@@ -631,16 +782,15 @@ func (pps *WorkerState) sendFromTo(
toBalanceChange += int64(amt)
signer = to
} else {
- txn, _, err = pps.constructTxn(to, from, fee, amt, 0, client)
+ txn, signer, err = pps.constructTxn(to, from, fee, amt, 0, client)
toBalanceChange -= int64(txn.Fee.Raw + amt)
fromBalanceChange += int64(amt)
- signer = to
}
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "group tx failed: %v\n", err)
return
}
- if cfg.RandomizeAmt {
+ if cfg.RandomizeAmt && j%2 == 1 {
amt = rand.Uint64()%cfg.MaxAmt + 1
}
if cfg.Rekey {
@@ -661,11 +811,11 @@ func (pps *WorkerState) sendFromTo(
}
// would we have enough money after taking into account the current updated fees ?
- if int64(accounts[from].balance)+fromBalanceChange <= int64(cfg.MinAccountFunds) {
+ if int64(accounts[from].getBalance())+fromBalanceChange <= int64(cfg.MinAccountFunds) {
_, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d : %s -> %s; Current cost too high.\n", amt, from, to)
continue
}
- if int64(accounts[to].balance)+toBalanceChange <= int64(cfg.MinAccountFunds) {
+ if int64(accounts[to].getBalance())+toBalanceChange <= int64(cfg.MinAccountFunds) {
_, _ = fmt.Fprintf(os.Stdout, "Skipping sending back %d : %s -> %s; Current cost too high.\n", amt, to, from)
continue
}
@@ -682,15 +832,15 @@ func (pps *WorkerState) sendFromTo(
}
// Sign each transaction
- var stxGroup []transactions.SignedTxn
+ stxGroup := make([]transactions.SignedTxn, len(txGroup))
+ var signErr error
for j, txn := range txGroup {
txn.Group = gid
- stxn, signErr := signTxn(txSigners[j], txn, pps.accounts, cfg)
+ stxGroup[j], signErr = signTxn(txSigners[j], txn, pps.accounts, cfg)
if signErr != nil {
err = signErr
return
}
- stxGroup = append(stxGroup, stxn)
}
sentCount++
@@ -705,11 +855,33 @@ func (pps *WorkerState) sendFromTo(
}
successCount++
- accounts[from].balance = uint64(fromBalanceChange + int64(accounts[from].balance))
- accounts[to].balance = uint64(toBalanceChange + int64(accounts[to].balance))
+ accounts[from].addBalance(fromBalanceChange)
+ // avoid updating the "to" account.
+
+ // the logic here would sleep for the remaining of time to match the desired cfg.DelayBetweenTxn
if cfg.DelayBetweenTxn > 0 {
time.Sleep(cfg.DelayBetweenTxn)
}
+ if cfg.TxnPerSec > 0 {
+ timeCredit += time.Second / time.Duration(cfg.TxnPerSec)
+
+ now := time.Now()
+ took := now.Sub(lastTransactionTime)
+ timeCredit -= took
+ if timeCredit > 0 {
+ time.Sleep(timeCredit)
+ timeCredit = time.Duration(0)
+ } else if timeCredit < -1000*time.Millisecond {
+ // cap the "time debt" to 1000 ms.
+ timeCredit = -1000 * time.Millisecond
+ }
+ lastTransactionTime = time.Now()
+
+ // since we just slept enough here, we can take it off the counters
+ sentCount--
+ successCount--
+ // fmt.Printf("itration took %v\n", took)
+ }
}
return
}
@@ -723,11 +895,62 @@ func (pps *WorkerState) nftSpamAssetName() string {
}
func (pps *WorkerState) makeNextUniqueNoteField() []byte {
noteField := make([]byte, binary.MaxVarintLen64)
- usedBytes := binary.PutUvarint(noteField[:], pps.incTransactionSalt)
+ usedBytes := binary.PutUvarint(noteField, pps.incTransactionSalt)
pps.incTransactionSalt++
return noteField[:usedBytes]
}
+func (pps *WorkerState) roundMonitor(client libgoal.Client) {
+ var minFund uint64
+ var err error
+ for {
+ minFund, _, err = computeAccountMinBalance(client, pps.cfg)
+ if err == nil {
+ break
+ }
+ }
+ var newBalance uint64
+ for {
+ paramsResp, err := client.SuggestedParams()
+ if err != nil {
+ time.Sleep(5 * time.Millisecond)
+ continue
+ }
+ pendingTxns, err := client.GetPendingTransactions(0)
+ if err != nil {
+ time.Sleep(5 * time.Millisecond)
+ continue
+ }
+ pps.muSuggestedParams.Lock()
+ pps.suggestedParams = paramsResp
+ pps.pendingTxns = pendingTxns
+ pps.muSuggestedParams.Unlock()
+
+ for _, acct := range pps.accounts {
+ acct.Lock()
+ needRefresh := acct.balance < minFund && acct.balanceRound < paramsResp.LastRound
+ acct.Unlock()
+ if needRefresh {
+ newBalance, err = client.GetBalance(acct.pk.String())
+ if err == nil {
+ acct.Lock()
+ acct.balanceRound, acct.balance = paramsResp.LastRound, newBalance
+ acct.Unlock()
+ }
+ }
+ }
+
+ // wait for the next round.
+ waitForNextRoundOrSleep(client, 200*time.Millisecond)
+ }
+}
+
+func (pps *WorkerState) getSuggestedParams() v1.TransactionParams {
+ pps.muSuggestedParams.Lock()
+ defer pps.muSuggestedParams.Unlock()
+ return pps.suggestedParams
+}
+
func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, client libgoal.Client) (txn transactions.Transaction, sender string, err error) {
cfg := pps.cfg
cinfo := pps.cinfo
@@ -755,16 +978,33 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli
if cfg.NumApp > 0 { // Construct app transaction
// select opted-in accounts for Txn.Accounts field
var accounts []string
- if len(cinfo.OptIns[aidx]) > 0 {
- indices := rand.Perm(len(cinfo.OptIns[aidx]))
- limit := 4
+ assetOptIns := cinfo.OptIns[aidx]
+ if len(assetOptIns) > 0 {
+ indices := rand.Perm(len(assetOptIns))
+ limit := 5
if len(indices) < limit {
limit = len(indices)
}
for i := 0; i < limit; i++ {
idx := indices[i]
- accounts = append(accounts, cinfo.OptIns[aidx][idx])
+ accounts = append(accounts, assetOptIns[idx])
+ }
+ if cinfo.AssetParams[aidx].Creator == from {
+ // if the application was created by the "from" account, then we don't need to worry about it being opted-in.
+ } else {
+ fromIsOptedIn := false
+ for i := 0; i < len(assetOptIns); i++ {
+ if assetOptIns[i] == from {
+ fromIsOptedIn = true
+ break
+ }
+ }
+ if !fromIsOptedIn {
+ sender = accounts[0]
+ from = sender
+ }
}
+ accounts = accounts[1:]
}
txn, err = client.MakeUnsignedAppNoOpTx(aidx, nil, accounts, nil, nil)
if err != nil {
@@ -779,11 +1019,17 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli
} else if cfg.NumAsset > 0 { // Construct asset transaction
// select a pair of random opted-in accounts by aidx
// use them as from/to addresses
- if len(cinfo.OptIns[aidx]) > 0 {
- indices := rand.Perm(len(cinfo.OptIns[aidx]))
- from = cinfo.OptIns[aidx][indices[0]]
- to = cinfo.OptIns[aidx][indices[1]]
- sender = from
+ if from != to {
+ if len(cinfo.OptIns[aidx]) > 0 {
+ indices := rand.Perm(len(cinfo.OptIns[aidx]))
+ from = cinfo.OptIns[aidx][indices[0]]
+ to = cinfo.OptIns[aidx][indices[1]]
+ sender = from
+ } else {
+ err = fmt.Errorf("asset %d has not been opted in by any account", aidx)
+ _, _ = fmt.Fprintf(os.Stdout, "error constructing transaction - %v\n", err)
+ return
+ }
}
txn, err = client.MakeUnsignedAssetSendTx(aidx, amt, to, "", "")
if err != nil {
@@ -792,12 +1038,12 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli
}
txn.Note = noteField[:]
txn.Lease = lease
- txn, err = client.FillUnsignedTxTemplate(from, 0, 0, cfg.MaxFee, txn)
+ txn, err = client.FillUnsignedTxTemplate(sender, 0, 0, cfg.MaxFee, txn)
if !cfg.Quiet {
- _, _ = fmt.Fprintf(os.Stdout, "Sending %d asset %d: %s -> %s\n", amt, aidx, from, to)
+ _, _ = fmt.Fprintf(os.Stdout, "Sending %d asset %d: %s -> %s\n", amt, aidx, sender, to)
}
} else {
- txn, err = client.ConstructPayment(from, to, fee, amt, noteField[:], "", lease, 0, 0)
+ txn, err = pps.constructPayment(from, to, fee, amt, noteField, "", lease)
if !cfg.Quiet {
_, _ = fmt.Fprintf(os.Stdout, "Sending %d : %s -> %s\n", amt, from, to)
}
@@ -825,6 +1071,87 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli
return
}
+// ConstructPayment builds a payment transaction to be signed
+// If the fee is 0, the function will use the suggested one form the network
+// Although firstValid and lastValid come pre-computed in a normal flow,
+// additional validation is done by computeValidityRounds:
+// if the lastValid is 0, firstValid + maxTxnLifetime will be used
+// if the firstValid is 0, lastRound + 1 will be used
+func (pps *WorkerState) constructPayment(from, to string, fee, amount uint64, note []byte, closeTo string, lease [32]byte) (transactions.Transaction, error) {
+ fromAddr, err := basics.UnmarshalChecksumAddress(from)
+ if err != nil {
+ return transactions.Transaction{}, err
+ }
+
+ var toAddr basics.Address
+ if to != "" {
+ toAddr, err = basics.UnmarshalChecksumAddress(to)
+ if err != nil {
+ return transactions.Transaction{}, err
+ }
+ }
+
+ // Get current round, protocol, genesis ID
+ var params v1.TransactionParams
+ for params.LastRound == 0 {
+ params = pps.getSuggestedParams()
+ }
+
+ cp, ok := config.Consensus[protocol.ConsensusVersion(params.ConsensusVersion)]
+ if !ok {
+ return transactions.Transaction{}, fmt.Errorf("ConstructPayment: unknown consensus protocol %s", params.ConsensusVersion)
+ }
+ fv := params.LastRound + 1
+ lv := fv + cp.MaxTxnLife - 1
+
+ tx := transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: fromAddr,
+ Fee: basics.MicroAlgos{Raw: fee},
+ FirstValid: basics.Round(fv),
+ LastValid: basics.Round(lv),
+ Lease: lease,
+ Note: note,
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: toAddr,
+ Amount: basics.MicroAlgos{Raw: amount},
+ },
+ }
+
+ // If requesting closing, put it in the transaction. The protocol might
+ // not support it, but in that case, better to fail the transaction,
+ // because the user explicitly asked for it, and it's not supported.
+ if closeTo != "" {
+ closeToAddr, err := basics.UnmarshalChecksumAddress(closeTo)
+ if err != nil {
+ return transactions.Transaction{}, err
+ }
+
+ tx.PaymentTxnFields.CloseRemainderTo = closeToAddr
+ }
+
+ tx.Header.GenesisID = params.GenesisID
+
+ // Check if the protocol supports genesis hash
+ if cp.SupportGenesisHash {
+ copy(tx.Header.GenesisHash[:], params.GenesisHash)
+ }
+
+ // Default to the suggested fee, if the caller didn't supply it
+ // Fee is tricky, should taken care last. We encode the final transaction to get the size post signing and encoding
+ // Then, we multiply it by the suggested fee per byte.
+ if fee == 0 {
+ tx.Fee = basics.MulAIntSaturate(basics.MicroAlgos{Raw: params.Fee}, tx.EstimateEncodedSize())
+ }
+ if tx.Fee.Raw < cp.MinTxnFee {
+ tx.Fee.Raw = cp.MinTxnFee
+ }
+
+ return tx, nil
+}
+
func signTxn(signer string, txn transactions.Transaction, accounts map[string]*pingPongAccount, cfg PpConfig) (stxn transactions.SignedTxn, err error) {
var psig crypto.Signature