summaryrefslogtreecommitdiff
path: root/ledger/catchpointtracker_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'ledger/catchpointtracker_test.go')
-rw-r--r--ledger/catchpointtracker_test.go202
1 files changed, 202 insertions, 0 deletions
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index 64db5f275..d2188f2e3 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"runtime"
+ "sync/atomic"
"testing"
"time"
@@ -33,8 +34,10 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger/ledgercore"
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
+ "github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)
@@ -413,3 +416,202 @@ func TestCatchpointTrackerPrepareCommit(t *testing.T) {
}
}
}
+
+// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
+type blockingTracker struct {
+ postCommitUnlockedEntryLock chan struct{}
+ postCommitUnlockedReleaseLock chan struct{}
+ postCommitEntryLock chan struct{}
+ postCommitReleaseLock chan struct{}
+ committedUpToRound int64
+}
+
+// loadFromDisk is not implemented in the blockingTracker.
+func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
+ return nil
+}
+
+// newBlock is not implemented in the blockingTracker.
+func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
+}
+
+// committedUpTo in the blockingTracker just stores the committed round.
+func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
+ atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd))
+ return committedRnd, basics.Round(0)
+}
+
+// produceCommittingTask is not used by the blockingTracker
+func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
+ return dcr
+}
+
+// prepareCommit, is not used by the blockingTracker
+func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error {
+ return nil
+}
+
+// commitRound is not used by the blockingTracker
+func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommitContext) error {
+ return nil
+}
+
+// postCommit implements entry/exit blockers, designed for testing.
+func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
+ if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
+ bt.postCommitEntryLock <- struct{}{}
+ <-bt.postCommitReleaseLock
+ }
+}
+
+// postCommitUnlocked implements entry/exit blockers, designed for testing.
+func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
+ if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
+ bt.postCommitUnlockedEntryLock <- struct{}{}
+ <-bt.postCommitUnlockedReleaseLock
+ }
+}
+
+// handleUnorderedCommit is not used by the blockingTracker
+func (bt *blockingTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
+}
+
+// close is not used by the blockingTracker
+func (bt *blockingTracker) close() {
+}
+
+func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 10)
+ const inMem = true
+ log := logging.TestingLog(t)
+ log.SetLevel(logging.Warn)
+ cfg := config.GetDefaultLocal()
+ cfg.Archival = true
+ cfg.CatchpointInterval = 2
+ ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
+ require.NoError(t, err, "could not open ledger")
+ defer ledger.Close()
+
+ writeStallingTracker := &blockingTracker{
+ postCommitUnlockedEntryLock: make(chan struct{}),
+ postCommitUnlockedReleaseLock: make(chan struct{}),
+ postCommitEntryLock: make(chan struct{}),
+ postCommitReleaseLock: make(chan struct{}),
+ }
+ ledger.trackerMu.Lock()
+ ledger.trackers.mu.Lock()
+ ledger.trackers.trackers = append(ledger.trackers.trackers, writeStallingTracker)
+ ledger.trackers.mu.Unlock()
+ ledger.trackerMu.Unlock()
+
+ proto := config.Consensus[protocol.ConsensusCurrentVersion]
+
+ // create the first MaxBalLookback blocks
+ for rnd := ledger.Latest() + 1; rnd <= basics.Round(proto.MaxBalLookback); rnd++ {
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ }
+
+ // make sure to get to a catchpoint round, and block the writing there.
+ for {
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 {
+ // release the entry lock for postCommit
+ <-writeStallingTracker.postCommitEntryLock
+
+ // release the exit lock for postCommit
+ writeStallingTracker.postCommitReleaseLock <- struct{}{}
+
+ // wait until we're blocked by the stalling tracker.
+ <-writeStallingTracker.postCommitUnlockedEntryLock
+ break
+ }
+ }
+
+ // write additional block, so that the block queue would trigger that too
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ // wait for the committedUpToRound to be called with the correct round number.
+ for {
+ committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
+ if basics.Round(committedUpToRound) == ledger.Latest() {
+ break
+ }
+ time.Sleep(1 * time.Millisecond)
+ }
+
+ lookupDone := make(chan struct{})
+ // now that we're blocked the tracker, try to call LookupAgreement and confirm it returns almost immediately
+ go func() {
+ defer close(lookupDone)
+ ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink)
+ }()
+
+ select {
+ case <-lookupDone:
+ // we expect it not to get stuck, even when the postCommitUnlocked is stuck.
+ case <-time.After(25 * time.Second):
+ require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker")
+ }
+ // let the goroutines complete.
+ // release the exit lock for postCommit
+ writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
+
+ // test false positive : we want to ensure that without releasing the postCommit lock, the LookupAgreemnt would not be able to return within 1 second.
+
+ // make sure to get to a catchpoint round, and block the writing there.
+ for {
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 {
+ // release the entry lock for postCommit
+ <-writeStallingTracker.postCommitEntryLock
+ break
+ }
+ }
+ // write additional block, so that the block queue would trigger that too
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ // wait for the committedUpToRound to be called with the correct round number.
+ for {
+ committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
+ if basics.Round(committedUpToRound) == ledger.Latest() {
+ break
+ }
+ time.Sleep(1 * time.Millisecond)
+ }
+
+ lookupDone = make(chan struct{})
+ // now that we're blocked the tracker, try to call LookupAgreement and confirm it's not returning within 1 second.
+ go func() {
+ defer close(lookupDone)
+ ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink)
+ }()
+
+ select {
+ case <-lookupDone:
+ require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker")
+ case <-time.After(5 * time.Second):
+ // this one was "stuck" for over five second ( as expected )
+ }
+ // let the goroutines complete.
+ // release the exit lock for postCommit
+ writeStallingTracker.postCommitReleaseLock <- struct{}{}
+
+ // wait until we're blocked by the stalling tracker.
+ <-writeStallingTracker.postCommitUnlockedEntryLock
+ // release the blocker.
+ writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
+
+ // confirm that we get released quickly.
+ select {
+ case <-lookupDone:
+ // now that all the blocker have been removed, we should be able to complete
+ // the LookupAgreement call.
+ case <-time.After(30 * time.Second):
+ require.FailNow(t, "The LookupAgreement wasn't getting release as expected by the blocked tracker")
+ }
+}