summaryrefslogtreecommitdiff
path: root/network/wsNetwork_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'network/wsNetwork_test.go')
-rw-r--r--network/wsNetwork_test.go225
1 files changed, 86 insertions, 139 deletions
diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go
index 1a5a3d427..74c690241 100644
--- a/network/wsNetwork_test.go
+++ b/network/wsNetwork_test.go
@@ -238,7 +238,7 @@ func TestWebsocketNetworkBasic(t *testing.T) {
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
counter := newMessageCounter(t, 2)
counterDone := counter.done
- netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
@@ -246,8 +246,8 @@ func TestWebsocketNetworkBasic(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")
- netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte("foo"), false, nil)
- netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte("bar"), false, nil)
+ netA.Broadcast(context.Background(), protocol.TxnTag, []byte("foo"), false, nil)
+ netA.Broadcast(context.Background(), protocol.TxnTag, []byte("bar"), false, nil)
select {
case <-counterDone:
@@ -274,7 +274,7 @@ func TestWebsocketNetworkUnicast(t *testing.T) {
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
counter := newMessageCounter(t, 2)
counterDone := counter.done
- netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
@@ -285,9 +285,9 @@ func TestWebsocketNetworkUnicast(t *testing.T) {
require.Equal(t, 1, len(netA.peers))
require.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn)))
peerB := netA.peers[0]
- err := peerB.Unicast(context.Background(), []byte("foo"), protocol.AgreementVoteTag, nil)
+ err := peerB.Unicast(context.Background(), []byte("foo"), protocol.TxnTag)
assert.NoError(t, err)
- err = peerB.Unicast(context.Background(), []byte("bar"), protocol.AgreementVoteTag, nil)
+ err = peerB.Unicast(context.Background(), []byte("bar"), protocol.TxnTag)
assert.NoError(t, err)
select {
@@ -353,7 +353,7 @@ func TestWebsocketNetworkArray(t *testing.T) {
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
counter := newMessageCounter(t, 3)
counterDone := counter.done
- netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
@@ -361,7 +361,7 @@ func TestWebsocketNetworkArray(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")
- tags := []protocol.Tag{protocol.AgreementVoteTag, protocol.AgreementVoteTag, protocol.AgreementVoteTag}
+ tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag}
data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")}
netA.BroadcastArray(context.Background(), tags, data, false, nil)
@@ -390,7 +390,7 @@ func TestWebsocketNetworkCancel(t *testing.T) {
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
counter := newMessageCounter(t, 100)
counterDone := counter.done
- netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
@@ -401,8 +401,8 @@ func TestWebsocketNetworkCancel(t *testing.T) {
tags := make([]protocol.Tag, 100)
data := make([][]byte, 100)
for i := range data {
- tags[i] = protocol.AgreementVoteTag
- data[i] = []byte(fmt.Sprintf("%d", i))
+ tags[i] = protocol.TxnTag
+ data[i] = []byte(string(rune(i)))
}
ctx, cancel := context.WithCancel(context.Background())
@@ -438,7 +438,7 @@ func TestWebsocketNetworkCancel(t *testing.T) {
mbytes := make([]byte, len(tbytes)+len(msg))
copy(mbytes, tbytes)
copy(mbytes[len(tbytes):], msg)
- msgs = append(msgs, sendMessage{data: mbytes, enqueued: time.Now(), peerEnqueued: enqueueTime, ctx: context.Background()})
+ msgs = append(msgs, sendMessage{data: mbytes, enqueued: time.Now(), peerEnqueued: enqueueTime, hash: crypto.Hash(mbytes), ctx: context.Background()})
}
msgs[50].ctx = ctx
@@ -476,7 +476,7 @@ func TestWebsocketNetworkNoAddress(t *testing.T) {
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
counter := newMessageCounter(t, 2)
counterDone := counter.done
- netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
@@ -484,8 +484,8 @@ func TestWebsocketNetworkNoAddress(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")
- netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte("foo"), false, nil)
- netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte("bar"), false, nil)
+ netA.Broadcast(context.Background(), protocol.TxnTag, []byte("foo"), false, nil)
+ netA.Broadcast(context.Background(), protocol.TxnTag, []byte("bar"), false, nil)
select {
case <-counterDone:
@@ -508,7 +508,7 @@ func lineNetwork(t *testing.T, numNodes int) (nodes []*WebsocketNetwork, counter
addrPrev, postListen := nodes[i-1].Address()
require.True(t, postListen)
nodes[i].phonebook.ReplacePeerList([]string{addrPrev}, "default", PhoneBookEntryRelayRole)
- nodes[i].RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: &counters[i]}})
+ nodes[i].RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: &counters[i]}})
}
nodes[i].Start()
counters[i].t = t
@@ -565,7 +565,7 @@ func TestLineNetwork(t *testing.T) {
sendTime := time.Now().UnixNano()
var timeblob [8]byte
binary.LittleEndian.PutUint64(timeblob[:], uint64(sendTime))
- nodes[0].Broadcast(context.Background(), protocol.AgreementVoteTag, timeblob[:], true, nil)
+ nodes[0].Broadcast(context.Background(), protocol.TxnTag, timeblob[:], true, nil)
}
select {
case <-counterDone:
@@ -792,124 +792,71 @@ func avgSendBufferHighPrioLength(wn *WebsocketNetwork) float64 {
return float64(sum) / float64(len(wn.peers))
}
-// TestSlowOutboundPeer tests what happens when one outbound peer is slow and the rest are fine.
+// TestSlowOutboundPeer tests what happens when one outbound peer is slow and the rest are fine. Current logic is to disconnect the one slow peer when its outbound channel is full.
+//
+// This is a deeply invasive test that reaches into the guts of WebsocketNetwork and wsPeer. If the implementation chainges consider throwing away or totally reimplementing this test.
func TestSlowOutboundPeer(t *testing.T) {
partitiontest.PartitionTest(t)
- nodeA := makeTestWebsocketNode(t)
- nodeA.config.GossipFanout = 0
- nodeA.Start()
- defer nodeA.Stop()
-
- addrA, postListenA := nodeA.Address()
- require.True(t, postListenA)
-
- nodeB := makeTestWebsocketNode(t)
- nodeB.config.GossipFanout = 0
- nodeB.Start()
- defer nodeB.Stop()
-
- addrB, postListenB := nodeB.Address()
- require.True(t, postListenB)
-
+ t.Skip() // todo - update this test to reflect the new implementation.
+ xtag := protocol.ProposalPayloadTag
node := makeTestWebsocketNode(t)
- node.config.GossipFanout = 2
- dl := eventsDetailsLogger{Logger: logging.TestingLog(t), eventReceived: make(chan interface{}, 100), eventIdentifier: telemetryspec.DisconnectPeerEvent}
- node.log = dl
-
- node.phonebook.ReplacePeerList([]string{addrA, addrB}, "default", PhoneBookEntryRelayRole)
- node.Start()
- defer node.Stop()
-
- msg := make([]byte, 100)
- rand.Read(msg)
-
- muHandleA := deadlock.Mutex{}
- numHandledA := 0
- waitMessageArriveHandlerA := func(msg IncomingMessage) (out OutgoingMessage) {
- muHandleA.Lock()
- defer muHandleA.Unlock()
- numHandledA++
- return
+ destPeers := make([]wsPeer, 5)
+ for i := range destPeers {
+ destPeers[i].closing = make(chan struct{})
+ destPeers[i].net = node
+ destPeers[i].sendBufferHighPrio = make(chan sendMessages, sendBufferLength)
+ destPeers[i].sendBufferBulk = make(chan sendMessages, sendBufferLength)
+ destPeers[i].conn = &nopConnSingleton
+ destPeers[i].rootURL = fmt.Sprintf("fake %d", i)
+ node.addPeer(&destPeers[i])
}
- nodeA.RegisterHandlers([]TaggedMessageHandler{
- {
- Tag: protocol.AgreementVoteTag,
- MessageHandler: HandlerFunc(waitMessageArriveHandlerA),
- }})
-
- muHandleB := deadlock.Mutex{}
- numHandledB := 0
- waitMessageArriveHandlerB := func(msg IncomingMessage) (out OutgoingMessage) {
- muHandleB.Lock()
- defer muHandleB.Unlock()
- numHandledB++
- return
+ node.Start()
+ tctx, cf := context.WithTimeout(context.Background(), 5*time.Second)
+ for i := 0; i < sendBufferLength; i++ {
+ t.Logf("broadcast %d", i)
+ sent := node.Broadcast(tctx, xtag, []byte{byte(i)}, true, nil)
+ require.NoError(t, sent)
}
- nodeB.RegisterHandlers([]TaggedMessageHandler{
- {
- Tag: protocol.AgreementVoteTag,
- MessageHandler: HandlerFunc(waitMessageArriveHandlerB),
- }})
-
- readyTimeout := time.NewTimer(2 * time.Second)
- waitReady(t, node, readyTimeout.C)
- require.Equal(t, 2, len(node.peers))
-
- callback := func(enqueued bool, sequenceNumber uint64) error {
- time.Sleep(2 * maxMessageQueueDuration)
- return nil
+ cf()
+ ok := false
+ for i := 0; i < 10; i++ {
+ time.Sleep(time.Millisecond)
+ aoql := avgSendBufferHighPrioLength(node)
+ if aoql == sendBufferLength {
+ ok = true
+ break
+ }
+ t.Logf("node.avgOutboundQueueLength() %f", aoql)
}
-
- rand.Read(msg)
- x := 0
-MAINLOOP:
- for ; x < 1000; x++ {
- select {
- case eventDetails := <-dl.eventReceived:
- switch disconnectPeerEventDetails := eventDetails.(type) {
- case telemetryspec.DisconnectPeerEventDetails:
- require.Equal(t, string(disconnectSlowConn), disconnectPeerEventDetails.Reason)
- default:
- require.FailNow(t, "Unexpected event was send : %v", eventDetails)
- }
- break MAINLOOP
- default:
+ require.True(t, ok)
+ for p := range destPeers {
+ if p == 0 {
+ continue
}
-
- node.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
- if x == 0 {
- node.peers[0].Unicast(context.Background(), msg, protocol.AgreementVoteTag, callback)
+ for j := 0; j < sendBufferLength; j++ {
+ // throw away a message as if sent
+ <-destPeers[p].sendBufferHighPrio
}
- time.Sleep(200 * time.Millisecond)
}
+ aoql := avgSendBufferHighPrioLength(node)
+ if aoql > (sendBufferLength / 2) {
+ t.Fatalf("avgOutboundQueueLength=%f wanted <%f", aoql, sendBufferLength/2.0)
+ return
+ }
+ // it shouldn't have closed for just sitting on the limit of full
+ require.False(t, peerIsClosed(&destPeers[0]))
- require.Less(t, x, 1000)
+ // function context just to contain defer cf()
+ func() {
+ timeout, cf := context.WithTimeout(context.Background(), time.Second)
+ defer cf()
+ sent := node.Broadcast(timeout, xtag, []byte{byte(42)}, true, nil)
+ assert.NoError(t, sent)
+ }()
- maxNumHandled := 0
- minNumHandled := 0
- for i := 0; i < 10; i++ {
- muHandleA.Lock()
- a := numHandledA
- muHandleA.Unlock()
-
- muHandleB.Lock()
- b := numHandledB
- muHandleB.Unlock()
-
- maxNumHandled = b
- minNumHandled = a
- if maxNumHandled < a {
- maxNumHandled = a
- minNumHandled = b
- }
- if maxNumHandled == x {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- require.Equal(t, maxNumHandled, x)
- require.Less(t, minNumHandled, x/2)
+ // and now with the rest of the peers well and this one slow, we closed the slow one
+ require.True(t, peerIsClosed(&destPeers[0]))
}
func makeTestFilterWebsocketNode(t *testing.T, nodename string) *WebsocketNetwork {
@@ -1097,7 +1044,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) {
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
returns := make(chan uint64, 100)
bhandler := benchmarkHandler{returns}
- netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: &bhandler}})
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: &bhandler}})
readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
@@ -1120,7 +1067,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) {
}
msg := make([]byte, msgSize)
binary.LittleEndian.PutUint64(msg, uint64(i))
- err := netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
+ err := netA.Broadcast(context.Background(), protocol.TxnTag, msg, true, nil)
if err != nil {
t.Errorf("error on broadcast: %v", err)
return
@@ -1222,7 +1169,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
netB.config.GossipFanout = 1
netB.config.NetAddress = ""
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
- netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counterB}})
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterB}})
netB.Start()
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
@@ -1236,7 +1183,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
netC.config.GossipFanout = 1
netC.config.NetAddress = ""
netC.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
- netC.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counterC}})
+ netC.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterC}})
netC.Start()
defer func() { t.Log("stopping C"); netC.Stop(); t.Log("C done") }()
@@ -1260,7 +1207,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
waitReady(t, netA, time.After(time.Second))
firstPeer := netA.peers[0]
- netA.Broadcast(context.Background(), protocol.AgreementVoteTag, nil, true, nil)
+ netA.Broadcast(context.Background(), protocol.TxnTag, nil, true, nil)
failed := false
select {
@@ -1450,7 +1397,7 @@ func TestDelayedMessageDrop(t *testing.T) {
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
counter := newMessageCounter(t, 5)
counterDone := counter.done
- netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
@@ -1458,7 +1405,7 @@ func TestDelayedMessageDrop(t *testing.T) {
currentTime := time.Now()
for i := 0; i < 10; i++ {
- err := netA.broadcastWithTimestamp(protocol.AgreementVoteTag, []byte("foo"), currentTime.Add(time.Hour*time.Duration(i-5)))
+ err := netA.broadcastWithTimestamp(protocol.TxnTag, []byte("foo"), currentTime.Add(time.Hour*time.Duration(i-5)))
require.NoErrorf(t, err, "No error was expected")
}
@@ -1553,7 +1500,7 @@ func TestForceMessageRelaying(t *testing.T) {
counter := newMessageCounter(t, 5)
counterDone := counter.done
- netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
netA.Start()
addrA, postListen := netA.Address()
require.Truef(t, postListen, "Listening network failed to start")
@@ -1580,9 +1527,9 @@ func TestForceMessageRelaying(t *testing.T) {
// send 5 messages from both netB and netC to netA
for i := 0; i < 5; i++ {
- err := netB.Relay(context.Background(), protocol.AgreementVoteTag, []byte{1, 2, 3}, true, nil)
+ err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
- err = netC.Relay(context.Background(), protocol.AgreementVoteTag, []byte{1, 2, 3}, true, nil)
+ err = netC.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
}
@@ -1598,13 +1545,13 @@ func TestForceMessageRelaying(t *testing.T) {
netA.ClearHandlers()
counter = newMessageCounter(t, 10)
counterDone = counter.done
- netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
+ netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
// hack the relayMessages on the netB so that it would start sending messages.
netB.relayMessages = true
// send additional 10 messages from netB
for i := 0; i < 10; i++ {
- err := netB.Relay(context.Background(), protocol.AgreementVoteTag, []byte{1, 2, 3}, true, nil)
+ err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
}
@@ -1827,7 +1774,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
// send 5 messages of few types.
for i := 0; i < 5; i++ {
netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil)
- netA.Broadcast(context.Background(), protocol.CompactCertSigTag, []byte{0, 1, 2, 3, 4}, true, nil)
+ netA.Broadcast(context.Background(), protocol.TxnTag, []byte{0, 1, 2, 3, 4}, true, nil)
netA.Broadcast(context.Background(), protocol.ProposalPayloadTag, []byte{0, 1, 2, 3, 4}, true, nil)
netA.Broadcast(context.Background(), protocol.VoteBundleTag, []byte{0, 1, 2, 3, 4}, true, nil)
}
@@ -1925,7 +1872,7 @@ func TestWebsocketDisconnection(t *testing.T) {
case eventDetails := <-dl.eventReceived:
switch disconnectPeerEventDetails := eventDetails.(type) {
case telemetryspec.DisconnectPeerEventDetails:
- require.Equal(t, string(disconnectRequestReceived), disconnectPeerEventDetails.Reason)
+ require.Equal(t, disconnectPeerEventDetails.Reason, string(disconnectRequestReceived))
default:
require.FailNow(t, "Unexpected event was send : %v", eventDetails)
}
@@ -2061,7 +2008,7 @@ func BenchmarkVariableTransactionMessageBlockSizes(t *testing.B) {
// register all the handlers.
taggedHandlersA := []TaggedMessageHandler{
{
- Tag: protocol.AgreementVoteTag,
+ Tag: protocol.TxnTag,
MessageHandler: HandlerFunc(msgHandlerA),
},
}
@@ -2085,7 +2032,7 @@ func BenchmarkVariableTransactionMessageBlockSizes(t *testing.B) {
t.ResetTimer()
startTime := time.Now()
for i := 0; i < t.N/txnCount; i++ {
- netB.Broadcast(context.Background(), protocol.AgreementVoteTag, dataBuffer, true, nil)
+ netB.Broadcast(context.Background(), protocol.TxnTag, dataBuffer, true, nil)
<-msgProcessed
}
deltaTime := time.Now().Sub(startTime)