Merge pull request #3789 from cfromknecht/coop-close-rpc-status

republish force and coop closes on startup
Conner Fromknecht 2019-12-05 10:35:53 -08:00 committed by GitHub
commit f3398c0c0e
8 changed files with 269 additions and 65 deletions

@@ -211,6 +211,24 @@ func (c *channelCloser) initChanShutdown() (*lnwire.Shutdown, error) {
// TODO(roasbeef): err if channel has htlc's?
// Before closing, we'll attempt to send a disable update for the
// channel. We do so before closing the channel as otherwise the current
// edge policy won't be retrievable from the graph.
if err := c.cfg.disableChannel(c.chanPoint); err != nil {
peerLog.Warnf("Unable to disable channel %v on "+
"close: %v", c.chanPoint, err)
}
// Before continuing, mark the channel as cooperatively closed with a
// nil txn. Even though we haven't negotiated the final txn, this
// guarantees that our listchannels rpc will be externally consistent,
// and reflect that the channel is being shut down by the time the
// closing request returns.
err := c.cfg.channel.MarkCoopBroadcasted(nil)
if err != nil {
return nil, err
}
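
Because MarkCoopBroadcasted(nil) only flips a status bit, any query that filters channels by status will immediately report the channel as closing, even before a final closing tx exists. A hedged sketch of such a filter, using the channeldb identifiers from this diff (filterWaitingClose itself is hypothetical; lnd's real query is channeldb's FetchWaitingCloseChannels):

package example

import "github.com/lightningnetwork/lnd/channeldb"

// filterWaitingClose is an illustrative helper: it returns the channels
// whose status already records a broadcast (or imminent) close, which is
// why flipping the status bit with a nil tx is enough for a
// listchannels-style RPC to report the channel as closing before the
// final tx has been negotiated.
func filterWaitingClose(
	chans []*channeldb.OpenChannel) []*channeldb.OpenChannel {

	var waiting []*channeldb.OpenChannel
	for _, c := range chans {
		if c.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) ||
			c.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {

			waiting = append(waiting, c)
		}
	}
	return waiting
}
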
// Before returning the shutdown message, we'll unregister the channel
// to ensure that it isn't seen as usable within the system.
//
@@ -490,18 +508,10 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
}
c.closingTx = closeTx
// Before closing, we'll attempt to send a disable update for
// the channel. We do so before closing the channel as otherwise
// the current edge policy won't be retrievable from the graph.
if err := c.cfg.disableChannel(c.chanPoint); err != nil {
peerLog.Warnf("Unable to disable channel %v on "+
"close: %v", c.chanPoint, err)
}
// Before publishing the closing tx, we persist it to the
// database, such that it can be republished if something goes
// wrong.
err = c.cfg.channel.MarkCommitmentBroadcasted(closeTx)
err = c.cfg.channel.MarkCoopBroadcasted(closeTx)
if err != nil {
return nil, false, err
}
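
The ordering above is deliberate: the closing tx is persisted before any attempt to publish it, so a crash between the two steps still leaves enough state for the startup republish added later in this diff. A hedged sketch of that persist-then-broadcast pattern, assuming the lnwallet wrapper introduced below (broadcastCoopClose is illustrative, not a function in lnd):

package example

import (
	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/lnwallet"
)

// broadcastCoopClose persists the negotiated closing tx first and only
// then hands it to the wallet for broadcast. If the process dies after
// MarkCoopBroadcasted but before (or during) PublishTx, the tx can still
// be recovered from disk and re-published at startup.
func broadcastCoopClose(channel *lnwallet.LightningChannel,
	publishTx func(*wire.MsgTx) error, closeTx *wire.MsgTx) error {

	if err := channel.MarkCoopBroadcasted(closeTx); err != nil {
		return err
	}
	return publishTx(closeTx)
}
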
@@ -549,7 +559,6 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
// transaction for a channel based on the prior fee negotiations and our
// current compromise fee.
func (c *channelCloser) proposeCloseSigned(fee btcutil.Amount) (*lnwire.ClosingSigned, error) {
rawSig, _, _, err := c.cfg.channel.CreateCloseProposal(
fee, c.localDeliveryScript, c.remoteDeliveryScript,
)

@@ -69,9 +69,13 @@ var (
// remote peer during a channel sync in case we have lost channel state.
dataLossCommitPointKey = []byte("data-loss-commit-point-key")
// closingTxKey points to the closing tx that we broadcasted when
// moving the channel to state CommitBroadcasted.
closingTxKey = []byte("closing-tx-key")
// forceCloseTxKey points to the unilateral closing tx that we
// broadcasted when moving the channel to state CommitBroadcasted.
forceCloseTxKey = []byte("closing-tx-key")
// coopCloseTxKey points to the cooperative closing tx that we
// broadcasted when moving the channel to state CoopBroadcasted.
coopCloseTxKey = []byte("coop-closing-tx-key")
// commitDiffKey stores the current pending commitment state we've
// extended to the remote party (if any). Each time we propose a new
@@ -382,6 +386,10 @@ var (
// has been restored, and doesn't have all the fields a typical channel
// will have.
ChanStatusRestored ChannelStatus = 1 << 3
// ChanStatusCoopBroadcasted indicates that a cooperative close for this
// channel has been broadcasted.
ChanStatusCoopBroadcasted ChannelStatus = 1 << 4
)
// chanStatusStrings maps a ChannelStatus to a human friendly string that
@@ -392,6 +400,7 @@ var chanStatusStrings = map[ChannelStatus]string{
ChanStatusCommitBroadcasted: "ChanStatusCommitBroadcasted",
ChanStatusLocalDataLoss: "ChanStatusLocalDataLoss",
ChanStatusRestored: "ChanStatusRestored",
ChanStatusCoopBroadcasted: "ChanStatusCoopBroadcasted",
}
// orderedChanStatusFlags is an in-order list of all the channel status flags.
@@ -401,6 +410,7 @@ var orderedChanStatusFlags = []ChannelStatus{
ChanStatusCommitBroadcasted,
ChanStatusLocalDataLoss,
ChanStatusRestored,
ChanStatusCoopBroadcasted,
}
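
Because ChannelStatus is a bit vector, the new ChanStatusCoopBroadcasted flag composes with whatever flags are already set rather than replacing them. A minimal, self-contained sketch of how such flags are set and checked; the values of the lower bits are assumptions for illustration, and hasStatus is an illustrative stand-in for OpenChannel.HasChanStatus:

package main

import "fmt"

// ChannelStatus mirrors the bit-vector style used above; the exact values
// of the lower bits are assumed here.
type ChannelStatus uint8

const (
	ChanStatusBorked            ChannelStatus = 1 << 0
	ChanStatusCommitBroadcasted ChannelStatus = 1 << 1
	ChanStatusLocalDataLoss     ChannelStatus = 1 << 2
	ChanStatusRestored          ChannelStatus = 1 << 3
	ChanStatusCoopBroadcasted   ChannelStatus = 1 << 4
)

// hasStatus reports whether every bit in status is set on s.
func hasStatus(s, status ChannelStatus) bool {
	return s&status == status
}

func main() {
	// A channel can carry several statuses at once.
	s := ChanStatusCommitBroadcasted | ChanStatusCoopBroadcasted

	fmt.Println(hasStatus(s, ChanStatusCoopBroadcasted)) // true
	fmt.Println(hasStatus(s, ChanStatusLocalDataLoss))   // false
}
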
// String returns a human-readable representation of the ChannelStatus.
@@ -951,24 +961,65 @@ func (c *OpenChannel) isBorked(chanBucket *bbolt.Bucket) (bool, error) {
// republish this tx at startup to ensure propagation, and we should still
// handle the case where a different tx actually hits the chain.
func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx) error {
return c.markBroadcasted(
ChanStatusCommitBroadcasted, forceCloseTxKey, closeTx,
)
}
// MarkCoopBroadcasted marks the channel to indicate that a cooperative close
// transaction has been broadcast, either our own or the remote's, and that we
// should watch the chain for it to confirm before taking further action. It
// takes as argument a cooperative close tx that could appear on chain, and
// should be rebroadcast upon startup. This is only used to republish and ensure
// propagation, and we should still handle the case where a different tx
// actually hits the chain.
func (c *OpenChannel) MarkCoopBroadcasted(closeTx *wire.MsgTx) error {
return c.markBroadcasted(
ChanStatusCoopBroadcasted, coopCloseTxKey, closeTx,
)
}
// markBroadcasted is a helper function which modifies the channel status of the
// receiving channel and inserts a close transaction under the requested key,
// which should specify either a coop or force close.
func (c *OpenChannel) markBroadcasted(status ChannelStatus, key []byte,
closeTx *wire.MsgTx) error {
c.Lock()
defer c.Unlock()
var b bytes.Buffer
if err := WriteElement(&b, closeTx); err != nil {
return err
// If a closing tx is provided, we'll generate a closure to write the
// transaction in the appropriate bucket under the given key.
var putClosingTx func(*bbolt.Bucket) error
if closeTx != nil {
var b bytes.Buffer
if err := WriteElement(&b, closeTx); err != nil {
return err
}
putClosingTx = func(chanBucket *bbolt.Bucket) error {
return chanBucket.Put(key, b.Bytes())
}
}
putClosingTx := func(chanBucket *bbolt.Bucket) error {
return chanBucket.Put(closingTxKey, b.Bytes())
}
return c.putChanStatus(ChanStatusCommitBroadcasted, putClosingTx)
return c.putChanStatus(status, putClosingTx)
}
// BroadcastedCommitment retrieves the stored closing tx set during
// BroadcastedCommitment retrieves the stored unilateral closing tx set during
// MarkCommitmentBroadcasted. If not found ErrNoCloseTx is returned.
func (c *OpenChannel) BroadcastedCommitment() (*wire.MsgTx, error) {
return c.getClosingTx(forceCloseTxKey)
}
// BroadcastedCooperative retrieves the stored cooperative closing tx set during
// MarkCoopBroadcasted. If not found ErrNoCloseTx is returned.
func (c *OpenChannel) BroadcastedCooperative() (*wire.MsgTx, error) {
return c.getClosingTx(coopCloseTxKey)
}
// getClosingTx is a helper method which returns the stored closing transaction
// for key. The caller should use either the force or coop closing keys.
func (c *OpenChannel) getClosingTx(key []byte) (*wire.MsgTx, error) {
var closeTx *wire.MsgTx
err := c.Db.View(func(tx *bbolt.Tx) error {
@@ -983,7 +1034,7 @@ func (c *OpenChannel) BroadcastedCommitment() (*wire.MsgTx, error) {
return err
}
bs := chanBucket.Get(closingTxKey)
bs := chanBucket.Get(key)
if bs == nil {
return ErrNoCloseTx
}
@@ -1025,6 +1076,11 @@ func (c *OpenChannel) putChanStatus(status ChannelStatus,
}
for _, f := range fs {
// Skip execution of nil closures.
if f == nil {
continue
}
if err := f(chanBucket); err != nil {
return err
}
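
The closing transaction itself is just serialized bytes stored under a per-channel bucket key, and getClosingTx reads them back on demand. A hedged, self-contained sketch of that store-and-fetch pattern; the bucket name, error value, and the direct use of go.etcd.io/bbolt with wire serialization are illustrative stand-ins for lnd's WriteElement helper and its channel bucket:

package example

import (
	"bytes"
	"errors"

	"github.com/btcsuite/btcd/wire"
	bbolt "go.etcd.io/bbolt"
)

var (
	// Illustrative bucket and key names; lnd uses its per-channel
	// bucket with coopCloseTxKey / forceCloseTxKey.
	exampleChanBucket = []byte("example-chan-bucket")
	exampleCloseKey   = []byte("coop-closing-tx-key")

	errNoCloseTx = errors.New("no closing tx found")
)

// storeCloseTx serializes closeTx and writes it under the close-tx key,
// mirroring the put closure built in markBroadcasted.
func storeCloseTx(db *bbolt.DB, closeTx *wire.MsgTx) error {
	var b bytes.Buffer
	if err := closeTx.Serialize(&b); err != nil {
		return err
	}
	return db.Update(func(tx *bbolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(exampleChanBucket)
		if err != nil {
			return err
		}
		return bucket.Put(exampleCloseKey, b.Bytes())
	})
}

// fetchCloseTx reads the stored bytes back and deserializes them,
// mirroring getClosingTx.
func fetchCloseTx(db *bbolt.DB) (*wire.MsgTx, error) {
	closeTx := &wire.MsgTx{}
	err := db.View(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket(exampleChanBucket)
		if bucket == nil {
			return errNoCloseTx
		}
		raw := bucket.Get(exampleCloseKey)
		if raw == nil {
			return errNoCloseTx
		}
		return closeTx.Deserialize(bytes.NewReader(raw))
	})
	if err != nil {
		return nil, err
	}
	return closeTx, nil
}
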

@@ -992,9 +992,28 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
PreviousOutPoint: channel.FundingOutpoint,
},
)
if err := channel.MarkCommitmentBroadcasted(closeTx); err != nil {
t.Fatalf("unable to mark commitment broadcast: %v", err)
}
// Now try marking a coop close with a nil tx. This should
// succeed, but the tx shouldn't exist when queried.
if err = channel.MarkCoopBroadcasted(nil); err != nil {
t.Fatalf("unable to mark nil coop broadcast: %v", err)
}
_, err := channel.BroadcastedCooperative()
if err != ErrNoCloseTx {
t.Fatalf("expected no closing tx error, got: %v", err)
}
// Finally, modify the close tx deterministically and also mark
// it as coop closed. Later we will test that distinct
// transactions are returned for both coop and force closes.
closeTx.TxIn[0].PreviousOutPoint.Index ^= 1
if err := channel.MarkCoopBroadcasted(closeTx); err != nil {
t.Fatalf("unable to mark coop broadcast: %v", err)
}
}
// Now, we'll fetch all the channels waiting to be closed from the
@@ -1004,7 +1023,7 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
if err != nil {
t.Fatalf("unable to fetch all waiting close channels: %v", err)
}
if len(waitingCloseChannels) != 2 {
if len(waitingCloseChannels) != numChannels {
t.Fatalf("expected %d channels waiting to be closed, got %d", 2,
len(waitingCloseChannels))
}
@@ -1018,17 +1037,31 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
channel.FundingOutpoint)
}
// Finally, make sure we can retrieve the closing tx for the
// channel.
closeTx, err := channel.BroadcastedCommitment()
chanPoint := channel.FundingOutpoint
// Assert that the force close transaction is retrievable.
forceCloseTx, err := channel.BroadcastedCommitment()
if err != nil {
t.Fatalf("Unable to retrieve commitment: %v", err)
}
if closeTx.TxIn[0].PreviousOutPoint != channel.FundingOutpoint {
if forceCloseTx.TxIn[0].PreviousOutPoint != chanPoint {
t.Fatalf("expected outpoint %v, got %v",
channel.FundingOutpoint,
closeTx.TxIn[0].PreviousOutPoint)
chanPoint,
forceCloseTx.TxIn[0].PreviousOutPoint)
}
// Assert that the coop close transaction is retrievable.
coopCloseTx, err := channel.BroadcastedCooperative()
if err != nil {
t.Fatalf("unable to retrieve coop close: %v", err)
}
chanPoint.Index ^= 1
if coopCloseTx.TxIn[0].PreviousOutPoint != chanPoint {
t.Fatalf("expected outpoint %v, got %v",
chanPoint,
coopCloseTx.TxIn[0].PreviousOutPoint)
}
}
}

@@ -432,35 +432,11 @@ func (c *ChainArbitrator) Start() error {
c.activeChannels[chanPoint] = channelArb
// If the channel has had its commitment broadcasted already,
// republish it in case it didn't propagate.
if !channel.HasChanStatus(
channeldb.ChanStatusCommitBroadcasted,
) {
continue
}
closeTx, err := channel.BroadcastedCommitment()
switch {
// This can happen for channels that had their closing tx
// published before we started storing it to disk.
case err == channeldb.ErrNoCloseTx:
log.Warnf("Channel %v is in state CommitBroadcasted, "+
"but no closing tx to re-publish...", chanPoint)
continue
case err != nil:
// Republish any closing transactions for this channel.
err = c.publishClosingTxs(channel)
if err != nil {
return err
}
log.Infof("Re-publishing closing tx(%v) for channel %v",
closeTx.TxHash(), chanPoint)
err = c.cfg.PublishTx(closeTx)
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Warnf("Unable to broadcast close tx(%v): %v",
closeTx.TxHash(), err)
}
}
// In addition to the channels that we know to be open, we'll also
@@ -570,6 +546,90 @@ func (c *ChainArbitrator) Start() error {
return nil
}
// publishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
func (c *ChainArbitrator) publishClosingTxs(
channel *channeldb.OpenChannel) error {
// If the channel has had its unilateral close broadcasted already,
// republish it in case it didn't propagate.
if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
err := c.rebroadcast(
channel, channeldb.ChanStatusCommitBroadcasted,
)
if err != nil {
return err
}
}
// If the channel has had its cooperative close broadcasted
// already, republish it in case it didn't propagate.
if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
err := c.rebroadcast(
channel, channeldb.ChanStatusCoopBroadcasted,
)
if err != nil {
return err
}
}
return nil
}
// rebroadcast is a helper method which will republish the unilateral or
// cooperative close transaction of a channel in a particular state.
//
// NOTE: There is no risk in calling this method if the channel isn't in either
// CommitBroadcasted or CoopBroadcasted, but the logs will be misleading.
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
state channeldb.ChannelStatus) error {
chanPoint := channel.FundingOutpoint
var (
closeTx *wire.MsgTx
kind string
err error
)
switch state {
case channeldb.ChanStatusCommitBroadcasted:
kind = "force"
closeTx, err = channel.BroadcastedCommitment()
case channeldb.ChanStatusCoopBroadcasted:
kind = "coop"
closeTx, err = channel.BroadcastedCooperative()
default:
return fmt.Errorf("unknown closing state: %v", state)
}
switch {
// This can happen for channels that had their closing tx published
// before we started storing it to disk.
case err == channeldb.ErrNoCloseTx:
log.Warnf("Channel %v is in state %v, but no %s closing tx "+
"to re-publish...", chanPoint, state, kind)
return nil
case err != nil:
return err
}
log.Infof("Re-publishing %s close tx(%v) for channel %v",
kind, closeTx.TxHash(), chanPoint)
err = c.cfg.PublishTx(closeTx)
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Warnf("Unable to broadcast %s close tx(%v): %v",
kind, closeTx.TxHash(), err)
}
return nil
}
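
Swallowing lnwallet.ErrDoubleSpend above is what makes the republish safe to run on every startup: the rejection usually just means the tx (or a conflicting close) already reached the mempool or the chain. A hedged sketch of that tolerance in isolation (publishIdempotent is illustrative, not part of lnd):

package example

import (
	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/lnwallet"
)

// publishIdempotent rebroadcasts closeTx and treats a double-spend
// rejection as success, so repeated startups can re-run it without
// failing once the close has propagated.
func publishIdempotent(publishTx func(*wire.MsgTx) error,
	closeTx *wire.MsgTx) error {

	if err := publishTx(closeTx); err != nil &&
		err != lnwallet.ErrDoubleSpend {

		return err
	}
	return nil
}
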
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.

@@ -12,10 +12,10 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
)
// TestChainArbitratorRepublishCommitment tests that the chain arbitrator will
// republish closing transactions for channels marked CommitBroadcasted in
// the database at startup.
func TestChainArbitratorRepublishCommitment(t *testing.T) {
// TestChainArbitratorRepublishCloses tests that the chain arbitrator will
// republish closing transactions for channels marked CommitBroadcasted or
// CoopBroadcasted in the database at startup.
func TestChainArbitratorRepublishCloses(t *testing.T) {
t.Parallel()
tempPath, err := ioutil.TempDir("", "testdb")
@@ -65,17 +65,22 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = channels[i].MarkCoopBroadcasted(closeTx)
if err != nil {
t.Fatal(err)
}
}
// We keep track of the transactions published by the ChainArbitrator
// at startup.
published := make(map[chainhash.Hash]struct{})
published := make(map[chainhash.Hash]int)
chainArbCfg := ChainArbitratorConfig{
ChainIO: &mockChainIO{},
Notifier: &mockNotifier{},
PublishTx: func(tx *wire.MsgTx) error {
published[tx.TxHash()] = struct{}{}
published[tx.TxHash()]++
return nil
},
}
@@ -103,11 +108,16 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) {
closeTx := channels[i].FundingTxn.Copy()
closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint
_, ok := published[closeTx.TxHash()]
count, ok := published[closeTx.TxHash()]
if !ok {
t.Fatalf("closing tx not re-published")
}
// We expect one coop close and one force close.
if count != 2 {
t.Fatalf("expected 2 closing txns, only got %d", count)
}
delete(published, closeTx.TxHash())
}

@@ -32,6 +32,7 @@ import (
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
@@ -332,10 +333,33 @@ func assertChannelClosed(ctx context.Context, t *harnessTest,
}
chanPointStr := fmt.Sprintf("%v:%v", txid, fundingChanPoint.OutputIndex)
// If the channel appears in list channels, ensure that its state
// contains ChanStatusCoopBroadcasted.
ctxt, _ := context.WithTimeout(ctx, defaultTimeout)
listChansRequest := &lnrpc.ListChannelsRequest{}
listChansResp, err := node.ListChannels(ctxt, listChansRequest)
if err != nil {
t.Fatalf("unable to query for list channels: %v", err)
}
for _, channel := range listChansResp.Channels {
// Skip other channels.
if channel.ChannelPoint != chanPointStr {
continue
}
// Assert that the channel's status flags contain ChanStatusCoopBroadcasted.
if !strings.Contains(channel.ChanStatusFlags,
channeldb.ChanStatusCoopBroadcasted.String()) {
t.Fatalf("channel not coop broadcasted, "+
"got: %v", channel.ChanStatusFlags)
}
}
// At this point, the channel should now be marked as being in the
// state of "waiting close".
ctxt, _ = context.WithTimeout(ctx, defaultTimeout)
pendingChansRequest := &lnrpc.PendingChannelsRequest{}
pendingChanResp, err := node.PendingChannels(ctx, pendingChansRequest)
pendingChanResp, err := node.PendingChannels(ctxt, pendingChansRequest)
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}

@@ -6394,6 +6394,16 @@ func (lc *LightningChannel) MarkCommitmentBroadcasted(tx *wire.MsgTx) error {
return lc.channelState.MarkCommitmentBroadcasted(tx)
}
// MarkCoopBroadcasted marks the channel to indicate that a cooperative close
// transaction has been broadcast, and that we should watch the chain for it
// to confirm before taking any further action.
func (lc *LightningChannel) MarkCoopBroadcasted(tx *wire.MsgTx) error {
lc.Lock()
defer lc.Unlock()
return lc.channelState.MarkCoopBroadcasted(tx)
}
// MarkDataLoss sets the channel status to LocalDataLoss and stores the
// passed commitPoint for use to retrieve funds in case the remote force closes
// the channel.

@@ -463,6 +463,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
fallthrough
case dbChan.HasChanStatus(channeldb.ChanStatusCommitBroadcasted):
fallthrough
case dbChan.HasChanStatus(channeldb.ChanStatusCoopBroadcasted):
fallthrough
case dbChan.HasChanStatus(channeldb.ChanStatusLocalDataLoss):
peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+
"start.", chanPoint, dbChan.ChanStatus())