Merge pull request #2775 from wpaulino/reliable-sender-chan-update

discovery: check if stale within isMsgStale for ChannelUpdate messages
commit 8087ea4c4c
Johan T. Halseth 2019-04-01 12:08:33 +02:00, committed by GitHub
3 changed files with 244 additions and 154 deletions

@@ -221,7 +221,10 @@ type AuthenticatedGossiper struct {
 	peerSyncers map[routing.Vertex]*gossipSyncer
 
 	// reliableSender is a subsystem responsible for handling reliable
-	// message send requests to peers.
+	// message send requests to peers. This should only be used for channels
+	// that are unadvertised at the time of handling the message since if it
+	// is advertised, then peers should be able to get the message from the
+	// network.
 	reliableSender *reliableSender
 
 	sync.Mutex
@@ -2364,16 +2367,32 @@ func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
 		return chanInfo.AuthProof != nil
 
 	case *lnwire.ChannelUpdate:
-		// The MessageStore will always store the latest ChannelUpdate
-		// as it is not aware of its timestamp (by design), so it will
-		// never be stale. We should still however check if the channel
-		// is part of our graph. If it's not, we can mark it as stale.
-		_, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
-		if err != nil && err != channeldb.ErrEdgeNotFound {
-			log.Debugf("Unable to retrieve channel=%v from graph: "+
-				"%v", err)
-		}
-
-		return err == channeldb.ErrEdgeNotFound
+		_, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
+
+		// If the channel cannot be found, it is most likely a leftover
+		// message for a channel that was closed, so we can consider it
+		// stale.
+		if err == channeldb.ErrEdgeNotFound {
+			return true
+		}
+		if err != nil {
+			log.Debugf("Unable to retrieve channel=%v from graph: "+
+				"%v", msg.ShortChannelID, err)
+			return false
+		}
+
+		// Otherwise, we'll retrieve the correct policy that we
+		// currently have stored within our graph to check if this
+		// message is stale by comparing its timestamp.
+		var p *channeldb.ChannelEdgePolicy
+		if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
+			p = p1
+		} else {
+			p = p2
+		}
+
+		timestamp := time.Unix(int64(msg.Timestamp), 0)
+		return p.LastUpdate.After(timestamp)
 
 	default:
 		// We'll make sure to not mark any unsupported messages as stale
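The rule this hunk introduces: a ChannelUpdate is stale once the graph already holds a newer policy for the same direction, where the low bit of ChannelFlags picks which of the channel's two directed policies the update applies to. A minimal standalone sketch of that rule, using simplified stand-in types rather than lnd's actual channeldb/lnwire definitions:

package main

import (
	"fmt"
	"time"
)

// chanUpdateDirection mirrors the low bit of ChannelFlags, which selects
// which of the channel's two directed policies an update applies to.
const chanUpdateDirection = 1 << 0

// edgePolicy is a stand-in for channeldb.ChannelEdgePolicy.
type edgePolicy struct {
	lastUpdate time.Time
}

// isUpdateStale mirrors the new isMsgStale logic: pick the stored policy
// for the update's direction, then compare timestamps.
func isUpdateStale(flags uint8, ts uint32, p1, p2 *edgePolicy) bool {
	p := p1
	if flags&chanUpdateDirection != 0 {
		p = p2
	}
	return p.lastUpdate.After(time.Unix(int64(ts), 0))
}

func main() {
	p1 := &edgePolicy{lastUpdate: time.Unix(2000, 0)}
	p2 := &edgePolicy{lastUpdate: time.Unix(1000, 0)}

	// Direction 0 already has a policy stamped 2000, so an update
	// stamped 1500 is stale; direction 1 only has 1000, so it is not.
	fmt.Println(isUpdateStale(0, 1500, p1, p2)) // true
	fmt.Println(isUpdateStale(1, 1500, p1, p2)) // false
}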

@@ -24,6 +24,7 @@ import (
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/lnpeer"
+	"github.com/lightningnetwork/lnd/lntest"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/routing"
 )
@@ -155,7 +156,16 @@ func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
-	r.edges[edge.ChannelID] = append(r.edges[edge.ChannelID], *edge)
+	if len(r.edges[edge.ChannelID]) == 0 {
+		r.edges[edge.ChannelID] = make([]channeldb.ChannelEdgePolicy, 2)
+	}
+
+	if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
+		r.edges[edge.ChannelID][0] = *edge
+	} else {
+		r.edges[edge.ChannelID][1] = *edge
+	}
+
 	return nil
 }
@@ -226,13 +236,17 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) (
 		return &chanInfo, nil, nil, nil
 	}
 
-	if len(edges) == 1 {
-		edge1 := edges[0]
-		return &chanInfo, &edge1, nil, nil
-	}
-
-	edge1, edge2 := edges[0], edges[1]
-	return &chanInfo, &edge1, &edge2, nil
+	var edge1 *channeldb.ChannelEdgePolicy
+	if !reflect.DeepEqual(edges[0], channeldb.ChannelEdgePolicy{}) {
+		edge1 = &edges[0]
+	}
+
+	var edge2 *channeldb.ChannelEdgePolicy
+	if !reflect.DeepEqual(edges[1], channeldb.ChannelEdgePolicy{}) {
+		edge2 = &edges[1]
+	}
+
+	return &chanInfo, edge1, edge2, nil
 }
 
 func (r *mockGraphSource) FetchLightningNode(
@@ -327,11 +341,15 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
 	}
 
 	switch {
-	case len(edges) >= 1 && edges[0].ChannelFlags == flags:
-		return !edges[0].LastUpdate.Before(timestamp)
-
-	case len(edges) >= 2 && edges[1].ChannelFlags == flags:
-		return !edges[1].LastUpdate.Before(timestamp)
+	case flags&lnwire.ChanUpdateDirection == 0 &&
+		!reflect.DeepEqual(edges[0], channeldb.ChannelEdgePolicy{}):
+
+		return !timestamp.After(edges[0].LastUpdate)
+
+	case flags&lnwire.ChanUpdateDirection == 1 &&
+		!reflect.DeepEqual(edges[1], channeldb.ChannelEdgePolicy{}):
+
+		return !timestamp.After(edges[1].LastUpdate)
 
 	default:
 		return false
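Both mock changes above lean on the same pattern: allocate exactly two policy slots (one per direction) and treat the zero-value struct as "no policy stored", detected via reflect.DeepEqual because the policy struct is not comparable with ==. A standalone sketch of that sentinel pattern with simplified types:

package main

import (
	"fmt"
	"reflect"
)

// edgePolicy stands in for channeldb.ChannelEdgePolicy; the real struct
// holds non-comparable fields, which is why the mock reaches for
// reflect.DeepEqual instead of ==.
type edgePolicy struct {
	lastUpdate int64
	fees       []int64
}

func main() {
	// Reserve both directed slots up front; a slot that was never
	// written keeps its zero value, which doubles as "no policy".
	edges := make([]edgePolicy, 2)
	edges[0] = edgePolicy{lastUpdate: 42, fees: []int64{1}}

	for direction, e := range edges {
		if reflect.DeepEqual(e, edgePolicy{}) {
			fmt.Printf("direction %d: no policy stored\n", direction)
			continue
		}
		fmt.Printf("direction %d: last update %d\n", direction, e.lastUpdate)
	}
}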
@@ -1345,25 +1363,14 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	sentToPeer := make(chan lnwire.Message, 1)
 	remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit}
 
-	// Override NotifyWhenOnline to return the remote peer which we expect
-	// meesages to be sent to.
+	// Since the reliable send to the remote peer of the local channel proof
+	// requires a notification when the peer comes online, we'll capture the
+	// channel through which it gets sent to control exactly when to
+	// dispatch it.
+	notifyPeers := make(chan chan<- lnpeer.Peer, 1)
 	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
-		peerChan chan<- lnpeer.Peer) {
+		connectedChan chan<- lnpeer.Peer) {
 
-		peerChan <- remotePeer
-	}
-
-	// Override NotifyWhenOffline to return the channel which will notify
-	// the gossiper that the peer is offline. We'll use this to signal that
-	// the peer is offline so that the gossiper requests a notification when
-	// it comes back online.
-	notifyOffline := make(chan chan struct{}, 1)
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func(
-		_ [33]byte) <-chan struct{} {
-
-		c := make(chan struct{})
-		notifyOffline <- c
-		return c
+		notifyPeers <- connectedChan
 	}
 
 	// Recreate lightning network topology. Initialize router with channel
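The new comment explains the trick: instead of a mock that answers immediately, the test captures the channel each NotifyWhenOnline call registers, so it can fire the "peer is online" signal at exactly the right moment. A standalone sketch of that capture-the-channel pattern (simplified types; not the actual gossiper config):

package main

import "fmt"

type peer string

// notifyWhenOnline mirrors the hook's shape: the caller hands over a
// channel on which it expects the connected peer to be delivered.
type notifyWhenOnline func(connectedChan chan<- peer)

func main() {
	// Capture each channel the subsystem registers instead of
	// answering immediately, so the test controls dispatch timing.
	notifyPeers := make(chan chan<- peer, 1)
	var hook notifyWhenOnline = func(connectedChan chan<- peer) {
		notifyPeers <- connectedChan
	}

	// The subsystem under test registers interest...
	result := make(chan peer, 1)
	hook(result)

	// ...and the test releases the notification whenever it chooses.
	peerChan := <-notifyPeers
	peerChan <- "remotePeer"
	fmt.Println(<-result) // remotePeer
}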
@@ -1384,102 +1391,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	case <-time.After(2 * trickleDelay):
 	}
 
-	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
-		batch.chanUpdAnn1, localKey,
-	):
-	case <-time.After(2 * time.Second):
-		t.Fatal("did not process local announcement")
-	}
-	if err != nil {
-		t.Fatalf("unable to process channel update: %v", err)
-	}
-	select {
-	case <-ctx.broadcastedMessage:
-		t.Fatal("channel update announcement was broadcast")
-	case <-time.After(2 * trickleDelay):
-	}
-
-	select {
-	case msg := <-sentToPeer:
-		assertMessage(t, batch.chanUpdAnn1, msg)
-	case <-time.After(1 * time.Second):
-		t.Fatal("gossiper did not send channel update to peer")
-	}
-
-	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
-		batch.nodeAnn1, localKey,
-	):
-	case <-time.After(2 * time.Second):
-		t.Fatal("did not process local announcement")
-	}
-	if err != nil {
-		t.Fatalf("unable to process node ann: %v", err)
-	}
-	select {
-	case <-ctx.broadcastedMessage:
-		t.Fatal("node announcement was broadcast")
-	case <-time.After(2 * trickleDelay):
-	}
-
-	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn2, remotePeer,
-	):
-	case <-time.After(2 * time.Second):
-		t.Fatal("did not process remote announcement")
-	}
-	if err != nil {
-		t.Fatalf("unable to process channel update: %v", err)
-	}
-	select {
-	case <-ctx.broadcastedMessage:
-		t.Fatal("channel update announcement was broadcast")
-	case <-time.After(2 * trickleDelay):
-	}
-
-	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.nodeAnn2, remotePeer,
-	):
-	case <-time.After(2 * time.Second):
-		t.Fatal("did not process local announcement")
-	}
-	if err != nil {
-		t.Fatalf("unable to process node ann: %v", err)
-	}
-	select {
-	case <-ctx.broadcastedMessage:
-		t.Fatal("node announcement was broadcast")
-	case <-time.After(2 * trickleDelay):
-	}
-
-	// Since the reliable send to the remote peer of the local channel proof
-	// requires a notification when the peer comes online, we'll capture the
-	// channel through which it gets sent to control exactly when to
-	// dispatch it.
-	notifyPeers := make(chan chan<- lnpeer.Peer, 1)
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
-		connectedChan chan<- lnpeer.Peer) {
-
-		notifyPeers <- connectedChan
-	}
-
-	// Before sending the local channel proof, we'll notify that the peer is
-	// offline, so that it's not sent to the peer.
-	var peerOffline chan struct{}
-	select {
-	case peerOffline = <-notifyOffline:
-	case <-time.After(2 * time.Second):
-		t.Fatalf("gossiper did not request notification for when " +
-			"peer disconnects")
-	}
-	close(peerOffline)
-
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
-		localKey):
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+		batch.localProofAnn, localKey,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -1598,13 +1515,11 @@ out:
 		t.Fatalf("unable to process :%v", err)
 	}
 
-	for i := 0; i < 5; i++ {
-		select {
-		case <-ctx.broadcastedMessage:
-		case <-time.After(time.Second):
-			t.Fatal("announcement wasn't broadcast")
-		}
+	select {
+	case <-ctx.broadcastedMessage:
+	case <-time.After(time.Second):
+		t.Fatal("announcement wasn't broadcast")
 	}
 
 	number = 0
 	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
@@ -3042,16 +2957,17 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 		return c
 	}
 
-	// assertReceivedChannelUpdate is a helper closure we'll use to
-	// determine if the correct channel update was received.
-	assertReceivedChannelUpdate := func(channelUpdate *lnwire.ChannelUpdate) {
+	// assertMsgSent is a helper closure we'll use to determine if the
+	// correct gossip message was sent.
+	assertMsgSent := func(msg lnwire.Message) {
 		t.Helper()
 
 		select {
-		case msg := <-sentToPeer:
-			assertMessage(t, batch.chanUpdAnn1, msg)
+		case msgSent := <-sentToPeer:
+			assertMessage(t, msg, msgSent)
 		case <-time.After(2 * time.Second):
-			t.Fatal("did not send local channel update to peer")
+			t.Fatalf("did not send %v message to peer",
+				msg.MsgType())
 		}
 	}
@@ -3108,7 +3024,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// We can go ahead and notify the peer, which should trigger the message
 	// to be sent.
 	peerChan <- remotePeer
-	assertReceivedChannelUpdate(batch.chanUpdAnn1)
+	assertMsgSent(batch.chanUpdAnn1)
 
 	// The gossiper should now request a notification for when the peer
 	// disconnects. We'll also trigger this now.
@@ -3132,12 +3048,9 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	}
 
 	// Now that the remote peer is offline, we'll send a new channel update.
-	prevTimestamp := batch.chanUpdAnn1.Timestamp
-	newChanUpdate, err := createUpdateAnnouncement(
-		0, 0, nodeKeyPriv1, prevTimestamp+1,
-	)
-	if err != nil {
-		t.Fatalf("unable to create new channel update: %v", err)
+	batch.chanUpdAnn1.Timestamp++
+	if err := signUpdate(nodeKeyPriv1, batch.chanUpdAnn1); err != nil {
+		t.Fatalf("unable to sign new channel update: %v", err)
 	}
 
 	// With the new update created, we'll go ahead and process it.
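The simplification here relies on one invariant: bumping Timestamp invalidates the old signature, so the update must be re-signed before it is reprocessed. A standalone sketch of that mutate-then-re-sign step, using crypto/ecdsa over P-256 as a stand-in for lnd's secp256k1 signer and a hypothetical digest of the signed fields:

package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// chanUpdate is a stand-in carrying only the field we mutate.
type chanUpdate struct {
	timestamp uint32
	sig       []byte
}

// digest covers every signed field; changing the timestamp changes it.
func (u *chanUpdate) digest() []byte {
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], u.timestamp)
	h := sha256.Sum256(buf[:])
	return h[:]
}

// signUpdate re-signs the update in place, mirroring the test's
// bump-timestamp-then-re-sign step (a stand-in, not lnd's helper).
func signUpdate(key *ecdsa.PrivateKey, u *chanUpdate) error {
	sig, err := ecdsa.SignASN1(rand.Reader, key, u.digest())
	if err != nil {
		return err
	}
	u.sig = sig
	return nil
}

func main() {
	key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	upd := &chanUpdate{timestamp: 1}
	_ = signUpdate(key, upd)

	// A newer update reuses the message but must carry a fresh signature.
	upd.timestamp++
	_ = signUpdate(key, upd)
	fmt.Println(ecdsa.VerifyASN1(&key.PublicKey, upd.digest(), upd.sig))
}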
@@ -3167,10 +3080,150 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	case <-time.After(time.Second):
 	}
 
-	// Finally, we'll notify the peer is online and ensure the new channel
-	// update is received.
+	// Once again, we'll notify the peer is online and ensure the new
+	// channel update is received. This will also cause an offline
+	// notification to be requested again.
 	peerChan <- remotePeer
-	assertReceivedChannelUpdate(newChanUpdate)
+	assertMsgSent(batch.chanUpdAnn1)
+
+	select {
+	case offlineChan = <-notifyOffline:
+	case <-time.After(2 * time.Second):
+		t.Fatal("gossiper did not request notification upon peer " +
+			"disconnection")
+	}
+
+	// We'll then exchange proofs with the remote peer in order to announce
+	// the channel.
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+		batch.localProofAnn, localKey,
+	):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local channel proof")
+	}
+	if err != nil {
+		t.Fatalf("unable to process local channel proof: %v", err)
+	}
+
+	// No messages should be broadcast as we don't have the full proof yet.
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	// Our proof should be sent to the remote peer however.
+	assertMsgSent(batch.localProofAnn)
+
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.remoteProofAnn, remotePeer,
+	):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote channel proof")
+	}
+	if err != nil {
+		t.Fatalf("unable to process remote channel proof: %v", err)
+	}
+
+	// Now that we've constructed our full proof, we can assert that the
+	// channel has been announced.
+	for i := 0; i < 2; i++ {
+		select {
+		case <-ctx.broadcastedMessage:
+		case <-time.After(2 * trickleDelay):
+			t.Fatal("expected channel to be announced")
+		}
+	}
+
+	// With the channel announced, we'll generate a new channel update. This
+	// one won't take the path of the reliable sender, as the channel has
+	// already been announced. We'll keep track of the old message that is
+	// now stale to use later on.
+	staleChannelUpdate := batch.chanUpdAnn1
+	newChannelUpdate := &lnwire.ChannelUpdate{}
+	*newChannelUpdate = *staleChannelUpdate
+	newChannelUpdate.Timestamp++
+	if err := signUpdate(nodeKeyPriv1, newChannelUpdate); err != nil {
+		t.Fatalf("unable to sign new channel update: %v", err)
+	}
+
+	// Process the new channel update. It should not be sent to the peer
+	// directly since the reliable sender only applies when the channel is
+	// not announced.
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+		newChannelUpdate, localKey,
+	):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local channel update")
+	}
+	if err != nil {
+		t.Fatalf("unable to process local channel update: %v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+	case <-time.After(2 * trickleDelay):
+		t.Fatal("channel update was not broadcast")
+	}
+	select {
+	case msg := <-sentToPeer:
+		t.Fatalf("received unexpected message: %v", spew.Sdump(msg))
+	case <-time.After(time.Second):
+	}
+
+	// Then, we'll trigger the reliable sender to send its pending messages
+	// by triggering an offline notification for the peer, followed by an
+	// online one.
+	close(offlineChan)
+
+	select {
+	case peerChan = <-notifyOnline:
+	case <-time.After(2 * time.Second):
+		t.Fatal("gossiper did not request notification upon peer " +
+			"connection")
+	}
+
+	peerChan <- remotePeer
+
+	// At this point, we should have sent both the AnnounceSignatures and
+	// stale ChannelUpdate.
+	for i := 0; i < 2; i++ {
+		var msg lnwire.Message
+		select {
+		case msg = <-sentToPeer:
+		case <-time.After(time.Second):
+			t.Fatal("expected to send message")
+		}
+
+		switch msg := msg.(type) {
+		case *lnwire.ChannelUpdate:
+			assertMessage(t, staleChannelUpdate, msg)
+		case *lnwire.AnnounceSignatures:
+			assertMessage(t, batch.localProofAnn, msg)
+		default:
+			t.Fatalf("send unexpected %v message", msg.MsgType())
+		}
+	}
+
+	// Since the messages above are now deemed as stale, they should be
+	// removed from the message store.
+	err = lntest.WaitNoError(func() error {
+		msgs, err := ctx.gossiper.cfg.MessageStore.Messages()
+		if err != nil {
+			return fmt.Errorf("unable to retrieve pending "+
+				"messages: %v", err)
+		}
+		if len(msgs) != 0 {
+			return fmt.Errorf("expected no messages left, found %d",
+				len(msgs))
+		}
+		return nil
+	}, time.Second)
+	if err != nil {
+		t.Fatal(err)
+	}
 }
 
 func assertMessage(t *testing.T, expected, got lnwire.Message) {
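The closing assertion polls until the message store drains rather than checking once, which avoids racing the background sender. A simplified stand-in for the lntest.WaitNoError helper used above (not its actual implementation):

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitNoError polls f until it returns nil or the timeout elapses,
// mirroring the helper the test uses to wait for the store to drain.
func waitNoError(f func() error, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	var err error
	for time.Now().Before(deadline) {
		if err = f(); err == nil {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return fmt.Errorf("condition not met within %v: %w", timeout, err)
}

func main() {
	pending := 3
	err := waitNoError(func() error {
		if pending > 0 {
			pending-- // simulate messages being removed
			return errors.New("messages still pending")
		}
		return nil
	}, time.Second)
	fmt.Println(err) // <nil>
}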

@@ -181,9 +181,19 @@ out:
 		// ignored for now since the peer is currently offline. Once
 		// they reconnect, the messages will be sent since they should
 		// have been persisted to disk.
-		case <-peerMgr.msgs:
+		case msg := <-peerMgr.msgs:
+			// Retrieve the short channel ID for which this message
+			// applies for logging purposes. The error can be
+			// ignored as the store can only contain messages which
+			// have a ShortChannelID field.
+			shortChanID, _ := msgShortChanID(msg)
+
+			log.Debugf("Received request to send %v message for "+
+				"channel=%v while peer=%x is offline",
+				msg.MsgType(), shortChanID, peerPubKey)
+
 		case peer = <-peerChan:
 			break out
 
 		case <-s.quit:
 			return
 		}
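Taken together with the next hunk, the sender's loop is: wait for an online notification (logging any send requests that arrive while the peer is offline), then re-check a closable offline channel immediately before each send, falling back to waiting if the peer dropped in the meantime. A condensed, self-contained sketch of that control flow; the names and shapes here are illustrative, not lnd's actual reliableSender API:

package main

import "fmt"

type message string

// peer is a minimal stand-in for lnpeer.Peer.
type peer interface {
	SendMessage(msg message) error
}

type fakePeer struct{}

func (fakePeer) SendMessage(msg message) error {
	fmt.Println("sent:", msg)
	return nil
}

// sendReliably condenses the control flow of reliable_sender.go.
func sendReliably(pending []message, msgs <-chan message,
	online <-chan peer, offline <-chan struct{}, quit <-chan struct{}) {

	var p peer
waitUntilOnline:
	for p == nil {
		select {
		case msg := <-msgs:
			// Peer is offline; the request is persisted and will
			// be replayed once the peer reconnects.
			fmt.Println("queued while offline:", msg)
		case p = <-online:
		case <-quit:
			return
		}
	}

	for _, msg := range pending {
		// Ensure the peer is still online right before sending the
		// message; a closed offline channel wins this select.
		select {
		case <-offline:
			p = nil
			goto waitUntilOnline
		default:
		}

		if err := p.SendMessage(msg); err != nil {
			fmt.Println("send failed:", err)
		}
	}
}

func main() {
	online := make(chan peer, 1)
	online <- fakePeer{}
	sendReliably(
		[]message{"chan_update", "ann_sigs"},
		nil, online, make(chan struct{}), nil,
	)
}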
@@ -215,6 +225,14 @@ out:
 		// can only contain messages which have a ShortChannelID field.
 		shortChanID, _ := msgShortChanID(msg)
 
+		// Ensure the peer is still online right before sending the
+		// message.
+		select {
+		case <-offlineChan:
+			goto waitUntilOnline
+		default:
+		}
+
 		if err := peer.SendMessage(false, msg); err != nil {
 			log.Errorf("Unable to send %v message for channel=%v "+
 				"to %x: %v", msg.MsgType(), shortChanID,