discovery: prevent rebroadcast of premature channel updates

As similarly done with premature channel announcements, we'll no longer
allow premature channel updates to be rebroadcast once mature. This is
no longer necessary as channel announcements that we're not aware of are
usually broadcast to us with their accompanying channel updates.
This commit is contained in:
Wilmer Paulino 2021-01-04 17:58:45 -08:00
parent faa399f942
commit 00d4e92362
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 28 additions and 328 deletions

@ -265,13 +265,6 @@ type AuthenticatedGossiper struct {
// every new block height. // every new block height.
blockEpochs *chainntnfs.BlockEpochEvent blockEpochs *chainntnfs.BlockEpochEvent
// prematureChannelUpdates is a map of ChannelUpdates we have received
// that wasn't associated with any channel we know about. We store
// them temporarily, such that we can reprocess them when a
// ChannelAnnouncement for the channel is received.
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
// networkMsgs is a channel that carries new network broadcasted // networkMsgs is a channel that carries new network broadcasted
// message from outside the gossiper service to be processed by the // message from outside the gossiper service to be processed by the
// networkHandler. // networkHandler.
@ -328,7 +321,6 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
networkMsgs: make(chan *networkMsg), networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}), quit: make(chan struct{}),
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(), channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}), recentRejects: make(map[uint64]struct{}),
heightForLastChanUpdate: make(map[uint64][2]uint32), heightForLastChanUpdate: make(map[uint64][2]uint32),
@ -1022,8 +1014,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
}() }()
// A new block has arrived, so we can re-process the previously // A new block has arrived, update our best height.
// premature announcements.
case newBlock, ok := <-d.blockEpochs.Epochs: case newBlock, ok := <-d.blockEpochs.Epochs:
// If the channel has been closed, then this indicates // If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves. // the daemon is shutting down, so we exit ourselves.
@ -1690,55 +1681,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil return nil
} }
// If we earlier received any ChannelUpdates for this channel,
// we can now process them, as the channel is added to the
// graph.
shortChanID := msg.ShortChannelID.ToUint64()
var channelUpdates []*networkMsg
d.pChanUpdMtx.Lock()
channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...)
// Now delete the premature ChannelUpdates, since we added them
// all to the queue of network messages.
delete(d.prematureChannelUpdates, shortChanID)
d.pChanUpdMtx.Unlock()
// Launch a new goroutine to handle each ChannelUpdate, this to
// ensure we don't block here, as we can handle only one
// announcement at a time.
for _, cu := range channelUpdates {
d.wg.Add(1)
go func(nMsg *networkMsg) {
defer d.wg.Done()
switch msg := nMsg.msg.(type) {
// Reprocess the message, making sure we return
// an error to the original caller in case the
// gossiper shuts down.
case *lnwire.ChannelUpdate:
log.Debugf("Reprocessing"+
" ChannelUpdate for "+
"shortChanID=%v",
msg.ShortChannelID.ToUint64())
select {
case d.networkMsgs <- nMsg:
case <-d.quit:
nMsg.err <- ErrGossiperShuttingDown
}
// We don't expect any other message type than
// ChannelUpdate to be in this map.
default:
log.Errorf("Unsupported message type "+
"found among ChannelUpdates: "+
"%T", msg)
}
}(cu)
}
// Channel announcement was successfully proceeded and know it // Channel announcement was successfully proceeded and know it
// might be broadcast to other connected nodes if it was // might be broadcast to other connected nodes if it was
// announcement with proof (remote). // announcement with proof (remote).
@ -1777,8 +1719,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID := msg.ShortChannelID.ToUint64() shortChanID := msg.ShortChannelID.ToUint64()
// If the advertised inclusionary block is beyond our knowledge // If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll put the announcement in limbo // of the chain tip, then we'll ignore it.
// to be fully verified once we advance forward in the chain.
d.Lock() d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
log.Infof("Update announcement for "+ log.Infof("Update announcement for "+
@ -1787,6 +1728,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID, blockHeight, shortChanID, blockHeight,
d.bestHeight) d.bestHeight)
d.Unlock() d.Unlock()
nMsg.err <- nil
return nil return nil
} }
d.Unlock() d.Unlock()
@ -1857,43 +1800,19 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Debugf("Removed edge with chan_id=%v from zombie "+ log.Debugf("Removed edge with chan_id=%v from zombie "+
"index", msg.ShortChannelID) "index", msg.ShortChannelID)
// We'll fallthrough to ensure we stash the update until nMsg.err <- nil
// we receive its corresponding ChannelAnnouncement. return nil
// This is needed to ensure the edge exists in the graph
// before applying the update.
fallthrough
case channeldb.ErrGraphNotFound: case channeldb.ErrGraphNotFound:
fallthrough fallthrough
case channeldb.ErrGraphNoEdgesFound: case channeldb.ErrGraphNoEdgesFound:
fallthrough fallthrough
case channeldb.ErrEdgeNotFound: case channeldb.ErrEdgeNotFound:
// If the edge corresponding to this ChannelUpdate was
// not found in the graph, this might be a channel in
// the process of being opened, and we haven't processed
// our own ChannelAnnouncement yet, hence it is not
// found in the graph. This usually gets resolved after
// the channel proofs are exchanged and the channel is
// broadcasted to the rest of the network, but in case
// this is a private channel this won't ever happen.
// This can also happen in the case of a zombie channel
// with a fresh update for which we don't have a
// ChannelAnnouncement for since we reject them. Because
// of this, we temporarily add it to a map, and
// reprocess it after our own ChannelAnnouncement has
// been processed.
d.pChanUpdMtx.Lock()
d.prematureChannelUpdates[shortChanID] = append(
d.prematureChannelUpdates[shortChanID], nMsg,
)
d.pChanUpdMtx.Unlock()
log.Debugf("Got ChannelUpdate for edge not found in "+ log.Debugf("Got ChannelUpdate for edge not found in "+
"graph(shortChanID=%v), saving for "+ "graph(shortChanID=%v)", shortChanID)
"reprocessing later", shortChanID)
// NOTE: We don't return anything on the error channel // TODO: Add to recentRejects like below? Return error?
// for this message, as we expect that will be done when nMsg.err <- nil
// this ChannelUpdate is later reprocessed.
return nil return nil
default: default:

@ -907,8 +907,10 @@ func TestPrematureAnnouncement(t *testing.T) {
} }
select { select {
case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer): case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
t.Fatal("announcement was proceeded") if err != nil {
t.Fatal(err)
}
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
} }
@ -925,8 +927,10 @@ func TestPrematureAnnouncement(t *testing.T) {
} }
select { select {
case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer): case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
t.Fatal("announcement was proceeded") if err != nil {
t.Fatal(err)
}
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
} }
@ -2372,27 +2376,13 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
// The channel update cannot be successfully processed and broadcast // The channel update cannot be successfully processed and broadcast
// until the channel announcement is. Since the channel update indicates // until the channel announcement is. Since the channel update indicates
// a fresh new update, the gossiper should stash it until it sees the // a fresh new update, the gossiper should mark the channel as live and
// corresponding channel announcement. // allow it once it sees it again.
updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement( errChan := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer, batch.chanUpdAnn2, remotePeer,
) )
select { select {
case <-ctx.broadcastedMessage: case err := <-errChan:
t.Fatal("expected to not broadcast live channel update " +
"without announcement")
case <-time.After(2 * trickleDelay):
}
// We'll go ahead and process the channel announcement to ensure the
// channel update is processed thereafter.
processAnnouncement(batch.remoteChanAnn, false, false)
// After successfully processing the announcement, the channel update
// should have been processed and broadcast successfully as well.
select {
case err := <-updateErrChan:
if err != nil { if err != nil {
t.Fatalf("expected to process live channel update: %v", t.Fatalf("expected to process live channel update: %v",
err) err)
@ -2401,226 +2391,17 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
t.Fatal("expected to process announcement") t.Fatal("expected to process announcement")
} }
select {
case msgWithSenders := <-ctx.broadcastedMessage:
assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
case <-time.After(2 * trickleDelay):
t.Fatal("expected to broadcast live channel update")
}
}
// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
// from the remote before we have processed our own ChannelAnnouncement, it will
// be reprocessed later, after our ChannelAnnouncement.
func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
// Set up a channel that we can use to inspect the messages sent
// directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10)
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
// Override NotifyWhenOnline to return the remote peer which we expect
// meesages to be sent to.
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer
}
// Recreate the case where the remote node is sending us its ChannelUpdate
// before we have been able to process our own ChannelAnnouncement and
// ChannelUpdate.
errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast") t.Fatal("expected to not broadcast live channel update " +
"without announcement")
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer) // Re-process the channel announcement and update. Both should be
if err != nil { // applied to the graph and broadcast.
t.Fatalf("unable to process node ann: %v", err) processAnnouncement(batch.remoteChanAnn, false, false)
} processAnnouncement(batch.chanUpdAnn2, false, false)
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Since the remote ChannelUpdate was added for an edge that
// we did not already know about, it should have been added
// to the map of premature ChannelUpdates. Check that nothing
// was added to the graph.
chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
if err != channeldb.ErrEdgeNotFound {
t.Fatalf("Expected ErrEdgeNotFound, got: %v", err)
}
if chanInfo != nil {
t.Fatalf("chanInfo was not nil")
}
if e1 != nil {
t.Fatalf("e1 was not nil")
}
if e2 != nil {
t.Fatalf("e2 was not nil")
}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel).
select {
case msg := <-sentMsgs:
assertMessage(t, batch.chanUpdAnn1, msg)
case <-time.After(1 * time.Second):
t.Fatal("gossiper did not send channel update to peer")
}
// At this point the remote ChannelUpdate we received earlier should
// be reprocessed, as we now have the necessary edge entry in the graph.
select {
case err := <-errRemoteAnn:
if err != nil {
t.Fatalf("error re-processing remote update: %v", err)
}
case <-time.After(2 * trickleDelay):
t.Fatalf("remote update was not processed")
}
// Check that the ChannelEdgePolicy was added to the graph.
chanInfo, e1, e2, err = ctx.router.GetChannelByID(
batch.chanUpdAnn1.ShortChannelID,
)
if err != nil {
t.Fatalf("unable to get channel from router: %v", err)
}
if chanInfo == nil {
t.Fatalf("chanInfo was nil")
}
if e1 == nil {
t.Fatalf("e1 was nil")
}
if e2 == nil {
t.Fatalf("e2 was nil")
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcements were broadcast")
case <-time.After(2 * trickleDelay):
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
func() {
number = 0
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
for i := 0; i < 4; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
number = 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
func() {
number = 0
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
} }
// TestExtraDataChannelAnnouncementValidation tests that we're able to properly // TestExtraDataChannelAnnouncementValidation tests that we're able to properly