routing: remove route cache
This commit removes the QueryRoutes route cache. It is causing wrong routes to be returned because not all of the request parameters are stored. The cache allowed high frequency QueryRoutes calls to the same destination and with the same amount to be returned fast. This behaviour can also be achieved by caching the request on the client side. In case a route is invalidated because of for example a channel update, the subsequent SendToRoute call will fail. This is a trigger to call QueryRoutes again for a fresh route.
This commit is contained in:
parent
c0141a7bc1
commit
3b94627c12
@ -1778,7 +1778,6 @@ func TestPathFindSpecExample(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// We'll now request a route from A -> B -> C.
|
// We'll now request a route from A -> B -> C.
|
||||||
ctx.router.routeCache = make(map[routeTuple][]*Route)
|
|
||||||
routes, err = ctx.router.FindRoutes(
|
routes, err = ctx.router.FindRoutes(
|
||||||
source.PubKeyBytes, carol, amt, noRestrictions, 100,
|
source.PubKeyBytes, carol, amt, noRestrictions, 100,
|
||||||
)
|
)
|
||||||
|
@ -285,17 +285,6 @@ type ChannelRouter struct {
|
|||||||
// when doing any path finding.
|
// when doing any path finding.
|
||||||
selfNode *channeldb.LightningNode
|
selfNode *channeldb.LightningNode
|
||||||
|
|
||||||
// routeCache is a map that caches the k-shortest paths from ourselves
|
|
||||||
// to a given target destination for a particular payment amount. This
|
|
||||||
// map is used as an optimization to speed up subsequent payments to a
|
|
||||||
// particular destination. This map will be cleared each time a new
|
|
||||||
// channel announcement is accepted, or a new block arrives that
|
|
||||||
// results in channels being closed.
|
|
||||||
//
|
|
||||||
// TODO(roasbeef): make LRU
|
|
||||||
routeCacheMtx sync.RWMutex
|
|
||||||
routeCache map[routeTuple][]*Route
|
|
||||||
|
|
||||||
// newBlocks is a channel in which new blocks connected to the end of
|
// newBlocks is a channel in which new blocks connected to the end of
|
||||||
// the main chain are sent over, and blocks updated after a call to
|
// the main chain are sent over, and blocks updated after a call to
|
||||||
// UpdateFilter.
|
// UpdateFilter.
|
||||||
@ -367,7 +356,6 @@ func New(cfg Config) (*ChannelRouter, error) {
|
|||||||
ntfnClientUpdates: make(chan *topologyClientUpdate),
|
ntfnClientUpdates: make(chan *topologyClientUpdate),
|
||||||
channelEdgeMtx: multimutex.NewMutex(),
|
channelEdgeMtx: multimutex.NewMutex(),
|
||||||
selfNode: selfNode,
|
selfNode: selfNode,
|
||||||
routeCache: make(map[routeTuple][]*Route),
|
|
||||||
rejectCache: make(map[uint64]struct{}),
|
rejectCache: make(map[uint64]struct{}),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -810,12 +798,6 @@ func (r *ChannelRouter) networkHandler() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invalidate the route cache, as some channels might
|
|
||||||
// not be confirmed anymore.
|
|
||||||
r.routeCacheMtx.Lock()
|
|
||||||
r.routeCache = make(map[routeTuple][]*Route)
|
|
||||||
r.routeCacheMtx.Unlock()
|
|
||||||
|
|
||||||
// TODO(halseth): notify client about the reorg?
|
// TODO(halseth): notify client about the reorg?
|
||||||
|
|
||||||
// A new block has arrived, so we can prune the channel graph
|
// A new block has arrived, so we can prune the channel graph
|
||||||
@ -873,18 +855,6 @@ func (r *ChannelRouter) networkHandler() {
|
|||||||
log.Infof("Block %v (height=%v) closed %v channels",
|
log.Infof("Block %v (height=%v) closed %v channels",
|
||||||
chainUpdate.Hash, blockHeight, len(chansClosed))
|
chainUpdate.Hash, blockHeight, len(chansClosed))
|
||||||
|
|
||||||
// Invalidate the route cache as the block height has
|
|
||||||
// changed which will invalidate the HTLC timeouts we
|
|
||||||
// have crafted within each of the pre-computed routes.
|
|
||||||
//
|
|
||||||
// TODO(roasbeef): need to invalidate after each
|
|
||||||
// chan ann update?
|
|
||||||
// * can have map of chanID to routes involved, avoids
|
|
||||||
// full invalidation
|
|
||||||
r.routeCacheMtx.Lock()
|
|
||||||
r.routeCache = make(map[routeTuple][]*Route)
|
|
||||||
r.routeCacheMtx.Unlock()
|
|
||||||
|
|
||||||
if len(chansClosed) == 0 {
|
if len(chansClosed) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -985,8 +955,6 @@ func (r *ChannelRouter) assertNodeAnnFreshness(node Vertex,
|
|||||||
// then error is returned.
|
// then error is returned.
|
||||||
func (r *ChannelRouter) processUpdate(msg interface{}) error {
|
func (r *ChannelRouter) processUpdate(msg interface{}) error {
|
||||||
|
|
||||||
var invalidateCache bool
|
|
||||||
|
|
||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
case *channeldb.LightningNode:
|
case *channeldb.LightningNode:
|
||||||
// Before we add the node to the database, we'll check to see
|
// Before we add the node to the database, we'll check to see
|
||||||
@ -1103,7 +1071,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
|
|||||||
return errors.Errorf("unable to add edge: %v", err)
|
return errors.Errorf("unable to add edge: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
invalidateCache = true
|
|
||||||
log.Infof("New channel discovered! Link "+
|
log.Infof("New channel discovered! Link "+
|
||||||
"connects %x and %x with ChannelPoint(%v): "+
|
"connects %x and %x with ChannelPoint(%v): "+
|
||||||
"chan_id=%v, capacity=%v",
|
"chan_id=%v, capacity=%v",
|
||||||
@ -1214,7 +1181,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
invalidateCache = true
|
|
||||||
log.Tracef("New channel update applied: %v",
|
log.Tracef("New channel update applied: %v",
|
||||||
newLogClosure(func() string { return spew.Sdump(msg) }))
|
newLogClosure(func() string { return spew.Sdump(msg) }))
|
||||||
|
|
||||||
@ -1222,15 +1188,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
|
|||||||
return errors.Errorf("wrong routing update message type")
|
return errors.Errorf("wrong routing update message type")
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we've received a channel update, then invalidate the route cache
|
|
||||||
// as channels within the graph have closed, which may affect our
|
|
||||||
// choice of the KSP's for a particular routeTuple.
|
|
||||||
if invalidateCache {
|
|
||||||
r.routeCacheMtx.Lock()
|
|
||||||
r.routeCache = make(map[routeTuple][]*Route)
|
|
||||||
r.routeCacheMtx.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1360,25 +1317,6 @@ func (r *ChannelRouter) FindRoutes(source, target Vertex,
|
|||||||
|
|
||||||
log.Debugf("Searching for path to %x, sending %v", target, amt)
|
log.Debugf("Searching for path to %x, sending %v", target, amt)
|
||||||
|
|
||||||
// Before attempting to perform a series of graph traversals to find the
|
|
||||||
// k-shortest paths to the destination, we'll first consult our path
|
|
||||||
// cache
|
|
||||||
//
|
|
||||||
// TODO: Route cache should store all request parameters instead of just
|
|
||||||
// amt and target. Currently false positives are returned if just the
|
|
||||||
// restrictions (fee limit, ignore lists) or finalExpiry are different.
|
|
||||||
rt := newRouteTuple(amt, target[:])
|
|
||||||
r.routeCacheMtx.RLock()
|
|
||||||
routes, ok := r.routeCache[rt]
|
|
||||||
r.routeCacheMtx.RUnlock()
|
|
||||||
|
|
||||||
// If we already have a cached route, and it contains at least the
|
|
||||||
// number of paths requested, then we'll return it directly as there's
|
|
||||||
// no need to repeat the computation.
|
|
||||||
if ok && uint32(len(routes)) >= numPaths {
|
|
||||||
return routes, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we don't have a set of routes cached, we'll query the graph for a
|
// If we don't have a set of routes cached, we'll query the graph for a
|
||||||
// set of potential routes to the destination node that can support our
|
// set of potential routes to the destination node that can support our
|
||||||
// payment amount. If no such routes can be found then an error will be
|
// payment amount. If no such routes can be found then an error will be
|
||||||
@ -1450,12 +1388,6 @@ func (r *ChannelRouter) FindRoutes(source, target Vertex,
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
// Populate the cache with this set of fresh routes so we can reuse
|
|
||||||
// them in the future.
|
|
||||||
r.routeCacheMtx.Lock()
|
|
||||||
r.routeCache[rt] = validRoutes
|
|
||||||
r.routeCacheMtx.Unlock()
|
|
||||||
|
|
||||||
return validRoutes, nil
|
return validRoutes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user