From efd9cf12b855664f351049c39edfe99a7acc2200 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 8 Aug 2017 16:51:19 -0700 Subject: [PATCH] peer: adds tracking of go routines to sync disconnect In addition to improved synchronization between the client and server, this commit also moves the channel snapshotting procedure such that it is handled without submitting a query to the primary select statement. This is primarily done as a precaution to ensure that no deadlocks occur, as channel snapshotting has the potential to block restarts. --- peer.go | 69 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/peer.go b/peer.go index 5007bb2e..728ea02f 100644 --- a/peer.go +++ b/peer.go @@ -111,16 +111,11 @@ type peer struct { // objects to queue messages to be sent out on the wire. outgoingQueue chan outgoinMsg - // sendQueueSync is used as a semaphore to synchronize writes between - // the writeHandler and the queueHandler. - sendQueueSync chan struct{} - // activeChannels is a map which stores the state machines of all // active channels. Channels are indexed into the map by the txid of // the funding transaction which opened the channel. - activeChanMtx sync.RWMutex - activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel - chanSnapshotReqs chan *chanSnapshotReq + activeChanMtx sync.RWMutex + activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel // newChannels is used by the fundingManager to send fully opened // channels to the source peer which handled the funding workflow. 
@@ -172,13 +167,11 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, server: server, - sendQueueSync: make(chan struct{}, 1), - sendQueue: make(chan outgoinMsg, 1), - outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), + sendQueue: make(chan outgoinMsg), + outgoingQueue: make(chan outgoinMsg), - activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), - chanSnapshotReqs: make(chan *chanSnapshotReq), - newChannels: make(chan *newChannelMsg, 1), + activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), + newChannels: make(chan *newChannelMsg, 1), localCloseChanReqs: make(chan *htlcswitch.ChanClose), shutdownChanReqs: make(chan *lnwire.Shutdown), @@ -215,7 +208,10 @@ func (p *peer) Start() error { // message MUST be sent before any other message. readErr := make(chan error, 1) msgChan := make(chan lnwire.Message, 1) + p.wg.Add(1) go func() { + defer p.wg.Done() + msg, err := p.readNextMessage() if err != nil { readErr <- err @@ -360,6 +356,8 @@ func (p *peer) Disconnect(reason error) { p.conn.Close() close(p.quit) + + p.wg.Wait() } // String returns the string representation of this peer. @@ -419,6 +417,7 @@ type chanMsgStream struct { mtx sync.Mutex + wg sync.WaitGroup quit chan struct{} } @@ -441,6 +440,7 @@ func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer, // Start starts the chanMsgStream. func (c *chanMsgStream) Start() { + c.wg.Add(1) go c.msgConsumer() } @@ -452,11 +452,15 @@ func (c *chanMsgStream) Stop() { // Wake up the msgConsumer is we've been signalled to exit. c.msgCond.Signal() + + c.wg.Wait() } // msgConsumer is the main goroutine that streams messages from the peer's // readHandler directly to the target channel. func (c *chanMsgStream) msgConsumer() { + defer c.wg.Done() + peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:]) for { @@ -529,6 +533,8 @@ func (c *chanMsgStream) AddMsg(msg lnwire.Message) { // // NOTE: This method MUST be run as a goroutine. 
func (p *peer) readHandler() { + defer p.wg.Done() + chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { @@ -651,7 +657,6 @@ out: delete(chanMsgStreams, cid) } - p.wg.Done() peerLog.Tracef("readHandler for peer %v done", p) } @@ -820,6 +825,8 @@ func (p *peer) queueHandler() { // // NOTE: This method MUST be run as a goroutine. func (p *peer) pingHandler() { + defer p.wg.Done() + pingTicker := time.NewTicker(pingInterval) defer pingTicker.Stop() @@ -835,8 +842,6 @@ out: break out } } - - p.wg.Done() } // PingTime returns the estimated ping time to the peer in microseconds. @@ -857,9 +862,16 @@ func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { // ChannelSnapshots returns a slice of channel snapshots detailing all // currently active channels maintained with the remote peer. func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { - resp := make(chan []*channeldb.ChannelSnapshot, 1) - p.chanSnapshotReqs <- &chanSnapshotReq{resp} - return <-resp + p.activeChanMtx.RLock() + defer p.activeChanMtx.RUnlock() + + snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels)) + for _, activeChan := range p.activeChannels { + snapshot := activeChan.StateSnapshot() + snapshots = append(snapshots, snapshot) + } + + return snapshots } // closingScripts are the set of clsoign deslivery scripts for each party. This @@ -877,6 +889,8 @@ type closingScripts struct { // // NOTE: This method MUST be run as a goroutine. func (p *peer) channelManager() { + defer p.wg.Done() + // chanShutdowns is a map of channels for which our node has initiated // a cooperative channel close. 
When an lnwire.Shutdown is received, // this allows the node to determine the next step to be taken in the @@ -907,17 +921,6 @@ func (p *peer) channelManager() { out: for { select { - case req := <-p.chanSnapshotReqs: - p.activeChanMtx.RLock() - snapshots := make([]*channeldb.ChannelSnapshot, 0, - len(p.activeChannels)) - for _, activeChan := range p.activeChannels { - snapshot := activeChan.StateSnapshot() - snapshots = append(snapshots, snapshot) - } - p.activeChanMtx.RUnlock() - req.resp <- snapshots - // A new channel has arrived which means we've just completed a // funding workflow. We'll initialize the necessary local // state, and notify the htlc switch of a new link. @@ -1117,8 +1120,6 @@ out: break out } } - - p.wg.Done() } // handleLocalClose kicks-off the workflow to execute a cooperative or forced @@ -1356,7 +1357,6 @@ func (p *peer) handleInitClosingSigned(req *htlcswitch.ChanClose, notifier := p.server.cc.chainNotifier go waitForChanToClose(uint32(bestHeight), notifier, req.Err, req.ChanPoint, &closingTxid, func() { - // First, we'll mark the database as being fully closed // so we'll no longer watch for its ultimate closure // upon startup. @@ -1463,7 +1463,8 @@ func (p *peer) handleResponseClosingSigned(msg *lnwire.ClosingSigned, // upon startup. err := p.server.chanDB.MarkChanFullyClosed(chanPoint) if err != nil { - peerLog.Errorf("unable to mark channel as closed: %v", err) + peerLog.Errorf("unable to mark channel "+ + "as closed: %v", err) return } },