summaryrefslogtreecommitdiff
path: root/network/wsPeer.go
diff options
context:
space:
mode:
Diffstat (limited to 'network/wsPeer.go')
-rw-r--r--network/wsPeer.go150
1 files changed, 54 insertions, 96 deletions
diff --git a/network/wsPeer.go b/network/wsPeer.go
index c661629aa..f476cfa7e 100644
--- a/network/wsPeer.go
+++ b/network/wsPeer.go
@@ -76,10 +76,10 @@ var defaultSendMessageTags = map[protocol.Tag]bool{
protocol.ProposalPayloadTag: true,
protocol.TopicMsgRespTag: true,
protocol.MsgOfInterestTag: true,
+ protocol.TxnTag: true,
protocol.UniCatchupReqTag: true,
protocol.UniEnsBlockReqTag: true,
protocol.VoteBundleTag: true,
- protocol.Txn2Tag: true,
}
// interface allows substituting debug implementation for *websocket.Conn
@@ -96,10 +96,10 @@ type wsPeerWebsocketConn interface {
type sendMessage struct {
data []byte
- enqueued time.Time // the time at which the message was first generated
- peerEnqueued time.Time // the time at which the peer was attempting to enqueue the message
- msgTags map[protocol.Tag]bool // when msgTags is specified ( i.e. non-nil ), the send goroutine is to replace the message tag filter with this one. No data would be accompanied to this message.
- callback UnicastWebsocketMessageStateCallback // when non-nil, the callback function would be called after entry would be placed on the outgoing websocket queue
+ enqueued time.Time // the time at which the message was first generated
+ peerEnqueued time.Time // the time at which the peer was attempting to enqueue the message
+ msgTags map[protocol.Tag]bool // when msgTags is specified ( i.e. non-nil ), the send goroutine is to replace the message tag filter with this one. No data would be accompanied to this message.
+ hash crypto.Digest
ctx context.Context
}
@@ -124,7 +124,6 @@ const disconnectLeastPerformingPeer disconnectReason = "LeastPerformingPeer"
const disconnectCliqueResolve disconnectReason = "CliqueResolving"
const disconnectRequestReceived disconnectReason = "DisconnectRequest"
const disconnectStaleWrite disconnectReason = "DisconnectStaleWrite"
-const disconnectClientCallback disconnectReason = "ClientCallback"
// Response is the structure holding the response from the server
type Response struct {
@@ -175,7 +174,11 @@ type wsPeer struct {
processed chan struct{}
- latencyTracker latencyTracker
+ pingLock deadlock.Mutex
+ pingSent time.Time
+ pingData []byte
+ pingInFlight bool
+ lastPingRoundTripTime time.Duration
// Hint about position in wn.peers. Definitely valid if the peer
// is present in wn.peers.
@@ -221,9 +224,6 @@ type wsPeer struct {
// clientDataStoreMu synchronizes access to clientDataStore
clientDataStoreMu deadlock.Mutex
-
- // outgoingMessageCounters counts the number of messages send for each tag. It allows us to use implicit message counting.
- outgoingMessageCounters map[protocol.Tag]uint64
}
// HTTPPeer is what the opaque Peer might be.
@@ -233,22 +233,16 @@ type HTTPPeer interface {
GetHTTPClient() *http.Client
}
-// UnicastWebsocketMessageStateCallback provide asyncrounious feedback for the sequence number of a message
-// if the caller return an error, the network peer would disconnect
-type UnicastWebsocketMessageStateCallback func(enqueued bool, sequenceNumber uint64) error
-
// UnicastPeer is another possible interface for the opaque Peer.
// It is possible that we can only initiate a connection to a peer over websockets.
type UnicastPeer interface {
GetAddress() string
// Unicast sends the given bytes to this specific peer. Does not wait for message to be sent.
- Unicast(ctx context.Context, data []byte, tag protocol.Tag, callback UnicastWebsocketMessageStateCallback) error
+ Unicast(ctx context.Context, data []byte, tag protocol.Tag) error
// Version returns the matching version from network.SupportedProtocolVersions
Version() string
Request(ctx context.Context, tag Tag, topics Topics) (resp *Response, e error)
Respond(ctx context.Context, reqMsg IncomingMessage, topics Topics) (e error)
- IsOutgoing() bool
- GetConnectionLatency() time.Duration
}
// Create a wsPeerCore object
@@ -278,20 +272,9 @@ func (wp *wsPeer) Version() string {
return wp.version
}
-// IsOutgoing returns true if the connection is an outgoing connection or false if it the connection
-// is an incoming connection.
-func (wp *wsPeer) IsOutgoing() bool {
- return wp.outgoing
-}
-
-// GetConnectionLatency returns the connection latency between the local node and this peer.
-func (wp *wsPeer) GetConnectionLatency() time.Duration {
- return wp.latencyTracker.getConnectionLatency()
-}
-
// Unicast sends the given bytes to this specific peer. Does not wait for message to be sent.
// (Implements UnicastPeer)
-func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback UnicastWebsocketMessageStateCallback) error {
+func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag) error {
var err error
tbytes := []byte(tag)
@@ -303,7 +286,7 @@ func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, cal
digest = crypto.Hash(mbytes)
}
- ok := wp.writeNonBlock(ctx, mbytes, false, digest, time.Now(), callback)
+ ok := wp.writeNonBlock(ctx, mbytes, false, digest, time.Now())
if !ok {
networkBroadcastsDropped.Inc(nil)
err = fmt.Errorf("wsPeer failed to unicast: %v", wp.GetAddress())
@@ -356,7 +339,6 @@ func (wp *wsPeer) init(config config.Local, sendBufferLength int) {
wp.responseChannels = make(map[uint64]chan *Response)
wp.sendMessageTag = defaultSendMessageTags
wp.clientDataStore = make(map[string]interface{})
- wp.outgoingMessageCounters = make(map[protocol.Tag]uint64)
// processed is a channel that messageHandlerThread writes to
// when it's done with one of our messages, so that we can queue
@@ -371,24 +353,6 @@ func (wp *wsPeer) init(config config.Local, sendBufferLength int) {
wp.outgoingMsgFilter = makeMessageFilter(config.OutgoingMessageFilterBucketCount, config.OutgoingMessageFilterBucketSize)
}
- // if we're on an older version, then add the old style transaction message to the send messages tag.
- // once we deprecate old style transaction sending, this part can go away.
- if wp.version != "3.0" {
- txSendMsgTags := make(map[protocol.Tag]bool)
- for tag := range wp.sendMessageTag {
- txSendMsgTags[tag] = true
- }
- txSendMsgTags[protocol.TxnTag] = true
- wp.sendMessageTag = txSendMsgTags
- }
-
- wp.latencyTracker.init(wp.conn, config, time.Duration(0))
- // send a ping right away.
- now := time.Now()
- if err := wp.latencyTracker.checkPingSending(&now); err != nil {
- wp.net.log.Infof("failed to send ping message to peer : %v", err)
- }
-
wp.wg.Add(2)
go wp.readLoop()
go wp.writeLoop()
@@ -421,7 +385,6 @@ func (wp *wsPeer) readLoop() {
}()
wp.conn.SetReadLimit(maxMessageLength)
slurper := MakeLimitedReaderSlurper(averageMessageLength, maxMessageLength)
- sequenceCounters := make(map[protocol.Tag]uint64)
for {
msg := IncomingMessage{}
mtype, reader, err := wp.conn.NextReader()
@@ -457,7 +420,7 @@ func (wp *wsPeer) readLoop() {
wp.reportReadErr(err)
return
}
- wp.latencyTracker.increaseReceivedCounter()
+
msg.processing = wp.processed
msg.Received = time.Now().UnixNano()
msg.Data = slurper.Bytes()
@@ -468,8 +431,6 @@ func (wp *wsPeer) readLoop() {
networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2))
networkMessageReceivedByTag.Add(string(tag[:]), 1)
msg.Sender = wp
- msg.Sequence = sequenceCounters[msg.Tag]
- sequenceCounters[msg.Tag] = msg.Sequence + 1
// for outgoing connections, we want to notify the connection monitor that we've received
// a message. The connection monitor would update it's statistics accordingly.
@@ -623,12 +584,6 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
if len(msg.data) > maxMessageLength {
wp.net.log.Errorf("trying to send a message longer than we would receive: %d > %d tag=%s", len(msg.data), maxMessageLength, string(msg.data[0:2]))
// just drop it, don't break the connection
- if msg.callback != nil {
- // let the callback know that the message was not sent.
- if nil != msg.callback(false, 0) {
- return disconnectClientCallback
- }
- }
return disconnectReasonNone
}
if msg.msgTags != nil {
@@ -641,12 +596,6 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
tag := protocol.Tag(msg.data[:2])
if !wp.sendMessageTag[tag] {
// the peer isn't interested in this message.
- if msg.callback != nil {
- // let the callback know that the message was not sent.
- if nil != msg.callback(false, 0) {
- return disconnectClientCallback
- }
- }
return disconnectReasonNone
}
@@ -656,20 +605,9 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
if msgWaitDuration > maxMessageQueueDuration {
wp.net.log.Warnf("peer stale enqueued message %dms", msgWaitDuration.Nanoseconds()/1000000)
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "stale message"})
- if msg.callback != nil {
- // let the callback know that the message was not sent.
- if nil != msg.callback(false, 0) {
- return disconnectClientCallback
- }
- }
return disconnectStaleWrite
}
- // is it time to send a ping message ?
- if err := wp.latencyTracker.checkPingSending(&now); err != nil {
- wp.net.log.Infof("failed to send ping message to peer : %v", err)
- }
-
atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, msg.enqueued.UnixNano())
defer atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, 0)
err := wp.conn.WriteMessage(websocket.BinaryMessage, msg.data)
@@ -678,12 +616,6 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
wp.net.log.Warn("peer write error ", err)
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "write err"})
}
- if msg.callback != nil {
- // let the callback know that the message was not sent.
- if nil != msg.callback(false, 0) {
- return disconnectClientCallback
- }
- }
return disconnectWriteError
}
atomic.StoreInt64(&wp.lastPacketTime, time.Now().UnixNano())
@@ -692,16 +624,6 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
networkMessageSentTotal.AddUint64(1, nil)
networkMessageSentByTag.Add(string(tag), 1)
networkMessageQueueMicrosTotal.AddUint64(uint64(time.Now().Sub(msg.peerEnqueued).Nanoseconds()/1000), nil)
-
- if msg.callback != nil {
- // for performance reasons, we count messages only for messages that request a callback. we might want to revisit this
- // in the future.
- seq := wp.outgoingMessageCounters[tag]
- if nil != msg.callback(true, seq) {
- return disconnectClientCallback
- }
- wp.outgoingMessageCounters[tag] = seq + 1
- }
return disconnectReasonNone
}
@@ -744,16 +666,16 @@ func (wp *wsPeer) writeLoopCleanup(reason disconnectReason) {
wp.wg.Done()
}
-func (wp *wsPeer) writeNonBlock(ctx context.Context, data []byte, highPrio bool, digest crypto.Digest, msgEnqueueTime time.Time, callback UnicastWebsocketMessageStateCallback) bool {
+func (wp *wsPeer) writeNonBlock(ctx context.Context, data []byte, highPrio bool, digest crypto.Digest, msgEnqueueTime time.Time) bool {
msgs := make([][]byte, 1, 1)
digests := make([]crypto.Digest, 1, 1)
msgs[0] = data
digests[0] = digest
- return wp.writeNonBlockMsgs(ctx, msgs, highPrio, digests, msgEnqueueTime, callback)
+ return wp.writeNonBlockMsgs(ctx, msgs, highPrio, digests, msgEnqueueTime)
}
// return true if enqueued/sent
-func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio bool, digest []crypto.Digest, msgEnqueueTime time.Time, callback UnicastWebsocketMessageStateCallback) bool {
+func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio bool, digest []crypto.Digest, msgEnqueueTime time.Time) bool {
includeIndices := make([]int, 0, len(data))
for i := range data {
if wp.outgoingMsgFilter != nil && len(data[i]) > messageFilterSize && wp.outgoingMsgFilter.CheckDigest(digest[i], false, false) {
@@ -775,7 +697,7 @@ func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio
msgs := make([]sendMessage, 0, len(includeIndices))
enqueueTime := time.Now()
for _, index := range includeIndices {
- msgs = append(msgs, sendMessage{data: data[index], enqueued: msgEnqueueTime, peerEnqueued: enqueueTime, ctx: ctx, callback: callback})
+ msgs = append(msgs, sendMessage{data: data[index], enqueued: msgEnqueueTime, peerEnqueued: enqueueTime, hash: digest[index], ctx: ctx})
}
if highPrio {
@@ -791,6 +713,42 @@ func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio
return false
}
+const pingLength = 8
+const maxPingWait = 60 * time.Second
+
+// sendPing sends a ping block to the peer.
+// return true if either a ping request was enqueued or there is already ping request in flight in the past maxPingWait time.
+func (wp *wsPeer) sendPing() bool {
+ wp.pingLock.Lock()
+ defer wp.pingLock.Unlock()
+ now := time.Now()
+ if wp.pingInFlight && (now.Sub(wp.pingSent) < maxPingWait) {
+ return true
+ }
+
+ tagBytes := []byte(protocol.PingTag)
+ mbytes := make([]byte, len(tagBytes)+pingLength)
+ copy(mbytes, tagBytes)
+ crypto.RandBytes(mbytes[len(tagBytes):])
+ wp.pingData = mbytes[len(tagBytes):]
+ sent := wp.writeNonBlock(context.Background(), mbytes, false, crypto.Digest{}, time.Now())
+
+ if sent {
+ wp.pingInFlight = true
+ wp.pingSent = now
+ }
+ return sent
+}
+
+// get some times out of the peer while observing the ping data lock
+func (wp *wsPeer) pingTimes() (lastPingSent time.Time, lastPingRoundTripTime time.Duration) {
+ wp.pingLock.Lock()
+ defer wp.pingLock.Unlock()
+ lastPingSent = wp.pingSent
+ lastPingRoundTripTime = wp.lastPingRoundTripTime
+ return
+}
+
// called when the connection had an error or closed remotely
func (wp *wsPeer) internalClose(reason disconnectReason) {
if atomic.CompareAndSwapInt32(&wp.didSignalClose, 0, 1) {