lnd: convert to use new kvdb abstraction

This commit is contained in:
Olaoluwa Osuntokun 2020-01-09 18:47:38 -08:00
parent 21521ff610
commit 071c7cbe78
No known key found for this signature in database
GPG Key ID: BC13F65E2DC84465
5 changed files with 193 additions and 101 deletions

@ -13,11 +13,11 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
@ -1237,10 +1237,10 @@ func newRetributionStore(db *channeldb.DB) *retributionStore {
// Add adds a retribution state to the retributionStore, which is then persisted
// to disk.
func (rs *retributionStore) Add(ret *retributionInfo) error {
return rs.db.Update(func(tx *bbolt.Tx) error {
return kvdb.Update(rs.db, func(tx kvdb.RwTx) error {
// If this is our first contract breach, the retributionBucket
// won't exist, in which case, we just create a new bucket.
retBucket, err := tx.CreateBucketIfNotExists(retributionBucket)
retBucket, err := tx.CreateTopLevelBucket(retributionBucket)
if err != nil {
return err
}
@ -1264,8 +1264,8 @@ func (rs *retributionStore) Add(ret *retributionInfo) error {
// startup and re-register for confirmation notifications.
func (rs *retributionStore) Finalize(chanPoint *wire.OutPoint,
finalTx *wire.MsgTx) error {
return rs.db.Update(func(tx *bbolt.Tx) error {
justiceBkt, err := tx.CreateBucketIfNotExists(justiceTxnBucket)
return kvdb.Update(rs.db, func(tx kvdb.RwTx) error {
justiceBkt, err := tx.CreateTopLevelBucket(justiceTxnBucket)
if err != nil {
return err
}
@ -1291,8 +1291,8 @@ func (rs *retributionStore) GetFinalizedTxn(
chanPoint *wire.OutPoint) (*wire.MsgTx, error) {
var finalTxBytes []byte
if err := rs.db.View(func(tx *bbolt.Tx) error {
justiceBkt := tx.Bucket(justiceTxnBucket)
if err := kvdb.View(rs.db, func(tx kvdb.ReadTx) error {
justiceBkt := tx.ReadBucket(justiceTxnBucket)
if justiceBkt == nil {
return nil
}
@ -1325,8 +1325,8 @@ func (rs *retributionStore) GetFinalizedTxn(
// that has already been breached.
func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
var found bool
err := rs.db.View(func(tx *bbolt.Tx) error {
retBucket := tx.Bucket(retributionBucket)
err := kvdb.View(rs.db, func(tx kvdb.ReadTx) error {
retBucket := tx.ReadBucket(retributionBucket)
if retBucket == nil {
return nil
}
@ -1350,8 +1350,8 @@ func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
// Remove removes a retribution state and finalized justice transaction by
// channel point from the retribution store.
func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error {
return rs.db.Update(func(tx *bbolt.Tx) error {
retBucket := tx.Bucket(retributionBucket)
return kvdb.Update(rs.db, func(tx kvdb.RwTx) error {
retBucket := tx.ReadWriteBucket(retributionBucket)
// We return an error if the bucket is not already created,
// since normal operation of the breach arbiter should never try
@ -1377,7 +1377,7 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error {
// If we have not finalized this channel breach, we can exit
// early.
justiceBkt := tx.Bucket(justiceTxnBucket)
justiceBkt := tx.ReadWriteBucket(justiceTxnBucket)
if justiceBkt == nil {
return nil
}
@ -1389,10 +1389,10 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error {
// ForAll iterates through all stored retributions and executes the passed
// callback function on each retribution.
func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
return rs.db.View(func(tx *bbolt.Tx) error {
return kvdb.View(rs.db, func(tx kvdb.ReadTx) error {
// If the bucket does not exist, then there are no pending
// retributions.
retBucket := tx.Bucket(retributionBucket)
retBucket := tx.ReadBucket(retributionBucket)
if retBucket == nil {
return nil
}

@ -12,12 +12,12 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
@ -3287,9 +3287,9 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
// chanPoint to the channelOpeningStateBucket.
func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint,
state channelOpeningState, shortChanID *lnwire.ShortChannelID) error {
return f.cfg.Wallet.Cfg.Database.Update(func(tx *bbolt.Tx) error {
return kvdb.Update(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RwTx) error {
bucket, err := tx.CreateBucketIfNotExists(channelOpeningStateBucket)
bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
if err != nil {
return err
}
@ -3317,9 +3317,9 @@ func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) (
var state channelOpeningState
var shortChanID lnwire.ShortChannelID
err := f.cfg.Wallet.Cfg.Database.View(func(tx *bbolt.Tx) error {
err := kvdb.View(f.cfg.Wallet.Cfg.Database, func(tx kvdb.ReadTx) error {
bucket := tx.Bucket(channelOpeningStateBucket)
bucket := tx.ReadBucket(channelOpeningStateBucket)
if bucket == nil {
// If the bucket does not exist, it means we never added
// a channel to the db, so return ErrChannelNotFound.
@ -3349,8 +3349,8 @@ func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) (
// deleteChannelOpeningState removes any state for chanPoint from the database.
func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error {
return f.cfg.Wallet.Cfg.Database.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(channelOpeningStateBucket)
return kvdb.Update(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
if bucket == nil {
return fmt.Errorf("Bucket not found")
}

@ -7,8 +7,8 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
)
// Overview of Nursery Store Storage Hierarchy
@ -263,7 +263,7 @@ func newNurseryStore(chainHash *chainhash.Hash,
// CSV-delayed outputs (commitment and incoming HTLC's), commitment output and
// a list of outgoing two-stage htlc outputs.
func (ns *nurseryStore) Incubate(kids []kidOutput, babies []babyOutput) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
return kvdb.Update(ns.db, func(tx kvdb.RwTx) error {
// If we have any kid outputs to incubate, then we'll attempt
// to add each of them to the nursery store. Any duplicate
// outputs will be ignored.
@ -290,7 +290,7 @@ func (ns *nurseryStore) Incubate(kids []kidOutput, babies []babyOutput) error {
// kindergarten bucket. The now mature kidOutput contained in the babyOutput
// will be stored as it waits out the kidOutput's CSV delay.
func (ns *nurseryStore) CribToKinder(bby *babyOutput) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
return kvdb.Update(ns.db, func(tx kvdb.RwTx) error {
// First, retrieve or create the channel bucket corresponding to
// the baby output's origin channel point.
@ -374,7 +374,7 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error {
func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput,
lastGradHeight uint32) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
return kvdb.Update(ns.db, func(tx kvdb.RwTx) error {
// Create or retrieve the channel bucket corresponding to the
// kid output's origin channel point.
chanPoint := kid.OriginChanPoint()
@ -471,7 +471,7 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput,
// the height and channel indexes. The height bucket will be opportunistically
// pruned from the height index as outputs are removed.
func (ns *nurseryStore) GraduateKinder(height uint32, kid *kidOutput) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
return kvdb.Update(ns.db, func(tx kvdb.RwTx) error {
hghtBucket := ns.getHeightBucket(tx, height)
if hghtBucket == nil {
@ -501,8 +501,7 @@ func (ns *nurseryStore) GraduateKinder(height uint32, kid *kidOutput) error {
return err
}
chanBucket := ns.getChannelBucket(tx,
chanPoint)
chanBucket := ns.getChannelBucketWrite(tx, chanPoint)
if chanBucket == nil {
return ErrContractNotFound
}
@ -540,7 +539,7 @@ func (ns *nurseryStore) FetchClass(
// processed at the provided block height.
var kids []kidOutput
var babies []babyOutput
if err := ns.db.View(func(tx *bbolt.Tx) error {
if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
// Append each crib output to our list of babyOutputs.
if err := ns.forEachHeightPrefix(tx, cribPrefix, height,
func(buf []byte) error {
@ -594,16 +593,16 @@ func (ns *nurseryStore) FetchClass(
// preschool bucket.
func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
var kids []kidOutput
if err := ns.db.View(func(tx *bbolt.Tx) error {
if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
chainBucket := tx.ReadBucket(ns.pfxChainKey)
if chainBucket == nil {
return nil
}
// Load the existing channel index from the chain bucket.
chanIndex := chainBucket.Bucket(channelIndexKey)
chanIndex := chainBucket.NestedReadBucket(channelIndexKey)
if chanIndex == nil {
return nil
}
@ -626,7 +625,7 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
for _, chanBytes := range activeChannels {
// Retrieve the channel bucket associated with this
// channel.
chanBucket := chanIndex.Bucket(chanBytes)
chanBucket := chanIndex.NestedReadBucket(chanBytes)
if chanBucket == nil {
continue
}
@ -635,7 +634,7 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
// "pscl" prefix. So, we will perform a prefix scan of
// the channel bucket to efficiently enumerate all the
// desired outputs.
c := chanBucket.Cursor()
c := chanBucket.ReadCursor()
for k, v := c.Seek(psclPrefix); bytes.HasPrefix(
k, psclPrefix); k, v = c.Next() {
@ -667,16 +666,16 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
// index at or below the provided upper bound.
func (ns *nurseryStore) HeightsBelowOrEqual(height uint32) ([]uint32, error) {
var activeHeights []uint32
err := ns.db.View(func(tx *bbolt.Tx) error {
err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
// Ensure that the chain bucket for this nursery store exists.
chainBucket := tx.Bucket(ns.pfxChainKey)
chainBucket := tx.ReadBucket(ns.pfxChainKey)
if chainBucket == nil {
return nil
}
// Ensure that the height index has been properly initialized for this
// chain.
hghtIndex := chainBucket.Bucket(heightIndexKey)
hghtIndex := chainBucket.NestedReadBucket(heightIndexKey)
if hghtIndex == nil {
return nil
}
@ -686,7 +685,7 @@ func (ns *nurseryStore) HeightsBelowOrEqual(height uint32) ([]uint32, error) {
var lower, upper [4]byte
byteOrder.PutUint32(upper[:], height)
c := hghtIndex.Cursor()
c := hghtIndex.ReadCursor()
for k, _ := c.Seek(lower[:]); bytes.Compare(k, upper[:]) <= 0 &&
len(k) == 4; k, _ = c.Next() {
@ -712,7 +711,7 @@ func (ns *nurseryStore) HeightsBelowOrEqual(height uint32) ([]uint32, error) {
func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint,
callback func([]byte, []byte) error) error {
return ns.db.View(func(tx *bbolt.Tx) error {
return kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
return ns.forChanOutputs(tx, chanPoint, callback)
})
}
@ -720,15 +719,15 @@ func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint,
// ListChannels returns all channels the nursery is currently tracking.
func (ns *nurseryStore) ListChannels() ([]wire.OutPoint, error) {
var activeChannels []wire.OutPoint
if err := ns.db.View(func(tx *bbolt.Tx) error {
if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
chainBucket := tx.ReadBucket(ns.pfxChainKey)
if chainBucket == nil {
return nil
}
// Retrieve the existing channel index.
chanIndex := chainBucket.Bucket(channelIndexKey)
chanIndex := chainBucket.NestedReadBucket(channelIndexKey)
if chanIndex == nil {
return nil
}
@ -754,7 +753,7 @@ func (ns *nurseryStore) ListChannels() ([]wire.OutPoint, error) {
// IsMatureChannel determines the whether or not all of the outputs in a
// particular channel bucket have been marked as graduated.
func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) {
err := ns.db.View(func(tx *bbolt.Tx) error {
err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
// Iterate over the contents of the channel bucket, computing
// both total number of outputs, and those that have the grad
// prefix.
@ -783,15 +782,15 @@ var ErrImmatureChannel = errors.New("cannot remove immature channel, " +
// provided channel point.
// NOTE: The channel's entries in the height index are assumed to be removed.
func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
return kvdb.Update(ns.db, func(tx kvdb.RwTx) error {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
chainBucket := tx.ReadWriteBucket(ns.pfxChainKey)
if chainBucket == nil {
return nil
}
// Retrieve the channel index stored in the chain bucket.
chanIndex := chainBucket.Bucket(channelIndexKey)
chanIndex := chainBucket.NestedReadWriteBucket(channelIndexKey)
if chanIndex == nil {
return nil
}
@ -824,7 +823,7 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
maturityHeight := kid.ConfHeight() + kid.BlocksToMaturity()
hghtBucket := ns.getHeightBucket(tx, maturityHeight)
hghtBucket := ns.getHeightBucketWrite(tx, maturityHeight)
if hghtBucket == nil {
return nil
}
@ -845,7 +844,7 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
// its two-stage process of sweeping funds back to the user's wallet. These
// outputs are persisted in the nursery store in the crib state, and will be
// revisited after the first-stage output's CLTV has expired.
func (ns *nurseryStore) enterCrib(tx *bbolt.Tx, baby *babyOutput) error {
func (ns *nurseryStore) enterCrib(tx kvdb.RwTx, baby *babyOutput) error {
// First, retrieve or create the channel bucket corresponding to the
// baby output's origin channel point.
chanPoint := baby.OriginChanPoint()
@ -902,7 +901,7 @@ func (ns *nurseryStore) enterCrib(tx *bbolt.Tx, baby *babyOutput) error {
// through a single stage before sweeping. Outputs are stored in the preschool
// bucket until the commitment transaction has been confirmed, at which point
// they will be moved to the kindergarten bucket.
func (ns *nurseryStore) enterPreschool(tx *bbolt.Tx, kid *kidOutput) error {
func (ns *nurseryStore) enterPreschool(tx kvdb.RwTx, kid *kidOutput) error {
// First, retrieve or create the channel bucket corresponding to the
// baby output's origin channel point.
chanPoint := kid.OriginChanPoint()
@ -935,11 +934,11 @@ func (ns *nurseryStore) enterPreschool(tx *bbolt.Tx, kid *kidOutput) error {
// createChannelBucket creates or retrieves a channel bucket for the provided
// channel point.
func (ns *nurseryStore) createChannelBucket(tx *bbolt.Tx,
chanPoint *wire.OutPoint) (*bbolt.Bucket, error) {
func (ns *nurseryStore) createChannelBucket(tx kvdb.RwTx,
chanPoint *wire.OutPoint) (kvdb.RwBucket, error) {
// Ensure that the chain bucket for this nursery store exists.
chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey)
chainBucket, err := tx.CreateTopLevelBucket(ns.pfxChainKey)
if err != nil {
return nil, err
}
@ -966,17 +965,17 @@ func (ns *nurseryStore) createChannelBucket(tx *bbolt.Tx,
// getChannelBucket retrieves an existing channel bucket from the nursery store,
// using the given channel point. If the bucket does not exist, or any bucket
// along its path does not exist, a nil value is returned.
func (ns *nurseryStore) getChannelBucket(tx *bbolt.Tx,
chanPoint *wire.OutPoint) *bbolt.Bucket {
func (ns *nurseryStore) getChannelBucket(tx kvdb.ReadTx,
chanPoint *wire.OutPoint) kvdb.ReadBucket {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
chainBucket := tx.ReadBucket(ns.pfxChainKey)
if chainBucket == nil {
return nil
}
// Retrieve the existing channel index.
chanIndex := chainBucket.Bucket(channelIndexKey)
chanIndex := chainBucket.NestedReadBucket(channelIndexKey)
if chanIndex == nil {
return nil
}
@ -988,16 +987,44 @@ func (ns *nurseryStore) getChannelBucket(tx *bbolt.Tx,
return nil
}
return chanIndex.Bucket(chanBuffer.Bytes())
return chanIndex.NestedReadBucket(chanBuffer.Bytes())
}
// getChannelBucketWrite retrieves an existing channel bucket from the nursery store,
// using the given channel point. If the bucket does not exist, or any bucket
// along its path does not exist, a nil value is returned.
func (ns *nurseryStore) getChannelBucketWrite(tx kvdb.RwTx,
chanPoint *wire.OutPoint) kvdb.RwBucket {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.ReadWriteBucket(ns.pfxChainKey)
if chainBucket == nil {
return nil
}
// Retrieve the existing channel index.
chanIndex := chainBucket.NestedReadWriteBucket(channelIndexKey)
if chanIndex == nil {
return nil
}
// Serialize the provided channel point and return the bucket matching
// the serialized key.
var chanBuffer bytes.Buffer
if err := writeOutpoint(&chanBuffer, chanPoint); err != nil {
return nil
}
return chanIndex.NestedReadWriteBucket(chanBuffer.Bytes())
}
// createHeightBucket creates or retrieves an existing bucket from the height
// index, corresponding to the provided height.
func (ns *nurseryStore) createHeightBucket(tx *bbolt.Tx,
height uint32) (*bbolt.Bucket, error) {
func (ns *nurseryStore) createHeightBucket(tx kvdb.RwTx,
height uint32) (kvdb.RwBucket, error) {
// Ensure that the chain bucket for this nursery store exists.
chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey)
chainBucket, err := tx.CreateTopLevelBucket(ns.pfxChainKey)
if err != nil {
return nil, err
}
@ -1021,17 +1048,17 @@ func (ns *nurseryStore) createHeightBucket(tx *bbolt.Tx,
// getHeightBucketPath retrieves an existing height bucket from the nursery
// store, using the provided block height. If the bucket does not exist, or any
// bucket along its path does not exist, a nil value is returned.
func (ns *nurseryStore) getHeightBucketPath(tx *bbolt.Tx,
height uint32) (*bbolt.Bucket, *bbolt.Bucket, *bbolt.Bucket) {
func (ns *nurseryStore) getHeightBucketPath(tx kvdb.ReadTx,
height uint32) (kvdb.ReadBucket, kvdb.ReadBucket, kvdb.ReadBucket) {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
chainBucket := tx.ReadBucket(ns.pfxChainKey)
if chainBucket == nil {
return nil, nil, nil
}
// Retrieve the existing channel index.
hghtIndex := chainBucket.Bucket(heightIndexKey)
hghtIndex := chainBucket.NestedReadBucket(heightIndexKey)
if hghtIndex == nil {
return nil, nil, nil
}
@ -1041,24 +1068,63 @@ func (ns *nurseryStore) getHeightBucketPath(tx *bbolt.Tx,
var heightBytes [4]byte
byteOrder.PutUint32(heightBytes[:], height)
return chainBucket, hghtIndex, hghtIndex.Bucket(heightBytes[:])
return chainBucket, hghtIndex, hghtIndex.NestedReadBucket(heightBytes[:])
}
// getHeightBucketPathWrite retrieves an existing height bucket from the nursery
// store, using the provided block height. If the bucket does not exist, or any
// bucket along its path does not exist, a nil value is returned.
func (ns *nurseryStore) getHeightBucketPathWrite(tx kvdb.RwTx,
height uint32) (kvdb.RwBucket, kvdb.RwBucket, kvdb.RwBucket) {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.ReadWriteBucket(ns.pfxChainKey)
if chainBucket == nil {
return nil, nil, nil
}
// Retrieve the existing channel index.
hghtIndex := chainBucket.NestedReadWriteBucket(heightIndexKey)
if hghtIndex == nil {
return nil, nil, nil
}
// Serialize the provided block height and return the bucket matching
// the serialized key.
var heightBytes [4]byte
byteOrder.PutUint32(heightBytes[:], height)
return chainBucket, hghtIndex, hghtIndex.NestedReadWriteBucket(
heightBytes[:],
)
}
// getHeightBucket retrieves an existing height bucket from the nursery store,
// using the provided block height. If the bucket does not exist, or any bucket
// along its path does not exist, a nil value is returned.
func (ns *nurseryStore) getHeightBucket(tx *bbolt.Tx,
height uint32) *bbolt.Bucket {
func (ns *nurseryStore) getHeightBucket(tx kvdb.ReadTx,
height uint32) kvdb.ReadBucket {
_, _, hghtBucket := ns.getHeightBucketPath(tx, height)
return hghtBucket
}
// getHeightBucketWrite retrieves an existing height bucket from the nursery store,
// using the provided block height. If the bucket does not exist, or any bucket
// along its path does not exist, a nil value is returned.
func (ns *nurseryStore) getHeightBucketWrite(tx kvdb.RwTx,
height uint32) kvdb.RwBucket {
_, _, hghtBucket := ns.getHeightBucketPathWrite(tx, height)
return hghtBucket
}
// createHeightChanBucket creates or retrieves an existing height-channel bucket
// for the provided block height and channel point. This method will attempt to
// instantiate all buckets along the path if required.
func (ns *nurseryStore) createHeightChanBucket(tx *bbolt.Tx,
height uint32, chanPoint *wire.OutPoint) (*bbolt.Bucket, error) {
func (ns *nurseryStore) createHeightChanBucket(tx kvdb.RwTx,
height uint32, chanPoint *wire.OutPoint) (kvdb.RwBucket, error) {
// Ensure that the height bucket for this nursery store exists.
hghtBucket, err := ns.createHeightBucket(tx, height)
@ -1083,8 +1149,8 @@ func (ns *nurseryStore) createHeightChanBucket(tx *bbolt.Tx,
// nursery store, using the provided block height and channel point. if the
// bucket does not exist, or any bucket along its path does not exist, a nil
// value is returned.
func (ns *nurseryStore) getHeightChanBucket(tx *bbolt.Tx,
height uint32, chanPoint *wire.OutPoint) *bbolt.Bucket {
func (ns *nurseryStore) getHeightChanBucket(tx kvdb.ReadTx,
height uint32, chanPoint *wire.OutPoint) kvdb.ReadBucket {
// Retrieve the existing height bucket from this nursery store.
hghtBucket := ns.getHeightBucket(tx, height)
@ -1102,7 +1168,33 @@ func (ns *nurseryStore) getHeightChanBucket(tx *bbolt.Tx,
// Finally, return the height bucket specified by the serialized channel
// point.
return hghtBucket.Bucket(chanBytes)
return hghtBucket.NestedReadBucket(chanBytes)
}
// getHeightChanBucketWrite retrieves an existing height-channel bucket from the
// nursery store, using the provided block height and channel point. if the
// bucket does not exist, or any bucket along its path does not exist, a nil
// value is returned.
func (ns *nurseryStore) getHeightChanBucketWrite(tx kvdb.RwTx,
height uint32, chanPoint *wire.OutPoint) kvdb.RwBucket {
// Retrieve the existing height bucket from this nursery store.
hghtBucket := ns.getHeightBucketWrite(tx, height)
if hghtBucket == nil {
return nil
}
// Serialize the provided channel point, which generates the key for
// looking up the proper height-channel bucket inside the height bucket.
var chanBuffer bytes.Buffer
if err := writeOutpoint(&chanBuffer, chanPoint); err != nil {
return nil
}
chanBytes := chanBuffer.Bytes()
// Finally, return the height bucket specified by the serialized channel
// point.
return hghtBucket.NestedReadWriteBucket(chanBytes)
}
// forEachHeightPrefix enumerates all outputs at the given height whose state
@ -1110,7 +1202,7 @@ func (ns *nurseryStore) getHeightChanBucket(tx *bbolt.Tx,
// enumerate crib and kindergarten outputs at a particular height. The callback
// is invoked with serialized bytes retrieved for each output of interest,
// allowing the caller to deserialize them into the appropriate type.
func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte,
func (ns *nurseryStore) forEachHeightPrefix(tx kvdb.ReadTx, prefix []byte,
height uint32, callback func([]byte) error) error {
// Start by retrieving the height bucket corresponding to the provided
@ -1138,7 +1230,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte,
// Additionally, grab the chain index, which we will facilitate queries
// for each of the channel buckets of each of the channels in the list
// we assembled above.
chanIndex := chainBucket.Bucket(channelIndexKey)
chanIndex := chainBucket.NestedReadBucket(channelIndexKey)
if chanIndex == nil {
return errors.New("unable to retrieve channel index")
}
@ -1151,7 +1243,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte,
for _, chanBytes := range channelsAtHeight {
// Retrieve the height-channel bucket for this channel, which
// holds a sub-bucket for all outputs maturing at this height.
hghtChanBucket := hghtBucket.Bucket(chanBytes)
hghtChanBucket := hghtBucket.NestedReadBucket(chanBytes)
if hghtChanBucket == nil {
return fmt.Errorf("unable to retrieve height-channel "+
"bucket at height %d for %x", height, chanBytes)
@ -1160,7 +1252,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte,
// Load the appropriate channel bucket from the channel index,
// this will allow us to retrieve the individual serialized
// outputs.
chanBucket := chanIndex.Bucket(chanBytes)
chanBucket := chanIndex.NestedReadBucket(chanBytes)
if chanBucket == nil {
return fmt.Errorf("unable to retrieve channel "+
"bucket: '%x'", chanBytes)
@ -1170,7 +1262,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte,
// prefix, we will perform a prefix scan of the buckets
// contained in the height-channel bucket, efficiently
// enumerating the desired outputs.
c := hghtChanBucket.Cursor()
c := hghtChanBucket.ReadCursor()
for k, _ := c.Seek(prefix); bytes.HasPrefix(
k, prefix); k, _ = c.Next() {
@ -1198,7 +1290,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte,
// provided callback. The callback accepts a key-value pair of byte slices
// corresponding to the prefixed-output key and the serialized output,
// respectively.
func (ns *nurseryStore) forChanOutputs(tx *bbolt.Tx, chanPoint *wire.OutPoint,
func (ns *nurseryStore) forChanOutputs(tx kvdb.ReadTx, chanPoint *wire.OutPoint,
callback func([]byte, []byte) error) error {
chanBucket := ns.getChannelBucket(tx, chanPoint)
@ -1216,11 +1308,11 @@ var errBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned")
// removeOutputFromHeight will delete the given output from the specified
// height-channel bucket, and attempt to prune the upstream directories if they
// are empty.
func (ns *nurseryStore) removeOutputFromHeight(tx *bbolt.Tx, height uint32,
func (ns *nurseryStore) removeOutputFromHeight(tx kvdb.RwTx, height uint32,
chanPoint *wire.OutPoint, pfxKey []byte) error {
// Retrieve the height-channel bucket and delete the prefixed output.
hghtChanBucket := ns.getHeightChanBucket(tx, height, chanPoint)
hghtChanBucket := ns.getHeightChanBucketWrite(tx, height, chanPoint)
if hghtChanBucket == nil {
// Height-channel bucket already removed.
return nil
@ -1233,7 +1325,7 @@ func (ns *nurseryStore) removeOutputFromHeight(tx *bbolt.Tx, height uint32,
}
// Retrieve the height bucket that contains the height-channel bucket.
hghtBucket := ns.getHeightBucket(tx, height)
hghtBucket := ns.getHeightBucketWrite(tx, height)
if hghtBucket == nil {
return errors.New("height bucket not found")
}
@ -1268,9 +1360,9 @@ func (ns *nurseryStore) removeOutputFromHeight(tx *bbolt.Tx, height uint32,
// all active outputs at this height have been removed from their respective
// height-channel buckets. The returned boolean value indicated whether or not
// this invocation successfully pruned the height bucket.
func (ns *nurseryStore) pruneHeight(tx *bbolt.Tx, height uint32) (bool, error) {
func (ns *nurseryStore) pruneHeight(tx kvdb.RwTx, height uint32) (bool, error) {
// Fetch the existing height index and height bucket.
_, hghtIndex, hghtBucket := ns.getHeightBucketPath(tx, height)
_, hghtIndex, hghtBucket := ns.getHeightBucketPathWrite(tx, height)
if hghtBucket == nil {
return false, nil
}
@ -1287,7 +1379,7 @@ func (ns *nurseryStore) pruneHeight(tx *bbolt.Tx, height uint32) (bool, error) {
// Attempt to each height-channel bucket from the height bucket
// located above.
hghtChanBucket := hghtBucket.Bucket(chanBytes)
hghtChanBucket := hghtBucket.NestedReadWriteBucket(chanBytes)
if hghtChanBucket == nil {
return errors.New("unable to find height-channel bucket")
}
@ -1315,9 +1407,9 @@ func (ns *nurseryStore) pruneHeight(tx *bbolt.Tx, height uint32) (bool, error) {
// removeBucketIfEmpty attempts to delete a bucket specified by name from the
// provided parent bucket.
func removeBucketIfEmpty(parent *bbolt.Bucket, bktName []byte) error {
func removeBucketIfEmpty(parent kvdb.RwBucket, bktName []byte) error {
// Attempt to fetch the named bucket from its parent.
bkt := parent.Bucket(bktName)
bkt := parent.NestedReadWriteBucket(bktName)
if bkt == nil {
// No bucket was found, already removed?
return nil
@ -1328,25 +1420,25 @@ func removeBucketIfEmpty(parent *bbolt.Bucket, bktName []byte) error {
return err
}
return parent.DeleteBucket(bktName)
return parent.DeleteNestedBucket(bktName)
}
// removeBucketIfExists safely deletes the named bucket by first checking
// that it exists in the parent bucket.
func removeBucketIfExists(parent *bbolt.Bucket, bktName []byte) error {
func removeBucketIfExists(parent kvdb.RwBucket, bktName []byte) error {
// Attempt to fetch the named bucket from its parent.
bkt := parent.Bucket(bktName)
bkt := parent.NestedReadWriteBucket(bktName)
if bkt == nil {
// No bucket was found, already removed?
return nil
}
return parent.DeleteBucket(bktName)
return parent.DeleteNestedBucket(bktName)
}
// isBucketEmpty returns errBucketNotEmpty if the bucket has a non-zero number
// of children.
func isBucketEmpty(parent *bbolt.Bucket) error {
func isBucketEmpty(parent kvdb.ReadBucket) error {
return parent.ForEach(func(_, _ []byte) error {
return errBucketNotEmpty
})

@ -23,7 +23,6 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/wallet/txauthor"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
@ -33,6 +32,7 @@ import (
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/chanfitness"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
@ -4494,7 +4494,7 @@ func (r *rpcServer) DescribeGraph(ctx context.Context,
// First iterate through all the known nodes (connected or unconnected
// within the graph), collating their current state into the RPC
// response.
err := graph.ForEachNode(nil, func(_ *bbolt.Tx, node *channeldb.LightningNode) error {
err := graph.ForEachNode(nil, func(_ kvdb.ReadTx, node *channeldb.LightningNode) error {
nodeAddrs := make([]*lnrpc.NodeAddress, 0)
for _, addr := range node.Addresses {
nodeAddr := &lnrpc.NodeAddress{
@ -4652,7 +4652,7 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context,
channels []*lnrpc.ChannelEdge
)
if err := node.ForEachChannel(nil, func(_ *bbolt.Tx,
if err := node.ForEachChannel(nil, func(_ kvdb.ReadTx,
edge *channeldb.ChannelEdgeInfo,
c1, c2 *channeldb.ChannelEdgePolicy) error {
@ -4750,7 +4750,7 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context,
// network, tallying up the total number of nodes, and also gathering
// each node so we can measure the graph diameter and degree stats
// below.
if err := graph.ForEachNode(nil, func(tx *bbolt.Tx, node *channeldb.LightningNode) error {
if err := graph.ForEachNode(nil, func(tx kvdb.ReadTx, node *channeldb.LightningNode) error {
// Increment the total number of nodes with each iteration.
numNodes++
@ -4760,7 +4760,7 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context,
// through the db transaction from the outer view so we can
// re-use it within this inner view.
var outDegree uint32
if err := node.ForEachChannel(tx, func(_ *bbolt.Tx,
if err := node.ForEachChannel(tx, func(_ kvdb.ReadTx,
edge *channeldb.ChannelEdgeInfo, _, _ *channeldb.ChannelEdgePolicy) error {
// Bump up the out degree for this node for each
@ -5225,7 +5225,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,
}
var feeReports []*lnrpc.ChannelFeeReport
err = selfNode.ForEachChannel(nil, func(_ *bbolt.Tx, chanInfo *channeldb.ChannelEdgeInfo,
err = selfNode.ForEachChannel(nil, func(_ kvdb.ReadTx, chanInfo *channeldb.ChannelEdgeInfo,
edgePolicy, _ *channeldb.ChannelEdgePolicy) error {
// Self node should always have policies for its channels.

@ -23,7 +23,6 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/go-errors/errors"
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/autopilot"
@ -32,6 +31,7 @@ import (
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/chanfitness"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/contractcourt"
@ -708,7 +708,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
s.missionControl, err = routing.NewMissionControl(
chanDB.DB,
chanDB,
&routing.MissionControlConfig{
AprioriHopProbability: routingConfig.AprioriHopProbability,
PenaltyHalfLife: routingConfig.PenaltyHalfLife,
@ -820,7 +820,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
sweep.DefaultBatchWindowDuration)
sweeperStore, err := sweep.NewSweeperStore(
chanDB.DB, activeNetParams.GenesisHash,
chanDB, activeNetParams.GenesisHash,
)
if err != nil {
srvrLog.Errorf("unable to create sweeper store: %v", err)
@ -2126,7 +2126,7 @@ func (s *server) establishPersistentConnections() error {
// each of the nodes.
selfPub := s.identityPriv.PubKey().SerializeCompressed()
err = sourceNode.ForEachChannel(nil, func(
tx *bbolt.Tx,
tx kvdb.ReadTx,
chanInfo *channeldb.ChannelEdgeInfo,
policy, _ *channeldb.ChannelEdgePolicy) error {