From 3d8f1946702c0e20c19dfc9ae8f6d1af321a775d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 10 Sep 2019 14:37:16 +0200 Subject: [PATCH 1/8] discovery/gossiper: extract adding nodeAnnouncement into method --- discovery/gossiper.go | 56 +++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 7db498e6..f0db2e70 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1454,6 +1454,33 @@ func (d *AuthenticatedGossiper) processRejectedEdge( return announcements, nil } +// addNode processes the given node announcement, and adds it to our channel +// graph. +func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error { + if err := routing.ValidateNodeAnn(msg); err != nil { + return fmt.Errorf("unable to validate node announcement: %v", + err) + } + + timestamp := time.Unix(int64(msg.Timestamp), 0) + features := lnwire.NewFeatureVector( + msg.Features, lnwire.GlobalFeatures, + ) + node := &channeldb.LightningNode{ + HaveNodeAnnouncement: true, + LastUpdate: timestamp, + Addresses: msg.Addresses, + PubKeyBytes: msg.NodeID, + Alias: msg.Alias.String(), + AuthSigBytes: msg.Signature.ToSignatureBytes(), + Features: features, + Color: msg.RGBColor, + ExtraOpaqueData: msg.ExtraOpaqueData, + } + + return d.cfg.Router.AddNode(node) +} + // processNetworkAnnouncement processes a new network relate authenticated // channel or node announcement or announcements proofs. If the announcement // didn't affect the internal state due to either being out of date, invalid, @@ -1487,30 +1514,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - if err := routing.ValidateNodeAnn(msg); err != nil { - err := fmt.Errorf("unable to validate "+ - "node announcement: %v", err) - log.Error(err) - nMsg.err <- err - return nil - } - - features := lnwire.NewFeatureVector( - msg.Features, lnwire.GlobalFeatures, - ) - node := &channeldb.LightningNode{ - HaveNodeAnnouncement: true, - LastUpdate: timestamp, - Addresses: msg.Addresses, - PubKeyBytes: msg.NodeID, - Alias: msg.Alias.String(), - AuthSigBytes: msg.Signature.ToSignatureBytes(), - Features: features, - Color: msg.RGBColor, - ExtraOpaqueData: msg.ExtraOpaqueData, - } - - if err := d.cfg.Router.AddNode(node); err != nil { + if err := d.addNode(msg); err != nil { if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { @@ -1526,10 +1530,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // In order to ensure we don't leak unadvertised nodes, we'll // make a quick check to ensure this node intends to publicly // advertise itself to the network. - isPublic, err := d.cfg.Router.IsPublicNode(node.PubKeyBytes) + isPublic, err := d.cfg.Router.IsPublicNode(msg.NodeID) if err != nil { log.Errorf("Unable to determine if node %x is "+ - "advertised: %v", node.PubKeyBytes, err) + "advertised: %v", msg.NodeID, err) nMsg.err <- err return nil } From 74c9551564708c74f36105729a4e05110a92cdb6 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 10 Sep 2019 14:37:17 +0200 Subject: [PATCH 2/8] discovery+server: make RebroadcastInterval part of config --- discovery/gossiper.go | 17 +++++++++++------ server.go | 1 + 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index f0db2e70..0e0e9e9e 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -160,6 +160,13 @@ type Config struct { // should check if we need re-broadcast any of our personal channels. RetransmitDelay time.Duration + // RebroadcastInterval is the maximum time we wait between sending out + // channel updates for our active channels and our own node + // announcement. We do this to ensure our active presence on the + // network is known, and we are not being considered a zombie node or + // having zombie channels. + RebroadcastInterval time.Duration + // WaitingProofStore is a persistent storage of partial channel proof // announcement messages. We use it to buffer half of the material // needed to reconstruct a full authenticated channel announcement. @@ -1193,14 +1200,12 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error { return nil } - const broadcastInterval = time.Hour * 24 - timeElapsed := time.Since(edge.LastUpdate) - // If it's been a full day since we've re-broadcasted the - // channel, add the channel to the set of edges we need to - // update. - if timeElapsed >= broadcastInterval { + // If it's been longer than RebroadcastInterval since we've + // re-broadcasted the channel, add the channel to the set of + // edges we need to update. + if timeElapsed >= d.cfg.RebroadcastInterval { edgesToUpdate = append(edgesToUpdate, updateTuple{ info: info, edge: edge, diff --git a/server.go b/server.go index c47be832..636ec38b 100644 --- a/server.go +++ b/server.go @@ -720,6 +720,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, ProofMatureDelta: 0, TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), RetransmitDelay: time.Minute * 30, + RebroadcastInterval: time.Hour * 24, WaitingProofStore: waitingProofStore, MessageStore: gossipMessageStore, AnnSigner: s.nodeSigner, From e201fbe396113955d51d9b28343f92108275269f Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 10 Sep 2019 14:37:17 +0200 Subject: [PATCH 3/8] discovery+server: RetransmitDelay->RetransmitTicker Also let retransmitStaleChannels take a timestamp, to make it easier to test. --- discovery/gossiper.go | 21 +++++++++++---------- discovery/gossiper_test.go | 9 +++++++-- server.go | 2 +- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 0e0e9e9e..91ab4d5a 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -156,9 +156,10 @@ type Config struct { // the last trickle tick. TrickleDelay time.Duration - // RetransmitDelay is the period of a timer which indicates that we - // should check if we need re-broadcast any of our personal channels. - RetransmitDelay time.Duration + // RetransmitTicker is a ticker that ticks with a period which + // indicates that we should check if we need re-broadcast any of our + // personal channels. + RetransmitTicker ticker.Ticker // RebroadcastInterval is the maximum time we wait between sending out // channel updates for our active channels and our own node @@ -888,15 +889,15 @@ func (d *AuthenticatedGossiper) networkHandler() { announcements := deDupedAnnouncements{} announcements.Reset() - retransmitTimer := time.NewTicker(d.cfg.RetransmitDelay) - defer retransmitTimer.Stop() + d.cfg.RetransmitTicker.Resume() + defer d.cfg.RetransmitTicker.Stop() trickleTimer := time.NewTicker(d.cfg.TrickleDelay) defer trickleTimer.Stop() // To start, we'll first check to see if there are any stale channels // that we need to re-transmit. - if err := d.retransmitStaleChannels(); err != nil { + if err := d.retransmitStaleChannels(time.Now()); err != nil { log.Errorf("Unable to rebroadcast stale channels: %v", err) } @@ -1111,8 +1112,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // personal channels. This addresses the case of "zombie" // channels and channel advertisements that have been dropped, // or not properly propagated through the network. - case <-retransmitTimer.C: - if err := d.retransmitStaleChannels(); err != nil { + case tick := <-d.cfg.RetransmitTicker.Ticks(): + if err := d.retransmitStaleChannels(tick); err != nil { log.Errorf("unable to rebroadcast stale "+ "channels: %v", err) } @@ -1166,7 +1167,7 @@ func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool { // is known to maintain to check to see if any of them are "stale". A channel // is stale iff, the last timestamp of its rebroadcast is older then // broadcastInterval. -func (d *AuthenticatedGossiper) retransmitStaleChannels() error { +func (d *AuthenticatedGossiper) retransmitStaleChannels(now time.Time) error { // Iterate over all of our channels and check if any of them fall // within the prune interval or re-broadcast interval. type updateTuple struct { @@ -1200,7 +1201,7 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error { return nil } - timeElapsed := time.Since(edge.LastUpdate) + timeElapsed := now.Sub(edge.LastUpdate) // If it's been longer than RebroadcastInterval since we've // re-broadcasted the channel, add the channel to the set of diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 2a92c26e..54b674fa 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -58,6 +58,8 @@ var ( trickleDelay = time.Millisecond * 100 retransmitDelay = time.Hour * 1 proofMatureDelta uint32 + + rebroadcastInterval = time.Hour * 1000000 ) // makeTestDB creates a new instance of the ChannelDB for testing purposes. A @@ -741,7 +743,8 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { }, Router: router, TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, + RetransmitTicker: ticker.NewForce(retransmitDelay), + RebroadcastInterval: rebroadcastInterval, ProofMatureDelta: proofMatureDelta, WaitingProofStore: waitingProofStore, MessageStore: newMockMessageStore(), @@ -1493,7 +1496,8 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, Router: ctx.gossiper.cfg.Router, TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, + RetransmitTicker: ticker.NewForce(retransmitDelay), + RebroadcastInterval: rebroadcastInterval, ProofMatureDelta: proofMatureDelta, WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, MessageStore: ctx.gossiper.cfg.MessageStore, @@ -1657,6 +1661,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { case msg := <-sentToPeer: assertMessage(t, batch.chanUpdAnn1, msg) diff --git a/server.go b/server.go index 636ec38b..1fff76a6 100644 --- a/server.go +++ b/server.go @@ -719,7 +719,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, NotifyWhenOffline: s.NotifyWhenOffline, ProofMatureDelta: 0, TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), - RetransmitDelay: time.Minute * 30, + RetransmitTicker: ticker.New(time.Minute * 30), RebroadcastInterval: time.Hour * 24, WaitingProofStore: waitingProofStore, MessageStore: gossipMessageStore, From 8b9fd039ec6cdb24a979754357aaf7a50f9a41bf Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 10 Sep 2019 14:37:17 +0200 Subject: [PATCH 4/8] discovery/gossiper test: remove mockGraphSource.SelfEdges --- discovery/gossiper_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 54b674fa..63b566ae 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -165,10 +165,6 @@ func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error { return nil } -func (r *mockGraphSource) SelfEdges() ([]*channeldb.ChannelEdgePolicy, error) { - return nil, nil -} - func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) { return r.bestHeight, nil } From 70d63abe9f3eb16ee010e8b66e011c46f2fe0d3e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 10 Sep 2019 14:37:17 +0200 Subject: [PATCH 5/8] discovery/test: set global test timestamp --- discovery/gossiper_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 63b566ae..dee6d847 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -59,6 +59,9 @@ var ( retransmitDelay = time.Hour * 1 proofMatureDelta uint32 + // The test timestamp + rebroadcast interval makes sure messages won't + // be rebroadcasted automaticallty during the tests. + testTimestamp = uint32(1234567890) rebroadcastInterval = time.Hour * 1000000 ) @@ -469,7 +472,7 @@ type annBatch struct { func createAnnouncements(blockHeight uint32) (*annBatch, error) { var err error var batch annBatch - timestamp := uint32(123456) + timestamp := testTimestamp batch.nodeAnn1, err = createNodeAnnouncement(nodeKeyPriv1, timestamp) if err != nil { @@ -779,8 +782,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { func TestProcessAnnouncement(t *testing.T) { t.Parallel() - timestamp := uint32(123456) - + timestamp := testTimestamp ctx, cleanup, err := createTestCtx(0) if err != nil { t.Fatalf("can't create context: %v", err) @@ -888,7 +890,7 @@ func TestProcessAnnouncement(t *testing.T) { func TestPrematureAnnouncement(t *testing.T) { t.Parallel() - timestamp := uint32(123456) + timestamp := testTimestamp ctx, cleanup, err := createTestCtx(0) if err != nil { @@ -1798,7 +1800,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { func TestDeDuplicatedAnnouncements(t *testing.T) { t.Parallel() - timestamp := uint32(123456) + timestamp := testTimestamp announcements := deDupedAnnouncements{} announcements.Reset() @@ -2668,6 +2670,7 @@ func TestExtraDataChannelAnnouncementValidation(t *testing.T) { func TestExtraDataChannelUpdateValidation(t *testing.T) { t.Parallel() + timestamp := testTimestamp ctx, cleanup, err := createTestCtx(0) if err != nil { t.Fatalf("can't create context: %v", err) @@ -2675,7 +2678,6 @@ func TestExtraDataChannelUpdateValidation(t *testing.T) { defer cleanup() remotePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil} - timestamp := uint32(123456) // In this scenario, we'll create two announcements, one regular // channel announcement, and another channel update announcement, that @@ -2742,7 +2744,7 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) { defer cleanup() remotePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil} - timestamp := uint32(123456) + timestamp := testTimestamp // We'll create a node announcement that includes a set of opaque data // which we don't know of, but will store anyway in order to ensure From e36d15582cb11bf70fed34409cd693c3a06725e5 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 10 Sep 2019 14:37:17 +0200 Subject: [PATCH 6/8] discovery/gossiper test: add TestRetransmit This commit adds a test that ensures outdated announcements are retransmitted when the RetransmitTicker ticks. --- discovery/gossiper_test.go | 165 +++++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index dee6d847..40d68054 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2766,6 +2766,171 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) { } } +// assertBroadcast checks that num messages are being broadcasted from the +// gossiper. The broadcasted messages are returned. +func assertBroadcast(t *testing.T, ctx *testCtx, num int) []lnwire.Message { + t.Helper() + + var msgs []lnwire.Message + for i := 0; i < num; i++ { + select { + case msg := <-ctx.broadcastedMessage: + msgs = append(msgs, msg.msg) + case <-time.After(time.Second): + t.Fatalf("expected %d messages to be broadcast, only "+ + "got %d", num, i) + } + } + + // No more messages should be broadcast. + select { + case msg := <-ctx.broadcastedMessage: + t.Fatalf("unexpected message was broadcast: %T", msg.msg) + case <-time.After(2 * trickleDelay): + } + + return msgs +} + +// assertProcessAnnouncemnt is a helper method that checks that the result of +// processing an announcement is successful. +func assertProcessAnnouncement(t *testing.T, result chan error) { + t.Helper() + + select { + case err := <-result: + if err != nil { + t.Fatalf("unable to process :%v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("did not process announcement") + } +} + +// TestRetransmit checks that the expected announcements are retransmitted when +// the retransmit ticker ticks. +func TestRetransmit(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(proofMatureDelta) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + remotePeer := &mockPeer{remoteKey, nil, nil} + + // Process a local channel annoucement, channel update and node + // announcement. No messages should be broadcasted yet, since no proof + // has been exchanged. + assertProcessAnnouncement( + t, ctx.gossiper.ProcessLocalAnnouncement( + batch.localChanAnn, localKey, + ), + ) + assertBroadcast(t, ctx, 0) + + assertProcessAnnouncement( + t, ctx.gossiper.ProcessLocalAnnouncement( + batch.chanUpdAnn1, localKey, + ), + ) + assertBroadcast(t, ctx, 0) + + assertProcessAnnouncement( + t, ctx.gossiper.ProcessLocalAnnouncement( + batch.nodeAnn1, localKey, + ), + ) + assertBroadcast(t, ctx, 0) + + // Add the remote channel update to the gossiper. Similarly, nothing + // should be broadcasted. + assertProcessAnnouncement( + t, ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, remotePeer, + ), + ) + assertBroadcast(t, ctx, 0) + + // Now add the local and remote proof to the gossiper, which should + // trigger a broadcast of the announcements. + assertProcessAnnouncement( + t, ctx.gossiper.ProcessLocalAnnouncement( + batch.localProofAnn, localKey, + ), + ) + assertBroadcast(t, ctx, 0) + + assertProcessAnnouncement( + t, ctx.gossiper.ProcessRemoteAnnouncement( + batch.remoteProofAnn, remotePeer, + ), + ) + + // checkAnncouncments make sure the expected number of channel + // announcements + channel updates + node announcements are broadcast. + checkAnnouncements := func(t *testing.T, chanAnns, chanUpds, + nodeAnns int) { + + t.Helper() + + num := chanAnns + chanUpds + nodeAnns + anns := assertBroadcast(t, ctx, num) + + // Count the received announcements. + var chanAnn, chanUpd, nodeAnn int + for _, msg := range anns { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + chanAnn++ + case *lnwire.ChannelUpdate: + chanUpd++ + case *lnwire.NodeAnnouncement: + nodeAnn++ + } + } + + if chanAnn != chanAnns || chanUpd != chanUpds || + nodeAnn != nodeAnns { + t.Fatalf("unexpected number of announcements: "+ + "chanAnn=%d, chanUpd=%d, nodeAnn=%d", + chanAnn, chanUpd, nodeAnn) + } + } + + // All announcements should be broadcast, including the remote channel + // update. + checkAnnouncements(t, 1, 2, 1) + + // Now let the retransmit ticker tick, which should trigger updates to + // be rebroadcast. + now := time.Unix(int64(testTimestamp), 0) + future := now.Add(rebroadcastInterval + 10*time.Second) + select { + case ctx.gossiper.cfg.RetransmitTicker.(*ticker.Force).Force <- future: + case <-time.After(2 * time.Second): + t.Fatalf("unable to force tick") + } + + // The channel announcement + local channel update should be + // re-broadcast. + checkAnnouncements(t, 1, 1, 0) +} + // TestNodeAnnouncementNoChannels tests that NodeAnnouncements for nodes with // no existing channels in the graph do not get forwarded. func TestNodeAnnouncementNoChannels(t *testing.T) { From 24004fcb376e872ea53d418c320b63ea69548d43 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 10 Sep 2019 14:37:18 +0200 Subject: [PATCH 7/8] gossiper+server: define SelfNodeAnnouncement --- discovery/gossiper.go | 6 ++++++ discovery/gossiper_test.go | 6 ++++++ server.go | 17 ++++++++++------- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 91ab4d5a..54d6bd96 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -147,6 +147,12 @@ type Config struct { // notification for when it reconnects. NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} + // SelfNodeAnnouncement is a function that fetches our own current node + // announcement, for use when determining whether we should update our + // peers about our presence on the network. If the refresh is true, a + // new and updated announcement will be returned. + SelfNodeAnnouncement func(refresh bool) (lnwire.NodeAnnouncement, error) + // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. ProofMatureDelta uint32 diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 40d68054..62edd4fb 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -740,6 +740,11 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { c := make(chan struct{}) return c }, + SelfNodeAnnouncement: func(bool) (lnwire.NodeAnnouncement, error) { + return lnwire.NodeAnnouncement{ + Timestamp: testTimestamp, + }, nil + }, Router: router, TrickleDelay: trickleDelay, RetransmitTicker: ticker.NewForce(retransmitDelay), @@ -1492,6 +1497,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { Broadcast: ctx.gossiper.cfg.Broadcast, NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline, NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, + SelfNodeAnnouncement: ctx.gossiper.cfg.SelfNodeAnnouncement, Router: ctx.gossiper.cfg.Router, TrickleDelay: trickleDelay, RetransmitTicker: ticker.NewForce(retransmitDelay), diff --git a/server.go b/server.go index 1fff76a6..a002aab9 100644 --- a/server.go +++ b/server.go @@ -710,13 +710,16 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, } s.authGossiper = discovery.New(discovery.Config{ - Router: s.chanRouter, - Notifier: s.cc.chainNotifier, - ChainHash: *activeNetParams.GenesisHash, - Broadcast: s.BroadcastMessage, - ChanSeries: chanSeries, - NotifyWhenOnline: s.NotifyWhenOnline, - NotifyWhenOffline: s.NotifyWhenOffline, + Router: s.chanRouter, + Notifier: s.cc.chainNotifier, + ChainHash: *activeNetParams.GenesisHash, + Broadcast: s.BroadcastMessage, + ChanSeries: chanSeries, + NotifyWhenOnline: s.NotifyWhenOnline, + NotifyWhenOffline: s.NotifyWhenOffline, + SelfNodeAnnouncement: func(refresh bool) (lnwire.NodeAnnouncement, error) { + return s.genNodeAnnouncement(refresh) + }, ProofMatureDelta: 0, TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), RetransmitTicker: ticker.New(time.Minute * 30), From 92123c603d60ad20cb7702cec7e0417e73e1fd91 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 10 Sep 2019 14:37:18 +0200 Subject: [PATCH 8/8] gossiper: retransmit self NodeAnnouncement --- discovery/gossiper.go | 83 ++++++++++++++++++++++++++++++-------- discovery/gossiper_test.go | 6 +-- 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 54d6bd96..b943cfe3 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -901,10 +901,10 @@ func (d *AuthenticatedGossiper) networkHandler() { trickleTimer := time.NewTicker(d.cfg.TrickleDelay) defer trickleTimer.Stop() - // To start, we'll first check to see if there are any stale channels - // that we need to re-transmit. - if err := d.retransmitStaleChannels(time.Now()); err != nil { - log.Errorf("Unable to rebroadcast stale channels: %v", err) + // To start, we'll first check to see if there are any stale channel or + // node announcements that we need to re-transmit. + if err := d.retransmitStaleAnns(time.Now()); err != nil { + log.Errorf("Unable to rebroadcast stale announcements: %v", err) } // We'll use this validation to ensure that we process jobs in their @@ -1115,13 +1115,14 @@ func (d *AuthenticatedGossiper) networkHandler() { // The retransmission timer has ticked which indicates that we // should check if we need to prune or re-broadcast any of our - // personal channels. This addresses the case of "zombie" - // channels and channel advertisements that have been dropped, - // or not properly propagated through the network. + // personal channels or node announcement. This addresses the + // case of "zombie" channels and channel advertisements that + // have been dropped, or not properly propagated through the + // network. case tick := <-d.cfg.RetransmitTicker.Ticks(): - if err := d.retransmitStaleChannels(tick); err != nil { + if err := d.retransmitStaleAnns(tick); err != nil { log.Errorf("unable to rebroadcast stale "+ - "channels: %v", err) + "announcements: %v", err) } // The gossiper has been signalled to exit, to we exit our @@ -1169,18 +1170,23 @@ func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool { } } -// retransmitStaleChannels examines all outgoing channels that the source node -// is known to maintain to check to see if any of them are "stale". A channel -// is stale iff, the last timestamp of its rebroadcast is older then -// broadcastInterval. -func (d *AuthenticatedGossiper) retransmitStaleChannels(now time.Time) error { +// retransmitStaleAnns examines all outgoing channels that the source node is +// known to maintain to check to see if any of them are "stale". A channel is +// stale iff, the last timestamp of its rebroadcast is older than the +// RebroadcastInterval. We also check if a refreshed node announcement should +// be resent. +func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error { // Iterate over all of our channels and check if any of them fall // within the prune interval or re-broadcast interval. type updateTuple struct { info *channeldb.ChannelEdgeInfo edge *channeldb.ChannelEdgePolicy } - var edgesToUpdate []updateTuple + + var ( + havePublicChannels bool + edgesToUpdate []updateTuple + ) err := d.cfg.Router.ForAllOutgoingChannels(func( info *channeldb.ChannelEdgeInfo, edge *channeldb.ChannelEdgePolicy) error { @@ -1196,6 +1202,11 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels(now time.Time) error { return nil } + // We make a note that we have at least one public channel. We + // use this to determine whether we should send a node + // announcement below. + havePublicChannels = true + // If this edge has a ChannelUpdate that was created before the // introduction of the MaxHTLC field, then we'll update this // edge to propagate this information in the network. @@ -1246,13 +1257,51 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels(now time.Time) error { signedUpdates = append(signedUpdates, chanUpdate) } - // If we don't have any channels to re-broadcast, then we'll exit + // If we don't have any public channels, we return as we don't want to + // broadcast anything that would reveal our existence. + if !havePublicChannels { + return nil + } + + // We'll also check that our NodeAnnouncement is not too old. + currentNodeAnn, err := d.cfg.SelfNodeAnnouncement(false) + if err != nil { + return fmt.Errorf("unable to get current node announment: %v", + err) + } + + timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0) + timeElapsed := now.Sub(timestamp) + + // If it's been a full day since we've re-broadcasted the + // node announcement, refresh it and resend it. + nodeAnnStr := "" + if timeElapsed >= d.cfg.RebroadcastInterval { + newNodeAnn, err := d.cfg.SelfNodeAnnouncement(true) + if err != nil { + return fmt.Errorf("unable to get refreshed node "+ + "announcement: %v", err) + } + + signedUpdates = append(signedUpdates, &newNodeAnn) + nodeAnnStr = " and our refreshed node announcement" + + // Before broadcasting the refreshed node announcement, add it + // to our own graph. + if err := d.addNode(&newNodeAnn); err != nil { + log.Errorf("Unable to add refreshed node announcement "+ + "to graph: %v", err) + } + } + + // If we don't have any updates to re-broadcast, then we'll exit // early. if len(signedUpdates) == 0 { return nil } - log.Infof("Retransmitting %v outgoing channels", len(edgesToUpdate)) + log.Infof("Retransmitting %v outgoing channels%v", + len(edgesToUpdate), nodeAnnStr) // With all the wire announcements properly crafted, we'll broadcast // our known outgoing channels to all our immediate peers. diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 62edd4fb..5f224d58 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2932,9 +2932,9 @@ func TestRetransmit(t *testing.T) { t.Fatalf("unable to force tick") } - // The channel announcement + local channel update should be - // re-broadcast. - checkAnnouncements(t, 1, 1, 0) + // The channel announcement + local channel update + node announcement + // should be re-broadcast. + checkAnnouncements(t, 1, 1, 1) } // TestNodeAnnouncementNoChannels tests that NodeAnnouncements for nodes with