diff --git a/channeldb/kvdb/backend.go b/channeldb/kvdb/backend.go
index 0acedcb6..6409afda 100644
--- a/channeldb/kvdb/backend.go
+++ b/channeldb/kvdb/backend.go
@@ -9,6 +9,13 @@ import (
 	_ "github.com/btcsuite/btcwallet/walletdb/bdb" // Import to register backend.
 )
 
+const (
+	// DefaultTempDBFileName is the default name of the temporary bolt DB
+	// file that we'll use to atomically compact the primary DB file on
+	// startup.
+	DefaultTempDBFileName = "temp-dont-use.db"
+)
+
 // fileExists returns true if the file exists, and false otherwise.
 func fileExists(path string) bool {
 	if _, err := os.Stat(path); err != nil {
@@ -48,15 +55,12 @@ type BoltBackendConfig struct {
 	AutoCompactMinAge time.Duration
 }
 
-// GetBoltBackend opens (or creates if doesn't exits) a bbolt
-// backed database and returns a kvdb.Backend wrapping it.
+// GetBoltBackend opens (or creates if it doesn't exist) a bbolt backed
+// database and returns a kvdb.Backend wrapping it.
 func GetBoltBackend(cfg *BoltBackendConfig) (Backend, error) {
 	dbFilePath := filepath.Join(cfg.DBPath, cfg.DBFileName)
 
-	var (
-		db  Backend
-		err error
-	)
+	// Is this a new database?
 	if !fileExists(dbFilePath) {
 		if !fileExists(cfg.DBPath) {
 			if err := os.MkdirAll(cfg.DBPath, 0700); err != nil {
@@ -64,16 +68,86 @@ func GetBoltBackend(cfg *BoltBackendConfig) (Backend, error) {
 			}
 		}
 
-		db, err = Create(BoltBackendName, dbFilePath, cfg.NoFreelistSync)
-	} else {
-		db, err = Open(BoltBackendName, dbFilePath, cfg.NoFreelistSync)
+		return Create(BoltBackendName, dbFilePath, cfg.NoFreelistSync)
 	}
 
+	// This is an existing database. We might want to compact it on
+	// startup to free up some space.
+	if cfg.AutoCompact {
+		if err := compactAndSwap(cfg); err != nil {
+			return nil, err
+		}
+	}
+
+	return Open(BoltBackendName, dbFilePath, cfg.NoFreelistSync)
+}
+
+// compactAndSwap will attempt to write a new temporary DB file to disk with
+// the compacted database content, then atomically swap (via rename) the old
+// file for the new file by updating the name of the new file to the old.
+func compactAndSwap(cfg *BoltBackendConfig) error {
+	sourceName := cfg.DBFileName
+
+	// If the main DB file isn't set, then we can't proceed.
+	if sourceName == "" {
+		return fmt.Errorf("cannot compact DB with empty name")
+	}
+	sourceFilePath := filepath.Join(cfg.DBPath, sourceName)
+	tempDestFilePath := filepath.Join(cfg.DBPath, DefaultTempDBFileName)
+
+	log.Infof("Compacting database file at %v", sourceFilePath)
+
+	// If the old temporary DB file still exists, then we'll delete it
+	// before proceeding.
+	if _, err := os.Stat(tempDestFilePath); err == nil {
+		log.Infof("Found old temp DB @ %v, removing before swap",
+			tempDestFilePath)
+
+		err = os.Remove(tempDestFilePath)
+		if err != nil {
+			return fmt.Errorf("unable to remove old temp DB file: "+
+				"%v", err)
+		}
+	}
+
+	// Now that we know the staging area is clear, we'll create the new
+	// temporary DB file and close it before we write the new DB to it.
+	tempFile, err := os.Create(tempDestFilePath)
 	if err != nil {
-		return nil, err
+		return fmt.Errorf("unable to create temp DB file: %v", err)
+	}
+	if err := tempFile.Close(); err != nil {
+		return fmt.Errorf("unable to close file: %v", err)
 	}
 
-	return db, nil
+	// With the file created, we'll start the compaction and remove the
+	// temporary file altogether once this method exits.
+	defer func() {
+		// This will only succeed if the rename below fails. If the
+		// compaction is successful, the file won't exist on exit
+		// anymore so no need to log an error here.
+		_ = os.Remove(tempDestFilePath)
+	}()
+	c := &compacter{
+		srcPath: sourceFilePath,
+		dstPath: tempDestFilePath,
+	}
+	initialSize, newSize, err := c.execute()
+	if err != nil {
+		return fmt.Errorf("error during compact: %v", err)
+	}
+
+	log.Infof("DB compaction of %v successful, %d -> %d bytes (gain=%.2fx)",
+		sourceFilePath, initialSize, newSize,
+		float64(initialSize)/float64(newSize))
+
+	log.Infof("Swapping old DB file from %v to %v", tempDestFilePath,
+		sourceFilePath)
+
+	// Finally, we'll attempt to atomically rename the temporary file to
+	// the main DB file. If this succeeds, then we'll only have a single
+	// file on disk once this method exits.
+	return os.Rename(tempDestFilePath, sourceFilePath)
 }
 
 // GetTestBackend opens (or creates if doesn't exist) a bbolt or etcd
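To make the new API surface concrete, here is a minimal, hypothetical usage sketch (not part of the diff itself). The field and function names come from BoltBackendConfig and GetBoltBackend above; the path, file name, and duration values are made-up examples. Note that AutoCompactMinAge is declared in the config but is not yet consulted by the compaction code in this diff.

package main

import (
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/channeldb/kvdb"
)

func main() {
	db, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
		DBPath:         "/tmp/lnd-compact-demo", // example path
		DBFileName:     "channel.db",            // example file name
		NoFreelistSync: true,
		// With AutoCompact set, GetBoltBackend runs compactAndSwap
		// on an existing DB file before opening it.
		AutoCompact: true,
		// Declared in BoltBackendConfig, but not read by the
		// compaction code added in this diff.
		AutoCompactMinAge: 7 * 24 * time.Hour,
	})
	if err != nil {
		fmt.Printf("unable to open DB: %v\n", err)
		return
	}
	defer db.Close()

	fmt.Println("DB opened (and compacted, if it already existed)")
}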
diff --git a/channeldb/kvdb/bolt_compact.go b/channeldb/kvdb/bolt_compact.go
new file mode 100644
index 00000000..da8f2798
--- /dev/null
+++ b/channeldb/kvdb/bolt_compact.go
@@ -0,0 +1,246 @@
+// The code in this file is an adapted version of the bbolt compact command
+// implemented in this file:
+// https://github.com/etcd-io/bbolt/blob/master/cmd/bbolt/main.go
+
+package kvdb
+
+import (
+	"fmt"
+	"os"
+	"path"
+
+	"github.com/lightningnetwork/lnd/healthcheck"
+	"go.etcd.io/bbolt"
+)
+
+const (
+	// defaultResultFileSizeMultiplier is the default multiplier we apply
+	// to the current database size to calculate how big it could possibly
+	// get after compacting, in case the database is already at its
+	// optimal size and compaction causes it to grow. This should normally
+	// not be the case but we really want to avoid not having enough disk
+	// space for the compaction, so we apply a safety margin of 10%.
+	defaultResultFileSizeMultiplier = float64(1.1)
+
+	// defaultTxMaxSize is the default maximum size (in bytes) of the
+	// key/value data that is copied within a single transaction before
+	// that transaction is committed and a new one is started.
+	defaultTxMaxSize = 65536
+
+	// bucketFillSize is the fill percent setting that is used for each
+	// new bucket that is created in the compacted database. This setting
+	// is not persisted and is therefore only effective for the compaction
+	// itself. Because during the compaction we only append data, a fill
+	// percent of 100% is optimal for performance.
+	bucketFillSize = 1.0
+)
+
+// compacter creates a compacted copy of the DB file at srcPath by writing it
+// to dstPath, committing at most txMaxSize bytes per transaction.
+type compacter struct {
+	srcPath   string
+	dstPath   string
+	txMaxSize int64
+}
+
+// execute opens the source and destination databases and then compacts the
+// source into destination and returns the size of both files as a result.
+func (cmd *compacter) execute() (int64, int64, error) {
+	if cmd.txMaxSize == 0 {
+		cmd.txMaxSize = defaultTxMaxSize
+	}
+
+	// Ensure source file exists.
+	fi, err := os.Stat(cmd.srcPath)
+	if err != nil {
+		return 0, 0, fmt.Errorf("error determining source database "+
+			"size: %v", err)
+	}
+	initialSize := fi.Size()
+	marginSize := float64(initialSize) * defaultResultFileSizeMultiplier
+
+	// Before opening any of the databases, let's first make sure we have
+	// enough free space on the destination file system to create a full
+	// copy of the source DB (worst-case scenario if the compaction doesn't
+	// actually shrink the file size).
+	destFolder := path.Dir(cmd.dstPath)
+	freeSpace, err := healthcheck.AvailableDiskSpace(destFolder)
+	if err != nil {
+		return 0, 0, fmt.Errorf("error determining free disk space on "+
+			"%s: %v", destFolder, err)
+	}
+	log.Debugf("Free disk space on compaction destination file system: "+
+		"%d bytes", freeSpace)
+	if freeSpace < uint64(marginSize) {
+		return 0, 0, fmt.Errorf("could not start compaction, "+
+			"destination folder %s only has %d bytes of free disk "+
+			"space available while we need at least %d for worst-"+
+			"case compaction", destFolder, freeSpace,
+			uint64(marginSize))
+	}
+
+	// Open source database. We open it in read-only mode to avoid (and
+	// fix) possible freelist sync problems.
+	src, err := bbolt.Open(cmd.srcPath, 0444, &bbolt.Options{
+		ReadOnly: true,
+	})
+	if err != nil {
+		return 0, 0, fmt.Errorf("error opening source database: %v",
+			err)
+	}
+	defer func() {
+		if err := src.Close(); err != nil {
+			log.Errorf("Compact error: closing source DB: %v", err)
+		}
+	}()
+
+	// Open destination database.
+	dst, err := bbolt.Open(cmd.dstPath, fi.Mode(), nil)
+	if err != nil {
+		return 0, 0, fmt.Errorf("error opening destination database: "+
+			"%v", err)
+	}
+	defer func() {
+		if err := dst.Close(); err != nil {
+			log.Errorf("Compact error: closing dest DB: %v", err)
+		}
+	}()
+
+	// Run compaction.
+	if err := cmd.compact(dst, src); err != nil {
+		return 0, 0, fmt.Errorf("error running compaction: %v", err)
+	}
+
+	// Report stats on new size.
+	fi, err = os.Stat(cmd.dstPath)
+	if err != nil {
+		return 0, 0, fmt.Errorf("error determining destination "+
+			"database size: %v", err)
+	} else if fi.Size() == 0 {
+		return 0, 0, fmt.Errorf("zero db size")
+	}
+
+	return initialSize, fi.Size(), nil
+}
+
+// compact tries to create a compacted copy of the source database in a new
+// destination database.
+func (cmd *compacter) compact(dst, src *bbolt.DB) error {
+	// Commit regularly, or we'll run out of memory for large datasets if
+	// using one transaction.
+	var size int64
+	tx, err := dst.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = tx.Rollback()
+	}()
+
+	if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
+		// On each key/value, check if we have exceeded tx size.
+		sz := int64(len(k) + len(v))
+		if size+sz > cmd.txMaxSize && cmd.txMaxSize != 0 {
+			// Commit previous transaction.
+			if err := tx.Commit(); err != nil {
+				return err
+			}
+
+			// Start new transaction.
+			tx, err = dst.Begin(true)
+			if err != nil {
+				return err
+			}
+			size = 0
+		}
+		size += sz
+
+		// Create bucket on the root transaction if this is the first
+		// level.
+		nk := len(keys)
+		if nk == 0 {
+			bkt, err := tx.CreateBucket(k)
+			if err != nil {
+				return err
+			}
+			if err := bkt.SetSequence(seq); err != nil {
+				return err
+			}
+			return nil
+		}
+
+		// Create buckets on subsequent levels, if necessary.
+		b := tx.Bucket(keys[0])
+		if nk > 1 {
+			for _, k := range keys[1:] {
+				b = b.Bucket(k)
+			}
+		}
+
+		// Fill the entire page for best compaction.
+		b.FillPercent = bucketFillSize
+
+		// If there is no value then this is a bucket call.
+		if v == nil {
+			bkt, err := b.CreateBucket(k)
+			if err != nil {
+				return err
+			}
+			if err := bkt.SetSequence(seq); err != nil {
+				return err
+			}
+			return nil
+		}
+
+		// Otherwise treat it as a key/value pair.
+		return b.Put(k, v)
+	}); err != nil {
+		return err
+	}
+
+	return tx.Commit()
+}
+
+// walkFunc is the type of the function called for keys (buckets and "normal"
+// values) discovered by Walk. keys is the list of keys to descend to the
+// bucket owning the discovered key/value pair k/v.
+type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
+
+// walk recursively walks the bolt database db, calling walkFn for each key
+// it finds.
+func (cmd *compacter) walk(db *bbolt.DB, walkFn walkFunc) error {
+	return db.View(func(tx *bbolt.Tx) error {
+		return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
+			// This will log the top level buckets only to give
+			// the user some sense of progress.
+			log.Debugf("Compacting top level bucket %s", name)
+
+			return cmd.walkBucket(
+				b, nil, name, nil, b.Sequence(), walkFn,
+			)
+		})
+	})
+}
+
+// walkBucket recursively walks through a bucket.
+func (cmd *compacter) walkBucket(b *bbolt.Bucket, keyPath [][]byte, k, v []byte,
+	seq uint64, fn walkFunc) error {
+
+	// Execute callback.
+	if err := fn(keyPath, k, v, seq); err != nil {
+		return err
+	}
+
+	// If this is not a bucket then stop.
+	if v != nil {
+		return nil
+	}
+
+	// Iterate over each child key/value.
+	keyPath = append(keyPath, k)
+	return b.ForEach(func(k, v []byte) error {
+		if v == nil {
+			bkt := b.Bucket(k)
+			return cmd.walkBucket(
				bkt, keyPath, k, nil, bkt.Sequence(), fn,
+			)
+		}
+		return cmd.walkBucket(b, keyPath, k, v, b.Sequence(), fn)
+	})
+}
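The FillPercent trick above is easy to miss, so here is a small standalone sketch of the same pattern against the public bbolt API (the file name and key layout are invented for illustration): because the compacter inserts keys in sorted order, setting a bucket's fill percent to 100% packs leaf pages tightly instead of reserving headroom for future page splits.

package main

import (
	"fmt"

	"go.etcd.io/bbolt"
)

func main() {
	db, err := bbolt.Open("/tmp/fill-demo.db", 0600, nil) // example path
	if err != nil {
		fmt.Println(err)
		return
	}
	defer db.Close()

	err = db.Update(func(tx *bbolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("demo"))
		if err != nil {
			return err
		}

		// The same setting the compacter applies to every bucket it
		// recreates; only beneficial because we append in key order.
		b.FillPercent = 1.0

		for i := 0; i < 1000; i++ {
			key := []byte(fmt.Sprintf("key-%06d", i)) // sorted keys
			if err := b.Put(key, []byte("value")); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		fmt.Println(err)
	}
}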
diff --git a/channeldb/kvdb/log.go b/channeldb/kvdb/log.go
new file mode 100644
index 00000000..628d48be
--- /dev/null
+++ b/channeldb/kvdb/log.go
@@ -0,0 +1,12 @@
+package kvdb
+
+import "github.com/btcsuite/btclog"
+
+// log is a logger that is initialized as disabled. This means the package will
+// not perform any logging by default until a logger is set.
+var log = btclog.Disabled
+
+// UseLogger uses a specified Logger to output package logging info.
+func UseLogger(logger btclog.Logger) {
+	log = logger
+}
diff --git a/channeldb/log.go b/channeldb/log.go
index 75ba2a5f..92d8c3e7 100644
--- a/channeldb/log.go
+++ b/channeldb/log.go
@@ -3,6 +3,7 @@ package channeldb
 import (
 	"github.com/btcsuite/btclog"
 	"github.com/lightningnetwork/lnd/build"
+	"github.com/lightningnetwork/lnd/channeldb/kvdb"
 	mig "github.com/lightningnetwork/lnd/channeldb/migration"
 	"github.com/lightningnetwork/lnd/channeldb/migration12"
 	"github.com/lightningnetwork/lnd/channeldb/migration13"
@@ -35,4 +36,5 @@ func UseLogger(logger btclog.Logger) {
 	migration12.UseLogger(logger)
 	migration13.UseLogger(logger)
 	migration16.UseLogger(logger)
+	kvdb.UseLogger(logger)
 }
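For completeness, a hypothetical wiring example (again not part of this diff) showing how the new kvdb logger gets activated through the updated channeldb.UseLogger; the subsystem tag and output writer here are arbitrary choices for the sketch.

package main

import (
	"os"

	"github.com/btcsuite/btclog"
	"github.com/lightningnetwork/lnd/channeldb"
)

func main() {
	// Build a btclog logger backed by stdout; "CHDB" is an arbitrary
	// subsystem tag for this sketch.
	backend := btclog.NewBackend(os.Stdout)
	logger := backend.Logger("CHDB")
	logger.SetLevel(btclog.LevelDebug)

	// channeldb.UseLogger now fans the logger out to the kvdb package as
	// well, enabling the compaction logs added in this diff (e.g.
	// "Compacting database file at ...").
	channeldb.UseLogger(logger)
}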
diff --git a/go.mod b/go.mod
index 464bf779..998b54bf 100644
--- a/go.mod
+++ b/go.mod
@@ -63,6 +63,7 @@ require (
 	github.com/tv42/zbase32 v0.0.0-20160707012821-501572607d02
 	github.com/urfave/cli v1.18.0
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
+	go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
 	go.uber.org/zap v1.14.1 // indirect
 	golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899
 	golang.org/x/net v0.0.0-20191002035440-2ec189313ef0
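Finally, a worked example (with made-up sizes) of the worst-case disk space guard in compacter.execute: with the 1.1 multiplier, a 500 MiB database needs 550 MiB free on the destination file system, so 520 MiB of free space is rejected even though a plain copy would fit.

package main

import "fmt"

func main() {
	const multiplier = 1.1 // defaultResultFileSizeMultiplier above

	initialSize := int64(500 * 1024 * 1024) // 500 MiB source DB (example)
	marginSize := float64(initialSize) * multiplier

	freeSpace := uint64(520 * 1024 * 1024) // free bytes reported (example)
	if freeSpace < uint64(marginSize) {
		fmt.Printf("refusing compaction: need %d bytes free, have %d\n",
			uint64(marginSize), freeSpace)
		return
	}
	fmt.Println("enough free space, compaction can start")
}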