diff options
author | Tsachi Herman <tsachi.herman@algorand.com> | 2022-02-24 20:26:46 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-24 20:26:46 -0500 |
commit | 789922d78c3fee4126fac7938b8aeeab74d22f7d (patch) | |
tree | b9f2ab59e347c5ee2df421bd40d8edefbd7a741b | |
parent | 6ecb8aab6a8a7ae71963092cb3eb5eb4342e593a (diff) |
ledger refactoring: implement account data and resources prefetcher (#3666)
## Summary
This PR replaces the existing account preloading during validating with a more transaction-type oriented prefetcher.
The prefetcher examine the transaction type, and preload all the required resources ( account data, assets and applications ).
## Test Plan
Unit test added.
## Benchmarks
The following (existing) benchmarks were executed, and no performance difference was noted :
* BenchmarkBlockEvaluatorDiskFullAppOptIns
* BenchmarkBlockEvaluatorDiskAppOptIns
* BenchmarkBlockEvaluatorDiskNoCrypto
* BenchmarkBlockEvaluatorDiskCrypto
* BenchmarkBlockEvaluatorRAMNoCrypto
* BenchmarkBlockEvaluatorRAMCrypto
The benchmark `BenchmarkBlockEvaluatorDiskAppCalls`, which was designed for applications, shown a notable improvements, dropping the transaction processing time from 42902ns to 33465ns.
-rw-r--r-- | ledger/evalbench_test.go | 6 | ||||
-rw-r--r-- | ledger/internal/eval.go | 243 | ||||
-rw-r--r-- | ledger/internal/evalprefetcher.go | 533 | ||||
-rw-r--r-- | ledger/internal/evalprefetcher_test.go | 691 |
4 files changed, 1275 insertions, 198 deletions
diff --git a/ledger/evalbench_test.go b/ledger/evalbench_test.go index f03a115a5..e80aa9147 100644 --- a/ledger/evalbench_test.go +++ b/ledger/evalbench_test.go @@ -303,6 +303,12 @@ func BenchmarkBlockEvaluatorDiskAppOptIns(b *testing.B) { } func BenchmarkBlockEvaluatorDiskAppCalls(b *testing.B) { + // Go normally starts measuring the benchmark run time with b.N = 1, and then + // adjusts b.N if the runtime is less than a second. In this case, the runtime + // of the test for b.N = 1 is about 0.7 second, which cause the benchmark to be + // executed twice. Running it twice would not be an issue on it's own; however, + // the setup time for this test is 1.5 minutes long. By setting the b.N = 2, we + // set up for success on the first iteration, and preventing a second iteration. if b.N < 2 { b.N = 2 } diff --git a/ledger/internal/eval.go b/ledger/internal/eval.go index 595a12069..cabd03eaa 100644 --- a/ledger/internal/eval.go +++ b/ledger/internal/eval.go @@ -60,11 +60,6 @@ var ErrNotInCowCache = errors.New("can't find object in cow cache") // is considerably slower. const averageEncodedTxnSizeHint = 150 -// asyncAccountLoadingThreadCount controls how many go routines would be used -// to load the account data before the Eval() start processing individual -// transaction group. -const asyncAccountLoadingThreadCount = 4 - // Creatable represent a single creatable object. type creatable struct { cindex basics.CreatableIndex @@ -1543,12 +1538,12 @@ func Eval(ctx context.Context, l LedgerForEvaluator, blk bookkeeping.Block, vali } accountLoadingCtx, accountLoadingCancel := context.WithCancel(ctx) - paysetgroupsCh := loadAccounts(accountLoadingCtx, l, blk.Round()-1, paysetgroups, blk.BlockHeader.FeeSink, blk.ConsensusProtocol()) + preloadedTxnsData := prefetchAccounts(accountLoadingCtx, l, blk.Round()-1, paysetgroups, blk.BlockHeader.FeeSink, blk.ConsensusProtocol()) // ensure that before we exit from this method, the account loading is no longer active. defer func() { accountLoadingCancel() // wait for the paysetgroupsCh to get closed. - for range paysetgroupsCh { + for range preloadedTxnsData { } }() @@ -1566,26 +1561,62 @@ func Eval(ctx context.Context, l LedgerForEvaluator, blk bookkeeping.Block, vali txvalidator.txgroups = paysetgroups txvalidator.done = make(chan error, 1) go txvalidator.run() - } base := eval.state.lookupParent.(*roundCowBase) - transactionGroupLoop: for { select { - case txgroup, ok := <-paysetgroupsCh: + case txgroup, ok := <-preloadedTxnsData: if !ok { break transactionGroupLoop } else if txgroup.err != nil { return ledgercore.StateDelta{}, txgroup.err } - for _, br := range txgroup.balances { - base.accounts[br.Addr] = br.AccountData - // TODO: pull needed resources into cache? + for _, br := range txgroup.accounts { + if _, have := base.accounts[*br.address]; !have { + base.accounts[*br.address] = *br.data + } } - err = eval.TransactionGroup(txgroup.group) + for _, lr := range txgroup.resources { + if lr.address == nil { + // we attempted to look for the creator, and failed. + if lr.creatableType == basics.AssetCreatable { + base.creators[creatable{cindex: lr.creatableIndex, ctype: basics.AssetCreatable}] = foundAddress{exists: false} + } else { + base.creators[creatable{cindex: lr.creatableIndex, ctype: basics.AppCreatable}] = foundAddress{exists: false} + } + continue + } + if lr.creatableType == basics.AssetCreatable { + if lr.resource.AssetHolding != nil { + base.assets[ledgercore.AccountAsset{Address: *lr.address, Asset: basics.AssetIndex(lr.creatableIndex)}] = cachedAssetHolding{value: *lr.resource.AssetHolding, exists: true} + } else { + base.assets[ledgercore.AccountAsset{Address: *lr.address, Asset: basics.AssetIndex(lr.creatableIndex)}] = cachedAssetHolding{exists: false} + } + if lr.resource.AssetParams != nil { + base.assetParams[ledgercore.AccountAsset{Address: *lr.address, Asset: basics.AssetIndex(lr.creatableIndex)}] = cachedAssetParams{value: *lr.resource.AssetParams, exists: true} + base.creators[creatable{cindex: lr.creatableIndex, ctype: basics.AssetCreatable}] = foundAddress{address: *lr.address, exists: true} + } else { + base.assetParams[ledgercore.AccountAsset{Address: *lr.address, Asset: basics.AssetIndex(lr.creatableIndex)}] = cachedAssetParams{exists: false} + + } + } else { + if lr.resource.AppLocalState != nil { + base.appLocalStates[ledgercore.AccountApp{Address: *lr.address, App: basics.AppIndex(lr.creatableIndex)}] = cachedAppLocalState{value: *lr.resource.AppLocalState, exists: true} + } else { + base.appLocalStates[ledgercore.AccountApp{Address: *lr.address, App: basics.AppIndex(lr.creatableIndex)}] = cachedAppLocalState{exists: false} + } + if lr.resource.AppParams != nil { + base.appParams[ledgercore.AccountApp{Address: *lr.address, App: basics.AppIndex(lr.creatableIndex)}] = cachedAppParams{value: *lr.resource.AppParams, exists: true} + base.creators[creatable{cindex: lr.creatableIndex, ctype: basics.AppCreatable}] = foundAddress{address: *lr.address, exists: true} + } else { + base.appParams[ledgercore.AccountApp{Address: *lr.address, App: basics.AppIndex(lr.creatableIndex)}] = cachedAppParams{exists: false} + } + } + } + err = eval.TransactionGroup(txgroup.txnGroup) if err != nil { return ledgercore.StateDelta{}, err } @@ -1623,187 +1654,3 @@ transactionGroupLoop: return eval.state.deltas(), nil } - -// loadedTransactionGroup is a helper struct to allow asynchronous loading of the account data needed by the transaction groups -type loadedTransactionGroup struct { - // group is the transaction group - group []transactions.SignedTxnWithAD - // balances is a list of all the balances that the transaction group refer to and are needed. - balances []ledgercore.NewBalanceRecord - // err indicates whether any of the balances in this structure have failed to load. In case of an error, at least - // one of the entries in the balances would be uninitialized. - err error -} - -// Return the maximum number of addresses referenced in any given transaction. -func maxAddressesInTxn(proto *config.ConsensusParams) int { - return 7 + proto.MaxAppTxnAccounts -} - -// loadAccounts loads the account data for the provided transaction group list. It also loads the feeSink account and add it to the first returned transaction group. -// The order of the transaction groups returned by the channel is identical to the one in the input array. -func loadAccounts(ctx context.Context, l LedgerForEvaluator, rnd basics.Round, groups [][]transactions.SignedTxnWithAD, feeSinkAddr basics.Address, consensusParams config.ConsensusParams) chan loadedTransactionGroup { - outChan := make(chan loadedTransactionGroup, len(groups)) - go func() { - // groupTask helps to organize the account loading for each transaction group. - type groupTask struct { - // balances contains the loaded balances each transaction group have - balances []ledgercore.NewBalanceRecord - // balancesCount is the number of balances that nees to be loaded per transaction group - balancesCount int - // done is a waiting channel for all the account data for the transaction group to be loaded - done chan error - } - // addrTask manage the loading of a single account address. - type addrTask struct { - // account address to fetch - address basics.Address - // a list of transaction group tasks that depends on this address - groups []*groupTask - // a list of indices into the groupTask.balances where the address would be stored - groupIndices []int - } - defer close(outChan) - - accountTasks := make(map[basics.Address]*addrTask) - addressesCh := make(chan *addrTask, len(groups)*consensusParams.MaxTxGroupSize*maxAddressesInTxn(&consensusParams)) - // totalBalances counts the total number of balances over all the transaction groups - totalBalances := 0 - - initAccount := func(addr basics.Address, wg *groupTask) { - if addr.IsZero() { - return - } - if task, have := accountTasks[addr]; !have { - task := &addrTask{ - address: addr, - groups: make([]*groupTask, 1, 4), - groupIndices: make([]int, 1, 4), - } - task.groups[0] = wg - task.groupIndices[0] = wg.balancesCount - - accountTasks[addr] = task - addressesCh <- task - } else { - task.groups = append(task.groups, wg) - task.groupIndices = append(task.groupIndices, wg.balancesCount) - } - wg.balancesCount++ - totalBalances++ - } - // add the fee sink address to the accountTasks/addressesCh so that it will be loaded first. - if len(groups) > 0 { - task := &addrTask{ - address: feeSinkAddr, - } - addressesCh <- task - accountTasks[feeSinkAddr] = task - } - - // iterate over the transaction groups and add all their account addresses to the list - groupsReady := make([]*groupTask, len(groups)) - for i, group := range groups { - task := &groupTask{} - groupsReady[i] = task - for _, stxn := range group { - // If you add new addresses here, also add them in getTxnAddresses(). - initAccount(stxn.Txn.Sender, task) - initAccount(stxn.Txn.Receiver, task) - initAccount(stxn.Txn.CloseRemainderTo, task) - initAccount(stxn.Txn.AssetSender, task) - initAccount(stxn.Txn.AssetReceiver, task) - initAccount(stxn.Txn.AssetCloseTo, task) - initAccount(stxn.Txn.FreezeAccount, task) - for _, xa := range stxn.Txn.Accounts { - initAccount(xa, task) - } - } - } - - // Add fee sink to the first group - if len(groupsReady) > 0 { - initAccount(feeSinkAddr, groupsReady[0]) - } - close(addressesCh) - - // updata all the groups task : - // allocate the correct number of balances, as well as - // enough space on the "done" channel. - allBalances := make([]ledgercore.NewBalanceRecord, totalBalances) - usedBalances := 0 - for _, gr := range groupsReady { - gr.balances = allBalances[usedBalances : usedBalances+gr.balancesCount] - gr.done = make(chan error, gr.balancesCount) - usedBalances += gr.balancesCount - } - - // create few go-routines to load asyncroniously the account data. - for i := 0; i < asyncAccountLoadingThreadCount; i++ { - go func() { - for { - select { - case task, ok := <-addressesCh: - // load the address - if !ok { - // the channel got closed, which mean we're done. - return - } - // lookup the account data directly from the ledger. - acctData, _, err := l.LookupWithoutRewards(rnd, task.address) - br := ledgercore.NewBalanceRecord{ - Addr: task.address, - AccountData: acctData, - } - // if there is no error.. - if err == nil { - // update all the group tasks with the new acquired balance. - for i, wg := range task.groups { - wg.balances[task.groupIndices[i]] = br - // write a nil to indicate that we're loaded one entry. - wg.done <- nil - } - } else { - // there was an error loading that entry. - for _, wg := range task.groups { - // notify the channel of the error. - wg.done <- err - } - } - case <-ctx.Done(): - // if the context was canceled, abort right away. - return - } - - } - }() - } - - // iterate on the transaction groups tasks. This array retains the original order. - for i, wg := range groupsReady { - // Wait to receive wg.balancesCount nil error messages, one for each address referenced in this txn group. - for j := 0; j < wg.balancesCount; j++ { - select { - case err := <-wg.done: - if err != nil { - // if there is an error, report the error to the output channel. - outChan <- loadedTransactionGroup{ - group: groups[i], - err: err, - } - return - } - case <-ctx.Done(): - return - } - } - // if we had no error, write the result to the output channel. - // this write will not block since we preallocated enough space on the channel. - outChan <- loadedTransactionGroup{ - group: groups[i], - balances: wg.balances, - } - } - }() - return outChan -} diff --git a/ledger/internal/evalprefetcher.go b/ledger/internal/evalprefetcher.go new file mode 100644 index 000000000..2d193cb58 --- /dev/null +++ b/ledger/internal/evalprefetcher.go @@ -0,0 +1,533 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package internal + +import ( + "context" + "sync/atomic" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/protocol" +) + +// asyncAccountLoadingThreadCount controls how many go routines would be used +// to load the account data before the Eval() start processing individual +// transaction group. +const asyncAccountLoadingThreadCount = 4 + +type loadedAccountDataEntry struct { + address *basics.Address + data *ledgercore.AccountData +} + +type loadedResourcesEntry struct { + // resource is the loaded resource entry. unless address is nil, resource would always contain a valid ledgercore.AccountResource pointer. + resource *ledgercore.AccountResource + // address might be empty if the resource does not exist. In that case creatableIndex and creatableType would still be valid while resource would be nil. + address *basics.Address + creatableIndex basics.CreatableIndex + creatableType basics.CreatableType +} + +// loadedTransactionGroup is a helper struct to allow asynchronous loading of the account data needed by the transaction groups +type loadedTransactionGroup struct { + // the transaction group + txnGroup []transactions.SignedTxnWithAD + + // accounts is a list of all the accounts balance records that the transaction group refer to and are needed. + accounts []loadedAccountDataEntry + + // the following four are the resources used by the account + resources []loadedResourcesEntry + + // err indicates whether any of the balances in this structure have failed to load. In case of an error, at least + // one of the entries in the balances would be uninitialized. + err error +} + +// accountPrefetcher used to prefetch accounts balances and resources before the evaluator is being called. +type accountPrefetcher struct { + ledger LedgerForEvaluator + rnd basics.Round + groups [][]transactions.SignedTxnWithAD + feeSinkAddr basics.Address + consensusParams config.ConsensusParams + outChan chan loadedTransactionGroup +} + +// prefetchAccounts loads the account data for the provided transaction group list. It also loads the feeSink account and add it to the first returned transaction group. +// The order of the transaction groups returned by the channel is identical to the one in the input array. +func prefetchAccounts(ctx context.Context, l LedgerForEvaluator, rnd basics.Round, groups [][]transactions.SignedTxnWithAD, feeSinkAddr basics.Address, consensusParams config.ConsensusParams) <-chan loadedTransactionGroup { + prefetcher := &accountPrefetcher{ + ledger: l, + rnd: rnd, + groups: groups, + feeSinkAddr: feeSinkAddr, + consensusParams: consensusParams, + outChan: make(chan loadedTransactionGroup, len(groups)), + } + + go prefetcher.prefetch(ctx) + return prefetcher.outChan +} + +// groupTask helps to organize the account loading for each transaction group. +type groupTask struct { + groupTaskIndex int + // balances contains the loaded balances each transaction group have + balances []loadedAccountDataEntry + // balancesCount is the number of balances that nees to be loaded per transaction group + balancesCount int + // resources contains the loaded resources each of the transaction groups have + resources []loadedResourcesEntry + // resourcesCount is the number of resources that nees to be loaded per transaction group + resourcesCount int + // incompleteCount is the number of resources+balances still pending and need to be loaded + // this variable is used by as atomic variable to synchronize the readiness of the group taks. + incompleteCount int64 +} + +// preloaderTask manage the loading of a single element, whether it's a resource or an account address. +type preloaderTask struct { + // account address to fetch + address *basics.Address + // resource id + creatableIndex basics.CreatableIndex + // resource type + creatableType basics.CreatableType + // a list of transaction group tasks that depends on this address or resource + groups []*groupTask + // a list of indices into the groupTask.balances or groupTask.resources where the address would be stored + groupIndices []int +} + +// preloaderTaskQueue is a dynamic linked list of enqueued entries, optimized for non-syncronized insertion and +// syncronized extraction +type preloaderTaskQueue struct { + next *preloaderTaskQueue + used int + entries []*preloaderTask + baseIdx int + maxTxnGroupEntries int +} + +type groupTaskDone struct { + groupIdx int + err error +} + +func allocPreloaderQueue(count int, maxTxnGroupEntries int) preloaderTaskQueue { + return preloaderTaskQueue{ + entries: make([]*preloaderTask, count*2+maxTxnGroupEntries*2), + maxTxnGroupEntries: maxTxnGroupEntries, + } +} + +// enqueue places the queued entry on the queue, returning the latest queue +// ( in case the current "page" ran out of space ) +func (pq *preloaderTaskQueue) enqueue(t *preloaderTask) { + pq.entries[pq.used] = t + pq.used++ +} + +func (pq *preloaderTaskQueue) expand() *preloaderTaskQueue { + if cap(pq.entries)-pq.used < pq.maxTxnGroupEntries { + pq.next = &preloaderTaskQueue{ + entries: make([]*preloaderTask, cap(pq.entries)*2), + used: 0, + baseIdx: pq.baseIdx + pq.used, + maxTxnGroupEntries: pq.maxTxnGroupEntries, + } + return pq.next + } + return pq +} + +func (pq *preloaderTaskQueue) getTaskAtIndex(idx int) (*preloaderTaskQueue, *preloaderTask) { + localIdx := idx - pq.baseIdx + if pq.used > localIdx { + return pq, pq.entries[localIdx] + } + if pq.next != nil { + return pq.next.getTaskAtIndex(idx) + } + return pq, nil +} + +type accountCreatableKey struct { + address basics.Address + cidx basics.CreatableIndex +} + +func loadAccountsAddAccountTask(addr *basics.Address, wt *groupTask, accountTasks map[basics.Address]*preloaderTask, queue *preloaderTaskQueue) { + if addr.IsZero() { + return + } + if task, have := accountTasks[*addr]; !have { + task := &preloaderTask{ + address: addr, + groups: make([]*groupTask, 1, 4), + groupIndices: make([]int, 1, 4), + } + task.groups[0] = wt + task.groupIndices[0] = wt.balancesCount + + accountTasks[*addr] = task + queue.enqueue(task) + } else { + task.groups = append(task.groups, wt) + task.groupIndices = append(task.groupIndices, wt.balancesCount) + } + wt.balancesCount++ +} + +func loadAccountsAddResourceTask(addr *basics.Address, cidx basics.CreatableIndex, ctype basics.CreatableType, wt *groupTask, resourceTasks map[accountCreatableKey]*preloaderTask, queue *preloaderTaskQueue) { + if cidx == 0 { + return + } + key := accountCreatableKey{ + cidx: cidx, + } + if addr != nil { + key.address = *addr + } + if task, have := resourceTasks[key]; !have { + task := &preloaderTask{ + address: addr, + groups: make([]*groupTask, 1, 4), + groupIndices: make([]int, 1, 4), + creatableIndex: cidx, + creatableType: ctype, + } + task.groups[0] = wt + task.groupIndices[0] = wt.resourcesCount + + resourceTasks[key] = task + queue.enqueue(task) + } else { + task.groups = append(task.groups, wt) + task.groupIndices = append(task.groupIndices, wt.resourcesCount) + } + wt.resourcesCount++ +} + +// prefetch would process the input transaction groups by analyzing each of the transaction groups and building +// an execution queue that would allow us to fetch all the dependencies for the input transaction groups in order +// and output these onto a channel. +func (p *accountPrefetcher) prefetch(ctx context.Context) { + defer close(p.outChan) + accountTasks := make(map[basics.Address]*preloaderTask) + resourceTasks := make(map[accountCreatableKey]*preloaderTask) + + var maxTxnGroupEntries int + if p.consensusParams.Application { + // the extra two are for the sender account data, plus the application global state + maxTxnGroupEntries = p.consensusParams.MaxTxGroupSize * (2 + p.consensusParams.MaxAppTxnAccounts + p.consensusParams.MaxAppTxnForeignApps + p.consensusParams.MaxAppTxnForeignAssets) + } else { + // 8 is the number of resources+account used in the AssetTransferTx, which is the largest one. + maxTxnGroupEntries = p.consensusParams.MaxTxGroupSize * 8 + } + + tasksQueue := allocPreloaderQueue(len(p.groups), maxTxnGroupEntries) + + // totalBalances counts the total number of balances over all the transaction groups + totalBalances := 0 + totalResources := 0 + + groupsReady := make([]groupTask, len(p.groups)) + + // Add fee sink to the first group + if len(p.groups) > 0 { + // the feeSinkAddr is known to be non-empty + feeSinkPreloader := &preloaderTask{ + address: &p.feeSinkAddr, + groups: []*groupTask{&groupsReady[0]}, + groupIndices: []int{0}, + } + groupsReady[0].balancesCount = 1 + accountTasks[p.feeSinkAddr] = feeSinkPreloader + tasksQueue.enqueue(feeSinkPreloader) + } + + // iterate over the transaction groups and add all their account addresses to the list + queue := &tasksQueue + for i := range p.groups { + task := &groupsReady[i] + for j := range p.groups[i] { + stxn := &p.groups[i][j] + switch stxn.Txn.Type { + case protocol.PaymentTx: + loadAccountsAddAccountTask(&stxn.Txn.Receiver, task, accountTasks, queue) + loadAccountsAddAccountTask(&stxn.Txn.CloseRemainderTo, task, accountTasks, queue) + case protocol.AssetConfigTx: + loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.ConfigAsset), basics.AssetCreatable, task, resourceTasks, queue) + case protocol.AssetTransferTx: + loadAccountsAddResourceTask(&stxn.Txn.Sender, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + if !stxn.Txn.AssetSender.IsZero() { + loadAccountsAddResourceTask(&stxn.Txn.AssetSender, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + loadAccountsAddAccountTask(&stxn.Txn.AssetSender, task, accountTasks, queue) + } + if !stxn.Txn.AssetReceiver.IsZero() { + loadAccountsAddResourceTask(&stxn.Txn.AssetReceiver, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + loadAccountsAddAccountTask(&stxn.Txn.AssetReceiver, task, accountTasks, queue) + } + if !stxn.Txn.AssetCloseTo.IsZero() { + loadAccountsAddResourceTask(&stxn.Txn.AssetCloseTo, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + loadAccountsAddAccountTask(&stxn.Txn.AssetCloseTo, task, accountTasks, queue) + } + case protocol.AssetFreezeTx: + if !stxn.Txn.FreezeAccount.IsZero() { + loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.FreezeAsset), basics.AssetCreatable, task, resourceTasks, queue) + loadAccountsAddResourceTask(&stxn.Txn.FreezeAccount, basics.CreatableIndex(stxn.Txn.FreezeAsset), basics.AssetCreatable, task, resourceTasks, queue) + loadAccountsAddAccountTask(&stxn.Txn.FreezeAccount, task, accountTasks, queue) + } + case protocol.ApplicationCallTx: + if stxn.Txn.ApplicationID != 0 { + // load the global - so that we'll have the program + loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.ApplicationID), basics.AppCreatable, task, resourceTasks, queue) + // load the local - so that we'll have the local state + // this is something we need to decide if we want to enable, since not every application call would use local storage. + //loadAccountsAddResourceTask(&stxn.Txn.Sender, basics.CreatableIndex(stxn.Txn.ApplicationID), basics.AppCreatable, task, resourceTasks, queue) + } + for _, fa := range stxn.Txn.ForeignApps { + loadAccountsAddResourceTask(nil, basics.CreatableIndex(fa), basics.AppCreatable, task, resourceTasks, queue) + } + for _, fa := range stxn.Txn.ForeignAssets { + loadAccountsAddResourceTask(nil, basics.CreatableIndex(fa), basics.AssetCreatable, task, resourceTasks, queue) + } + for ixa := range stxn.Txn.Accounts { + loadAccountsAddAccountTask(&stxn.Txn.Accounts[ixa], task, accountTasks, queue) + } + case protocol.CompactCertTx: + fallthrough + case protocol.KeyRegistrationTx: + fallthrough + default: + } + // If you add new addresses here, also add them in getTxnAddresses(). + if !stxn.Txn.Sender.IsZero() { + loadAccountsAddAccountTask(&stxn.Txn.Sender, task, accountTasks, queue) + } + } + totalBalances += task.balancesCount + totalResources += task.resourcesCount + // expand the queue if needed. + queue = queue.expand() + } + + // find the number of tasks + tasksCount := int64(0) + for lastQueueEntry := &tasksQueue; ; lastQueueEntry = lastQueueEntry.next { + if lastQueueEntry.next == nil { + tasksCount = int64(lastQueueEntry.baseIdx + lastQueueEntry.used) + break + } + } + + // update all the groups task : + // allocate the correct number of balances, as well as + // enough space on the "done" channel. + allBalances := make([]loadedAccountDataEntry, totalBalances) + allResources := make([]loadedResourcesEntry, totalResources) + usedBalances := 0 + usedResources := 0 + + // groupDoneCh is used to communicate the completion signal for a single + // resource/address load between the go-routines and the main output channel + // writer loop. The various go-routines would write to the channel the index + // of the task that is complete and ready to be sent. + groupDoneCh := make(chan groupTaskDone, len(groupsReady)) + const dependencyFreeGroup = -int64(^uint64(0)/2) - 1 + for grpIdx := range groupsReady { + gr := &groupsReady[grpIdx] + gr.groupTaskIndex = grpIdx + gr.incompleteCount = int64(gr.balancesCount + gr.resourcesCount) + gr.balances = allBalances[usedBalances : usedBalances+gr.balancesCount] + if gr.resourcesCount > 0 { + gr.resources = allResources[usedResources : usedResources+gr.resourcesCount] + usedResources += gr.resourcesCount + } + usedBalances += gr.balancesCount + if gr.incompleteCount == 0 { + gr.incompleteCount = dependencyFreeGroup + } + } + + taskIdx := int64(-1) + defer atomic.StoreInt64(&taskIdx, tasksCount) + // create few go-routines to load asyncroniously the account data. + for i := 0; i < asyncAccountLoadingThreadCount; i++ { + go p.asyncPrefetchRoutine(&tasksQueue, &taskIdx, groupDoneCh) + } + + // iterate on the transaction groups tasks. This array retains the original order. + completed := make(map[int]bool) + for i := 0; i < len(p.groups); { + wait: + incompleteCount := atomic.LoadInt64(&groupsReady[i].incompleteCount) + if incompleteCount > 0 || (incompleteCount != dependencyFreeGroup && !completed[i]) { + select { + case done := <-groupDoneCh: + if done.err != nil { + // if there is an error, report the error to the output channel. + p.outChan <- loadedTransactionGroup{ + err: done.err, + } + return + } + if done.groupIdx > i { + // mark future txn as ready. + completed[done.groupIdx] = true + goto wait + } else if done.groupIdx < i { + // it was already processed. + goto wait + } + case <-ctx.Done(): + return + } + } + next := i + for ; next < len(p.groups); next++ { + if !completed[next] { + if next > i { + i = next + goto wait + } + // next == i + } + + delete(completed, next) + + // if we had no error, write the result to the output channel. + // this write will not block since we preallocated enough space on the channel. + p.outChan <- loadedTransactionGroup{ + txnGroup: p.groups[next], + accounts: groupsReady[next].balances, + resources: groupsReady[next].resources, + } + } + // if we get to this point, it means that we have no more transaction to process. + break + } +} + +func (gt *groupTask) markCompletionAcct(idx int, br loadedAccountDataEntry, groupDoneCh chan groupTaskDone) { + gt.balances[idx] = br + if atomic.AddInt64(>.incompleteCount, -1) == 0 { + groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex} + } +} + +func (gt *groupTask) markCompletionResource(idx int, res loadedResourcesEntry, groupDoneCh chan groupTaskDone) { + gt.resources[idx] = res + if atomic.AddInt64(>.incompleteCount, -1) == 0 { + groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex} + } +} + +func (gt *groupTask) markCompletionAcctError(err error, groupDoneCh chan groupTaskDone) { + for { + curVal := atomic.LoadInt64(>.incompleteCount) + if curVal <= 0 { + return + } + if atomic.CompareAndSwapInt64(>.incompleteCount, curVal, 0) { + groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex, err: err} + return + } + } +} + +func (p *accountPrefetcher) asyncPrefetchRoutine(queue *preloaderTaskQueue, taskIdx *int64, groupDoneCh chan groupTaskDone) { + var task *preloaderTask + var err error + for { + nextTaskIdx := atomic.AddInt64(taskIdx, 1) + queue, task = queue.getTaskAtIndex(int(nextTaskIdx)) + if task == nil { + // no more tasks. + return + } + if task.creatableIndex == 0 { + // lookup the account data directly from the ledger. + var acctData ledgercore.AccountData + acctData, _, err = p.ledger.LookupWithoutRewards(p.rnd, *task.address) + // if there was an error.. + if err != nil { + // there was an error loading that entry. + break + } + br := loadedAccountDataEntry{ + address: task.address, + data: &acctData, + } + // update all the group tasks with the new acquired balance. + for i, wt := range task.groups { + wt.markCompletionAcct(task.groupIndices[i], br, groupDoneCh) + } + continue + } + if task.address == nil { + // start off by figuring out the creator in case it's a global resource. + var creator basics.Address + var ok bool + creator, ok, err = p.ledger.GetCreatorForRound(p.rnd, task.creatableIndex, task.creatableType) + if err != nil { + // there was an error loading that entry. + break + } + if !ok { + re := loadedResourcesEntry{ + creatableIndex: task.creatableIndex, + creatableType: task.creatableType, + } + // update all the group tasks with the new acquired balance. + for i, wt := range task.groups { + wt.markCompletionResource(task.groupIndices[i], re, groupDoneCh) + } + continue + } + task.address = &creator + } + var resource ledgercore.AccountResource + resource, err = p.ledger.LookupResource(p.rnd, *task.address, task.creatableIndex, task.creatableType) + if err != nil { + // there was an error loading that entry. + break + } + re := loadedResourcesEntry{ + resource: &resource, + address: task.address, + creatableIndex: task.creatableIndex, + creatableType: task.creatableType, + } + // update all the group tasks with the new acquired balance. + for i, wt := range task.groups { + wt.markCompletionResource(task.groupIndices[i], re, groupDoneCh) + } + } + // if we got here, it means that there was an error. + // in every case we get here, the task is gurenteed to be a non-nil. + for _, wt := range task.groups { + // notify the channel of the error. + wt.markCompletionAcctError(err, groupDoneCh) + } +} diff --git a/ledger/internal/evalprefetcher_test.go b/ledger/internal/evalprefetcher_test.go new file mode 100644 index 000000000..96e1a9386 --- /dev/null +++ b/ledger/internal/evalprefetcher_test.go @@ -0,0 +1,691 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package internal + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" +) + +type prefetcherTestLedger struct { + round basics.Round + balances map[basics.Address]ledgercore.AccountData + creators map[basics.CreatableIndex]basics.Address +} + +func (l *prefetcherTestLedger) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) { + return bookkeeping.BlockHeader{}, nil +} +func (l *prefetcherTestLedger) CheckDup(config.ConsensusParams, basics.Round, basics.Round, basics.Round, transactions.Txid, ledgercore.Txlease) error { + return nil +} +func (l *prefetcherTestLedger) LookupWithoutRewards(_ basics.Round, addr basics.Address) (ledgercore.AccountData, basics.Round, error) { + if data, has := l.balances[addr]; has { + return data, l.round, nil + } + return ledgercore.AccountData{}, l.round, nil +} +func (l *prefetcherTestLedger) LookupResource(basics.Round, basics.Address, basics.CreatableIndex, basics.CreatableType) (ledgercore.AccountResource, error) { + return ledgercore.AccountResource{}, nil +} +func (l *prefetcherTestLedger) GetCreatorForRound(_ basics.Round, cidx basics.CreatableIndex, _ basics.CreatableType) (basics.Address, bool, error) { + if addr, has := l.creators[cidx]; has { + return addr, true, nil + } + return basics.Address{}, false, nil +} +func (l *prefetcherTestLedger) GenesisHash() crypto.Digest { + return crypto.Digest{} +} +func (l *prefetcherTestLedger) GenesisProto() config.ConsensusParams { + return config.Consensus[protocol.ConsensusCurrentVersion] +} +func (l *prefetcherTestLedger) LatestTotals() (basics.Round, ledgercore.AccountTotals, error) { + return l.round, ledgercore.AccountTotals{}, nil +} +func (l *prefetcherTestLedger) CompactCertVoters(basics.Round) (*ledgercore.VotersForRound, error) { + return nil, nil +} + +func TestEvaluatorPrefetcher(t *testing.T) { + partitiontest.PartitionTest(t) + + acctAddrPtr := func(i int) (o *basics.Address) { + o = new(basics.Address) + o[0] = byte(i) + o[1] = byte(i >> 8) + o[2] = byte(i >> 16) + return + } + acctAddr := func(i int) (o basics.Address) { + t := *acctAddrPtr(i) + copy(o[:], t[:]) + return + } + + rnd := basics.Round(5) + var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + + var ledger = &prefetcherTestLedger{ + round: rnd, + balances: make(map[basics.Address]ledgercore.AccountData), + creators: make(map[basics.CreatableIndex]basics.Address), + } + ledger.balances[acctAddr(1)] = ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + } + ledger.creators[1001] = acctAddr(2) + ledger.creators[2001] = acctAddr(15) + + type testTransactionCases struct { + signedTxn transactions.SignedTxn + accounts []loadedAccountDataEntry + resources []loadedResourcesEntry + } + + testTransactions := []testTransactionCases{ + // payment transaction + { + signedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: acctAddr(1), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: acctAddr(2), + CloseRemainderTo: acctAddr(3), + }, + }, + }, + accounts: []loadedAccountDataEntry{ + { + address: &feeSinkAddr, + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(1), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + }, + }, + { + address: acctAddrPtr(2), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(3), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + }, + }, + // asset config transaction for a non-existing asset + { + signedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.AssetConfigTx, + Header: transactions.Header{ + Sender: acctAddr(1), + }, + AssetConfigTxnFields: transactions.AssetConfigTxnFields{ + ConfigAsset: 1000, + }, + }, + }, + accounts: []loadedAccountDataEntry{ + { + address: &feeSinkAddr, + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(1), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + }, + }, + }, + resources: []loadedResourcesEntry{ + { + address: nil, + creatableIndex: 1000, + creatableType: basics.AssetCreatable, + resource: nil, + }, + }, + }, + // asset config transaction for an existing asset + { + signedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.AssetConfigTx, + Header: transactions.Header{ + Sender: acctAddr(1), + }, + AssetConfigTxnFields: transactions.AssetConfigTxnFields{ + ConfigAsset: 1001, + }, + }, + }, + accounts: []loadedAccountDataEntry{ + { + address: &feeSinkAddr, + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(1), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + }, + }, + }, + resources: []loadedResourcesEntry{ + { + address: acctAddrPtr(2), + creatableIndex: 1001, + creatableType: basics.AssetCreatable, + resource: &ledgercore.AccountResource{}, + }, + }, + }, + // asset transfer transaction + { + signedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.AssetTransferTx, + Header: transactions.Header{ + Sender: acctAddr(1), + }, + AssetTransferTxnFields: transactions.AssetTransferTxnFields{ + XferAsset: 1001, + AssetSender: acctAddr(2), + AssetReceiver: acctAddr(3), + AssetCloseTo: acctAddr(4), + }, + }, + }, + accounts: []loadedAccountDataEntry{ + { + address: &feeSinkAddr, + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(1), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + }, + }, + { + address: acctAddrPtr(2), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(3), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(4), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + }, + resources: []loadedResourcesEntry{ + { + address: acctAddrPtr(1), + creatableIndex: 1001, + creatableType: basics.AssetCreatable, + resource: &ledgercore.AccountResource{}, + }, + { + address: acctAddrPtr(2), + creatableIndex: 1001, + creatableType: basics.AssetCreatable, + resource: &ledgercore.AccountResource{}, + }, + { + address: acctAddrPtr(3), + creatableIndex: 1001, + creatableType: basics.AssetCreatable, + resource: &ledgercore.AccountResource{}, + }, + { + address: acctAddrPtr(4), + creatableIndex: 1001, + creatableType: basics.AssetCreatable, + resource: &ledgercore.AccountResource{}, + }, + }, + }, + // asset freeze transaction + { + signedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.AssetFreezeTx, + Header: transactions.Header{ + Sender: acctAddr(1), + }, + AssetFreezeTxnFields: transactions.AssetFreezeTxnFields{ + FreezeAccount: acctAddr(3), + FreezeAsset: 1001, + }, + }, + }, + accounts: []loadedAccountDataEntry{ + { + address: &feeSinkAddr, + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(1), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + }, + }, + { + address: acctAddrPtr(3), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + }, + resources: []loadedResourcesEntry{ + { + address: acctAddrPtr(2), + creatableIndex: 1001, + creatableType: basics.AssetCreatable, + resource: &ledgercore.AccountResource{}, + }, + { + address: acctAddrPtr(3), + creatableIndex: 1001, + creatableType: basics.AssetCreatable, + resource: &ledgercore.AccountResource{}, + }, + }, + }, + // application transaction + { + signedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: acctAddr(1), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: 10, + Accounts: []basics.Address{ + acctAddr(4), + acctAddr(5), + }, + ForeignApps: []basics.AppIndex{ + 2001, + 2002, + }, + ForeignAssets: []basics.AssetIndex{ + 1001, + }, + }, + }, + }, + accounts: []loadedAccountDataEntry{ + { + address: &feeSinkAddr, + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(1), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + }, + }, + { + address: acctAddrPtr(4), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + address: acctAddrPtr(5), + data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + }, + resources: []loadedResourcesEntry{ + { + address: acctAddrPtr(2), + creatableIndex: 1001, + creatableType: basics.AssetCreatable, + resource: &ledgercore.AccountResource{}, + }, + { + address: acctAddrPtr(15), + creatableIndex: 2001, + creatableType: basics.AppCreatable, + resource: &ledgercore.AccountResource{}, + }, + { + address: nil, + creatableIndex: 2002, + creatableType: basics.AppCreatable, + resource: nil, + }, + /* - if we'll decide that we want to perfetch the account local state, then this should be enabled. + { + address: acctAddrPtr(1), + creatableIndex: 10, + creatableType: basics.AppCreatable, + resource: &ledgercore.AccountResource{}, + },*/ + { + address: nil, + creatableIndex: 10, + creatableType: basics.AppCreatable, + resource: nil, + }, + }, + }, + } + + for _, txn := range testTransactions { + groups := make([][]transactions.SignedTxnWithAD, 1) + groups[0] = make([]transactions.SignedTxnWithAD, 1) + groups[0][0].SignedTxn = txn.signedTxn + + preloadedTxnGroupsCh := prefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[protocol.ConsensusCurrentVersion]) + + for loadedTxnGroup := range preloadedTxnGroupsCh { + require.NoError(t, loadedTxnGroup.err) + + // compare the txn.accounts and loadedTxnGroup.accounts in order agnostic way. + require.Equal(t, len(txn.accounts), len(loadedTxnGroup.accounts)) + for _, acct := range txn.accounts { + // make sure we find it in loadedTxnGroup.accounts + found := false + require.NotNil(t, acct.address) + for k, loadedAcct := range loadedTxnGroup.accounts { + require.NotNilf(t, loadedAcct.address, "index: %d\nexpected %#v\nactual %#v", k, acct, loadedAcct) + if *acct.address != *loadedAcct.address { + continue + } + require.Equal(t, *acct.data, *loadedAcct.data) + found = true + break + } + require.Truef(t, found, "missing account %#v", acct) + } + + // compare the txn.resources and loadedTxnGroup.resources in order agnostic way + require.Equalf(t, len(txn.resources), len(loadedTxnGroup.resources), "mismatching resources count; actual : %v", loadedTxnGroup.resources) + for _, res := range txn.resources { + // make sure we find it in loadedTxnGroup.resources + found := false + for _, loadedRes := range loadedTxnGroup.resources { + if res.creatableIndex != loadedRes.creatableIndex { + continue + } + require.Equal(t, res.creatableType, loadedRes.creatableType) + if res.address == nil { + require.Nil(t, loadedRes.address) + } else { + if loadedRes.address == nil || *res.address != *loadedRes.address { + continue + } + } + if res.resource == nil { + require.Nil(t, loadedRes.resource) + } else { + require.NotNil(t, loadedRes.resource) + require.Equal(t, *res.resource, *loadedRes.resource) + } + found = true + break + } + require.Truef(t, found, "failed to find resource %#v", res) + } + } + } +} + +func TestEvaluatorPrefetcherQueueExpansion(t *testing.T) { + partitiontest.PartitionTest(t) + + acctAddrPtr := func(i int) (o *basics.Address) { + o = new(basics.Address) + o[0] = byte(i) + o[1] = byte(i >> 8) + o[2] = byte(i >> 16) + return + } + acctAddr := func(i int) (o basics.Address) { + t := *acctAddrPtr(i) + copy(o[:], t[:]) + return + } + + rnd := basics.Round(5) + var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + + var ledger = &prefetcherTestLedger{ + round: rnd, + balances: make(map[basics.Address]ledgercore.AccountData), + creators: make(map[basics.CreatableIndex]basics.Address), + } + ledger.balances[acctAddr(1)] = ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + } + type testTransactionCases struct { + signedTxn transactions.SignedTxn + accounts []loadedAccountDataEntry + resources []loadedResourcesEntry + } + + txnGroups := make([][]transactions.SignedTxnWithAD, 20000) + addr := 1 + for i := range txnGroups { + txnGroups[i] = make([]transactions.SignedTxnWithAD, 16) + for k := range txnGroups[i] { + txnGroups[i][k].SignedTxn = transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: acctAddr(1), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: acctAddr(addr), + CloseRemainderTo: acctAddr(addr + 1), + }, + }, + } + addr += 2 + } + } + preloadedTxnGroupsCh := prefetchAccounts(context.Background(), ledger, rnd, txnGroups, feeSinkAddr, config.Consensus[protocol.ConsensusCurrentVersion]) + groupsCount := 0 + addressCount := 0 + uniqueAccounts := make(map[basics.Address]bool) + for k := range preloadedTxnGroupsCh { + addressCount += len(k.accounts) + for _, acct := range k.accounts { + uniqueAccounts[*acct.address] = true + } + require.Equal(t, txnGroups[groupsCount], k.txnGroup) + groupsCount++ + } + require.Equal(t, len(txnGroups), groupsCount) + // the +1 below is for the fee sink address. + require.Equal(t, len(txnGroups)*16*3+1, addressCount) + require.Equal(t, len(txnGroups)*16*2+1, len(uniqueAccounts)) +} + +func BenchmarkPrefetcherApps(b *testing.B) { + acctAddrPtr := func(i int) (o *basics.Address) { + o = new(basics.Address) + o[0] = byte(i) + o[1] = byte(i >> 8) + o[2] = byte(i >> 16) + return + } + acctAddr := func(i int) (o basics.Address) { + t := *acctAddrPtr(i) + copy(o[:], t[:]) + return + } + + txnGroupLen := 16 + groups := make([][]transactions.SignedTxnWithAD, 1+b.N/txnGroupLen) + for grpIdx := range groups { + groups[grpIdx] = make([]transactions.SignedTxnWithAD, txnGroupLen) + for txnIdx := range groups[grpIdx] { + groups[grpIdx][txnIdx].SignedTxn = transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: acctAddr(grpIdx + txnIdx), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: 10, + Accounts: []basics.Address{ + acctAddr(grpIdx + txnIdx + 1), + acctAddr(grpIdx + txnIdx + 1), + }, + ForeignApps: []basics.AppIndex{ + 2001, + 2002, + }, + ForeignAssets: []basics.AssetIndex{ + 1001, + }, + }, + }, + } + } + } + rnd := basics.Round(5) + var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + var ledger = &prefetcherTestLedger{ + round: rnd, + balances: make(map[basics.Address]ledgercore.AccountData), + creators: make(map[basics.CreatableIndex]basics.Address), + } + ledger.balances[acctAddr(1)] = ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + } + + b.ResetTimer() + preloadedTxnGroupsCh := prefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[protocol.ConsensusCurrentVersion]) + for k := range preloadedTxnGroupsCh { + require.NoError(b, k.err) + } +} + +func BenchmarkPrefetcherPayment(b *testing.B) { + acctAddrPtr := func(i int) (o *basics.Address) { + o = new(basics.Address) + o[0] = byte(i) + o[1] = byte(i >> 8) + o[2] = byte(i >> 16) + return + } + acctAddr := func(i int) (o basics.Address) { + t := *acctAddrPtr(i) + copy(o[:], t[:]) + return + } + + txnGroupLen := 16 + groups := make([][]transactions.SignedTxnWithAD, 1+b.N/txnGroupLen) + for grpIdx := range groups { + groups[grpIdx] = make([]transactions.SignedTxnWithAD, txnGroupLen) + for txnIdx := range groups[grpIdx] { + groups[grpIdx][txnIdx].SignedTxn = transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: acctAddr(grpIdx + txnIdx), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: acctAddr(grpIdx + txnIdx + 1), + CloseRemainderTo: acctAddr(grpIdx + txnIdx + 2), + }, + }, + } + } + } + rnd := basics.Round(5) + var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + var ledger = &prefetcherTestLedger{ + round: rnd, + balances: make(map[basics.Address]ledgercore.AccountData), + creators: make(map[basics.CreatableIndex]basics.Address), + } + ledger.balances[acctAddr(1)] = ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + } + + b.ResetTimer() + preloadedTxnGroupsCh := prefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[protocol.ConsensusCurrentVersion]) + for k := range preloadedTxnGroupsCh { + require.NoError(b, k.err) + } +} +func BenchmarkChannelWrites(b *testing.B) { + b.Run("groupTaskDone", func(b *testing.B) { + c := make(chan groupTaskDone, b.N) + for i := 0; i < b.N; i++ { + c <- groupTaskDone{groupIdx: i} + } + }) + + b.Run("int64", func(b *testing.B) { + c := make(chan int64, b.N) + for i := int64(0); i < int64(b.N); i++ { + c <- i + } + }) +} |