summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchris erway <51567+cce@users.noreply.github.com>2022-01-06 15:09:38 -0500
committerGitHub <noreply@github.com>2022-01-06 15:09:38 -0500
commit8f62affa0ba1589fd3e874cf275771e5cab1c42c (patch)
treeb130b346ca464ad591d3463a9fdc0fdfe8171e85
parent35cf3b32c7c306abccbdfa149e86e0d0f0437113 (diff)
Add test to exercise lookup corner cases (#3376)
## Summary This test attempts to cover the case when an accountUpdates.lookupX method can't find the requested address, falls through looking at deltas and the LRU accounts cache, then hits the database — only to discover that the round stored in the database (committed in `accountUpdates.commitRound`) is out of sync with `accountUpdates.cachedDBRound` (updated a little bit later in `accountUpdates.postCommit`). In this case, the lookup method waits and tries again, iterating the `for { }` it is in. We did not have coverage for this code path before. ## Test Plan Adds new test.
-rw-r--r--ledger/acctupdates_test.go149
-rw-r--r--ledger/catchpointtracker_test.go5
2 files changed, 152 insertions, 2 deletions
diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go
index 439906a48..5f014985c 100644
--- a/ledger/acctupdates_test.go
+++ b/ledger/acctupdates_test.go
@@ -1700,3 +1700,152 @@ func TestConsecutiveVersion(t *testing.T) {
protocol.ConsensusV21,
}
}
+
+// This test attempts to cover the case when an accountUpdates.lookupX method:
+// - can't find the requested address,
+// - falls through looking at deltas and the LRU accounts cache,
+// - then hits the database (calling accountsDbQueries.lookup)
+// only to discover that the round stored in the database (committed in accountUpdates.commitRound)
+// is out of sync with accountUpdates.cachedDBRound (updated a little bit later in accountUpdates.postCommit).
+//
+// In this case it waits on a condition variable and retries when
+// commitSyncer/accountUpdates has advanced the cachedDBRound.
+func TestAcctUpdatesLookupRetry(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestAcctUpdatesLookupRetry")
+ proto := config.Consensus[protocol.ConsensusCurrentVersion]
+ proto.MaxBalLookback = 10
+ config.Consensus[testProtocolVersion] = proto
+ defer func() {
+ delete(config.Consensus, testProtocolVersion)
+ }()
+
+ accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
+ rewardsLevels := []uint64{0}
+
+ pooldata := basics.AccountData{}
+ pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
+ pooldata.Status = basics.NotParticipating
+ accts[0][testPoolAddr] = pooldata
+
+ sinkdata := basics.AccountData{}
+ sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
+ sinkdata.Status = basics.NotParticipating
+ accts[0][testSinkAddr] = sinkdata
+
+ ml := makeMockLedgerForTracker(t, false, 10, testProtocolVersion, accts)
+ defer ml.Close()
+
+ conf := config.GetDefaultLocal()
+ au := newAcctUpdates(t, ml, conf, ".")
+ defer au.close()
+
+ // cover 10 genesis blocks
+ rewardLevel := uint64(0)
+ for i := 1; i < 10; i++ {
+ accts = append(accts, accts[0])
+ rewardsLevels = append(rewardsLevels, rewardLevel)
+ }
+
+ checkAcctUpdates(t, au, 0, 9, accts, rewardsLevels, proto)
+
+ // lastCreatableID stores asset or app max used index to get rid of conflicts
+ lastCreatableID := crypto.RandUint64() % 512
+ knownCreatables := make(map[basics.CreatableIndex]bool)
+
+ for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ {
+ rewardLevelDelta := crypto.RandUint64() % 5
+ rewardLevel += rewardLevelDelta
+ var updates ledgercore.AccountDeltas
+ var totals map[basics.Address]basics.AccountData
+ base := accts[i-1]
+ updates, totals, lastCreatableID = ledgertesting.RandomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID)
+ prevTotals, err := au.Totals(basics.Round(i - 1))
+ require.NoError(t, err)
+
+ newPool := totals[testPoolAddr]
+ newPool.MicroAlgos.Raw -= prevTotals.RewardUnits() * rewardLevelDelta
+ updates.Upsert(testPoolAddr, newPool)
+ totals[testPoolAddr] = newPool
+
+ blk := bookkeeping.Block{
+ BlockHeader: bookkeeping.BlockHeader{
+ Round: basics.Round(i),
+ },
+ }
+ blk.RewardsLevel = rewardLevel
+ blk.CurrentProtocol = testProtocolVersion
+
+ delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0)
+ delta.Accts.MergeAccounts(updates)
+ delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables)
+ delta.Totals = accumulateTotals(t, testProtocolVersion, []map[basics.Address]basics.AccountData{totals}, rewardLevel)
+ au.newBlock(blk, delta)
+ accts = append(accts, totals)
+ rewardsLevels = append(rewardsLevels, rewardLevel)
+
+ checkAcctUpdates(t, au, 0, i, accts, rewardsLevels, proto)
+ }
+
+ flushRound := func(i basics.Round) {
+ // Clear the timer to ensure a flush
+ ml.trackers.lastFlushTime = time.Time{}
+
+ ml.trackers.committedUpTo(basics.Round(proto.MaxBalLookback) + i)
+ ml.trackers.waitAccountsWriting()
+ }
+
+ // flush a couple of rounds (indirectly schedules commitSyncer)
+ flushRound(basics.Round(0))
+ flushRound(basics.Round(1))
+
+ // add stallingTracker to list of trackers
+ stallingTracker := &blockingTracker{
+ postCommitUnlockedEntryLock: make(chan struct{}),
+ postCommitUnlockedReleaseLock: make(chan struct{}),
+ postCommitEntryLock: make(chan struct{}),
+ postCommitReleaseLock: make(chan struct{}),
+ alwaysLock: true,
+ }
+ ml.trackers.trackers = append([]ledgerTracker{stallingTracker}, ml.trackers.trackers...)
+
+ // kick off another round
+ go flushRound(basics.Round(2))
+
+ // let stallingTracker enter postCommit() and block (waiting on postCommitReleaseLock)
+ // this will prevent accountUpdates.postCommit() from updating au.cachedDBRound = newBase
+ <-stallingTracker.postCommitEntryLock
+
+ // prune the baseAccounts cache, so that lookup will fall through to the DB
+ au.accountsMu.Lock()
+ au.baseAccounts.prune(0)
+ au.accountsMu.Unlock()
+
+ rnd := basics.Round(2)
+
+ // grab any address and data to use for call to lookup
+ var addr basics.Address
+ var data basics.AccountData
+ for a, d := range accts[rnd] {
+ addr = a
+ data = d
+ break
+ }
+
+ // release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait()
+ go func() {
+ time.Sleep(200 * time.Millisecond)
+ stallingTracker.postCommitReleaseLock <- struct{}{}
+ }()
+
+ // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound
+ d, validThrough, err := au.LookupWithoutRewards(rnd, addr)
+ require.NoError(t, err)
+ require.Equal(t, d, data)
+ require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd)
+
+ // allow the postCommitUnlocked() handler to go through
+ <-stallingTracker.postCommitUnlockedEntryLock
+ stallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
+}
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index 983afed1d..b5778691e 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -427,6 +427,7 @@ type blockingTracker struct {
postCommitEntryLock chan struct{}
postCommitReleaseLock chan struct{}
committedUpToRound int64
+ alwaysLock bool
}
// loadFromDisk is not implemented in the blockingTracker.
@@ -461,7 +462,7 @@ func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommit
// postCommit implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
- if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
+ if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") {
bt.postCommitEntryLock <- struct{}{}
<-bt.postCommitReleaseLock
}
@@ -469,7 +470,7 @@ func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitCo
// postCommitUnlocked implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
- if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
+ if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") {
bt.postCommitUnlockedEntryLock <- struct{}{}
<-bt.postCommitUnlockedReleaseLock
}