server+discovery: use reliableSender to replace existing resend logic

This commit is contained in:
Wilmer Paulino 2019-02-05 17:18:56 -08:00
parent 2f679f6015
commit 4996d49118
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 173 additions and 492 deletions

@ -95,10 +95,6 @@ type Config struct {
Broadcast func(skips map[routing.Vertex]struct{}, Broadcast func(skips map[routing.Vertex]struct{},
msg ...lnwire.Message) error msg ...lnwire.Message) error
// SendToPeer is a function which allows the service to send a set of
// messages to a particular peer identified by the target public key.
SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error
// FindPeer returns the actively registered peer for a given remote // FindPeer returns the actively registered peer for a given remote
// public key. An error is returned if the peer was not found or a // public key. An error is returned if the peer was not found or a
// shutdown has been requested. // shutdown has been requested.
@ -109,8 +105,15 @@ type Config struct {
// retry sending a peer message. // retry sending a peer message.
// //
// NOTE: The peerChan channel must be buffered. // NOTE: The peerChan channel must be buffered.
//
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a
// notification for when it reconnects.
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
// ProofMatureDelta the number of confirmations which is needed before // ProofMatureDelta the number of confirmations which is needed before
// exchange the channel announcement proofs. // exchange the channel announcement proofs.
ProofMatureDelta uint32 ProofMatureDelta uint32
@ -222,13 +225,17 @@ type AuthenticatedGossiper struct {
syncerMtx sync.RWMutex syncerMtx sync.RWMutex
peerSyncers map[routing.Vertex]*gossipSyncer peerSyncers map[routing.Vertex]*gossipSyncer
// reliableSender is a subsystem responsible for handling reliable
// message send requests to peers.
reliableSender *reliableSender
sync.Mutex sync.Mutex
} }
// New creates a new AuthenticatedGossiper instance, initialized with the // New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters. // passed configuration parameters.
func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
return &AuthenticatedGossiper{ gossiper := &AuthenticatedGossiper{
selfKey: selfKey, selfKey: selfKey,
cfg: &cfg, cfg: &cfg,
networkMsgs: make(chan *networkMsg), networkMsgs: make(chan *networkMsg),
@ -240,6 +247,15 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
recentRejects: make(map[uint64]struct{}), recentRejects: make(map[uint64]struct{}),
peerSyncers: make(map[routing.Vertex]*gossipSyncer), peerSyncers: make(map[routing.Vertex]*gossipSyncer),
} }
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
NotifyWhenOnline: cfg.NotifyWhenOnline,
NotifyWhenOffline: cfg.NotifyWhenOffline,
MessageStore: cfg.MessageStore,
IsMsgStale: gossiper.isMsgStale,
})
return gossiper
} }
// SynchronizeNode sends a message to the service indicating it should // SynchronizeNode sends a message to the service indicating it should
@ -398,11 +414,10 @@ func (d *AuthenticatedGossiper) Start() error {
} }
d.bestHeight = height d.bestHeight = height
// In case we had an AnnounceSignatures ready to be sent when the // Start the reliable sender. In case we had any pending messages ready
// gossiper was last shut down, we must continue on our quest to // to be sent when the gossiper was last shut down, we must continue on
// deliver this message to our peer such that they can craft the // our quest to deliver them to their respective peers.
// full channel proof. if err := d.reliableSender.Start(); err != nil {
if err := d.resendAnnounceSignatures(); err != nil {
return err return err
} }
@ -430,6 +445,10 @@ func (d *AuthenticatedGossiper) Stop() {
close(d.quit) close(d.quit)
d.wg.Wait() d.wg.Wait()
// We'll stop our reliable sender after all of the gossiper's goroutines
// have exited to ensure nothing can cause it to continue executing.
d.reliableSender.Stop()
} }
// TODO(roasbeef): need method to get current gossip timestamp? // TODO(roasbeef): need method to get current gossip timestamp?
@ -795,81 +814,6 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
return msgs return msgs
} }
// resendAnnounceSignatures will inspect the messageStore database bucket for
// AnnounceSignatures messages that we recently tried to send to a peer. If the
// associated channels still not have the full channel proofs assembled, we
// will try to resend them. If we have the full proof, we can safely delete the
// message from the messageStore.
func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
peerMsgsToResend, err := d.cfg.MessageStore.Messages()
if err != nil {
return err
}
// We now iterate over these messages, resending those that we don't
// have the full proof for, deleting the rest.
for peer, msgsToResend := range peerMsgsToResend {
pubKey, err := btcec.ParsePubKey(peer[:], btcec.S256())
if err != nil {
return err
}
for _, msg := range msgsToResend {
msg := msg.(*lnwire.AnnounceSignatures)
// Check if the full channel proof exists in our graph.
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
msg.ShortChannelID)
if err != nil {
// If the channel cannot be found, it is most likely a
// leftover message for a channel that was closed. In
// this case we delete it from the message store.
log.Warnf("unable to fetch channel info for "+
"chanID=%v from graph: %v. Will delete local"+
"proof from database",
msg.ChannelID, err)
err = d.cfg.MessageStore.DeleteMessage(msg, peer)
if err != nil {
return err
}
continue
}
// 1. If the full proof does not exist in the graph, it means
// that we haven't received the remote proof yet (or that we
// crashed before able to assemble the full proof). Since the
// remote node might think they have delivered their proof to
// us, we will resend _our_ proof to trigger a resend on their
// part: they will then be able to assemble and send us the
// full proof.
if chanInfo.AuthProof == nil {
err := d.sendAnnSigReliably(msg, pubKey)
if err != nil {
return err
}
continue
}
// 2. If the proof does exist in the graph, we have
// successfully received the remote proof and assembled the
// full proof. In this case we can safely delete the local
// proof from the database. In case the remote hasn't been able
// to assemble the full proof yet (maybe because of a crash),
// we will send them the full proof if we notice that they
// retry sending their half proof.
if chanInfo.AuthProof != nil {
log.Debugf("Deleting message for chanID=%v from "+
"messageStore", msg.ChannelID)
err := d.cfg.MessageStore.DeleteMessage(msg, peer)
if err != nil {
return err
}
}
}
}
return nil
}
// findGossipSyncer is a utility method used by the gossiper to locate the // findGossipSyncer is a utility method used by the gossiper to locate the
// gossip syncer for an inbound message so we can properly dispatch the // gossip syncer for an inbound message so we can properly dispatch the
// incoming message. If a gossip syncer isn't found, then one will be created // incoming message. If a gossip syncer isn't found, then one will be created
@ -2113,21 +2057,21 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// so they can also reconstruct the full channel // so they can also reconstruct the full channel
// announcement. // announcement.
if !nMsg.isRemote { if !nMsg.isRemote {
var remotePeer *btcec.PublicKey var remotePubKey [33]byte
if isFirstNode { if isFirstNode {
remotePeer, _ = chanInfo.NodeKey2() remotePubKey = chanInfo.NodeKey2Bytes
} else { } else {
remotePeer, _ = chanInfo.NodeKey1() remotePubKey = chanInfo.NodeKey1Bytes
} }
// Since the remote peer might not be online // Since the remote peer might not be online
// we'll call a method that will attempt to // we'll call a method that will attempt to
// deliver the proof when it comes online. // deliver the proof when it comes online.
err := d.sendAnnSigReliably(msg, remotePeer) err := d.reliableSender.sendMessage(msg, remotePubKey)
if err != nil { if err != nil {
err := fmt.Errorf("unable to send reliably "+ err := fmt.Errorf("unable to reliably send %v "+
"to remote for short_chan_id=%v: %v", "for channel=%v to peer=%x: %v",
shortChanID, err) msg.MsgType(), msg.ShortChannelID,
log.Error(err) remotePubKey, err)
nMsg.err <- err nMsg.err <- err
return nil return nil
} }
@ -2359,70 +2303,49 @@ func (d *AuthenticatedGossiper) fetchNodeAnn(
return node.NodeAnnouncement(true) return node.NodeAnnouncement(true)
} }
// sendAnnSigReliably will try to send the provided local AnnounceSignatures // isMsgStale determines whether a message retrieved from the backing
// to the remote peer, waiting for it to come online if necessary. This // MessageStore is seen as stale by the current graph.
// method returns after adding the message to persistent storage, such func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
// that the caller knows that the message will be delivered at one point. switch msg := msg.(type) {
func (d *AuthenticatedGossiper) sendAnnSigReliably( case *lnwire.AnnounceSignatures:
msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error { chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
msg.ShortChannelID,
)
// We first add this message to the database, such that in case // If the channel cannot be found, it is most likely a leftover
// we do not succeed in sending it to the peer, we'll fetch it // message for a channel that was closed, so we can consider it
// from the DB next time we start, and retry. // stale.
var remotePubKey [33]byte if err == channeldb.ErrEdgeNotFound {
copy(remotePubKey[:], remotePeer.SerializeCompressed()) return true
if err := d.cfg.MessageStore.AddMessage(msg, remotePubKey); err != nil { }
return err if err != nil {
} log.Debugf("Unable to retrieve channel=%v from graph: "+
"%v", err)
// We have succeeded adding the message to the database. We now launch return false
// a goroutine that will keep on trying sending the message to the
// remote peer until it succeeds, or the gossiper shuts down. In case
// of success, the message will be removed from the database.
d.wg.Add(1)
go func() {
defer d.wg.Done()
for {
log.Debugf("Sending AnnounceSignatures for channel "+
"%v to remote peer %x", msg.ChannelID,
remotePeer.SerializeCompressed())
err := d.cfg.SendToPeer(remotePeer, msg)
if err == nil {
// Sending succeeded, we can
// continue the flow.
break
}
log.Errorf("unable to send AnnounceSignatures message "+
"to peer(%x): %v. Will retry when online.",
remotePeer.SerializeCompressed(), err)
peerChan := make(chan lnpeer.Peer, 1)
d.cfg.NotifyWhenOnline(remotePeer, peerChan)
select {
case <-peerChan:
// Retry sending.
log.Infof("Peer %x reconnected. Retry sending"+
" AnnounceSignatures.",
remotePeer.SerializeCompressed())
case <-d.quit:
log.Infof("Gossiper shutting down, did not " +
"send AnnounceSignatures.")
return
}
} }
log.Infof("Sent channel announcement proof to remote peer: %x", // If the proof exists in the graph, then we have successfully
remotePeer.SerializeCompressed()) // received the remote proof and assembled the full proof, so we
}() // can safely delete the local proof from the database.
return chanInfo.AuthProof != nil
// This method returns after the message has been added to the database, case *lnwire.ChannelUpdate:
// such that the caller don't have to wait until the message is actually // The MessageStore will always store the latest ChannelUpdate
// delivered, but can be assured that it will be delivered eventually // as it is not aware of its timestamp (by design), so it will
// when this method returns. // never be stale. We should still however check if the channel
return nil // is part of our graph. If it's not, we can mark it as stale.
_, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil && err != channeldb.ErrEdgeNotFound {
log.Debugf("Unable to retrieve channel=%v from graph: "+
"%v", err)
}
return err == channeldb.ErrEdgeNotFound
default:
// We'll make sure to not mark any unsupported messages as stale
// to ensure they are not removed.
return false
}
} }
// updateChannel creates a new fully signed update for the channel, and updates // updateChannel creates a new fully signed update for the channel, and updates

@ -644,12 +644,17 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
return nil return nil
}, },
SendToPeer: func(target *btcec.PublicKey, msg ...lnwire.Message) error {
return nil
},
FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) {
return &mockPeer{target, nil, nil}, nil return &mockPeer{target, nil, nil}, nil
}, },
NotifyWhenOnline: func(target *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
peerChan <- &mockPeer{target, nil, nil}
},
NotifyWhenOffline: func(_ [33]byte) <-chan struct{} {
c := make(chan struct{})
return c
},
Router: router, Router: router,
TrickleDelay: trickleDelay, TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay, RetransmitDelay: retransmitDelay,
@ -880,21 +885,19 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
} }
defer cleanup() defer cleanup()
// Set up a channel that we can use to inspect the messages // Set up a channel that we can use to inspect the messages sent
// sent directly fromn the gossiper. // directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10) sentMsgs := make(chan lnwire.Message, 10)
ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) {
return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil
} }
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey,
msg ...lnwire.Message) error { peerChan chan<- lnpeer.Peer) {
select { select {
case sentMsgs <- msg[0]: case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}:
case <-ctx.gossiper.quit: case <-ctx.gossiper.quit:
return fmt.Errorf("shutting down")
} }
return nil
} }
batch, err := createAnnouncements(0) batch, err := createAnnouncements(0)
@ -1084,19 +1087,19 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
} }
defer cleanup() defer cleanup()
// Set up a channel that we can use to inspect the messages // Set up a channel that we can use to inspect the messages sent
// sent directly from the gossiper. // directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10) sentMsgs := make(chan lnwire.Message, 10)
ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) {
return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil
} }
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
select { select {
case sentMsgs <- msg[0]: case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}:
case <-ctx.gossiper.quit: case <-ctx.gossiper.quit:
return fmt.Errorf("shutting down")
} }
return nil
} }
batch, err := createAnnouncements(0) batch, err := createAnnouncements(0)
@ -1251,9 +1254,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
// The local proof should be sent to the remote peer. // The local proof should be sent to the remote peer.
select { select {
case msg := <-sentMsgs: case msg := <-sentMsgs:
if msg != batch.localProofAnn { assertMessage(t, batch.localProofAnn, msg)
t.Fatalf("expected local proof to be sent, got %v", msg)
}
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatalf("local proof was not sent to peer") t.Fatalf("local proof was not sent to peer")
} }
@ -1283,234 +1284,10 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
} }
} }
// Test that sending AnnounceSignatures to remote peer will continue // TestSignatureAnnouncementRetryAtStartup tests that if we restart the
// to be tried until the peer comes online. // gossiper, it will retry sending the AnnounceSignatures to the peer if it did
func TestSignatureAnnouncementRetry(t *testing.T) { // not succeed before shutting down, and the full channel proof is not yet
t.Parallel() // assembled.
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remotePeer := &mockPeer{remoteKey, nil, nil}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process channel ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process channel update: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process channel update: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Make the SendToPeer fail, simulating the peer being offline.
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
msg ...lnwire.Message) error {
return fmt.Errorf("intentional error in SendToPeer")
}
// We expect the gossiper to register for a notification when the peer
// comes back online, so keep track of the channel it wants to get
// notified on.
notifyPeers := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process local proof: %v", err)
}
// Since sending this local announcement proof to the remote will fail,
// the gossiper should register for a notification when the remote is
// online again.
var conChan chan<- lnpeer.Peer
select {
case conChan = <-notifyPeers:
case <-time.After(2 * time.Second):
t.Fatalf("gossiper did not ask to get notified when " +
"peer is online")
}
// Since both proofs are not yet exchanged, no message should be
// broadcasted yet.
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcements were broadcast")
case <-time.After(2 * trickleDelay):
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
// When the peer comes online, the gossiper gets notified, and should
// retry sending the AnnounceSignatures. We make the SendToPeer
// method work again.
sentToPeer := make(chan lnwire.Message, 1)
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
msg ...lnwire.Message) error {
sentToPeer <- msg[0]
return nil
}
// Notify that peer is now online. This should trigger a new call
// to SendToPeer.
close(conChan)
select {
case <-sentToPeer:
case <-time.After(2 * time.Second):
t.Fatalf("gossiper did not send message when peer came online")
}
// Now give the gossiper the remote proof. This should trigger a
// broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate).
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process remote proof: %v", err)
}
for i := 0; i < 5; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
number = 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
}
// Test that if we restart the gossiper, it will retry sending the
// AnnounceSignatures to the peer if it did not succeed before
// shutting down, and the full channel proof is not yet assembled.
func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
t.Parallel() t.Parallel()
@ -1533,7 +1310,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unable to parse pubkey: %v", err) t.Fatalf("unable to parse pubkey: %v", err)
} }
remotePeer := &mockPeer{remoteKey, nil, nil}
// Set up a channel to intercept the messages sent to the remote peer.
sentToPeer := make(chan lnwire.Message, 1)
remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit}
// Recreate lightning network topology. Initialize router with channel // Recreate lightning network topology. Initialize router with channel
// between two nodes. // between two nodes.
@ -1617,13 +1397,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
// Make the SendToPeerFail, simulating the peer being offline. // Since the reliable send to the remote peer of the local channel proof
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, // requires a notification when the peer comes online, we'll capture the
msg ...lnwire.Message) error { // channel through which it gets sent to control exactly when to
return fmt.Errorf("intentional error in SendToPeer") // dispatch it.
}
notifyPeers := make(chan chan<- lnpeer.Peer, 1) notifyPeers := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
connectedChan chan<- lnpeer.Peer) { connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan notifyPeers <- connectedChan
} }
@ -1640,11 +1419,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process :%v", err)
} }
// Since sending to the remote peer will fail, the gossiper should // The gossiper should register for a notification for when the peer is
// register for a notification when it comes back online. // online.
var conChan chan<- lnpeer.Peer
select { select {
case conChan = <-notifyPeers: case <-notifyPeers:
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatalf("gossiper did not ask to get notified when " + t.Fatalf("gossiper did not ask to get notified when " +
"peer is online") "peer is online")
@ -1674,16 +1452,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
// to send the message to the peer. // to send the message to the peer.
ctx.gossiper.Stop() ctx.gossiper.Stop()
gossiper := New(Config{ gossiper := New(Config{
Notifier: ctx.gossiper.cfg.Notifier, Notifier: ctx.gossiper.cfg.Notifier,
Broadcast: ctx.gossiper.cfg.Broadcast, Broadcast: ctx.gossiper.cfg.Broadcast,
SendToPeer: func(target *btcec.PublicKey, NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
msg ...lnwire.Message) error { NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
return fmt.Errorf("intentional error in SendToPeer")
},
NotifyWhenOnline: func(peer *btcec.PublicKey,
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
},
Router: ctx.gossiper.cfg.Router, Router: ctx.gossiper.cfg.Router,
TrickleDelay: trickleDelay, TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay, RetransmitDelay: retransmitDelay,
@ -1700,36 +1472,26 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
defer gossiper.Stop() defer gossiper.Stop()
ctx.gossiper = gossiper ctx.gossiper = gossiper
remotePeer.quit = ctx.gossiper.quit
// After starting up, the gossiper will see that it has a waitingproof // After starting up, the gossiper will see that it has a proof in the
// in the database, and will retry sending its part to the remote. Since // WaitingProofStore, and will retry sending its part to the remote.
// SendToPeer will fail again, it should register for a notification // It should register for a notification for when the peer is online.
// when the peer comes online. var peerChan chan<- lnpeer.Peer
select { select {
case conChan = <-notifyPeers: case peerChan = <-notifyPeers:
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatalf("gossiper did not ask to get notified when " + t.Fatalf("gossiper did not ask to get notified when " +
"peer is online") "peer is online")
} }
// Fix the SendToPeer method. // Notify that peer is now online. This should allow the proof to be
sentToPeer := make(chan lnwire.Message, 1) // sent.
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, peerChan <- remotePeer
msg ...lnwire.Message) error {
select {
case sentToPeer <- msg[0]:
case <-ctx.gossiper.quit:
return fmt.Errorf("shutting down")
}
return nil
}
// Notify that peer is now online. This should trigger a new call
// to SendToPeer.
close(conChan)
select { select {
case <-sentToPeer: case msg := <-sentToPeer:
assertMessage(t, batch.localProofAnn, msg)
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatalf("gossiper did not send message when peer came online") t.Fatalf("gossiper did not send message when peer came online")
} }
@ -1770,10 +1532,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
} }
} }
// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a // TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a remote
// remote proof is received when we already have the full proof, // proof is received when we already have the full proof, the gossiper will send
// the gossiper will send the full proof (ChannelAnnouncement) to // the full proof (ChannelAnnouncement) to the remote peer.
// the remote peer.
func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
t.Parallel() t.Parallel()
@ -1796,7 +1557,19 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unable to parse pubkey: %v", err) t.Fatalf("unable to parse pubkey: %v", err)
} }
remotePeer := &mockPeer{remoteKey, nil, nil}
// Set up a channel we can use to inspect messages sent by the
// gossiper to the remote peer.
sentToPeer := make(chan lnwire.Message, 1)
remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit}
// Override NotifyWhenOnline to return the remote peer which we expect
// meesages to be sent to.
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer
}
// Recreate lightning network topology. Initialize router with channel // Recreate lightning network topology. Initialize router with channel
// between two nodes. // between two nodes.
@ -1880,27 +1653,6 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
// Set up a channel we can use to inspect messages sent by the
// gossiper to the remote peer.
sentToPeer := make(chan lnwire.Message, 1)
remotePeer.sentMsgs = sentToPeer
remotePeer.quit = ctx.gossiper.quit
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
msg ...lnwire.Message) error {
select {
case <-ctx.gossiper.quit:
return fmt.Errorf("gossiper shutting down")
case sentToPeer <- msg[0]:
}
return nil
}
notifyPeers := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
}
// Pretending that we receive local channel announcement from funding // Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process. // manager, thereby kick off the announcement exchange process.
select { select {
@ -1928,9 +1680,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
// We expect the gossiper to send this message to the remote peer. // We expect the gossiper to send this message to the remote peer.
select { select {
case msg := <-sentToPeer: case msg := <-sentToPeer:
if msg != batch.localProofAnn { assertMessage(t, batch.localProofAnn, msg)
t.Fatalf("wrong message sent to peer: %v", msg)
}
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not send local proof to peer") t.Fatal("did not send local proof to peer")
} }
@ -2356,10 +2106,9 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
} }
} }
// TestReceiveRemoteChannelUpdateFirst tests that if we receive a // TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
// ChannelUpdate from the remote before we have processed our // from the remote before we have processed our own ChannelAnnouncement, it will
// own ChannelAnnouncement, it will be reprocessed later, after // be reprocessed later, after our ChannelAnnouncement.
// our ChannelAnnouncement.
func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
t.Parallel() t.Parallel()
@ -2369,21 +2118,6 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
} }
defer cleanup() defer cleanup()
// Set up a channel that we can use to inspect the messages
// sent directly fromn the gossiper.
sentMsgs := make(chan lnwire.Message, 10)
ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) {
return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil
}
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error {
select {
case sentMsgs <- msg[0]:
case <-ctx.gossiper.quit:
return fmt.Errorf("shutting down")
}
return nil
}
batch, err := createAnnouncements(0) batch, err := createAnnouncements(0)
if err != nil { if err != nil {
t.Fatalf("can't generate announcements: %v", err) t.Fatalf("can't generate announcements: %v", err)
@ -2397,7 +2131,22 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unable to parse pubkey: %v", err) t.Fatalf("unable to parse pubkey: %v", err)
} }
remotePeer := &mockPeer{remoteKey, nil, nil}
// Set up a channel that we can use to inspect the messages sent
// directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10)
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
// Override NotifyWhenOnline and FindPeer to return the remote peer
// which we expect meesages to be sent to.
ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) {
return remotePeer, nil
}
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer
}
// Recreate the case where the remote node is sending us its ChannelUpdate // Recreate the case where the remote node is sending us its ChannelUpdate
// before we have been able to process our own ChannelAnnouncement and // before we have been able to process our own ChannelAnnouncement and
@ -2896,3 +2645,12 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
t.Fatalf("unable to process announcement: %v", err) t.Fatalf("unable to process announcement: %v", err)
} }
} }
func assertMessage(t *testing.T, expected, got lnwire.Message) {
t.Helper()
if !reflect.DeepEqual(expected, got) {
t.Fatalf("expected: %v\ngot: %v", spew.Sdump(expected),
spew.Sdump(got))
}
}

@ -598,11 +598,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
ChainHash: *activeNetParams.GenesisHash, ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage, Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries, ChanSeries: chanSeries,
SendToPeer: s.SendToPeer,
FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) {
return s.FindPeer(pub) return s.FindPeer(pub)
}, },
NotifyWhenOnline: s.NotifyWhenOnline, NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
ProofMatureDelta: 0, ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30, RetransmitDelay: time.Minute * 30,