From 96c47f7de4835ffc1fbb74d3dc207fef93658c7d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 1 Nov 2018 17:13:13 -0700 Subject: [PATCH] discovery/gossiper: bypass main event loop for queries This commit restructures the delivery of gossip query related messages, such that they are delivered directly to the gossip syncers. Gossip query rate limiting was introduced in #1824 on a per-peer basis. However, since all gossip query messages were being delivered in the main event loop, the end result is that one rate-limited peer could stall all other peers. In addition, since no other peers would be able to submit gossip-related messages through the blocked event loop, the back pressure would eventually rate limit the read handlers of all peers as well. The end result would be lengthy delays in reading messages related to htlc forwarding. The fix is to lift the delivery of gossip query messages outside of the main event loop. With this change, the rate limiting backpressure is delivered only to the intended peer. --- discovery/gossiper.go | 104 +++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 47 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 88fd401a..8257c2fc 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -456,12 +456,67 @@ func (d *AuthenticatedGossiper) Stop() { func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, peer lnpeer.Peer) chan error { + errChan := make(chan error, 1) + + // For messages in the known set of channel series queries, we'll + // dispatch the message directly to the gossipSyncer, and skip the main + // processing loop. + switch m := msg.(type) { + case *lnwire.QueryShortChanIDs, + *lnwire.QueryChannelRange, + *lnwire.ReplyChannelRange, + *lnwire.ReplyShortChanIDsEnd: + + syncer, err := d.findGossipSyncer(peer.IdentityKey()) + if err != nil { + log.Warnf("Unable to find gossip syncer for "+ + "peer=%x: %v", peer.PubKey(), err) + + errChan <- err + return errChan + } + + // If we've found the message target, then we'll dispatch the + // message directly to it. + syncer.ProcessQueryMsg(m) + + errChan <- nil + return errChan + + // If a peer is updating its current update horizon, then we'll dispatch + // that directly to the proper gossipSyncer. + case *lnwire.GossipTimestampRange: + syncer, err := d.findGossipSyncer(peer.IdentityKey()) + if err != nil { + log.Warnf("Unable to find gossip syncer for "+ + "peer=%x: %v", peer.PubKey(), err) + + errChan <- err + return errChan + } + + // If we've found the message target, then we'll dispatch the + // message directly to it. + err = syncer.ApplyGossipFilter(m) + if err != nil { + log.Warnf("unable to apply gossip "+ + "filter for peer=%x: %v", + peer.PubKey(), err) + + errChan <- err + return errChan + } + + errChan <- nil + return errChan + } + nMsg := &networkMsg{ msg: msg, isRemote: true, peer: peer, source: peer.IdentityKey(), - err: make(chan error, 1), + err: errChan, } select { @@ -965,7 +1020,7 @@ func (d *AuthenticatedGossiper) networkHandler() { policyUpdate.errResp <- nil case announcement := <-d.networkMsgs: - switch msg := announcement.msg.(type) { + switch announcement.msg.(type) { // Channel announcement signatures are amongst the only // messages that we'll process serially. case *lnwire.AnnounceSignatures: @@ -978,51 +1033,6 @@ func (d *AuthenticatedGossiper) networkHandler() { ) } continue - - // If a peer is updating its current update horizon, - // then we'll dispatch that directly to the proper - // gossipSyncer. - case *lnwire.GossipTimestampRange: - syncer, err := d.findGossipSyncer( - announcement.source, - ) - if err != nil { - log.Warnf("Unable to find gossip "+ - "syncer for peer=%x: %v", - announcement.peer.PubKey(), err) - continue - } - - // If we've found the message target, then - // we'll dispatch the message directly to it. - err = syncer.ApplyGossipFilter(msg) - if err != nil { - log.Warnf("unable to apply gossip "+ - "filter for peer=%x: %v", - announcement.peer.PubKey(), err) - } - continue - - // For messages in the known set of channel series - // queries, we'll dispatch the message directly to the - // peer, and skip the main processing loop. - case *lnwire.QueryShortChanIDs, - *lnwire.QueryChannelRange, - *lnwire.ReplyChannelRange, - *lnwire.ReplyShortChanIDsEnd: - - syncer, err := d.findGossipSyncer( - announcement.source, - ) - if err != nil { - log.Warnf("Unable to find gossip "+ - "syncer for peer=%x: %v", - announcement.source, err) - continue - } - - syncer.ProcessQueryMsg(announcement.msg) - continue } // If this message was recently rejected, then we won't