From 9d494736afc5f0ff46bffd7505d3a56b8e0ce305 Mon Sep 17 00:00:00 2001 From: Shant Karakashian <55754073+algonautshant@users.noreply.github.com> Date: Wed, 19 Jan 2022 19:18:51 -0500 Subject: avoid generating log error on EnsureValidatedBlock / EnsureBlock (#3424) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In EnsureBlock, do not log an error message if the error is ledgercore.ErrNonSequentialBlockEval and the block round is in the past (i.e. already in the ledger). --- catchup/service.go | 6 ++ data/ledger.go | 20 +++-- data/ledger_test.go | 241 ++++++++++++++++++++++++++++++++++++++++++++++++++++ ledger/ledger.go | 7 +- 4 files changed, 265 insertions(+), 9 deletions(-) diff --git a/catchup/service.go b/catchup/service.go index 4005316d5..f022a3dcc 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -345,6 +345,12 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, // if the context expired, just exit. return false } + if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound { + // the block was added to the ledger from elsewhere after fetching it here + // only the agreement could have added this block into the ledger, catchup is complete + s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. 
The catchup is complete", r) + return false + } s.log.Warnf("fetchAndWrite(%d): failed to validate block : %v", r, err) return false } diff --git a/data/ledger.go b/data/ledger.go index f5d851082..8fc03cb6e 100644 --- a/data/ledger.go +++ b/data/ledger.go @@ -325,14 +325,16 @@ func (l *Ledger) EnsureValidatedBlock(vb *ledgercore.ValidatedBlock, c agreement break } - logfn := logging.Base().Errorf + logfn := l.log.Errorf switch err.(type) { case ledgercore.BlockInLedgerError: - logfn = logging.Base().Debugf + // If the block is already in the ledger (catchup and agreement might be competing), + // reporting this as a debug message is sufficient. + logfn = l.log.Debugf + // Otherwise, the error is because the block is in the future. Error is logged. } - - logfn("could not write block %d to the ledger: %v", round, err) + logfn("data.EnsureValidatedBlock: could not write block %d to the ledger: %v", round, err) } } @@ -353,14 +355,16 @@ func (l *Ledger) EnsureBlock(block *bookkeeping.Block, c agreement.Certificate) switch err.(type) { case protocol.Error: if !protocolErrorLogged { - logging.Base().Errorf("unrecoverable protocol error detected at block %d: %v", round, err) + l.log.Errorf("data.EnsureBlock: unrecoverable protocol error detected at block %d: %v", round, err) protocolErrorLogged = true } case ledgercore.BlockInLedgerError: - logging.Base().Debugf("could not write block %d to the ledger: %v", round, err) - return // this error implies that l.LastRound() >= round + // The block is already in the ledger. Catchup and agreement could be competing + // It is sufficient to report this as a Debug message + l.log.Debugf("data.EnsureBlock: could not write block %d to the ledger: %v", round, err) + return default: - logging.Base().Errorf("could not write block %d to the ledger: %v", round, err) + l.log.Errorf("data.EnsureBlock: could not write block %d to the ledger: %v", round, err) } // If there was an error add a short delay before the next attempt. 
diff --git a/data/ledger_test.go b/data/ledger_test.go index 29456608b..dc50147db 100644 --- a/data/ledger_test.go +++ b/data/ledger_test.go @@ -17,6 +17,9 @@ package data import ( + "context" + "fmt" + "sync" "testing" "github.com/stretchr/testify/require" @@ -32,6 +35,7 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/algorand/go-algorand/util/execpool" ) var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} @@ -420,3 +424,240 @@ func TestConsensusVersion(t *testing.T) { require.Equal(t, protocol.ConsensusVersion(""), ver) require.Equal(t, ledgercore.ErrNoEntry{Round: basics.Round(blk.BlockHeader.NextProtocolSwitchOn + 1), Latest: basics.Round(blk.BlockHeader.Round), Committed: basics.Round(blk.BlockHeader.Round)}, err) } + +type loggedMessages struct { + logging.Logger + expectedMessages chan string + unexpectedMessages chan string +} + +func (lm loggedMessages) Debug(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Debugf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.expectedMessages <- m +} +func (lm loggedMessages) Info(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Infof(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Warn(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Warnf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Error(args ...interface{}) { + m := fmt.Sprint(args...) 
+ lm.unexpectedMessages <- m +} +func (lm loggedMessages) Errorf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} + +// TestLedgerErrorValidate creates 3 parallel routines adding blocks to the ledger through different interfaces. +// The purpose here is to simulate the scenario where the catchup and the agreement compete to add blocks to the ledger. +// The error messages reported can be excessive or unnecessary. This test evaluates what messages are generated and at what frequency. +func TestLedgerErrorValidate(t *testing.T) { + partitiontest.PartitionTest(t) + + var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36} + + proto, _ := config.Consensus[protocol.ConsensusCurrentVersion] + origProto := proto + defer func() { + config.Consensus[protocol.ConsensusCurrentVersion] = origProto + }() + proto.MinBalance = 0 + config.Consensus[protocol.ConsensusCurrentVersion] = proto + + blk := bookkeeping.Block{} + blk.CurrentProtocol = protocol.ConsensusCurrentVersion + blk.RewardsPool = testPoolAddr + blk.FeeSink = testSinkAddr + blk.BlockHeader.GenesisHash = crypto.Hash([]byte(t.Name())) + + accts := make(map[basics.Address]basics.AccountData) + accts[testPoolAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0}) + accts[testSinkAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0}) + + genesisInitState := ledgercore.InitState{ + Accounts: accts, + Block: blk, + GenesisHash: crypto.Hash([]byte(t.Name())), + } + + expectedMessages := make(chan string, 100) + unexpectedMessages := 
make(chan string, 100) + + const inMem = true + cfg := config.GetDefaultLocal() + cfg.Archival = true + log := loggedMessages{Logger: logging.TestingLog(t), expectedMessages: expectedMessages, unexpectedMessages: unexpectedMessages} + log.SetLevel(logging.Debug) + realLedger, err := ledger.OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + require.NoError(t, err, "could not open ledger") + defer realLedger.Close() + + l := Ledger{Ledger: realLedger, log: log} + l.log.SetLevel(logging.Debug) + require.NotNil(t, &l) + + totalsRound, _, err := realLedger.LatestTotals() + require.NoError(t, err) + require.Equal(t, basics.Round(0), totalsRound) + + errChan := make(chan error, 1) + defer close(errChan) + + wg := sync.WaitGroup{} + defer wg.Wait() + + blkChan1 := make(chan bookkeeping.Block, 10) + blkChan2 := make(chan bookkeeping.Block, 10) + blkChan3 := make(chan bookkeeping.Block, 10) + defer close(blkChan1) + defer close(blkChan2) + defer close(blkChan3) + + // Add blocks to the ledger via EnsureValidatedBlock. This calls AddValidatedBlock, which simply + // passes the block to blockQueue. The returned error is handled by EnsureValidatedBlock, which reports + // in the form of logged error message. + go func() { + wg.Add(1) + i := 0 + for blk := range blkChan1 { + i++ + vb, err := validatedBlock(l.Ledger, blk) + if err != nil { + // AddBlock already added the block + // This is okay to ignore. + // This error is generated from ledger.Ledger Validate function, used from: + // - node blockValidatorImpl Validate + // - catchup service s.ledger.Validate (Catchup service returns after the first error) + continue + } + l.EnsureValidatedBlock(vb, agreement.Certificate{}) + } + wg.Done() + }() + + // Add blocks to the ledger via EnsureBlock. This basically calls AddBlock, but handles + // the errors by logging them. Checking the logged messages to verify its behavior. 
+ go func() { + wg.Add(1) + i := 0 + for blk := range blkChan2 { + i++ + l.EnsureBlock(&blk, agreement.Certificate{}) + } + wg.Done() + }() + + // Add blocks directly to the ledger + go func() { + wg.Add(1) + i := 0 + for blk := range blkChan3 { + i++ + err := l.AddBlock(blk, agreement.Certificate{}) + // AddBlock is used in 2 places: + // - data.ledger.EnsureBlock which reports a log message as Error or Debug + // - catchup.service.fetchAndWrite which leads to interrupting catchup or skiping the round + if err != nil { + switch err.(type) { + // The following two cases are okay to ignore, since these are expected and handled + case ledgercore.BlockInLedgerError: + case ledgercore.ErrNonSequentialBlockEval: + continue + default: + // Make sure unexpected error is not obtained here + errChan <- err + } + } + l.WaitForCommit(blk.BlockHeader.Round) + } + wg.Done() + }() + + // flush the messages output during the setup + more := true + for more { + select { + case <-expectedMessages: + case <-unexpectedMessages: + default: + more = false + } + } + + for rnd := basics.Round(1); rnd <= basics.Round(2000); rnd++ { + blk, err := getEmptyBlock(rnd-1, l.Ledger, t.Name(), genesisInitState.Accounts) + require.NoError(t, err) + blkChan3 <- blk + blkChan2 <- blk + blkChan1 <- blk + + more = true + for more { + select { + case err := <-errChan: + require.NoError(t, err) + case <-expectedMessages: + // only debug messages should be reported + case um := <-unexpectedMessages: + require.Empty(t, um, um) + default: + more = false + } + } + } +} + +func validatedBlock(l *ledger.Ledger, blk bookkeeping.Block) (vb *ledgercore.ValidatedBlock, err error) { + backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) + defer backlogPool.Shutdown() + vb, err = l.Validate(context.Background(), blk, backlogPool) + return +} + +func getEmptyBlock(afterRound basics.Round, l *ledger.Ledger, genesisID string, initAccounts map[basics.Address]basics.AccountData) (blk bookkeeping.Block, 
err error) { + l.WaitForCommit(afterRound) + + lastBlock, err := l.Block(l.Latest()) + if err != nil { + return + } + + proto := config.Consensus[lastBlock.CurrentProtocol] + blk.BlockHeader = bookkeeping.BlockHeader{ + GenesisID: genesisID, + Round: l.Latest() + 1, + Branch: lastBlock.Hash(), + TimeStamp: 0, + } + + if proto.SupportGenesisHash { + blk.BlockHeader.GenesisHash = crypto.Hash([]byte(genesisID)) + } + + blk.RewardsPool = testPoolAddr + blk.FeeSink = testSinkAddr + blk.CurrentProtocol = lastBlock.CurrentProtocol + + blk.TxnRoot, err = blk.PaysetCommit() + if err != nil { + return + } + return +} diff --git a/ledger/ledger.go b/ledger/ledger.go index 02d5516af..cf5476e1b 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -578,6 +578,11 @@ func (l *Ledger) AddBlock(blk bookkeeping.Block, cert agreement.Certificate) err updates, err := internal.Eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil) if err != nil { + if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound { + return ledgercore.BlockInLedgerError{ + LastRound: errNSBE.EvaluatorRound, + NextRound: errNSBE.LatestRound + 1} + } return err } vb := ledgercore.MakeValidatedBlock(blk, updates) @@ -602,7 +607,7 @@ func (l *Ledger) AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement. } l.headerCache.Put(blk.Round(), blk.BlockHeader) l.trackers.newBlock(blk, vb.Delta()) - l.log.Debugf("added blk %d", blk.Round()) + l.log.Debugf("ledger.AddValidatedBlock: added blk %d", blk.Round()) return nil } -- cgit v1.2.3