Browse Source

discovery+routing: cancel dependent jobs if parent validation fails

Previously, we would always allow dependent jobs to be processed,
regardless of the result of its parent job's validation. This isn't
correct, as a parent job contains actions necessary to successfully
process a dependent job. A prime example of this can be found within the
AuthenticatedGossiper, where an incoming channel announcement and update
are both processed, but if the channel announcement job fails to
complete, then the gossiper is unable to properly validate the update.
This commit aims to address this by preventing the dependent jobs to
run.
master
Wilmer Paulino 3 years ago
parent
commit
393111cea9
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
  1. 98
      discovery/gossiper.go
  2. 9
      routing/router.go
  3. 103
      routing/validation_barrier.go
  4. 18
      routing/validation_barrier_test.go

98
discovery/gossiper.go

@ -990,7 +990,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// Channel announcement signatures are amongst the only
// messages that we'll process serially.
case *lnwire.AnnounceSignatures:
emittedAnnouncements := d.processNetworkAnnouncement(
emittedAnnouncements, _ := d.processNetworkAnnouncement(
announcement,
)
if emittedAnnouncements != nil {
@ -1040,14 +1040,14 @@ func (d *AuthenticatedGossiper) networkHandler() {
// determine if this is either a new
// announcement from our PoV or an edges to a
// prior vertex/edge we previously proceeded.
emittedAnnouncements := d.processNetworkAnnouncement(
emittedAnnouncements, allowDependents := d.processNetworkAnnouncement(
announcement,
)
// If this message had any dependencies, then
// we can now signal them to continue.
validationBarrier.SignalDependants(
announcement.msg,
announcement.msg, allowDependents,
)
// If the announcement was accepted, then add
@ -1514,9 +1514,11 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
// channel or node announcement or announcements proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements will
// be returned which should be broadcasted to the rest of the network.
// be returned which should be broadcasted to the rest of the network. The
// boolean returned indicates whether any dependents of the announcement should
// attempt to be processed as well.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
nMsg *networkMsg) []networkMsg {
nMsg *networkMsg) ([]networkMsg, bool) {
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6
@ -1546,7 +1548,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// signatures if not required.
if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) {
nMsg.err <- nil
return nil
return nil, true
}
if err := d.addNode(msg, schedulerOp...); err != nil {
@ -1559,7 +1561,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
nMsg.err <- err
return nil
return nil, false
}
// In order to ensure we don't leak unadvertised nodes, we'll
@ -1570,7 +1572,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Errorf("Unable to determine if node %x is "+
"advertised: %v", msg.NodeID, err)
nMsg.err <- err
return nil
return nil, false
}
// If it does, we'll add their announcement to our batch so that
@ -1588,7 +1590,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
nMsg.err <- nil
// TODO(roasbeef): get rid of the above
return announcements
return announcements, true
// A new channel announcement has arrived, this indicates the
// *creation* of a new channel within the network. This only advertises
@ -1608,7 +1610,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.rejectMtx.Unlock()
nMsg.err <- err
return nil
return nil, false
}
// If the advertised inclusionary block is beyond our knowledge
@ -1623,7 +1625,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.bestHeight)
d.Unlock()
nMsg.err <- nil
return nil
return nil, false
}
d.Unlock()
@ -1632,7 +1634,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// below.
if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) {
nMsg.err <- nil
return nil
return nil, true
}
// If this is a remote channel announcement, then we'll validate
@ -1649,7 +1651,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
// If the proof checks out, then we'll save the proof
@ -1669,7 +1671,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
if err := msg.Features.Encode(&featureBuf); err != nil {
log.Errorf("unable to encode features: %v", err)
nMsg.err <- err
return nil
return nil, false
}
edge := &channeldb.ChannelEdgeInfo{
@ -1720,7 +1722,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
nMsg.err <- rErr
return nil
return nil, false
}
// If while processing this rejected edge, we
@ -1729,7 +1731,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// directly.
if len(anns) != 0 {
nMsg.err <- nil
return anns
return anns, true
}
// Otherwise, this is just a regular rejected
@ -1742,7 +1744,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
nMsg.err <- err
return nil
return nil, false
}
// If we earlier received any ChannelUpdates for this channel,
@ -1806,7 +1808,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
nMsg.err <- nil
return announcements
return announcements, true
// A new authenticated channel edge update has arrived. This indicates
// that the directional information for an already known channel has
@ -1825,7 +1827,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.rejectMtx.Unlock()
nMsg.err <- err
return nil
return nil, false
}
blockHeight := msg.ShortChannelID.BlockHeight
@ -1842,7 +1844,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID, blockHeight,
d.bestHeight)
d.Unlock()
return nil
nMsg.err <- nil
return nil, false
}
d.Unlock()
@ -1854,7 +1857,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
msg.ShortChannelID, timestamp, msg.ChannelFlags,
) {
nMsg.err <- nil
return nil
return nil, true
}
// Get the node pub key as far as we don't have it in channel
@ -1893,7 +1896,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
"update signature: %v", err)
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
// With the signature valid, we'll proceed to mark the
@ -1906,7 +1909,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
msg.ShortChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
log.Debugf("Removed edge with chan_id=%v from zombie "+
@ -1949,7 +1952,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// NOTE: We don't return anything on the error channel
// for this message, as we expect that will be done when
// this ChannelUpdate is later reprocessed.
return nil
return nil, false
default:
err := fmt.Errorf("unable to validate channel update "+
@ -1960,7 +1963,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
return nil
return nil, false
}
// The least-significant bit in the flag on the channel update
@ -1997,7 +2000,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.cfg.RebroadcastInterval,
shortChanID)
nMsg.err <- nil
return nil
return nil, false
}
} else {
// If it's not, we'll allow an update per minute
@ -2024,7 +2027,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID,
pubKey.SerializeCompressed())
nMsg.err <- nil
return nil
return nil, false
}
}
}
@ -2040,7 +2043,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Error(rErr)
nMsg.err <- rErr
return nil
return nil, false
}
update := &channeldb.ChannelEdgePolicy{
@ -2069,7 +2072,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
nMsg.err <- err
return nil
return nil, false
}
// If this is a local ChannelUpdate without an AuthProof, it
@ -2094,7 +2097,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
msg.MsgType(), msg.ShortChannelID,
remotePubKey, err)
nMsg.err <- err
return nil
return nil, false
}
}
@ -2111,7 +2114,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
nMsg.err <- nil
return announcements
return announcements, true
// A new signature announcement has been received. This indicates
// willingness of nodes involved in the funding of a channel to
@ -2140,7 +2143,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.bestHeight, needBlockHeight)
d.Unlock()
nMsg.err <- nil
return nil
return nil, false
}
d.Unlock()
@ -2166,14 +2169,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
log.Infof("Orphan %v proof announcement with "+
"short_chan_id=%v, adding "+
"to waiting batch", prefix, shortChanID)
nMsg.err <- nil
return nil
return nil, false
}
nodeID := nMsg.source.SerializeCompressed()
@ -2188,7 +2191,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
"short_chan_id=%v", shortChanID)
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
// If proof was sent by a local sub-system, then we'll
@ -2212,7 +2215,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
msg.MsgType(), msg.ShortChannelID,
remotePubKey, err)
nMsg.err <- err
return nil
return nil, false
}
}
@ -2265,7 +2268,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Debugf("Already have proof for channel "+
"with chanID=%v", msg.ChannelID)
nMsg.err <- nil
return nil
return nil, true
}
// Check that we received the opposite proof. If so, then we're
@ -2283,7 +2286,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
if err == channeldb.ErrWaitingProofNotFound {
@ -2294,7 +2297,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
log.Infof("1/2 of channel ann proof received for "+
@ -2302,7 +2305,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID)
nMsg.err <- nil
return nil
return nil, false
}
// We now have both halves of the channel announcement proof,
@ -2326,7 +2329,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
if err != nil {
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
// With all the necessary components assembled validate the
@ -2338,7 +2341,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
// If the channel was returned by the router it means that
@ -2354,7 +2357,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
"channel chanID=%v: %v", msg.ChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
@ -2364,7 +2367,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
msg.ChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
return nil, false
}
// Proof was successfully created and now can announce the
@ -2431,11 +2434,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
nMsg.err <- nil
return announcements
return announcements, true
default:
nMsg.err <- errors.New("wrong type of the announcement")
return nil
err := errors.New("wrong type of the announcement")
nMsg.err <- err
return nil, false
}
}

9
routing/router.go

@ -967,7 +967,8 @@ func (r *ChannelRouter) networkHandler() {
update.msg,
)
if err != nil {
if err != ErrVBarrierShuttingDown {
if err != ErrVBarrierShuttingDown &&
err != ErrParentValidationFailed {
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
@ -985,7 +986,11 @@ func (r *ChannelRouter) networkHandler() {
// If this message had any dependencies, then
// we can now signal them to continue.
validationBarrier.SignalDependants(update.msg)
allowDependents := err == nil ||
IsError(err, ErrIgnored, ErrOutdated)
validationBarrier.SignalDependants(
update.msg, allowDependents,
)
if err != nil {
return
}

103
routing/validation_barrier.go

@ -9,10 +9,28 @@ import (
"github.com/lightningnetwork/lnd/routing/route"
)
// ErrVBarrierShuttingDown signals that the barrier has been requested to
// shutdown, and that the caller should not treat the wait condition as
// fulfilled.
var ErrVBarrierShuttingDown = errors.New("validation barrier shutting down")
var (
// ErrVBarrierShuttingDown signals that the barrier has been requested
// to shutdown, and that the caller should not treat the wait condition
// as fulfilled.
ErrVBarrierShuttingDown = errors.New("validation barrier shutting down")
// ErrParentValidationFailed signals that the validation of a
// dependent's parent failed, so the dependent must not be processed.
ErrParentValidationFailed = errors.New("parent validation failed")
)
// validationSignals contains two signals which allows the ValidationBarrier to
// communicate back to the caller whether a dependent should be processed or not
// based on whether its parent was successfully validated. Only one of these
// signals is to be used at a time.
type validationSignals struct {
// allow is the signal used to allow a dependent to be processed.
allow chan struct{}
// deny is the signal used to prevent a dependent from being processed.
deny chan struct{}
}
// ValidationBarrier is a barrier used to ensure proper validation order while
// concurrently validating new announcements for channel edges, and the
@ -31,19 +49,19 @@ type ValidationBarrier struct {
// ChannelAnnouncement like validation job going on. Once the job has
// been completed, the channel will be closed unblocking any
// dependants.
chanAnnFinSignal map[lnwire.ShortChannelID]chan struct{}
chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals
// chanEdgeDependencies tracks any channel edge updates which should
// wait until the completion of the ChannelAnnouncement before
// proceeding. This is a dependency, as we can't validate the update
// before we validate the announcement which creates the channel
// itself.
chanEdgeDependencies map[lnwire.ShortChannelID]chan struct{}
chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals
// nodeAnnDependencies tracks any pending NodeAnnouncement validation
// jobs which should wait until the completion of the
// ChannelAnnouncement before proceeding.
nodeAnnDependencies map[route.Vertex]chan struct{}
nodeAnnDependencies map[route.Vertex]*validationSignals
quit chan struct{}
sync.Mutex
@ -56,9 +74,9 @@ func NewValidationBarrier(numActiveReqs int,
quitChan chan struct{}) *ValidationBarrier {
v := &ValidationBarrier{
chanAnnFinSignal: make(map[lnwire.ShortChannelID]chan struct{}),
chanEdgeDependencies: make(map[lnwire.ShortChannelID]chan struct{}),
nodeAnnDependencies: make(map[route.Vertex]chan struct{}),
chanAnnFinSignal: make(map[lnwire.ShortChannelID]*validationSignals),
chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals),
nodeAnnDependencies: make(map[route.Vertex]*validationSignals),
quit: quitChan,
}
@ -107,24 +125,31 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) {
// validate this announcement. All dependants will
// point to this same channel, so they'll be unblocked
// at the same time.
annFinCond := make(chan struct{})
v.chanAnnFinSignal[msg.ShortChannelID] = annFinCond
v.chanEdgeDependencies[msg.ShortChannelID] = annFinCond
signals := &validationSignals{
allow: make(chan struct{}),
deny: make(chan struct{}),
}
v.chanAnnFinSignal[msg.ShortChannelID] = signals
v.chanEdgeDependencies[msg.ShortChannelID] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = annFinCond
v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = annFinCond
v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals
}
case *channeldb.ChannelEdgeInfo:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
if _, ok := v.chanAnnFinSignal[shortID]; !ok {
annFinCond := make(chan struct{})
signals := &validationSignals{
allow: make(chan struct{}),
deny: make(chan struct{}),
}
v.chanAnnFinSignal[shortID] = annFinCond
v.chanEdgeDependencies[shortID] = annFinCond
v.chanAnnFinSignal[shortID] = signals
v.chanEdgeDependencies[shortID] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = annFinCond
v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = annFinCond
v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals
}
// These other types don't have any dependants, so no further
@ -162,8 +187,8 @@ func (v *ValidationBarrier) CompleteJob() {
func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
var (
signal chan struct{}
ok bool
signals *validationSignals
ok bool
)
v.Lock()
@ -173,15 +198,15 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
// completion of any active ChannelAnnouncement jobs related to them.
case *channeldb.ChannelEdgePolicy:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
signal, ok = v.chanEdgeDependencies[shortID]
signals, ok = v.chanEdgeDependencies[shortID]
case *channeldb.LightningNode:
vertex := route.Vertex(msg.PubKeyBytes)
signal, ok = v.nodeAnnDependencies[vertex]
signals, ok = v.nodeAnnDependencies[vertex]
case *lnwire.ChannelUpdate:
signal, ok = v.chanEdgeDependencies[msg.ShortChannelID]
signals, ok = v.chanEdgeDependencies[msg.ShortChannelID]
case *lnwire.NodeAnnouncement:
vertex := route.Vertex(msg.NodeID)
signal, ok = v.nodeAnnDependencies[vertex]
signals, ok = v.nodeAnnDependencies[vertex]
// Other types of jobs can be executed immediately, so we'll just
// return directly.
@ -204,7 +229,9 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
select {
case <-v.quit:
return ErrVBarrierShuttingDown
case <-signal:
case <-signals.deny:
return ErrParentValidationFailed
case <-signals.allow:
return nil
}
}
@ -212,10 +239,10 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
return nil
}
// SignalDependants will signal any jobs that are dependent on this job that
// SignalDependants will allow/deny any jobs that are dependent on this job that
// they can continue execution. If the job doesn't have any dependants, then
// this function sill exit immediately.
func (v *ValidationBarrier) SignalDependants(job interface{}) {
func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) {
v.Lock()
defer v.Unlock()
@ -223,18 +250,26 @@ func (v *ValidationBarrier) SignalDependants(job interface{}) {
// If we've just finished executing a ChannelAnnouncement, then we'll
// close out the signal, and remove the signal from the map of active
// ones. This will allow any dependent jobs to continue execution.
// ones. This will allow/deny any dependent jobs to continue execution.
case *channeldb.ChannelEdgeInfo:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
finSignal, ok := v.chanAnnFinSignal[shortID]
finSignals, ok := v.chanAnnFinSignal[shortID]
if ok {
close(finSignal)
if allow {
close(finSignals.allow)
} else {
close(finSignals.deny)
}
delete(v.chanAnnFinSignal, shortID)
}
case *lnwire.ChannelAnnouncement:
finSignal, ok := v.chanAnnFinSignal[msg.ShortChannelID]
finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID]
if ok {
close(finSignal)
if allow {
close(finSignals.allow)
} else {
close(finSignals.deny)
}
delete(v.chanAnnFinSignal, msg.ShortChannelID)
}

18
routing/validation_barrier_test.go

@ -12,6 +12,8 @@ import (
// TestValidationBarrierSemaphore checks basic properties of the validation
// barrier's semaphore wrt. enqueuing/dequeuing.
func TestValidationBarrierSemaphore(t *testing.T) {
t.Parallel()
const (
numTasks = 8
numPendingTasks = 8
@ -59,6 +61,8 @@ func TestValidationBarrierSemaphore(t *testing.T) {
// TestValidationBarrierQuit checks that pending validation tasks will return an
// error from WaitForDependants if the barrier's quit signal is canceled.
func TestValidationBarrierQuit(t *testing.T) {
t.Parallel()
const (
numTasks = 8
timeout = 50 * time.Millisecond
@ -113,9 +117,14 @@ func TestValidationBarrierQuit(t *testing.T) {
// with the correct error.
for i := 0; i < numTasks; i++ {
switch {
// First half, signal completion and task semaphore
// Signal completion for the first half of tasks, but only allow
// dependents to be processed as well for the second quarter.
case i < numTasks/4:
barrier.SignalDependants(anns[i], false)
barrier.CompleteJob()
case i < numTasks/2:
barrier.SignalDependants(anns[i])
barrier.SignalDependants(anns[i], true)
barrier.CompleteJob()
// At midpoint, quit the validation barrier.
@ -132,7 +141,10 @@ func TestValidationBarrierQuit(t *testing.T) {
switch {
// First half should return without failure.
case i < numTasks/2 && err != nil:
case i < numTasks/4 && err != routing.ErrParentValidationFailed:
t.Fatalf("unexpected failure while waiting: %v", err)
case i >= numTasks/4 && i < numTasks/2 && err != nil:
t.Fatalf("unexpected failure while waiting: %v", err)
// Last half should return the shutdown error.

Loading…
Cancel
Save