From 3da8cd7551ca6107a376375fc8c7537aa09c7170 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 20 Mar 2017 21:28:39 -0700 Subject: [PATCH] routing: add a caching layer in front of the KSP algorithm This commit adds caching to our route finding. Caching is done on a tuple-basis mapping a (dest, amt) pair to a previously calculated set of shortest paths. The cache invalidated on two occasions: when a block closes a set of transactions, or we received a new channel update or channel announcement message. With this change, payments are now snappier from the PoV of an application developer as we no longer need to do a series of disk-seeks before we dispatch each payment. --- routing/router.go | 93 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 13 deletions(-) diff --git a/routing/router.go b/routing/router.go index ace1646c..e9670928 100644 --- a/routing/router.go +++ b/routing/router.go @@ -86,6 +86,25 @@ type Config struct { htlcAdd *lnwire.UpdateAddHTLC) ([32]byte, error) } +// routeTuple is an entry within the ChannelRouter's route cache. We cache +// prospective routes based on first the destination, and then the target +// amount. We required the target amount as that will influence the available +// set of paths for a payment. +type routeTuple struct { + amt btcutil.Amount + dest [33]byte +} + +// newRouteTuple creates a new route tuple from the target and amount. +func newRouteTuple(amt btcutil.Amount, dest *btcec.PublicKey) routeTuple { + r := routeTuple{ + amt: amt, + } + copy(r.dest[:], dest.SerializeCompressed()) + + return r +} + // ChannelRouter is the layer 3 router within the Lightning stack. Below the // ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain // itself. The primary role of the ChannelRouter is to respond to queries for @@ -110,10 +129,16 @@ type ChannelRouter struct { // when doing any path finding. selfNode *channeldb.LightningNode - // TODO(roasbeef): make LRU, invalidate upon new block connect - shortestPathCache map[[33]byte][]*Route - nodeCache map[[33]byte]*channeldb.LightningNode - edgeCache map[wire.OutPoint]*channeldb.ChannelEdgePolicy + // routeCache is a map that caches the k-shortest paths from ourselves + // to a given target destination for a particular payment amount. This + // map is used as an optimization to speed up subsequent payments to a + // particular destination. This map will be cleared each time a new + // channel announcement is accepted, or a new block arrives that + // results in channels being closed. + // + // TODO(roasbeef): make LRU + routeCacheMtx sync.RWMutex + routeCache map[routeTuple][]*Route // newBlocks is a channel in which new blocks connected to the end of // the main chain are sent over. @@ -194,6 +219,7 @@ func New(cfg Config) (*ChannelRouter, error) { prematureAnnouncements: make(map[uint32][]lnwire.Message), topologyClients: make(map[uint64]topologyClient), ntfnClientUpdates: make(chan *topologyClientUpdate), + routeCache: make(map[routeTuple][]*Route), quit: make(chan struct{}), }, nil } @@ -486,6 +512,13 @@ func (r *ChannelRouter) networkHandler() { continue } + // Invalidate the route cache as channels within the + // graph have closed, which may affect our choice of + // the KSP's for a particular routeTuple. + r.routeCacheMtx.Lock() + r.routeCache = make(map[routeTuple][]*Route) + r.routeCacheMtx.Unlock() + // Notify all currently registered clients of the newly // closed channels. closeSummaries := createCloseSummaries(blockHeight, chansClosed...) @@ -628,6 +661,8 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { return chanID.BlockHeight > r.bestHeight } + var invalidateCache bool + switch msg := msg.(type) { // A new node announcement has arrived which either presents a new @@ -749,6 +784,7 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { return false } + invalidateCache = true log.Infof("New channel discovered! Link "+ "connects %x and %x with ChannelPoint(%v), chan_id=%v", msg.FirstNodeID.SerializeCompressed(), @@ -845,10 +881,20 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { return false } + invalidateCache = true log.Infof("New channel update applied: %v", spew.Sdump(chanUpdate)) } + // If we've received a channel update, then invalidate the route cache + // as channels within the graph have closed, which may affect our + // choice of the KSP's for a particular routeTuple. + if invalidateCache { + r.routeCacheMtx.Lock() + r.routeCache = make(map[routeTuple][]*Route) + r.routeCacheMtx.Unlock() + } + return true } @@ -1222,23 +1268,44 @@ type LightningPayment struct { func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route, error) { log.Tracef("Dispatching route for lightning payment: %v", newLogClosure(func() string { + payment.Target.Curve = nil return spew.Sdump(payment) }), ) - // TODO(roasbeef): consult KSP cache before dispatching - var ( sendError error preImage [32]byte ) - // Query the graph for a set of potential routes to the destination - // node that can support our payment amount. If no such routes can be - // found then an error will be returned. - routes, err := r.FindRoutes(payment.Target, payment.Amount) - if err != nil { - return preImage, nil, err + // TODO(roasbeef): consult KSP cache before dispatching + + // Before attempting to perform a series of graph traversals to find + // the k-shortest paths to the destination, we'll first consult our + // path cache + rt := newRouteTuple(payment.Amount, payment.Target) + + r.routeCacheMtx.RLock() + routes, ok := r.routeCache[rt] + r.routeCacheMtx.RUnlock() + + // If we don't have a set of routes cached, we'll query the graph for a + // set of potential routes to the destination node that can support our + // payment amount. If no such routes can be found then an error will be + // returned. + if !ok { + freshRoutes, err := r.FindRoutes(payment.Target, payment.Amount) + if err != nil { + return preImage, nil, err + } + + // Populate the cache with this set of fresh routes so we can + // reuse them in the future. + r.routeCacheMtx.Lock() + r.routeCache[rt] = freshRoutes + r.routeCacheMtx.Unlock() + + routes = freshRoutes } // For each eligible path, we'll attempt to successfully send our @@ -1246,7 +1313,7 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route // serially until either once succeeds, or we've exhausted our set of // available paths. for _, route := range routes { - log.Tracef("Attempting to send payment %x, using route: %#v", + log.Tracef("Attempting to send payment %x, using route: %v", payment.PaymentHash, newLogClosure(func() string { return spew.Sdump(route) }),