summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>2022-01-19 19:17:52 -0500
committerGitHub <noreply@github.com>2022-01-19 19:17:52 -0500
commit82eabffaa24b591554c2d83b36d8003ccf555a89 (patch)
tree280201c7416688416f9a1df23e3c432be80a2285
parent2edd3decd1066696a08a90b48e2cfcee11b90323 (diff)
Fix a potential problem of committing non-uniform consensus versions (#3453)
If accountdb accumulates a large backlog, it is possible catchpoint tracker would attempt to commit a wider range than account updates tracker expects.
-rw-r--r--ledger/accountdb.go2
-rw-r--r--ledger/acctupdates_test.go18
-rw-r--r--ledger/catchpointtracker.go6
-rw-r--r--ledger/catchpointtracker_test.go4
-rw-r--r--ledger/tracker.go11
-rw-r--r--ledger/tracker_test.go125
6 files changed, 158 insertions, 8 deletions
diff --git a/ledger/accountdb.go b/ledger/accountdb.go
index 514332265..788ccdc37 100644
--- a/ledger/accountdb.go
+++ b/ledger/accountdb.go
@@ -1228,7 +1228,7 @@ func updateAccountsHashRound(tx *sql.Tx, hashRound basics.Round) (err error) {
}
if aff != 1 {
- err = fmt.Errorf("updateAccountsRound(hashbase,%d): expected to update 1 row but got %d", hashRound, aff)
+ err = fmt.Errorf("updateAccountsHashRound(hashbase,%d): expected to update 1 row but got %d", hashRound, aff)
return
}
return
diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go
index f1434b02f..1e77ef49a 100644
--- a/ledger/acctupdates_test.go
+++ b/ledger/acctupdates_test.go
@@ -75,12 +75,10 @@ func accumulateTotals(t testing.TB, consensusVersion protocol.ConsensusVersion,
return
}
-func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData) *mockLedgerForTracker {
+func makeMockLedgerForTrackerWithLogger(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData, l logging.Logger) *mockLedgerForTracker {
dbs, fileName := dbOpenTest(t, inMemory)
- dblogger := logging.TestingLog(t)
- dblogger.SetLevel(logging.Info)
- dbs.Rdb.SetLogger(dblogger)
- dbs.Wdb.SetLogger(dblogger)
+ dbs.Rdb.SetLogger(l)
+ dbs.Wdb.SetLogger(l)
blocks := randomInitChain(consensusVersion, initialBlocksCount)
deltas := make([]ledgercore.StateDelta, initialBlocksCount)
@@ -92,7 +90,15 @@ func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount in
}
}
consensusParams := config.Consensus[consensusVersion]
- return &mockLedgerForTracker{dbs: dbs, log: dblogger, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]}
+ return &mockLedgerForTracker{dbs: dbs, log: l, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]}
+
+}
+
+func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData) *mockLedgerForTracker {
+ dblogger := logging.TestingLog(t)
+ dblogger.SetLevel(logging.Info)
+
+ return makeMockLedgerForTrackerWithLogger(t, inMemory, initialBlocksCount, consensusVersion, accts, dblogger)
}
// fork creates another database which has the same content as the current one. Works only for non-memory databases.
diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go
index 58d6e77b7..087580f42 100644
--- a/ledger/catchpointtracker.go
+++ b/ledger/catchpointtracker.go
@@ -272,7 +272,11 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round,
return nil
}
- dcr.offset = uint64(newBase - dcr.oldBase)
+ newOffset := uint64(newBase - dcr.oldBase)
+ // trackers are not allowed to increase offsets, only descease
+ if newOffset < dcr.offset {
+ dcr.offset = newOffset
+ }
// check to see if this is a catchpoint round
dcr.isCatchpointRound = ct.isCatchpointRound(dcr.offset, dcr.oldBase, dcr.lookback)
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index b5778691e..b75d07ada 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -114,6 +114,7 @@ func TestGetCatchpointStream(t *testing.T) {
// File on disk, and database has the record
reader, err := ct.GetCatchpointStream(basics.Round(1))
+ require.NoError(t, err)
n, err = reader.Read(dataRead)
require.NoError(t, err)
require.Equal(t, 3, n)
@@ -125,13 +126,16 @@ func TestGetCatchpointStream(t *testing.T) {
// File deleted, but record in the database
err = os.Remove(filepath.Join(temporaryDirectroy, "catchpoints", "2.catchpoint"))
+ require.NoError(t, err)
reader, err = ct.GetCatchpointStream(basics.Round(2))
require.Equal(t, ledgercore.ErrNoEntry{}, err)
require.Nil(t, reader)
// File on disk, but database lost the record
err = ct.accountsq.storeCatchpoint(context.Background(), basics.Round(3), "", "", 0)
+ require.NoError(t, err)
reader, err = ct.GetCatchpointStream(basics.Round(3))
+ require.NoError(t, err)
n, err = reader.Read(dataRead)
require.NoError(t, err)
require.Equal(t, 3, n)
diff --git a/ledger/tracker.go b/ledger/tracker.go
index b5d608c2b..57b1e52c4 100644
--- a/ledger/tracker.go
+++ b/ledger/tracker.go
@@ -91,6 +91,9 @@ type ledgerTracker interface {
// effort, and all the trackers contribute to that effort. All the trackers are being handed a
// pointer to the deferredCommitRange, and have the ability to either modify it, or return a
// nil. If nil is returned, the commit would be skipped.
+ // The contract:
+ // offset must not be greater than the received dcr.offset value of non zero
+ // oldBase must not be modifed if non zero
produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange
// prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data.
@@ -321,10 +324,18 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round)
}
cdr := &dcc.deferredCommitRange
for _, lt := range tr.trackers {
+ base := cdr.oldBase
+ offset := cdr.offset
cdr = lt.produceCommittingTask(blockqRound, dbRound, cdr)
if cdr == nil {
break
}
+ if offset > 0 && cdr.offset > offset {
+ tr.log.Warnf("tracker %T produced offset %d but expected not greater than %d, dbRound %d, latestRound %d", lt, cdr.offset, offset, dbRound, blockqRound)
+ }
+ if base > 0 && base != cdr.oldBase {
+ tr.log.Warnf("tracker %T modified oldBase %d that expected to be %d, dbRound %d, latestRound %d", lt, cdr.oldBase, base, dbRound, blockqRound)
+ }
}
if cdr != nil {
dcc.deferredCommitRange = *cdr
diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go
new file mode 100644
index 000000000..731772a24
--- /dev/null
+++ b/ledger/tracker_test.go
@@ -0,0 +1,125 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package ledger
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/ledger/ledgercore"
+ ledgertesting "github.com/algorand/go-algorand/ledger/testing"
+ "github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-algorand/protocol"
+ "github.com/algorand/go-algorand/test/partitiontest"
+)
+
+// TestTrackerScheduleCommit checks catchpointTracker.produceCommittingTask does not increase commit offset relative
+// to the value set by accountUpdates
+func TestTrackerScheduleCommit(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ a := require.New(t)
+
+ var bufNewLogger bytes.Buffer
+ log := logging.NewLogger()
+ log.SetOutput(&bufNewLogger)
+
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(1, true)}
+ ml := makeMockLedgerForTrackerWithLogger(t, true, 10, protocol.ConsensusCurrentVersion, accts, log)
+ defer ml.Close()
+
+ conf := config.GetDefaultLocal()
+ conf.CatchpointTracking = 1
+ conf.CatchpointInterval = 10
+
+ au := &accountUpdates{}
+ ct := &catchpointTracker{}
+ au.initialize(conf)
+ ct.initialize(conf, ".")
+
+ _, err := trackerDBInitialize(ml, false, ".")
+ a.NoError(err)
+
+ ml.trackers.initialize(ml, []ledgerTracker{au, ct}, conf)
+ defer ml.trackers.close()
+ err = ml.trackers.loadFromDisk(ml)
+ a.NoError(err)
+ // close commitSyncer goroutine
+ ml.trackers.ctxCancel()
+ ml.trackers.ctxCancel = nil
+ <-ml.trackers.commitSyncerClosed
+ ml.trackers.commitSyncerClosed = nil
+
+ // simulate situation when au returns smaller offset b/c of consecutive versions
+ // and ct increses it
+ // base = 1, offset = 100, lookback = 16
+ // lastest = 1000
+ // would give a large mostRecentCatchpointRound value => large newBase => larger offset
+
+ expectedOffset := uint64(100)
+ blockqRound := basics.Round(1000)
+ lookback := basics.Round(16)
+ dbRound := basics.Round(1)
+
+ // prepare deltas and versions
+ au.accountsMu.Lock()
+ au.deltas = make([]ledgercore.AccountDeltas, int(blockqRound))
+ au.deltasAccum = make([]int, int(blockqRound))
+ au.versions = make([]protocol.ConsensusVersion, int(blockqRound))
+ for i := 0; i <= int(expectedOffset); i++ {
+ au.versions[i] = protocol.ConsensusCurrentVersion
+ }
+ for i := int(expectedOffset) + 1; i < len(au.versions); i++ {
+ au.versions[i] = protocol.ConsensusFuture
+ }
+ au.accountsMu.Unlock()
+
+ // ensure au and ct produce data we expect
+ dcc := &deferredCommitContext{
+ deferredCommitRange: deferredCommitRange{
+ lookback: lookback,
+ },
+ }
+ cdr := &dcc.deferredCommitRange
+
+ cdr = au.produceCommittingTask(blockqRound, dbRound, cdr)
+ a.NotNil(cdr)
+ a.Equal(expectedOffset, cdr.offset)
+
+ cdr = ct.produceCommittingTask(blockqRound, dbRound, cdr)
+ a.NotNil(cdr)
+ // before the fix
+ // expectedOffset = uint64(blockqRound - lookback - dbRound) // 983
+ a.Equal(expectedOffset, cdr.offset)
+
+ // schedule the commit. au is expected to return offset 100 and
+ ml.trackers.mu.Lock()
+ ml.trackers.dbRound = dbRound
+ ml.trackers.mu.Unlock()
+ ml.trackers.scheduleCommit(blockqRound, lookback)
+
+ a.Equal(1, len(ml.trackers.deferredCommits))
+ // before the fix
+ // a.Contains(bufNewLogger.String(), "tracker *ledger.catchpointTracker produced offset 983")
+ a.NotContains(bufNewLogger.String(), "tracker *ledger.catchpointTracker produced offset")
+ dc := <-ml.trackers.deferredCommits
+ a.Equal(expectedOffset, dc.offset)
+}