diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 8a79a4a7..4f792fd4 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -274,6 +274,13 @@ type AuthenticatedGossiper struct {
 	// every new block height.
 	blockEpochs *chainntnfs.BlockEpochEvent
 
+	// prematureChannelUpdates is a map of ChannelUpdates we have received
+	// that weren't associated with any channel we know about. We store
+	// them temporarily, such that we can reprocess them when a
+	// ChannelAnnouncement for the channel is received.
+	prematureChannelUpdates map[uint64][]*networkMsg
+	pChanUpdMtx             sync.Mutex
+
 	// networkMsgs is a channel that carries new network broadcasted
 	// message from outside the gossiper service to be processed by the
 	// networkHandler.
@@ -330,6 +337,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
 		networkMsgs:             make(chan *networkMsg),
 		quit:                    make(chan struct{}),
 		chanPolicyUpdates:       make(chan *chanPolicyUpdateRequest),
+		prematureChannelUpdates: make(map[uint64][]*networkMsg),
 		channelMtx:              multimutex.NewMutex(),
 		recentRejects:           make(map[uint64]struct{}),
 		heightForLastChanUpdate: make(map[uint64][2]uint32),
@@ -1024,7 +1032,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
 			}()
 
-		// A new block has arrived, update our best height.
+		// A new block has arrived, so we can re-process the previously
+		// premature announcements.
 		case newBlock, ok := <-d.blockEpochs.Epochs:
 			// If the channel has been closed, then this indicates
 			// the daemon is shutting down, so we exit ourselves.
@@ -1691,6 +1700,55 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 			return nil
 		}
 
+		// If we earlier received any ChannelUpdates for this channel,
+		// we can now process them, as the channel is added to the
+		// graph.
+		shortChanID := msg.ShortChannelID.ToUint64()
+		var channelUpdates []*networkMsg
+
+		d.pChanUpdMtx.Lock()
+		channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...)
+
+		// Now delete the premature ChannelUpdates, since we added them
+		// all to the queue of network messages.
+		delete(d.prematureChannelUpdates, shortChanID)
+		d.pChanUpdMtx.Unlock()
+
+		// Launch a new goroutine to handle each ChannelUpdate; this is
+		// to ensure we don't block here, as we can handle only one
+		// announcement at a time.
+		for _, cu := range channelUpdates {
+			d.wg.Add(1)
+			go func(nMsg *networkMsg) {
+				defer d.wg.Done()
+
+				switch msg := nMsg.msg.(type) {
+
+				// Reprocess the message, making sure we return
+				// an error to the original caller in case the
+				// gossiper shuts down.
+				case *lnwire.ChannelUpdate:
+					log.Debugf("Reprocessing"+
+						" ChannelUpdate for "+
+						"shortChanID=%v",
+						msg.ShortChannelID.ToUint64())
+
+					select {
+					case d.networkMsgs <- nMsg:
+					case <-d.quit:
+						nMsg.err <- ErrGossiperShuttingDown
+					}
+
+				// We don't expect any other message type than
+				// ChannelUpdate to be in this map.
+				default:
+					log.Errorf("Unsupported message type "+
+						"found among ChannelUpdates: "+
+						"%T", msg)
+				}
+			}(cu)
+		}
+
 		// Channel announcement was successfully processed, and now it
 		// might be broadcast to other connected nodes if it was an
 		// announcement with proof (remote).
@@ -1729,7 +1787,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 		shortChanID := msg.ShortChannelID.ToUint64()
 
 		// If the advertised inclusionary block is beyond our knowledge
-		// of the chain tip, then we'll ignore it.
+		// of the chain tip, then we'll put the announcement in limbo
+		// to be fully verified once we advance forward in the chain.
 		d.Lock()
 		if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
 			log.Infof("Update announcement for "+
@@ -1738,8 +1797,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 				shortChanID, blockHeight, d.bestHeight)
 			d.Unlock()
-
-			nMsg.err <- nil
 			return nil
 		}
 		d.Unlock()
 
@@ -1810,19 +1867,43 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 			log.Debugf("Removed edge with chan_id=%v from zombie "+
 				"index", msg.ShortChannelID)
 
-			nMsg.err <- nil
-			return nil
-
+			// We'll fallthrough to ensure we stash the update until
+			// we receive its corresponding ChannelAnnouncement.
+			// This is needed to ensure the edge exists in the graph
+			// before applying the update.
+			fallthrough
 		case channeldb.ErrGraphNotFound:
 			fallthrough
 		case channeldb.ErrGraphNoEdgesFound:
 			fallthrough
 		case channeldb.ErrEdgeNotFound:
-			log.Debugf("Got ChannelUpdate for edge not found in "+
-				"graph(shortChanID=%v)", shortChanID)
+			// If the edge corresponding to this ChannelUpdate was
+			// not found in the graph, this might be a channel in
+			// the process of being opened, and we haven't processed
+			// our own ChannelAnnouncement yet, hence it is not
+			// found in the graph. This usually gets resolved after
+			// the channel proofs are exchanged and the channel is
+			// broadcasted to the rest of the network, but in case
+			// this is a private channel this won't ever happen.
+			// This can also happen in the case of a zombie channel
+			// with a fresh update for which we don't have a
+			// ChannelAnnouncement, since we reject them. Because
+			// of this, we temporarily add it to a map, and
+			// reprocess it after our own ChannelAnnouncement has
+			// been processed.
+			d.pChanUpdMtx.Lock()
+			d.prematureChannelUpdates[shortChanID] = append(
+				d.prematureChannelUpdates[shortChanID], nMsg,
+			)
+			d.pChanUpdMtx.Unlock()
 
-			// TODO: Add to recentRejects like below? Return error?
-			nMsg.err <- nil
+			log.Debugf("Got ChannelUpdate for edge not found in "+
+				"graph(shortChanID=%v), saving for "+
+				"reprocessing later", shortChanID)
+
+			// NOTE: We don't return anything on the error channel
+			// for this message, as we expect that will be done when
+			// this ChannelUpdate is later reprocessed.
 			return nil
 
 		default:
@@ -2255,17 +2336,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 			source: nMsg.source,
 			msg:    chanAnn,
 		})
-		if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
+		if e1Ann != nil {
 			announcements = append(announcements, networkMsg{
 				peer:   nMsg.peer,
-				source: src,
+				source: nMsg.source,
 				msg:    e1Ann,
 			})
 		}
-		if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
+		if e2Ann != nil {
 			announcements = append(announcements, networkMsg{
 				peer:   nMsg.peer,
-				source: src,
+				source: nMsg.source,
 				msg:    e2Ann,
 			})
 		}
diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index eebf1bd6..1c1e60ca 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -908,10 +908,8 @@ func TestPrematureAnnouncement(t *testing.T) {
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
-		if err != nil {
-			t.Fatal(err)
-		}
+	case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
+		t.Fatal("announcement was processed")
 	case <-time.After(100 * time.Millisecond):
 	}
 
@@ -928,10 +926,8 @@
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
-		if err != nil {
-			t.Fatal(err)
-		}
+	case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
+		t.Fatal("announcement was processed")
 	case <-time.After(100 * time.Millisecond):
 	}
 
@@ -2377,13 +2373,27 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
 
 	// The channel update cannot be successfully processed and broadcast
 	// until the channel announcement is. Since the channel update indicates
-	// a fresh new update, the gossiper should mark the channel as live and
-	// allow it once it sees it again.
-	errChan := ctx.gossiper.ProcessRemoteAnnouncement(
+	// a fresh new update, the gossiper should stash it until it sees the
+	// corresponding channel announcement.
+	updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement(
 		batch.chanUpdAnn2, remotePeer,
 	)
+
 	select {
-	case err := <-errChan:
+	case <-ctx.broadcastedMessage:
+		t.Fatal("expected to not broadcast live channel update " +
+			"without announcement")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	// We'll go ahead and process the channel announcement to ensure the
+	// channel update is processed thereafter.
+	processAnnouncement(batch.remoteChanAnn, false, false)
+
+	// After successfully processing the announcement, the channel update
+	// should have been processed and broadcast successfully as well.
+	select {
+	case err := <-updateErrChan:
 		if err != nil {
 			t.Fatalf("expected to process live channel update: %v",
 				err)
@@ -2392,17 +2402,226 @@
 		t.Fatal("expected to process announcement")
 	}
 
+	select {
+	case msgWithSenders := <-ctx.broadcastedMessage:
+		assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
+	case <-time.After(2 * trickleDelay):
+		t.Fatal("expected to broadcast live channel update")
+	}
+}
+
+// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
+// from the remote before we have processed our own ChannelAnnouncement, it
+// will be reprocessed once our own ChannelAnnouncement has been processed.
+func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
+	t.Parallel()
+
+	ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
+	if err != nil {
+		t.Fatalf("can't create context: %v", err)
+	}
+	defer cleanup()
+
+	batch, err := createAnnouncements(0)
+	if err != nil {
+		t.Fatalf("can't generate announcements: %v", err)
+	}
+
+	localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
+	if err != nil {
+		t.Fatalf("unable to parse pubkey: %v", err)
+	}
+	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
+	if err != nil {
+		t.Fatalf("unable to parse pubkey: %v", err)
+	}
+
+	// Set up a channel that we can use to inspect the messages sent
+	// directly from the gossiper.
+	sentMsgs := make(chan lnwire.Message, 10)
+	remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
+
+	// Override NotifyWhenOnline to return the remote peer to which we
+	// expect messages to be sent.
+	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
+		peerChan chan<- lnpeer.Peer) {
+
+		peerChan <- remotePeer
+	}
+
+	// Recreate the case where the remote node is sending us its ChannelUpdate
+	// before we have been able to process our own ChannelAnnouncement and
+	// ChannelUpdate.
+	errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.chanUpdAnn2, remotePeer,
+	)
 	select {
 	case <-ctx.broadcastedMessage:
-		t.Fatal("expected to not broadcast live channel update " +
-			"without announcement")
+		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
-	// Re-process the channel announcement and update. Both should be
-	// applied to the graph and broadcast.
-	processAnnouncement(batch.remoteChanAnn, false, false)
-	processAnnouncement(batch.chanUpdAnn2, false, false)
+	err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer)
+	if err != nil {
+		t.Fatalf("unable to process node ann: %v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("node announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	// Since the remote ChannelUpdate was for an edge that
+	// we did not already know about, it should have been added
+	// to the map of premature ChannelUpdates. Check that nothing
+	// was added to the graph.
+	chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
+	if err != channeldb.ErrEdgeNotFound {
+		t.Fatalf("Expected ErrEdgeNotFound, got: %v", err)
+	}
+	if chanInfo != nil {
+		t.Fatalf("chanInfo was not nil")
+	}
+	if e1 != nil {
+		t.Fatalf("e1 was not nil")
+	}
+	if e2 != nil {
+		t.Fatalf("e2 was not nil")
+	}
+
+	// Recreate the lightning network topology. Initialize the router with a
+	// channel between the two nodes.
+	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
+	if err != nil {
+		t.Fatalf("unable to process: %v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
+	if err != nil {
+		t.Fatalf("unable to process: %v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel update announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey)
+	if err != nil {
+		t.Fatalf("unable to process: %v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("node announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	// The local ChannelUpdate should now be sent directly to the remote
+	// peer, such that the edge can be used for routing, regardless of
+	// whether this channel is announced or not (private channel).
+	select {
+	case msg := <-sentMsgs:
+		assertMessage(t, batch.chanUpdAnn1, msg)
+	case <-time.After(1 * time.Second):
+		t.Fatal("gossiper did not send channel update to peer")
+	}
+
+	// At this point the remote ChannelUpdate we received earlier should
+	// be reprocessed, as we now have the necessary edge entry in the graph.
+	select {
+	case err := <-errRemoteAnn:
+		if err != nil {
+			t.Fatalf("error re-processing remote update: %v", err)
+		}
+	case <-time.After(2 * trickleDelay):
+		t.Fatalf("remote update was not processed")
+	}
+
+	// Check that the ChannelEdgePolicy was added to the graph.
+	chanInfo, e1, e2, err = ctx.router.GetChannelByID(
+		batch.chanUpdAnn1.ShortChannelID,
+	)
+	if err != nil {
+		t.Fatalf("unable to get channel from router: %v", err)
+	}
+	if chanInfo == nil {
+		t.Fatalf("chanInfo was nil")
+	}
+	if e1 == nil {
+		t.Fatalf("e1 was nil")
+	}
+	if e2 == nil {
+		t.Fatalf("e2 was nil")
+	}
+
+	// Pretend that we receive the local channel announcement from the funding
+	// manager, thereby kicking off the announcement exchange process.
+	err = <-ctx.gossiper.ProcessLocalAnnouncement(
+		batch.localProofAnn, localKey,
+	)
+	if err != nil {
+		t.Fatalf("unable to process: %v", err)
+	}
+
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("announcements were broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	number := 0
+	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+		func(*channeldb.WaitingProof) error {
+			number++
+			return nil
+		},
+		func() {
+			number = 0
+		},
+	); err != nil {
+		t.Fatalf("unable to retrieve objects from store: %v", err)
+	}
+
+	if number != 1 {
+		t.Fatal("wrong number of objects in storage")
+	}
+
+	err = <-ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.remoteProofAnn, remotePeer,
+	)
+	if err != nil {
+		t.Fatalf("unable to process: %v", err)
+	}
+
+	for i := 0; i < 4; i++ {
+		select {
+		case <-ctx.broadcastedMessage:
+		case <-time.After(time.Second):
+			t.Fatal("announcement wasn't broadcast")
+		}
+	}
+
+	number = 0
+	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+		func(*channeldb.WaitingProof) error {
+			number++
+			return nil
+		},
+		func() {
+			number = 0
+		},
+	); err != nil && err != channeldb.ErrWaitingProofNotFound {
+		t.Fatalf("unable to retrieve objects from store: %v", err)
+	}
+
+	if number != 0 {
+		t.Fatal("waiting proof should be removed from storage")
+	}
+}
 
 // TestExtraDataChannelAnnouncementValidation tests that we're able to properly
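
The core change above parks ChannelUpdates that arrive before their channel is known in the mutex-guarded prematureChannelUpdates map, keyed by the short channel ID, and replays them onto the gossiper's networkMsgs queue once the matching ChannelAnnouncement is accepted. The stand-alone Go sketch below illustrates that stash-and-drain pattern in isolation; the update type, pendingStore, stash, and drain names are hypothetical simplifications for illustration and are not part of the lnd codebase.

package main

import (
	"fmt"
	"sync"
)

// update is a hypothetical stand-in for the *networkMsg wrapper that carries
// a ChannelUpdate in the gossiper.
type update struct {
	shortChanID uint64
	payload     string
}

// pendingStore parks updates whose channel is not yet in the graph, mirroring
// the prematureChannelUpdates map guarded by pChanUpdMtx in the diff above.
type pendingStore struct {
	mtx     sync.Mutex
	pending map[uint64][]*update
}

func newPendingStore() *pendingStore {
	return &pendingStore{pending: make(map[uint64][]*update)}
}

// stash records an update for a channel whose announcement we have not
// processed yet.
func (s *pendingStore) stash(u *update) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.pending[u.shortChanID] = append(s.pending[u.shortChanID], u)
}

// drain removes and returns every update parked for the given channel. The
// caller invokes it after the corresponding announcement has been accepted,
// then re-queues the returned updates for normal processing.
func (s *pendingStore) drain(shortChanID uint64) []*update {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	updates := s.pending[shortChanID]
	delete(s.pending, shortChanID)
	return updates
}

func main() {
	store := newPendingStore()

	// A ChannelUpdate arrives before its ChannelAnnouncement: park it.
	store.stash(&update{shortChanID: 42, payload: "fee update"})

	// Later the ChannelAnnouncement for channel 42 is accepted, so the
	// parked updates are replayed. The real gossiper re-queues them on its
	// networkMsgs channel from a goroutine; here we simply print them.
	for _, u := range store.drain(42) {
		fmt.Printf("reprocessing update for chan %d: %s\n",
			u.shortChanID, u.payload)
	}
}

In the gossiper itself the drained messages are re-queued on networkMsgs from per-update goroutines with a quit check, which is why the original caller's error channel is only answered once the stashed ChannelUpdate has actually been reprocessed.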