Merge pull request #3836 from wpaulino/interpret-query-channel-range
discovery: use block ranges from ReplyChannelRange to determine end marker
This commit is contained in:
commit
b8f6a550a9
@ -529,12 +529,16 @@ func assertSyncerStatus(t *testing.T, s *GossipSyncer, syncState syncerState,
|
|||||||
func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) {
|
func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
assertMsgSent(t, peer, &lnwire.QueryChannelRange{
|
query := &lnwire.QueryChannelRange{
|
||||||
FirstBlockHeight: 0,
|
FirstBlockHeight: 0,
|
||||||
NumBlocks: math.MaxUint32,
|
NumBlocks: math.MaxUint32,
|
||||||
})
|
}
|
||||||
|
assertMsgSent(t, peer, query)
|
||||||
|
|
||||||
s.ProcessQueryMsg(&lnwire.ReplyChannelRange{Complete: 1}, nil)
|
s.ProcessQueryMsg(&lnwire.ReplyChannelRange{
|
||||||
|
QueryChannelRange: *query,
|
||||||
|
Complete: 1,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries)
|
chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries)
|
||||||
|
|
||||||
|
@ -298,6 +298,17 @@ type GossipSyncer struct {
|
|||||||
// received over, these will be read by the replyHandler.
|
// received over, these will be read by the replyHandler.
|
||||||
queryMsgs chan lnwire.Message
|
queryMsgs chan lnwire.Message
|
||||||
|
|
||||||
|
// curQueryRangeMsg keeps track of the latest QueryChannelRange message
|
||||||
|
// we've sent to a peer to ensure we've consumed all expected replies.
|
||||||
|
// This field is primarily used within the waitingQueryChanReply state.
|
||||||
|
curQueryRangeMsg *lnwire.QueryChannelRange
|
||||||
|
|
||||||
|
// prevReplyChannelRange keeps track of the previous ReplyChannelRange
|
||||||
|
// message we've received from a peer to ensure they've fully replied to
|
||||||
|
// our query by ensuring they covered our requested block range. This
|
||||||
|
// field is primarily used within the waitingQueryChanReply state.
|
||||||
|
prevReplyChannelRange *lnwire.ReplyChannelRange
|
||||||
|
|
||||||
// bufferedChanRangeReplies is used in the waitingQueryChanReply to
|
// bufferedChanRangeReplies is used in the waitingQueryChanReply to
|
||||||
// buffer all the chunked response to our query.
|
// buffer all the chunked response to our query.
|
||||||
bufferedChanRangeReplies []lnwire.ShortChannelID
|
bufferedChanRangeReplies []lnwire.ShortChannelID
|
||||||
@ -666,10 +677,64 @@ func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
|
||||||
|
// considered legacy. There was a point where lnd used to include the same query
|
||||||
|
// over multiple replies, rather than including the portion of the query the
|
||||||
|
// reply is handling. We'll use this as a way of detecting whether we are
|
||||||
|
// communicating with a legacy node so we can properly sync with them.
|
||||||
|
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
|
||||||
|
reply *lnwire.ReplyChannelRange) bool {
|
||||||
|
|
||||||
|
return reply.QueryChannelRange == *query
|
||||||
|
}
|
||||||
|
|
||||||
// processChanRangeReply is called each time the GossipSyncer receives a new
|
// processChanRangeReply is called each time the GossipSyncer receives a new
|
||||||
// reply to the initial range query to discover new channels that it didn't
|
// reply to the initial range query to discover new channels that it didn't
|
||||||
// previously know of.
|
// previously know of.
|
||||||
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
|
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
|
||||||
|
// If we're not communicating with a legacy node, we'll apply some
|
||||||
|
// further constraints on their reply to ensure it satisfies our query.
|
||||||
|
if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
|
||||||
|
// The first block should be within our original request.
|
||||||
|
if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
|
||||||
|
return fmt.Errorf("reply includes channels for height "+
|
||||||
|
"%v prior to query %v", msg.FirstBlockHeight,
|
||||||
|
g.curQueryRangeMsg.FirstBlockHeight)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The last block should also be. We don't need to check the
|
||||||
|
// intermediate ones because they should already be in sorted
|
||||||
|
// order.
|
||||||
|
replyLastHeight := msg.QueryChannelRange.LastBlockHeight()
|
||||||
|
queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
|
||||||
|
if replyLastHeight > queryLastHeight {
|
||||||
|
return fmt.Errorf("reply includes channels for height "+
|
||||||
|
"%v after query %v", replyLastHeight,
|
||||||
|
queryLastHeight)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we've previously received a reply for this query, look at
|
||||||
|
// its last block to ensure the current reply properly follows
|
||||||
|
// it.
|
||||||
|
if g.prevReplyChannelRange != nil {
|
||||||
|
prevReply := g.prevReplyChannelRange
|
||||||
|
prevReplyLastHeight := prevReply.LastBlockHeight()
|
||||||
|
|
||||||
|
// The current reply can either start from the previous
|
||||||
|
// reply's last block, if there are still more channels
|
||||||
|
// for the same block, or the block after.
|
||||||
|
if msg.FirstBlockHeight != prevReplyLastHeight &&
|
||||||
|
msg.FirstBlockHeight != prevReplyLastHeight+1 {
|
||||||
|
|
||||||
|
return fmt.Errorf("first block of reply %v "+
|
||||||
|
"does not continue from last block of "+
|
||||||
|
"previous %v", msg.FirstBlockHeight,
|
||||||
|
prevReplyLastHeight)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
g.prevReplyChannelRange = msg
|
||||||
g.bufferedChanRangeReplies = append(
|
g.bufferedChanRangeReplies = append(
|
||||||
g.bufferedChanRangeReplies, msg.ShortChanIDs...,
|
g.bufferedChanRangeReplies, msg.ShortChanIDs...,
|
||||||
)
|
)
|
||||||
@ -679,8 +744,25 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
|
|||||||
|
|
||||||
// If this isn't the last response, then we can exit as we've already
|
// If this isn't the last response, then we can exit as we've already
|
||||||
// buffered the latest portion of the streaming reply.
|
// buffered the latest portion of the streaming reply.
|
||||||
if msg.Complete == 0 {
|
switch {
|
||||||
return nil
|
// If we're communicating with a legacy node, we'll need to look at the
|
||||||
|
// complete field.
|
||||||
|
case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
|
||||||
|
if msg.Complete == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, we'll look at the reply's height range.
|
||||||
|
default:
|
||||||
|
replyLastHeight := msg.QueryChannelRange.LastBlockHeight()
|
||||||
|
queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
|
||||||
|
|
||||||
|
// TODO(wilmer): This might require some padding if the remote
|
||||||
|
// node is not aware of the last height we sent them, i.e., is
|
||||||
|
// behind a few blocks from us.
|
||||||
|
if replyLastHeight < queryLastHeight {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("GossipSyncer(%x): filtering through %v chans",
|
log.Infof("GossipSyncer(%x): filtering through %v chans",
|
||||||
@ -696,8 +778,10 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// As we've received the entirety of the reply, we no longer need to
|
// As we've received the entirety of the reply, we no longer need to
|
||||||
// hold on to the set of buffered replies, so we'll let that be garbage
|
// hold on to the set of buffered replies or the original query that
|
||||||
// collected now.
|
// prompted the replies, so we'll let that be garbage collected now.
|
||||||
|
g.curQueryRangeMsg = nil
|
||||||
|
g.prevReplyChannelRange = nil
|
||||||
g.bufferedChanRangeReplies = nil
|
g.bufferedChanRangeReplies = nil
|
||||||
|
|
||||||
// If there aren't any channels that we don't know of, then we can
|
// If there aren't any channels that we don't know of, then we can
|
||||||
@ -757,11 +841,14 @@ func (g *GossipSyncer) genChanRangeQuery(
|
|||||||
// Finally, we'll craft the channel range query, using our starting
|
// Finally, we'll craft the channel range query, using our starting
|
||||||
// height, then asking for all known channels to the foreseeable end of
|
// height, then asking for all known channels to the foreseeable end of
|
||||||
// the main chain.
|
// the main chain.
|
||||||
return &lnwire.QueryChannelRange{
|
query := &lnwire.QueryChannelRange{
|
||||||
ChainHash: g.cfg.chainHash,
|
ChainHash: g.cfg.chainHash,
|
||||||
FirstBlockHeight: startHeight,
|
FirstBlockHeight: startHeight,
|
||||||
NumBlocks: math.MaxUint32 - startHeight,
|
NumBlocks: math.MaxUint32 - startHeight,
|
||||||
}, nil
|
}
|
||||||
|
g.curQueryRangeMsg = query
|
||||||
|
|
||||||
|
return query, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// replyPeerQueries is called in response to any query by the remote peer.
|
// replyPeerQueries is called in response to any query by the remote peer.
|
||||||
@ -814,8 +901,9 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
|
|||||||
// Next, we'll consult the time series to obtain the set of known
|
// Next, we'll consult the time series to obtain the set of known
|
||||||
// channel ID's that match their query.
|
// channel ID's that match their query.
|
||||||
startBlock := query.FirstBlockHeight
|
startBlock := query.FirstBlockHeight
|
||||||
|
endBlock := startBlock + query.NumBlocks - 1
|
||||||
channelRange, err := g.cfg.channelSeries.FilterChannelRange(
|
channelRange, err := g.cfg.channelSeries.FilterChannelRange(
|
||||||
query.ChainHash, startBlock, startBlock+query.NumBlocks,
|
query.ChainHash, startBlock, endBlock,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -824,13 +912,13 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
|
|||||||
// TODO(roasbeef): means can't send max uint above?
|
// TODO(roasbeef): means can't send max uint above?
|
||||||
// * or make internal 64
|
// * or make internal 64
|
||||||
|
|
||||||
// In the base case (no actual response) the first block and the last
|
// In the base case (no actual response) the first block and last block
|
||||||
// block in the query will be the same. In the loop below, we'll update
|
// will match those of the query. In the loop below, we'll update these
|
||||||
// these two variables incrementally with each chunk to properly
|
// two variables incrementally with each chunk to properly compute the
|
||||||
// compute the starting block for each response and the number of
|
// starting block for each response and the number of blocks in a
|
||||||
// blocks in a response.
|
// response.
|
||||||
firstBlockHeight := query.FirstBlockHeight
|
firstBlockHeight := startBlock
|
||||||
lastBlockHeight := query.FirstBlockHeight
|
lastBlockHeight := endBlock
|
||||||
|
|
||||||
numChannels := int32(len(channelRange))
|
numChannels := int32(len(channelRange))
|
||||||
numChansSent := int32(0)
|
numChansSent := int32(0)
|
||||||
@ -866,8 +954,25 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
|
|||||||
// update our pointers to the first and last blocks for each
|
// update our pointers to the first and last blocks for each
|
||||||
// response.
|
// response.
|
||||||
if len(channelChunk) > 0 {
|
if len(channelChunk) > 0 {
|
||||||
firstBlockHeight = channelChunk[0].BlockHeight
|
// If this is the first response we'll send, we'll point
|
||||||
lastBlockHeight = channelChunk[len(channelChunk)-1].BlockHeight
|
// the first block to the first block in the query.
|
||||||
|
// Otherwise, we'll continue from the block we left off
|
||||||
|
// at.
|
||||||
|
if numChansSent == 0 {
|
||||||
|
firstBlockHeight = startBlock
|
||||||
|
} else {
|
||||||
|
firstBlockHeight = lastBlockHeight
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is the last response we'll send, we'll point
|
||||||
|
// the last block to the last block of the query.
|
||||||
|
// Otherwise, we'll set it to the height of the last
|
||||||
|
// channel in the chunk.
|
||||||
|
if isFinalChunk {
|
||||||
|
lastBlockHeight = endBlock
|
||||||
|
} else {
|
||||||
|
lastBlockHeight = channelChunk[len(channelChunk)-1].BlockHeight
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// The number of blocks contained in this response (the total
|
// The number of blocks contained in this response (the total
|
||||||
|
@ -709,10 +709,12 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
|
|||||||
|
|
||||||
// Next, we'll craft a query to ask for all the new chan ID's after
|
// Next, we'll craft a query to ask for all the new chan ID's after
|
||||||
// block 100.
|
// block 100.
|
||||||
const startingBlockHeight int = 100
|
const startingBlockHeight = 100
|
||||||
|
const numBlocks = 50
|
||||||
|
const endingBlockHeight = startingBlockHeight + numBlocks - 1
|
||||||
query := &lnwire.QueryChannelRange{
|
query := &lnwire.QueryChannelRange{
|
||||||
FirstBlockHeight: uint32(startingBlockHeight),
|
FirstBlockHeight: uint32(startingBlockHeight),
|
||||||
NumBlocks: 50,
|
NumBlocks: uint32(numBlocks),
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll then launch a goroutine to reply to the query with a set of 5
|
// We'll then launch a goroutine to reply to the query with a set of 5
|
||||||
@ -744,8 +746,11 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
|
|||||||
return
|
return
|
||||||
case filterReq := <-chanSeries.filterRangeReqs:
|
case filterReq := <-chanSeries.filterRangeReqs:
|
||||||
// We should be querying for block 100 to 150.
|
// We should be querying for block 100 to 150.
|
||||||
if filterReq.startHeight != 100 && filterReq.endHeight != 150 {
|
if filterReq.startHeight != startingBlockHeight &&
|
||||||
errCh <- fmt.Errorf("wrong height range: %v", spew.Sdump(filterReq))
|
filterReq.endHeight != endingBlockHeight {
|
||||||
|
|
||||||
|
errCh <- fmt.Errorf("wrong height range: %v",
|
||||||
|
spew.Sdump(filterReq))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -778,52 +783,55 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
|
|||||||
t.Fatalf("expected ReplyChannelRange instead got %T", msg)
|
t.Fatalf("expected ReplyChannelRange instead got %T", msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only for the first iteration do we set the offset to
|
// We'll determine the correct values of each field in
|
||||||
// zero as no chunks have been processed yet. For every
|
// each response based on the order that they were sent.
|
||||||
// other iteration, we want to move forward by the
|
var (
|
||||||
// chunkSize (from the staring block height).
|
expectedFirstBlockHeight uint32
|
||||||
offset := 0
|
expectedNumBlocks uint32
|
||||||
if i != 0 {
|
expectedComplete uint8
|
||||||
offset = 1
|
)
|
||||||
}
|
|
||||||
expectedFirstBlockHeight := (i+offset)*2 + startingBlockHeight
|
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
// If this is not the last chunk, then Complete should
|
// The first reply should range from our starting block
|
||||||
// be set to zero. Otherwise, it should be one.
|
// height until it reaches its maximum capacity of
|
||||||
case i < 2 && rangeResp.Complete != 0:
|
// channels.
|
||||||
t.Fatalf("non-final chunk should have "+
|
case i == 0:
|
||||||
"Complete=0: %v", spew.Sdump(rangeResp))
|
expectedFirstBlockHeight = startingBlockHeight
|
||||||
|
expectedNumBlocks = chunkSize + 1
|
||||||
|
|
||||||
case i < 2 && rangeResp.NumBlocks != chunkSize+1:
|
// The last reply should range starting from the next
|
||||||
t.Fatalf("NumBlocks fields in resp "+
|
// block of our previous reply up until the ending
|
||||||
"incorrect: expected %v got %v",
|
// height of the query. It should also have the Complete
|
||||||
chunkSize+1, rangeResp.NumBlocks)
|
// bit set.
|
||||||
|
case i == numExpectedChunks-1:
|
||||||
|
expectedFirstBlockHeight = respMsgs[len(respMsgs)-1].BlockHeight
|
||||||
|
expectedNumBlocks = endingBlockHeight - expectedFirstBlockHeight + 1
|
||||||
|
expectedComplete = 1
|
||||||
|
|
||||||
case i < 2 && rangeResp.FirstBlockHeight !=
|
// Any intermediate replies should range starting from
|
||||||
uint32(expectedFirstBlockHeight):
|
// the next block of our previous reply up until it
|
||||||
|
// reaches its maximum capacity of channels.
|
||||||
|
default:
|
||||||
|
expectedFirstBlockHeight = respMsgs[len(respMsgs)-1].BlockHeight
|
||||||
|
expectedNumBlocks = 5
|
||||||
|
}
|
||||||
|
|
||||||
t.Fatalf("FirstBlockHeight incorrect: "+
|
switch {
|
||||||
"expected %v got %v",
|
case rangeResp.FirstBlockHeight != expectedFirstBlockHeight:
|
||||||
rangeResp.FirstBlockHeight,
|
t.Fatalf("FirstBlockHeight in resp #%d "+
|
||||||
expectedFirstBlockHeight)
|
"incorrect: expected %v, got %v", i+1,
|
||||||
case i == 2 && rangeResp.Complete != 1:
|
expectedFirstBlockHeight,
|
||||||
t.Fatalf("final chunk should have "+
|
rangeResp.FirstBlockHeight)
|
||||||
"Complete=1: %v", spew.Sdump(rangeResp))
|
|
||||||
|
|
||||||
case i == 2 && rangeResp.NumBlocks != 1:
|
case rangeResp.NumBlocks != expectedNumBlocks:
|
||||||
t.Fatalf("NumBlocks fields in resp "+
|
t.Fatalf("NumBlocks in resp #%d incorrect: "+
|
||||||
"incorrect: expected %v got %v", 1,
|
"expected %v, got %v", i+1,
|
||||||
rangeResp.NumBlocks)
|
expectedNumBlocks, rangeResp.NumBlocks)
|
||||||
|
|
||||||
case i == 2 && rangeResp.FirstBlockHeight !=
|
|
||||||
queryResp[len(queryResp)-1].BlockHeight:
|
|
||||||
|
|
||||||
t.Fatalf("FirstBlockHeight incorrect: "+
|
|
||||||
"expected %v got %v",
|
|
||||||
rangeResp.FirstBlockHeight,
|
|
||||||
queryResp[len(queryResp)-1].BlockHeight)
|
|
||||||
|
|
||||||
|
case rangeResp.Complete != expectedComplete:
|
||||||
|
t.Fatalf("Complete in resp #%d incorrect: "+
|
||||||
|
"expected %v, got %v", i+1,
|
||||||
|
expectedNumBlocks, rangeResp.Complete)
|
||||||
}
|
}
|
||||||
|
|
||||||
respMsgs = append(respMsgs, rangeResp.ShortChanIDs...)
|
respMsgs = append(respMsgs, rangeResp.ShortChanIDs...)
|
||||||
@ -981,35 +989,91 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerProcessChanRangeReply tests that we'll properly buffer
|
// TestGossipSyncerProcessChanRangeReply tests that we'll properly buffer
|
||||||
// replied channel replies until we have the complete version. If no new
|
// replied channel replies until we have the complete version.
|
||||||
// channels were discovered, then we should go directly to the chanSsSynced
|
|
||||||
// state. Otherwise, we should go to the queryNewChannels states.
|
|
||||||
func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("legacy", func(t *testing.T) {
|
||||||
|
testGossipSyncerProcessChanRangeReply(t, true)
|
||||||
|
})
|
||||||
|
t.Run("block ranges", func(t *testing.T) {
|
||||||
|
testGossipSyncerProcessChanRangeReply(t, false)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// testGossipSyncerProcessChanRangeReply tests that we'll properly buffer
|
||||||
|
// replied channel replies until we have the complete version. The legacy
|
||||||
|
// option, if set, uses the Complete field of the reply to determine when we've
|
||||||
|
// received all expected replies. Otherwise, it looks at the block ranges of
|
||||||
|
// each reply instead.
|
||||||
|
func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
// First, we'll create a GossipSyncer instance with a canned sendToPeer
|
// First, we'll create a GossipSyncer instance with a canned sendToPeer
|
||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
|
highestID := lnwire.ShortChannelID{
|
||||||
|
BlockHeight: latestKnownHeight,
|
||||||
|
}
|
||||||
_, syncer, chanSeries := newTestSyncer(
|
_, syncer, chanSeries := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10), defaultEncoding, defaultChunkSize,
|
highestID, defaultEncoding, defaultChunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
startingState := syncer.state
|
startingState := syncer.state
|
||||||
|
|
||||||
|
query, err := syncer.genChanRangeQuery(true)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to generate channel range query: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var replyQueries []*lnwire.QueryChannelRange
|
||||||
|
if legacy {
|
||||||
|
// Each reply query is the same as the original query in the
|
||||||
|
// legacy mode.
|
||||||
|
replyQueries = []*lnwire.QueryChannelRange{query, query, query}
|
||||||
|
} else {
|
||||||
|
// When interpreting block ranges, the first reply should start
|
||||||
|
// from our requested first block, and the last should end at
|
||||||
|
// our requested last block.
|
||||||
|
replyQueries = []*lnwire.QueryChannelRange{
|
||||||
|
{
|
||||||
|
FirstBlockHeight: 0,
|
||||||
|
NumBlocks: 11,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
FirstBlockHeight: 11,
|
||||||
|
NumBlocks: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
FirstBlockHeight: 12,
|
||||||
|
NumBlocks: query.NumBlocks - 12,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
replies := []*lnwire.ReplyChannelRange{
|
replies := []*lnwire.ReplyChannelRange{
|
||||||
{
|
{
|
||||||
|
QueryChannelRange: *replyQueries[0],
|
||||||
ShortChanIDs: []lnwire.ShortChannelID{
|
ShortChanIDs: []lnwire.ShortChannelID{
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
{
|
||||||
|
BlockHeight: 10,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
QueryChannelRange: *replyQueries[1],
|
||||||
ShortChanIDs: []lnwire.ShortChannelID{
|
ShortChanIDs: []lnwire.ShortChannelID{
|
||||||
lnwire.NewShortChanIDFromInt(11),
|
{
|
||||||
|
BlockHeight: 11,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Complete: 1,
|
QueryChannelRange: *replyQueries[2],
|
||||||
|
Complete: 1,
|
||||||
ShortChanIDs: []lnwire.ShortChannelID{
|
ShortChanIDs: []lnwire.ShortChannelID{
|
||||||
lnwire.NewShortChanIDFromInt(12),
|
{
|
||||||
|
BlockHeight: 12,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -1030,9 +1094,15 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
expectedReq := []lnwire.ShortChannelID{
|
expectedReq := []lnwire.ShortChannelID{
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
{
|
||||||
lnwire.NewShortChanIDFromInt(11),
|
BlockHeight: 10,
|
||||||
lnwire.NewShortChanIDFromInt(12),
|
},
|
||||||
|
{
|
||||||
|
BlockHeight: 11,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
BlockHeight: 12,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// As we're about to send the final response, we'll launch a goroutine
|
// As we're about to send the final response, we'll launch a goroutine
|
||||||
@ -1083,48 +1153,6 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll repeat our final reply again, but this time we won't send any
|
|
||||||
// new channels. As a result, we should transition over to the
|
|
||||||
// chansSynced state.
|
|
||||||
errCh = make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-time.After(time.Second * 15):
|
|
||||||
errCh <- errors.New("no query received")
|
|
||||||
return
|
|
||||||
case req := <-chanSeries.filterReq:
|
|
||||||
// We should get a request for the entire range of short
|
|
||||||
// chan ID's.
|
|
||||||
if !reflect.DeepEqual(expectedReq[2], req[0]) {
|
|
||||||
errCh <- fmt.Errorf("wrong request: expected %v, got %v",
|
|
||||||
expectedReq[2], req[0])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// We'll send back only the last two to simulate filtering.
|
|
||||||
chanSeries.filterResp <- []lnwire.ShortChannelID{}
|
|
||||||
errCh <- nil
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
if err := syncer.processChanRangeReply(replies[2]); err != nil {
|
|
||||||
t.Fatalf("unable to process reply: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if syncer.syncState() != chansSynced {
|
|
||||||
t.Fatalf("wrong state: expected %v instead got %v",
|
|
||||||
chansSynced, syncer.state)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for error from goroutine.
|
|
||||||
select {
|
|
||||||
case <-time.After(time.Second * 30):
|
|
||||||
t.Fatalf("goroutine did not return within 30 seconds")
|
|
||||||
case err := <-errCh:
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
|
// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
|
||||||
@ -1260,17 +1288,17 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
|
|||||||
|
|
||||||
// First, we'll create two GossipSyncer instances with a canned
|
// First, we'll create two GossipSyncer instances with a canned
|
||||||
// sendToPeer message to allow us to intercept their potential sends.
|
// sendToPeer message to allow us to intercept their potential sends.
|
||||||
startHeight := lnwire.ShortChannelID{
|
highestID := lnwire.ShortChannelID{
|
||||||
BlockHeight: 1144,
|
BlockHeight: 1144,
|
||||||
}
|
}
|
||||||
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
||||||
startHeight, defaultEncoding, chunkSize, true, false,
|
highestID, defaultEncoding, chunkSize, true, false,
|
||||||
)
|
)
|
||||||
syncer1.Start()
|
syncer1.Start()
|
||||||
defer syncer1.Stop()
|
defer syncer1.Stop()
|
||||||
|
|
||||||
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
||||||
startHeight, defaultEncoding, chunkSize, false, true,
|
highestID, defaultEncoding, chunkSize, false, true,
|
||||||
)
|
)
|
||||||
syncer2.Start()
|
syncer2.Start()
|
||||||
defer syncer2.Stop()
|
defer syncer2.Stop()
|
||||||
@ -1300,9 +1328,10 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
|
|||||||
// inherently disjoint.
|
// inherently disjoint.
|
||||||
var syncer2Chans []lnwire.ShortChannelID
|
var syncer2Chans []lnwire.ShortChannelID
|
||||||
for i := 0; i < numTotalChans; i++ {
|
for i := 0; i < numTotalChans; i++ {
|
||||||
syncer2Chans = append(
|
syncer2Chans = append(syncer2Chans, lnwire.ShortChannelID{
|
||||||
syncer2Chans, lnwire.NewShortChanIDFromInt(uint64(i)),
|
BlockHeight: highestID.BlockHeight - 1,
|
||||||
)
|
TxIndex: uint32(i),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll kick off the test by asserting syncer1 sends over the
|
// We'll kick off the test by asserting syncer1 sends over the
|
||||||
@ -1530,17 +1559,17 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
|
|||||||
|
|
||||||
// First, we'll create two GossipSyncer instances with a canned
|
// First, we'll create two GossipSyncer instances with a canned
|
||||||
// sendToPeer message to allow us to intercept their potential sends.
|
// sendToPeer message to allow us to intercept their potential sends.
|
||||||
startHeight := lnwire.ShortChannelID{
|
highestID := lnwire.ShortChannelID{
|
||||||
BlockHeight: 1144,
|
BlockHeight: 1144,
|
||||||
}
|
}
|
||||||
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
||||||
startHeight, defaultEncoding, chunkSize, true, false,
|
highestID, defaultEncoding, chunkSize, true, false,
|
||||||
)
|
)
|
||||||
syncer1.Start()
|
syncer1.Start()
|
||||||
defer syncer1.Stop()
|
defer syncer1.Stop()
|
||||||
|
|
||||||
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
||||||
startHeight, defaultEncoding, chunkSize, false, true,
|
highestID, defaultEncoding, chunkSize, false, true,
|
||||||
)
|
)
|
||||||
syncer2.Start()
|
syncer2.Start()
|
||||||
defer syncer2.Stop()
|
defer syncer2.Stop()
|
||||||
@ -1548,9 +1577,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
|
|||||||
// Although both nodes are at the same height, syncer will have 3 chan
|
// Although both nodes are at the same height, syncer will have 3 chan
|
||||||
// ID's that syncer1 doesn't know of.
|
// ID's that syncer1 doesn't know of.
|
||||||
syncer2Chans := []lnwire.ShortChannelID{
|
syncer2Chans := []lnwire.ShortChannelID{
|
||||||
lnwire.NewShortChanIDFromInt(4),
|
{BlockHeight: highestID.BlockHeight - 3},
|
||||||
lnwire.NewShortChanIDFromInt(5),
|
{BlockHeight: highestID.BlockHeight - 2},
|
||||||
lnwire.NewShortChanIDFromInt(6),
|
{BlockHeight: highestID.BlockHeight - 1},
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll kick off the test by passing over the QueryChannelRange
|
// We'll kick off the test by passing over the QueryChannelRange
|
||||||
@ -1674,35 +1703,34 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
|
|||||||
// our chunk parsing works properly. With this value we should get 3
|
// our chunk parsing works properly. With this value we should get 3
|
||||||
// queries: two full chunks, and one lingering chunk.
|
// queries: two full chunks, and one lingering chunk.
|
||||||
const chunkSize = 2
|
const chunkSize = 2
|
||||||
|
const numChans = 3
|
||||||
|
|
||||||
// First, we'll create two GossipSyncer instances with a canned
|
// First, we'll create two GossipSyncer instances with a canned
|
||||||
// sendToPeer message to allow us to intercept their potential sends.
|
// sendToPeer message to allow us to intercept their potential sends.
|
||||||
startHeight := lnwire.ShortChannelID{
|
highestID := lnwire.ShortChannelID{
|
||||||
BlockHeight: 1144,
|
BlockHeight: 1144,
|
||||||
}
|
}
|
||||||
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
||||||
startHeight, defaultEncoding, chunkSize,
|
highestID, defaultEncoding, chunkSize,
|
||||||
)
|
)
|
||||||
syncer1.Start()
|
syncer1.Start()
|
||||||
defer syncer1.Stop()
|
defer syncer1.Stop()
|
||||||
|
|
||||||
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
||||||
startHeight, defaultEncoding, chunkSize,
|
highestID, defaultEncoding, chunkSize,
|
||||||
)
|
)
|
||||||
syncer2.Start()
|
syncer2.Start()
|
||||||
defer syncer2.Stop()
|
defer syncer2.Stop()
|
||||||
|
|
||||||
// The channel state of both syncers will be identical. They should
|
// The channel state of both syncers will be identical. They should
|
||||||
// recognize this, and skip the sync phase below.
|
// recognize this, and skip the sync phase below.
|
||||||
syncer1Chans := []lnwire.ShortChannelID{
|
var syncer1Chans, syncer2Chans []lnwire.ShortChannelID
|
||||||
lnwire.NewShortChanIDFromInt(1),
|
for i := numChans; i > 0; i-- {
|
||||||
lnwire.NewShortChanIDFromInt(2),
|
shortChanID := lnwire.ShortChannelID{
|
||||||
lnwire.NewShortChanIDFromInt(3),
|
BlockHeight: highestID.BlockHeight - uint32(i),
|
||||||
}
|
}
|
||||||
syncer2Chans := []lnwire.ShortChannelID{
|
syncer1Chans = append(syncer1Chans, shortChanID)
|
||||||
lnwire.NewShortChanIDFromInt(1),
|
syncer2Chans = append(syncer2Chans, shortChanID)
|
||||||
lnwire.NewShortChanIDFromInt(2),
|
|
||||||
lnwire.NewShortChanIDFromInt(3),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll now kick off the test by allowing both side to send their
|
// We'll now kick off the test by allowing both side to send their
|
||||||
|
@ -2,6 +2,7 @@ package lnwire
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
)
|
)
|
||||||
@ -75,3 +76,14 @@ func (q *QueryChannelRange) MaxPayloadLength(uint32) uint32 {
|
|||||||
// 32 + 4 + 4
|
// 32 + 4 + 4
|
||||||
return 40
|
return 40
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LastBlockHeight returns the last block height covered by the range of a
|
||||||
|
// QueryChannelRange message.
|
||||||
|
func (q *QueryChannelRange) LastBlockHeight() uint32 {
|
||||||
|
// Handle overflows by casting to uint64.
|
||||||
|
lastBlockHeight := uint64(q.FirstBlockHeight) + uint64(q.NumBlocks) - 1
|
||||||
|
if lastBlockHeight > math.MaxUint32 {
|
||||||
|
return math.MaxUint32
|
||||||
|
}
|
||||||
|
return uint32(lastBlockHeight)
|
||||||
|
}
|
||||||
|
10
peer.go
10
peer.go
@ -1336,8 +1336,10 @@ func messageSummary(msg lnwire.Message) string {
|
|||||||
msg.Complete)
|
msg.Complete)
|
||||||
|
|
||||||
case *lnwire.ReplyChannelRange:
|
case *lnwire.ReplyChannelRange:
|
||||||
return fmt.Sprintf("complete=%v, encoding=%v, num_chans=%v",
|
return fmt.Sprintf("start_height=%v, end_height=%v, "+
|
||||||
msg.Complete, msg.EncodingType, len(msg.ShortChanIDs))
|
"num_chans=%v, encoding=%v", msg.FirstBlockHeight,
|
||||||
|
msg.LastBlockHeight(), len(msg.ShortChanIDs),
|
||||||
|
msg.EncodingType)
|
||||||
|
|
||||||
case *lnwire.QueryShortChanIDs:
|
case *lnwire.QueryShortChanIDs:
|
||||||
return fmt.Sprintf("chain_hash=%v, encoding=%v, num_chans=%v",
|
return fmt.Sprintf("chain_hash=%v, encoding=%v, num_chans=%v",
|
||||||
@ -1345,8 +1347,8 @@ func messageSummary(msg lnwire.Message) string {
|
|||||||
|
|
||||||
case *lnwire.QueryChannelRange:
|
case *lnwire.QueryChannelRange:
|
||||||
return fmt.Sprintf("chain_hash=%v, start_height=%v, "+
|
return fmt.Sprintf("chain_hash=%v, start_height=%v, "+
|
||||||
"num_blocks=%v", msg.ChainHash, msg.FirstBlockHeight,
|
"end_height=%v", msg.ChainHash, msg.FirstBlockHeight,
|
||||||
msg.NumBlocks)
|
msg.LastBlockHeight())
|
||||||
|
|
||||||
case *lnwire.GossipTimestampRange:
|
case *lnwire.GossipTimestampRange:
|
||||||
return fmt.Sprintf("chain_hash=%v, first_stamp=%v, "+
|
return fmt.Sprintf("chain_hash=%v, first_stamp=%v, "+
|
||||||
|
Loading…
Reference in New Issue
Block a user