diff options
author | Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> | 2022-02-03 20:20:17 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-03 20:20:17 -0500 |
commit | 0d65a0f93f214087569939ad2cea0c0f1b15d8d8 (patch) | |
tree | cf87a22f727a6041b103748cc9591502f432af25 | |
parent | 6230dc55c78918a2d4685a4b1e6ca78fdc35888b (diff) |
Optimize catchpointdump utility (#3561)
## Summary
* Do not load everything into memory but write directly in into a file
This creates extremely high number of reallocation and CPU and RAM wasting.
* Add cmd options for downloading only (no loading), and for downloading
from a single relay
# Perf data:
| param | current | new |
|-----------|--------|---------|
| RAM | 5 GB | 200 MB |
| Total time | 823 s | 441 s |
Downloading time went down from minutes to tens of seconds.
-rw-r--r-- | cmd/catchpointdump/file.go | 31 | ||||
-rw-r--r-- | cmd/catchpointdump/net.go | 152 |
2 files changed, 82 insertions, 101 deletions
diff --git a/cmd/catchpointdump/file.go b/cmd/catchpointdump/file.go index 78c7a4c4d..3335575b0 100644 --- a/cmd/catchpointdump/file.go +++ b/cmd/catchpointdump/file.go @@ -19,13 +19,11 @@ package main import ( "archive/tar" "bufio" - "bytes" "context" "database/sql" "encoding/json" "fmt" "io" - "io/ioutil" "os" "strings" "time" @@ -59,9 +57,14 @@ var fileCmd = &cobra.Command{ cmd.HelpFunc()(cmd, args) return } - tarFileBytes, err := ioutil.ReadFile(tarFile) - if err != nil || len(tarFileBytes) == 0 { - reportErrorf("Unable to read '%s' : %v", tarFile, err) + stats, err := os.Stat(tarFile) + if err != nil { + reportErrorf("Unable to stat '%s' : %v", tarFile, err) + } + tarSize := stats.Size() + + if tarSize == 0 { + reportErrorf("Empty file '%s' : %v", tarFile, err) } genesisInitState := ledgercore.InitState{} cfg := config.GetDefaultLocal() @@ -84,7 +87,14 @@ var fileCmd = &cobra.Command{ reportErrorf("Unable to initialize catchup database : %v", err) } var fileHeader ledger.CatchpointFileHeader - fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, tarFileBytes) + + reader, err := os.Open(tarFile) + if err != nil { + reportErrorf("Unable to read '%s' : %v", tarFile, err) + } + defer reader.Close() + + fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, tarSize) if err != nil { reportErrorf("Unable to load catchpoint file into in-memory database : %v", err) } @@ -115,15 +125,14 @@ func printLoadCatchpointProgressLine(progress int, barLength int, dld int64) { fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld)) } -func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, fileBytes []byte) (fileHeader ledger.CatchpointFileHeader, err error) { +func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, tarFile io.Reader, tarSize int64) (fileHeader ledger.CatchpointFileHeader, err error) { fmt.Printf("\n") printLoadCatchpointProgressLine(0, 50, 0) lastProgressUpdate := time.Now() progress := uint64(0) defer printLoadCatchpointProgressLine(0, 0, 0) - reader := bytes.NewReader(fileBytes) - tarReader := tar.NewReader(reader) + tarReader := tar.NewReader(tarFile) var downloadProgress ledger.CatchpointCatchupAccessorProgress for { header, err := tarReader.Next() @@ -158,9 +167,9 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc // we already know it's valid, since we validated that above. protocol.Decode(balancesBlockBytes, &fileHeader) } - if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond && len(fileBytes) > 0 { + if time.Since(lastProgressUpdate) > 50*time.Millisecond && tarSize > 0 { lastProgressUpdate = time.Now() - printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(len(fileBytes))), 50, int64(progress)) + printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(tarSize)), 50, int64(progress)) } } } diff --git a/cmd/catchpointdump/net.go b/cmd/catchpointdump/net.go index da4292bb8..f4a31bff9 100644 --- a/cmd/catchpointdump/net.go +++ b/cmd/catchpointdump/net.go @@ -40,6 +40,8 @@ import ( var networkName string var round int var relayAddress string +var singleCatchpoint bool +var downloadOnly bool const ( escapeCursorUp = string("\033[A") // Cursor Up @@ -52,12 +54,14 @@ func init() { netCmd.Flags().StringVarP(&networkName, "net", "n", "", "Specify the network name ( i.e. mainnet.algorand.network )") netCmd.Flags().IntVarP(&round, "round", "r", 0, "Specify the round number ( i.e. 7700000 )") netCmd.Flags().StringVarP(&relayAddress, "relay", "p", "", "Relay address to use ( i.e. r-ru.algorand-mainnet.network:4160 )") + netCmd.Flags().BoolVarP(&singleCatchpoint, "single", "s", true, "Download/process only from a single relay") + netCmd.Flags().BoolVarP(&downloadOnly, "download", "l", false, "Download only, do not process") } var netCmd = &cobra.Command{ Use: "net", - Short: "Download and decode (possibly all) catchpoint files from all or specified the relay(s) on the network for a particular round", - Long: "Download and decode (possibly all) catchpoint files from all or specified the relay(s) on the network for a particular round", + Short: "Download and decode (possibly all) catchpoint files from possibly all or specified the relay(s) on the network for a particular round", + Long: "Download and decode (possibly all) catchpoint files from possibly all or specified the relay(s) on the network for a particular round", Args: validateNoPosArgsFn, Run: func(cmd *cobra.Command, args []string) { if networkName == "" || round == 0 { @@ -77,21 +81,20 @@ var netCmd = &cobra.Command{ } for _, addr := range addrs { - catchpointFileBytes, err := downloadCatchpoint(addr) - if err != nil || catchpointFileBytes == nil { - reportInfof("failed to download catchpoint from '%s' : %v", addr, err) - continue - } - err = saveCatchpointTarFile(addr, catchpointFileBytes) + tarName, err := downloadCatchpoint(addr, round) if err != nil { - reportInfof("failed to save catchpoint file for '%s' : %v", addr, err) + reportInfof("failed to download catchpoint from '%s' : %v", addr, err) continue } - err = makeFileDump(addr, catchpointFileBytes) + err = makeFileDump(addr, tarName) if err != nil { reportInfof("failed to make a dump from tar file for '%s' : %v", addr, err) continue } + if singleCatchpoint { + // a catchpoint processes successfully, exit if needed + break + } } }, } @@ -144,13 +147,13 @@ func printDownloadProgressLine(progress int, barLength int, url string, dld int6 fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld)) } -func downloadCatchpoint(addr string) ([]byte, error) { +func downloadCatchpoint(addr string, round int) (tarName string, err error) { genesisID := strings.Split(networkName, ".")[0] + "-v1.0" url := "http://" + addr + "/v1/" + genesisID + "/ledger/" + strconv.FormatUint(uint64(round), 36) fmt.Printf("downloading from %s\n", url) request, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { - return nil, err + return } timeoutContext, timeoutContextCancel := context.WithTimeout(context.Background(), config.GetDefaultLocal().MaxCatchpointDownloadDuration) @@ -159,7 +162,7 @@ func downloadCatchpoint(addr string) ([]byte, error) { network.SetUserAgentHeader(request.Header) response, err := http.DefaultClient.Do(request) if err != nil { - return nil, err + return } defer response.Body.Close() @@ -167,108 +170,64 @@ func downloadCatchpoint(addr string) ([]byte, error) { switch response.StatusCode { case http.StatusOK: case http.StatusNotFound: // server could not find a block with that round numbers. - return nil, fmt.Errorf("no catchpoint file for round %d", round) + err = fmt.Errorf("no catchpoint file for round %d", round) + return default: - return nil, fmt.Errorf("error response status code %d", response.StatusCode) - } - wdReader := util.MakeWatchdogStreamReader(response.Body, 4096, 4096, 2*time.Second) - outBytes := make([]byte, 0, 4096) - tempBytes := make([]byte, 4096) - lastProgressUpdate := time.Now() - progress := -25 - printDownloadProgressLine(progress, 50, url, 0) - defer printDownloadProgressLine(0, 0, url, 0) - for { - n, err := wdReader.Read(tempBytes) - if err != nil { - if err == io.EOF { - outBytes = append(outBytes, tempBytes[:n]...) - return outBytes, nil - } - return nil, err - } - if cap(outBytes) < len(outBytes)+n { - // need to increase buffer. - newBuffer := make([]byte, cap(outBytes)+n, cap(outBytes)+1024*1024) - copy(newBuffer, outBytes) - copy(newBuffer[len(outBytes):], tempBytes[:n]) - outBytes = newBuffer - } else { - outBytes = append(outBytes, tempBytes[:n]...) - } - err = wdReader.Reset() - if err != nil { - if err == io.EOF { - return outBytes, nil - } - return nil, err - } - if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond { - lastProgressUpdate = time.Now() - printDownloadProgressLine(progress, 50, url, int64(len(outBytes))) - progress++ - } - } -} - -func printSaveProgressLine(progress int, barLength int, filename string, dld int64) { - if barLength == 0 { - fmt.Printf(escapeCursorUp+escapeDeleteLine+"[ Done ] Saved %s\n", filename) + err = fmt.Errorf("error response status code %d", response.StatusCode) return } - outString := "[" + strings.Repeat(escapeSquare, progress) + strings.Repeat(escapeDot, barLength-progress) + "] Saving " + filename + " ..." - - fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld)) -} - -func saveCatchpointTarFile(addr string, catchpointFileBytes []byte) (err error) { - // make a directory: dirName := "./" + strings.Split(networkName, ".")[0] + "/" + strings.Split(addr, ".")[0] os.RemoveAll(dirName) err = os.MkdirAll(dirName, 0777) if err != nil && !os.IsExist(err) { return } - destFileName := dirName + "/" + strconv.FormatUint(uint64(round), 10) + ".tar" - file, err2 := os.Create(destFileName) // will create a file with 0666 permission. + tarName = dirName + "/" + strconv.FormatUint(uint64(round), 10) + ".tar" + file, err2 := os.Create(tarName) // will create a file with 0666 permission. if err2 != nil { - return err2 + return tarName, err2 } defer func() { err = file.Close() }() writeChunkSize := 64 * 1024 + + wdReader := util.MakeWatchdogStreamReader(response.Body, 4096, 4096, 2*time.Second) + var totalBytes int + tempBytes := make([]byte, writeChunkSize) lastProgressUpdate := time.Now() - fmt.Printf("\n") - printSaveProgressLine(0, 50, destFileName, 0) - progress := uint64(0) - defer printSaveProgressLine(0, 0, destFileName, 0) - total := len(catchpointFileBytes) + progress := -25 + printDownloadProgressLine(progress, 50, url, 0) + defer printDownloadProgressLine(0, 0, url, 0) + var n int for { - writeSize := writeChunkSize - if len(catchpointFileBytes) < writeSize { - writeSize = len(catchpointFileBytes) + n, err = wdReader.Read(tempBytes) + if err != nil && err != io.EOF { + return } - if writeSize <= 0 { - break + totalBytes += n + writtenBytes, err2 := file.Write(tempBytes[:n]) + if err2 != nil || n != writtenBytes { + return tarName, err2 } - n, err2 := file.Write(catchpointFileBytes[:writeSize]) - if err2 != nil || n != writeSize { - return err + + err = wdReader.Reset() + if err != nil { + if err == io.EOF { + return tarName, nil + } + return } - catchpointFileBytes = catchpointFileBytes[n:] - if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond && total > 0 { + if time.Since(lastProgressUpdate) > 50*time.Millisecond { lastProgressUpdate = time.Now() - printSaveProgressLine(int(float64(progress)*50.0/float64(total)), 50, destFileName, int64(progress)) - + printDownloadProgressLine(progress, 50, url, int64(totalBytes)) + progress++ } - progress += uint64(n) } - return } -func makeFileDump(addr string, catchpointFileBytes []byte) error { +func makeFileDump(addr string, tarFile string) error { genesisInitState := ledgercore.InitState{} deleteLedgerFiles := func() { os.Remove("./ledger.block.sqlite") @@ -294,8 +253,21 @@ func makeFileDump(addr string, catchpointFileBytes []byte) error { if err != nil { reportErrorf("Unable to initialize catchup database : %v", err) } + + stats, err := os.Stat(tarFile) + if err != nil { + return err + } + tarSize := stats.Size() + + reader, err := os.Open(tarFile) + if err != nil { + return err + } + defer reader.Close() + var fileHeader ledger.CatchpointFileHeader - fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, catchpointFileBytes) + fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, tarSize) if err != nil { reportErrorf("Unable to load catchpoint file into in-memory database : %v", err) } |