From 555de44d9fe96cb9762b509b1329594a4bdb6b52 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 9 Feb 2021 19:55:45 -0800 Subject: [PATCH 1/2] Revert "Merge pull request #4895 from wpaulino/disallow-premature-chan-updates" This reverts commit 6e6384114c890cdfd486ace5885118150940df86, reversing changes made to 98ea4332716f953c039308c4e28cb4e55f8f89bc. --- discovery/gossiper.go | 111 +++++++++++++--- discovery/gossiper_test.go | 255 ++++++++++++++++++++++++++++++++++--- 2 files changed, 333 insertions(+), 33 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 8a79a4a7..4f792fd4 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -274,6 +274,13 @@ type AuthenticatedGossiper struct { // every new block height. blockEpochs *chainntnfs.BlockEpochEvent + // prematureChannelUpdates is a map of ChannelUpdates we have received + // that wasn't associated with any channel we know about. We store + // them temporarily, such that we can reprocess them when a + // ChannelAnnouncement for the channel is received. + prematureChannelUpdates map[uint64][]*networkMsg + pChanUpdMtx sync.Mutex + // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the // networkHandler. @@ -330,6 +337,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { networkMsgs: make(chan *networkMsg), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), + prematureChannelUpdates: make(map[uint64][]*networkMsg), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), heightForLastChanUpdate: make(map[uint64][2]uint32), @@ -1024,7 +1032,8 @@ func (d *AuthenticatedGossiper) networkHandler() { }() - // A new block has arrived, update our best height. + // A new block has arrived, so we can re-process the previously + // premature announcements. 
case newBlock, ok := <-d.blockEpochs.Epochs: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. @@ -1691,6 +1700,55 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } + // If we earlier received any ChannelUpdates for this channel, + // we can now process them, as the channel is added to the + // graph. + shortChanID := msg.ShortChannelID.ToUint64() + var channelUpdates []*networkMsg + + d.pChanUpdMtx.Lock() + channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...) + + // Now delete the premature ChannelUpdates, since we added them + // all to the queue of network messages. + delete(d.prematureChannelUpdates, shortChanID) + d.pChanUpdMtx.Unlock() + + // Launch a new goroutine to handle each ChannelUpdate, this to + // ensure we don't block here, as we can handle only one + // announcement at a time. + for _, cu := range channelUpdates { + d.wg.Add(1) + go func(nMsg *networkMsg) { + defer d.wg.Done() + + switch msg := nMsg.msg.(type) { + + // Reprocess the message, making sure we return + // an error to the original caller in case the + // gossiper shuts down. + case *lnwire.ChannelUpdate: + log.Debugf("Reprocessing"+ + " ChannelUpdate for "+ + "shortChanID=%v", + msg.ShortChannelID.ToUint64()) + + select { + case d.networkMsgs <- nMsg: + case <-d.quit: + nMsg.err <- ErrGossiperShuttingDown + } + + // We don't expect any other message type than + // ChannelUpdate to be in this map. + default: + log.Errorf("Unsupported message type "+ + "found among ChannelUpdates: "+ + "%T", msg) + } + }(cu) + } + // Channel announcement was successfully proceeded and know it // might be broadcast to other connected nodes if it was // announcement with proof (remote). 
@@ -1729,7 +1787,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID := msg.ShortChannelID.ToUint64() // If the advertised inclusionary block is beyond our knowledge - // of the chain tip, then we'll ignore it. + // of the chain tip, then we'll put the announcement in limbo + // to be fully verified once we advance forward in the chain. d.Lock() if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { log.Infof("Update announcement for "+ @@ -1738,8 +1797,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, blockHeight, d.bestHeight) d.Unlock() - - nMsg.err <- nil return nil } d.Unlock() @@ -1810,19 +1867,43 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Debugf("Removed edge with chan_id=%v from zombie "+ "index", msg.ShortChannelID) - nMsg.err <- nil - return nil - + // We'll fallthrough to ensure we stash the update until + // we receive its corresponding ChannelAnnouncement. + // This is needed to ensure the edge exists in the graph + // before applying the update. + fallthrough case channeldb.ErrGraphNotFound: fallthrough case channeldb.ErrGraphNoEdgesFound: fallthrough case channeldb.ErrEdgeNotFound: - log.Debugf("Got ChannelUpdate for edge not found in "+ - "graph(shortChanID=%v)", shortChanID) + // If the edge corresponding to this ChannelUpdate was + // not found in the graph, this might be a channel in + // the process of being opened, and we haven't processed + // our own ChannelAnnouncement yet, hence it is not + // found in the graph. This usually gets resolved after + // the channel proofs are exchanged and the channel is + // broadcasted to the rest of the network, but in case + // this is a private channel this won't ever happen. + // This can also happen in the case of a zombie channel + // with a fresh update for which we don't have a + // ChannelAnnouncement for since we reject them. 
Because + // of this, we temporarily add it to a map, and + // reprocess it after our own ChannelAnnouncement has + // been processed. + d.pChanUpdMtx.Lock() + d.prematureChannelUpdates[shortChanID] = append( + d.prematureChannelUpdates[shortChanID], nMsg, + ) + d.pChanUpdMtx.Unlock() - // TODO: Add to recentRejects like below? Return error? - nMsg.err <- nil + log.Debugf("Got ChannelUpdate for edge not found in "+ + "graph(shortChanID=%v), saving for "+ + "reprocessing later", shortChanID) + + // NOTE: We don't return anything on the error channel + // for this message, as we expect that will be done when + // this ChannelUpdate is later reprocessed. return nil default: @@ -2255,17 +2336,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( source: nMsg.source, msg: chanAnn, }) - if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil { + if e1Ann != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, - source: src, + source: nMsg.source, msg: e1Ann, }) } - if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil { + if e2Ann != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, - source: src, + source: nMsg.source, msg: e2Ann, }) } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index eebf1bd6..1c1e60ca 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -908,10 +908,8 @@ func TestPrematureAnnouncement(t *testing.T) { } select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer): - if err != nil { - t.Fatal(err) - } + case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer): + t.Fatal("announcement was proceeded") case <-time.After(100 * time.Millisecond): } @@ -928,10 +926,8 @@ func TestPrematureAnnouncement(t *testing.T) { } select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer): - if err != nil { - t.Fatal(err) - } + case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer): + t.Fatal("announcement was 
proceeded") case <-time.After(100 * time.Millisecond): } @@ -2377,13 +2373,27 @@ func TestProcessZombieEdgeNowLive(t *testing.T) { // The channel update cannot be successfully processed and broadcast // until the channel announcement is. Since the channel update indicates - // a fresh new update, the gossiper should mark the channel as live and - // allow it once it sees it again. - errChan := ctx.gossiper.ProcessRemoteAnnouncement( + // a fresh new update, the gossiper should stash it until it sees the + // corresponding channel announcement. + updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement( batch.chanUpdAnn2, remotePeer, ) + select { - case err := <-errChan: + case <-ctx.broadcastedMessage: + t.Fatal("expected to not broadcast live channel update " + + "without announcement") + case <-time.After(2 * trickleDelay): + } + + // We'll go ahead and process the channel announcement to ensure the + // channel update is processed thereafter. + processAnnouncement(batch.remoteChanAnn, false, false) + + // After successfully processing the announcement, the channel update + // should have been processed and broadcast successfully as well. + select { + case err := <-updateErrChan: if err != nil { t.Fatalf("expected to process live channel update: %v", err) @@ -2392,17 +2402,226 @@ func TestProcessZombieEdgeNowLive(t *testing.T) { t.Fatal("expected to process announcement") } + select { + case msgWithSenders := <-ctx.broadcastedMessage: + assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg) + case <-time.After(2 * trickleDelay): + t.Fatal("expected to broadcast live channel update") + } +} + +// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate +// from the remote before we have processed our own ChannelAnnouncement, it will +// be reprocessed later, after our ChannelAnnouncement. 
+func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. + sentMsgs := make(chan lnwire.Message, 10) + remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + + // Override NotifyWhenOnline to return the remote peer which we expect + // messages to be sent to. + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } + + // Recreate the case where the remote node is sending us its ChannelUpdate + // before we have been able to process our own ChannelAnnouncement and + // ChannelUpdate. + errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, remotePeer, + ) select { case <-ctx.broadcastedMessage: - t.Fatal("expected to not broadcast live channel update " + - "without announcement") + t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } - // Re-process the channel announcement and update. Both should be - // applied to the graph and broadcast. 
- processAnnouncement(batch.remoteChanAnn, false, false) - processAnnouncement(batch.chanUpdAnn2, false, false) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer) + if err != nil { + t.Fatalf("unable to process node ann: %v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("node announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Since the remote ChannelUpdate was added for an edge that + // we did not already know about, it should have been added + // to the map of premature ChannelUpdates. Check that nothing + // was added to the graph. + chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID) + if err != channeldb.ErrEdgeNotFound { + t.Fatalf("Expected ErrEdgeNotFound, got: %v", err) + } + if chanInfo != nil { + t.Fatalf("chanInfo was not nil") + } + if e1 != nil { + t.Fatalf("e1 was not nil") + } + if e2 != nil { + t.Fatalf("e2 was not nil") + } + + // Recreate lightning network topology. Initialize router with channel + // between two nodes. 
+ err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("node announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // The local ChannelUpdate should now be sent directly to the remote peer, + // such that the edge can be used for routing, regardless if this channel + // is announced or not (private channel). + select { + case msg := <-sentMsgs: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(1 * time.Second): + t.Fatal("gossiper did not send channel update to peer") + } + + // At this point the remote ChannelUpdate we received earlier should + // be reprocessed, as we now have the necessary edge entry in the graph. + select { + case err := <-errRemoteAnn: + if err != nil { + t.Fatalf("error re-processing remote update: %v", err) + } + case <-time.After(2 * trickleDelay): + t.Fatalf("remote update was not processed") + } + + // Check that the ChannelEdgePolicy was added to the graph. 
+ chanInfo, e1, e2, err = ctx.router.GetChannelByID( + batch.chanUpdAnn1.ShortChannelID, + ) + if err != nil { + t.Fatalf("unable to get channel from router: %v", err) + } + if chanInfo == nil { + t.Fatalf("chanInfo was nil") + } + if e1 == nil { + t.Fatalf("e1 was nil") + } + if e2 == nil { + t.Fatalf("e2 was nil") + } + + // Pretending that we receive local channel announcement from funding + // manager, thereby kick off the announcement exchange process. + err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.localProofAnn, localKey, + ) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + select { + case <-ctx.broadcastedMessage: + t.Fatal("announcements were broadcast") + case <-time.After(2 * trickleDelay): + } + + number := 0 + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + func() { + number = 0 + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 1 { + t.Fatal("wrong number of objects in storage") + } + + err = <-ctx.gossiper.ProcessRemoteAnnouncement( + batch.remoteProofAnn, remotePeer, + ) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + for i := 0; i < 4; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't broadcast") + } + } + + number = 0 + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + func() { + number = 0 + }, + ); err != nil && err != channeldb.ErrWaitingProofNotFound { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatal("waiting proof should be removed from storage") + } } // TestExtraDataChannelAnnouncementValidation tests that we're able to properly From 904003fbcb67df2e939c312759c282cf8f5f92df Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 6 Jan 2021 12:51:22 -0800 Subject: [PATCH 2/2] 
discovery: use source of ann upon confirmed channel ann batch We do this instead of using the source of the AnnounceSignatures message, as we filter out the source when broadcasting any announcements, leading to the remote node not receiving our channel update. Note that this is done more for the sake of correctness and to address a flake within the integration tests, as channel updates are sent directly and reliably to channel counterparts. --- discovery/gossiper.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 4f792fd4..e669f61a 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2336,17 +2336,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( source: nMsg.source, msg: chanAnn, }) - if e1Ann != nil { + if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, - source: nMsg.source, + source: src, msg: e1Ann, }) } - if e2Ann != nil { + if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, - source: nMsg.source, + source: src, msg: e2Ann, }) }