From c61b03762886d740db88e41b5d34f58ded0c8071 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 20 Jul 2018 16:52:01 -0700 Subject: [PATCH] discovery/syncer: store chunk size within gossiperSyncerCfg In this commit, we allow the gossiper syncer to store the chunk size for its respective encoding type. We do this to prevent a race condition that would arise within the unit tests by modifying the values of the encodingTypeToChunkSize map to allow for easier testing. --- discovery/gossiper.go | 11 ++- discovery/syncer.go | 32 +++----- discovery/syncer_test.go | 153 ++++++++++++++++----------------------- 3 files changed, 79 insertions(+), 117 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a7b70ba5..7dea1f36 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -921,11 +921,13 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipS // At this point, a syncer doesn't yet exist, so we'll create a new one // for the peer and return it to the caller. + encoding := lnwire.EncodingSortedPlain syncer = newGossiperSyncer(gossipSyncerCfg{ chainHash: d.cfg.ChainHash, syncChanUpdates: true, channelSeries: d.cfg.ChanSeries, - encodingType: lnwire.EncodingSortedPlain, + encodingType: encoding, + chunkSize: encodingTypeToChunkSize[encoding], sendToPeer: func(msgs ...lnwire.Message) error { return syncPeer.SendMessage(false, msgs...) }, @@ -1236,14 +1238,15 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates return } - log.Infof("Creating new gossipSyncer for peer=%x", - nodeID[:]) + log.Infof("Creating new gossipSyncer for peer=%x", nodeID[:]) + encoding := lnwire.EncodingSortedPlain syncer := newGossiperSyncer(gossipSyncerCfg{ chainHash: d.cfg.ChainHash, syncChanUpdates: recvUpdates, channelSeries: d.cfg.ChanSeries, - encodingType: lnwire.EncodingSortedPlain, + encodingType: encoding, + chunkSize: encodingTypeToChunkSize[encoding], sendToPeer: func(msgs ...lnwire.Message) error { return syncPeer.SendMessage(false, msgs...) }, diff --git a/discovery/syncer.go b/discovery/syncer.go index 8e81b4ef..4041b1fd 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -7,8 +7,8 @@ import ( "sync/atomic" "time" - "github.com/lightningnetwork/lnd/lnwire" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/lnwire" ) // syncerState is an enum that represents the current state of the @@ -159,6 +159,10 @@ type gossipSyncerCfg struct { // with different encoding types will be rejected. encodingType lnwire.ShortChanIDEncoding + // chunkSize is the max number of short chan IDs using the syncer's + // encoding type that we can fit into a single message safely. + chunkSize int32 + // sendToPeer is a function closure that should send the set of // targeted messages to the peer we've been assigned to sync the graph // state from. @@ -445,14 +449,6 @@ func (g *gossipSyncer) channelGraphSyncer() { // required to ensure they fit into a single message. We may re-renter this // state in the case that chunking is required. func (g *gossipSyncer) synchronizeChanIDs() (bool, error) { - // Ensure that we're able to handle queries using the specified chan - // ID. - chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType] - if !ok { - return false, fmt.Errorf("unknown encoding type: %v", - g.cfg.encodingType) - } - // If we're in this state yet there are no more new channels to query // for, then we'll transition to our final synced state and return true // to signal that we're fully synchronized. @@ -468,7 +464,7 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) { // If the number of channels to query for is less than the chunk size, // then we can issue a single query. - if int32(len(g.newChansToQuery)) < chunkSize { + if int32(len(g.newChansToQuery)) < g.cfg.chunkSize { queryChunk = g.newChansToQuery g.newChansToQuery = nil @@ -476,8 +472,8 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) { // Otherwise, we'll need to only query for the next chunk. // We'll slice into our query chunk, then slide down our main // pointer down by the chunk size. - queryChunk = g.newChansToQuery[:chunkSize] - g.newChansToQuery = g.newChansToQuery[chunkSize:] + queryChunk = g.newChansToQuery[:g.cfg.chunkSize] + g.newChansToQuery = g.newChansToQuery[g.cfg.chunkSize:] } log.Infof("gossipSyncer(%x): querying for %v new channels", @@ -615,14 +611,6 @@ func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error { // ensure that our final fragment carries the "complete" bit to indicate the // end of our streaming response. func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error { - // Using the current set encoding type, we'll determine what our chunk - // size should be. If we can't locate the chunk size, then we'll return - // an error as we can't proceed. - chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType] - if !ok { - return fmt.Errorf("unknown encoding type: %v", g.cfg.encodingType) - } - log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+ "num_blocks=%v", g.peerPub[:], query.FirstBlockHeight, query.NumBlocks) @@ -651,7 +639,7 @@ func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro // We know this is the final chunk, if the difference between // the total number of channels, and the number of channels // we've sent is less-than-or-equal to the chunk size. - isFinalChunk := (numChannels - numChansSent) <= chunkSize + isFinalChunk := (numChannels - numChansSent) <= g.cfg.chunkSize // If this is indeed the last chunk, then we'll send the // remainder of the channels. @@ -664,7 +652,7 @@ func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro } else { // Otherwise, we'll only send off a fragment exactly // sized to the proper chunk size. - channelChunk = channelRange[numChansSent : numChansSent+chunkSize] + channelChunk = channelRange[numChansSent : numChansSent+g.cfg.chunkSize] log.Infof("gossipSyncer(%x): sending range chunk of "+ "size=%v", g.peerPub[:], len(channelChunk)) diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index b46f9af1..2ae8435b 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -7,10 +7,18 @@ import ( "testing" "time" - "github.com/davecgh/go-spew/spew" - "github.com/lightningnetwork/lnd/lnwire" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/lnwire" +) + +const ( + defaultEncoding = lnwire.EncodingSortedPlain +) + +var ( + defaultChunkSize = encodingTypeToChunkSize[defaultEncoding] ) type horizonQuery struct { @@ -105,13 +113,16 @@ func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash, var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil) -func newTestSyncer(hID lnwire.ShortChannelID) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) { +func newTestSyncer(hID lnwire.ShortChannelID, + encodingType lnwire.ShortChanIDEncoding, chunkSize int32, +) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) { msgChan := make(chan []lnwire.Message, 20) cfg := gossipSyncerCfg{ syncChanUpdates: true, channelSeries: newMockChannelGraphTimeSeries(hID), - encodingType: lnwire.EncodingSortedPlain, + encodingType: encodingType, + chunkSize: chunkSize, sendToPeer: func(msgs ...lnwire.Message) error { msgChan <- msgs return nil @@ -130,7 +141,8 @@ func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) { // First, we'll create a gossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. msgChan, syncer, _ := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, ) // With the syncer created, we'll create a set of messages to filter @@ -174,7 +186,8 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) { // First, we'll create a gossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. msgChan, syncer, chanSeries := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, ) // We'll create then apply a remote horizon for the target peer with a @@ -303,7 +316,8 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { // First, we'll create a gossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. msgChan, syncer, chanSeries := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, ) // We'll apply this gossip horizon for the remote peer. @@ -400,7 +414,8 @@ func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) { // First, we'll create a gossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. msgChan, syncer, _ := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, ) // We'll now ask the syncer to reply to a chan ID query, but for a @@ -450,7 +465,8 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) { // First, we'll create a gossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. msgChan, syncer, chanSeries := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, ) queryChanIDs := []lnwire.ShortChannelID{ @@ -527,42 +543,20 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) { } } -// TestGossipSyncerReplyChanRangeQueryUnknownEncodingType tests that if we -// receive a QueryChannelRange message with an unknown encoding type, then we -// return an error. -func TestGossipSyncerReplyChanRangeQueryUnknownEncodingType(t *testing.T) { - t.Parallel() - - // First, we'll create a gossipSyncer instance with a canned sendToPeer - // message to allow us to intercept their potential sends. - _, syncer, _ := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), - ) - - // If we modify the syncer to expect an encoding type that is currently - // unknown, then it should fail to process the message and return an - // error. - syncer.cfg.encodingType = 99 - err := syncer.replyChanRangeQuery(&lnwire.QueryChannelRange{}) - if err == nil { - t.Fatalf("expected message fail") - } -} - // TestGossipSyncerReplyChanRangeQuery tests that if we receive a // QueryChannelRange message, then we'll properly send back a chunked reply to // the remote peer. func TestGossipSyncerReplyChanRangeQuery(t *testing.T) { t.Parallel() - // First, we'll modify the main map to provide e a smaller chunk size - // so we can easily test all the edge cases. - encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = 2 + // We'll use a smaller chunk size so we can easily test all the edge + // cases. + const chunkSize = 2 // We'll now create our test gossip syncer that will shortly respond to // our canned query. msgChan, syncer, chanSeries := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize, ) // Next, we'll craft a query to ask for all the new chan ID's after @@ -609,7 +603,7 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) { // full, while the other is the final fragment. const numExpectedChunks = 3 respMsgs := make([]lnwire.ShortChannelID, 0, 5) - for i := 0; i < 3; i++ { + for i := 0; i < numExpectedChunks; i++ { select { case <-time.After(time.Second * 15): t.Fatalf("no msgs received") @@ -658,7 +652,8 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) { // We'll now create our test gossip syncer that will shortly respond to // our canned query. msgChan, syncer, chanSeries := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, ) // Next, we'll craft a query to ask for all the new chan ID's after @@ -725,9 +720,8 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) { // message to allow us to intercept their potential sends. const startingHeight = 200 _, syncer, _ := newTestSyncer( - lnwire.ShortChannelID{ - BlockHeight: startingHeight, - }, + lnwire.ShortChannelID{BlockHeight: startingHeight}, + defaultEncoding, defaultChunkSize, ) // If we now ask the syncer to generate an initial range query, it @@ -760,7 +754,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) { // First, we'll create a gossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. _, syncer, chanSeries := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, defaultChunkSize, ) startingState := syncer.state @@ -873,28 +867,6 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) { } } -// TestGossipSyncerSynchronizeChanIDsUnknownEncodingType tests that if we -// attempt to query for a set of new channels using an unknown encoding type, -// then we'll get an error. -func TestGossipSyncerSynchronizeChanIDsUnknownEncodingType(t *testing.T) { - t.Parallel() - - // First, we'll create a gossipSyncer instance with a canned sendToPeer - // message to allow us to intercept their potential sends. - _, syncer, _ := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), - ) - - // If we modify the syncer to expect an encoding type that is currently - // unknown, then it should fail to process the message and return an - // error. - syncer.cfg.encodingType = 101 - _, err := syncer.synchronizeChanIDs() - if err == nil { - t.Fatalf("expected message fail") - } -} - // TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of // the short chan ID's which were unknown to us. We'll ensure that we request // chunk by chunk, and after the last chunk, we return true indicating that we @@ -902,10 +874,15 @@ func TestGossipSyncerSynchronizeChanIDsUnknownEncodingType(t *testing.T) { func TestGossipSyncerSynchronizeChanIDs(t *testing.T) { t.Parallel() + // We'll modify the chunk size to be a smaller value, so we can ensure + // our chunk parsing works properly. With this value we should get 3 + // queries: two full chunks, and one lingering chunk. + const chunkSize = 2 + // First, we'll create a gossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. msgChan, syncer, _ := newTestSyncer( - lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize, ) // Next, we'll construct a set of chan ID's that we should query for, @@ -919,13 +896,7 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) { } syncer.newChansToQuery = newChanIDs - // We'll modify the chunk size to be a smaller value, so we can ensure - // our chunk parsing works properly. With this value we should get 3 - // queries: two full chunks, and one lingering chunk. - chunkSize := int32(2) - encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize - - for i := int32(0); i < chunkSize*2; i += 2 { + for i := 0; i < chunkSize*2; i += 2 { // With our set up complete, we'll request a sync of chan ID's. done, err := syncer.synchronizeChanIDs() if err != nil { @@ -1019,19 +990,24 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) { func TestGossipSyncerRoutineSync(t *testing.T) { t.Parallel() + // We'll modify the chunk size to be a smaller value, so we can ensure + // our chunk parsing works properly. With this value we should get 3 + // queries: two full chunks, and one lingering chunk. + const chunkSize = 2 + // First, we'll create two gossipSyncer instances with a canned // sendToPeer message to allow us to intercept their potential sends. startHeight := lnwire.ShortChannelID{ BlockHeight: 1144, } msgChan1, syncer1, chanSeries1 := newTestSyncer( - startHeight, + startHeight, defaultEncoding, chunkSize, ) syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( - startHeight, + startHeight, defaultEncoding, chunkSize, ) syncer2.Start() defer syncer2.Stop() @@ -1049,11 +1025,6 @@ func TestGossipSyncerRoutineSync(t *testing.T) { lnwire.NewShortChanIDFromInt(6), } - // Before we start the test, we'll set our chunk size to 2 in order to - // make testing the chunked requests and replies easier. - chunkSize := int32(2) - encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize - // We'll kick off the test by passing over the QueryChannelRange // messages from one node to the other. select { @@ -1124,7 +1095,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) { // At this point, we'll forward the ReplyChannelRange messages to both // parties. Two replies are expected since the chunk size is 2, and we // need to query for 3 channels. - for i := 0; i < 2; i++ { + for i := 0; i < chunkSize; i++ { select { case <-time.After(time.Second * 2): t.Fatalf("didn't get msg from syncer1") @@ -1147,7 +1118,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) { } } } - for i := 0; i < 2; i++ { + for i := 0; i < chunkSize; i++ { select { case <-time.After(time.Second * 2): t.Fatalf("didn't get msg from syncer2") @@ -1190,9 +1161,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) { // At this point, both parties should start to send out initial // requests to query the chan IDs of the remote party. As the chunk - // size is 3, they'll need 2 rounds in order to fully reconcile the + // size is 2, they'll need 2 rounds in order to fully reconcile the // state. - for i := 0; i < 2; i++ { + for i := 0; i < chunkSize; i++ { // Both parties should now have sent out the initial requests // to query the chan IDs of the other party. select { @@ -1359,28 +1330,28 @@ func TestGossipSyncerRoutineSync(t *testing.T) { func TestGossipSyncerAlreadySynced(t *testing.T) { t.Parallel() + // We'll modify the chunk size to be a smaller value, so we can ensure + // our chunk parsing works properly. With this value we should get 3 + // queries: two full chunks, and one lingering chunk. + const chunkSize = 2 + // First, we'll create two gossipSyncer instances with a canned // sendToPeer message to allow us to intercept their potential sends. startHeight := lnwire.ShortChannelID{ BlockHeight: 1144, } msgChan1, syncer1, chanSeries1 := newTestSyncer( - startHeight, + startHeight, defaultEncoding, chunkSize, ) syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( - startHeight, + startHeight, defaultEncoding, chunkSize, ) syncer2.Start() defer syncer2.Stop() - // Before we start the test, we'll set our chunk size to 2 in order to - // make testing the chunked requests and replies easier. - chunkSize := int32(2) - encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize - // The channel state of both syncers will be identical. They should // recognize this, and skip the sync phase below. syncer1Chans := []lnwire.ShortChannelID{ @@ -1463,7 +1434,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { // Next, we'll thread through the replies of both parties. As the chunk // size is 2, and they both know of 3 channels, it'll take two around // and two chunks. - for i := 0; i < 2; i++ { + for i := 0; i < chunkSize; i++ { select { case <-time.After(time.Second * 2): t.Fatalf("didn't get msg from syncer1") @@ -1486,7 +1457,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { } } } - for i := 0; i < 2; i++ { + for i := 0; i < chunkSize; i++ { select { case <-time.After(time.Second * 2): t.Fatalf("didn't get msg from syncer2")