peer: properly manage channel close lifecycle within the database
Within this commit the peer will now properly manage the channel close life cycle within the database. This entails marking the channel as pending closed either once the closing transaction has been broadcast or the close request message has been sent to the other side. Once the closing transaction has been confirmed, the transaction will be marked as fully closed within the database. A helper function has been added to factor out “waiting for a transaction to confirm” when handling moth local and remote cooperative closure flows. Finally, we no longer delete the channel state within wipeChannel as this will now be managed distinctly by callers.
This commit is contained in:
parent
20bbe1e12d
commit
62d6ac6a8f
156
peer.go
156
peer.go
@ -6,6 +6,7 @@ import (
|
|||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -14,6 +15,7 @@ import (
|
|||||||
"github.com/go-errors/errors"
|
"github.com/go-errors/errors"
|
||||||
"github.com/lightningnetwork/lightning-onion"
|
"github.com/lightningnetwork/lightning-onion"
|
||||||
"github.com/lightningnetwork/lnd/brontide"
|
"github.com/lightningnetwork/lnd/brontide"
|
||||||
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/lnrpc"
|
"github.com/lightningnetwork/lnd/lnrpc"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
@ -857,7 +859,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
|
|||||||
|
|
||||||
switch req.CloseType {
|
switch req.CloseType {
|
||||||
// A type of CloseRegular indicates that the user has opted to close
|
// A type of CloseRegular indicates that the user has opted to close
|
||||||
// out this channel on-chian, so we execute the cooperative channel
|
// out this channel on-chain, so we execute the cooperative channel
|
||||||
// closure workflow.
|
// closure workflow.
|
||||||
case CloseRegular:
|
case CloseRegular:
|
||||||
closingTxid, err = p.executeCooperativeClose(channel)
|
closingTxid, err = p.executeCooperativeClose(channel)
|
||||||
@ -883,6 +885,32 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Once we've completed the cooperative channel closure, we'll wipe the
|
||||||
|
// channel so we reject any incoming forward or payment requests via
|
||||||
|
// this channel.
|
||||||
|
p.server.breachArbiter.settledContracts <- req.chanPoint
|
||||||
|
if err := wipeChannel(p, channel); err != nil {
|
||||||
|
req.err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear out the current channel state, marking the channel as being
|
||||||
|
// closed within the database.
|
||||||
|
chanInfo := channel.StateSnapshot()
|
||||||
|
closeSummary := &channeldb.ChannelCloseSummary{
|
||||||
|
ChanPoint: *req.chanPoint,
|
||||||
|
ClosingTXID: *closingTxid,
|
||||||
|
RemotePub: &chanInfo.RemoteIdentity,
|
||||||
|
Capacity: chanInfo.Capacity,
|
||||||
|
OurBalance: chanInfo.LocalBalance,
|
||||||
|
CloseType: channeldb.CooperativeClose,
|
||||||
|
IsPending: true,
|
||||||
|
}
|
||||||
|
if err := channel.DeleteState(closeSummary); err != nil {
|
||||||
|
req.err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Update the caller with a new event detailing the current pending
|
// Update the caller with a new event detailing the current pending
|
||||||
// state of this request.
|
// state of this request.
|
||||||
req.updates <- &lnrpc.CloseStatusUpdate{
|
req.updates <- &lnrpc.CloseStatusUpdate{
|
||||||
@ -896,36 +924,16 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
|
|||||||
// Finally, launch a goroutine which will request to be notified by the
|
// Finally, launch a goroutine which will request to be notified by the
|
||||||
// ChainNotifier once the closure transaction obtains a single
|
// ChainNotifier once the closure transaction obtains a single
|
||||||
// confirmation.
|
// confirmation.
|
||||||
go func() {
|
notifier := p.server.chainNotifier
|
||||||
// TODO(roasbeef): add param for num needed confs
|
go waitForChanToClose(notifier, req.err, req.chanPoint, closingTxid, func() {
|
||||||
notifier := p.server.chainNotifier
|
// First, we'll mark the database as being fully closed so
|
||||||
confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxid, 1)
|
// we'll no longer watch for its ultimate closure upon startup.
|
||||||
|
err := p.server.chanDB.MarkChanFullyClosed(req.chanPoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
req.err <- err
|
req.err <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
|
||||||
case height, ok := <-confNtfn.Confirmed:
|
|
||||||
// In the case that the ChainNotifier is shutting down,
|
|
||||||
// all subscriber notification channels will be closed,
|
|
||||||
// generating a nil receive.
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// The channel has been closed, remove it from any
|
|
||||||
// active indexes, and the database state.
|
|
||||||
peerLog.Infof("ChannelPoint(%v) is now closed at "+
|
|
||||||
"height %v", req.chanPoint, height.BlockHeight)
|
|
||||||
if err := wipeChannel(p, channel); err != nil {
|
|
||||||
req.err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-p.quit:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Respond to the local subsystem which requested the channel
|
// Respond to the local subsystem which requested the channel
|
||||||
// closure.
|
// closure.
|
||||||
req.updates <- &lnrpc.CloseStatusUpdate{
|
req.updates <- &lnrpc.CloseStatusUpdate{
|
||||||
@ -936,9 +944,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
})
|
||||||
p.server.breachArbiter.settledContracts <- req.chanPoint
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleRemoteClose completes a request for cooperative channel closure
|
// handleRemoteClose completes a request for cooperative channel closure
|
||||||
@ -955,9 +961,8 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
|
|||||||
|
|
||||||
chanPoint := channel.ChannelPoint()
|
chanPoint := channel.ChannelPoint()
|
||||||
|
|
||||||
// Now that we have their signature for the closure transaction, we
|
// Now that we have their signature for the closure transaction, we can
|
||||||
// can assemble the final closure transaction, complete with our
|
// assemble the final closure transaction, complete with our signature.
|
||||||
// signature.
|
|
||||||
sig := req.RequesterCloseSig
|
sig := req.RequesterCloseSig
|
||||||
closeSig := append(sig.Serialize(), byte(txscript.SigHashAll))
|
closeSig := append(sig.Serialize(), byte(txscript.SigHashAll))
|
||||||
closeTx, err := channel.CompleteCooperativeClose(closeSig)
|
closeTx, err := channel.CompleteCooperativeClose(closeSig)
|
||||||
@ -975,21 +980,94 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
|
|||||||
}))
|
}))
|
||||||
|
|
||||||
// Finally, broadcast the closure transaction, to the network.
|
// Finally, broadcast the closure transaction, to the network.
|
||||||
if err := p.server.lnwallet.PublishTransaction(closeTx); err != nil {
|
err = p.server.lnwallet.PublishTransaction(closeTx)
|
||||||
|
if err != nil && !strings.Contains(err.Error(), "already have") {
|
||||||
peerLog.Errorf("channel close tx from "+
|
peerLog.Errorf("channel close tx from "+
|
||||||
"ChannelPoint(%v) rejected: %v",
|
"ChannelPoint(%v) rejected: %v",
|
||||||
chanPoint, err)
|
chanPoint, err)
|
||||||
// TODO(roasbeef): send ErrorGeneric to other side
|
// TODO(roasbeef): send ErrorGeneric to other side
|
||||||
|
// * remove check above to error
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(roasbeef): also wait for confs before removing state
|
p.server.breachArbiter.settledContracts <- chanPoint
|
||||||
|
|
||||||
|
// We've just broadcast the transaction which closes the channel, so
|
||||||
|
// we'll wipe the channel from all our local indexes and also signal to
|
||||||
|
// the switch that this channel is now closed.
|
||||||
peerLog.Infof("ChannelPoint(%v) is now closed", chanPoint)
|
peerLog.Infof("ChannelPoint(%v) is now closed", chanPoint)
|
||||||
if err := wipeChannel(p, channel); err != nil {
|
if err := wipeChannel(p, channel); err != nil {
|
||||||
peerLog.Errorf("unable to wipe channel: %v", err)
|
peerLog.Errorf("unable to wipe channel: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.server.breachArbiter.settledContracts <- chanPoint
|
// Clear out the current channel state, marking the channel as being
|
||||||
|
// closed within the database.
|
||||||
|
closeTxid := closeTx.TxHash()
|
||||||
|
chanInfo := channel.StateSnapshot()
|
||||||
|
closeSummary := &channeldb.ChannelCloseSummary{
|
||||||
|
ChanPoint: *chanPoint,
|
||||||
|
ClosingTXID: closeTxid,
|
||||||
|
RemotePub: &chanInfo.RemoteIdentity,
|
||||||
|
Capacity: chanInfo.Capacity,
|
||||||
|
OurBalance: chanInfo.LocalBalance,
|
||||||
|
CloseType: channeldb.CooperativeClose,
|
||||||
|
IsPending: true,
|
||||||
|
}
|
||||||
|
if err := channel.DeleteState(closeSummary); err != nil {
|
||||||
|
peerLog.Errorf("unable to delete channel state: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, we'll launch a goroutine to watch the network for the
|
||||||
|
// confirmation of the closing transaction, and mark the channel as
|
||||||
|
// such within the database (once it's confirmed").
|
||||||
|
notifier := p.server.chainNotifier
|
||||||
|
go waitForChanToClose(notifier, nil, chanPoint, &closeTxid,
|
||||||
|
func() {
|
||||||
|
// Now that the closing transaction has been confirmed,
|
||||||
|
// we'll mark the database as being fully closed so now
|
||||||
|
// that we no longer watch for its ultimate closure
|
||||||
|
// upon startup.
|
||||||
|
err := p.server.chanDB.MarkChanFullyClosed(chanPoint)
|
||||||
|
if err != nil {
|
||||||
|
peerLog.Errorf("unable to mark channel as closed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForChanToClose uses the passed notifier to wait until the channel has
|
||||||
|
// been detected as closed on chain and then concludes by executing the
|
||||||
|
// following actions: the channel point will be sent over the settleChan, and
|
||||||
|
// finally the callback will be executed. If any error is encountered within
|
||||||
|
// the function, then it will be sent over the errChan.
|
||||||
|
func waitForChanToClose(notifier chainntnfs.ChainNotifier,
|
||||||
|
errChan chan error, chanPoint *wire.OutPoint,
|
||||||
|
closingTxID *chainhash.Hash, cb func()) {
|
||||||
|
|
||||||
|
// TODO(roasbeef): add param for num needed confs
|
||||||
|
confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxID, 1)
|
||||||
|
if err != nil && errChan != nil {
|
||||||
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// In the case that the ChainNotifier is shutting down, all subscriber
|
||||||
|
// notification channels will be closed, generating a nil receive.
|
||||||
|
height, ok := <-confNtfn.Confirmed
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel has been closed, remove it from any active indexes, and
|
||||||
|
// the database state.
|
||||||
|
srvrLog.Infof("ChannelPoint(%v) is now closed at "+
|
||||||
|
"height %v", chanPoint, height.BlockHeight)
|
||||||
|
|
||||||
|
// Finally, execute the closure call back to mark the confirmation of
|
||||||
|
// the transaction closing the contract.
|
||||||
|
cb()
|
||||||
}
|
}
|
||||||
|
|
||||||
// wipeChannel removes the passed channel from all indexes associated with the
|
// wipeChannel removes the passed channel from all indexes associated with the
|
||||||
@ -1030,14 +1108,6 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error {
|
|||||||
delete(p.htlcManagers, chanID)
|
delete(p.htlcManagers, chanID)
|
||||||
p.htlcManMtx.RUnlock()
|
p.htlcManMtx.RUnlock()
|
||||||
|
|
||||||
// Finally, we purge the channel's state from the database, leaving a
|
|
||||||
// small summary for historical records.
|
|
||||||
if err := channel.DeleteState(); err != nil {
|
|
||||||
peerLog.Errorf("Unable to delete ChannelPoint(%v) "+
|
|
||||||
"from db: %v", chanID, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user