routing/router: improve validation barrier shutdown

This commit improves the shutdown of the router's
pending validation tasks, by ensuring the pending
tasks exit early if the validation barrier
receives a shutdown request.

Currently, any goroutines blocked by WaitForDependants
will continue execution after a shutdown is signaled.
This may lead to unnexpected behavior as the relation
between updates is no longer upheld. It also has the
side effect of slowing down shutdown, since we
continue to process the remaining updates.

To remedy this, WaitForDependants now returns an error
that signals if a shutdown was requested. The blocked
goroutines can exit early upon seeing this error,
without also signaling completion of their task to
the dependent tasks, which should will now properly
wait to read the validation barrier's quit signal.
This commit is contained in:
Conner Fromknecht 2018-05-07 16:27:34 -07:00
parent 3854c1ed68
commit eaa8cdf916
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -637,31 +637,43 @@ func (r *ChannelRouter) networkHandler() {
// A new fully validated network update has just arrived. As a // A new fully validated network update has just arrived. As a
// result we'll modify the channel graph accordingly depending // result we'll modify the channel graph accordingly depending
// on the exact type of the message. // on the exact type of the message.
case updateMsg := <-r.networkUpdates: case update := <-r.networkUpdates:
// We'll set up any dependants, and wait until a free // We'll set up any dependants, and wait until a free
// slot for this job opens up, this allow us to not // slot for this job opens up, this allow us to not
// have thousands of goroutines active. // have thousands of goroutines active.
validationBarrier.InitJobDependencies(updateMsg.msg) validationBarrier.InitJobDependencies(update.msg)
r.wg.Add(1)
go func() { go func() {
defer r.wg.Done()
defer validationBarrier.CompleteJob() defer validationBarrier.CompleteJob()
// If this message has an existing dependency, // If this message has an existing dependency,
// then we'll wait until that has been fully // then we'll wait until that has been fully
// validated before we proceed. // validated before we proceed.
validationBarrier.WaitForDependants(updateMsg.msg) err := validationBarrier.WaitForDependants(
update.msg,
)
if err != nil {
if err != ErrVBarrierShuttingDown {
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
err)
}
return
}
// Process the routing update to determine if // Process the routing update to determine if
// this is either a new update from our PoV or // this is either a new update from our PoV or
// an update to a prior vertex/edge we // an update to a prior vertex/edge we
// previously accepted. // previously accepted.
err := r.processUpdate(updateMsg.msg) err = r.processUpdate(update.msg)
updateMsg.err <- err update.err <- err
// If this message had any dependencies, then // If this message had any dependencies, then
// we can now signal them to continue. // we can now signal them to continue.
validationBarrier.SignalDependants(updateMsg.msg) validationBarrier.SignalDependants(update.msg)
if err != nil { if err != nil {
return return
} }
@ -669,8 +681,9 @@ func (r *ChannelRouter) networkHandler() {
// Send off a new notification for the newly // Send off a new notification for the newly
// accepted update. // accepted update.
topChange := &TopologyChange{} topChange := &TopologyChange{}
err = addToTopologyChange(r.cfg.Graph, topChange, err = addToTopologyChange(
updateMsg.msg) r.cfg.Graph, topChange, update.msg,
)
if err != nil { if err != nil {
log.Errorf("unable to update topology "+ log.Errorf("unable to update topology "+
"change notification: %v", err) "change notification: %v", err)