diff --git a/Gopkg.lock b/Gopkg.lock index 3791d9da..b5bb71b4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -330,7 +330,7 @@ revision = "1efa31f08b9333f1bd4882d61f9d668a70cd902e" [[projects]] - digest = "1:61cf25ac3ac0c8c50959666ddf732a396fbc445570be051c8b9795c8a087fde7" + digest = "1:270627be1fb5b0875bdbf8a96f86a551adc0aeba97fb63af3caaa21c0a39c499" name = "golang.org/x/crypto" packages = [ "blake2b", @@ -338,6 +338,7 @@ "curve25519", "hkdf", "internal/chacha20", + "internal/subtle", "nacl/box", "nacl/secretbox", "pbkdf2", @@ -349,7 +350,7 @@ "ssh/terminal", ] pruneopts = "UT" - revision = "49796115aa4b964c318aad4f3084fdb41e9aa067" + revision = "614d502a4dac94afa3a6ce146bd1736da82514c6" [[projects]] digest = "1:0764abb1e99bb977d1b9f320d02859d4a737252da3e1fd233c4ae0f9522e7446" diff --git a/Gopkg.toml b/Gopkg.toml index 2dec59e3..062bf36c 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -84,7 +84,7 @@ [[constraint]] name = "golang.org/x/crypto" - revision = "49796115aa4b964c318aad4f3084fdb41e9aa067" + revision = "614d502a4dac94afa3a6ce146bd1736da82514c6" [[constraint]] name = "golang.org/x/net" diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 75daea1a..a698d60d 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -481,6 +481,11 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, select { case d.networkMsgs <- nMsg: + + // If the peer that sent us this error is quitting, then we don't need + // to send back an error and can return immediately. + case <-peer.QuitSignal(): + return nil case <-d.quit: nMsg.err <- ErrGossiperShuttingDown } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 7d98c558..bc3642fd 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2181,3 +2181,6 @@ func (p *mockPeer) PubKey() [33]byte { return pubkey } func (p *mockPeer) Address() net.Addr { return nil } +func (p *mockPeer) QuitSignal() <-chan struct{} { + return p.quit +} diff --git a/fundingmanager.go b/fundingmanager.go index 425ded11..2ab37788 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -2771,7 +2771,9 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { // waitUntilChannelOpen is designed to prevent other lnd subsystems from // sending new update messages to a channel before the channel is fully // opened. -func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { +func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID, + quit <-chan struct{}) error { + f.barrierMtx.RLock() barrier, ok := f.newChanBarriers[targetChan] f.barrierMtx.RUnlock() @@ -2781,12 +2783,17 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { select { case <-barrier: - case <-f.quit: // TODO(roasbeef): add timer? - break + case <-quit: + return ErrFundingManagerShuttingDown + case <-f.quit: + return ErrFundingManagerShuttingDown } fndgLog.Tracef("barrier for ChanID(%v) closed", targetChan) + return nil } + + return nil } // processFundingError sends a message to the fundingManager allowing it to diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 88253877..752f3b53 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -174,6 +174,10 @@ func (n *testNode) WipeChannel(_ *wire.OutPoint) error { return nil } +func (n *testNode) QuitSignal() <-chan struct{} { + return n.shutdownChannel +} + func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel, quit <-chan struct{}) error { diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 1bb1fc95..f9b72405 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1459,6 +1459,10 @@ type mockPeer struct { quit chan struct{} } +func (m *mockPeer) QuitSignal() <-chan struct{} { + return m.quit +} + var _ lnpeer.Peer = (*mockPeer)(nil) func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e2539fac..617229b6 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -238,6 +238,10 @@ func (s *mockServer) Start() error { return nil } +func (s *mockServer) QuitSignal() <-chan struct{} { + return s.quit +} + // mockHopIterator represents the test version of hop iterator which instead // of encrypting the path in onion blob just stores the path as a list of hops. type mockHopIterator struct { diff --git a/lnpeer/peer.go b/lnpeer/peer.go index 99efe845..fb0e743c 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -33,4 +33,10 @@ type Peer interface { // Address returns the network address of the remote peer. Address() net.Addr + + // QuitSignal is a method that should return a channel which will be + // sent upon or closed once the backing peer exits. This allows callers + // using the interface to cancel any processing in the event the backing + // implementation exits. + QuitSignal() <-chan struct{} } diff --git a/peer.go b/peer.go index a49a3cf7..c82ea843 100644 --- a/peer.go +++ b/peer.go @@ -315,10 +315,20 @@ func (p *peer) Start() error { return nil } +// QuitSignal is a method that should return a channel which will be sent upon +// or closed once the backing peer exits. This allows callers using the +// interface to cancel any processing in the event the backing implementation +// exits. +// +// NOTE: Part of the lnpeer.Peer interface. +func (p *peer) QuitSignal() <-chan struct{} { + return p.quit +} + // loadActiveChannels creates indexes within the peer for tracking all active // channels returned by the database. func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { - var activeChans []wire.OutPoint + var activePublicChans []wire.OutPoint for _, dbChan := range chans { lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, p.server.witnessBeacon, dbChan, @@ -431,14 +441,19 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { p.activeChannels[chanID] = lnChan p.activeChanMtx.Unlock() - activeChans = append(activeChans, *chanPoint) + // Only if the channel is public do we need to collect it for + // sending out a new enable update. + chanIsPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0 + if chanIsPublic { + activePublicChans = append(activePublicChans, *chanPoint) + } } - // As a final measure we launch a goroutine that will ensure the - // channels are not currently disabled, as that will make us skip it - // during path finding. + // As a final measure we launch a goroutine that will ensure the newly + // loaded public channels are not currently disabled, as that will make + // us skip it during path finding. go func() { - for _, chanPoint := range activeChans { + for _, chanPoint := range activePublicChans { // Set the channel disabled=false by sending out a new // ChannelUpdate. If this channel is already active, // the update won't be sent. @@ -738,6 +753,7 @@ func (ms *msgStream) Stop() { func (ms *msgStream) msgConsumer() { defer ms.wg.Done() defer peerLog.Tracef(ms.stopMsg) + defer atomic.StoreInt32(&ms.streamShutdown, 1) peerLog.Tracef(ms.startMsg) @@ -752,9 +768,10 @@ func (ms *msgStream) msgConsumer() { // Otherwise, we'll check the message queue for any new // items. select { + case <-ms.peer.quit: + return case <-ms.quit: ms.msgCond.L.Unlock() - atomic.StoreInt32(&ms.streamShutdown, 1) return default: } @@ -777,8 +794,9 @@ func (ms *msgStream) msgConsumer() { // grow indefinitely. select { case ms.producerSema <- struct{}{}: + case <-ms.peer.quit: + return case <-ms.quit: - atomic.StoreInt32(&ms.streamShutdown, 1) return } } @@ -837,13 +855,29 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { // to the other side, they immediately send a // channel update message, but we haven't yet // sent the channel to the channelManager. - p.server.fundingMgr.waitUntilChannelOpen(cid) + err := p.server.fundingMgr.waitUntilChannelOpen( + cid, p.quit, + ) + if err != nil { + // If we have a non-nil error, then the + // funding manager is shutting down, s + // we can exit here without attempting + // to deliver the message. + return + } } - // TODO(roasbeef): only wait if not chan sync + // In order to avoid unnecessarily delivering message + // as the peer is exiting, we'll check quickly to see + // if we need to exit. + select { + case <-p.quit: + return + default: + } - // Dispatch the commitment update message to the proper active - // goroutine dedicated to this channel. + // Dispatch the commitment update message to the proper + // active goroutine dedicated to this channel. if chanLink == nil { link, err := p.server.htlcSwitch.GetLink(cid) if err != nil { @@ -854,6 +888,15 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { chanLink = link } + // In order to avoid unnecessarily delivering message + // as the peer is exiting, we'll check quickly to see + // if we need to exit. + select { + case <-p.quit: + return + default: + } + chanLink.HandleChannelUpdate(msg) }, ) @@ -878,6 +921,7 @@ func newDiscMsgStream(p *peer) *msgStream { // // NOTE: This method MUST be run as a goroutine. func (p *peer) readHandler() { + defer p.wg.Done() // We'll stop the timer after a new messages is received, and also // reset it after we process the next message. @@ -1056,6 +1100,7 @@ out: chanStream = newChanMsgStream(p, targetChan) chanMsgStreams[targetChan] = chanStream chanStream.Start() + defer chanStream.Stop() } // With the stream obtained, add the message to the @@ -1066,16 +1111,8 @@ out: idleTimer.Reset(idleTimeout) } - p.wg.Done() - p.Disconnect(errors.New("read handler closed")) - for cid, chanStream := range chanMsgStreams { - chanStream.Stop() - - delete(chanMsgStreams, cid) - } - peerLog.Tracef("readHandler for peer %v done", p) } diff --git a/server.go b/server.go index 74a85a2f..9bd996f5 100644 --- a/server.go +++ b/server.go @@ -3090,6 +3090,13 @@ func (s *server) watchChannelStatus() { // the status of closed channels around. newStatus := make(map[wire.OutPoint]activeStatus) for _, c := range channels { + // We'll skip any private channels, as they + // aren't used for routing within the network + // by other nodes. + if c.ChannelFlags&lnwire.FFAnnounceChannel == 0 { + continue + } + chanID := lnwire.NewChanIDFromOutPoint( &c.FundingOutpoint)