Merge pull request #1596 from wpaulino/syncer-cfg-chunk-size
discovery/syncer: store chunk size within gossiperSyncerCfg
This commit is contained in:
commit
c41d1a0a44
@ -927,11 +927,13 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipS
|
|||||||
|
|
||||||
// At this point, a syncer doesn't yet exist, so we'll create a new one
|
// At this point, a syncer doesn't yet exist, so we'll create a new one
|
||||||
// for the peer and return it to the caller.
|
// for the peer and return it to the caller.
|
||||||
|
encoding := lnwire.EncodingSortedPlain
|
||||||
syncer = newGossiperSyncer(gossipSyncerCfg{
|
syncer = newGossiperSyncer(gossipSyncerCfg{
|
||||||
chainHash: d.cfg.ChainHash,
|
chainHash: d.cfg.ChainHash,
|
||||||
syncChanUpdates: true,
|
syncChanUpdates: true,
|
||||||
channelSeries: d.cfg.ChanSeries,
|
channelSeries: d.cfg.ChanSeries,
|
||||||
encodingType: lnwire.EncodingSortedPlain,
|
encodingType: encoding,
|
||||||
|
chunkSize: encodingTypeToChunkSize[encoding],
|
||||||
sendToPeer: func(msgs ...lnwire.Message) error {
|
sendToPeer: func(msgs ...lnwire.Message) error {
|
||||||
return syncPeer.SendMessage(false, msgs...)
|
return syncPeer.SendMessage(false, msgs...)
|
||||||
},
|
},
|
||||||
@ -1242,14 +1244,15 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Creating new gossipSyncer for peer=%x",
|
log.Infof("Creating new gossipSyncer for peer=%x", nodeID[:])
|
||||||
nodeID[:])
|
|
||||||
|
|
||||||
|
encoding := lnwire.EncodingSortedPlain
|
||||||
syncer := newGossiperSyncer(gossipSyncerCfg{
|
syncer := newGossiperSyncer(gossipSyncerCfg{
|
||||||
chainHash: d.cfg.ChainHash,
|
chainHash: d.cfg.ChainHash,
|
||||||
syncChanUpdates: recvUpdates,
|
syncChanUpdates: recvUpdates,
|
||||||
channelSeries: d.cfg.ChanSeries,
|
channelSeries: d.cfg.ChanSeries,
|
||||||
encodingType: lnwire.EncodingSortedPlain,
|
encodingType: encoding,
|
||||||
|
chunkSize: encodingTypeToChunkSize[encoding],
|
||||||
sendToPeer: func(msgs ...lnwire.Message) error {
|
sendToPeer: func(msgs ...lnwire.Message) error {
|
||||||
return syncPeer.SendMessage(false, msgs...)
|
return syncPeer.SendMessage(false, msgs...)
|
||||||
},
|
},
|
||||||
|
@ -7,8 +7,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
)
|
)
|
||||||
|
|
||||||
// syncerState is an enum that represents the current state of the
|
// syncerState is an enum that represents the current state of the
|
||||||
@ -159,6 +159,10 @@ type gossipSyncerCfg struct {
|
|||||||
// with different encoding types will be rejected.
|
// with different encoding types will be rejected.
|
||||||
encodingType lnwire.ShortChanIDEncoding
|
encodingType lnwire.ShortChanIDEncoding
|
||||||
|
|
||||||
|
// chunkSize is the max number of short chan IDs using the syncer's
|
||||||
|
// encoding type that we can fit into a single message safely.
|
||||||
|
chunkSize int32
|
||||||
|
|
||||||
// sendToPeer is a function closure that should send the set of
|
// sendToPeer is a function closure that should send the set of
|
||||||
// targeted messages to the peer we've been assigned to sync the graph
|
// targeted messages to the peer we've been assigned to sync the graph
|
||||||
// state from.
|
// state from.
|
||||||
@ -445,14 +449,6 @@ func (g *gossipSyncer) channelGraphSyncer() {
|
|||||||
// required to ensure they fit into a single message. We may re-renter this
|
// required to ensure they fit into a single message. We may re-renter this
|
||||||
// state in the case that chunking is required.
|
// state in the case that chunking is required.
|
||||||
func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
|
func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
|
||||||
// Ensure that we're able to handle queries using the specified chan
|
|
||||||
// ID.
|
|
||||||
chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType]
|
|
||||||
if !ok {
|
|
||||||
return false, fmt.Errorf("unknown encoding type: %v",
|
|
||||||
g.cfg.encodingType)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we're in this state yet there are no more new channels to query
|
// If we're in this state yet there are no more new channels to query
|
||||||
// for, then we'll transition to our final synced state and return true
|
// for, then we'll transition to our final synced state and return true
|
||||||
// to signal that we're fully synchronized.
|
// to signal that we're fully synchronized.
|
||||||
@ -468,7 +464,7 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
|
|||||||
|
|
||||||
// If the number of channels to query for is less than the chunk size,
|
// If the number of channels to query for is less than the chunk size,
|
||||||
// then we can issue a single query.
|
// then we can issue a single query.
|
||||||
if int32(len(g.newChansToQuery)) < chunkSize {
|
if int32(len(g.newChansToQuery)) < g.cfg.chunkSize {
|
||||||
queryChunk = g.newChansToQuery
|
queryChunk = g.newChansToQuery
|
||||||
g.newChansToQuery = nil
|
g.newChansToQuery = nil
|
||||||
|
|
||||||
@ -476,8 +472,8 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
|
|||||||
// Otherwise, we'll need to only query for the next chunk.
|
// Otherwise, we'll need to only query for the next chunk.
|
||||||
// We'll slice into our query chunk, then slide down our main
|
// We'll slice into our query chunk, then slide down our main
|
||||||
// pointer down by the chunk size.
|
// pointer down by the chunk size.
|
||||||
queryChunk = g.newChansToQuery[:chunkSize]
|
queryChunk = g.newChansToQuery[:g.cfg.chunkSize]
|
||||||
g.newChansToQuery = g.newChansToQuery[chunkSize:]
|
g.newChansToQuery = g.newChansToQuery[g.cfg.chunkSize:]
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("gossipSyncer(%x): querying for %v new channels",
|
log.Infof("gossipSyncer(%x): querying for %v new channels",
|
||||||
@ -615,14 +611,6 @@ func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error {
|
|||||||
// ensure that our final fragment carries the "complete" bit to indicate the
|
// ensure that our final fragment carries the "complete" bit to indicate the
|
||||||
// end of our streaming response.
|
// end of our streaming response.
|
||||||
func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
|
func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
|
||||||
// Using the current set encoding type, we'll determine what our chunk
|
|
||||||
// size should be. If we can't locate the chunk size, then we'll return
|
|
||||||
// an error as we can't proceed.
|
|
||||||
chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType]
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("unknown encoding type: %v", g.cfg.encodingType)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+
|
log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+
|
||||||
"num_blocks=%v", g.peerPub[:], query.FirstBlockHeight,
|
"num_blocks=%v", g.peerPub[:], query.FirstBlockHeight,
|
||||||
query.NumBlocks)
|
query.NumBlocks)
|
||||||
@ -651,7 +639,7 @@ func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
|
|||||||
// We know this is the final chunk, if the difference between
|
// We know this is the final chunk, if the difference between
|
||||||
// the total number of channels, and the number of channels
|
// the total number of channels, and the number of channels
|
||||||
// we've sent is less-than-or-equal to the chunk size.
|
// we've sent is less-than-or-equal to the chunk size.
|
||||||
isFinalChunk := (numChannels - numChansSent) <= chunkSize
|
isFinalChunk := (numChannels - numChansSent) <= g.cfg.chunkSize
|
||||||
|
|
||||||
// If this is indeed the last chunk, then we'll send the
|
// If this is indeed the last chunk, then we'll send the
|
||||||
// remainder of the channels.
|
// remainder of the channels.
|
||||||
@ -664,7 +652,7 @@ func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
|
|||||||
} else {
|
} else {
|
||||||
// Otherwise, we'll only send off a fragment exactly
|
// Otherwise, we'll only send off a fragment exactly
|
||||||
// sized to the proper chunk size.
|
// sized to the proper chunk size.
|
||||||
channelChunk = channelRange[numChansSent : numChansSent+chunkSize]
|
channelChunk = channelRange[numChansSent : numChansSent+g.cfg.chunkSize]
|
||||||
|
|
||||||
log.Infof("gossipSyncer(%x): sending range chunk of "+
|
log.Infof("gossipSyncer(%x): sending range chunk of "+
|
||||||
"size=%v", g.peerPub[:], len(channelChunk))
|
"size=%v", g.peerPub[:], len(channelChunk))
|
||||||
|
@ -7,10 +7,18 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/davecgh/go-spew/spew"
|
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
|
||||||
"github.com/btcsuite/btcd/chaincfg"
|
"github.com/btcsuite/btcd/chaincfg"
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultEncoding = lnwire.EncodingSortedPlain
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultChunkSize = encodingTypeToChunkSize[defaultEncoding]
|
||||||
)
|
)
|
||||||
|
|
||||||
type horizonQuery struct {
|
type horizonQuery struct {
|
||||||
@ -105,13 +113,16 @@ func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash,
|
|||||||
|
|
||||||
var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil)
|
var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil)
|
||||||
|
|
||||||
func newTestSyncer(hID lnwire.ShortChannelID) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) {
|
func newTestSyncer(hID lnwire.ShortChannelID,
|
||||||
|
encodingType lnwire.ShortChanIDEncoding, chunkSize int32,
|
||||||
|
) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) {
|
||||||
|
|
||||||
msgChan := make(chan []lnwire.Message, 20)
|
msgChan := make(chan []lnwire.Message, 20)
|
||||||
cfg := gossipSyncerCfg{
|
cfg := gossipSyncerCfg{
|
||||||
syncChanUpdates: true,
|
syncChanUpdates: true,
|
||||||
channelSeries: newMockChannelGraphTimeSeries(hID),
|
channelSeries: newMockChannelGraphTimeSeries(hID),
|
||||||
encodingType: lnwire.EncodingSortedPlain,
|
encodingType: encodingType,
|
||||||
|
chunkSize: chunkSize,
|
||||||
sendToPeer: func(msgs ...lnwire.Message) error {
|
sendToPeer: func(msgs ...lnwire.Message) error {
|
||||||
msgChan <- msgs
|
msgChan <- msgs
|
||||||
return nil
|
return nil
|
||||||
@ -130,7 +141,8 @@ func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
|
|||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
msgChan, syncer, _ := newTestSyncer(
|
msgChan, syncer, _ := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
|
||||||
|
defaultChunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
// With the syncer created, we'll create a set of messages to filter
|
// With the syncer created, we'll create a set of messages to filter
|
||||||
@ -174,7 +186,8 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
|
|||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
msgChan, syncer, chanSeries := newTestSyncer(
|
msgChan, syncer, chanSeries := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
|
||||||
|
defaultChunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
// We'll create then apply a remote horizon for the target peer with a
|
// We'll create then apply a remote horizon for the target peer with a
|
||||||
@ -303,7 +316,8 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
|
|||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
msgChan, syncer, chanSeries := newTestSyncer(
|
msgChan, syncer, chanSeries := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
|
||||||
|
defaultChunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
// We'll apply this gossip horizon for the remote peer.
|
// We'll apply this gossip horizon for the remote peer.
|
||||||
@ -400,7 +414,8 @@ func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
|
|||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
msgChan, syncer, _ := newTestSyncer(
|
msgChan, syncer, _ := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
|
||||||
|
defaultChunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
// We'll now ask the syncer to reply to a chan ID query, but for a
|
// We'll now ask the syncer to reply to a chan ID query, but for a
|
||||||
@ -450,7 +465,8 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
|
|||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
msgChan, syncer, chanSeries := newTestSyncer(
|
msgChan, syncer, chanSeries := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
|
||||||
|
defaultChunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
queryChanIDs := []lnwire.ShortChannelID{
|
queryChanIDs := []lnwire.ShortChannelID{
|
||||||
@ -527,42 +543,20 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerReplyChanRangeQueryUnknownEncodingType tests that if we
|
|
||||||
// receive a QueryChannelRange message with an unknown encoding type, then we
|
|
||||||
// return an error.
|
|
||||||
func TestGossipSyncerReplyChanRangeQueryUnknownEncodingType(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
|
||||||
// message to allow us to intercept their potential sends.
|
|
||||||
_, syncer, _ := newTestSyncer(
|
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
|
||||||
)
|
|
||||||
|
|
||||||
// If we modify the syncer to expect an encoding type that is currently
|
|
||||||
// unknown, then it should fail to process the message and return an
|
|
||||||
// error.
|
|
||||||
syncer.cfg.encodingType = 99
|
|
||||||
err := syncer.replyChanRangeQuery(&lnwire.QueryChannelRange{})
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("expected message fail")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestGossipSyncerReplyChanRangeQuery tests that if we receive a
|
// TestGossipSyncerReplyChanRangeQuery tests that if we receive a
|
||||||
// QueryChannelRange message, then we'll properly send back a chunked reply to
|
// QueryChannelRange message, then we'll properly send back a chunked reply to
|
||||||
// the remote peer.
|
// the remote peer.
|
||||||
func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
|
func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
// First, we'll modify the main map to provide e a smaller chunk size
|
// We'll use a smaller chunk size so we can easily test all the edge
|
||||||
// so we can easily test all the edge cases.
|
// cases.
|
||||||
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = 2
|
const chunkSize = 2
|
||||||
|
|
||||||
// We'll now create our test gossip syncer that will shortly respond to
|
// We'll now create our test gossip syncer that will shortly respond to
|
||||||
// our canned query.
|
// our canned query.
|
||||||
msgChan, syncer, chanSeries := newTestSyncer(
|
msgChan, syncer, chanSeries := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Next, we'll craft a query to ask for all the new chan ID's after
|
// Next, we'll craft a query to ask for all the new chan ID's after
|
||||||
@ -609,7 +603,7 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
|
|||||||
// full, while the other is the final fragment.
|
// full, while the other is the final fragment.
|
||||||
const numExpectedChunks = 3
|
const numExpectedChunks = 3
|
||||||
respMsgs := make([]lnwire.ShortChannelID, 0, 5)
|
respMsgs := make([]lnwire.ShortChannelID, 0, 5)
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < numExpectedChunks; i++ {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no msgs received")
|
t.Fatalf("no msgs received")
|
||||||
@ -658,7 +652,8 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
|
|||||||
// We'll now create our test gossip syncer that will shortly respond to
|
// We'll now create our test gossip syncer that will shortly respond to
|
||||||
// our canned query.
|
// our canned query.
|
||||||
msgChan, syncer, chanSeries := newTestSyncer(
|
msgChan, syncer, chanSeries := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
|
||||||
|
defaultChunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Next, we'll craft a query to ask for all the new chan ID's after
|
// Next, we'll craft a query to ask for all the new chan ID's after
|
||||||
@ -725,9 +720,8 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
|
|||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
const startingHeight = 200
|
const startingHeight = 200
|
||||||
_, syncer, _ := newTestSyncer(
|
_, syncer, _ := newTestSyncer(
|
||||||
lnwire.ShortChannelID{
|
lnwire.ShortChannelID{BlockHeight: startingHeight},
|
||||||
BlockHeight: startingHeight,
|
defaultEncoding, defaultChunkSize,
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// If we now ask the syncer to generate an initial range query, it
|
// If we now ask the syncer to generate an initial range query, it
|
||||||
@ -760,7 +754,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
|||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
_, syncer, chanSeries := newTestSyncer(
|
_, syncer, chanSeries := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding, defaultChunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
startingState := syncer.state
|
startingState := syncer.state
|
||||||
@ -873,28 +867,6 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerSynchronizeChanIDsUnknownEncodingType tests that if we
|
|
||||||
// attempt to query for a set of new channels using an unknown encoding type,
|
|
||||||
// then we'll get an error.
|
|
||||||
func TestGossipSyncerSynchronizeChanIDsUnknownEncodingType(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
|
||||||
// message to allow us to intercept their potential sends.
|
|
||||||
_, syncer, _ := newTestSyncer(
|
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
|
||||||
)
|
|
||||||
|
|
||||||
// If we modify the syncer to expect an encoding type that is currently
|
|
||||||
// unknown, then it should fail to process the message and return an
|
|
||||||
// error.
|
|
||||||
syncer.cfg.encodingType = 101
|
|
||||||
_, err := syncer.synchronizeChanIDs()
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("expected message fail")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
|
// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
|
||||||
// the short chan ID's which were unknown to us. We'll ensure that we request
|
// the short chan ID's which were unknown to us. We'll ensure that we request
|
||||||
// chunk by chunk, and after the last chunk, we return true indicating that we
|
// chunk by chunk, and after the last chunk, we return true indicating that we
|
||||||
@ -902,10 +874,15 @@ func TestGossipSyncerSynchronizeChanIDsUnknownEncodingType(t *testing.T) {
|
|||||||
func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
|
func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
// We'll modify the chunk size to be a smaller value, so we can ensure
|
||||||
|
// our chunk parsing works properly. With this value we should get 3
|
||||||
|
// queries: two full chunks, and one lingering chunk.
|
||||||
|
const chunkSize = 2
|
||||||
|
|
||||||
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
// First, we'll create a gossipSyncer instance with a canned sendToPeer
|
||||||
// message to allow us to intercept their potential sends.
|
// message to allow us to intercept their potential sends.
|
||||||
msgChan, syncer, _ := newTestSyncer(
|
msgChan, syncer, _ := newTestSyncer(
|
||||||
lnwire.NewShortChanIDFromInt(10),
|
lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Next, we'll construct a set of chan ID's that we should query for,
|
// Next, we'll construct a set of chan ID's that we should query for,
|
||||||
@ -919,13 +896,7 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
syncer.newChansToQuery = newChanIDs
|
syncer.newChansToQuery = newChanIDs
|
||||||
|
|
||||||
// We'll modify the chunk size to be a smaller value, so we can ensure
|
for i := 0; i < chunkSize*2; i += 2 {
|
||||||
// our chunk parsing works properly. With this value we should get 3
|
|
||||||
// queries: two full chunks, and one lingering chunk.
|
|
||||||
chunkSize := int32(2)
|
|
||||||
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
|
|
||||||
|
|
||||||
for i := int32(0); i < chunkSize*2; i += 2 {
|
|
||||||
// With our set up complete, we'll request a sync of chan ID's.
|
// With our set up complete, we'll request a sync of chan ID's.
|
||||||
done, err := syncer.synchronizeChanIDs()
|
done, err := syncer.synchronizeChanIDs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1019,19 +990,24 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
|
|||||||
func TestGossipSyncerRoutineSync(t *testing.T) {
|
func TestGossipSyncerRoutineSync(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
// We'll modify the chunk size to be a smaller value, so we can ensure
|
||||||
|
// our chunk parsing works properly. With this value we should get 3
|
||||||
|
// queries: two full chunks, and one lingering chunk.
|
||||||
|
const chunkSize = 2
|
||||||
|
|
||||||
// First, we'll create two gossipSyncer instances with a canned
|
// First, we'll create two gossipSyncer instances with a canned
|
||||||
// sendToPeer message to allow us to intercept their potential sends.
|
// sendToPeer message to allow us to intercept their potential sends.
|
||||||
startHeight := lnwire.ShortChannelID{
|
startHeight := lnwire.ShortChannelID{
|
||||||
BlockHeight: 1144,
|
BlockHeight: 1144,
|
||||||
}
|
}
|
||||||
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
||||||
startHeight,
|
startHeight, defaultEncoding, chunkSize,
|
||||||
)
|
)
|
||||||
syncer1.Start()
|
syncer1.Start()
|
||||||
defer syncer1.Stop()
|
defer syncer1.Stop()
|
||||||
|
|
||||||
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
||||||
startHeight,
|
startHeight, defaultEncoding, chunkSize,
|
||||||
)
|
)
|
||||||
syncer2.Start()
|
syncer2.Start()
|
||||||
defer syncer2.Stop()
|
defer syncer2.Stop()
|
||||||
@ -1049,11 +1025,6 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
|
|||||||
lnwire.NewShortChanIDFromInt(6),
|
lnwire.NewShortChanIDFromInt(6),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Before we start the test, we'll set our chunk size to 2 in order to
|
|
||||||
// make testing the chunked requests and replies easier.
|
|
||||||
chunkSize := int32(2)
|
|
||||||
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
|
|
||||||
|
|
||||||
// We'll kick off the test by passing over the QueryChannelRange
|
// We'll kick off the test by passing over the QueryChannelRange
|
||||||
// messages from one node to the other.
|
// messages from one node to the other.
|
||||||
select {
|
select {
|
||||||
@ -1124,7 +1095,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
|
|||||||
// At this point, we'll forward the ReplyChannelRange messages to both
|
// At this point, we'll forward the ReplyChannelRange messages to both
|
||||||
// parties. Two replies are expected since the chunk size is 2, and we
|
// parties. Two replies are expected since the chunk size is 2, and we
|
||||||
// need to query for 3 channels.
|
// need to query for 3 channels.
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < chunkSize; i++ {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 2):
|
case <-time.After(time.Second * 2):
|
||||||
t.Fatalf("didn't get msg from syncer1")
|
t.Fatalf("didn't get msg from syncer1")
|
||||||
@ -1147,7 +1118,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < chunkSize; i++ {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 2):
|
case <-time.After(time.Second * 2):
|
||||||
t.Fatalf("didn't get msg from syncer2")
|
t.Fatalf("didn't get msg from syncer2")
|
||||||
@ -1190,9 +1161,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
|
|||||||
|
|
||||||
// At this point, both parties should start to send out initial
|
// At this point, both parties should start to send out initial
|
||||||
// requests to query the chan IDs of the remote party. As the chunk
|
// requests to query the chan IDs of the remote party. As the chunk
|
||||||
// size is 3, they'll need 2 rounds in order to fully reconcile the
|
// size is 2, they'll need 2 rounds in order to fully reconcile the
|
||||||
// state.
|
// state.
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < chunkSize; i++ {
|
||||||
// Both parties should now have sent out the initial requests
|
// Both parties should now have sent out the initial requests
|
||||||
// to query the chan IDs of the other party.
|
// to query the chan IDs of the other party.
|
||||||
select {
|
select {
|
||||||
@ -1359,28 +1330,28 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
|
|||||||
func TestGossipSyncerAlreadySynced(t *testing.T) {
|
func TestGossipSyncerAlreadySynced(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
// We'll modify the chunk size to be a smaller value, so we can ensure
|
||||||
|
// our chunk parsing works properly. With this value we should get 3
|
||||||
|
// queries: two full chunks, and one lingering chunk.
|
||||||
|
const chunkSize = 2
|
||||||
|
|
||||||
// First, we'll create two gossipSyncer instances with a canned
|
// First, we'll create two gossipSyncer instances with a canned
|
||||||
// sendToPeer message to allow us to intercept their potential sends.
|
// sendToPeer message to allow us to intercept their potential sends.
|
||||||
startHeight := lnwire.ShortChannelID{
|
startHeight := lnwire.ShortChannelID{
|
||||||
BlockHeight: 1144,
|
BlockHeight: 1144,
|
||||||
}
|
}
|
||||||
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
msgChan1, syncer1, chanSeries1 := newTestSyncer(
|
||||||
startHeight,
|
startHeight, defaultEncoding, chunkSize,
|
||||||
)
|
)
|
||||||
syncer1.Start()
|
syncer1.Start()
|
||||||
defer syncer1.Stop()
|
defer syncer1.Stop()
|
||||||
|
|
||||||
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
msgChan2, syncer2, chanSeries2 := newTestSyncer(
|
||||||
startHeight,
|
startHeight, defaultEncoding, chunkSize,
|
||||||
)
|
)
|
||||||
syncer2.Start()
|
syncer2.Start()
|
||||||
defer syncer2.Stop()
|
defer syncer2.Stop()
|
||||||
|
|
||||||
// Before we start the test, we'll set our chunk size to 2 in order to
|
|
||||||
// make testing the chunked requests and replies easier.
|
|
||||||
chunkSize := int32(2)
|
|
||||||
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
|
|
||||||
|
|
||||||
// The channel state of both syncers will be identical. They should
|
// The channel state of both syncers will be identical. They should
|
||||||
// recognize this, and skip the sync phase below.
|
// recognize this, and skip the sync phase below.
|
||||||
syncer1Chans := []lnwire.ShortChannelID{
|
syncer1Chans := []lnwire.ShortChannelID{
|
||||||
@ -1463,7 +1434,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
|
|||||||
// Next, we'll thread through the replies of both parties. As the chunk
|
// Next, we'll thread through the replies of both parties. As the chunk
|
||||||
// size is 2, and they both know of 3 channels, it'll take two around
|
// size is 2, and they both know of 3 channels, it'll take two around
|
||||||
// and two chunks.
|
// and two chunks.
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < chunkSize; i++ {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 2):
|
case <-time.After(time.Second * 2):
|
||||||
t.Fatalf("didn't get msg from syncer1")
|
t.Fatalf("didn't get msg from syncer1")
|
||||||
@ -1486,7 +1457,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < chunkSize; i++ {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 2):
|
case <-time.After(time.Second * 2):
|
||||||
t.Fatalf("didn't get msg from syncer2")
|
t.Fatalf("didn't get msg from syncer2")
|
||||||
|
Loading…
Reference in New Issue
Block a user