diff --git a/discovery/gossiper.go b/discovery/gossiper.go index b29c23e8..d8a8efe5 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -265,13 +265,6 @@ type AuthenticatedGossiper struct { // every new block height. blockEpochs *chainntnfs.BlockEpochEvent - // prematureChannelUpdates is a map of ChannelUpdates we have received - // that wasn't associated with any channel we know about. We store - // them temporarily, such that we can reprocess them when a - // ChannelAnnouncement for the channel is received. - prematureChannelUpdates map[uint64][]*networkMsg - pChanUpdMtx sync.Mutex - // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the // networkHandler. @@ -328,7 +321,6 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { networkMsgs: make(chan *networkMsg), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), - prematureChannelUpdates: make(map[uint64][]*networkMsg), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), heightForLastChanUpdate: make(map[uint64][2]uint32), @@ -1022,8 +1014,7 @@ func (d *AuthenticatedGossiper) networkHandler() { }() - // A new block has arrived, so we can re-process the previously - // premature announcements. + // A new block has arrived, update our best height. case newBlock, ok := <-d.blockEpochs.Epochs: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. @@ -1690,55 +1681,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - // If we earlier received any ChannelUpdates for this channel, - // we can now process them, as the channel is added to the - // graph. - shortChanID := msg.ShortChannelID.ToUint64() - var channelUpdates []*networkMsg - - d.pChanUpdMtx.Lock() - channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...) - - // Now delete the premature ChannelUpdates, since we added them - // all to the queue of network messages. - delete(d.prematureChannelUpdates, shortChanID) - d.pChanUpdMtx.Unlock() - - // Launch a new goroutine to handle each ChannelUpdate, this to - // ensure we don't block here, as we can handle only one - // announcement at a time. - for _, cu := range channelUpdates { - d.wg.Add(1) - go func(nMsg *networkMsg) { - defer d.wg.Done() - - switch msg := nMsg.msg.(type) { - - // Reprocess the message, making sure we return - // an error to the original caller in case the - // gossiper shuts down. - case *lnwire.ChannelUpdate: - log.Debugf("Reprocessing"+ - " ChannelUpdate for "+ - "shortChanID=%v", - msg.ShortChannelID.ToUint64()) - - select { - case d.networkMsgs <- nMsg: - case <-d.quit: - nMsg.err <- ErrGossiperShuttingDown - } - - // We don't expect any other message type than - // ChannelUpdate to be in this map. - default: - log.Errorf("Unsupported message type "+ - "found among ChannelUpdates: "+ - "%T", msg) - } - }(cu) - } - // Channel announcement was successfully proceeded and know it // might be broadcast to other connected nodes if it was // announcement with proof (remote). @@ -1777,8 +1719,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID := msg.ShortChannelID.ToUint64() // If the advertised inclusionary block is beyond our knowledge - // of the chain tip, then we'll put the announcement in limbo - // to be fully verified once we advance forward in the chain. + // of the chain tip, then we'll ignore it. d.Lock() if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { log.Infof("Update announcement for "+ @@ -1787,6 +1728,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, blockHeight, d.bestHeight) d.Unlock() + + nMsg.err <- nil return nil } d.Unlock() @@ -1857,43 +1800,19 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Debugf("Removed edge with chan_id=%v from zombie "+ "index", msg.ShortChannelID) - // We'll fallthrough to ensure we stash the update until - // we receive its corresponding ChannelAnnouncement. - // This is needed to ensure the edge exists in the graph - // before applying the update. - fallthrough + nMsg.err <- nil + return nil + case channeldb.ErrGraphNotFound: fallthrough case channeldb.ErrGraphNoEdgesFound: fallthrough case channeldb.ErrEdgeNotFound: - // If the edge corresponding to this ChannelUpdate was - // not found in the graph, this might be a channel in - // the process of being opened, and we haven't processed - // our own ChannelAnnouncement yet, hence it is not - // found in the graph. This usually gets resolved after - // the channel proofs are exchanged and the channel is - // broadcasted to the rest of the network, but in case - // this is a private channel this won't ever happen. - // This can also happen in the case of a zombie channel - // with a fresh update for which we don't have a - // ChannelAnnouncement for since we reject them. Because - // of this, we temporarily add it to a map, and - // reprocess it after our own ChannelAnnouncement has - // been processed. - d.pChanUpdMtx.Lock() - d.prematureChannelUpdates[shortChanID] = append( - d.prematureChannelUpdates[shortChanID], nMsg, - ) - d.pChanUpdMtx.Unlock() - log.Debugf("Got ChannelUpdate for edge not found in "+ - "graph(shortChanID=%v), saving for "+ - "reprocessing later", shortChanID) + "graph(shortChanID=%v)", shortChanID) - // NOTE: We don't return anything on the error channel - // for this message, as we expect that will be done when - // this ChannelUpdate is later reprocessed. + // TODO: Add to recentRejects like below? Return error? + nMsg.err <- nil return nil default: diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index c3ed17b1..b3bccdb4 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -907,8 +907,10 @@ func TestPrematureAnnouncement(t *testing.T) { } select { - case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer): - t.Fatal("announcement was proceeded") + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer): + if err != nil { + t.Fatal(err) + } case <-time.After(100 * time.Millisecond): } @@ -925,8 +927,10 @@ func TestPrematureAnnouncement(t *testing.T) { } select { - case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer): - t.Fatal("announcement was proceeded") + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer): + if err != nil { + t.Fatal(err) + } case <-time.After(100 * time.Millisecond): } @@ -2372,27 +2376,13 @@ func TestProcessZombieEdgeNowLive(t *testing.T) { // The channel update cannot be successfully processed and broadcast // until the channel announcement is. Since the channel update indicates - // a fresh new update, the gossiper should stash it until it sees the - // corresponding channel announcement. - updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement( + // a fresh new update, the gossiper should mark the channel as live and + // allow it once it sees it again. + errChan := ctx.gossiper.ProcessRemoteAnnouncement( batch.chanUpdAnn2, remotePeer, ) - select { - case <-ctx.broadcastedMessage: - t.Fatal("expected to not broadcast live channel update " + - "without announcement") - case <-time.After(2 * trickleDelay): - } - - // We'll go ahead and process the channel announcement to ensure the - // channel update is processed thereafter. - processAnnouncement(batch.remoteChanAnn, false, false) - - // After successfully processing the announcement, the channel update - // should have been processed and broadcast successfully as well. - select { - case err := <-updateErrChan: + case err := <-errChan: if err != nil { t.Fatalf("expected to process live channel update: %v", err) @@ -2401,226 +2391,17 @@ func TestProcessZombieEdgeNowLive(t *testing.T) { t.Fatal("expected to process announcement") } - select { - case msgWithSenders := <-ctx.broadcastedMessage: - assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg) - case <-time.After(2 * trickleDelay): - t.Fatal("expected to broadcast live channel update") - } -} - -// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate -// from the remote before we have processed our own ChannelAnnouncement, it will -// be reprocessed later, after our ChannelAnnouncement. -func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { - t.Parallel() - - ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) - if err != nil { - t.Fatalf("can't create context: %v", err) - } - defer cleanup() - - batch, err := createAnnouncements(0) - if err != nil { - t.Fatalf("can't generate announcements: %v", err) - } - - localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) - if err != nil { - t.Fatalf("unable to parse pubkey: %v", err) - } - remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) - if err != nil { - t.Fatalf("unable to parse pubkey: %v", err) - } - - // Set up a channel that we can use to inspect the messages sent - // directly from the gossiper. - sentMsgs := make(chan lnwire.Message, 10) - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} - - // Override NotifyWhenOnline to return the remote peer which we expect - // meesages to be sent to. - ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte, - peerChan chan<- lnpeer.Peer) { - - peerChan <- remotePeer - } - - // Recreate the case where the remote node is sending us its ChannelUpdate - // before we have been able to process our own ChannelAnnouncement and - // ChannelUpdate. - errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement( - batch.chanUpdAnn2, remotePeer, - ) select { case <-ctx.broadcastedMessage: - t.Fatal("channel update announcement was broadcast") + t.Fatal("expected to not broadcast live channel update " + + "without announcement") case <-time.After(2 * trickleDelay): } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer) - if err != nil { - t.Fatalf("unable to process node ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("node announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - // Since the remote ChannelUpdate was added for an edge that - // we did not already know about, it should have been added - // to the map of premature ChannelUpdates. Check that nothing - // was added to the graph. - chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID) - if err != channeldb.ErrEdgeNotFound { - t.Fatalf("Expected ErrEdgeNotFound, got: %v", err) - } - if chanInfo != nil { - t.Fatalf("chanInfo was not nil") - } - if e1 != nil { - t.Fatalf("e1 was not nil") - } - if e2 != nil { - t.Fatalf("e2 was not nil") - } - - // Recreate lightning network topology. Initialize router with channel - // between two nodes. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey) - if err != nil { - t.Fatalf("unable to process :%v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey) - if err != nil { - t.Fatalf("unable to process :%v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel update announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey) - if err != nil { - t.Fatalf("unable to process :%v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("node announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - // The local ChannelUpdate should now be sent directly to the remote peer, - // such that the edge can be used for routing, regardless if this channel - // is announced or not (private channel). - select { - case msg := <-sentMsgs: - assertMessage(t, batch.chanUpdAnn1, msg) - case <-time.After(1 * time.Second): - t.Fatal("gossiper did not send channel update to peer") - } - - // At this point the remote ChannelUpdate we received earlier should - // be reprocessed, as we now have the necessary edge entry in the graph. - select { - case err := <-errRemoteAnn: - if err != nil { - t.Fatalf("error re-processing remote update: %v", err) - } - case <-time.After(2 * trickleDelay): - t.Fatalf("remote update was not processed") - } - - // Check that the ChannelEdgePolicy was added to the graph. - chanInfo, e1, e2, err = ctx.router.GetChannelByID( - batch.chanUpdAnn1.ShortChannelID, - ) - if err != nil { - t.Fatalf("unable to get channel from router: %v", err) - } - if chanInfo == nil { - t.Fatalf("chanInfo was nil") - } - if e1 == nil { - t.Fatalf("e1 was nil") - } - if e2 == nil { - t.Fatalf("e2 was nil") - } - - // Pretending that we receive local channel announcement from funding - // manager, thereby kick off the announcement exchange process. - err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.localProofAnn, localKey, - ) - if err != nil { - t.Fatalf("unable to process :%v", err) - } - - select { - case <-ctx.broadcastedMessage: - t.Fatal("announcements were broadcast") - case <-time.After(2 * trickleDelay): - } - - number := 0 - if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( - func(*channeldb.WaitingProof) error { - number++ - return nil - }, - func() { - number = 0 - }, - ); err != nil { - t.Fatalf("unable to retrieve objects from store: %v", err) - } - - if number != 1 { - t.Fatal("wrong number of objects in storage") - } - - err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.remoteProofAnn, remotePeer, - ) - if err != nil { - t.Fatalf("unable to process :%v", err) - } - - for i := 0; i < 4; i++ { - select { - case <-ctx.broadcastedMessage: - case <-time.After(time.Second): - t.Fatal("announcement wasn't broadcast") - } - } - - number = 0 - if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( - func(*channeldb.WaitingProof) error { - number++ - return nil - }, - func() { - number = 0 - }, - ); err != nil && err != channeldb.ErrWaitingProofNotFound { - t.Fatalf("unable to retrieve objects from store: %v", err) - } - - if number != 0 { - t.Fatal("waiting proof should be removed from storage") - } + // Re-process the channel announcement and update. Both should be + // applied to the graph and broadcast. + processAnnouncement(batch.remoteChanAnn, false, false) + processAnnouncement(batch.chanUpdAnn2, false, false) } // TestExtraDataChannelAnnouncementValidation tests that we're able to properly