diff options
author | michaeldiamant <michaeldiamant@users.noreply.github.com> | 2022-11-16 15:53:19 -0500 |
---|---|---|
committer | michaeldiamant <michaeldiamant@users.noreply.github.com> | 2022-11-16 15:53:19 -0500 |
commit | a8e6cfa88b64efd2187e90f9801fe17e589843ca (patch) | |
tree | 3b5740e39da55b35c2ea3126748487fc98f03d97 | |
parent | f3b226c312258ed20ca56528989c115260f94463 (diff) |
Try to wire in support for counting accounts vs KVskvs_catchpoint_stats
-rw-r--r-- | catchup/catchpointService.go | 29 | ||||
-rw-r--r-- | ledger/catchupaccessor.go | 29 |
2 files changed, 38 insertions, 20 deletions
diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go index 1fa90c52a..d98caf478 100644 --- a/catchup/catchpointService.go +++ b/catchup/catchpointService.go @@ -53,6 +53,9 @@ type CatchpointCatchupStats struct { TotalAccounts uint64 ProcessedAccounts uint64 VerifiedAccounts uint64 + TotalKVs uint64 + ProcessedKVs uint64 + VerifiedKVs uint64 TotalBlocks uint64 AcquiredBlocks uint64 VerifiedBlocks uint64 @@ -304,7 +307,7 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { if err == nil { cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second) start = time.Now() - err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedAccounts) + err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts) if err == nil { cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second) break @@ -336,12 +339,27 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { return nil } -// updateVerifiedAccounts update the user's statistics for the given verified accounts -func (cs *CatchpointCatchupService) updateVerifiedAccounts(addedTrieHashes uint64) { +// updateVerifiedCounts update the user's statistics for the given verified hashes +func (cs *CatchpointCatchupService) updateVerifiedCounts(hashes [][]byte) { cs.statsMu.Lock() defer cs.statsMu.Unlock() + + addedTrieAccountHashes := uint64(0) + addedTrieKVHashes := uint64(0) + for _, hash := range hashes { + if (hash[4] == 3) { // KV + addedTrieKVHashes++ + } else { + addedTrieAccountHashes++ + } + } + if cs.stats.TotalAccountHashes > 0 { - cs.stats.VerifiedAccounts = cs.stats.TotalAccounts * addedTrieHashes / cs.stats.TotalAccountHashes + cs.stats.VerifiedAccounts = cs.stats.TotalAccounts * addedTrieAccountHashes / cs.stats.TotalAccountHashes + } + + if cs.stats.TotalKVs > 0 { // TODO Is TotalKVHashes needed? + cs.stats.VerifiedKVs = addedTrieKVHashes } } @@ -757,8 +775,9 @@ func (cs *CatchpointCatchupService) updateLedgerFetcherProgress(fetcherStats *le defer cs.statsMu.Unlock() cs.stats.TotalAccounts = fetcherStats.TotalAccounts cs.stats.ProcessedAccounts = fetcherStats.ProcessedAccounts + cs.stats.TotalKVs = fetcherStats.TotalKVs + cs.stats.ProcessedKVs = fetcherStats.ProcessedKVs cs.stats.ProcessedBytes = fetcherStats.ProcessedBytes - cs.stats.TotalKVHashes = fetcherStats.TotalKVHashes cs.stats.TotalAccountHashes = fetcherStats.TotalAccountHashes } diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go index 619e63ed7..4749a0cb4 100644 --- a/ledger/catchupaccessor.go +++ b/ledger/catchupaccessor.go @@ -59,7 +59,7 @@ type CatchpointCatchupAccessor interface { ProcessStagingBalances(ctx context.Context, sectionName string, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error) // BuildMerkleTrie inserts the account hashes into the merkle trie - BuildMerkleTrie(ctx context.Context, progressUpdates func(uint64)) (err error) + BuildMerkleTrie(ctx context.Context, progressUpdates func([][]byte)) (err error) // GetCatchupBlockRound returns the latest block round matching the current catchpoint GetCatchupBlockRound(ctx context.Context) (round basics.Round, err error) @@ -275,11 +275,12 @@ type CatchpointCatchupAccessorProgress struct { TotalAccounts uint64 ProcessedAccounts uint64 ProcessedBytes uint64 + TotalKVs uint64 + ProcessedKVs uint64 TotalChunks uint64 SeenHeader bool Version uint64 TotalAccountHashes uint64 - TotalKVHashes uint64 // Having the cachedTrie here would help to accelerate the catchup process since the trie maintain an internal cache of nodes. // While rebuilding the trie, we don't want to force and reload (some) of these nodes into the cache for each catchpoint file chunk. @@ -345,6 +346,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingContent(ctx context.Contex if err == nil { progress.SeenHeader = true progress.TotalAccounts = fileHeader.TotalAccounts + progress.TotalKVs = fileHeader.TotalKVs progress.TotalChunks = fileHeader.TotalChunks progress.Version = fileHeader.Version @@ -546,7 +548,6 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte start := time.Now() errKVs = c.stagingWriter.writeKVs(ctx, chunkKVs) - progress.TotalKVHashes += uint64(len(chunkKVs)) durKVs = time.Since(start) }() @@ -572,6 +573,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte ledgerProcessstagingbalancesMicros.AddMicrosecondsSince(start, nil) progress.ProcessedBytes += uint64(len(bytes)) + progress.ProcessedKVs += uint64(len(chunkKVs)) for _, acctBal := range normalizedAccountBalances { progress.TotalAccountHashes += uint64(len(acctBal.accountHashes)) if !acctBal.partialBalance { @@ -592,7 +594,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte } // BuildMerkleTrie would process the catchpointpendinghashes and insert all the items in it into the merkle trie -func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, progressUpdates func(uint64)) (err error) { +func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, progressUpdates func([][]byte)) (err error) { wdb := c.ledger.trackerDB().Wdb rdb := c.ledger.trackerDB().Rdb err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { @@ -652,11 +654,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro var trie *merkletrie.Trie uncommitedHashesCount := 0 keepWriting := true - hashesWritten := uint64(0) var mc *MerkleCommitter - if progressUpdates != nil { - progressUpdates(hashesWritten) - } err := wdb.Atomic(func(transactionCtx context.Context, tx *sql.Tx) (err error) { // create the merkle trie for the balances @@ -693,18 +691,22 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro return } trie.SetCommitter(mc) - for _, accountHash := range hashesToWrite { + for _, hash := range hashesToWrite { var added bool - added, err = trie.Add(accountHash) + added, err = trie.Add(hash) if !added { - return fmt.Errorf("CatchpointCatchupAccessorImpl::BuildMerkleTrie: The provided catchpoint file contained the same account more than once. hash '%s'", hex.EncodeToString(accountHash)) + return fmt.Errorf("CatchpointCatchupAccessorImpl::BuildMerkleTrie: The provided catchpoint file contained the same account more than once. hash '%s'", hex.EncodeToString(hash)) } if err != nil { return } + } uncommitedHashesCount += len(hashesToWrite) - hashesWritten += uint64(len(hashesToWrite)) + if progressUpdates != nil { + progressUpdates(hashesToWrite) + } + return nil }) if err != nil { @@ -732,9 +734,6 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro continue } } - if progressUpdates != nil { - progressUpdates(hashesWritten) - } } if err != nil { errChan <- err |