peer: send channel reestablish message for borked channels

When loading active channels for a connected peer, we gather channel
sync messages for all borked channels, and send them to the peer. This
should help a peer realize that the state is irreconcible, as we have
already realized.
This commit is contained in:
Johan T. Halseth 2019-09-06 13:14:38 +02:00
parent eb1b84c0b4
commit 1974bfa4cf
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

56
peer.go

@ -349,7 +349,8 @@ func (p *peer) Start() error {
peerLog.Debugf("Loaded %v active channels from database with "+ peerLog.Debugf("Loaded %v active channels from database with "+
"NodeKey(%x)", len(activeChans), p.PubKey()) "NodeKey(%x)", len(activeChans), p.PubKey())
if err := p.loadActiveChannels(activeChans); err != nil { msgs, err := p.loadActiveChannels(activeChans)
if err != nil {
return fmt.Errorf("unable to load channels: %v", err) return fmt.Errorf("unable to load channels: %v", err)
} }
@ -362,6 +363,17 @@ func (p *peer) Start() error {
go p.channelManager() go p.channelManager()
go p.pingHandler() go p.pingHandler()
// Now that the peer has started up, we send any channel sync messages
// that must be resent for borked channels.
if len(msgs) > 0 {
peerLog.Infof("Sending %d channel sync messages to peer after "+
"loading active channels", len(msgs))
if err := p.SendMessage(true, msgs...); err != nil {
peerLog.Warnf("Failed sending channel sync "+
"messages to peer %v: %v", p, err)
}
}
return nil return nil
} }
@ -400,14 +412,22 @@ func (p *peer) QuitSignal() <-chan struct{} {
} }
// loadActiveChannels creates indexes within the peer for tracking all active // loadActiveChannels creates indexes within the peer for tracking all active
// channels returned by the database. // channels returned by the database. It returns a slice of channel reestablish
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // messages that should be sent to the peer immediately, in case we have borked
// channels that haven't been closed yet.
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
[]lnwire.Message, error) {
// Return a slice of messages to send to the peers in case the channel
// cannot be loaded normally.
var msgs []lnwire.Message
for _, dbChan := range chans { for _, dbChan := range chans {
lnChan, err := lnwallet.NewLightningChannel( lnChan, err := lnwallet.NewLightningChannel(
p.server.cc.signer, dbChan, p.server.sigPool, p.server.cc.signer, dbChan, p.server.sigPool,
) )
if err != nil { if err != nil {
return err return nil, err
} }
chanPoint := &dbChan.FundingOutpoint chanPoint := &dbChan.FundingOutpoint
@ -427,6 +447,22 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
case dbChan.HasChanStatus(channeldb.ChanStatusLocalDataLoss): case dbChan.HasChanStatus(channeldb.ChanStatusLocalDataLoss):
peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+ peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+
"start.", chanPoint, dbChan.ChanStatus()) "start.", chanPoint, dbChan.ChanStatus())
// To help our peer recover from a potential data loss,
// we resend our channel reestablish message if the
// channel is in a borked state. We won't process any
// channel reestablish message sent from the peer, but
// that's okay since the assumption is that we did when
// marking the channel borked.
chanSync, err := dbChan.ChanSyncMsg(false)
if err != nil {
peerLog.Errorf("Unable to create channel "+
"reestablish message for channel %v: "+
"%v", chanPoint, err)
continue
}
msgs = append(msgs, chanSync)
continue continue
} }
@ -440,7 +476,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
_, currentHeight, err := p.server.cc.chainIO.GetBestBlock() _, currentHeight, err := p.server.cc.chainIO.GetBestBlock()
if err != nil { if err != nil {
return err return nil, err
} }
// Before we register this new link with the HTLC Switch, we'll // Before we register this new link with the HTLC Switch, we'll
@ -449,7 +485,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
graph := p.server.chanDB.ChannelGraph() graph := p.server.chanDB.ChannelGraph()
info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint) info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
if err != nil && err != channeldb.ErrEdgeNotFound { if err != nil && err != channeldb.ErrEdgeNotFound {
return err return nil, err
} }
// We'll filter out our policy from the directional channel // We'll filter out our policy from the directional channel
@ -497,7 +533,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
*chanPoint, *chanPoint,
) )
if err != nil { if err != nil {
return err return nil, err
} }
// Create the link and add it to the switch. // Create the link and add it to the switch.
@ -506,8 +542,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
currentHeight, true, currentHeight, true,
) )
if err != nil { if err != nil {
return fmt.Errorf("unable to add link %v to switch: %v", return nil, fmt.Errorf("unable to add link %v to "+
chanPoint, err) "switch: %v", chanPoint, err)
} }
p.activeChanMtx.Lock() p.activeChanMtx.Lock()
@ -515,7 +551,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
} }
return nil return msgs, nil
} }
// addLink creates and adds a new link from the specified channel. // addLink creates and adds a new link from the specified channel.