Revert "Merge pull request #4895 from wpaulino/disallow-premature-chan-updates"

This reverts commit 6e6384114c890cdfd486ace5885118150940df86, reversing
changes made to 98ea4332716f953c039308c4e28cb4e55f8f89bc.
This commit is contained in:
Olaoluwa Osuntokun 2021-02-09 19:55:45 -08:00
parent 5cec468ff4
commit 555de44d9f
2 changed files with 333 additions and 33 deletions

@ -274,6 +274,13 @@ type AuthenticatedGossiper struct {
// every new block height.
blockEpochs *chainntnfs.BlockEpochEvent
// prematureChannelUpdates is a map of ChannelUpdates we have received
// that wasn't associated with any channel we know about. We store
// them temporarily, such that we can reprocess them when a
// ChannelAnnouncement for the channel is received.
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
// networkMsgs is a channel that carries new network broadcasted
// message from outside the gossiper service to be processed by the
// networkHandler.
@ -330,6 +337,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
heightForLastChanUpdate: make(map[uint64][2]uint32),
@ -1024,7 +1032,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
}()
// A new block has arrived, update our best height.
// A new block has arrived, so we can re-process the previously
// premature announcements.
case newBlock, ok := <-d.blockEpochs.Epochs:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
@ -1691,6 +1700,55 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}
// If we earlier received any ChannelUpdates for this channel,
// we can now process them, as the channel is added to the
// graph.
shortChanID := msg.ShortChannelID.ToUint64()
var channelUpdates []*networkMsg
d.pChanUpdMtx.Lock()
channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...)
// Now delete the premature ChannelUpdates, since we added them
// all to the queue of network messages.
delete(d.prematureChannelUpdates, shortChanID)
d.pChanUpdMtx.Unlock()
// Launch a new goroutine to handle each ChannelUpdate, this to
// ensure we don't block here, as we can handle only one
// announcement at a time.
for _, cu := range channelUpdates {
d.wg.Add(1)
go func(nMsg *networkMsg) {
defer d.wg.Done()
switch msg := nMsg.msg.(type) {
// Reprocess the message, making sure we return
// an error to the original caller in case the
// gossiper shuts down.
case *lnwire.ChannelUpdate:
log.Debugf("Reprocessing"+
" ChannelUpdate for "+
"shortChanID=%v",
msg.ShortChannelID.ToUint64())
select {
case d.networkMsgs <- nMsg:
case <-d.quit:
nMsg.err <- ErrGossiperShuttingDown
}
// We don't expect any other message type than
// ChannelUpdate to be in this map.
default:
log.Errorf("Unsupported message type "+
"found among ChannelUpdates: "+
"%T", msg)
}
}(cu)
}
// Channel announcement was successfully proceeded and know it
// might be broadcast to other connected nodes if it was
// announcement with proof (remote).
@ -1729,7 +1787,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID := msg.ShortChannelID.ToUint64()
// If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll ignore it.
// of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain.
d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
log.Infof("Update announcement for "+
@ -1738,8 +1797,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID, blockHeight,
d.bestHeight)
d.Unlock()
nMsg.err <- nil
return nil
}
d.Unlock()
@ -1810,19 +1867,43 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Debugf("Removed edge with chan_id=%v from zombie "+
"index", msg.ShortChannelID)
nMsg.err <- nil
return nil
// We'll fallthrough to ensure we stash the update until
// we receive its corresponding ChannelAnnouncement.
// This is needed to ensure the edge exists in the graph
// before applying the update.
fallthrough
case channeldb.ErrGraphNotFound:
fallthrough
case channeldb.ErrGraphNoEdgesFound:
fallthrough
case channeldb.ErrEdgeNotFound:
log.Debugf("Got ChannelUpdate for edge not found in "+
"graph(shortChanID=%v)", shortChanID)
// If the edge corresponding to this ChannelUpdate was
// not found in the graph, this might be a channel in
// the process of being opened, and we haven't processed
// our own ChannelAnnouncement yet, hence it is not
// found in the graph. This usually gets resolved after
// the channel proofs are exchanged and the channel is
// broadcasted to the rest of the network, but in case
// this is a private channel this won't ever happen.
// This can also happen in the case of a zombie channel
// with a fresh update for which we don't have a
// ChannelAnnouncement for since we reject them. Because
// of this, we temporarily add it to a map, and
// reprocess it after our own ChannelAnnouncement has
// been processed.
d.pChanUpdMtx.Lock()
d.prematureChannelUpdates[shortChanID] = append(
d.prematureChannelUpdates[shortChanID], nMsg,
)
d.pChanUpdMtx.Unlock()
// TODO: Add to recentRejects like below? Return error?
nMsg.err <- nil
log.Debugf("Got ChannelUpdate for edge not found in "+
"graph(shortChanID=%v), saving for "+
"reprocessing later", shortChanID)
// NOTE: We don't return anything on the error channel
// for this message, as we expect that will be done when
// this ChannelUpdate is later reprocessed.
return nil
default:
@ -2255,17 +2336,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
source: nMsg.source,
msg: chanAnn,
})
if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
if e1Ann != nil {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: src,
source: nMsg.source,
msg: e1Ann,
})
}
if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
if e2Ann != nil {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: src,
source: nMsg.source,
msg: e2Ann,
})
}

@ -908,10 +908,8 @@ func TestPrematureAnnouncement(t *testing.T) {
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
if err != nil {
t.Fatal(err)
}
case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
@ -928,10 +926,8 @@ func TestPrematureAnnouncement(t *testing.T) {
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
if err != nil {
t.Fatal(err)
}
case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
@ -2377,13 +2373,27 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
// The channel update cannot be successfully processed and broadcast
// until the channel announcement is. Since the channel update indicates
// a fresh new update, the gossiper should mark the channel as live and
// allow it once it sees it again.
errChan := ctx.gossiper.ProcessRemoteAnnouncement(
// a fresh new update, the gossiper should stash it until it sees the
// corresponding channel announcement.
updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select {
case err := <-errChan:
case <-ctx.broadcastedMessage:
t.Fatal("expected to not broadcast live channel update " +
"without announcement")
case <-time.After(2 * trickleDelay):
}
// We'll go ahead and process the channel announcement to ensure the
// channel update is processed thereafter.
processAnnouncement(batch.remoteChanAnn, false, false)
// After successfully processing the announcement, the channel update
// should have been processed and broadcast successfully as well.
select {
case err := <-updateErrChan:
if err != nil {
t.Fatalf("expected to process live channel update: %v",
err)
@ -2392,17 +2402,226 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
t.Fatal("expected to process announcement")
}
select {
case msgWithSenders := <-ctx.broadcastedMessage:
assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
case <-time.After(2 * trickleDelay):
t.Fatal("expected to broadcast live channel update")
}
}
// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
// from the remote before we have processed our own ChannelAnnouncement, it will
// be reprocessed later, after our ChannelAnnouncement.
func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
// Set up a channel that we can use to inspect the messages sent
// directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10)
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
// Override NotifyWhenOnline to return the remote peer which we expect
// meesages to be sent to.
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer
}
// Recreate the case where the remote node is sending us its ChannelUpdate
// before we have been able to process our own ChannelAnnouncement and
// ChannelUpdate.
errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select {
case <-ctx.broadcastedMessage:
t.Fatal("expected to not broadcast live channel update " +
"without announcement")
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Re-process the channel announcement and update. Both should be
// applied to the graph and broadcast.
processAnnouncement(batch.remoteChanAnn, false, false)
processAnnouncement(batch.chanUpdAnn2, false, false)
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer)
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Since the remote ChannelUpdate was added for an edge that
// we did not already know about, it should have been added
// to the map of premature ChannelUpdates. Check that nothing
// was added to the graph.
chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
if err != channeldb.ErrEdgeNotFound {
t.Fatalf("Expected ErrEdgeNotFound, got: %v", err)
}
if chanInfo != nil {
t.Fatalf("chanInfo was not nil")
}
if e1 != nil {
t.Fatalf("e1 was not nil")
}
if e2 != nil {
t.Fatalf("e2 was not nil")
}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel).
select {
case msg := <-sentMsgs:
assertMessage(t, batch.chanUpdAnn1, msg)
case <-time.After(1 * time.Second):
t.Fatal("gossiper did not send channel update to peer")
}
// At this point the remote ChannelUpdate we received earlier should
// be reprocessed, as we now have the necessary edge entry in the graph.
select {
case err := <-errRemoteAnn:
if err != nil {
t.Fatalf("error re-processing remote update: %v", err)
}
case <-time.After(2 * trickleDelay):
t.Fatalf("remote update was not processed")
}
// Check that the ChannelEdgePolicy was added to the graph.
chanInfo, e1, e2, err = ctx.router.GetChannelByID(
batch.chanUpdAnn1.ShortChannelID,
)
if err != nil {
t.Fatalf("unable to get channel from router: %v", err)
}
if chanInfo == nil {
t.Fatalf("chanInfo was nil")
}
if e1 == nil {
t.Fatalf("e1 was nil")
}
if e2 == nil {
t.Fatalf("e2 was nil")
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcements were broadcast")
case <-time.After(2 * trickleDelay):
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
func() {
number = 0
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
for i := 0; i < 4; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
number = 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
func() {
number = 0
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
}
// TestExtraDataChannelAnnouncementValidation tests that we're able to properly