lnd.xprv/routing/validation_barrier.go
Conner Fromknecht 995e3fa85f
routing/validation_barrier: adds ErrVBarrierShuttingDown
Adds a new error ErrVBarrierShuttingDown that is returned
from WaitForDependants if the validation barrier's quit
chan is closed. This allows any blocked goroutines to
distinguish whether the dependent task has been completed,
or if validation should be aborted entirely.
2018-05-08 16:32:08 -07:00

259 lines
8.9 KiB
Go

package routing
import (
"errors"
"sync"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
)
// ErrVBarrierShuttingDown signals that the barrier has been requested to
// shutdown, and that the caller should not treat the wait condition as
// fulfilled.
var ErrVBarrierShuttingDown = errors.New("validation barrier shutting down")
// ValidationBarrier is a barrier used to ensure proper validation order while
// concurrently validating new announcements for channel edges, and the
// attributes of channel edges. It uses this set of maps (protected by this
// mutex) to track validation dependencies. For a given channel our
// dependencies look like this: chanAnn <- chanUp <- nodeAnn. That is we must
// validate the item on the left of the arrow before that on the right.
type ValidationBarrier struct {
// validationSemaphore is a channel of structs which is used as a
// sempahore. Initially we'll fill this with a buffered channel of the
// size of the number of active requests. Each new job will consume
// from this channel, then restore the value upon completion.
validationSemaphore chan struct{}
// chanAnnFinSignal is map that keep track of all the pending
// ChannelAnnouncement like validation job going on. Once the job has
// been completed, the channel will be closed unblocking any
// dependants.
chanAnnFinSignal map[lnwire.ShortChannelID]chan struct{}
// chanEdgeDependencies tracks any channel edge updates which should
// wait until the completion of the ChannelAnnouncement before
// proceeding. This is a dependency, as we can't validate the update
// before we validate the announcement which creates the channel
// itself.
chanEdgeDependencies map[lnwire.ShortChannelID]chan struct{}
// nodeAnnDependencies tracks any pending NodeAnnouncement validation
// jobs which should wait until the completion of the
// ChannelAnnouncement before proceeding.
nodeAnnDependencies map[Vertex]chan struct{}
quit chan struct{}
sync.Mutex
}
// NewValidationBarrier creates a new instance of a validation barrier given
// the total number of active requests, and a quit channel which will be used
// to know when to kill pending, but unfilled jobs.
func NewValidationBarrier(numActiveReqs int,
quitChan chan struct{}) *ValidationBarrier {
v := &ValidationBarrier{
chanAnnFinSignal: make(map[lnwire.ShortChannelID]chan struct{}),
chanEdgeDependencies: make(map[lnwire.ShortChannelID]chan struct{}),
nodeAnnDependencies: make(map[Vertex]chan struct{}),
quit: quitChan,
}
// We'll first initialize a set of sempahores to limit our concurrency
// when validating incoming requests in parallel.
v.validationSemaphore = make(chan struct{}, numActiveReqs)
for i := 0; i < numActiveReqs; i++ {
v.validationSemaphore <- struct{}{}
}
return v
}
// InitJobDependencies will wait for a new job slot to become open, and then
// sets up any dependent signals/trigger for the new job
func (v *ValidationBarrier) InitJobDependencies(job interface{}) {
// We'll wait for either a new slot to become open, or for the quit
// channel to be closed.
select {
case <-v.validationSemaphore:
case <-v.quit:
}
v.Lock()
defer v.Unlock()
// Once a slot is open, we'll examine the message of the job, to see if
// there need to be any dependent barriers set up.
switch msg := job.(type) {
// If this is a channel announcement, then we'll need to set up den
// tenancies, as we'll need to verify this before we verify any
// ChannelUpdates for the same channel, or NodeAnnouncements of nodes
// that are involved in this channel. This goes for both the wire
// type,s and also the types that we use within the database.
case *lnwire.ChannelAnnouncement:
// We ensure that we only create a new announcement signal iff,
// one doesn't already exist, as there may be duplicate
// announcements. We'll close this signal once the
// ChannelAnnouncement has been validated. This will result in
// all the dependent jobs being unlocked so they can finish
// execution themselves.
if _, ok := v.chanAnnFinSignal[msg.ShortChannelID]; !ok {
// We'll create the channel that we close after we
// validate this announcement. All dependants will
// point to this same channel, so they'll be unblocked
// at the same time.
annFinCond := make(chan struct{})
v.chanAnnFinSignal[msg.ShortChannelID] = annFinCond
v.chanEdgeDependencies[msg.ShortChannelID] = annFinCond
v.nodeAnnDependencies[Vertex(msg.NodeID1)] = annFinCond
v.nodeAnnDependencies[Vertex(msg.NodeID2)] = annFinCond
}
case *channeldb.ChannelEdgeInfo:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
if _, ok := v.chanAnnFinSignal[shortID]; !ok {
annFinCond := make(chan struct{})
v.chanAnnFinSignal[shortID] = annFinCond
v.chanEdgeDependencies[shortID] = annFinCond
v.nodeAnnDependencies[Vertex(msg.NodeKey1Bytes)] = annFinCond
v.nodeAnnDependencies[Vertex(msg.NodeKey2Bytes)] = annFinCond
}
// These other types don't have any dependants, so no further
// initialization needs to be done beyond just occupying a job slot.
case *channeldb.ChannelEdgePolicy:
return
case *lnwire.ChannelUpdate:
return
case *lnwire.NodeAnnouncement:
// TODO(roasbeef): node ann needs to wait on existing channel updates
return
case *channeldb.LightningNode:
return
case *lnwire.AnnounceSignatures:
// TODO(roasbeef): need to wait on chan ann?
return
}
}
// CompleteJob returns a free slot to the set of available job slots. This
// should be called once a job has been fully completed. Otherwise, slots may
// not be returned to the internal scheduling, causing a deadlock when a new
// overflow job is attempted.
func (v *ValidationBarrier) CompleteJob() {
select {
case v.validationSemaphore <- struct{}{}:
case <-v.quit:
}
}
// WaitForDependants will block until any jobs that this job dependants on have
// finished executing. This allows us a graceful way to schedule goroutines
// based on any pending uncompleted dependent jobs. If this job doesn't have an
// active dependent, then this function will return immediately.
func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
var (
signal chan struct{}
ok bool
)
v.Lock()
switch msg := job.(type) {
// Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
// completion of any active ChannelAnnouncement jobs related to them.
case *channeldb.ChannelEdgePolicy:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
signal, ok = v.chanEdgeDependencies[shortID]
case *channeldb.LightningNode:
vertex := Vertex(msg.PubKeyBytes)
signal, ok = v.nodeAnnDependencies[vertex]
case *lnwire.ChannelUpdate:
signal, ok = v.chanEdgeDependencies[msg.ShortChannelID]
case *lnwire.NodeAnnouncement:
vertex := Vertex(msg.NodeID)
signal, ok = v.nodeAnnDependencies[vertex]
// Other types of jobs can be executed immediately, so we'll just
// return directly.
case *lnwire.AnnounceSignatures:
// TODO(roasbeef): need to wait on chan ann?
v.Unlock()
return nil
case *channeldb.ChannelEdgeInfo:
v.Unlock()
return nil
case *lnwire.ChannelAnnouncement:
v.Unlock()
return nil
}
v.Unlock()
// If we do have an active job, then we'll wait until either the signal
// is closed, or the set of jobs exits.
if ok {
select {
case <-v.quit:
return ErrVBarrierShuttingDown
case <-signal:
return nil
}
}
return nil
}
// SignalDependants will signal any jobs that are dependent on this job that
// they can continue execution. If the job doesn't have any dependants, then
// this function sill exit immediately.
func (v *ValidationBarrier) SignalDependants(job interface{}) {
v.Lock()
defer v.Unlock()
switch msg := job.(type) {
// If we've just finished executing a ChannelAnnouncement, then we'll
// close out the signal, and remove the signal from the map of active
// ones. This will allow any dependent jobs to continue execution.
case *channeldb.ChannelEdgeInfo:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
finSignal, ok := v.chanAnnFinSignal[shortID]
if ok {
close(finSignal)
delete(v.chanAnnFinSignal, shortID)
}
case *lnwire.ChannelAnnouncement:
finSignal, ok := v.chanAnnFinSignal[msg.ShortChannelID]
if ok {
close(finSignal)
delete(v.chanAnnFinSignal, msg.ShortChannelID)
}
delete(v.chanEdgeDependencies, msg.ShortChannelID)
// For all other job types, we'll delete the tracking entries from the
// map, as if we reach this point, then all dependants have already
// finished executing and we can proceed.
case *channeldb.LightningNode:
delete(v.nodeAnnDependencies, Vertex(msg.PubKeyBytes))
case *lnwire.NodeAnnouncement:
delete(v.nodeAnnDependencies, Vertex(msg.NodeID))
case *lnwire.ChannelUpdate:
delete(v.chanEdgeDependencies, msg.ShortChannelID)
case *channeldb.ChannelEdgePolicy:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
delete(v.chanEdgeDependencies, shortID)
case *lnwire.AnnounceSignatures:
return
}
}