htlcswitch/link: restrict EligibleToForward to wait for reestablish

This commit modifies the link's EligibleToForward() method only return
true once the peers have successfully exchanged channel reestablish
messages. This is a preliminary step to increasing the reestablish
timeout, ensuring the switch won't try to forward over links while
we're waiting for the remote peer to resume the connection.
This commit is contained in:
Conner Fromknecht 2019-09-19 12:46:56 -07:00
parent 6dca07577d
commit 9d6ee2ebd9
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
2 changed files with 56 additions and 14 deletions

@ -280,6 +280,7 @@ type ChannelLinkConfig struct {
type channelLink struct {
// The following fields are only meant to be used *atomically*
started int32
reestablished int32
shutdown int32
// failed should be set to true in case a link error happens, making
@ -540,7 +541,21 @@ func (l *channelLink) WaitForShutdown() {
// the all-zero source ID, meaning that the channel has had its ID finalized.
func (l *channelLink) EligibleToForward() bool {
return l.channel.RemoteNextRevocation() != nil &&
l.ShortChanID() != hop.Source
l.ShortChanID() != hop.Source &&
l.isReestablished()
}
// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
return atomic.LoadInt32(&l.reestablished) == 1
}
// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
atomic.StoreInt32(&l.reestablished, 1)
}
// sampleNetworkFee samples the current fee rate on the network to get into the
@ -878,14 +893,6 @@ func (l *channelLink) htlcManager() {
log.Infof("HTLC manager for ChannelPoint(%v) started, "+
"bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth())
// Funding locked has already been received, so we'll go ahead and
// deliver the active channel notification since EligibleToForward
// returns true now that the link has been added to the switch. We'll
// also defer the inactive notification for when the link exits to
// ensure that every active notification is matched by an inactive one.
l.cfg.NotifyActiveChannel(*l.ChannelPoint())
defer l.cfg.NotifyInactiveChannel(*l.ChannelPoint())
// TODO(roasbeef): need to call wipe chan whenever D/C?
// If this isn't the first time that this channel link has been
@ -961,6 +968,17 @@ func (l *channelLink) htlcManager() {
}
}
// We've successfully reestablished the channel, mark it as such to
// allow the switch to forward HTLCs in the outbound direction.
l.markReestablished()
// Now that we've received both funding locked and channel reestablish,
// we can go ahead and send the active channel notification. We'll also
// defer the inactive notification for when the link exits to ensure
// that every active notification is matched by an inactive one.
l.cfg.NotifyActiveChannel(*l.ChannelPoint())
defer l.cfg.NotifyInactiveChannel(*l.ChannelPoint())
// With the channel states synced, we now reset the mailbox to ensure
// we start processing all unacked packets in order. This is done here
// to ensure that all acknowledgments that occur during channel

@ -29,6 +29,7 @@ import (
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
@ -837,7 +838,12 @@ func (n *threeHopNetwork) start() error {
return err
}
return nil
return waitLinksEligible(map[string]*channelLink{
"alice": n.aliceChannelLink,
"bob first": n.firstBobChannelLink,
"bob second": n.secondBobChannelLink,
"carol": n.carolChannelLink,
})
}
// stop stops nodes and cleanup its databases.
@ -1130,6 +1136,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
if err := server.htlcSwitch.AddLink(link); err != nil {
return nil, fmt.Errorf("unable to add channel link: %v", err)
}
go func() {
for {
select {
@ -1230,7 +1237,10 @@ func (n *twoHopNetwork) start() error {
return err
}
return nil
return waitLinksEligible(map[string]*channelLink{
"alice": n.aliceChannelLink,
"bob": n.bobChannelLink,
})
}
// stop stops nodes and cleanup its databases.
@ -1320,12 +1330,26 @@ func (n *twoHopNetwork) makeHoldPayment(sendingPeer, receivingPeer lnpeer.Peer,
return paymentErr
}
// waitLinksEligible blocks until all links the provided name-to-link map are
// eligible to forward HTLCs.
func waitLinksEligible(links map[string]*channelLink) error {
return wait.NoError(func() error {
for name, link := range links {
if link.EligibleToForward() {
continue
}
return fmt.Errorf("%s channel link not eligible", name)
}
return nil
}, 3*time.Second)
}
// timeout implements a test level timeout.
func timeout(t *testing.T) func() {
done := make(chan struct{})
go func() {
select {
case <-time.After(5 * time.Second):
case <-time.After(10 * time.Second):
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
panic("test timeout")