From 82eabffaa24b591554c2d83b36d8003ccf555a89 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Wed, 19 Jan 2022 19:17:52 -0500 Subject: Fix a potential problem of committing non-uniform consensus versions (#3453) If accountdb accumulates a large backlog, it is possible catchpoint tracker would attempt to commit a wider range than account updates tracker expects. --- ledger/accountdb.go | 2 +- ledger/acctupdates_test.go | 18 ++++-- ledger/catchpointtracker.go | 6 +- ledger/catchpointtracker_test.go | 4 ++ ledger/tracker.go | 11 ++++ ledger/tracker_test.go | 125 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 158 insertions(+), 8 deletions(-) create mode 100644 ledger/tracker_test.go diff --git a/ledger/accountdb.go b/ledger/accountdb.go index 514332265..788ccdc37 100644 --- a/ledger/accountdb.go +++ b/ledger/accountdb.go @@ -1228,7 +1228,7 @@ func updateAccountsHashRound(tx *sql.Tx, hashRound basics.Round) (err error) { } if aff != 1 { - err = fmt.Errorf("updateAccountsRound(hashbase,%d): expected to update 1 row but got %d", hashRound, aff) + err = fmt.Errorf("updateAccountsHashRound(hashbase,%d): expected to update 1 row but got %d", hashRound, aff) return } return diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index f1434b02f..1e77ef49a 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -75,12 +75,10 @@ func accumulateTotals(t testing.TB, consensusVersion protocol.ConsensusVersion, return } -func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData) *mockLedgerForTracker { +func makeMockLedgerForTrackerWithLogger(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData, l logging.Logger) *mockLedgerForTracker { dbs, fileName := dbOpenTest(t, inMemory) - dblogger := logging.TestingLog(t) - dblogger.SetLevel(logging.Info) - dbs.Rdb.SetLogger(dblogger) - dbs.Wdb.SetLogger(dblogger) + dbs.Rdb.SetLogger(l) + dbs.Wdb.SetLogger(l) blocks := randomInitChain(consensusVersion, initialBlocksCount) deltas := make([]ledgercore.StateDelta, initialBlocksCount) @@ -92,7 +90,15 @@ func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount in } } consensusParams := config.Consensus[consensusVersion] - return &mockLedgerForTracker{dbs: dbs, log: dblogger, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]} + return &mockLedgerForTracker{dbs: dbs, log: l, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]} + +} + +func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData) *mockLedgerForTracker { + dblogger := logging.TestingLog(t) + dblogger.SetLevel(logging.Info) + + return makeMockLedgerForTrackerWithLogger(t, inMemory, initialBlocksCount, consensusVersion, accts, dblogger) } // fork creates another database which has the same content as the current one. Works only for non-memory databases. diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 58d6e77b7..087580f42 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -272,7 +272,11 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round, return nil } - dcr.offset = uint64(newBase - dcr.oldBase) + newOffset := uint64(newBase - dcr.oldBase) + // trackers are not allowed to increase offsets, only descease + if newOffset < dcr.offset { + dcr.offset = newOffset + } // check to see if this is a catchpoint round dcr.isCatchpointRound = ct.isCatchpointRound(dcr.offset, dcr.oldBase, dcr.lookback) diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index b5778691e..b75d07ada 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -114,6 +114,7 @@ func TestGetCatchpointStream(t *testing.T) { // File on disk, and database has the record reader, err := ct.GetCatchpointStream(basics.Round(1)) + require.NoError(t, err) n, err = reader.Read(dataRead) require.NoError(t, err) require.Equal(t, 3, n) @@ -125,13 +126,16 @@ func TestGetCatchpointStream(t *testing.T) { // File deleted, but record in the database err = os.Remove(filepath.Join(temporaryDirectroy, "catchpoints", "2.catchpoint")) + require.NoError(t, err) reader, err = ct.GetCatchpointStream(basics.Round(2)) require.Equal(t, ledgercore.ErrNoEntry{}, err) require.Nil(t, reader) // File on disk, but database lost the record err = ct.accountsq.storeCatchpoint(context.Background(), basics.Round(3), "", "", 0) + require.NoError(t, err) reader, err = ct.GetCatchpointStream(basics.Round(3)) + require.NoError(t, err) n, err = reader.Read(dataRead) require.NoError(t, err) require.Equal(t, 3, n) diff --git a/ledger/tracker.go b/ledger/tracker.go index b5d608c2b..57b1e52c4 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -91,6 +91,9 @@ type ledgerTracker interface { // effort, and all the trackers contribute to that effort. All the trackers are being handed a // pointer to the deferredCommitRange, and have the ability to either modify it, or return a // nil. If nil is returned, the commit would be skipped. + // The contract: + // offset must not be greater than the received dcr.offset value of non zero + // oldBase must not be modifed if non zero produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange // prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data. @@ -321,10 +324,18 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round) } cdr := &dcc.deferredCommitRange for _, lt := range tr.trackers { + base := cdr.oldBase + offset := cdr.offset cdr = lt.produceCommittingTask(blockqRound, dbRound, cdr) if cdr == nil { break } + if offset > 0 && cdr.offset > offset { + tr.log.Warnf("tracker %T produced offset %d but expected not greater than %d, dbRound %d, latestRound %d", lt, cdr.offset, offset, dbRound, blockqRound) + } + if base > 0 && base != cdr.oldBase { + tr.log.Warnf("tracker %T modified oldBase %d that expected to be %d, dbRound %d, latestRound %d", lt, cdr.oldBase, base, dbRound, blockqRound) + } } if cdr != nil { dcc.deferredCommitRange = *cdr diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go new file mode 100644 index 000000000..731772a24 --- /dev/null +++ b/ledger/tracker_test.go @@ -0,0 +1,125 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/ledger/ledgercore" + ledgertesting "github.com/algorand/go-algorand/ledger/testing" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" +) + +// TestTrackerScheduleCommit checks catchpointTracker.produceCommittingTask does not increase commit offset relative +// to the value set by accountUpdates +func TestTrackerScheduleCommit(t *testing.T) { + partitiontest.PartitionTest(t) + + a := require.New(t) + + var bufNewLogger bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&bufNewLogger) + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(1, true)} + ml := makeMockLedgerForTrackerWithLogger(t, true, 10, protocol.ConsensusCurrentVersion, accts, log) + defer ml.Close() + + conf := config.GetDefaultLocal() + conf.CatchpointTracking = 1 + conf.CatchpointInterval = 10 + + au := &accountUpdates{} + ct := &catchpointTracker{} + au.initialize(conf) + ct.initialize(conf, ".") + + _, err := trackerDBInitialize(ml, false, ".") + a.NoError(err) + + ml.trackers.initialize(ml, []ledgerTracker{au, ct}, conf) + defer ml.trackers.close() + err = ml.trackers.loadFromDisk(ml) + a.NoError(err) + // close commitSyncer goroutine + ml.trackers.ctxCancel() + ml.trackers.ctxCancel = nil + <-ml.trackers.commitSyncerClosed + ml.trackers.commitSyncerClosed = nil + + // simulate situation when au returns smaller offset b/c of consecutive versions + // and ct increses it + // base = 1, offset = 100, lookback = 16 + // lastest = 1000 + // would give a large mostRecentCatchpointRound value => large newBase => larger offset + + expectedOffset := uint64(100) + blockqRound := basics.Round(1000) + lookback := basics.Round(16) + dbRound := basics.Round(1) + + // prepare deltas and versions + au.accountsMu.Lock() + au.deltas = make([]ledgercore.AccountDeltas, int(blockqRound)) + au.deltasAccum = make([]int, int(blockqRound)) + au.versions = make([]protocol.ConsensusVersion, int(blockqRound)) + for i := 0; i <= int(expectedOffset); i++ { + au.versions[i] = protocol.ConsensusCurrentVersion + } + for i := int(expectedOffset) + 1; i < len(au.versions); i++ { + au.versions[i] = protocol.ConsensusFuture + } + au.accountsMu.Unlock() + + // ensure au and ct produce data we expect + dcc := &deferredCommitContext{ + deferredCommitRange: deferredCommitRange{ + lookback: lookback, + }, + } + cdr := &dcc.deferredCommitRange + + cdr = au.produceCommittingTask(blockqRound, dbRound, cdr) + a.NotNil(cdr) + a.Equal(expectedOffset, cdr.offset) + + cdr = ct.produceCommittingTask(blockqRound, dbRound, cdr) + a.NotNil(cdr) + // before the fix + // expectedOffset = uint64(blockqRound - lookback - dbRound) // 983 + a.Equal(expectedOffset, cdr.offset) + + // schedule the commit. au is expected to return offset 100 and + ml.trackers.mu.Lock() + ml.trackers.dbRound = dbRound + ml.trackers.mu.Unlock() + ml.trackers.scheduleCommit(blockqRound, lookback) + + a.Equal(1, len(ml.trackers.deferredCommits)) + // before the fix + // a.Contains(bufNewLogger.String(), "tracker *ledger.catchpointTracker produced offset 983") + a.NotContains(bufNewLogger.String(), "tracker *ledger.catchpointTracker produced offset") + dc := <-ml.trackers.deferredCommits + a.Equal(expectedOffset, dc.offset) +} -- cgit v1.2.3