diff options
Diffstat (limited to 'ledger/catchpointtracker_test.go')
-rw-r--r-- | ledger/catchpointtracker_test.go | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 64db5f275..d2188f2e3 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "runtime" + "sync/atomic" "testing" "time" @@ -33,8 +34,10 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/ledger/ledgercore" ledgertesting "github.com/algorand/go-algorand/ledger/testing" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -413,3 +416,202 @@ func TestCatchpointTrackerPrepareCommit(t *testing.T) { } } } + +// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked. +type blockingTracker struct { + postCommitUnlockedEntryLock chan struct{} + postCommitUnlockedReleaseLock chan struct{} + postCommitEntryLock chan struct{} + postCommitReleaseLock chan struct{} + committedUpToRound int64 +} + +// loadFromDisk is not implemented in the blockingTracker. +func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error { + return nil +} + +// newBlock is not implemented in the blockingTracker. +func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { +} + +// committedUpTo in the blockingTracker just stores the committed round. +func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { + atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd)) + return committedRnd, basics.Round(0) +} + +// produceCommittingTask is not used by the blockingTracker +func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { + return dcr +} + +// prepareCommit, is not used by the blockingTracker +func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error { + return nil +} + +// commitRound is not used by the blockingTracker +func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommitContext) error { + return nil +} + +// postCommit implements entry/exit blockers, designed for testing. +func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { + if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + bt.postCommitEntryLock <- struct{}{} + <-bt.postCommitReleaseLock + } +} + +// postCommitUnlocked implements entry/exit blockers, designed for testing. +func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { + if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + bt.postCommitUnlockedEntryLock <- struct{}{} + <-bt.postCommitUnlockedReleaseLock + } +} + +// handleUnorderedCommit is not used by the blockingTracker +func (bt *blockingTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) { +} + +// close is not used by the blockingTracker +func (bt *blockingTracker) close() { +} + +func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) { + partitiontest.PartitionTest(t) + + genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 10) + const inMem = true + log := logging.TestingLog(t) + log.SetLevel(logging.Warn) + cfg := config.GetDefaultLocal() + cfg.Archival = true + cfg.CatchpointInterval = 2 + ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + require.NoError(t, err, "could not open ledger") + defer ledger.Close() + + writeStallingTracker := &blockingTracker{ + postCommitUnlockedEntryLock: make(chan struct{}), + postCommitUnlockedReleaseLock: make(chan struct{}), + postCommitEntryLock: make(chan struct{}), + postCommitReleaseLock: make(chan struct{}), + } + ledger.trackerMu.Lock() + ledger.trackers.mu.Lock() + ledger.trackers.trackers = append(ledger.trackers.trackers, writeStallingTracker) + ledger.trackers.mu.Unlock() + ledger.trackerMu.Unlock() + + proto := config.Consensus[protocol.ConsensusCurrentVersion] + + // create the first MaxBalLookback blocks + for rnd := ledger.Latest() + 1; rnd <= basics.Round(proto.MaxBalLookback); rnd++ { + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + } + + // make sure to get to a catchpoint round, and block the writing there. + for { + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 { + // release the entry lock for postCommit + <-writeStallingTracker.postCommitEntryLock + + // release the exit lock for postCommit + writeStallingTracker.postCommitReleaseLock <- struct{}{} + + // wait until we're blocked by the stalling tracker. + <-writeStallingTracker.postCommitUnlockedEntryLock + break + } + } + + // write additional block, so that the block queue would trigger that too + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + // wait for the committedUpToRound to be called with the correct round number. + for { + committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound) + if basics.Round(committedUpToRound) == ledger.Latest() { + break + } + time.Sleep(1 * time.Millisecond) + } + + lookupDone := make(chan struct{}) + // now that we're blocked the tracker, try to call LookupAgreement and confirm it returns almost immediately + go func() { + defer close(lookupDone) + ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink) + }() + + select { + case <-lookupDone: + // we expect it not to get stuck, even when the postCommitUnlocked is stuck. + case <-time.After(25 * time.Second): + require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker") + } + // let the goroutines complete. + // release the exit lock for postCommit + writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{} + + // test false positive : we want to ensure that without releasing the postCommit lock, the LookupAgreemnt would not be able to return within 1 second. + + // make sure to get to a catchpoint round, and block the writing there. + for { + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 { + // release the entry lock for postCommit + <-writeStallingTracker.postCommitEntryLock + break + } + } + // write additional block, so that the block queue would trigger that too + err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{}) + require.NoError(t, err) + // wait for the committedUpToRound to be called with the correct round number. + for { + committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound) + if basics.Round(committedUpToRound) == ledger.Latest() { + break + } + time.Sleep(1 * time.Millisecond) + } + + lookupDone = make(chan struct{}) + // now that we're blocked the tracker, try to call LookupAgreement and confirm it's not returning within 1 second. + go func() { + defer close(lookupDone) + ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink) + }() + + select { + case <-lookupDone: + require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker") + case <-time.After(5 * time.Second): + // this one was "stuck" for over five second ( as expected ) + } + // let the goroutines complete. + // release the exit lock for postCommit + writeStallingTracker.postCommitReleaseLock <- struct{}{} + + // wait until we're blocked by the stalling tracker. + <-writeStallingTracker.postCommitUnlockedEntryLock + // release the blocker. + writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{} + + // confirm that we get released quickly. + select { + case <-lookupDone: + // now that all the blocker have been removed, we should be able to complete + // the LookupAgreement call. + case <-time.After(30 * time.Second): + require.FailNow(t, "The LookupAgreement wasn't getting release as expected by the blocked tracker") + } +} |