Merge pull request #1198 from cfromknecht/validation-barrier-quit

router: Validation Barrier Shutdown
This commit is contained in:
Olaoluwa Osuntokun 2018-05-08 17:28:35 -07:00 committed by GitHub
commit b437553b5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 201 additions and 15 deletions

@ -906,13 +906,26 @@ func (d *AuthenticatedGossiper) networkHandler() {
// have thousands of goroutines active. // have thousands of goroutines active.
validationBarrier.InitJobDependencies(announcement.msg) validationBarrier.InitJobDependencies(announcement.msg)
d.wg.Add(1)
go func() { go func() {
defer d.wg.Done()
defer validationBarrier.CompleteJob() defer validationBarrier.CompleteJob()
// If this message has an existing dependency, // If this message has an existing dependency,
// then we'll wait until that has been fully // then we'll wait until that has been fully
// validated before we proceed. // validated before we proceed.
validationBarrier.WaitForDependants(announcement.msg) err := validationBarrier.WaitForDependants(
announcement.msg,
)
if err != nil {
if err != routing.ErrVBarrierShuttingDown {
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
err)
}
return
}
// Process the network announcement to determine if // Process the network announcement to determine if
// this is either a new announcement from our PoV // this is either a new announcement from our PoV

@ -637,31 +637,43 @@ func (r *ChannelRouter) networkHandler() {
// A new fully validated network update has just arrived. As a // A new fully validated network update has just arrived. As a
// result we'll modify the channel graph accordingly depending // result we'll modify the channel graph accordingly depending
// on the exact type of the message. // on the exact type of the message.
case updateMsg := <-r.networkUpdates: case update := <-r.networkUpdates:
// We'll set up any dependants, and wait until a free // We'll set up any dependants, and wait until a free
// slot for this job opens up, this allow us to not // slot for this job opens up, this allow us to not
// have thousands of goroutines active. // have thousands of goroutines active.
validationBarrier.InitJobDependencies(updateMsg.msg) validationBarrier.InitJobDependencies(update.msg)
r.wg.Add(1)
go func() { go func() {
defer r.wg.Done()
defer validationBarrier.CompleteJob() defer validationBarrier.CompleteJob()
// If this message has an existing dependency, // If this message has an existing dependency,
// then we'll wait until that has been fully // then we'll wait until that has been fully
// validated before we proceed. // validated before we proceed.
validationBarrier.WaitForDependants(updateMsg.msg) err := validationBarrier.WaitForDependants(
update.msg,
)
if err != nil {
if err != ErrVBarrierShuttingDown {
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
err)
}
return
}
// Process the routing update to determine if // Process the routing update to determine if
// this is either a new update from our PoV or // this is either a new update from our PoV or
// an update to a prior vertex/edge we // an update to a prior vertex/edge we
// previously accepted. // previously accepted.
err := r.processUpdate(updateMsg.msg) err = r.processUpdate(update.msg)
updateMsg.err <- err update.err <- err
// If this message had any dependencies, then // If this message had any dependencies, then
// we can now signal them to continue. // we can now signal them to continue.
validationBarrier.SignalDependants(updateMsg.msg) validationBarrier.SignalDependants(update.msg)
if err != nil { if err != nil {
return return
} }
@ -669,8 +681,9 @@ func (r *ChannelRouter) networkHandler() {
// Send off a new notification for the newly // Send off a new notification for the newly
// accepted update. // accepted update.
topChange := &TopologyChange{} topChange := &TopologyChange{}
err = addToTopologyChange(r.cfg.Graph, topChange, err = addToTopologyChange(
updateMsg.msg) r.cfg.Graph, topChange, update.msg,
)
if err != nil { if err != nil {
log.Errorf("unable to update topology "+ log.Errorf("unable to update topology "+
"change notification: %v", err) "change notification: %v", err)

@ -1,12 +1,18 @@
package routing package routing
import ( import (
"errors"
"sync" "sync"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
// ErrVBarrierShuttingDown signals that the barrier has been requested to
// shutdown, and that the caller should not treat the wait condition as
// fulfilled.
var ErrVBarrierShuttingDown = errors.New("validation barrier shutting down")
// ValidationBarrier is a barrier used to ensure proper validation order while // ValidationBarrier is a barrier used to ensure proper validation order while
// concurrently validating new announcements for channel edges, and the // concurrently validating new announcements for channel edges, and the
// attributes of channel edges. It uses this set of maps (protected by this // attributes of channel edges. It uses this set of maps (protected by this
@ -152,7 +158,7 @@ func (v *ValidationBarrier) CompleteJob() {
// finished executing. This allows us a graceful way to schedule goroutines // finished executing. This allows us a graceful way to schedule goroutines
// based on any pending uncompleted dependent jobs. If this job doesn't have an // based on any pending uncompleted dependent jobs. If this job doesn't have an
// active dependent, then this function will return immediately. // active dependent, then this function will return immediately.
func (v *ValidationBarrier) WaitForDependants(job interface{}) { func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
var ( var (
signal chan struct{} signal chan struct{}
@ -181,13 +187,13 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) {
case *lnwire.AnnounceSignatures: case *lnwire.AnnounceSignatures:
// TODO(roasbeef): need to wait on chan ann? // TODO(roasbeef): need to wait on chan ann?
v.Unlock() v.Unlock()
return return nil
case *channeldb.ChannelEdgeInfo: case *channeldb.ChannelEdgeInfo:
v.Unlock() v.Unlock()
return return nil
case *lnwire.ChannelAnnouncement: case *lnwire.ChannelAnnouncement:
v.Unlock() v.Unlock()
return return nil
} }
v.Unlock() v.Unlock()
@ -196,10 +202,13 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) {
if ok { if ok {
select { select {
case <-v.quit: case <-v.quit:
return return ErrVBarrierShuttingDown
case <-signal: case <-signal:
return nil
} }
} }
return nil
} }
// SignalDependants will signal any jobs that are dependent on this job that // SignalDependants will signal any jobs that are dependent on this job that

@ -0,0 +1,151 @@
package routing_test
import (
"encoding/binary"
"testing"
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)
// TestValidationBarrierSemaphore checks basic properties of the validation
// barrier's semaphore wrt. enqueuing/dequeuing.
func TestValidationBarrierSemaphore(t *testing.T) {
const (
numTasks = 8
numPendingTasks = 8
timeout = 50 * time.Millisecond
)
quit := make(chan struct{})
barrier := routing.NewValidationBarrier(numTasks, quit)
// Saturate the semaphore with jobs.
for i := 0; i < numTasks; i++ {
barrier.InitJobDependencies(nil)
}
// Spawn additional tasks that will signal completion when added.
jobAdded := make(chan struct{})
for i := 0; i < numPendingTasks; i++ {
go func() {
barrier.InitJobDependencies(nil)
jobAdded <- struct{}{}
}()
}
// Check that no jobs are added while semaphore is full.
select {
case <-time.After(timeout):
// Expected since no slots open.
case <-jobAdded:
t.Fatalf("job should not have been added")
}
// Complete jobs one at a time and verify that they get added.
for i := 0; i < numPendingTasks; i++ {
barrier.CompleteJob()
select {
case <-time.After(timeout):
t.Fatalf("timeout waiting for job to be added")
case <-jobAdded:
// Expected since one slot opened up.
}
}
}
// TestValidationBarrierQuit checks that pending validation tasks will return an
// error from WaitForDependants if the barrier's quit signal is canceled.
func TestValidationBarrierQuit(t *testing.T) {
const (
numTasks = 8
timeout = 50 * time.Millisecond
)
quit := make(chan struct{})
barrier := routing.NewValidationBarrier(2*numTasks, quit)
// Create a set of unique channel announcements that we will prep for
// validation.
anns := make([]*lnwire.ChannelAnnouncement, 0, numTasks)
for i := 0; i < numTasks; i++ {
anns = append(anns, &lnwire.ChannelAnnouncement{
ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)),
NodeID1: nodeIDFromInt(uint64(2 * i)),
NodeID2: nodeIDFromInt(uint64(2*i + 1)),
})
barrier.InitJobDependencies(anns[i])
}
// Create a set of channel updates, that must wait until their
// associated channel announcement has been verified.
chanUpds := make([]*lnwire.ChannelUpdate, 0, numTasks)
for i := 0; i < numTasks; i++ {
chanUpds = append(chanUpds, &lnwire.ChannelUpdate{
ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)),
})
barrier.InitJobDependencies(chanUpds[i])
}
// Spawn additional tasks that will send the error returned after
// waiting for the announcements to finish. In the background, we will
// iteratively queue the channel updates, which will send back the error
// returned from waiting.
jobErrs := make(chan error)
for i := 0; i < numTasks; i++ {
go func(ii int) {
jobErrs <- barrier.WaitForDependants(chanUpds[ii])
}(i)
}
// Check that no jobs are added while semaphore is full.
select {
case <-time.After(timeout):
// Expected since no slots open.
case <-jobErrs:
t.Fatalf("job should not have been signaled")
}
// Complete the first half of jobs, one at a time, verifying that they
// get signaled. Then, quit the barrier and check that all others exit
// with the correct error.
for i := 0; i < numTasks; i++ {
switch {
// First half, signal completion and task semaphore
case i < numTasks/2:
barrier.SignalDependants(anns[i])
barrier.CompleteJob()
// At midpoint, quit the validation barrier.
case i == numTasks/2:
close(quit)
}
var err error
select {
case <-time.After(timeout):
t.Fatalf("timeout waiting for job to be signaled")
case err = <-jobErrs:
}
switch {
// First half should return without failure.
case i < numTasks/2 && err != nil:
t.Fatalf("unexpected failure while waiting: %v", err)
// Last half should return the shutdown error.
case i >= numTasks/2 && err != routing.ErrVBarrierShuttingDown:
t.Fatalf("expected failure after quitting: want %v, "+
"got %v", routing.ErrVBarrierShuttingDown, err)
}
}
}
// nodeIDFromInt creates a node ID by writing a uint64 to the first 8 bytes.
func nodeIDFromInt(i uint64) [33]byte {
var nodeID [33]byte
binary.BigEndian.PutUint64(nodeID[:8], i)
return nodeID
}