From 80b09f7d6fa34034eddb3911dcdd83ce17a2b13d Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 12 Sep 2016 12:42:26 -0700 Subject: [PATCH] lnd: properly execute force closures kicked off via RPC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit includes some slight refactoring to properly execute force closures which are initiated by RPC clients. The CloseLink method within the htlcSwitch has been extended to take an additional parameter which indicates if the link should be closed forcefully. If so, then the channelManager which dispatches the request executes a force closure using the target channel state machine. Once the closing transaction has been broadcast, the summary is sent to the utxoNursery so the outputs can be swept once they’re mature. --- htlcswitch.go | 27 +++++++++--- peer.go | 116 ++++++++++++++++++++++++++++++++++++++++---------- rpcserver.go | 15 ++++--- 3 files changed, 122 insertions(+), 36 deletions(-) diff --git a/htlcswitch.go b/htlcswitch.go index aca430cc..d042438f 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -63,7 +63,13 @@ type htlcSwitch struct { started int32 // atomic shutdown int32 // atomic - chanIndex map[wire.OutPoint]*link + // chanIndex maps a channel's outpoint to a link which contains + // additional information about the channel, and additionally houses a + // pointer to the peer mangaing the channel. + chanIndex map[wire.OutPoint]*link + + // interfaces maps a node's ID to the set of links (active channels) we + // currently have open with that peer. interfaces map[wire.ShaHash][]*link // TODO(roasbeef): msgs for dynamic link quality @@ -395,20 +401,29 @@ func (h *htlcSwitch) UnregisterLink(chanInterface [32]byte, chanPoint *wire.OutP // closeChanReq represents a request to close a particular channel specified // by its outpoint. type closeLinkReq struct { - chanPoint *wire.OutPoint + chanPoint *wire.OutPoint + forceClose bool updates chan *lnrpc.CloseStatusUpdate err chan error } // CloseLink closes an active link targetted by it's channel point. Closing the -// link initiates a cooperative channel closure. -// TODO(roabeef): bool flag for timeout/force -func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint) (chan *lnrpc.CloseStatusUpdate, chan error) { +// link initiates a cooperative channel closure iff forceClose is false. If +// forceClose is true, then a unilateral channel closure is executed. +// TODO(roabeef): bool flag for timeout +func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint, + forceClose bool) (chan *lnrpc.CloseStatusUpdate, chan error) { + updateChan := make(chan *lnrpc.CloseStatusUpdate, 1) errChan := make(chan error, 1) - h.linkControl <- &closeLinkReq{chanPoint, updateChan, errChan} + h.linkControl <- &closeLinkReq{ + chanPoint: chanPoint, + forceClose: forceClose, + updates: updateChan, + err: errChan, + } return updateChan, errChan } diff --git a/peer.go b/peer.go index 938d9ccc..9a97d26a 100644 --- a/peer.go +++ b/peer.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/BitfuryLightning/tools/rt/graph" "github.com/btcsuite/fastsha256" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" @@ -19,7 +20,6 @@ import ( "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" - "github.com/BitfuryLightning/tools/rt/graph" ) var ( @@ -624,46 +624,102 @@ out: p.wg.Done() } -// handleLocalClose kicks-off the workflow to execute a cooperative closure of -// the channel initiated by a local sub-system. -func (p *peer) handleLocalClose(req *closeLinkReq) { - chanPoint := req.chanPoint - key := wire.OutPoint{ - Hash: chanPoint.Hash, - Index: chanPoint.Index, +// executeForceClose executes a unilateral close of the target channel by +// broadcasting the current commitment state directly on-chain. Once the +// commitment transaction has been broadcast, a struct describing the final +// state of the channel is sent to the utxoNursery in order to ultimatley sweep +// the immature outputs. +func (p *peer) executeForceClose(channel *lnwallet.LightningChannel) (*wire.ShaHash, error) { + // Execute a unilateral close shutting down all further channel + // operation. + closeSummary, err := channel.ForceClose() + if err != nil { + return nil, err } - channel := p.activeChannels[key] + closeTx := closeSummary.CloseTx + txid := closeTx.TxSha() + + // With the close transaction in hand, broadcast the transaction to the + // network, thereby entering the psot channel resolution state. + peerLog.Infof("Broadcasting force close transaction: %v", + channel.ChannelPoint(), newLogClosure(func() string { + return spew.Sdump(closeTx) + })) + if err := p.server.lnwallet.PublishTransaction(closeTx); err != nil { + return nil, err + } + + // Send the closed channel sumary over to the utxoNursery in order to + // have its outputs sweeped back into the wallet once they're mature. + p.server.utxoNursery.incubateOutputs(closeSummary) + + return &txid, nil +} + +// executeCooperativeClose executes the initial phase of a user-executed +// cooperative channel close. The channel state machine is transitioned to the +// closing phase, then our half of the closing witness is sent over to the +// remote peer. +func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*wire.ShaHash, error) { // Shift the channel state machine into a 'closing' state. This // generates a signature for the closing tx, as well as a txid of the // closing tx itself, allowing us to watch the network to determine - // when the remote node broadcasts the fully signed closing transaction. + // when the remote node broadcasts the fully signed closing + // transaction. sig, txid, err := channel.InitCooperativeClose() if err != nil { - req.err <- err - return + return nil, err } - peerLog.Infof("Executing cooperative closure of "+ - "ChanPoint(%v) with peerID(%v), txid=%v", key, p.id, - txid) - // With our signature for the close tx generated, send the signature - // to the remote peer instructing it to close this particular channel + chanPoint := channel.ChannelPoint() + peerLog.Infof("Executing cooperative closure of "+ + "ChanPoint(%v) with peerID(%v), txid=%v", chanPoint, p.id, txid) + + // With our signature for the close tx generated, send the signature to + // the remote peer instructing it to close this particular channel // point. // TODO(roasbeef): remove encoding redundancy closeSig, err := btcec.ParseSignature(sig, btcec.S256()) if err != nil { - req.err <- err - return + return nil, err } closeReq := lnwire.NewCloseRequest(chanPoint, closeSig) p.queueMsg(closeReq, nil) + return txid, nil +} + +// handleLocalClose kicks-off the workflow to execute a cooperative or forced +// unilateral closure of the channel initiated by a local sub-system. +func (p *peer) handleLocalClose(req *closeLinkReq) { + var ( + err error + closingTxid *wire.ShaHash + ) + + channel := p.activeChannels[*req.chanPoint] + + if req.forceClose { + closingTxid, err = p.executeForceClose(channel) + peerLog.Infof("Force closing ChannelPoint(%v) with txid: %v", + req.chanPoint, closingTxid) + } else { + closingTxid, err = p.executeCooperativeClose(channel) + peerLog.Infof("Attempting cooperative close of "+ + "ChannelPoint(%v) with txid: %v", req.chanPoint, + closingTxid) + } + if err != nil { + req.err <- err + return + } + // Update the caller w.r.t the current pending state of this request. req.updates <- &lnrpc.CloseStatusUpdate{ Update: &lnrpc.CloseStatusUpdate_ClosePending{ ClosePending: &lnrpc.PendingUpdate{ - Txid: txid[:], + Txid: closingTxid[:], }, }, } @@ -673,7 +729,8 @@ func (p *peer) handleLocalClose(req *closeLinkReq) { // confirmation. go func() { // TODO(roasbeef): add param for num needed confs - confNtfn, err := p.server.chainNotifier.RegisterConfirmationsNtfn(txid, 1) + notifier := p.server.chainNotifier + confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxid, 1) if err != nil { req.err <- err return @@ -692,7 +749,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) { // The channel has been closed, remove it from any // active indexes, and the database state. peerLog.Infof("ChannelPoint(%v) is now "+ - "closed at height %v", key, height) + "closed at height %v", req.chanPoint, height) if err := wipeChannel(p, channel); err != nil { req.err <- err return @@ -706,7 +763,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) { req.updates <- &lnrpc.CloseStatusUpdate{ Update: &lnrpc.CloseStatusUpdate_ChanClose{ ChanClose: &lnrpc.ChannelCloseUpdate{ - ClosingTxid: txid[:], + ClosingTxid: closingTxid[:], Success: true, }, }, @@ -867,6 +924,19 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, out: for { select { + case <-channel.UnilateralCloseSignal: + peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", + state.chanPoint) + if err := wipeChannel(p, channel); err != nil { + peerLog.Errorf("Unable to wipe channel %v", err) + } + break out + case <-channel.ForceCloseSignal: + peerLog.Warnf("ChannelPoint(%v) has been force "+ + "closed, disconnecting from peerID(%x)", + state.chanPoint, p.id) + break out + //p.Disconnect() // TODO(roasbeef): prevent leaking ticker? case <-state.logCommitTimer: // If we haven't sent or received a new commitment diff --git a/rpcserver.go b/rpcserver.go index f67821cd..82d3e799 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -246,6 +246,7 @@ out: func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, updateStream lnrpc.Lightning_CloseChannelServer) error { + force := in.Force index := in.ChannelPoint.OutputIndex txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid) if err != nil { @@ -257,7 +258,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)", targetChannelPoint) - updateChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint) + updateChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint, force) out: for { @@ -279,7 +280,7 @@ out: switch closeUpdate := closingUpdate.Update.(type) { case *lnrpc.CloseStatusUpdate_ChanClose: h, _ := wire.NewShaHash(closeUpdate.ChanClose.ClosingTxid) - rpcsLog.Errorf("[closechannel] close completed: "+ + rpcsLog.Infof("[closechannel] close completed: "+ "txid(%v)", h) break out } @@ -492,11 +493,11 @@ func (r *rpcServer) ShowRoutingTable(ctx context.Context, for _, channel := range rtCopy.AllChannels() { channels = append(channels, &lnrpc.RoutingTableLink{ - Id1: channel.Id1.String(), - Id2: channel.Id2.String(), - Outpoint: channel.EdgeID.String(), - Capacity: channel.Info.Capacity(), - Weight: channel.Info.Weight(), + Id1: channel.Id1.String(), + Id2: channel.Id2.String(), + Outpoint: channel.EdgeID.String(), + Capacity: channel.Info.Capacity(), + Weight: channel.Info.Weight(), }, ) }