lncfg+itest: expose configurable batch-commit-interval

This will permit a greater degree of tuning or customization depending
on various hardware/environmental factors.
This commit is contained in:
Conner Fromknecht 2020-11-24 16:40:54 -08:00
parent e8c545e909
commit 82a238317c
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
8 changed files with 58 additions and 26 deletions

@ -275,6 +275,7 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
} }
chanDB.graph = newChannelGraph( chanDB.graph = newChannelGraph(
chanDB, opts.RejectCacheSize, opts.ChannelCacheSize, chanDB, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval,
) )
// Synchronize the version of database and apply migrations if needed. // Synchronize the version of database and apply migrations if needed.

@ -185,17 +185,18 @@ type ChannelGraph struct {
// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The // newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache. // returned instance has its own unique reject cache and channel cache.
func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph { func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int,
batchCommitInterval time.Duration) *ChannelGraph {
g := &ChannelGraph{ g := &ChannelGraph{
db: db, db: db,
rejectCache: newRejectCache(rejectCacheSize), rejectCache: newRejectCache(rejectCacheSize),
chanCache: newChannelCache(chanCacheSize), chanCache: newChannelCache(chanCacheSize),
} }
g.chanScheduler = batch.NewTimeScheduler( g.chanScheduler = batch.NewTimeScheduler(
db.Backend, &g.cacheMu, 500*time.Millisecond, db.Backend, &g.cacheMu, batchCommitInterval,
) )
g.nodeScheduler = batch.NewTimeScheduler( g.nodeScheduler = batch.NewTimeScheduler(
db.Backend, nil, 500*time.Millisecond, db.Backend, nil, batchCommitInterval,
) )
return g return g
} }

@ -31,6 +31,10 @@ type Options struct {
// channel cache. // channel cache.
ChannelCacheSize int ChannelCacheSize int
// BatchCommitInterval is the maximum duration the batch schedulers will
// wait before attempting to commit a pending set of updates.
BatchCommitInterval time.Duration
// clock is the time source used by the database. // clock is the time source used by the database.
clock clock.Clock clock clock.Clock
@ -92,6 +96,14 @@ func OptionAutoCompactMinAge(minAge time.Duration) OptionModifier {
} }
} }
// OptionSetBatchCommitInterval sets the batch commit interval for the internval
// batch schedulers.
func OptionSetBatchCommitInterval(interval time.Duration) OptionModifier {
return func(o *Options) {
o.BatchCommitInterval = interval
}
}
// OptionClock sets a non-default clock dependency. // OptionClock sets a non-default clock dependency.
func OptionClock(clock clock.Clock) OptionModifier { func OptionClock(clock clock.Clock) OptionModifier {
return func(o *Options) { return func(o *Options) {

@ -3,6 +3,7 @@ package lncfg
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channeldb/kvdb"
) )
@ -11,12 +12,15 @@ const (
dbName = "channel.db" dbName = "channel.db"
BoltBackend = "bolt" BoltBackend = "bolt"
EtcdBackend = "etcd" EtcdBackend = "etcd"
DefaultBatchCommitInterval = 500 * time.Millisecond
) )
// DB holds database configuration for LND. // DB holds database configuration for LND.
type DB struct { type DB struct {
Backend string `long:"backend" description:"The selected database backend."` Backend string `long:"backend" description:"The selected database backend."`
BatchCommitInterval time.Duration `long:"batch-commit-interval" description:"The maximum duration the channel graph batch schedulers will wait before attempting to commit a batch of pending updates. This can be tradeoff database contenion for commit latency."`
Etcd *kvdb.EtcdConfig `group:"etcd" namespace:"etcd" description:"Etcd settings."` Etcd *kvdb.EtcdConfig `group:"etcd" namespace:"etcd" description:"Etcd settings."`
Bolt *kvdb.BoltConfig `group:"bolt" namespace:"bolt" description:"Bolt settings."` Bolt *kvdb.BoltConfig `group:"bolt" namespace:"bolt" description:"Bolt settings."`
@ -26,6 +30,7 @@ type DB struct {
func DefaultDB() *DB { func DefaultDB() *DB {
return &DB{ return &DB{
Backend: BoltBackend, Backend: BoltBackend,
BatchCommitInterval: DefaultBatchCommitInterval,
Bolt: &kvdb.BoltConfig{ Bolt: &kvdb.BoltConfig{
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge, AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
}, },

3
lnd.go

@ -1381,6 +1381,7 @@ func initializeDatabases(ctx context.Context,
databaseBackends.LocalDB, databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
channeldb.OptionDryRunMigration(cfg.DryRunMigration), channeldb.OptionDryRunMigration(cfg.DryRunMigration),
) )
switch { switch {
@ -1409,6 +1410,7 @@ func initializeDatabases(ctx context.Context,
databaseBackends.LocalDB, databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
channeldb.OptionDryRunMigration(cfg.DryRunMigration), channeldb.OptionDryRunMigration(cfg.DryRunMigration),
) )
switch { switch {
@ -1433,6 +1435,7 @@ func initializeDatabases(ctx context.Context,
remoteChanDB, err = channeldb.CreateWithBackend( remoteChanDB, err = channeldb.CreateWithBackend(
databaseBackends.RemoteDB, databaseBackends.RemoteDB,
channeldb.OptionDryRunMigration(cfg.DryRunMigration), channeldb.OptionDryRunMigration(cfg.DryRunMigration),
channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
) )
switch { switch {
case err == channeldb.ErrDryRunMigrationOK: case err == channeldb.ErrDryRunMigrationOK:

@ -1846,12 +1846,11 @@ func getChannelPolicies(t *harnessTest, node *lntest.HarnessNode,
} }
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
chanGraph, err := node.DescribeGraph(ctxt, descReq) chanGraph, err := node.DescribeGraph(ctxt, descReq)
if err != nil { require.NoError(t.t, err, "unable to query for alice's graph")
t.Fatalf("unable to query for alice's graph: %v", err)
}
var policies []*lnrpc.RoutingPolicy var policies []*lnrpc.RoutingPolicy
out: err = wait.NoError(func() error {
out:
for _, chanPoint := range chanPoints { for _, chanPoint := range chanPoints {
for _, e := range chanGraph.Edges { for _, e := range chanGraph.Edges {
if e.ChanPoint != txStr(chanPoint) { if e.ChanPoint != txStr(chanPoint) {
@ -1859,9 +1858,11 @@ out:
} }
if e.Node1Pub == advertisingNode { if e.Node1Pub == advertisingNode {
policies = append(policies, e.Node1Policy) policies = append(policies,
e.Node1Policy)
} else { } else {
policies = append(policies, e.Node2Policy) policies = append(policies,
e.Node2Policy)
} }
continue out continue out
@ -1869,9 +1870,13 @@ out:
// If we've iterated over all the known edges and we weren't // If we've iterated over all the known edges and we weren't
// able to find this specific one, then we'll fail. // able to find this specific one, then we'll fail.
t.Fatalf("did not find edge %v", txStr(chanPoint)) return fmt.Errorf("did not find edge %v", txStr(chanPoint))
} }
return nil
}, defaultTimeout)
require.NoError(t.t, err)
return policies return policies
} }

@ -238,6 +238,7 @@ func (cfg NodeConfig) genArgs() []string {
args = append(args, "--nobootstrap") args = append(args, "--nobootstrap")
args = append(args, "--debuglevel=debug") args = append(args, "--debuglevel=debug")
args = append(args, "--bitcoin.defaultchanconfs=1") args = append(args, "--bitcoin.defaultchanconfs=1")
args = append(args, fmt.Sprintf("--db.batch-commit-interval=%v", 10*time.Millisecond))
args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV)) args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV))
args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr())) args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()))
args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr())) args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()))

@ -930,6 +930,10 @@ litecoin.node=ltcd
; also has experimental support for etcd, a replicated backend. ; also has experimental support for etcd, a replicated backend.
; db.backend=bolt ; db.backend=bolt
; The maximum interval the graph database will wait between attempting to flush
; a batch of modifications to disk. Defaults to 500 milliseconds.
; db.batch-commit-interval=500ms
[etcd] [etcd]
; Etcd database host. ; Etcd database host.
; db.etcd.host=localhost:2379 ; db.etcd.host=localhost:2379