diff --git a/batch/interface.go b/batch/interface.go index b9ab8b77..e07cbd49 100644 --- a/batch/interface.go +++ b/batch/interface.go @@ -25,6 +25,23 @@ type Request struct { // // NOTE: This field is optional. OnCommit func(commitErr error) error + + // lazy should be true if we don't have to immediately execute this + // request when it comes in. This means that it can be scheduled later, + // allowing larger batches. + lazy bool +} + +// SchedulerOption is a type that can be used to supply options to a scheduled +// request. +type SchedulerOption func(r *Request) + +// LazyAdd will make the request be executed lazily, added to the next batch to +// reduce db contention. +func LazyAdd() SchedulerOption { + return func(r *Request) { + r.lazy = true + } } // Scheduler abstracts a generic batching engine that accumulates an incoming @@ -32,7 +49,7 @@ type Request struct { type Scheduler interface { // Execute schedules a Request for execution with the next available // batch. This method blocks until the the underlying closure has been - // run against the databse. The resulting error is returned to the + // run against the database. The resulting error is returned to the // caller. Execute(req *Request) error } diff --git a/batch/scheduler.go b/batch/scheduler.go index 7d681376..941c6f37 100644 --- a/batch/scheduler.go +++ b/batch/scheduler.go @@ -60,6 +60,12 @@ func (s *TimeScheduler) Execute(r *Request) error { time.AfterFunc(s.duration, s.b.trigger) } s.b.reqs = append(s.b.reqs, &req) + + // If this is a non-lazy request, we'll execute the batch immediately. + if !r.lazy { + go s.b.trigger() + } + s.mu.Unlock() // Wait for the batch to process the request. If the batch didn't diff --git a/channeldb/graph.go b/channeldb/graph.go index 2eaf08ed..f042dcc4 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -458,12 +458,20 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { // channel update. // // TODO(roasbeef): also need sig of announcement -func (c *ChannelGraph) AddLightningNode(node *LightningNode) error { - return c.nodeScheduler.Execute(&batch.Request{ +func (c *ChannelGraph) AddLightningNode(node *LightningNode, + op ...batch.SchedulerOption) error { + + r := &batch.Request{ Update: func(tx kvdb.RwTx) error { return addLightningNode(tx, node) }, - }) + } + + for _, f := range op { + f(r) + } + + return c.nodeScheduler.Execute(r) } func addLightningNode(tx kvdb.RwTx, node *LightningNode) error { @@ -588,9 +596,11 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket, // involved in creation of the channel, and the set of features that the channel // supports. The chanPoint and chanID are used to uniquely identify the edge // globally within the database. -func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { +func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo, + op ...batch.SchedulerOption) error { + var alreadyExists bool - return c.chanScheduler.Execute(&batch.Request{ + r := &batch.Request{ Reset: func() { alreadyExists = false }, @@ -618,7 +628,13 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { return nil } }, - }) + } + + for _, f := range op { + f(r) + } + + return c.chanScheduler.Execute(r) } // addChannelEdge is the private form of AddChannelEdge that allows callers to @@ -1994,12 +2010,15 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex, // updated, otherwise it's the second node's information. The node ordering is // determined by the lexicographical ordering of the identity public keys of the // nodes on either side of the channel. -func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { +func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy, + op ...batch.SchedulerOption) error { + var ( isUpdate1 bool edgeNotFound bool ) - return c.chanScheduler.Execute(&batch.Request{ + + r := &batch.Request{ Reset: func() { isUpdate1 = false edgeNotFound = false @@ -2028,7 +2047,13 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { return nil } }, - }) + } + + for _, f := range op { + f(r) + } + + return c.chanScheduler.Execute(r) } func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) { diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 348f1742..96e2e09d 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnpeer" @@ -1468,7 +1469,9 @@ func (d *AuthenticatedGossiper) processRejectedEdge( // addNode processes the given node announcement, and adds it to our channel // graph. -func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error { +func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement, + op ...batch.SchedulerOption) error { + if err := routing.ValidateNodeAnn(msg); err != nil { return fmt.Errorf("unable to validate node announcement: %v", err) @@ -1488,7 +1491,7 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error { ExtraOpaqueData: msg.ExtraOpaqueData, } - return d.cfg.Router.AddNode(node) + return d.cfg.Router.AddNode(node, op...) } // processNetworkAnnouncement processes a new network relate authenticated @@ -1505,6 +1508,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return chanID.BlockHeight+delta > d.bestHeight } + // If this is a remote update, we set the scheduler option to lazily + // add it to the graph. + var schedulerOp []batch.SchedulerOption + if nMsg.isRemote { + schedulerOp = append(schedulerOp, batch.LazyAdd()) + } + var announcements []networkMsg switch msg := nMsg.msg.(type) { @@ -1523,7 +1533,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - if err := d.addNode(msg); err != nil { + if err := d.addNode(msg, schedulerOp...); err != nil { if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { @@ -1681,7 +1691,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // writes to the DB. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - if err := d.cfg.Router.AddEdge(edge); err != nil { + if err := d.cfg.Router.AddEdge(edge, schedulerOp...); err != nil { // If the edge was rejected due to already being known, // then it may be that case that this new message has a // fresh channel proof, so we'll check. @@ -2031,7 +2041,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( ExtraOpaqueData: msg.ExtraOpaqueData, } - if err := d.cfg.Router.UpdateEdge(update); err != nil { + if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil { if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { log.Debug(err) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index a551f4cb..0ff234da 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -22,6 +22,7 @@ import ( "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnpeer" @@ -115,7 +116,9 @@ func newMockRouter(height uint32) *mockGraphSource { var _ routing.ChannelGraphSource = (*mockGraphSource)(nil) -func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error { +func (r *mockGraphSource) AddNode(node *channeldb.LightningNode, + _ ...batch.SchedulerOption) error { + r.mu.Lock() defer r.mu.Unlock() @@ -123,7 +126,9 @@ func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error { return nil } -func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { +func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo, + _ ...batch.SchedulerOption) error { + r.mu.Lock() defer r.mu.Unlock() @@ -135,7 +140,9 @@ func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { return nil } -func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error { +func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy, + _ ...batch.SchedulerOption) error { + r.mu.Lock() defer r.mu.Unlock() diff --git a/routing/router.go b/routing/router.go index 5d70f4b8..4c1aa221 100644 --- a/routing/router.go +++ b/routing/router.go @@ -14,6 +14,7 @@ import ( "github.com/go-errors/errors" sphinx "github.com/lightningnetwork/lightning-onion" + "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/clock" @@ -82,12 +83,12 @@ type ChannelGraphSource interface { // AddNode is used to add information about a node to the router // database. If the node with this pubkey is not present in an existing // channel, it will be ignored. - AddNode(node *channeldb.LightningNode) error + AddNode(node *channeldb.LightningNode, op ...batch.SchedulerOption) error // AddEdge is used to add edge/channel to the topology of the router, // after all information about channel will be gathered this // edge/channel might be used in construction of payment path. - AddEdge(edge *channeldb.ChannelEdgeInfo) error + AddEdge(edge *channeldb.ChannelEdgeInfo, op ...batch.SchedulerOption) error // AddProof updates the channel edge info with proof which is needed to // properly announce the edge to the rest of the network. @@ -95,7 +96,7 @@ type ChannelGraphSource interface { // UpdateEdge is used to update edge information, without this message // edge considered as not fully constructed. - UpdateEdge(policy *channeldb.ChannelEdgePolicy) error + UpdateEdge(policy *channeldb.ChannelEdgePolicy, op ...batch.SchedulerOption) error // IsStaleNode returns true if the graph source has a node announcement // for the target node with a more recent timestamp. This method will @@ -957,7 +958,7 @@ func (r *ChannelRouter) networkHandler() { // this is either a new update from our PoV or // an update to a prior vertex/edge we // previously accepted. - err = r.processUpdate(update.msg) + err = r.processUpdate(update.msg, update.op...) update.err <- err // If this message had any dependencies, then @@ -1176,7 +1177,9 @@ func (r *ChannelRouter) assertNodeAnnFreshness(node route.Vertex, // channel/edge update network update. If the update didn't affect the internal // state of the draft due to either being out of date, invalid, or redundant, // then error is returned. -func (r *ChannelRouter) processUpdate(msg interface{}) error { +func (r *ChannelRouter) processUpdate(msg interface{}, + op ...batch.SchedulerOption) error { + switch msg := msg.(type) { case *channeldb.LightningNode: // Before we add the node to the database, we'll check to see @@ -1187,7 +1190,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { return err } - if err := r.cfg.Graph.AddLightningNode(msg); err != nil { + if err := r.cfg.Graph.AddLightningNode(msg, op...); err != nil { return errors.Errorf("unable to add node %v to the "+ "graph: %v", msg.PubKeyBytes, err) } @@ -1219,7 +1222,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // short-circuit our path straight to adding the edge to our // graph. if r.cfg.AssumeChannelValid { - if err := r.cfg.Graph.AddChannelEdge(msg); err != nil { + if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil { return fmt.Errorf("unable to add edge: %v", err) } log.Tracef("New channel discovered! Link "+ @@ -1291,7 +1294,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // after commitment fees are dynamic. msg.Capacity = btcutil.Amount(chanUtxo.Value) msg.ChannelPoint = *fundingPoint - if err := r.cfg.Graph.AddChannelEdge(msg); err != nil { + if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil { return errors.Errorf("unable to add edge: %v", err) } @@ -1390,7 +1393,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // Now that we know this isn't a stale update, we'll apply the // new edge policy to the proper directional edge within the // channel graph. - if err = r.cfg.Graph.UpdateEdgePolicy(msg); err != nil { + if err = r.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil { err := errors.Errorf("unable to add channel: %v", err) log.Error(err) return err @@ -1444,6 +1447,7 @@ func (r *ChannelRouter) fetchFundingTx( // error channel. type routingMsg struct { msg interface{} + op []batch.SchedulerOption err chan error } @@ -2140,9 +2144,12 @@ func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate, // be ignored. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error { +func (r *ChannelRouter) AddNode(node *channeldb.LightningNode, + op ...batch.SchedulerOption) error { + rMsg := &routingMsg{ msg: node, + op: op, err: make(chan error, 1), } @@ -2164,9 +2171,12 @@ func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error { // in construction of payment path. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error { +func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo, + op ...batch.SchedulerOption) error { + rMsg := &routingMsg{ msg: edge, + op: op, err: make(chan error, 1), } @@ -2187,9 +2197,12 @@ func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error { // considered as not fully constructed. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy) error { +func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy, + op ...batch.SchedulerOption) error { + rMsg := &routingMsg{ msg: update, + op: op, err: make(chan error, 1), }