Merge pull request #4895 from wpaulino/disallow-premature-chan-updates
discovery: prevent rebroadcast of premature channel updates
This commit is contained in:
commit
6e6384114c
@ -265,13 +265,6 @@ type AuthenticatedGossiper struct {
|
||||
// every new block height.
|
||||
blockEpochs *chainntnfs.BlockEpochEvent
|
||||
|
||||
// prematureChannelUpdates is a map of ChannelUpdates we have received
|
||||
// that wasn't associated with any channel we know about. We store
|
||||
// them temporarily, such that we can reprocess them when a
|
||||
// ChannelAnnouncement for the channel is received.
|
||||
prematureChannelUpdates map[uint64][]*networkMsg
|
||||
pChanUpdMtx sync.Mutex
|
||||
|
||||
// networkMsgs is a channel that carries new network broadcasted
|
||||
// message from outside the gossiper service to be processed by the
|
||||
// networkHandler.
|
||||
@ -328,7 +321,6 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
|
||||
networkMsgs: make(chan *networkMsg),
|
||||
quit: make(chan struct{}),
|
||||
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
|
||||
prematureChannelUpdates: make(map[uint64][]*networkMsg),
|
||||
channelMtx: multimutex.NewMutex(),
|
||||
recentRejects: make(map[uint64]struct{}),
|
||||
heightForLastChanUpdate: make(map[uint64][2]uint32),
|
||||
@ -1022,8 +1014,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
|
||||
}()
|
||||
|
||||
// A new block has arrived, so we can re-process the previously
|
||||
// premature announcements.
|
||||
// A new block has arrived, update our best height.
|
||||
case newBlock, ok := <-d.blockEpochs.Epochs:
|
||||
// If the channel has been closed, then this indicates
|
||||
// the daemon is shutting down, so we exit ourselves.
|
||||
@ -1690,55 +1681,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
return nil
|
||||
}
|
||||
|
||||
// If we earlier received any ChannelUpdates for this channel,
|
||||
// we can now process them, as the channel is added to the
|
||||
// graph.
|
||||
shortChanID := msg.ShortChannelID.ToUint64()
|
||||
var channelUpdates []*networkMsg
|
||||
|
||||
d.pChanUpdMtx.Lock()
|
||||
channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...)
|
||||
|
||||
// Now delete the premature ChannelUpdates, since we added them
|
||||
// all to the queue of network messages.
|
||||
delete(d.prematureChannelUpdates, shortChanID)
|
||||
d.pChanUpdMtx.Unlock()
|
||||
|
||||
// Launch a new goroutine to handle each ChannelUpdate, this to
|
||||
// ensure we don't block here, as we can handle only one
|
||||
// announcement at a time.
|
||||
for _, cu := range channelUpdates {
|
||||
d.wg.Add(1)
|
||||
go func(nMsg *networkMsg) {
|
||||
defer d.wg.Done()
|
||||
|
||||
switch msg := nMsg.msg.(type) {
|
||||
|
||||
// Reprocess the message, making sure we return
|
||||
// an error to the original caller in case the
|
||||
// gossiper shuts down.
|
||||
case *lnwire.ChannelUpdate:
|
||||
log.Debugf("Reprocessing"+
|
||||
" ChannelUpdate for "+
|
||||
"shortChanID=%v",
|
||||
msg.ShortChannelID.ToUint64())
|
||||
|
||||
select {
|
||||
case d.networkMsgs <- nMsg:
|
||||
case <-d.quit:
|
||||
nMsg.err <- ErrGossiperShuttingDown
|
||||
}
|
||||
|
||||
// We don't expect any other message type than
|
||||
// ChannelUpdate to be in this map.
|
||||
default:
|
||||
log.Errorf("Unsupported message type "+
|
||||
"found among ChannelUpdates: "+
|
||||
"%T", msg)
|
||||
}
|
||||
}(cu)
|
||||
}
|
||||
|
||||
// Channel announcement was successfully proceeded and know it
|
||||
// might be broadcast to other connected nodes if it was
|
||||
// announcement with proof (remote).
|
||||
@ -1777,8 +1719,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
shortChanID := msg.ShortChannelID.ToUint64()
|
||||
|
||||
// If the advertised inclusionary block is beyond our knowledge
|
||||
// of the chain tip, then we'll put the announcement in limbo
|
||||
// to be fully verified once we advance forward in the chain.
|
||||
// of the chain tip, then we'll ignore it.
|
||||
d.Lock()
|
||||
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
|
||||
log.Infof("Update announcement for "+
|
||||
@ -1787,6 +1728,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
shortChanID, blockHeight,
|
||||
d.bestHeight)
|
||||
d.Unlock()
|
||||
|
||||
nMsg.err <- nil
|
||||
return nil
|
||||
}
|
||||
d.Unlock()
|
||||
@ -1857,43 +1800,19 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
log.Debugf("Removed edge with chan_id=%v from zombie "+
|
||||
"index", msg.ShortChannelID)
|
||||
|
||||
// We'll fallthrough to ensure we stash the update until
|
||||
// we receive its corresponding ChannelAnnouncement.
|
||||
// This is needed to ensure the edge exists in the graph
|
||||
// before applying the update.
|
||||
fallthrough
|
||||
nMsg.err <- nil
|
||||
return nil
|
||||
|
||||
case channeldb.ErrGraphNotFound:
|
||||
fallthrough
|
||||
case channeldb.ErrGraphNoEdgesFound:
|
||||
fallthrough
|
||||
case channeldb.ErrEdgeNotFound:
|
||||
// If the edge corresponding to this ChannelUpdate was
|
||||
// not found in the graph, this might be a channel in
|
||||
// the process of being opened, and we haven't processed
|
||||
// our own ChannelAnnouncement yet, hence it is not
|
||||
// found in the graph. This usually gets resolved after
|
||||
// the channel proofs are exchanged and the channel is
|
||||
// broadcasted to the rest of the network, but in case
|
||||
// this is a private channel this won't ever happen.
|
||||
// This can also happen in the case of a zombie channel
|
||||
// with a fresh update for which we don't have a
|
||||
// ChannelAnnouncement for since we reject them. Because
|
||||
// of this, we temporarily add it to a map, and
|
||||
// reprocess it after our own ChannelAnnouncement has
|
||||
// been processed.
|
||||
d.pChanUpdMtx.Lock()
|
||||
d.prematureChannelUpdates[shortChanID] = append(
|
||||
d.prematureChannelUpdates[shortChanID], nMsg,
|
||||
)
|
||||
d.pChanUpdMtx.Unlock()
|
||||
|
||||
log.Debugf("Got ChannelUpdate for edge not found in "+
|
||||
"graph(shortChanID=%v), saving for "+
|
||||
"reprocessing later", shortChanID)
|
||||
"graph(shortChanID=%v)", shortChanID)
|
||||
|
||||
// NOTE: We don't return anything on the error channel
|
||||
// for this message, as we expect that will be done when
|
||||
// this ChannelUpdate is later reprocessed.
|
||||
// TODO: Add to recentRejects like below? Return error?
|
||||
nMsg.err <- nil
|
||||
return nil
|
||||
|
||||
default:
|
||||
@ -2326,17 +2245,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
source: nMsg.source,
|
||||
msg: chanAnn,
|
||||
})
|
||||
if e1Ann != nil {
|
||||
if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
|
||||
announcements = append(announcements, networkMsg{
|
||||
peer: nMsg.peer,
|
||||
source: nMsg.source,
|
||||
source: src,
|
||||
msg: e1Ann,
|
||||
})
|
||||
}
|
||||
if e2Ann != nil {
|
||||
if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
|
||||
announcements = append(announcements, networkMsg{
|
||||
peer: nMsg.peer,
|
||||
source: nMsg.source,
|
||||
source: src,
|
||||
msg: e2Ann,
|
||||
})
|
||||
}
|
||||
|
@ -907,8 +907,10 @@ func TestPrematureAnnouncement(t *testing.T) {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
|
||||
t.Fatal("announcement was proceeded")
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
@ -925,8 +927,10 @@ func TestPrematureAnnouncement(t *testing.T) {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
|
||||
t.Fatal("announcement was proceeded")
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
@ -2372,27 +2376,13 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
|
||||
|
||||
// The channel update cannot be successfully processed and broadcast
|
||||
// until the channel announcement is. Since the channel update indicates
|
||||
// a fresh new update, the gossiper should stash it until it sees the
|
||||
// corresponding channel announcement.
|
||||
updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement(
|
||||
// a fresh new update, the gossiper should mark the channel as live and
|
||||
// allow it once it sees it again.
|
||||
errChan := ctx.gossiper.ProcessRemoteAnnouncement(
|
||||
batch.chanUpdAnn2, remotePeer,
|
||||
)
|
||||
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("expected to not broadcast live channel update " +
|
||||
"without announcement")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
// We'll go ahead and process the channel announcement to ensure the
|
||||
// channel update is processed thereafter.
|
||||
processAnnouncement(batch.remoteChanAnn, false, false)
|
||||
|
||||
// After successfully processing the announcement, the channel update
|
||||
// should have been processed and broadcast successfully as well.
|
||||
select {
|
||||
case err := <-updateErrChan:
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
t.Fatalf("expected to process live channel update: %v",
|
||||
err)
|
||||
@ -2401,226 +2391,17 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
|
||||
t.Fatal("expected to process announcement")
|
||||
}
|
||||
|
||||
select {
|
||||
case msgWithSenders := <-ctx.broadcastedMessage:
|
||||
assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
|
||||
case <-time.After(2 * trickleDelay):
|
||||
t.Fatal("expected to broadcast live channel update")
|
||||
}
|
||||
}
|
||||
|
||||
// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
|
||||
// from the remote before we have processed our own ChannelAnnouncement, it will
|
||||
// be reprocessed later, after our ChannelAnnouncement.
|
||||
func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
|
||||
if err != nil {
|
||||
t.Fatalf("can't create context: %v", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
batch, err := createAnnouncements(0)
|
||||
if err != nil {
|
||||
t.Fatalf("can't generate announcements: %v", err)
|
||||
}
|
||||
|
||||
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
|
||||
if err != nil {
|
||||
t.Fatalf("unable to parse pubkey: %v", err)
|
||||
}
|
||||
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
|
||||
if err != nil {
|
||||
t.Fatalf("unable to parse pubkey: %v", err)
|
||||
}
|
||||
|
||||
// Set up a channel that we can use to inspect the messages sent
|
||||
// directly from the gossiper.
|
||||
sentMsgs := make(chan lnwire.Message, 10)
|
||||
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
|
||||
|
||||
// Override NotifyWhenOnline to return the remote peer which we expect
|
||||
// meesages to be sent to.
|
||||
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
|
||||
peerChan chan<- lnpeer.Peer) {
|
||||
|
||||
peerChan <- remotePeer
|
||||
}
|
||||
|
||||
// Recreate the case where the remote node is sending us its ChannelUpdate
|
||||
// before we have been able to process our own ChannelAnnouncement and
|
||||
// ChannelUpdate.
|
||||
errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(
|
||||
batch.chanUpdAnn2, remotePeer,
|
||||
)
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
t.Fatal("expected to not broadcast live channel update " +
|
||||
"without announcement")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process node ann: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("node announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
// Since the remote ChannelUpdate was added for an edge that
|
||||
// we did not already know about, it should have been added
|
||||
// to the map of premature ChannelUpdates. Check that nothing
|
||||
// was added to the graph.
|
||||
chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
|
||||
if err != channeldb.ErrEdgeNotFound {
|
||||
t.Fatalf("Expected ErrEdgeNotFound, got: %v", err)
|
||||
}
|
||||
if chanInfo != nil {
|
||||
t.Fatalf("chanInfo was not nil")
|
||||
}
|
||||
if e1 != nil {
|
||||
t.Fatalf("e1 was not nil")
|
||||
}
|
||||
if e2 != nil {
|
||||
t.Fatalf("e2 was not nil")
|
||||
}
|
||||
|
||||
// Recreate lightning network topology. Initialize router with channel
|
||||
// between two nodes.
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("node announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
// The local ChannelUpdate should now be sent directly to the remote peer,
|
||||
// such that the edge can be used for routing, regardless if this channel
|
||||
// is announced or not (private channel).
|
||||
select {
|
||||
case msg := <-sentMsgs:
|
||||
assertMessage(t, batch.chanUpdAnn1, msg)
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("gossiper did not send channel update to peer")
|
||||
}
|
||||
|
||||
// At this point the remote ChannelUpdate we received earlier should
|
||||
// be reprocessed, as we now have the necessary edge entry in the graph.
|
||||
select {
|
||||
case err := <-errRemoteAnn:
|
||||
if err != nil {
|
||||
t.Fatalf("error re-processing remote update: %v", err)
|
||||
}
|
||||
case <-time.After(2 * trickleDelay):
|
||||
t.Fatalf("remote update was not processed")
|
||||
}
|
||||
|
||||
// Check that the ChannelEdgePolicy was added to the graph.
|
||||
chanInfo, e1, e2, err = ctx.router.GetChannelByID(
|
||||
batch.chanUpdAnn1.ShortChannelID,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel from router: %v", err)
|
||||
}
|
||||
if chanInfo == nil {
|
||||
t.Fatalf("chanInfo was nil")
|
||||
}
|
||||
if e1 == nil {
|
||||
t.Fatalf("e1 was nil")
|
||||
}
|
||||
if e2 == nil {
|
||||
t.Fatalf("e2 was nil")
|
||||
}
|
||||
|
||||
// Pretending that we receive local channel announcement from funding
|
||||
// manager, thereby kick off the announcement exchange process.
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(
|
||||
batch.localProofAnn, localKey,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("announcements were broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
number := 0
|
||||
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
|
||||
func(*channeldb.WaitingProof) error {
|
||||
number++
|
||||
return nil
|
||||
},
|
||||
func() {
|
||||
number = 0
|
||||
},
|
||||
); err != nil {
|
||||
t.Fatalf("unable to retrieve objects from store: %v", err)
|
||||
}
|
||||
|
||||
if number != 1 {
|
||||
t.Fatal("wrong number of objects in storage")
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(
|
||||
batch.remoteProofAnn, remotePeer,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("announcement wasn't broadcast")
|
||||
}
|
||||
}
|
||||
|
||||
number = 0
|
||||
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
|
||||
func(*channeldb.WaitingProof) error {
|
||||
number++
|
||||
return nil
|
||||
},
|
||||
func() {
|
||||
number = 0
|
||||
},
|
||||
); err != nil && err != channeldb.ErrWaitingProofNotFound {
|
||||
t.Fatalf("unable to retrieve objects from store: %v", err)
|
||||
}
|
||||
|
||||
if number != 0 {
|
||||
t.Fatal("waiting proof should be removed from storage")
|
||||
}
|
||||
// Re-process the channel announcement and update. Both should be
|
||||
// applied to the graph and broadcast.
|
||||
processAnnouncement(batch.remoteChanAnn, false, false)
|
||||
processAnnouncement(batch.chanUpdAnn2, false, false)
|
||||
}
|
||||
|
||||
// TestExtraDataChannelAnnouncementValidation tests that we're able to properly
|
||||
|
@ -250,3 +250,4 @@
|
||||
<time> [ERR] UTXN: error while graduating class at height=<height>: TxNotifier is exiting
|
||||
<time> [ERR] UTXN: Failed to sweep first-stage HTLC (CLTV-delayed) output <chan_point>
|
||||
<time> [ERR] UTXN: Notification chan closed, can't advance output <chan_point>
|
||||
<time> [ERR] DISC: Unable to rebroadcast stale announcements: unable to retrieve outgoing channels: channel from self node has no policy
|
||||
|
Loading…
Reference in New Issue
Block a user