diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 28e09f25..75daea1a 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -31,6 +31,10 @@ var ( // AnnounceSignatures messages, by persisting them until a send // operation has succeeded. messageStoreKey = []byte("message-store") + + // ErrGossiperShuttingDown is an error that is returned if the gossiper + // is in the process of being shut down. + ErrGossiperShuttingDown = errors.New("gossiper is shutting down") ) // networkMsg couples a routing related wire message with the peer that @@ -261,7 +265,9 @@ func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error { // containing all the messages to be sent to the target peer. var announceMessages []lnwire.Message - makeNodeAnn := func(n *channeldb.LightningNode) (*lnwire.NodeAnnouncement, error) { + makeNodeAnn := func(n *channeldb.LightningNode) ( + *lnwire.NodeAnnouncement, error) { + alias, _ := lnwire.NewNodeAlias(n.Alias) wireSig, err := lnwire.NewSigFromRawSignature(n.AuthSigBytes) @@ -476,7 +482,7 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, select { case d.networkMsgs <- nMsg: case <-d.quit: - nMsg.err <- errors.New("gossiper has shut down") + nMsg.err <- ErrGossiperShuttingDown } return nMsg.err @@ -502,7 +508,7 @@ func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message, select { case d.networkMsgs <- nMsg: case <-d.quit: - nMsg.err <- errors.New("gossiper has shut down") + nMsg.err <- ErrGossiperShuttingDown } return nMsg.err @@ -893,7 +899,9 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { // gossip syncer for an inbound message so we can properly dispatch the // incoming message. If a gossip syncer isn't found, then one will be created // for the target peer. -func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) { +func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) ( + *gossipSyncer, error) { + target := routing.NewVertex(pub) // First, we'll try to find an existing gossiper for this peer. @@ -988,7 +996,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // First, we'll now create new fully signed updates for // the affected channels and also update the underlying // graph with the new state. - newChanUpdates, err := d.processChanPolicyUpdate(policyUpdate) + newChanUpdates, err := d.processChanPolicyUpdate( + policyUpdate, + ) if err != nil { log.Errorf("Unable to craft policy updates: %v", err) @@ -1061,7 +1071,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // If this message was recently rejected, then we won't // attempt to re-process it. if d.isRecentlyRejectedMsg(announcement.msg) { - announcement.err <- fmt.Errorf("recently rejected") + announcement.err <- fmt.Errorf("recently " + + "rejected") continue } @@ -1088,26 +1099,31 @@ func (d *AuthenticatedGossiper) networkHandler() { "barrier shutdown: %v", err) } + announcement.err <- err return } - // Process the network announcement to determine if - // this is either a new announcement from our PoV - // or an edges to a prior vertex/edge we previously - // proceeded. + // Process the network announcement to + // determine if this is either a new + // announcement from our PoV or an edges to a + // prior vertex/edge we previously proceeded. emittedAnnouncements := d.processNetworkAnnouncement( announcement, ) // If this message had any dependencies, then // we can now signal them to continue. - validationBarrier.SignalDependants(announcement.msg) + validationBarrier.SignalDependants( + announcement.msg, + ) - // If the announcement was accepted, then add the - // emitted announcements to our announce batch to - // be broadcast once the trickle timer ticks gain. + // If the announcement was accepted, then add + // the emitted announcements to our announce + // batch to be broadcast once the trickle timer + // ticks gain. if emittedAnnouncements != nil { - // TODO(roasbeef): exclude peer that sent + // TODO(roasbeef): exclude peer that + // sent. announcements.AddMsgs( emittedAnnouncements..., ) @@ -1133,16 +1149,19 @@ func (d *AuthenticatedGossiper) networkHandler() { // for this height, if so, then we process them once // more as normal announcements. d.Lock() - numPremature := len(d.prematureAnnouncements[uint32(newBlock.Height)]) + numPremature := len(d.prematureAnnouncements[blockHeight]) d.Unlock() - if numPremature != 0 { - log.Infof("Re-processing %v premature "+ - "announcements for height %v", - numPremature, blockHeight) + + // Return early if no announcement to process. + if numPremature == 0 { + continue } + log.Infof("Re-processing %v premature announcements "+ + "for height %v", numPremature, blockHeight) + d.Lock() - for _, ann := range d.prematureAnnouncements[uint32(newBlock.Height)] { + for _, ann := range d.prematureAnnouncements[blockHeight] { emittedAnnouncements := d.processNetworkAnnouncement(ann) if emittedAnnouncements != nil { announcements.AddMsgs( @@ -1208,9 +1227,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // The retransmission timer has ticked which indicates that we // should check if we need to prune or re-broadcast any of our - // personal channels. This addresses the case of "zombie" channels and - // channel advertisements that have been dropped, or not properly - // propagated through the network. + // personal channels. This addresses the case of "zombie" + // channels and channel advertisements that have been dropped, + // or not properly propagated through the network. case <-retransmitTimer.C: if err := d.retransmitStaleChannels(); err != nil { log.Errorf("unable to rebroadcast stale "+ @@ -1233,7 +1252,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // needed to handle new queries. The recvUpdates bool indicates if we should // continue to receive real-time updates from the remote peer once we've synced // channel state. -func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates bool) { +func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, + recvUpdates bool) { + d.syncerMtx.Lock() defer d.syncerMtx.Unlock() @@ -1485,7 +1506,8 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( // situation in the case where we create a channel, but for some reason fail // to receive the remote peer's proof, while the remote peer is able to fully // assemble the proof and craft the ChannelAnnouncement. -func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAnnouncement, +func (d *AuthenticatedGossiper) processRejectedEdge( + chanAnnMsg *lnwire.ChannelAnnouncement, proof *channeldb.ChannelAuthProof) ([]networkMsg, error) { // First, we'll fetch the state of the channel as we know if from the @@ -1568,7 +1590,9 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn // didn't affect the internal state due to either being out of date, invalid, // or redundant, then nil is returned. Otherwise, the set of announcements will // be returned which should be broadcasted to the rest of the network. -func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []networkMsg { +func (d *AuthenticatedGossiper) processNetworkAnnouncement( + nMsg *networkMsg) []networkMsg { + isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { // TODO(roasbeef) make height delta 6 // * or configurable @@ -1604,7 +1628,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n } } - features := lnwire.NewFeatureVector(msg.Features, lnwire.GlobalFeatures) + features := lnwire.NewFeatureVector( + msg.Features, lnwire.GlobalFeatures, + ) node := &channeldb.LightningNode{ HaveNodeAnnouncement: true, LastUpdate: timestamp, @@ -1649,12 +1675,16 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // We'll ignore any channel announcements that target any chain // other than the set of chains we know of. if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { - log.Errorf("Ignoring ChannelAnnouncement from "+ + err := fmt.Errorf("Ignoring ChannelAnnouncement from "+ "chain=%v, gossiper on chain=%v", msg.ChainHash, d.cfg.ChainHash) + log.Errorf(err.Error()) + d.rejectMtx.Lock() d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} d.rejectMtx.Unlock() + + nMsg.err <- err return nil } @@ -1663,8 +1693,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // to be fully verified once we advance forward in the chain. if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { blockHeight := msg.ShortChannelID.BlockHeight - log.Infof("Announcement for chan_id=(%v), is premature: "+ - "advertises height %v, only height %v is known", + log.Infof("Announcement for chan_id=(%v), is "+ + "premature: advertises height %v, only "+ + "height %v is known", msg.ShortChannelID.ToUint64(), msg.ShortChannelID.BlockHeight, atomic.LoadUint32(&d.bestHeight)) @@ -1805,35 +1836,33 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // ensure we don't block here, as we can handle only one // announcement at a time. for _, cu := range channelUpdates { + d.wg.Add(1) go func(nMsg *networkMsg) { + defer d.wg.Done() + switch msg := nMsg.msg.(type) { + // Reprocess the message, making sure we return + // an error to the original caller in case the + // gossiper shuts down. case *lnwire.ChannelUpdate: - // We can safely wait for the error to - // be returned, as in case of shutdown, - // the gossiper will return an error. - var err error - if nMsg.isRemote { - err = <-d.ProcessRemoteAnnouncement( - msg, nMsg.peer) - } else { - err = <-d.ProcessLocalAnnouncement( - msg, nMsg.source) - } - if err != nil { - log.Errorf("Failed reprocessing"+ - " ChannelUpdate for "+ - "shortChanID=%v: %v", - msg.ShortChannelID.ToUint64(), - err) - return + log.Debugf("Reprocessing"+ + " ChannelUpdate for "+ + "shortChanID=%v", + msg.ShortChannelID.ToUint64()) + + select { + case d.networkMsgs <- nMsg: + case <-d.quit: + nMsg.err <- ErrGossiperShuttingDown } // We don't expect any other message type than // ChannelUpdate to be in this map. default: log.Errorf("Unsupported message type "+ - "found among ChannelUpdates: %T", msg) + "found among ChannelUpdates: "+ + "%T", msg) } }(cu) } @@ -1859,12 +1888,16 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // We'll ignore any channel announcements that target any chain // other than the set of chains we know of. if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { - log.Errorf("Ignoring ChannelUpdate from "+ + err := fmt.Errorf("Ignoring ChannelUpdate from "+ "chain=%v, gossiper on chain=%v", msg.ChainHash, d.cfg.ChainHash) + log.Errorf(err.Error()) + d.rejectMtx.Lock() d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} d.rejectMtx.Unlock() + + nMsg.err <- err return nil } @@ -1947,19 +1980,27 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // If the node supports it, we may try to // request the chan ann from it. + d.wg.Add(1) go func() { + defer d.wg.Done() + reqErr := d.maybeRequestChanAnn( msg.ShortChannelID, ) if reqErr != nil { - log.Errorf("unable to request ann "+ - "for chan_id=%v: %v", shortChanID, + log.Errorf("unable to request "+ + "ann for chan_id=%v: "+ + "%v", shortChanID, reqErr) } }() - nMsg.err <- nil + // NOTE: We don't return anything on the error + // channel for this message, as we expect that + // will be done when this ChannelUpdate is + // later reprocessed. return nil + default: err := errors.Errorf("unable to validate "+ "channel update short_chan_id=%v: %v", @@ -2010,7 +2051,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n } if err := d.cfg.Router.UpdateEdge(update); err != nil { - if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { + if routing.IsError(err, routing.ErrOutdated, + routing.ErrIgnored) { log.Debug(err) } else { d.rejectMtx.Lock() @@ -2075,7 +2117,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // willingness of nodes involved in the funding of a channel to // announce this new channel to the rest of the world. case *lnwire.AnnounceSignatures: - needBlockHeight := msg.ShortChannelID.BlockHeight + d.cfg.ProofMatureDelta + needBlockHeight := msg.ShortChannelID.BlockHeight + + d.cfg.ProofMatureDelta shortChanID := msg.ShortChannelID.ToUint64() prefix := "local" @@ -2114,6 +2157,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // not change before we call AddProof() later. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) + chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( msg.ShortChannelID) if err != nil { @@ -2165,7 +2209,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // Since the remote peer might not be online // we'll call a method that will attempt to // deliver the proof when it comes online. - if err := d.sendAnnSigReliably(msg, remotePeer); err != nil { + err := d.sendAnnSigReliably(msg, remotePeer) + if err != nil { err := errors.Errorf("unable to send reliably "+ "to remote for short_chan_id=%v: %v", shortChanID, err) @@ -2197,13 +2242,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n peerID) chanAnn, _, _, err := CreateChanAnnouncement( - chanInfo.AuthProof, chanInfo, e1, e2, + chanInfo.AuthProof, chanInfo, + e1, e2, ) if err != nil { - log.Errorf("unable to gen ann: %v", err) + log.Errorf("unable to gen "+ + "ann: %v", err) return } - err = nMsg.peer.SendMessage(false, chanAnn) + err = nMsg.peer.SendMessage( + false, chanAnn, + ) if err != nil { log.Errorf("Failed sending "+ "full proof to "+ @@ -2212,7 +2261,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n return } log.Debugf("Full proof sent to peer=%x"+ - " for chanID=%v", peerID, msg.ChannelID) + " for chanID=%v", peerID, + msg.ChannelID) }() } @@ -2271,7 +2321,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes() dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes() } - chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(&dbProof, chanInfo, e1, e2) + chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement( + &dbProof, chanInfo, e1, e2, + ) if err != nil { log.Error(err) nMsg.err <- err @@ -2306,9 +2358,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n return nil } - if err := d.waitingProofs.Remove(proof.OppositeKey()); err != nil { + err = d.waitingProofs.Remove(proof.OppositeKey()) + if err != nil { err := errors.Errorf("unable remove opposite proof "+ - "for the channel with chanID=%v: %v", msg.ChannelID, err) + "for the channel with chanID=%v: %v", + msg.ChannelID, err) log.Error(err) nMsg.err <- err return nil @@ -2421,8 +2475,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably( remotePeer.SerializeCompressed()) case <-d.quit: - log.Infof("Gossiper shutting down, did not send" + - " AnnounceSignatures.") + log.Infof("Gossiper shutting down, did not " + + "send AnnounceSignatures.") return } } @@ -2441,7 +2495,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably( // updateChannel creates a new fully signed update for the channel, and updates // the underlying graph with the new state. func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, - edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, *lnwire.ChannelUpdate, error) { + edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, + *lnwire.ChannelUpdate, error) { var err error @@ -2542,7 +2597,9 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, // maybeRequestChanAnn will attempt to request the full channel announcement // for a particular short chan ID. We do this in the case that we get a channel // update, yet don't already have a channel announcement for it. -func (d *AuthenticatedGossiper) maybeRequestChanAnn(cid lnwire.ShortChannelID) error { +func (d *AuthenticatedGossiper) maybeRequestChanAnn( + cid lnwire.ShortChannelID) error { + d.syncerMtx.Lock() defer d.syncerMtx.Unlock() diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index dc0ad4f7..7d98c558 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2005,10 +2005,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // Recreate the case where the remote node is sending us its ChannelUpdate // before we have been able to process our own ChannelAnnouncement and // ChannelUpdate. - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) - if err != nil { - t.Fatalf("unable to process :%v", err) - } + errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) + select { case <-ctx.broadcastedMessage: t.Fatal("channel update announcement was broadcast") @@ -2069,6 +2067,15 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // At this point the remote ChannelUpdate we received earlier should // be reprocessed, as we now have the necessary edge entry in the graph. + select { + case err := <-errRemoteAnn: + if err != nil { + t.Fatalf("error re-processing remote update: %v", err) + } + case <-time.After(2 * trickleDelay): + t.Fatalf("remote update was not processed") + } + // Check that the ChannelEdgePolicy was added to the graph. chanInfo, e1, e2, err = ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID) if err != nil { diff --git a/fundingmanager.go b/fundingmanager.go index f1163fea..b7f0d00b 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -255,7 +255,7 @@ type fundingConfig struct { // SendAnnouncement is used by the FundingManager to send // announcement messages to the Gossiper to possibly broadcast // to the greater network. - SendAnnouncement func(msg lnwire.Message) error + SendAnnouncement func(msg lnwire.Message) chan error // NotifyWhenOnline allows the FundingManager to register with a // subsystem that will notify it when the peer comes online. This is @@ -2078,22 +2078,38 @@ func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel, // Send ChannelAnnouncement and ChannelUpdate to the gossiper to add // to the Router's topology. - if err = f.cfg.SendAnnouncement(ann.chanAnn); err != nil { - if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { - fndgLog.Debugf("Router rejected ChannelAnnouncement: %v", - err) - } else { - return fmt.Errorf("error sending channel "+ - "announcement: %v", err) + errChan := f.cfg.SendAnnouncement(ann.chanAnn) + select { + case err := <-errChan: + if err != nil { + if routing.IsError(err, routing.ErrOutdated, + routing.ErrIgnored) { + fndgLog.Debugf("Router rejected "+ + "ChannelAnnouncement: %v", err) + } else { + return fmt.Errorf("error sending channel "+ + "announcement: %v", err) + } } + case <-f.quit: + return ErrFundingManagerShuttingDown } - if err = f.cfg.SendAnnouncement(ann.chanUpdateAnn); err != nil { - if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { - fndgLog.Debugf("Router rejected ChannelUpdate: %v", err) - } else { - return fmt.Errorf("error sending channel "+ - "update: %v", err) + + errChan = f.cfg.SendAnnouncement(ann.chanUpdateAnn) + select { + case err := <-errChan: + if err != nil { + if routing.IsError(err, routing.ErrOutdated, + routing.ErrIgnored) { + fndgLog.Debugf("Router rejected "+ + "ChannelUpdate: %v", err) + } else { + return fmt.Errorf("error sending channel "+ + "update: %v", err) + } } + case <-f.quit: + return ErrFundingManagerShuttingDown } // As the channel is now added to the ChannelRouter's topology, the @@ -2516,14 +2532,23 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe // because addToRouterGraph previously send the ChannelAnnouncement and // the ChannelUpdate announcement messages. The channel proof and node // announcements are broadcast to the greater network. - if err = f.cfg.SendAnnouncement(ann.chanProof); err != nil { - if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { - fndgLog.Debugf("Router rejected AnnounceSignatures: %v", - err) - } else { - fndgLog.Errorf("Unable to send channel proof: %v", err) - return err + errChan := f.cfg.SendAnnouncement(ann.chanProof) + select { + case err := <-errChan: + if err != nil { + if routing.IsError(err, routing.ErrOutdated, + routing.ErrIgnored) { + fndgLog.Debugf("Router rejected "+ + "AnnounceSignatures: %v", err) + } else { + fndgLog.Errorf("Unable to send channel "+ + "proof: %v", err) + return err + } } + + case <-f.quit: + return ErrFundingManagerShuttingDown } // Now that the channel is announced to the network, we will also @@ -2536,15 +2561,25 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe return err } - if err := f.cfg.SendAnnouncement(&nodeAnn); err != nil { - if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { - fndgLog.Debugf("Router rejected NodeAnnouncement: %v", - err) - } else { - fndgLog.Errorf("Unable to send node announcement: %v", err) - return err + errChan = f.cfg.SendAnnouncement(&nodeAnn) + select { + case err := <-errChan: + if err != nil { + if routing.IsError(err, routing.ErrOutdated, + routing.ErrIgnored) { + fndgLog.Debugf("Router rejected "+ + "NodeAnnouncement: %v", err) + } else { + fndgLog.Errorf("Unable to send node "+ + "announcement: %v", err) + return err + } } + + case <-f.quit: + return ErrFundingManagerShuttingDown } + return nil } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 224e1e6e..3cdd23ab 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -286,13 +286,15 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, SignMessage: func(pubKey *btcec.PublicKey, msg []byte) (*btcec.Signature, error) { return testSig, nil }, - SendAnnouncement: func(msg lnwire.Message) error { + SendAnnouncement: func(msg lnwire.Message) chan error { + errChan := make(chan error, 1) select { case sentAnnouncements <- msg: + errChan <- nil case <-shutdownChan: - return fmt.Errorf("shutting down") + errChan <- fmt.Errorf("shutting down") } - return nil + return errChan }, CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil @@ -410,13 +412,15 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { msg []byte) (*btcec.Signature, error) { return testSig, nil }, - SendAnnouncement: func(msg lnwire.Message) error { + SendAnnouncement: func(msg lnwire.Message) chan error { + errChan := make(chan error, 1) select { case aliceAnnounceChan <- msg: + errChan <- nil case <-shutdownChan: - return fmt.Errorf("shutting down") + errChan <- fmt.Errorf("shutting down") } - return nil + return errChan }, CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil @@ -1096,9 +1100,13 @@ func TestFundingManagerRestartBehavior(t *testing.T) { recreateAliceFundingManager(t, alice) // Intentionally make the channel announcements fail - alice.fundingMgr.cfg.SendAnnouncement = func(msg lnwire.Message) error { - return fmt.Errorf("intentional error in SendAnnouncement") - } + alice.fundingMgr.cfg.SendAnnouncement = + func(msg lnwire.Message) chan error { + errChan := make(chan error, 1) + errChan <- fmt.Errorf("intentional error in " + + "SendAnnouncement") + return errChan + } fundingLockedAlice := assertFundingMsgSent( t, alice.msgChan, "FundingLocked", diff --git a/routing/errors.go b/routing/errors.go index 58a92b90..0eb6cb37 100644 --- a/routing/errors.go +++ b/routing/errors.go @@ -31,7 +31,7 @@ const ( ErrTargetNotInNetwork // ErrOutdated is returned when the routing update already have - // been applied. + // been applied, or a newer update is already known. ErrOutdated // ErrIgnored is returned when the update have been ignored because @@ -39,6 +39,11 @@ const ( // announcement was given for node not found in any channel. ErrIgnored + // ErrRejected is returned if the update is for a channel ID that was + // previously added to the reject cache because of an invalid update + // was attempted to be processed. + ErrRejected + // ErrPaymentAttemptTimeout is an error that indicates that a payment // attempt timed out before we were able to successfully route an HTLC. ErrPaymentAttemptTimeout diff --git a/routing/router.go b/routing/router.go index 8a8e0f95..e275843a 100644 --- a/routing/router.go +++ b/routing/router.go @@ -946,7 +946,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { r.rejectMtx.RLock() if _, ok := r.rejectCache[msg.ChannelID]; ok { r.rejectMtx.RUnlock() - return newErrf(ErrIgnored, "recently rejected "+ + return newErrf(ErrRejected, "recently rejected "+ "chan_id=%v", msg.ChannelID) } r.rejectMtx.RUnlock() @@ -1055,7 +1055,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { r.rejectMtx.RLock() if _, ok := r.rejectCache[msg.ChannelID]; ok { r.rejectMtx.RUnlock() - return newErrf(ErrIgnored, "recently rejected "+ + return newErrf(ErrRejected, "recently rejected "+ "chan_id=%v", msg.ChannelID) } r.rejectMtx.RUnlock() @@ -1080,30 +1080,31 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // As edges are directional edge node has a unique policy for // the direction of the edge they control. Therefore we first // check if we already have the most up to date information for - // that edge. If so, then we can exit early. + // that edge. If this message has a timestamp not strictly + // newer than what we already know of we can exit early. switch { // A flag set of 0 indicates this is an announcement for the // "first" node in the channel. case msg.Flags&lnwire.ChanUpdateDirection == 0: - if edge1Timestamp.After(msg.LastUpdate) || - edge1Timestamp.Equal(msg.LastUpdate) { - return newErrf(ErrIgnored, "Ignoring update "+ - "(flags=%v) for known chan_id=%v", msg.Flags, - msg.ChannelID) + // Ignore outdated message. + if !edge1Timestamp.Before(msg.LastUpdate) { + return newErrf(ErrOutdated, "Ignoring "+ + "outdated update (flags=%v) for known "+ + "chan_id=%v", msg.Flags, msg.ChannelID) } // Similarly, a flag set of 1 indicates this is an announcement // for the "second" node in the channel. case msg.Flags&lnwire.ChanUpdateDirection == 1: - if edge2Timestamp.After(msg.LastUpdate) || - edge2Timestamp.Equal(msg.LastUpdate) { - return newErrf(ErrIgnored, "Ignoring update "+ - "(flags=%v) for known chan_id=%v", msg.Flags, - msg.ChannelID) + // Ignore outdated message. + if !edge2Timestamp.Before(msg.LastUpdate) { + return newErrf(ErrOutdated, "Ignoring "+ + "outdated update (flags=%v) for known "+ + "chan_id=%v", msg.Flags, msg.ChannelID) } } @@ -2021,7 +2022,7 @@ func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate) error { FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee), FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate), }) - if err != nil && !IsError(err, ErrIgnored) { + if err != nil && !IsError(err, ErrIgnored, ErrOutdated) { return fmt.Errorf("Unable to apply channel update: %v", err) } diff --git a/server.go b/server.go index ae4745f2..881c1d27 100644 --- a/server.go +++ b/server.go @@ -738,11 +738,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return s.genNodeAnnouncement(true) }, - SendAnnouncement: func(msg lnwire.Message) error { - errChan := s.authGossiper.ProcessLocalAnnouncement( + SendAnnouncement: func(msg lnwire.Message) chan error { + return s.authGossiper.ProcessLocalAnnouncement( msg, privKey.PubKey(), ) - return <-errChan }, NotifyWhenOnline: s.NotifyWhenOnline, TempChanIDSeed: chanIDSeed,