summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormichaeldiamant <michaeldiamant@users.noreply.github.com>2022-11-16 15:53:19 -0500
committermichaeldiamant <michaeldiamant@users.noreply.github.com>2022-11-16 15:53:19 -0500
commita8e6cfa88b64efd2187e90f9801fe17e589843ca (patch)
tree3b5740e39da55b35c2ea3126748487fc98f03d97
parentf3b226c312258ed20ca56528989c115260f94463 (diff)
Try to wire in support for counting accounts vs KVskvs_catchpoint_stats
-rw-r--r--catchup/catchpointService.go29
-rw-r--r--ledger/catchupaccessor.go29
2 files changed, 38 insertions, 20 deletions
diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go
index 1fa90c52a..d98caf478 100644
--- a/catchup/catchpointService.go
+++ b/catchup/catchpointService.go
@@ -53,6 +53,9 @@ type CatchpointCatchupStats struct {
TotalAccounts uint64
ProcessedAccounts uint64
VerifiedAccounts uint64
+ TotalKVs uint64
+ ProcessedKVs uint64
+ VerifiedKVs uint64
TotalBlocks uint64
AcquiredBlocks uint64
VerifiedBlocks uint64
@@ -304,7 +307,7 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
if err == nil {
cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second)
start = time.Now()
- err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedAccounts)
+ err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err == nil {
cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second)
break
@@ -336,12 +339,27 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
return nil
}
-// updateVerifiedAccounts update the user's statistics for the given verified accounts
-func (cs *CatchpointCatchupService) updateVerifiedAccounts(addedTrieHashes uint64) {
+// updateVerifiedCounts update the user's statistics for the given verified hashes
+func (cs *CatchpointCatchupService) updateVerifiedCounts(hashes [][]byte) {
cs.statsMu.Lock()
defer cs.statsMu.Unlock()
+
+ addedTrieAccountHashes := uint64(0)
+ addedTrieKVHashes := uint64(0)
+ for _, hash := range hashes {
+ if (hash[4] == 3) { // KV
+ addedTrieKVHashes++
+ } else {
+ addedTrieAccountHashes++
+ }
+ }
+
if cs.stats.TotalAccountHashes > 0 {
- cs.stats.VerifiedAccounts = cs.stats.TotalAccounts * addedTrieHashes / cs.stats.TotalAccountHashes
+ cs.stats.VerifiedAccounts = cs.stats.TotalAccounts * addedTrieAccountHashes / cs.stats.TotalAccountHashes
+ }
+
+ if cs.stats.TotalKVs > 0 { // TODO Is TotalKVHashes needed?
+ cs.stats.VerifiedKVs = addedTrieKVHashes
}
}
@@ -757,8 +775,9 @@ func (cs *CatchpointCatchupService) updateLedgerFetcherProgress(fetcherStats *le
defer cs.statsMu.Unlock()
cs.stats.TotalAccounts = fetcherStats.TotalAccounts
cs.stats.ProcessedAccounts = fetcherStats.ProcessedAccounts
+ cs.stats.TotalKVs = fetcherStats.TotalKVs
+ cs.stats.ProcessedKVs = fetcherStats.ProcessedKVs
cs.stats.ProcessedBytes = fetcherStats.ProcessedBytes
- cs.stats.TotalKVHashes = fetcherStats.TotalKVHashes
cs.stats.TotalAccountHashes = fetcherStats.TotalAccountHashes
}
diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go
index 619e63ed7..4749a0cb4 100644
--- a/ledger/catchupaccessor.go
+++ b/ledger/catchupaccessor.go
@@ -59,7 +59,7 @@ type CatchpointCatchupAccessor interface {
ProcessStagingBalances(ctx context.Context, sectionName string, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error)
// BuildMerkleTrie inserts the account hashes into the merkle trie
- BuildMerkleTrie(ctx context.Context, progressUpdates func(uint64)) (err error)
+ BuildMerkleTrie(ctx context.Context, progressUpdates func([][]byte)) (err error)
// GetCatchupBlockRound returns the latest block round matching the current catchpoint
GetCatchupBlockRound(ctx context.Context) (round basics.Round, err error)
@@ -275,11 +275,12 @@ type CatchpointCatchupAccessorProgress struct {
TotalAccounts uint64
ProcessedAccounts uint64
ProcessedBytes uint64
+ TotalKVs uint64
+ ProcessedKVs uint64
TotalChunks uint64
SeenHeader bool
Version uint64
TotalAccountHashes uint64
- TotalKVHashes uint64
// Having the cachedTrie here would help to accelerate the catchup process since the trie maintain an internal cache of nodes.
// While rebuilding the trie, we don't want to force and reload (some) of these nodes into the cache for each catchpoint file chunk.
@@ -345,6 +346,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingContent(ctx context.Contex
if err == nil {
progress.SeenHeader = true
progress.TotalAccounts = fileHeader.TotalAccounts
+ progress.TotalKVs = fileHeader.TotalKVs
progress.TotalChunks = fileHeader.TotalChunks
progress.Version = fileHeader.Version
@@ -546,7 +548,6 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
start := time.Now()
errKVs = c.stagingWriter.writeKVs(ctx, chunkKVs)
- progress.TotalKVHashes += uint64(len(chunkKVs))
durKVs = time.Since(start)
}()
@@ -572,6 +573,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
ledgerProcessstagingbalancesMicros.AddMicrosecondsSince(start, nil)
progress.ProcessedBytes += uint64(len(bytes))
+ progress.ProcessedKVs += uint64(len(chunkKVs))
for _, acctBal := range normalizedAccountBalances {
progress.TotalAccountHashes += uint64(len(acctBal.accountHashes))
if !acctBal.partialBalance {
@@ -592,7 +594,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
}
// BuildMerkleTrie would process the catchpointpendinghashes and insert all the items in it into the merkle trie
-func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, progressUpdates func(uint64)) (err error) {
+func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, progressUpdates func([][]byte)) (err error) {
wdb := c.ledger.trackerDB().Wdb
rdb := c.ledger.trackerDB().Rdb
err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
@@ -652,11 +654,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro
var trie *merkletrie.Trie
uncommitedHashesCount := 0
keepWriting := true
- hashesWritten := uint64(0)
var mc *MerkleCommitter
- if progressUpdates != nil {
- progressUpdates(hashesWritten)
- }
err := wdb.Atomic(func(transactionCtx context.Context, tx *sql.Tx) (err error) {
// create the merkle trie for the balances
@@ -693,18 +691,22 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro
return
}
trie.SetCommitter(mc)
- for _, accountHash := range hashesToWrite {
+ for _, hash := range hashesToWrite {
var added bool
- added, err = trie.Add(accountHash)
+ added, err = trie.Add(hash)
if !added {
- return fmt.Errorf("CatchpointCatchupAccessorImpl::BuildMerkleTrie: The provided catchpoint file contained the same account more than once. hash '%s'", hex.EncodeToString(accountHash))
+ return fmt.Errorf("CatchpointCatchupAccessorImpl::BuildMerkleTrie: The provided catchpoint file contained the same account more than once. hash '%s'", hex.EncodeToString(hash))
}
if err != nil {
return
}
+
}
uncommitedHashesCount += len(hashesToWrite)
- hashesWritten += uint64(len(hashesToWrite))
+ if progressUpdates != nil {
+ progressUpdates(hashesToWrite)
+ }
+
return nil
})
if err != nil {
@@ -732,9 +734,6 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro
continue
}
}
- if progressUpdates != nil {
- progressUpdates(hashesWritten)
- }
}
if err != nil {
errChan <- err