diff options
Diffstat (limited to 'ledger/acctupdates.go')
-rw-r--r-- | ledger/acctupdates.go | 1603 |
1 files changed, 168 insertions, 1435 deletions
diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index a49ef95d7..16d933fbb 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -20,13 +20,9 @@ import ( "container/heap" "context" "database/sql" - "encoding/hex" "fmt" "io" - "os" - "path/filepath" "sort" - "strconv" "sync" "sync/atomic" "time" @@ -35,7 +31,6 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/crypto/merkletrie" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" @@ -52,24 +47,8 @@ const ( balancesFlushInterval = 5 * time.Second // pendingDeltasFlushThreshold is the deltas count threshold above we flush the pending balances regardless of the flush interval. pendingDeltasFlushThreshold = 128 - // trieRebuildAccountChunkSize defines the number of accounts that would get read at a single chunk - // before added to the trie during trie construction - trieRebuildAccountChunkSize = 16384 - // trieRebuildCommitFrequency defines the number of accounts that would get added before we call evict to commit the changes and adjust the memory cache. - trieRebuildCommitFrequency = 65536 - // trieAccumulatedChangesFlush defines the number of pending changes that would be applied to the merkle trie before - // we attempt to commit them to disk while writing a batch of rounds balances to disk. - trieAccumulatedChangesFlush = 256 ) -// trieCachedNodesCount defines how many balances trie nodes we would like to keep around in memory. -// value was calibrated using BenchmarkCalibrateCacheNodeSize -var trieCachedNodesCount = 9000 - -// merkleCommitterNodesPerPage controls how many nodes will be stored in a single page -// value was calibrated using BenchmarkCalibrateNodesPerPage -var merkleCommitterNodesPerPage = int64(116) - // baseAccountsPendingAccountsBufferSize defines the size of the base account pending accounts buffer size. // At the beginning of a new round, the entries from this buffer are being flushed into the base accounts map. const baseAccountsPendingAccountsBufferSize = 100000 @@ -99,14 +78,6 @@ const initializingAccountCachesMessageTimeout = 3 * time.Second // where we end up batching up to 1000 rounds in a single update. const accountsUpdatePerRoundHighWatermark = 1 * time.Second -// TrieMemoryConfig is the memory configuration setup used for the merkle trie. -var TrieMemoryConfig = merkletrie.MemoryConfig{ - NodesCountPerPage: merkleCommitterNodesPerPage, - CachedNodesCount: trieCachedNodesCount, - PageFillFactor: 0.95, - MaxChildrenPagesThreshold: 64, -} - // A modifiedAccount represents an account that has been modified since // the persistent state stored in the account DB (i.e., in the range of // rounds covered by the accountUpdates tracker). @@ -125,28 +96,9 @@ type modifiedAccount struct { type accountUpdates struct { // constant variables ( initialized on initialize, and never changed afterward ) - // initAccounts specifies initial account values for database. - initAccounts map[basics.Address]basics.AccountData - - // initProto specifies the initial consensus parameters at the genesis block. - initProto config.ConsensusParams - - // dbDirectory is the directory where the ledger and block sql file resides as well as the parent directory for the catchup files to be generated - dbDirectory string - - // catchpointInterval is the configured interval at which the accountUpdates would generate catchpoint labels and catchpoint files. - catchpointInterval uint64 - // archivalLedger determines whether the associated ledger was configured as archival ledger or not. archivalLedger bool - // catchpointFileHistoryLength defines how many catchpoint files we want to store back. - // 0 means don't store any, -1 mean unlimited and positive number suggest the number of most recent catchpoint files. - catchpointFileHistoryLength int - - // vacuumOnStartup controls whether the accounts database would get vacuumed on startup. - vacuumOnStartup bool - // dynamic variables // Connection to the database. @@ -155,9 +107,9 @@ type accountUpdates struct { // Prepared SQL statements for fast accounts DB lookups. accountsq *accountsDbQueries - // dbRound is always exactly accountsRound(), - // cached to avoid SQL queries. - dbRound basics.Round + // cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()), + // cached to use in lookup functions + cachedDBRound basics.Round // deltas stores updates for every round after dbRound. deltas []ledgercore.AccountDeltas @@ -181,75 +133,29 @@ type accountUpdates struct { // i.e., totals is one longer than deltas. roundTotals []ledgercore.AccountTotals - // roundDigest stores the digest of the block for every round starting with dbRound and every round after it. - roundDigest []crypto.Digest - // log copied from ledger log logging.Logger - // lastFlushTime is the time we last flushed updates to - // the accounts DB (bumping dbRound). - lastFlushTime time.Time - // ledger is the source ledger, which is used to synchronize // the rounds at which we need to flush the balances to disk // in favor of the catchpoint to be generated. ledger ledgerForTracker - // The Trie tracking the current account balances. Always matches the balances that were - // written to the database. - balancesTrie *merkletrie.Trie - - // The last catchpoint label that was written to the database. Should always align with what's in the database. - // note that this is the last catchpoint *label* and not the catchpoint file. - lastCatchpointLabel string - - // catchpointWriting help to synchronize the catchpoint file writing. When this atomic variable is 0, no writing is going on. - // Any non-zero value indicates a catchpoint being written. - catchpointWriting int32 - - // catchpointSlowWriting suggest to the accounts writer that it should finish writing up the catchpoint file ASAP. - // when this channel is closed, the accounts writer would try and complete the writing as soon as possible. - // otherwise, it would take it's time and perform periodic sleeps between chunks processing. - catchpointSlowWriting chan struct{} - - // ctx is the context for the committing go-routine. It's also used as the "parent" of the catchpoint generation operation. - ctx context.Context - - // ctxCancel is the canceling function for canceling the committing go-routine ( i.e. signaling the committing go-routine that it's time to abort ) - ctxCancel context.CancelFunc - // deltasAccum stores the accumulated deltas for every round starting dbRound-1. deltasAccum []int - // committedOffset is the offset at which we'd like to persist all the previous account information to disk. - committedOffset chan deferredCommit - // accountsMu is the synchronization mutex for accessing the various non-static variables. accountsMu deadlock.RWMutex // accountsReadCond used to synchronize read access to the internal data structures. accountsReadCond *sync.Cond - // accountsWriting provides synchronization around the background writing of account balances. - accountsWriting sync.WaitGroup - - // commitSyncerClosed is the blocking channel for synchronizing closing the commitSyncer goroutine. Once it's closed, the - // commitSyncer can be assumed to have aborted. - commitSyncerClosed chan struct{} - // voters keeps track of Merkle trees of online accounts, used for compact certificates. voters *votersTracker // baseAccounts stores the most recently used accounts, at exactly dbRound baseAccounts lruAccounts - // the synchronous mode that would be used for the account database. - synchronousMode db.SynchronousMode - - // the synchronous mode that would be used while the accounts database is being rebuilt. - accountsRebuildSynchronousMode db.SynchronousMode - // logAccountUpdatesMetrics is a flag for enable/disable metrics logging logAccountUpdatesMetrics bool @@ -300,106 +206,37 @@ func (e *MismatchingDatabaseRoundError) Error() string { } // initialize initializes the accountUpdates structure -func (au *accountUpdates) initialize(cfg config.Local, dbPathPrefix string, genesisProto config.ConsensusParams, genesisAccounts map[basics.Address]basics.AccountData) { - au.initProto = genesisProto - au.initAccounts = genesisAccounts - au.dbDirectory = filepath.Dir(dbPathPrefix) +func (au *accountUpdates) initialize(cfg config.Local) { au.archivalLedger = cfg.Archival - switch cfg.CatchpointTracking { - case -1: - au.catchpointInterval = 0 - default: - // give a warning, then fall thought - logging.Base().Warnf("accountUpdates: the CatchpointTracking field in the config.json file contains an invalid value (%d). The default value of 0 would be used instead.", cfg.CatchpointTracking) - fallthrough - case 0: - if au.archivalLedger { - au.catchpointInterval = cfg.CatchpointInterval - } else { - au.catchpointInterval = 0 - } - case 1: - au.catchpointInterval = cfg.CatchpointInterval - } - au.catchpointFileHistoryLength = cfg.CatchpointFileHistoryLength - if cfg.CatchpointFileHistoryLength < -1 { - au.catchpointFileHistoryLength = -1 - } - au.vacuumOnStartup = cfg.OptimizeAccountsDatabaseOnStartup - // initialize the commitSyncerClosed with a closed channel ( since the commitSyncer go-routine is not active ) - au.commitSyncerClosed = make(chan struct{}) - close(au.commitSyncerClosed) au.accountsReadCond = sync.NewCond(au.accountsMu.RLocker()) - au.synchronousMode = db.SynchronousMode(cfg.LedgerSynchronousMode) - au.accountsRebuildSynchronousMode = db.SynchronousMode(cfg.AccountsRebuildSynchronousMode) // log metrics au.logAccountUpdatesMetrics = cfg.EnableAccountUpdatesStats au.logAccountUpdatesInterval = cfg.AccountUpdatesStatsInterval - } // loadFromDisk is the 2nd level initialization, and is required before the accountUpdates becomes functional // The close function is expected to be call in pair with loadFromDisk -func (au *accountUpdates) loadFromDisk(l ledgerForTracker) error { +func (au *accountUpdates) loadFromDisk(l ledgerForTracker, lastBalancesRound basics.Round) error { au.accountsMu.Lock() defer au.accountsMu.Unlock() - var writingCatchpointRound uint64 - lastBalancesRound, lastestBlockRound, err := au.initializeFromDisk(l) - - if err != nil { - return err - } - var writingCatchpointDigest crypto.Digest - - writingCatchpointRound, _, err = au.accountsq.readCatchpointStateUint64(context.Background(), catchpointStateWritingCatchpoint) - if err != nil { - return err - } - - writingCatchpointDigest, err = au.initializeCaches(lastBalancesRound, lastestBlockRound, basics.Round(writingCatchpointRound)) + au.cachedDBRound = lastBalancesRound + err := au.initializeFromDisk(l, lastBalancesRound) if err != nil { return err } - - if writingCatchpointRound != 0 && au.catchpointInterval != 0 { - au.generateCatchpoint(basics.Round(writingCatchpointRound), au.lastCatchpointLabel, writingCatchpointDigest, time.Duration(0)) - } - - au.voters = &votersTracker{} - err = au.voters.loadFromDisk(l, au) - if err != nil { - return err - } - return nil } -// waitAccountsWriting waits for all the pending ( or current ) account writing to be completed. -func (au *accountUpdates) waitAccountsWriting() { - au.accountsWriting.Wait() -} - // close closes the accountUpdates, waiting for all the child go-routine to complete func (au *accountUpdates) close() { if au.voters != nil { au.voters.close() } - if au.ctxCancel != nil { - au.ctxCancel() - } - au.waitAccountsWriting() - // this would block until the commitSyncerClosed channel get closed. - <-au.commitSyncerClosed - au.baseAccounts.prune(0) -} -// IsWritingCatchpointFile returns true when a catchpoint file is being generated. The function is used by the catchup service -// to avoid memory pressure until the catchpoint file writing is complete. -func (au *accountUpdates) IsWritingCatchpointFile() bool { - return atomic.LoadInt32(&au.catchpointWriting) != 0 + au.baseAccounts.prune(0) } // LookupWithRewards returns the account data for a given address at a given round. @@ -428,7 +265,7 @@ func (au *accountUpdates) ListApplications(maxAppIdx basics.AppIndex, maxResults func (au *accountUpdates) listCreatables(maxCreatableIdx basics.CreatableIndex, maxResults uint64, ctype basics.CreatableType) ([]basics.CreatableLocator, error) { au.accountsMu.RLock() for { - currentDbRound := au.dbRound + currentDbRound := au.cachedDBRound currentDeltaLen := len(au.deltas) // Sort indices for creatables that have been created/deleted. If this // turns out to be too inefficient, we could keep around a heap of @@ -502,7 +339,7 @@ func (au *accountUpdates) listCreatables(maxCreatableIdx basics.CreatableIndex, return []basics.CreatableLocator{}, &StaleDatabaseRoundError{databaseRound: dbRound, memoryRound: currentDbRound} } au.accountsMu.RLock() - for currentDbRound >= au.dbRound && currentDeltaLen == len(au.deltas) { + for currentDbRound >= au.cachedDBRound && currentDeltaLen == len(au.deltas) { au.accountsReadCond.Wait() } } @@ -511,11 +348,11 @@ func (au *accountUpdates) listCreatables(maxCreatableIdx basics.CreatableIndex, // onlineTop returns the top n online accounts, sorted by their normalized // balance and address, whose voting keys are valid in voteRnd. See the // normalization description in AccountData.NormalizedOnlineBalance(). -func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n uint64) ([]*onlineAccount, error) { +func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n uint64) ([]*ledgercore.OnlineAccount, error) { proto := au.ledger.GenesisProto() au.accountsMu.RLock() for { - currentDbRound := au.dbRound + currentDbRound := au.cachedDBRound currentDeltaLen := len(au.deltas) offset, err := au.roundOffset(rnd) if err != nil { @@ -530,7 +367,7 @@ func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n ui // is not valid in voteRnd. Otherwise, the *onlineAccount is the // representation of the most recent state of the account, and it // is online and can vote in voteRnd. - modifiedAccounts := make(map[basics.Address]*onlineAccount) + modifiedAccounts := make(map[basics.Address]*ledgercore.OnlineAccount) for o := uint64(0); o < offset; o++ { for i := 0; i < au.deltas[o].Len(); i++ { addr, d := au.deltas[o].GetByIdx(i) @@ -558,12 +395,12 @@ func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n ui // // Keep asking for more accounts until we get the desired number, // or there are no more accounts left. - candidates := make(map[basics.Address]*onlineAccount) + candidates := make(map[basics.Address]*ledgercore.OnlineAccount) batchOffset := uint64(0) batchSize := uint64(1024) var dbRound basics.Round for uint64(len(candidates)) < n+uint64(len(modifiedAccounts)) { - var accts map[basics.Address]*onlineAccount + var accts map[basics.Address]*ledgercore.OnlineAccount start := time.Now() ledgerAccountsonlinetopCount.Inc(nil) err = au.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { @@ -571,7 +408,7 @@ func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n ui if err != nil { return } - dbRound, _, err = accountsRound(tx) + dbRound, err = accountsRound(tx) return }) ledgerAccountsonlinetopMicros.AddMicrosecondsSince(start, nil) @@ -601,7 +438,7 @@ func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n ui if dbRound != currentDbRound && dbRound != basics.Round(0) { // database round doesn't match the last au.dbRound we sampled. au.accountsMu.RLock() - for currentDbRound >= au.dbRound && currentDeltaLen == len(au.deltas) { + for currentDbRound >= au.cachedDBRound && currentDeltaLen == len(au.deltas) { au.accountsReadCond.Wait() } continue @@ -627,9 +464,9 @@ func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n ui heap.Push(topHeap, data) } - var res []*onlineAccount + var res []*ledgercore.OnlineAccount for topHeap.Len() > 0 && uint64(len(res)) < n { - acct := heap.Pop(topHeap).(*onlineAccount) + acct := heap.Pop(topHeap).(*ledgercore.OnlineAccount) res = append(res, acct) } @@ -637,129 +474,72 @@ func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n ui } } -// GetLastCatchpointLabel retrieves the last catchpoint label that was stored to the database. -func (au *accountUpdates) GetLastCatchpointLabel() string { - au.accountsMu.RLock() - defer au.accountsMu.RUnlock() - return au.lastCatchpointLabel -} - // GetCreatorForRound returns the creator for a given asset/app index at a given round func (au *accountUpdates) GetCreatorForRound(rnd basics.Round, cidx basics.CreatableIndex, ctype basics.CreatableType) (creator basics.Address, ok bool, err error) { return au.getCreatorForRound(rnd, cidx, ctype, true /* take the lock */) } -// committedUpTo enqueues committing the balances for round committedRound-lookback. +// committedUpTo implements the ledgerTracker interface for accountUpdates. +// The method informs the tracker that committedRound and all it's previous rounds have +// been committed to the block database. The method returns what is the oldest round +// number that can be removed from the blocks database as well as the lookback that this +// tracker maintains. +func (au *accountUpdates) committedUpTo(committedRound basics.Round) (retRound, lookback basics.Round) { + au.accountsMu.RLock() + defer au.accountsMu.RUnlock() + + retRound = basics.Round(0) + lookback = basics.Round(config.Consensus[au.versions[len(au.versions)-1]].MaxBalLookback) + if committedRound < lookback { + return + } + + retRound = au.cachedDBRound + return +} + +// produceCommittingTask enqueues committing the balances for round committedRound-lookback. // The deferred committing is done so that we could calculate the historical balances lookback rounds back. // Since we don't want to hold off the tracker's mutex for too long, we'll defer the database persistence of this // operation to a syncer goroutine. The one caveat is that when storing a catchpoint round, we would want to // wait until the catchpoint creation is done, so that the persistence of the catchpoint file would have an // uninterrupted view of the balances at a given point of time. -func (au *accountUpdates) committedUpTo(committedRound basics.Round) (retRound basics.Round) { - var isCatchpointRound, hasMultipleIntermediateCatchpoint bool +func (au *accountUpdates) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { var offset uint64 - var dc deferredCommit au.accountsMu.RLock() - defer func() { - au.accountsMu.RUnlock() - if dc.offset != 0 { - au.committedOffset <- dc - } - }() - retRound = basics.Round(0) - var pendingDeltas int + defer au.accountsMu.RUnlock() - lookback := basics.Round(config.Consensus[au.versions[len(au.versions)-1]].MaxBalLookback) - if committedRound < lookback { - return + if committedRound < dcr.lookback { + return nil } - retRound = au.dbRound - newBase := committedRound - lookback - if newBase <= au.dbRound { + newBase := committedRound - dcr.lookback + if newBase <= dbRound { // Already forgotten - return - } - - if newBase > au.dbRound+basics.Round(len(au.deltas)) { - au.log.Panicf("committedUpTo: block %d too far in the future, lookback %d, dbRound %d, deltas %d", committedRound, lookback, au.dbRound, len(au.deltas)) + return nil } - hasIntermediateCatchpoint := false - hasMultipleIntermediateCatchpoint = false - // check if there was a catchpoint between au.dbRound+lookback and newBase+lookback - if au.catchpointInterval > 0 { - nextCatchpointRound := ((uint64(au.dbRound+lookback) + au.catchpointInterval) / au.catchpointInterval) * au.catchpointInterval - - if nextCatchpointRound < uint64(newBase+lookback) { - mostRecentCatchpointRound := (uint64(committedRound) / au.catchpointInterval) * au.catchpointInterval - newBase = basics.Round(nextCatchpointRound) - lookback - if mostRecentCatchpointRound > nextCatchpointRound { - hasMultipleIntermediateCatchpoint = true - // skip if there is more than one catchpoint in queue - newBase = basics.Round(mostRecentCatchpointRound) - lookback - } - hasIntermediateCatchpoint = true - } - } - - // if we're still writing the previous balances, we can't move forward yet. - if au.IsWritingCatchpointFile() { - // if we hit this path, it means that we're still writing a catchpoint. - // see if the new delta range contains another catchpoint. - if hasIntermediateCatchpoint { - // check if we're already attempting to perform fast-writing. - select { - case <-au.catchpointSlowWriting: - // yes, we're already doing fast-writing. - default: - // no, we're not yet doing fast writing, make it so. - close(au.catchpointSlowWriting) - } - } - return + if newBase > dbRound+basics.Round(len(au.deltas)) { + au.log.Panicf("produceCommittingTask: block %d too far in the future, lookback %d, dbRound %d (cached %d), deltas %d", committedRound, dcr.lookback, dbRound, au.cachedDBRound, len(au.deltas)) } if au.voters != nil { newBase = au.voters.lowestRound(newBase) } - offset = uint64(newBase - au.dbRound) + offset = uint64(newBase - dbRound) offset = au.consecutiveVersion(offset) - // check to see if this is a catchpoint round - isCatchpointRound = ((offset + uint64(lookback+au.dbRound)) > 0) && (au.catchpointInterval != 0) && (0 == (uint64((offset + uint64(lookback+au.dbRound))) % au.catchpointInterval)) - // calculate the number of pending deltas - pendingDeltas = au.deltasAccum[offset] - au.deltasAccum[0] - - // If we recently flushed, wait to aggregate some more blocks. - // ( unless we're creating a catchpoint, in which case we want to flush it right away - // so that all the instances of the catchpoint would contain exactly the same data ) - flushTime := time.Now() - if !flushTime.After(au.lastFlushTime.Add(balancesFlushInterval)) && !isCatchpointRound && pendingDeltas < pendingDeltasFlushThreshold { - return au.dbRound - } - - if isCatchpointRound && au.archivalLedger { - // store non-zero ( all ones ) into the catchpointWriting atomic variable to indicate that a catchpoint is being written ( or, queued to be written ) - atomic.StoreInt32(&au.catchpointWriting, int32(-1)) - au.catchpointSlowWriting = make(chan struct{}, 1) - if hasMultipleIntermediateCatchpoint { - close(au.catchpointSlowWriting) - } - } + dcr.pendingDeltas = au.deltasAccum[offset] - au.deltasAccum[0] - dc = deferredCommit{ - offset: offset, - dbRound: au.dbRound, - lookback: lookback, - } - if offset != 0 { - au.accountsWriting.Add(1) - } - return + // submit committing task only if offset is non-zero in addition to + // 1) no pending catchpoint writes + // 2) batching requirements meet or catchpoint round + dcr.oldBase = dbRound + dcr.offset = offset + return dcr } func (au *accountUpdates) consecutiveVersion(offset uint64) uint64 { @@ -822,64 +602,6 @@ func (r *readCloseSizer) Size() (int64, error) { return r.size, nil } -// GetCatchpointStream returns a ReadCloseSizer to the catchpoint file associated with the provided round -func (au *accountUpdates) GetCatchpointStream(round basics.Round) (ReadCloseSizer, error) { - dbFileName := "" - fileSize := int64(0) - start := time.Now() - ledgerGetcatchpointCount.Inc(nil) - err := au.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - dbFileName, _, fileSize, err = getCatchpoint(tx, round) - return - }) - ledgerGetcatchpointMicros.AddMicrosecondsSince(start, nil) - if err != nil && err != sql.ErrNoRows { - // we had some sql error. - return nil, fmt.Errorf("accountUpdates: getCatchpointStream: unable to lookup catchpoint %d: %v", round, err) - } - if dbFileName != "" { - catchpointPath := filepath.Join(au.dbDirectory, dbFileName) - file, err := os.OpenFile(catchpointPath, os.O_RDONLY, 0666) - if err == nil && file != nil { - return &readCloseSizer{ReadCloser: file, size: fileSize}, nil - } - // else, see if this is a file-not-found error - if os.IsNotExist(err) { - // the database told us that we have this file.. but we couldn't find it. - // delete it from the database. - err := au.saveCatchpointFile(round, "", 0, "") - if err != nil { - au.log.Warnf("accountUpdates: getCatchpointStream: unable to delete missing catchpoint entry: %v", err) - return nil, err - } - - return nil, ledgercore.ErrNoEntry{} - } - // it's some other error. - return nil, fmt.Errorf("accountUpdates: getCatchpointStream: unable to open catchpoint file '%s' %v", catchpointPath, err) - } - - // if the database doesn't know about that round, see if we have that file anyway: - fileName := filepath.Join("catchpoints", catchpointRoundToPath(round)) - catchpointPath := filepath.Join(au.dbDirectory, fileName) - file, err := os.OpenFile(catchpointPath, os.O_RDONLY, 0666) - if err == nil && file != nil { - // great, if found that we should have had this in the database.. add this one now : - fileInfo, err := file.Stat() - if err != nil { - // we couldn't get the stat, so just return with the file. - return &readCloseSizer{ReadCloser: file, size: -1}, nil - } - - err = au.saveCatchpointFile(round, fileName, fileInfo.Size(), "") - if err != nil { - au.log.Warnf("accountUpdates: getCatchpointStream: unable to save missing catchpoint entry: %v", err) - } - return &readCloseSizer{ReadCloser: file, size: fileInfo.Size()}, nil - } - return nil, ledgercore.ErrNoEntry{} -} - // functions below this line are all internal functions // accountUpdatesLedgerEvaluator is a "ledger emulator" which is used *only* by initializeCaches, as a way to shortcut @@ -903,7 +625,7 @@ func (aul *accountUpdatesLedgerEvaluator) GenesisHash() crypto.Digest { } // CompactCertVoters returns the top online accounts at round rnd. -func (aul *accountUpdatesLedgerEvaluator) CompactCertVoters(rnd basics.Round) (voters *VotersForRound, err error) { +func (aul *accountUpdatesLedgerEvaluator) CompactCertVoters(rnd basics.Round) (voters *ledgercore.VotersForRound, err error) { return aul.au.voters.getVoters(rnd) } @@ -922,7 +644,7 @@ func (aul *accountUpdatesLedgerEvaluator) LatestTotals() (basics.Round, ledgerco } // CheckDup test to see if the given transaction id/lease already exists. It's not needed by the accountUpdatesLedgerEvaluator and implemented as a stub. -func (aul *accountUpdatesLedgerEvaluator) CheckDup(config.ConsensusParams, basics.Round, basics.Round, basics.Round, transactions.Txid, TxLease) error { +func (aul *accountUpdatesLedgerEvaluator) CheckDup(config.ConsensusParams, basics.Round, basics.Round, basics.Round, transactions.Txid, ledgercore.Txlease) error { // this is a non-issue since this call will never be made on non-validating evaluation return fmt.Errorf("accountUpdatesLedgerEvaluator: tried to check for dup during accountUpdates initialization ") } @@ -951,205 +673,20 @@ func (au *accountUpdates) totalsImpl(rnd basics.Round) (totals ledgercore.Accoun // latestTotalsImpl returns the totals of all accounts for the most recent round, as well as the round number func (au *accountUpdates) latestTotalsImpl() (basics.Round, ledgercore.AccountTotals, error) { offset := len(au.deltas) - rnd := au.dbRound + basics.Round(len(au.deltas)) + rnd := au.cachedDBRound + basics.Round(len(au.deltas)) return rnd, au.roundTotals[offset], nil } -// initializeCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ). -// the method also support balances recovery in cases where the difference between the lastBalancesRound and the lastestBlockRound -// is far greater than 320; in these cases, it would flush to disk periodically in order to avoid high memory consumption. -func (au *accountUpdates) initializeCaches(lastBalancesRound, lastestBlockRound, writingCatchpointRound basics.Round) (catchpointBlockDigest crypto.Digest, err error) { - var blk bookkeeping.Block - var delta ledgercore.StateDelta - - accLedgerEval := accountUpdatesLedgerEvaluator{ - au: au, - } - if lastBalancesRound < lastestBlockRound { - accLedgerEval.prevHeader, err = au.ledger.BlockHdr(lastBalancesRound) - if err != nil { - return - } - } - - skipAccountCacheMessage := make(chan struct{}) - writeAccountCacheMessageCompleted := make(chan struct{}) - defer func() { - close(skipAccountCacheMessage) - select { - case <-writeAccountCacheMessageCompleted: - if err == nil { - au.log.Infof("initializeCaches completed initializing account data caches") - } - default: - } - }() - - // this goroutine logs a message once if the parent function have not completed in initializingAccountCachesMessageTimeout seconds. - // the message is important, since we're blocking on the ledger block database here, and we want to make sure that we log a message - // within the above timeout. - go func() { - select { - case <-time.After(initializingAccountCachesMessageTimeout): - au.log.Infof("initializeCaches is initializing account data caches") - close(writeAccountCacheMessageCompleted) - case <-skipAccountCacheMessage: - } - }() - - blocksStream := make(chan bookkeeping.Block, initializeCachesReadaheadBlocksStream) - blockEvalFailed := make(chan struct{}, 1) - var blockRetrievalError error - go func() { - defer close(blocksStream) - for roundNumber := lastBalancesRound + 1; roundNumber <= lastestBlockRound; roundNumber++ { - blk, blockRetrievalError = au.ledger.Block(roundNumber) - if blockRetrievalError != nil { - return - } - select { - case blocksStream <- blk: - case <-blockEvalFailed: - return - } - } - }() - - lastFlushedRound := lastBalancesRound - const accountsCacheLoadingMessageInterval = 5 * time.Second - lastProgressMessage := time.Now().Add(-accountsCacheLoadingMessageInterval / 2) - - // rollbackSynchronousMode ensures that we switch to "fast writing mode" when we start flushing out rounds to disk, and that - // we exit this mode when we're done. - rollbackSynchronousMode := false - defer func() { - if rollbackSynchronousMode { - // restore default synchronous mode - au.dbs.Wdb.SetSynchronousMode(context.Background(), au.synchronousMode, au.synchronousMode >= db.SynchronousModeFull) - } - }() - - for blk := range blocksStream { - delta, err = au.ledger.trackerEvalVerified(blk, &accLedgerEval) - if err != nil { - close(blockEvalFailed) - return - } - - au.newBlockImpl(blk, delta) - - if blk.Round() == basics.Round(writingCatchpointRound) { - catchpointBlockDigest = blk.Digest() - } - - // flush to disk if any of the following applies: - // 1. if we have loaded up more than initializeCachesRoundFlushInterval rounds since the last time we flushed the data to disk - // 2. if we completed the loading and we loaded up more than 320 rounds. - flushIntervalExceed := blk.Round()-lastFlushedRound > initializeCachesRoundFlushInterval - loadCompleted := (lastestBlockRound == blk.Round() && lastBalancesRound+basics.Round(blk.ConsensusProtocol().MaxBalLookback) < lastestBlockRound) - if flushIntervalExceed || loadCompleted { - // adjust the last flush time, so that we would not hold off the flushing due to "working too fast" - au.lastFlushTime = time.Now().Add(-balancesFlushInterval) - - if !rollbackSynchronousMode { - // switch to rebuild synchronous mode to improve performance - au.dbs.Wdb.SetSynchronousMode(context.Background(), au.accountsRebuildSynchronousMode, au.accountsRebuildSynchronousMode >= db.SynchronousModeFull) - - // flip the switch to rollback the synchronous mode once we're done. - rollbackSynchronousMode = true - } - - // The unlocking/relocking here isn't very elegant, but it does get the work done : - // this method is called on either startup or when fast catchup is complete. In the former usecase, the - // locking here is not really needed since the system is only starting up, and there are no other - // consumers for the accounts update. On the latter usecase, the function would always have exactly 320 rounds, - // and therefore this wouldn't be an issue. - // However, to make sure we're not missing any other future codepath, unlocking here and re-locking later on is a pretty - // safe bet. - au.accountsMu.Unlock() - - // flush the account data - au.committedUpTo(blk.Round()) - - // wait for the writing to complete. - au.waitAccountsWriting() - - // The au.dbRound after writing should be ~320 behind the block round. - roundsBehind := blk.Round() - au.dbRound - - au.accountsMu.Lock() - - // are we too far behind ? ( taking into consideration the catchpoint writing, which can stall the writing for quite a bit ) - if roundsBehind > initializeCachesRoundFlushInterval+basics.Round(au.catchpointInterval) { - // we're unable to persist changes. This is unexpected, but there is no point in keep trying batching additional changes since any further changes - // would just accumulate in memory. - close(blockEvalFailed) - au.log.Errorf("initializeCaches was unable to fill up the account caches accounts round = %d, block round = %d. See above error for more details.", au.dbRound, blk.Round()) - err = fmt.Errorf("initializeCaches failed to initialize the account data caches") - return - } - - // and once we flushed it to disk, update the lastFlushedRound - lastFlushedRound = blk.Round() - } - - // if enough time have passed since the last time we wrote a message to the log file then give the user an update about the progess. - if time.Now().Sub(lastProgressMessage) > accountsCacheLoadingMessageInterval { - // drop the initial message if we're got to this point since a message saying "still initializing" that comes after "is initializing" doesn't seems to be right. - select { - case skipAccountCacheMessage <- struct{}{}: - // if we got to this point, we should be able to close the writeAccountCacheMessageCompleted channel to have the "completed initializing" message written. - close(writeAccountCacheMessageCompleted) - default: - } - au.log.Infof("initializeCaches is still initializing account data caches, %d rounds loaded out of %d rounds", blk.Round()-lastBalancesRound, lastestBlockRound-lastBalancesRound) - lastProgressMessage = time.Now() - } - - // prepare for the next iteration. - accLedgerEval.prevHeader = *delta.Hdr - } - - if blockRetrievalError != nil { - err = blockRetrievalError - } - return -} - // initializeFromDisk performs the atomic operation of loading the accounts data information from disk -// and preparing the accountUpdates for operation, including initializing the commitSyncer goroutine. -func (au *accountUpdates) initializeFromDisk(l ledgerForTracker) (lastBalancesRound, lastestBlockRound basics.Round, err error) { +// and preparing the accountUpdates for operation. +func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRound basics.Round) (err error) { au.dbs = l.trackerDB() au.log = l.trackerLog() au.ledger = l - if au.initAccounts == nil { - err = fmt.Errorf("accountUpdates.initializeFromDisk: initAccounts not set") - return - } - - lastestBlockRound = l.Latest() start := time.Now() ledgerAccountsinitCount.Inc(nil) err = au.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - var err0 error - au.dbRound, err0 = au.accountsInitialize(ctx, tx) - if err0 != nil { - return err0 - } - // Check for blocks DB and tracker DB un-sync - if au.dbRound > lastestBlockRound { - au.log.Warnf("accountUpdates.initializeFromDisk: resetting accounts DB (on round %v, but blocks DB's latest is %v)", au.dbRound, lastestBlockRound) - err0 = accountsReset(tx) - if err0 != nil { - return err0 - } - au.dbRound, err0 = au.accountsInitialize(ctx, tx) - if err0 != nil { - return err0 - } - } - totals, err0 := accountsTotals(tx, false) if err0 != nil { return err0 @@ -1164,19 +701,12 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker) (lastBalancesRo return } - // the VacuumDatabase would be a no-op if au.vacuumOnStartup is cleared. - au.vacuumDatabase(context.Background()) + au.accountsq, err = accountsInitDbQueries(au.dbs.Rdb.Handle, au.dbs.Wdb.Handle) if err != nil { return } - au.accountsq, err = accountsDbInit(au.dbs.Rdb.Handle, au.dbs.Wdb.Handle) - au.lastCatchpointLabel, _, err = au.accountsq.readCatchpointStateString(context.Background(), catchpointStateLastCatchpoint) - if err != nil { - return - } - - hdr, err := l.BlockHdr(au.dbRound) + hdr, err := l.BlockHdr(lastBalancesRound) if err != nil { return } @@ -1187,498 +717,8 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker) (lastBalancesRo au.accounts = make(map[basics.Address]modifiedAccount) au.creatables = make(map[basics.CreatableIndex]ledgercore.ModifiedCreatable) au.deltasAccum = []int{0} - au.roundDigest = nil - - au.catchpointWriting = 0 - // keep these channel closed if we're not generating catchpoint - au.catchpointSlowWriting = make(chan struct{}, 1) - close(au.catchpointSlowWriting) - au.ctx, au.ctxCancel = context.WithCancel(context.Background()) - au.committedOffset = make(chan deferredCommit, 1) - au.commitSyncerClosed = make(chan struct{}) - go au.commitSyncer(au.committedOffset) - - lastBalancesRound = au.dbRound - au.baseAccounts.init(au.log, baseAccountsPendingAccountsBufferSize, baseAccountsPendingAccountsWarnThreshold) - return -} - -// accountHashBuilder calculates the hash key used for the trie by combining the account address and the account data -func accountHashBuilder(addr basics.Address, accountData basics.AccountData, encodedAccountData []byte) []byte { - hash := make([]byte, 4+crypto.DigestSize) - // write out the lowest 32 bits of the reward base. This should improve the caching of the trie by allowing - // recent updated to be in-cache, and "older" nodes will be left alone. - for i, rewards := 3, accountData.RewardsBase; i >= 0; i, rewards = i-1, rewards>>8 { - // the following takes the rewards & 255 -> hash[i] - hash[i] = byte(rewards) - } - entryHash := crypto.Hash(append(addr[:], encodedAccountData[:]...)) - copy(hash[4:], entryHash[:]) - return hash[:] -} - -// accountsInitialize initializes the accounts DB if needed and return current account round. -// as part of the initialization, it tests the current database schema version, and perform upgrade -// procedures to bring it up to the database schema supported by the binary. -func (au *accountUpdates) accountsInitialize(ctx context.Context, tx *sql.Tx) (basics.Round, error) { - // check current database version. - dbVersion, err := db.GetUserVersion(ctx, tx) - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to read database schema version : %v", err) - } - - // if database version is greater than supported by current binary, write a warning. This would keep the existing - // fallback behavior where we could use an older binary iff the schema happen to be backward compatible. - if dbVersion > accountDBVersion { - au.log.Warnf("accountsInitialize database schema version is %d, but algod supports only %d", dbVersion, accountDBVersion) - } - - if dbVersion < accountDBVersion { - au.log.Infof("accountsInitialize upgrading database schema from version %d to version %d", dbVersion, accountDBVersion) - // newDatabase is determined during the tables creations. If we're filling the database with accounts, - // then we set this variable to true, allowing some of the upgrades to be skipped. - var newDatabase bool - for dbVersion < accountDBVersion { - au.log.Infof("accountsInitialize performing upgrade from version %d", dbVersion) - // perform the initialization/upgrade - switch dbVersion { - case 0: - dbVersion, newDatabase, err = au.upgradeDatabaseSchema0(ctx, tx) - if err != nil { - au.log.Warnf("accountsInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 0 : %v", err) - return 0, err - } - case 1: - dbVersion, err = au.upgradeDatabaseSchema1(ctx, tx, newDatabase) - if err != nil { - au.log.Warnf("accountsInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 1 : %v", err) - return 0, err - } - case 2: - dbVersion, err = au.upgradeDatabaseSchema2(ctx, tx, newDatabase) - if err != nil { - au.log.Warnf("accountsInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 2 : %v", err) - return 0, err - } - case 3: - dbVersion, err = au.upgradeDatabaseSchema3(ctx, tx, newDatabase) - if err != nil { - au.log.Warnf("accountsInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 3 : %v", err) - return 0, err - } - case 4: - dbVersion, err = au.upgradeDatabaseSchema4(ctx, tx, newDatabase) - if err != nil { - au.log.Warnf("accountsInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 4 : %v", err) - return 0, err - } - default: - return 0, fmt.Errorf("accountsInitialize unable to upgrade database from schema version %d", dbVersion) - } - } - - au.log.Infof("accountsInitialize database schema upgrade complete") - } - - rnd, hashRound, err := accountsRound(tx) - if err != nil { - return 0, err - } - - if hashRound != rnd { - // if the hashed round is different then the base round, something was modified, and the accounts aren't in sync - // with the hashes. - err = resetAccountHashes(tx) - if err != nil { - return 0, err - } - // if catchpoint is disabled on this node, we could complete the initialization right here. - if au.catchpointInterval == 0 { - return rnd, nil - } - } - - // create the merkle trie for the balances - committer, err := MakeMerkleCommitter(tx, false) - if err != nil { - return 0, fmt.Errorf("accountsInitialize was unable to makeMerkleCommitter: %v", err) - } - - trie, err := merkletrie.MakeTrie(committer, TrieMemoryConfig) - if err != nil { - return 0, fmt.Errorf("accountsInitialize was unable to MakeTrie: %v", err) - } - - // we might have a database that was previously initialized, and now we're adding the balances trie. In that case, we need to add all the existing balances to this trie. - // we can figure this out by examining the hash of the root: - rootHash, err := trie.RootHash() - if err != nil { - return rnd, fmt.Errorf("accountsInitialize was unable to retrieve trie root hash: %v", err) - } - - if rootHash.IsZero() { - au.log.Infof("accountsInitialize rebuilding merkle trie for round %d", rnd) - accountBuilderIt := makeOrderedAccountsIter(tx, trieRebuildAccountChunkSize) - defer accountBuilderIt.Close(ctx) - startTrieBuildTime := time.Now() - accountsCount := 0 - lastRebuildTime := startTrieBuildTime - pendingAccounts := 0 - totalOrderedAccounts := 0 - for { - accts, processedRows, err := accountBuilderIt.Next(ctx) - if err == sql.ErrNoRows { - // the account builder would return sql.ErrNoRows when no more data is available. - break - } else if err != nil { - return rnd, err - } - - if len(accts) > 0 { - accountsCount += len(accts) - pendingAccounts += len(accts) - for _, acct := range accts { - added, err := trie.Add(acct.digest) - if err != nil { - return rnd, fmt.Errorf("accountsInitialize was unable to add changes to trie: %v", err) - } - if !added { - au.log.Warnf("accountsInitialize attempted to add duplicate hash '%s' to merkle trie for account %v", hex.EncodeToString(acct.digest), acct.address) - } - } - - if pendingAccounts >= trieRebuildCommitFrequency { - // this trie Evict will commit using the current transaction. - // if anything goes wrong, it will still get rolled back. - _, err = trie.Evict(true) - if err != nil { - return 0, fmt.Errorf("accountsInitialize was unable to commit changes to trie: %v", err) - } - pendingAccounts = 0 - } - - if time.Now().Sub(lastRebuildTime) > 5*time.Second { - // let the user know that the trie is still being rebuilt. - au.log.Infof("accountsInitialize still building the trie, and processed so far %d accounts", accountsCount) - lastRebuildTime = time.Now() - } - } else if processedRows > 0 { - totalOrderedAccounts += processedRows - // if it's not ordered, we can ignore it for now; we'll just increase the counters and emit logs periodically. - if time.Now().Sub(lastRebuildTime) > 5*time.Second { - // let the user know that the trie is still being rebuilt. - au.log.Infof("accountsInitialize still building the trie, and hashed so far %d accounts", totalOrderedAccounts) - lastRebuildTime = time.Now() - } - } - } - - // this trie Evict will commit using the current transaction. - // if anything goes wrong, it will still get rolled back. - _, err = trie.Evict(true) - if err != nil { - return 0, fmt.Errorf("accountsInitialize was unable to commit changes to trie: %v", err) - } - - // we've just updated the merkle trie, update the hashRound to reflect that. - err = updateAccountsRound(tx, rnd, rnd) - if err != nil { - return 0, fmt.Errorf("accountsInitialize was unable to update the account round to %d: %v", rnd, err) - } - - au.log.Infof("accountsInitialize rebuilt the merkle trie with %d entries in %v", accountsCount, time.Now().Sub(startTrieBuildTime)) - } - au.balancesTrie = trie - return rnd, nil -} - -// upgradeDatabaseSchema0 upgrades the database schema from version 0 to version 1 -// -// Schema of version 0 is expected to be aligned with the schema used on version 2.0.8 or before. -// Any database of version 2.0.8 would be of version 0. At this point, the database might -// have the following tables : ( i.e. a newly created database would not have these ) -// * acctrounds -// * accounttotals -// * accountbase -// * assetcreators -// * storedcatchpoints -// * accounthashes -// * catchpointstate -// -// As the first step of the upgrade, the above tables are being created if they do not already exists. -// Following that, the assetcreators table is being altered by adding a new column to it (ctype). -// Last, in case the database was just created, it would get initialized with the following: -// The accountbase would get initialized with the au.initAccounts -// The accounttotals would get initialized to align with the initialization account added to accountbase -// The acctrounds would get updated to indicate that the balance matches round 0 -// -func (au *accountUpdates) upgradeDatabaseSchema0(ctx context.Context, tx *sql.Tx) (updatedDBVersion int32, newDatabase bool, err error) { - au.log.Infof("accountsInitialize initializing schema") - newDatabase, err = accountsInit(tx, au.initAccounts, au.initProto) - if err != nil { - return 0, newDatabase, fmt.Errorf("accountsInitialize unable to initialize schema : %v", err) - } - _, err = db.SetUserVersion(ctx, tx, 1) - if err != nil { - return 0, newDatabase, fmt.Errorf("accountsInitialize unable to update database schema version from 0 to 1: %v", err) - } - return 1, newDatabase, nil -} - -// upgradeDatabaseSchema1 upgrades the database schema from version 1 to version 2 -// -// The schema updated to version 2 intended to ensure that the encoding of all the accounts data is -// both canonical and identical across the entire network. On release 2.0.5 we released an upgrade to the messagepack. -// the upgraded messagepack was decoding the account data correctly, but would have different -// encoding compared to it's predecessor. As a result, some of the account data that was previously stored -// would have different encoded representation than the one on disk. -// To address this, this startup procedure would attempt to scan all the accounts data. for each account data, we would -// see if it's encoding aligns with the current messagepack encoder. If it doesn't we would update it's encoding. -// then, depending if we found any such account data, we would reset the merkle trie and stored catchpoints. -// once the upgrade is complete, the accountsInitialize would (if needed) rebuild the merkle trie using the new -// encoded accounts. -// -// This upgrade doesn't change any of the actual database schema ( i.e. tables, indexes ) but rather just performing -// a functional update to it's content. -// -func (au *accountUpdates) upgradeDatabaseSchema1(ctx context.Context, tx *sql.Tx, newDatabase bool) (updatedDBVersion int32, err error) { - var modifiedAccounts uint - if newDatabase { - goto schemaUpdateComplete - } - - // update accounts encoding. - au.log.Infof("accountsInitialize verifying accounts data encoding") - modifiedAccounts, err = reencodeAccounts(ctx, tx) - if err != nil { - return 0, err - } - - if modifiedAccounts > 0 { - au.log.Infof("accountsInitialize reencoded %d accounts", modifiedAccounts) - - au.log.Infof("accountsInitialize resetting account hashes") - // reset the merkle trie - err = resetAccountHashes(tx) - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to reset account hashes : %v", err) - } - - au.log.Infof("accountsInitialize preparing queries") - // initialize a new accountsq with the incoming transaction. - accountsq, err := accountsDbInit(tx, tx) - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to prepare queries : %v", err) - } - - // close the prepared statements when we're done with them. - defer accountsq.close() - - au.log.Infof("accountsInitialize resetting prior catchpoints") - // delete the last catchpoint label if we have any. - _, err = accountsq.writeCatchpointStateString(ctx, catchpointStateLastCatchpoint, "") - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to clear prior catchpoint : %v", err) - } - - au.log.Infof("accountsInitialize deleting stored catchpoints") - // delete catchpoints. - err = au.deleteStoredCatchpoints(ctx, accountsq) - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to delete stored catchpoints : %v", err) - } - } else { - au.log.Infof("accountsInitialize found that no accounts needed to be reencoded") - } - -schemaUpdateComplete: - // update version - _, err = db.SetUserVersion(ctx, tx, 2) - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to update database schema version from 1 to 2: %v", err) - } - return 2, nil -} - -// upgradeDatabaseSchema2 upgrades the database schema from version 2 to version 3 -// -// This upgrade only enables the database vacuuming which will take place once the upgrade process is complete. -// If the user has already specified the OptimizeAccountsDatabaseOnStartup flag in the configuration file, this -// step becomes a no-op. -// -func (au *accountUpdates) upgradeDatabaseSchema2(ctx context.Context, tx *sql.Tx, newDatabase bool) (updatedDBVersion int32, err error) { - if !newDatabase { - au.vacuumOnStartup = true - } - - // update version - _, err = db.SetUserVersion(ctx, tx, 3) - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to update database schema version from 2 to 3: %v", err) - } - return 3, nil -} - -// upgradeDatabaseSchema3 upgrades the database schema from version 3 to version 4, -// adding the normalizedonlinebalance column to the accountbase table. -func (au *accountUpdates) upgradeDatabaseSchema3(ctx context.Context, tx *sql.Tx, newDatabase bool) (updatedDBVersion int32, err error) { - err = accountsAddNormalizedBalance(tx, au.ledger.GenesisProto()) - if err != nil { - return 0, err - } - - // update version - _, err = db.SetUserVersion(ctx, tx, 4) - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to update database schema version from 3 to 4: %v", err) - } - return 4, nil -} - -// upgradeDatabaseSchema4 does not change the schema but migrates data: -// remove empty AccountData entries from accountbase table -func (au *accountUpdates) upgradeDatabaseSchema4(ctx context.Context, tx *sql.Tx, newDatabase bool) (updatedDBVersion int32, err error) { - queryAddresses := au.catchpointInterval != 0 - var numDeleted int64 - var addresses []basics.Address - - if newDatabase { - goto done - } - - numDeleted, addresses, err = removeEmptyAccountData(tx, queryAddresses) - if err != nil { - return 0, err - } - - if queryAddresses && len(addresses) > 0 { - mc, err := MakeMerkleCommitter(tx, false) - if err != nil { - // at this point record deleted and DB is pruned for account data - // if hash deletion fails just log it and do not abort startup - au.log.Errorf("upgradeDatabaseSchema4: failed to create merkle committer: %v", err) - goto done - } - trie, err := merkletrie.MakeTrie(mc, TrieMemoryConfig) - if err != nil { - au.log.Errorf("upgradeDatabaseSchema4: failed to create merkle trie: %v", err) - goto done - } - - var totalHashesDeleted int - for _, addr := range addresses { - hash := accountHashBuilder(addr, basics.AccountData{}, []byte{0x80}) - deleted, err := trie.Delete(hash) - if err != nil { - au.log.Errorf("upgradeDatabaseSchema4: failed to delete hash '%s' from merkle trie for account %v: %v", hex.EncodeToString(hash), addr, err) - } else { - if !deleted { - au.log.Warnf("upgradeDatabaseSchema4: failed to delete hash '%s' from merkle trie for account %v", hex.EncodeToString(hash), addr) - } else { - totalHashesDeleted++ - } - } - } - - if _, err = trie.Commit(); err != nil { - au.log.Errorf("upgradeDatabaseSchema4: failed to commit changes to merkle trie: %v", err) - } - - au.log.Infof("upgradeDatabaseSchema4: deleted %d hashes", totalHashesDeleted) - } - -done: - au.log.Infof("upgradeDatabaseSchema4: deleted %d rows", numDeleted) - - // update version - _, err = db.SetUserVersion(ctx, tx, 5) - if err != nil { - return 0, fmt.Errorf("accountsInitialize unable to update database schema version from 4 to 5: %v", err) - } - return 5, nil -} - -// deleteStoredCatchpoints iterates over the storedcatchpoints table and deletes all the files stored on disk. -// once all the files have been deleted, it would go ahead and remove the entries from the table. -func (au *accountUpdates) deleteStoredCatchpoints(ctx context.Context, dbQueries *accountsDbQueries) (err error) { - catchpointsFilesChunkSize := 50 - for { - fileNames, err := dbQueries.getOldestCatchpointFiles(ctx, catchpointsFilesChunkSize, 0) - if err != nil { - return err - } - if len(fileNames) == 0 { - break - } - - for round, fileName := range fileNames { - absCatchpointFileName := filepath.Join(au.dbDirectory, fileName) - err = os.Remove(absCatchpointFileName) - if err == nil || os.IsNotExist(err) { - // it's ok if the file doesn't exist. just remove it from the database and we'll be good to go. - } else { - // we can't delete the file, abort - - return fmt.Errorf("unable to delete old catchpoint file '%s' : %v", absCatchpointFileName, err) - } - // clear the entry from the database - err = dbQueries.storeCatchpoint(ctx, round, "", "", 0) - if err != nil { - return err - } - } - } - return nil -} - -// accountsUpdateBalances applies the given compactAccountDeltas to the merkle trie -func (au *accountUpdates) accountsUpdateBalances(accountsDeltas compactAccountDeltas) (err error) { - if au.catchpointInterval == 0 { - return nil - } - var added, deleted bool - accumulatedChanges := 0 - - for i := 0; i < accountsDeltas.len(); i++ { - addr, delta := accountsDeltas.getByIdx(i) - if !delta.old.accountData.IsZero() { - deleteHash := accountHashBuilder(addr, delta.old.accountData, protocol.Encode(&delta.old.accountData)) - deleted, err = au.balancesTrie.Delete(deleteHash) - if err != nil { - return fmt.Errorf("failed to delete hash '%s' from merkle trie for account %v: %w", hex.EncodeToString(deleteHash), addr, err) - } - if !deleted { - au.log.Warnf("failed to delete hash '%s' from merkle trie for account %v", hex.EncodeToString(deleteHash), addr) - } else { - accumulatedChanges++ - } - } - - if !delta.new.IsZero() { - addHash := accountHashBuilder(addr, delta.new, protocol.Encode(&delta.new)) - added, err = au.balancesTrie.Add(addHash) - if err != nil { - return fmt.Errorf("attempted to add duplicate hash '%s' to merkle trie for account %v: %w", hex.EncodeToString(addHash), addr, err) - } - if !added { - au.log.Warnf("attempted to add duplicate hash '%s' to merkle trie for account %v", hex.EncodeToString(addHash), addr) - } else { - accumulatedChanges++ - } - } - } - if accumulatedChanges >= trieAccumulatedChangesFlush { - accumulatedChanges = 0 - _, err = au.balancesTrie.Commit() - if err != nil { - return - } - } - - // write it all to disk. - if accumulatedChanges > 0 { - _, err = au.balancesTrie.Commit() - } + au.baseAccounts.init(au.log, baseAccountsPendingAccountsBufferSize, baseAccountsPendingAccountsWarnThreshold) return } @@ -1693,12 +733,11 @@ func (au *accountUpdates) newBlockImpl(blk bookkeeping.Block, delta ledgercore.S } if rnd != au.latest()+1 { - au.log.Panicf("accountUpdates: newBlockImpl %d too far in the future, dbRound %d, deltas %d", rnd, au.dbRound, len(au.deltas)) + au.log.Panicf("accountUpdates: newBlockImpl %d too far in the future, dbRound %d, deltas %d", rnd, au.cachedDBRound, len(au.deltas)) } au.deltas = append(au.deltas, delta.Accts) au.versions = append(au.versions, blk.CurrentProtocol) au.creatableDeltas = append(au.creatableDeltas, delta.Creatables) - au.roundDigest = append(au.roundDigest, blk.Digest()) au.deltasAccum = append(au.deltasAccum, delta.Accts.Len()+au.deltasAccum[len(au.deltasAccum)-1]) au.baseAccounts.flushPendingWrites() @@ -1748,7 +787,7 @@ func (au *accountUpdates) lookupWithRewards(rnd basics.Round, addr basics.Addres var persistedData persistedAccountData withRewards := true for { - currentDbRound := au.dbRound + currentDbRound := au.cachedDBRound currentDeltaLen := len(au.deltas) offset, err = au.roundOffset(rnd) if err != nil { @@ -1818,7 +857,7 @@ func (au *accountUpdates) lookupWithRewards(rnd basics.Round, addr basics.Addres } au.accountsMu.RLock() needUnlock = true - for currentDbRound >= au.dbRound && currentDeltaLen == len(au.deltas) { + for currentDbRound >= au.cachedDBRound && currentDeltaLen == len(au.deltas) { au.accountsReadCond.Wait() } } @@ -1839,7 +878,7 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add var offset uint64 var persistedData persistedAccountData for { - currentDbRound := au.dbRound + currentDbRound := au.cachedDBRound currentDeltaLen := len(au.deltas) offset, err = au.roundOffset(rnd) if err != nil { @@ -1904,7 +943,7 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add } au.accountsMu.RLock() needUnlock = true - for currentDbRound >= au.dbRound && currentDeltaLen == len(au.deltas) { + for currentDbRound >= au.cachedDBRound && currentDeltaLen == len(au.deltas) { au.accountsReadCond.Wait() } } else { @@ -1930,7 +969,7 @@ func (au *accountUpdates) getCreatorForRound(rnd basics.Round, cidx basics.Creat var dbRound basics.Round var offset uint64 for { - currentDbRound := au.dbRound + currentDbRound := au.cachedDBRound currentDeltaLen := len(au.deltas) offset, err = au.roundOffset(rnd) if err != nil { @@ -1978,7 +1017,7 @@ func (au *accountUpdates) getCreatorForRound(rnd basics.Round, cidx basics.Creat } au.accountsMu.RLock() unlock = true - for currentDbRound >= au.dbRound && currentDeltaLen == len(au.deltas) { + for currentDbRound >= au.cachedDBRound && currentDeltaLen == len(au.deltas) { au.accountsReadCond.Wait() } } else { @@ -1988,274 +1027,156 @@ func (au *accountUpdates) getCreatorForRound(rnd basics.Round, cidx basics.Creat } } -// accountsCreateCatchpointLabel creates a catchpoint label and write it. -func (au *accountUpdates) accountsCreateCatchpointLabel(committedRound basics.Round, totals ledgercore.AccountTotals, ledgerBlockDigest crypto.Digest, trieBalancesHash crypto.Digest) (label string, err error) { - cpLabel := ledgercore.MakeCatchpointLabel(committedRound, ledgerBlockDigest, trieBalancesHash, totals) - label = cpLabel.String() - _, err = au.accountsq.writeCatchpointStateString(context.Background(), catchpointStateLastCatchpoint, label) - return -} - // roundOffset calculates the offset of the given round compared to the current dbRound. Requires that the lock would be taken. func (au *accountUpdates) roundOffset(rnd basics.Round) (offset uint64, err error) { - if rnd < au.dbRound { + if rnd < au.cachedDBRound { err = &RoundOffsetError{ round: rnd, - dbRound: au.dbRound, + dbRound: au.cachedDBRound, } return } - off := uint64(rnd - au.dbRound) + off := uint64(rnd - au.cachedDBRound) if off > uint64(len(au.deltas)) { - err = fmt.Errorf("round %d too high: dbRound %d, deltas %d", rnd, au.dbRound, len(au.deltas)) + err = fmt.Errorf("round %d too high: dbRound %d, deltas %d", rnd, au.cachedDBRound, len(au.deltas)) return } return off, nil } -// commitSyncer is the syncer go-routine function which perform the database updates. Internally, it dequeues deferredCommits and -// send the tasks to commitRound for completing the operation. -func (au *accountUpdates) commitSyncer(deferredCommits chan deferredCommit) { - defer close(au.commitSyncerClosed) - for { - select { - case committedOffset, ok := <-deferredCommits: - if !ok { - return - } - au.commitRound(committedOffset.offset, committedOffset.dbRound, committedOffset.lookback) - case <-au.ctx.Done(): - // drain the pending commits queue: - drained := false - for !drained { - select { - case <-deferredCommits: - au.accountsWriting.Done() - default: - drained = true - } - } - return - } - } -} +func (au *accountUpdates) handleUnorderedCommit(offset uint64, dbRound basics.Round, lookback basics.Round) { -// commitRound write to the database a "chunk" of rounds, and update the dbRound accordingly. -func (au *accountUpdates) commitRound(offset uint64, dbRound basics.Round, lookback basics.Round) { - var stats telemetryspec.AccountsUpdateMetrics - var updateStats bool +} +// prepareCommit prepares data to write to the database a "chunk" of rounds, and update the cached dbRound accordingly. +func (au *accountUpdates) prepareCommit(dcc *deferredCommitContext) error { if au.logAccountUpdatesMetrics { now := time.Now() if now.Sub(au.lastMetricsLogTime) >= au.logAccountUpdatesInterval { - updateStats = true + dcc.updateStats = true au.lastMetricsLogTime = now } } - defer au.accountsWriting.Done() - au.accountsMu.RLock() - - // we can exit right away, as this is the result of mis-ordered call to committedUpTo. - if au.dbRound < dbRound || offset < uint64(au.dbRound-dbRound) { - // if this is an archival ledger, we might need to update the catchpointWriting variable. - if au.archivalLedger { - // determine if this was a catchpoint round - isCatchpointRound := ((offset + uint64(lookback+dbRound)) > 0) && (au.catchpointInterval != 0) && (0 == (uint64((offset + uint64(lookback+dbRound))) % au.catchpointInterval)) - if isCatchpointRound { - // it was a catchpoint round, so update the catchpointWriting to indicate that we're done. - atomic.StoreInt32(&au.catchpointWriting, 0) - } - } - au.accountsMu.RUnlock() - return - } - - // adjust the offset according to what happened meanwhile.. - offset -= uint64(au.dbRound - dbRound) - - // if this iteration need to flush out zero rounds, just return right away. - // this usecase can happen when two subsequent calls to committedUpTo concludes that the same rounds range need to be - // flush, without the commitRound have a chance of committing these rounds. - if offset == 0 { - au.accountsMu.RUnlock() - return - } - - dbRound = au.dbRound + offset := dcc.offset - newBase := basics.Round(offset) + dbRound - flushTime := time.Now() - isCatchpointRound := ((offset + uint64(lookback+dbRound)) > 0) && (au.catchpointInterval != 0) && (0 == (uint64((offset + uint64(lookback+dbRound))) % au.catchpointInterval)) + au.accountsMu.RLock() // create a copy of the deltas, round totals and protos for the range we're going to flush. - deltas := make([]ledgercore.AccountDeltas, offset, offset) - creatableDeltas := make([]map[basics.CreatableIndex]ledgercore.ModifiedCreatable, offset, offset) - roundTotals := make([]ledgercore.AccountTotals, offset+1, offset+1) - copy(deltas, au.deltas[:offset]) + dcc.deltas = make([]ledgercore.AccountDeltas, offset) + creatableDeltas := make([]map[basics.CreatableIndex]ledgercore.ModifiedCreatable, offset) + dcc.roundTotals = au.roundTotals[offset] + copy(dcc.deltas, au.deltas[:offset]) copy(creatableDeltas, au.creatableDeltas[:offset]) - copy(roundTotals, au.roundTotals[:offset+1]) // verify version correctness : all the entries in the au.versions[1:offset+1] should have the *same* version, and the committedUpTo should be enforcing that. if au.versions[1] != au.versions[offset] { au.accountsMu.RUnlock() - au.log.Errorf("attempted to commit series of rounds with non-uniform consensus versions") - return - } - consensusVersion := au.versions[1] - - var committedRoundDigest crypto.Digest - if isCatchpointRound { - committedRoundDigest = au.roundDigest[offset+uint64(lookback)-1] + // in scheduleCommit, we expect that this function to update the catchpointWriting when + // it's on a catchpoint round and it's an archival ledger. Doing this in a deferred function + // here would prevent us from "forgetting" to update this variable later on. + // The same is repeated in commitRound on errors. + if dcc.isCatchpointRound && au.archivalLedger { + atomic.StoreInt32(dcc.catchpointWriting, 0) + } + return fmt.Errorf("attempted to commit series of rounds with non-uniform consensus versions") } // compact all the deltas - when we're trying to persist multiple rounds, we might have the same account // being updated multiple times. When that happen, we can safely omit the intermediate updates. - compactDeltas := makeCompactAccountDeltas(deltas, au.baseAccounts) - compactCreatableDeltas := compactCreatableDeltas(creatableDeltas) + dcc.compactAccountDeltas = makeCompactAccountDeltas(dcc.deltas, au.baseAccounts) + dcc.compactCreatableDeltas = compactCreatableDeltas(creatableDeltas) au.accountsMu.RUnlock() - // in committedUpTo, we expect that this function to update the catchpointWriting when - // it's on a catchpoint round and it's an archival ledger. Doing this in a deferred function - // here would prevent us from "forgetting" to update this variable later on. - defer func() { - if isCatchpointRound && au.archivalLedger { - atomic.StoreInt32(&au.catchpointWriting, 0) - } - }() - - var catchpointLabel string - beforeUpdatingBalancesTime := time.Now() - var trieBalancesHash crypto.Digest + dcc.genesisProto = au.ledger.GenesisProto() - genesisProto := au.ledger.GenesisProto() - - start := time.Now() - ledgerCommitroundCount.Inc(nil) - var updatedPersistedAccounts []persistedAccountData - if updateStats { - stats.DatabaseCommitDuration = time.Duration(time.Now().UnixNano()) + if dcc.updateStats { + dcc.stats.DatabaseCommitDuration = time.Duration(time.Now().UnixNano()) } - err := au.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - treeTargetRound := basics.Round(0) - if au.catchpointInterval > 0 { - mc, err0 := MakeMerkleCommitter(tx, false) - if err0 != nil { - return err0 - } - if au.balancesTrie == nil { - trie, err := merkletrie.MakeTrie(mc, TrieMemoryConfig) - if err != nil { - au.log.Warnf("unable to create merkle trie during committedUpTo: %v", err) - return err - } - au.balancesTrie = trie - } else { - au.balancesTrie.SetCommitter(mc) - } - treeTargetRound = dbRound + basics.Round(offset) - } - db.ResetTransactionWarnDeadline(ctx, tx, time.Now().Add(accountsUpdatePerRoundHighWatermark*time.Duration(offset))) - - if updateStats { - stats.OldAccountPreloadDuration = time.Duration(time.Now().UnixNano()) - } - - err = compactDeltas.accountsLoadOld(tx) - if err != nil { - return err - } - - if updateStats { - stats.OldAccountPreloadDuration = time.Duration(time.Now().UnixNano()) - stats.OldAccountPreloadDuration - } - - err = totalsNewRounds(tx, deltas[:offset], compactDeltas, roundTotals[1:offset+1], config.Consensus[consensusVersion]) - if err != nil { - return err - } + return nil +} - if updateStats { - stats.MerkleTrieUpdateDuration = time.Duration(time.Now().UnixNano()) - } +// commitRound closure is called within the same transaction for all trackers +// it receives current offset and dbRound +func (au *accountUpdates) commitRound(ctx context.Context, tx *sql.Tx, dcc *deferredCommitContext) (err error) { + offset := dcc.offset + dbRound := dcc.oldBase - err = au.accountsUpdateBalances(compactDeltas) + defer func() { if err != nil { - return err + if dcc.isCatchpointRound && au.archivalLedger { + atomic.StoreInt32(dcc.catchpointWriting, 0) + } } + }() - if updateStats { - now := time.Duration(time.Now().UnixNano()) - stats.MerkleTrieUpdateDuration = now - stats.MerkleTrieUpdateDuration - stats.AccountsWritingDuration = now - } + _, err = db.ResetTransactionWarnDeadline(ctx, tx, time.Now().Add(accountsUpdatePerRoundHighWatermark*time.Duration(offset))) + if err != nil { + return err + } - // the updates of the actual account data is done last since the accountsNewRound would modify the compactDeltas old values - // so that we can update the base account back. - updatedPersistedAccounts, err = accountsNewRound(tx, compactDeltas, compactCreatableDeltas, genesisProto, dbRound+basics.Round(offset)) - if err != nil { - return err - } + if dcc.updateStats { + dcc.stats.OldAccountPreloadDuration = time.Duration(time.Now().UnixNano()) + } - if updateStats { - stats.AccountsWritingDuration = time.Duration(time.Now().UnixNano()) - stats.AccountsWritingDuration - } + err = dcc.compactAccountDeltas.accountsLoadOld(tx) + if err != nil { + return err + } - err = updateAccountsRound(tx, dbRound+basics.Round(offset), treeTargetRound) - if err != nil { - return err - } + if dcc.updateStats { + dcc.stats.OldAccountPreloadDuration = time.Duration(time.Now().UnixNano()) - dcc.stats.OldAccountPreloadDuration + } - if isCatchpointRound { - trieBalancesHash, err = au.balancesTrie.RootHash() - if err != nil { - return - } - } - return nil - }) - ledgerCommitroundMicros.AddMicrosecondsSince(start, nil) + err = accountsPutTotals(tx, dcc.roundTotals, false) if err != nil { - au.balancesTrie = nil - au.log.Warnf("unable to advance account snapshot (%d-%d): %v", dbRound, dbRound+basics.Round(offset), err) - return + return err } - if updateStats { - stats.DatabaseCommitDuration = time.Duration(time.Now().UnixNano()) - stats.DatabaseCommitDuration - stats.AccountsWritingDuration - stats.MerkleTrieUpdateDuration - stats.OldAccountPreloadDuration + if dcc.updateStats { + dcc.stats.AccountsWritingDuration = time.Duration(time.Now().UnixNano()) } - if isCatchpointRound { - catchpointLabel, err = au.accountsCreateCatchpointLabel(dbRound+basics.Round(offset)+lookback, roundTotals[offset], committedRoundDigest, trieBalancesHash) - if err != nil { - au.log.Warnf("commitRound : unable to create a catchpoint label: %v", err) - } + // the updates of the actual account data is done last since the accountsNewRound would modify the compactDeltas old values + // so that we can update the base account back. + dcc.updatedPersistedAccounts, err = accountsNewRound(tx, dcc.compactAccountDeltas, dcc.compactCreatableDeltas, dcc.genesisProto, dbRound+basics.Round(offset)) + if err != nil { + return err } - if au.balancesTrie != nil { - _, err = au.balancesTrie.Evict(false) - if err != nil { - au.log.Warnf("merkle trie failed to evict: %v", err) - } + + if dcc.updateStats { + dcc.stats.AccountsWritingDuration = time.Duration(time.Now().UnixNano()) - dcc.stats.AccountsWritingDuration } - if isCatchpointRound && catchpointLabel != "" { - au.lastCatchpointLabel = catchpointLabel + return +} + +func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitContext) { + if dcc.updateStats { + spentDuration := dcc.stats.DatabaseCommitDuration + dcc.stats.AccountsWritingDuration + dcc.stats.MerkleTrieUpdateDuration + dcc.stats.OldAccountPreloadDuration + dcc.stats.DatabaseCommitDuration = time.Duration(time.Now().UnixNano()) - spentDuration } - updatingBalancesDuration := time.Now().Sub(beforeUpdatingBalancesTime) - if updateStats { - stats.MemoryUpdatesDuration = time.Duration(time.Now().UnixNano()) + offset := dcc.offset + dbRound := dcc.oldBase + newBase := dcc.newBase + + dcc.updatingBalancesDuration = time.Since(dcc.flushTime) + + if dcc.updateStats { + dcc.stats.MemoryUpdatesDuration = time.Duration(time.Now().UnixNano()) } + au.accountsMu.Lock() // Drop reference counts to modified accounts, and evict them // from in-memory cache when no references remain. - for i := 0; i < compactDeltas.len(); i++ { - addr, acctUpdate := compactDeltas.getByIdx(i) + for i := 0; i < dcc.compactAccountDeltas.len(); i++ { + addr, acctUpdate := dcc.compactAccountDeltas.getByIdx(i) cnt := acctUpdate.ndeltas macct, ok := au.accounts[addr] if !ok { @@ -2272,11 +1193,11 @@ func (au *accountUpdates) commitRound(offset uint64, dbRound basics.Round, lookb } } - for _, persistedAcct := range updatedPersistedAccounts { + for _, persistedAcct := range dcc.updatedPersistedAccounts { au.baseAccounts.write(persistedAcct) } - for cidx, modCrt := range compactCreatableDeltas { + for cidx, modCrt := range dcc.compactCreatableDeltas { cnt := modCrt.Ndeltas mcreat, ok := au.creatables[cidx] if !ok { @@ -2295,39 +1216,29 @@ func (au *accountUpdates) commitRound(offset uint64, dbRound basics.Round, lookb au.deltas = au.deltas[offset:] au.deltasAccum = au.deltasAccum[offset:] - au.roundDigest = au.roundDigest[offset:] au.versions = au.versions[offset:] au.roundTotals = au.roundTotals[offset:] au.creatableDeltas = au.creatableDeltas[offset:] - au.dbRound = newBase - au.lastFlushTime = flushTime + au.cachedDBRound = newBase au.accountsMu.Unlock() - if updateStats { - stats.MemoryUpdatesDuration = time.Duration(time.Now().UnixNano()) - stats.MemoryUpdatesDuration + if dcc.updateStats { + dcc.stats.MemoryUpdatesDuration = time.Duration(time.Now().UnixNano()) - dcc.stats.MemoryUpdatesDuration } au.accountsReadCond.Broadcast() - if isCatchpointRound && au.archivalLedger && catchpointLabel != "" { - // generate the catchpoint file. This need to be done inline so that it will block any new accounts that from being written. - // the generateCatchpoint expects that the accounts data would not be modified in the background during it's execution. - au.generateCatchpoint(basics.Round(offset)+dbRound+lookback, catchpointLabel, committedRoundDigest, updatingBalancesDuration) - } - // log telemetry event - if updateStats { - stats.StartRound = uint64(dbRound) - stats.RoundsCount = offset - stats.UpdatedAccountsCount = uint64(len(updatedPersistedAccounts)) - stats.UpdatedCreatablesCount = uint64(len(compactCreatableDeltas)) + if dcc.updateStats { + dcc.stats.StartRound = uint64(dbRound) + dcc.stats.RoundsCount = offset + dcc.stats.UpdatedAccountsCount = uint64(len(dcc.updatedPersistedAccounts)) + dcc.stats.UpdatedCreatablesCount = uint64(len(dcc.compactCreatableDeltas)) - var details struct { - } - au.log.Metrics(telemetryspec.Accounts, stats, details) + var details struct{} + au.log.Metrics(telemetryspec.Accounts, dcc.stats, details) } - } // compactCreatableDeltas takes an array of creatables map deltas ( one array entry per round ), and compact the array into a single @@ -2363,189 +1274,11 @@ func compactCreatableDeltas(creatableDeltas []map[basics.CreatableIndex]ledgerco // latest returns the latest round func (au *accountUpdates) latest() basics.Round { - return au.dbRound + basics.Round(len(au.deltas)) -} - -// generateCatchpoint generates a single catchpoint file -func (au *accountUpdates) generateCatchpoint(committedRound basics.Round, label string, committedRoundDigest crypto.Digest, updatingBalancesDuration time.Duration) { - beforeGeneratingCatchpointTime := time.Now() - catchpointGenerationStats := telemetryspec.CatchpointGenerationEventDetails{ - BalancesWriteTime: uint64(updatingBalancesDuration.Nanoseconds()), - } - - // the retryCatchpointCreation is used to repeat the catchpoint file generation in case the node crashed / aborted during startup - // before the catchpoint file generation could be completed. - retryCatchpointCreation := false - au.log.Debugf("accountUpdates: generateCatchpoint: generating catchpoint for round %d", committedRound) - defer func() { - if !retryCatchpointCreation { - // clear the writingCatchpoint flag - _, err := au.accountsq.writeCatchpointStateUint64(context.Background(), catchpointStateWritingCatchpoint, uint64(0)) - if err != nil { - au.log.Warnf("accountUpdates: generateCatchpoint unable to clear catchpoint state '%s' for round %d: %v", catchpointStateWritingCatchpoint, committedRound, err) - } - } - }() - - _, err := au.accountsq.writeCatchpointStateUint64(context.Background(), catchpointStateWritingCatchpoint, uint64(committedRound)) - if err != nil { - au.log.Warnf("accountUpdates: generateCatchpoint unable to write catchpoint state '%s' for round %d: %v", catchpointStateWritingCatchpoint, committedRound, err) - return - } - - relCatchpointFileName := filepath.Join("catchpoints", catchpointRoundToPath(committedRound)) - absCatchpointFileName := filepath.Join(au.dbDirectory, relCatchpointFileName) - - more := true - const shortChunkExecutionDuration = 50 * time.Millisecond - const longChunkExecutionDuration = 1 * time.Second - var chunkExecutionDuration time.Duration - select { - case <-au.catchpointSlowWriting: - chunkExecutionDuration = longChunkExecutionDuration - default: - chunkExecutionDuration = shortChunkExecutionDuration - } - - var catchpointWriter *catchpointWriter - start := time.Now() - ledgerGeneratecatchpointCount.Inc(nil) - err = au.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - catchpointWriter = makeCatchpointWriter(au.ctx, absCatchpointFileName, tx, committedRound, committedRoundDigest, label) - for more { - stepCtx, stepCancelFunction := context.WithTimeout(au.ctx, chunkExecutionDuration) - writeStepStartTime := time.Now() - more, err = catchpointWriter.WriteStep(stepCtx) - // accumulate the actual time we've spent writing in this step. - catchpointGenerationStats.CPUTime += uint64(time.Now().Sub(writeStepStartTime).Nanoseconds()) - stepCancelFunction() - if more && err == nil { - // we just wrote some data, but there is more to be written. - // go to sleep for while. - // before going to sleep, extend the transaction timeout so that we won't get warnings: - db.ResetTransactionWarnDeadline(ctx, tx, time.Now().Add(1*time.Second)) - select { - case <-time.After(100 * time.Millisecond): - // increase the time slot allocated for writing the catchpoint, but stop when we get to the longChunkExecutionDuration limit. - // this would allow the catchpoint writing speed to ramp up while still leaving some cpu available. - chunkExecutionDuration *= 2 - if chunkExecutionDuration > longChunkExecutionDuration { - chunkExecutionDuration = longChunkExecutionDuration - } - case <-au.ctx.Done(): - retryCatchpointCreation = true - err2 := catchpointWriter.Abort() - if err2 != nil { - return fmt.Errorf("error removing catchpoint file : %v", err2) - } - return nil - case <-au.catchpointSlowWriting: - chunkExecutionDuration = longChunkExecutionDuration - } - } - if err != nil { - err = fmt.Errorf("unable to create catchpoint : %v", err) - err2 := catchpointWriter.Abort() - if err2 != nil { - au.log.Warnf("accountUpdates: generateCatchpoint: error removing catchpoint file : %v", err2) - } - return - } - } - return - }) - ledgerGeneratecatchpointMicros.AddMicrosecondsSince(start, nil) - - if err != nil { - au.log.Warnf("accountUpdates: generateCatchpoint: %v", err) - return - } - if catchpointWriter == nil { - au.log.Warnf("accountUpdates: generateCatchpoint: nil catchpointWriter") - return - } - - err = au.saveCatchpointFile(committedRound, relCatchpointFileName, catchpointWriter.GetSize(), catchpointWriter.GetCatchpoint()) - if err != nil { - au.log.Warnf("accountUpdates: generateCatchpoint: unable to save catchpoint: %v", err) - return - } - catchpointGenerationStats.FileSize = uint64(catchpointWriter.GetSize()) - catchpointGenerationStats.WritingDuration = uint64(time.Now().Sub(beforeGeneratingCatchpointTime).Nanoseconds()) - catchpointGenerationStats.AccountsCount = catchpointWriter.GetTotalAccounts() - catchpointGenerationStats.CatchpointLabel = catchpointWriter.GetCatchpoint() - au.log.EventWithDetails(telemetryspec.Accounts, telemetryspec.CatchpointGenerationEvent, catchpointGenerationStats) - au.log.With("writingDuration", catchpointGenerationStats.WritingDuration). - With("CPUTime", catchpointGenerationStats.CPUTime). - With("balancesWriteTime", catchpointGenerationStats.BalancesWriteTime). - With("accountsCount", catchpointGenerationStats.AccountsCount). - With("fileSize", catchpointGenerationStats.FileSize). - With("catchpointLabel", catchpointGenerationStats.CatchpointLabel). - Infof("Catchpoint file was generated") -} - -// catchpointRoundToPath calculate the catchpoint file path for a given round -func catchpointRoundToPath(rnd basics.Round) string { - irnd := int64(rnd) / 256 - outStr := "" - for irnd > 0 { - outStr = filepath.Join(outStr, fmt.Sprintf("%02x", irnd%256)) - irnd = irnd / 256 - } - outStr = filepath.Join(outStr, strconv.FormatInt(int64(rnd), 10)+".catchpoint") - return outStr -} - -// saveCatchpointFile stores the provided fileName as the stored catchpoint for the given round. -// after a successful insert operation to the database, it would delete up to 2 old entries, as needed. -// deleting 2 entries while inserting single entry allow us to adjust the size of the backing storage and have the -// database and storage realign. -func (au *accountUpdates) saveCatchpointFile(round basics.Round, fileName string, fileSize int64, catchpoint string) (err error) { - if au.catchpointFileHistoryLength != 0 { - err = au.accountsq.storeCatchpoint(context.Background(), round, fileName, catchpoint, fileSize) - if err != nil { - au.log.Warnf("accountUpdates: saveCatchpoint: unable to save catchpoint: %v", err) - return - } - } else { - err = os.Remove(fileName) - if err != nil { - au.log.Warnf("accountUpdates: saveCatchpoint: unable to remove file (%s): %v", fileName, err) - return - } - } - if au.catchpointFileHistoryLength == -1 { - return - } - var filesToDelete map[basics.Round]string - filesToDelete, err = au.accountsq.getOldestCatchpointFiles(context.Background(), 2, au.catchpointFileHistoryLength) - if err != nil { - return fmt.Errorf("unable to delete catchpoint file, getOldestCatchpointFiles failed : %v", err) - } - for round, fileToDelete := range filesToDelete { - absCatchpointFileName := filepath.Join(au.dbDirectory, fileToDelete) - err = os.Remove(absCatchpointFileName) - if err == nil || os.IsNotExist(err) { - // it's ok if the file doesn't exist. just remove it from the database and we'll be good to go. - err = nil - } else { - // we can't delete the file, abort - - return fmt.Errorf("unable to delete old catchpoint file '%s' : %v", absCatchpointFileName, err) - } - err = au.accountsq.storeCatchpoint(context.Background(), round, "", "", 0) - if err != nil { - return fmt.Errorf("unable to delete old catchpoint entry '%s' : %v", fileToDelete, err) - } - } - return + return au.cachedDBRound + basics.Round(len(au.deltas)) } // the vacuumDatabase performs a full vacuum of the accounts database. func (au *accountUpdates) vacuumDatabase(ctx context.Context) (err error) { - if !au.vacuumOnStartup { - return - } - // vaccumming the database would modify the some of the tables rowid, so we need to make sure any stored in-memory // rowid are flushed. au.baseAccounts.prune(0) |