server+discovery: replace gossiper message store with MessageStore

This commit is contained in:
Wilmer Paulino 2019-02-05 17:18:27 -08:00
parent 9febc9cc04
commit 2277535e6b
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
4 changed files with 201 additions and 182 deletions

@ -2,7 +2,6 @@ package discovery
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"runtime"
@ -13,7 +12,6 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
@ -25,13 +23,6 @@ import (
)
var (
// messageStoreKey is a key used to create a top level bucket in
// the gossiper database, used for storing messages that are to
// be sent to peers. Currently this is used for reliably sending
// AnnounceSignatures messages, by persisting them until a send
// operation has succeeded.
messageStoreKey = []byte("message-store")
// ErrGossiperShuttingDown is an error that is returned if the gossiper
// is in the process of being shut down.
ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
@ -137,6 +128,10 @@ type Config struct {
// proof storage to make waiting proofs persistent.
DB *channeldb.DB
// MessageStore is a persistent storage of gossip messages which we will
// use to determine which messages need to be resent for a given peer.
MessageStore GossipMessageStore
// AnnSigner is an instance of the MessageSigner interface which will
// be used to manually sign any outgoing channel updates. The signer
// implementation should be backed by the public key of the backing
@ -814,85 +809,25 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
// will try to resend them. If we have the full proof, we can safely delete the
// message from the messageStore.
func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
type msgTuple struct {
peer *btcec.PublicKey
msg *lnwire.AnnounceSignatures
dbKey []byte
}
// Fetch all the AnnounceSignatures messages that was added to the
// database.
//
// TODO(halseth): database access should be abstracted
// behind interface.
var msgsResend []msgTuple
if err := d.cfg.DB.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(messageStoreKey)
if bucket == nil {
return nil
}
if err := bucket.ForEach(func(k, v []byte) error {
// The database value represents the encoded
// AnnounceSignatures message.
r := bytes.NewReader(v)
msg := &lnwire.AnnounceSignatures{}
if err := msg.Decode(r, 0); err != nil {
return err
}
// The first 33 bytes of the database key is the peer's
// public key.
peer, err := btcec.ParsePubKey(k[:33], btcec.S256())
peerMsgsToResend, err := d.cfg.MessageStore.Messages()
if err != nil {
return err
}
// Make a copy of the database key corresponding to
// these AnnounceSignatures.
dbKey := make([]byte, len(k))
copy(dbKey, k)
t := msgTuple{peer, msg, dbKey}
// Add the message to the slice, such that we can
// resend it after the database transaction is over.
msgsResend = append(msgsResend, t)
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}
// deleteMsg removes the message associated with the passed msgTuple
// from the messageStore.
deleteMsg := func(t msgTuple) error {
log.Debugf("Deleting message for chanID=%v from "+
"messageStore", t.msg.ChannelID)
if err := d.cfg.DB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(messageStoreKey)
if bucket == nil {
return fmt.Errorf("bucket " +
"unexpectedly did not exist")
}
return bucket.Delete(t.dbKey[:])
}); err != nil {
return fmt.Errorf("Failed deleting message "+
"from database: %v", err)
}
return nil
}
// We now iterate over these messages, resending those that we don't
// have the full proof for, deleting the rest.
for _, t := range msgsResend {
for peer, msgsToResend := range peerMsgsToResend {
pubKey, err := btcec.ParsePubKey(peer[:], btcec.S256())
if err != nil {
return err
}
for _, msg := range msgsToResend {
msg := msg.(*lnwire.AnnounceSignatures)
// Check if the full channel proof exists in our graph.
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
t.msg.ShortChannelID)
msg.ShortChannelID)
if err != nil {
// If the channel cannot be found, it is most likely a
// leftover message for a channel that was closed. In
@ -900,8 +835,9 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
log.Warnf("unable to fetch channel info for "+
"chanID=%v from graph: %v. Will delete local"+
"proof from database",
t.msg.ChannelID, err)
if err := deleteMsg(t); err != nil {
msg.ChannelID, err)
err = d.cfg.MessageStore.DeleteMessage(msg, peer)
if err != nil {
return err
}
continue
@ -915,7 +851,7 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
// part: they will then be able to assemble and send us the
// full proof.
if chanInfo.AuthProof == nil {
err := d.sendAnnSigReliably(t.msg, t.peer)
err := d.sendAnnSigReliably(msg, pubKey)
if err != nil {
return err
}
@ -931,12 +867,14 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
// retry sending their half proof.
if chanInfo.AuthProof != nil {
log.Debugf("Deleting message for chanID=%v from "+
"messageStore", t.msg.ChannelID)
if err := deleteMsg(t); err != nil {
"messageStore", msg.ChannelID)
err := d.cfg.MessageStore.DeleteMessage(msg, peer)
if err != nil {
return err
}
}
}
}
return nil
}
@ -2434,31 +2372,10 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably(
// We first add this message to the database, such that in case
// we do not succeed in sending it to the peer, we'll fetch it
// from the DB next time we start, and retry. We use the peer ID
// + shortChannelID as key, as there possibly is more than one
// channel opening in progress to the same peer.
var key [41]byte
copy(key[:33], remotePeer.SerializeCompressed())
binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64())
err := d.cfg.DB.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(messageStoreKey)
if err != nil {
return err
}
// Encode the AnnounceSignatures message.
var b bytes.Buffer
if err := msg.Encode(&b, 0); err != nil {
return err
}
// Add the encoded message to the database using the peer
// + shortChanID as key.
return bucket.Put(key[:], b.Bytes())
})
if err != nil {
// from the DB next time we start, and retry.
var remotePubKey [33]byte
copy(remotePubKey[:], remotePeer.SerializeCompressed())
if err := d.cfg.MessageStore.AddMessage(msg, remotePubKey); err != nil {
return err
}

@ -619,6 +619,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
DB: db,
MessageStore: newMockMessageStore(),
}, nodeKeyPub1)
if err != nil {
cleanUpDb()
@ -1655,6 +1656,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
DB: ctx.gossiper.cfg.DB,
MessageStore: ctx.gossiper.cfg.MessageStore,
}, ctx.gossiper.selfKey)
if err != nil {
t.Fatalf("unable to recreate gossiper: %v", err)
@ -2861,42 +2863,3 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
t.Fatalf("unable to process announcement: %v", err)
}
}
// mockPeer implements the lnpeer.Peer interface and is used to test the
// gossiper's interaction with peers.
type mockPeer struct {
pk *btcec.PublicKey
sentMsgs chan lnwire.Message
quit chan struct{}
}
var _ lnpeer.Peer = (*mockPeer)(nil)
func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
if p.sentMsgs == nil && p.quit == nil {
return nil
}
for _, msg := range msgs {
select {
case p.sentMsgs <- msg:
case <-p.quit:
}
}
return nil
}
func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error {
return nil
}
func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil }
func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk }
func (p *mockPeer) PubKey() [33]byte {
var pubkey [33]byte
copy(pubkey[:], p.pk.SerializeCompressed())
return pubkey
}
func (p *mockPeer) Address() net.Addr { return nil }
func (p *mockPeer) QuitSignal() <-chan struct{} {
return p.quit
}

133
discovery/mock_test.go Normal file

@ -0,0 +1,133 @@
package discovery
import (
"net"
"sync"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// mockPeer implements the lnpeer.Peer interface and is used to test the
// gossiper's interaction with peers.
type mockPeer struct {
pk *btcec.PublicKey
sentMsgs chan lnwire.Message
quit chan struct{}
}
var _ lnpeer.Peer = (*mockPeer)(nil)
func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
if p.sentMsgs == nil && p.quit == nil {
return nil
}
for _, msg := range msgs {
select {
case p.sentMsgs <- msg:
case <-p.quit:
}
}
return nil
}
func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error {
return nil
}
func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil }
func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk }
func (p *mockPeer) PubKey() [33]byte {
var pubkey [33]byte
copy(pubkey[:], p.pk.SerializeCompressed())
return pubkey
}
func (p *mockPeer) Address() net.Addr { return nil }
func (p *mockPeer) QuitSignal() <-chan struct{} {
return p.quit
}
// mockMessageStore is an in-memory implementation of the MessageStore interface
// used for the gossiper's unit tests.
type mockMessageStore struct {
sync.Mutex
messages map[[33]byte]map[lnwire.Message]struct{}
}
func newMockMessageStore() *mockMessageStore {
return &mockMessageStore{
messages: make(map[[33]byte]map[lnwire.Message]struct{}),
}
}
var _ GossipMessageStore = (*mockMessageStore)(nil)
func (s *mockMessageStore) AddMessage(msg lnwire.Message, pubKey [33]byte) error {
s.Lock()
defer s.Unlock()
if _, ok := s.messages[pubKey]; !ok {
s.messages[pubKey] = make(map[lnwire.Message]struct{})
}
s.messages[pubKey][msg] = struct{}{}
return nil
}
func (s *mockMessageStore) DeleteMessage(msg lnwire.Message, pubKey [33]byte) error {
s.Lock()
defer s.Unlock()
peerMsgs, ok := s.messages[pubKey]
if !ok {
return nil
}
delete(peerMsgs, msg)
return nil
}
func (s *mockMessageStore) Messages() (map[[33]byte][]lnwire.Message, error) {
s.Lock()
defer s.Unlock()
msgs := make(map[[33]byte][]lnwire.Message, len(s.messages))
for peer, peerMsgs := range s.messages {
for msg := range peerMsgs {
msgs[peer] = append(msgs[peer], msg)
}
}
return msgs, nil
}
func (s *mockMessageStore) Peers() (map[[33]byte]struct{}, error) {
s.Lock()
defer s.Unlock()
peers := make(map[[33]byte]struct{}, len(s.messages))
for peer := range s.messages {
peers[peer] = struct{}{}
}
return peers, nil
}
func (s *mockMessageStore) MessagesForPeer(pubKey [33]byte) ([]lnwire.Message, error) {
s.Lock()
defer s.Unlock()
peerMsgs, ok := s.messages[pubKey]
if !ok {
return nil, nil
}
msgs := make([]lnwire.Message, 0, len(peerMsgs))
for msg := range peerMsgs {
msgs = append(msgs, msg)
}
return msgs, nil
}

@ -583,6 +583,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
s.chanDB.ChannelGraph(),
)
gossipMessageStore, err := discovery.NewMessageStore(s.chanDB)
if err != nil {
return nil, err
}
s.authGossiper, err = discovery.New(discovery.Config{
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
@ -598,6 +603,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
DB: chanDB,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
},
s.identityPriv.PubKey(),