Merge pull request #1754 from halseth/fndmgr-gossiper-handoff

Unify gossiper and router announcement handling
This commit is contained in:
Olaoluwa Osuntokun 2018-08-23 18:44:37 -07:00 committed by GitHub
commit 8de84c4576
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 239 additions and 127 deletions

@ -31,6 +31,10 @@ var (
// AnnounceSignatures messages, by persisting them until a send // AnnounceSignatures messages, by persisting them until a send
// operation has succeeded. // operation has succeeded.
messageStoreKey = []byte("message-store") messageStoreKey = []byte("message-store")
// ErrGossiperShuttingDown is an error that is returned if the gossiper
// is in the process of being shut down.
ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
) )
// networkMsg couples a routing related wire message with the peer that // networkMsg couples a routing related wire message with the peer that
@ -261,7 +265,9 @@ func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error {
// containing all the messages to be sent to the target peer. // containing all the messages to be sent to the target peer.
var announceMessages []lnwire.Message var announceMessages []lnwire.Message
makeNodeAnn := func(n *channeldb.LightningNode) (*lnwire.NodeAnnouncement, error) { makeNodeAnn := func(n *channeldb.LightningNode) (
*lnwire.NodeAnnouncement, error) {
alias, _ := lnwire.NewNodeAlias(n.Alias) alias, _ := lnwire.NewNodeAlias(n.Alias)
wireSig, err := lnwire.NewSigFromRawSignature(n.AuthSigBytes) wireSig, err := lnwire.NewSigFromRawSignature(n.AuthSigBytes)
@ -476,7 +482,7 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
select { select {
case d.networkMsgs <- nMsg: case d.networkMsgs <- nMsg:
case <-d.quit: case <-d.quit:
nMsg.err <- errors.New("gossiper has shut down") nMsg.err <- ErrGossiperShuttingDown
} }
return nMsg.err return nMsg.err
@ -502,7 +508,7 @@ func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
select { select {
case d.networkMsgs <- nMsg: case d.networkMsgs <- nMsg:
case <-d.quit: case <-d.quit:
nMsg.err <- errors.New("gossiper has shut down") nMsg.err <- ErrGossiperShuttingDown
} }
return nMsg.err return nMsg.err
@ -893,7 +899,9 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
// gossip syncer for an inbound message so we can properly dispatch the // gossip syncer for an inbound message so we can properly dispatch the
// incoming message. If a gossip syncer isn't found, then one will be created // incoming message. If a gossip syncer isn't found, then one will be created
// for the target peer. // for the target peer.
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) { func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (
*gossipSyncer, error) {
target := routing.NewVertex(pub) target := routing.NewVertex(pub)
// First, we'll try to find an existing gossiper for this peer. // First, we'll try to find an existing gossiper for this peer.
@ -988,7 +996,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
// First, we'll now create new fully signed updates for // First, we'll now create new fully signed updates for
// the affected channels and also update the underlying // the affected channels and also update the underlying
// graph with the new state. // graph with the new state.
newChanUpdates, err := d.processChanPolicyUpdate(policyUpdate) newChanUpdates, err := d.processChanPolicyUpdate(
policyUpdate,
)
if err != nil { if err != nil {
log.Errorf("Unable to craft policy updates: %v", log.Errorf("Unable to craft policy updates: %v",
err) err)
@ -1061,7 +1071,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
// If this message was recently rejected, then we won't // If this message was recently rejected, then we won't
// attempt to re-process it. // attempt to re-process it.
if d.isRecentlyRejectedMsg(announcement.msg) { if d.isRecentlyRejectedMsg(announcement.msg) {
announcement.err <- fmt.Errorf("recently rejected") announcement.err <- fmt.Errorf("recently " +
"rejected")
continue continue
} }
@ -1088,26 +1099,31 @@ func (d *AuthenticatedGossiper) networkHandler() {
"barrier shutdown: %v", "barrier shutdown: %v",
err) err)
} }
announcement.err <- err
return return
} }
// Process the network announcement to determine if // Process the network announcement to
// this is either a new announcement from our PoV // determine if this is either a new
// or an edges to a prior vertex/edge we previously // announcement from our PoV or an edges to a
// proceeded. // prior vertex/edge we previously proceeded.
emittedAnnouncements := d.processNetworkAnnouncement( emittedAnnouncements := d.processNetworkAnnouncement(
announcement, announcement,
) )
// If this message had any dependencies, then // If this message had any dependencies, then
// we can now signal them to continue. // we can now signal them to continue.
validationBarrier.SignalDependants(announcement.msg) validationBarrier.SignalDependants(
announcement.msg,
)
// If the announcement was accepted, then add the // If the announcement was accepted, then add
// emitted announcements to our announce batch to // the emitted announcements to our announce
// be broadcast once the trickle timer ticks gain. // batch to be broadcast once the trickle timer
// ticks gain.
if emittedAnnouncements != nil { if emittedAnnouncements != nil {
// TODO(roasbeef): exclude peer that sent // TODO(roasbeef): exclude peer that
// sent.
announcements.AddMsgs( announcements.AddMsgs(
emittedAnnouncements..., emittedAnnouncements...,
) )
@ -1133,16 +1149,19 @@ func (d *AuthenticatedGossiper) networkHandler() {
// for this height, if so, then we process them once // for this height, if so, then we process them once
// more as normal announcements. // more as normal announcements.
d.Lock() d.Lock()
numPremature := len(d.prematureAnnouncements[uint32(newBlock.Height)]) numPremature := len(d.prematureAnnouncements[blockHeight])
d.Unlock() d.Unlock()
if numPremature != 0 {
log.Infof("Re-processing %v premature "+ // Return early if no announcement to process.
"announcements for height %v", if numPremature == 0 {
numPremature, blockHeight) continue
} }
log.Infof("Re-processing %v premature announcements "+
"for height %v", numPremature, blockHeight)
d.Lock() d.Lock()
for _, ann := range d.prematureAnnouncements[uint32(newBlock.Height)] { for _, ann := range d.prematureAnnouncements[blockHeight] {
emittedAnnouncements := d.processNetworkAnnouncement(ann) emittedAnnouncements := d.processNetworkAnnouncement(ann)
if emittedAnnouncements != nil { if emittedAnnouncements != nil {
announcements.AddMsgs( announcements.AddMsgs(
@ -1208,9 +1227,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
// The retransmission timer has ticked which indicates that we // The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our // should check if we need to prune or re-broadcast any of our
// personal channels. This addresses the case of "zombie" channels and // personal channels. This addresses the case of "zombie"
// channel advertisements that have been dropped, or not properly // channels and channel advertisements that have been dropped,
// propagated through the network. // or not properly propagated through the network.
case <-retransmitTimer.C: case <-retransmitTimer.C:
if err := d.retransmitStaleChannels(); err != nil { if err := d.retransmitStaleChannels(); err != nil {
log.Errorf("unable to rebroadcast stale "+ log.Errorf("unable to rebroadcast stale "+
@ -1233,7 +1252,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
// needed to handle new queries. The recvUpdates bool indicates if we should // needed to handle new queries. The recvUpdates bool indicates if we should
// continue to receive real-time updates from the remote peer once we've synced // continue to receive real-time updates from the remote peer once we've synced
// channel state. // channel state.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates bool) { func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
recvUpdates bool) {
d.syncerMtx.Lock() d.syncerMtx.Lock()
defer d.syncerMtx.Unlock() defer d.syncerMtx.Unlock()
@ -1485,7 +1506,8 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
// situation in the case where we create a channel, but for some reason fail // situation in the case where we create a channel, but for some reason fail
// to receive the remote peer's proof, while the remote peer is able to fully // to receive the remote peer's proof, while the remote peer is able to fully
// assemble the proof and craft the ChannelAnnouncement. // assemble the proof and craft the ChannelAnnouncement.
func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAnnouncement, func (d *AuthenticatedGossiper) processRejectedEdge(
chanAnnMsg *lnwire.ChannelAnnouncement,
proof *channeldb.ChannelAuthProof) ([]networkMsg, error) { proof *channeldb.ChannelAuthProof) ([]networkMsg, error) {
// First, we'll fetch the state of the channel as we know if from the // First, we'll fetch the state of the channel as we know if from the
@ -1568,7 +1590,9 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn
// didn't affect the internal state due to either being out of date, invalid, // didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements will // or redundant, then nil is returned. Otherwise, the set of announcements will
// be returned which should be broadcasted to the rest of the network. // be returned which should be broadcasted to the rest of the network.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []networkMsg { func (d *AuthenticatedGossiper) processNetworkAnnouncement(
nMsg *networkMsg) []networkMsg {
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6 // TODO(roasbeef) make height delta 6
// * or configurable // * or configurable
@ -1604,7 +1628,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
} }
} }
features := lnwire.NewFeatureVector(msg.Features, lnwire.GlobalFeatures) features := lnwire.NewFeatureVector(
msg.Features, lnwire.GlobalFeatures,
)
node := &channeldb.LightningNode{ node := &channeldb.LightningNode{
HaveNodeAnnouncement: true, HaveNodeAnnouncement: true,
LastUpdate: timestamp, LastUpdate: timestamp,
@ -1649,12 +1675,16 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// We'll ignore any channel announcements that target any chain // We'll ignore any channel announcements that target any chain
// other than the set of chains we know of. // other than the set of chains we know of.
if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) {
log.Errorf("Ignoring ChannelAnnouncement from "+ err := fmt.Errorf("Ignoring ChannelAnnouncement from "+
"chain=%v, gossiper on chain=%v", msg.ChainHash, "chain=%v, gossiper on chain=%v", msg.ChainHash,
d.cfg.ChainHash) d.cfg.ChainHash)
log.Errorf(err.Error())
d.rejectMtx.Lock() d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock() d.rejectMtx.Unlock()
nMsg.err <- err
return nil return nil
} }
@ -1663,8 +1693,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// to be fully verified once we advance forward in the chain. // to be fully verified once we advance forward in the chain.
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
blockHeight := msg.ShortChannelID.BlockHeight blockHeight := msg.ShortChannelID.BlockHeight
log.Infof("Announcement for chan_id=(%v), is premature: "+ log.Infof("Announcement for chan_id=(%v), is "+
"advertises height %v, only height %v is known", "premature: advertises height %v, only "+
"height %v is known",
msg.ShortChannelID.ToUint64(), msg.ShortChannelID.ToUint64(),
msg.ShortChannelID.BlockHeight, msg.ShortChannelID.BlockHeight,
atomic.LoadUint32(&d.bestHeight)) atomic.LoadUint32(&d.bestHeight))
@ -1805,35 +1836,33 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// ensure we don't block here, as we can handle only one // ensure we don't block here, as we can handle only one
// announcement at a time. // announcement at a time.
for _, cu := range channelUpdates { for _, cu := range channelUpdates {
d.wg.Add(1)
go func(nMsg *networkMsg) { go func(nMsg *networkMsg) {
defer d.wg.Done()
switch msg := nMsg.msg.(type) { switch msg := nMsg.msg.(type) {
// Reprocess the message, making sure we return
// an error to the original caller in case the
// gossiper shuts down.
case *lnwire.ChannelUpdate: case *lnwire.ChannelUpdate:
// We can safely wait for the error to log.Debugf("Reprocessing"+
// be returned, as in case of shutdown, " ChannelUpdate for "+
// the gossiper will return an error. "shortChanID=%v",
var err error msg.ShortChannelID.ToUint64())
if nMsg.isRemote {
err = <-d.ProcessRemoteAnnouncement( select {
msg, nMsg.peer) case d.networkMsgs <- nMsg:
} else { case <-d.quit:
err = <-d.ProcessLocalAnnouncement( nMsg.err <- ErrGossiperShuttingDown
msg, nMsg.source)
}
if err != nil {
log.Errorf("Failed reprocessing"+
" ChannelUpdate for "+
"shortChanID=%v: %v",
msg.ShortChannelID.ToUint64(),
err)
return
} }
// We don't expect any other message type than // We don't expect any other message type than
// ChannelUpdate to be in this map. // ChannelUpdate to be in this map.
default: default:
log.Errorf("Unsupported message type "+ log.Errorf("Unsupported message type "+
"found among ChannelUpdates: %T", msg) "found among ChannelUpdates: "+
"%T", msg)
} }
}(cu) }(cu)
} }
@ -1859,12 +1888,16 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// We'll ignore any channel announcements that target any chain // We'll ignore any channel announcements that target any chain
// other than the set of chains we know of. // other than the set of chains we know of.
if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) {
log.Errorf("Ignoring ChannelUpdate from "+ err := fmt.Errorf("Ignoring ChannelUpdate from "+
"chain=%v, gossiper on chain=%v", msg.ChainHash, "chain=%v, gossiper on chain=%v", msg.ChainHash,
d.cfg.ChainHash) d.cfg.ChainHash)
log.Errorf(err.Error())
d.rejectMtx.Lock() d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock() d.rejectMtx.Unlock()
nMsg.err <- err
return nil return nil
} }
@ -1947,19 +1980,27 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// If the node supports it, we may try to // If the node supports it, we may try to
// request the chan ann from it. // request the chan ann from it.
d.wg.Add(1)
go func() { go func() {
defer d.wg.Done()
reqErr := d.maybeRequestChanAnn( reqErr := d.maybeRequestChanAnn(
msg.ShortChannelID, msg.ShortChannelID,
) )
if reqErr != nil { if reqErr != nil {
log.Errorf("unable to request ann "+ log.Errorf("unable to request "+
"for chan_id=%v: %v", shortChanID, "ann for chan_id=%v: "+
"%v", shortChanID,
reqErr) reqErr)
} }
}() }()
nMsg.err <- nil // NOTE: We don't return anything on the error
// channel for this message, as we expect that
// will be done when this ChannelUpdate is
// later reprocessed.
return nil return nil
default: default:
err := errors.Errorf("unable to validate "+ err := errors.Errorf("unable to validate "+
"channel update short_chan_id=%v: %v", "channel update short_chan_id=%v: %v",
@ -2010,7 +2051,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
} }
if err := d.cfg.Router.UpdateEdge(update); err != nil { if err := d.cfg.Router.UpdateEdge(update); err != nil {
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
log.Debug(err) log.Debug(err)
} else { } else {
d.rejectMtx.Lock() d.rejectMtx.Lock()
@ -2075,7 +2117,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// willingness of nodes involved in the funding of a channel to // willingness of nodes involved in the funding of a channel to
// announce this new channel to the rest of the world. // announce this new channel to the rest of the world.
case *lnwire.AnnounceSignatures: case *lnwire.AnnounceSignatures:
needBlockHeight := msg.ShortChannelID.BlockHeight + d.cfg.ProofMatureDelta needBlockHeight := msg.ShortChannelID.BlockHeight +
d.cfg.ProofMatureDelta
shortChanID := msg.ShortChannelID.ToUint64() shortChanID := msg.ShortChannelID.ToUint64()
prefix := "local" prefix := "local"
@ -2114,6 +2157,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// not change before we call AddProof() later. // not change before we call AddProof() later.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
msg.ShortChannelID) msg.ShortChannelID)
if err != nil { if err != nil {
@ -2165,7 +2209,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// Since the remote peer might not be online // Since the remote peer might not be online
// we'll call a method that will attempt to // we'll call a method that will attempt to
// deliver the proof when it comes online. // deliver the proof when it comes online.
if err := d.sendAnnSigReliably(msg, remotePeer); err != nil { err := d.sendAnnSigReliably(msg, remotePeer)
if err != nil {
err := errors.Errorf("unable to send reliably "+ err := errors.Errorf("unable to send reliably "+
"to remote for short_chan_id=%v: %v", "to remote for short_chan_id=%v: %v",
shortChanID, err) shortChanID, err)
@ -2197,13 +2242,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
peerID) peerID)
chanAnn, _, _, err := CreateChanAnnouncement( chanAnn, _, _, err := CreateChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2, chanInfo.AuthProof, chanInfo,
e1, e2,
) )
if err != nil { if err != nil {
log.Errorf("unable to gen ann: %v", err) log.Errorf("unable to gen "+
"ann: %v", err)
return return
} }
err = nMsg.peer.SendMessage(false, chanAnn) err = nMsg.peer.SendMessage(
false, chanAnn,
)
if err != nil { if err != nil {
log.Errorf("Failed sending "+ log.Errorf("Failed sending "+
"full proof to "+ "full proof to "+
@ -2212,7 +2261,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
return return
} }
log.Debugf("Full proof sent to peer=%x"+ log.Debugf("Full proof sent to peer=%x"+
" for chanID=%v", peerID, msg.ChannelID) " for chanID=%v", peerID,
msg.ChannelID)
}() }()
} }
@ -2271,7 +2321,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes() dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes()
dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes() dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes()
} }
chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(&dbProof, chanInfo, e1, e2) chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(
&dbProof, chanInfo, e1, e2,
)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
nMsg.err <- err nMsg.err <- err
@ -2306,9 +2358,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
return nil return nil
} }
if err := d.waitingProofs.Remove(proof.OppositeKey()); err != nil { err = d.waitingProofs.Remove(proof.OppositeKey())
if err != nil {
err := errors.Errorf("unable remove opposite proof "+ err := errors.Errorf("unable remove opposite proof "+
"for the channel with chanID=%v: %v", msg.ChannelID, err) "for the channel with chanID=%v: %v",
msg.ChannelID, err)
log.Error(err) log.Error(err)
nMsg.err <- err nMsg.err <- err
return nil return nil
@ -2421,8 +2475,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably(
remotePeer.SerializeCompressed()) remotePeer.SerializeCompressed())
case <-d.quit: case <-d.quit:
log.Infof("Gossiper shutting down, did not send" + log.Infof("Gossiper shutting down, did not " +
" AnnounceSignatures.") "send AnnounceSignatures.")
return return
} }
} }
@ -2441,7 +2495,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably(
// updateChannel creates a new fully signed update for the channel, and updates // updateChannel creates a new fully signed update for the channel, and updates
// the underlying graph with the new state. // the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, *lnwire.ChannelUpdate, error) { edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement,
*lnwire.ChannelUpdate, error) {
var err error var err error
@ -2542,7 +2597,9 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
// maybeRequestChanAnn will attempt to request the full channel announcement // maybeRequestChanAnn will attempt to request the full channel announcement
// for a particular short chan ID. We do this in the case that we get a channel // for a particular short chan ID. We do this in the case that we get a channel
// update, yet don't already have a channel announcement for it. // update, yet don't already have a channel announcement for it.
func (d *AuthenticatedGossiper) maybeRequestChanAnn(cid lnwire.ShortChannelID) error { func (d *AuthenticatedGossiper) maybeRequestChanAnn(
cid lnwire.ShortChannelID) error {
d.syncerMtx.Lock() d.syncerMtx.Lock()
defer d.syncerMtx.Unlock() defer d.syncerMtx.Unlock()

@ -2005,10 +2005,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
// Recreate the case where the remote node is sending us its ChannelUpdate // Recreate the case where the remote node is sending us its ChannelUpdate
// before we have been able to process our own ChannelAnnouncement and // before we have been able to process our own ChannelAnnouncement and
// ChannelUpdate. // ChannelUpdate.
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast") t.Fatal("channel update announcement was broadcast")
@ -2069,6 +2067,15 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
// At this point the remote ChannelUpdate we received earlier should // At this point the remote ChannelUpdate we received earlier should
// be reprocessed, as we now have the necessary edge entry in the graph. // be reprocessed, as we now have the necessary edge entry in the graph.
select {
case err := <-errRemoteAnn:
if err != nil {
t.Fatalf("error re-processing remote update: %v", err)
}
case <-time.After(2 * trickleDelay):
t.Fatalf("remote update was not processed")
}
// Check that the ChannelEdgePolicy was added to the graph. // Check that the ChannelEdgePolicy was added to the graph.
chanInfo, e1, e2, err = ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID) chanInfo, e1, e2, err = ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
if err != nil { if err != nil {

@ -255,7 +255,7 @@ type fundingConfig struct {
// SendAnnouncement is used by the FundingManager to send // SendAnnouncement is used by the FundingManager to send
// announcement messages to the Gossiper to possibly broadcast // announcement messages to the Gossiper to possibly broadcast
// to the greater network. // to the greater network.
SendAnnouncement func(msg lnwire.Message) error SendAnnouncement func(msg lnwire.Message) chan error
// NotifyWhenOnline allows the FundingManager to register with a // NotifyWhenOnline allows the FundingManager to register with a
// subsystem that will notify it when the peer comes online. This is // subsystem that will notify it when the peer comes online. This is
@ -2078,22 +2078,38 @@ func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel,
// Send ChannelAnnouncement and ChannelUpdate to the gossiper to add // Send ChannelAnnouncement and ChannelUpdate to the gossiper to add
// to the Router's topology. // to the Router's topology.
if err = f.cfg.SendAnnouncement(ann.chanAnn); err != nil { errChan := f.cfg.SendAnnouncement(ann.chanAnn)
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { select {
fndgLog.Debugf("Router rejected ChannelAnnouncement: %v", case err := <-errChan:
err) if err != nil {
} else { if routing.IsError(err, routing.ErrOutdated,
return fmt.Errorf("error sending channel "+ routing.ErrIgnored) {
"announcement: %v", err) fndgLog.Debugf("Router rejected "+
"ChannelAnnouncement: %v", err)
} else {
return fmt.Errorf("error sending channel "+
"announcement: %v", err)
}
} }
case <-f.quit:
return ErrFundingManagerShuttingDown
} }
if err = f.cfg.SendAnnouncement(ann.chanUpdateAnn); err != nil {
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { errChan = f.cfg.SendAnnouncement(ann.chanUpdateAnn)
fndgLog.Debugf("Router rejected ChannelUpdate: %v", err) select {
} else { case err := <-errChan:
return fmt.Errorf("error sending channel "+ if err != nil {
"update: %v", err) if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
fndgLog.Debugf("Router rejected "+
"ChannelUpdate: %v", err)
} else {
return fmt.Errorf("error sending channel "+
"update: %v", err)
}
} }
case <-f.quit:
return ErrFundingManagerShuttingDown
} }
// As the channel is now added to the ChannelRouter's topology, the // As the channel is now added to the ChannelRouter's topology, the
@ -2516,14 +2532,23 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
// because addToRouterGraph previously send the ChannelAnnouncement and // because addToRouterGraph previously send the ChannelAnnouncement and
// the ChannelUpdate announcement messages. The channel proof and node // the ChannelUpdate announcement messages. The channel proof and node
// announcements are broadcast to the greater network. // announcements are broadcast to the greater network.
if err = f.cfg.SendAnnouncement(ann.chanProof); err != nil { errChan := f.cfg.SendAnnouncement(ann.chanProof)
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { select {
fndgLog.Debugf("Router rejected AnnounceSignatures: %v", case err := <-errChan:
err) if err != nil {
} else { if routing.IsError(err, routing.ErrOutdated,
fndgLog.Errorf("Unable to send channel proof: %v", err) routing.ErrIgnored) {
return err fndgLog.Debugf("Router rejected "+
"AnnounceSignatures: %v", err)
} else {
fndgLog.Errorf("Unable to send channel "+
"proof: %v", err)
return err
}
} }
case <-f.quit:
return ErrFundingManagerShuttingDown
} }
// Now that the channel is announced to the network, we will also // Now that the channel is announced to the network, we will also
@ -2536,15 +2561,25 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
return err return err
} }
if err := f.cfg.SendAnnouncement(&nodeAnn); err != nil { errChan = f.cfg.SendAnnouncement(&nodeAnn)
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { select {
fndgLog.Debugf("Router rejected NodeAnnouncement: %v", case err := <-errChan:
err) if err != nil {
} else { if routing.IsError(err, routing.ErrOutdated,
fndgLog.Errorf("Unable to send node announcement: %v", err) routing.ErrIgnored) {
return err fndgLog.Debugf("Router rejected "+
"NodeAnnouncement: %v", err)
} else {
fndgLog.Errorf("Unable to send node "+
"announcement: %v", err)
return err
}
} }
case <-f.quit:
return ErrFundingManagerShuttingDown
} }
return nil return nil
} }

@ -286,13 +286,15 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
SignMessage: func(pubKey *btcec.PublicKey, msg []byte) (*btcec.Signature, error) { SignMessage: func(pubKey *btcec.PublicKey, msg []byte) (*btcec.Signature, error) {
return testSig, nil return testSig, nil
}, },
SendAnnouncement: func(msg lnwire.Message) error { SendAnnouncement: func(msg lnwire.Message) chan error {
errChan := make(chan error, 1)
select { select {
case sentAnnouncements <- msg: case sentAnnouncements <- msg:
errChan <- nil
case <-shutdownChan: case <-shutdownChan:
return fmt.Errorf("shutting down") errChan <- fmt.Errorf("shutting down")
} }
return nil return errChan
}, },
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{}, nil return lnwire.NodeAnnouncement{}, nil
@ -410,13 +412,15 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
msg []byte) (*btcec.Signature, error) { msg []byte) (*btcec.Signature, error) {
return testSig, nil return testSig, nil
}, },
SendAnnouncement: func(msg lnwire.Message) error { SendAnnouncement: func(msg lnwire.Message) chan error {
errChan := make(chan error, 1)
select { select {
case aliceAnnounceChan <- msg: case aliceAnnounceChan <- msg:
errChan <- nil
case <-shutdownChan: case <-shutdownChan:
return fmt.Errorf("shutting down") errChan <- fmt.Errorf("shutting down")
} }
return nil return errChan
}, },
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{}, nil return lnwire.NodeAnnouncement{}, nil
@ -1096,9 +1100,13 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
recreateAliceFundingManager(t, alice) recreateAliceFundingManager(t, alice)
// Intentionally make the channel announcements fail // Intentionally make the channel announcements fail
alice.fundingMgr.cfg.SendAnnouncement = func(msg lnwire.Message) error { alice.fundingMgr.cfg.SendAnnouncement =
return fmt.Errorf("intentional error in SendAnnouncement") func(msg lnwire.Message) chan error {
} errChan := make(chan error, 1)
errChan <- fmt.Errorf("intentional error in " +
"SendAnnouncement")
return errChan
}
fundingLockedAlice := assertFundingMsgSent( fundingLockedAlice := assertFundingMsgSent(
t, alice.msgChan, "FundingLocked", t, alice.msgChan, "FundingLocked",

@ -31,7 +31,7 @@ const (
ErrTargetNotInNetwork ErrTargetNotInNetwork
// ErrOutdated is returned when the routing update already have // ErrOutdated is returned when the routing update already have
// been applied. // been applied, or a newer update is already known.
ErrOutdated ErrOutdated
// ErrIgnored is returned when the update have been ignored because // ErrIgnored is returned when the update have been ignored because
@ -39,6 +39,11 @@ const (
// announcement was given for node not found in any channel. // announcement was given for node not found in any channel.
ErrIgnored ErrIgnored
// ErrRejected is returned if the update is for a channel ID that was
// previously added to the reject cache because of an invalid update
// was attempted to be processed.
ErrRejected
// ErrPaymentAttemptTimeout is an error that indicates that a payment // ErrPaymentAttemptTimeout is an error that indicates that a payment
// attempt timed out before we were able to successfully route an HTLC. // attempt timed out before we were able to successfully route an HTLC.
ErrPaymentAttemptTimeout ErrPaymentAttemptTimeout

@ -946,7 +946,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
r.rejectMtx.RLock() r.rejectMtx.RLock()
if _, ok := r.rejectCache[msg.ChannelID]; ok { if _, ok := r.rejectCache[msg.ChannelID]; ok {
r.rejectMtx.RUnlock() r.rejectMtx.RUnlock()
return newErrf(ErrIgnored, "recently rejected "+ return newErrf(ErrRejected, "recently rejected "+
"chan_id=%v", msg.ChannelID) "chan_id=%v", msg.ChannelID)
} }
r.rejectMtx.RUnlock() r.rejectMtx.RUnlock()
@ -1055,7 +1055,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
r.rejectMtx.RLock() r.rejectMtx.RLock()
if _, ok := r.rejectCache[msg.ChannelID]; ok { if _, ok := r.rejectCache[msg.ChannelID]; ok {
r.rejectMtx.RUnlock() r.rejectMtx.RUnlock()
return newErrf(ErrIgnored, "recently rejected "+ return newErrf(ErrRejected, "recently rejected "+
"chan_id=%v", msg.ChannelID) "chan_id=%v", msg.ChannelID)
} }
r.rejectMtx.RUnlock() r.rejectMtx.RUnlock()
@ -1080,30 +1080,31 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// As edges are directional edge node has a unique policy for // As edges are directional edge node has a unique policy for
// the direction of the edge they control. Therefore we first // the direction of the edge they control. Therefore we first
// check if we already have the most up to date information for // check if we already have the most up to date information for
// that edge. If so, then we can exit early. // that edge. If this message has a timestamp not strictly
// newer than what we already know of we can exit early.
switch { switch {
// A flag set of 0 indicates this is an announcement for the // A flag set of 0 indicates this is an announcement for the
// "first" node in the channel. // "first" node in the channel.
case msg.Flags&lnwire.ChanUpdateDirection == 0: case msg.Flags&lnwire.ChanUpdateDirection == 0:
if edge1Timestamp.After(msg.LastUpdate) ||
edge1Timestamp.Equal(msg.LastUpdate) {
return newErrf(ErrIgnored, "Ignoring update "+ // Ignore outdated message.
"(flags=%v) for known chan_id=%v", msg.Flags, if !edge1Timestamp.Before(msg.LastUpdate) {
msg.ChannelID) return newErrf(ErrOutdated, "Ignoring "+
"outdated update (flags=%v) for known "+
"chan_id=%v", msg.Flags, msg.ChannelID)
} }
// Similarly, a flag set of 1 indicates this is an announcement // Similarly, a flag set of 1 indicates this is an announcement
// for the "second" node in the channel. // for the "second" node in the channel.
case msg.Flags&lnwire.ChanUpdateDirection == 1: case msg.Flags&lnwire.ChanUpdateDirection == 1:
if edge2Timestamp.After(msg.LastUpdate) ||
edge2Timestamp.Equal(msg.LastUpdate) {
return newErrf(ErrIgnored, "Ignoring update "+ // Ignore outdated message.
"(flags=%v) for known chan_id=%v", msg.Flags, if !edge2Timestamp.Before(msg.LastUpdate) {
msg.ChannelID) return newErrf(ErrOutdated, "Ignoring "+
"outdated update (flags=%v) for known "+
"chan_id=%v", msg.Flags, msg.ChannelID)
} }
} }
@ -2021,7 +2022,7 @@ func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate) error {
FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee), FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee),
FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate), FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
}) })
if err != nil && !IsError(err, ErrIgnored) { if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
return fmt.Errorf("Unable to apply channel update: %v", err) return fmt.Errorf("Unable to apply channel update: %v", err)
} }

@ -738,11 +738,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return s.genNodeAnnouncement(true) return s.genNodeAnnouncement(true)
}, },
SendAnnouncement: func(msg lnwire.Message) error { SendAnnouncement: func(msg lnwire.Message) chan error {
errChan := s.authGossiper.ProcessLocalAnnouncement( return s.authGossiper.ProcessLocalAnnouncement(
msg, privKey.PubKey(), msg, privKey.PubKey(),
) )
return <-errChan
}, },
NotifyWhenOnline: s.NotifyWhenOnline, NotifyWhenOnline: s.NotifyWhenOnline,
TempChanIDSeed: chanIDSeed, TempChanIDSeed: chanIDSeed,