diff --git a/discovery/syncer.go b/discovery/syncer.go index 09f3ee3f..346215bf 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -219,6 +219,16 @@ type gossipSyncerCfg struct { // responding to gossip queries after replying to // maxUndelayedQueryReplies queries. delayedQueryReplyInterval time.Duration + + // noSyncChannels will prevent the GossipSyncer from spawning a + // channelGraphSyncer, meaning we will not try to reconcile unknown + // channels with the remote peer. + noSyncChannels bool + + // noReplyQueries will prevent the GossipSyncer from spawning a + // replyHandler, meaning we will not reply to queries from our remote + // peer. + noReplyQueries bool } // GossipSyncer is a struct that handles synchronizing the channel graph state @@ -271,10 +281,15 @@ type GossipSyncer struct { // PassiveSync to ActiveSync. genHistoricalChanRangeQuery bool - // gossipMsgs is a channel that all messages from the target peer will - // be sent over. + // gossipMsgs is a channel that all responses to our queries from the + // target peer will be sent over, these will be read by the + // channelGraphSyncer. gossipMsgs chan lnwire.Message + // queryMsgs is a channel that all queries from the remote peer will be + // received over, these will be read by the replyHandler. + queryMsgs chan lnwire.Message + // bufferedChanRangeReplies is used in the waitingQueryChanReply to // buffer all the chunked response to our query. bufferedChanRangeReplies []lnwire.ShortChannelID @@ -332,6 +347,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer { syncTransitionReqs: make(chan *syncTransitionReq), historicalSyncReqs: make(chan *historicalSyncReq), gossipMsgs: make(chan lnwire.Message, 100), + queryMsgs: make(chan lnwire.Message, 100), quit: make(chan struct{}), } } @@ -342,8 +358,17 @@ func (g *GossipSyncer) Start() { g.started.Do(func() { log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:]) - g.wg.Add(1) - go g.channelGraphSyncer() + // TODO(conner): only spawn channelGraphSyncer if remote + // supports gossip queries, and only spawn replyHandler if we + // advertise support + if !g.cfg.noSyncChannels { + g.wg.Add(1) + go g.channelGraphSyncer() + } + if !g.cfg.noReplyQueries { + g.wg.Add(1) + go g.replyHandler() + } }) } @@ -369,7 +394,7 @@ func (g *GossipSyncer) channelGraphSyncer() { log.Debugf("GossipSyncer(%x): state=%v, type=%v", g.cfg.peerPub[:], state, syncType) - switch syncerState(state) { + switch state { // When we're in this state, we're trying to synchronize our // view of the network with the remote peer. We'll kick off // this sync by asking them for the set of channels they @@ -382,14 +407,14 @@ func (g *GossipSyncer) channelGraphSyncer() { g.genHistoricalChanRangeQuery, ) if err != nil { - log.Errorf("unable to gen chan range "+ + log.Errorf("Unable to gen chan range "+ "query: %v", err) return } err = g.cfg.sendToPeer(queryRangeMsg) if err != nil { - log.Errorf("unable to send chan range "+ + log.Errorf("Unable to send chan range "+ "query: %v", err) return } @@ -417,22 +442,16 @@ func (g *GossipSyncer) channelGraphSyncer() { if ok { err := g.processChanRangeReply(queryReply) if err != nil { - log.Errorf("unable to "+ + log.Errorf("Unable to "+ "process chan range "+ "query: %v", err) return } - continue } - // Otherwise, it's the remote peer performing a - // query, which we'll attempt to reply to. - err := g.replyPeerQueries(msg) - if err != nil && err != ErrGossipSyncerExiting { - log.Errorf("unable to reply to peer "+ - "query: %v", err) - } + log.Warnf("Unexpected message: %T in state=%v", + msg, state) case <-g.quit: return @@ -447,7 +466,7 @@ func (g *GossipSyncer) channelGraphSyncer() { // query chunk. done, err := g.synchronizeChanIDs() if err != nil { - log.Errorf("unable to sync chan IDs: %v", err) + log.Errorf("Unable to sync chan IDs: %v", err) } // If this wasn't our last query, then we'll need to @@ -480,13 +499,8 @@ func (g *GossipSyncer) channelGraphSyncer() { continue } - // Otherwise, it's the remote peer performing a - // query, which we'll attempt to deploy to. - err := g.replyPeerQueries(msg) - if err != nil && err != ErrGossipSyncerExiting { - log.Errorf("unable to reply to peer "+ - "query: %v", err) - } + log.Warnf("Unexpected message: %T in state=%v", + msg, state) case <-g.quit: return @@ -520,13 +534,6 @@ func (g *GossipSyncer) channelGraphSyncer() { // messages or process any state transitions and exit if // needed. select { - case msg := <-g.gossipMsgs: - err := g.replyPeerQueries(msg) - if err != nil && err != ErrGossipSyncerExiting { - log.Errorf("unable to reply to peer "+ - "query: %v", err) - } - case req := <-g.syncTransitionReqs: req.errChan <- g.handleSyncTransition(req) @@ -540,6 +547,35 @@ func (g *GossipSyncer) channelGraphSyncer() { } } +// replyHandler is an event loop whose sole purpose is to reply to the remote +// peers queries. Our replyHandler will respond to messages generated by their +// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives +// the other's replyHandler, allowing the replyHandler to operate independently +// from the state machine maintained on the same node. +// +// NOTE: This method MUST be run as a goroutine. +func (g *GossipSyncer) replyHandler() { + defer g.wg.Done() + + for { + select { + case msg := <-g.queryMsgs: + err := g.replyPeerQueries(msg) + switch { + case err == ErrGossipSyncerExiting: + return + + case err != nil: + log.Errorf("Unable to reply to peer "+ + "query: %v", err) + } + + case <-g.quit: + return + } + } +} + // sendGossipTimestampRange constructs and sets a GossipTimestampRange for the // syncer and sends it to the remote peer. func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time, @@ -1065,8 +1101,16 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) { // ProcessQueryMsg is used by outside callers to pass new channel time series // queries to the internal processing goroutine. func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) { + var msgChan chan lnwire.Message + switch msg.(type) { + case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs: + msgChan = g.queryMsgs + default: + msgChan = g.gossipMsgs + } + select { - case g.gossipMsgs <- msg: + case msgChan <- msg: case <-peerQuit: case <-g.quit: } diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 5a5eb725..b9ee24c7 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -116,16 +116,37 @@ func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash, var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil) +// newTestSyncer creates a new test instance of a GossipSyncer. A buffered +// message channel is returned for intercepting messages sent from the syncer, +// in addition to a mock channel series which allows the test to control which +// messages the syncer knows of or wishes to filter out. The variadic flags are +// treated as positional arguments where the first index signals that the syncer +// should spawn a channelGraphSyncer and second index signals that the syncer +// should spawn a replyHandler. Any flags beyond the first two are currently +// ignored. If no flags are provided, both a channelGraphSyncer and replyHandler +// will be spawned by default. func newTestSyncer(hID lnwire.ShortChannelID, encodingType lnwire.ShortChanIDEncoding, chunkSize int32, -) (chan []lnwire.Message, *GossipSyncer, *mockChannelGraphTimeSeries) { + flags ...bool) (chan []lnwire.Message, + *GossipSyncer, *mockChannelGraphTimeSeries) { + + syncChannels := true + replyQueries := true + if len(flags) > 0 { + syncChannels = flags[0] + } + if len(flags) > 1 { + replyQueries = flags[1] + } msgChan := make(chan []lnwire.Message, 20) cfg := gossipSyncerCfg{ - channelSeries: newMockChannelGraphTimeSeries(hID), - encodingType: encodingType, - chunkSize: chunkSize, - batchSize: chunkSize, + channelSeries: newMockChannelGraphTimeSeries(hID), + encodingType: encodingType, + chunkSize: chunkSize, + batchSize: chunkSize, + noSyncChannels: !syncChannels, + noReplyQueries: !replyQueries, sendToPeer: func(msgs ...lnwire.Message) error { msgChan <- msgs return nil @@ -1020,13 +1041,13 @@ func TestGossipSyncerDelayDOS(t *testing.T) { BlockHeight: 1144, } msgChan1, syncer1, chanSeries1 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, + startHeight, defaultEncoding, chunkSize, true, false, ) syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, + startHeight, defaultEncoding, chunkSize, false, true, ) syncer2.Start() defer syncer2.Stop() @@ -1042,7 +1063,7 @@ func TestGossipSyncerDelayDOS(t *testing.T) { numQueryResponses := numUndelayedQueries + numDelayedQueries // The total number of responses must include the initial reply each - // syner will make to QueryChannelRange. + // syncer will make to QueryChannelRange. numTotalQueries := 1 + numQueryResponses // The total number of channels each syncer needs to request must be @@ -1051,12 +1072,6 @@ func TestGossipSyncerDelayDOS(t *testing.T) { // Although both nodes are at the same height, they'll have a // completely disjoint set of chan ID's that they know of. - var syncer1Chans []lnwire.ShortChannelID - for i := 0; i < numTotalChans; i++ { - syncer1Chans = append( - syncer1Chans, lnwire.NewShortChanIDFromInt(uint64(i)), - ) - } var syncer2Chans []lnwire.ShortChannelID for i := numTotalChans; i < numTotalChans+numTotalChans; i++ { syncer2Chans = append( @@ -1064,8 +1079,8 @@ func TestGossipSyncerDelayDOS(t *testing.T) { ) } - // We'll kick off the test by passing over the QueryChannelRange - // messages from one node to the other. + // We'll kick off the test by asserting syncer1 sends over the + // QueryChannelRange message the other node. select { case <-time.After(time.Second * 2): t.Fatalf("didn't get msg from syncer1") @@ -1083,46 +1098,16 @@ func TestGossipSyncerDelayDOS(t *testing.T) { case <-time.After(time.Second * 2): t.Fatalf("node 2 didn't read msg") - case syncer2.gossipMsgs <- msg: - - } - } - } - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer2") - - case msgs := <-msgChan2: - for _, msg := range msgs { - // The message MUST be a QueryChannelRange message. - _, ok := msg.(*lnwire.QueryChannelRange) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryChannelRange for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer1.gossipMsgs <- msg: + case syncer2.queryMsgs <- msg: } } } - // At this point, we'll need to send responses to both nodes from their - // respective channel series. Both nodes will simply request the entire - // set of channels from the other. This will count as the first - // undelayed response for each syncer. - select { - case <-time.After(time.Second * 2): - t.Fatalf("no query recvd") - - case <-chanSeries1.filterRangeReqs: - // We'll send all the channels that it should know of. - chanSeries1.filterRangeResp <- syncer1Chans - } + // At this point, we'll need to a response from syncer2's channel + // series. This will cause syncer1 to simply request the entire set of + // channels from syncer2. This will count as the first undelayed + // response for sycner2. select { case <-time.After(time.Second * 2): t.Fatalf("no query recvd") @@ -1132,31 +1117,9 @@ func TestGossipSyncerDelayDOS(t *testing.T) { chanSeries2.filterRangeResp <- syncer2Chans } - // At this point, we'll forward the ReplyChannelRange messages to both - // parties. After receiving the set of channels known to the remote peer + // At this point, we'll assert that the ReplyChannelRange message is + // sent by sycner2. for i := 0; i < numQueryResponses; i++ { - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer1") - - case msgs := <-msgChan1: - for _, msg := range msgs { - // The message MUST be a ReplyChannelRange message. - _, ok := msg.(*lnwire.ReplyChannelRange) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryChannelRange for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer2.gossipMsgs <- msg: - } - } - } - select { case <-time.After(time.Second * 2): t.Fatalf("didn't get msg from syncer2") @@ -1180,8 +1143,7 @@ func TestGossipSyncerDelayDOS(t *testing.T) { } } - // We'll now send back a chunked response for both parties of the known - // short chan ID's. + // We'll now have syncer1 process the received sids from syncer2. select { case <-time.After(time.Second * 2): t.Fatalf("no query recvd") @@ -1189,207 +1151,140 @@ func TestGossipSyncerDelayDOS(t *testing.T) { case <-chanSeries1.filterReq: chanSeries1.filterResp <- syncer2Chans } + + // At this point, syncer1 should start to send out initial requests to + // query the chan IDs of the remote party. We'll keep track of the + // number of queries made using the iterated value, which starts at one + // due the initial contribution of the QueryChannelRange msgs. + for i := 1; i < numTotalQueries; i++ { + expDelayResponse := i >= numUndelayedQueries + queryBatch(t, + msgChan1, msgChan2, + syncer1, syncer2, + chanSeries2, + expDelayResponse, + delayedQueryInterval, + delayTolerance, + ) + } +} + +// queryBatch is a helper method that will query for a single batch of channels +// from a peer and assert the responses. The method can also be used to assert +// the same transition happens, but is delayed by the remote peer's DOS +// rate-limiting. The provided chanSeries should belong to syncer2. +// +// The state transition performed is the following: +// syncer1 -- QueryShortChanIDs --> syncer2 +// chanSeries.FetchChanAnns() +// syncer1 <-- ReplyShortChanIDsEnd -- syncer2 +// +// If expDelayResponse is true, this method will assert that the call the +// FetchChanAnns happens between: +// [delayedQueryInterval-delayTolerance, delayedQueryInterval+delayTolerance]. +func queryBatch(t *testing.T, + msgChan1, msgChan2 chan []lnwire.Message, + syncer1, syncer2 *GossipSyncer, + chanSeries *mockChannelGraphTimeSeries, + expDelayResponse bool, + delayedQueryInterval, delayTolerance time.Duration) { + + t.Helper() + + // First, we'll assert that syncer1 sends a QueryShortChanIDs message to + // the remote peer. select { case <-time.After(time.Second * 2): - t.Fatalf("no query recvd") + t.Fatalf("didn't get msg from syncer2") - case <-chanSeries2.filterReq: - chanSeries2.filterResp <- syncer1Chans + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a QueryShortChanIDs message. + _, ok := msg.(*lnwire.QueryShortChanIDs) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryShortChanIDs for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.queryMsgs <- msg: + } + } } - // At this point, both parties should start to send out initial - // requests to query the chan IDs of the remote party. We'll keep track - // of the number of queries made using the iterated value, which starts - // at one due the initial contribution of the QueryChannelRange msgs. - for i := 1; i < numTotalQueries; i++ { - // Both parties should now have sent out the initial requests - // to query the chan IDs of the other party. + // We'll then respond to with an empty set of replies (as it doesn't + // affect the test). + switch { + + // If this query has surpassed the undelayed query threshold, we will + // impose stricter timing constraints on the response times. We'll first + // test that syncer2's chanSeries doesn't immediately receive a query, + // and then check that the query hasn't gone unanswered entirely. + case expDelayResponse: + // Create a before and after timeout to test, our test + // will ensure the messages are delivered to the peer + // in this timeframe. + before := time.After( + delayedQueryInterval - delayTolerance, + ) + after := time.After( + delayedQueryInterval + delayTolerance, + ) + + // First, ensure syncer2 doesn't try to respond up until the + // before time fires. select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer1") + case <-before: + // Query is delayed, proceed. - case msgs := <-msgChan1: - for _, msg := range msgs { - // The message MUST be a QueryShortChanIDs message. - _, ok := msg.(*lnwire.QueryShortChanIDs) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryShortChanIDs for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer2.gossipMsgs <- msg: - - } - } + case <-chanSeries.annReq: + t.Fatalf("DOSy query was not delayed") } + + // If syncer2 doesn't attempt a response within the allowed + // interval, then the messages are probably lost. select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer2") + case <-after: + t.Fatalf("no delayed query received") - case msgs := <-msgChan2: - for _, msg := range msgs { - // The message MUST be a QueryShortChanIDs message. - _, ok := msg.(*lnwire.QueryShortChanIDs) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryShortChanIDs for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer1.gossipMsgs <- msg: - - } - } + case <-chanSeries.annReq: + chanSeries.annResp <- []lnwire.Message{} } - // We'll then respond to both parties with an empty set of - // replies (as it doesn't affect the test). - switch { - - // If this query has surpassed the undelayed query threshold, we - // will impose stricter timing constraints on the response - // times. We'll first test that the peers don't immediately - // receive a query, and then check that both queries haven't - // gone unanswered entirely. - case i >= numUndelayedQueries: - // Create a before and after timeout to test, our test - // will ensure the messages are delivered to the peers - // in this timeframe. - before := time.After( - delayedQueryInterval - delayTolerance, - ) - after := time.After( - delayedQueryInterval + delayTolerance, - ) - - // First, ensure neither peer tries to respond up until - // the before time fires. - select { - case <-before: - // Queries are delayed, proceed. - - case <-chanSeries1.annReq: - t.Fatalf("DOSy query was not delayed") - - case <-chanSeries2.annReq: - t.Fatalf("DOSy query was not delayed") - } - - // Next, we'll need to test that both queries are - // received before the after timer expires. To account - // for ordering, we will try to pull a message from both - // peers, and then test that the opposite peer also - // receives the message promptly. - var ( - firstChanSeries *mockChannelGraphTimeSeries - laterChanSeries *mockChannelGraphTimeSeries - ) - - // If neither peer attempts a response within the - // allowed interval, then the messages are probably - // lost. Otherwise, process the message and record the - // induced ordering. - select { - case <-after: - t.Fatalf("no delayed query received") - - case <-chanSeries1.annReq: - chanSeries1.annResp <- []lnwire.Message{} - firstChanSeries = chanSeries1 - laterChanSeries = chanSeries2 - - case <-chanSeries2.annReq: - chanSeries2.annResp <- []lnwire.Message{} - firstChanSeries = chanSeries2 - laterChanSeries = chanSeries1 - } - - // Finally, using the same interval timeout as before, - // ensure the later peer also responds promptly. We also - // assert that the first peer doesn't attempt another - // response. - select { - case <-after: - t.Fatalf("no delayed query received") - - case <-firstChanSeries.annReq: - t.Fatalf("spurious undelayed response") - - case <-laterChanSeries.annReq: - laterChanSeries.annResp <- []lnwire.Message{} - } - - // Otherwise, we still haven't exceeded our undelayed query - // limit. Assert that both peers promptly attempt a response to - // the queries. - default: - select { - case <-time.After(50 * time.Millisecond): - t.Fatalf("no query recvd") - - case <-chanSeries1.annReq: - chanSeries1.annResp <- []lnwire.Message{} - } - select { - case <-time.After(50 * time.Millisecond): - t.Fatalf("no query recvd") - - case <-chanSeries2.annReq: - chanSeries2.annResp <- []lnwire.Message{} - } - } - - // Finally, both sides should then receive a - // ReplyShortChanIDsEnd as the first chunk has been replied to. + // Otherwise, syncer2 should query its chanSeries promtly. + default: select { case <-time.After(50 * time.Millisecond): - t.Fatalf("didn't get msg from syncer1") + t.Fatalf("no query recvd") - case msgs := <-msgChan1: - for _, msg := range msgs { - // The message MUST be a ReplyShortChanIDsEnd message. - _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryChannelRange for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer2.gossipMsgs <- msg: - - } - } + case <-chanSeries.annReq: + chanSeries.annResp <- []lnwire.Message{} } - select { - case <-time.After(50 * time.Millisecond): - t.Fatalf("didn't get msg from syncer2") + } - case msgs := <-msgChan2: - for _, msg := range msgs { - // The message MUST be a ReplyShortChanIDsEnd message. - _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) - if !ok { - t.Fatalf("wrong message: expected "+ - "ReplyShortChanIDsEnd for %T", msg) - } + // Finally, assert that syncer2 replies to syncer1 with a + // ReplyShortChanIDsEnd. + select { + case <-time.After(50 * time.Millisecond): + t.Fatalf("didn't get msg from syncer2") - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a ReplyShortChanIDsEnd message. + _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) + if !ok { + t.Fatalf("wrong message: expected "+ + "ReplyShortChanIDsEnd for %T", msg) + } - case syncer1.gossipMsgs <- msg: + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") - } + case syncer1.gossipMsgs <- msg: } } } @@ -1413,24 +1308,19 @@ func TestGossipSyncerRoutineSync(t *testing.T) { BlockHeight: 1144, } msgChan1, syncer1, chanSeries1 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, + startHeight, defaultEncoding, chunkSize, true, false, ) syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( - startHeight, defaultEncoding, chunkSize, + startHeight, defaultEncoding, chunkSize, false, true, ) syncer2.Start() defer syncer2.Stop() - // Although both nodes are at the same height, they'll have a - // completely disjoint set of 3 chan ID's that they know of. - syncer1Chans := []lnwire.ShortChannelID{ - lnwire.NewShortChanIDFromInt(1), - lnwire.NewShortChanIDFromInt(2), - lnwire.NewShortChanIDFromInt(3), - } + // Although both nodes are at the same height, syncer will have 3 chan + // ID's that syncer1 doesn't know of. syncer2Chans := []lnwire.ShortChannelID{ lnwire.NewShortChanIDFromInt(4), lnwire.NewShortChanIDFromInt(5), @@ -1438,7 +1328,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) { } // We'll kick off the test by passing over the QueryChannelRange - // messages from one node to the other. + // messages from syncer1 to syncer2. select { case <-time.After(time.Second * 2): t.Fatalf("didn't get msg from syncer1") @@ -1456,45 +1346,15 @@ func TestGossipSyncerRoutineSync(t *testing.T) { case <-time.After(time.Second * 2): t.Fatalf("node 2 didn't read msg") - case syncer2.gossipMsgs <- msg: - - } - } - } - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer2") - - case msgs := <-msgChan2: - for _, msg := range msgs { - // The message MUST be a QueryChannelRange message. - _, ok := msg.(*lnwire.QueryChannelRange) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryChannelRange for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer1.gossipMsgs <- msg: + case syncer2.queryMsgs <- msg: } } } - // At this point, we'll need to send responses to both nodes from their - // respective channel series. Both nodes will simply request the entire - // set of channels from the other. - select { - case <-time.After(time.Second * 2): - t.Fatalf("no query recvd") - - case <-chanSeries1.filterRangeReqs: - // We'll send all the channels that it should know of. - chanSeries1.filterRangeResp <- syncer1Chans - } + // At this point, we'll need to send a response from syncer2 to syncer1 + // using syncer2's channels This will cause syncer1 to simply request + // the entire set of channels from the other. select { case <-time.After(time.Second * 2): t.Fatalf("no query recvd") @@ -1504,32 +1364,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) { chanSeries2.filterRangeResp <- syncer2Chans } - // At this point, we'll forward the ReplyChannelRange messages to both - // parties. Two replies are expected since the chunk size is 2, and we - // need to query for 3 channels. - for i := 0; i < chunkSize; i++ { - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer1") - - case msgs := <-msgChan1: - for _, msg := range msgs { - // The message MUST be a ReplyChannelRange message. - _, ok := msg.(*lnwire.ReplyChannelRange) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryChannelRange for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer2.gossipMsgs <- msg: - } - } - } - } + // At this point, we'll assert that syncer2 replies with the + // ReplyChannelRange messages. Two replies are expected since the chunk + // size is 2, and we need to query for 3 channels. for i := 0; i < chunkSize; i++ { select { case <-time.After(time.Second * 2): @@ -1554,8 +1391,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) { } } - // We'll now send back a chunked response for both parties of the known - // short chan ID's. + // We'll now send back a chunked response from syncer2 back to sycner1. select { case <-time.After(time.Second * 2): t.Fatalf("no query recvd") @@ -1563,133 +1399,21 @@ func TestGossipSyncerRoutineSync(t *testing.T) { case <-chanSeries1.filterReq: chanSeries1.filterResp <- syncer2Chans } - select { - case <-time.After(time.Second * 2): - t.Fatalf("no query recvd") - case <-chanSeries2.filterReq: - chanSeries2.filterResp <- syncer1Chans - } - - // At this point, both parties should start to send out initial - // requests to query the chan IDs of the remote party. As the chunk - // size is 2, they'll need 2 rounds in order to fully reconcile the - // state. + // At this point, syncer1 should start to send out initial requests to + // query the chan IDs of the remote party. As the chunk size is 2, + // they'll need 2 rounds in order to fully reconcile the state. for i := 0; i < chunkSize; i++ { - // Both parties should now have sent out the initial requests - // to query the chan IDs of the other party. - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer1") - - case msgs := <-msgChan1: - for _, msg := range msgs { - // The message MUST be a QueryShortChanIDs message. - _, ok := msg.(*lnwire.QueryShortChanIDs) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryShortChanIDs for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer2.gossipMsgs <- msg: - - } - } - } - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer2") - - case msgs := <-msgChan2: - for _, msg := range msgs { - // The message MUST be a QueryShortChanIDs message. - _, ok := msg.(*lnwire.QueryShortChanIDs) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryShortChanIDs for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer1.gossipMsgs <- msg: - - } - } - } - - // We'll then respond to both parties with an empty set of replies (as - // it doesn't affect the test). - select { - case <-time.After(time.Second * 2): - t.Fatalf("no query recvd") - - case <-chanSeries1.annReq: - chanSeries1.annResp <- []lnwire.Message{} - } - select { - case <-time.After(time.Second * 2): - t.Fatalf("no query recvd") - - case <-chanSeries2.annReq: - chanSeries2.annResp <- []lnwire.Message{} - } - - // Both sides should then receive a ReplyShortChanIDsEnd as the first - // chunk has been replied to. - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer1") - - case msgs := <-msgChan1: - for _, msg := range msgs { - // The message MUST be a ReplyShortChanIDsEnd message. - _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryChannelRange for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer2.gossipMsgs <- msg: - - } - } - } - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer1") - - case msgs := <-msgChan2: - for _, msg := range msgs { - // The message MUST be a ReplyShortChanIDsEnd message. - _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) - if !ok { - t.Fatalf("wrong message: expected "+ - "ReplyShortChanIDsEnd for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer1.gossipMsgs <- msg: - - } - } - } + queryBatch(t, + msgChan1, msgChan2, + syncer1, syncer2, + chanSeries2, + false, 0, 0, + ) } - // At this stage both parties should now be sending over their initial - // GossipTimestampRange messages as they should both be fully synced. + // At this stage syncer1 should now be sending over its initial + // GossipTimestampRange messages as it should be fully synced. select { case <-time.After(time.Second * 2): t.Fatalf("didn't get msg from syncer1") @@ -1709,28 +1433,6 @@ func TestGossipSyncerRoutineSync(t *testing.T) { case syncer2.gossipMsgs <- msg: - } - } - } - select { - case <-time.After(time.Second * 2): - t.Fatalf("didn't get msg from syncer1") - - case msgs := <-msgChan2: - for _, msg := range msgs { - // The message MUST be a GossipTimestampRange message. - _, ok := msg.(*lnwire.GossipTimestampRange) - if !ok { - t.Fatalf("wrong message: expected "+ - "QueryChannelRange for %T", msg) - } - - select { - case <-time.After(time.Second * 2): - t.Fatalf("node 2 didn't read msg") - - case syncer1.gossipMsgs <- msg: - } } } @@ -1796,7 +1498,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { case <-time.After(time.Second * 2): t.Fatalf("node 2 didn't read msg") - case syncer2.gossipMsgs <- msg: + case syncer2.queryMsgs <- msg: } } @@ -1818,7 +1520,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { case <-time.After(time.Second * 2): t.Fatalf("node 2 didn't read msg") - case syncer1.gossipMsgs <- msg: + case syncer1.queryMsgs <- msg: } }