Merge pull request #4352 from matheusdtech/discovery-lock-premature

discovery: correctly lock premature messages
This commit is contained in:
Conner Fromknecht 2020-06-26 22:50:09 -07:00 committed by GitHub
commit cff52f7622
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"runtime" "runtime"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
@ -247,7 +246,8 @@ type AuthenticatedGossiper struct {
stopped sync.Once stopped sync.Once
// bestHeight is the height of the block at the tip of the main chain // bestHeight is the height of the block at the tip of the main chain
// as we know it. To be used atomically. // as we know it. Accesses *MUST* be done with the gossiper's lock
// held.
bestHeight uint32 bestHeight uint32
quit chan struct{} quit chan struct{}
@ -1030,28 +1030,30 @@ func (d *AuthenticatedGossiper) networkHandler() {
return return
} }
// Once a new block arrives, we updates our running // Once a new block arrives, we update our running
// track of the height of the chain tip. // track of the height of the chain tip.
d.Lock()
blockHeight := uint32(newBlock.Height) blockHeight := uint32(newBlock.Height)
atomic.StoreUint32(&d.bestHeight, blockHeight) d.bestHeight = blockHeight
log.Debugf("New block: height=%d, hash=%s", blockHeight,
newBlock.Hash)
// Next we check if we have any premature announcements // Next we check if we have any premature announcements
// for this height, if so, then we process them once // for this height, if so, then we process them once
// more as normal announcements. // more as normal announcements.
d.Lock() premature := d.prematureAnnouncements[blockHeight]
numPremature := len(d.prematureAnnouncements[blockHeight]) if len(premature) == 0 {
d.Unlock() d.Unlock()
// Return early if no announcement to process.
if numPremature == 0 {
continue continue
} }
delete(d.prematureAnnouncements, blockHeight)
d.Unlock()
log.Infof("Re-processing %v premature announcements "+ log.Infof("Re-processing %v premature announcements "+
"for height %v", numPremature, blockHeight) "for height %v", len(premature), blockHeight)
d.Lock() for _, ann := range premature {
for _, ann := range d.prematureAnnouncements[blockHeight] {
emittedAnnouncements := d.processNetworkAnnouncement(ann) emittedAnnouncements := d.processNetworkAnnouncement(ann)
if emittedAnnouncements != nil { if emittedAnnouncements != nil {
announcements.AddMsgs( announcements.AddMsgs(
@ -1059,8 +1061,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
) )
} }
} }
delete(d.prematureAnnouncements, blockHeight)
d.Unlock()
// The trickle timer has ticked, which indicates we should // The trickle timer has ticked, which indicates we should
// flush to the network the pending batch of new announcements // flush to the network the pending batch of new announcements
@ -1491,11 +1491,11 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error {
func (d *AuthenticatedGossiper) processNetworkAnnouncement( func (d *AuthenticatedGossiper) processNetworkAnnouncement(
nMsg *networkMsg) []networkMsg { nMsg *networkMsg) []networkMsg {
// isPremature *MUST* be called with the gossiper's lock held.
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6 // TODO(roasbeef) make height delta 6
// * or configurable // * or configurable
bestHeight := atomic.LoadUint32(&d.bestHeight) return chanID.BlockHeight+delta > d.bestHeight
return chanID.BlockHeight+delta > bestHeight
} }
var announcements []networkMsg var announcements []networkMsg
@ -1581,6 +1581,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// If the advertised inclusionary block is beyond our knowledge // If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll put the announcement in limbo // of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain. // to be fully verified once we advance forward in the chain.
d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
blockHeight := msg.ShortChannelID.BlockHeight blockHeight := msg.ShortChannelID.BlockHeight
log.Infof("Announcement for chan_id=(%v), is "+ log.Infof("Announcement for chan_id=(%v), is "+
@ -1588,9 +1589,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
"height %v is known", "height %v is known",
msg.ShortChannelID.ToUint64(), msg.ShortChannelID.ToUint64(),
msg.ShortChannelID.BlockHeight, msg.ShortChannelID.BlockHeight,
atomic.LoadUint32(&d.bestHeight)) d.bestHeight)
d.Lock()
d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight], d.prematureAnnouncements[blockHeight],
nMsg, nMsg,
@ -1598,6 +1598,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.Unlock() d.Unlock()
return nil return nil
} }
d.Unlock()
// At this point, we'll now ask the router if this is a // At this point, we'll now ask the router if this is a
// zombie/known edge. If so we can skip all the processing // zombie/known edge. If so we can skip all the processing
@ -1808,14 +1809,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// If the advertised inclusionary block is beyond our knowledge // If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll put the announcement in limbo // of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain. // to be fully verified once we advance forward in the chain.
d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
log.Infof("Update announcement for "+ log.Infof("Update announcement for "+
"short_chan_id(%v), is premature: advertises "+ "short_chan_id(%v), is premature: advertises "+
"height %v, only height %v is known", "height %v, only height %v is known",
shortChanID, blockHeight, shortChanID, blockHeight,
atomic.LoadUint32(&d.bestHeight)) d.bestHeight)
d.Lock()
d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight], d.prematureAnnouncements[blockHeight],
nMsg, nMsg,
@ -1823,6 +1824,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.Unlock() d.Unlock()
return nil return nil
} }
d.Unlock()
// Before we perform any of the expensive checks below, we'll // Before we perform any of the expensive checks below, we'll
// check whether this update is stale or is for a zombie // check whether this update is stale or is for a zombie
@ -2058,19 +2060,20 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// proof is premature. If so we'll halt processing until the // proof is premature. If so we'll halt processing until the
// expected announcement height. This allows us to be tolerant // expected announcement height. This allows us to be tolerant
// to other clients if this constraint was changed. // to other clients if this constraint was changed.
d.Lock()
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) { if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
d.Lock()
d.prematureAnnouncements[needBlockHeight] = append( d.prematureAnnouncements[needBlockHeight] = append(
d.prematureAnnouncements[needBlockHeight], d.prematureAnnouncements[needBlockHeight],
nMsg, nMsg,
) )
d.Unlock()
log.Infof("Premature proof announcement, "+ log.Infof("Premature proof announcement, "+
"current block height lower than needed: %v <"+ "current block height lower than needed: %v <"+
" %v, add announcement to reprocessing batch", " %v, add announcement to reprocessing batch",
atomic.LoadUint32(&d.bestHeight), needBlockHeight) d.bestHeight, needBlockHeight)
d.Unlock()
return nil return nil
} }
d.Unlock()
// Ensure that we know of a channel with the target channel ID // Ensure that we know of a channel with the target channel ID
// before proceeding further. // before proceeding further.