From f45502f890b7eb624ea946ed7fcc7b8a1fa500d7 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 26 Dec 2017 16:23:05 +0100 Subject: [PATCH] discovery: update message processing to emit networkMsg rather than lnwire.Message In this commit, we make an incremental step towards page of the new feature of deDupedAnnoucnements to return the set of senders for each message. All methods the process new channel announcements, will now return an instance of networkMsg rather than lnwire.Message. This will allow passing the returned announcement directly into deDupedAnnoucnements.AddMsg(). --- discovery/gossiper.go | 55 ++++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ea4a055f..93ddc64d 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1034,7 +1034,7 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error { // updated with the latest information reflecting the applied fee updates. // // TODO(roasbeef): generalize into generic for any channel update -func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]lnwire.Message, error) { +func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]networkMsg, error) { // First, we'll construct a set of all the channels that need to be // updated. chansToUpdate := make(map[wire.OutPoint]struct{}) @@ -1044,7 +1044,7 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest haveChanFilter := len(chansToUpdate) != 0 - var chanUpdates []lnwire.Message + var chanUpdates []networkMsg // Next, we'll loop over all the outgoing channels the router knows of. // If we have a filter then we'll only collected those channels, @@ -1071,7 +1071,13 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest return err } - chanUpdates = append(chanUpdates, chanUpdate) + // We set ourselves as the source of this message to indicate + // that we shouldn't skip any peers when sending this message. + chanUpdates = append(chanUpdates, networkMsg{ + peer: d.selfKey, + msg: chanUpdate, + }) + return nil }) if err != nil { @@ -1086,7 +1092,7 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest // didn't affect the internal state due to either being out of date, invalid, // or redundant, then nil is returned. Otherwise, the set of announcements will // be returned which should be broadcasted to the rest of the network. -func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message { +func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []networkMsg { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { // TODO(roasbeef) make height delta 6 // * or configurable @@ -1139,7 +1145,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // Node announcement was successfully proceeded and know it // might be broadcast to other connected nodes. - announcements = append(announcements, msg) + announcements = append(announcements, networkMsg{ + msg: msg, + peer: nMsg.peer, + }) nMsg.err <- nil // TODO(roasbeef): get rid of the above @@ -1326,13 +1335,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // Now delete the premature ChannelUpdates, since we added them // all to the queue of network messages. - delete(d.prematureChannelUpdates, shortChanID) d.pChanUpdMtx.Unlock() - // Launch a new goroutine to handle each ChannelUpdate, - // this to ensure we don't block here, as we can handle - // only one announcement at a time. + // Launch a new goroutine to handle each ChannelUpdate, this to + // ensure we don't block here, as we can handle only one + // announcement at a time. for _, cu := range channelUpdates { go func(nMsg *networkMsg) { switch msg := nMsg.msg.(type) { @@ -1358,8 +1366,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return } - // We don't expect any other message type - // than ChannelUpdate to be in this map. + // We don't expect any other message type than + // ChannelUpdate to be in this map. default: log.Errorf("Unsupported message type "+ "found among ChannelUpdates: %T", msg) @@ -1371,7 +1379,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // might be broadcast to other connected nodes if it was // announcement with proof (remote). if proof != nil { - announcements = append(announcements, msg) + announcements = append(announcements, networkMsg{ + msg: msg, + peer: nMsg.peer, + }) } nMsg.err <- nil @@ -1532,7 +1543,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // we'll only broadcast the channel update announcement if it // has an attached authentication proof. if chanInfo.AuthProof != nil { - announcements = append(announcements, msg) + announcements = append(announcements, networkMsg{ + msg: msg, + peer: nMsg.peer, + }) } nMsg.err <- nil @@ -1774,12 +1788,21 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // Assemble the necessary announcements to add to the next // broadcasting batch. - announcements = append(announcements, chanAnn) + announcements = append(announcements, networkMsg{ + msg: chanAnn, + peer: nMsg.peer, + }) if e1Ann != nil { - announcements = append(announcements, e1Ann) + announcements = append(announcements, networkMsg{ + msg: e1Ann, + peer: nMsg.peer, + }) } if e2Ann != nil { - announcements = append(announcements, e2Ann) + announcements = append(announcements, networkMsg{ + msg: e2Ann, + peer: nMsg.peer, + }) } nMsg.err <- nil