peer+contractcourt: delegate watching for co-op closes to the chainWatcher

In this commit, we modify the interaction between the chanCloser
sub-system and the chain notifier all together. This fixes a series of
bugs as before this commit, we wouldn’t be able to detect if the remote
party actually broadcasted *any* of the transactions that we signed off
upon. This would be rejected to the user by having a “zombie” channel
close that would never actually be resolved.

Rather than the chanCloser watching for on-chain closes, we’ll now open
up a co-op close context to the chainWatcher (via a layer of
indirection via the ChainArbitrator), and report to it all possible
closes that we’ve signed. The chainWatcher will then be able to launch
a goroutine to properly update the database state once any of the
possible closure transactions confirms.
This commit is contained in:
Olaoluwa Osuntokun 2018-01-19 17:23:38 -08:00
parent 1604c75a9c
commit 3ec83cc82f
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
5 changed files with 201 additions and 32 deletions

@ -6,6 +6,7 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@ -141,6 +142,8 @@ type channelCloser struct {
// TODO(roasbeef): abstract away // TODO(roasbeef): abstract away
closeReq *htlcswitch.ChanClose closeReq *htlcswitch.ChanClose
closeCtx *contractcourt.CooperativeCloseCtx
// localDeliveryScript is the script that we'll send our settled // localDeliveryScript is the script that we'll send our settled
// channel funds to. // channel funds to.
localDeliveryScript []byte localDeliveryScript []byte
@ -155,7 +158,8 @@ type channelCloser struct {
// only be populated iff, we're the initiator of this closing request. // only be populated iff, we're the initiator of this closing request.
func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte, func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte,
idealFeePerkw btcutil.Amount, negotiationHeight uint32, idealFeePerkw btcutil.Amount, negotiationHeight uint32,
closeReq *htlcswitch.ChanClose) *channelCloser { closeReq *htlcswitch.ChanClose,
closeCtx *contractcourt.CooperativeCloseCtx) *channelCloser {
// Given the target fee-per-kw, we'll compute what our ideal _total_ // Given the target fee-per-kw, we'll compute what our ideal _total_
// fee will be starting at for this fee negotiation. // fee will be starting at for this fee negotiation.
@ -191,6 +195,7 @@ func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte,
cfg: cfg, cfg: cfg,
negotiationHeight: negotiationHeight, negotiationHeight: negotiationHeight,
idealFeeSat: idealFeeSat, idealFeeSat: idealFeeSat,
closeCtx: closeCtx,
localDeliveryScript: deliveryScript, localDeliveryScript: deliveryScript,
priorFeeOffers: make(map[btcutil.Amount]*lnwire.ClosingSigned), priorFeeOffers: make(map[btcutil.Amount]*lnwire.ClosingSigned),
} }
@ -459,19 +464,20 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
// being closed within the database. // being closed within the database.
closingTxid := closeTx.TxHash() closingTxid := closeTx.TxHash()
chanInfo := c.cfg.channel.StateSnapshot() chanInfo := c.cfg.channel.StateSnapshot()
closeSummary := &channeldb.ChannelCloseSummary{ c.closeCtx.Finalize(&channeldb.ChannelCloseSummary{
ChanPoint: c.chanPoint, ChanPoint: c.chanPoint,
ChainHash: chanInfo.ChainHash, ChainHash: chanInfo.ChainHash,
ClosingTXID: closingTxid, ClosingTXID: closingTxid,
CloseHeight: c.negotiationHeight,
RemotePub: &chanInfo.RemoteIdentity, RemotePub: &chanInfo.RemoteIdentity,
Capacity: chanInfo.Capacity, Capacity: chanInfo.Capacity,
SettledBalance: finalLocalBalance, SettledBalance: finalLocalBalance,
CloseType: channeldb.CooperativeClose, CloseType: channeldb.CooperativeClose,
ShortChanID: c.cfg.channel.ShortChanID(),
IsPending: true, IsPending: true,
} })
if err := c.cfg.channel.DeleteState(closeSummary); err != nil {
return nil, false, err // TODO(roasbeef): don't need, ChainWatcher will handle
}
c.state = closeFinished c.state = closeFinished
@ -507,7 +513,8 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
// transaction for a channel based on the prior fee negotiations and our // transaction for a channel based on the prior fee negotiations and our
// current compromise fee. // current compromise fee.
func (c *channelCloser) proposeCloseSigned(fee btcutil.Amount) (*lnwire.ClosingSigned, error) { func (c *channelCloser) proposeCloseSigned(fee btcutil.Amount) (*lnwire.ClosingSigned, error) {
rawSig, err := c.cfg.channel.CreateCloseProposal(
rawSig, txid, localAmt, err := c.cfg.channel.CreateCloseProposal(
fee, c.localDeliveryScript, c.remoteDeliveryScript, fee, c.localDeliveryScript, c.remoteDeliveryScript,
) )
if err != nil { if err != nil {
@ -535,6 +542,20 @@ func (c *channelCloser) proposeCloseSigned(fee btcutil.Amount) (*lnwire.ClosingS
// accepts our offer. This way, we don't have to re-sign. // accepts our offer. This way, we don't have to re-sign.
c.priorFeeOffers[fee] = closeSignedMsg c.priorFeeOffers[fee] = closeSignedMsg
chanInfo := c.cfg.channel.StateSnapshot()
c.closeCtx.LogPotentialClose(&channeldb.ChannelCloseSummary{
ChanPoint: c.chanPoint,
ChainHash: chanInfo.ChainHash,
ClosingTXID: *txid,
CloseHeight: c.negotiationHeight,
RemotePub: &chanInfo.RemoteIdentity,
Capacity: chanInfo.Capacity,
SettledBalance: localAmt,
CloseType: channeldb.CooperativeClose,
ShortChanID: c.cfg.channel.ShortChanID(),
IsPending: true,
})
return closeSignedMsg, nil return closeSignedMsg, nil
} }

@ -617,3 +617,139 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
return c.chanState.CloseChannel(&closeSummary) return c.chanState.CloseChannel(&closeSummary)
} }
// CooperativeCloseContext is a transactional object that's used by external
// parties to initiate a cooperative closure negotiation. During the
// negotiation, we sign multiple versions of a closing transaction, either of
// which may be counter signed and broadcast by the remote party at any time.
// As a result, we'll need to watch the chain to see if any of these confirm,
// only afterwards will we mark the channel as fully closed.
type CooperativeCloseCtx struct {
// potentialCloses is a channel will be used by the party negotiating
// the cooperative closure to send possible closing states to the chain
// watcher to ensure we detect all on-chain spends.
potentialCloses chan *channeldb.ChannelCloseSummary
activeCloses map[chainhash.Hash]struct{}
watchCancel chan struct{}
watcher *chainWatcher
sync.Mutex
}
// BeginCooperativeClose should be called by the party negotiating the
// cooperative closure before the first signature is sent to the remote party.
// This will return a context that should be used to communicate possible
// closing states so we can act on them.
func (c *chainWatcher) BeginCooperativeClose() *CooperativeCloseCtx {
// We'll simply return a new close context that will be used be the
// caller to notify us of potential closes.
return &CooperativeCloseCtx{
potentialCloses: make(chan *channeldb.ChannelCloseSummary),
watchCancel: make(chan struct{}),
activeCloses: make(map[chainhash.Hash]struct{}),
watcher: c,
}
}
// LogPotentialClose should be called by the party negotiating the cooperative
// closure once they signed a new state, but *before* they transmit it to the
// remote party. This will ensure that the chain watcher is able to log the new
// state it should watch the chain for.
func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.ChannelCloseSummary) {
c.Lock()
defer c.Unlock()
// We'll check to see if we're already watching for a close of this
// channel, if so, then we'll exit early to avoid launching a duplicate
// goroutine.
if _, ok := c.activeCloses[potentialClose.ClosingTXID]; ok {
return
}
// Otherwise, we'll mark this txid as currently being watched.
c.activeCloses[potentialClose.ClosingTXID] = struct{}{}
// We'll take this potential close, and launch a goroutine which will
// wait until it's confirmed, then update the database state. When a
// potential close gets confirmed, we'll cancel out all other launched
// goroutines.
go func() {
confNtfn, err := c.watcher.notifier.RegisterConfirmationsNtfn(
&potentialClose.ClosingTXID, 1,
uint32(potentialClose.CloseHeight),
)
if err != nil {
log.Errorf("unable to register for conf: %v", err)
return
}
log.Infof("Waiting for txid=%v to close ChannelPoint(%v) on chain",
potentialClose.ClosingTXID, c.watcher.chanState.FundingOutpoint)
select {
case confInfo, ok := <-confNtfn.Confirmed:
if !ok {
log.Errorf("notifier exiting")
return
}
log.Infof("ChannelPoint(%v) is fully closed, at "+
"height: %v", c.watcher.chanState.FundingOutpoint,
confInfo.BlockHeight)
close(c.watchCancel)
c.watcher.Lock()
for _, sub := range c.watcher.clientSubscriptions {
select {
case sub.CooperativeClosure <- struct{}{}:
case <-c.watcher.quit:
}
}
c.watcher.Unlock()
err := c.watcher.chanState.CloseChannel(potentialClose)
if err != nil {
log.Warnf("unable to update latest close for "+
"ChannelPoint(%v)",
c.watcher.chanState.FundingOutpoint)
}
err = c.watcher.markChanClosed()
if err != nil {
log.Errorf("unable to mark chan fully "+
"closed: %v", err)
return
}
case <-c.watchCancel:
log.Debugf("Exiting watch for close of txid=%v for "+
"ChannelPoint(%v)", potentialClose.ClosingTXID,
c.watcher.chanState.FundingOutpoint)
case <-c.watcher.quit:
return
}
}()
}
// Finalize should be called once both parties agree on a final transaction to
// close out the channel. This method will immediately mark the channel as
// pending closed in the database, then launch a goroutine to mark the channel
// fully closed upon confirmation.
func (c *CooperativeCloseCtx) Finalize(preferredClose *channeldb.ChannelCloseSummary) error {
log.Infof("Finalizing chan close for ChannelPoint(%v)",
c.watcher.chanState.FundingOutpoint)
err := c.watcher.chanState.CloseChannel(preferredClose)
if err != nil {
return err
}
go c.LogPotentialClose(preferredClose)
return nil
}

@ -230,7 +230,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{}, nil return lnwire.NodeAnnouncement{}, nil
}, },
ArbiterChan: arbiterChan,
SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error { SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error {
select { select {
case sentMessages <- msgs[0]: case sentMessages <- msgs[0]:
@ -257,7 +256,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
return lnwallet.NewLightningChannel( return lnwallet.NewLightningChannel(
signer, signer,
nil, nil,
nil,
channel) channel)
} }
} }
@ -271,7 +269,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
RequiredRemoteDelay: func(amt btcutil.Amount) uint16 { RequiredRemoteDelay: func(amt btcutil.Amount) uint16 {
return 4 return 4
}, },
ArbitrateNewChan: func(*channeldb.OpenChannel) error { WatchNewChannel: func(*channeldb.OpenChannel) error {
return nil return nil
}, },
}) })
@ -330,7 +328,6 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{}, nil return lnwire.NodeAnnouncement{}, nil
}, },
ArbiterChan: oldCfg.ArbiterChan,
SendToPeer: func(target *btcec.PublicKey, SendToPeer: func(target *btcec.PublicKey,
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
select { select {

41
peer.go

@ -1425,6 +1425,18 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e
return nil, err return nil, err
} }
// Before we create the chan closer, we'll start a new
// cooperative channel closure transaction from the chain arb.
// Wtih this context, we'll ensure that we're able to respond
// if *any* of the transactions we sign off on are ever
// braodacast.
closeCtx, err := p.server.chainArb.BeginCoopChanClose(
*channel.ChannelPoint(),
)
if err != nil {
return nil, err
}
chanCloser = newChannelCloser( chanCloser = newChannelCloser(
chanCloseCfg{ chanCloseCfg{
channel: channel, channel: channel,
@ -1437,6 +1449,7 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e
targetFeePerKw, targetFeePerKw,
uint32(startingHeight), uint32(startingHeight),
nil, nil,
closeCtx,
) )
p.activeChanCloses[chanID] = chanCloser p.activeChanCloses[chanID] = chanCloser
} }
@ -1479,7 +1492,14 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
return return
} }
_, startingHeight, err := p.server.cc.chainIO.GetBestBlock() // Before we create the chan closer, we'll start a new
// cooperative channel closure transaction from the chain arb.
// Wtih this context, we'll ensure that we're able to respond
// if *any* of the transactions we sign off on are ever
// braodacast.
closeCtx, err := p.server.chainArb.BeginCoopChanClose(
*channel.ChannelPoint(),
)
if err != nil { if err != nil {
peerLog.Errorf(err.Error()) peerLog.Errorf(err.Error())
req.Err <- err req.Err <- err
@ -1488,6 +1508,12 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
// Next, we'll create a new channel closer state machine to // Next, we'll create a new channel closer state machine to
// handle the close negotiation. // handle the close negotiation.
_, startingHeight, err := p.server.cc.chainIO.GetBestBlock()
if err != nil {
peerLog.Errorf(err.Error())
req.Err <- err
return
}
chanCloser := newChannelCloser( chanCloser := newChannelCloser(
chanCloseCfg{ chanCloseCfg{
channel: channel, channel: channel,
@ -1500,6 +1526,7 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
req.TargetFeePerKw, req.TargetFeePerKw,
uint32(startingHeight), uint32(startingHeight),
req, req,
closeCtx,
) )
p.activeChanCloses[chanID] = chanCloser p.activeChanCloses[chanID] = chanCloser
@ -1591,18 +1618,6 @@ func (p *peer) finalizeChanClosure(chanCloser *channelCloser) {
go waitForChanToClose(chanCloser.negotiationHeight, notifier, errChan, go waitForChanToClose(chanCloser.negotiationHeight, notifier, errChan,
chanPoint, &closingTxid, func() { chanPoint, &closingTxid, func() {
// First, we'll mark the database as being fully closed
// so we'll no longer watch for its ultimate closure
// upon startup.
err := p.server.chanDB.MarkChanFullyClosed(chanPoint)
if err != nil {
if closeReq != nil {
closeReq.Err <- err
}
return
}
// Respond to the local subsystem which requested the // Respond to the local subsystem which requested the
// channel closure. // channel closure.
if closeReq != nil { if closeReq != nil {

@ -88,7 +88,7 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
// We accept the fee, and send a ClosingSigned with the same fee back, // We accept the fee, and send a ClosingSigned with the same fee back,
// so she knows we agreed. // so she knows we agreed.
peerFee := responderClosingSigned.FeeSatoshis peerFee := responderClosingSigned.FeeSatoshis
initiatorSig, err := initiatorChan.CreateCloseProposal( initiatorSig, _, _, err := initiatorChan.CreateCloseProposal(
peerFee, dummyDeliveryScript, respDeliveryScript, peerFee, dummyDeliveryScript, respDeliveryScript,
) )
if err != nil { if err != nil {
@ -178,7 +178,7 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
t.Fatalf("unable to query fee estimator: %v", err) t.Fatalf("unable to query fee estimator: %v", err)
} }
fee := btcutil.Amount(responderChan.CalcFee(uint64(feeRate * 1000))) fee := btcutil.Amount(responderChan.CalcFee(uint64(feeRate * 1000)))
closeSig, err := responderChan.CreateCloseProposal(fee, closeSig, _, _, err := responderChan.CreateCloseProposal(fee,
dummyDeliveryScript, initiatorDeliveryScript) dummyDeliveryScript, initiatorDeliveryScript)
if err != nil { if err != nil {
t.Fatalf("unable to create close proposal: %v", err) t.Fatalf("unable to create close proposal: %v", err)
@ -287,7 +287,7 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
// We don't agree with the fee, and will send back one that's 2.5x. // We don't agree with the fee, and will send back one that's 2.5x.
preferredRespFee := responderClosingSigned.FeeSatoshis preferredRespFee := responderClosingSigned.FeeSatoshis
increasedFee := btcutil.Amount(float64(preferredRespFee) * 2.5) increasedFee := btcutil.Amount(float64(preferredRespFee) * 2.5)
initiatorSig, err := initiatorChan.CreateCloseProposal( initiatorSig, _, _, err := initiatorChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, respDeliveryScript, increasedFee, dummyDeliveryScript, respDeliveryScript,
) )
if err != nil { if err != nil {
@ -331,7 +331,7 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
// We try negotiating a 2.1x fee, which should also be rejected. // We try negotiating a 2.1x fee, which should also be rejected.
increasedFee = btcutil.Amount(float64(preferredRespFee) * 2.1) increasedFee = btcutil.Amount(float64(preferredRespFee) * 2.1)
initiatorSig, err = initiatorChan.CreateCloseProposal( initiatorSig, _, _, err = initiatorChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, respDeliveryScript, increasedFee, dummyDeliveryScript, respDeliveryScript,
) )
if err != nil { if err != nil {
@ -376,7 +376,7 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
// Finally, we'll accept the fee by echoing back the same fee that they // Finally, we'll accept the fee by echoing back the same fee that they
// sent to us. // sent to us.
initiatorSig, err = initiatorChan.CreateCloseProposal( initiatorSig, _, _, err = initiatorChan.CreateCloseProposal(
peerFee, dummyDeliveryScript, respDeliveryScript, peerFee, dummyDeliveryScript, respDeliveryScript,
) )
if err != nil { if err != nil {
@ -471,7 +471,7 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
uint64(initiatorIdealFeeRate * 1000), uint64(initiatorIdealFeeRate * 1000),
) )
increasedFee := btcutil.Amount(float64(initiatorIdealFee) * 2.5) increasedFee := btcutil.Amount(float64(initiatorIdealFee) * 2.5)
closeSig, err := responderChan.CreateCloseProposal( closeSig, _, _, err := responderChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, initiatorDeliveryScript, increasedFee, dummyDeliveryScript, initiatorDeliveryScript,
) )
if err != nil { if err != nil {
@ -536,7 +536,7 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
// We try negotiating a 2.1x fee, which should also be rejected. // We try negotiating a 2.1x fee, which should also be rejected.
increasedFee = btcutil.Amount(float64(initiatorIdealFee) * 2.1) increasedFee = btcutil.Amount(float64(initiatorIdealFee) * 2.1)
responderSig, err := responderChan.CreateCloseProposal( responderSig, _, _, err := responderChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, initiatorDeliveryScript, increasedFee, dummyDeliveryScript, initiatorDeliveryScript,
) )
if err != nil { if err != nil {
@ -582,7 +582,7 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
// At this point, we'll accept their fee by sending back a CloseSigned // At this point, we'll accept their fee by sending back a CloseSigned
// message with an identical fee. // message with an identical fee.
responderSig, err = responderChan.CreateCloseProposal( responderSig, _, _, err = responderChan.CreateCloseProposal(
peerFee, dummyDeliveryScript, initiatorDeliveryScript, peerFee, dummyDeliveryScript, initiatorDeliveryScript,
) )
if err != nil { if err != nil {