diff options
Diffstat (limited to 'txnsync/emulatorNode_test.go')
-rw-r--r-- | txnsync/emulatorNode_test.go | 397 |
1 files changed, 0 insertions, 397 deletions
diff --git a/txnsync/emulatorNode_test.go b/txnsync/emulatorNode_test.go deleted file mode 100644 index 378f1622f..000000000 --- a/txnsync/emulatorNode_test.go +++ /dev/null @@ -1,397 +0,0 @@ -// Copyright (C) 2019-2021 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. - -package txnsync - -import ( - "errors" - "fmt" - "sort" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/pooldata" - "github.com/algorand/go-algorand/data/transactions" - "github.com/algorand/go-algorand/protocol" - "github.com/algorand/go-algorand/util/timers" -) - -type queuedSentMessageCallback struct { - callback SendMessageCallback - seq uint64 -} -type queuedMessage struct { - bytes []byte - readyAt time.Duration -} -type networkPeer struct { - peer *Peer - uploadSpeed uint64 - downloadSpeed uint64 - isOutgoing bool - outSeq uint64 - inSeq uint64 - target int - - messageQ []queuedMessage // incoming message queue - - mu sync.Mutex `algofix:"allow sync.Mutex"` - - deferredSentMessages []queuedSentMessageCallback // outgoing messages callback queue -} - -// emulatedNode implements the NodeConnector interface -type emulatedNode struct { - externalEvents chan Event - emulator *emulator - peers map[int]*networkPeer - nodeIndex int - expiredTx []pooldata.SignedTxGroup - txpoolEntries []pooldata.SignedTxGroup - txpoolIds map[transactions.Txid]bool - latestLocallyOriginatedGroupCounter uint64 - name string - blocked chan struct{} - mu sync.Mutex `algofix:"allow sync.Mutex"` - txpoolGroupCounter uint64 - blockingEnabled bool - nodeBlocked chan struct{} // channel is closed when node is blocked. - nodeRunning chan struct{} // channel is closed when node is running. -} - -func makeEmulatedNode(emulator *emulator, nodeIdx int) *emulatedNode { - en := &emulatedNode{ - emulator: emulator, - peers: make(map[int]*networkPeer), - externalEvents: make(chan Event, 10000), - nodeIndex: nodeIdx, - txpoolIds: make(map[transactions.Txid]bool), - name: emulator.scenario.netConfig.nodes[nodeIdx].name, - blockingEnabled: true, - nodeBlocked: make(chan struct{}, 1), - nodeRunning: make(chan struct{}, 1), - } - close(en.nodeRunning) - - // add outgoing connections - for _, conn := range emulator.scenario.netConfig.nodes[nodeIdx].outgoingConnections { - en.peers[conn.target] = &networkPeer{ - uploadSpeed: conn.uploadSpeed, - downloadSpeed: conn.downloadSpeed, - isOutgoing: true, - target: conn.target, - } - } - // add incoming connections - for nodeID, nodeConfig := range emulator.scenario.netConfig.nodes { - if nodeID == nodeIdx { - continue - } - for _, conn := range nodeConfig.outgoingConnections { - if conn.target != nodeIdx { - continue - } - // the upload & download speeds are in reverse. This isn't a bug since we want the incoming - // connection to be the opposite side of the connection. - en.peers[nodeID] = &networkPeer{ - uploadSpeed: conn.downloadSpeed, - downloadSpeed: conn.uploadSpeed, - isOutgoing: false, - target: nodeID, - } - } - } - return en -} - -func (n *emulatedNode) Events() <-chan Event { - return n.externalEvents -} - -func (n *emulatedNode) NotifyMonitor() chan struct{} { - var c chan struct{} - n.mu.Lock() - if n.blockingEnabled { - c = make(chan struct{}) - n.blocked = c - close(n.nodeBlocked) - n.nodeRunning = make(chan struct{}, 1) - n.mu.Unlock() - <-c - n.mu.Lock() - close(n.nodeRunning) - n.nodeBlocked = make(chan struct{}, 1) - n.mu.Unlock() - // return a closed channel. - return c - } - n.mu.Unlock() - // return an open channel - return make(chan struct{}) -} -func (n *emulatedNode) disableBlocking() { - n.mu.Lock() - n.blockingEnabled = false - n.mu.Unlock() - n.unblock() -} -func (n *emulatedNode) unblock() { - n.mu.Lock() - // wait until the state changes to StateMachineRunning - select { - case <-n.nodeBlocked: - // we're blocked. - if n.blocked != nil { - close(n.blocked) - n.blocked = nil - } - runningCh := n.nodeRunning - n.mu.Unlock() - <-runningCh - return - default: - } - n.mu.Unlock() -} - -func (n *emulatedNode) waitBlocked() { - n.mu.Lock() - select { - case <-n.nodeRunning: - blockedCh := n.nodeBlocked - n.mu.Unlock() - <-blockedCh - return - default: - } - n.mu.Unlock() -} - -func (n *emulatedNode) GetCurrentRoundSettings() RoundSettings { - return RoundSettings{ - Round: n.emulator.currentRound, - FetchTransactions: true, - } - -} -func (n *emulatedNode) Clock() timers.WallClock { - return n.emulator.clock.Zero().(timers.WallClock) -} - -func (n *emulatedNode) Random(x uint64) (out uint64) { - limit := x - x += uint64(n.nodeIndex) * 997 - x += uint64(n.emulator.currentRound) * 797 - x += uint64(n.emulator.lastRandom) * 797 - bytes := make([]byte, 8) - for i := 0; i < 8; i++ { - bytes[i] = byte(x >> (i * 8)) - } - digest := crypto.Hash(bytes) - out = 0 - for i := 0; i < 8; i++ { - out = out << 8 - out += uint64(digest[i]) - } - out = out % limit - n.emulator.lastRandom ^= out - return out -} - -func (n *emulatedNode) orderedPeers() (out []*networkPeer) { - peerToIndex := make(map[*networkPeer]int) - for idx, peer := range n.peers { - out = append(out, peer) - peerToIndex[peer] = idx - } - // sort the peers, which we need in order to make the test deterministic. - sort.Slice(out, func(i, j int) bool { - netPeer1 := out[i] - netPeer2 := out[j] - return peerToIndex[netPeer1] < peerToIndex[netPeer2] - }) - return -} - -func (n *emulatedNode) GetPeers() (out []PeerInfo) { - for _, peer := range n.orderedPeers() { - out = append(out, PeerInfo{TxnSyncPeer: peer.peer, NetworkPeer: peer, IsOutgoing: peer.isOutgoing}) - } - return out -} - -func (n *emulatedNode) GetPeer(p interface{}) PeerInfo { - netPeer := p.(*networkPeer) - return PeerInfo{ - TxnSyncPeer: netPeer.peer, - IsOutgoing: netPeer.isOutgoing, - NetworkPeer: p, - } -} - -func (n *emulatedNode) UpdatePeers(txPeers []*Peer, netPeers []interface{}, _ uint64) { - for i, peer := range netPeers { - netPeer := peer.(*networkPeer) - netPeer.peer = txPeers[i] - } -} - -func (n *emulatedNode) enqueueMessage(from int, msg queuedMessage) { - n.peers[from].mu.Lock() - baseTime := n.emulator.clock.Since() - if len(n.peers[from].messageQ) > 0 { - if n.peers[from].messageQ[len(n.peers[from].messageQ)-1].readyAt > baseTime { - baseTime = n.peers[from].messageQ[len(n.peers[from].messageQ)-1].readyAt - } - } - // the message bytes need to be copied, so that the originating bytes could be safely deleted. - msgBytes := make([]byte, len(msg.bytes)) - copy(msgBytes[:], msg.bytes[:]) - n.peers[from].messageQ = append(n.peers[from].messageQ, queuedMessage{bytes: msgBytes, readyAt: baseTime + msg.readyAt}) - n.peers[from].mu.Unlock() -} - -func (n *emulatedNode) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) { - peer := netPeer.(*networkPeer) - otherNode := n.emulator.nodes[peer.target] - sendTime := time.Duration(len(msg)) * time.Second / time.Duration(peer.uploadSpeed) - otherNode.enqueueMessage(n.nodeIndex, queuedMessage{bytes: msg, readyAt: sendTime}) - - peer.deferredSentMessages = append(peer.deferredSentMessages, queuedSentMessageCallback{callback: callback, seq: peer.outSeq}) - peer.outSeq++ -} - -func (n *emulatedNode) GetPeerLatency(netPeer interface{}) time.Duration { - return 0 -} - -func (n *emulatedNode) GetPendingTransactionGroups() ([]pooldata.SignedTxGroup, uint64) { - return n.txpoolEntries, n.latestLocallyOriginatedGroupCounter -} - -func (n *emulatedNode) IncomingTransactionGroups(peer *Peer, messageSeq uint64, txGroups []pooldata.SignedTxGroup) (transactionPoolSize int) { - // add to transaction pool. - duplicateMessage := 0 - duplicateMessageSize := 0 - encodingBuf := protocol.GetEncodingBuf() - transactionPoolSize = len(n.txpoolEntries) - for _, group := range txGroups { - if group.Transactions[0].Txn.LastValid < n.emulator.currentRound { - continue - } - txID := group.Transactions[0].ID() - if n.txpoolIds[txID] { - duplicateMessage++ - duplicateMessageSize += len(group.Transactions[0].Txn.Note) - continue - } - n.txpoolIds[txID] = true - group.GroupCounter = n.txpoolGroupCounter - n.txpoolGroupCounter++ - group.GroupTransactionID = group.Transactions.ID() - for _, txn := range group.Transactions { - encodingBuf = encodingBuf[:0] - group.EncodedLength += len(txn.MarshalMsg(encodingBuf)) - } - n.txpoolEntries = append(n.txpoolEntries, group) - } - protocol.PutEncodingBuf(encodingBuf) - if duplicateMessage > 0 && testing.Verbose() { - fmt.Printf("%s : %d duplicate messages recieved\n", n.name, duplicateMessage) - } - atomic.AddUint64(&n.emulator.totalDuplicateTransactions, uint64(duplicateMessage)) - atomic.AddUint64(&n.emulator.totalDuplicateTransactionSize, uint64(duplicateMessageSize)) - select { - case peer.GetTransactionPoolAckChannel() <- messageSeq: - default: - panic(errors.New("IncomingTransactionGroups was unable to write messageSeq to the ack channel")) - } - return -} - -func (n *emulatedNode) step() { - msgHandler := n.emulator.syncers[n.nodeIndex].GetIncomingMessageHandler() - now := n.emulator.clock.Since() - // check if we have any pending network messages and forward them. - - for _, peer := range n.orderedPeers() { - peer.mu.Lock() - - for i := len(peer.deferredSentMessages); i > 0; i-- { - dm := peer.deferredSentMessages[0] - peer.deferredSentMessages = peer.deferredSentMessages[1:] - peer.mu.Unlock() - err := dm.callback(true, dm.seq) - if err != nil { - panic(err) - } - n.unblock() - n.waitBlocked() - peer.mu.Lock() - } - - for i := len(peer.messageQ); i > 0; i-- { - if peer.messageQ[0].readyAt > now { - break - } - - msgBytes := peer.messageQ[0].bytes - msgInSeq := peer.inSeq - - peer.inSeq++ - peer.messageQ = peer.messageQ[1:] - - peer.mu.Unlock() - - msgHandler(peer, peer.peer, msgBytes, msgInSeq, 0) - n.unblock() - n.waitBlocked() - peer.mu.Lock() - - } - peer.mu.Unlock() - } - -} -func (n *emulatedNode) onNewRound(round basics.Round, hasParticipationKeys bool) { - // if this is a relay, then we always want to fetch transactions, regardless if we have participation keys. - fetchTransactions := hasParticipationKeys - if n.emulator.scenario.netConfig.nodes[n.nodeIndex].isRelay { - fetchTransactions = true - } - - for i := len(n.txpoolEntries) - 1; i >= 0; i-- { - if n.txpoolEntries[i].Transactions[0].Txn.LastValid < round { - delete(n.txpoolIds, n.txpoolEntries[i].Transactions[0].ID()) - n.expiredTx = append(n.expiredTx, n.txpoolEntries[i]) - n.txpoolEntries = append(n.txpoolEntries[0:i], n.txpoolEntries[i+1:]...) - } - } - - n.externalEvents <- MakeNewRoundEvent(round, fetchTransactions) -} - -func (n *emulatedNode) onNewTransactionPoolEntry() { - n.externalEvents <- MakeTransactionPoolChangeEvent(len(n.txpoolEntries), false) -} - -func (p *networkPeer) GetAddress() string { - return fmt.Sprintf("%d", p.target) -} |