summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Lee <64482439+algojohnlee@users.noreply.github.com>2021-10-26 09:34:42 -0400
committerGitHub <noreply@github.com>2021-10-26 09:34:42 -0400
commit378816d0439cf4cf2647952eec68a4e24ad6fa06 (patch)
tree58de9505cfa4e951e0583fe62eaa5dd2beca5f25
parent53cf0134c131bbbe51e689a149494cb90890a4b4 (diff)
parent615fee293d10582a886029a95974806a3d7bdcd6 (diff)
Merge pull request #3139 from algorand/rel/beta-3.1.3
go-algorand v3.1.3-beta
-rw-r--r--buildnumber.dat2
-rw-r--r--catchup/fetcher_test.go6
-rw-r--r--catchup/peerSelector_test.go5
-rw-r--r--catchup/service.go3
-rw-r--r--config/config.go4
-rw-r--r--config/local_defaults.go4
-rw-r--r--data/txHandler.go2
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--installer/config.json.example4
-rw-r--r--network/latencyTracker.go170
-rw-r--r--network/latencyTracker_test.go121
-rw-r--r--network/ping.go70
-rw-r--r--network/ping_test.go74
-rw-r--r--network/wsNetwork.go85
-rw-r--r--network/wsNetwork_test.go6
-rw-r--r--network/wsPeer.go69
-rw-r--r--node/txnSyncConn.go7
-rw-r--r--rpcs/blockService_test.go5
-rw-r--r--test/testdata/configs/config-v18.json96
-rw-r--r--txnsync/bloomFilter_test.go6
-rw-r--r--txnsync/emulatorNode_test.go6
-rw-r--r--txnsync/incoming.go123
-rw-r--r--txnsync/incomingMsgQ.go372
-rw-r--r--txnsync/incomingMsgQ_test.go158
-rw-r--r--txnsync/incoming_test.go56
-rw-r--r--txnsync/interfaces.go5
-rw-r--r--txnsync/mainloop.go12
-rw-r--r--txnsync/outgoing.go36
-rw-r--r--txnsync/outgoing_test.go23
-rw-r--r--txnsync/peer.go19
-rw-r--r--txnsync/peer_test.go55
-rw-r--r--txnsync/service_test.go8
33 files changed, 1178 insertions, 440 deletions
diff --git a/buildnumber.dat b/buildnumber.dat
index 0cfbf0888..00750edc0 100644
--- a/buildnumber.dat
+++ b/buildnumber.dat
@@ -1 +1 @@
-2
+3
diff --git a/catchup/fetcher_test.go b/catchup/fetcher_test.go
index 13b0c7fe2..4d62fdde0 100644
--- a/catchup/fetcher_test.go
+++ b/catchup/fetcher_test.go
@@ -23,6 +23,7 @@ import (
"net/url"
"strings"
"testing"
+ "time"
"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
@@ -291,6 +292,11 @@ func (p *testUnicastPeer) IsOutgoing() bool {
return false
}
+// GetConnectionLatency returns the connection latency between the local node and this peer.
+func (p *testUnicastPeer) GetConnectionLatency() time.Duration {
+ return time.Duration(0)
+}
+
func (p *testUnicastPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback network.UnicastWebsocketMessageStateCallback) error {
ps := p.gn.(*httpTestPeerSource)
var dispather network.MessageHandler
diff --git a/catchup/peerSelector_test.go b/catchup/peerSelector_test.go
index 0147e1988..02d759b20 100644
--- a/catchup/peerSelector_test.go
+++ b/catchup/peerSelector_test.go
@@ -66,6 +66,11 @@ func (d *mockUnicastPeer) IsOutgoing() bool {
return false
}
+// GetConnectionLatency returns the connection latency between the local node and this peer.
+func (d *mockUnicastPeer) GetConnectionLatency() time.Duration {
+ return time.Duration(0)
+}
+
func TestPeerAddress(t *testing.T) {
partitiontest.PartitionTest(t)
diff --git a/catchup/service.go b/catchup/service.go
index 211b31608..f8f92b9b6 100644
--- a/catchup/service.go
+++ b/catchup/service.go
@@ -324,6 +324,9 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
if err != nil {
switch err.(type) {
+ case ledgercore.ErrNonSequentialBlockEval:
+ s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r)
+ return true
case ledgercore.BlockInLedgerError:
s.log.Infof("fetchAndWrite(%d): block already in ledger", r)
return true
diff --git a/config/config.go b/config/config.go
index 71eaba88d..781ab73a8 100644
--- a/config/config.go
+++ b/config/config.go
@@ -63,7 +63,7 @@ type Local struct {
// Version tracks the current version of the defaults so we can migrate old -> new
// This is specifically important whenever we decide to change the default value
// for an existing parameter. This field tag must be updated any time we add a new version.
- Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17"`
+ Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18"`
// environmental (may be overridden)
// When enabled, stores blocks indefinitally, otherwise, only the most recents blocks
@@ -84,7 +84,7 @@ type Local struct {
MaxConnectionsPerIP int `version[3]:"30"`
// 0 == disable
- PeerPingPeriodSeconds int `version[0]:"0"`
+ PeerPingPeriodSeconds int `version[0]:"0" version[18]:"10"`
// for https serving
TLSCertFile string `version[0]:""`
diff --git a/config/local_defaults.go b/config/local_defaults.go
index c7a986bd7..de8090339 100644
--- a/config/local_defaults.go
+++ b/config/local_defaults.go
@@ -20,7 +20,7 @@
package config
var defaultLocal = Local{
- Version: 17,
+ Version: 18,
AccountUpdatesStatsInterval: 5000000000,
AccountsRebuildSynchronousMode: 1,
AnnounceParticipationKey: true,
@@ -92,7 +92,7 @@ var defaultLocal = Local{
OutgoingMessageFilterBucketSize: 128,
ParticipationKeysRefreshInterval: 60000000000,
PeerConnectionsUpdateInterval: 3600,
- PeerPingPeriodSeconds: 0,
+ PeerPingPeriodSeconds: 10,
PriorityPeers: map[string]bool{},
PublicAddress: "",
ReconnectTime: 60000000000,
diff --git a/data/txHandler.go b/data/txHandler.go
index 48fe7f7b2..0ecef433c 100644
--- a/data/txHandler.go
+++ b/data/txHandler.go
@@ -538,7 +538,7 @@ func (handler *solicitedAsyncTxHandler) loop(ctx context.Context) {
// We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings
err := handler.txHandler.net.Relay(ctx, protocol.TxnTag, reencode(txnGroup.Transactions), false, groups.networkPeer)
if err != nil {
- logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v")
+ logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v", err)
break
}
}
diff --git a/go.mod b/go.mod
index 2ecfb9903..eb84f8d10 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@ require (
github.com/algorand/graphtrace v0.0.0-20201117160756-e524ed1a6f64
github.com/algorand/msgp v1.1.48
github.com/algorand/oapi-codegen v1.3.5-algorand5
- github.com/algorand/websocket v1.4.2
+ github.com/algorand/websocket v1.4.3
github.com/algorand/xorfilter v0.2.0
github.com/aws/aws-sdk-go v1.16.5
github.com/chrismcguire/gobberish v0.0.0-20150821175641-1d8adb509a0e
diff --git a/go.sum b/go.sum
index c5e86d8ce..d86dba456 100644
--- a/go.sum
+++ b/go.sum
@@ -10,8 +10,8 @@ github.com/algorand/msgp v1.1.48 h1:5P+gVmTnk0m37r+rA3ZsFZW219ZqmCLulW5f8Z+3nx8=
github.com/algorand/msgp v1.1.48/go.mod h1:LtOntbYiCHj/Sl/Sqxtf8CZOrDt2a8Dv3tLaS6mcnUE=
github.com/algorand/oapi-codegen v1.3.5-algorand5 h1:y576Ca2/guQddQrQA7dtL5KcOx5xQgPeIupiuFMGyCI=
github.com/algorand/oapi-codegen v1.3.5-algorand5/go.mod h1:/k0Ywn0lnt92uBMyE+yiRf/Wo3/chxHHsAfenD09EbY=
-github.com/algorand/websocket v1.4.2 h1:zMB7ukz+c7tcef8rVqmKQTv6KQtxXtCFuiAqKaE7n9I=
-github.com/algorand/websocket v1.4.2/go.mod h1:0nFSn+xppw/GZS9hgWPS3b8/4FcA3Pj7XQxm+wqHGx8=
+github.com/algorand/websocket v1.4.3 h1:8YiA+ZtwqAyg0K30lQyl7gUdKUArYXvBtd/cTFwA4uQ=
+github.com/algorand/websocket v1.4.3/go.mod h1:0nFSn+xppw/GZS9hgWPS3b8/4FcA3Pj7XQxm+wqHGx8=
github.com/algorand/xorfilter v0.2.0 h1:YC31ANxdZ2jmtbwqv1+USskVSqjkeiRZcQGc6//ro9Q=
github.com/algorand/xorfilter v0.2.0/go.mod h1:f5cJsYrFbJhXkbjnV4odJB44np05/PvwvdBnABnQoUs=
github.com/aws/aws-sdk-go v1.16.5 h1:NVxzZXIuwX828VcJrpNxxWjur1tlOBISdMdDdHIKHcc=
diff --git a/installer/config.json.example b/installer/config.json.example
index aa80a6dad..b1a977e64 100644
--- a/installer/config.json.example
+++ b/installer/config.json.example
@@ -1,5 +1,5 @@
{
- "Version": 17,
+ "Version": 18,
"AccountUpdatesStatsInterval": 5000000000,
"AccountsRebuildSynchronousMode": 1,
"AnnounceParticipationKey": true,
@@ -71,7 +71,7 @@
"OutgoingMessageFilterBucketSize": 128,
"ParticipationKeysRefreshInterval": 60000000000,
"PeerConnectionsUpdateInterval": 3600,
- "PeerPingPeriodSeconds": 0,
+ "PeerPingPeriodSeconds": 10,
"PriorityPeers": {},
"PublicAddress": "",
"ReconnectTime": 60000000000,
diff --git a/network/latencyTracker.go b/network/latencyTracker.go
new file mode 100644
index 000000000..ff503ddb2
--- /dev/null
+++ b/network/latencyTracker.go
@@ -0,0 +1,170 @@
+// Copyright (C) 2019-2021 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "errors"
+ "net"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/algorand/websocket"
+
+ "github.com/algorand/go-deadlock"
+
+ "github.com/algorand/go-algorand/config"
+)
+
+const pongMessageWriteDuration = time.Second
+const pingMessageWriteDuration = time.Second
+
+var errInvalidPongMessageContent = errors.New("invalid pong message content")
+var errInvalidPingMessageContent = errors.New("invalid ping message content")
+
+// latencyTracker works in conjunction with the wspeer in measuring the
+// communication latency over the websocket connection.
+type latencyTracker struct {
+ // receivedPacketCounter is a counter for all incoming messages
+ // placed here to be aligned with 64bit address.
+ receivedPacketCounter uint64
+
+ // latency is the effective latency of the connection.
+ // placed here to be aligned with 64bit address.
+ latency int64
+
+ // lastPingSentTime is the timestamp at which we last sent a message.
+ // this variable is only touched by checkPingSending, and therefore doesn't
+ // need to be synchronized. The "clone" of this variable lastPingSentTimeSynced,
+ // is being used by both the checkPingSending as well as by the pongHandler
+ // and therefore require synchronization.
+ lastPingSentTime int64
+
+ // static variables
+ // ( doesn't get changed after init, hence, no synchronization needed )
+
+ // conn is the underlying connection object.
+ conn wsPeerWebsocketConn
+
+ // enabled indicates whether the pingpong is currently enabled or not.
+ enabled bool
+
+ // pingInterval is the max interval at which the client would send ping messages.
+ pingInterval time.Duration
+
+ // lastPingMu synchronizes the protected variables that might be modified across
+ // the checkPingSending and the pongHandler. All the variables below this point
+ // need to be synchronized with the mutex.
+ lastPingMu deadlock.Mutex
+
+ // lastPingID is the last ping ID, a monotonic growing number used to ensure
+ // that the pong message we've receive corresponds to the latest ping message
+ // that we've sent.
+ lastPingID uint64
+
+ // lastPingReceivedCounter stores message counter at the time we sent the ping.
+ // In order to ensure the timing accuracy, we want to have no other messages
+ // being exchanged. This, of course, would only delay the ping-pong until a
+ // better measurement could be taken.
+ lastPingReceivedCounter uint64
+
+ // lastPingSentTimeSynced, as stated above, is the synchronized version of lastPingSentTime.
+ // it is used only in the case where we end up sending the ping message.
+ lastPingSentTimeSynced int64
+}
+
+func (lt *latencyTracker) init(conn wsPeerWebsocketConn, cfg config.Local, initialConnectionLatency time.Duration) {
+ lt.conn = conn
+ lt.enabled = cfg.PeerPingPeriodSeconds > 0 && cfg.EnablePingHandler
+ lt.latency = int64(initialConnectionLatency)
+ lt.pingInterval = time.Duration(cfg.PeerPingPeriodSeconds) * time.Second
+ conn.SetPingHandler(lt.pingHandler)
+ conn.SetPongHandler(lt.pongHandler)
+}
+
+func (lt *latencyTracker) pingHandler(message string) error {
+ if _, err := strconv.Atoi(message); err != nil {
+ return errInvalidPingMessageContent
+ }
+ err := lt.conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(pongMessageWriteDuration))
+ if err == websocket.ErrCloseSent {
+ return nil
+ } else if e, ok := err.(net.Error); ok && e.Temporary() {
+ return nil
+ }
+ return err
+}
+
+func (lt *latencyTracker) pongHandler(message string) error {
+ pongID, err := strconv.Atoi(message)
+ if err != nil {
+ return errInvalidPongMessageContent
+ }
+
+ lt.lastPingMu.Lock()
+ defer lt.lastPingMu.Unlock()
+
+ if uint64(pongID) != lt.lastPingID {
+ // we've sent more than one ping since; ignore this message.
+ return nil
+ }
+ if lt.receivedPacketCounter != lt.lastPingReceivedCounter {
+ // we've received other messages since the one that we sent. The timing
+ // here would not be accurate.
+ return nil
+ }
+ lastPingSentTime := time.Unix(0, lt.lastPingSentTimeSynced)
+ roundtripDuration := time.Since(lastPingSentTime)
+ atomic.StoreInt64(&lt.latency, roundtripDuration.Nanoseconds())
+ return nil
+}
+
+func (lt *latencyTracker) getConnectionLatency() time.Duration {
+ return time.Duration(atomic.LoadInt64(&lt.latency))
+}
+
+func (lt *latencyTracker) checkPingSending(now *time.Time) error {
+ if !lt.enabled {
+ return nil
+ }
+ if now.Sub(time.Unix(0, lt.lastPingSentTime)) < lt.pingInterval {
+ return nil
+ }
+
+ // it looks like it's time to send a ping :
+ lt.lastPingMu.Lock()
+ defer lt.lastPingMu.Unlock()
+
+ lt.lastPingID++
+ err := lt.conn.WriteControl(websocket.PingMessage, []byte(strconv.Itoa(int(lt.lastPingID))), now.Add(pingMessageWriteDuration))
+ if err == websocket.ErrCloseSent {
+ return nil
+ } else if e, ok := err.(net.Error); ok && e.Temporary() {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ lt.lastPingSentTimeSynced = now.UnixNano()
+ lt.lastPingReceivedCounter = atomic.LoadUint64(&lt.receivedPacketCounter)
+ lt.lastPingSentTime = lt.lastPingSentTimeSynced
+ return nil
+}
+
+func (lt *latencyTracker) increaseReceivedCounter() {
+ atomic.AddUint64(&lt.receivedPacketCounter, 1)
+}
diff --git a/network/latencyTracker_test.go b/network/latencyTracker_test.go
new file mode 100644
index 000000000..e7a62c1d7
--- /dev/null
+++ b/network/latencyTracker_test.go
@@ -0,0 +1,121 @@
+// Copyright (C) 2019-2021 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "context"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/algorand/go-algorand/protocol"
+ "github.com/algorand/go-algorand/test/partitiontest"
+)
+
+func TestLatencyTracker(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ netA := makeTestFilterWebsocketNode(t, "a")
+ netA.config.GossipFanout = 1
+ netA.config.PeerPingPeriodSeconds = 2
+ netA.Start()
+ defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+
+ netB := makeTestFilterWebsocketNode(t, "b")
+ netB.config.GossipFanout = 1
+ addrA, postListen := netA.Address()
+ require.True(t, postListen)
+ t.Log(addrA)
+ netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
+
+ netB.Start()
+ defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ counter := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})}
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ debugTag2 := protocol.ProposalPayloadTag
+ counter2 := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})}
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: debugTag2, MessageHandler: counter2}})
+
+ readyTimeout := time.NewTimer(2 * time.Second)
+ waitReady(t, netA, readyTimeout.C)
+ waitReady(t, netB, readyTimeout.C)
+
+ msg := make([]byte, 200)
+ rand.Read(msg)
+ var lastMsgTime time.Time
+
+ var connLatencyInitialA time.Duration
+ // wait for up to 20 seconds for the network latency to be established.
+ startTime := time.Now()
+ for {
+ if time.Since(lastMsgTime) > 100*time.Millisecond {
+ netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
+ lastMsgTime = time.Now()
+ }
+
+ connLatencyA := netA.peers[0].GetConnectionLatency()
+ if connLatencyA == time.Duration(0) {
+ require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds())
+ time.Sleep(time.Millisecond)
+ continue
+ }
+ require.LessOrEqual(t, connLatencyA.Nanoseconds(), (20 * time.Second).Nanoseconds())
+ connLatencyInitialA = connLatencyA
+ break
+ }
+
+ // wait for up to 20 seconds for the network latency to be established.
+ startTime = time.Now()
+ lastMsgTime = time.Time{}
+ for {
+ if time.Since(lastMsgTime) > 100*time.Millisecond {
+ netB.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
+ lastMsgTime = time.Now()
+ }
+
+ connLatencyB := netB.peers[0].GetConnectionLatency()
+ if connLatencyB == time.Duration(0) {
+ require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds())
+ time.Sleep(time.Millisecond)
+ continue
+ }
+ require.LessOrEqual(t, connLatencyB.Nanoseconds(), (20 * time.Second).Nanoseconds())
+ break
+ }
+
+ // send the given message until we get a different latency.
+ // wait for up to 20 seconds for the network latency to be established.
+ startTime = time.Now()
+ lastMsgTime = time.Time{}
+ for {
+ if time.Since(lastMsgTime) > 100*time.Millisecond {
+ netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
+ lastMsgTime = time.Now()
+ }
+
+ connLatencyA := netA.peers[0].GetConnectionLatency()
+ if connLatencyA != connLatencyInitialA {
+ require.NotEqual(t, connLatencyA.Nanoseconds(), int64(0))
+ waitTime := time.Since(lastMsgTime)
+ require.Less(t, waitTime.Seconds(), float64(netA.config.PeerPingPeriodSeconds*2))
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
+}
diff --git a/network/ping.go b/network/ping.go
deleted file mode 100644
index 02915b49f..000000000
--- a/network/ping.go
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright (C) 2019-2021 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
-
-package network
-
-import (
- "bytes"
- "context"
- "time"
-
- "github.com/algorand/go-algorand/crypto"
- "github.com/algorand/go-algorand/protocol"
-)
-
-func pingHandler(message IncomingMessage) OutgoingMessage {
- if len(message.Data) > 8 {
- return OutgoingMessage{}
- }
- message.Net.(*WebsocketNetwork).log.Debugf("ping from peer %#v", message.Sender.(*wsPeer).wsPeerCore)
- peer := message.Sender.(*wsPeer)
- tbytes := []byte(protocol.PingReplyTag)
- mbytes := make([]byte, len(tbytes)+len(message.Data))
- copy(mbytes, tbytes)
- copy(mbytes[len(tbytes):], message.Data)
- var digest crypto.Digest // leave blank, ping message too short
- peer.writeNonBlock(context.Background(), mbytes, false, digest, time.Now(), nil)
- return OutgoingMessage{}
-}
-
-func pingReplyHandler(message IncomingMessage) OutgoingMessage {
- log := message.Net.(*WebsocketNetwork).log
- now := time.Now()
- peer := message.Sender.(*wsPeer)
- peer.pingLock.Lock()
- defer peer.pingLock.Unlock()
- if !peer.pingInFlight {
- log.Infof("ping reply with non in flight from %s", peer.rootURL)
- return OutgoingMessage{}
- }
- if len(peer.pingData) != len(message.Data) {
- log.Infof("ping reply with wrong length want %d got %d, from %s", len(peer.pingData), len(message.Data), peer.rootURL)
- return OutgoingMessage{}
- }
- if 0 != bytes.Compare(peer.pingData, message.Data) {
- log.Infof("ping reply with wrong data from %s", peer.rootURL)
- return OutgoingMessage{}
- }
- peer.pingInFlight = false
- peer.lastPingRoundTripTime = now.Sub(peer.pingSent)
- log.Debugf("ping returned in %s from %s", peer.lastPingRoundTripTime, message.Sender.(*wsPeer).rootURL)
- return OutgoingMessage{}
-}
-
-var pingHandlers = []TaggedMessageHandler{
- {protocol.PingTag, HandlerFunc(pingHandler)},
- {protocol.PingReplyTag, HandlerFunc(pingReplyHandler)},
-}
diff --git a/network/ping_test.go b/network/ping_test.go
deleted file mode 100644
index 85b1ef2c3..000000000
--- a/network/ping_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright (C) 2019-2021 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
-
-package network
-
-import (
- "testing"
- "time"
-
- "github.com/algorand/go-algorand/test/partitiontest"
- "github.com/stretchr/testify/require"
-)
-
-// for two node network, check that B can ping A and get a reply
-func TestPing(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- netA := makeTestWebsocketNode(t)
- netA.config.GossipFanout = 1
- netA.config.PeerPingPeriodSeconds = 5
- netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
- netB := makeTestWebsocketNode(t)
- netB.config.GossipFanout = 1
- netB.config.PeerPingPeriodSeconds = 5
- addrA, postListen := netA.Address()
- require.True(t, postListen)
- t.Log(addrA)
- netB.phonebook = MakePhonebook(1, 1*time.Millisecond)
- netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
- netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
-
- readyTimeout := time.NewTimer(2 * time.Second)
- waitReady(t, netA, readyTimeout.C)
- t.Log("a ready")
- waitReady(t, netB, readyTimeout.C)
- t.Log("b ready")
-
- bpeers := netB.GetPeers(PeersConnectedOut)
- require.Equal(t, 1, len(bpeers))
-
- peer := bpeers[0].(*wsPeer)
- prePing := time.Now()
- peer.sendPing()
- const waitStep = 10 * time.Millisecond
- for i := 1; i <= 100; i++ {
- time.Sleep(waitStep)
- _, lastPingRoundTripTime := peer.pingTimes()
- if lastPingRoundTripTime > 0 {
- postPing := time.Now()
- testTime := postPing.Sub(prePing)
- if lastPingRoundTripTime < testTime {
- // success
- return
- }
- t.Fatalf("ping came back with bogus time %s after %s test waiting", lastPingRoundTripTime, testTime)
- }
- }
- t.FailNow()
-}
diff --git a/network/wsNetwork.go b/network/wsNetwork.go
index 139bcc487..1a86eadf0 100644
--- a/network/wsNetwork.go
+++ b/network/wsNetwork.go
@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io/ioutil"
- "math"
"net"
"net/http"
"net/textproto"
@@ -31,7 +30,6 @@ import (
"path"
"regexp"
"runtime"
- "sort"
"strconv"
"strings"
"sync"
@@ -789,9 +787,6 @@ func (wn *WebsocketNetwork) Start() {
wn.scheme = "http"
}
wn.meshUpdateRequests <- meshRequest{false, nil}
- if wn.config.EnablePingHandler {
- wn.RegisterHandlers(pingHandlers)
- }
if wn.prioScheme != nil {
wn.RegisterHandlers(prioHandlers)
}
@@ -801,10 +796,7 @@ func (wn *WebsocketNetwork) Start() {
}
wn.wg.Add(1)
go wn.meshThread()
- if wn.config.PeerPingPeriodSeconds > 0 {
- wn.wg.Add(1)
- go wn.pingThread()
- }
+
// we shouldn't have any ticker here.. but in case we do - just stop it.
if wn.peersConnectivityCheckTicker != nil {
wn.peersConnectivityCheckTicker.Stop()
@@ -1785,81 +1777,6 @@ func (wn *WebsocketNetwork) prioWeightRefresh() {
}
}
-// Wake up the thread to do work this often.
-const pingThreadPeriod = 30 * time.Second
-
-// If ping stats are older than this, don't include in metrics.
-const maxPingAge = 30 * time.Minute
-
-// pingThread wakes up periodically to refresh the ping times on peers and update the metrics gauges.
-func (wn *WebsocketNetwork) pingThread() {
- defer wn.wg.Done()
- ticker := time.NewTicker(pingThreadPeriod)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- case <-wn.ctx.Done():
- return
- }
- sendList := wn.peersToPing()
- wn.log.Debugf("ping %d peers...", len(sendList))
- for _, peer := range sendList {
- if !peer.sendPing() {
- // if we failed to send a ping, see how long it was since last successful ping.
- lastPingSent, _ := peer.pingTimes()
- wn.log.Infof("failed to ping to %v for the past %f seconds", peer, time.Now().Sub(lastPingSent).Seconds())
- }
- }
- }
-}
-
-// Walks list of peers, gathers list of peers to ping, also calculates statistics.
-func (wn *WebsocketNetwork) peersToPing() []*wsPeer {
- wn.peersLock.RLock()
- defer wn.peersLock.RUnlock()
- // Never flood outbound traffic by trying to ping all the peers at once.
- // Send to at most one fifth of the peers.
- maxSend := 1 + (len(wn.peers) / 5)
- out := make([]*wsPeer, 0, maxSend)
- now := time.Now()
- // a list to sort to find median
- times := make([]float64, 0, len(wn.peers))
- var min = math.MaxFloat64
- var max float64
- var sum float64
- pingPeriod := time.Duration(wn.config.PeerPingPeriodSeconds) * time.Second
- for _, peer := range wn.peers {
- lastPingSent, lastPingRoundTripTime := peer.pingTimes()
- sendToNow := now.Sub(lastPingSent)
- if (sendToNow > pingPeriod) && (len(out) < maxSend) {
- out = append(out, peer)
- }
- if (lastPingRoundTripTime > 0) && (sendToNow < maxPingAge) {
- ftime := lastPingRoundTripTime.Seconds()
- sum += ftime
- times = append(times, ftime)
- if ftime < min {
- min = ftime
- }
- if ftime > max {
- max = ftime
- }
- }
- }
- if len(times) != 0 {
- sort.Float64s(times)
- median := times[len(times)/2]
- medianPing.Set(median, nil)
- mean := sum / float64(len(times))
- meanPing.Set(mean, nil)
- minPing.Set(min, nil)
- maxPing.Set(max, nil)
- wn.log.Infof("ping times min=%f mean=%f median=%f max=%f", min, mean, median, max)
- }
- return out
-}
-
func (wn *WebsocketNetwork) getDNSAddrs(dnsBootstrap string) (relaysAddresses []string, archiverAddresses []string) {
var err error
relaysAddresses, err = tools_network.ReadFromSRV("algobootstrap", "tcp", dnsBootstrap, wn.config.FallbackDNSResolverAddress, wn.config.DNSSecuritySRVEnforced())
diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go
index 6571f5b9e..1a5a3d427 100644
--- a/network/wsNetwork_test.go
+++ b/network/wsNetwork_test.go
@@ -614,6 +614,12 @@ func (nc *nopConn) SetReadLimit(limit int64) {
func (nc *nopConn) CloseWithoutFlush() error {
return nil
}
+func (nc *nopConn) SetPingHandler(h func(appData string) error) {
+
+}
+func (nc *nopConn) SetPongHandler(h func(appData string) error) {
+
+}
var nopConnSingleton = nopConn{}
diff --git a/network/wsPeer.go b/network/wsPeer.go
index 75e7a9633..c661629aa 100644
--- a/network/wsPeer.go
+++ b/network/wsPeer.go
@@ -90,6 +90,8 @@ type wsPeerWebsocketConn interface {
WriteControl(int, []byte, time.Time) error
SetReadLimit(int64)
CloseWithoutFlush() error
+ SetPingHandler(h func(appData string) error)
+ SetPongHandler(h func(appData string) error)
}
type sendMessage struct {
@@ -137,7 +139,7 @@ type wsPeer struct {
// lastPacketTime contains the UnixNano at the last time a successful communication was made with the peer.
// "successful communication" above refers to either reading from or writing to a connection without receiving any
// error.
- // we want this to be a 64-bit aligned for atomics.
+ // we want this to be a 64-bit aligned for atomics support on 32bit platforms.
lastPacketTime int64
// intermittentOutgoingMessageEnqueueTime contains the UnixNano of the message's enqueue time that is currently being written to the
@@ -173,11 +175,7 @@ type wsPeer struct {
processed chan struct{}
- pingLock deadlock.Mutex
- pingSent time.Time
- pingData []byte
- pingInFlight bool
- lastPingRoundTripTime time.Duration
+ latencyTracker latencyTracker
// Hint about position in wn.peers. Definitely valid if the peer
// is present in wn.peers.
@@ -250,6 +248,7 @@ type UnicastPeer interface {
Request(ctx context.Context, tag Tag, topics Topics) (resp *Response, e error)
Respond(ctx context.Context, reqMsg IncomingMessage, topics Topics) (e error)
IsOutgoing() bool
+ GetConnectionLatency() time.Duration
}
// Create a wsPeerCore object
@@ -285,6 +284,11 @@ func (wp *wsPeer) IsOutgoing() bool {
return wp.outgoing
}
+// GetConnectionLatency returns the connection latency between the local node and this peer.
+func (wp *wsPeer) GetConnectionLatency() time.Duration {
+ return wp.latencyTracker.getConnectionLatency()
+}
+
// Unicast sends the given bytes to this specific peer. Does not wait for message to be sent.
// (Implements UnicastPeer)
func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback UnicastWebsocketMessageStateCallback) error {
@@ -378,6 +382,13 @@ func (wp *wsPeer) init(config config.Local, sendBufferLength int) {
wp.sendMessageTag = txSendMsgTags
}
+ wp.latencyTracker.init(wp.conn, config, time.Duration(0))
+ // send a ping right away.
+ now := time.Now()
+ if err := wp.latencyTracker.checkPingSending(&now); err != nil {
+ wp.net.log.Infof("failed to send ping message to peer : %v", err)
+ }
+
wp.wg.Add(2)
go wp.readLoop()
go wp.writeLoop()
@@ -446,6 +457,7 @@ func (wp *wsPeer) readLoop() {
wp.reportReadErr(err)
return
}
+ wp.latencyTracker.increaseReceivedCounter()
msg.processing = wp.processed
msg.Received = time.Now().UnixNano()
msg.Data = slurper.Bytes()
@@ -639,7 +651,8 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
}
// check if this message was waiting in the queue for too long. If this is the case, return "true" to indicate that we want to close the connection.
- msgWaitDuration := time.Now().Sub(msg.enqueued)
+ now := time.Now()
+ msgWaitDuration := now.Sub(msg.enqueued)
if msgWaitDuration > maxMessageQueueDuration {
wp.net.log.Warnf("peer stale enqueued message %dms", msgWaitDuration.Nanoseconds()/1000000)
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "stale message"})
@@ -651,6 +664,12 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
}
return disconnectStaleWrite
}
+
+ // is it time to send a ping message ?
+ if err := wp.latencyTracker.checkPingSending(&now); err != nil {
+ wp.net.log.Infof("failed to send ping message to peer : %v", err)
+ }
+
atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, msg.enqueued.UnixNano())
defer atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, 0)
err := wp.conn.WriteMessage(websocket.BinaryMessage, msg.data)
@@ -772,42 +791,6 @@ func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio
return false
}
-const pingLength = 8
-const maxPingWait = 60 * time.Second
-
-// sendPing sends a ping block to the peer.
-// return true if either a ping request was enqueued or there is already ping request in flight in the past maxPingWait time.
-func (wp *wsPeer) sendPing() bool {
- wp.pingLock.Lock()
- defer wp.pingLock.Unlock()
- now := time.Now()
- if wp.pingInFlight && (now.Sub(wp.pingSent) < maxPingWait) {
- return true
- }
-
- tagBytes := []byte(protocol.PingTag)
- mbytes := make([]byte, len(tagBytes)+pingLength)
- copy(mbytes, tagBytes)
- crypto.RandBytes(mbytes[len(tagBytes):])
- wp.pingData = mbytes[len(tagBytes):]
- sent := wp.writeNonBlock(context.Background(), mbytes, false, crypto.Digest{}, time.Now(), nil) // todo : we might want to use the callback function to figure a more precise sending time.
-
- if sent {
- wp.pingInFlight = true
- wp.pingSent = now
- }
- return sent
-}
-
-// get some times out of the peer while observing the ping data lock
-func (wp *wsPeer) pingTimes() (lastPingSent time.Time, lastPingRoundTripTime time.Duration) {
- wp.pingLock.Lock()
- defer wp.pingLock.Unlock()
- lastPingSent = wp.pingSent
- lastPingRoundTripTime = wp.lastPingRoundTripTime
- return
-}
-
// called when the connection had an error or closed remotely
func (wp *wsPeer) internalClose(reason disconnectReason) {
if atomic.CompareAndSwapInt32(&wp.didSignalClose, 0, 1) {
diff --git a/node/txnSyncConn.go b/node/txnSyncConn.go
index 1a88b7998..00f5607ea 100644
--- a/node/txnSyncConn.go
+++ b/node/txnSyncConn.go
@@ -156,6 +156,11 @@ func (tsnc *transactionSyncNodeConnector) SendPeerMessage(netPeer interface{}, m
}
}
+func (tsnc *transactionSyncNodeConnector) GetPeerLatency(netPeer interface{}) time.Duration {
+ unicastPeer := netPeer.(network.UnicastPeer)
+ return unicastPeer.GetConnectionLatency()
+}
+
// GetPendingTransactionGroups is called by the transaction sync when it needs to look into the transaction
// pool and get the updated set of pending transactions. The second returned argument is the latest locally originated
// group counter within the given transaction groups list. If there is no group that is locally originated, the expected
@@ -215,7 +220,7 @@ func (tsnc *transactionSyncNodeConnector) Handle(raw network.IncomingMessage) ne
peer = peerData.(*txnsync.Peer)
}
- err := tsnc.messageHandler(raw.Sender, peer, raw.Data, raw.Sequence)
+ err := tsnc.messageHandler(raw.Sender, peer, raw.Data, raw.Sequence, raw.Received)
if err != nil {
return network.OutgoingMessage{
Action: network.Disconnect,
diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go
index 5b16ae87b..38d07aba1 100644
--- a/rpcs/blockService_test.go
+++ b/rpcs/blockService_test.go
@@ -53,6 +53,11 @@ func (mup *mockUnicastPeer) Version() string {
func (mup *mockUnicastPeer) IsOutgoing() bool {
return false
}
+
+// GetConnectionLatency returns the connection latency between the local node and this peer.
+func (mup *mockUnicastPeer) GetConnectionLatency() time.Duration {
+ return time.Duration(0)
+}
func (mup *mockUnicastPeer) Request(ctx context.Context, tag network.Tag, topics network.Topics) (resp *network.Response, e error) {
return nil, nil
}
diff --git a/test/testdata/configs/config-v18.json b/test/testdata/configs/config-v18.json
new file mode 100644
index 000000000..b1a977e64
--- /dev/null
+++ b/test/testdata/configs/config-v18.json
@@ -0,0 +1,96 @@
+{
+ "Version": 18,
+ "AccountUpdatesStatsInterval": 5000000000,
+ "AccountsRebuildSynchronousMode": 1,
+ "AnnounceParticipationKey": true,
+ "Archival": false,
+ "BaseLoggerDebugLevel": 4,
+ "BlockServiceCustomFallbackEndpoints": "",
+ "BroadcastConnectionsLimit": -1,
+ "CadaverSizeTarget": 1073741824,
+ "CatchpointFileHistoryLength": 365,
+ "CatchpointInterval": 10000,
+ "CatchpointTracking": 0,
+ "CatchupBlockDownloadRetryAttempts": 1000,
+ "CatchupBlockValidateMode": 0,
+ "CatchupFailurePeerRefreshRate": 10,
+ "CatchupGossipBlockFetchTimeoutSec": 4,
+ "CatchupHTTPBlockFetchTimeoutSec": 4,
+ "CatchupLedgerDownloadRetryAttempts": 50,
+ "CatchupParallelBlocks": 16,
+ "ConnectionsRateLimitingCount": 60,
+ "ConnectionsRateLimitingWindowSeconds": 1,
+ "DNSBootstrapID": "<network>.algorand.network",
+ "DNSSecurityFlags": 1,
+ "DeadlockDetection": 0,
+ "DisableLocalhostConnectionRateLimit": true,
+ "DisableNetworking": false,
+ "DisableOutgoingConnectionThrottling": false,
+ "EnableAccountUpdatesStats": false,
+ "EnableAgreementReporting": false,
+ "EnableAgreementTimeMetrics": false,
+ "EnableAssembleStats": false,
+ "EnableBlockService": false,
+ "EnableBlockServiceFallbackToArchiver": true,
+ "EnableCatchupFromArchiveServers": false,
+ "EnableDeveloperAPI": false,
+ "EnableGossipBlockService": true,
+ "EnableIncomingMessageFilter": false,
+ "EnableLedgerService": false,
+ "EnableMetricReporting": false,
+ "EnableOutgoingNetworkMessageFiltering": true,
+ "EnablePingHandler": true,
+ "EnableProcessBlockStats": false,
+ "EnableProfiler": false,
+ "EnableRequestLogger": false,
+ "EnableTopAccountsReporting": false,
+ "EnableVerbosedTransactionSyncLogging": false,
+ "EndpointAddress": "127.0.0.1:0",
+ "FallbackDNSResolverAddress": "",
+ "ForceFetchTransactions": false,
+ "ForceRelayMessages": false,
+ "GossipFanout": 4,
+ "IncomingConnectionsLimit": 800,
+ "IncomingMessageFilterBucketCount": 5,
+ "IncomingMessageFilterBucketSize": 512,
+ "IsIndexerActive": false,
+ "LedgerSynchronousMode": 2,
+ "LogArchiveMaxAge": "",
+ "LogArchiveName": "node.archive.log",
+ "LogSizeLimit": 1073741824,
+ "MaxCatchpointDownloadDuration": 7200000000000,
+ "MaxConnectionsPerIP": 30,
+ "MinCatchpointFileDownloadBytesPerSecond": 20480,
+ "NetAddress": "",
+ "NetworkMessageTraceServer": "",
+ "NetworkProtocolVersion": "",
+ "NodeExporterListenAddress": ":9100",
+ "NodeExporterPath": "./node_exporter",
+ "OptimizeAccountsDatabaseOnStartup": false,
+ "OutgoingMessageFilterBucketCount": 3,
+ "OutgoingMessageFilterBucketSize": 128,
+ "ParticipationKeysRefreshInterval": 60000000000,
+ "PeerConnectionsUpdateInterval": 3600,
+ "PeerPingPeriodSeconds": 10,
+ "PriorityPeers": {},
+ "PublicAddress": "",
+ "ReconnectTime": 60000000000,
+ "ReservedFDs": 256,
+ "RestReadTimeoutSeconds": 15,
+ "RestWriteTimeoutSeconds": 120,
+ "RunHosted": false,
+ "SuggestedFeeBlockHistory": 3,
+ "SuggestedFeeSlidingWindowSize": 50,
+ "TLSCertFile": "",
+ "TLSKeyFile": "",
+ "TelemetryToLog": true,
+ "TransactionSyncDataExchangeRate": 0,
+ "TransactionSyncSignificantMessageThreshold": 0,
+ "TxPoolExponentialIncreaseFactor": 2,
+ "TxPoolSize": 15000,
+ "TxSyncIntervalSeconds": 60,
+ "TxSyncServeResponseSize": 1000000,
+ "TxSyncTimeoutSeconds": 30,
+ "UseXForwardedForAddressField": "",
+ "VerifiedTranscationsCacheSize": 30000
+}
diff --git a/txnsync/bloomFilter_test.go b/txnsync/bloomFilter_test.go
index 71a0d4151..57a1635fc 100644
--- a/txnsync/bloomFilter_test.go
+++ b/txnsync/bloomFilter_test.go
@@ -20,6 +20,7 @@ import (
"encoding/binary"
"math/rand"
"testing"
+ "time"
"github.com/stretchr/testify/require"
@@ -357,6 +358,11 @@ func (fn *justRandomFakeNode) UpdatePeers(txsyncPeers []*Peer, netPeers []interf
}
func (fn *justRandomFakeNode) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) {
}
+
+func (fn *justRandomFakeNode) GetPeerLatency(netPeer interface{}) time.Duration {
+ return 0
+}
+
func (fn *justRandomFakeNode) GetPendingTransactionGroups() (txGroups []pooldata.SignedTxGroup, latestLocallyOriginatedGroupCounter uint64) {
return
}
diff --git a/txnsync/emulatorNode_test.go b/txnsync/emulatorNode_test.go
index 27bb75115..378f1622f 100644
--- a/txnsync/emulatorNode_test.go
+++ b/txnsync/emulatorNode_test.go
@@ -278,6 +278,10 @@ func (n *emulatedNode) SendPeerMessage(netPeer interface{}, msg []byte, callback
peer.outSeq++
}
+func (n *emulatedNode) GetPeerLatency(netPeer interface{}) time.Duration {
+ return 0
+}
+
func (n *emulatedNode) GetPendingTransactionGroups() ([]pooldata.SignedTxGroup, uint64) {
return n.txpoolEntries, n.latestLocallyOriginatedGroupCounter
}
@@ -356,7 +360,7 @@ func (n *emulatedNode) step() {
peer.mu.Unlock()
- msgHandler(peer, peer.peer, msgBytes, msgInSeq)
+ msgHandler(peer, peer.peer, msgBytes, msgInSeq, 0)
n.unblock()
n.waitBlocked()
peer.mu.Lock()
diff --git a/txnsync/incoming.go b/txnsync/incoming.go
index d6365a6de..83ac416f1 100644
--- a/txnsync/incoming.go
+++ b/txnsync/incoming.go
@@ -20,8 +20,6 @@ import (
"errors"
"time"
- "github.com/algorand/go-deadlock"
-
"github.com/algorand/go-algorand/data/pooldata"
)
@@ -40,75 +38,12 @@ type incomingMessage struct {
encodedSize int // the byte length of the incoming network message
bloomFilter *testableBloomFilter
transactionGroups []pooldata.SignedTxGroup
-}
-
-// incomingMessageQueue manages the global incoming message queue across all the incoming peers.
-type incomingMessageQueue struct {
- incomingMessages chan incomingMessage
- enqueuedPeers map[*Peer]struct{}
- enqueuedPeersMu deadlock.Mutex
-}
-
-// maxPeersCount defines the maximum number of supported peers that can have their messages waiting
-// in the incoming message queue at the same time. This number can be lower then the actual number of
-// connected peers, as it's used only for pending messages.
-const maxPeersCount = 1024
-
-// makeIncomingMessageQueue creates an incomingMessageQueue object and initializes all the internal variables.
-func makeIncomingMessageQueue() incomingMessageQueue {
- return incomingMessageQueue{
- incomingMessages: make(chan incomingMessage, maxPeersCount),
- enqueuedPeers: make(map[*Peer]struct{}, maxPeersCount),
- }
-}
-
-// getIncomingMessageChannel returns the incoming messages channel, which would contain entries once
-// we have one ( or more ) pending incoming messages.
-func (imq *incomingMessageQueue) getIncomingMessageChannel() <-chan incomingMessage {
- return imq.incomingMessages
-}
-
-// enqueue places the given message on the queue, if and only if it's associated peer doesn't
-// appear on the incoming message queue already. In the case there is no peer, the message
-// would be placed on the queue as is.
-// The method returns false if the incoming message doesn't have it's peer on the queue and
-// the method has failed to place the message on the queue. True is returned otherwise.
-func (imq *incomingMessageQueue) enqueue(m incomingMessage) bool {
- if m.peer != nil {
- imq.enqueuedPeersMu.Lock()
- defer imq.enqueuedPeersMu.Unlock()
- if _, has := imq.enqueuedPeers[m.peer]; has {
- return true
- }
- }
- select {
- case imq.incomingMessages <- m:
- // if we successfully enqueued the message, set the enqueuedPeers so that we won't enqueue the same peer twice.
- if m.peer != nil {
- // at this time, the enqueuedPeersMu is still under lock ( due to the above defer ), so we can access
- // the enqueuedPeers here.
- imq.enqueuedPeers[m.peer] = struct{}{}
- }
- return true
- default:
- return false
- }
-}
-
-// clear removes the peer that is associated with the message ( if any ) from
-// the enqueuedPeers map, allowing future messages from this peer to be placed on the
-// incoming message queue.
-func (imq *incomingMessageQueue) clear(m incomingMessage) {
- if m.peer != nil {
- imq.enqueuedPeersMu.Lock()
- defer imq.enqueuedPeersMu.Unlock()
- delete(imq.enqueuedPeers, m.peer)
- }
+ timeReceived int64
}
// incomingMessageHandler
// note - this message is called by the network go-routine dispatch pool, and is not synchronized with the rest of the transaction synchronizer
-func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) (err error) {
+func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64, receivedTimestamp int64) (err error) {
// increase number of incoming messages metric.
txsyncIncomingMessagesTotal.Inc(nil)
@@ -120,17 +55,19 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
}
}()
- incomingMessage := incomingMessage{networkPeer: networkPeer, sequenceNumber: sequenceNumber, encodedSize: len(message), peer: peer}
+ incomingMessage := incomingMessage{networkPeer: networkPeer, sequenceNumber: sequenceNumber, encodedSize: len(message), peer: peer, timeReceived: receivedTimestamp}
_, err = incomingMessage.message.UnmarshalMsg(message)
if err != nil {
// if we received a message that we cannot parse, disconnect.
s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer.")
+ s.incomingMessagesQ.erase(peer, networkPeer)
return err
}
if incomingMessage.message.Version != txnBlockMessageVersion {
// we receive a message from a version that we don't support, disconnect.
s.log.Infof("received unsupported transaction sync message version from peer (%d). disconnecting from peer.", incomingMessage.message.Version)
+ s.incomingMessagesQ.erase(peer, networkPeer)
return errUnsupportedTransactionSyncMessageVersion
}
@@ -139,6 +76,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
bloomFilter, err := decodeBloomFilter(incomingMessage.message.TxnBloomFilter)
if err != nil {
s.log.Infof("Invalid bloom filter received from peer : %v", err)
+ s.incomingMessagesQ.erase(peer, networkPeer)
return errInvalidBloomFilter
}
incomingMessage.bloomFilter = bloomFilter
@@ -150,6 +88,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
incomingMessage.transactionGroups, err = decodeTransactionGroups(incomingMessage.message.TransactionGroups, s.genesisID, s.genesisHash)
if err != nil {
s.log.Infof("failed to decode received transactions groups: %v\n", err)
+ s.incomingMessagesQ.erase(peer, networkPeer)
return errDecodingReceivedTransactionGroupsFailed
}
@@ -158,10 +97,21 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
// all the peer objects are created synchronously.
enqueued := s.incomingMessagesQ.enqueue(incomingMessage)
if !enqueued {
- // if we can't enqueue that, return an error, which would disconnect the peer.
- // ( we have to disconnect, since otherwise, we would have no way to synchronize the sequence number)
- s.log.Infof("unable to enqueue incoming message from a peer without txsync allocated data; incoming messages queue is full. disconnecting from peer.")
- return errTransactionSyncIncomingMessageQueueFull
+ // if we failed to enqueue, it means that the queue is full. Try to remove disconnected
+ // peers from the queue before re-attempting.
+ peers := s.node.GetPeers()
+ if s.incomingMessagesQ.prunePeers(peers) {
+ // if we were successful in removing at least a single peer, then try to add the entry again.
+ enqueued = s.incomingMessagesQ.enqueue(incomingMessage)
+ }
+ if !enqueued {
+ // if we can't enqueue that, return an error, which would disconnect the peer.
+ // ( we have to disconnect, since otherwise, we would have no way to synchronize the sequence number)
+ s.log.Infof("unable to enqueue incoming message from a peer without txsync allocated data; incoming messages queue is full. disconnecting from peer.")
+ s.incomingMessagesQ.erase(peer, networkPeer)
+ return errTransactionSyncIncomingMessageQueueFull
+ }
+
}
return nil
}
@@ -170,15 +120,26 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
if err != nil {
// if the incoming message queue for this peer is full, disconnect from this peer.
s.log.Infof("unable to enqueue incoming message into peer incoming message backlog. disconnecting from peer.")
+ s.incomingMessagesQ.erase(peer, networkPeer)
return err
}
// (maybe) place the peer message on the main queue. This would get skipped if the peer is already on the queue.
enqueued := s.incomingMessagesQ.enqueue(incomingMessage)
if !enqueued {
- // if we can't enqueue that, return an error, which would disconnect the peer.
- s.log.Infof("unable to enqueue incoming message from a peer with txsync allocated data; incoming messages queue is full. disconnecting from peer.")
- return errTransactionSyncIncomingMessageQueueFull
+ // if we failed to enqueue, it means that the queue is full. Try to remove disconnected
+ // peers from the queue before re-attempting.
+ peers := s.node.GetPeers()
+ if s.incomingMessagesQ.prunePeers(peers) {
+ // if we were successful in removing at least a single peer, then try to add the entry again.
+ enqueued = s.incomingMessagesQ.enqueue(incomingMessage)
+ }
+ if !enqueued {
+ // if we can't enqueue that, return an error, which would disconnect the peer.
+ s.log.Infof("unable to enqueue incoming message from a peer with txsync allocated data; incoming messages queue is full. disconnecting from peer.")
+ s.incomingMessagesQ.erase(peer, networkPeer)
+ return errTransactionSyncIncomingMessageQueueFull
+ }
}
return nil
}
@@ -194,7 +155,7 @@ func (s *syncState) evaluateIncomingMessage(message incomingMessage) {
}
if peerInfo.TxnSyncPeer == nil {
// we couldn't really do much about this message previously, since we didn't have the peer.
- peer = makePeer(message.networkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log)
+ peer = makePeer(message.networkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log, s.node.GetPeerLatency(message.networkPeer))
// let the network peer object know about our peer
s.node.UpdatePeers([]*Peer{peer}, []interface{}{message.networkPeer}, 0)
} else {
@@ -207,9 +168,7 @@ func (s *syncState) evaluateIncomingMessage(message incomingMessage) {
return
}
}
- // clear the peer that is associated with this incoming message from the message queue, allowing future
- // messages from the peer to be placed on the message queue.
- s.incomingMessagesQ.clear(message)
+
messageProcessed := false
transactionPoolSize := 0
totalAccumulatedTransactionsCount := 0 // the number of transactions that were added during the execution of this method
@@ -249,7 +208,11 @@ incomingMessageLoop:
}
peer.updateRequestParams(incomingMsg.message.UpdatedRequestParams.Modulator, incomingMsg.message.UpdatedRequestParams.Offset)
- peer.updateIncomingMessageTiming(incomingMsg.message.MsgSync, s.round, s.clock.Since(), incomingMsg.encodedSize)
+ timeInQueue := time.Duration(0)
+ if incomingMsg.timeReceived > 0 {
+ timeInQueue = time.Since(time.Unix(0, incomingMsg.timeReceived))
+ }
+ peer.updateIncomingMessageTiming(incomingMsg.message.MsgSync, s.round, s.clock.Since(), timeInQueue, peer.cachedLatency, incomingMsg.encodedSize)
// if the peer's round is more than a single round behind the local node, then we don't want to
// try and load the transactions. The other peer should first catch up before getting transactions.
diff --git a/txnsync/incomingMsgQ.go b/txnsync/incomingMsgQ.go
new file mode 100644
index 000000000..aecb673bd
--- /dev/null
+++ b/txnsync/incomingMsgQ.go
@@ -0,0 +1,372 @@
+// Copyright (C) 2019-2021 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package txnsync
+
+import (
+ "sync"
+
+ "github.com/algorand/go-deadlock"
+)
+
+// queuedMsgEntry is used as a helper struct to manage the manipulation of incoming
+// message queue.
+type queuedMsgEntry struct {
+ msg incomingMessage
+ next *queuedMsgEntry
+ prev *queuedMsgEntry
+}
+
+type queuedMsgList struct {
+ head *queuedMsgEntry
+}
+
+// incomingMessageQueue manages the global incoming message queue across all the incoming peers.
+type incomingMessageQueue struct {
+ outboundPeerCh chan incomingMessage
+ enqueuedPeersMap map[*Peer]*queuedMsgEntry
+ messages queuedMsgList
+ freelist queuedMsgList
+ enqueuedPeersMu deadlock.Mutex
+ enqueuedPeersCond *sync.Cond
+ shutdownRequest chan struct{}
+ shutdownConfirmed chan struct{}
+ deletePeersCh chan interface{}
+ peerlessCount int
+}
+
+// maxPeersCount defines the maximum number of supported peers that can have their messages waiting
+// in the incoming message queue at the same time. This number can be lower than the actual number of
+// connected peers, as it's used only for pending messages.
+const maxPeersCount = 2048
+
+// maxPeerlessCount is the number of messages that we've received that don't have a Peer object allocated
+// for them ( yet )
+const maxPeerlessCount = 512
+
+// makeIncomingMessageQueue creates an incomingMessageQueue object and initializes all the internal variables.
+func makeIncomingMessageQueue() *incomingMessageQueue {
+ imq := &incomingMessageQueue{
+ outboundPeerCh: make(chan incomingMessage),
+ enqueuedPeersMap: make(map[*Peer]*queuedMsgEntry, maxPeersCount),
+ shutdownRequest: make(chan struct{}, 1),
+ shutdownConfirmed: make(chan struct{}, 1),
+ deletePeersCh: make(chan interface{}),
+ }
+ imq.enqueuedPeersCond = sync.NewCond(&imq.enqueuedPeersMu)
+ imq.freelist.initialize(maxPeersCount)
+ go imq.messagePump()
+ return imq
+}
+
+// dequeueHead removes the first head message from the linked list.
+func (ml *queuedMsgList) dequeueHead() (out *queuedMsgEntry) {
+ if ml.head == nil {
+ return nil
+ }
+ entry := ml.head
+ out = entry
+ if entry.next == entry {
+ ml.head = nil
+ return
+ }
+ entry.next.prev = entry.prev
+ entry.prev.next = entry.next
+ ml.head = entry.next
+ out.next = out
+ out.prev = out
+ return
+}
+
+// initialize initializes a list to have msgCount entries.
+func (ml *queuedMsgList) initialize(msgCount int) {
+ msgs := make([]queuedMsgEntry, msgCount)
+ for i := 0; i < msgCount; i++ {
+ msgs[i].next = &msgs[(i+1)%msgCount]
+ msgs[i].prev = &msgs[(i+msgCount-1)%msgCount]
+ }
+ ml.head = &msgs[0]
+}
+
+// empty tests whether the linked list is empty
+func (ml *queuedMsgList) empty() bool {
+ return ml.head == nil
+}
+
+// remove removes the given msg from the linked list. The method
+// is written with the assumption that the given msg is known to be
+// part of the linked list.
+func (ml *queuedMsgList) remove(msg *queuedMsgEntry) {
+ if msg.next == msg {
+ ml.head = nil
+ return
+ }
+ msg.prev.next = msg.next
+ msg.next.prev = msg.prev
+ if ml.head == msg {
+ ml.head = msg.next
+ }
+ msg.prev = msg
+ msg.next = msg
+}
+
+// filterRemove removes zero or more messages from the linked list, for which the given
+// removeFunc returns true. The removed linked list entries are returned as a linked list.
+func (ml *queuedMsgList) filterRemove(removeFunc func(*queuedMsgEntry) bool) *queuedMsgEntry {
+ if ml.empty() {
+ return nil
+ }
+ // do we have a single item ?
+ if ml.head.next == ml.head {
+ if removeFunc(ml.head) {
+ out := ml.head
+ ml.head = nil
+ return out
+ }
+ return nil
+ }
+ current := ml.head
+ last := ml.head.prev
+ var letGo queuedMsgList
+ for {
+ next := current.next
+ if removeFunc(current) {
+ ml.remove(current)
+ letGo.enqueueTail(current)
+ }
+ if current == last {
+ break
+ }
+ current = next
+ }
+ return letGo.head
+}
+
+// enqueueTail adds to the current linked list another linked list whose head is msg.
+func (ml *queuedMsgList) enqueueTail(msg *queuedMsgEntry) {
+ if ml.head == nil {
+ ml.head = msg
+ return
+ } else if msg == nil {
+ return
+ }
+ lastEntryOld := ml.head.prev
+ lastEntryNew := msg.prev
+ lastEntryOld.next = msg
+ ml.head.prev = lastEntryNew
+ msg.prev = lastEntryOld
+ lastEntryNew.next = ml.head
+}
+
+// shutdown signals to the message pump to shut down and waits until the message pump goroutine
+// aborts.
+func (imq *incomingMessageQueue) shutdown() {
+ imq.enqueuedPeersMu.Lock()
+ close(imq.shutdownRequest)
+ imq.enqueuedPeersCond.Signal()
+ imq.enqueuedPeersMu.Unlock()
+ <-imq.shutdownConfirmed
+}
+
+// messagePump is the incoming message queue message pump. It takes messages from the messages list
+// and attempts to write these to the outboundPeerCh.
+func (imq *incomingMessageQueue) messagePump() {
+ defer close(imq.shutdownConfirmed)
+ imq.enqueuedPeersMu.Lock()
+ defer imq.enqueuedPeersMu.Unlock()
+
+ for {
+ // check if we need to shutdown.
+ select {
+ case <-imq.shutdownRequest:
+ return
+ default:
+ }
+
+ // do we have any item to enqueue ?
+ if !imq.messages.empty() {
+ msgEntry := imq.messages.dequeueHead()
+ msg := msgEntry.msg
+ imq.freelist.enqueueTail(msgEntry)
+ if msg.peer != nil {
+ delete(imq.enqueuedPeersMap, msg.peer)
+ } else {
+ imq.peerlessCount--
+ }
+ imq.enqueuedPeersMu.Unlock()
+ writeOutboundMessage:
+ select {
+ case imq.outboundPeerCh <- msg:
+ imq.enqueuedPeersMu.Lock()
+ continue
+ case <-imq.shutdownRequest:
+ imq.enqueuedPeersMu.Lock()
+ return
+ // see if this msg needs to be delivered or not.
+ case droppedPeer := <-imq.deletePeersCh:
+ if msg.networkPeer == droppedPeer {
+ // we want to skip this message.
+ imq.enqueuedPeersMu.Lock()
+ continue
+ }
+ goto writeOutboundMessage
+ }
+ }
+ imq.enqueuedPeersCond.Wait()
+ }
+}
+
+// getIncomingMessageChannel returns the incoming messages channel, which would contain entries once
+// we have one ( or more ) pending incoming messages.
+func (imq *incomingMessageQueue) getIncomingMessageChannel() <-chan incomingMessage {
+ return imq.outboundPeerCh
+}
+
+// enqueue places the given message on the queue, if and only if its associated peer doesn't
+// appear on the incoming message queue already. In the case there is no peer, the message
+// would be placed on the queue as is.
+// The method returns false if the incoming message doesn't have its peer on the queue and
+// the method has failed to place the message on the queue. True is returned otherwise.
+func (imq *incomingMessageQueue) enqueue(m incomingMessage) bool {
+ imq.enqueuedPeersMu.Lock()
+ defer imq.enqueuedPeersMu.Unlock()
+ if m.peer != nil {
+ if _, has := imq.enqueuedPeersMap[m.peer]; has {
+ return true
+ }
+ } else {
+ // do we have enough "room" for peerless messages ?
+ if imq.peerlessCount >= maxPeerlessCount {
+ return false
+ }
+ }
+ // do we have enough room in the message queue for the new message ?
+ if imq.freelist.empty() {
+ // no - we don't have enough room in the circular buffer.
+ return false
+ }
+ freeMsgEntry := imq.freelist.dequeueHead()
+ freeMsgEntry.msg = m
+ imq.messages.enqueueTail(freeMsgEntry)
+ // if we successfully enqueued the message, set the enqueuedPeersMap so that we won't enqueue the same peer twice.
+ if m.peer != nil {
+ imq.enqueuedPeersMap[m.peer] = freeMsgEntry
+ } else {
+ imq.peerlessCount++
+ }
+ imq.enqueuedPeersCond.Signal()
+ return true
+}
+
+// erase removes all the entries associated with the given network peer.
+// this method isn't very efficient, and should be used only in cases where
+// we disconnect from a peer and want to cleanup all the pending tasks associated
+// with that peer.
+func (imq *incomingMessageQueue) erase(peer *Peer, networkPeer interface{}) {
+ imq.enqueuedPeersMu.Lock()
+
+ var peerMsgEntry *queuedMsgEntry
+ if peer == nil {
+ // lookup for a Peer object.
+ for peer, peerMsgEntry = range imq.enqueuedPeersMap {
+ if peer.networkPeer != networkPeer {
+ continue
+ }
+ break
+ }
+ } else {
+ var has bool
+ if peerMsgEntry, has = imq.enqueuedPeersMap[peer]; !has {
+ // the peer object is not in the map.
+ peer = nil
+ }
+ }
+
+ if peer != nil {
+ delete(imq.enqueuedPeersMap, peer)
+ imq.messages.remove(peerMsgEntry)
+ imq.freelist.enqueueTail(peerMsgEntry)
+ imq.enqueuedPeersMu.Unlock()
+ select {
+ case imq.deletePeersCh <- networkPeer:
+ default:
+ }
+ return
+ }
+
+ imq.removeMessageByNetworkPeer(networkPeer)
+ imq.enqueuedPeersMu.Unlock()
+ select {
+ case imq.deletePeersCh <- networkPeer:
+ default:
+ }
+}
+
+// removeMessageByNetworkPeer removes the messages associated with the given network peer from the
+// queue.
+// note : the method expect that the enqueuedPeersMu lock would be taken.
+func (imq *incomingMessageQueue) removeMessageByNetworkPeer(networkPeer interface{}) {
+ peerlessCount := 0
+ removeByNetworkPeer := func(msg *queuedMsgEntry) bool {
+ if msg.msg.networkPeer == networkPeer {
+ if msg.msg.peer == nil {
+ peerlessCount++
+ }
+ return true
+ }
+ return false
+ }
+ removeList := imq.messages.filterRemove(removeByNetworkPeer)
+ imq.freelist.enqueueTail(removeList)
+ imq.peerlessCount -= peerlessCount
+}
+
+// prunePeers removes from the enqueuedMessages queue all the entries that are not provided in the
+// given activePeers slice.
+func (imq *incomingMessageQueue) prunePeers(activePeers []PeerInfo) (peerRemoved bool) {
+ activePeersMap := make(map[*Peer]bool)
+ activeNetworkPeersMap := make(map[interface{}]bool)
+ for _, activePeer := range activePeers {
+ if activePeer.TxnSyncPeer != nil {
+ activePeersMap[activePeer.TxnSyncPeer] = true
+ }
+ if activePeer.NetworkPeer != nil {
+ activeNetworkPeersMap[activePeer.NetworkPeer] = true
+ }
+ }
+ imq.enqueuedPeersMu.Lock()
+ defer imq.enqueuedPeersMu.Unlock()
+ peerlessCount := 0
+ isPeerMissing := func(msg *queuedMsgEntry) bool {
+ if msg.msg.peer != nil {
+ if !activePeersMap[msg.msg.peer] {
+ return true
+ }
+ }
+ if !activeNetworkPeersMap[msg.msg.networkPeer] {
+ if msg.msg.peer == nil {
+ peerlessCount++
+ }
+ return true
+ }
+ return false
+ }
+ removeList := imq.messages.filterRemove(isPeerMissing)
+ peerRemoved = removeList != nil
+ imq.freelist.enqueueTail(removeList)
+ imq.peerlessCount -= peerlessCount
+ return
+}
diff --git a/txnsync/incomingMsgQ_test.go b/txnsync/incomingMsgQ_test.go
new file mode 100644
index 000000000..9ff1adf4b
--- /dev/null
+++ b/txnsync/incomingMsgQ_test.go
@@ -0,0 +1,158 @@
+// Copyright (C) 2019-2021 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package txnsync
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/algorand/go-algorand/test/partitiontest"
+)
+
+// fillMessageQueue fills the message queue with the given message.
+func (imq *incomingMessageQueue) fillMessageQueue(msg incomingMessage) {
+ imq.enqueuedPeersMu.Lock()
+ for i := 0; i < maxPeersCount; i++ {
+ msgEntry := imq.freelist.dequeueHead()
+ msgEntry.msg = msg
+ imq.messages.enqueueTail(msgEntry)
+ }
+ if msg.peer == nil {
+ imq.peerlessCount += maxPeersCount
+ }
+ imq.enqueuedPeersCond.Signal()
+ imq.enqueuedPeersMu.Unlock()
+
+ // wait for a single message to be consumed by the message pump.
+ for {
+ imq.enqueuedPeersMu.Lock()
+ if !imq.freelist.empty() {
+ break
+ }
+ imq.enqueuedPeersMu.Unlock()
+ time.Sleep(time.Millisecond)
+ }
+ for !imq.freelist.empty() {
+ msgEntry := imq.freelist.dequeueHead()
+ msgEntry.msg = msg
+ imq.messages.enqueueTail(msgEntry)
+ }
+ imq.enqueuedPeersCond.Signal()
+ imq.enqueuedPeersMu.Unlock()
+}
+
+// count counts the number of messages in the list
+func (ml *queuedMsgList) count() int {
+ first := ml.head
+ cur := first
+ count := 0
+ for cur != nil {
+ next := cur.next
+ if next == first {
+ next = nil
+ }
+ count++
+ cur = next
+ }
+ return count
+}
+
+// validateLinking tests that the entries in the list are correctly connected.
+func (ml *queuedMsgList) validateLinking(t *testing.T) {
+ cur := ml.head
+ if cur == nil {
+ return
+ }
+ seen := make(map[*queuedMsgEntry]bool)
+ list := make([]*queuedMsgEntry, 0)
+ for {
+ if seen[cur] {
+ break
+ }
+ seen[cur] = true
+ require.NotNil(t, cur.prev)
+ require.NotNil(t, cur.next)
+ list = append(list, cur)
+ cur = cur.next
+ }
+ for i := range list {
+ require.Equal(t, list[i], list[(i+len(list)-1)%len(list)].next)
+ require.Equal(t, list[i], list[(i+1)%len(list)].prev)
+ }
+}
+
+// TestMsgQCounts tests the message queue add/remove manipulations
+func TestMsgQCounts(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ var list queuedMsgList
+ list.initialize(7)
+ list.validateLinking(t)
+ require.Equal(t, 7, list.count())
+ list.dequeueHead()
+ list.validateLinking(t)
+ require.Equal(t, 6, list.count())
+ var anotherList queuedMsgList
+ anotherList.initialize(4)
+ require.Equal(t, 4, anotherList.count())
+ list.enqueueTail(anotherList.head)
+ list.validateLinking(t)
+ require.Equal(t, 10, list.count())
+}
+
+// TestMsgQFiltering tests the message queue filtering
+func TestMsgQFiltering(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ item1 := &queuedMsgEntry{}
+ item2 := &queuedMsgEntry{}
+ item3 := &queuedMsgEntry{}
+ item1.next = item1
+ item1.prev = item1
+ item2.next = item2
+ item2.prev = item2
+ item3.next = item3
+ item3.prev = item3
+
+ var list queuedMsgList
+ list.enqueueTail(item1)
+ list.enqueueTail(item2)
+ list.enqueueTail(item3)
+
+ // test removing head.
+ removedItem1 := list.filterRemove(func(msg *queuedMsgEntry) bool {
+ return msg == item1
+ })
+ require.Equal(t, item1, removedItem1)
+ require.Equal(t, 2, list.count())
+
+ // test removing tail
+ removedItem3 := list.filterRemove(func(msg *queuedMsgEntry) bool {
+ return msg == item3
+ })
+ require.Equal(t, item3, removedItem3)
+ require.Equal(t, 1, list.count())
+
+ // test removing last item
+ removedItem2 := list.filterRemove(func(msg *queuedMsgEntry) bool {
+ return msg == item2
+ })
+ require.Equal(t, item2, removedItem2)
+ require.True(t, list.empty())
+}
diff --git a/txnsync/incoming_test.go b/txnsync/incoming_test.go
index 62782ff84..978a1c4ca 100644
--- a/txnsync/incoming_test.go
+++ b/txnsync/incoming_test.go
@@ -58,27 +58,29 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) {
cfg := config.GetDefaultLocal()
mNodeConnector := &mockNodeConnector{transactionPoolSize: 3}
s := syncState{
- log: wrapLogger(&incLogger, &cfg),
- node: mNodeConnector,
+ log: wrapLogger(&incLogger, &cfg),
+ node: mNodeConnector,
+ clock: mNodeConnector.Clock(),
+ incomingMessagesQ: makeIncomingMessageQueue(),
}
// expect UnmarshalMsg error
messageBytes[0] = 0
- err := s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
+ err := s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
msgpe := msgp.TypeError{}
require.True(t, errors.As(err, &msgpe))
// expect wrong version error
message = transactionBlockMessage{Version: -3}
messageBytes = message.MarshalMsg(nil)
- err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
+ err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errUnsupportedTransactionSyncMessageVersion, err)
// expect error decoding bloomFilter
message.Version = 1
message.TxnBloomFilter.BloomFilterType = byte(multiHashBloomFilter)
messageBytes = message.MarshalMsg(nil)
- err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
+ err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errInvalidBloomFilter, err)
// error decoding transaction groups
@@ -89,35 +91,46 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) {
require.NoError(t, err)
message.TransactionGroups = packedTransactionGroups{Bytes: []byte{1}}
messageBytes = message.MarshalMsg(nil)
- err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
+ err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errDecodingReceivedTransactionGroupsFailed, err)
+ s.incomingMessagesQ.shutdown()
+
+ peer := Peer{networkPeer: &s}
// error queue full
message.TransactionGroups = packedTransactionGroups{}
messageBytes = message.MarshalMsg(nil)
- err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
+ s.incomingMessagesQ = makeIncomingMessageQueue()
+ s.incomingMessagesQ.fillMessageQueue(incomingMessage{peer: &peer, networkPeer: &s.incomingMessagesQ})
+ mNodeConnector.peers = append(mNodeConnector.peers, PeerInfo{TxnSyncPeer: &peer, NetworkPeer: &s.incomingMessagesQ})
+ err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err)
+ s.incomingMessagesQ.shutdown()
// Success where peer == nil
s.incomingMessagesQ = makeIncomingMessageQueue()
- err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
+ err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.NoError(t, err)
-
- peer := Peer{}
+ s.incomingMessagesQ.shutdown()
// error when placing the peer message on the main queue (incomingMessages cannot accept messages)
- s.incomingMessagesQ = incomingMessageQueue{}
- err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber)
+ s.incomingMessagesQ = makeIncomingMessageQueue()
+ s.incomingMessagesQ.fillMessageQueue(incomingMessage{peer: nil, networkPeer: &s})
+ mNodeConnector.peers = append(mNodeConnector.peers, PeerInfo{NetworkPeer: &s})
+
+ err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber, 0)
require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err)
+ s.incomingMessagesQ.shutdown()
s.incomingMessagesQ = makeIncomingMessageQueue()
err = nil
// fill up the incoming message queue (one was already added)
for x := 1; x <= messageOrderingHeapLimit; x++ {
require.NoError(t, err)
- err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber)
+ err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber, 0)
}
require.Equal(t, errHeapReachedCapacity, err)
+ s.incomingMessagesQ.shutdown()
}
func TestEvaluateIncomingMessagePart1(t *testing.T) {
@@ -150,28 +163,29 @@ func TestEvaluateIncomingMessagePart1(t *testing.T) {
mNodeConnector.updatingPeers = false
s.incomingMessagesQ = makeIncomingMessageQueue()
- // Add a peer here, and make sure it is cleared
- s.incomingMessagesQ.enqueuedPeers[peer] = struct{}{}
+ defer s.incomingMessagesQ.shutdown()
+ message.peer = peer
+ require.True(t, s.incomingMessagesQ.enqueue(message))
mNodeConnector.peerInfo.TxnSyncPeer = peer
peer.incomingMessages = messageOrderingHeap{}
// TxnSyncPeer in peerInfo
s.evaluateIncomingMessage(message)
require.False(t, mNodeConnector.updatingPeers)
- _, found := s.incomingMessagesQ.enqueuedPeers[peer]
+ <-s.incomingMessagesQ.getIncomingMessageChannel()
+ _, found := s.incomingMessagesQ.enqueuedPeersMap[peer]
require.False(t, found)
- // fill the hip with messageOrderingHeapLimit elements so that the incomingMessages enqueue fails
+ // fill the heap with messageOrderingHeapLimit elements so that the incomingMessages enqueue fails
+ message.networkPeer = &s
+ message.peer = nil
for x := 0; x < messageOrderingHeapLimit; x++ {
err := peer.incomingMessages.enqueue(message)
require.NoError(t, err)
}
- // Add a peer here, and make sure it is not cleared after the error
- s.incomingMessagesQ.enqueuedPeers[peer] = struct{}{}
+ mNodeConnector.peers = []PeerInfo{{TxnSyncPeer: peer, NetworkPeer: &s}}
// TxnSyncPeer in peerInfo
s.evaluateIncomingMessage(message)
require.False(t, mNodeConnector.updatingPeers)
- _, found = s.incomingMessagesQ.enqueuedPeers[peer]
- require.True(t, found)
}
func TestEvaluateIncomingMessagePart2(t *testing.T) {
diff --git a/txnsync/interfaces.go b/txnsync/interfaces.go
index 976bdbc00..77ac07163 100644
--- a/txnsync/interfaces.go
+++ b/txnsync/interfaces.go
@@ -17,6 +17,8 @@
package txnsync
import (
+ "time"
+
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/pooldata"
"github.com/algorand/go-algorand/util/timers"
@@ -46,7 +48,7 @@ type Event struct {
}
// IncomingMessageHandler is the signature of the incoming message handler used by the transaction sync to receive network messages
-type IncomingMessageHandler func(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) error
+type IncomingMessageHandler func(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64, receivedTimestamp int64) error
// SendMessageCallback define a message sent feedback for performing message tracking
type SendMessageCallback func(enqueued bool, sequenceNumber uint64) error
@@ -79,6 +81,7 @@ type NodeConnector interface {
// across all the connected peers.
UpdatePeers(txsyncPeers []*Peer, netPeers []interface{}, peersAverageDataExchangeRate uint64)
SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback)
+ GetPeerLatency(netPeer interface{}) time.Duration
// GetPendingTransactionGroups is called by the transaction sync when it needs to look into the transaction
// pool and get the updated set of pending transactions. The second returned argument is the latest locally originated
// group counter within the given transaction groups list. If there is no group that is locally originated, the expected
diff --git a/txnsync/mainloop.go b/txnsync/mainloop.go
index ca899bc9c..138206be0 100644
--- a/txnsync/mainloop.go
+++ b/txnsync/mainloop.go
@@ -70,7 +70,7 @@ type syncState struct {
scheduler peerScheduler
interruptablePeers []*Peer
interruptablePeersMap map[*Peer]int // map a peer into the index of interruptablePeers
- incomingMessagesQ incomingMessageQueue
+ incomingMessagesQ *incomingMessageQueue
outgoingMessagesCallbackCh chan sentMessageMetadata
nextOffsetRollingCh <-chan time.Time
requestsOffset uint64
@@ -105,6 +105,7 @@ func (s *syncState) mainloop(serviceCtx context.Context, wg *sync.WaitGroup) {
s.clock = s.node.Clock()
s.incomingMessagesQ = makeIncomingMessageQueue()
+ defer s.incomingMessagesQ.shutdown()
s.outgoingMessagesCallbackCh = make(chan sentMessageMetadata, 1024)
s.interruptablePeersMap = make(map[*Peer]int)
s.scheduler.node = s.node
@@ -308,6 +309,7 @@ func (s *syncState) onNewRoundEvent(ent Event) {
if !s.isRelay {
s.nextOffsetRollingCh = s.clock.TimeoutAt(kickoffTime + 2*s.lastBeta)
}
+ s.updatePeersLatency(peers)
s.updatePeersRequestParams(peers)
}
@@ -395,7 +397,7 @@ func (s *syncState) getPeers() (result []*Peer) {
// some of the network peers might not have a sync peer, so we need to create one for these.
for _, peerInfo := range peersInfo {
if peerInfo.TxnSyncPeer == nil {
- syncPeer := makePeer(peerInfo.NetworkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log)
+ syncPeer := makePeer(peerInfo.NetworkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log, s.node.GetPeerLatency(peerInfo.NetworkPeer))
peerInfo.TxnSyncPeer = syncPeer
updatedNetworkPeers = append(updatedNetworkPeers, peerInfo.NetworkPeer)
updatedNetworkPeersSync = append(updatedNetworkPeersSync, syncPeer)
@@ -435,3 +437,9 @@ func (s *syncState) updatePeersRequestParams(peers []*Peer) {
}
}
}
+
+func (s *syncState) updatePeersLatency(peers []*Peer) {
+ for _, peer := range peers {
+ peer.cachedLatency = s.node.GetPeerLatency(peer.networkPeer)
+ }
+}
diff --git a/txnsync/outgoing.go b/txnsync/outgoing.go
index 98697f311..fa18bd353 100644
--- a/txnsync/outgoing.go
+++ b/txnsync/outgoing.go
@@ -51,27 +51,35 @@ type sentMessageMetadata struct {
// could be a lengthy operation which doesn't need to be blocking the main loop. Moving the actual encoding into an
// execution pool thread frees up the main loop, allowing smoother operation.
type messageAsyncEncoder struct {
- state *syncState
- messageData sentMessageMetadata
- roundClock timers.WallClock
- peerDataExchangeRate uint64
+ state *syncState
+ messageData sentMessageMetadata
+ roundClock timers.WallClock
+ lastReceivedMessageTimestamp time.Duration
+ peerDataExchangeRate uint64
+ // sentMessagesCh is a copy of the outgoingMessagesCallbackCh in the syncState object. We want to create a copy of
+ // the channel so that in case of a txnsync restart ( i.e. fast catchup ), we can still generate a new channel
+ // without triggering a data race. The alternative is to block the txnsync.Shutdown() until we receive the feedback
+ // from the network library, but that could be susceptible to undesired network disconnections.
+ sentMessagesCh chan sentMessageMetadata
}
// asyncMessageSent called via the network package to inform the txsync that a message was enqueued, and the associated sequence number.
func (encoder *messageAsyncEncoder) asyncMessageSent(enqueued bool, sequenceNumber uint64) error {
if !enqueued {
encoder.state.log.Infof("unable to send message to peer. disconnecting from peer.")
+ encoder.state.incomingMessagesQ.erase(encoder.messageData.peer, encoder.messageData.peer.networkPeer)
return errTransactionSyncOutgoingMessageSendFailed
}
// record the sequence number here, so that we can store that later on.
encoder.messageData.sequenceNumber = sequenceNumber
select {
- case encoder.state.outgoingMessagesCallbackCh <- encoder.messageData:
+ case encoder.sentMessagesCh <- encoder.messageData:
return nil
default:
// if we can't place it on the channel, return an error so that the node could disconnect from this peer.
encoder.state.log.Infof("unable to enqueue outgoing message confirmation; outgoingMessagesCallbackCh is full. disconnecting from peer.")
+ encoder.state.incomingMessagesQ.erase(encoder.messageData.peer, encoder.messageData.peer.networkPeer)
return errTransactionSyncOutgoingMessageQueueFull
}
}
@@ -89,6 +97,12 @@ func (encoder *messageAsyncEncoder) asyncEncodeAndSend(interface{}) interface{}
encoder.messageData.transactionGroups = nil // clear out to allow GC to reclaim
}
+ if encoder.lastReceivedMessageTimestamp >= 0 {
+ // adding a nanosecond to the elapsed time is meaningless for the data rate calculation, but would ensure that
+ // the ResponseElapsedTime field has a clear distinction between "being set" vs. "not being set"
+ encoder.messageData.message.MsgSync.ResponseElapsedTime = uint64((encoder.roundClock.Since() - encoder.lastReceivedMessageTimestamp).Nanoseconds())
+ }
+
encodedMessage := encoder.messageData.message.MarshalMsg(getMessageBuffer())
encoder.messageData.encodedMessageSize = len(encodedMessage)
// now that the message is ready, we can discard the encoded transaction group slice to allow the GC to collect it.
@@ -136,9 +150,9 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D
pendingTransactions.pendingTransactionsGroups, pendingTransactions.latestLocallyOriginatedGroupCounter = s.node.GetPendingTransactionGroups()
profGetTxnsGroups.end()
for _, peer := range peers {
- msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate}
+ msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate, sentMessagesCh: s.outgoingMessagesCallbackCh}
profAssembleMessage.start()
- msgEncoder.messageData, assembledBloomFilter = s.assemblePeerMessage(peer, &pendingTransactions)
+ msgEncoder.messageData, assembledBloomFilter, msgEncoder.lastReceivedMessageTimestamp = s.assemblePeerMessage(peer, &pendingTransactions)
profAssembleMessage.end()
isPartialMessage := msgEncoder.messageData.partialMessage
// The message that we've just encoded is expected to be sent out with the next sequence number.
@@ -178,7 +192,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D
}
}
-func (s *syncState) assemblePeerMessage(peer *Peer, pendingTransactions *pendingTransactionGroupsSnapshot) (metaMessage sentMessageMetadata, assembledBloomFilter bloomFilter) {
+func (s *syncState) assemblePeerMessage(peer *Peer, pendingTransactions *pendingTransactionGroupsSnapshot) (metaMessage sentMessageMetadata, assembledBloomFilter bloomFilter, lastReceivedMessageTimestamp time.Duration) {
metaMessage = sentMessageMetadata{
peer: peer,
message: &transactionBlockMessage{
@@ -276,10 +290,10 @@ notxns:
}
metaMessage.message.MsgSync.RefTxnBlockMsgSeq = peer.nextReceivedMessageSeq - 1
+ // signify that timestamp is not set
+ lastReceivedMessageTimestamp = time.Duration(-1)
if peer.lastReceivedMessageTimestamp != 0 && peer.lastReceivedMessageLocalRound == s.round {
- // adding a nanosecond to the elapsed time is meaningless for the data rate calculation, but would ensure that
- // the ResponseElapsedTime field has a clear distinction between "being set" vs. "not being set"
- metaMessage.message.MsgSync.ResponseElapsedTime = uint64((s.clock.Since() - peer.lastReceivedMessageTimestamp).Nanoseconds()) + 1
+ lastReceivedMessageTimestamp = peer.lastReceivedMessageTimestamp
// reset the lastReceivedMessageTimestamp so that we won't be using that again on a subsequent outgoing message.
peer.lastReceivedMessageTimestamp = 0
}
diff --git a/txnsync/outgoing_test.go b/txnsync/outgoing_test.go
index 151c716b2..3136161ef 100644
--- a/txnsync/outgoing_test.go
+++ b/txnsync/outgoing_test.go
@@ -62,6 +62,8 @@ func TestAsyncMessageSent(t *testing.T) {
var s syncState
s.clock = timers.MakeMonotonicClock(time.Now())
s.log = mockAsyncLogger{}
+ s.incomingMessagesQ = makeIncomingMessageQueue()
+ defer s.incomingMessagesQ.shutdown()
asyncEncoder := messageAsyncEncoder{
state: &s,
@@ -72,7 +74,8 @@ func TestAsyncMessageSent(t *testing.T) {
},
peer: &Peer{},
},
- roundClock: timers.MakeMonotonicClock(time.Now()),
+ roundClock: timers.MakeMonotonicClock(time.Now()),
+ sentMessagesCh: s.outgoingMessagesCallbackCh,
}
oldTimestamp := asyncEncoder.messageData.sentTimestamp
@@ -83,11 +86,11 @@ func TestAsyncMessageSent(t *testing.T) {
a.Equal(asyncEncoder.messageData.sequenceNumber, uint64(1337))
// Make this buffered for now so we catch the select statement
- asyncEncoder.state.outgoingMessagesCallbackCh = make(chan sentMessageMetadata, 1)
+ asyncEncoder.sentMessagesCh = make(chan sentMessageMetadata, 1)
err = asyncEncoder.asyncMessageSent(true, 1337)
a.Nil(err)
- a.Equal(1, len(asyncEncoder.state.outgoingMessagesCallbackCh))
+ a.Equal(1, len(asyncEncoder.sentMessagesCh))
}
type mockAsyncNodeConnector struct {
@@ -286,14 +289,14 @@ func TestAssemblePeerMessage_messageConstBloomFilter(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateLateBloom
- metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
+ metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)
a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
- a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
+ a.True(responseTime >= 0)
a.Equal(s.lastBloomFilter, expectedFilter)
}
@@ -329,14 +332,14 @@ func TestAssemblePeerMessage_messageConstBloomFilterNonRelay(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateLateBloom
- metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
+ metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)
a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
- a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
+ a.True(responseTime >= 0)
a.NotEqual(s.lastBloomFilter, expectedFilter)
}
@@ -361,14 +364,14 @@ func TestAssemblePeerMessage_messageConstNextMinDelay_messageConstUpdateRequestP
s.isRelay = true
s.lastBeta = 123 * time.Nanosecond
- metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
+ metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)
a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
- a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
+ a.True(responseTime >= 0)
a.Equal(metaMessage.message.MsgSync.NextMsgMinDelay, uint64(s.lastBeta.Nanoseconds())*2)
}
@@ -405,7 +408,7 @@ func TestAssemblePeerMessage_messageConstTransactions(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateHoldsoff
- metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
+ metaMessage, _, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
a.Equal(len(metaMessage.transactionGroups), 1)
a.True(reflect.DeepEqual(metaMessage.transactionGroups[0], pendingTransactions.pendingTransactionsGroups[0]))
diff --git a/txnsync/peer.go b/txnsync/peer.go
index 1aa04c574..b33ed0ed1 100644
--- a/txnsync/peer.go
+++ b/txnsync/peer.go
@@ -157,6 +157,8 @@ type Peer struct {
// dataExchangeRate is the combined upload/download rate in bytes/second
dataExchangeRate uint64
+ // cachedLatency is the measured network latency of a peer, updated every round
+ cachedLatency time.Duration
// these two fields describe "what does the local peer want the remote peer to send back"
localTransactionsModulator byte
@@ -265,12 +267,13 @@ func (t *transactionGroupCounterTracker) index(offset, modulator byte) int {
return -1
}
-func makePeer(networkPeer interface{}, isOutgoing bool, isLocalNodeRelay bool, cfg *config.Local, log Logger) *Peer {
+func makePeer(networkPeer interface{}, isOutgoing bool, isLocalNodeRelay bool, cfg *config.Local, log Logger, latency time.Duration) *Peer {
p := &Peer{
networkPeer: networkPeer,
isOutgoing: isOutgoing,
recentSentTransactions: makeTransactionCache(shortTermRecentTransactionsSentBufferLength, longTermRecentTransactionsSentBufferLength, pendingUnconfirmedRemoteMessages),
dataExchangeRate: defaultDataExchangeRate,
+ cachedLatency: latency,
transactionPoolAckCh: make(chan uint64, maxAcceptedMsgSeq),
transactionPoolAckMessages: make([]uint64, 0, maxAcceptedMsgSeq),
significantMessageThreshold: defaultSignificantMessageThreshold,
@@ -582,25 +585,25 @@ func (p *Peer) updateIncomingTransactionGroups(txnGroups []pooldata.SignedTxGrou
}
}
-func (p *Peer) updateIncomingMessageTiming(timings timingParams, currentRound basics.Round, currentTime time.Duration, incomingMessageSize int) {
+func (p *Peer) updateIncomingMessageTiming(timings timingParams, currentRound basics.Round, currentTime time.Duration, timeInQueue time.Duration, peerLatency time.Duration, incomingMessageSize int) {
p.lastConfirmedMessageSeqReceived = timings.RefTxnBlockMsgSeq
// if we received a message that references our previous message, see if they occurred on the same round
if p.lastConfirmedMessageSeqReceived == p.lastSentMessageSequenceNumber && p.lastSentMessageRound == currentRound && p.lastSentMessageTimestamp > 0 {
// if so, we might be able to calculate the bandwidth.
- timeSinceLastMessageWasSent := currentTime - p.lastSentMessageTimestamp
+ timeSinceLastMessageWasSent := currentTime - timeInQueue - p.lastSentMessageTimestamp
networkMessageSize := uint64(p.lastSentMessageSize + incomingMessageSize)
- if timings.ResponseElapsedTime != 0 && timeSinceLastMessageWasSent > time.Duration(timings.ResponseElapsedTime) && networkMessageSize >= p.significantMessageThreshold {
- networkTrasmitTime := timeSinceLastMessageWasSent - time.Duration(timings.ResponseElapsedTime)
+ if timings.ResponseElapsedTime != 0 && peerLatency > 0 && timeSinceLastMessageWasSent > time.Duration(timings.ResponseElapsedTime)+peerLatency && networkMessageSize >= p.significantMessageThreshold {
+ networkTrasmitTime := timeSinceLastMessageWasSent - time.Duration(timings.ResponseElapsedTime) - peerLatency
dataExchangeRate := uint64(time.Second) * networkMessageSize / uint64(networkTrasmitTime)
+ // clamp data exchange rate to realistic metrics
if dataExchangeRate < minDataExchangeRateThreshold {
dataExchangeRate = minDataExchangeRateThreshold
} else if dataExchangeRate > maxDataExchangeRateThreshold {
dataExchangeRate = maxDataExchangeRateThreshold
}
- // clamp data exchange rate to realistic metrics
- p.dataExchangeRate = dataExchangeRate
// fmt.Printf("incoming message : updating data exchange to %d; network msg size = %d+%d, transmit time = %v\n", dataExchangeRate, p.lastSentMessageSize, incomingMessageSize, networkTrasmitTime)
+ p.dataExchangeRate = dataExchangeRate
}
// given that we've (maybe) updated the data exchange rate, we need to clear out the lastSendMessage information
@@ -611,7 +614,7 @@ func (p *Peer) updateIncomingMessageTiming(timings timingParams, currentRound ba
p.lastSentMessageSize = 0
}
p.lastReceivedMessageLocalRound = currentRound
- p.lastReceivedMessageTimestamp = currentTime
+ p.lastReceivedMessageTimestamp = currentTime - timeInQueue
p.lastReceivedMessageSize = incomingMessageSize
p.lastReceivedMessageNextMsgMinDelay = time.Duration(timings.NextMsgMinDelay) * time.Nanosecond
p.recentSentTransactions.acknowledge(timings.AcceptedMsgSeq)
diff --git a/txnsync/peer_test.go b/txnsync/peer_test.go
index cc45cd9a4..fb18e7d22 100644
--- a/txnsync/peer_test.go
+++ b/txnsync/peer_test.go
@@ -267,7 +267,7 @@ func TestGetNextScheduleOffset(t *testing.T) {
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
if test.fxn != nil {
test.fxn(p)
}
@@ -398,7 +398,7 @@ func TestGetMessageConstructionOps(t *testing.T) {
log := wrapLogger(tlog, &config)
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
if test.fxn != nil {
test.fxn(p)
}
@@ -527,7 +527,7 @@ func TestAdvancePeerState(t *testing.T) {
log := wrapLogger(tlog, &config)
for i, test := range tests {
t.Run(string(rune(i)), func(t *testing.T) {
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
if test.fxn != nil {
test.fxn(p)
}
@@ -554,7 +554,7 @@ func TestUpdateIncomingMessageTiming(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
currentRound := basics.Round(1)
currentTime := time.Millisecond * 123
@@ -565,7 +565,7 @@ func TestUpdateIncomingMessageTiming(t *testing.T) {
p.lastConfirmedMessageSeqReceived = p.lastSentMessageSequenceNumber + 1
- p.updateIncomingMessageTiming(timing, currentRound, currentTime, currentMessageSize)
+ p.updateIncomingMessageTiming(timing, currentRound, currentTime, 0, time.Millisecond, currentMessageSize)
a.Equal(p.lastReceivedMessageLocalRound, currentRound)
a.Equal(p.lastReceivedMessageTimestamp, currentTime)
@@ -579,7 +579,7 @@ func TestUpdateIncomingMessageTiming(t *testing.T) {
timing.ResponseElapsedTime = 1
p.lastSentMessageTimestamp = 1 * time.Millisecond
currentMessageSize = maxDataExchangeRateThreshold + 1
- p.updateIncomingMessageTiming(timing, currentRound, currentTime, currentMessageSize)
+ p.updateIncomingMessageTiming(timing, currentRound, currentTime, 0, time.Millisecond, currentMessageSize)
a.Equal(uint64(maxDataExchangeRateThreshold), p.dataExchangeRate)
@@ -590,9 +590,20 @@ func TestUpdateIncomingMessageTiming(t *testing.T) {
p.lastSentMessageSize = 0
currentMessageSize = int(p.significantMessageThreshold)
currentTime = time.Millisecond * 1000
- p.updateIncomingMessageTiming(timing, currentRound, currentTime, currentMessageSize)
+ p.updateIncomingMessageTiming(timing, currentRound, currentTime, 0, time.Millisecond, currentMessageSize)
a.Equal(uint64(minDataExchangeRateThreshold), p.dataExchangeRate)
+
+ p.lastConfirmedMessageSeqReceived = p.lastSentMessageSequenceNumber
+ p.lastSentMessageRound = currentRound
+ timing.ResponseElapsedTime = uint64(time.Millisecond)
+ p.lastSentMessageTimestamp = 1 * time.Millisecond
+ p.lastSentMessageSize = 0
+ currentMessageSize = 100000
+ currentTime = time.Millisecond * 123
+ p.updateIncomingMessageTiming(timing, currentRound, currentTime, time.Millisecond, time.Millisecond*100, currentMessageSize)
+
+ a.Equal(uint64(5000000), p.dataExchangeRate)
}
// TestUpdateIncomingTransactionGroups tests updating the incoming transaction groups
@@ -624,7 +635,7 @@ func TestUpdateIncomingTransactionGroups(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
p.recentSentTransactions.reset()
@@ -643,7 +654,7 @@ func TestUpdateRequestParams(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
oldModulator := p.requestedTransactionsModulator
oldOffset := p.requestedTransactionsOffset
@@ -679,7 +690,7 @@ func TestAddIncomingBloomFilter(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
for i := 0; i < 2*maxIncomingBloomFilterHistory; i++ {
bf := &testableBloomFilter{
@@ -743,7 +754,7 @@ func TestSelectPendingTransactions(t *testing.T) {
log := wrapLogger(tlog, &config)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
if test.fxn != nil {
test.fxn(p)
}
@@ -810,7 +821,7 @@ func TestGetAcceptedMessages(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
var testList []uint64
chPtr := &p.transactionPoolAckCh
@@ -835,7 +846,7 @@ func TestDequeuePendingTransactionPoolAckMessages(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
ch := p.transactionPoolAckCh
var testList []uint64
@@ -880,7 +891,7 @@ func TestUpdateMessageSent(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
txMsg := &transactionBlockMessage{
Version: txnBlockMessageVersion,
@@ -917,10 +928,10 @@ func TestIncomingPeersOnly(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p1 := makePeer(nil, true, true, &config, log)
- p2 := makePeer(nil, true, false, &config, log)
- p3 := makePeer(nil, false, true, &config, log)
- p4 := makePeer(nil, false, false, &config, log)
+ p1 := makePeer(nil, true, true, &config, log, 0)
+ p2 := makePeer(nil, true, false, &config, log, 0)
+ p3 := makePeer(nil, false, true, &config, log, 0)
+ p4 := makePeer(nil, false, false, &config, log, 0)
peers := []*Peer{p1, p2, p3, p4}
@@ -939,7 +950,7 @@ func TestLocalRequestParams(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(nil, true, true, &config, log)
+ p := makePeer(nil, true, true, &config, log, 0)
p.setLocalRequestParams(256, 256)
offset, modulator := p.getLocalRequestParams()
@@ -962,7 +973,7 @@ func TestSimpleGetters(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p := makePeer(sentinelInterface, true, true, &config, log)
+ p := makePeer(sentinelInterface, true, true, &config, log, 0)
a.Equal(p.GetNetworkPeer(), sentinelInterface)
a.Equal(p.GetTransactionPoolAckChannel(), p.transactionPoolAckCh)
@@ -978,7 +989,7 @@ func TestMakePeer(t *testing.T) {
config := config.GetDefaultLocal()
tlog := logging.TestingLog(t)
log := wrapLogger(tlog, &config)
- p1 := makePeer(sentinelInterface, true, true, &config, log)
+ p1 := makePeer(sentinelInterface, true, true, &config, log, 0)
a.NotNil(p1)
a.Equal(p1.networkPeer, sentinelInterface)
@@ -988,7 +999,7 @@ func TestMakePeer(t *testing.T) {
a.Equal(p1.dataExchangeRate, uint64(defaultRelayToRelayDataExchangeRate))
// Check that we have different values if the local node relay is false
- p2 := makePeer(sentinelInterface, true, false, &config, log)
+ p2 := makePeer(sentinelInterface, true, false, &config, log, 0)
a.NotNil(p2)
a.Equal(p1.networkPeer, sentinelInterface)
diff --git a/txnsync/service_test.go b/txnsync/service_test.go
index 182f83539..2b262fc18 100644
--- a/txnsync/service_test.go
+++ b/txnsync/service_test.go
@@ -43,6 +43,7 @@ type mockNodeConnector struct {
peerInfo PeerInfo
updatingPeers bool
transactionPoolSize int
+ peers []PeerInfo
}
func makeMockNodeConnector(calledEvents *bool) mockNodeConnector {
@@ -69,7 +70,7 @@ func (fn *mockNodeConnector) Random(rng uint64) uint64 {
return rv % rng
}
-func (fn *mockNodeConnector) GetPeers() []PeerInfo { return nil }
+func (fn *mockNodeConnector) GetPeers() []PeerInfo { return fn.peers }
func (fn *mockNodeConnector) GetPeer(interface{}) (out PeerInfo) {
return fn.peerInfo
@@ -80,6 +81,11 @@ func (fn *mockNodeConnector) UpdatePeers(txsyncPeers []*Peer, netPeers []interfa
}
func (fn *mockNodeConnector) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) {
}
+
+func (fn *mockNodeConnector) GetPeerLatency(netPeer interface{}) time.Duration {
+ return 0
+}
+
func (fn *mockNodeConnector) GetPendingTransactionGroups() (txGroups []pooldata.SignedTxGroup, latestLocallyOriginatedGroupCounter uint64) {
return
}