Merge pull request #2135 from cfromknecht/isolate-gossip-rate-limiting

[discovery] Isolate gossip rate limiting
This commit is contained in:
Olaoluwa Osuntokun 2018-11-02 20:13:29 -07:00 committed by GitHub
commit b600985063
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 55 deletions

@ -456,12 +456,67 @@ func (d *AuthenticatedGossiper) Stop() {
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
peer lnpeer.Peer) chan error { peer lnpeer.Peer) chan error {
errChan := make(chan error, 1)
// For messages in the known set of channel series queries, we'll
// dispatch the message directly to the gossipSyncer, and skip the main
// processing loop.
switch m := msg.(type) {
case *lnwire.QueryShortChanIDs,
*lnwire.QueryChannelRange,
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:
syncer, err := d.findGossipSyncer(peer.IdentityKey())
if err != nil {
log.Warnf("Unable to find gossip syncer for "+
"peer=%x: %v", peer.PubKey(), err)
errChan <- err
return errChan
}
// If we've found the message target, then we'll dispatch the
// message directly to it.
syncer.ProcessQueryMsg(m, peer.QuitSignal())
errChan <- nil
return errChan
// If a peer is updating its current update horizon, then we'll dispatch
// that directly to the proper gossipSyncer.
case *lnwire.GossipTimestampRange:
syncer, err := d.findGossipSyncer(peer.IdentityKey())
if err != nil {
log.Warnf("Unable to find gossip syncer for "+
"peer=%x: %v", peer.PubKey(), err)
errChan <- err
return errChan
}
// If we've found the message target, then we'll dispatch the
// message directly to it.
err = syncer.ApplyGossipFilter(m)
if err != nil {
log.Warnf("unable to apply gossip "+
"filter for peer=%x: %v",
peer.PubKey(), err)
errChan <- err
return errChan
}
errChan <- nil
return errChan
}
nMsg := &networkMsg{ nMsg := &networkMsg{
msg: msg, msg: msg,
isRemote: true, isRemote: true,
peer: peer, peer: peer,
source: peer.IdentityKey(), source: peer.IdentityKey(),
err: make(chan error, 1), err: errChan,
} }
select { select {
@ -965,7 +1020,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
policyUpdate.errResp <- nil policyUpdate.errResp <- nil
case announcement := <-d.networkMsgs: case announcement := <-d.networkMsgs:
switch msg := announcement.msg.(type) { switch announcement.msg.(type) {
// Channel announcement signatures are amongst the only // Channel announcement signatures are amongst the only
// messages that we'll process serially. // messages that we'll process serially.
case *lnwire.AnnounceSignatures: case *lnwire.AnnounceSignatures:
@ -978,51 +1033,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
) )
} }
continue continue
// If a peer is updating its current update horizon,
// then we'll dispatch that directly to the proper
// gossipSyncer.
case *lnwire.GossipTimestampRange:
syncer, err := d.findGossipSyncer(
announcement.source,
)
if err != nil {
log.Warnf("Unable to find gossip "+
"syncer for peer=%x: %v",
announcement.peer.PubKey(), err)
continue
}
// If we've found the message target, then
// we'll dispatch the message directly to it.
err = syncer.ApplyGossipFilter(msg)
if err != nil {
log.Warnf("unable to apply gossip "+
"filter for peer=%x: %v",
announcement.peer.PubKey(), err)
}
continue
// For messages in the known set of channel series
// queries, we'll dispatch the message directly to the
// peer, and skip the main processing loop.
case *lnwire.QueryShortChanIDs,
*lnwire.QueryChannelRange,
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:
syncer, err := d.findGossipSyncer(
announcement.source,
)
if err != nil {
log.Warnf("Unable to find gossip "+
"syncer for peer=%x: %v",
announcement.source, err)
continue
}
syncer.ProcessQueryMsg(announcement.msg)
continue
} }
// If this message was recently rejected, then we won't // If this message was recently rejected, then we won't

@ -982,12 +982,11 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
// ProcessQueryMsg is used by outside callers to pass new channel time series // ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine. // queries to the internal processing goroutine.
func (g *gossipSyncer) ProcessQueryMsg(msg lnwire.Message) { func (g *gossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) {
select { select {
case g.gossipMsgs <- msg: case g.gossipMsgs <- msg:
return case <-peerQuit:
case <-g.quit: case <-g.quit:
return
} }
} }

10
peer.go

@ -793,7 +793,7 @@ func (ms *msgStream) msgConsumer() {
// AddMsg adds a new message to the msgStream. This function is safe for // AddMsg adds a new message to the msgStream. This function is safe for
// concurrent access. // concurrent access.
func (ms *msgStream) AddMsg(msg lnwire.Message, quit chan struct{}) { func (ms *msgStream) AddMsg(msg lnwire.Message) {
// First, we'll attempt to receive from the producerSema struct. This // First, we'll attempt to receive from the producerSema struct. This
// acts as a sempahore to prevent us from indefinitely buffering // acts as a sempahore to prevent us from indefinitely buffering
// incoming items from the wire. Either the msg queue isn't full, and // incoming items from the wire. Either the msg queue isn't full, and
@ -801,7 +801,7 @@ func (ms *msgStream) AddMsg(msg lnwire.Message, quit chan struct{}) {
// we're signalled to quit, or a slot is freed up. // we're signalled to quit, or a slot is freed up.
select { select {
case <-ms.producerSema: case <-ms.producerSema:
case <-quit: case <-ms.peer.quit:
return return
case <-ms.quit: case <-ms.quit:
return return
@ -1019,7 +1019,7 @@ out:
// forward the error to all channels with this peer. // forward the error to all channels with this peer.
case msg.ChanID == lnwire.ConnectionWideID: case msg.ChanID == lnwire.ConnectionWideID:
for chanID, chanStream := range chanMsgStreams { for chanID, chanStream := range chanMsgStreams {
chanStream.AddMsg(nextMsg, p.quit) chanStream.AddMsg(nextMsg)
// Also marked this channel as failed, // Also marked this channel as failed,
// so we won't try to restart it on // so we won't try to restart it on
@ -1081,7 +1081,7 @@ out:
*lnwire.ReplyChannelRange, *lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd: *lnwire.ReplyShortChanIDsEnd:
discStream.AddMsg(msg, p.quit) discStream.AddMsg(msg)
default: default:
peerLog.Errorf("unknown message %v received from peer "+ peerLog.Errorf("unknown message %v received from peer "+
@ -1104,7 +1104,7 @@ out:
// With the stream obtained, add the message to the // With the stream obtained, add the message to the
// stream so we can continue processing message. // stream so we can continue processing message.
chanStream.AddMsg(nextMsg, p.quit) chanStream.AddMsg(nextMsg)
} }
idleTimer.Reset(idleTimeout) idleTimer.Reset(idleTimeout)