discovery: reprocess premature ChannelUpdates

This commit makes the gossiper store received ChannelUpdates
that is not for any known channel in a map, such that they
can be reprocessed when the ChannelAnnouncement arrives.

This is done to handle the case where we receive a ChannelUpdate
from our channel counterparty before we have been able to process
our own local ChannelAnnouncement.
This commit is contained in:
Johan T. Halseth 2017-12-14 17:52:41 +01:00 committed by Olaoluwa Osuntokun
parent 8e120d1e62
commit db829cd0c5

@ -139,6 +139,13 @@ type AuthenticatedGossiper struct {
// TODO(roasbeef): limit premature networkMsgs to N
prematureAnnouncements map[uint32][]*networkMsg
// prematureChannelUpdates is a map of ChannelUpdates we have
// received that wasn't associated with any channel we know about.
// We store them temporarily, such that we can reprocess them when
// a ChannelAnnouncement for the channel is received.
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
// waitingProofs is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
// needed to reconstruct a full authenticated channel announcement. Once
@ -174,13 +181,14 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
}
return &AuthenticatedGossiper{
selfKey: selfKey,
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
feeUpdates: make(chan *feeUpdateRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
waitingProofs: storage,
selfKey: selfKey,
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
feeUpdates: make(chan *feeUpdateRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
waitingProofs: storage,
}, nil
}
@ -1013,6 +1021,60 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
return nil
}
// If we earlier received any ChannelUpdates for this channel,
// we can now process them, as the channel is added to the
// graph.
shortChanID := msg.ShortChannelID.ToUint64()
var channelUpdates []*networkMsg
d.pChanUpdMtx.Lock()
for _, cu := range d.prematureChannelUpdates[shortChanID] {
channelUpdates = append(channelUpdates, cu)
}
// Now delete the premature ChannelUpdates, since we added them
// all to the queue of network messages.
delete(d.prematureChannelUpdates, shortChanID)
d.pChanUpdMtx.Unlock()
// Launch a new goroutine to handle each ChannelUpdate,
// this to ensure we don't block here, as we can handle
// only one announcement at a time.
for _, cu := range channelUpdates {
go func(nMsg *networkMsg) {
switch msg := nMsg.msg.(type) {
case *lnwire.ChannelUpdate:
// We can safely wait for the error to
// be returned, as in case of shutdown,
// the gossiper will return an error.
var err error
if nMsg.isRemote {
err = <-d.ProcessRemoteAnnouncement(
msg, nMsg.peer)
} else {
err = <-d.ProcessLocalAnnouncement(
msg, nMsg.peer)
}
if err != nil {
log.Errorf("Failed reprocessing"+
" ChannelUpdate for "+
"shortChanID=%v: %v",
msg.ShortChannelID.ToUint64(),
err)
return
}
// We don't expect any other message type
// than ChannelUpdate to be in this map.
default:
log.Errorf("Unsupported message type "+
"found among ChannelUpdates: %T", msg)
}
}(cu)
}
// Channel announcement was successfully proceeded and know it
// might be broadcast to other connected nodes if it was
// announcement with proof (remote).
@ -1063,12 +1125,44 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// verify message signature.
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil {
err := errors.Errorf("unable to validate "+
"channel update short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
switch err {
case channeldb.ErrGraphNotFound:
fallthrough
case channeldb.ErrGraphNoEdgesFound:
fallthrough
case channeldb.ErrEdgeNotFound:
// If the edge corresponding to this
// ChannelUpdate was not found in the graph,
// this might be a channel in the process of
// being opened, and we haven't processed our
// own ChannelAnnouncement yet, hence it is not
// found in the graph. This usually gets
// resolved after the channel proofs are
// exchanged and the channel is broadcasted to
// the rest of the network, but in case this
// is a private channel this won't ever happen.
// Because of this, we temporarily add it to a
// map, and reprocess it after our own
// ChannelAnnouncement has been processed.
d.pChanUpdMtx.Lock()
d.prematureChannelUpdates[shortChanID] = append(
d.prematureChannelUpdates[shortChanID],
nMsg)
d.pChanUpdMtx.Unlock()
log.Infof("Got ChannelUpdate for edge not "+
"found in graph(shortChanID=%v), "+
"saving for reprocessing later",
shortChanID)
nMsg.err <- nil
return nil
default:
err := errors.Errorf("unable to validate "+
"channel update short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
}
// The least-significant bit in the flag on the channel update
@ -1117,11 +1211,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
return nil
}
// If this is a local ChannelUpdate without an AuthProof, it means
// it is an update to a channel that is not (yet) supposed to be
// announced to the greater network. However, our channel counter
// party will need to be given the update, so we'll try sending
// the update directly to the remote peer.
// If this is a local ChannelUpdate without an AuthProof, it
// means it is an update to a channel that is not (yet)
// supposed to be announced to the greater network. However,
// our channel counter party will need to be given the update,
// so we'll try sending the update directly to the remote peer.
if !nMsg.isRemote && chanInfo.AuthProof == nil {
// Get our peer's public key.
var remotePeer *btcec.PublicKey