From 393111cea903de65302ceadbf7737eb7f438d778 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 22 Mar 2021 15:32:24 -0700 Subject: [PATCH] discovery+routing: cancel dependent jobs if parent validation fails Previously, we would always allow dependent jobs to be processed, regardless of the result of its parent job's validation. This isn't correct, as a parent job contains actions necessary to successfully process a dependent job. A prime example of this can be found within the AuthenticatedGossiper, where an incoming channel announcement and update are both processed, but if the channel announcement job fails to complete, then the gossiper is unable to properly validate the update. This commit aims to address this by preventing the dependent jobs to run. --- discovery/gossiper.go | 98 ++++++++++++++------------- routing/router.go | 9 ++- routing/validation_barrier.go | 103 +++++++++++++++++++---------- routing/validation_barrier_test.go | 18 ++++- 4 files changed, 142 insertions(+), 86 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 750f4e24..2c4c07b0 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -990,7 +990,7 @@ func (d *AuthenticatedGossiper) networkHandler() { // Channel announcement signatures are amongst the only // messages that we'll process serially. case *lnwire.AnnounceSignatures: - emittedAnnouncements := d.processNetworkAnnouncement( + emittedAnnouncements, _ := d.processNetworkAnnouncement( announcement, ) if emittedAnnouncements != nil { @@ -1040,14 +1040,14 @@ func (d *AuthenticatedGossiper) networkHandler() { // determine if this is either a new // announcement from our PoV or an edges to a // prior vertex/edge we previously proceeded. - emittedAnnouncements := d.processNetworkAnnouncement( + emittedAnnouncements, allowDependents := d.processNetworkAnnouncement( announcement, ) // If this message had any dependencies, then // we can now signal them to continue. validationBarrier.SignalDependants( - announcement.msg, + announcement.msg, allowDependents, ) // If the announcement was accepted, then add @@ -1514,9 +1514,11 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement, // channel or node announcement or announcements proofs. If the announcement // didn't affect the internal state due to either being out of date, invalid, // or redundant, then nil is returned. Otherwise, the set of announcements will -// be returned which should be broadcasted to the rest of the network. +// be returned which should be broadcasted to the rest of the network. The +// boolean returned indicates whether any dependents of the announcement should +// attempt to be processed as well. func (d *AuthenticatedGossiper) processNetworkAnnouncement( - nMsg *networkMsg) []networkMsg { + nMsg *networkMsg) ([]networkMsg, bool) { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { // TODO(roasbeef) make height delta 6 @@ -1546,7 +1548,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // signatures if not required. if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) { nMsg.err <- nil - return nil + return nil, true } if err := d.addNode(msg, schedulerOp...); err != nil { @@ -1559,7 +1561,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- err - return nil + return nil, false } // In order to ensure we don't leak unadvertised nodes, we'll @@ -1570,7 +1572,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Errorf("Unable to determine if node %x is "+ "advertised: %v", msg.NodeID, err) nMsg.err <- err - return nil + return nil, false } // If it does, we'll add their announcement to our batch so that @@ -1588,7 +1590,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( nMsg.err <- nil // TODO(roasbeef): get rid of the above - return announcements + return announcements, true // A new channel announcement has arrived, this indicates the // *creation* of a new channel within the network. This only advertises @@ -1608,7 +1610,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.rejectMtx.Unlock() nMsg.err <- err - return nil + return nil, false } // If the advertised inclusionary block is beyond our knowledge @@ -1623,7 +1625,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.bestHeight) d.Unlock() nMsg.err <- nil - return nil + return nil, false } d.Unlock() @@ -1632,7 +1634,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // below. if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { nMsg.err <- nil - return nil + return nil, true } // If this is a remote channel announcement, then we'll validate @@ -1649,7 +1651,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Error(err) nMsg.err <- err - return nil + return nil, false } // If the proof checks out, then we'll save the proof @@ -1669,7 +1671,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( if err := msg.Features.Encode(&featureBuf); err != nil { log.Errorf("unable to encode features: %v", err) nMsg.err <- err - return nil + return nil, false } edge := &channeldb.ChannelEdgeInfo{ @@ -1720,7 +1722,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} d.rejectMtx.Unlock() nMsg.err <- rErr - return nil + return nil, false } // If while processing this rejected edge, we @@ -1729,7 +1731,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // directly. if len(anns) != 0 { nMsg.err <- nil - return anns + return anns, true } // Otherwise, this is just a regular rejected @@ -1742,7 +1744,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- err - return nil + return nil, false } // If we earlier received any ChannelUpdates for this channel, @@ -1806,7 +1808,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- nil - return announcements + return announcements, true // A new authenticated channel edge update has arrived. This indicates // that the directional information for an already known channel has @@ -1825,7 +1827,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.rejectMtx.Unlock() nMsg.err <- err - return nil + return nil, false } blockHeight := msg.ShortChannelID.BlockHeight @@ -1842,7 +1844,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, blockHeight, d.bestHeight) d.Unlock() - return nil + nMsg.err <- nil + return nil, false } d.Unlock() @@ -1854,7 +1857,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.ShortChannelID, timestamp, msg.ChannelFlags, ) { nMsg.err <- nil - return nil + return nil, true } // Get the node pub key as far as we don't have it in channel @@ -1893,7 +1896,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( "update signature: %v", err) log.Error(err) nMsg.err <- err - return nil + return nil, false } // With the signature valid, we'll proceed to mark the @@ -1906,7 +1909,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.ShortChannelID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } log.Debugf("Removed edge with chan_id=%v from zombie "+ @@ -1949,7 +1952,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // NOTE: We don't return anything on the error channel // for this message, as we expect that will be done when // this ChannelUpdate is later reprocessed. - return nil + return nil, false default: err := fmt.Errorf("unable to validate channel update "+ @@ -1960,7 +1963,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.rejectMtx.Lock() d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} d.rejectMtx.Unlock() - return nil + return nil, false } // The least-significant bit in the flag on the channel update @@ -1997,7 +2000,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.cfg.RebroadcastInterval, shortChanID) nMsg.err <- nil - return nil + return nil, false } } else { // If it's not, we'll allow an update per minute @@ -2024,7 +2027,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, pubKey.SerializeCompressed()) nMsg.err <- nil - return nil + return nil, false } } } @@ -2040,7 +2043,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Error(rErr) nMsg.err <- rErr - return nil + return nil, false } update := &channeldb.ChannelEdgePolicy{ @@ -2069,7 +2072,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- err - return nil + return nil, false } // If this is a local ChannelUpdate without an AuthProof, it @@ -2094,7 +2097,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.MsgType(), msg.ShortChannelID, remotePubKey, err) nMsg.err <- err - return nil + return nil, false } } @@ -2111,7 +2114,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- nil - return announcements + return announcements, true // A new signature announcement has been received. This indicates // willingness of nodes involved in the funding of a channel to @@ -2140,7 +2143,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.bestHeight, needBlockHeight) d.Unlock() nMsg.err <- nil - return nil + return nil, false } d.Unlock() @@ -2166,14 +2169,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } log.Infof("Orphan %v proof announcement with "+ "short_chan_id=%v, adding "+ "to waiting batch", prefix, shortChanID) nMsg.err <- nil - return nil + return nil, false } nodeID := nMsg.source.SerializeCompressed() @@ -2188,7 +2191,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( "short_chan_id=%v", shortChanID) log.Error(err) nMsg.err <- err - return nil + return nil, false } // If proof was sent by a local sub-system, then we'll @@ -2212,7 +2215,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.MsgType(), msg.ShortChannelID, remotePubKey, err) nMsg.err <- err - return nil + return nil, false } } @@ -2265,7 +2268,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Debugf("Already have proof for channel "+ "with chanID=%v", msg.ChannelID) nMsg.err <- nil - return nil + return nil, true } // Check that we received the opposite proof. If so, then we're @@ -2283,7 +2286,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } if err == channeldb.ErrWaitingProofNotFound { @@ -2294,7 +2297,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } log.Infof("1/2 of channel ann proof received for "+ @@ -2302,7 +2305,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID) nMsg.err <- nil - return nil + return nil, false } // We now have both halves of the channel announcement proof, @@ -2326,7 +2329,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( if err != nil { log.Error(err) nMsg.err <- err - return nil + return nil, false } // With all the necessary components assembled validate the @@ -2338,7 +2341,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Error(err) nMsg.err <- err - return nil + return nil, false } // If the channel was returned by the router it means that @@ -2354,7 +2357,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( "channel chanID=%v: %v", msg.ChannelID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey()) @@ -2364,7 +2367,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.ChannelID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } // Proof was successfully created and now can announce the @@ -2431,11 +2434,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- nil - return announcements + return announcements, true default: - nMsg.err <- errors.New("wrong type of the announcement") - return nil + err := errors.New("wrong type of the announcement") + nMsg.err <- err + return nil, false } } diff --git a/routing/router.go b/routing/router.go index 0cb0ced8..2100ffb6 100644 --- a/routing/router.go +++ b/routing/router.go @@ -967,7 +967,8 @@ func (r *ChannelRouter) networkHandler() { update.msg, ) if err != nil { - if err != ErrVBarrierShuttingDown { + if err != ErrVBarrierShuttingDown && + err != ErrParentValidationFailed { log.Warnf("unexpected error "+ "during validation "+ "barrier shutdown: %v", @@ -985,7 +986,11 @@ func (r *ChannelRouter) networkHandler() { // If this message had any dependencies, then // we can now signal them to continue. - validationBarrier.SignalDependants(update.msg) + allowDependents := err == nil || + IsError(err, ErrIgnored, ErrOutdated) + validationBarrier.SignalDependants( + update.msg, allowDependents, + ) if err != nil { return } diff --git a/routing/validation_barrier.go b/routing/validation_barrier.go index 58500abb..c8c5e7b3 100644 --- a/routing/validation_barrier.go +++ b/routing/validation_barrier.go @@ -9,10 +9,28 @@ import ( "github.com/lightningnetwork/lnd/routing/route" ) -// ErrVBarrierShuttingDown signals that the barrier has been requested to -// shutdown, and that the caller should not treat the wait condition as -// fulfilled. -var ErrVBarrierShuttingDown = errors.New("validation barrier shutting down") +var ( + // ErrVBarrierShuttingDown signals that the barrier has been requested + // to shutdown, and that the caller should not treat the wait condition + // as fulfilled. + ErrVBarrierShuttingDown = errors.New("validation barrier shutting down") + + // ErrParentValidationFailed signals that the validation of a + // dependent's parent failed, so the dependent must not be processed. + ErrParentValidationFailed = errors.New("parent validation failed") +) + +// validationSignals contains two signals which allows the ValidationBarrier to +// communicate back to the caller whether a dependent should be processed or not +// based on whether its parent was successfully validated. Only one of these +// signals is to be used at a time. +type validationSignals struct { + // allow is the signal used to allow a dependent to be processed. + allow chan struct{} + + // deny is the signal used to prevent a dependent from being processed. + deny chan struct{} +} // ValidationBarrier is a barrier used to ensure proper validation order while // concurrently validating new announcements for channel edges, and the @@ -31,19 +49,19 @@ type ValidationBarrier struct { // ChannelAnnouncement like validation job going on. Once the job has // been completed, the channel will be closed unblocking any // dependants. - chanAnnFinSignal map[lnwire.ShortChannelID]chan struct{} + chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals // chanEdgeDependencies tracks any channel edge updates which should // wait until the completion of the ChannelAnnouncement before // proceeding. This is a dependency, as we can't validate the update // before we validate the announcement which creates the channel // itself. - chanEdgeDependencies map[lnwire.ShortChannelID]chan struct{} + chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals // nodeAnnDependencies tracks any pending NodeAnnouncement validation // jobs which should wait until the completion of the // ChannelAnnouncement before proceeding. - nodeAnnDependencies map[route.Vertex]chan struct{} + nodeAnnDependencies map[route.Vertex]*validationSignals quit chan struct{} sync.Mutex @@ -56,9 +74,9 @@ func NewValidationBarrier(numActiveReqs int, quitChan chan struct{}) *ValidationBarrier { v := &ValidationBarrier{ - chanAnnFinSignal: make(map[lnwire.ShortChannelID]chan struct{}), - chanEdgeDependencies: make(map[lnwire.ShortChannelID]chan struct{}), - nodeAnnDependencies: make(map[route.Vertex]chan struct{}), + chanAnnFinSignal: make(map[lnwire.ShortChannelID]*validationSignals), + chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals), + nodeAnnDependencies: make(map[route.Vertex]*validationSignals), quit: quitChan, } @@ -107,24 +125,31 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { // validate this announcement. All dependants will // point to this same channel, so they'll be unblocked // at the same time. - annFinCond := make(chan struct{}) - v.chanAnnFinSignal[msg.ShortChannelID] = annFinCond - v.chanEdgeDependencies[msg.ShortChannelID] = annFinCond + signals := &validationSignals{ + allow: make(chan struct{}), + deny: make(chan struct{}), + } - v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = annFinCond - v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = annFinCond + v.chanAnnFinSignal[msg.ShortChannelID] = signals + v.chanEdgeDependencies[msg.ShortChannelID] = signals + + v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals + v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals } case *channeldb.ChannelEdgeInfo: shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) if _, ok := v.chanAnnFinSignal[shortID]; !ok { - annFinCond := make(chan struct{}) + signals := &validationSignals{ + allow: make(chan struct{}), + deny: make(chan struct{}), + } - v.chanAnnFinSignal[shortID] = annFinCond - v.chanEdgeDependencies[shortID] = annFinCond + v.chanAnnFinSignal[shortID] = signals + v.chanEdgeDependencies[shortID] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = annFinCond - v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = annFinCond + v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals + v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals } // These other types don't have any dependants, so no further @@ -162,8 +187,8 @@ func (v *ValidationBarrier) CompleteJob() { func (v *ValidationBarrier) WaitForDependants(job interface{}) error { var ( - signal chan struct{} - ok bool + signals *validationSignals + ok bool ) v.Lock() @@ -173,15 +198,15 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // completion of any active ChannelAnnouncement jobs related to them. case *channeldb.ChannelEdgePolicy: shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - signal, ok = v.chanEdgeDependencies[shortID] + signals, ok = v.chanEdgeDependencies[shortID] case *channeldb.LightningNode: vertex := route.Vertex(msg.PubKeyBytes) - signal, ok = v.nodeAnnDependencies[vertex] + signals, ok = v.nodeAnnDependencies[vertex] case *lnwire.ChannelUpdate: - signal, ok = v.chanEdgeDependencies[msg.ShortChannelID] + signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] case *lnwire.NodeAnnouncement: vertex := route.Vertex(msg.NodeID) - signal, ok = v.nodeAnnDependencies[vertex] + signals, ok = v.nodeAnnDependencies[vertex] // Other types of jobs can be executed immediately, so we'll just // return directly. @@ -204,7 +229,9 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { select { case <-v.quit: return ErrVBarrierShuttingDown - case <-signal: + case <-signals.deny: + return ErrParentValidationFailed + case <-signals.allow: return nil } } @@ -212,10 +239,10 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { return nil } -// SignalDependants will signal any jobs that are dependent on this job that +// SignalDependants will allow/deny any jobs that are dependent on this job that // they can continue execution. If the job doesn't have any dependants, then // this function sill exit immediately. -func (v *ValidationBarrier) SignalDependants(job interface{}) { +func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { v.Lock() defer v.Unlock() @@ -223,18 +250,26 @@ func (v *ValidationBarrier) SignalDependants(job interface{}) { // If we've just finished executing a ChannelAnnouncement, then we'll // close out the signal, and remove the signal from the map of active - // ones. This will allow any dependent jobs to continue execution. + // ones. This will allow/deny any dependent jobs to continue execution. case *channeldb.ChannelEdgeInfo: shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - finSignal, ok := v.chanAnnFinSignal[shortID] + finSignals, ok := v.chanAnnFinSignal[shortID] if ok { - close(finSignal) + if allow { + close(finSignals.allow) + } else { + close(finSignals.deny) + } delete(v.chanAnnFinSignal, shortID) } case *lnwire.ChannelAnnouncement: - finSignal, ok := v.chanAnnFinSignal[msg.ShortChannelID] + finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID] if ok { - close(finSignal) + if allow { + close(finSignals.allow) + } else { + close(finSignals.deny) + } delete(v.chanAnnFinSignal, msg.ShortChannelID) } diff --git a/routing/validation_barrier_test.go b/routing/validation_barrier_test.go index 6fd3930b..248c1dc2 100644 --- a/routing/validation_barrier_test.go +++ b/routing/validation_barrier_test.go @@ -12,6 +12,8 @@ import ( // TestValidationBarrierSemaphore checks basic properties of the validation // barrier's semaphore wrt. enqueuing/dequeuing. func TestValidationBarrierSemaphore(t *testing.T) { + t.Parallel() + const ( numTasks = 8 numPendingTasks = 8 @@ -59,6 +61,8 @@ func TestValidationBarrierSemaphore(t *testing.T) { // TestValidationBarrierQuit checks that pending validation tasks will return an // error from WaitForDependants if the barrier's quit signal is canceled. func TestValidationBarrierQuit(t *testing.T) { + t.Parallel() + const ( numTasks = 8 timeout = 50 * time.Millisecond @@ -113,9 +117,14 @@ func TestValidationBarrierQuit(t *testing.T) { // with the correct error. for i := 0; i < numTasks; i++ { switch { - // First half, signal completion and task semaphore + // Signal completion for the first half of tasks, but only allow + // dependents to be processed as well for the second quarter. + case i < numTasks/4: + barrier.SignalDependants(anns[i], false) + barrier.CompleteJob() + case i < numTasks/2: - barrier.SignalDependants(anns[i]) + barrier.SignalDependants(anns[i], true) barrier.CompleteJob() // At midpoint, quit the validation barrier. @@ -132,7 +141,10 @@ func TestValidationBarrierQuit(t *testing.T) { switch { // First half should return without failure. - case i < numTasks/2 && err != nil: + case i < numTasks/4 && err != routing.ErrParentValidationFailed: + t.Fatalf("unexpected failure while waiting: %v", err) + + case i >= numTasks/4 && i < numTasks/2 && err != nil: t.Fatalf("unexpected failure while waiting: %v", err) // Last half should return the shutdown error.