diff options
Diffstat (limited to 'ledger/catchpointtracker_test.go')
-rw-r--r-- | ledger/catchpointtracker_test.go | 415 |
1 files changed, 415 insertions, 0 deletions
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go new file mode 100644 index 000000000..64db5f275 --- /dev/null +++ b/ledger/catchpointtracker_test.go @@ -0,0 +1,415 @@ +// Copyright (C) 2019-2021 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package ledger + +import ( + "context" + "database/sql" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger/ledgercore" + ledgertesting "github.com/algorand/go-algorand/ledger/testing" + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" +) + +func TestIsWritingCatchpointFile(t *testing.T) { + partitiontest.PartitionTest(t) + + ct := &catchpointTracker{} + + ct.catchpointWriting = -1 + ans := ct.IsWritingCatchpointFile() + require.True(t, ans) + + ct.catchpointWriting = 0 + ans = ct.IsWritingCatchpointFile() + require.False(t, ans) +} + +func newCatchpointTracker(tb testing.TB, l *mockLedgerForTracker, conf config.Local, dbPathPrefix string) *catchpointTracker { + au := &accountUpdates{} + ct := &catchpointTracker{} + au.initialize(conf) + ct.initialize(conf, dbPathPrefix) + _, err := trackerDBInitialize(l, ct.catchpointEnabled(), dbPathPrefix) + require.NoError(tb, err) + + err = l.trackers.initialize(l, []ledgerTracker{au, ct}, conf) + require.NoError(tb, err) + err = l.trackers.loadFromDisk(l) + require.NoError(tb, err) + return ct +} + +func TestGetCatchpointStream(t *testing.T) { + partitiontest.PartitionTest(t) + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} + + ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts) + defer ml.Close() + + conf := config.GetDefaultLocal() + conf.CatchpointInterval = 1 + ct := newCatchpointTracker(t, ml, conf, ".") + defer ct.close() + + filesToCreate := 4 + + temporaryDirectroy, err := ioutil.TempDir(os.TempDir(), "catchpoints") + require.NoError(t, err) + defer func() { + os.RemoveAll(temporaryDirectroy) + }() + catchpointsDirectory := filepath.Join(temporaryDirectroy, "catchpoints") + err = os.Mkdir(catchpointsDirectory, 0777) + require.NoError(t, err) + + ct.dbDirectory = temporaryDirectroy + + // Create the catchpoint files with dummy data + for i := 0; i < filesToCreate; i++ { + fileName := filepath.Join("catchpoints", fmt.Sprintf("%d.catchpoint", i)) + data := []byte{byte(i), byte(i + 1), byte(i + 2)} + err = ioutil.WriteFile(filepath.Join(temporaryDirectroy, fileName), data, 0666) + require.NoError(t, err) + + // Store the catchpoint into the database + err := ct.accountsq.storeCatchpoint(context.Background(), basics.Round(i), fileName, "", int64(len(data))) + require.NoError(t, err) + } + + dataRead := make([]byte, 3) + var n int + + // File on disk, and database has the record + reader, err := ct.GetCatchpointStream(basics.Round(1)) + n, err = reader.Read(dataRead) + require.NoError(t, err) + require.Equal(t, 3, n) + outData := []byte{1, 2, 3} + require.Equal(t, outData, dataRead) + len, err := reader.Size() + require.NoError(t, err) + require.Equal(t, int64(3), len) + + // File deleted, but record in the database + err = os.Remove(filepath.Join(temporaryDirectroy, "catchpoints", "2.catchpoint")) + reader, err = ct.GetCatchpointStream(basics.Round(2)) + require.Equal(t, ledgercore.ErrNoEntry{}, err) + require.Nil(t, reader) + + // File on disk, but database lost the record + err = ct.accountsq.storeCatchpoint(context.Background(), basics.Round(3), "", "", 0) + reader, err = ct.GetCatchpointStream(basics.Round(3)) + n, err = reader.Read(dataRead) + require.NoError(t, err) + require.Equal(t, 3, n) + outData = []byte{3, 4, 5} + require.Equal(t, outData, dataRead) + + err = deleteStoredCatchpoints(context.Background(), ct.accountsq, ct.dbDirectory) + require.NoError(t, err) +} + +// TestAcctUpdatesDeleteStoredCatchpoints - The goal of this test is to verify that the deleteStoredCatchpoints function works correctly. +// it doing so by filling up the storedcatchpoints with dummy catchpoint file entries, as well as creating these dummy files on disk. +// ( the term dummy is only because these aren't real catchpoint files, but rather a zero-length file ). Then, the test call the function +// and ensures that it did not errored, the catchpoint files were correctly deleted, and that deleteStoredCatchpoints contains no more +// entries. +func TestAcctUpdatesDeleteStoredCatchpoints(t *testing.T) { + partitiontest.PartitionTest(t) + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} + + ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts) + defer ml.Close() + + conf := config.GetDefaultLocal() + conf.CatchpointInterval = 1 + ct := newCatchpointTracker(t, ml, conf, ".") + defer ct.close() + + dummyCatchpointFilesToCreate := 42 + + for i := 0; i < dummyCatchpointFilesToCreate; i++ { + f, err := os.Create(fmt.Sprintf("./dummy_catchpoint_file-%d", i)) + require.NoError(t, err) + err = f.Close() + require.NoError(t, err) + } + + for i := 0; i < dummyCatchpointFilesToCreate; i++ { + err := ct.accountsq.storeCatchpoint(context.Background(), basics.Round(i), fmt.Sprintf("./dummy_catchpoint_file-%d", i), "", 0) + require.NoError(t, err) + } + err := deleteStoredCatchpoints(context.Background(), ct.accountsq, ct.dbDirectory) + require.NoError(t, err) + + for i := 0; i < dummyCatchpointFilesToCreate; i++ { + // ensure that all the files were deleted. + _, err := os.Open(fmt.Sprintf("./dummy_catchpoint_file-%d", i)) + require.True(t, os.IsNotExist(err)) + } + fileNames, err := ct.accountsq.getOldestCatchpointFiles(context.Background(), dummyCatchpointFilesToCreate, 0) + require.NoError(t, err) + require.Equal(t, 0, len(fileNames)) +} + +func BenchmarkLargeCatchpointWriting(b *testing.B) { + proto := config.Consensus[protocol.ConsensusCurrentVersion] + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(5, true)} + + pooldata := basics.AccountData{} + pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 + pooldata.Status = basics.NotParticipating + accts[0][testPoolAddr] = pooldata + + sinkdata := basics.AccountData{} + sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 + sinkdata.Status = basics.NotParticipating + accts[0][testSinkAddr] = sinkdata + + ml := makeMockLedgerForTracker(b, true, 10, protocol.ConsensusCurrentVersion, accts) + defer ml.Close() + + cfg := config.GetDefaultLocal() + cfg.Archival = true + ct := catchpointTracker{} + ct.initialize(cfg, ".") + + temporaryDirectroy, err := ioutil.TempDir(os.TempDir(), "catchpoints") + require.NoError(b, err) + defer func() { + os.RemoveAll(temporaryDirectroy) + }() + catchpointsDirectory := filepath.Join(temporaryDirectroy, "catchpoints") + err = os.Mkdir(catchpointsDirectory, 0777) + require.NoError(b, err) + + ct.dbDirectory = temporaryDirectroy + + err = ct.loadFromDisk(ml, 0) + require.NoError(b, err) + defer ct.close() + + // at this point, the database was created. We want to fill the accounts data + accountsNumber := 6000000 * b.N + err = ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + for i := 0; i < accountsNumber-5-2; { // subtract the account we've already created above, plus the sink/reward + var updates compactAccountDeltas + for k := 0; i < accountsNumber-5-2 && k < 1024; k++ { + addr := ledgertesting.RandomAddress() + acctData := basics.AccountData{} + acctData.MicroAlgos.Raw = 1 + updates.upsert(addr, accountDelta{new: acctData}) + i++ + } + + _, err = accountsNewRound(tx, updates, nil, proto, basics.Round(1)) + if err != nil { + return + } + } + + return updateAccountsHashRound(tx, 1) + }) + require.NoError(b, err) + + b.ResetTimer() + ct.generateCatchpoint(context.Background(), basics.Round(0), "0#ABCD", crypto.Digest{}, time.Second) + b.StopTimer() + b.ReportMetric(float64(accountsNumber), "accounts") +} + +func TestReproducibleCatchpointLabels(t *testing.T) { + partitiontest.PartitionTest(t) + + if runtime.GOARCH == "arm" || runtime.GOARCH == "arm64" { + t.Skip("This test is too slow on ARM and causes travis builds to time out") + } + // create new protocol version, which has lower lookback + testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestReproducibleCatchpointLabels") + protoParams := config.Consensus[protocol.ConsensusCurrentVersion] + protoParams.MaxBalLookback = 32 + protoParams.SeedLookback = 2 + protoParams.SeedRefreshInterval = 8 + config.Consensus[testProtocolVersion] = protoParams + defer func() { + delete(config.Consensus, testProtocolVersion) + }() + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} + rewardsLevels := []uint64{0} + + pooldata := basics.AccountData{} + pooldata.MicroAlgos.Raw = 100 * 1000 * 1000 * 1000 * 1000 + pooldata.Status = basics.NotParticipating + accts[0][testPoolAddr] = pooldata + + sinkdata := basics.AccountData{} + sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 + sinkdata.Status = basics.NotParticipating + accts[0][testSinkAddr] = sinkdata + + ml := makeMockLedgerForTracker(t, false, 1, testProtocolVersion, accts) + defer ml.Close() + + cfg := config.GetDefaultLocal() + cfg.CatchpointInterval = 50 + cfg.CatchpointTracking = 1 + ct := newCatchpointTracker(t, ml, cfg, ".") + au := ml.trackers.accts + defer ct.close() + + rewardLevel := uint64(0) + + const testCatchpointLabelsCount = 5 + + // lastCreatableID stores asset or app max used index to get rid of conflicts + lastCreatableID := crypto.RandUint64() % 512 + knownCreatables := make(map[basics.CreatableIndex]bool) + catchpointLabels := make(map[basics.Round]string) + ledgerHistory := make(map[basics.Round]*mockLedgerForTracker) + roundDeltas := make(map[basics.Round]ledgercore.StateDelta) + for i := basics.Round(1); i <= basics.Round(testCatchpointLabelsCount*cfg.CatchpointInterval); i++ { + rewardLevelDelta := crypto.RandUint64() % 5 + rewardLevel += rewardLevelDelta + var updates ledgercore.AccountDeltas + var totals map[basics.Address]basics.AccountData + base := accts[i-1] + updates, totals, lastCreatableID = ledgertesting.RandomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID) + prevTotals, err := au.Totals(basics.Round(i - 1)) + require.NoError(t, err) + + newPool := totals[testPoolAddr] + newPool.MicroAlgos.Raw -= prevTotals.RewardUnits() * rewardLevelDelta + updates.Upsert(testPoolAddr, newPool) + totals[testPoolAddr] = newPool + + newTotals := ledgertesting.CalculateNewRoundAccountTotals(t, updates, rewardLevel, protoParams, base, prevTotals) + + blk := bookkeeping.Block{ + BlockHeader: bookkeeping.BlockHeader{ + Round: basics.Round(i), + }, + } + blk.RewardsLevel = rewardLevel + blk.CurrentProtocol = testProtocolVersion + delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0) + delta.Accts.MergeAccounts(updates) + delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables) + delta.Totals = newTotals + + ml.trackers.newBlock(blk, delta) + ml.trackers.committedUpTo(i) + ml.addMockBlock(blockEntry{block: blk}, delta) + accts = append(accts, totals) + rewardsLevels = append(rewardsLevels, rewardLevel) + roundDeltas[i] = delta + + // if this is a catchpoint round, save the label. + if uint64(i)%cfg.CatchpointInterval == 0 { + ml.trackers.waitAccountsWriting() + catchpointLabels[i] = ct.GetLastCatchpointLabel() + ledgerHistory[i] = ml.fork(t) + defer ledgerHistory[i].Close() + } + } + + // test in revese what happens when we try to repeat the exact same blocks. + // start off with the catchpoint before the last one + startingRound := basics.Round((testCatchpointLabelsCount - 1) * cfg.CatchpointInterval) + for ; startingRound > basics.Round(cfg.CatchpointInterval); startingRound -= basics.Round(cfg.CatchpointInterval) { + au.close() + ml2 := ledgerHistory[startingRound] + + ct := newCatchpointTracker(t, ml2, cfg, ".") + for i := startingRound + 1; i <= basics.Round(testCatchpointLabelsCount*cfg.CatchpointInterval); i++ { + blk := bookkeeping.Block{ + BlockHeader: bookkeeping.BlockHeader{ + Round: basics.Round(i), + }, + } + blk.RewardsLevel = rewardsLevels[i] + blk.CurrentProtocol = testProtocolVersion + delta := roundDeltas[i] + ml2.trackers.newBlock(blk, delta) + ml2.trackers.committedUpTo(i) + + // if this is a catchpoint round, check the label. + if uint64(i)%cfg.CatchpointInterval == 0 { + ml2.trackers.waitAccountsWriting() + require.Equal(t, catchpointLabels[i], ct.GetLastCatchpointLabel()) + } + } + } + + // test to see that after loadFromDisk, all the tracker content is lost ( as expected ) + require.NotZero(t, len(ct.roundDigest)) + require.NoError(t, ct.loadFromDisk(ml, ml.Latest())) + require.Zero(t, len(ct.roundDigest)) + require.Zero(t, ct.catchpointWriting) + select { + case _, closed := <-ct.catchpointSlowWriting: + require.False(t, closed) + default: + require.FailNow(t, "The catchpointSlowWriting should have been a closed channel; it seems to be a nil ?!") + } +} + +func TestCatchpointTrackerPrepareCommit(t *testing.T) { + partitiontest.PartitionTest(t) + + ct := &catchpointTracker{} + const maxOffset = 40 + const maxLookback = 320 + ct.roundDigest = make([]crypto.Digest, maxOffset+maxLookback) + for i := 0; i < len(ct.roundDigest); i++ { + ct.roundDigest[i] = crypto.Hash([]byte{byte(i), byte(i / 256)}) + } + dcc := &deferredCommitContext{} + for offset := uint64(1); offset < maxOffset; offset++ { + dcc.offset = offset + for lookback := basics.Round(0); lookback < maxLookback; lookback += 20 { + dcc.lookback = lookback + for _, isCatchpointRound := range []bool{false, true} { + dcc.isCatchpointRound = isCatchpointRound + require.NoError(t, ct.prepareCommit(dcc)) + if isCatchpointRound { + expectedRound := offset + uint64(lookback) - 1 + expectedHash := crypto.Hash([]byte{byte(expectedRound), byte(expectedRound / 256)}) + require.Equal(t, expectedHash[:], dcc.committedRoundDigest[:]) + } + } + } + } +} |