diff --git a/discovery/message_store.go b/discovery/message_store.go index e0c10a86..207f857f 100644 --- a/discovery/message_store.go +++ b/discovery/message_store.go @@ -6,8 +6,8 @@ import ( "errors" "fmt" - "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/lnwire" ) @@ -68,8 +68,8 @@ var _ GossipMessageStore = (*MessageStore)(nil) // NewMessageStore creates a new message store backed by a channeldb instance. func NewMessageStore(db *channeldb.DB) (*MessageStore, error) { - err := db.Update(func(tx *bbolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(messageStoreBucket) + err := kvdb.Batch(db.Backend, func(tx kvdb.RwTx) error { + _, err := tx.CreateTopLevelBucket(messageStoreBucket) return err }) if err != nil { @@ -124,8 +124,8 @@ func (s *MessageStore) AddMessage(msg lnwire.Message, peerPubKey [33]byte) error return err } - return s.db.Batch(func(tx *bbolt.Tx) error { - messageStore := tx.Bucket(messageStoreBucket) + return kvdb.Batch(s.db.Backend, func(tx kvdb.RwTx) error { + messageStore := tx.ReadWriteBucket(messageStoreBucket) if messageStore == nil { return ErrCorruptedMessageStore } @@ -145,8 +145,8 @@ func (s *MessageStore) DeleteMessage(msg lnwire.Message, return err } - return s.db.Batch(func(tx *bbolt.Tx) error { - messageStore := tx.Bucket(messageStoreBucket) + return kvdb.Batch(s.db.Backend, func(tx kvdb.RwTx) error { + messageStore := tx.ReadWriteBucket(messageStoreBucket) if messageStore == nil { return ErrCorruptedMessageStore } @@ -200,8 +200,8 @@ func readMessage(msgBytes []byte) (lnwire.Message, error) { // all peers. func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) { msgs := make(map[[33]byte][]lnwire.Message) - err := s.db.View(func(tx *bbolt.Tx) error { - messageStore := tx.Bucket(messageStoreBucket) + err := kvdb.View(s.db, func(tx kvdb.ReadTx) error { + messageStore := tx.ReadBucket(messageStoreBucket) if messageStore == nil { return ErrCorruptedMessageStore } @@ -238,13 +238,13 @@ func (s *MessageStore) MessagesForPeer( peerPubKey [33]byte) ([]lnwire.Message, error) { var msgs []lnwire.Message - err := s.db.View(func(tx *bbolt.Tx) error { - messageStore := tx.Bucket(messageStoreBucket) + err := kvdb.View(s.db, func(tx kvdb.ReadTx) error { + messageStore := tx.ReadBucket(messageStoreBucket) if messageStore == nil { return ErrCorruptedMessageStore } - c := messageStore.Cursor() + c := messageStore.ReadCursor() k, v := c.Seek(peerPubKey[:]) for ; bytes.HasPrefix(k, peerPubKey[:]); k, v = c.Next() { // Deserialize the message from its raw bytes and filter @@ -273,8 +273,8 @@ func (s *MessageStore) MessagesForPeer( // Peers returns the public key of all peers with messages within the store. func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) { peers := make(map[[33]byte]struct{}) - err := s.db.View(func(tx *bbolt.Tx) error { - messageStore := tx.Bucket(messageStoreBucket) + err := kvdb.View(s.db, func(tx kvdb.ReadTx) error { + messageStore := tx.ReadBucket(messageStoreBucket) if messageStore == nil { return ErrCorruptedMessageStore } diff --git a/discovery/message_store_test.go b/discovery/message_store_test.go index a106ad22..fc7ba336 100644 --- a/discovery/message_store_test.go +++ b/discovery/message_store_test.go @@ -9,9 +9,9 @@ import ( "testing" "github.com/btcsuite/btcd/btcec" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/lnwire" ) @@ -236,8 +236,8 @@ func TestMessageStoreUnsupportedMessage(t *testing.T) { if _, err := lnwire.WriteMessage(&rawMsg, unsupportedMsg, 0); err != nil { t.Fatalf("unable to serialize message: %v", err) } - err = msgStore.db.Update(func(tx *bbolt.Tx) error { - messageStore := tx.Bucket(messageStoreBucket) + err = kvdb.Update(msgStore.db, func(tx kvdb.RwTx) error { + messageStore := tx.ReadWriteBucket(messageStoreBucket) return messageStore.Put(msgKey, rawMsg.Bytes()) }) if err != nil {