summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>2024-01-23 15:28:49 -0500
committerGitHub <noreply@github.com>2024-01-23 15:28:49 -0500
commit349073102f9f57fa6a267f9f76d20f085b0ff43b (patch)
tree72c26c69ea6e9703acfd3834c384a455fdaede1b
parenta12324fd989883ce9bdc4c4fdecd26d7d59db965 (diff)
blockdb: bound max deleted blocks per blockdb sync (#5910)
-rw-r--r--ledger/blockqueue.go17
-rw-r--r--ledger/blockqueue_test.go115
-rw-r--r--ledger/ledger.go6
-rw-r--r--ledger/ledger_test.go2
4 files changed, 137 insertions, 3 deletions
diff --git a/ledger/blockqueue.go b/ledger/blockqueue.go
index 686d51b96..7c1728101 100644
--- a/ledger/blockqueue.go
+++ b/ledger/blockqueue.go
@@ -111,6 +111,8 @@ func (bq *blockQueue) stop() {
}
}
+const maxDeletionBatchSize = 10_000
+
func (bq *blockQueue) syncer() {
bq.mu.Lock()
for {
@@ -164,6 +166,21 @@ func (bq *blockQueue) syncer() {
bq.mu.Unlock()
minToSave := bq.l.notifyCommit(committed)
+ var earliest basics.Round
+ err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
+ var err0 error
+ earliest, err0 = blockdb.BlockEarliest(tx)
+ if err0 != nil {
+ bq.l.log.Warnf("blockQueue.syncer: BlockEarliest(): %v", err0)
+ }
+ return err0
+ })
+ if err == nil {
+ if basics.SubSaturate(minToSave, earliest) > maxDeletionBatchSize {
+ minToSave = basics.AddSaturate(earliest, maxDeletionBatchSize)
+ }
+ }
+
bfstart := time.Now()
ledgerSyncBlockforgetCount.Inc(nil)
err = bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
diff --git a/ledger/blockqueue_test.go b/ledger/blockqueue_test.go
index 8b451a95e..e74fbc0b3 100644
--- a/ledger/blockqueue_test.go
+++ b/ledger/blockqueue_test.go
@@ -17,9 +17,12 @@
package ledger
import (
+ "context"
+ "database/sql"
"errors"
"fmt"
"testing"
+ "time"
"github.com/stretchr/testify/require"
@@ -29,10 +32,12 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
+ "github.com/algorand/go-algorand/ledger/store/blockdb"
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
+ "github.com/algorand/go-algorand/util/db"
)
func randomBlock(r basics.Round) blockEntry {
@@ -128,3 +133,113 @@ func TestGetEncodedBlockCert(t *testing.T) {
expectedErr := &ledgercore.ErrNoEntry{}
require.True(t, errors.As(err, expectedErr))
}
+
+// It is not ideal to use trackers here, but at the moment there is no abstraction for the ledger.
+type uptoTracker struct {
+ emptyTracker
+}
+
+// committedUpTo overrides emptyTracker and reports a fixed minRound of 5_000 with zero lookback.
+func (t *uptoTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
+ return 5_000, basics.Round(0)
+}
+
+// TestBlockQueueSyncerDeletion ensures that the block queue syncer deletes no more than maxDeletionBatchSize blocks at a time
+func TestBlockQueueSyncerDeletion(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ expectedEarliest basics.Round
+ tracker ledgerTracker
+ }{
+ {"max_batch", maxDeletionBatchSize, nil}, // no trackers, max deletion
+ {"5k_tracker", 5_000, &uptoTracker{}}, // tracker sets minToSave to 5k
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+
+ const dbMem = true
+ blockDBs, err := db.OpenPair(t.Name()+".block.sqlite", dbMem)
+ require.NoError(t, err)
+
+ log := logging.TestingLog(t)
+ err = blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
+ return initBlocksDB(tx, log, []bookkeeping.Block{}, false)
+ })
+ require.NoError(t, err)
+
+ // add 15k blocks
+ const maxBlocks = maxDeletionBatchSize + maxDeletionBatchSize/2 // 15_000
+ err = blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
+ for i := 0; i < maxBlocks; i++ {
+ err0 := blockdb.BlockPut(
+ tx,
+ bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: basics.Round(i)}},
+ agreement.Certificate{})
+ if err0 != nil {
+ return err0
+ }
+ }
+ return nil
+ })
+ require.NoError(t, err)
+
+ var earliest, latest basics.Round
+ err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
+ var err0 error
+ earliest, err0 = blockdb.BlockEarliest(tx)
+ if err0 != nil {
+ return err0
+ }
+ latest, err0 = blockdb.BlockLatest(tx)
+ return err0
+ })
+ require.NoError(t, err)
+ require.Equal(t, basics.Round(0), earliest)
+ require.Equal(t, basics.Round(maxBlocks-1), latest)
+
+ // trigger deletion and ensure no more than 10k blocks are gone
+ // make a minimal ledger for the block queue
+
+ l := &Ledger{
+ log: log,
+ blockDBs: blockDBs,
+ }
+ if test.tracker != nil {
+ l.trackers.trackers = append(l.trackers.trackers, test.tracker)
+ }
+ blockq, _ := newBlockQueue(l)
+ err = blockq.start()
+ require.NoError(t, err)
+
+ // add a block. Eventually the syncer will be called on an empty ledger,
+ // forcing deletion of all 15_000 rounds. The deletion scoping should limit it to 10_000 rounds instead.
+ err = blockq.putBlock(bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: maxBlocks}}, agreement.Certificate{})
+ require.NoError(t, err)
+
+ require.Eventually(t, func() bool {
+ var latest basics.Round
+ err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
+ var err0 error
+ latest, err0 = blockdb.BlockLatest(tx)
+ return err0
+ })
+ require.NoError(t, err)
+ return latest == maxBlocks
+ }, 1*time.Second, 10*time.Millisecond)
+
+ blockq.stop()
+
+ err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
+ var err0 error
+ earliest, err0 = blockdb.BlockEarliest(tx)
+ return err0
+ })
+ require.NoError(t, err)
+ require.Equal(t, test.expectedEarliest, earliest)
+ })
+ }
+}
diff --git a/ledger/ledger.go b/ledger/ledger.go
index d5436ad5d..fa1be1a76 100644
--- a/ledger/ledger.go
+++ b/ledger/ledger.go
@@ -173,7 +173,7 @@ func OpenLedger[T string | DirsAndPrefix](
start := time.Now()
ledgerInitblocksdbCount.Inc(nil)
err = l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
- return initBlocksDB(tx, l, []bookkeeping.Block{genesisInitState.Block}, cfg.Archival)
+ return initBlocksDB(tx, l.log, []bookkeeping.Block{genesisInitState.Block}, cfg.Archival)
})
ledgerInitblocksdbMicros.AddMicrosecondsSince(start, nil)
if err != nil {
@@ -364,7 +364,7 @@ func (l *Ledger) setSynchronousMode(ctx context.Context, synchronousMode db.Sync
// initBlocksDB performs DB initialization:
// - creates and populates it with genesis blocks
// - ensures DB is in good shape for archival mode and resets it if not
-func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchival bool) (err error) {
+func initBlocksDB(tx *sql.Tx, log logging.Logger, initBlocks []bookkeeping.Block, isArchival bool) (err error) {
err = blockdb.BlockInit(tx, initBlocks)
if err != nil {
err = fmt.Errorf("initBlocksDB.blockInit %v", err)
@@ -382,7 +382,7 @@ func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchi
// Detect possible problem - archival node needs all block but have only subsequence of them
// So reset the DB and init it again
if earliest != basics.Round(0) {
- l.log.Warnf("resetting blocks DB (earliest block is %v)", earliest)
+ log.Warnf("resetting blocks DB (earliest block is %v)", earliest)
err := blockdb.BlockResetDB(tx)
if err != nil {
err = fmt.Errorf("initBlocksDB.blockResetDB %v", err)
diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go
index 79ae16284..ab5bc293a 100644
--- a/ledger/ledger_test.go
+++ b/ledger/ledger_test.go
@@ -2877,11 +2877,13 @@ func testVotersReloadFromDiskAfterOneStateProofCommitted(t *testing.T, cfg confi
}
triggerDeleteVoters(t, l, genesisInitState)
+ l.acctsOnline.voters.votersMu.Lock()
vtSnapshot := l.acctsOnline.voters.votersForRoundCache
// verifying that the tree for round 512 is still in the cache, but the tree for round 256 is evicted.
require.Contains(t, vtSnapshot, basics.Round(496))
require.NotContains(t, vtSnapshot, basics.Round(240))
+ l.acctsOnline.voters.votersMu.Unlock()
err = l.reloadLedger()
require.NoError(t, err)