diff --git a/chancloser.go b/chancloser.go
index be8395ce..3c015a5b 100644
--- a/chancloser.go
+++ b/chancloser.go
@@ -435,6 +435,14 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
 			"close: %v", c.chanPoint, err)
 	}
 
+	// Before publishing the closing tx, we persist it to the
+	// database, such that it can be republished if something goes
+	// wrong.
+	err = c.cfg.channel.MarkCommitmentBroadcasted(closeTx)
+	if err != nil {
+		return nil, false, err
+	}
+
 	// With the closing transaction crafted, we'll now broadcast it
 	// to the network.
 	peerLog.Infof("Broadcasting cooperative close tx: %v",
@@ -444,9 +452,6 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
 	if err := c.cfg.broadcastTx(closeTx); err != nil {
 		return nil, false, err
 	}
-	if err := c.cfg.channel.MarkCommitmentBroadcasted(); err != nil {
-		return nil, false, err
-	}
 
 	// Finally, we'll transition to the closeFinished state, and
 	// also return the final close signed message we sent.
diff --git a/channeldb/channel.go b/channeldb/channel.go
index 2b1ec949..8b5d5253 100644
--- a/channeldb/channel.go
+++ b/channeldb/channel.go
@@ -16,6 +16,7 @@ import (
 	"github.com/btcsuite/btcd/wire"
 	"github.com/btcsuite/btcutil"
 	"github.com/coreos/bbolt"
+	"github.com/lightningnetwork/lnd/input"
 	"github.com/lightningnetwork/lnd/keychain"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/shachain"
@@ -58,6 +59,10 @@ var (
 	// remote peer during a channel sync in case we have lost channel state.
 	dataLossCommitPointKey = []byte("data-loss-commit-point-key")
 
+	// closingTxKey points to the closing tx that we broadcasted when
+	// moving the channel to state CommitBroadcasted.
+	closingTxKey = []byte("closing-tx-key")
+
 	// commitDiffKey stores the current pending commitment state we've
 	// extended to the remote party (if any). Each time we propose a new
 	// state, we store the information necessary to reconstruct this state
@@ -103,6 +108,10 @@ var (
 	// in the database.
 	ErrNoCommitPoint = fmt.Errorf("no commit point found")
 
+	// ErrNoCloseTx is returned when no closing tx is found for a channel
+	// in the state CommitBroadcasted.
+	ErrNoCloseTx = fmt.Errorf("no closing tx found")
+
 	// ErrNoRestoredChannelMutation is returned when a caller attempts to
 	// mutate a channel that's been recovered.
 	ErrNoRestoredChannelMutation = fmt.Errorf("cannot mutate restored " +
@@ -514,16 +523,6 @@ type OpenChannel struct {
 	sync.RWMutex
 }
 
-// FullSync serializes, and writes to disk the *full* channel state, using
-// both the active channel bucket to store the prefixed column fields, and the
-// remote node's ID to store the remainder of the channel state.
-func (c *OpenChannel) FullSync() error {
-	c.Lock()
-	defer c.Unlock()
-
-	return c.Db.Update(c.fullSync)
-}
-
 // ShortChanID returns the current ShortChannelID of this channel.
 func (c *OpenChannel) ShortChanID() lnwire.ShortChannelID {
 	c.RLock()
@@ -648,9 +647,8 @@ func fetchChanBucket(tx *bbolt.Tx, nodeKey *btcec.PublicKey,
 	return chanBucket, nil
 }
 
-// fullSync is an internal version of the FullSync method which allows callers
-// to sync the contents of an OpenChannel while re-using an existing database
-// transaction.
+// fullSync syncs the contents of an OpenChannel while re-using an existing
+// database transaction.
 func (c *OpenChannel) fullSync(tx *bbolt.Tx) error {
 	// First fetch the top level bucket which stores all data related to
 	// current, active channels.
@@ -736,44 +734,16 @@ func (c *OpenChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error {
 	c.Lock()
 	defer c.Unlock()
 
-	var status ChannelStatus
-	if err := c.Db.Update(func(tx *bbolt.Tx) error {
-		chanBucket, err := fetchChanBucket(
-			tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
-		)
-		if err != nil {
-			return err
-		}
-
-		channel, err := fetchOpenChannel(chanBucket, &c.FundingOutpoint)
-		if err != nil {
-			return err
-		}
-
-		// Add status LocalDataLoss to the existing bitvector found in
-		// the DB.
-		status = channel.chanStatus | ChanStatusLocalDataLoss
-		channel.chanStatus = status
-
-		var b bytes.Buffer
-		if err := WriteElement(&b, commitPoint); err != nil {
-			return err
-		}
-
-		err = chanBucket.Put(dataLossCommitPointKey, b.Bytes())
-		if err != nil {
-			return err
-		}
-
-		return putOpenChannel(chanBucket, channel)
-	}); err != nil {
+	var b bytes.Buffer
+	if err := WriteElement(&b, commitPoint); err != nil {
 		return err
 	}
 
-	// Update the in-memory representation to keep it in sync with the DB.
-	c.chanStatus = status
+	putCommitPoint := func(chanBucket *bbolt.Bucket) error {
+		return chanBucket.Put(dataLossCommitPointKey, b.Bytes())
+	}
 
-	return nil
+	return c.putChanStatus(ChanStatusLocalDataLoss, putCommitPoint)
 }
 
 // DataLossCommitPoint retrieves the stored commit point set during
@@ -821,6 +791,82 @@ func (c *OpenChannel) MarkBorked() error {
 	return c.putChanStatus(ChanStatusBorked)
 }
 
+// ChanSyncMsg returns the ChannelReestablish message that should be sent upon
+// reconnection with the remote peer that we're maintaining this channel with.
+// The information contained within this message is necessary to re-sync our
+// commitment chains in the case of a lost or only partially processed message.
+// When the remote party receives this message one of three things may happen:
+//
+// 1. We're fully synced and no messages need to be sent.
+// 2. We didn't get the last CommitSig message they sent, so they'll re-send
+//    it.
+// 3. We didn't get the last RevokeAndAck message they sent, so they'll
+//    re-send it.
+//
+// If this is a restored channel, having status ChanStatusRestored, then we'll
+// modify our typical chan sync message to ensure they force close even if
+// we're on the very first state.
+func (c *OpenChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) {
+	c.Lock()
+	defer c.Unlock()
+
+	// The remote commitment height that we'll send in the
+	// ChannelReestablish message is our current commitment height plus
+	// one. If the receiver thinks that our commitment height is actually
+	// *equal* to this value, then they'll re-send the last commitment that
+	// they sent but we never fully processed.
+	localHeight := c.LocalCommitment.CommitHeight
+	nextLocalCommitHeight := localHeight + 1
+
+	// The second value we'll send is the height of the remote commitment
+	// from our PoV. If the receiver thinks that their height is actually
+	// *one plus* this value, then they'll re-send their last revocation.
+	remoteChainTipHeight := c.RemoteCommitment.CommitHeight
+
+	// If this channel has undergone a commitment update, then in order to
+	// prove to the remote party our knowledge of their prior commitment
+	// state, we'll also send over the last commitment secret that the
+	// remote party sent.
+ var lastCommitSecret [32]byte + if remoteChainTipHeight != 0 { + remoteSecret, err := c.RevocationStore.LookUp( + remoteChainTipHeight - 1, + ) + if err != nil { + return nil, err + } + lastCommitSecret = [32]byte(*remoteSecret) + } + + // Additionally, we'll send over the current unrevoked commitment on + // our local commitment transaction. + currentCommitSecret, err := c.RevocationProducer.AtIndex( + localHeight, + ) + if err != nil { + return nil, err + } + + // If we've restored this channel, then we'll purposefully give them an + // invalid LocalUnrevokedCommitPoint so they'll force close the channel + // allowing us to sweep our funds. + if c.hasChanStatus(ChanStatusRestored) { + currentCommitSecret[0] ^= 1 + } + + return &lnwire.ChannelReestablish{ + ChanID: lnwire.NewChanIDFromOutPoint( + &c.FundingOutpoint, + ), + NextLocalCommitHeight: nextLocalCommitHeight, + RemoteCommitTailHeight: remoteChainTipHeight, + LastRemoteCommitSecret: lastCommitSecret, + LocalUnrevokedCommitPoint: input.ComputeCommitmentPoint( + currentCommitSecret[:], + ), + }, nil +} + // isBorked returns true if the channel has been marked as borked in the // database. This requires an existing database transaction to already be // active. @@ -837,15 +883,63 @@ func (c *OpenChannel) isBorked(chanBucket *bbolt.Bucket) (bool, error) { // MarkCommitmentBroadcasted marks the channel as a commitment transaction has // been broadcast, either our own or the remote, and we should watch the chain -// for it to confirm before taking any further action. -func (c *OpenChannel) MarkCommitmentBroadcasted() error { +// for it to confirm before taking any further action. It takes as argument the +// closing tx _we believe_ will appear in the chain. This is only used to +// republish this tx at startup to ensure propagation, and we should still +// handle the case where a different tx actually hits the chain. +func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx) error { c.Lock() defer c.Unlock() - return c.putChanStatus(ChanStatusCommitBroadcasted) + var b bytes.Buffer + if err := WriteElement(&b, closeTx); err != nil { + return err + } + + putClosingTx := func(chanBucket *bbolt.Bucket) error { + return chanBucket.Put(closingTxKey, b.Bytes()) + } + + return c.putChanStatus(ChanStatusCommitBroadcasted, putClosingTx) } -func (c *OpenChannel) putChanStatus(status ChannelStatus) error { +// BroadcastedCommitment retrieves the stored closing tx set during +// MarkCommitmentBroadcasted. If not found ErrNoCloseTx is returned. +func (c *OpenChannel) BroadcastedCommitment() (*wire.MsgTx, error) { + var closeTx *wire.MsgTx + + err := c.Db.View(func(tx *bbolt.Tx) error { + chanBucket, err := fetchChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) + switch err { + case nil: + case ErrNoChanDBExists, ErrNoActiveChannels, ErrChannelNotFound: + return ErrNoCloseTx + default: + return err + } + + bs := chanBucket.Get(closingTxKey) + if bs == nil { + return ErrNoCloseTx + } + r := bytes.NewReader(bs) + return ReadElement(r, &closeTx) + }) + if err != nil { + return nil, err + } + + return closeTx, nil +} + +// putChanStatus appends the given status to the channel. fs is an optional +// list of closures that are given the chanBucket in order to atomically add +// extra information together with the new status. 
+func (c *OpenChannel) putChanStatus(status ChannelStatus, + fs ...func(*bbolt.Bucket) error) error { + if err := c.Db.Update(func(tx *bbolt.Tx) error { chanBucket, err := fetchChanBucket( tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, @@ -863,7 +957,17 @@ func (c *OpenChannel) putChanStatus(status ChannelStatus) error { status = channel.chanStatus | status channel.chanStatus = status - return putOpenChannel(chanBucket, channel) + if err := putOpenChannel(chanBucket, channel); err != nil { + return err + } + + for _, f := range fs { + if err := f(chanBucket); err != nil { + return err + } + } + + return nil }); err != nil { return err } diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 770f6f35..21bb738c 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -263,7 +263,12 @@ func TestOpenChannelPutGetDelete(t *testing.T) { OnionBlob: []byte("onionblob"), }, } - if err := state.FullSync(); err != nil { + + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + if err := state.SyncPending(addr, 101); err != nil { t.Fatalf("unable to save and serialize channel state: %v", err) } @@ -363,7 +368,12 @@ func TestChannelStateTransition(t *testing.T) { if err != nil { t.Fatalf("unable to create channel state: %v", err) } - if err := channel.FullSync(); err != nil { + + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + if err := channel.SyncPending(addr, 101); err != nil { t.Fatalf("unable to save and serialize channel state: %v", err) } @@ -881,7 +891,13 @@ func TestFetchWaitingCloseChannels(t *testing.T) { // This would happen in the event of a force close and should make the // channels enter a state of waiting close. for _, channel := range channels { - if err := channel.MarkCommitmentBroadcasted(); err != nil { + closeTx := wire.NewMsgTx(2) + closeTx.AddTxIn( + &wire.TxIn{ + PreviousOutPoint: channel.FundingOutpoint, + }, + ) + if err := channel.MarkCommitmentBroadcasted(closeTx); err != nil { t.Fatalf("unable to mark commitment broadcast: %v", err) } } @@ -906,6 +922,19 @@ func TestFetchWaitingCloseChannels(t *testing.T) { t.Fatalf("expected channel %v to be waiting close", channel.FundingOutpoint) } + + // Finally, make sure we can retrieve the closing tx for the + // channel. + closeTx, err := channel.BroadcastedCommitment() + if err != nil { + t.Fatalf("Unable to retrieve commitment: %v", err) + } + + if closeTx.TxIn[0].PreviousOutPoint != channel.FundingOutpoint { + t.Fatalf("expected outpoint %v, got %v", + channel.FundingOutpoint, + closeTx.TxIn[0].PreviousOutPoint) + } } } diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 43b62619..198f6e2b 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -110,7 +110,12 @@ func TestFetchClosedChannelForID(t *testing.T) { for i := uint32(0); i < numChans; i++ { // Save the open channel to disk. 
 		state.FundingOutpoint.Index = i
-		if err := state.FullSync(); err != nil {
+
+		addr := &net.TCPAddr{
+			IP:   net.ParseIP("127.0.0.1"),
+			Port: 18556,
+		}
+		if err := state.SyncPending(addr, 101); err != nil {
 			t.Fatalf("unable to save and serialize channel "+
 				"state: %v", err)
 		}
diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go
index b3cd497d..44035b06 100644
--- a/contractcourt/chain_arbitrator.go
+++ b/contractcourt/chain_arbitrator.go
@@ -412,6 +412,36 @@ func (c *ChainArbitrator) Start() error {
 		}
 
 		c.activeChannels[chanPoint] = channelArb
+
+		// If the channel has had its commitment broadcasted already,
+		// republish it in case it didn't propagate.
+		if !channel.HasChanStatus(
+			channeldb.ChanStatusCommitBroadcasted,
+		) {
+			continue
+		}
+
+		closeTx, err := channel.BroadcastedCommitment()
+		switch {
+
+		// This can happen for channels that had their closing tx
+		// published before we started storing it to disk.
+		case err == channeldb.ErrNoCloseTx:
+			log.Warnf("Channel %v is in state CommitBroadcasted, "+
+				"but no closing tx to re-publish...", chanPoint)
+			continue
+
+		case err != nil:
+			return err
+		}
+
+		log.Infof("Re-publishing closing tx(%v) for channel %v",
+			closeTx.TxHash(), chanPoint)
+		err = c.cfg.PublishTx(closeTx)
+		if err != nil && err != lnwallet.ErrDoubleSpend {
+			log.Warnf("Unable to broadcast close tx(%v): %v",
+				closeTx.TxHash(), err)
+		}
 	}
 
 	// In addition to the channels that we know to be open, we'll also
diff --git a/contractcourt/chain_arbitrator_test.go b/contractcourt/chain_arbitrator_test.go
index 236a3336..4ff3d064 100644
--- a/contractcourt/chain_arbitrator_test.go
+++ b/contractcourt/chain_arbitrator_test.go
@@ -1 +1,117 @@
 package contractcourt
+
+import (
+	"io/ioutil"
+	"net"
+	"os"
+	"testing"
+
+	"github.com/btcsuite/btcd/chaincfg/chainhash"
+	"github.com/btcsuite/btcd/wire"
+	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/lnwallet"
+)
+
+// TestChainArbitratorRepublishCommitment tests that the chain arbitrator will
+// republish closing transactions for channels marked CommitBroadcasted in
+// the database at startup.
+func TestChainArbitratorRepublishCommitment(t *testing.T) {
+	t.Parallel()
+
+	tempPath, err := ioutil.TempDir("", "testdb")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tempPath)
+
+	db, err := channeldb.Open(tempPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer db.Close()
+
+	// Create 10 test channels and sync them to the database.
+	const numChans = 10
+	var channels []*channeldb.OpenChannel
+	for i := 0; i < numChans; i++ {
+		lChannel, _, cleanup, err := lnwallet.CreateTestChannels()
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer cleanup()
+
+		channel := lChannel.State()
+
+		// We manually set the db here to make sure all channels are
+		// synced to the same db.
+		channel.Db = db
+
+		addr := &net.TCPAddr{
+			IP:   net.ParseIP("127.0.0.1"),
+			Port: 18556,
+		}
+		if err := channel.SyncPending(addr, 101); err != nil {
+			t.Fatal(err)
+		}
+
+		channels = append(channels, channel)
+	}
+
+	// Mark half of the channels as commitment broadcasted.
+	for i := 0; i < numChans/2; i++ {
+		closeTx := channels[i].FundingTxn.Copy()
+		closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint
+		err := channels[i].MarkCommitmentBroadcasted(closeTx)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// We keep track of the transactions published by the ChainArbitrator
+	// at startup.
+ published := make(map[chainhash.Hash]struct{}) + + chainArbCfg := ChainArbitratorConfig{ + ChainIO: &mockChainIO{}, + Notifier: &mockNotifier{}, + PublishTx: func(tx *wire.MsgTx) error { + published[tx.TxHash()] = struct{}{} + return nil + }, + } + chainArb := NewChainArbitrator( + chainArbCfg, db, + ) + + if err := chainArb.Start(); err != nil { + t.Fatal(err) + } + defer func() { + if err := chainArb.Stop(); err != nil { + t.Fatal(err) + } + }() + + // Half of the channels should have had their closing tx re-published. + if len(published) != numChans/2 { + t.Fatalf("expected %d re-published transactions, got %d", + numChans/2, len(published)) + } + + // And make sure the published transactions are correct, and unique. + for i := 0; i < numChans/2; i++ { + closeTx := channels[i].FundingTxn.Copy() + closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint + + _, ok := published[closeTx.TxHash()] + if !ok { + t.Fatalf("closing tx not re-published") + } + + delete(published, closeTx.TxHash()) + } + + if len(published) != 0 { + t.Fatalf("unexpected tx published") + } +} diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index 8b17e75a..3d13c922 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -731,10 +731,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet } // Attempt to add a channel sync message to the close summary. - chanSync, err := lnwallet.ChanSyncMsg( - c.cfg.chanState, - c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), - ) + chanSync, err := c.cfg.chanState.ChanSyncMsg() if err != nil { log.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", c.cfg.chanState.FundingOutpoint, err) @@ -811,10 +808,7 @@ func (c *chainWatcher) dispatchLocalForceClose( } // Attempt to add a channel sync message to the close summary. - chanSync, err := lnwallet.ChanSyncMsg( - c.cfg.chanState, - c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), - ) + chanSync, err := c.cfg.chanState.ChanSyncMsg() if err != nil { log.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", c.cfg.chanState.FundingOutpoint, err) @@ -998,10 +992,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail } // Attempt to add a channel sync message to the close summary. - chanSync, err := lnwallet.ChanSyncMsg( - c.cfg.chanState, - c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), - ) + chanSync, err := c.cfg.chanState.ChanSyncMsg() if err != nil { log.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", c.cfg.chanState.FundingOutpoint, err) diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index fbad1465..42593537 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -97,7 +97,7 @@ type ChannelArbitratorConfig struct { // MarkCommitmentBroadcasted should mark the channel as the commitment // being broadcast, and we are waiting for the commitment to confirm. - MarkCommitmentBroadcasted func() error + MarkCommitmentBroadcasted func(*wire.MsgTx) error // MarkChannelClosed marks the channel closed in the database, with the // passed close summary. After this method successfully returns we can @@ -821,6 +821,16 @@ func (c *ChannelArbitrator) stateStep( } closeTx = closeSummary.CloseTx + // Before publishing the transaction, we store it to the + // database, such that we can re-publish later in case it + // didn't propagate. 
+		if err := c.cfg.MarkCommitmentBroadcasted(closeTx); err != nil {
+			log.Errorf("ChannelArbitrator(%v): unable to "+
+				"mark commitment broadcasted: %v",
+				c.cfg.ChanPoint, err)
+			return StateError, closeTx, err
+		}
+
 		// With the close transaction in hand, broadcast the
 		// transaction to the network, thereby entering the post
 		// channel resolution state.
@@ -840,12 +850,6 @@ func (c *ChannelArbitrator) stateStep(
 			}
 		}
 
-		if err := c.cfg.MarkCommitmentBroadcasted(); err != nil {
-			log.Errorf("ChannelArbitrator(%v): unable to "+
-				"mark commitment broadcasted: %v",
-				c.cfg.ChanPoint, err)
-		}
-
 		// We go to the StateCommitmentBroadcasted state, where we'll
 		// be waiting for the commitment to be confirmed.
 		nextState = StateCommitmentBroadcasted
diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go
index b785b02a..f98fe116 100644
--- a/contractcourt/channel_arbitrator_test.go
+++ b/contractcourt/channel_arbitrator_test.go
@@ -213,7 +213,7 @@ func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator,
 			}
 			return summary, nil
 		},
-		MarkCommitmentBroadcasted: func() error {
+		MarkCommitmentBroadcasted: func(_ *wire.MsgTx) error {
 			return nil
 		},
 		MarkChannelClosed: func(*channeldb.ChannelCloseSummary) error {
diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index c4603d4b..351a7623 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -609,30 +609,12 @@ func (l *channelLink) syncChanStates() error {
 	// side. Based on this message, the remote party will decide if they
 	// need to retransmit any data or not.
 	chanState := l.channel.State()
-	localChanSyncMsg, err := lnwallet.ChanSyncMsg(
-		chanState,
-		chanState.HasChanStatus(channeldb.ChanStatusRestored),
-	)
+	localChanSyncMsg, err := chanState.ChanSyncMsg()
 	if err != nil {
 		return fmt.Errorf("unable to generate chan sync message for "+
 			"ChannelPoint(%v)", l.channel.ChannelPoint())
 	}
 
-	// If we have a restored channel, we'll delay sending our channel
-	// reestablish message briefly to ensure we first have a stable
-	// connection. Sending the message will cause the remote peer to force
-	// close the channel, which currently may not be resumed reliably if the
-	// connection is being torn down simultaneously. This delay can be
-	// removed after the force close is reliable, but in the meantime it
-	// improves the reliability of successfully closing out the channel.
-	if chanState.HasChanStatus(channeldb.ChanStatusRestored) {
-		select {
-		case <-time.After(5 * time.Second):
-		case <-l.quit:
-			return ErrLinkShuttingDown
-		}
-	}
-
 	if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
 		return fmt.Errorf("Unable to send chan sync message for "+
 			"ChannelPoint(%v)", l.channel.ChannelPoint())
@@ -896,6 +878,11 @@ func (l *channelLink) htlcManager() {
 	if l.cfg.SyncStates {
 		err := l.syncChanStates()
 		if err != nil {
+			log.Warnf("Error when syncing channel states: %v", err)
+
+			errDataLoss, localDataLoss :=
+				err.(*lnwallet.ErrCommitSyncLocalDataLoss)
+
 			switch {
 			case err == ErrLinkShuttingDown:
 				log.Debugf("unable to sync channel states, " +
@@ -918,6 +905,12 @@ func (l *channelLink) htlcManager() {
 			// what they sent us before.
 			// TODO(halseth): ban peer?
 			case err == lnwallet.ErrInvalidLocalUnrevokedCommitPoint:
+				// We'll fail the link and tell the peer to
+				// force close the channel. Note that the
+				// database state is not updated here, but
+				// will be updated when the close transaction
+				// is ready, so that we don't go down before
+				// storing the transaction in the db.
l.fail( LinkFailureError{ code: ErrSyncError, @@ -931,13 +924,18 @@ func (l *channelLink) htlcManager() { // We have lost state and cannot safely force close the // channel. Fail the channel and wait for the remote to // hopefully force close it. The remote has sent us its - // latest unrevoked commitment point, that we stored in - // the database, that we can use to retrieve the funds - // when the remote closes the channel. - // TODO(halseth): mark this, such that we prevent - // channel from being force closed by the user or - // contractcourt etc. - case err == lnwallet.ErrCommitSyncLocalDataLoss: + // latest unrevoked commitment point, and we'll store + // it in the database, such that we can attempt to + // recover the funds if the remote force closes the + // channel. + case localDataLoss: + err := l.channel.MarkDataLoss( + errDataLoss.CommitPoint, + ) + if err != nil { + log.Errorf("Unable to mark channel "+ + "data loss: %v", err) + } // We determined the commit chains were not possible to // sync. We cautiously fail the channel, but don't @@ -945,6 +943,10 @@ func (l *channelLink) htlcManager() { // TODO(halseth): can we safely force close in any // cases where this error is returned? case err == lnwallet.ErrCannotSyncCommitChains: + if err := l.channel.MarkBorked(); err != nil { + log.Errorf("Unable to mark channel "+ + "borked: %v", err) + } // Other, unspecified error. default: diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index dfd4be60..6c26b2a0 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -8121,7 +8121,9 @@ func assertDLPExecuted(net *lntest.NetworkHarness, t *harnessTest, assertTxInBlock(t, block, forceClose) // Dave should sweep his funds immediately, as they are not timelocked. - daveSweep, err := waitForTxInMempool(net.Miner.Node, minerMempoolTimeout) + daveSweep, err := waitForTxInMempool( + net.Miner.Node, minerMempoolTimeout, + ) if err != nil { t.Fatalf("unable to find Dave's sweep tx in mempool: %v", err) } @@ -13559,6 +13561,27 @@ func testChanRestoreScenario(t *harnessTest, net *lntest.NetworkHarness, t.Fatalf("unable to restore node: %v", err) } + // First ensure that the on-chain balance is restored. + err = wait.NoError(func() error { + ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) + balReq := &lnrpc.WalletBalanceRequest{} + daveBalResp, err := dave.WalletBalance(ctxt, balReq) + if err != nil { + return err + } + + daveBal := daveBalResp.ConfirmedBalance + if daveBal <= 0 { + return fmt.Errorf("expected positive balance, had %v", + daveBal) + } + + return nil + }, defaultTimeout) + if err != nil { + t.Fatalf("On-chain balance not restored: %v", err) + } + // Now that we have our new node up, we expect that it'll re-connect to // Carol automatically based on the restored backup. ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 99f3e640..90871498 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -81,16 +81,6 @@ var ( ErrInvalidLocalUnrevokedCommitPoint = fmt.Errorf("unrevoked commit " + "point is invalid") - // ErrCommitSyncLocalDataLoss is returned in the case that we receive a - // valid commit secret within the ChannelReestablish message from the - // remote node AND they advertise a RemoteCommitTailHeight higher than - // our current known height. This means we have lost some critical - // data, and must fail the channel and MUST NOT force close it. 
Instead
-	// we should wait for the remote to force close it, such that we can
-	// attempt to sweep our funds.
-	ErrCommitSyncLocalDataLoss = fmt.Errorf("possible local commitment " +
-		"state data loss")
-
 	// ErrCommitSyncRemoteDataLoss is returned in the case that we receive
 	// a ChannelReestablish message from the remote that advertises a
 	// NextLocalCommitHeight that is lower than what they have already
@@ -101,6 +91,30 @@ var (
 		"state data loss")
 )
 
+// ErrCommitSyncLocalDataLoss is returned in the case that we receive a valid
+// commit secret within the ChannelReestablish message from the remote node AND
+// they advertise a RemoteCommitTailHeight higher than our current known
+// height. This means we have lost some critical data, and must fail the
+// channel and MUST NOT force close it. Instead we should wait for the remote
+// to force close it, such that we can attempt to sweep our funds. The
+// commitment point needed to sweep the remote's force close is encapsulated.
+type ErrCommitSyncLocalDataLoss struct {
+	// ChannelPoint is the identifier for the channel that experienced data
+	// loss.
+	ChannelPoint wire.OutPoint
+
+	// CommitPoint is the last unrevoked commit point, sent to us by the
+	// remote when we determined we had lost state.
+	CommitPoint *btcec.PublicKey
+}
+
+// Error returns a string representation of the local data loss error.
+func (e *ErrCommitSyncLocalDataLoss) Error() string {
+	return fmt.Sprintf("ChannelPoint(%v) with CommitPoint(%x) had "+
+		"possible local commitment state data loss", e.ChannelPoint,
+		e.CommitPoint.SerializeCompressed())
+}
+
 // channelState is an enum like type which represents the current state of a
 // particular channel.
 // TODO(roasbeef): actually update state
@@ -3293,10 +3307,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
 		// doesn't support data loss protection. In either case
 		// it is not safe for us to keep using the channel, so
 		// we mark it borked and fail the channel.
-		if err := lc.channelState.MarkBorked(); err != nil {
-			return nil, nil, nil, err
-		}
-
 		walletLog.Errorf("ChannelPoint(%v), sync failed: "+
 			"local data loss, but no recovery option.",
 			lc.channelState.FundingOutpoint)
@@ -3304,16 +3314,11 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
 		}
 
 		// In this case, we've likely lost data and shouldn't proceed
-		// with channel updates. So we'll store the commit point we
-		// were given in the database, such that we can attempt to
-		// recover the funds if the remote force closes the channel.
-		err := lc.channelState.MarkDataLoss(
-			msg.LocalUnrevokedCommitPoint,
-		)
-		if err != nil {
-			return nil, nil, nil, err
+		// with channel updates.
+		return nil, nil, nil, &ErrCommitSyncLocalDataLoss{
+			ChannelPoint: lc.channelState.FundingOutpoint,
+			CommitPoint:  msg.LocalUnrevokedCommitPoint,
 		}
-		return nil, nil, nil, ErrCommitSyncLocalDataLoss
 
 	// If the height of our commitment chain reported by the remote party
 	// is behind our view of the chain, then they probably lost some state,
@@ -3323,10 +3328,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
 			"believes our tail height is %v, while we have %v!",
 			lc.channelState.FundingOutpoint,
 			msg.RemoteCommitTailHeight, localTailHeight)
-
-		if err := lc.channelState.MarkBorked(); err != nil {
-			return nil, nil, nil, err
-		}
 		return nil, nil, nil, ErrCommitSyncRemoteDataLoss
 
 	// Their view of our commit chain is consistent with our view.
@@ -3390,10 +3391,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( "believes our tail height is %v, while we have %v!", lc.channelState.FundingOutpoint, msg.RemoteCommitTailHeight, localTailHeight) - - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCannotSyncCommitChains } @@ -3412,9 +3409,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( lc.channelState.FundingOutpoint, msg.NextLocalCommitHeight, remoteTipHeight) - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCannotSyncCommitChains // They are waiting for a state they have already ACKed. @@ -3426,9 +3420,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( // They previously ACKed our current tail, and now they are // waiting for it. They probably lost state. - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCommitSyncRemoteDataLoss // They have received our latest commitment, life is good. @@ -3474,10 +3465,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( "next commit height is %v, while we believe it is %v!", lc.channelState.FundingOutpoint, msg.NextLocalCommitHeight, remoteTipHeight) - - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCannotSyncCommitChains } @@ -3515,97 +3502,12 @@ func (lc *LightningChannel) ProcessChanSyncMsg( "sent invalid commit point for height %v!", lc.channelState.FundingOutpoint, msg.NextLocalCommitHeight) - - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } - - // TODO(halseth): force close? return nil, nil, nil, ErrInvalidLocalUnrevokedCommitPoint } return updates, openedCircuits, closedCircuits, nil } -// ChanSyncMsg returns the ChannelReestablish message that should be sent upon -// reconnection with the remote peer that we're maintaining this channel with. -// The information contained within this message is necessary to re-sync our -// commitment chains in the case of a last or only partially processed message. -// When the remote party receiver this message one of three things may happen: -// -// 1. We're fully synced and no messages need to be sent. -// 2. We didn't get the last CommitSig message they sent, to they'll re-send -// it. -// 3. We didn't get the last RevokeAndAck message they sent, so they'll -// re-send it. -// -// The isRestoredChan bool indicates if we need to craft a chan sync message -// for a channel that's been restored. If this is a restored channel, then -// we'll modify our typical chan sync message to ensure they force close even -// if we're on the very first state. -func ChanSyncMsg(c *channeldb.OpenChannel, - isRestoredChan bool) (*lnwire.ChannelReestablish, error) { - - c.Lock() - defer c.Unlock() - - // The remote commitment height that we'll send in the - // ChannelReestablish message is our current commitment height plus - // one. If the receiver thinks that our commitment height is actually - // *equal* to this value, then they'll re-send the last commitment that - // they sent but we never fully processed. - localHeight := c.LocalCommitment.CommitHeight - nextLocalCommitHeight := localHeight + 1 - - // The second value we'll send is the height of the remote commitment - // from our PoV. If the receiver thinks that their height is actually - // *one plus* this value, then they'll re-send their last revocation. 
-	remoteChainTipHeight := c.RemoteCommitment.CommitHeight
-
-	// If this channel has undergone a commitment update, then in order to
-	// prove to the remote party our knowledge of their prior commitment
-	// state, we'll also send over the last commitment secret that the
-	// remote party sent.
-	var lastCommitSecret [32]byte
-	if remoteChainTipHeight != 0 {
-		remoteSecret, err := c.RevocationStore.LookUp(
-			remoteChainTipHeight - 1,
-		)
-		if err != nil {
-			return nil, err
-		}
-		lastCommitSecret = [32]byte(*remoteSecret)
-	}
-
-	// Additionally, we'll send over the current unrevoked commitment on
-	// our local commitment transaction.
-	currentCommitSecret, err := c.RevocationProducer.AtIndex(
-		localHeight,
-	)
-	if err != nil {
-		return nil, err
-	}
-
-	// If we've restored this channel, then we'll purposefully give them an
-	// invalid LocalUnrevokedCommitPoint so they'll force close the channel
-	// allowing us to sweep our funds.
-	if isRestoredChan {
-		currentCommitSecret[0] ^= 1
-	}
-
-	return &lnwire.ChannelReestablish{
-		ChanID: lnwire.NewChanIDFromOutPoint(
-			&c.FundingOutpoint,
-		),
-		NextLocalCommitHeight:  nextLocalCommitHeight,
-		RemoteCommitTailHeight: remoteChainTipHeight,
-		LastRemoteCommitSecret: lastCommitSecret,
-		LocalUnrevokedCommitPoint: input.ComputeCommitmentPoint(
-			currentCommitSecret[:],
-		),
-	}, nil
-}
-
 // computeView takes the given htlcView, and calculates the balances, filtered
 // view (settling unsettled HTLCs), commitment weight and feePerKw, after
 // applying the HTLCs to the latest commitment. The returned balances are the
@@ -5187,10 +5089,7 @@ func NewUnilateralCloseSummary(chanState *channeldb.OpenChannel, signer input.Si
 	}
 
 	// Attempt to add a channel sync message to the close summary.
-	chanSync, err := ChanSyncMsg(
-		chanState,
-		chanState.HasChanStatus(channeldb.ChanStatusRestored),
-	)
+	chanSync, err := chanState.ChanSyncMsg()
 	if err != nil {
 		walletLog.Errorf("ChannelPoint(%v): unable to create channel sync "+
 			"message: %v", chanState.FundingOutpoint, err)
@@ -6336,14 +6235,34 @@ func (lc *LightningChannel) State() *channeldb.OpenChannel {
 	return lc.channelState
 }
 
-// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
-// been broadcast, either our own or the remote, and we should watch the chain
-// for it to confirm before taking any further action.
-func (lc *LightningChannel) MarkCommitmentBroadcasted() error {
+// MarkBorked marks the event when the channel has reached an irreconcilable
+// state, such as a channel breach or state desynchronization. Borked channels
+// should never be added to the switch.
+func (lc *LightningChannel) MarkBorked() error {
 	lc.Lock()
 	defer lc.Unlock()
 
-	return lc.channelState.MarkCommitmentBroadcasted()
+	return lc.channelState.MarkBorked()
+}
+
+// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
+// been broadcast, either our own or the remote, and we should watch the chain
+// for it to confirm before taking any further action.
+func (lc *LightningChannel) MarkCommitmentBroadcasted(tx *wire.MsgTx) error {
+	lc.Lock()
+	defer lc.Unlock()
+
+	return lc.channelState.MarkCommitmentBroadcasted(tx)
+}
+
+// MarkDataLoss sets the channel status to LocalDataLoss and stores the passed
+// commitPoint, which is used to retrieve funds in case the remote force
+// closes the channel.
+func (lc *LightningChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error { + lc.Lock() + defer lc.Unlock() + + return lc.channelState.MarkDataLoss(commitPoint) } // ActiveHtlcs returns a slice of HTLC's which are currently active on *both* diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 69702091..ca686d34 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -2530,7 +2530,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel, _, _, line, _ := runtime.Caller(1) - aliceChanSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceChanSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("line #%v: unable to produce chan sync msg: %v", line, err) @@ -2545,7 +2545,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel, "instead wants to send: %v", line, spew.Sdump(bobMsgsToSend)) } - bobChanSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobChanSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("line #%v: unable to produce chan sync msg: %v", line, err) @@ -2778,11 +2778,11 @@ func TestChanSyncOweCommitment(t *testing.T) { // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, // while Bob should think he's properly synchronized. - aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3092,11 +3092,11 @@ func TestChanSyncOweRevocation(t *testing.T) { // If we fetch the channel sync messages at this state, then Alice // should report that she owes Bob a revocation message, while Bob // thinks they're fully in sync. - aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3261,11 +3261,11 @@ func TestChanSyncOweRevocationAndCommit(t *testing.T) { // If we now attempt to resync, then Alice should conclude that she // doesn't need any further updates, while Bob concludes that he needs // to re-send both his revocation and commit sig message. - aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3472,11 +3472,11 @@ func TestChanSyncOweRevocationAndCommitForceTransition(t *testing.T) { // Now if we attempt to synchronize states at this point, Alice should // detect that she owes nothing, while Bob should re-send both his // RevokeAndAck as well as his commitment message. 
- aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3677,18 +3677,18 @@ func TestChanSyncFailure(t *testing.T) { assertLocalDataLoss := func(aliceOld *LightningChannel) { t.Helper() - aliceSyncMsg, err := ChanSyncMsg(aliceOld.channelState, false) + aliceSyncMsg, err := aliceOld.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } // Alice should detect from Bob's message that she lost state. _, _, _, err = aliceOld.ProcessChanSyncMsg(bobSyncMsg) - if err != ErrCommitSyncLocalDataLoss { + if _, ok := err.(*ErrCommitSyncLocalDataLoss); !ok { t.Fatalf("wrong error, expected "+ "ErrCommitSyncLocalDataLoss instead got: %v", err) @@ -3755,7 +3755,7 @@ func TestChanSyncFailure(t *testing.T) { // If we remove the recovery options from Bob's message, Alice cannot // tell if she lost state, since Bob might be lying. She still should // be able to detect that chains cannot be synced. - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3769,7 +3769,7 @@ func TestChanSyncFailure(t *testing.T) { // If Bob lies about the NextLocalCommitHeight, making it greater than // what Alice expect, she cannot tell for sure whether she lost state, // but should detect the desync. - bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3782,7 +3782,7 @@ func TestChanSyncFailure(t *testing.T) { // If Bob's NextLocalCommitHeight is lower than what Alice expects, Bob // probably lost state. - bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3795,7 +3795,7 @@ func TestChanSyncFailure(t *testing.T) { // If Alice and Bob's states are in sync, but Bob is sending the wrong // LocalUnrevokedCommitPoint, Alice should detect this. - bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3824,7 +3824,7 @@ func TestChanSyncFailure(t *testing.T) { // when there's a pending remote commit. halfAdvance() - bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3912,11 +3912,11 @@ func TestChannelRetransmissionFeeUpdate(t *testing.T) { // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, // while Bob should think he's properly synchronized. 
- aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -4361,11 +4361,11 @@ func TestChanSyncInvalidLastSecret(t *testing.T) { } // Next, we'll produce the ChanSync messages for both parties. - aliceChanSync, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceChanSync, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to generate chan sync msg: %v", err) } - bobChanSync, err := ChanSyncMsg(bobChannel.channelState, false) + bobChanSync, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to generate chan sync msg: %v", err) } @@ -4377,7 +4377,7 @@ func TestChanSyncInvalidLastSecret(t *testing.T) { // Alice's former self should conclude that she possibly lost data as // Bob is sending a valid commit secret for the latest state. _, _, _, err = aliceOld.ProcessChanSyncMsg(bobChanSync) - if err != ErrCommitSyncLocalDataLoss { + if _, ok := err.(*ErrCommitSyncLocalDataLoss); !ok { t.Fatalf("wrong error, expected ErrCommitSyncLocalDataLoss "+ "instead got: %v", err) } diff --git a/lnwallet/test_utils.go b/lnwallet/test_utils.go index 4dee6ccb..32c6db95 100644 --- a/lnwallet/test_utils.go +++ b/lnwallet/test_utils.go @@ -7,6 +7,8 @@ import ( "encoding/hex" "io" "io/ioutil" + prand "math/rand" + "net" "os" "github.com/btcsuite/btcd/btcec" @@ -101,7 +103,7 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error) prevOut := &wire.OutPoint{ Hash: chainhash.Hash(testHdSeed), - Index: 0, + Index: prand.Uint32(), } fundingTxIn := wire.NewTxIn(prevOut, nil, nil) @@ -334,10 +336,20 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error) return nil, nil, nil, err } - if err := channelAlice.channelState.FullSync(); err != nil { + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + if err := channelAlice.channelState.SyncPending(addr, 101); err != nil { return nil, nil, nil, err } - if err := channelBob.channelState.FullSync(); err != nil { + + addr = &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + } + + if err := channelBob.channelState.SyncPending(addr, 101); err != nil { return nil, nil, nil, err } diff --git a/peer.go b/peer.go index 3ca6b672..7f2d9155 100644 --- a/peer.go +++ b/peer.go @@ -355,7 +355,8 @@ func (p *peer) Start() error { peerLog.Debugf("Loaded %v active channels from database with "+ "NodeKey(%x)", len(activeChans), p.PubKey()) - if err := p.loadActiveChannels(activeChans); err != nil { + msgs, err := p.loadActiveChannels(activeChans) + if err != nil { return fmt.Errorf("unable to load channels: %v", err) } @@ -368,6 +369,17 @@ func (p *peer) Start() error { go p.channelManager() go p.pingHandler() + // Now that the peer has started up, we send any channel sync messages + // that must be resent for borked channels. 
+	if len(msgs) > 0 {
+		peerLog.Infof("Sending %d channel sync messages to peer after "+
+			"loading active channels", len(msgs))
+		if err := p.SendMessage(true, msgs...); err != nil {
+			peerLog.Warnf("Failed sending channel sync "+
+				"messages to peer %v: %v", p, err)
+		}
+	}
+
 	return nil
 }
 
@@ -406,14 +418,22 @@ func (p *peer) QuitSignal() <-chan struct{} {
 }
 
 // loadActiveChannels creates indexes within the peer for tracking all active
-// channels returned by the database.
-func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
+// channels returned by the database. It returns a slice of channel reestablish
+// messages that should be sent to the peer immediately, in case we have borked
+// channels that haven't been closed yet.
+func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
+	[]lnwire.Message, error) {
+
+	// Return a slice of messages to send to the peer in case the channel
+	// cannot be loaded normally.
+	var msgs []lnwire.Message
+
 	for _, dbChan := range chans {
 		lnChan, err := lnwallet.NewLightningChannel(
 			p.server.cc.signer, dbChan, p.server.sigPool,
 		)
 		if err != nil {
-			return err
+			return nil, err
 		}
 
 		chanPoint := &dbChan.FundingOutpoint
@@ -433,6 +453,22 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 		case dbChan.HasChanStatus(channeldb.ChanStatusLocalDataLoss):
 			peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+
 				"start.", chanPoint, dbChan.ChanStatus())
+
+			// To help our peer recover from a potential data loss,
+			// we resend our channel reestablish message if the
+			// channel is in a borked state. We won't process any
+			// channel reestablish message sent from the peer, but
+			// that's okay since the assumption is that we did when
+			// marking the channel borked.
+			chanSync, err := dbChan.ChanSyncMsg()
+			if err != nil {
+				peerLog.Errorf("Unable to create channel "+
+					"reestablish message for channel %v: "+
+					"%v", chanPoint, err)
+				continue
+			}
+
+			msgs = append(msgs, chanSync)
 			continue
 		}
 
@@ -446,7 +482,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 		_, currentHeight, err := p.server.cc.chainIO.GetBestBlock()
 		if err != nil {
-			return err
+			return nil, err
 		}
 
 		// Before we register this new link with the HTLC Switch, we'll
@@ -455,7 +491,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 		graph := p.server.chanDB.ChannelGraph()
 		info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
 		if err != nil && err != channeldb.ErrEdgeNotFound {
-			return err
+			return nil, err
 		}
 
 		// We'll filter out our policy from the directional channel
@@ -503,7 +539,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 			*chanPoint,
 		)
 		if err != nil {
-			return err
+			return nil, err
 		}
 
 		// Create the link and add it to the switch.
@@ -512,8 +548,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 			currentHeight, true,
 		)
 		if err != nil {
-			return fmt.Errorf("unable to add link %v to switch: %v",
-				chanPoint, err)
+			return nil, fmt.Errorf("unable to add link %v to "+
+				"switch: %v", chanPoint, err)
 		}
 
 		p.activeChanMtx.Lock()
@@ -521,7 +557,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 		p.activeChanMtx.Unlock()
 	}
 
-	return nil
+	return msgs, nil
 }
 
 // addLink creates and adds a new link from the specified channel.
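
To illustrate the new error flow end to end: ProcessChanSyncMsg now returns a typed *ErrCommitSyncLocalDataLoss value instead of a sentinel error, so the remote's unrevoked commit point travels with the error and the caller decides when to persist it. A minimal, hypothetical sketch of a caller follows, mirroring the htlcswitch/link.go hunk above; handleChanSync, channel and remoteMsg are placeholders, and the standard library log package stands in for lnd's loggers:

package main

import (
	"log"

	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwire"
)

// handleChanSync processes a remote ChannelReestablish message and reacts to
// a detected local data loss the same way the link does after this change.
func handleChanSync(channel *lnwallet.LightningChannel,
	remoteMsg *lnwire.ChannelReestablish) {

	_, _, _, err := channel.ProcessChanSyncMsg(remoteMsg)
	if errDataLoss, ok := err.(*lnwallet.ErrCommitSyncLocalDataLoss); ok {
		// We lost state: persist the remote's unrevoked commit point
		// so the funds can be swept once the remote force closes, and
		// fail the channel without force closing it ourselves.
		if dbErr := channel.MarkDataLoss(errDataLoss.CommitPoint); dbErr != nil {
			log.Printf("unable to mark channel data loss: %v", dbErr)
		}
	}
}

Keeping MarkDataLoss out of ProcessChanSyncMsg, and likewise deferring MarkCommitmentBroadcasted until the closing tx actually exists, means the database is only mutated once the caller holds everything needed for recovery, which is the thrust of this change.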