summaryrefslogtreecommitdiff
path: root/txnsync/outgoing_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'txnsync/outgoing_test.go')
-rw-r--r--txnsync/outgoing_test.go615
1 files changed, 0 insertions, 615 deletions
diff --git a/txnsync/outgoing_test.go b/txnsync/outgoing_test.go
deleted file mode 100644
index 3136161ef..000000000
--- a/txnsync/outgoing_test.go
+++ /dev/null
@@ -1,615 +0,0 @@
-// Copyright (C) 2019-2021 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
-
-package txnsync
-
-import (
- "context"
- "reflect"
- "sync"
- "testing"
- "time"
-
- "github.com/stretchr/testify/require"
-
- "github.com/algorand/go-algorand/data/basics"
- "github.com/algorand/go-algorand/data/pooldata"
- "github.com/algorand/go-algorand/data/transactions"
- "github.com/algorand/go-algorand/logging"
- "github.com/algorand/go-algorand/protocol"
- "github.com/algorand/go-algorand/test/partitiontest"
- "github.com/algorand/go-algorand/util/execpool"
- "github.com/algorand/go-algorand/util/timers"
-)
-
-type mockAsyncLogger struct {
- logging.Logger
- warnCalled *bool
-}
-
-func (m mockAsyncLogger) outgoingMessage(mstat msgStats) {
-}
-
-func (m mockAsyncLogger) incomingMessage(mstat msgStats) {
-}
-
-func (m mockAsyncLogger) Infof(string, ...interface{}) {}
-
-func (m mockAsyncLogger) Warnf(string, ...interface{}) {
- if m.warnCalled != nil {
- *m.warnCalled = true
- }
-}
-
-func TestAsyncMessageSent(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- var s syncState
- s.clock = timers.MakeMonotonicClock(time.Now())
- s.log = mockAsyncLogger{}
- s.incomingMessagesQ = makeIncomingMessageQueue()
- defer s.incomingMessagesQ.shutdown()
-
- asyncEncoder := messageAsyncEncoder{
- state: &s,
- messageData: sentMessageMetadata{
- message: &transactionBlockMessage{
- Version: txnBlockMessageVersion,
- Round: 1,
- },
- peer: &Peer{},
- },
- roundClock: timers.MakeMonotonicClock(time.Now()),
- sentMessagesCh: s.outgoingMessagesCallbackCh,
- }
-
- oldTimestamp := asyncEncoder.messageData.sentTimestamp
- a.Equal(asyncEncoder.asyncMessageSent(false, 0), errTransactionSyncOutgoingMessageSendFailed)
- err := asyncEncoder.asyncMessageSent(true, 1337)
- a.Equal(err, errTransactionSyncOutgoingMessageQueueFull)
- a.Equal(asyncEncoder.messageData.sentTimestamp, oldTimestamp)
- a.Equal(asyncEncoder.messageData.sequenceNumber, uint64(1337))
-
- // Make this buffered for now so we catch the select statement
- asyncEncoder.sentMessagesCh = make(chan sentMessageMetadata, 1)
-
- err = asyncEncoder.asyncMessageSent(true, 1337)
- a.Nil(err)
- a.Equal(1, len(asyncEncoder.sentMessagesCh))
-}
-
-type mockAsyncNodeConnector struct {
- NodeConnector
- called *bool
- largeTxnGroup bool
-}
-
-func (m mockAsyncNodeConnector) Random(rng uint64) uint64 {
- // We need to be deterministic in our "randomness" for the tests
- return 42
-}
-
-func (m mockAsyncNodeConnector) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) {
- *m.called = true
-}
-
-func (m mockAsyncNodeConnector) GetPendingTransactionGroups() (txGroups []pooldata.SignedTxGroup, latestLocallyOriginatedGroupCounter uint64) {
- if m.largeTxnGroup {
- rval := []pooldata.SignedTxGroup{}
- for i := 0; i < 100000; i++ {
- // Because we use this with non-relay nodes, the syncState will
- // use the locallyGeneratedTransactions() function.
- // To make sure we fill the values appropriately, we are going to
- // set every value here to be locally originated
- // Additionally, we want the encoded length to be 1000 (or something rather large)
- // to make sure that we can attain partial messages (see TestSendMessageLoop test)
- rval = append(rval, pooldata.SignedTxGroup{EncodedLength: 1000, LocallyOriginated: true})
- }
-
- return rval, 1
- }
- return []pooldata.SignedTxGroup{}, 1
-}
-
-// TestAsyncEncodeAndSendErr Tests response when encodeTransactionGroups doesn't return an error
-func TestAsyncEncodeAndSendNonErr(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- var s syncState
- s.clock = timers.MakeMonotonicClock(time.Now())
- warnCalled := false
- s.log = mockAsyncLogger{warnCalled: &warnCalled}
- sendPeerMessageCalled := false
- s.node = mockAsyncNodeConnector{called: &sendPeerMessageCalled}
- s.messageSendWaitGroup = sync.WaitGroup{}
-
- txnGrps := []pooldata.SignedTxGroup{
- pooldata.SignedTxGroup{
- Transactions: []transactions.SignedTxn{
- transactions.SignedTxn{
- Txn: transactions.Transaction{
- Type: protocol.AssetConfigTx,
- },
- },
- },
- },
- }
-
- asyncEncoder := messageAsyncEncoder{
- state: &s,
- messageData: sentMessageMetadata{
- message: &transactionBlockMessage{
- Version: txnBlockMessageVersion,
- Round: 1,
- },
- transactionGroups: txnGrps,
- peer: &Peer{},
- },
- roundClock: timers.MakeMonotonicClock(time.Now()),
- }
-
- asyncEncoder.state.messageSendWaitGroup.Add(1)
-
- err := asyncEncoder.asyncEncodeAndSend(nil)
-
- a.Nil(err)
- a.False(warnCalled)
- a.True(sendPeerMessageCalled)
- a.Nil(asyncEncoder.messageData.transactionGroups)
-}
-
-// TestAsyncEncodeAndSendErr Tests response when encodeTransactionGroups returns an error
-func TestAsyncEncodeAndSendErr(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- var s syncState
- s.clock = timers.MakeMonotonicClock(time.Now())
- warnCalled := false
- s.log = mockAsyncLogger{warnCalled: &warnCalled}
- sendPeerMessageCalled := false
- s.node = mockAsyncNodeConnector{called: &sendPeerMessageCalled}
- s.messageSendWaitGroup = sync.WaitGroup{}
-
- txnGrps := []pooldata.SignedTxGroup{
- pooldata.SignedTxGroup{
- Transactions: []transactions.SignedTxn{
- transactions.SignedTxn{
- Txn: transactions.Transaction{
- Type: protocol.UnknownTx,
- },
- },
- },
- },
- }
-
- asyncEncoder := messageAsyncEncoder{
- state: &s,
- messageData: sentMessageMetadata{
- message: &transactionBlockMessage{
- Version: txnBlockMessageVersion,
- Round: 1,
- },
- transactionGroups: txnGrps,
- peer: &Peer{},
- },
- roundClock: timers.MakeMonotonicClock(time.Now()),
- }
-
- asyncEncoder.state.messageSendWaitGroup.Add(1)
-
- err := asyncEncoder.asyncEncodeAndSend(nil)
-
- a.Nil(err)
- a.True(warnCalled)
- a.True(sendPeerMessageCalled)
-
-}
-
-// TestAsyncEncodeAndSend Tests that SendPeerMessage is called in the node connector
-func TestAsyncEncodeAndSend(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- var s syncState
- s.clock = timers.MakeMonotonicClock(time.Now())
- s.log = mockAsyncLogger{}
- sendPeerMessageCalled := false
- s.node = mockAsyncNodeConnector{called: &sendPeerMessageCalled}
- s.messageSendWaitGroup = sync.WaitGroup{}
-
- asyncEncoder := messageAsyncEncoder{
- state: &s,
- messageData: sentMessageMetadata{
- message: &transactionBlockMessage{
- Version: txnBlockMessageVersion,
- Round: 1,
- },
- peer: &Peer{},
- },
- roundClock: timers.MakeMonotonicClock(time.Now()),
- }
-
- asyncEncoder.state.messageSendWaitGroup.Add(1)
-
- err := asyncEncoder.asyncEncodeAndSend(nil)
- a.Nil(err)
- a.True(sendPeerMessageCalled)
- a.NotZero(asyncEncoder.messageData.sentTimestamp)
-
-}
-
-// TestAssemblePeerMessage_messageConstBloomFilter Tests assemblePeerMessage with messageConstBloomFilter msgOps
-func TestAssemblePeerMessage_messageConstBloomFilter(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- s := syncState{
- node: mockAsyncNodeConnector{},
- clock: timers.MakeMonotonicClock(time.Now()),
- }
-
- s.profiler = makeProfiler(1*time.Millisecond, s.clock, s.log, 1*time.Millisecond)
-
- peer := Peer{}
-
- pendingTransactions := pendingTransactionGroupsSnapshot{
- pendingTransactionsGroups: []pooldata.SignedTxGroup{
- pooldata.SignedTxGroup{},
- },
- }
-
- peer.setLocalRequestParams(111, 222)
- peer.lastReceivedMessageTimestamp = 100
- peer.lastReceivedMessageLocalRound = s.round
-
- expectedFilter := s.makeBloomFilter(requestParams{Offset: 111, Modulator: 222}, pendingTransactions.pendingTransactionsGroups, nil, &s.lastBloomFilter)
-
- s.isRelay = true
- peer.isOutgoing = true
- peer.state = peerStateLateBloom
-
- metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)
-
- a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
- a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
- a.Equal(metaMessage.peer, &peer)
- a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
- a.Equal(metaMessage.message.Round, s.round)
- a.True(responseTime >= 0)
- a.Equal(s.lastBloomFilter, expectedFilter)
-}
-
-// TestAssemblePeerMessage_messageConstBloomFilterNonRelay Tests assemblePeerMessage with messageConstBloomFilter msgOps in a non-relay scenario
-func TestAssemblePeerMessage_messageConstBloomFilterNonRelay(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- s := syncState{
- node: mockAsyncNodeConnector{largeTxnGroup: true},
- clock: timers.MakeMonotonicClock(time.Now()),
- }
-
- s.profiler = makeProfiler(1*time.Millisecond, s.clock, s.log, 1*time.Millisecond)
-
- peer := Peer{}
-
- pendingTransactions := pendingTransactionGroupsSnapshot{
- pendingTransactionsGroups: []pooldata.SignedTxGroup{
- pooldata.SignedTxGroup{},
- },
- }
-
- peer.setLocalRequestParams(111, 222)
- peer.lastReceivedMessageTimestamp = 100
- peer.lastReceivedMessageLocalRound = s.round
-
- expectedFilter := s.makeBloomFilter(requestParams{Offset: 111, Modulator: 222}, pendingTransactions.pendingTransactionsGroups, nil, &s.lastBloomFilter)
-
- s.isRelay = false
- s.fetchTransactions = true
- peer.isOutgoing = true
- peer.state = peerStateLateBloom
-
- metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)
-
- a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
- a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
- a.Equal(metaMessage.peer, &peer)
- a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
- a.Equal(metaMessage.message.Round, s.round)
- a.True(responseTime >= 0)
- a.NotEqual(s.lastBloomFilter, expectedFilter)
-}
-
-// TestAssemblePeerMessage_messageConstNextMinDelay_messageConstUpdateRequestParams Tests assemblePeerMessage with messageConstNextMinDelay | messageConstUpdateRequestParams msgOps
-func TestAssemblePeerMessage_messageConstNextMinDelay_messageConstUpdateRequestParams(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- s := syncState{clock: timers.MakeMonotonicClock(time.Now())}
-
- s.profiler = makeProfiler(1*time.Millisecond, s.clock, s.log, 1*time.Millisecond)
-
- peer := Peer{}
-
- pendingTransactions := pendingTransactionGroupsSnapshot{}
-
- peer.setLocalRequestParams(111, 222)
- peer.lastReceivedMessageTimestamp = 100
- peer.lastReceivedMessageLocalRound = s.round
-
- s.isRelay = true
- s.lastBeta = 123 * time.Nanosecond
-
- metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)
-
- a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
- a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
- a.Equal(metaMessage.peer, &peer)
- a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
- a.Equal(metaMessage.message.Round, s.round)
- a.True(responseTime >= 0)
- a.Equal(metaMessage.message.MsgSync.NextMsgMinDelay, uint64(s.lastBeta.Nanoseconds())*2)
-
-}
-
-// TestAssemblePeerMessage_messageConstTransactions Tests assemblePeerMessage messageConstTransactions msgOps
-func TestAssemblePeerMessage_messageConstTransactions(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- s := syncState{clock: timers.MakeMonotonicClock(time.Now())}
-
- s.profiler = makeProfiler(1*time.Millisecond, s.clock, s.log, 1*time.Millisecond)
-
- peer := Peer{}
-
- pendingTransactions := pendingTransactionGroupsSnapshot{
- latestLocallyOriginatedGroupCounter: 1,
- pendingTransactionsGroups: []pooldata.SignedTxGroup{
- pooldata.SignedTxGroup{
- LocallyOriginated: true,
- EncodedLength: 2,
- },
- },
- }
-
- peer.setLocalRequestParams(111, 222)
- peer.lastReceivedMessageTimestamp = 100
- peer.lastReceivedMessageLocalRound = s.round
- peer.requestedTransactionsModulator = 2
- peer.recentSentTransactions = makeTransactionCache(5, 10, 20)
-
- s.isRelay = false
- peer.isOutgoing = true
- peer.state = peerStateHoldsoff
-
- metaMessage, _, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
-
- a.Equal(len(metaMessage.transactionGroups), 1)
- a.True(reflect.DeepEqual(metaMessage.transactionGroups[0], pendingTransactions.pendingTransactionsGroups[0]))
-
-}
-
-// TestLocallyGeneratedTransactions Separately tests that generating transactions are being
-// correctly made given a signed transaction group array.
-func TestLocallyGeneratedTransactions(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- pendingTransactions := &pendingTransactionGroupsSnapshot{}
-
- s := syncState{}
-
- pendingTransactions.latestLocallyOriginatedGroupCounter = 1
-
- a.Equal(s.locallyGeneratedTransactions(pendingTransactions), []pooldata.SignedTxGroup{})
-
- pendingTransactions.pendingTransactionsGroups = []pooldata.SignedTxGroup{
- pooldata.SignedTxGroup{
- LocallyOriginated: true,
- EncodedLength: 2,
- },
- pooldata.SignedTxGroup{
- LocallyOriginated: false,
- EncodedLength: 1,
- },
- pooldata.SignedTxGroup{
- LocallyOriginated: true,
- EncodedLength: 3,
- },
- }
-
- pendingTransactions.latestLocallyOriginatedGroupCounter = pooldata.InvalidSignedTxGroupCounter
-
- a.Equal(s.locallyGeneratedTransactions(pendingTransactions), []pooldata.SignedTxGroup{})
-
- pendingTransactions.latestLocallyOriginatedGroupCounter = 1
-
- expected := []pooldata.SignedTxGroup{
-
- pooldata.SignedTxGroup{
- LocallyOriginated: true,
- EncodedLength: 2,
- },
-
- pooldata.SignedTxGroup{
- LocallyOriginated: true,
- EncodedLength: 3,
- },
- }
-
- a.Equal(s.locallyGeneratedTransactions(pendingTransactions), expected)
-
-}
-
-type mockBacklogThreadPool struct {
- execpool.BacklogPool
- enqueueCalled *int
-}
-
-func (b *mockBacklogThreadPool) EnqueueBacklog(enqueueCtx context.Context, t execpool.ExecFunc, arg interface{}, out chan interface{}) error {
- if b.enqueueCalled != nil {
- *b.enqueueCalled++
- }
-
- return nil
-}
-
-// TestEnqueue directly tests that enqueue will call the Done() function for the messageSendWaitGroup
-func TestEnqueue(t *testing.T) {
-
- partitiontest.PartitionTest(t)
-
- s := syncState{clock: timers.MakeMonotonicClock(time.Now())}
- s.log = mockAsyncLogger{}
- s.node = &mockNodeConnector{}
- s.threadpool = execpool.MakeBacklog(execpool.MakePool(t), 5, execpool.LowPriority, t)
-
- prof := makeProfiler(2*time.Millisecond, s.clock, s.log, 3*time.Millisecond)
- s.profiler = prof
-
- asyncEncoder := messageAsyncEncoder{
- state: &s,
- messageData: sentMessageMetadata{
- message: &transactionBlockMessage{
- Version: txnBlockMessageVersion,
- Round: 1,
- },
- peer: &Peer{},
- },
- roundClock: timers.MakeMonotonicClock(time.Now()),
- }
-
- asyncEncoder.enqueue()
-
- // Wait for the enqueued function to return the messageSendWaitGroup
- s.messageSendWaitGroup.Wait()
-
- // Dummy require to make sure we pass this test...the real value of this test
- // is to make sure that the wait group is appropriately set
- require.True(t, true)
-
-}
-
-// TestSendMessageLoop tests the send message loop
-func TestSendMessageLoop(t *testing.T) {
-
- partitiontest.PartitionTest(t)
-
- enqueueCalled := 0
-
- s := syncState{
- clock: timers.MakeMonotonicClock(time.Now()),
- scheduler: makePeerScheduler(),
- }
- s.log = mockAsyncLogger{}
- // Get a large amount of signed txns with a low data exchange rate
- // to get partial messages to trigger peerOpsClearInterruptible
- s.node = &mockAsyncNodeConnector{largeTxnGroup: true}
- s.threadpool = &mockBacklogThreadPool{enqueueCalled: &enqueueCalled}
-
- prof := makeProfiler(2*time.Millisecond, s.clock, s.log, 3*time.Millisecond)
- s.profiler = prof
- s.interruptablePeersMap = make(map[*Peer]int)
-
- peers := []*Peer{
- // peerOpsReschedule
- &Peer{
- recentSentTransactions: makeTransactionCache(10, 20, 10),
- requestedTransactionsModulator: 2,
- // Reduced rate to trigger partial messages
- dataExchangeRate: 10,
- // greater than 0 for state machine logic
- nextStateTimestamp: 1 * time.Millisecond,
- },
- &Peer{
- recentSentTransactions: makeTransactionCache(10, 20, 10),
- requestedTransactionsModulator: 2,
- // Reduced rate to trigger partial messages
- dataExchangeRate: 10,
- // greater than 0 for state machine logic
- nextStateTimestamp: 1 * time.Millisecond,
- },
- }
-
- // Add the peers to test that peerOpsClearInterruptible removes them
-
- for _, p := range peers {
- s.interruptablePeers = append(s.interruptablePeers, p)
- s.interruptablePeersMap[p] = len(s.interruptablePeers) - 1
- }
-
- // The deadline is set to a ridiculously high number to make sure that we cycle through all our peers
- // and not break
- s.sendMessageLoop(s.clock.Since(), s.clock.DeadlineMonitorAt(s.clock.Since()+5*time.Minute), peers)
-
- require.Equal(t, 2, enqueueCalled)
- require.Equal(t, 0, len(s.interruptablePeersMap))
-
-}
-
-// TestEvaluateOutgoingMessage tests the evaluateOutgoingMessage function of syncState
-func TestEvaluateOutgoingMessage(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- a := require.New(t)
-
- s := syncState{clock: timers.MakeMonotonicClock(time.Now())}
- s.log = mockAsyncLogger{}
-
- peer := Peer{
- recentSentTransactions: makeTransactionCache(10, 20, 10),
- }
-
- sentMessage := sentMessageMetadata{
- sentTimestamp: time.Duration(time.Millisecond * 1234),
- message: &transactionBlockMessage{Round: 3},
- sequenceNumber: 42,
- projectedSequenceNumber: 44,
- encodedMessageSize: 23,
- peer: &peer,
- }
-
- s.evaluateOutgoingMessage(sentMessage)
- // This should be zero because sequenceNumber and projectedSequenceNumber are not equal
- a.Equal(peer.lastSentMessageTimestamp, 0*time.Millisecond)
-
- a.Equal(peer.lastSentMessageSequenceNumber, uint64(42))
- a.Equal(peer.lastSentMessageRound, basics.Round(3))
- a.Equal(peer.lastSentMessageSize, 23)
-
- sentMessage.sequenceNumber = sentMessage.projectedSequenceNumber
-
- s.evaluateOutgoingMessage(sentMessage)
- a.Equal(peer.lastSentMessageTimestamp, 1234*time.Millisecond)
-
- a.Equal(peer.lastSentMessageSequenceNumber, uint64(44))
- a.Equal(peer.lastSentMessageRound, basics.Round(3))
- a.Equal(peer.lastSentMessageSize, 23)
-}