discovery/gossiper: don't return on errChan for ChannelUpdate not yet processed
Previosuly we would immediately return nil on the error channel for premature ChannelUpdates, which would break the expection that a a returned non-error meant the update was successfully added to the database. This meant that the caller would believe the update was added to the database, while it is actually still in volatile memory and can be lost during restarts. This change makes us handle premature ChannelUpdates as we handle other premature announcements within the gossiper, by deferring sending on the error channel until we have reprocessed the update.
This commit is contained in:
parent
7bbb2bbc80
commit
0bc415c683
@ -1817,35 +1817,33 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
|||||||
// ensure we don't block here, as we can handle only one
|
// ensure we don't block here, as we can handle only one
|
||||||
// announcement at a time.
|
// announcement at a time.
|
||||||
for _, cu := range channelUpdates {
|
for _, cu := range channelUpdates {
|
||||||
|
d.wg.Add(1)
|
||||||
go func(nMsg *networkMsg) {
|
go func(nMsg *networkMsg) {
|
||||||
|
defer d.wg.Done()
|
||||||
|
|
||||||
switch msg := nMsg.msg.(type) {
|
switch msg := nMsg.msg.(type) {
|
||||||
|
|
||||||
|
// Reprocess the message, making sure we return
|
||||||
|
// an error to the original caller in case the
|
||||||
|
// gossiper shuts down.
|
||||||
case *lnwire.ChannelUpdate:
|
case *lnwire.ChannelUpdate:
|
||||||
// We can safely wait for the error to
|
log.Debugf("Reprocessing"+
|
||||||
// be returned, as in case of shutdown,
|
" ChannelUpdate for "+
|
||||||
// the gossiper will return an error.
|
"shortChanID=%v",
|
||||||
var err error
|
msg.ShortChannelID.ToUint64())
|
||||||
if nMsg.isRemote {
|
|
||||||
err = <-d.ProcessRemoteAnnouncement(
|
select {
|
||||||
msg, nMsg.peer)
|
case d.networkMsgs <- nMsg:
|
||||||
} else {
|
case <-d.quit:
|
||||||
err = <-d.ProcessLocalAnnouncement(
|
nMsg.err <- ErrGossiperShuttingDown
|
||||||
msg, nMsg.source)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Failed reprocessing"+
|
|
||||||
" ChannelUpdate for "+
|
|
||||||
"shortChanID=%v: %v",
|
|
||||||
msg.ShortChannelID.ToUint64(),
|
|
||||||
err)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't expect any other message type than
|
// We don't expect any other message type than
|
||||||
// ChannelUpdate to be in this map.
|
// ChannelUpdate to be in this map.
|
||||||
default:
|
default:
|
||||||
log.Errorf("Unsupported message type "+
|
log.Errorf("Unsupported message type "+
|
||||||
"found among ChannelUpdates: %T", msg)
|
"found among ChannelUpdates: "+
|
||||||
|
"%T", msg)
|
||||||
}
|
}
|
||||||
}(cu)
|
}(cu)
|
||||||
}
|
}
|
||||||
@ -1963,19 +1961,27 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
|||||||
|
|
||||||
// If the node supports it, we may try to
|
// If the node supports it, we may try to
|
||||||
// request the chan ann from it.
|
// request the chan ann from it.
|
||||||
|
d.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer d.wg.Done()
|
||||||
|
|
||||||
reqErr := d.maybeRequestChanAnn(
|
reqErr := d.maybeRequestChanAnn(
|
||||||
msg.ShortChannelID,
|
msg.ShortChannelID,
|
||||||
)
|
)
|
||||||
if reqErr != nil {
|
if reqErr != nil {
|
||||||
log.Errorf("unable to request ann "+
|
log.Errorf("unable to request "+
|
||||||
"for chan_id=%v: %v", shortChanID,
|
"ann for chan_id=%v: "+
|
||||||
|
"%v", shortChanID,
|
||||||
reqErr)
|
reqErr)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
nMsg.err <- nil
|
// NOTE: We don't return anything on the error
|
||||||
|
// channel for this message, as we expect that
|
||||||
|
// will be done when this ChannelUpdate is
|
||||||
|
// later reprocessed.
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
err := errors.Errorf("unable to validate "+
|
err := errors.Errorf("unable to validate "+
|
||||||
"channel update short_chan_id=%v: %v",
|
"channel update short_chan_id=%v: %v",
|
||||||
|
@ -2005,10 +2005,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
|
|||||||
// Recreate the case where the remote node is sending us its ChannelUpdate
|
// Recreate the case where the remote node is sending us its ChannelUpdate
|
||||||
// before we have been able to process our own ChannelAnnouncement and
|
// before we have been able to process our own ChannelAnnouncement and
|
||||||
// ChannelUpdate.
|
// ChannelUpdate.
|
||||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer)
|
errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unable to process :%v", err)
|
|
||||||
}
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.broadcastedMessage:
|
case <-ctx.broadcastedMessage:
|
||||||
t.Fatal("channel update announcement was broadcast")
|
t.Fatal("channel update announcement was broadcast")
|
||||||
@ -2069,6 +2067,15 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
|
|||||||
|
|
||||||
// At this point the remote ChannelUpdate we received earlier should
|
// At this point the remote ChannelUpdate we received earlier should
|
||||||
// be reprocessed, as we now have the necessary edge entry in the graph.
|
// be reprocessed, as we now have the necessary edge entry in the graph.
|
||||||
|
select {
|
||||||
|
case err := <-errRemoteAnn:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error re-processing remote update: %v", err)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * trickleDelay):
|
||||||
|
t.Fatalf("remote update was not processed")
|
||||||
|
}
|
||||||
|
|
||||||
// Check that the ChannelEdgePolicy was added to the graph.
|
// Check that the ChannelEdgePolicy was added to the graph.
|
||||||
chanInfo, e1, e2, err = ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
|
chanInfo, e1, e2, err = ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user