From 3067d05ae83557c7dad792437b9248ef27717122 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 29 Nov 2017 16:45:08 -0800 Subject: [PATCH] discovery: validate incoming announcements in parallel --- discovery/gossiper.go | 100 ++++++++++++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 27 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 040696ad..daa764e0 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -3,6 +3,7 @@ package discovery import ( "bytes" "fmt" + "runtime" "sync" "sync/atomic" "time" @@ -160,6 +161,8 @@ type AuthenticatedGossiper struct { // selfKey is the identity public key of the backing Lighting node. selfKey *btcec.PublicKey + + sync.Mutex } // New creates a new AuthenticatedGossiper instance, initialized with the @@ -540,6 +543,12 @@ func (d *AuthenticatedGossiper) networkHandler() { err) } + // We'll use this validation to ensure that we process jobs in their + // dependency order during parallel validation. + validationBarrier := routing.NewValidationBarrier( + runtime.NumCPU()*10, d.quit, + ) + for { select { // A new fee update has arrived. We'll commit it to the @@ -559,26 +568,47 @@ func (d *AuthenticatedGossiper) networkHandler() { // Finally, with the updates committed, we'll now add // them to the announcement batch to be flushed at the // start of the next epoch. - announcements.AddMsgs(newChanUpdates) + announcements.AddMsgs(newChanUpdates...) feeUpdate.errResp <- nil case announcement := <-d.networkMsgs: - // Process the network announcement to determine if - // this is either a new announcement from our PoV or an - // edges to a prior vertex/edge we previously - // proceeded. - emittedAnnouncements := d.processNetworkAnnouncement( - announcement, - ) + // We'll set up any dependant, and wait until a free + // slot for this job opens up, this allow us to not + // have thousands of goroutines active. + validationBarrier.InitJobDependancies(announcement.msg) - // If the announcement was accepted, then add the - // emitted announcements to our announce batch to be - // broadcast once the trickle timer ticks gain. - if emittedAnnouncements != nil { - // TODO(roasbeef): exclude peer that sent - announcements.AddMsgs(emittedAnnouncements) - } + go func() { + defer validationBarrier.CompleteJob() + + // If this message has an existing dependency, + // then we'll wait until that has been fully + // validated before we proceed. + validationBarrier.WaitForDependants(announcement.msg) + + // Process the network announcement to determine if + // this is either a new announcement from our PoV + // or an edges to a prior vertex/edge we previously + // proceeded. + emittedAnnouncements := d.processNetworkAnnouncement( + announcement, + ) + + // If this message had any dependencies, then + // we can now signal them to continue. + validationBarrier.SignalDependants(announcement.msg) + + // If the announcement was accepted, then add the + // emitted announcements to our announce batch to + // be broadcast once the trickle timer ticks gain. + if emittedAnnouncements != nil { + // TODO(roasbeef): exclude peer that sent + announcements.AddMsgs( + emittedAnnouncements..., + ) + } + + }() // A new block has arrived, so we can re-process the previously // premature announcements. @@ -592,32 +622,39 @@ func (d *AuthenticatedGossiper) networkHandler() { // Once a new block arrives, we updates our running // track of the height of the chain tip. blockHeight := uint32(newBlock.Height) - d.bestHeight = blockHeight + atomic.StoreUint32(&d.bestHeight, blockHeight) // Next we check if we have any premature announcements // for this height, if so, then we process them once // more as normal announcements. - prematureAnns := d.prematureAnnouncements[uint32(newBlock.Height)] - if len(prematureAnns) != 0 { + d.Lock() + numPremature := len(d.prematureAnnouncements[uint32(newBlock.Height)]) + d.Unlock() + if numPremature != 0 { log.Infof("Re-processing %v premature "+ "announcements for height %v", - len(prematureAnns), blockHeight) + numPremature, blockHeight) } - for _, ann := range prematureAnns { + d.Lock() + for _, ann := range d.prematureAnnouncements[uint32(newBlock.Height)] { emittedAnnouncements := d.processNetworkAnnouncement(ann) if emittedAnnouncements != nil { - announcements.AddMsgs(emittedAnnouncements) + announcements.AddMsgs( + emittedAnnouncements..., + ) } } delete(d.prematureAnnouncements, blockHeight) + d.Unlock() // The trickle timer has ticked, which indicates we should // flush to the network the pending batch of new announcements // we've received since the last trickle tick. case <-trickleTimer.C: - // get the batch of announcements from deDupedAnnouncements - announcementBatch := announcements.Batch() + // Emit the current batch of announcements from + // deDupedAnnouncements. + announcementBatch := announcements.Emit() // If the current announcements batch is nil, then we // have no further work here. @@ -797,7 +834,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { // TODO(roasbeef) make height delta 6 // * or configurable - return chanID.BlockHeight+delta > d.bestHeight + bestHeight := atomic.LoadUint32(&d.bestHeight) + return chanID.BlockHeight+delta > bestHeight } var announcements []lnwire.Message @@ -872,12 +910,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l log.Infof("Announcement for chan_id=(%v), is premature: "+ "advertises height %v, only height %v is known", msg.ShortChannelID.ToUint64(), - msg.ShortChannelID.BlockHeight, d.bestHeight) + msg.ShortChannelID.BlockHeight, + atomic.LoadUint32(&d.bestHeight)) + d.Lock() d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight], nMsg, ) + d.Unlock() return nil } @@ -978,12 +1019,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l log.Infof("Update announcement for "+ "short_chan_id(%v), is premature: advertises "+ "height %v, only height %v is known", - shortChanID, blockHeight, d.bestHeight) + shortChanID, blockHeight, + atomic.LoadUint32(&d.bestHeight)) + d.Lock() d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight], nMsg, ) + d.Unlock() return nil } @@ -1083,14 +1127,16 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // expected announcement height. This allows us to be tolerant // to other clients if this constraint was changed. if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) { + d.Lock() d.prematureAnnouncements[needBlockHeight] = append( d.prematureAnnouncements[needBlockHeight], nMsg, ) + d.Unlock() log.Infof("Premature proof announcement, "+ "current block height lower than needed: %v <"+ " %v, add announcement to reprocessing batch", - d.bestHeight, needBlockHeight) + atomic.LoadUint32(&d.bestHeight), needBlockHeight) return nil }