diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 0e723837..a72ddcd7 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -6,7 +6,6 @@ import ( "fmt" "runtime" "sync" - "sync/atomic" "time" "github.com/btcsuite/btcd/btcec" @@ -247,7 +246,8 @@ type AuthenticatedGossiper struct { stopped sync.Once // bestHeight is the height of the block at the tip of the main chain - // as we know it. To be used atomically. + // as we know it. Accesses *MUST* be done with the gossiper's lock + // held. bestHeight uint32 quit chan struct{} @@ -1030,31 +1030,30 @@ func (d *AuthenticatedGossiper) networkHandler() { return } - // Once a new block arrives, we updates our running + // Once a new block arrives, we update our running // track of the height of the chain tip. + d.Lock() blockHeight := uint32(newBlock.Height) - atomic.StoreUint32(&d.bestHeight, blockHeight) + d.bestHeight = blockHeight - log.Debugf("New Block: height=%d hash=%s", blockHeight, + log.Debugf("New block: height=%d, hash=%s", blockHeight, newBlock.Hash) // Next we check if we have any premature announcements // for this height, if so, then we process them once // more as normal announcements. - d.Lock() - numPremature := len(d.prematureAnnouncements[blockHeight]) - d.Unlock() - - // Return early if no announcement to process. - if numPremature == 0 { + premature := d.prematureAnnouncements[blockHeight] + if len(premature) == 0 { + d.Unlock() continue } + delete(d.prematureAnnouncements, blockHeight) + d.Unlock() log.Infof("Re-processing %v premature announcements "+ - "for height %v", numPremature, blockHeight) + "for height %v", len(premature), blockHeight) - d.Lock() - for _, ann := range d.prematureAnnouncements[blockHeight] { + for _, ann := range premature { emittedAnnouncements := d.processNetworkAnnouncement(ann) if emittedAnnouncements != nil { announcements.AddMsgs( @@ -1062,8 +1061,6 @@ func (d *AuthenticatedGossiper) networkHandler() { ) } } - delete(d.prematureAnnouncements, blockHeight) - d.Unlock() // The trickle timer has ticked, which indicates we should // flush to the network the pending batch of new announcements @@ -1494,11 +1491,11 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error { func (d *AuthenticatedGossiper) processNetworkAnnouncement( nMsg *networkMsg) []networkMsg { + // isPremature *MUST* be called with the gossiper's lock held. isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { // TODO(roasbeef) make height delta 6 // * or configurable - bestHeight := atomic.LoadUint32(&d.bestHeight) - return chanID.BlockHeight+delta > bestHeight + return chanID.BlockHeight+delta > d.bestHeight } var announcements []networkMsg @@ -1584,6 +1581,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // If the advertised inclusionary block is beyond our knowledge // of the chain tip, then we'll put the announcement in limbo // to be fully verified once we advance forward in the chain. + d.Lock() if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { blockHeight := msg.ShortChannelID.BlockHeight log.Infof("Announcement for chan_id=(%v), is "+ @@ -1591,9 +1589,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( "height %v is known", msg.ShortChannelID.ToUint64(), msg.ShortChannelID.BlockHeight, - atomic.LoadUint32(&d.bestHeight)) + d.bestHeight) - d.Lock() d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight], nMsg, @@ -1601,6 +1598,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.Unlock() return nil } + d.Unlock() // At this point, we'll now ask the router if this is a // zombie/known edge. If so we can skip all the processing @@ -1811,14 +1809,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // If the advertised inclusionary block is beyond our knowledge // of the chain tip, then we'll put the announcement in limbo // to be fully verified once we advance forward in the chain. + d.Lock() if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { log.Infof("Update announcement for "+ "short_chan_id(%v), is premature: advertises "+ "height %v, only height %v is known", shortChanID, blockHeight, - atomic.LoadUint32(&d.bestHeight)) + d.bestHeight) - d.Lock() d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight], nMsg, @@ -1826,6 +1824,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.Unlock() return nil } + d.Unlock() // Before we perform any of the expensive checks below, we'll // check whether this update is stale or is for a zombie @@ -2061,19 +2060,20 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // proof is premature. If so we'll halt processing until the // expected announcement height. This allows us to be tolerant // to other clients if this constraint was changed. + d.Lock() if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) { - d.Lock() d.prematureAnnouncements[needBlockHeight] = append( d.prematureAnnouncements[needBlockHeight], nMsg, ) - d.Unlock() log.Infof("Premature proof announcement, "+ "current block height lower than needed: %v <"+ " %v, add announcement to reprocessing batch", - atomic.LoadUint32(&d.bestHeight), needBlockHeight) + d.bestHeight, needBlockHeight) + d.Unlock() return nil } + d.Unlock() // Ensure that we know of a channel with the target channel ID // before proceeding further.