discovery/syncer: separate query + reply handlers

This commit creates a distinct replyHandler, completely isolating the
requesting state machine from the processing of queries from the remote
peer. Before the two were interlaced, and the syncer could only reply to
messages in certain states. Now the two will be complete separated,
which is preliminary step to make the replies synchronous (as otherwise
we would be blocking our own requesting state machine).

With this changes, the channelGraphSyncer of each peer will drive the
replyHanlder of the other. The two can now operate independently, or
even spun up conditionally depending on advertised support for gossip
queries, as shown below:

          A                                 B
 channelGraphSyncer ---control-msg--->
                                        replyHandler
 channelGraphSyncer <--control-msg----
           gossiper <--gossip-msgs----

                    <--control-msg---- channelGraphSyncer
       replyHandler
                    ---control-msg---> channelGraphSyncer
                    ---gossip-msgs---> gossiper
This commit is contained in:
Conner Fromknecht 2019-04-26 20:03:14 -07:00
parent 42b081bb37
commit 23d10336c2
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
2 changed files with 255 additions and 509 deletions

@ -219,6 +219,16 @@ type gossipSyncerCfg struct {
// responding to gossip queries after replying to // responding to gossip queries after replying to
// maxUndelayedQueryReplies queries. // maxUndelayedQueryReplies queries.
delayedQueryReplyInterval time.Duration delayedQueryReplyInterval time.Duration
// noSyncChannels will prevent the GossipSyncer from spawning a
// channelGraphSyncer, meaning we will not try to reconcile unknown
// channels with the remote peer.
noSyncChannels bool
// noReplyQueries will prevent the GossipSyncer from spawning a
// replyHandler, meaning we will not reply to queries from our remote
// peer.
noReplyQueries bool
} }
// GossipSyncer is a struct that handles synchronizing the channel graph state // GossipSyncer is a struct that handles synchronizing the channel graph state
@ -271,10 +281,15 @@ type GossipSyncer struct {
// PassiveSync to ActiveSync. // PassiveSync to ActiveSync.
genHistoricalChanRangeQuery bool genHistoricalChanRangeQuery bool
// gossipMsgs is a channel that all messages from the target peer will // gossipMsgs is a channel that all responses to our queries from the
// be sent over. // target peer will be sent over, these will be read by the
// channelGraphSyncer.
gossipMsgs chan lnwire.Message gossipMsgs chan lnwire.Message
// queryMsgs is a channel that all queries from the remote peer will be
// received over, these will be read by the replyHandler.
queryMsgs chan lnwire.Message
// bufferedChanRangeReplies is used in the waitingQueryChanReply to // bufferedChanRangeReplies is used in the waitingQueryChanReply to
// buffer all the chunked response to our query. // buffer all the chunked response to our query.
bufferedChanRangeReplies []lnwire.ShortChannelID bufferedChanRangeReplies []lnwire.ShortChannelID
@ -332,6 +347,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
syncTransitionReqs: make(chan *syncTransitionReq), syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan *historicalSyncReq), historicalSyncReqs: make(chan *historicalSyncReq),
gossipMsgs: make(chan lnwire.Message, 100), gossipMsgs: make(chan lnwire.Message, 100),
queryMsgs: make(chan lnwire.Message, 100),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
} }
@ -342,8 +358,17 @@ func (g *GossipSyncer) Start() {
g.started.Do(func() { g.started.Do(func() {
log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:]) log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
// TODO(conner): only spawn channelGraphSyncer if remote
// supports gossip queries, and only spawn replyHandler if we
// advertise support
if !g.cfg.noSyncChannels {
g.wg.Add(1) g.wg.Add(1)
go g.channelGraphSyncer() go g.channelGraphSyncer()
}
if !g.cfg.noReplyQueries {
g.wg.Add(1)
go g.replyHandler()
}
}) })
} }
@ -369,7 +394,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
log.Debugf("GossipSyncer(%x): state=%v, type=%v", log.Debugf("GossipSyncer(%x): state=%v, type=%v",
g.cfg.peerPub[:], state, syncType) g.cfg.peerPub[:], state, syncType)
switch syncerState(state) { switch state {
// When we're in this state, we're trying to synchronize our // When we're in this state, we're trying to synchronize our
// view of the network with the remote peer. We'll kick off // view of the network with the remote peer. We'll kick off
// this sync by asking them for the set of channels they // this sync by asking them for the set of channels they
@ -382,14 +407,14 @@ func (g *GossipSyncer) channelGraphSyncer() {
g.genHistoricalChanRangeQuery, g.genHistoricalChanRangeQuery,
) )
if err != nil { if err != nil {
log.Errorf("unable to gen chan range "+ log.Errorf("Unable to gen chan range "+
"query: %v", err) "query: %v", err)
return return
} }
err = g.cfg.sendToPeer(queryRangeMsg) err = g.cfg.sendToPeer(queryRangeMsg)
if err != nil { if err != nil {
log.Errorf("unable to send chan range "+ log.Errorf("Unable to send chan range "+
"query: %v", err) "query: %v", err)
return return
} }
@ -417,22 +442,16 @@ func (g *GossipSyncer) channelGraphSyncer() {
if ok { if ok {
err := g.processChanRangeReply(queryReply) err := g.processChanRangeReply(queryReply)
if err != nil { if err != nil {
log.Errorf("unable to "+ log.Errorf("Unable to "+
"process chan range "+ "process chan range "+
"query: %v", err) "query: %v", err)
return return
} }
continue continue
} }
// Otherwise, it's the remote peer performing a log.Warnf("Unexpected message: %T in state=%v",
// query, which we'll attempt to reply to. msg, state)
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit: case <-g.quit:
return return
@ -447,7 +466,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
// query chunk. // query chunk.
done, err := g.synchronizeChanIDs() done, err := g.synchronizeChanIDs()
if err != nil { if err != nil {
log.Errorf("unable to sync chan IDs: %v", err) log.Errorf("Unable to sync chan IDs: %v", err)
} }
// If this wasn't our last query, then we'll need to // If this wasn't our last query, then we'll need to
@ -480,13 +499,8 @@ func (g *GossipSyncer) channelGraphSyncer() {
continue continue
} }
// Otherwise, it's the remote peer performing a log.Warnf("Unexpected message: %T in state=%v",
// query, which we'll attempt to deploy to. msg, state)
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit: case <-g.quit:
return return
@ -520,13 +534,6 @@ func (g *GossipSyncer) channelGraphSyncer() {
// messages or process any state transitions and exit if // messages or process any state transitions and exit if
// needed. // needed.
select { select {
case msg := <-g.gossipMsgs:
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case req := <-g.syncTransitionReqs: case req := <-g.syncTransitionReqs:
req.errChan <- g.handleSyncTransition(req) req.errChan <- g.handleSyncTransition(req)
@ -540,6 +547,35 @@ func (g *GossipSyncer) channelGraphSyncer() {
} }
} }
// replyHandler is an event loop whose sole purpose is to reply to the remote
// peers queries. Our replyHandler will respond to messages generated by their
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
// the other's replyHandler, allowing the replyHandler to operate independently
// from the state machine maintained on the same node.
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler() {
defer g.wg.Done()
for {
select {
case msg := <-g.queryMsgs:
err := g.replyPeerQueries(msg)
switch {
case err == ErrGossipSyncerExiting:
return
case err != nil:
log.Errorf("Unable to reply to peer "+
"query: %v", err)
}
case <-g.quit:
return
}
}
}
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the // sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer. // syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time, func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
@ -1065,8 +1101,16 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
// ProcessQueryMsg is used by outside callers to pass new channel time series // ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine. // queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) { func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) {
var msgChan chan lnwire.Message
switch msg.(type) {
case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
msgChan = g.queryMsgs
default:
msgChan = g.gossipMsgs
}
select { select {
case g.gossipMsgs <- msg: case msgChan <- msg:
case <-peerQuit: case <-peerQuit:
case <-g.quit: case <-g.quit:
} }

@ -116,9 +116,28 @@ func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash,
var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil) var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil)
// newTestSyncer creates a new test instance of a GossipSyncer. A buffered
// message channel is returned for intercepting messages sent from the syncer,
// in addition to a mock channel series which allows the test to control which
// messages the syncer knows of or wishes to filter out. The variadic flags are
// treated as positional arguments where the first index signals that the syncer
// should spawn a channelGraphSyncer and second index signals that the syncer
// should spawn a replyHandler. Any flags beyond the first two are currently
// ignored. If no flags are provided, both a channelGraphSyncer and replyHandler
// will be spawned by default.
func newTestSyncer(hID lnwire.ShortChannelID, func newTestSyncer(hID lnwire.ShortChannelID,
encodingType lnwire.ShortChanIDEncoding, chunkSize int32, encodingType lnwire.ShortChanIDEncoding, chunkSize int32,
) (chan []lnwire.Message, *GossipSyncer, *mockChannelGraphTimeSeries) { flags ...bool) (chan []lnwire.Message,
*GossipSyncer, *mockChannelGraphTimeSeries) {
syncChannels := true
replyQueries := true
if len(flags) > 0 {
syncChannels = flags[0]
}
if len(flags) > 1 {
replyQueries = flags[1]
}
msgChan := make(chan []lnwire.Message, 20) msgChan := make(chan []lnwire.Message, 20)
cfg := gossipSyncerCfg{ cfg := gossipSyncerCfg{
@ -126,6 +145,8 @@ func newTestSyncer(hID lnwire.ShortChannelID,
encodingType: encodingType, encodingType: encodingType,
chunkSize: chunkSize, chunkSize: chunkSize,
batchSize: chunkSize, batchSize: chunkSize,
noSyncChannels: !syncChannels,
noReplyQueries: !replyQueries,
sendToPeer: func(msgs ...lnwire.Message) error { sendToPeer: func(msgs ...lnwire.Message) error {
msgChan <- msgs msgChan <- msgs
return nil return nil
@ -1020,13 +1041,13 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
BlockHeight: 1144, BlockHeight: 1144,
} }
msgChan1, syncer1, chanSeries1 := newTestSyncer( msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, startHeight, defaultEncoding, chunkSize, true, false,
) )
syncer1.Start() syncer1.Start()
defer syncer1.Stop() defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer( msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, startHeight, defaultEncoding, chunkSize, false, true,
) )
syncer2.Start() syncer2.Start()
defer syncer2.Stop() defer syncer2.Stop()
@ -1042,7 +1063,7 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
numQueryResponses := numUndelayedQueries + numDelayedQueries numQueryResponses := numUndelayedQueries + numDelayedQueries
// The total number of responses must include the initial reply each // The total number of responses must include the initial reply each
// syner will make to QueryChannelRange. // syncer will make to QueryChannelRange.
numTotalQueries := 1 + numQueryResponses numTotalQueries := 1 + numQueryResponses
// The total number of channels each syncer needs to request must be // The total number of channels each syncer needs to request must be
@ -1051,12 +1072,6 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
// Although both nodes are at the same height, they'll have a // Although both nodes are at the same height, they'll have a
// completely disjoint set of chan ID's that they know of. // completely disjoint set of chan ID's that they know of.
var syncer1Chans []lnwire.ShortChannelID
for i := 0; i < numTotalChans; i++ {
syncer1Chans = append(
syncer1Chans, lnwire.NewShortChanIDFromInt(uint64(i)),
)
}
var syncer2Chans []lnwire.ShortChannelID var syncer2Chans []lnwire.ShortChannelID
for i := numTotalChans; i < numTotalChans+numTotalChans; i++ { for i := numTotalChans; i < numTotalChans+numTotalChans; i++ {
syncer2Chans = append( syncer2Chans = append(
@ -1064,8 +1079,8 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
) )
} }
// We'll kick off the test by passing over the QueryChannelRange // We'll kick off the test by asserting syncer1 sends over the
// messages from one node to the other. // QueryChannelRange message the other node.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1") t.Fatalf("didn't get msg from syncer1")
@ -1083,46 +1098,16 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg: case syncer2.queryMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
} }
} }
} }
// At this point, we'll need to send responses to both nodes from their // At this point, we'll need to a response from syncer2's channel
// respective channel series. Both nodes will simply request the entire // series. This will cause syncer1 to simply request the entire set of
// set of channels from the other. This will count as the first // channels from syncer2. This will count as the first undelayed
// undelayed response for each syncer. // response for sycner2.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterRangeReqs:
// We'll send all the channels that it should know of.
chanSeries1.filterRangeResp <- syncer1Chans
}
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
@ -1132,31 +1117,9 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
chanSeries2.filterRangeResp <- syncer2Chans chanSeries2.filterRangeResp <- syncer2Chans
} }
// At this point, we'll forward the ReplyChannelRange messages to both // At this point, we'll assert that the ReplyChannelRange message is
// parties. After receiving the set of channels known to the remote peer // sent by sycner2.
for i := 0; i < numQueryResponses; i++ { for i := 0; i < numQueryResponses; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2") t.Fatalf("didn't get msg from syncer2")
@ -1180,8 +1143,7 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
} }
} }
// We'll now send back a chunked response for both parties of the known // We'll now have syncer1 process the received sids from syncer2.
// short chan ID's.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
@ -1189,24 +1151,51 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
case <-chanSeries1.filterReq: case <-chanSeries1.filterReq:
chanSeries1.filterResp <- syncer2Chans chanSeries1.filterResp <- syncer2Chans
} }
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterReq: // At this point, syncer1 should start to send out initial requests to
chanSeries2.filterResp <- syncer1Chans // query the chan IDs of the remote party. We'll keep track of the
// number of queries made using the iterated value, which starts at one
// due the initial contribution of the QueryChannelRange msgs.
for i := 1; i < numTotalQueries; i++ {
expDelayResponse := i >= numUndelayedQueries
queryBatch(t,
msgChan1, msgChan2,
syncer1, syncer2,
chanSeries2,
expDelayResponse,
delayedQueryInterval,
delayTolerance,
)
}
} }
// At this point, both parties should start to send out initial // queryBatch is a helper method that will query for a single batch of channels
// requests to query the chan IDs of the remote party. We'll keep track // from a peer and assert the responses. The method can also be used to assert
// of the number of queries made using the iterated value, which starts // the same transition happens, but is delayed by the remote peer's DOS
// at one due the initial contribution of the QueryChannelRange msgs. // rate-limiting. The provided chanSeries should belong to syncer2.
for i := 1; i < numTotalQueries; i++ { //
// Both parties should now have sent out the initial requests // The state transition performed is the following:
// to query the chan IDs of the other party. // syncer1 -- QueryShortChanIDs --> syncer2
// chanSeries.FetchChanAnns()
// syncer1 <-- ReplyShortChanIDsEnd -- syncer2
//
// If expDelayResponse is true, this method will assert that the call the
// FetchChanAnns happens between:
// [delayedQueryInterval-delayTolerance, delayedQueryInterval+delayTolerance].
func queryBatch(t *testing.T,
msgChan1, msgChan2 chan []lnwire.Message,
syncer1, syncer2 *GossipSyncer,
chanSeries *mockChannelGraphTimeSeries,
expDelayResponse bool,
delayedQueryInterval, delayTolerance time.Duration) {
t.Helper()
// First, we'll assert that syncer1 sends a QueryShortChanIDs message to
// the remote peer.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1") t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan1: case msgs := <-msgChan1:
for _, msg := range msgs { for _, msg := range msgs {
@ -1221,46 +1210,22 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg: case syncer2.queryMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryShortChanIDs message.
_, ok := msg.(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryShortChanIDs for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
} }
} }
} }
// We'll then respond to both parties with an empty set of // We'll then respond to with an empty set of replies (as it doesn't
// replies (as it doesn't affect the test). // affect the test).
switch { switch {
// If this query has surpassed the undelayed query threshold, we // If this query has surpassed the undelayed query threshold, we will
// will impose stricter timing constraints on the response // impose stricter timing constraints on the response times. We'll first
// times. We'll first test that the peers don't immediately // test that syncer2's chanSeries doesn't immediately receive a query,
// receive a query, and then check that both queries haven't // and then check that the query hasn't gone unanswered entirely.
// gone unanswered entirely. case expDelayResponse:
case i >= numUndelayedQueries:
// Create a before and after timeout to test, our test // Create a before and after timeout to test, our test
// will ensure the messages are delivered to the peers // will ensure the messages are delivered to the peer
// in this timeframe. // in this timeframe.
before := time.After( before := time.After(
delayedQueryInterval - delayTolerance, delayedQueryInterval - delayTolerance,
@ -1269,107 +1234,39 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
delayedQueryInterval + delayTolerance, delayedQueryInterval + delayTolerance,
) )
// First, ensure neither peer tries to respond up until // First, ensure syncer2 doesn't try to respond up until the
// the before time fires. // before time fires.
select { select {
case <-before: case <-before:
// Queries are delayed, proceed. // Query is delayed, proceed.
case <-chanSeries1.annReq: case <-chanSeries.annReq:
t.Fatalf("DOSy query was not delayed")
case <-chanSeries2.annReq:
t.Fatalf("DOSy query was not delayed") t.Fatalf("DOSy query was not delayed")
} }
// Next, we'll need to test that both queries are // If syncer2 doesn't attempt a response within the allowed
// received before the after timer expires. To account // interval, then the messages are probably lost.
// for ordering, we will try to pull a message from both
// peers, and then test that the opposite peer also
// receives the message promptly.
var (
firstChanSeries *mockChannelGraphTimeSeries
laterChanSeries *mockChannelGraphTimeSeries
)
// If neither peer attempts a response within the
// allowed interval, then the messages are probably
// lost. Otherwise, process the message and record the
// induced ordering.
select { select {
case <-after: case <-after:
t.Fatalf("no delayed query received") t.Fatalf("no delayed query received")
case <-chanSeries1.annReq: case <-chanSeries.annReq:
chanSeries1.annResp <- []lnwire.Message{} chanSeries.annResp <- []lnwire.Message{}
firstChanSeries = chanSeries1
laterChanSeries = chanSeries2
case <-chanSeries2.annReq:
chanSeries2.annResp <- []lnwire.Message{}
firstChanSeries = chanSeries2
laterChanSeries = chanSeries1
} }
// Finally, using the same interval timeout as before, // Otherwise, syncer2 should query its chanSeries promtly.
// ensure the later peer also responds promptly. We also
// assert that the first peer doesn't attempt another
// response.
select {
case <-after:
t.Fatalf("no delayed query received")
case <-firstChanSeries.annReq:
t.Fatalf("spurious undelayed response")
case <-laterChanSeries.annReq:
laterChanSeries.annResp <- []lnwire.Message{}
}
// Otherwise, we still haven't exceeded our undelayed query
// limit. Assert that both peers promptly attempt a response to
// the queries.
default: default:
select { select {
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
case <-chanSeries1.annReq: case <-chanSeries.annReq:
chanSeries1.annResp <- []lnwire.Message{} chanSeries.annResp <- []lnwire.Message{}
}
select {
case <-time.After(50 * time.Millisecond):
t.Fatalf("no query recvd")
case <-chanSeries2.annReq:
chanSeries2.annResp <- []lnwire.Message{}
} }
} }
// Finally, both sides should then receive a // Finally, assert that syncer2 replies to syncer1 with a
// ReplyShortChanIDsEnd as the first chunk has been replied to. // ReplyShortChanIDsEnd.
select {
case <-time.After(50 * time.Millisecond):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyShortChanIDsEnd message.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select { select {
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
t.Fatalf("didn't get msg from syncer2") t.Fatalf("didn't get msg from syncer2")
@ -1388,8 +1285,6 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg: case syncer1.gossipMsgs <- msg:
}
} }
} }
} }
@ -1413,24 +1308,19 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
BlockHeight: 1144, BlockHeight: 1144,
} }
msgChan1, syncer1, chanSeries1 := newTestSyncer( msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, startHeight, defaultEncoding, chunkSize, true, false,
) )
syncer1.Start() syncer1.Start()
defer syncer1.Stop() defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer( msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, startHeight, defaultEncoding, chunkSize, false, true,
) )
syncer2.Start() syncer2.Start()
defer syncer2.Stop() defer syncer2.Stop()
// Although both nodes are at the same height, they'll have a // Although both nodes are at the same height, syncer will have 3 chan
// completely disjoint set of 3 chan ID's that they know of. // ID's that syncer1 doesn't know of.
syncer1Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
}
syncer2Chans := []lnwire.ShortChannelID{ syncer2Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(4), lnwire.NewShortChanIDFromInt(4),
lnwire.NewShortChanIDFromInt(5), lnwire.NewShortChanIDFromInt(5),
@ -1438,7 +1328,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
} }
// We'll kick off the test by passing over the QueryChannelRange // We'll kick off the test by passing over the QueryChannelRange
// messages from one node to the other. // messages from syncer1 to syncer2.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1") t.Fatalf("didn't get msg from syncer1")
@ -1456,45 +1346,15 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg: case syncer2.queryMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
} }
} }
} }
// At this point, we'll need to send responses to both nodes from their // At this point, we'll need to send a response from syncer2 to syncer1
// respective channel series. Both nodes will simply request the entire // using syncer2's channels This will cause syncer1 to simply request
// set of channels from the other. // the entire set of channels from the other.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterRangeReqs:
// We'll send all the channels that it should know of.
chanSeries1.filterRangeResp <- syncer1Chans
}
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
@ -1504,32 +1364,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
chanSeries2.filterRangeResp <- syncer2Chans chanSeries2.filterRangeResp <- syncer2Chans
} }
// At this point, we'll forward the ReplyChannelRange messages to both // At this point, we'll assert that syncer2 replies with the
// parties. Two replies are expected since the chunk size is 2, and we // ReplyChannelRange messages. Two replies are expected since the chunk
// need to query for 3 channels. // size is 2, and we need to query for 3 channels.
for i := 0; i < chunkSize; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
}
for i := 0; i < chunkSize; i++ { for i := 0; i < chunkSize; i++ {
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
@ -1554,8 +1391,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
} }
} }
// We'll now send back a chunked response for both parties of the known // We'll now send back a chunked response from syncer2 back to sycner1.
// short chan ID's.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
@ -1563,133 +1399,21 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
case <-chanSeries1.filterReq: case <-chanSeries1.filterReq:
chanSeries1.filterResp <- syncer2Chans chanSeries1.filterResp <- syncer2Chans
} }
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterReq: // At this point, syncer1 should start to send out initial requests to
chanSeries2.filterResp <- syncer1Chans // query the chan IDs of the remote party. As the chunk size is 2,
} // they'll need 2 rounds in order to fully reconcile the state.
// At this point, both parties should start to send out initial
// requests to query the chan IDs of the remote party. As the chunk
// size is 2, they'll need 2 rounds in order to fully reconcile the
// state.
for i := 0; i < chunkSize; i++ { for i := 0; i < chunkSize; i++ {
// Both parties should now have sent out the initial requests queryBatch(t,
// to query the chan IDs of the other party. msgChan1, msgChan2,
select { syncer1, syncer2,
case <-time.After(time.Second * 2): chanSeries2,
t.Fatalf("didn't get msg from syncer1") false, 0, 0,
)
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a QueryShortChanIDs message.
_, ok := msg.(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryShortChanIDs for %T", msg)
} }
select { // At this stage syncer1 should now be sending over its initial
case <-time.After(time.Second * 2): // GossipTimestampRange messages as it should be fully synced.
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryShortChanIDs message.
_, ok := msg.(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryShortChanIDs for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
// We'll then respond to both parties with an empty set of replies (as
// it doesn't affect the test).
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.annReq:
chanSeries1.annResp <- []lnwire.Message{}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.annReq:
chanSeries2.annResp <- []lnwire.Message{}
}
// Both sides should then receive a ReplyShortChanIDsEnd as the first
// chunk has been replied to.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyShortChanIDsEnd message.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a ReplyShortChanIDsEnd message.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("wrong message: expected "+
"ReplyShortChanIDsEnd for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
// At this stage both parties should now be sending over their initial
// GossipTimestampRange messages as they should both be fully synced.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1") t.Fatalf("didn't get msg from syncer1")
@ -1709,28 +1433,6 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
case syncer2.gossipMsgs <- msg: case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a GossipTimestampRange message.
_, ok := msg.(*lnwire.GossipTimestampRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
} }
} }
} }
@ -1796,7 +1498,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg: case syncer2.queryMsgs <- msg:
} }
} }
@ -1818,7 +1520,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg: case syncer1.queryMsgs <- msg:
} }
} }