diff options
author | John Lee <64482439+algojohnlee@users.noreply.github.com> | 2021-10-26 09:34:42 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-26 09:34:42 -0400 |
commit | 378816d0439cf4cf2647952eec68a4e24ad6fa06 (patch) | |
tree | 58de9505cfa4e951e0583fe62eaa5dd2beca5f25 | |
parent | 53cf0134c131bbbe51e689a149494cb90890a4b4 (diff) | |
parent | 615fee293d10582a886029a95974806a3d7bdcd6 (diff) |
Merge pull request #3139 from algorand/rel/beta-3.1.3
go-algorand v3.1.3-beta
33 files changed, 1178 insertions, 440 deletions
diff --git a/buildnumber.dat b/buildnumber.dat index 0cfbf0888..00750edc0 100644 --- a/buildnumber.dat +++ b/buildnumber.dat @@ -1 +1 @@ -2 +3 diff --git a/catchup/fetcher_test.go b/catchup/fetcher_test.go index 13b0c7fe2..4d62fdde0 100644 --- a/catchup/fetcher_test.go +++ b/catchup/fetcher_test.go @@ -23,6 +23,7 @@ import ( "net/url" "strings" "testing" + "time" "github.com/gorilla/mux" "github.com/stretchr/testify/require" @@ -291,6 +292,11 @@ func (p *testUnicastPeer) IsOutgoing() bool { return false } +// GetConnectionLatency returns the connection latency between the local node and this peer. +func (p *testUnicastPeer) GetConnectionLatency() time.Duration { + return time.Duration(0) +} + func (p *testUnicastPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback network.UnicastWebsocketMessageStateCallback) error { ps := p.gn.(*httpTestPeerSource) var dispather network.MessageHandler diff --git a/catchup/peerSelector_test.go b/catchup/peerSelector_test.go index 0147e1988..02d759b20 100644 --- a/catchup/peerSelector_test.go +++ b/catchup/peerSelector_test.go @@ -66,6 +66,11 @@ func (d *mockUnicastPeer) IsOutgoing() bool { return false } +// GetConnectionLatency returns the connection latency between the local node and this peer. +func (d *mockUnicastPeer) GetConnectionLatency() time.Duration { + return time.Duration(0) +} + func TestPeerAddress(t *testing.T) { partitiontest.PartitionTest(t) diff --git a/catchup/service.go b/catchup/service.go index 211b31608..f8f92b9b6 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -324,6 +324,9 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, if err != nil { switch err.(type) { + case ledgercore.ErrNonSequentialBlockEval: + s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r) + return true case ledgercore.BlockInLedgerError: s.log.Infof("fetchAndWrite(%d): block already in ledger", r) return true diff --git a/config/config.go b/config/config.go index 71eaba88d..781ab73a8 100644 --- a/config/config.go +++ b/config/config.go @@ -63,7 +63,7 @@ type Local struct { // Version tracks the current version of the defaults so we can migrate old -> new // This is specifically important whenever we decide to change the default value // for an existing parameter. This field tag must be updated any time we add a new version. - Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17"` + Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18"` // environmental (may be overridden) // When enabled, stores blocks indefinitally, otherwise, only the most recents blocks @@ -84,7 +84,7 @@ type Local struct { MaxConnectionsPerIP int `version[3]:"30"` // 0 == disable - PeerPingPeriodSeconds int `version[0]:"0"` + PeerPingPeriodSeconds int `version[0]:"0" version[18]:"10"` // for https serving TLSCertFile string `version[0]:""` diff --git a/config/local_defaults.go b/config/local_defaults.go index c7a986bd7..de8090339 100644 --- a/config/local_defaults.go +++ b/config/local_defaults.go @@ -20,7 +20,7 @@ package config var defaultLocal = Local{ - Version: 17, + Version: 18, AccountUpdatesStatsInterval: 5000000000, AccountsRebuildSynchronousMode: 1, AnnounceParticipationKey: true, @@ -92,7 +92,7 @@ var defaultLocal = Local{ OutgoingMessageFilterBucketSize: 128, ParticipationKeysRefreshInterval: 60000000000, PeerConnectionsUpdateInterval: 3600, - PeerPingPeriodSeconds: 0, + PeerPingPeriodSeconds: 10, PriorityPeers: map[string]bool{}, PublicAddress: "", ReconnectTime: 60000000000, diff --git a/data/txHandler.go b/data/txHandler.go index 48fe7f7b2..0ecef433c 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -538,7 +538,7 @@ func (handler *solicitedAsyncTxHandler) loop(ctx context.Context) { // We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings err := handler.txHandler.net.Relay(ctx, protocol.TxnTag, reencode(txnGroup.Transactions), false, groups.networkPeer) if err != nil { - logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v") + logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v", err) break } } @@ -8,7 +8,7 @@ require ( github.com/algorand/graphtrace v0.0.0-20201117160756-e524ed1a6f64 github.com/algorand/msgp v1.1.48 github.com/algorand/oapi-codegen v1.3.5-algorand5 - github.com/algorand/websocket v1.4.2 + github.com/algorand/websocket v1.4.3 github.com/algorand/xorfilter v0.2.0 github.com/aws/aws-sdk-go v1.16.5 github.com/chrismcguire/gobberish v0.0.0-20150821175641-1d8adb509a0e @@ -10,8 +10,8 @@ github.com/algorand/msgp v1.1.48 h1:5P+gVmTnk0m37r+rA3ZsFZW219ZqmCLulW5f8Z+3nx8= github.com/algorand/msgp v1.1.48/go.mod h1:LtOntbYiCHj/Sl/Sqxtf8CZOrDt2a8Dv3tLaS6mcnUE= github.com/algorand/oapi-codegen v1.3.5-algorand5 h1:y576Ca2/guQddQrQA7dtL5KcOx5xQgPeIupiuFMGyCI= github.com/algorand/oapi-codegen v1.3.5-algorand5/go.mod h1:/k0Ywn0lnt92uBMyE+yiRf/Wo3/chxHHsAfenD09EbY= -github.com/algorand/websocket v1.4.2 h1:zMB7ukz+c7tcef8rVqmKQTv6KQtxXtCFuiAqKaE7n9I= -github.com/algorand/websocket v1.4.2/go.mod h1:0nFSn+xppw/GZS9hgWPS3b8/4FcA3Pj7XQxm+wqHGx8= +github.com/algorand/websocket v1.4.3 h1:8YiA+ZtwqAyg0K30lQyl7gUdKUArYXvBtd/cTFwA4uQ= +github.com/algorand/websocket v1.4.3/go.mod h1:0nFSn+xppw/GZS9hgWPS3b8/4FcA3Pj7XQxm+wqHGx8= github.com/algorand/xorfilter v0.2.0 h1:YC31ANxdZ2jmtbwqv1+USskVSqjkeiRZcQGc6//ro9Q= github.com/algorand/xorfilter v0.2.0/go.mod h1:f5cJsYrFbJhXkbjnV4odJB44np05/PvwvdBnABnQoUs= github.com/aws/aws-sdk-go v1.16.5 h1:NVxzZXIuwX828VcJrpNxxWjur1tlOBISdMdDdHIKHcc= diff --git a/installer/config.json.example b/installer/config.json.example index aa80a6dad..b1a977e64 100644 --- a/installer/config.json.example +++ b/installer/config.json.example @@ -1,5 +1,5 @@ { - "Version": 17, + "Version": 18, "AccountUpdatesStatsInterval": 5000000000, "AccountsRebuildSynchronousMode": 1, "AnnounceParticipationKey": true, @@ -71,7 +71,7 @@ "OutgoingMessageFilterBucketSize": 128, "ParticipationKeysRefreshInterval": 60000000000, "PeerConnectionsUpdateInterval": 3600, - "PeerPingPeriodSeconds": 0, + "PeerPingPeriodSeconds": 10, "PriorityPeers": {}, "PublicAddress": "", "ReconnectTime": 60000000000, diff --git a/network/latencyTracker.go b/network/latencyTracker.go new file mode 100644 index 000000000..ff503ddb2 --- /dev/null +++ b/network/latencyTracker.go @@ -0,0 +1,170 @@ +// Copyright (C) 2019-2021 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package network + +import ( + "errors" + "net" + "strconv" + "sync/atomic" + "time" + + "github.com/algorand/websocket" + + "github.com/algorand/go-deadlock" + + "github.com/algorand/go-algorand/config" +) + +const pongMessageWriteDuration = time.Second +const pingMessageWriteDuration = time.Second + +var errInvalidPongMessageContent = errors.New("invalid pong message content") +var errInvalidPingMessageContent = errors.New("invalid ping message content") + +// latencyTracker works in conjunction with the wspeer in measuring the +// communication latency over the websocket connection. +type latencyTracker struct { + // receivedPacketCounter is a counter for all incoming messages + // placed here to be aligned with 64bit address. + receivedPacketCounter uint64 + + // latency is the effective latency of the connection. + // placed here to be aligned with 64bit address. + latency int64 + + // lastPingSentTime is the timestamp at which we last sent a message. + // this variable is only touched by checkPingSending, and therefore doesn't + // need to be syncronized. The "clone" of this variable lastPingSentTimeSynced, + // is being used by both the checkPingSending as well as by the pongHandler + // and therefore require synchronization. + lastPingSentTime int64 + + // static variables + // ( doesn't get changed after init, hence, no synchronization needed ) + + // conn is the underlying connection object. + conn wsPeerWebsocketConn + + // enabled indicates whether the pingpong is currently enabled or not. + enabled bool + + // pingInterval is the max interval at which the client would send ping messages. + pingInterval time.Duration + + // lastPingMu synchronize the protected variables that might be modified across + // the checkPingSending and the pongHandler. All the variable below this point + // need to be syncronized with the mutex. + lastPingMu deadlock.Mutex + + // lastPingID is the last ping ID, a monotonic growing number used to ensure + // that the pong message we've receive corresponds to the latest ping message + // that we've sent. + lastPingID uint64 + + // lastPingReceivedCounter stores message counter at the time we sent the ping. + // In order to ensure the timing accuracy, we want to have no other messages + // being exchanged. This, of course, would only delay the ping-pong until a + // better measurement could be taken. + lastPingReceivedCounter uint64 + + // lastPingSentTimeSynced, as stated above, is the syncronized version of lastPingSentTime. + // it is used only in the case where we end up sending the ping message. + lastPingSentTimeSynced int64 +} + +func (lt *latencyTracker) init(conn wsPeerWebsocketConn, cfg config.Local, initialConnectionLatency time.Duration) { + lt.conn = conn + lt.enabled = cfg.PeerPingPeriodSeconds > 0 && cfg.EnablePingHandler + lt.latency = int64(initialConnectionLatency) + lt.pingInterval = time.Duration(cfg.PeerPingPeriodSeconds) * time.Second + conn.SetPingHandler(lt.pingHandler) + conn.SetPongHandler(lt.pongHandler) +} + +func (lt *latencyTracker) pingHandler(message string) error { + if _, err := strconv.Atoi(message); err != nil { + return errInvalidPingMessageContent + } + err := lt.conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(pongMessageWriteDuration)) + if err == websocket.ErrCloseSent { + return nil + } else if e, ok := err.(net.Error); ok && e.Temporary() { + return nil + } + return err +} + +func (lt *latencyTracker) pongHandler(message string) error { + pongID, err := strconv.Atoi(message) + if err != nil { + return errInvalidPongMessageContent + } + + lt.lastPingMu.Lock() + defer lt.lastPingMu.Unlock() + + if uint64(pongID) != lt.lastPingID { + // we've sent more than one ping since; ignore this message. + return nil + } + if lt.receivedPacketCounter != lt.lastPingReceivedCounter { + // we've received other messages since the one that we sent. The timing + // here would not be accurate. + return nil + } + lastPingSentTime := time.Unix(0, lt.lastPingSentTimeSynced) + roundtripDuration := time.Since(lastPingSentTime) + atomic.StoreInt64(<.latency, roundtripDuration.Nanoseconds()) + return nil +} + +func (lt *latencyTracker) getConnectionLatency() time.Duration { + return time.Duration(atomic.LoadInt64(<.latency)) +} + +func (lt *latencyTracker) checkPingSending(now *time.Time) error { + if !lt.enabled { + return nil + } + if now.Sub(time.Unix(0, lt.lastPingSentTime)) < lt.pingInterval { + return nil + } + + // it looks like it's time to send a ping : + lt.lastPingMu.Lock() + defer lt.lastPingMu.Unlock() + + lt.lastPingID++ + err := lt.conn.WriteControl(websocket.PingMessage, []byte(strconv.Itoa(int(lt.lastPingID))), now.Add(pingMessageWriteDuration)) + if err == websocket.ErrCloseSent { + return nil + } else if e, ok := err.(net.Error); ok && e.Temporary() { + return nil + } + if err != nil { + return err + } + lt.lastPingSentTimeSynced = now.UnixNano() + lt.lastPingReceivedCounter = atomic.LoadUint64(<.receivedPacketCounter) + lt.lastPingSentTime = lt.lastPingSentTimeSynced + return nil +} + +func (lt *latencyTracker) increaseReceivedCounter() { + atomic.AddUint64(<.receivedPacketCounter, 1) +} diff --git a/network/latencyTracker_test.go b/network/latencyTracker_test.go new file mode 100644 index 000000000..e7a62c1d7 --- /dev/null +++ b/network/latencyTracker_test.go @@ -0,0 +1,121 @@ +// Copyright (C) 2019-2021 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package network + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" +) + +func TestLatencyTracker(t *testing.T) { + partitiontest.PartitionTest(t) + + netA := makeTestFilterWebsocketNode(t, "a") + netA.config.GossipFanout = 1 + netA.config.PeerPingPeriodSeconds = 2 + netA.Start() + defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + + netB := makeTestFilterWebsocketNode(t, "b") + netB.config.GossipFanout = 1 + addrA, postListen := netA.Address() + require.True(t, postListen) + t.Log(addrA) + netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) + + netB.Start() + defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + counter := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})} + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + debugTag2 := protocol.ProposalPayloadTag + counter2 := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})} + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: debugTag2, MessageHandler: counter2}}) + + readyTimeout := time.NewTimer(2 * time.Second) + waitReady(t, netA, readyTimeout.C) + waitReady(t, netB, readyTimeout.C) + + msg := make([]byte, 200) + rand.Read(msg) + var lastMsgTime time.Time + + var connLatencyInitialA time.Duration + // wait for up to 20 seconds for the network latency to be established. + startTime := time.Now() + for { + if time.Since(lastMsgTime) > 100*time.Millisecond { + netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) + lastMsgTime = time.Now() + } + + connLatencyA := netA.peers[0].GetConnectionLatency() + if connLatencyA == time.Duration(0) { + require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds()) + time.Sleep(time.Millisecond) + continue + } + require.LessOrEqual(t, connLatencyA.Nanoseconds(), (20 * time.Second).Nanoseconds()) + connLatencyInitialA = connLatencyA + break + } + + // wait for up to 20 seconds for the network latency to be established. + startTime = time.Now() + lastMsgTime = time.Time{} + for { + if time.Since(lastMsgTime) > 100*time.Millisecond { + netB.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) + lastMsgTime = time.Now() + } + + connLatencyB := netB.peers[0].GetConnectionLatency() + if connLatencyB == time.Duration(0) { + require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds()) + time.Sleep(time.Millisecond) + continue + } + require.LessOrEqual(t, connLatencyB.Nanoseconds(), (20 * time.Second).Nanoseconds()) + break + } + + // send the given message until we get a different latency. + // wait for up to 20 seconds for the network latency to be established. + startTime = time.Now() + lastMsgTime = time.Time{} + for { + if time.Since(lastMsgTime) > 100*time.Millisecond { + netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) + lastMsgTime = time.Now() + } + + connLatencyA := netA.peers[0].GetConnectionLatency() + if connLatencyA != connLatencyInitialA { + require.NotEqual(t, connLatencyA.Nanoseconds(), int64(0)) + waitTime := time.Since(lastMsgTime) + require.Less(t, waitTime.Seconds(), float64(netA.config.PeerPingPeriodSeconds*2)) + break + } + time.Sleep(time.Millisecond) + } +} diff --git a/network/ping.go b/network/ping.go deleted file mode 100644 index 02915b49f..000000000 --- a/network/ping.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (C) 2019-2021 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. - -package network - -import ( - "bytes" - "context" - "time" - - "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/protocol" -) - -func pingHandler(message IncomingMessage) OutgoingMessage { - if len(message.Data) > 8 { - return OutgoingMessage{} - } - message.Net.(*WebsocketNetwork).log.Debugf("ping from peer %#v", message.Sender.(*wsPeer).wsPeerCore) - peer := message.Sender.(*wsPeer) - tbytes := []byte(protocol.PingReplyTag) - mbytes := make([]byte, len(tbytes)+len(message.Data)) - copy(mbytes, tbytes) - copy(mbytes[len(tbytes):], message.Data) - var digest crypto.Digest // leave blank, ping message too short - peer.writeNonBlock(context.Background(), mbytes, false, digest, time.Now(), nil) - return OutgoingMessage{} -} - -func pingReplyHandler(message IncomingMessage) OutgoingMessage { - log := message.Net.(*WebsocketNetwork).log - now := time.Now() - peer := message.Sender.(*wsPeer) - peer.pingLock.Lock() - defer peer.pingLock.Unlock() - if !peer.pingInFlight { - log.Infof("ping reply with non in flight from %s", peer.rootURL) - return OutgoingMessage{} - } - if len(peer.pingData) != len(message.Data) { - log.Infof("ping reply with wrong length want %d got %d, from %s", len(peer.pingData), len(message.Data), peer.rootURL) - return OutgoingMessage{} - } - if 0 != bytes.Compare(peer.pingData, message.Data) { - log.Infof("ping reply with wrong data from %s", peer.rootURL) - return OutgoingMessage{} - } - peer.pingInFlight = false - peer.lastPingRoundTripTime = now.Sub(peer.pingSent) - log.Debugf("ping returned in %s from %s", peer.lastPingRoundTripTime, message.Sender.(*wsPeer).rootURL) - return OutgoingMessage{} -} - -var pingHandlers = []TaggedMessageHandler{ - {protocol.PingTag, HandlerFunc(pingHandler)}, - {protocol.PingReplyTag, HandlerFunc(pingReplyHandler)}, -} diff --git a/network/ping_test.go b/network/ping_test.go deleted file mode 100644 index 85b1ef2c3..000000000 --- a/network/ping_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (C) 2019-2021 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. - -package network - -import ( - "testing" - "time" - - "github.com/algorand/go-algorand/test/partitiontest" - "github.com/stretchr/testify/require" -) - -// for two node network, check that B can ping A and get a reply -func TestPing(t *testing.T) { - partitiontest.PartitionTest(t) - - netA := makeTestWebsocketNode(t) - netA.config.GossipFanout = 1 - netA.config.PeerPingPeriodSeconds = 5 - netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() - netB := makeTestWebsocketNode(t) - netB.config.GossipFanout = 1 - netB.config.PeerPingPeriodSeconds = 5 - addrA, postListen := netA.Address() - require.True(t, postListen) - t.Log(addrA) - netB.phonebook = MakePhonebook(1, 1*time.Millisecond) - netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) - netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() - - readyTimeout := time.NewTimer(2 * time.Second) - waitReady(t, netA, readyTimeout.C) - t.Log("a ready") - waitReady(t, netB, readyTimeout.C) - t.Log("b ready") - - bpeers := netB.GetPeers(PeersConnectedOut) - require.Equal(t, 1, len(bpeers)) - - peer := bpeers[0].(*wsPeer) - prePing := time.Now() - peer.sendPing() - const waitStep = 10 * time.Millisecond - for i := 1; i <= 100; i++ { - time.Sleep(waitStep) - _, lastPingRoundTripTime := peer.pingTimes() - if lastPingRoundTripTime > 0 { - postPing := time.Now() - testTime := postPing.Sub(prePing) - if lastPingRoundTripTime < testTime { - // success - return - } - t.Fatalf("ping came back with bogus time %s after %s test waiting", lastPingRoundTripTime, testTime) - } - } - t.FailNow() -} diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 139bcc487..1a86eadf0 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "io/ioutil" - "math" "net" "net/http" "net/textproto" @@ -31,7 +30,6 @@ import ( "path" "regexp" "runtime" - "sort" "strconv" "strings" "sync" @@ -789,9 +787,6 @@ func (wn *WebsocketNetwork) Start() { wn.scheme = "http" } wn.meshUpdateRequests <- meshRequest{false, nil} - if wn.config.EnablePingHandler { - wn.RegisterHandlers(pingHandlers) - } if wn.prioScheme != nil { wn.RegisterHandlers(prioHandlers) } @@ -801,10 +796,7 @@ func (wn *WebsocketNetwork) Start() { } wn.wg.Add(1) go wn.meshThread() - if wn.config.PeerPingPeriodSeconds > 0 { - wn.wg.Add(1) - go wn.pingThread() - } + // we shouldn't have any ticker here.. but in case we do - just stop it. if wn.peersConnectivityCheckTicker != nil { wn.peersConnectivityCheckTicker.Stop() @@ -1785,81 +1777,6 @@ func (wn *WebsocketNetwork) prioWeightRefresh() { } } -// Wake up the thread to do work this often. -const pingThreadPeriod = 30 * time.Second - -// If ping stats are older than this, don't include in metrics. -const maxPingAge = 30 * time.Minute - -// pingThread wakes up periodically to refresh the ping times on peers and update the metrics gauges. -func (wn *WebsocketNetwork) pingThread() { - defer wn.wg.Done() - ticker := time.NewTicker(pingThreadPeriod) - defer ticker.Stop() - for { - select { - case <-ticker.C: - case <-wn.ctx.Done(): - return - } - sendList := wn.peersToPing() - wn.log.Debugf("ping %d peers...", len(sendList)) - for _, peer := range sendList { - if !peer.sendPing() { - // if we failed to send a ping, see how long it was since last successful ping. - lastPingSent, _ := peer.pingTimes() - wn.log.Infof("failed to ping to %v for the past %f seconds", peer, time.Now().Sub(lastPingSent).Seconds()) - } - } - } -} - -// Walks list of peers, gathers list of peers to ping, also calculates statistics. -func (wn *WebsocketNetwork) peersToPing() []*wsPeer { - wn.peersLock.RLock() - defer wn.peersLock.RUnlock() - // Never flood outbound traffic by trying to ping all the peers at once. - // Send to at most one fifth of the peers. - maxSend := 1 + (len(wn.peers) / 5) - out := make([]*wsPeer, 0, maxSend) - now := time.Now() - // a list to sort to find median - times := make([]float64, 0, len(wn.peers)) - var min = math.MaxFloat64 - var max float64 - var sum float64 - pingPeriod := time.Duration(wn.config.PeerPingPeriodSeconds) * time.Second - for _, peer := range wn.peers { - lastPingSent, lastPingRoundTripTime := peer.pingTimes() - sendToNow := now.Sub(lastPingSent) - if (sendToNow > pingPeriod) && (len(out) < maxSend) { - out = append(out, peer) - } - if (lastPingRoundTripTime > 0) && (sendToNow < maxPingAge) { - ftime := lastPingRoundTripTime.Seconds() - sum += ftime - times = append(times, ftime) - if ftime < min { - min = ftime - } - if ftime > max { - max = ftime - } - } - } - if len(times) != 0 { - sort.Float64s(times) - median := times[len(times)/2] - medianPing.Set(median, nil) - mean := sum / float64(len(times)) - meanPing.Set(mean, nil) - minPing.Set(min, nil) - maxPing.Set(max, nil) - wn.log.Infof("ping times min=%f mean=%f median=%f max=%f", min, mean, median, max) - } - return out -} - func (wn *WebsocketNetwork) getDNSAddrs(dnsBootstrap string) (relaysAddresses []string, archiverAddresses []string) { var err error relaysAddresses, err = tools_network.ReadFromSRV("algobootstrap", "tcp", dnsBootstrap, wn.config.FallbackDNSResolverAddress, wn.config.DNSSecuritySRVEnforced()) diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 6571f5b9e..1a5a3d427 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -614,6 +614,12 @@ func (nc *nopConn) SetReadLimit(limit int64) { func (nc *nopConn) CloseWithoutFlush() error { return nil } +func (nc *nopConn) SetPingHandler(h func(appData string) error) { + +} +func (nc *nopConn) SetPongHandler(h func(appData string) error) { + +} var nopConnSingleton = nopConn{} diff --git a/network/wsPeer.go b/network/wsPeer.go index 75e7a9633..c661629aa 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -90,6 +90,8 @@ type wsPeerWebsocketConn interface { WriteControl(int, []byte, time.Time) error SetReadLimit(int64) CloseWithoutFlush() error + SetPingHandler(h func(appData string) error) + SetPongHandler(h func(appData string) error) } type sendMessage struct { @@ -137,7 +139,7 @@ type wsPeer struct { // lastPacketTime contains the UnixNano at the last time a successful communication was made with the peer. // "successful communication" above refers to either reading from or writing to a connection without receiving any // error. - // we want this to be a 64-bit aligned for atomics. + // we want this to be a 64-bit aligned for atomics support on 32bit platforms. lastPacketTime int64 // intermittentOutgoingMessageEnqueueTime contains the UnixNano of the message's enqueue time that is currently being written to the @@ -173,11 +175,7 @@ type wsPeer struct { processed chan struct{} - pingLock deadlock.Mutex - pingSent time.Time - pingData []byte - pingInFlight bool - lastPingRoundTripTime time.Duration + latencyTracker latencyTracker // Hint about position in wn.peers. Definitely valid if the peer // is present in wn.peers. @@ -250,6 +248,7 @@ type UnicastPeer interface { Request(ctx context.Context, tag Tag, topics Topics) (resp *Response, e error) Respond(ctx context.Context, reqMsg IncomingMessage, topics Topics) (e error) IsOutgoing() bool + GetConnectionLatency() time.Duration } // Create a wsPeerCore object @@ -285,6 +284,11 @@ func (wp *wsPeer) IsOutgoing() bool { return wp.outgoing } +// GetConnectionLatency returns the connection latency between the local node and this peer. +func (wp *wsPeer) GetConnectionLatency() time.Duration { + return wp.latencyTracker.getConnectionLatency() +} + // Unicast sends the given bytes to this specific peer. Does not wait for message to be sent. // (Implements UnicastPeer) func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback UnicastWebsocketMessageStateCallback) error { @@ -378,6 +382,13 @@ func (wp *wsPeer) init(config config.Local, sendBufferLength int) { wp.sendMessageTag = txSendMsgTags } + wp.latencyTracker.init(wp.conn, config, time.Duration(0)) + // send a ping right away. + now := time.Now() + if err := wp.latencyTracker.checkPingSending(&now); err != nil { + wp.net.log.Infof("failed to send ping message to peer : %v", err) + } + wp.wg.Add(2) go wp.readLoop() go wp.writeLoop() @@ -446,6 +457,7 @@ func (wp *wsPeer) readLoop() { wp.reportReadErr(err) return } + wp.latencyTracker.increaseReceivedCounter() msg.processing = wp.processed msg.Received = time.Now().UnixNano() msg.Data = slurper.Bytes() @@ -639,7 +651,8 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { } // check if this message was waiting in the queue for too long. If this is the case, return "true" to indicate that we want to close the connection. - msgWaitDuration := time.Now().Sub(msg.enqueued) + now := time.Now() + msgWaitDuration := now.Sub(msg.enqueued) if msgWaitDuration > maxMessageQueueDuration { wp.net.log.Warnf("peer stale enqueued message %dms", msgWaitDuration.Nanoseconds()/1000000) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "stale message"}) @@ -651,6 +664,12 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { } return disconnectStaleWrite } + + // is it time to send a ping message ? + if err := wp.latencyTracker.checkPingSending(&now); err != nil { + wp.net.log.Infof("failed to send ping message to peer : %v", err) + } + atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, msg.enqueued.UnixNano()) defer atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, 0) err := wp.conn.WriteMessage(websocket.BinaryMessage, msg.data) @@ -772,42 +791,6 @@ func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio return false } -const pingLength = 8 -const maxPingWait = 60 * time.Second - -// sendPing sends a ping block to the peer. -// return true if either a ping request was enqueued or there is already ping request in flight in the past maxPingWait time. -func (wp *wsPeer) sendPing() bool { - wp.pingLock.Lock() - defer wp.pingLock.Unlock() - now := time.Now() - if wp.pingInFlight && (now.Sub(wp.pingSent) < maxPingWait) { - return true - } - - tagBytes := []byte(protocol.PingTag) - mbytes := make([]byte, len(tagBytes)+pingLength) - copy(mbytes, tagBytes) - crypto.RandBytes(mbytes[len(tagBytes):]) - wp.pingData = mbytes[len(tagBytes):] - sent := wp.writeNonBlock(context.Background(), mbytes, false, crypto.Digest{}, time.Now(), nil) // todo : we might want to use the callback function to figure a more precise sending time. - - if sent { - wp.pingInFlight = true - wp.pingSent = now - } - return sent -} - -// get some times out of the peer while observing the ping data lock -func (wp *wsPeer) pingTimes() (lastPingSent time.Time, lastPingRoundTripTime time.Duration) { - wp.pingLock.Lock() - defer wp.pingLock.Unlock() - lastPingSent = wp.pingSent - lastPingRoundTripTime = wp.lastPingRoundTripTime - return -} - // called when the connection had an error or closed remotely func (wp *wsPeer) internalClose(reason disconnectReason) { if atomic.CompareAndSwapInt32(&wp.didSignalClose, 0, 1) { diff --git a/node/txnSyncConn.go b/node/txnSyncConn.go index 1a88b7998..00f5607ea 100644 --- a/node/txnSyncConn.go +++ b/node/txnSyncConn.go @@ -156,6 +156,11 @@ func (tsnc *transactionSyncNodeConnector) SendPeerMessage(netPeer interface{}, m } } +func (tsnc *transactionSyncNodeConnector) GetPeerLatency(netPeer interface{}) time.Duration { + unicastPeer := netPeer.(network.UnicastPeer) + return unicastPeer.GetConnectionLatency() +} + // GetPendingTransactionGroups is called by the transaction sync when it needs to look into the transaction // pool and get the updated set of pending transactions. The second returned argument is the latest locally originated // group counter within the given transaction groups list. If there is no group that is locally originated, the expected @@ -215,7 +220,7 @@ func (tsnc *transactionSyncNodeConnector) Handle(raw network.IncomingMessage) ne peer = peerData.(*txnsync.Peer) } - err := tsnc.messageHandler(raw.Sender, peer, raw.Data, raw.Sequence) + err := tsnc.messageHandler(raw.Sender, peer, raw.Data, raw.Sequence, raw.Received) if err != nil { return network.OutgoingMessage{ Action: network.Disconnect, diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go index 5b16ae87b..38d07aba1 100644 --- a/rpcs/blockService_test.go +++ b/rpcs/blockService_test.go @@ -53,6 +53,11 @@ func (mup *mockUnicastPeer) Version() string { func (mup *mockUnicastPeer) IsOutgoing() bool { return false } + +// GetConnectionLatency returns the connection latency between the local node and this peer. +func (mup *mockUnicastPeer) GetConnectionLatency() time.Duration { + return time.Duration(0) +} func (mup *mockUnicastPeer) Request(ctx context.Context, tag network.Tag, topics network.Topics) (resp *network.Response, e error) { return nil, nil } diff --git a/test/testdata/configs/config-v18.json b/test/testdata/configs/config-v18.json new file mode 100644 index 000000000..b1a977e64 --- /dev/null +++ b/test/testdata/configs/config-v18.json @@ -0,0 +1,96 @@ +{ + "Version": 18, + "AccountUpdatesStatsInterval": 5000000000, + "AccountsRebuildSynchronousMode": 1, + "AnnounceParticipationKey": true, + "Archival": false, + "BaseLoggerDebugLevel": 4, + "BlockServiceCustomFallbackEndpoints": "", + "BroadcastConnectionsLimit": -1, + "CadaverSizeTarget": 1073741824, + "CatchpointFileHistoryLength": 365, + "CatchpointInterval": 10000, + "CatchpointTracking": 0, + "CatchupBlockDownloadRetryAttempts": 1000, + "CatchupBlockValidateMode": 0, + "CatchupFailurePeerRefreshRate": 10, + "CatchupGossipBlockFetchTimeoutSec": 4, + "CatchupHTTPBlockFetchTimeoutSec": 4, + "CatchupLedgerDownloadRetryAttempts": 50, + "CatchupParallelBlocks": 16, + "ConnectionsRateLimitingCount": 60, + "ConnectionsRateLimitingWindowSeconds": 1, + "DNSBootstrapID": "<network>.algorand.network", + "DNSSecurityFlags": 1, + "DeadlockDetection": 0, + "DisableLocalhostConnectionRateLimit": true, + "DisableNetworking": false, + "DisableOutgoingConnectionThrottling": false, + "EnableAccountUpdatesStats": false, + "EnableAgreementReporting": false, + "EnableAgreementTimeMetrics": false, + "EnableAssembleStats": false, + "EnableBlockService": false, + "EnableBlockServiceFallbackToArchiver": true, + "EnableCatchupFromArchiveServers": false, + "EnableDeveloperAPI": false, + "EnableGossipBlockService": true, + "EnableIncomingMessageFilter": false, + "EnableLedgerService": false, + "EnableMetricReporting": false, + "EnableOutgoingNetworkMessageFiltering": true, + "EnablePingHandler": true, + "EnableProcessBlockStats": false, + "EnableProfiler": false, + "EnableRequestLogger": false, + "EnableTopAccountsReporting": false, + "EnableVerbosedTransactionSyncLogging": false, + "EndpointAddress": "127.0.0.1:0", + "FallbackDNSResolverAddress": "", + "ForceFetchTransactions": false, + "ForceRelayMessages": false, + "GossipFanout": 4, + "IncomingConnectionsLimit": 800, + "IncomingMessageFilterBucketCount": 5, + "IncomingMessageFilterBucketSize": 512, + "IsIndexerActive": false, + "LedgerSynchronousMode": 2, + "LogArchiveMaxAge": "", + "LogArchiveName": "node.archive.log", + "LogSizeLimit": 1073741824, + "MaxCatchpointDownloadDuration": 7200000000000, + "MaxConnectionsPerIP": 30, + "MinCatchpointFileDownloadBytesPerSecond": 20480, + "NetAddress": "", + "NetworkMessageTraceServer": "", + "NetworkProtocolVersion": "", + "NodeExporterListenAddress": ":9100", + "NodeExporterPath": "./node_exporter", + "OptimizeAccountsDatabaseOnStartup": false, + "OutgoingMessageFilterBucketCount": 3, + "OutgoingMessageFilterBucketSize": 128, + "ParticipationKeysRefreshInterval": 60000000000, + "PeerConnectionsUpdateInterval": 3600, + "PeerPingPeriodSeconds": 10, + "PriorityPeers": {}, + "PublicAddress": "", + "ReconnectTime": 60000000000, + "ReservedFDs": 256, + "RestReadTimeoutSeconds": 15, + "RestWriteTimeoutSeconds": 120, + "RunHosted": false, + "SuggestedFeeBlockHistory": 3, + "SuggestedFeeSlidingWindowSize": 50, + "TLSCertFile": "", + "TLSKeyFile": "", + "TelemetryToLog": true, + "TransactionSyncDataExchangeRate": 0, + "TransactionSyncSignificantMessageThreshold": 0, + "TxPoolExponentialIncreaseFactor": 2, + "TxPoolSize": 15000, + "TxSyncIntervalSeconds": 60, + "TxSyncServeResponseSize": 1000000, + "TxSyncTimeoutSeconds": 30, + "UseXForwardedForAddressField": "", + "VerifiedTranscationsCacheSize": 30000 +} diff --git a/txnsync/bloomFilter_test.go b/txnsync/bloomFilter_test.go index 71a0d4151..57a1635fc 100644 --- a/txnsync/bloomFilter_test.go +++ b/txnsync/bloomFilter_test.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "math/rand" "testing" + "time" "github.com/stretchr/testify/require" @@ -357,6 +358,11 @@ func (fn *justRandomFakeNode) UpdatePeers(txsyncPeers []*Peer, netPeers []interf } func (fn *justRandomFakeNode) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) { } + +func (fn *justRandomFakeNode) GetPeerLatency(netPeer interface{}) time.Duration { + return 0 +} + func (fn *justRandomFakeNode) GetPendingTransactionGroups() (txGroups []pooldata.SignedTxGroup, latestLocallyOriginatedGroupCounter uint64) { return } diff --git a/txnsync/emulatorNode_test.go b/txnsync/emulatorNode_test.go index 27bb75115..378f1622f 100644 --- a/txnsync/emulatorNode_test.go +++ b/txnsync/emulatorNode_test.go @@ -278,6 +278,10 @@ func (n *emulatedNode) SendPeerMessage(netPeer interface{}, msg []byte, callback peer.outSeq++ } +func (n *emulatedNode) GetPeerLatency(netPeer interface{}) time.Duration { + return 0 +} + func (n *emulatedNode) GetPendingTransactionGroups() ([]pooldata.SignedTxGroup, uint64) { return n.txpoolEntries, n.latestLocallyOriginatedGroupCounter } @@ -356,7 +360,7 @@ func (n *emulatedNode) step() { peer.mu.Unlock() - msgHandler(peer, peer.peer, msgBytes, msgInSeq) + msgHandler(peer, peer.peer, msgBytes, msgInSeq, 0) n.unblock() n.waitBlocked() peer.mu.Lock() diff --git a/txnsync/incoming.go b/txnsync/incoming.go index d6365a6de..83ac416f1 100644 --- a/txnsync/incoming.go +++ b/txnsync/incoming.go @@ -20,8 +20,6 @@ import ( "errors" "time" - "github.com/algorand/go-deadlock" - "github.com/algorand/go-algorand/data/pooldata" ) @@ -40,75 +38,12 @@ type incomingMessage struct { encodedSize int // the byte length of the incoming network message bloomFilter *testableBloomFilter transactionGroups []pooldata.SignedTxGroup -} - -// incomingMessageQueue manages the global incoming message queue across all the incoming peers. -type incomingMessageQueue struct { - incomingMessages chan incomingMessage - enqueuedPeers map[*Peer]struct{} - enqueuedPeersMu deadlock.Mutex -} - -// maxPeersCount defines the maximum number of supported peers that can have their messages waiting -// in the incoming message queue at the same time. This number can be lower then the actual number of -// connected peers, as it's used only for pending messages. -const maxPeersCount = 1024 - -// makeIncomingMessageQueue creates an incomingMessageQueue object and initializes all the internal variables. -func makeIncomingMessageQueue() incomingMessageQueue { - return incomingMessageQueue{ - incomingMessages: make(chan incomingMessage, maxPeersCount), - enqueuedPeers: make(map[*Peer]struct{}, maxPeersCount), - } -} - -// getIncomingMessageChannel returns the incoming messages channel, which would contain entries once -// we have one ( or more ) pending incoming messages. -func (imq *incomingMessageQueue) getIncomingMessageChannel() <-chan incomingMessage { - return imq.incomingMessages -} - -// enqueue places the given message on the queue, if and only if it's associated peer doesn't -// appear on the incoming message queue already. In the case there is no peer, the message -// would be placed on the queue as is. -// The method returns false if the incoming message doesn't have it's peer on the queue and -// the method has failed to place the message on the queue. True is returned otherwise. -func (imq *incomingMessageQueue) enqueue(m incomingMessage) bool { - if m.peer != nil { - imq.enqueuedPeersMu.Lock() - defer imq.enqueuedPeersMu.Unlock() - if _, has := imq.enqueuedPeers[m.peer]; has { - return true - } - } - select { - case imq.incomingMessages <- m: - // if we successfully enqueued the message, set the enqueuedPeers so that we won't enqueue the same peer twice. - if m.peer != nil { - // at this time, the enqueuedPeersMu is still under lock ( due to the above defer ), so we can access - // the enqueuedPeers here. - imq.enqueuedPeers[m.peer] = struct{}{} - } - return true - default: - return false - } -} - -// clear removes the peer that is associated with the message ( if any ) from -// the enqueuedPeers map, allowing future messages from this peer to be placed on the -// incoming message queue. -func (imq *incomingMessageQueue) clear(m incomingMessage) { - if m.peer != nil { - imq.enqueuedPeersMu.Lock() - defer imq.enqueuedPeersMu.Unlock() - delete(imq.enqueuedPeers, m.peer) - } + timeReceived int64 } // incomingMessageHandler // note - this message is called by the network go-routine dispatch pool, and is not synchronized with the rest of the transaction synchronizer -func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) (err error) { +func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64, receivedTimestamp int64) (err error) { // increase number of incoming messages metric. txsyncIncomingMessagesTotal.Inc(nil) @@ -120,17 +55,19 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P } }() - incomingMessage := incomingMessage{networkPeer: networkPeer, sequenceNumber: sequenceNumber, encodedSize: len(message), peer: peer} + incomingMessage := incomingMessage{networkPeer: networkPeer, sequenceNumber: sequenceNumber, encodedSize: len(message), peer: peer, timeReceived: receivedTimestamp} _, err = incomingMessage.message.UnmarshalMsg(message) if err != nil { // if we received a message that we cannot parse, disconnect. s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer.") + s.incomingMessagesQ.erase(peer, networkPeer) return err } if incomingMessage.message.Version != txnBlockMessageVersion { // we receive a message from a version that we don't support, disconnect. s.log.Infof("received unsupported transaction sync message version from peer (%d). disconnecting from peer.", incomingMessage.message.Version) + s.incomingMessagesQ.erase(peer, networkPeer) return errUnsupportedTransactionSyncMessageVersion } @@ -139,6 +76,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P bloomFilter, err := decodeBloomFilter(incomingMessage.message.TxnBloomFilter) if err != nil { s.log.Infof("Invalid bloom filter received from peer : %v", err) + s.incomingMessagesQ.erase(peer, networkPeer) return errInvalidBloomFilter } incomingMessage.bloomFilter = bloomFilter @@ -150,6 +88,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P incomingMessage.transactionGroups, err = decodeTransactionGroups(incomingMessage.message.TransactionGroups, s.genesisID, s.genesisHash) if err != nil { s.log.Infof("failed to decode received transactions groups: %v\n", err) + s.incomingMessagesQ.erase(peer, networkPeer) return errDecodingReceivedTransactionGroupsFailed } @@ -158,10 +97,21 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P // all the peer objects are created synchronously. enqueued := s.incomingMessagesQ.enqueue(incomingMessage) if !enqueued { - // if we can't enqueue that, return an error, which would disconnect the peer. - // ( we have to disconnect, since otherwise, we would have no way to synchronize the sequence number) - s.log.Infof("unable to enqueue incoming message from a peer without txsync allocated data; incoming messages queue is full. disconnecting from peer.") - return errTransactionSyncIncomingMessageQueueFull + // if we failed to enqueue, it means that the queue is full. Try to remove disconnected + // peers from the queue before re-attempting. + peers := s.node.GetPeers() + if s.incomingMessagesQ.prunePeers(peers) { + // if we were successful in removing at least a single peer, then try to add the entry again. + enqueued = s.incomingMessagesQ.enqueue(incomingMessage) + } + if !enqueued { + // if we can't enqueue that, return an error, which would disconnect the peer. + // ( we have to disconnect, since otherwise, we would have no way to synchronize the sequence number) + s.log.Infof("unable to enqueue incoming message from a peer without txsync allocated data; incoming messages queue is full. disconnecting from peer.") + s.incomingMessagesQ.erase(peer, networkPeer) + return errTransactionSyncIncomingMessageQueueFull + } + } return nil } @@ -170,15 +120,26 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P if err != nil { // if the incoming message queue for this peer is full, disconnect from this peer. s.log.Infof("unable to enqueue incoming message into peer incoming message backlog. disconnecting from peer.") + s.incomingMessagesQ.erase(peer, networkPeer) return err } // (maybe) place the peer message on the main queue. This would get skipped if the peer is already on the queue. enqueued := s.incomingMessagesQ.enqueue(incomingMessage) if !enqueued { - // if we can't enqueue that, return an error, which would disconnect the peer. - s.log.Infof("unable to enqueue incoming message from a peer with txsync allocated data; incoming messages queue is full. disconnecting from peer.") - return errTransactionSyncIncomingMessageQueueFull + // if we failed to enqueue, it means that the queue is full. Try to remove disconnected + // peers from the queue before re-attempting. + peers := s.node.GetPeers() + if s.incomingMessagesQ.prunePeers(peers) { + // if we were successful in removing at least a single peer, then try to add the entry again. + enqueued = s.incomingMessagesQ.enqueue(incomingMessage) + } + if !enqueued { + // if we can't enqueue that, return an error, which would disconnect the peer. + s.log.Infof("unable to enqueue incoming message from a peer with txsync allocated data; incoming messages queue is full. disconnecting from peer.") + s.incomingMessagesQ.erase(peer, networkPeer) + return errTransactionSyncIncomingMessageQueueFull + } } return nil } @@ -194,7 +155,7 @@ func (s *syncState) evaluateIncomingMessage(message incomingMessage) { } if peerInfo.TxnSyncPeer == nil { // we couldn't really do much about this message previously, since we didn't have the peer. - peer = makePeer(message.networkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log) + peer = makePeer(message.networkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log, s.node.GetPeerLatency(message.networkPeer)) // let the network peer object know about our peer s.node.UpdatePeers([]*Peer{peer}, []interface{}{message.networkPeer}, 0) } else { @@ -207,9 +168,7 @@ func (s *syncState) evaluateIncomingMessage(message incomingMessage) { return } } - // clear the peer that is associated with this incoming message from the message queue, allowing future - // messages from the peer to be placed on the message queue. - s.incomingMessagesQ.clear(message) + messageProcessed := false transactionPoolSize := 0 totalAccumulatedTransactionsCount := 0 // the number of transactions that were added during the execution of this method @@ -249,7 +208,11 @@ incomingMessageLoop: } peer.updateRequestParams(incomingMsg.message.UpdatedRequestParams.Modulator, incomingMsg.message.UpdatedRequestParams.Offset) - peer.updateIncomingMessageTiming(incomingMsg.message.MsgSync, s.round, s.clock.Since(), incomingMsg.encodedSize) + timeInQueue := time.Duration(0) + if incomingMsg.timeReceived > 0 { + timeInQueue = time.Since(time.Unix(0, incomingMsg.timeReceived)) + } + peer.updateIncomingMessageTiming(incomingMsg.message.MsgSync, s.round, s.clock.Since(), timeInQueue, peer.cachedLatency, incomingMsg.encodedSize) // if the peer's round is more than a single round behind the local node, then we don't want to // try and load the transactions. The other peer should first catch up before getting transactions. diff --git a/txnsync/incomingMsgQ.go b/txnsync/incomingMsgQ.go new file mode 100644 index 000000000..aecb673bd --- /dev/null +++ b/txnsync/incomingMsgQ.go @@ -0,0 +1,372 @@ +// Copyright (C) 2019-2021 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package txnsync + +import ( + "sync" + + "github.com/algorand/go-deadlock" +) + +// queuedMsgEntry used as a helper struct to manage the manipulation of incoming +// message queue. +type queuedMsgEntry struct { + msg incomingMessage + next *queuedMsgEntry + prev *queuedMsgEntry +} + +type queuedMsgList struct { + head *queuedMsgEntry +} + +// incomingMessageQueue manages the global incoming message queue across all the incoming peers. +type incomingMessageQueue struct { + outboundPeerCh chan incomingMessage + enqueuedPeersMap map[*Peer]*queuedMsgEntry + messages queuedMsgList + freelist queuedMsgList + enqueuedPeersMu deadlock.Mutex + enqueuedPeersCond *sync.Cond + shutdownRequest chan struct{} + shutdownConfirmed chan struct{} + deletePeersCh chan interface{} + peerlessCount int +} + +// maxPeersCount defines the maximum number of supported peers that can have their messages waiting +// in the incoming message queue at the same time. This number can be lower then the actual number of +// connected peers, as it's used only for pending messages. +const maxPeersCount = 2048 + +// maxPeerlessCount is the number of messages that we've received that doesn't have a Peer object allocated +// for them ( yet ) +const maxPeerlessCount = 512 + +// makeIncomingMessageQueue creates an incomingMessageQueue object and initializes all the internal variables. +func makeIncomingMessageQueue() *incomingMessageQueue { + imq := &incomingMessageQueue{ + outboundPeerCh: make(chan incomingMessage), + enqueuedPeersMap: make(map[*Peer]*queuedMsgEntry, maxPeersCount), + shutdownRequest: make(chan struct{}, 1), + shutdownConfirmed: make(chan struct{}, 1), + deletePeersCh: make(chan interface{}), + } + imq.enqueuedPeersCond = sync.NewCond(&imq.enqueuedPeersMu) + imq.freelist.initialize(maxPeersCount) + go imq.messagePump() + return imq +} + +// dequeueHead removes the first head message from the linked list. +func (ml *queuedMsgList) dequeueHead() (out *queuedMsgEntry) { + if ml.head == nil { + return nil + } + entry := ml.head + out = entry + if entry.next == entry { + ml.head = nil + return + } + entry.next.prev = entry.prev + entry.prev.next = entry.next + ml.head = entry.next + out.next = out + out.prev = out + return +} + +// initialize initializes a list to have msgCount entries. +func (ml *queuedMsgList) initialize(msgCount int) { + msgs := make([]queuedMsgEntry, msgCount) + for i := 0; i < msgCount; i++ { + msgs[i].next = &msgs[(i+1)%msgCount] + msgs[i].prev = &msgs[(i+msgCount-1)%msgCount] + } + ml.head = &msgs[0] +} + +// empty methods tests to see if the linked list is empty +func (ml *queuedMsgList) empty() bool { + return ml.head == nil +} + +// remove removes the given msg from the linked list. The method +// is written with the assumption that the given msg is known to be +// part of the linked list. +func (ml *queuedMsgList) remove(msg *queuedMsgEntry) { + if msg.next == msg { + ml.head = nil + return + } + msg.prev.next = msg.next + msg.next.prev = msg.prev + if ml.head == msg { + ml.head = msg.next + } + msg.prev = msg + msg.next = msg +} + +// filterRemove removes zero or more messages from the linked list, for which the given +// removeFunc returns true. The removed linked list entries are returned as a linked list. +func (ml *queuedMsgList) filterRemove(removeFunc func(*queuedMsgEntry) bool) *queuedMsgEntry { + if ml.empty() { + return nil + } + // do we have a single item ? + if ml.head.next == ml.head { + if removeFunc(ml.head) { + out := ml.head + ml.head = nil + return out + } + return nil + } + current := ml.head + last := ml.head.prev + var letGo queuedMsgList + for { + next := current.next + if removeFunc(current) { + ml.remove(current) + letGo.enqueueTail(current) + } + if current == last { + break + } + current = next + } + return letGo.head +} + +// enqueueTail adds to the current linked list another linked list whose head is msg. +func (ml *queuedMsgList) enqueueTail(msg *queuedMsgEntry) { + if ml.head == nil { + ml.head = msg + return + } else if msg == nil { + return + } + lastEntryOld := ml.head.prev + lastEntryNew := msg.prev + lastEntryOld.next = msg + ml.head.prev = lastEntryNew + msg.prev = lastEntryOld + lastEntryNew.next = ml.head +} + +// shutdown signals to the message pump to shut down and waits until the message pump goroutine +// aborts. +func (imq *incomingMessageQueue) shutdown() { + imq.enqueuedPeersMu.Lock() + close(imq.shutdownRequest) + imq.enqueuedPeersCond.Signal() + imq.enqueuedPeersMu.Unlock() + <-imq.shutdownConfirmed +} + +// messagePump is the incoming message queue message pump. It takes messages from the messages list +// and attempt to write these to the outboundPeerCh. +func (imq *incomingMessageQueue) messagePump() { + defer close(imq.shutdownConfirmed) + imq.enqueuedPeersMu.Lock() + defer imq.enqueuedPeersMu.Unlock() + + for { + // check if we need to shutdown. + select { + case <-imq.shutdownRequest: + return + default: + } + + // do we have any item to enqueue ? + if !imq.messages.empty() { + msgEntry := imq.messages.dequeueHead() + msg := msgEntry.msg + imq.freelist.enqueueTail(msgEntry) + if msg.peer != nil { + delete(imq.enqueuedPeersMap, msg.peer) + } else { + imq.peerlessCount-- + } + imq.enqueuedPeersMu.Unlock() + writeOutboundMessage: + select { + case imq.outboundPeerCh <- msg: + imq.enqueuedPeersMu.Lock() + continue + case <-imq.shutdownRequest: + imq.enqueuedPeersMu.Lock() + return + // see if this msg need to be delivered or not. + case droppedPeer := <-imq.deletePeersCh: + if msg.networkPeer == droppedPeer { + // we want to skip this message. + imq.enqueuedPeersMu.Lock() + continue + } + goto writeOutboundMessage + } + } + imq.enqueuedPeersCond.Wait() + } +} + +// getIncomingMessageChannel returns the incoming messages channel, which would contain entries once +// we have one ( or more ) pending incoming messages. +func (imq *incomingMessageQueue) getIncomingMessageChannel() <-chan incomingMessage { + return imq.outboundPeerCh +} + +// enqueue places the given message on the queue, if and only if it's associated peer doesn't +// appear on the incoming message queue already. In the case there is no peer, the message +// would be placed on the queue as is. +// The method returns false if the incoming message doesn't have it's peer on the queue and +// the method has failed to place the message on the queue. True is returned otherwise. +func (imq *incomingMessageQueue) enqueue(m incomingMessage) bool { + imq.enqueuedPeersMu.Lock() + defer imq.enqueuedPeersMu.Unlock() + if m.peer != nil { + if _, has := imq.enqueuedPeersMap[m.peer]; has { + return true + } + } else { + // do we have enough "room" for peerless messages ? + if imq.peerlessCount >= maxPeerlessCount { + return false + } + } + // do we have enough room in the message queue for the new message ? + if imq.freelist.empty() { + // no - we don't have enough room in the circular buffer. + return false + } + freeMsgEntry := imq.freelist.dequeueHead() + freeMsgEntry.msg = m + imq.messages.enqueueTail(freeMsgEntry) + // if we successfully enqueued the message, set the enqueuedPeersMap so that we won't enqueue the same peer twice. + if m.peer != nil { + imq.enqueuedPeersMap[m.peer] = freeMsgEntry + } else { + imq.peerlessCount++ + } + imq.enqueuedPeersCond.Signal() + return true +} + +// erase removes all the entries associated with the given network peer. +// this method isn't very efficient, and should be used only in cases where +// we disconnect from a peer and want to cleanup all the pending tasks associated +// with that peer. +func (imq *incomingMessageQueue) erase(peer *Peer, networkPeer interface{}) { + imq.enqueuedPeersMu.Lock() + + var peerMsgEntry *queuedMsgEntry + if peer == nil { + // lookup for a Peer object. + for peer, peerMsgEntry = range imq.enqueuedPeersMap { + if peer.networkPeer != networkPeer { + continue + } + break + } + } else { + var has bool + if peerMsgEntry, has = imq.enqueuedPeersMap[peer]; !has { + // the peer object is not in the map. + peer = nil + } + } + + if peer != nil { + delete(imq.enqueuedPeersMap, peer) + imq.messages.remove(peerMsgEntry) + imq.freelist.enqueueTail(peerMsgEntry) + imq.enqueuedPeersMu.Unlock() + select { + case imq.deletePeersCh <- networkPeer: + default: + } + return + } + + imq.removeMessageByNetworkPeer(networkPeer) + imq.enqueuedPeersMu.Unlock() + select { + case imq.deletePeersCh <- networkPeer: + default: + } +} + +// removeMessageByNetworkPeer removes the messages associated with the given network peer from the +// queue. +// note : the method expect that the enqueuedPeersMu lock would be taken. +func (imq *incomingMessageQueue) removeMessageByNetworkPeer(networkPeer interface{}) { + peerlessCount := 0 + removeByNetworkPeer := func(msg *queuedMsgEntry) bool { + if msg.msg.networkPeer == networkPeer { + if msg.msg.peer == nil { + peerlessCount++ + } + return true + } + return false + } + removeList := imq.messages.filterRemove(removeByNetworkPeer) + imq.freelist.enqueueTail(removeList) + imq.peerlessCount -= peerlessCount +} + +// prunePeers removes from the enqueuedMessages queue all the entries that are not provided in the +// given activePeers slice. +func (imq *incomingMessageQueue) prunePeers(activePeers []PeerInfo) (peerRemoved bool) { + activePeersMap := make(map[*Peer]bool) + activeNetworkPeersMap := make(map[interface{}]bool) + for _, activePeer := range activePeers { + if activePeer.TxnSyncPeer != nil { + activePeersMap[activePeer.TxnSyncPeer] = true + } + if activePeer.NetworkPeer != nil { + activeNetworkPeersMap[activePeer.NetworkPeer] = true + } + } + imq.enqueuedPeersMu.Lock() + defer imq.enqueuedPeersMu.Unlock() + peerlessCount := 0 + isPeerMissing := func(msg *queuedMsgEntry) bool { + if msg.msg.peer != nil { + if !activePeersMap[msg.msg.peer] { + return true + } + } + if !activeNetworkPeersMap[msg.msg.networkPeer] { + if msg.msg.peer == nil { + peerlessCount++ + } + return true + } + return false + } + removeList := imq.messages.filterRemove(isPeerMissing) + peerRemoved = removeList != nil + imq.freelist.enqueueTail(removeList) + imq.peerlessCount -= peerlessCount + return +} diff --git a/txnsync/incomingMsgQ_test.go b/txnsync/incomingMsgQ_test.go new file mode 100644 index 000000000..9ff1adf4b --- /dev/null +++ b/txnsync/incomingMsgQ_test.go @@ -0,0 +1,158 @@ +// Copyright (C) 2019-2021 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package txnsync + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/test/partitiontest" +) + +// fillMessageQueue fills the message queue with the given message. +func (imq *incomingMessageQueue) fillMessageQueue(msg incomingMessage) { + imq.enqueuedPeersMu.Lock() + for i := 0; i < maxPeersCount; i++ { + msgEntry := imq.freelist.dequeueHead() + msgEntry.msg = msg + imq.messages.enqueueTail(msgEntry) + } + if msg.peer == nil { + imq.peerlessCount += maxPeersCount + } + imq.enqueuedPeersCond.Signal() + imq.enqueuedPeersMu.Unlock() + + // wait for a single message to be consumed by the message pump. + for { + imq.enqueuedPeersMu.Lock() + if !imq.freelist.empty() { + break + } + imq.enqueuedPeersMu.Unlock() + time.Sleep(time.Millisecond) + } + for !imq.freelist.empty() { + msgEntry := imq.freelist.dequeueHead() + msgEntry.msg = msg + imq.messages.enqueueTail(msgEntry) + } + imq.enqueuedPeersCond.Signal() + imq.enqueuedPeersMu.Unlock() +} + +// count counts teh number of messages in the list +func (ml *queuedMsgList) count() int { + first := ml.head + cur := first + count := 0 + for cur != nil { + next := cur.next + if next == first { + next = nil + } + count++ + cur = next + } + return count +} + +// validateLinking test to see the the entries in the list are correctly connected. +func (ml *queuedMsgList) validateLinking(t *testing.T) { + cur := ml.head + if cur == nil { + return + } + seen := make(map[*queuedMsgEntry]bool) + list := make([]*queuedMsgEntry, 0) + for { + if seen[cur] { + break + } + seen[cur] = true + require.NotNil(t, cur.prev) + require.NotNil(t, cur.next) + list = append(list, cur) + cur = cur.next + } + for i := range list { + require.Equal(t, list[i], list[(i+len(list)-1)%len(list)].next) + require.Equal(t, list[i], list[(i+1)%len(list)].prev) + } +} + +// TestMsgQCounts tests the message queue add/remove manipulations +func TestMsgQCounts(t *testing.T) { + partitiontest.PartitionTest(t) + + var list queuedMsgList + list.initialize(7) + list.validateLinking(t) + require.Equal(t, 7, list.count()) + list.dequeueHead() + list.validateLinking(t) + require.Equal(t, 6, list.count()) + var anotherList queuedMsgList + anotherList.initialize(4) + require.Equal(t, 4, anotherList.count()) + list.enqueueTail(anotherList.head) + list.validateLinking(t) + require.Equal(t, 10, list.count()) +} + +// TestMsgQFiltering tests the message queue filtering +func TestMsgQFiltering(t *testing.T) { + partitiontest.PartitionTest(t) + + item1 := &queuedMsgEntry{} + item2 := &queuedMsgEntry{} + item3 := &queuedMsgEntry{} + item1.next = item1 + item1.prev = item1 + item2.next = item2 + item2.prev = item2 + item3.next = item3 + item3.prev = item3 + + var list queuedMsgList + list.enqueueTail(item1) + list.enqueueTail(item2) + list.enqueueTail(item3) + + // test removing head. + removedItem1 := list.filterRemove(func(msg *queuedMsgEntry) bool { + return msg == item1 + }) + require.Equal(t, item1, removedItem1) + require.Equal(t, 2, list.count()) + + // test removing tail + removedItem3 := list.filterRemove(func(msg *queuedMsgEntry) bool { + return msg == item3 + }) + require.Equal(t, item3, removedItem3) + require.Equal(t, 1, list.count()) + + // test removing last item + removedItem2 := list.filterRemove(func(msg *queuedMsgEntry) bool { + return msg == item2 + }) + require.Equal(t, item2, removedItem2) + require.True(t, list.empty()) +} diff --git a/txnsync/incoming_test.go b/txnsync/incoming_test.go index 62782ff84..978a1c4ca 100644 --- a/txnsync/incoming_test.go +++ b/txnsync/incoming_test.go @@ -58,27 +58,29 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) { cfg := config.GetDefaultLocal() mNodeConnector := &mockNodeConnector{transactionPoolSize: 3} s := syncState{ - log: wrapLogger(&incLogger, &cfg), - node: mNodeConnector, + log: wrapLogger(&incLogger, &cfg), + node: mNodeConnector, + clock: mNodeConnector.Clock(), + incomingMessagesQ: makeIncomingMessageQueue(), } // expect UnmarshalMsg error messageBytes[0] = 0 - err := s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber) + err := s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0) msgpe := msgp.TypeError{} require.True(t, errors.As(err, &msgpe)) // expect wrong version error message = transactionBlockMessage{Version: -3} messageBytes = message.MarshalMsg(nil) - err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber) + err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0) require.Equal(t, errUnsupportedTransactionSyncMessageVersion, err) // expect error decoding bloomFilter message.Version = 1 message.TxnBloomFilter.BloomFilterType = byte(multiHashBloomFilter) messageBytes = message.MarshalMsg(nil) - err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber) + err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0) require.Equal(t, errInvalidBloomFilter, err) // error decoding transaction groups @@ -89,35 +91,46 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) { require.NoError(t, err) message.TransactionGroups = packedTransactionGroups{Bytes: []byte{1}} messageBytes = message.MarshalMsg(nil) - err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber) + err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0) require.Equal(t, errDecodingReceivedTransactionGroupsFailed, err) + s.incomingMessagesQ.shutdown() + + peer := Peer{networkPeer: &s} // error queue full message.TransactionGroups = packedTransactionGroups{} messageBytes = message.MarshalMsg(nil) - err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber) + s.incomingMessagesQ = makeIncomingMessageQueue() + s.incomingMessagesQ.fillMessageQueue(incomingMessage{peer: &peer, networkPeer: &s.incomingMessagesQ}) + mNodeConnector.peers = append(mNodeConnector.peers, PeerInfo{TxnSyncPeer: &peer, NetworkPeer: &s.incomingMessagesQ}) + err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0) require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err) + s.incomingMessagesQ.shutdown() // Success where peer == nil s.incomingMessagesQ = makeIncomingMessageQueue() - err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber) + err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0) require.NoError(t, err) - - peer := Peer{} + s.incomingMessagesQ.shutdown() // error when placing the peer message on the main queue (incomingMessages cannot accept messages) - s.incomingMessagesQ = incomingMessageQueue{} - err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber) + s.incomingMessagesQ = makeIncomingMessageQueue() + s.incomingMessagesQ.fillMessageQueue(incomingMessage{peer: nil, networkPeer: &s}) + mNodeConnector.peers = append(mNodeConnector.peers, PeerInfo{NetworkPeer: &s}) + + err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber, 0) require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err) + s.incomingMessagesQ.shutdown() s.incomingMessagesQ = makeIncomingMessageQueue() err = nil // fill up the incoming message queue (one was already added) for x := 1; x <= messageOrderingHeapLimit; x++ { require.NoError(t, err) - err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber) + err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber, 0) } require.Equal(t, errHeapReachedCapacity, err) + s.incomingMessagesQ.shutdown() } func TestEvaluateIncomingMessagePart1(t *testing.T) { @@ -150,28 +163,29 @@ func TestEvaluateIncomingMessagePart1(t *testing.T) { mNodeConnector.updatingPeers = false s.incomingMessagesQ = makeIncomingMessageQueue() - // Add a peer here, and make sure it is cleared - s.incomingMessagesQ.enqueuedPeers[peer] = struct{}{} + defer s.incomingMessagesQ.shutdown() + message.peer = peer + require.True(t, s.incomingMessagesQ.enqueue(message)) mNodeConnector.peerInfo.TxnSyncPeer = peer peer.incomingMessages = messageOrderingHeap{} // TxnSyncPeer in peerInfo s.evaluateIncomingMessage(message) require.False(t, mNodeConnector.updatingPeers) - _, found := s.incomingMessagesQ.enqueuedPeers[peer] + <-s.incomingMessagesQ.getIncomingMessageChannel() + _, found := s.incomingMessagesQ.enqueuedPeersMap[peer] require.False(t, found) - // fill the hip with messageOrderingHeapLimit elements so that the incomingMessages enqueue fails + // fill the heap with messageOrderingHeapLimit elements so that the incomingMessages enqueue fails + message.networkPeer = &s + message.peer = nil for x := 0; x < messageOrderingHeapLimit; x++ { err := peer.incomingMessages.enqueue(message) require.NoError(t, err) } - // Add a peer here, and make sure it is not cleared after the error - s.incomingMessagesQ.enqueuedPeers[peer] = struct{}{} + mNodeConnector.peers = []PeerInfo{{TxnSyncPeer: peer, NetworkPeer: &s}} // TxnSyncPeer in peerInfo s.evaluateIncomingMessage(message) require.False(t, mNodeConnector.updatingPeers) - _, found = s.incomingMessagesQ.enqueuedPeers[peer] - require.True(t, found) } func TestEvaluateIncomingMessagePart2(t *testing.T) { diff --git a/txnsync/interfaces.go b/txnsync/interfaces.go index 976bdbc00..77ac07163 100644 --- a/txnsync/interfaces.go +++ b/txnsync/interfaces.go @@ -17,6 +17,8 @@ package txnsync import ( + "time" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/pooldata" "github.com/algorand/go-algorand/util/timers" @@ -46,7 +48,7 @@ type Event struct { } // IncomingMessageHandler is the signature of the incoming message handler used by the transaction sync to receive network messages -type IncomingMessageHandler func(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) error +type IncomingMessageHandler func(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64, receivedTimestamp int64) error // SendMessageCallback define a message sent feedback for performing message tracking type SendMessageCallback func(enqueued bool, sequenceNumber uint64) error @@ -79,6 +81,7 @@ type NodeConnector interface { // across all the connected peers. UpdatePeers(txsyncPeers []*Peer, netPeers []interface{}, peersAverageDataExchangeRate uint64) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) + GetPeerLatency(netPeer interface{}) time.Duration // GetPendingTransactionGroups is called by the transaction sync when it needs to look into the transaction // pool and get the updated set of pending transactions. The second returned argument is the latest locally originated // group counter within the given transaction groups list. If there is no group that is locally originated, the expected diff --git a/txnsync/mainloop.go b/txnsync/mainloop.go index ca899bc9c..138206be0 100644 --- a/txnsync/mainloop.go +++ b/txnsync/mainloop.go @@ -70,7 +70,7 @@ type syncState struct { scheduler peerScheduler interruptablePeers []*Peer interruptablePeersMap map[*Peer]int // map a peer into the index of interruptablePeers - incomingMessagesQ incomingMessageQueue + incomingMessagesQ *incomingMessageQueue outgoingMessagesCallbackCh chan sentMessageMetadata nextOffsetRollingCh <-chan time.Time requestsOffset uint64 @@ -105,6 +105,7 @@ func (s *syncState) mainloop(serviceCtx context.Context, wg *sync.WaitGroup) { s.clock = s.node.Clock() s.incomingMessagesQ = makeIncomingMessageQueue() + defer s.incomingMessagesQ.shutdown() s.outgoingMessagesCallbackCh = make(chan sentMessageMetadata, 1024) s.interruptablePeersMap = make(map[*Peer]int) s.scheduler.node = s.node @@ -308,6 +309,7 @@ func (s *syncState) onNewRoundEvent(ent Event) { if !s.isRelay { s.nextOffsetRollingCh = s.clock.TimeoutAt(kickoffTime + 2*s.lastBeta) } + s.updatePeersLatency(peers) s.updatePeersRequestParams(peers) } @@ -395,7 +397,7 @@ func (s *syncState) getPeers() (result []*Peer) { // some of the network peers might not have a sync peer, so we need to create one for these. for _, peerInfo := range peersInfo { if peerInfo.TxnSyncPeer == nil { - syncPeer := makePeer(peerInfo.NetworkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log) + syncPeer := makePeer(peerInfo.NetworkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log, s.node.GetPeerLatency(peerInfo.NetworkPeer)) peerInfo.TxnSyncPeer = syncPeer updatedNetworkPeers = append(updatedNetworkPeers, peerInfo.NetworkPeer) updatedNetworkPeersSync = append(updatedNetworkPeersSync, syncPeer) @@ -435,3 +437,9 @@ func (s *syncState) updatePeersRequestParams(peers []*Peer) { } } } + +func (s *syncState) updatePeersLatency(peers []*Peer) { + for _, peer := range peers { + peer.cachedLatency = s.node.GetPeerLatency(peer.networkPeer) + } +} diff --git a/txnsync/outgoing.go b/txnsync/outgoing.go index 98697f311..fa18bd353 100644 --- a/txnsync/outgoing.go +++ b/txnsync/outgoing.go @@ -51,27 +51,35 @@ type sentMessageMetadata struct { // could be a lengthy operation which does't need to be blocking the main loop. Moving the actual encoding into an // execution pool thread frees up the main loop, allowing smoother operation. type messageAsyncEncoder struct { - state *syncState - messageData sentMessageMetadata - roundClock timers.WallClock - peerDataExchangeRate uint64 + state *syncState + messageData sentMessageMetadata + roundClock timers.WallClock + lastReceivedMessageTimestamp time.Duration + peerDataExchangeRate uint64 + // sentMessagesCh is a copy of the outgoingMessagesCallbackCh in the syncState object. We want to create a copy of + // the channel so that in case of a txnsync restart ( i.e. fast catchup ), we can still generate a new channel + // without triggering a data race. The alternative is to block the txnsync.Shutdown() until we receive the feedback + // from the network library, but that could be susceptible to undesired network disconnections. + sentMessagesCh chan sentMessageMetadata } // asyncMessageSent called via the network package to inform the txsync that a message was enqueued, and the associated sequence number. func (encoder *messageAsyncEncoder) asyncMessageSent(enqueued bool, sequenceNumber uint64) error { if !enqueued { encoder.state.log.Infof("unable to send message to peer. disconnecting from peer.") + encoder.state.incomingMessagesQ.erase(encoder.messageData.peer, encoder.messageData.peer.networkPeer) return errTransactionSyncOutgoingMessageSendFailed } // record the sequence number here, so that we can store that later on. encoder.messageData.sequenceNumber = sequenceNumber select { - case encoder.state.outgoingMessagesCallbackCh <- encoder.messageData: + case encoder.sentMessagesCh <- encoder.messageData: return nil default: // if we can't place it on the channel, return an error so that the node could disconnect from this peer. encoder.state.log.Infof("unable to enqueue outgoing message confirmation; outgoingMessagesCallbackCh is full. disconnecting from peer.") + encoder.state.incomingMessagesQ.erase(encoder.messageData.peer, encoder.messageData.peer.networkPeer) return errTransactionSyncOutgoingMessageQueueFull } } @@ -89,6 +97,12 @@ func (encoder *messageAsyncEncoder) asyncEncodeAndSend(interface{}) interface{} encoder.messageData.transactionGroups = nil // clear out to allow GC to reclaim } + if encoder.lastReceivedMessageTimestamp >= 0 { + // adding a nanosecond to the elapsed time is meaningless for the data rate calculation, but would ensure that + // the ResponseElapsedTime field has a clear distinction between "being set" vs. "not being set" + encoder.messageData.message.MsgSync.ResponseElapsedTime = uint64((encoder.roundClock.Since() - encoder.lastReceivedMessageTimestamp).Nanoseconds()) + } + encodedMessage := encoder.messageData.message.MarshalMsg(getMessageBuffer()) encoder.messageData.encodedMessageSize = len(encodedMessage) // now that the message is ready, we can discard the encoded transaction group slice to allow the GC to collect it. @@ -136,9 +150,9 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D pendingTransactions.pendingTransactionsGroups, pendingTransactions.latestLocallyOriginatedGroupCounter = s.node.GetPendingTransactionGroups() profGetTxnsGroups.end() for _, peer := range peers { - msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate} + msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate, sentMessagesCh: s.outgoingMessagesCallbackCh} profAssembleMessage.start() - msgEncoder.messageData, assembledBloomFilter = s.assemblePeerMessage(peer, &pendingTransactions) + msgEncoder.messageData, assembledBloomFilter, msgEncoder.lastReceivedMessageTimestamp = s.assemblePeerMessage(peer, &pendingTransactions) profAssembleMessage.end() isPartialMessage := msgEncoder.messageData.partialMessage // The message that we've just encoded is expected to be sent out with the next sequence number. @@ -178,7 +192,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D } } -func (s *syncState) assemblePeerMessage(peer *Peer, pendingTransactions *pendingTransactionGroupsSnapshot) (metaMessage sentMessageMetadata, assembledBloomFilter bloomFilter) { +func (s *syncState) assemblePeerMessage(peer *Peer, pendingTransactions *pendingTransactionGroupsSnapshot) (metaMessage sentMessageMetadata, assembledBloomFilter bloomFilter, lastReceivedMessageTimestamp time.Duration) { metaMessage = sentMessageMetadata{ peer: peer, message: &transactionBlockMessage{ @@ -276,10 +290,10 @@ notxns: } metaMessage.message.MsgSync.RefTxnBlockMsgSeq = peer.nextReceivedMessageSeq - 1 + // signify that timestamp is not set + lastReceivedMessageTimestamp = time.Duration(-1) if peer.lastReceivedMessageTimestamp != 0 && peer.lastReceivedMessageLocalRound == s.round { - // adding a nanosecond to the elapsed time is meaningless for the data rate calculation, but would ensure that - // the ResponseElapsedTime field has a clear distinction between "being set" vs. "not being set" - metaMessage.message.MsgSync.ResponseElapsedTime = uint64((s.clock.Since() - peer.lastReceivedMessageTimestamp).Nanoseconds()) + 1 + lastReceivedMessageTimestamp = peer.lastReceivedMessageTimestamp // reset the lastReceivedMessageTimestamp so that we won't be using that again on a subsequent outgoing message. peer.lastReceivedMessageTimestamp = 0 } diff --git a/txnsync/outgoing_test.go b/txnsync/outgoing_test.go index 151c716b2..3136161ef 100644 --- a/txnsync/outgoing_test.go +++ b/txnsync/outgoing_test.go @@ -62,6 +62,8 @@ func TestAsyncMessageSent(t *testing.T) { var s syncState s.clock = timers.MakeMonotonicClock(time.Now()) s.log = mockAsyncLogger{} + s.incomingMessagesQ = makeIncomingMessageQueue() + defer s.incomingMessagesQ.shutdown() asyncEncoder := messageAsyncEncoder{ state: &s, @@ -72,7 +74,8 @@ func TestAsyncMessageSent(t *testing.T) { }, peer: &Peer{}, }, - roundClock: timers.MakeMonotonicClock(time.Now()), + roundClock: timers.MakeMonotonicClock(time.Now()), + sentMessagesCh: s.outgoingMessagesCallbackCh, } oldTimestamp := asyncEncoder.messageData.sentTimestamp @@ -83,11 +86,11 @@ func TestAsyncMessageSent(t *testing.T) { a.Equal(asyncEncoder.messageData.sequenceNumber, uint64(1337)) // Make this buffered for now so we catch the select statement - asyncEncoder.state.outgoingMessagesCallbackCh = make(chan sentMessageMetadata, 1) + asyncEncoder.sentMessagesCh = make(chan sentMessageMetadata, 1) err = asyncEncoder.asyncMessageSent(true, 1337) a.Nil(err) - a.Equal(1, len(asyncEncoder.state.outgoingMessagesCallbackCh)) + a.Equal(1, len(asyncEncoder.sentMessagesCh)) } type mockAsyncNodeConnector struct { @@ -286,14 +289,14 @@ func TestAssemblePeerMessage_messageConstBloomFilter(t *testing.T) { peer.isOutgoing = true peer.state = peerStateLateBloom - metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions) + metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions) a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222)) a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111)) a.Equal(metaMessage.peer, &peer) a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion)) a.Equal(metaMessage.message.Round, s.round) - a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0) + a.True(responseTime >= 0) a.Equal(s.lastBloomFilter, expectedFilter) } @@ -329,14 +332,14 @@ func TestAssemblePeerMessage_messageConstBloomFilterNonRelay(t *testing.T) { peer.isOutgoing = true peer.state = peerStateLateBloom - metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions) + metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions) a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222)) a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111)) a.Equal(metaMessage.peer, &peer) a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion)) a.Equal(metaMessage.message.Round, s.round) - a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0) + a.True(responseTime >= 0) a.NotEqual(s.lastBloomFilter, expectedFilter) } @@ -361,14 +364,14 @@ func TestAssemblePeerMessage_messageConstNextMinDelay_messageConstUpdateRequestP s.isRelay = true s.lastBeta = 123 * time.Nanosecond - metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions) + metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions) a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222)) a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111)) a.Equal(metaMessage.peer, &peer) a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion)) a.Equal(metaMessage.message.Round, s.round) - a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0) + a.True(responseTime >= 0) a.Equal(metaMessage.message.MsgSync.NextMsgMinDelay, uint64(s.lastBeta.Nanoseconds())*2) } @@ -405,7 +408,7 @@ func TestAssemblePeerMessage_messageConstTransactions(t *testing.T) { peer.isOutgoing = true peer.state = peerStateHoldsoff - metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions) + metaMessage, _, _ := s.assemblePeerMessage(&peer, &pendingTransactions) a.Equal(len(metaMessage.transactionGroups), 1) a.True(reflect.DeepEqual(metaMessage.transactionGroups[0], pendingTransactions.pendingTransactionsGroups[0])) diff --git a/txnsync/peer.go b/txnsync/peer.go index 1aa04c574..b33ed0ed1 100644 --- a/txnsync/peer.go +++ b/txnsync/peer.go @@ -157,6 +157,8 @@ type Peer struct { // dataExchangeRate is the combined upload/download rate in bytes/second dataExchangeRate uint64 + // cachedLatency is the measured network latency of a peer, updated every round + cachedLatency time.Duration // these two fields describe "what does the local peer want the remote peer to send back" localTransactionsModulator byte @@ -265,12 +267,13 @@ func (t *transactionGroupCounterTracker) index(offset, modulator byte) int { return -1 } -func makePeer(networkPeer interface{}, isOutgoing bool, isLocalNodeRelay bool, cfg *config.Local, log Logger) *Peer { +func makePeer(networkPeer interface{}, isOutgoing bool, isLocalNodeRelay bool, cfg *config.Local, log Logger, latency time.Duration) *Peer { p := &Peer{ networkPeer: networkPeer, isOutgoing: isOutgoing, recentSentTransactions: makeTransactionCache(shortTermRecentTransactionsSentBufferLength, longTermRecentTransactionsSentBufferLength, pendingUnconfirmedRemoteMessages), dataExchangeRate: defaultDataExchangeRate, + cachedLatency: latency, transactionPoolAckCh: make(chan uint64, maxAcceptedMsgSeq), transactionPoolAckMessages: make([]uint64, 0, maxAcceptedMsgSeq), significantMessageThreshold: defaultSignificantMessageThreshold, @@ -582,25 +585,25 @@ func (p *Peer) updateIncomingTransactionGroups(txnGroups []pooldata.SignedTxGrou } } -func (p *Peer) updateIncomingMessageTiming(timings timingParams, currentRound basics.Round, currentTime time.Duration, incomingMessageSize int) { +func (p *Peer) updateIncomingMessageTiming(timings timingParams, currentRound basics.Round, currentTime time.Duration, timeInQueue time.Duration, peerLatency time.Duration, incomingMessageSize int) { p.lastConfirmedMessageSeqReceived = timings.RefTxnBlockMsgSeq // if we received a message that references our previous message, see if they occurred on the same round if p.lastConfirmedMessageSeqReceived == p.lastSentMessageSequenceNumber && p.lastSentMessageRound == currentRound && p.lastSentMessageTimestamp > 0 { // if so, we might be able to calculate the bandwidth. - timeSinceLastMessageWasSent := currentTime - p.lastSentMessageTimestamp + timeSinceLastMessageWasSent := currentTime - timeInQueue - p.lastSentMessageTimestamp networkMessageSize := uint64(p.lastSentMessageSize + incomingMessageSize) - if timings.ResponseElapsedTime != 0 && timeSinceLastMessageWasSent > time.Duration(timings.ResponseElapsedTime) && networkMessageSize >= p.significantMessageThreshold { - networkTrasmitTime := timeSinceLastMessageWasSent - time.Duration(timings.ResponseElapsedTime) + if timings.ResponseElapsedTime != 0 && peerLatency > 0 && timeSinceLastMessageWasSent > time.Duration(timings.ResponseElapsedTime)+peerLatency && networkMessageSize >= p.significantMessageThreshold { + networkTrasmitTime := timeSinceLastMessageWasSent - time.Duration(timings.ResponseElapsedTime) - peerLatency dataExchangeRate := uint64(time.Second) * networkMessageSize / uint64(networkTrasmitTime) + // clamp data exchange rate to realistic metrics if dataExchangeRate < minDataExchangeRateThreshold { dataExchangeRate = minDataExchangeRateThreshold } else if dataExchangeRate > maxDataExchangeRateThreshold { dataExchangeRate = maxDataExchangeRateThreshold } - // clamp data exchange rate to realistic metrics - p.dataExchangeRate = dataExchangeRate // fmt.Printf("incoming message : updating data exchange to %d; network msg size = %d+%d, transmit time = %v\n", dataExchangeRate, p.lastSentMessageSize, incomingMessageSize, networkTrasmitTime) + p.dataExchangeRate = dataExchangeRate } // given that we've (maybe) updated the data exchange rate, we need to clear out the lastSendMessage information @@ -611,7 +614,7 @@ func (p *Peer) updateIncomingMessageTiming(timings timingParams, currentRound ba p.lastSentMessageSize = 0 } p.lastReceivedMessageLocalRound = currentRound - p.lastReceivedMessageTimestamp = currentTime + p.lastReceivedMessageTimestamp = currentTime - timeInQueue p.lastReceivedMessageSize = incomingMessageSize p.lastReceivedMessageNextMsgMinDelay = time.Duration(timings.NextMsgMinDelay) * time.Nanosecond p.recentSentTransactions.acknowledge(timings.AcceptedMsgSeq) diff --git a/txnsync/peer_test.go b/txnsync/peer_test.go index cc45cd9a4..fb18e7d22 100644 --- a/txnsync/peer_test.go +++ b/txnsync/peer_test.go @@ -267,7 +267,7 @@ func TestGetNextScheduleOffset(t *testing.T) { for i, test := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) if test.fxn != nil { test.fxn(p) } @@ -398,7 +398,7 @@ func TestGetMessageConstructionOps(t *testing.T) { log := wrapLogger(tlog, &config) for i, test := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) if test.fxn != nil { test.fxn(p) } @@ -527,7 +527,7 @@ func TestAdvancePeerState(t *testing.T) { log := wrapLogger(tlog, &config) for i, test := range tests { t.Run(string(rune(i)), func(t *testing.T) { - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) if test.fxn != nil { test.fxn(p) } @@ -554,7 +554,7 @@ func TestUpdateIncomingMessageTiming(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) currentRound := basics.Round(1) currentTime := time.Millisecond * 123 @@ -565,7 +565,7 @@ func TestUpdateIncomingMessageTiming(t *testing.T) { p.lastConfirmedMessageSeqReceived = p.lastSentMessageSequenceNumber + 1 - p.updateIncomingMessageTiming(timing, currentRound, currentTime, currentMessageSize) + p.updateIncomingMessageTiming(timing, currentRound, currentTime, 0, time.Millisecond, currentMessageSize) a.Equal(p.lastReceivedMessageLocalRound, currentRound) a.Equal(p.lastReceivedMessageTimestamp, currentTime) @@ -579,7 +579,7 @@ func TestUpdateIncomingMessageTiming(t *testing.T) { timing.ResponseElapsedTime = 1 p.lastSentMessageTimestamp = 1 * time.Millisecond currentMessageSize = maxDataExchangeRateThreshold + 1 - p.updateIncomingMessageTiming(timing, currentRound, currentTime, currentMessageSize) + p.updateIncomingMessageTiming(timing, currentRound, currentTime, 0, time.Millisecond, currentMessageSize) a.Equal(uint64(maxDataExchangeRateThreshold), p.dataExchangeRate) @@ -590,9 +590,20 @@ func TestUpdateIncomingMessageTiming(t *testing.T) { p.lastSentMessageSize = 0 currentMessageSize = int(p.significantMessageThreshold) currentTime = time.Millisecond * 1000 - p.updateIncomingMessageTiming(timing, currentRound, currentTime, currentMessageSize) + p.updateIncomingMessageTiming(timing, currentRound, currentTime, 0, time.Millisecond, currentMessageSize) a.Equal(uint64(minDataExchangeRateThreshold), p.dataExchangeRate) + + p.lastConfirmedMessageSeqReceived = p.lastSentMessageSequenceNumber + p.lastSentMessageRound = currentRound + timing.ResponseElapsedTime = uint64(time.Millisecond) + p.lastSentMessageTimestamp = 1 * time.Millisecond + p.lastSentMessageSize = 0 + currentMessageSize = 100000 + currentTime = time.Millisecond * 123 + p.updateIncomingMessageTiming(timing, currentRound, currentTime, time.Millisecond, time.Millisecond*100, currentMessageSize) + + a.Equal(uint64(5000000), p.dataExchangeRate) } // TestUpdateIncomingTransactionGroups tests updating the incoming transaction groups @@ -624,7 +635,7 @@ func TestUpdateIncomingTransactionGroups(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) p.recentSentTransactions.reset() @@ -643,7 +654,7 @@ func TestUpdateRequestParams(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) oldModulator := p.requestedTransactionsModulator oldOffset := p.requestedTransactionsOffset @@ -679,7 +690,7 @@ func TestAddIncomingBloomFilter(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) for i := 0; i < 2*maxIncomingBloomFilterHistory; i++ { bf := &testableBloomFilter{ @@ -743,7 +754,7 @@ func TestSelectPendingTransactions(t *testing.T) { log := wrapLogger(tlog, &config) for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) if test.fxn != nil { test.fxn(p) } @@ -810,7 +821,7 @@ func TestGetAcceptedMessages(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) var testList []uint64 chPtr := &p.transactionPoolAckCh @@ -835,7 +846,7 @@ func TestDequeuePendingTransactionPoolAckMessages(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) ch := p.transactionPoolAckCh var testList []uint64 @@ -880,7 +891,7 @@ func TestUpdateMessageSent(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) txMsg := &transactionBlockMessage{ Version: txnBlockMessageVersion, @@ -917,10 +928,10 @@ func TestIncomingPeersOnly(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p1 := makePeer(nil, true, true, &config, log) - p2 := makePeer(nil, true, false, &config, log) - p3 := makePeer(nil, false, true, &config, log) - p4 := makePeer(nil, false, false, &config, log) + p1 := makePeer(nil, true, true, &config, log, 0) + p2 := makePeer(nil, true, false, &config, log, 0) + p3 := makePeer(nil, false, true, &config, log, 0) + p4 := makePeer(nil, false, false, &config, log, 0) peers := []*Peer{p1, p2, p3, p4} @@ -939,7 +950,7 @@ func TestLocalRequestParams(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(nil, true, true, &config, log) + p := makePeer(nil, true, true, &config, log, 0) p.setLocalRequestParams(256, 256) offset, modulator := p.getLocalRequestParams() @@ -962,7 +973,7 @@ func TestSimpleGetters(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p := makePeer(sentinelInterface, true, true, &config, log) + p := makePeer(sentinelInterface, true, true, &config, log, 0) a.Equal(p.GetNetworkPeer(), sentinelInterface) a.Equal(p.GetTransactionPoolAckChannel(), p.transactionPoolAckCh) @@ -978,7 +989,7 @@ func TestMakePeer(t *testing.T) { config := config.GetDefaultLocal() tlog := logging.TestingLog(t) log := wrapLogger(tlog, &config) - p1 := makePeer(sentinelInterface, true, true, &config, log) + p1 := makePeer(sentinelInterface, true, true, &config, log, 0) a.NotNil(p1) a.Equal(p1.networkPeer, sentinelInterface) @@ -988,7 +999,7 @@ func TestMakePeer(t *testing.T) { a.Equal(p1.dataExchangeRate, uint64(defaultRelayToRelayDataExchangeRate)) // Check that we have different values if the local node relay is false - p2 := makePeer(sentinelInterface, true, false, &config, log) + p2 := makePeer(sentinelInterface, true, false, &config, log, 0) a.NotNil(p2) a.Equal(p1.networkPeer, sentinelInterface) diff --git a/txnsync/service_test.go b/txnsync/service_test.go index 182f83539..2b262fc18 100644 --- a/txnsync/service_test.go +++ b/txnsync/service_test.go @@ -43,6 +43,7 @@ type mockNodeConnector struct { peerInfo PeerInfo updatingPeers bool transactionPoolSize int + peers []PeerInfo } func makeMockNodeConnector(calledEvents *bool) mockNodeConnector { @@ -69,7 +70,7 @@ func (fn *mockNodeConnector) Random(rng uint64) uint64 { return rv % rng } -func (fn *mockNodeConnector) GetPeers() []PeerInfo { return nil } +func (fn *mockNodeConnector) GetPeers() []PeerInfo { return fn.peers } func (fn *mockNodeConnector) GetPeer(interface{}) (out PeerInfo) { return fn.peerInfo @@ -80,6 +81,11 @@ func (fn *mockNodeConnector) UpdatePeers(txsyncPeers []*Peer, netPeers []interfa } func (fn *mockNodeConnector) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) { } + +func (fn *mockNodeConnector) GetPeerLatency(netPeer interface{}) time.Duration { + return 0 +} + func (fn *mockNodeConnector) GetPendingTransactionGroups() (txGroups []pooldata.SignedTxGroup, latestLocallyOriginatedGroupCounter uint64) { return } |