discovery: validate incoming announcements in parallel

This commit is contained in:
Olaoluwa Osuntokun 2017-11-29 16:45:08 -08:00
parent 5bc9f07d12
commit 3067d05ae8
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -3,6 +3,7 @@ package discovery
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -160,6 +161,8 @@ type AuthenticatedGossiper struct {
// selfKey is the identity public key of the backing Lighting node. // selfKey is the identity public key of the backing Lighting node.
selfKey *btcec.PublicKey selfKey *btcec.PublicKey
sync.Mutex
} }
// New creates a new AuthenticatedGossiper instance, initialized with the // New creates a new AuthenticatedGossiper instance, initialized with the
@ -540,6 +543,12 @@ func (d *AuthenticatedGossiper) networkHandler() {
err) err)
} }
// We'll use this validation to ensure that we process jobs in their
// dependency order during parallel validation.
validationBarrier := routing.NewValidationBarrier(
runtime.NumCPU()*10, d.quit,
)
for { for {
select { select {
// A new fee update has arrived. We'll commit it to the // A new fee update has arrived. We'll commit it to the
@ -559,27 +568,48 @@ func (d *AuthenticatedGossiper) networkHandler() {
// Finally, with the updates committed, we'll now add // Finally, with the updates committed, we'll now add
// them to the announcement batch to be flushed at the // them to the announcement batch to be flushed at the
// start of the next epoch. // start of the next epoch.
announcements.AddMsgs(newChanUpdates) announcements.AddMsgs(newChanUpdates...)
feeUpdate.errResp <- nil feeUpdate.errResp <- nil
case announcement := <-d.networkMsgs: case announcement := <-d.networkMsgs:
// We'll set up any dependant, and wait until a free
// slot for this job opens up, this allow us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependancies(announcement.msg)
go func() {
defer validationBarrier.CompleteJob()
// If this message has an existing dependency,
// then we'll wait until that has been fully
// validated before we proceed.
validationBarrier.WaitForDependants(announcement.msg)
// Process the network announcement to determine if // Process the network announcement to determine if
// this is either a new announcement from our PoV or an // this is either a new announcement from our PoV
// edges to a prior vertex/edge we previously // or an edges to a prior vertex/edge we previously
// proceeded. // proceeded.
emittedAnnouncements := d.processNetworkAnnouncement( emittedAnnouncements := d.processNetworkAnnouncement(
announcement, announcement,
) )
// If this message had any dependencies, then
// we can now signal them to continue.
validationBarrier.SignalDependants(announcement.msg)
// If the announcement was accepted, then add the // If the announcement was accepted, then add the
// emitted announcements to our announce batch to be // emitted announcements to our announce batch to
// broadcast once the trickle timer ticks gain. // be broadcast once the trickle timer ticks gain.
if emittedAnnouncements != nil { if emittedAnnouncements != nil {
// TODO(roasbeef): exclude peer that sent // TODO(roasbeef): exclude peer that sent
announcements.AddMsgs(emittedAnnouncements) announcements.AddMsgs(
emittedAnnouncements...,
)
} }
}()
// A new block has arrived, so we can re-process the previously // A new block has arrived, so we can re-process the previously
// premature announcements. // premature announcements.
case newBlock, ok := <-d.newBlocks: case newBlock, ok := <-d.newBlocks:
@ -592,32 +622,39 @@ func (d *AuthenticatedGossiper) networkHandler() {
// Once a new block arrives, we updates our running // Once a new block arrives, we updates our running
// track of the height of the chain tip. // track of the height of the chain tip.
blockHeight := uint32(newBlock.Height) blockHeight := uint32(newBlock.Height)
d.bestHeight = blockHeight atomic.StoreUint32(&d.bestHeight, blockHeight)
// Next we check if we have any premature announcements // Next we check if we have any premature announcements
// for this height, if so, then we process them once // for this height, if so, then we process them once
// more as normal announcements. // more as normal announcements.
prematureAnns := d.prematureAnnouncements[uint32(newBlock.Height)] d.Lock()
if len(prematureAnns) != 0 { numPremature := len(d.prematureAnnouncements[uint32(newBlock.Height)])
d.Unlock()
if numPremature != 0 {
log.Infof("Re-processing %v premature "+ log.Infof("Re-processing %v premature "+
"announcements for height %v", "announcements for height %v",
len(prematureAnns), blockHeight) numPremature, blockHeight)
} }
for _, ann := range prematureAnns { d.Lock()
for _, ann := range d.prematureAnnouncements[uint32(newBlock.Height)] {
emittedAnnouncements := d.processNetworkAnnouncement(ann) emittedAnnouncements := d.processNetworkAnnouncement(ann)
if emittedAnnouncements != nil { if emittedAnnouncements != nil {
announcements.AddMsgs(emittedAnnouncements) announcements.AddMsgs(
emittedAnnouncements...,
)
} }
} }
delete(d.prematureAnnouncements, blockHeight) delete(d.prematureAnnouncements, blockHeight)
d.Unlock()
// The trickle timer has ticked, which indicates we should // The trickle timer has ticked, which indicates we should
// flush to the network the pending batch of new announcements // flush to the network the pending batch of new announcements
// we've received since the last trickle tick. // we've received since the last trickle tick.
case <-trickleTimer.C: case <-trickleTimer.C:
// get the batch of announcements from deDupedAnnouncements // Emit the current batch of announcements from
announcementBatch := announcements.Batch() // deDupedAnnouncements.
announcementBatch := announcements.Emit()
// If the current announcements batch is nil, then we // If the current announcements batch is nil, then we
// have no further work here. // have no further work here.
@ -797,7 +834,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6 // TODO(roasbeef) make height delta 6
// * or configurable // * or configurable
return chanID.BlockHeight+delta > d.bestHeight bestHeight := atomic.LoadUint32(&d.bestHeight)
return chanID.BlockHeight+delta > bestHeight
} }
var announcements []lnwire.Message var announcements []lnwire.Message
@ -872,12 +910,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
log.Infof("Announcement for chan_id=(%v), is premature: "+ log.Infof("Announcement for chan_id=(%v), is premature: "+
"advertises height %v, only height %v is known", "advertises height %v, only height %v is known",
msg.ShortChannelID.ToUint64(), msg.ShortChannelID.ToUint64(),
msg.ShortChannelID.BlockHeight, d.bestHeight) msg.ShortChannelID.BlockHeight,
atomic.LoadUint32(&d.bestHeight))
d.Lock()
d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight], d.prematureAnnouncements[blockHeight],
nMsg, nMsg,
) )
d.Unlock()
return nil return nil
} }
@ -978,12 +1019,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
log.Infof("Update announcement for "+ log.Infof("Update announcement for "+
"short_chan_id(%v), is premature: advertises "+ "short_chan_id(%v), is premature: advertises "+
"height %v, only height %v is known", "height %v, only height %v is known",
shortChanID, blockHeight, d.bestHeight) shortChanID, blockHeight,
atomic.LoadUint32(&d.bestHeight))
d.Lock()
d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight], d.prematureAnnouncements[blockHeight],
nMsg, nMsg,
) )
d.Unlock()
return nil return nil
} }
@ -1083,14 +1127,16 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// expected announcement height. This allows us to be tolerant // expected announcement height. This allows us to be tolerant
// to other clients if this constraint was changed. // to other clients if this constraint was changed.
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) { if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
d.Lock()
d.prematureAnnouncements[needBlockHeight] = append( d.prematureAnnouncements[needBlockHeight] = append(
d.prematureAnnouncements[needBlockHeight], d.prematureAnnouncements[needBlockHeight],
nMsg, nMsg,
) )
d.Unlock()
log.Infof("Premature proof announcement, "+ log.Infof("Premature proof announcement, "+
"current block height lower than needed: %v <"+ "current block height lower than needed: %v <"+
" %v, add announcement to reprocessing batch", " %v, add announcement to reprocessing batch",
d.bestHeight, needBlockHeight) atomic.LoadUint32(&d.bestHeight), needBlockHeight)
return nil return nil
} }