From 9d6ee2ebd90d625937a87861ba5619fa6fcd014e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 19 Sep 2019 12:46:56 -0700 Subject: [PATCH] htlcswitch/link: restrict EligibleToForward to wait for reestablish This commit modifies the link's EligibleToForward() method only return true once the peers have successfully exchanged channel reestablish messages. This is a preliminary step to increasing the reestablish timeout, ensuring the switch won't try to forward over links while we're waiting for the remote peer to resume the connection. --- htlcswitch/link.go | 40 +++++++++++++++++++++++++++++----------- htlcswitch/test_utils.go | 30 +++++++++++++++++++++++++++--- 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 5ad55bc7..f405fa7c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -279,8 +279,9 @@ type ChannelLinkConfig struct { // message ordering and updates. type channelLink struct { // The following fields are only meant to be used *atomically* - started int32 - shutdown int32 + started int32 + reestablished int32 + shutdown int32 // failed should be set to true in case a link error happens, making // sure we don't process any more updates. @@ -540,7 +541,21 @@ func (l *channelLink) WaitForShutdown() { // the all-zero source ID, meaning that the channel has had its ID finalized. func (l *channelLink) EligibleToForward() bool { return l.channel.RemoteNextRevocation() != nil && - l.ShortChanID() != hop.Source + l.ShortChanID() != hop.Source && + l.isReestablished() +} + +// isReestablished returns true if the link has successfully completed the +// channel reestablishment dance. +func (l *channelLink) isReestablished() bool { + return atomic.LoadInt32(&l.reestablished) == 1 +} + +// markReestablished signals that the remote peer has successfully exchanged +// channel reestablish messages and that the channel is ready to process +// subsequent messages. +func (l *channelLink) markReestablished() { + atomic.StoreInt32(&l.reestablished, 1) } // sampleNetworkFee samples the current fee rate on the network to get into the @@ -878,14 +893,6 @@ func (l *channelLink) htlcManager() { log.Infof("HTLC manager for ChannelPoint(%v) started, "+ "bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth()) - // Funding locked has already been received, so we'll go ahead and - // deliver the active channel notification since EligibleToForward - // returns true now that the link has been added to the switch. We'll - // also defer the inactive notification for when the link exits to - // ensure that every active notification is matched by an inactive one. - l.cfg.NotifyActiveChannel(*l.ChannelPoint()) - defer l.cfg.NotifyInactiveChannel(*l.ChannelPoint()) - // TODO(roasbeef): need to call wipe chan whenever D/C? // If this isn't the first time that this channel link has been @@ -961,6 +968,17 @@ func (l *channelLink) htlcManager() { } } + // We've successfully reestablished the channel, mark it as such to + // allow the switch to forward HTLCs in the outbound direction. + l.markReestablished() + + // Now that we've received both funding locked and channel reestablish, + // we can go ahead and send the active channel notification. We'll also + // defer the inactive notification for when the link exits to ensure + // that every active notification is matched by an inactive one. + l.cfg.NotifyActiveChannel(*l.ChannelPoint()) + defer l.cfg.NotifyInactiveChannel(*l.ChannelPoint()) + // With the channel states synced, we now reset the mailbox to ensure // we start processing all unacked packets in order. This is done here // to ensure that all acknowledgments that occur during channel diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 7bd46531..a908afb3 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -29,6 +29,7 @@ import ( "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -837,7 +838,12 @@ func (n *threeHopNetwork) start() error { return err } - return nil + return waitLinksEligible(map[string]*channelLink{ + "alice": n.aliceChannelLink, + "bob first": n.firstBobChannelLink, + "bob second": n.secondBobChannelLink, + "carol": n.carolChannelLink, + }) } // stop stops nodes and cleanup its databases. @@ -1130,6 +1136,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, if err := server.htlcSwitch.AddLink(link); err != nil { return nil, fmt.Errorf("unable to add channel link: %v", err) } + go func() { for { select { @@ -1230,7 +1237,10 @@ func (n *twoHopNetwork) start() error { return err } - return nil + return waitLinksEligible(map[string]*channelLink{ + "alice": n.aliceChannelLink, + "bob": n.bobChannelLink, + }) } // stop stops nodes and cleanup its databases. @@ -1320,12 +1330,26 @@ func (n *twoHopNetwork) makeHoldPayment(sendingPeer, receivingPeer lnpeer.Peer, return paymentErr } +// waitLinksEligible blocks until all links the provided name-to-link map are +// eligible to forward HTLCs. +func waitLinksEligible(links map[string]*channelLink) error { + return wait.NoError(func() error { + for name, link := range links { + if link.EligibleToForward() { + continue + } + return fmt.Errorf("%s channel link not eligible", name) + } + return nil + }, 3*time.Second) +} + // timeout implements a test level timeout. func timeout(t *testing.T) func() { done := make(chan struct{}) go func() { select { - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) panic("test timeout")