From 0122dda88a96f558c04dc747208a25d1689ea9c1 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:38 +0200 Subject: [PATCH 01/17] channeldb/channel: remove unused FullSync method The exported FullSync method is only used by test code, so we remove it and instead use SyncPending. --- channeldb/channel.go | 15 ++------------- channeldb/channel_test.go | 14 ++++++++++++-- channeldb/db_test.go | 7 ++++++- lnwallet/test_utils.go | 15 +++++++++++++-- 4 files changed, 33 insertions(+), 18 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 2b1ec949..b11e09e8 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -514,16 +514,6 @@ type OpenChannel struct { sync.RWMutex } -// FullSync serializes, and writes to disk the *full* channel state, using -// both the active channel bucket to store the prefixed column fields, and the -// remote node's ID to store the remainder of the channel state. -func (c *OpenChannel) FullSync() error { - c.Lock() - defer c.Unlock() - - return c.Db.Update(c.fullSync) -} - // ShortChanID returns the current ShortChannelID of this channel. func (c *OpenChannel) ShortChanID() lnwire.ShortChannelID { c.RLock() @@ -648,9 +638,8 @@ func fetchChanBucket(tx *bbolt.Tx, nodeKey *btcec.PublicKey, return chanBucket, nil } -// fullSync is an internal version of the FullSync method which allows callers -// to sync the contents of an OpenChannel while re-using an existing database -// transaction. +// fullSync syncs the contents of an OpenChannel while re-using an existing +// database transaction. func (c *OpenChannel) fullSync(tx *bbolt.Tx) error { // First fetch the top level bucket which stores all data related to // current, active channels. diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 770f6f35..46ac311c 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -263,7 +263,12 @@ func TestOpenChannelPutGetDelete(t *testing.T) { OnionBlob: []byte("onionblob"), }, } - if err := state.FullSync(); err != nil { + + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + if err := state.SyncPending(addr, 101); err != nil { t.Fatalf("unable to save and serialize channel state: %v", err) } @@ -363,7 +368,12 @@ func TestChannelStateTransition(t *testing.T) { if err != nil { t.Fatalf("unable to create channel state: %v", err) } - if err := channel.FullSync(); err != nil { + + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + if err := channel.SyncPending(addr, 101); err != nil { t.Fatalf("unable to save and serialize channel state: %v", err) } diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 43b62619..198f6e2b 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -110,7 +110,12 @@ func TestFetchClosedChannelForID(t *testing.T) { for i := uint32(0); i < numChans; i++ { // Save the open channel to disk. state.FundingOutpoint.Index = i - if err := state.FullSync(); err != nil { + + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + if err := state.SyncPending(addr, 101); err != nil { t.Fatalf("unable to save and serialize channel "+ "state: %v", err) } diff --git a/lnwallet/test_utils.go b/lnwallet/test_utils.go index 4dee6ccb..37f13c55 100644 --- a/lnwallet/test_utils.go +++ b/lnwallet/test_utils.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "io" "io/ioutil" + "net" "os" "github.com/btcsuite/btcd/btcec" @@ -334,10 +335,20 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error) return nil, nil, nil, err } - if err := channelAlice.channelState.FullSync(); err != nil { + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + if err := channelAlice.channelState.SyncPending(addr, 101); err != nil { return nil, nil, nil, err } - if err := channelBob.channelState.FullSync(); err != nil { + + addr = &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + } + + if err := channelBob.channelState.SyncPending(addr, 101); err != nil { return nil, nil, nil, err } From eb1b84c0b44edbd03f0e7f46f5e78a81922f45d7 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:38 +0200 Subject: [PATCH 02/17] channeldb+lnwallet: make ChanSyncMsg method on OpenChannel --- channeldb/channel.go | 80 +++++++++++++++++++++++++++++++++ contractcourt/chain_watcher.go | 9 ++-- htlcswitch/link.go | 3 +- lnwallet/channel.go | 82 +--------------------------------- lnwallet/channel_test.go | 42 ++++++++--------- 5 files changed, 106 insertions(+), 110 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index b11e09e8..a16fa1c1 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -16,6 +16,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" @@ -810,6 +811,85 @@ func (c *OpenChannel) MarkBorked() error { return c.putChanStatus(ChanStatusBorked) } +// ChanSyncMsg returns the ChannelReestablish message that should be sent upon +// reconnection with the remote peer that we're maintaining this channel with. +// The information contained within this message is necessary to re-sync our +// commitment chains in the case of a last or only partially processed message. +// When the remote party receiver this message one of three things may happen: +// +// 1. We're fully synced and no messages need to be sent. +// 2. We didn't get the last CommitSig message they sent, to they'll re-send +// it. +// 3. We didn't get the last RevokeAndAck message they sent, so they'll +// re-send it. +// +// The isRestoredChan bool indicates if we need to craft a chan sync message +// for a channel that's been restored. If this is a restored channel, then +// we'll modify our typical chan sync message to ensure they force close even +// if we're on the very first state. +func (c *OpenChannel) ChanSyncMsg( + isRestoredChan bool) (*lnwire.ChannelReestablish, error) { + + c.Lock() + defer c.Unlock() + + // The remote commitment height that we'll send in the + // ChannelReestablish message is our current commitment height plus + // one. If the receiver thinks that our commitment height is actually + // *equal* to this value, then they'll re-send the last commitment that + // they sent but we never fully processed. + localHeight := c.LocalCommitment.CommitHeight + nextLocalCommitHeight := localHeight + 1 + + // The second value we'll send is the height of the remote commitment + // from our PoV. If the receiver thinks that their height is actually + // *one plus* this value, then they'll re-send their last revocation. + remoteChainTipHeight := c.RemoteCommitment.CommitHeight + + // If this channel has undergone a commitment update, then in order to + // prove to the remote party our knowledge of their prior commitment + // state, we'll also send over the last commitment secret that the + // remote party sent. + var lastCommitSecret [32]byte + if remoteChainTipHeight != 0 { + remoteSecret, err := c.RevocationStore.LookUp( + remoteChainTipHeight - 1, + ) + if err != nil { + return nil, err + } + lastCommitSecret = [32]byte(*remoteSecret) + } + + // Additionally, we'll send over the current unrevoked commitment on + // our local commitment transaction. + currentCommitSecret, err := c.RevocationProducer.AtIndex( + localHeight, + ) + if err != nil { + return nil, err + } + + // If we've restored this channel, then we'll purposefully give them an + // invalid LocalUnrevokedCommitPoint so they'll force close the channel + // allowing us to sweep our funds. + if isRestoredChan { + currentCommitSecret[0] ^= 1 + } + + return &lnwire.ChannelReestablish{ + ChanID: lnwire.NewChanIDFromOutPoint( + &c.FundingOutpoint, + ), + NextLocalCommitHeight: nextLocalCommitHeight, + RemoteCommitTailHeight: remoteChainTipHeight, + LastRemoteCommitSecret: lastCommitSecret, + LocalUnrevokedCommitPoint: input.ComputeCommitmentPoint( + currentCommitSecret[:], + ), + }, nil +} + // isBorked returns true if the channel has been marked as borked in the // database. This requires an existing database transaction to already be // active. diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index 8b17e75a..c7ba2323 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -731,8 +731,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet } // Attempt to add a channel sync message to the close summary. - chanSync, err := lnwallet.ChanSyncMsg( - c.cfg.chanState, + chanSync, err := c.cfg.chanState.ChanSyncMsg( c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), ) if err != nil { @@ -811,8 +810,7 @@ func (c *chainWatcher) dispatchLocalForceClose( } // Attempt to add a channel sync message to the close summary. - chanSync, err := lnwallet.ChanSyncMsg( - c.cfg.chanState, + chanSync, err := c.cfg.chanState.ChanSyncMsg( c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), ) if err != nil { @@ -998,8 +996,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail } // Attempt to add a channel sync message to the close summary. - chanSync, err := lnwallet.ChanSyncMsg( - c.cfg.chanState, + chanSync, err := c.cfg.chanState.ChanSyncMsg( c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), ) if err != nil { diff --git a/htlcswitch/link.go b/htlcswitch/link.go index c4603d4b..c7ef3800 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -609,8 +609,7 @@ func (l *channelLink) syncChanStates() error { // side. Based on this message, the remote party will decide if they // need to retransmit any data or not. chanState := l.channel.State() - localChanSyncMsg, err := lnwallet.ChanSyncMsg( - chanState, + localChanSyncMsg, err := chanState.ChanSyncMsg( chanState.HasChanStatus(channeldb.ChanStatusRestored), ) if err != nil { diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 99f3e640..3e6ef764 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -3527,85 +3527,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( return updates, openedCircuits, closedCircuits, nil } -// ChanSyncMsg returns the ChannelReestablish message that should be sent upon -// reconnection with the remote peer that we're maintaining this channel with. -// The information contained within this message is necessary to re-sync our -// commitment chains in the case of a last or only partially processed message. -// When the remote party receiver this message one of three things may happen: -// -// 1. We're fully synced and no messages need to be sent. -// 2. We didn't get the last CommitSig message they sent, to they'll re-send -// it. -// 3. We didn't get the last RevokeAndAck message they sent, so they'll -// re-send it. -// -// The isRestoredChan bool indicates if we need to craft a chan sync message -// for a channel that's been restored. If this is a restored channel, then -// we'll modify our typical chan sync message to ensure they force close even -// if we're on the very first state. -func ChanSyncMsg(c *channeldb.OpenChannel, - isRestoredChan bool) (*lnwire.ChannelReestablish, error) { - - c.Lock() - defer c.Unlock() - - // The remote commitment height that we'll send in the - // ChannelReestablish message is our current commitment height plus - // one. If the receiver thinks that our commitment height is actually - // *equal* to this value, then they'll re-send the last commitment that - // they sent but we never fully processed. - localHeight := c.LocalCommitment.CommitHeight - nextLocalCommitHeight := localHeight + 1 - - // The second value we'll send is the height of the remote commitment - // from our PoV. If the receiver thinks that their height is actually - // *one plus* this value, then they'll re-send their last revocation. - remoteChainTipHeight := c.RemoteCommitment.CommitHeight - - // If this channel has undergone a commitment update, then in order to - // prove to the remote party our knowledge of their prior commitment - // state, we'll also send over the last commitment secret that the - // remote party sent. - var lastCommitSecret [32]byte - if remoteChainTipHeight != 0 { - remoteSecret, err := c.RevocationStore.LookUp( - remoteChainTipHeight - 1, - ) - if err != nil { - return nil, err - } - lastCommitSecret = [32]byte(*remoteSecret) - } - - // Additionally, we'll send over the current unrevoked commitment on - // our local commitment transaction. - currentCommitSecret, err := c.RevocationProducer.AtIndex( - localHeight, - ) - if err != nil { - return nil, err - } - - // If we've restored this channel, then we'll purposefully give them an - // invalid LocalUnrevokedCommitPoint so they'll force close the channel - // allowing us to sweep our funds. - if isRestoredChan { - currentCommitSecret[0] ^= 1 - } - - return &lnwire.ChannelReestablish{ - ChanID: lnwire.NewChanIDFromOutPoint( - &c.FundingOutpoint, - ), - NextLocalCommitHeight: nextLocalCommitHeight, - RemoteCommitTailHeight: remoteChainTipHeight, - LastRemoteCommitSecret: lastCommitSecret, - LocalUnrevokedCommitPoint: input.ComputeCommitmentPoint( - currentCommitSecret[:], - ), - }, nil -} - // computeView takes the given htlcView, and calculates the balances, filtered // view (settling unsettled HTLCs), commitment weight and feePerKw, after // applying the HTLCs to the latest commitment. The returned balances are the @@ -5187,8 +5108,7 @@ func NewUnilateralCloseSummary(chanState *channeldb.OpenChannel, signer input.Si } // Attempt to add a channel sync message to the close summary. - chanSync, err := ChanSyncMsg( - chanState, + chanSync, err := chanState.ChanSyncMsg( chanState.HasChanStatus(channeldb.ChanStatusRestored), ) if err != nil { diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 69702091..81077862 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -2530,7 +2530,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel, _, _, line, _ := runtime.Caller(1) - aliceChanSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceChanSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("line #%v: unable to produce chan sync msg: %v", line, err) @@ -2545,7 +2545,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel, "instead wants to send: %v", line, spew.Sdump(bobMsgsToSend)) } - bobChanSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobChanSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("line #%v: unable to produce chan sync msg: %v", line, err) @@ -2778,11 +2778,11 @@ func TestChanSyncOweCommitment(t *testing.T) { // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, // while Bob should think he's properly synchronized. - aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3092,11 +3092,11 @@ func TestChanSyncOweRevocation(t *testing.T) { // If we fetch the channel sync messages at this state, then Alice // should report that she owes Bob a revocation message, while Bob // thinks they're fully in sync. - aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3261,11 +3261,11 @@ func TestChanSyncOweRevocationAndCommit(t *testing.T) { // If we now attempt to resync, then Alice should conclude that she // doesn't need any further updates, while Bob concludes that he needs // to re-send both his revocation and commit sig message. - aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3472,11 +3472,11 @@ func TestChanSyncOweRevocationAndCommitForceTransition(t *testing.T) { // Now if we attempt to synchronize states at this point, Alice should // detect that she owes nothing, while Bob should re-send both his // RevokeAndAck as well as his commitment message. - aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3677,11 +3677,11 @@ func TestChanSyncFailure(t *testing.T) { assertLocalDataLoss := func(aliceOld *LightningChannel) { t.Helper() - aliceSyncMsg, err := ChanSyncMsg(aliceOld.channelState, false) + aliceSyncMsg, err := aliceOld.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3755,7 +3755,7 @@ func TestChanSyncFailure(t *testing.T) { // If we remove the recovery options from Bob's message, Alice cannot // tell if she lost state, since Bob might be lying. She still should // be able to detect that chains cannot be synced. - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3769,7 +3769,7 @@ func TestChanSyncFailure(t *testing.T) { // If Bob lies about the NextLocalCommitHeight, making it greater than // what Alice expect, she cannot tell for sure whether she lost state, // but should detect the desync. - bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3782,7 +3782,7 @@ func TestChanSyncFailure(t *testing.T) { // If Bob's NextLocalCommitHeight is lower than what Alice expects, Bob // probably lost state. - bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3795,7 +3795,7 @@ func TestChanSyncFailure(t *testing.T) { // If Alice and Bob's states are in sync, but Bob is sending the wrong // LocalUnrevokedCommitPoint, Alice should detect this. - bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3824,7 +3824,7 @@ func TestChanSyncFailure(t *testing.T) { // when there's a pending remote commit. halfAdvance() - bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3912,11 +3912,11 @@ func TestChannelRetransmissionFeeUpdate(t *testing.T) { // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, // while Bob should think he's properly synchronized. - aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -4361,11 +4361,11 @@ func TestChanSyncInvalidLastSecret(t *testing.T) { } // Next, we'll produce the ChanSync messages for both parties. - aliceChanSync, err := ChanSyncMsg(aliceChannel.channelState, false) + aliceChanSync, err := aliceChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to generate chan sync msg: %v", err) } - bobChanSync, err := ChanSyncMsg(bobChannel.channelState, false) + bobChanSync, err := bobChannel.channelState.ChanSyncMsg(false) if err != nil { t.Fatalf("unable to generate chan sync msg: %v", err) } From 1974bfa4cf7ea50ddfa981801c241bb811510103 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:38 +0200 Subject: [PATCH 03/17] peer: send channel reestablish message for borked channels When loading active channels for a connected peer, we gather channel sync messages for all borked channels, and send them to the peer. This should help a peer realize that the state is irreconcible, as we have already realized. --- peer.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/peer.go b/peer.go index 247daab7..33639a9b 100644 --- a/peer.go +++ b/peer.go @@ -349,7 +349,8 @@ func (p *peer) Start() error { peerLog.Debugf("Loaded %v active channels from database with "+ "NodeKey(%x)", len(activeChans), p.PubKey()) - if err := p.loadActiveChannels(activeChans); err != nil { + msgs, err := p.loadActiveChannels(activeChans) + if err != nil { return fmt.Errorf("unable to load channels: %v", err) } @@ -362,6 +363,17 @@ func (p *peer) Start() error { go p.channelManager() go p.pingHandler() + // Now that the peer has started up, we send any channel sync messages + // that must be resent for borked channels. + if len(msgs) > 0 { + peerLog.Infof("Sending %d channel sync messages to peer after "+ + "loading active channels", len(msgs)) + if err := p.SendMessage(true, msgs...); err != nil { + peerLog.Warnf("Failed sending channel sync "+ + "messages to peer %v: %v", p, err) + } + } + return nil } @@ -400,14 +412,22 @@ func (p *peer) QuitSignal() <-chan struct{} { } // loadActiveChannels creates indexes within the peer for tracking all active -// channels returned by the database. -func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { +// channels returned by the database. It returns a slice of channel reestablish +// messages that should be sent to the peer immediately, in case we have borked +// channels that haven't been closed yet. +func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( + []lnwire.Message, error) { + + // Return a slice of messages to send to the peers in case the channel + // cannot be loaded normally. + var msgs []lnwire.Message + for _, dbChan := range chans { lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, dbChan, p.server.sigPool, ) if err != nil { - return err + return nil, err } chanPoint := &dbChan.FundingOutpoint @@ -427,6 +447,22 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { case dbChan.HasChanStatus(channeldb.ChanStatusLocalDataLoss): peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+ "start.", chanPoint, dbChan.ChanStatus()) + + // To help our peer recover from a potential data loss, + // we resend our channel reestablish message if the + // channel is in a borked state. We won't process any + // channel reestablish message sent from the peer, but + // that's okay since the assumption is that we did when + // marking the channel borked. + chanSync, err := dbChan.ChanSyncMsg(false) + if err != nil { + peerLog.Errorf("Unable to create channel "+ + "reestablish message for channel %v: "+ + "%v", chanPoint, err) + continue + } + + msgs = append(msgs, chanSync) continue } @@ -440,7 +476,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { _, currentHeight, err := p.server.cc.chainIO.GetBestBlock() if err != nil { - return err + return nil, err } // Before we register this new link with the HTLC Switch, we'll @@ -449,7 +485,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { graph := p.server.chanDB.ChannelGraph() info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint) if err != nil && err != channeldb.ErrEdgeNotFound { - return err + return nil, err } // We'll filter out our policy from the directional channel @@ -497,7 +533,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { *chanPoint, ) if err != nil { - return err + return nil, err } // Create the link and add it to the switch. @@ -506,8 +542,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { currentHeight, true, ) if err != nil { - return fmt.Errorf("unable to add link %v to switch: %v", - chanPoint, err) + return nil, fmt.Errorf("unable to add link %v to "+ + "switch: %v", chanPoint, err) } p.activeChanMtx.Lock() @@ -515,7 +551,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { p.activeChanMtx.Unlock() } - return nil + return msgs, nil } // addLink creates and adds a new link from the specified channel. From a810092e535463c05b40f7cc01a6bee9c05bca72 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:39 +0200 Subject: [PATCH 04/17] channeldb/channel: make putChanStatus take optional extra closures --- channeldb/channel.go | 59 +++++++++++++++++--------------------------- 1 file changed, 23 insertions(+), 36 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index a16fa1c1..70104000 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -726,44 +726,16 @@ func (c *OpenChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error { c.Lock() defer c.Unlock() - var status ChannelStatus - if err := c.Db.Update(func(tx *bbolt.Tx) error { - chanBucket, err := fetchChanBucket( - tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, - ) - if err != nil { - return err - } - - channel, err := fetchOpenChannel(chanBucket, &c.FundingOutpoint) - if err != nil { - return err - } - - // Add status LocalDataLoss to the existing bitvector found in - // the DB. - status = channel.chanStatus | ChanStatusLocalDataLoss - channel.chanStatus = status - - var b bytes.Buffer - if err := WriteElement(&b, commitPoint); err != nil { - return err - } - - err = chanBucket.Put(dataLossCommitPointKey, b.Bytes()) - if err != nil { - return err - } - - return putOpenChannel(chanBucket, channel) - }); err != nil { + var b bytes.Buffer + if err := WriteElement(&b, commitPoint); err != nil { return err } - // Update the in-memory representation to keep it in sync with the DB. - c.chanStatus = status + putCommitPoint := func(chanBucket *bbolt.Bucket) error { + return chanBucket.Put(dataLossCommitPointKey, b.Bytes()) + } - return nil + return c.putChanStatus(ChanStatusLocalDataLoss, putCommitPoint) } // DataLossCommitPoint retrieves the stored commit point set during @@ -914,7 +886,12 @@ func (c *OpenChannel) MarkCommitmentBroadcasted() error { return c.putChanStatus(ChanStatusCommitBroadcasted) } -func (c *OpenChannel) putChanStatus(status ChannelStatus) error { +// putChanStatus appends the given status to the channel. fs is an optional +// list of closures that are given the chanBucket in order to atomically add +// extra information together with the new status. +func (c *OpenChannel) putChanStatus(status ChannelStatus, + fs ...func(*bbolt.Bucket) error) error { + if err := c.Db.Update(func(tx *bbolt.Tx) error { chanBucket, err := fetchChanBucket( tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, @@ -932,7 +909,17 @@ func (c *OpenChannel) putChanStatus(status ChannelStatus) error { status = channel.chanStatus | status channel.chanStatus = status - return putOpenChannel(chanBucket, channel) + if err := putOpenChannel(chanBucket, channel); err != nil { + return err + } + + for _, f := range fs { + if err := f(chanBucket); err != nil { + return err + } + } + + return nil }); err != nil { return err } From 02b2787e4482ec4d8a37dd5f76db66dee79c5df6 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:39 +0200 Subject: [PATCH 05/17] multi: make MarkCommitmentBroadcasted take closeTx --- chancloser.go | 4 +++- channeldb/channel.go | 22 +++++++++++++++++++--- channeldb/channel_test.go | 3 ++- contractcourt/channel_arbitrator.go | 4 ++-- contractcourt/channel_arbitrator_test.go | 2 +- lnwallet/channel.go | 4 ++-- 6 files changed, 29 insertions(+), 10 deletions(-) diff --git a/chancloser.go b/chancloser.go index be8395ce..d78a3824 100644 --- a/chancloser.go +++ b/chancloser.go @@ -444,7 +444,9 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b if err := c.cfg.broadcastTx(closeTx); err != nil { return nil, false, err } - if err := c.cfg.channel.MarkCommitmentBroadcasted(); err != nil { + + err = c.cfg.channel.MarkCommitmentBroadcasted(closeTx) + if err != nil { return nil, false, err } diff --git a/channeldb/channel.go b/channeldb/channel.go index 70104000..79416a28 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -59,6 +59,10 @@ var ( // remote peer during a channel sync in case we have lost channel state. dataLossCommitPointKey = []byte("data-loss-commit-point-key") + // closingTxKey points to a the closing tx that we broadcasted when + // moving the channel to state CommitBroadcasted. + closingTxKey = []byte("closing-tx-key") + // commitDiffKey stores the current pending commitment state we've // extended to the remote party (if any). Each time we propose a new // state, we store the information necessary to reconstruct this state @@ -878,12 +882,24 @@ func (c *OpenChannel) isBorked(chanBucket *bbolt.Bucket) (bool, error) { // MarkCommitmentBroadcasted marks the channel as a commitment transaction has // been broadcast, either our own or the remote, and we should watch the chain -// for it to confirm before taking any further action. -func (c *OpenChannel) MarkCommitmentBroadcasted() error { +// for it to confirm before taking any further action. It takes as argument the +// closing tx _we believe_ will appear in the chain. This is only used to +// republish this tx at startup to ensure propagation, and we should still +// handle the case where a different tx actually hits the chain. +func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx) error { c.Lock() defer c.Unlock() - return c.putChanStatus(ChanStatusCommitBroadcasted) + var b bytes.Buffer + if err := WriteElement(&b, closeTx); err != nil { + return err + } + + putClosingTx := func(chanBucket *bbolt.Bucket) error { + return chanBucket.Put(closingTxKey, b.Bytes()) + } + + return c.putChanStatus(ChanStatusCommitBroadcasted, putClosingTx) } // putChanStatus appends the given status to the channel. fs is an optional diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 46ac311c..ad03d4ea 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -891,7 +891,8 @@ func TestFetchWaitingCloseChannels(t *testing.T) { // This would happen in the event of a force close and should make the // channels enter a state of waiting close. for _, channel := range channels { - if err := channel.MarkCommitmentBroadcasted(); err != nil { + closeTx := &wire.MsgTx{} + if err := channel.MarkCommitmentBroadcasted(closeTx); err != nil { t.Fatalf("unable to mark commitment broadcast: %v", err) } } diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index fbad1465..c15d855e 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -97,7 +97,7 @@ type ChannelArbitratorConfig struct { // MarkCommitmentBroadcasted should mark the channel as the commitment // being broadcast, and we are waiting for the commitment to confirm. - MarkCommitmentBroadcasted func() error + MarkCommitmentBroadcasted func(*wire.MsgTx) error // MarkChannelClosed marks the channel closed in the database, with the // passed close summary. After this method successfully returns we can @@ -840,7 +840,7 @@ func (c *ChannelArbitrator) stateStep( } } - if err := c.cfg.MarkCommitmentBroadcasted(); err != nil { + if err := c.cfg.MarkCommitmentBroadcasted(closeTx); err != nil { log.Errorf("ChannelArbitrator(%v): unable to "+ "mark commitment broadcasted: %v", c.cfg.ChanPoint, err) diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go index b785b02a..f98fe116 100644 --- a/contractcourt/channel_arbitrator_test.go +++ b/contractcourt/channel_arbitrator_test.go @@ -213,7 +213,7 @@ func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator, } return summary, nil }, - MarkCommitmentBroadcasted: func() error { + MarkCommitmentBroadcasted: func(_ *wire.MsgTx) error { return nil }, MarkChannelClosed: func(*channeldb.ChannelCloseSummary) error { diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 3e6ef764..00a7f0db 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -6259,11 +6259,11 @@ func (lc *LightningChannel) State() *channeldb.OpenChannel { // MarkCommitmentBroadcasted marks the channel as a commitment transaction has // been broadcast, either our own or the remote, and we should watch the chain // for it to confirm before taking any further action. -func (lc *LightningChannel) MarkCommitmentBroadcasted() error { +func (lc *LightningChannel) MarkCommitmentBroadcasted(tx *wire.MsgTx) error { lc.Lock() defer lc.Unlock() - return lc.channelState.MarkCommitmentBroadcasted() + return lc.channelState.MarkCommitmentBroadcasted(tx) } // ActiveHtlcs returns a slice of HTLC's which are currently active on *both* From ac0e9b60169515726bad5400ab21851e46669117 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:39 +0200 Subject: [PATCH 06/17] channeldb/channel: add BroadcastedCommitment --- channeldb/channel.go | 35 +++++++++++++++++++++++++++++++++++ channeldb/channel_test.go | 20 +++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 79416a28..3db55e08 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -108,6 +108,10 @@ var ( // in the database. ErrNoCommitPoint = fmt.Errorf("no commit point found") + // ErrNoCloseTx is returned when no closing tx is found for a channel + // in the state CommitBroadcasted. + ErrNoCloseTx = fmt.Errorf("no closing tx found") + // ErrNoRestoredChannelMutation is returned when a caller attempts to // mutate a channel that's been recovered. ErrNoRestoredChannelMutation = fmt.Errorf("cannot mutate restored " + @@ -902,6 +906,37 @@ func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx) error { return c.putChanStatus(ChanStatusCommitBroadcasted, putClosingTx) } +// BroadcastedCommitment retrieves the stored closing tx set during +// MarkCommitmentBroadcasted. If not found ErrNoCloseTx is returned. +func (c *OpenChannel) BroadcastedCommitment() (*wire.MsgTx, error) { + var closeTx *wire.MsgTx + + err := c.Db.View(func(tx *bbolt.Tx) error { + chanBucket, err := fetchChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) + switch err { + case nil: + case ErrNoChanDBExists, ErrNoActiveChannels, ErrChannelNotFound: + return ErrNoCloseTx + default: + return err + } + + bs := chanBucket.Get(closingTxKey) + if bs == nil { + return ErrNoCloseTx + } + r := bytes.NewReader(bs) + return ReadElement(r, &closeTx) + }) + if err != nil { + return nil, err + } + + return closeTx, nil +} + // putChanStatus appends the given status to the channel. fs is an optional // list of closures that are given the chanBucket in order to atomically add // extra information together with the new status. diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index ad03d4ea..21bb738c 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -891,7 +891,12 @@ func TestFetchWaitingCloseChannels(t *testing.T) { // This would happen in the event of a force close and should make the // channels enter a state of waiting close. for _, channel := range channels { - closeTx := &wire.MsgTx{} + closeTx := wire.NewMsgTx(2) + closeTx.AddTxIn( + &wire.TxIn{ + PreviousOutPoint: channel.FundingOutpoint, + }, + ) if err := channel.MarkCommitmentBroadcasted(closeTx); err != nil { t.Fatalf("unable to mark commitment broadcast: %v", err) } @@ -917,6 +922,19 @@ func TestFetchWaitingCloseChannels(t *testing.T) { t.Fatalf("expected channel %v to be waiting close", channel.FundingOutpoint) } + + // Finally, make sure we can retrieve the closing tx for the + // channel. + closeTx, err := channel.BroadcastedCommitment() + if err != nil { + t.Fatalf("Unable to retrieve commitment: %v", err) + } + + if closeTx.TxIn[0].PreviousOutPoint != channel.FundingOutpoint { + t.Fatalf("expected outpoint %v, got %v", + channel.FundingOutpoint, + closeTx.TxIn[0].PreviousOutPoint) + } } } From 425afd28eaec63bb249c06511b075ddb89f38611 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:39 +0200 Subject: [PATCH 07/17] contractcourt/chain_arbitrator: republish closeTx for open channels --- contractcourt/chain_arbitrator.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index b3cd497d..44035b06 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -412,6 +412,36 @@ func (c *ChainArbitrator) Start() error { } c.activeChannels[chanPoint] = channelArb + + // If the channel has had its commitment broadcasted already, + // republish it in case it didn't propagate. + if !channel.HasChanStatus( + channeldb.ChanStatusCommitBroadcasted, + ) { + continue + } + + closeTx, err := channel.BroadcastedCommitment() + switch { + + // This can happen for channels that had their closing tx + // published before we started storing it to disk. + case err == channeldb.ErrNoCloseTx: + log.Warnf("Channel %v is in state CommitBroadcasted, "+ + "but no closing tx to re-publish...", chanPoint) + continue + + case err != nil: + return err + } + + log.Infof("Re-publishing closing tx(%v) for channel %v", + closeTx.TxHash(), chanPoint) + err = c.cfg.PublishTx(closeTx) + if err != nil && err != lnwallet.ErrDoubleSpend { + log.Warnf("Unable to broadcast close tx(%v): %v", + closeTx.TxHash(), err) + } } // In addition to the channels that we know to be open, we'll also From 07a42971bfd50d964cff59540427fef44599281f Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:39 +0200 Subject: [PATCH 08/17] lnwallet/test_utils: make CreateTestChannel return random funding op Previously it would always be the same, resulting in multiple calls to the method not being usabel to create more than one set of channels. --- lnwallet/test_utils.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lnwallet/test_utils.go b/lnwallet/test_utils.go index 37f13c55..32c6db95 100644 --- a/lnwallet/test_utils.go +++ b/lnwallet/test_utils.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "io" "io/ioutil" + prand "math/rand" "net" "os" @@ -102,7 +103,7 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error) prevOut := &wire.OutPoint{ Hash: chainhash.Hash(testHdSeed), - Index: 0, + Index: prand.Uint32(), } fundingTxIn := wire.NewTxIn(prevOut, nil, nil) From d75feeb95354cc6311ed61724df7d98548ce7633 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:39 +0200 Subject: [PATCH 09/17] contractcourt/chain_arbitrator_test: add TestChainArbitratorRepublishCommitment TestChainArbitratorRepulishCommitment testst that the chain arbitrator will republish closing transactions for channels marked CommitementBroadcast in the database at startup. --- contractcourt/chain_arbitrator_test.go | 116 +++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/contractcourt/chain_arbitrator_test.go b/contractcourt/chain_arbitrator_test.go index 236a3336..4ff3d064 100644 --- a/contractcourt/chain_arbitrator_test.go +++ b/contractcourt/chain_arbitrator_test.go @@ -1 +1,117 @@ package contractcourt + +import ( + "io/ioutil" + "net" + "os" + "testing" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwallet" +) + +// TestChainArbitratorRepulishCommitment testst that the chain arbitrator will +// republish closing transactions for channels marked CommitementBroadcast in +// the database at startup. +func TestChainArbitratorRepublishCommitment(t *testing.T) { + t.Parallel() + + tempPath, err := ioutil.TempDir("", "testdb") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempPath) + + db, err := channeldb.Open(tempPath) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // Create 10 test channels and sync them to the database. + const numChans = 10 + var channels []*channeldb.OpenChannel + for i := 0; i < numChans; i++ { + lChannel, _, cleanup, err := lnwallet.CreateTestChannels() + if err != nil { + t.Fatal(err) + } + defer cleanup() + + channel := lChannel.State() + + // We manually set the db here to make sure all channels are + // synced to the same db. + channel.Db = db + + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + if err := channel.SyncPending(addr, 101); err != nil { + t.Fatal(err) + } + + channels = append(channels, channel) + } + + // Mark half of the channels as commitment broadcasted. + for i := 0; i < numChans/2; i++ { + closeTx := channels[i].FundingTxn.Copy() + closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint + err := channels[i].MarkCommitmentBroadcasted(closeTx) + if err != nil { + t.Fatal(err) + } + } + + // We keep track of the transactions published by the ChainArbitrator + // at startup. + published := make(map[chainhash.Hash]struct{}) + + chainArbCfg := ChainArbitratorConfig{ + ChainIO: &mockChainIO{}, + Notifier: &mockNotifier{}, + PublishTx: func(tx *wire.MsgTx) error { + published[tx.TxHash()] = struct{}{} + return nil + }, + } + chainArb := NewChainArbitrator( + chainArbCfg, db, + ) + + if err := chainArb.Start(); err != nil { + t.Fatal(err) + } + defer func() { + if err := chainArb.Stop(); err != nil { + t.Fatal(err) + } + }() + + // Half of the channels should have had their closing tx re-published. + if len(published) != numChans/2 { + t.Fatalf("expected %d re-published transactions, got %d", + numChans/2, len(published)) + } + + // And make sure the published transactions are correct, and unique. + for i := 0; i < numChans/2; i++ { + closeTx := channels[i].FundingTxn.Copy() + closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint + + _, ok := published[closeTx.TxHash()] + if !ok { + t.Fatalf("closing tx not re-published") + } + + delete(published, closeTx.TxHash()) + } + + if len(published) != 0 { + t.Fatalf("unexpected tx published") + } +} From f40f4620f7da416c18a54bbbc36b51d24ab6b149 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:39 +0200 Subject: [PATCH 10/17] lnwallet/channel: make ErrCommitSyncLocalDataLoss type This commit converts the ErrCommitSyncLocalDataLoss error into a struct, that also holds the received last unrevoked commit point from the remote party. --- htlcswitch/link.go | 7 ++++++- lnwallet/channel.go | 40 +++++++++++++++++++++++++++++----------- lnwallet/channel_test.go | 4 ++-- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index c7ef3800..5c283038 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -895,6 +895,11 @@ func (l *channelLink) htlcManager() { if l.cfg.SyncStates { err := l.syncChanStates() if err != nil { + log.Warnf("Error when syncing channel states: %v", err) + + _, localDataLoss := + err.(*lnwallet.ErrCommitSyncLocalDataLoss) + switch { case err == ErrLinkShuttingDown: log.Debugf("unable to sync channel states, " + @@ -936,7 +941,7 @@ func (l *channelLink) htlcManager() { // TODO(halseth): mark this, such that we prevent // channel from being force closed by the user or // contractcourt etc. - case err == lnwallet.ErrCommitSyncLocalDataLoss: + case localDataLoss: // We determined the commit chains were not possible to // sync. We cautiously fail the channel, but don't diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 00a7f0db..1b61a637 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -81,16 +81,6 @@ var ( ErrInvalidLocalUnrevokedCommitPoint = fmt.Errorf("unrevoked commit " + "point is invalid") - // ErrCommitSyncLocalDataLoss is returned in the case that we receive a - // valid commit secret within the ChannelReestablish message from the - // remote node AND they advertise a RemoteCommitTailHeight higher than - // our current known height. This means we have lost some critical - // data, and must fail the channel and MUST NOT force close it. Instead - // we should wait for the remote to force close it, such that we can - // attempt to sweep our funds. - ErrCommitSyncLocalDataLoss = fmt.Errorf("possible local commitment " + - "state data loss") - // ErrCommitSyncRemoteDataLoss is returned in the case that we receive // a ChannelReestablish message from the remote that advertises a // NextLocalCommitHeight that is lower than what they have already @@ -101,6 +91,30 @@ var ( "state data loss") ) +// ErrCommitSyncLocalDataLoss is returned in the case that we receive a valid +// commit secret within the ChannelReestablish message from the remote node AND +// they advertise a RemoteCommitTailHeight higher than our current known +// height. This means we have lost some critical data, and must fail the +// channel and MUST NOT force close it. Instead we should wait for the remote +// to force close it, such that we can attempt to sweep our funds. The +// commitment point needed to sweep the remote's force close is encapsuled. +type ErrCommitSyncLocalDataLoss struct { + // ChannelPoint is the identifier for the channel that experienced data + // loss. + ChannelPoint wire.OutPoint + + // CommitPoint is the last unrevoked commit point, sent to us by the + // remote when we determined we had lost state. + CommitPoint *btcec.PublicKey +} + +// Error returns a string representation of the local data loss error. +func (e *ErrCommitSyncLocalDataLoss) Error() string { + return fmt.Sprintf("ChannelPoint(%v) with CommitPoint(%x) had "+ + "possible local commitment state data loss", e.ChannelPoint, + e.CommitPoint.SerializeCompressed()) +} + // channelState is an enum like type which represents the current state of a // particular channel. // TODO(roasbeef): actually update state @@ -3313,7 +3327,11 @@ func (lc *LightningChannel) ProcessChanSyncMsg( if err != nil { return nil, nil, nil, err } - return nil, nil, nil, ErrCommitSyncLocalDataLoss + + return nil, nil, nil, &ErrCommitSyncLocalDataLoss{ + ChannelPoint: lc.channelState.FundingOutpoint, + CommitPoint: msg.LocalUnrevokedCommitPoint, + } // If the height of our commitment chain reported by the remote party // is behind our view of the chain, then they probably lost some state, diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 81077862..3a0281c5 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -3688,7 +3688,7 @@ func TestChanSyncFailure(t *testing.T) { // Alice should detect from Bob's message that she lost state. _, _, _, err = aliceOld.ProcessChanSyncMsg(bobSyncMsg) - if err != ErrCommitSyncLocalDataLoss { + if _, ok := err.(*ErrCommitSyncLocalDataLoss); !ok { t.Fatalf("wrong error, expected "+ "ErrCommitSyncLocalDataLoss instead got: %v", err) @@ -4377,7 +4377,7 @@ func TestChanSyncInvalidLastSecret(t *testing.T) { // Alice's former self should conclude that she possibly lost data as // Bob is sending a valid commit secret for the latest state. _, _, _, err = aliceOld.ProcessChanSyncMsg(bobChanSync) - if err != ErrCommitSyncLocalDataLoss { + if _, ok := err.(*ErrCommitSyncLocalDataLoss); !ok { t.Fatalf("wrong error, expected ErrCommitSyncLocalDataLoss "+ "instead got: %v", err) } From 6e361d04cf3587ac3dd6155f4394fb775502a8d6 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:40 +0200 Subject: [PATCH 11/17] lnwallet+link: move marking channel states to link Instead of marking the database state when processing the channel reestablishment message, we wait for the result of this processing to arrive in the link, and mark it accordingly in the database here. We do this move the logic determining whether we should force close the channel or not, and what state to mark it in the DB, to the same place, as these need to be consistent. --- htlcswitch/link.go | 29 ++++++++++++++++------ lnwallet/channel.go | 59 ++++++++++++++++----------------------------- 2 files changed, 43 insertions(+), 45 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 5c283038..2b78abeb 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -897,7 +897,7 @@ func (l *channelLink) htlcManager() { if err != nil { log.Warnf("Error when syncing channel states: %v", err) - _, localDataLoss := + errDataLoss, localDataLoss := err.(*lnwallet.ErrCommitSyncLocalDataLoss) switch { @@ -922,6 +922,12 @@ func (l *channelLink) htlcManager() { // what they sent us before. // TODO(halseth): ban peer? case err == lnwallet.ErrInvalidLocalUnrevokedCommitPoint: + err = l.channel.MarkBorked() + if err != nil { + log.Errorf("Unable to mark channel "+ + "borked: %v", err) + } + l.fail( LinkFailureError{ code: ErrSyncError, @@ -935,13 +941,18 @@ func (l *channelLink) htlcManager() { // We have lost state and cannot safely force close the // channel. Fail the channel and wait for the remote to // hopefully force close it. The remote has sent us its - // latest unrevoked commitment point, that we stored in - // the database, that we can use to retrieve the funds - // when the remote closes the channel. - // TODO(halseth): mark this, such that we prevent - // channel from being force closed by the user or - // contractcourt etc. + // latest unrevoked commitment point, and we'll store + // it in the database, such that we can attempt to + // recover the funds if the remote force closes the + // channel. case localDataLoss: + err := l.channel.MarkDataLoss( + errDataLoss.CommitPoint, + ) + if err != nil { + log.Errorf("Unable to mark channel "+ + "data loss: %v", err) + } // We determined the commit chains were not possible to // sync. We cautiously fail the channel, but don't @@ -949,6 +960,10 @@ func (l *channelLink) htlcManager() { // TODO(halseth): can we safely force close in any // cases where this error is returned? case err == lnwallet.ErrCannotSyncCommitChains: + if err := l.channel.MarkBorked(); err != nil { + log.Errorf("Unable to mark channel "+ + "borked: %v", err) + } // Other, unspecified error. default: diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 1b61a637..6a0a3e8d 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -3307,10 +3307,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( // doesn't support data loss protection. In either case // it is not safe for us to keep using the channel, so // we mark it borked and fail the channel. - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } - walletLog.Errorf("ChannelPoint(%v), sync failed: "+ "local data loss, but no recovery option.", lc.channelState.FundingOutpoint) @@ -3318,16 +3314,7 @@ func (lc *LightningChannel) ProcessChanSyncMsg( } // In this case, we've likely lost data and shouldn't proceed - // with channel updates. So we'll store the commit point we - // were given in the database, such that we can attempt to - // recover the funds if the remote force closes the channel. - err := lc.channelState.MarkDataLoss( - msg.LocalUnrevokedCommitPoint, - ) - if err != nil { - return nil, nil, nil, err - } - + // with channel updates. return nil, nil, nil, &ErrCommitSyncLocalDataLoss{ ChannelPoint: lc.channelState.FundingOutpoint, CommitPoint: msg.LocalUnrevokedCommitPoint, @@ -3341,10 +3328,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( "believes our tail height is %v, while we have %v!", lc.channelState.FundingOutpoint, msg.RemoteCommitTailHeight, localTailHeight) - - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCommitSyncRemoteDataLoss // Their view of our commit chain is consistent with our view. @@ -3408,10 +3391,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( "believes our tail height is %v, while we have %v!", lc.channelState.FundingOutpoint, msg.RemoteCommitTailHeight, localTailHeight) - - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCannotSyncCommitChains } @@ -3430,9 +3409,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( lc.channelState.FundingOutpoint, msg.NextLocalCommitHeight, remoteTipHeight) - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCannotSyncCommitChains // They are waiting for a state they have already ACKed. @@ -3444,9 +3420,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( // They previously ACKed our current tail, and now they are // waiting for it. They probably lost state. - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCommitSyncRemoteDataLoss // They have received our latest commitment, life is good. @@ -3492,10 +3465,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( "next commit height is %v, while we believe it is %v!", lc.channelState.FundingOutpoint, msg.NextLocalCommitHeight, remoteTipHeight) - - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } return nil, nil, nil, ErrCannotSyncCommitChains } @@ -3533,12 +3502,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg( "sent invalid commit point for height %v!", lc.channelState.FundingOutpoint, msg.NextLocalCommitHeight) - - if err := lc.channelState.MarkBorked(); err != nil { - return nil, nil, nil, err - } - - // TODO(halseth): force close? return nil, nil, nil, ErrInvalidLocalUnrevokedCommitPoint } @@ -6274,6 +6237,16 @@ func (lc *LightningChannel) State() *channeldb.OpenChannel { return lc.channelState } +// MarkBorked marks the event when the channel as reached an irreconcilable +// state, such as a channel breach or state desynchronization. Borked channels +// should never be added to the switch. +func (lc *LightningChannel) MarkBorked() error { + lc.Lock() + defer lc.Unlock() + + return lc.channelState.MarkBorked() +} + // MarkCommitmentBroadcasted marks the channel as a commitment transaction has // been broadcast, either our own or the remote, and we should watch the chain // for it to confirm before taking any further action. @@ -6284,6 +6257,16 @@ func (lc *LightningChannel) MarkCommitmentBroadcasted(tx *wire.MsgTx) error { return lc.channelState.MarkCommitmentBroadcasted(tx) } +// MarkDataLoss marks sets the channel status to LocalDataLoss and stores the +// passed commitPoint for use to retrieve funds in case the remote force closes +// the channel. +func (lc *LightningChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error { + lc.Lock() + defer lc.Unlock() + + return lc.channelState.MarkDataLoss(commitPoint) +} + // ActiveHtlcs returns a slice of HTLC's which are currently active on *both* // commitment transactions. func (lc *LightningChannel) ActiveHtlcs() []channeldb.HTLC { From 9423fadf56a6268e068e847d4a4f4a91ef46e0b1 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:40 +0200 Subject: [PATCH 12/17] htlcswitch/link: don't mark channel borked on force close scanarios Instead of marking the channel Borked in cases where we want to force close it, we immediately let the peer fail the link. The channel state will instead be updated by the channel arbitrator, which will transition to StateBroadcastCommit, marking the channel borked, then marking the commitment tx broadcasted right before publishing the force close tx. We do this to avoid the case where we would mark it Borked, but go down before being able to publish the closing tx. Storing the force close tx ensures it will be re-published on startup. --- htlcswitch/link.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 2b78abeb..04cebc8d 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -922,12 +922,12 @@ func (l *channelLink) htlcManager() { // what they sent us before. // TODO(halseth): ban peer? case err == lnwallet.ErrInvalidLocalUnrevokedCommitPoint: - err = l.channel.MarkBorked() - if err != nil { - log.Errorf("Unable to mark channel "+ - "borked: %v", err) - } - + // We'll fail the link and tell the peer to + // force close the channel. Note that the + // database state is not updated here, but will + // be updated when the close transaction is + // ready to avoid that we go down before + // storing the transaction in the db. l.fail( LinkFailureError{ code: ErrSyncError, From 2cb80c4449a5e528980d35d3f014aaefcf78b967 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 11 Sep 2019 10:56:33 +0200 Subject: [PATCH 13/17] contractcourt/chain_arbitrator: mark commitment broadcasted before publish Before publishing the close tx to the network and commit to the StateCommitmentBroadcasted state, we mark the commitment as broadcasted and store it to the db. This ensures it will get re-published on startup if we go down. --- contractcourt/channel_arbitrator.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index c15d855e..42593537 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -821,6 +821,16 @@ func (c *ChannelArbitrator) stateStep( } closeTx = closeSummary.CloseTx + // Before publishing the transaction, we store it to the + // database, such that we can re-publish later in case it + // didn't propagate. + if err := c.cfg.MarkCommitmentBroadcasted(closeTx); err != nil { + log.Errorf("ChannelArbitrator(%v): unable to "+ + "mark commitment broadcasted: %v", + c.cfg.ChanPoint, err) + return StateError, closeTx, err + } + // With the close transaction in hand, broadcast the // transaction to the network, thereby entering the post // channel resolution state. @@ -840,12 +850,6 @@ func (c *ChannelArbitrator) stateStep( } } - if err := c.cfg.MarkCommitmentBroadcasted(closeTx); err != nil { - log.Errorf("ChannelArbitrator(%v): unable to "+ - "mark commitment broadcasted: %v", - c.cfg.ChanPoint, err) - } - // We go to the StateCommitmentBroadcasted state, where we'll // be waiting for the commitment to be confirmed. nextState = StateCommitmentBroadcasted From c90b1dd17ddf9de83f43a14cbaca21395ccef4f3 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 6 Sep 2019 13:14:40 +0200 Subject: [PATCH 14/17] chancloser: mark commitment broadcast before publish We call MarkCommitmentBroadcasted before publishing the closing tx to ensure we can attempt to republish at startup if something goes wrong. --- chancloser.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/chancloser.go b/chancloser.go index d78a3824..3c015a5b 100644 --- a/chancloser.go +++ b/chancloser.go @@ -435,6 +435,14 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b "close: %v", c.chanPoint, err) } + // Before publishing the closing tx, we persist it to the + // database, such that it can be republished if something goes + // wrong. + err = c.cfg.channel.MarkCommitmentBroadcasted(closeTx) + if err != nil { + return nil, false, err + } + // With the closing transaction crafted, we'll now broadcast it // to the network. peerLog.Infof("Broadcasting cooperative close tx: %v", @@ -445,11 +453,6 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b return nil, false, err } - err = c.cfg.channel.MarkCommitmentBroadcasted(closeTx) - if err != nil { - return nil, false, err - } - // Finally, we'll transition to the closeFinished state, and // also return the final close signed message we sent. // Additionally, we return true for the second argument to From 2a6ad6e6343ff56a03fc0c1d27d148aebed854a4 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 11 Sep 2019 11:15:57 +0200 Subject: [PATCH 15/17] channeldb+lnwallet: don't pass isRestoredChan to ChanSyncMsg Since we have access to the internal state of the channel, we can instead get it directly instead of passing it in as a parameter. --- channeldb/channel.go | 13 ++++------- contractcourt/chain_watcher.go | 12 +++------- htlcswitch/link.go | 4 +--- lnwallet/channel.go | 4 +--- lnwallet/channel_test.go | 42 +++++++++++++++++----------------- peer.go | 2 +- 6 files changed, 32 insertions(+), 45 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 3db55e08..8b5d5253 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -803,13 +803,10 @@ func (c *OpenChannel) MarkBorked() error { // 3. We didn't get the last RevokeAndAck message they sent, so they'll // re-send it. // -// The isRestoredChan bool indicates if we need to craft a chan sync message -// for a channel that's been restored. If this is a restored channel, then -// we'll modify our typical chan sync message to ensure they force close even -// if we're on the very first state. -func (c *OpenChannel) ChanSyncMsg( - isRestoredChan bool) (*lnwire.ChannelReestablish, error) { - +// If this is a restored channel, having status ChanStatusRestored, then we'll +// modify our typical chan sync message to ensure they force close even if +// we're on the very first state. +func (c *OpenChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) { c.Lock() defer c.Unlock() @@ -853,7 +850,7 @@ func (c *OpenChannel) ChanSyncMsg( // If we've restored this channel, then we'll purposefully give them an // invalid LocalUnrevokedCommitPoint so they'll force close the channel // allowing us to sweep our funds. - if isRestoredChan { + if c.hasChanStatus(ChanStatusRestored) { currentCommitSecret[0] ^= 1 } diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index c7ba2323..3d13c922 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -731,9 +731,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet } // Attempt to add a channel sync message to the close summary. - chanSync, err := c.cfg.chanState.ChanSyncMsg( - c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), - ) + chanSync, err := c.cfg.chanState.ChanSyncMsg() if err != nil { log.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", c.cfg.chanState.FundingOutpoint, err) @@ -810,9 +808,7 @@ func (c *chainWatcher) dispatchLocalForceClose( } // Attempt to add a channel sync message to the close summary. - chanSync, err := c.cfg.chanState.ChanSyncMsg( - c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), - ) + chanSync, err := c.cfg.chanState.ChanSyncMsg() if err != nil { log.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", c.cfg.chanState.FundingOutpoint, err) @@ -996,9 +992,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail } // Attempt to add a channel sync message to the close summary. - chanSync, err := c.cfg.chanState.ChanSyncMsg( - c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored), - ) + chanSync, err := c.cfg.chanState.ChanSyncMsg() if err != nil { log.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", c.cfg.chanState.FundingOutpoint, err) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 04cebc8d..fc92a5e9 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -609,9 +609,7 @@ func (l *channelLink) syncChanStates() error { // side. Based on this message, the remote party will decide if they // need to retransmit any data or not. chanState := l.channel.State() - localChanSyncMsg, err := chanState.ChanSyncMsg( - chanState.HasChanStatus(channeldb.ChanStatusRestored), - ) + localChanSyncMsg, err := chanState.ChanSyncMsg() if err != nil { return fmt.Errorf("unable to generate chan sync message for "+ "ChannelPoint(%v)", l.channel.ChannelPoint()) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 6a0a3e8d..90871498 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -5089,9 +5089,7 @@ func NewUnilateralCloseSummary(chanState *channeldb.OpenChannel, signer input.Si } // Attempt to add a channel sync message to the close summary. - chanSync, err := chanState.ChanSyncMsg( - chanState.HasChanStatus(channeldb.ChanStatusRestored), - ) + chanSync, err := chanState.ChanSyncMsg() if err != nil { walletLog.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", chanState.FundingOutpoint, err) diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 3a0281c5..ca686d34 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -2530,7 +2530,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel, _, _, line, _ := runtime.Caller(1) - aliceChanSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) + aliceChanSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("line #%v: unable to produce chan sync msg: %v", line, err) @@ -2545,7 +2545,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel, "instead wants to send: %v", line, spew.Sdump(bobMsgsToSend)) } - bobChanSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) + bobChanSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("line #%v: unable to produce chan sync msg: %v", line, err) @@ -2778,11 +2778,11 @@ func TestChanSyncOweCommitment(t *testing.T) { // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, // while Bob should think he's properly synchronized. - aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3092,11 +3092,11 @@ func TestChanSyncOweRevocation(t *testing.T) { // If we fetch the channel sync messages at this state, then Alice // should report that she owes Bob a revocation message, while Bob // thinks they're fully in sync. - aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3261,11 +3261,11 @@ func TestChanSyncOweRevocationAndCommit(t *testing.T) { // If we now attempt to resync, then Alice should conclude that she // doesn't need any further updates, while Bob concludes that he needs // to re-send both his revocation and commit sig message. - aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3472,11 +3472,11 @@ func TestChanSyncOweRevocationAndCommitForceTransition(t *testing.T) { // Now if we attempt to synchronize states at this point, Alice should // detect that she owes nothing, while Bob should re-send both his // RevokeAndAck as well as his commitment message. - aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3677,11 +3677,11 @@ func TestChanSyncFailure(t *testing.T) { assertLocalDataLoss := func(aliceOld *LightningChannel) { t.Helper() - aliceSyncMsg, err := aliceOld.channelState.ChanSyncMsg(false) + aliceSyncMsg, err := aliceOld.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3755,7 +3755,7 @@ func TestChanSyncFailure(t *testing.T) { // If we remove the recovery options from Bob's message, Alice cannot // tell if she lost state, since Bob might be lying. She still should // be able to detect that chains cannot be synced. - bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3769,7 +3769,7 @@ func TestChanSyncFailure(t *testing.T) { // If Bob lies about the NextLocalCommitHeight, making it greater than // what Alice expect, she cannot tell for sure whether she lost state, // but should detect the desync. - bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3782,7 +3782,7 @@ func TestChanSyncFailure(t *testing.T) { // If Bob's NextLocalCommitHeight is lower than what Alice expects, Bob // probably lost state. - bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3795,7 +3795,7 @@ func TestChanSyncFailure(t *testing.T) { // If Alice and Bob's states are in sync, but Bob is sending the wrong // LocalUnrevokedCommitPoint, Alice should detect this. - bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3824,7 +3824,7 @@ func TestChanSyncFailure(t *testing.T) { // when there's a pending remote commit. halfAdvance() - bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3912,11 +3912,11 @@ func TestChannelRetransmissionFeeUpdate(t *testing.T) { // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, // while Bob should think he's properly synchronized. - aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg(false) + aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg(false) + bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -4361,11 +4361,11 @@ func TestChanSyncInvalidLastSecret(t *testing.T) { } // Next, we'll produce the ChanSync messages for both parties. - aliceChanSync, err := aliceChannel.channelState.ChanSyncMsg(false) + aliceChanSync, err := aliceChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to generate chan sync msg: %v", err) } - bobChanSync, err := bobChannel.channelState.ChanSyncMsg(false) + bobChanSync, err := bobChannel.channelState.ChanSyncMsg() if err != nil { t.Fatalf("unable to generate chan sync msg: %v", err) } diff --git a/peer.go b/peer.go index 33639a9b..962f5fad 100644 --- a/peer.go +++ b/peer.go @@ -454,7 +454,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( // channel reestablish message sent from the peer, but // that's okay since the assumption is that we did when // marking the channel borked. - chanSync, err := dbChan.ChanSyncMsg(false) + chanSync, err := dbChan.ChanSyncMsg() if err != nil { peerLog.Errorf("Unable to create channel "+ "reestablish message for channel %v: "+ From 2d194c9672c3f3afb3e292ffd3b19538b3212fdc Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 11 Sep 2019 12:26:52 +0200 Subject: [PATCH 16/17] htlcswitch/link: remove chan sync delay Earlier this delay was needed to increase the likelihood that the DLP scanario was successfully completed. Since we would risk the connection being torn down, and the link exit, we could end up with the remote marking the channel borked, but not finishing the force close. With the previous set of commits, we should now trigger the force close before we merk the channel borked, which should ensure we'll resume the orocess on next restart/connect. --- htlcswitch/link.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index fc92a5e9..351a7623 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -615,21 +615,6 @@ func (l *channelLink) syncChanStates() error { "ChannelPoint(%v)", l.channel.ChannelPoint()) } - // If we have a restored channel, we'll delay sending our channel - // reestablish message briefly to ensure we first have a stable - // connection. Sending the message will cause the remote peer to force - // close the channel, which currently may not be resumed reliably if the - // connection is being torn down simultaneously. This delay can be - // removed after the force close is reliable, but in the meantime it - // improves the reliability of successfully closing out the channel. - if chanState.HasChanStatus(channeldb.ChanStatusRestored) { - select { - case <-time.After(5 * time.Second): - case <-l.quit: - return ErrLinkShuttingDown - } - } - if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil { return fmt.Errorf("Unable to send chan sync message for "+ "ChannelPoint(%v)", l.channel.ChannelPoint()) From 97093b4223d9a3b0cdecd99fb079a609d530b008 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 18 Sep 2019 12:46:44 +0200 Subject: [PATCH 17/17] lntest/itest: wait for on-chain balance restore We add a wait predicate to make sure the node's on-chain balance is restored before continuing the restore test case. This is needed since the DLP test scenario includes several restarts of the node, and if the node isn't done scanning for on-chain balance before the restart happens, it would be unlocked without a recovery window, causing funds to be left undiscovered. --- lntest/itest/lnd_test.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index dfd4be60..6c26b2a0 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -8121,7 +8121,9 @@ func assertDLPExecuted(net *lntest.NetworkHarness, t *harnessTest, assertTxInBlock(t, block, forceClose) // Dave should sweep his funds immediately, as they are not timelocked. - daveSweep, err := waitForTxInMempool(net.Miner.Node, minerMempoolTimeout) + daveSweep, err := waitForTxInMempool( + net.Miner.Node, minerMempoolTimeout, + ) if err != nil { t.Fatalf("unable to find Dave's sweep tx in mempool: %v", err) } @@ -13559,6 +13561,27 @@ func testChanRestoreScenario(t *harnessTest, net *lntest.NetworkHarness, t.Fatalf("unable to restore node: %v", err) } + // First ensure that the on-chain balance is restored. + err = wait.NoError(func() error { + ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) + balReq := &lnrpc.WalletBalanceRequest{} + daveBalResp, err := dave.WalletBalance(ctxt, balReq) + if err != nil { + return err + } + + daveBal := daveBalResp.ConfirmedBalance + if daveBal <= 0 { + return fmt.Errorf("expected positive balance, had %v", + daveBal) + } + + return nil + }, defaultTimeout) + if err != nil { + t.Fatalf("On-chain balance not restored: %v", err) + } + // Now that we have our new node up, we expect that it'll re-connect to // Carol automatically based on the restored backup. ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)