diff options
author | Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> | 2024-01-23 15:28:49 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-23 15:28:49 -0500 |
commit | 349073102f9f57fa6a267f9f76d20f085b0ff43b (patch) | |
tree | 72c26c69ea6e9703acfd3834c384a455fdaede1b | |
parent | a12324fd989883ce9bdc4c4fdecd26d7d59db965 (diff) |
blockdb: bound max deleted blocks per blockdb sync (#5910)
-rw-r--r-- | ledger/blockqueue.go | 17 | ||||
-rw-r--r-- | ledger/blockqueue_test.go | 115 | ||||
-rw-r--r-- | ledger/ledger.go | 6 | ||||
-rw-r--r-- | ledger/ledger_test.go | 2 |
4 files changed, 137 insertions, 3 deletions
diff --git a/ledger/blockqueue.go b/ledger/blockqueue.go index 686d51b96..7c1728101 100644 --- a/ledger/blockqueue.go +++ b/ledger/blockqueue.go @@ -111,6 +111,8 @@ func (bq *blockQueue) stop() { } } +const maxDeletionBatchSize = 10_000 + func (bq *blockQueue) syncer() { bq.mu.Lock() for { @@ -164,6 +166,21 @@ func (bq *blockQueue) syncer() { bq.mu.Unlock() minToSave := bq.l.notifyCommit(committed) + var earliest basics.Round + err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + var err0 error + earliest, err0 = blockdb.BlockEarliest(tx) + if err0 != nil { + bq.l.log.Warnf("blockQueue.syncer: BlockEarliest(): %v", err0) + } + return err0 + }) + if err == nil { + if basics.SubSaturate(minToSave, earliest) > maxDeletionBatchSize { + minToSave = basics.AddSaturate(earliest, maxDeletionBatchSize) + } + } + bfstart := time.Now() ledgerSyncBlockforgetCount.Inc(nil) err = bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { diff --git a/ledger/blockqueue_test.go b/ledger/blockqueue_test.go index 8b451a95e..e74fbc0b3 100644 --- a/ledger/blockqueue_test.go +++ b/ledger/blockqueue_test.go @@ -17,9 +17,12 @@ package ledger import ( + "context" + "database/sql" "errors" "fmt" "testing" + "time" "github.com/stretchr/testify/require" @@ -29,10 +32,12 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/ledger/store/blockdb" ledgertesting "github.com/algorand/go-algorand/ledger/testing" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/algorand/go-algorand/util/db" ) func randomBlock(r basics.Round) blockEntry { @@ -128,3 +133,113 @@ func TestGetEncodedBlockCert(t *testing.T) { expectedErr := &ledgercore.ErrNoEntry{} require.True(t, errors.As(err, expectedErr)) } + +// it is not great to use trackers here but at the moment there is no abstraction for the ledger +type uptoTracker struct { + emptyTracker +} + +// committedUpTo in the emptyTracker just stores the committed round. +func (t *uptoTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { + return 5_000, basics.Round(0) +} + +// TestBlockQueueSyncerDeletion ensures that the block queue syncer deletes no more than maxDeletionBatchSize blocks at time +func TestBlockQueueSyncerDeletion(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + tests := []struct { + name string + expectedEarliest basics.Round + tracker ledgerTracker + }{ + {"max_batch", maxDeletionBatchSize, nil}, // no trackers, max deletion + {"5k_tracker", 5_000, &uptoTracker{}}, // tracker sets minToSave to 5k + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + const dbMem = true + blockDBs, err := db.OpenPair(t.Name()+".block.sqlite", dbMem) + require.NoError(t, err) + + log := logging.TestingLog(t) + err = blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + return initBlocksDB(tx, log, []bookkeeping.Block{}, false) + }) + require.NoError(t, err) + + // add 15k blocks + const maxBlocks = maxDeletionBatchSize + maxDeletionBatchSize/2 // 15_000 + err = blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + for i := 0; i < maxBlocks; i++ { + err0 := blockdb.BlockPut( + tx, + bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: basics.Round(i)}}, + agreement.Certificate{}) + if err0 != nil { + return err0 + } + } + return nil + }) + require.NoError(t, err) + + var earliest, latest basics.Round + err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + var err0 error + earliest, err0 = blockdb.BlockEarliest(tx) + if err0 != nil { + return err0 + } + latest, err0 = blockdb.BlockLatest(tx) + return err0 + }) + require.NoError(t, err) + require.Equal(t, basics.Round(0), earliest) + require.Equal(t, basics.Round(maxBlocks-1), latest) + + // trigger deletion and ensure no more than 10k blocks gone + //make a minimal ledger for blockqueue + + l := &Ledger{ + log: log, + blockDBs: blockDBs, + } + if test.tracker != nil { + l.trackers.trackers = append(l.trackers.trackers, test.tracker) + } + blockq, _ := newBlockQueue(l) + err = blockq.start() + require.NoError(t, err) + + // add a block. Eventually the syncer will called on an empty ledger + // forcing deleting all 15_000 rounds. The deletion scoping should limit it to 10_000 rounds instead + err = blockq.putBlock(bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: maxBlocks}}, agreement.Certificate{}) + require.NoError(t, err) + + require.Eventually(t, func() bool { + var latest basics.Round + err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + var err0 error + latest, err0 = blockdb.BlockLatest(tx) + return err0 + }) + require.NoError(t, err) + return latest == maxBlocks + }, 1*time.Second, 10*time.Millisecond) + + blockq.stop() + + err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + var err0 error + earliest, err0 = blockdb.BlockEarliest(tx) + return err0 + }) + require.NoError(t, err) + require.Equal(t, test.expectedEarliest, earliest) + }) + } +} diff --git a/ledger/ledger.go b/ledger/ledger.go index d5436ad5d..fa1be1a76 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -173,7 +173,7 @@ func OpenLedger[T string | DirsAndPrefix]( start := time.Now() ledgerInitblocksdbCount.Inc(nil) err = l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - return initBlocksDB(tx, l, []bookkeeping.Block{genesisInitState.Block}, cfg.Archival) + return initBlocksDB(tx, l.log, []bookkeeping.Block{genesisInitState.Block}, cfg.Archival) }) ledgerInitblocksdbMicros.AddMicrosecondsSince(start, nil) if err != nil { @@ -364,7 +364,7 @@ func (l *Ledger) setSynchronousMode(ctx context.Context, synchronousMode db.Sync // initBlocksDB performs DB initialization: // - creates and populates it with genesis blocks // - ensures DB is in good shape for archival mode and resets it if not -func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchival bool) (err error) { +func initBlocksDB(tx *sql.Tx, log logging.Logger, initBlocks []bookkeeping.Block, isArchival bool) (err error) { err = blockdb.BlockInit(tx, initBlocks) if err != nil { err = fmt.Errorf("initBlocksDB.blockInit %v", err) @@ -382,7 +382,7 @@ func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchi // Detect possible problem - archival node needs all block but have only subsequence of them // So reset the DB and init it again if earliest != basics.Round(0) { - l.log.Warnf("resetting blocks DB (earliest block is %v)", earliest) + log.Warnf("resetting blocks DB (earliest block is %v)", earliest) err := blockdb.BlockResetDB(tx) if err != nil { err = fmt.Errorf("initBlocksDB.blockResetDB %v", err) diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index 79ae16284..ab5bc293a 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -2877,11 +2877,13 @@ func testVotersReloadFromDiskAfterOneStateProofCommitted(t *testing.T, cfg confi } triggerDeleteVoters(t, l, genesisInitState) + l.acctsOnline.voters.votersMu.Lock() vtSnapshot := l.acctsOnline.voters.votersForRoundCache // verifying that the tree for round 512 is still in the cache, but the tree for round 256 is evicted. require.Contains(t, vtSnapshot, basics.Round(496)) require.NotContains(t, vtSnapshot, basics.Round(240)) + l.acctsOnline.voters.votersMu.Unlock() err = l.reloadLedger() require.NoError(t, err) |