From ce115a6a59f9e04107befac2301521e63077b4a2 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 17:49:12 -0700 Subject: [PATCH 01/15] peer: raise readHandler wait group done to defer statement In this commit, we raise the readHandler wait group done into a defer statement at the top of the method. This fixes an existing bug that would cause the readHandler to declare it had exited, yet possibly still be waiting on the chan message stream below to exit. --- peer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/peer.go b/peer.go index a49a3cf7..7c979f85 100644 --- a/peer.go +++ b/peer.go @@ -878,6 +878,7 @@ func newDiscMsgStream(p *peer) *msgStream { // // NOTE: This method MUST be run as a goroutine. func (p *peer) readHandler() { + defer p.wg.Done() // We'll stop the timer after a new messages is received, and also // reset it after we process the next message. @@ -1066,8 +1067,6 @@ out: idleTimer.Reset(idleTimeout) } - p.wg.Done() - p.Disconnect(errors.New("read handler closed")) for cid, chanStream := range chanMsgStreams { From a7656454aab52f7dd3be001d41cf335f4208da87 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 17:53:07 -0700 Subject: [PATCH 02/15] peer: stop chanMsg streams in defer right after creation --- peer.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/peer.go b/peer.go index 7c979f85..63566a8e 100644 --- a/peer.go +++ b/peer.go @@ -1057,6 +1057,7 @@ out: chanStream = newChanMsgStream(p, targetChan) chanMsgStreams[targetChan] = chanStream chanStream.Start() + defer chanStream.Stop() } // With the stream obtained, add the message to the @@ -1069,12 +1070,6 @@ out: p.Disconnect(errors.New("read handler closed")) - for cid, chanStream := range chanMsgStreams { - chanStream.Stop() - - delete(chanMsgStreams, cid) - } - peerLog.Tracef("readHandler for peer %v done", p) } From e4e4a6ab5872e5d1cfc2713b8cc44695bc8778d6 Mon Sep 17 00:00:00 2001 From: Olaoluwa 
Osuntokun Date: Fri, 24 Aug 2018 17:54:26 -0700 Subject: [PATCH 03/15] peer: move atomic var increment in msgConsumer goroutine to defer In this commit we move the atomic var increment that signals the consumer goroutine has exited to the top of the method in a defer statement. This cleans up some duplicate code and also adheres to the pattern of using defers to signal cleaning up any dependent goroutine state on exit. --- peer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/peer.go b/peer.go index 63566a8e..a9833e4b 100644 --- a/peer.go +++ b/peer.go @@ -738,6 +738,7 @@ func (ms *msgStream) Stop() { func (ms *msgStream) msgConsumer() { defer ms.wg.Done() defer peerLog.Tracef(ms.stopMsg) + defer atomic.StoreInt32(&ms.streamShutdown, 1) peerLog.Tracef(ms.startMsg) @@ -754,7 +755,6 @@ func (ms *msgStream) msgConsumer() { select { case <-ms.quit: ms.msgCond.L.Unlock() - atomic.StoreInt32(&ms.streamShutdown, 1) return default: } @@ -778,7 +778,6 @@ func (ms *msgStream) msgConsumer() { select { case ms.producerSema <- struct{}{}: case <-ms.quit: - atomic.StoreInt32(&ms.streamShutdown, 1) return } } From 0b5a403fcec725b6216da14dab1ac4c3cb82e7c9 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 17:55:53 -0700 Subject: [PATCH 04/15] funding: add caller quit channel to waitUntilChannelOpen In this commit, we add a caller quit channel to waitUntilChannelOpen. This ensures that the caller won't block forever if it needs to exit before the funding manager exits, or the channel barrier is actually closed. 
--- fundingmanager.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 425ded11..161b0439 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -2771,7 +2771,9 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { // waitUntilChannelOpen is designed to prevent other lnd subsystems from // sending new update messages to a channel before the channel is fully // opened. -func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { +func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID, + quit <-chan struct{}) { + f.barrierMtx.RLock() barrier, ok := f.newChanBarriers[targetChan] f.barrierMtx.RUnlock() @@ -2781,8 +2783,10 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { select { case <-barrier: - case <-f.quit: // TODO(roasbeef): add timer? - break + case <-quit: + return + case <-f.quit: + return } fndgLog.Tracef("barrier for ChanID(%v) closed", targetChan) From 33c5c9661e3ec8e8070d897e616af7b46160b69f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 17:57:06 -0700 Subject: [PATCH 05/15] peer: allow msgStream goroutines to quit within the apply function In this commit, we thread through the quit of the peer to the execution of the apply function for a msgStream. This change ensures that if the target is still processing the message, then the peer is able to exit cleanly and not block indefinitely. --- peer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/peer.go b/peer.go index a9833e4b..751f1464 100644 --- a/peer.go +++ b/peer.go @@ -836,7 +836,9 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { // to the other side, they immediately send a // channel update message, but we haven't yet // sent the channel to the channelManager. 
- p.server.fundingMgr.waitUntilChannelOpen(cid) + p.server.fundingMgr.waitUntilChannelOpen( + cid, p.quit, + ) } // TODO(roasbeef): only wait if not chan sync From 5130949604b04eb164e09a6b77850a748fd40e7f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 17:58:24 -0700 Subject: [PATCH 06/15] peer: allow msgConsumers to also exit on peer quit send --- peer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/peer.go b/peer.go index 751f1464..bbf3a60f 100644 --- a/peer.go +++ b/peer.go @@ -753,6 +753,8 @@ func (ms *msgStream) msgConsumer() { // Otherwise, we'll check the message queue for any new // items. select { + case <-ms.peer.quit: + return case <-ms.quit: ms.msgCond.L.Unlock() return @@ -777,6 +779,8 @@ func (ms *msgStream) msgConsumer() { // grow indefinitely. select { case ms.producerSema <- struct{}{}: + case <-ms.peer.quit: + return case <-ms.quit: return } From 745cc3a8f4922dd4031f736a0ccb277229c3ad15 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 18:11:17 -0700 Subject: [PATCH 07/15] server: don't attempt to disable private channels --- server.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server.go b/server.go index 74a85a2f..9bd996f5 100644 --- a/server.go +++ b/server.go @@ -3090,6 +3090,13 @@ func (s *server) watchChannelStatus() { // the status of closed channels around. newStatus := make(map[wire.OutPoint]activeStatus) for _, c := range channels { + // We'll skip any private channels, as they + // aren't used for routing within the network + // by other nodes. 
+ if c.ChannelFlags&lnwire.FFAnnounceChannel == 0 { + continue + } + chanID := lnwire.NewChanIDFromOutPoint( &c.FundingOutpoint) From d6f534cfa52f421efc8485809af50c1fba164ffb Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 18:13:53 -0700 Subject: [PATCH 08/15] peer: don't attempt to re-enable any private channels --- peer.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/peer.go b/peer.go index bbf3a60f..4f62a20e 100644 --- a/peer.go +++ b/peer.go @@ -318,7 +318,7 @@ func (p *peer) Start() error { // loadActiveChannels creates indexes within the peer for tracking all active // channels returned by the database. func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { - var activeChans []wire.OutPoint + var activePublicChans []wire.OutPoint for _, dbChan := range chans { lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, p.server.witnessBeacon, dbChan, @@ -431,14 +431,19 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { p.activeChannels[chanID] = lnChan p.activeChanMtx.Unlock() - activeChans = append(activeChans, *chanPoint) + // Only if the channel is public do we need to collect it for + // sending out a new enable update. + chanIsPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0 + if chanIsPublic { + activePublicChans = append(activePublicChans, *chanPoint) + } } - // As a final measure we launch a goroutine that will ensure the - // channels are not currently disabled, as that will make us skip it - // during path finding. + // As a final measure we launch a goroutine that will ensure the newly + // loaded public channels are not currently disabled, as that will make + // us skip it during path finding. go func() { - for _, chanPoint := range activeChans { + for _, chanPoint := range activePublicChans { // Set the channel disabled=false by sending out a new // ChannelUpdate. 
If this channel is already active, // the update won't be sent. From 0e510c8b5c5567ef077aefbb6e61a7bce9128e5e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 19:58:29 -0700 Subject: [PATCH 09/15] funding+peer: don't attempt to deliver messages if the peer is shutting down --- fundingmanager.go | 9 ++++++--- peer.go | 11 ++++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 161b0439..2ab37788 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -2772,7 +2772,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { // sending new update messages to a channel before the channel is fully // opened. func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID, - quit <-chan struct{}) { + quit <-chan struct{}) error { f.barrierMtx.RLock() barrier, ok := f.newChanBarriers[targetChan] @@ -2784,13 +2784,16 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID, select { case <-barrier: case <-quit: - return + return ErrFundingManagerShuttingDown case <-f.quit: - return + return ErrFundingManagerShuttingDown } fndgLog.Tracef("barrier for ChanID(%v) closed", targetChan) + return nil } + + return nil } // processFundingError sends a message to the fundingManager allowing it to diff --git a/peer.go b/peer.go index 4f62a20e..1edd7368 100644 --- a/peer.go +++ b/peer.go @@ -845,13 +845,18 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { // to the other side, they immediately send a // channel update message, but we haven't yet // sent the channel to the channelManager. - p.server.fundingMgr.waitUntilChannelOpen( + err := p.server.fundingMgr.waitUntilChannelOpen( cid, p.quit, ) + if err != nil { + // If we have a non-nil error, then the + // funding manager is shutting down, s + // we can exit here without attempting + // to deliver the message. 
+ return + } } - // TODO(roasbeef): only wait if not chan sync - // Dispatch the commitment update message to the proper active // goroutine dedicated to this channel. if chanLink == nil { From 169cb723cefe4ac715a22626cc5248f6a3d99e6e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 24 Aug 2018 20:56:35 -0700 Subject: [PATCH 10/15] build: update dep for golang.org/x/crypto to compile under golang 1.11 --- Gopkg.lock | 5 +++-- Gopkg.toml | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 3791d9da..b5bb71b4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -330,7 +330,7 @@ revision = "1efa31f08b9333f1bd4882d61f9d668a70cd902e" [[projects]] - digest = "1:61cf25ac3ac0c8c50959666ddf732a396fbc445570be051c8b9795c8a087fde7" + digest = "1:270627be1fb5b0875bdbf8a96f86a551adc0aeba97fb63af3caaa21c0a39c499" name = "golang.org/x/crypto" packages = [ "blake2b", @@ -338,6 +338,7 @@ "curve25519", "hkdf", "internal/chacha20", + "internal/subtle", "nacl/box", "nacl/secretbox", "pbkdf2", @@ -349,7 +350,7 @@ "ssh/terminal", ] pruneopts = "UT" - revision = "49796115aa4b964c318aad4f3084fdb41e9aa067" + revision = "614d502a4dac94afa3a6ce146bd1736da82514c6" [[projects]] digest = "1:0764abb1e99bb977d1b9f320d02859d4a737252da3e1fd233c4ae0f9522e7446" diff --git a/Gopkg.toml b/Gopkg.toml index 2dec59e3..062bf36c 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -84,7 +84,7 @@ [[constraint]] name = "golang.org/x/crypto" - revision = "49796115aa4b964c318aad4f3084fdb41e9aa067" + revision = "614d502a4dac94afa3a6ce146bd1736da82514c6" [[constraint]] name = "golang.org/x/net" From 1364dca5a6e4293acc6a289aac41a32d226c759a Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 25 Aug 2018 17:10:25 -0700 Subject: [PATCH 11/15] lnpeer: extend Peer interface with new QuitSignal method In this commit, we extend the Peer interface with a new QuitSignal method. 
This method is meant to expose a read-only quit channel which will allow callers to cancel any actions based on the lifetime of the underlying peer. --- lnpeer/peer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lnpeer/peer.go b/lnpeer/peer.go index 99efe845..fb0e743c 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -33,4 +33,10 @@ type Peer interface { // Address returns the network address of the remote peer. Address() net.Addr + + // QuitSignal is a method that should return a channel which will be + // sent upon or closed once the backing peer exits. This allows callers + // using the interface to cancel any processing in the event the backing + // implementation exits. + QuitSignal() <-chan struct{} } From 7f480f723c1272aadbe530a8df01d6690ade21a1 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 25 Aug 2018 17:10:57 -0700 Subject: [PATCH 12/15] peer: add QuitSignal to ensure peer struct adheres to lnpeer.Peer interface --- peer.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/peer.go b/peer.go index 1edd7368..3ec3cce4 100644 --- a/peer.go +++ b/peer.go @@ -315,6 +315,16 @@ func (p *peer) Start() error { return nil } +// QuitSignal is a method that should return a channel which will be sent upon +// or closed once the backing peer exits. This allows callers using the +// interface to cancel any processing in the event the backing implementation +// exits. +// +// NOTE: Part of the lnpeer.Peer interface. +func (p *peer) QuitSignal() <-chan struct{} { + return p.quit +} + // loadActiveChannels creates indexes within the peer for tracking all active // channels returned by the database. 
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { From 19552b0dbfa8d91e676a734d3b346886525d6739 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 25 Aug 2018 17:11:40 -0700 Subject: [PATCH 13/15] htlcswitch+funding+discovery: update mock peers to add new QuitSignal method --- discovery/gossiper_test.go | 3 +++ fundingmanager_test.go | 4 ++++ htlcswitch/link_test.go | 4 ++++ htlcswitch/mock.go | 4 ++++ 4 files changed, 15 insertions(+) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 7d98c558..bc3642fd 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2181,3 +2181,6 @@ func (p *mockPeer) PubKey() [33]byte { return pubkey } func (p *mockPeer) Address() net.Addr { return nil } +func (p *mockPeer) QuitSignal() <-chan struct{} { + return p.quit +} diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 88253877..752f3b53 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -174,6 +174,10 @@ func (n *testNode) WipeChannel(_ *wire.OutPoint) error { return nil } +func (n *testNode) QuitSignal() <-chan struct{} { + return n.shutdownChannel +} + func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel, quit <-chan struct{}) error { diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 1bb1fc95..f9b72405 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1459,6 +1459,10 @@ type mockPeer struct { quit chan struct{} } +func (m *mockPeer) QuitSignal() <-chan struct{} { + return m.quit +} + var _ lnpeer.Peer = (*mockPeer)(nil) func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e2539fac..617229b6 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -238,6 +238,10 @@ func (s *mockServer) Start() error { return nil } +func (s *mockServer) QuitSignal() <-chan struct{} { + return s.quit +} + // mockHopIterator represents the test version of 
hop iterator which instead // of encrypting the path in onion blob just stores the path as a list of hops. type mockHopIterator struct { From 13a6d413ac4c3d4521df97b2ad216ee622a4b506 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 25 Aug 2018 17:16:03 -0700 Subject: [PATCH 14/15] discovery: select on peer's QuitSignal to allow caller to unblock if disconnecting In this commit, we select on the peer's QuitSignal to allow the caller to unblock if the peer itself is disconnecting. With this change, we now ensure that it isn't possible for a peer to block on this method and prevent a graceful exit. --- discovery/gossiper.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 75daea1a..a698d60d 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -481,6 +481,11 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, select { case d.networkMsgs <- nMsg: + + // If the peer that sent us this error is quitting, then we don't need + // to send back an error and can return immediately. + case <-peer.QuitSignal(): + return nil case <-d.quit: nMsg.err <- ErrGossiperShuttingDown } From f2db18733b003719af53f48467cf3808326f49ab Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 25 Aug 2018 17:16:22 -0700 Subject: [PATCH 15/15] peer: before and after obtaining link for chan update, check quit signal --- peer.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/peer.go b/peer.go index 3ec3cce4..c82ea843 100644 --- a/peer.go +++ b/peer.go @@ -867,8 +867,17 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { } } - // Dispatch the commitment update message to the proper active - // goroutine dedicated to this channel. + // In order to avoid unnecessarily delivering message + // as the peer is exiting, we'll check quickly to see + // if we need to exit. 
+ select { + case <-p.quit: + return + default: + } + + // Dispatch the commitment update message to the proper + // active goroutine dedicated to this channel. if chanLink == nil { link, err := p.server.htlcSwitch.GetLink(cid) if err != nil { @@ -879,6 +888,15 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { chanLink = link } + // In order to avoid unnecessarily delivering message + // as the peer is exiting, we'll check quickly to see + // if we need to exit. + select { + case <-p.quit: + return + default: + } + chanLink.HandleChannelUpdate(msg) }, )