diff --git a/discovery/gossiper.go b/discovery/gossiper.go index d019c280..171b04ab 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1571,8 +1571,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - // At this point, we'll now ask the router if this is a stale - // update. If so we can skip all the processing below. + // At this point, we'll now ask the router if this is a + // zombie/known edge. If so we can skip all the processing + // below. if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { nMsg.err <- nil return nil @@ -1787,8 +1788,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } // Before we perform any of the expensive checks below, we'll - // make sure that the router doesn't already have a fresher - // announcement for this edge. + // check whether this update is stale or is for a zombie + // channel in order to quickly reject it. timestamp := time.Unix(int64(msg.Timestamp), 0) if d.cfg.Router.IsStaleEdgePolicy( msg.ShortChannelID, timestamp, msg.ChannelFlags, @@ -1808,56 +1809,99 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) - if err != nil { - switch err { - case channeldb.ErrGraphNotFound: - fallthrough - case channeldb.ErrGraphNoEdgesFound: - fallthrough - case channeldb.ErrEdgeNotFound: - // If the edge corresponding to this - // ChannelUpdate was not found in the graph, - // this might be a channel in the process of - // being opened, and we haven't processed our - // own ChannelAnnouncement yet, hence it is not - // found in the graph. This usually gets - // resolved after the channel proofs are - // exchanged and the channel is broadcasted to - // the rest of the network, but in case this - // is a private channel this won't ever happen. - // Because of this, we temporarily add it to a - // map, and reprocess it after our own - // ChannelAnnouncement has been processed. - d.pChanUpdMtx.Lock() - d.prematureChannelUpdates[shortChanID] = append( - d.prematureChannelUpdates[shortChanID], - nMsg, - ) - d.pChanUpdMtx.Unlock() + switch err { + // No error, break. + case nil: + break - log.Debugf("Got ChannelUpdate for edge not "+ - "found in graph(shortChanID=%v), "+ - "saving for reprocessing later", - shortChanID) + case channeldb.ErrZombieEdge: + // Since we've deemed the update as not stale above, + // before marking it live, we'll make sure it has been + // signed by the correct party. The least-significant + // bit in the flag on the channel update tells us which + // edge is being updated. + var pubKey *btcec.PublicKey + switch { + case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: + pubKey, _ = chanInfo.NodeKey1() + case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: + pubKey, _ = chanInfo.NodeKey2() + } - // NOTE: We don't return anything on the error - // channel for this message, as we expect that - // will be done when this ChannelUpdate is - // later reprocessed. - return nil - - default: - err := fmt.Errorf("unable to validate "+ - "channel update short_chan_id=%v: %v", - shortChanID, err) + err := routing.VerifyChannelUpdateSignature(msg, pubKey) + if err != nil { + err := fmt.Errorf("unable to verify channel "+ + "update signature: %v", err) log.Error(err) nMsg.err <- err - - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() return nil } + + // With the signature valid, we'll proceed to mark the + // edge as live and wait for the channel announcement to + // come through again. + err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID) + if err != nil { + err := fmt.Errorf("unable to remove edge with "+ + "chan_id=%v from zombie index: %v", + msg.ShortChannelID, err) + log.Error(err) + nMsg.err <- err + return nil + } + + log.Debugf("Removed edge with chan_id=%v from zombie "+ + "index", msg.ShortChannelID) + + // We'll fallthrough to ensure we stash the update until + // we receive its corresponding ChannelAnnouncement. + // This is needed to ensure the edge exists in the graph + // before applying the update. + fallthrough + case channeldb.ErrGraphNotFound: + fallthrough + case channeldb.ErrGraphNoEdgesFound: + fallthrough + case channeldb.ErrEdgeNotFound: + // If the edge corresponding to this ChannelUpdate was + // not found in the graph, this might be a channel in + // the process of being opened, and we haven't processed + // our own ChannelAnnouncement yet, hence it is not + // found in the graph. This usually gets resolved after + // the channel proofs are exchanged and the channel is + // broadcasted to the rest of the network, but in case + // this is a private channel this won't ever happen. + // This can also happen in the case of a zombie channel + // with a fresh update for which we don't have a + // ChannelAnnouncement for since we reject them. Because + // of this, we temporarily add it to a map, and + // reprocess it after our own ChannelAnnouncement has + // been processed. + d.pChanUpdMtx.Lock() + d.prematureChannelUpdates[shortChanID] = append( + d.prematureChannelUpdates[shortChanID], nMsg, + ) + d.pChanUpdMtx.Unlock() + + log.Debugf("Got ChannelUpdate for edge not found in "+ + "graph(shortChanID=%v), saving for "+ + "reprocessing later", shortChanID) + + // NOTE: We don't return anything on the error channel + // for this message, as we expect that will be done when + // this ChannelUpdate is later reprocessed. + return nil + + default: + err := fmt.Errorf("unable to validate channel update "+ + "short_chan_id=%v: %v", shortChanID, err) + log.Error(err) + nMsg.err <- err + + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() + return nil } // The least-significant bit in the flag on the channel update diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 2f7ca77c..8b8a3917 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -2201,6 +2201,259 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { } } +// TestRejectZombieEdge ensures that we properly reject any announcements for +// zombie edges. +func TestRejectZombieEdge(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context with a batch of + // announcements. + ctx, cleanup, err := createTestCtx(0) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("unable to create announcements: %v", err) + } + remotePeer := &mockPeer{pk: nodeKeyPriv2.PubKey()} + + // processAnnouncements is a helper closure we'll use to test that we + // properly process/reject announcements based on whether they're for a + // zombie edge or not. + processAnnouncements := func(isZombie bool) { + t.Helper() + + errChan := ctx.gossiper.ProcessRemoteAnnouncement( + batch.remoteChanAnn, remotePeer, + ) + select { + case err := <-errChan: + if isZombie && err != nil { + t.Fatalf("expected to reject live channel "+ + "announcement with nil error: %v", err) + } + if !isZombie && err != nil { + t.Fatalf("expected to process live channel "+ + "announcement: %v", err) + } + case <-time.After(time.Second): + t.Fatal("expected to process channel announcement") + } + select { + case <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel announcement") + } + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "announcement") + } + } + + errChan = ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, remotePeer, + ) + select { + case err := <-errChan: + if isZombie && err != nil { + t.Fatalf("expected to reject zombie channel "+ + "update with nil error: %v", err) + } + if !isZombie && err != nil { + t.Fatalf("expected to process live channel "+ + "update: %v", err) + } + case <-time.After(time.Second): + t.Fatal("expected to process channel update") + } + select { + case <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel update") + } + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "update") + } + } + } + + // We'll mark the edge for which we'll process announcements for as a + // zombie within the router. This should reject any announcements for + // this edge while it remains as a zombie. + chanID := batch.remoteChanAnn.ShortChannelID + err = ctx.router.MarkEdgeZombie( + chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2, + ) + if err != nil { + t.Fatalf("unable to mark channel %v as zombie: %v", chanID, err) + } + + processAnnouncements(true) + + // If we then mark the edge as live, the edge's zombie status should be + // overridden and the announcements should be processed. + if err := ctx.router.MarkEdgeLive(chanID); err != nil { + t.Fatalf("unable mark channel %v as zombie: %v", chanID, err) + } + + processAnnouncements(false) +} + +// TestProcessZombieEdgeNowLive ensures that we can detect when a zombie edge +// becomes live by receiving a fresh update. +func TestProcessZombieEdgeNowLive(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context with a batch of + // announcements. + ctx, cleanup, err := createTestCtx(0) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("unable to create announcements: %v", err) + } + + localPrivKey := nodeKeyPriv1 + remotePrivKey := nodeKeyPriv2 + + remotePeer := &mockPeer{pk: remotePrivKey.PubKey()} + + // processAnnouncement is a helper closure we'll use to ensure an + // announcement is properly processed/rejected based on whether the edge + // is a zombie or not. The expectsErr boolean can be used to determine + // whether we should expect an error when processing the message, while + // the isZombie boolean can be used to determine whether the + // announcement should be or not be broadcast. + processAnnouncement := func(ann lnwire.Message, isZombie, expectsErr bool) { + t.Helper() + + errChan := ctx.gossiper.ProcessRemoteAnnouncement( + ann, remotePeer, + ) + + var err error + select { + case err = <-errChan: + case <-time.After(time.Second): + t.Fatal("expected to process announcement") + } + if expectsErr && err == nil { + t.Fatal("expected error when processing announcement") + } + if !expectsErr && err != nil { + t.Fatalf("received unexpected error when processing "+ + "announcement: %v", err) + } + + select { + case msgWithSenders := <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel message") + } + assertMessage(t, ann, msgWithSenders.msg) + + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "message") + } + } + } + + // We'll generate a channel update with a timestamp far enough in the + // past to consider it a zombie. + zombieTimestamp := time.Now().Add(-routing.DefaultChannelPruneExpiry) + batch.chanUpdAnn2.Timestamp = uint32(zombieTimestamp.Unix()) + if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // We'll also add the edge to our zombie index. + chanID := batch.remoteChanAnn.ShortChannelID + err = ctx.router.MarkEdgeZombie( + chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2, + ) + if err != nil { + t.Fatalf("unable mark channel %v as zombie: %v", chanID, err) + } + + // Attempting to process the current channel update should fail due to + // its edge being considered a zombie and its timestamp not being within + // the live horizon. We should not expect an error here since it is just + // a stale update. + processAnnouncement(batch.chanUpdAnn2, true, false) + + // Now we'll generate a new update with a fresh timestamp. This should + // allow the channel update to be processed even though it is still + // marked as a zombie within the index, since it is a fresh new update. + // This won't work however since we'll sign it with the wrong private + // key (local rather than remote). + batch.chanUpdAnn2.Timestamp = uint32(time.Now().Unix()) + if err := signUpdate(localPrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // We should expect an error due to the signature being invalid. + processAnnouncement(batch.chanUpdAnn2, true, true) + + // Signing it with the correct private key should allow it to be + // processed. + if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // The channel update cannot be successfully processed and broadcast + // until the channel announcement is. Since the channel update indicates + // a fresh new update, the gossiper should stash it until it sees the + // corresponding channel announcement. + updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, remotePeer, + ) + + select { + case <-ctx.broadcastedMessage: + t.Fatal("expected to not broadcast live channel update " + + "without announcement") + case <-time.After(2 * trickleDelay): + } + + // We'll go ahead and process the channel announcement to ensure the + // channel update is processed thereafter. + processAnnouncement(batch.remoteChanAnn, false, false) + + // After successfully processing the announcement, the channel update + // should have been processed and broadcast successfully as well. + select { + case err := <-updateErrChan: + if err != nil { + t.Fatalf("expected to process live channel update: %v", + err) + } + case <-time.After(time.Second): + t.Fatal("expected to process announcement") + } + + select { + case msgWithSenders := <-ctx.broadcastedMessage: + assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg) + case <-time.After(2 * trickleDelay): + t.Fatal("expected to broadcast live channel update") + } +} + // TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate // from the remote before we have processed our own ChannelAnnouncement, it will // be reprocessed later, after our ChannelAnnouncement.