Merge pull request #3170 from cfromknecht/remove-router-reject-cache

routing/router: remove router-level reject cache
This commit is contained in:
Johan T. Halseth 2019-06-11 21:52:27 +02:00 committed by GitHub
commit 5485101f9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -381,9 +381,6 @@ type ChannelRouter struct {
// consistency between the various database accesses. // consistency between the various database accesses.
channelEdgeMtx *multimutex.Mutex channelEdgeMtx *multimutex.Mutex
rejectMtx sync.RWMutex
rejectCache map[uint64]struct{}
sync.RWMutex sync.RWMutex
quit chan struct{} quit chan struct{}
@ -413,7 +410,6 @@ func New(cfg Config) (*ChannelRouter, error) {
ntfnClientUpdates: make(chan *topologyClientUpdate), ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex(), channelEdgeMtx: multimutex.NewMutex(),
selfNode: selfNode, selfNode: selfNode,
rejectCache: make(map[uint64]struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -804,17 +800,9 @@ func (r *ChannelRouter) pruneZombieChans() error {
// TODO(roasbeef): add ability to delete single directional edge // TODO(roasbeef): add ability to delete single directional edge
chansToPrune = append(chansToPrune, info.ChannelID) chansToPrune = append(chansToPrune, info.ChannelID)
// As we're detecting this as a zombie channel, we'll add this
// to the set of recently rejected items so we don't re-accept
// it shortly after.
r.rejectCache[info.ChannelID] = struct{}{}
return nil return nil
} }
r.rejectMtx.Lock()
defer r.rejectMtx.Unlock()
err := r.cfg.Graph.ForEachChannel(filterPruneChans) err := r.cfg.Graph.ForEachChannel(filterPruneChans)
if err != nil { if err != nil {
return fmt.Errorf("unable to filter local zombie channels: "+ return fmt.Errorf("unable to filter local zombie channels: "+
@ -1120,16 +1108,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
log.Infof("Updated vertex data for node=%x", msg.PubKeyBytes) log.Infof("Updated vertex data for node=%x", msg.PubKeyBytes)
case *channeldb.ChannelEdgeInfo: case *channeldb.ChannelEdgeInfo:
// If we recently rejected this channel edge, then we won't
// attempt to re-process it.
r.rejectMtx.RLock()
if _, ok := r.rejectCache[msg.ChannelID]; ok {
r.rejectMtx.RUnlock()
return newErrf(ErrRejected, "recently rejected "+
"chan_id=%v", msg.ChannelID)
}
r.rejectMtx.RUnlock()
// Prior to processing the announcement we first check if we // Prior to processing the announcement we first check if we
// already know of this channel, if so, then we can exit early. // already know of this channel, if so, then we can exit early.
_, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge( _, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge(
@ -1169,10 +1147,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID) channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
fundingPoint, _, err := r.fetchChanPoint(&channelID) fundingPoint, _, err := r.fetchChanPoint(&channelID)
if err != nil { if err != nil {
r.rejectMtx.Lock()
r.rejectCache[msg.ChannelID] = struct{}{}
r.rejectMtx.Unlock()
return errors.Errorf("unable to fetch chan point for "+ return errors.Errorf("unable to fetch chan point for "+
"chan_id=%v: %v", msg.ChannelID, err) "chan_id=%v: %v", msg.ChannelID, err)
} }
@ -1199,10 +1173,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
r.quit, r.quit,
) )
if err != nil { if err != nil {
r.rejectMtx.Lock()
r.rejectCache[msg.ChannelID] = struct{}{}
r.rejectMtx.Unlock()
return fmt.Errorf("unable to fetch utxo "+ return fmt.Errorf("unable to fetch utxo "+
"for chan_id=%v, chan_point=%v: %v", "for chan_id=%v, chan_point=%v: %v",
msg.ChannelID, fundingPoint, err) msg.ChannelID, fundingPoint, err)
@ -1251,16 +1221,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
} }
case *channeldb.ChannelEdgePolicy: case *channeldb.ChannelEdgePolicy:
// If we recently rejected this channel edge, then we won't
// attempt to re-process it.
r.rejectMtx.RLock()
if _, ok := r.rejectCache[msg.ChannelID]; ok {
r.rejectMtx.RUnlock()
return newErrf(ErrRejected, "recently rejected "+
"chan_id=%v", msg.ChannelID)
}
r.rejectMtx.RUnlock()
// We make sure to hold the mutex for this channel ID, // We make sure to hold the mutex for this channel ID,
// such that no other goroutine is concurrently doing // such that no other goroutine is concurrently doing
// database accesses for the same channel ID. // database accesses for the same channel ID.