discovery: update message processing to emit networkMsg rather than lnwire.Message

In this commit, we make an incremental step towards page of the new
feature of deDupedAnnoucnements to return the set of senders for each
message. All methods the process new channel announcements, will now
return an instance of networkMsg rather than lnwire.Message. This will
allow passing the returned announcement directly into
deDupedAnnoucnements.AddMsg().
This commit is contained in:
Olaoluwa Osuntokun 2017-12-26 16:23:05 +01:00
parent f1b40e0b4d
commit f45502f890
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -1034,7 +1034,7 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
// updated with the latest information reflecting the applied fee updates. // updated with the latest information reflecting the applied fee updates.
// //
// TODO(roasbeef): generalize into generic for any channel update // TODO(roasbeef): generalize into generic for any channel update
func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]lnwire.Message, error) { func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]networkMsg, error) {
// First, we'll construct a set of all the channels that need to be // First, we'll construct a set of all the channels that need to be
// updated. // updated.
chansToUpdate := make(map[wire.OutPoint]struct{}) chansToUpdate := make(map[wire.OutPoint]struct{})
@ -1044,7 +1044,7 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
haveChanFilter := len(chansToUpdate) != 0 haveChanFilter := len(chansToUpdate) != 0
var chanUpdates []lnwire.Message var chanUpdates []networkMsg
// Next, we'll loop over all the outgoing channels the router knows of. // Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels, // If we have a filter then we'll only collected those channels,
@ -1071,7 +1071,13 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
return err return err
} }
chanUpdates = append(chanUpdates, chanUpdate) // We set ourselves as the source of this message to indicate
// that we shouldn't skip any peers when sending this message.
chanUpdates = append(chanUpdates, networkMsg{
peer: d.selfKey,
msg: chanUpdate,
})
return nil return nil
}) })
if err != nil { if err != nil {
@ -1086,7 +1092,7 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
// didn't affect the internal state due to either being out of date, invalid, // didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements will // or redundant, then nil is returned. Otherwise, the set of announcements will
// be returned which should be broadcasted to the rest of the network. // be returned which should be broadcasted to the rest of the network.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message { func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []networkMsg {
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6 // TODO(roasbeef) make height delta 6
// * or configurable // * or configurable
@ -1139,7 +1145,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// Node announcement was successfully proceeded and know it // Node announcement was successfully proceeded and know it
// might be broadcast to other connected nodes. // might be broadcast to other connected nodes.
announcements = append(announcements, msg) announcements = append(announcements, networkMsg{
msg: msg,
peer: nMsg.peer,
})
nMsg.err <- nil nMsg.err <- nil
// TODO(roasbeef): get rid of the above // TODO(roasbeef): get rid of the above
@ -1326,13 +1335,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// Now delete the premature ChannelUpdates, since we added them // Now delete the premature ChannelUpdates, since we added them
// all to the queue of network messages. // all to the queue of network messages.
delete(d.prematureChannelUpdates, shortChanID) delete(d.prematureChannelUpdates, shortChanID)
d.pChanUpdMtx.Unlock() d.pChanUpdMtx.Unlock()
// Launch a new goroutine to handle each ChannelUpdate, // Launch a new goroutine to handle each ChannelUpdate, this to
// this to ensure we don't block here, as we can handle // ensure we don't block here, as we can handle only one
// only one announcement at a time. // announcement at a time.
for _, cu := range channelUpdates { for _, cu := range channelUpdates {
go func(nMsg *networkMsg) { go func(nMsg *networkMsg) {
switch msg := nMsg.msg.(type) { switch msg := nMsg.msg.(type) {
@ -1358,8 +1366,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
return return
} }
// We don't expect any other message type // We don't expect any other message type than
// than ChannelUpdate to be in this map. // ChannelUpdate to be in this map.
default: default:
log.Errorf("Unsupported message type "+ log.Errorf("Unsupported message type "+
"found among ChannelUpdates: %T", msg) "found among ChannelUpdates: %T", msg)
@ -1371,7 +1379,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// might be broadcast to other connected nodes if it was // might be broadcast to other connected nodes if it was
// announcement with proof (remote). // announcement with proof (remote).
if proof != nil { if proof != nil {
announcements = append(announcements, msg) announcements = append(announcements, networkMsg{
msg: msg,
peer: nMsg.peer,
})
} }
nMsg.err <- nil nMsg.err <- nil
@ -1532,7 +1543,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// we'll only broadcast the channel update announcement if it // we'll only broadcast the channel update announcement if it
// has an attached authentication proof. // has an attached authentication proof.
if chanInfo.AuthProof != nil { if chanInfo.AuthProof != nil {
announcements = append(announcements, msg) announcements = append(announcements, networkMsg{
msg: msg,
peer: nMsg.peer,
})
} }
nMsg.err <- nil nMsg.err <- nil
@ -1774,12 +1788,21 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// Assemble the necessary announcements to add to the next // Assemble the necessary announcements to add to the next
// broadcasting batch. // broadcasting batch.
announcements = append(announcements, chanAnn) announcements = append(announcements, networkMsg{
msg: chanAnn,
peer: nMsg.peer,
})
if e1Ann != nil { if e1Ann != nil {
announcements = append(announcements, e1Ann) announcements = append(announcements, networkMsg{
msg: e1Ann,
peer: nMsg.peer,
})
} }
if e2Ann != nil { if e2Ann != nil {
announcements = append(announcements, e2Ann) announcements = append(announcements, networkMsg{
msg: e2Ann,
peer: nMsg.peer,
})
} }
nMsg.err <- nil nMsg.err <- nil