diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 171b04ab..fb6cd7d2 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -221,7 +221,10 @@ type AuthenticatedGossiper struct { peerSyncers map[routing.Vertex]*gossipSyncer // reliableSender is a subsystem responsible for handling reliable - // message send requests to peers. + // message send requests to peers. This should only be used for channels + // that are unadvertised at the time of handling the message since if it + // is advertised, then peers should be able to get the message from the + // network. reliableSender *reliableSender sync.Mutex @@ -2364,16 +2367,32 @@ func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { return chanInfo.AuthProof != nil case *lnwire.ChannelUpdate: - // The MessageStore will always store the latest ChannelUpdate - // as it is not aware of its timestamp (by design), so it will - // never be stale. We should still however check if the channel - // is part of our graph. If it's not, we can mark it as stale. - _, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) - if err != nil && err != channeldb.ErrEdgeNotFound { - log.Debugf("Unable to retrieve channel=%v from graph: "+ - "%v", err) + _, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + + // If the channel cannot be found, it is most likely a leftover + // message for a channel that was closed, so we can consider it + // stale. + if err == channeldb.ErrEdgeNotFound { + return true } - return err == channeldb.ErrEdgeNotFound + if err != nil { + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", msg.ShortChannelID, err) + return false + } + + // Otherwise, we'll retrieve the correct policy that we + // currently have stored within our graph to check if this + // message is stale by comparing its timestamp. + var p *channeldb.ChannelEdgePolicy + if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 { + p = p1 + } else { + p = p2 + } + + timestamp := time.Unix(int64(msg.Timestamp), 0) + return p.LastUpdate.After(timestamp) default: // We'll make sure to not mark any unsupported messages as stale diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 447ce1c9..2e19f9fe 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -24,6 +24,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" ) @@ -2956,16 +2957,17 @@ func TestSendChannelUpdateReliably(t *testing.T) { return c } - // assertReceivedChannelUpdate is a helper closure we'll use to - // determine if the correct channel update was received. - assertReceivedChannelUpdate := func(channelUpdate *lnwire.ChannelUpdate) { + // assertMsgSent is a helper closure we'll use to determine if the + // correct gossip message was sent. + assertMsgSent := func(msg lnwire.Message) { t.Helper() select { - case msg := <-sentToPeer: - assertMessage(t, batch.chanUpdAnn1, msg) + case msgSent := <-sentToPeer: + assertMessage(t, msg, msgSent) case <-time.After(2 * time.Second): - t.Fatal("did not send local channel update to peer") + t.Fatalf("did not send %v message to peer", + msg.MsgType()) } } @@ -3022,7 +3024,7 @@ func TestSendChannelUpdateReliably(t *testing.T) { // We can go ahead and notify the peer, which should trigger the message // to be sent. peerChan <- remotePeer - assertReceivedChannelUpdate(batch.chanUpdAnn1) + assertMsgSent(batch.chanUpdAnn1) // The gossiper should now request a notification for when the peer // disconnects. We'll also trigger this now. @@ -3046,12 +3048,9 @@ func TestSendChannelUpdateReliably(t *testing.T) { } // Now that the remote peer is offline, we'll send a new channel update. - prevTimestamp := batch.chanUpdAnn1.Timestamp - newChanUpdate, err := createUpdateAnnouncement( - 0, 0, nodeKeyPriv1, prevTimestamp+1, - ) - if err != nil { - t.Fatalf("unable to create new channel update: %v", err) + batch.chanUpdAnn1.Timestamp++ + if err := signUpdate(nodeKeyPriv1, batch.chanUpdAnn1); err != nil { + t.Fatalf("unable to sign new channel update: %v", err) } // With the new update created, we'll go ahead and process it. @@ -3081,10 +3080,150 @@ func TestSendChannelUpdateReliably(t *testing.T) { case <-time.After(time.Second): } - // Finally, we'll notify the peer is online and ensure the new channel - // update is received. + // Once again, we'll notify the peer is online and ensure the new + // channel update is received. This will also cause an offline + // notification to be requested again. peerChan <- remotePeer - assertReceivedChannelUpdate(newChanUpdate) + assertMsgSent(batch.chanUpdAnn1) + + select { + case offlineChan = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "disconnection") + } + + // We'll then exchange proofs with the remote peer in order to announce + // the channel. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.localProofAnn, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel proof") + } + if err != nil { + t.Fatalf("unable to process local channel proof: %v", err) + } + + // No messages should be broadcast as we don't have the full proof yet. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Our proof should be sent to the remote peer however. + assertMsgSent(batch.localProofAnn) + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement( + batch.remoteProofAnn, remotePeer, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote channel proof") + } + if err != nil { + t.Fatalf("unable to process remote channel proof: %v", err) + } + + // Now that we've constructed our full proof, we can assert that the + // channel has been announced. + for i := 0; i < 2; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(2 * trickleDelay): + t.Fatal("expected channel to be announced") + } + } + + // With the channel announced, we'll generate a new channel update. This + // one won't take the path of the reliable sender, as the channel has + // already been announced. We'll keep track of the old message that is + // now stale to use later on. + staleChannelUpdate := batch.chanUpdAnn1 + newChannelUpdate := &lnwire.ChannelUpdate{} + *newChannelUpdate = *staleChannelUpdate + newChannelUpdate.Timestamp++ + if err := signUpdate(nodeKeyPriv1, newChannelUpdate); err != nil { + t.Fatalf("unable to sign new channel update: %v", err) + } + + // Process the new channel update. It should not be sent to the peer + // directly since the reliable sender only applies when the channel is + // not announced. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + newChannelUpdate, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + select { + case <-ctx.broadcastedMessage: + case <-time.After(2 * trickleDelay): + t.Fatal("channel update was not broadcast") + } + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + + // Then, we'll trigger the reliable sender to send its pending messages + // by triggering an offline notification for the peer, followed by an + // online one. + close(offlineChan) + + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + peerChan <- remotePeer + + // At this point, we should have sent both the AnnounceSignatures and + // stale ChannelUpdate. + for i := 0; i < 2; i++ { + var msg lnwire.Message + select { + case msg = <-sentToPeer: + case <-time.After(time.Second): + t.Fatal("expected to send message") + } + + switch msg := msg.(type) { + case *lnwire.ChannelUpdate: + assertMessage(t, staleChannelUpdate, msg) + case *lnwire.AnnounceSignatures: + assertMessage(t, batch.localProofAnn, msg) + default: + t.Fatalf("send unexpected %v message", msg.MsgType()) + } + } + + // Since the messages above are now deemed as stale, they should be + // removed from the message store. + err = lntest.WaitNoError(func() error { + msgs, err := ctx.gossiper.cfg.MessageStore.Messages() + if err != nil { + return fmt.Errorf("unable to retrieve pending "+ + "messages: %v", err) + } + if len(msgs) != 0 { + return fmt.Errorf("expected no messages left, found %d", + len(msgs)) + } + return nil + }, time.Second) + if err != nil { + t.Fatal(err) + } } func assertMessage(t *testing.T, expected, got lnwire.Message) {