diff options
Diffstat (limited to 'ledger/catchpointtracker.go')
-rw-r--r-- | ledger/catchpointtracker.go | 901 |
1 file changed, 901 insertions(+), 0 deletions(-)
diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go
new file mode 100644
index 000000000..e70b526da
--- /dev/null
+++ b/ledger/catchpointtracker.go
@@ -0,0 +1,901 @@
// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package ledger

import (
	"context"
	"database/sql"
	"encoding/hex"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/algorand/go-deadlock"

	"github.com/algorand/go-algorand/config"
	"github.com/algorand/go-algorand/crypto"
	"github.com/algorand/go-algorand/crypto/merkletrie"
	"github.com/algorand/go-algorand/data/basics"
	"github.com/algorand/go-algorand/data/bookkeeping"
	"github.com/algorand/go-algorand/ledger/ledgercore"
	"github.com/algorand/go-algorand/logging"
	"github.com/algorand/go-algorand/logging/telemetryspec"
	"github.com/algorand/go-algorand/protocol"
	"github.com/algorand/go-algorand/util/db"
)

// trieCachedNodesCount defines how many balances trie nodes we would like to keep around in memory.
// value was calibrated using BenchmarkCalibrateCacheNodeSize
var trieCachedNodesCount = 9000

// merkleCommitterNodesPerPage controls how many nodes will be stored in a single page
// value was calibrated using BenchmarkCalibrateNodesPerPage
var merkleCommitterNodesPerPage = int64(116)

const (
	// trieRebuildAccountChunkSize defines the number of accounts that would get read at a single chunk
	// before added to the trie during trie construction
	trieRebuildAccountChunkSize = 16384
	// trieRebuildCommitFrequency defines the number of accounts that would get added before we call evict to commit the changes and adjust the memory cache.
	trieRebuildCommitFrequency = 65536
	// trieAccumulatedChangesFlush defines the number of pending changes that would be applied to the merkle trie before
	// we attempt to commit them to disk while writing a batch of rounds balances to disk.
	trieAccumulatedChangesFlush = 256
)

// TrieMemoryConfig is the memory configuration setup used for the merkle trie.
var TrieMemoryConfig = merkletrie.MemoryConfig{
	NodesCountPerPage:         merkleCommitterNodesPerPage,
	CachedNodesCount:          trieCachedNodesCount,
	PageFillFactor:            0.95,
	MaxChildrenPagesThreshold: 64,
}

// catchpointTracker maintains the merkle trie of account balances and drives
// catchpoint label / catchpoint file generation as rounds are committed.
type catchpointTracker struct {
	// dbDirectory is the directory where the ledger and block sql file resides as well as the parent directory for the catchup files to be generated
	dbDirectory string

	// catchpointInterval is the configured interval at which the accountUpdates would generate catchpoint labels and catchpoint files.
	catchpointInterval uint64

	// catchpointFileHistoryLength defines how many catchpoint files we want to store back.
	// 0 means don't store any, -1 mean unlimited and positive number suggest the number of most recent catchpoint files.
	catchpointFileHistoryLength int

	// archivalLedger determines whether the associated ledger was configured as archival ledger or not.
	archivalLedger bool

	// Prepared SQL statements for fast accounts DB lookups.
	accountsq *accountsDbQueries

	// log copied from ledger
	log logging.Logger

	// Connection to the database.
	dbs db.Pair

	// The last catchpoint label that was written to the database. Should always align with what's in the database.
	// note that this is the last catchpoint *label* and not the catchpoint file.
	lastCatchpointLabel string

	// catchpointSlowWriting suggests to the accounts writer that it should finish writing up the catchpoint file ASAP.
	// when this channel is closed, the accounts writer would try and complete the writing as soon as possible.
	// otherwise, it would take its time and perform periodic sleeps between chunks processing.
	catchpointSlowWriting chan struct{}

	// catchpointWriting helps to synchronize the catchpoint file writing. When this atomic variable is 0, no writing is going on.
	// Any non-zero value indicates a catchpoint being written, or scheduled to be written.
	// accessed with sync/atomic; do not read or write without it.
	catchpointWriting int32

	// The Trie tracking the current account balances. Always matches the balances that were
	// written to the database.
	balancesTrie *merkletrie.Trie

	// catchpointsMu is the synchronization mutex for accessing the various non-static variables.
	catchpointsMu deadlock.RWMutex

	// roundDigest stores the digest of the block for every round starting with dbRound and every round after it.
	roundDigest []crypto.Digest
}

// initialize initializes the catchpointTracker structure from the node configuration:
// it derives the catchpoint interval from CatchpointTracking / CatchpointInterval / Archival,
// and clamps CatchpointFileHistoryLength to -1 (unlimited) for any value below -1.
func (ct *catchpointTracker) initialize(cfg config.Local, dbPathPrefix string) {
	ct.dbDirectory = filepath.Dir(dbPathPrefix)
	ct.archivalLedger = cfg.Archival
	// note: the `default` case is deliberately placed before `case 0` so that
	// an invalid CatchpointTracking value can warn and then fall through to
	// the default (0) behavior.
	switch cfg.CatchpointTracking {
	case -1:
		ct.catchpointInterval = 0
	default:
		// give a warning, then fall through
		logging.Base().Warnf("catchpointTracker: the CatchpointTracking field in the config.json file contains an invalid value (%d). The default value of 0 would be used instead.", cfg.CatchpointTracking)
		fallthrough
	case 0:
		if ct.archivalLedger {
			ct.catchpointInterval = cfg.CatchpointInterval
		} else {
			ct.catchpointInterval = 0
		}
	case 1:
		ct.catchpointInterval = cfg.CatchpointInterval
	}

	ct.catchpointFileHistoryLength = cfg.CatchpointFileHistoryLength
	if cfg.CatchpointFileHistoryLength < -1 {
		ct.catchpointFileHistoryLength = -1
	}
}

// GetLastCatchpointLabel retrieves the last catchpoint label that was stored to the database.
func (ct *catchpointTracker) GetLastCatchpointLabel() string {
	ct.catchpointsMu.RLock()
	defer ct.catchpointsMu.RUnlock()
	return ct.lastCatchpointLabel
}

// loadFromDisk loads the state of a tracker from persistent
// storage. The ledger argument allows loadFromDisk to load
// blocks from the database, or access its own state. The
// ledgerForTracker interface abstracts away the details of
// ledger internals so that individual trackers can be tested
// in isolation.
func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, lastBalancesRound basics.Round) (err error) {
	ct.log = l.trackerLog()
	ct.dbs = l.trackerDB()

	ct.roundDigest = nil
	ct.catchpointWriting = 0
	// keep this channel closed if we're not generating catchpoint
	ct.catchpointSlowWriting = make(chan struct{}, 1)
	close(ct.catchpointSlowWriting)

	// ensure the account hashes (merkle trie) agree with the account base before we start.
	err = ct.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
		err0 := ct.accountsInitializeHashes(ctx, tx, lastBalancesRound)
		if err0 != nil {
			return err0
		}
		return nil
	})

	if err != nil {
		return err
	}

	ct.accountsq, err = accountsInitDbQueries(ct.dbs.Rdb.Handle, ct.dbs.Wdb.Handle)
	if err != nil {
		return
	}

	ct.lastCatchpointLabel, _, err = ct.accountsq.readCatchpointStateString(context.Background(), catchpointStateLastCatchpoint)
	if err != nil {
		return
	}

	// if a catchpoint file write was in-flight when the node went down, the round it was
	// writing for is persisted under catchpointStateWritingCatchpoint; resume it below.
	writingCatchpointRound, _, err := ct.accountsq.readCatchpointStateUint64(context.Background(), catchpointStateWritingCatchpoint)
	if err != nil {
		return err
	}
	if writingCatchpointRound == 0 || !ct.catchpointEnabled() {
		return nil
	}
	var dbRound basics.Round
	// make sure that the database is at the desired round.
	err = ct.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
		dbRound, err = accountsRound(tx)
		return
	})
	if err != nil {
		return err
	}
	if dbRound != basics.Round(writingCatchpointRound) {
		return nil
	}

	blk, err := l.Block(dbRound)
	if err != nil {
		return err
	}
	blockHeaderDigest := blk.Digest()

	// regenerate the interrupted catchpoint file synchronously during startup.
	ct.generateCatchpoint(context.Background(), basics.Round(writingCatchpointRound), ct.lastCatchpointLabel, blockHeaderDigest, time.Duration(0))
	return nil
}

// newBlock informs the tracker of a new block from round
// rnd and a given ledgercore.StateDelta as produced by BlockEvaluator.
func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
	ct.catchpointsMu.Lock()
	defer ct.catchpointsMu.Unlock()
	ct.roundDigest = append(ct.roundDigest, blk.Digest())
}

// committedUpTo implements the ledgerTracker interface for catchpointTracker.
// The method informs the tracker that committedRound and all it's previous rounds have
// been committed to the block database. The method returns what is the oldest round
// number that can be removed from the blocks database as well as the lookback that this
// tracker maintains.
func (ct *catchpointTracker) committedUpTo(rnd basics.Round) (retRound, lookback basics.Round) {
	return rnd, basics.Round(0)
}

// produceCommittingTask adjusts the deferred commit range so that a flush never
// skips past a catchpoint boundary, and returns nil (postponing the commit) while
// a previous catchpoint file is still being written.
func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
	var hasMultipleIntermediateCatchpoint, hasIntermediateCatchpoint bool

	newBase := dcr.oldBase + basics.Round(dcr.offset)

	// check if there was a catchpoint between dcc.oldBase+lookback and dcc.oldBase+offset+lookback
	if ct.catchpointInterval > 0 {
		nextCatchpointRound := ((uint64(dcr.oldBase+dcr.lookback) + ct.catchpointInterval) / ct.catchpointInterval) * ct.catchpointInterval

		if nextCatchpointRound < uint64(dcr.oldBase+dcr.lookback)+dcr.offset {
			mostRecentCatchpointRound := (uint64(committedRound) / ct.catchpointInterval) * ct.catchpointInterval
			newBase = basics.Round(nextCatchpointRound) - dcr.lookback
			if mostRecentCatchpointRound > nextCatchpointRound {
				hasMultipleIntermediateCatchpoint = true
				// skip if there is more than one catchpoint in queue
				newBase = basics.Round(mostRecentCatchpointRound) - dcr.lookback
			}
			hasIntermediateCatchpoint = true
		}
	}

	// if we're still writing the previous balances, we can't move forward yet.
	if ct.IsWritingCatchpointFile() {
		// if we hit this path, it means that we're still writing a catchpoint.
		// see if the new delta range contains another catchpoint.
		if hasIntermediateCatchpoint {
			// check if we're already attempting to perform fast-writing.
			select {
			case <-ct.catchpointSlowWriting:
				// yes, we're already doing fast-writing.
			default:
				// no, we're not yet doing fast writing, make it so.
				close(ct.catchpointSlowWriting)
			}
		}
		return nil
	}

	dcr.offset = uint64(newBase - dcr.oldBase)

	// check to see if this is a catchpoint round
	dcr.isCatchpointRound = ct.isCatchpointRound(dcr.offset, dcr.oldBase, dcr.lookback)

	if dcr.isCatchpointRound && ct.archivalLedger {
		// store non-zero ( all ones ) into the catchpointWriting atomic variable to indicate that a catchpoint is being written ( or, queued to be written )
		atomic.StoreInt32(&ct.catchpointWriting, int32(-1))
		ct.catchpointSlowWriting = make(chan struct{}, 1)
		if hasMultipleIntermediateCatchpoint {
			close(ct.catchpointSlowWriting)
		}
	}

	dcr.catchpointWriting = &ct.catchpointWriting

	return dcr
}

// prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data.
// If an error returned the process is aborted.
func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error {
	ct.catchpointsMu.RLock()
	defer ct.catchpointsMu.RUnlock()
	if dcc.isCatchpointRound {
		// capture the block digest of the round the catchpoint label will cover;
		// roundDigest[0] corresponds to dbRound+1, hence the offset+lookback-1 index.
		dcc.committedRoundDigest = ct.roundDigest[dcc.offset+uint64(dcc.lookback)-1]
	}
	return nil
}

// commitRound applies the pending account deltas to the balances merkle trie within
// the supplied transaction, and advances the persisted account-hash round.
// On failure during a catchpoint round it clears the catchpointWriting flag so that
// a queued catchpoint write doesn't block subsequent commits forever.
func (ct *catchpointTracker) commitRound(ctx context.Context, tx *sql.Tx, dcc *deferredCommitContext) (err error) {
	treeTargetRound := basics.Round(0)
	offset := dcc.offset
	dbRound := dcc.oldBase

	defer func() {
		if err != nil {
			if dcc.isCatchpointRound && ct.archivalLedger {
				atomic.StoreInt32(&ct.catchpointWriting, 0)
			}
		}
	}()

	if ct.catchpointEnabled() {
		var mc *MerkleCommitter
		mc, err = MakeMerkleCommitter(tx, false)
		if err != nil {
			return
		}

		var trie *merkletrie.Trie
		if ct.balancesTrie == nil {
			trie, err = merkletrie.MakeTrie(mc, TrieMemoryConfig)
			if err != nil {
				ct.log.Warnf("unable to create merkle trie during committedUpTo: %v", err)
				return err
			}
			ct.balancesTrie = trie
		} else {
			// rebind the existing trie to the committer backed by this transaction.
			ct.balancesTrie.SetCommitter(mc)
		}
		treeTargetRound = dbRound + basics.Round(offset)
	}

	if dcc.updateStats {
		// stash the start timestamp; the delta is computed below after the update.
		dcc.stats.MerkleTrieUpdateDuration = time.Duration(time.Now().UnixNano())
	}

	err = ct.accountsUpdateBalances(dcc.compactAccountDeltas)
	if err != nil {
		return err
	}

	if dcc.updateStats {
		now := time.Duration(time.Now().UnixNano())
		dcc.stats.MerkleTrieUpdateDuration = now - dcc.stats.MerkleTrieUpdateDuration
	}

	err = updateAccountsHashRound(tx, treeTargetRound)
	if err != nil {
		return err
	}

	if dcc.isCatchpointRound {
		dcc.trieBalancesHash, err = ct.balancesTrie.RootHash()
		if err != nil {
			return err
		}
	}
	return nil
}

// postCommit runs after the commit transaction succeeded: it writes the catchpoint
// label, evicts committed trie pages from memory, trims the retained round digests,
// generates the catchpoint file inline when needed, and finally clears the
// catchpointWriting flag.
func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
	var err error
	if dcc.isCatchpointRound {
		dcc.catchpointLabel, err = ct.accountsCreateCatchpointLabel(dcc.newBase+dcc.lookback, dcc.roundTotals, dcc.committedRoundDigest, dcc.trieBalancesHash)
		if err != nil {
			ct.log.Warnf("commitRound : unable to create a catchpoint label: %v", err)
		}
	}
	if ct.balancesTrie != nil {
		_, err = ct.balancesTrie.Evict(false)
		if err != nil {
			ct.log.Warnf("merkle trie failed to evict: %v", err)
		}
	}

	if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
		ct.lastCatchpointLabel = dcc.catchpointLabel
	}
	dcc.updatingBalancesDuration = time.Since(dcc.flushTime)

	if dcc.updateStats {
		dcc.stats.MemoryUpdatesDuration = time.Duration(time.Now().UnixNano())
	}

	ct.catchpointsMu.Lock()

	// drop the digests for the rounds that were just flushed to disk.
	ct.roundDigest = ct.roundDigest[dcc.offset:]

	ct.catchpointsMu.Unlock()

	if dcc.isCatchpointRound && ct.archivalLedger && dcc.catchpointLabel != "" {
		// generate the catchpoint file. This needs to be done inline so that it will block any new accounts from being written.
		// the generateCatchpoint expects that the accounts data would not be modified in the background during its execution.
		ct.generateCatchpoint(ctx, basics.Round(dcc.offset)+dcc.oldBase+dcc.lookback, dcc.catchpointLabel, dcc.committedRoundDigest, dcc.updatingBalancesDuration)
	}
	// in scheduleCommit, we expect that this function to update the catchpointWriting when
	// it's on a catchpoint round and it's an archival ledger. Doing this in a deferred function
	// here would prevent us from "forgetting" to update this variable later on.
	if dcc.isCatchpointRound && ct.archivalLedger {
		atomic.StoreInt32(dcc.catchpointWriting, 0)
	}
}

// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
// Tracker might update own state in this case. For example, account updates tracker cancels
// scheduled catchpoint writing that deferred commit.
func (ct *catchpointTracker) handleUnorderedCommit(offset uint64, dbRound basics.Round, lookback basics.Round) {
	// if this is an archival ledger, we might need to update the catchpointWriting variable.
	if ct.archivalLedger {
		// determine if this was a catchpoint round
		if ct.isCatchpointRound(offset, dbRound, lookback) {
			// it was a catchpoint round, so update the catchpointWriting to indicate that we're done.
			atomic.StoreInt32(&ct.catchpointWriting, 0)
		}
	}
}

// close terminates the tracker, reclaiming any resources
// like open database connections or goroutines. close may
// be called even if loadFromDisk() is not called or does
// not succeed.
func (ct *catchpointTracker) close() {

}

// accountsUpdateBalances applies the given compactAccountDeltas to the merkle trie:
// for every changed account it deletes the hash of the old record (if any) and adds
// the hash of the new record (if any), committing the trie once all deltas are applied.
func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccountDeltas) (err error) {
	if !ct.catchpointEnabled() {
		return nil
	}
	var added, deleted bool
	accumulatedChanges := 0

	for i := 0; i < accountsDeltas.len(); i++ {
		addr, delta := accountsDeltas.getByIdx(i)
		if !delta.old.accountData.IsZero() {
			deleteHash := accountHashBuilder(addr, delta.old.accountData, protocol.Encode(&delta.old.accountData))
			deleted, err = ct.balancesTrie.Delete(deleteHash)
			if err != nil {
				return fmt.Errorf("failed to delete hash '%s' from merkle trie for account %v: %w", hex.EncodeToString(deleteHash), addr, err)
			}
			if !deleted {
				ct.log.Warnf("failed to delete hash '%s' from merkle trie for account %v", hex.EncodeToString(deleteHash), addr)
			} else {
				accumulatedChanges++
			}
		}

		if !delta.new.IsZero() {
			addHash := accountHashBuilder(addr, delta.new, protocol.Encode(&delta.new))
			added, err = ct.balancesTrie.Add(addHash)
			if err != nil {
				return fmt.Errorf("attempted to add duplicate hash '%s' to merkle trie for account %v: %w", hex.EncodeToString(addHash), addr, err)
			}
			if !added {
				ct.log.Warnf("attempted to add duplicate hash '%s' to merkle trie for account %v", hex.EncodeToString(addHash), addr)
			} else {
				accumulatedChanges++
			}
		}
	}
	// NOTE(review): this threshold check sits after the loop, so together with the
	// unconditional commit below it only ever produces a single Commit call —
	// looks like it was intended to run inside the loop to flush incrementally.
	// Confirm against the trie's intended batching behavior before changing.
	if accumulatedChanges >= trieAccumulatedChangesFlush {
		accumulatedChanges = 0
		_, err = ct.balancesTrie.Commit()
		if err != nil {
			return
		}
	}

	// write it all to disk.
	if accumulatedChanges > 0 {
		_, err = ct.balancesTrie.Commit()
	}

	return
}

// IsWritingCatchpointFile returns true when a catchpoint file is being generated. The function is used by the catchup service
// to avoid memory pressure until the catchpoint file writing is complete.
func (ct *catchpointTracker) IsWritingCatchpointFile() bool {
	return atomic.LoadInt32(&ct.catchpointWriting) != 0
}

// isCatchpointRound returns true if the round at the given offset, dbRound with the provided lookback should be a catchpoint round.
// i.e. the round (dbRound+lookback+offset) is non-zero and a multiple of the catchpoint interval.
func (ct *catchpointTracker) isCatchpointRound(offset uint64, dbRound basics.Round, lookback basics.Round) bool {
	return ((offset + uint64(lookback+dbRound)) > 0) && (ct.catchpointInterval != 0) && ((uint64((offset + uint64(lookback+dbRound))) % ct.catchpointInterval) == 0)
}

// accountsCreateCatchpointLabel creates a catchpoint label and writes it to the database
// under catchpointStateLastCatchpoint.
func (ct *catchpointTracker) accountsCreateCatchpointLabel(committedRound basics.Round, totals ledgercore.AccountTotals, ledgerBlockDigest crypto.Digest, trieBalancesHash crypto.Digest) (label string, err error) {
	cpLabel := ledgercore.MakeCatchpointLabel(committedRound, ledgerBlockDigest, trieBalancesHash, totals)
	label = cpLabel.String()
	_, err = ct.accountsq.writeCatchpointStateString(context.Background(), catchpointStateLastCatchpoint, label)
	return
}

// generateCatchpoint generates a single catchpoint file for committedRound, writing it in
// time-boxed chunks so that the long-running read transaction doesn't starve the rest of
// the node. The per-chunk time slot ramps up from 50ms to 1s (or jumps straight to 1s when
// the catchpointSlowWriting channel is closed). The catchpointStateWritingCatchpoint state
// row marks the write as in-flight so an interrupted write is retried on startup; it is
// cleared on completion unless a retry is needed.
func (ct *catchpointTracker) generateCatchpoint(ctx context.Context, committedRound basics.Round, label string, committedRoundDigest crypto.Digest, updatingBalancesDuration time.Duration) {
	beforeGeneratingCatchpointTime := time.Now()
	catchpointGenerationStats := telemetryspec.CatchpointGenerationEventDetails{
		BalancesWriteTime: uint64(updatingBalancesDuration.Nanoseconds()),
	}

	// the retryCatchpointCreation is used to repeat the catchpoint file generation in case the node crashed / aborted during startup
	// before the catchpoint file generation could be completed.
	retryCatchpointCreation := false
	ct.log.Debugf("accountUpdates: generateCatchpoint: generating catchpoint for round %d", committedRound)
	defer func() {
		if !retryCatchpointCreation {
			// clear the writingCatchpoint flag
			_, err := ct.accountsq.writeCatchpointStateUint64(context.Background(), catchpointStateWritingCatchpoint, uint64(0))
			if err != nil {
				ct.log.Warnf("accountUpdates: generateCatchpoint unable to clear catchpoint state '%s' for round %d: %v", catchpointStateWritingCatchpoint, committedRound, err)
			}
		}
	}()

	_, err := ct.accountsq.writeCatchpointStateUint64(context.Background(), catchpointStateWritingCatchpoint, uint64(committedRound))
	if err != nil {
		ct.log.Warnf("accountUpdates: generateCatchpoint unable to write catchpoint state '%s' for round %d: %v", catchpointStateWritingCatchpoint, committedRound, err)
		return
	}

	relCatchpointFileName := filepath.Join("catchpoints", catchpointRoundToPath(committedRound))
	absCatchpointFileName := filepath.Join(ct.dbDirectory, relCatchpointFileName)

	more := true
	const shortChunkExecutionDuration = 50 * time.Millisecond
	const longChunkExecutionDuration = 1 * time.Second
	var chunkExecutionDuration time.Duration
	select {
	case <-ct.catchpointSlowWriting:
		chunkExecutionDuration = longChunkExecutionDuration
	default:
		chunkExecutionDuration = shortChunkExecutionDuration
	}

	var catchpointWriter *catchpointWriter
	start := time.Now()
	ledgerGeneratecatchpointCount.Inc(nil)
	err = ct.dbs.Rdb.Atomic(func(dbCtx context.Context, tx *sql.Tx) (err error) {
		catchpointWriter = makeCatchpointWriter(ctx, absCatchpointFileName, tx, committedRound, committedRoundDigest, label)
		for more {
			stepCtx, stepCancelFunction := context.WithTimeout(ctx, chunkExecutionDuration)
			writeStepStartTime := time.Now()
			more, err = catchpointWriter.WriteStep(stepCtx)
			// accumulate the actual time we've spent writing in this step.
			catchpointGenerationStats.CPUTime += uint64(time.Since(writeStepStartTime).Nanoseconds())
			stepCancelFunction()
			if more && err == nil {
				// we just wrote some data, but there is more to be written.
				// go to sleep for while.
				// before going to sleep, extend the transaction timeout so that we won't get warnings:
				_, err0 := db.ResetTransactionWarnDeadline(dbCtx, tx, time.Now().Add(1*time.Second))
				if err0 != nil {
					ct.log.Warnf("catchpointTracker: generateCatchpoint: failed to reset transaction warn deadline : %v", err0)
				}
				select {
				case <-time.After(100 * time.Millisecond):
					// increase the time slot allocated for writing the catchpoint, but stop when we get to the longChunkExecutionDuration limit.
					// this would allow the catchpoint writing speed to ramp up while still leaving some cpu available.
					chunkExecutionDuration *= 2
					if chunkExecutionDuration > longChunkExecutionDuration {
						chunkExecutionDuration = longChunkExecutionDuration
					}
				case <-ctx.Done():
					// shutdown requested: abort the file write but leave the in-flight
					// state row in place so the write is retried on the next startup.
					retryCatchpointCreation = true
					err2 := catchpointWriter.Abort()
					if err2 != nil {
						return fmt.Errorf("error removing catchpoint file : %v", err2)
					}
					return nil
				case <-ct.catchpointSlowWriting:
					chunkExecutionDuration = longChunkExecutionDuration
				}
			}
			if err != nil {
				err = fmt.Errorf("unable to create catchpoint : %v", err)
				err2 := catchpointWriter.Abort()
				if err2 != nil {
					ct.log.Warnf("accountUpdates: generateCatchpoint: error removing catchpoint file : %v", err2)
				}
				return
			}
		}
		return
	})
	ledgerGeneratecatchpointMicros.AddMicrosecondsSince(start, nil)

	if err != nil {
		ct.log.Warnf("accountUpdates: generateCatchpoint: %v", err)
		return
	}
	if catchpointWriter == nil {
		ct.log.Warnf("accountUpdates: generateCatchpoint: nil catchpointWriter")
		return
	}

	err = ct.saveCatchpointFile(committedRound, relCatchpointFileName, catchpointWriter.GetSize(), catchpointWriter.GetCatchpoint())
	if err != nil {
		ct.log.Warnf("accountUpdates: generateCatchpoint: unable to save catchpoint: %v", err)
		return
	}
	catchpointGenerationStats.FileSize = uint64(catchpointWriter.GetSize())
	catchpointGenerationStats.WritingDuration = uint64(time.Since(beforeGeneratingCatchpointTime).Nanoseconds())
	catchpointGenerationStats.AccountsCount = catchpointWriter.GetTotalAccounts()
	catchpointGenerationStats.CatchpointLabel = catchpointWriter.GetCatchpoint()
	ct.log.EventWithDetails(telemetryspec.Accounts, telemetryspec.CatchpointGenerationEvent, catchpointGenerationStats)
	ct.log.With("writingDuration", catchpointGenerationStats.WritingDuration).
		With("CPUTime", catchpointGenerationStats.CPUTime).
		With("balancesWriteTime", catchpointGenerationStats.BalancesWriteTime).
		With("accountsCount", catchpointGenerationStats.AccountsCount).
		With("fileSize", catchpointGenerationStats.FileSize).
		With("catchpointLabel", catchpointGenerationStats.CatchpointLabel).
		Infof("Catchpoint file was generated")
}

// catchpointRoundToPath calculates the catchpoint file path for a given round:
// a hex-digit directory fan-out derived from rnd/256 followed by "<rnd>.catchpoint".
func catchpointRoundToPath(rnd basics.Round) string {
	irnd := int64(rnd) / 256
	outStr := ""
	for irnd > 0 {
		outStr = filepath.Join(outStr, fmt.Sprintf("%02x", irnd%256))
		irnd = irnd / 256
	}
	outStr = filepath.Join(outStr, strconv.FormatInt(int64(rnd), 10)+".catchpoint")
	return outStr
}

// saveCatchpointFile stores the provided fileName as the stored catchpoint for the given round.
// after a successful insert operation to the database, it would delete up to 2 old entries, as needed.
// deleting 2 entries while inserting single entry allow us to adjust the size of the backing storage and have the
// database and storage realign.
func (ct *catchpointTracker) saveCatchpointFile(round basics.Round, fileName string, fileSize int64, catchpoint string) (err error) {
	if ct.catchpointFileHistoryLength != 0 {
		err = ct.accountsq.storeCatchpoint(context.Background(), round, fileName, catchpoint, fileSize)
		if err != nil {
			ct.log.Warnf("accountUpdates: saveCatchpoint: unable to save catchpoint: %v", err)
			return
		}
	} else {
		// history length of 0 means "don't keep any files" — delete instead of storing.
		// NOTE(review): fileName here is the ledger-relative path ("catchpoints/...")
		// while deletions below join ct.dbDirectory; confirm the working-directory
		// assumption for this Remove call.
		err = os.Remove(fileName)
		if err != nil {
			ct.log.Warnf("accountUpdates: saveCatchpoint: unable to remove file (%s): %v", fileName, err)
			return
		}
	}
	if ct.catchpointFileHistoryLength == -1 {
		// unlimited history - nothing to prune.
		return
	}
	var filesToDelete map[basics.Round]string
	filesToDelete, err = ct.accountsq.getOldestCatchpointFiles(context.Background(), 2, ct.catchpointFileHistoryLength)
	if err != nil {
		return fmt.Errorf("unable to delete catchpoint file, getOldestCatchpointFiles failed : %v", err)
	}
	for round, fileToDelete := range filesToDelete {
		absCatchpointFileName := filepath.Join(ct.dbDirectory, fileToDelete)
		err = os.Remove(absCatchpointFileName)
		if err == nil || os.IsNotExist(err) {
			// it's ok if the file doesn't exist. just remove it from the database and we'll be good to go.
			err = nil
		} else {
			// we can't delete the file, abort -
			return fmt.Errorf("unable to delete old catchpoint file '%s' : %v", absCatchpointFileName, err)
		}
		err = ct.accountsq.storeCatchpoint(context.Background(), round, "", "", 0)
		if err != nil {
			return fmt.Errorf("unable to delete old catchpoint entry '%s' : %v", fileToDelete, err)
		}
	}
	return
}

// GetCatchpointStream returns a ReadCloseSizer to the catchpoint file associated with the provided round.
// It self-heals the catchpoint table: a database entry whose file is missing gets removed,
// and a file found on disk without a database entry gets registered.
func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseSizer, error) {
	dbFileName := ""
	fileSize := int64(0)
	start := time.Now()
	ledgerGetcatchpointCount.Inc(nil)
	err := ct.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
		dbFileName, _, fileSize, err = getCatchpoint(tx, round)
		return
	})
	ledgerGetcatchpointMicros.AddMicrosecondsSince(start, nil)
	if err != nil && err != sql.ErrNoRows {
		// we had some sql error.
		return nil, fmt.Errorf("accountUpdates: getCatchpointStream: unable to lookup catchpoint %d: %v", round, err)
	}
	if dbFileName != "" {
		catchpointPath := filepath.Join(ct.dbDirectory, dbFileName)
		file, err := os.OpenFile(catchpointPath, os.O_RDONLY, 0666)
		if err == nil && file != nil {
			return &readCloseSizer{ReadCloser: file, size: fileSize}, nil
		}
		// else, see if this is a file-not-found error
		if os.IsNotExist(err) {
			// the database told us that we have this file.. but we couldn't find it.
			// delete it from the database.
			err := ct.saveCatchpointFile(round, "", 0, "")
			if err != nil {
				ct.log.Warnf("accountUpdates: getCatchpointStream: unable to delete missing catchpoint entry: %v", err)
				return nil, err
			}

			return nil, ledgercore.ErrNoEntry{}
		}
		// it's some other error.
		return nil, fmt.Errorf("accountUpdates: getCatchpointStream: unable to open catchpoint file '%s' %v", catchpointPath, err)
	}

	// if the database doesn't know about that round, see if we have that file anyway:
	fileName := filepath.Join("catchpoints", catchpointRoundToPath(round))
	catchpointPath := filepath.Join(ct.dbDirectory, fileName)
	file, err := os.OpenFile(catchpointPath, os.O_RDONLY, 0666)
	if err == nil && file != nil {
		// great, if found that we should have had this in the database.. add this one now :
		fileInfo, err := file.Stat()
		if err != nil {
			// we couldn't get the stat, so just return with the file.
			return &readCloseSizer{ReadCloser: file, size: -1}, nil
		}

		err = ct.saveCatchpointFile(round, fileName, fileInfo.Size(), "")
		if err != nil {
			ct.log.Warnf("accountUpdates: getCatchpointStream: unable to save missing catchpoint entry: %v", err)
		}
		return &readCloseSizer{ReadCloser: file, size: fileInfo.Size()}, nil
	}
	return nil, ledgercore.ErrNoEntry{}
}

// deleteStoredCatchpoints iterates over the storedcatchpoints table and deletes all the files stored on disk.
// once all the files have been deleted, it would go ahead and remove the entries from the table.
func deleteStoredCatchpoints(ctx context.Context, dbQueries *accountsDbQueries, dbDirectory string) (err error) {
	catchpointsFilesChunkSize := 50
	for {
		fileNames, err := dbQueries.getOldestCatchpointFiles(ctx, catchpointsFilesChunkSize, 0)
		if err != nil {
			return err
		}
		if len(fileNames) == 0 {
			break
		}

		for round, fileName := range fileNames {
			absCatchpointFileName := filepath.Join(dbDirectory, fileName)
			err = os.Remove(absCatchpointFileName)
			if err == nil || os.IsNotExist(err) {
				// it's ok if the file doesn't exist. just remove it from the database and we'll be good to go.
			} else {
				// we can't delete the file, abort -
				return fmt.Errorf("unable to delete old catchpoint file '%s' : %v", absCatchpointFileName, err)
			}
			// clear the entry from the database
			err = dbQueries.storeCatchpoint(ctx, round, "", "", 0)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// accountHashBuilder calculates the hash key used for the trie by combining the account address and the account data.
// Layout: 4 bytes of the reward base (lowest 32 bits, most significant byte first) followed by
// the digest of addr||encodedAccountData.
func accountHashBuilder(addr basics.Address, accountData basics.AccountData, encodedAccountData []byte) []byte {
	hash := make([]byte, 4+crypto.DigestSize)
	// write out the lowest 32 bits of the reward base. This should improve the caching of the trie by allowing
	// recent updates to be in-cache, and "older" nodes will be left alone.
	for i, rewards := 3, accountData.RewardsBase; i >= 0; i, rewards = i-1, rewards>>8 {
		// the following takes the rewards & 255 -> hash[i]
		hash[i] = byte(rewards)
	}
	entryHash := crypto.Hash(append(addr[:], encodedAccountData[:]...))
	copy(hash[4:], entryHash[:])
	return hash[:]
}

// catchpointEnabled reports whether this node is configured to generate catchpoints.
func (ct *catchpointTracker) catchpointEnabled() bool {
	return ct.catchpointInterval != 0
}

// accountsInitializeHashes initializes account hashes.
// as part of the initialization, it tests if a hash table matches to account base and updates the former.
func (ct *catchpointTracker) accountsInitializeHashes(ctx context.Context, tx *sql.Tx, rnd basics.Round) error {
	hashRound, err := accountsHashRound(tx)
	if err != nil {
		return err
	}

	if hashRound != rnd {
		// if the hashed round is different then the base round, something was modified, and the accounts aren't in sync
		// with the hashes.
		err = resetAccountHashes(tx)
		if err != nil {
			return err
		}
		// if catchpoint is disabled on this node, we could complete the initialization right here.
		if !ct.catchpointEnabled() {
			return nil
		}
	}

	// create the merkle trie for the balances
	committer, err := MakeMerkleCommitter(tx, false)
	if err != nil {
		return fmt.Errorf("accountsInitialize was unable to makeMerkleCommitter: %v", err)
	}

	trie, err := merkletrie.MakeTrie(committer, TrieMemoryConfig)
	if err != nil {
		return fmt.Errorf("accountsInitialize was unable to MakeTrie: %v", err)
	}

	// we might have a database that was previously initialized, and now we're adding the balances trie. In that case, we need to add all the existing balances to this trie.
	// we can figure this out by examining the hash of the root:
	rootHash, err := trie.RootHash()
	if err != nil {
		return fmt.Errorf("accountsInitialize was unable to retrieve trie root hash: %v", err)
	}

	if rootHash.IsZero() {
		ct.log.Infof("accountsInitialize rebuilding merkle trie for round %d", rnd)
		accountBuilderIt := makeOrderedAccountsIter(tx, trieRebuildAccountChunkSize)
		defer accountBuilderIt.Close(ctx)
		startTrieBuildTime := time.Now()
		accountsCount := 0
		lastRebuildTime := startTrieBuildTime
		pendingAccounts := 0
		totalOrderedAccounts := 0
		for {
			accts, processedRows, err := accountBuilderIt.Next(ctx)
			if err == sql.ErrNoRows {
				// the account builder would return sql.ErrNoRows when no more data is available.
				break
			} else if err != nil {
				return err
			}

			if len(accts) > 0 {
				accountsCount += len(accts)
				pendingAccounts += len(accts)
				for _, acct := range accts {
					added, err := trie.Add(acct.digest)
					if err != nil {
						return fmt.Errorf("accountsInitialize was unable to add changes to trie: %v", err)
					}
					if !added {
						ct.log.Warnf("accountsInitialize attempted to add duplicate hash '%s' to merkle trie for account %v", hex.EncodeToString(acct.digest), acct.address)
					}
				}

				if pendingAccounts >= trieRebuildCommitFrequency {
					// this trie Evict will commit using the current transaction.
					// if anything goes wrong, it will still get rolled back.
					_, err = trie.Evict(true)
					if err != nil {
						return fmt.Errorf("accountsInitialize was unable to commit changes to trie: %v", err)
					}
					pendingAccounts = 0
				}

				if time.Since(lastRebuildTime) > 5*time.Second {
					// let the user know that the trie is still being rebuilt.
					ct.log.Infof("accountsInitialize still building the trie, and processed so far %d accounts", accountsCount)
					lastRebuildTime = time.Now()
				}
			} else if processedRows > 0 {
				totalOrderedAccounts += processedRows
				// if it's not ordered, we can ignore it for now; we'll just increase the counters and emit logs periodically.
				if time.Since(lastRebuildTime) > 5*time.Second {
					// let the user know that the trie is still being rebuilt.
					ct.log.Infof("accountsInitialize still building the trie, and hashed so far %d accounts", totalOrderedAccounts)
					lastRebuildTime = time.Now()
				}
			}
		}

		// this trie Evict will commit using the current transaction.
		// if anything goes wrong, it will still get rolled back.
		_, err = trie.Evict(true)
		if err != nil {
			return fmt.Errorf("accountsInitialize was unable to commit changes to trie: %v", err)
		}

		// we've just updated the merkle trie, update the hashRound to reflect that.
		err = updateAccountsHashRound(tx, rnd)
		if err != nil {
			return fmt.Errorf("accountsInitialize was unable to update the account hash round to %d: %v", rnd, err)
		}

		ct.log.Infof("accountsInitialize rebuilt the merkle trie with %d entries in %v", accountsCount, time.Since(startTrieBuildTime))
	}
	ct.balancesTrie = trie
	return nil
}