Merge pull request #2690 from cfromknecht/hwsc-fndg-priority-queue
peer: deprioritize gossip traffic
This commit is contained in:
commit
c853555d1b
@ -358,7 +358,7 @@ func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error {
|
|||||||
|
|
||||||
// With all the announcement messages gathered, send them all in a
|
// With all the announcement messages gathered, send them all in a
|
||||||
// single batch to the target peer.
|
// single batch to the target peer.
|
||||||
return syncPeer.SendMessage(false, announceMessages...)
|
return syncPeer.SendMessageLazy(false, announceMessages...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the
|
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the
|
||||||
@ -1111,7 +1111,7 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
|
|||||||
encodingType: encoding,
|
encodingType: encoding,
|
||||||
chunkSize: encodingTypeToChunkSize[encoding],
|
chunkSize: encodingTypeToChunkSize[encoding],
|
||||||
sendToPeer: func(msgs ...lnwire.Message) error {
|
sendToPeer: func(msgs ...lnwire.Message) error {
|
||||||
return syncPeer.SendMessage(false, msgs...)
|
return syncPeer.SendMessageLazy(false, msgs...)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
copy(syncer.peerPub[:], nodeID[:])
|
copy(syncer.peerPub[:], nodeID[:])
|
||||||
|
@ -35,6 +35,11 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *mockPeer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
|
||||||
|
return p.SendMessage(sync, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error {
|
func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -170,6 +170,10 @@ func (n *testNode) SendMessage(_ bool, msg ...lnwire.Message) error {
|
|||||||
return n.sendMessage(msg[0])
|
return n.sendMessage(msg[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *testNode) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
|
||||||
|
return n.SendMessage(sync, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *testNode) WipeChannel(_ *wire.OutPoint) error {
|
func (n *testNode) WipeChannel(_ *wire.OutPoint) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1549,6 +1549,9 @@ func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (m *mockPeer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
|
||||||
|
return m.SendMessage(sync, msgs...)
|
||||||
|
}
|
||||||
func (m *mockPeer) AddNewChannel(_ *channeldb.OpenChannel,
|
func (m *mockPeer) AddNewChannel(_ *channeldb.OpenChannel,
|
||||||
_ <-chan struct{}) error {
|
_ <-chan struct{}) error {
|
||||||
return nil
|
return nil
|
||||||
|
@ -502,6 +502,10 @@ func (s *mockServer) SendMessage(sync bool, msgs ...lnwire.Message) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *mockServer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
func (s *mockServer) readHandler(message lnwire.Message) error {
|
func (s *mockServer) readHandler(message lnwire.Message) error {
|
||||||
var targetChan lnwire.ChannelID
|
var targetChan lnwire.ChannelID
|
||||||
|
|
||||||
|
@ -12,10 +12,17 @@ import (
|
|||||||
// Peer is an interface which represents the remote lightning node inside our
|
// Peer is an interface which represents the remote lightning node inside our
|
||||||
// system.
|
// system.
|
||||||
type Peer interface {
|
type Peer interface {
|
||||||
// SendMessage sends a variadic number of message to remote peer. The
|
// SendMessage sends a variadic number of high-priority message to
|
||||||
// first argument denotes if the method should block until the message
|
// remote peer. The first argument denotes if the method should block
|
||||||
// has been sent to the remote peer.
|
// until the messages have been sent to the remote peer or an error is
|
||||||
SendMessage(sync bool, msg ...lnwire.Message) error
|
// returned, otherwise it returns immediately after queuing.
|
||||||
|
SendMessage(sync bool, msgs ...lnwire.Message) error
|
||||||
|
|
||||||
|
// SendMessageLazy sends a variadic number of low-priority message to
|
||||||
|
// remote peer. The first argument denotes if the method should block
|
||||||
|
// until the messages have been sent to the remote peer or an error is
|
||||||
|
// returned, otherwise it returns immediately after queueing.
|
||||||
|
SendMessageLazy(sync bool, msgs ...lnwire.Message) error
|
||||||
|
|
||||||
// AddNewChannel adds a new channel to the peer. The channel should fail
|
// AddNewChannel adds a new channel to the peer. The channel should fail
|
||||||
// to be added if the cancel channel is closed.
|
// to be added if the cancel channel is closed.
|
||||||
|
105
peer.go
105
peer.go
@ -64,8 +64,9 @@ const (
|
|||||||
// a buffered channel which will be sent upon once the write is complete. This
|
// a buffered channel which will be sent upon once the write is complete. This
|
||||||
// buffered channel acts as a semaphore to be used for synchronization purposes.
|
// buffered channel acts as a semaphore to be used for synchronization purposes.
|
||||||
type outgoingMsg struct {
|
type outgoingMsg struct {
|
||||||
msg lnwire.Message
|
priority bool
|
||||||
errChan chan error // MUST be buffered.
|
msg lnwire.Message
|
||||||
|
errChan chan error // MUST be buffered.
|
||||||
}
|
}
|
||||||
|
|
||||||
// newChannelMsg packages a channeldb.OpenChannel with a channel that allows
|
// newChannelMsg packages a channeldb.OpenChannel with a channel that allows
|
||||||
@ -1479,24 +1480,45 @@ out:
|
|||||||
func (p *peer) queueHandler() {
|
func (p *peer) queueHandler() {
|
||||||
defer p.wg.Done()
|
defer p.wg.Done()
|
||||||
|
|
||||||
// pendingMsgs will hold all messages waiting to be added
|
// priorityMsgs holds an in order list of messages deemed high-priority
|
||||||
// to the sendQueue.
|
// to be added to the sendQueue. This predominately includes messages
|
||||||
pendingMsgs := list.New()
|
// from the funding manager and htlcswitch.
|
||||||
|
priorityMsgs := list.New()
|
||||||
|
|
||||||
|
// lazyMsgs holds an in order list of messages deemed low-priority to be
|
||||||
|
// added to the sendQueue only after all high-priority messages have
|
||||||
|
// been queued. This predominately includes messages from the gossiper.
|
||||||
|
lazyMsgs := list.New()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Examine the front of the queue.
|
// Examine the front of the priority queue, if it is empty check
|
||||||
elem := pendingMsgs.Front()
|
// the low priority queue.
|
||||||
|
elem := priorityMsgs.Front()
|
||||||
|
if elem == nil {
|
||||||
|
elem = lazyMsgs.Front()
|
||||||
|
}
|
||||||
|
|
||||||
if elem != nil {
|
if elem != nil {
|
||||||
|
front := elem.Value.(outgoingMsg)
|
||||||
|
|
||||||
// There's an element on the queue, try adding
|
// There's an element on the queue, try adding
|
||||||
// it to the sendQueue. We also watch for
|
// it to the sendQueue. We also watch for
|
||||||
// messages on the outgoingQueue, in case the
|
// messages on the outgoingQueue, in case the
|
||||||
// writeHandler cannot accept messages on the
|
// writeHandler cannot accept messages on the
|
||||||
// sendQueue.
|
// sendQueue.
|
||||||
select {
|
select {
|
||||||
case p.sendQueue <- elem.Value.(outgoingMsg):
|
case p.sendQueue <- front:
|
||||||
pendingMsgs.Remove(elem)
|
if front.priority {
|
||||||
|
priorityMsgs.Remove(elem)
|
||||||
|
} else {
|
||||||
|
lazyMsgs.Remove(elem)
|
||||||
|
}
|
||||||
case msg := <-p.outgoingQueue:
|
case msg := <-p.outgoingQueue:
|
||||||
pendingMsgs.PushBack(msg)
|
if msg.priority {
|
||||||
|
priorityMsgs.PushBack(msg)
|
||||||
|
} else {
|
||||||
|
lazyMsgs.PushBack(msg)
|
||||||
|
}
|
||||||
case <-p.quit:
|
case <-p.quit:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -1506,7 +1528,11 @@ func (p *peer) queueHandler() {
|
|||||||
// into the queue from outside sub-systems.
|
// into the queue from outside sub-systems.
|
||||||
select {
|
select {
|
||||||
case msg := <-p.outgoingQueue:
|
case msg := <-p.outgoingQueue:
|
||||||
pendingMsgs.PushBack(msg)
|
if msg.priority {
|
||||||
|
priorityMsgs.PushBack(msg)
|
||||||
|
} else {
|
||||||
|
lazyMsgs.PushBack(msg)
|
||||||
|
}
|
||||||
case <-p.quit:
|
case <-p.quit:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -1544,13 +1570,26 @@ func (p *peer) PingTime() int64 {
|
|||||||
return atomic.LoadInt64(&p.pingTime)
|
return atomic.LoadInt64(&p.pingTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
// queueMsg queues a new lnwire.Message to be eventually sent out on the
|
// queueMsg adds the lnwire.Message to the back of the high priority send queue.
|
||||||
// wire. It returns an error if we failed to queue the message. An error
|
// If the errChan is non-nil, an error is sent back if the msg failed to queue
|
||||||
// is sent on errChan if the message fails being sent to the peer, or
|
// or failed to write, and nil otherwise.
|
||||||
// nil otherwise.
|
|
||||||
func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) {
|
func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) {
|
||||||
|
p.queue(true, msg, errChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// queueMsgLazy adds the lnwire.Message to the back of the low priority send
|
||||||
|
// queue. If the errChan is non-nil, an error is sent back if the msg failed to
|
||||||
|
// queue or failed to write, and nil otherwise.
|
||||||
|
func (p *peer) queueMsgLazy(msg lnwire.Message, errChan chan error) {
|
||||||
|
p.queue(false, msg, errChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// queue sends a given message to the queueHandler using the passed priority. If
|
||||||
|
// the errChan is non-nil, an error is sent back if the msg failed to queue or
|
||||||
|
// failed to write, and nil otherwise.
|
||||||
|
func (p *peer) queue(priority bool, msg lnwire.Message, errChan chan error) {
|
||||||
select {
|
select {
|
||||||
case p.outgoingQueue <- outgoingMsg{msg, errChan}:
|
case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}:
|
||||||
case <-p.quit:
|
case <-p.quit:
|
||||||
peerLog.Tracef("Peer shutting down, could not enqueue msg.")
|
peerLog.Tracef("Peer shutting down, could not enqueue msg.")
|
||||||
if errChan != nil {
|
if errChan != nil {
|
||||||
@ -2312,16 +2351,38 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendMessage sends a variadic number of message to remote peer. The first
|
// SendMessage sends a variadic number of high-priority message to remote peer.
|
||||||
// argument denotes if the method should block until the message has been sent
|
// The first argument denotes if the method should block until the messages have
|
||||||
// to the remote peer.
|
// been sent to the remote peer or an error is returned, otherwise it returns
|
||||||
|
// immediately after queuing.
|
||||||
//
|
//
|
||||||
// NOTE: Part of the lnpeer.Peer interface.
|
// NOTE: Part of the lnpeer.Peer interface.
|
||||||
func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error {
|
func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error {
|
||||||
|
return p.sendMessage(sync, true, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendMessageLazy sends a variadic number of low-priority message to remote
|
||||||
|
// peer. The first argument denotes if the method should block until the
|
||||||
|
// messages have been sent to the remote peer or an error is returned, otherwise
|
||||||
|
// it returns immediately after queueing.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the lnpeer.Peer interface.
|
||||||
|
func (p *peer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
|
||||||
|
return p.sendMessage(sync, false, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendMessage queues a variadic number of messages using the passed priority
|
||||||
|
// to the remote peer. If sync is true, this method will block until the
|
||||||
|
// messages have been sent to the remote peer or an error is returned, otherwise
|
||||||
|
// it returns immediately after queueing.
|
||||||
|
func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error {
|
||||||
// Add all incoming messages to the outgoing queue. A list of error
|
// Add all incoming messages to the outgoing queue. A list of error
|
||||||
// chans is populated for each message if the caller requested a sync
|
// chans is populated for each message if the caller requested a sync
|
||||||
// send.
|
// send.
|
||||||
var errChans []chan error
|
var errChans []chan error
|
||||||
|
if sync {
|
||||||
|
errChans = make([]chan error, 0, len(msgs))
|
||||||
|
}
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
// If a sync send was requested, create an error chan to listen
|
// If a sync send was requested, create an error chan to listen
|
||||||
// for an ack from the writeHandler.
|
// for an ack from the writeHandler.
|
||||||
@ -2331,7 +2392,11 @@ func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error {
|
|||||||
errChans = append(errChans, errChan)
|
errChans = append(errChans, errChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.queueMsg(msg, errChan)
|
if priority {
|
||||||
|
p.queueMsg(msg, errChan)
|
||||||
|
} else {
|
||||||
|
p.queueMsgLazy(msg, errChan)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all replies from the writeHandler. For async sends, this
|
// Wait for all replies from the writeHandler. For async sends, this
|
||||||
|
106
server.go
106
server.go
@ -1884,6 +1884,8 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
|
|||||||
|
|
||||||
// BroadcastMessage sends a request to the server to broadcast a set of
|
// BroadcastMessage sends a request to the server to broadcast a set of
|
||||||
// messages to all peers other than the one specified by the `skips` parameter.
|
// messages to all peers other than the one specified by the `skips` parameter.
|
||||||
|
// All messages sent via BroadcastMessage will be queued for lazy delivery to
|
||||||
|
// the target peers.
|
||||||
//
|
//
|
||||||
// NOTE: This function is safe for concurrent access.
|
// NOTE: This function is safe for concurrent access.
|
||||||
func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
|
func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
|
||||||
@ -1916,7 +1918,12 @@ func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
|
|||||||
// Dispatch a go routine to enqueue all messages to this peer.
|
// Dispatch a go routine to enqueue all messages to this peer.
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.sendPeerMessages(sPeer, msgs, &wg)
|
go func(p lnpeer.Peer) {
|
||||||
|
defer s.wg.Done()
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
p.SendMessageLazy(false, msgs...)
|
||||||
|
}(sPeer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all messages to have been dispatched before returning to
|
// Wait for all messages to have been dispatched before returning to
|
||||||
@ -1926,53 +1933,6 @@ func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendToPeer send a message to the server telling it to send the specific set
|
|
||||||
// of message to a particular peer. If the peer connect be found, then this
|
|
||||||
// method will return a non-nil error.
|
|
||||||
//
|
|
||||||
// NOTE: This function is safe for concurrent access.
|
|
||||||
func (s *server) SendToPeer(target *btcec.PublicKey,
|
|
||||||
msgs ...lnwire.Message) error {
|
|
||||||
|
|
||||||
// Compute the target peer's identifier.
|
|
||||||
targetPubBytes := target.SerializeCompressed()
|
|
||||||
|
|
||||||
srvrLog.Tracef("Attempting to send msgs %v to: %x",
|
|
||||||
len(msgs), targetPubBytes)
|
|
||||||
|
|
||||||
// Lookup intended target in peersByPub, returning an error to the
|
|
||||||
// caller if the peer is unknown. Access to peersByPub is synchronized
|
|
||||||
// here to ensure we consider the exact set of peers present at the
|
|
||||||
// time of invocation.
|
|
||||||
s.mu.RLock()
|
|
||||||
targetPeer, err := s.findPeerByPubStr(string(targetPubBytes))
|
|
||||||
s.mu.RUnlock()
|
|
||||||
if err == ErrPeerNotConnected {
|
|
||||||
srvrLog.Errorf("unable to send message to %x, "+
|
|
||||||
"peer is not connected", targetPubBytes)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send messages to the peer and get the error channels that will be
|
|
||||||
// signaled by the peer's write handler.
|
|
||||||
errChans := s.sendPeerMessages(targetPeer, msgs, nil)
|
|
||||||
|
|
||||||
// With the server's shared lock released, we now handle all of the
|
|
||||||
// errors being returned from the target peer's write handler.
|
|
||||||
for _, errChan := range errChans {
|
|
||||||
select {
|
|
||||||
case err := <-errChan:
|
|
||||||
return err
|
|
||||||
case <-targetPeer.quit:
|
|
||||||
return ErrPeerExiting
|
|
||||||
case <-s.quit:
|
|
||||||
return ErrServerShuttingDown
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotifyWhenOnline can be called by other subsystems to get notified when a
|
// NotifyWhenOnline can be called by other subsystems to get notified when a
|
||||||
// particular peer comes online. The peer itself is sent across the peerChan.
|
// particular peer comes online. The peer itself is sent across the peerChan.
|
||||||
//
|
//
|
||||||
@ -2036,56 +1996,6 @@ func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
|
|
||||||
// `targetPeer`. This method supports additional broadcast-level
|
|
||||||
// synchronization by using the additional `wg` to coordinate a particular
|
|
||||||
// broadcast. Since this method will wait for the return error from sending
|
|
||||||
// each message, it should be run as a goroutine (see comment below) and
|
|
||||||
// the error ignored if used for broadcasting messages, where the result
|
|
||||||
// from sending the messages is not of importance.
|
|
||||||
//
|
|
||||||
// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a
|
|
||||||
// go routine--both `wg` and the server's WaitGroup should be incremented
|
|
||||||
// beforehand. If this method is not spawned as a go routine, the provided
|
|
||||||
// `wg` should be nil, and the server's WaitGroup should not be tracking this
|
|
||||||
// invocation.
|
|
||||||
func (s *server) sendPeerMessages(
|
|
||||||
targetPeer *peer,
|
|
||||||
msgs []lnwire.Message,
|
|
||||||
wg *sync.WaitGroup) []chan error {
|
|
||||||
|
|
||||||
// If a WaitGroup is provided, we assume that this method was spawned
|
|
||||||
// as a go routine, and that it is being tracked by both the server's
|
|
||||||
// WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this
|
|
||||||
// event, we defer a call to Done on both WaitGroups to 1) ensure that
|
|
||||||
// server will be able to shutdown after its go routines exit, and 2)
|
|
||||||
// so the server can return to the caller of BroadcastMessage.
|
|
||||||
isBroadcast := wg != nil
|
|
||||||
if isBroadcast {
|
|
||||||
defer s.wg.Done()
|
|
||||||
defer wg.Done()
|
|
||||||
}
|
|
||||||
|
|
||||||
// We queue each message, creating a slice of error channels that
|
|
||||||
// can be inspected after every message is successfully added to
|
|
||||||
// the queue.
|
|
||||||
var errChans []chan error
|
|
||||||
for _, msg := range msgs {
|
|
||||||
// If this is not broadcast, create error channels to provide
|
|
||||||
// synchronous feedback regarding the delivery of the message to
|
|
||||||
// a specific peer.
|
|
||||||
var errChan chan error
|
|
||||||
if !isBroadcast {
|
|
||||||
errChan = make(chan error, 1)
|
|
||||||
errChans = append(errChans, errChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
targetPeer.queueMsg(msg, errChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
return errChans
|
|
||||||
}
|
|
||||||
|
|
||||||
// FindPeer will return the peer that corresponds to the passed in public key.
|
// FindPeer will return the peer that corresponds to the passed in public key.
|
||||||
// This function is used by the funding manager, allowing it to update the
|
// This function is used by the funding manager, allowing it to update the
|
||||||
// daemon's local representation of the remote peer.
|
// daemon's local representation of the remote peer.
|
||||||
|
Loading…
Reference in New Issue
Block a user