diff --git a/peer.go b/peer.go index 517f4bfe..e141cd6c 100644 --- a/peer.go +++ b/peer.go @@ -29,6 +29,9 @@ import ( var ( numNodes int32 + + // ErrPeerExiting signals that the peer received a disconnect request. + ErrPeerExiting = errors.Errorf("peer exiting") ) const ( @@ -1104,7 +1107,7 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) { func (p *peer) writeMessage(msg lnwire.Message) error { // Simply exit if we're shutting down. if atomic.LoadInt32(&p.disconnect) != 0 { - return nil + return ErrPeerExiting } // TODO(roasbeef): add message summaries @@ -1153,8 +1156,8 @@ out: atomic.StoreInt64(&p.pingLastSend, now) } - // Write out the message to the socket, closing the - // 'sentChan' if it's non-nil, The 'sentChan' allows + // Write out the message to the socket, responding with + // error if `errChan` is non-nil. The `errChan` allows // callers to optionally synchronize sends with the // writeHandler. err := p.writeMessage(outMsg.msg) @@ -1168,7 +1171,7 @@ out: } case <-p.quit: - exitErr = errors.Errorf("peer exiting") + exitErr = ErrPeerExiting break out } } @@ -1262,7 +1265,7 @@ func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) { case <-p.quit: peerLog.Tracef("Peer shutting down, could not enqueue msg.") if errChan != nil { - errChan <- fmt.Errorf("peer shutting down") + errChan <- ErrPeerExiting } } } diff --git a/server.go b/server.go index 3f5cbe94..945fd9a6 100644 --- a/server.go +++ b/server.go @@ -1092,7 +1092,7 @@ func (s *server) SendToPeer(target *btcec.PublicKey, case err := <-errChan: return err case <-targetPeer.quit: - return fmt.Errorf("peer shutting down") + return ErrPeerExiting case <-s.quit: return ErrServerShuttingDown } @@ -1184,7 +1184,8 @@ func (s *server) sendPeerMessages( // event, we defer a call to Done on both WaitGroups to 1) ensure that // server will be able to shutdown after its go routines exit, and 2) // so the server can return to the caller of BroadcastMessage. - if wg != nil { + isBroadcast := wg != nil + if isBroadcast { defer s.wg.Done() defer wg.Done() } @@ -1194,9 +1195,16 @@ func (s *server) sendPeerMessages( // the queue. var errChans []chan error for _, msg := range msgs { - errChan := make(chan error, 1) + // If this is not broadcast, create error channels to provide + // synchronous feedback regarding the delivery of the message to + // a specific peer. + var errChan chan error + if !isBroadcast { + errChan = make(chan error, 1) + errChans = append(errChans, errChan) + } + targetPeer.queueMsg(msg, errChan) - errChans = append(errChans, errChan) } return errChans