peer: adds tracking of go routines to sync disconnect

In addition to improved synchronization between the client
  and server, this commit also moves the channel snapshotting
  procedure such that it is handled without submitting a query
  to the primary select statement. This is primarily done as a
  precaution to ensure that no deadlocks occur, has channel
  snapshotting has the potential to block restarts.
This commit is contained in:
Conner Fromknecht 2017-08-08 16:51:19 -07:00 committed by Olaoluwa Osuntokun
parent 91d6b0492e
commit efd9cf12b8

69
peer.go

@ -111,16 +111,11 @@ type peer struct {
// objects to queue messages to be sent out on the wire. // objects to queue messages to be sent out on the wire.
outgoingQueue chan outgoinMsg outgoingQueue chan outgoinMsg
// sendQueueSync is used as a semaphore to synchronize writes between
// the writeHandler and the queueHandler.
sendQueueSync chan struct{}
// activeChannels is a map which stores the state machines of all // activeChannels is a map which stores the state machines of all
// active channels. Channels are indexed into the map by the txid of // active channels. Channels are indexed into the map by the txid of
// the funding transaction which opened the channel. // the funding transaction which opened the channel.
activeChanMtx sync.RWMutex activeChanMtx sync.RWMutex
activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
chanSnapshotReqs chan *chanSnapshotReq
// newChannels is used by the fundingManager to send fully opened // newChannels is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow. // channels to the source peer which handled the funding workflow.
@ -172,13 +167,11 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
server: server, server: server,
sendQueueSync: make(chan struct{}, 1), sendQueue: make(chan outgoinMsg),
sendQueue: make(chan outgoinMsg, 1), outgoingQueue: make(chan outgoinMsg),
outgoingQueue: make(chan outgoinMsg, outgoingQueueLen),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
chanSnapshotReqs: make(chan *chanSnapshotReq), newChannels: make(chan *newChannelMsg, 1),
newChannels: make(chan *newChannelMsg, 1),
localCloseChanReqs: make(chan *htlcswitch.ChanClose), localCloseChanReqs: make(chan *htlcswitch.ChanClose),
shutdownChanReqs: make(chan *lnwire.Shutdown), shutdownChanReqs: make(chan *lnwire.Shutdown),
@ -215,7 +208,10 @@ func (p *peer) Start() error {
// message MUST be sent before any other message. // message MUST be sent before any other message.
readErr := make(chan error, 1) readErr := make(chan error, 1)
msgChan := make(chan lnwire.Message, 1) msgChan := make(chan lnwire.Message, 1)
p.wg.Add(1)
go func() { go func() {
defer p.wg.Done()
msg, err := p.readNextMessage() msg, err := p.readNextMessage()
if err != nil { if err != nil {
readErr <- err readErr <- err
@ -360,6 +356,8 @@ func (p *peer) Disconnect(reason error) {
p.conn.Close() p.conn.Close()
close(p.quit) close(p.quit)
p.wg.Wait()
} }
// String returns the string representation of this peer. // String returns the string representation of this peer.
@ -419,6 +417,7 @@ type chanMsgStream struct {
mtx sync.Mutex mtx sync.Mutex
wg sync.WaitGroup
quit chan struct{} quit chan struct{}
} }
@ -441,6 +440,7 @@ func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer,
// Start starts the chanMsgStream. // Start starts the chanMsgStream.
func (c *chanMsgStream) Start() { func (c *chanMsgStream) Start() {
c.wg.Add(1)
go c.msgConsumer() go c.msgConsumer()
} }
@ -452,11 +452,15 @@ func (c *chanMsgStream) Stop() {
// Wake up the msgConsumer is we've been signalled to exit. // Wake up the msgConsumer is we've been signalled to exit.
c.msgCond.Signal() c.msgCond.Signal()
c.wg.Wait()
} }
// msgConsumer is the main goroutine that streams messages from the peer's // msgConsumer is the main goroutine that streams messages from the peer's
// readHandler directly to the target channel. // readHandler directly to the target channel.
func (c *chanMsgStream) msgConsumer() { func (c *chanMsgStream) msgConsumer() {
defer c.wg.Done()
peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:]) peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:])
for { for {
@ -529,6 +533,8 @@ func (c *chanMsgStream) AddMsg(msg lnwire.Message) {
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (p *peer) readHandler() { func (p *peer) readHandler() {
defer p.wg.Done()
chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream) chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream)
out: out:
for atomic.LoadInt32(&p.disconnect) == 0 { for atomic.LoadInt32(&p.disconnect) == 0 {
@ -651,7 +657,6 @@ out:
delete(chanMsgStreams, cid) delete(chanMsgStreams, cid)
} }
p.wg.Done()
peerLog.Tracef("readHandler for peer %v done", p) peerLog.Tracef("readHandler for peer %v done", p)
} }
@ -820,6 +825,8 @@ func (p *peer) queueHandler() {
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (p *peer) pingHandler() { func (p *peer) pingHandler() {
defer p.wg.Done()
pingTicker := time.NewTicker(pingInterval) pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop() defer pingTicker.Stop()
@ -835,8 +842,6 @@ out:
break out break out
} }
} }
p.wg.Done()
} }
// PingTime returns the estimated ping time to the peer in microseconds. // PingTime returns the estimated ping time to the peer in microseconds.
@ -857,9 +862,16 @@ func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) {
// ChannelSnapshots returns a slice of channel snapshots detailing all // ChannelSnapshots returns a slice of channel snapshots detailing all
// currently active channels maintained with the remote peer. // currently active channels maintained with the remote peer.
func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot {
resp := make(chan []*channeldb.ChannelSnapshot, 1) p.activeChanMtx.RLock()
p.chanSnapshotReqs <- &chanSnapshotReq{resp} defer p.activeChanMtx.RUnlock()
return <-resp
snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels))
for _, activeChan := range p.activeChannels {
snapshot := activeChan.StateSnapshot()
snapshots = append(snapshots, snapshot)
}
return snapshots
} }
// closingScripts are the set of clsoign deslivery scripts for each party. This // closingScripts are the set of clsoign deslivery scripts for each party. This
@ -877,6 +889,8 @@ type closingScripts struct {
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (p *peer) channelManager() { func (p *peer) channelManager() {
defer p.wg.Done()
// chanShutdowns is a map of channels for which our node has initiated // chanShutdowns is a map of channels for which our node has initiated
// a cooperative channel close. When an lnwire.Shutdown is received, // a cooperative channel close. When an lnwire.Shutdown is received,
// this allows the node to determine the next step to be taken in the // this allows the node to determine the next step to be taken in the
@ -907,17 +921,6 @@ func (p *peer) channelManager() {
out: out:
for { for {
select { select {
case req := <-p.chanSnapshotReqs:
p.activeChanMtx.RLock()
snapshots := make([]*channeldb.ChannelSnapshot, 0,
len(p.activeChannels))
for _, activeChan := range p.activeChannels {
snapshot := activeChan.StateSnapshot()
snapshots = append(snapshots, snapshot)
}
p.activeChanMtx.RUnlock()
req.resp <- snapshots
// A new channel has arrived which means we've just completed a // A new channel has arrived which means we've just completed a
// funding workflow. We'll initialize the necessary local // funding workflow. We'll initialize the necessary local
// state, and notify the htlc switch of a new link. // state, and notify the htlc switch of a new link.
@ -1117,8 +1120,6 @@ out:
break out break out
} }
} }
p.wg.Done()
} }
// handleLocalClose kicks-off the workflow to execute a cooperative or forced // handleLocalClose kicks-off the workflow to execute a cooperative or forced
@ -1356,7 +1357,6 @@ func (p *peer) handleInitClosingSigned(req *htlcswitch.ChanClose,
notifier := p.server.cc.chainNotifier notifier := p.server.cc.chainNotifier
go waitForChanToClose(uint32(bestHeight), notifier, req.Err, go waitForChanToClose(uint32(bestHeight), notifier, req.Err,
req.ChanPoint, &closingTxid, func() { req.ChanPoint, &closingTxid, func() {
// First, we'll mark the database as being fully closed // First, we'll mark the database as being fully closed
// so we'll no longer watch for its ultimate closure // so we'll no longer watch for its ultimate closure
// upon startup. // upon startup.
@ -1463,7 +1463,8 @@ func (p *peer) handleResponseClosingSigned(msg *lnwire.ClosingSigned,
// upon startup. // upon startup.
err := p.server.chanDB.MarkChanFullyClosed(chanPoint) err := p.server.chanDB.MarkChanFullyClosed(chanPoint)
if err != nil { if err != nil {
peerLog.Errorf("unable to mark channel as closed: %v", err) peerLog.Errorf("unable to mark channel "+
"as closed: %v", err)
return return
} }
}, },