author     Bob Broderick <118225939+bbroder-algo@users.noreply.github.com>  2023-05-23 15:34:02 -0400
committer  Bob Broderick <118225939+bbroder-algo@users.noreply.github.com>  2023-05-23 15:34:02 -0400
commit     2981c1e2015bee380a5c83895fc287cb50313d1d (patch)
tree       4e9954e3442e087b739c7a3746bcbec3684705f0
parent     297a4a183293e3e7faf18609a31eed92350fd8bc (diff)
Revert "ledger: fix commit tasks enqueueing (#5214)"tracker_backout
This reverts commit f8a130ed53006473de823756383f76eeda3faade.
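
Context for this revert (editorial note, not part of the commit message): the reverted change had made the hand-off from trackerRegistry.scheduleCommit to the commitSyncer goroutine non-blocking, dropping the commit task when the deferredCommits channel was full; this revert restores the plain blocking send (see the ledger/tracker.go hunk below). The following is a minimal sketch of the two enqueue patterns — the channel, waitgroup, and function names are illustrative stand-ins for tr.deferredCommits and tr.accountsWriting, not the real API:

package main

import (
    "fmt"
    "sync"
)

// enqueueBlocking mirrors the behavior this revert restores: the producer
// blocks until the consumer accepts the task, which in the real code can
// stall the block-queue syncer while a catchpoint file is being written.
func enqueueBlocking(tasks chan<- int, pending *sync.WaitGroup, task int) {
    pending.Add(1) // Add before send so the consumer's Done() never underflows
    tasks <- task
}

// enqueueNonBlocking mirrors the reverted fix: if the channel is full, the
// task is dropped and the counter rolled back; a later attempt would cover
// these rounds plus some extra rounds.
func enqueueNonBlocking(tasks chan<- int, pending *sync.WaitGroup, task int) bool {
    pending.Add(1)
    select {
    case tasks <- task:
        return true
    default:
        pending.Done() // roll back: the task was skipped, not handed off
        return false
    }
}

func main() {
    tasks := make(chan int, 1)
    var pending sync.WaitGroup

    enqueueNonBlocking(tasks, &pending, 1)              // accepted: buffer has room
    fmt.Println(enqueueNonBlocking(tasks, &pending, 2)) // false: buffer full, dropped

    // drain the channel the way a commitSyncer-style consumer would
    go func() {
        for task := range tasks {
            fmt.Println("committed", task)
            pending.Done()
        }
    }()

    enqueueBlocking(tasks, &pending, 3) // blocks only until the consumer catches up
    pending.Wait()
    close(tasks)
}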
-rw-r--r--  ledger/catchpointtracker_test.go  222
-rw-r--r--  ledger/tracker.go                  32
-rw-r--r--  ledger/txtail_test.go               2
3 files changed, 52 insertions(+), 204 deletions(-)
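
One note on the ledger/tracker.go hunk below: the revert collapses the fix's three named flush conditions (flushIntervalPassed, flushForCatchpoint, flushAccounts) back into a single negated conjunction. By De Morgan's laws the two forms decide identically, so only the code shape changes. A hedged sketch with simplified stand-in types and a hypothetical threshold parameter:

package main

import "fmt"

// commitCtx is a stand-in for the fields of deferredCommitContext that the
// flush decision reads; it is not the real struct.
type commitCtx struct {
    catchpointFirstStage  bool
    catchpointSecondStage bool
    pendingDeltas         int
}

// skipFlushNamed mirrors the reverted fix: name each reason to flush and
// skip only when none of them hold.
func skipFlushNamed(intervalPassed bool, dcc commitCtx, threshold int) bool {
    flushForCatchpoint := dcc.catchpointFirstStage || dcc.catchpointSecondStage
    flushAccounts := dcc.pendingDeltas >= threshold
    return !(intervalPassed || flushForCatchpoint || flushAccounts)
}

// skipFlushInline mirrors the restored code: !(a || b || c) == !a && !b && !c,
// so this single condition always agrees with skipFlushNamed.
func skipFlushInline(intervalPassed bool, dcc commitCtx, threshold int) bool {
    return !intervalPassed && !dcc.catchpointFirstStage &&
        !dcc.catchpointSecondStage && dcc.pendingDeltas < threshold
}

func main() {
    dcc := commitCtx{pendingDeltas: 10}
    fmt.Println(skipFlushNamed(false, dcc, 100), skipFlushInline(false, dcc, 100)) // true true
}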
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index dcf6c68c6..3bf16ddbc 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -33,7 +33,6 @@ import (
     "github.com/stretchr/testify/require"
 
-    "github.com/algorand/go-algorand/agreement"
     "github.com/algorand/go-algorand/config"
     "github.com/algorand/go-algorand/crypto"
     "github.com/algorand/go-algorand/data/basics"
@@ -48,7 +47,7 @@ import (
     "github.com/algorand/go-algorand/test/partitiontest"
 )
 
-func TestCatchpointIsWritingCatchpointFile(t *testing.T) {
+func TestIsWritingCatchpointFile(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     ct := &catchpointTracker{}
@@ -79,7 +78,7 @@ func newCatchpointTracker(tb testing.TB, l *mockLedgerForTracker, conf config.Lo
     return ct
 }
 
-func TestCatchpointGetCatchpointStream(t *testing.T) {
+func TestGetCatchpointStream(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
@@ -150,12 +149,12 @@ func TestCatchpointGetCatchpointStream(t *testing.T) {
     require.NoError(t, err)
 }
 
-// TestCatchpointsDeleteStored - The goal of this test is to verify that the deleteStoredCatchpoints function works correctly.
+// TestAcctUpdatesDeleteStoredCatchpoints - The goal of this test is to verify that the deleteStoredCatchpoints function works correctly.
 // It does so by filling up the storedcatchpoints with dummy catchpoint file entries, as well as creating these dummy files on disk.
 // ( the term dummy is only because these aren't real catchpoint files, but rather a zero-length file ). Then, the test calls the function
 // and ensures that it did not error, the catchpoint files were correctly deleted, and that deleteStoredCatchpoints contains no more
 // entries.
-func TestCatchpointsDeleteStored(t *testing.T) {
+func TestAcctUpdatesDeleteStoredCatchpoints(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
@@ -207,7 +206,7 @@ func TestCatchpointsDeleteStored(t *testing.T) {
 // The test validate that when algod boots up it cleans empty catchpoint directories.
 // It is done by creating empty directories in the catchpoint root directory.
 // When algod boots up it should remove those directories.
-func TestCatchpointsDeleteStoredOnSchemaUpdate(t *testing.T) {
+func TestSchemaUpdateDeleteStoredCatchpoints(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     // we don't want to run this test before the binary is compiled against the latest database upgrade schema.
@@ -468,7 +467,7 @@ func BenchmarkLargeCatchpointDataWriting(b *testing.B) {
     b.ReportMetric(float64(accountsNumber), "accounts")
 }
 
-func TestCatchpointReproducibleLabels(t *testing.T) {
+func TestReproducibleCatchpointLabels(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     if runtime.GOARCH == "arm" || runtime.GOARCH == "arm64" {
@@ -493,7 +492,6 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
     defer ml.Close()
 
     cfg := config.GetDefaultLocal()
-    cfg.MaxAcctLookback = 2
     cfg.CatchpointInterval = 50
     cfg.CatchpointTracking = 1
     ct := newCatchpointTracker(t, ml, cfg, ".")
@@ -510,19 +508,10 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
     catchpointLabels := make(map[basics.Round]string)
     ledgerHistory := make(map[basics.Round]*mockLedgerForTracker)
     roundDeltas := make(map[basics.Round]ledgercore.StateDelta)
-
-    isCatchpointRound := func(rnd basics.Round) bool {
-        return (uint64(rnd) >= cfg.MaxAcctLookback) &&
-            (uint64(rnd)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) &&
-            ((uint64(rnd)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0)
-    }
-    isDataFileRound := func(rnd basics.Round) bool {
-        return ((uint64(rnd)-cfg.MaxAcctLookback+protoParams.CatchpointLookback)%cfg.CatchpointInterval == 0)
-    }
-
-    i := basics.Round(0)
     numCatchpointsCreated := 0
+    i := basics.Round(0)
     lastCatchpointLabel := ""
+
     for numCatchpointsCreated < testCatchpointLabelsCount {
         i++
         rewardLevelDelta := crypto.RandUint64() % 5
@@ -558,24 +547,16 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
         delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables)
         delta.Totals = newTotals
 
+        ml.addBlock(blockEntry{block: blk}, delta)
+
         accts = append(accts, newAccts)
         rewardsLevels = append(rewardsLevels, rewardLevel)
         roundDeltas[i] = delta
 
-        // determine if there is a data file round and commit
-        if isDataFileRound(i) || isCatchpointRound(i) {
-            ml.trackers.committedUpTo(i)
-            ml.trackers.waitAccountsWriting()
-
-            // Let catchpoint data generation finish so that nothing gets skipped.
-            for ct.IsWritingCatchpointDataFile() {
-                time.Sleep(time.Millisecond)
-            }
-        }
-
-        // If we made a catchpoint, save the label.
-        if isCatchpointRound(i) {
+        if (uint64(i) >= cfg.MaxAcctLookback) && (uint64(i)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) && ((uint64(i)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0) {
+            ml.trackers.waitAccountsWriting()
             catchpointLabels[i] = ct.GetLastCatchpointLabel()
             require.NotEmpty(t, catchpointLabels[i])
             require.NotEqual(t, lastCatchpointLabel, catchpointLabels[i])
@@ -584,19 +565,24 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
             defer ledgerHistory[i].Close()
             numCatchpointsCreated++
         }
+
+        // Let catchpoint data generation finish so that nothing gets skipped.
+        for ct.IsWritingCatchpointDataFile() {
+            time.Sleep(time.Millisecond)
+        }
     }
     lastRound := i
 
     // Test in reverse what happens when we try to repeat the exact same blocks.
     // Start off with the catchpoint before the last one.
-    for rnd := lastRound - basics.Round(cfg.CatchpointInterval); uint64(rnd) > protoParams.CatchpointLookback; rnd -= basics.Round(cfg.CatchpointInterval) {
+    for startingRound := lastRound - basics.Round(cfg.CatchpointInterval); uint64(startingRound) > protoParams.CatchpointLookback; startingRound -= basics.Round(cfg.CatchpointInterval) {
         au.close()
-        ml2 := ledgerHistory[rnd]
+        ml2 := ledgerHistory[startingRound]
         require.NotNil(t, ml2)
 
         ct2 := newCatchpointTracker(t, ml2, cfg, ".")
         defer ct2.close()
-        for i := rnd + 1; i <= lastRound; i++ {
+        for i := startingRound + 1; i <= lastRound; i++ {
             blk := bookkeeping.Block{
                 BlockHeader: bookkeeping.BlockHeader{
                     Round: basics.Round(i),
@@ -608,18 +594,16 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
 
             ml2.addBlock(blockEntry{block: blk}, delta)
 
-            if isDataFileRound(i) || isCatchpointRound(i) {
-                ml2.trackers.committedUpTo(i)
-                ml2.trackers.waitAccountsWriting()
-                // Let catchpoint data generation finish so that nothing gets skipped.
-                for ct.IsWritingCatchpointDataFile() {
-                    time.Sleep(time.Millisecond)
-                }
-            }
-
             // if this is a catchpoint round, check the label.
-            if isCatchpointRound(i) {
+            if (uint64(i) >= cfg.MaxAcctLookback) && (uint64(i)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) && ((uint64(i)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0) {
+                ml2.trackers.waitAccountsWriting()
                 require.Equal(t, catchpointLabels[i], ct2.GetLastCatchpointLabel())
             }
+
+            // Let catchpoint data generation finish so that nothing gets skipped.
+            for ct.IsWritingCatchpointDataFile() {
+                time.Sleep(time.Millisecond)
+            }
         }
     }
 
@@ -645,7 +629,6 @@ type blockingTracker struct {
     committedUpToRound           int64
     alwaysLock                   bool
     shouldLockPostCommit         bool
-    shouldLockPostCommitUnlocked bool
 }
 
 // loadFromDisk is not implemented in the blockingTracker.
@@ -688,7 +671,7 @@ func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitCo
 // postCommitUnlocked implements entry/exit blockers, designed for testing.
 func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
-    if bt.alwaysLock || dcc.catchpointFirstStage || bt.shouldLockPostCommitUnlocked {
+    if bt.alwaysLock || dcc.catchpointFirstStage {
         bt.postCommitUnlockedEntryLock <- struct{}{}
         <-bt.postCommitUnlockedReleaseLock
     }
@@ -705,7 +688,7 @@ func (bt *blockingTracker) close() {
 func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
     partitiontest.PartitionTest(t)
 
-    testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestCatchpointTrackerNonblockingCatchpointWriting")
+    testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestReproducibleCatchpointLabels")
     protoParams := config.Consensus[protocol.ConsensusCurrentVersion]
     protoParams.EnableCatchpointsWithSPContexts = true
     protoParams.CatchpointLookback = protoParams.MaxBalLookback
@@ -848,94 +831,6 @@ func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
     }
 }
 
-// TestCatchpointTrackerWaitNotBlocking checks a tracker with long postCommitUnlocked does not block blockq (notifyCommit) goroutine
-func TestCatchpointTrackerWaitNotBlocking(t *testing.T) {
-    partitiontest.PartitionTest(t)
-
-    genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 10)
-    const inMem = true
-    log := logging.TestingLog(t)
-    log.SetLevel(logging.Warn)
-    cfg := config.GetDefaultLocal()
-    cfg.Archival = true
-    ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
-    require.NoError(t, err)
-    defer ledger.Close()
-
-    writeStallingTracker := &blockingTracker{
-        postCommitUnlockedEntryLock:   make(chan struct{}),
-        postCommitUnlockedReleaseLock: make(chan struct{}),
-        shouldLockPostCommitUnlocked:  true,
-    }
-    ledger.trackerMu.Lock()
-    ledger.trackers.mu.Lock()
-    ledger.trackers.trackers = append(ledger.trackers.trackers, writeStallingTracker)
-    ledger.trackers.mu.Unlock()
-    ledger.trackerMu.Unlock()
-
-    startRound := ledger.Latest() + 1
-    endRound := basics.Round(20)
-    addBlockDone := make(chan struct{})
-
-    // release the blocking tracker when the test is done
-    defer func() {
-        // unblocking from another goroutine is a bit complicated:
-        // this function should not quit until postCommitUnlockedReleaseLock is consumed
-        // to do that, write to it first and do not exit until consumed,
-        // otherwise we might exit and leave the tracker registry's syncer goroutine blocked
-        wg := sync.WaitGroup{}
-        wg.Add(1)
-        go func() {
-            writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
-            wg.Done()
-        }()
-
-        // consume to unblock
-        <-writeStallingTracker.postCommitUnlockedEntryLock
-        // disable further blocking
-        writeStallingTracker.shouldLockPostCommitUnlocked = false
-
-        // wait the writeStallingTracker.postCommitUnlockedReleaseLock passes
-        wg.Wait()
-
-        // at the end, what while the addBlock goroutine finishes
-        // consume to unblock
-        <-addBlockDone
-    }()
-
-    // tracker commits are now blocked, add some blocks
-    timer := time.NewTimer(1 * time.Second)
-    go func() {
-        defer close(addBlockDone)
-        blk := genesisInitState.Block
-        for rnd := startRound; rnd <= endRound; rnd++ {
-            blk.BlockHeader.Round = rnd
-            blk.BlockHeader.TimeStamp = int64(blk.BlockHeader.Round)
-            err := ledger.AddBlock(blk, agreement.Certificate{})
-            require.NoError(t, err)
-        }
-    }()
-
-    select {
-    case <-timer.C:
-        require.FailNow(t, "timeout")
-    case <-addBlockDone:
-    }
-
-    // switch context one more time to give the blockqueue syncer to run
-    time.Sleep(1 * time.Millisecond)
-
-    // ensure Ledger.Wait() is non-blocked for all rounds except the last one (due to possible races)
-    for rnd := startRound; rnd < endRound; rnd++ {
-        done := ledger.Wait(rnd)
-        select {
-        case <-done:
-        default:
-            require.FailNow(t, fmt.Sprintf("Wait(%d) is blocked", rnd))
-        }
-    }
-}
-
 func TestCalculateFirstStageRounds(t *testing.T) {
     partitiontest.PartitionTest(t)
 
@@ -1023,7 +918,7 @@ func TestCalculateCatchpointRounds(t *testing.T) {
 
 // Test that pruning first stage catchpoint database records and catchpoint data files
 // works.
-func TestCatchpointFirstStageInfoPruning(t *testing.T) {
+func TestFirstStageInfoPruning(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     // create new protocol version, which has lower lookback
@@ -1057,15 +952,6 @@ func TestCatchpointFirstStageInfoPruning(t *testing.T) {
 
     expectedNumEntries := protoParams.CatchpointLookback / cfg.CatchpointInterval
 
-    isCatchpointRound := func(rnd basics.Round) bool {
-        return (uint64(rnd) >= cfg.MaxAcctLookback) &&
-            (uint64(rnd)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) &&
-            ((uint64(rnd)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0)
-    }
-    isDataFileRound := func(rnd basics.Round) bool {
-        return ((uint64(rnd)-cfg.MaxAcctLookback+protoParams.CatchpointLookback)%cfg.CatchpointInterval == 0)
-    }
-
     numCatchpointsCreated := uint64(0)
     i := basics.Round(0)
     lastCatchpointLabel := ""
@@ -1085,21 +971,18 @@ func TestCatchpointFirstStageInfoPruning(t *testing.T) {
 
         ml.addBlock(blockEntry{block: blk}, delta)
 
-        if isDataFileRound(i) || isCatchpointRound(i) {
-            ml.trackers.committedUpTo(i)
+        if (uint64(i) >= cfg.MaxAcctLookback) && (uint64(i)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) && ((uint64(i)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0) {
             ml.trackers.waitAccountsWriting()
-            // Let catchpoint data generation finish so that nothing gets skipped.
-            for ct.IsWritingCatchpointDataFile() {
-                time.Sleep(time.Millisecond)
-            }
-        }
-
-        if isCatchpointRound(i) {
             catchpointLabel := ct.GetLastCatchpointLabel()
             require.NotEqual(t, lastCatchpointLabel, catchpointLabel)
             lastCatchpointLabel = catchpointLabel
             numCatchpointsCreated++
         }
+
+        // Let catchpoint data generation finish so that nothing gets skipped.
+        for ct.IsWritingCatchpointDataFile() {
+            time.Sleep(time.Millisecond)
+        }
     }
 
     numEntries := uint64(0)
@@ -1127,7 +1010,7 @@ func TestCatchpointFirstStageInfoPruning(t *testing.T) {
 
 // Test that on startup the catchpoint tracker restarts catchpoint's first stage if
 // there is an unfinished first stage record in the database.
-func TestCatchpointFirstStagePersistence(t *testing.T) {
+func TestFirstStagePersistence(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     // create new protocol version, which has lower lookback
@@ -1172,7 +1055,7 @@ func TestCatchpointFirstStagePersistence(t *testing.T) {
         ml.addBlock(blockEntry{block: blk}, delta)
     }
 
-    ml.trackers.committedUpTo(firstStageRound)
+    ml.trackers.waitAccountsWriting()
 
     // Check that the data file exists.
@@ -1228,7 +1111,7 @@
 
 // Test that on startup the catchpoint tracker restarts catchpoint's second stage if
 // there is an unfinished catchpoint record in the database.
-func TestCatchpointSecondStagePersistence(t *testing.T) {
+func TestSecondStagePersistence(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     // create new protocol version, which has lower lookback
@@ -1258,15 +1141,6 @@ func TestCatchpointSecondStagePersistence(t *testing.T) {
         t, ml, cfg, filepath.Join(tempDirectory, config.LedgerFilenamePrefix))
     defer ct.close()
 
-    isCatchpointRound := func(rnd basics.Round) bool {
-        return (uint64(rnd) >= cfg.MaxAcctLookback) &&
-            (uint64(rnd)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) &&
-            ((uint64(rnd)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0)
-    }
-    isDataFileRound := func(rnd basics.Round) bool {
-        return ((uint64(rnd)-cfg.MaxAcctLookback+protoParams.CatchpointLookback)%cfg.CatchpointInterval == 0)
-    }
-
     secondStageRound := basics.Round(36)
     firstStageRound := secondStageRound - basics.Round(protoParams.CatchpointLookback)
     catchpointDataFilePath :=
@@ -1300,16 +1174,14 @@ func TestCatchpointSecondStagePersistence(t *testing.T) {
 
         ml.addBlock(blockEntry{block: blk}, delta)
 
-        if isDataFileRound(i) || isCatchpointRound(i) {
-            ml.trackers.committedUpTo(i)
-            ml.trackers.waitAccountsWriting()
-            // Let catchpoint data generation finish so that nothing gets skipped.
-            for ct.IsWritingCatchpointDataFile() {
-                time.Sleep(time.Millisecond)
-            }
+        // Let catchpoint data generation finish so that nothing gets skipped.
+        for ct.IsWritingCatchpointDataFile() {
+            time.Sleep(time.Millisecond)
         }
     }
 
+    ml.trackers.waitAccountsWriting()
+
     // Check that the data file exists.
     catchpointFilePath :=
         filepath.Join(catchpointsDirectory, trackerdb.MakeCatchpointFilePath(secondStageRound))
@@ -1374,7 +1246,7 @@ func TestCatchpointSecondStagePersistence(t *testing.T) {
 
 // Test that when catchpoint's first stage record is unavailable
 // (e.g. catchpoints were disabled at first stage), the unfinished catchpoint
 // database record is deleted.
-func TestCatchpointSecondStageDeletesUnfinishedCatchpointRecord(t *testing.T) {
+func TestSecondStageDeletesUnfinishedCatchpointRecord(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     // create new protocol version, which has lower lookback
@@ -1463,7 +1335,7 @@
 
 // Test that on startup the catchpoint tracker deletes the unfinished catchpoint
 // database record when the first stage database record is missing.
-func TestCatchpointSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *testing.T) {
+func TestSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *testing.T) {
     partitiontest.PartitionTest(t)
 
     // create new protocol version, which has lower lookback
@@ -1767,8 +1639,8 @@ func TestCatchpointFastUpdates(t *testing.T) {
             ml.trackers.committedUpTo(round)
         }(i)
     }
-    wg.Wait()
     ml.trackers.waitAccountsWriting()
+    wg.Wait()
 
     require.NotEmpty(t, ct.GetLastCatchpointLabel())
 }
diff --git a/ledger/tracker.go b/ledger/tracker.go
index 6eab2e1d0..58fde5d8f 100644
--- a/ledger/tracker.go
+++ b/ledger/tracker.go
@@ -411,40 +411,14 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round)
     // ( unless we're creating a catchpoint, in which case we want to flush it right away
     // so that all the instances of the catchpoint would contain exactly the same data )
     flushTime := time.Now()
-
-    // Some tracker want to flush
-    if dcc != nil {
-        // skip this flush if none of these conditions met:
-        // - has it been at least balancesFlushInterval since the last flush?
-        flushIntervalPassed := flushTime.After(tr.lastFlushTime.Add(balancesFlushInterval))
-        // - does this commit task also include catchpoint file creation activity for the dcc.oldBase+dcc.offset?
-        flushForCatchpoint := dcc.catchpointFirstStage || dcc.catchpointSecondStage
-        // - have more than pendingDeltasFlushThreshold accounts been modified since the last flush?
-        flushAccounts := dcc.pendingDeltas >= pendingDeltasFlushThreshold
-        if !(flushIntervalPassed || flushForCatchpoint || flushAccounts) {
-            dcc = nil
-        }
+    if dcc != nil && !flushTime.After(tr.lastFlushTime.Add(balancesFlushInterval)) && !dcc.catchpointFirstStage && !dcc.catchpointSecondStage && dcc.pendingDeltas < pendingDeltasFlushThreshold {
+        dcc = nil
     }
     tr.mu.RUnlock()
 
     if dcc != nil {
-        // Increment the waitgroup first, otherwise this goroutine can be interrupted
-        // and commitSyncer attempts calling Done() on empty wait group.
         tr.accountsWriting.Add(1)
-        select {
-        case tr.deferredCommits <- dcc:
-        default:
-            // Do NOT block if deferredCommits cannot accept this task, skip it.
-            // Note: the next attempt will include these rounds plus some extra rounds.
-            // The main reason for slow commits is catchpoint file creation (when commitSyncer calls
-            // commitRound, which calls postCommitUnlocked). This producer thread is called by
-            // blockQueue.syncer() upon successful block DB flush, which calls ledger.notifyCommit()
-            // and trackerRegistry.committedUpTo() after taking the trackerMu.Lock().
-            // This means a blocking write to deferredCommits will block Ledger reads (TODO use more fine-grained locks).
-            // Dropping this dcc allows the blockqueue syncer to continue persisting other blocks
-            // and ledger reads to proceed without being blocked by trackerMu lock.
-            tr.accountsWriting.Done()
-        }
+        tr.deferredCommits <- dcc
     }
 }
 
diff --git a/ledger/txtail_test.go b/ledger/txtail_test.go
index a21af5120..ba4a09755 100644
--- a/ledger/txtail_test.go
+++ b/ledger/txtail_test.go
@@ -19,6 +19,7 @@ package ledger
 import (
     "context"
     "errors"
+    "fmt"
     "testing"
 
     "github.com/stretchr/testify/require"
@@ -258,6 +259,7 @@ func TestTxTailDeltaTracking(t *testing.T) {
             err := txtail.loadFromDisk(&ledger, ledger.Latest())
             require.NoError(t, err)
+            fmt.Printf("%d, %s\n", len(txtail.recent), protoVersion)
             require.Equal(t, int(config.Consensus[protoVersion].MaxTxnLife), len(txtail.recent))
             require.Equal(t, testTxTailValidityRange, len(txtail.lastValid))
             require.Equal(t, ledger.Latest(), txtail.lowWaterMark)
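
Aside on the test hunks above: the revert repeatedly inlines the round arithmetic that the reverted commit had factored into isCatchpointRound/isDataFileRound helpers. A standalone sketch of those predicates, using illustrative constants in place of cfg.MaxAcctLookback, protoParams.CatchpointLookback, and cfg.CatchpointInterval (the values below are assumptions for demonstration, not the defaults):

package main

import "fmt"

const (
    maxAcctLookback    = 2  // rounds the account DB lags behind the latest block
    catchpointLookback = 32 // rounds between the first-stage data file and the label
    catchpointInterval = 50 // rounds between consecutive catchpoints
)

// isCatchpointRound mirrors the removed helper: a catchpoint label is
// produced once the committed round (rnd - maxAcctLookback) is past the
// lookback window and lands on an interval boundary.
func isCatchpointRound(rnd uint64) bool {
    return rnd >= maxAcctLookback &&
        rnd-maxAcctLookback > catchpointLookback &&
        (rnd-maxAcctLookback)%catchpointInterval == 0
}

// isDataFileRound mirrors the removed first-stage predicate: the catchpoint
// data file is written catchpointLookback rounds before the label round.
func isDataFileRound(rnd uint64) bool {
    return (rnd-maxAcctLookback+catchpointLookback)%catchpointInterval == 0
}

func main() {
    for rnd := uint64(maxAcctLookback); rnd <= 110; rnd++ {
        if isDataFileRound(rnd) || isCatchpointRound(rnd) {
            fmt.Printf("round %3d: dataFile=%v catchpoint=%v\n",
                rnd, isDataFileRound(rnd), isCatchpointRound(rnd))
        }
    }
}

With these stand-in values the sketch reports data-file rounds at 20 and 70 and catchpoint rounds at 52 and 102, showing how the first stage runs catchpointLookback rounds ahead of the label round — the staggering the tests above synchronize around.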