summaryrefslogtreecommitdiff
path: root/ledger/trackerdb.go
diff options
context:
space:
mode:
Diffstat (limited to 'ledger/trackerdb.go')
-rw-r--r--ledger/trackerdb.go365
1 files changed, 365 insertions, 0 deletions
diff --git a/ledger/trackerdb.go b/ledger/trackerdb.go
new file mode 100644
index 000000000..dd73f1a8a
--- /dev/null
+++ b/ledger/trackerdb.go
@@ -0,0 +1,365 @@
+// Copyright (C) 2019-2021 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package ledger
+
+import (
+ "context"
+ "database/sql"
+ "encoding/hex"
+ "fmt"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/crypto/merkletrie"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-algorand/util/db"
+)
+
+type trackerDBParams struct {
+ initAccounts map[basics.Address]basics.AccountData
+ initProto config.ConsensusParams
+ catchpointEnabled bool
+ dbPathPrefix string
+}
+
+type trackerDBSchemaInitializer struct {
+ trackerDBParams
+
+ // schemaVersion contains current db version
+ schemaVersion int32
+ // vacuumOnStartup controls whether the accounts database would get vacuumed on startup.
+ vacuumOnStartup bool
+ // newDatabase indicates if the db is newly created
+ newDatabase bool
+
+ log logging.Logger
+}
+
+type trackerDBInitParams struct {
+ schemaVersion int32
+ vacuumOnStartup bool
+}
+
+// trackerDBInitialize initializes the accounts DB if needed and return current account round.
+// as part of the initialization, it tests the current database schema version, and perform upgrade
+// procedures to bring it up to the database schema supported by the binary.
+func trackerDBInitialize(l ledgerForTracker, catchpointEnabled bool, dbPathPrefix string) (mgr trackerDBInitParams, err error) {
+ dbs := l.trackerDB()
+ log := l.trackerLog()
+
+ lastestBlockRound := l.Latest()
+
+ if l.GenesisAccounts() == nil {
+ err = fmt.Errorf("trackerDBInitialize: initAccounts not set")
+ return
+ }
+
+ err = dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
+ tp := trackerDBParams{l.GenesisAccounts(), l.GenesisProto(), catchpointEnabled, dbPathPrefix}
+ var err0 error
+ mgr, err0 = trackerDBInitializeImpl(ctx, tx, tp, log)
+ if err0 != nil {
+ return err0
+ }
+ lastBalancesRound, err := accountsRound(tx)
+ if err != nil {
+ return err
+ }
+ // Check for blocks DB and tracker DB un-sync
+ if lastBalancesRound > lastestBlockRound {
+ log.Warnf("trackerDBInitialize: resetting accounts DB (on round %v, but blocks DB's latest is %v)", lastBalancesRound, lastestBlockRound)
+ err0 = accountsReset(tx)
+ if err0 != nil {
+ return err0
+ }
+ mgr, err0 = trackerDBInitializeImpl(ctx, tx, tp, log)
+ if err0 != nil {
+ return err0
+ }
+ }
+ return nil
+ })
+
+ return
+}
+
+// trackerDBInitializeImpl initializes the accounts DB if needed and return current account round.
+// as part of the initialization, it tests the current database schema version, and perform upgrade
+// procedures to bring it up to the database schema supported by the binary.
+func trackerDBInitializeImpl(ctx context.Context, tx *sql.Tx, params trackerDBParams, log logging.Logger) (mgr trackerDBInitParams, err error) {
+ // check current database version.
+ dbVersion, err := db.GetUserVersion(ctx, tx)
+ if err != nil {
+ return trackerDBInitParams{}, fmt.Errorf("trackerDBInitialize unable to read database schema version : %v", err)
+ }
+
+ tu := trackerDBSchemaInitializer{
+ trackerDBParams: params,
+ schemaVersion: dbVersion,
+ log: log,
+ }
+
+ // if database version is greater than supported by current binary, write a warning. This would keep the existing
+ // fallback behavior where we could use an older binary iff the schema happen to be backward compatible.
+ if tu.version() > accountDBVersion {
+ tu.log.Warnf("trackerDBInitialize database schema version is %d, but algod supports only %d", tu.version(), accountDBVersion)
+ }
+
+ if tu.version() < accountDBVersion {
+ tu.log.Infof("trackerDBInitialize upgrading database schema from version %d to version %d", tu.version(), accountDBVersion)
+ // newDatabase is determined during the tables creations. If we're filling the database with accounts,
+ // then we set this variable to true, allowing some of the upgrades to be skipped.
+ for tu.version() < accountDBVersion {
+ tu.log.Infof("trackerDBInitialize performing upgrade from version %d", tu.version())
+ // perform the initialization/upgrade
+ switch tu.version() {
+ case 0:
+ err = tu.upgradeDatabaseSchema0(ctx, tx)
+ if err != nil {
+ tu.log.Warnf("trackerDBInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 0 : %v", err)
+ return
+ }
+ case 1:
+ err = tu.upgradeDatabaseSchema1(ctx, tx)
+ if err != nil {
+ tu.log.Warnf("trackerDBInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 1 : %v", err)
+ return
+ }
+ case 2:
+ err = tu.upgradeDatabaseSchema2(ctx, tx)
+ if err != nil {
+ tu.log.Warnf("trackerDBInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 2 : %v", err)
+ return
+ }
+ case 3:
+ err = tu.upgradeDatabaseSchema3(ctx, tx)
+ if err != nil {
+ tu.log.Warnf("trackerDBInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 3 : %v", err)
+ return
+ }
+ case 4:
+ err = tu.upgradeDatabaseSchema4(ctx, tx)
+ if err != nil {
+ tu.log.Warnf("trackerDBInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 4 : %v", err)
+ return
+ }
+ default:
+ return trackerDBInitParams{}, fmt.Errorf("trackerDBInitialize unable to upgrade database from schema version %d", tu.schemaVersion)
+ }
+ }
+ tu.log.Infof("trackerDBInitialize database schema upgrade complete")
+ }
+
+ return trackerDBInitParams{tu.schemaVersion, tu.vacuumOnStartup}, nil
+}
+
+func (tu *trackerDBSchemaInitializer) setVersion(ctx context.Context, tx *sql.Tx, version int32) (err error) {
+ oldVersion := tu.schemaVersion
+ tu.schemaVersion = version
+ _, err = db.SetUserVersion(ctx, tx, tu.schemaVersion)
+ if err != nil {
+ return fmt.Errorf("trackerDBInitialize unable to update database schema version from %d to %d: %v", oldVersion, version, err)
+ }
+ return nil
+}
+
+func (tu trackerDBSchemaInitializer) version() int32 {
+ return tu.schemaVersion
+}
+
+// upgradeDatabaseSchema0 upgrades the database schema from version 0 to version 1
+//
+// Schema of version 0 is expected to be aligned with the schema used on version 2.0.8 or before.
+// Any database of version 2.0.8 would be of version 0. At this point, the database might
+// have the following tables : ( i.e. a newly created database would not have these )
+// * acctrounds
+// * accounttotals
+// * accountbase
+// * assetcreators
+// * storedcatchpoints
+// * accounthashes
+// * catchpointstate
+//
+// As the first step of the upgrade, the above tables are being created if they do not already exists.
+// Following that, the assetcreators table is being altered by adding a new column to it (ctype).
+// Last, in case the database was just created, it would get initialized with the following:
+// The accountbase would get initialized with the au.initAccounts
+// The accounttotals would get initialized to align with the initialization account added to accountbase
+// The acctrounds would get updated to indicate that the balance matches round 0
+//
+func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema0(ctx context.Context, tx *sql.Tx) (err error) {
+ tu.log.Infof("upgradeDatabaseSchema0 initializing schema")
+ tu.newDatabase, err = accountsInit(tx, tu.initAccounts, tu.initProto)
+ if err != nil {
+ return fmt.Errorf("upgradeDatabaseSchema0 unable to initialize schema : %v", err)
+ }
+ return tu.setVersion(ctx, tx, 1)
+}
+
+// upgradeDatabaseSchema1 upgrades the database schema from version 1 to version 2
+//
+// The schema updated to version 2 intended to ensure that the encoding of all the accounts data is
+// both canonical and identical across the entire network. On release 2.0.5 we released an upgrade to the messagepack.
+// the upgraded messagepack was decoding the account data correctly, but would have different
+// encoding compared to it's predecessor. As a result, some of the account data that was previously stored
+// would have different encoded representation than the one on disk.
+// To address this, this startup procedure would attempt to scan all the accounts data. for each account data, we would
+// see if it's encoding aligns with the current messagepack encoder. If it doesn't we would update it's encoding.
+// then, depending if we found any such account data, we would reset the merkle trie and stored catchpoints.
+// once the upgrade is complete, the trackerDBInitialize would (if needed) rebuild the merkle trie using the new
+// encoded accounts.
+//
+// This upgrade doesn't change any of the actual database schema ( i.e. tables, indexes ) but rather just performing
+// a functional update to it's content.
+//
+func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema1(ctx context.Context, tx *sql.Tx) (err error) {
+ var modifiedAccounts uint
+ if tu.newDatabase {
+ goto schemaUpdateComplete
+ }
+
+ // update accounts encoding.
+ tu.log.Infof("upgradeDatabaseSchema1 verifying accounts data encoding")
+ modifiedAccounts, err = reencodeAccounts(ctx, tx)
+ if err != nil {
+ return err
+ }
+
+ if modifiedAccounts > 0 {
+ tu.log.Infof("upgradeDatabaseSchema1 reencoded %d accounts", modifiedAccounts)
+
+ tu.log.Infof("upgradeDatabaseSchema1 resetting account hashes")
+ // reset the merkle trie
+ err = resetAccountHashes(tx)
+ if err != nil {
+ return fmt.Errorf("upgradeDatabaseSchema1 unable to reset account hashes : %v", err)
+ }
+
+ tu.log.Infof("upgradeDatabaseSchema1 preparing queries")
+ // initialize a new accountsq with the incoming transaction.
+ accountsq, err := accountsInitDbQueries(tx, tx)
+ if err != nil {
+ return fmt.Errorf("upgradeDatabaseSchema1 unable to prepare queries : %v", err)
+ }
+
+ // close the prepared statements when we're done with them.
+ defer accountsq.close()
+
+ tu.log.Infof("upgradeDatabaseSchema1 resetting prior catchpoints")
+ // delete the last catchpoint label if we have any.
+ _, err = accountsq.writeCatchpointStateString(ctx, catchpointStateLastCatchpoint, "")
+ if err != nil {
+ return fmt.Errorf("upgradeDatabaseSchema1 unable to clear prior catchpoint : %v", err)
+ }
+
+ tu.log.Infof("upgradeDatabaseSchema1 deleting stored catchpoints")
+ // delete catchpoints.
+ err = deleteStoredCatchpoints(ctx, accountsq, tu.dbPathPrefix)
+ if err != nil {
+ return fmt.Errorf("upgradeDatabaseSchema1 unable to delete stored catchpoints : %v", err)
+ }
+ } else {
+ tu.log.Infof("upgradeDatabaseSchema1 found that no accounts needed to be reencoded")
+ }
+
+schemaUpdateComplete:
+ return tu.setVersion(ctx, tx, 2)
+}
+
+// upgradeDatabaseSchema2 upgrades the database schema from version 2 to version 3
+//
+// This upgrade only enables the database vacuuming which will take place once the upgrade process is complete.
+// If the user has already specified the OptimizeAccountsDatabaseOnStartup flag in the configuration file, this
+// step becomes a no-op.
+//
+func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema2(ctx context.Context, tx *sql.Tx) (err error) {
+ if !tu.newDatabase {
+ tu.vacuumOnStartup = true
+ }
+
+ // update version
+ return tu.setVersion(ctx, tx, 3)
+}
+
+// upgradeDatabaseSchema3 upgrades the database schema from version 3 to version 4,
+// adding the normalizedonlinebalance column to the accountbase table.
+func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema3(ctx context.Context, tx *sql.Tx) (err error) {
+ err = accountsAddNormalizedBalance(tx, tu.initProto)
+ if err != nil {
+ return err
+ }
+
+ // update version
+ return tu.setVersion(ctx, tx, 4)
+}
+
+// upgradeDatabaseSchema4 does not change the schema but migrates data:
+// remove empty AccountData entries from accountbase table
+func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema4(ctx context.Context, tx *sql.Tx) (err error) {
+ var numDeleted int64
+ var addresses []basics.Address
+
+ if tu.newDatabase {
+ goto done
+ }
+
+ numDeleted, addresses, err = removeEmptyAccountData(tx, tu.catchpointEnabled)
+ if err != nil {
+ return err
+ }
+
+ if tu.catchpointEnabled && len(addresses) > 0 {
+ mc, err := MakeMerkleCommitter(tx, false)
+ if err != nil {
+ // at this point record deleted and DB is pruned for account data
+ // if hash deletion fails just log it and do not abort startup
+ tu.log.Errorf("upgradeDatabaseSchema4: failed to create merkle committer: %v", err)
+ goto done
+ }
+ trie, err := merkletrie.MakeTrie(mc, TrieMemoryConfig)
+ if err != nil {
+ tu.log.Errorf("upgradeDatabaseSchema4: failed to create merkle trie: %v", err)
+ goto done
+ }
+
+ var totalHashesDeleted int
+ for _, addr := range addresses {
+ hash := accountHashBuilder(addr, basics.AccountData{}, []byte{0x80})
+ deleted, err := trie.Delete(hash)
+ if err != nil {
+ tu.log.Errorf("upgradeDatabaseSchema4: failed to delete hash '%s' from merkle trie for account %v: %v", hex.EncodeToString(hash), addr, err)
+ } else {
+ if !deleted {
+ tu.log.Warnf("upgradeDatabaseSchema4: failed to delete hash '%s' from merkle trie for account %v", hex.EncodeToString(hash), addr)
+ } else {
+ totalHashesDeleted++
+ }
+ }
+ }
+
+ if _, err = trie.Commit(); err != nil {
+ tu.log.Errorf("upgradeDatabaseSchema4: failed to commit changes to merkle trie: %v", err)
+ }
+
+ tu.log.Infof("upgradeDatabaseSchema4: deleted %d hashes", totalHashesDeleted)
+ }
+
+done:
+ tu.log.Infof("upgradeDatabaseSchema4: deleted %d rows", numDeleted)
+
+ return tu.setVersion(ctx, tx, 5)
+}