discovery/syncer: store chunk size within gossiperSyncerCfg

In this commit, we allow the gossiper syncer to store the chunk size for
its respective encoding type. We do this to prevent a race condition
that would arise within the unit tests by modifying the values of the
encodingTypeToChunkSize map to allow for easier testing.
This commit is contained in:
Wilmer Paulino 2018-07-20 16:52:01 -07:00
parent 205a32380a
commit c61b037628
3 changed files with 79 additions and 117 deletions

View File

@ -921,11 +921,13 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipS
// At this point, a syncer doesn't yet exist, so we'll create a new one
// for the peer and return it to the caller.
encoding := lnwire.EncodingSortedPlain
syncer = newGossiperSyncer(gossipSyncerCfg{
chainHash: d.cfg.ChainHash,
syncChanUpdates: true,
channelSeries: d.cfg.ChanSeries,
encodingType: lnwire.EncodingSortedPlain,
encodingType: encoding,
chunkSize: encodingTypeToChunkSize[encoding],
sendToPeer: func(msgs ...lnwire.Message) error {
return syncPeer.SendMessage(false, msgs...)
},
@ -1236,14 +1238,15 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates
return
}
log.Infof("Creating new gossipSyncer for peer=%x",
nodeID[:])
log.Infof("Creating new gossipSyncer for peer=%x", nodeID[:])
encoding := lnwire.EncodingSortedPlain
syncer := newGossiperSyncer(gossipSyncerCfg{
chainHash: d.cfg.ChainHash,
syncChanUpdates: recvUpdates,
channelSeries: d.cfg.ChanSeries,
encodingType: lnwire.EncodingSortedPlain,
encodingType: encoding,
chunkSize: encodingTypeToChunkSize[encoding],
sendToPeer: func(msgs ...lnwire.Message) error {
return syncPeer.SendMessage(false, msgs...)
},

View File

@ -7,8 +7,8 @@ import (
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/lnwire"
)
// syncerState is an enum that represents the current state of the
@ -159,6 +159,10 @@ type gossipSyncerCfg struct {
// with different encoding types will be rejected.
encodingType lnwire.ShortChanIDEncoding
// chunkSize is the max number of short chan IDs using the syncer's
// encoding type that we can fit into a single message safely.
chunkSize int32
// sendToPeer is a function closure that should send the set of
// targeted messages to the peer we've been assigned to sync the graph
// state from.
@ -445,14 +449,6 @@ func (g *gossipSyncer) channelGraphSyncer() {
// required to ensure they fit into a single message. We may re-renter this
// state in the case that chunking is required.
func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
// Ensure that we're able to handle queries using the specified chan
// ID.
chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType]
if !ok {
return false, fmt.Errorf("unknown encoding type: %v",
g.cfg.encodingType)
}
// If we're in this state yet there are no more new channels to query
// for, then we'll transition to our final synced state and return true
// to signal that we're fully synchronized.
@ -468,7 +464,7 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
// If the number of channels to query for is less than the chunk size,
// then we can issue a single query.
if int32(len(g.newChansToQuery)) < chunkSize {
if int32(len(g.newChansToQuery)) < g.cfg.chunkSize {
queryChunk = g.newChansToQuery
g.newChansToQuery = nil
@ -476,8 +472,8 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
// Otherwise, we'll need to only query for the next chunk.
// We'll slice into our query chunk, then slide down our main
// pointer down by the chunk size.
queryChunk = g.newChansToQuery[:chunkSize]
g.newChansToQuery = g.newChansToQuery[chunkSize:]
queryChunk = g.newChansToQuery[:g.cfg.chunkSize]
g.newChansToQuery = g.newChansToQuery[g.cfg.chunkSize:]
}
log.Infof("gossipSyncer(%x): querying for %v new channels",
@ -615,14 +611,6 @@ func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error {
// ensure that our final fragment carries the "complete" bit to indicate the
// end of our streaming response.
func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
// Using the current set encoding type, we'll determine what our chunk
// size should be. If we can't locate the chunk size, then we'll return
// an error as we can't proceed.
chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType]
if !ok {
return fmt.Errorf("unknown encoding type: %v", g.cfg.encodingType)
}
log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+
"num_blocks=%v", g.peerPub[:], query.FirstBlockHeight,
query.NumBlocks)
@ -651,7 +639,7 @@ func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
// We know this is the final chunk, if the difference between
// the total number of channels, and the number of channels
// we've sent is less-than-or-equal to the chunk size.
isFinalChunk := (numChannels - numChansSent) <= chunkSize
isFinalChunk := (numChannels - numChansSent) <= g.cfg.chunkSize
// If this is indeed the last chunk, then we'll send the
// remainder of the channels.
@ -664,7 +652,7 @@ func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
} else {
// Otherwise, we'll only send off a fragment exactly
// sized to the proper chunk size.
channelChunk = channelRange[numChansSent : numChansSent+chunkSize]
channelChunk = channelRange[numChansSent : numChansSent+g.cfg.chunkSize]
log.Infof("gossipSyncer(%x): sending range chunk of "+
"size=%v", g.peerPub[:], len(channelChunk))

View File

@ -7,10 +7,18 @@ import (
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnwire"
)
const (
defaultEncoding = lnwire.EncodingSortedPlain
)
var (
defaultChunkSize = encodingTypeToChunkSize[defaultEncoding]
)
type horizonQuery struct {
@ -105,13 +113,16 @@ func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash,
var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil)
func newTestSyncer(hID lnwire.ShortChannelID) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) {
func newTestSyncer(hID lnwire.ShortChannelID,
encodingType lnwire.ShortChanIDEncoding, chunkSize int32,
) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) {
msgChan := make(chan []lnwire.Message, 20)
cfg := gossipSyncerCfg{
syncChanUpdates: true,
channelSeries: newMockChannelGraphTimeSeries(hID),
encodingType: lnwire.EncodingSortedPlain,
encodingType: encodingType,
chunkSize: chunkSize,
sendToPeer: func(msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
@ -130,7 +141,8 @@ func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
// With the syncer created, we'll create a set of messages to filter
@ -174,7 +186,8 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
// We'll create then apply a remote horizon for the target peer with a
@ -303,7 +316,8 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
// We'll apply this gossip horizon for the remote peer.
@ -400,7 +414,8 @@ func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
// We'll now ask the syncer to reply to a chan ID query, but for a
@ -450,7 +465,8 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
queryChanIDs := []lnwire.ShortChannelID{
@ -527,42 +543,20 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
}
}
// TestGossipSyncerReplyChanRangeQueryUnknownEncodingType tests that if we
// receive a QueryChannelRange message with an unknown encoding type, then we
// return an error.
func TestGossipSyncerReplyChanRangeQueryUnknownEncodingType(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
_, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// If we modify the syncer to expect an encoding type that is currently
// unknown, then it should fail to process the message and return an
// error.
syncer.cfg.encodingType = 99
err := syncer.replyChanRangeQuery(&lnwire.QueryChannelRange{})
if err == nil {
t.Fatalf("expected message fail")
}
}
// TestGossipSyncerReplyChanRangeQuery tests that if we receive a
// QueryChannelRange message, then we'll properly send back a chunked reply to
// the remote peer.
func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
t.Parallel()
// First, we'll modify the main map to provide e a smaller chunk size
// so we can easily test all the edge cases.
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = 2
// We'll use a smaller chunk size so we can easily test all the edge
// cases.
const chunkSize = 2
// We'll now create our test gossip syncer that will shortly respond to
// our canned query.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize,
)
// Next, we'll craft a query to ask for all the new chan ID's after
@ -609,7 +603,7 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
// full, while the other is the final fragment.
const numExpectedChunks = 3
respMsgs := make([]lnwire.ShortChannelID, 0, 5)
for i := 0; i < 3; i++ {
for i := 0; i < numExpectedChunks; i++ {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
@ -658,7 +652,8 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
// We'll now create our test gossip syncer that will shortly respond to
// our canned query.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
// Next, we'll craft a query to ask for all the new chan ID's after
@ -725,9 +720,8 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
// message to allow us to intercept their potential sends.
const startingHeight = 200
_, syncer, _ := newTestSyncer(
lnwire.ShortChannelID{
BlockHeight: startingHeight,
},
lnwire.ShortChannelID{BlockHeight: startingHeight},
defaultEncoding, defaultChunkSize,
)
// If we now ask the syncer to generate an initial range query, it
@ -760,7 +754,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
_, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding, defaultChunkSize,
)
startingState := syncer.state
@ -873,28 +867,6 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
}
}
// TestGossipSyncerSynchronizeChanIDsUnknownEncodingType tests that if we
// attempt to query for a set of new channels using an unknown encoding type,
// then we'll get an error.
func TestGossipSyncerSynchronizeChanIDsUnknownEncodingType(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
_, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// If we modify the syncer to expect an encoding type that is currently
// unknown, then it should fail to process the message and return an
// error.
syncer.cfg.encodingType = 101
_, err := syncer.synchronizeChanIDs()
if err == nil {
t.Fatalf("expected message fail")
}
}
// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
// the short chan ID's which were unknown to us. We'll ensure that we request
// chunk by chunk, and after the last chunk, we return true indicating that we
@ -902,10 +874,15 @@ func TestGossipSyncerSynchronizeChanIDsUnknownEncodingType(t *testing.T) {
func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
t.Parallel()
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
// queries: two full chunks, and one lingering chunk.
const chunkSize = 2
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize,
)
// Next, we'll construct a set of chan ID's that we should query for,
@ -919,13 +896,7 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
}
syncer.newChansToQuery = newChanIDs
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
// queries: two full chunks, and one lingering chunk.
chunkSize := int32(2)
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
for i := int32(0); i < chunkSize*2; i += 2 {
for i := 0; i < chunkSize*2; i += 2 {
// With our set up complete, we'll request a sync of chan ID's.
done, err := syncer.synchronizeChanIDs()
if err != nil {
@ -1019,19 +990,24 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
func TestGossipSyncerRoutineSync(t *testing.T) {
t.Parallel()
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
// queries: two full chunks, and one lingering chunk.
const chunkSize = 2
// First, we'll create two gossipSyncer instances with a canned
// sendToPeer message to allow us to intercept their potential sends.
startHeight := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight,
startHeight, defaultEncoding, chunkSize,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight,
startHeight, defaultEncoding, chunkSize,
)
syncer2.Start()
defer syncer2.Stop()
@ -1049,11 +1025,6 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
lnwire.NewShortChanIDFromInt(6),
}
// Before we start the test, we'll set our chunk size to 2 in order to
// make testing the chunked requests and replies easier.
chunkSize := int32(2)
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
// We'll kick off the test by passing over the QueryChannelRange
// messages from one node to the other.
select {
@ -1124,7 +1095,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
// At this point, we'll forward the ReplyChannelRange messages to both
// parties. Two replies are expected since the chunk size is 2, and we
// need to query for 3 channels.
for i := 0; i < 2; i++ {
for i := 0; i < chunkSize; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
@ -1147,7 +1118,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
}
}
}
for i := 0; i < 2; i++ {
for i := 0; i < chunkSize; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
@ -1190,9 +1161,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
// At this point, both parties should start to send out initial
// requests to query the chan IDs of the remote party. As the chunk
// size is 3, they'll need 2 rounds in order to fully reconcile the
// size is 2, they'll need 2 rounds in order to fully reconcile the
// state.
for i := 0; i < 2; i++ {
for i := 0; i < chunkSize; i++ {
// Both parties should now have sent out the initial requests
// to query the chan IDs of the other party.
select {
@ -1359,28 +1330,28 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
func TestGossipSyncerAlreadySynced(t *testing.T) {
t.Parallel()
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
// queries: two full chunks, and one lingering chunk.
const chunkSize = 2
// First, we'll create two gossipSyncer instances with a canned
// sendToPeer message to allow us to intercept their potential sends.
startHeight := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight,
startHeight, defaultEncoding, chunkSize,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight,
startHeight, defaultEncoding, chunkSize,
)
syncer2.Start()
defer syncer2.Stop()
// Before we start the test, we'll set our chunk size to 2 in order to
// make testing the chunked requests and replies easier.
chunkSize := int32(2)
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
// The channel state of both syncers will be identical. They should
// recognize this, and skip the sync phase below.
syncer1Chans := []lnwire.ShortChannelID{
@ -1463,7 +1434,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
// Next, we'll thread through the replies of both parties. As the chunk
// size is 2, and they both know of 3 channels, it'll take two around
// and two chunks.
for i := 0; i < 2; i++ {
for i := 0; i < chunkSize; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
@ -1486,7 +1457,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
}
}
}
for i := 0; i < 2; i++ {
for i := 0; i < chunkSize; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")