diff options
Diffstat (limited to 'ledger/acctupdates_test.go')
-rw-r--r-- | ledger/acctupdates_test.go | 568 |
1 files changed, 135 insertions, 433 deletions
diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index 90ca7d205..27306af53 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -24,7 +24,6 @@ import ( "fmt" "io/ioutil" "os" - "path/filepath" "runtime" "strings" "sync" @@ -37,13 +36,18 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger/internal" "github.com/algorand/go-algorand/ledger/ledgercore" + ledgertesting "github.com/algorand/go-algorand/ledger/testing" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" "github.com/algorand/go-algorand/util/db" ) +var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} +var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36} + type mockLedgerForTracker struct { dbs db.Pair blocks []blockEntry @@ -52,6 +56,10 @@ type mockLedgerForTracker struct { filename string inMemory bool consensusParams config.ConsensusParams + accts map[basics.Address]basics.AccountData + + // trackerRegistry manages persistence into DB so we have to have it here even for a single tracker test + trackers trackerRegistry } func accumulateTotals(t testing.TB, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData, rewardLevel uint64) (totals ledgercore.AccountTotals) { @@ -84,7 +92,7 @@ func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount in } } consensusParams := config.Consensus[consensusVersion] - return &mockLedgerForTracker{dbs: dbs, log: dblogger, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams} + return &mockLedgerForTracker{dbs: dbs, log: dblogger, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]} } // fork creates another database which has the same content as the current one. Works only for non-memory databases. @@ -102,8 +110,12 @@ func (ml *mockLedgerForTracker) fork(t testing.TB) *mockLedgerForTracker { log: dblogger, blocks: make([]blockEntry, len(ml.blocks)), deltas: make([]ledgercore.StateDelta, len(ml.deltas)), + accts: make(map[basics.Address]basics.AccountData), filename: fn, } + for k, v := range ml.accts { + newLedgerTracker.accts[k] = v + } copy(newLedgerTracker.blocks, ml.blocks) copy(newLedgerTracker.deltas, ml.deltas) @@ -126,6 +138,8 @@ func (ml *mockLedgerForTracker) fork(t testing.TB) *mockLedgerForTracker { } func (ml *mockLedgerForTracker) Close() { + ml.trackers.close() + ml.dbs.Close() // delete the database files of non-memory instances. if !ml.inMemory { @@ -145,7 +159,7 @@ func (ml *mockLedgerForTracker) addMockBlock(be blockEntry, delta ledgercore.Sta return nil } -func (ml *mockLedgerForTracker) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger ledgerForEvaluator) (ledgercore.StateDelta, error) { +func (ml *mockLedgerForTracker) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger internal.LedgerForEvaluator) (ledgercore.StateDelta, error) { // support returning the deltas if the client explicitly provided them by calling addMockBlock, otherwise, // just return an empty state delta ( since the client clearly didn't care about these ) if len(ml.deltas) > int(blk.Round()) { @@ -195,6 +209,10 @@ func (ml *mockLedgerForTracker) GenesisProto() config.ConsensusParams { return ml.consensusParams } +func (ml *mockLedgerForTracker) GenesisAccounts() map[basics.Address]basics.AccountData { + return ml.accts +} + // this function used to be in acctupdates.go, but we were never using it for production purposes. This // function has a conceptual flaw in that it attempts to load the entire balances into memory. This might // not work if we have large number of balances. On these unit testing, however, it's not the case, and it's @@ -226,15 +244,28 @@ func (au *accountUpdates) allBalances(rnd basics.Round) (bals map[basics.Address return } +func newAcctUpdates(tb testing.TB, l *mockLedgerForTracker, conf config.Local, dbPathPrefix string) *accountUpdates { + au := &accountUpdates{} + au.initialize(conf) + _, err := trackerDBInitialize(l, false, ".") + require.NoError(tb, err) + + l.trackers.initialize(l, []ledgerTracker{au}, conf) + err = l.trackers.loadFromDisk(l) + require.NoError(tb, err) + + return au +} + func checkAcctUpdates(t *testing.T, au *accountUpdates, base basics.Round, latestRnd basics.Round, accts []map[basics.Address]basics.AccountData, rewards []uint64, proto config.ConsensusParams) { latest := au.latest() - require.Equal(t, latest, latestRnd) + require.Equal(t, latestRnd, latest) _, err := au.Totals(latest + 1) require.Error(t, err) var validThrough basics.Round - _, validThrough, err = au.LookupWithoutRewards(latest+1, randomAddress()) + _, validThrough, err = au.LookupWithoutRewards(latest+1, ledgertesting.RandomAddress()) require.Error(t, err) require.Equal(t, basics.Round(0), validThrough) @@ -242,7 +273,7 @@ func checkAcctUpdates(t *testing.T, au *accountUpdates, base basics.Round, lates _, err := au.Totals(base - 1) require.Error(t, err) - _, validThrough, err = au.LookupWithoutRewards(base-1, randomAddress()) + _, validThrough, err = au.LookupWithoutRewards(base-1, ledgertesting.RandomAddress()) require.Error(t, err) require.Equal(t, basics.Round(0), validThrough) } @@ -301,7 +332,7 @@ func checkAcctUpdates(t *testing.T, au *accountUpdates, base basics.Round, lates require.Equal(t, totals.Participating().Raw, totalOnline+totalOffline) require.Equal(t, totals.All().Raw, totalOnline+totalOffline+totalNotPart) - d, validThrough, err := au.LookupWithoutRewards(rnd, randomAddress()) + d, validThrough, err := au.LookupWithoutRewards(rnd, ledgertesting.RandomAddress()) require.NoError(t, err) require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), fmt.Sprintf("validThrough :%v\nrnd :%v\n", validThrough, rnd)) require.Equal(t, d, basics.AccountData{}) @@ -334,7 +365,7 @@ func TestAcctUpdates(t *testing.T) { } proto := config.Consensus[protocol.ConsensusCurrentVersion] - accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} rewardsLevels := []uint64{0} pooldata := basics.AccountData{} @@ -350,13 +381,10 @@ func TestAcctUpdates(t *testing.T) { ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts) defer ml.Close() - au := &accountUpdates{} - au.initialize(config.GetDefaultLocal(), ".", proto, accts[0]) + conf := config.GetDefaultLocal() + au := newAcctUpdates(t, ml, conf, ".") defer au.close() - err := au.loadFromDisk(ml) - require.NoError(t, err) - // cover 10 genesis blocks rewardLevel := uint64(0) for i := 1; i < 10; i++ { @@ -369,13 +397,14 @@ func TestAcctUpdates(t *testing.T) { // lastCreatableID stores asset or app max used index to get rid of conflicts lastCreatableID := crypto.RandUint64() % 512 knownCreatables := make(map[basics.CreatableIndex]bool) + for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ { rewardLevelDelta := crypto.RandUint64() % 5 rewardLevel += rewardLevelDelta var updates ledgercore.AccountDeltas var totals map[basics.Address]basics.AccountData base := accts[i-1] - updates, totals, lastCreatableID = randomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID) + updates, totals, lastCreatableID = ledgertesting.RandomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -405,14 +434,35 @@ func TestAcctUpdates(t *testing.T) { for i := basics.Round(0); i < 15; i++ { // Clear the timer to ensure a flush - au.lastFlushTime = time.Time{} + ml.trackers.lastFlushTime = time.Time{} - au.committedUpTo(basics.Round(proto.MaxBalLookback) + i) - au.waitAccountsWriting() + ml.trackers.committedUpTo(basics.Round(proto.MaxBalLookback) + i) + ml.trackers.waitAccountsWriting() checkAcctUpdates(t, au, i, basics.Round(proto.MaxBalLookback+14), accts, rewardsLevels, proto) } -} + // check the account totals. + var dbRound basics.Round + err := ml.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + dbRound, err = accountsRound(tx) + return + }) + require.NoError(t, err) + + var updates ledgercore.AccountDeltas + for addr, acctData := range accts[dbRound] { + updates.Upsert(addr, acctData) + } + + expectedTotals := ledgertesting.CalculateNewRoundAccountTotals(t, updates, rewardsLevels[dbRound], proto, nil, ledgercore.AccountTotals{}) + var actualTotals ledgercore.AccountTotals + err = ml.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + actualTotals, err = accountsTotals(tx, false) + return + }) + require.NoError(t, err) + require.Equal(t, expectedTotals, actualTotals) +} func TestAcctUpdatesFastUpdates(t *testing.T) { partitiontest.PartitionTest(t) @@ -421,7 +471,7 @@ func TestAcctUpdatesFastUpdates(t *testing.T) { } proto := config.Consensus[protocol.ConsensusCurrentVersion] - accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} rewardsLevels := []uint64{0} pooldata := basics.AccountData{} @@ -437,15 +487,11 @@ func TestAcctUpdatesFastUpdates(t *testing.T) { ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts) defer ml.Close() - au := &accountUpdates{} conf := config.GetDefaultLocal() conf.CatchpointInterval = 1 - au.initialize(conf, ".", proto, accts[0]) + au := newAcctUpdates(t, ml, conf, ".") defer au.close() - err := au.loadFromDisk(ml) - require.NoError(t, err) - // cover 10 genesis blocks rewardLevel := uint64(0) for i := 1; i < 10; i++ { @@ -460,7 +506,7 @@ func TestAcctUpdatesFastUpdates(t *testing.T) { for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ { rewardLevelDelta := crypto.RandUint64() % 5 rewardLevel += rewardLevelDelta - updates, totals := randomDeltasBalanced(1, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(1, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -487,7 +533,7 @@ func TestAcctUpdatesFastUpdates(t *testing.T) { wg.Add(1) go func(round basics.Round) { defer wg.Done() - au.committedUpTo(round) + ml.trackers.committedUpTo(round) }(i) } wg.Wait() @@ -513,7 +559,7 @@ func BenchmarkBalancesChanges(b *testing.B) { initialRounds := uint64(1) accountsCount := 5000 - accts := []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)} rewardsLevels := []uint64{0} pooldata := basics.AccountData{} @@ -529,10 +575,8 @@ func BenchmarkBalancesChanges(b *testing.B) { ml := makeMockLedgerForTracker(b, true, int(initialRounds), protocolVersion, accts) defer ml.Close() - au := &accountUpdates{} - au.initialize(config.GetDefaultLocal(), ".", proto, accts[0]) - err := au.loadFromDisk(ml) - require.NoError(b, err) + conf := config.GetDefaultLocal() + au := newAcctUpdates(b, ml, conf, ".") defer au.close() // cover initialRounds genesis blocks @@ -550,7 +594,7 @@ func BenchmarkBalancesChanges(b *testing.B) { accountChanges = accountsCount - 2 - int(basics.Round(proto.MaxBalLookback+uint64(b.N))+i) } - updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(b, err) @@ -575,18 +619,18 @@ func BenchmarkBalancesChanges(b *testing.B) { } for i := proto.MaxBalLookback; i < proto.MaxBalLookback+initialRounds; i++ { // Clear the timer to ensure a flush - au.lastFlushTime = time.Time{} - au.committedUpTo(basics.Round(i)) + ml.trackers.lastFlushTime = time.Time{} + ml.trackers.committedUpTo(basics.Round(i)) } - au.waitAccountsWriting() + ml.trackers.waitAccountsWriting() b.ResetTimer() startTime := time.Now() for i := proto.MaxBalLookback + initialRounds; i < proto.MaxBalLookback+uint64(b.N); i++ { // Clear the timer to ensure a flush - au.lastFlushTime = time.Time{} - au.committedUpTo(basics.Round(i)) + ml.trackers.lastFlushTime = time.Time{} + ml.trackers.committedUpTo(basics.Round(i)) } - au.waitAccountsWriting() + ml.trackers.waitAccountsWriting() deltaTime := time.Now().Sub(startTime) if deltaTime > time.Second { return @@ -644,7 +688,7 @@ func TestLargeAccountCountCatchpointGeneration(t *testing.T) { os.RemoveAll("./catchpoints") }() - accts := []map[basics.Address]basics.AccountData{randomAccounts(100000, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(100000, true)} rewardsLevels := []uint64{0} pooldata := basics.AccountData{} @@ -660,14 +704,11 @@ func TestLargeAccountCountCatchpointGeneration(t *testing.T) { ml := makeMockLedgerForTracker(t, true, 10, testProtocolVersion, accts) defer ml.Close() - au := &accountUpdates{} conf := config.GetDefaultLocal() conf.CatchpointInterval = 1 conf.Archival = true - au.initialize(conf, ".", protoParams, accts[0]) + au := newAcctUpdates(t, ml, conf, ".") defer au.close() - err := au.loadFromDisk(ml) - require.NoError(t, err) // cover 10 genesis blocks rewardLevel := uint64(0) @@ -679,7 +720,7 @@ func TestLargeAccountCountCatchpointGeneration(t *testing.T) { for i := basics.Round(10); i < basics.Round(protoParams.MaxBalLookback+5); i++ { rewardLevelDelta := crypto.RandUint64() % 5 rewardLevel += rewardLevelDelta - updates, totals := randomDeltasBalanced(1, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(1, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -703,9 +744,9 @@ func TestLargeAccountCountCatchpointGeneration(t *testing.T) { accts = append(accts, totals) rewardsLevels = append(rewardsLevels, rewardLevel) - au.committedUpTo(i) + ml.trackers.committedUpTo(i) if i%2 == 1 { - au.waitAccountsWriting() + ml.trackers.waitAccountsWriting() } } } @@ -736,7 +777,7 @@ func TestAcctUpdatesUpdatesCorrectness(t *testing.T) { inMemory := true testFunction := func(t *testing.T) { - accts := []map[basics.Address]basics.AccountData{randomAccounts(9, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(9, true)} pooldata := basics.AccountData{} pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 @@ -768,13 +809,10 @@ func TestAcctUpdatesUpdatesCorrectness(t *testing.T) { accts[0][addr] = accountData } - au := &accountUpdates{} - au.initialize(config.GetDefaultLocal(), ".", protoParams, accts[0]) + conf := config.GetDefaultLocal() + au := newAcctUpdates(t, ml, conf, ".") defer au.close() - err := au.loadFromDisk(ml) - require.NoError(t, err) - // cover 10 genesis blocks rewardLevel := uint64(0) for i := 1; i < 10; i++ { @@ -865,10 +903,10 @@ func TestAcctUpdatesUpdatesCorrectness(t *testing.T) { delta.Accts.Upsert(addr, ad) } au.newBlock(blk, delta) - au.committedUpTo(i) + ml.trackers.committedUpTo(i) } lastRound := i - 1 - au.waitAccountsWriting() + ml.trackers.waitAccountsWriting() for idx, addr := range moneyAccounts { balance, validThrough, err := au.LookupWithoutRewards(lastRound, addr) @@ -888,55 +926,6 @@ func TestAcctUpdatesUpdatesCorrectness(t *testing.T) { t.Run("DiskDB", testFunction) } -// TestAcctUpdatesDeleteStoredCatchpoints - The goal of this test is to verify that the deleteStoredCatchpoints function works correctly. -// it doing so by filling up the storedcatchpoints with dummy catchpoint file entries, as well as creating these dummy files on disk. -// ( the term dummy is only because these aren't real catchpoint files, but rather a zero-length file ). Then, the test call the function -// and ensures that it did not errored, the catchpoint files were correctly deleted, and that deleteStoredCatchpoints contains no more -// entries. -func TestAcctUpdatesDeleteStoredCatchpoints(t *testing.T) { - partitiontest.PartitionTest(t) - - proto := config.Consensus[protocol.ConsensusCurrentVersion] - accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)} - - ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts) - defer ml.Close() - - au := &accountUpdates{} - conf := config.GetDefaultLocal() - conf.CatchpointInterval = 1 - au.initialize(conf, ".", proto, accts[0]) - defer au.close() - - err := au.loadFromDisk(ml) - require.NoError(t, err) - - dummyCatchpointFilesToCreate := 42 - - for i := 0; i < dummyCatchpointFilesToCreate; i++ { - f, err := os.Create(fmt.Sprintf("./dummy_catchpoint_file-%d", i)) - require.NoError(t, err) - err = f.Close() - require.NoError(t, err) - } - - for i := 0; i < dummyCatchpointFilesToCreate; i++ { - err := au.accountsq.storeCatchpoint(context.Background(), basics.Round(i), fmt.Sprintf("./dummy_catchpoint_file-%d", i), "", 0) - require.NoError(t, err) - } - err = au.deleteStoredCatchpoints(context.Background(), au.accountsq) - require.NoError(t, err) - - for i := 0; i < dummyCatchpointFilesToCreate; i++ { - // ensure that all the files were deleted. - _, err := os.Open(fmt.Sprintf("./dummy_catchpoint_file-%d", i)) - require.True(t, os.IsNotExist(err)) - } - fileNames, err := au.accountsq.getOldestCatchpointFiles(context.Background(), dummyCatchpointFilesToCreate, 0) - require.NoError(t, err) - require.Equal(t, 0, len(fileNames)) -} - // listAndCompareComb lists the assets/applications and then compares against the expected // It repeats with different combinations of the limit parameters func listAndCompareComb(t *testing.T, au *accountUpdates, expected map[basics.CreatableIndex]ledgercore.ModifiedCreatable) { @@ -1074,7 +1063,7 @@ func TestListCreatables(t *testing.T) { require.NoError(t, err) au := &accountUpdates{} - au.accountsq, err = accountsDbInit(tx, tx) + au.accountsq, err = accountsInitDbQueries(tx, tx) require.NoError(t, err) // ******* All results are obtained from the cache. Empty database ******* @@ -1116,97 +1105,6 @@ func TestListCreatables(t *testing.T) { listAndCompareComb(t, au, expectedDbImage) } -func TestIsWritingCatchpointFile(t *testing.T) { - partitiontest.PartitionTest(t) - - au := &accountUpdates{} - - au.catchpointWriting = -1 - ans := au.IsWritingCatchpointFile() - require.True(t, ans) - - au.catchpointWriting = 0 - ans = au.IsWritingCatchpointFile() - require.False(t, ans) -} - -func TestGetCatchpointStream(t *testing.T) { - partitiontest.PartitionTest(t) - - proto := config.Consensus[protocol.ConsensusCurrentVersion] - - accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)} - - ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts) - defer ml.Close() - - au := &accountUpdates{} - conf := config.GetDefaultLocal() - conf.CatchpointInterval = 1 - au.initialize(conf, ".", proto, accts[0]) - defer au.close() - - err := au.loadFromDisk(ml) - require.NoError(t, err) - - filesToCreate := 4 - - temporaryDirectroy, err := ioutil.TempDir(os.TempDir(), "catchpoints") - require.NoError(t, err) - defer func() { - os.RemoveAll(temporaryDirectroy) - }() - catchpointsDirectory := filepath.Join(temporaryDirectroy, "catchpoints") - err = os.Mkdir(catchpointsDirectory, 0777) - require.NoError(t, err) - - au.dbDirectory = temporaryDirectroy - - // Create the catchpoint files with dummy data - for i := 0; i < filesToCreate; i++ { - fileName := filepath.Join("catchpoints", fmt.Sprintf("%d.catchpoint", i)) - data := []byte{byte(i), byte(i + 1), byte(i + 2)} - err = ioutil.WriteFile(filepath.Join(temporaryDirectroy, fileName), data, 0666) - require.NoError(t, err) - - // Store the catchpoint into the database - err := au.accountsq.storeCatchpoint(context.Background(), basics.Round(i), fileName, "", int64(len(data))) - require.NoError(t, err) - } - - dataRead := make([]byte, 3) - var n int - - // File on disk, and database has the record - reader, err := au.GetCatchpointStream(basics.Round(1)) - n, err = reader.Read(dataRead) - require.NoError(t, err) - require.Equal(t, 3, n) - outData := []byte{1, 2, 3} - require.Equal(t, outData, dataRead) - len, err := reader.Size() - require.NoError(t, err) - require.Equal(t, int64(3), len) - - // File deleted, but record in the database - err = os.Remove(filepath.Join(temporaryDirectroy, "catchpoints", "2.catchpoint")) - reader, err = au.GetCatchpointStream(basics.Round(2)) - require.Equal(t, ledgercore.ErrNoEntry{}, err) - require.Nil(t, reader) - - // File on disk, but database lost the record - err = au.accountsq.storeCatchpoint(context.Background(), basics.Round(3), "", "", 0) - reader, err = au.GetCatchpointStream(basics.Round(3)) - n, err = reader.Read(dataRead) - require.NoError(t, err) - require.Equal(t, 3, n) - outData = []byte{3, 4, 5} - require.Equal(t, outData, dataRead) - - err = au.deleteStoredCatchpoints(context.Background(), au.accountsq) - require.NoError(t, err) -} - func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err error) { rows, err := tx.Query("SELECT address, data FROM accountbase") if err != nil { @@ -1246,7 +1144,7 @@ func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err er func BenchmarkLargeMerkleTrieRebuild(b *testing.B) { proto := config.Consensus[protocol.ConsensusCurrentVersion] - accts := []map[basics.Address]basics.AccountData{randomAccounts(5, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(5, true)} pooldata := basics.AccountData{} pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 @@ -1261,21 +1159,17 @@ func BenchmarkLargeMerkleTrieRebuild(b *testing.B) { ml := makeMockLedgerForTracker(b, true, 10, protocol.ConsensusCurrentVersion, accts) defer ml.Close() - au := &accountUpdates{} cfg := config.GetDefaultLocal() cfg.Archival = true - au.initialize(cfg, ".", proto, accts[0]) + au := newAcctUpdates(b, ml, cfg, ".") defer au.close() - err := au.loadFromDisk(ml) - require.NoError(b, err) - // at this point, the database was created. We want to fill the accounts data accountsNumber := 6000000 * b.N for i := 0; i < accountsNumber-5-2; { // subtract the account we've already created above, plus the sink/reward var updates compactAccountDeltas for k := 0; i < accountsNumber-5-2 && k < 1024; k++ { - addr := randomAddress() + addr := ledgertesting.RandomAddress() acctData := basics.AccountData{} acctData.MicroAlgos.Raw = 1 updates.upsert(addr, accountDelta{new: acctData}) @@ -1289,87 +1183,20 @@ func BenchmarkLargeMerkleTrieRebuild(b *testing.B) { require.NoError(b, err) } - err = ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - return updateAccountsRound(tx, 0, 1) + err := ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + return updateAccountsHashRound(tx, 1) }) require.NoError(b, err) au.close() b.ResetTimer() - err = au.loadFromDisk(ml) + err = au.loadFromDisk(ml, 0) require.NoError(b, err) b.StopTimer() b.ReportMetric(float64(accountsNumber), "entries/trie") } -func BenchmarkLargeCatchpointWriting(b *testing.B) { - proto := config.Consensus[protocol.ConsensusCurrentVersion] - - accts := []map[basics.Address]basics.AccountData{randomAccounts(5, true)} - - pooldata := basics.AccountData{} - pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 - pooldata.Status = basics.NotParticipating - accts[0][testPoolAddr] = pooldata - - sinkdata := basics.AccountData{} - sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 - sinkdata.Status = basics.NotParticipating - accts[0][testSinkAddr] = sinkdata - - ml := makeMockLedgerForTracker(b, true, 10, protocol.ConsensusCurrentVersion, accts) - defer ml.Close() - - au := &accountUpdates{} - cfg := config.GetDefaultLocal() - cfg.Archival = true - au.initialize(cfg, ".", proto, accts[0]) - defer au.close() - - temporaryDirectroy, err := ioutil.TempDir(os.TempDir(), "catchpoints") - require.NoError(b, err) - defer func() { - os.RemoveAll(temporaryDirectroy) - }() - catchpointsDirectory := filepath.Join(temporaryDirectroy, "catchpoints") - err = os.Mkdir(catchpointsDirectory, 0777) - require.NoError(b, err) - - au.dbDirectory = temporaryDirectroy - - err = au.loadFromDisk(ml) - require.NoError(b, err) - - // at this point, the database was created. We want to fill the accounts data - accountsNumber := 6000000 * b.N - err = ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - for i := 0; i < accountsNumber-5-2; { // subtract the account we've already created above, plus the sink/reward - var updates compactAccountDeltas - for k := 0; i < accountsNumber-5-2 && k < 1024; k++ { - addr := randomAddress() - acctData := basics.AccountData{} - acctData.MicroAlgos.Raw = 1 - updates.upsert(addr, accountDelta{new: acctData}) - i++ - } - - _, err = accountsNewRound(tx, updates, nil, proto, basics.Round(1)) - if err != nil { - return - } - } - - return updateAccountsRound(tx, 0, 1) - }) - require.NoError(b, err) - - b.ResetTimer() - au.generateCatchpoint(basics.Round(0), "0#ABCD", crypto.Digest{}, time.Second) - b.StopTimer() - b.ReportMetric(float64(accountsNumber), "accounts") -} - func BenchmarkCompactDeltas(b *testing.B) { b.Run("account-deltas", func(b *testing.B) { if b.N < 500 { @@ -1461,129 +1288,6 @@ func TestCompactDeltas(t *testing.T) { } -func TestReproducibleCatchpointLabels(t *testing.T) { - partitiontest.PartitionTest(t) - - if runtime.GOARCH == "arm" || runtime.GOARCH == "arm64" { - t.Skip("This test is too slow on ARM and causes travis builds to time out") - } - // create new protocol version, which has lower lookback - testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestReproducibleCatchpointLabels") - protoParams := config.Consensus[protocol.ConsensusCurrentVersion] - protoParams.MaxBalLookback = 32 - protoParams.SeedLookback = 2 - protoParams.SeedRefreshInterval = 8 - config.Consensus[testProtocolVersion] = protoParams - defer func() { - delete(config.Consensus, testProtocolVersion) - }() - - accts := []map[basics.Address]basics.AccountData{randomAccounts(20, true)} - rewardsLevels := []uint64{0} - - pooldata := basics.AccountData{} - pooldata.MicroAlgos.Raw = 100 * 1000 * 1000 * 1000 * 1000 - pooldata.Status = basics.NotParticipating - accts[0][testPoolAddr] = pooldata - - sinkdata := basics.AccountData{} - sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 - sinkdata.Status = basics.NotParticipating - accts[0][testSinkAddr] = sinkdata - - ml := makeMockLedgerForTracker(t, false, 1, testProtocolVersion, accts) - defer ml.Close() - - au := &accountUpdates{} - cfg := config.GetDefaultLocal() - cfg.CatchpointInterval = 50 - cfg.CatchpointTracking = 1 - au.initialize(cfg, ".", protoParams, accts[0]) - defer au.close() - - err := au.loadFromDisk(ml) - require.NoError(t, err) - - rewardLevel := uint64(0) - - const testCatchpointLabelsCount = 5 - - // lastCreatableID stores asset or app max used index to get rid of conflicts - lastCreatableID := crypto.RandUint64() % 512 - knownCreatables := make(map[basics.CreatableIndex]bool) - catchpointLabels := make(map[basics.Round]string) - ledgerHistory := make(map[basics.Round]*mockLedgerForTracker) - roundDeltas := make(map[basics.Round]ledgercore.StateDelta) - for i := basics.Round(1); i <= basics.Round(testCatchpointLabelsCount*cfg.CatchpointInterval); i++ { - rewardLevelDelta := crypto.RandUint64() % 5 - rewardLevel += rewardLevelDelta - var updates ledgercore.AccountDeltas - var totals map[basics.Address]basics.AccountData - base := accts[i-1] - updates, totals, lastCreatableID = randomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID) - prevTotals, err := au.Totals(basics.Round(i - 1)) - require.NoError(t, err) - - newPool := totals[testPoolAddr] - newPool.MicroAlgos.Raw -= prevTotals.RewardUnits() * rewardLevelDelta - updates.Upsert(testPoolAddr, newPool) - totals[testPoolAddr] = newPool - - blk := bookkeeping.Block{ - BlockHeader: bookkeeping.BlockHeader{ - Round: basics.Round(i), - }, - } - blk.RewardsLevel = rewardLevel - blk.CurrentProtocol = testProtocolVersion - delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0) - delta.Accts.MergeAccounts(updates) - delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables) - au.newBlock(blk, delta) - au.committedUpTo(i) - ml.addMockBlock(blockEntry{block: blk}, delta) - accts = append(accts, totals) - rewardsLevels = append(rewardsLevels, rewardLevel) - roundDeltas[i] = delta - - // if this is a catchpoint round, save the label. - if uint64(i)%cfg.CatchpointInterval == 0 { - au.waitAccountsWriting() - catchpointLabels[i] = au.GetLastCatchpointLabel() - ledgerHistory[i] = ml.fork(t) - defer ledgerHistory[i].Close() - } - } - - // test in revese what happens when we try to repeat the exact same blocks. - // start off with the catchpoint before the last one - startingRound := basics.Round((testCatchpointLabelsCount - 1) * cfg.CatchpointInterval) - for ; startingRound > basics.Round(cfg.CatchpointInterval); startingRound -= basics.Round(cfg.CatchpointInterval) { - au.close() - err := au.loadFromDisk(ledgerHistory[startingRound]) - require.NoError(t, err) - - for i := startingRound + 1; i <= basics.Round(testCatchpointLabelsCount*cfg.CatchpointInterval); i++ { - blk := bookkeeping.Block{ - BlockHeader: bookkeeping.BlockHeader{ - Round: basics.Round(i), - }, - } - blk.RewardsLevel = rewardsLevels[i] - blk.CurrentProtocol = testProtocolVersion - delta := roundDeltas[i] - au.newBlock(blk, delta) - au.committedUpTo(i) - - // if this is a catchpoint round, check the label. - if uint64(i)%cfg.CatchpointInterval == 0 { - au.waitAccountsWriting() - require.Equal(t, catchpointLabels[i], au.GetLastCatchpointLabel()) - } - } - } -} - // TestCachesInitialization test the functionality of the initializeCaches cache. func TestCachesInitialization(t *testing.T) { partitiontest.PartitionTest(t) @@ -1594,7 +1298,7 @@ func TestCachesInitialization(t *testing.T) { initialRounds := uint64(1) accountsCount := 5 - accts := []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)} rewardsLevels := []uint64{0} pooldata := basics.AccountData{} @@ -1611,10 +1315,8 @@ func TestCachesInitialization(t *testing.T) { ml.log.SetLevel(logging.Warn) defer ml.Close() - au := &accountUpdates{} - au.initialize(config.GetDefaultLocal(), ".", proto, accts[0]) - err := au.loadFromDisk(ml) - require.NoError(t, err) + conf := config.GetDefaultLocal() + au := newAcctUpdates(t, ml, conf, ".") // cover initialRounds genesis blocks rewardLevel := uint64(0) @@ -1630,7 +1332,7 @@ func TestCachesInitialization(t *testing.T) { rewardLevel += rewardLevelDelta accountChanges := 2 - updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -1652,15 +1354,15 @@ func TestCachesInitialization(t *testing.T) { delta.Totals = accumulateTotals(t, protocol.ConsensusCurrentVersion, []map[basics.Address]basics.AccountData{totals}, rewardLevel) ml.addMockBlock(blockEntry{block: blk}, delta) au.newBlock(blk, delta) - au.committedUpTo(basics.Round(i)) - au.waitAccountsWriting() + ml.trackers.committedUpTo(basics.Round(i)) + ml.trackers.waitAccountsWriting() accts = append(accts, totals) rewardsLevels = append(rewardsLevels, rewardLevel) } au.close() // reset the accounts, since their balances are now changed due to the rewards. - accts = []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)} + accts = []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)} // create another mocked ledger, but this time with a fresh new tracker database. ml2 := makeMockLedgerForTracker(t, true, int(initialRounds), protocolVersion, accts) @@ -1671,15 +1373,13 @@ func TestCachesInitialization(t *testing.T) { ml2.blocks = ml.blocks ml2.deltas = ml.deltas - au = &accountUpdates{} - au.initialize(config.GetDefaultLocal(), ".", proto, accts[0]) - err = au.loadFromDisk(ml2) - require.NoError(t, err) + conf = config.GetDefaultLocal() + au = newAcctUpdates(t, ml2, conf, ".") defer au.close() // make sure the deltas array end up containing only the most recent 320 rounds. require.Equal(t, int(proto.MaxBalLookback), len(au.deltas)) - require.Equal(t, recoveredLedgerRound-basics.Round(proto.MaxBalLookback), au.dbRound) + require.Equal(t, recoveredLedgerRound-basics.Round(proto.MaxBalLookback), au.cachedDBRound) } // TestSplittingConsensusVersionCommits tests the a sequence of commits that spans over multiple consensus versions works correctly. @@ -1692,7 +1392,7 @@ func TestSplittingConsensusVersionCommits(t *testing.T) { initialRounds := uint64(1) accountsCount := 5 - accts := []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)} rewardsLevels := []uint64{0} pooldata := basics.AccountData{} @@ -1709,9 +1409,10 @@ func TestSplittingConsensusVersionCommits(t *testing.T) { ml.log.SetLevel(logging.Warn) defer ml.Close() - au := &accountUpdates{} - au.initialize(config.GetDefaultLocal(), ".", initialProtoParams, accts[0]) - err := au.loadFromDisk(ml) + conf := config.GetDefaultLocal() + au := newAcctUpdates(t, ml, conf, ".") + + err := au.loadFromDisk(ml, 0) require.NoError(t, err) defer au.close() @@ -1730,7 +1431,7 @@ func TestSplittingConsensusVersionCommits(t *testing.T) { rewardLevel += rewardLevelDelta accountChanges := 2 - updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -1765,7 +1466,7 @@ func TestSplittingConsensusVersionCommits(t *testing.T) { rewardLevel += rewardLevelDelta accountChanges := 2 - updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -1790,10 +1491,10 @@ func TestSplittingConsensusVersionCommits(t *testing.T) { accts = append(accts, totals) rewardsLevels = append(rewardsLevels, rewardLevel) } - // now, commit and verify that the committedUpTo method broken the range correctly. - au.committedUpTo(lastRoundToWrite) - au.waitAccountsWriting() - require.Equal(t, basics.Round(initialRounds+extraRounds)-1, au.dbRound) + // now, commit and verify that the produceCommittingTask method broken the range correctly. + ml.trackers.committedUpTo(lastRoundToWrite) + ml.trackers.waitAccountsWriting() + require.Equal(t, basics.Round(initialRounds+extraRounds)-1, au.cachedDBRound) } @@ -1808,7 +1509,7 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) { initialRounds := uint64(1) accountsCount := 5 - accts := []map[basics.Address]basics.AccountData{randomAccounts(accountsCount, true)} + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(accountsCount, true)} rewardsLevels := []uint64{0} pooldata := basics.AccountData{} @@ -1825,9 +1526,10 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) { ml.log.SetLevel(logging.Warn) defer ml.Close() - au := &accountUpdates{} - au.initialize(config.GetDefaultLocal(), ".", initialProtoParams, accts[0]) - err := au.loadFromDisk(ml) + conf := config.GetDefaultLocal() + au := newAcctUpdates(t, ml, conf, ".") + + err := au.loadFromDisk(ml, 0) require.NoError(t, err) defer au.close() @@ -1846,7 +1548,7 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) { rewardLevel += rewardLevelDelta accountChanges := 2 - updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -1880,7 +1582,7 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) { rewardLevel += rewardLevelDelta accountChanges := 2 - updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -1905,10 +1607,10 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) { accts = append(accts, totals) rewardsLevels = append(rewardsLevels, rewardLevel) } - // now, commit and verify that the committedUpTo method broken the range correctly. - au.committedUpTo(endOfFirstNewProtocolSegment) - au.waitAccountsWriting() - require.Equal(t, basics.Round(initialRounds+extraRounds)-1, au.dbRound) + // now, commit and verify that the produceCommittingTask method broken the range correctly. + ml.trackers.committedUpTo(endOfFirstNewProtocolSegment) + ml.trackers.waitAccountsWriting() + require.Equal(t, basics.Round(initialRounds+extraRounds)-1, au.cachedDBRound) // write additional extraRounds elements and verify these can be flushed. for i := endOfFirstNewProtocolSegment + 1; i <= basics.Round(initialRounds+2*extraRounds+initialProtoParams.MaxBalLookback); i++ { @@ -1916,7 +1618,7 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) { rewardLevel += rewardLevelDelta accountChanges := 2 - updates, totals := randomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) + updates, totals := ledgertesting.RandomDeltasBalanced(accountChanges, accts[i-1], rewardLevel) prevTotals, err := au.Totals(basics.Round(i - 1)) require.NoError(t, err) @@ -1941,9 +1643,9 @@ func TestSplittingConsensusVersionCommitsBoundry(t *testing.T) { accts = append(accts, totals) rewardsLevels = append(rewardsLevels, rewardLevel) } - au.committedUpTo(endOfFirstNewProtocolSegment + basics.Round(extraRounds)) - au.waitAccountsWriting() - require.Equal(t, basics.Round(initialRounds+2*extraRounds), au.dbRound) + ml.trackers.committedUpTo(endOfFirstNewProtocolSegment + basics.Round(extraRounds)) + ml.trackers.waitAccountsWriting() + require.Equal(t, basics.Round(initialRounds+2*extraRounds), au.cachedDBRound) } // TestConsecutiveVersion tests the consecutiveVersion method correctness. |