diff --git a/channeldb/db.go b/channeldb/db.go index 465003c4..5c7d90b0 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -275,6 +275,7 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, } chanDB.graph = newChannelGraph( chanDB, opts.RejectCacheSize, opts.ChannelCacheSize, + opts.BatchCommitInterval, ) // Synchronize the version of database and apply migrations if needed. diff --git a/channeldb/graph.go b/channeldb/graph.go index 4e4dd995..ff4a871d 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -185,17 +185,18 @@ type ChannelGraph struct { // newChannelGraph allocates a new ChannelGraph backed by a DB instance. The // returned instance has its own unique reject cache and channel cache. -func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph { +func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int, + batchCommitInterval time.Duration) *ChannelGraph { g := &ChannelGraph{ db: db, rejectCache: newRejectCache(rejectCacheSize), chanCache: newChannelCache(chanCacheSize), } g.chanScheduler = batch.NewTimeScheduler( - db.Backend, &g.cacheMu, 500*time.Millisecond, + db.Backend, &g.cacheMu, batchCommitInterval, ) g.nodeScheduler = batch.NewTimeScheduler( - db.Backend, nil, 500*time.Millisecond, + db.Backend, nil, batchCommitInterval, ) return g } diff --git a/channeldb/options.go b/channeldb/options.go index c9144650..285da570 100644 --- a/channeldb/options.go +++ b/channeldb/options.go @@ -31,6 +31,10 @@ type Options struct { // channel cache. ChannelCacheSize int + // BatchCommitInterval is the maximum duration the batch schedulers will + // wait before attempting to commit a pending set of updates. + BatchCommitInterval time.Duration + // clock is the time source used by the database. clock clock.Clock @@ -92,6 +96,14 @@ func OptionAutoCompactMinAge(minAge time.Duration) OptionModifier { } } +// OptionSetBatchCommitInterval sets the batch commit interval for the internval +// batch schedulers. +func OptionSetBatchCommitInterval(interval time.Duration) OptionModifier { + return func(o *Options) { + o.BatchCommitInterval = interval + } +} + // OptionClock sets a non-default clock dependency. func OptionClock(clock clock.Clock) OptionModifier { return func(o *Options) { diff --git a/lncfg/db.go b/lncfg/db.go index 63b58c46..8c920314 100644 --- a/lncfg/db.go +++ b/lncfg/db.go @@ -3,20 +3,24 @@ package lncfg import ( "context" "fmt" + "time" "github.com/lightningnetwork/lnd/channeldb/kvdb" ) const ( - dbName = "channel.db" - BoltBackend = "bolt" - EtcdBackend = "etcd" + dbName = "channel.db" + BoltBackend = "bolt" + EtcdBackend = "etcd" + DefaultBatchCommitInterval = 500 * time.Millisecond ) // DB holds database configuration for LND. type DB struct { Backend string `long:"backend" description:"The selected database backend."` + BatchCommitInterval time.Duration `long:"batch-commit-interval" description:"The maximum duration the channel graph batch schedulers will wait before attempting to commit a batch of pending updates. This can be tradeoff database contenion for commit latency."` + Etcd *kvdb.EtcdConfig `group:"etcd" namespace:"etcd" description:"Etcd settings."` Bolt *kvdb.BoltConfig `group:"bolt" namespace:"bolt" description:"Bolt settings."` @@ -25,7 +29,8 @@ type DB struct { // NewDB creates and returns a new default DB config. func DefaultDB() *DB { return &DB{ - Backend: BoltBackend, + Backend: BoltBackend, + BatchCommitInterval: DefaultBatchCommitInterval, Bolt: &kvdb.BoltConfig{ AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge, }, diff --git a/lnd.go b/lnd.go index 43692ccc..11997d14 100644 --- a/lnd.go +++ b/lnd.go @@ -1381,6 +1381,7 @@ func initializeDatabases(ctx context.Context, databaseBackends.LocalDB, channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), + channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval), channeldb.OptionDryRunMigration(cfg.DryRunMigration), ) switch { @@ -1409,6 +1410,7 @@ func initializeDatabases(ctx context.Context, databaseBackends.LocalDB, channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), + channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval), channeldb.OptionDryRunMigration(cfg.DryRunMigration), ) switch { @@ -1433,6 +1435,7 @@ func initializeDatabases(ctx context.Context, remoteChanDB, err = channeldb.CreateWithBackend( databaseBackends.RemoteDB, channeldb.OptionDryRunMigration(cfg.DryRunMigration), + channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval), ) switch { case err == channeldb.ErrDryRunMigrationOK: diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 05c00d85..41b2790d 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -1846,31 +1846,36 @@ func getChannelPolicies(t *harnessTest, node *lntest.HarnessNode, } ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) chanGraph, err := node.DescribeGraph(ctxt, descReq) - if err != nil { - t.Fatalf("unable to query for alice's graph: %v", err) - } + require.NoError(t.t, err, "unable to query for alice's graph") var policies []*lnrpc.RoutingPolicy -out: - for _, chanPoint := range chanPoints { - for _, e := range chanGraph.Edges { - if e.ChanPoint != txStr(chanPoint) { - continue + err = wait.NoError(func() error { + out: + for _, chanPoint := range chanPoints { + for _, e := range chanGraph.Edges { + if e.ChanPoint != txStr(chanPoint) { + continue + } + + if e.Node1Pub == advertisingNode { + policies = append(policies, + e.Node1Policy) + } else { + policies = append(policies, + e.Node2Policy) + } + + continue out } - if e.Node1Pub == advertisingNode { - policies = append(policies, e.Node1Policy) - } else { - policies = append(policies, e.Node2Policy) - } - - continue out + // If we've iterated over all the known edges and we weren't + // able to find this specific one, then we'll fail. + return fmt.Errorf("did not find edge %v", txStr(chanPoint)) } - // If we've iterated over all the known edges and we weren't - // able to find this specific one, then we'll fail. - t.Fatalf("did not find edge %v", txStr(chanPoint)) - } + return nil + }, defaultTimeout) + require.NoError(t.t, err) return policies } diff --git a/lntest/node.go b/lntest/node.go index 4fcbc78b..d96fe995 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -238,6 +238,7 @@ func (cfg NodeConfig) genArgs() []string { args = append(args, "--nobootstrap") args = append(args, "--debuglevel=debug") args = append(args, "--bitcoin.defaultchanconfs=1") + args = append(args, fmt.Sprintf("--db.batch-commit-interval=%v", 10*time.Millisecond)) args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV)) args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr())) args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr())) diff --git a/sample-lnd.conf b/sample-lnd.conf index 7fd8a696..760df39e 100644 --- a/sample-lnd.conf +++ b/sample-lnd.conf @@ -930,6 +930,10 @@ litecoin.node=ltcd ; also has experimental support for etcd, a replicated backend. ; db.backend=bolt +; The maximum interval the graph database will wait between attempting to flush +; a batch of modifications to disk. Defaults to 500 milliseconds. +; db.batch-commit-interval=500ms + [etcd] ; Etcd database host. ; db.etcd.host=localhost:2379