summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>2022-02-03 20:20:17 -0500
committerGitHub <noreply@github.com>2022-02-03 20:20:17 -0500
commit0d65a0f93f214087569939ad2cea0c0f1b15d8d8 (patch)
treecf87a22f727a6041b103748cc9591502f432af25
parent6230dc55c78918a2d4685a4b1e6ca78fdc35888b (diff)
Optimize catchpointdump utility (#3561)
## Summary * Do not load everything into memory but write directly in into a file This creates extremely high number of reallocation and CPU and RAM wasting. * Add cmd options for downloading only (no loading), and for downloading from a single relay # Perf data: | param | current | new | |-----------|--------|---------| | RAM | 5 GB | 200 MB | | Total time | 823 s | 441 s | Downloading time went down from minutes to tens of seconds.
-rw-r--r--cmd/catchpointdump/file.go31
-rw-r--r--cmd/catchpointdump/net.go152
2 files changed, 82 insertions, 101 deletions
diff --git a/cmd/catchpointdump/file.go b/cmd/catchpointdump/file.go
index 78c7a4c4d..3335575b0 100644
--- a/cmd/catchpointdump/file.go
+++ b/cmd/catchpointdump/file.go
@@ -19,13 +19,11 @@ package main
import (
"archive/tar"
"bufio"
- "bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
- "io/ioutil"
"os"
"strings"
"time"
@@ -59,9 +57,14 @@ var fileCmd = &cobra.Command{
cmd.HelpFunc()(cmd, args)
return
}
- tarFileBytes, err := ioutil.ReadFile(tarFile)
- if err != nil || len(tarFileBytes) == 0 {
- reportErrorf("Unable to read '%s' : %v", tarFile, err)
+ stats, err := os.Stat(tarFile)
+ if err != nil {
+ reportErrorf("Unable to stat '%s' : %v", tarFile, err)
+ }
+ tarSize := stats.Size()
+
+ if tarSize == 0 {
+ reportErrorf("Empty file '%s' : %v", tarFile, err)
}
genesisInitState := ledgercore.InitState{}
cfg := config.GetDefaultLocal()
@@ -84,7 +87,14 @@ var fileCmd = &cobra.Command{
reportErrorf("Unable to initialize catchup database : %v", err)
}
var fileHeader ledger.CatchpointFileHeader
- fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, tarFileBytes)
+
+ reader, err := os.Open(tarFile)
+ if err != nil {
+ reportErrorf("Unable to read '%s' : %v", tarFile, err)
+ }
+ defer reader.Close()
+
+ fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, tarSize)
if err != nil {
reportErrorf("Unable to load catchpoint file into in-memory database : %v", err)
}
@@ -115,15 +125,14 @@ func printLoadCatchpointProgressLine(progress int, barLength int, dld int64) {
fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld))
}
-func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, fileBytes []byte) (fileHeader ledger.CatchpointFileHeader, err error) {
+func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, tarFile io.Reader, tarSize int64) (fileHeader ledger.CatchpointFileHeader, err error) {
fmt.Printf("\n")
printLoadCatchpointProgressLine(0, 50, 0)
lastProgressUpdate := time.Now()
progress := uint64(0)
defer printLoadCatchpointProgressLine(0, 0, 0)
- reader := bytes.NewReader(fileBytes)
- tarReader := tar.NewReader(reader)
+ tarReader := tar.NewReader(tarFile)
var downloadProgress ledger.CatchpointCatchupAccessorProgress
for {
header, err := tarReader.Next()
@@ -158,9 +167,9 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
// we already know it's valid, since we validated that above.
protocol.Decode(balancesBlockBytes, &fileHeader)
}
- if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond && len(fileBytes) > 0 {
+ if time.Since(lastProgressUpdate) > 50*time.Millisecond && tarSize > 0 {
lastProgressUpdate = time.Now()
- printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(len(fileBytes))), 50, int64(progress))
+ printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(tarSize)), 50, int64(progress))
}
}
}
diff --git a/cmd/catchpointdump/net.go b/cmd/catchpointdump/net.go
index da4292bb8..f4a31bff9 100644
--- a/cmd/catchpointdump/net.go
+++ b/cmd/catchpointdump/net.go
@@ -40,6 +40,8 @@ import (
var networkName string
var round int
var relayAddress string
+var singleCatchpoint bool
+var downloadOnly bool
const (
escapeCursorUp = string("\033[A") // Cursor Up
@@ -52,12 +54,14 @@ func init() {
netCmd.Flags().StringVarP(&networkName, "net", "n", "", "Specify the network name ( i.e. mainnet.algorand.network )")
netCmd.Flags().IntVarP(&round, "round", "r", 0, "Specify the round number ( i.e. 7700000 )")
netCmd.Flags().StringVarP(&relayAddress, "relay", "p", "", "Relay address to use ( i.e. r-ru.algorand-mainnet.network:4160 )")
+ netCmd.Flags().BoolVarP(&singleCatchpoint, "single", "s", true, "Download/process only from a single relay")
+ netCmd.Flags().BoolVarP(&downloadOnly, "download", "l", false, "Download only, do not process")
}
var netCmd = &cobra.Command{
Use: "net",
- Short: "Download and decode (possibly all) catchpoint files from all or specified the relay(s) on the network for a particular round",
- Long: "Download and decode (possibly all) catchpoint files from all or specified the relay(s) on the network for a particular round",
+ Short: "Download and decode (possibly all) catchpoint files from possibly all or specified the relay(s) on the network for a particular round",
+ Long: "Download and decode (possibly all) catchpoint files from possibly all or specified the relay(s) on the network for a particular round",
Args: validateNoPosArgsFn,
Run: func(cmd *cobra.Command, args []string) {
if networkName == "" || round == 0 {
@@ -77,21 +81,20 @@ var netCmd = &cobra.Command{
}
for _, addr := range addrs {
- catchpointFileBytes, err := downloadCatchpoint(addr)
- if err != nil || catchpointFileBytes == nil {
- reportInfof("failed to download catchpoint from '%s' : %v", addr, err)
- continue
- }
- err = saveCatchpointTarFile(addr, catchpointFileBytes)
+ tarName, err := downloadCatchpoint(addr, round)
if err != nil {
- reportInfof("failed to save catchpoint file for '%s' : %v", addr, err)
+ reportInfof("failed to download catchpoint from '%s' : %v", addr, err)
continue
}
- err = makeFileDump(addr, catchpointFileBytes)
+ err = makeFileDump(addr, tarName)
if err != nil {
reportInfof("failed to make a dump from tar file for '%s' : %v", addr, err)
continue
}
+ if singleCatchpoint {
+ // a catchpoint processes successfully, exit if needed
+ break
+ }
}
},
}
@@ -144,13 +147,13 @@ func printDownloadProgressLine(progress int, barLength int, url string, dld int6
fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld))
}
-func downloadCatchpoint(addr string) ([]byte, error) {
+func downloadCatchpoint(addr string, round int) (tarName string, err error) {
genesisID := strings.Split(networkName, ".")[0] + "-v1.0"
url := "http://" + addr + "/v1/" + genesisID + "/ledger/" + strconv.FormatUint(uint64(round), 36)
fmt.Printf("downloading from %s\n", url)
request, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
- return nil, err
+ return
}
timeoutContext, timeoutContextCancel := context.WithTimeout(context.Background(), config.GetDefaultLocal().MaxCatchpointDownloadDuration)
@@ -159,7 +162,7 @@ func downloadCatchpoint(addr string) ([]byte, error) {
network.SetUserAgentHeader(request.Header)
response, err := http.DefaultClient.Do(request)
if err != nil {
- return nil, err
+ return
}
defer response.Body.Close()
@@ -167,108 +170,64 @@ func downloadCatchpoint(addr string) ([]byte, error) {
switch response.StatusCode {
case http.StatusOK:
case http.StatusNotFound: // server could not find a block with that round numbers.
- return nil, fmt.Errorf("no catchpoint file for round %d", round)
+ err = fmt.Errorf("no catchpoint file for round %d", round)
+ return
default:
- return nil, fmt.Errorf("error response status code %d", response.StatusCode)
- }
- wdReader := util.MakeWatchdogStreamReader(response.Body, 4096, 4096, 2*time.Second)
- outBytes := make([]byte, 0, 4096)
- tempBytes := make([]byte, 4096)
- lastProgressUpdate := time.Now()
- progress := -25
- printDownloadProgressLine(progress, 50, url, 0)
- defer printDownloadProgressLine(0, 0, url, 0)
- for {
- n, err := wdReader.Read(tempBytes)
- if err != nil {
- if err == io.EOF {
- outBytes = append(outBytes, tempBytes[:n]...)
- return outBytes, nil
- }
- return nil, err
- }
- if cap(outBytes) < len(outBytes)+n {
- // need to increase buffer.
- newBuffer := make([]byte, cap(outBytes)+n, cap(outBytes)+1024*1024)
- copy(newBuffer, outBytes)
- copy(newBuffer[len(outBytes):], tempBytes[:n])
- outBytes = newBuffer
- } else {
- outBytes = append(outBytes, tempBytes[:n]...)
- }
- err = wdReader.Reset()
- if err != nil {
- if err == io.EOF {
- return outBytes, nil
- }
- return nil, err
- }
- if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond {
- lastProgressUpdate = time.Now()
- printDownloadProgressLine(progress, 50, url, int64(len(outBytes)))
- progress++
- }
- }
-}
-
-func printSaveProgressLine(progress int, barLength int, filename string, dld int64) {
- if barLength == 0 {
- fmt.Printf(escapeCursorUp+escapeDeleteLine+"[ Done ] Saved %s\n", filename)
+ err = fmt.Errorf("error response status code %d", response.StatusCode)
return
}
- outString := "[" + strings.Repeat(escapeSquare, progress) + strings.Repeat(escapeDot, barLength-progress) + "] Saving " + filename + " ..."
-
- fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld))
-}
-
-func saveCatchpointTarFile(addr string, catchpointFileBytes []byte) (err error) {
- // make a directory:
dirName := "./" + strings.Split(networkName, ".")[0] + "/" + strings.Split(addr, ".")[0]
os.RemoveAll(dirName)
err = os.MkdirAll(dirName, 0777)
if err != nil && !os.IsExist(err) {
return
}
- destFileName := dirName + "/" + strconv.FormatUint(uint64(round), 10) + ".tar"
- file, err2 := os.Create(destFileName) // will create a file with 0666 permission.
+ tarName = dirName + "/" + strconv.FormatUint(uint64(round), 10) + ".tar"
+ file, err2 := os.Create(tarName) // will create a file with 0666 permission.
if err2 != nil {
- return err2
+ return tarName, err2
}
defer func() {
err = file.Close()
}()
writeChunkSize := 64 * 1024
+
+ wdReader := util.MakeWatchdogStreamReader(response.Body, 4096, 4096, 2*time.Second)
+ var totalBytes int
+ tempBytes := make([]byte, writeChunkSize)
lastProgressUpdate := time.Now()
- fmt.Printf("\n")
- printSaveProgressLine(0, 50, destFileName, 0)
- progress := uint64(0)
- defer printSaveProgressLine(0, 0, destFileName, 0)
- total := len(catchpointFileBytes)
+ progress := -25
+ printDownloadProgressLine(progress, 50, url, 0)
+ defer printDownloadProgressLine(0, 0, url, 0)
+ var n int
for {
- writeSize := writeChunkSize
- if len(catchpointFileBytes) < writeSize {
- writeSize = len(catchpointFileBytes)
+ n, err = wdReader.Read(tempBytes)
+ if err != nil && err != io.EOF {
+ return
}
- if writeSize <= 0 {
- break
+ totalBytes += n
+ writtenBytes, err2 := file.Write(tempBytes[:n])
+ if err2 != nil || n != writtenBytes {
+ return tarName, err2
}
- n, err2 := file.Write(catchpointFileBytes[:writeSize])
- if err2 != nil || n != writeSize {
- return err
+
+ err = wdReader.Reset()
+ if err != nil {
+ if err == io.EOF {
+ return tarName, nil
+ }
+ return
}
- catchpointFileBytes = catchpointFileBytes[n:]
- if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond && total > 0 {
+ if time.Since(lastProgressUpdate) > 50*time.Millisecond {
lastProgressUpdate = time.Now()
- printSaveProgressLine(int(float64(progress)*50.0/float64(total)), 50, destFileName, int64(progress))
-
+ printDownloadProgressLine(progress, 50, url, int64(totalBytes))
+ progress++
}
- progress += uint64(n)
}
- return
}
-func makeFileDump(addr string, catchpointFileBytes []byte) error {
+func makeFileDump(addr string, tarFile string) error {
genesisInitState := ledgercore.InitState{}
deleteLedgerFiles := func() {
os.Remove("./ledger.block.sqlite")
@@ -294,8 +253,21 @@ func makeFileDump(addr string, catchpointFileBytes []byte) error {
if err != nil {
reportErrorf("Unable to initialize catchup database : %v", err)
}
+
+ stats, err := os.Stat(tarFile)
+ if err != nil {
+ return err
+ }
+ tarSize := stats.Size()
+
+ reader, err := os.Open(tarFile)
+ if err != nil {
+ return err
+ }
+ defer reader.Close()
+
var fileHeader ledger.CatchpointFileHeader
- fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, catchpointFileBytes)
+ fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, tarSize)
if err != nil {
reportErrorf("Unable to load catchpoint file into in-memory database : %v", err)
}