Merge pull request #5003 from Roasbeef/premature-upd-revert

Revert "Merge pull request #4895 from wpaulino/disallow-premature-cha…
This commit is contained in:
Olaoluwa Osuntokun 2021-02-10 14:44:18 -08:00 committed by GitHub
commit 503536c1e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 329 additions and 29 deletions

@ -274,6 +274,13 @@ type AuthenticatedGossiper struct {
// every new block height. // every new block height.
blockEpochs *chainntnfs.BlockEpochEvent blockEpochs *chainntnfs.BlockEpochEvent
// prematureChannelUpdates is a map of ChannelUpdates we have received
// that wasn't associated with any channel we know about. We store
// them temporarily, such that we can reprocess them when a
// ChannelAnnouncement for the channel is received.
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
// networkMsgs is a channel that carries new network broadcasted // networkMsgs is a channel that carries new network broadcasted
// message from outside the gossiper service to be processed by the // message from outside the gossiper service to be processed by the
// networkHandler. // networkHandler.
@ -330,6 +337,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
networkMsgs: make(chan *networkMsg), networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}), quit: make(chan struct{}),
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(), channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}), recentRejects: make(map[uint64]struct{}),
heightForLastChanUpdate: make(map[uint64][2]uint32), heightForLastChanUpdate: make(map[uint64][2]uint32),
@ -1024,7 +1032,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
}() }()
// A new block has arrived, update our best height. // A new block has arrived, so we can re-process the previously
// premature announcements.
case newBlock, ok := <-d.blockEpochs.Epochs: case newBlock, ok := <-d.blockEpochs.Epochs:
// If the channel has been closed, then this indicates // If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves. // the daemon is shutting down, so we exit ourselves.
@ -1691,6 +1700,55 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil return nil
} }
// If we earlier received any ChannelUpdates for this channel,
// we can now process them, as the channel is added to the
// graph.
shortChanID := msg.ShortChannelID.ToUint64()
var channelUpdates []*networkMsg
d.pChanUpdMtx.Lock()
channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...)
// Now delete the premature ChannelUpdates, since we added them
// all to the queue of network messages.
delete(d.prematureChannelUpdates, shortChanID)
d.pChanUpdMtx.Unlock()
// Launch a new goroutine to handle each ChannelUpdate, this to
// ensure we don't block here, as we can handle only one
// announcement at a time.
for _, cu := range channelUpdates {
d.wg.Add(1)
go func(nMsg *networkMsg) {
defer d.wg.Done()
switch msg := nMsg.msg.(type) {
// Reprocess the message, making sure we return
// an error to the original caller in case the
// gossiper shuts down.
case *lnwire.ChannelUpdate:
log.Debugf("Reprocessing"+
" ChannelUpdate for "+
"shortChanID=%v",
msg.ShortChannelID.ToUint64())
select {
case d.networkMsgs <- nMsg:
case <-d.quit:
nMsg.err <- ErrGossiperShuttingDown
}
// We don't expect any other message type than
// ChannelUpdate to be in this map.
default:
log.Errorf("Unsupported message type "+
"found among ChannelUpdates: "+
"%T", msg)
}
}(cu)
}
// Channel announcement was successfully proceeded and know it // Channel announcement was successfully proceeded and know it
// might be broadcast to other connected nodes if it was // might be broadcast to other connected nodes if it was
// announcement with proof (remote). // announcement with proof (remote).
@ -1729,7 +1787,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID := msg.ShortChannelID.ToUint64() shortChanID := msg.ShortChannelID.ToUint64()
// If the advertised inclusionary block is beyond our knowledge // If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll ignore it. // of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain.
d.Lock() d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
log.Infof("Update announcement for "+ log.Infof("Update announcement for "+
@ -1738,8 +1797,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID, blockHeight, shortChanID, blockHeight,
d.bestHeight) d.bestHeight)
d.Unlock() d.Unlock()
nMsg.err <- nil
return nil return nil
} }
d.Unlock() d.Unlock()
@ -1810,19 +1867,43 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Debugf("Removed edge with chan_id=%v from zombie "+ log.Debugf("Removed edge with chan_id=%v from zombie "+
"index", msg.ShortChannelID) "index", msg.ShortChannelID)
nMsg.err <- nil // We'll fallthrough to ensure we stash the update until
return nil // we receive its corresponding ChannelAnnouncement.
// This is needed to ensure the edge exists in the graph
// before applying the update.
fallthrough
case channeldb.ErrGraphNotFound: case channeldb.ErrGraphNotFound:
fallthrough fallthrough
case channeldb.ErrGraphNoEdgesFound: case channeldb.ErrGraphNoEdgesFound:
fallthrough fallthrough
case channeldb.ErrEdgeNotFound: case channeldb.ErrEdgeNotFound:
log.Debugf("Got ChannelUpdate for edge not found in "+ // If the edge corresponding to this ChannelUpdate was
"graph(shortChanID=%v)", shortChanID) // not found in the graph, this might be a channel in
// the process of being opened, and we haven't processed
// our own ChannelAnnouncement yet, hence it is not
// found in the graph. This usually gets resolved after
// the channel proofs are exchanged and the channel is
// broadcasted to the rest of the network, but in case
// this is a private channel this won't ever happen.
// This can also happen in the case of a zombie channel
// with a fresh update for which we don't have a
// ChannelAnnouncement for since we reject them. Because
// of this, we temporarily add it to a map, and
// reprocess it after our own ChannelAnnouncement has
// been processed.
d.pChanUpdMtx.Lock()
d.prematureChannelUpdates[shortChanID] = append(
d.prematureChannelUpdates[shortChanID], nMsg,
)
d.pChanUpdMtx.Unlock()
// TODO: Add to recentRejects like below? Return error? log.Debugf("Got ChannelUpdate for edge not found in "+
nMsg.err <- nil "graph(shortChanID=%v), saving for "+
"reprocessing later", shortChanID)
// NOTE: We don't return anything on the error channel
// for this message, as we expect that will be done when
// this ChannelUpdate is later reprocessed.
return nil return nil
default: default:

@ -908,10 +908,8 @@ func TestPrematureAnnouncement(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer): case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
if err != nil { t.Fatal("announcement was proceeded")
t.Fatal(err)
}
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
} }
@ -928,10 +926,8 @@ func TestPrematureAnnouncement(t *testing.T) {
} }
select { select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer): case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
if err != nil { t.Fatal("announcement was proceeded")
t.Fatal(err)
}
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
} }
@ -2377,13 +2373,27 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
// The channel update cannot be successfully processed and broadcast // The channel update cannot be successfully processed and broadcast
// until the channel announcement is. Since the channel update indicates // until the channel announcement is. Since the channel update indicates
// a fresh new update, the gossiper should mark the channel as live and // a fresh new update, the gossiper should stash it until it sees the
// allow it once it sees it again. // corresponding channel announcement.
errChan := ctx.gossiper.ProcessRemoteAnnouncement( updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer, batch.chanUpdAnn2, remotePeer,
) )
select { select {
case err := <-errChan: case <-ctx.broadcastedMessage:
t.Fatal("expected to not broadcast live channel update " +
"without announcement")
case <-time.After(2 * trickleDelay):
}
// We'll go ahead and process the channel announcement to ensure the
// channel update is processed thereafter.
processAnnouncement(batch.remoteChanAnn, false, false)
// After successfully processing the announcement, the channel update
// should have been processed and broadcast successfully as well.
select {
case err := <-updateErrChan:
if err != nil { if err != nil {
t.Fatalf("expected to process live channel update: %v", t.Fatalf("expected to process live channel update: %v",
err) err)
@ -2392,17 +2402,226 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
t.Fatal("expected to process announcement") t.Fatal("expected to process announcement")
} }
select {
case msgWithSenders := <-ctx.broadcastedMessage:
assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
case <-time.After(2 * trickleDelay):
t.Fatal("expected to broadcast live channel update")
}
}
// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
// from the remote before we have processed our own ChannelAnnouncement, it will
// be reprocessed later, after our ChannelAnnouncement.
func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
// Set up a channel that we can use to inspect the messages sent
// directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10)
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
// Override NotifyWhenOnline to return the remote peer which we expect
// meesages to be sent to.
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer
}
// Recreate the case where the remote node is sending us its ChannelUpdate
// before we have been able to process our own ChannelAnnouncement and
// ChannelUpdate.
errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select { select {
case <-ctx.broadcastedMessage: case <-ctx.broadcastedMessage:
t.Fatal("expected to not broadcast live channel update " + t.Fatal("channel update announcement was broadcast")
"without announcement")
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
// Re-process the channel announcement and update. Both should be err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer)
// applied to the graph and broadcast. if err != nil {
processAnnouncement(batch.remoteChanAnn, false, false) t.Fatalf("unable to process node ann: %v", err)
processAnnouncement(batch.chanUpdAnn2, false, false) }
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Since the remote ChannelUpdate was added for an edge that
// we did not already know about, it should have been added
// to the map of premature ChannelUpdates. Check that nothing
// was added to the graph.
chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
if err != channeldb.ErrEdgeNotFound {
t.Fatalf("Expected ErrEdgeNotFound, got: %v", err)
}
if chanInfo != nil {
t.Fatalf("chanInfo was not nil")
}
if e1 != nil {
t.Fatalf("e1 was not nil")
}
if e2 != nil {
t.Fatalf("e2 was not nil")
}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel).
select {
case msg := <-sentMsgs:
assertMessage(t, batch.chanUpdAnn1, msg)
case <-time.After(1 * time.Second):
t.Fatal("gossiper did not send channel update to peer")
}
// At this point the remote ChannelUpdate we received earlier should
// be reprocessed, as we now have the necessary edge entry in the graph.
select {
case err := <-errRemoteAnn:
if err != nil {
t.Fatalf("error re-processing remote update: %v", err)
}
case <-time.After(2 * trickleDelay):
t.Fatalf("remote update was not processed")
}
// Check that the ChannelEdgePolicy was added to the graph.
chanInfo, e1, e2, err = ctx.router.GetChannelByID(
batch.chanUpdAnn1.ShortChannelID,
)
if err != nil {
t.Fatalf("unable to get channel from router: %v", err)
}
if chanInfo == nil {
t.Fatalf("chanInfo was nil")
}
if e1 == nil {
t.Fatalf("e1 was nil")
}
if e2 == nil {
t.Fatalf("e2 was nil")
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcements were broadcast")
case <-time.After(2 * trickleDelay):
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
func() {
number = 0
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
for i := 0; i < 4; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
number = 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
func() {
number = 0
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
} }
// TestExtraDataChannelAnnouncementValidation tests that we're able to properly // TestExtraDataChannelAnnouncementValidation tests that we're able to properly