routing: add a caching layer in front of the KSP algorithm

This commit adds caching to our route finding. Caching is done on a
tuple-basis mapping a (dest, amt) pair to a previously calculated set
of shortest paths. The cache invalidated on two occasions: when a block
closes a set of transactions, or we received a new channel update or
channel announcement message.

With this change, payments are now snappier from the PoV of an
application developer as we no longer need to do a series of disk-seeks
before we dispatch each payment.
This commit is contained in:
Olaoluwa Osuntokun 2017-03-20 21:28:39 -07:00
parent 47c065b72c
commit 3da8cd7551
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

@ -86,6 +86,25 @@ type Config struct {
htlcAdd *lnwire.UpdateAddHTLC) ([32]byte, error)
}
// routeTuple is an entry within the ChannelRouter's route cache. We cache
// prospective routes based on first the destination, and then the target
// amount. We required the target amount as that will influence the available
// set of paths for a payment.
type routeTuple struct {
amt btcutil.Amount
dest [33]byte
}
// newRouteTuple creates a new route tuple from the target and amount.
func newRouteTuple(amt btcutil.Amount, dest *btcec.PublicKey) routeTuple {
r := routeTuple{
amt: amt,
}
copy(r.dest[:], dest.SerializeCompressed())
return r
}
// ChannelRouter is the layer 3 router within the Lightning stack. Below the
// ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain
// itself. The primary role of the ChannelRouter is to respond to queries for
@ -110,10 +129,16 @@ type ChannelRouter struct {
// when doing any path finding.
selfNode *channeldb.LightningNode
// TODO(roasbeef): make LRU, invalidate upon new block connect
shortestPathCache map[[33]byte][]*Route
nodeCache map[[33]byte]*channeldb.LightningNode
edgeCache map[wire.OutPoint]*channeldb.ChannelEdgePolicy
// routeCache is a map that caches the k-shortest paths from ourselves
// to a given target destination for a particular payment amount. This
// map is used as an optimization to speed up subsequent payments to a
// particular destination. This map will be cleared each time a new
// channel announcement is accepted, or a new block arrives that
// results in channels being closed.
//
// TODO(roasbeef): make LRU
routeCacheMtx sync.RWMutex
routeCache map[routeTuple][]*Route
// newBlocks is a channel in which new blocks connected to the end of
// the main chain are sent over.
@ -194,6 +219,7 @@ func New(cfg Config) (*ChannelRouter, error) {
prematureAnnouncements: make(map[uint32][]lnwire.Message),
topologyClients: make(map[uint64]topologyClient),
ntfnClientUpdates: make(chan *topologyClientUpdate),
routeCache: make(map[routeTuple][]*Route),
quit: make(chan struct{}),
}, nil
}
@ -486,6 +512,13 @@ func (r *ChannelRouter) networkHandler() {
continue
}
// Invalidate the route cache as channels within the
// graph have closed, which may affect our choice of
// the KSP's for a particular routeTuple.
r.routeCacheMtx.Lock()
r.routeCache = make(map[routeTuple][]*Route)
r.routeCacheMtx.Unlock()
// Notify all currently registered clients of the newly
// closed channels.
closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
@ -628,6 +661,8 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool {
return chanID.BlockHeight > r.bestHeight
}
var invalidateCache bool
switch msg := msg.(type) {
// A new node announcement has arrived which either presents a new
@ -749,6 +784,7 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool {
return false
}
invalidateCache = true
log.Infof("New channel discovered! Link "+
"connects %x and %x with ChannelPoint(%v), chan_id=%v",
msg.FirstNodeID.SerializeCompressed(),
@ -845,10 +881,20 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool {
return false
}
invalidateCache = true
log.Infof("New channel update applied: %v",
spew.Sdump(chanUpdate))
}
// If we've received a channel update, then invalidate the route cache
// as channels within the graph have closed, which may affect our
// choice of the KSP's for a particular routeTuple.
if invalidateCache {
r.routeCacheMtx.Lock()
r.routeCache = make(map[routeTuple][]*Route)
r.routeCacheMtx.Unlock()
}
return true
}
@ -1222,23 +1268,44 @@ type LightningPayment struct {
func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route, error) {
log.Tracef("Dispatching route for lightning payment: %v",
newLogClosure(func() string {
payment.Target.Curve = nil
return spew.Sdump(payment)
}),
)
// TODO(roasbeef): consult KSP cache before dispatching
var (
sendError error
preImage [32]byte
)
// Query the graph for a set of potential routes to the destination
// node that can support our payment amount. If no such routes can be
// found then an error will be returned.
routes, err := r.FindRoutes(payment.Target, payment.Amount)
if err != nil {
return preImage, nil, err
// TODO(roasbeef): consult KSP cache before dispatching
// Before attempting to perform a series of graph traversals to find
// the k-shortest paths to the destination, we'll first consult our
// path cache
rt := newRouteTuple(payment.Amount, payment.Target)
r.routeCacheMtx.RLock()
routes, ok := r.routeCache[rt]
r.routeCacheMtx.RUnlock()
// If we don't have a set of routes cached, we'll query the graph for a
// set of potential routes to the destination node that can support our
// payment amount. If no such routes can be found then an error will be
// returned.
if !ok {
freshRoutes, err := r.FindRoutes(payment.Target, payment.Amount)
if err != nil {
return preImage, nil, err
}
// Populate the cache with this set of fresh routes so we can
// reuse them in the future.
r.routeCacheMtx.Lock()
r.routeCache[rt] = freshRoutes
r.routeCacheMtx.Unlock()
routes = freshRoutes
}
// For each eligible path, we'll attempt to successfully send our
@ -1246,7 +1313,7 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route
// serially until either once succeeds, or we've exhausted our set of
// available paths.
for _, route := range routes {
log.Tracef("Attempting to send payment %x, using route: %#v",
log.Tracef("Attempting to send payment %x, using route: %v",
payment.PaymentHash, newLogClosure(func() string {
return spew.Sdump(route)
}),