summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTsachi Herman <tsachi.herman@algorand.com>2022-02-24 20:26:46 -0500
committerGitHub <noreply@github.com>2022-02-24 20:26:46 -0500
commit789922d78c3fee4126fac7938b8aeeab74d22f7d (patch)
treeb9f2ab59e347c5ee2df421bd40d8edefbd7a741b
parent6ecb8aab6a8a7ae71963092cb3eb5eb4342e593a (diff)
ledger refactoring: implement account data and resources prefetcher (#3666)
## Summary This PR replaces the existing account preloading during validating with a more transaction-type oriented prefetcher. The prefetcher examine the transaction type, and preload all the required resources ( account data, assets and applications ). ## Test Plan Unit test added. ## Benchmarks The following (existing) benchmarks were executed, and no performance difference was noted : * BenchmarkBlockEvaluatorDiskFullAppOptIns * BenchmarkBlockEvaluatorDiskAppOptIns * BenchmarkBlockEvaluatorDiskNoCrypto * BenchmarkBlockEvaluatorDiskCrypto * BenchmarkBlockEvaluatorRAMNoCrypto * BenchmarkBlockEvaluatorRAMCrypto The benchmark `BenchmarkBlockEvaluatorDiskAppCalls`, which was designed for applications, shown a notable improvements, dropping the transaction processing time from 42902ns to 33465ns.
-rw-r--r--ledger/evalbench_test.go6
-rw-r--r--ledger/internal/eval.go243
-rw-r--r--ledger/internal/evalprefetcher.go533
-rw-r--r--ledger/internal/evalprefetcher_test.go691
4 files changed, 1275 insertions, 198 deletions
diff --git a/ledger/evalbench_test.go b/ledger/evalbench_test.go
index f03a115a5..e80aa9147 100644
--- a/ledger/evalbench_test.go
+++ b/ledger/evalbench_test.go
@@ -303,6 +303,12 @@ func BenchmarkBlockEvaluatorDiskAppOptIns(b *testing.B) {
}
func BenchmarkBlockEvaluatorDiskAppCalls(b *testing.B) {
+ // Go normally starts measuring the benchmark run time with b.N = 1, and then
+ // adjusts b.N if the runtime is less than a second. In this case, the runtime
+ // of the test for b.N = 1 is about 0.7 second, which cause the benchmark to be
+ // executed twice. Running it twice would not be an issue on it's own; however,
+ // the setup time for this test is 1.5 minutes long. By setting the b.N = 2, we
+ // set up for success on the first iteration, and preventing a second iteration.
if b.N < 2 {
b.N = 2
}
diff --git a/ledger/internal/eval.go b/ledger/internal/eval.go
index 595a12069..cabd03eaa 100644
--- a/ledger/internal/eval.go
+++ b/ledger/internal/eval.go
@@ -60,11 +60,6 @@ var ErrNotInCowCache = errors.New("can't find object in cow cache")
// is considerably slower.
const averageEncodedTxnSizeHint = 150
-// asyncAccountLoadingThreadCount controls how many go routines would be used
-// to load the account data before the Eval() start processing individual
-// transaction group.
-const asyncAccountLoadingThreadCount = 4
-
// Creatable represent a single creatable object.
type creatable struct {
cindex basics.CreatableIndex
@@ -1543,12 +1538,12 @@ func Eval(ctx context.Context, l LedgerForEvaluator, blk bookkeeping.Block, vali
}
accountLoadingCtx, accountLoadingCancel := context.WithCancel(ctx)
- paysetgroupsCh := loadAccounts(accountLoadingCtx, l, blk.Round()-1, paysetgroups, blk.BlockHeader.FeeSink, blk.ConsensusProtocol())
+ preloadedTxnsData := prefetchAccounts(accountLoadingCtx, l, blk.Round()-1, paysetgroups, blk.BlockHeader.FeeSink, blk.ConsensusProtocol())
// ensure that before we exit from this method, the account loading is no longer active.
defer func() {
accountLoadingCancel()
// wait for the paysetgroupsCh to get closed.
- for range paysetgroupsCh {
+ for range preloadedTxnsData {
}
}()
@@ -1566,26 +1561,62 @@ func Eval(ctx context.Context, l LedgerForEvaluator, blk bookkeeping.Block, vali
txvalidator.txgroups = paysetgroups
txvalidator.done = make(chan error, 1)
go txvalidator.run()
-
}
base := eval.state.lookupParent.(*roundCowBase)
-
transactionGroupLoop:
for {
select {
- case txgroup, ok := <-paysetgroupsCh:
+ case txgroup, ok := <-preloadedTxnsData:
if !ok {
break transactionGroupLoop
} else if txgroup.err != nil {
return ledgercore.StateDelta{}, txgroup.err
}
- for _, br := range txgroup.balances {
- base.accounts[br.Addr] = br.AccountData
- // TODO: pull needed resources into cache?
+ for _, br := range txgroup.accounts {
+ if _, have := base.accounts[*br.address]; !have {
+ base.accounts[*br.address] = *br.data
+ }
}
- err = eval.TransactionGroup(txgroup.group)
+ for _, lr := range txgroup.resources {
+ if lr.address == nil {
+ // we attempted to look for the creator, and failed.
+ if lr.creatableType == basics.AssetCreatable {
+ base.creators[creatable{cindex: lr.creatableIndex, ctype: basics.AssetCreatable}] = foundAddress{exists: false}
+ } else {
+ base.creators[creatable{cindex: lr.creatableIndex, ctype: basics.AppCreatable}] = foundAddress{exists: false}
+ }
+ continue
+ }
+ if lr.creatableType == basics.AssetCreatable {
+ if lr.resource.AssetHolding != nil {
+ base.assets[ledgercore.AccountAsset{Address: *lr.address, Asset: basics.AssetIndex(lr.creatableIndex)}] = cachedAssetHolding{value: *lr.resource.AssetHolding, exists: true}
+ } else {
+ base.assets[ledgercore.AccountAsset{Address: *lr.address, Asset: basics.AssetIndex(lr.creatableIndex)}] = cachedAssetHolding{exists: false}
+ }
+ if lr.resource.AssetParams != nil {
+ base.assetParams[ledgercore.AccountAsset{Address: *lr.address, Asset: basics.AssetIndex(lr.creatableIndex)}] = cachedAssetParams{value: *lr.resource.AssetParams, exists: true}
+ base.creators[creatable{cindex: lr.creatableIndex, ctype: basics.AssetCreatable}] = foundAddress{address: *lr.address, exists: true}
+ } else {
+ base.assetParams[ledgercore.AccountAsset{Address: *lr.address, Asset: basics.AssetIndex(lr.creatableIndex)}] = cachedAssetParams{exists: false}
+
+ }
+ } else {
+ if lr.resource.AppLocalState != nil {
+ base.appLocalStates[ledgercore.AccountApp{Address: *lr.address, App: basics.AppIndex(lr.creatableIndex)}] = cachedAppLocalState{value: *lr.resource.AppLocalState, exists: true}
+ } else {
+ base.appLocalStates[ledgercore.AccountApp{Address: *lr.address, App: basics.AppIndex(lr.creatableIndex)}] = cachedAppLocalState{exists: false}
+ }
+ if lr.resource.AppParams != nil {
+ base.appParams[ledgercore.AccountApp{Address: *lr.address, App: basics.AppIndex(lr.creatableIndex)}] = cachedAppParams{value: *lr.resource.AppParams, exists: true}
+ base.creators[creatable{cindex: lr.creatableIndex, ctype: basics.AppCreatable}] = foundAddress{address: *lr.address, exists: true}
+ } else {
+ base.appParams[ledgercore.AccountApp{Address: *lr.address, App: basics.AppIndex(lr.creatableIndex)}] = cachedAppParams{exists: false}
+ }
+ }
+ }
+ err = eval.TransactionGroup(txgroup.txnGroup)
if err != nil {
return ledgercore.StateDelta{}, err
}
@@ -1623,187 +1654,3 @@ transactionGroupLoop:
return eval.state.deltas(), nil
}
-
-// loadedTransactionGroup is a helper struct to allow asynchronous loading of the account data needed by the transaction groups
-type loadedTransactionGroup struct {
- // group is the transaction group
- group []transactions.SignedTxnWithAD
- // balances is a list of all the balances that the transaction group refer to and are needed.
- balances []ledgercore.NewBalanceRecord
- // err indicates whether any of the balances in this structure have failed to load. In case of an error, at least
- // one of the entries in the balances would be uninitialized.
- err error
-}
-
-// Return the maximum number of addresses referenced in any given transaction.
-func maxAddressesInTxn(proto *config.ConsensusParams) int {
- return 7 + proto.MaxAppTxnAccounts
-}
-
-// loadAccounts loads the account data for the provided transaction group list. It also loads the feeSink account and add it to the first returned transaction group.
-// The order of the transaction groups returned by the channel is identical to the one in the input array.
-func loadAccounts(ctx context.Context, l LedgerForEvaluator, rnd basics.Round, groups [][]transactions.SignedTxnWithAD, feeSinkAddr basics.Address, consensusParams config.ConsensusParams) chan loadedTransactionGroup {
- outChan := make(chan loadedTransactionGroup, len(groups))
- go func() {
- // groupTask helps to organize the account loading for each transaction group.
- type groupTask struct {
- // balances contains the loaded balances each transaction group have
- balances []ledgercore.NewBalanceRecord
- // balancesCount is the number of balances that nees to be loaded per transaction group
- balancesCount int
- // done is a waiting channel for all the account data for the transaction group to be loaded
- done chan error
- }
- // addrTask manage the loading of a single account address.
- type addrTask struct {
- // account address to fetch
- address basics.Address
- // a list of transaction group tasks that depends on this address
- groups []*groupTask
- // a list of indices into the groupTask.balances where the address would be stored
- groupIndices []int
- }
- defer close(outChan)
-
- accountTasks := make(map[basics.Address]*addrTask)
- addressesCh := make(chan *addrTask, len(groups)*consensusParams.MaxTxGroupSize*maxAddressesInTxn(&consensusParams))
- // totalBalances counts the total number of balances over all the transaction groups
- totalBalances := 0
-
- initAccount := func(addr basics.Address, wg *groupTask) {
- if addr.IsZero() {
- return
- }
- if task, have := accountTasks[addr]; !have {
- task := &addrTask{
- address: addr,
- groups: make([]*groupTask, 1, 4),
- groupIndices: make([]int, 1, 4),
- }
- task.groups[0] = wg
- task.groupIndices[0] = wg.balancesCount
-
- accountTasks[addr] = task
- addressesCh <- task
- } else {
- task.groups = append(task.groups, wg)
- task.groupIndices = append(task.groupIndices, wg.balancesCount)
- }
- wg.balancesCount++
- totalBalances++
- }
- // add the fee sink address to the accountTasks/addressesCh so that it will be loaded first.
- if len(groups) > 0 {
- task := &addrTask{
- address: feeSinkAddr,
- }
- addressesCh <- task
- accountTasks[feeSinkAddr] = task
- }
-
- // iterate over the transaction groups and add all their account addresses to the list
- groupsReady := make([]*groupTask, len(groups))
- for i, group := range groups {
- task := &groupTask{}
- groupsReady[i] = task
- for _, stxn := range group {
- // If you add new addresses here, also add them in getTxnAddresses().
- initAccount(stxn.Txn.Sender, task)
- initAccount(stxn.Txn.Receiver, task)
- initAccount(stxn.Txn.CloseRemainderTo, task)
- initAccount(stxn.Txn.AssetSender, task)
- initAccount(stxn.Txn.AssetReceiver, task)
- initAccount(stxn.Txn.AssetCloseTo, task)
- initAccount(stxn.Txn.FreezeAccount, task)
- for _, xa := range stxn.Txn.Accounts {
- initAccount(xa, task)
- }
- }
- }
-
- // Add fee sink to the first group
- if len(groupsReady) > 0 {
- initAccount(feeSinkAddr, groupsReady[0])
- }
- close(addressesCh)
-
- // updata all the groups task :
- // allocate the correct number of balances, as well as
- // enough space on the "done" channel.
- allBalances := make([]ledgercore.NewBalanceRecord, totalBalances)
- usedBalances := 0
- for _, gr := range groupsReady {
- gr.balances = allBalances[usedBalances : usedBalances+gr.balancesCount]
- gr.done = make(chan error, gr.balancesCount)
- usedBalances += gr.balancesCount
- }
-
- // create few go-routines to load asyncroniously the account data.
- for i := 0; i < asyncAccountLoadingThreadCount; i++ {
- go func() {
- for {
- select {
- case task, ok := <-addressesCh:
- // load the address
- if !ok {
- // the channel got closed, which mean we're done.
- return
- }
- // lookup the account data directly from the ledger.
- acctData, _, err := l.LookupWithoutRewards(rnd, task.address)
- br := ledgercore.NewBalanceRecord{
- Addr: task.address,
- AccountData: acctData,
- }
- // if there is no error..
- if err == nil {
- // update all the group tasks with the new acquired balance.
- for i, wg := range task.groups {
- wg.balances[task.groupIndices[i]] = br
- // write a nil to indicate that we're loaded one entry.
- wg.done <- nil
- }
- } else {
- // there was an error loading that entry.
- for _, wg := range task.groups {
- // notify the channel of the error.
- wg.done <- err
- }
- }
- case <-ctx.Done():
- // if the context was canceled, abort right away.
- return
- }
-
- }
- }()
- }
-
- // iterate on the transaction groups tasks. This array retains the original order.
- for i, wg := range groupsReady {
- // Wait to receive wg.balancesCount nil error messages, one for each address referenced in this txn group.
- for j := 0; j < wg.balancesCount; j++ {
- select {
- case err := <-wg.done:
- if err != nil {
- // if there is an error, report the error to the output channel.
- outChan <- loadedTransactionGroup{
- group: groups[i],
- err: err,
- }
- return
- }
- case <-ctx.Done():
- return
- }
- }
- // if we had no error, write the result to the output channel.
- // this write will not block since we preallocated enough space on the channel.
- outChan <- loadedTransactionGroup{
- group: groups[i],
- balances: wg.balances,
- }
- }
- }()
- return outChan
-}
diff --git a/ledger/internal/evalprefetcher.go b/ledger/internal/evalprefetcher.go
new file mode 100644
index 000000000..2d193cb58
--- /dev/null
+++ b/ledger/internal/evalprefetcher.go
@@ -0,0 +1,533 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package internal
+
+import (
+ "context"
+ "sync/atomic"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/data/transactions"
+ "github.com/algorand/go-algorand/ledger/ledgercore"
+ "github.com/algorand/go-algorand/protocol"
+)
+
+// asyncAccountLoadingThreadCount controls how many go routines would be used
+// to load the account data before the Eval() start processing individual
+// transaction group.
+const asyncAccountLoadingThreadCount = 4
+
+type loadedAccountDataEntry struct {
+ address *basics.Address
+ data *ledgercore.AccountData
+}
+
+type loadedResourcesEntry struct {
+ // resource is the loaded resource entry. unless address is nil, resource would always contain a valid ledgercore.AccountResource pointer.
+ resource *ledgercore.AccountResource
+ // address might be empty if the resource does not exist. In that case creatableIndex and creatableType would still be valid while resource would be nil.
+ address *basics.Address
+ creatableIndex basics.CreatableIndex
+ creatableType basics.CreatableType
+}
+
+// loadedTransactionGroup is a helper struct to allow asynchronous loading of the account data needed by the transaction groups
+type loadedTransactionGroup struct {
+ // the transaction group
+ txnGroup []transactions.SignedTxnWithAD
+
+ // accounts is a list of all the accounts balance records that the transaction group refer to and are needed.
+ accounts []loadedAccountDataEntry
+
+ // the following four are the resources used by the account
+ resources []loadedResourcesEntry
+
+ // err indicates whether any of the balances in this structure have failed to load. In case of an error, at least
+ // one of the entries in the balances would be uninitialized.
+ err error
+}
+
+// accountPrefetcher used to prefetch accounts balances and resources before the evaluator is being called.
+type accountPrefetcher struct {
+ ledger LedgerForEvaluator
+ rnd basics.Round
+ groups [][]transactions.SignedTxnWithAD
+ feeSinkAddr basics.Address
+ consensusParams config.ConsensusParams
+ outChan chan loadedTransactionGroup
+}
+
+// prefetchAccounts loads the account data for the provided transaction group list. It also loads the feeSink account and add it to the first returned transaction group.
+// The order of the transaction groups returned by the channel is identical to the one in the input array.
+func prefetchAccounts(ctx context.Context, l LedgerForEvaluator, rnd basics.Round, groups [][]transactions.SignedTxnWithAD, feeSinkAddr basics.Address, consensusParams config.ConsensusParams) <-chan loadedTransactionGroup {
+ prefetcher := &accountPrefetcher{
+ ledger: l,
+ rnd: rnd,
+ groups: groups,
+ feeSinkAddr: feeSinkAddr,
+ consensusParams: consensusParams,
+ outChan: make(chan loadedTransactionGroup, len(groups)),
+ }
+
+ go prefetcher.prefetch(ctx)
+ return prefetcher.outChan
+}
+
+// groupTask helps to organize the account loading for each transaction group.
+type groupTask struct {
+ groupTaskIndex int
+ // balances contains the loaded balances each transaction group have
+ balances []loadedAccountDataEntry
+ // balancesCount is the number of balances that nees to be loaded per transaction group
+ balancesCount int
+ // resources contains the loaded resources each of the transaction groups have
+ resources []loadedResourcesEntry
+ // resourcesCount is the number of resources that nees to be loaded per transaction group
+ resourcesCount int
+ // incompleteCount is the number of resources+balances still pending and need to be loaded
+ // this variable is used by as atomic variable to synchronize the readiness of the group taks.
+ incompleteCount int64
+}
+
+// preloaderTask manage the loading of a single element, whether it's a resource or an account address.
+type preloaderTask struct {
+ // account address to fetch
+ address *basics.Address
+ // resource id
+ creatableIndex basics.CreatableIndex
+ // resource type
+ creatableType basics.CreatableType
+ // a list of transaction group tasks that depends on this address or resource
+ groups []*groupTask
+ // a list of indices into the groupTask.balances or groupTask.resources where the address would be stored
+ groupIndices []int
+}
+
+// preloaderTaskQueue is a dynamic linked list of enqueued entries, optimized for non-syncronized insertion and
+// syncronized extraction
+type preloaderTaskQueue struct {
+ next *preloaderTaskQueue
+ used int
+ entries []*preloaderTask
+ baseIdx int
+ maxTxnGroupEntries int
+}
+
+type groupTaskDone struct {
+ groupIdx int
+ err error
+}
+
+func allocPreloaderQueue(count int, maxTxnGroupEntries int) preloaderTaskQueue {
+ return preloaderTaskQueue{
+ entries: make([]*preloaderTask, count*2+maxTxnGroupEntries*2),
+ maxTxnGroupEntries: maxTxnGroupEntries,
+ }
+}
+
+// enqueue places the queued entry on the queue, returning the latest queue
+// ( in case the current "page" ran out of space )
+func (pq *preloaderTaskQueue) enqueue(t *preloaderTask) {
+ pq.entries[pq.used] = t
+ pq.used++
+}
+
+func (pq *preloaderTaskQueue) expand() *preloaderTaskQueue {
+ if cap(pq.entries)-pq.used < pq.maxTxnGroupEntries {
+ pq.next = &preloaderTaskQueue{
+ entries: make([]*preloaderTask, cap(pq.entries)*2),
+ used: 0,
+ baseIdx: pq.baseIdx + pq.used,
+ maxTxnGroupEntries: pq.maxTxnGroupEntries,
+ }
+ return pq.next
+ }
+ return pq
+}
+
+func (pq *preloaderTaskQueue) getTaskAtIndex(idx int) (*preloaderTaskQueue, *preloaderTask) {
+ localIdx := idx - pq.baseIdx
+ if pq.used > localIdx {
+ return pq, pq.entries[localIdx]
+ }
+ if pq.next != nil {
+ return pq.next.getTaskAtIndex(idx)
+ }
+ return pq, nil
+}
+
+type accountCreatableKey struct {
+ address basics.Address
+ cidx basics.CreatableIndex
+}
+
+func loadAccountsAddAccountTask(addr *basics.Address, wt *groupTask, accountTasks map[basics.Address]*preloaderTask, queue *preloaderTaskQueue) {
+ if addr.IsZero() {
+ return
+ }
+ if task, have := accountTasks[*addr]; !have {
+ task := &preloaderTask{
+ address: addr,
+ groups: make([]*groupTask, 1, 4),
+ groupIndices: make([]int, 1, 4),
+ }
+ task.groups[0] = wt
+ task.groupIndices[0] = wt.balancesCount
+
+ accountTasks[*addr] = task
+ queue.enqueue(task)
+ } else {
+ task.groups = append(task.groups, wt)
+ task.groupIndices = append(task.groupIndices, wt.balancesCount)
+ }
+ wt.balancesCount++
+}
+
+func loadAccountsAddResourceTask(addr *basics.Address, cidx basics.CreatableIndex, ctype basics.CreatableType, wt *groupTask, resourceTasks map[accountCreatableKey]*preloaderTask, queue *preloaderTaskQueue) {
+ if cidx == 0 {
+ return
+ }
+ key := accountCreatableKey{
+ cidx: cidx,
+ }
+ if addr != nil {
+ key.address = *addr
+ }
+ if task, have := resourceTasks[key]; !have {
+ task := &preloaderTask{
+ address: addr,
+ groups: make([]*groupTask, 1, 4),
+ groupIndices: make([]int, 1, 4),
+ creatableIndex: cidx,
+ creatableType: ctype,
+ }
+ task.groups[0] = wt
+ task.groupIndices[0] = wt.resourcesCount
+
+ resourceTasks[key] = task
+ queue.enqueue(task)
+ } else {
+ task.groups = append(task.groups, wt)
+ task.groupIndices = append(task.groupIndices, wt.resourcesCount)
+ }
+ wt.resourcesCount++
+}
+
+// prefetch would process the input transaction groups by analyzing each of the transaction groups and building
+// an execution queue that would allow us to fetch all the dependencies for the input transaction groups in order
+// and output these onto a channel.
+func (p *accountPrefetcher) prefetch(ctx context.Context) {
+ defer close(p.outChan)
+ accountTasks := make(map[basics.Address]*preloaderTask)
+ resourceTasks := make(map[accountCreatableKey]*preloaderTask)
+
+ var maxTxnGroupEntries int
+ if p.consensusParams.Application {
+ // the extra two are for the sender account data, plus the application global state
+ maxTxnGroupEntries = p.consensusParams.MaxTxGroupSize * (2 + p.consensusParams.MaxAppTxnAccounts + p.consensusParams.MaxAppTxnForeignApps + p.consensusParams.MaxAppTxnForeignAssets)
+ } else {
+ // 8 is the number of resources+account used in the AssetTransferTx, which is the largest one.
+ maxTxnGroupEntries = p.consensusParams.MaxTxGroupSize * 8
+ }
+
+ tasksQueue := allocPreloaderQueue(len(p.groups), maxTxnGroupEntries)
+
+ // totalBalances counts the total number of balances over all the transaction groups
+ totalBalances := 0
+ totalResources := 0
+
+ groupsReady := make([]groupTask, len(p.groups))
+
+ // Add fee sink to the first group
+ if len(p.groups) > 0 {
+ // the feeSinkAddr is known to be non-empty
+ feeSinkPreloader := &preloaderTask{
+ address: &p.feeSinkAddr,
+ groups: []*groupTask{&groupsReady[0]},
+ groupIndices: []int{0},
+ }
+ groupsReady[0].balancesCount = 1
+ accountTasks[p.feeSinkAddr] = feeSinkPreloader
+ tasksQueue.enqueue(feeSinkPreloader)
+ }
+
+ // iterate over the transaction groups and add all their account addresses to the list
+ queue := &tasksQueue
+ for i := range p.groups {
+ task := &groupsReady[i]
+ for j := range p.groups[i] {
+ stxn := &p.groups[i][j]
+ switch stxn.Txn.Type {
+ case protocol.PaymentTx:
+ loadAccountsAddAccountTask(&stxn.Txn.Receiver, task, accountTasks, queue)
+ loadAccountsAddAccountTask(&stxn.Txn.CloseRemainderTo, task, accountTasks, queue)
+ case protocol.AssetConfigTx:
+ loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.ConfigAsset), basics.AssetCreatable, task, resourceTasks, queue)
+ case protocol.AssetTransferTx:
+ loadAccountsAddResourceTask(&stxn.Txn.Sender, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue)
+ if !stxn.Txn.AssetSender.IsZero() {
+ loadAccountsAddResourceTask(&stxn.Txn.AssetSender, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue)
+ loadAccountsAddAccountTask(&stxn.Txn.AssetSender, task, accountTasks, queue)
+ }
+ if !stxn.Txn.AssetReceiver.IsZero() {
+ loadAccountsAddResourceTask(&stxn.Txn.AssetReceiver, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue)
+ loadAccountsAddAccountTask(&stxn.Txn.AssetReceiver, task, accountTasks, queue)
+ }
+ if !stxn.Txn.AssetCloseTo.IsZero() {
+ loadAccountsAddResourceTask(&stxn.Txn.AssetCloseTo, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue)
+ loadAccountsAddAccountTask(&stxn.Txn.AssetCloseTo, task, accountTasks, queue)
+ }
+ case protocol.AssetFreezeTx:
+ if !stxn.Txn.FreezeAccount.IsZero() {
+ loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.FreezeAsset), basics.AssetCreatable, task, resourceTasks, queue)
+ loadAccountsAddResourceTask(&stxn.Txn.FreezeAccount, basics.CreatableIndex(stxn.Txn.FreezeAsset), basics.AssetCreatable, task, resourceTasks, queue)
+ loadAccountsAddAccountTask(&stxn.Txn.FreezeAccount, task, accountTasks, queue)
+ }
+ case protocol.ApplicationCallTx:
+ if stxn.Txn.ApplicationID != 0 {
+ // load the global - so that we'll have the program
+ loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.ApplicationID), basics.AppCreatable, task, resourceTasks, queue)
+ // load the local - so that we'll have the local state
+ // this is something we need to decide if we want to enable, since not every application call would use local storage.
+ //loadAccountsAddResourceTask(&stxn.Txn.Sender, basics.CreatableIndex(stxn.Txn.ApplicationID), basics.AppCreatable, task, resourceTasks, queue)
+ }
+ for _, fa := range stxn.Txn.ForeignApps {
+ loadAccountsAddResourceTask(nil, basics.CreatableIndex(fa), basics.AppCreatable, task, resourceTasks, queue)
+ }
+ for _, fa := range stxn.Txn.ForeignAssets {
+ loadAccountsAddResourceTask(nil, basics.CreatableIndex(fa), basics.AssetCreatable, task, resourceTasks, queue)
+ }
+ for ixa := range stxn.Txn.Accounts {
+ loadAccountsAddAccountTask(&stxn.Txn.Accounts[ixa], task, accountTasks, queue)
+ }
+ case protocol.CompactCertTx:
+ fallthrough
+ case protocol.KeyRegistrationTx:
+ fallthrough
+ default:
+ }
+ // If you add new addresses here, also add them in getTxnAddresses().
+ if !stxn.Txn.Sender.IsZero() {
+ loadAccountsAddAccountTask(&stxn.Txn.Sender, task, accountTasks, queue)
+ }
+ }
+ totalBalances += task.balancesCount
+ totalResources += task.resourcesCount
+ // expand the queue if needed.
+ queue = queue.expand()
+ }
+
+ // find the number of tasks
+ tasksCount := int64(0)
+ for lastQueueEntry := &tasksQueue; ; lastQueueEntry = lastQueueEntry.next {
+ if lastQueueEntry.next == nil {
+ tasksCount = int64(lastQueueEntry.baseIdx + lastQueueEntry.used)
+ break
+ }
+ }
+
+ // update all the groups task :
+ // allocate the correct number of balances, as well as
+ // enough space on the "done" channel.
+ allBalances := make([]loadedAccountDataEntry, totalBalances)
+ allResources := make([]loadedResourcesEntry, totalResources)
+ usedBalances := 0
+ usedResources := 0
+
+ // groupDoneCh is used to communicate the completion signal for a single
+ // resource/address load between the go-routines and the main output channel
+ // writer loop. The various go-routines would write to the channel the index
+ // of the task that is complete and ready to be sent.
+ groupDoneCh := make(chan groupTaskDone, len(groupsReady))
+ const dependencyFreeGroup = -int64(^uint64(0)/2) - 1
+ for grpIdx := range groupsReady {
+ gr := &groupsReady[grpIdx]
+ gr.groupTaskIndex = grpIdx
+ gr.incompleteCount = int64(gr.balancesCount + gr.resourcesCount)
+ gr.balances = allBalances[usedBalances : usedBalances+gr.balancesCount]
+ if gr.resourcesCount > 0 {
+ gr.resources = allResources[usedResources : usedResources+gr.resourcesCount]
+ usedResources += gr.resourcesCount
+ }
+ usedBalances += gr.balancesCount
+ if gr.incompleteCount == 0 {
+ gr.incompleteCount = dependencyFreeGroup
+ }
+ }
+
+ taskIdx := int64(-1)
+ defer atomic.StoreInt64(&taskIdx, tasksCount)
+ // create few go-routines to load asyncroniously the account data.
+ for i := 0; i < asyncAccountLoadingThreadCount; i++ {
+ go p.asyncPrefetchRoutine(&tasksQueue, &taskIdx, groupDoneCh)
+ }
+
+ // iterate on the transaction groups tasks. This array retains the original order.
+ completed := make(map[int]bool)
+ for i := 0; i < len(p.groups); {
+ wait:
+ incompleteCount := atomic.LoadInt64(&groupsReady[i].incompleteCount)
+ if incompleteCount > 0 || (incompleteCount != dependencyFreeGroup && !completed[i]) {
+ select {
+ case done := <-groupDoneCh:
+ if done.err != nil {
+ // if there is an error, report the error to the output channel.
+ p.outChan <- loadedTransactionGroup{
+ err: done.err,
+ }
+ return
+ }
+ if done.groupIdx > i {
+ // mark future txn as ready.
+ completed[done.groupIdx] = true
+ goto wait
+ } else if done.groupIdx < i {
+ // it was already processed.
+ goto wait
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ next := i
+ for ; next < len(p.groups); next++ {
+ if !completed[next] {
+ if next > i {
+ i = next
+ goto wait
+ }
+ // next == i
+ }
+
+ delete(completed, next)
+
+ // if we had no error, write the result to the output channel.
+ // this write will not block since we preallocated enough space on the channel.
+ p.outChan <- loadedTransactionGroup{
+ txnGroup: p.groups[next],
+ accounts: groupsReady[next].balances,
+ resources: groupsReady[next].resources,
+ }
+ }
+ // if we get to this point, it means that we have no more transaction to process.
+ break
+ }
+}
+
+func (gt *groupTask) markCompletionAcct(idx int, br loadedAccountDataEntry, groupDoneCh chan groupTaskDone) {
+ gt.balances[idx] = br
+ if atomic.AddInt64(&gt.incompleteCount, -1) == 0 {
+ groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex}
+ }
+}
+
+func (gt *groupTask) markCompletionResource(idx int, res loadedResourcesEntry, groupDoneCh chan groupTaskDone) {
+ gt.resources[idx] = res
+ if atomic.AddInt64(&gt.incompleteCount, -1) == 0 {
+ groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex}
+ }
+}
+
+func (gt *groupTask) markCompletionAcctError(err error, groupDoneCh chan groupTaskDone) {
+ for {
+ curVal := atomic.LoadInt64(&gt.incompleteCount)
+ if curVal <= 0 {
+ return
+ }
+ if atomic.CompareAndSwapInt64(&gt.incompleteCount, curVal, 0) {
+ groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex, err: err}
+ return
+ }
+ }
+}
+
+func (p *accountPrefetcher) asyncPrefetchRoutine(queue *preloaderTaskQueue, taskIdx *int64, groupDoneCh chan groupTaskDone) {
+ var task *preloaderTask
+ var err error
+ for {
+ nextTaskIdx := atomic.AddInt64(taskIdx, 1)
+ queue, task = queue.getTaskAtIndex(int(nextTaskIdx))
+ if task == nil {
+ // no more tasks.
+ return
+ }
+ if task.creatableIndex == 0 {
+ // lookup the account data directly from the ledger.
+ var acctData ledgercore.AccountData
+ acctData, _, err = p.ledger.LookupWithoutRewards(p.rnd, *task.address)
+ // if there was an error..
+ if err != nil {
+ // there was an error loading that entry.
+ break
+ }
+ br := loadedAccountDataEntry{
+ address: task.address,
+ data: &acctData,
+ }
+ // update all the group tasks with the new acquired balance.
+ for i, wt := range task.groups {
+ wt.markCompletionAcct(task.groupIndices[i], br, groupDoneCh)
+ }
+ continue
+ }
+ if task.address == nil {
+ // start off by figuring out the creator in case it's a global resource.
+ var creator basics.Address
+ var ok bool
+ creator, ok, err = p.ledger.GetCreatorForRound(p.rnd, task.creatableIndex, task.creatableType)
+ if err != nil {
+ // there was an error loading that entry.
+ break
+ }
+ if !ok {
+ re := loadedResourcesEntry{
+ creatableIndex: task.creatableIndex,
+ creatableType: task.creatableType,
+ }
+ // update all the group tasks with the new acquired balance.
+ for i, wt := range task.groups {
+ wt.markCompletionResource(task.groupIndices[i], re, groupDoneCh)
+ }
+ continue
+ }
+ task.address = &creator
+ }
+ var resource ledgercore.AccountResource
+ resource, err = p.ledger.LookupResource(p.rnd, *task.address, task.creatableIndex, task.creatableType)
+ if err != nil {
+ // there was an error loading that entry.
+ break
+ }
+ re := loadedResourcesEntry{
+ resource: &resource,
+ address: task.address,
+ creatableIndex: task.creatableIndex,
+ creatableType: task.creatableType,
+ }
+ // update all the group tasks with the new acquired balance.
+ for i, wt := range task.groups {
+ wt.markCompletionResource(task.groupIndices[i], re, groupDoneCh)
+ }
+ }
+ // if we got here, it means that there was an error.
+ // in every case we get here, the task is gurenteed to be a non-nil.
+ for _, wt := range task.groups {
+ // notify the channel of the error.
+ wt.markCompletionAcctError(err, groupDoneCh)
+ }
+}
diff --git a/ledger/internal/evalprefetcher_test.go b/ledger/internal/evalprefetcher_test.go
new file mode 100644
index 000000000..96e1a9386
--- /dev/null
+++ b/ledger/internal/evalprefetcher_test.go
@@ -0,0 +1,691 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package internal
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/crypto"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/data/transactions"
+ "github.com/algorand/go-algorand/ledger/ledgercore"
+ "github.com/algorand/go-algorand/protocol"
+ "github.com/algorand/go-algorand/test/partitiontest"
+)
+
+type prefetcherTestLedger struct {
+ round basics.Round
+ balances map[basics.Address]ledgercore.AccountData
+ creators map[basics.CreatableIndex]basics.Address
+}
+
+func (l *prefetcherTestLedger) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) {
+ return bookkeeping.BlockHeader{}, nil
+}
+func (l *prefetcherTestLedger) CheckDup(config.ConsensusParams, basics.Round, basics.Round, basics.Round, transactions.Txid, ledgercore.Txlease) error {
+ return nil
+}
+func (l *prefetcherTestLedger) LookupWithoutRewards(_ basics.Round, addr basics.Address) (ledgercore.AccountData, basics.Round, error) {
+ if data, has := l.balances[addr]; has {
+ return data, l.round, nil
+ }
+ return ledgercore.AccountData{}, l.round, nil
+}
+func (l *prefetcherTestLedger) LookupResource(basics.Round, basics.Address, basics.CreatableIndex, basics.CreatableType) (ledgercore.AccountResource, error) {
+ return ledgercore.AccountResource{}, nil
+}
+func (l *prefetcherTestLedger) GetCreatorForRound(_ basics.Round, cidx basics.CreatableIndex, _ basics.CreatableType) (basics.Address, bool, error) {
+ if addr, has := l.creators[cidx]; has {
+ return addr, true, nil
+ }
+ return basics.Address{}, false, nil
+}
+func (l *prefetcherTestLedger) GenesisHash() crypto.Digest {
+ return crypto.Digest{}
+}
+func (l *prefetcherTestLedger) GenesisProto() config.ConsensusParams {
+ return config.Consensus[protocol.ConsensusCurrentVersion]
+}
+func (l *prefetcherTestLedger) LatestTotals() (basics.Round, ledgercore.AccountTotals, error) {
+ return l.round, ledgercore.AccountTotals{}, nil
+}
+func (l *prefetcherTestLedger) CompactCertVoters(basics.Round) (*ledgercore.VotersForRound, error) {
+ return nil, nil
+}
+
+func TestEvaluatorPrefetcher(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ acctAddrPtr := func(i int) (o *basics.Address) {
+ o = new(basics.Address)
+ o[0] = byte(i)
+ o[1] = byte(i >> 8)
+ o[2] = byte(i >> 16)
+ return
+ }
+ acctAddr := func(i int) (o basics.Address) {
+ t := *acctAddrPtr(i)
+ copy(o[:], t[:])
+ return
+ }
+
+ rnd := basics.Round(5)
+ var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
+
+ var ledger = &prefetcherTestLedger{
+ round: rnd,
+ balances: make(map[basics.Address]ledgercore.AccountData),
+ creators: make(map[basics.CreatableIndex]basics.Address),
+ }
+ ledger.balances[acctAddr(1)] = ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ }
+ ledger.creators[1001] = acctAddr(2)
+ ledger.creators[2001] = acctAddr(15)
+
+ type testTransactionCases struct {
+ signedTxn transactions.SignedTxn
+ accounts []loadedAccountDataEntry
+ resources []loadedResourcesEntry
+ }
+
+ testTransactions := []testTransactionCases{
+ // payment transaction
+ {
+ signedTxn: transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: acctAddr(1),
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: acctAddr(2),
+ CloseRemainderTo: acctAddr(3),
+ },
+ },
+ },
+ accounts: []loadedAccountDataEntry{
+ {
+ address: &feeSinkAddr,
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(1),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ },
+ },
+ {
+ address: acctAddrPtr(2),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(3),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ },
+ },
+ // asset config transaction for a non-existing asset
+ {
+ signedTxn: transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.AssetConfigTx,
+ Header: transactions.Header{
+ Sender: acctAddr(1),
+ },
+ AssetConfigTxnFields: transactions.AssetConfigTxnFields{
+ ConfigAsset: 1000,
+ },
+ },
+ },
+ accounts: []loadedAccountDataEntry{
+ {
+ address: &feeSinkAddr,
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(1),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ },
+ },
+ },
+ resources: []loadedResourcesEntry{
+ {
+ address: nil,
+ creatableIndex: 1000,
+ creatableType: basics.AssetCreatable,
+ resource: nil,
+ },
+ },
+ },
+ // asset config transaction for an existing asset
+ {
+ signedTxn: transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.AssetConfigTx,
+ Header: transactions.Header{
+ Sender: acctAddr(1),
+ },
+ AssetConfigTxnFields: transactions.AssetConfigTxnFields{
+ ConfigAsset: 1001,
+ },
+ },
+ },
+ accounts: []loadedAccountDataEntry{
+ {
+ address: &feeSinkAddr,
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(1),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ },
+ },
+ },
+ resources: []loadedResourcesEntry{
+ {
+ address: acctAddrPtr(2),
+ creatableIndex: 1001,
+ creatableType: basics.AssetCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ },
+ },
+ // asset transfer transaction
+ {
+ signedTxn: transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.AssetTransferTx,
+ Header: transactions.Header{
+ Sender: acctAddr(1),
+ },
+ AssetTransferTxnFields: transactions.AssetTransferTxnFields{
+ XferAsset: 1001,
+ AssetSender: acctAddr(2),
+ AssetReceiver: acctAddr(3),
+ AssetCloseTo: acctAddr(4),
+ },
+ },
+ },
+ accounts: []loadedAccountDataEntry{
+ {
+ address: &feeSinkAddr,
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(1),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ },
+ },
+ {
+ address: acctAddrPtr(2),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(3),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(4),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ },
+ resources: []loadedResourcesEntry{
+ {
+ address: acctAddrPtr(1),
+ creatableIndex: 1001,
+ creatableType: basics.AssetCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ {
+ address: acctAddrPtr(2),
+ creatableIndex: 1001,
+ creatableType: basics.AssetCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ {
+ address: acctAddrPtr(3),
+ creatableIndex: 1001,
+ creatableType: basics.AssetCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ {
+ address: acctAddrPtr(4),
+ creatableIndex: 1001,
+ creatableType: basics.AssetCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ },
+ },
+ // asset freeze transaction
+ {
+ signedTxn: transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.AssetFreezeTx,
+ Header: transactions.Header{
+ Sender: acctAddr(1),
+ },
+ AssetFreezeTxnFields: transactions.AssetFreezeTxnFields{
+ FreezeAccount: acctAddr(3),
+ FreezeAsset: 1001,
+ },
+ },
+ },
+ accounts: []loadedAccountDataEntry{
+ {
+ address: &feeSinkAddr,
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(1),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ },
+ },
+ {
+ address: acctAddrPtr(3),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ },
+ resources: []loadedResourcesEntry{
+ {
+ address: acctAddrPtr(2),
+ creatableIndex: 1001,
+ creatableType: basics.AssetCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ {
+ address: acctAddrPtr(3),
+ creatableIndex: 1001,
+ creatableType: basics.AssetCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ },
+ },
+ // application transaction
+ {
+ signedTxn: transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.ApplicationCallTx,
+ Header: transactions.Header{
+ Sender: acctAddr(1),
+ },
+ ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{
+ ApplicationID: 10,
+ Accounts: []basics.Address{
+ acctAddr(4),
+ acctAddr(5),
+ },
+ ForeignApps: []basics.AppIndex{
+ 2001,
+ 2002,
+ },
+ ForeignAssets: []basics.AssetIndex{
+ 1001,
+ },
+ },
+ },
+ },
+ accounts: []loadedAccountDataEntry{
+ {
+ address: &feeSinkAddr,
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(1),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ },
+ },
+ {
+ address: acctAddrPtr(4),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ {
+ address: acctAddrPtr(5),
+ data: &ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}},
+ },
+ },
+ },
+ resources: []loadedResourcesEntry{
+ {
+ address: acctAddrPtr(2),
+ creatableIndex: 1001,
+ creatableType: basics.AssetCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ {
+ address: acctAddrPtr(15),
+ creatableIndex: 2001,
+ creatableType: basics.AppCreatable,
+ resource: &ledgercore.AccountResource{},
+ },
+ {
+ address: nil,
+ creatableIndex: 2002,
+ creatableType: basics.AppCreatable,
+ resource: nil,
+ },
+ /* - if we'll decide that we want to perfetch the account local state, then this should be enabled.
+ {
+ address: acctAddrPtr(1),
+ creatableIndex: 10,
+ creatableType: basics.AppCreatable,
+ resource: &ledgercore.AccountResource{},
+ },*/
+ {
+ address: nil,
+ creatableIndex: 10,
+ creatableType: basics.AppCreatable,
+ resource: nil,
+ },
+ },
+ },
+ }
+
+ for _, txn := range testTransactions {
+ groups := make([][]transactions.SignedTxnWithAD, 1)
+ groups[0] = make([]transactions.SignedTxnWithAD, 1)
+ groups[0][0].SignedTxn = txn.signedTxn
+
+ preloadedTxnGroupsCh := prefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[protocol.ConsensusCurrentVersion])
+
+ for loadedTxnGroup := range preloadedTxnGroupsCh {
+ require.NoError(t, loadedTxnGroup.err)
+
+ // compare the txn.accounts and loadedTxnGroup.accounts in order agnostic way.
+ require.Equal(t, len(txn.accounts), len(loadedTxnGroup.accounts))
+ for _, acct := range txn.accounts {
+ // make sure we find it in loadedTxnGroup.accounts
+ found := false
+ require.NotNil(t, acct.address)
+ for k, loadedAcct := range loadedTxnGroup.accounts {
+ require.NotNilf(t, loadedAcct.address, "index: %d\nexpected %#v\nactual %#v", k, acct, loadedAcct)
+ if *acct.address != *loadedAcct.address {
+ continue
+ }
+ require.Equal(t, *acct.data, *loadedAcct.data)
+ found = true
+ break
+ }
+ require.Truef(t, found, "missing account %#v", acct)
+ }
+
+ // compare the txn.resources and loadedTxnGroup.resources in order agnostic way
+ require.Equalf(t, len(txn.resources), len(loadedTxnGroup.resources), "mismatching resources count; actual : %v", loadedTxnGroup.resources)
+ for _, res := range txn.resources {
+ // make sure we find it in loadedTxnGroup.resources
+ found := false
+ for _, loadedRes := range loadedTxnGroup.resources {
+ if res.creatableIndex != loadedRes.creatableIndex {
+ continue
+ }
+ require.Equal(t, res.creatableType, loadedRes.creatableType)
+ if res.address == nil {
+ require.Nil(t, loadedRes.address)
+ } else {
+ if loadedRes.address == nil || *res.address != *loadedRes.address {
+ continue
+ }
+ }
+ if res.resource == nil {
+ require.Nil(t, loadedRes.resource)
+ } else {
+ require.NotNil(t, loadedRes.resource)
+ require.Equal(t, *res.resource, *loadedRes.resource)
+ }
+ found = true
+ break
+ }
+ require.Truef(t, found, "failed to find resource %#v", res)
+ }
+ }
+ }
+}
+
+func TestEvaluatorPrefetcherQueueExpansion(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ acctAddrPtr := func(i int) (o *basics.Address) {
+ o = new(basics.Address)
+ o[0] = byte(i)
+ o[1] = byte(i >> 8)
+ o[2] = byte(i >> 16)
+ return
+ }
+ acctAddr := func(i int) (o basics.Address) {
+ t := *acctAddrPtr(i)
+ copy(o[:], t[:])
+ return
+ }
+
+ rnd := basics.Round(5)
+ var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
+
+ var ledger = &prefetcherTestLedger{
+ round: rnd,
+ balances: make(map[basics.Address]ledgercore.AccountData),
+ creators: make(map[basics.CreatableIndex]basics.Address),
+ }
+ ledger.balances[acctAddr(1)] = ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ }
+ type testTransactionCases struct {
+ signedTxn transactions.SignedTxn
+ accounts []loadedAccountDataEntry
+ resources []loadedResourcesEntry
+ }
+
+ txnGroups := make([][]transactions.SignedTxnWithAD, 20000)
+ addr := 1
+ for i := range txnGroups {
+ txnGroups[i] = make([]transactions.SignedTxnWithAD, 16)
+ for k := range txnGroups[i] {
+ txnGroups[i][k].SignedTxn = transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: acctAddr(1),
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: acctAddr(addr),
+ CloseRemainderTo: acctAddr(addr + 1),
+ },
+ },
+ }
+ addr += 2
+ }
+ }
+ preloadedTxnGroupsCh := prefetchAccounts(context.Background(), ledger, rnd, txnGroups, feeSinkAddr, config.Consensus[protocol.ConsensusCurrentVersion])
+ groupsCount := 0
+ addressCount := 0
+ uniqueAccounts := make(map[basics.Address]bool)
+ for k := range preloadedTxnGroupsCh {
+ addressCount += len(k.accounts)
+ for _, acct := range k.accounts {
+ uniqueAccounts[*acct.address] = true
+ }
+ require.Equal(t, txnGroups[groupsCount], k.txnGroup)
+ groupsCount++
+ }
+ require.Equal(t, len(txnGroups), groupsCount)
+ // the +1 below is for the fee sink address.
+ require.Equal(t, len(txnGroups)*16*3+1, addressCount)
+ require.Equal(t, len(txnGroups)*16*2+1, len(uniqueAccounts))
+}
+
+func BenchmarkPrefetcherApps(b *testing.B) {
+ acctAddrPtr := func(i int) (o *basics.Address) {
+ o = new(basics.Address)
+ o[0] = byte(i)
+ o[1] = byte(i >> 8)
+ o[2] = byte(i >> 16)
+ return
+ }
+ acctAddr := func(i int) (o basics.Address) {
+ t := *acctAddrPtr(i)
+ copy(o[:], t[:])
+ return
+ }
+
+ txnGroupLen := 16
+ groups := make([][]transactions.SignedTxnWithAD, 1+b.N/txnGroupLen)
+ for grpIdx := range groups {
+ groups[grpIdx] = make([]transactions.SignedTxnWithAD, txnGroupLen)
+ for txnIdx := range groups[grpIdx] {
+ groups[grpIdx][txnIdx].SignedTxn = transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.ApplicationCallTx,
+ Header: transactions.Header{
+ Sender: acctAddr(grpIdx + txnIdx),
+ },
+ ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{
+ ApplicationID: 10,
+ Accounts: []basics.Address{
+ acctAddr(grpIdx + txnIdx + 1),
+ acctAddr(grpIdx + txnIdx + 1),
+ },
+ ForeignApps: []basics.AppIndex{
+ 2001,
+ 2002,
+ },
+ ForeignAssets: []basics.AssetIndex{
+ 1001,
+ },
+ },
+ },
+ }
+ }
+ }
+ rnd := basics.Round(5)
+ var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
+ var ledger = &prefetcherTestLedger{
+ round: rnd,
+ balances: make(map[basics.Address]ledgercore.AccountData),
+ creators: make(map[basics.CreatableIndex]basics.Address),
+ }
+ ledger.balances[acctAddr(1)] = ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ }
+
+ b.ResetTimer()
+ preloadedTxnGroupsCh := prefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[protocol.ConsensusCurrentVersion])
+ for k := range preloadedTxnGroupsCh {
+ require.NoError(b, k.err)
+ }
+}
+
+func BenchmarkPrefetcherPayment(b *testing.B) {
+ acctAddrPtr := func(i int) (o *basics.Address) {
+ o = new(basics.Address)
+ o[0] = byte(i)
+ o[1] = byte(i >> 8)
+ o[2] = byte(i >> 16)
+ return
+ }
+ acctAddr := func(i int) (o basics.Address) {
+ t := *acctAddrPtr(i)
+ copy(o[:], t[:])
+ return
+ }
+
+ txnGroupLen := 16
+ groups := make([][]transactions.SignedTxnWithAD, 1+b.N/txnGroupLen)
+ for grpIdx := range groups {
+ groups[grpIdx] = make([]transactions.SignedTxnWithAD, txnGroupLen)
+ for txnIdx := range groups[grpIdx] {
+ groups[grpIdx][txnIdx].SignedTxn = transactions.SignedTxn{
+ Txn: transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: acctAddr(grpIdx + txnIdx),
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: acctAddr(grpIdx + txnIdx + 1),
+ CloseRemainderTo: acctAddr(grpIdx + txnIdx + 2),
+ },
+ },
+ }
+ }
+ }
+ rnd := basics.Round(5)
+ var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
+ var ledger = &prefetcherTestLedger{
+ round: rnd,
+ balances: make(map[basics.Address]ledgercore.AccountData),
+ creators: make(map[basics.CreatableIndex]basics.Address),
+ }
+ ledger.balances[acctAddr(1)] = ledgercore.AccountData{
+ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
+ }
+
+ b.ResetTimer()
+ preloadedTxnGroupsCh := prefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[protocol.ConsensusCurrentVersion])
+ for k := range preloadedTxnGroupsCh {
+ require.NoError(b, k.err)
+ }
+}
+func BenchmarkChannelWrites(b *testing.B) {
+ b.Run("groupTaskDone", func(b *testing.B) {
+ c := make(chan groupTaskDone, b.N)
+ for i := 0; i < b.N; i++ {
+ c <- groupTaskDone{groupIdx: i}
+ }
+ })
+
+ b.Run("int64", func(b *testing.B) {
+ c := make(chan int64, b.N)
+ for i := int64(0); i < int64(b.N); i++ {
+ c <- i
+ }
+ })
+}