Merge pull request #4144 from cfromknecht/frozen-chan-fixups

multi: frozen chan fixups
This commit is contained in:
Wilmer Paulino 2020-04-03 11:21:22 -07:00 committed by GitHub
commit d04bc200e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 61 additions and 102 deletions

@ -2,6 +2,7 @@ package channeldb
import ( import (
"bytes" "bytes"
"crypto/sha256"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
@ -1483,6 +1484,36 @@ func (c *OpenChannel) BalancesAtHeight(height uint64) (lnwire.MilliSatoshi,
return commit.LocalBalance, commit.RemoteBalance, nil return commit.LocalBalance, commit.RemoteBalance, nil
} }
// ActiveHtlcs returns a slice of HTLC's which are currently active on *both*
// commitment transactions.
func (c *OpenChannel) ActiveHtlcs() []HTLC {
c.RLock()
defer c.RUnlock()
// We'll only return HTLC's that are locked into *both* commitment
// transactions. So we'll iterate through their set of HTLC's to note
// which ones are present on their commitment.
remoteHtlcs := make(map[[32]byte]struct{})
for _, htlc := range c.RemoteCommitment.Htlcs {
onionHash := sha256.Sum256(htlc.OnionBlob)
remoteHtlcs[onionHash] = struct{}{}
}
// Now that we know which HTLC's they have, we'll only mark the HTLC's
// as active if *we* know them as well.
activeHtlcs := make([]HTLC, 0, len(remoteHtlcs))
for _, htlc := range c.LocalCommitment.Htlcs {
onionHash := sha256.Sum256(htlc.OnionBlob)
if _, ok := remoteHtlcs[onionHash]; !ok {
continue
}
activeHtlcs = append(activeHtlcs, htlc)
}
return activeHtlcs
}
// HTLC is the on-disk representation of a hash time-locked contract. HTLCs are // HTLC is the on-disk representation of a hash time-locked contract. HTLCs are
// contained within ChannelDeltas which encode the current state of the // contained within ChannelDeltas which encode the current state of the
// commitment between state updates. // commitment between state updates.

@ -45,8 +45,8 @@ func (p *mockPeer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error {
return nil return nil
} }
func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } func (p *mockPeer) WipeChannel(_ *wire.OutPoint) {}
func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk }
func (p *mockPeer) PubKey() [33]byte { func (p *mockPeer) PubKey() [33]byte {
var pubkey [33]byte var pubkey [33]byte
copy(pubkey[:], p.pk.SerializeCompressed()) copy(pubkey[:], p.pk.SerializeCompressed())

@ -201,9 +201,7 @@ func (n *testNode) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
return n.SendMessage(sync, msgs...) return n.SendMessage(sync, msgs...)
} }
func (n *testNode) WipeChannel(_ *wire.OutPoint) error { func (n *testNode) WipeChannel(_ *wire.OutPoint) {}
return nil
}
func (n *testNode) QuitSignal() <-chan struct{} { func (n *testNode) QuitSignal() <-chan struct{} {
return n.shutdownChannel return n.shutdownChannel

@ -1084,11 +1084,7 @@ out:
// TODO(roasbeef): remove all together // TODO(roasbeef): remove all together
go func() { go func() {
chanPoint := l.channel.ChannelPoint() chanPoint := l.channel.ChannelPoint()
err := l.cfg.Peer.WipeChannel(chanPoint) l.cfg.Peer.WipeChannel(chanPoint)
if err != nil {
l.log.Errorf("unable to wipe channel "+
"%v", err)
}
}() }()
break out break out

@ -1652,9 +1652,7 @@ func (m *mockPeer) AddNewChannel(_ *channeldb.OpenChannel,
_ <-chan struct{}) error { _ <-chan struct{}) error {
return nil return nil
} }
func (m *mockPeer) WipeChannel(*wire.OutPoint) error { func (m *mockPeer) WipeChannel(*wire.OutPoint) {}
return nil
}
func (m *mockPeer) PubKey() [33]byte { func (m *mockPeer) PubKey() [33]byte {
return [33]byte{} return [33]byte{}
} }

@ -602,9 +602,7 @@ func (s *mockServer) AddNewChannel(channel *channeldb.OpenChannel,
return nil return nil
} }
func (s *mockServer) WipeChannel(*wire.OutPoint) error { func (s *mockServer) WipeChannel(*wire.OutPoint) {}
return nil
}
func (s *mockServer) LocalFeatures() *lnwire.FeatureVector { func (s *mockServer) LocalFeatures() *lnwire.FeatureVector {
return nil return nil

@ -30,7 +30,7 @@ type Peer interface {
// WipeChannel removes the channel uniquely identified by its channel // WipeChannel removes the channel uniquely identified by its channel
// point from all indexes associated with the peer. // point from all indexes associated with the peer.
WipeChannel(*wire.OutPoint) error WipeChannel(*wire.OutPoint)
// PubKey returns the serialized public key of the remote peer. // PubKey returns the serialized public key of the remote peer.
PubKey() [33]byte PubKey() [33]byte

@ -6628,27 +6628,7 @@ func (lc *LightningChannel) ActiveHtlcs() []channeldb.HTLC {
lc.RLock() lc.RLock()
defer lc.RUnlock() defer lc.RUnlock()
// We'll only return HTLC's that are locked into *both* commitment return lc.channelState.ActiveHtlcs()
// transactions. So we'll iterate through their set of HTLC's to note
// which ones are present on their commitment.
remoteHtlcs := make(map[[32]byte]struct{})
for _, htlc := range lc.channelState.RemoteCommitment.Htlcs {
onionHash := sha256.Sum256(htlc.OnionBlob[:])
remoteHtlcs[onionHash] = struct{}{}
}
// Now that we know which HTLC's they have, we'll only mark the HTLC's
// as active if *we* know them as well.
activeHtlcs := make([]channeldb.HTLC, 0, len(remoteHtlcs))
for _, htlc := range lc.channelState.LocalCommitment.Htlcs {
if _, ok := remoteHtlcs[sha256.Sum256(htlc.OnionBlob[:])]; !ok {
continue
}
activeHtlcs = append(activeHtlcs, htlc)
}
return activeHtlcs
} }
// LocalChanReserve returns our local ChanReserve requirement for the remote party. // LocalChanReserve returns our local ChanReserve requirement for the remote party.
@ -6667,14 +6647,6 @@ func (lc *LightningChannel) NextLocalHtlcIndex() (uint64, error) {
return lc.channelState.NextLocalHtlcIndex() return lc.channelState.NextLocalHtlcIndex()
} }
// RemoteCommitHeight returns the commitment height of the remote chain.
func (lc *LightningChannel) RemoteCommitHeight() uint64 {
lc.RLock()
defer lc.RUnlock()
return lc.channelState.RemoteCommitment.CommitHeight
}
// FwdMinHtlc returns the minimum HTLC value required by the remote node, i.e. // FwdMinHtlc returns the minimum HTLC value required by the remote node, i.e.
// the minimum value HTLC we can forward on this channel. // the minimum value HTLC we can forward on this channel.
func (lc *LightningChannel) FwdMinHtlc() lnwire.MilliSatoshi { func (lc *LightningChannel) FwdMinHtlc() lnwire.MilliSatoshi {

24
peer.go

@ -2445,13 +2445,7 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
// TODO(roasbeef): no longer need with newer beach logic? // TODO(roasbeef): no longer need with newer beach logic?
peerLog.Infof("ChannelPoint(%v) has been breached, wiping "+ peerLog.Infof("ChannelPoint(%v) has been breached, wiping "+
"channel", req.ChanPoint) "channel", req.ChanPoint)
if err := p.WipeChannel(req.ChanPoint); err != nil { p.WipeChannel(req.ChanPoint)
peerLog.Infof("Unable to wipe channel after detected "+
"breach: %v", err)
req.Err <- err
return
}
return
} }
} }
@ -2478,11 +2472,7 @@ func (p *peer) handleLinkFailure(failure linkFailureReport) {
// link and cancel back any adds in its mailboxes such that we can // link and cancel back any adds in its mailboxes such that we can
// safely force close without the link being added again and updates // safely force close without the link being added again and updates
// being applied. // being applied.
if err := p.WipeChannel(&failure.chanPoint); err != nil { p.WipeChannel(&failure.chanPoint)
peerLog.Errorf("Unable to wipe link for chanpoint=%v",
failure.chanPoint)
return
}
// If the error encountered was severe enough, we'll now force close the // If the error encountered was severe enough, we'll now force close the
// channel to prevent readding it to the switch in the future. // channel to prevent readding it to the switch in the future.
@ -2534,11 +2524,7 @@ func (p *peer) finalizeChanClosure(chanCloser *channelCloser) {
// First, we'll clear all indexes related to the channel in question. // First, we'll clear all indexes related to the channel in question.
chanPoint := chanCloser.cfg.channel.ChannelPoint() chanPoint := chanCloser.cfg.channel.ChannelPoint()
if err := p.WipeChannel(chanPoint); err != nil { p.WipeChannel(chanPoint)
if closeReq != nil {
closeReq.Err <- err
}
}
// Next, we'll launch a goroutine which will request to be notified by // Next, we'll launch a goroutine which will request to be notified by
// the ChainNotifier once the closure transaction obtains a single // the ChainNotifier once the closure transaction obtains a single
@ -2628,7 +2614,7 @@ func waitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
// WipeChannel removes the passed channel point from all indexes associated with // WipeChannel removes the passed channel point from all indexes associated with
// the peer, and the switch. // the peer, and the switch.
func (p *peer) WipeChannel(chanPoint *wire.OutPoint) error { func (p *peer) WipeChannel(chanPoint *wire.OutPoint) {
chanID := lnwire.NewChanIDFromOutPoint(chanPoint) chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
p.activeChanMtx.Lock() p.activeChanMtx.Lock()
@ -2638,8 +2624,6 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) error {
// Instruct the HtlcSwitch to close this link as the channel is no // Instruct the HtlcSwitch to close this link as the channel is no
// longer active. // longer active.
p.server.htlcSwitch.RemoveLink(chanID) p.server.htlcSwitch.RemoveLink(chanID)
return nil
} }
// handleInitMsg handles the incoming init message which contains global and // handleInitMsg handles the incoming init message which contains global and

@ -1985,24 +1985,17 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// First, we'll fetch the channel as is, as we'll need to examine it // First, we'll fetch the channel as is, as we'll need to examine it
// regardless of if this is a force close or not. // regardless of if this is a force close or not.
channel, err := r.fetchActiveChannel(*chanPoint) channel, err := r.server.chanDB.FetchChannel(*chanPoint)
if err != nil { if err != nil {
return err return err
} }
// If this is a frozen channel, then we only allow the close to proceed // Retrieve the best height of the chain, which we'll use to complete
// if we were the responder to this channel. // either closing flow.
_, bestHeight, err := r.server.cc.chainIO.GetBestBlock() _, bestHeight, err := r.server.cc.chainIO.GetBestBlock()
if err != nil { if err != nil {
return err return err
} }
if channel.State().ChanType.IsFrozen() && channel.IsInitiator() &&
uint32(bestHeight) < channel.State().ThawHeight {
return fmt.Errorf("cannot co-op close frozen channel as "+
"initiator until height=%v, (current_height=%v)",
channel.State().ThawHeight, bestHeight)
}
// If a force closure was requested, then we'll handle all the details // If a force closure was requested, then we'll handle all the details
// around the creation and broadcast of the unilateral closure // around the creation and broadcast of the unilateral closure
@ -2014,14 +2007,14 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// ensure that the switch doesn't continue to see this channel // ensure that the switch doesn't continue to see this channel
// as eligible for forwarding HTLC's. If the peer is online, // as eligible for forwarding HTLC's. If the peer is online,
// then we'll also purge all of its indexes. // then we'll also purge all of its indexes.
remotePub := &channel.StateSnapshot().RemoteIdentity remotePub := channel.IdentityPub
if peer, err := r.server.FindPeer(remotePub); err == nil { if peer, err := r.server.FindPeer(remotePub); err == nil {
// TODO(roasbeef): actually get the active channel // TODO(roasbeef): actually get the active channel
// instead too? // instead too?
// * so only need to grab from database // * so only need to grab from database
peer.WipeChannel(channel.ChannelPoint()) peer.WipeChannel(&channel.FundingOutpoint)
} else { } else {
chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint()) chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
r.server.htlcSwitch.RemoveLink(chanID) r.server.htlcSwitch.RemoveLink(chanID)
} }
@ -2057,6 +2050,17 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
} }
}) })
} else { } else {
// If this is a frozen channel, then we only allow the co-op
// close to proceed if we were the responder to this channel.
if channel.ChanType.IsFrozen() && channel.IsInitiator &&
uint32(bestHeight) < channel.ThawHeight {
return fmt.Errorf("cannot co-op close frozen channel "+
"as initiator until height=%v, "+
"(current_height=%v)", channel.ThawHeight,
bestHeight)
}
// If the link is not known by the switch, we cannot gracefully close // If the link is not known by the switch, we cannot gracefully close
// the channel. // the channel.
channelID := lnwire.NewChanIDFromOutPoint(chanPoint) channelID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -2259,10 +2263,7 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
} }
remotePub := dbChan.IdentityPub remotePub := dbChan.IdentityPub
if peer, err := r.server.FindPeer(remotePub); err == nil { if peer, err := r.server.FindPeer(remotePub); err == nil {
if err := peer.WipeChannel(chanPoint); err != nil { peer.WipeChannel(chanPoint)
return nil, fmt.Errorf("unable to wipe "+
"channel state: %v", err)
}
} }
default: default:
@ -2305,25 +2306,6 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
return &lnrpc.AbandonChannelResponse{}, nil return &lnrpc.AbandonChannelResponse{}, nil
} }
// fetchActiveChannel attempts to locate a channel identified by its channel
// point from the database's set of all currently opened channels and
// return it as a fully populated state machine
func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) (
*lnwallet.LightningChannel, error) {
dbChan, err := r.server.chanDB.FetchChannel(chanPoint)
if err != nil {
return nil, err
}
// If the channel is successfully fetched from the database,
// we create a fully populated channel state machine which
// uses the db channel as backing storage.
return lnwallet.NewLightningChannel(
r.server.cc.wallet.Cfg.Signer, dbChan, nil,
)
}
// GetInfo returns general information concerning the lightning node including // GetInfo returns general information concerning the lightning node including
// its identity pubkey, alias, the chains it is connected to, and information // its identity pubkey, alias, the chains it is connected to, and information
// concerning the number of open+pending channels. // concerning the number of open+pending channels.