summaryrefslogtreecommitdiff
path: root/ledger/tracker.go
diff options
context:
space:
mode:
Diffstat (limited to 'ledger/tracker.go')
-rw-r--r--ledger/tracker.go552
1 files changed, 527 insertions, 25 deletions
diff --git a/ledger/tracker.go b/ledger/tracker.go
index 40dd725a5..855995665 100644
--- a/ledger/tracker.go
+++ b/ledger/tracker.go
@@ -17,16 +17,24 @@
package ledger
import (
+ "context"
+ "database/sql"
+ "errors"
"fmt"
"reflect"
+ "sync"
+ "time"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/ledger/internal"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/util/db"
+ "github.com/algorand/go-deadlock"
)
// ledgerTracker defines part of the API for any state machine that
@@ -55,26 +63,56 @@ type ledgerTracker interface {
// blocks from the database, or access its own state. The
// ledgerForTracker interface abstracts away the details of
// ledger internals so that individual trackers can be tested
- // in isolation.
- loadFromDisk(ledgerForTracker) error
+ // in isolation. The provided round number represents the
+ // current accounts storage round number.
+ loadFromDisk(ledgerForTracker, basics.Round) error
- // newBlock informs the tracker of a new block from round
- // rnd and a given ledgercore.StateDelta as produced by BlockEvaluator.
+ // newBlock informs the tracker of a new block along with
+ // a given ledgercore.StateDelta as produced by BlockEvaluator.
newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta)
- // committedUpTo informs the tracker that the database has
+ // committedUpTo informs the tracker that the block database has
// committed all blocks up to and including rnd to persistent
- // storage (the SQL database). This can allow the tracker
+ // storage. This can allow the tracker
// to garbage-collect state that will not be needed.
//
// committedUpTo() returns the round number of the earliest
- // block that this tracker needs to be stored in the ledger
- // for subsequent calls to loadFromDisk(). All blocks with
- // round numbers before that may be deleted to save space,
- // and the tracker is expected to still function after a
- // restart and a call to loadFromDisk(). For example,
- // returning 0 means that no blocks can be deleted.
- committedUpTo(basics.Round) basics.Round
+ // block that this tracker needs to be stored in the block
+ // database for subsequent calls to loadFromDisk().
+ // All blocks with round numbers before that may be deleted to
+ // save space, and the tracker is expected to still function
+ // after a restart and a call to loadFromDisk().
+ // For example, returning 0 means that no blocks can be deleted.
+ // Separately, the method returns the lookback that is being
+ // maintained by the tracker.
+ committedUpTo(basics.Round) (minRound, lookback basics.Round)
+
+ // produceCommittingTask prepares a deferredCommitRange; Preparing a deferredCommitRange is a joint
+ // effort, and all the trackers contribute to that effort. All the trackers are being handed a
+ // pointer to the deferredCommitRange, and have the ability to either modify it, or return a
+ // nil. If nil is returned, the commit would be skipped.
+ produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange
+
+ // prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data.
+ // If an error returned the process is aborted.
+
+ // prepareCommit aligns the data structures stored in the deferredCommitContext with the current
+ // state of the tracker. It allows the tracker to decide what data is going to be persisted
+ // on the coming commitRound.
+ prepareCommit(*deferredCommitContext) error
+ // commitRound is called for each of the trackers after a deferredCommitContext was agreed upon
+ // by all the prepareCommit calls. The commitRound is being executed within a single transactional
+ // context, and so, if any of the tracker's commitRound calls fails, the transaction is rolled back.
+ commitRound(context.Context, *sql.Tx, *deferredCommitContext) error
+ // postCommit is called only on a successful commitRound. In that case, each of the trackers has
+ // the chance to update its internal data structures, knowing that the given deferredCommitContext
+ // has completed. An optional context is provided for long-running operations.
+ postCommit(context.Context, *deferredCommitContext)
+
+ // handleUnorderedCommit is a special method for handling deferred commits that are out of order.
+ // Tracker might update its own state in this case. For example, account updates tracker cancels
+ // scheduled catchpoint writing that deferred commit.
+ handleUnorderedCommit(uint64, basics.Round, basics.Round)
// close terminates the tracker, reclaiming any resources
// like open database connections or goroutines. close may
@@ -89,26 +127,142 @@ type ledgerForTracker interface {
trackerDB() db.Pair
blockDB() db.Pair
trackerLog() logging.Logger
- trackerEvalVerified(bookkeeping.Block, ledgerForEvaluator) (ledgercore.StateDelta, error)
+ trackerEvalVerified(bookkeeping.Block, internal.LedgerForEvaluator) (ledgercore.StateDelta, error)
Latest() basics.Round
Block(basics.Round) (bookkeeping.Block, error)
BlockHdr(basics.Round) (bookkeeping.BlockHeader, error)
GenesisHash() crypto.Digest
GenesisProto() config.ConsensusParams
+ GenesisAccounts() map[basics.Address]basics.AccountData
}
type trackerRegistry struct {
trackers []ledgerTracker
+ // the accts has some exceptional usages in the tracker registry.
+ accts *accountUpdates
+
+ // ctx is the context for the committing go-routine.
+ ctx context.Context
+ // ctxCancel is the canceling function for canceling the committing go-routine ( i.e. signaling the committing go-routine that it's time to abort )
+ ctxCancel context.CancelFunc
+
+ // deferredCommits is the channel of pending deferred commits
+ deferredCommits chan *deferredCommitContext
+
+ // commitSyncerClosed is the blocking channel for synchronizing closing the commitSyncer goroutine. Once it's closed, the
+ // commitSyncer can be assumed to have aborted.
+ commitSyncerClosed chan struct{}
+
+ // accountsWriting provides synchronization around the background writing of account balances.
+ accountsWriting sync.WaitGroup
+
+ // dbRound is always exactly accountsRound(),
+ // cached to avoid SQL queries.
+ dbRound basics.Round
+
+ dbs db.Pair
+ log logging.Logger
+
+ // the synchronous mode that would be used for the account database.
+ synchronousMode db.SynchronousMode
+
+ // the synchronous mode that would be used while the accounts database is being rebuilt.
+ accountsRebuildSynchronousMode db.SynchronousMode
+
+ mu deadlock.RWMutex
+
+ // lastFlushTime is the time we last flushed updates to
+ // the accounts DB (bumping dbRound).
+ lastFlushTime time.Time
+}
+
+// deferredCommitRange is used during the calls to produceCommittingTask, and used as a data structure
+// to synchronize the various trackers and create a uniformity around which rounds need to be persisted
+// next.
+type deferredCommitRange struct {
+ offset uint64
+ oldBase basics.Round
+ lookback basics.Round
+
+ // pendingDeltas is the number of accounts that were modified within this commit context.
+ // note that in this number we might have the same account being modified several times.
+ pendingDeltas int
+
+ isCatchpointRound bool
+
+ // catchpointWriting is a pointer to a variable with the same name in the catchpointTracker.
+ // it's used in order to reset the catchpointWriting flag from the acctupdates's
+ // prepareCommit/commitRound ( which is called before the corresponding catchpoint tracker method )
+ catchpointWriting *int32
}
-func (tr *trackerRegistry) register(lt ledgerTracker) {
- tr.trackers = append(tr.trackers, lt)
+// deferredCommitContext is used in order to synchronize the persistence of a given deferredCommitRange.
+// prepareCommit, commitRound and postCommit are all using it to exchange data.
+type deferredCommitContext struct {
+ deferredCommitRange
+
+ newBase basics.Round
+ flushTime time.Time
+
+ genesisProto config.ConsensusParams
+
+ deltas []ledgercore.AccountDeltas
+ roundTotals ledgercore.AccountTotals
+ compactAccountDeltas compactAccountDeltas
+ compactCreatableDeltas map[basics.CreatableIndex]ledgercore.ModifiedCreatable
+
+ updatedPersistedAccounts []persistedAccountData
+
+ committedRoundDigest crypto.Digest
+ trieBalancesHash crypto.Digest
+ updatingBalancesDuration time.Duration
+ catchpointLabel string
+
+ stats telemetryspec.AccountsUpdateMetrics
+ updateStats bool
+}
+
+var errMissingAccountUpdateTracker = errors.New("initializeTrackerCaches : called without a valid accounts update tracker")
+
+func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTracker, cfg config.Local) (err error) {
+ tr.dbs = l.trackerDB()
+ tr.log = l.trackerLog()
+
+ err = tr.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
+ tr.dbRound, err = accountsRound(tx)
+ return err
+ })
+
+ if err != nil {
+ return err
+ }
+
+ tr.ctx, tr.ctxCancel = context.WithCancel(context.Background())
+ tr.deferredCommits = make(chan *deferredCommitContext, 1)
+ tr.commitSyncerClosed = make(chan struct{})
+ tr.synchronousMode = db.SynchronousMode(cfg.LedgerSynchronousMode)
+ tr.accountsRebuildSynchronousMode = db.SynchronousMode(cfg.AccountsRebuildSynchronousMode)
+ go tr.commitSyncer(tr.deferredCommits)
+
+ tr.trackers = append([]ledgerTracker{}, trackers...)
+
+ for _, tracker := range tr.trackers {
+ if accts, ok := tracker.(*accountUpdates); ok {
+ tr.accts = accts
+ break
+ }
+ }
+ return
}
func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error {
+ tr.mu.RLock()
+ dbRound := tr.dbRound
+ tr.mu.RUnlock()
+
for _, lt := range tr.trackers {
- err := lt.loadFromDisk(l)
+ err := lt.loadFromDisk(l, dbRound)
if err != nil {
// find the tracker name.
trackerName := reflect.TypeOf(lt).String()
@@ -116,34 +270,382 @@ func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error {
}
}
- return nil
+ err := tr.initializeTrackerCaches(l)
+ if err != nil {
+ return err
+ }
+ // the votes have a special dependency on the account updates, so we need to initialize these separately.
+ tr.accts.voters = &votersTracker{}
+ err = tr.accts.voters.loadFromDisk(l, tr.accts)
+ return err
}
func (tr *trackerRegistry) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
for _, lt := range tr.trackers {
lt.newBlock(blk, delta)
}
- if len(tr.trackers) == 0 {
- fmt.Printf("trackerRegistry::newBlock - no trackers (%d)\n", blk.Round())
- }
}
func (tr *trackerRegistry) committedUpTo(rnd basics.Round) basics.Round {
minBlock := rnd
-
+ maxLookback := basics.Round(0)
for _, lt := range tr.trackers {
- retain := lt.committedUpTo(rnd)
- if retain < minBlock {
- minBlock = retain
+ retainRound, lookback := lt.committedUpTo(rnd)
+ if retainRound < minBlock {
+ minBlock = retainRound
+ }
+ if lookback > maxLookback {
+ maxLookback = lookback
}
}
+ tr.scheduleCommit(rnd, maxLookback)
+
return minBlock
}
+func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round) {
+ tr.mu.RLock()
+ dbRound := tr.dbRound
+ tr.mu.RUnlock()
+
+ dcc := &deferredCommitContext{
+ deferredCommitRange: deferredCommitRange{
+ lookback: maxLookback,
+ },
+ }
+ cdr := &dcc.deferredCommitRange
+ for _, lt := range tr.trackers {
+ cdr = lt.produceCommittingTask(blockqRound, dbRound, cdr)
+ if cdr == nil {
+ break
+ }
+ }
+ if cdr != nil {
+ dcc.deferredCommitRange = *cdr
+ }
+
+ tr.mu.RLock()
+ // If we recently flushed, wait to aggregate some more blocks.
+ // ( unless we're creating a catchpoint, in which case we want to flush it right away
+ // so that all the instances of the catchpoint would contain exactly the same data )
+ flushTime := time.Now()
+ if dcc != nil && !flushTime.After(tr.lastFlushTime.Add(balancesFlushInterval)) && !dcc.isCatchpointRound && dcc.pendingDeltas < pendingDeltasFlushThreshold {
+ dcc = nil
+ }
+ tr.mu.RUnlock()
+
+ if dcc != nil {
+ tr.accountsWriting.Add(1)
+ tr.deferredCommits <- dcc
+ }
+}
+
+// waitAccountsWriting waits for all the pending ( or current ) account writing to be completed.
+func (tr *trackerRegistry) waitAccountsWriting() {
+ tr.accountsWriting.Wait()
+}
+
func (tr *trackerRegistry) close() {
+ if tr.ctxCancel != nil {
+ tr.ctxCancel()
+ }
+
+ // close() is called from reloadLedger() when the trackerRegistry is not initialized yet
+ if tr.commitSyncerClosed != nil {
+ tr.waitAccountsWriting()
+ // this would block until the commitSyncerClosed channel get closed.
+ <-tr.commitSyncerClosed
+ }
+
for _, lt := range tr.trackers {
lt.close()
}
tr.trackers = nil
+ tr.accts = nil
+}
+
+// commitSyncer is the syncer go-routine function which performs the database updates. Internally, it dequeues deferredCommits and
+// sends the tasks to commitRound for completing the operation.
+func (tr *trackerRegistry) commitSyncer(deferredCommits chan *deferredCommitContext) {
+ defer close(tr.commitSyncerClosed)
+ for {
+ select {
+ case commit, ok := <-deferredCommits:
+ if !ok {
+ return
+ }
+ tr.commitRound(commit)
+ case <-tr.ctx.Done():
+ // drain the pending commits queue:
+ drained := false
+ for !drained {
+ select {
+ case <-deferredCommits:
+ tr.accountsWriting.Done()
+ default:
+ drained = true
+ }
+ }
+ return
+ }
+ }
+}
+
+// commitRound commits the given deferredCommitContext via the trackers.
+func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) {
+ defer tr.accountsWriting.Done()
+ tr.mu.RLock()
+
+ offset := dcc.offset
+ dbRound := dcc.oldBase
+ lookback := dcc.lookback
+
+ // we can exit right away, as this is the result of mis-ordered call to committedUpTo.
+ if tr.dbRound < dbRound || offset < uint64(tr.dbRound-dbRound) {
+ tr.log.Warnf("out of order deferred commit: offset %d, dbRound %d but current tracker DB round is %d", offset, dbRound, tr.dbRound)
+ for _, lt := range tr.trackers {
+ lt.handleUnorderedCommit(offset, dbRound, lookback)
+ }
+ tr.mu.RUnlock()
+ return
+ }
+
+ // adjust the offset according to what happened meanwhile..
+ offset -= uint64(tr.dbRound - dbRound)
+
+ // if this iteration need to flush out zero rounds, just return right away.
+ // this usecase can happen when two subsequent calls to committedUpTo conclude that the same rounds range needs to be
+ // flushed, without commitRound having a chance of committing these rounds.
+ if offset == 0 {
+ tr.mu.RUnlock()
+ return
+ }
+
+ dbRound = tr.dbRound
+ newBase := basics.Round(offset) + dbRound
+
+ dcc.offset = offset
+ dcc.oldBase = dbRound
+ dcc.newBase = newBase
+ dcc.flushTime = time.Now()
+
+ for _, lt := range tr.trackers {
+ err := lt.prepareCommit(dcc)
+ if err != nil {
+ tr.log.Errorf(err.Error())
+ tr.mu.RUnlock()
+ return
+ }
+ }
+ tr.mu.RUnlock()
+
+ start := time.Now()
+ ledgerCommitroundCount.Inc(nil)
+ err := tr.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
+ for _, lt := range tr.trackers {
+ err0 := lt.commitRound(ctx, tx, dcc)
+ if err0 != nil {
+ return err0
+ }
+ }
+
+ err = updateAccountsRound(tx, dbRound+basics.Round(offset))
+ if err != nil {
+ return err
+ }
+
+ return nil
+ })
+ ledgerCommitroundMicros.AddMicrosecondsSince(start, nil)
+
+ if err != nil {
+ tr.log.Warnf("unable to advance tracker db snapshot (%d-%d): %v", dbRound, dbRound+basics.Round(offset), err)
+ return
+ }
+
+ tr.mu.Lock()
+ tr.dbRound = newBase
+ for _, lt := range tr.trackers {
+ lt.postCommit(tr.ctx, dcc)
+ }
+ tr.lastFlushTime = dcc.flushTime
+ tr.mu.Unlock()
+
+}
+
+// initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).
+// the method also supports balances recovery in cases where the difference between the lastBalancesRound and the lastestBlockRound
+// is far greater than 320; in these cases, it would flush to disk periodically in order to avoid high memory consumption.
+func (tr *trackerRegistry) initializeTrackerCaches(l ledgerForTracker) (err error) {
+ lastestBlockRound := l.Latest()
+ lastBalancesRound := tr.dbRound
+
+ var blk bookkeeping.Block
+ var delta ledgercore.StateDelta
+
+ if tr.accts == nil {
+ return errMissingAccountUpdateTracker
+ }
+
+ accLedgerEval := accountUpdatesLedgerEvaluator{
+ au: tr.accts,
+ }
+
+ if lastBalancesRound < lastestBlockRound {
+ accLedgerEval.prevHeader, err = l.BlockHdr(lastBalancesRound)
+ if err != nil {
+ return err
+ }
+ }
+
+ skipAccountCacheMessage := make(chan struct{})
+ writeAccountCacheMessageCompleted := make(chan struct{})
+ defer func() {
+ close(skipAccountCacheMessage)
+ select {
+ case <-writeAccountCacheMessageCompleted:
+ if err == nil {
+ tr.log.Infof("initializeTrackerCaches completed initializing account data caches")
+ }
+ default:
+ }
+ }()
+
+ catchpointInterval := uint64(0)
+ for _, tracker := range tr.trackers {
+ if catchpointTracker, ok := tracker.(*catchpointTracker); ok {
+ catchpointInterval = catchpointTracker.catchpointInterval
+ break
+ }
+ }
+
+ // this goroutine logs a message once if the parent function has not completed in initializingAccountCachesMessageTimeout seconds.
+ // the message is important, since we're blocking on the ledger block database here, and we want to make sure that we log a message
+ // within the above timeout.
+ go func() {
+ select {
+ case <-time.After(initializingAccountCachesMessageTimeout):
+ tr.log.Infof("initializeTrackerCaches is initializing account data caches")
+ close(writeAccountCacheMessageCompleted)
+ case <-skipAccountCacheMessage:
+ }
+ }()
+
+ blocksStream := make(chan bookkeeping.Block, initializeCachesReadaheadBlocksStream)
+ blockEvalFailed := make(chan struct{}, 1)
+ var blockRetrievalError error
+ go func() {
+ defer close(blocksStream)
+ for roundNumber := lastBalancesRound + 1; roundNumber <= lastestBlockRound; roundNumber++ {
+ blk, blockRetrievalError = l.Block(roundNumber)
+ if blockRetrievalError != nil {
+ return
+ }
+ select {
+ case blocksStream <- blk:
+ case <-blockEvalFailed:
+ return
+ }
+ }
+ }()
+
+ lastFlushedRound := lastBalancesRound
+ const accountsCacheLoadingMessageInterval = 5 * time.Second
+ lastProgressMessage := time.Now().Add(-accountsCacheLoadingMessageInterval / 2)
+
+ // rollbackSynchronousMode ensures that we switch to "fast writing mode" when we start flushing out rounds to disk, and that
+ // we exit this mode when we're done.
+ rollbackSynchronousMode := false
+ defer func() {
+ if rollbackSynchronousMode {
+ // restore default synchronous mode
+ err0 := tr.dbs.Wdb.SetSynchronousMode(context.Background(), tr.synchronousMode, tr.synchronousMode >= db.SynchronousModeFull)
+ // override the returned error only in case there is no error - since this
+ // operation has a lower criticality.
+ if err == nil {
+ err = err0
+ }
+ }
+ }()
+
+ for blk := range blocksStream {
+ delta, err = l.trackerEvalVerified(blk, &accLedgerEval)
+ if err != nil {
+ close(blockEvalFailed)
+ return
+ }
+ tr.newBlock(blk, delta)
+
+ // flush to disk if any of the following applies:
+ // 1. if we have loaded up more than initializeCachesRoundFlushInterval rounds since the last time we flushed the data to disk
+ // 2. if we completed the loading and we loaded up more than 320 rounds.
+ flushIntervalExceed := blk.Round()-lastFlushedRound > initializeCachesRoundFlushInterval
+ loadCompleted := (lastestBlockRound == blk.Round() && lastBalancesRound+basics.Round(blk.ConsensusProtocol().MaxBalLookback) < lastestBlockRound)
+ if flushIntervalExceed || loadCompleted {
+ // adjust the last flush time, so that we would not hold off the flushing due to "working too fast"
+ tr.lastFlushTime = time.Now().Add(-balancesFlushInterval)
+
+ if !rollbackSynchronousMode {
+ // switch to rebuild synchronous mode to improve performance
+ err0 := tr.dbs.Wdb.SetSynchronousMode(context.Background(), tr.accountsRebuildSynchronousMode, tr.accountsRebuildSynchronousMode >= db.SynchronousModeFull)
+ if err0 != nil {
+ tr.log.Warnf("initializeTrackerCaches was unable to switch to rbuild synchronous mode : %v", err0)
+ } else {
+ // flip the switch to rollback the synchronous mode once we're done.
+ rollbackSynchronousMode = true
+ }
+ }
+
+ var roundsBehind basics.Round
+
+ // flush the account data
+ tr.scheduleCommit(blk.Round(), basics.Round(config.Consensus[blk.BlockHeader.CurrentProtocol].MaxBalLookback))
+ // wait for the writing to complete.
+ tr.waitAccountsWriting()
+
+ func() {
+ tr.mu.RLock()
+ defer tr.mu.RUnlock()
+
+ // The au.dbRound after writing should be ~320 behind the block round.
+ roundsBehind = blk.Round() - tr.dbRound
+ }()
+
+ // are we too far behind ? ( taking into consideration the catchpoint writing, which can stall the writing for quite a bit )
+ if roundsBehind > initializeCachesRoundFlushInterval+basics.Round(catchpointInterval) {
+ // we're unable to persist changes. This is unexpected, but there is no point in keep trying batching additional changes since any further changes
+ // would just accumulate in memory.
+ close(blockEvalFailed)
+ tr.log.Errorf("initializeTrackerCaches was unable to fill up the account caches accounts round = %d, block round = %d. See above error for more details.", blk.Round()-roundsBehind, blk.Round())
+ err = fmt.Errorf("initializeTrackerCaches failed to initialize the account data caches")
+ return
+ }
+
+ // and once we flushed it to disk, update the lastFlushedRound
+ lastFlushedRound = blk.Round()
+ }
+
+ // if enough time has passed since the last time we wrote a message to the log file then give the user an update about the progress.
+ if time.Since(lastProgressMessage) > accountsCacheLoadingMessageInterval {
+ // drop the initial message if we've gotten to this point since a message saying "still initializing" that comes after "is initializing" doesn't seem to be right.
+ select {
+ case skipAccountCacheMessage <- struct{}{}:
+ // if we got to this point, we should be able to close the writeAccountCacheMessageCompleted channel to have the "completed initializing" message written.
+ close(writeAccountCacheMessageCompleted)
+ default:
+ }
+ tr.log.Infof("initializeTrackerCaches is still initializing account data caches, %d rounds loaded out of %d rounds", blk.Round()-lastBalancesRound, lastestBlockRound-lastBalancesRound)
+ lastProgressMessage = time.Now()
+ }
+
+ // prepare for the next iteration.
+ accLedgerEval.prevHeader = *delta.Hdr
+ }
+
+ if blockRetrievalError != nil {
+ err = blockRetrievalError
+ }
+ return
+
}