contractcourt: generalize rebroadcast for force and coop

This commit is contained in:
Conner Fromknecht 2019-12-04 13:29:51 -08:00
parent 1c0dc98a7c
commit b3c28c9cba
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
2 changed files with 104 additions and 34 deletions

@ -432,35 +432,11 @@ func (c *ChainArbitrator) Start() error {
c.activeChannels[chanPoint] = channelArb
// If the channel has had its commitment broadcasted already,
// republish it in case it didn't propagate.
if !channel.HasChanStatus(
channeldb.ChanStatusCommitBroadcasted,
) {
continue
}
closeTx, err := channel.BroadcastedCommitment()
switch {
// This can happen for channels that had their closing tx
// published before we started storing it to disk.
case err == channeldb.ErrNoCloseTx:
log.Warnf("Channel %v is in state CommitBroadcasted, "+
"but no closing tx to re-publish...", chanPoint)
continue
case err != nil:
// Republish any closing transactions for this channel.
err = c.publishClosingTxs(channel)
if err != nil {
return err
}
log.Infof("Re-publishing closing tx(%v) for channel %v",
closeTx.TxHash(), chanPoint)
err = c.cfg.PublishTx(closeTx)
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Warnf("Unable to broadcast close tx(%v): %v",
closeTx.TxHash(), err)
}
}
// In addition to the channels that we know to be open, we'll also
@ -570,6 +546,90 @@ func (c *ChainArbitrator) Start() error {
return nil
}
// publishClosingTxs will load any stored cooperative or unilater closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
func (c *ChainArbitrator) publishClosingTxs(
channel *channeldb.OpenChannel) error {
// If the channel has had its unilateral close broadcasted already,
// republish it in case it didn't propagate.
if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
err := c.rebroadcast(
channel, channeldb.ChanStatusCommitBroadcasted,
)
if err != nil {
return err
}
}
// If the channel has had its cooperative close broadcasted
// already, republish it in case it didn't propagate.
if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
err := c.rebroadcast(
channel, channeldb.ChanStatusCoopBroadcasted,
)
if err != nil {
return err
}
}
return nil
}
// rebroadcast is a helper method which will republish the unilateral or
// cooperative close transaction or a channel in a particular state.
//
// NOTE: There is no risk to caling this method if the channel isn't in either
// CommimentBroadcasted or CoopBroadcasted, but the logs will be misleading.
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
state channeldb.ChannelStatus) error {
chanPoint := channel.FundingOutpoint
var (
closeTx *wire.MsgTx
kind string
err error
)
switch state {
case channeldb.ChanStatusCommitBroadcasted:
kind = "force"
closeTx, err = channel.BroadcastedCommitment()
case channeldb.ChanStatusCoopBroadcasted:
kind = "coop"
closeTx, err = channel.BroadcastedCooperative()
default:
return fmt.Errorf("unknown closing state: %v", state)
}
switch {
// This can happen for channels that had their closing tx published
// before we started storing it to disk.
case err == channeldb.ErrNoCloseTx:
log.Warnf("Channel %v is in state %v, but no %s closing tx "+
"to re-publish...", chanPoint, state, kind)
return nil
case err != nil:
return err
}
log.Infof("Re-publishing %s close tx(%v) for channel %v",
kind, closeTx.TxHash(), chanPoint)
err = c.cfg.PublishTx(closeTx)
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Warnf("Unable to broadcast %s close tx(%v): %v",
kind, closeTx.TxHash(), err)
}
return nil
}
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.

@ -12,10 +12,10 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
)
// TestChainArbitratorRepulishCommitment testst that the chain arbitrator will
// republish closing transactions for channels marked CommitementBroadcast in
// the database at startup.
func TestChainArbitratorRepublishCommitment(t *testing.T) {
// TestChainArbitratorRepulishCloses tests that the chain arbitrator will
// republish closing transactions for channels marked CommitementBroadcast or
// CoopBroadcast in the database at startup.
func TestChainArbitratorRepublishCloses(t *testing.T) {
t.Parallel()
tempPath, err := ioutil.TempDir("", "testdb")
@ -65,17 +65,22 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = channels[i].MarkCoopBroadcasted(closeTx)
if err != nil {
t.Fatal(err)
}
}
// We keep track of the transactions published by the ChainArbitrator
// at startup.
published := make(map[chainhash.Hash]struct{})
published := make(map[chainhash.Hash]int)
chainArbCfg := ChainArbitratorConfig{
ChainIO: &mockChainIO{},
Notifier: &mockNotifier{},
PublishTx: func(tx *wire.MsgTx) error {
published[tx.TxHash()] = struct{}{}
published[tx.TxHash()]++
return nil
},
}
@ -103,11 +108,16 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) {
closeTx := channels[i].FundingTxn.Copy()
closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint
_, ok := published[closeTx.TxHash()]
count, ok := published[closeTx.TxHash()]
if !ok {
t.Fatalf("closing tx not re-published")
}
// We expect one coop close and one force close.
if count != 2 {
t.Fatalf("expected 2 closing txns, only got %d", count)
}
delete(published, closeTx.TxHash())
}