summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>2023-12-22 13:58:22 -0500
committerGitHub <noreply@github.com>2023-12-22 13:58:22 -0500
commit3a80a406f7b94161ce031bec00b5a82bdf9becba (patch)
treef179c5dd673908b14dd7b72b7167dbd6364b818b
parent55cbb7f6d93acbe5040e4530aaabb87aec729f13 (diff)
tests: rewrite triggerTrackerFlush test helper (#5876)
-rw-r--r--ledger/catchpointtracker.go15
-rw-r--r--ledger/ledger_test.go92
2 files changed, 55 insertions, 52 deletions
diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go
index 32216fcb2..7745e33f5 100644
--- a/ledger/catchpointtracker.go
+++ b/ledger/catchpointtracker.go
@@ -151,8 +151,8 @@ type catchpointTracker struct {
// catchpoint files even before the protocol upgrade took place.
forceCatchpointFileWriting bool
- // catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound and
- // `lastCatchpointLabel`.
+ // catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound,
+ // lastCatchpointLabel and balancesTrie.
catchpointsMu deadlock.RWMutex
// cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()),
@@ -555,16 +555,19 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans
}
var trie *merkletrie.Trie
+ ct.catchpointsMu.Lock()
if ct.balancesTrie == nil {
trie, err = merkletrie.MakeTrie(mc, trackerdb.TrieMemoryConfig)
if err != nil {
ct.log.Warnf("unable to create merkle trie during committedUpTo: %v", err)
+ ct.catchpointsMu.Unlock()
return err
}
ct.balancesTrie = trie
} else {
ct.balancesTrie.SetCommitter(mc)
}
+ ct.catchpointsMu.Unlock()
treeTargetRound = dbRound + basics.Round(offset)
}
@@ -610,6 +613,7 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans
}
func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
+ ct.catchpointsMu.Lock()
if ct.balancesTrie != nil {
_, err := ct.balancesTrie.Evict(false)
if err != nil {
@@ -617,7 +621,6 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit
}
}
- ct.catchpointsMu.Lock()
ct.roundDigest = ct.roundDigest[dcc.offset:]
ct.consensusVersion = ct.consensusVersion[dcc.offset:]
ct.cachedDBRound = dcc.newBase()
@@ -986,7 +989,9 @@ func (ct *catchpointTracker) handleCommitError(dcc *deferredCommitContext) {
// Specifically, modifications to the trie happen through accountsUpdateBalances,
// which happens before commit to disk. Errors in this tracker, subsequent trackers, or the commit to disk may cause the trie cache to be incorrect,
// affecting the perceived root on subsequent rounds
+ ct.catchpointsMu.Lock()
ct.balancesTrie = nil
+ ct.catchpointsMu.Unlock()
ct.cancelWrite(dcc)
}
@@ -1276,9 +1281,11 @@ func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx tracke
if err != nil {
return err
}
+ ct.catchpointsMu.Lock()
if ct.balancesTrie == nil {
trie, trieErr := merkletrie.MakeTrie(mc, trackerdb.TrieMemoryConfig)
if trieErr != nil {
+ ct.catchpointsMu.Unlock()
return trieErr
}
ct.balancesTrie = trie
@@ -1288,8 +1295,10 @@ func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx tracke
trieBalancesHash, err := ct.balancesTrie.RootHash()
if err != nil {
+ ct.catchpointsMu.Unlock()
return err
}
+ ct.catchpointsMu.Unlock()
cw, err := tx.MakeCatchpointWriter()
if err != nil {
diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go
index 19bb6a079..172643d48 100644
--- a/ledger/ledger_test.go
+++ b/ledger/ledger_test.go
@@ -27,7 +27,6 @@ import (
"runtime"
"sort"
"testing"
- "time"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
@@ -1443,30 +1442,43 @@ func benchLedgerCache(b *testing.B, startRound basics.Round) {
}
}
-func triggerTrackerFlush(t *testing.T, l *Ledger, genesisInitState ledgercore.InitState) {
+// triggerTrackerFlush is based in the commit flow but executed it in a single (this) goroutine.
+func triggerTrackerFlush(t *testing.T, l *Ledger) {
l.trackers.mu.Lock()
- initialDbRound := l.trackers.dbRound
- currentDbRound := initialDbRound
- l.trackers.lastFlushTime = time.Time{}
+ dbRound := l.trackers.dbRound
l.trackers.mu.Unlock()
- const timeout = 3 * time.Second
- started := time.Now()
+ rnd := l.Latest()
+ minBlock := rnd
+ maxLookback := basics.Round(0)
+ for _, lt := range l.trackers.trackers {
+ retainRound, lookback := lt.committedUpTo(rnd)
+ if retainRound < minBlock {
+ minBlock = retainRound
+ }
+ if lookback > maxLookback {
+ maxLookback = lookback
+ }
+ }
- // We can't truly wait for scheduleCommit to take place, which means without waiting using sleeps
- // we might beat scheduleCommit's addition to accountsWriting, making our wait on it continue immediately.
- // The solution is to continue to add blocks and wait for the advancement of l.trackers.dbRound,
- // which is a side effect of postCommit's success.
- for currentDbRound == initialDbRound {
- time.Sleep(50 * time.Microsecond)
- require.True(t, time.Since(started) < timeout)
- addEmptyValidatedBlock(t, l, genesisInitState.Accounts)
- l.WaitForCommit(l.Latest())
- l.trackers.mu.RLock()
- currentDbRound = l.trackers.dbRound
- l.trackers.mu.RUnlock()
+ dcc := &deferredCommitContext{
+ deferredCommitRange: deferredCommitRange{
+ lookback: maxLookback,
+ },
+ }
+
+ l.trackers.mu.RLock()
+ cdr := l.trackers.produceCommittingTask(rnd, dbRound, &dcc.deferredCommitRange)
+ if cdr != nil {
+ dcc.deferredCommitRange = *cdr
+ } else {
+ dcc = nil
+ }
+ l.trackers.mu.RUnlock()
+ if dcc != nil {
+ l.trackers.accountsWriting.Add(1)
+ l.trackers.commitRound(dcc)
}
- l.trackers.waitAccountsWriting()
}
func testLedgerReload(t *testing.T, cfg config.Local) {
@@ -1646,7 +1658,7 @@ func TestLedgerVerifiesOldStateProofs(t *testing.T) {
backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
defer backlogPool.Shutdown()
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
l.WaitForCommit(l.Latest())
blk := createBlkWithStateproof(t, maxBlocks, proto, genesisInitState, l, accounts)
_, err = l.Validate(context.Background(), blk, backlogPool)
@@ -1656,7 +1668,7 @@ func TestLedgerVerifiesOldStateProofs(t *testing.T) {
addDummyBlock(t, addresses, proto, l, initKeys, genesisInitState)
}
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
addDummyBlock(t, addresses, proto, l, initKeys, genesisInitState)
l.WaitForCommit(l.Latest())
// At this point the block queue go-routine will start removing block . However, it might not complete the task
@@ -2767,11 +2779,11 @@ func verifyVotersContent(t *testing.T, expected map[basics.Round]*ledgercore.Vot
func triggerDeleteVoters(t *testing.T, l *Ledger, genesisInitState ledgercore.InitState) {
// We make the ledger flush tracker data to allow votersTracker to advance lowestRound
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
// We add another block to make the block queue query the voter's tracker lowest round again, which allows it to forget
// rounds based on the new lowest round.
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
}
func testVotersReloadFromDisk(t *testing.T, cfg config.Local) {
@@ -2796,7 +2808,7 @@ func testVotersReloadFromDisk(t *testing.T, cfg config.Local) {
// at this point the database should contain the voter for round 256 but the voters for round 512 should be in deltas
l.WaitForCommit(l.Latest())
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
vtSnapshot := l.acctsOnline.voters.votersForRoundCache
// ensuring no tree was evicted.
@@ -3028,7 +3040,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
}
l.WaitForCommit(l.Latest())
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
numOfStateProofs-1, proto.StateProofInterval, true, trackerDB)
@@ -3037,7 +3049,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
1, proto.StateProofInterval, true, trackerMemory)
l.WaitForCommit(l.Latest())
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
numOfStateProofs, proto.StateProofInterval, true, spverDBLoc)
@@ -3063,7 +3075,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
}
l.WaitForCommit(blk.BlockHeader.Round)
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
1, proto.StateProofInterval, false, spverDBLoc)
@@ -3101,16 +3113,7 @@ func TestLedgerReloadStateProofVerificationTracker(t *testing.T) {
// trigger trackers flush
// first ensure the block is committed into blockdb
l.WaitForCommit(l.Latest())
- // wait for any pending tracker flushes
- l.trackers.waitAccountsWriting()
- // force flush as needed
- if l.LatestTrackerCommitted() < l.Latest()+basics.Round(cfg.MaxAcctLookback) {
- l.trackers.mu.Lock()
- l.trackers.lastFlushTime = time.Time{}
- l.trackers.mu.Unlock()
- l.notifyCommit(l.Latest())
- l.trackers.waitAccountsWriting()
- }
+ triggerTrackerFlush(t, l)
verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
numOfStateProofs-1, proto.StateProofInterval, true, trackerDB)
@@ -3167,7 +3170,7 @@ func TestLedgerCatchpointSPVerificationTracker(t *testing.T) {
// Feeding blocks until we can know for sure we have at least one catchpoint written.
blk = feedBlocksUntilRound(t, l, blk, basics.Round(cfg.CatchpointInterval*2))
l.WaitForCommit(basics.Round(cfg.CatchpointInterval * 2))
- triggerTrackerFlush(t, l, genesisInitState)
+ triggerTrackerFlush(t, l)
numTrackedDataFirstCatchpoint := (cfg.CatchpointInterval - proto.MaxBalLookback) / proto.StateProofInterval
@@ -3244,16 +3247,7 @@ func TestLedgerSPTrackerAfterReplay(t *testing.T) {
// first ensure the block is committed into blockdb
l.WaitForCommit(l.Latest())
- // wait for any pending tracker flushes
- l.trackers.waitAccountsWriting()
- // force flush as needed
- if l.LatestTrackerCommitted() < l.Latest()+basics.Round(cfg.MaxAcctLookback) {
- l.trackers.mu.Lock()
- l.trackers.lastFlushTime = time.Time{}
- l.trackers.mu.Unlock()
- l.notifyCommit(spblk.BlockHeader.Round)
- l.trackers.waitAccountsWriting()
- }
+ triggerTrackerFlush(t, l)
err = l.reloadLedger()
a.NoError(err)