From afb570efcde21aaf8adf96e1d4569d9016ab295d Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Oct 2024 12:47:37 +0300 Subject: [PATCH 01/18] start implementing lightpush test --- wakuv2/nwaku.go | 100 +++++++++++++++++++++++++++++++++++++++++++ wakuv2/nwaku_test.go | 66 +++++++++++++++++++++++++++- 2 files changed, 164 insertions(+), 2 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index baa4e2e4987..53085f4a036 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2267,6 +2267,106 @@ func (w *Waku) LegacyStoreNode() legacy_store.Store { return nil } +type WakuMessageHash = string +type WakuPubsubTopic = string +type WakuContentTopic = string + +type WakuConfig struct { + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + NodeKey string `json:"key,omitempty"` + EnableRelay bool `json:"relay"` + LogLevel string `json:"logLevel"` + DnsDiscovery bool `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` + Staticnodes []string `json:"staticnodes,omitempty"` + Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` + Discv5Discovery bool `json:"discv5Discovery,omitempty"` + ClusterID uint16 `json:"clusterId,omitempty"` + Shards []uint16 `json:"shards,omitempty"` + PeerExchange bool `json:"peerExchange,omitempty"` + PeerExchangeNode string `json:"peerExchangeNode,omitempty"` +} + +type Waku struct { + wakuCtx unsafe.Pointer + + appDB *sql.DB + + dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery + dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map + + // Filter-related + filters *common.Filters // Message filters installed with Subscribe function + filterManager *filterapi.FilterManager + + privateKeys map[string]*ecdsa.PrivateKey // Private key storage + symKeys map[string][]byte // Symmetric key storage + keyMu sync.RWMutex // Mutex associated with key stores + + envelopeCache *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] // Pool of envelopes currently tracked by this node + poolMu sync.RWMutex // Mutex to sync the message and expiration pools + + bandwidthCounter *metrics.BandwidthCounter + + protectedTopicStore *persistence.ProtectedTopicsStore + + sendQueue *publish.MessageQueue + limiter *publish.PublishRateLimiter + + missingMsgVerifier *missing.MissingMessageVerifier + + msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + cfg *WakuConfig + options []node.WakuNodeOption + + envelopeFeed event.Feed + + storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids + storeMsgIDsMu sync.RWMutex + + messageSentCheck *publish.MessageSentCheck + + topicHealthStatusChan chan peermanager.TopicHealthStatus + connectionNotifChan chan node.PeerConnection + connStatusSubscriptions map[string]*types.ConnStatusSubscription + connStatusMu sync.Mutex + onlineChecker *onlinechecker.DefaultOnlineChecker + state connection.State + + logger *zap.Logger + + // NTP Synced timesource + timesource *timesource.NTPTimeSource + + // seededBootnodesForDiscV5 indicates whether we manage to retrieve discovery + // bootnodes successfully + seededBootnodesForDiscV5 bool + + // goingOnline is channel that notifies when connectivity has changed from offline to online + goingOnline chan struct{} + + // discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery + discV5BootstrapNodes []string + + onHistoricMessagesRequestFailed func([]byte, peer.ID, error) + onPeerStats func(types.ConnStatus) + + // statusTelemetryClient ITelemetryClient + + defaultShardInfo protocol.RelayShards +} + +func (w *Waku) Stop() error { + return w.WakuStop() +} + func WakuSetup() { C.waku_setup() } diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index fdf6f5458a9..2c32d2ee919 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -13,6 +13,7 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "go.uber.org/zap" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -323,10 +324,69 @@ func makeTestTree(domain string, nodes []*enode.Node, links []string) (*ethdnsdi return tree, url } -/* func TestPeerExchange(t *testing.T) { + logger, err := zap.NewDevelopment() require.NoError(t, err) + + // start node which serve as PeerExchange server + pxServerConfig := WakuConfig{ + // Port: 30303, + // NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", + EnableRelay: false, + LogLevel: "DEBUG", + Discv5Discovery: true, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: true, + } + pxServerNode, err := New(nil, "", &pxServerConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, pxServerNode.Start()) + + time.Sleep(1 * time.Second) + + enr, err := pxServerNode.ENR() + require.NoError(t, err) + require.NotNil(t, enr) + + // start node that will be discovered by PeerExchange + discV5NodeConfig := WakuConfig{ + // Port: 30303, + // NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", + EnableRelay: false, + LogLevel: "DEBUG", + Discv5Discovery: true, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: false, + Discv5BootstrapNodes: []string{enr.String()}, + } + + discV5Node, err := New(nil, "", &discV5NodeConfig, logger.Named("discV5Node"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, discV5Node.Start()) + + time.Sleep(1 * time.Second) + + // start light node which use PeerExchange to discover peers + + pxClientConfig := WakuConfig{ + // Port: 30303, + // NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", + EnableRelay: false, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: false, + PeerExchangeNode: "", // TODO: fill + } + + + + /* logger, err := zap.NewDevelopment() + require.NoError(t, err) // start node which serve as PeerExchange server config := &Config{} config.ClusterID = 16 @@ -401,9 +461,11 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, lightNode.Stop()) require.NoError(t, pxServerNode.Stop()) - require.NoError(t, discV5Node.Stop()) + require.NoError(t, discV5Node.Stop()) */ } +/* + func TestWakuV2Filter(t *testing.T) { t.Skip("flaky test") From 141aad2bc9a34e720083974d7c27d4d896f844a3 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Oct 2024 15:46:03 +0300 Subject: [PATCH 02/18] calling relaySubscrbe only if relay is enabled --- wakuv2/nwaku.go | 153 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 53085f4a036..ae6e9232b74 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -3276,4 +3276,157 @@ func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue { return r.storeResponse.GetMessages() +// New creates a WakuV2 client ready to communicate through the LibP2P network. +func New(nodeKey *ecdsa.PrivateKey, + fleet string, + cfg *WakuConfig, + logger *zap.Logger, + appDB *sql.DB, + ts *timesource.NTPTimeSource, + onHistoricMessagesRequestFailed func([]byte, peer.ID, error), + onPeerStats func(types.ConnStatus)) (*Waku, error) { + + // Lock the main goroutine to its current OS thread + runtime.LockOSThread() + + WakuSetup() // This should only be called once in the whole app's life + + node, err := wakuNew(nodeKey, + fleet, + cfg, logger, appDB, ts, onHistoricMessagesRequestFailed, + onPeerStats) + if err != nil { + return nil, err + } + + defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() + if err != nil { + fmt.Println("Error happened:", err.Error()) + } + + if cfg.EnableRelay { + err = node.WakuRelaySubscribe(defaultPubsubTopic) + if err != nil { + fmt.Println("Error happened:", err.Error()) + } + } + + node.WakuSetEventCallback() + + return node, nil + + // if !cfg.UseThrottledPublish || testing.Testing() { + // // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, + // // basically disabling the rate limit functionality + // waku.limiter = publish.NewPublishRateLimiter(rate.Inf, 1) + + // } else { + // waku.limiter = publish.NewPublishRateLimiter(publishingLimiterRate, publishingLimitBurst) + // } + + // waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) + // waku.bandwidthCounter = metrics.NewBandwidthCounter() + + // if nodeKey == nil { + // // No nodekey is provided, create an ephemeral key + // nodeKey, err = crypto.GenerateKey() + // if err != nil { + // return nil, fmt.Errorf("failed to generate a random go-waku private key: %v", err) + // } + // } + + // hostAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprint(cfg.Host, ":", cfg.Port)) + // if err != nil { + // return nil, fmt.Errorf("failed to setup the network interface: %v", err) + // } + + // libp2pOpts := node.DefaultLibP2POptions + // libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter)) + // libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) + + // opts := []node.WakuNodeOption{ + // node.WithLibP2POptions(libp2pOpts...), + // node.WithPrivateKey(nodeKey), + // node.WithHostAddress(hostAddr), + // node.WithConnectionNotification(waku.connectionNotifChan), + // node.WithTopicHealthStatusChannel(waku.topicHealthStatusChan), + // node.WithKeepAlive(randomPeersKeepAliveInterval, allPeersKeepAliveInterval), + // node.WithLogger(logger), + // node.WithLogLevel(logger.Level()), + // node.WithClusterID(cfg.ClusterID), + // node.WithMaxMsgSize(1024 * 1024), + // } + + // if cfg.EnableDiscV5 { + // bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes) + // if err != nil { + // logger.Error("failed to get bootstrap nodes", zap.Error(err)) + // return nil, err + // } + // opts = append(opts, node.WithDiscoveryV5(uint(cfg.UDPPort), bootnodes, cfg.AutoUpdate)) + // } + // shards, err := protocol.TopicsToRelayShards(cfg.DefaultShardPubsubTopic) + // if err != nil { + // logger.Error("FATAL ERROR: failed to parse relay shards", zap.Error(err)) + // return nil, errors.New("failed to parse relay shard, invalid pubsubTopic configuration") + // } + // if len(shards) == 0 { //Hack so that tests don't fail. TODO: Need to remove this once tests are changed to use proper cluster and shard. + // shardInfo := protocol.RelayShards{ClusterID: 0, ShardIDs: []uint16{0}} + // shards = append(shards, shardInfo) + // } + // waku.defaultShardInfo = shards[0] + // if cfg.LightClient { + // opts = append(opts, node.WithWakuFilterLightNode()) + // waku.defaultShardInfo = shards[0] + // opts = append(opts, node.WithMaxPeerConnections(cfg.DiscoveryLimit)) + // cfg.EnableStoreConfirmationForMessagesSent = false + // //TODO: temporary work-around to improve lightClient connectivity, need to be removed once community sharding is implemented + // opts = append(opts, node.WithPubSubTopics(cfg.DefaultShardedPubsubTopics)) + // } else { + // relayOpts := []pubsub.Option{ + // pubsub.WithMaxMessageSize(int(waku.cfg.MaxMessageSize)), + // } + + // if waku.logger.Level() == zap.DebugLevel { + // relayOpts = append(relayOpts, pubsub.WithEventTracer(waku)) + // } + + // opts = append(opts, node.WithWakuRelayAndMinPeers(waku.cfg.MinPeersForRelay, relayOpts...)) + // opts = append(opts, node.WithMaxPeerConnections(maxRelayPeers)) + // cfg.EnablePeerExchangeClient = true //Enabling this until discv5 issues are resolved. This will enable more peers to be connected for relay mesh. + // cfg.EnableStoreConfirmationForMessagesSent = true + // } + + // if cfg.EnableStore { + // if appDB == nil { + // return nil, errors.New("appDB is required for store") + // } + // opts = append(opts, node.WithWakuStore()) + // dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(appDB), persistence.WithRetentionPolicy(cfg.StoreCapacity, time.Duration(cfg.StoreSeconds)*time.Second)) + // if err != nil { + // return nil, err + // } + // opts = append(opts, node.WithMessageProvider(dbStore)) + // } + + // if !cfg.LightClient { + // opts = append(opts, node.WithWakuFilterFullNode(filter.WithMaxSubscribers(20))) + // opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(1, 1))) + // } + + // if appDB != nil { + // waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB) + // if err != nil { + // return nil, err + // } + // } + + // if cfg.EnablePeerExchangeServer { + // opts = append(opts, node.WithPeerExchange(peer_exchange.WithRateLimiter(1, 1))) + // } + + // waku.options = opts + // waku.logger.Info("setup the go-waku node successfully") + + // return waku, nil } From 5b4c2e4e1dcc628052fc4ed1545456029915c7c0 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Oct 2024 17:29:54 +0300 Subject: [PATCH 03/18] fixing address in use error --- wakuv2/nwaku.go | 1 + wakuv2/nwaku_test.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index ae6e9232b74..9a93af2cd41 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2283,6 +2283,7 @@ type WakuConfig struct { Staticnodes []string `json:"staticnodes,omitempty"` Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` Discv5Discovery bool `json:"discv5Discovery,omitempty"` + Discv5UdpPort uint16 `json:"discv5UdpPort,omitempty"` ClusterID uint16 `json:"clusterId,omitempty"` Shards []uint16 `json:"shards,omitempty"` PeerExchange bool `json:"peerExchange,omitempty"` diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 2c32d2ee919..5b8d7c8629e 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -339,7 +339,9 @@ func TestPeerExchange(t *testing.T) { ClusterID: 16, Shards: []uint16{64}, PeerExchange: true, + Discv5UdpPort: 9000, } + pxServerNode, err := New(nil, "", &pxServerConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, pxServerNode.Start()) @@ -350,6 +352,8 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) require.NotNil(t, enr) + ma, err := pxServerNode.ListenAddresses() + // start node that will be discovered by PeerExchange discV5NodeConfig := WakuConfig{ // Port: 30303, @@ -361,6 +365,7 @@ func TestPeerExchange(t *testing.T) { Shards: []uint16{64}, PeerExchange: false, Discv5BootstrapNodes: []string{enr.String()}, + Discv5UdpPort: 9001, } discV5Node, err := New(nil, "", &discV5NodeConfig, logger.Named("discV5Node"), nil, nil, nil, nil) @@ -380,9 +385,18 @@ func TestPeerExchange(t *testing.T) { ClusterID: 16, Shards: []uint16{64}, PeerExchange: false, + Discv5UdpPort: 9002, PeerExchangeNode: "", // TODO: fill } + lightNode, err := New(nil, "", &pxClientConfig, logger.Named("lightNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, lightNode.Start()) + + require.NoError(t, lightNode.Stop()) + require.NoError(t, pxServerNode.Stop()) + require.NoError(t, discV5Node.Stop()) + /* logger, err := zap.NewDevelopment() From 722234ed5b8796e333422c82beb644a5e5d5c7e4 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Oct 2024 15:36:22 +0300 Subject: [PATCH 04/18] advancing with the px test --- wakuv2/nwaku.go | 36 +++++++++++++++++++++++++++++++++++- wakuv2/nwaku_test.go | 27 +++++++++++++++++++++++---- 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 9a93af2cd41..d6349ba4074 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -216,6 +216,10 @@ package wakuv2 WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } + static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { + WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); + } + static void cGoWakuLightpushPublish(void* wakuCtx, const char* pubSubTopic, const char* jsonWakuMessage, @@ -2854,6 +2858,36 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) return 0, errors.New(errMsg) } +func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetPeerIdsFromPeerStore(self.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return peer.IDSlice{}, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, peer := range itemsPeerIds { + id, err := peermod.Decode(peer) + if err != nil { + errMsg := "GetPeerIdsFromPeerStore - error decoding peerId: " + err.Error() + return nil, errors.New(errMsg) + } + peers = append(peers, id) + } + + return peers, nil + } + errMsg := "error GetPeerIdsFromPeerStore: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { var resp = C.allocResp() var cProtocol = C.CString(protocol) @@ -2874,7 +2908,7 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { for _, p := range itemsPeerIds { id, err := peer.Decode(p) if err != nil { - errMsg := "GetPeerIdsByProtocol - error converting string to int: " + err.Error() + errMsg := "GetPeerIdsByProtocol - error decoding peerId: " + err.Error() return nil, errors.New(errMsg) } peers = append(peers, id) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 5b8d7c8629e..8ec3915caf0 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -353,6 +353,8 @@ func TestPeerExchange(t *testing.T) { require.NotNil(t, enr) ma, err := pxServerNode.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, ma) // start node that will be discovered by PeerExchange discV5NodeConfig := WakuConfig{ @@ -374,8 +376,7 @@ func TestPeerExchange(t *testing.T) { time.Sleep(1 * time.Second) - // start light node which use PeerExchange to discover peers - + // start light node which use PeerExchange to discover peers pxClientConfig := WakuConfig{ // Port: 30303, // NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", @@ -384,15 +385,33 @@ func TestPeerExchange(t *testing.T) { Discv5Discovery: false, ClusterID: 16, Shards: []uint16{64}, - PeerExchange: false, + PeerExchange: true, Discv5UdpPort: 9002, - PeerExchangeNode: "", // TODO: fill + PeerExchangeNode: ma[0].String(), } lightNode, err := New(nil, "", &pxClientConfig, logger.Named("lightNode"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, lightNode.Start()) + // Sanity check, not great, but it's probably helpful + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 30 * time.Second + } + + err = tt.RetryWithBackOff(func() error { + peers, err := lightNode.GetPeerIdsFromPeerStore() + fmt.Println("------------ peers: ", peers) + if err != nil { + return err + } + if len(peers) == 2 { + return nil + } + return errors.New("no peers discovered") + }, options) + require.NoError(t, err) + require.NoError(t, lightNode.Stop()) require.NoError(t, pxServerNode.Stop()) require.NoError(t, discV5Node.Stop()) From 3889501c7f6f09b20853f6d4fb797fb682e8895f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Oct 2024 16:32:21 +0300 Subject: [PATCH 05/18] integrating waku_get_my_peerid --- wakuv2/nwaku.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index d6349ba4074..390681800d0 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -208,6 +208,10 @@ package wakuv2 WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); } + static void cGoWakuGetMyPeerId(void* ctx, void* resp) { + WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) ); + } + static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } @@ -2195,10 +2199,24 @@ func (w *Waku) Clean() error { return nil } -// TODO-nwaku -func (w *Waku) PeerID() peer.ID { - // return w.node.Host().ID() - return "" +func (w *Waku) PeerID() (peer.ID, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetMyPeerId(w.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + + peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + id, err := peermod.Decode(peerIdStr) + if err != nil { + errMsg := "WakuGetMyPeerId - error decoding peerId: " + err.Error() + return "", errors.New(errMsg) + } + return id, nil + } + errMsg := "error WakuGetMyPeerId: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return "", errors.New(errMsg) } // validatePrivateKey checks the format of the given private key. From b90db4c6049a44daa80eb138829a688184bc449b Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 18 Oct 2024 14:55:42 +0300 Subject: [PATCH 06/18] passing test --- wakuv2/nwaku.go | 1 + wakuv2/nwaku_test.go | 99 +++++++++++++++++++++++++++++--------------- 2 files changed, 66 insertions(+), 34 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 390681800d0..d8e48c0e879 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2310,6 +2310,7 @@ type WakuConfig struct { Shards []uint16 `json:"shards,omitempty"` PeerExchange bool `json:"peerExchange,omitempty"` PeerExchangeNode string `json:"peerExchangeNode,omitempty"` + TcpPort uint16 `json:"tcpPort,omitempty"` } type Waku struct { diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 8ec3915caf0..813c7efd0aa 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -329,57 +329,87 @@ func TestPeerExchange(t *testing.T) { logger, err := zap.NewDevelopment() require.NoError(t, err) - // start node which serve as PeerExchange server - pxServerConfig := WakuConfig{ - // Port: 30303, - // NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", - EnableRelay: false, + // start node that will be discovered by PeerExchange + discV5NodeConfig := WakuConfig{ + EnableRelay: true, LogLevel: "DEBUG", Discv5Discovery: true, ClusterID: 16, Shards: []uint16{64}, - PeerExchange: true, - Discv5UdpPort: 9000, + PeerExchange: false, + Discv5UdpPort: 9001, + TcpPort: 62000, } - pxServerNode, err := New(nil, "", &pxServerConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) + discV5Node, err := New(nil, "", &discV5NodeConfig, logger.Named("discV5Node"), nil, nil, nil, nil) require.NoError(t, err) - require.NoError(t, pxServerNode.Start()) + require.NoError(t, discV5Node.Start()) + + fmt.Println("--------- GABRIEL started discV5Node") time.Sleep(1 * time.Second) - enr, err := pxServerNode.ENR() + discV5NodePeerId, err := discV5Node.PeerID() require.NoError(t, err) - require.NotNil(t, enr) - ma, err := pxServerNode.ListenAddresses() + discv5NodeEnr, err := discV5Node.ENR() require.NoError(t, err) - require.NotNil(t, ma) - - // start node that will be discovered by PeerExchange - discV5NodeConfig := WakuConfig{ - // Port: 30303, - // NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", - EnableRelay: false, + + + // start node which serves as PeerExchange server + pxServerConfig := WakuConfig{ + EnableRelay: true, LogLevel: "DEBUG", Discv5Discovery: true, ClusterID: 16, Shards: []uint16{64}, - PeerExchange: false, - Discv5BootstrapNodes: []string{enr.String()}, - Discv5UdpPort: 9001, + PeerExchange: true, + Discv5UdpPort: 9000, + Discv5BootstrapNodes: []string{discv5NodeEnr.String()}, + TcpPort: 61000, } - discV5Node, err := New(nil, "", &discV5NodeConfig, logger.Named("discV5Node"), nil, nil, nil, nil) + pxServerNode, err := New(nil, "", &pxServerConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, pxServerNode.Start()) + + time.Sleep(10 * time.Second) + + serverNodeMa, err := pxServerNode.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, serverNodeMa) + + // Sanity check, not great, but it's probably helpful + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 30 * time.Second + } + + discv5Peers, err := discV5Node.GetPeerIdsFromPeerStore() + require.NoError(t, err) + fmt.Println("------ GABRIEL discV5Node peers: ", discv5Peers) + + // Check that pxServerNode has discV5Node in its Peer Store + err = tt.RetryWithBackOff(func() error { + peers, err := pxServerNode.GetPeerIdsFromPeerStore() + + fmt.Println("------ GABRIEL pxServerNode peers: ", peers) + + if err != nil { + return err + } + + if slices.Contains(peers, discV5NodePeerId){ + return nil + } + + return errors.New("pxServer is missing the discv5 node in its peer store") + }, options) require.NoError(t, err) - require.NoError(t, discV5Node.Start()) time.Sleep(1 * time.Second) // start light node which use PeerExchange to discover peers pxClientConfig := WakuConfig{ - // Port: 30303, - // NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", EnableRelay: false, LogLevel: "DEBUG", Discv5Discovery: false, @@ -387,28 +417,29 @@ func TestPeerExchange(t *testing.T) { Shards: []uint16{64}, PeerExchange: true, Discv5UdpPort: 9002, - PeerExchangeNode: ma[0].String(), + TcpPort: 62000, + PeerExchangeNode: serverNodeMa[0].String(), } lightNode, err := New(nil, "", &pxClientConfig, logger.Named("lightNode"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, lightNode.Start()) - // Sanity check, not great, but it's probably helpful - options := func(b *backoff.ExponentialBackOff) { - b.MaxElapsedTime = 30 * time.Second - } + pxServerPeerId, err := pxServerNode.PeerID() + require.NoError(t, err) + err = tt.RetryWithBackOff(func() error { peers, err := lightNode.GetPeerIdsFromPeerStore() - fmt.Println("------------ peers: ", peers) + fmt.Println("------ GABRIEL lightnode peers: ", peers) if err != nil { return err } - if len(peers) == 2 { + + if slices.Contains(peers, discV5NodePeerId) && slices.Contains(peers, pxServerPeerId) { return nil } - return errors.New("no peers discovered") + return errors.New("lightnode is missing peers") }, options) require.NoError(t, err) From 35823dc701d01c0d1fd2751f4ae0d816f1f5ed3d Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 21 Oct 2024 11:03:37 +0300 Subject: [PATCH 07/18] small refactor and logs --- wakuv2/nwaku_test.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 813c7efd0aa..9072611d682 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -338,7 +338,7 @@ func TestPeerExchange(t *testing.T) { Shards: []uint16{64}, PeerExchange: false, Discv5UdpPort: 9001, - TcpPort: 62000, + TcpPort: 60001, } discV5Node, err := New(nil, "", &discV5NodeConfig, logger.Named("discV5Node"), nil, nil, nil, nil) @@ -366,28 +366,30 @@ func TestPeerExchange(t *testing.T) { PeerExchange: true, Discv5UdpPort: 9000, Discv5BootstrapNodes: []string{discv5NodeEnr.String()}, - TcpPort: 61000, + TcpPort: 60002, } pxServerNode, err := New(nil, "", &pxServerConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, pxServerNode.Start()) + fmt.Println("------ GABRIEL started big sleep") time.Sleep(10 * time.Second) + fmt.Println("------ GABRIEL finished big sleep") serverNodeMa, err := pxServerNode.ListenAddresses() require.NoError(t, err) require.NotNil(t, serverNodeMa) - // Sanity check, not great, but it's probably helpful - options := func(b *backoff.ExponentialBackOff) { - b.MaxElapsedTime = 30 * time.Second - } - discv5Peers, err := discV5Node.GetPeerIdsFromPeerStore() require.NoError(t, err) fmt.Println("------ GABRIEL discV5Node peers: ", discv5Peers) + // Sanity check, not great, but it's probably helpful + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 30 * time.Second + } + // Check that pxServerNode has discV5Node in its Peer Store err = tt.RetryWithBackOff(func() error { peers, err := pxServerNode.GetPeerIdsFromPeerStore() @@ -406,8 +408,6 @@ func TestPeerExchange(t *testing.T) { }, options) require.NoError(t, err) - time.Sleep(1 * time.Second) - // start light node which use PeerExchange to discover peers pxClientConfig := WakuConfig{ EnableRelay: false, @@ -417,7 +417,7 @@ func TestPeerExchange(t *testing.T) { Shards: []uint16{64}, PeerExchange: true, Discv5UdpPort: 9002, - TcpPort: 62000, + TcpPort: 60003, PeerExchangeNode: serverNodeMa[0].String(), } @@ -425,6 +425,7 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) require.NoError(t, lightNode.Start()) + time.Sleep(1 * time.Second) pxServerPeerId, err := pxServerNode.PeerID() require.NoError(t, err) From abaff4dc41e3c0ec45a6d879a9f6d7b348dec30a Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 23 Oct 2024 14:58:07 +0300 Subject: [PATCH 08/18] cleanup --- wakuv2/nwaku.go | 12 ++++++++---- wakuv2/nwaku_test.go | 38 ++++++++++++++++++++------------------ 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index d8e48c0e879..cae02fee9cf 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2733,18 +2733,22 @@ func wakuStoreQuery( return "", errors.New(errMsg) } -func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (string, error) { +func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (uint64, error) { var resp = C.allocResp() defer C.freeResp(resp) C.cGoWakuPeerExchangeQuery(self.wakuCtx, C.uint64_t(numPeers), resp) if C.getRet(resp) == C.RET_OK { - msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return msg, nil + numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) + if err != nil { + return 0, err + } + return numRecvPeers, nil } errMsg := "error WakuPeerExchangeRequest: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", errors.New(errMsg) + return 0, errors.New(errMsg) } func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 9072611d682..98448a8f11c 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -196,7 +196,6 @@ func TestBasicWakuV2(t *testing.T) { // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - numConnected, err := w.GetNumConnectedPeers() if err != nil { return err @@ -338,15 +337,13 @@ func TestPeerExchange(t *testing.T) { Shards: []uint16{64}, PeerExchange: false, Discv5UdpPort: 9001, - TcpPort: 60001, + TcpPort: 60010, } discV5Node, err := New(nil, "", &discV5NodeConfig, logger.Named("discV5Node"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, discV5Node.Start()) - fmt.Println("--------- GABRIEL started discV5Node") - time.Sleep(1 * time.Second) discV5NodePeerId, err := discV5Node.PeerID() @@ -355,7 +352,6 @@ func TestPeerExchange(t *testing.T) { discv5NodeEnr, err := discV5Node.ENR() require.NoError(t, err) - // start node which serves as PeerExchange server pxServerConfig := WakuConfig{ EnableRelay: true, @@ -366,25 +362,19 @@ func TestPeerExchange(t *testing.T) { PeerExchange: true, Discv5UdpPort: 9000, Discv5BootstrapNodes: []string{discv5NodeEnr.String()}, - TcpPort: 60002, + TcpPort: 60011, } pxServerNode, err := New(nil, "", &pxServerConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, pxServerNode.Start()) - fmt.Println("------ GABRIEL started big sleep") - time.Sleep(10 * time.Second) - fmt.Println("------ GABRIEL finished big sleep") + time.Sleep(1 * time.Second) serverNodeMa, err := pxServerNode.ListenAddresses() require.NoError(t, err) require.NotNil(t, serverNodeMa) - discv5Peers, err := discV5Node.GetPeerIdsFromPeerStore() - require.NoError(t, err) - fmt.Println("------ GABRIEL discV5Node peers: ", discv5Peers) - // Sanity check, not great, but it's probably helpful options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 30 * time.Second @@ -393,9 +383,7 @@ func TestPeerExchange(t *testing.T) { // Check that pxServerNode has discV5Node in its Peer Store err = tt.RetryWithBackOff(func() error { peers, err := pxServerNode.GetPeerIdsFromPeerStore() - - fmt.Println("------ GABRIEL pxServerNode peers: ", peers) - + if err != nil { return err } @@ -417,7 +405,7 @@ func TestPeerExchange(t *testing.T) { Shards: []uint16{64}, PeerExchange: true, Discv5UdpPort: 9002, - TcpPort: 60003, + TcpPort: 60012, PeerExchangeNode: serverNodeMa[0].String(), } @@ -432,7 +420,6 @@ func TestPeerExchange(t *testing.T) { err = tt.RetryWithBackOff(func() error { peers, err := lightNode.GetPeerIdsFromPeerStore() - fmt.Println("------ GABRIEL lightnode peers: ", peers) if err != nil { return err } @@ -444,6 +431,21 @@ func TestPeerExchange(t *testing.T) { }, options) require.NoError(t, err) + // Now perform the PX request manually to see if it also works + err = tt.RetryWithBackOff(func() error { + numPeersReceived, err := lightNode.WakuPeerExchangeRequest(1) + if err != nil { + return err + } + + if numPeersReceived == 1 { + return nil + } + return errors.New("Peer Exchange is not returning peers") + }, options) + require.NoError(t, err) + + require.NoError(t, lightNode.Stop()) require.NoError(t, pxServerNode.Stop()) require.NoError(t, discV5Node.Stop()) From 60ed207d2e5262c6d1910584e000e327e5f0bc34 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 23 Oct 2024 15:35:55 +0300 Subject: [PATCH 09/18] updating nwaku --- third_party/nwaku | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/third_party/nwaku b/third_party/nwaku index c5a825e206c..80c7581ab15 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit c5a825e206c1cb3e6e0cf8c01410527804cb76c4 +Subproject commit 80c7581ab15e048c24b984dd91b6867e277a0223 From 85255dfa3911462ad67d52bd6d7b3a1ac777ee3b Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 23 Oct 2024 19:03:01 +0300 Subject: [PATCH 10/18] resolve conflicts --- wakuv2/nwaku.go | 142 ++++++++---------------------------------------- 1 file changed, 23 insertions(+), 119 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index cae02fee9cf..f35df8fc880 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -377,19 +377,23 @@ type WakuPubsubTopic = string type WakuContentTopic = string type WakuConfig struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NodeKey string `json:"key,omitempty"` - EnableRelay bool `json:"relay"` - LogLevel string `json:"logLevel"` - DnsDiscovery bool `json:"dnsDiscovery,omitempty"` - DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` - MaxMessageSize string `json:"maxMessageSize,omitempty"` + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + NodeKey string `json:"key,omitempty"` + EnableRelay bool `json:"relay"` + LogLevel string `json:"logLevel"` + DnsDiscovery bool `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` Staticnodes []string `json:"staticnodes,omitempty"` Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` - Discv5Discovery bool `json:"discv5Discovery,omitempty"` - ClusterID uint16 `json:"clusterId,omitempty"` - Shards []uint16 `json:"shards,omitempty"` + Discv5Discovery bool `json:"discv5Discovery,omitempty"` + Discv5UdpPort uint16 `json:"discv5UdpPort,omitempty"` + ClusterID uint16 `json:"clusterId,omitempty"` + Shards []uint16 `json:"shards,omitempty"` + PeerExchange bool `json:"peerExchange,omitempty"` + PeerExchangeNode string `json:"peerExchangeNode,omitempty"` + TcpPort uint16 `json:"tcpPort,omitempty"` } // Waku represents a dark communication interface through the Ethereum @@ -505,9 +509,11 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon return nil, err } - err = node.WakuRelaySubscribe(defaultPubsubTopic) - if err != nil { - return nil, err + if cfg.EnableRelay { + err = node.WakuRelaySubscribe(defaultPubsubTopic) + if err != nil { + return nil, err + } } node.WakuSetEventCallback() @@ -2207,7 +2213,7 @@ func (w *Waku) PeerID() (peer.ID, error) { if C.getRet(resp) == C.RET_OK { peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - id, err := peermod.Decode(peerIdStr) + id, err := peer.Decode(peerIdStr) if err != nil { errMsg := "WakuGetMyPeerId - error decoding peerId: " + err.Error() return "", errors.New(errMsg) @@ -2289,108 +2295,6 @@ func (w *Waku) LegacyStoreNode() legacy_store.Store { return nil } -type WakuMessageHash = string -type WakuPubsubTopic = string -type WakuContentTopic = string - -type WakuConfig struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NodeKey string `json:"key,omitempty"` - EnableRelay bool `json:"relay"` - LogLevel string `json:"logLevel"` - DnsDiscovery bool `json:"dnsDiscovery,omitempty"` - DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` - MaxMessageSize string `json:"maxMessageSize,omitempty"` - Staticnodes []string `json:"staticnodes,omitempty"` - Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` - Discv5Discovery bool `json:"discv5Discovery,omitempty"` - Discv5UdpPort uint16 `json:"discv5UdpPort,omitempty"` - ClusterID uint16 `json:"clusterId,omitempty"` - Shards []uint16 `json:"shards,omitempty"` - PeerExchange bool `json:"peerExchange,omitempty"` - PeerExchangeNode string `json:"peerExchangeNode,omitempty"` - TcpPort uint16 `json:"tcpPort,omitempty"` -} - -type Waku struct { - wakuCtx unsafe.Pointer - - appDB *sql.DB - - dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery - dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map - - // Filter-related - filters *common.Filters // Message filters installed with Subscribe function - filterManager *filterapi.FilterManager - - privateKeys map[string]*ecdsa.PrivateKey // Private key storage - symKeys map[string][]byte // Symmetric key storage - keyMu sync.RWMutex // Mutex associated with key stores - - envelopeCache *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] // Pool of envelopes currently tracked by this node - poolMu sync.RWMutex // Mutex to sync the message and expiration pools - - bandwidthCounter *metrics.BandwidthCounter - - protectedTopicStore *persistence.ProtectedTopicsStore - - sendQueue *publish.MessageQueue - limiter *publish.PublishRateLimiter - - missingMsgVerifier *missing.MissingMessageVerifier - - msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded - - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - - cfg *WakuConfig - options []node.WakuNodeOption - - envelopeFeed event.Feed - - storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids - storeMsgIDsMu sync.RWMutex - - messageSentCheck *publish.MessageSentCheck - - topicHealthStatusChan chan peermanager.TopicHealthStatus - connectionNotifChan chan node.PeerConnection - connStatusSubscriptions map[string]*types.ConnStatusSubscription - connStatusMu sync.Mutex - onlineChecker *onlinechecker.DefaultOnlineChecker - state connection.State - - logger *zap.Logger - - // NTP Synced timesource - timesource *timesource.NTPTimeSource - - // seededBootnodesForDiscV5 indicates whether we manage to retrieve discovery - // bootnodes successfully - seededBootnodesForDiscV5 bool - - // goingOnline is channel that notifies when connectivity has changed from offline to online - goingOnline chan struct{} - - // discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery - discV5BootstrapNodes []string - - onHistoricMessagesRequestFailed func([]byte, peer.ID, error) - onPeerStats func(types.ConnStatus) - - // statusTelemetryClient ITelemetryClient - - defaultShardInfo protocol.RelayShards -} - -func (w *Waku) Stop() error { - return w.WakuStop() -} - func WakuSetup() { C.waku_setup() } @@ -2895,8 +2799,8 @@ func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) { itemsPeerIds := strings.Split(peersStr, ",") var peers peer.IDSlice - for _, peer := range itemsPeerIds { - id, err := peermod.Decode(peer) + for _, peerId := range itemsPeerIds { + id, err := peer.Decode(peerId) if err != nil { errMsg := "GetPeerIdsFromPeerStore - error decoding peerId: " + err.Error() return nil, errors.New(errMsg) From 766d92530e16e42ef6f08d0cc8eb7f3dc73f1f2c Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 23 Oct 2024 19:09:10 +0300 Subject: [PATCH 11/18] comments --- wakuv2/nwaku_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 98448a8f11c..8472595628c 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -324,7 +324,6 @@ func makeTestTree(domain string, nodes []*enode.Node, links []string) (*ethdnsdi } func TestPeerExchange(t *testing.T) { - logger, err := zap.NewDevelopment() require.NoError(t, err) @@ -396,7 +395,7 @@ func TestPeerExchange(t *testing.T) { }, options) require.NoError(t, err) - // start light node which use PeerExchange to discover peers + // start light node which uses PeerExchange to discover peers pxClientConfig := WakuConfig{ EnableRelay: false, LogLevel: "DEBUG", @@ -418,6 +417,7 @@ func TestPeerExchange(t *testing.T) { pxServerPeerId, err := pxServerNode.PeerID() require.NoError(t, err) + // Check that the light node discovered the discV5Node and has both nodes in its peer store err = tt.RetryWithBackOff(func() error { peers, err := lightNode.GetPeerIdsFromPeerStore() if err != nil { @@ -446,6 +446,7 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) + // Stop nodes require.NoError(t, lightNode.Stop()) require.NoError(t, pxServerNode.Stop()) require.NoError(t, discV5Node.Stop()) From 6543579a3baca93cd3f579accb95cf2df1c40c98 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 24 Oct 2024 16:57:46 +0300 Subject: [PATCH 12/18] implementing feedback --- wakuv2/nwaku.go | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index f35df8fc880..27935f36d06 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2215,14 +2215,13 @@ func (w *Waku) PeerID() (peer.ID, error) { peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) id, err := peer.Decode(peerIdStr) if err != nil { - errMsg := "WakuGetMyPeerId - error decoding peerId: " + err.Error() - return "", errors.New(errMsg) + errMsg := "WakuGetMyPeerId - decoding peerId: %w" + return "", fmt.Errorf(errMsg, err) } return id, nil } - errMsg := "error WakuGetMyPeerId: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return "", fmt.Errorf("WakuGetMyPeerId: %s", errMsg) } // validatePrivateKey checks the format of the given private key. @@ -2650,9 +2649,8 @@ func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (uint64, error) { } return numRecvPeers, nil } - errMsg := "error WakuPeerExchangeRequest: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return 0, errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, fmt.Errorf("WakuPeerExchangeRequest: %w", errMsg) } func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { @@ -2802,17 +2800,15 @@ func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) { for _, peerId := range itemsPeerIds { id, err := peer.Decode(peerId) if err != nil { - errMsg := "GetPeerIdsFromPeerStore - error decoding peerId: " + err.Error() - return nil, errors.New(errMsg) + return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding peerId: %w", err) } peers = append(peers, id) } return peers, nil } - errMsg := "error GetPeerIdsFromPeerStore: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %w", errMsg) } func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { @@ -2835,8 +2831,7 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { for _, p := range itemsPeerIds { id, err := peer.Decode(p) if err != nil { - errMsg := "GetPeerIdsByProtocol - error decoding peerId: " + err.Error() - return nil, errors.New(errMsg) + return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err) } peers = append(peers, id) } From 6338e9237f48fdeded12205cfa207abf7db9a50e Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 24 Oct 2024 16:58:43 +0300 Subject: [PATCH 13/18] updating nwaku --- third_party/nwaku | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/third_party/nwaku b/third_party/nwaku index 80c7581ab15..de11e576f4b 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit 80c7581ab15e048c24b984dd91b6867e277a0223 +Subproject commit de11e576f4b69b63b4135cfb9549ef15cdc1ad34 From 9f1f43e6203b681ee19b526d5b10dfc6c0f9c83d Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 24 Oct 2024 17:01:57 +0300 Subject: [PATCH 14/18] fix compilation error --- wakuv2/nwaku.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 27935f36d06..64a9658e946 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2650,7 +2650,7 @@ func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (uint64, error) { return numRecvPeers, nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return 0, fmt.Errorf("WakuPeerExchangeRequest: %w", errMsg) + return 0, fmt.Errorf("WakuPeerExchangeRequest: %s", errMsg) } func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { @@ -2808,7 +2808,7 @@ func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) { return peers, nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %w", errMsg) + return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) } func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { From eb54ce8d39f0ad5e0f41a5cdcd9621ecabb4868d Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 24 Oct 2024 17:24:21 +0300 Subject: [PATCH 15/18] fixing flaky test --- wakuv2/nwaku_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 8472595628c..6e727e26fd7 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -368,7 +368,8 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) require.NoError(t, pxServerNode.Start()) - time.Sleep(1 * time.Second) + // Adding an extra second to make sure PX cache is not empty + time.Sleep(2 * time.Second) serverNodeMa, err := pxServerNode.ListenAddresses() require.NoError(t, err) From 2b5b1f3442331c8ad7fa8c8f7fb3914a685f2df7 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 24 Oct 2024 19:07:26 +0300 Subject: [PATCH 16/18] fixing conflicts --- wakuv2/nwaku.go | 155 +------------------------------------------ wakuv2/nwaku_test.go | 27 ++++++-- 2 files changed, 22 insertions(+), 160 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 64a9658e946..5d055480659 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -509,7 +509,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon return nil, err } - if cfg.EnableRelay { + if nwakuCfg.EnableRelay { err = node.WakuRelaySubscribe(defaultPubsubTopic) if err != nil { return nil, err @@ -3233,157 +3233,4 @@ func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue { return r.storeResponse.GetMessages() -// New creates a WakuV2 client ready to communicate through the LibP2P network. -func New(nodeKey *ecdsa.PrivateKey, - fleet string, - cfg *WakuConfig, - logger *zap.Logger, - appDB *sql.DB, - ts *timesource.NTPTimeSource, - onHistoricMessagesRequestFailed func([]byte, peer.ID, error), - onPeerStats func(types.ConnStatus)) (*Waku, error) { - - // Lock the main goroutine to its current OS thread - runtime.LockOSThread() - - WakuSetup() // This should only be called once in the whole app's life - - node, err := wakuNew(nodeKey, - fleet, - cfg, logger, appDB, ts, onHistoricMessagesRequestFailed, - onPeerStats) - if err != nil { - return nil, err - } - - defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() - if err != nil { - fmt.Println("Error happened:", err.Error()) - } - - if cfg.EnableRelay { - err = node.WakuRelaySubscribe(defaultPubsubTopic) - if err != nil { - fmt.Println("Error happened:", err.Error()) - } - } - - node.WakuSetEventCallback() - - return node, nil - - // if !cfg.UseThrottledPublish || testing.Testing() { - // // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, - // // basically disabling the rate limit functionality - // waku.limiter = publish.NewPublishRateLimiter(rate.Inf, 1) - - // } else { - // waku.limiter = publish.NewPublishRateLimiter(publishingLimiterRate, publishingLimitBurst) - // } - - // waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) - // waku.bandwidthCounter = metrics.NewBandwidthCounter() - - // if nodeKey == nil { - // // No nodekey is provided, create an ephemeral key - // nodeKey, err = crypto.GenerateKey() - // if err != nil { - // return nil, fmt.Errorf("failed to generate a random go-waku private key: %v", err) - // } - // } - - // hostAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprint(cfg.Host, ":", cfg.Port)) - // if err != nil { - // return nil, fmt.Errorf("failed to setup the network interface: %v", err) - // } - - // libp2pOpts := node.DefaultLibP2POptions - // libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter)) - // libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) - - // opts := []node.WakuNodeOption{ - // node.WithLibP2POptions(libp2pOpts...), - // node.WithPrivateKey(nodeKey), - // node.WithHostAddress(hostAddr), - // node.WithConnectionNotification(waku.connectionNotifChan), - // node.WithTopicHealthStatusChannel(waku.topicHealthStatusChan), - // node.WithKeepAlive(randomPeersKeepAliveInterval, allPeersKeepAliveInterval), - // node.WithLogger(logger), - // node.WithLogLevel(logger.Level()), - // node.WithClusterID(cfg.ClusterID), - // node.WithMaxMsgSize(1024 * 1024), - // } - - // if cfg.EnableDiscV5 { - // bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes) - // if err != nil { - // logger.Error("failed to get bootstrap nodes", zap.Error(err)) - // return nil, err - // } - // opts = append(opts, node.WithDiscoveryV5(uint(cfg.UDPPort), bootnodes, cfg.AutoUpdate)) - // } - // shards, err := protocol.TopicsToRelayShards(cfg.DefaultShardPubsubTopic) - // if err != nil { - // logger.Error("FATAL ERROR: failed to parse relay shards", zap.Error(err)) - // return nil, errors.New("failed to parse relay shard, invalid pubsubTopic configuration") - // } - // if len(shards) == 0 { //Hack so that tests don't fail. TODO: Need to remove this once tests are changed to use proper cluster and shard. - // shardInfo := protocol.RelayShards{ClusterID: 0, ShardIDs: []uint16{0}} - // shards = append(shards, shardInfo) - // } - // waku.defaultShardInfo = shards[0] - // if cfg.LightClient { - // opts = append(opts, node.WithWakuFilterLightNode()) - // waku.defaultShardInfo = shards[0] - // opts = append(opts, node.WithMaxPeerConnections(cfg.DiscoveryLimit)) - // cfg.EnableStoreConfirmationForMessagesSent = false - // //TODO: temporary work-around to improve lightClient connectivity, need to be removed once community sharding is implemented - // opts = append(opts, node.WithPubSubTopics(cfg.DefaultShardedPubsubTopics)) - // } else { - // relayOpts := []pubsub.Option{ - // pubsub.WithMaxMessageSize(int(waku.cfg.MaxMessageSize)), - // } - - // if waku.logger.Level() == zap.DebugLevel { - // relayOpts = append(relayOpts, pubsub.WithEventTracer(waku)) - // } - - // opts = append(opts, node.WithWakuRelayAndMinPeers(waku.cfg.MinPeersForRelay, relayOpts...)) - // opts = append(opts, node.WithMaxPeerConnections(maxRelayPeers)) - // cfg.EnablePeerExchangeClient = true //Enabling this until discv5 issues are resolved. This will enable more peers to be connected for relay mesh. - // cfg.EnableStoreConfirmationForMessagesSent = true - // } - - // if cfg.EnableStore { - // if appDB == nil { - // return nil, errors.New("appDB is required for store") - // } - // opts = append(opts, node.WithWakuStore()) - // dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(appDB), persistence.WithRetentionPolicy(cfg.StoreCapacity, time.Duration(cfg.StoreSeconds)*time.Second)) - // if err != nil { - // return nil, err - // } - // opts = append(opts, node.WithMessageProvider(dbStore)) - // } - - // if !cfg.LightClient { - // opts = append(opts, node.WithWakuFilterFullNode(filter.WithMaxSubscribers(20))) - // opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(1, 1))) - // } - - // if appDB != nil { - // waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB) - // if err != nil { - // return nil, err - // } - // } - - // if cfg.EnablePeerExchangeServer { - // opts = append(opts, node.WithPeerExchange(peer_exchange.WithRateLimiter(1, 1))) - // } - - // waku.options = opts - // waku.logger.Info("setup the go-waku node successfully") - - // return waku, nil } diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 6e727e26fd7..537a517936d 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -327,8 +327,13 @@ func TestPeerExchange(t *testing.T) { logger, err := zap.NewDevelopment() require.NoError(t, err) + discV5NodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + // start node that will be discovered by PeerExchange - discV5NodeConfig := WakuConfig{ + discV5NodeWakuConfig := WakuConfig{ EnableRelay: true, LogLevel: "DEBUG", Discv5Discovery: true, @@ -339,7 +344,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: 60010, } - discV5Node, err := New(nil, "", &discV5NodeConfig, logger.Named("discV5Node"), nil, nil, nil, nil) + discV5Node, err := New(nil, "", &discV5NodeConfig, &discV5NodeWakuConfig, logger.Named("discV5Node"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, discV5Node.Start()) @@ -351,8 +356,13 @@ func TestPeerExchange(t *testing.T) { discv5NodeEnr, err := discV5Node.ENR() require.NoError(t, err) + pxServerConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + // start node which serves as PeerExchange server - pxServerConfig := WakuConfig{ + pxServerWakuConfig := WakuConfig{ EnableRelay: true, LogLevel: "DEBUG", Discv5Discovery: true, @@ -364,7 +374,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: 60011, } - pxServerNode, err := New(nil, "", &pxServerConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) + pxServerNode, err := New(nil, "", &pxServerConfig, &pxServerWakuConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, pxServerNode.Start()) @@ -396,8 +406,13 @@ func TestPeerExchange(t *testing.T) { }, options) require.NoError(t, err) + pxClientConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + // start light node which uses PeerExchange to discover peers - pxClientConfig := WakuConfig{ + pxClientWakuConfig := WakuConfig{ EnableRelay: false, LogLevel: "DEBUG", Discv5Discovery: false, @@ -409,7 +424,7 @@ func TestPeerExchange(t *testing.T) { PeerExchangeNode: serverNodeMa[0].String(), } - lightNode, err := New(nil, "", &pxClientConfig, logger.Named("lightNode"), nil, nil, nil, nil) + lightNode, err := New(nil, "", &pxClientConfig, &pxClientWakuConfig, logger.Named("lightNode"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, lightNode.Start()) From 280be14e054e6326d7048efdfe24297227564ae8 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 24 Oct 2024 19:24:58 +0300 Subject: [PATCH 17/18] fixed linting --- wakuv2/nwaku.go | 38 ++++++++++++------------ wakuv2/nwaku_test.go | 71 +++++++++++++++++++++----------------------- 2 files changed, 53 insertions(+), 56 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 5d055480659..bcb32460a4c 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -377,23 +377,23 @@ type WakuPubsubTopic = string type WakuContentTopic = string type WakuConfig struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NodeKey string `json:"key,omitempty"` - EnableRelay bool `json:"relay"` - LogLevel string `json:"logLevel"` - DnsDiscovery bool `json:"dnsDiscovery,omitempty"` - DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` - MaxMessageSize string `json:"maxMessageSize,omitempty"` + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + NodeKey string `json:"key,omitempty"` + EnableRelay bool `json:"relay"` + LogLevel string `json:"logLevel"` + DnsDiscovery bool `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` Staticnodes []string `json:"staticnodes,omitempty"` Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` - Discv5Discovery bool `json:"discv5Discovery,omitempty"` - Discv5UdpPort uint16 `json:"discv5UdpPort,omitempty"` - ClusterID uint16 `json:"clusterId,omitempty"` - Shards []uint16 `json:"shards,omitempty"` - PeerExchange bool `json:"peerExchange,omitempty"` - PeerExchangeNode string `json:"peerExchangeNode,omitempty"` - TcpPort uint16 `json:"tcpPort,omitempty"` + Discv5Discovery bool `json:"discv5Discovery,omitempty"` + Discv5UdpPort uint16 `json:"discv5UdpPort,omitempty"` + ClusterID uint16 `json:"clusterId,omitempty"` + Shards []uint16 `json:"shards,omitempty"` + PeerExchange bool `json:"peerExchange,omitempty"` + PeerExchangeNode string `json:"peerExchangeNode,omitempty"` + TcpPort uint16 `json:"tcpPort,omitempty"` } // Waku represents a dark communication interface through the Ethereum @@ -2211,17 +2211,17 @@ func (w *Waku) PeerID() (peer.ID, error) { C.cGoWakuGetMyPeerId(w.wakuCtx, resp) if C.getRet(resp) == C.RET_OK { - - peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + + peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) id, err := peer.Decode(peerIdStr) if err != nil { errMsg := "WakuGetMyPeerId - decoding peerId: %w" - return "", fmt.Errorf(errMsg, err) + return "", fmt.Errorf(errMsg, err) } return id, nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", fmt.Errorf("WakuGetMyPeerId: %s", errMsg) + return "", fmt.Errorf("WakuGetMyPeerId: %s", errMsg) } // validatePrivateKey checks the format of the given private key. diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 537a517936d..92e3be54d16 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -326,22 +326,22 @@ func makeTestTree(domain string, nodes []*enode.Node, links []string) (*ethdnsdi func TestPeerExchange(t *testing.T) { logger, err := zap.NewDevelopment() require.NoError(t, err) - + discV5NodeConfig := Config{ UseThrottledPublish: true, ClusterID: 16, } - + // start node that will be discovered by PeerExchange discV5NodeWakuConfig := WakuConfig{ - EnableRelay: true, - LogLevel: "DEBUG", + EnableRelay: true, + LogLevel: "DEBUG", Discv5Discovery: true, - ClusterID: 16, - Shards: []uint16{64}, - PeerExchange: false, - Discv5UdpPort: 9001, - TcpPort: 60010, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: false, + Discv5UdpPort: 9001, + TcpPort: 60010, } discV5Node, err := New(nil, "", &discV5NodeConfig, &discV5NodeWakuConfig, logger.Named("discV5Node"), nil, nil, nil, nil) @@ -355,7 +355,7 @@ func TestPeerExchange(t *testing.T) { discv5NodeEnr, err := discV5Node.ENR() require.NoError(t, err) - + pxServerConfig := Config{ UseThrottledPublish: true, ClusterID: 16, @@ -363,15 +363,15 @@ func TestPeerExchange(t *testing.T) { // start node which serves as PeerExchange server pxServerWakuConfig := WakuConfig{ - EnableRelay: true, - LogLevel: "DEBUG", - Discv5Discovery: true, - ClusterID: 16, - Shards: []uint16{64}, - PeerExchange: true, - Discv5UdpPort: 9000, + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: true, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: true, + Discv5UdpPort: 9000, Discv5BootstrapNodes: []string{discv5NodeEnr.String()}, - TcpPort: 60011, + TcpPort: 60011, } pxServerNode, err := New(nil, "", &pxServerConfig, &pxServerWakuConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) @@ -389,19 +389,19 @@ func TestPeerExchange(t *testing.T) { options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 30 * time.Second } - + // Check that pxServerNode has discV5Node in its Peer Store err = tt.RetryWithBackOff(func() error { peers, err := pxServerNode.GetPeerIdsFromPeerStore() - + if err != nil { return err } - - if slices.Contains(peers, discV5NodePeerId){ + + if slices.Contains(peers, discV5NodePeerId) { return nil } - + return errors.New("pxServer is missing the discv5 node in its peer store") }, options) require.NoError(t, err) @@ -410,17 +410,17 @@ func TestPeerExchange(t *testing.T) { UseThrottledPublish: true, ClusterID: 16, } - - // start light node which uses PeerExchange to discover peers + + // start light node which uses PeerExchange to discover peers pxClientWakuConfig := WakuConfig{ - EnableRelay: false, - LogLevel: "DEBUG", - Discv5Discovery: false, - ClusterID: 16, - Shards: []uint16{64}, - PeerExchange: true, - Discv5UdpPort: 9002, - TcpPort: 60012, + EnableRelay: false, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: true, + Discv5UdpPort: 9002, + TcpPort: 60012, PeerExchangeNode: serverNodeMa[0].String(), } @@ -432,7 +432,7 @@ func TestPeerExchange(t *testing.T) { pxServerPeerId, err := pxServerNode.PeerID() require.NoError(t, err) - + // Check that the light node discovered the discV5Node and has both nodes in its peer store err = tt.RetryWithBackOff(func() error { peers, err := lightNode.GetPeerIdsFromPeerStore() @@ -461,14 +461,11 @@ func TestPeerExchange(t *testing.T) { }, options) require.NoError(t, err) - // Stop nodes require.NoError(t, lightNode.Stop()) require.NoError(t, pxServerNode.Stop()) require.NoError(t, discV5Node.Stop()) - - /* logger, err := zap.NewDevelopment() require.NoError(t, err) // start node which serve as PeerExchange server From f4c45022c8190220b631f3d269f254910e02ae8a Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 25 Oct 2024 10:58:23 +0300 Subject: [PATCH 18/18] fixing test instructions --- wakuv2/nwaku_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 92e3be54d16..417348e029c 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -156,7 +156,7 @@ func parseNodes(rec []string) []*enode.Node { // IP_ADDRESS=$(hostname -I | awk '{print $1}'); // docker run \ // -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ -// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ +// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG --shard=64 --tcp-port=61000 \ // --nat=extip:${IP_ADDRESS} --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ func TestBasicWakuV2(t *testing.T) {