diff options
Diffstat (limited to 'network/wsPeer.go')
-rw-r--r-- | network/wsPeer.go | 150 |
1 files changed, 54 insertions, 96 deletions
diff --git a/network/wsPeer.go b/network/wsPeer.go index c661629aa..f476cfa7e 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -76,10 +76,10 @@ var defaultSendMessageTags = map[protocol.Tag]bool{ protocol.ProposalPayloadTag: true, protocol.TopicMsgRespTag: true, protocol.MsgOfInterestTag: true, + protocol.TxnTag: true, protocol.UniCatchupReqTag: true, protocol.UniEnsBlockReqTag: true, protocol.VoteBundleTag: true, - protocol.Txn2Tag: true, } // interface allows substituting debug implementation for *websocket.Conn @@ -96,10 +96,10 @@ type wsPeerWebsocketConn interface { type sendMessage struct { data []byte - enqueued time.Time // the time at which the message was first generated - peerEnqueued time.Time // the time at which the peer was attempting to enqueue the message - msgTags map[protocol.Tag]bool // when msgTags is specified ( i.e. non-nil ), the send goroutine is to replace the message tag filter with this one. No data would be accompanied to this message. - callback UnicastWebsocketMessageStateCallback // when non-nil, the callback function would be called after entry would be placed on the outgoing websocket queue + enqueued time.Time // the time at which the message was first generated + peerEnqueued time.Time // the time at which the peer was attempting to enqueue the message + msgTags map[protocol.Tag]bool // when msgTags is specified ( i.e. non-nil ), the send goroutine is to replace the message tag filter with this one. No data would be accompanied to this message. + hash crypto.Digest ctx context.Context } @@ -124,7 +124,6 @@ const disconnectLeastPerformingPeer disconnectReason = "LeastPerformingPeer" const disconnectCliqueResolve disconnectReason = "CliqueResolving" const disconnectRequestReceived disconnectReason = "DisconnectRequest" const disconnectStaleWrite disconnectReason = "DisconnectStaleWrite" -const disconnectClientCallback disconnectReason = "ClientCallback" // Response is the structure holding the response from the server type Response struct { @@ -175,7 +174,11 @@ type wsPeer struct { processed chan struct{} - latencyTracker latencyTracker + pingLock deadlock.Mutex + pingSent time.Time + pingData []byte + pingInFlight bool + lastPingRoundTripTime time.Duration // Hint about position in wn.peers. Definitely valid if the peer // is present in wn.peers. @@ -221,9 +224,6 @@ type wsPeer struct { // clientDataStoreMu synchronizes access to clientDataStore clientDataStoreMu deadlock.Mutex - - // outgoingMessageCounters counts the number of messages send for each tag. It allows us to use implicit message counting. - outgoingMessageCounters map[protocol.Tag]uint64 } // HTTPPeer is what the opaque Peer might be. @@ -233,22 +233,16 @@ type HTTPPeer interface { GetHTTPClient() *http.Client } -// UnicastWebsocketMessageStateCallback provide asyncrounious feedback for the sequence number of a message -// if the caller return an error, the network peer would disconnect -type UnicastWebsocketMessageStateCallback func(enqueued bool, sequenceNumber uint64) error - // UnicastPeer is another possible interface for the opaque Peer. // It is possible that we can only initiate a connection to a peer over websockets. type UnicastPeer interface { GetAddress() string // Unicast sends the given bytes to this specific peer. Does not wait for message to be sent. - Unicast(ctx context.Context, data []byte, tag protocol.Tag, callback UnicastWebsocketMessageStateCallback) error + Unicast(ctx context.Context, data []byte, tag protocol.Tag) error // Version returns the matching version from network.SupportedProtocolVersions Version() string Request(ctx context.Context, tag Tag, topics Topics) (resp *Response, e error) Respond(ctx context.Context, reqMsg IncomingMessage, topics Topics) (e error) - IsOutgoing() bool - GetConnectionLatency() time.Duration } // Create a wsPeerCore object @@ -278,20 +272,9 @@ func (wp *wsPeer) Version() string { return wp.version } -// IsOutgoing returns true if the connection is an outgoing connection or false if it the connection -// is an incoming connection. -func (wp *wsPeer) IsOutgoing() bool { - return wp.outgoing -} - -// GetConnectionLatency returns the connection latency between the local node and this peer. -func (wp *wsPeer) GetConnectionLatency() time.Duration { - return wp.latencyTracker.getConnectionLatency() -} - // Unicast sends the given bytes to this specific peer. Does not wait for message to be sent. // (Implements UnicastPeer) -func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback UnicastWebsocketMessageStateCallback) error { +func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag) error { var err error tbytes := []byte(tag) @@ -303,7 +286,7 @@ func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, cal digest = crypto.Hash(mbytes) } - ok := wp.writeNonBlock(ctx, mbytes, false, digest, time.Now(), callback) + ok := wp.writeNonBlock(ctx, mbytes, false, digest, time.Now()) if !ok { networkBroadcastsDropped.Inc(nil) err = fmt.Errorf("wsPeer failed to unicast: %v", wp.GetAddress()) @@ -356,7 +339,6 @@ func (wp *wsPeer) init(config config.Local, sendBufferLength int) { wp.responseChannels = make(map[uint64]chan *Response) wp.sendMessageTag = defaultSendMessageTags wp.clientDataStore = make(map[string]interface{}) - wp.outgoingMessageCounters = make(map[protocol.Tag]uint64) // processed is a channel that messageHandlerThread writes to // when it's done with one of our messages, so that we can queue @@ -371,24 +353,6 @@ func (wp *wsPeer) init(config config.Local, sendBufferLength int) { wp.outgoingMsgFilter = makeMessageFilter(config.OutgoingMessageFilterBucketCount, config.OutgoingMessageFilterBucketSize) } - // if we're on an older version, then add the old style transaction message to the send messages tag. - // once we deprecate old style transaction sending, this part can go away. - if wp.version != "3.0" { - txSendMsgTags := make(map[protocol.Tag]bool) - for tag := range wp.sendMessageTag { - txSendMsgTags[tag] = true - } - txSendMsgTags[protocol.TxnTag] = true - wp.sendMessageTag = txSendMsgTags - } - - wp.latencyTracker.init(wp.conn, config, time.Duration(0)) - // send a ping right away. - now := time.Now() - if err := wp.latencyTracker.checkPingSending(&now); err != nil { - wp.net.log.Infof("failed to send ping message to peer : %v", err) - } - wp.wg.Add(2) go wp.readLoop() go wp.writeLoop() @@ -421,7 +385,6 @@ func (wp *wsPeer) readLoop() { }() wp.conn.SetReadLimit(maxMessageLength) slurper := MakeLimitedReaderSlurper(averageMessageLength, maxMessageLength) - sequenceCounters := make(map[protocol.Tag]uint64) for { msg := IncomingMessage{} mtype, reader, err := wp.conn.NextReader() @@ -457,7 +420,7 @@ func (wp *wsPeer) readLoop() { wp.reportReadErr(err) return } - wp.latencyTracker.increaseReceivedCounter() + msg.processing = wp.processed msg.Received = time.Now().UnixNano() msg.Data = slurper.Bytes() @@ -468,8 +431,6 @@ func (wp *wsPeer) readLoop() { networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2)) networkMessageReceivedByTag.Add(string(tag[:]), 1) msg.Sender = wp - msg.Sequence = sequenceCounters[msg.Tag] - sequenceCounters[msg.Tag] = msg.Sequence + 1 // for outgoing connections, we want to notify the connection monitor that we've received // a message. The connection monitor would update it's statistics accordingly. @@ -623,12 +584,6 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { if len(msg.data) > maxMessageLength { wp.net.log.Errorf("trying to send a message longer than we would receive: %d > %d tag=%s", len(msg.data), maxMessageLength, string(msg.data[0:2])) // just drop it, don't break the connection - if msg.callback != nil { - // let the callback know that the message was not sent. - if nil != msg.callback(false, 0) { - return disconnectClientCallback - } - } return disconnectReasonNone } if msg.msgTags != nil { @@ -641,12 +596,6 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { tag := protocol.Tag(msg.data[:2]) if !wp.sendMessageTag[tag] { // the peer isn't interested in this message. - if msg.callback != nil { - // let the callback know that the message was not sent. - if nil != msg.callback(false, 0) { - return disconnectClientCallback - } - } return disconnectReasonNone } @@ -656,20 +605,9 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { if msgWaitDuration > maxMessageQueueDuration { wp.net.log.Warnf("peer stale enqueued message %dms", msgWaitDuration.Nanoseconds()/1000000) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "stale message"}) - if msg.callback != nil { - // let the callback know that the message was not sent. - if nil != msg.callback(false, 0) { - return disconnectClientCallback - } - } return disconnectStaleWrite } - // is it time to send a ping message ? - if err := wp.latencyTracker.checkPingSending(&now); err != nil { - wp.net.log.Infof("failed to send ping message to peer : %v", err) - } - atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, msg.enqueued.UnixNano()) defer atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, 0) err := wp.conn.WriteMessage(websocket.BinaryMessage, msg.data) @@ -678,12 +616,6 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { wp.net.log.Warn("peer write error ", err) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "write err"}) } - if msg.callback != nil { - // let the callback know that the message was not sent. - if nil != msg.callback(false, 0) { - return disconnectClientCallback - } - } return disconnectWriteError } atomic.StoreInt64(&wp.lastPacketTime, time.Now().UnixNano()) @@ -692,16 +624,6 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { networkMessageSentTotal.AddUint64(1, nil) networkMessageSentByTag.Add(string(tag), 1) networkMessageQueueMicrosTotal.AddUint64(uint64(time.Now().Sub(msg.peerEnqueued).Nanoseconds()/1000), nil) - - if msg.callback != nil { - // for performance reasons, we count messages only for messages that request a callback. we might want to revisit this - // in the future. - seq := wp.outgoingMessageCounters[tag] - if nil != msg.callback(true, seq) { - return disconnectClientCallback - } - wp.outgoingMessageCounters[tag] = seq + 1 - } return disconnectReasonNone } @@ -744,16 +666,16 @@ func (wp *wsPeer) writeLoopCleanup(reason disconnectReason) { wp.wg.Done() } -func (wp *wsPeer) writeNonBlock(ctx context.Context, data []byte, highPrio bool, digest crypto.Digest, msgEnqueueTime time.Time, callback UnicastWebsocketMessageStateCallback) bool { +func (wp *wsPeer) writeNonBlock(ctx context.Context, data []byte, highPrio bool, digest crypto.Digest, msgEnqueueTime time.Time) bool { msgs := make([][]byte, 1, 1) digests := make([]crypto.Digest, 1, 1) msgs[0] = data digests[0] = digest - return wp.writeNonBlockMsgs(ctx, msgs, highPrio, digests, msgEnqueueTime, callback) + return wp.writeNonBlockMsgs(ctx, msgs, highPrio, digests, msgEnqueueTime) } // return true if enqueued/sent -func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio bool, digest []crypto.Digest, msgEnqueueTime time.Time, callback UnicastWebsocketMessageStateCallback) bool { +func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio bool, digest []crypto.Digest, msgEnqueueTime time.Time) bool { includeIndices := make([]int, 0, len(data)) for i := range data { if wp.outgoingMsgFilter != nil && len(data[i]) > messageFilterSize && wp.outgoingMsgFilter.CheckDigest(digest[i], false, false) { @@ -775,7 +697,7 @@ func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio msgs := make([]sendMessage, 0, len(includeIndices)) enqueueTime := time.Now() for _, index := range includeIndices { - msgs = append(msgs, sendMessage{data: data[index], enqueued: msgEnqueueTime, peerEnqueued: enqueueTime, ctx: ctx, callback: callback}) + msgs = append(msgs, sendMessage{data: data[index], enqueued: msgEnqueueTime, peerEnqueued: enqueueTime, hash: digest[index], ctx: ctx}) } if highPrio { @@ -791,6 +713,42 @@ func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio return false } +const pingLength = 8 +const maxPingWait = 60 * time.Second + +// sendPing sends a ping block to the peer. +// return true if either a ping request was enqueued or there is already ping request in flight in the past maxPingWait time. +func (wp *wsPeer) sendPing() bool { + wp.pingLock.Lock() + defer wp.pingLock.Unlock() + now := time.Now() + if wp.pingInFlight && (now.Sub(wp.pingSent) < maxPingWait) { + return true + } + + tagBytes := []byte(protocol.PingTag) + mbytes := make([]byte, len(tagBytes)+pingLength) + copy(mbytes, tagBytes) + crypto.RandBytes(mbytes[len(tagBytes):]) + wp.pingData = mbytes[len(tagBytes):] + sent := wp.writeNonBlock(context.Background(), mbytes, false, crypto.Digest{}, time.Now()) + + if sent { + wp.pingInFlight = true + wp.pingSent = now + } + return sent +} + +// get some times out of the peer while observing the ping data lock +func (wp *wsPeer) pingTimes() (lastPingSent time.Time, lastPingRoundTripTime time.Duration) { + wp.pingLock.Lock() + defer wp.pingLock.Unlock() + lastPingSent = wp.pingSent + lastPingRoundTripTime = wp.lastPingRoundTripTime + return +} + // called when the connection had an error or closed remotely func (wp *wsPeer) internalClose(reason disconnectReason) { if atomic.CompareAndSwapInt32(&wp.didSignalClose, 0, 1) { |