channeldb: implement compaction
This commit adds the compaction feature of the bbolt compact command to our bolt backend. On startup, we try to open the DB in read-only mode and create a compacted copy of it in a temporary file. Once the compaction is successful, the temporary file and the old source file are swapped atomically.
parent 35c1fad517
commit 505be0d8bc
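For context, the sketch below shows how a caller could opt into the new auto-compaction when opening the bolt backend. The BoltBackendConfig fields and GetBoltBackend are taken from the diff below; the paths, the duration and the main wrapper are illustrative only.

package main

import (
	"log"
	"time"

	"github.com/lightningnetwork/lnd/channeldb/kvdb"
)

func main() {
	// Field names come from the BoltBackendConfig in this diff; the
	// concrete path, file name and min-age value are made up for the
	// example.
	cfg := &kvdb.BoltBackendConfig{
		DBPath:            "/tmp/lnd-test",
		DBFileName:        "channel.db",
		NoFreelistSync:    true,
		AutoCompact:       true,
		AutoCompactMinAge: 7 * 24 * time.Hour,
	}

	// With AutoCompact set and an existing DB file on disk, GetBoltBackend
	// first compacts the file into a temporary copy, swaps it in place and
	// then opens it as a regular kvdb.Backend.
	db, err := kvdb.GetBoltBackend(cfg)
	if err != nil {
		log.Fatalf("unable to open DB: %v", err)
	}
	defer db.Close()
}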
@@ -9,6 +9,13 @@ import (
 	_ "github.com/btcsuite/btcwallet/walletdb/bdb" // Import to register backend.
 )
 
+const (
+	// DefaultTempDBFileName is the default name of the temporary bolt DB
+	// file that we'll use to atomically compact the primary DB file on
+	// startup.
+	DefaultTempDBFileName = "temp-dont-use.db"
+)
+
 // fileExists returns true if the file exists, and false otherwise.
 func fileExists(path string) bool {
 	if _, err := os.Stat(path); err != nil {
@@ -48,15 +55,12 @@ type BoltBackendConfig struct {
 	AutoCompactMinAge time.Duration
 }
 
-// GetBoltBackend opens (or creates if doesn't exits) a bbolt
-// backed database and returns a kvdb.Backend wrapping it.
+// GetBoltBackend opens (or creates if doesn't exits) a bbolt backed database
+// and returns a kvdb.Backend wrapping it.
 func GetBoltBackend(cfg *BoltBackendConfig) (Backend, error) {
 	dbFilePath := filepath.Join(cfg.DBPath, cfg.DBFileName)
-	var (
-		db  Backend
-		err error
-	)
+
 	// Is this a new database?
 	if !fileExists(dbFilePath) {
 		if !fileExists(cfg.DBPath) {
 			if err := os.MkdirAll(cfg.DBPath, 0700); err != nil {
@@ -64,16 +68,86 @@ func GetBoltBackend(cfg *BoltBackendConfig) (Backend, error) {
 			}
 		}
 
-		db, err = Create(BoltBackendName, dbFilePath, cfg.NoFreelistSync)
-	} else {
-		db, err = Open(BoltBackendName, dbFilePath, cfg.NoFreelistSync)
+		return Create(BoltBackendName, dbFilePath, cfg.NoFreelistSync)
 	}
 
+	// This is an existing database. We might want to compact it on startup
+	// to free up some space.
+	if cfg.AutoCompact {
+		if err := compactAndSwap(cfg); err != nil {
+			return nil, err
+		}
+	}
+
+	return Open(BoltBackendName, dbFilePath, cfg.NoFreelistSync)
+}
+
+// compactAndSwap will attempt to write a new temporary DB file to disk with
+// the compacted database content, then atomically swap (via rename) the old
+// file for the new file by updating the name of the new file to the old.
+func compactAndSwap(cfg *BoltBackendConfig) error {
+	sourceName := cfg.DBFileName
+
+	// If the main DB file isn't set, then we can't proceed.
+	if sourceName == "" {
+		return fmt.Errorf("cannot compact DB with empty name")
+	}
+	sourceFilePath := filepath.Join(cfg.DBPath, sourceName)
+	tempDestFilePath := filepath.Join(cfg.DBPath, DefaultTempDBFileName)
+
+	log.Infof("Compacting database file at %v", sourceFilePath)
+
+	// If the old temporary DB file still exists, then we'll delete it
+	// before proceeding.
+	if _, err := os.Stat(tempDestFilePath); err == nil {
+		log.Infof("Found old temp DB @ %v, removing before swap",
+			tempDestFilePath)
+
+		err = os.Remove(tempDestFilePath)
+		if err != nil {
+			return fmt.Errorf("unable to remove old temp DB file: "+
+				"%v", err)
+		}
+	}
+
+	// Now that we know the staging area is clear, we'll create the new
+	// temporary DB file and close it before we write the new DB to it.
+	tempFile, err := os.Create(tempDestFilePath)
 	if err != nil {
-		return nil, err
+		return fmt.Errorf("unable to create temp DB file: %v", err)
 	}
+	if err := tempFile.Close(); err != nil {
+		return fmt.Errorf("unable to close file: %v", err)
+	}
 
-	return db, nil
+	// With the file created, we'll start the compaction and remove the
+	// temporary file all together once this method exits.
+	defer func() {
+		// This will only succeed if the rename below fails. If the
+		// compaction is successful, the file won't exist on exit
+		// anymore so no need to log an error here.
+		_ = os.Remove(tempDestFilePath)
+	}()
+	c := &compacter{
+		srcPath: sourceFilePath,
+		dstPath: tempDestFilePath,
+	}
+	initialSize, newSize, err := c.execute()
+	if err != nil {
+		return fmt.Errorf("error during compact: %v", err)
+	}
+
+	log.Infof("DB compaction of %v successful, %d -> %d bytes (gain=%.2fx)",
+		sourceFilePath, initialSize, newSize,
+		float64(initialSize)/float64(newSize))
+
+	log.Infof("Swapping old DB file from %v to %v", tempDestFilePath,
+		sourceFilePath)
+
+	// Finally, we'll attempt to atomically rename the temporary file to
+	// the main back up file. If this succeeds, then we'll only have a
+	// single file on disk once this method exits.
+	return os.Rename(tempDestFilePath, sourceFilePath)
 }
 
 // GetTestBackend opens (or creates if doesn't exist) a bbolt or etcd

channeldb/kvdb/bolt_compact.go (new file)
@@ -0,0 +1,246 @@
// The code in this file is an adapted version of the bbolt compact command
// implemented in this file:
// https://github.com/etcd-io/bbolt/blob/master/cmd/bbolt/main.go

package kvdb

import (
	"fmt"
	"os"
	"path"

	"github.com/lightningnetwork/lnd/healthcheck"
	"go.etcd.io/bbolt"
)

const (
	// defaultResultFileSizeMultiplier is the default multiplier we apply to
	// the current database size to calculate how big it could possibly get
	// after compacting, in case the database is already at its optimal size
	// and compaction causes it to grow. This should normally not be the
	// case but we really want to avoid not having enough disk space for the
	// compaction, so we apply a safety margin of 10%.
	defaultResultFileSizeMultiplier = float64(1.1)

	// defaultTxMaxSize is the default maximum number of operations that
	// are allowed to be executed in a single transaction.
	defaultTxMaxSize = 65536

	// bucketFillSize is the fill size setting that is used for each new
	// bucket that is created in the compacted database. This setting is not
	// persisted and is therefore only effective for the compaction itself.
	// Because during the compaction we only append data a fill percent of
	// 100% is optimal for performance.
	bucketFillSize = 1.0
)

type compacter struct {
	srcPath   string
	dstPath   string
	txMaxSize int64
}

// execute opens the source and destination databases and then compacts the
// source into destination and returns the size of both files as a result.
func (cmd *compacter) execute() (int64, int64, error) {
	if cmd.txMaxSize == 0 {
		cmd.txMaxSize = defaultTxMaxSize
	}

	// Ensure source file exists.
	fi, err := os.Stat(cmd.srcPath)
	if err != nil {
		return 0, 0, fmt.Errorf("error determining source database "+
			"size: %v", err)
	}
	initialSize := fi.Size()
	marginSize := float64(initialSize) * defaultResultFileSizeMultiplier

	// Before opening any of the databases, let's first make sure we have
	// enough free space on the destination file system to create a full
	// copy of the source DB (worst-case scenario if the compaction doesn't
	// actually shrink the file size).
	destFolder := path.Dir(cmd.dstPath)
	freeSpace, err := healthcheck.AvailableDiskSpace(destFolder)
	if err != nil {
		return 0, 0, fmt.Errorf("error determining free disk space on "+
			"%s: %v", destFolder, err)
	}
	log.Debugf("Free disk space on compaction destination file system: "+
		"%d bytes", freeSpace)
	if freeSpace < uint64(marginSize) {
		return 0, 0, fmt.Errorf("could not start compaction, "+
			"destination folder %s only has %d bytes of free disk "+
			"space available while we need at least %d for worst-"+
			"case compaction", destFolder, freeSpace, initialSize)
	}

	// Open source database. We open it in read only mode to avoid (and fix)
	// possible freelist sync problems.
	src, err := bbolt.Open(cmd.srcPath, 0444, &bbolt.Options{
		ReadOnly: true,
	})
	if err != nil {
		return 0, 0, fmt.Errorf("error opening source database: %v",
			err)
	}
	defer func() {
		if err := src.Close(); err != nil {
			log.Errorf("Compact error: closing source DB: %v", err)
		}
	}()

	// Open destination database.
	dst, err := bbolt.Open(cmd.dstPath, fi.Mode(), nil)
	if err != nil {
		return 0, 0, fmt.Errorf("error opening destination database: "+
			"%v", err)
	}
	defer func() {
		if err := dst.Close(); err != nil {
			log.Errorf("Compact error: closing dest DB: %v", err)
		}
	}()

	// Run compaction.
	if err := cmd.compact(dst, src); err != nil {
		return 0, 0, fmt.Errorf("error running compaction: %v", err)
	}

	// Report stats on new size.
	fi, err = os.Stat(cmd.dstPath)
	if err != nil {
		return 0, 0, fmt.Errorf("error determining destination "+
			"database size: %v", err)
	} else if fi.Size() == 0 {
		return 0, 0, fmt.Errorf("zero db size")
	}

	return initialSize, fi.Size(), nil
}

// compact tries to create a compacted copy of the source database in a new
// destination database.
func (cmd *compacter) compact(dst, src *bbolt.DB) error {
	// Commit regularly, or we'll run out of memory for large datasets if
	// using one transaction.
	var size int64
	tx, err := dst.Begin(true)
	if err != nil {
		return err
	}
	defer func() {
		_ = tx.Rollback()
	}()

	if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
		// On each key/value, check if we have exceeded tx size.
		sz := int64(len(k) + len(v))
		if size+sz > cmd.txMaxSize && cmd.txMaxSize != 0 {
			// Commit previous transaction.
			if err := tx.Commit(); err != nil {
				return err
			}

			// Start new transaction.
			tx, err = dst.Begin(true)
			if err != nil {
				return err
			}
			size = 0
		}
		size += sz

		// Create bucket on the root transaction if this is the first
		// level.
		nk := len(keys)
		if nk == 0 {
			bkt, err := tx.CreateBucket(k)
			if err != nil {
				return err
			}
			if err := bkt.SetSequence(seq); err != nil {
				return err
			}
			return nil
		}

		// Create buckets on subsequent levels, if necessary.
		b := tx.Bucket(keys[0])
		if nk > 1 {
			for _, k := range keys[1:] {
				b = b.Bucket(k)
			}
		}

		// Fill the entire page for best compaction.
		b.FillPercent = bucketFillSize

		// If there is no value then this is a bucket call.
		if v == nil {
			bkt, err := b.CreateBucket(k)
			if err != nil {
				return err
			}
			if err := bkt.SetSequence(seq); err != nil {
				return err
			}
			return nil
		}

		// Otherwise treat it as a key/value pair.
		return b.Put(k, v)
	}); err != nil {
		return err
	}

	return tx.Commit()
}

// walkFunc is the type of the function called for keys (buckets and "normal"
// values) discovered by Walk. keys is the list of keys to descend to the bucket
// owning the discovered key/value pair k/v.
type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error

// walk walks recursively the bolt database db, calling walkFn for each key it
// finds.
func (cmd *compacter) walk(db *bbolt.DB, walkFn walkFunc) error {
	return db.View(func(tx *bbolt.Tx) error {
		return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
			// This will log the top level buckets only to give the
			// user some sense of progress.
			log.Debugf("Compacting top level bucket %s", name)

			return cmd.walkBucket(
				b, nil, name, nil, b.Sequence(), walkFn,
			)
		})
	})
}

// walkBucket recursively walks through a bucket.
func (cmd *compacter) walkBucket(b *bbolt.Bucket, keyPath [][]byte, k, v []byte,
	seq uint64, fn walkFunc) error {

	// Execute callback.
	if err := fn(keyPath, k, v, seq); err != nil {
		return err
	}

	// If this is not a bucket then stop.
	if v != nil {
		return nil
	}

	// Iterate over each child key/value.
	keyPath = append(keyPath, k)
	return b.ForEach(func(k, v []byte) error {
		if v == nil {
			bkt := b.Bucket(k)
			return cmd.walkBucket(
				bkt, keyPath, k, nil, bkt.Sequence(), fn,
			)
		}
		return cmd.walkBucket(b, keyPath, k, v, b.Sequence(), fn)
	})
}
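For reference, here is a minimal sketch of how the compacter above could be driven directly, for example from a test inside package kvdb (compactOnce is a hypothetical helper, not part of this commit; compactAndSwap in the first hunk is the real caller):

// Sketch only: the compacter type is unexported, so this has to live in
// package kvdb. File names are placeholders.
func compactOnce(srcPath, dstPath string) error {
	c := &compacter{
		srcPath: srcPath, // existing, possibly bloated DB file
		dstPath: dstPath, // temporary file the compacted copy goes to
	}

	initialSize, newSize, err := c.execute()
	if err != nil {
		return fmt.Errorf("compaction failed: %v", err)
	}
	log.Infof("Compacted %d -> %d bytes", initialSize, newSize)

	// Renaming the temp file over the source completes the swap; the
	// rename is only atomic if both paths are on the same file system.
	return os.Rename(dstPath, srcPath)
}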

channeldb/kvdb/log.go (new file)
@@ -0,0 +1,12 @@
package kvdb

import "github.com/btcsuite/btclog"

// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled

// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
	log = logger
}
@@ -3,6 +3,7 @@ package channeldb
 import (
 	"github.com/btcsuite/btclog"
 	"github.com/lightningnetwork/lnd/build"
+	"github.com/lightningnetwork/lnd/channeldb/kvdb"
 	mig "github.com/lightningnetwork/lnd/channeldb/migration"
 	"github.com/lightningnetwork/lnd/channeldb/migration12"
 	"github.com/lightningnetwork/lnd/channeldb/migration13"
@@ -35,4 +36,5 @@ func UseLogger(logger btclog.Logger) {
 	migration12.UseLogger(logger)
 	migration13.UseLogger(logger)
 	migration16.UseLogger(logger)
+	kvdb.UseLogger(logger)
 }

go.mod
@@ -63,6 +63,7 @@ require (
 	github.com/tv42/zbase32 v0.0.0-20160707012821-501572607d02
 	github.com/urfave/cli v1.18.0
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
+	go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
 	go.uber.org/zap v1.14.1 // indirect
 	golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899
 	golang.org/x/net v0.0.0-20191002035440-2ec189313ef0