diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 1c18253e..add69c0f 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -432,35 +432,11 @@ func (c *ChainArbitrator) Start() error { c.activeChannels[chanPoint] = channelArb - // If the channel has had its commitment broadcasted already, - // republish it in case it didn't propagate. - if !channel.HasChanStatus( - channeldb.ChanStatusCommitBroadcasted, - ) { - continue - } - - closeTx, err := channel.BroadcastedCommitment() - switch { - - // This can happen for channels that had their closing tx - // published before we started storing it to disk. - case err == channeldb.ErrNoCloseTx: - log.Warnf("Channel %v is in state CommitBroadcasted, "+ - "but no closing tx to re-publish...", chanPoint) - continue - - case err != nil: + // Republish any closing transactions for this channel. + err = c.publishClosingTxs(channel) + if err != nil { return err } - - log.Infof("Re-publishing closing tx(%v) for channel %v", - closeTx.TxHash(), chanPoint) - err = c.cfg.PublishTx(closeTx) - if err != nil && err != lnwallet.ErrDoubleSpend { - log.Warnf("Unable to broadcast close tx(%v): %v", - closeTx.TxHash(), err) - } } // In addition to the channels that we know to be open, we'll also @@ -570,6 +546,90 @@ func (c *ChainArbitrator) Start() error { return nil } +// publishClosingTxs will load any stored cooperative or unilater closing +// transactions and republish them. This helps ensure propagation of the +// transactions in the event that prior publications failed. +func (c *ChainArbitrator) publishClosingTxs( + channel *channeldb.OpenChannel) error { + + // If the channel has had its unilateral close broadcasted already, + // republish it in case it didn't propagate. + if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) { + err := c.rebroadcast( + channel, channeldb.ChanStatusCommitBroadcasted, + ) + if err != nil { + return err + } + } + + // If the channel has had its cooperative close broadcasted + // already, republish it in case it didn't propagate. + if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) { + err := c.rebroadcast( + channel, channeldb.ChanStatusCoopBroadcasted, + ) + if err != nil { + return err + } + } + + return nil +} + +// rebroadcast is a helper method which will republish the unilateral or +// cooperative close transaction or a channel in a particular state. +// +// NOTE: There is no risk to caling this method if the channel isn't in either +// CommimentBroadcasted or CoopBroadcasted, but the logs will be misleading. +func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel, + state channeldb.ChannelStatus) error { + + chanPoint := channel.FundingOutpoint + + var ( + closeTx *wire.MsgTx + kind string + err error + ) + switch state { + case channeldb.ChanStatusCommitBroadcasted: + kind = "force" + closeTx, err = channel.BroadcastedCommitment() + + case channeldb.ChanStatusCoopBroadcasted: + kind = "coop" + closeTx, err = channel.BroadcastedCooperative() + + default: + return fmt.Errorf("unknown closing state: %v", state) + } + + switch { + + // This can happen for channels that had their closing tx published + // before we started storing it to disk. + case err == channeldb.ErrNoCloseTx: + log.Warnf("Channel %v is in state %v, but no %s closing tx "+ + "to re-publish...", chanPoint, state, kind) + return nil + + case err != nil: + return err + } + + log.Infof("Re-publishing %s close tx(%v) for channel %v", + kind, closeTx.TxHash(), chanPoint) + + err = c.cfg.PublishTx(closeTx) + if err != nil && err != lnwallet.ErrDoubleSpend { + log.Warnf("Unable to broadcast %s close tx(%v): %v", + kind, closeTx.TxHash(), err) + } + + return nil +} + // Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active // channel arbitrators will be signalled to exit, and this method will block // until they've all exited. diff --git a/contractcourt/chain_arbitrator_test.go b/contractcourt/chain_arbitrator_test.go index 28682c92..07c31b28 100644 --- a/contractcourt/chain_arbitrator_test.go +++ b/contractcourt/chain_arbitrator_test.go @@ -12,10 +12,10 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" ) -// TestChainArbitratorRepulishCommitment testst that the chain arbitrator will -// republish closing transactions for channels marked CommitementBroadcast in -// the database at startup. -func TestChainArbitratorRepublishCommitment(t *testing.T) { +// TestChainArbitratorRepulishCloses tests that the chain arbitrator will +// republish closing transactions for channels marked CommitementBroadcast or +// CoopBroadcast in the database at startup. +func TestChainArbitratorRepublishCloses(t *testing.T) { t.Parallel() tempPath, err := ioutil.TempDir("", "testdb") @@ -65,17 +65,22 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) { if err != nil { t.Fatal(err) } + + err = channels[i].MarkCoopBroadcasted(closeTx) + if err != nil { + t.Fatal(err) + } } // We keep track of the transactions published by the ChainArbitrator // at startup. - published := make(map[chainhash.Hash]struct{}) + published := make(map[chainhash.Hash]int) chainArbCfg := ChainArbitratorConfig{ ChainIO: &mockChainIO{}, Notifier: &mockNotifier{}, PublishTx: func(tx *wire.MsgTx) error { - published[tx.TxHash()] = struct{}{} + published[tx.TxHash()]++ return nil }, } @@ -103,11 +108,16 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) { closeTx := channels[i].FundingTxn.Copy() closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint - _, ok := published[closeTx.TxHash()] + count, ok := published[closeTx.TxHash()] if !ok { t.Fatalf("closing tx not re-published") } + // We expect one coop close and one force close. + if count != 2 { + t.Fatalf("expected 2 closing txns, only got %d", count) + } + delete(published, closeTx.TxHash()) }