From fb9218d10062873b7c921f53d8e9d180b0e7f748 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 24 Nov 2020 16:38:14 -0800
Subject: [PATCH 1/7] discovery/gossiper: channel announcements can't be
 outdated

---
 discovery/gossiper.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index a72ddcd7..6bbdce30 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -1684,9 +1684,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 		// If the edge was rejected due to already being known,
 		// then it may be that case that this new message has a
 		// fresh channel proof, so we'll check.
-		if routing.IsError(err, routing.ErrOutdated,
-			routing.ErrIgnored) {
-
+		if routing.IsError(err, routing.ErrIgnored) {
 			// Attempt to process the rejected message to
 			// see if we get any new announcements.
 			anns, rErr := d.processRejectedEdge(msg, proof)

From d1634b5e13d5e7d2e67145b79f40cc3ba1fa5daf Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 24 Nov 2020 16:38:27 -0800
Subject: [PATCH 2/7] batch: add external batching engine for bbolt operations

---
 batch/batch.go     | 102 ++++++++++++++++++++++++++++++++++++++++++++
 batch/interface.go |  38 +++++++++++++++++
 batch/scheduler.go | 103 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 243 insertions(+)
 create mode 100644 batch/batch.go
 create mode 100644 batch/interface.go
 create mode 100644 batch/scheduler.go

diff --git a/batch/batch.go b/batch/batch.go
new file mode 100644
index 00000000..6b1fa2ab
--- /dev/null
+++ b/batch/batch.go
@@ -0,0 +1,102 @@
+package batch
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/lightningnetwork/lnd/channeldb/kvdb"
+)
+
+// errSolo is a sentinel error indicating that the requester should re-run the
+// operation in isolation.
+var errSolo = errors.New(
+	"batch function returned an error and should be re-run solo",
+)
+
+type request struct {
+	*Request
+	errChan chan error
+}
+
+type batch struct {
+	db     kvdb.Backend
+	start  sync.Once
+	reqs   []*request
+	clear  func(b *batch)
+	locker sync.Locker
+}
+
+// trigger is the entry point for the batch and ensures that run is started at
+// most once.
+func (b *batch) trigger() {
+	b.start.Do(b.run)
+}
+
+// run executes the current batch of requests. If any individual requests fail
+// alongside others, they will be retried by the caller.
+func (b *batch) run() {
+	// Clear the batch from its scheduler, ensuring that no new requests are
+	// added to this batch.
+	b.clear(b)
+
+	// If a cache lock was provided, hold it until this method returns.
+	// This is critical for ensuring external consistency of the operation,
+	// so that caches don't get out of sync with the on disk state.
+	if b.locker != nil {
+		b.locker.Lock()
+		defer b.locker.Unlock()
+	}
+
+	// Apply the batch until a subset succeeds or all of them fail. Requests
+	// that fail will be retried individually.
+	for len(b.reqs) > 0 {
+		var failIdx = -1
+		err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
+			for i, req := range b.reqs {
+				err := req.Update(tx)
+				if err != nil {
+					failIdx = i
+					return err
+				}
+			}
+			return nil
+		}, func() {
+			for _, req := range b.reqs {
+				if req.Reset != nil {
+					req.Reset()
+				}
+			}
+		})
+
+		// If a request's Update failed, extract it and re-run the
+		// batch. The removed request will be retried individually by
+		// the caller.
+		if failIdx >= 0 {
+			req := b.reqs[failIdx]
+
+			// It's safe to shorten b.reqs here because the
+			// scheduler's batch no longer points to us.
+			b.reqs[failIdx] = b.reqs[len(b.reqs)-1]
+			b.reqs = b.reqs[:len(b.reqs)-1]
+
+			// Tell the submitter to re-run it solo, and continue
+			// with the rest of the batch.
+			req.errChan <- errSolo
+			continue
+		}
+
+		// None of the remaining requests failed, so process the
+		// errors using each request's OnCommit closure and return the
+		// error to the requester. If no OnCommit closure is provided,
+		// simply return the error directly.
+		for _, req := range b.reqs {
+			if req.OnCommit != nil {
+				req.errChan <- req.OnCommit(err)
+			} else {
+				req.errChan <- err
+			}
+		}
+
+		return
+	}
+}
diff --git a/batch/interface.go b/batch/interface.go
new file mode 100644
index 00000000..b9ab8b77
--- /dev/null
+++ b/batch/interface.go
@@ -0,0 +1,38 @@
+package batch
+
+import "github.com/lightningnetwork/lnd/channeldb/kvdb"
+
+// Request defines an operation that can be batched into a single bbolt
+// transaction.
+type Request struct {
+	// Reset is called before each invocation of Update and is used to clear
+	// any possible modifications to local state as a result of previous
+	// calls to Update that were not committed due to a concurrent batch
+	// failure.
+	//
+	// NOTE: This field is optional.
+	Reset func()
+
+	// Update is applied alongside other operations in the batch.
+	//
+	// NOTE: This method MUST NOT acquire any mutexes.
+	Update func(tx kvdb.RwTx) error
+
+	// OnCommit is called if the batch or a subset of the batch including
+	// this request all succeeded without failure. The passed error should
+	// contain the result of the transaction commit, as that can still fail
+	// even if none of the closures returned an error.
+	//
+	// NOTE: This field is optional.
+	OnCommit func(commitErr error) error
+}
+
+// Scheduler abstracts a generic batching engine that accumulates an incoming
+// set of Requests, executes them, and returns the error from the operation.
+type Scheduler interface {
+	// Execute schedules a Request for execution with the next available
+	// batch. This method blocks until the underlying closure has been
+	// run against the database. The resulting error is returned to the
+	// caller.
+	Execute(req *Request) error
+}
diff --git a/batch/scheduler.go b/batch/scheduler.go
new file mode 100644
index 00000000..7d681376
--- /dev/null
+++ b/batch/scheduler.go
@@ -0,0 +1,103 @@
+package batch
+
+import (
+	"sync"
+	"time"
+
+	"github.com/lightningnetwork/lnd/channeldb/kvdb"
+)
+
+// TimeScheduler is a batching engine that executes requests within a fixed
+// horizon. When the first request is received, a TimeScheduler waits a
+// configurable duration for other concurrent requests to join the batch. Once
+// this time has elapsed, the batch is closed and executed. Subsequent requests
+// are then added to a new batch which undergoes the same process.
+type TimeScheduler struct {
+	db       kvdb.Backend
+	locker   sync.Locker
+	duration time.Duration
+
+	mu sync.Mutex
+	b  *batch
+}
+
+// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
+// which to schedule batches. If the operation needs to modify a higher-level
+// cache, the cache's lock should be provided so that external consistency
+// can be maintained, as successful db operations will cause a request's
+// OnCommit method to be executed while holding this lock.
+func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
+	duration time.Duration) *TimeScheduler {
+
+	return &TimeScheduler{
+		db:       db,
+		locker:   locker,
+		duration: duration,
+	}
+}
+
+// Execute schedules the provided request for batch execution along with other
+// concurrent requests. The request will be executed within a fixed horizon,
+// parameterized by the duration of the scheduler. The error from the
+// underlying operation is returned to the caller.
+//
+// NOTE: Part of the Scheduler interface.
+func (s *TimeScheduler) Execute(r *Request) error {
+	req := request{
+		Request: r,
+		errChan: make(chan error, 1),
+	}
+
+	// Add the request to the current batch. If the batch has been cleared
+	// or no batch exists, create a new one.
+	s.mu.Lock()
+	if s.b == nil {
+		s.b = &batch{
+			db:     s.db,
+			clear:  s.clear,
+			locker: s.locker,
+		}
+		time.AfterFunc(s.duration, s.b.trigger)
+	}
+	s.b.reqs = append(s.b.reqs, &req)
+	s.mu.Unlock()
+
+	// Wait for the batch to process the request. If the batch didn't
+	// ask us to execute the request individually, simply return the error.
+	err := <-req.errChan
+	if err != errSolo {
+		return err
+	}
+
+	// Obtain exclusive access to the cache if this scheduler needs to
+	// modify the cache in OnCommit.
+	if s.locker != nil {
+		s.locker.Lock()
+		defer s.locker.Unlock()
+	}
+
+	// Otherwise, run the request on its own.
+	commitErr := kvdb.Update(s.db, req.Update, func() {
+		if req.Reset != nil {
+			req.Reset()
+		}
+	})
+
+	// Finally, return the commit error directly or execute the OnCommit
+	// closure with the commit error if present.
+	if req.OnCommit != nil {
+		return req.OnCommit(commitErr)
+	}
+
+	return commitErr
+}
+
+// clear resets the scheduler's batch to nil so that no more requests can be
+// added.
+func (s *TimeScheduler) clear(b *batch) {
+	s.mu.Lock()
+	if s.b == b {
+		s.b = nil
+	}
+	s.mu.Unlock()
+}

From 9cfe08c879176ae498cce2cb070c0004079af473 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 24 Nov 2020 16:38:40 -0800
Subject: [PATCH 3/7] channeldb/graph: split out cache updates for policy
 changes

---
 channeldb/graph.go | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/channeldb/graph.go b/channeldb/graph.go
index 67910852..aa9abbaf 100644
--- a/channeldb/graph.go
+++ b/channeldb/graph.go
@@ -1947,33 +1947,37 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
 		return err
 	}
 
+	c.updateEdgeCache(edge, isUpdate1)
+
+	return nil
+}
+
+func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) {
 	// If an entry for this channel is found in reject cache, we'll modify
 	// the entry with the updated timestamp for the direction that was just
 	// written. If the edge doesn't exist, we'll load the cache entry lazily
 	// during the next query for this edge.
-	if entry, ok := c.rejectCache.get(edge.ChannelID); ok {
+	if entry, ok := c.rejectCache.get(e.ChannelID); ok {
 		if isUpdate1 {
-			entry.upd1Time = edge.LastUpdate.Unix()
+			entry.upd1Time = e.LastUpdate.Unix()
 		} else {
-			entry.upd2Time = edge.LastUpdate.Unix()
+			entry.upd2Time = e.LastUpdate.Unix()
 		}
-		c.rejectCache.insert(edge.ChannelID, entry)
+		c.rejectCache.insert(e.ChannelID, entry)
 	}
 
 	// If an entry for this channel is found in channel cache, we'll modify
 	// the entry with the updated policy for the direction that was just
 	// written. If the edge doesn't exist, we'll defer loading the info and
 	// policies and lazily read from disk during the next query.
- if channel, ok := c.chanCache.get(edge.ChannelID); ok { + if channel, ok := c.chanCache.get(e.ChannelID); ok { if isUpdate1 { - channel.Policy1 = edge + channel.Policy1 = e } else { - channel.Policy2 = edge + channel.Policy2 = e } - c.chanCache.insert(edge.ChannelID, channel) + c.chanCache.insert(e.ChannelID, channel) } - - return nil } // updateEdgePolicy attempts to update an edge's policy within the relevant From edaa5d4308c28de97ff42d0c19280ec0f4f6a7c9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 24 Nov 2020 16:39:28 -0800 Subject: [PATCH 4/7] channeldb/graph: add batched APIs for graph ingestion --- channeldb/graph.go | 129 ++++++++++++++++++++++------------ channeldb/graph_test.go | 148 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 235 insertions(+), 42 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index aa9abbaf..4e4dd995 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -18,6 +18,7 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -177,16 +178,26 @@ type ChannelGraph struct { cacheMu sync.RWMutex rejectCache *rejectCache chanCache *channelCache + + chanScheduler batch.Scheduler + nodeScheduler batch.Scheduler } // newChannelGraph allocates a new ChannelGraph backed by a DB instance. The // returned instance has its own unique reject cache and channel cache. func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph { - return &ChannelGraph{ + g := &ChannelGraph{ db: db, rejectCache: newRejectCache(rejectCacheSize), chanCache: newChannelCache(chanCacheSize), } + g.chanScheduler = batch.NewTimeScheduler( + db.Backend, &g.cacheMu, 500*time.Millisecond, + ) + g.nodeScheduler = batch.NewTimeScheduler( + db.Backend, nil, 500*time.Millisecond, + ) + return g } // Database returns a pointer to the underlying database. @@ -440,15 +451,17 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { // AddLightningNode adds a vertex/node to the graph database. If the node is not // in the database from before, this will add a new, unconnected one to the // graph. If it is present from before, this will update that node's -// information. Note that this method is expected to only be called to update -// an already present node from a node announcement, or to insert a node found -// in a channel update. +// information. Note that this method is expected to only be called to update an +// already present node from a node announcement, or to insert a node found in a +// channel update. // // TODO(roasbeef): also need sig of announcement func (c *ChannelGraph) AddLightningNode(node *LightningNode) error { - return kvdb.Update(c.db, func(tx kvdb.RwTx) error { - return addLightningNode(tx, node) - }, func() {}) + return c.nodeScheduler.Execute(&batch.Request{ + Update: func(tx kvdb.RwTx) error { + return addLightningNode(tx, node) + }, + }) } func addLightningNode(tx kvdb.RwTx, node *LightningNode) error { @@ -568,26 +581,42 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket, } // AddChannelEdge adds a new (undirected, blank) edge to the graph database. An -// undirected edge from the two target nodes are created. 
The information -// stored denotes the static attributes of the channel, such as the channelID, -// the keys involved in creation of the channel, and the set of features that -// the channel supports. The chanPoint and chanID are used to uniquely identify -// the edge globally within the database. +// undirected edge from the two target nodes are created. The information stored +// denotes the static attributes of the channel, such as the channelID, the keys +// involved in creation of the channel, and the set of features that the channel +// supports. The chanPoint and chanID are used to uniquely identify the edge +// globally within the database. func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { - c.cacheMu.Lock() - defer c.cacheMu.Unlock() + var alreadyExists bool + return c.chanScheduler.Execute(&batch.Request{ + Reset: func() { + alreadyExists = false + }, + Update: func(tx kvdb.RwTx) error { + err := c.addChannelEdge(tx, edge) - err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { - return c.addChannelEdge(tx, edge) - }, func() {}) - if err != nil { - return err - } + // Silence ErrEdgeAlreadyExist so that the batch can + // succeed, but propagate the error via local state. + if err == ErrEdgeAlreadyExist { + alreadyExists = true + return nil + } - c.rejectCache.remove(edge.ChannelID) - c.chanCache.remove(edge.ChannelID) - - return nil + return err + }, + OnCommit: func(err error) error { + switch { + case err != nil: + return err + case alreadyExists: + return ErrEdgeAlreadyExist + default: + c.rejectCache.remove(edge.ChannelID) + c.chanCache.remove(edge.ChannelID) + return nil + } + }, + }) } // addChannelEdge is the private form of AddChannelEdge that allows callers to @@ -1929,27 +1958,43 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex, // the ChannelEdgePolicy determines which of the directed edges are being // updated. If the flag is 1, then the first node's information is being // updated, otherwise it's the second node's information. The node ordering is -// determined by the lexicographical ordering of the identity public keys of -// the nodes on either side of the channel. +// determined by the lexicographical ordering of the identity public keys of the +// nodes on either side of the channel. func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { - c.cacheMu.Lock() - defer c.cacheMu.Unlock() + var ( + isUpdate1 bool + edgeNotFound bool + ) + return c.chanScheduler.Execute(&batch.Request{ + Reset: func() { + isUpdate1 = false + edgeNotFound = false + }, + Update: func(tx kvdb.RwTx) error { + var err error + isUpdate1, err = updateEdgePolicy(tx, edge) - var isUpdate1 bool - err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { - var err error - isUpdate1, err = updateEdgePolicy(tx, edge) - return err - }, func() { - isUpdate1 = false + // Silence ErrEdgeNotFound so that the batch can + // succeed, but propagate the error via local state. 
+ if err == ErrEdgeNotFound { + edgeNotFound = true + return nil + } + + return err + }, + OnCommit: func(err error) error { + switch { + case err != nil: + return err + case edgeNotFound: + return ErrEdgeNotFound + default: + c.updateEdgeCache(edge, isUpdate1) + return nil + } + }, }) - if err != nil { - return err - } - - c.updateEdgeCache(edge, isUpdate1) - - return nil } func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) { diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 52d1114d..2abdcc8e 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -3,6 +3,7 @@ package channeldb import ( "bytes" "crypto/sha256" + "errors" "fmt" "image/color" "math" @@ -11,6 +12,7 @@ import ( "net" "reflect" "runtime" + "sync" "testing" "time" @@ -21,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" + "github.com/stretchr/testify/require" ) var ( @@ -3195,3 +3198,148 @@ func TestComputeFee(t *testing.T) { t.Fatalf("expected fee %v, but got %v", fee, fwdFee) } } + +// TestBatchedAddChannelEdge asserts that BatchedAddChannelEdge properly +// executes multiple AddChannelEdge requests in a single txn. +func TestBatchedAddChannelEdge(t *testing.T) { + t.Parallel() + + db, cleanUp, err := MakeTestDB() + require.Nil(t, err) + defer cleanUp() + + graph := db.ChannelGraph() + sourceNode, err := createTestVertex(db) + require.Nil(t, err) + err = graph.SetSourceNode(sourceNode) + require.Nil(t, err) + + // We'd like to test the insertion/deletion of edges, so we create two + // vertexes to connect. + node1, err := createTestVertex(db) + require.Nil(t, err) + node2, err := createTestVertex(db) + require.Nil(t, err) + + // In addition to the fake vertexes we create some fake channel + // identifiers. + var spendOutputs []*wire.OutPoint + var blockHash chainhash.Hash + copy(blockHash[:], bytes.Repeat([]byte{1}, 32)) + + // Prune the graph a few times to make sure we have entries in the + // prune log. + _, err = graph.PruneGraph(spendOutputs, &blockHash, 155) + require.Nil(t, err) + var blockHash2 chainhash.Hash + copy(blockHash2[:], bytes.Repeat([]byte{2}, 32)) + + _, err = graph.PruneGraph(spendOutputs, &blockHash2, 156) + require.Nil(t, err) + + // We'll create 3 almost identical edges, so first create a helper + // method containing all logic for doing so. + + // Create an edge which has its block height at 156. + height := uint32(156) + edgeInfo, _ := createEdge(height, 0, 0, 0, node1, node2) + + // Create an edge with block height 157. We give it + // maximum values for tx index and position, to make + // sure our database range scan get edges from the + // entire range. + edgeInfo2, _ := createEdge( + height+1, math.MaxUint32&0x00ffffff, math.MaxUint16, 1, + node1, node2, + ) + + // Create a third edge, this with a block height of 155. + edgeInfo3, _ := createEdge(height-1, 0, 0, 2, node1, node2) + + edges := []ChannelEdgeInfo{edgeInfo, edgeInfo2, edgeInfo3} + errChan := make(chan error, len(edges)) + errTimeout := errors.New("timeout adding batched channel") + + // Now add all these new edges to the database. 
+	var wg sync.WaitGroup
+	for _, edge := range edges {
+		wg.Add(1)
+		go func(edge ChannelEdgeInfo) {
+			defer wg.Done()
+
+			select {
+			case errChan <- graph.AddChannelEdge(&edge):
+			case <-time.After(2 * time.Second):
+				errChan <- errTimeout
+			}
+		}(edge)
+	}
+	wg.Wait()
+
+	for i := 0; i < len(edges); i++ {
+		err := <-errChan
+		require.Nil(t, err)
+	}
+}
+
+// TestBatchedUpdateEdgePolicy asserts that BatchedUpdateEdgePolicy properly
+// executes multiple UpdateEdgePolicy requests in a single txn.
+func TestBatchedUpdateEdgePolicy(t *testing.T) {
+	t.Parallel()
+
+	db, cleanUp, err := MakeTestDB()
+	require.Nil(t, err)
+	defer cleanUp()
+
+	graph := db.ChannelGraph()
+
+	// We'd like to test the update of edges inserted into the database, so
+	// we create two vertexes to connect.
+	node1, err := createTestVertex(db)
+	require.Nil(t, err)
+	err = graph.AddLightningNode(node1)
+	require.Nil(t, err)
+	node2, err := createTestVertex(db)
+	require.Nil(t, err)
+	err = graph.AddLightningNode(node2)
+	require.Nil(t, err)
+
+	// Create an edge and add it to the db.
+	edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2)
+
+	// Make sure inserting the policy at this point, before the edge info
+	// is added, will fail.
+	err = graph.UpdateEdgePolicy(edge1)
+	require.Equal(t, ErrEdgeNotFound, err)
+
+	// Add the edge info.
+	err = graph.AddChannelEdge(edgeInfo)
+	require.Nil(t, err)
+
+	errTimeout := errors.New("timeout adding batched channel")
+
+	updates := []*ChannelEdgePolicy{edge1, edge2}
+
+	errChan := make(chan error, len(updates))
+
+	// Now apply all of these policy updates to the database.
+	var wg sync.WaitGroup
+	for _, update := range updates {
+		wg.Add(1)
+		go func(update *ChannelEdgePolicy) {
+			defer wg.Done()
+
+			select {
+			case errChan <- graph.UpdateEdgePolicy(update):
+			case <-time.After(2 * time.Second):
+				errChan <- errTimeout
+			}
+		}(update)
+	}
+	wg.Wait()
+
+	for i := 0; i < len(updates); i++ {
+		err := <-errChan
+		require.Nil(t, err)
+	}
+}

From f8154c65c5a9128062c4605a02f958ce8c5ff8aa Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 24 Nov 2020 16:39:47 -0800
Subject: [PATCH 5/7] discovery/gossiper: increase validation barrier size to
 1000

This allows for 1000 different validation operations to proceed
concurrently. Now that we are batching operations at the db level, the
average number of outstanding requests will be higher since the commit
latency has increased. To compensate, we allow for more outstanding
requests to keep the gossiper busy while batches are constructed.
---
 discovery/gossiper.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 6bbdce30..97f4808a 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
-	"runtime"
 	"sync"
 	"time"
 
@@ -904,9 +903,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
 
 	// We'll use this validation to ensure that we process jobs in their
 	// dependency order during parallel validation.
-	validationBarrier := routing.NewValidationBarrier(
-		runtime.NumCPU()*4, d.quit,
-	)
+	validationBarrier := routing.NewValidationBarrier(1000, d.quit)
 
 	for {
 		select {

From e8c545e909951c2ca586e1c659dbfca88b6a3ec6 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 24 Nov 2020 16:40:10 -0800
Subject: [PATCH 6/7] routing/router: increase validation barrier size to 1000

This allows for 1000 different persistent operations to proceed
concurrently.
Now that we are batching operations at the db level, the average number
of outstanding requests will be higher since the commit latency has
increased. To compensate, we allow for more outstanding requests to keep
the router busy while batches are constructed.
---
 routing/router.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/routing/router.go b/routing/router.go
index af1e8d75..c8f228da 100644
--- a/routing/router.go
+++ b/routing/router.go
@@ -3,7 +3,6 @@ package routing
 import (
 	"bytes"
 	"fmt"
-	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -914,7 +913,7 @@ func (r *ChannelRouter) networkHandler() {
 
 	// We'll use this validation barrier to ensure that we process all jobs
 	// in the proper order during parallel validation.
-	validationBarrier := NewValidationBarrier(runtime.NumCPU()*4, r.quit)
+	validationBarrier := NewValidationBarrier(1000, r.quit)
 
 	for {

From 82a238317c29ed1a3069051c99e85f68de1f0a5b Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 24 Nov 2020 16:40:54 -0800
Subject: [PATCH 7/7] lncfg+itest: expose configurable batch-commit-interval

This will permit a greater degree of tuning or customization depending
on various hardware/environmental factors.
---
 channeldb/db.go          |  1 +
 channeldb/graph.go       |  7 ++++---
 channeldb/options.go     | 12 +++++++++++
 lncfg/db.go              | 13 ++++++++----
 lnd.go                   |  3 +++
 lntest/itest/lnd_test.go | 43 ++++++++++++++++++++++------------------
 lntest/node.go           |  1 +
 sample-lnd.conf          |  4 ++++
 8 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/channeldb/db.go b/channeldb/db.go
index 465003c4..5c7d90b0 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -275,6 +275,7 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
 	}
 	chanDB.graph = newChannelGraph(
 		chanDB, opts.RejectCacheSize, opts.ChannelCacheSize,
+		opts.BatchCommitInterval,
 	)
 
 	// Synchronize the version of database and apply migrations if needed.
diff --git a/channeldb/graph.go b/channeldb/graph.go
index 4e4dd995..ff4a871d 100644
--- a/channeldb/graph.go
+++ b/channeldb/graph.go
@@ -185,17 +185,18 @@ type ChannelGraph struct {
 
 // newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
 // returned instance has its own unique reject cache and channel cache.
-func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph {
+func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int,
+	batchCommitInterval time.Duration) *ChannelGraph {
+
 	g := &ChannelGraph{
 		db:          db,
 		rejectCache: newRejectCache(rejectCacheSize),
 		chanCache:   newChannelCache(chanCacheSize),
 	}
 	g.chanScheduler = batch.NewTimeScheduler(
-		db.Backend, &g.cacheMu, 500*time.Millisecond,
+		db.Backend, &g.cacheMu, batchCommitInterval,
 	)
 	g.nodeScheduler = batch.NewTimeScheduler(
-		db.Backend, nil, 500*time.Millisecond,
+		db.Backend, nil, batchCommitInterval,
 	)
 	return g
 }
diff --git a/channeldb/options.go b/channeldb/options.go
index c9144650..285da570 100644
--- a/channeldb/options.go
+++ b/channeldb/options.go
@@ -31,6 +31,10 @@ type Options struct {
 	// channel cache.
 	ChannelCacheSize int
 
+	// BatchCommitInterval is the maximum duration the batch schedulers will
+	// wait before attempting to commit a pending set of updates.
+	BatchCommitInterval time.Duration
+
 	// clock is the time source used by the database.
 	clock clock.Clock
 
@@ -92,6 +96,14 @@ func OptionAutoCompactMinAge(minAge time.Duration) OptionModifier {
 	}
 }
 
+// OptionSetBatchCommitInterval sets the batch commit interval for the internal
+// batch schedulers.
+func OptionSetBatchCommitInterval(interval time.Duration) OptionModifier {
+	return func(o *Options) {
+		o.BatchCommitInterval = interval
+	}
+}
+
 // OptionClock sets a non-default clock dependency.
 func OptionClock(clock clock.Clock) OptionModifier {
 	return func(o *Options) {
diff --git a/lncfg/db.go b/lncfg/db.go
index 63b58c46..8c920314 100644
--- a/lncfg/db.go
+++ b/lncfg/db.go
@@ -3,20 +3,24 @@ package lncfg
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/lightningnetwork/lnd/channeldb/kvdb"
 )
 
 const (
-	dbName      = "channel.db"
-	BoltBackend = "bolt"
-	EtcdBackend = "etcd"
+	dbName                     = "channel.db"
+	BoltBackend                = "bolt"
+	EtcdBackend                = "etcd"
+	DefaultBatchCommitInterval = 500 * time.Millisecond
 )
 
 // DB holds database configuration for LND.
 type DB struct {
 	Backend string `long:"backend" description:"The selected database backend."`
 
+	BatchCommitInterval time.Duration `long:"batch-commit-interval" description:"The maximum duration the channel graph batch schedulers will wait before attempting to commit a batch of pending updates. This can be used to trade off database contention for commit latency."`
+
 	Etcd *kvdb.EtcdConfig `group:"etcd" namespace:"etcd" description:"Etcd settings."`
 
 	Bolt *kvdb.BoltConfig `group:"bolt" namespace:"bolt" description:"Bolt settings."`
 }
 
 // NewDB creates and returns a new default DB config.
 func DefaultDB() *DB {
 	return &DB{
-		Backend: BoltBackend,
+		Backend:             BoltBackend,
+		BatchCommitInterval: DefaultBatchCommitInterval,
 		Bolt: &kvdb.BoltConfig{
 			AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
 		},
diff --git a/lnd.go b/lnd.go
index 43692ccc..11997d14 100644
--- a/lnd.go
+++ b/lnd.go
@@ -1381,6 +1381,7 @@ func initializeDatabases(ctx context.Context,
 		databaseBackends.LocalDB,
 		channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
 		channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
+		channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
 		channeldb.OptionDryRunMigration(cfg.DryRunMigration),
 	)
 	switch {
@@ -1409,6 +1410,7 @@ func initializeDatabases(ctx context.Context,
 			databaseBackends.LocalDB,
 			channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
 			channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
+			channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
 			channeldb.OptionDryRunMigration(cfg.DryRunMigration),
 		)
 		switch {
@@ -1433,6 +1435,7 @@ func initializeDatabases(ctx context.Context,
 		remoteChanDB, err = channeldb.CreateWithBackend(
 			databaseBackends.RemoteDB,
 			channeldb.OptionDryRunMigration(cfg.DryRunMigration),
+			channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
 		)
 		switch {
 		case err == channeldb.ErrDryRunMigrationOK:
diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go
index 05c00d85..41b2790d 100644
--- a/lntest/itest/lnd_test.go
+++ b/lntest/itest/lnd_test.go
@@ -1846,31 +1846,36 @@ func getChannelPolicies(t *harnessTest, node *lntest.HarnessNode,
 	}
 	ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
 	chanGraph, err := node.DescribeGraph(ctxt, descReq)
-	if err != nil {
-		t.Fatalf("unable to query for alice's graph: %v", err)
-	}
+	require.NoError(t.t, err, "unable to query for alice's graph")
 
 	var policies []*lnrpc.RoutingPolicy
-out:
-	for _, chanPoint := range chanPoints {
-		for _, e := range chanGraph.Edges {
-			if e.ChanPoint != txStr(chanPoint) {
-				continue
+	err = wait.NoError(func() error {
+	out:
+		for _, chanPoint := range chanPoints {
+			for _, e := range chanGraph.Edges {
+				if e.ChanPoint != txStr(chanPoint) {
+					continue
+				}
+ + if e.Node1Pub == advertisingNode { + policies = append(policies, + e.Node1Policy) + } else { + policies = append(policies, + e.Node2Policy) + } + + continue out } - if e.Node1Pub == advertisingNode { - policies = append(policies, e.Node1Policy) - } else { - policies = append(policies, e.Node2Policy) - } - - continue out + // If we've iterated over all the known edges and we weren't + // able to find this specific one, then we'll fail. + return fmt.Errorf("did not find edge %v", txStr(chanPoint)) } - // If we've iterated over all the known edges and we weren't - // able to find this specific one, then we'll fail. - t.Fatalf("did not find edge %v", txStr(chanPoint)) - } + return nil + }, defaultTimeout) + require.NoError(t.t, err) return policies } diff --git a/lntest/node.go b/lntest/node.go index 4fcbc78b..d96fe995 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -238,6 +238,7 @@ func (cfg NodeConfig) genArgs() []string { args = append(args, "--nobootstrap") args = append(args, "--debuglevel=debug") args = append(args, "--bitcoin.defaultchanconfs=1") + args = append(args, fmt.Sprintf("--db.batch-commit-interval=%v", 10*time.Millisecond)) args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV)) args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr())) args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr())) diff --git a/sample-lnd.conf b/sample-lnd.conf index 7fd8a696..760df39e 100644 --- a/sample-lnd.conf +++ b/sample-lnd.conf @@ -930,6 +930,10 @@ litecoin.node=ltcd ; also has experimental support for etcd, a replicated backend. ; db.backend=bolt +; The maximum interval the graph database will wait between attempting to flush +; a batch of modifications to disk. Defaults to 500 milliseconds. +; db.batch-commit-interval=500ms + [etcd] ; Etcd database host. ; db.etcd.host=localhost:2379
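The batch package added in PATCH 2/7 is not tied to the channel graph: any
kvdb-backed store can funnel writes through a Scheduler. The Go sketch below
is illustrative only and is not taken from the patches above; the package
name, the writeKeys helper, and the "example-bucket" key are hypothetical. It
shows the basic contract: concurrent Execute calls that land within the
scheduler's horizon are flushed in a single bbolt transaction, while each
caller still receives its own error.

package batchexample

import (
	"fmt"
	"sync"
	"time"

	"github.com/lightningnetwork/lnd/batch"
	"github.com/lightningnetwork/lnd/channeldb/kvdb"
)

// bucketName is a hypothetical top-level bucket used only for this sketch.
var bucketName = []byte("example-bucket")

// writeKeys issues numWriters concurrent writes through a single
// TimeScheduler. Requests that arrive within the 500ms horizon are committed
// together in one transaction.
func writeKeys(db kvdb.Backend, numWriters int) error {
	scheduler := batch.NewTimeScheduler(db, nil, 500*time.Millisecond)

	var wg sync.WaitGroup
	errChan := make(chan error, numWriters)
	for i := 0; i < numWriters; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			key := []byte(fmt.Sprintf("key-%d", i))
			errChan <- scheduler.Execute(&batch.Request{
				Update: func(tx kvdb.RwTx) error {
					bucket, err := tx.CreateTopLevelBucket(bucketName)
					if err != nil {
						return err
					}
					return bucket.Put(key, []byte("value"))
				},
			})
		}(i)
	}
	wg.Wait()

	// Collect the per-request results; any request that failed in
	// isolation reports its own error here.
	for i := 0; i < numWriters; i++ {
		if err := <-errChan; err != nil {
			return err
		}
	}
	return nil
}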
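PATCH 4/7 applies the same engine to AddChannelEdge and UpdateEdgePolicy using
a request-scoped sentinel: the per-request error (ErrEdgeAlreadyExist or
ErrEdgeNotFound) is recorded in local state and silenced inside Update so the
shared transaction can still commit for the other requests in the batch, then
re-raised in OnCommit for the one caller that hit it. A minimal sketch of that
pattern follows; the insertRecord helper, the errAlreadyExists sentinel, and
the bucket layout are hypothetical and only mirror the structure used in the
patch.

package batchexample

import (
	"errors"

	"github.com/lightningnetwork/lnd/batch"
	"github.com/lightningnetwork/lnd/channeldb/kvdb"
)

// errAlreadyExists is a hypothetical sentinel for a duplicate insert.
var errAlreadyExists = errors.New("record already exists")

// insertRecord stores key/value under bucketName, following the
// Reset/Update/OnCommit pattern used by AddChannelEdge in PATCH 4/7.
func insertRecord(s batch.Scheduler, bucketName, key, value []byte) error {
	var alreadyExists bool
	return s.Execute(&batch.Request{
		// Reset clears request-local state in case the batch is
		// retried after another request in the same batch failed.
		Reset: func() {
			alreadyExists = false
		},
		Update: func(tx kvdb.RwTx) error {
			bucket, err := tx.CreateTopLevelBucket(bucketName)
			if err != nil {
				return err
			}

			// Record the duplicate locally and return nil so the
			// shared transaction can still commit for the rest of
			// the batch.
			if bucket.Get(key) != nil {
				alreadyExists = true
				return nil
			}

			return bucket.Put(key, value)
		},
		OnCommit: func(err error) error {
			switch {
			case err != nil:
				return err
			case alreadyExists:
				return errAlreadyExists
			default:
				return nil
			}
		},
	})
}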