diff options
Diffstat (limited to 'network/wsNetwork_test.go')
-rw-r--r-- | network/wsNetwork_test.go | 225 |
1 files changed, 86 insertions, 139 deletions
diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 1a5a3d427..74c690241 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -238,7 +238,7 @@ func TestWebsocketNetworkBasic(t *testing.T) { defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() counter := newMessageCounter(t, 2) counterDone := counter.done - netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) readyTimeout := time.NewTimer(2 * time.Second) waitReady(t, netA, readyTimeout.C) @@ -246,8 +246,8 @@ func TestWebsocketNetworkBasic(t *testing.T) { waitReady(t, netB, readyTimeout.C) t.Log("b ready") - netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte("foo"), false, nil) - netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte("bar"), false, nil) + netA.Broadcast(context.Background(), protocol.TxnTag, []byte("foo"), false, nil) + netA.Broadcast(context.Background(), protocol.TxnTag, []byte("bar"), false, nil) select { case <-counterDone: @@ -274,7 +274,7 @@ func TestWebsocketNetworkUnicast(t *testing.T) { defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() counter := newMessageCounter(t, 2) counterDone := counter.done - netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) readyTimeout := time.NewTimer(2 * time.Second) waitReady(t, netA, readyTimeout.C) @@ -285,9 +285,9 @@ func TestWebsocketNetworkUnicast(t *testing.T) { require.Equal(t, 1, len(netA.peers)) require.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn))) peerB := netA.peers[0] - err := peerB.Unicast(context.Background(), []byte("foo"), protocol.AgreementVoteTag, nil) + err := peerB.Unicast(context.Background(), []byte("foo"), protocol.TxnTag) assert.NoError(t, err) - err = peerB.Unicast(context.Background(), []byte("bar"), protocol.AgreementVoteTag, nil) + err = peerB.Unicast(context.Background(), []byte("bar"), protocol.TxnTag) assert.NoError(t, err) select { @@ -353,7 +353,7 @@ func TestWebsocketNetworkArray(t *testing.T) { defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() counter := newMessageCounter(t, 3) counterDone := counter.done - netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) readyTimeout := time.NewTimer(2 * time.Second) waitReady(t, netA, readyTimeout.C) @@ -361,7 +361,7 @@ func TestWebsocketNetworkArray(t *testing.T) { waitReady(t, netB, readyTimeout.C) t.Log("b ready") - tags := []protocol.Tag{protocol.AgreementVoteTag, protocol.AgreementVoteTag, protocol.AgreementVoteTag} + tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag} data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")} netA.BroadcastArray(context.Background(), tags, data, false, nil) @@ -390,7 +390,7 @@ func TestWebsocketNetworkCancel(t *testing.T) { defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() counter := newMessageCounter(t, 100) counterDone := counter.done - netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) readyTimeout := time.NewTimer(2 * time.Second) waitReady(t, netA, readyTimeout.C) @@ -401,8 +401,8 @@ func TestWebsocketNetworkCancel(t *testing.T) { tags := make([]protocol.Tag, 100) data := make([][]byte, 100) for i := range data { - tags[i] = protocol.AgreementVoteTag - data[i] = []byte(fmt.Sprintf("%d", i)) + tags[i] = protocol.TxnTag + data[i] = []byte(string(rune(i))) } ctx, cancel := context.WithCancel(context.Background()) @@ -438,7 +438,7 @@ func TestWebsocketNetworkCancel(t *testing.T) { mbytes := make([]byte, len(tbytes)+len(msg)) copy(mbytes, tbytes) copy(mbytes[len(tbytes):], msg) - msgs = append(msgs, sendMessage{data: mbytes, enqueued: time.Now(), peerEnqueued: enqueueTime, ctx: context.Background()}) + msgs = append(msgs, sendMessage{data: mbytes, enqueued: time.Now(), peerEnqueued: enqueueTime, hash: crypto.Hash(mbytes), ctx: context.Background()}) } msgs[50].ctx = ctx @@ -476,7 +476,7 @@ func TestWebsocketNetworkNoAddress(t *testing.T) { defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() counter := newMessageCounter(t, 2) counterDone := counter.done - netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) readyTimeout := time.NewTimer(2 * time.Second) waitReady(t, netA, readyTimeout.C) @@ -484,8 +484,8 @@ func TestWebsocketNetworkNoAddress(t *testing.T) { waitReady(t, netB, readyTimeout.C) t.Log("b ready") - netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte("foo"), false, nil) - netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte("bar"), false, nil) + netA.Broadcast(context.Background(), protocol.TxnTag, []byte("foo"), false, nil) + netA.Broadcast(context.Background(), protocol.TxnTag, []byte("bar"), false, nil) select { case <-counterDone: @@ -508,7 +508,7 @@ func lineNetwork(t *testing.T, numNodes int) (nodes []*WebsocketNetwork, counter addrPrev, postListen := nodes[i-1].Address() require.True(t, postListen) nodes[i].phonebook.ReplacePeerList([]string{addrPrev}, "default", PhoneBookEntryRelayRole) - nodes[i].RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: &counters[i]}}) + nodes[i].RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: &counters[i]}}) } nodes[i].Start() counters[i].t = t @@ -565,7 +565,7 @@ func TestLineNetwork(t *testing.T) { sendTime := time.Now().UnixNano() var timeblob [8]byte binary.LittleEndian.PutUint64(timeblob[:], uint64(sendTime)) - nodes[0].Broadcast(context.Background(), protocol.AgreementVoteTag, timeblob[:], true, nil) + nodes[0].Broadcast(context.Background(), protocol.TxnTag, timeblob[:], true, nil) } select { case <-counterDone: @@ -792,124 +792,71 @@ func avgSendBufferHighPrioLength(wn *WebsocketNetwork) float64 { return float64(sum) / float64(len(wn.peers)) } -// TestSlowOutboundPeer tests what happens when one outbound peer is slow and the rest are fine. +// TestSlowOutboundPeer tests what happens when one outbound peer is slow and the rest are fine. Current logic is to disconnect the one slow peer when its outbound channel is full. +// +// This is a deeply invasive test that reaches into the guts of WebsocketNetwork and wsPeer. If the implementation chainges consider throwing away or totally reimplementing this test. func TestSlowOutboundPeer(t *testing.T) { partitiontest.PartitionTest(t) - nodeA := makeTestWebsocketNode(t) - nodeA.config.GossipFanout = 0 - nodeA.Start() - defer nodeA.Stop() - - addrA, postListenA := nodeA.Address() - require.True(t, postListenA) - - nodeB := makeTestWebsocketNode(t) - nodeB.config.GossipFanout = 0 - nodeB.Start() - defer nodeB.Stop() - - addrB, postListenB := nodeB.Address() - require.True(t, postListenB) - + t.Skip() // todo - update this test to reflect the new implementation. + xtag := protocol.ProposalPayloadTag node := makeTestWebsocketNode(t) - node.config.GossipFanout = 2 - dl := eventsDetailsLogger{Logger: logging.TestingLog(t), eventReceived: make(chan interface{}, 100), eventIdentifier: telemetryspec.DisconnectPeerEvent} - node.log = dl - - node.phonebook.ReplacePeerList([]string{addrA, addrB}, "default", PhoneBookEntryRelayRole) - node.Start() - defer node.Stop() - - msg := make([]byte, 100) - rand.Read(msg) - - muHandleA := deadlock.Mutex{} - numHandledA := 0 - waitMessageArriveHandlerA := func(msg IncomingMessage) (out OutgoingMessage) { - muHandleA.Lock() - defer muHandleA.Unlock() - numHandledA++ - return + destPeers := make([]wsPeer, 5) + for i := range destPeers { + destPeers[i].closing = make(chan struct{}) + destPeers[i].net = node + destPeers[i].sendBufferHighPrio = make(chan sendMessages, sendBufferLength) + destPeers[i].sendBufferBulk = make(chan sendMessages, sendBufferLength) + destPeers[i].conn = &nopConnSingleton + destPeers[i].rootURL = fmt.Sprintf("fake %d", i) + node.addPeer(&destPeers[i]) } - nodeA.RegisterHandlers([]TaggedMessageHandler{ - { - Tag: protocol.AgreementVoteTag, - MessageHandler: HandlerFunc(waitMessageArriveHandlerA), - }}) - - muHandleB := deadlock.Mutex{} - numHandledB := 0 - waitMessageArriveHandlerB := func(msg IncomingMessage) (out OutgoingMessage) { - muHandleB.Lock() - defer muHandleB.Unlock() - numHandledB++ - return + node.Start() + tctx, cf := context.WithTimeout(context.Background(), 5*time.Second) + for i := 0; i < sendBufferLength; i++ { + t.Logf("broadcast %d", i) + sent := node.Broadcast(tctx, xtag, []byte{byte(i)}, true, nil) + require.NoError(t, sent) } - nodeB.RegisterHandlers([]TaggedMessageHandler{ - { - Tag: protocol.AgreementVoteTag, - MessageHandler: HandlerFunc(waitMessageArriveHandlerB), - }}) - - readyTimeout := time.NewTimer(2 * time.Second) - waitReady(t, node, readyTimeout.C) - require.Equal(t, 2, len(node.peers)) - - callback := func(enqueued bool, sequenceNumber uint64) error { - time.Sleep(2 * maxMessageQueueDuration) - return nil + cf() + ok := false + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond) + aoql := avgSendBufferHighPrioLength(node) + if aoql == sendBufferLength { + ok = true + break + } + t.Logf("node.avgOutboundQueueLength() %f", aoql) } - - rand.Read(msg) - x := 0 -MAINLOOP: - for ; x < 1000; x++ { - select { - case eventDetails := <-dl.eventReceived: - switch disconnectPeerEventDetails := eventDetails.(type) { - case telemetryspec.DisconnectPeerEventDetails: - require.Equal(t, string(disconnectSlowConn), disconnectPeerEventDetails.Reason) - default: - require.FailNow(t, "Unexpected event was send : %v", eventDetails) - } - break MAINLOOP - default: + require.True(t, ok) + for p := range destPeers { + if p == 0 { + continue } - - node.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) - if x == 0 { - node.peers[0].Unicast(context.Background(), msg, protocol.AgreementVoteTag, callback) + for j := 0; j < sendBufferLength; j++ { + // throw away a message as if sent + <-destPeers[p].sendBufferHighPrio } - time.Sleep(200 * time.Millisecond) } + aoql := avgSendBufferHighPrioLength(node) + if aoql > (sendBufferLength / 2) { + t.Fatalf("avgOutboundQueueLength=%f wanted <%f", aoql, sendBufferLength/2.0) + return + } + // it shouldn't have closed for just sitting on the limit of full + require.False(t, peerIsClosed(&destPeers[0])) - require.Less(t, x, 1000) + // function context just to contain defer cf() + func() { + timeout, cf := context.WithTimeout(context.Background(), time.Second) + defer cf() + sent := node.Broadcast(timeout, xtag, []byte{byte(42)}, true, nil) + assert.NoError(t, sent) + }() - maxNumHandled := 0 - minNumHandled := 0 - for i := 0; i < 10; i++ { - muHandleA.Lock() - a := numHandledA - muHandleA.Unlock() - - muHandleB.Lock() - b := numHandledB - muHandleB.Unlock() - - maxNumHandled = b - minNumHandled = a - if maxNumHandled < a { - maxNumHandled = a - minNumHandled = b - } - if maxNumHandled == x { - break - } - time.Sleep(100 * time.Millisecond) - } - require.Equal(t, maxNumHandled, x) - require.Less(t, minNumHandled, x/2) + // and now with the rest of the peers well and this one slow, we closed the slow one + require.True(t, peerIsClosed(&destPeers[0])) } func makeTestFilterWebsocketNode(t *testing.T, nodename string) *WebsocketNetwork { @@ -1097,7 +1044,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) { defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() returns := make(chan uint64, 100) bhandler := benchmarkHandler{returns} - netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: &bhandler}}) + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: &bhandler}}) readyTimeout := time.NewTimer(2 * time.Second) waitReady(t, netA, readyTimeout.C) @@ -1120,7 +1067,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) { } msg := make([]byte, msgSize) binary.LittleEndian.PutUint64(msg, uint64(i)) - err := netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) + err := netA.Broadcast(context.Background(), protocol.TxnTag, msg, true, nil) if err != nil { t.Errorf("error on broadcast: %v", err) return @@ -1222,7 +1169,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) { netB.config.GossipFanout = 1 netB.config.NetAddress = "" netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) - netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counterB}}) + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterB}}) netB.Start() defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() @@ -1236,7 +1183,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) { netC.config.GossipFanout = 1 netC.config.NetAddress = "" netC.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) - netC.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counterC}}) + netC.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterC}}) netC.Start() defer func() { t.Log("stopping C"); netC.Stop(); t.Log("C done") }() @@ -1260,7 +1207,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) { waitReady(t, netA, time.After(time.Second)) firstPeer := netA.peers[0] - netA.Broadcast(context.Background(), protocol.AgreementVoteTag, nil, true, nil) + netA.Broadcast(context.Background(), protocol.TxnTag, nil, true, nil) failed := false select { @@ -1450,7 +1397,7 @@ func TestDelayedMessageDrop(t *testing.T) { defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() counter := newMessageCounter(t, 5) counterDone := counter.done - netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) readyTimeout := time.NewTimer(2 * time.Second) waitReady(t, netA, readyTimeout.C) @@ -1458,7 +1405,7 @@ func TestDelayedMessageDrop(t *testing.T) { currentTime := time.Now() for i := 0; i < 10; i++ { - err := netA.broadcastWithTimestamp(protocol.AgreementVoteTag, []byte("foo"), currentTime.Add(time.Hour*time.Duration(i-5))) + err := netA.broadcastWithTimestamp(protocol.TxnTag, []byte("foo"), currentTime.Add(time.Hour*time.Duration(i-5))) require.NoErrorf(t, err, "No error was expected") } @@ -1553,7 +1500,7 @@ func TestForceMessageRelaying(t *testing.T) { counter := newMessageCounter(t, 5) counterDone := counter.done - netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) netA.Start() addrA, postListen := netA.Address() require.Truef(t, postListen, "Listening network failed to start") @@ -1580,9 +1527,9 @@ func TestForceMessageRelaying(t *testing.T) { // send 5 messages from both netB and netC to netA for i := 0; i < 5; i++ { - err := netB.Relay(context.Background(), protocol.AgreementVoteTag, []byte{1, 2, 3}, true, nil) + err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3}, true, nil) require.NoError(t, err) - err = netC.Relay(context.Background(), protocol.AgreementVoteTag, []byte{1, 2, 3}, true, nil) + err = netC.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3}, true, nil) require.NoError(t, err) } @@ -1598,13 +1545,13 @@ func TestForceMessageRelaying(t *testing.T) { netA.ClearHandlers() counter = newMessageCounter(t, 10) counterDone = counter.done - netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) + netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) // hack the relayMessages on the netB so that it would start sending messages. netB.relayMessages = true // send additional 10 messages from netB for i := 0; i < 10; i++ { - err := netB.Relay(context.Background(), protocol.AgreementVoteTag, []byte{1, 2, 3}, true, nil) + err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3}, true, nil) require.NoError(t, err) } @@ -1827,7 +1774,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) { // send 5 messages of few types. for i := 0; i < 5; i++ { netA.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil) - netA.Broadcast(context.Background(), protocol.CompactCertSigTag, []byte{0, 1, 2, 3, 4}, true, nil) + netA.Broadcast(context.Background(), protocol.TxnTag, []byte{0, 1, 2, 3, 4}, true, nil) netA.Broadcast(context.Background(), protocol.ProposalPayloadTag, []byte{0, 1, 2, 3, 4}, true, nil) netA.Broadcast(context.Background(), protocol.VoteBundleTag, []byte{0, 1, 2, 3, 4}, true, nil) } @@ -1925,7 +1872,7 @@ func TestWebsocketDisconnection(t *testing.T) { case eventDetails := <-dl.eventReceived: switch disconnectPeerEventDetails := eventDetails.(type) { case telemetryspec.DisconnectPeerEventDetails: - require.Equal(t, string(disconnectRequestReceived), disconnectPeerEventDetails.Reason) + require.Equal(t, disconnectPeerEventDetails.Reason, string(disconnectRequestReceived)) default: require.FailNow(t, "Unexpected event was send : %v", eventDetails) } @@ -2061,7 +2008,7 @@ func BenchmarkVariableTransactionMessageBlockSizes(t *testing.B) { // register all the handlers. taggedHandlersA := []TaggedMessageHandler{ { - Tag: protocol.AgreementVoteTag, + Tag: protocol.TxnTag, MessageHandler: HandlerFunc(msgHandlerA), }, } @@ -2085,7 +2032,7 @@ func BenchmarkVariableTransactionMessageBlockSizes(t *testing.B) { t.ResetTimer() startTime := time.Now() for i := 0; i < t.N/txnCount; i++ { - netB.Broadcast(context.Background(), protocol.AgreementVoteTag, dataBuffer, true, nil) + netB.Broadcast(context.Background(), protocol.TxnTag, dataBuffer, true, nil) <-msgProcessed } deltaTime := time.Now().Sub(startTime) |