lnd.xprv/channeldb/db.go
Wilmer Paulino 70d3fc640a channeldb/db: prevent filtering channels with unconfirmed close txs
In this commit, we address an issue with the FetchWaitingCloseChannels
method where it would not properly return channels that are unconfirmed
and also have an unconfirmed closing transaction because they were
filtered out. We fix this by fetching channels that remain unconfirmed
that are also waiting for a confirmed closing transaction.

This will allow the recently added test TestFetchWaitingCloseChannels to
pass.
2019-01-03 20:48:28 -05:00

861 lines
23 KiB
Go

package channeldb
import (
"bytes"
"encoding/binary"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/lnwire"
)
const (
dbName = "channel.db"
dbFilePermission = 0600
)
// migration is a function which takes a prior outdated version of the database
// instances and mutates the key/bucket structure to arrive at a more
// up-to-date version of the database.
type migration func(tx *bbolt.Tx) error
type version struct {
number uint32
migration migration
}
var (
// dbVersions is storing all versions of database. If current version
// of database don't match with latest version this list will be used
// for retrieving all migration function that are need to apply to the
// current db.
dbVersions = []version{
{
// The base DB version requires no migration.
number: 0,
migration: nil,
},
{
// The version of the database where two new indexes
// for the update time of node and channel updates were
// added.
number: 1,
migration: migrateNodeAndEdgeUpdateIndex,
},
{
// The DB version that added the invoice event time
// series.
number: 2,
migration: migrateInvoiceTimeSeries,
},
{
// The DB version that updated the embedded invoice in
// outgoing payments to match the new format.
number: 3,
migration: migrateInvoiceTimeSeriesOutgoingPayments,
},
{
// The version of the database where every channel
// always has two entries in the edges bucket. If
// a policy is unknown, this will be represented
// by a special byte sequence.
number: 4,
migration: migrateEdgePolicies,
},
{
// The DB version where we persist each attempt to send
// an HTLC to a payment hash, and track whether the
// payment is in-flight, succeeded, or failed.
number: 5,
migration: paymentStatusesMigration,
},
{
// The DB version that properly prunes stale entries
// from the edge update index.
number: 6,
migration: migratePruneEdgeUpdateIndex,
},
{
// The DB version that migrates the ChannelCloseSummary
// to a format where optional fields are indicated with
// boolean flags.
number: 7,
migration: migrateOptionalChannelCloseSummaryFields,
},
}
// Big endian is the preferred byte order, due to cursor scans over
// integer keys iterating in order.
byteOrder = binary.BigEndian
)
var bufPool = &sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
// DB is the primary datastore for the lnd daemon. The database stores
// information related to nodes, routing data, open/closed channels, fee
// schedules, and reputation data.
type DB struct {
*bbolt.DB
dbPath string
}
// Open opens an existing channeldb. Any necessary schemas migrations due to
// updates will take place as necessary.
func Open(dbPath string) (*DB, error) {
path := filepath.Join(dbPath, dbName)
if !fileExists(path) {
if err := createChannelDB(dbPath); err != nil {
return nil, err
}
}
bdb, err := bbolt.Open(path, dbFilePermission, nil)
if err != nil {
return nil, err
}
chanDB := &DB{
DB: bdb,
dbPath: dbPath,
}
// Synchronize the version of database and apply migrations if needed.
if err := chanDB.syncVersions(dbVersions); err != nil {
bdb.Close()
return nil, err
}
return chanDB, nil
}
// Path returns the file path to the channel database.
func (d *DB) Path() string {
return d.dbPath
}
// Wipe completely deletes all saved state within all used buckets within the
// database. The deletion is done in a single transaction, therefore this
// operation is fully atomic.
func (d *DB) Wipe() error {
return d.Update(func(tx *bbolt.Tx) error {
err := tx.DeleteBucket(openChannelBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(closedChannelBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(invoiceBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(nodeInfoBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(nodeBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(edgeBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(edgeIndexBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(graphMetaBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
return nil
})
}
// createChannelDB creates and initializes a fresh version of channeldb. In
// the case that the target path has not yet been created or doesn't yet exist,
// then the path is created. Additionally, all required top-level buckets used
// within the database are created.
func createChannelDB(dbPath string) error {
if !fileExists(dbPath) {
if err := os.MkdirAll(dbPath, 0700); err != nil {
return err
}
}
path := filepath.Join(dbPath, dbName)
bdb, err := bbolt.Open(path, dbFilePermission, nil)
if err != nil {
return err
}
err = bdb.Update(func(tx *bbolt.Tx) error {
if _, err := tx.CreateBucket(openChannelBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(closedChannelBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(forwardingLogBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(fwdPackagesKey); err != nil {
return err
}
if _, err := tx.CreateBucket(invoiceBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(paymentBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(nodeInfoBucket); err != nil {
return err
}
nodes, err := tx.CreateBucket(nodeBucket)
if err != nil {
return err
}
_, err = nodes.CreateBucket(aliasIndexBucket)
if err != nil {
return err
}
_, err = nodes.CreateBucket(nodeUpdateIndexBucket)
if err != nil {
return err
}
edges, err := tx.CreateBucket(edgeBucket)
if err != nil {
return err
}
if _, err := edges.CreateBucket(edgeIndexBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(edgeUpdateIndexBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(channelPointBucket); err != nil {
return err
}
graphMeta, err := tx.CreateBucket(graphMetaBucket)
if err != nil {
return err
}
_, err = graphMeta.CreateBucket(pruneLogBucket)
if err != nil {
return err
}
if _, err := tx.CreateBucket(metaBucket); err != nil {
return err
}
meta := &Meta{
DbVersionNumber: getLatestDBVersion(dbVersions),
}
return putMeta(meta, tx)
})
if err != nil {
return fmt.Errorf("unable to create new channeldb")
}
return bdb.Close()
}
// fileExists returns true if the file exists, and false otherwise.
func fileExists(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// FetchOpenChannels starts a new database transaction and returns all stored
// currently active/open channels associated with the target nodeID. In the case
// that no active channels are known to have been created with this node, then a
// zero-length slice is returned.
func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
var channels []*OpenChannel
err := d.View(func(tx *bbolt.Tx) error {
var err error
channels, err = d.fetchOpenChannels(tx, nodeID)
return err
})
return channels, err
}
// fetchOpenChannels uses and existing database transaction and returns all
// stored currently active/open channels associated with the target nodeID. In
// the case that no active channels are known to have been created with this
// node, then a zero-length slice is returned.
func (d *DB) fetchOpenChannels(tx *bbolt.Tx,
nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
// Get the bucket dedicated to storing the metadata for open channels.
openChanBucket := tx.Bucket(openChannelBucket)
if openChanBucket == nil {
return nil, nil
}
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
pub := nodeID.SerializeCompressed()
nodeChanBucket := openChanBucket.Bucket(pub)
if nodeChanBucket == nil {
return nil, nil
}
// Next, we'll need to go down an additional layer in order to retrieve
// the channels for each chain the node knows of.
var channels []*OpenChannel
err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
// If there's a value, it's not a bucket so ignore it.
if v != nil {
return nil
}
// If we've found a valid chainhash bucket, then we'll retrieve
// that so we can extract all the channels.
chainBucket := nodeChanBucket.Bucket(chainHash)
if chainBucket == nil {
return fmt.Errorf("unable to read bucket for chain=%x",
chainHash[:])
}
// Finally, we both of the necessary buckets retrieved, fetch
// all the active channels related to this node.
nodeChannels, err := d.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read channel for "+
"chain_hash=%x, node_key=%x: %v",
chainHash[:], pub, err)
}
channels = append(channels, nodeChannels...)
return nil
})
return channels, err
}
// fetchNodeChannels retrieves all active channels from the target chainBucket
// which is under a node's dedicated channel bucket. This function is typically
// used to fetch all the active channels related to a particular node.
func (d *DB) fetchNodeChannels(chainBucket *bbolt.Bucket) ([]*OpenChannel, error) {
var channels []*OpenChannel
// A node may have channels on several chains, so for each known chain,
// we'll extract all the channels.
err := chainBucket.ForEach(func(chanPoint, v []byte) error {
// If there's a value, it's not a bucket so ignore it.
if v != nil {
return nil
}
// Once we've found a valid channel bucket, we'll extract it
// from the node's chain bucket.
chanBucket := chainBucket.Bucket(chanPoint)
var outPoint wire.OutPoint
err := readOutpoint(bytes.NewReader(chanPoint), &outPoint)
if err != nil {
return err
}
oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
if err != nil {
return fmt.Errorf("unable to read channel data for "+
"chan_point=%v: %v", outPoint, err)
}
oChannel.Db = d
channels = append(channels, oChannel)
return nil
})
if err != nil {
return nil, err
}
return channels, nil
}
// FetchAllChannels attempts to retrieve all open channels currently stored
// within the database, including pending open, fully open and channels waiting
// for a closing transaction to confirm.
func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {
var channels []*OpenChannel
// TODO(halseth): fetch all in one db tx.
openChannels, err := d.FetchAllOpenChannels()
if err != nil {
return nil, err
}
channels = append(channels, openChannels...)
pendingChannels, err := d.FetchPendingChannels()
if err != nil {
return nil, err
}
channels = append(channels, pendingChannels...)
waitingClose, err := d.FetchWaitingCloseChannels()
if err != nil {
return nil, err
}
channels = append(channels, waitingClose...)
return channels, nil
}
// FetchAllOpenChannels will return all channels that have the funding
// transaction confirmed, and is not waiting for a closing transaction to be
// confirmed.
func (d *DB) FetchAllOpenChannels() ([]*OpenChannel, error) {
return fetchChannels(d, false, false)
}
// FetchPendingChannels will return channels that have completed the process of
// generating and broadcasting funding transactions, but whose funding
// transactions have yet to be confirmed on the blockchain.
func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) {
return fetchChannels(d, true, false)
}
// FetchWaitingCloseChannels will return all channels that have been opened,
// but are now waiting for a closing transaction to be confirmed.
//
// NOTE: This includes channels that are also pending to be opened.
func (d *DB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
waitingClose, err := fetchChannels(d, false, true)
if err != nil {
return nil, err
}
pendingWaitingClose, err := fetchChannels(d, true, true)
if err != nil {
return nil, err
}
return append(waitingClose, pendingWaitingClose...), nil
}
// fetchChannels attempts to retrieve channels currently stored in the
// database. The pending parameter determines whether only pending channels
// will be returned, or only open channels will be returned. The waitingClose
// parameter determines whether only channels waiting for a closing transaction
// to be confirmed should be returned. If no active channels exist within the
// network, then ErrNoActiveChannels is returned.
func fetchChannels(d *DB, pending, waitingClose bool) ([]*OpenChannel, error) {
var channels []*OpenChannel
err := d.View(func(tx *bbolt.Tx) error {
// Get the bucket dedicated to storing the metadata for open
// channels.
openChanBucket := tx.Bucket(openChannelBucket)
if openChanBucket == nil {
return ErrNoActiveChannels
}
// Next, fetch the bucket dedicated to storing metadata related
// to all nodes. All keys within this bucket are the serialized
// public keys of all our direct counterparties.
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeMetaBucket == nil {
return fmt.Errorf("node bucket not created")
}
// Finally for each node public key in the bucket, fetch all
// the channels related to this particular node.
return nodeMetaBucket.ForEach(func(k, v []byte) error {
nodeChanBucket := openChanBucket.Bucket(k)
if nodeChanBucket == nil {
return nil
}
return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
// If there's a value, it's not a bucket so
// ignore it.
if v != nil {
return nil
}
// If we've found a valid chainhash bucket,
// then we'll retrieve that so we can extract
// all the channels.
chainBucket := nodeChanBucket.Bucket(chainHash)
if chainBucket == nil {
return fmt.Errorf("unable to read "+
"bucket for chain=%x", chainHash[:])
}
nodeChans, err := d.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read "+
"channel for chain_hash=%x, "+
"node_key=%x: %v", chainHash[:], k, err)
}
for _, channel := range nodeChans {
if channel.IsPending != pending {
continue
}
// If the channel is in any other state
// than Default, then it means it is
// waiting to be closed.
channelWaitingClose :=
channel.ChanStatus() != Default
// Only include it if we requested
// channels with the same waitingClose
// status.
if channelWaitingClose != waitingClose {
continue
}
channels = append(channels, channel)
}
return nil
})
})
})
if err != nil {
return nil, err
}
return channels, nil
}
// FetchClosedChannels attempts to fetch all closed channels from the database.
// The pendingOnly bool toggles if channels that aren't yet fully closed should
// be returned in the response or not. When a channel was cooperatively closed,
// it becomes fully closed after a single confirmation. When a channel was
// forcibly closed, it will become fully closed after _all_ the pending funds
// (if any) have been swept.
func (d *DB) FetchClosedChannels(pendingOnly bool) ([]*ChannelCloseSummary, error) {
var chanSummaries []*ChannelCloseSummary
if err := d.View(func(tx *bbolt.Tx) error {
closeBucket := tx.Bucket(closedChannelBucket)
if closeBucket == nil {
return ErrNoClosedChannels
}
return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
summaryReader := bytes.NewReader(summaryBytes)
chanSummary, err := deserializeCloseChannelSummary(summaryReader)
if err != nil {
return err
}
// If the query specified to only include pending
// channels, then we'll skip any channels which aren't
// currently pending.
if !chanSummary.IsPending && pendingOnly {
return nil
}
chanSummaries = append(chanSummaries, chanSummary)
return nil
})
}); err != nil {
return nil, err
}
return chanSummaries, nil
}
// ErrClosedChannelNotFound signals that a closed channel could not be found in
// the channeldb.
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
// FetchClosedChannel queries for a channel close summary using the channel
// point of the channel in question.
func (d *DB) FetchClosedChannel(chanID *wire.OutPoint) (*ChannelCloseSummary, error) {
var chanSummary *ChannelCloseSummary
if err := d.View(func(tx *bbolt.Tx) error {
closeBucket := tx.Bucket(closedChannelBucket)
if closeBucket == nil {
return ErrClosedChannelNotFound
}
var b bytes.Buffer
var err error
if err = writeOutpoint(&b, chanID); err != nil {
return err
}
summaryBytes := closeBucket.Get(b.Bytes())
if summaryBytes == nil {
return ErrClosedChannelNotFound
}
summaryReader := bytes.NewReader(summaryBytes)
chanSummary, err = deserializeCloseChannelSummary(summaryReader)
return err
}); err != nil {
return nil, err
}
return chanSummary, nil
}
// FetchClosedChannelForID queries for a channel close summary using the
// channel ID of the channel in question.
func (d *DB) FetchClosedChannelForID(cid lnwire.ChannelID) (
*ChannelCloseSummary, error) {
var chanSummary *ChannelCloseSummary
if err := d.View(func(tx *bbolt.Tx) error {
closeBucket := tx.Bucket(closedChannelBucket)
if closeBucket == nil {
return ErrClosedChannelNotFound
}
// The first 30 bytes of the channel ID and outpoint will be
// equal.
cursor := closeBucket.Cursor()
op, c := cursor.Seek(cid[:30])
// We scan over all possible candidates for this channel ID.
for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
var outPoint wire.OutPoint
err := readOutpoint(bytes.NewReader(op), &outPoint)
if err != nil {
return err
}
// If the found outpoint does not correspond to this
// channel ID, we continue.
if !cid.IsChanPoint(&outPoint) {
continue
}
// Deserialize the close summary and return.
r := bytes.NewReader(c)
chanSummary, err = deserializeCloseChannelSummary(r)
if err != nil {
return err
}
return nil
}
return ErrClosedChannelNotFound
}); err != nil {
return nil, err
}
return chanSummary, nil
}
// MarkChanFullyClosed marks a channel as fully closed within the database. A
// channel should be marked as fully closed if the channel was initially
// cooperatively closed and it's reached a single confirmation, or after all
// the pending funds in a channel that has been forcibly closed have been
// swept.
func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
return d.Update(func(tx *bbolt.Tx) error {
var b bytes.Buffer
if err := writeOutpoint(&b, chanPoint); err != nil {
return err
}
chanID := b.Bytes()
closedChanBucket, err := tx.CreateBucketIfNotExists(
closedChannelBucket,
)
if err != nil {
return err
}
chanSummaryBytes := closedChanBucket.Get(chanID)
if chanSummaryBytes == nil {
return fmt.Errorf("no closed channel for "+
"chan_point=%v found", chanPoint)
}
chanSummaryReader := bytes.NewReader(chanSummaryBytes)
chanSummary, err := deserializeCloseChannelSummary(
chanSummaryReader,
)
if err != nil {
return err
}
chanSummary.IsPending = false
var newSummary bytes.Buffer
err = serializeChannelCloseSummary(&newSummary, chanSummary)
if err != nil {
return err
}
err = closedChanBucket.Put(chanID, newSummary.Bytes())
if err != nil {
return err
}
// Now that the channel is closed, we'll check if we have any
// other open channels with this peer. If we don't we'll
// garbage collect it to ensure we don't establish persistent
// connections to peers without open channels.
return d.pruneLinkNode(tx, chanSummary.RemotePub)
})
}
// pruneLinkNode determines whether we should garbage collect a link node from
// the database due to no longer having any open channels with it. If there are
// any left, then this acts as a no-op.
func (d *DB) pruneLinkNode(tx *bbolt.Tx, remotePub *btcec.PublicKey) error {
openChannels, err := d.fetchOpenChannels(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to fetch open channels for peer %x: "+
"%v", remotePub.SerializeCompressed(), err)
}
if len(openChannels) > 0 {
return nil
}
log.Infof("Pruning link node %x with zero open channels from database",
remotePub.SerializeCompressed())
return d.deleteLinkNode(tx, remotePub)
}
// PruneLinkNodes attempts to prune all link nodes found within the databse with
// whom we no longer have any open channels with.
func (d *DB) PruneLinkNodes() error {
return d.Update(func(tx *bbolt.Tx) error {
linkNodes, err := d.fetchAllLinkNodes(tx)
if err != nil {
return err
}
for _, linkNode := range linkNodes {
err := d.pruneLinkNode(tx, linkNode.IdentityPub)
if err != nil {
return err
}
}
return nil
})
}
// syncVersions function is used for safe db version synchronization. It
// applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration.
func (d *DB) syncVersions(versions []version) error {
meta, err := d.FetchMeta(nil)
if err != nil {
if err == ErrMetaNotFound {
meta = &Meta{}
} else {
return err
}
}
latestVersion := getLatestDBVersion(versions)
log.Infof("Checking for schema update: latest_version=%v, "+
"db_version=%v", latestVersion, meta.DbVersionNumber)
switch {
// If the database reports a higher version that we are aware of, the
// user is probably trying to revert to a prior version of lnd. We fail
// here to prevent reversions and unintended corruption.
case meta.DbVersionNumber > latestVersion:
log.Errorf("Refusing to revert from db_version=%d to "+
"lower version=%d", meta.DbVersionNumber,
latestVersion)
return ErrDBReversion
// If the current database version matches the latest version number,
// then we don't need to perform any migrations.
case meta.DbVersionNumber == latestVersion:
return nil
}
log.Infof("Performing database schema migration")
// Otherwise, we fetch the migrations which need to applied, and
// execute them serially within a single database transaction to ensure
// the migration is atomic.
migrations, migrationVersions := getMigrationsToApply(
versions, meta.DbVersionNumber,
)
return d.Update(func(tx *bbolt.Tx) error {
for i, migration := range migrations {
if migration == nil {
continue
}
log.Infof("Applying migration #%v", migrationVersions[i])
if err := migration(tx); err != nil {
log.Infof("Unable to apply migration #%v",
migrationVersions[i])
return err
}
}
meta.DbVersionNumber = latestVersion
return putMeta(meta, tx)
})
}
// ChannelGraph returns a new instance of the directed channel graph.
func (d *DB) ChannelGraph() *ChannelGraph {
return &ChannelGraph{d}
}
func getLatestDBVersion(versions []version) uint32 {
return versions[len(versions)-1].number
}
// getMigrationsToApply retrieves the migration function that should be
// applied to the database.
func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) {
migrations := make([]migration, 0, len(versions))
migrationVersions := make([]uint32, 0, len(versions))
for _, v := range versions {
if v.number > version {
migrations = append(migrations, v.migration)
migrationVersions = append(migrationVersions, v.number)
}
}
return migrations, migrationVersions
}