summaryrefslogtreecommitdiff
path: root/txnsync/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'txnsync/peer.go')
-rw-r--r--txnsync/peer.go852
1 files changed, 0 insertions, 852 deletions
diff --git a/txnsync/peer.go b/txnsync/peer.go
deleted file mode 100644
index b33ed0ed1..000000000
--- a/txnsync/peer.go
+++ /dev/null
@@ -1,852 +0,0 @@
-// Copyright (C) 2019-2021 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
-
-package txnsync
-
-import (
- "math"
- "sort"
- "time"
-
- "github.com/algorand/go-algorand/config"
- "github.com/algorand/go-algorand/data/basics"
- "github.com/algorand/go-algorand/data/pooldata"
- "github.com/algorand/go-algorand/data/transactions"
-)
-
-//msgp:ignore peerState
-type peerState int
-
-//msgp:ignore peersOps
-type peersOps int
-
-//msgp:ignore messageConstructionOps
-type messageConstructionOps int
-
-const maxIncomingBloomFilterHistory = 200
-
-// shortTermRecentTransactionsSentBufferLength is the size of the short term storage for the recently sent transaction ids.
-// it should be configured sufficiently high so that any number of transaction sent would not exceed that number before
-// the other peer has a chance of sending a feedback. ( when the feedback is received, we will store these IDs into the long-term cache )
-const shortTermRecentTransactionsSentBufferLength = 5000
-
-// pendingUnconfirmedRemoteMessages is the number of messages we would cache before receiving a feedback from the other
-// peer that these message have been accepted. The general guideline here is that if we have a message every 200ms on one side
-// and a message every 20ms on the other, then the ratio of 200/20 = 10, should be the number of required messages (min).
-const pendingUnconfirmedRemoteMessages = 20
-
-// longTermRecentTransactionsSentBufferLength is the size of the long term transaction id cache.
-const longTermRecentTransactionsSentBufferLength = 15000
-const minDataExchangeRateThreshold = 500 * 1024 // 500KB/s, which is ~3.9Mbps
-const maxDataExchangeRateThreshold = 100 * 1024 * 1024 / 8 // 100Mbps
-const defaultDataExchangeRate = minDataExchangeRateThreshold
-const defaultRelayToRelayDataExchangeRate = 10 * 1024 * 1024 / 8 // 10Mbps
-const bloomFilterRetryCount = 3 // number of bloom filters we would try against each transaction group before skipping it.
-const maxTransactionGroupTrackers = 15 // number of different bloom filter parameters we store before rolling over
-
-const (
- // peerStateStartup is before the timeout for the sending the first message to the peer has reached.
- // for an outgoing peer, it means that an incoming message arrived, and one or more messages need to be sent out.
- peerStateStartup peerState = iota
- // peerStateHoldsoff is set once a message was sent to a peer, and we're holding off before sending additional messages.
- peerStateHoldsoff
- // peerStateInterrupt is set once the holdoff period for the peer have expired.
- peerStateInterrupt
- // peerStateLateBloom is set for outgoing peers on relays, indicating that the next message should be a bloom filter only message.
- peerStateLateBloom
-
- peerOpsSendMessage peersOps = 1
- peerOpsSetInterruptible peersOps = 2
- peerOpsClearInterruptible peersOps = 4
- peerOpsReschedule peersOps = 8
-
- messageConstBloomFilter messageConstructionOps = 1
- messageConstTransactions messageConstructionOps = 2
- messageConstNextMinDelay messageConstructionOps = 4
- messageConstUpdateRequestParams messageConstructionOps = 8
-
- // defaultSignificantMessageThreshold is the minimal transmitted message size which would be used for recalculating the
- // data exchange rate.
- defaultSignificantMessageThreshold = 50000
-)
-
-// incomingBloomFilter stores an incoming bloom filter, along with the associated round number.
-// the round number allow us to prune filters from rounds n-2 and below.
-type incomingBloomFilter struct {
- filter *testableBloomFilter
- round basics.Round
-}
-
-// Peer contains peer-related data which extends the data "known" and managed by the network package.
-type Peer struct {
- // networkPeer is the network package exported peer. It's created on construction and never change afterward.
- networkPeer interface{}
- // isOutgoing defines whether the peer is an outgoing peer or not. For relays, this is meaningful as these have
- // slightly different message timing logic.
- isOutgoing bool
- // significantMessageThreshold is the minimal transmitted message size which would be used for recalculating the
- // data exchange rate. When significantMessageThreshold is equal to math.MaxUint64, no data exchange rate updates would be
- // performed.
- significantMessageThreshold uint64
- // state defines the peer state ( in terms of state machine state ). It's touched only by the sync main state machine
- state peerState
-
- log Logger
-
- // lastRound is the latest round reported by the peer.
- lastRound basics.Round
-
- // incomingMessages contains the incoming messages from this peer. This heap help us to reorder the incoming messages so that
- // we could process them in the tcp-transport order.
- incomingMessages messageOrderingHeap
-
- // nextReceivedMessageSeq is a counter containing the next message sequence number that we expect to see from this peer.
- nextReceivedMessageSeq uint64 // the next message seq that we expect to receive from that peer; implies that all previous messages have been accepted.
-
- // recentIncomingBloomFilters contains the recent list of bloom filters sent from the peer. When considering sending transactions, we check this
- // array to determine if the peer already has this message.
- recentIncomingBloomFilters []incomingBloomFilter
-
- // recentSentTransactions contains the recently sent transactions. It's needed since we don't want to rely on the other peer's bloom filter while
- // sending back-to-back messages.
- recentSentTransactions *transactionCache
- // recentSentTransactionsRound is the round associated with the cache of recently sent transactions. We keep this variable around so that we can
- // flush the cache on every round so that we can give pending transaction another chance of being transmitted.
- recentSentTransactionsRound basics.Round
-
- // these two fields describe "what does that peer asked us to send it"
- requestedTransactionsModulator byte
- requestedTransactionsOffset byte
-
- // lastSentMessageSequenceNumber is the last sequence number of the message that we sent.
- lastSentMessageSequenceNumber uint64
- // lastSentMessageRound is the round the last sent message was sent on. The timestamps are relative to the beginning of the round
- // and therefore need to be evaluated togather.
- lastSentMessageRound basics.Round
- // lastSentMessageTimestamp the timestamp at which the last message was sent.
- lastSentMessageTimestamp time.Duration
- // lastSentMessageSize is the encoded message size of the last sent message
- lastSentMessageSize int
- // lastSentBloomFilter is the last bloom filter that was sent to this peer.
- // This bloom filter could be stale if no bloom filter was included in the last message.
- lastSentBloomFilter bloomFilter
-
- // sentFilterParams records the Round and max txn group counter of the last filter sent to a peer (for each {Modulator,Offset}).
- // From this an efficient next filter can be calculated for just the new txns, or a full filter after a Round turnover.
- sentFilterParams sentFilters
-
- // lastConfirmedMessageSeqReceived is the last message sequence number that was confirmed by the peer to have been accepted.
- lastConfirmedMessageSeqReceived uint64
- lastReceivedMessageLocalRound basics.Round
- lastReceivedMessageTimestamp time.Duration
- lastReceivedMessageSize int
- lastReceivedMessageNextMsgMinDelay time.Duration
-
- // dataExchangeRate is the combined upload/download rate in bytes/second
- dataExchangeRate uint64
- // cachedLatency is the measured network latency of a peer, updated every round
- cachedLatency time.Duration
-
- // these two fields describe "what does the local peer want the remote peer to send back"
- localTransactionsModulator byte
- localTransactionsBaseOffset byte
-
- // lastTransactionSelectionTracker tracks the last transaction group counter that we've evaluated on the selectPendingTransactions method.
- // it used to ensure that on subsequent calls, we won't need to scan the entire pending transactions array from the beginning.
- // the implementation here is breaking it up per request params, so that we can apply the above logic per request params ( i.e. different
- // offset/modulator ), as well as add retry attempts for multiple bloom filters.
- lastTransactionSelectionTracker transactionGroupCounterTracker
-
- // nextStateTimestamp indicates the next timestamp where the peer state would need to be changed.
- // it used to allow sending partial message while retaining the "next-beta time", or, in the case of outgoing relays,
- // its being used to hold when we need to send the last (bloom) message.
- nextStateTimestamp time.Duration
- // messageSeriesPendingTransactions contain the transactions we are sending in the current "message-series". It allows us to pick a given
- // "snapshot" from the transaction pool, and send that "snapshot" to completion before attempting to re-iterate.
- messageSeriesPendingTransactions []pooldata.SignedTxGroup
-
- // transactionPoolAckCh is passed to the transaction handler when incoming transaction arrives. The channel is passed upstream, so that once
- // a transaction is added to the transaction pool, we can get some feedback for that.
- transactionPoolAckCh chan uint64
-
- // transactionPoolAckMessages maintain a list of the recent incoming messages sequence numbers whose transactions were added fully to the transaction
- // pool. This list is being flushed out every time we send a message to the peer.
- transactionPoolAckMessages []uint64
-
- // used by the selectPendingTransactions method, the lastSelectedTransactionsCount contains the number of entries selected on the previous iteration.
- // this value is used to optimize the memory preallocation for the selection IDs array.
- lastSelectedTransactionsCount int
-}
-
-// requestParamsGroupCounterState stores the latest group counters for a given set of request params.
-// we use this to ensure we can have multiple iteration of bloom filter scanning over each individual
-// transaction group. This method allow us to reduce the bloom filter errors while avoid scanning the
-// list of transactions redundently.
-//msgp:ignore transactionGroupCounterState
-type requestParamsGroupCounterState struct {
- offset byte
- modulator byte
- groupCounters [bloomFilterRetryCount]uint64
-}
-
-// transactionGroupCounterTracker manages the group counter state for each request param.
-//msgp:ignore transactionGroupCounterTracker
-type transactionGroupCounterTracker []requestParamsGroupCounterState
-
-// get returns the group counter for a given set of request param.
-func (t *transactionGroupCounterTracker) get(offset, modulator byte) uint64 {
- i := t.index(offset, modulator)
- if i >= 0 {
- return (*t)[i].groupCounters[0]
- }
- return 0
-}
-
-// set updates the group counter for a given set of request param. If no such request
-// param currently exists, it create it.
-func (t *transactionGroupCounterTracker) set(offset, modulator byte, counter uint64) {
- i := t.index(offset, modulator)
- if i >= 0 {
- (*t)[i].groupCounters[0] = counter
- return
- }
- // if it doesn't exists -
- state := requestParamsGroupCounterState{
- offset: offset,
- modulator: modulator,
- }
- state.groupCounters[0] = counter
-
- if len(*t) == maxTransactionGroupTrackers {
- // shift all entries by one.
- copy((*t)[0:], (*t)[1:])
- (*t)[maxTransactionGroupTrackers-1] = state
- } else {
- *t = append(*t, state)
- }
-}
-
-// roll the counters for a given requests params, so that we would go back and
-// rescan some of the previous transaction groups ( but not all !) when selectPendingTransactions is called.
-func (t *transactionGroupCounterTracker) roll(offset, modulator byte) {
- i := t.index(offset, modulator)
- if i < 0 {
- return
- }
-
- if (*t)[i].groupCounters[1] >= (*t)[i].groupCounters[0] {
- return
- }
- firstGroupCounter := (*t)[i].groupCounters[0]
- copy((*t)[i].groupCounters[0:], (*t)[i].groupCounters[1:])
- (*t)[i].groupCounters[bloomFilterRetryCount-1] = firstGroupCounter
-}
-
-// index is a helper method for the transactionGroupCounterTracker, helping to locate the index of
-// a requestParamsGroupCounterState in the array that matches the provided request params. The method
-// uses a linear search, which works best against small arrays.
-func (t *transactionGroupCounterTracker) index(offset, modulator byte) int {
- for i, counter := range *t {
- if counter.offset == offset && counter.modulator == modulator {
- return i
- }
- }
- return -1
-}
-
-func makePeer(networkPeer interface{}, isOutgoing bool, isLocalNodeRelay bool, cfg *config.Local, log Logger, latency time.Duration) *Peer {
- p := &Peer{
- networkPeer: networkPeer,
- isOutgoing: isOutgoing,
- recentSentTransactions: makeTransactionCache(shortTermRecentTransactionsSentBufferLength, longTermRecentTransactionsSentBufferLength, pendingUnconfirmedRemoteMessages),
- dataExchangeRate: defaultDataExchangeRate,
- cachedLatency: latency,
- transactionPoolAckCh: make(chan uint64, maxAcceptedMsgSeq),
- transactionPoolAckMessages: make([]uint64, 0, maxAcceptedMsgSeq),
- significantMessageThreshold: defaultSignificantMessageThreshold,
- log: log,
- }
- if isLocalNodeRelay {
- p.requestedTransactionsModulator = 1
- p.dataExchangeRate = defaultRelayToRelayDataExchangeRate
- }
- if cfg.TransactionSyncDataExchangeRate > 0 {
- p.dataExchangeRate = cfg.TransactionSyncDataExchangeRate
- p.significantMessageThreshold = math.MaxUint64
- }
- if cfg.TransactionSyncSignificantMessageThreshold > 0 && cfg.TransactionSyncDataExchangeRate == 0 {
- p.significantMessageThreshold = cfg.TransactionSyncSignificantMessageThreshold
- }
- // increase the number of total created peers.
- txsyncCreatedPeersTotal.Inc(nil)
- return p
-}
-
-// GetNetworkPeer returns the network peer associated with this particular peer.
-func (p *Peer) GetNetworkPeer() interface{} {
- return p.networkPeer
-}
-
-// GetTransactionPoolAckChannel returns the transaction pool ack channel
-func (p *Peer) GetTransactionPoolAckChannel() chan uint64 {
- return p.transactionPoolAckCh
-}
-
-// dequeuePendingTransactionPoolAckMessages removed the pending entries from transactionPoolAckCh and add them to transactionPoolAckMessages
-func (p *Peer) dequeuePendingTransactionPoolAckMessages() {
- for {
- select {
- case msgSeq := <-p.transactionPoolAckCh:
- if len(p.transactionPoolAckMessages) == maxAcceptedMsgSeq {
- p.transactionPoolAckMessages = append(p.transactionPoolAckMessages[1:], msgSeq)
- } else {
- p.transactionPoolAckMessages = append(p.transactionPoolAckMessages, msgSeq)
- }
- default:
- return
- }
- }
-}
-
-// outgoing related methods :
-
-// getAcceptedMessages returns the content of the transactionPoolAckMessages and clear the existing buffer.
-func (p *Peer) getAcceptedMessages() []uint64 {
- p.dequeuePendingTransactionPoolAckMessages()
- acceptedMessages := p.transactionPoolAckMessages
- p.transactionPoolAckMessages = make([]uint64, 0, maxAcceptedMsgSeq)
- return acceptedMessages
-}
-
-func (p *Peer) selectPendingTransactions(pendingTransactions []pooldata.SignedTxGroup, sendWindow time.Duration, round basics.Round, bloomFilterSize int) (selectedTxns []pooldata.SignedTxGroup, selectedTxnIDs []transactions.Txid, partialTransactionsSet bool) {
- // if peer is too far back, don't send it any transactions ( or if the peer is not interested in transactions )
- if p.lastRound < round.SubSaturate(1) || p.requestedTransactionsModulator == 0 {
- return nil, nil, false
- }
-
- if len(p.messageSeriesPendingTransactions) > 0 {
- pendingTransactions = p.messageSeriesPendingTransactions
- }
-
- if len(pendingTransactions) == 0 {
- return nil, nil, false
- }
-
- // flush the recent sent transaction cache on the beginning of a new round to give pending transactions another
- // chance of being transmitted.
- if p.recentSentTransactionsRound != round {
- p.recentSentTransactions.reset()
- p.recentSentTransactionsRound = round
- }
-
- windowLengthBytes := int(uint64(sendWindow) * p.dataExchangeRate / uint64(time.Second))
- windowLengthBytes -= bloomFilterSize
-
- accumulatedSize := 0
-
- lastTransactionSelectionGroupCounter := p.lastTransactionSelectionTracker.get(p.requestedTransactionsOffset, p.requestedTransactionsModulator)
-
- startIndex := sort.Search(len(pendingTransactions), func(i int) bool {
- return pendingTransactions[i].GroupCounter >= lastTransactionSelectionGroupCounter
- })
-
- selectedIDsSliceLength := len(pendingTransactions) - startIndex
- if selectedIDsSliceLength > p.lastSelectedTransactionsCount*2 {
- selectedIDsSliceLength = p.lastSelectedTransactionsCount * 2
- }
- selectedTxnIDs = make([]transactions.Txid, 0, selectedIDsSliceLength)
- selectedTxns = make([]pooldata.SignedTxGroup, 0, selectedIDsSliceLength)
-
- windowSizedReached := false
- hasMorePendingTransactions := false
-
- // create a list of all the bloom filters that might need to be tested. This list excludes bloom filters
- // which has the same modulator and a different offset.
- var effectiveBloomFilters []int
- effectiveBloomFilters = make([]int, 0, len(p.recentIncomingBloomFilters))
- for filterIdx := len(p.recentIncomingBloomFilters) - 1; filterIdx >= 0; filterIdx-- {
- if p.recentIncomingBloomFilters[filterIdx].filter == nil {
- continue
- }
- if p.recentIncomingBloomFilters[filterIdx].filter.encodingParams.Modulator != p.requestedTransactionsModulator || p.recentIncomingBloomFilters[filterIdx].filter.encodingParams.Offset != p.requestedTransactionsOffset {
- continue
- }
- effectiveBloomFilters = append(effectiveBloomFilters, filterIdx)
- }
-
- // removedTxn := 0
- grpIdx := startIndex
-scanLoop:
- for ; grpIdx < len(pendingTransactions); grpIdx++ {
- txID := pendingTransactions[grpIdx].GroupTransactionID
-
- // check if the peer would be interested in these messages -
- if p.requestedTransactionsModulator > 1 {
- if txidToUint64(txID)%uint64(p.requestedTransactionsModulator) != uint64(p.requestedTransactionsOffset) {
- continue
- }
- }
-
- // filter out transactions that we already previously sent.
- if p.recentSentTransactions.contained(txID) {
- // we already sent that transaction. no need to send again.
- continue
- }
-
- // check if the peer already received these messages from a different source other than us.
- for _, filterIdx := range effectiveBloomFilters {
- if p.recentIncomingBloomFilters[filterIdx].filter.test(txID) {
- // removedTxn++
- continue scanLoop
- }
- }
-
- if windowSizedReached {
- hasMorePendingTransactions = true
- break
- }
- selectedTxns = append(selectedTxns, pendingTransactions[grpIdx])
- selectedTxnIDs = append(selectedTxnIDs, txID)
-
- // add the size of the transaction group
- accumulatedSize += pendingTransactions[grpIdx].EncodedLength
-
- if accumulatedSize > windowLengthBytes {
- windowSizedReached = true
- }
- }
-
- p.lastSelectedTransactionsCount = len(selectedTxnIDs)
-
- // if we've over-allocated, resize the buffer; This becomes important on relays,
- // as storing these arrays can consume considerable amount of memory.
- if len(selectedTxnIDs)*2 < cap(selectedTxnIDs) {
- exactBuffer := make([]transactions.Txid, len(selectedTxnIDs))
- copy(exactBuffer, selectedTxnIDs)
- selectedTxnIDs = exactBuffer
- }
-
- // update the lastTransactionSelectionGroupCounter if needed -
- // if we selected any transaction to be sent, update the lastTransactionSelectionGroupCounter with the latest
- // group counter. If the startIndex was *after* the last pending transaction, it means that we don't
- // need to update the lastTransactionSelectionGroupCounter since it's already ahead of everything in the pending transactions.
- if grpIdx >= 0 && startIndex < len(pendingTransactions) {
- if grpIdx == len(pendingTransactions) {
- if grpIdx > 0 {
- p.lastTransactionSelectionTracker.set(p.requestedTransactionsOffset, p.requestedTransactionsModulator, pendingTransactions[grpIdx-1].GroupCounter+1)
- }
- } else {
- p.lastTransactionSelectionTracker.set(p.requestedTransactionsOffset, p.requestedTransactionsModulator, pendingTransactions[grpIdx].GroupCounter)
- }
- }
-
- if !hasMorePendingTransactions {
- // we're done with the current sequence.
- p.messageSeriesPendingTransactions = nil
- }
-
- // fmt.Printf("selectPendingTransactions : selected %d transactions, %d not needed and aborted after exceeding data length %d/%d more = %v\n", len(selectedTxnIDs), removedTxn, accumulatedSize, windowLengthBytes, hasMorePendingTransactions)
-
- return selectedTxns, selectedTxnIDs, hasMorePendingTransactions
-}
-
-// getLocalRequestParams returns the local requests params
-func (p *Peer) getLocalRequestParams() (offset, modulator byte) {
- return p.localTransactionsBaseOffset, p.localTransactionsModulator
-}
-
-// update the peer once the message was sent successfully.
-func (p *Peer) updateMessageSent(txMsg *transactionBlockMessage, selectedTxnIDs []transactions.Txid, timestamp time.Duration, sequenceNumber uint64, messageSize int) {
- p.recentSentTransactions.addSlice(selectedTxnIDs, sequenceNumber, timestamp)
- p.lastSentMessageSequenceNumber = sequenceNumber
- p.lastSentMessageRound = txMsg.Round
- p.lastSentMessageTimestamp = timestamp
- p.lastSentMessageSize = messageSize
-}
-
-// update the peer's lastSentBloomFilter.
-func (p *Peer) updateSentBoomFilter(filter bloomFilter, round basics.Round) {
- if filter.encodedLength > 0 {
- p.lastSentBloomFilter = filter
- p.sentFilterParams.setSentFilter(filter, round)
- }
-}
-
-// setLocalRequestParams stores the peer request params.
-func (p *Peer) setLocalRequestParams(offset, modulator uint64) {
- if modulator > 255 {
- modulator = 255
- }
- p.localTransactionsModulator = byte(modulator)
- if modulator != 0 {
- p.localTransactionsBaseOffset = byte(offset % modulator)
- }
-}
-
-// peers array functions
-
-// incomingPeersOnly scan the input peers array and return a subset of the peers that are incoming peers.
-func incomingPeersOnly(peers []*Peer) (incomingPeers []*Peer) {
- incomingPeers = make([]*Peer, 0, len(peers))
- for _, peer := range peers {
- if !peer.isOutgoing {
- incomingPeers = append(incomingPeers, peer)
- }
- }
- return
-}
-
-// incoming related functions
-
-// addIncomingBloomFilter keeps the most recent {maxIncomingBloomFilterHistory} filters
-func (p *Peer) addIncomingBloomFilter(round basics.Round, incomingFilter *testableBloomFilter, currentRound basics.Round) {
- minRound := currentRound.SubSaturate(2)
- if round < minRound {
- // ignore data from the past
- return
- }
- bf := incomingBloomFilter{
- round: round,
- filter: incomingFilter,
- }
- elemOk := func(i int) bool {
- ribf := p.recentIncomingBloomFilters[i]
- if ribf.filter == nil {
- return false
- }
- if ribf.round < minRound {
- return false
- }
- if incomingFilter.clearPrevious && ribf.filter.encodingParams.Offset == incomingFilter.encodingParams.Offset && ribf.filter.encodingParams.Modulator == incomingFilter.encodingParams.Modulator {
- return false
- }
- return true
- }
- // compact the prior list to the front of the array.
- // order doesn't matter.
- pos := 0
- last := len(p.recentIncomingBloomFilters) - 1
- oldestRound := currentRound + 1
- firstOfOldest := -1
- for pos <= last {
- if elemOk(pos) {
- if p.recentIncomingBloomFilters[pos].round < oldestRound {
- oldestRound = p.recentIncomingBloomFilters[pos].round
- firstOfOldest = pos
- }
- pos++
- continue
- }
- p.recentIncomingBloomFilters[pos] = p.recentIncomingBloomFilters[last]
- p.recentIncomingBloomFilters[last].filter = nil // GC
- last--
- }
- p.recentIncomingBloomFilters = p.recentIncomingBloomFilters[:last+1]
- // Simple case: append
- if last+1 < maxIncomingBloomFilterHistory {
- p.recentIncomingBloomFilters = append(p.recentIncomingBloomFilters, bf)
- return
- }
- // Too much traffic case: replace the first thing we find of the oldest round
- if firstOfOldest >= 0 {
- p.recentIncomingBloomFilters[firstOfOldest] = bf
- return
- }
- // This line should be unreachable, but putting in an error log to test that assumption.
- p.log.Error("addIncomingBloomFilter failed to trim p.recentIncomingBloomFilters (new filter lost)")
-}
-
-func (p *Peer) updateRequestParams(modulator, offset byte) {
- p.requestedTransactionsModulator = modulator
- p.requestedTransactionsOffset = offset
-}
-
-// update the recentSentTransactions with the incoming transaction groups. This would prevent us from sending the received transactions back to the
-// peer that sent it to us. This comes in addition to the bloom filter, if being sent by the other peer.
-func (p *Peer) updateIncomingTransactionGroups(txnGroups []pooldata.SignedTxGroup) {
- for _, txnGroup := range txnGroups {
- if len(txnGroup.Transactions) > 0 {
- // The GroupTransactionID field is not yet updated, so we'll be calculating it's value here and passing it.
- p.recentSentTransactions.add(txnGroup.Transactions.ID())
- }
- }
-}
-
-func (p *Peer) updateIncomingMessageTiming(timings timingParams, currentRound basics.Round, currentTime time.Duration, timeInQueue time.Duration, peerLatency time.Duration, incomingMessageSize int) {
- p.lastConfirmedMessageSeqReceived = timings.RefTxnBlockMsgSeq
- // if we received a message that references our previous message, see if they occurred on the same round
- if p.lastConfirmedMessageSeqReceived == p.lastSentMessageSequenceNumber && p.lastSentMessageRound == currentRound && p.lastSentMessageTimestamp > 0 {
- // if so, we might be able to calculate the bandwidth.
- timeSinceLastMessageWasSent := currentTime - timeInQueue - p.lastSentMessageTimestamp
- networkMessageSize := uint64(p.lastSentMessageSize + incomingMessageSize)
- if timings.ResponseElapsedTime != 0 && peerLatency > 0 && timeSinceLastMessageWasSent > time.Duration(timings.ResponseElapsedTime)+peerLatency && networkMessageSize >= p.significantMessageThreshold {
- networkTrasmitTime := timeSinceLastMessageWasSent - time.Duration(timings.ResponseElapsedTime) - peerLatency
- dataExchangeRate := uint64(time.Second) * networkMessageSize / uint64(networkTrasmitTime)
-
- // clamp data exchange rate to realistic metrics
- if dataExchangeRate < minDataExchangeRateThreshold {
- dataExchangeRate = minDataExchangeRateThreshold
- } else if dataExchangeRate > maxDataExchangeRateThreshold {
- dataExchangeRate = maxDataExchangeRateThreshold
- }
- // fmt.Printf("incoming message : updating data exchange to %d; network msg size = %d+%d, transmit time = %v\n", dataExchangeRate, p.lastSentMessageSize, incomingMessageSize, networkTrasmitTime)
- p.dataExchangeRate = dataExchangeRate
- }
-
- // given that we've (maybe) updated the data exchange rate, we need to clear out the lastSendMessage information
- // so we won't use that again on a subsequent incoming message.
- p.lastSentMessageSequenceNumber = 0
- p.lastSentMessageRound = 0
- p.lastSentMessageTimestamp = 0
- p.lastSentMessageSize = 0
- }
- p.lastReceivedMessageLocalRound = currentRound
- p.lastReceivedMessageTimestamp = currentTime - timeInQueue
- p.lastReceivedMessageSize = incomingMessageSize
- p.lastReceivedMessageNextMsgMinDelay = time.Duration(timings.NextMsgMinDelay) * time.Nanosecond
- p.recentSentTransactions.acknowledge(timings.AcceptedMsgSeq)
-}
-
-// advancePeerState is called when a peer schedule arrives, before we're doing any operation.
-// The method would determine whether a message need to be sent, and adjust the peer state
-// accordingly.
-func (p *Peer) advancePeerState(currenTime time.Duration, isRelay bool) (ops peersOps) {
- if isRelay {
- if p.isOutgoing {
- // outgoing peers are "special", as they respond to messages rather then generating their own.
- // we need to figure the special state needed for "late bloom filter message"
- switch p.state {
- case peerStateStartup:
- p.nextStateTimestamp = currenTime + p.lastReceivedMessageNextMsgMinDelay
- messagesCount := p.lastReceivedMessageNextMsgMinDelay / messageTimeWindow
- if messagesCount <= 2 {
- // we have time to send only a single message. This message need to include both transactions and bloom filter.
- p.state = peerStateLateBloom
- } else {
- // we have enough time to send multiple messages, make the first n-1 message have no bloom filter, and have the last one
- // include a bloom filter.
- p.state = peerStateHoldsoff
- }
-
- // send a message
- ops |= peerOpsSendMessage
- case peerStateHoldsoff:
- // calculate how more messages we can send ( if needed )
- messagesCount := (p.nextStateTimestamp - currenTime) / messageTimeWindow
- if messagesCount <= 2 {
- // we have time to send only a single message. This message need to include both transactions and bloom filter.
- p.state = peerStateLateBloom
- }
-
- // send a message
- ops |= peerOpsSendMessage
-
- // the rescehduling would be done in the sendMessageLoop, since we need to know if additional messages are needed.
- case peerStateLateBloom:
- // send a message
- ops |= peerOpsSendMessage
-
- default:
- // this isn't expected, so we can just ignore this.
- // todo : log
- }
- } else {
- // non-outgoing
- switch p.state {
- case peerStateStartup:
- p.state = peerStateHoldsoff
- fallthrough
- case peerStateHoldsoff:
- // prepare the send message array.
- ops |= peerOpsSendMessage
- default: // peerStateInterrupt & peerStateLateBloom
- // this isn't expected, so we can just ignore this.
- // todo : log
- }
- }
- } else {
- switch p.state {
- case peerStateStartup:
- p.state = peerStateHoldsoff
- ops |= peerOpsSendMessage
-
- case peerStateHoldsoff:
- if p.nextStateTimestamp == 0 {
- p.state = peerStateInterrupt
- ops |= peerOpsSetInterruptible | peerOpsReschedule
- } else {
- ops |= peerOpsSendMessage
- }
-
- case peerStateInterrupt:
- p.state = peerStateHoldsoff
- ops |= peerOpsSendMessage | peerOpsClearInterruptible
-
- default: // peerStateLateBloom
- // this isn't expected, so we can just ignore this.
- // todo : log
- }
- }
- return ops
-}
-
-// getMessageConstructionOps constructs the messageConstructionOps that would be needed when
-// sending a message back to the peer. The two arguments are:
-// - isRelay defines whether the local node is a relay.
-// - fetchTransactions defines whether the local node is interested in receiving transactions from
-// the peer ( this is essentially allow us to skip receiving transactions for non-relays that aren't going
-// to make any proposals )
-func (p *Peer) getMessageConstructionOps(isRelay bool, fetchTransactions bool) (ops messageConstructionOps) {
- // on outgoing peers of relays, we want have some custom logic.
- if isRelay {
- if p.isOutgoing {
- switch p.state {
- case peerStateLateBloom:
- if p.localTransactionsModulator != 0 {
- ops |= messageConstBloomFilter
- }
- case peerStateHoldsoff:
- ops |= messageConstTransactions
- }
- } else {
- if p.requestedTransactionsModulator != 0 {
- ops |= messageConstTransactions
- if p.nextStateTimestamp == 0 && p.localTransactionsModulator != 0 {
- ops |= messageConstBloomFilter
- }
- }
- if p.nextStateTimestamp == 0 {
- ops |= messageConstNextMinDelay
- }
- }
- ops |= messageConstUpdateRequestParams
- } else {
- ops |= messageConstTransactions // send transactions to the other peer
- if fetchTransactions {
- switch p.localTransactionsModulator {
- case 0:
- // don't send bloom filter.
- case 1:
- // special optimization if we have just one relay that we're connected to:
- // generate the bloom filter only once per 2*beta message.
- // this would reduce the number of unneeded bloom filters generation dramatically.
- // that single relay would know which messages it previously sent us, and would refrain from
- // sending these again.
- if p.nextStateTimestamp == 0 {
- ops |= messageConstBloomFilter
- }
- default:
- ops |= messageConstBloomFilter
- }
- ops |= messageConstUpdateRequestParams
- }
- }
- return ops
-}
-
-// getNextScheduleOffset is called after a message was sent to the peer, and we need to evaluate the next
-// scheduling time.
-func (p *Peer) getNextScheduleOffset(isRelay bool, beta time.Duration, partialMessage bool, currentTime time.Duration) (offset time.Duration, ops peersOps) {
- if partialMessage {
- if isRelay {
- if p.isOutgoing {
- if p.state == peerStateHoldsoff {
- // we have enough time to send another message.
- return messageTimeWindow, peerOpsReschedule
- }
- } else {
- // a partial message was sent to an incoming peer
- if p.nextStateTimestamp > time.Duration(0) {
- if currentTime+messageTimeWindow*2 < p.nextStateTimestamp {
- // we have enough time to send another message
- return messageTimeWindow, peerOpsReschedule
- }
- // we don't have enough time to send another message.
- next := p.nextStateTimestamp
- p.nextStateTimestamp = 0
- return next - currentTime, peerOpsReschedule
- }
- p.nextStateTimestamp = currentTime + 2*beta
- return messageTimeWindow, peerOpsReschedule
- }
- } else {
- if p.nextStateTimestamp > time.Duration(0) {
- if currentTime+messageTimeWindow*2 < p.nextStateTimestamp {
- // we have enough time to send another message
- return messageTimeWindow, peerOpsReschedule
- }
- // we don't have enough time, so don't get into "interrupt" state,
- // since we're already sending messages.
- next := p.nextStateTimestamp
- p.nextStateTimestamp = 0
- p.messageSeriesPendingTransactions = nil
- // move to the next state.
- p.state = peerStateHoldsoff
- return next - currentTime, peerOpsReschedule | peerOpsClearInterruptible
-
- }
- // this is the first message
- p.nextStateTimestamp = currentTime + 2*beta
-
- return messageTimeWindow, peerOpsReschedule
- }
- } else {
- if isRelay {
- if p.isOutgoing {
- if p.state == peerStateHoldsoff {
- // even that we're done now, we need to send another message that would contain the bloom filter
- p.state = peerStateLateBloom
-
- bloomMessageExtrapolatedSendingTime := messageTimeWindow
- // try to improve the sending time by using the last sent bloom filter as the expected message size.
- if p.lastSentBloomFilter.containedTxnsRange.transactionsCount > 0 {
- lastBloomFilterSize := uint64(p.lastSentBloomFilter.encodedLength)
- bloomMessageExtrapolatedSendingTime = time.Duration(lastBloomFilterSize * p.dataExchangeRate)
- }
-
- next := p.nextStateTimestamp - bloomMessageExtrapolatedSendingTime - currentTime
- p.nextStateTimestamp = 0
- return next, peerOpsReschedule
- }
- p.nextStateTimestamp = 0
- } else {
- // we sent a message to an incoming connection. No more data to send.
- if p.nextStateTimestamp > time.Duration(0) {
- next := p.nextStateTimestamp
- p.nextStateTimestamp = 0
- return next - currentTime, peerOpsReschedule
- }
- p.nextStateTimestamp = 0
- return beta * 2, peerOpsReschedule
- }
- } else {
- if p.nextStateTimestamp > time.Duration(0) {
- next := p.nextStateTimestamp
- p.nextStateTimestamp = 0
- return next - currentTime, peerOpsReschedule
- }
- return beta, peerOpsReschedule
- }
- }
- return time.Duration(0), 0
-}
-
-func (p *Peer) networkAddress() string {
- if peerAddress, supportInterface := p.networkPeer.(networkPeerAddress); supportInterface {
- return peerAddress.GetAddress()
- }
- return ""
-}