Merge pull request #1199 from cfromknecht/queue-msg-err-chans

peer/server: Remove Broadcast err chans
This commit is contained in:
Olaoluwa Osuntokun 2018-05-10 16:55:57 -07:00 committed by GitHub
commit b271ed5ffb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 9 deletions

13
peer.go

@ -29,6 +29,9 @@ import (
var ( var (
numNodes int32 numNodes int32
// ErrPeerExiting signals that the peer received a disconnect request.
ErrPeerExiting = errors.Errorf("peer exiting")
) )
const ( const (
@ -1104,7 +1107,7 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) {
func (p *peer) writeMessage(msg lnwire.Message) error { func (p *peer) writeMessage(msg lnwire.Message) error {
// Simply exit if we're shutting down. // Simply exit if we're shutting down.
if atomic.LoadInt32(&p.disconnect) != 0 { if atomic.LoadInt32(&p.disconnect) != 0 {
return nil return ErrPeerExiting
} }
// TODO(roasbeef): add message summaries // TODO(roasbeef): add message summaries
@ -1153,8 +1156,8 @@ out:
atomic.StoreInt64(&p.pingLastSend, now) atomic.StoreInt64(&p.pingLastSend, now)
} }
// Write out the message to the socket, closing the // Write out the message to the socket, responding with
// 'sentChan' if it's non-nil, The 'sentChan' allows // error if `errChan` is non-nil. The `errChan` allows
// callers to optionally synchronize sends with the // callers to optionally synchronize sends with the
// writeHandler. // writeHandler.
err := p.writeMessage(outMsg.msg) err := p.writeMessage(outMsg.msg)
@ -1168,7 +1171,7 @@ out:
} }
case <-p.quit: case <-p.quit:
exitErr = errors.Errorf("peer exiting") exitErr = ErrPeerExiting
break out break out
} }
} }
@ -1262,7 +1265,7 @@ func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) {
case <-p.quit: case <-p.quit:
peerLog.Tracef("Peer shutting down, could not enqueue msg.") peerLog.Tracef("Peer shutting down, could not enqueue msg.")
if errChan != nil { if errChan != nil {
errChan <- fmt.Errorf("peer shutting down") errChan <- ErrPeerExiting
} }
} }
} }

@ -1092,7 +1092,7 @@ func (s *server) SendToPeer(target *btcec.PublicKey,
case err := <-errChan: case err := <-errChan:
return err return err
case <-targetPeer.quit: case <-targetPeer.quit:
return fmt.Errorf("peer shutting down") return ErrPeerExiting
case <-s.quit: case <-s.quit:
return ErrServerShuttingDown return ErrServerShuttingDown
} }
@ -1184,7 +1184,8 @@ func (s *server) sendPeerMessages(
// event, we defer a call to Done on both WaitGroups to 1) ensure that // event, we defer a call to Done on both WaitGroups to 1) ensure that
// server will be able to shutdown after its go routines exit, and 2) // server will be able to shutdown after its go routines exit, and 2)
// so the server can return to the caller of BroadcastMessage. // so the server can return to the caller of BroadcastMessage.
if wg != nil { isBroadcast := wg != nil
if isBroadcast {
defer s.wg.Done() defer s.wg.Done()
defer wg.Done() defer wg.Done()
} }
@ -1194,11 +1195,18 @@ func (s *server) sendPeerMessages(
// the queue. // the queue.
var errChans []chan error var errChans []chan error
for _, msg := range msgs { for _, msg := range msgs {
errChan := make(chan error, 1) // If this is not broadcast, create error channels to provide
targetPeer.queueMsg(msg, errChan) // synchronous feedback regarding the delivery of the message to
// a specific peer.
var errChan chan error
if !isBroadcast {
errChan = make(chan error, 1)
errChans = append(errChans, errChan) errChans = append(errChans, errChan)
} }
targetPeer.queueMsg(msg, errChan)
}
return errChans return errChans
} }