From 071c7cbe783b03595b8657add5ca85882c79f569 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 9 Jan 2020 18:47:38 -0800 Subject: [PATCH] lnd: convert to use new kvdb abstraction --- breacharbiter.go | 28 +++--- fundingmanager.go | 14 +-- nursery_store.go | 232 ++++++++++++++++++++++++++++++++-------------- rpcserver.go | 12 +-- server.go | 8 +- 5 files changed, 193 insertions(+), 101 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index eeee4f8e..f454aad3 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -13,11 +13,11 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnwallet" @@ -1237,10 +1237,10 @@ func newRetributionStore(db *channeldb.DB) *retributionStore { // Add adds a retribution state to the retributionStore, which is then persisted // to disk. func (rs *retributionStore) Add(ret *retributionInfo) error { - return rs.db.Update(func(tx *bbolt.Tx) error { + return kvdb.Update(rs.db, func(tx kvdb.RwTx) error { // If this is our first contract breach, the retributionBucket // won't exist, in which case, we just create a new bucket. - retBucket, err := tx.CreateBucketIfNotExists(retributionBucket) + retBucket, err := tx.CreateTopLevelBucket(retributionBucket) if err != nil { return err } @@ -1264,8 +1264,8 @@ func (rs *retributionStore) Add(ret *retributionInfo) error { // startup and re-register for confirmation notifications. func (rs *retributionStore) Finalize(chanPoint *wire.OutPoint, finalTx *wire.MsgTx) error { - return rs.db.Update(func(tx *bbolt.Tx) error { - justiceBkt, err := tx.CreateBucketIfNotExists(justiceTxnBucket) + return kvdb.Update(rs.db, func(tx kvdb.RwTx) error { + justiceBkt, err := tx.CreateTopLevelBucket(justiceTxnBucket) if err != nil { return err } @@ -1291,8 +1291,8 @@ func (rs *retributionStore) GetFinalizedTxn( chanPoint *wire.OutPoint) (*wire.MsgTx, error) { var finalTxBytes []byte - if err := rs.db.View(func(tx *bbolt.Tx) error { - justiceBkt := tx.Bucket(justiceTxnBucket) + if err := kvdb.View(rs.db, func(tx kvdb.ReadTx) error { + justiceBkt := tx.ReadBucket(justiceTxnBucket) if justiceBkt == nil { return nil } @@ -1325,8 +1325,8 @@ func (rs *retributionStore) GetFinalizedTxn( // that has already been breached. func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) { var found bool - err := rs.db.View(func(tx *bbolt.Tx) error { - retBucket := tx.Bucket(retributionBucket) + err := kvdb.View(rs.db, func(tx kvdb.ReadTx) error { + retBucket := tx.ReadBucket(retributionBucket) if retBucket == nil { return nil } @@ -1350,8 +1350,8 @@ func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) { // Remove removes a retribution state and finalized justice transaction by // channel point from the retribution store. func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error { - return rs.db.Update(func(tx *bbolt.Tx) error { - retBucket := tx.Bucket(retributionBucket) + return kvdb.Update(rs.db, func(tx kvdb.RwTx) error { + retBucket := tx.ReadWriteBucket(retributionBucket) // We return an error if the bucket is not already created, // since normal operation of the breach arbiter should never try @@ -1377,7 +1377,7 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error { // If we have not finalized this channel breach, we can exit // early. - justiceBkt := tx.Bucket(justiceTxnBucket) + justiceBkt := tx.ReadWriteBucket(justiceTxnBucket) if justiceBkt == nil { return nil } @@ -1389,10 +1389,10 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error { // ForAll iterates through all stored retributions and executes the passed // callback function on each retribution. func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error { - return rs.db.View(func(tx *bbolt.Tx) error { + return kvdb.View(rs.db, func(tx kvdb.ReadTx) error { // If the bucket does not exist, then there are no pending // retributions. - retBucket := tx.Bucket(retributionBucket) + retBucket := tx.ReadBucket(retributionBucket) if retBucket == nil { return nil } diff --git a/fundingmanager.go b/fundingmanager.go index a49e0d4d..0bd64430 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -12,12 +12,12 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" @@ -3287,9 +3287,9 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey { // chanPoint to the channelOpeningStateBucket. func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint, state channelOpeningState, shortChanID *lnwire.ShortChannelID) error { - return f.cfg.Wallet.Cfg.Database.Update(func(tx *bbolt.Tx) error { + return kvdb.Update(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RwTx) error { - bucket, err := tx.CreateBucketIfNotExists(channelOpeningStateBucket) + bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket) if err != nil { return err } @@ -3317,9 +3317,9 @@ func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) ( var state channelOpeningState var shortChanID lnwire.ShortChannelID - err := f.cfg.Wallet.Cfg.Database.View(func(tx *bbolt.Tx) error { + err := kvdb.View(f.cfg.Wallet.Cfg.Database, func(tx kvdb.ReadTx) error { - bucket := tx.Bucket(channelOpeningStateBucket) + bucket := tx.ReadBucket(channelOpeningStateBucket) if bucket == nil { // If the bucket does not exist, it means we never added // a channel to the db, so return ErrChannelNotFound. @@ -3349,8 +3349,8 @@ func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) ( // deleteChannelOpeningState removes any state for chanPoint from the database. func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error { - return f.cfg.Wallet.Cfg.Database.Update(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(channelOpeningStateBucket) + return kvdb.Update(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RwTx) error { + bucket := tx.ReadWriteBucket(channelOpeningStateBucket) if bucket == nil { return fmt.Errorf("Bucket not found") } diff --git a/nursery_store.go b/nursery_store.go index d7f30d07..2424dbfc 100644 --- a/nursery_store.go +++ b/nursery_store.go @@ -7,8 +7,8 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" ) // Overview of Nursery Store Storage Hierarchy @@ -263,7 +263,7 @@ func newNurseryStore(chainHash *chainhash.Hash, // CSV-delayed outputs (commitment and incoming HTLC's), commitment output and // a list of outgoing two-stage htlc outputs. func (ns *nurseryStore) Incubate(kids []kidOutput, babies []babyOutput) error { - return ns.db.Update(func(tx *bbolt.Tx) error { + return kvdb.Update(ns.db, func(tx kvdb.RwTx) error { // If we have any kid outputs to incubate, then we'll attempt // to add each of them to the nursery store. Any duplicate // outputs will be ignored. @@ -290,7 +290,7 @@ func (ns *nurseryStore) Incubate(kids []kidOutput, babies []babyOutput) error { // kindergarten bucket. The now mature kidOutput contained in the babyOutput // will be stored as it waits out the kidOutput's CSV delay. func (ns *nurseryStore) CribToKinder(bby *babyOutput) error { - return ns.db.Update(func(tx *bbolt.Tx) error { + return kvdb.Update(ns.db, func(tx kvdb.RwTx) error { // First, retrieve or create the channel bucket corresponding to // the baby output's origin channel point. @@ -374,7 +374,7 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error { func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput, lastGradHeight uint32) error { - return ns.db.Update(func(tx *bbolt.Tx) error { + return kvdb.Update(ns.db, func(tx kvdb.RwTx) error { // Create or retrieve the channel bucket corresponding to the // kid output's origin channel point. chanPoint := kid.OriginChanPoint() @@ -471,7 +471,7 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput, // the height and channel indexes. The height bucket will be opportunistically // pruned from the height index as outputs are removed. func (ns *nurseryStore) GraduateKinder(height uint32, kid *kidOutput) error { - return ns.db.Update(func(tx *bbolt.Tx) error { + return kvdb.Update(ns.db, func(tx kvdb.RwTx) error { hghtBucket := ns.getHeightBucket(tx, height) if hghtBucket == nil { @@ -501,8 +501,7 @@ func (ns *nurseryStore) GraduateKinder(height uint32, kid *kidOutput) error { return err } - chanBucket := ns.getChannelBucket(tx, - chanPoint) + chanBucket := ns.getChannelBucketWrite(tx, chanPoint) if chanBucket == nil { return ErrContractNotFound } @@ -540,7 +539,7 @@ func (ns *nurseryStore) FetchClass( // processed at the provided block height. var kids []kidOutput var babies []babyOutput - if err := ns.db.View(func(tx *bbolt.Tx) error { + if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error { // Append each crib output to our list of babyOutputs. if err := ns.forEachHeightPrefix(tx, cribPrefix, height, func(buf []byte) error { @@ -594,16 +593,16 @@ func (ns *nurseryStore) FetchClass( // preschool bucket. func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) { var kids []kidOutput - if err := ns.db.View(func(tx *bbolt.Tx) error { + if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error { // Retrieve the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) + chainBucket := tx.ReadBucket(ns.pfxChainKey) if chainBucket == nil { return nil } // Load the existing channel index from the chain bucket. - chanIndex := chainBucket.Bucket(channelIndexKey) + chanIndex := chainBucket.NestedReadBucket(channelIndexKey) if chanIndex == nil { return nil } @@ -626,7 +625,7 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) { for _, chanBytes := range activeChannels { // Retrieve the channel bucket associated with this // channel. - chanBucket := chanIndex.Bucket(chanBytes) + chanBucket := chanIndex.NestedReadBucket(chanBytes) if chanBucket == nil { continue } @@ -635,7 +634,7 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) { // "pscl" prefix. So, we will perform a prefix scan of // the channel bucket to efficiently enumerate all the // desired outputs. - c := chanBucket.Cursor() + c := chanBucket.ReadCursor() for k, v := c.Seek(psclPrefix); bytes.HasPrefix( k, psclPrefix); k, v = c.Next() { @@ -667,16 +666,16 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) { // index at or below the provided upper bound. func (ns *nurseryStore) HeightsBelowOrEqual(height uint32) ([]uint32, error) { var activeHeights []uint32 - err := ns.db.View(func(tx *bbolt.Tx) error { + err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error { // Ensure that the chain bucket for this nursery store exists. - chainBucket := tx.Bucket(ns.pfxChainKey) + chainBucket := tx.ReadBucket(ns.pfxChainKey) if chainBucket == nil { return nil } // Ensure that the height index has been properly initialized for this // chain. - hghtIndex := chainBucket.Bucket(heightIndexKey) + hghtIndex := chainBucket.NestedReadBucket(heightIndexKey) if hghtIndex == nil { return nil } @@ -686,7 +685,7 @@ func (ns *nurseryStore) HeightsBelowOrEqual(height uint32) ([]uint32, error) { var lower, upper [4]byte byteOrder.PutUint32(upper[:], height) - c := hghtIndex.Cursor() + c := hghtIndex.ReadCursor() for k, _ := c.Seek(lower[:]); bytes.Compare(k, upper[:]) <= 0 && len(k) == 4; k, _ = c.Next() { @@ -712,7 +711,7 @@ func (ns *nurseryStore) HeightsBelowOrEqual(height uint32) ([]uint32, error) { func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint, callback func([]byte, []byte) error) error { - return ns.db.View(func(tx *bbolt.Tx) error { + return kvdb.View(ns.db, func(tx kvdb.ReadTx) error { return ns.forChanOutputs(tx, chanPoint, callback) }) } @@ -720,15 +719,15 @@ func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint, // ListChannels returns all channels the nursery is currently tracking. func (ns *nurseryStore) ListChannels() ([]wire.OutPoint, error) { var activeChannels []wire.OutPoint - if err := ns.db.View(func(tx *bbolt.Tx) error { + if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error { // Retrieve the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) + chainBucket := tx.ReadBucket(ns.pfxChainKey) if chainBucket == nil { return nil } // Retrieve the existing channel index. - chanIndex := chainBucket.Bucket(channelIndexKey) + chanIndex := chainBucket.NestedReadBucket(channelIndexKey) if chanIndex == nil { return nil } @@ -754,7 +753,7 @@ func (ns *nurseryStore) ListChannels() ([]wire.OutPoint, error) { // IsMatureChannel determines the whether or not all of the outputs in a // particular channel bucket have been marked as graduated. func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) { - err := ns.db.View(func(tx *bbolt.Tx) error { + err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error { // Iterate over the contents of the channel bucket, computing // both total number of outputs, and those that have the grad // prefix. @@ -783,15 +782,15 @@ var ErrImmatureChannel = errors.New("cannot remove immature channel, " + // provided channel point. // NOTE: The channel's entries in the height index are assumed to be removed. func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { - return ns.db.Update(func(tx *bbolt.Tx) error { + return kvdb.Update(ns.db, func(tx kvdb.RwTx) error { // Retrieve the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) + chainBucket := tx.ReadWriteBucket(ns.pfxChainKey) if chainBucket == nil { return nil } // Retrieve the channel index stored in the chain bucket. - chanIndex := chainBucket.Bucket(channelIndexKey) + chanIndex := chainBucket.NestedReadWriteBucket(channelIndexKey) if chanIndex == nil { return nil } @@ -824,7 +823,7 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { maturityHeight := kid.ConfHeight() + kid.BlocksToMaturity() - hghtBucket := ns.getHeightBucket(tx, maturityHeight) + hghtBucket := ns.getHeightBucketWrite(tx, maturityHeight) if hghtBucket == nil { return nil } @@ -845,7 +844,7 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { // its two-stage process of sweeping funds back to the user's wallet. These // outputs are persisted in the nursery store in the crib state, and will be // revisited after the first-stage output's CLTV has expired. -func (ns *nurseryStore) enterCrib(tx *bbolt.Tx, baby *babyOutput) error { +func (ns *nurseryStore) enterCrib(tx kvdb.RwTx, baby *babyOutput) error { // First, retrieve or create the channel bucket corresponding to the // baby output's origin channel point. chanPoint := baby.OriginChanPoint() @@ -902,7 +901,7 @@ func (ns *nurseryStore) enterCrib(tx *bbolt.Tx, baby *babyOutput) error { // through a single stage before sweeping. Outputs are stored in the preschool // bucket until the commitment transaction has been confirmed, at which point // they will be moved to the kindergarten bucket. -func (ns *nurseryStore) enterPreschool(tx *bbolt.Tx, kid *kidOutput) error { +func (ns *nurseryStore) enterPreschool(tx kvdb.RwTx, kid *kidOutput) error { // First, retrieve or create the channel bucket corresponding to the // baby output's origin channel point. chanPoint := kid.OriginChanPoint() @@ -935,11 +934,11 @@ func (ns *nurseryStore) enterPreschool(tx *bbolt.Tx, kid *kidOutput) error { // createChannelBucket creates or retrieves a channel bucket for the provided // channel point. -func (ns *nurseryStore) createChannelBucket(tx *bbolt.Tx, - chanPoint *wire.OutPoint) (*bbolt.Bucket, error) { +func (ns *nurseryStore) createChannelBucket(tx kvdb.RwTx, + chanPoint *wire.OutPoint) (kvdb.RwBucket, error) { // Ensure that the chain bucket for this nursery store exists. - chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey) + chainBucket, err := tx.CreateTopLevelBucket(ns.pfxChainKey) if err != nil { return nil, err } @@ -966,17 +965,17 @@ func (ns *nurseryStore) createChannelBucket(tx *bbolt.Tx, // getChannelBucket retrieves an existing channel bucket from the nursery store, // using the given channel point. If the bucket does not exist, or any bucket // along its path does not exist, a nil value is returned. -func (ns *nurseryStore) getChannelBucket(tx *bbolt.Tx, - chanPoint *wire.OutPoint) *bbolt.Bucket { +func (ns *nurseryStore) getChannelBucket(tx kvdb.ReadTx, + chanPoint *wire.OutPoint) kvdb.ReadBucket { // Retrieve the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) + chainBucket := tx.ReadBucket(ns.pfxChainKey) if chainBucket == nil { return nil } // Retrieve the existing channel index. - chanIndex := chainBucket.Bucket(channelIndexKey) + chanIndex := chainBucket.NestedReadBucket(channelIndexKey) if chanIndex == nil { return nil } @@ -988,16 +987,44 @@ func (ns *nurseryStore) getChannelBucket(tx *bbolt.Tx, return nil } - return chanIndex.Bucket(chanBuffer.Bytes()) + return chanIndex.NestedReadBucket(chanBuffer.Bytes()) +} + +// getChannelBucketWrite retrieves an existing channel bucket from the nursery store, +// using the given channel point. If the bucket does not exist, or any bucket +// along its path does not exist, a nil value is returned. +func (ns *nurseryStore) getChannelBucketWrite(tx kvdb.RwTx, + chanPoint *wire.OutPoint) kvdb.RwBucket { + + // Retrieve the existing chain bucket for this nursery store. + chainBucket := tx.ReadWriteBucket(ns.pfxChainKey) + if chainBucket == nil { + return nil + } + + // Retrieve the existing channel index. + chanIndex := chainBucket.NestedReadWriteBucket(channelIndexKey) + if chanIndex == nil { + return nil + } + + // Serialize the provided channel point and return the bucket matching + // the serialized key. + var chanBuffer bytes.Buffer + if err := writeOutpoint(&chanBuffer, chanPoint); err != nil { + return nil + } + + return chanIndex.NestedReadWriteBucket(chanBuffer.Bytes()) } // createHeightBucket creates or retrieves an existing bucket from the height // index, corresponding to the provided height. -func (ns *nurseryStore) createHeightBucket(tx *bbolt.Tx, - height uint32) (*bbolt.Bucket, error) { +func (ns *nurseryStore) createHeightBucket(tx kvdb.RwTx, + height uint32) (kvdb.RwBucket, error) { // Ensure that the chain bucket for this nursery store exists. - chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey) + chainBucket, err := tx.CreateTopLevelBucket(ns.pfxChainKey) if err != nil { return nil, err } @@ -1021,17 +1048,17 @@ func (ns *nurseryStore) createHeightBucket(tx *bbolt.Tx, // getHeightBucketPath retrieves an existing height bucket from the nursery // store, using the provided block height. If the bucket does not exist, or any // bucket along its path does not exist, a nil value is returned. -func (ns *nurseryStore) getHeightBucketPath(tx *bbolt.Tx, - height uint32) (*bbolt.Bucket, *bbolt.Bucket, *bbolt.Bucket) { +func (ns *nurseryStore) getHeightBucketPath(tx kvdb.ReadTx, + height uint32) (kvdb.ReadBucket, kvdb.ReadBucket, kvdb.ReadBucket) { // Retrieve the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) + chainBucket := tx.ReadBucket(ns.pfxChainKey) if chainBucket == nil { return nil, nil, nil } // Retrieve the existing channel index. - hghtIndex := chainBucket.Bucket(heightIndexKey) + hghtIndex := chainBucket.NestedReadBucket(heightIndexKey) if hghtIndex == nil { return nil, nil, nil } @@ -1041,24 +1068,63 @@ func (ns *nurseryStore) getHeightBucketPath(tx *bbolt.Tx, var heightBytes [4]byte byteOrder.PutUint32(heightBytes[:], height) - return chainBucket, hghtIndex, hghtIndex.Bucket(heightBytes[:]) + return chainBucket, hghtIndex, hghtIndex.NestedReadBucket(heightBytes[:]) +} + +// getHeightBucketPathWrite retrieves an existing height bucket from the nursery +// store, using the provided block height. If the bucket does not exist, or any +// bucket along its path does not exist, a nil value is returned. +func (ns *nurseryStore) getHeightBucketPathWrite(tx kvdb.RwTx, + height uint32) (kvdb.RwBucket, kvdb.RwBucket, kvdb.RwBucket) { + + // Retrieve the existing chain bucket for this nursery store. + chainBucket := tx.ReadWriteBucket(ns.pfxChainKey) + if chainBucket == nil { + return nil, nil, nil + } + + // Retrieve the existing channel index. + hghtIndex := chainBucket.NestedReadWriteBucket(heightIndexKey) + if hghtIndex == nil { + return nil, nil, nil + } + + // Serialize the provided block height and return the bucket matching + // the serialized key. + var heightBytes [4]byte + byteOrder.PutUint32(heightBytes[:], height) + + return chainBucket, hghtIndex, hghtIndex.NestedReadWriteBucket( + heightBytes[:], + ) } // getHeightBucket retrieves an existing height bucket from the nursery store, // using the provided block height. If the bucket does not exist, or any bucket // along its path does not exist, a nil value is returned. -func (ns *nurseryStore) getHeightBucket(tx *bbolt.Tx, - height uint32) *bbolt.Bucket { +func (ns *nurseryStore) getHeightBucket(tx kvdb.ReadTx, + height uint32) kvdb.ReadBucket { _, _, hghtBucket := ns.getHeightBucketPath(tx, height) return hghtBucket } +// getHeightBucketWrite retrieves an existing height bucket from the nursery store, +// using the provided block height. If the bucket does not exist, or any bucket +// along its path does not exist, a nil value is returned. +func (ns *nurseryStore) getHeightBucketWrite(tx kvdb.RwTx, + height uint32) kvdb.RwBucket { + + _, _, hghtBucket := ns.getHeightBucketPathWrite(tx, height) + + return hghtBucket +} + // createHeightChanBucket creates or retrieves an existing height-channel bucket // for the provided block height and channel point. This method will attempt to // instantiate all buckets along the path if required. -func (ns *nurseryStore) createHeightChanBucket(tx *bbolt.Tx, - height uint32, chanPoint *wire.OutPoint) (*bbolt.Bucket, error) { +func (ns *nurseryStore) createHeightChanBucket(tx kvdb.RwTx, + height uint32, chanPoint *wire.OutPoint) (kvdb.RwBucket, error) { // Ensure that the height bucket for this nursery store exists. hghtBucket, err := ns.createHeightBucket(tx, height) @@ -1083,8 +1149,8 @@ func (ns *nurseryStore) createHeightChanBucket(tx *bbolt.Tx, // nursery store, using the provided block height and channel point. if the // bucket does not exist, or any bucket along its path does not exist, a nil // value is returned. -func (ns *nurseryStore) getHeightChanBucket(tx *bbolt.Tx, - height uint32, chanPoint *wire.OutPoint) *bbolt.Bucket { +func (ns *nurseryStore) getHeightChanBucket(tx kvdb.ReadTx, + height uint32, chanPoint *wire.OutPoint) kvdb.ReadBucket { // Retrieve the existing height bucket from this nursery store. hghtBucket := ns.getHeightBucket(tx, height) @@ -1102,7 +1168,33 @@ func (ns *nurseryStore) getHeightChanBucket(tx *bbolt.Tx, // Finally, return the height bucket specified by the serialized channel // point. - return hghtBucket.Bucket(chanBytes) + return hghtBucket.NestedReadBucket(chanBytes) +} + +// getHeightChanBucketWrite retrieves an existing height-channel bucket from the +// nursery store, using the provided block height and channel point. if the +// bucket does not exist, or any bucket along its path does not exist, a nil +// value is returned. +func (ns *nurseryStore) getHeightChanBucketWrite(tx kvdb.RwTx, + height uint32, chanPoint *wire.OutPoint) kvdb.RwBucket { + + // Retrieve the existing height bucket from this nursery store. + hghtBucket := ns.getHeightBucketWrite(tx, height) + if hghtBucket == nil { + return nil + } + + // Serialize the provided channel point, which generates the key for + // looking up the proper height-channel bucket inside the height bucket. + var chanBuffer bytes.Buffer + if err := writeOutpoint(&chanBuffer, chanPoint); err != nil { + return nil + } + chanBytes := chanBuffer.Bytes() + + // Finally, return the height bucket specified by the serialized channel + // point. + return hghtBucket.NestedReadWriteBucket(chanBytes) } // forEachHeightPrefix enumerates all outputs at the given height whose state @@ -1110,7 +1202,7 @@ func (ns *nurseryStore) getHeightChanBucket(tx *bbolt.Tx, // enumerate crib and kindergarten outputs at a particular height. The callback // is invoked with serialized bytes retrieved for each output of interest, // allowing the caller to deserialize them into the appropriate type. -func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte, +func (ns *nurseryStore) forEachHeightPrefix(tx kvdb.ReadTx, prefix []byte, height uint32, callback func([]byte) error) error { // Start by retrieving the height bucket corresponding to the provided @@ -1138,7 +1230,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte, // Additionally, grab the chain index, which we will facilitate queries // for each of the channel buckets of each of the channels in the list // we assembled above. - chanIndex := chainBucket.Bucket(channelIndexKey) + chanIndex := chainBucket.NestedReadBucket(channelIndexKey) if chanIndex == nil { return errors.New("unable to retrieve channel index") } @@ -1151,7 +1243,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte, for _, chanBytes := range channelsAtHeight { // Retrieve the height-channel bucket for this channel, which // holds a sub-bucket for all outputs maturing at this height. - hghtChanBucket := hghtBucket.Bucket(chanBytes) + hghtChanBucket := hghtBucket.NestedReadBucket(chanBytes) if hghtChanBucket == nil { return fmt.Errorf("unable to retrieve height-channel "+ "bucket at height %d for %x", height, chanBytes) @@ -1160,7 +1252,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte, // Load the appropriate channel bucket from the channel index, // this will allow us to retrieve the individual serialized // outputs. - chanBucket := chanIndex.Bucket(chanBytes) + chanBucket := chanIndex.NestedReadBucket(chanBytes) if chanBucket == nil { return fmt.Errorf("unable to retrieve channel "+ "bucket: '%x'", chanBytes) @@ -1170,7 +1262,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte, // prefix, we will perform a prefix scan of the buckets // contained in the height-channel bucket, efficiently // enumerating the desired outputs. - c := hghtChanBucket.Cursor() + c := hghtChanBucket.ReadCursor() for k, _ := c.Seek(prefix); bytes.HasPrefix( k, prefix); k, _ = c.Next() { @@ -1198,7 +1290,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bbolt.Tx, prefix []byte, // provided callback. The callback accepts a key-value pair of byte slices // corresponding to the prefixed-output key and the serialized output, // respectively. -func (ns *nurseryStore) forChanOutputs(tx *bbolt.Tx, chanPoint *wire.OutPoint, +func (ns *nurseryStore) forChanOutputs(tx kvdb.ReadTx, chanPoint *wire.OutPoint, callback func([]byte, []byte) error) error { chanBucket := ns.getChannelBucket(tx, chanPoint) @@ -1216,11 +1308,11 @@ var errBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned") // removeOutputFromHeight will delete the given output from the specified // height-channel bucket, and attempt to prune the upstream directories if they // are empty. -func (ns *nurseryStore) removeOutputFromHeight(tx *bbolt.Tx, height uint32, +func (ns *nurseryStore) removeOutputFromHeight(tx kvdb.RwTx, height uint32, chanPoint *wire.OutPoint, pfxKey []byte) error { // Retrieve the height-channel bucket and delete the prefixed output. - hghtChanBucket := ns.getHeightChanBucket(tx, height, chanPoint) + hghtChanBucket := ns.getHeightChanBucketWrite(tx, height, chanPoint) if hghtChanBucket == nil { // Height-channel bucket already removed. return nil @@ -1233,7 +1325,7 @@ func (ns *nurseryStore) removeOutputFromHeight(tx *bbolt.Tx, height uint32, } // Retrieve the height bucket that contains the height-channel bucket. - hghtBucket := ns.getHeightBucket(tx, height) + hghtBucket := ns.getHeightBucketWrite(tx, height) if hghtBucket == nil { return errors.New("height bucket not found") } @@ -1268,9 +1360,9 @@ func (ns *nurseryStore) removeOutputFromHeight(tx *bbolt.Tx, height uint32, // all active outputs at this height have been removed from their respective // height-channel buckets. The returned boolean value indicated whether or not // this invocation successfully pruned the height bucket. -func (ns *nurseryStore) pruneHeight(tx *bbolt.Tx, height uint32) (bool, error) { +func (ns *nurseryStore) pruneHeight(tx kvdb.RwTx, height uint32) (bool, error) { // Fetch the existing height index and height bucket. - _, hghtIndex, hghtBucket := ns.getHeightBucketPath(tx, height) + _, hghtIndex, hghtBucket := ns.getHeightBucketPathWrite(tx, height) if hghtBucket == nil { return false, nil } @@ -1287,7 +1379,7 @@ func (ns *nurseryStore) pruneHeight(tx *bbolt.Tx, height uint32) (bool, error) { // Attempt to each height-channel bucket from the height bucket // located above. - hghtChanBucket := hghtBucket.Bucket(chanBytes) + hghtChanBucket := hghtBucket.NestedReadWriteBucket(chanBytes) if hghtChanBucket == nil { return errors.New("unable to find height-channel bucket") } @@ -1315,9 +1407,9 @@ func (ns *nurseryStore) pruneHeight(tx *bbolt.Tx, height uint32) (bool, error) { // removeBucketIfEmpty attempts to delete a bucket specified by name from the // provided parent bucket. -func removeBucketIfEmpty(parent *bbolt.Bucket, bktName []byte) error { +func removeBucketIfEmpty(parent kvdb.RwBucket, bktName []byte) error { // Attempt to fetch the named bucket from its parent. - bkt := parent.Bucket(bktName) + bkt := parent.NestedReadWriteBucket(bktName) if bkt == nil { // No bucket was found, already removed? return nil @@ -1328,25 +1420,25 @@ func removeBucketIfEmpty(parent *bbolt.Bucket, bktName []byte) error { return err } - return parent.DeleteBucket(bktName) + return parent.DeleteNestedBucket(bktName) } // removeBucketIfExists safely deletes the named bucket by first checking // that it exists in the parent bucket. -func removeBucketIfExists(parent *bbolt.Bucket, bktName []byte) error { +func removeBucketIfExists(parent kvdb.RwBucket, bktName []byte) error { // Attempt to fetch the named bucket from its parent. - bkt := parent.Bucket(bktName) + bkt := parent.NestedReadWriteBucket(bktName) if bkt == nil { // No bucket was found, already removed? return nil } - return parent.DeleteBucket(bktName) + return parent.DeleteNestedBucket(bktName) } // isBucketEmpty returns errBucketNotEmpty if the bucket has a non-zero number // of children. -func isBucketEmpty(parent *bbolt.Bucket) error { +func isBucketEmpty(parent kvdb.ReadBucket) error { return parent.ForEach(func(_, _ []byte) error { return errBucketNotEmpty }) diff --git a/rpcserver.go b/rpcserver.go index 700c572a..43349100 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -23,7 +23,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwallet/wallet/txauthor" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" @@ -33,6 +32,7 @@ import ( "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/chanfitness" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/discovery" @@ -4494,7 +4494,7 @@ func (r *rpcServer) DescribeGraph(ctx context.Context, // First iterate through all the known nodes (connected or unconnected // within the graph), collating their current state into the RPC // response. - err := graph.ForEachNode(nil, func(_ *bbolt.Tx, node *channeldb.LightningNode) error { + err := graph.ForEachNode(nil, func(_ kvdb.ReadTx, node *channeldb.LightningNode) error { nodeAddrs := make([]*lnrpc.NodeAddress, 0) for _, addr := range node.Addresses { nodeAddr := &lnrpc.NodeAddress{ @@ -4652,7 +4652,7 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context, channels []*lnrpc.ChannelEdge ) - if err := node.ForEachChannel(nil, func(_ *bbolt.Tx, + if err := node.ForEachChannel(nil, func(_ kvdb.ReadTx, edge *channeldb.ChannelEdgeInfo, c1, c2 *channeldb.ChannelEdgePolicy) error { @@ -4750,7 +4750,7 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context, // network, tallying up the total number of nodes, and also gathering // each node so we can measure the graph diameter and degree stats // below. - if err := graph.ForEachNode(nil, func(tx *bbolt.Tx, node *channeldb.LightningNode) error { + if err := graph.ForEachNode(nil, func(tx kvdb.ReadTx, node *channeldb.LightningNode) error { // Increment the total number of nodes with each iteration. numNodes++ @@ -4760,7 +4760,7 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context, // through the db transaction from the outer view so we can // re-use it within this inner view. var outDegree uint32 - if err := node.ForEachChannel(tx, func(_ *bbolt.Tx, + if err := node.ForEachChannel(tx, func(_ kvdb.ReadTx, edge *channeldb.ChannelEdgeInfo, _, _ *channeldb.ChannelEdgePolicy) error { // Bump up the out degree for this node for each @@ -5225,7 +5225,7 @@ func (r *rpcServer) FeeReport(ctx context.Context, } var feeReports []*lnrpc.ChannelFeeReport - err = selfNode.ForEachChannel(nil, func(_ *bbolt.Tx, chanInfo *channeldb.ChannelEdgeInfo, + err = selfNode.ForEachChannel(nil, func(_ kvdb.ReadTx, chanInfo *channeldb.ChannelEdgeInfo, edgePolicy, _ *channeldb.ChannelEdgePolicy) error { // Self node should always have policies for its channels. diff --git a/server.go b/server.go index 98f446d7..6792981c 100644 --- a/server.go +++ b/server.go @@ -23,7 +23,6 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/coreos/bbolt" "github.com/go-errors/errors" sphinx "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/autopilot" @@ -32,6 +31,7 @@ import ( "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/chanfitness" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/contractcourt" @@ -708,7 +708,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC) s.missionControl, err = routing.NewMissionControl( - chanDB.DB, + chanDB, &routing.MissionControlConfig{ AprioriHopProbability: routingConfig.AprioriHopProbability, PenaltyHalfLife: routingConfig.PenaltyHalfLife, @@ -820,7 +820,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, sweep.DefaultBatchWindowDuration) sweeperStore, err := sweep.NewSweeperStore( - chanDB.DB, activeNetParams.GenesisHash, + chanDB, activeNetParams.GenesisHash, ) if err != nil { srvrLog.Errorf("unable to create sweeper store: %v", err) @@ -2126,7 +2126,7 @@ func (s *server) establishPersistentConnections() error { // each of the nodes. selfPub := s.identityPriv.PubKey().SerializeCompressed() err = sourceNode.ForEachChannel(nil, func( - tx *bbolt.Tx, + tx kvdb.ReadTx, chanInfo *channeldb.ChannelEdgeInfo, policy, _ *channeldb.ChannelEdgePolicy) error {