discovery: Correctly lock premature annoucements
This reworks the locking behavior of the Gossiper so that a race condition on channel updates and block notifications doesn't cause any loss of messages. This fixes an issue that manifested mostly as flakes on itests during WaitForNetworkChannelOpen calls. The previous behavior allowed ChannelUpdates to be missed if they happened concurrently to block notifications. The processNetworkAnnoucement call would check for the current block height, then lock the gossiper and add the msg to the prematureAnnoucements list. New blocks would trigger an update to the current block height then a lock and check of the aforementioned list. However, specially during itests it could happen that the missing lock before checking the height could case a race condition if the following sequence of events happened: - A new ChannelUpdate message was received and started processing on a separate goroutine - The isPremature() call was made and verified that the ChannelUpdate was in fact premature - The goroutine was scheduled out - A new block started processing in the gossiper. It updated the block height, asked and was granted the lock for the gossiper and verified there was zero premature announcements. The lock was released. - The goroutine processing the ChannelUpdate asked for the gossiper lock and was granted it. It added the ChannelUpdate in the prematureAnnoucements list. This can never be processed now. The way to fix this behavior is to ensure that both isPremature checks done inside processNetworkAnnoucement and best block updates are made inside the same critical section (i.e. while holding the same lock) so that they can't both check and update the prematureAnnoucements list concurrently.
This commit is contained in:
parent
ccc8f8e48f
commit
44f83731bc
@ -6,7 +6,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/btcec"
|
"github.com/btcsuite/btcd/btcec"
|
||||||
@ -247,7 +246,8 @@ type AuthenticatedGossiper struct {
|
|||||||
stopped sync.Once
|
stopped sync.Once
|
||||||
|
|
||||||
// bestHeight is the height of the block at the tip of the main chain
|
// bestHeight is the height of the block at the tip of the main chain
|
||||||
// as we know it. To be used atomically.
|
// as we know it. Accesses *MUST* be done with the gossiper's lock
|
||||||
|
// held.
|
||||||
bestHeight uint32
|
bestHeight uint32
|
||||||
|
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
@ -1030,31 +1030,30 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Once a new block arrives, we updates our running
|
// Once a new block arrives, we update our running
|
||||||
// track of the height of the chain tip.
|
// track of the height of the chain tip.
|
||||||
|
d.Lock()
|
||||||
blockHeight := uint32(newBlock.Height)
|
blockHeight := uint32(newBlock.Height)
|
||||||
atomic.StoreUint32(&d.bestHeight, blockHeight)
|
d.bestHeight = blockHeight
|
||||||
|
|
||||||
log.Debugf("New Block: height=%d hash=%s", blockHeight,
|
log.Debugf("New block: height=%d, hash=%s", blockHeight,
|
||||||
newBlock.Hash)
|
newBlock.Hash)
|
||||||
|
|
||||||
// Next we check if we have any premature announcements
|
// Next we check if we have any premature announcements
|
||||||
// for this height, if so, then we process them once
|
// for this height, if so, then we process them once
|
||||||
// more as normal announcements.
|
// more as normal announcements.
|
||||||
d.Lock()
|
premature := d.prematureAnnouncements[blockHeight]
|
||||||
numPremature := len(d.prematureAnnouncements[blockHeight])
|
if len(premature) == 0 {
|
||||||
d.Unlock()
|
d.Unlock()
|
||||||
|
|
||||||
// Return early if no announcement to process.
|
|
||||||
if numPremature == 0 {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
delete(d.prematureAnnouncements, blockHeight)
|
||||||
|
d.Unlock()
|
||||||
|
|
||||||
log.Infof("Re-processing %v premature announcements "+
|
log.Infof("Re-processing %v premature announcements "+
|
||||||
"for height %v", numPremature, blockHeight)
|
"for height %v", len(premature), blockHeight)
|
||||||
|
|
||||||
d.Lock()
|
for _, ann := range premature {
|
||||||
for _, ann := range d.prematureAnnouncements[blockHeight] {
|
|
||||||
emittedAnnouncements := d.processNetworkAnnouncement(ann)
|
emittedAnnouncements := d.processNetworkAnnouncement(ann)
|
||||||
if emittedAnnouncements != nil {
|
if emittedAnnouncements != nil {
|
||||||
announcements.AddMsgs(
|
announcements.AddMsgs(
|
||||||
@ -1062,8 +1061,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delete(d.prematureAnnouncements, blockHeight)
|
|
||||||
d.Unlock()
|
|
||||||
|
|
||||||
// The trickle timer has ticked, which indicates we should
|
// The trickle timer has ticked, which indicates we should
|
||||||
// flush to the network the pending batch of new announcements
|
// flush to the network the pending batch of new announcements
|
||||||
@ -1494,11 +1491,11 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error {
|
|||||||
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||||
nMsg *networkMsg) []networkMsg {
|
nMsg *networkMsg) []networkMsg {
|
||||||
|
|
||||||
|
// isPremature *MUST* be called with the gossiper's lock held.
|
||||||
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
|
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
|
||||||
// TODO(roasbeef) make height delta 6
|
// TODO(roasbeef) make height delta 6
|
||||||
// * or configurable
|
// * or configurable
|
||||||
bestHeight := atomic.LoadUint32(&d.bestHeight)
|
return chanID.BlockHeight+delta > d.bestHeight
|
||||||
return chanID.BlockHeight+delta > bestHeight
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var announcements []networkMsg
|
var announcements []networkMsg
|
||||||
@ -1584,6 +1581,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|||||||
// If the advertised inclusionary block is beyond our knowledge
|
// If the advertised inclusionary block is beyond our knowledge
|
||||||
// of the chain tip, then we'll put the announcement in limbo
|
// of the chain tip, then we'll put the announcement in limbo
|
||||||
// to be fully verified once we advance forward in the chain.
|
// to be fully verified once we advance forward in the chain.
|
||||||
|
d.Lock()
|
||||||
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
|
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
|
||||||
blockHeight := msg.ShortChannelID.BlockHeight
|
blockHeight := msg.ShortChannelID.BlockHeight
|
||||||
log.Infof("Announcement for chan_id=(%v), is "+
|
log.Infof("Announcement for chan_id=(%v), is "+
|
||||||
@ -1591,9 +1589,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|||||||
"height %v is known",
|
"height %v is known",
|
||||||
msg.ShortChannelID.ToUint64(),
|
msg.ShortChannelID.ToUint64(),
|
||||||
msg.ShortChannelID.BlockHeight,
|
msg.ShortChannelID.BlockHeight,
|
||||||
atomic.LoadUint32(&d.bestHeight))
|
d.bestHeight)
|
||||||
|
|
||||||
d.Lock()
|
|
||||||
d.prematureAnnouncements[blockHeight] = append(
|
d.prematureAnnouncements[blockHeight] = append(
|
||||||
d.prematureAnnouncements[blockHeight],
|
d.prematureAnnouncements[blockHeight],
|
||||||
nMsg,
|
nMsg,
|
||||||
@ -1601,6 +1598,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|||||||
d.Unlock()
|
d.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
d.Unlock()
|
||||||
|
|
||||||
// At this point, we'll now ask the router if this is a
|
// At this point, we'll now ask the router if this is a
|
||||||
// zombie/known edge. If so we can skip all the processing
|
// zombie/known edge. If so we can skip all the processing
|
||||||
@ -1811,14 +1809,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|||||||
// If the advertised inclusionary block is beyond our knowledge
|
// If the advertised inclusionary block is beyond our knowledge
|
||||||
// of the chain tip, then we'll put the announcement in limbo
|
// of the chain tip, then we'll put the announcement in limbo
|
||||||
// to be fully verified once we advance forward in the chain.
|
// to be fully verified once we advance forward in the chain.
|
||||||
|
d.Lock()
|
||||||
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
|
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
|
||||||
log.Infof("Update announcement for "+
|
log.Infof("Update announcement for "+
|
||||||
"short_chan_id(%v), is premature: advertises "+
|
"short_chan_id(%v), is premature: advertises "+
|
||||||
"height %v, only height %v is known",
|
"height %v, only height %v is known",
|
||||||
shortChanID, blockHeight,
|
shortChanID, blockHeight,
|
||||||
atomic.LoadUint32(&d.bestHeight))
|
d.bestHeight)
|
||||||
|
|
||||||
d.Lock()
|
|
||||||
d.prematureAnnouncements[blockHeight] = append(
|
d.prematureAnnouncements[blockHeight] = append(
|
||||||
d.prematureAnnouncements[blockHeight],
|
d.prematureAnnouncements[blockHeight],
|
||||||
nMsg,
|
nMsg,
|
||||||
@ -1826,6 +1824,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|||||||
d.Unlock()
|
d.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
d.Unlock()
|
||||||
|
|
||||||
// Before we perform any of the expensive checks below, we'll
|
// Before we perform any of the expensive checks below, we'll
|
||||||
// check whether this update is stale or is for a zombie
|
// check whether this update is stale or is for a zombie
|
||||||
@ -2061,19 +2060,20 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|||||||
// proof is premature. If so we'll halt processing until the
|
// proof is premature. If so we'll halt processing until the
|
||||||
// expected announcement height. This allows us to be tolerant
|
// expected announcement height. This allows us to be tolerant
|
||||||
// to other clients if this constraint was changed.
|
// to other clients if this constraint was changed.
|
||||||
|
d.Lock()
|
||||||
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
|
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
|
||||||
d.Lock()
|
|
||||||
d.prematureAnnouncements[needBlockHeight] = append(
|
d.prematureAnnouncements[needBlockHeight] = append(
|
||||||
d.prematureAnnouncements[needBlockHeight],
|
d.prematureAnnouncements[needBlockHeight],
|
||||||
nMsg,
|
nMsg,
|
||||||
)
|
)
|
||||||
d.Unlock()
|
|
||||||
log.Infof("Premature proof announcement, "+
|
log.Infof("Premature proof announcement, "+
|
||||||
"current block height lower than needed: %v <"+
|
"current block height lower than needed: %v <"+
|
||||||
" %v, add announcement to reprocessing batch",
|
" %v, add announcement to reprocessing batch",
|
||||||
atomic.LoadUint32(&d.bestHeight), needBlockHeight)
|
d.bestHeight, needBlockHeight)
|
||||||
|
d.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
d.Unlock()
|
||||||
|
|
||||||
// Ensure that we know of a channel with the target channel ID
|
// Ensure that we know of a channel with the target channel ID
|
||||||
// before proceeding further.
|
// before proceeding further.
|
||||||
|
Loading…
Reference in New Issue
Block a user