Merge pull request #1785 from Roasbeef/ensure-peer-quit

peer+server: ensure the peer is always able to quit even mid msgStream application
This commit is contained in:
Olaoluwa Osuntokun 2018-08-25 18:24:12 -07:00 committed by GitHub
commit 26f68da5b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 104 additions and 26 deletions

5
Gopkg.lock generated

@ -330,7 +330,7 @@
revision = "1efa31f08b9333f1bd4882d61f9d668a70cd902e" revision = "1efa31f08b9333f1bd4882d61f9d668a70cd902e"
[[projects]] [[projects]]
digest = "1:61cf25ac3ac0c8c50959666ddf732a396fbc445570be051c8b9795c8a087fde7" digest = "1:270627be1fb5b0875bdbf8a96f86a551adc0aeba97fb63af3caaa21c0a39c499"
name = "golang.org/x/crypto" name = "golang.org/x/crypto"
packages = [ packages = [
"blake2b", "blake2b",
@ -338,6 +338,7 @@
"curve25519", "curve25519",
"hkdf", "hkdf",
"internal/chacha20", "internal/chacha20",
"internal/subtle",
"nacl/box", "nacl/box",
"nacl/secretbox", "nacl/secretbox",
"pbkdf2", "pbkdf2",
@ -349,7 +350,7 @@
"ssh/terminal", "ssh/terminal",
] ]
pruneopts = "UT" pruneopts = "UT"
revision = "49796115aa4b964c318aad4f3084fdb41e9aa067" revision = "614d502a4dac94afa3a6ce146bd1736da82514c6"
[[projects]] [[projects]]
digest = "1:0764abb1e99bb977d1b9f320d02859d4a737252da3e1fd233c4ae0f9522e7446" digest = "1:0764abb1e99bb977d1b9f320d02859d4a737252da3e1fd233c4ae0f9522e7446"

@ -84,7 +84,7 @@
[[constraint]] [[constraint]]
name = "golang.org/x/crypto" name = "golang.org/x/crypto"
revision = "49796115aa4b964c318aad4f3084fdb41e9aa067" revision = "614d502a4dac94afa3a6ce146bd1736da82514c6"
[[constraint]] [[constraint]]
name = "golang.org/x/net" name = "golang.org/x/net"

@ -481,6 +481,11 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
select { select {
case d.networkMsgs <- nMsg: case d.networkMsgs <- nMsg:
// If the peer that sent us this error is quitting, then we don't need
// to send back an error and can return immediately.
case <-peer.QuitSignal():
return nil
case <-d.quit: case <-d.quit:
nMsg.err <- ErrGossiperShuttingDown nMsg.err <- ErrGossiperShuttingDown
} }

@ -2181,3 +2181,6 @@ func (p *mockPeer) PubKey() [33]byte {
return pubkey return pubkey
} }
func (p *mockPeer) Address() net.Addr { return nil } func (p *mockPeer) Address() net.Addr { return nil }
func (p *mockPeer) QuitSignal() <-chan struct{} {
return p.quit
}

@ -2771,7 +2771,9 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
// waitUntilChannelOpen is designed to prevent other lnd subsystems from // waitUntilChannelOpen is designed to prevent other lnd subsystems from
// sending new update messages to a channel before the channel is fully // sending new update messages to a channel before the channel is fully
// opened. // opened.
func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID,
quit <-chan struct{}) error {
f.barrierMtx.RLock() f.barrierMtx.RLock()
barrier, ok := f.newChanBarriers[targetChan] barrier, ok := f.newChanBarriers[targetChan]
f.barrierMtx.RUnlock() f.barrierMtx.RUnlock()
@ -2781,12 +2783,17 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) {
select { select {
case <-barrier: case <-barrier:
case <-f.quit: // TODO(roasbeef): add timer? case <-quit:
break return ErrFundingManagerShuttingDown
case <-f.quit:
return ErrFundingManagerShuttingDown
} }
fndgLog.Tracef("barrier for ChanID(%v) closed", targetChan) fndgLog.Tracef("barrier for ChanID(%v) closed", targetChan)
return nil
} }
return nil
} }
// processFundingError sends a message to the fundingManager allowing it to // processFundingError sends a message to the fundingManager allowing it to

@ -174,6 +174,10 @@ func (n *testNode) WipeChannel(_ *wire.OutPoint) error {
return nil return nil
} }
func (n *testNode) QuitSignal() <-chan struct{} {
return n.shutdownChannel
}
func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel, func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel,
quit <-chan struct{}) error { quit <-chan struct{}) error {

@ -1459,6 +1459,10 @@ type mockPeer struct {
quit chan struct{} quit chan struct{}
} }
func (m *mockPeer) QuitSignal() <-chan struct{} {
return m.quit
}
var _ lnpeer.Peer = (*mockPeer)(nil) var _ lnpeer.Peer = (*mockPeer)(nil)
func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error {

@ -238,6 +238,10 @@ func (s *mockServer) Start() error {
return nil return nil
} }
func (s *mockServer) QuitSignal() <-chan struct{} {
return s.quit
}
// mockHopIterator represents the test version of hop iterator which instead // mockHopIterator represents the test version of hop iterator which instead
// of encrypting the path in onion blob just stores the path as a list of hops. // of encrypting the path in onion blob just stores the path as a list of hops.
type mockHopIterator struct { type mockHopIterator struct {

@ -33,4 +33,10 @@ type Peer interface {
// Address returns the network address of the remote peer. // Address returns the network address of the remote peer.
Address() net.Addr Address() net.Addr
// QuitSignal is a method that should return a channel which will be
// sent upon or closed once the backing peer exits. This allows callers
// using the interface to cancel any processing in the event the backing
// implementation exits.
QuitSignal() <-chan struct{}
} }

77
peer.go

@ -315,10 +315,20 @@ func (p *peer) Start() error {
return nil return nil
} }
// QuitSignal is a method that should return a channel which will be sent upon
// or closed once the backing peer exits. This allows callers using the
// interface to cancel any processing in the event the backing implementation
// exits.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *peer) QuitSignal() <-chan struct{} {
return p.quit
}
// loadActiveChannels creates indexes within the peer for tracking all active // loadActiveChannels creates indexes within the peer for tracking all active
// channels returned by the database. // channels returned by the database.
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
var activeChans []wire.OutPoint var activePublicChans []wire.OutPoint
for _, dbChan := range chans { for _, dbChan := range chans {
lnChan, err := lnwallet.NewLightningChannel( lnChan, err := lnwallet.NewLightningChannel(
p.server.cc.signer, p.server.witnessBeacon, dbChan, p.server.cc.signer, p.server.witnessBeacon, dbChan,
@ -431,14 +441,19 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
p.activeChannels[chanID] = lnChan p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
activeChans = append(activeChans, *chanPoint) // Only if the channel is public do we need to collect it for
// sending out a new enable update.
chanIsPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
if chanIsPublic {
activePublicChans = append(activePublicChans, *chanPoint)
}
} }
// As a final measure we launch a goroutine that will ensure the // As a final measure we launch a goroutine that will ensure the newly
// channels are not currently disabled, as that will make us skip it // loaded public channels are not currently disabled, as that will make
// during path finding. // us skip it during path finding.
go func() { go func() {
for _, chanPoint := range activeChans { for _, chanPoint := range activePublicChans {
// Set the channel disabled=false by sending out a new // Set the channel disabled=false by sending out a new
// ChannelUpdate. If this channel is already active, // ChannelUpdate. If this channel is already active,
// the update won't be sent. // the update won't be sent.
@ -738,6 +753,7 @@ func (ms *msgStream) Stop() {
func (ms *msgStream) msgConsumer() { func (ms *msgStream) msgConsumer() {
defer ms.wg.Done() defer ms.wg.Done()
defer peerLog.Tracef(ms.stopMsg) defer peerLog.Tracef(ms.stopMsg)
defer atomic.StoreInt32(&ms.streamShutdown, 1)
peerLog.Tracef(ms.startMsg) peerLog.Tracef(ms.startMsg)
@ -752,9 +768,10 @@ func (ms *msgStream) msgConsumer() {
// Otherwise, we'll check the message queue for any new // Otherwise, we'll check the message queue for any new
// items. // items.
select { select {
case <-ms.peer.quit:
return
case <-ms.quit: case <-ms.quit:
ms.msgCond.L.Unlock() ms.msgCond.L.Unlock()
atomic.StoreInt32(&ms.streamShutdown, 1)
return return
default: default:
} }
@ -777,8 +794,9 @@ func (ms *msgStream) msgConsumer() {
// grow indefinitely. // grow indefinitely.
select { select {
case ms.producerSema <- struct{}{}: case ms.producerSema <- struct{}{}:
case <-ms.peer.quit:
return
case <-ms.quit: case <-ms.quit:
atomic.StoreInt32(&ms.streamShutdown, 1)
return return
} }
} }
@ -837,13 +855,29 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
// to the other side, they immediately send a // to the other side, they immediately send a
// channel update message, but we haven't yet // channel update message, but we haven't yet
// sent the channel to the channelManager. // sent the channel to the channelManager.
p.server.fundingMgr.waitUntilChannelOpen(cid) err := p.server.fundingMgr.waitUntilChannelOpen(
cid, p.quit,
)
if err != nil {
// If we have a non-nil error, then the
// funding manager is shutting down, s
// we can exit here without attempting
// to deliver the message.
return
}
} }
// TODO(roasbeef): only wait if not chan sync // In order to avoid unnecessarily delivering message
// as the peer is exiting, we'll check quickly to see
// if we need to exit.
select {
case <-p.quit:
return
default:
}
// Dispatch the commitment update message to the proper active // Dispatch the commitment update message to the proper
// goroutine dedicated to this channel. // active goroutine dedicated to this channel.
if chanLink == nil { if chanLink == nil {
link, err := p.server.htlcSwitch.GetLink(cid) link, err := p.server.htlcSwitch.GetLink(cid)
if err != nil { if err != nil {
@ -854,6 +888,15 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
chanLink = link chanLink = link
} }
// In order to avoid unnecessarily delivering message
// as the peer is exiting, we'll check quickly to see
// if we need to exit.
select {
case <-p.quit:
return
default:
}
chanLink.HandleChannelUpdate(msg) chanLink.HandleChannelUpdate(msg)
}, },
) )
@ -878,6 +921,7 @@ func newDiscMsgStream(p *peer) *msgStream {
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (p *peer) readHandler() { func (p *peer) readHandler() {
defer p.wg.Done()
// We'll stop the timer after a new messages is received, and also // We'll stop the timer after a new messages is received, and also
// reset it after we process the next message. // reset it after we process the next message.
@ -1056,6 +1100,7 @@ out:
chanStream = newChanMsgStream(p, targetChan) chanStream = newChanMsgStream(p, targetChan)
chanMsgStreams[targetChan] = chanStream chanMsgStreams[targetChan] = chanStream
chanStream.Start() chanStream.Start()
defer chanStream.Stop()
} }
// With the stream obtained, add the message to the // With the stream obtained, add the message to the
@ -1066,16 +1111,8 @@ out:
idleTimer.Reset(idleTimeout) idleTimer.Reset(idleTimeout)
} }
p.wg.Done()
p.Disconnect(errors.New("read handler closed")) p.Disconnect(errors.New("read handler closed"))
for cid, chanStream := range chanMsgStreams {
chanStream.Stop()
delete(chanMsgStreams, cid)
}
peerLog.Tracef("readHandler for peer %v done", p) peerLog.Tracef("readHandler for peer %v done", p)
} }

@ -3090,6 +3090,13 @@ func (s *server) watchChannelStatus() {
// the status of closed channels around. // the status of closed channels around.
newStatus := make(map[wire.OutPoint]activeStatus) newStatus := make(map[wire.OutPoint]activeStatus)
for _, c := range channels { for _, c := range channels {
// We'll skip any private channels, as they
// aren't used for routing within the network
// by other nodes.
if c.ChannelFlags&lnwire.FFAnnounceChannel == 0 {
continue
}
chanID := lnwire.NewChanIDFromOutPoint( chanID := lnwire.NewChanIDFromOutPoint(
&c.FundingOutpoint) &c.FundingOutpoint)