diff options
Diffstat (limited to 'data/ledger_test.go')
-rw-r--r-- | data/ledger_test.go | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/data/ledger_test.go b/data/ledger_test.go index 29456608b..dc50147db 100644 --- a/data/ledger_test.go +++ b/data/ledger_test.go @@ -17,6 +17,9 @@ package data import ( + "context" + "fmt" + "sync" "testing" "github.com/stretchr/testify/require" @@ -32,6 +35,7 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/algorand/go-algorand/util/execpool" ) var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} @@ -420,3 +424,240 @@ func TestConsensusVersion(t *testing.T) { require.Equal(t, protocol.ConsensusVersion(""), ver) require.Equal(t, ledgercore.ErrNoEntry{Round: basics.Round(blk.BlockHeader.NextProtocolSwitchOn + 1), Latest: basics.Round(blk.BlockHeader.Round), Committed: basics.Round(blk.BlockHeader.Round)}, err) } + +type loggedMessages struct { + logging.Logger + expectedMessages chan string + unexpectedMessages chan string +} + +func (lm loggedMessages) Debug(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Debugf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.expectedMessages <- m +} +func (lm loggedMessages) Info(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Infof(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Warn(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Warnf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Error(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Errorf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} + +// TestLedgerErrorValidate creates 3 parallel routines adding blocks to the ledger through different interfaces. +// The purpose here is to simulate the scenario where the catchup and the agreement compete to add blocks to the ledger. +// The error messages reported can be excessive or unnecessary. This test evaluates what messages are generate and at what frequency. +func TestLedgerErrorValidate(t *testing.T) { + partitiontest.PartitionTest(t) + + var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36} + + proto, _ := config.Consensus[protocol.ConsensusCurrentVersion] + origProto := proto + defer func() { + config.Consensus[protocol.ConsensusCurrentVersion] = origProto + }() + proto.MinBalance = 0 + config.Consensus[protocol.ConsensusCurrentVersion] = proto + + blk := bookkeeping.Block{} + blk.CurrentProtocol = protocol.ConsensusCurrentVersion + blk.RewardsPool = testPoolAddr + blk.FeeSink = testSinkAddr + blk.BlockHeader.GenesisHash = crypto.Hash([]byte(t.Name())) + + accts := make(map[basics.Address]basics.AccountData) + accts[testPoolAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0}) + accts[testSinkAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0}) + + genesisInitState := ledgercore.InitState{ + Accounts: accts, + Block: blk, + GenesisHash: crypto.Hash([]byte(t.Name())), + } + + expectedMessages := make(chan string, 100) + unexpectedMessages := make(chan string, 100) + + const inMem = true + cfg := config.GetDefaultLocal() + cfg.Archival = true + log := loggedMessages{Logger: logging.TestingLog(t), expectedMessages: expectedMessages, unexpectedMessages: unexpectedMessages} + log.SetLevel(logging.Debug) + realLedger, err := ledger.OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + require.NoError(t, err, "could not open ledger") + defer realLedger.Close() + + l := Ledger{Ledger: realLedger, log: log} + l.log.SetLevel(logging.Debug) + require.NotNil(t, &l) + + totalsRound, _, err := realLedger.LatestTotals() + require.NoError(t, err) + require.Equal(t, basics.Round(0), totalsRound) + + errChan := make(chan error, 1) + defer close(errChan) + + wg := sync.WaitGroup{} + defer wg.Wait() + + blkChan1 := make(chan bookkeeping.Block, 10) + blkChan2 := make(chan bookkeeping.Block, 10) + blkChan3 := make(chan bookkeeping.Block, 10) + defer close(blkChan1) + defer close(blkChan2) + defer close(blkChan3) + + // Add blocks to the ledger via EnsureValidatedBlock. This calls AddValidatedBlock, which simply + // passes the block to blockQueue. The returned error is handled by EnsureValidatedBlock, which reports + // in the form of logged error message. + go func() { + wg.Add(1) + i := 0 + for blk := range blkChan1 { + i++ + vb, err := validatedBlock(l.Ledger, blk) + if err != nil { + // AddBlock already added the block + // This is okay to ignore. + // This error is generated from ledger.Ledger Validate function, used from: + // - node blockValidatorImpl Validate + // - catchup service s.ledger.Validate (Catchup service returns after the first error) + continue + } + l.EnsureValidatedBlock(vb, agreement.Certificate{}) + } + wg.Done() + }() + + // Add blocks to the ledger via EnsureBlock. This basically calls AddBlock, but handles + // the errors by logging them. Checking the logged messages to verify its behavior. + go func() { + wg.Add(1) + i := 0 + for blk := range blkChan2 { + i++ + l.EnsureBlock(&blk, agreement.Certificate{}) + } + wg.Done() + }() + + // Add blocks directly to the ledger + go func() { + wg.Add(1) + i := 0 + for blk := range blkChan3 { + i++ + err := l.AddBlock(blk, agreement.Certificate{}) + // AddBlock is used in 2 places: + // - data.ledger.EnsureBlock which reports a log message as Error or Debug + // - catchup.service.fetchAndWrite which leads to interrupting catchup or skiping the round + if err != nil { + switch err.(type) { + // The following two cases are okay to ignore, since these are expected and handled + case ledgercore.BlockInLedgerError: + case ledgercore.ErrNonSequentialBlockEval: + continue + default: + // Make sure unexpected error is not obtained here + errChan <- err + } + } + l.WaitForCommit(blk.BlockHeader.Round) + } + wg.Done() + }() + + // flush the messages output during the setup + more := true + for more { + select { + case <-expectedMessages: + case <-unexpectedMessages: + default: + more = false + } + } + + for rnd := basics.Round(1); rnd <= basics.Round(2000); rnd++ { + blk, err := getEmptyBlock(rnd-1, l.Ledger, t.Name(), genesisInitState.Accounts) + require.NoError(t, err) + blkChan3 <- blk + blkChan2 <- blk + blkChan1 <- blk + + more = true + for more { + select { + case err := <-errChan: + require.NoError(t, err) + case <-expectedMessages: + // only debug messages should be reported + case um := <-unexpectedMessages: + require.Empty(t, um, um) + default: + more = false + } + } + } +} + +func validatedBlock(l *ledger.Ledger, blk bookkeeping.Block) (vb *ledgercore.ValidatedBlock, err error) { + backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) + defer backlogPool.Shutdown() + vb, err = l.Validate(context.Background(), blk, backlogPool) + return +} + +func getEmptyBlock(afterRound basics.Round, l *ledger.Ledger, genesisID string, initAccounts map[basics.Address]basics.AccountData) (blk bookkeeping.Block, err error) { + l.WaitForCommit(afterRound) + + lastBlock, err := l.Block(l.Latest()) + if err != nil { + return + } + + proto := config.Consensus[lastBlock.CurrentProtocol] + blk.BlockHeader = bookkeeping.BlockHeader{ + GenesisID: genesisID, + Round: l.Latest() + 1, + Branch: lastBlock.Hash(), + TimeStamp: 0, + } + + if proto.SupportGenesisHash { + blk.BlockHeader.GenesisHash = crypto.Hash([]byte(genesisID)) + } + + blk.RewardsPool = testPoolAddr + blk.FeeSink = testSinkAddr + blk.CurrentProtocol = lastBlock.CurrentProtocol + + blk.TxnRoot, err = blk.PaysetCommit() + if err != nil { + return + } + return +} |