discovery: modify deDupedAnnouncements to keep track of senders for each msg

In this commit, we modify the deDupedAnnouncements struct slightly. The
element of this struct will now keep track of the set of senders that
sent a particular message. Each time a message is added, we’ll replace
the new message with the old (as normal), but we’ll also add the new
sender to the set of known senders.

With this new feature, we’ll be able to avoid re-sending a message to
the peer that sent it to us in the first place.
This commit is contained in:
Olaoluwa Osuntokun 2017-12-26 16:18:56 +01:00
parent 7e54b4ae46
commit f1b40e0b4d
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -433,6 +433,18 @@ type channelUpdateID struct {
flags lnwire.ChanUpdateFlag flags lnwire.ChanUpdateFlag
} }
// msgWithSenders is a wrapper struct around a message, and the set of peers
// that oreignally sent ius this message. Using this struct, we can ensure that
// we don't re-send a message to the peer that sent it to us in the first
// place.
type msgWithSenders struct {
// msg is the wire message itself.
msg lnwire.Message
// sender is the set of peers that sent us this message.
senders map[routing.Vertex]struct{}
}
// deDupedAnnouncements de-duplicates announcements that have been added to the // deDupedAnnouncements de-duplicates announcements that have been added to the
// batch. Internally, announcements are stored in three maps // batch. Internally, announcements are stored in three maps
// (one each for channel announcements, channel updates, and node // (one each for channel announcements, channel updates, and node
@ -440,13 +452,13 @@ type channelUpdateID struct {
// announcements are duplicated. // announcements are duplicated.
type deDupedAnnouncements struct { type deDupedAnnouncements struct {
// channelAnnouncements are identified by the short channel id field. // channelAnnouncements are identified by the short channel id field.
channelAnnouncements map[lnwire.ShortChannelID]lnwire.Message channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
// channelUpdates are identified by the channel update id field. // channelUpdates are identified by the channel update id field.
channelUpdates map[channelUpdateID]lnwire.Message channelUpdates map[channelUpdateID]msgWithSenders
// nodeAnnouncements are identified by the Vertex field. // nodeAnnouncements are identified by the Vertex field.
nodeAnnouncements map[routing.Vertex]lnwire.Message nodeAnnouncements map[routing.Vertex]msgWithSenders
sync.Mutex sync.Mutex
} }
@ -466,45 +478,99 @@ func (d *deDupedAnnouncements) reset() {
// Storage of each type of announcement (channel anouncements, channel // Storage of each type of announcement (channel anouncements, channel
// updates, node announcements) is set to an empty map where the // updates, node announcements) is set to an empty map where the
// appropriate key points to the corresponding lnwire.Message. // appropriate key points to the corresponding lnwire.Message.
d.channelAnnouncements = make(map[lnwire.ShortChannelID]lnwire.Message) d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
d.channelUpdates = make(map[channelUpdateID]lnwire.Message) d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
d.nodeAnnouncements = make(map[routing.Vertex]lnwire.Message) d.nodeAnnouncements = make(map[routing.Vertex]msgWithSenders)
} }
// addMsg adds a new message to the current batch. // addMsg adds a new message to the current batch. If the message is already
func (d *deDupedAnnouncements) addMsg(message lnwire.Message) { // persent in the current batch, then this new instance replaces the latter,
// and the set of senders is updated to reflect which node sent us this
// message.
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
// Depending on the message type (channel announcement, channel update, // Depending on the message type (channel announcement, channel update,
// or node announcement), the message is added to the corresponding map // or node announcement), the message is added to the corresponding map
// in deDupedAnnouncements. Because each identifying key can have at // in deDupedAnnouncements. Because each identifying key can have at
// most one value, the announcements are de-duplicated, with newer ones // most one value, the announcements are de-duplicated, with newer ones
// replacing older ones. // replacing older ones.
switch msg := message.(type) { switch msg := message.msg.(type) {
// Channel announcements are identified by the short channel id field. // Channel announcements are identified by the short channel id field.
case *lnwire.ChannelAnnouncement: case *lnwire.ChannelAnnouncement:
d.channelAnnouncements[msg.ShortChannelID] = msg deDupKey := msg.ShortChannelID
sender := routing.NewVertex(message.peer)
mws, ok := d.channelAnnouncements[deDupKey]
if !ok {
mws = msgWithSenders{
msg: msg,
senders: make(map[routing.Vertex]struct{}),
}
mws.senders[sender] = struct{}{}
d.channelAnnouncements[deDupKey] = mws
return
}
mws.msg = msg
mws.senders[sender] = struct{}{}
d.channelAnnouncements[deDupKey] = mws
// Channel updates are identified by the (short channel id, flags) // Channel updates are identified by the (short channel id, flags)
// tuple. // tuple.
case *lnwire.ChannelUpdate: case *lnwire.ChannelUpdate:
channelUpdateID := channelUpdateID{ sender := routing.NewVertex(message.peer)
deDupKey := channelUpdateID{
msg.ShortChannelID, msg.ShortChannelID,
msg.Flags, msg.Flags,
} }
d.channelUpdates[channelUpdateID] = msg mws, ok := d.channelUpdates[deDupKey]
if !ok {
mws = msgWithSenders{
msg: msg,
senders: make(map[routing.Vertex]struct{}),
}
mws.senders[sender] = struct{}{}
d.channelUpdates[deDupKey] = mws
return
}
mws.msg = msg
mws.senders[sender] = struct{}{}
d.channelUpdates[deDupKey] = mws
// Node announcements are identified by the Vertex field. Use the // Node announcements are identified by the Vertex field. Use the
// NodeID to create the corresponding Vertex. // NodeID to create the corresponding Vertex.
case *lnwire.NodeAnnouncement: case *lnwire.NodeAnnouncement:
vertex := routing.NewVertex(msg.NodeID) sender := routing.NewVertex(message.peer)
d.nodeAnnouncements[vertex] = msg deDupKey := routing.NewVertex(msg.NodeID)
mws, ok := d.nodeAnnouncements[deDupKey]
if !ok {
mws = msgWithSenders{
msg: msg,
senders: make(map[routing.Vertex]struct{}),
}
mws.senders[sender] = struct{}{}
d.nodeAnnouncements[deDupKey] = mws
return
}
mws.msg = msg
mws.senders[sender] = struct{}{}
d.nodeAnnouncements[deDupKey] = mws
} }
} }
// AddMsgs is a helper method to add multiple messages to the announcement // AddMsgs is a helper method to add multiple messages to the announcement
// batch. // batch.
func (d *deDupedAnnouncements) AddMsgs(msgs ...lnwire.Message) { func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
@ -515,9 +581,11 @@ func (d *deDupedAnnouncements) AddMsgs(msgs ...lnwire.Message) {
// Emit returns the set of de-duplicated announcements to be sent out during // Emit returns the set of de-duplicated announcements to be sent out during
// the next announcement epoch, in the order of channel announcements, channel // the next announcement epoch, in the order of channel announcements, channel
// updates, and node announcements. Additionally, the set of stored messages // updates, and node announcements. Each message emitted, contains the set of
// are reset. // peers that sent us the message. This way, we can ensure that we don't waste
func (d *deDupedAnnouncements) Emit() []lnwire.Message { // bandwidth by re-sending a message to the peer that sent it to us in the
// first place. Additionally, the set of stored messages are reset.
func (d *deDupedAnnouncements) Emit() []msgWithSenders {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
@ -527,27 +595,27 @@ func (d *deDupedAnnouncements) Emit() []lnwire.Message {
// Create an empty array of lnwire.Messages with a length equal to // Create an empty array of lnwire.Messages with a length equal to
// the total number of announcements. // the total number of announcements.
announcements := make([]lnwire.Message, 0, numAnnouncements) msgs := make([]msgWithSenders, 0, numAnnouncements)
// Add the channel announcements to the array first. // Add the channel announcements to the array first.
for _, message := range d.channelAnnouncements { for _, message := range d.channelAnnouncements {
announcements = append(announcements, message) msgs = append(msgs, message)
} }
// Then add the channel updates. // Then add the channel updates.
for _, message := range d.channelUpdates { for _, message := range d.channelUpdates {
announcements = append(announcements, message) msgs = append(msgs, message)
} }
// Finally add the node announcements. // Finally add the node announcements.
for _, message := range d.nodeAnnouncements { for _, message := range d.nodeAnnouncements {
announcements = append(announcements, message) msgs = append(msgs, message)
} }
d.reset() d.reset()
// Return the array of lnwire.messages. // Return the array of lnwire.messages.
return announcements return msgs
} }
// resendAnnounceSignatures will inspect the messageStore database // resendAnnounceSignatures will inspect the messageStore database