summaryrefslogtreecommitdiff
path: root/data/ledger_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'data/ledger_test.go')
-rw-r--r--data/ledger_test.go241
1 files changed, 241 insertions, 0 deletions
diff --git a/data/ledger_test.go b/data/ledger_test.go
index 29456608b..dc50147db 100644
--- a/data/ledger_test.go
+++ b/data/ledger_test.go
@@ -17,6 +17,9 @@
package data
import (
+ "context"
+ "fmt"
+ "sync"
"testing"
"github.com/stretchr/testify/require"
@@ -32,6 +35,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
+ "github.com/algorand/go-algorand/util/execpool"
)
var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
@@ -420,3 +424,240 @@ func TestConsensusVersion(t *testing.T) {
require.Equal(t, protocol.ConsensusVersion(""), ver)
require.Equal(t, ledgercore.ErrNoEntry{Round: basics.Round(blk.BlockHeader.NextProtocolSwitchOn + 1), Latest: basics.Round(blk.BlockHeader.Round), Committed: basics.Round(blk.BlockHeader.Round)}, err)
}
+
+type loggedMessages struct {
+ logging.Logger
+ expectedMessages chan string
+ unexpectedMessages chan string
+}
+
+func (lm loggedMessages) Debug(args ...interface{}) {
+ m := fmt.Sprint(args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Debugf(s string, args ...interface{}) {
+ m := fmt.Sprintf(s, args...)
+ lm.expectedMessages <- m
+}
+func (lm loggedMessages) Info(args ...interface{}) {
+ m := fmt.Sprint(args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Infof(s string, args ...interface{}) {
+ m := fmt.Sprintf(s, args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Warn(args ...interface{}) {
+ m := fmt.Sprint(args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Warnf(s string, args ...interface{}) {
+ m := fmt.Sprintf(s, args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Error(args ...interface{}) {
+ m := fmt.Sprint(args...)
+ lm.unexpectedMessages <- m
+}
+func (lm loggedMessages) Errorf(s string, args ...interface{}) {
+ m := fmt.Sprintf(s, args...)
+ lm.unexpectedMessages <- m
+}
+
+// TestLedgerErrorValidate creates 3 parallel routines adding blocks to the ledger through different interfaces.
+// The purpose here is to simulate the scenario where the catchup and the agreement compete to add blocks to the ledger.
+// The error messages reported can be excessive or unnecessary. This test evaluates what messages are generate and at what frequency.
+func TestLedgerErrorValidate(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
+ var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36}
+
+ proto, _ := config.Consensus[protocol.ConsensusCurrentVersion]
+ origProto := proto
+ defer func() {
+ config.Consensus[protocol.ConsensusCurrentVersion] = origProto
+ }()
+ proto.MinBalance = 0
+ config.Consensus[protocol.ConsensusCurrentVersion] = proto
+
+ blk := bookkeeping.Block{}
+ blk.CurrentProtocol = protocol.ConsensusCurrentVersion
+ blk.RewardsPool = testPoolAddr
+ blk.FeeSink = testSinkAddr
+ blk.BlockHeader.GenesisHash = crypto.Hash([]byte(t.Name()))
+
+ accts := make(map[basics.Address]basics.AccountData)
+ accts[testPoolAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0})
+ accts[testSinkAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0})
+
+ genesisInitState := ledgercore.InitState{
+ Accounts: accts,
+ Block: blk,
+ GenesisHash: crypto.Hash([]byte(t.Name())),
+ }
+
+ expectedMessages := make(chan string, 100)
+ unexpectedMessages := make(chan string, 100)
+
+ const inMem = true
+ cfg := config.GetDefaultLocal()
+ cfg.Archival = true
+ log := loggedMessages{Logger: logging.TestingLog(t), expectedMessages: expectedMessages, unexpectedMessages: unexpectedMessages}
+ log.SetLevel(logging.Debug)
+ realLedger, err := ledger.OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
+ require.NoError(t, err, "could not open ledger")
+ defer realLedger.Close()
+
+ l := Ledger{Ledger: realLedger, log: log}
+ l.log.SetLevel(logging.Debug)
+ require.NotNil(t, &l)
+
+ totalsRound, _, err := realLedger.LatestTotals()
+ require.NoError(t, err)
+ require.Equal(t, basics.Round(0), totalsRound)
+
+ errChan := make(chan error, 1)
+ defer close(errChan)
+
+ wg := sync.WaitGroup{}
+ defer wg.Wait()
+
+ blkChan1 := make(chan bookkeeping.Block, 10)
+ blkChan2 := make(chan bookkeeping.Block, 10)
+ blkChan3 := make(chan bookkeeping.Block, 10)
+ defer close(blkChan1)
+ defer close(blkChan2)
+ defer close(blkChan3)
+
+ // Add blocks to the ledger via EnsureValidatedBlock. This calls AddValidatedBlock, which simply
+ // passes the block to blockQueue. The returned error is handled by EnsureValidatedBlock, which reports
+ // in the form of logged error message.
+ go func() {
+ wg.Add(1)
+ i := 0
+ for blk := range blkChan1 {
+ i++
+ vb, err := validatedBlock(l.Ledger, blk)
+ if err != nil {
+ // AddBlock already added the block
+ // This is okay to ignore.
+ // This error is generated from ledger.Ledger Validate function, used from:
+ // - node blockValidatorImpl Validate
+ // - catchup service s.ledger.Validate (Catchup service returns after the first error)
+ continue
+ }
+ l.EnsureValidatedBlock(vb, agreement.Certificate{})
+ }
+ wg.Done()
+ }()
+
+ // Add blocks to the ledger via EnsureBlock. This basically calls AddBlock, but handles
+ // the errors by logging them. Checking the logged messages to verify its behavior.
+ go func() {
+ wg.Add(1)
+ i := 0
+ for blk := range blkChan2 {
+ i++
+ l.EnsureBlock(&blk, agreement.Certificate{})
+ }
+ wg.Done()
+ }()
+
+ // Add blocks directly to the ledger
+ go func() {
+ wg.Add(1)
+ i := 0
+ for blk := range blkChan3 {
+ i++
+ err := l.AddBlock(blk, agreement.Certificate{})
+ // AddBlock is used in 2 places:
+ // - data.ledger.EnsureBlock which reports a log message as Error or Debug
+ // - catchup.service.fetchAndWrite which leads to interrupting catchup or skiping the round
+ if err != nil {
+ switch err.(type) {
+ // The following two cases are okay to ignore, since these are expected and handled
+ case ledgercore.BlockInLedgerError:
+ case ledgercore.ErrNonSequentialBlockEval:
+ continue
+ default:
+ // Make sure unexpected error is not obtained here
+ errChan <- err
+ }
+ }
+ l.WaitForCommit(blk.BlockHeader.Round)
+ }
+ wg.Done()
+ }()
+
+ // flush the messages output during the setup
+ more := true
+ for more {
+ select {
+ case <-expectedMessages:
+ case <-unexpectedMessages:
+ default:
+ more = false
+ }
+ }
+
+ for rnd := basics.Round(1); rnd <= basics.Round(2000); rnd++ {
+ blk, err := getEmptyBlock(rnd-1, l.Ledger, t.Name(), genesisInitState.Accounts)
+ require.NoError(t, err)
+ blkChan3 <- blk
+ blkChan2 <- blk
+ blkChan1 <- blk
+
+ more = true
+ for more {
+ select {
+ case err := <-errChan:
+ require.NoError(t, err)
+ case <-expectedMessages:
+ // only debug messages should be reported
+ case um := <-unexpectedMessages:
+ require.Empty(t, um, um)
+ default:
+ more = false
+ }
+ }
+ }
+}
+
+func validatedBlock(l *ledger.Ledger, blk bookkeeping.Block) (vb *ledgercore.ValidatedBlock, err error) {
+ backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
+ defer backlogPool.Shutdown()
+ vb, err = l.Validate(context.Background(), blk, backlogPool)
+ return
+}
+
+func getEmptyBlock(afterRound basics.Round, l *ledger.Ledger, genesisID string, initAccounts map[basics.Address]basics.AccountData) (blk bookkeeping.Block, err error) {
+ l.WaitForCommit(afterRound)
+
+ lastBlock, err := l.Block(l.Latest())
+ if err != nil {
+ return
+ }
+
+ proto := config.Consensus[lastBlock.CurrentProtocol]
+ blk.BlockHeader = bookkeeping.BlockHeader{
+ GenesisID: genesisID,
+ Round: l.Latest() + 1,
+ Branch: lastBlock.Hash(),
+ TimeStamp: 0,
+ }
+
+ if proto.SupportGenesisHash {
+ blk.BlockHeader.GenesisHash = crypto.Hash([]byte(genesisID))
+ }
+
+ blk.RewardsPool = testPoolAddr
+ blk.FeeSink = testSinkAddr
+ blk.CurrentProtocol = lastBlock.CurrentProtocol
+
+ blk.TxnRoot, err = blk.PaysetCommit()
+ if err != nil {
+ return
+ }
+ return
+}