discovery: interpret block range from ReplyChannelRange messages

We move from our legacy way of interpreting ReplyChannelRange messages
which was incorrect. Previously, we'd rely on the Complete field of the
ReplyChannelRange message to determine when our peer had sent all of
their replies. Now, we properly adhere to the specification by
interpreting the block ranges of these messages as intended.

Due to the large number of nodes deployed with the previous method, we
still maintain and detect when we are communicating with them, such that
we are still able to sync with them for backwards compatibility.
This commit is contained in:
Wilmer Paulino 2019-12-13 16:08:58 -08:00
parent d688e13d35
commit 1bacdfb41e
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 196 additions and 41 deletions

View File

@ -529,12 +529,16 @@ func assertSyncerStatus(t *testing.T, s *GossipSyncer, syncState syncerState,
func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) {
t.Helper()
assertMsgSent(t, peer, &lnwire.QueryChannelRange{
query := &lnwire.QueryChannelRange{
FirstBlockHeight: 0,
NumBlocks: math.MaxUint32,
})
}
assertMsgSent(t, peer, query)
s.ProcessQueryMsg(&lnwire.ReplyChannelRange{Complete: 1}, nil)
s.ProcessQueryMsg(&lnwire.ReplyChannelRange{
QueryChannelRange: *query,
Complete: 1,
}, nil)
chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries)

View File

@ -298,6 +298,17 @@ type GossipSyncer struct {
// received over, these will be read by the replyHandler.
queryMsgs chan lnwire.Message
// curQueryRangeMsg keeps track of the latest QueryChannelRange message
// we've sent to a peer to ensure we've consumed all expected replies.
// This field is primarily used within the waitingQueryChanReply state.
curQueryRangeMsg *lnwire.QueryChannelRange
// prevReplyChannelRange keeps track of the previous ReplyChannelRange
// message we've received from a peer to ensure they've fully replied to
// our query by ensuring they covered our requested block range. This
// field is primarily used within the waitingQueryChanReply state.
prevReplyChannelRange *lnwire.ReplyChannelRange
// bufferedChanRangeReplies is used in the waitingQueryChanReply to
// buffer all the chunked response to our query.
bufferedChanRangeReplies []lnwire.ShortChannelID
@ -666,10 +677,64 @@ func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
return false, err
}
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
// considered legacy. There was a point where lnd used to include the same query
// over multiple replies, rather than including the portion of the query the
// reply is handling. We'll use this as a way of detecting whether we are
// communicating with a legacy node so we can properly sync with them.
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
reply *lnwire.ReplyChannelRange) bool {
return reply.QueryChannelRange == *query
}
// processChanRangeReply is called each time the GossipSyncer receives a new
// reply to the initial range query to discover new channels that it didn't
// previously know of.
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
// If we're not communicating with a legacy node, we'll apply some
// further constraints on their reply to ensure it satisfies our query.
if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
// The first block should be within our original request.
if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
return fmt.Errorf("reply includes channels for height "+
"%v prior to query %v", msg.FirstBlockHeight,
g.curQueryRangeMsg.FirstBlockHeight)
}
// The last block should also be. We don't need to check the
// intermediate ones because they should already be in sorted
// order.
replyLastHeight := msg.QueryChannelRange.LastBlockHeight()
queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
if replyLastHeight > queryLastHeight {
return fmt.Errorf("reply includes channels for height "+
"%v after query %v", replyLastHeight,
queryLastHeight)
}
// If we've previously received a reply for this query, look at
// its last block to ensure the current reply properly follows
// it.
if g.prevReplyChannelRange != nil {
prevReply := g.prevReplyChannelRange
prevReplyLastHeight := prevReply.LastBlockHeight()
// The current reply can either start from the previous
// reply's last block, if there are still more channels
// for the same block, or the block after.
if msg.FirstBlockHeight != prevReplyLastHeight &&
msg.FirstBlockHeight != prevReplyLastHeight+1 {
return fmt.Errorf("first block of reply %v "+
"does not continue from last block of "+
"previous %v", msg.FirstBlockHeight,
prevReplyLastHeight)
}
}
}
g.prevReplyChannelRange = msg
g.bufferedChanRangeReplies = append(
g.bufferedChanRangeReplies, msg.ShortChanIDs...,
)
@ -679,8 +744,25 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
// If this isn't the last response, then we can exit as we've already
// buffered the latest portion of the streaming reply.
if msg.Complete == 0 {
return nil
switch {
// If we're communicating with a legacy node, we'll need to look at the
// complete field.
case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
if msg.Complete == 0 {
return nil
}
// Otherwise, we'll look at the reply's height range.
default:
replyLastHeight := msg.QueryChannelRange.LastBlockHeight()
queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
// TODO(wilmer): This might require some padding if the remote
// node is not aware of the last height we sent them, i.e., is
// behind a few blocks from us.
if replyLastHeight < queryLastHeight {
return nil
}
}
log.Infof("GossipSyncer(%x): filtering through %v chans",
@ -696,8 +778,10 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
}
// As we've received the entirety of the reply, we no longer need to
// hold on to the set of buffered replies, so we'll let that be garbage
// collected now.
// hold on to the set of buffered replies or the original query that
// prompted the replies, so we'll let that be garbage collected now.
g.curQueryRangeMsg = nil
g.prevReplyChannelRange = nil
g.bufferedChanRangeReplies = nil
// If there aren't any channels that we don't know of, then we can
@ -757,11 +841,14 @@ func (g *GossipSyncer) genChanRangeQuery(
// Finally, we'll craft the channel range query, using our starting
// height, then asking for all known channels to the foreseeable end of
// the main chain.
return &lnwire.QueryChannelRange{
query := &lnwire.QueryChannelRange{
ChainHash: g.cfg.chainHash,
FirstBlockHeight: startHeight,
NumBlocks: math.MaxUint32 - startHeight,
}, nil
}
g.curQueryRangeMsg = query
return query, nil
}
// replyPeerQueries is called in response to any query by the remote peer.

View File

@ -993,29 +993,87 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
t.Parallel()
t.Run("legacy", func(t *testing.T) {
testGossipSyncerProcessChanRangeReply(t, true)
})
t.Run("block ranges", func(t *testing.T) {
testGossipSyncerProcessChanRangeReply(t, false)
})
}
// testGossipSyncerProcessChanRangeReply tests that we'll properly buffer
// replied channel replies until we have the complete version. The legacy
// option, if set, uses the Complete field of the reply to determine when we've
// received all expected replies. Otherwise, it looks at the block ranges of
// each reply instead.
func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
t.Parallel()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
highestID := lnwire.ShortChannelID{
BlockHeight: latestKnownHeight,
}
_, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10), defaultEncoding, defaultChunkSize,
highestID, defaultEncoding, defaultChunkSize,
)
startingState := syncer.state
query, err := syncer.genChanRangeQuery(true)
if err != nil {
t.Fatalf("unable to generate channel range query: %v", err)
}
var replyQueries []*lnwire.QueryChannelRange
if legacy {
// Each reply query is the same as the original query in the
// legacy mode.
replyQueries = []*lnwire.QueryChannelRange{query, query, query}
} else {
// When interpreting block ranges, the first reply should start
// from our requested first block, and the last should end at
// our requested last block.
replyQueries = []*lnwire.QueryChannelRange{
{
FirstBlockHeight: 0,
NumBlocks: 11,
},
{
FirstBlockHeight: 11,
NumBlocks: 1,
},
{
FirstBlockHeight: 12,
NumBlocks: query.NumBlocks - 12,
},
}
}
replies := []*lnwire.ReplyChannelRange{
{
QueryChannelRange: *replyQueries[0],
ShortChanIDs: []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(10),
{
BlockHeight: 10,
},
},
},
{
QueryChannelRange: *replyQueries[1],
ShortChanIDs: []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(11),
{
BlockHeight: 11,
},
},
},
{
Complete: 1,
QueryChannelRange: *replyQueries[2],
Complete: 1,
ShortChanIDs: []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(12),
{
BlockHeight: 12,
},
},
},
}
@ -1036,9 +1094,15 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
}
expectedReq := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(11),
lnwire.NewShortChanIDFromInt(12),
{
BlockHeight: 10,
},
{
BlockHeight: 11,
},
{
BlockHeight: 12,
},
}
// As we're about to send the final response, we'll launch a goroutine
@ -1224,17 +1288,17 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
// First, we'll create two GossipSyncer instances with a canned
// sendToPeer message to allow us to intercept their potential sends.
startHeight := lnwire.ShortChannelID{
highestID := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, true, false,
highestID, defaultEncoding, chunkSize, true, false,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, false, true,
highestID, defaultEncoding, chunkSize, false, true,
)
syncer2.Start()
defer syncer2.Stop()
@ -1264,9 +1328,10 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
// inherently disjoint.
var syncer2Chans []lnwire.ShortChannelID
for i := 0; i < numTotalChans; i++ {
syncer2Chans = append(
syncer2Chans, lnwire.NewShortChanIDFromInt(uint64(i)),
)
syncer2Chans = append(syncer2Chans, lnwire.ShortChannelID{
BlockHeight: highestID.BlockHeight - 1,
TxIndex: uint32(i),
})
}
// We'll kick off the test by asserting syncer1 sends over the
@ -1494,17 +1559,17 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
// First, we'll create two GossipSyncer instances with a canned
// sendToPeer message to allow us to intercept their potential sends.
startHeight := lnwire.ShortChannelID{
highestID := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, true, false,
highestID, defaultEncoding, chunkSize, true, false,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, false, true,
highestID, defaultEncoding, chunkSize, false, true,
)
syncer2.Start()
defer syncer2.Stop()
@ -1512,9 +1577,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
// Although both nodes are at the same height, syncer will have 3 chan
// ID's that syncer1 doesn't know of.
syncer2Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(4),
lnwire.NewShortChanIDFromInt(5),
lnwire.NewShortChanIDFromInt(6),
{BlockHeight: highestID.BlockHeight - 3},
{BlockHeight: highestID.BlockHeight - 2},
{BlockHeight: highestID.BlockHeight - 1},
}
// We'll kick off the test by passing over the QueryChannelRange
@ -1638,35 +1703,34 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
// our chunk parsing works properly. With this value we should get 3
// queries: two full chunks, and one lingering chunk.
const chunkSize = 2
const numChans = 3
// First, we'll create two GossipSyncer instances with a canned
// sendToPeer message to allow us to intercept their potential sends.
startHeight := lnwire.ShortChannelID{
highestID := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight, defaultEncoding, chunkSize,
highestID, defaultEncoding, chunkSize,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight, defaultEncoding, chunkSize,
highestID, defaultEncoding, chunkSize,
)
syncer2.Start()
defer syncer2.Stop()
// The channel state of both syncers will be identical. They should
// recognize this, and skip the sync phase below.
syncer1Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
}
syncer2Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
var syncer1Chans, syncer2Chans []lnwire.ShortChannelID
for i := numChans; i > 0; i-- {
shortChanID := lnwire.ShortChannelID{
BlockHeight: highestID.BlockHeight - uint32(i),
}
syncer1Chans = append(syncer1Chans, shortChanID)
syncer2Chans = append(syncer2Chans, shortChanID)
}
// We'll now kick off the test by allowing both side to send their