discovery: covert to use new kvdb abstraction
This commit is contained in:
parent
852b2380a9
commit
ace7a78494
@ -6,8 +6,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
@ -68,8 +68,8 @@ var _ GossipMessageStore = (*MessageStore)(nil)
|
||||
|
||||
// NewMessageStore creates a new message store backed by a channeldb instance.
|
||||
func NewMessageStore(db *channeldb.DB) (*MessageStore, error) {
|
||||
err := db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(messageStoreBucket)
|
||||
err := kvdb.Batch(db.Backend, func(tx kvdb.RwTx) error {
|
||||
_, err := tx.CreateTopLevelBucket(messageStoreBucket)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -124,8 +124,8 @@ func (s *MessageStore) AddMessage(msg lnwire.Message, peerPubKey [33]byte) error
|
||||
return err
|
||||
}
|
||||
|
||||
return s.db.Batch(func(tx *bbolt.Tx) error {
|
||||
messageStore := tx.Bucket(messageStoreBucket)
|
||||
return kvdb.Batch(s.db.Backend, func(tx kvdb.RwTx) error {
|
||||
messageStore := tx.ReadWriteBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return ErrCorruptedMessageStore
|
||||
}
|
||||
@ -145,8 +145,8 @@ func (s *MessageStore) DeleteMessage(msg lnwire.Message,
|
||||
return err
|
||||
}
|
||||
|
||||
return s.db.Batch(func(tx *bbolt.Tx) error {
|
||||
messageStore := tx.Bucket(messageStoreBucket)
|
||||
return kvdb.Batch(s.db.Backend, func(tx kvdb.RwTx) error {
|
||||
messageStore := tx.ReadWriteBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return ErrCorruptedMessageStore
|
||||
}
|
||||
@ -200,8 +200,8 @@ func readMessage(msgBytes []byte) (lnwire.Message, error) {
|
||||
// all peers.
|
||||
func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) {
|
||||
msgs := make(map[[33]byte][]lnwire.Message)
|
||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||
messageStore := tx.Bucket(messageStoreBucket)
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
messageStore := tx.ReadBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return ErrCorruptedMessageStore
|
||||
}
|
||||
@ -238,13 +238,13 @@ func (s *MessageStore) MessagesForPeer(
|
||||
peerPubKey [33]byte) ([]lnwire.Message, error) {
|
||||
|
||||
var msgs []lnwire.Message
|
||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||
messageStore := tx.Bucket(messageStoreBucket)
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
messageStore := tx.ReadBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return ErrCorruptedMessageStore
|
||||
}
|
||||
|
||||
c := messageStore.Cursor()
|
||||
c := messageStore.ReadCursor()
|
||||
k, v := c.Seek(peerPubKey[:])
|
||||
for ; bytes.HasPrefix(k, peerPubKey[:]); k, v = c.Next() {
|
||||
// Deserialize the message from its raw bytes and filter
|
||||
@ -273,8 +273,8 @@ func (s *MessageStore) MessagesForPeer(
|
||||
// Peers returns the public key of all peers with messages within the store.
|
||||
func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) {
|
||||
peers := make(map[[33]byte]struct{})
|
||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||
messageStore := tx.Bucket(messageStoreBucket)
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
messageStore := tx.ReadBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return ErrCorruptedMessageStore
|
||||
}
|
||||
|
@ -9,9 +9,9 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec"
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
@ -236,8 +236,8 @@ func TestMessageStoreUnsupportedMessage(t *testing.T) {
|
||||
if _, err := lnwire.WriteMessage(&rawMsg, unsupportedMsg, 0); err != nil {
|
||||
t.Fatalf("unable to serialize message: %v", err)
|
||||
}
|
||||
err = msgStore.db.Update(func(tx *bbolt.Tx) error {
|
||||
messageStore := tx.Bucket(messageStoreBucket)
|
||||
err = kvdb.Update(msgStore.db, func(tx kvdb.RwTx) error {
|
||||
messageStore := tx.ReadWriteBucket(messageStoreBucket)
|
||||
return messageStore.Put(msgKey, rawMsg.Bytes())
|
||||
})
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user