From 0bc415c68358550dc485f5ff71b91fa248a0e409 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 20 Aug 2018 14:28:10 +0200 Subject: [PATCH] discovery/gossiper: don't return on errChan for ChannelUpdate not yet processed Previously we would immediately return nil on the error channel for premature ChannelUpdates, which would break the expectation that a returned non-error meant the update was successfully added to the database. This meant that the caller would believe the update was added to the database, while it is actually still in volatile memory and can be lost during restarts. This change makes us handle premature ChannelUpdates as we handle other premature announcements within the gossiper, by deferring sending on the error channel until we have reprocessed the update. --- discovery/gossiper.go | 50 +++++++++++++++++++++----------------- discovery/gossiper_test.go | 15 +++++++++--- 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 10ca18de..7f776f79 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1817,35 +1817,33 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // ensure we don't block here, as we can handle only one // announcement at a time. for _, cu := range channelUpdates { + d.wg.Add(1) go func(nMsg *networkMsg) { + defer d.wg.Done() + switch msg := nMsg.msg.(type) { + // Reprocess the message, making sure we return + // an error to the original caller in case the + // gossiper shuts down. case *lnwire.ChannelUpdate: - // We can safely wait for the error to - // be returned, as in case of shutdown, - // the gossiper will return an error. 
- var err error - if nMsg.isRemote { - err = <-d.ProcessRemoteAnnouncement( - msg, nMsg.peer) - } else { - err = <-d.ProcessLocalAnnouncement( - msg, nMsg.source) - } - if err != nil { - log.Errorf("Failed reprocessing"+ - " ChannelUpdate for "+ - "shortChanID=%v: %v", - msg.ShortChannelID.ToUint64(), - err) - return + log.Debugf("Reprocessing"+ + " ChannelUpdate for "+ + "shortChanID=%v", + msg.ShortChannelID.ToUint64()) + + select { + case d.networkMsgs <- nMsg: + case <-d.quit: + nMsg.err <- ErrGossiperShuttingDown } // We don't expect any other message type than // ChannelUpdate to be in this map. default: log.Errorf("Unsupported message type "+ - "found among ChannelUpdates: %T", msg) + "found among ChannelUpdates: "+ + "%T", msg) } }(cu) } @@ -1963,19 +1961,27 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // If the node supports it, we may try to // request the chan ann from it. + d.wg.Add(1) go func() { + defer d.wg.Done() + reqErr := d.maybeRequestChanAnn( msg.ShortChannelID, ) if reqErr != nil { - log.Errorf("unable to request ann "+ - "for chan_id=%v: %v", shortChanID, + log.Errorf("unable to request "+ + "ann for chan_id=%v: "+ + "%v", shortChanID, reqErr) } }() - nMsg.err <- nil + // NOTE: We don't return anything on the error + // channel for this message, as we expect that + // will be done when this ChannelUpdate is + // later reprocessed. return nil + default: err := errors.Errorf("unable to validate "+ "channel update short_chan_id=%v: %v", diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index dc0ad4f7..7d98c558 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2005,10 +2005,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // Recreate the case where the remote node is sending us its ChannelUpdate // before we have been able to process our own ChannelAnnouncement and // ChannelUpdate. 
- err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) - if err != nil { - t.Fatalf("unable to process :%v", err) - } + errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) + select { case <-ctx.broadcastedMessage: t.Fatal("channel update announcement was broadcast") @@ -2069,6 +2067,15 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // At this point the remote ChannelUpdate we received earlier should // be reprocessed, as we now have the necessary edge entry in the graph. + select { + case err := <-errRemoteAnn: + if err != nil { + t.Fatalf("error re-processing remote update: %v", err) + } + case <-time.After(2 * trickleDelay): + t.Fatalf("remote update was not processed") + } + // Check that the ChannelEdgePolicy was added to the graph. chanInfo, e1, e2, err = ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID) if err != nil {