Merge pull request #2149 from wpaulino/chan-full-proof-node-ann

discovery/gossiper: send node anns when constructing full chan proof
This commit is contained in:
Olaoluwa Osuntokun 2018-11-13 21:13:57 -08:00 committed by GitHub
commit fd5b24fb4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 356 additions and 77 deletions

@ -1642,6 +1642,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
source: nMsg.source, source: nMsg.source,
msg: msg, msg: msg,
}) })
} else {
log.Tracef("Skipping broadcasting node announcement "+
"for %x due to being unadvertised", msg.NodeID)
} }
nMsg.err <- nil nMsg.err <- nil
@ -2363,6 +2366,31 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}) })
} }
// We'll also send along the node announcements for each channel
// participant if we know of them.
node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
if err != nil {
log.Debugf("Unable to fetch node announcement for "+
"%x: %v", chanInfo.NodeKey1Bytes, err)
} else {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: node1Ann,
})
}
node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
if err != nil {
log.Debugf("Unable to fetch node announcement for "+
"%x: %v", chanInfo.NodeKey2Bytes, err)
} else {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: node2Ann,
})
}
nMsg.err <- nil nMsg.err <- nil
return announcements return announcements
@ -2372,6 +2400,19 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
} }
} }
// fetchNodeAnn fetches the latest signed node announcement from our point of
// view for the node with the given public key.
func (d *AuthenticatedGossiper) fetchNodeAnn(
pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
node, err := d.cfg.Router.FetchLightningNode(pubKey)
if err != nil {
return nil, err
}
return node.NodeAnnouncement(true)
}
// sendAnnSigReliably will try to send the provided local AnnounceSignatures // sendAnnSigReliably will try to send the provided local AnnounceSignatures
// to the remote peer, waiting for it to come online if necessary. This // to the remote peer, waiting for it to come online if necessary. This
// method returns after adding the message to persistent storage, such // method returns after adding the message to persistent storage, such

@ -196,6 +196,18 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) (
return chanInfo, edges[0], edges[1], nil return chanInfo, edges[0], edges[1], nil
} }
func (r *mockGraphSource) FetchLightningNode(
nodePub routing.Vertex) (*channeldb.LightningNode, error) {
for _, node := range r.nodes {
if bytes.Equal(nodePub[:], node.PubKeyBytes[:]) {
return node, nil
}
}
return nil, channeldb.ErrGraphNodeNotFound
}
// IsStaleNode returns true if the graph source has a node announcement for the // IsStaleNode returns true if the graph source has a node announcement for the
// target node with a more recent timestamp. // target node with a more recent timestamp.
func (r *mockGraphSource) IsStaleNode(nodePub routing.Vertex, timestamp time.Time) bool { func (r *mockGraphSource) IsStaleNode(nodePub routing.Vertex, timestamp time.Time) bool {
@ -806,7 +818,9 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) {
return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil
} }
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
msg ...lnwire.Message) error {
select { select {
case sentMsgs <- msg[0]: case sentMsgs <- msg[0]:
case <-ctx.gossiper.quit: case <-ctx.gossiper.quit:
@ -833,13 +847,14 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
// Recreate lightning network topology. Initialize router with channel // Recreate lightning network topology. Initialize router with channel
// between two nodes. // between two nodes.
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel ann: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -848,21 +863,37 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel update: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast") t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer, // The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel // such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel). // is announced or not (private channel).
@ -876,13 +907,14 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
remotePeer): batch.chanUpdAnn2, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement") t.Fatal("did not process remote announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel update: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -890,16 +922,33 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
localKey): batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement") t.Fatal("did not process remote announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process local proof: %v", err)
} }
select { select {
@ -923,16 +972,17 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
remotePeer): batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement") t.Fatal("did not process remote announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process remote proof: %v", err)
} }
for i := 0; i < 3; i++ { for i := 0; i < 5; i++ {
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
case <-time.After(time.Second): case <-time.After(time.Second):
@ -1059,6 +1109,22 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer, // The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel // such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel). // is announced or not (private channel).
@ -1078,7 +1144,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
t.Fatal("did not process remote announcement") t.Fatal("did not process remote announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process: %v", err) t.Fatalf("unable to process node ann: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1086,6 +1152,22 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// After that we process local announcement, and waiting to receive // After that we process local announcement, and waiting to receive
// the channel announcement. // the channel announcement.
select { select {
@ -1110,7 +1192,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
// And since both remote and local announcements are processed, we // And since both remote and local announcements are processed, we
// should be broadcasting the final channel announcements. // should be broadcasting the final channel announcements.
for i := 0; i < 3; i++ { for i := 0; i < 5; i++ {
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
case <-time.After(time.Second): case <-time.After(time.Second):
@ -1162,13 +1244,14 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
// Recreate lightning network topology. Initialize router with channel // Recreate lightning network topology. Initialize router with channel
// between two nodes. // between two nodes.
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel ann: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1177,13 +1260,14 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel update: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1192,13 +1276,30 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
remotePeer): batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement") t.Fatal("did not process remote announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel update: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1206,6 +1307,22 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Make the SendToPeer fail, simulating the peer being offline. // Make the SendToPeer fail, simulating the peer being offline.
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
msg ...lnwire.Message) error { msg ...lnwire.Message) error {
@ -1224,13 +1341,14 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
// Pretending that we receive local channel announcement from funding // Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process. // manager, thereby kick off the announcement exchange process.
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process local proof: %v", err)
} }
// Since sending this local announcement proof to the remote will fail, // Since sending this local announcement proof to the remote will fail,
@ -1289,16 +1407,17 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
// Now give the gossiper the remote proof. This should trigger a // Now give the gossiper the remote proof. This should trigger a
// broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate). // broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate).
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
remotePeer): batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process remote proof: %v", err)
} }
for i := 0; i < 3; i++ { for i := 0; i < 5; i++ {
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
case <-time.After(time.Second): case <-time.After(time.Second):
@ -1351,13 +1470,14 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
// Recreate lightning network topology. Initialize router with channel // Recreate lightning network topology. Initialize router with channel
// between two nodes. // between two nodes.
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel ann: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1366,13 +1486,14 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel update: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1381,13 +1502,30 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
remotePeer): batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement") t.Fatal("did not process remote announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel update: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1395,6 +1533,22 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Make the SendToPeerFail, simulating the peer being offline. // Make the SendToPeerFail, simulating the peer being offline.
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
msg ...lnwire.Message) error { msg ...lnwire.Message) error {
@ -1514,8 +1668,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
// Now exchanging the remote channel proof, the channel announcement // Now exchanging the remote channel proof, the channel announcement
// broadcast should continue as normal. // broadcast should continue as normal.
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
remotePeer): batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement") t.Fatal("did not process remote announcement")
} }
@ -1523,7 +1678,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process :%v", err)
} }
for i := 0; i < 3; i++ { for i := 0; i < 5; i++ {
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
case <-time.After(time.Second): case <-time.After(time.Second):
@ -1577,13 +1732,14 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
// Recreate lightning network topology. Initialize router with channel // Recreate lightning network topology. Initialize router with channel
// between two nodes. // between two nodes.
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel ann: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1592,13 +1748,14 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel update: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
@ -1607,19 +1764,53 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
remotePeer): batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann:%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement") t.Fatal("did not process remote announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process channel update: %v", err)
} }
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast") t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Set up a channel we can use to inspect messages sent by the // Set up a channel we can use to inspect messages sent by the
// gossiper to the remote peer. // gossiper to the remote peer.
sentToPeer := make(chan lnwire.Message, 1) sentToPeer := make(chan lnwire.Message, 1)
@ -1644,23 +1835,25 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
// Pretending that we receive local channel announcement from funding // Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process. // manager, thereby kick off the announcement exchange process.
select { select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, case err = <-ctx.gossiper.ProcessLocalAnnouncement(
localKey): batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process local proof: %v", err)
} }
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
remotePeer): batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process remote proof: %v", err)
} }
// We expect the gossiper to send this message to the remote peer. // We expect the gossiper to send this message to the remote peer.
@ -1673,8 +1866,8 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
t.Fatal("did not send local proof to peer") t.Fatal("did not send local proof to peer")
} }
// And all channel announcements should be broadcast. // All channel and node announcements should be broadcast.
for i := 0; i < 3; i++ { for i := 0; i < 5; i++ {
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
case <-time.After(time.Second): case <-time.After(time.Second):
@ -1699,13 +1892,14 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
// Now give the gossiper the remote proof yet again. This should // Now give the gossiper the remote proof yet again. This should
// trigger a send of the full ChannelAnnouncement. // trigger a send of the full ChannelAnnouncement.
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
remotePeer): batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement") t.Fatal("did not process local announcement")
} }
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process remote proof: %v", err)
} }
// We expect the gossiper to send this message to the remote peer. // We expect the gossiper to send this message to the remote peer.
@ -1718,7 +1912,6 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("did not send local proof to peer") t.Fatal("did not send local proof to peer")
} }
} }
// TestDeDuplicatedAnnouncements ensures that the deDupedAnnouncements struct // TestDeDuplicatedAnnouncements ensures that the deDupedAnnouncements struct
@ -2140,14 +2333,25 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
// Recreate the case where the remote node is sending us its ChannelUpdate // Recreate the case where the remote node is sending us its ChannelUpdate
// before we have been able to process our own ChannelAnnouncement and // before we have been able to process our own ChannelAnnouncement and
// ChannelUpdate. // ChannelUpdate.
errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast") t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer)
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Since the remote ChannelUpdate was added for an edge that // Since the remote ChannelUpdate was added for an edge that
// we did not already know about, it should have been added // we did not already know about, it should have been added
// to the map of premature ChannelUpdates. Check that nothing // to the map of premature ChannelUpdates. Check that nothing
@ -2188,6 +2392,16 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer, // The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel // such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel). // is announced or not (private channel).
@ -2212,7 +2426,9 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
} }
// Check that the ChannelEdgePolicy was added to the graph. // Check that the ChannelEdgePolicy was added to the graph.
chanInfo, e1, e2, err = ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID) chanInfo, e1, e2, err = ctx.router.GetChannelByID(
batch.chanUpdAnn1.ShortChannelID,
)
if err != nil { if err != nil {
t.Fatalf("unable to get channel from router: %v", err) t.Fatalf("unable to get channel from router: %v", err)
} }
@ -2228,7 +2444,9 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
// Pretending that we receive local channel announcement from funding // Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process. // manager, thereby kick off the announcement exchange process.
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey) err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
)
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process :%v", err)
} }
@ -2253,12 +2471,14 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
t.Fatal("wrong number of objects in storage") t.Fatal("wrong number of objects in storage")
} }
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remotePeer) err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
)
if err != nil { if err != nil {
t.Fatalf("unable to process :%v", err) t.Fatalf("unable to process :%v", err)
} }
for i := 0; i < 3; i++ { for i := 0; i < 5; i++ {
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
case <-time.After(time.Second): case <-time.After(time.Second):

@ -100,6 +100,11 @@ type ChannelGraphSource interface {
GetChannelByID(chanID lnwire.ShortChannelID) (*channeldb.ChannelEdgeInfo, GetChannelByID(chanID lnwire.ShortChannelID) (*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy, error) *channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy, error)
// FetchLightningNode attempts to look up a target node by its identity
// public key. channeldb.ErrGraphNodeNotFound is returned if the node
// doesn't exist within the graph.
FetchLightningNode(Vertex) (*channeldb.LightningNode, error)
// ForEachNode is used to iterate over every node in the known graph. // ForEachNode is used to iterate over every node in the known graph.
ForEachNode(func(node *channeldb.LightningNode) error) error ForEachNode(func(node *channeldb.LightningNode) error) error
@ -2163,6 +2168,19 @@ func (r *ChannelRouter) GetChannelByID(chanID lnwire.ShortChannelID) (
return r.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64()) return r.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
} }
// FetchLightningNode attempts to look up a target node by its identity public
// key. channeldb.ErrGraphNodeNotFound is returned if the node doesn't exist
// within the graph.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) FetchLightningNode(node Vertex) (*channeldb.LightningNode, error) {
pubKey, err := btcec.ParsePubKey(node[:], btcec.S256())
if err != nil {
return nil, fmt.Errorf("unable to parse raw public key: %v", err)
}
return r.cfg.Graph.FetchLightningNode(pubKey)
}
// ForEachNode is used to iterate over every node in router topology. // ForEachNode is used to iterate over every node in router topology.
// //
// NOTE: This method is part of the ChannelGraphSource interface. // NOTE: This method is part of the ChannelGraphSource interface.