From 320101d05461f27245774be74ec417bad8d049e5 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 9 Jan 2020 18:44:12 -0800 Subject: [PATCH] contractcourt: convert to use new kvdb abstraction --- contractcourt/briefcase.go | 73 ++++++++++++------------ contractcourt/briefcase_test.go | 6 +- contractcourt/chain_arbitrator.go | 4 +- contractcourt/channel_arbitrator_test.go | 4 +- 4 files changed, 44 insertions(+), 43 deletions(-) diff --git a/contractcourt/briefcase.go b/contractcourt/briefcase.go index 1114f6db..03a878b7 100644 --- a/contractcourt/briefcase.go +++ b/contractcourt/briefcase.go @@ -8,8 +8,8 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnwallet" ) @@ -309,7 +309,7 @@ var ( // boltArbitratorLog is an implementation of the ArbitratorLog interface backed // by a bolt DB instance. type boltArbitratorLog struct { - db *bbolt.DB + db kvdb.Backend cfg ChannelArbitratorConfig @@ -318,7 +318,7 @@ type boltArbitratorLog struct { // newBoltArbitratorLog returns a new instance of the boltArbitratorLog given // an arbitrator config, and the items needed to create its log scope. -func newBoltArbitratorLog(db *bbolt.DB, cfg ChannelArbitratorConfig, +func newBoltArbitratorLog(db kvdb.Backend, cfg ChannelArbitratorConfig, chainHash chainhash.Hash, chanPoint wire.OutPoint) (*boltArbitratorLog, error) { scope, err := newLogScope(chainHash, chanPoint) @@ -337,13 +337,13 @@ func newBoltArbitratorLog(db *bbolt.DB, cfg ChannelArbitratorConfig, // interface. var _ ArbitratorLog = (*boltArbitratorLog)(nil) -func fetchContractReadBucket(tx *bbolt.Tx, scopeKey []byte) (*bbolt.Bucket, error) { - scopeBucket := tx.Bucket(scopeKey) +func fetchContractReadBucket(tx kvdb.ReadTx, scopeKey []byte) (kvdb.ReadBucket, error) { + scopeBucket := tx.ReadBucket(scopeKey) if scopeBucket == nil { return nil, errScopeBucketNoExist } - contractBucket := scopeBucket.Bucket(contractsBucketKey) + contractBucket := scopeBucket.NestedReadBucket(contractsBucketKey) if contractBucket == nil { return nil, errNoContracts } @@ -351,8 +351,8 @@ func fetchContractReadBucket(tx *bbolt.Tx, scopeKey []byte) (*bbolt.Bucket, erro return contractBucket, nil } -func fetchContractWriteBucket(tx *bbolt.Tx, scopeKey []byte) (*bbolt.Bucket, error) { - scopeBucket, err := tx.CreateBucketIfNotExists(scopeKey) +func fetchContractWriteBucket(tx kvdb.RwTx, scopeKey []byte) (kvdb.RwBucket, error) { + scopeBucket, err := tx.CreateTopLevelBucket(scopeKey) if err != nil { return nil, err } @@ -369,7 +369,7 @@ func fetchContractWriteBucket(tx *bbolt.Tx, scopeKey []byte) (*bbolt.Bucket, err // writeResolver is a helper method that writes a contract resolver and stores // it it within the passed contractBucket using its unique resolutionsKey key. -func (b *boltArbitratorLog) writeResolver(contractBucket *bbolt.Bucket, +func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket, res ContractResolver) error { // Only persist resolvers that are stateful. Stateless resolvers don't @@ -415,8 +415,8 @@ func (b *boltArbitratorLog) writeResolver(contractBucket *bbolt.Bucket, // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) { var s ArbitratorState - err := b.db.View(func(tx *bbolt.Tx) error { - scopeBucket := tx.Bucket(b.scopeKey[:]) + err := kvdb.View(b.db, func(tx kvdb.ReadTx) error { + scopeBucket := tx.ReadBucket(b.scopeKey[:]) if scopeBucket == nil { return errScopeBucketNoExist } @@ -440,8 +440,9 @@ func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) { // // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) CommitState(s ArbitratorState) error { - return b.db.Batch(func(tx *bbolt.Tx) error { - scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:]) + fmt.Printf("yeee: %T\n", b.db) + return kvdb.Batch(b.db, func(tx kvdb.RwTx) error { + scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:]) if err != nil { return err } @@ -460,7 +461,7 @@ func (b *boltArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, erro Checkpoint: b.checkpointContract, } var contracts []ContractResolver - err := b.db.View(func(tx *bbolt.Tx) error { + err := kvdb.View(b.db, func(tx kvdb.ReadTx) error { contractBucket, err := fetchContractReadBucket(tx, b.scopeKey[:]) if err != nil { return err @@ -533,7 +534,7 @@ func (b *boltArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, erro // // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) InsertUnresolvedContracts(resolvers ...ContractResolver) error { - return b.db.Batch(func(tx *bbolt.Tx) error { + return kvdb.Batch(b.db, func(tx kvdb.RwTx) error { contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:]) if err != nil { return err @@ -556,7 +557,7 @@ func (b *boltArbitratorLog) InsertUnresolvedContracts(resolvers ...ContractResol // // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) SwapContract(oldContract, newContract ContractResolver) error { - return b.db.Batch(func(tx *bbolt.Tx) error { + return kvdb.Batch(b.db, func(tx kvdb.RwTx) error { contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:]) if err != nil { return err @@ -576,7 +577,7 @@ func (b *boltArbitratorLog) SwapContract(oldContract, newContract ContractResolv // // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) ResolveContract(res ContractResolver) error { - return b.db.Batch(func(tx *bbolt.Tx) error { + return kvdb.Batch(b.db, func(tx kvdb.RwTx) error { contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:]) if err != nil { return err @@ -594,8 +595,8 @@ func (b *boltArbitratorLog) ResolveContract(res ContractResolver) error { // // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) LogContractResolutions(c *ContractResolutions) error { - return b.db.Batch(func(tx *bbolt.Tx) error { - scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:]) + return kvdb.Batch(b.db, func(tx kvdb.RwTx) error { + scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:]) if err != nil { return err } @@ -674,8 +675,8 @@ func (b *boltArbitratorLog) LogContractResolutions(c *ContractResolutions) error // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, error) { c := &ContractResolutions{} - err := b.db.View(func(tx *bbolt.Tx) error { - scopeBucket := tx.Bucket(b.scopeKey[:]) + err := kvdb.View(b.db, func(tx kvdb.ReadTx) error { + scopeBucket := tx.ReadBucket(b.scopeKey[:]) if scopeBucket == nil { return errScopeBucketNoExist } @@ -773,13 +774,13 @@ func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, er func (b *boltArbitratorLog) FetchChainActions() (ChainActionMap, error) { actionsMap := make(ChainActionMap) - err := b.db.View(func(tx *bbolt.Tx) error { - scopeBucket := tx.Bucket(b.scopeKey[:]) + err := kvdb.View(b.db, func(tx kvdb.ReadTx) error { + scopeBucket := tx.ReadBucket(b.scopeKey[:]) if scopeBucket == nil { return errScopeBucketNoExist } - actionsBucket := scopeBucket.Bucket(actionsBucketKey) + actionsBucket := scopeBucket.NestedReadBucket(actionsBucketKey) if actionsBucket == nil { return errNoActions } @@ -815,8 +816,8 @@ func (b *boltArbitratorLog) FetchChainActions() (ChainActionMap, error) { // // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error { - return b.db.Update(func(tx *bbolt.Tx) error { - scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:]) + return kvdb.Batch(b.db, func(tx kvdb.RwTx) error { + scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:]) if err != nil { return err } @@ -836,8 +837,8 @@ func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error { // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) { var c *CommitSet - err := b.db.View(func(tx *bbolt.Tx) error { - scopeBucket := tx.Bucket(b.scopeKey[:]) + err := kvdb.View(b.db, func(tx kvdb.ReadTx) error { + scopeBucket := tx.ReadBucket(b.scopeKey[:]) if scopeBucket == nil { return errScopeBucketNoExist } @@ -868,8 +869,8 @@ func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) { // // NOTE: Part of the ContractResolver interface. func (b *boltArbitratorLog) WipeHistory() error { - return b.db.Update(func(tx *bbolt.Tx) error { - scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:]) + return kvdb.Update(b.db, func(tx kvdb.RwTx) error { + scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:]) if err != nil { return err } @@ -882,8 +883,8 @@ func (b *boltArbitratorLog) WipeHistory() error { // Next, we'll delete any lingering contract state within the // contracts bucket by removing the bucket itself. - err = scopeBucket.DeleteBucket(contractsBucketKey) - if err != nil && err != bbolt.ErrBucketNotFound { + err = scopeBucket.DeleteNestedBucket(contractsBucketKey) + if err != nil && err != kvdb.ErrBucketNotFound { return err } @@ -895,13 +896,13 @@ func (b *boltArbitratorLog) WipeHistory() error { // We'll delete any chain actions that are still stored by // removing the enclosing bucket. - err = scopeBucket.DeleteBucket(actionsBucketKey) - if err != nil && err != bbolt.ErrBucketNotFound { + err = scopeBucket.DeleteNestedBucket(actionsBucketKey) + if err != nil && err != kvdb.ErrBucketNotFound { return err } // Finally, we'll delete the enclosing bucket itself. - return tx.DeleteBucket(b.scopeKey[:]) + return tx.DeleteTopLevelBucket(b.scopeKey[:]) }) } @@ -909,7 +910,7 @@ func (b *boltArbitratorLog) WipeHistory() error { // ContractResolver instances to checkpoint their state once they reach // milestones during contract resolution. func (b *boltArbitratorLog) checkpointContract(c ContractResolver) error { - return b.db.Batch(func(tx *bbolt.Tx) error { + return kvdb.Update(b.db, func(tx kvdb.RwTx) error { contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:]) if err != nil { return err diff --git a/contractcourt/briefcase_test.go b/contractcourt/briefcase_test.go index 3408b2c6..165016d3 100644 --- a/contractcourt/briefcase_test.go +++ b/contractcourt/briefcase_test.go @@ -14,9 +14,9 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnwallet" ) @@ -104,7 +104,7 @@ var ( } ) -func makeTestDB() (*bbolt.DB, func(), error) { +func makeTestDB() (kvdb.Backend, func(), error) { // First, create a temporary directory to be used for the duration of // this test. tempDirName, err := ioutil.TempDir("", "arblog") @@ -112,7 +112,7 @@ func makeTestDB() (*bbolt.DB, func(), error) { return nil, nil, err } - db, err := bbolt.Open(tempDirName+"/test.db", 0600, nil) + db, err := kvdb.Create(kvdb.BoltBackendName, tempDirName+"/test.db", true) if err != nil { return nil, nil, err } diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index a9dde542..9aa691a9 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -356,7 +356,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, // TODO(roasbeef); abstraction leak... // * rework: adaptor method to set log scope w/ factory func chanLog, err := newBoltArbitratorLog( - c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint, + c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, ) if err != nil { blockEpoch.Cancel() @@ -554,7 +554,7 @@ func (c *ChainArbitrator) Start() error { CloseType: closeChanInfo.CloseType, } chanLog, err := newBoltArbitratorLog( - c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint, + c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, ) if err != nil { blockEpoch.Cancel() diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go index bbe1c51f..3dedb420 100644 --- a/contractcourt/channel_arbitrator_test.go +++ b/contractcourt/channel_arbitrator_test.go @@ -13,9 +13,9 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnwallet" @@ -394,7 +394,7 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, return nil, err } dbPath := filepath.Join(dbDir, "testdb") - db, err := bbolt.Open(dbPath, 0600, nil) + db, err := kvdb.Create(kvdb.BoltBackendName, dbPath, true) if err != nil { return nil, err }