diff --git a/batch/batch.go b/batch/batch.go
new file mode 100644
index 00000000..6b1fa2ab
--- /dev/null
+++ b/batch/batch.go
@@ -0,0 +1,102 @@
+package batch
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/lightningnetwork/lnd/channeldb/kvdb"
+)
+
+// errSolo is a sentinel error indicating that the requester should re-run the
+// operation in isolation.
+var errSolo = errors.New(
+	"batch function returned an error and should be re-run solo",
+)
+
+type request struct {
+	*Request
+	errChan chan error
+}
+
+type batch struct {
+	db     kvdb.Backend
+	start  sync.Once
+	reqs   []*request
+	clear  func(b *batch)
+	locker sync.Locker
+}
+
+// trigger is the entry point for the batch and ensures that run is started at
+// most once.
+func (b *batch) trigger() {
+	b.start.Do(b.run)
+}
+
+// run executes the current batch of requests. If any individual requests fail
+// alongside others, they will be retried by the caller.
+func (b *batch) run() {
+	// Clear the batch from its scheduler, ensuring that no new requests are
+	// added to this batch.
+	b.clear(b)
+
+	// If a cache lock was provided, hold it until this method returns.
+	// This is critical for ensuring external consistency of the operation,
+	// so that caches don't get out of sync with the on-disk state.
+	if b.locker != nil {
+		b.locker.Lock()
+		defer b.locker.Unlock()
+	}
+
+	// Apply the batch until a subset succeeds or all of them fail. Requests
+	// that fail will be retried individually.
+	for len(b.reqs) > 0 {
+		var failIdx = -1
+		err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
+			for i, req := range b.reqs {
+				err := req.Update(tx)
+				if err != nil {
+					failIdx = i
+					return err
+				}
+			}
+			return nil
+		}, func() {
+			for _, req := range b.reqs {
+				if req.Reset != nil {
+					req.Reset()
+				}
+			}
+		})
+
+		// If a request's Update failed, extract it and re-run the
+		// batch. The removed request will be retried individually by
+		// the caller.
+		if failIdx >= 0 {
+			req := b.reqs[failIdx]
+
+			// It's safe to shorten b.reqs here because the
+			// scheduler's batch no longer points to us.
+			b.reqs[failIdx] = b.reqs[len(b.reqs)-1]
+			b.reqs = b.reqs[:len(b.reqs)-1]
+
+			// Tell the submitter to re-run it solo, and continue
+			// with the rest of the batch.
+			req.errChan <- errSolo
+			continue
+		}
+
+		// None of the remaining requests failed, so process the result
+		// using each request's OnCommit closure and return the error
+		// to the requester. If no OnCommit closure is provided, simply
+		// return the error directly.
+		for _, req := range b.reqs {
+			if req.OnCommit != nil {
+				req.errChan <- req.OnCommit(err)
+			} else {
+				req.errChan <- err
+			}
+		}
+
+		return
+	}
+}
diff --git a/batch/interface.go b/batch/interface.go
new file mode 100644
index 00000000..b9ab8b77
--- /dev/null
+++ b/batch/interface.go
@@ -0,0 +1,38 @@
+package batch
+
+import "github.com/lightningnetwork/lnd/channeldb/kvdb"
+
+// Request defines an operation that can be batched into a single bbolt
+// transaction.
+type Request struct {
+	// Reset is called before each invocation of Update and is used to clear
+	// any possible modifications to local state as a result of previous
+	// calls to Update that were not committed due to a concurrent batch
+	// failure.
+	//
+	// NOTE: This field is optional.
+	Reset func()
+
+	// Update is applied alongside other operations in the batch.
+	//
+	// NOTE: This method MUST NOT acquire any mutexes.
+	Update func(tx kvdb.RwTx) error
+
+	// OnCommit is called if the batch or a subset of the batch including
+	// this request all succeeded without failure. The passed error should
+	// contain the result of the transaction commit, as that can still fail
+	// even if none of the closures returned an error.
+	//
+	// NOTE: This field is optional.
+	OnCommit func(commitErr error) error
+}
+
+// Scheduler abstracts a generic batching engine that accumulates an incoming
+// set of Requests, executes them, and returns the error from the operation.
+type Scheduler interface {
+	// Execute schedules a Request for execution with the next available
+	// batch. This method blocks until the underlying closure has been
+	// run against the database. The resulting error is returned to the
+	// caller.
+	Execute(req *Request) error
+}
diff --git a/batch/scheduler.go b/batch/scheduler.go
new file mode 100644
index 00000000..7d681376
--- /dev/null
+++ b/batch/scheduler.go
@@ -0,0 +1,103 @@
+package batch
+
+import (
+	"sync"
+	"time"
+
+	"github.com/lightningnetwork/lnd/channeldb/kvdb"
+)
+
+// TimeScheduler is a batching engine that executes requests within a fixed
+// horizon. When the first request is received, a TimeScheduler waits a
+// configurable duration for other concurrent requests to join the batch. Once
+// this time has elapsed, the batch is closed and executed. Subsequent requests
+// are then added to a new batch which undergoes the same process.
+type TimeScheduler struct {
+	db       kvdb.Backend
+	locker   sync.Locker
+	duration time.Duration
+
+	mu sync.Mutex
+	b  *batch
+}
+
+// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
+// which to schedule batches. If the operation needs to modify a higher-level
+// cache, the cache's lock should be provided so that external consistency
+// can be maintained, as successful db operations will cause a request's
+// OnCommit method to be executed while holding this lock.
+func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
+	duration time.Duration) *TimeScheduler {
+
+	return &TimeScheduler{
+		db:       db,
+		locker:   locker,
+		duration: duration,
+	}
+}
+
+// Execute schedules the provided request for batch execution along with other
+// concurrent requests. The request will be executed within a fixed horizon,
+// parameterized by the duration of the scheduler. The error from the
+// underlying operation is returned to the caller.
+//
+// NOTE: Part of the Scheduler interface.
+func (s *TimeScheduler) Execute(r *Request) error {
+	req := request{
+		Request: r,
+		errChan: make(chan error, 1),
+	}
+
+	// Add the request to the current batch. If the batch has been cleared
+	// or no batch exists, create a new one.
+	s.mu.Lock()
+	if s.b == nil {
+		s.b = &batch{
+			db:     s.db,
+			clear:  s.clear,
+			locker: s.locker,
+		}
+		time.AfterFunc(s.duration, s.b.trigger)
+	}
+	s.b.reqs = append(s.b.reqs, &req)
+	s.mu.Unlock()
+
+	// Wait for the batch to process the request. If the batch didn't
+	// ask us to execute the request individually, simply return the error.
+	err := <-req.errChan
+	if err != errSolo {
+		return err
+	}
+
+	// Obtain exclusive access to the cache if this scheduler needs to
+	// modify the cache in OnCommit.
+	if s.locker != nil {
+		s.locker.Lock()
+		defer s.locker.Unlock()
+	}
+
+	// Otherwise, run the request on its own.
+	commitErr := kvdb.Update(s.db, req.Update, func() {
+		if req.Reset != nil {
+			req.Reset()
+		}
+	})
+
+	// Finally, return the commit error directly or execute the OnCommit
+	// closure with the commit error if present.
+	if req.OnCommit != nil {
+		return req.OnCommit(commitErr)
+	}
+
+	return commitErr
+}
+
+// clear resets the scheduler's batch to nil so that no more requests can be
+// added.
+func (s *TimeScheduler) clear(b *batch) {
+	s.mu.Lock()
+	if s.b == b {
+		s.b = nil
+	}
+	s.mu.Unlock()
+}
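A minimal, hypothetical caller of the new batch package might look as follows; this sketch is not part of the diff. Only the NewTimeScheduler/Execute/Request surface defined above comes from the change — the demoBucket name, the storeValue helper, and the bare kvdb.Backend wiring are assumptions made for illustration.

package batchexample

import (
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/batch"
	"github.com/lightningnetwork/lnd/channeldb/kvdb"
)

// demoBucket is a hypothetical top-level bucket used only for illustration.
var demoBucket = []byte("demo-bucket")

// storeValue submits one write through the scheduler. Concurrent calls that
// arrive within the same commit interval are applied in a single transaction.
func storeValue(s batch.Scheduler, key, value []byte) error {
	return s.Execute(&batch.Request{
		// Update runs inside the shared transaction and must not
		// acquire any mutexes of its own.
		Update: func(tx kvdb.RwTx) error {
			bucket, err := tx.CreateTopLevelBucket(demoBucket)
			if err != nil {
				return err
			}
			return bucket.Put(key, value)
		},
		// OnCommit receives the commit error (if any) and runs while
		// the scheduler's cache lock, if one was supplied, is held.
		OnCommit: func(commitErr error) error {
			return commitErr
		},
	})
}

// run wires up a scheduler over an existing backend. No higher-level cache is
// involved here, so nil is passed for the locker.
func run(db kvdb.Backend) error {
	scheduler := batch.NewTimeScheduler(db, nil, 500*time.Millisecond)

	if err := storeValue(scheduler, []byte("k"), []byte("v")); err != nil {
		return fmt.Errorf("batched write failed: %v", err)
	}

	return nil
}

Callers block inside Execute until the shared transaction commits (or until their request is retried solo), which is why the channeldb methods below can keep returning plain errors from AddChannelEdge and UpdateEdgePolicy.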
diff --git a/channeldb/db.go b/channeldb/db.go
index 465003c4..5c7d90b0 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -275,6 +275,7 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
 	}
 	chanDB.graph = newChannelGraph(
 		chanDB, opts.RejectCacheSize, opts.ChannelCacheSize,
+		opts.BatchCommitInterval,
 	)
 
 	// Synchronize the version of database and apply migrations if needed.
diff --git a/channeldb/graph.go b/channeldb/graph.go
index 5d599cf6..2333bc94 100644
--- a/channeldb/graph.go
+++ b/channeldb/graph.go
@@ -18,6 +18,7 @@ import (
 	"github.com/btcsuite/btcd/txscript"
 	"github.com/btcsuite/btcd/wire"
 	"github.com/btcsuite/btcutil"
+	"github.com/lightningnetwork/lnd/batch"
 	"github.com/lightningnetwork/lnd/channeldb/kvdb"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/routing/route"
@@ -177,16 +178,27 @@ type ChannelGraph struct {
 	cacheMu     sync.RWMutex
 	rejectCache *rejectCache
 	chanCache   *channelCache
+
+	chanScheduler batch.Scheduler
+	nodeScheduler batch.Scheduler
 }
 
 // newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
 // returned instance has its own unique reject cache and channel cache.
-func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph {
-	return &ChannelGraph{
+func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int,
+	batchCommitInterval time.Duration) *ChannelGraph {
+	g := &ChannelGraph{
 		db:          db,
 		rejectCache: newRejectCache(rejectCacheSize),
 		chanCache:   newChannelCache(chanCacheSize),
 	}
+	g.chanScheduler = batch.NewTimeScheduler(
+		db.Backend, &g.cacheMu, batchCommitInterval,
+	)
+	g.nodeScheduler = batch.NewTimeScheduler(
+		db.Backend, nil, batchCommitInterval,
+	)
+	return g
 }
 
 // Database returns a pointer to the underlying database.
@@ -440,15 +452,17 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
 // AddLightningNode adds a vertex/node to the graph database. If the node is not
 // in the database from before, this will add a new, unconnected one to the
 // graph. If it is present from before, this will update that node's
-// information. Note that this method is expected to only be called to update
-// an already present node from a node announcement, or to insert a node found
-// in a channel update.
+// information. Note that this method is expected to only be called to update an
+// already present node from a node announcement, or to insert a node found in a
+// channel update.
 //
 // TODO(roasbeef): also need sig of announcement
 func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
-	return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
-		return addLightningNode(tx, node)
-	}, func() {})
+	return c.nodeScheduler.Execute(&batch.Request{
+		Update: func(tx kvdb.RwTx) error {
+			return addLightningNode(tx, node)
+		},
+	})
 }
 
 func addLightningNode(tx kvdb.RwTx, node *LightningNode) error {
@@ -568,26 +582,42 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
 }
 
 // AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
-// undirected edge from the two target nodes are created. The information
-// stored denotes the static attributes of the channel, such as the channelID,
-// the keys involved in creation of the channel, and the set of features that
-// the channel supports. The chanPoint and chanID are used to uniquely identify
-// the edge globally within the database.
+// undirected edge from the two target nodes is created. The information stored
+// denotes the static attributes of the channel, such as the channelID, the keys
+// involved in creation of the channel, and the set of features that the channel
+// supports. The chanPoint and chanID are used to uniquely identify the edge
+// globally within the database.
 func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
-	c.cacheMu.Lock()
-	defer c.cacheMu.Unlock()
+	var alreadyExists bool
+	return c.chanScheduler.Execute(&batch.Request{
+		Reset: func() {
+			alreadyExists = false
+		},
+		Update: func(tx kvdb.RwTx) error {
+			err := c.addChannelEdge(tx, edge)
 
-	err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
-		return c.addChannelEdge(tx, edge)
-	}, func() {})
-	if err != nil {
-		return err
-	}
+			// Silence ErrEdgeAlreadyExist so that the batch can
+			// succeed, but propagate the error via local state.
+			if err == ErrEdgeAlreadyExist {
+				alreadyExists = true
+				return nil
+			}
 
-	c.rejectCache.remove(edge.ChannelID)
-	c.chanCache.remove(edge.ChannelID)
-
-	return nil
+			return err
+		},
+		OnCommit: func(err error) error {
+			switch {
+			case err != nil:
+				return err
+			case alreadyExists:
+				return ErrEdgeAlreadyExist
+			default:
+				c.rejectCache.remove(edge.ChannelID)
+				c.chanCache.remove(edge.ChannelID)
+				return nil
+			}
+		},
+	})
 }
 
 // addChannelEdge is the private form of AddChannelEdge that allows callers to
@@ -1929,51 +1959,71 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
 // the ChannelEdgePolicy determines which of the directed edges are being
 // updated. If the flag is 1, then the first node's information is being
 // updated, otherwise it's the second node's information. The node ordering is
-// determined by the lexicographical ordering of the identity public keys of
-// the nodes on either side of the channel.
+// determined by the lexicographical ordering of the identity public keys of the
+// nodes on either side of the channel.
 func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
-	c.cacheMu.Lock()
-	defer c.cacheMu.Unlock()
+	var (
+		isUpdate1    bool
+		edgeNotFound bool
+	)
+	return c.chanScheduler.Execute(&batch.Request{
+		Reset: func() {
+			isUpdate1 = false
+			edgeNotFound = false
+		},
+		Update: func(tx kvdb.RwTx) error {
+			var err error
+			isUpdate1, err = updateEdgePolicy(tx, edge)
 
-	var isUpdate1 bool
-	err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
-		var err error
-		isUpdate1, err = updateEdgePolicy(tx, edge)
-		return err
-	}, func() {
-		isUpdate1 = false
+			// Silence ErrEdgeNotFound so that the batch can
+			// succeed, but propagate the error via local state.
+			if err == ErrEdgeNotFound {
+				edgeNotFound = true
+				return nil
+			}
+
+			return err
+		},
+		OnCommit: func(err error) error {
+			switch {
+			case err != nil:
+				return err
+			case edgeNotFound:
+				return ErrEdgeNotFound
+			default:
+				c.updateEdgeCache(edge, isUpdate1)
+				return nil
+			}
+		},
 	})
-	if err != nil {
-		return err
-	}
+}
 
+func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) {
 	// If an entry for this channel is found in reject cache, we'll modify
 	// the entry with the updated timestamp for the direction that was just
 	// written. If the edge doesn't exist, we'll load the cache entry lazily
 	// during the next query for this edge.
-	if entry, ok := c.rejectCache.get(edge.ChannelID); ok {
+	if entry, ok := c.rejectCache.get(e.ChannelID); ok {
 		if isUpdate1 {
-			entry.upd1Time = edge.LastUpdate.Unix()
+			entry.upd1Time = e.LastUpdate.Unix()
 		} else {
-			entry.upd2Time = edge.LastUpdate.Unix()
+			entry.upd2Time = e.LastUpdate.Unix()
 		}
-		c.rejectCache.insert(edge.ChannelID, entry)
+		c.rejectCache.insert(e.ChannelID, entry)
 	}
 
 	// If an entry for this channel is found in channel cache, we'll modify
 	// the entry with the updated policy for the direction that was just
 	// written. If the edge doesn't exist, we'll defer loading the info and
 	// policies and lazily read from disk during the next query.
-	if channel, ok := c.chanCache.get(edge.ChannelID); ok {
+	if channel, ok := c.chanCache.get(e.ChannelID); ok {
 		if isUpdate1 {
-			channel.Policy1 = edge
+			channel.Policy1 = e
 		} else {
-			channel.Policy2 = edge
+			channel.Policy2 = e
 		}
-		c.chanCache.insert(edge.ChannelID, channel)
+		c.chanCache.insert(e.ChannelID, channel)
 	}
-
-	return nil
 }
 
 // updateEdgePolicy attempts to update an edge's policy within the relevant
diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go
index 52d1114d..2abdcc8e 100644
--- a/channeldb/graph_test.go
+++ b/channeldb/graph_test.go
@@ -3,6 +3,7 @@ package channeldb
 import (
 	"bytes"
 	"crypto/sha256"
+	"errors"
 	"fmt"
 	"image/color"
 	"math"
@@ -11,6 +12,7 @@ import (
 	"net"
 	"reflect"
 	"runtime"
+	"sync"
 	"testing"
 	"time"
 
@@ -21,6 +23,7 @@ import (
 	"github.com/lightningnetwork/lnd/channeldb/kvdb"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/routing/route"
+	"github.com/stretchr/testify/require"
 )
 
 var (
@@ -3195,3 +3198,148 @@ func TestComputeFee(t *testing.T) {
 		t.Fatalf("expected fee %v, but got %v", fee, fwdFee)
 	}
 }
+
+// TestBatchedAddChannelEdge asserts that AddChannelEdge properly batches
+// multiple concurrent channel additions into a single txn.
+func TestBatchedAddChannelEdge(t *testing.T) {
+	t.Parallel()
+
+	db, cleanUp, err := MakeTestDB()
+	require.Nil(t, err)
+	defer cleanUp()
+
+	graph := db.ChannelGraph()
+	sourceNode, err := createTestVertex(db)
+	require.Nil(t, err)
+	err = graph.SetSourceNode(sourceNode)
+	require.Nil(t, err)
+
+	// We'd like to test the insertion/deletion of edges, so we create two
+	// vertexes to connect.
+	node1, err := createTestVertex(db)
+	require.Nil(t, err)
+	node2, err := createTestVertex(db)
+	require.Nil(t, err)
+
+	// In addition to the fake vertexes we create some fake channel
+	// identifiers.
+	var spendOutputs []*wire.OutPoint
+	var blockHash chainhash.Hash
+	copy(blockHash[:], bytes.Repeat([]byte{1}, 32))
+
+	// Prune the graph a few times to make sure we have entries in the
+	// prune log.
+	_, err = graph.PruneGraph(spendOutputs, &blockHash, 155)
+	require.Nil(t, err)
+	var blockHash2 chainhash.Hash
+	copy(blockHash2[:], bytes.Repeat([]byte{2}, 32))
+
+	_, err = graph.PruneGraph(spendOutputs, &blockHash2, 156)
+	require.Nil(t, err)
+
+	// We'll create 3 almost identical edges, which differ only in their
+	// block heights and channel identifiers.
+
+	// Create an edge which has its block height at 156.
+	height := uint32(156)
+	edgeInfo, _ := createEdge(height, 0, 0, 0, node1, node2)
+
+	// Create an edge with block height 157. We give it
+	// maximum values for tx index and position, to make
+	// sure our database range scan gets edges from the
+	// entire range.
+	edgeInfo2, _ := createEdge(
+		height+1, math.MaxUint32&0x00ffffff, math.MaxUint16, 1,
+		node1, node2,
+	)
+
+	// Create a third edge, this with a block height of 155.
+	edgeInfo3, _ := createEdge(height-1, 0, 0, 2, node1, node2)
+
+	edges := []ChannelEdgeInfo{edgeInfo, edgeInfo2, edgeInfo3}
+	errChan := make(chan error, len(edges))
+	errTimeout := errors.New("timeout adding batched channel")
+
+	// Now add all these new edges to the database.
+	var wg sync.WaitGroup
+	for _, edge := range edges {
+		wg.Add(1)
+		go func(edge ChannelEdgeInfo) {
+			defer wg.Done()
+
+			select {
+			case errChan <- graph.AddChannelEdge(&edge):
+			case <-time.After(2 * time.Second):
+				errChan <- errTimeout
+			}
+		}(edge)
+	}
+	wg.Wait()
+
+	for i := 0; i < len(edges); i++ {
+		err := <-errChan
+		require.Nil(t, err)
+	}
+}
+
+// TestBatchedUpdateEdgePolicy asserts that UpdateEdgePolicy properly batches
+// multiple concurrent policy updates into a single txn.
+func TestBatchedUpdateEdgePolicy(t *testing.T) {
+	t.Parallel()
+
+	db, cleanUp, err := MakeTestDB()
+	require.Nil(t, err)
+	defer cleanUp()
+
+	graph := db.ChannelGraph()
+
+	// We'd like to test the update of edges inserted into the database, so
+	// we create two vertexes to connect.
+	node1, err := createTestVertex(db)
+	require.Nil(t, err)
+	err = graph.AddLightningNode(node1)
+	require.Nil(t, err)
+	node2, err := createTestVertex(db)
+	require.Nil(t, err)
+	err = graph.AddLightningNode(node2)
+	require.Nil(t, err)
+
+	// Create an edge and add it to the db.
+	edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2)
+
+	// Make sure inserting the policy at this point, before the edge info
+	// is added, will fail.
+	err = graph.UpdateEdgePolicy(edge1)
+	require.Equal(t, ErrEdgeNotFound, err)
+
+	// Add the edge info.
+	err = graph.AddChannelEdge(edgeInfo)
+	require.Nil(t, err)
+
+	errTimeout := errors.New("timeout adding batched channel")
+
+	updates := []*ChannelEdgePolicy{edge1, edge2}
+
+	errChan := make(chan error, len(updates))
+
+	// Now apply both of these policy updates to the database.
+	var wg sync.WaitGroup
+	for _, update := range updates {
+		wg.Add(1)
+		go func(update *ChannelEdgePolicy) {
+			defer wg.Done()
+
+			select {
+			case errChan <- graph.UpdateEdgePolicy(update):
+			case <-time.After(2 * time.Second):
+				errChan <- errTimeout
+			}
+		}(update)
+	}
+	wg.Wait()
+
+	for i := 0; i < len(updates); i++ {
+		err := <-errChan
+		require.Nil(t, err)
+	}
+}
diff --git a/channeldb/options.go b/channeldb/options.go
index c9144650..285da570 100644
--- a/channeldb/options.go
+++ b/channeldb/options.go
@@ -31,6 +31,10 @@ type Options struct {
 	// channel cache.
 	ChannelCacheSize int
 
+	// BatchCommitInterval is the maximum duration the batch schedulers will
+	// wait before attempting to commit a pending set of updates.
+	BatchCommitInterval time.Duration
+
 	// clock is the time source used by the database.
 	clock clock.Clock
 
@@ -92,6 +96,14 @@ func OptionAutoCompactMinAge(minAge time.Duration) OptionModifier {
 	}
 }
 
+// OptionSetBatchCommitInterval sets the batch commit interval for the internal
+// batch schedulers.
+func OptionSetBatchCommitInterval(interval time.Duration) OptionModifier {
+	return func(o *Options) {
+		o.BatchCommitInterval = interval
+	}
+}
+
 // OptionClock sets a non-default clock dependency.
 func OptionClock(clock clock.Clock) OptionModifier {
 	return func(o *Options) {
diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index a20fb270..87dcde7c 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
-	"runtime"
 	"sync"
 	"time"
 
@@ -912,9 +911,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
 
 	// We'll use this validation to ensure that we process jobs in their
 	// dependency order during parallel validation.
-	validationBarrier := routing.NewValidationBarrier(
-		runtime.NumCPU()*4, d.quit,
-	)
+	validationBarrier := routing.NewValidationBarrier(1000, d.quit)
 
 	for {
 		select {
@@ -1692,9 +1689,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 			// If the edge was rejected due to already being known,
 			// then it may be that case that this new message has a
 			// fresh channel proof, so we'll check.
-			if routing.IsError(err, routing.ErrOutdated,
-				routing.ErrIgnored) {
-
+			if routing.IsError(err, routing.ErrIgnored) {
 				// Attempt to process the rejected message to
 				// see if we get any new announcements.
 				anns, rErr := d.processRejectedEdge(msg, proof)
diff --git a/lncfg/db.go b/lncfg/db.go
index 63b58c46..8c920314 100644
--- a/lncfg/db.go
+++ b/lncfg/db.go
@@ -3,20 +3,24 @@ package lncfg
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/lightningnetwork/lnd/channeldb/kvdb"
 )
 
 const (
-	dbName      = "channel.db"
-	BoltBackend = "bolt"
-	EtcdBackend = "etcd"
+	dbName                     = "channel.db"
+	BoltBackend                = "bolt"
+	EtcdBackend                = "etcd"
+	DefaultBatchCommitInterval = 500 * time.Millisecond
 )
 
 // DB holds database configuration for LND.
 type DB struct {
 	Backend string `long:"backend" description:"The selected database backend."`
 
+	BatchCommitInterval time.Duration `long:"batch-commit-interval" description:"The maximum duration the channel graph batch schedulers will wait before attempting to commit a batch of pending updates. This can be used to trade off database contention for commit latency."`
+
 	Etcd *kvdb.EtcdConfig `group:"etcd" namespace:"etcd" description:"Etcd settings."`
 
 	Bolt *kvdb.BoltConfig `group:"bolt" namespace:"bolt" description:"Bolt settings."`
@@ -25,7 +29,8 @@ type DB struct {
 // NewDB creates and returns a new default DB config.
 func DefaultDB() *DB {
 	return &DB{
-		Backend: BoltBackend,
+		Backend:             BoltBackend,
+		BatchCommitInterval: DefaultBatchCommitInterval,
 		Bolt: &kvdb.BoltConfig{
 			AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
 		},
diff --git a/lnd.go b/lnd.go
index 43692ccc..11997d14 100644
--- a/lnd.go
+++ b/lnd.go
@@ -1381,6 +1381,7 @@ func initializeDatabases(ctx context.Context,
 			databaseBackends.LocalDB,
 			channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
 			channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
+			channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
 			channeldb.OptionDryRunMigration(cfg.DryRunMigration),
 		)
 		switch {
@@ -1409,6 +1410,7 @@ func initializeDatabases(ctx context.Context,
 			databaseBackends.LocalDB,
 			channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
 			channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
+			channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
 			channeldb.OptionDryRunMigration(cfg.DryRunMigration),
 		)
 		switch {
@@ -1433,6 +1435,7 @@ func initializeDatabases(ctx context.Context,
 		remoteChanDB, err = channeldb.CreateWithBackend(
 			databaseBackends.RemoteDB,
 			channeldb.OptionDryRunMigration(cfg.DryRunMigration),
+			channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
 		)
 		switch {
 		case err == channeldb.ErrDryRunMigrationOK:
diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go
index 41ca93e6..aaa28df8 100644
--- a/lntest/itest/lnd_test.go
+++ b/lntest/itest/lnd_test.go
@@ -1850,31 +1850,36 @@ func getChannelPolicies(t *harnessTest, node *lntest.HarnessNode,
 	}
 	ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
 	chanGraph, err := node.DescribeGraph(ctxt, descReq)
-	if err != nil {
-		t.Fatalf("unable to query for alice's graph: %v", err)
-	}
+	require.NoError(t.t, err, "unable to query for alice's graph")
 
 	var policies []*lnrpc.RoutingPolicy
-out:
-	for _, chanPoint := range chanPoints {
-		for _, e := range chanGraph.Edges {
-			if e.ChanPoint != txStr(chanPoint) {
-				continue
+	err = wait.NoError(func() error {
+	out:
+		for _, chanPoint := range chanPoints {
+			for _, e := range chanGraph.Edges {
+				if e.ChanPoint != txStr(chanPoint) {
+					continue
+				}
+
+				if e.Node1Pub == advertisingNode {
+					policies = append(policies,
+						e.Node1Policy)
+				} else {
+					policies = append(policies,
+						e.Node2Policy)
+				}
+
+				continue out
 			}
-			if e.Node1Pub == advertisingNode {
-				policies = append(policies, e.Node1Policy)
-			} else {
-				policies = append(policies, e.Node2Policy)
-			}
-
-			continue out
+			// If we've iterated over all the known edges and we weren't
+			// able to find this specific one, then we'll fail.
+			return fmt.Errorf("did not find edge %v", txStr(chanPoint))
 		}
-		// If we've iterated over all the known edges and we weren't
-		// able to find this specific one, then we'll fail.
- t.Fatalf("did not find edge %v", txStr(chanPoint)) - } + return nil + }, defaultTimeout) + require.NoError(t.t, err) return policies } diff --git a/lntest/node.go b/lntest/node.go index 4fcbc78b..d96fe995 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -238,6 +238,7 @@ func (cfg NodeConfig) genArgs() []string { args = append(args, "--nobootstrap") args = append(args, "--debuglevel=debug") args = append(args, "--bitcoin.defaultchanconfs=1") + args = append(args, fmt.Sprintf("--db.batch-commit-interval=%v", 10*time.Millisecond)) args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV)) args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr())) args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr())) diff --git a/routing/router.go b/routing/router.go index 93904f09..5d70f4b8 100644 --- a/routing/router.go +++ b/routing/router.go @@ -3,7 +3,6 @@ package routing import ( "bytes" "fmt" - "runtime" "sync" "sync/atomic" "time" @@ -914,7 +913,7 @@ func (r *ChannelRouter) networkHandler() { // We'll use this validation barrier to ensure that we process all jobs // in the proper order during parallel validation. - validationBarrier := NewValidationBarrier(runtime.NumCPU()*4, r.quit) + validationBarrier := NewValidationBarrier(1000, r.quit) for { diff --git a/sample-lnd.conf b/sample-lnd.conf index 7fd8a696..760df39e 100644 --- a/sample-lnd.conf +++ b/sample-lnd.conf @@ -930,6 +930,10 @@ litecoin.node=ltcd ; also has experimental support for etcd, a replicated backend. ; db.backend=bolt +; The maximum interval the graph database will wait between attempting to flush +; a batch of modifications to disk. Defaults to 500 milliseconds. +; db.batch-commit-interval=500ms + [etcd] ; Etcd database host. ; db.etcd.host=localhost:2379