author     Bob Broderick <118225939+bbroder-algo@users.noreply.github.com>  2023-05-23 15:34:02 -0400
committer  Bob Broderick <118225939+bbroder-algo@users.noreply.github.com>  2023-05-23 15:34:02 -0400
commit     2981c1e2015bee380a5c83895fc287cb50313d1d (patch)
tree       4e9954e3442e087b739c7a3746bcbec3684705f0
parent     297a4a183293e3e7faf18609a31eed92350fd8bc (diff)
Revert "ledger: fix commit tasks enqueueing (#5214)"tracker_backout
This reverts commit f8a130ed53006473de823756383f76eeda3faade.
-rw-r--r--  ledger/catchpointtracker_test.go  222
-rw-r--r--  ledger/tracker.go                  32
-rw-r--r--  ledger/txtail_test.go               2
3 files changed, 52 insertions, 204 deletions
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index dcf6c68c6..3bf16ddbc 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -33,7 +33,6 @@ import (
"github.com/stretchr/testify/require"
- "github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
@@ -48,7 +47,7 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"
)
-func TestCatchpointIsWritingCatchpointFile(t *testing.T) {
+func TestIsWritingCatchpointFile(t *testing.T) {
partitiontest.PartitionTest(t)
ct := &catchpointTracker{}
@@ -79,7 +78,7 @@ func newCatchpointTracker(tb testing.TB, l *mockLedgerForTracker, conf config.Lo
return ct
}
-func TestCatchpointGetCatchpointStream(t *testing.T) {
+func TestGetCatchpointStream(t *testing.T) {
partitiontest.PartitionTest(t)
accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
@@ -150,12 +149,12 @@ func TestCatchpointGetCatchpointStream(t *testing.T) {
require.NoError(t, err)
}
-// TestCatchpointsDeleteStored - The goal of this test is to verify that the deleteStoredCatchpoints function works correctly.
+// TestAcctUpdatesDeleteStoredCatchpoints - The goal of this test is to verify that the deleteStoredCatchpoints function works correctly.
// It does so by filling up the storedcatchpoints with dummy catchpoint file entries, as well as creating these dummy files on disk.
// ( the term dummy is only because these aren't real catchpoint files, but rather a zero-length file ). Then, the test calls the function
// and ensures that it did not error, the catchpoint files were correctly deleted, and that deleteStoredCatchpoints contains no more
// entries.
-func TestCatchpointsDeleteStored(t *testing.T) {
+func TestAcctUpdatesDeleteStoredCatchpoints(t *testing.T) {
partitiontest.PartitionTest(t)
accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
@@ -207,7 +206,7 @@ func TestCatchpointsDeleteStored(t *testing.T) {
// The test validates that when algod boots up it cleans empty catchpoint directories.
// It is done by creating empty directories in the catchpoint root directory.
// When algod boots up it should remove those directories.
-func TestCatchpointsDeleteStoredOnSchemaUpdate(t *testing.T) {
+func TestSchemaUpdateDeleteStoredCatchpoints(t *testing.T) {
partitiontest.PartitionTest(t)
// we don't want to run this test before the binary is compiled against the latest database upgrade schema.
@@ -468,7 +467,7 @@ func BenchmarkLargeCatchpointDataWriting(b *testing.B) {
b.ReportMetric(float64(accountsNumber), "accounts")
}
-func TestCatchpointReproducibleLabels(t *testing.T) {
+func TestReproducibleCatchpointLabels(t *testing.T) {
partitiontest.PartitionTest(t)
if runtime.GOARCH == "arm" || runtime.GOARCH == "arm64" {
@@ -493,7 +492,6 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
defer ml.Close()
cfg := config.GetDefaultLocal()
- cfg.MaxAcctLookback = 2
cfg.CatchpointInterval = 50
cfg.CatchpointTracking = 1
ct := newCatchpointTracker(t, ml, cfg, ".")
@@ -510,19 +508,10 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
catchpointLabels := make(map[basics.Round]string)
ledgerHistory := make(map[basics.Round]*mockLedgerForTracker)
roundDeltas := make(map[basics.Round]ledgercore.StateDelta)
-
- isCatchpointRound := func(rnd basics.Round) bool {
- return (uint64(rnd) >= cfg.MaxAcctLookback) &&
- (uint64(rnd)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) &&
- ((uint64(rnd)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0)
- }
- isDataFileRound := func(rnd basics.Round) bool {
- return ((uint64(rnd)-cfg.MaxAcctLookback+protoParams.CatchpointLookback)%cfg.CatchpointInterval == 0)
- }
-
- i := basics.Round(0)
numCatchpointsCreated := 0
+ i := basics.Round(0)
lastCatchpointLabel := ""
+
for numCatchpointsCreated < testCatchpointLabelsCount {
i++
rewardLevelDelta := crypto.RandUint64() % 5
@@ -558,24 +547,16 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables)
delta.Totals = newTotals
+
ml.addBlock(blockEntry{block: blk}, delta)
+
accts = append(accts, newAccts)
rewardsLevels = append(rewardsLevels, rewardLevel)
roundDeltas[i] = delta
- // determine if there is a data file round and commit
- if isDataFileRound(i) || isCatchpointRound(i) {
- ml.trackers.committedUpTo(i)
- ml.trackers.waitAccountsWriting()
-
- // Let catchpoint data generation finish so that nothing gets skipped.
- for ct.IsWritingCatchpointDataFile() {
- time.Sleep(time.Millisecond)
- }
- }
-
// If we made a catchpoint, save the label.
- if isCatchpointRound(i) {
+ if (uint64(i) >= cfg.MaxAcctLookback) && (uint64(i)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) && ((uint64(i)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0) {
+ ml.trackers.waitAccountsWriting()
catchpointLabels[i] = ct.GetLastCatchpointLabel()
require.NotEmpty(t, catchpointLabels[i])
require.NotEqual(t, lastCatchpointLabel, catchpointLabels[i])
@@ -584,19 +565,24 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
defer ledgerHistory[i].Close()
numCatchpointsCreated++
}
+
+ // Let catchpoint data generation finish so that nothing gets skipped.
+ for ct.IsWritingCatchpointDataFile() {
+ time.Sleep(time.Millisecond)
+ }
}
lastRound := i
// Test in reverse what happens when we try to repeat the exact same blocks.
// Start off with the catchpoint before the last one.
- for rnd := lastRound - basics.Round(cfg.CatchpointInterval); uint64(rnd) > protoParams.CatchpointLookback; rnd -= basics.Round(cfg.CatchpointInterval) {
+ for startingRound := lastRound - basics.Round(cfg.CatchpointInterval); uint64(startingRound) > protoParams.CatchpointLookback; startingRound -= basics.Round(cfg.CatchpointInterval) {
au.close()
- ml2 := ledgerHistory[rnd]
+ ml2 := ledgerHistory[startingRound]
require.NotNil(t, ml2)
ct2 := newCatchpointTracker(t, ml2, cfg, ".")
defer ct2.close()
- for i := rnd + 1; i <= lastRound; i++ {
+ for i := startingRound + 1; i <= lastRound; i++ {
blk := bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: basics.Round(i),
@@ -608,18 +594,16 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
ml2.addBlock(blockEntry{block: blk}, delta)
- if isDataFileRound(i) || isCatchpointRound(i) {
- ml2.trackers.committedUpTo(i)
- ml2.trackers.waitAccountsWriting()
- // Let catchpoint data generation finish so that nothing gets skipped.
- for ct.IsWritingCatchpointDataFile() {
- time.Sleep(time.Millisecond)
- }
- }
// if this is a catchpoint round, check the label.
- if isCatchpointRound(i) {
+ if (uint64(i) >= cfg.MaxAcctLookback) && (uint64(i)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) && ((uint64(i)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0) {
+ ml2.trackers.waitAccountsWriting()
require.Equal(t, catchpointLabels[i], ct2.GetLastCatchpointLabel())
}
+
+ // Let catchpoint data generation finish so that nothing gets skipped.
+ for ct.IsWritingCatchpointDataFile() {
+ time.Sleep(time.Millisecond)
+ }
}
}
@@ -645,7 +629,6 @@ type blockingTracker struct {
committedUpToRound int64
alwaysLock bool
shouldLockPostCommit bool
- shouldLockPostCommitUnlocked bool
}
// loadFromDisk is not implemented in the blockingTracker.
@@ -688,7 +671,7 @@ func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitCo
// postCommitUnlocked implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
- if bt.alwaysLock || dcc.catchpointFirstStage || bt.shouldLockPostCommitUnlocked {
+ if bt.alwaysLock || dcc.catchpointFirstStage {
bt.postCommitUnlockedEntryLock <- struct{}{}
<-bt.postCommitUnlockedReleaseLock
}
@@ -705,7 +688,7 @@ func (bt *blockingTracker) close() {
func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
partitiontest.PartitionTest(t)
- testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestCatchpointTrackerNonblockingCatchpointWriting")
+ testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestReproducibleCatchpointLabels")
protoParams := config.Consensus[protocol.ConsensusCurrentVersion]
protoParams.EnableCatchpointsWithSPContexts = true
protoParams.CatchpointLookback = protoParams.MaxBalLookback
@@ -848,94 +831,6 @@ func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
}
}
-// TestCatchpointTrackerWaitNotBlocking checks a tracker with long postCommitUnlocked does not block blockq (notifyCommit) goroutine
-func TestCatchpointTrackerWaitNotBlocking(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 10)
- const inMem = true
- log := logging.TestingLog(t)
- log.SetLevel(logging.Warn)
- cfg := config.GetDefaultLocal()
- cfg.Archival = true
- ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
- require.NoError(t, err)
- defer ledger.Close()
-
- writeStallingTracker := &blockingTracker{
- postCommitUnlockedEntryLock: make(chan struct{}),
- postCommitUnlockedReleaseLock: make(chan struct{}),
- shouldLockPostCommitUnlocked: true,
- }
- ledger.trackerMu.Lock()
- ledger.trackers.mu.Lock()
- ledger.trackers.trackers = append(ledger.trackers.trackers, writeStallingTracker)
- ledger.trackers.mu.Unlock()
- ledger.trackerMu.Unlock()
-
- startRound := ledger.Latest() + 1
- endRound := basics.Round(20)
- addBlockDone := make(chan struct{})
-
- // release the blocking tracker when the test is done
- defer func() {
- // unblocking from another goroutine is a bit complicated:
- // this function should not quit until postCommitUnlockedReleaseLock is consumed
- // to do that, write to it first and do not exit until consumed,
- // otherwise we might exit and leave the tracker registry's syncer goroutine blocked
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
- wg.Done()
- }()
-
- // consume to unblock
- <-writeStallingTracker.postCommitUnlockedEntryLock
- // disable further blocking
- writeStallingTracker.shouldLockPostCommitUnlocked = false
-
- // wait until the write to writeStallingTracker.postCommitUnlockedReleaseLock goes through
- wg.Wait()
-
- // at the end, wait until the addBlock goroutine finishes
- // consume to unblock
- <-addBlockDone
- }()
-
- // tracker commits are now blocked, add some blocks
- timer := time.NewTimer(1 * time.Second)
- go func() {
- defer close(addBlockDone)
- blk := genesisInitState.Block
- for rnd := startRound; rnd <= endRound; rnd++ {
- blk.BlockHeader.Round = rnd
- blk.BlockHeader.TimeStamp = int64(blk.BlockHeader.Round)
- err := ledger.AddBlock(blk, agreement.Certificate{})
- require.NoError(t, err)
- }
- }()
-
- select {
- case <-timer.C:
- require.FailNow(t, "timeout")
- case <-addBlockDone:
- }
-
- // switch context one more time to give the blockqueue syncer a chance to run
- time.Sleep(1 * time.Millisecond)
-
- // ensure Ledger.Wait() is non-blocked for all rounds except the last one (due to possible races)
- for rnd := startRound; rnd < endRound; rnd++ {
- done := ledger.Wait(rnd)
- select {
- case <-done:
- default:
- require.FailNow(t, fmt.Sprintf("Wait(%d) is blocked", rnd))
- }
- }
-}
-
func TestCalculateFirstStageRounds(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -1023,7 +918,7 @@ func TestCalculateCatchpointRounds(t *testing.T) {
// Test that pruning first stage catchpoint database records and catchpoint data files
// works.
-func TestCatchpointFirstStageInfoPruning(t *testing.T) {
+func TestFirstStageInfoPruning(t *testing.T) {
partitiontest.PartitionTest(t)
// create new protocol version, which has lower lookback
@@ -1057,15 +952,6 @@ func TestCatchpointFirstStageInfoPruning(t *testing.T) {
expectedNumEntries := protoParams.CatchpointLookback / cfg.CatchpointInterval
- isCatchpointRound := func(rnd basics.Round) bool {
- return (uint64(rnd) >= cfg.MaxAcctLookback) &&
- (uint64(rnd)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) &&
- ((uint64(rnd)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0)
- }
- isDataFileRound := func(rnd basics.Round) bool {
- return ((uint64(rnd)-cfg.MaxAcctLookback+protoParams.CatchpointLookback)%cfg.CatchpointInterval == 0)
- }
-
numCatchpointsCreated := uint64(0)
i := basics.Round(0)
lastCatchpointLabel := ""
@@ -1085,21 +971,18 @@ func TestCatchpointFirstStageInfoPruning(t *testing.T) {
ml.addBlock(blockEntry{block: blk}, delta)
- if isDataFileRound(i) || isCatchpointRound(i) {
- ml.trackers.committedUpTo(i)
+ if (uint64(i) >= cfg.MaxAcctLookback) && (uint64(i)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) && ((uint64(i)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0) {
ml.trackers.waitAccountsWriting()
- // Let catchpoint data generation finish so that nothing gets skipped.
- for ct.IsWritingCatchpointDataFile() {
- time.Sleep(time.Millisecond)
- }
- }
-
- if isCatchpointRound(i) {
catchpointLabel := ct.GetLastCatchpointLabel()
require.NotEqual(t, lastCatchpointLabel, catchpointLabel)
lastCatchpointLabel = catchpointLabel
numCatchpointsCreated++
}
+
+ // Let catchpoint data generation finish so that nothing gets skipped.
+ for ct.IsWritingCatchpointDataFile() {
+ time.Sleep(time.Millisecond)
+ }
}
numEntries := uint64(0)
@@ -1127,7 +1010,7 @@ func TestCatchpointFirstStageInfoPruning(t *testing.T) {
// Test that on startup the catchpoint tracker restarts catchpoint's first stage if
// there is an unfinished first stage record in the database.
-func TestCatchpointFirstStagePersistence(t *testing.T) {
+func TestFirstStagePersistence(t *testing.T) {
partitiontest.PartitionTest(t)
// create new protocol version, which has lower lookback
@@ -1172,7 +1055,7 @@ func TestCatchpointFirstStagePersistence(t *testing.T) {
ml.addBlock(blockEntry{block: blk}, delta)
}
- ml.trackers.committedUpTo(firstStageRound)
+
ml.trackers.waitAccountsWriting()
// Check that the data file exists.
@@ -1228,7 +1111,7 @@ func TestCatchpointFirstStagePersistence(t *testing.T) {
// Test that on startup the catchpoint tracker restarts catchpoint's second stage if
// there is an unfinished catchpoint record in the database.
-func TestCatchpointSecondStagePersistence(t *testing.T) {
+func TestSecondStagePersistence(t *testing.T) {
partitiontest.PartitionTest(t)
// create new protocol version, which has lower lookback
@@ -1258,15 +1141,6 @@ func TestCatchpointSecondStagePersistence(t *testing.T) {
t, ml, cfg, filepath.Join(tempDirectory, config.LedgerFilenamePrefix))
defer ct.close()
- isCatchpointRound := func(rnd basics.Round) bool {
- return (uint64(rnd) >= cfg.MaxAcctLookback) &&
- (uint64(rnd)-cfg.MaxAcctLookback > protoParams.CatchpointLookback) &&
- ((uint64(rnd)-cfg.MaxAcctLookback)%cfg.CatchpointInterval == 0)
- }
- isDataFileRound := func(rnd basics.Round) bool {
- return ((uint64(rnd)-cfg.MaxAcctLookback+protoParams.CatchpointLookback)%cfg.CatchpointInterval == 0)
- }
-
secondStageRound := basics.Round(36)
firstStageRound := secondStageRound - basics.Round(protoParams.CatchpointLookback)
catchpointDataFilePath :=
@@ -1300,16 +1174,14 @@ func TestCatchpointSecondStagePersistence(t *testing.T) {
ml.addBlock(blockEntry{block: blk}, delta)
- if isDataFileRound(i) || isCatchpointRound(i) {
- ml.trackers.committedUpTo(i)
- ml.trackers.waitAccountsWriting()
- // Let catchpoint data generation finish so that nothing gets skipped.
- for ct.IsWritingCatchpointDataFile() {
- time.Sleep(time.Millisecond)
- }
+ // Let catchpoint data generation finish so that nothing gets skipped.
+ for ct.IsWritingCatchpointDataFile() {
+ time.Sleep(time.Millisecond)
}
}
+ ml.trackers.waitAccountsWriting()
+
// Check that the data file exists.
catchpointFilePath :=
filepath.Join(catchpointsDirectory, trackerdb.MakeCatchpointFilePath(secondStageRound))
@@ -1374,7 +1246,7 @@ func TestCatchpointSecondStagePersistence(t *testing.T) {
// Test that when catchpoint's first stage record is unavailable
// (e.g. catchpoints were disabled at first stage), the unfinished catchpoint
// database record is deleted.
-func TestCatchpointSecondStageDeletesUnfinishedCatchpointRecord(t *testing.T) {
+func TestSecondStageDeletesUnfinishedCatchpointRecord(t *testing.T) {
partitiontest.PartitionTest(t)
// create new protocol version, which has lower lookback
@@ -1463,7 +1335,7 @@ func TestCatchpointSecondStageDeletesUnfinishedCatchpointRecord(t *testing.T) {
// Test that on startup the catchpoint tracker deletes the unfinished catchpoint
// database record when the first stage database record is missing.
-func TestCatchpointSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *testing.T) {
+func TestSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *testing.T) {
partitiontest.PartitionTest(t)
// create new protocol version, which has lower lookback
@@ -1767,8 +1639,8 @@ func TestCatchpointFastUpdates(t *testing.T) {
ml.trackers.committedUpTo(round)
}(i)
}
- wg.Wait()
ml.trackers.waitAccountsWriting()
+ wg.Wait()
require.NotEmpty(t, ct.GetLastCatchpointLabel())
}
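
Besides undoing the Test* renames, the catchpointtracker_test.go hunks above drop the shared isCatchpointRound/isDataFileRound helpers and inline the catchpoint-round condition at each call site. For reference, here is a minimal standalone sketch of the two predicates being deleted, reconstructed from the removed lines; the plain uint64 parameters are stand-ins for the values the tests read from cfg.MaxAcctLookback, protoParams.CatchpointLookback, and cfg.CatchpointInterval, and the numbers in main are illustrative only:

```go
package main

import "fmt"

// isCatchpointRound reports whether a catchpoint label is expected at rnd:
// the round, adjusted for the in-memory account lookback, must be past the
// catchpoint lookback and fall on a catchpoint interval boundary.
func isCatchpointRound(rnd, maxAcctLookback, catchpointLookback, interval uint64) bool {
	return rnd >= maxAcctLookback &&
		rnd-maxAcctLookback > catchpointLookback &&
		(rnd-maxAcctLookback)%interval == 0
}

// isDataFileRound reports whether a first-stage catchpoint data file is
// expected at rnd; data files are written catchpointLookback rounds before
// the catchpoint round they serve.
func isDataFileRound(rnd, maxAcctLookback, catchpointLookback, interval uint64) bool {
	return (rnd-maxAcctLookback+catchpointLookback)%interval == 0
}

func main() {
	// example values: MaxAcctLookback=2, CatchpointLookback=32, CatchpointInterval=50
	fmt.Println(isDataFileRound(20, 2, 32, 50))   // true: (20-2)+32 == 50
	fmt.Println(isCatchpointRound(52, 2, 32, 50)) // true: 52-2 == 50, past the lookback and on the interval
}
```
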
diff --git a/ledger/tracker.go b/ledger/tracker.go
index 6eab2e1d0..58fde5d8f 100644
--- a/ledger/tracker.go
+++ b/ledger/tracker.go
@@ -411,40 +411,14 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round)
// ( unless we're creating a catchpoint, in which case we want to flush it right away
// so that all the instances of the catchpoint would contain exactly the same data )
flushTime := time.Now()
-
- // Some trackers want to flush
- if dcc != nil {
- // skip this flush if none of these conditions are met:
- // - has it been at least balancesFlushInterval since the last flush?
- flushIntervalPassed := flushTime.After(tr.lastFlushTime.Add(balancesFlushInterval))
- // - does this commit task also include catchpoint file creation activity for the dcc.oldBase+dcc.offset?
- flushForCatchpoint := dcc.catchpointFirstStage || dcc.catchpointSecondStage
- // - have more than pendingDeltasFlushThreshold accounts been modified since the last flush?
- flushAccounts := dcc.pendingDeltas >= pendingDeltasFlushThreshold
- if !(flushIntervalPassed || flushForCatchpoint || flushAccounts) {
- dcc = nil
- }
+ if dcc != nil && !flushTime.After(tr.lastFlushTime.Add(balancesFlushInterval)) && !dcc.catchpointFirstStage && !dcc.catchpointSecondStage && dcc.pendingDeltas < pendingDeltasFlushThreshold {
+ dcc = nil
}
tr.mu.RUnlock()
if dcc != nil {
- // Increment the waitgroup first, otherwise this goroutine can be interrupted
- // and commitSyncer attempts calling Done() on empty wait group.
tr.accountsWriting.Add(1)
- select {
- case tr.deferredCommits <- dcc:
- default:
- // Do NOT block if deferredCommits cannot accept this task, skip it.
- // Note: the next attempt will include these rounds plus some extra rounds.
- // The main reason for slow commits is catchpoint file creation (when commitSyncer calls
- // commitRound, which calls postCommitUnlocked). This producer thread is called by
- // blockQueue.syncer() upon successful block DB flush, which calls ledger.notifyCommit()
- // and trackerRegistry.committedUpTo() after taking the trackerMu.Lock().
- // This means a blocking write to deferredCommits will block Ledger reads (TODO use more fine-grained locks).
- // Dropping this dcc allows the blockqueue syncer to continue persisting other blocks
- // and ledger reads to proceed without being blocked by trackerMu lock.
- tr.accountsWriting.Done()
- }
+ tr.deferredCommits <- dcc
}
}
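
The tracker.go hunk collapses the commented three-part flush decision (flush interval elapsed, catchpoint stage in progress, pending-delta threshold exceeded) back into a single condition and restores a blocking send into tr.deferredCommits, removing the select/default from #5214 that dropped a commit task rather than stall the block-queue syncer (and, through trackerMu, ledger reads). Below is a self-contained sketch of the non-blocking enqueue pattern being removed, using simplified stand-in types rather than the real ledger API:

```go
package main

import (
	"fmt"
	"sync"
)

// deferredCommitContext is a stand-in for the ledger's commit task type.
type deferredCommitContext struct{ round uint64 }

// trackerRegistry mirrors only the two fields the pattern needs.
type trackerRegistry struct {
	accountsWriting sync.WaitGroup
	deferredCommits chan *deferredCommitContext
}

// scheduleNonBlocking shows the enqueue the revert removes: count the task in
// the wait group first, then try a non-blocking send; if the channel is full,
// undo the Add and drop the task instead of blocking the caller. The next
// commit attempt covers the dropped rounds as well.
func (tr *trackerRegistry) scheduleNonBlocking(dcc *deferredCommitContext) bool {
	tr.accountsWriting.Add(1)
	select {
	case tr.deferredCommits <- dcc:
		return true // the consumer goroutine calls Done() once the commit finishes
	default:
		tr.accountsWriting.Done()
		return false
	}
}

func main() {
	tr := &trackerRegistry{deferredCommits: make(chan *deferredCommitContext, 1)}
	fmt.Println(tr.scheduleNonBlocking(&deferredCommitContext{round: 10})) // true: buffered
	fmt.Println(tr.scheduleNonBlocking(&deferredCommitContext{round: 11})) // false: buffer full, task dropped
}
```

With the blocking send restored, a slow commitSyncer (for example during catchpoint file writing) can again back-pressure the goroutine that schedules commits, which is the behaviour the removed TestCatchpointTrackerWaitNotBlocking test guarded against.
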
diff --git a/ledger/txtail_test.go b/ledger/txtail_test.go
index a21af5120..ba4a09755 100644
--- a/ledger/txtail_test.go
+++ b/ledger/txtail_test.go
@@ -19,6 +19,7 @@ package ledger
import (
"context"
"errors"
+ "fmt"
"testing"
"github.com/stretchr/testify/require"
@@ -258,6 +259,7 @@ func TestTxTailDeltaTracking(t *testing.T) {
err := txtail.loadFromDisk(&ledger, ledger.Latest())
require.NoError(t, err)
+ fmt.Printf("%d, %s\n", len(txtail.recent), protoVersion)
require.Equal(t, int(config.Consensus[protoVersion].MaxTxnLife), len(txtail.recent))
require.Equal(t, testTxTailValidityRange, len(txtail.lastValid))
require.Equal(t, ledger.Latest(), txtail.lowWaterMark)