From 12168f022e61d73d82c3ca02ffb6fa5952622cf5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:19:04 -0800 Subject: [PATCH] server+discovery: send channel updates to remote peers reliably In this commit, we also allow channel updates for our channels to be sent reliably to our channel counterparty. This is especially crucial for private channels, since they're not announced, in order to ensure each party can receive funds from the other side. --- discovery/gossiper.go | 37 ++--- discovery/gossiper_test.go | 291 +++++++++++++++++++++++++++++++++---- server.go | 13 +- 3 files changed, 280 insertions(+), 61 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 90c2cb0e..195773ed 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -95,11 +95,6 @@ type Config struct { Broadcast func(skips map[routing.Vertex]struct{}, msg ...lnwire.Message) error - // FindPeer returns the actively registered peer for a given remote - // public key. An error is returned if the peer was not found or a - // shutdown has been requested. - FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error) - // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. @@ -1927,30 +1922,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. - var remotePub *btcec.PublicKey + var remotePubKey [33]byte switch { case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: - remotePub, _ = chanInfo.NodeKey2() + remotePubKey = chanInfo.NodeKey2Bytes case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: - remotePub, _ = chanInfo.NodeKey1() + remotePubKey = chanInfo.NodeKey1Bytes } - sPeer, err := d.cfg.FindPeer(remotePub) + // Now, we'll attempt to send the channel update message + // reliably to the remote peer in the background, so + // that we don't block if the peer happens to be offline + // at the moment. + err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { - log.Errorf("unable to send channel update -- "+ - "could not find peer %x: %v", - remotePub.SerializeCompressed(), - err) - } else { - // Send ChannelUpdate directly to remotePeer. - // TODO(halseth): make reliable send? - err = sPeer.SendMessage(false, msg) - if err != nil { - log.Errorf("unable to send channel "+ - "update message to peer %x: %v", - remotePub.SerializeCompressed(), - err) - } + err := fmt.Errorf("unable to reliably send %v "+ + "for channel=%v to peer=%x: %v", + msg.MsgType(), msg.ShortChannelID, + remotePubKey, err) + nMsg.err <- err + return nil } } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 043007bb..560fdbb1 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -644,9 +644,6 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil }, - FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, nil, nil}, nil - }, NotifyWhenOnline: func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { peerChan <- &mockPeer{target, nil, nil} @@ -888,9 +885,6 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -970,9 +964,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -1090,9 +1082,6 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -1201,9 +1190,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -1315,6 +1302,27 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { sentToPeer := make(chan lnwire.Message, 1) remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } + + // Override NotifyWhenOffline to return the channel which will notify + // the gossiper that the peer is offline. We'll use this to signal that + // the peer is offline so that the gossiper requests a notification when + // it comes back online. + notifyOffline := make(chan chan struct{}, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func( + _ [33]byte) <-chan struct{} { + + c := make(chan struct{}) + notifyOffline <- c + return c + } + // Recreate lightning network topology. Initialize router with channel // between two nodes. select { @@ -1348,6 +1356,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(1 * time.Second): + t.Fatal("gossiper did not send channel update to peer") + } select { case err = <-ctx.gossiper.ProcessLocalAnnouncement( @@ -1407,6 +1421,17 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { notifyPeers <- connectedChan } + // Before sending the local channel proof, we'll notify that the peer is + // offline, so that it's not sent to the peer. + var peerOffline chan struct{} + select { + case peerOffline = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not request notification for when " + + "peer disconnects") + } + close(peerOffline) + // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. select { @@ -1428,12 +1453,21 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { "peer is online") } + // The proof should not be broadcast yet since we're still missing the + // remote party's. select { case <-ctx.broadcastedMessage: t.Fatal("announcements were broadcast") case <-time.After(2 * trickleDelay): } + // And it shouldn't be sent to the peer either as they are offline. + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + number := 0 if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { @@ -1448,8 +1482,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatal("wrong number of objects in storage") } - // Shut down gossiper, and restart. This should trigger a new attempt - // to send the message to the peer. + // Restart the gossiper and restore its original NotifyWhenOnline and + // NotifyWhenOffline methods. This should trigger a new attempt to send + // the message to the peer. ctx.gossiper.Stop() gossiper := New(Config{ Notifier: ctx.gossiper.cfg.Notifier, @@ -1489,11 +1524,22 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // sent. peerChan <- remotePeer - select { - case msg := <-sentToPeer: - assertMessage(t, batch.localProofAnn, msg) - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not send message when peer came online") +out: + for { + select { + case msg := <-sentToPeer: + // Since the ChannelUpdate will also be resent as it is + // sent reliably, we'll need to filter it out. + if _, ok := msg.(*lnwire.AnnounceSignatures); !ok { + continue + } + + assertMessage(t, batch.localProofAnn, msg) + break out + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not send message when peer " + + "came online") + } } // Now exchanging the remote channel proof, the channel announcement @@ -1604,6 +1650,12 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not send channel update to remove peer") + } select { case err = <-ctx.gossiper.ProcessLocalAnnouncement( @@ -2137,11 +2189,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { sentMsgs := make(chan lnwire.Message, 10) remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} - // Override NotifyWhenOnline and FindPeer to return the remote peer - // which we expect meesages to be sent to. - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return remotePeer, nil - } + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -2225,9 +2274,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -2646,6 +2693,190 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { } } +// TestSendChannelUpdateReliably ensures that the latest channel update for a +// channel is always sent upon the remote party reconnecting. +func TestSendChannelUpdateReliably(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context and a batch of + // announcements. + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + // We'll also create two keys, one for ourselves and another for the + // remote party. + localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + + // Since we first wait to be notified of the peer before attempting to + // send the message, we'll overwrite NotifyWhenOnline and + // NotifyWhenOffline to instead give us access to the channel that will + // receive the notification. + notifyOnline := make(chan chan<- lnpeer.Peer, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + notifyOnline <- peerChan + } + notifyOffline := make(chan chan struct{}, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func( + _ [33]byte) <-chan struct{} { + + c := make(chan struct{}, 1) + notifyOffline <- c + return c + } + + // assertReceivedChannelUpdate is a helper closure we'll use to + // determine if the correct channel update was received. + assertReceivedChannelUpdate := func(channelUpdate *lnwire.ChannelUpdate) { + t.Helper() + + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(2 * time.Second): + t.Fatal("did not send local channel update to peer") + } + } + + // Process the channel announcement for which we'll send a channel + // update for. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.localChanAnn, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel announcement") + } + if err != nil { + t.Fatalf("unable to process local channel announcement: %v", err) + } + + // It should not be broadcast due to not having an announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Now, we'll process the channel update. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.chanUpdAnn1, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + + // It should also not be broadcast due to the announcement not having an + // announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // It should however send it to the peer directly. In order to do so, + // it'll request a notification for when the peer is online. + var peerChan chan<- lnpeer.Peer + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + // We can go ahead and notify the peer, which should trigger the message + // to be sent. + peerChan <- remotePeer + assertReceivedChannelUpdate(batch.chanUpdAnn1) + + // The gossiper should now request a notification for when the peer + // disconnects. We'll also trigger this now. + var offlineChan chan struct{} + select { + case offlineChan = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "disconnection") + } + + close(offlineChan) + + // Since it's offline, the gossiper should request another notification + // for when it comes back online. + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + // Now that the remote peer is offline, we'll send a new channel update. + prevTimestamp := batch.chanUpdAnn1.Timestamp + newChanUpdate, err := createUpdateAnnouncement( + 0, 0, nodeKeyPriv1, prevTimestamp+1, + ) + if err != nil { + t.Fatalf("unable to create new channel update: %v", err) + } + + // With the new update created, we'll go ahead and process it. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.chanUpdAnn1, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + + // It should also not be broadcast due to the announcement not having an + // announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // The message should not be sent since the peer remains offline. + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + + // Finally, we'll notify the peer is online and ensure the new channel + // update is received. + peerChan <- remotePeer + assertReceivedChannelUpdate(newChanUpdate) +} + func assertMessage(t *testing.T, expected, got lnwire.Message) { t.Helper() diff --git a/server.go b/server.go index 32cf6fe1..4f49f274 100644 --- a/server.go +++ b/server.go @@ -593,14 +593,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, } s.authGossiper = discovery.New(discovery.Config{ - Router: s.chanRouter, - Notifier: s.cc.chainNotifier, - ChainHash: *activeNetParams.GenesisHash, - Broadcast: s.BroadcastMessage, - ChanSeries: chanSeries, - FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { - return s.FindPeer(pub) - }, + Router: s.chanRouter, + Notifier: s.cc.chainNotifier, + ChainHash: *activeNetParams.GenesisHash, + Broadcast: s.BroadcastMessage, + ChanSeries: chanSeries, NotifyWhenOnline: s.NotifyWhenOnline, NotifyWhenOffline: s.NotifyWhenOffline, ProofMatureDelta: 0,