From 2277535e6b81430b22c8a8d0c0ec38ae036165d5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:18:27 -0800 Subject: [PATCH] server+discovery: replace gossiper message store with MessageStore --- discovery/gossiper.go | 203 +++++++++++-------------------------- discovery/gossiper_test.go | 41 +------- discovery/mock_test.go | 133 ++++++++++++++++++++++++ server.go | 6 ++ 4 files changed, 201 insertions(+), 182 deletions(-) create mode 100644 discovery/mock_test.go diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a8f15440..f1195d45 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2,7 +2,6 @@ package discovery import ( "bytes" - "encoding/binary" "errors" "fmt" "runtime" @@ -13,7 +12,6 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -25,13 +23,6 @@ import ( ) var ( - // messageStoreKey is a key used to create a top level bucket in - // the gossiper database, used for storing messages that are to - // be sent to peers. Currently this is used for reliably sending - // AnnounceSignatures messages, by persisting them until a send - // operation has succeeded. - messageStoreKey = []byte("message-store") - // ErrGossiperShuttingDown is an error that is returned if the gossiper // is in the process of being shut down. ErrGossiperShuttingDown = errors.New("gossiper is shutting down") @@ -137,6 +128,10 @@ type Config struct { // proof storage to make waiting proofs persistent. DB *channeldb.DB + // MessageStore is a persistent storage of gossip messages which we will + // use to determine which messages need to be resent for a given peer. + MessageStore GossipMessageStore + // AnnSigner is an instance of the MessageSigner interface which will // be used to manually sign any outgoing channel updates. The signer // implementation should be backed by the public key of the backing @@ -814,126 +809,69 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { // will try to resend them. If we have the full proof, we can safely delete the // message from the messageStore. func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { - type msgTuple struct { - peer *btcec.PublicKey - msg *lnwire.AnnounceSignatures - dbKey []byte - } - - // Fetch all the AnnounceSignatures messages that was added to the - // database. - // - // TODO(halseth): database access should be abstracted - // behind interface. - var msgsResend []msgTuple - if err := d.cfg.DB.View(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(messageStoreKey) - if bucket == nil { - return nil - } - - if err := bucket.ForEach(func(k, v []byte) error { - // The database value represents the encoded - // AnnounceSignatures message. - r := bytes.NewReader(v) - msg := &lnwire.AnnounceSignatures{} - if err := msg.Decode(r, 0); err != nil { - return err - } - - // The first 33 bytes of the database key is the peer's - // public key. - peer, err := btcec.ParsePubKey(k[:33], btcec.S256()) - if err != nil { - return err - } - - // Make a copy of the database key corresponding to - // these AnnounceSignatures. - dbKey := make([]byte, len(k)) - copy(dbKey, k) - - t := msgTuple{peer, msg, dbKey} - - // Add the message to the slice, such that we can - // resend it after the database transaction is over. - msgsResend = append(msgsResend, t) - return nil - }); err != nil { - return err - } - return nil - }); err != nil { + peerMsgsToResend, err := d.cfg.MessageStore.Messages() + if err != nil { return err } - // deleteMsg removes the message associated with the passed msgTuple - // from the messageStore. - deleteMsg := func(t msgTuple) error { - log.Debugf("Deleting message for chanID=%v from "+ - "messageStore", t.msg.ChannelID) - if err := d.cfg.DB.Update(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(messageStoreKey) - if bucket == nil { - return fmt.Errorf("bucket " + - "unexpectedly did not exist") - } - - return bucket.Delete(t.dbKey[:]) - }); err != nil { - return fmt.Errorf("Failed deleting message "+ - "from database: %v", err) - } - return nil - } - // We now iterate over these messages, resending those that we don't // have the full proof for, deleting the rest. - for _, t := range msgsResend { - // Check if the full channel proof exists in our graph. - chanInfo, _, _, err := d.cfg.Router.GetChannelByID( - t.msg.ShortChannelID) + for peer, msgsToResend := range peerMsgsToResend { + pubKey, err := btcec.ParsePubKey(peer[:], btcec.S256()) if err != nil { - // If the channel cannot be found, it is most likely a - // leftover message for a channel that was closed. In - // this case we delete it from the message store. - log.Warnf("unable to fetch channel info for "+ - "chanID=%v from graph: %v. Will delete local"+ - "proof from database", - t.msg.ChannelID, err) - if err := deleteMsg(t); err != nil { - return err - } - continue + return err } - // 1. If the full proof does not exist in the graph, it means - // that we haven't received the remote proof yet (or that we - // crashed before able to assemble the full proof). Since the - // remote node might think they have delivered their proof to - // us, we will resend _our_ proof to trigger a resend on their - // part: they will then be able to assemble and send us the - // full proof. - if chanInfo.AuthProof == nil { - err := d.sendAnnSigReliably(t.msg, t.peer) + for _, msg := range msgsToResend { + msg := msg.(*lnwire.AnnounceSignatures) + + // Check if the full channel proof exists in our graph. + chanInfo, _, _, err := d.cfg.Router.GetChannelByID( + msg.ShortChannelID) if err != nil { - return err + // If the channel cannot be found, it is most likely a + // leftover message for a channel that was closed. In + // this case we delete it from the message store. + log.Warnf("unable to fetch channel info for "+ + "chanID=%v from graph: %v. Will delete local"+ + "proof from database", + msg.ChannelID, err) + err = d.cfg.MessageStore.DeleteMessage(msg, peer) + if err != nil { + return err + } + continue } - continue - } - // 2. If the proof does exist in the graph, we have - // successfully received the remote proof and assembled the - // full proof. In this case we can safely delete the local - // proof from the database. In case the remote hasn't been able - // to assemble the full proof yet (maybe because of a crash), - // we will send them the full proof if we notice that they - // retry sending their half proof. - if chanInfo.AuthProof != nil { - log.Debugf("Deleting message for chanID=%v from "+ - "messageStore", t.msg.ChannelID) - if err := deleteMsg(t); err != nil { - return err + // 1. If the full proof does not exist in the graph, it means + // that we haven't received the remote proof yet (or that we + // crashed before able to assemble the full proof). Since the + // remote node might think they have delivered their proof to + // us, we will resend _our_ proof to trigger a resend on their + // part: they will then be able to assemble and send us the + // full proof. + if chanInfo.AuthProof == nil { + err := d.sendAnnSigReliably(msg, pubKey) + if err != nil { + return err + } + continue + } + + // 2. If the proof does exist in the graph, we have + // successfully received the remote proof and assembled the + // full proof. In this case we can safely delete the local + // proof from the database. In case the remote hasn't been able + // to assemble the full proof yet (maybe because of a crash), + // we will send them the full proof if we notice that they + // retry sending their half proof. + if chanInfo.AuthProof != nil { + log.Debugf("Deleting message for chanID=%v from "+ + "messageStore", msg.ChannelID) + err := d.cfg.MessageStore.DeleteMessage(msg, peer) + if err != nil { + return err + } } } } @@ -2434,31 +2372,10 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably( // We first add this message to the database, such that in case // we do not succeed in sending it to the peer, we'll fetch it - // from the DB next time we start, and retry. We use the peer ID - // + shortChannelID as key, as there possibly is more than one - // channel opening in progress to the same peer. - var key [41]byte - copy(key[:33], remotePeer.SerializeCompressed()) - binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64()) - - err := d.cfg.DB.Update(func(tx *bbolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists(messageStoreKey) - if err != nil { - return err - } - - // Encode the AnnounceSignatures message. - var b bytes.Buffer - if err := msg.Encode(&b, 0); err != nil { - return err - } - - // Add the encoded message to the database using the peer - // + shortChanID as key. - return bucket.Put(key[:], b.Bytes()) - - }) - if err != nil { + // from the DB next time we start, and retry. + var remotePubKey [33]byte + copy(remotePubKey[:], remotePeer.SerializeCompressed()) + if err := d.cfg.MessageStore.AddMessage(msg, remotePubKey); err != nil { return err } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index be029878..d32ca0af 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -619,6 +619,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { RetransmitDelay: retransmitDelay, ProofMatureDelta: proofMatureDelta, DB: db, + MessageStore: newMockMessageStore(), }, nodeKeyPub1) if err != nil { cleanUpDb() @@ -1655,6 +1656,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { RetransmitDelay: retransmitDelay, ProofMatureDelta: proofMatureDelta, DB: ctx.gossiper.cfg.DB, + MessageStore: ctx.gossiper.cfg.MessageStore, }, ctx.gossiper.selfKey) if err != nil { t.Fatalf("unable to recreate gossiper: %v", err) @@ -2861,42 +2863,3 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { t.Fatalf("unable to process announcement: %v", err) } } - -// mockPeer implements the lnpeer.Peer interface and is used to test the -// gossiper's interaction with peers. -type mockPeer struct { - pk *btcec.PublicKey - sentMsgs chan lnwire.Message - quit chan struct{} -} - -var _ lnpeer.Peer = (*mockPeer)(nil) - -func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { - if p.sentMsgs == nil && p.quit == nil { - return nil - } - - for _, msg := range msgs { - select { - case p.sentMsgs <- msg: - case <-p.quit: - } - } - - return nil -} -func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { - return nil -} -func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } -func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } -func (p *mockPeer) PubKey() [33]byte { - var pubkey [33]byte - copy(pubkey[:], p.pk.SerializeCompressed()) - return pubkey -} -func (p *mockPeer) Address() net.Addr { return nil } -func (p *mockPeer) QuitSignal() <-chan struct{} { - return p.quit -} diff --git a/discovery/mock_test.go b/discovery/mock_test.go new file mode 100644 index 00000000..85c6c4f3 --- /dev/null +++ b/discovery/mock_test.go @@ -0,0 +1,133 @@ +package discovery + +import ( + "net" + "sync" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwire" +) + +// mockPeer implements the lnpeer.Peer interface and is used to test the +// gossiper's interaction with peers. +type mockPeer struct { + pk *btcec.PublicKey + sentMsgs chan lnwire.Message + quit chan struct{} +} + +var _ lnpeer.Peer = (*mockPeer)(nil) + +func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { + if p.sentMsgs == nil && p.quit == nil { + return nil + } + + for _, msg := range msgs { + select { + case p.sentMsgs <- msg: + case <-p.quit: + } + } + + return nil +} +func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { + return nil +} +func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } +func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } +func (p *mockPeer) PubKey() [33]byte { + var pubkey [33]byte + copy(pubkey[:], p.pk.SerializeCompressed()) + return pubkey +} +func (p *mockPeer) Address() net.Addr { return nil } +func (p *mockPeer) QuitSignal() <-chan struct{} { + return p.quit +} + +// mockMessageStore is an in-memory implementation of the MessageStore interface +// used for the gossiper's unit tests. +type mockMessageStore struct { + sync.Mutex + messages map[[33]byte]map[lnwire.Message]struct{} +} + +func newMockMessageStore() *mockMessageStore { + return &mockMessageStore{ + messages: make(map[[33]byte]map[lnwire.Message]struct{}), + } +} + +var _ GossipMessageStore = (*mockMessageStore)(nil) + +func (s *mockMessageStore) AddMessage(msg lnwire.Message, pubKey [33]byte) error { + s.Lock() + defer s.Unlock() + + if _, ok := s.messages[pubKey]; !ok { + s.messages[pubKey] = make(map[lnwire.Message]struct{}) + } + + s.messages[pubKey][msg] = struct{}{} + + return nil +} + +func (s *mockMessageStore) DeleteMessage(msg lnwire.Message, pubKey [33]byte) error { + s.Lock() + defer s.Unlock() + + peerMsgs, ok := s.messages[pubKey] + if !ok { + return nil + } + + delete(peerMsgs, msg) + return nil +} + +func (s *mockMessageStore) Messages() (map[[33]byte][]lnwire.Message, error) { + s.Lock() + defer s.Unlock() + + msgs := make(map[[33]byte][]lnwire.Message, len(s.messages)) + for peer, peerMsgs := range s.messages { + for msg := range peerMsgs { + msgs[peer] = append(msgs[peer], msg) + } + } + return msgs, nil +} + +func (s *mockMessageStore) Peers() (map[[33]byte]struct{}, error) { + s.Lock() + defer s.Unlock() + + peers := make(map[[33]byte]struct{}, len(s.messages)) + for peer := range s.messages { + peers[peer] = struct{}{} + } + return peers, nil +} + +func (s *mockMessageStore) MessagesForPeer(pubKey [33]byte) ([]lnwire.Message, error) { + s.Lock() + defer s.Unlock() + + peerMsgs, ok := s.messages[pubKey] + if !ok { + return nil, nil + } + + msgs := make([]lnwire.Message, 0, len(peerMsgs)) + for msg := range peerMsgs { + msgs = append(msgs, msg) + } + + return msgs, nil +} diff --git a/server.go b/server.go index 38df7e34..13210c6e 100644 --- a/server.go +++ b/server.go @@ -583,6 +583,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, s.chanDB.ChannelGraph(), ) + gossipMessageStore, err := discovery.NewMessageStore(s.chanDB) + if err != nil { + return nil, err + } + s.authGossiper, err = discovery.New(discovery.Config{ Router: s.chanRouter, Notifier: s.cc.chainNotifier, @@ -598,6 +603,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), RetransmitDelay: time.Minute * 30, DB: chanDB, + MessageStore: gossipMessageStore, AnnSigner: s.nodeSigner, }, s.identityPriv.PubKey(),