From 557b930c5fc427ad1d5122bde6f6b459be1c3c90 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 9 Jan 2020 18:45:04 -0800 Subject: [PATCH] watchtower: convert to use new kvdb abstraction --- watchtower/wtdb/client_db.go | 114 +++++++++++++++++------------------ watchtower/wtdb/db_common.go | 15 ++--- watchtower/wtdb/tower_db.go | 84 +++++++++++++------------- watchtower/wtdb/version.go | 22 +++---- 4 files changed, 115 insertions(+), 120 deletions(-) diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 169b9042..22b913e1 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -8,7 +8,7 @@ import ( "net" "github.com/btcsuite/btcd/btcec" - "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/lnwire" ) @@ -113,7 +113,7 @@ var ( // ClientDB is single database providing a persistent storage engine for the // wtclient. type ClientDB struct { - db *bbolt.DB + db kvdb.Backend dbPath string } @@ -146,7 +146,7 @@ func OpenClientDB(dbPath string) (*ClientDB, error) { // initialized. This allows us to assume their presence throughout all // operations. If an known top-level bucket is expected to exist but is // missing, this will trigger a ErrUninitializedDB error. - err = clientDB.db.Update(initClientDBBuckets) + err = kvdb.Update(clientDB.db, initClientDBBuckets) if err != nil { bdb.Close() return nil, err @@ -157,7 +157,7 @@ func OpenClientDB(dbPath string) (*ClientDB, error) { // initClientDBBuckets creates all top-level buckets required to handle database // operations required by the latest version. -func initClientDBBuckets(tx *bbolt.Tx) error { +func initClientDBBuckets(tx kvdb.RwTx) error { buckets := [][]byte{ cSessionKeyIndexBkt, cChanSummaryBkt, @@ -167,7 +167,7 @@ func initClientDBBuckets(tx *bbolt.Tx) error { } for _, bucket := range buckets { - _, err := tx.CreateBucketIfNotExists(bucket) + _, err := tx.CreateTopLevelBucket(bucket) if err != nil { return err } @@ -179,7 +179,7 @@ func initClientDBBuckets(tx *bbolt.Tx) error { // bdb returns the backing bbolt.DB instance. // // NOTE: Part of the versionedDB interface. -func (c *ClientDB) bdb() *bbolt.DB { +func (c *ClientDB) bdb() kvdb.Backend { return c.db } @@ -188,7 +188,7 @@ func (c *ClientDB) bdb() *bbolt.DB { // NOTE: Part of the versionedDB interface. func (c *ClientDB) Version() (uint32, error) { var version uint32 - err := c.db.View(func(tx *bbolt.Tx) error { + err := kvdb.View(c.db, func(tx kvdb.ReadTx) error { var err error version, err = getDBVersion(tx) return err @@ -215,13 +215,13 @@ func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) { copy(towerPubKey[:], lnAddr.IdentityKey.SerializeCompressed()) var tower *Tower - err := c.db.Update(func(tx *bbolt.Tx) error { - towerIndex := tx.Bucket(cTowerIndexBkt) + err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { + towerIndex := tx.ReadWriteBucket(cTowerIndexBkt) if towerIndex == nil { return ErrUninitializedDB } - towers := tx.Bucket(cTowerBkt) + towers := tx.ReadWriteBucket(cTowerBkt) if towers == nil { return ErrUninitializedDB } @@ -248,7 +248,7 @@ func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) { // // TODO(wilmer): with an index of tower -> sessions we // can avoid the linear lookup. - sessions := tx.Bucket(cSessionBkt) + sessions := tx.ReadWriteBucket(cSessionBkt) if sessions == nil { return ErrUninitializedDB } @@ -308,12 +308,12 @@ func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) { // // NOTE: An error is not returned if the tower doesn't exist. func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error { - return c.db.Update(func(tx *bbolt.Tx) error { - towers := tx.Bucket(cTowerBkt) + return kvdb.Update(c.db, func(tx kvdb.RwTx) error { + towers := tx.ReadWriteBucket(cTowerBkt) if towers == nil { return ErrUninitializedDB } - towerIndex := tx.Bucket(cTowerIndexBkt) + towerIndex := tx.ReadWriteBucket(cTowerIndexBkt) if towerIndex == nil { return ErrUninitializedDB } @@ -342,7 +342,7 @@ func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error { // // TODO(wilmer): with an index of tower -> sessions we can avoid // the linear lookup. - sessions := tx.Bucket(cSessionBkt) + sessions := tx.ReadWriteBucket(cSessionBkt) if sessions == nil { return ErrUninitializedDB } @@ -383,8 +383,8 @@ func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error { // LoadTowerByID retrieves a tower by its tower ID. func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) { var tower *Tower - err := c.db.View(func(tx *bbolt.Tx) error { - towers := tx.Bucket(cTowerBkt) + err := kvdb.View(c.db, func(tx kvdb.ReadTx) error { + towers := tx.ReadBucket(cTowerBkt) if towers == nil { return ErrUninitializedDB } @@ -403,12 +403,12 @@ func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) { // LoadTower retrieves a tower by its public key. func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) { var tower *Tower - err := c.db.View(func(tx *bbolt.Tx) error { - towers := tx.Bucket(cTowerBkt) + err := kvdb.View(c.db, func(tx kvdb.ReadTx) error { + towers := tx.ReadBucket(cTowerBkt) if towers == nil { return ErrUninitializedDB } - towerIndex := tx.Bucket(cTowerIndexBkt) + towerIndex := tx.ReadBucket(cTowerIndexBkt) if towerIndex == nil { return ErrUninitializedDB } @@ -432,8 +432,8 @@ func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) { // ListTowers retrieves the list of towers available within the database. func (c *ClientDB) ListTowers() ([]*Tower, error) { var towers []*Tower - err := c.db.View(func(tx *bbolt.Tx) error { - towerBucket := tx.Bucket(cTowerBkt) + err := kvdb.View(c.db, func(tx kvdb.ReadTx) error { + towerBucket := tx.ReadBucket(cTowerBkt) if towerBucket == nil { return ErrUninitializedDB } @@ -461,8 +461,8 @@ func (c *ClientDB) ListTowers() ([]*Tower, error) { // CreateClientSession is invoked should return the same index. func (c *ClientDB) NextSessionKeyIndex(towerID TowerID) (uint32, error) { var index uint32 - err := c.db.Update(func(tx *bbolt.Tx) error { - keyIndex := tx.Bucket(cSessionKeyIndexBkt) + err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { + keyIndex := tx.ReadWriteBucket(cSessionKeyIndexBkt) if keyIndex == nil { return ErrUninitializedDB } @@ -509,20 +509,20 @@ func (c *ClientDB) NextSessionKeyIndex(towerID TowerID) (uint32, error) { // CreateClientSession records a newly negotiated client session in the set of // active sessions. The session can be identified by its SessionID. func (c *ClientDB) CreateClientSession(session *ClientSession) error { - return c.db.Update(func(tx *bbolt.Tx) error { - keyIndexes := tx.Bucket(cSessionKeyIndexBkt) + return kvdb.Update(c.db, func(tx kvdb.RwTx) error { + keyIndexes := tx.ReadWriteBucket(cSessionKeyIndexBkt) if keyIndexes == nil { return ErrUninitializedDB } - sessions := tx.Bucket(cSessionBkt) + sessions := tx.ReadWriteBucket(cSessionBkt) if sessions == nil { return ErrUninitializedDB } // Check that client session with this session id doesn't // already exist. - existingSessionBytes := sessions.Bucket(session.ID[:]) + existingSessionBytes := sessions.NestedReadWriteBucket(session.ID[:]) if existingSessionBytes != nil { return ErrClientSessionAlreadyExists } @@ -558,8 +558,8 @@ func (c *ClientDB) CreateClientSession(session *ClientSession) error { // response that do not correspond to this tower. func (c *ClientDB) ListClientSessions(id *TowerID) (map[SessionID]*ClientSession, error) { var clientSessions map[SessionID]*ClientSession - err := c.db.View(func(tx *bbolt.Tx) error { - sessions := tx.Bucket(cSessionBkt) + err := kvdb.View(c.db, func(tx kvdb.ReadTx) error { + sessions := tx.ReadBucket(cSessionBkt) if sessions == nil { return ErrUninitializedDB } @@ -577,7 +577,7 @@ func (c *ClientDB) ListClientSessions(id *TowerID) (map[SessionID]*ClientSession // listClientSessions returns the set of all client sessions known to the db. An // optional tower ID can be used to filter out any client sessions in the // response that do not correspond to this tower. -func listClientSessions(sessions *bbolt.Bucket, +func listClientSessions(sessions kvdb.ReadBucket, id *TowerID) (map[SessionID]*ClientSession, error) { clientSessions := make(map[SessionID]*ClientSession) @@ -612,8 +612,8 @@ func listClientSessions(sessions *bbolt.Bucket, // channel summaries. func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) { summaries := make(map[lnwire.ChannelID]ClientChanSummary) - err := c.db.View(func(tx *bbolt.Tx) error { - chanSummaries := tx.Bucket(cChanSummaryBkt) + err := kvdb.View(c.db, func(tx kvdb.ReadTx) error { + chanSummaries := tx.ReadBucket(cChanSummaryBkt) if chanSummaries == nil { return ErrUninitializedDB } @@ -648,8 +648,8 @@ func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) { func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID, sweepPkScript []byte) error { - return c.db.Update(func(tx *bbolt.Tx) error { - chanSummaries := tx.Bucket(cChanSummaryBkt) + return kvdb.Update(c.db, func(tx kvdb.RwTx) error { + chanSummaries := tx.ReadWriteBucket(cChanSummaryBkt) if chanSummaries == nil { return ErrUninitializedDB } @@ -692,8 +692,8 @@ func (c *ClientDB) CommitUpdate(id *SessionID, update *CommittedUpdate) (uint16, error) { var lastApplied uint16 - err := c.db.Update(func(tx *bbolt.Tx) error { - sessions := tx.Bucket(cSessionBkt) + err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { + sessions := tx.ReadWriteBucket(cSessionBkt) if sessions == nil { return ErrUninitializedDB } @@ -708,7 +708,7 @@ func (c *ClientDB) CommitUpdate(id *SessionID, } // Can't fail if the above didn't fail. - sessionBkt := sessions.Bucket(id[:]) + sessionBkt := sessions.NestedReadWriteBucket(id[:]) // Ensure the session commits sub-bucket is initialized. sessionCommits, err := sessionBkt.CreateBucketIfNotExists( @@ -796,8 +796,8 @@ func (c *ClientDB) CommitUpdate(id *SessionID, func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16, lastApplied uint16) error { - return c.db.Update(func(tx *bbolt.Tx) error { - sessions := tx.Bucket(cSessionBkt) + return kvdb.Update(c.db, func(tx kvdb.RwTx) error { + sessions := tx.ReadWriteBucket(cSessionBkt) if sessions == nil { return ErrUninitializedDB } @@ -835,11 +835,11 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16, } // Can't fail because of getClientSession succeeded. - sessionBkt := sessions.Bucket(id[:]) + sessionBkt := sessions.NestedReadWriteBucket(id[:]) // If the commits sub-bucket doesn't exist, there can't possibly // be a corresponding committed update to remove. - sessionCommits := sessionBkt.Bucket(cSessionCommits) + sessionCommits := sessionBkt.NestedReadWriteBucket(cSessionCommits) if sessionCommits == nil { return ErrCommittedUpdateNotFound } @@ -894,10 +894,10 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16, // bucket corresponding to the serialized session id. This does not deserialize // the CommittedUpdates or AckUpdates associated with the session. If the caller // requires this info, use getClientSession. -func getClientSessionBody(sessions *bbolt.Bucket, +func getClientSessionBody(sessions kvdb.ReadBucket, idBytes []byte) (*ClientSession, error) { - sessionBkt := sessions.Bucket(idBytes) + sessionBkt := sessions.NestedReadBucket(idBytes) if sessionBkt == nil { return nil, ErrClientSessionNotFound } @@ -922,7 +922,7 @@ func getClientSessionBody(sessions *bbolt.Bucket, // getClientSession loads the full ClientSession associated with the serialized // session id. This method populates the CommittedUpdates and AckUpdates in // addition to the ClientSession's body. -func getClientSession(sessions *bbolt.Bucket, +func getClientSession(sessions kvdb.ReadBucket, idBytes []byte) (*ClientSession, error) { session, err := getClientSessionBody(sessions, idBytes) @@ -950,17 +950,17 @@ func getClientSession(sessions *bbolt.Bucket, // getClientSessionCommits retrieves all committed updates for the session // identified by the serialized session id. -func getClientSessionCommits(sessions *bbolt.Bucket, +func getClientSessionCommits(sessions kvdb.ReadBucket, idBytes []byte) ([]CommittedUpdate, error) { // Can't fail because client session body has already been read. - sessionBkt := sessions.Bucket(idBytes) + sessionBkt := sessions.NestedReadBucket(idBytes) // Initialize commitedUpdates so that we can return an initialized map // if no committed updates exist. committedUpdates := make([]CommittedUpdate, 0) - sessionCommits := sessionBkt.Bucket(cSessionCommits) + sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits) if sessionCommits == nil { return committedUpdates, nil } @@ -986,17 +986,17 @@ func getClientSessionCommits(sessions *bbolt.Bucket, // getClientSessionAcks retrieves all acked updates for the session identified // by the serialized session id. -func getClientSessionAcks(sessions *bbolt.Bucket, +func getClientSessionAcks(sessions kvdb.ReadBucket, idBytes []byte) (map[uint16]BackupID, error) { // Can't fail because client session body has already been read. - sessionBkt := sessions.Bucket(idBytes) + sessionBkt := sessions.NestedReadBucket(idBytes) // Initialize ackedUpdates so that we can return an initialized map if // no acked updates exist. ackedUpdates := make(map[uint16]BackupID) - sessionAcks := sessionBkt.Bucket(cSessionAcks) + sessionAcks := sessionBkt.NestedReadBucket(cSessionAcks) if sessionAcks == nil { return ackedUpdates, nil } @@ -1023,7 +1023,7 @@ func getClientSessionAcks(sessions *bbolt.Bucket, // putClientSessionBody stores the body of the ClientSession (everything but the // CommittedUpdates and AckedUpdates). -func putClientSessionBody(sessions *bbolt.Bucket, +func putClientSessionBody(sessions kvdb.RwBucket, session *ClientSession) error { sessionBkt, err := sessions.CreateBucketIfNotExists(session.ID[:]) @@ -1042,7 +1042,7 @@ func putClientSessionBody(sessions *bbolt.Bucket, // markSessionStatus updates the persisted state of the session to the new // status. -func markSessionStatus(sessions *bbolt.Bucket, session *ClientSession, +func markSessionStatus(sessions kvdb.RwBucket, session *ClientSession, status CSessionStatus) error { session.Status = status @@ -1050,7 +1050,7 @@ func markSessionStatus(sessions *bbolt.Bucket, session *ClientSession, } // getChanSummary loads a ClientChanSummary for the passed chanID. -func getChanSummary(chanSummaries *bbolt.Bucket, +func getChanSummary(chanSummaries kvdb.ReadBucket, chanID lnwire.ChannelID) (*ClientChanSummary, error) { chanSummaryBytes := chanSummaries.Get(chanID[:]) @@ -1068,7 +1068,7 @@ func getChanSummary(chanSummaries *bbolt.Bucket, } // putChanSummary stores a ClientChanSummary for the passed chanID. -func putChanSummary(chanSummaries *bbolt.Bucket, chanID lnwire.ChannelID, +func putChanSummary(chanSummaries kvdb.RwBucket, chanID lnwire.ChannelID, summary *ClientChanSummary) error { var b bytes.Buffer @@ -1081,7 +1081,7 @@ func putChanSummary(chanSummaries *bbolt.Bucket, chanID lnwire.ChannelID, } // getTower loads a Tower identified by its serialized tower id. -func getTower(towers *bbolt.Bucket, id []byte) (*Tower, error) { +func getTower(towers kvdb.ReadBucket, id []byte) (*Tower, error) { towerBytes := towers.Get(id) if towerBytes == nil { return nil, ErrTowerNotFound @@ -1099,7 +1099,7 @@ func getTower(towers *bbolt.Bucket, id []byte) (*Tower, error) { } // putTower stores a Tower identified by its serialized tower id. -func putTower(towers *bbolt.Bucket, tower *Tower) error { +func putTower(towers kvdb.RwBucket, tower *Tower) error { var b bytes.Buffer err := tower.Encode(&b) if err != nil { diff --git a/watchtower/wtdb/db_common.go b/watchtower/wtdb/db_common.go index ed6f1c6b..1592f1e6 100644 --- a/watchtower/wtdb/db_common.go +++ b/watchtower/wtdb/db_common.go @@ -6,7 +6,7 @@ import ( "os" "path/filepath" - "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb/kvdb" ) const ( @@ -49,7 +49,7 @@ func fileExists(path string) bool { // one doesn't exist. The boolean returned indicates if the database did not // exist before, or if it has been created but no version metadata exists within // it. -func createDBIfNotExist(dbPath, name string) (*bbolt.DB, bool, error) { +func createDBIfNotExist(dbPath, name string) (kvdb.Backend, bool, error) { path := filepath.Join(dbPath, name) // If the database file doesn't exist, this indicates we much initialize @@ -65,12 +65,7 @@ func createDBIfNotExist(dbPath, name string) (*bbolt.DB, bool, error) { // Specify bbolt freelist options to reduce heap pressure in case the // freelist grows to be very large. - options := &bbolt.Options{ - NoFreelistSync: true, - FreelistType: bbolt.FreelistMapType, - } - - bdb, err := bbolt.Open(path, dbFilePermission, options) + bdb, err := kvdb.Create(kvdb.BoltBackendName, path, true) if err != nil { return nil, false, err } @@ -82,8 +77,8 @@ func createDBIfNotExist(dbPath, name string) (*bbolt.DB, bool, error) { // set firstInit to true so that we can treat is initialize the bucket. if !firstInit { var metadataExists bool - err = bdb.View(func(tx *bbolt.Tx) error { - metadataExists = tx.Bucket(metadataBkt) != nil + err = kvdb.View(bdb, func(tx kvdb.ReadTx) error { + metadataExists = tx.ReadBucket(metadataBkt) != nil return nil }) if err != nil { diff --git a/watchtower/wtdb/tower_db.go b/watchtower/wtdb/tower_db.go index 92a9e55a..39782f1d 100644 --- a/watchtower/wtdb/tower_db.go +++ b/watchtower/wtdb/tower_db.go @@ -5,8 +5,8 @@ import ( "errors" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/watchtower/blob" ) @@ -55,7 +55,7 @@ var ( // TowerDB is single database providing a persistent storage engine for the // wtserver and lookout subsystems. type TowerDB struct { - db *bbolt.DB + db kvdb.Backend dbPath string } @@ -88,7 +88,7 @@ func OpenTowerDB(dbPath string) (*TowerDB, error) { // initialized. This allows us to assume their presence throughout all // operations. If an known top-level bucket is expected to exist but is // missing, this will trigger a ErrUninitializedDB error. - err = towerDB.db.Update(initTowerDBBuckets) + err = kvdb.Update(towerDB.db, initTowerDBBuckets) if err != nil { bdb.Close() return nil, err @@ -99,7 +99,7 @@ func OpenTowerDB(dbPath string) (*TowerDB, error) { // initTowerDBBuckets creates all top-level buckets required to handle database // operations required by the latest version. -func initTowerDBBuckets(tx *bbolt.Tx) error { +func initTowerDBBuckets(tx kvdb.RwTx) error { buckets := [][]byte{ sessionsBkt, updateIndexBkt, @@ -108,7 +108,7 @@ func initTowerDBBuckets(tx *bbolt.Tx) error { } for _, bucket := range buckets { - _, err := tx.CreateBucketIfNotExists(bucket) + _, err := tx.CreateTopLevelBucket(bucket) if err != nil { return err } @@ -120,7 +120,7 @@ func initTowerDBBuckets(tx *bbolt.Tx) error { // bdb returns the backing bbolt.DB instance. // // NOTE: Part of the versionedDB interface. -func (t *TowerDB) bdb() *bbolt.DB { +func (t *TowerDB) bdb() kvdb.Backend { return t.db } @@ -129,7 +129,7 @@ func (t *TowerDB) bdb() *bbolt.DB { // NOTE: Part of the versionedDB interface. func (t *TowerDB) Version() (uint32, error) { var version uint32 - err := t.db.View(func(tx *bbolt.Tx) error { + err := kvdb.View(t.db, func(tx kvdb.ReadTx) error { var err error version, err = getDBVersion(tx) return err @@ -150,8 +150,8 @@ func (t *TowerDB) Close() error { // returned if the session could not be found. func (t *TowerDB) GetSessionInfo(id *SessionID) (*SessionInfo, error) { var session *SessionInfo - err := t.db.View(func(tx *bbolt.Tx) error { - sessions := tx.Bucket(sessionsBkt) + err := kvdb.View(t.db, func(tx kvdb.ReadTx) error { + sessions := tx.ReadBucket(sessionsBkt) if sessions == nil { return ErrUninitializedDB } @@ -170,13 +170,13 @@ func (t *TowerDB) GetSessionInfo(id *SessionID) (*SessionInfo, error) { // InsertSessionInfo records a negotiated session in the tower database. An // error is returned if the session already exists. func (t *TowerDB) InsertSessionInfo(session *SessionInfo) error { - return t.db.Update(func(tx *bbolt.Tx) error { - sessions := tx.Bucket(sessionsBkt) + return kvdb.Update(t.db, func(tx kvdb.RwTx) error { + sessions := tx.ReadWriteBucket(sessionsBkt) if sessions == nil { return ErrUninitializedDB } - updateIndex := tx.Bucket(updateIndexBkt) + updateIndex := tx.ReadWriteBucket(updateIndexBkt) if updateIndex == nil { return ErrUninitializedDB } @@ -219,18 +219,18 @@ func (t *TowerDB) InsertSessionInfo(session *SessionInfo) error { // properly and the last applied values echoed by the client are sane. func (t *TowerDB) InsertStateUpdate(update *SessionStateUpdate) (uint16, error) { var lastApplied uint16 - err := t.db.Update(func(tx *bbolt.Tx) error { - sessions := tx.Bucket(sessionsBkt) + err := kvdb.Update(t.db, func(tx kvdb.RwTx) error { + sessions := tx.ReadWriteBucket(sessionsBkt) if sessions == nil { return ErrUninitializedDB } - updates := tx.Bucket(updatesBkt) + updates := tx.ReadWriteBucket(updatesBkt) if updates == nil { return ErrUninitializedDB } - updateIndex := tx.Bucket(updateIndexBkt) + updateIndex := tx.ReadWriteBucket(updateIndexBkt) if updateIndex == nil { return ErrUninitializedDB } @@ -303,18 +303,18 @@ func (t *TowerDB) InsertStateUpdate(update *SessionStateUpdate) (uint16, error) // DeleteSession removes all data associated with a particular session id from // the tower's database. func (t *TowerDB) DeleteSession(target SessionID) error { - return t.db.Update(func(tx *bbolt.Tx) error { - sessions := tx.Bucket(sessionsBkt) + return kvdb.Update(t.db, func(tx kvdb.RwTx) error { + sessions := tx.ReadWriteBucket(sessionsBkt) if sessions == nil { return ErrUninitializedDB } - updates := tx.Bucket(updatesBkt) + updates := tx.ReadWriteBucket(updatesBkt) if updates == nil { return ErrUninitializedDB } - updateIndex := tx.Bucket(updateIndexBkt) + updateIndex := tx.ReadWriteBucket(updateIndexBkt) if updateIndex == nil { return ErrUninitializedDB } @@ -341,7 +341,7 @@ func (t *TowerDB) DeleteSession(target SessionID) error { for _, hint := range hints { // Remove the state updates for any blobs stored under // the target session identifier. - updatesForHint := updates.Bucket(hint[:]) + updatesForHint := updates.NestedReadWriteBucket(hint[:]) if updatesForHint == nil { continue } @@ -371,7 +371,7 @@ func (t *TowerDB) DeleteSession(target SessionID) error { // No more updates for this hint, prune hint bucket. default: - err = updates.DeleteBucket(hint[:]) + err = updates.DeleteNestedBucket(hint[:]) if err != nil { return err } @@ -389,13 +389,13 @@ func (t *TowerDB) DeleteSession(target SessionID) error { // they exist in the database. func (t *TowerDB) QueryMatches(breachHints []blob.BreachHint) ([]Match, error) { var matches []Match - err := t.db.View(func(tx *bbolt.Tx) error { - sessions := tx.Bucket(sessionsBkt) + err := kvdb.View(t.db, func(tx kvdb.ReadTx) error { + sessions := tx.ReadBucket(sessionsBkt) if sessions == nil { return ErrUninitializedDB } - updates := tx.Bucket(updatesBkt) + updates := tx.ReadBucket(updatesBkt) if updates == nil { return ErrUninitializedDB } @@ -405,7 +405,7 @@ func (t *TowerDB) QueryMatches(breachHints []blob.BreachHint) ([]Match, error) { for _, hint := range breachHints { // If a bucket does not exist for this hint, no matches // are known. - updatesForHint := updates.Bucket(hint[:]) + updatesForHint := updates.NestedReadBucket(hint[:]) if updatesForHint == nil { continue } @@ -471,8 +471,8 @@ func (t *TowerDB) QueryMatches(breachHints []blob.BreachHint) ([]Match, error) { // SetLookoutTip stores the provided epoch as the latest lookout tip epoch in // the tower database. func (t *TowerDB) SetLookoutTip(epoch *chainntnfs.BlockEpoch) error { - return t.db.Update(func(tx *bbolt.Tx) error { - lookoutTip := tx.Bucket(lookoutTipBkt) + return kvdb.Update(t.db, func(tx kvdb.RwTx) error { + lookoutTip := tx.ReadWriteBucket(lookoutTipBkt) if lookoutTip == nil { return ErrUninitializedDB } @@ -485,8 +485,8 @@ func (t *TowerDB) SetLookoutTip(epoch *chainntnfs.BlockEpoch) error { // database. func (t *TowerDB) GetLookoutTip() (*chainntnfs.BlockEpoch, error) { var epoch *chainntnfs.BlockEpoch - err := t.db.View(func(tx *bbolt.Tx) error { - lookoutTip := tx.Bucket(lookoutTipBkt) + err := kvdb.View(t.db, func(tx kvdb.ReadTx) error { + lookoutTip := tx.ReadBucket(lookoutTipBkt) if lookoutTip == nil { return ErrUninitializedDB } @@ -505,7 +505,7 @@ func (t *TowerDB) GetLookoutTip() (*chainntnfs.BlockEpoch, error) { // getSession retrieves the session info from the sessions bucket identified by // its session id. An error is returned if the session is not found or a // deserialization error occurs. -func getSession(sessions *bbolt.Bucket, id []byte) (*SessionInfo, error) { +func getSession(sessions kvdb.ReadBucket, id []byte) (*SessionInfo, error) { sessionBytes := sessions.Get(id) if sessionBytes == nil { return nil, ErrSessionNotFound @@ -522,7 +522,7 @@ func getSession(sessions *bbolt.Bucket, id []byte) (*SessionInfo, error) { // putSession stores the session info in the sessions bucket identified by its // session id. An error is returned if a serialization error occurs. -func putSession(sessions *bbolt.Bucket, session *SessionInfo) error { +func putSession(sessions kvdb.RwBucket, session *SessionInfo) error { var b bytes.Buffer err := session.Encode(&b) if err != nil { @@ -536,7 +536,7 @@ func putSession(sessions *bbolt.Bucket, session *SessionInfo) error { // session id. This ensures that future calls to getHintsForSession or // putHintForSession can rely on the bucket already being created, and fail if // index has not been initialized as this points to improper usage. -func touchSessionHintBkt(updateIndex *bbolt.Bucket, id *SessionID) error { +func touchSessionHintBkt(updateIndex kvdb.RwBucket, id *SessionID) error { _, err := updateIndex.CreateBucketIfNotExists(id[:]) return err } @@ -544,17 +544,17 @@ func touchSessionHintBkt(updateIndex *bbolt.Bucket, id *SessionID) error { // removeSessionHintBkt prunes the session-hint bucket for the given session id // and all of the hints contained inside. This should be used to clean up the // index upon session deletion. -func removeSessionHintBkt(updateIndex *bbolt.Bucket, id *SessionID) error { - return updateIndex.DeleteBucket(id[:]) +func removeSessionHintBkt(updateIndex kvdb.RwBucket, id *SessionID) error { + return updateIndex.DeleteNestedBucket(id[:]) } // getHintsForSession returns all known hints belonging to the given session id. // If the index for the session has not been initialized, this method returns // ErrNoSessionHintIndex. -func getHintsForSession(updateIndex *bbolt.Bucket, +func getHintsForSession(updateIndex kvdb.ReadBucket, id *SessionID) ([]blob.BreachHint, error) { - sessionHints := updateIndex.Bucket(id[:]) + sessionHints := updateIndex.NestedReadBucket(id[:]) if sessionHints == nil { return nil, ErrNoSessionHintIndex } @@ -582,10 +582,10 @@ func getHintsForSession(updateIndex *bbolt.Bucket, // session id, and used to perform efficient removal of updates. If the index // for the session has not been initialized, this method returns // ErrNoSessionHintIndex. -func putHintForSession(updateIndex *bbolt.Bucket, id *SessionID, +func putHintForSession(updateIndex kvdb.RwBucket, id *SessionID, hint blob.BreachHint) error { - sessionHints := updateIndex.Bucket(id[:]) + sessionHints := updateIndex.NestedReadWriteBucket(id[:]) if sessionHints == nil { return ErrNoSessionHintIndex } @@ -594,7 +594,7 @@ func putHintForSession(updateIndex *bbolt.Bucket, id *SessionID, } // putLookoutEpoch stores the given lookout tip block epoch in provided bucket. -func putLookoutEpoch(bkt *bbolt.Bucket, epoch *chainntnfs.BlockEpoch) error { +func putLookoutEpoch(bkt kvdb.RwBucket, epoch *chainntnfs.BlockEpoch) error { epochBytes := make([]byte, 36) copy(epochBytes, epoch.Hash[:]) byteOrder.PutUint32(epochBytes[32:], uint32(epoch.Height)) @@ -604,7 +604,7 @@ func putLookoutEpoch(bkt *bbolt.Bucket, epoch *chainntnfs.BlockEpoch) error { // getLookoutEpoch retrieves the lookout tip block epoch from the given bucket. // A nil epoch is returned if no update exists. -func getLookoutEpoch(bkt *bbolt.Bucket) *chainntnfs.BlockEpoch { +func getLookoutEpoch(bkt kvdb.ReadBucket) *chainntnfs.BlockEpoch { epochBytes := bkt.Get(lookoutTipKey) if len(epochBytes) != 36 { return nil @@ -625,7 +625,7 @@ func getLookoutEpoch(bkt *bbolt.Bucket) *chainntnfs.BlockEpoch { var errBucketNotEmpty = errors.New("bucket not empty") // isBucketEmpty returns errBucketNotEmpty if the bucket is not empty. -func isBucketEmpty(bkt *bbolt.Bucket) error { +func isBucketEmpty(bkt kvdb.ReadBucket) error { return bkt.ForEach(func(_, _ []byte) error { return errBucketNotEmpty }) diff --git a/watchtower/wtdb/version.go b/watchtower/wtdb/version.go index b8aa2b7e..597a1f1f 100644 --- a/watchtower/wtdb/version.go +++ b/watchtower/wtdb/version.go @@ -1,14 +1,14 @@ package wtdb import ( - "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" ) // migration is a function which takes a prior outdated version of the database // instances and mutates the key/bucket structure to arrive at a more // up-to-date version of the database. -type migration func(tx *bbolt.Tx) error +type migration func(tx kvdb.RwTx) error // version pairs a version number with the migration that would need to be // applied from the prior version to upgrade. @@ -46,8 +46,8 @@ func getMigrations(versions []version, curVersion uint32) []version { // getDBVersion retrieves the current database version from the metadata bucket // using the dbVersionKey. -func getDBVersion(tx *bbolt.Tx) (uint32, error) { - metadata := tx.Bucket(metadataBkt) +func getDBVersion(tx kvdb.ReadTx) (uint32, error) { + metadata := tx.ReadBucket(metadataBkt) if metadata == nil { return 0, ErrUninitializedDB } @@ -62,8 +62,8 @@ func getDBVersion(tx *bbolt.Tx) (uint32, error) { // initDBVersion initializes the top-level metadata bucket and writes the passed // version number as the current version. -func initDBVersion(tx *bbolt.Tx, version uint32) error { - _, err := tx.CreateBucketIfNotExists(metadataBkt) +func initDBVersion(tx kvdb.RwTx, version uint32) error { + _, err := tx.CreateTopLevelBucket(metadataBkt) if err != nil { return err } @@ -73,8 +73,8 @@ func initDBVersion(tx *bbolt.Tx, version uint32) error { // putDBVersion stores the passed database version in the metadata bucket under // the dbVersionKey. -func putDBVersion(tx *bbolt.Tx, version uint32) error { - metadata := tx.Bucket(metadataBkt) +func putDBVersion(tx kvdb.RwTx, version uint32) error { + metadata := tx.ReadWriteBucket(metadataBkt) if metadata == nil { return ErrUninitializedDB } @@ -89,7 +89,7 @@ func putDBVersion(tx *bbolt.Tx, version uint32) error { // on either. type versionedDB interface { // bdb returns the underlying bbolt database. - bdb() *bbolt.DB + bdb() kvdb.Backend // Version returns the current version stored in the database. Version() (uint32, error) @@ -105,7 +105,7 @@ func initOrSyncVersions(db versionedDB, init bool, versions []version) error { // If the database has not yet been created, we'll initialize the // database version with the latest known version. if init { - return db.bdb().Update(func(tx *bbolt.Tx) error { + return kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error { return initDBVersion(tx, getLatestDBVersion(versions)) }) } @@ -141,7 +141,7 @@ func syncVersions(db versionedDB, versions []version) error { // Otherwise, apply any migrations in order to bring the database // version up to the highest known version. updates := getMigrations(versions, curVersion) - return db.bdb().Update(func(tx *bbolt.Tx) error { + return kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error { for i, update := range updates { if update.migration == nil { continue