diff --git a/peer.go b/peer.go index 247daab7..33639a9b 100644 --- a/peer.go +++ b/peer.go @@ -349,7 +349,8 @@ func (p *peer) Start() error { peerLog.Debugf("Loaded %v active channels from database with "+ "NodeKey(%x)", len(activeChans), p.PubKey()) - if err := p.loadActiveChannels(activeChans); err != nil { + msgs, err := p.loadActiveChannels(activeChans) + if err != nil { return fmt.Errorf("unable to load channels: %v", err) } @@ -362,6 +363,17 @@ func (p *peer) Start() error { go p.channelManager() go p.pingHandler() + // Now that the peer has started up, we send any channel sync messages + // that must be resent for borked channels. + if len(msgs) > 0 { + peerLog.Infof("Sending %d channel sync messages to peer after "+ + "loading active channels", len(msgs)) + if err := p.SendMessage(true, msgs...); err != nil { + peerLog.Warnf("Failed sending channel sync "+ + "messages to peer %v: %v", p, err) + } + } + return nil } @@ -400,14 +412,22 @@ func (p *peer) QuitSignal() <-chan struct{} { } // loadActiveChannels creates indexes within the peer for tracking all active -// channels returned by the database. -func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { +// channels returned by the database. It returns a slice of channel reestablish +// messages that should be sent to the peer immediately, in case we have borked +// channels that haven't been closed yet. +func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( + []lnwire.Message, error) { + + // Return a slice of messages to send to the peers in case the channel + // cannot be loaded normally. + var msgs []lnwire.Message + for _, dbChan := range chans { lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, dbChan, p.server.sigPool, ) if err != nil { - return err + return nil, err } chanPoint := &dbChan.FundingOutpoint @@ -427,6 +447,22 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { case dbChan.HasChanStatus(channeldb.ChanStatusLocalDataLoss): peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+ "start.", chanPoint, dbChan.ChanStatus()) + + // To help our peer recover from a potential data loss, + // we resend our channel reestablish message if the + // channel is in a borked state. We won't process any + // channel reestablish message sent from the peer, but + // that's okay since the assumption is that we did when + // marking the channel borked. + chanSync, err := dbChan.ChanSyncMsg(false) + if err != nil { + peerLog.Errorf("Unable to create channel "+ + "reestablish message for channel %v: "+ + "%v", chanPoint, err) + continue + } + + msgs = append(msgs, chanSync) continue } @@ -440,7 +476,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { _, currentHeight, err := p.server.cc.chainIO.GetBestBlock() if err != nil { - return err + return nil, err } // Before we register this new link with the HTLC Switch, we'll @@ -449,7 +485,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { graph := p.server.chanDB.ChannelGraph() info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint) if err != nil && err != channeldb.ErrEdgeNotFound { - return err + return nil, err } // We'll filter out our policy from the directional channel @@ -497,7 +533,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { *chanPoint, ) if err != nil { - return err + return nil, err } // Create the link and add it to the switch. @@ -506,8 +542,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { currentHeight, true, ) if err != nil { - return fmt.Errorf("unable to add link %v to switch: %v", - chanPoint, err) + return nil, fmt.Errorf("unable to add link %v to "+ + "switch: %v", chanPoint, err) } p.activeChanMtx.Lock() @@ -515,7 +551,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { p.activeChanMtx.Unlock() } - return nil + return msgs, nil } // addLink creates and adds a new link from the specified channel.