routing: convert to use new kvdb abstraction

This commit is contained in:
Olaoluwa Osuntokun 2020-01-09 18:44:43 -08:00
parent 4e68914e9d
commit 28bbaa2a94
No known key found for this signature in database
GPG Key ID: BC13F65E2DC84465
8 changed files with 32 additions and 32 deletions

@ -1,8 +1,8 @@
package routing package routing
import ( import (
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
@ -26,7 +26,7 @@ type routingGraph interface {
// database. // database.
type dbRoutingTx struct { type dbRoutingTx struct {
graph *channeldb.ChannelGraph graph *channeldb.ChannelGraph
tx *bbolt.Tx tx kvdb.ReadTx
source route.Vertex source route.Vertex
} }
@ -38,7 +38,7 @@ func newDbRoutingTx(graph *channeldb.ChannelGraph) (*dbRoutingTx, error) {
return nil, err return nil, err
} }
tx, err := graph.Database().Begin(false) tx, err := graph.Database().BeginReadTx()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -62,7 +62,7 @@ func (g *dbRoutingTx) forEachNodeChannel(nodePub route.Vertex,
cb func(*channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy, cb func(*channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy,
*channeldb.ChannelEdgePolicy) error) error { *channeldb.ChannelEdgePolicy) error) error {
txCb := func(_ *bbolt.Tx, info *channeldb.ChannelEdgeInfo, txCb := func(_ kvdb.ReadTx, info *channeldb.ChannelEdgeInfo,
p1, p2 *channeldb.ChannelEdgePolicy) error { p1, p2 *channeldb.ChannelEdgePolicy) error {
return cb(info, p1, p2) return cb(info, p1, p2)

@ -6,7 +6,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
@ -82,7 +82,7 @@ func (c *integratedRoutingContext) testPayment(expectedNofAttempts int) {
dbPath := file.Name() dbPath := file.Name()
defer os.Remove(dbPath) defer os.Remove(dbPath)
db, err := bbolt.Open(dbPath, 0600, nil) db, err := kvdb.Open(kvdb.BoltBackendName, dbPath, true)
if err != nil { if err != nil {
c.t.Fatal(err) c.t.Fatal(err)
} }

@ -4,8 +4,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
@ -173,7 +173,7 @@ type paymentResult struct {
} }
// NewMissionControl returns a new instance of missionControl. // NewMissionControl returns a new instance of missionControl.
func NewMissionControl(db *bbolt.DB, cfg *MissionControlConfig) ( func NewMissionControl(db kvdb.Backend, cfg *MissionControlConfig) (
*MissionControl, error) { *MissionControl, error) {
log.Debugf("Instantiating mission control with config: "+ log.Debugf("Instantiating mission control with config: "+

@ -7,8 +7,8 @@ import (
"time" "time"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -35,20 +35,20 @@ const (
// Also changes to mission control parameters can be applied to historical data. // Also changes to mission control parameters can be applied to historical data.
// Finally, it enables importing raw data from an external source. // Finally, it enables importing raw data from an external source.
type missionControlStore struct { type missionControlStore struct {
db *bbolt.DB db kvdb.Backend
maxRecords int maxRecords int
numRecords int numRecords int
} }
func newMissionControlStore(db *bbolt.DB, maxRecords int) (*missionControlStore, error) { func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlStore, error) {
store := &missionControlStore{ store := &missionControlStore{
db: db, db: db,
maxRecords: maxRecords, maxRecords: maxRecords,
} }
// Create buckets if not yet existing. // Create buckets if not yet existing.
err := db.Update(func(tx *bbolt.Tx) error { err := kvdb.Update(db, func(tx kvdb.RwTx) error {
resultsBucket, err := tx.CreateBucketIfNotExists(resultsKey) resultsBucket, err := tx.CreateTopLevelBucket(resultsKey)
if err != nil { if err != nil {
return fmt.Errorf("cannot create results bucket: %v", return fmt.Errorf("cannot create results bucket: %v",
err) err)
@ -58,7 +58,7 @@ func newMissionControlStore(db *bbolt.DB, maxRecords int) (*missionControlStore,
// memory to avoid calling Stats().KeyN. The reliability of // memory to avoid calling Stats().KeyN. The reliability of
// Stats() is doubtful and seemed to have caused crashes in the // Stats() is doubtful and seemed to have caused crashes in the
// past (see #1874). // past (see #1874).
c := resultsBucket.Cursor() c := resultsBucket.ReadCursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() { for k, _ := c.First(); k != nil; k, _ = c.Next() {
store.numRecords++ store.numRecords++
} }
@ -74,12 +74,12 @@ func newMissionControlStore(db *bbolt.DB, maxRecords int) (*missionControlStore,
// clear removes all results from the db. // clear removes all results from the db.
func (b *missionControlStore) clear() error { func (b *missionControlStore) clear() error {
return b.db.Update(func(tx *bbolt.Tx) error { return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
if err := tx.DeleteBucket(resultsKey); err != nil { if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
return err return err
} }
_, err := tx.CreateBucket(resultsKey) _, err := tx.CreateTopLevelBucket(resultsKey)
return err return err
}) })
} }
@ -88,8 +88,8 @@ func (b *missionControlStore) clear() error {
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) { func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
var results []*paymentResult var results []*paymentResult
err := b.db.View(func(tx *bbolt.Tx) error { err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
resultBucket := tx.Bucket(resultsKey) resultBucket := tx.ReadBucket(resultsKey)
results = make([]*paymentResult, 0) results = make([]*paymentResult, 0)
return resultBucket.ForEach(func(k, v []byte) error { return resultBucket.ForEach(func(k, v []byte) error {
@ -218,13 +218,13 @@ func deserializeResult(k, v []byte) (*paymentResult, error) {
// AddResult adds a new result to the db. // AddResult adds a new result to the db.
func (b *missionControlStore) AddResult(rp *paymentResult) error { func (b *missionControlStore) AddResult(rp *paymentResult) error {
return b.db.Update(func(tx *bbolt.Tx) error { return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
bucket := tx.Bucket(resultsKey) bucket := tx.ReadWriteBucket(resultsKey)
// Prune oldest entries. // Prune oldest entries.
if b.maxRecords > 0 { if b.maxRecords > 0 {
for b.numRecords >= b.maxRecords { for b.numRecords >= b.maxRecords {
cursor := bucket.Cursor() cursor := bucket.ReadWriteCursor()
cursor.First() cursor.First()
if err := cursor.Delete(); err != nil { if err := cursor.Delete(); err != nil {
return err return err

@ -8,9 +8,9 @@ import (
"time" "time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
@ -31,7 +31,7 @@ func TestMissionControlStore(t *testing.T) {
dbPath := file.Name() dbPath := file.Name()
db, err := bbolt.Open(dbPath, 0600, nil) db, err := kvdb.Create(kvdb.BoltBackendName, dbPath, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

@ -6,7 +6,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
@ -44,7 +44,7 @@ type mcTestContext struct {
mc *MissionControl mc *MissionControl
now time.Time now time.Time
db *bbolt.DB db kvdb.Backend
dbPath string dbPath string
pid uint64 pid uint64
@ -63,7 +63,7 @@ func createMcTestContext(t *testing.T) *mcTestContext {
ctx.dbPath = file.Name() ctx.dbPath = file.Name()
ctx.db, err = bbolt.Open(ctx.dbPath, 0600, nil) ctx.db, err = kvdb.Open(kvdb.BoltBackendName, ctx.dbPath, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

@ -11,12 +11,12 @@ import (
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors" "github.com/go-errors/errors"
sphinx "github.com/lightningnetwork/lightning-onion" sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
@ -2111,7 +2111,7 @@ func (r *ChannelRouter) FetchLightningNode(node route.Vertex) (*channeldb.Lightn
// //
// NOTE: This method is part of the ChannelGraphSource interface. // NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) error { func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) error {
return r.cfg.Graph.ForEachNode(nil, func(_ *bbolt.Tx, n *channeldb.LightningNode) error { return r.cfg.Graph.ForEachNode(nil, func(_ kvdb.ReadTx, n *channeldb.LightningNode) error {
return cb(n) return cb(n)
}) })
} }
@ -2123,7 +2123,7 @@ func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) err
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(*channeldb.ChannelEdgeInfo, func (r *ChannelRouter) ForAllOutgoingChannels(cb func(*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error { *channeldb.ChannelEdgePolicy) error) error {
return r.selfNode.ForEachChannel(nil, func(_ *bbolt.Tx, c *channeldb.ChannelEdgeInfo, return r.selfNode.ForEachChannel(nil, func(_ kvdb.ReadTx, c *channeldb.ChannelEdgeInfo,
e, _ *channeldb.ChannelEdgePolicy) error { e, _ *channeldb.ChannelEdgePolicy) error {
if e == nil { if e == nil {
@ -2264,7 +2264,7 @@ func generateBandwidthHints(sourceNode *channeldb.LightningNode,
// First, we'll collect the set of outbound edges from the target // First, we'll collect the set of outbound edges from the target
// source node. // source node.
var localChans []*channeldb.ChannelEdgeInfo var localChans []*channeldb.ChannelEdgeInfo
err := sourceNode.ForEachChannel(nil, func(tx *bbolt.Tx, err := sourceNode.ForEachChannel(nil, func(tx kvdb.ReadTx,
edgeInfo *channeldb.ChannelEdgeInfo, edgeInfo *channeldb.ChannelEdgeInfo,
_, _ *channeldb.ChannelEdgePolicy) error { _, _ *channeldb.ChannelEdgePolicy) error {

@ -97,7 +97,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
} }
mc, err := NewMissionControl( mc, err := NewMissionControl(
graphInstance.graph.Database().DB, graphInstance.graph.Database(),
mcConfig, mcConfig,
) )
if err != nil { if err != nil {