diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 750f4e24..2c4c07b0 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -990,7 +990,7 @@ func (d *AuthenticatedGossiper) networkHandler() { // Channel announcement signatures are amongst the only // messages that we'll process serially. case *lnwire.AnnounceSignatures: - emittedAnnouncements := d.processNetworkAnnouncement( + emittedAnnouncements, _ := d.processNetworkAnnouncement( announcement, ) if emittedAnnouncements != nil { @@ -1040,14 +1040,14 @@ func (d *AuthenticatedGossiper) networkHandler() { // determine if this is either a new // announcement from our PoV or an edges to a // prior vertex/edge we previously proceeded. - emittedAnnouncements := d.processNetworkAnnouncement( + emittedAnnouncements, allowDependents := d.processNetworkAnnouncement( announcement, ) // If this message had any dependencies, then // we can now signal them to continue. validationBarrier.SignalDependants( - announcement.msg, + announcement.msg, allowDependents, ) // If the announcement was accepted, then add @@ -1514,9 +1514,11 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement, // channel or node announcement or announcements proofs. If the announcement // didn't affect the internal state due to either being out of date, invalid, // or redundant, then nil is returned. Otherwise, the set of announcements will -// be returned which should be broadcasted to the rest of the network. +// be returned which should be broadcasted to the rest of the network. The +// boolean returned indicates whether any dependents of the announcement should +// attempt to be processed as well. 
func (d *AuthenticatedGossiper) processNetworkAnnouncement( - nMsg *networkMsg) []networkMsg { + nMsg *networkMsg) ([]networkMsg, bool) { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { // TODO(roasbeef) make height delta 6 @@ -1546,7 +1548,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // signatures if not required. if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) { nMsg.err <- nil - return nil + return nil, true } if err := d.addNode(msg, schedulerOp...); err != nil { @@ -1559,7 +1561,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- err - return nil + return nil, false } // In order to ensure we don't leak unadvertised nodes, we'll @@ -1570,7 +1572,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Errorf("Unable to determine if node %x is "+ "advertised: %v", msg.NodeID, err) nMsg.err <- err - return nil + return nil, false } // If it does, we'll add their announcement to our batch so that @@ -1588,7 +1590,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( nMsg.err <- nil // TODO(roasbeef): get rid of the above - return announcements + return announcements, true // A new channel announcement has arrived, this indicates the // *creation* of a new channel within the network. This only advertises @@ -1608,7 +1610,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.rejectMtx.Unlock() nMsg.err <- err - return nil + return nil, false } // If the advertised inclusionary block is beyond our knowledge @@ -1623,7 +1625,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.bestHeight) d.Unlock() nMsg.err <- nil - return nil + return nil, false } d.Unlock() @@ -1632,7 +1634,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // below. 
if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { nMsg.err <- nil - return nil + return nil, true } // If this is a remote channel announcement, then we'll validate @@ -1649,7 +1651,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Error(err) nMsg.err <- err - return nil + return nil, false } // If the proof checks out, then we'll save the proof @@ -1669,7 +1671,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( if err := msg.Features.Encode(&featureBuf); err != nil { log.Errorf("unable to encode features: %v", err) nMsg.err <- err - return nil + return nil, false } edge := &channeldb.ChannelEdgeInfo{ @@ -1720,7 +1722,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} d.rejectMtx.Unlock() nMsg.err <- rErr - return nil + return nil, false } // If while processing this rejected edge, we @@ -1729,7 +1731,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // directly. if len(anns) != 0 { nMsg.err <- nil - return anns + return anns, true } // Otherwise, this is just a regular rejected @@ -1742,7 +1744,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- err - return nil + return nil, false } // If we earlier received any ChannelUpdates for this channel, @@ -1806,7 +1808,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- nil - return announcements + return announcements, true // A new authenticated channel edge update has arrived. 
This indicates // that the directional information for an already known channel has @@ -1825,7 +1827,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.rejectMtx.Unlock() nMsg.err <- err - return nil + return nil, false } blockHeight := msg.ShortChannelID.BlockHeight @@ -1842,7 +1844,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, blockHeight, d.bestHeight) d.Unlock() - return nil + nMsg.err <- nil + return nil, false } d.Unlock() @@ -1854,7 +1857,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.ShortChannelID, timestamp, msg.ChannelFlags, ) { nMsg.err <- nil - return nil + return nil, true } // Get the node pub key as far as we don't have it in channel @@ -1893,7 +1896,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( "update signature: %v", err) log.Error(err) nMsg.err <- err - return nil + return nil, false } // With the signature valid, we'll proceed to mark the @@ -1906,7 +1909,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.ShortChannelID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } log.Debugf("Removed edge with chan_id=%v from zombie "+ @@ -1949,7 +1952,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // NOTE: We don't return anything on the error channel // for this message, as we expect that will be done when // this ChannelUpdate is later reprocessed. 
- return nil + return nil, false default: err := fmt.Errorf("unable to validate channel update "+ @@ -1960,7 +1963,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.rejectMtx.Lock() d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} d.rejectMtx.Unlock() - return nil + return nil, false } // The least-significant bit in the flag on the channel update @@ -1997,7 +2000,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.cfg.RebroadcastInterval, shortChanID) nMsg.err <- nil - return nil + return nil, false } } else { // If it's not, we'll allow an update per minute @@ -2024,7 +2027,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, pubKey.SerializeCompressed()) nMsg.err <- nil - return nil + return nil, false } } } @@ -2040,7 +2043,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Error(rErr) nMsg.err <- rErr - return nil + return nil, false } update := &channeldb.ChannelEdgePolicy{ @@ -2069,7 +2072,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- err - return nil + return nil, false } // If this is a local ChannelUpdate without an AuthProof, it @@ -2094,7 +2097,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.MsgType(), msg.ShortChannelID, remotePubKey, err) nMsg.err <- err - return nil + return nil, false } } @@ -2111,7 +2114,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- nil - return announcements + return announcements, true // A new signature announcement has been received. 
This indicates // willingness of nodes involved in the funding of a channel to @@ -2140,7 +2143,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.bestHeight, needBlockHeight) d.Unlock() nMsg.err <- nil - return nil + return nil, false } d.Unlock() @@ -2166,14 +2169,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } log.Infof("Orphan %v proof announcement with "+ "short_chan_id=%v, adding "+ "to waiting batch", prefix, shortChanID) nMsg.err <- nil - return nil + return nil, false } nodeID := nMsg.source.SerializeCompressed() @@ -2188,7 +2191,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( "short_chan_id=%v", shortChanID) log.Error(err) nMsg.err <- err - return nil + return nil, false } // If proof was sent by a local sub-system, then we'll @@ -2212,7 +2215,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.MsgType(), msg.ShortChannelID, remotePubKey, err) nMsg.err <- err - return nil + return nil, false } } @@ -2265,7 +2268,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Debugf("Already have proof for channel "+ "with chanID=%v", msg.ChannelID) nMsg.err <- nil - return nil + return nil, true } // Check that we received the opposite proof. 
If so, then we're @@ -2283,7 +2286,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } if err == channeldb.ErrWaitingProofNotFound { @@ -2294,7 +2297,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } log.Infof("1/2 of channel ann proof received for "+ @@ -2302,7 +2305,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID) nMsg.err <- nil - return nil + return nil, false } // We now have both halves of the channel announcement proof, @@ -2326,7 +2329,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( if err != nil { log.Error(err) nMsg.err <- err - return nil + return nil, false } // With all the necessary components assembled validate the @@ -2338,7 +2341,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Error(err) nMsg.err <- err - return nil + return nil, false } // If the channel was returned by the router it means that @@ -2354,7 +2357,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( "channel chanID=%v: %v", msg.ChannelID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey()) @@ -2364,7 +2367,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( msg.ChannelID, err) log.Error(err) nMsg.err <- err - return nil + return nil, false } // Proof was successfully created and now can announce the @@ -2431,11 +2434,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } nMsg.err <- nil - return announcements + return announcements, true default: - nMsg.err <- errors.New("wrong type of the announcement") - return nil + err := errors.New("wrong type of the announcement") + nMsg.err <- err + return nil, false } } diff --git a/routing/router.go b/routing/router.go index 0cb0ced8..2100ffb6 100644 --- 
a/routing/router.go +++ b/routing/router.go @@ -967,7 +967,8 @@ func (r *ChannelRouter) networkHandler() { update.msg, ) if err != nil { - if err != ErrVBarrierShuttingDown { + if err != ErrVBarrierShuttingDown && + err != ErrParentValidationFailed { log.Warnf("unexpected error "+ "during validation "+ "barrier shutdown: %v", @@ -985,7 +986,11 @@ func (r *ChannelRouter) networkHandler() { // If this message had any dependencies, then // we can now signal them to continue. - validationBarrier.SignalDependants(update.msg) + allowDependents := err == nil || + IsError(err, ErrIgnored, ErrOutdated) + validationBarrier.SignalDependants( + update.msg, allowDependents, + ) if err != nil { return } diff --git a/routing/validation_barrier.go b/routing/validation_barrier.go index 58500abb..c8c5e7b3 100644 --- a/routing/validation_barrier.go +++ b/routing/validation_barrier.go @@ -9,10 +9,28 @@ import ( "github.com/lightningnetwork/lnd/routing/route" ) -// ErrVBarrierShuttingDown signals that the barrier has been requested to -// shutdown, and that the caller should not treat the wait condition as -// fulfilled. -var ErrVBarrierShuttingDown = errors.New("validation barrier shutting down") +var ( + // ErrVBarrierShuttingDown signals that the barrier has been requested + // to shutdown, and that the caller should not treat the wait condition + // as fulfilled. + ErrVBarrierShuttingDown = errors.New("validation barrier shutting down") + + // ErrParentValidationFailed signals that the validation of a + // dependent's parent failed, so the dependent must not be processed. + ErrParentValidationFailed = errors.New("parent validation failed") +) + +// validationSignals contains two signals which allow the ValidationBarrier to +// communicate back to the caller whether a dependent should be processed or not +// based on whether its parent was successfully validated. Only one of these +// signals is to be used at a time. 
+type validationSignals struct { + // allow is the signal used to allow a dependent to be processed. + allow chan struct{} + + // deny is the signal used to prevent a dependent from being processed. + deny chan struct{} +} // ValidationBarrier is a barrier used to ensure proper validation order while // concurrently validating new announcements for channel edges, and the @@ -31,19 +49,19 @@ type ValidationBarrier struct { // ChannelAnnouncement like validation job going on. Once the job has // been completed, the channel will be closed unblocking any // dependants. - chanAnnFinSignal map[lnwire.ShortChannelID]chan struct{} + chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals // chanEdgeDependencies tracks any channel edge updates which should // wait until the completion of the ChannelAnnouncement before // proceeding. This is a dependency, as we can't validate the update // before we validate the announcement which creates the channel // itself. - chanEdgeDependencies map[lnwire.ShortChannelID]chan struct{} + chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals // nodeAnnDependencies tracks any pending NodeAnnouncement validation // jobs which should wait until the completion of the // ChannelAnnouncement before proceeding. 
- nodeAnnDependencies map[route.Vertex]chan struct{} + nodeAnnDependencies map[route.Vertex]*validationSignals quit chan struct{} sync.Mutex @@ -56,9 +74,9 @@ func NewValidationBarrier(numActiveReqs int, quitChan chan struct{}) *ValidationBarrier { v := &ValidationBarrier{ - chanAnnFinSignal: make(map[lnwire.ShortChannelID]chan struct{}), - chanEdgeDependencies: make(map[lnwire.ShortChannelID]chan struct{}), - nodeAnnDependencies: make(map[route.Vertex]chan struct{}), + chanAnnFinSignal: make(map[lnwire.ShortChannelID]*validationSignals), + chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals), + nodeAnnDependencies: make(map[route.Vertex]*validationSignals), quit: quitChan, } @@ -107,24 +125,31 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { // validate this announcement. All dependants will // point to this same channel, so they'll be unblocked // at the same time. - annFinCond := make(chan struct{}) - v.chanAnnFinSignal[msg.ShortChannelID] = annFinCond - v.chanEdgeDependencies[msg.ShortChannelID] = annFinCond + signals := &validationSignals{ + allow: make(chan struct{}), + deny: make(chan struct{}), + } - v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = annFinCond - v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = annFinCond + v.chanAnnFinSignal[msg.ShortChannelID] = signals + v.chanEdgeDependencies[msg.ShortChannelID] = signals + + v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals + v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals } case *channeldb.ChannelEdgeInfo: shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) if _, ok := v.chanAnnFinSignal[shortID]; !ok { - annFinCond := make(chan struct{}) + signals := &validationSignals{ + allow: make(chan struct{}), + deny: make(chan struct{}), + } - v.chanAnnFinSignal[shortID] = annFinCond - v.chanEdgeDependencies[shortID] = annFinCond + v.chanAnnFinSignal[shortID] = signals + v.chanEdgeDependencies[shortID] = signals - 
v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = annFinCond - v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = annFinCond + v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals + v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals } // These other types don't have any dependants, so no further @@ -162,8 +187,8 @@ func (v *ValidationBarrier) CompleteJob() { func (v *ValidationBarrier) WaitForDependants(job interface{}) error { var ( - signal chan struct{} - ok bool + signals *validationSignals + ok bool ) v.Lock() @@ -173,15 +198,15 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // completion of any active ChannelAnnouncement jobs related to them. case *channeldb.ChannelEdgePolicy: shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - signal, ok = v.chanEdgeDependencies[shortID] + signals, ok = v.chanEdgeDependencies[shortID] case *channeldb.LightningNode: vertex := route.Vertex(msg.PubKeyBytes) - signal, ok = v.nodeAnnDependencies[vertex] + signals, ok = v.nodeAnnDependencies[vertex] case *lnwire.ChannelUpdate: - signal, ok = v.chanEdgeDependencies[msg.ShortChannelID] + signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] case *lnwire.NodeAnnouncement: vertex := route.Vertex(msg.NodeID) - signal, ok = v.nodeAnnDependencies[vertex] + signals, ok = v.nodeAnnDependencies[vertex] // Other types of jobs can be executed immediately, so we'll just // return directly. 
@@ -204,7 +229,9 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { select { case <-v.quit: return ErrVBarrierShuttingDown - case <-signal: + case <-signals.deny: + return ErrParentValidationFailed + case <-signals.allow: return nil } } @@ -212,10 +239,10 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { return nil } -// SignalDependants will signal any jobs that are dependent on this job that +// SignalDependants will allow/deny any jobs that are dependent on this job that // they can continue execution. If the job doesn't have any dependants, then -// this function sill exit immediately. +// this function will exit immediately. -func (v *ValidationBarrier) SignalDependants(job interface{}) { +func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { v.Lock() defer v.Unlock() @@ -223,18 +250,26 @@ func (v *ValidationBarrier) SignalDependants(job interface{}) { // If we've just finished executing a ChannelAnnouncement, then we'll // close out the signal, and remove the signal from the map of active - // ones. This will allow any dependent jobs to continue execution. + // ones. This will allow/deny any dependent jobs to continue execution. 
case *channeldb.ChannelEdgeInfo: shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - finSignal, ok := v.chanAnnFinSignal[shortID] + finSignals, ok := v.chanAnnFinSignal[shortID] if ok { - close(finSignal) + if allow { + close(finSignals.allow) + } else { + close(finSignals.deny) + } delete(v.chanAnnFinSignal, shortID) } case *lnwire.ChannelAnnouncement: - finSignal, ok := v.chanAnnFinSignal[msg.ShortChannelID] + finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID] if ok { - close(finSignal) + if allow { + close(finSignals.allow) + } else { + close(finSignals.deny) + } delete(v.chanAnnFinSignal, msg.ShortChannelID) } diff --git a/routing/validation_barrier_test.go b/routing/validation_barrier_test.go index 6fd3930b..248c1dc2 100644 --- a/routing/validation_barrier_test.go +++ b/routing/validation_barrier_test.go @@ -12,6 +12,8 @@ import ( // TestValidationBarrierSemaphore checks basic properties of the validation // barrier's semaphore wrt. enqueuing/dequeuing. func TestValidationBarrierSemaphore(t *testing.T) { + t.Parallel() + const ( numTasks = 8 numPendingTasks = 8 @@ -59,6 +61,8 @@ func TestValidationBarrierSemaphore(t *testing.T) { // TestValidationBarrierQuit checks that pending validation tasks will return an // error from WaitForDependants if the barrier's quit signal is canceled. func TestValidationBarrierQuit(t *testing.T) { + t.Parallel() + const ( numTasks = 8 timeout = 50 * time.Millisecond @@ -113,9 +117,14 @@ func TestValidationBarrierQuit(t *testing.T) { // with the correct error. for i := 0; i < numTasks; i++ { switch { - // First half, signal completion and task semaphore + // Signal completion for the first half of tasks, but only allow + // dependents to be processed as well for the second quarter. 
+ case i < numTasks/4: + barrier.SignalDependants(anns[i], false) + barrier.CompleteJob() + case i < numTasks/2: - barrier.SignalDependants(anns[i]) + barrier.SignalDependants(anns[i], true) barrier.CompleteJob() // At midpoint, quit the validation barrier. @@ -132,7 +141,10 @@ func TestValidationBarrierQuit(t *testing.T) { switch { - // First half should return without failure. + // First quarter should be denied, second quarter should not fail. - case i < numTasks/2 && err != nil: + case i < numTasks/4 && err != routing.ErrParentValidationFailed: + t.Fatalf("unexpected failure while waiting: %v", err) + + case i >= numTasks/4 && i < numTasks/2 && err != nil: t.Fatalf("unexpected failure while waiting: %v", err) // Last half should return the shutdown error.