discovery: create deDupedAnnouncements struct in gosspier.go

For Part 1 of Issue #275. Create isolated private struct in
networkHandler goroutine that will de-duplicate
announcements added to the batch. The struct contains maps
for each of channel announcements, channel updates, and
node announcements to keep track of unique announcements.

The struct has a Reset method to reset stored announcements, an
AddMsg(lnwire.Message) method to add a new message to the current
batch, and a Batch method to return the set of de-duplicated
announcements.

Also fix a few minor typos.
This commit is contained in:
Laura Cressman 2017-09-07 21:25:43 -04:00 committed by Olaoluwa Osuntokun
parent 1f95b660b9
commit 39d38da732
7 changed files with 173 additions and 70 deletions

@ -105,7 +105,7 @@ type Config struct {
}
// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements validate them and apply the changes to router, syncing
// announcements, validating them and applying the changes to router, syncing
// lightning network with newly connected nodes, broadcasting announcements
// after validation, negotiating the channel announcement proofs exchange and
// handling the premature announcements. All outgoing announcements are
@ -383,6 +383,112 @@ func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
return nMsg.err
}
// ChannelUpdateID is a unique identifier for ChannelUpdate messages, as
// channel updates can be identified by the (ShortChannelID, Flags)
// tuple.
type ChannelUpdateID struct {
// channelID represents the set of data which is needed to
// retrieve all necessary data to validate the channel existence.
channelID lnwire.ShortChannelID
// Flags least-significant bit must be set to 0 if the creating node
// corresponds to the first node in the previously sent channel
// announcement and 1 otherwise.
Flags uint16
}
// deDupedAnnouncements de-duplicates announcements that have been
// added to the batch. Internally, announcements are stored in three maps
// (one each for channel announcements, channel updates, and node
// announcements). These maps keep track of unique announcements and
// ensure no announcements are duplicated.
type deDupedAnnouncements struct {
// channelAnnouncements are identified by the short channel id field.
channelAnnouncements map[lnwire.ShortChannelID]lnwire.Message
// channelUpdates are identified by the channel update id field.
channelUpdates map[ChannelUpdateID]lnwire.Message
// nodeAnnouncements are identified by node id field.
nodeAnnouncements map[*btcec.PublicKey]lnwire.Message
}
// Reset operates on deDupedAnnouncements to reset storage of announcements
func (d *deDupedAnnouncements) Reset() {
// Storage of each type of announcement (channel anouncements, channel
// updates, node announcements) is set to an empty map where the
// approprate key points to the corresponding lnwire.Message.
d.channelAnnouncements = make(map[lnwire.ShortChannelID]lnwire.Message)
d.channelUpdates = make(map[ChannelUpdateID]lnwire.Message)
d.nodeAnnouncements = make(map[*btcec.PublicKey]lnwire.Message)
}
// AddMsg adds a new message to the current batch.
func (d *deDupedAnnouncements) AddMsg(message lnwire.Message) {
// Depending on the message type (channel announcement, channel
// update, or node announcement), the message is added to the
// corresponding map in deDupedAnnouncements. Because each
// identifying key can have at most one value, the announcements
// are de-duplicated, with newer ones replacing older ones.
switch msg := message.(type) {
case *lnwire.ChannelAnnouncement:
// Channel announcements are identified by the short channel
// id field.
d.channelAnnouncements[msg.ShortChannelID] = msg
case *lnwire.ChannelUpdate:
// Channel updates are identified by the (short channel id,
// flags) tuple.
channelUpdateID := ChannelUpdateID{
msg.ShortChannelID,
msg.Flags,
}
d.channelUpdates[channelUpdateID] = msg
case *lnwire.NodeAnnouncement:
// Node announcements are identified by the node id field.
d.nodeAnnouncements[msg.NodeID] = msg
}
}
// AddMsgs is a helper method to add multiple messages to the
// announcement batch.
func (d *deDupedAnnouncements) AddMsgs(msgs []lnwire.Message) {
for _, msg := range msgs {
d.AddMsg(msg)
}
}
// Batch returns the set of de-duplicated announcements to be sent out
// during the next announcement epoch, in the order of channel announcements,
// channel updates, and node announcements.
func (d *deDupedAnnouncements) Batch() []lnwire.Message {
// Get the total number of announcements.
numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
len(d.nodeAnnouncements)
// Create an empty array of lnwire.Messages with a length equal to
// the total number of announcements.
announcements := make([]lnwire.Message, 0, numAnnouncements)
// Add the channel announcements to the array first.
for _, message := range d.channelAnnouncements {
announcements = append(announcements, message)
}
// Then add the channel updates.
for _, message := range d.channelUpdates {
announcements = append(announcements, message)
}
// Finally add the node announcements.
for _, message := range d.nodeAnnouncements {
announcements = append(announcements, message)
}
// Return the array of lnwire.messages.
return announcements
}
// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine includes answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
@ -393,12 +499,13 @@ func (d *AuthenticatedGossiper) networkHandler() {
defer d.wg.Done()
// TODO(roasbeef): changes for spec compliance
// * make into de-duplicated struct
// * always send chan ann -> node ann -> chan update
// * buffer recv'd node ann until after chan ann that includes is
// created
// * can use mostly empty struct in db as place holder
var announcementBatch []lnwire.Message
// Initialize empty deDupedAnnouncements to store announcement batch.
announcements := deDupedAnnouncements{}
announcements.Reset()
retransmitTimer := time.NewTicker(d.cfg.RetransmitDelay)
defer retransmitTimer.Stop()
@ -432,8 +539,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// Finally, with the updates committed, we'll now add
// them to the announcement batch to be flushed at the
// start of the next epoch.
announcementBatch = append(announcementBatch,
newChanUpdates...)
announcements.AddMsgs(newChanUpdates)
feeUpdate.errResp <- nil
@ -451,10 +557,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// broadcast once the trickle timer ticks gain.
if emittedAnnouncements != nil {
// TODO(roasbeef): exclude peer that sent
announcementBatch = append(
announcementBatch,
emittedAnnouncements...,
)
announcements.AddMsgs(emittedAnnouncements)
}
// A new block has arrived, so we can re-process the previously
@ -484,10 +587,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
for _, ann := range prematureAnns {
emittedAnnouncements := d.processNetworkAnnouncement(ann)
if emittedAnnouncements != nil {
announcementBatch = append(
announcementBatch,
emittedAnnouncements...,
)
announcements.AddMsgs(emittedAnnouncements)
}
}
delete(d.prematureAnnouncements, blockHeight)
@ -496,6 +596,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
// flush to the network the pending batch of new announcements
// we've received since the last trickle tick.
case <-trickleTimer.C:
// get the batch of announcements from deDupedAnnouncements
announcementBatch := announcements.Batch()
// If the current announcements batch is nil, then we
// have no further work here.
if len(announcementBatch) == 0 {
@ -517,7 +620,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// If we're able to broadcast the current batch
// successfully, then we reset the batch for a new
// round of announcements.
announcementBatch = nil
announcements.Reset()
// The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our

@ -1,6 +1,6 @@
package lnwire
// ShortChannelID represent the set of data which is needed to retrieve all
// ShortChannelID represents the set of data which is needed to retrieve all
// necessary data to validate the channel existence.
type ShortChannelID struct {
// BlockHeight is the height of the block where funding transaction

@ -49,7 +49,7 @@ type missionControl struct {
// the time that it was added to the prune view. Vertexes are added to
// this map if a caller reports to missionControl a failure localized
// to that particular vertex.
failedVertexes map[vertex]time.Time
failedVertexes map[Vertex]time.Time
graph *channeldb.ChannelGraph
@ -71,7 +71,7 @@ func newMissionControl(g *channeldb.ChannelGraph,
return &missionControl{
failedEdges: make(map[uint64]time.Time),
failedVertexes: make(map[vertex]time.Time),
failedVertexes: make(map[Vertex]time.Time),
selfNode: selfNode,
graph: g,
}
@ -81,7 +81,7 @@ func newMissionControl(g *channeldb.ChannelGraph,
// reports a routing failure localized to the vertex. The time the vertex was
// added is noted, as it'll be pruned from the view after a period of
// vertexDecay.
func (m *missionControl) ReportVertexFailure(v vertex) {
func (m *missionControl) ReportVertexFailure(v Vertex) {
log.Debugf("Reporting vertex %v failure to Mission Control", v)
m.Lock()
@ -130,7 +130,7 @@ func (m *missionControl) RequestRoute(payment *LightningPayment,
// With the next candidate path found, we'll attempt to turn this into
// a route by applying the time-lock and fee requirements.
sourceVertex := newVertex(m.selfNode.PubKey)
sourceVertex := NewVertex(m.selfNode.PubKey)
route, err := newRoute(payment.Amount, sourceVertex, path, height,
finalCltvDelta)
if err != nil {
@ -157,7 +157,7 @@ func (m *missionControl) GraphPruneView() *graphPruneView {
// For each of the vertexes that have been added to the prune view, if
// it is now "stale", then we'll ignore it and avoid adding it to the
// view we'll return.
vertexes := make(map[vertex]struct{})
vertexes := make(map[Vertex]struct{})
for vertex, pruneTime := range m.failedVertexes {
if now.Sub(pruneTime) >= vertexDecay {
log.Tracef("Pruning decayed failure report for vertex %v "+
@ -204,7 +204,7 @@ func (m *missionControl) GraphPruneView() *graphPruneView {
type graphPruneView struct {
edges map[uint64]struct{}
vertexes map[vertex]struct{}
vertexes map[Vertex]struct{}
}
// ResetHistory resets the history of missionControl returning it to a state as
@ -212,6 +212,6 @@ type graphPruneView struct {
func (m *missionControl) ResetHistory() {
m.Lock()
m.failedEdges = make(map[uint64]time.Time)
m.failedVertexes = make(map[vertex]time.Time)
m.failedVertexes = make(map[Vertex]time.Time)
m.Unlock()
}

@ -449,7 +449,7 @@ func TestEdgeUpdateNotification(t *testing.T) {
// Create lookup map for notifications we are intending to receive. Entries
// are removed from the map when the anticipated notification is received.
var waitingFor = map[vertex]int{
var waitingFor = map[Vertex]int{
newVertex(node1.PubKey): 1,
newVertex(node2.PubKey): 2,
}
@ -608,7 +608,7 @@ func TestNodeUpdateNotification(t *testing.T) {
// Create lookup map for notifications we are intending to receive. Entries
// are removed from the map when the anticipated notification is received.
var waitingFor = map[vertex]int{
var waitingFor = map[Vertex]int{
newVertex(node1.PubKey): 1,
newVertex(node2.PubKey): 2,
}

@ -126,7 +126,7 @@ type Route struct {
// nodeIndex is a map that allows callers to quickly look up if a node
// is present in this computed route or not.
nodeIndex map[vertex]struct{}
nodeIndex map[Vertex]struct{}
// chanIndex is an index that allows callers to determine if a channel
// is present in this route or not. Channels are identified by the
@ -136,27 +136,27 @@ type Route struct {
// nextHop maps a node, to the next channel that it will pass the HTLC
// off to. With this map, we can easily look up the next outgoing
// channel or node for pruning purposes.
nextHopMap map[vertex]*ChannelHop
nextHopMap map[Vertex]*ChannelHop
}
// nextHopVertex returns the next hop (by vertex) after the target node. If the
// nextHopVertex returns the next hop (by Vertex) after the target node. If the
// target node is not found in the route, then false is returned.
func (r *Route) nextHopVertex(n *btcec.PublicKey) (vertex, bool) {
hop, ok := r.nextHopMap[newVertex(n)]
return newVertex(hop.Node.PubKey), ok
func (r *Route) nextHopVertex(n *btcec.PublicKey) (Vertex, bool) {
hop, ok := r.nextHopMap[NewVertex(n)]
return NewVertex(hop.Node.PubKey), ok
}
// nextHopChannel returns the uint64 channel ID of the next hop after the
// target node. If the target node is not foud in the route, then false is
// returned.
func (r *Route) nextHopChannel(n *btcec.PublicKey) (uint64, bool) {
hop, ok := r.nextHopMap[newVertex(n)]
hop, ok := r.nextHopMap[NewVertex(n)]
return hop.ChannelID, ok
}
// containsNode returns true if a node is present in the target route, and
// false otherwise.
func (r *Route) containsNode(v vertex) bool {
func (r *Route) containsNode(v Vertex) bool {
_, ok := r.nodeIndex[v]
return ok
}
@ -210,7 +210,7 @@ func (r *Route) ToHopPayloads() []sphinx.HopData {
//
// NOTE: The passed slice of ChannelHops MUST be sorted in forward order: from
// the source to the target node of the path finding attempt.
func newRoute(amtToSend lnwire.MilliSatoshi, sourceVertex vertex,
func newRoute(amtToSend lnwire.MilliSatoshi, sourceVertex Vertex,
pathEdges []*ChannelHop, currentHeight uint32,
finalCLTVDelta uint16) (*Route, error) {
@ -221,9 +221,9 @@ func newRoute(amtToSend lnwire.MilliSatoshi, sourceVertex vertex,
route := &Route{
Hops: make([]*Hop, len(pathEdges)),
TotalTimeLock: currentHeight,
nodeIndex: make(map[vertex]struct{}),
nodeIndex: make(map[Vertex]struct{}),
chanIndex: make(map[uint64]struct{}),
nextHopMap: make(map[vertex]*ChannelHop),
nextHopMap: make(map[Vertex]*ChannelHop),
}
// TODO(roasbeef): need to do sanity check to ensure we don't make a
@ -243,9 +243,9 @@ func newRoute(amtToSend lnwire.MilliSatoshi, sourceVertex vertex,
edge := pathEdges[i]
// First, we'll update both the node and channel index, to
// indicate that this vertex, and outgoing channel link are
// indicate that this Vertex, and outgoing channel link are
// present within this route.
v := newVertex(edge.Node.PubKey)
v := NewVertex(edge.Node.PubKey)
route.nodeIndex[v] = struct{}{}
route.chanIndex[edge.ChannelID] = struct{}{}
@ -349,20 +349,20 @@ func newRoute(amtToSend lnwire.MilliSatoshi, sourceVertex vertex,
return route, nil
}
// vertex is a simple alias for the serialization of a compressed Bitcoin
// Vertex is a simple alias for the serialization of a compressed Bitcoin
// public key.
type vertex [33]byte
type Vertex [33]byte
// newVertex returns a new vertex given a public key.
func newVertex(pub *btcec.PublicKey) vertex {
var v vertex
// NewVertex returns a new Vertex given a public key.
func NewVertex(pub *btcec.PublicKey) Vertex {
var v Vertex
copy(v[:], pub.SerializeCompressed())
return v
}
// String returns a human readable version of the vertex which is the
// String returns a human readable version of the Vertex which is the
// hex-encoding of the serialized compressed public key.
func (v vertex) String() string {
func (v Vertex) String() string {
return fmt.Sprintf("%x", v[:])
}
@ -393,7 +393,7 @@ func edgeWeight(e *channeldb.ChannelEdgePolicy) float64 {
// from the target to the source.
func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
sourceNode *channeldb.LightningNode, target *btcec.PublicKey,
ignoredNodes map[vertex]struct{}, ignoredEdges map[uint64]struct{},
ignoredNodes map[Vertex]struct{}, ignoredEdges map[uint64]struct{},
amt lnwire.MilliSatoshi) ([]*ChannelHop, error) {
var err error
@ -410,14 +410,14 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
// traversal.
var nodeHeap distanceHeap
// For each node/vertex the graph we create an entry in the distance
// For each node/Vertex the graph we create an entry in the distance
// map for the node set with a distance of "infinity". We also mark
// add the node to our set of unvisited nodes.
distance := make(map[vertex]nodeWithDist)
distance := make(map[Vertex]nodeWithDist)
if err := graph.ForEachNode(tx, func(_ *bolt.Tx, node *channeldb.LightningNode) error {
// TODO(roasbeef): with larger graph can just use disk seeks
// with a visited map
distance[newVertex(node.PubKey)] = nodeWithDist{
distance[NewVertex(node.PubKey)] = nodeWithDist{
dist: infinity,
node: node,
}
@ -432,7 +432,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
// To start, we add the source of our path finding attempt to the
// distance map with with a distance of 0. This indicates our starting
// point in the graph traversal.
sourceVertex := newVertex(sourceNode.PubKey)
sourceVertex := NewVertex(sourceNode.PubKey)
distance[sourceVertex] = nodeWithDist{
dist: 0,
node: sourceNode,
@ -443,8 +443,8 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
heap.Push(&nodeHeap, distance[sourceVertex])
// We'll use this map as a series of "previous" hop pointers. So to get
// to `vertex` we'll take the edge that it's mapped to within `prev`.
prev := make(map[vertex]edgeWithPrev)
// to `Vertex` we'll take the edge that it's mapped to within `prev`.
prev := make(map[Vertex]edgeWithPrev)
for nodeHeap.Len() != 0 {
// Fetch the node within the smallest distance from our source
// from the heap.
@ -461,16 +461,16 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
// Now that we've found the next potential step to take we'll
// examine all the outgoing edge (channels) from this node to
// further our graph traversal.
pivot := newVertex(bestNode.PubKey)
pivot := NewVertex(bestNode.PubKey)
err := bestNode.ForEachChannel(tx, func(tx *bolt.Tx,
edgeInfo *channeldb.ChannelEdgeInfo,
outEdge, inEdge *channeldb.ChannelEdgePolicy) error {
v := newVertex(outEdge.Node.PubKey)
v := NewVertex(outEdge.Node.PubKey)
// TODO(roasbeef): skip if chan disabled
// If this vertex or edge has been black listed, then
// If this Vertex or edge has been black listed, then
// we'll skip exploring this edge during this
// iteration.
if _, ok := ignoredNodes[v]; ok {
@ -527,7 +527,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
// If the target node isn't found in the prev hop map, then a path
// doesn't exist, so we terminate in an error.
if _, ok := prev[newVertex(target)]; !ok {
if _, ok := prev[NewVertex(target)]; !ok {
return nil, newErrf(ErrNoPathFound, "unable to find a path to "+
"destination")
}
@ -537,7 +537,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
// in the reverse direction which we'll use to properly calculate the
// timelock and fee values.
pathEdges := make([]*ChannelHop, 0, len(prev))
prevNode := newVertex(target)
prevNode := NewVertex(target)
for prevNode != sourceVertex { // TODO(roasbeef): assumes no cycles
// Add the current hop to the limit of path edges then walk
// backwards from this hop via the prev pointer for this hop
@ -545,7 +545,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
pathEdges = append(pathEdges, prev[prevNode].edge)
prev[prevNode].edge.Node.PubKey.Curve = nil
prevNode = newVertex(prev[prevNode].prevNode)
prevNode = NewVertex(prev[prevNode].prevNode)
}
// The route is invalid if it spans more than 20 hops. The current
@ -574,7 +574,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
// This function implements a modified version of Yen's. To find each path
// itself, we utilize our modified version of Dijkstra's found above. When
// examining possible spur and root paths, rather than removing edges or
// vertexes from the graph, we instead utilize a vertex+edge black-list that
// Vertexes from the graph, we instead utilize a Vertex+edge black-list that
// will be ignored by our modified Dijkstra's algorithm. With this approach, we
// make our inner path finding algorithm aware of our k-shortest paths
// algorithm, rather than attempting to use an unmodified path finding
@ -586,7 +586,7 @@ func findPaths(tx *bolt.Tx, graph *channeldb.ChannelGraph,
// TODO(roasbeef): take in db tx
ignoredEdges := make(map[uint64]struct{})
ignoredVertexes := make(map[vertex]struct{})
ignoredVertexes := make(map[Vertex]struct{})
// TODO(roasbeef): modifying ordering within heap to eliminate final
// sorting step?
@ -629,12 +629,12 @@ func findPaths(tx *bolt.Tx, graph *channeldb.ChannelGraph,
// path in order to find path deviations from each node in the
// path.
for i := 0; i < len(prevShortest)-1; i++ {
// These two maps will mark the edges and vertexes
// These two maps will mark the edges and Vertexes
// we'll exclude from the next path finding attempt.
// These are required to ensure the paths are unique
// and loopless.
ignoredEdges = make(map[uint64]struct{})
ignoredVertexes = make(map[vertex]struct{})
ignoredVertexes = make(map[Vertex]struct{})
// Our spur node is the i-th node in the prior shortest
// path, and our root path will be all nodes in the
@ -663,11 +663,11 @@ func findPaths(tx *bolt.Tx, graph *channeldb.ChannelGraph,
continue
}
ignoredVertexes[newVertex(node)] = struct{}{}
ignoredVertexes[NewVertex(node)] = struct{}{}
}
// With the edges that are part of our root path, and
// the vertexes (other than the spur path) within the
// the Vertexes (other than the spur path) within the
// root path removed, we'll attempt to find another
// shortest path from the spur node to the destination.
spurPath, err := findPath(tx, graph, spurNode, target,

@ -303,7 +303,7 @@ func TestBasicGraphPathFinding(t *testing.T) {
sourceVertex := newVertex(sourceNode.PubKey)
ignoredEdges := make(map[uint64]struct{})
ignoredVertexes := make(map[vertex]struct{})
ignoredVertexes := make(map[Vertex]struct{})
// With the test graph loaded, we'll test some basic path finding using
// the pre-generated graph. Consult the testdata/basic_graph.json file
@ -527,7 +527,7 @@ func TestNewRoutePathTooLong(t *testing.T) {
}
ignoredEdges := make(map[uint64]struct{})
ignoredVertexes := make(map[vertex]struct{})
ignoredVertexes := make(map[Vertex]struct{})
paymentAmt := lnwire.NewMSatFromSatoshis(100)
@ -568,7 +568,7 @@ func TestPathNotAvailable(t *testing.T) {
}
ignoredEdges := make(map[uint64]struct{})
ignoredVertexes := make(map[vertex]struct{})
ignoredVertexes := make(map[Vertex]struct{})
// With the test graph loaded, we'll test that queries for target that
// are either unreachable within the graph, or unknown result in an
@ -604,7 +604,7 @@ func TestPathInsufficientCapacity(t *testing.T) {
t.Fatalf("unable to fetch source node: %v", err)
}
ignoredEdges := make(map[uint64]struct{})
ignoredVertexes := make(map[vertex]struct{})
ignoredVertexes := make(map[Vertex]struct{})
// Next, test that attempting to find a path in which the current
// channel graph cannot support due to insufficient capacity triggers

@ -1009,7 +1009,7 @@ type routingMsg struct {
// pruneNodeFromRoutes accepts set of routes, and returns a new set of routes
// with the target node filtered out.
func pruneNodeFromRoutes(routes []*Route, skipNode vertex) []*Route {
func pruneNodeFromRoutes(routes []*Route, skipNode Vertex) []*Route {
// TODO(roasbeef): pass in slice index?
@ -1130,7 +1130,7 @@ func (r *ChannelRouter) FindRoutes(target *btcec.PublicKey,
// aren't able to support the total satoshis flow once fees have been
// factored in.
validRoutes := make([]*Route, 0, len(shortestPaths))
sourceVertex := newVertex(r.selfNode.PubKey)
sourceVertex := NewVertex(r.selfNode.PubKey)
for _, path := range shortestPaths {
// Attempt to make the path into a route. We snip off the first
// hop in the path as it contains a "self-hop" that is inserted