contractcourt: extend the chainWatcher to be able to detect co-op closes

In this commit, we extend the chainWatcher to be able to automatically
detect co-op closes of the channel. With this change, it’s now fully
encompassed so able to detect all types of closes on-chain. We detect a
co-op close due to the sequence number being finalized, as well as
paying to us directly in a regular p2wkh-like output.
This commit is contained in:
Olaoluwa Osuntokun 2018-01-19 17:12:08 -08:00
parent ebb4c84b32
commit 62f951a969
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -3,12 +3,17 @@ package contractcourt
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
) )
// ChainEventSubscription is a struct that houses a subscription to be notified // ChainEventSubscription is a struct that houses a subscription to be notified
@ -17,7 +22,7 @@ import (
// channel closure, and a channel breach. The fourth type: a force close is // channel closure, and a channel breach. The fourth type: a force close is
// locally initiated, so we don't provide any event stream for said event. // locally initiated, so we don't provide any event stream for said event.
type ChainEventSubscription struct { type ChainEventSubscription struct {
// ChanPoint is that channel that chain events will be dispatched fo. // ChanPoint is that channel that chain events will be dispatched for.
ChanPoint wire.OutPoint ChanPoint wire.OutPoint
// UnilateralClosure is a channel that will be sent upon in the event that // UnilateralClosure is a channel that will be sent upon in the event that
@ -54,6 +59,9 @@ type ChainEventSubscription struct {
// that the channel has been closed, and also give them the materials necessary // that the channel has been closed, and also give them the materials necessary
// to sweep the funds of the channel on chain eventually. // to sweep the funds of the channel on chain eventually.
type chainWatcher struct { type chainWatcher struct {
started int32
stopped int32
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -90,6 +98,21 @@ type chainWatcher struct {
// clientSubscriptions is a map that keeps track of all the active // clientSubscriptions is a map that keeps track of all the active
// client subscriptions for events related to this channel. // client subscriptions for events related to this channel.
clientSubscriptions map[uint64]*ChainEventSubscription clientSubscriptions map[uint64]*ChainEventSubscription
// possibleCloses is a map from cooperative closing transaction txid to
// a close summary that describes the nature of the channel closure.
// We'll use this map to keep track of all possible channel closures to
// ensure out db state is correct in the end.
possibleCloses map[chainhash.Hash]*channeldb.ChannelCloseSummary
// markChanClosed is a method that will be called by the watcher if it
// detects that a cooperative closure transaction has successfully been
// confirmed.
markChanClosed func() error
// isOurAddr is a function that returns true if the passed address is
// known to us.
isOurAddr func(btcutil.Address) bool
} }
// newChainWatcher returns a new instance of a chainWatcher for a channel given // newChainWatcher returns a new instance of a chainWatcher for a channel given
@ -97,7 +120,8 @@ type chainWatcher struct {
// detect on chain events. // detect on chain events.
func newChainWatcher(chanState *channeldb.OpenChannel, func newChainWatcher(chanState *channeldb.OpenChannel,
notifier chainntnfs.ChainNotifier, pCache WitnessBeacon, notifier chainntnfs.ChainNotifier, pCache WitnessBeacon,
signer lnwallet.Signer) (*chainWatcher, error) { signer lnwallet.Signer, isOurAddr func(btcutil.Address) bool,
markChanClosed func() error) (*chainWatcher, error) {
// In order to be able to detect the nature of a potential channel // In order to be able to detect the nature of a potential channel
// closure we'll need to reconstruct the state hint bytes used to // closure we'll need to reconstruct the state hint bytes used to
@ -121,15 +145,22 @@ func newChainWatcher(chanState *channeldb.OpenChannel,
stateHintObfuscator: stateHint, stateHintObfuscator: stateHint,
notifier: notifier, notifier: notifier,
pCache: pCache, pCache: pCache,
markChanClosed: markChanClosed,
signer: signer, signer: signer,
quit: make(chan struct{}), quit: make(chan struct{}),
clientSubscriptions: make(map[uint64]*ChainEventSubscription), clientSubscriptions: make(map[uint64]*ChainEventSubscription),
isOurAddr: isOurAddr,
possibleCloses: make(map[chainhash.Hash]*channeldb.ChannelCloseSummary),
}, nil }, nil
} }
// Start starts all goroutines that the chainWatcher needs to perform its // Start starts all goroutines that the chainWatcher needs to perform its
// duties. // duties.
func (c *chainWatcher) Start() error { func (c *chainWatcher) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
}
log.Debugf("Starting chain watcher for ChannelPoint(%v)", log.Debugf("Starting chain watcher for ChannelPoint(%v)",
c.chanState.FundingOutpoint) c.chanState.FundingOutpoint)
@ -162,6 +193,10 @@ func (c *chainWatcher) Start() error {
// Stop signals the close observer to gracefully exit. // Stop signals the close observer to gracefully exit.
func (c *chainWatcher) Stop() error { func (c *chainWatcher) Stop() error {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return nil
}
close(c.quit) close(c.quit)
c.wg.Wait() c.wg.Wait()
@ -217,6 +252,10 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) {
// We've detected a spend of the channel onchain! Depending on // We've detected a spend of the channel onchain! Depending on
// the type of spend, we'll act accordingly , so we'll examine // the type of spend, we'll act accordingly , so we'll examine
// the spending transaction to determine what we should do. // the spending transaction to determine what we should do.
//
// TODO(Roasbeef): need to be able to ensure this only triggers
// on confirmation, to ensure if multiple txns are broadcast, we
// act on the one that's timestamped
case commitSpend, ok := <-spendNtfn.Spend: case commitSpend, ok := <-spendNtfn.Spend:
// If the channel was closed, then this means that the // If the channel was closed, then this means that the
// notifier exited, so we will as well. // notifier exited, so we will as well.
@ -260,9 +299,16 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) {
// Next, we'll check to see if this is a cooperative // Next, we'll check to see if this is a cooperative
// channel closure or not. This is characterized by // channel closure or not. This is characterized by
// // having an input sequence number that's finalized.
// TODO(roasbeef): check to see if txid amongst those // This won't happen with regular commitment
// that we know are co-op channel closes // transactions due to the state hint encoding scheme.
if commitTxBroadcast.TxIn[0].Sequence == wire.MaxTxInSequenceNum {
err := c.dispatchCooperativeClose(commitSpend)
if err != nil {
log.Errorf("unable to handle co op close: %v", err)
}
return
}
log.Warnf("Unprompted commitment broadcast for "+ log.Warnf("Unprompted commitment broadcast for "+
"ChannelPoint(%v) ", c.chanState.FundingOutpoint) "ChannelPoint(%v) ", c.chanState.FundingOutpoint)
@ -325,6 +371,117 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) {
} }
} }
// toSelfAmount takes a transaction and returns the sum of all outputs that pay
// to a script that the wallet controls. If no outputs pay to us, then we
// return zero. This is possible as our output may have been trimmed due to
// being dust.
func (c *chainWatcher) toSelfAmount(tx *wire.MsgTx) btcutil.Amount {
var selfAmt btcutil.Amount
for _, txOut := range tx.TxOut {
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
// Doesn't matter what net we actually pass in.
txOut.PkScript, &chaincfg.TestNet3Params,
)
if err != nil {
continue
}
for _, addr := range addrs {
if c.isOurAddr(addr) {
selfAmt += btcutil.Amount(txOut.Value)
}
}
}
return selfAmt
}
// dispatchCooperativeClose processed a detect cooperative channel closure.
// We'll use the spending transaction to locate our output within the
// transaction, then clean up the database state. We'll also dispatch a
// notification to all subscribers that the channel has been closed in this
// manner.
func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDetail) error {
broadcastTx := commitSpend.SpendingTx
log.Infof("Cooperative closure for ChannelPoint(%v): %v",
c.chanState.FundingOutpoint, spew.Sdump(broadcastTx))
// If the input *is* final, then we'll check to see which output is
// ours.
localAmt := c.toSelfAmount(broadcastTx)
// Once this is known, we'll mark the state as pending close in the
// database.
closeSummary := &channeldb.ChannelCloseSummary{
ChanPoint: c.chanState.FundingOutpoint,
ChainHash: c.chanState.ChainHash,
ClosingTXID: *commitSpend.SpenderTxHash,
RemotePub: c.chanState.IdentityPub,
Capacity: c.chanState.Capacity,
CloseHeight: uint32(commitSpend.SpendingHeight),
SettledBalance: localAmt,
CloseType: channeldb.CooperativeClose,
ShortChanID: c.chanState.ShortChanID,
IsPending: true,
}
err := c.chanState.CloseChannel(closeSummary)
if err != nil && err != channeldb.ErrNoActiveChannels &&
err != channeldb.ErrNoChanDBExists {
return fmt.Errorf("unable to close chan state: %v", err)
}
// Finally, we'll launch a goroutine to mark the channel as fully
// closed once the transaction confirmed.
go func() {
confNtfn, err := c.notifier.RegisterConfirmationsNtfn(
commitSpend.SpenderTxHash, 1,
uint32(commitSpend.SpendingHeight),
)
if err != nil {
log.Errorf("unable to register for conf: %v", err)
return
}
log.Infof("Waiting for txid=%v to close ChannelPoint(%v) on chain",
commitSpend.SpenderTxHash, c.chanState.FundingOutpoint)
select {
case confInfo, ok := <-confNtfn.Confirmed:
if !ok {
log.Errorf("notifier exiting")
return
}
log.Infof("ChannelPoint(%v) is fully closed, at height: %v",
c.chanState.FundingOutpoint, confInfo.BlockHeight)
err := c.markChanClosed()
if err != nil {
log.Errorf("unable to mark chan fully "+
"closed: %v", err)
return
}
case <-c.quit:
return
}
}()
c.Lock()
for _, sub := range c.clientSubscriptions {
select {
case sub.CooperativeClosure <- struct{}{}:
case <-c.quit:
return fmt.Errorf("exiting")
}
}
c.Unlock()
return nil
}
// dispatchRemoteClose processes a detected unilateral channel closure by the // dispatchRemoteClose processes a detected unilateral channel closure by the
// remote party. This function will prepare a UnilateralCloseSummary which will // remote party. This function will prepare a UnilateralCloseSummary which will
// then be sent to any subscribers allowing them to resolve all our funds in // then be sent to any subscribers allowing them to resolve all our funds in