lnd: properly execute force closures kicked off via RPC

This commit includes some slight refactoring to properly execute force
closures which are initiated by RPC clients.

The CloseLink method within the htlcSwitch has been extended to take an
additional parameter which indicates if the link should be closed
forcefully. If so, then the channelManager which dispatches the request
executes a force closure using the target channel state machine. Once
the closing transaction has been broadcast, the summary is sent to the
utxoNursery so the outputs can be swept once they’re mature.
This commit is contained in:
Olaoluwa Osuntokun 2016-09-12 12:42:26 -07:00
parent bfa2a1d698
commit 80b09f7d6f
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 122 additions and 36 deletions

@ -63,7 +63,13 @@ type htlcSwitch struct {
started int32 // atomic started int32 // atomic
shutdown int32 // atomic shutdown int32 // atomic
chanIndex map[wire.OutPoint]*link // chanIndex maps a channel's outpoint to a link which contains
// additional information about the channel, and additionally houses a
// pointer to the peer mangaing the channel.
chanIndex map[wire.OutPoint]*link
// interfaces maps a node's ID to the set of links (active channels) we
// currently have open with that peer.
interfaces map[wire.ShaHash][]*link interfaces map[wire.ShaHash][]*link
// TODO(roasbeef): msgs for dynamic link quality // TODO(roasbeef): msgs for dynamic link quality
@ -395,20 +401,29 @@ func (h *htlcSwitch) UnregisterLink(chanInterface [32]byte, chanPoint *wire.OutP
// closeChanReq represents a request to close a particular channel specified // closeChanReq represents a request to close a particular channel specified
// by its outpoint. // by its outpoint.
type closeLinkReq struct { type closeLinkReq struct {
chanPoint *wire.OutPoint chanPoint *wire.OutPoint
forceClose bool
updates chan *lnrpc.CloseStatusUpdate updates chan *lnrpc.CloseStatusUpdate
err chan error err chan error
} }
// CloseLink closes an active link targetted by it's channel point. Closing the // CloseLink closes an active link targetted by it's channel point. Closing the
// link initiates a cooperative channel closure. // link initiates a cooperative channel closure iff forceClose is false. If
// TODO(roabeef): bool flag for timeout/force // forceClose is true, then a unilateral channel closure is executed.
func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint) (chan *lnrpc.CloseStatusUpdate, chan error) { // TODO(roabeef): bool flag for timeout
func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint,
forceClose bool) (chan *lnrpc.CloseStatusUpdate, chan error) {
updateChan := make(chan *lnrpc.CloseStatusUpdate, 1) updateChan := make(chan *lnrpc.CloseStatusUpdate, 1)
errChan := make(chan error, 1) errChan := make(chan error, 1)
h.linkControl <- &closeLinkReq{chanPoint, updateChan, errChan} h.linkControl <- &closeLinkReq{
chanPoint: chanPoint,
forceClose: forceClose,
updates: updateChan,
err: errChan,
}
return updateChan, errChan return updateChan, errChan
} }

116
peer.go

@ -8,6 +8,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/BitfuryLightning/tools/rt/graph"
"github.com/btcsuite/fastsha256" "github.com/btcsuite/fastsha256"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
@ -19,7 +20,6 @@ import (
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
"github.com/BitfuryLightning/tools/rt/graph"
) )
var ( var (
@ -624,46 +624,102 @@ out:
p.wg.Done() p.wg.Done()
} }
// handleLocalClose kicks-off the workflow to execute a cooperative closure of // executeForceClose executes a unilateral close of the target channel by
// the channel initiated by a local sub-system. // broadcasting the current commitment state directly on-chain. Once the
func (p *peer) handleLocalClose(req *closeLinkReq) { // commitment transaction has been broadcast, a struct describing the final
chanPoint := req.chanPoint // state of the channel is sent to the utxoNursery in order to ultimatley sweep
key := wire.OutPoint{ // the immature outputs.
Hash: chanPoint.Hash, func (p *peer) executeForceClose(channel *lnwallet.LightningChannel) (*wire.ShaHash, error) {
Index: chanPoint.Index, // Execute a unilateral close shutting down all further channel
// operation.
closeSummary, err := channel.ForceClose()
if err != nil {
return nil, err
} }
channel := p.activeChannels[key]
closeTx := closeSummary.CloseTx
txid := closeTx.TxSha()
// With the close transaction in hand, broadcast the transaction to the
// network, thereby entering the psot channel resolution state.
peerLog.Infof("Broadcasting force close transaction: %v",
channel.ChannelPoint(), newLogClosure(func() string {
return spew.Sdump(closeTx)
}))
if err := p.server.lnwallet.PublishTransaction(closeTx); err != nil {
return nil, err
}
// Send the closed channel sumary over to the utxoNursery in order to
// have its outputs sweeped back into the wallet once they're mature.
p.server.utxoNursery.incubateOutputs(closeSummary)
return &txid, nil
}
// executeCooperativeClose executes the initial phase of a user-executed
// cooperative channel close. The channel state machine is transitioned to the
// closing phase, then our half of the closing witness is sent over to the
// remote peer.
func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*wire.ShaHash, error) {
// Shift the channel state machine into a 'closing' state. This // Shift the channel state machine into a 'closing' state. This
// generates a signature for the closing tx, as well as a txid of the // generates a signature for the closing tx, as well as a txid of the
// closing tx itself, allowing us to watch the network to determine // closing tx itself, allowing us to watch the network to determine
// when the remote node broadcasts the fully signed closing transaction. // when the remote node broadcasts the fully signed closing
// transaction.
sig, txid, err := channel.InitCooperativeClose() sig, txid, err := channel.InitCooperativeClose()
if err != nil { if err != nil {
req.err <- err return nil, err
return
} }
peerLog.Infof("Executing cooperative closure of "+
"ChanPoint(%v) with peerID(%v), txid=%v", key, p.id,
txid)
// With our signature for the close tx generated, send the signature chanPoint := channel.ChannelPoint()
// to the remote peer instructing it to close this particular channel peerLog.Infof("Executing cooperative closure of "+
"ChanPoint(%v) with peerID(%v), txid=%v", chanPoint, p.id, txid)
// With our signature for the close tx generated, send the signature to
// the remote peer instructing it to close this particular channel
// point. // point.
// TODO(roasbeef): remove encoding redundancy // TODO(roasbeef): remove encoding redundancy
closeSig, err := btcec.ParseSignature(sig, btcec.S256()) closeSig, err := btcec.ParseSignature(sig, btcec.S256())
if err != nil { if err != nil {
req.err <- err return nil, err
return
} }
closeReq := lnwire.NewCloseRequest(chanPoint, closeSig) closeReq := lnwire.NewCloseRequest(chanPoint, closeSig)
p.queueMsg(closeReq, nil) p.queueMsg(closeReq, nil)
return txid, nil
}
// handleLocalClose kicks-off the workflow to execute a cooperative or forced
// unilateral closure of the channel initiated by a local sub-system.
func (p *peer) handleLocalClose(req *closeLinkReq) {
var (
err error
closingTxid *wire.ShaHash
)
channel := p.activeChannels[*req.chanPoint]
if req.forceClose {
closingTxid, err = p.executeForceClose(channel)
peerLog.Infof("Force closing ChannelPoint(%v) with txid: %v",
req.chanPoint, closingTxid)
} else {
closingTxid, err = p.executeCooperativeClose(channel)
peerLog.Infof("Attempting cooperative close of "+
"ChannelPoint(%v) with txid: %v", req.chanPoint,
closingTxid)
}
if err != nil {
req.err <- err
return
}
// Update the caller w.r.t the current pending state of this request. // Update the caller w.r.t the current pending state of this request.
req.updates <- &lnrpc.CloseStatusUpdate{ req.updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ClosePending{ Update: &lnrpc.CloseStatusUpdate_ClosePending{
ClosePending: &lnrpc.PendingUpdate{ ClosePending: &lnrpc.PendingUpdate{
Txid: txid[:], Txid: closingTxid[:],
}, },
}, },
} }
@ -673,7 +729,8 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// confirmation. // confirmation.
go func() { go func() {
// TODO(roasbeef): add param for num needed confs // TODO(roasbeef): add param for num needed confs
confNtfn, err := p.server.chainNotifier.RegisterConfirmationsNtfn(txid, 1) notifier := p.server.chainNotifier
confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxid, 1)
if err != nil { if err != nil {
req.err <- err req.err <- err
return return
@ -692,7 +749,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// The channel has been closed, remove it from any // The channel has been closed, remove it from any
// active indexes, and the database state. // active indexes, and the database state.
peerLog.Infof("ChannelPoint(%v) is now "+ peerLog.Infof("ChannelPoint(%v) is now "+
"closed at height %v", key, height) "closed at height %v", req.chanPoint, height)
if err := wipeChannel(p, channel); err != nil { if err := wipeChannel(p, channel); err != nil {
req.err <- err req.err <- err
return return
@ -706,7 +763,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
req.updates <- &lnrpc.CloseStatusUpdate{ req.updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{ Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{ ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: txid[:], ClosingTxid: closingTxid[:],
Success: true, Success: true,
}, },
}, },
@ -867,6 +924,19 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
out: out:
for { for {
select { select {
case <-channel.UnilateralCloseSignal:
peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain",
state.chanPoint)
if err := wipeChannel(p, channel); err != nil {
peerLog.Errorf("Unable to wipe channel %v", err)
}
break out
case <-channel.ForceCloseSignal:
peerLog.Warnf("ChannelPoint(%v) has been force "+
"closed, disconnecting from peerID(%x)",
state.chanPoint, p.id)
break out
//p.Disconnect()
// TODO(roasbeef): prevent leaking ticker? // TODO(roasbeef): prevent leaking ticker?
case <-state.logCommitTimer: case <-state.logCommitTimer:
// If we haven't sent or received a new commitment // If we haven't sent or received a new commitment

@ -246,6 +246,7 @@ out:
func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
updateStream lnrpc.Lightning_CloseChannelServer) error { updateStream lnrpc.Lightning_CloseChannelServer) error {
force := in.Force
index := in.ChannelPoint.OutputIndex index := in.ChannelPoint.OutputIndex
txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid) txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid)
if err != nil { if err != nil {
@ -257,7 +258,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)", rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)",
targetChannelPoint) targetChannelPoint)
updateChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint) updateChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint, force)
out: out:
for { for {
@ -279,7 +280,7 @@ out:
switch closeUpdate := closingUpdate.Update.(type) { switch closeUpdate := closingUpdate.Update.(type) {
case *lnrpc.CloseStatusUpdate_ChanClose: case *lnrpc.CloseStatusUpdate_ChanClose:
h, _ := wire.NewShaHash(closeUpdate.ChanClose.ClosingTxid) h, _ := wire.NewShaHash(closeUpdate.ChanClose.ClosingTxid)
rpcsLog.Errorf("[closechannel] close completed: "+ rpcsLog.Infof("[closechannel] close completed: "+
"txid(%v)", h) "txid(%v)", h)
break out break out
} }
@ -492,11 +493,11 @@ func (r *rpcServer) ShowRoutingTable(ctx context.Context,
for _, channel := range rtCopy.AllChannels() { for _, channel := range rtCopy.AllChannels() {
channels = append(channels, channels = append(channels,
&lnrpc.RoutingTableLink{ &lnrpc.RoutingTableLink{
Id1: channel.Id1.String(), Id1: channel.Id1.String(),
Id2: channel.Id2.String(), Id2: channel.Id2.String(),
Outpoint: channel.EdgeID.String(), Outpoint: channel.EdgeID.String(),
Capacity: channel.Info.Capacity(), Capacity: channel.Info.Capacity(),
Weight: channel.Info.Weight(), Weight: channel.Info.Weight(),
}, },
) )
} }