discovery: reject announcements for known zombie edges
In this commit, we leverage the recently introduced zombie edge index to quickly reject announcements for edges we've previously deemed as zombies. Care has been taken to ensure we don't reject fresh updates for edges we've considered zombies.
This commit is contained in:
parent
44a01db0ef
commit
5cec4513de
@ -1571,8 +1571,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
return nil
|
||||
}
|
||||
|
||||
// At this point, we'll now ask the router if this is a stale
|
||||
// update. If so we can skip all the processing below.
|
||||
// At this point, we'll now ask the router if this is a
|
||||
// zombie/known edge. If so we can skip all the processing
|
||||
// below.
|
||||
if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) {
|
||||
nMsg.err <- nil
|
||||
return nil
|
||||
@ -1787,8 +1788,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
}
|
||||
|
||||
// Before we perform any of the expensive checks below, we'll
|
||||
// make sure that the router doesn't already have a fresher
|
||||
// announcement for this edge.
|
||||
// check whether this update is stale or is for a zombie
|
||||
// channel in order to quickly reject it.
|
||||
timestamp := time.Unix(int64(msg.Timestamp), 0)
|
||||
if d.cfg.Router.IsStaleEdgePolicy(
|
||||
msg.ShortChannelID, timestamp, msg.ChannelFlags,
|
||||
@ -1808,56 +1809,99 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
|
||||
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
|
||||
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
|
||||
if err != nil {
|
||||
switch err {
|
||||
case channeldb.ErrGraphNotFound:
|
||||
fallthrough
|
||||
case channeldb.ErrGraphNoEdgesFound:
|
||||
fallthrough
|
||||
case channeldb.ErrEdgeNotFound:
|
||||
// If the edge corresponding to this
|
||||
// ChannelUpdate was not found in the graph,
|
||||
// this might be a channel in the process of
|
||||
// being opened, and we haven't processed our
|
||||
// own ChannelAnnouncement yet, hence it is not
|
||||
// found in the graph. This usually gets
|
||||
// resolved after the channel proofs are
|
||||
// exchanged and the channel is broadcasted to
|
||||
// the rest of the network, but in case this
|
||||
// is a private channel this won't ever happen.
|
||||
// Because of this, we temporarily add it to a
|
||||
// map, and reprocess it after our own
|
||||
// ChannelAnnouncement has been processed.
|
||||
d.pChanUpdMtx.Lock()
|
||||
d.prematureChannelUpdates[shortChanID] = append(
|
||||
d.prematureChannelUpdates[shortChanID],
|
||||
nMsg,
|
||||
)
|
||||
d.pChanUpdMtx.Unlock()
|
||||
switch err {
|
||||
// No error, break.
|
||||
case nil:
|
||||
break
|
||||
|
||||
log.Debugf("Got ChannelUpdate for edge not "+
|
||||
"found in graph(shortChanID=%v), "+
|
||||
"saving for reprocessing later",
|
||||
shortChanID)
|
||||
case channeldb.ErrZombieEdge:
|
||||
// Since we've deemed the update as not stale above,
|
||||
// before marking it live, we'll make sure it has been
|
||||
// signed by the correct party. The least-significant
|
||||
// bit in the flag on the channel update tells us which
|
||||
// edge is being updated.
|
||||
var pubKey *btcec.PublicKey
|
||||
switch {
|
||||
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
|
||||
pubKey, _ = chanInfo.NodeKey1()
|
||||
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
|
||||
pubKey, _ = chanInfo.NodeKey2()
|
||||
}
|
||||
|
||||
// NOTE: We don't return anything on the error
|
||||
// channel for this message, as we expect that
|
||||
// will be done when this ChannelUpdate is
|
||||
// later reprocessed.
|
||||
return nil
|
||||
|
||||
default:
|
||||
err := fmt.Errorf("unable to validate "+
|
||||
"channel update short_chan_id=%v: %v",
|
||||
shortChanID, err)
|
||||
err := routing.VerifyChannelUpdateSignature(msg, pubKey)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("unable to verify channel "+
|
||||
"update signature: %v", err)
|
||||
log.Error(err)
|
||||
nMsg.err <- err
|
||||
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// With the signature valid, we'll proceed to mark the
|
||||
// edge as live and wait for the channel announcement to
|
||||
// come through again.
|
||||
err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("unable to remove edge with "+
|
||||
"chan_id=%v from zombie index: %v",
|
||||
msg.ShortChannelID, err)
|
||||
log.Error(err)
|
||||
nMsg.err <- err
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debugf("Removed edge with chan_id=%v from zombie "+
|
||||
"index", msg.ShortChannelID)
|
||||
|
||||
// We'll fallthrough to ensure we stash the update until
|
||||
// we receive its corresponding ChannelAnnouncement.
|
||||
// This is needed to ensure the edge exists in the graph
|
||||
// before applying the update.
|
||||
fallthrough
|
||||
case channeldb.ErrGraphNotFound:
|
||||
fallthrough
|
||||
case channeldb.ErrGraphNoEdgesFound:
|
||||
fallthrough
|
||||
case channeldb.ErrEdgeNotFound:
|
||||
// If the edge corresponding to this ChannelUpdate was
|
||||
// not found in the graph, this might be a channel in
|
||||
// the process of being opened, and we haven't processed
|
||||
// our own ChannelAnnouncement yet, hence it is not
|
||||
// found in the graph. This usually gets resolved after
|
||||
// the channel proofs are exchanged and the channel is
|
||||
// broadcasted to the rest of the network, but in case
|
||||
// this is a private channel this won't ever happen.
|
||||
// This can also happen in the case of a zombie channel
|
||||
// with a fresh update for which we don't have a
|
||||
// ChannelAnnouncement for since we reject them. Because
|
||||
// of this, we temporarily add it to a map, and
|
||||
// reprocess it after our own ChannelAnnouncement has
|
||||
// been processed.
|
||||
d.pChanUpdMtx.Lock()
|
||||
d.prematureChannelUpdates[shortChanID] = append(
|
||||
d.prematureChannelUpdates[shortChanID], nMsg,
|
||||
)
|
||||
d.pChanUpdMtx.Unlock()
|
||||
|
||||
log.Debugf("Got ChannelUpdate for edge not found in "+
|
||||
"graph(shortChanID=%v), saving for "+
|
||||
"reprocessing later", shortChanID)
|
||||
|
||||
// NOTE: We don't return anything on the error channel
|
||||
// for this message, as we expect that will be done when
|
||||
// this ChannelUpdate is later reprocessed.
|
||||
return nil
|
||||
|
||||
default:
|
||||
err := fmt.Errorf("unable to validate channel update "+
|
||||
"short_chan_id=%v: %v", shortChanID, err)
|
||||
log.Error(err)
|
||||
nMsg.err <- err
|
||||
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// The least-significant bit in the flag on the channel update
|
||||
|
@ -2201,6 +2201,259 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRejectZombieEdge ensures that we properly reject any announcements for
|
||||
// zombie edges.
|
||||
func TestRejectZombieEdge(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// We'll start by creating our test context with a batch of
|
||||
// announcements.
|
||||
ctx, cleanup, err := createTestCtx(0)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create test context: %v", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
batch, err := createAnnouncements(0)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create announcements: %v", err)
|
||||
}
|
||||
remotePeer := &mockPeer{pk: nodeKeyPriv2.PubKey()}
|
||||
|
||||
// processAnnouncements is a helper closure we'll use to test that we
|
||||
// properly process/reject announcements based on whether they're for a
|
||||
// zombie edge or not.
|
||||
processAnnouncements := func(isZombie bool) {
|
||||
t.Helper()
|
||||
|
||||
errChan := ctx.gossiper.ProcessRemoteAnnouncement(
|
||||
batch.remoteChanAnn, remotePeer,
|
||||
)
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if isZombie && err != nil {
|
||||
t.Fatalf("expected to reject live channel "+
|
||||
"announcement with nil error: %v", err)
|
||||
}
|
||||
if !isZombie && err != nil {
|
||||
t.Fatalf("expected to process live channel "+
|
||||
"announcement: %v", err)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected to process channel announcement")
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
if isZombie {
|
||||
t.Fatal("expected to not broadcast zombie " +
|
||||
"channel announcement")
|
||||
}
|
||||
case <-time.After(2 * trickleDelay):
|
||||
if !isZombie {
|
||||
t.Fatal("expected to broadcast live channel " +
|
||||
"announcement")
|
||||
}
|
||||
}
|
||||
|
||||
errChan = ctx.gossiper.ProcessRemoteAnnouncement(
|
||||
batch.chanUpdAnn2, remotePeer,
|
||||
)
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if isZombie && err != nil {
|
||||
t.Fatalf("expected to reject zombie channel "+
|
||||
"update with nil error: %v", err)
|
||||
}
|
||||
if !isZombie && err != nil {
|
||||
t.Fatalf("expected to process live channel "+
|
||||
"update: %v", err)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected to process channel update")
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
if isZombie {
|
||||
t.Fatal("expected to not broadcast zombie " +
|
||||
"channel update")
|
||||
}
|
||||
case <-time.After(2 * trickleDelay):
|
||||
if !isZombie {
|
||||
t.Fatal("expected to broadcast live channel " +
|
||||
"update")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We'll mark the edge for which we'll process announcements for as a
|
||||
// zombie within the router. This should reject any announcements for
|
||||
// this edge while it remains as a zombie.
|
||||
chanID := batch.remoteChanAnn.ShortChannelID
|
||||
err = ctx.router.MarkEdgeZombie(
|
||||
chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to mark channel %v as zombie: %v", chanID, err)
|
||||
}
|
||||
|
||||
processAnnouncements(true)
|
||||
|
||||
// If we then mark the edge as live, the edge's zombie status should be
|
||||
// overridden and the announcements should be processed.
|
||||
if err := ctx.router.MarkEdgeLive(chanID); err != nil {
|
||||
t.Fatalf("unable mark channel %v as zombie: %v", chanID, err)
|
||||
}
|
||||
|
||||
processAnnouncements(false)
|
||||
}
|
||||
|
||||
// TestProcessZombieEdgeNowLive ensures that we can detect when a zombie edge
|
||||
// becomes live by receiving a fresh update.
|
||||
func TestProcessZombieEdgeNowLive(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// We'll start by creating our test context with a batch of
|
||||
// announcements.
|
||||
ctx, cleanup, err := createTestCtx(0)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create test context: %v", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
batch, err := createAnnouncements(0)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create announcements: %v", err)
|
||||
}
|
||||
|
||||
localPrivKey := nodeKeyPriv1
|
||||
remotePrivKey := nodeKeyPriv2
|
||||
|
||||
remotePeer := &mockPeer{pk: remotePrivKey.PubKey()}
|
||||
|
||||
// processAnnouncement is a helper closure we'll use to ensure an
|
||||
// announcement is properly processed/rejected based on whether the edge
|
||||
// is a zombie or not. The expectsErr boolean can be used to determine
|
||||
// whether we should expect an error when processing the message, while
|
||||
// the isZombie boolean can be used to determine whether the
|
||||
// announcement should be or not be broadcast.
|
||||
processAnnouncement := func(ann lnwire.Message, isZombie, expectsErr bool) {
|
||||
t.Helper()
|
||||
|
||||
errChan := ctx.gossiper.ProcessRemoteAnnouncement(
|
||||
ann, remotePeer,
|
||||
)
|
||||
|
||||
var err error
|
||||
select {
|
||||
case err = <-errChan:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected to process announcement")
|
||||
}
|
||||
if expectsErr && err == nil {
|
||||
t.Fatal("expected error when processing announcement")
|
||||
}
|
||||
if !expectsErr && err != nil {
|
||||
t.Fatalf("received unexpected error when processing "+
|
||||
"announcement: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case msgWithSenders := <-ctx.broadcastedMessage:
|
||||
if isZombie {
|
||||
t.Fatal("expected to not broadcast zombie " +
|
||||
"channel message")
|
||||
}
|
||||
assertMessage(t, ann, msgWithSenders.msg)
|
||||
|
||||
case <-time.After(2 * trickleDelay):
|
||||
if !isZombie {
|
||||
t.Fatal("expected to broadcast live channel " +
|
||||
"message")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We'll generate a channel update with a timestamp far enough in the
|
||||
// past to consider it a zombie.
|
||||
zombieTimestamp := time.Now().Add(-routing.DefaultChannelPruneExpiry)
|
||||
batch.chanUpdAnn2.Timestamp = uint32(zombieTimestamp.Unix())
|
||||
if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil {
|
||||
t.Fatalf("unable to sign update with new timestamp: %v", err)
|
||||
}
|
||||
|
||||
// We'll also add the edge to our zombie index.
|
||||
chanID := batch.remoteChanAnn.ShortChannelID
|
||||
err = ctx.router.MarkEdgeZombie(
|
||||
chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable mark channel %v as zombie: %v", chanID, err)
|
||||
}
|
||||
|
||||
// Attempting to process the current channel update should fail due to
|
||||
// its edge being considered a zombie and its timestamp not being within
|
||||
// the live horizon. We should not expect an error here since it is just
|
||||
// a stale update.
|
||||
processAnnouncement(batch.chanUpdAnn2, true, false)
|
||||
|
||||
// Now we'll generate a new update with a fresh timestamp. This should
|
||||
// allow the channel update to be processed even though it is still
|
||||
// marked as a zombie within the index, since it is a fresh new update.
|
||||
// This won't work however since we'll sign it with the wrong private
|
||||
// key (local rather than remote).
|
||||
batch.chanUpdAnn2.Timestamp = uint32(time.Now().Unix())
|
||||
if err := signUpdate(localPrivKey, batch.chanUpdAnn2); err != nil {
|
||||
t.Fatalf("unable to sign update with new timestamp: %v", err)
|
||||
}
|
||||
|
||||
// We should expect an error due to the signature being invalid.
|
||||
processAnnouncement(batch.chanUpdAnn2, true, true)
|
||||
|
||||
// Signing it with the correct private key should allow it to be
|
||||
// processed.
|
||||
if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil {
|
||||
t.Fatalf("unable to sign update with new timestamp: %v", err)
|
||||
}
|
||||
|
||||
// The channel update cannot be successfully processed and broadcast
|
||||
// until the channel announcement is. Since the channel update indicates
|
||||
// a fresh new update, the gossiper should stash it until it sees the
|
||||
// corresponding channel announcement.
|
||||
updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement(
|
||||
batch.chanUpdAnn2, remotePeer,
|
||||
)
|
||||
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("expected to not broadcast live channel update " +
|
||||
"without announcement")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
// We'll go ahead and process the channel announcement to ensure the
|
||||
// channel update is processed thereafter.
|
||||
processAnnouncement(batch.remoteChanAnn, false, false)
|
||||
|
||||
// After successfully processing the announcement, the channel update
|
||||
// should have been processed and broadcast successfully as well.
|
||||
select {
|
||||
case err := <-updateErrChan:
|
||||
if err != nil {
|
||||
t.Fatalf("expected to process live channel update: %v",
|
||||
err)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected to process announcement")
|
||||
}
|
||||
|
||||
select {
|
||||
case msgWithSenders := <-ctx.broadcastedMessage:
|
||||
assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
|
||||
case <-time.After(2 * trickleDelay):
|
||||
t.Fatal("expected to broadcast live channel update")
|
||||
}
|
||||
}
|
||||
|
||||
// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
|
||||
// from the remote before we have processed our own ChannelAnnouncement, it will
|
||||
// be reprocessed later, after our ChannelAnnouncement.
|
||||
|
Loading…
Reference in New Issue
Block a user