From a4f9ebe2a8708f796acdd1eed1c5d1d020f4002f Mon Sep 17 00:00:00 2001 From: Tsachi Herman Date: Wed, 15 Dec 2021 16:12:12 -0500 Subject: ledger: perform the catchpoint writing outside the trackers lock. (#3311) ## Summary This PR moves the catchpoint file writing to be performed outside of the trackers lock. This resolves the issue where a long catchpoint file writing blocks the agreement from validating and propagating votes. ## Test Plan * [x] Test manually & use existing tests. * [x] Implement a unit test * [x] Deploy a local network where the catchpoint writing takes a long time and verify it doesn't get blocked during catchpoint writing. --- ledger/acctupdates.go | 3 + ledger/bulletin.go | 3 + ledger/catchpointtracker.go | 3 + ledger/catchpointtracker_test.go | 202 +++++++++++++++++++++++++++++++++++++++ ledger/metrics.go | 3 + ledger/notifier.go | 3 + ledger/tracker.go | 11 +++ ledger/txtail.go | 3 + 8 files changed, 231 insertions(+) diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index 16d933fbb..fa1537513 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -1241,6 +1241,9 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon } } +func (au *accountUpdates) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { +} + // compactCreatableDeltas takes an array of creatables map deltas ( one array entry per round ), and compact the array into a single // map that contains all the deltas changes. While doing that, the function eliminate any intermediate changes. // It counts the number of changes per round by specifying it in the ndeltas field of the modifiedCreatable. 
diff --git a/ledger/bulletin.go b/ledger/bulletin.go index 1e95ee2ab..b18813007 100644 --- a/ledger/bulletin.go +++ b/ledger/bulletin.go @@ -120,6 +120,9 @@ func (b *bulletin) commitRound(context.Context, *sql.Tx, *deferredCommitContext) func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) { } +func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { +} + func (b *bulletin) handleUnorderedCommit(uint64, basics.Round, basics.Round) { } func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index e70b526da..ab03db4ef 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -393,12 +393,15 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit ct.roundDigest = ct.roundDigest[dcc.offset:] ct.catchpointsMu.Unlock() +} +func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { if dcc.isCatchpointRound && ct.archivalLedger && dcc.catchpointLabel != "" { // generate the catchpoint file. This need to be done inline so that it will block any new accounts that from being written. // the generateCatchpoint expects that the accounts data would not be modified in the background during it's execution. ct.generateCatchpoint(ctx, basics.Round(dcc.offset)+dcc.oldBase+dcc.lookback, dcc.catchpointLabel, dcc.committedRoundDigest, dcc.updatingBalancesDuration) } + // in scheduleCommit, we expect that this function to update the catchpointWriting when // it's on a catchpoint round and it's an archival ledger. Doing this in a deferred function // here would prevent us from "forgetting" to update this variable later on. 
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 64db5f275..d2188f2e3 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "runtime" + "sync/atomic" "testing" "time" @@ -33,8 +34,10 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/ledger/ledgercore" ledgertesting "github.com/algorand/go-algorand/ledger/testing" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -413,3 +416,202 @@ func TestCatchpointTrackerPrepareCommit(t *testing.T) { } } } + +// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked. +type blockingTracker struct { + postCommitUnlockedEntryLock chan struct{} + postCommitUnlockedReleaseLock chan struct{} + postCommitEntryLock chan struct{} + postCommitReleaseLock chan struct{} + committedUpToRound int64 +} + +// loadFromDisk is not implemented in the blockingTracker. +func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error { + return nil +} + +// newBlock is not implemented in the blockingTracker. +func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { +} + +// committedUpTo in the blockingTracker just stores the committed round. 
+func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { + atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd)) + return committedRnd, basics.Round(0) +} + +// produceCommittingTask is not used by the blockingTracker +func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { + return dcr +} + +// prepareCommit, is not used by the blockingTracker +func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error { + return nil +} + +// commitRound is not used by the blockingTracker +func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommitContext) error { + return nil +} + +// postCommit implements entry/exit blockers, designed for testing. +func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { + if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + bt.postCommitEntryLock <- struct{}{} + <-bt.postCommitReleaseLock + } +} + +// postCommitUnlocked implements entry/exit blockers, designed for testing. 
+func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { + if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + bt.postCommitUnlockedEntryLock <- struct{}{} + <-bt.postCommitUnlockedReleaseLock + } +} + +// handleUnorderedCommit is not used by the blockingTracker +func (bt *blockingTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) { +} + +// close is not used by the blockingTracker +func (bt *blockingTracker) close() { +} + +func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) { + partitiontest.PartitionTest(t) + + genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 10) + const inMem = true + log := logging.TestingLog(t) + log.SetLevel(logging.Warn) + cfg := config.GetDefaultLocal() + cfg.Archival = true + cfg.CatchpointInterval = 2 + ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + require.NoError(t, err, "could not open ledger") + defer ledger.Close() + + writeStallingTracker := &blockingTracker{ + postCommitUnlockedEntryLock: make(chan struct{}), + postCommitUnlockedReleaseLock: make(chan struct{}), + postCommitEntryLock: make(chan struct{}), + postCommitReleaseLock: make(chan struct{}), + } + ledger.trackerMu.Lock() + ledger.trackers.mu.Lock() + ledger.trackers.trackers = append(ledger.trackers.trackers, writeStallingTracker) + ledger.trackers.mu.Unlock() + ledger.trackerMu.Unlock() + + proto := config.Consensus[protocol.ConsensusCurrentVersion] + + // create the first MaxBalLookback blocks + for rnd := ledger.Latest() + 1; rnd <= basics.Round(proto.MaxBalLookback); rnd++ { + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + } + + // make sure to get to a catchpoint round, and block the writing there. 
+ for { + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 { + // release the entry lock for postCommit + <-writeStallingTracker.postCommitEntryLock + + // release the exit lock for postCommit + writeStallingTracker.postCommitReleaseLock <- struct{}{} + + // wait until we're blocked by the stalling tracker. + <-writeStallingTracker.postCommitUnlockedEntryLock + break + } + } + + // write additional block, so that the block queue would trigger that too + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + // wait for the committedUpToRound to be called with the correct round number. + for { + committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound) + if basics.Round(committedUpToRound) == ledger.Latest() { + break + } + time.Sleep(1 * time.Millisecond) + } + + lookupDone := make(chan struct{}) + // now that we've blocked the tracker, try to call LookupAgreement and confirm it returns almost immediately + go func() { + defer close(lookupDone) + ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink) + }() + + select { + case <-lookupDone: + // we expect it not to get stuck, even when the postCommitUnlocked is stuck. + case <-time.After(25 * time.Second): + require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker") + } + // let the goroutines complete. + // release the exit lock for postCommitUnlocked + writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{} + + // test false positive : we want to ensure that without releasing the postCommit lock, the LookupAgreement would not be able to return within 5 seconds. + + // make sure to get to a catchpoint round, and block the writing there. 
+ for { + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 { + // release the entry lock for postCommit + <-writeStallingTracker.postCommitEntryLock + break + } + } + // write additional block, so that the block queue would trigger that too + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + // wait for the committedUpToRound to be called with the correct round number. + for { + committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound) + if basics.Round(committedUpToRound) == ledger.Latest() { + break + } + time.Sleep(1 * time.Millisecond) + } + + lookupDone = make(chan struct{}) + // now that we've blocked the tracker, try to call LookupAgreement and confirm it's not returning within 5 seconds. + go func() { + defer close(lookupDone) + ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink) + }() + + select { + case <-lookupDone: + require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker") + case <-time.After(5 * time.Second): + // this one was "stuck" for over five seconds ( as expected ) + } + // let the goroutines complete. + // release the exit lock for postCommit + writeStallingTracker.postCommitReleaseLock <- struct{}{} + + // wait until we're blocked by the stalling tracker. + <-writeStallingTracker.postCommitUnlockedEntryLock + // release the blocker. + writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{} + + // confirm that we get released quickly. + select { + case <-lookupDone: + // now that all the blockers have been removed, we should be able to complete + // the LookupAgreement call. 
+ case <-time.After(30 * time.Second): + require.FailNow(t, "The LookupAgreement wasn't getting release as expected by the blocked tracker") + } +} diff --git a/ledger/metrics.go b/ledger/metrics.go index 55a84d563..8c7e5dcc7 100644 --- a/ledger/metrics.go +++ b/ledger/metrics.go @@ -65,6 +65,9 @@ func (mt *metricsTracker) commitRound(context.Context, *sql.Tx, *deferredCommitC func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { } +func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { +} + func (mt *metricsTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) { } func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { diff --git a/ledger/notifier.go b/ledger/notifier.go index e922c73e1..db1347140 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -125,6 +125,9 @@ func (bn *blockNotifier) commitRound(context.Context, *sql.Tx, *deferredCommitCo func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitContext) { } +func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { +} + func (bn *blockNotifier) handleUnorderedCommit(uint64, basics.Round, basics.Round) { } diff --git a/ledger/tracker.go b/ledger/tracker.go index 855995665..44a255b03 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -109,6 +109,11 @@ type ledgerTracker interface { // has completed. An optional context is provided for long-running operations. postCommit(context.Context, *deferredCommitContext) + // postCommitUnlocked is called only on a successful commitRound. In that case, each of the trackers have + // the chance to make changes that aren't state-dependent. + // An optional context is provided for long-running operations. 
+ postCommitUnlocked(context.Context, *deferredCommitContext) + // handleUnorderedCommit is a special method for handling deferred commits that are out of order. // Tracker might update own state in this case. For example, account updates tracker cancels // scheduled catchpoint writing that deferred commit. @@ -323,6 +328,8 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round) } if cdr != nil { dcc.deferredCommitRange = *cdr + } else { + dcc = nil } tr.mu.RLock() @@ -472,6 +479,10 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) { tr.lastFlushTime = dcc.flushTime tr.mu.Unlock() + for _, lt := range tr.trackers { + lt.postCommitUnlocked(tr.ctx, dcc) + } + } // initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ). diff --git a/ledger/txtail.go b/ledger/txtail.go index a5d77e49c..4d596396f 100644 --- a/ledger/txtail.go +++ b/ledger/txtail.go @@ -168,6 +168,9 @@ func (t *txTail) commitRound(context.Context, *sql.Tx, *deferredCommitContext) e func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) { } +func (t *txTail) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { +} + func (t *txTail) handleUnorderedCommit(uint64, basics.Round, basics.Round) { } -- cgit v1.2.3 From 4d853562f025ac4eb5a95affcc15e6db33985c49 Mon Sep 17 00:00:00 2001 From: John Lee Date: Mon, 27 Dec 2021 11:53:31 -0500 Subject: Bump version number --- buildnumber.dat | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildnumber.dat b/buildnumber.dat index 0cfbf0888..00750edc0 100644 --- a/buildnumber.dat +++ b/buildnumber.dat @@ -1 +1 @@ -2 +3 -- cgit v1.2.3