diff --git a/channeldb/db.go b/channeldb/db.go
index c4262ffa..51f3fe9a 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -90,6 +90,13 @@ var (
 			number:    7,
 			migration: migrateOptionalChannelCloseSummaryFields,
 		},
+		{
+			// The DB version that changes the gossiper's message
+			// store keys to account for the message's type and
+			// ShortChannelID.
+			number:    8,
+			migration: migrateGossipMessageStoreKeys,
+		},
 	}
 
 	// Big endian is the preferred byte order, due to cursor scans over
diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go
index 921e55c5..76d0cb25 100644
--- a/channeldb/meta_test.go
+++ b/channeldb/meta_test.go
@@ -50,7 +50,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
 	if err == nil && shouldFail {
 		t.Fatal("error wasn't received on migration stage")
 	} else if err != nil && !shouldFail {
-		t.Fatal("error was received on migration stage")
+		t.Fatalf("error was received on migration stage: %v", err)
 	}
 
 	// afterMigration usually used for checking the database state and
diff --git a/channeldb/migrations.go b/channeldb/migrations.go
index 5d4919db..f86e416b 100644
--- a/channeldb/migrations.go
+++ b/channeldb/migrations.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 
 	"github.com/coreos/bbolt"
+	"github.com/lightningnetwork/lnd/lnwire"
 )
 
 // migrateNodeAndEdgeUpdateIndex is a migration function that will update the
@@ -610,3 +611,75 @@ func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error {
 
 	return nil
 }
+
+var messageStoreBucket = []byte("message-store")
+
+// migrateGossipMessageStoreKeys migrates the key format for gossip messages
+// found in the message store to a new one that takes into consideration the
+// type of the message being stored.
+func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error {
+	// We'll start by retrieving the bucket in which these messages are
+	// stored within. If there isn't one, there's nothing left for us to do
+	// so we can avoid the migration.
+	messageStore := tx.Bucket(messageStoreBucket)
+	if messageStore == nil {
+		return nil
+	}
+
+	log.Info("Migrating to the gossip message store new key format")
+
+	// Otherwise we'll proceed with the migration. We'll start by coalescing
+	// all the current messages within the store, which are indexed by the
+	// public key of the peer which they should be sent to, followed by the
+	// short channel ID of the channel for which the message belongs to. We
+	// should only expect to find channel announcement signatures as that
+	// was the only supported message type previously.
+	msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures)
+	err := messageStore.ForEach(func(k, v []byte) error {
+		var msgKey [33 + 8]byte
+		copy(msgKey[:], k)
+
+		msg := &lnwire.AnnounceSignatures{}
+		if err := msg.Decode(bytes.NewReader(v), 0); err != nil {
+			return err
+		}
+
+		msgs[msgKey] = msg
+
+		return nil
+
+	})
+	if err != nil {
+		return err
+	}
+
+	// Then, we'll go over all of our messages, remove their previous entry,
+	// and add another with the new key format. Once we've done this for
+	// every message, we can consider the migration complete.
+	for oldMsgKey, msg := range msgs {
+		if err := messageStore.Delete(oldMsgKey[:]); err != nil {
+			return err
+		}
+
+		// Construct the new key under which we'll find this message
+		// in the store. It'll be the same as the old, but we'll also
+		// include the message type.
+		var msgType [2]byte
+		binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType()))
+		newMsgKey := append(oldMsgKey[:], msgType[:]...)
+ + // Serialize the message with its wire encoding. + var b bytes.Buffer + if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil { + return err + } + + if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil { + return err + } + } + + log.Info("Migration to the gossip message store new key format complete!") + + return nil +} diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go index ed2829c0..9223108d 100644 --- a/channeldb/migrations_test.go +++ b/channeldb/migrations_test.go @@ -12,6 +12,7 @@ import ( "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" ) // TestPaymentStatusesMigration checks that already completed payments will have @@ -468,3 +469,98 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) { false) } } + +// TestMigrateGossipMessageStoreKeys ensures that the migration to the new +// gossip message store key format is successful/unsuccessful under various +// scenarios. +func TestMigrateGossipMessageStoreKeys(t *testing.T) { + t.Parallel() + + // Construct the message which we'll use to test the migration, along + // with its old and new key formats. + shortChanID := lnwire.ShortChannelID{BlockHeight: 10} + msg := &lnwire.AnnounceSignatures{ShortChannelID: shortChanID} + + var oldMsgKey [33 + 8]byte + copy(oldMsgKey[:33], pubKey.SerializeCompressed()) + binary.BigEndian.PutUint64(oldMsgKey[33:41], shortChanID.ToUint64()) + + var newMsgKey [33 + 8 + 2]byte + copy(newMsgKey[:41], oldMsgKey[:]) + binary.BigEndian.PutUint16(newMsgKey[41:43], uint16(msg.MsgType())) + + // Before the migration, we'll create the bucket where the messages + // should live and insert them. + beforeMigration := func(db *DB) { + var b bytes.Buffer + if err := msg.Encode(&b, 0); err != nil { + t.Fatalf("unable to serialize message: %v", err) + } + + err := db.Update(func(tx *bbolt.Tx) error { + messageStore, err := tx.CreateBucketIfNotExists( + messageStoreBucket, + ) + if err != nil { + return err + } + + return messageStore.Put(oldMsgKey[:], b.Bytes()) + }) + if err != nil { + t.Fatal(err) + } + } + + // After the migration, we'll make sure that: + // 1. We cannot find the message under its old key. + // 2. We can find the message under its new key. + // 3. The message matches the original. 
+ afterMigration := func(db *DB) { + meta, err := db.FetchMeta(nil) + if err != nil { + t.Fatalf("unable to fetch db version: %v", err) + } + if meta.DbVersionNumber != 1 { + t.Fatalf("migration should have succeeded but didn't") + } + + var rawMsg []byte + err = db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return errors.New("message store bucket not " + + "found") + } + rawMsg = messageStore.Get(oldMsgKey[:]) + if rawMsg != nil { + t.Fatal("expected to not find message under " + + "old key, but did") + } + rawMsg = messageStore.Get(newMsgKey[:]) + if rawMsg == nil { + return fmt.Errorf("expected to find message " + + "under new key, but didn't") + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + + gotMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0) + if err != nil { + t.Fatalf("unable to deserialize raw message: %v", err) + } + if !reflect.DeepEqual(msg, gotMsg) { + t.Fatalf("expected message: %v\ngot message: %v", + spew.Sdump(msg), spew.Sdump(gotMsg)) + } + } + + applyMigration( + t, beforeMigration, afterMigration, + migrateGossipMessageStoreKeys, false, + ) +} diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a8f15440..195773ed 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2,7 +2,6 @@ package discovery import ( "bytes" - "encoding/binary" "errors" "fmt" "runtime" @@ -13,7 +12,6 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -25,13 +23,6 @@ import ( ) var ( - // messageStoreKey is a key used to create a top level bucket in - // the gossiper database, used for storing messages that are to - // be sent to peers. Currently this is used for reliably sending - // AnnounceSignatures messages, by persisting them until a send - // operation has succeeded. - messageStoreKey = []byte("message-store") - // ErrGossiperShuttingDown is an error that is returned if the gossiper // is in the process of being shut down. ErrGossiperShuttingDown = errors.New("gossiper is shutting down") @@ -104,22 +95,20 @@ type Config struct { Broadcast func(skips map[routing.Vertex]struct{}, msg ...lnwire.Message) error - // SendToPeer is a function which allows the service to send a set of - // messages to a particular peer identified by the target public key. - SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error - - // FindPeer returns the actively registered peer for a given remote - // public key. An error is returned if the peer was not found or a - // shutdown has been requested. - FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error) - // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. // // NOTE: The peerChan channel must be buffered. + // + // TODO(wilmer): use [33]byte to avoid unnecessary serializations. NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) + // NotifyWhenOffline is a function that allows the gossiper to be + // notified when a certain peer disconnects, allowing it to request a + // notification for when it reconnects. 
+ NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} + // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. ProofMatureDelta uint32 @@ -133,9 +122,18 @@ type Config struct { // should check if we need re-broadcast any of our personal channels. RetransmitDelay time.Duration - // DB is a global boltdb instance which is needed to pass it in waiting - // proof storage to make waiting proofs persistent. - DB *channeldb.DB + // WaitingProofStore is a persistent storage of partial channel proof + // announcement messages. We use it to buffer half of the material + // needed to reconstruct a full authenticated channel announcement. + // Once we receive the other half the channel proof, we'll be able to + // properly validate it and re-broadcast it out to the network. + // + // TODO(wilmer): make interface to prevent channeldb dependency. + WaitingProofStore *channeldb.WaitingProofStore + + // MessageStore is a persistent storage of gossip messages which we will + // use to determine which messages need to be resent for a given peer. + MessageStore GossipMessageStore // AnnSigner is an instance of the MessageSigner interface which will // be used to manually sign any outgoing channel updates. The signer @@ -193,13 +191,6 @@ type AuthenticatedGossiper struct { prematureChannelUpdates map[uint64][]*networkMsg pChanUpdMtx sync.Mutex - // waitingProofs is a persistent storage of partial channel proof - // announcement messages. We use it to buffer half of the material - // needed to reconstruct a full authenticated channel announcement. - // Once we receive the other half the channel proof, we'll be able to - // properly validate it and re-broadcast it out to the network. - waitingProofs *channeldb.WaitingProofStore - // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the // networkHandler. @@ -229,18 +220,17 @@ type AuthenticatedGossiper struct { syncerMtx sync.RWMutex peerSyncers map[routing.Vertex]*gossipSyncer + // reliableSender is a subsystem responsible for handling reliable + // message send requests to peers. + reliableSender *reliableSender + sync.Mutex } // New creates a new AuthenticatedGossiper instance, initialized with the // passed configuration parameters. 
-func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { - storage, err := channeldb.NewWaitingProofStore(cfg.DB) - if err != nil { - return nil, err - } - - return &AuthenticatedGossiper{ +func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { + gossiper := &AuthenticatedGossiper{ selfKey: selfKey, cfg: &cfg, networkMsgs: make(chan *networkMsg), @@ -248,11 +238,19 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), prematureAnnouncements: make(map[uint32][]*networkMsg), prematureChannelUpdates: make(map[uint64][]*networkMsg), - waitingProofs: storage, channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), peerSyncers: make(map[routing.Vertex]*gossipSyncer), - }, nil + } + + gossiper.reliableSender = newReliableSender(&reliableSenderCfg{ + NotifyWhenOnline: cfg.NotifyWhenOnline, + NotifyWhenOffline: cfg.NotifyWhenOffline, + MessageStore: cfg.MessageStore, + IsMsgStale: gossiper.isMsgStale, + }) + + return gossiper } // SynchronizeNode sends a message to the service indicating it should @@ -411,11 +409,10 @@ func (d *AuthenticatedGossiper) Start() error { } d.bestHeight = height - // In case we had an AnnounceSignatures ready to be sent when the - // gossiper was last shut down, we must continue on our quest to - // deliver this message to our peer such that they can craft the - // full channel proof. - if err := d.resendAnnounceSignatures(); err != nil { + // Start the reliable sender. In case we had any pending messages ready + // to be sent when the gossiper was last shut down, we must continue on + // our quest to deliver them to their respective peers. + if err := d.reliableSender.Start(); err != nil { return err } @@ -443,6 +440,10 @@ func (d *AuthenticatedGossiper) Stop() { close(d.quit) d.wg.Wait() + + // We'll stop our reliable sender after all of the gossiper's goroutines + // have exited to ensure nothing can cause it to continue executing. + d.reliableSender.Stop() } // TODO(roasbeef): need method to get current gossip timestamp? @@ -808,138 +809,6 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { return msgs } -// resendAnnounceSignatures will inspect the messageStore database bucket for -// AnnounceSignatures messages that we recently tried to send to a peer. If the -// associated channels still not have the full channel proofs assembled, we -// will try to resend them. If we have the full proof, we can safely delete the -// message from the messageStore. -func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { - type msgTuple struct { - peer *btcec.PublicKey - msg *lnwire.AnnounceSignatures - dbKey []byte - } - - // Fetch all the AnnounceSignatures messages that was added to the - // database. - // - // TODO(halseth): database access should be abstracted - // behind interface. - var msgsResend []msgTuple - if err := d.cfg.DB.View(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(messageStoreKey) - if bucket == nil { - return nil - } - - if err := bucket.ForEach(func(k, v []byte) error { - // The database value represents the encoded - // AnnounceSignatures message. - r := bytes.NewReader(v) - msg := &lnwire.AnnounceSignatures{} - if err := msg.Decode(r, 0); err != nil { - return err - } - - // The first 33 bytes of the database key is the peer's - // public key. 
- peer, err := btcec.ParsePubKey(k[:33], btcec.S256()) - if err != nil { - return err - } - - // Make a copy of the database key corresponding to - // these AnnounceSignatures. - dbKey := make([]byte, len(k)) - copy(dbKey, k) - - t := msgTuple{peer, msg, dbKey} - - // Add the message to the slice, such that we can - // resend it after the database transaction is over. - msgsResend = append(msgsResend, t) - return nil - }); err != nil { - return err - } - return nil - }); err != nil { - return err - } - - // deleteMsg removes the message associated with the passed msgTuple - // from the messageStore. - deleteMsg := func(t msgTuple) error { - log.Debugf("Deleting message for chanID=%v from "+ - "messageStore", t.msg.ChannelID) - if err := d.cfg.DB.Update(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(messageStoreKey) - if bucket == nil { - return fmt.Errorf("bucket " + - "unexpectedly did not exist") - } - - return bucket.Delete(t.dbKey[:]) - }); err != nil { - return fmt.Errorf("Failed deleting message "+ - "from database: %v", err) - } - return nil - } - - // We now iterate over these messages, resending those that we don't - // have the full proof for, deleting the rest. - for _, t := range msgsResend { - // Check if the full channel proof exists in our graph. - chanInfo, _, _, err := d.cfg.Router.GetChannelByID( - t.msg.ShortChannelID) - if err != nil { - // If the channel cannot be found, it is most likely a - // leftover message for a channel that was closed. In - // this case we delete it from the message store. - log.Warnf("unable to fetch channel info for "+ - "chanID=%v from graph: %v. Will delete local"+ - "proof from database", - t.msg.ChannelID, err) - if err := deleteMsg(t); err != nil { - return err - } - continue - } - - // 1. If the full proof does not exist in the graph, it means - // that we haven't received the remote proof yet (or that we - // crashed before able to assemble the full proof). Since the - // remote node might think they have delivered their proof to - // us, we will resend _our_ proof to trigger a resend on their - // part: they will then be able to assemble and send us the - // full proof. - if chanInfo.AuthProof == nil { - err := d.sendAnnSigReliably(t.msg, t.peer) - if err != nil { - return err - } - continue - } - - // 2. If the proof does exist in the graph, we have - // successfully received the remote proof and assembled the - // full proof. In this case we can safely delete the local - // proof from the database. In case the remote hasn't been able - // to assemble the full proof yet (maybe because of a crash), - // we will send them the full proof if we notice that they - // retry sending their half proof. - if chanInfo.AuthProof != nil { - log.Debugf("Deleting message for chanID=%v from "+ - "messageStore", t.msg.ChannelID) - if err := deleteMsg(t); err != nil { - return err - } - } - } - return nil -} - // findGossipSyncer is a utility method used by the gossiper to locate the // gossip syncer for an inbound message so we can properly dispatch the // incoming message. If a gossip syncer isn't found, then one will be created @@ -2053,30 +1922,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. 
- var remotePub *btcec.PublicKey + var remotePubKey [33]byte switch { case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: - remotePub, _ = chanInfo.NodeKey2() + remotePubKey = chanInfo.NodeKey2Bytes case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: - remotePub, _ = chanInfo.NodeKey1() + remotePubKey = chanInfo.NodeKey1Bytes } - sPeer, err := d.cfg.FindPeer(remotePub) + // Now, we'll attempt to send the channel update message + // reliably to the remote peer in the background, so + // that we don't block if the peer happens to be offline + // at the moment. + err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { - log.Errorf("unable to send channel update -- "+ - "could not find peer %x: %v", - remotePub.SerializeCompressed(), - err) - } else { - // Send ChannelUpdate directly to remotePeer. - // TODO(halseth): make reliable send? - err = sPeer.SendMessage(false, msg) - if err != nil { - log.Errorf("unable to send channel "+ - "update message to peer %x: %v", - remotePub.SerializeCompressed(), - err) - } + err := fmt.Errorf("unable to reliably send %v "+ + "for channel=%v to peer=%x: %v", + msg.MsgType(), msg.ShortChannelID, + remotePubKey, err) + nMsg.err <- err + return nil } } @@ -2146,7 +2011,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // TODO(andrew.shvv) this is dangerous because remote // node might rewrite the waiting proof. proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) - if err := d.waitingProofs.Add(proof); err != nil { + err := d.cfg.WaitingProofStore.Add(proof) + if err != nil { err := fmt.Errorf("unable to store "+ "the proof for short_chan_id=%v: %v", shortChanID, err) @@ -2182,21 +2048,21 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // so they can also reconstruct the full channel // announcement. if !nMsg.isRemote { - var remotePeer *btcec.PublicKey + var remotePubKey [33]byte if isFirstNode { - remotePeer, _ = chanInfo.NodeKey2() + remotePubKey = chanInfo.NodeKey2Bytes } else { - remotePeer, _ = chanInfo.NodeKey1() + remotePubKey = chanInfo.NodeKey1Bytes } // Since the remote peer might not be online // we'll call a method that will attempt to // deliver the proof when it comes online. - err := d.sendAnnSigReliably(msg, remotePeer) + err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { - err := fmt.Errorf("unable to send reliably "+ - "to remote for short_chan_id=%v: %v", - shortChanID, err) - log.Error(err) + err := fmt.Errorf("unable to reliably send %v "+ + "for channel=%v to peer=%x: %v", + msg.MsgType(), msg.ShortChannelID, + remotePubKey, err) nMsg.err <- err return nil } @@ -2260,7 +2126,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // proof than we should store it this one, and wait for // opposite to be received. 
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) - oppositeProof, err := d.waitingProofs.Get(proof.OppositeKey()) + oppositeProof, err := d.cfg.WaitingProofStore.Get( + proof.OppositeKey(), + ) if err != nil && err != channeldb.ErrWaitingProofNotFound { err := fmt.Errorf("unable to get "+ "the opposite proof for short_chan_id=%v: %v", @@ -2271,7 +2139,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } if err == channeldb.ErrWaitingProofNotFound { - if err := d.waitingProofs.Add(proof); err != nil { + err := d.cfg.WaitingProofStore.Add(proof) + if err != nil { err := fmt.Errorf("unable to store "+ "the proof for short_chan_id=%v: %v", shortChanID, err) @@ -2340,7 +2209,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - err = d.waitingProofs.Remove(proof.OppositeKey()) + err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey()) if err != nil { err := fmt.Errorf("unable remove opposite proof "+ "for the channel with chanID=%v: %v", @@ -2425,91 +2294,49 @@ func (d *AuthenticatedGossiper) fetchNodeAnn( return node.NodeAnnouncement(true) } -// sendAnnSigReliably will try to send the provided local AnnounceSignatures -// to the remote peer, waiting for it to come online if necessary. This -// method returns after adding the message to persistent storage, such -// that the caller knows that the message will be delivered at one point. -func (d *AuthenticatedGossiper) sendAnnSigReliably( - msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error { +// isMsgStale determines whether a message retrieved from the backing +// MessageStore is seen as stale by the current graph. +func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { + switch msg := msg.(type) { + case *lnwire.AnnounceSignatures: + chanInfo, _, _, err := d.cfg.Router.GetChannelByID( + msg.ShortChannelID, + ) - // We first add this message to the database, such that in case - // we do not succeed in sending it to the peer, we'll fetch it - // from the DB next time we start, and retry. We use the peer ID - // + shortChannelID as key, as there possibly is more than one - // channel opening in progress to the same peer. - var key [41]byte - copy(key[:33], remotePeer.SerializeCompressed()) - binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64()) - - err := d.cfg.DB.Update(func(tx *bbolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists(messageStoreKey) + // If the channel cannot be found, it is most likely a leftover + // message for a channel that was closed, so we can consider it + // stale. + if err == channeldb.ErrEdgeNotFound { + return true + } if err != nil { - return err + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", err) + return false } - // Encode the AnnounceSignatures message. - var b bytes.Buffer - if err := msg.Encode(&b, 0); err != nil { - return err + // If the proof exists in the graph, then we have successfully + // received the remote proof and assembled the full proof, so we + // can safely delete the local proof from the database. + return chanInfo.AuthProof != nil + + case *lnwire.ChannelUpdate: + // The MessageStore will always store the latest ChannelUpdate + // as it is not aware of its timestamp (by design), so it will + // never be stale. We should still however check if the channel + // is part of our graph. If it's not, we can mark it as stale. 
+ _, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + if err != nil && err != channeldb.ErrEdgeNotFound { + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", err) } + return err == channeldb.ErrEdgeNotFound - // Add the encoded message to the database using the peer - // + shortChanID as key. - return bucket.Put(key[:], b.Bytes()) - - }) - if err != nil { - return err + default: + // We'll make sure to not mark any unsupported messages as stale + // to ensure they are not removed. + return false } - - // We have succeeded adding the message to the database. We now launch - // a goroutine that will keep on trying sending the message to the - // remote peer until it succeeds, or the gossiper shuts down. In case - // of success, the message will be removed from the database. - d.wg.Add(1) - go func() { - defer d.wg.Done() - for { - log.Debugf("Sending AnnounceSignatures for channel "+ - "%v to remote peer %x", msg.ChannelID, - remotePeer.SerializeCompressed()) - err := d.cfg.SendToPeer(remotePeer, msg) - if err == nil { - // Sending succeeded, we can - // continue the flow. - break - } - - log.Errorf("unable to send AnnounceSignatures message "+ - "to peer(%x): %v. Will retry when online.", - remotePeer.SerializeCompressed(), err) - - peerChan := make(chan lnpeer.Peer, 1) - d.cfg.NotifyWhenOnline(remotePeer, peerChan) - - select { - case <-peerChan: - // Retry sending. - log.Infof("Peer %x reconnected. Retry sending"+ - " AnnounceSignatures.", - remotePeer.SerializeCompressed()) - - case <-d.quit: - log.Infof("Gossiper shutting down, did not " + - "send AnnounceSignatures.") - return - } - } - - log.Infof("Sent channel announcement proof to remote peer: %x", - remotePeer.SerializeCompressed()) - }() - - // This method returns after the message has been added to the database, - // such that the caller don't have to wait until the message is actually - // delivered, but can be assured that it will be delivered eventually - // when this method returns. 
- return nil } // updateChannel creates a new fully signed update for the channel, and updates diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index be029878..560fdbb1 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -108,28 +108,36 @@ func (n *mockSigner) SignMessage(pubKey *btcec.PublicKey, } type mockGraphSource struct { - nodes []*channeldb.LightningNode - infos map[uint64]*channeldb.ChannelEdgeInfo - edges map[uint64][]*channeldb.ChannelEdgePolicy bestHeight uint32 + + mu sync.Mutex + nodes []channeldb.LightningNode + infos map[uint64]channeldb.ChannelEdgeInfo + edges map[uint64][]channeldb.ChannelEdgePolicy } func newMockRouter(height uint32) *mockGraphSource { return &mockGraphSource{ bestHeight: height, - infos: make(map[uint64]*channeldb.ChannelEdgeInfo), - edges: make(map[uint64][]*channeldb.ChannelEdgePolicy), + infos: make(map[uint64]channeldb.ChannelEdgeInfo), + edges: make(map[uint64][]channeldb.ChannelEdgePolicy), } } var _ routing.ChannelGraphSource = (*mockGraphSource)(nil) func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error { - r.nodes = append(r.nodes, node) + r.mu.Lock() + defer r.mu.Unlock() + + r.nodes = append(r.nodes, *node) return nil } func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.infos[info.ChannelID]; ok { return errors.New("info already exist") } @@ -137,15 +145,15 @@ func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { // Usually, the capacity is fetched in the router from the funding txout. // Since the mockGraphSource can't access the txout, assign a default value. info.Capacity = maxBtcFundingAmount - r.infos[info.ChannelID] = info + r.infos[info.ChannelID] = *info return nil } func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error { - r.edges[edge.ChannelID] = append( - r.edges[edge.ChannelID], - edge, - ) + r.mu.Lock() + defer r.mu.Unlock() + + r.edges[edge.ChannelID] = append(r.edges[edge.ChannelID], *edge) return nil } @@ -159,11 +167,19 @@ func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) { func (r *mockGraphSource) AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error { - info, ok := r.infos[chanID.ToUint64()] + + r.mu.Lock() + defer r.mu.Unlock() + + chanIDInt := chanID.ToUint64() + info, ok := r.infos[chanIDInt] if !ok { return errors.New("channel does not exist") } + info.AuthProof = proof + r.infos[chanIDInt] = info + return nil } @@ -186,6 +202,9 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) ( *channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy, error) { + r.mu.Lock() + defer r.mu.Unlock() + chanInfo, ok := r.infos[chanID.ToUint64()] if !ok { return nil, nil, nil, channeldb.ErrEdgeNotFound @@ -193,14 +212,16 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) ( edges := r.edges[chanID.ToUint64()] if len(edges) == 0 { - return chanInfo, nil, nil, nil + return &chanInfo, nil, nil, nil } if len(edges) == 1 { - return chanInfo, edges[0], nil, nil + edge1 := edges[0] + return &chanInfo, &edge1, nil, nil } - return chanInfo, edges[0], edges[1], nil + edge1, edge2 := edges[0], edges[1] + return &chanInfo, &edge1, &edge2, nil } func (r *mockGraphSource) FetchLightningNode( @@ -208,7 +229,7 @@ func (r *mockGraphSource) FetchLightningNode( for _, node := range r.nodes { if bytes.Equal(nodePub[:], node.PubKeyBytes[:]) { - return node, nil + return &node, nil } } @@ -218,6 
+239,9 @@ func (r *mockGraphSource) FetchLightningNode( // IsStaleNode returns true if the graph source has a node announcement for the // target node with a more recent timestamp. func (r *mockGraphSource) IsStaleNode(nodePub routing.Vertex, timestamp time.Time) bool { + r.mu.Lock() + defer r.mu.Unlock() + for _, node := range r.nodes { if node.PubKeyBytes == nodePub { return node.LastUpdate.After(timestamp) || @@ -258,6 +282,9 @@ func (r *mockGraphSource) IsPublicNode(node routing.Vertex) (bool, error) { // IsKnownEdge returns true if the graph source already knows of the passed // channel ID. func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool { + r.mu.Lock() + defer r.mu.Unlock() + _, ok := r.infos[chanID.ToUint64()] return ok } @@ -267,6 +294,9 @@ func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool { func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool { + r.mu.Lock() + defer r.mu.Unlock() + edges, ok := r.edges[chanID.ToUint64()] if !ok { return false @@ -593,8 +623,14 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil, nil, err } + waitingProofStore, err := channeldb.NewWaitingProofStore(db) + if err != nil { + cleanUpDb() + return nil, nil, err + } + broadcastedMessage := make(chan msgWithSenders, 10) - gossiper, err := New(Config{ + gossiper := New(Config{ Notifier: notifier, Broadcast: func(senders map[routing.Vertex]struct{}, msgs ...lnwire.Message) error { @@ -608,22 +644,22 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil }, - SendToPeer: func(target *btcec.PublicKey, msg ...lnwire.Message) error { - return nil + NotifyWhenOnline: func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + peerChan <- &mockPeer{target, nil, nil} }, - FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, nil, nil}, nil + NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { + c := make(chan struct{}) + return c }, - Router: router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - DB: db, + Router: router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: waitingProofStore, + MessageStore: newMockMessageStore(), }, nodeKeyPub1) - if err != nil { - cleanUpDb() - return nil, nil, fmt.Errorf("unable to create router %v", err) - } + if err := gossiper.Start(); err != nil { cleanUpDb() return nil, nil, fmt.Errorf("unable to start router: %v", err) @@ -846,21 +882,16 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly fromn the gossiper. + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. 
sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { select { - case sentMsgs <- msg[0]: + case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") } - return nil } batch, err := createAnnouncements(0) @@ -933,9 +964,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -992,7 +1021,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1025,7 +1054,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1050,19 +1079,16 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly from the gossiper. + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + select { - case sentMsgs <- msg[0]: + case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") } - return nil } batch, err := createAnnouncements(0) @@ -1095,7 +1121,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1164,9 +1190,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -1217,9 +1241,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // The local proof should be sent to the remote peer. 
select { case msg := <-sentMsgs: - if msg != batch.localProofAnn { - t.Fatalf("expected local proof to be sent, got %v", msg) - } + assertMessage(t, batch.localProofAnn, msg) case <-time.After(2 * time.Second): t.Fatalf("local proof was not sent to peer") } @@ -1235,7 +1257,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(p *channeldb.WaitingProof) error { number++ return nil @@ -1249,234 +1271,10 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } } -// Test that sending AnnounceSignatures to remote peer will continue -// to be tried until the peer comes online. -func TestSignatureAnnouncementRetry(t *testing.T) { - t.Parallel() - - ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) - if err != nil { - t.Fatalf("can't create context: %v", err) - } - defer cleanup() - - batch, err := createAnnouncements(0) - if err != nil { - t.Fatalf("can't generate announcements: %v", err) - } - - localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) - if err != nil { - t.Fatalf("unable to parse pubkey: %v", err) - } - remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) - if err != nil { - t.Fatalf("unable to parse pubkey: %v", err) - } - remotePeer := &mockPeer{remoteKey, nil, nil} - - // Recreate lightning network topology. Initialize router with channel - // between two nodes. - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.localChanAnn, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process channel ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.chanUpdAnn1, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process channel update: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel update announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.nodeAnn1, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process node ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("node announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.chanUpdAnn2, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process remote announcement") - } - if err != nil { - t.Fatalf("unable to process channel update: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel update announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.nodeAnn2, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process remote announcement") - } - if err != nil { - t.Fatalf("unable to process node ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("node announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - // Make the SendToPeer fail, simulating the peer 
being offline. - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - } - - // We expect the gossiper to register for a notification when the peer - // comes back online, so keep track of the channel it wants to get - // notified on. - notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - } - - // Pretending that we receive local channel announcement from funding - // manager, thereby kick off the announcement exchange process. - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.localProofAnn, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process local proof: %v", err) - } - - // Since sending this local announcement proof to the remote will fail, - // the gossiper should register for a notification when the remote is - // online again. - var conChan chan<- lnpeer.Peer - select { - case conChan = <-notifyPeers: - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not ask to get notified when " + - "peer is online") - } - - // Since both proofs are not yet exchanged, no message should be - // broadcasted yet. - select { - case <-ctx.broadcastedMessage: - t.Fatal("announcements were broadcast") - case <-time.After(2 * trickleDelay): - } - - number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( - func(*channeldb.WaitingProof) error { - number++ - return nil - }, - ); err != nil { - t.Fatalf("unable to retrieve objects from store: %v", err) - } - - if number != 1 { - t.Fatal("wrong number of objects in storage") - } - - // When the peer comes online, the gossiper gets notified, and should - // retry sending the AnnounceSignatures. We make the SendToPeer - // method work again. - sentToPeer := make(chan lnwire.Message, 1) - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - sentToPeer <- msg[0] - return nil - } - - // Notify that peer is now online. This should trigger a new call - // to SendToPeer. - close(conChan) - - select { - case <-sentToPeer: - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not send message when peer came online") - } - - // Now give the gossiper the remote proof. This should trigger a - // broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate). - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.remoteProofAnn, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process remote proof: %v", err) - } - - for i := 0; i < 5; i++ { - select { - case <-ctx.broadcastedMessage: - case <-time.After(time.Second): - t.Fatal("announcement wasn't broadcast") - } - } - - number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( - func(*channeldb.WaitingProof) error { - number++ - return nil - }, - ); err != nil && err != channeldb.ErrWaitingProofNotFound { - t.Fatalf("unable to retrieve objects from store: %v", err) - } - - if number != 0 { - t.Fatal("waiting proof should be removed from storage") - } -} - -// Test that if we restart the gossiper, it will retry sending the -// AnnounceSignatures to the peer if it did not succeed before -// shutting down, and the full channel proof is not yet assembled. 
+// TestSignatureAnnouncementRetryAtStartup tests that if we restart the +// gossiper, it will retry sending the AnnounceSignatures to the peer if it did +// not succeed before shutting down, and the full channel proof is not yet +// assembled. func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Parallel() @@ -1499,7 +1297,31 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} + + // Set up a channel to intercept the messages sent to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } + + // Override NotifyWhenOffline to return the channel which will notify + // the gossiper that the peer is offline. We'll use this to signal that + // the peer is offline so that the gossiper requests a notification when + // it comes back online. + notifyOffline := make(chan chan struct{}, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func( + _ [33]byte) <-chan struct{} { + + c := make(chan struct{}) + notifyOffline <- c + return c + } // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1534,6 +1356,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(1 * time.Second): + t.Fatal("gossiper did not send channel update to peer") + } select { case err = <-ctx.gossiper.ProcessLocalAnnouncement( @@ -1583,17 +1411,27 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { case <-time.After(2 * trickleDelay): } - // Make the SendToPeerFail, simulating the peer being offline. - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - } + // Since the reliable send to the remote peer of the local channel proof + // requires a notification when the peer comes online, we'll capture the + // channel through which it gets sent to control exactly when to + // dispatch it. notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } + // Before sending the local channel proof, we'll notify that the peer is + // offline, so that it's not sent to the peer. + var peerOffline chan struct{} + select { + case peerOffline = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not request notification for when " + + "peer disconnects") + } + close(peerOffline) + // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. select { @@ -1606,24 +1444,32 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatalf("unable to process :%v", err) } - // Since sending to the remote peer will fail, the gossiper should - // register for a notification when it comes back online. 
- var conChan chan<- lnpeer.Peer + // The gossiper should register for a notification for when the peer is + // online. select { - case conChan = <-notifyPeers: + case <-notifyPeers: case <-time.After(2 * time.Second): t.Fatalf("gossiper did not ask to get notified when " + "peer is online") } + // The proof should not be broadcast yet since we're still missing the + // remote party's. select { case <-ctx.broadcastedMessage: t.Fatal("announcements were broadcast") case <-time.After(2 * trickleDelay): } + // And it shouldn't be sent to the peer either as they are offline. + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1636,25 +1482,21 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatal("wrong number of objects in storage") } - // Shut down gossiper, and restart. This should trigger a new attempt - // to send the message to the peer. + // Restart the gossiper and restore its original NotifyWhenOnline and + // NotifyWhenOffline methods. This should trigger a new attempt to send + // the message to the peer. ctx.gossiper.Stop() - gossiper, err := New(Config{ - Notifier: ctx.gossiper.cfg.Notifier, - Broadcast: ctx.gossiper.cfg.Broadcast, - SendToPeer: func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - }, - NotifyWhenOnline: func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - }, - Router: ctx.gossiper.cfg.Router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - DB: ctx.gossiper.cfg.DB, + gossiper := New(Config{ + Notifier: ctx.gossiper.cfg.Notifier, + Broadcast: ctx.gossiper.cfg.Broadcast, + NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline, + NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, + Router: ctx.gossiper.cfg.Router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, + MessageStore: ctx.gossiper.cfg.MessageStore, }, ctx.gossiper.selfKey) if err != nil { t.Fatalf("unable to recreate gossiper: %v", err) @@ -1665,38 +1507,39 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { defer gossiper.Stop() ctx.gossiper = gossiper + remotePeer.quit = ctx.gossiper.quit - // After starting up, the gossiper will see that it has a waitingproof - // in the database, and will retry sending its part to the remote. Since - // SendToPeer will fail again, it should register for a notification - // when the peer comes online. + // After starting up, the gossiper will see that it has a proof in the + // WaitingProofStore, and will retry sending its part to the remote. + // It should register for a notification for when the peer is online. + var peerChan chan<- lnpeer.Peer select { - case conChan = <-notifyPeers: + case peerChan = <-notifyPeers: case <-time.After(2 * time.Second): t.Fatalf("gossiper did not ask to get notified when " + "peer is online") } - // Fix the SendToPeer method. - sentToPeer := make(chan lnwire.Message, 1) - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { + // Notify that peer is now online. 
This should allow the proof to be + // sent. + peerChan <- remotePeer + +out: + for { select { - case sentToPeer <- msg[0]: - case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") + case msg := <-sentToPeer: + // Since the ChannelUpdate will also be resent as it is + // sent reliably, we'll need to filter it out. + if _, ok := msg.(*lnwire.AnnounceSignatures); !ok { + continue + } + + assertMessage(t, batch.localProofAnn, msg) + break out + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not send message when peer " + + "came online") } - - return nil - } - // Notify that peer is now online. This should trigger a new call - // to SendToPeer. - close(conChan) - - select { - case <-sentToPeer: - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not send message when peer came online") } // Now exchanging the remote channel proof, the channel announcement @@ -1721,7 +1564,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1735,10 +1578,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } } -// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a -// remote proof is received when we already have the full proof, -// the gossiper will send the full proof (ChannelAnnouncement) to -// the remote peer. +// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a remote +// proof is received when we already have the full proof, the gossiper will send +// the full proof (ChannelAnnouncement) to the remote peer. func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Parallel() @@ -1761,7 +1603,19 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} + + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1796,6 +1650,12 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not send channel update to remove peer") + } select { case err = <-ctx.gossiper.ProcessLocalAnnouncement( @@ -1845,27 +1705,6 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { case <-time.After(2 * trickleDelay): } - // Set up a channel we can use to inspect messages sent by the - // gossiper to the remote peer. 
- sentToPeer := make(chan lnwire.Message, 1) - remotePeer.sentMsgs = sentToPeer - remotePeer.quit = ctx.gossiper.quit - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - select { - case <-ctx.gossiper.quit: - return fmt.Errorf("gossiper shutting down") - case sentToPeer <- msg[0]: - } - return nil - } - - notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - } - // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. select { @@ -1893,9 +1732,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { // We expect the gossiper to send this message to the remote peer. select { case msg := <-sentToPeer: - if msg != batch.localProofAnn { - t.Fatalf("wrong message sent to peer: %v", msg) - } + assertMessage(t, batch.localProofAnn, msg) case <-time.After(2 * time.Second): t.Fatal("did not send local proof to peer") } @@ -1910,7 +1747,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -2321,10 +2158,9 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { } } -// TestReceiveRemoteChannelUpdateFirst tests that if we receive a -// ChannelUpdate from the remote before we have processed our -// own ChannelAnnouncement, it will be reprocessed later, after -// our ChannelAnnouncement. +// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate +// from the remote before we have processed our own ChannelAnnouncement, it will +// be reprocessed later, after our ChannelAnnouncement. func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { t.Parallel() @@ -2334,21 +2170,6 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly fromn the gossiper. - sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { - select { - case sentMsgs <- msg[0]: - case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") - } - return nil - } - batch, err := createAnnouncements(0) if err != nil { t.Fatalf("can't generate announcements: %v", err) @@ -2362,7 +2183,19 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} + + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. + sentMsgs := make(chan lnwire.Message, 10) + remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. 
+ ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } // Recreate the case where the remote node is sending us its ChannelUpdate // before we have been able to process our own ChannelAnnouncement and @@ -2441,9 +2274,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -2492,7 +2323,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -2521,7 +2352,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -2862,41 +2693,195 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { } } -// mockPeer implements the lnpeer.Peer interface and is used to test the -// gossiper's interaction with peers. -type mockPeer struct { - pk *btcec.PublicKey - sentMsgs chan lnwire.Message - quit chan struct{} -} +// TestSendChannelUpdateReliably ensures that the latest channel update for a +// channel is always sent upon the remote party reconnecting. +func TestSendChannelUpdateReliably(t *testing.T) { + t.Parallel() -var _ lnpeer.Peer = (*mockPeer)(nil) + // We'll start by creating our test context and a batch of + // announcements. + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() -func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { - if p.sentMsgs == nil && p.quit == nil { - return nil + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) } - for _, msg := range msgs { + // We'll also create two keys, one for ourselves and another for the + // remote party. + localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + + // Since we first wait to be notified of the peer before attempting to + // send the message, we'll overwrite NotifyWhenOnline and + // NotifyWhenOffline to instead give us access to the channel that will + // receive the notification. 
+ notifyOnline := make(chan chan<- lnpeer.Peer, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + notifyOnline <- peerChan + } + notifyOffline := make(chan chan struct{}, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func( + _ [33]byte) <-chan struct{} { + + c := make(chan struct{}, 1) + notifyOffline <- c + return c + } + + // assertReceivedChannelUpdate is a helper closure we'll use to + // determine if the correct channel update was received. + assertReceivedChannelUpdate := func(channelUpdate *lnwire.ChannelUpdate) { + t.Helper() + + select { + case msg := <-sentToPeer: + assertMessage(t, channelUpdate, msg) + case <-time.After(2 * time.Second): + t.Fatal("did not send local channel update to peer") + } + } + + // Process the channel announcement for which we'll send a channel + // update. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.localChanAnn, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel announcement") + } + if err != nil { + t.Fatalf("unable to process local channel announcement: %v", err) + } + + // It should not be broadcast due to not having an announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Now, we'll process the channel update. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.chanUpdAnn1, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + + // It should also not be broadcast due to the announcement not having an + // announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // It should however send it to the peer directly. In order to do so, + // it'll request a notification for when the peer is online. + var peerChan chan<- lnpeer.Peer + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + // We can go ahead and notify the peer, which should trigger the message + // to be sent. + peerChan <- remotePeer + assertReceivedChannelUpdate(batch.chanUpdAnn1) + + // The gossiper should now request a notification for when the peer + // disconnects. We'll also trigger this now. + var offlineChan chan struct{} + select { + case offlineChan = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "disconnection") + } + + close(offlineChan) + + // Since it's offline, the gossiper should request another notification + // for when it comes back online. + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + // Now that the remote peer is offline, we'll send a new channel update. + prevTimestamp := batch.chanUpdAnn1.Timestamp + newChanUpdate, err := createUpdateAnnouncement( + 0, 0, nodeKeyPriv1, prevTimestamp+1, + ) + if err != nil { + t.Fatalf("unable to create new channel update: %v", err) + } + + // With the new update created, we'll go ahead and process it.
+ select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + newChanUpdate, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + + // It should also not be broadcast due to the announcement not having an + // announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // The message should not be sent since the peer remains offline. + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + + // Finally, we'll notify that the peer is online and ensure the new channel + // update is received. + peerChan <- remotePeer + assertReceivedChannelUpdate(newChanUpdate) } -func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { - return nil -} -func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } -func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } -func (p *mockPeer) PubKey() [33]byte { - var pubkey [33]byte - copy(pubkey[:], p.pk.SerializeCompressed()) - return pubkey -} -func (p *mockPeer) Address() net.Addr { return nil } -func (p *mockPeer) QuitSignal() <-chan struct{} { - return p.quit + +func assertMessage(t *testing.T, expected, got lnwire.Message) { + t.Helper() + + if !reflect.DeepEqual(expected, got) { + t.Fatalf("expected: %v\ngot: %v", spew.Sdump(expected), + spew.Sdump(got)) + } } diff --git a/discovery/message_store.go b/discovery/message_store.go new file mode 100644 index 00000000..e0c10a86 --- /dev/null +++ b/discovery/message_store.go @@ -0,0 +1,294 @@ +package discovery + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +var ( + // messageStoreBucket is a key used to create a top level bucket in the + // gossiper database, used for storing messages that are to be sent to + // peers. Upon restarts, these messages will be read and resent to their + // respective peers. + // + // maps: + // pubKey (33 bytes) + msgShortChanID (8 bytes) + msgType (2 bytes) -> msg + messageStoreBucket = []byte("message-store") + + // ErrUnsupportedMessage is an error returned when we attempt to add a + // message to the store that is not supported. + ErrUnsupportedMessage = errors.New("unsupported message type") + + // ErrCorruptedMessageStore indicates that the on-disk bucketing + // structure has been altered since the gossip message store instance was + // initialized. + ErrCorruptedMessageStore = errors.New("gossip message store has been " + + "corrupted") +) + +// GossipMessageStore is a store responsible for storing gossip messages which +// we should reliably send to our peers. +type GossipMessageStore interface { + // AddMessage adds a message to the store for this peer. + AddMessage(lnwire.Message, [33]byte) error + + // DeleteMessage deletes a message from the store for this peer. + DeleteMessage(lnwire.Message, [33]byte) error + + // Messages returns the total set of messages that exist within the + // store for all peers. + Messages() (map[[33]byte][]lnwire.Message, error) + + // Peers returns the public key of all peers with messages within the + // store.
+ Peers() (map[[33]byte]struct{}, error) + + // MessagesForPeer returns the set of messages that exists within the + // store for the given peer. + MessagesForPeer([33]byte) ([]lnwire.Message, error) +} + +// MessageStore is an implementation of the GossipMessageStore interface backed +// by a channeldb instance. By design, this store will only keep the latest +// version of a message (like in the case of multiple ChannelUpdate's) for a +// channel with a peer. +type MessageStore struct { + db *channeldb.DB +} + +// A compile-time assertion to ensure messageStore implements the +// GossipMessageStore interface. +var _ GossipMessageStore = (*MessageStore)(nil) + +// NewMessageStore creates a new message store backed by a channeldb instance. +func NewMessageStore(db *channeldb.DB) (*MessageStore, error) { + err := db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(messageStoreBucket) + return err + }) + if err != nil { + return nil, fmt.Errorf("unable to create required buckets: %v", + err) + } + + return &MessageStore{db}, nil +} + +// msgShortChanID retrieves the short channel ID of the message. +func msgShortChanID(msg lnwire.Message) (lnwire.ShortChannelID, error) { + var shortChanID lnwire.ShortChannelID + switch msg := msg.(type) { + case *lnwire.AnnounceSignatures: + shortChanID = msg.ShortChannelID + case *lnwire.ChannelUpdate: + shortChanID = msg.ShortChannelID + default: + return shortChanID, ErrUnsupportedMessage + } + + return shortChanID, nil +} + +// messageStoreKey constructs the database key for the message to be stored. +func messageStoreKey(msg lnwire.Message, peerPubKey [33]byte) ([]byte, error) { + shortChanID, err := msgShortChanID(msg) + if err != nil { + return nil, err + } + + var k [33 + 8 + 2]byte + copy(k[:33], peerPubKey[:]) + binary.BigEndian.PutUint64(k[33:41], shortChanID.ToUint64()) + binary.BigEndian.PutUint16(k[41:43], uint16(msg.MsgType())) + + return k[:], nil +} + +// AddMessage adds a message to the store for this peer. +func (s *MessageStore) AddMessage(msg lnwire.Message, peerPubKey [33]byte) error { + // Construct the key for which we'll find this message with in the store. + msgKey, err := messageStoreKey(msg, peerPubKey) + if err != nil { + return err + } + + // Serialize the message with its wire encoding. + var b bytes.Buffer + if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil { + return err + } + + return s.db.Batch(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + return messageStore.Put(msgKey, b.Bytes()) + }) +} + +// DeleteMessage deletes a message from the store for this peer. +func (s *MessageStore) DeleteMessage(msg lnwire.Message, + peerPubKey [33]byte) error { + + // Construct the key for which we'll find this message with in the + // store. + msgKey, err := messageStoreKey(msg, peerPubKey) + if err != nil { + return err + } + + return s.db.Batch(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + // In the event that we're attempting to delete a ChannelUpdate + // from the store, we'll make sure that we're actually deleting + // the correct one as it can be overwritten. + if msg, ok := msg.(*lnwire.ChannelUpdate); ok { + // Deleting a value from a bucket that doesn't exist + // acts as a NOP, so we'll return if a message doesn't + // exist under this key. 
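To make the key layout above concrete, the following standalone sketch builds a store key the same way messageStoreKey does; the peer key and short channel ID are dummy values chosen only for illustration and are not taken from this change.

    package main

    import (
        "encoding/binary"
        "fmt"

        "github.com/lightningnetwork/lnd/lnwire"
    )

    func main() {
        // Dummy 33-byte compressed public key for the remote peer.
        var peerPubKey [33]byte
        peerPubKey[0] = 0x02

        // The message's short channel ID and wire type make up the rest of
        // the key, both encoded in big endian.
        shortChanID := lnwire.NewShortChanIDFromInt(1234)
        msgType := lnwire.MsgAnnounceSignatures

        // 33 + 8 + 2 = 43 bytes, mirroring messageStoreKey.
        var key [43]byte
        copy(key[:33], peerPubKey[:])
        binary.BigEndian.PutUint64(key[33:41], shortChanID.ToUint64())
        binary.BigEndian.PutUint16(key[41:43], uint16(msgType))

        fmt.Printf("store key: %x\n", key)
    }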
+ v := messageStore.Get(msgKey) + if v == nil { + return nil + } + + dbMsg, err := lnwire.ReadMessage(bytes.NewReader(v), 0) + if err != nil { + return err + } + + // If the timestamps don't match, then the update stored + // should be the latest one, so we'll avoid deleting it. + if msg.Timestamp != dbMsg.(*lnwire.ChannelUpdate).Timestamp { + return nil + } + } + + return messageStore.Delete(msgKey) + }) +} + +// readMessage reads a message from its serialized form and ensures its +// supported by the current version of the message store. +func readMessage(msgBytes []byte) (lnwire.Message, error) { + msg, err := lnwire.ReadMessage(bytes.NewReader(msgBytes), 0) + if err != nil { + return nil, err + } + + // Check if the message is supported by the store. We can reuse the + // check for ShortChannelID as its a dependency on messages stored. + if _, err := msgShortChanID(msg); err != nil { + return nil, err + } + + return msg, nil +} + +// Messages returns the total set of messages that exist within the store for +// all peers. +func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) { + msgs := make(map[[33]byte][]lnwire.Message) + err := s.db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + return messageStore.ForEach(func(k, v []byte) error { + var pubKey [33]byte + copy(pubKey[:], k[:33]) + + // Deserialize the message from its raw bytes and filter + // out any which are not currently supported by the + // store. + msg, err := readMessage(v) + if err == ErrUnsupportedMessage { + return nil + } + if err != nil { + return err + } + + msgs[pubKey] = append(msgs[pubKey], msg) + return nil + }) + }) + if err != nil { + return nil, err + } + + return msgs, nil +} + +// MessagesForPeer returns the set of messages that exists within the store for +// the given peer. +func (s *MessageStore) MessagesForPeer( + peerPubKey [33]byte) ([]lnwire.Message, error) { + + var msgs []lnwire.Message + err := s.db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + c := messageStore.Cursor() + k, v := c.Seek(peerPubKey[:]) + for ; bytes.HasPrefix(k, peerPubKey[:]); k, v = c.Next() { + // Deserialize the message from its raw bytes and filter + // out any which are not currently supported by the + // store. + msg, err := readMessage(v) + if err == ErrUnsupportedMessage { + continue + } + if err != nil { + return err + } + + msgs = append(msgs, msg) + } + + return nil + }) + if err != nil { + return nil, err + } + + return msgs, nil +} + +// Peers returns the public key of all peers with messages within the store. 
+func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) { + peers := make(map[[33]byte]struct{}) + err := s.db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + return messageStore.ForEach(func(k, _ []byte) error { + var pubKey [33]byte + copy(pubKey[:], k[:33]) + peers[pubKey] = struct{}{} + return nil + }) + }) + if err != nil { + return nil, err + } + + return peers, nil +} diff --git a/discovery/message_store_test.go b/discovery/message_store_test.go new file mode 100644 index 00000000..a106ad22 --- /dev/null +++ b/discovery/message_store_test.go @@ -0,0 +1,351 @@ +package discovery + +import ( + "bytes" + "io/ioutil" + "math/rand" + "os" + "reflect" + "testing" + + "github.com/btcsuite/btcd/btcec" + "github.com/coreos/bbolt" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +func createTestMessageStore(t *testing.T) (*MessageStore, func()) { + t.Helper() + + tempDir, err := ioutil.TempDir("", "channeldb") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + db, err := channeldb.Open(tempDir) + if err != nil { + os.RemoveAll(tempDir) + t.Fatalf("unable to open db: %v", err) + } + + cleanUp := func() { + db.Close() + os.RemoveAll(tempDir) + } + + store, err := NewMessageStore(db) + if err != nil { + cleanUp() + t.Fatalf("unable to initialize message store: %v", err) + } + + return store, cleanUp +} + +func randPubKey(t *testing.T) *btcec.PublicKey { + priv, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("unable to create private key: %v", err) + } + + return priv.PubKey() +} + +func randCompressedPubKey(t *testing.T) [33]byte { + t.Helper() + + pubKey := randPubKey(t) + + var compressedPubKey [33]byte + copy(compressedPubKey[:], pubKey.SerializeCompressed()) + + return compressedPubKey +} + +func randAnnounceSignatures() *lnwire.AnnounceSignatures { + return &lnwire.AnnounceSignatures{ + ShortChannelID: lnwire.NewShortChanIDFromInt(rand.Uint64()), + } +} + +func randChannelUpdate() *lnwire.ChannelUpdate { + return &lnwire.ChannelUpdate{ + ShortChannelID: lnwire.NewShortChanIDFromInt(rand.Uint64()), + } +} + +// TestMessageStoreMessages ensures that messages can be properly queried from +// the store. +func TestMessageStoreMessages(t *testing.T) { + t.Parallel() + + // We'll start by creating our test message store. + msgStore, cleanUp := createTestMessageStore(t) + defer cleanUp() + + // We'll then create some test messages for two test peers, and none for + // an additional test peer. 
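With the implementation in place, the store's surface is small: open it over a channeldb instance, add messages keyed by the destination peer, then read or delete them later. The sketch below shows the intended call pattern; the database path and the peer key are placeholders and error handling is trimmed for brevity. The tests that follow exercise the same surface in more depth.

    package main

    import (
        "log"

        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/discovery"
        "github.com/lightningnetwork/lnd/lnwire"
    )

    func main() {
        // Placeholder path; any open channeldb instance works.
        db, err := channeldb.Open("/tmp/gossiper-db")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        store, err := discovery.NewMessageStore(db)
        if err != nil {
            log.Fatal(err)
        }

        // The remote peer's compressed public key (zeroed placeholder).
        var peer [33]byte

        // Queue a ChannelUpdate for reliable delivery to that peer.
        update := &lnwire.ChannelUpdate{
            ShortChannelID: lnwire.NewShortChanIDFromInt(42),
        }
        if err := store.AddMessage(update, peer); err != nil {
            log.Fatal(err)
        }

        // Later, e.g. after a restart, anything still pending can be
        // replayed, then removed once it is no longer needed.
        pending, err := store.MessagesForPeer(peer)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("%d message(s) pending for peer", len(pending))

        if err := store.DeleteMessage(update, peer); err != nil {
            log.Fatal(err)
        }
    }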
+ channelUpdate1 := randChannelUpdate() + announceSignatures1 := randAnnounceSignatures() + peer1 := randCompressedPubKey(t) + if err := msgStore.AddMessage(channelUpdate1, peer1); err != nil { + t.Fatalf("unable to add message: %v", err) + } + if err := msgStore.AddMessage(announceSignatures1, peer1); err != nil { + t.Fatalf("unable to add message: %v", err) + } + expectedPeerMsgs1 := map[uint64]lnwire.MessageType{ + channelUpdate1.ShortChannelID.ToUint64(): channelUpdate1.MsgType(), + announceSignatures1.ShortChannelID.ToUint64(): announceSignatures1.MsgType(), + } + + channelUpdate2 := randChannelUpdate() + peer2 := randCompressedPubKey(t) + if err := msgStore.AddMessage(channelUpdate2, peer2); err != nil { + t.Fatalf("unable to add message: %v", err) + } + expectedPeerMsgs2 := map[uint64]lnwire.MessageType{ + channelUpdate2.ShortChannelID.ToUint64(): channelUpdate2.MsgType(), + } + + peer3 := randCompressedPubKey(t) + expectedPeerMsgs3 := map[uint64]lnwire.MessageType{} + + // assertPeerMsgs is a helper closure that we'll use to ensure we + // retrieve the correct set of messages for a given peer. + assertPeerMsgs := func(peerMsgs []lnwire.Message, + expected map[uint64]lnwire.MessageType) { + + t.Helper() + + if len(peerMsgs) != len(expected) { + t.Fatalf("expected %d pending messages, got %d", + len(expected), len(peerMsgs)) + } + for _, msg := range peerMsgs { + var shortChanID uint64 + switch msg := msg.(type) { + case *lnwire.AnnounceSignatures: + shortChanID = msg.ShortChannelID.ToUint64() + case *lnwire.ChannelUpdate: + shortChanID = msg.ShortChannelID.ToUint64() + default: + t.Fatalf("found unexpected message type %T", msg) + } + + msgType, ok := expected[shortChanID] + if !ok { + t.Fatalf("retrieved message with unexpected ID "+ + "%d from store", shortChanID) + } + if msgType != msg.MsgType() { + t.Fatalf("expected message of type %v, got %v", + msg.MsgType(), msgType) + } + } + } + + // Then, we'll query the store for the set of messages for each peer and + // ensure it matches what we expect. + peers := [][33]byte{peer1, peer2, peer3} + expectedPeerMsgs := []map[uint64]lnwire.MessageType{ + expectedPeerMsgs1, expectedPeerMsgs2, expectedPeerMsgs3, + } + for i, peer := range peers { + peerMsgs, err := msgStore.MessagesForPeer(peer) + if err != nil { + t.Fatalf("unable to retrieve messages: %v", err) + } + assertPeerMsgs(peerMsgs, expectedPeerMsgs[i]) + } + + // Finally, we'll query the store for all of its messages of every peer. + // Again, each peer should have a set of messages that match what we + // expect. + // + // We'll construct the expected response. Only the first two peers will + // have messages. 
+ totalPeerMsgs := make(map[[33]byte]map[uint64]lnwire.MessageType, 2) + for i := 0; i < 2; i++ { + totalPeerMsgs[peers[i]] = expectedPeerMsgs[i] + } + + msgs, err := msgStore.Messages() + if err != nil { + t.Fatalf("unable to retrieve all peers with pending messages: "+ + "%v", err) + } + if len(msgs) != len(totalPeerMsgs) { + t.Fatalf("expected %d peers with messages, got %d", + len(totalPeerMsgs), len(msgs)) + } + for peer, peerMsgs := range msgs { + expected, ok := totalPeerMsgs[peer] + if !ok { + t.Fatalf("expected to find pending messages for peer %x", + peer) + } + + assertPeerMsgs(peerMsgs, expected) + } + + peerPubKeys, err := msgStore.Peers() + if err != nil { + t.Fatalf("unable to retrieve all peers with pending messages: "+ + "%v", err) + } + if len(peerPubKeys) != len(totalPeerMsgs) { + t.Fatalf("expected %d peers with messages, got %d", + len(totalPeerMsgs), len(peerPubKeys)) + } + for peerPubKey := range peerPubKeys { + if _, ok := totalPeerMsgs[peerPubKey]; !ok { + t.Fatalf("expected to find peer %x", peerPubKey) + } + } +} + +// TestMessageStoreUnsupportedMessage ensures that we are not able to add a +// message which is unsupported, and if a message is found to be unsupported by +// the current version of the store, that it is properly filtered out from the +// response. +func TestMessageStoreUnsupportedMessage(t *testing.T) { + t.Parallel() + + // We'll start by creating our test message store. + msgStore, cleanUp := createTestMessageStore(t) + defer cleanUp() + + // Create a message that is known to not be supported by the store. + peer := randCompressedPubKey(t) + unsupportedMsg := &lnwire.Error{} + + // Attempting to add it to the store should result in + // ErrUnsupportedMessage. + err := msgStore.AddMessage(unsupportedMsg, peer) + if err != ErrUnsupportedMessage { + t.Fatalf("expected ErrUnsupportedMessage, got %v", err) + } + + // We'll now pretend that the message is actually supported in a future + // version of the store, so it's able to be added successfully. To + // replicate this, we'll add the message manually rather than through + // the existing AddMessage method. + msgKey := peer[:] + var rawMsg bytes.Buffer + if _, err := lnwire.WriteMessage(&rawMsg, unsupportedMsg, 0); err != nil { + t.Fatalf("unable to serialize message: %v", err) + } + err = msgStore.db.Update(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + return messageStore.Put(msgKey, rawMsg.Bytes()) + }) + if err != nil { + t.Fatalf("unable to add unsupported message to store: %v", err) + } + + // Finally, we'll check that the store can properly filter out messages + // that are currently unknown to it. We'll make sure this is done for + // both Messages and MessagesForPeer. + totalMsgs, err := msgStore.Messages() + if err != nil { + t.Fatalf("unable to retrieve messages: %v", err) + } + if len(totalMsgs) != 0 { + t.Fatalf("expected to filter out unsupported message") + } + peerMsgs, err := msgStore.MessagesForPeer(peer) + if err != nil { + t.Fatalf("unable to retrieve peer messages: %v", err) + } + if len(peerMsgs) != 0 { + t.Fatalf("expected to filter out unsupported message") + } +} + +// TestMessageStoreDeleteMessage ensures that we can properly delete messages +// from the store. +func TestMessageStoreDeleteMessage(t *testing.T) { + t.Parallel() + + msgStore, cleanUp := createTestMessageStore(t) + defer cleanUp() + + // assertMsg is a helper closure we'll use to ensure a message + // does/doesn't exist within the store. 
+ assertMsg := func(msg lnwire.Message, peer [33]byte, exists bool) { + t.Helper() + + storeMsgs, err := msgStore.MessagesForPeer(peer) + if err != nil { + t.Fatalf("unable to retrieve messages: %v", err) + } + + found := false + for _, storeMsg := range storeMsgs { + if reflect.DeepEqual(msg, storeMsg) { + found = true + } + } + + if found != exists { + str := "find" + if !exists { + str = "not find" + } + t.Fatalf("expected to %v message %v", str, + spew.Sdump(msg)) + } + } + + // An AnnounceSignatures message should exist within the store after + // adding it, and should no longer exists after deleting it. + peer := randCompressedPubKey(t) + annSig := randAnnounceSignatures() + if err := msgStore.AddMessage(annSig, peer); err != nil { + t.Fatalf("unable to add message: %v", err) + } + assertMsg(annSig, peer, true) + if err := msgStore.DeleteMessage(annSig, peer); err != nil { + t.Fatalf("unable to delete message: %v", err) + } + assertMsg(annSig, peer, false) + + // The store allows overwriting ChannelUpdates, since there can be + // multiple versions, so we'll test things slightly different. + // + // The ChannelUpdate message should exist within the store after adding + // it. + chanUpdate := randChannelUpdate() + if err := msgStore.AddMessage(chanUpdate, peer); err != nil { + t.Fatalf("unable to add message: %v", err) + } + assertMsg(chanUpdate, peer, true) + + // Now, we'll create a new version for the same ChannelUpdate message. + // Adding this one to the store will overwrite the previous one, so only + // the new one should exist. + newChanUpdate := randChannelUpdate() + newChanUpdate.ShortChannelID = chanUpdate.ShortChannelID + newChanUpdate.Timestamp = chanUpdate.Timestamp + 1 + if err := msgStore.AddMessage(newChanUpdate, peer); err != nil { + t.Fatalf("unable to add message: %v", err) + } + assertMsg(chanUpdate, peer, false) + assertMsg(newChanUpdate, peer, true) + + // Deleting the older message should act as a NOP and should NOT delete + // the newer version as the older no longer exists. + if err := msgStore.DeleteMessage(chanUpdate, peer); err != nil { + t.Fatalf("unable to delete message: %v", err) + } + assertMsg(chanUpdate, peer, false) + assertMsg(newChanUpdate, peer, true) + + // The newer version should no longer exist within the store after + // deleting it. + if err := msgStore.DeleteMessage(newChanUpdate, peer); err != nil { + t.Fatalf("unable to delete message: %v", err) + } + assertMsg(newChanUpdate, peer, false) +} diff --git a/discovery/mock_test.go b/discovery/mock_test.go new file mode 100644 index 00000000..85c6c4f3 --- /dev/null +++ b/discovery/mock_test.go @@ -0,0 +1,133 @@ +package discovery + +import ( + "net" + "sync" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwire" +) + +// mockPeer implements the lnpeer.Peer interface and is used to test the +// gossiper's interaction with peers. 
+type mockPeer struct { + pk *btcec.PublicKey + sentMsgs chan lnwire.Message + quit chan struct{} +} + +var _ lnpeer.Peer = (*mockPeer)(nil) + +func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { + if p.sentMsgs == nil && p.quit == nil { + return nil + } + + for _, msg := range msgs { + select { + case p.sentMsgs <- msg: + case <-p.quit: + } + } + + return nil +} +func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { + return nil +} +func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } +func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } +func (p *mockPeer) PubKey() [33]byte { + var pubkey [33]byte + copy(pubkey[:], p.pk.SerializeCompressed()) + return pubkey +} +func (p *mockPeer) Address() net.Addr { return nil } +func (p *mockPeer) QuitSignal() <-chan struct{} { + return p.quit +} + +// mockMessageStore is an in-memory implementation of the MessageStore interface +// used for the gossiper's unit tests. +type mockMessageStore struct { + sync.Mutex + messages map[[33]byte]map[lnwire.Message]struct{} +} + +func newMockMessageStore() *mockMessageStore { + return &mockMessageStore{ + messages: make(map[[33]byte]map[lnwire.Message]struct{}), + } +} + +var _ GossipMessageStore = (*mockMessageStore)(nil) + +func (s *mockMessageStore) AddMessage(msg lnwire.Message, pubKey [33]byte) error { + s.Lock() + defer s.Unlock() + + if _, ok := s.messages[pubKey]; !ok { + s.messages[pubKey] = make(map[lnwire.Message]struct{}) + } + + s.messages[pubKey][msg] = struct{}{} + + return nil +} + +func (s *mockMessageStore) DeleteMessage(msg lnwire.Message, pubKey [33]byte) error { + s.Lock() + defer s.Unlock() + + peerMsgs, ok := s.messages[pubKey] + if !ok { + return nil + } + + delete(peerMsgs, msg) + return nil +} + +func (s *mockMessageStore) Messages() (map[[33]byte][]lnwire.Message, error) { + s.Lock() + defer s.Unlock() + + msgs := make(map[[33]byte][]lnwire.Message, len(s.messages)) + for peer, peerMsgs := range s.messages { + for msg := range peerMsgs { + msgs[peer] = append(msgs[peer], msg) + } + } + return msgs, nil +} + +func (s *mockMessageStore) Peers() (map[[33]byte]struct{}, error) { + s.Lock() + defer s.Unlock() + + peers := make(map[[33]byte]struct{}, len(s.messages)) + for peer := range s.messages { + peers[peer] = struct{}{} + } + return peers, nil +} + +func (s *mockMessageStore) MessagesForPeer(pubKey [33]byte) ([]lnwire.Message, error) { + s.Lock() + defer s.Unlock() + + peerMsgs, ok := s.messages[pubKey] + if !ok { + return nil, nil + } + + msgs := make([]lnwire.Message, 0, len(peerMsgs)) + for msg := range peerMsgs { + msgs = append(msgs, msg) + } + + return msgs, nil +} diff --git a/discovery/reliable_sender.go b/discovery/reliable_sender.go new file mode 100644 index 00000000..c336b01d --- /dev/null +++ b/discovery/reliable_sender.go @@ -0,0 +1,316 @@ +package discovery + +import ( + "sync" + + "github.com/btcsuite/btcd/btcec" + "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwire" +) + +// reliableSenderCfg contains all of necessary items for the reliableSender to +// carry out its duties. +type reliableSenderCfg struct { + // NotifyWhenOnline is a function that allows the gossiper to be + // notified when a certain peer comes online, allowing it to + // retry sending a peer message. + // + // NOTE: The peerChan channel must be buffered. + // + // TODO(wilmer): use [33]byte to avoid unnecessary serializations. 
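The two notification callbacks carry an implicit contract: the channel handed to NotifyWhenOnline must be buffered and is sent on once the peer is available, while NotifyWhenOffline hands back a channel that is signalled when the peer disconnects. lnd's real implementations live in server.go and are not part of this excerpt; the peerRegistry below is invented purely to illustrate that contract.

    package sketch

    import (
        "sync"

        "github.com/btcsuite/btcd/btcec"
        "github.com/lightningnetwork/lnd/lnpeer"
    )

    // peerRegistry is an invented stand-in that tracks connected peers and
    // serves the two notification callbacks the reliable sender depends on.
    type peerRegistry struct {
        mu        sync.Mutex
        online    map[[33]byte]lnpeer.Peer
        onConnect map[[33]byte][]chan<- lnpeer.Peer
        onOffline map[[33]byte][]chan struct{}
    }

    func newPeerRegistry() *peerRegistry {
        return &peerRegistry{
            online:    make(map[[33]byte]lnpeer.Peer),
            onConnect: make(map[[33]byte][]chan<- lnpeer.Peer),
            onOffline: make(map[[33]byte][]chan struct{}),
        }
    }

    // NotifyWhenOnline delivers the peer on peerChan once it is connected.
    // peerChan must be buffered so this send never blocks the registry.
    func (r *peerRegistry) NotifyWhenOnline(pub *btcec.PublicKey,
        peerChan chan<- lnpeer.Peer) {

        var key [33]byte
        copy(key[:], pub.SerializeCompressed())

        r.mu.Lock()
        defer r.mu.Unlock()

        if p, ok := r.online[key]; ok {
            peerChan <- p // already connected, dispatch immediately
            return
        }
        r.onConnect[key] = append(r.onConnect[key], peerChan)
    }

    // NotifyWhenOffline returns a channel that is closed when the peer with
    // the given key disconnects.
    func (r *peerRegistry) NotifyWhenOffline(pub [33]byte) <-chan struct{} {
        r.mu.Lock()
        defer r.mu.Unlock()

        c := make(chan struct{})
        if _, ok := r.online[pub]; !ok {
            close(c) // already offline
            return c
        }
        r.onOffline[pub] = append(r.onOffline[pub], c)
        return c
    }

    // peerOnline records the connection and fires pending online watchers.
    func (r *peerRegistry) peerOnline(p lnpeer.Peer) {
        r.mu.Lock()
        defer r.mu.Unlock()

        key := p.PubKey()
        r.online[key] = p
        for _, c := range r.onConnect[key] {
            c <- p // safe because callers pass buffered channels
        }
        delete(r.onConnect, key)
    }

    // peerOffline removes the peer and closes all offline watchers.
    func (r *peerRegistry) peerOffline(key [33]byte) {
        r.mu.Lock()
        defer r.mu.Unlock()

        delete(r.online, key)
        for _, c := range r.onOffline[key] {
            close(c)
        }
        delete(r.onOffline, key)
    }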
+ NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) + + // NotifyWhenOffline is a function that allows the gossiper to be + // notified when a certain peer disconnects, allowing it to request a + // notification for when it reconnects. + NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} + + // MessageStore is a persistent storage of gossip messages which we will + // use to determine which messages need to be resent for a given peer. + MessageStore GossipMessageStore + + // IsMsgStale determines whether a message retrieved from the backing + // MessageStore is seen as stale by the current graph. + IsMsgStale func(lnwire.Message) bool +} + +// peerManager contains the set of channels required for the peerHandler to +// properly carry out its duties. +type peerManager struct { + // msgs is the channel through which messages will be streamed to the + // handler in order to send the message to the peer while they're + // online. + msgs chan lnwire.Message + + // done is a channel that will be closed to signal that the handler for + // the given peer has been torn down for whatever reason. + done chan struct{} +} + +// reliableSender is a small subsystem of the gossiper used to reliably send +// gossip messages to peers. +type reliableSender struct { + start sync.Once + stop sync.Once + + cfg reliableSenderCfg + + // activePeers keeps track of whether a peerHandler exists for a given + // peer. A peerHandler is tasked with handling requests for messages + // that should be reliably sent to peers while also taking into account + // the peer's connection lifecycle. + activePeers map[[33]byte]peerManager + activePeersMtx sync.Mutex + + wg sync.WaitGroup + quit chan struct{} +} + +// newReliableSender returns a new reliableSender backed by the given config. +func newReliableSender(cfg *reliableSenderCfg) *reliableSender { + return &reliableSender{ + cfg: *cfg, + activePeers: make(map[[33]byte]peerManager), + quit: make(chan struct{}), + } +} + +// Start spawns message handlers for any peers with pending messages. +func (s *reliableSender) Start() error { + var err error + s.start.Do(func() { + err = s.resendPendingMsgs() + }) + return err +} + +// Stop halts the reliable sender from sending messages to peers. +func (s *reliableSender) Stop() { + s.stop.Do(func() { + close(s.quit) + s.wg.Wait() + }) +} + +// sendMessage constructs a request to send a message reliably to a peer. In the +// event that the peer is currently offline, this will only write the message to +// disk. Once the peer reconnects, this message, along with any others pending, +// will be sent to the peer. +func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error { + // We'll start by persisting the message to disk. This allows us to + // resend the message upon restarts and peer reconnections. + if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil { + return err + } + + // Then, we'll spawn a peerHandler for this peer to handle resending its + // pending messages while taking into account its connection lifecycle. +spawnHandler: + msgHandler, ok := s.spawnPeerHandler(peerPubKey) + + // If the handler wasn't previously active, we can exit now as we know + // that the message will be sent once the peer online notification is + // received. This prevents us from potentially sending the message + // twice. + if !ok { + return nil + } + + // Otherwise, we'll attempt to stream the message to the handler. 
+ // There's a subtle race condition where the handler can be torn down + // due to all of the messages sent being stale, so we'll handle this + // gracefully by spawning another one to prevent blocking. + select { + case msgHandler.msgs <- msg: + case <-msgHandler.done: + goto spawnHandler + case <-s.quit: + return ErrGossiperShuttingDown + } + + return nil +} + +// spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't +// one already active. The boolean returned signals whether there was already +// one active or not. +func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) { + s.activePeersMtx.Lock() + defer s.activePeersMtx.Unlock() + + msgHandler, ok := s.activePeers[peerPubKey] + if !ok { + msgHandler = peerManager{ + msgs: make(chan lnwire.Message), + done: make(chan struct{}), + } + s.activePeers[peerPubKey] = msgHandler + + s.wg.Add(1) + go s.peerHandler(msgHandler, peerPubKey) + } + + return msgHandler, ok +} + +// peerHandler is responsible for handling our reliable message send requests +// for a given peer while also taking into account the peer's connection +// lifecycle. Any messages that are attempted to be sent while the peer is +// offline will be queued and sent once the peer reconnects. +// +// NOTE: This must be run as a goroutine. +func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) { + defer s.wg.Done() + + // We'll start by requesting a notification for when the peer + // reconnects. + pubKey, _ := btcec.ParsePubKey(peerPubKey[:], btcec.S256()) + peerChan := make(chan lnpeer.Peer, 1) + +waitUntilOnline: + log.Debugf("Requesting online notification for peer=%x", peerPubKey) + + s.cfg.NotifyWhenOnline(pubKey, peerChan) + + var peer lnpeer.Peer +out: + for { + select { + // While we're waiting, we'll also consume any messages that + // must be sent to prevent blocking the caller. These can be + // ignored for now since the peer is currently offline. Once + // they reconnect, the messages will be sent since they should + // have been persisted to disk. + case <-peerMgr.msgs: + case peer = <-peerChan: + break out + case <-s.quit: + return + } + } + + log.Debugf("Peer=%x is now online, proceeding to send pending messages", + peerPubKey) + + // Once we detect the peer has reconnected, we'll also request a + // notification for when they disconnect. We'll use this to make sure + // they haven't disconnected (in the case of a flappy peer, etc.) by the + // time we attempt to send them the pending messages. + log.Debugf("Requesting offline notification for peer=%x", peerPubKey) + + offlineChan := s.cfg.NotifyWhenOffline(peerPubKey) + + pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey) + if err != nil { + log.Errorf("Unable to retrieve pending messages for peer %x: %v", + peerPubKey, err) + return + } + + // With the peer online, we can now proceed to send our pending messages + // for them. + for _, msg := range pendingMsgs { + // Retrieve the short channel ID for which this message applies + // for logging purposes. The error can be ignored as the store + // can only contain messages which have a ShortChannelID field. 
+ shortChanID, _ := msgShortChanID(msg) + + if err := peer.SendMessage(false, msg); err != nil { + log.Errorf("Unable to send %v message for channel=%v "+ + "to %x: %v", msg.MsgType(), shortChanID, + peerPubKey, err) + goto waitUntilOnline + } + + log.Debugf("Successfully sent %v message for channel=%v with "+ + "peer=%x upon reconnection", msg.MsgType(), shortChanID, + peerPubKey) + + // Now that the message has at least been sent once, we can + // check whether it's stale. This guarantees that + // AnnounceSignatures are sent at least once if we happen to + // already have signatures for both parties. + if s.cfg.IsMsgStale(msg) { + err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey) + if err != nil { + log.Errorf("Unable to remove stale %v message "+ + "for channel=%v with peer %x: %v", + msg.MsgType(), shortChanID, peerPubKey, + err) + continue + } + + log.Debugf("Removed stale %v message for channel=%v "+ + "with peer=%x", msg.MsgType(), shortChanID, + peerPubKey) + } + } + + // If all of our messages were stale, then there's no need for this + // handler to continue running, so we can exit now. + pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey) + if err != nil { + log.Errorf("Unable to retrieve pending messages for peer %x: %v", + peerPubKey, err) + return + } + + if len(pendingMsgs) == 0 { + log.Debugf("No pending messages left for peer=%x", peerPubKey) + + s.activePeersMtx.Lock() + delete(s.activePeers, peerPubKey) + s.activePeersMtx.Unlock() + + close(peerMgr.done) + + return + } + + // Once the pending messages are sent, we can continue to send any + // future messages while the peer remains connected. + for { + select { + case msg := <-peerMgr.msgs: + // Retrieve the short channel ID for which this message + // applies for logging purposes. The error can be + // ignored as the store can only contain messages which + // have a ShortChannelID field. + shortChanID, _ := msgShortChanID(msg) + + if err := peer.SendMessage(false, msg); err != nil { + log.Errorf("Unable to send %v message for "+ + "channel=%v to %x: %v", msg.MsgType(), + shortChanID, peerPubKey, err) + } + + log.Debugf("Successfully sent %v message for "+ + "channel=%v with peer=%x", msg.MsgType(), + shortChanID, peerPubKey) + + case <-offlineChan: + goto waitUntilOnline + + case <-s.quit: + return + } + } +} + +// resendPendingMsgs retrieves and sends all of the messages within the message +// store that should be reliably sent to their respective peers. +func (s *reliableSender) resendPendingMsgs() error { + // Fetch all of the peers for which we have pending messages for and + // spawn a peerMsgHandler for each. Once the peer is seen as online, all + // of the pending messages will be sent. + peers, err := s.cfg.MessageStore.Peers() + if err != nil { + return err + } + + for peer := range peers { + s.spawnPeerHandler(peer) + } + + return nil +} diff --git a/discovery/reliable_sender_test.go b/discovery/reliable_sender_test.go new file mode 100644 index 00000000..8633b7d4 --- /dev/null +++ b/discovery/reliable_sender_test.go @@ -0,0 +1,312 @@ +package discovery + +import ( + "fmt" + "testing" + "time" + + "github.com/btcsuite/btcd/btcec" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwire" +) + +// newTestReliableSender creates a new reliable sender instance used for +// testing. 
+func newTestReliableSender(t *testing.T) *reliableSender { + t.Helper() + + cfg := &reliableSenderCfg{ + NotifyWhenOnline: func(pubKey *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + peerChan <- &mockPeer{pk: pubKey} + }, + NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { + c := make(chan struct{}, 1) + return c + }, + MessageStore: newMockMessageStore(), + IsMsgStale: func(lnwire.Message) bool { + return false + }, + } + + return newReliableSender(cfg) +} + +// assertMsgsSent ensures that the given messages can be read from a mock peer's +// msgChan. +func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message, + msgs ...lnwire.Message) { + + t.Helper() + + m := make(map[lnwire.Message]struct{}, len(msgs)) + for _, msg := range msgs { + m[msg] = struct{}{} + } + + for i := 0; i < len(msgs); i++ { + select { + case msg := <-msgChan: + if _, ok := m[msg]; !ok { + t.Fatalf("found unexpected message sent: %v", + spew.Sdump(msg)) + } + case <-time.After(time.Second): + t.Fatal("reliable sender did not send message to peer") + } + } +} + +// waitPredicate is a helper test function that will wait for a timeout period +// of time until the passed predicate returns true. +func waitPredicate(t *testing.T, timeout time.Duration, pred func() bool) { + t.Helper() + + const pollInterval = 20 * time.Millisecond + exitTimer := time.After(timeout) + + for { + <-time.After(pollInterval) + + select { + case <-exitTimer: + t.Fatalf("predicate not satisfied after timeout") + default: + } + + if pred() { + return + } + } +} + +// TestReliableSenderFlow ensures that the flow for sending messages reliably to +// a peer while taking into account its connection lifecycle works as expected. +func TestReliableSenderFlow(t *testing.T) { + t.Parallel() + + reliableSender := newTestReliableSender(t) + + // Create a mock peer to send the messages to. + pubKey := randPubKey(t) + msgsSent := make(chan lnwire.Message) + peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} + + // Override NotifyWhenOnline and NotifyWhenOffline to provide the + // notification channels so that we can control when notifications get + // dispatched. + notifyOnline := make(chan chan<- lnpeer.Peer, 2) + notifyOffline := make(chan chan struct{}, 1) + + reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + notifyOnline <- peerChan + } + reliableSender.cfg.NotifyWhenOffline = func(_ [33]byte) <-chan struct{} { + c := make(chan struct{}, 1) + notifyOffline <- c + return c + } + + // We'll start by creating our first message which we should reliably + // send to our peer. + msg1 := randChannelUpdate() + var peerPubKey [33]byte + copy(peerPubKey[:], pubKey.SerializeCompressed()) + if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // Since there isn't a peerHandler for this peer currently active due to + // this being the first message being sent reliably, we should expect to + // see a notification request for when the peer is online. + var peerChan chan<- lnpeer.Peer + select { + case peerChan = <-notifyOnline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request online notification") + } + + // We'll then attempt to send another additional message reliably. 
+ msg2 := randAnnounceSignatures() + if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // This should not however request another peer online notification as + // the peerHandler has already been started and is waiting for the + // notification to be dispatched. + select { + case <-notifyOnline: + t.Fatal("reliable sender should not request online notification") + case <-time.After(time.Second): + } + + // We'll go ahead and notify the peer. + peerChan <- peer + + // By doing so, we should expect to see a notification request for when + // the peer is offline. + var offlineChan chan struct{} + select { + case offlineChan = <-notifyOffline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request offline notification") + } + + // We should also see the messages arrive at the peer since they are now + // seen as online. + assertMsgsSent(t, peer.sentMsgs, msg1, msg2) + + // Then, we'll send one more message reliably. + msg3 := randChannelUpdate() + if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // Again, this should not request another peer online notification + // request since we are currently waiting for the peer to be offline. + select { + case <-notifyOnline: + t.Fatal("reliable sender should not request online notification") + case <-time.After(time.Second): + } + + // The expected message should be sent to the peer. + assertMsgsSent(t, peer.sentMsgs, msg3) + + // We'll then notify that the peer is offline. + close(offlineChan) + + // This should cause an online notification to be requested. + select { + case peerChan = <-notifyOnline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request online notification") + } + + // Once we dispatch it, we should expect to see the messages be resent + // to the peer as they are not stale. + peerChan <- peer + + select { + case <-notifyOffline: + case <-time.After(5 * time.Second): + t.Fatal("reliable sender did not request offline notification") + } + + assertMsgsSent(t, peer.sentMsgs, msg1, msg2, msg3) +} + +// TestReliableSenderStaleMessages ensures that the reliable sender is no longer +// active for a peer which has successfully sent all of its messages and deemed +// them as stale. +func TestReliableSenderStaleMessages(t *testing.T) { + t.Parallel() + + reliableSender := newTestReliableSender(t) + + // Create a mock peer to send the messages to. + pubKey := randPubKey(t) + msgsSent := make(chan lnwire.Message) + peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} + + // Override NotifyWhenOnline to provide the notification channel so that + // we can control when notifications get dispatched. + notifyOnline := make(chan chan<- lnpeer.Peer, 1) + reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + notifyOnline <- peerChan + } + + // We'll also override IsMsgStale to mark all messages as stale as we're + // interested in testing the stale message behavior. + reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool { + return true + } + + // We'll start by creating our first message which we should reliably + // send to our peer, but will be seen as stale. 
+ msg1 := randAnnounceSignatures() + var peerPubKey [33]byte + copy(peerPubKey[:], pubKey.SerializeCompressed()) + if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // Since there isn't a peerHandler for this peer currently active due to + // this being the first message being sent reliably, we should expect to + // see a notification request for when the peer is online. + var peerChan chan<- lnpeer.Peer + select { + case peerChan = <-notifyOnline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request online notification") + } + + // We'll go ahead and notify the peer. + peerChan <- peer + + // This should cause the message to be sent to the peer since they are + // now seen as online. The message will be sent at least once to ensure + // they can propagate before deciding whether they are stale or not. + assertMsgsSent(t, peer.sentMsgs, msg1) + + // We'll create another message which we'll send reliably. This one + // won't be seen as stale. + msg2 := randChannelUpdate() + + // We'll then wait for the message to be removed from the backing + // message store since it is seen as stale and has been sent at least + // once. Once the message is removed, the peerHandler should be torn + // down as there are no longer any pending messages within the store. + var predErr error + waitPredicate(t, time.Second, func() bool { + msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer( + peerPubKey, + ) + if err != nil { + predErr = fmt.Errorf("unable to retrieve messages for "+ + "peer: %v", err) + return false + } + if len(msgs) != 0 { + predErr = fmt.Errorf("expected to not find any "+ + "messages for peer, found %d", len(msgs)) + return false + } + + predErr = nil + return true + }) + if predErr != nil { + t.Fatal(predErr) + } + + // Override IsMsgStale to no longer mark messages as stale. + reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool { + return false + } + + // We'll request the message to be sent reliably. + if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // We should see an online notification request indicating that a new + // peerHandler has been spawned since it was previously torn down. + select { + case peerChan = <-notifyOnline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request online notification") + } + + // Finally, notifying the peer is online should prompt the message to be + // sent. Only the ChannelUpdate will be sent in this case since the + // AnnounceSignatures message above was seen as stale. 
+ peerChan <- peer + + assertMsgsSent(t, peer.sentMsgs, msg2) +} diff --git a/server.go b/server.go index b32b1034..6b4cb7a4 100644 --- a/server.go +++ b/server.go @@ -599,28 +599,32 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, s.chanDB.ChannelGraph(), ) - s.authGossiper, err = discovery.New(discovery.Config{ - Router: s.chanRouter, - Notifier: s.cc.chainNotifier, - ChainHash: *activeNetParams.GenesisHash, - Broadcast: s.BroadcastMessage, - ChanSeries: chanSeries, - SendToPeer: s.SendToPeer, - FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { - return s.FindPeer(pub) - }, - NotifyWhenOnline: s.NotifyWhenOnline, - ProofMatureDelta: 0, - TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), - RetransmitDelay: time.Minute * 30, - DB: chanDB, - AnnSigner: s.nodeSigner, - }, - s.identityPriv.PubKey(), - ) + gossipMessageStore, err := discovery.NewMessageStore(s.chanDB) if err != nil { return nil, err } + waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB) + if err != nil { + return nil, err + } + + s.authGossiper = discovery.New(discovery.Config{ + Router: s.chanRouter, + Notifier: s.cc.chainNotifier, + ChainHash: *activeNetParams.GenesisHash, + Broadcast: s.BroadcastMessage, + ChanSeries: chanSeries, + NotifyWhenOnline: s.NotifyWhenOnline, + NotifyWhenOffline: s.NotifyWhenOffline, + ProofMatureDelta: 0, + TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), + RetransmitDelay: time.Minute * 30, + WaitingProofStore: waitingProofStore, + MessageStore: gossipMessageStore, + AnnSigner: s.nodeSigner, + }, + s.identityPriv.PubKey(), + ) utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB) if err != nil {
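server.go now supplies the gossiper with the notification callbacks, the persistent message store, and the waiting-proof store, but the gossiper's own constructor is outside this excerpt. As a rough sketch only, with buildReliableSender and its parameter names invented for illustration, the glue inside the discovery package could look like this:

    package discovery

    import (
        "github.com/btcsuite/btcd/btcec"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
    )

    // buildReliableSender shows how the dependencies handed to the gossiper
    // could be forwarded into the reliable sender.
    func buildReliableSender(
        notifyOnline func(*btcec.PublicKey, chan<- lnpeer.Peer),
        notifyOffline func([33]byte) <-chan struct{},
        store GossipMessageStore,
        isStale func(lnwire.Message) bool) (*reliableSender, error) {

        sender := newReliableSender(&reliableSenderCfg{
            NotifyWhenOnline:  notifyOnline,
            NotifyWhenOffline: notifyOffline,
            MessageStore:      store,
            IsMsgStale:        isStale,
        })

        // Replay anything persisted by a previous run; each peer with
        // pending messages gets its own handler, which waits for the peer to
        // come online before sending.
        if err := sender.Start(); err != nil {
            return nil, err
        }

        return sender, nil
    }

From then on, a single sendMessage call per outgoing message persists it to the store and hands it to the per-peer handler, so delivery survives restarts and peer flaps.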