diff options
author | Bob Broderick <118225939+bbroder-algo@users.noreply.github.com> | 2023-05-24 14:12:46 -0400 |
---|---|---|
committer | Bob Broderick <118225939+bbroder-algo@users.noreply.github.com> | 2023-05-24 14:12:46 -0400 |
commit | ef9a8d68987795b9f497af19e037958dc0950e7d (patch) | |
tree | 7733d04f984d10bea67fd75467bdfa6a7b9d5404 | |
parent | 593009447b4d85562667cd89ef23431bd3599e75 (diff) |
backcommpavel_fix
-rw-r--r-- | catchup/service.go | 11 | ||||
-rw-r--r-- | ledger/catchpointtracker.go | 5 | ||||
-rw-r--r-- | ledger/ledger.go | 12 | ||||
-rw-r--r-- | ledger/ledgercore/votersForRound.go | 7 | ||||
-rw-r--r-- | node/node.go | 1 |
5 files changed, 36 insertions, 0 deletions
diff --git a/catchup/service.go b/catchup/service.go index ac5fa730d..be0d419eb 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -184,6 +184,17 @@ func (s *Service) UnsetDisableSyncRound() { s.triggerSync() } +func (s *Service) OnFinishedWriting() { +// s.suspendForCatchpointWriting = s.ledger.IsWritingCatchpointDataFile() + if s.suspendForCatchpointWriting { + s.suspendForCatchpointWriting = false + select { + case s.syncNow <- struct{}{}: + default: + } + } +} + // GetDisableSyncRound returns the disabled sync round func (s *Service) GetDisableSyncRound() uint64 { return atomic.LoadUint64(&s.disableSyncRound) diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index bb437e977..6c864f693 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -152,6 +152,8 @@ type catchpointTracker struct { // catchpointsMu protects `roundDigest`, `reenableCatchpointsRound` and // `lastCatchpointLabel`. catchpointsMu deadlock.RWMutex + + catchup ledgercore.CatchupServiceListener } // initialize initializes the catchpointTracker structure @@ -222,6 +224,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic totalKVs, totalAccounts, totalChunks, biggestChunkLen, spVerificationHash, err = ct.generateCatchpointData( ctx, dbRound, &catchpointGenerationStats) atomic.StoreInt32(&ct.catchpointDataWriting, 0) + ct.catchup.OnFinishedWriting() if err != nil { return err } @@ -510,6 +513,7 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans defer func() { if err != nil && dcc.catchpointFirstStage && ct.enableGeneratingCatchpointFiles { atomic.StoreInt32(&ct.catchpointDataWriting, 0) + ct.catchup.OnFinishedWriting() } }() @@ -939,6 +943,7 @@ func (ct *catchpointTracker) handleUnorderedCommitOrError(dcc *deferredCommitCon if dcc.catchpointFirstStage { // it was a catchpoint round, so update the catchpointWriting to indicate that we're done. atomic.StoreInt32(&ct.catchpointDataWriting, 0) + ct.catchup.OnFinishedWriting() } } } diff --git a/ledger/ledger.go b/ledger/ledger.go index 0c7747087..a122f44fb 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -426,6 +426,18 @@ func (l *Ledger) UnregisterVotersCommitListener() { l.acctsOnline.voters.unregisterPrepareCommitListener() } +func (l *Ledger) RegisterCatchupServiceListener(listener ledgercore.CatchupServiceListener) { + l.trackerMu.RLock() + defer l.trackerMu.RUnlock() + l.catchpoint.catchup = listener +} +func (l *Ledger) UnregisterCatchupServiceListener() { + l.trackerMu.RLock() + defer l.trackerMu.RUnlock() + l.catchpoint.catchup = nil +} + + // notifyCommit informs the trackers that all blocks up to r have been // written to disk. Returns the minimum block number that must be kept // in the database. diff --git a/ledger/ledgercore/votersForRound.go b/ledger/ledgercore/votersForRound.go index 94901bf20..ca6590bc2 100644 --- a/ledger/ledgercore/votersForRound.go +++ b/ledger/ledgercore/votersForRound.go @@ -45,6 +45,13 @@ type LedgerForSPBuilder interface { BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) } +// CatchupServiceListener represents an object that needs to get notified on commit stages in the voters tracker. +type CatchupServiceListener interface { + // OnPrepareVoterCommit gives the listener the opportunity to backup VotersForRound data related to rounds (oldBase, newBase] before it is being removed. + // The implementation should log any errors that might occur. + OnFinishedWriting() +} + // VotersCommitListener represents an object that needs to get notified on commit stages in the voters tracker. type VotersCommitListener interface { // OnPrepareVoterCommit gives the listener the opportunity to backup VotersForRound data related to rounds (oldBase, newBase] before it is being removed. diff --git a/node/node.go b/node/node.go index 3ff616da6..2736c8933 100644 --- a/node/node.go +++ b/node/node.go @@ -293,6 +293,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node.catchupBlockAuth = blockAuthenticatorImpl{Ledger: node.ledger, AsyncVoteVerifier: agreement.MakeAsyncVoteVerifier(node.lowPriorityCryptoVerificationPool)} node.catchupService = catchup.MakeService(node.log, node.config, p2pNode, node.ledger, node.catchupBlockAuth, agreementLedger.UnmatchedPendingCertificates, node.lowPriorityCryptoVerificationPool) + node.ledger.RegisterCatchupServiceListener (node.catchupService) node.txPoolSyncerService = rpcs.MakeTxSyncer(node.transactionPool, node.net, node.txHandler.SolicitedTxHandler(), time.Duration(cfg.TxSyncIntervalSeconds)*time.Second, time.Duration(cfg.TxSyncTimeoutSeconds)*time.Second, cfg.TxSyncServeResponseSize) registry, err := ensureParticipationDB(genesisDir, node.log) |