diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index d165447f..c7a228f8 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -529,12 +529,16 @@ func assertSyncerStatus(t *testing.T, s *GossipSyncer, syncState syncerState, func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) { t.Helper() - assertMsgSent(t, peer, &lnwire.QueryChannelRange{ + query := &lnwire.QueryChannelRange{ FirstBlockHeight: 0, NumBlocks: math.MaxUint32, - }) + } + assertMsgSent(t, peer, query) - s.ProcessQueryMsg(&lnwire.ReplyChannelRange{Complete: 1}, nil) + s.ProcessQueryMsg(&lnwire.ReplyChannelRange{ + QueryChannelRange: *query, + Complete: 1, + }, nil) chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries) diff --git a/discovery/syncer.go b/discovery/syncer.go index 9d24e50c..8348abc5 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -298,6 +298,17 @@ type GossipSyncer struct { // received over, these will be read by the replyHandler. queryMsgs chan lnwire.Message + // curQueryRangeMsg keeps track of the latest QueryChannelRange message + // we've sent to a peer to ensure we've consumed all expected replies. + // This field is primarily used within the waitingQueryChanReply state. + curQueryRangeMsg *lnwire.QueryChannelRange + + // prevReplyChannelRange keeps track of the previous ReplyChannelRange + // message we've received from a peer to ensure they've fully replied to + // our query by ensuring they covered our requested block range. This + // field is primarily used within the waitingQueryChanReply state. + prevReplyChannelRange *lnwire.ReplyChannelRange + // bufferedChanRangeReplies is used in the waitingQueryChanReply to // buffer all the chunked response to our query. 
bufferedChanRangeReplies []lnwire.ShortChannelID @@ -666,10 +677,64 @@ func (g *GossipSyncer) synchronizeChanIDs() (bool, error) { return false, err } +// isLegacyReplyChannelRange determines whether a ReplyChannelRange message is +// considered legacy. There was a point where lnd used to include the same query +// over multiple replies, rather than including the portion of the query the +// reply is handling. We'll use this as a way of detecting whether we are +// communicating with a legacy node so we can properly sync with them. +func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange, + reply *lnwire.ReplyChannelRange) bool { + + return reply.QueryChannelRange == *query +} + // processChanRangeReply is called each time the GossipSyncer receives a new // reply to the initial range query to discover new channels that it didn't // previously know of. func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error { + // If we're not communicating with a legacy node, we'll apply some + // further constraints on their reply to ensure it satisfies our query. + if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) { + // The first block should be within our original request. + if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight { + return fmt.Errorf("reply includes channels for height "+ + "%v prior to query %v", msg.FirstBlockHeight, + g.curQueryRangeMsg.FirstBlockHeight) + } + + // The last block should also be. We don't need to check the + // intermediate ones because they should already be in sorted + // order. + replyLastHeight := msg.QueryChannelRange.LastBlockHeight() + queryLastHeight := g.curQueryRangeMsg.LastBlockHeight() + if replyLastHeight > queryLastHeight { + return fmt.Errorf("reply includes channels for height "+ + "%v after query %v", replyLastHeight, + queryLastHeight) + } + + // If we've previously received a reply for this query, look at + // its last block to ensure the current reply properly follows + // it. 
+ if g.prevReplyChannelRange != nil { + prevReply := g.prevReplyChannelRange + prevReplyLastHeight := prevReply.LastBlockHeight() + + // The current reply can either start from the previous + // reply's last block, if there are still more channels + // for the same block, or the block after. + if msg.FirstBlockHeight != prevReplyLastHeight && + msg.FirstBlockHeight != prevReplyLastHeight+1 { + + return fmt.Errorf("first block of reply %v "+ + "does not continue from last block of "+ + "previous %v", msg.FirstBlockHeight, + prevReplyLastHeight) + } + } + } + + g.prevReplyChannelRange = msg g.bufferedChanRangeReplies = append( g.bufferedChanRangeReplies, msg.ShortChanIDs..., ) @@ -679,8 +744,25 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro // If this isn't the last response, then we can exit as we've already // buffered the latest portion of the streaming reply. - if msg.Complete == 0 { - return nil + switch { + // If we're communicating with a legacy node, we'll need to look at the + // complete field. + case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg): + if msg.Complete == 0 { + return nil + } + + // Otherwise, we'll look at the reply's height range. + default: + replyLastHeight := msg.QueryChannelRange.LastBlockHeight() + queryLastHeight := g.curQueryRangeMsg.LastBlockHeight() + + // TODO(wilmer): This might require some padding if the remote + // node is not aware of the last height we sent them, i.e., is + // behind a few blocks from us. + if replyLastHeight < queryLastHeight { + return nil + } } log.Infof("GossipSyncer(%x): filtering through %v chans", @@ -696,8 +778,10 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro } // As we've received the entirety of the reply, we no longer need to - // hold on to the set of buffered replies, so we'll let that be garbage - // collected now. 
+ // hold on to the set of buffered replies or the original query that + // prompted the replies, so we'll let that be garbage collected now. + g.curQueryRangeMsg = nil + g.prevReplyChannelRange = nil g.bufferedChanRangeReplies = nil // If there aren't any channels that we don't know of, then we can @@ -757,11 +841,14 @@ func (g *GossipSyncer) genChanRangeQuery( // Finally, we'll craft the channel range query, using our starting // height, then asking for all known channels to the foreseeable end of // the main chain. - return &lnwire.QueryChannelRange{ + query := &lnwire.QueryChannelRange{ ChainHash: g.cfg.chainHash, FirstBlockHeight: startHeight, NumBlocks: math.MaxUint32 - startHeight, - }, nil + } + g.curQueryRangeMsg = query + + return query, nil } // replyPeerQueries is called in response to any query by the remote peer. diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 82c07044..1b1f8890 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -993,29 +993,87 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) { func TestGossipSyncerProcessChanRangeReply(t *testing.T) { t.Parallel() + t.Run("legacy", func(t *testing.T) { + testGossipSyncerProcessChanRangeReply(t, true) + }) + t.Run("block ranges", func(t *testing.T) { + testGossipSyncerProcessChanRangeReply(t, false) + }) +} + +// testGossipSyncerProcessChanRangeReply tests that we'll properly buffer +// channel range replies until we have received the complete set. The legacy +// option, if set, uses the Complete field of the reply to determine when we've +// received all expected replies. Otherwise, it looks at the block ranges of +// each reply instead. +func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { + t.Parallel() + // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. 
+ highestID := lnwire.ShortChannelID{ + BlockHeight: latestKnownHeight, + } _, syncer, chanSeries := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), defaultEncoding, defaultChunkSize, + highestID, defaultEncoding, defaultChunkSize, ) startingState := syncer.state + query, err := syncer.genChanRangeQuery(true) + if err != nil { + t.Fatalf("unable to generate channel range query: %v", err) + } + + var replyQueries []*lnwire.QueryChannelRange + if legacy { + // Each reply query is the same as the original query in the + // legacy mode. + replyQueries = []*lnwire.QueryChannelRange{query, query, query} + } else { + // When interpreting block ranges, the first reply should start + // from our requested first block, and the last should end at + // our requested last block. + replyQueries = []*lnwire.QueryChannelRange{ + { + FirstBlockHeight: 0, + NumBlocks: 11, + }, + { + FirstBlockHeight: 11, + NumBlocks: 1, + }, + { + FirstBlockHeight: 12, + NumBlocks: query.NumBlocks - 12, + }, + } + } + replies := []*lnwire.ReplyChannelRange{ { + QueryChannelRange: *replyQueries[0], ShortChanIDs: []lnwire.ShortChannelID{ - lnwire.NewShortChanIDFromInt(10), + { + BlockHeight: 10, + }, }, }, { + QueryChannelRange: *replyQueries[1], ShortChanIDs: []lnwire.ShortChannelID{ - lnwire.NewShortChanIDFromInt(11), + { + BlockHeight: 11, + }, }, }, { - Complete: 1, + QueryChannelRange: *replyQueries[2], + Complete: 1, ShortChanIDs: []lnwire.ShortChannelID{ - lnwire.NewShortChanIDFromInt(12), + { + BlockHeight: 12, + }, }, }, } @@ -1036,9 +1094,15 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) { } expectedReq := []lnwire.ShortChannelID{ - lnwire.NewShortChanIDFromInt(10), - lnwire.NewShortChanIDFromInt(11), - lnwire.NewShortChanIDFromInt(12), + { + BlockHeight: 10, + }, + { + BlockHeight: 11, + }, + { + BlockHeight: 12, + }, } // As we're about to send the final response, we'll launch a goroutine @@ -1224,17 +1288,17 @@ func TestGossipSyncerDelayDOS(t *testing.T) { // First, we'll 
create two GossipSyncer instances with a canned // sendToPeer message to allow us to intercept their potential sends. - startHeight := lnwire.ShortChannelID{ + highestID := lnwire.ShortChannelID{ BlockHeight: 1144, } msgChan1, syncer1, chanSeries1 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, true, false, + highestID, defaultEncoding, chunkSize, true, false, ) syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, false, true, + highestID, defaultEncoding, chunkSize, false, true, ) syncer2.Start() defer syncer2.Stop() @@ -1264,9 +1328,10 @@ func TestGossipSyncerDelayDOS(t *testing.T) { // inherently disjoint. var syncer2Chans []lnwire.ShortChannelID for i := 0; i < numTotalChans; i++ { - syncer2Chans = append( - syncer2Chans, lnwire.NewShortChanIDFromInt(uint64(i)), - ) + syncer2Chans = append(syncer2Chans, lnwire.ShortChannelID{ + BlockHeight: highestID.BlockHeight - 1, + TxIndex: uint32(i), + }) } // We'll kick off the test by asserting syncer1 sends over the @@ -1494,17 +1559,17 @@ func TestGossipSyncerRoutineSync(t *testing.T) { // First, we'll create two GossipSyncer instances with a canned // sendToPeer message to allow us to intercept their potential sends. - startHeight := lnwire.ShortChannelID{ + highestID := lnwire.ShortChannelID{ BlockHeight: 1144, } msgChan1, syncer1, chanSeries1 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, true, false, + highestID, defaultEncoding, chunkSize, true, false, ) syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, false, true, + highestID, defaultEncoding, chunkSize, false, true, ) syncer2.Start() defer syncer2.Stop() @@ -1512,9 +1577,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) { // Although both nodes are at the same height, syncer will have 3 chan // ID's that syncer1 doesn't know of. 
syncer2Chans := []lnwire.ShortChannelID{ - lnwire.NewShortChanIDFromInt(4), - lnwire.NewShortChanIDFromInt(5), - lnwire.NewShortChanIDFromInt(6), + {BlockHeight: highestID.BlockHeight - 3}, + {BlockHeight: highestID.BlockHeight - 2}, + {BlockHeight: highestID.BlockHeight - 1}, } // We'll kick off the test by passing over the QueryChannelRange @@ -1638,35 +1703,34 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { // our chunk parsing works properly. With this value we should get 3 // queries: two full chunks, and one lingering chunk. const chunkSize = 2 + const numChans = 3 // First, we'll create two GossipSyncer instances with a canned // sendToPeer message to allow us to intercept their potential sends. - startHeight := lnwire.ShortChannelID{ + highestID := lnwire.ShortChannelID{ BlockHeight: 1144, } msgChan1, syncer1, chanSeries1 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, + highestID, defaultEncoding, chunkSize, ) syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, + highestID, defaultEncoding, chunkSize, ) syncer2.Start() defer syncer2.Stop() // The channel state of both syncers will be identical. They should // recognize this, and skip the sync phase below. - syncer1Chans := []lnwire.ShortChannelID{ - lnwire.NewShortChanIDFromInt(1), - lnwire.NewShortChanIDFromInt(2), - lnwire.NewShortChanIDFromInt(3), - } - syncer2Chans := []lnwire.ShortChannelID{ - lnwire.NewShortChanIDFromInt(1), - lnwire.NewShortChanIDFromInt(2), - lnwire.NewShortChanIDFromInt(3), + var syncer1Chans, syncer2Chans []lnwire.ShortChannelID + for i := numChans; i > 0; i-- { + shortChanID := lnwire.ShortChannelID{ + BlockHeight: highestID.BlockHeight - uint32(i), + } + syncer1Chans = append(syncer1Chans, shortChanID) + syncer2Chans = append(syncer2Chans, shortChanID) } // We'll now kick off the test by allowing both side to send their