Diffstat (limited to 'ledger/acctupdates_test.go')
-rw-r--r--   ledger/acctupdates_test.go   568
1 file changed, 135 insertions(+), 433 deletions(-)
diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go
index 90ca7d205..27306af53 100644
--- a/ledger/acctupdates_test.go
+++ b/ledger/acctupdates_test.go
@@ -24,7 +24,6 @@ import (
"fmt"
"io/ioutil"
"os"
- "path/filepath"
"runtime"
"strings"
"sync"
@@ -37,13 +36,18 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/ledger/internal"
"github.com/algorand/go-algorand/ledger/ledgercore"
+ ledgertesting "github.com/algorand/go-algorand/ledger/testing"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/db"
)
+var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
+var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36}
+
type mockLedgerForTracker struct {
dbs db.Pair
blocks []blockEntry
@@ -52,6 +56,10 @@ type mockLedgerForTracker struct {
filename string
inMemory bool
consensusParams config.ConsensusParams
+ accts map[basics.Address]basics.AccountData
+
+ // trackerRegistry manages persistence into the DB, so it has to be present here even for a single-tracker test
+ trackers trackerRegistry
}
func accumulateTotals(t testing.TB, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData, rewardLevel uint64) (totals ledgercore.AccountTotals) {
@@ -84,7 +92,7 @@ func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount in
}
}
consensusParams := config.Consensus[consensusVersion]
- return &mockLedgerForTracker{dbs: dbs, log: dblogger, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams}
+ return &mockLedgerForTracker{dbs: dbs, log: dblogger, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]}
}
// fork creates another database which has the same content as the current one. Works only for non-memory databases.
@@ -102,8 +110,12 @@ func (ml *mockLedgerForTracker) fork(t testing.TB) *mockLedgerForTracker {
log: dblogger,
blocks: make([]blockEntry, len(ml.blocks)),
deltas: make([]ledgercore.StateDelta, len(ml.deltas)),
+ accts: make(map[basics.Address]basics.AccountData),
filename: fn,
}
+ for k, v := range ml.accts {
+ newLedgerTracker.accts[k] = v
+ }
copy(newLedgerTracker.blocks, ml.blocks)
copy(newLedgerTracker.deltas, ml.deltas)
@@ -126,6 +138,8 @@ func (ml *mockLedgerForTracker) fork(t testing.TB) *mockLedgerForTracker {
}
func (ml *mockLedgerForTracker) Close() {
+ ml.trackers.close()
+
ml.dbs.Close()
// delete the database files of non-memory instances.
if !ml.inMemory {
@@ -145,7 +159,7 @@ func (ml *mockLedgerForTracker) addMockBlock(be blockEntry, delta ledgercore.Sta
return nil
}
-func (ml *mockLedgerForTracker) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger ledgerForEvaluator) (ledgercore.StateDelta, error) {
+func (ml *mockLedgerForTracker) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger internal.LedgerForEvaluator) (ledgercore.StateDelta, error) {
// support returning the deltas if the client explicitly provided them by calling addMockBlock, otherwise,
// just return an empty state delta ( since the client clearly didn't care about these )
if len(ml.deltas) > int(blk.Round()) {
@@ -195,6 +209,10 @@ func (ml *mockLedgerForTracker) GenesisProto() config.ConsensusParams {
return ml.consensusParams
}
+func (ml *mockLedgerForTracker) GenesisAccounts() map[basics.Address]basics.AccountData {
+ return ml.accts
+}
+
// this function used to be in acctupdates.go, but we were never using it for production purposes. This
// function has a conceptual flaw in that it attempts to load the entire balances into memory. This might
// not work if we have large number of balances. On these unit testing, however, it's not the case, and it's
@@ -226,15 +244,28 @@ func (au *accountUpdates) allBalances(rnd basics.Round) (bals map[basics.Address
return
}
+func newAcctUpdates(tb testing.TB, l *mockLedgerForTracker, conf config.Local, dbPathPrefix string) *accountUpdates {
+ au := &accountUpdates{}
+ au.initialize(conf)
+ _, err := trackerDBInitialize(l, false, ".")
+ require.NoError(tb, err)
+
+ l.trackers.initialize(l, []ledgerTracker{au}, conf)
+ err = l.trackers.loadFromDisk(l)
+ require.NoError(tb, err)
+
+ return au
+}
+
func checkAcctUpdates(t *testing.T, au *accountUpdates, base basics.Round, latestRnd basics.Round, accts []map[basics.Address]basics.AccountData, rewards []uint64, proto config.ConsensusParams) {
latest := au.latest()
- require.Equal(t, latest, latestRnd)
+ require.Equal(t, latestRnd, latest)
_, err := au.Totals(latest + 1)
require.Error(t, err)
var validThrough basics.Round
- _, validThrough, err = au.LookupWithoutRewards(latest+1, randomAddress())
+ _, validThrough, err = au.LookupWithoutRewards(latest+1, ledgertesting.RandomAddress())
require.Error(t, err)
require.Equal(t, basics.Round(0), validThrough)
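
The hunk above introduces the newAcctUpdates helper, which replaces the old au.initialize(...) / au.loadFromDisk(ml) pair: the mock ledger now owns a trackerRegistry, and tests drive commits and flushes through it. The following is a minimal sketch of the resulting test setup, assembled only from identifiers that appear in this diff; the test name TestAcctUpdatesSetupSketch and the concrete values are illustrative and are not part of the change itself.

func TestAcctUpdatesSetupSketch(t *testing.T) {
	// Genesis accounts plus the pool/sink addresses, mirroring TestAcctUpdates.
	accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
	pooldata := basics.AccountData{Status: basics.NotParticipating}
	pooldata.MicroAlgos.Raw = 100 * 1000 * 1000 * 1000 * 1000
	accts[0][testPoolAddr] = pooldata
	sinkdata := basics.AccountData{Status: basics.NotParticipating}
	sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
	accts[0][testSinkAddr] = sinkdata

	// The mock ledger owns the trackerRegistry; newAcctUpdates registers the
	// accountUpdates tracker with it and loads it from disk.
	ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
	defer ml.Close()
	conf := config.GetDefaultLocal()
	au := newAcctUpdates(t, ml, conf, ".")
	defer au.close()

	// Flushes are now requested through the registry rather than through au.
	ml.trackers.lastFlushTime = time.Time{}
	ml.trackers.committedUpTo(basics.Round(1))
	ml.trackers.waitAccountsWriting()
}
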
@@ -242,7 +273,7 @@ func checkAcctUpdates(t *testing.T, au *accountUpdates, base basics.Round, lates
_, err := au.Totals(base - 1)
require.Error(t, err)
- _, validThrough, err = au.LookupWithoutRewards(base-1, randomAddress())
+ _, validThrough, err = au.LookupWithoutRewards(base-1, ledgertesting.RandomAddress())
require.Error(t, err)
require.Equal(t, basics.Round(0), validThrough)
}
@@ -301,7 +332,7 @@ func checkAcctUpdates(t *testing.T, au *accountUpdates, base basics.Round, lates
require.Equal(t, totals.Participating().Raw, totalOnline+totalOffline)
require.Equal(t, totals.All().Raw, totalOnline+totalOffline+totalNotPart)
- d, validThrough, err := au.LookupWithoutRewards(rnd, randomAddress())
+ d, validThrough, err := au.LookupWithoutRewards(rnd, ledgertesting.RandomAddress())
require.NoError(t, err)
require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), fmt.Sprintf("validThrough :%v\nrnd :%v\n", validThrough, rnd))
require.Equal(t, d, basics.AccountData{})
@@ -334,7 +365,7 @@ func TestAcctUpdates(t *testing.T) {
}
proto := config.Consensus[protocol.ConsensusCurrentVersion]
- accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
rewardsLevels := []uint64{0}
pooldata := basics.AccountData{}
@@ -350,13 +381,10 @@ func TestAcctUpdates(t *testing.T) {
ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
defer ml.Close()
- au := &accountUpdates{}
- au.initialize(config.GetDefaultLocal(), ".", proto, accts[0])
+ conf := config.GetDefaultLocal()
+ au := newAcctUpdates(t, ml, conf, ".")
defer au.close()
- err := au.loadFromDisk(ml)
- require.NoError(t, err)
-
// cover 10 genesis blocks
rewardLevel := uint64(0)
for i := 1; i < 10; i++ {
@@ -369,13 +397,14 @@ func TestAcctUpdates(t *testing.T) {
// lastCreatableID stores asset or app max used index to get rid of conflicts
lastCreatableID := crypto.RandUint64() % 512
knownCreatables := make(map[basics.CreatableIndex]bool)
+
for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ {
rewardLevelDelta := crypto.RandUint64() % 5
rewardLevel += rewardLevelDelta
var updates ledgercore.AccountDeltas
var totals map[basics.Address]basics.AccountData
base := accts[i-1]
- updates, totals, lastCreatableID = randomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID)
+ updates, totals, lastCreatableID = ledgertesting.RandomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -405,14 +434,35 @@ func TestAcctUpdates(t *testing.T) {
for i := basics.Round(0); i < 15; i++ {
// Clear the timer to ensure a flush
- au.lastFlushTime = time.Time{}
+ ml.trackers.lastFlushTime = time.Time{}
- au.committedUpTo(basics.Round(proto.MaxBalLookback) + i)
- au.waitAccountsWriting()
+ ml.trackers.committedUpTo(basics.Round(proto.MaxBalLookback) + i)
+ ml.trackers.waitAccountsWriting()
checkAcctUpdates(t, au, i, basics.Round(proto.MaxBalLookback+14), accts, rewardsLevels, proto)
}
-}
+ // check the account totals.
+ var dbRound basics.Round
+ err := ml.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
+ dbRound, err = accountsRound(tx)
+ return
+ })
+ require.NoError(t, err)
+
+ var updates ledgercore.AccountDeltas
+ for addr, acctData := range accts[dbRound] {
+ updates.Upsert(addr, acctData)
+ }
+
+ expectedTotals := ledgertesting.CalculateNewRoundAccountTotals(t, updates, rewardsLevels[dbRound], proto, nil, ledgercore.AccountTotals{})
+ var actualTotals ledgercore.AccountTotals
+ err = ml.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
+ actualTotals, err = accountsTotals(tx, false)
+ return
+ })
+ require.NoError(t, err)
+ require.Equal(t, expectedTotals, actualTotals)
+}
func TestAcctUpdatesFastUpdates(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -421,7 +471,7 @@ func TestAcctUpdatesFastUpdates(t *testing.T) {
}
proto := config.Consensus[protocol.ConsensusCurrentVersion]
- accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
rewardsLevels := []uint64{0}
pooldata := basics.AccountData{}
@@ -437,15 +487,11 @@ func TestAcctUpdatesFastUpdates(t *testing.T) {
ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
defer ml.Close()
- au := &accountUpdates{}
conf := config.GetDefaultLocal()
conf.CatchpointInterval = 1
- au.initialize(conf, ".", proto, accts[0])
+ au := newAcctUpdates(t, ml, conf, ".")
defer au.close()
- err := au.loadFromDisk(ml)
- require.NoError(t, err)
-
// cover 10 genesis blocks
rewardLevel := uint64(0)
for i := 1; i < 10; i++ {
@@ -460,7 +506,7 @@ func TestAcctUpdatesFastUpdates(t *testing.T) {
for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ {
rewardLevelDelta := crypto.RandUint64() % 5
rewardLevel += rewardLevelDelta
- updates, totals := randomDeltasBalanced(1, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(1, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -487,7 +533,7 @@ func TestAcctUpdatesFastUpdates(t *testing.T) {
wg.Add(1)
go func(round basics.Round) {
defer wg.Done()
- au.committedUpTo(round)
+ ml.trackers.committedUpTo(round)
}(i)
}
wg.Wait()
@@ -513,7 +559,7 @@ func BenchmarkBalancesChanges(b *testing.B) {
initialRounds := uint64(1)
accountsCount := 5000
- accts := []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)}
rewardsLevels := []uint64{0}
pooldata := basics.AccountData{}
@@ -529,10 +575,8 @@ func BenchmarkBalancesChanges(b *testing.B) {
ml := makeMockLedgerForTracker(b, true, int(initialRounds), protocolVersion, accts)
defer ml.Close()
- au := &accountUpdates{}
- au.initialize(config.GetDefaultLocal(), ".", proto, accts[0])
- err := au.loadFromDisk(ml)
- require.NoError(b, err)
+ conf := config.GetDefaultLocal()
+ au := newAcctUpdates(b, ml, conf, ".")
defer au.close()
// cover initialRounds genesis blocks
@@ -550,7 +594,7 @@ func BenchmarkBalancesChanges(b *testing.B) {
accountChanges = accountsCount - 2 - int(basics.Round(proto.MaxBalLookback+uint64(b.N))+i)
}
- updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(b, err)
@@ -575,18 +619,18 @@ func BenchmarkBalancesChanges(b *testing.B) {
}
for i := proto.MaxBalLookback; i < proto.MaxBalLookback+initialRounds; i++ {
// Clear the timer to ensure a flush
- au.lastFlushTime = time.Time{}
- au.committedUpTo(basics.Round(i))
+ ml.trackers.lastFlushTime = time.Time{}
+ ml.trackers.committedUpTo(basics.Round(i))
}
- au.waitAccountsWriting()
+ ml.trackers.waitAccountsWriting()
b.ResetTimer()
startTime := time.Now()
for i := proto.MaxBalLookback + initialRounds; i < proto.MaxBalLookback+uint64(b.N); i++ {
// Clear the timer to ensure a flush
- au.lastFlushTime = time.Time{}
- au.committedUpTo(basics.Round(i))
+ ml.trackers.lastFlushTime = time.Time{}
+ ml.trackers.committedUpTo(basics.Round(i))
}
- au.waitAccountsWriting()
+ ml.trackers.waitAccountsWriting()
deltaTime := time.Now().Sub(startTime)
if deltaTime > time.Second {
return
@@ -644,7 +688,7 @@ func TestLargeAccountCountCatchpointGeneration(t *testing.T) {
os.RemoveAll("./catchpoints")
}()
- accts := []map[basics.Address]basics.AccountData{randomAccounts(100000, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(100000, true)}
rewardsLevels := []uint64{0}
pooldata := basics.AccountData{}
@@ -660,14 +704,11 @@ func TestLargeAccountCountCatchpointGeneration(t *testing.T) {
ml := makeMockLedgerForTracker(t, true, 10, testProtocolVersion, accts)
defer ml.Close()
- au := &accountUpdates{}
conf := config.GetDefaultLocal()
conf.CatchpointInterval = 1
conf.Archival = true
- au.initialize(conf, ".", protoParams, accts[0])
+ au := newAcctUpdates(t, ml, conf, ".")
defer au.close()
- err := au.loadFromDisk(ml)
- require.NoError(t, err)
// cover 10 genesis blocks
rewardLevel := uint64(0)
@@ -679,7 +720,7 @@ func TestLargeAccountCountCatchpointGeneration(t *testing.T) {
for i := basics.Round(10); i < basics.Round(protoParams.MaxBalLookback+5); i++ {
rewardLevelDelta := crypto.RandUint64() % 5
rewardLevel += rewardLevelDelta
- updates, totals := randomDeltasBalanced(1, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(1, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -703,9 +744,9 @@ func TestLargeAccountCountCatchpointGeneration(t *testing.T) {
accts = append(accts, totals)
rewardsLevels = append(rewardsLevels, rewardLevel)
- au.committedUpTo(i)
+ ml.trackers.committedUpTo(i)
if i%2 == 1 {
- au.waitAccountsWriting()
+ ml.trackers.waitAccountsWriting()
}
}
}
@@ -736,7 +777,7 @@ func TestAcctUpdatesUpdatesCorrectness(t *testing.T) {
inMemory := true
testFunction := func(t *testing.T) {
- accts := []map[basics.Address]basics.AccountData{randomAccounts(9, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(9, true)}
pooldata := basics.AccountData{}
pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
@@ -768,13 +809,10 @@ func TestAcctUpdatesUpdatesCorrectness(t *testing.T) {
accts[0][addr] = accountData
}
- au := &accountUpdates{}
- au.initialize(config.GetDefaultLocal(), ".", protoParams, accts[0])
+ conf := config.GetDefaultLocal()
+ au := newAcctUpdates(t, ml, conf, ".")
defer au.close()
- err := au.loadFromDisk(ml)
- require.NoError(t, err)
-
// cover 10 genesis blocks
rewardLevel := uint64(0)
for i := 1; i < 10; i++ {
@@ -865,10 +903,10 @@ func TestAcctUpdatesUpdatesCorrectness(t *testing.T) {
delta.Accts.Upsert(addr, ad)
}
au.newBlock(blk, delta)
- au.committedUpTo(i)
+ ml.trackers.committedUpTo(i)
}
lastRound := i - 1
- au.waitAccountsWriting()
+ ml.trackers.waitAccountsWriting()
for idx, addr := range moneyAccounts {
balance, validThrough, err := au.LookupWithoutRewards(lastRound, addr)
@@ -888,55 +926,6 @@ func TestAcctUpdatesUpdatesCorrectness(t *testing.T) {
t.Run("DiskDB", testFunction)
}
-// TestAcctUpdatesDeleteStoredCatchpoints - The goal of this test is to verify that the deleteStoredCatchpoints function works correctly.
-// it doing so by filling up the storedcatchpoints with dummy catchpoint file entries, as well as creating these dummy files on disk.
-// ( the term dummy is only because these aren't real catchpoint files, but rather a zero-length file ). Then, the test call the function
-// and ensures that it did not errored, the catchpoint files were correctly deleted, and that deleteStoredCatchpoints contains no more
-// entries.
-func TestAcctUpdatesDeleteStoredCatchpoints(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- proto := config.Consensus[protocol.ConsensusCurrentVersion]
- accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)}
-
- ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
- defer ml.Close()
-
- au := &accountUpdates{}
- conf := config.GetDefaultLocal()
- conf.CatchpointInterval = 1
- au.initialize(conf, ".", proto, accts[0])
- defer au.close()
-
- err := au.loadFromDisk(ml)
- require.NoError(t, err)
-
- dummyCatchpointFilesToCreate := 42
-
- for i := 0; i < dummyCatchpointFilesToCreate; i++ {
- f, err := os.Create(fmt.Sprintf("./dummy_catchpoint_file-%d", i))
- require.NoError(t, err)
- err = f.Close()
- require.NoError(t, err)
- }
-
- for i := 0; i < dummyCatchpointFilesToCreate; i++ {
- err := au.accountsq.storeCatchpoint(context.Background(), basics.Round(i), fmt.Sprintf("./dummy_catchpoint_file-%d", i), "", 0)
- require.NoError(t, err)
- }
- err = au.deleteStoredCatchpoints(context.Background(), au.accountsq)
- require.NoError(t, err)
-
- for i := 0; i < dummyCatchpointFilesToCreate; i++ {
- // ensure that all the files were deleted.
- _, err := os.Open(fmt.Sprintf("./dummy_catchpoint_file-%d", i))
- require.True(t, os.IsNotExist(err))
- }
- fileNames, err := au.accountsq.getOldestCatchpointFiles(context.Background(), dummyCatchpointFilesToCreate, 0)
- require.NoError(t, err)
- require.Equal(t, 0, len(fileNames))
-}
-
// listAndCompareComb lists the assets/applications and then compares against the expected
// It repeats with different combinations of the limit parameters
func listAndCompareComb(t *testing.T, au *accountUpdates, expected map[basics.CreatableIndex]ledgercore.ModifiedCreatable) {
@@ -1074,7 +1063,7 @@ func TestListCreatables(t *testing.T) {
require.NoError(t, err)
au := &accountUpdates{}
- au.accountsq, err = accountsDbInit(tx, tx)
+ au.accountsq, err = accountsInitDbQueries(tx, tx)
require.NoError(t, err)
// ******* All results are obtained from the cache. Empty database *******
@@ -1116,97 +1105,6 @@ func TestListCreatables(t *testing.T) {
listAndCompareComb(t, au, expectedDbImage)
}
-func TestIsWritingCatchpointFile(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- au := &accountUpdates{}
-
- au.catchpointWriting = -1
- ans := au.IsWritingCatchpointFile()
- require.True(t, ans)
-
- au.catchpointWriting = 0
- ans = au.IsWritingCatchpointFile()
- require.False(t, ans)
-}
-
-func TestGetCatchpointStream(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- proto := config.Consensus[protocol.ConsensusCurrentVersion]
-
- accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)}
-
- ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
- defer ml.Close()
-
- au := &accountUpdates{}
- conf := config.GetDefaultLocal()
- conf.CatchpointInterval = 1
- au.initialize(conf, ".", proto, accts[0])
- defer au.close()
-
- err := au.loadFromDisk(ml)
- require.NoError(t, err)
-
- filesToCreate := 4
-
- temporaryDirectroy, err := ioutil.TempDir(os.TempDir(), "catchpoints")
- require.NoError(t, err)
- defer func() {
- os.RemoveAll(temporaryDirectroy)
- }()
- catchpointsDirectory := filepath.Join(temporaryDirectroy, "catchpoints")
- err = os.Mkdir(catchpointsDirectory, 0777)
- require.NoError(t, err)
-
- au.dbDirectory = temporaryDirectroy
-
- // Create the catchpoint files with dummy data
- for i := 0; i < filesToCreate; i++ {
- fileName := filepath.Join("catchpoints", fmt.Sprintf("%d.catchpoint", i))
- data := []byte{byte(i), byte(i + 1), byte(i + 2)}
- err = ioutil.WriteFile(filepath.Join(temporaryDirectroy, fileName), data, 0666)
- require.NoError(t, err)
-
- // Store the catchpoint into the database
- err := au.accountsq.storeCatchpoint(context.Background(), basics.Round(i), fileName, "", int64(len(data)))
- require.NoError(t, err)
- }
-
- dataRead := make([]byte, 3)
- var n int
-
- // File on disk, and database has the record
- reader, err := au.GetCatchpointStream(basics.Round(1))
- n, err = reader.Read(dataRead)
- require.NoError(t, err)
- require.Equal(t, 3, n)
- outData := []byte{1, 2, 3}
- require.Equal(t, outData, dataRead)
- len, err := reader.Size()
- require.NoError(t, err)
- require.Equal(t, int64(3), len)
-
- // File deleted, but record in the database
- err = os.Remove(filepath.Join(temporaryDirectroy, "catchpoints", "2.catchpoint"))
- reader, err = au.GetCatchpointStream(basics.Round(2))
- require.Equal(t, ledgercore.ErrNoEntry{}, err)
- require.Nil(t, reader)
-
- // File on disk, but database lost the record
- err = au.accountsq.storeCatchpoint(context.Background(), basics.Round(3), "", "", 0)
- reader, err = au.GetCatchpointStream(basics.Round(3))
- n, err = reader.Read(dataRead)
- require.NoError(t, err)
- require.Equal(t, 3, n)
- outData = []byte{3, 4, 5}
- require.Equal(t, outData, dataRead)
-
- err = au.deleteStoredCatchpoints(context.Background(), au.accountsq)
- require.NoError(t, err)
-}
-
func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err error) {
rows, err := tx.Query("SELECT address, data FROM accountbase")
if err != nil {
@@ -1246,7 +1144,7 @@ func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err er
func BenchmarkLargeMerkleTrieRebuild(b *testing.B) {
proto := config.Consensus[protocol.ConsensusCurrentVersion]
- accts := []map[basics.Address]basics.AccountData{randomAccounts(5, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(5, true)}
pooldata := basics.AccountData{}
pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
@@ -1261,21 +1159,17 @@ func BenchmarkLargeMerkleTrieRebuild(b *testing.B) {
ml := makeMockLedgerForTracker(b, true, 10, protocol.ConsensusCurrentVersion, accts)
defer ml.Close()
- au := &accountUpdates{}
cfg := config.GetDefaultLocal()
cfg.Archival = true
- au.initialize(cfg, ".", proto, accts[0])
+ au := newAcctUpdates(b, ml, cfg, ".")
defer au.close()
- err := au.loadFromDisk(ml)
- require.NoError(b, err)
-
// at this point, the database was created. We want to fill the accounts data
accountsNumber := 6000000 * b.N
for i := 0; i < accountsNumber-5-2; { // subtract the account we've already created above, plus the sink/reward
var updates compactAccountDeltas
for k := 0; i < accountsNumber-5-2 && k < 1024; k++ {
- addr := randomAddress()
+ addr := ledgertesting.RandomAddress()
acctData := basics.AccountData{}
acctData.MicroAlgos.Raw = 1
updates.upsert(addr, accountDelta{new: acctData})
@@ -1289,87 +1183,20 @@ func BenchmarkLargeMerkleTrieRebuild(b *testing.B) {
require.NoError(b, err)
}
- err = ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
- return updateAccountsRound(tx, 0, 1)
+ err := ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
+ return updateAccountsHashRound(tx, 1)
})
require.NoError(b, err)
au.close()
b.ResetTimer()
- err = au.loadFromDisk(ml)
+ err = au.loadFromDisk(ml, 0)
require.NoError(b, err)
b.StopTimer()
b.ReportMetric(float64(accountsNumber), "entries/trie")
}
-func BenchmarkLargeCatchpointWriting(b *testing.B) {
- proto := config.Consensus[protocol.ConsensusCurrentVersion]
-
- accts := []map[basics.Address]basics.AccountData{randomAccounts(5, true)}
-
- pooldata := basics.AccountData{}
- pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
- pooldata.Status = basics.NotParticipating
- accts[0][testPoolAddr] = pooldata
-
- sinkdata := basics.AccountData{}
- sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
- sinkdata.Status = basics.NotParticipating
- accts[0][testSinkAddr] = sinkdata
-
- ml := makeMockLedgerForTracker(b, true, 10, protocol.ConsensusCurrentVersion, accts)
- defer ml.Close()
-
- au := &accountUpdates{}
- cfg := config.GetDefaultLocal()
- cfg.Archival = true
- au.initialize(cfg, ".", proto, accts[0])
- defer au.close()
-
- temporaryDirectroy, err := ioutil.TempDir(os.TempDir(), "catchpoints")
- require.NoError(b, err)
- defer func() {
- os.RemoveAll(temporaryDirectroy)
- }()
- catchpointsDirectory := filepath.Join(temporaryDirectroy, "catchpoints")
- err = os.Mkdir(catchpointsDirectory, 0777)
- require.NoError(b, err)
-
- au.dbDirectory = temporaryDirectroy
-
- err = au.loadFromDisk(ml)
- require.NoError(b, err)
-
- // at this point, the database was created. We want to fill the accounts data
- accountsNumber := 6000000 * b.N
- err = ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
- for i := 0; i < accountsNumber-5-2; { // subtract the account we've already created above, plus the sink/reward
- var updates compactAccountDeltas
- for k := 0; i < accountsNumber-5-2 && k < 1024; k++ {
- addr := randomAddress()
- acctData := basics.AccountData{}
- acctData.MicroAlgos.Raw = 1
- updates.upsert(addr, accountDelta{new: acctData})
- i++
- }
-
- _, err = accountsNewRound(tx, updates, nil, proto, basics.Round(1))
- if err != nil {
- return
- }
- }
-
- return updateAccountsRound(tx, 0, 1)
- })
- require.NoError(b, err)
-
- b.ResetTimer()
- au.generateCatchpoint(basics.Round(0), "0#ABCD", crypto.Digest{}, time.Second)
- b.StopTimer()
- b.ReportMetric(float64(accountsNumber), "accounts")
-}
-
func BenchmarkCompactDeltas(b *testing.B) {
b.Run("account-deltas", func(b *testing.B) {
if b.N < 500 {
@@ -1461,129 +1288,6 @@ func TestCompactDeltas(t *testing.T) {
}
-func TestReproducibleCatchpointLabels(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- if runtime.GOARCH == "arm" || runtime.GOARCH == "arm64" {
- t.Skip("This test is too slow on ARM and causes travis builds to time out")
- }
- // create new protocol version, which has lower lookback
- testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestReproducibleCatchpointLabels")
- protoParams := config.Consensus[protocol.ConsensusCurrentVersion]
- protoParams.MaxBalLookback = 32
- protoParams.SeedLookback = 2
- protoParams.SeedRefreshInterval = 8
- config.Consensus[testProtocolVersion] = protoParams
- defer func() {
- delete(config.Consensus, testProtocolVersion)
- }()
-
- accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)}
- rewardsLevels := []uint64{0}
-
- pooldata := basics.AccountData{}
- pooldata.MicroAlgos.Raw = 100 * 1000 * 1000 * 1000 * 1000
- pooldata.Status = basics.NotParticipating
- accts[0][testPoolAddr] = pooldata
-
- sinkdata := basics.AccountData{}
- sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
- sinkdata.Status = basics.NotParticipating
- accts[0][testSinkAddr] = sinkdata
-
- ml := makeMockLedgerForTracker(t, false, 1, testProtocolVersion, accts)
- defer ml.Close()
-
- au := &accountUpdates{}
- cfg := config.GetDefaultLocal()
- cfg.CatchpointInterval = 50
- cfg.CatchpointTracking = 1
- au.initialize(cfg, ".", protoParams, accts[0])
- defer au.close()
-
- err := au.loadFromDisk(ml)
- require.NoError(t, err)
-
- rewardLevel := uint64(0)
-
- const testCatchpointLabelsCount = 5
-
- // lastCreatableID stores asset or app max used index to get rid of conflicts
- lastCreatableID := crypto.RandUint64() % 512
- knownCreatables := make(map[basics.CreatableIndex]bool)
- catchpointLabels := make(map[basics.Round]string)
- ledgerHistory := make(map[basics.Round]*mockLedgerForTracker)
- roundDeltas := make(map[basics.Round]ledgercore.StateDelta)
- for i := basics.Round(1); i <= basics.Round(testCatchpointLabelsCount*cfg.CatchpointInterval); i++ {
- rewardLevelDelta := crypto.RandUint64() % 5
- rewardLevel += rewardLevelDelta
- var updates ledgercore.AccountDeltas
- var totals map[basics.Address]basics.AccountData
- base := accts[i-1]
- updates, totals, lastCreatableID = randomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID)
- prevTotals, err := au.Totals(basics.Round(i - 1))
- require.NoError(t, err)
-
- newPool := totals[testPoolAddr]
- newPool.MicroAlgos.Raw -= prevTotals.RewardUnits() * rewardLevelDelta
- updates.Upsert(testPoolAddr, newPool)
- totals[testPoolAddr] = newPool
-
- blk := bookkeeping.Block{
- BlockHeader: bookkeeping.BlockHeader{
- Round: basics.Round(i),
- },
- }
- blk.RewardsLevel = rewardLevel
- blk.CurrentProtocol = testProtocolVersion
- delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0)
- delta.Accts.MergeAccounts(updates)
- delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables)
- au.newBlock(blk, delta)
- au.committedUpTo(i)
- ml.addMockBlock(blockEntry{block: blk}, delta)
- accts = append(accts, totals)
- rewardsLevels = append(rewardsLevels, rewardLevel)
- roundDeltas[i] = delta
-
- // if this is a catchpoint round, save the label.
- if uint64(i)%cfg.CatchpointInterval == 0 {
- au.waitAccountsWriting()
- catchpointLabels[i] = au.GetLastCatchpointLabel()
- ledgerHistory[i] = ml.fork(t)
- defer ledgerHistory[i].Close()
- }
- }
-
- // test in revese what happens when we try to repeat the exact same blocks.
- // start off with the catchpoint before the last one
- startingRound := basics.Round((testCatchpointLabelsCount - 1) * cfg.CatchpointInterval)
- for ; startingRound > basics.Round(cfg.CatchpointInterval); startingRound -= basics.Round(cfg.CatchpointInterval) {
- au.close()
- err := au.loadFromDisk(ledgerHistory[startingRound])
- require.NoError(t, err)
-
- for i := startingRound + 1; i <= basics.Round(testCatchpointLabelsCount*cfg.CatchpointInterval); i++ {
- blk := bookkeeping.Block{
- BlockHeader: bookkeeping.BlockHeader{
- Round: basics.Round(i),
- },
- }
- blk.RewardsLevel = rewardsLevels[i]
- blk.CurrentProtocol = testProtocolVersion
- delta := roundDeltas[i]
- au.newBlock(blk, delta)
- au.committedUpTo(i)
-
- // if this is a catchpoint round, check the label.
- if uint64(i)%cfg.CatchpointInterval == 0 {
- au.waitAccountsWriting()
- require.Equal(t, catchpointLabels[i], au.GetLastCatchpointLabel())
- }
- }
- }
-}
-
// TestCachesInitialization test the functionality of the initializeCaches cache.
func TestCachesInitialization(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -1594,7 +1298,7 @@ func TestCachesInitialization(t *testing.T) {
initialRounds := uint64(1)
accountsCount := 5
- accts := []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)}
rewardsLevels := []uint64{0}
pooldata := basics.AccountData{}
@@ -1611,10 +1315,8 @@ func TestCachesInitialization(t *testing.T) {
ml.log.SetLevel(logging.Warn)
defer ml.Close()
- au := &accountUpdates{}
- au.initialize(config.GetDefaultLocal(), ".", proto, accts[0])
- err := au.loadFromDisk(ml)
- require.NoError(t, err)
+ conf := config.GetDefaultLocal()
+ au := newAcctUpdates(t, ml, conf, ".")
// cover initialRounds genesis blocks
rewardLevel := uint64(0)
@@ -1630,7 +1332,7 @@ func TestCachesInitialization(t *testing.T) {
rewardLevel += rewardLevelDelta
accountChanges := 2
- updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -1652,15 +1354,15 @@ func TestCachesInitialization(t *testing.T) {
delta.Totals = accumulateTotals(t, protocol.ConsensusCurrentVersion, []map[basics.Address]basics.AccountData{totals}, rewardLevel)
ml.addMockBlock(blockEntry{block: blk}, delta)
au.newBlock(blk, delta)
- au.committedUpTo(basics.Round(i))
- au.waitAccountsWriting()
+ ml.trackers.committedUpTo(basics.Round(i))
+ ml.trackers.waitAccountsWriting()
accts = append(accts, totals)
rewardsLevels = append(rewardsLevels, rewardLevel)
}
au.close()
// reset the accounts, since their balances are now changed due to the rewards.
- accts = []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)}
+ accts = []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)}
// create another mocked ledger, but this time with a fresh new tracker database.
ml2 := makeMockLedgerForTracker(t, true, int(initialRounds), protocolVersion, accts)
@@ -1671,15 +1373,13 @@ func TestCachesInitialization(t *testing.T) {
ml2.blocks = ml.blocks
ml2.deltas = ml.deltas
- au = &accountUpdates{}
- au.initialize(config.GetDefaultLocal(), ".", proto, accts[0])
- err = au.loadFromDisk(ml2)
- require.NoError(t, err)
+ conf = config.GetDefaultLocal()
+ au = newAcctUpdates(t, ml2, conf, ".")
defer au.close()
// make sure the deltas array end up containing only the most recent 320 rounds.
require.Equal(t, int(proto.MaxBalLookback), len(au.deltas))
- require.Equal(t, recoveredLedgerRound-basics.Round(proto.MaxBalLookback), au.dbRound)
+ require.Equal(t, recoveredLedgerRound-basics.Round(proto.MaxBalLookback), au.cachedDBRound)
}
// TestSplittingConsensusVersionCommits tests the a sequence of commits that spans over multiple consensus versions works correctly.
@@ -1692,7 +1392,7 @@ func TestSplittingConsensusVersionCommits(t *testing.T) {
initialRounds := uint64(1)
accountsCount := 5
- accts := []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)}
rewardsLevels := []uint64{0}
pooldata := basics.AccountData{}
@@ -1709,9 +1409,10 @@ func TestSplittingConsensusVersionCommits(t *testing.T) {
ml.log.SetLevel(logging.Warn)
defer ml.Close()
- au := &accountUpdates{}
- au.initialize(config.GetDefaultLocal(), ".", initialProtoParams, accts[0])
- err := au.loadFromDisk(ml)
+ conf := config.GetDefaultLocal()
+ au := newAcctUpdates(t, ml, conf, ".")
+
+ err := au.loadFromDisk(ml, 0)
require.NoError(t, err)
defer au.close()
@@ -1730,7 +1431,7 @@ func TestSplittingConsensusVersionCommits(t *testing.T) {
rewardLevel += rewardLevelDelta
accountChanges := 2
- updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -1765,7 +1466,7 @@ func TestSplittingConsensusVersionCommits(t *testing.T) {
rewardLevel += rewardLevelDelta
accountChanges := 2
- updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -1790,10 +1491,10 @@ func TestSplittingConsensusVersionCommits(t *testing.T) {
accts = append(accts, totals)
rewardsLevels = append(rewardsLevels, rewardLevel)
}
- // now, commit and verify that the committedUpTo method broken the range correctly.
- au.committedUpTo(lastRoundToWrite)
- au.waitAccountsWriting()
- require.Equal(t, basics.Round(initialRounds+extraRounds)-1, au.dbRound)
+ // now, commit and verify that the produceCommittingTask method broke the range correctly.
+ ml.trackers.committedUpTo(lastRoundToWrite)
+ ml.trackers.waitAccountsWriting()
+ require.Equal(t, basics.Round(initialRounds+extraRounds)-1, au.cachedDBRound)
}
@@ -1808,7 +1509,7 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) {
initialRounds := uint64(1)
accountsCount := 5
- accts := []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)}
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)}
rewardsLevels := []uint64{0}
pooldata := basics.AccountData{}
@@ -1825,9 +1526,10 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) {
ml.log.SetLevel(logging.Warn)
defer ml.Close()
- au := &accountUpdates{}
- au.initialize(config.GetDefaultLocal(), ".", initialProtoParams, accts[0])
- err := au.loadFromDisk(ml)
+ conf := config.GetDefaultLocal()
+ au := newAcctUpdates(t, ml, conf, ".")
+
+ err := au.loadFromDisk(ml, 0)
require.NoError(t, err)
defer au.close()
@@ -1846,7 +1548,7 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) {
rewardLevel += rewardLevelDelta
accountChanges := 2
- updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -1880,7 +1582,7 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) {
rewardLevel += rewardLevelDelta
accountChanges := 2
- updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -1905,10 +1607,10 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) {
accts = append(accts, totals)
rewardsLevels = append(rewardsLevels, rewardLevel)
}
- // now, commit and verify that the committedUpTo method broken the range correctly.
- au.committedUpTo(endOfFirstNewProtocolSegment)
- au.waitAccountsWriting()
- require.Equal(t, basics.Round(initialRounds+extraRounds)-1, au.dbRound)
+ // now, commit and verify that the produceCommittingTask method broke the range correctly.
+ ml.trackers.committedUpTo(endOfFirstNewProtocolSegment)
+ ml.trackers.waitAccountsWriting()
+ require.Equal(t, basics.Round(initialRounds+extraRounds)-1, au.cachedDBRound)
// write additional extraRounds elements and verify these can be flushed.
for i := endOfFirstNewProtocolSegment + 1; i <= basics.Round(initialRounds+2*extraRounds+initialProtoParams.MaxBalLookback); i++ {
@@ -1916,7 +1618,7 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) {
rewardLevel += rewardLevelDelta
accountChanges := 2
- updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
+ updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)
@@ -1941,9 +1643,9 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) {
accts = append(accts, totals)
rewardsLevels = append(rewardsLevels, rewardLevel)
}
- au.committedUpTo(endOfFirstNewProtocolSegment + basics.Round(extraRounds))
- au.waitAccountsWriting()
- require.Equal(t, basics.Round(initialRounds+2*extraRounds), au.dbRound)
+ ml.trackers.committedUpTo(endOfFirstNewProtocolSegment + basics.Round(extraRounds))
+ ml.trackers.waitAccountsWriting()
+ require.Equal(t, basics.Round(initialRounds+2*extraRounds), au.cachedDBRound)
}
// TestConsecutiveVersion tests the consecutiveVersion method correctness.