summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>2023-12-07 15:51:02 -0500
committerGitHub <noreply@github.com>2023-12-07 15:51:02 -0500
commit9229066ea56b1f8713385b905006e8ba09265cef (patch)
tree1d0c54d8a54b3c38ec917e07cc2681538e8dabea
parentebd35930d5865c6688e371149e10f3393c622948 (diff)
network: fixes to public address support (#5851)
* Remove http.Request.RemoteAddr overwriting in request tracker * Remove http.Request from request tracker * Add a new remoteAddresss() method providing most meaningful address for incoming requests
-rw-r--r--network/requestTracker.go78
-rw-r--r--network/requestTracker_test.go26
-rw-r--r--network/wsNetwork.go26
-rw-r--r--network/wsNetwork_test.go151
-rw-r--r--network/wsPeer.go1
5 files changed, 185 insertions, 97 deletions
diff --git a/network/requestTracker.go b/network/requestTracker.go
index 6445b9349..63fd4a72b 100644
--- a/network/requestTracker.go
+++ b/network/requestTracker.go
@@ -40,15 +40,27 @@ const (
)
// TrackerRequest hold the tracking data associated with a single request.
+// It supposed by an upstream http.Handler called before the wsNetwork's ServeHTTP
+// and wsNetwork's Listener (see Accept() method)
type TrackerRequest struct {
- created time.Time
- remoteHost string
- remotePort string
- remoteAddr string
- request *http.Request
+ created time.Time
+ // remoteHost is IP address of the remote host and it is equal to either
+ // a host part of the remoteAddr or to the value of X-Forwarded-For header (UseXForwardedForAddressField config value).
+ remoteHost string
+ // remotePort is the port of the remote peer as reported by the connection or
+ // by the standard http.Request.RemoteAddr field.
+ remotePort string
+ // remoteAddr is IP:Port of the remote host retrieved from the connection
+ // or from the standard http.Request.RemoteAddr field.
+ // This field is the real address of the remote incoming connection.
+ remoteAddr string
+ // otherPublicAddr is the public address of the other node, as reported by the other node
+ // via the X-Algorand-Location header.
+ // It is used for logging and as a rootURL for when creating a new wsPeer from a request.
+ otherPublicAddr string
+
otherTelemetryGUID string
otherInstanceName string
- otherPublicAddr string
connection net.Conn
noPrune bool
}
@@ -68,6 +80,43 @@ func makeTrackerRequest(remoteAddr, remoteHost, remotePort string, createTime ti
}
}
+// remoteAddress a best guessed remote address for the request.
+// Rational is the following:
+// remoteAddress() is used either for logging or as rootURL for creating a new wsPeer.
+// rootURL is an address to connect to. It is well defined only for peers from a phonebooks,
+// and for incoming peers the best guess is either otherPublicAddr, remoteHost, or remoteAddr.
+// - otherPublicAddr is provided by a remote peer by X-Algorand-Location header and cannot be trusted,
+// but can be used if remoteHost matches to otherPublicAddr value. In this case otherPublicAddr is a better guess
+// for a rootURL because it might include a port.
+// - remoteHost is either a real address of the remote peer or a value of X-Forwarded-For header.
+// Use it if remoteHost was taken from X-Forwarded-For header.
+// Note, the remoteHost does not include a port since a listening port is not known.
+// - remoteAddr is used otherwise.
+func (tr *TrackerRequest) remoteAddress() string {
+ if len(tr.otherPublicAddr) != 0 {
+ url, err := ParseHostOrURL(tr.otherPublicAddr)
+ if err == nil && len(tr.remoteHost) > 0 && url.Hostname() == tr.remoteHost {
+ return tr.otherPublicAddr
+ }
+ }
+ url, err := ParseHostOrURL(tr.remoteAddr)
+ if err != nil {
+ // tr.remoteAddr can't be parsed so try to use tr.remoteHost
+ // there is a chance it came from a proxy and has a meaningful value
+ if len(tr.remoteHost) != 0 {
+ return tr.remoteHost
+ }
+ // otherwise fallback to tr.remoteAddr
+ return tr.remoteAddr
+ }
+ if url.Hostname() != tr.remoteHost {
+ // if remoteAddr's host not equal to remoteHost then the remoteHost
+ // is definitely came from a proxy, use it
+ return tr.remoteHost
+ }
+ return tr.remoteAddr
+}
+
// hostIncomingRequests holds all the requests that are originating from a single host.
type hostIncomingRequests struct {
remoteHost string
@@ -142,7 +191,6 @@ func (ard *hostIncomingRequests) add(trackerRequest *TrackerRequest) {
}
// it's going to be added somewhere in the middle.
ard.requests = append(ard.requests[:itemIdx], append([]*TrackerRequest{trackerRequest}, ard.requests[itemIdx:]...)...)
- return
}
// countConnections counts the number of connection that we have that occurred after the provided specified time
@@ -372,7 +420,7 @@ func (rt *RequestTracker) sendBlockedConnectionResponse(conn net.Conn, requestTi
func (rt *RequestTracker) pruneAcceptedConnections(pruneStartDate time.Time) {
localAddrToRemove := []net.Addr{}
for localAddr, request := range rt.acceptedConnections {
- if request.noPrune == false && request.created.Before(pruneStartDate) {
+ if !request.noPrune && request.created.Before(pruneStartDate) {
localAddrToRemove = append(localAddrToRemove, localAddr)
}
}
@@ -397,7 +445,7 @@ func (rt *RequestTracker) getWaitUntilNoConnectionsChannel(checkInterval time.Du
return len(rt.httpConnections) == 0
}
- for true {
+ for {
if checkEmpty(rt) {
close(done)
return
@@ -449,7 +497,7 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.
trackedRequest := rt.acceptedConnections[localAddr]
if trackedRequest != nil {
// update the original tracker request so that it won't get pruned.
- if trackedRequest.noPrune == false {
+ if !trackedRequest.noPrune {
trackedRequest.noPrune = true
rt.hostRequests.convertToAdditionalRequest(trackedRequest)
}
@@ -464,10 +512,9 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.
}
// update the origin address.
- rt.updateRequestRemoteAddr(trackedRequest, request)
+ rt.remoteHostProxyFix(request.Header, trackedRequest)
rt.httpConnectionsMu.Lock()
- trackedRequest.request = request
trackedRequest.otherTelemetryGUID, trackedRequest.otherInstanceName, trackedRequest.otherPublicAddr = getCommonHeaders(request.Header)
rt.httpHostRequests.addRequest(trackedRequest)
rt.httpHostRequests.pruneRequests(rateLimitingWindowStartTime)
@@ -506,13 +553,12 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.
}
-// updateRequestRemoteAddr updates the origin IP address in both the trackedRequest as well as in the request.RemoteAddr string
-func (rt *RequestTracker) updateRequestRemoteAddr(trackedRequest *TrackerRequest, request *http.Request) {
- originIP := rt.getForwardedConnectionAddress(request.Header)
+// remoteHostProxyFix updates the origin IP address in the trackedRequest
+func (rt *RequestTracker) remoteHostProxyFix(header http.Header, trackedRequest *TrackerRequest) {
+ originIP := rt.getForwardedConnectionAddress(header)
if originIP == nil {
return
}
- request.RemoteAddr = originIP.String() + ":" + trackedRequest.remotePort
trackedRequest.remoteHost = originIP.String()
}
diff --git a/network/requestTracker_test.go b/network/requestTracker_test.go
index 194106914..65349987e 100644
--- a/network/requestTracker_test.go
+++ b/network/requestTracker_test.go
@@ -172,6 +172,32 @@ func TestRateLimiting(t *testing.T) {
}
}
+func TestRemoteAddress(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ tr := makeTrackerRequest("127.0.0.1:444", "", "", time.Now(), nil)
+ require.Equal(t, "127.0.0.1:444", tr.remoteAddr)
+ require.Equal(t, "127.0.0.1", tr.remoteHost)
+ require.Equal(t, "444", tr.remotePort)
+
+ require.Equal(t, "127.0.0.1:444", tr.remoteAddress())
+
+ // remoteHost set to something else via X-Forwared-For HTTP headers
+ tr.remoteHost = "10.0.0.1"
+ require.Equal(t, "10.0.0.1", tr.remoteAddress())
+
+ // otherPublicAddr is set via X-Algorand-Location HTTP header
+ // and matches to the remoteHost
+ tr.otherPublicAddr = "10.0.0.1:555"
+ require.Equal(t, "10.0.0.1:555", tr.remoteAddress())
+
+ // otherPublicAddr does not match remoteHost
+ tr.remoteHost = "127.0.0.1"
+ tr.otherPublicAddr = "127.0.0.99:555"
+ require.Equal(t, "127.0.0.1:444", tr.remoteAddress())
+}
+
func TestIsLocalHost(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()
diff --git a/network/wsNetwork.go b/network/wsNetwork.go
index 7f8b3046c..d316fcd81 100644
--- a/network/wsNetwork.go
+++ b/network/wsNetwork.go
@@ -377,9 +377,9 @@ func (wn *WebsocketNetwork) PublicAddress() string {
// If except is not nil then we will not send it to that neighboring Peer.
// if wait is true then the call blocks until the packet has actually been sent to all neighbors.
func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error {
- dataArray := make([][]byte, 1, 1)
+ dataArray := make([][]byte, 1)
dataArray[0] = data
- tagArray := make([]protocol.Tag, 1, 1)
+ tagArray := make([]protocol.Tag, 1)
tagArray[0] = tag
return wn.broadcaster.BroadcastArray(ctx, tagArray, dataArray, wait, except)
}
@@ -947,7 +947,7 @@ func (wn *WebsocketNetwork) checkProtocolVersionMatch(otherHeaders http.Header)
// checkIncomingConnectionVariables checks the variables that were provided on the request, and compares them to the
// local server supported parameters. If all good, it returns http.StatusOK; otherwise, it write the error to the ResponseWriter
// and returns the http status.
-func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.ResponseWriter, request *http.Request) int {
+func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.ResponseWriter, request *http.Request, remoteAddrForLogging string) int {
// check to see that the genesisID in the request URI is valid and matches the supported one.
pathVars := mux.Vars(request)
otherGenesisID, hasGenesisID := pathVars["genesisID"]
@@ -958,7 +958,7 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo
}
if wn.GenesisID != otherGenesisID {
- wn.log.Warn(filterASCII(fmt.Sprintf("new peer %#v genesis mismatch, mine=%#v theirs=%#v, headers %#v", request.RemoteAddr, wn.GenesisID, otherGenesisID, request.Header)))
+ wn.log.Warn(filterASCII(fmt.Sprintf("new peer %#v genesis mismatch, mine=%#v theirs=%#v, headers %#v", remoteAddrForLogging, wn.GenesisID, otherGenesisID, request.Header)))
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "mismatching genesis-id"})
response.WriteHeader(http.StatusPreconditionFailed)
n, err := response.Write([]byte("mismatching genesis ID"))
@@ -973,7 +973,7 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo
// This is pretty harmless and some configurations of phonebooks or DNS records make this likely. Quietly filter it out.
var message string
// missing header.
- wn.log.Warn(filterASCII(fmt.Sprintf("new peer %s did not include random ID header in request. mine=%s headers %#v", request.RemoteAddr, wn.RandomID, request.Header)))
+ wn.log.Warn(filterASCII(fmt.Sprintf("new peer %s did not include random ID header in request. mine=%s headers %#v", remoteAddrForLogging, wn.RandomID, request.Header)))
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "missing random ID header"})
message = fmt.Sprintf("Request was missing a %s header", NodeRandomHeader)
response.WriteHeader(http.StatusPreconditionFailed)
@@ -985,7 +985,7 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo
} else if otherRandom == wn.RandomID {
// This is pretty harmless and some configurations of phonebooks or DNS records make this likely. Quietly filter it out.
var message string
- wn.log.Debugf("new peer %s has same node random id, am I talking to myself? %s", request.RemoteAddr, wn.RandomID)
+ wn.log.Debugf("new peer %s has same node random id, am I talking to myself? %s", remoteAddrForLogging, wn.RandomID)
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "matching random ID header"})
message = fmt.Sprintf("Request included matching %s=%s header", NodeRandomHeader, otherRandom)
response.WriteHeader(http.StatusLoopDetected)
@@ -1025,7 +1025,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
matchingVersion, otherVersion := wn.checkProtocolVersionMatch(request.Header)
if matchingVersion == "" {
- wn.log.Info(filterASCII(fmt.Sprintf("new peer %s version mismatch, mine=%v theirs=%s, headers %#v", request.RemoteAddr, wn.supportedProtocolVersions, otherVersion, request.Header)))
+ wn.log.Info(filterASCII(fmt.Sprintf("new peer %s version mismatch, mine=%v theirs=%s, headers %#v", trackedRequest.remoteHost, wn.supportedProtocolVersions, otherVersion, request.Header)))
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "mismatching protocol version"})
response.WriteHeader(http.StatusPreconditionFailed)
message := fmt.Sprintf("Requested version %s not in %v mismatches server version", filterASCII(otherVersion), wn.supportedProtocolVersions)
@@ -1036,14 +1036,11 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
return
}
- if wn.checkIncomingConnectionVariables(response, request) != http.StatusOK {
+ if wn.checkIncomingConnectionVariables(response, request, trackedRequest.remoteAddress()) != http.StatusOK {
// we've already logged and written all response(s).
return
}
- // if UseXForwardedForAddressField is not empty, attempt to override the otherPublicAddr with the X Forwarded For origin
- trackedRequest.otherPublicAddr = trackedRequest.remoteAddr
-
responseHeader := make(http.Header)
wn.setHeaders(responseHeader)
responseHeader.Set(ProtocolVersionHeader, matchingVersion)
@@ -1063,7 +1060,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
peerIDChallenge, peerID, err = wn.identityScheme.VerifyRequestAndAttachResponse(responseHeader, request.Header)
if err != nil {
networkPeerIdentityError.Inc(nil)
- wn.log.With("err", err).With("remote", trackedRequest.otherPublicAddr).With("local", localAddr).Warnf("peer (%s) supplied an invalid identity challenge, abandoning peering", trackedRequest.otherPublicAddr)
+ wn.log.With("err", err).With("remote", trackedRequest.remoteAddress()).With("local", localAddr).Warnf("peer (%s) supplied an invalid identity challenge, abandoning peering", trackedRequest.remoteAddr)
return
}
}
@@ -1081,7 +1078,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
}
peer := &wsPeer{
- wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.otherPublicAddr, wn.GetRoundTripper(), trackedRequest.remoteHost),
+ wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.remoteAddress(), wn.GetRoundTripper(), trackedRequest.remoteHost),
conn: wsPeerWebsocketConnImpl{conn},
outgoing: false,
InstanceName: trackedRequest.otherInstanceName,
@@ -1097,7 +1094,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
peer.TelemetryGUID = trackedRequest.otherTelemetryGUID
peer.init(wn.config, wn.outgoingMessagesBufferSize)
wn.addPeer(peer)
- wn.log.With("event", "ConnectedIn").With("remote", trackedRequest.otherPublicAddr).With("local", localAddr).Infof("Accepted incoming connection from peer %s", trackedRequest.otherPublicAddr)
+ wn.log.With("event", "ConnectedIn").With("remote", trackedRequest.remoteAddress()).With("local", localAddr).Infof("Accepted incoming connection from peer %s", trackedRequest.remoteAddr)
wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.ConnectPeerEvent,
telemetryspec.PeerEventDetails{
Address: trackedRequest.remoteHost,
@@ -2047,6 +2044,7 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) {
}
}()
defer wn.wg.Done()
+
requestHeader := make(http.Header)
wn.setHeaders(requestHeader)
for _, supportedProtocolVersion := range wn.supportedProtocolVersions {
diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go
index 05e484843..f8eeeb71f 100644
--- a/network/wsNetwork_test.go
+++ b/network/wsNetwork_test.go
@@ -1332,8 +1332,6 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
addrA, ok := netA.Address()
require.True(t, ok)
- gossipA, err := netA.addrToGossipAddr(addrA)
- require.NoError(t, err)
addrB, ok := netB.Address()
require.True(t, ok)
@@ -1349,7 +1347,9 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
// let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return len(netA.GetPeers(PeersConnectedOut)) == 1
+ }, time.Second, 50*time.Millisecond)
}
// just one A->B connection
assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn)))
@@ -1362,17 +1362,16 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getInsertCount())
// netB has to wait for a final verification message over WS Handler, so pause a moment
- time.Sleep(250 * time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return netB.identityTracker.(*mockIdentityTracker).getSetCount() == 1
+ }, time.Second, 50*time.Millisecond)
+
assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount())
// bi-directional connection from B should not proceed
- if _, ok := netB.tryConnectReserveAddr(addrA); ok {
- netB.wg.Add(1)
- netB.tryConnect(addrA, gossipA)
- // let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
- }
+ _, ok = netB.tryConnectReserveAddr(addrA)
+ assert.False(t, ok)
// still just one A->B connection
assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn)))
@@ -1381,9 +1380,9 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedOut)))
// netA never attempts to set identity as it never sees a verified identity
assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getSetCount())
- // netB would attempt to add the identity to the tracker
- // but it would not end up being added
- assert.Equal(t, 2, netB.identityTracker.(*mockIdentityTracker).getSetCount())
+ // no connecton => netB does attepmt to add the identity to the tracker
+ // and it would not end up being added
+ assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount())
// Check deduplication again, this time from A
@@ -1391,15 +1390,19 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
// will prevent this connection from attempting in the first place
// in the real world, that isConnectedTo doesn't always trigger, if the hosts are behind
// a load balancer or other NAT
- if _, ok := netA.tryConnectReserveAddr(addrB); ok || true {
- netA.wg.Add(1)
- netA.tryConnect(addrB, gossipB)
- // let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
- }
+ _, ok = netA.tryConnectReserveAddr(addrB)
+ assert.False(t, ok)
+ netA.wg.Add(1)
+ old := networkPeerIdentityDisconnect.GetUint64Value()
+ netA.tryConnect(addrB, gossipB)
+ // let the tryConnect go forward
+ assert.Eventually(t, func() bool {
+ new := networkPeerIdentityDisconnect.GetUint64Value()
+ return new > old
+ }, time.Second, 50*time.Millisecond)
// netB never tries to add a new identity, since the connection gets abandoned before it is verified
- assert.Equal(t, 2, netB.identityTracker.(*mockIdentityTracker).getSetCount())
+ assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount())
// still just one A->B connection
assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn)))
@@ -1411,11 +1414,9 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
// the underlying connection is being closed. In this case, the read loop
// on the peer will detect and close the peer. Since this is asynchronous,
// we wait and check regularly to allow the connection to settle
- assert.Eventually(
- t,
- func() bool { return len(netB.GetPeers(PeersConnectedIn)) == 1 },
- 5*time.Second,
- 100*time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return len(netB.GetPeers(PeersConnectedIn)) == 1
+ }, time.Second, 50*time.Millisecond)
// Now have A connect to node C, which has the same PublicAddress as B (e.g., because it shares the
// same public load balancer endpoint). C will have a different identity keypair and so will not be
@@ -1432,13 +1433,15 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
require.True(t, ok)
gossipC, err := netC.addrToGossipAddr(addrC)
require.NoError(t, err)
- addrC = hostAndPort(addrC)
+ assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedOut)))
// A connects to C (but uses addrB here to simulate case where B & C have the same PublicAddress)
netA.wg.Add(1)
netA.tryConnect(addrB, gossipC)
// let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return len(netA.GetPeers(PeersConnectedOut)) == 2
+ }, time.Second, 50*time.Millisecond)
// A->B and A->C both open
assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn)))
@@ -1453,7 +1456,10 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
assert.Equal(t, 2, netA.identityTracker.(*mockIdentityTracker).getInsertCount())
// netC has to wait for a final verification message over WS Handler, so pause a moment
- time.Sleep(250 * time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return netC.identityTracker.(*mockIdentityTracker).getSetCount() == 1
+ }, time.Second, 50*time.Millisecond)
+
assert.Equal(t, 1, netC.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 1, netC.identityTracker.(*mockIdentityTracker).getInsertCount())
@@ -1481,8 +1487,6 @@ func TestPeeringSenderIdentityChallengeOnly(t *testing.T) {
addrA, ok := netA.Address()
require.True(t, ok)
- gossipA, err := netA.addrToGossipAddr(addrA)
- require.NoError(t, err)
addrB, ok := netB.Address()
require.True(t, ok)
@@ -1493,12 +1497,16 @@ func TestPeeringSenderIdentityChallengeOnly(t *testing.T) {
addrA = hostAndPort(addrA)
addrB = hostAndPort(addrB)
+ assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedOut)))
+ assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedIn)))
+
// first connection should work just fine
if _, ok := netA.tryConnectReserveAddr(addrB); ok {
netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
- // let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return len(netA.GetPeers(PeersConnectedOut)) == 1
+ }, time.Second, 50*time.Millisecond)
}
assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedOut)))
assert.Equal(t, 1, len(netB.GetPeers(PeersConnectedIn)))
@@ -1507,18 +1515,15 @@ func TestPeeringSenderIdentityChallengeOnly(t *testing.T) {
assert.Equal(t, 0, netA.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount())
- // bi-directional connection should also work
- if _, ok := netB.tryConnectReserveAddr(addrA); ok {
- netB.wg.Add(1)
- netB.tryConnect(addrA, gossipA)
- // let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
- }
- // the nodes are connected redundantly
- assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn)))
+ // bi-directional connection does not work because netA advertises its public address
+ _, ok = netB.tryConnectReserveAddr(addrA)
+ assert.False(t, ok)
+
+ // no redundant connections
+ assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn)))
assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedOut)))
assert.Equal(t, 1, len(netB.GetPeers(PeersConnectedIn)))
- assert.Equal(t, 1, len(netB.GetPeers(PeersConnectedOut)))
+ assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedOut)))
// confirm identity map was not added to for either host
assert.Equal(t, 0, netA.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount())
@@ -1558,12 +1563,15 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) {
addrA = hostAndPort(addrA)
addrB = hostAndPort(addrB)
+ assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedOut)))
// first connection should work just fine
if _, ok := netA.tryConnectReserveAddr(addrB); ok {
netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
// let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return len(netA.GetPeers(PeersConnectedOut)) == 1
+ }, time.Second, 50*time.Millisecond)
}
// single A->B connection
assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn)))
@@ -1580,7 +1588,9 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) {
netB.wg.Add(1)
netB.tryConnect(addrA, gossipA)
// let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return len(netB.GetPeers(PeersConnectedOut)) == 1
+ }, time.Second, 50*time.Millisecond)
}
assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn)))
assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedOut)))
@@ -1591,7 +1601,7 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) {
assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount())
}
-// TestPeeringIncorrectDeduplicationName confirm that if the reciever can't match
+// TestPeeringIncorrectDeduplicationName confirm that if the reciever can't match
// the Address in the challenge to its PublicAddress, identities aren't exchanged, but peering continues
func TestPeeringIncorrectDeduplicationName(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -1625,12 +1635,15 @@ func TestPeeringIncorrectDeduplicationName(t *testing.T) {
addrA = hostAndPort(addrA)
addrB = hostAndPort(addrB)
+ assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedOut)))
// first connection should work just fine
if _, ok := netA.tryConnectReserveAddr(addrB); ok {
netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
// let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
+ assert.Eventually(t, func() bool {
+ return len(netA.GetPeers(PeersConnectedOut)) == 1
+ }, time.Second, 50*time.Millisecond)
}
// single A->B connection
assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn)))
@@ -1643,14 +1656,18 @@ func TestPeeringIncorrectDeduplicationName(t *testing.T) {
assert.Equal(t, 0, netA.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount())
- // bi-directional connection should also work
+ // bi-directional connection would now work since netB detects to be connected to netA in tryConnectReserveAddr,
+ // so force it.
// this second connection should set identities, because the reciever address matches now
- if _, ok := netB.tryConnectReserveAddr(addrA); ok {
- netB.wg.Add(1)
- netB.tryConnect(addrA, gossipA)
- // let the tryConnect go forward
- time.Sleep(250 * time.Millisecond)
- }
+ _, ok = netB.tryConnectReserveAddr(addrA)
+ assert.False(t, ok)
+ netB.wg.Add(1)
+ netB.tryConnect(addrA, gossipA)
+ // let the tryConnect go forward
+ assert.Eventually(t, func() bool {
+ return len(netB.GetPeers(PeersConnectedOut)) == 1
+ }, time.Second, 50*time.Millisecond)
+
// confirm that at this point the identityTracker was called once per network
// and inserted once per network
assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getSetCount())
@@ -1982,14 +1999,13 @@ func TestPeeringWithBadIdentityVerification(t *testing.T) {
partitiontest.PartitionTest(t)
type testCase struct {
- name string
- verifyResponse func(t *testing.T, h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error)
- totalInA int
- totalOutA int
- totalInB int
- totalOutB int
- additionalSleep time.Duration
- occupied bool
+ name string
+ verifyResponse func(t *testing.T, h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error)
+ totalInA int
+ totalOutA int
+ totalInB int
+ totalOutB int
+ occupied bool
}
testCases := []testCase{
@@ -2602,7 +2618,7 @@ func TestSlowPeerDisconnection(t *testing.T) {
peers, _ = netA.peerSnapshot(peers)
if len(peers) == 0 || peers[0] != peer {
// make sure it took more than 1 second, and less than 5 seconds.
- waitTime := time.Now().Sub(beforeLoopTime)
+ waitTime := time.Since(beforeLoopTime)
require.LessOrEqual(t, int64(time.Second), int64(waitTime))
require.GreaterOrEqual(t, int64(5*time.Second), int64(waitTime))
break
@@ -2895,7 +2911,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
netB.config.EnablePingHandler = false
addrA, postListen := netA.Address()
require.True(t, postListen)
- t.Log(addrA)
+ t.Logf("netA %s", addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
// have netB asking netA to send it ft2, deregister ping handler to make sure that we aren't exceeding the maximum MOI messagesize
@@ -2905,6 +2921,8 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
netB.Start()
defer netStop(t, netB, "B")
+ addrB, _ := netB.Address()
+ t.Logf("netB %s", addrB)
incomingMsgSync := deadlock.Mutex{}
msgCounters := make(map[protocol.Tag]int)
@@ -3652,7 +3670,7 @@ func BenchmarkVariableTransactionMessageBlockSizes(t *testing.B) {
netB.Broadcast(context.Background(), protocol.TxnTag, dataBuffer, true, nil)
<-msgProcessed
}
- deltaTime := time.Now().Sub(startTime)
+ deltaTime := time.Since(startTime)
rate = float64(t.N) * float64(time.Second) / float64(deltaTime)
t.ReportMetric(rate, "txn/sec")
})
@@ -3794,7 +3812,6 @@ func TestWebsocketNetworkTelemetryTCP(t *testing.T) {
type mockServer struct {
*httptest.Server
URL string
- t *testing.T
waitForClientClose bool
}
@@ -3845,7 +3862,7 @@ func (t mockHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
wr.Close()
- for true {
+ for {
// echo a message back to the client
_, _, err := ws.NextReader()
if err != nil {
@@ -4002,7 +4019,7 @@ func TestDiscardUnrequestedBlockResponse(t *testing.T) {
require.Eventually(t, func() bool { return netA.NumPeers() == 1 }, 500*time.Millisecond, 25*time.Millisecond)
// send an unrequested block response
- msg := make([]sendMessage, 1, 1)
+ msg := make([]sendMessage, 1)
msg[0] = sendMessage{
data: append([]byte(protocol.TopicMsgRespTag), []byte("foo")...),
enqueued: time.Now(),
diff --git a/network/wsPeer.go b/network/wsPeer.go
index 4ed32539a..7cbdbeaeb 100644
--- a/network/wsPeer.go
+++ b/network/wsPeer.go
@@ -360,6 +360,7 @@ func makePeerCore(ctx context.Context, net GossipNode, log logging.Logger, readB
}
// GetAddress returns the root url to use to connect to this peer.
+// This implements HTTPPeer interface and used by external services to determine where to connect to.
// TODO: should GetAddress be added to Peer interface?
func (wp *wsPeerCore) GetAddress() string {
return wp.rootURL