diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 10ca18de..7f776f79 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1817,35 +1817,33 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // ensure we don't block here, as we can handle only one // announcement at a time. for _, cu := range channelUpdates { + d.wg.Add(1) go func(nMsg *networkMsg) { + defer d.wg.Done() + switch msg := nMsg.msg.(type) { + // Reprocess the message, making sure we return + // an error to the original caller in case the + // gossiper shuts down. case *lnwire.ChannelUpdate: - // We can safely wait for the error to - // be returned, as in case of shutdown, - // the gossiper will return an error. - var err error - if nMsg.isRemote { - err = <-d.ProcessRemoteAnnouncement( - msg, nMsg.peer) - } else { - err = <-d.ProcessLocalAnnouncement( - msg, nMsg.source) - } - if err != nil { - log.Errorf("Failed reprocessing"+ - " ChannelUpdate for "+ - "shortChanID=%v: %v", - msg.ShortChannelID.ToUint64(), - err) - return + log.Debugf("Reprocessing"+ + " ChannelUpdate for "+ + "shortChanID=%v", + msg.ShortChannelID.ToUint64()) + + select { + case d.networkMsgs <- nMsg: + case <-d.quit: + nMsg.err <- ErrGossiperShuttingDown } // We don't expect any other message type than // ChannelUpdate to be in this map. default: log.Errorf("Unsupported message type "+ - "found among ChannelUpdates: %T", msg) + "found among ChannelUpdates: "+ + "%T", msg) } }(cu) } @@ -1963,19 +1961,27 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // If the node supports it, we may try to // request the chan ann from it. + d.wg.Add(1) go func() { + defer d.wg.Done() + reqErr := d.maybeRequestChanAnn( msg.ShortChannelID, ) if reqErr != nil { - log.Errorf("unable to request ann "+ - "for chan_id=%v: %v", shortChanID, + log.Errorf("unable to request "+ + "ann for chan_id=%v: "+ + "%v", shortChanID, reqErr) } }() - nMsg.err <- nil + // NOTE: We don't return anything on the error + // channel for this message, as we expect that + // will be done when this ChannelUpdate is + // later reprocessed. return nil + default: err := errors.Errorf("unable to validate "+ "channel update short_chan_id=%v: %v", diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index dc0ad4f7..7d98c558 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2005,10 +2005,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // Recreate the case where the remote node is sending us its ChannelUpdate // before we have been able to process our own ChannelAnnouncement and // ChannelUpdate. - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) - if err != nil { - t.Fatalf("unable to process :%v", err) - } + errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) + select { case <-ctx.broadcastedMessage: t.Fatal("channel update announcement was broadcast") @@ -2069,6 +2067,15 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // At this point the remote ChannelUpdate we received earlier should // be reprocessed, as we now have the necessary edge entry in the graph. + select { + case err := <-errRemoteAnn: + if err != nil { + t.Fatalf("error re-processing remote update: %v", err) + } + case <-time.After(2 * trickleDelay): + t.Fatalf("remote update was not processed") + } + // Check that the ChannelEdgePolicy was added to the graph. chanInfo, e1, e2, err = ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID) if err != nil {