summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>2024-02-22 19:27:43 -0500
committerGitHub <noreply@github.com>2024-02-22 19:27:43 -0500
commitef82b2b4cd28976647ecb07841838ac008823951 (patch)
tree3119979949a1e4bb69f066327ce1cab8d5b55447
parent504c6ba8a9232802124501250650994b8256041f (diff)
p2p: introduce Gossip peer capability (#5935)feature/p2p
-rw-r--r--network/p2p/capabilities.go2
-rw-r--r--network/p2p/capabilities_test.go6
-rw-r--r--network/p2p/dnsaddr/resolve_test.go20
-rw-r--r--network/p2p/p2p.go28
-rw-r--r--network/p2p/p2p_test.go4
-rw-r--r--network/p2p/peerstore/peerstore.go33
-rw-r--r--network/p2p/peerstore/peerstore_test.go141
-rw-r--r--network/p2p/pubsub.go2
-rw-r--r--network/p2pNetwork.go295
-rw-r--r--network/p2pNetwork_test.go434
-rw-r--r--node/node.go3
-rw-r--r--node/node_test.go103
12 files changed, 869 insertions, 202 deletions
diff --git a/network/p2p/capabilities.go b/network/p2p/capabilities.go
index 1ead897e6..f48977397 100644
--- a/network/p2p/capabilities.go
+++ b/network/p2p/capabilities.go
@@ -41,6 +41,8 @@ const (
Archival Capability = "archival"
// Catchpoints storing nodes
Catchpoints = "catchpointStoring"
+ // Gossip nodes are non permissioned relays
+ Gossip = "gossip"
)
const operationTimeout = time.Second * 5
diff --git a/network/p2p/capabilities_test.go b/network/p2p/capabilities_test.go
index 08e04f6f6..2b98b4980 100644
--- a/network/p2p/capabilities_test.go
+++ b/network/p2p/capabilities_test.go
@@ -47,7 +47,7 @@ func TestCapabilities_Discovery(t *testing.T) {
testSize := 3
for i := 0; i < testSize; i++ {
tempdir := t.TempDir()
- ps, err := peerstore.NewPeerStore(nil)
+ ps, err := peerstore.NewPeerStore(nil, "")
require.NoError(t, err)
h, _, err := MakeHost(config.GetDefaultLocal(), tempdir, ps)
require.NoError(t, err)
@@ -83,7 +83,7 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT {
tmpdir := t.TempDir()
pk, err := GetPrivKey(cfg, tmpdir)
require.NoError(t, err)
- ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{})
+ ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{}, "")
require.NoError(t, err)
h, err := libp2p.New(
libp2p.ListenAddrStrings("/dns4/localhost/tcp/0"),
@@ -134,7 +134,7 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap
tmpdir := t.TempDir()
pk, err := GetPrivKey(cfg, tmpdir)
require.NoError(t, err)
- ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{})
+ ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{}, "")
require.NoError(t, err)
h, err := libp2p.New(
libp2p.ListenAddrStrings("/dns4/localhost/tcp/0"),
diff --git a/network/p2p/dnsaddr/resolve_test.go b/network/p2p/dnsaddr/resolve_test.go
index 03190ab5f..30acbd3e5 100644
--- a/network/p2p/dnsaddr/resolve_test.go
+++ b/network/p2p/dnsaddr/resolve_test.go
@@ -37,22 +37,22 @@ func TestIsDnsaddr(t *testing.T) {
t.Parallel()
testcases := []struct {
- name string
- addr string
- expected bool
+ name string
+ addr string
+ isDnsaddr bool
}{
- {name: "DnsAddr", addr: "/dnsaddr/foobar.com", expected: true},
- {name: "DnsAddrWithPeerId", addr: "/dnsaddr/foobar.com/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", expected: true},
- {name: "DnsAddrWithIPPeerId", addr: "/dnsaddr/foobar.com/ip4/127.0.0.1/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", expected: true},
- {name: "Dns4Addr", addr: "/dns4/foobar.com/", expected: false},
- {name: "Dns6Addr", addr: "/dns6/foobar.com/", expected: false},
- {name: "Dns4AddrWithPeerId", addr: "/dns4/foobar.com/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", expected: false},
+ {name: "DnsAddr", addr: "/dnsaddr/foobar.com", isDnsaddr: true},
+ {name: "DnsAddrWithPeerId", addr: "/dnsaddr/foobar.com/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", isDnsaddr: true},
+ {name: "DnsAddrWithIPPeerId", addr: "/dnsaddr/foobar.com/ip4/127.0.0.1/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", isDnsaddr: true},
+ {name: "Dns4Addr", addr: "/dns4/foobar.com/", isDnsaddr: false},
+ {name: "Dns6Addr", addr: "/dns6/foobar.com/", isDnsaddr: false},
+ {name: "Dns4AddrWithPeerId", addr: "/dns4/foobar.com/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", isDnsaddr: false},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
maddr, err := multiaddr.NewMultiaddr(testcase.addr)
require.NoError(t, err)
- require.Equal(t, testcase.expected, isDnsaddr(maddr))
+ require.Equal(t, testcase.isDnsaddr, isDnsaddr(maddr))
})
}
}
diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go
index b399c569b..e67403a3a 100644
--- a/network/p2p/p2p.go
+++ b/network/p2p/p2p.go
@@ -25,8 +25,9 @@ import (
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
+ pstore "github.com/algorand/go-algorand/network/p2p/peerstore"
+ "github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-deadlock"
- "github.com/multiformats/go-multiaddr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -34,14 +35,20 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
- "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
+ "github.com/multiformats/go-multiaddr"
)
+// SubNextCancellable is an abstraction for pubsub.Subscription
+type SubNextCancellable interface {
+ Next(ctx context.Context) (*pubsub.Message, error)
+ Cancel()
+}
+
// Service defines the interface used by the network integrating with underlying p2p implementation
type Service interface {
Start() error
@@ -56,7 +63,7 @@ type Service interface {
Conns() []network.Conn
ListPeersForTopic(topic string) []peer.ID
- Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error)
+ Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error)
Publish(ctx context.Context, topic string, data []byte) error
GetStream(peer.ID) (network.Stream, bool)
@@ -83,7 +90,7 @@ const dialTimeout = 30 * time.Second
// MakeHost creates a libp2p host but does not start listening.
// Use host.Network().Listen() on the returned address to start listening.
-func MakeHost(cfg config.Local, datadir string, pstore peerstore.Peerstore) (host.Host, string, error) {
+func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host.Host, string, error) {
// load stored peer ID, or make ephemeral peer ID
privKey, err := GetPrivKey(cfg, datadir)
if err != nil {
@@ -216,20 +223,21 @@ func (s *serviceImpl) IDSigner() *PeerIDChallengeSigner {
// DialPeersUntilTargetCount attempts to establish connections to the provided phonebook addresses
func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) {
- peerIDs := s.host.Peerstore().Peers()
- for _, peerID := range peerIDs {
+ ps := s.host.Peerstore().(*pstore.PeerStore)
+ peerIDs := ps.GetAddresses(targetConnCount, phonebook.PhoneBookEntryRelayRole)
+ for _, peerInfo := range peerIDs {
+ peerInfo := peerInfo.(*peer.AddrInfo)
// if we are at our target count stop trying to connect
if len(s.host.Network().Conns()) == targetConnCount {
return
}
// if we are already connected to this peer, skip it
- if len(s.host.Network().ConnsToPeer(peerID)) > 0 {
+ if len(s.host.Network().ConnsToPeer(peerInfo.ID)) > 0 {
continue
}
- peerInfo := s.host.Peerstore().PeerInfo(peerID)
- err := s.DialNode(context.Background(), &peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout
+ err := s.DialNode(context.Background(), peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout
if err != nil {
- s.log.Warnf("failed to connect to peer %s: %v", peerID, err)
+ s.log.Warnf("failed to connect to peer %s: %v", peerInfo.ID, err)
}
}
}
diff --git a/network/p2p/p2p_test.go b/network/p2p/p2p_test.go
index 005dbc833..4935238a9 100644
--- a/network/p2p/p2p_test.go
+++ b/network/p2p/p2p_test.go
@@ -89,7 +89,7 @@ func TestP2PStreamingHost(t *testing.T) {
cfg := config.GetDefaultLocal()
dir := t.TempDir()
- pstore, err := peerstore.NewPeerStore(nil)
+ pstore, err := peerstore.NewPeerStore(nil, "")
require.NoError(t, err)
h, la, err := MakeHost(cfg, dir, pstore)
require.NoError(t, err)
@@ -115,7 +115,7 @@ func TestP2PStreamingHost(t *testing.T) {
ID: h.ID(),
Addrs: h.Addrs(),
}
- cpstore, err := peerstore.NewPeerStore([]*peer.AddrInfo{&addrInfo})
+ cpstore, err := peerstore.NewPeerStore([]*peer.AddrInfo{&addrInfo}, "")
require.NoError(t, err)
c, _, err := MakeHost(cfg, dir, cpstore)
require.NoError(t, err)
diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go
index d2a088e43..9bbb48ab4 100644
--- a/network/p2p/peerstore/peerstore.go
+++ b/network/p2p/peerstore/peerstore.go
@@ -68,18 +68,20 @@ type peerStoreCAB interface {
}
// NewPeerStore creates a new peerstore backed by a datastore.
-func NewPeerStore(addrInfo []*peer.AddrInfo) (*PeerStore, error) {
+func NewPeerStore(addrInfo []*peer.AddrInfo, network string) (*PeerStore, error) {
ps, err := mempstore.NewPeerstore()
if err != nil {
return nil, fmt.Errorf("cannot initialize a peerstore: %w", err)
}
// initialize peerstore with addresses
+ peers := make([]interface{}, len(addrInfo))
for i := 0; i < len(addrInfo); i++ {
- info := addrInfo[i]
- ps.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
+ peers[i] = addrInfo[i]
}
+
pstore := &PeerStore{peerStoreCAB: ps}
+ pstore.AddPersistentPeers(peers, network, phonebook.PhoneBookEntryRelayRole)
return pstore, nil
}
@@ -98,7 +100,7 @@ func MakePhonebook(connectionsRateLimitingCount uint,
}
// GetAddresses returns up to N addresses, but may return fewer
-func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []string {
+func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []interface{} {
return shuffleSelect(ps.filterRetryTime(time.Now(), role), n)
}
@@ -206,7 +208,7 @@ func (ps *PeerStore) UpdateConnectionTime(addr interface{}, provisionalTime time
}
// ReplacePeerList replaces the peer list for the given networkName and role.
-func (ps *PeerStore) ReplacePeerList(addressesThey []string, networkName string, role phonebook.PhoneBookEntryRoles) {
+func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName string, role phonebook.PhoneBookEntryRoles) {
// prepare a map of items we'd like to remove.
removeItems := make(map[peer.ID]bool, 0)
peerIDs := ps.Peers()
@@ -221,10 +223,7 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []string, networkName string,
}
for _, addr := range addressesThey {
- info, err := peerInfoFromDomainPort(addr)
- if err != nil {
- return
- }
+ info := addr.(*peer.AddrInfo)
data, _ := ps.Get(info.ID, addressDataKey)
if data != nil {
// we already have this.
@@ -294,7 +293,7 @@ func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) {
}
ad := data.(addressData)
delete(ad.networkNames, networkName)
- if 0 == len(ad.networkNames) {
+ if len(ad.networkNames) == 0 {
ps.ClearAddrs(peerID)
_ = ps.Put(peerID, addressDataKey, nil)
}
@@ -319,21 +318,23 @@ func (ps *PeerStore) popNElements(n int, peerID peer.ID) {
_ = ps.Put(peerID, addressDataKey, ad)
}
-func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []string {
- o := make([]string, 0, len(ps.Peers()))
+func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []interface{} {
+ o := make([]interface{}, 0, len(ps.Peers()))
for _, peerID := range ps.Peers() {
data, _ := ps.Get(peerID, addressDataKey)
if data != nil {
ad := data.(addressData)
if t.After(ad.retryAfter) && role == ad.role {
- o = append(o, string(peerID))
+ mas := ps.Addrs(peerID)
+ info := peer.AddrInfo{ID: peerID, Addrs: mas}
+ o = append(o, &info)
}
}
}
return o
}
-func shuffleSelect(set []string, n int) []string {
+func shuffleSelect(set []interface{}, n int) []interface{} {
if n >= len(set) || n == getAllAddresses {
// return shuffled copy of everything
out := slices.Clone(set)
@@ -350,13 +351,13 @@ func shuffleSelect(set []string, n int) []string {
}
}
}
- out := make([]string, n)
+ out := make([]interface{}, n)
for i, index := range indexSample {
out[i] = set[index]
}
return out
}
-func shuffleStrings(set []string) {
+func shuffleStrings(set []interface{}) {
rand.Shuffle(len(set), func(i, j int) { set[i], set[j] = set[j], set[i] })
}
diff --git a/network/p2p/peerstore/peerstore_test.go b/network/p2p/peerstore/peerstore_test.go
index ebe45b87a..83edae8fb 100644
--- a/network/p2p/peerstore/peerstore_test.go
+++ b/network/p2p/peerstore/peerstore_test.go
@@ -51,7 +51,7 @@ func TestPeerstore(t *testing.T) {
}
addrInfo, _ := PeerInfoFromAddrs(peerAddrs)
- ps, err := NewPeerStore(addrInfo)
+ ps, err := NewPeerStore(addrInfo, "net-id")
require.NoError(t, err)
defer ps.Close()
@@ -89,12 +89,13 @@ func TestPeerstore(t *testing.T) {
}
-func testPhonebookAll(t *testing.T, set []string, ph *PeerStore) {
+func testPhonebookAll(t *testing.T, set []*peer.AddrInfo, ph *PeerStore) {
actual := ph.GetAddresses(len(set), PhoneBookEntryRelayRole)
for _, got := range actual {
+ info := got.(*peer.AddrInfo)
ok := false
for _, known := range set {
- if got == known {
+ if info.ID == known.ID {
ok = true
break
}
@@ -106,7 +107,8 @@ func testPhonebookAll(t *testing.T, set []string, ph *PeerStore) {
for _, known := range set {
ok := false
for _, got := range actual {
- if got == known {
+ info := got.(*peer.AddrInfo)
+ if info.ID == known.ID {
ok = true
break
}
@@ -117,18 +119,19 @@ func testPhonebookAll(t *testing.T, set []string, ph *PeerStore) {
}
}
-func testPhonebookUniform(t *testing.T, set []string, ph *PeerStore, getsize int) {
+func testPhonebookUniform(t *testing.T, set []*peer.AddrInfo, ph *PeerStore, getsize int) {
uniformityTestLength := 250000 / len(set)
expected := (uniformityTestLength * getsize) / len(set)
counts := make(map[string]int)
for i := 0; i < len(set); i++ {
- counts[set[i]] = 0
+ counts[set[i].ID.String()] = 0
}
for i := 0; i < uniformityTestLength; i++ {
actual := ph.GetAddresses(getsize, PhoneBookEntryRelayRole)
for _, xa := range actual {
- if _, ok := counts[xa]; ok {
- counts[xa]++
+ info := xa.(*peer.AddrInfo)
+ if _, ok := counts[info.ID.String()]; ok {
+ counts[info.ID.String()]++
}
}
}
@@ -151,6 +154,13 @@ func TestArrayPhonebookAll(t *testing.T) {
partitiontest.PartitionTest(t)
set := []string{"a:4041", "b:4042", "c:4043", "d:4044", "e:4045", "f:4046", "g:4047", "h:4048", "i:4049", "j:4010"}
+ infoSet := make([]*peer.AddrInfo, 0)
+ for _, addr := range set {
+ info, err := peerInfoFromDomainPort(addr)
+ require.NoError(t, err)
+ infoSet = append(infoSet, info)
+ }
+
ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
for _, addr := range set {
@@ -159,13 +169,20 @@ func TestArrayPhonebookAll(t *testing.T) {
ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
ph.Put(info.ID, addressDataKey, entry)
}
- testPhonebookAll(t, set, ph)
+ testPhonebookAll(t, infoSet, ph)
}
func TestArrayPhonebookUniform1(t *testing.T) {
partitiontest.PartitionTest(t)
set := []string{"a:4041", "b:4042", "c:4043", "d:4044", "e:4045", "f:4046", "g:4047", "h:4048", "i:4049", "j:4010"}
+ infoSet := make([]*peer.AddrInfo, 0)
+ for _, addr := range set {
+ info, err := peerInfoFromDomainPort(addr)
+ require.NoError(t, err)
+ infoSet = append(infoSet, info)
+ }
+
ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
for _, addr := range set {
@@ -174,13 +191,20 @@ func TestArrayPhonebookUniform1(t *testing.T) {
ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
ph.Put(info.ID, addressDataKey, entry)
}
- testPhonebookUniform(t, set, ph, 1)
+ testPhonebookUniform(t, infoSet, ph, 1)
}
func TestArrayPhonebookUniform3(t *testing.T) {
partitiontest.PartitionTest(t)
set := []string{"a:4041", "b:4042", "c:4043", "d:4044", "e:4045", "f:4046", "g:4047", "h:4048", "i:4049", "j:4010"}
+ infoSet := make([]*peer.AddrInfo, 0)
+ for _, addr := range set {
+ info, err := peerInfoFromDomainPort(addr)
+ require.NoError(t, err)
+ infoSet = append(infoSet, info)
+ }
+
ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
for _, addr := range set {
@@ -189,19 +213,25 @@ func TestArrayPhonebookUniform3(t *testing.T) {
ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
ph.Put(info.ID, addressDataKey, entry)
}
- testPhonebookUniform(t, set, ph, 3)
+ testPhonebookUniform(t, infoSet, ph, 3)
}
func TestMultiPhonebook(t *testing.T) {
partitiontest.PartitionTest(t)
set := []string{"a:4041", "b:4042", "c:4043", "d:4044", "e:4045", "f:4046", "g:4047", "h:4048", "i:4049", "j:4010"}
- pha := make([]string, 0)
- for _, e := range set[:5] {
+ infoSet := make([]*peer.AddrInfo, 0)
+ for _, addr := range set {
+ info, err := peerInfoFromDomainPort(addr)
+ require.NoError(t, err)
+ infoSet = append(infoSet, info)
+ }
+ pha := make([]interface{}, 0)
+ for _, e := range infoSet[:5] {
pha = append(pha, e)
}
- phb := make([]string, 0)
- for _, e := range set[5:] {
+ phb := make([]interface{}, 0)
+ for _, e := range infoSet[5:] {
phb = append(phb, e)
}
@@ -210,9 +240,9 @@ func TestMultiPhonebook(t *testing.T) {
ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole)
- testPhonebookAll(t, set, ph)
- testPhonebookUniform(t, set, ph, 1)
- testPhonebookUniform(t, set, ph, 3)
+ testPhonebookAll(t, infoSet, ph)
+ testPhonebookUniform(t, infoSet, ph, 1)
+ testPhonebookUniform(t, infoSet, ph, 3)
}
// TestMultiPhonebookPersistentPeers validates that the peers added via Phonebook.AddPersistentPeers
@@ -224,12 +254,19 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) {
require.NoError(t, err)
persistentPeers := []interface{}{info}
set := []string{"b:4042", "c:4043", "d:4044", "e:4045", "f:4046", "g:4047", "h:4048", "i:4049", "j:4010"}
- pha := make([]string, 0)
- for _, e := range set[:5] {
+ infoSet := make([]*peer.AddrInfo, 0)
+ for _, addr := range set {
+ info, err := peerInfoFromDomainPort(addr)
+ require.NoError(t, err)
+ infoSet = append(infoSet, info)
+ }
+
+ pha := make([]interface{}, 0)
+ for _, e := range infoSet[:5] {
pha = append(pha, e)
}
- phb := make([]string, 0)
- for _, e := range set[5:] {
+ phb := make([]interface{}, 0)
+ for _, e := range infoSet[5:] {
phb = append(phb, e)
}
ph, err := MakePhonebook(1, 1*time.Millisecond)
@@ -239,12 +276,19 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) {
ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole)
- testPhonebookAll(t, append(set, "a:4041"), ph)
+ testPhonebookAll(t, append(infoSet, info), ph)
allAddresses := ph.GetAddresses(len(set)+len(persistentPeers), PhoneBookEntryRelayRole)
for _, pp := range persistentPeers {
pp := pp.(*peer.AddrInfo)
- // TODO: modify as needed when completely switching from peerID = "host:port" to peer.AddrInfo
- require.Contains(t, allAddresses, string(pp.ID))
+ found := false
+ for _, addr := range allAddresses {
+ addr := addr.(*peer.AddrInfo)
+ if addr.ID == pp.ID {
+ found = true
+ break
+ }
+ }
+ require.True(t, found, fmt.Sprintf("%s not found in %v", string(pp.ID), allAddresses))
}
}
@@ -252,12 +296,19 @@ func TestMultiPhonebookDuplicateFiltering(t *testing.T) {
partitiontest.PartitionTest(t)
set := []string{"b:4042", "c:4043", "d:4044", "e:4045", "f:4046", "g:4047", "h:4048", "i:4049", "j:4010"}
- pha := make([]string, 0)
- for _, e := range set[:7] {
+ infoSet := make([]*peer.AddrInfo, 0)
+ for _, addr := range set {
+ info, err := peerInfoFromDomainPort(addr)
+ require.NoError(t, err)
+ infoSet = append(infoSet, info)
+ }
+
+ pha := make([]interface{}, 0)
+ for _, e := range infoSet[:7] {
pha = append(pha, e)
}
- phb := make([]string, 0)
- for _, e := range set[3:] {
+ phb := make([]interface{}, 0)
+ for _, e := range infoSet[3:] {
phb = append(phb, e)
}
ph, err := MakePhonebook(1, 1*time.Millisecond)
@@ -265,9 +316,9 @@ func TestMultiPhonebookDuplicateFiltering(t *testing.T) {
ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole)
- testPhonebookAll(t, set, ph)
- testPhonebookUniform(t, set, ph, 1)
- testPhonebookUniform(t, set, ph, 3)
+ testPhonebookAll(t, infoSet, ph)
+ testPhonebookUniform(t, infoSet, ph, 1)
+ testPhonebookUniform(t, infoSet, ph, 3)
}
func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) {
@@ -292,7 +343,7 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) {
// Test the addresses are populated in the phonebook and a
// time can be added to one of them
- entries.ReplacePeerList([]string{addr1, addr2}, "default", PhoneBookEntryRelayRole)
+ entries.ReplacePeerList([]interface{}{info1, info2}, "default", PhoneBookEntryRelayRole)
addrInPhonebook, waitTime, provisionalTime := entries.GetConnectionWaitTime(info1)
require.Equal(t, true, addrInPhonebook)
require.Equal(t, time.Duration(0), waitTime)
@@ -407,10 +458,24 @@ func TestPhonebookRoles(t *testing.T) {
relaysSet := []string{"relay1:4040", "relay2:4041", "relay3:4042"}
archiverSet := []string{"archiver1:1111", "archiver2:1112", "archiver3:1113"}
+ infoRelaySet := make([]interface{}, 0)
+ for _, addr := range relaysSet {
+ info, err := peerInfoFromDomainPort(addr)
+ require.NoError(t, err)
+ infoRelaySet = append(infoRelaySet, info)
+ }
+
+ infoArchiverSet := make([]interface{}, 0)
+ for _, addr := range archiverSet {
+ info, err := peerInfoFromDomainPort(addr)
+ require.NoError(t, err)
+ infoArchiverSet = append(infoArchiverSet, info)
+ }
+
ph, err := MakePhonebook(1, 1)
require.NoError(t, err)
- ph.ReplacePeerList(relaysSet, "default", PhoneBookEntryRelayRole)
- ph.ReplacePeerList(archiverSet, "default", PhoneBookEntryArchiverRole)
+ ph.ReplacePeerList(infoRelaySet, "default", PhoneBookEntryRelayRole)
+ ph.ReplacePeerList(infoArchiverSet, "default", PhoneBookEntryArchiverRole)
require.Equal(t, len(relaysSet)+len(archiverSet), len(ph.Peers()))
require.Equal(t, len(relaysSet)+len(archiverSet), ph.Length())
@@ -420,11 +485,13 @@ func TestPhonebookRoles(t *testing.T) {
entries := ph.GetAddresses(l, role)
if role == PhoneBookEntryRelayRole {
for _, entry := range entries {
- require.Contains(t, entry, "relay")
+ entry := entry.(*peer.AddrInfo)
+ require.Contains(t, string(entry.ID), "relay")
}
} else if role == PhoneBookEntryArchiverRole {
for _, entry := range entries {
- require.Contains(t, entry, "archiver")
+ entry := entry.(*peer.AddrInfo)
+ require.Contains(t, string(entry.ID), "archiver")
}
}
}
diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go
index 372c9249c..5d1c14463 100644
--- a/network/p2p/pubsub.go
+++ b/network/p2p/pubsub.go
@@ -133,7 +133,7 @@ func (s *serviceImpl) getOrCreateTopic(topicName string) (*pubsub.Topic, error)
}
// Subscribe returns a subscription to the given topic
-func (s *serviceImpl) Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error) {
+func (s *serviceImpl) Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error) {
if err := s.pubsub.RegisterTopicValidator(topic, val); err != nil {
return nil, err
}
diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go
index be59e09d6..ce969e179 100644
--- a/network/p2pNetwork.go
+++ b/network/p2pNetwork.go
@@ -31,6 +31,7 @@ import (
"github.com/algorand/go-algorand/network/p2p"
"github.com/algorand/go-algorand/network/p2p/dnsaddr"
"github.com/algorand/go-algorand/network/p2p/peerstore"
+ "github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-deadlock"
@@ -67,33 +68,38 @@ type P2PNetwork struct {
wsPeersChangeCounter atomic.Int32
wsPeersConnectivityCheckTicker *time.Ticker
+ relayMessages bool // True if we should relay messages from other nodes (nominally true for relays, false otherwise)
+ wantTXGossip atomic.Bool
+
capabilitiesDiscovery *p2p.CapabilitiesDiscovery
- bootstrapper bootstrapper
- nodeInfo NodeInfo
- pstore *peerstore.PeerStore
- httpServer *p2p.HTTPServer
+ bootstrapperStart func()
+ bootstrapperStop func()
+ nodeInfo NodeInfo
+ pstore *peerstore.PeerStore
+ httpServer *p2p.HTTPServer
}
type bootstrapper struct {
- cfg config.Local
- networkID protocol.NetworkID
- phonebookPeers []*peer.AddrInfo
- resolveControler dnsaddr.ResolveController
- started bool
+ cfg config.Local
+ networkID protocol.NetworkID
+ phonebookPeers []*peer.AddrInfo
+ resolveController dnsaddr.ResolveController
+ started atomic.Bool
+ log logging.Logger
}
func (b *bootstrapper) start() {
- b.started = true
+ b.started.Store(true)
}
func (b *bootstrapper) stop() {
- b.started = false
+ b.started.Store(false)
}
func (b *bootstrapper) BootstrapFunc() []peer.AddrInfo {
// not started yet, do not give it any peers
- if !b.started {
+ if !b.started.Load() {
return nil
}
@@ -108,30 +114,76 @@ func (b *bootstrapper) BootstrapFunc() []peer.AddrInfo {
return addrs
}
- return getBootstrapPeers(b.cfg, b.networkID, b.resolveControler)
+ return dnsLookupBootstrapPeers(b.log, b.cfg, b.networkID, b.resolveController)
}
-// getBootstrapPeers looks up a list of Multiaddrs strings from the dnsaddr records at the primary
+// dnsLookupBootstrapPeers looks up a list of Multiaddrs strings from the dnsaddr records at the primary
// SRV record domain.
-func getBootstrapPeers(cfg config.Local, network protocol.NetworkID, controller dnsaddr.ResolveController) []peer.AddrInfo {
+func dnsLookupBootstrapPeers(log logging.Logger, cfg config.Local, network protocol.NetworkID, controller dnsaddr.ResolveController) []peer.AddrInfo {
var addrs []peer.AddrInfo
bootstraps := cfg.DNSBootstrapArray(network)
for _, dnsBootstrap := range bootstraps {
- resolvedAddrs, err := dnsaddr.MultiaddrsFromResolver(dnsBootstrap.PrimarySRVBootstrap, controller)
- if err != nil {
- continue
+ var resolvedAddrs, resolvedAddrsBackup []multiaddr.Multiaddr
+ var errPrim, errBackup error
+ resolvedAddrs, errPrim = dnsaddr.MultiaddrsFromResolver(dnsBootstrap.PrimarySRVBootstrap, controller)
+ if errPrim != nil {
+ log.Infof("Failed to resolve bootstrap peers from %s: %v", dnsBootstrap.PrimarySRVBootstrap, errPrim)
}
- for _, resolvedAddr := range resolvedAddrs {
- info, err0 := peer.AddrInfoFromP2pAddr(resolvedAddr)
- if err0 != nil {
- continue
+ if dnsBootstrap.BackupSRVBootstrap != "" {
+ resolvedAddrsBackup, errBackup = dnsaddr.MultiaddrsFromResolver(dnsBootstrap.BackupSRVBootstrap, controller)
+ if errBackup != nil {
+ log.Infof("Failed to resolve bootstrap peers from %s: %v", dnsBootstrap.BackupSRVBootstrap, errBackup)
}
- addrs = append(addrs, *info)
+ }
+
+ if len(resolvedAddrs) > 0 || len(resolvedAddrsBackup) > 0 {
+ resolvedAddrInfos := mergeP2PMultiaddrResolvedAddresses(resolvedAddrs, resolvedAddrsBackup)
+ addrs = append(addrs, resolvedAddrInfos...)
}
}
return addrs
}
+func mergeP2PMultiaddrResolvedAddresses(primary, backup []multiaddr.Multiaddr) []peer.AddrInfo {
+ // deduplicate addresses by PeerID
+ unique := make(map[peer.ID]*peer.AddrInfo)
+ for _, addr := range primary {
+ info, err0 := peer.AddrInfoFromP2pAddr(addr)
+ if err0 != nil {
+ continue
+ }
+ unique[info.ID] = info
+ }
+ for _, addr := range backup {
+ info, err0 := peer.AddrInfoFromP2pAddr(addr)
+ if err0 != nil {
+ continue
+ }
+ unique[info.ID] = info
+ }
+ var result []peer.AddrInfo
+ for _, addr := range unique {
+ result = append(result, *addr)
+ }
+ return result
+}
+
+func mergeP2PAddrInfoResolvedAddresses(primary, backup []peer.AddrInfo) []peer.AddrInfo {
+ // deduplicate addresses by PeerID
+ unique := make(map[peer.ID]peer.AddrInfo)
+ for _, addr := range primary {
+ unique[addr.ID] = addr
+ }
+ for _, addr := range backup {
+ unique[addr.ID] = addr
+ }
+ var result []peer.AddrInfo
+ for _, addr := range unique {
+ result = append(result, addr)
+ }
+ return result
+}
+
type p2pPeerStats struct {
txReceived atomic.Uint64
}
@@ -152,23 +204,26 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
for malAddr, malErr := range malformedAddrs {
log.Infof("Ignoring malformed phonebook address %s: %s", malAddr, malErr)
}
- pstore, err := peerstore.NewPeerStore(addrInfo)
+ pstore, err := peerstore.NewPeerStore(addrInfo, string(networkID))
if err != nil {
return nil, err
}
+ relayMessages := cfg.IsGossipServer() || cfg.ForceRelayMessages
net := &P2PNetwork{
- log: log,
- config: cfg,
- genesisID: genesisID,
- networkID: networkID,
- topicTags: map[protocol.Tag]string{"TX": p2p.TXTopicName},
- wsPeers: make(map[peer.ID]*wsPeer),
- wsPeersToIDs: make(map[*wsPeer]peer.ID),
- peerStats: make(map[peer.ID]*p2pPeerStats),
- nodeInfo: node,
- pstore: pstore,
+ log: log,
+ config: cfg,
+ genesisID: genesisID,
+ networkID: networkID,
+ topicTags: map[protocol.Tag]string{protocol.TxnTag: p2p.TXTopicName},
+ wsPeers: make(map[peer.ID]*wsPeer),
+ wsPeersToIDs: make(map[*wsPeer]peer.ID),
+ peerStats: make(map[peer.ID]*p2pPeerStats),
+ nodeInfo: node,
+ pstore: pstore,
+ relayMessages: relayMessages,
}
+
net.ctx, net.ctxCancel = context.WithCancel(context.Background())
net.handler = msgHandler{
ctx: net.ctx,
@@ -196,11 +251,14 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
}
bootstrapper := &bootstrapper{
- cfg: cfg,
- networkID: networkID,
- phonebookPeers: addrInfo,
- resolveControler: dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecuritySRVEnforced(), ""),
+ cfg: cfg,
+ networkID: networkID,
+ phonebookPeers: addrInfo,
+ resolveController: dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecuritySRVEnforced(), ""),
+ log: net.log,
}
+ net.bootstrapperStart = bootstrapper.start
+ net.bootstrapperStop = bootstrapper.stop
if cfg.EnableDHTProviders {
disc, err0 := p2p.MakeCapabilitiesDiscovery(net.ctx, cfg, h, networkID, net.log, bootstrapper.BootstrapFunc)
@@ -240,13 +298,18 @@ func (n *P2PNetwork) PeerIDSigner() identityChallengeSigner {
// Start threads, listen on sockets.
func (n *P2PNetwork) Start() error {
- n.wg.Add(1)
- n.bootstrapper.start()
+ n.bootstrapperStart()
err := n.service.Start()
if err != nil {
return err
}
- go n.txTopicHandleLoop()
+
+ wantTXGossip := n.relayMessages || n.config.ForceFetchTransactions || n.nodeInfo.IsParticipating()
+ if wantTXGossip {
+ n.wantTXGossip.Store(true)
+ n.wg.Add(1)
+ go n.txTopicHandleLoop()
+ }
if n.wsPeersConnectivityCheckTicker != nil {
n.wsPeersConnectivityCheckTicker.Stop()
@@ -263,7 +326,6 @@ func (n *P2PNetwork) Start() error {
n.wg.Add(1)
go n.broadcaster.broadcastThread(&n.wg, n)
- n.service.DialPeersUntilTargetCount(n.config.GossipFanout)
n.wg.Add(1)
go n.meshThread()
@@ -289,7 +351,7 @@ func (n *P2PNetwork) Stop() {
n.innerStop()
n.ctxCancel()
n.service.Close()
- n.bootstrapper.stop()
+ n.bootstrapperStop()
n.httpServer.Close()
n.wg.Wait()
}
@@ -314,14 +376,46 @@ func (n *P2PNetwork) innerStop() {
closeGroup.Wait()
}
+// meshThreadInner fetches nodes from DHT and attempts to connect to them
+func (n *P2PNetwork) meshThreadInner() {
+ defer n.service.DialPeersUntilTargetCount(n.config.GossipFanout)
+
+ // fetch peers from DNS
+ var dnsPeers, dhtPeers []peer.AddrInfo
+ dnsPeers = dnsLookupBootstrapPeers(n.log, n.config, n.networkID, dnsaddr.NewMultiaddrDNSResolveController(n.config.DNSSecuritySRVEnforced(), ""))
+
+ // discover peers from DHT
+ if n.capabilitiesDiscovery != nil {
+ var err error
+ dhtPeers, err = n.capabilitiesDiscovery.PeersForCapability(p2p.Gossip, n.config.GossipFanout)
+ if err != nil {
+ n.log.Warnf("Error getting relay nodes from capabilities discovery: %v", err)
+ return
+ }
+ n.log.Debugf("Discovered %d gossip peers from DHT", len(dhtPeers))
+ }
+
+ peers := mergeP2PAddrInfoResolvedAddresses(dnsPeers, dhtPeers)
+ replace := make([]interface{}, 0, len(peers))
+ for i := range peers {
+ replace = append(replace, &peers[i])
+ }
+ n.pstore.ReplacePeerList(replace, string(n.networkID), phonebook.PhoneBookEntryRelayRole)
+}
+
func (n *P2PNetwork) meshThread() {
defer n.wg.Done()
- timer := time.NewTicker(meshThreadInterval)
+ timer := time.NewTicker(1) // start immediately and reset after
defer timer.Stop()
+ var resetTimer bool
for {
select {
case <-timer.C:
- n.service.DialPeersUntilTargetCount(n.config.GossipFanout)
+ n.meshThreadInner()
+ if !resetTimer {
+ timer.Reset(meshThreadInterval)
+ resetTimer = true
+ }
case <-n.ctx.Done():
return
}
@@ -382,7 +476,10 @@ func (n *P2PNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byt
// Relay message
func (n *P2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error {
- return n.Broadcast(ctx, tag, data, wait, except)
+ if n.relayMessages {
+ return n.Broadcast(ctx, tag, data, wait, except)
+ }
+ return nil
}
// Disconnect from a peer, probably due to protocol errors.
@@ -435,6 +532,33 @@ func (n *P2PNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
// `replace` optionally drops existing connections before making new ones.
// `quit` chan allows cancellation.
func (n *P2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {
+ n.meshThreadInner()
+}
+
+func addrInfoToWsPeerCore(n *P2PNetwork, addrInfo *peer.AddrInfo) (wsPeerCore, bool) {
+ mas, err := peer.AddrInfoToP2pAddrs(addrInfo)
+ if err != nil {
+ n.log.Warnf("Archival AddrInfo conversion error: %v", err)
+ return wsPeerCore{}, false
+ }
+ if len(mas) == 0 {
+ n.log.Warnf("Archival AddrInfo: empty multiaddr for : %v", addrInfo)
+ return wsPeerCore{}, false
+ }
+ addr := mas[0].String()
+
+ maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount)
+ client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost)
+ if err != nil {
+ n.log.Warnf("MakeHTTPClient failed: %v", err)
+ return wsPeerCore{}, false
+ }
+
+ peerCore := makePeerCore(
+ n.ctx, n, n.log, n.handler.readBuffer,
+ addr, client, "", /*origin address*/
+ )
+ return peerCore, true
}
// GetPeers returns a list of Peers we could potentially send a direct message to.
@@ -451,24 +575,21 @@ func (n *P2PNetwork) GetPeers(options ...PeerOption) []Peer {
}
n.wsPeersLock.RUnlock()
case PeersPhonebookRelays:
- // TODO: query peerstore for PhoneBookEntryRelayRole
- // TODO: currently peerstore is not populated in a way to store roles
- // return all nodes at the moment
-
- // // return copy of phonebook, which probably also contains peers we're connected to, but if it doesn't maybe we shouldn't be making new connections to those peers (because they disappeared from the directory)
- // addrs := n.pstore.GetAddresses(1000, PhoneBookEntryRelayRole)
- // for _, addr := range addrs {
- // peerCore := makePeerCore(n.ctx, n, n.log, n.handler.readBuffer, addr, n.GetRoundTripper(nil), "" /*origin address*/)
- // peers = append(peers, &peerCore)
- // }
-
- // temporary return all nodes
- n.wsPeersLock.RLock()
- for _, peer := range n.wsPeers {
- peers = append(peers, Peer(peer))
+ const maxNodes = 100
+ peerIDs := n.pstore.GetAddresses(maxNodes, phonebook.PhoneBookEntryRelayRole)
+ for _, peerInfo := range peerIDs {
+ peerInfo := peerInfo.(*peer.AddrInfo)
+ if peerCore, ok := addrInfoToWsPeerCore(n, peerInfo); ok {
+ peers = append(peers, &peerCore)
+ }
+ }
+ if n.log.GetLevel() >= logging.Debug && len(peers) > 0 {
+ addrs := make([]string, 0, len(peers))
+ for _, peer := range peers {
+ addrs = append(addrs, peer.(*wsPeerCore).GetAddress())
+ }
+ n.log.Debugf("Relay node(s) from peerstore: %v", addrs)
}
- n.wsPeersLock.RUnlock()
-
case PeersPhonebookArchivalNodes:
// query known archival nodes from DHT if enabled
if n.config.EnableDHTProviders {
@@ -480,30 +601,11 @@ func (n *P2PNetwork) GetPeers(options ...PeerOption) []Peer {
}
n.log.Debugf("Got %d archival node(s) from DHT", len(infos))
for _, addrInfo := range infos {
+ // TODO: remove after go1.22
info := addrInfo
- mas, err := peer.AddrInfoToP2pAddrs(&info)
- if err != nil {
- n.log.Warnf("Archival AddrInfo conversion error: %v", err)
- continue
- }
- if len(mas) == 0 {
- n.log.Warnf("Archival AddrInfo: empty multiaddr for : %v", addrInfo)
- continue
+ if peerCore, ok := addrInfoToWsPeerCore(n, &info); ok {
+ peers = append(peers, &peerCore)
}
- addr := mas[0].String()
-
- maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount)
- client, err := p2p.MakeHTTPClientWithRateLimit(&info, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost)
- if err != nil {
- n.log.Warnf("MakeHTTPClient failed: %v", err)
- continue
- }
-
- peerCore := makePeerCore(
- n.ctx, n, n.log, n.handler.readBuffer,
- addr, client, "", /*origin address*/
- )
- peers = append(peers, &peerCore)
}
if n.log.GetLevel() >= logging.Debug && len(peers) > 0 {
addrs := make([]string, 0, len(peers))
@@ -560,7 +662,19 @@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) {
// this is the only indication that we have that we haven't formed a clique, where all incoming messages
// arrive very quickly, but might be missing some votes. The usage of this call is expected to have similar
// characteristics as with a watchdog timer.
-func (n *P2PNetwork) OnNetworkAdvance() {}
+func (n *P2PNetwork) OnNetworkAdvance() {
+ if n.nodeInfo != nil {
+ old := n.wantTXGossip.Load()
+ new := n.nodeInfo.IsParticipating()
+ if old != new {
+ n.wantTXGossip.Store(new)
+ if new {
+ n.wg.Add(1)
+ go n.txTopicHandleLoop()
+ }
+ }
+ }
+}
// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same
// request that was provided to the http handler ( or provide a fallback Context() to that )
@@ -720,6 +834,7 @@ func (n *P2PNetwork) txTopicHandleLoop() {
n.log.Errorf("Failed to subscribe to topic %s: %v", p2p.TXTopicName, err)
return
}
+ n.log.Debugf("Subscribed to topic %s", p2p.TXTopicName)
for {
msg, err := sub.Next(n.ctx)
@@ -727,6 +842,7 @@ func (n *P2PNetwork) txTopicHandleLoop() {
if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled {
n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID())
}
+ n.log.Debugf("Canceling subscription to topic %s due Next error", p2p.TXTopicName)
sub.Cancel()
return
}
@@ -735,6 +851,13 @@ func (n *P2PNetwork) txTopicHandleLoop() {
// from gossipsub's point of view, it's just waiting to hear back from the validator,
// and txHandler does all its work in the validator, so we don't need to do anything here
_ = msg
+
+ // participation or configuration change, cancel subscription and quit
+ if !n.wantTXGossip.Load() {
+ n.log.Debugf("Canceling subscription to topic %s due participation change", p2p.TXTopicName)
+ sub.Cancel()
+ return
+ }
}
}
diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go
index ebed7c1af..cc08cc0cc 100644
--- a/network/p2pNetwork_test.go
+++ b/network/p2pNetwork_test.go
@@ -37,16 +37,24 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"
pubsub "github.com/libp2p/go-libp2p-pubsub"
+ "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
+func (n *P2PNetwork) hasPeers() bool {
+ n.wsPeersLock.RLock()
+ defer n.wsPeersLock.RUnlock()
+ return len(n.wsPeers) > 0
+}
+
func TestP2PSubmitTX(t *testing.T) {
partitiontest.PartitionTest(t)
cfg := config.GetDefaultLocal()
+ cfg.ForceFetchTransactions = true
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
@@ -66,7 +74,6 @@ func TestP2PSubmitTX(t *testing.T) {
defer netB.Stop()
netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
-
require.NoError(t, err)
netC.Start()
defer netC.Stop()
@@ -81,7 +88,13 @@ func TestP2PSubmitTX(t *testing.T) {
2*time.Second,
50*time.Millisecond,
)
+ require.Eventually(t, func() bool {
+ return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
+ }, 2*time.Second, 50*time.Millisecond)
+
+ // for some reason the above check is not enough in race builds on CI
time.Sleep(time.Second) // give time for peers to connect.
+
// now we should be connected in a line: B <-> A <-> C where both B and C are connected to A but not each other
// Since we aren't using the transaction handler in this test, we need to register a pass-through handler
@@ -117,6 +130,90 @@ func TestP2PSubmitTX(t *testing.T) {
)
}
+// TestP2PSubmitTXNoGossip tests nodes without gossip enabled cannot receive transactions
+func TestP2PSubmitTXNoGossip(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ cfg := config.GetDefaultLocal()
+ cfg.ForceFetchTransactions = true
+ log := logging.TestingLog(t)
+ netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
+ require.NoError(t, err)
+ netA.Start()
+ defer netA.Stop()
+
+ peerInfoA := netA.service.AddrInfo()
+ addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
+ require.NoError(t, err)
+ require.NotZero(t, addrsA[0])
+
+ multiAddrStr := addrsA[0].String()
+ phoneBookAddresses := []string{multiAddrStr}
+ netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
+ require.NoError(t, err)
+ netB.Start()
+ defer netB.Stop()
+
+ require.Eventually(
+ t,
+ func() bool {
+ return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) == 1 &&
+ len(netB.service.ListPeersForTopic(p2p.TXTopicName)) == 1
+ },
+ 2*time.Second,
+ 50*time.Millisecond,
+ )
+
+ // run netC in NPN mode (no relay => no gossip sup => no TX receiving)
+ cfg.ForceFetchTransactions = false
+ netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
+ require.NoError(t, err)
+ netC.Start()
+ defer netC.Stop()
+
+ require.Eventually(t, func() bool {
+ return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
+ }, 2*time.Second, 50*time.Millisecond)
+
+ time.Sleep(time.Second) // give time for peers to connect.
+
+ // ensure netC cannot receive messages
+ passThroughHandler := []TaggedMessageHandler{
+ {Tag: protocol.TxnTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
+ return OutgoingMessage{Action: Broadcast}
+ })},
+ }
+
+ netB.RegisterHandlers(passThroughHandler)
+ netC.RegisterHandlers(passThroughHandler)
+ for i := 0; i < 10; i++ {
+ err = netA.Broadcast(context.Background(), protocol.TxnTag, []byte(fmt.Sprintf("test %d", i)), false, nil)
+ require.NoError(t, err)
+ }
+
+ // check netB received the messages
+ require.Eventually(
+ t,
+ func() bool {
+ netB.peerStatsMu.Lock()
+ netBpeerStatsA, ok := netB.peerStats[netA.service.ID()]
+ netB.peerStatsMu.Unlock()
+ if !ok {
+ return false
+ }
+ return netBpeerStatsA.txReceived.Load() == 10
+ },
+ 1*time.Second,
+ 50*time.Millisecond,
+ )
+
+ // check netB did not receive the messages
+ netC.peerStatsMu.Lock()
+ _, ok := netC.peerStats[netA.service.ID()]
+ netC.peerStatsMu.Unlock()
+ require.False(t, ok)
+}
+
func TestP2PSubmitWS(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -148,17 +245,12 @@ func TestP2PSubmitWS(t *testing.T) {
require.NoError(t, err)
defer netC.Stop()
- require.Eventually(
- t,
- func() bool {
- return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) == 2 &&
- len(netB.service.ListPeersForTopic(p2p.TXTopicName)) == 1 &&
- len(netC.service.ListPeersForTopic(p2p.TXTopicName)) == 1
- },
- 2*time.Second,
- 50*time.Millisecond,
- )
- time.Sleep(time.Second) // XX give time for peers to connect. Knowing about them being subscribed to topics is clearly not enough
+ require.Eventually(t, func() bool {
+ return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
+ }, 2*time.Second, 50*time.Millisecond)
+
+ time.Sleep(time.Second) // give time for peers to connect.
+
// now we should be connected in a line: B <-> A <-> C where both B and C are connected to A but not each other
testTag := protocol.AgreementVoteTag
@@ -242,7 +334,7 @@ func (s *mockService) ListPeersForTopic(topic string) []peer.ID {
return nil
}
-func (s *mockService) Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error) {
+func (s *mockService) Subscribe(topic string, val pubsub.ValidatorEx) (p2p.SubNextCancellable, error) {
return nil, nil
}
func (s *mockService) Publish(ctx context.Context, topic string, data []byte) error {
@@ -341,18 +433,27 @@ func (c *mockResolveController) Resolver() dnsaddr.Resolver {
type mockResolver struct{}
func (r *mockResolver) Resolve(ctx context.Context, _ ma.Multiaddr) ([]ma.Multiaddr, error) {
- maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC")
+ // return random stuff each time
+ _, publicKey, err := crypto.GenerateKeyPair(crypto.RSA, 2048)
+ if err != nil {
+ panic(err)
+ }
+ peerID, err := peer.IDFromPublicKey(publicKey)
+ if err != nil {
+ panic(err)
+ }
+ maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/p2p/" + peerID.String())
return []ma.Multiaddr{maddr}, err
}
-func TestBootstrapFunc(t *testing.T) {
+func TestP2PBootstrapFunc(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)
b := bootstrapper{}
require.Nil(t, b.BootstrapFunc())
- b.started = true
+ b.started.Store(true)
p := peer.AddrInfo{ID: "test"}
b.phonebookPeers = []*peer.AddrInfo{&p}
require.Equal(t, []peer.AddrInfo{p}, b.BootstrapFunc())
@@ -363,7 +464,7 @@ func TestBootstrapFunc(t *testing.T) {
b.cfg.DNSBootstrapID = "<network>.algodev.network"
b.cfg.DNSSecurityFlags = 0
b.networkID = "devnet"
- b.resolveControler = &mockResolveController{}
+ b.resolveController = &mockResolveController{}
addrs := b.BootstrapFunc()
@@ -373,7 +474,7 @@ func TestBootstrapFunc(t *testing.T) {
require.GreaterOrEqual(t, len(addr.Addrs), 1)
}
-func TestGetBootstrapPeersFailure(t *testing.T) {
+func TestP2PdnsLookupBootstrapPeersFailure(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)
@@ -382,12 +483,12 @@ func TestGetBootstrapPeersFailure(t *testing.T) {
cfg.DNSBootstrapID = "non-existent.algodev.network"
controller := nilResolveController{}
- addrs := getBootstrapPeers(cfg, "test", &controller)
+ addrs := dnsLookupBootstrapPeers(logging.TestingLog(t), cfg, "test", &controller)
require.Equal(t, 0, len(addrs))
}
-func TestGetBootstrapPeersInvalidAddr(t *testing.T) {
+func TestP2PdnsLookupBootstrapPeersInvalidAddr(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)
@@ -396,11 +497,29 @@ func TestGetBootstrapPeersInvalidAddr(t *testing.T) {
cfg.DNSBootstrapID = "<network>.algodev.network"
controller := nilResolveController{}
- addrs := getBootstrapPeers(cfg, "testInvalidAddr", &controller)
+ addrs := dnsLookupBootstrapPeers(logging.TestingLog(t), cfg, "testInvalidAddr", &controller)
require.Equal(t, 0, len(addrs))
}
+func TestP2PdnsLookupBootstrapPeersWithBackup(t *testing.T) {
+ t.Parallel()
+ partitiontest.PartitionTest(t)
+
+ cfg := config.GetDefaultLocal()
+ cfg.DNSSecurityFlags = 0
+ cfg.DNSBootstrapID = "<network>.algodev.network"
+
+ controller := &mockResolveController{}
+ addrs := dnsLookupBootstrapPeers(logging.TestingLog(t), cfg, "test", controller)
+ require.GreaterOrEqual(t, len(addrs), 1)
+
+ cfg.DNSBootstrapID = "<network>.algodev.network?backup=<network>.backup.algodev.network"
+ addrs = dnsLookupBootstrapPeers(logging.TestingLog(t), cfg, "test", controller)
+ require.GreaterOrEqual(t, len(addrs), 2)
+
+}
+
type capNodeInfo struct {
nopeNodeInfo
cap p2p.Capability
@@ -427,8 +546,8 @@ func waitForRouting(t *testing.T, disc *p2p.CapabilitiesDiscovery) {
}
}
-// TestP2PNetworkDHTCapabilities runs nodes with capabilites and ensures that connected nodes
-// can discover themself. The other nodes receive the first node in bootstrap list before starting.
+// TestP2PNetworkDHTCapabilities runs nodes with capabilities and ensures that connected nodes
+// can discover itself. The other nodes receive the first node in bootstrap list before starting.
// There is two variations of the test: only netA advertises capabilities, and all nodes advertise.
func TestP2PNetworkDHTCapabilities(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -475,16 +594,10 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) {
require.NoError(t, err)
defer netC.Stop()
- require.Eventually(
- t,
- func() bool {
- return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
- len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
- len(netC.service.ListPeersForTopic(p2p.TXTopicName)) > 0
- },
- 2*time.Second,
- 50*time.Millisecond,
- )
+ require.Eventually(t, func() bool {
+ return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
+ }, 2*time.Second, 50*time.Millisecond)
+
t.Logf("peers connected")
nets := []*P2PNetwork{netA, netB, netC}
@@ -506,9 +619,13 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) {
t.Logf("DHT is ready")
- // ensure all peers are connected
+ // ensure all peers are connected - wait for connectivity as needed
for _, disc := range discs {
- require.Equal(t, 2, len(disc.Host().Network().Peers()))
+ go func(disc *p2p.CapabilitiesDiscovery) {
+ require.Eventuallyf(t, func() bool {
+ return len(disc.Host().Network().Peers()) == 2
+ }, time.Minute, time.Second, "Not all peers were found")
+ }(disc)
}
wg.Add(len(discs))
@@ -553,7 +670,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) {
}
// TestMultiaddrConversionToFrom ensures Multiaddr can be serialized back to an address without losing information
-func TestMultiaddrConversionToFrom(t *testing.T) {
+func TestP2PMultiaddrConversionToFrom(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()
@@ -645,3 +762,248 @@ func TestP2PHTTPHandler(t *testing.T) {
_, err = httpClient.Get("/test")
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
}
+
+func TestP2PRelay(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ cfg := config.GetDefaultLocal()
+ cfg.ForceFetchTransactions = true
+ log := logging.TestingLog(t)
+ netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
+ require.NoError(t, err)
+
+ err = netA.Start()
+ require.NoError(t, err)
+ defer netA.Stop()
+
+ peerInfoA := netA.service.AddrInfo()
+ addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
+ require.NoError(t, err)
+ require.NotZero(t, addrsA[0])
+
+ multiAddrStr := addrsA[0].String()
+ phoneBookAddresses := []string{multiAddrStr}
+
+ netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
+ require.NoError(t, err)
+ err = netB.Start()
+ require.NoError(t, err)
+ defer netB.Stop()
+
+ require.Eventually(
+ t,
+ func() bool {
+ return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
+ len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0
+ },
+ 2*time.Second,
+ 50*time.Millisecond,
+ )
+
+ require.Eventually(t, func() bool {
+ return netA.hasPeers() && netB.hasPeers()
+ }, 2*time.Second, 50*time.Millisecond)
+
+ counter := newMessageCounter(t, 1)
+ counterDone := counter.done
+ netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
+
+ // send 5 messages from both netB to netA
+ // since there is no node with listening address set => no messages should be received
+ for i := 0; i < 5; i++ {
+ err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
+ require.NoError(t, err)
+ }
+
+ select {
+ case <-counterDone:
+ require.Fail(t, "No messages should have been received")
+ case <-time.After(1 * time.Second):
+ }
+
+ // add netC with listening address set, and enable relaying on netB
+ // ensure all messages are received by netA
+ cfg.NetAddress = "127.0.0.1:0"
+ netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
+ require.NoError(t, err)
+ err = netC.Start()
+ require.NoError(t, err)
+ defer netC.Stop()
+
+ netB.relayMessages = true
+
+ require.Eventually(
+ t,
+ func() bool {
+ return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
+ len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
+ len(netC.service.ListPeersForTopic(p2p.TXTopicName)) > 0
+ },
+ 2*time.Second,
+ 50*time.Millisecond,
+ )
+
+ require.Eventually(t, func() bool {
+ return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
+ }, 2*time.Second, 50*time.Millisecond)
+
+ const expectedMsgs = 10
+ counter = newMessageCounter(t, expectedMsgs)
+ counterDone = counter.done
+ netA.ClearHandlers()
+ netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})
+
+ for i := 0; i < expectedMsgs/2; i++ {
+ err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
+ require.NoError(t, err)
+ err = netC.Relay(context.Background(), protocol.TxnTag, []byte{11, 12, 10 + byte(i), 14}, true, nil)
+ require.NoError(t, err)
+ }
+ // send some duplicate messages, they should be dropped
+ for i := 0; i < expectedMsgs/2; i++ {
+ err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
+ require.NoError(t, err)
+ }
+
+ select {
+ case <-counterDone:
+ case <-time.After(2 * time.Second):
+ if counter.count < expectedMsgs {
+ require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", expectedMsgs, counter.count)
+ } else if counter.count > expectedMsgs {
+ require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", expectedMsgs, counter.count)
+ }
+ }
+}
+
+type mockSubPService struct {
+ mockService
+ count atomic.Int64
+}
+
+type mockSubscription struct {
+}
+
+func (m *mockSubscription) Next(ctx context.Context) (*pubsub.Message, error) { return nil, nil }
+func (m *mockSubscription) Cancel() {}
+
+func (m *mockSubPService) Subscribe(topic string, val pubsub.ValidatorEx) (p2p.SubNextCancellable, error) {
+ m.count.Add(1)
+ return &mockSubscription{}, nil
+}
+
+// TestP2PWantTXGossip checks txTopicHandleLoop runs as expected on wantTXGossip changes
+func TestP2PWantTXGossip(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ // cancelled context to trigger subscription.Next to return
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ mockService := &mockSubPService{}
+ net := &P2PNetwork{
+ service: mockService,
+ log: logging.TestingLog(t),
+ ctx: ctx,
+ nodeInfo: &nopeNodeInfo{},
+ }
+ net.wantTXGossip.Store(false)
+
+ // ensure wantTXGossip from false to false is noop
+ net.OnNetworkAdvance()
+ require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond)
+ require.Equal(t, int64(0), mockService.count.Load())
+ require.False(t, net.wantTXGossip.Load())
+
+ // ensure wantTXGossip from true (wantTXGossip) to false (nopeNodeInfo) is noop
+ net.wantTXGossip.Store(true)
+ net.OnNetworkAdvance()
+ require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond)
+ require.Equal(t, int64(0), mockService.count.Load())
+ require.False(t, net.wantTXGossip.Load())
+
+ // check false to true change triggers subscription
+ net.nodeInfo = &participatingNodeInfo{}
+ net.OnNetworkAdvance()
+ require.Eventually(t, func() bool { return mockService.count.Load() == 1 }, 1*time.Second, 50*time.Millisecond)
+ require.True(t, net.wantTXGossip.Load())
+
+ // check true to true change is noop
+ net.OnNetworkAdvance()
+ require.Eventually(t, func() bool { return mockService.count.Load() == 1 }, 1*time.Second, 50*time.Millisecond)
+ require.True(t, net.wantTXGossip.Load())
+}
+
+func TestMergeP2PAddrInfoResolvedAddresses(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ m1, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
+ require.NoError(t, err)
+ m2, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb")
+ require.NoError(t, err)
+ m3, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC")
+ require.NoError(t, err)
+ m4, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
+ require.NoError(t, err)
+
+ var tests = []struct {
+ name string
+ primary []ma.Multiaddr
+ backup []ma.Multiaddr
+ expected int
+ hasInvalid bool
+ }{
+ {"no overlap", []ma.Multiaddr{m1}, []ma.Multiaddr{m2}, 2, false},
+ {"complete overlap", []ma.Multiaddr{m1}, []ma.Multiaddr{m1}, 1, false},
+ {"partial overlap", []ma.Multiaddr{m1, m2}, []ma.Multiaddr{m1, m3}, 3, false},
+ {"empty slices", []ma.Multiaddr{}, []ma.Multiaddr{}, 0, false},
+ {"nil slices", nil, nil, 0, false},
+ {"invalid p2p", []ma.Multiaddr{m1, m4}, []ma.Multiaddr{m2, m4}, 2, true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ r1 := mergeP2PMultiaddrResolvedAddresses(tt.primary, tt.backup)
+ if len(r1) != tt.expected {
+ t.Errorf("Expected %d addresses, got %d", tt.expected, len(r1))
+ }
+
+ var info1 []peer.AddrInfo
+ var info2 []peer.AddrInfo
+ for _, addr := range tt.primary {
+ info, err0 := peer.AddrInfoFromP2pAddr(addr)
+ if tt.hasInvalid {
+ if err0 == nil {
+ info1 = append(info1, *info)
+ }
+ } else {
+ require.NoError(t, err0)
+ info1 = append(info1, *info)
+ }
+ }
+ for _, addr := range tt.backup {
+ info, err0 := peer.AddrInfoFromP2pAddr(addr)
+ if tt.hasInvalid {
+ if err0 == nil {
+ info2 = append(info2, *info)
+ }
+ } else {
+ require.NoError(t, err0)
+ info2 = append(info2, *info)
+ }
+ }
+ if info1 == nil && tt.primary != nil {
+ info1 = []peer.AddrInfo{}
+ }
+ if info2 == nil && tt.backup != nil {
+ info1 = []peer.AddrInfo{}
+ }
+
+ r2 := mergeP2PAddrInfoResolvedAddresses(info1, info2)
+ if len(r2) != tt.expected {
+ t.Errorf("Expected %d addresses, got %d", tt.expected, len(r2))
+ }
+ })
+ }
+}
diff --git a/node/node.go b/node/node.go
index b848a643c..33c3ca865 100644
--- a/node/node.go
+++ b/node/node.go
@@ -399,6 +399,9 @@ func (node *AlgorandFullNode) Capabilities() []p2p.Capability {
if node.config.StoresCatchpoints() {
caps = append(caps, p2p.Catchpoints)
}
+ if node.config.EnableGossipService && node.config.IsGossipServer() {
+ caps = append(caps, p2p.Gossip)
+ }
return caps
}
diff --git a/node/node_test.go b/node/node_test.go
index a83ea7ab3..d4e2a08eb 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -911,7 +911,7 @@ func TestNodeHybridTopology(t *testing.T) {
t.Logf("Node%d phonebook: empty", i)
return []string{}
case 1:
- // node 1 (R) connectes to all
+ // node 1 (R) connects to all
t.Logf("Node%d phonebook: %s, %s, %s, %s", i, ni[0].wsNetAddr(), ni[2].wsNetAddr(), ni[0].p2pMultiAddr(), ni[2].p2pMultiAddr())
return []string{ni[0].wsNetAddr(), ni[2].wsNetAddr(), ni[0].p2pMultiAddr(), ni[2].p2pMultiAddr()}
case 2:
@@ -952,3 +952,104 @@ func TestNodeHybridTopology(t *testing.T) {
require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0]))
}
}
+
+// TestNodeP2PRelays creates a network of 3 nodes with the following topology:
+// R1 (relay, DHT) -> R2 (relay, phonebook) <- N (part node)
+// Expect N to discover R1 via DHT and connect to it.
+func TestNodeP2PRelays(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ const consensusTest0 = protocol.ConsensusVersion("test0")
+
+ configurableConsensus := make(config.ConsensusProtocols)
+
+ testParams0 := config.Consensus[protocol.ConsensusCurrentVersion]
+ testParams0.AgreementFilterTimeoutPeriod0 = 500 * time.Millisecond
+ configurableConsensus[consensusTest0] = testParams0
+
+ minMoneyAtStart := 1_000_000
+ maxMoneyAtStart := 100_000_000_000
+ gen := rand.New(rand.NewSource(2))
+
+ const numAccounts = 3
+ acctStake := make([]basics.MicroAlgos, numAccounts)
+ // only node N has stake
+ acctStake[2] = basics.MicroAlgos{Raw: uint64(minMoneyAtStart + (gen.Int() % (maxMoneyAtStart - minMoneyAtStart)))}
+
+ configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) {
+ cfg = config.GetDefaultLocal()
+ cfg.BaseLoggerDebugLevel = uint32(logging.Debug)
+ cfg.EnableP2P = true
+ cfg.NetAddress = ""
+ cfg.EnableDHTProviders = true
+
+ cfg.P2PPersistPeerID = true
+ genesisDirs, err := cfg.EnsureAndResolveGenesisDirs(ni.rootDir, ni.genesis.ID(), nil)
+ require.NoError(t, err)
+ privKey, err := p2p.GetPrivKey(cfg, genesisDirs.RootGenesisDir)
+ require.NoError(t, err)
+ ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic())
+ require.NoError(t, err)
+
+ switch ni.idx {
+ case 2:
+ // N is not a relay
+ default:
+ cfg.NetAddress = ni.p2pNetAddr()
+ }
+ return ni, cfg
+ }
+
+ phonebookHook := func(ni []nodeInfo, i int) []string {
+ switch i {
+ case 0:
+ // node R1 connects to R2
+ t.Logf("Node%d phonebook: %s", i, ni[1].p2pMultiAddr())
+ return []string{ni[1].p2pMultiAddr()}
+ case 1:
+ // node R2 connects to none one
+ t.Logf("Node%d phonebook: empty", i)
+ return []string{}
+ case 2:
+ // node N only connects to R1
+ t.Logf("Node%d phonebook: %s", i, ni[1].p2pMultiAddr())
+ return []string{ni[1].p2pMultiAddr()}
+ default:
+ t.Errorf("not expected number of nodes: %d", i)
+ t.FailNow()
+ }
+ return nil
+ }
+
+ backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
+ defer backlogPool.Shutdown()
+
+ nodes, wallets := setupFullNodesEx(t, consensusTest0, backlogPool, configurableConsensus, acctStake, configHook, phonebookHook)
+ require.Len(t, nodes, 3)
+ require.Len(t, wallets, 3)
+ for i := 0; i < len(nodes); i++ {
+ defer os.Remove(wallets[i])
+ defer nodes[i].Stop()
+ }
+
+ startAndConnectNodes(nodes, nodelayFirstNodeStartDelay)
+
+ require.Eventually(t, func() bool {
+ connectPeers(nodes)
+
+ // since p2p open streams based on peer ID, there is no way to judge
+ // connectivity based on exact In/Out so count both
+ return len(nodes[0].net.GetPeers(network.PeersConnectedIn, network.PeersConnectedOut)) >= 1 &&
+ len(nodes[1].net.GetPeers(network.PeersConnectedIn, network.PeersConnectedOut)) >= 2 &&
+ len(nodes[2].net.GetPeers(network.PeersConnectedIn, network.PeersConnectedOut)) >= 1
+ }, 60*time.Second, 1*time.Second)
+
+ t.Log("Nodes connected to R2")
+
+ // wait until N gets R1 in its phonebook
+ require.Eventually(t, func() bool {
+ // refresh N's peers in order to learn DHT data faster
+ nodes[2].net.RequestConnectOutgoing(false, nil)
+ return len(nodes[2].net.GetPeers(network.PeersPhonebookRelays)) == 2
+ }, 80*time.Second, 1*time.Second)
+}