summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShant Karakashian <55754073+algonautshant@users.noreply.github.com>2022-01-19 19:18:51 -0500
committerGitHub <noreply@github.com>2022-01-19 19:18:51 -0500
commit9d494736afc5f0ff46bffd7505d3a56b8e0ce305 (patch)
tree99e3cbe758361f85374f97dbe135cee716b1e669
parent82eabffaa24b591554c2d83b36d8003ccf555a89 (diff)
avoid generating log error on EnsureValidatedBlock / EnsureBlock (#3424)
In EnsureBlock,, do not log as error message if the error is ledgercore.ErrNonSequentialBlockEval and the block round is in the past (i.e. already in the ledger).
-rw-r--r--catchup/service.go6
-rw-r--r--data/ledger.go20
-rw-r--r--data/ledger_test.go241
-rw-r--r--ledger/ledger.go7
4 files changed, 265 insertions, 9 deletions
diff --git a/catchup/service.go b/catchup/service.go
index 4005316d5..f022a3dcc 100644
--- a/catchup/service.go
+++ b/catchup/service.go
@@ -345,6 +345,12 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
// if the context expired, just exit.
return false
}
+ if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
+ // the block was added to the ledger from elsewhere after fetching it here
+ // only the agreement could have added this block into the ledger, catchup is complete
+ s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
+ return false
+ }
s.log.Warnf("fetchAndWrite(%d): failed to validate block : %v", r, err)
return false
}
diff --git a/data/ledger.go b/data/ledger.go
index f5d851082..8fc03cb6e 100644
--- a/data/ledger.go
+++ b/data/ledger.go
@@ -325,14 +325,16 @@ func (l *Ledger) EnsureValidatedBlock(vb *ledgercore.ValidatedBlock, c agreement
break
}
- logfn := logging.Base().Errorf
+ logfn := l.log.Errorf
switch err.(type) {
case ledgercore.BlockInLedgerError:
- logfn = logging.Base().Debugf
+ // If the block is already in the ledger (catchup and agreement might be competing),
+ // reporting this as a debug message is sufficient.
+ logfn = l.log.Debugf
+ // Otherwise, the error is because the block is in the future. Error is logged.
}
-
- logfn("could not write block %d to the ledger: %v", round, err)
+ logfn("data.EnsureValidatedBlock: could not write block %d to the ledger: %v", round, err)
}
}
@@ -353,14 +355,16 @@ func (l *Ledger) EnsureBlock(block *bookkeeping.Block, c agreement.Certificate)
switch err.(type) {
case protocol.Error:
if !protocolErrorLogged {
- logging.Base().Errorf("unrecoverable protocol error detected at block %d: %v", round, err)
+ l.log.Errorf("data.EnsureBlock: unrecoverable protocol error detected at block %d: %v", round, err)
protocolErrorLogged = true
}
case ledgercore.BlockInLedgerError:
- logging.Base().Debugf("could not write block %d to the ledger: %v", round, err)
- return // this error implies that l.LastRound() >= round
+ // The block is already in the ledger. Catchup and agreement could be competing
+ // It is sufficient to report this as a Debug message
+ l.log.Debugf("data.EnsureBlock: could not write block %d to the ledger: %v", round, err)
+ return
default:
- logging.Base().Errorf("could not write block %d to the ledger: %v", round, err)
+ l.log.Errorf("data.EnsureBlock: could not write block %d to the ledger: %v", round, err)
}
// If there was an error add a short delay before the next attempt.
diff --git a/data/ledger_test.go b/data/ledger_test.go
index 29456608b..dc50147db 100644
--- a/data/ledger_test.go
+++ b/data/ledger_test.go
@@ -17,6 +17,9 @@
package data
import (
+ "context"
+ "fmt"
+ "sync"
"testing"
"github.com/stretchr/testify/require"
@@ -32,6 +35,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
+ "github.com/algorand/go-algorand/util/execpool"
)
var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
@@ -420,3 +424,240 @@ func TestConsensusVersion(t *testing.T) {
require.Equal(t, protocol.ConsensusVersion(""), ver)
require.Equal(t, ledgercore.ErrNoEntry{Round: basics.Round(blk.BlockHeader.NextProtocolSwitchOn + 1), Latest: basics.Round(blk.BlockHeader.Round), Committed: basics.Round(blk.BlockHeader.Round)}, err)
}
+
+type loggedMessages struct {
+ logging.Logger
+ expectedMessages chan string
+ unexpectedMessages chan string
+}
+
+func (lm loggedMessages) Debug(args ...interface{}) {
+ m := fmt.Sprint(args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Debugf(s string, args ...interface{}) {
+ m := fmt.Sprintf(s, args...)
+ lm.expectedMessages <- m
+}
+func (lm loggedMessages) Info(args ...interface{}) {
+ m := fmt.Sprint(args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Infof(s string, args ...interface{}) {
+ m := fmt.Sprintf(s, args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Warn(args ...interface{}) {
+ m := fmt.Sprint(args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Warnf(s string, args ...interface{}) {
+ m := fmt.Sprintf(s, args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Error(args ...interface{}) {
+ m := fmt.Sprint(args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Errorf(s string, args ...interface{}) {
+ m := fmt.Sprintf(s, args...)
+ lm.unexpectedMessages <- m
+}
+
+// TestLedgerErrorValidate creates 3 parallel routines adding blocks to the ledger through different interfaces.
+// The purpose here is to simulate the scenario where the catchup and the agreement compete to add blocks to the ledger.
+// The error messages reported can be excessive or unnecessary. This test evaluates what messages are generate and at what frequency.
+func TestLedgerErrorValidate(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
+ var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36}
+
+ proto, _ := config.Consensus[protocol.ConsensusCurrentVersion]
+ origProto := proto
+ defer func() {
+ config.Consensus[protocol.ConsensusCurrentVersion] = origProto
+ }()
+ proto.MinBalance = 0
+ config.Consensus[protocol.ConsensusCurrentVersion] = proto
+
+ blk := bookkeeping.Block{}
+ blk.CurrentProtocol = protocol.ConsensusCurrentVersion
+ blk.RewardsPool = testPoolAddr
+ blk.FeeSink = testSinkAddr
+ blk.BlockHeader.GenesisHash = crypto.Hash([]byte(t.Name()))
+
+ accts := make(map[basics.Address]basics.AccountData)
+ accts[testPoolAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0})
+ accts[testSinkAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0})
+
+ genesisInitState := ledgercore.InitState{
+ Accounts: accts,
+ Block: blk,
+ GenesisHash: crypto.Hash([]byte(t.Name())),
+ }
+
+ expectedMessages := make(chan string, 100)
+ unexpectedMessages := make(chan string, 100)
+
+ const inMem = true
+ cfg := config.GetDefaultLocal()
+ cfg.Archival = true
+ log := loggedMessages{Logger: logging.TestingLog(t), expectedMessages: expectedMessages, unexpectedMessages: unexpectedMessages}
+ log.SetLevel(logging.Debug)
+ realLedger, err := ledger.OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
+ require.NoError(t, err, "could not open ledger")
+ defer realLedger.Close()
+
+ l := Ledger{Ledger: realLedger, log: log}
+ l.log.SetLevel(logging.Debug)
+ require.NotNil(t, &l)
+
+ totalsRound, _, err := realLedger.LatestTotals()
+ require.NoError(t, err)
+ require.Equal(t, basics.Round(0), totalsRound)
+
+ errChan := make(chan error, 1)
+ defer close(errChan)
+
+ wg := sync.WaitGroup{}
+ defer wg.Wait()
+
+ blkChan1 := make(chan bookkeeping.Block, 10)
+ blkChan2 := make(chan bookkeeping.Block, 10)
+ blkChan3 := make(chan bookkeeping.Block, 10)
+ defer close(blkChan1)
+ defer close(blkChan2)
+ defer close(blkChan3)
+
+ // Add blocks to the ledger via EnsureValidatedBlock. This calls AddValidatedBlock, which simply
+ // passes the block to blockQueue. The returned error is handled by EnsureValidatedBlock, which reports
+ // in the form of logged error message.
+ go func() {
+ wg.Add(1)
+ i := 0
+ for blk := range blkChan1 {
+ i++
+ vb, err := validatedBlock(l.Ledger, blk)
+ if err != nil {
+ // AddBlock already added the block
+ // This is okay to ignore.
+ // This error is generated from ledger.Ledger Validate function, used from:
+ // - node blockValidatorImpl Validate
+ // - catchup service s.ledger.Validate (Catchup service returns after the first error)
+ continue
+ }
+ l.EnsureValidatedBlock(vb, agreement.Certificate{})
+ }
+ wg.Done()
+ }()
+
+ // Add blocks to the ledger via EnsureBlock. This basically calls AddBlock, but handles
+ // the errors by logging them. Checking the logged messages to verify its behavior.
+ go func() {
+ wg.Add(1)
+ i := 0
+ for blk := range blkChan2 {
+ i++
+ l.EnsureBlock(&blk, agreement.Certificate{})
+ }
+ wg.Done()
+ }()
+
+ // Add blocks directly to the ledger
+ go func() {
+ wg.Add(1)
+ i := 0
+ for blk := range blkChan3 {
+ i++
+ err := l.AddBlock(blk, agreement.Certificate{})
+ // AddBlock is used in 2 places:
+ // - data.ledger.EnsureBlock which reports a log message as Error or Debug
+ // - catchup.service.fetchAndWrite which leads to interrupting catchup or skiping the round
+ if err != nil {
+ switch err.(type) {
+ // The following two cases are okay to ignore, since these are expected and handled
+ case ledgercore.BlockInLedgerError:
+ case ledgercore.ErrNonSequentialBlockEval:
+ continue
+ default:
+ // Make sure unexpected error is not obtained here
+ errChan <- err
+ }
+ }
+ l.WaitForCommit(blk.BlockHeader.Round)
+ }
+ wg.Done()
+ }()
+
+ // flush the messages output during the setup
+ more := true
+ for more {
+ select {
+ case <-expectedMessages:
+ case <-unexpectedMessages:
+ default:
+ more = false
+ }
+ }
+
+ for rnd := basics.Round(1); rnd <= basics.Round(2000); rnd++ {
+ blk, err := getEmptyBlock(rnd-1, l.Ledger, t.Name(), genesisInitState.Accounts)
+ require.NoError(t, err)
+ blkChan3 <- blk
+ blkChan2 <- blk
+ blkChan1 <- blk
+
+ more = true
+ for more {
+ select {
+ case err := <-errChan:
+ require.NoError(t, err)
+ case <-expectedMessages:
+ // only debug messages should be reported
+ case um := <-unexpectedMessages:
+ require.Empty(t, um, um)
+ default:
+ more = false
+ }
+ }
+ }
+}
+
+func validatedBlock(l *ledger.Ledger, blk bookkeeping.Block) (vb *ledgercore.ValidatedBlock, err error) {
+ backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
+ defer backlogPool.Shutdown()
+ vb, err = l.Validate(context.Background(), blk, backlogPool)
+ return
+}
+
+func getEmptyBlock(afterRound basics.Round, l *ledger.Ledger, genesisID string, initAccounts map[basics.Address]basics.AccountData) (blk bookkeeping.Block, err error) {
+ l.WaitForCommit(afterRound)
+
+ lastBlock, err := l.Block(l.Latest())
+ if err != nil {
+ return
+ }
+
+ proto := config.Consensus[lastBlock.CurrentProtocol]
+ blk.BlockHeader = bookkeeping.BlockHeader{
+ GenesisID: genesisID,
+ Round: l.Latest() + 1,
+ Branch: lastBlock.Hash(),
+ TimeStamp: 0,
+ }
+
+ if proto.SupportGenesisHash {
+ blk.BlockHeader.GenesisHash = crypto.Hash([]byte(genesisID))
+ }
+
+ blk.RewardsPool = testPoolAddr
+ blk.FeeSink = testSinkAddr
+ blk.CurrentProtocol = lastBlock.CurrentProtocol
+
+ blk.TxnRoot, err = blk.PaysetCommit()
+ if err != nil {
+ return
+ }
+ return
+}
diff --git a/ledger/ledger.go b/ledger/ledger.go
index 02d5516af..cf5476e1b 100644
--- a/ledger/ledger.go
+++ b/ledger/ledger.go
@@ -578,6 +578,11 @@ func (l *Ledger) AddBlock(blk bookkeeping.Block, cert agreement.Certificate) err
updates, err := internal.Eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil)
if err != nil {
+ if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
+ return ledgercore.BlockInLedgerError{
+ LastRound: errNSBE.EvaluatorRound,
+ NextRound: errNSBE.LatestRound + 1}
+ }
return err
}
vb := ledgercore.MakeValidatedBlock(blk, updates)
@@ -602,7 +607,7 @@ func (l *Ledger) AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.
}
l.headerCache.Put(blk.Round(), blk.BlockHeader)
l.trackers.newBlock(blk, vb.Delta())
- l.log.Debugf("added blk %d", blk.Round())
+ l.log.Debugf("ledger.AddValidatedBlock: added blk %d", blk.Round())
return nil
}