diff options
author | Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> | 2023-12-07 15:51:02 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-07 15:51:02 -0500 |
commit | 9229066ea56b1f8713385b905006e8ba09265cef (patch) | |
tree | 1d0c54d8a54b3c38ec917e07cc2681538e8dabea | |
parent | ebd35930d5865c6688e371149e10f3393c622948 (diff) |
network: fixes to public address support (#5851)
* Remove http.Request.RemoteAddr overwriting in request tracker
* Remove http.Request from request tracker
* Add a new remoteAddresss() method providing most meaningful address for incoming requests
-rw-r--r-- | network/requestTracker.go | 78 | ||||
-rw-r--r-- | network/requestTracker_test.go | 26 | ||||
-rw-r--r-- | network/wsNetwork.go | 26 | ||||
-rw-r--r-- | network/wsNetwork_test.go | 151 | ||||
-rw-r--r-- | network/wsPeer.go | 1 |
5 files changed, 185 insertions, 97 deletions
diff --git a/network/requestTracker.go b/network/requestTracker.go index 6445b9349..63fd4a72b 100644 --- a/network/requestTracker.go +++ b/network/requestTracker.go @@ -40,15 +40,27 @@ const ( ) // TrackerRequest hold the tracking data associated with a single request. +// It supposed by an upstream http.Handler called before the wsNetwork's ServeHTTP +// and wsNetwork's Listener (see Accept() method) type TrackerRequest struct { - created time.Time - remoteHost string - remotePort string - remoteAddr string - request *http.Request + created time.Time + // remoteHost is IP address of the remote host and it is equal to either + // a host part of the remoteAddr or to the value of X-Forwarded-For header (UseXForwardedForAddressField config value). + remoteHost string + // remotePort is the port of the remote peer as reported by the connection or + // by the standard http.Request.RemoteAddr field. + remotePort string + // remoteAddr is IP:Port of the remote host retrieved from the connection + // or from the standard http.Request.RemoteAddr field. + // This field is the real address of the remote incoming connection. + remoteAddr string + // otherPublicAddr is the public address of the other node, as reported by the other node + // via the X-Algorand-Location header. + // It is used for logging and as a rootURL for when creating a new wsPeer from a request. + otherPublicAddr string + otherTelemetryGUID string otherInstanceName string - otherPublicAddr string connection net.Conn noPrune bool } @@ -68,6 +80,43 @@ func makeTrackerRequest(remoteAddr, remoteHost, remotePort string, createTime ti } } +// remoteAddress a best guessed remote address for the request. +// Rational is the following: +// remoteAddress() is used either for logging or as rootURL for creating a new wsPeer. +// rootURL is an address to connect to. It is well defined only for peers from a phonebooks, +// and for incoming peers the best guess is either otherPublicAddr, remoteHost, or remoteAddr. +// - otherPublicAddr is provided by a remote peer by X-Algorand-Location header and cannot be trusted, +// but can be used if remoteHost matches to otherPublicAddr value. In this case otherPublicAddr is a better guess +// for a rootURL because it might include a port. +// - remoteHost is either a real address of the remote peer or a value of X-Forwarded-For header. +// Use it if remoteHost was taken from X-Forwarded-For header. +// Note, the remoteHost does not include a port since a listening port is not known. +// - remoteAddr is used otherwise. +func (tr *TrackerRequest) remoteAddress() string { + if len(tr.otherPublicAddr) != 0 { + url, err := ParseHostOrURL(tr.otherPublicAddr) + if err == nil && len(tr.remoteHost) > 0 && url.Hostname() == tr.remoteHost { + return tr.otherPublicAddr + } + } + url, err := ParseHostOrURL(tr.remoteAddr) + if err != nil { + // tr.remoteAddr can't be parsed so try to use tr.remoteHost + // there is a chance it came from a proxy and has a meaningful value + if len(tr.remoteHost) != 0 { + return tr.remoteHost + } + // otherwise fallback to tr.remoteAddr + return tr.remoteAddr + } + if url.Hostname() != tr.remoteHost { + // if remoteAddr's host not equal to remoteHost then the remoteHost + // is definitely came from a proxy, use it + return tr.remoteHost + } + return tr.remoteAddr +} + // hostIncomingRequests holds all the requests that are originating from a single host. type hostIncomingRequests struct { remoteHost string @@ -142,7 +191,6 @@ func (ard *hostIncomingRequests) add(trackerRequest *TrackerRequest) { } // it's going to be added somewhere in the middle. ard.requests = append(ard.requests[:itemIdx], append([]*TrackerRequest{trackerRequest}, ard.requests[itemIdx:]...)...) - return } // countConnections counts the number of connection that we have that occurred after the provided specified time @@ -372,7 +420,7 @@ func (rt *RequestTracker) sendBlockedConnectionResponse(conn net.Conn, requestTi func (rt *RequestTracker) pruneAcceptedConnections(pruneStartDate time.Time) { localAddrToRemove := []net.Addr{} for localAddr, request := range rt.acceptedConnections { - if request.noPrune == false && request.created.Before(pruneStartDate) { + if !request.noPrune && request.created.Before(pruneStartDate) { localAddrToRemove = append(localAddrToRemove, localAddr) } } @@ -397,7 +445,7 @@ func (rt *RequestTracker) getWaitUntilNoConnectionsChannel(checkInterval time.Du return len(rt.httpConnections) == 0 } - for true { + for { if checkEmpty(rt) { close(done) return @@ -449,7 +497,7 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http. trackedRequest := rt.acceptedConnections[localAddr] if trackedRequest != nil { // update the original tracker request so that it won't get pruned. - if trackedRequest.noPrune == false { + if !trackedRequest.noPrune { trackedRequest.noPrune = true rt.hostRequests.convertToAdditionalRequest(trackedRequest) } @@ -464,10 +512,9 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http. } // update the origin address. - rt.updateRequestRemoteAddr(trackedRequest, request) + rt.remoteHostProxyFix(request.Header, trackedRequest) rt.httpConnectionsMu.Lock() - trackedRequest.request = request trackedRequest.otherTelemetryGUID, trackedRequest.otherInstanceName, trackedRequest.otherPublicAddr = getCommonHeaders(request.Header) rt.httpHostRequests.addRequest(trackedRequest) rt.httpHostRequests.pruneRequests(rateLimitingWindowStartTime) @@ -506,13 +553,12 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http. } -// updateRequestRemoteAddr updates the origin IP address in both the trackedRequest as well as in the request.RemoteAddr string -func (rt *RequestTracker) updateRequestRemoteAddr(trackedRequest *TrackerRequest, request *http.Request) { - originIP := rt.getForwardedConnectionAddress(request.Header) +// remoteHostProxyFix updates the origin IP address in the trackedRequest +func (rt *RequestTracker) remoteHostProxyFix(header http.Header, trackedRequest *TrackerRequest) { + originIP := rt.getForwardedConnectionAddress(header) if originIP == nil { return } - request.RemoteAddr = originIP.String() + ":" + trackedRequest.remotePort trackedRequest.remoteHost = originIP.String() } diff --git a/network/requestTracker_test.go b/network/requestTracker_test.go index 194106914..65349987e 100644 --- a/network/requestTracker_test.go +++ b/network/requestTracker_test.go @@ -172,6 +172,32 @@ func TestRateLimiting(t *testing.T) { } } +func TestRemoteAddress(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + tr := makeTrackerRequest("127.0.0.1:444", "", "", time.Now(), nil) + require.Equal(t, "127.0.0.1:444", tr.remoteAddr) + require.Equal(t, "127.0.0.1", tr.remoteHost) + require.Equal(t, "444", tr.remotePort) + + require.Equal(t, "127.0.0.1:444", tr.remoteAddress()) + + // remoteHost set to something else via X-Forwared-For HTTP headers + tr.remoteHost = "10.0.0.1" + require.Equal(t, "10.0.0.1", tr.remoteAddress()) + + // otherPublicAddr is set via X-Algorand-Location HTTP header + // and matches to the remoteHost + tr.otherPublicAddr = "10.0.0.1:555" + require.Equal(t, "10.0.0.1:555", tr.remoteAddress()) + + // otherPublicAddr does not match remoteHost + tr.remoteHost = "127.0.0.1" + tr.otherPublicAddr = "127.0.0.99:555" + require.Equal(t, "127.0.0.1:444", tr.remoteAddress()) +} + func TestIsLocalHost(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 7f8b3046c..d316fcd81 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -377,9 +377,9 @@ func (wn *WebsocketNetwork) PublicAddress() string { // If except is not nil then we will not send it to that neighboring Peer. // if wait is true then the call blocks until the packet has actually been sent to all neighbors. func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error { - dataArray := make([][]byte, 1, 1) + dataArray := make([][]byte, 1) dataArray[0] = data - tagArray := make([]protocol.Tag, 1, 1) + tagArray := make([]protocol.Tag, 1) tagArray[0] = tag return wn.broadcaster.BroadcastArray(ctx, tagArray, dataArray, wait, except) } @@ -947,7 +947,7 @@ func (wn *WebsocketNetwork) checkProtocolVersionMatch(otherHeaders http.Header) // checkIncomingConnectionVariables checks the variables that were provided on the request, and compares them to the // local server supported parameters. If all good, it returns http.StatusOK; otherwise, it write the error to the ResponseWriter // and returns the http status. -func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.ResponseWriter, request *http.Request) int { +func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.ResponseWriter, request *http.Request, remoteAddrForLogging string) int { // check to see that the genesisID in the request URI is valid and matches the supported one. pathVars := mux.Vars(request) otherGenesisID, hasGenesisID := pathVars["genesisID"] @@ -958,7 +958,7 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo } if wn.GenesisID != otherGenesisID { - wn.log.Warn(filterASCII(fmt.Sprintf("new peer %#v genesis mismatch, mine=%#v theirs=%#v, headers %#v", request.RemoteAddr, wn.GenesisID, otherGenesisID, request.Header))) + wn.log.Warn(filterASCII(fmt.Sprintf("new peer %#v genesis mismatch, mine=%#v theirs=%#v, headers %#v", remoteAddrForLogging, wn.GenesisID, otherGenesisID, request.Header))) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "mismatching genesis-id"}) response.WriteHeader(http.StatusPreconditionFailed) n, err := response.Write([]byte("mismatching genesis ID")) @@ -973,7 +973,7 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo // This is pretty harmless and some configurations of phonebooks or DNS records make this likely. Quietly filter it out. var message string // missing header. - wn.log.Warn(filterASCII(fmt.Sprintf("new peer %s did not include random ID header in request. mine=%s headers %#v", request.RemoteAddr, wn.RandomID, request.Header))) + wn.log.Warn(filterASCII(fmt.Sprintf("new peer %s did not include random ID header in request. mine=%s headers %#v", remoteAddrForLogging, wn.RandomID, request.Header))) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "missing random ID header"}) message = fmt.Sprintf("Request was missing a %s header", NodeRandomHeader) response.WriteHeader(http.StatusPreconditionFailed) @@ -985,7 +985,7 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo } else if otherRandom == wn.RandomID { // This is pretty harmless and some configurations of phonebooks or DNS records make this likely. Quietly filter it out. var message string - wn.log.Debugf("new peer %s has same node random id, am I talking to myself? %s", request.RemoteAddr, wn.RandomID) + wn.log.Debugf("new peer %s has same node random id, am I talking to myself? %s", remoteAddrForLogging, wn.RandomID) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "matching random ID header"}) message = fmt.Sprintf("Request included matching %s=%s header", NodeRandomHeader, otherRandom) response.WriteHeader(http.StatusLoopDetected) @@ -1025,7 +1025,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt matchingVersion, otherVersion := wn.checkProtocolVersionMatch(request.Header) if matchingVersion == "" { - wn.log.Info(filterASCII(fmt.Sprintf("new peer %s version mismatch, mine=%v theirs=%s, headers %#v", request.RemoteAddr, wn.supportedProtocolVersions, otherVersion, request.Header))) + wn.log.Info(filterASCII(fmt.Sprintf("new peer %s version mismatch, mine=%v theirs=%s, headers %#v", trackedRequest.remoteHost, wn.supportedProtocolVersions, otherVersion, request.Header))) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "mismatching protocol version"}) response.WriteHeader(http.StatusPreconditionFailed) message := fmt.Sprintf("Requested version %s not in %v mismatches server version", filterASCII(otherVersion), wn.supportedProtocolVersions) @@ -1036,14 +1036,11 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt return } - if wn.checkIncomingConnectionVariables(response, request) != http.StatusOK { + if wn.checkIncomingConnectionVariables(response, request, trackedRequest.remoteAddress()) != http.StatusOK { // we've already logged and written all response(s). return } - // if UseXForwardedForAddressField is not empty, attempt to override the otherPublicAddr with the X Forwarded For origin - trackedRequest.otherPublicAddr = trackedRequest.remoteAddr - responseHeader := make(http.Header) wn.setHeaders(responseHeader) responseHeader.Set(ProtocolVersionHeader, matchingVersion) @@ -1063,7 +1060,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt peerIDChallenge, peerID, err = wn.identityScheme.VerifyRequestAndAttachResponse(responseHeader, request.Header) if err != nil { networkPeerIdentityError.Inc(nil) - wn.log.With("err", err).With("remote", trackedRequest.otherPublicAddr).With("local", localAddr).Warnf("peer (%s) supplied an invalid identity challenge, abandoning peering", trackedRequest.otherPublicAddr) + wn.log.With("err", err).With("remote", trackedRequest.remoteAddress()).With("local", localAddr).Warnf("peer (%s) supplied an invalid identity challenge, abandoning peering", trackedRequest.remoteAddr) return } } @@ -1081,7 +1078,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt } peer := &wsPeer{ - wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.otherPublicAddr, wn.GetRoundTripper(), trackedRequest.remoteHost), + wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.remoteAddress(), wn.GetRoundTripper(), trackedRequest.remoteHost), conn: wsPeerWebsocketConnImpl{conn}, outgoing: false, InstanceName: trackedRequest.otherInstanceName, @@ -1097,7 +1094,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt peer.TelemetryGUID = trackedRequest.otherTelemetryGUID peer.init(wn.config, wn.outgoingMessagesBufferSize) wn.addPeer(peer) - wn.log.With("event", "ConnectedIn").With("remote", trackedRequest.otherPublicAddr).With("local", localAddr).Infof("Accepted incoming connection from peer %s", trackedRequest.otherPublicAddr) + wn.log.With("event", "ConnectedIn").With("remote", trackedRequest.remoteAddress()).With("local", localAddr).Infof("Accepted incoming connection from peer %s", trackedRequest.remoteAddr) wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.ConnectPeerEvent, telemetryspec.PeerEventDetails{ Address: trackedRequest.remoteHost, @@ -2047,6 +2044,7 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) { } }() defer wn.wg.Done() + requestHeader := make(http.Header) wn.setHeaders(requestHeader) for _, supportedProtocolVersion := range wn.supportedProtocolVersions { diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 05e484843..f8eeeb71f 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -1332,8 +1332,6 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { addrA, ok := netA.Address() require.True(t, ok) - gossipA, err := netA.addrToGossipAddr(addrA) - require.NoError(t, err) addrB, ok := netB.Address() require.True(t, ok) @@ -1349,7 +1347,9 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { netA.wg.Add(1) netA.tryConnect(addrB, gossipB) // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) + assert.Eventually(t, func() bool { + return len(netA.GetPeers(PeersConnectedOut)) == 1 + }, time.Second, 50*time.Millisecond) } // just one A->B connection assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn))) @@ -1362,17 +1362,16 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getInsertCount()) // netB has to wait for a final verification message over WS Handler, so pause a moment - time.Sleep(250 * time.Millisecond) + assert.Eventually(t, func() bool { + return netB.identityTracker.(*mockIdentityTracker).getSetCount() == 1 + }, time.Second, 50*time.Millisecond) + assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount()) assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount()) // bi-directional connection from B should not proceed - if _, ok := netB.tryConnectReserveAddr(addrA); ok { - netB.wg.Add(1) - netB.tryConnect(addrA, gossipA) - // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) - } + _, ok = netB.tryConnectReserveAddr(addrA) + assert.False(t, ok) // still just one A->B connection assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn))) @@ -1381,9 +1380,9 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedOut))) // netA never attempts to set identity as it never sees a verified identity assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getSetCount()) - // netB would attempt to add the identity to the tracker - // but it would not end up being added - assert.Equal(t, 2, netB.identityTracker.(*mockIdentityTracker).getSetCount()) + // no connecton => netB does attepmt to add the identity to the tracker + // and it would not end up being added + assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount()) assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount()) // Check deduplication again, this time from A @@ -1391,15 +1390,19 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { // will prevent this connection from attempting in the first place // in the real world, that isConnectedTo doesn't always trigger, if the hosts are behind // a load balancer or other NAT - if _, ok := netA.tryConnectReserveAddr(addrB); ok || true { - netA.wg.Add(1) - netA.tryConnect(addrB, gossipB) - // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) - } + _, ok = netA.tryConnectReserveAddr(addrB) + assert.False(t, ok) + netA.wg.Add(1) + old := networkPeerIdentityDisconnect.GetUint64Value() + netA.tryConnect(addrB, gossipB) + // let the tryConnect go forward + assert.Eventually(t, func() bool { + new := networkPeerIdentityDisconnect.GetUint64Value() + return new > old + }, time.Second, 50*time.Millisecond) // netB never tries to add a new identity, since the connection gets abandoned before it is verified - assert.Equal(t, 2, netB.identityTracker.(*mockIdentityTracker).getSetCount()) + assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount()) assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount()) // still just one A->B connection assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn))) @@ -1411,11 +1414,9 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { // the underlying connection is being closed. In this case, the read loop // on the peer will detect and close the peer. Since this is asynchronous, // we wait and check regularly to allow the connection to settle - assert.Eventually( - t, - func() bool { return len(netB.GetPeers(PeersConnectedIn)) == 1 }, - 5*time.Second, - 100*time.Millisecond) + assert.Eventually(t, func() bool { + return len(netB.GetPeers(PeersConnectedIn)) == 1 + }, time.Second, 50*time.Millisecond) // Now have A connect to node C, which has the same PublicAddress as B (e.g., because it shares the // same public load balancer endpoint). C will have a different identity keypair and so will not be @@ -1432,13 +1433,15 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { require.True(t, ok) gossipC, err := netC.addrToGossipAddr(addrC) require.NoError(t, err) - addrC = hostAndPort(addrC) + assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedOut))) // A connects to C (but uses addrB here to simulate case where B & C have the same PublicAddress) netA.wg.Add(1) netA.tryConnect(addrB, gossipC) // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) + assert.Eventually(t, func() bool { + return len(netA.GetPeers(PeersConnectedOut)) == 2 + }, time.Second, 50*time.Millisecond) // A->B and A->C both open assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn))) @@ -1453,7 +1456,10 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { assert.Equal(t, 2, netA.identityTracker.(*mockIdentityTracker).getInsertCount()) // netC has to wait for a final verification message over WS Handler, so pause a moment - time.Sleep(250 * time.Millisecond) + assert.Eventually(t, func() bool { + return netC.identityTracker.(*mockIdentityTracker).getSetCount() == 1 + }, time.Second, 50*time.Millisecond) + assert.Equal(t, 1, netC.identityTracker.(*mockIdentityTracker).getSetCount()) assert.Equal(t, 1, netC.identityTracker.(*mockIdentityTracker).getInsertCount()) @@ -1481,8 +1487,6 @@ func TestPeeringSenderIdentityChallengeOnly(t *testing.T) { addrA, ok := netA.Address() require.True(t, ok) - gossipA, err := netA.addrToGossipAddr(addrA) - require.NoError(t, err) addrB, ok := netB.Address() require.True(t, ok) @@ -1493,12 +1497,16 @@ func TestPeeringSenderIdentityChallengeOnly(t *testing.T) { addrA = hostAndPort(addrA) addrB = hostAndPort(addrB) + assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedOut))) + assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedIn))) + // first connection should work just fine if _, ok := netA.tryConnectReserveAddr(addrB); ok { netA.wg.Add(1) netA.tryConnect(addrB, gossipB) - // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) + assert.Eventually(t, func() bool { + return len(netA.GetPeers(PeersConnectedOut)) == 1 + }, time.Second, 50*time.Millisecond) } assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedOut))) assert.Equal(t, 1, len(netB.GetPeers(PeersConnectedIn))) @@ -1507,18 +1515,15 @@ func TestPeeringSenderIdentityChallengeOnly(t *testing.T) { assert.Equal(t, 0, netA.identityTracker.(*mockIdentityTracker).getSetCount()) assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount()) - // bi-directional connection should also work - if _, ok := netB.tryConnectReserveAddr(addrA); ok { - netB.wg.Add(1) - netB.tryConnect(addrA, gossipA) - // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) - } - // the nodes are connected redundantly - assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn))) + // bi-directional connection does not work because netA advertises its public address + _, ok = netB.tryConnectReserveAddr(addrA) + assert.False(t, ok) + + // no redundant connections + assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn))) assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedOut))) assert.Equal(t, 1, len(netB.GetPeers(PeersConnectedIn))) - assert.Equal(t, 1, len(netB.GetPeers(PeersConnectedOut))) + assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedOut))) // confirm identity map was not added to for either host assert.Equal(t, 0, netA.identityTracker.(*mockIdentityTracker).getSetCount()) assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount()) @@ -1558,12 +1563,15 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) { addrA = hostAndPort(addrA) addrB = hostAndPort(addrB) + assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedOut))) // first connection should work just fine if _, ok := netA.tryConnectReserveAddr(addrB); ok { netA.wg.Add(1) netA.tryConnect(addrB, gossipB) // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) + assert.Eventually(t, func() bool { + return len(netA.GetPeers(PeersConnectedOut)) == 1 + }, time.Second, 50*time.Millisecond) } // single A->B connection assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn))) @@ -1580,7 +1588,9 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) { netB.wg.Add(1) netB.tryConnect(addrA, gossipA) // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) + assert.Eventually(t, func() bool { + return len(netB.GetPeers(PeersConnectedOut)) == 1 + }, time.Second, 50*time.Millisecond) } assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn))) assert.Equal(t, 1, len(netA.GetPeers(PeersConnectedOut))) @@ -1591,7 +1601,7 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) { assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount()) } -// TestPeeringIncorrectDeduplicationName confirm that if the reciever can't match +// TestPeeringIncorrectDeduplicationName confirm that if the reciever can't match // the Address in the challenge to its PublicAddress, identities aren't exchanged, but peering continues func TestPeeringIncorrectDeduplicationName(t *testing.T) { partitiontest.PartitionTest(t) @@ -1625,12 +1635,15 @@ func TestPeeringIncorrectDeduplicationName(t *testing.T) { addrA = hostAndPort(addrA) addrB = hostAndPort(addrB) + assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedOut))) // first connection should work just fine if _, ok := netA.tryConnectReserveAddr(addrB); ok { netA.wg.Add(1) netA.tryConnect(addrB, gossipB) // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) + assert.Eventually(t, func() bool { + return len(netA.GetPeers(PeersConnectedOut)) == 1 + }, time.Second, 50*time.Millisecond) } // single A->B connection assert.Equal(t, 0, len(netA.GetPeers(PeersConnectedIn))) @@ -1643,14 +1656,18 @@ func TestPeeringIncorrectDeduplicationName(t *testing.T) { assert.Equal(t, 0, netA.identityTracker.(*mockIdentityTracker).getSetCount()) assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount()) - // bi-directional connection should also work + // bi-directional connection would now work since netB detects to be connected to netA in tryConnectReserveAddr, + // so force it. // this second connection should set identities, because the reciever address matches now - if _, ok := netB.tryConnectReserveAddr(addrA); ok { - netB.wg.Add(1) - netB.tryConnect(addrA, gossipA) - // let the tryConnect go forward - time.Sleep(250 * time.Millisecond) - } + _, ok = netB.tryConnectReserveAddr(addrA) + assert.False(t, ok) + netB.wg.Add(1) + netB.tryConnect(addrA, gossipA) + // let the tryConnect go forward + assert.Eventually(t, func() bool { + return len(netB.GetPeers(PeersConnectedOut)) == 1 + }, time.Second, 50*time.Millisecond) + // confirm that at this point the identityTracker was called once per network // and inserted once per network assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getSetCount()) @@ -1982,14 +1999,13 @@ func TestPeeringWithBadIdentityVerification(t *testing.T) { partitiontest.PartitionTest(t) type testCase struct { - name string - verifyResponse func(t *testing.T, h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error) - totalInA int - totalOutA int - totalInB int - totalOutB int - additionalSleep time.Duration - occupied bool + name string + verifyResponse func(t *testing.T, h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error) + totalInA int + totalOutA int + totalInB int + totalOutB int + occupied bool } testCases := []testCase{ @@ -2602,7 +2618,7 @@ func TestSlowPeerDisconnection(t *testing.T) { peers, _ = netA.peerSnapshot(peers) if len(peers) == 0 || peers[0] != peer { // make sure it took more than 1 second, and less than 5 seconds. - waitTime := time.Now().Sub(beforeLoopTime) + waitTime := time.Since(beforeLoopTime) require.LessOrEqual(t, int64(time.Second), int64(waitTime)) require.GreaterOrEqual(t, int64(5*time.Second), int64(waitTime)) break @@ -2895,7 +2911,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) { netB.config.EnablePingHandler = false addrA, postListen := netA.Address() require.True(t, postListen) - t.Log(addrA) + t.Logf("netA %s", addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) // have netB asking netA to send it ft2, deregister ping handler to make sure that we aren't exceeding the maximum MOI messagesize @@ -2905,6 +2921,8 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) { netB.Start() defer netStop(t, netB, "B") + addrB, _ := netB.Address() + t.Logf("netB %s", addrB) incomingMsgSync := deadlock.Mutex{} msgCounters := make(map[protocol.Tag]int) @@ -3652,7 +3670,7 @@ func BenchmarkVariableTransactionMessageBlockSizes(t *testing.B) { netB.Broadcast(context.Background(), protocol.TxnTag, dataBuffer, true, nil) <-msgProcessed } - deltaTime := time.Now().Sub(startTime) + deltaTime := time.Since(startTime) rate = float64(t.N) * float64(time.Second) / float64(deltaTime) t.ReportMetric(rate, "txn/sec") }) @@ -3794,7 +3812,6 @@ func TestWebsocketNetworkTelemetryTCP(t *testing.T) { type mockServer struct { *httptest.Server URL string - t *testing.T waitForClientClose bool } @@ -3845,7 +3862,7 @@ func (t mockHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } wr.Close() - for true { + for { // echo a message back to the client _, _, err := ws.NextReader() if err != nil { @@ -4002,7 +4019,7 @@ func TestDiscardUnrequestedBlockResponse(t *testing.T) { require.Eventually(t, func() bool { return netA.NumPeers() == 1 }, 500*time.Millisecond, 25*time.Millisecond) // send an unrequested block response - msg := make([]sendMessage, 1, 1) + msg := make([]sendMessage, 1) msg[0] = sendMessage{ data: append([]byte(protocol.TopicMsgRespTag), []byte("foo")...), enqueued: time.Now(), diff --git a/network/wsPeer.go b/network/wsPeer.go index 4ed32539a..7cbdbeaeb 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -360,6 +360,7 @@ func makePeerCore(ctx context.Context, net GossipNode, log logging.Logger, readB } // GetAddress returns the root url to use to connect to this peer. +// This implements HTTPPeer interface and used by external services to determine where to connect to. // TODO: should GetAddress be added to Peer interface? func (wp *wsPeerCore) GetAddress() string { return wp.rootURL |