routing: add support for detection+processing of premature announcements

This commit fixes a prior block propagation race-condition by detecting
and properly processing “premature” announcements. A premature
announcement is one that’s received with an anchored block height which
is beyond our chain tip. Once received, we now store these
announcements in a special map that’s caches them in memory. Once a new
block arrives, we check the map for the existence of any entries,
processing them as normal if so.
This commit is contained in:
Olaoluwa Osuntokun 2017-02-02 17:44:13 -08:00
parent 2c0d5e0f0d
commit 9cef2f8657
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

@ -96,8 +96,13 @@ type Config struct {
type ChannelRouter struct { type ChannelRouter struct {
sync.RWMutex sync.RWMutex
// cfg is a copy of the configuration struct that the ChannelRouter was
// initialized with.
cfg *Config cfg *Config
// selfNode is the center of the star-graph centered around the
// ChannelRouter. The ChannelRouter uses this node as a starting point
// when doing any path finding.
selfNode *channeldb.LightningNode selfNode *channeldb.LightningNode
// TODO(roasbeef): make LRU, invalidate upon new block connect // TODO(roasbeef): make LRU, invalidate upon new block connect
@ -105,12 +110,31 @@ type ChannelRouter struct {
nodeCache map[[33]byte]*channeldb.LightningNode nodeCache map[[33]byte]*channeldb.LightningNode
edgeCache map[wire.OutPoint]*channeldb.ChannelEdge edgeCache map[wire.OutPoint]*channeldb.ChannelEdge
// newBlocks is a channel in which new blocks connected to the end of
// the main chain are sent over.
newBlocks chan *chainntnfs.BlockEpoch newBlocks chan *chainntnfs.BlockEpoch
// networkMsgs is a channel that carries new network messages from
// outside the ChannelRouter to be processed by the networkHandler.
networkMsgs chan *routingMsg networkMsgs chan *routingMsg
// syncRequests is a channel that carries requests to synchronize newly
// connected peers to the state of the channel graph from our PoV.
syncRequests chan *syncRequest syncRequests chan *syncRequest
// prematureAnnouncements maps a blockheight to a set of announcements
// which are "premature" from our PoV. An announcement is premature if
// it claims to be anchored in a block which is beyond the current
// mainchain tip as we know it. Premature announcements will be
// processed once the chain tip as we know it extends to/past the
// premature height.
// TODO(roasbeef): limit premature announcements to N
prematureAnnouncements map[uint32][]lnwire.Message
// bestHeight is the height of the block at the tip of the main chain
// as we know it.
bestHeight uint32
fakeSig *btcec.Signature fakeSig *btcec.Signature
started uint32 started uint32
@ -145,12 +169,13 @@ func New(cfg Config) (*ChannelRouter, error) {
} }
return &ChannelRouter{ return &ChannelRouter{
cfg: &cfg, cfg: &cfg,
selfNode: selfNode, selfNode: selfNode,
fakeSig: fakeSig, fakeSig: fakeSig,
networkMsgs: make(chan *routingMsg), networkMsgs: make(chan *routingMsg),
syncRequests: make(chan *syncRequest), syncRequests: make(chan *syncRequest),
quit: make(chan struct{}), prematureAnnouncements: make(map[uint32][]lnwire.Message),
quit: make(chan struct{}),
}, nil }, nil
} }
@ -174,6 +199,12 @@ func (r *ChannelRouter) Start() error {
} }
r.newBlocks = blockEpochs.Epochs r.newBlocks = blockEpochs.Epochs
_, height, err := r.cfg.Chain.GetBestBlock()
if err != nil {
return err
}
r.bestHeight = uint32(height)
// Before we begin normal operation of the router, we first need to // Before we begin normal operation of the router, we first need to
// synchronize the channel graph to the latest state of the UTXO set. // synchronize the channel graph to the latest state of the UTXO set.
if err := r.syncGraphWithChain(); err != nil { if err := r.syncGraphWithChain(); err != nil {
@ -304,6 +335,7 @@ func (r *ChannelRouter) networkHandler() {
var announcementBatch []lnwire.Message var announcementBatch []lnwire.Message
// TODO(roasbeef): parametrize the above
trickleTimer := time.NewTicker(time.Millisecond * 300) trickleTimer := time.NewTicker(time.Millisecond * 300)
defer trickleTimer.Stop() defer trickleTimer.Stop()
@ -316,11 +348,8 @@ func (r *ChannelRouter) networkHandler() {
// result we'll modify the channel graph accordingly depending // result we'll modify the channel graph accordingly depending
// on the exact type of the message. // on the exact type of the message.
case netMsg := <-r.networkMsgs: case netMsg := <-r.networkMsgs:
// TODO(roasbeef): this loop would mostly be moved to // Process the network announcement to determine if
// the discovery service // this is either a new announcement from our PoV or an
// Process the network announcement to determine if this
// is either a new announcement from our PoV or an
// update to a prior vertex/edge we previously // update to a prior vertex/edge we previously
// accepted. // accepted.
accepted := r.processNetworkAnnouncement(netMsg.msg) accepted := r.processNetworkAnnouncement(netMsg.msg)
@ -345,15 +374,34 @@ func (r *ChannelRouter) networkHandler() {
return return
} }
// Once a new block arrives, we update our running
// track of the height of the chain tip.
blockHeight := uint32(newBlock.Height)
r.bestHeight = blockHeight
// Next we check if we have any premature announcements
// for this height, if so, then we process them once
// more as normal announcements.
prematureAnns := r.prematureAnnouncements[uint32(newBlock.Height)]
log.Infof("Re-processing %v premature announcements for "+
"height %v", len(prematureAnns), blockHeight)
for _, ann := range prematureAnns {
if ok := r.processNetworkAnnouncement(ann); ok {
announcementBatch = append(announcementBatch, ann)
}
}
delete(r.prematureAnnouncements, blockHeight)
log.Infof("Pruning channel graph using block %v (height=%v)",
newBlock.Hash, blockHeight)
block, err := r.cfg.Chain.GetBlock(newBlock.Hash) block, err := r.cfg.Chain.GetBlock(newBlock.Hash)
if err != nil { if err != nil {
log.Errorf("unable to get block: %v", err) log.Errorf("unable to get block: %v", err)
continue continue
} }
log.Infof("Pruning channel graph using block %v (height=%v)",
newBlock.Hash, newBlock.Height)
// We're only interested in all prior outputs that've // We're only interested in all prior outputs that've
// been spent in the block, so collate all the // been spent in the block, so collate all the
// referenced previous outpoints within each tx and // referenced previous outpoints within each tx and
@ -371,14 +419,14 @@ func (r *ChannelRouter) networkHandler() {
// of the block being pruned so the prune tip can be // of the block being pruned so the prune tip can be
// updated. // updated.
numClosed, err := r.cfg.Graph.PruneGraph(spentOutputs, numClosed, err := r.cfg.Graph.PruneGraph(spentOutputs,
newBlock.Hash, uint32(newBlock.Height)) newBlock.Hash, blockHeight)
if err != nil { if err != nil {
log.Errorf("unable to prune routing table: %v", err) log.Errorf("unable to prune routing table: %v", err)
continue continue
} }
log.Infof("Block %v (height=%v) closed %v channels", log.Infof("Block %v (height=%v) closed %v channels",
newBlock.Hash, newBlock.Height, numClosed) newBlock.Hash, blockHeight, numClosed)
// The retransmission timer has ticked which indicates that we // The retransmission timer has ticked which indicates that we
// should broadcast our personal channel sot the network. This // should broadcast our personal channel sot the network. This
@ -481,9 +529,13 @@ func (r *ChannelRouter) networkHandler() {
// channel or node announcement. If the update didn't affect the internal state // channel or node announcement. If the update didn't affect the internal state
// of the draft due to either being out of date, invalid, or redundant, then // of the draft due to either being out of date, invalid, or redundant, then
// false is returned. Otherwise, true is returned indicating that the caller // false is returned. Otherwise, true is returned indicating that the caller
// may want to batch this request to be broadcast to immediate peers during th // may want to batch this request to be broadcast to immediate peers during the
// next announcement epoch. // next announcement epoch.
func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool {
isPremature := func(chanID *lnwire.ChannelID) bool {
return chanID.BlockHeight > r.bestHeight
}
switch msg := msg.(type) { switch msg := msg.(type) {
// A new node announcement has arrived which either presents a new // A new node announcement has arrived which either presents a new
@ -545,10 +597,22 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool {
return false return false
} }
// TODO(roasbeef): check if height > then our known height and // If the advertised inclusionary block is beyond our knowledge
// wait for next epoch based on that? // of the chain tip, then we'll put the announcement in limbo
// * or add to "premature" announcement bucket // to be fully verified once we advance forward in the chain.
// * bucket gets checked on each new incoming block if isPremature(&msg.ChannelID) {
blockHeight := msg.ChannelID.BlockHeight
log.Infof("Announcement for chan_id=(%v), is "+
"premature: advertises height %v, only height "+
"%v is known", channelID,
msg.ChannelID.BlockHeight, r.bestHeight)
r.prematureAnnouncements[blockHeight] = append(
r.prematureAnnouncements[blockHeight],
msg,
)
return false
}
// Before we can add the channel to the channel graph, we need // Before we can add the channel to the channel graph, we need
// to obtain the full funding outpoint that's encoded within // to obtain the full funding outpoint that's encoded within
@ -597,6 +661,23 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool {
return false return false
} }
// If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain.
if isPremature(&msg.ChannelID) {
blockHeight := msg.ChannelID.BlockHeight
log.Infof("Update announcement for chan_id=(%v), is "+
"premature: advertises height %v, only height "+
"%v is known", chanID, blockHeight,
r.bestHeight)
r.prematureAnnouncements[blockHeight] = append(
r.prematureAnnouncements[blockHeight],
msg,
)
return false
}
// As edges are directional edge node has a unique policy for // As edges are directional edge node has a unique policy for
// the direction of the edge they control. Therefore we first // the direction of the edge they control. Therefore we first
// check if we already have the most up to date information for // check if we already have the most up to date information for