summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Broderick <118225939+bbroder-algo@users.noreply.github.com>2023-05-24 14:12:46 -0400
committerBob Broderick <118225939+bbroder-algo@users.noreply.github.com>2023-05-24 14:12:46 -0400
commitef9a8d68987795b9f497af19e037958dc0950e7d (patch)
tree7733d04f984d10bea67fd75467bdfa6a7b9d5404
parent593009447b4d85562667cd89ef23431bd3599e75 (diff)
backcommpavel_fix
-rw-r--r--catchup/service.go11
-rw-r--r--ledger/catchpointtracker.go5
-rw-r--r--ledger/ledger.go12
-rw-r--r--ledger/ledgercore/votersForRound.go7
-rw-r--r--node/node.go1
5 files changed, 36 insertions, 0 deletions
diff --git a/catchup/service.go b/catchup/service.go
index ac5fa730d..be0d419eb 100644
--- a/catchup/service.go
+++ b/catchup/service.go
@@ -184,6 +184,17 @@ func (s *Service) UnsetDisableSyncRound() {
s.triggerSync()
}
+func (s *Service) OnFinishedWriting() {
+// s.suspendForCatchpointWriting = s.ledger.IsWritingCatchpointDataFile()
+ if s.suspendForCatchpointWriting {
+ s.suspendForCatchpointWriting = false
+ select {
+ case s.syncNow <- struct{}{}:
+ default:
+ }
+ }
+}
+
// GetDisableSyncRound returns the disabled sync round
func (s *Service) GetDisableSyncRound() uint64 {
return atomic.LoadUint64(&s.disableSyncRound)
diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go
index bb437e977..6c864f693 100644
--- a/ledger/catchpointtracker.go
+++ b/ledger/catchpointtracker.go
@@ -152,6 +152,8 @@ type catchpointTracker struct {
// catchpointsMu protects `roundDigest`, `reenableCatchpointsRound` and
// `lastCatchpointLabel`.
catchpointsMu deadlock.RWMutex
+
+ catchup ledgercore.CatchupServiceListener
}
// initialize initializes the catchpointTracker structure
@@ -222,6 +224,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
totalKVs, totalAccounts, totalChunks, biggestChunkLen, spVerificationHash, err = ct.generateCatchpointData(
ctx, dbRound, &catchpointGenerationStats)
atomic.StoreInt32(&ct.catchpointDataWriting, 0)
+ ct.catchup.OnFinishedWriting()
if err != nil {
return err
}
@@ -510,6 +513,7 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans
defer func() {
if err != nil && dcc.catchpointFirstStage && ct.enableGeneratingCatchpointFiles {
atomic.StoreInt32(&ct.catchpointDataWriting, 0)
+ ct.catchup.OnFinishedWriting()
}
}()
@@ -939,6 +943,7 @@ func (ct *catchpointTracker) handleUnorderedCommitOrError(dcc *deferredCommitCon
if dcc.catchpointFirstStage {
// it was a catchpoint round, so update the catchpointWriting to indicate that we're done.
atomic.StoreInt32(&ct.catchpointDataWriting, 0)
+ ct.catchup.OnFinishedWriting()
}
}
}
diff --git a/ledger/ledger.go b/ledger/ledger.go
index 0c7747087..a122f44fb 100644
--- a/ledger/ledger.go
+++ b/ledger/ledger.go
@@ -426,6 +426,18 @@ func (l *Ledger) UnregisterVotersCommitListener() {
l.acctsOnline.voters.unregisterPrepareCommitListener()
}
+func (l *Ledger) RegisterCatchupServiceListener(listener ledgercore.CatchupServiceListener) {
+ l.trackerMu.RLock()
+ defer l.trackerMu.RUnlock()
+ l.catchpoint.catchup = listener
+}
+func (l *Ledger) UnregisterCatchupServiceListener() {
+ l.trackerMu.RLock()
+ defer l.trackerMu.RUnlock()
+ l.catchpoint.catchup = nil
+}
+
+
// notifyCommit informs the trackers that all blocks up to r have been
// written to disk. Returns the minimum block number that must be kept
// in the database.
diff --git a/ledger/ledgercore/votersForRound.go b/ledger/ledgercore/votersForRound.go
index 94901bf20..ca6590bc2 100644
--- a/ledger/ledgercore/votersForRound.go
+++ b/ledger/ledgercore/votersForRound.go
@@ -45,6 +45,13 @@ type LedgerForSPBuilder interface {
BlockHdr(basics.Round) (bookkeeping.BlockHeader, error)
}
+// CatchupServiceListener represents an object that needs to get notified on commit stages in the voters tracker.
+type CatchupServiceListener interface {
+ // OnPrepareVoterCommit gives the listener the opportunity to backup VotersForRound data related to rounds (oldBase, newBase] before it is being removed.
+ // The implementation should log any errors that might occur.
+ OnFinishedWriting()
+}
+
// VotersCommitListener represents an object that needs to get notified on commit stages in the voters tracker.
type VotersCommitListener interface {
// OnPrepareVoterCommit gives the listener the opportunity to backup VotersForRound data related to rounds (oldBase, newBase] before it is being removed.
diff --git a/node/node.go b/node/node.go
index 3ff616da6..2736c8933 100644
--- a/node/node.go
+++ b/node/node.go
@@ -293,6 +293,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
node.catchupBlockAuth = blockAuthenticatorImpl{Ledger: node.ledger, AsyncVoteVerifier: agreement.MakeAsyncVoteVerifier(node.lowPriorityCryptoVerificationPool)}
node.catchupService = catchup.MakeService(node.log, node.config, p2pNode, node.ledger, node.catchupBlockAuth, agreementLedger.UnmatchedPendingCertificates, node.lowPriorityCryptoVerificationPool)
+ node.ledger.RegisterCatchupServiceListener (node.catchupService)
node.txPoolSyncerService = rpcs.MakeTxSyncer(node.transactionPool, node.net, node.txHandler.SolicitedTxHandler(), time.Duration(cfg.TxSyncIntervalSeconds)*time.Second, time.Duration(cfg.TxSyncTimeoutSeconds)*time.Second, cfg.TxSyncServeResponseSize)
registry, err := ensureParticipationDB(genesisDir, node.log)