summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Lee <64482439+algojohnlee@users.noreply.github.com>2022-06-10 16:31:49 -0400
committerGitHub <noreply@github.com>2022-06-10 16:31:49 -0400
commitccb2ec3cc82f3e6c203b7c9eddc729d9a8ece24e (patch)
treea345b2789d5e1d596b0e979dac3204bb09359325
parentadb47f944bcc6b6fcddd85c963220a58d2623cef (diff)
parent53c3684fd437316b484ef2ba8e5ff88c5f945ee5 (diff)
Merge pull request #4116 from onetechnical/relbeta3.7.2v3.7.2-beta
go-algorand v3.7.2-beta
-rw-r--r--buildnumber.dat2
-rw-r--r--network/wsNetwork.go42
-rw-r--r--network/wsNetwork_test.go180
3 files changed, 132 insertions, 92 deletions
diff --git a/buildnumber.dat b/buildnumber.dat
index d00491fd7..0cfbf0888 100644
--- a/buildnumber.dat
+++ b/buildnumber.dat
@@ -1 +1 @@
-1
+2
diff --git a/network/wsNetwork.go b/network/wsNetwork.go
index 68394a924..1a4070fd1 100644
--- a/network/wsNetwork.go
+++ b/network/wsNetwork.go
@@ -417,8 +417,8 @@ type WebsocketNetwork struct {
// messagesOfInterestMu protects messagesOfInterest and ensures
// that messagesOfInterestEnc does not change once it is set during
// network start.
- messagesOfInterestMu deadlock.Mutex
- messagesOfInterestCond *sync.Cond
+ messagesOfInterestMu deadlock.Mutex
+ messagesOfInterestRefresh chan struct{}
// peersConnectivityCheckTicker is the timer for testing that all the connected peers
// are still transmitting or receiving information. The channel produced by this ticker
@@ -764,7 +764,7 @@ func (wn *WebsocketNetwork) setup() {
SupportedProtocolVersions = []string{wn.config.NetworkProtocolVersion}
}
- wn.messagesOfInterestCond = sync.NewCond(&wn.messagesOfInterestMu)
+ wn.messagesOfInterestRefresh = make(chan struct{}, 2)
wn.messagesOfInterestGeneration = 1 // something nonzero so that any new wsPeer needs updating
if wn.relayMessages {
wn.RegisterMessageInterest(protocol.CompactCertSigTag)
@@ -1178,7 +1178,7 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf
if messagesOfInterestEnc != nil {
peer.sendMessagesOfInterest(messagesOfInterestGeneration, messagesOfInterestEnc)
} else {
- wn.log.Infof("msgOfInterest Enc=nil")
+ wn.log.Infof("msgOfInterest Enc=nil, MOIGen=%d", messagesOfInterestGeneration)
}
}
}
@@ -1726,14 +1726,10 @@ func (wn *WebsocketNetwork) OnNetworkAdvance() {
defer wn.lastNetworkAdvanceMu.Unlock()
wn.lastNetworkAdvance = time.Now().UTC()
if wn.nodeInfo != nil && !wn.relayMessages && !wn.config.ForceFetchTransactions {
- // if we're not a relay, and not participating, we don't need txn pool
- wantTXGossip := wn.nodeInfo.IsParticipating()
- if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) {
- wn.RegisterMessageInterest(protocol.TxnTag)
- wn.wantTXGossip = wantTXGossipYes
- } else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) {
- wn.DeregisterMessageInterest(protocol.TxnTag)
- wn.wantTXGossip = wantTXGossipNo
+ select {
+ case wn.messagesOfInterestRefresh <- struct{}{}:
+ default:
+ // if the notify chan is full, it will get around to updating the latest when it actually runs
}
}
}
@@ -2350,18 +2346,24 @@ func (wn *WebsocketNetwork) updateMessagesOfInterestEnc() {
wn.messagesOfInterestEnc = MarshallMessageOfInterestMap(wn.messagesOfInterest)
wn.messagesOfInterestEncoded = true
atomic.AddUint32(&wn.messagesOfInterestGeneration, 1)
- wn.messagesOfInterestCond.Broadcast()
+ var peers []*wsPeer
+ peers, _ = wn.peerSnapshot(peers)
+ for _, peer := range peers {
+ wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc)
+ }
}
func (wn *WebsocketNetwork) postMessagesOfInterestThread() {
- var peers []*wsPeer
- wn.messagesOfInterestMu.Lock()
- defer wn.messagesOfInterestMu.Unlock()
for {
- wn.messagesOfInterestCond.Wait()
- peers, _ = wn.peerSnapshot(peers)
- for _, peer := range peers {
- wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc)
+ <-wn.messagesOfInterestRefresh
+ // if we're not a relay, and not participating, we don't need txn pool
+ wantTXGossip := wn.nodeInfo.IsParticipating()
+ if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) {
+ wn.RegisterMessageInterest(protocol.TxnTag)
+ atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipYes)
+ } else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) {
+ wn.DeregisterMessageInterest(protocol.TxnTag)
+ atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipNo)
}
}
}
diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go
index 02294f0c0..424586f01 100644
--- a/network/wsNetwork_test.go
+++ b/network/wsNetwork_test.go
@@ -219,6 +219,13 @@ func waitReady(t testing.TB, wn *WebsocketNetwork, timeout <-chan time.Time) boo
}
}
+func netStop(t testing.TB, wn *WebsocketNetwork, name string) {
+ t.Logf("stopping %s", name)
+ wn.Stop()
+ time.Sleep(time.Millisecond) // Stop is imperfect and some worker threads can log an error after Stop and that causes a testing error
+ t.Logf("%s done", name)
+}
+
// Set up two nodes, test that a.Broadcast is received by B
func TestWebsocketNetworkBasic(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -226,7 +233,7 @@ func TestWebsocketNetworkBasic(t *testing.T) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
@@ -234,7 +241,7 @@ func TestWebsocketNetworkBasic(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
@@ -262,7 +269,7 @@ func TestWebsocketNetworkUnicast(t *testing.T) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
@@ -270,7 +277,7 @@ func TestWebsocketNetworkUnicast(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
@@ -303,7 +310,7 @@ func TestWebsocketPeerData(t *testing.T) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
@@ -311,7 +318,7 @@ func TestWebsocketPeerData(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
@@ -341,7 +348,7 @@ func TestWebsocketNetworkArray(t *testing.T) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
@@ -349,7 +356,7 @@ func TestWebsocketNetworkArray(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counter := newMessageCounter(t, 3)
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
@@ -378,7 +385,7 @@ func TestWebsocketNetworkCancel(t *testing.T) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
@@ -386,7 +393,7 @@ func TestWebsocketNetworkCancel(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counter := newMessageCounter(t, 100)
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
@@ -461,7 +468,7 @@ func TestWebsocketNetworkNoAddress(t *testing.T) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
noAddressConfig := defaultConfig
noAddressConfig.NetAddress = ""
@@ -472,7 +479,7 @@ func TestWebsocketNetworkNoAddress(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
@@ -886,7 +893,7 @@ func TestDupFilter(t *testing.T) {
netA := makeTestFilterWebsocketNode(t, "a")
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestFilterWebsocketNode(t, "b")
netB.config.GossipFanout = 2
addrA, postListen := netA.Address()
@@ -894,7 +901,7 @@ func TestDupFilter(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counter := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})}
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
debugTag2 := protocol.ProposalPayloadTag
@@ -1032,7 +1039,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
@@ -1040,7 +1047,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
returns := make(chan uint64, 100)
bhandler := benchmarkHandler{returns}
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: &bhandler}})
@@ -1109,7 +1116,7 @@ func TestWebsocketNetworkPrio(t *testing.T) {
netA.config.GossipFanout = 1
netA.prioResponseChan = make(chan *wsPeer, 10)
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
prioB := netPrioStub{}
crypto.RandBytes(prioB.addr[:])
@@ -1122,7 +1129,7 @@ func TestWebsocketNetworkPrio(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
// Wait for response message to propagate from B to A
select {
@@ -1154,7 +1161,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
netA.config.GossipFanout = 2
netA.prioResponseChan = make(chan *wsPeer, 10)
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
addrA, postListen := netA.Address()
require.True(t, postListen)
@@ -1170,7 +1177,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterB}})
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counterC := newMessageCounter(t, 1)
counterCdone := counterC.done
@@ -1382,7 +1389,7 @@ func TestDelayedMessageDrop(t *testing.T) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
noAddressConfig := defaultConfig
noAddressConfig.NetAddress = ""
@@ -1393,7 +1400,7 @@ func TestDelayedMessageDrop(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
counter := newMessageCounter(t, 5)
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
@@ -1435,7 +1442,7 @@ func TestSlowPeerDisconnection(t *testing.T) {
netA := wn
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
noAddressConfig := defaultConfig
noAddressConfig.NetAddress = ""
@@ -1446,7 +1453,7 @@ func TestSlowPeerDisconnection(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
@@ -1508,7 +1515,7 @@ func TestForceMessageRelaying(t *testing.T) {
netA := wn
netA.config.GossipFanout = 1
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
counter := newMessageCounter(t, 5)
counterDone := counter.done
@@ -1523,7 +1530,7 @@ func TestForceMessageRelaying(t *testing.T) {
netB.config.GossipFanout = 1
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
noAddressConfig.ForceRelayMessages = true
netC := makeTestWebsocketNodeWithConfig(t, noAddressConfig)
@@ -1672,7 +1679,7 @@ func TestWebsocketNetworkTopicRoundtrip(t *testing.T) {
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
@@ -1680,7 +1687,7 @@ func TestWebsocketNetworkTopicRoundtrip(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
netB.RegisterHandlers([]TaggedMessageHandler{
{
@@ -1729,6 +1736,41 @@ var (
testTags = []protocol.Tag{ft1, ft2, ft3, ft4}
)
+func waitPeerInternalChanQuiet(t *testing.T, netA *WebsocketNetwork) {
+ // okay, but now we need to wait for asynchronous thread within netA to _apply_ the MOI to its peer for netB...
+ timeout := time.Now().Add(100 * time.Millisecond)
+ waiting := true
+ for waiting {
+ time.Sleep(1 * time.Millisecond)
+ peers := netA.GetPeers(PeersConnectedIn)
+ for _, pg := range peers {
+ wp := pg.(*wsPeer)
+ if len(wp.sendBufferHighPrio)+len(wp.sendBufferBulk) == 0 {
+ waiting = false
+ break
+ }
+ }
+ if time.Now().After(timeout) {
+ for _, pg := range peers {
+ wp := pg.(*wsPeer)
+ if len(wp.sendBufferHighPrio)+len(wp.sendBufferBulk) == 0 {
+ t.Fatalf("netA peer buff empty timeout len(high)=%d, len(bulk)=%d", len(wp.sendBufferHighPrio), len(wp.sendBufferBulk))
+ }
+ }
+ }
+ }
+}
+
+func waitForMOIRefreshQuiet(netB *WebsocketNetwork) {
+ for {
+ // wait for async messagesOfInterestRefresh
+ time.Sleep(time.Millisecond)
+ if len(netB.messagesOfInterestRefresh) == 0 {
+ break
+ }
+ }
+}
+
// Set up two nodes, have one of them request a certain message tag mask, and verify the other follow that.
func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -1738,7 +1780,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
netA.config.EnablePingHandler = false
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
netB.config.EnablePingHandler = false
@@ -1747,7 +1789,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
incomingMsgSync := deadlock.Mutex{}
msgCounters := make(map[protocol.Tag]int)
@@ -1802,28 +1844,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
// send another message which we can track, so that we'll know that the first message was delivered.
netB.Broadcast(context.Background(), protocol.VoteBundleTag, []byte{0, 1, 2, 3, 4}, true, nil)
messageFilterArriveWg.Wait()
- // okay, but now we need to wait for asynchronous thread within netA to _apply_ the MOI to its peer for netB...
- timeout := time.Now().Add(100 * time.Millisecond)
- waiting := true
- for waiting {
- time.Sleep(1 * time.Millisecond)
- peers := netA.GetPeers(PeersConnectedIn)
- for _, pg := range peers {
- wp := pg.(*wsPeer)
- if len(wp.sendBufferHighPrio)+len(wp.sendBufferBulk) == 0 {
- waiting = false
- break
- }
- }
- if time.Now().After(timeout) {
- for _, pg := range peers {
- wp := pg.(*wsPeer)
- if len(wp.sendBufferHighPrio)+len(wp.sendBufferBulk) == 0 {
- t.Fatalf("netA peer buff empty timeout len(high)=%d, len(bulk)=%d", len(wp.sendBufferHighPrio), len(wp.sendBufferBulk))
- }
- }
- }
- }
+ waitPeerInternalChanQuiet(t, netA)
messageArriveWg.Add(5 * 2) // we're expecting exactly 10 messages.
// send 5 messages of few types.
@@ -1872,7 +1893,7 @@ func TestWebsocketNetworkTXMessageOfInterestRelay(t *testing.T) {
netA.config.EnablePingHandler = false
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
bConfig := defaultConfig
bConfig.NetAddress = ""
bConfig.ForceRelayMessages = true
@@ -1884,12 +1905,13 @@ func TestWebsocketNetworkTXMessageOfInterestRelay(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
incomingMsgSync := deadlock.Mutex{}
msgCounters := make(map[protocol.Tag]int)
messageArriveWg := sync.WaitGroup{}
msgHandler := func(msg IncomingMessage) (out OutgoingMessage) {
+ t.Logf("A->B %s", msg.Tag)
incomingMsgSync.Lock()
defer incomingMsgSync.Unlock()
msgCounters[msg.Tag] = msgCounters[msg.Tag] + 1
@@ -1923,6 +1945,7 @@ func TestWebsocketNetworkTXMessageOfInterestRelay(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
netB.OnNetworkAdvance()
+ waitForMOIRefreshQuiet(netB)
// send another message which we can track, so that we'll know that the first message was delivered.
netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil)
messageFilterArriveWg.Wait()
@@ -1954,7 +1977,7 @@ func TestWebsocketNetworkTXMessageOfInterestForceTx(t *testing.T) {
netA.config.EnablePingHandler = false
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
bConfig := defaultConfig
bConfig.NetAddress = ""
bConfig.ForceFetchTransactions = true
@@ -1966,12 +1989,13 @@ func TestWebsocketNetworkTXMessageOfInterestForceTx(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
incomingMsgSync := deadlock.Mutex{}
msgCounters := make(map[protocol.Tag]int)
messageArriveWg := sync.WaitGroup{}
msgHandler := func(msg IncomingMessage) (out OutgoingMessage) {
+ t.Logf("A->B %s", msg.Tag)
incomingMsgSync.Lock()
defer incomingMsgSync.Unlock()
msgCounters[msg.Tag] = msgCounters[msg.Tag] + 1
@@ -2005,6 +2029,7 @@ func TestWebsocketNetworkTXMessageOfInterestForceTx(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
netB.OnNetworkAdvance()
+ waitForMOIRefreshQuiet(netB)
// send another message which we can track, so that we'll know that the first message was delivered.
netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil)
messageFilterArriveWg.Wait()
@@ -2034,7 +2059,7 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) {
netA.config.GossipFanout = 1
netA.config.EnablePingHandler = false
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
bConfig := defaultConfig
bConfig.NetAddress = ""
@@ -2046,14 +2071,15 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
require.False(t, netB.relayMessages)
- require.Equal(t, uint32(wantTXGossipUnk), netB.wantTXGossip)
+ require.Equal(t, uint32(wantTXGossipUnk), atomic.LoadUint32(&netB.wantTXGossip))
incomingMsgSync := deadlock.Mutex{}
msgCounters := make(map[protocol.Tag]int)
messageArriveWg := sync.WaitGroup{}
msgHandler := func(msg IncomingMessage) (out OutgoingMessage) {
+ t.Logf("A->B %s", msg.Tag)
incomingMsgSync.Lock()
defer incomingMsgSync.Unlock()
msgCounters[msg.Tag] = msgCounters[msg.Tag] + 1
@@ -2087,12 +2113,18 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
netB.OnNetworkAdvance()
- // TODO: better event driven thing for netB sending new MOI
- time.Sleep(10 * time.Millisecond)
- require.Equal(t, uint32(wantTXGossipNo), netB.wantTXGossip)
+ waitForMOIRefreshQuiet(netB)
+ for i := 0; i < 10; i++ {
+ if atomic.LoadUint32(&netB.wantTXGossip) == uint32(wantTXGossipNo) {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
+ require.Equal(t, uint32(wantTXGossipNo), atomic.LoadUint32(&netB.wantTXGossip))
// send another message which we can track, so that we'll know that the first message was delivered.
netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil)
messageFilterArriveWg.Wait()
+ waitPeerInternalChanQuiet(t, netA)
messageArriveWg.Add(5 * 3) // we're expecting exactly 15 messages.
// send 5 messages of few types.
@@ -2105,7 +2137,7 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) {
// wait until all the expected messages arrive.
messageArriveWg.Wait()
incomingMsgSync.Lock()
- require.Equal(t, 3, len(msgCounters))
+ require.Equal(t, 3, len(msgCounters), msgCounters)
for tag, count := range msgCounters {
if tag == protocol.TxnTag {
require.Equal(t, 0, count)
@@ -2131,7 +2163,7 @@ func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) {
netA.config.GossipFanout = 1
netA.config.EnablePingHandler = false
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
bConfig := defaultConfig
bConfig.NetAddress = ""
@@ -2144,14 +2176,15 @@ func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
require.False(t, netB.relayMessages)
- require.Equal(t, uint32(wantTXGossipUnk), netB.wantTXGossip)
+ require.Equal(t, uint32(wantTXGossipUnk), atomic.LoadUint32(&netB.wantTXGossip))
incomingMsgSync := deadlock.Mutex{}
msgCounters := make(map[protocol.Tag]int)
messageArriveWg := sync.WaitGroup{}
msgHandler := func(msg IncomingMessage) (out OutgoingMessage) {
+ t.Logf("A->B %s", msg.Tag)
incomingMsgSync.Lock()
defer incomingMsgSync.Unlock()
msgCounters[msg.Tag] = msgCounters[msg.Tag] + 1
@@ -2185,9 +2218,14 @@ func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
netB.OnNetworkAdvance()
- // TODO: better event driven thing for netB sending new MOI
- time.Sleep(10 * time.Millisecond)
- require.Equal(t, uint32(wantTXGossipYes), netB.wantTXGossip)
+ waitForMOIRefreshQuiet(netB)
+ for i := 0; i < 10; i++ {
+ if atomic.LoadUint32(&netB.wantTXGossip) == uint32(wantTXGossipYes) {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
+ require.Equal(t, uint32(wantTXGossipYes), atomic.LoadUint32(&netB.wantTXGossip))
// send another message which we can track, so that we'll know that the first message was delivered.
netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil)
messageFilterArriveWg.Wait()
@@ -2229,7 +2267,7 @@ func TestWebsocketDisconnection(t *testing.T) {
netA.log = dl
netA.Start()
- defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
+ defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
netB.config.EnablePingHandler = false
@@ -2238,7 +2276,7 @@ func TestWebsocketDisconnection(t *testing.T) {
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
- defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
+ defer netStop(t, netB, "B")
msgHandlerA := func(msg IncomingMessage) (out OutgoingMessage) {
// if we received a message, send a message back.