diff --git a/routing/router.go b/routing/router.go index c6f9be39..bd74c6b4 100644 --- a/routing/router.go +++ b/routing/router.go @@ -96,8 +96,13 @@ type Config struct { type ChannelRouter struct { sync.RWMutex + // cfg is a copy of the configuration struct that the ChannelRouter was + // initialized with. cfg *Config + // selfNode is the center of the star-graph centered around the + // ChannelRouter. The ChannelRouter uses this node as a starting point + // when doing any path finding. selfNode *channeldb.LightningNode // TODO(roasbeef): make LRU, invalidate upon new block connect @@ -105,12 +110,31 @@ type ChannelRouter struct { nodeCache map[[33]byte]*channeldb.LightningNode edgeCache map[wire.OutPoint]*channeldb.ChannelEdge + // newBlocks is a channel in which new blocks connected to the end of + // the main chain are sent over. newBlocks chan *chainntnfs.BlockEpoch + // networkMsgs is a channel that carries new network messages from + // outside the ChannelRouter to be processed by the networkHandler. networkMsgs chan *routingMsg + // syncRequests is a channel that carries requests to synchronize newly + // connected peers to the state of the channel graph from our PoV. syncRequests chan *syncRequest + // prematureAnnouncements maps a blockheight to a set of announcements + // which are "premature" from our PoV. An announcement is premature if + // it claims to be anchored in a block which is beyond the current + // mainchain tip as we know it. Premature announcements will be + // processed once the chain tip as we know it extends to/past the + // premature height. + // TODO(roasbeef): limit premature announcements to N + prematureAnnouncements map[uint32][]lnwire.Message + + // bestHeight is the height of the block at the tip of the main chain + // as we know it. + bestHeight uint32 + fakeSig *btcec.Signature started uint32 @@ -145,12 +169,13 @@ func New(cfg Config) (*ChannelRouter, error) { } return &ChannelRouter{ - cfg: &cfg, - selfNode: selfNode, - fakeSig: fakeSig, - networkMsgs: make(chan *routingMsg), - syncRequests: make(chan *syncRequest), - quit: make(chan struct{}), + cfg: &cfg, + selfNode: selfNode, + fakeSig: fakeSig, + networkMsgs: make(chan *routingMsg), + syncRequests: make(chan *syncRequest), + prematureAnnouncements: make(map[uint32][]lnwire.Message), + quit: make(chan struct{}), }, nil } @@ -174,6 +199,12 @@ func (r *ChannelRouter) Start() error { } r.newBlocks = blockEpochs.Epochs + _, height, err := r.cfg.Chain.GetBestBlock() + if err != nil { + return err + } + r.bestHeight = uint32(height) + // Before we begin normal operation of the router, we first need to // synchronize the channel graph to the latest state of the UTXO set. if err := r.syncGraphWithChain(); err != nil { @@ -304,6 +335,7 @@ func (r *ChannelRouter) networkHandler() { var announcementBatch []lnwire.Message + // TODO(roasbeef): parametrize the above trickleTimer := time.NewTicker(time.Millisecond * 300) defer trickleTimer.Stop() @@ -316,11 +348,8 @@ func (r *ChannelRouter) networkHandler() { // result we'll modify the channel graph accordingly depending // on the exact type of the message. case netMsg := <-r.networkMsgs: - // TODO(roasbeef): this loop would mostly be moved to - // the discovery service - - // Process the network announcement to determine if this - // is either a new announcement from our PoV or an + // Process the network announcement to determine if + // this is either a new announcement from our PoV or an // update to a prior vertex/edge we previously // accepted. accepted := r.processNetworkAnnouncement(netMsg.msg) @@ -345,15 +374,34 @@ func (r *ChannelRouter) networkHandler() { return } + // Once a new block arrives, we update our running + // track of the height of the chain tip. + blockHeight := uint32(newBlock.Height) + r.bestHeight = blockHeight + + // Next we check if we have any premature announcements + // for this height, if so, then we process them once + // more as normal announcements. + prematureAnns := r.prematureAnnouncements[uint32(newBlock.Height)] + log.Infof("Re-processing %v premature announcements for "+ + "height %v", len(prematureAnns), blockHeight) + + for _, ann := range prematureAnns { + if ok := r.processNetworkAnnouncement(ann); ok { + announcementBatch = append(announcementBatch, ann) + } + } + delete(r.prematureAnnouncements, blockHeight) + + log.Infof("Pruning channel graph using block %v (height=%v)", + newBlock.Hash, blockHeight) + block, err := r.cfg.Chain.GetBlock(newBlock.Hash) if err != nil { log.Errorf("unable to get block: %v", err) continue } - log.Infof("Pruning channel graph using block %v (height=%v)", - newBlock.Hash, newBlock.Height) - // We're only interested in all prior outputs that've // been spent in the block, so collate all the // referenced previous outpoints within each tx and @@ -371,14 +419,14 @@ func (r *ChannelRouter) networkHandler() { // of the block being pruned so the prune tip can be // updated. numClosed, err := r.cfg.Graph.PruneGraph(spentOutputs, - newBlock.Hash, uint32(newBlock.Height)) + newBlock.Hash, blockHeight) if err != nil { log.Errorf("unable to prune routing table: %v", err) continue } log.Infof("Block %v (height=%v) closed %v channels", - newBlock.Hash, newBlock.Height, numClosed) + newBlock.Hash, blockHeight, numClosed) // The retransmission timer has ticked which indicates that we // should broadcast our personal channel sot the network. This @@ -481,9 +529,13 @@ func (r *ChannelRouter) networkHandler() { // channel or node announcement. If the update didn't affect the internal state // of the draft due to either being out of date, invalid, or redundant, then // false is returned. Otherwise, true is returned indicating that the caller -// may want to batch this request to be broadcast to immediate peers during th +// may want to batch this request to be broadcast to immediate peers during the // next announcement epoch. func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { + isPremature := func(chanID *lnwire.ChannelID) bool { + return chanID.BlockHeight > r.bestHeight + } + switch msg := msg.(type) { // A new node announcement has arrived which either presents a new @@ -545,10 +597,22 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { return false } - // TODO(roasbeef): check if height > then our known height and - // wait for next epoch based on that? - // * or add to "premature" announcement bucket - // * bucket gets checked on each new incoming block + // If the advertised inclusionary block is beyond our knowledge + // of the chain tip, then we'll put the announcement in limbo + // to be fully verified once we advance forward in the chain. + if isPremature(&msg.ChannelID) { + blockHeight := msg.ChannelID.BlockHeight + log.Infof("Announcement for chan_id=(%v), is "+ + "premature: advertises height %v, only height "+ + "%v is known", channelID, + msg.ChannelID.BlockHeight, r.bestHeight) + + r.prematureAnnouncements[blockHeight] = append( + r.prematureAnnouncements[blockHeight], + msg, + ) + return false + } // Before we can add the channel to the channel graph, we need // to obtain the full funding outpoint that's encoded within @@ -597,6 +661,23 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { return false } + // If the advertised inclusionary block is beyond our knowledge + // of the chain tip, then we'll put the announcement in limbo + // to be fully verified once we advance forward in the chain. + if isPremature(&msg.ChannelID) { + blockHeight := msg.ChannelID.BlockHeight + log.Infof("Update announcement for chan_id=(%v), is "+ + "premature: advertises height %v, only height "+ + "%v is known", chanID, blockHeight, + r.bestHeight) + + r.prematureAnnouncements[blockHeight] = append( + r.prematureAnnouncements[blockHeight], + msg, + ) + return false + } + // As edges are directional edge node has a unique policy for // the direction of the edge they control. Therefore we first // check if we already have the most up to date information for