diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 90c2cb0e..195773ed 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -95,11 +95,6 @@ type Config struct { Broadcast func(skips map[routing.Vertex]struct{}, msg ...lnwire.Message) error - // FindPeer returns the actively registered peer for a given remote - // public key. An error is returned if the peer was not found or a - // shutdown has been requested. - FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error) - // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. @@ -1927,30 +1922,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. - var remotePub *btcec.PublicKey + var remotePubKey [33]byte switch { case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: - remotePub, _ = chanInfo.NodeKey2() + remotePubKey = chanInfo.NodeKey2Bytes case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: - remotePub, _ = chanInfo.NodeKey1() + remotePubKey = chanInfo.NodeKey1Bytes } - sPeer, err := d.cfg.FindPeer(remotePub) + // Now, we'll attempt to send the channel update message + // reliably to the remote peer in the background, so + // that we don't block if the peer happens to be offline + // at the moment. + err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { - log.Errorf("unable to send channel update -- "+ - "could not find peer %x: %v", - remotePub.SerializeCompressed(), - err) - } else { - // Send ChannelUpdate directly to remotePeer. - // TODO(halseth): make reliable send? - err = sPeer.SendMessage(false, msg) - if err != nil { - log.Errorf("unable to send channel "+ - "update message to peer %x: %v", - remotePub.SerializeCompressed(), - err) - } + err := fmt.Errorf("unable to reliably send %v "+ + "for channel=%v to peer=%x: %v", + msg.MsgType(), msg.ShortChannelID, + remotePubKey, err) + nMsg.err <- err + return nil } } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 043007bb..560fdbb1 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -644,9 +644,6 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil }, - FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, nil, nil}, nil - }, NotifyWhenOnline: func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { peerChan <- &mockPeer{target, nil, nil} @@ -888,9 +885,6 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -970,9 +964,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -1090,9 +1082,6 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -1201,9 +1190,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -1315,6 +1302,27 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { sentToPeer := make(chan lnwire.Message, 1) remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } + + // Override NotifyWhenOffline to return the channel which will notify + // the gossiper that the peer is offline. We'll use this to signal that + // the peer is offline so that the gossiper requests a notification when + // it comes back online. + notifyOffline := make(chan chan struct{}, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func( + _ [33]byte) <-chan struct{} { + + c := make(chan struct{}) + notifyOffline <- c + return c + } + // Recreate lightning network topology. Initialize router with channel // between two nodes. select { @@ -1348,6 +1356,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(1 * time.Second): + t.Fatal("gossiper did not send channel update to peer") + } select { case err = <-ctx.gossiper.ProcessLocalAnnouncement( @@ -1407,6 +1421,17 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { notifyPeers <- connectedChan } + // Before sending the local channel proof, we'll notify that the peer is + // offline, so that it's not sent to the peer. + var peerOffline chan struct{} + select { + case peerOffline = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not request notification for when " + + "peer disconnects") + } + close(peerOffline) + // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. select { @@ -1428,12 +1453,21 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { "peer is online") } + // The proof should not be broadcast yet since we're still missing the + // remote party's. select { case <-ctx.broadcastedMessage: t.Fatal("announcements were broadcast") case <-time.After(2 * trickleDelay): } + // And it shouldn't be sent to the peer either as they are offline. + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + number := 0 if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { @@ -1448,8 +1482,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatal("wrong number of objects in storage") } - // Shut down gossiper, and restart. This should trigger a new attempt - // to send the message to the peer. + // Restart the gossiper and restore its original NotifyWhenOnline and + // NotifyWhenOffline methods. This should trigger a new attempt to send + // the message to the peer. ctx.gossiper.Stop() gossiper := New(Config{ Notifier: ctx.gossiper.cfg.Notifier, @@ -1489,11 +1524,22 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // sent. peerChan <- remotePeer - select { - case msg := <-sentToPeer: - assertMessage(t, batch.localProofAnn, msg) - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not send message when peer came online") +out: + for { + select { + case msg := <-sentToPeer: + // Since the ChannelUpdate will also be resent as it is + // sent reliably, we'll need to filter it out. + if _, ok := msg.(*lnwire.AnnounceSignatures); !ok { + continue + } + + assertMessage(t, batch.localProofAnn, msg) + break out + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not send message when peer " + + "came online") + } } // Now exchanging the remote channel proof, the channel announcement @@ -1604,6 +1650,12 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not send channel update to remove peer") + } select { case err = <-ctx.gossiper.ProcessLocalAnnouncement( @@ -2137,11 +2189,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { sentMsgs := make(chan lnwire.Message, 10) remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} - // Override NotifyWhenOnline and FindPeer to return the remote peer - // which we expect meesages to be sent to. - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return remotePeer, nil - } + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -2225,9 +2274,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -2646,6 +2693,190 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { } } +// TestSendChannelUpdateReliably ensures that the latest channel update for a +// channel is always sent upon the remote party reconnecting. +func TestSendChannelUpdateReliably(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context and a batch of + // announcements. + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + // We'll also create two keys, one for ourselves and another for the + // remote party. + localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + + // Since we first wait to be notified of the peer before attempting to + // send the message, we'll overwrite NotifyWhenOnline and + // NotifyWhenOffline to instead give us access to the channel that will + // receive the notification. + notifyOnline := make(chan chan<- lnpeer.Peer, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + notifyOnline <- peerChan + } + notifyOffline := make(chan chan struct{}, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func( + _ [33]byte) <-chan struct{} { + + c := make(chan struct{}, 1) + notifyOffline <- c + return c + } + + // assertReceivedChannelUpdate is a helper closure we'll use to + // determine if the correct channel update was received. + assertReceivedChannelUpdate := func(channelUpdate *lnwire.ChannelUpdate) { + t.Helper() + + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(2 * time.Second): + t.Fatal("did not send local channel update to peer") + } + } + + // Process the channel announcement for which we'll send a channel + // update for. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.localChanAnn, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel announcement") + } + if err != nil { + t.Fatalf("unable to process local channel announcement: %v", err) + } + + // It should not be broadcast due to not having an announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Now, we'll process the channel update. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.chanUpdAnn1, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + + // It should also not be broadcast due to the announcement not having an + // announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // It should however send it to the peer directly. In order to do so, + // it'll request a notification for when the peer is online. + var peerChan chan<- lnpeer.Peer + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + // We can go ahead and notify the peer, which should trigger the message + // to be sent. + peerChan <- remotePeer + assertReceivedChannelUpdate(batch.chanUpdAnn1) + + // The gossiper should now request a notification for when the peer + // disconnects. We'll also trigger this now. + var offlineChan chan struct{} + select { + case offlineChan = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "disconnection") + } + + close(offlineChan) + + // Since it's offline, the gossiper should request another notification + // for when it comes back online. + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + // Now that the remote peer is offline, we'll send a new channel update. + prevTimestamp := batch.chanUpdAnn1.Timestamp + newChanUpdate, err := createUpdateAnnouncement( + 0, 0, nodeKeyPriv1, prevTimestamp+1, + ) + if err != nil { + t.Fatalf("unable to create new channel update: %v", err) + } + + // With the new update created, we'll go ahead and process it. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.chanUpdAnn1, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + + // It should also not be broadcast due to the announcement not having an + // announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // The message should not be sent since the peer remains offline. + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + + // Finally, we'll notify the peer is online and ensure the new channel + // update is received. + peerChan <- remotePeer + assertReceivedChannelUpdate(newChanUpdate) +} + func assertMessage(t *testing.T, expected, got lnwire.Message) { t.Helper() diff --git a/server.go b/server.go index 32cf6fe1..4f49f274 100644 --- a/server.go +++ b/server.go @@ -593,14 +593,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, } s.authGossiper = discovery.New(discovery.Config{ - Router: s.chanRouter, - Notifier: s.cc.chainNotifier, - ChainHash: *activeNetParams.GenesisHash, - Broadcast: s.BroadcastMessage, - ChanSeries: chanSeries, - FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { - return s.FindPeer(pub) - }, + Router: s.chanRouter, + Notifier: s.cc.chainNotifier, + ChainHash: *activeNetParams.GenesisHash, + Broadcast: s.BroadcastMessage, + ChanSeries: chanSeries, NotifyWhenOnline: s.NotifyWhenOnline, NotifyWhenOffline: s.NotifyWhenOffline, ProofMatureDelta: 0,