Merge pull request #457 from Roasbeef/funding-locked-fix
multi: fix reliable re-processing+re-transmission of FundingLocked factoring in ChannelReestabilshment
This commit is contained in:
commit
c0e88cd8a8
@ -85,6 +85,13 @@ type ChannelLink interface {
|
|||||||
// the channel link opened.
|
// the channel link opened.
|
||||||
Peer() Peer
|
Peer() Peer
|
||||||
|
|
||||||
|
// EligibleToForward returns a bool indicating if the channel is able
|
||||||
|
// to actively accept requests to forward HTLC's. A channel may be
|
||||||
|
// active, but not able to forward HTLC's if it hasn't yet finalized
|
||||||
|
// the pre-channel operation protocol with the remote peer. The switch
|
||||||
|
// will use this function in forwarding decisions accordingly.
|
||||||
|
EligibleToForward() bool
|
||||||
|
|
||||||
// Start/Stop are used to initiate the start/stop of the channel link
|
// Start/Stop are used to initiate the start/stop of the channel link
|
||||||
// functioning.
|
// functioning.
|
||||||
Start() error
|
Start() error
|
||||||
|
@ -292,6 +292,14 @@ func (l *channelLink) Stop() {
|
|||||||
l.cfg.BlockEpochs.Cancel()
|
l.cfg.BlockEpochs.Cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EligibleToForward returns a bool indicating if the channel is able to
|
||||||
|
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
|
||||||
|
// we know the remote party's next revocation point. Otherwise, we can't
|
||||||
|
// initiate new channel state.
|
||||||
|
func (l *channelLink) EligibleToForward() bool {
|
||||||
|
return l.channel.RemoteNextRevocation() != nil
|
||||||
|
}
|
||||||
|
|
||||||
// sampleNetworkFee samples the current fee rate on the network to get into the
|
// sampleNetworkFee samples the current fee rate on the network to get into the
|
||||||
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
|
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
|
||||||
// this is the native rate used when computing the fee for commitment
|
// this is the native rate used when computing the fee for commitment
|
||||||
@ -397,10 +405,12 @@ func (l *channelLink) htlcManager() {
|
|||||||
// this, as at this point we can't be sure if they've
|
// this, as at this point we can't be sure if they've
|
||||||
// really received the FundingLocked message.
|
// really received the FundingLocked message.
|
||||||
if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
|
if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
|
||||||
localChanSyncMsg.NextLocalCommitHeight == 1 {
|
localChanSyncMsg.NextLocalCommitHeight == 1 &&
|
||||||
|
!l.channel.IsPending() {
|
||||||
|
|
||||||
log.Debugf("Resending fundingLocked message " +
|
log.Infof("ChannelPoint(%v): resending "+
|
||||||
"to peer")
|
"FundingLocked message to peer",
|
||||||
|
l.channel.ChannelPoint())
|
||||||
|
|
||||||
nextRevocation, err := l.channel.NextRevocationKey()
|
nextRevocation, err := l.channel.NextRevocationKey()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -430,6 +430,7 @@ func (f *mockChannelLink) Bandwidth() lnwire.MilliSatoshi { return 99999999
|
|||||||
func (f *mockChannelLink) Peer() Peer { return f.peer }
|
func (f *mockChannelLink) Peer() Peer { return f.peer }
|
||||||
func (f *mockChannelLink) Start() error { return nil }
|
func (f *mockChannelLink) Start() error { return nil }
|
||||||
func (f *mockChannelLink) Stop() {}
|
func (f *mockChannelLink) Stop() {}
|
||||||
|
func (f *mockChannelLink) EligibleToForward() bool { return true }
|
||||||
|
|
||||||
var _ ChannelLink = (*mockChannelLink)(nil)
|
var _ ChannelLink = (*mockChannelLink)(nil)
|
||||||
|
|
||||||
|
@ -366,7 +366,9 @@ func (s *Switch) handleLocalDispatch(payment *pendingPayment, packet *htlcPacket
|
|||||||
)
|
)
|
||||||
for _, link := range links {
|
for _, link := range links {
|
||||||
bandwidth := link.Bandwidth()
|
bandwidth := link.Bandwidth()
|
||||||
if bandwidth > largestBandwidth {
|
if link.EligibleToForward() &&
|
||||||
|
bandwidth > largestBandwidth {
|
||||||
|
|
||||||
largestBandwidth = bandwidth
|
largestBandwidth = bandwidth
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -488,7 +490,9 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
|
|||||||
// bandwidth.
|
// bandwidth.
|
||||||
var destination ChannelLink
|
var destination ChannelLink
|
||||||
for _, link := range interfaceLinks {
|
for _, link := range interfaceLinks {
|
||||||
if link.Bandwidth() >= htlc.Amount {
|
if link.EligibleToForward() &&
|
||||||
|
link.Bandwidth() >= htlc.Amount {
|
||||||
|
|
||||||
destination = link
|
destination = link
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -5115,3 +5115,12 @@ func (lc *LightningChannel) CommitFeeRate() btcutil.Amount {
|
|||||||
|
|
||||||
return lc.channelState.LocalCommitment.FeePerKw
|
return lc.channelState.LocalCommitment.FeePerKw
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsPending returns true if the channel's funding transaction has been fully
|
||||||
|
// confirmed, and false otherwise.
|
||||||
|
func (lc *LightningChannel) IsPending() bool {
|
||||||
|
lc.RLock()
|
||||||
|
defer lc.RUnlock()
|
||||||
|
|
||||||
|
return lc.channelState.IsPending
|
||||||
|
}
|
||||||
|
76
peer.go
76
peer.go
@ -294,12 +294,6 @@ func (p *peer) Start() error {
|
|||||||
// channels returned by the database.
|
// channels returned by the database.
|
||||||
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
||||||
for _, dbChan := range chans {
|
for _, dbChan := range chans {
|
||||||
// If the channel isn't yet open, then we don't need to process
|
|
||||||
// it any further.
|
|
||||||
if dbChan.IsPending {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
lnChan, err := lnwallet.NewLightningChannel(p.server.cc.signer,
|
lnChan, err := lnwallet.NewLightningChannel(p.server.cc.signer,
|
||||||
p.server.cc.chainNotifier, p.server.cc.feeEstimator, dbChan)
|
p.server.cc.chainNotifier, p.server.cc.feeEstimator, dbChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -308,17 +302,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||||||
|
|
||||||
chanPoint := &dbChan.FundingOutpoint
|
chanPoint := &dbChan.FundingOutpoint
|
||||||
|
|
||||||
// If the channel we read form disk has a nil next revocation
|
|
||||||
// key, then we'll skip loading this channel. We must do this
|
|
||||||
// as it doesn't yet have the needed items required to initiate
|
|
||||||
// a local state transition, or one triggered by forwarding an
|
|
||||||
// HTLC.
|
|
||||||
if lnChan.RemoteNextRevocation() == nil {
|
|
||||||
peerLog.Debugf("Skipping ChannelPoint(%v), lacking "+
|
|
||||||
"next commit point", chanPoint)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
|
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
|
||||||
|
|
||||||
p.activeChanMtx.Lock()
|
p.activeChanMtx.Lock()
|
||||||
@ -600,13 +583,25 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
|
|||||||
fmt.Sprintf("Update stream for ChannelID(%x) created", cid),
|
fmt.Sprintf("Update stream for ChannelID(%x) created", cid),
|
||||||
fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid),
|
fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid),
|
||||||
func(msg lnwire.Message) {
|
func(msg lnwire.Message) {
|
||||||
// We'll send a message to the funding manager and wait iff an
|
_, isChanSycMsg := msg.(*lnwire.ChannelReestablish)
|
||||||
// active funding process for this channel hasn't yet completed.
|
|
||||||
// We do this in order to account for the following scenario: we
|
// If this is the chanSync message, then we'll develri
|
||||||
// send the funding locked message to the other side, they
|
// it imemdately to the active link.
|
||||||
// immediately send a channel update message, but we haven't yet
|
if !isChanSycMsg {
|
||||||
|
// We'll send a message to the funding manager
|
||||||
|
// and wait iff an active funding process for
|
||||||
|
// this channel hasn't yet completed. We do
|
||||||
|
// this in order to account for the following
|
||||||
|
// scenario: we send the funding locked message
|
||||||
|
// to the other side, they immediately send a
|
||||||
|
// channel update message, but we haven't yet
|
||||||
// sent the channel to the channelManager.
|
// sent the channel to the channelManager.
|
||||||
|
peerLog.Infof("waiting on chan open to deliver: %v",
|
||||||
|
spew.Sdump(msg))
|
||||||
p.server.fundingMgr.waitUntilChannelOpen(cid)
|
p.server.fundingMgr.waitUntilChannelOpen(cid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(roasbeef): only wait if not chan sync
|
||||||
|
|
||||||
// Dispatch the commitment update message to the proper active
|
// Dispatch the commitment update message to the proper active
|
||||||
// goroutine dedicated to this channel.
|
// goroutine dedicated to this channel.
|
||||||
@ -933,6 +928,8 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) {
|
|||||||
}))
|
}))
|
||||||
|
|
||||||
switch m := msg.(type) {
|
switch m := msg.(type) {
|
||||||
|
case *lnwire.ChannelReestablish:
|
||||||
|
m.LocalUnrevokedCommitPoint.Curve = nil
|
||||||
case *lnwire.RevokeAndAck:
|
case *lnwire.RevokeAndAck:
|
||||||
m.NextRevocationKey.Curve = nil
|
m.NextRevocationKey.Curve = nil
|
||||||
case *lnwire.NodeAnnouncement:
|
case *lnwire.NodeAnnouncement:
|
||||||
@ -1182,13 +1179,46 @@ out:
|
|||||||
|
|
||||||
// Make sure this channel is not already active.
|
// Make sure this channel is not already active.
|
||||||
p.activeChanMtx.Lock()
|
p.activeChanMtx.Lock()
|
||||||
if _, ok := p.activeChannels[chanID]; ok {
|
if currentChan, ok := p.activeChannels[chanID]; ok {
|
||||||
peerLog.Infof("Already have ChannelPoint(%v), "+
|
peerLog.Infof("Already have ChannelPoint(%v), "+
|
||||||
"ignoring.", chanPoint)
|
"ignoring.", chanPoint)
|
||||||
|
|
||||||
p.activeChanMtx.Unlock()
|
p.activeChanMtx.Unlock()
|
||||||
close(newChanReq.done)
|
close(newChanReq.done)
|
||||||
newChanReq.channel.Stop()
|
newChanReq.channel.Stop()
|
||||||
newChanReq.channel.CancelObserver()
|
newChanReq.channel.CancelObserver()
|
||||||
|
|
||||||
|
// We'll re-send our current channel to the
|
||||||
|
// breachArbiter to ensure that it has the most
|
||||||
|
// up to date version.
|
||||||
|
select {
|
||||||
|
case p.server.breachArbiter.newContracts <- currentChan:
|
||||||
|
case <-p.server.quit:
|
||||||
|
return
|
||||||
|
case <-p.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we're being sent a new channel, and our
|
||||||
|
// existing channel doesn't have the next
|
||||||
|
// revocation, then we need to update the
|
||||||
|
// current exsiting channel.
|
||||||
|
if currentChan.RemoteNextRevocation() != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
peerLog.Infof("Processing retransmitted "+
|
||||||
|
"FundingLocked for ChannelPoint(%v)",
|
||||||
|
chanPoint)
|
||||||
|
|
||||||
|
nextRevoke := newChan.RemoteNextRevocation()
|
||||||
|
err := currentChan.InitNextRevocation(nextRevoke)
|
||||||
|
if err != nil {
|
||||||
|
peerLog.Errorf("unable to init chan "+
|
||||||
|
"revocation: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1465,8 +1465,11 @@ func (r *rpcServer) ListChannels(ctx context.Context,
|
|||||||
|
|
||||||
channelID := lnwire.NewChanIDFromOutPoint(&chanPoint)
|
channelID := lnwire.NewChanIDFromOutPoint(&chanPoint)
|
||||||
var linkActive bool
|
var linkActive bool
|
||||||
if _, err := r.server.htlcSwitch.GetLink(channelID); err == nil {
|
if link, err := r.server.htlcSwitch.GetLink(channelID); err == nil {
|
||||||
linkActive = true
|
// A channel is only considered active if it is known
|
||||||
|
// by the switch *and* able to forward
|
||||||
|
// incoming/outgoing payments.
|
||||||
|
linkActive = link.EligibleToForward()
|
||||||
}
|
}
|
||||||
|
|
||||||
// As this is required for display purposes, we'll calculate
|
// As this is required for display purposes, we'll calculate
|
||||||
|
Loading…
Reference in New Issue
Block a user