555 lines
15 KiB
Go
555 lines
15 KiB
Go
package channeldb
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/boltdb/bolt"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
"github.com/roasbeef/btcd/btcec"
|
|
"github.com/roasbeef/btcd/wire"
|
|
)
|
|
|
|
const (
|
|
dbName = "channel.db"
|
|
dbFilePermission = 0600
|
|
)
|
|
|
|
// migration is a function which takes a prior outdated version of the database
|
|
// instances and mutates the key/bucket structure to arrive at a more
|
|
// up-to-date version of the database.
|
|
type migration func(tx *bolt.Tx) error
|
|
|
|
type version struct {
|
|
number uint32
|
|
migration migration
|
|
}
|
|
|
|
var (
|
|
// dbVersions is storing all versions of database. If current version
|
|
// of database don't match with latest version this list will be used
|
|
// for retrieving all migration function that are need to apply to the
|
|
// current db.
|
|
dbVersions = []version{
|
|
{
|
|
// The base DB version requires no migration.
|
|
number: 0,
|
|
migration: nil,
|
|
},
|
|
}
|
|
|
|
// Big endian is the preferred byte order, due to cursor scans over
|
|
// integer keys iterating in order.
|
|
byteOrder = binary.BigEndian
|
|
)
|
|
|
|
var bufPool = &sync.Pool{
|
|
New: func() interface{} { return new(bytes.Buffer) },
|
|
}
|
|
|
|
// DB is the primary datastore for the lnd daemon. The database stores
|
|
// information related to nodes, routing data, open/closed channels, fee
|
|
// schedules, and reputation data.
|
|
type DB struct {
|
|
*bolt.DB
|
|
dbPath string
|
|
}
|
|
|
|
// Open opens an existing channeldb. Any necessary schemas migrations due to
|
|
// updates will take place as necessary.
|
|
func Open(dbPath string) (*DB, error) {
|
|
path := filepath.Join(dbPath, dbName)
|
|
|
|
if !fileExists(path) {
|
|
if err := createChannelDB(dbPath); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
bdb, err := bolt.Open(path, dbFilePermission, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
chanDB := &DB{
|
|
DB: bdb,
|
|
dbPath: dbPath,
|
|
}
|
|
|
|
// Synchronize the version of database and apply migrations if needed.
|
|
if err := chanDB.syncVersions(dbVersions); err != nil {
|
|
bdb.Close()
|
|
return nil, err
|
|
}
|
|
|
|
return chanDB, nil
|
|
}
|
|
|
|
// Wipe completely deletes all saved state within all used buckets within the
|
|
// database. The deletion is done in a single transaction, therefore this
|
|
// operation is fully atomic.
|
|
func (d *DB) Wipe() error {
|
|
return d.Update(func(tx *bolt.Tx) error {
|
|
err := tx.DeleteBucket(openChannelBucket)
|
|
if err != nil && err != bolt.ErrBucketNotFound {
|
|
return err
|
|
}
|
|
|
|
err = tx.DeleteBucket(closedChannelBucket)
|
|
if err != nil && err != bolt.ErrBucketNotFound {
|
|
return err
|
|
}
|
|
|
|
err = tx.DeleteBucket(invoiceBucket)
|
|
if err != nil && err != bolt.ErrBucketNotFound {
|
|
return err
|
|
}
|
|
|
|
err = tx.DeleteBucket(nodeInfoBucket)
|
|
if err != nil && err != bolt.ErrBucketNotFound {
|
|
return err
|
|
}
|
|
|
|
err = tx.DeleteBucket(nodeBucket)
|
|
if err != nil && err != bolt.ErrBucketNotFound {
|
|
return err
|
|
}
|
|
err = tx.DeleteBucket(edgeBucket)
|
|
if err != nil && err != bolt.ErrBucketNotFound {
|
|
return err
|
|
}
|
|
err = tx.DeleteBucket(edgeIndexBucket)
|
|
if err != nil && err != bolt.ErrBucketNotFound {
|
|
return err
|
|
}
|
|
err = tx.DeleteBucket(graphMetaBucket)
|
|
if err != nil && err != bolt.ErrBucketNotFound {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// createChannelDB creates and initializes a fresh version of channeldb. In
|
|
// the case that the target path has not yet been created or doesn't yet exist,
|
|
// then the path is created. Additionally, all required top-level buckets used
|
|
// within the database are created.
|
|
func createChannelDB(dbPath string) error {
|
|
if !fileExists(dbPath) {
|
|
if err := os.MkdirAll(dbPath, 0700); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
path := filepath.Join(dbPath, dbName)
|
|
bdb, err := bolt.Open(path, dbFilePermission, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = bdb.Update(func(tx *bolt.Tx) error {
|
|
if _, err := tx.CreateBucket(openChannelBucket); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.CreateBucket(closedChannelBucket); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.CreateBucket(invoiceBucket); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.CreateBucket(nodeInfoBucket); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.CreateBucket(nodeBucket); err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.CreateBucket(edgeBucket); err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.CreateBucket(edgeIndexBucket); err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.CreateBucket(graphMetaBucket); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.CreateBucket(metaBucket); err != nil {
|
|
return err
|
|
}
|
|
|
|
meta := &Meta{
|
|
DbVersionNumber: getLatestDBVersion(dbVersions),
|
|
}
|
|
return putMeta(meta, tx)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("unable to create new channeldb")
|
|
}
|
|
|
|
return bdb.Close()
|
|
}
|
|
|
|
// fileExists returns true if the file exists, and false otherwise.
|
|
func fileExists(path string) bool {
|
|
if _, err := os.Stat(path); err != nil {
|
|
if os.IsNotExist(err) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// FetchOpenChannels returns all stored currently active/open channels
|
|
// associated with the target nodeID. In the case that no active channels are
|
|
// known to have been created with this node, then a zero-length slice is
|
|
// returned.
|
|
func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
|
|
var channels []*OpenChannel
|
|
err := d.View(func(tx *bolt.Tx) error {
|
|
// Get the bucket dedicated to storing the metadata for open
|
|
// channels.
|
|
openChanBucket := tx.Bucket(openChannelBucket)
|
|
if openChanBucket == nil {
|
|
return nil
|
|
}
|
|
|
|
// Within this top level bucket, fetch the bucket dedicated to storing
|
|
// open channel data specific to the remote node.
|
|
pub := nodeID.SerializeCompressed()
|
|
nodeChanBucket := openChanBucket.Bucket(pub)
|
|
if nodeChanBucket == nil {
|
|
return nil
|
|
}
|
|
|
|
// Finally, we both of the necessary buckets retrieved, fetch
|
|
// all the active channels related to this node.
|
|
nodeChannels, err := d.fetchNodeChannels(openChanBucket,
|
|
nodeChanBucket)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to read channel for "+
|
|
"node_key=%x: %v", pub, err)
|
|
}
|
|
|
|
channels = nodeChannels
|
|
return nil
|
|
})
|
|
|
|
return channels, err
|
|
}
|
|
|
|
// fetchNodeChannels retrieves all active channels from the target
|
|
// nodeChanBucket. This function is typically used to fetch all the active
|
|
// channels related to a particular node.
|
|
func (d *DB) fetchNodeChannels(openChanBucket,
|
|
nodeChanBucket *bolt.Bucket) ([]*OpenChannel, error) {
|
|
|
|
var channels []*OpenChannel
|
|
|
|
// Once we have the node's channel bucket, iterate through each
|
|
// item in the inner chan ID bucket. This bucket acts as an
|
|
// index for all channels we currently have open with this node.
|
|
nodeChanIDBucket := nodeChanBucket.Bucket(chanIDBucket[:])
|
|
if nodeChanIDBucket == nil {
|
|
return nil, nil
|
|
}
|
|
err := nodeChanIDBucket.ForEach(func(k, v []byte) error {
|
|
if k == nil {
|
|
return nil
|
|
}
|
|
|
|
outBytes := bytes.NewReader(k)
|
|
chanID := &wire.OutPoint{}
|
|
if err := readOutpoint(outBytes, chanID); err != nil {
|
|
return err
|
|
}
|
|
|
|
oChannel, err := fetchOpenChannel(openChanBucket,
|
|
nodeChanBucket, chanID)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to read channel data for "+
|
|
"chan_point=%v: %v", chanID, err)
|
|
}
|
|
oChannel.Db = d
|
|
|
|
channels = append(channels, oChannel)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return channels, nil
|
|
}
|
|
|
|
// FetchAllChannels attempts to retrieve all open channels currently stored
|
|
// within the database.
|
|
func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {
|
|
return fetchChannels(d, false)
|
|
}
|
|
|
|
// FetchPendingChannels will return channels that have completed the process
|
|
// of generating and broadcasting funding transactions, but whose funding
|
|
// transactions have yet to be confirmed on the blockchain.
|
|
func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) {
|
|
return fetchChannels(d, true)
|
|
}
|
|
|
|
// fetchChannels attempts to retrieve channels currently stored in the
|
|
// database. The pendingOnly parameter determines whether only pending
|
|
// channels will be returned. If no active channels exist within the network,
|
|
// then ErrNoActiveChannels is returned.
|
|
func fetchChannels(d *DB, pendingOnly bool) ([]*OpenChannel, error) {
|
|
var channels []*OpenChannel
|
|
|
|
err := d.View(func(tx *bolt.Tx) error {
|
|
// Get the bucket dedicated to storing the metadata for open
|
|
// channels.
|
|
openChanBucket := tx.Bucket(openChannelBucket)
|
|
if openChanBucket == nil {
|
|
return ErrNoActiveChannels
|
|
}
|
|
|
|
// Next, fetch the bucket dedicated to storing metadata
|
|
// related to all nodes. All keys within this bucket are the
|
|
// serialized public keys of all our direct counterparties.
|
|
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
|
|
if nodeMetaBucket == nil {
|
|
return fmt.Errorf("node bucket not created")
|
|
}
|
|
|
|
// Finally for each node public key in the bucket, fetch all
|
|
// the channels related to this particualr ndoe.
|
|
return nodeMetaBucket.ForEach(func(k, v []byte) error {
|
|
nodeChanBucket := openChanBucket.Bucket(k)
|
|
if nodeChanBucket == nil {
|
|
return nil
|
|
}
|
|
|
|
nodeChannels, err := d.fetchNodeChannels(openChanBucket,
|
|
nodeChanBucket)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to read channel for "+
|
|
"node_key=%x: %v", k, err)
|
|
}
|
|
// TODO(roasbeef): simplify
|
|
if pendingOnly {
|
|
for _, channel := range nodeChannels {
|
|
if channel.IsPending {
|
|
channels = append(channels, channel)
|
|
}
|
|
}
|
|
} else {
|
|
channels = append(channels, nodeChannels...)
|
|
}
|
|
return nil
|
|
})
|
|
})
|
|
|
|
return channels, err
|
|
}
|
|
|
|
// MarkChannelAsOpen records the finalization of the funding process and marks
|
|
// a channel as available for use. Additionally the height in which this
|
|
// channel as opened will also be recorded within the database.
|
|
func (d *DB) MarkChannelAsOpen(outpoint *wire.OutPoint,
|
|
openLoc lnwire.ShortChannelID) error {
|
|
|
|
return d.Update(func(tx *bolt.Tx) error {
|
|
openChanBucket := tx.Bucket(openChannelBucket)
|
|
if openChanBucket == nil {
|
|
return ErrNoActiveChannels
|
|
}
|
|
|
|
// Generate the database key, which will consist of the
|
|
// IsPending prefix followed by the channel's outpoint.
|
|
var b bytes.Buffer
|
|
if err := writeOutpoint(&b, outpoint); err != nil {
|
|
return err
|
|
}
|
|
keyPrefix := make([]byte, 3+b.Len())
|
|
copy(keyPrefix[3:], b.Bytes())
|
|
copy(keyPrefix[:3], isPendingPrefix)
|
|
|
|
// For the database value, store a zero, since the channel is
|
|
// no longer pending.
|
|
scratch := make([]byte, 4)
|
|
byteOrder.PutUint16(scratch[:2], uint16(0))
|
|
if err := openChanBucket.Put(keyPrefix, scratch[:2]); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Finally, we'll also store the opening height for this
|
|
// channel as well.
|
|
confInfoKey := make([]byte, len(confInfoPrefix)+len(b.Bytes()))
|
|
copy(confInfoKey[:len(confInfoPrefix)], confInfoPrefix)
|
|
copy(confInfoKey[len(confInfoPrefix):], b.Bytes())
|
|
|
|
confInfoBytes := openChanBucket.Get(confInfoKey)
|
|
infoCopy := make([]byte, len(confInfoBytes))
|
|
copy(infoCopy[:], confInfoBytes)
|
|
|
|
byteOrder.PutUint64(infoCopy[4:], openLoc.ToUint64())
|
|
|
|
return openChanBucket.Put(confInfoKey, infoCopy)
|
|
})
|
|
}
|
|
|
|
// FetchClosedChannels attempts to fetch all closed channels from the database.
|
|
// The pendingOnly bool toggles if channels that aren't yet fully closed should
|
|
// be returned int he response or not. When a channel was cooperatively closed,
|
|
// it becomes fully closed after a single confirmation. When a channel was
|
|
// forcibly closed, it will become fully closed after _all_ the pending funds
|
|
// (if any) have been swept.
|
|
func (d *DB) FetchClosedChannels(pendingOnly bool) ([]*ChannelCloseSummary, error) {
|
|
var chanSummaries []*ChannelCloseSummary
|
|
|
|
if err := d.View(func(tx *bolt.Tx) error {
|
|
closeBucket := tx.Bucket(closedChannelBucket)
|
|
if closeBucket == nil {
|
|
return ErrNoClosedChannels
|
|
}
|
|
|
|
return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
|
|
// The first byte of the summary is a bool which
|
|
// indicates if this channel is pending closure, or has
|
|
// been fully closed.
|
|
isPending := summaryBytes[0]
|
|
|
|
// If the query specified to only include pending
|
|
// channels, then we'll skip any channels which aren't
|
|
// currently pending.
|
|
if pendingOnly && isPending != 0x01 {
|
|
return nil
|
|
}
|
|
|
|
summaryReader := bytes.NewReader(summaryBytes)
|
|
chanSummary, err := deserializeCloseChannelSummary(summaryReader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
chanSummaries = append(chanSummaries, chanSummary)
|
|
return nil
|
|
})
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return chanSummaries, nil
|
|
}
|
|
|
|
// MarkChanFullyClosed marks a channel as fully closed within the database. A
|
|
// channel should be marked as fully closed if the channel was initially
|
|
// cooperatively closed and it's reach a single confirmation, or after all the
|
|
// pending funds in a channel that has been forcibly closed have been swept.
|
|
func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
|
|
return d.Update(func(tx *bolt.Tx) error {
|
|
var b bytes.Buffer
|
|
if err := writeOutpoint(&b, chanPoint); err != nil {
|
|
return err
|
|
}
|
|
|
|
chanID := b.Bytes()
|
|
|
|
closedChanBucket, err := tx.CreateBucketIfNotExists(closedChannelBucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
chanSummary := closedChanBucket.Get(chanID)
|
|
if chanSummary == nil {
|
|
return fmt.Errorf("no closed channel by that chanID found")
|
|
}
|
|
|
|
newSummary := make([]byte, len(chanSummary))
|
|
copy(newSummary[:], chanSummary[:])
|
|
newSummary[0] = 0x00
|
|
|
|
return closedChanBucket.Put(chanID, newSummary)
|
|
})
|
|
}
|
|
|
|
// syncVersions function is used for safe db version synchronization. It applies
|
|
// migration functions to the current database and recovers the previous
|
|
// state of db if at least one error/panic appeared during migration.
|
|
func (d *DB) syncVersions(versions []version) error {
|
|
meta, err := d.FetchMeta(nil)
|
|
if err != nil {
|
|
if err == ErrMetaNotFound {
|
|
meta = &Meta{}
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// If the current database version matches the latest version number,
|
|
// then we don't need to perform any migrations.
|
|
latestVersion := getLatestDBVersion(versions)
|
|
log.Infof("Checking for schema update: latest_version=%v, "+
|
|
"db_version=%v", latestVersion, meta.DbVersionNumber)
|
|
if meta.DbVersionNumber == latestVersion {
|
|
return nil
|
|
}
|
|
|
|
log.Infof("Performing database schema migration")
|
|
|
|
// Otherwise, we fetch the migrations which need to applied, and
|
|
// execute them serially within a single database transaction to ensure
|
|
// the migration is atomic.
|
|
migrations, migrationVersions := getMigrationsToApply(versions,
|
|
meta.DbVersionNumber)
|
|
return d.Update(func(tx *bolt.Tx) error {
|
|
for i, migration := range migrations {
|
|
if migration == nil {
|
|
continue
|
|
}
|
|
|
|
log.Infof("Applying migration #%v", migrationVersions[i])
|
|
|
|
if err := migration(tx); err != nil {
|
|
log.Infof("Unable to apply migration #%v",
|
|
migrationVersions[i])
|
|
return err
|
|
}
|
|
}
|
|
|
|
meta.DbVersionNumber = latestVersion
|
|
return putMeta(meta, tx)
|
|
})
|
|
}
|
|
|
|
// ChannelGraph returns a new instance of the directed channel graph.
|
|
func (d *DB) ChannelGraph() *ChannelGraph {
|
|
return &ChannelGraph{d}
|
|
}
|
|
|
|
func getLatestDBVersion(versions []version) uint32 {
|
|
return versions[len(versions)-1].number
|
|
}
|
|
|
|
// getMigrationsToApply retrieves the migration function that should be
|
|
// applied to the database.
|
|
func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) {
|
|
migrations := make([]migration, 0, len(versions))
|
|
migrationVersions := make([]uint32, 0, len(versions))
|
|
|
|
for _, v := range versions {
|
|
if v.number > version {
|
|
migrations = append(migrations, v.migration)
|
|
migrationVersions = append(migrationVersions, v.number)
|
|
}
|
|
}
|
|
|
|
return migrations, migrationVersions
|
|
}
|