discovery/gossiper: bypass main event loop for queries

This commit restructures the delivery of gossip
query related messages, such that they are delivered
directly to the gossip syncers. Gossip query rate
limiting was introduced in #1824 on a per-peer basis.
However, since all gossip query messages were being
delivered in the main event loop, the end result is
that one rate-limited peer could stall all other
peers.

In addition, since no other peers would be able to
submit gossip-related messages through the blocked
event loop, the back pressure would eventually rate
limit the read handlers of all peers as well.
The end result would be lengthy delays in reading
messages related to htlc forwarding.

The fix is to lift the delivery of gossip query
messages outside of the main event loop. With
this change, the rate limiting backpressure is
delivered only to the intended peer.
This commit is contained in:
Conner Fromknecht 2018-11-01 17:13:13 -07:00
parent b0b2475660
commit 96c47f7de4
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -456,12 +456,67 @@ func (d *AuthenticatedGossiper) Stop() {
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
peer lnpeer.Peer) chan error {
errChan := make(chan error, 1)
// For messages in the known set of channel series queries, we'll
// dispatch the message directly to the gossipSyncer, and skip the main
// processing loop.
switch m := msg.(type) {
case *lnwire.QueryShortChanIDs,
*lnwire.QueryChannelRange,
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:
syncer, err := d.findGossipSyncer(peer.IdentityKey())
if err != nil {
log.Warnf("Unable to find gossip syncer for "+
"peer=%x: %v", peer.PubKey(), err)
errChan <- err
return errChan
}
// If we've found the message target, then we'll dispatch the
// message directly to it.
syncer.ProcessQueryMsg(m)
errChan <- nil
return errChan
// If a peer is updating its current update horizon, then we'll dispatch
// that directly to the proper gossipSyncer.
case *lnwire.GossipTimestampRange:
syncer, err := d.findGossipSyncer(peer.IdentityKey())
if err != nil {
log.Warnf("Unable to find gossip syncer for "+
"peer=%x: %v", peer.PubKey(), err)
errChan <- err
return errChan
}
// If we've found the message target, then we'll dispatch the
// message directly to it.
err = syncer.ApplyGossipFilter(m)
if err != nil {
log.Warnf("unable to apply gossip "+
"filter for peer=%x: %v",
peer.PubKey(), err)
errChan <- err
return errChan
}
errChan <- nil
return errChan
}
nMsg := &networkMsg{
msg: msg,
isRemote: true,
peer: peer,
source: peer.IdentityKey(),
err: make(chan error, 1),
err: errChan,
}
select {
@ -965,7 +1020,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
policyUpdate.errResp <- nil
case announcement := <-d.networkMsgs:
switch msg := announcement.msg.(type) {
switch announcement.msg.(type) {
// Channel announcement signatures are amongst the only
// messages that we'll process serially.
case *lnwire.AnnounceSignatures:
@ -978,51 +1033,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
)
}
continue
// If a peer is updating its current update horizon,
// then we'll dispatch that directly to the proper
// gossipSyncer.
case *lnwire.GossipTimestampRange:
syncer, err := d.findGossipSyncer(
announcement.source,
)
if err != nil {
log.Warnf("Unable to find gossip "+
"syncer for peer=%x: %v",
announcement.peer.PubKey(), err)
continue
}
// If we've found the message target, then
// we'll dispatch the message directly to it.
err = syncer.ApplyGossipFilter(msg)
if err != nil {
log.Warnf("unable to apply gossip "+
"filter for peer=%x: %v",
announcement.peer.PubKey(), err)
}
continue
// For messages in the known set of channel series
// queries, we'll dispatch the message directly to the
// peer, and skip the main processing loop.
case *lnwire.QueryShortChanIDs,
*lnwire.QueryChannelRange,
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:
syncer, err := d.findGossipSyncer(
announcement.source,
)
if err != nil {
log.Warnf("Unable to find gossip "+
"syncer for peer=%x: %v",
announcement.source, err)
continue
}
syncer.ProcessQueryMsg(announcement.msg)
continue
}
// If this message was recently rejected, then we won't