contractcourt: convert to use new kvdb abstraction

This commit is contained in:
Olaoluwa Osuntokun 2020-01-09 18:44:12 -08:00
parent f0911765af
commit 320101d054
No known key found for this signature in database
GPG Key ID: BC13F65E2DC84465
4 changed files with 44 additions and 43 deletions

@ -8,8 +8,8 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
)
@ -309,7 +309,7 @@ var (
// boltArbitratorLog is an implementation of the ArbitratorLog interface backed
// by a bolt DB instance.
type boltArbitratorLog struct {
db *bbolt.DB
db kvdb.Backend
cfg ChannelArbitratorConfig
@ -318,7 +318,7 @@ type boltArbitratorLog struct {
// newBoltArbitratorLog returns a new instance of the boltArbitratorLog given
// an arbitrator config, and the items needed to create its log scope.
func newBoltArbitratorLog(db *bbolt.DB, cfg ChannelArbitratorConfig,
func newBoltArbitratorLog(db kvdb.Backend, cfg ChannelArbitratorConfig,
chainHash chainhash.Hash, chanPoint wire.OutPoint) (*boltArbitratorLog, error) {
scope, err := newLogScope(chainHash, chanPoint)
@ -337,13 +337,13 @@ func newBoltArbitratorLog(db *bbolt.DB, cfg ChannelArbitratorConfig,
// interface.
var _ ArbitratorLog = (*boltArbitratorLog)(nil)
func fetchContractReadBucket(tx *bbolt.Tx, scopeKey []byte) (*bbolt.Bucket, error) {
scopeBucket := tx.Bucket(scopeKey)
func fetchContractReadBucket(tx kvdb.ReadTx, scopeKey []byte) (kvdb.ReadBucket, error) {
scopeBucket := tx.ReadBucket(scopeKey)
if scopeBucket == nil {
return nil, errScopeBucketNoExist
}
contractBucket := scopeBucket.Bucket(contractsBucketKey)
contractBucket := scopeBucket.NestedReadBucket(contractsBucketKey)
if contractBucket == nil {
return nil, errNoContracts
}
@ -351,8 +351,8 @@ func fetchContractReadBucket(tx *bbolt.Tx, scopeKey []byte) (*bbolt.Bucket, erro
return contractBucket, nil
}
func fetchContractWriteBucket(tx *bbolt.Tx, scopeKey []byte) (*bbolt.Bucket, error) {
scopeBucket, err := tx.CreateBucketIfNotExists(scopeKey)
func fetchContractWriteBucket(tx kvdb.RwTx, scopeKey []byte) (kvdb.RwBucket, error) {
scopeBucket, err := tx.CreateTopLevelBucket(scopeKey)
if err != nil {
return nil, err
}
@ -369,7 +369,7 @@ func fetchContractWriteBucket(tx *bbolt.Tx, scopeKey []byte) (*bbolt.Bucket, err
// writeResolver is a helper method that writes a contract resolver and stores
// it it within the passed contractBucket using its unique resolutionsKey key.
func (b *boltArbitratorLog) writeResolver(contractBucket *bbolt.Bucket,
func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket,
res ContractResolver) error {
// Only persist resolvers that are stateful. Stateless resolvers don't
@ -415,8 +415,8 @@ func (b *boltArbitratorLog) writeResolver(contractBucket *bbolt.Bucket,
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) {
var s ArbitratorState
err := b.db.View(func(tx *bbolt.Tx) error {
scopeBucket := tx.Bucket(b.scopeKey[:])
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}
@ -440,8 +440,9 @@ func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) {
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) CommitState(s ArbitratorState) error {
return b.db.Batch(func(tx *bbolt.Tx) error {
scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:])
fmt.Printf("yeee: %T\n", b.db)
return kvdb.Batch(b.db, func(tx kvdb.RwTx) error {
scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:])
if err != nil {
return err
}
@ -460,7 +461,7 @@ func (b *boltArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, erro
Checkpoint: b.checkpointContract,
}
var contracts []ContractResolver
err := b.db.View(func(tx *bbolt.Tx) error {
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
contractBucket, err := fetchContractReadBucket(tx, b.scopeKey[:])
if err != nil {
return err
@ -533,7 +534,7 @@ func (b *boltArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, erro
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) InsertUnresolvedContracts(resolvers ...ContractResolver) error {
return b.db.Batch(func(tx *bbolt.Tx) error {
return kvdb.Batch(b.db, func(tx kvdb.RwTx) error {
contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:])
if err != nil {
return err
@ -556,7 +557,7 @@ func (b *boltArbitratorLog) InsertUnresolvedContracts(resolvers ...ContractResol
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) SwapContract(oldContract, newContract ContractResolver) error {
return b.db.Batch(func(tx *bbolt.Tx) error {
return kvdb.Batch(b.db, func(tx kvdb.RwTx) error {
contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:])
if err != nil {
return err
@ -576,7 +577,7 @@ func (b *boltArbitratorLog) SwapContract(oldContract, newContract ContractResolv
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) ResolveContract(res ContractResolver) error {
return b.db.Batch(func(tx *bbolt.Tx) error {
return kvdb.Batch(b.db, func(tx kvdb.RwTx) error {
contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:])
if err != nil {
return err
@ -594,8 +595,8 @@ func (b *boltArbitratorLog) ResolveContract(res ContractResolver) error {
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) LogContractResolutions(c *ContractResolutions) error {
return b.db.Batch(func(tx *bbolt.Tx) error {
scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:])
return kvdb.Batch(b.db, func(tx kvdb.RwTx) error {
scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:])
if err != nil {
return err
}
@ -674,8 +675,8 @@ func (b *boltArbitratorLog) LogContractResolutions(c *ContractResolutions) error
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, error) {
c := &ContractResolutions{}
err := b.db.View(func(tx *bbolt.Tx) error {
scopeBucket := tx.Bucket(b.scopeKey[:])
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}
@ -773,13 +774,13 @@ func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, er
func (b *boltArbitratorLog) FetchChainActions() (ChainActionMap, error) {
actionsMap := make(ChainActionMap)
err := b.db.View(func(tx *bbolt.Tx) error {
scopeBucket := tx.Bucket(b.scopeKey[:])
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}
actionsBucket := scopeBucket.Bucket(actionsBucketKey)
actionsBucket := scopeBucket.NestedReadBucket(actionsBucketKey)
if actionsBucket == nil {
return errNoActions
}
@ -815,8 +816,8 @@ func (b *boltArbitratorLog) FetchChainActions() (ChainActionMap, error) {
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
return b.db.Update(func(tx *bbolt.Tx) error {
scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:])
return kvdb.Batch(b.db, func(tx kvdb.RwTx) error {
scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:])
if err != nil {
return err
}
@ -836,8 +837,8 @@ func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
var c *CommitSet
err := b.db.View(func(tx *bbolt.Tx) error {
scopeBucket := tx.Bucket(b.scopeKey[:])
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}
@ -868,8 +869,8 @@ func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) WipeHistory() error {
return b.db.Update(func(tx *bbolt.Tx) error {
scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:])
return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:])
if err != nil {
return err
}
@ -882,8 +883,8 @@ func (b *boltArbitratorLog) WipeHistory() error {
// Next, we'll delete any lingering contract state within the
// contracts bucket by removing the bucket itself.
err = scopeBucket.DeleteBucket(contractsBucketKey)
if err != nil && err != bbolt.ErrBucketNotFound {
err = scopeBucket.DeleteNestedBucket(contractsBucketKey)
if err != nil && err != kvdb.ErrBucketNotFound {
return err
}
@ -895,13 +896,13 @@ func (b *boltArbitratorLog) WipeHistory() error {
// We'll delete any chain actions that are still stored by
// removing the enclosing bucket.
err = scopeBucket.DeleteBucket(actionsBucketKey)
if err != nil && err != bbolt.ErrBucketNotFound {
err = scopeBucket.DeleteNestedBucket(actionsBucketKey)
if err != nil && err != kvdb.ErrBucketNotFound {
return err
}
// Finally, we'll delete the enclosing bucket itself.
return tx.DeleteBucket(b.scopeKey[:])
return tx.DeleteTopLevelBucket(b.scopeKey[:])
})
}
@ -909,7 +910,7 @@ func (b *boltArbitratorLog) WipeHistory() error {
// ContractResolver instances to checkpoint their state once they reach
// milestones during contract resolution.
func (b *boltArbitratorLog) checkpointContract(c ContractResolver) error {
return b.db.Batch(func(tx *bbolt.Tx) error {
return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:])
if err != nil {
return err

@ -14,9 +14,9 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
)
@ -104,7 +104,7 @@ var (
}
)
func makeTestDB() (*bbolt.DB, func(), error) {
func makeTestDB() (kvdb.Backend, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "arblog")
@ -112,7 +112,7 @@ func makeTestDB() (*bbolt.DB, func(), error) {
return nil, nil, err
}
db, err := bbolt.Open(tempDirName+"/test.db", 0600, nil)
db, err := kvdb.Create(kvdb.BoltBackendName, tempDirName+"/test.db", true)
if err != nil {
return nil, nil, err
}

@ -356,7 +356,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
// TODO(roasbeef); abstraction leak...
// * rework: adaptor method to set log scope w/ factory func
chanLog, err := newBoltArbitratorLog(
c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint,
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
blockEpoch.Cancel()
@ -554,7 +554,7 @@ func (c *ChainArbitrator) Start() error {
CloseType: closeChanInfo.CloseType,
}
chanLog, err := newBoltArbitratorLog(
c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint,
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
blockEpoch.Cancel()

@ -13,9 +13,9 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
@ -394,7 +394,7 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
return nil, err
}
dbPath := filepath.Join(dbDir, "testdb")
db, err := bbolt.Open(dbPath, 0600, nil)
db, err := kvdb.Create(kvdb.BoltBackendName, dbPath, true)
if err != nil {
return nil, err
}