From bf05e4778046643d55739a755be175ed9e9402fe Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 30 Jan 2018 20:41:27 -0800 Subject: [PATCH] discovery: add additional gossiper level reject cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we’ll add a new reject cache to ensure that we don’t attempt to re-process any announcements already rejected by the ChannelRouter. --- discovery/gossiper.go | 52 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 5d722caf..64dee356 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -193,6 +193,9 @@ type AuthenticatedGossiper struct { // consistent between when the DB is first read until it's written. channelMtx *multimutex.Mutex + rejectMtx sync.RWMutex + recentRejects map[uint64]struct{} + sync.Mutex } @@ -214,6 +217,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { prematureChannelUpdates: make(map[uint64][]*networkMsg), waitingProofs: storage, channelMtx: multimutex.NewMutex(), + recentRejects: make(map[uint64]struct{}), }, nil } @@ -890,6 +894,13 @@ func (d *AuthenticatedGossiper) networkHandler() { continue } + // If this message was recently rejected, then we won't + // attempt to re-process it. + if d.isRecentlyRejectedMsg(announcement.msg) { + announcement.err <- fmt.Errorf("recently rejected") + continue + } + // We'll set up any dependent, and wait until a free // slot for this job opens up, this allow us to not // have thousands of goroutines active. @@ -1014,6 +1025,26 @@ func (d *AuthenticatedGossiper) networkHandler() { } } +// isRecentlyRejectedMsg returns true if we recently rejected a message, and +// false otherwise, This avoids expensive reprocessing of the message. +func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool { + d.rejectMtx.RLock() + defer d.rejectMtx.RUnlock() + + switch m := msg.(type) { + case *lnwire.ChannelUpdate: + _, ok := d.recentRejects[m.ShortChannelID.ToUint64()] + return ok + + case *lnwire.ChannelAnnouncement: + _, ok := d.recentRejects[m.ShortChannelID.ToUint64()] + return ok + + default: + return false + } +} + // retransmitStaleChannels examines all outgoing channels that the source node // is known to maintain to check to see if any of them are "stale". A channel // is stale iff, the last timestamp of its rebroadcast is older then @@ -1325,6 +1356,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n log.Error("Ignoring ChannelAnnouncement from "+ "chain=%v, gossiper on chain=%v", msg.ChainHash, d.cfg.ChainHash) + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() return nil } @@ -1356,6 +1390,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n if err := ValidateChannelAnn(msg); err != nil { err := errors.Errorf("unable to validate "+ "announcement: %v", err) + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() log.Error(err) nMsg.err <- err @@ -1416,6 +1453,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // see if we get any new announcements. anns, rErr := d.processRejectedEdge(msg, proof) if rErr != nil { + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() nMsg.err <- rErr return nil } @@ -1517,6 +1557,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n log.Error("Ignoring ChannelUpdate from "+ "chain=%v, gossiper on chain=%v", msg.ChainHash, d.cfg.ChainHash) + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() return nil } @@ -1590,6 +1633,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n shortChanID, err) log.Error(err) nMsg.err <- err + + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() return nil } } @@ -1633,6 +1680,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { log.Debug(err) } else { + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() log.Error(err) } @@ -1808,7 +1858,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n chanInfo.AuthProof, chanInfo, e1, e2, ) if err != nil { - log.Error("unable to gen ann: %v", err) + log.Errorf("unable to gen ann: %v", err) return } err = d.cfg.SendToPeer(nMsg.peer, chanAnn)