summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Lee <64482439+algojohnlee@users.noreply.github.com>2022-01-04 13:27:06 -0500
committerGitHub <noreply@github.com>2022-01-04 13:27:06 -0500
commitd2289a52d517b1e7e0a23b6936305520895d36d5 (patch)
tree291dbfd496fe58071be90e3d2f478c901e67e499
parent97e8068037c429f8ac866b0eae899593f9aa5d13 (diff)
parent201337a3adcab617f53500359fb8b11f458c5f79 (diff)
Merge pull request #3356 from Algo-devops-service/relstable3.2.3v3.2.3-stable
go-algorand 3.2.3-stable
-rw-r--r--buildnumber.dat2
-rw-r--r--ledger/acctupdates.go3
-rw-r--r--ledger/bulletin.go3
-rw-r--r--ledger/catchpointtracker.go3
-rw-r--r--ledger/catchpointtracker_test.go202
-rw-r--r--ledger/metrics.go3
-rw-r--r--ledger/notifier.go3
-rw-r--r--ledger/tracker.go11
-rw-r--r--ledger/txtail.go3
9 files changed, 232 insertions, 1 deletions
diff --git a/buildnumber.dat b/buildnumber.dat
index 0cfbf0888..00750edc0 100644
--- a/buildnumber.dat
+++ b/buildnumber.dat
@@ -1 +1 @@
-2
+3
diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go
index 16d933fbb..fa1537513 100644
--- a/ledger/acctupdates.go
+++ b/ledger/acctupdates.go
@@ -1241,6 +1241,9 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon
}
}
+func (au *accountUpdates) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
+}
+
// compactCreatableDeltas takes an array of creatables map deltas ( one array entry per round ), and compact the array into a single
// map that contains all the deltas changes. While doing that, the function eliminate any intermediate changes.
// It counts the number of changes per round by specifying it in the ndeltas field of the modifiedCreatable.
diff --git a/ledger/bulletin.go b/ledger/bulletin.go
index 1e95ee2ab..b18813007 100644
--- a/ledger/bulletin.go
+++ b/ledger/bulletin.go
@@ -120,6 +120,9 @@ func (b *bulletin) commitRound(context.Context, *sql.Tx, *deferredCommitContext)
func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}
+func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
+}
+
func (b *bulletin) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}
func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go
index e70b526da..ab03db4ef 100644
--- a/ledger/catchpointtracker.go
+++ b/ledger/catchpointtracker.go
@@ -393,12 +393,15 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit
ct.roundDigest = ct.roundDigest[dcc.offset:]
ct.catchpointsMu.Unlock()
+}
+func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.isCatchpointRound && ct.archivalLedger && dcc.catchpointLabel != "" {
// generate the catchpoint file. This need to be done inline so that it will block any new accounts that from being written.
// the generateCatchpoint expects that the accounts data would not be modified in the background during it's execution.
ct.generateCatchpoint(ctx, basics.Round(dcc.offset)+dcc.oldBase+dcc.lookback, dcc.catchpointLabel, dcc.committedRoundDigest, dcc.updatingBalancesDuration)
}
+
// in scheduleCommit, we expect that this function to update the catchpointWriting when
// it's on a catchpoint round and it's an archival ledger. Doing this in a deferred function
// here would prevent us from "forgetting" to update this variable later on.
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index 64db5f275..d2188f2e3 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"runtime"
+ "sync/atomic"
"testing"
"time"
@@ -33,8 +34,10 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger/ledgercore"
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
+ "github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)
@@ -413,3 +416,202 @@ func TestCatchpointTrackerPrepareCommit(t *testing.T) {
}
}
}
+
+// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
+type blockingTracker struct {
+ postCommitUnlockedEntryLock chan struct{}
+ postCommitUnlockedReleaseLock chan struct{}
+ postCommitEntryLock chan struct{}
+ postCommitReleaseLock chan struct{}
+ committedUpToRound int64
+}
+
+// loadFromDisk is not implemented in the blockingTracker.
+func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
+ return nil
+}
+
+// newBlock is not implemented in the blockingTracker.
+func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
+}
+
+// committedUpTo in the blockingTracker just stores the committed round.
+func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
+ atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd))
+ return committedRnd, basics.Round(0)
+}
+
+// produceCommittingTask is not used by the blockingTracker
+func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
+ return dcr
+}
+
+// prepareCommit, is not used by the blockingTracker
+func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error {
+ return nil
+}
+
+// commitRound is not used by the blockingTracker
+func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommitContext) error {
+ return nil
+}
+
+// postCommit implements entry/exit blockers, designed for testing.
+func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
+ if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
+ bt.postCommitEntryLock <- struct{}{}
+ <-bt.postCommitReleaseLock
+ }
+}
+
+// postCommitUnlocked implements entry/exit blockers, designed for testing.
+func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
+ if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
+ bt.postCommitUnlockedEntryLock <- struct{}{}
+ <-bt.postCommitUnlockedReleaseLock
+ }
+}
+
+// handleUnorderedCommit is not used by the blockingTracker
+func (bt *blockingTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
+}
+
+// close is not used by the blockingTracker
+func (bt *blockingTracker) close() {
+}
+
+func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 10)
+ const inMem = true
+ log := logging.TestingLog(t)
+ log.SetLevel(logging.Warn)
+ cfg := config.GetDefaultLocal()
+ cfg.Archival = true
+ cfg.CatchpointInterval = 2
+ ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
+ require.NoError(t, err, "could not open ledger")
+ defer ledger.Close()
+
+ writeStallingTracker := &blockingTracker{
+ postCommitUnlockedEntryLock: make(chan struct{}),
+ postCommitUnlockedReleaseLock: make(chan struct{}),
+ postCommitEntryLock: make(chan struct{}),
+ postCommitReleaseLock: make(chan struct{}),
+ }
+ ledger.trackerMu.Lock()
+ ledger.trackers.mu.Lock()
+ ledger.trackers.trackers = append(ledger.trackers.trackers, writeStallingTracker)
+ ledger.trackers.mu.Unlock()
+ ledger.trackerMu.Unlock()
+
+ proto := config.Consensus[protocol.ConsensusCurrentVersion]
+
+ // create the first MaxBalLookback blocks
+ for rnd := ledger.Latest() + 1; rnd <= basics.Round(proto.MaxBalLookback); rnd++ {
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ }
+
+ // make sure to get to a catchpoint round, and block the writing there.
+ for {
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 {
+ // release the entry lock for postCommit
+ <-writeStallingTracker.postCommitEntryLock
+
+ // release the exit lock for postCommit
+ writeStallingTracker.postCommitReleaseLock <- struct{}{}
+
+ // wait until we're blocked by the stalling tracker.
+ <-writeStallingTracker.postCommitUnlockedEntryLock
+ break
+ }
+ }
+
+ // write additional block, so that the block queue would trigger that too
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ // wait for the committedUpToRound to be called with the correct round number.
+ for {
+ committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
+ if basics.Round(committedUpToRound) == ledger.Latest() {
+ break
+ }
+ time.Sleep(1 * time.Millisecond)
+ }
+
+ lookupDone := make(chan struct{})
+ // now that we're blocked the tracker, try to call LookupAgreement and confirm it returns almost immediately
+ go func() {
+ defer close(lookupDone)
+ ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink)
+ }()
+
+ select {
+ case <-lookupDone:
+ // we expect it not to get stuck, even when the postCommitUnlocked is stuck.
+ case <-time.After(25 * time.Second):
+ require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker")
+ }
+ // let the goroutines complete.
+ // release the exit lock for postCommit
+ writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
+
+ // test false positive : we want to ensure that without releasing the postCommit lock, the LookupAgreemnt would not be able to return within 1 second.
+
+ // make sure to get to a catchpoint round, and block the writing there.
+ for {
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 {
+ // release the entry lock for postCommit
+ <-writeStallingTracker.postCommitEntryLock
+ break
+ }
+ }
+ // write additional block, so that the block queue would trigger that too
+ err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
+ require.NoError(t, err)
+ // wait for the committedUpToRound to be called with the correct round number.
+ for {
+ committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
+ if basics.Round(committedUpToRound) == ledger.Latest() {
+ break
+ }
+ time.Sleep(1 * time.Millisecond)
+ }
+
+ lookupDone = make(chan struct{})
+ // now that we're blocked the tracker, try to call LookupAgreement and confirm it's not returning within 1 second.
+ go func() {
+ defer close(lookupDone)
+ ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink)
+ }()
+
+ select {
+ case <-lookupDone:
+ require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker")
+ case <-time.After(5 * time.Second):
+ // this one was "stuck" for over five second ( as expected )
+ }
+ // let the goroutines complete.
+ // release the exit lock for postCommit
+ writeStallingTracker.postCommitReleaseLock <- struct{}{}
+
+ // wait until we're blocked by the stalling tracker.
+ <-writeStallingTracker.postCommitUnlockedEntryLock
+ // release the blocker.
+ writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
+
+ // confirm that we get released quickly.
+ select {
+ case <-lookupDone:
+ // now that all the blocker have been removed, we should be able to complete
+ // the LookupAgreement call.
+ case <-time.After(30 * time.Second):
+ require.FailNow(t, "The LookupAgreement wasn't getting release as expected by the blocked tracker")
+ }
+}
diff --git a/ledger/metrics.go b/ledger/metrics.go
index 55a84d563..8c7e5dcc7 100644
--- a/ledger/metrics.go
+++ b/ledger/metrics.go
@@ -65,6 +65,9 @@ func (mt *metricsTracker) commitRound(context.Context, *sql.Tx, *deferredCommitC
func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}
+func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
+}
+
func (mt *metricsTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}
func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
diff --git a/ledger/notifier.go b/ledger/notifier.go
index e922c73e1..db1347140 100644
--- a/ledger/notifier.go
+++ b/ledger/notifier.go
@@ -125,6 +125,9 @@ func (bn *blockNotifier) commitRound(context.Context, *sql.Tx, *deferredCommitCo
func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}
+func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
+}
+
func (bn *blockNotifier) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}
diff --git a/ledger/tracker.go b/ledger/tracker.go
index 855995665..44a255b03 100644
--- a/ledger/tracker.go
+++ b/ledger/tracker.go
@@ -109,6 +109,11 @@ type ledgerTracker interface {
// has completed. An optional context is provided for long-running operations.
postCommit(context.Context, *deferredCommitContext)
+ // postCommitUnlocked is called only on a successful commitRound. In that case, each of the trackers have
+ // the chance to make changes that aren't state-dependent.
+ // An optional context is provided for long-running operations.
+ postCommitUnlocked(context.Context, *deferredCommitContext)
+
// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
// Tracker might update own state in this case. For example, account updates tracker cancels
// scheduled catchpoint writing that deferred commit.
@@ -323,6 +328,8 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round)
}
if cdr != nil {
dcc.deferredCommitRange = *cdr
+ } else {
+ dcc = nil
}
tr.mu.RLock()
@@ -472,6 +479,10 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) {
tr.lastFlushTime = dcc.flushTime
tr.mu.Unlock()
+ for _, lt := range tr.trackers {
+ lt.postCommitUnlocked(tr.ctx, dcc)
+ }
+
}
// initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).
diff --git a/ledger/txtail.go b/ledger/txtail.go
index a5d77e49c..4d596396f 100644
--- a/ledger/txtail.go
+++ b/ledger/txtail.go
@@ -168,6 +168,9 @@ func (t *txTail) commitRound(context.Context, *sql.Tx, *deferredCommitContext) e
func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}
+func (t *txTail) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
+}
+
func (t *txTail) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}