discovery: check if stale within isMsgStale for ChannelUpdate messages

In this commit, we address an assumption of the gossiper's recently
introduce reliable sender. The reliable sender is currently only used
for messages of unannounced channels. This makes sense as peers should
be able to retrieve messages from the network if they've previously
announced. However, within isMsgStale, we assumed that the reliable
sender would be used for every ChannelUpdate being sent, even if the
channel is already announced. Due to this, checking if the policy is
stale was unnecessary. But since this isn't the case, we should actually
be checking whether it is stale to prevent sending it later on.
This commit is contained in:
Wilmer Paulino 2019-03-13 13:36:21 -07:00
parent 93414bd27a
commit 0ab97957ea
2 changed files with 184 additions and 26 deletions

@ -221,7 +221,10 @@ type AuthenticatedGossiper struct {
peerSyncers map[routing.Vertex]*gossipSyncer
// reliableSender is a subsystem responsible for handling reliable
// message send requests to peers.
// message send requests to peers. This should only be used for channels
// that are unadvertised at the time of handling the message since if it
// is advertised, then peers should be able to get the message from the
// network.
reliableSender *reliableSender
sync.Mutex
@ -2364,16 +2367,32 @@ func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
return chanInfo.AuthProof != nil
case *lnwire.ChannelUpdate:
// The MessageStore will always store the latest ChannelUpdate
// as it is not aware of its timestamp (by design), so it will
// never be stale. We should still however check if the channel
// is part of our graph. If it's not, we can mark it as stale.
_, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil && err != channeldb.ErrEdgeNotFound {
log.Debugf("Unable to retrieve channel=%v from graph: "+
"%v", err)
_, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
// If the channel cannot be found, it is most likely a leftover
// message for a channel that was closed, so we can consider it
// stale.
if err == channeldb.ErrEdgeNotFound {
return true
}
return err == channeldb.ErrEdgeNotFound
if err != nil {
log.Debugf("Unable to retrieve channel=%v from graph: "+
"%v", msg.ShortChannelID, err)
return false
}
// Otherwise, we'll retrieve the correct policy that we
// currently have stored within our graph to check if this
// message is stale by comparing its timestamp.
var p *channeldb.ChannelEdgePolicy
if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
p = p1
} else {
p = p2
}
timestamp := time.Unix(int64(msg.Timestamp), 0)
return p.LastUpdate.After(timestamp)
default:
// We'll make sure to not mark any unsupported messages as stale

@ -24,6 +24,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)
@ -2956,16 +2957,17 @@ func TestSendChannelUpdateReliably(t *testing.T) {
return c
}
// assertReceivedChannelUpdate is a helper closure we'll use to
// determine if the correct channel update was received.
assertReceivedChannelUpdate := func(channelUpdate *lnwire.ChannelUpdate) {
// assertMsgSent is a helper closure we'll use to determine if the
// correct gossip message was sent.
assertMsgSent := func(msg lnwire.Message) {
t.Helper()
select {
case msg := <-sentToPeer:
assertMessage(t, batch.chanUpdAnn1, msg)
case msgSent := <-sentToPeer:
assertMessage(t, msg, msgSent)
case <-time.After(2 * time.Second):
t.Fatal("did not send local channel update to peer")
t.Fatalf("did not send %v message to peer",
msg.MsgType())
}
}
@ -3022,7 +3024,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
// We can go ahead and notify the peer, which should trigger the message
// to be sent.
peerChan <- remotePeer
assertReceivedChannelUpdate(batch.chanUpdAnn1)
assertMsgSent(batch.chanUpdAnn1)
// The gossiper should now request a notification for when the peer
// disconnects. We'll also trigger this now.
@ -3046,12 +3048,9 @@ func TestSendChannelUpdateReliably(t *testing.T) {
}
// Now that the remote peer is offline, we'll send a new channel update.
prevTimestamp := batch.chanUpdAnn1.Timestamp
newChanUpdate, err := createUpdateAnnouncement(
0, 0, nodeKeyPriv1, prevTimestamp+1,
)
if err != nil {
t.Fatalf("unable to create new channel update: %v", err)
batch.chanUpdAnn1.Timestamp++
if err := signUpdate(nodeKeyPriv1, batch.chanUpdAnn1); err != nil {
t.Fatalf("unable to sign new channel update: %v", err)
}
// With the new update created, we'll go ahead and process it.
@ -3081,10 +3080,150 @@ func TestSendChannelUpdateReliably(t *testing.T) {
case <-time.After(time.Second):
}
// Finally, we'll notify the peer is online and ensure the new channel
// update is received.
// Once again, we'll notify the peer is online and ensure the new
// channel update is received. This will also cause an offline
// notification to be requested again.
peerChan <- remotePeer
assertReceivedChannelUpdate(newChanUpdate)
assertMsgSent(batch.chanUpdAnn1)
select {
case offlineChan = <-notifyOffline:
case <-time.After(2 * time.Second):
t.Fatal("gossiper did not request notification upon peer " +
"disconnection")
}
// We'll then exchange proofs with the remote peer in order to announce
// the channel.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local channel proof")
}
if err != nil {
t.Fatalf("unable to process local channel proof: %v", err)
}
// No messages should be broadcast as we don't have the full proof yet.
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Our proof should be sent to the remote peer however.
assertMsgSent(batch.localProofAnn)
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote channel proof")
}
if err != nil {
t.Fatalf("unable to process remote channel proof: %v", err)
}
// Now that we've constructed our full proof, we can assert that the
// channel has been announced.
for i := 0; i < 2; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("expected channel to be announced")
}
}
// With the channel announced, we'll generate a new channel update. This
// one won't take the path of the reliable sender, as the channel has
// already been announced. We'll keep track of the old message that is
// now stale to use later on.
staleChannelUpdate := batch.chanUpdAnn1
newChannelUpdate := &lnwire.ChannelUpdate{}
*newChannelUpdate = *staleChannelUpdate
newChannelUpdate.Timestamp++
if err := signUpdate(nodeKeyPriv1, newChannelUpdate); err != nil {
t.Fatalf("unable to sign new channel update: %v", err)
}
// Process the new channel update. It should not be sent to the peer
// directly since the reliable sender only applies when the channel is
// not announced.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
newChannelUpdate, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local channel update")
}
if err != nil {
t.Fatalf("unable to process local channel update: %v", err)
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("channel update was not broadcast")
}
select {
case msg := <-sentToPeer:
t.Fatalf("received unexpected message: %v", spew.Sdump(msg))
case <-time.After(time.Second):
}
// Then, we'll trigger the reliable sender to send its pending messages
// by triggering an offline notification for the peer, followed by an
// online one.
close(offlineChan)
select {
case peerChan = <-notifyOnline:
case <-time.After(2 * time.Second):
t.Fatal("gossiper did not request notification upon peer " +
"connection")
}
peerChan <- remotePeer
// At this point, we should have sent both the AnnounceSignatures and
// stale ChannelUpdate.
for i := 0; i < 2; i++ {
var msg lnwire.Message
select {
case msg = <-sentToPeer:
case <-time.After(time.Second):
t.Fatal("expected to send message")
}
switch msg := msg.(type) {
case *lnwire.ChannelUpdate:
assertMessage(t, staleChannelUpdate, msg)
case *lnwire.AnnounceSignatures:
assertMessage(t, batch.localProofAnn, msg)
default:
t.Fatalf("send unexpected %v message", msg.MsgType())
}
}
// Since the messages above are now deemed as stale, they should be
// removed from the message store.
err = lntest.WaitNoError(func() error {
msgs, err := ctx.gossiper.cfg.MessageStore.Messages()
if err != nil {
return fmt.Errorf("unable to retrieve pending "+
"messages: %v", err)
}
if len(msgs) != 0 {
return fmt.Errorf("expected no messages left, found %d",
len(msgs))
}
return nil
}, time.Second)
if err != nil {
t.Fatal(err)
}
}
func assertMessage(t *testing.T, expected, got lnwire.Message) {