From e3b529939e8aab538d32fbf288ce5b49e20328b9 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 27 Jan 2021 13:34:55 +0100 Subject: [PATCH 1/4] batch: add option for executing requests immediately We make the default non-lazy, and will make the incoming gossip requests lazy. --- batch/interface.go | 19 ++++++++++++++++++- batch/scheduler.go | 6 ++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/batch/interface.go b/batch/interface.go index b9ab8b77..e07cbd49 100644 --- a/batch/interface.go +++ b/batch/interface.go @@ -25,6 +25,23 @@ type Request struct { // // NOTE: This field is optional. OnCommit func(commitErr error) error + + // lazy should be true if we don't have to immediately execute this + // request when it comes in. This means that it can be scheduled later, + // allowing larger batches. + lazy bool +} + +// SchedulerOption is a type that can be used to supply options to a scheduled +// request. +type SchedulerOption func(r *Request) + +// LazyAdd will make the request be executed lazily, added to the next batch to +// reduce db contention. +func LazyAdd() SchedulerOption { + return func(r *Request) { + r.lazy = true + } } // Scheduler abstracts a generic batching engine that accumulates an incoming @@ -32,7 +49,7 @@ type Request struct { type Scheduler interface { // Execute schedules a Request for execution with the next available // batch. This method blocks until the the underlying closure has been - // run against the databse. The resulting error is returned to the + // run against the database. The resulting error is returned to the // caller. Execute(req *Request) error } diff --git a/batch/scheduler.go b/batch/scheduler.go index 7d681376..941c6f37 100644 --- a/batch/scheduler.go +++ b/batch/scheduler.go @@ -60,6 +60,12 @@ func (s *TimeScheduler) Execute(r *Request) error { time.AfterFunc(s.duration, s.b.trigger) } s.b.reqs = append(s.b.reqs, &req) + + // If this is a non-lazy request, we'll execute the batch immediately. + if !r.lazy { + go s.b.trigger() + } + s.mu.Unlock() // Wait for the batch to process the request. If the batch didn't From b5d6d7b4fde0743552733309781221351d7c8006 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 27 Jan 2021 13:35:32 +0100 Subject: [PATCH 2/4] channeldb: add SchedulerOp arg to graph update methods --- channeldb/graph.go | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index 2eaf08ed..f042dcc4 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -458,12 +458,20 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { // channel update. // // TODO(roasbeef): also need sig of announcement -func (c *ChannelGraph) AddLightningNode(node *LightningNode) error { - return c.nodeScheduler.Execute(&batch.Request{ +func (c *ChannelGraph) AddLightningNode(node *LightningNode, + op ...batch.SchedulerOption) error { + + r := &batch.Request{ Update: func(tx kvdb.RwTx) error { return addLightningNode(tx, node) }, - }) + } + + for _, f := range op { + f(r) + } + + return c.nodeScheduler.Execute(r) } func addLightningNode(tx kvdb.RwTx, node *LightningNode) error { @@ -588,9 +596,11 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket, // involved in creation of the channel, and the set of features that the channel // supports. The chanPoint and chanID are used to uniquely identify the edge // globally within the database. -func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { +func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo, + op ...batch.SchedulerOption) error { + var alreadyExists bool - return c.chanScheduler.Execute(&batch.Request{ + r := &batch.Request{ Reset: func() { alreadyExists = false }, @@ -618,7 +628,13 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { return nil } }, - }) + } + + for _, f := range op { + f(r) + } + + return c.chanScheduler.Execute(r) } // addChannelEdge is the private form of AddChannelEdge that allows callers to @@ -1994,12 +2010,15 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex, // updated, otherwise it's the second node's information. The node ordering is // determined by the lexicographical ordering of the identity public keys of the // nodes on either side of the channel. -func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { +func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy, + op ...batch.SchedulerOption) error { + var ( isUpdate1 bool edgeNotFound bool ) - return c.chanScheduler.Execute(&batch.Request{ + + r := &batch.Request{ Reset: func() { isUpdate1 = false edgeNotFound = false @@ -2028,7 +2047,13 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { return nil } }, - }) + } + + for _, f := range op { + f(r) + } + + return c.chanScheduler.Execute(r) } func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) { From 7e34132c5330bde9cabfe4e260df3e7b3472ced1 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 27 Jan 2021 13:39:18 +0100 Subject: [PATCH 3/4] routing: let graph methods take scheduler option --- discovery/gossiper_test.go | 13 ++++++++++--- routing/router.go | 37 +++++++++++++++++++++++++------------ 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 1c1e60ca..8c1136e4 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -22,6 +22,7 @@ import ( "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnpeer" @@ -115,7 +116,9 @@ func newMockRouter(height uint32) *mockGraphSource { var _ routing.ChannelGraphSource = (*mockGraphSource)(nil) -func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error { +func (r *mockGraphSource) AddNode(node *channeldb.LightningNode, + _ ...batch.SchedulerOption) error { + r.mu.Lock() defer r.mu.Unlock() @@ -123,7 +126,9 @@ func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error { return nil } -func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { +func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo, + _ ...batch.SchedulerOption) error { + r.mu.Lock() defer r.mu.Unlock() @@ -135,7 +140,9 @@ func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { return nil } -func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error { +func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy, + _ ...batch.SchedulerOption) error { + r.mu.Lock() defer r.mu.Unlock() diff --git a/routing/router.go b/routing/router.go index 5d70f4b8..4c1aa221 100644 --- a/routing/router.go +++ b/routing/router.go @@ -14,6 +14,7 @@ import ( "github.com/go-errors/errors" sphinx "github.com/lightningnetwork/lightning-onion" + "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/clock" @@ -82,12 +83,12 @@ type ChannelGraphSource interface { // AddNode is used to add information about a node to the router // database. If the node with this pubkey is not present in an existing // channel, it will be ignored. - AddNode(node *channeldb.LightningNode) error + AddNode(node *channeldb.LightningNode, op ...batch.SchedulerOption) error // AddEdge is used to add edge/channel to the topology of the router, // after all information about channel will be gathered this // edge/channel might be used in construction of payment path. - AddEdge(edge *channeldb.ChannelEdgeInfo) error + AddEdge(edge *channeldb.ChannelEdgeInfo, op ...batch.SchedulerOption) error // AddProof updates the channel edge info with proof which is needed to // properly announce the edge to the rest of the network. @@ -95,7 +96,7 @@ type ChannelGraphSource interface { // UpdateEdge is used to update edge information, without this message // edge considered as not fully constructed. - UpdateEdge(policy *channeldb.ChannelEdgePolicy) error + UpdateEdge(policy *channeldb.ChannelEdgePolicy, op ...batch.SchedulerOption) error // IsStaleNode returns true if the graph source has a node announcement // for the target node with a more recent timestamp. This method will @@ -957,7 +958,7 @@ func (r *ChannelRouter) networkHandler() { // this is either a new update from our PoV or // an update to a prior vertex/edge we // previously accepted. - err = r.processUpdate(update.msg) + err = r.processUpdate(update.msg, update.op...) update.err <- err // If this message had any dependencies, then @@ -1176,7 +1177,9 @@ func (r *ChannelRouter) assertNodeAnnFreshness(node route.Vertex, // channel/edge update network update. If the update didn't affect the internal // state of the draft due to either being out of date, invalid, or redundant, // then error is returned. -func (r *ChannelRouter) processUpdate(msg interface{}) error { +func (r *ChannelRouter) processUpdate(msg interface{}, + op ...batch.SchedulerOption) error { + switch msg := msg.(type) { case *channeldb.LightningNode: // Before we add the node to the database, we'll check to see @@ -1187,7 +1190,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { return err } - if err := r.cfg.Graph.AddLightningNode(msg); err != nil { + if err := r.cfg.Graph.AddLightningNode(msg, op...); err != nil { return errors.Errorf("unable to add node %v to the "+ "graph: %v", msg.PubKeyBytes, err) } @@ -1219,7 +1222,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // short-circuit our path straight to adding the edge to our // graph. if r.cfg.AssumeChannelValid { - if err := r.cfg.Graph.AddChannelEdge(msg); err != nil { + if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil { return fmt.Errorf("unable to add edge: %v", err) } log.Tracef("New channel discovered! Link "+ @@ -1291,7 +1294,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // after commitment fees are dynamic. msg.Capacity = btcutil.Amount(chanUtxo.Value) msg.ChannelPoint = *fundingPoint - if err := r.cfg.Graph.AddChannelEdge(msg); err != nil { + if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil { return errors.Errorf("unable to add edge: %v", err) } @@ -1390,7 +1393,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // Now that we know this isn't a stale update, we'll apply the // new edge policy to the proper directional edge within the // channel graph. - if err = r.cfg.Graph.UpdateEdgePolicy(msg); err != nil { + if err = r.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil { err := errors.Errorf("unable to add channel: %v", err) log.Error(err) return err @@ -1444,6 +1447,7 @@ func (r *ChannelRouter) fetchFundingTx( // error channel. type routingMsg struct { msg interface{} + op []batch.SchedulerOption err chan error } @@ -2140,9 +2144,12 @@ func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate, // be ignored. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error { +func (r *ChannelRouter) AddNode(node *channeldb.LightningNode, + op ...batch.SchedulerOption) error { + rMsg := &routingMsg{ msg: node, + op: op, err: make(chan error, 1), } @@ -2164,9 +2171,12 @@ func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error { // in construction of payment path. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error { +func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo, + op ...batch.SchedulerOption) error { + rMsg := &routingMsg{ msg: edge, + op: op, err: make(chan error, 1), } @@ -2187,9 +2197,12 @@ func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error { // considered as not fully constructed. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy) error { +func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy, + op ...batch.SchedulerOption) error { + rMsg := &routingMsg{ msg: update, + op: op, err: make(chan error, 1), } From c9afc93151b1ff4311ff0d5ac2050e3b35aacdc9 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 27 Jan 2021 13:38:23 +0100 Subject: [PATCH 4/4] discovery/gossiper: add local updates to graph immediately Since the batch interval can potentially be long, adding local updates to the graph could be slow. This would slow down operations like adding our own channel update and announcements during the funding process, and updating edge policies for local channels. Now we instead check whether the update is remote or not, and only for remote updates use the SchedulerOption to lazily add them to the graph. --- discovery/gossiper.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index e669f61a..8993e34a 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnpeer" @@ -1449,7 +1450,9 @@ func (d *AuthenticatedGossiper) processRejectedEdge( // addNode processes the given node announcement, and adds it to our channel // graph. -func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error { +func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement, + op ...batch.SchedulerOption) error { + if err := routing.ValidateNodeAnn(msg); err != nil { return fmt.Errorf("unable to validate node announcement: %v", err) @@ -1469,7 +1472,7 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error { ExtraOpaqueData: msg.ExtraOpaqueData, } - return d.cfg.Router.AddNode(node) + return d.cfg.Router.AddNode(node, op...) } // processNetworkAnnouncement processes a new network relate authenticated @@ -1486,6 +1489,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return chanID.BlockHeight+delta > d.bestHeight } + // If this is a remote update, we set the scheduler option to lazily + // add it to the graph. + var schedulerOp []batch.SchedulerOption + if nMsg.isRemote { + schedulerOp = append(schedulerOp, batch.LazyAdd()) + } + var announcements []networkMsg switch msg := nMsg.msg.(type) { @@ -1504,7 +1514,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - if err := d.addNode(msg); err != nil { + if err := d.addNode(msg, schedulerOp...); err != nil { if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { @@ -1662,7 +1672,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // writes to the DB. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - if err := d.cfg.Router.AddEdge(edge); err != nil { + if err := d.cfg.Router.AddEdge(edge, schedulerOp...); err != nil { // If the edge was rejected due to already being known, // then it may be that case that this new message has a // fresh channel proof, so we'll check. @@ -2002,7 +2012,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( ExtraOpaqueData: msg.ExtraOpaqueData, } - if err := d.cfg.Router.UpdateEdge(update); err != nil { + if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil { if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { log.Debug(err)