discovery: add new gossipSyncer struct to manage sync state for each peer

In this commit, introduce a new struct, the gossipSyncer. The role of
this struct is to encapsulate the state machine required to implement
the new gossip query range feature recently added to the spec. With this
change, each peer that knows of this new feature will have a new
goroutine that will be managed by the gossiper.

Once created and started, the gossipSyncer will start to progress
through each possible state, finally ending at the chansSynced stage. In
this stage, it has synchronized state with the remote peer, and is
simply awaiting any new messages from the gossiper to send directly to
the peer. Each message will only be sent if the remote peer actually has
a set update horizon, and the message isn't before or after that
horizon.

A set of unit tests has been added to ensure that two state machines
properly terminate and synchronize channel state.
This commit is contained in:
Olaoluwa Osuntokun 2018-04-16 18:54:53 -07:00
parent 62df3cbbb8
commit 5789ef7c10
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
3 changed files with 2497 additions and 0 deletions

@ -1009,6 +1009,20 @@ func (d *AuthenticatedGossiper) networkHandler() {
// If we have new things to announce then broadcast
// them to all our immediately connected peers.
for _, msgChunk := range announcementBatch {
// We'll first attempt to filter out this new
// message for all peers that have active
// gossip syncers active.
d.syncerMtx.RLock()
for _, syncer := range d.peerSyncers {
syncer.FilterGossipMsgs(msgChunk)
}
d.syncerMtx.RUnlock()
// With the syncers taken care of, we'll merge
// the sender map with the set of syncers, so
// we don't send out duplicate messages.
msgChunk.mergeSyncerMap(syncerPeers)
err := d.cfg.Broadcast(
msgChunk.senders, msgChunk.msg,
)

906
discovery/syncer.go Normal file

@ -0,0 +1,906 @@
package discovery
import (
"fmt"
"math"
"sync"
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash"
)
// syncerState is an enum that represents the current state of the
// gossipSyncer. As the syncer is a state machine, we'll gate our actions
// based off of the current state and the next incoming message.
type syncerState uint8
const (
// syncingChans is the default state of the gossipSyncer. We start in
// this state when a new peer first connects and we don't yet know if
// we're fully synchronized.
syncingChans syncerState = iota
// waitingQueryRangeReply is the second main phase of the gossipSyncer.
// We enter this state after we send out our first QueryChannelRange
// reply. We'll stay in this state until the remote party sends us a
// ReplyShortChanIDsEnd message that indicates they've responded to our
// query entirely. After this state, we'll transition to
// waitingQueryChanReply after we send out requests for all the new
// chan ID's to us.
waitingQueryRangeReply
// queryNewChannels is the third main phase of the gossipSyncer. In
// this phase we'll send out all of our QueryShortChanIDs messages in
// response to the new channels that we don't yet know about.
queryNewChannels
// waitingQueryChanReply is the fourth main phase of the gossipSyncer.
// We enter this phase once we've sent off a query chink to the remote
// peer. We'll stay in this phase until we receive a
// ReplyShortChanIDsEnd message which indicates that the remote party
// has responded to all of our requests.
waitingQueryChanReply
// chansSynced is the terminal stage of the gossipSyncer. Once we enter
// this phase, we'll send out our update horizon, which filters out the
// set of channel updates that we're interested in. In this state,
// we'll be able to accept any outgoing messages from the
// AuthenticatedGossiper, and decide if we should forward them to our
// target peer based on its update horizon.
chansSynced
)
// String returns a human readable string describing the target syncerState.
func (s syncerState) String() string {
switch s {
case syncingChans:
return "syncingChans"
case waitingQueryRangeReply:
return "waitingQueryRangeReply"
case queryNewChannels:
return "queryNewChannels"
case waitingQueryChanReply:
return "waitingQueryChanReply"
case chansSynced:
return "chansSynced"
default:
return "UNKNOWN STATE"
}
}
var (
// encodingTypeToChunkSize maps an encoding type, to the max number of
// short chan ID's using the encoding type that we can fit into a
// single message safely.
encodingTypeToChunkSize = map[lnwire.ShortChanIDEncoding]int32{
lnwire.EncodingSortedPlain: 8000,
}
)
const (
// chanRangeQueryBuffer is the number of blocks back that we'll go when
// asking the remote peer for their any channels they know of beyond
// our highest known channel ID.
chanRangeQueryBuffer = 144
)
// ChannelGraphTimeSeries is an interface that provides time and block based
// querying into our view of the channel graph. New channels will have
// monotonically increasing block heights, and new channel updates will have
// increasing timestamps. Once we connect to a peer, we'll use the methods in
// this interface to determine if we're already in sync, or need to request
// some new information from them.
type ChannelGraphTimeSeries interface {
// HighestChanID should return the channel ID of the channel we know of
// that's furthest in the target chain. This channel will have a block
// height that's close to the current tip of the main chain as we
// know it. We'll use this to start our QueryChannelRange dance with
// the remote node.
HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error)
// UpdatesInHorizon returns all known channel and node updates with an
// update timestamp between the start time and end time. We'll use this
// to catch up a remote node to the set of channel updates that they
// may have missed out on within the target chain.
UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error)
// FilterKnownChanIDs takes a target chain, and a set of channel ID's,
// and returns a filtered set of chan ID's. This filtered set of chan
// ID's represents the ID's that we don't know of which were in the
// passed superSet.
FilterKnownChanIDs(chain chainhash.Hash,
superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error)
// FilterChannelRange returns the set of channels that we created
// between the start height and the end height. We'll use this to to a
// remote peer's QueryChannelRange message.
FilterChannelRange(chain chainhash.Hash,
startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error)
// FetchChanAnns returns a full set of channel announcements as well as
// their updates that match the set of specified short channel ID's.
// We'll use this to reply to a QueryShortChanIDs message sent by a
// remote peer. The response will contain a unique set of
// ChannelAnnouncements, the latest ChannelUpdate for each of the
// announcements, and a unique set of NodeAnnouncements.
FetchChanAnns(chain chainhash.Hash,
shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error)
// FetchChanUpdates returns the latest channel update messages for the
// specified short channel ID. If no channel updates are known for the
// channel, then an empty slice will be returned.
FetchChanUpdates(chain chainhash.Hash,
shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error)
}
// gossipSyncerCfg is a struct that packages all the information a gossipSyncer
// needs to carry out its duties.
type gossipSyncerCfg struct {
// chainHash is the chain that this syncer is responsible for.
chainHash chainhash.Hash
// syncChanUpdates is a bool that indicates if we should request a
// continual channel update stream or not.
syncChanUpdates bool
// channelSeries is the primary interface that we'll use to generate
// our queries and respond to the queries of the remote peer.
channelSeries ChannelGraphTimeSeries
// encodingType is the current encoding type we're aware of. Requests
// with different encoding types will be rejected.
encodingType lnwire.ShortChanIDEncoding
// sendToPeer is a function closure that should send the set of
// targeted messages to the peer we've been assigned to sync the graph
// state from.
sendToPeer func(...lnwire.Message) error
}
// gossipSyncer is a struct that handles synchronizing the channel graph state
// with a remote peer. The gossipSyncer implements a state machine that will
// progressively ensure we're synchronized with the channel state of the remote
// node. Once both nodes have been synchronized, we'll use an update filter to
// filter out which messages should be sent to a remote peer based on their
// update horizon. If the update horizon isn't specified, then we won't send
// them any channel updates at all.
//
// TODO(roasbeef): modify to only sync from one peer at a time?
type gossipSyncer struct {
// remoteUpdateHorizon is the update horizon of the remote peer. We'll
// use this to properly filter out any messages.
remoteUpdateHorizon *lnwire.GossipTimestampRange
// localUpdateHorizon is our local update horizon, we'll use this to
// determine if we've already sent out our update.
localUpdateHorizon *lnwire.GossipTimestampRange
// state is the current state of the gossipSyncer.
state syncerState
// gossipMsgs is a channel that all messages from the target peer will
// be sent over.
gossipMsgs chan lnwire.Message
// bufferedChanRangeReplies is used in the waitingQueryChanReply to
// buffer all the chunked response to our query.
bufferedChanRangeReplies []lnwire.ShortChannelID
// newChansToQuery is used to pass the set of channels we should query
// for from the waitingQueryChanReply state to the queryNewChannels
// state.
newChansToQuery []lnwire.ShortChannelID
// peerPub is the public key of the peer we're syncing with, serialized
// in compressed format.
peerPub [33]byte
cfg gossipSyncerCfg
sync.Mutex
quit chan struct{}
wg sync.WaitGroup
}
// newGossiperSyncer returns a new instance of the gossipSyncer populated using
// the passed config.
func newGossiperSyncer(cfg gossipSyncerCfg) *gossipSyncer {
return &gossipSyncer{
cfg: cfg,
gossipMsgs: make(chan lnwire.Message, 100),
quit: make(chan struct{}),
}
}
// Start starts the gossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *gossipSyncer) Start() error {
log.Debugf("Starting gossipSyncer(%x)", g.peerPub[:])
g.wg.Add(1)
go g.channelGraphSyncer()
return nil
}
// Stop signals the gossipSyncer for a graceful exit, then waits until it has
// exited.
func (g *gossipSyncer) Stop() error {
close(g.quit)
g.wg.Wait()
return nil
}
// channelGraphSyncer is the main goroutine responsible for ensuring that we
// properly channel graph state with the remote peer, and also that we only
// send them messages which actually pass their defined update horizon.
func (g *gossipSyncer) channelGraphSyncer() {
defer g.wg.Done()
// TODO(roasbeef): also add ability to force transition back to syncing
// chans
// * needed if we want to sync chan state very few blocks?
for {
log.Debugf("gossipSyncer(%x): state=%v", g.peerPub[:], g.state)
switch g.state {
// When we're in this state, we're trying to synchronize our
// view of the network with the remote peer. We'll kick off
// this sync by asking them for the set of channels they
// understand, as we'll as responding to any other queries by
// them.
case syncingChans:
// If we're in this state, then we'll send the remote
// peer our opening QueryChannelRange message.
queryRangeMsg, err := g.genChanRangeQuery()
if err != nil {
log.Errorf("unable to gen chan range "+
"query: %v", err)
return
}
err = g.cfg.sendToPeer(queryRangeMsg)
if err != nil {
log.Errorf("unable to send chan range "+
"query: %v", err)
return
}
// With the message sent successfully, we'll transition
// into the next state where we wait for their reply.
g.state = waitingQueryRangeReply
// In this state, we've sent out our initial channel range
// query and are waiting for the final response from the remote
// peer before we perform a diff to see with channels they know
// of that we don't.
case waitingQueryRangeReply:
// We'll wait to either process a new message from the
// remote party, or exit due to the gossiper exiting,
// or us being signalled to do so.
select {
case msg := <-g.gossipMsgs:
// The remote peer is sending a response to our
// initial query, we'll collate this response,
// and see if it's the final one in the series.
// If so, we can then transition to querying
// for the new channels.
queryReply, ok := msg.(*lnwire.ReplyChannelRange)
if ok {
err := g.processChanRangeReply(queryReply)
if err != nil {
log.Errorf("unable to "+
"process chan range "+
"query: %v", err)
return
}
continue
}
// Otherwise, it's the remote peer performing a
// query, which we'll attempt to reply to.
err := g.replyPeerQueries(msg)
if err != nil {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit:
return
}
// We'll enter this state once we've discovered which channels
// the remote party knows of that we don't yet know of
// ourselves.
case queryNewChannels:
// First, we'll attempt to continue our channel
// synchronization by continuing to send off another
// query chunk.
done, err := g.synchronizeChanIDs()
if err != nil {
log.Errorf("unable to sync chan IDs: %v", err)
}
// If this wasn't our last query, then we'll need to
// transition to our waiting state.
if !done {
g.state = waitingQueryChanReply
continue
}
// If we're fully synchronized, then we can transition
// to our terminal state.
g.state = chansSynced
// In this state, we've just sent off a new query for channels
// that we don't yet know of. We'll remain in this state until
// the remote party signals they've responded to our query in
// totality.
case waitingQueryChanReply:
// Once we've sent off our query, we'll wait for either
// an ending reply, or just another query from the
// remote peer.
select {
case msg := <-g.gossipMsgs:
// If this is the final reply to one of our
// queries, then we'll loop back into our query
// state to send of the remaining query chunks.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if ok {
g.state = queryNewChannels
continue
}
// Otherwise, it's the remote peer performing a
// query, which we'll attempt to deploy to.
err := g.replyPeerQueries(msg)
if err != nil {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit:
return
}
// This is our final terminal state where we'll only reply to
// any further queries by the remote peer.
case chansSynced:
// If we haven't yet sent out our update horizon, and
// we want to receive real-time channel updates, we'll
// do so now.
if g.localUpdateHorizon == nil && g.cfg.syncChanUpdates {
// TODO(roasbeef): query DB for most recent
// update?
// We'll give an hours room in our update
// horizon to ensure we don't miss any newer
// items.
updateHorizon := time.Now().Add(-time.Hour * 1)
log.Infof("gossipSyncer(%x): applying "+
"gossipFilter(start=%v)", g.peerPub[:],
updateHorizon)
g.localUpdateHorizon = &lnwire.GossipTimestampRange{
ChainHash: g.cfg.chainHash,
FirstTimestamp: uint32(updateHorizon.Unix()),
TimestampRange: math.MaxUint32,
}
err := g.cfg.sendToPeer(g.localUpdateHorizon)
if err != nil {
log.Errorf("unable to send update "+
"horizon: %v", err)
}
}
// With our horizon set, we'll simply reply to any new
// message and exit if needed.
select {
case msg := <-g.gossipMsgs:
err := g.replyPeerQueries(msg)
if err != nil {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit:
return
}
}
}
}
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
// the remote peer for its known set of channel IDs within a particular block
// range. This method will be called continually until the entire range has
// been queried for with a response received. We'll chunk our requests as
// required to ensure they fit into a single message. We may re-renter this
// state in the case that chunking is required.
func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
// Ensure that we're able to handle queries using the specified chan
// ID.
chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType]
if !ok {
return false, fmt.Errorf("unknown encoding type: %v",
g.cfg.encodingType)
}
// If we're in this state yet there are no more new channels to query
// for, then we'll transition to our final synced state and return true
// to signal that we're fully synchronized.
if len(g.newChansToQuery) == 0 {
log.Infof("gossipSyncer(%x): no more chans to query",
g.peerPub[:])
return true, nil
}
// Otherwise, we'll issue our next chunked query to receive replies
// for.
var queryChunk []lnwire.ShortChannelID
// If the number of channels to query for is less than the chunk size,
// then we can issue a single query.
if int32(len(g.newChansToQuery)) < chunkSize {
queryChunk = g.newChansToQuery
g.newChansToQuery = nil
} else {
// Otherwise, we'll need to only query for the next chunk.
// We'll slice into our query chunk, then slide down our main
// pointer down by the chunk size.
queryChunk = g.newChansToQuery[:chunkSize]
g.newChansToQuery = g.newChansToQuery[chunkSize:]
}
log.Infof("gossipSyncer(%x): querying for %v new channels",
g.peerPub[:], len(queryChunk))
// With our chunk obtained, we'll send over our next query, then return
// false indicating that we're net yet fully synced.
err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
ChainHash: g.cfg.chainHash,
EncodingType: lnwire.EncodingSortedPlain,
ShortChanIDs: queryChunk,
})
return false, err
}
// processChanRangeReply is called each time the gossipSyncer receives a new
// reply to the initial range query to discover new channels that it didn't
// previously know of.
func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
g.bufferedChanRangeReplies = append(
g.bufferedChanRangeReplies, msg.ShortChanIDs...,
)
log.Infof("gossipSyncer(%x): buffering chan range reply of size=%v",
g.peerPub[:], len(msg.ShortChanIDs))
// If this isn't the last response, then we can exit as we've already
// buffered the latest portion of the streaming reply.
if msg.Complete == 0 {
return nil
}
log.Infof("gossipSyncer(%x): filtering through %v chans", g.peerPub[:],
len(g.bufferedChanRangeReplies))
// Otherwise, this is the final response, so we'll now check to see
// which channels they know of that we don't.
newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
g.cfg.chainHash, g.bufferedChanRangeReplies,
)
if err != nil {
return fmt.Errorf("unable to filter chan ids: %v", err)
}
// As we've received the entirety of the reply, we no longer need to
// hold on to the set of buffered replies, so we'll let that be garbage
// collected now.
g.bufferedChanRangeReplies = nil
// If there aren't any channels that we don't know of, then we can
// switch straight to our terminal state.
if len(newChans) == 0 {
log.Infof("gossipSyncer(%x): remote peer has no new chans",
g.peerPub[:])
g.state = chansSynced
return nil
}
// Otherwise, we'll set the set of channels that we need to query for
// the next state, and also transition our state.
g.newChansToQuery = newChans
g.state = queryNewChannels
log.Infof("gossipSyncer(%x): starting query for %v new chans",
g.peerPub[:], len(newChans))
return nil
}
// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection.
func (g *gossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
// First, we'll query our channel graph time series for its highest
// known channel ID.
newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
if err != nil {
return nil, err
}
// Once we have the chan ID of the newest, we'll obtain the block
// height of the channel, then subtract our default horizon to ensure
// we don't miss any channels. By default, we go back 1 day from the
// newest channel.
var startHeight uint32
switch {
case newestChan.BlockHeight <= chanRangeQueryBuffer:
fallthrough
case newestChan.BlockHeight == 0:
startHeight = 0
default:
startHeight = uint32(newestChan.BlockHeight - chanRangeQueryBuffer)
}
log.Infof("gossipSyncer(%x): requesting new chans from height=%v "+
"and %v blocks after", g.peerPub[:], startHeight,
math.MaxUint32-startHeight)
// Finally, we'll craft the channel range query, using our starting
// height, then asking for all known channels to the foreseeable end of
// the main chain.
return &lnwire.QueryChannelRange{
ChainHash: g.cfg.chainHash,
FirstBlockHeight: startHeight,
NumBlocks: math.MaxUint32 - startHeight,
}, nil
}
// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error {
switch msg := msg.(type) {
// In this state, we'll also handle any incoming channel range queries
// from the remote peer as they're trying to sync their state as well.
case *lnwire.QueryChannelRange:
return g.replyChanRangeQuery(msg)
// If the remote peer skips straight to requesting new channels that
// they don't know of, then we'll ensure that we also handle this case.
case *lnwire.QueryShortChanIDs:
return g.replyShortChanIDs(msg)
default:
return fmt.Errorf("unknown message: %T", msg)
}
}
// replyChanRangeQuery will be dispatched in response to a channel range query
// by the remote node. We'll query the channel time series for channels that
// meet the channel range, then chunk our responses to the remote node. We also
// ensure that our final fragment carries the "complete" bit to indicate the
// end of our streaming response.
func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
// Using the current set encoding type, we'll determine what our chunk
// size should be. If we can't locate the chunk size, then we'll return
// an error as we can't proceed.
chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType]
if !ok {
return fmt.Errorf("unknown encoding type: %v", g.cfg.encodingType)
}
log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+
"num_blocks=%v", g.peerPub[:], query.FirstBlockHeight,
query.NumBlocks)
// Next, we'll consult the time series to obtain the set of known
// channel ID's that match their query.
startBlock := query.FirstBlockHeight
channelRange, err := g.cfg.channelSeries.FilterChannelRange(
query.ChainHash, startBlock, startBlock+query.NumBlocks,
)
if err != nil {
return err
}
// TODO(roasbeef): means can't send max uint above?
// * or make internal 64
numChannels := int32(len(channelRange))
numChansSent := int32(0)
for {
// We'll send our this response in a streaming manner,
// chunk-by-chunk. We do this as there's a transport message
// size limit which we'll need to adhere to.
var channelChunk []lnwire.ShortChannelID
// We know this is the final chunk, if the difference between
// the total number of channels, and the number of channels
// we've sent is less-than-or-equal to the chunk size.
isFinalChunk := (numChannels - numChansSent) <= chunkSize
// If this is indeed the last chunk, then we'll send the
// remainder of the channels.
if isFinalChunk {
channelChunk = channelRange[numChansSent:]
log.Infof("gossipSyncer(%x): sending final chan "+
"range chunk, size=%v", g.peerPub[:], len(channelChunk))
} else {
// Otherwise, we'll only send off a fragment exactly
// sized to the proper chunk size.
channelChunk = channelRange[numChansSent : numChansSent+chunkSize]
log.Infof("gossipSyncer(%x): sending range chunk of "+
"size=%v", g.peerPub[:], len(channelChunk))
}
// With our chunk assembled, we'll now send to the remote peer
// the current chunk.
replyChunk := lnwire.ReplyChannelRange{
QueryChannelRange: *query,
Complete: 0,
EncodingType: g.cfg.encodingType,
ShortChanIDs: channelChunk,
}
if isFinalChunk {
replyChunk.Complete = 1
}
if err := g.cfg.sendToPeer(&replyChunk); err != nil {
return err
}
// If this was the final chunk, then we'll exit now as our
// response is now complete.
if isFinalChunk {
return nil
}
numChansSent += int32(len(channelChunk))
}
}
// replyShortChanIDs will be dispatched in response to a query by the remote
// node for information concerning a set of short channel ID's. Our response
// will be sent in a streaming chunked manner to ensure that we remain below
// the current transport level message size.
func (g *gossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
// Before responding, we'll check to ensure that the remote peer is
// querying for the same chain that we're on. If not, we'll send back a
// response with a complete value of zero to indicate we're on a
// different chain.
if g.cfg.chainHash != query.ChainHash {
log.Warnf("Remote peer requested QueryShortChanIDs for "+
"chain=%v, we're on chain=%v", g.cfg.chainHash,
query.ChainHash)
return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 0,
})
}
log.Infof("gossipSyncer(%x): fetching chan anns for %v chans",
g.peerPub[:], len(query.ShortChanIDs))
// Now that we know we're on the same chain, we'll query the channel
// time series for the set of messages that we know of which satisfies
// the requirement of being a chan ann, chan update, or a node ann
// related to the set of queried channels.
replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
query.ChainHash, query.ShortChanIDs,
)
if err != nil {
return err
}
// If we didn't find any messages related to those channel ID's, then
// we'll send over a reply marking the end of our response, and exit
// early.
if len(replyMsgs) == 0 {
return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 1,
})
}
// Otherwise, we'll send over our set of messages responding to the
// query, with the ending message appended to it.
replyMsgs = append(replyMsgs, &lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 1,
})
return g.cfg.sendToPeer(replyMsgs...)
}
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *gossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
g.Lock()
g.remoteUpdateHorizon = filter
startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
endTime := startTime.Add(
time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
)
g.Unlock()
// Now that the remote peer has applied their filter, we'll query the
// database for all the messages that are beyond this filter.
newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
g.cfg.chainHash, startTime, endTime,
)
if err != nil {
return err
}
log.Infof("gossipSyncer(%x): applying new update horizon: start=%v, "+
"end=%v, backlog_size=%v", g.peerPub[:], startTime, endTime,
len(newUpdatestoSend))
// If we don't have any to send, then we can return early.
if len(newUpdatestoSend) == 0 {
return nil
}
// We'll conclude by launching a goroutine to send out any updates.
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := g.cfg.sendToPeer(newUpdatestoSend...); err != nil {
log.Errorf("unable to send messages for peer catch "+
"up: %v", err)
}
}()
return nil
}
// FilterGossipMsgs takes a set of gossip messages, and only send it to a peer
// iff the message is within the bounds of their set gossip filter. If the peer
// doesn't have a gossip filter set, then no messages will be forwarded.
func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
// If the peer doesn't have an update horizon set, then we won't send
// it any new update messages.
if g.remoteUpdateHorizon == nil {
return
}
// TODO(roasbeef): need to ensure that peer still online...send msg to
// gossiper on peer termination to signal peer disconnect?
var err error
// Before we filter out the messages, we'll construct an index over the
// set of channel announcements and channel updates. This will allow us
// to quickly check if we should forward a chan ann, based on the known
// channel updates for a channel.
chanUpdateIndex := make(map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate)
for _, msg := range msgs {
chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate)
if !ok {
continue
}
chanUpdateIndex[chanUpdate.ShortChannelID] = append(
chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
)
}
// We'll construct a helper function that we'll us below to determine
// if a given messages passes the gossip msg filter.
g.Lock()
startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
endTime := startTime.Add(
time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
)
g.Unlock()
passesFilter := func(timeStamp uint32) bool {
t := time.Unix(int64(timeStamp), 0)
return t.After(startTime) && t.Before(endTime)
}
msgsToSend := make([]lnwire.Message, 0, len(msgs))
for _, msg := range msgs {
// If the target peer is the peer that sent us this message,
// then we'll exit early as we don't need to filter this
// message.
if _, ok := msg.senders[g.peerPub]; ok {
continue
}
switch msg := msg.msg.(type) {
// For each channel announcement message, we'll only send this
// message if the channel updates for the channel are between
// our time range.
case *lnwire.ChannelAnnouncement:
// First, we'll check if the channel updates are in
// this message batch.
chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
if !ok {
// If not, we'll attempt to query the database
// to see if we know of the updates.
chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
g.cfg.chainHash, msg.ShortChannelID,
)
if err != nil {
log.Warnf("no channel updates found for "+
"short_chan_id=%v",
msg.ShortChannelID)
msgsToSend = append(msgsToSend, msg)
continue
}
}
for _, chanUpdate := range chanUpdates {
if passesFilter(chanUpdate.Timestamp) {
msgsToSend = append(msgsToSend, msg)
break
}
}
if len(chanUpdates) == 0 {
msgsToSend = append(msgsToSend, msg)
}
// For each channel update, we'll only send if it the timestamp
// is between our time range.
case *lnwire.ChannelUpdate:
if passesFilter(msg.Timestamp) {
msgsToSend = append(msgsToSend, msg)
}
// Similarly, we only send node announcements if the update
// timestamp ifs between our set gossip filter time range.
case *lnwire.NodeAnnouncement:
if passesFilter(msg.Timestamp) {
msgsToSend = append(msgsToSend, msg)
}
}
}
log.Tracef("gossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
g.peerPub[:], len(msgs), len(msgsToSend))
if len(msgsToSend) == 0 {
return
}
g.cfg.sendToPeer(msgsToSend...)
}
// ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine.
func (g *gossipSyncer) ProcessQueryMsg(msg lnwire.Message) {
select {
case g.gossipMsgs <- msg:
return
case <-g.quit:
return
}
}

1577
discovery/syncer_test.go Normal file

@ -0,0 +1,1577 @@
package discovery
import (
"fmt"
"math"
"reflect"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/chaincfg/chainhash"
)
type horizonQuery struct {
chain chainhash.Hash
start time.Time
end time.Time
}
type filterRangeReq struct {
startHeight, endHeight uint32
}
type mockChannelGraphTimeSeries struct {
highestID lnwire.ShortChannelID
horizonReq chan horizonQuery
horizonResp chan []lnwire.Message
filterReq chan []lnwire.ShortChannelID
filterResp chan []lnwire.ShortChannelID
filterRangeReqs chan filterRangeReq
filterRangeResp chan []lnwire.ShortChannelID
annReq chan []lnwire.ShortChannelID
annResp chan []lnwire.Message
updateReq chan lnwire.ShortChannelID
updateResp chan []*lnwire.ChannelUpdate
}
func newMockChannelGraphTimeSeries(hID lnwire.ShortChannelID) *mockChannelGraphTimeSeries {
return &mockChannelGraphTimeSeries{
highestID: hID,
horizonReq: make(chan horizonQuery, 1),
horizonResp: make(chan []lnwire.Message, 1),
filterReq: make(chan []lnwire.ShortChannelID, 1),
filterResp: make(chan []lnwire.ShortChannelID, 1),
filterRangeReqs: make(chan filterRangeReq, 1),
filterRangeResp: make(chan []lnwire.ShortChannelID, 1),
annReq: make(chan []lnwire.ShortChannelID, 1),
annResp: make(chan []lnwire.Message, 1),
updateReq: make(chan lnwire.ShortChannelID, 1),
updateResp: make(chan []*lnwire.ChannelUpdate, 1),
}
}
func (m *mockChannelGraphTimeSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) {
return &m.highestID, nil
}
func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {
m.horizonReq <- horizonQuery{
chain, startTime, endTime,
}
return <-m.horizonResp, nil
}
func (m *mockChannelGraphTimeSeries) FilterKnownChanIDs(chain chainhash.Hash,
superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error) {
m.filterReq <- superSet
return <-m.filterResp, nil
}
func (m *mockChannelGraphTimeSeries) FilterChannelRange(chain chainhash.Hash,
startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error) {
m.filterRangeReqs <- filterRangeReq{startHeight, endHeight}
return <-m.filterRangeResp, nil
}
func (m *mockChannelGraphTimeSeries) FetchChanAnns(chain chainhash.Hash,
shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error) {
m.annReq <- shortChanIDs
return <-m.annResp, nil
}
func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash,
shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error) {
m.updateReq <- shortChanID
return <-m.updateResp, nil
}
var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil)
func newTestSyncer(hID lnwire.ShortChannelID) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) {
msgChan := make(chan []lnwire.Message, 20)
cfg := gossipSyncerCfg{
syncChanUpdates: true,
channelSeries: newMockChannelGraphTimeSeries(hID),
encodingType: lnwire.EncodingSortedPlain,
sendToPeer: func(msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
},
}
syncer := newGossiperSyncer(cfg)
return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries)
}
// TestGossipSyncerFilterGossipMsgsNoHorizon tests that if the remote peer
// doesn't have a horizon set, then we won't send any incoming messages to it.
func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// With the syncer created, we'll create a set of messages to filter
// through the gossiper to the target peer.
msgs := []msgWithSenders{
{
msg: &lnwire.NodeAnnouncement{Timestamp: uint32(time.Now().Unix())},
},
{
msg: &lnwire.NodeAnnouncement{Timestamp: uint32(time.Now().Unix())},
},
}
// We'll then attempt to filter the set of messages through the target
// peer.
syncer.FilterGossipMsgs(msgs...)
// As the remote peer doesn't yet have a gossip timestamp set, we
// shouldn't receive any outbound messages.
select {
case msg := <-msgChan:
t.Fatalf("received message but shouldn't have: %v",
spew.Sdump(msg))
case <-time.After(time.Millisecond * 10):
}
}
func unixStamp(a int64) uint32 {
t := time.Unix(a, 0)
return uint32(t.Unix())
}
// TestGossipSyncerFilterGossipMsgsAll tests that we're able to properly filter
// out a set of incoming messages based on the set remote update horizon for a
// peer. We tests all messages type, and all time straddling. We'll also send a
// channel ann that already has a channel update on disk.
func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// We'll create then apply a remote horizon for the target peer with a
// set of manually selected timestamps.
remoteHorizon := &lnwire.GossipTimestampRange{
FirstTimestamp: unixStamp(25000),
TimestampRange: uint32(1000),
}
syncer.remoteUpdateHorizon = remoteHorizon
// With the syncer created, we'll create a set of messages to filter
// through the gossiper to the target peer. Our message will consist of
// one node announcement above the horizon, one below. Additionally,
// we'll include a chan ann with an update below the horizon, one
// with an update timestmap above the horizon, and one without any
// channel updates at all.
msgs := []msgWithSenders{
{
// Node ann above horizon.
msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(25001)},
},
{
// Node ann below horizon.
msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(5)},
},
{
// Node ann above horizon.
msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(999999)},
},
{
// Ann tuple below horizon.
msg: &lnwire.ChannelAnnouncement{
ShortChannelID: lnwire.NewShortChanIDFromInt(10),
},
},
{
msg: &lnwire.ChannelUpdate{
ShortChannelID: lnwire.NewShortChanIDFromInt(10),
Timestamp: unixStamp(5),
},
},
{
// Ann tuple above horizon.
msg: &lnwire.ChannelAnnouncement{
ShortChannelID: lnwire.NewShortChanIDFromInt(15),
},
},
{
msg: &lnwire.ChannelUpdate{
ShortChannelID: lnwire.NewShortChanIDFromInt(15),
Timestamp: unixStamp(25002),
},
},
{
// Ann tuple beyond horizon.
msg: &lnwire.ChannelAnnouncement{
ShortChannelID: lnwire.NewShortChanIDFromInt(20),
},
},
{
msg: &lnwire.ChannelUpdate{
ShortChannelID: lnwire.NewShortChanIDFromInt(20),
Timestamp: unixStamp(999999),
},
},
{
// Ann w/o an update at all, the update in the DB will
// be below the horizon.
msg: &lnwire.ChannelAnnouncement{
ShortChannelID: lnwire.NewShortChanIDFromInt(25),
},
},
}
// Before we send off the query, we'll ensure we send the missing
// channel update for that final ann. It will be below the horizon, so
// shouldn't be sent anyway.
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")
case query := <-chanSeries.updateReq:
// It should be asking for the chan updates of short
// chan ID 25.
expectedID := lnwire.NewShortChanIDFromInt(25)
if expectedID != query {
t.Fatalf("wrong query id: expected %v, got %v",
expectedID, query)
}
// If so, then we'll send back the missing update.
chanSeries.updateResp <- []*lnwire.ChannelUpdate{
{
ShortChannelID: lnwire.NewShortChanIDFromInt(25),
Timestamp: unixStamp(5),
},
}
}
}()
// We'll then instruct the gossiper to filter this set of messages.
syncer.FilterGossipMsgs(msgs...)
// Out of all the messages we sent in, we should only get 2 of them
// back.
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
case msgs := <-msgChan:
if len(msgs) != 3 {
t.Fatalf("expected 3 messages instead got %v "+
"messages: %v", len(msgs), spew.Sdump(msgs))
}
}
}
// TestGossipSyncerApplyGossipFilter tests that once a gossip filter is applied
// for the remote peer, then we send the peer all known messages which are
// within their desired time horizon.
func TestGossipSyncerApplyGossipFilter(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// We'll apply this gossip horizon for the remote peer.
remoteHorizon := &lnwire.GossipTimestampRange{
FirstTimestamp: unixStamp(25000),
TimestampRange: uint32(1000),
}
// Before we apply the horizon, we'll dispatch a response to the query
// that the syncer will issue.
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")
case query := <-chanSeries.horizonReq:
// The syncer should have translated the time range
// into the proper star time.
if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
t.Fatalf("wrong query stamp: expected %v, got %v",
remoteHorizon.FirstTimestamp, query.start)
}
// For this first response, we'll send back an empty
// set of messages. As result, we shouldn't send any
// messages.
chanSeries.horizonResp <- []lnwire.Message{}
}
}()
// We'll now attempt to apply the gossip filter for the remote peer.
err := syncer.ApplyGossipFilter(remoteHorizon)
if err != nil {
t.Fatalf("unable to apply filter: %v", err)
}
// There should be no messages in the message queue as we didn't send
// the syncer and messages within the horizon.
select {
case msgs := <-msgChan:
t.Fatalf("expected no msgs, instead got %v", spew.Sdump(msgs))
default:
}
// If we repeat the process, but give the syncer a set of valid
// messages, then these should be sent to the remote peer.
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")
case query := <-chanSeries.horizonReq:
// The syncer should have translated the time range
// into the proper star time.
if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
t.Fatalf("wrong query stamp: expected %v, got %v",
remoteHorizon.FirstTimestamp, query.start)
}
// For this first response, we'll send back a proper
// set of messages that should be echoed back.
chanSeries.horizonResp <- []lnwire.Message{
&lnwire.ChannelUpdate{
ShortChannelID: lnwire.NewShortChanIDFromInt(25),
Timestamp: unixStamp(5),
},
}
}
}()
err = syncer.ApplyGossipFilter(remoteHorizon)
if err != nil {
t.Fatalf("unable to apply filter: %v", err)
}
// We should get back the exact same message.
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
case msgs := <-msgChan:
if len(msgs) != 1 {
t.Fatalf("wrong messages: expected %v, got %v",
1, len(msgs))
}
}
}
// TestGossipSyncerReplyShortChanIDsWrongChainHash tests that if we get a chan
// ID query for the wrong chain, then we send back only a short ID end with
// complete=0.
func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// We'll now ask the syncer to reply to a chan ID query, but for a
// chain that it isn't aware of.
err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{
ChainHash: *chaincfg.SimNetParams.GenesisHash,
})
if err != nil {
t.Fatalf("unable to process short chan ID's: %v", err)
}
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
case msgs := <-msgChan:
// We should get back exactly one message, that's a
// ReplyShortChanIDsEnd with a matching chain hash, and a
// complete value of zero.
if len(msgs) != 1 {
t.Fatalf("wrong messages: expected %v, got %v",
1, len(msgs))
}
msg, ok := msgs[0].(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+
"instead got %T", msg)
}
if msg.ChainHash != *chaincfg.SimNetParams.GenesisHash {
t.Fatalf("wrong chain hash: expected %v, got %v",
msg.ChainHash, chaincfg.SimNetParams.GenesisHash)
}
if msg.Complete != 0 {
t.Fatalf("complete set incorrectly")
}
}
}
// TestGossipSyncerReplyShortChanIDs tests that in the case of a known chain
// hash for a QueryShortChanIDs, we'll return the set of matching
// announcements, as well as an ending ReplyShortChanIDsEnd message.
func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
queryChanIDs := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
}
queryReply := []lnwire.Message{
&lnwire.ChannelAnnouncement{
ShortChannelID: lnwire.NewShortChanIDFromInt(20),
},
&lnwire.ChannelUpdate{
ShortChannelID: lnwire.NewShortChanIDFromInt(20),
Timestamp: unixStamp(999999),
},
&lnwire.NodeAnnouncement{Timestamp: unixStamp(25001)},
}
// We'll then craft a reply to the upcoming query for all the matching
// channel announcements for a particular set of short channel ID's.
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")
case chanIDs := <-chanSeries.annReq:
// The set of chan ID's should match exactly.
if !reflect.DeepEqual(chanIDs, queryChanIDs) {
t.Fatalf("wrong chan IDs: expected %v, got %v",
queryChanIDs, chanIDs)
}
// If they do, then we'll send back a response with
// some canned messages.
chanSeries.annResp <- queryReply
}
}()
// With our set up above complete, we'll now attempt to obtain a reply
// from the channel syncer for our target chan ID query.
err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{
ShortChanIDs: queryChanIDs,
})
if err != nil {
t.Fatalf("unable to query for chan IDs: %v", err)
}
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
// We should get back exactly 4 messages. The first 3 are the same
// messages we sent above, and the query end message.
case msgs := <-msgChan:
if len(msgs) != 4 {
t.Fatalf("wrong messages: expected %v, got %v",
4, len(msgs))
}
if !reflect.DeepEqual(queryReply, msgs[:3]) {
t.Fatalf("wrong set of messages: expected %v, got %v",
spew.Sdump(queryReply), spew.Sdump(msgs[:3]))
}
finalMsg, ok := msgs[3].(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+
"instead got %T", msgs[3])
}
if finalMsg.Complete != 1 {
t.Fatalf("complete wasn't set")
}
}
}
// TestGossipSyncerReplyChanRangeQueryUnknownEncodingType tests that if we
// receive a QueryChannelRange message with an unknown encoding type, then we
// return an error.
func TestGossipSyncerReplyChanRangeQueryUnknownEncodingType(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
_, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// If we modify the syncer to expect an encoding type that is currently
// unknown, then it should fail to process the message and return an
// error.
syncer.cfg.encodingType = 99
err := syncer.replyChanRangeQuery(&lnwire.QueryChannelRange{})
if err == nil {
t.Fatalf("expected message fail")
}
}
// TestGossipSyncerReplyChanRangeQuery tests that if we receive a
// QueryChannelRange message, then we'll properly send back a chunked reply to
// the remote peer.
func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
t.Parallel()
// First, we'll modify the main map to provide e a smaller chunk size
// so we can easily test all the edge cases.
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = 2
// We'll now create our test gossip syncer that will shortly respond to
// our canned query.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// Next, we'll craft a query to ask for all the new chan ID's after
// block 100.
query := &lnwire.QueryChannelRange{
FirstBlockHeight: 100,
NumBlocks: 50,
}
// We'll then launch a goroutine to reply to the query with a set of 5
// responses. This will ensure we get two full chunks, and one partial
// chunk.
resp := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
lnwire.NewShortChanIDFromInt(4),
lnwire.NewShortChanIDFromInt(5),
}
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")
case filterReq := <-chanSeries.filterRangeReqs:
// We should be querying for block 100 to 150.
if filterReq.startHeight != 100 && filterReq.endHeight != 150 {
t.Fatalf("wrong height range: %v", spew.Sdump(filterReq))
}
// If the proper request was sent, then we'll respond
// with our set of short channel ID's.
chanSeries.filterRangeResp <- resp
}
}()
// With our goroutine active, we'll now issue the query.
if err := syncer.replyChanRangeQuery(query); err != nil {
t.Fatalf("unable to issue query: %v", err)
}
// At this point, we'll now wait for the syncer to send the chunked
// reply. We should get three sets of messages as two of them should be
// full, while the other is the final fragment.
const numExpectedChunks = 3
respMsgs := make([]lnwire.ShortChannelID, 0, 5)
for i := 0; i < 3; i++ {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
case msg := <-msgChan:
resp := msg[0]
rangeResp, ok := resp.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("expected ReplyChannelRange instead got %T", msg)
}
// If this is not the last chunk, then Complete should
// be set to zero. Otherwise, it should be one.
switch {
case i < 2 && rangeResp.Complete != 0:
t.Fatalf("non-final chunk should have "+
"Complete=0: %v", spew.Sdump(rangeResp))
case i == 2 && rangeResp.Complete != 1:
t.Fatalf("final chunk should have "+
"Complete=1: %v", spew.Sdump(rangeResp))
}
respMsgs = append(respMsgs, rangeResp.ShortChanIDs...)
}
}
// We should get back exactly 5 short chan ID's, and they should match
// exactly the ID's we sent as a reply.
if len(respMsgs) != len(resp) {
t.Fatalf("expected %v chan ID's, instead got %v",
len(resp), spew.Sdump(respMsgs))
}
if !reflect.DeepEqual(resp, respMsgs) {
t.Fatalf("mismatched response: expected %v, got %v",
spew.Sdump(resp), spew.Sdump(respMsgs))
}
}
// TestGossipSyncerReplyChanRangeQueryNoNewChans tests that if we issue a reply
// for a channel range query, and we don't have any new channels, then we send
// back a single response that signals completion.
func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
t.Parallel()
// We'll now create our test gossip syncer that will shortly respond to
// our canned query.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// Next, we'll craft a query to ask for all the new chan ID's after
// block 100.
query := &lnwire.QueryChannelRange{
FirstBlockHeight: 100,
NumBlocks: 50,
}
// We'll then launch a goroutine to reply to the query no new channels.
resp := []lnwire.ShortChannelID{}
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")
case filterReq := <-chanSeries.filterRangeReqs:
// We should be querying for block 100 to 150.
if filterReq.startHeight != 100 && filterReq.endHeight != 150 {
t.Fatalf("wrong height range: %v",
spew.Sdump(filterReq))
}
// If the proper request was sent, then we'll respond
// with our blank set of short chan ID's.
chanSeries.filterRangeResp <- resp
}
}()
// With our goroutine active, we'll now issue the query.
if err := syncer.replyChanRangeQuery(query); err != nil {
t.Fatalf("unable to issue query: %v", err)
}
// We should get back exactly one message, and the message should
// indicate that this is the final in the series.
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
case msg := <-msgChan:
resp := msg[0]
rangeResp, ok := resp.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("expected ReplyChannelRange instead got %T", msg)
}
if len(rangeResp.ShortChanIDs) != 0 {
t.Fatalf("expected no chan ID's, instead "+
"got: %v", spew.Sdump(rangeResp.ShortChanIDs))
}
if rangeResp.Complete != 1 {
t.Fatalf("complete wasn't set")
}
}
}
// TestGossipSyncerGenChanRangeQuery tests that given the current best known
// channel ID, we properly generate an correct initial channel range response.
func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
const startingHeight = 200
_, syncer, _ := newTestSyncer(
lnwire.ShortChannelID{
BlockHeight: startingHeight,
},
)
// If we now ask the syncer to generate an initial range query, it
// should return a start height that's back chanRangeQueryBuffer
// blocks.
rangeQuery, err := syncer.genChanRangeQuery()
if err != nil {
t.Fatalf("unable to resp: %v", err)
}
firstHeight := uint32(startingHeight - chanRangeQueryBuffer)
if rangeQuery.FirstBlockHeight != firstHeight {
t.Fatalf("incorrect chan range query: expected %v, %v",
rangeQuery.FirstBlockHeight,
startingHeight-chanRangeQueryBuffer)
}
if rangeQuery.NumBlocks != math.MaxUint32-firstHeight {
t.Fatalf("wrong num blocks: expected %v, got %v",
rangeQuery.NumBlocks, math.MaxUint32-firstHeight)
}
}
// TestGossipSyncerProcessChanRangeReply tests that we'll properly buffer
// replied channel replies until we have the complete version. If no new
// channels were discovered, then we should go directly to the chanSsSynced
// state. Otherwise, we should go to the queryNewChannels states.
func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
_, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
startingState := syncer.state
replies := []*lnwire.ReplyChannelRange{
{
ShortChanIDs: []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(10),
},
},
{
ShortChanIDs: []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(11),
},
},
{
Complete: 1,
ShortChanIDs: []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(12),
},
},
}
// We'll begin by sending the syncer a set of non-complete channel
// range replies.
if err := syncer.processChanRangeReply(replies[0]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
if err := syncer.processChanRangeReply(replies[1]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
// At this point, we should still be in our starting state as the query
// hasn't finished.
if syncer.state != startingState {
t.Fatalf("state should not have transitioned")
}
expectedReq := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(10),
lnwire.NewShortChanIDFromInt(11),
lnwire.NewShortChanIDFromInt(12),
}
// As we're about to send the final response, we'll launch a goroutine
// to respond back with a filtered set of chan ID's.
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")
case req := <-chanSeries.filterReq:
// We should get a request for the entire range of short
// chan ID's.
if !reflect.DeepEqual(expectedReq, req) {
fmt.Printf("wrong request: expected %v, got %v\n",
expectedReq, req)
t.Fatalf("wrong request: expected %v, got %v",
expectedReq, req)
}
// We'll send back only the last two to simulate filtering.
chanSeries.filterResp <- expectedReq[1:]
}
}()
// If we send the final message, then we should transition to
// queryNewChannels as we've sent a non-empty set of new channels.
if err := syncer.processChanRangeReply(replies[2]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
if syncer.state != queryNewChannels {
t.Fatalf("wrong state: expected %v instead got %v",
queryNewChannels, syncer.state)
}
if !reflect.DeepEqual(syncer.newChansToQuery, expectedReq[1:]) {
t.Fatalf("wrong set of chans to query: expected %v, got %v",
syncer.newChansToQuery, expectedReq[1:])
}
// We'll repeat our final reply again, but this time we won't send any
// new channels. As a result, we should transition over to the
// chansSynced state.
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")
case req := <-chanSeries.filterReq:
// We should get a request for the entire range of short
// chan ID's.
if !reflect.DeepEqual(expectedReq[2], req[0]) {
t.Fatalf("wrong request: expected %v, got %v",
expectedReq[2], req[0])
}
// We'll send back only the last two to simulate filtering.
chanSeries.filterResp <- []lnwire.ShortChannelID{}
}
}()
if err := syncer.processChanRangeReply(replies[2]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
if syncer.state != chansSynced {
t.Fatalf("wrong state: expected %v instead got %v",
chansSynced, syncer.state)
}
}
// TestGossipSyncerSynchronizeChanIDsUnknownEncodingType tests that if we
// attempt to query for a set of new channels using an unknown encoding type,
// then we'll get an error.
func TestGossipSyncerSynchronizeChanIDsUnknownEncodingType(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
_, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// If we modify the syncer to expect an encoding type that is currently
// unknown, then it should fail to process the message and return an
// error.
syncer.cfg.encodingType = 101
_, err := syncer.synchronizeChanIDs()
if err == nil {
t.Fatalf("expected message fail")
}
}
// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
// the short chan ID's which were unknown to us. We'll ensure that we request
// chunk by chunk, and after the last chunk, we return true indicating that we
// can transition to the synced stage.
func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
t.Parallel()
// First, we'll create a gossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10),
)
// Next, we'll construct a set of chan ID's that we should query for,
// and set them as newChansToQuery within the state machine.
newChanIDs := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
lnwire.NewShortChanIDFromInt(4),
lnwire.NewShortChanIDFromInt(5),
}
syncer.newChansToQuery = newChanIDs
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
// queries: two full chunks, and one lingering chunk.
chunkSize := int32(2)
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
for i := int32(0); i < chunkSize*2; i += 2 {
// With our set up complete, we'll request a sync of chan ID's.
done, err := syncer.synchronizeChanIDs()
if err != nil {
t.Fatalf("unable to sync chan IDs: %v", err)
}
// At this point, we shouldn't yet be done as only 2 items
// should have been queried for.
if done {
t.Fatalf("syncer shown as done, but shouldn't be!")
}
// We should've received a new message from the syncer.
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
case msg := <-msgChan:
queryMsg, ok := msg[0].(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("expected QueryShortChanIDs instead "+
"got %T", msg)
}
// The query message should have queried for the next
// two chan ID's in the queue, and nothing more.
if !reflect.DeepEqual(queryMsg.ShortChanIDs, newChanIDs[i:i+chunkSize]) {
t.Fatalf("wrong query: expected %v, got %v",
spew.Sdump(newChanIDs[i:i+chunkSize]),
queryMsg.ShortChanIDs)
}
}
// With the proper message sent out, the internal state of the
// syncer should reflect that it still has more channels to
// query for.
if !reflect.DeepEqual(syncer.newChansToQuery, newChanIDs[i+chunkSize:]) {
t.Fatalf("incorrect chans to query for: expected %v, got %v",
spew.Sdump(newChanIDs[i+chunkSize:]),
syncer.newChansToQuery)
}
}
// At this point, only one more channel should be lingering for the
// syncer to query for.
if !reflect.DeepEqual(newChanIDs[chunkSize*2:], syncer.newChansToQuery) {
t.Fatalf("wrong chans to query: expected %v, got %v",
newChanIDs[chunkSize*2:], syncer.newChansToQuery)
}
// If we issue another query, the syncer should send out the final
// lingering chunk, but shouldn't yet report that it's finished.
done, err := syncer.synchronizeChanIDs()
if err != nil {
t.Fatalf("unable to sync chan IDs: %v", err)
}
if done {
t.Fatalf("syncer shouldn't be finished yet!")
}
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
case msg := <-msgChan:
queryMsg, ok := msg[0].(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("expected QueryShortChanIDs instead "+
"got %T", msg)
}
// The query issued should simply be the last item.
if !reflect.DeepEqual(queryMsg.ShortChanIDs, newChanIDs[chunkSize*2:]) {
t.Fatalf("wrong query: expected %v, got %v",
spew.Sdump(newChanIDs[chunkSize*2:]),
queryMsg.ShortChanIDs)
}
// There also should be no more channels to query.
if len(syncer.newChansToQuery) != 0 {
t.Fatalf("should be no more chans to query for, "+
"instead have %v",
spew.Sdump(syncer.newChansToQuery))
}
}
}
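
// To summarize the chunking exercised above: with a chunk size of 2 and five
// pending IDs, the queue is consumed as [1 2], [3 4], and finally the
// lingering [5]. A sketch of the slicing we assume drives this behavior
// (illustrative only, not a verbatim excerpt from syncer.go):
//
//	if int32(len(g.newChansToQuery)) < chunkSize {
//		queryChunk = g.newChansToQuery // final, partial chunk
//		g.newChansToQuery = nil
//	} else {
//		queryChunk = g.newChansToQuery[:chunkSize]
//		g.newChansToQuery = g.newChansToQuery[chunkSize:]
//	}
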
// TestGossipSyncerRoutineSync tests all state transitions of the main syncer
// goroutine. This ensures that when we encounter a peer that knows of a
// distinct set of channels, we'll properly synchronize our channel state
// with them.
func TestGossipSyncerRoutineSync(t *testing.T) {
t.Parallel()
// First, we'll create two gossipSyncer instances with stubbed-out
// sendToPeer implementations that let us intercept their outgoing messages.
startHeight := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight,
)
syncer2.Start()
defer syncer2.Stop()
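// For orientation, the full exchange driven below is: QueryChannelRange in
// both directions, chunked ReplyChannelRange messages back, chunked
// QueryShortChanIDs for the unknown channels, a ReplyShortChanIDsEnd closing
// each chunk, and finally a GossipTimestampRange from each side once both
// are fully synced.
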
// Although both nodes are at the same height, they'll have a
// completely disjoint set of 3 chan ID's that they know of.
syncer1Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
}
syncer2Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(4),
lnwire.NewShortChanIDFromInt(5),
lnwire.NewShortChanIDFromInt(6),
}
// Before we start the test, we'll set our chunk size to 2 in order to
// make testing the chunked requests and replies easier.
chunkSize := int32(2)
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
// We'll kick off the test by passing over the QueryChannelRange
// messages from one node to the other.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
// At this point, we'll need to send responses to both nodes from their
// respective channel series. Both nodes will simply request the entire
// set of channels from the other.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterRangeReqs:
// We'll send all the channels that it should know of.
chanSeries1.filterRangeResp <- syncer1Chans
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterRangeReqs:
// We'll send back all the channels that it should know of.
chanSeries2.filterRangeResp <- syncer2Chans
}
// At this point, we'll forward the ReplyChannelRange messages to both
// parties. Two replies are expected since the chunk size is 2, and we
// need to query for 3 channels.
for i := 0; i < 2; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
}
for i := 0; i < 2; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
// We'll now respond to each party's filter request with the other
// party's full channel set, indicating that all of the received
// channels are new to them.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterReq:
chanSeries1.filterResp <- syncer2Chans
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterReq:
chanSeries2.filterResp <- syncer1Chans
}
// At this point, both parties should start to send out initial
// requests to query the chan IDs of the remote party. As the chunk
// size is 2 and each side has 3 new channels, they'll need 2 rounds
// in order to fully reconcile the state.
for i := 0; i < 2; i++ {
// Both parties should now have sent out the initial requests
// to query the chan IDs of the other party.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a QueryShortChanIDs message.
_, ok := msg.(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryShortChanIDs for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryShortChanIDs message.
_, ok := msg.(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryShortChanIDs for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
// We'll then respond to both parties with an empty set of
// announcements, as their contents don't affect this test.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.annReq:
chanSeries1.annResp <- []lnwire.Message{}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.annReq:
chanSeries2.annResp <- []lnwire.Message{}
}
// Both sides should then receive a ReplyShortChanIDsEnd as the
// current chunk has been replied to.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyShortChanIDsEnd message.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a ReplyShortChanIDsEnd message.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("wrong message: expected "+
"ReplyShortChanIDsEnd for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
// At this stage both parties should now be sending over their initial
// GossipTimestampRange messages as they should both be fully synced.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a GossipTimestampRange message.
_, ok := msg.(*lnwire.GossipTimestampRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a GossipTimestampRange message.
_, ok := msg.(*lnwire.GossipTimestampRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
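
// The select/forward dance above repeats for every message type. A
// hypothetical helper along these lines (not part of this commit; the name
// and signature are assumptions based on how msgChan and gossipMsgs are used
// above, and the per-message type assertions are elided) could compress the
// pattern:
func forwardMsgs(t *testing.T, src chan []lnwire.Message,
dst chan lnwire.Message) {

select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msgs from source")
case msgs := <-src:
// Relay each message to the destination syncer, failing the
// test if it isn't read in time.
for _, msg := range msgs {
select {
case <-time.After(time.Second * 2):
t.Fatalf("peer didn't read msg")
case dst <- msg:
}
}
}
}
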
// TestGossipSyncerAlreadySynced tests that if we attempt to synchronize two
// syncers that have the exact same state, then they'll skip straight to the
// final state and not perform any channel queries.
func TestGossipSyncerAlreadySynced(t *testing.T) {
t.Parallel()
// First, we'll create two gossipSyncer instances with stubbed-out
// sendToPeer implementations that let us intercept their outgoing messages.
startHeight := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight,
)
syncer2.Start()
defer syncer2.Stop()
// Before we start the test, we'll set our chunk size to 2 in order to
// make testing the chunked requests and replies easier.
chunkSize := int32(2)
encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize
// The channel state of both syncers will be identical. They should
// recognize this, and skip the sync phase below.
syncer1Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
}
syncer2Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
}
// We'll now kick off the test by allowing both sides to send their
// QueryChannelRange messages to each other.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
// We'll now feed each syncer the channel set it should send over in
// its range reply: the set of channels it already knows about.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterRangeReqs:
// We'll send all the channels that it should know of.
chanSeries1.filterRangeResp <- syncer1Chans
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterRangeReqs:
// We'll send back all the channels that it should know of.
chanSeries2.filterRangeResp <- syncer2Chans
}
// Next, we'll thread through the replies of both parties. As the chunk
// size is 2 and they both know of 3 channels, each side will reply
// with two chunks.
for i := 0; i < 2; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
}
for i := 0; i < 2; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
// Now that both sides have the full responses, we'll send over the
// channels that they need to filter out. As both sides have the exact
// same set of channels, they should skip to the final state.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterReq:
chanSeries1.filterResp <- []lnwire.ShortChannelID{}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterReq:
chanSeries2.filterResp <- []lnwire.ShortChannelID{}
}
// As both parties are already synced, the next message they send to
// each other should be the GossipTimestampRange message.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a GossipTimestampRange message.
_, ok := msg.(*lnwire.GossipTimestampRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a GossipTimestampRange message.
_, ok := msg.(*lnwire.GossipTimestampRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
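
// For readers skimming these tests: the chanSeries harness returned by
// newTestSyncer (defined earlier in this file) follows a plain
// request/response channel pattern. A sketch of the shape relied upon above
// (field element types are inferred from usage in these tests; the request
// channel payloads are elided and the struct name is illustrative):
//
//	type mockChanSeries struct {
//		filterReq       chan []lnwire.ShortChannelID
//		filterResp      chan []lnwire.ShortChannelID
//		filterRangeReqs chan struct{} // payload elided
//		filterRangeResp chan []lnwire.ShortChannelID
//		annReq          chan struct{} // payload elided
//		annResp         chan []lnwire.Message
//	}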