diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 90b84590..ce51643f 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1009,6 +1009,20 @@ func (d *AuthenticatedGossiper) networkHandler() { // If we have new things to announce then broadcast // them to all our immediately connected peers. for _, msgChunk := range announcementBatch { + // We'll first attempt to filter out this new + // message for all peers that have active + // gossip syncers active. + d.syncerMtx.RLock() + for _, syncer := range d.peerSyncers { + syncer.FilterGossipMsgs(msgChunk) + } + d.syncerMtx.RUnlock() + + // With the syncers taken care of, we'll merge + // the sender map with the set of syncers, so + // we don't send out duplicate messages. + msgChunk.mergeSyncerMap(syncerPeers) + err := d.cfg.Broadcast( msgChunk.senders, msgChunk.msg, ) diff --git a/discovery/syncer.go b/discovery/syncer.go new file mode 100644 index 00000000..488e271f --- /dev/null +++ b/discovery/syncer.go @@ -0,0 +1,906 @@ +package discovery + +import ( + "fmt" + "math" + "sync" + "time" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/chaincfg/chainhash" +) + +// syncerState is an enum that represents the current state of the +// gossipSyncer. As the syncer is a state machine, we'll gate our actions +// based off of the current state and the next incoming message. +type syncerState uint8 + +const ( + // syncingChans is the default state of the gossipSyncer. We start in + // this state when a new peer first connects and we don't yet know if + // we're fully synchronized. + syncingChans syncerState = iota + + // waitingQueryRangeReply is the second main phase of the gossipSyncer. + // We enter this state after we send out our first QueryChannelRange + // reply. We'll stay in this state until the remote party sends us a + // ReplyShortChanIDsEnd message that indicates they've responded to our + // query entirely. After this state, we'll transition to + // waitingQueryChanReply after we send out requests for all the new + // chan ID's to us. + waitingQueryRangeReply + + // queryNewChannels is the third main phase of the gossipSyncer. In + // this phase we'll send out all of our QueryShortChanIDs messages in + // response to the new channels that we don't yet know about. + queryNewChannels + + // waitingQueryChanReply is the fourth main phase of the gossipSyncer. + // We enter this phase once we've sent off a query chink to the remote + // peer. We'll stay in this phase until we receive a + // ReplyShortChanIDsEnd message which indicates that the remote party + // has responded to all of our requests. + waitingQueryChanReply + + // chansSynced is the terminal stage of the gossipSyncer. Once we enter + // this phase, we'll send out our update horizon, which filters out the + // set of channel updates that we're interested in. In this state, + // we'll be able to accept any outgoing messages from the + // AuthenticatedGossiper, and decide if we should forward them to our + // target peer based on its update horizon. + chansSynced +) + +// String returns a human readable string describing the target syncerState. +func (s syncerState) String() string { + switch s { + case syncingChans: + return "syncingChans" + + case waitingQueryRangeReply: + return "waitingQueryRangeReply" + + case queryNewChannels: + return "queryNewChannels" + + case waitingQueryChanReply: + return "waitingQueryChanReply" + + case chansSynced: + return "chansSynced" + + default: + return "UNKNOWN STATE" + } +} + +var ( + // encodingTypeToChunkSize maps an encoding type, to the max number of + // short chan ID's using the encoding type that we can fit into a + // single message safely. + encodingTypeToChunkSize = map[lnwire.ShortChanIDEncoding]int32{ + lnwire.EncodingSortedPlain: 8000, + } +) + +const ( + // chanRangeQueryBuffer is the number of blocks back that we'll go when + // asking the remote peer for their any channels they know of beyond + // our highest known channel ID. + chanRangeQueryBuffer = 144 +) + +// ChannelGraphTimeSeries is an interface that provides time and block based +// querying into our view of the channel graph. New channels will have +// monotonically increasing block heights, and new channel updates will have +// increasing timestamps. Once we connect to a peer, we'll use the methods in +// this interface to determine if we're already in sync, or need to request +// some new information from them. +type ChannelGraphTimeSeries interface { + // HighestChanID should return the channel ID of the channel we know of + // that's furthest in the target chain. This channel will have a block + // height that's close to the current tip of the main chain as we + // know it. We'll use this to start our QueryChannelRange dance with + // the remote node. + HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) + + // UpdatesInHorizon returns all known channel and node updates with an + // update timestamp between the start time and end time. We'll use this + // to catch up a remote node to the set of channel updates that they + // may have missed out on within the target chain. + UpdatesInHorizon(chain chainhash.Hash, + startTime time.Time, endTime time.Time) ([]lnwire.Message, error) + + // FilterKnownChanIDs takes a target chain, and a set of channel ID's, + // and returns a filtered set of chan ID's. This filtered set of chan + // ID's represents the ID's that we don't know of which were in the + // passed superSet. + FilterKnownChanIDs(chain chainhash.Hash, + superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error) + + // FilterChannelRange returns the set of channels that we created + // between the start height and the end height. We'll use this to to a + // remote peer's QueryChannelRange message. + FilterChannelRange(chain chainhash.Hash, + startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error) + + // FetchChanAnns returns a full set of channel announcements as well as + // their updates that match the set of specified short channel ID's. + // We'll use this to reply to a QueryShortChanIDs message sent by a + // remote peer. The response will contain a unique set of + // ChannelAnnouncements, the latest ChannelUpdate for each of the + // announcements, and a unique set of NodeAnnouncements. + FetchChanAnns(chain chainhash.Hash, + shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error) + + // FetchChanUpdates returns the latest channel update messages for the + // specified short channel ID. If no channel updates are known for the + // channel, then an empty slice will be returned. + FetchChanUpdates(chain chainhash.Hash, + shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error) +} + +// gossipSyncerCfg is a struct that packages all the information a gossipSyncer +// needs to carry out its duties. +type gossipSyncerCfg struct { + // chainHash is the chain that this syncer is responsible for. + chainHash chainhash.Hash + + // syncChanUpdates is a bool that indicates if we should request a + // continual channel update stream or not. + syncChanUpdates bool + + // channelSeries is the primary interface that we'll use to generate + // our queries and respond to the queries of the remote peer. + channelSeries ChannelGraphTimeSeries + + // encodingType is the current encoding type we're aware of. Requests + // with different encoding types will be rejected. + encodingType lnwire.ShortChanIDEncoding + + // sendToPeer is a function closure that should send the set of + // targeted messages to the peer we've been assigned to sync the graph + // state from. + sendToPeer func(...lnwire.Message) error +} + +// gossipSyncer is a struct that handles synchronizing the channel graph state +// with a remote peer. The gossipSyncer implements a state machine that will +// progressively ensure we're synchronized with the channel state of the remote +// node. Once both nodes have been synchronized, we'll use an update filter to +// filter out which messages should be sent to a remote peer based on their +// update horizon. If the update horizon isn't specified, then we won't send +// them any channel updates at all. +// +// TODO(roasbeef): modify to only sync from one peer at a time? +type gossipSyncer struct { + // remoteUpdateHorizon is the update horizon of the remote peer. We'll + // use this to properly filter out any messages. + remoteUpdateHorizon *lnwire.GossipTimestampRange + + // localUpdateHorizon is our local update horizon, we'll use this to + // determine if we've already sent out our update. + localUpdateHorizon *lnwire.GossipTimestampRange + + // state is the current state of the gossipSyncer. + state syncerState + + // gossipMsgs is a channel that all messages from the target peer will + // be sent over. + gossipMsgs chan lnwire.Message + + // bufferedChanRangeReplies is used in the waitingQueryChanReply to + // buffer all the chunked response to our query. + bufferedChanRangeReplies []lnwire.ShortChannelID + + // newChansToQuery is used to pass the set of channels we should query + // for from the waitingQueryChanReply state to the queryNewChannels + // state. + newChansToQuery []lnwire.ShortChannelID + + // peerPub is the public key of the peer we're syncing with, serialized + // in compressed format. + peerPub [33]byte + + cfg gossipSyncerCfg + + sync.Mutex + + quit chan struct{} + wg sync.WaitGroup +} + +// newGossiperSyncer returns a new instance of the gossipSyncer populated using +// the passed config. +func newGossiperSyncer(cfg gossipSyncerCfg) *gossipSyncer { + return &gossipSyncer{ + cfg: cfg, + gossipMsgs: make(chan lnwire.Message, 100), + quit: make(chan struct{}), + } +} + +// Start starts the gossipSyncer and any goroutines that it needs to carry out +// its duties. +func (g *gossipSyncer) Start() error { + log.Debugf("Starting gossipSyncer(%x)", g.peerPub[:]) + + g.wg.Add(1) + go g.channelGraphSyncer() + + return nil +} + +// Stop signals the gossipSyncer for a graceful exit, then waits until it has +// exited. +func (g *gossipSyncer) Stop() error { + close(g.quit) + + g.wg.Wait() + + return nil +} + +// channelGraphSyncer is the main goroutine responsible for ensuring that we +// properly channel graph state with the remote peer, and also that we only +// send them messages which actually pass their defined update horizon. +func (g *gossipSyncer) channelGraphSyncer() { + defer g.wg.Done() + + // TODO(roasbeef): also add ability to force transition back to syncing + // chans + // * needed if we want to sync chan state very few blocks? + + for { + log.Debugf("gossipSyncer(%x): state=%v", g.peerPub[:], g.state) + + switch g.state { + // When we're in this state, we're trying to synchronize our + // view of the network with the remote peer. We'll kick off + // this sync by asking them for the set of channels they + // understand, as we'll as responding to any other queries by + // them. + case syncingChans: + // If we're in this state, then we'll send the remote + // peer our opening QueryChannelRange message. + queryRangeMsg, err := g.genChanRangeQuery() + if err != nil { + log.Errorf("unable to gen chan range "+ + "query: %v", err) + return + } + + err = g.cfg.sendToPeer(queryRangeMsg) + if err != nil { + log.Errorf("unable to send chan range "+ + "query: %v", err) + return + } + + // With the message sent successfully, we'll transition + // into the next state where we wait for their reply. + g.state = waitingQueryRangeReply + + // In this state, we've sent out our initial channel range + // query and are waiting for the final response from the remote + // peer before we perform a diff to see with channels they know + // of that we don't. + case waitingQueryRangeReply: + // We'll wait to either process a new message from the + // remote party, or exit due to the gossiper exiting, + // or us being signalled to do so. + select { + case msg := <-g.gossipMsgs: + // The remote peer is sending a response to our + // initial query, we'll collate this response, + // and see if it's the final one in the series. + // If so, we can then transition to querying + // for the new channels. + queryReply, ok := msg.(*lnwire.ReplyChannelRange) + if ok { + err := g.processChanRangeReply(queryReply) + if err != nil { + log.Errorf("unable to "+ + "process chan range "+ + "query: %v", err) + return + } + + continue + } + + // Otherwise, it's the remote peer performing a + // query, which we'll attempt to reply to. + err := g.replyPeerQueries(msg) + if err != nil { + log.Errorf("unable to reply to peer "+ + "query: %v", err) + } + + case <-g.quit: + return + } + + // We'll enter this state once we've discovered which channels + // the remote party knows of that we don't yet know of + // ourselves. + case queryNewChannels: + // First, we'll attempt to continue our channel + // synchronization by continuing to send off another + // query chunk. + done, err := g.synchronizeChanIDs() + if err != nil { + log.Errorf("unable to sync chan IDs: %v", err) + } + + // If this wasn't our last query, then we'll need to + // transition to our waiting state. + if !done { + g.state = waitingQueryChanReply + continue + } + + // If we're fully synchronized, then we can transition + // to our terminal state. + g.state = chansSynced + + // In this state, we've just sent off a new query for channels + // that we don't yet know of. We'll remain in this state until + // the remote party signals they've responded to our query in + // totality. + case waitingQueryChanReply: + // Once we've sent off our query, we'll wait for either + // an ending reply, or just another query from the + // remote peer. + select { + case msg := <-g.gossipMsgs: + // If this is the final reply to one of our + // queries, then we'll loop back into our query + // state to send of the remaining query chunks. + _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) + if ok { + g.state = queryNewChannels + continue + } + + // Otherwise, it's the remote peer performing a + // query, which we'll attempt to deploy to. + err := g.replyPeerQueries(msg) + if err != nil { + log.Errorf("unable to reply to peer "+ + "query: %v", err) + } + + case <-g.quit: + return + } + + // This is our final terminal state where we'll only reply to + // any further queries by the remote peer. + case chansSynced: + // If we haven't yet sent out our update horizon, and + // we want to receive real-time channel updates, we'll + // do so now. + if g.localUpdateHorizon == nil && g.cfg.syncChanUpdates { + // TODO(roasbeef): query DB for most recent + // update? + + // We'll give an hours room in our update + // horizon to ensure we don't miss any newer + // items. + updateHorizon := time.Now().Add(-time.Hour * 1) + log.Infof("gossipSyncer(%x): applying "+ + "gossipFilter(start=%v)", g.peerPub[:], + updateHorizon) + + g.localUpdateHorizon = &lnwire.GossipTimestampRange{ + ChainHash: g.cfg.chainHash, + FirstTimestamp: uint32(updateHorizon.Unix()), + TimestampRange: math.MaxUint32, + } + err := g.cfg.sendToPeer(g.localUpdateHorizon) + if err != nil { + log.Errorf("unable to send update "+ + "horizon: %v", err) + } + } + + // With our horizon set, we'll simply reply to any new + // message and exit if needed. + select { + case msg := <-g.gossipMsgs: + err := g.replyPeerQueries(msg) + if err != nil { + log.Errorf("unable to reply to peer "+ + "query: %v", err) + } + + case <-g.quit: + return + } + } + } +} + +// synchronizeChanIDs is called by the channelGraphSyncer when we need to query +// the remote peer for its known set of channel IDs within a particular block +// range. This method will be called continually until the entire range has +// been queried for with a response received. We'll chunk our requests as +// required to ensure they fit into a single message. We may re-renter this +// state in the case that chunking is required. +func (g *gossipSyncer) synchronizeChanIDs() (bool, error) { + // Ensure that we're able to handle queries using the specified chan + // ID. + chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType] + if !ok { + return false, fmt.Errorf("unknown encoding type: %v", + g.cfg.encodingType) + } + + // If we're in this state yet there are no more new channels to query + // for, then we'll transition to our final synced state and return true + // to signal that we're fully synchronized. + if len(g.newChansToQuery) == 0 { + log.Infof("gossipSyncer(%x): no more chans to query", + g.peerPub[:]) + return true, nil + } + + // Otherwise, we'll issue our next chunked query to receive replies + // for. + var queryChunk []lnwire.ShortChannelID + + // If the number of channels to query for is less than the chunk size, + // then we can issue a single query. + if int32(len(g.newChansToQuery)) < chunkSize { + queryChunk = g.newChansToQuery + g.newChansToQuery = nil + + } else { + // Otherwise, we'll need to only query for the next chunk. + // We'll slice into our query chunk, then slide down our main + // pointer down by the chunk size. + queryChunk = g.newChansToQuery[:chunkSize] + g.newChansToQuery = g.newChansToQuery[chunkSize:] + } + + log.Infof("gossipSyncer(%x): querying for %v new channels", + g.peerPub[:], len(queryChunk)) + + // With our chunk obtained, we'll send over our next query, then return + // false indicating that we're net yet fully synced. + err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{ + ChainHash: g.cfg.chainHash, + EncodingType: lnwire.EncodingSortedPlain, + ShortChanIDs: queryChunk, + }) + + return false, err +} + +// processChanRangeReply is called each time the gossipSyncer receives a new +// reply to the initial range query to discover new channels that it didn't +// previously know of. +func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error { + g.bufferedChanRangeReplies = append( + g.bufferedChanRangeReplies, msg.ShortChanIDs..., + ) + + log.Infof("gossipSyncer(%x): buffering chan range reply of size=%v", + g.peerPub[:], len(msg.ShortChanIDs)) + + // If this isn't the last response, then we can exit as we've already + // buffered the latest portion of the streaming reply. + if msg.Complete == 0 { + return nil + } + + log.Infof("gossipSyncer(%x): filtering through %v chans", g.peerPub[:], + len(g.bufferedChanRangeReplies)) + + // Otherwise, this is the final response, so we'll now check to see + // which channels they know of that we don't. + newChans, err := g.cfg.channelSeries.FilterKnownChanIDs( + g.cfg.chainHash, g.bufferedChanRangeReplies, + ) + if err != nil { + return fmt.Errorf("unable to filter chan ids: %v", err) + } + + // As we've received the entirety of the reply, we no longer need to + // hold on to the set of buffered replies, so we'll let that be garbage + // collected now. + g.bufferedChanRangeReplies = nil + + // If there aren't any channels that we don't know of, then we can + // switch straight to our terminal state. + if len(newChans) == 0 { + log.Infof("gossipSyncer(%x): remote peer has no new chans", + g.peerPub[:]) + + g.state = chansSynced + return nil + } + + // Otherwise, we'll set the set of channels that we need to query for + // the next state, and also transition our state. + g.newChansToQuery = newChans + g.state = queryNewChannels + + log.Infof("gossipSyncer(%x): starting query for %v new chans", + g.peerPub[:], len(newChans)) + + return nil +} + +// genChanRangeQuery generates the initial message we'll send to the remote +// party when we're kicking off the channel graph synchronization upon +// connection. +func (g *gossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) { + // First, we'll query our channel graph time series for its highest + // known channel ID. + newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash) + if err != nil { + return nil, err + } + + // Once we have the chan ID of the newest, we'll obtain the block + // height of the channel, then subtract our default horizon to ensure + // we don't miss any channels. By default, we go back 1 day from the + // newest channel. + var startHeight uint32 + switch { + case newestChan.BlockHeight <= chanRangeQueryBuffer: + fallthrough + case newestChan.BlockHeight == 0: + startHeight = 0 + + default: + startHeight = uint32(newestChan.BlockHeight - chanRangeQueryBuffer) + } + + log.Infof("gossipSyncer(%x): requesting new chans from height=%v "+ + "and %v blocks after", g.peerPub[:], startHeight, + math.MaxUint32-startHeight) + + // Finally, we'll craft the channel range query, using our starting + // height, then asking for all known channels to the foreseeable end of + // the main chain. + return &lnwire.QueryChannelRange{ + ChainHash: g.cfg.chainHash, + FirstBlockHeight: startHeight, + NumBlocks: math.MaxUint32 - startHeight, + }, nil +} + +// replyPeerQueries is called in response to any query by the remote peer. +// We'll examine our state and send back our best response. +func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error { + switch msg := msg.(type) { + + // In this state, we'll also handle any incoming channel range queries + // from the remote peer as they're trying to sync their state as well. + case *lnwire.QueryChannelRange: + return g.replyChanRangeQuery(msg) + + // If the remote peer skips straight to requesting new channels that + // they don't know of, then we'll ensure that we also handle this case. + case *lnwire.QueryShortChanIDs: + return g.replyShortChanIDs(msg) + + default: + return fmt.Errorf("unknown message: %T", msg) + } +} + +// replyChanRangeQuery will be dispatched in response to a channel range query +// by the remote node. We'll query the channel time series for channels that +// meet the channel range, then chunk our responses to the remote node. We also +// ensure that our final fragment carries the "complete" bit to indicate the +// end of our streaming response. +func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error { + // Using the current set encoding type, we'll determine what our chunk + // size should be. If we can't locate the chunk size, then we'll return + // an error as we can't proceed. + chunkSize, ok := encodingTypeToChunkSize[g.cfg.encodingType] + if !ok { + return fmt.Errorf("unknown encoding type: %v", g.cfg.encodingType) + } + + log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+ + "num_blocks=%v", g.peerPub[:], query.FirstBlockHeight, + query.NumBlocks) + + // Next, we'll consult the time series to obtain the set of known + // channel ID's that match their query. + startBlock := query.FirstBlockHeight + channelRange, err := g.cfg.channelSeries.FilterChannelRange( + query.ChainHash, startBlock, startBlock+query.NumBlocks, + ) + if err != nil { + return err + } + + // TODO(roasbeef): means can't send max uint above? + // * or make internal 64 + + numChannels := int32(len(channelRange)) + numChansSent := int32(0) + for { + // We'll send our this response in a streaming manner, + // chunk-by-chunk. We do this as there's a transport message + // size limit which we'll need to adhere to. + var channelChunk []lnwire.ShortChannelID + + // We know this is the final chunk, if the difference between + // the total number of channels, and the number of channels + // we've sent is less-than-or-equal to the chunk size. + isFinalChunk := (numChannels - numChansSent) <= chunkSize + + // If this is indeed the last chunk, then we'll send the + // remainder of the channels. + if isFinalChunk { + channelChunk = channelRange[numChansSent:] + + log.Infof("gossipSyncer(%x): sending final chan "+ + "range chunk, size=%v", g.peerPub[:], len(channelChunk)) + + } else { + // Otherwise, we'll only send off a fragment exactly + // sized to the proper chunk size. + channelChunk = channelRange[numChansSent : numChansSent+chunkSize] + + log.Infof("gossipSyncer(%x): sending range chunk of "+ + "size=%v", g.peerPub[:], len(channelChunk)) + } + + // With our chunk assembled, we'll now send to the remote peer + // the current chunk. + replyChunk := lnwire.ReplyChannelRange{ + QueryChannelRange: *query, + Complete: 0, + EncodingType: g.cfg.encodingType, + ShortChanIDs: channelChunk, + } + if isFinalChunk { + replyChunk.Complete = 1 + } + if err := g.cfg.sendToPeer(&replyChunk); err != nil { + return err + } + + // If this was the final chunk, then we'll exit now as our + // response is now complete. + if isFinalChunk { + return nil + } + + numChansSent += int32(len(channelChunk)) + } +} + +// replyShortChanIDs will be dispatched in response to a query by the remote +// node for information concerning a set of short channel ID's. Our response +// will be sent in a streaming chunked manner to ensure that we remain below +// the current transport level message size. +func (g *gossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error { + // Before responding, we'll check to ensure that the remote peer is + // querying for the same chain that we're on. If not, we'll send back a + // response with a complete value of zero to indicate we're on a + // different chain. + if g.cfg.chainHash != query.ChainHash { + log.Warnf("Remote peer requested QueryShortChanIDs for "+ + "chain=%v, we're on chain=%v", g.cfg.chainHash, + query.ChainHash) + + return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ + ChainHash: query.ChainHash, + Complete: 0, + }) + } + + log.Infof("gossipSyncer(%x): fetching chan anns for %v chans", + g.peerPub[:], len(query.ShortChanIDs)) + + // Now that we know we're on the same chain, we'll query the channel + // time series for the set of messages that we know of which satisfies + // the requirement of being a chan ann, chan update, or a node ann + // related to the set of queried channels. + replyMsgs, err := g.cfg.channelSeries.FetchChanAnns( + query.ChainHash, query.ShortChanIDs, + ) + if err != nil { + return err + } + + // If we didn't find any messages related to those channel ID's, then + // we'll send over a reply marking the end of our response, and exit + // early. + if len(replyMsgs) == 0 { + return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ + ChainHash: query.ChainHash, + Complete: 1, + }) + } + + // Otherwise, we'll send over our set of messages responding to the + // query, with the ending message appended to it. + replyMsgs = append(replyMsgs, &lnwire.ReplyShortChanIDsEnd{ + ChainHash: query.ChainHash, + Complete: 1, + }) + return g.cfg.sendToPeer(replyMsgs...) +} + +// ApplyGossipFilter applies a gossiper filter sent by the remote node to the +// state machine. Once applied, we'll ensure that we don't forward any messages +// to the peer that aren't within the time range of the filter. +func (g *gossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error { + g.Lock() + + g.remoteUpdateHorizon = filter + + startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0) + endTime := startTime.Add( + time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second, + ) + + g.Unlock() + + // Now that the remote peer has applied their filter, we'll query the + // database for all the messages that are beyond this filter. + newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon( + g.cfg.chainHash, startTime, endTime, + ) + if err != nil { + return err + } + + log.Infof("gossipSyncer(%x): applying new update horizon: start=%v, "+ + "end=%v, backlog_size=%v", g.peerPub[:], startTime, endTime, + len(newUpdatestoSend)) + + // If we don't have any to send, then we can return early. + if len(newUpdatestoSend) == 0 { + return nil + } + + // We'll conclude by launching a goroutine to send out any updates. + g.wg.Add(1) + go func() { + defer g.wg.Done() + + if err := g.cfg.sendToPeer(newUpdatestoSend...); err != nil { + log.Errorf("unable to send messages for peer catch "+ + "up: %v", err) + } + }() + + return nil +} + +// FilterGossipMsgs takes a set of gossip messages, and only send it to a peer +// iff the message is within the bounds of their set gossip filter. If the peer +// doesn't have a gossip filter set, then no messages will be forwarded. +func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) { + // If the peer doesn't have an update horizon set, then we won't send + // it any new update messages. + if g.remoteUpdateHorizon == nil { + return + } + + // TODO(roasbeef): need to ensure that peer still online...send msg to + // gossiper on peer termination to signal peer disconnect? + + var err error + + // Before we filter out the messages, we'll construct an index over the + // set of channel announcements and channel updates. This will allow us + // to quickly check if we should forward a chan ann, based on the known + // channel updates for a channel. + chanUpdateIndex := make(map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate) + for _, msg := range msgs { + chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate) + if !ok { + continue + } + + chanUpdateIndex[chanUpdate.ShortChannelID] = append( + chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate, + ) + } + + // We'll construct a helper function that we'll us below to determine + // if a given messages passes the gossip msg filter. + g.Lock() + startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0) + endTime := startTime.Add( + time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second, + ) + g.Unlock() + + passesFilter := func(timeStamp uint32) bool { + t := time.Unix(int64(timeStamp), 0) + return t.After(startTime) && t.Before(endTime) + } + + msgsToSend := make([]lnwire.Message, 0, len(msgs)) + for _, msg := range msgs { + // If the target peer is the peer that sent us this message, + // then we'll exit early as we don't need to filter this + // message. + if _, ok := msg.senders[g.peerPub]; ok { + continue + } + + switch msg := msg.msg.(type) { + + // For each channel announcement message, we'll only send this + // message if the channel updates for the channel are between + // our time range. + case *lnwire.ChannelAnnouncement: + // First, we'll check if the channel updates are in + // this message batch. + chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID] + if !ok { + // If not, we'll attempt to query the database + // to see if we know of the updates. + chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates( + g.cfg.chainHash, msg.ShortChannelID, + ) + if err != nil { + log.Warnf("no channel updates found for "+ + "short_chan_id=%v", + msg.ShortChannelID) + msgsToSend = append(msgsToSend, msg) + continue + } + } + + for _, chanUpdate := range chanUpdates { + if passesFilter(chanUpdate.Timestamp) { + msgsToSend = append(msgsToSend, msg) + break + } + } + + if len(chanUpdates) == 0 { + msgsToSend = append(msgsToSend, msg) + } + + // For each channel update, we'll only send if it the timestamp + // is between our time range. + case *lnwire.ChannelUpdate: + if passesFilter(msg.Timestamp) { + msgsToSend = append(msgsToSend, msg) + } + + // Similarly, we only send node announcements if the update + // timestamp ifs between our set gossip filter time range. + case *lnwire.NodeAnnouncement: + if passesFilter(msg.Timestamp) { + msgsToSend = append(msgsToSend, msg) + } + } + } + + log.Tracef("gossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v", + g.peerPub[:], len(msgs), len(msgsToSend)) + + if len(msgsToSend) == 0 { + return + } + + g.cfg.sendToPeer(msgsToSend...) +} + +// ProcessQueryMsg is used by outside callers to pass new channel time series +// queries to the internal processing goroutine. +func (g *gossipSyncer) ProcessQueryMsg(msg lnwire.Message) { + select { + case g.gossipMsgs <- msg: + return + case <-g.quit: + return + } +} diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go new file mode 100644 index 00000000..9bd1c22b --- /dev/null +++ b/discovery/syncer_test.go @@ -0,0 +1,1577 @@ +package discovery + +import ( + "fmt" + "math" + "reflect" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" +) + +type horizonQuery struct { + chain chainhash.Hash + start time.Time + end time.Time +} +type filterRangeReq struct { + startHeight, endHeight uint32 +} + +type mockChannelGraphTimeSeries struct { + highestID lnwire.ShortChannelID + + horizonReq chan horizonQuery + horizonResp chan []lnwire.Message + + filterReq chan []lnwire.ShortChannelID + filterResp chan []lnwire.ShortChannelID + + filterRangeReqs chan filterRangeReq + filterRangeResp chan []lnwire.ShortChannelID + + annReq chan []lnwire.ShortChannelID + annResp chan []lnwire.Message + + updateReq chan lnwire.ShortChannelID + updateResp chan []*lnwire.ChannelUpdate +} + +func newMockChannelGraphTimeSeries(hID lnwire.ShortChannelID) *mockChannelGraphTimeSeries { + return &mockChannelGraphTimeSeries{ + highestID: hID, + + horizonReq: make(chan horizonQuery, 1), + horizonResp: make(chan []lnwire.Message, 1), + + filterReq: make(chan []lnwire.ShortChannelID, 1), + filterResp: make(chan []lnwire.ShortChannelID, 1), + + filterRangeReqs: make(chan filterRangeReq, 1), + filterRangeResp: make(chan []lnwire.ShortChannelID, 1), + + annReq: make(chan []lnwire.ShortChannelID, 1), + annResp: make(chan []lnwire.Message, 1), + + updateReq: make(chan lnwire.ShortChannelID, 1), + updateResp: make(chan []*lnwire.ChannelUpdate, 1), + } +} + +func (m *mockChannelGraphTimeSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) { + return &m.highestID, nil +} +func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash, + startTime time.Time, endTime time.Time) ([]lnwire.Message, error) { + + m.horizonReq <- horizonQuery{ + chain, startTime, endTime, + } + + return <-m.horizonResp, nil +} +func (m *mockChannelGraphTimeSeries) FilterKnownChanIDs(chain chainhash.Hash, + superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error) { + + m.filterReq <- superSet + + return <-m.filterResp, nil +} +func (m *mockChannelGraphTimeSeries) FilterChannelRange(chain chainhash.Hash, + startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error) { + + m.filterRangeReqs <- filterRangeReq{startHeight, endHeight} + + return <-m.filterRangeResp, nil +} +func (m *mockChannelGraphTimeSeries) FetchChanAnns(chain chainhash.Hash, + shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error) { + + m.annReq <- shortChanIDs + + return <-m.annResp, nil +} +func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash, + shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error) { + + m.updateReq <- shortChanID + + return <-m.updateResp, nil +} + +var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil) + +func newTestSyncer(hID lnwire.ShortChannelID) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) { + + msgChan := make(chan []lnwire.Message, 20) + cfg := gossipSyncerCfg{ + syncChanUpdates: true, + channelSeries: newMockChannelGraphTimeSeries(hID), + encodingType: lnwire.EncodingSortedPlain, + sendToPeer: func(msgs ...lnwire.Message) error { + msgChan <- msgs + return nil + }, + } + syncer := newGossiperSyncer(cfg) + + return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries) +} + +// TestGossipSyncerFilterGossipMsgsNoHorizon tests that if the remote peer +// doesn't have a horizon set, then we won't send any incoming messages to it. +func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + msgChan, syncer, _ := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // With the syncer created, we'll create a set of messages to filter + // through the gossiper to the target peer. + msgs := []msgWithSenders{ + { + msg: &lnwire.NodeAnnouncement{Timestamp: uint32(time.Now().Unix())}, + }, + { + msg: &lnwire.NodeAnnouncement{Timestamp: uint32(time.Now().Unix())}, + }, + } + + // We'll then attempt to filter the set of messages through the target + // peer. + syncer.FilterGossipMsgs(msgs...) + + // As the remote peer doesn't yet have a gossip timestamp set, we + // shouldn't receive any outbound messages. + select { + case msg := <-msgChan: + t.Fatalf("received message but shouldn't have: %v", + spew.Sdump(msg)) + + case <-time.After(time.Millisecond * 10): + } +} + +func unixStamp(a int64) uint32 { + t := time.Unix(a, 0) + return uint32(t.Unix()) +} + +// TestGossipSyncerFilterGossipMsgsAll tests that we're able to properly filter +// out a set of incoming messages based on the set remote update horizon for a +// peer. We tests all messages type, and all time straddling. We'll also send a +// channel ann that already has a channel update on disk. +func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + msgChan, syncer, chanSeries := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // We'll create then apply a remote horizon for the target peer with a + // set of manually selected timestamps. + remoteHorizon := &lnwire.GossipTimestampRange{ + FirstTimestamp: unixStamp(25000), + TimestampRange: uint32(1000), + } + syncer.remoteUpdateHorizon = remoteHorizon + + // With the syncer created, we'll create a set of messages to filter + // through the gossiper to the target peer. Our message will consist of + // one node announcement above the horizon, one below. Additionally, + // we'll include a chan ann with an update below the horizon, one + // with an update timestmap above the horizon, and one without any + // channel updates at all. + msgs := []msgWithSenders{ + { + // Node ann above horizon. + msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(25001)}, + }, + { + // Node ann below horizon. + msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(5)}, + }, + { + // Node ann above horizon. + msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(999999)}, + }, + { + // Ann tuple below horizon. + msg: &lnwire.ChannelAnnouncement{ + ShortChannelID: lnwire.NewShortChanIDFromInt(10), + }, + }, + { + msg: &lnwire.ChannelUpdate{ + ShortChannelID: lnwire.NewShortChanIDFromInt(10), + Timestamp: unixStamp(5), + }, + }, + { + // Ann tuple above horizon. + msg: &lnwire.ChannelAnnouncement{ + ShortChannelID: lnwire.NewShortChanIDFromInt(15), + }, + }, + { + msg: &lnwire.ChannelUpdate{ + ShortChannelID: lnwire.NewShortChanIDFromInt(15), + Timestamp: unixStamp(25002), + }, + }, + { + // Ann tuple beyond horizon. + msg: &lnwire.ChannelAnnouncement{ + ShortChannelID: lnwire.NewShortChanIDFromInt(20), + }, + }, + { + msg: &lnwire.ChannelUpdate{ + ShortChannelID: lnwire.NewShortChanIDFromInt(20), + Timestamp: unixStamp(999999), + }, + }, + { + // Ann w/o an update at all, the update in the DB will + // be below the horizon. + msg: &lnwire.ChannelAnnouncement{ + ShortChannelID: lnwire.NewShortChanIDFromInt(25), + }, + }, + } + + // Before we send off the query, we'll ensure we send the missing + // channel update for that final ann. It will be below the horizon, so + // shouldn't be sent anyway. + go func() { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no query recvd") + + case query := <-chanSeries.updateReq: + + // It should be asking for the chan updates of short + // chan ID 25. + expectedID := lnwire.NewShortChanIDFromInt(25) + if expectedID != query { + t.Fatalf("wrong query id: expected %v, got %v", + expectedID, query) + } + + // If so, then we'll send back the missing update. + chanSeries.updateResp <- []*lnwire.ChannelUpdate{ + { + ShortChannelID: lnwire.NewShortChanIDFromInt(25), + Timestamp: unixStamp(5), + }, + } + } + }() + + // We'll then instruct the gossiper to filter this set of messages. + syncer.FilterGossipMsgs(msgs...) + + // Out of all the messages we sent in, we should only get 2 of them + // back. + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") + + case msgs := <-msgChan: + if len(msgs) != 3 { + t.Fatalf("expected 3 messages instead got %v "+ + "messages: %v", len(msgs), spew.Sdump(msgs)) + } + } +} + +// TestGossipSyncerApplyGossipFilter tests that once a gossip filter is applied +// for the remote peer, then we send the peer all known messages which are +// within their desired time horizon. +func TestGossipSyncerApplyGossipFilter(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + msgChan, syncer, chanSeries := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // We'll apply this gossip horizon for the remote peer. + remoteHorizon := &lnwire.GossipTimestampRange{ + FirstTimestamp: unixStamp(25000), + TimestampRange: uint32(1000), + } + + // Before we apply the horizon, we'll dispatch a response to the query + // that the syncer will issue. + go func() { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no query recvd") + + case query := <-chanSeries.horizonReq: + // The syncer should have translated the time range + // into the proper star time. + if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) { + t.Fatalf("wrong query stamp: expected %v, got %v", + remoteHorizon.FirstTimestamp, query.start) + } + + // For this first response, we'll send back an empty + // set of messages. As result, we shouldn't send any + // messages. + chanSeries.horizonResp <- []lnwire.Message{} + } + }() + + // We'll now attempt to apply the gossip filter for the remote peer. + err := syncer.ApplyGossipFilter(remoteHorizon) + if err != nil { + t.Fatalf("unable to apply filter: %v", err) + } + + // There should be no messages in the message queue as we didn't send + // the syncer and messages within the horizon. + select { + case msgs := <-msgChan: + t.Fatalf("expected no msgs, instead got %v", spew.Sdump(msgs)) + default: + } + + // If we repeat the process, but give the syncer a set of valid + // messages, then these should be sent to the remote peer. + go func() { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no query recvd") + + case query := <-chanSeries.horizonReq: + // The syncer should have translated the time range + // into the proper star time. + if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) { + t.Fatalf("wrong query stamp: expected %v, got %v", + remoteHorizon.FirstTimestamp, query.start) + } + + // For this first response, we'll send back a proper + // set of messages that should be echoed back. + chanSeries.horizonResp <- []lnwire.Message{ + &lnwire.ChannelUpdate{ + ShortChannelID: lnwire.NewShortChanIDFromInt(25), + Timestamp: unixStamp(5), + }, + } + } + }() + err = syncer.ApplyGossipFilter(remoteHorizon) + if err != nil { + t.Fatalf("unable to apply filter: %v", err) + } + + // We should get back the exact same message. + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") + + case msgs := <-msgChan: + if len(msgs) != 1 { + t.Fatalf("wrong messages: expected %v, got %v", + 1, len(msgs)) + } + } +} + +// TestGossipSyncerReplyShortChanIDsWrongChainHash tests that if we get a chan +// ID query for the wrong chain, then we send back only a short ID end with +// complete=0. +func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + msgChan, syncer, _ := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // We'll now ask the syncer to reply to a chan ID query, but for a + // chain that it isn't aware of. + err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{ + ChainHash: *chaincfg.SimNetParams.GenesisHash, + }) + if err != nil { + t.Fatalf("unable to process short chan ID's: %v", err) + } + + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") + case msgs := <-msgChan: + + // We should get back exactly one message, that's a + // ReplyShortChanIDsEnd with a matching chain hash, and a + // complete value of zero. + if len(msgs) != 1 { + t.Fatalf("wrong messages: expected %v, got %v", + 1, len(msgs)) + } + + msg, ok := msgs[0].(*lnwire.ReplyShortChanIDsEnd) + if !ok { + t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+ + "instead got %T", msg) + } + + if msg.ChainHash != *chaincfg.SimNetParams.GenesisHash { + t.Fatalf("wrong chain hash: expected %v, got %v", + msg.ChainHash, chaincfg.SimNetParams.GenesisHash) + } + if msg.Complete != 0 { + t.Fatalf("complete set incorrectly") + } + } +} + +// TestGossipSyncerReplyShortChanIDs tests that in the case of a known chain +// hash for a QueryShortChanIDs, we'll return the set of matching +// announcements, as well as an ending ReplyShortChanIDsEnd message. +func TestGossipSyncerReplyShortChanIDs(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + msgChan, syncer, chanSeries := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + queryChanIDs := []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(1), + lnwire.NewShortChanIDFromInt(2), + lnwire.NewShortChanIDFromInt(3), + } + + queryReply := []lnwire.Message{ + &lnwire.ChannelAnnouncement{ + ShortChannelID: lnwire.NewShortChanIDFromInt(20), + }, + &lnwire.ChannelUpdate{ + ShortChannelID: lnwire.NewShortChanIDFromInt(20), + Timestamp: unixStamp(999999), + }, + &lnwire.NodeAnnouncement{Timestamp: unixStamp(25001)}, + } + + // We'll then craft a reply to the upcoming query for all the matching + // channel announcements for a particular set of short channel ID's. + go func() { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no query recvd") + + case chanIDs := <-chanSeries.annReq: + // The set of chan ID's should match exactly. + if !reflect.DeepEqual(chanIDs, queryChanIDs) { + t.Fatalf("wrong chan IDs: expected %v, got %v", + queryChanIDs, chanIDs) + } + + // If they do, then we'll send back a response with + // some canned messages. + chanSeries.annResp <- queryReply + } + }() + + // With our set up above complete, we'll now attempt to obtain a reply + // from the channel syncer for our target chan ID query. + err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{ + ShortChanIDs: queryChanIDs, + }) + if err != nil { + t.Fatalf("unable to query for chan IDs: %v", err) + } + + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") + + // We should get back exactly 4 messages. The first 3 are the same + // messages we sent above, and the query end message. + case msgs := <-msgChan: + if len(msgs) != 4 { + t.Fatalf("wrong messages: expected %v, got %v", + 4, len(msgs)) + } + + if !reflect.DeepEqual(queryReply, msgs[:3]) { + t.Fatalf("wrong set of messages: expected %v, got %v", + spew.Sdump(queryReply), spew.Sdump(msgs[:3])) + } + + finalMsg, ok := msgs[3].(*lnwire.ReplyShortChanIDsEnd) + if !ok { + t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+ + "instead got %T", msgs[3]) + } + if finalMsg.Complete != 1 { + t.Fatalf("complete wasn't set") + } + } +} + +// TestGossipSyncerReplyChanRangeQueryUnknownEncodingType tests that if we +// receive a QueryChannelRange message with an unknown encoding type, then we +// return an error. +func TestGossipSyncerReplyChanRangeQueryUnknownEncodingType(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + _, syncer, _ := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // If we modify the syncer to expect an encoding type that is currently + // unknown, then it should fail to process the message and return an + // error. + syncer.cfg.encodingType = 99 + err := syncer.replyChanRangeQuery(&lnwire.QueryChannelRange{}) + if err == nil { + t.Fatalf("expected message fail") + } +} + +// TestGossipSyncerReplyChanRangeQuery tests that if we receive a +// QueryChannelRange message, then we'll properly send back a chunked reply to +// the remote peer. +func TestGossipSyncerReplyChanRangeQuery(t *testing.T) { + t.Parallel() + + // First, we'll modify the main map to provide e a smaller chunk size + // so we can easily test all the edge cases. + encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = 2 + + // We'll now create our test gossip syncer that will shortly respond to + // our canned query. + msgChan, syncer, chanSeries := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // Next, we'll craft a query to ask for all the new chan ID's after + // block 100. + query := &lnwire.QueryChannelRange{ + FirstBlockHeight: 100, + NumBlocks: 50, + } + + // We'll then launch a goroutine to reply to the query with a set of 5 + // responses. This will ensure we get two full chunks, and one partial + // chunk. + resp := []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(1), + lnwire.NewShortChanIDFromInt(2), + lnwire.NewShortChanIDFromInt(3), + lnwire.NewShortChanIDFromInt(4), + lnwire.NewShortChanIDFromInt(5), + } + go func() { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no query recvd") + + case filterReq := <-chanSeries.filterRangeReqs: + // We should be querying for block 100 to 150. + if filterReq.startHeight != 100 && filterReq.endHeight != 150 { + t.Fatalf("wrong height range: %v", spew.Sdump(filterReq)) + } + + // If the proper request was sent, then we'll respond + // with our set of short channel ID's. + chanSeries.filterRangeResp <- resp + } + }() + + // With our goroutine active, we'll now issue the query. + if err := syncer.replyChanRangeQuery(query); err != nil { + t.Fatalf("unable to issue query: %v", err) + } + + // At this point, we'll now wait for the syncer to send the chunked + // reply. We should get three sets of messages as two of them should be + // full, while the other is the final fragment. + const numExpectedChunks = 3 + respMsgs := make([]lnwire.ShortChannelID, 0, 5) + for i := 0; i < 3; i++ { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") + + case msg := <-msgChan: + resp := msg[0] + rangeResp, ok := resp.(*lnwire.ReplyChannelRange) + if !ok { + t.Fatalf("expected ReplyChannelRange instead got %T", msg) + } + + // If this is not the last chunk, then Complete should + // be set to zero. Otherwise, it should be one. + switch { + case i < 2 && rangeResp.Complete != 0: + t.Fatalf("non-final chunk should have "+ + "Complete=0: %v", spew.Sdump(rangeResp)) + + case i == 2 && rangeResp.Complete != 1: + t.Fatalf("final chunk should have "+ + "Complete=1: %v", spew.Sdump(rangeResp)) + } + + respMsgs = append(respMsgs, rangeResp.ShortChanIDs...) + } + } + + // We should get back exactly 5 short chan ID's, and they should match + // exactly the ID's we sent as a reply. + if len(respMsgs) != len(resp) { + t.Fatalf("expected %v chan ID's, instead got %v", + len(resp), spew.Sdump(respMsgs)) + } + if !reflect.DeepEqual(resp, respMsgs) { + t.Fatalf("mismatched response: expected %v, got %v", + spew.Sdump(resp), spew.Sdump(respMsgs)) + } +} + +// TestGossipSyncerReplyChanRangeQueryNoNewChans tests that if we issue a reply +// for a channel range query, and we don't have any new channels, then we send +// back a single response that signals completion. +func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) { + t.Parallel() + + // We'll now create our test gossip syncer that will shortly respond to + // our canned query. + msgChan, syncer, chanSeries := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // Next, we'll craft a query to ask for all the new chan ID's after + // block 100. + query := &lnwire.QueryChannelRange{ + FirstBlockHeight: 100, + NumBlocks: 50, + } + + // We'll then launch a goroutine to reply to the query no new channels. + resp := []lnwire.ShortChannelID{} + go func() { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no query recvd") + + case filterReq := <-chanSeries.filterRangeReqs: + // We should be querying for block 100 to 150. + if filterReq.startHeight != 100 && filterReq.endHeight != 150 { + t.Fatalf("wrong height range: %v", + spew.Sdump(filterReq)) + } + + // If the proper request was sent, then we'll respond + // with our blank set of short chan ID's. + chanSeries.filterRangeResp <- resp + } + }() + + // With our goroutine active, we'll now issue the query. + if err := syncer.replyChanRangeQuery(query); err != nil { + t.Fatalf("unable to issue query: %v", err) + } + + // We should get back exactly one message, and the message should + // indicate that this is the final in the series. + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") + + case msg := <-msgChan: + resp := msg[0] + rangeResp, ok := resp.(*lnwire.ReplyChannelRange) + if !ok { + t.Fatalf("expected ReplyChannelRange instead got %T", msg) + } + + if len(rangeResp.ShortChanIDs) != 0 { + t.Fatalf("expected no chan ID's, instead "+ + "got: %v", spew.Sdump(rangeResp.ShortChanIDs)) + } + if rangeResp.Complete != 1 { + t.Fatalf("complete wasn't set") + } + } +} + +// TestGossipSyncerGenChanRangeQuery tests that given the current best known +// channel ID, we properly generate an correct initial channel range response. +func TestGossipSyncerGenChanRangeQuery(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + const startingHeight = 200 + _, syncer, _ := newTestSyncer( + lnwire.ShortChannelID{ + BlockHeight: startingHeight, + }, + ) + + // If we now ask the syncer to generate an initial range query, it + // should return a start height that's back chanRangeQueryBuffer + // blocks. + rangeQuery, err := syncer.genChanRangeQuery() + if err != nil { + t.Fatalf("unable to resp: %v", err) + } + + firstHeight := uint32(startingHeight - chanRangeQueryBuffer) + if rangeQuery.FirstBlockHeight != firstHeight { + t.Fatalf("incorrect chan range query: expected %v, %v", + rangeQuery.FirstBlockHeight, + startingHeight-chanRangeQueryBuffer) + } + if rangeQuery.NumBlocks != math.MaxUint32-firstHeight { + t.Fatalf("wrong num blocks: expected %v, got %v", + rangeQuery.NumBlocks, math.MaxUint32-firstHeight) + } +} + +// TestGossipSyncerProcessChanRangeReply tests that we'll properly buffer +// replied channel replies until we have the complete version. If no new +// channels were discovered, then we should go directly to the chanSsSynced +// state. Otherwise, we should go to the queryNewChannels states. +func TestGossipSyncerProcessChanRangeReply(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + _, syncer, chanSeries := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + startingState := syncer.state + + replies := []*lnwire.ReplyChannelRange{ + { + ShortChanIDs: []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(10), + }, + }, + { + ShortChanIDs: []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(11), + }, + }, + { + Complete: 1, + ShortChanIDs: []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(12), + }, + }, + } + + // We'll begin by sending the syncer a set of non-complete channel + // range replies. + if err := syncer.processChanRangeReply(replies[0]); err != nil { + t.Fatalf("unable to process reply: %v", err) + } + if err := syncer.processChanRangeReply(replies[1]); err != nil { + t.Fatalf("unable to process reply: %v", err) + } + + // At this point, we should still be in our starting state as the query + // hasn't finished. + if syncer.state != startingState { + t.Fatalf("state should not have transitioned") + } + + expectedReq := []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(10), + lnwire.NewShortChanIDFromInt(11), + lnwire.NewShortChanIDFromInt(12), + } + + // As we're about to send the final response, we'll launch a goroutine + // to respond back with a filtered set of chan ID's. + go func() { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no query recvd") + + case req := <-chanSeries.filterReq: + // We should get a request for the entire range of short + // chan ID's. + if !reflect.DeepEqual(expectedReq, req) { + fmt.Printf("wrong request: expected %v, got %v\n", + expectedReq, req) + + t.Fatalf("wrong request: expected %v, got %v", + expectedReq, req) + } + + // We'll send back only the last two to simulate filtering. + chanSeries.filterResp <- expectedReq[1:] + } + }() + + // If we send the final message, then we should transition to + // queryNewChannels as we've sent a non-empty set of new channels. + if err := syncer.processChanRangeReply(replies[2]); err != nil { + t.Fatalf("unable to process reply: %v", err) + } + + if syncer.state != queryNewChannels { + t.Fatalf("wrong state: expected %v instead got %v", + queryNewChannels, syncer.state) + } + if !reflect.DeepEqual(syncer.newChansToQuery, expectedReq[1:]) { + t.Fatalf("wrong set of chans to query: expected %v, got %v", + syncer.newChansToQuery, expectedReq[1:]) + } + + // We'll repeat our final reply again, but this time we won't send any + // new channels. As a result, we should transition over to the + // chansSynced state. + go func() { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no query recvd") + + case req := <-chanSeries.filterReq: + // We should get a request for the entire range of short + // chan ID's. + if !reflect.DeepEqual(expectedReq[2], req[0]) { + t.Fatalf("wrong request: expected %v, got %v", + expectedReq[2], req[0]) + } + + // We'll send back only the last two to simulate filtering. + chanSeries.filterResp <- []lnwire.ShortChannelID{} + } + }() + if err := syncer.processChanRangeReply(replies[2]); err != nil { + t.Fatalf("unable to process reply: %v", err) + } + + if syncer.state != chansSynced { + t.Fatalf("wrong state: expected %v instead got %v", + chansSynced, syncer.state) + } +} + +// TestGossipSyncerSynchronizeChanIDsUnknownEncodingType tests that if we +// attempt to query for a set of new channels using an unknown encoding type, +// then we'll get an error. +func TestGossipSyncerSynchronizeChanIDsUnknownEncodingType(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + _, syncer, _ := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // If we modify the syncer to expect an encoding type that is currently + // unknown, then it should fail to process the message and return an + // error. + syncer.cfg.encodingType = 101 + _, err := syncer.synchronizeChanIDs() + if err == nil { + t.Fatalf("expected message fail") + } +} + +// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of +// the short chan ID's which were unknown to us. We'll ensure that we request +// chunk by chunk, and after the last chunk, we return true indicating that we +// can transition to the synced stage. +func TestGossipSyncerSynchronizeChanIDs(t *testing.T) { + t.Parallel() + + // First, we'll create a gossipSyncer instance with a canned sendToPeer + // message to allow us to intercept their potential sends. + msgChan, syncer, _ := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), + ) + + // Next, we'll construct a set of chan ID's that we should query for, + // and set them as newChansToQuery within the state machine. + newChanIDs := []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(1), + lnwire.NewShortChanIDFromInt(2), + lnwire.NewShortChanIDFromInt(3), + lnwire.NewShortChanIDFromInt(4), + lnwire.NewShortChanIDFromInt(5), + } + syncer.newChansToQuery = newChanIDs + + // We'll modify the chunk size to be a smaller value, so we can ensure + // our chunk parsing works properly. With this value we should get 3 + // queries: two full chunks, and one lingering chunk. + chunkSize := int32(2) + encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize + + for i := int32(0); i < chunkSize*2; i += 2 { + // With our set up complete, we'll request a sync of chan ID's. + done, err := syncer.synchronizeChanIDs() + if err != nil { + t.Fatalf("unable to sync chan IDs: %v", err) + } + + // At this point, we shouldn't yet be done as only 2 items + // should have been queried for. + if done { + t.Fatalf("syncer shown as done, but shouldn't be!") + } + + // We should've received a new message from the syncer. + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") + + case msg := <-msgChan: + queryMsg, ok := msg[0].(*lnwire.QueryShortChanIDs) + if !ok { + t.Fatalf("expected QueryShortChanIDs instead "+ + "got %T", msg) + } + + // The query message should have queried for the first + // two chan ID's, and nothing more. + if !reflect.DeepEqual(queryMsg.ShortChanIDs, newChanIDs[i:i+chunkSize]) { + t.Fatalf("wrong query: expected %v, got %v", + spew.Sdump(newChanIDs[i:i+chunkSize]), + queryMsg.ShortChanIDs) + } + } + + // With the proper message sent out, the internal state of the + // syncer should reflect that it still has more channels to + // query for. + if !reflect.DeepEqual(syncer.newChansToQuery, newChanIDs[i+chunkSize:]) { + t.Fatalf("incorrect chans to query for: expected %v, got %v", + spew.Sdump(newChanIDs[i+chunkSize:]), + syncer.newChansToQuery) + } + } + + // At this point, only one more channel should be lingering for the + // syncer to query for. + if !reflect.DeepEqual(newChanIDs[chunkSize*2:], syncer.newChansToQuery) { + t.Fatalf("wrong chans to query: expected %v, got %v", + newChanIDs[chunkSize*2:], syncer.newChansToQuery) + } + + // If we issue another query, the syncer should tell us that it's done. + done, err := syncer.synchronizeChanIDs() + if err != nil { + t.Fatalf("unable to sync chan IDs: %v", err) + } + if done { + t.Fatalf("syncer should be finished!") + } + + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") + + case msg := <-msgChan: + queryMsg, ok := msg[0].(*lnwire.QueryShortChanIDs) + if !ok { + t.Fatalf("expected QueryShortChanIDs instead "+ + "got %T", msg) + } + + // The query issued should simply be the last item. + if !reflect.DeepEqual(queryMsg.ShortChanIDs, newChanIDs[chunkSize*2:]) { + t.Fatalf("wrong query: expected %v, got %v", + spew.Sdump(newChanIDs[chunkSize*2:]), + queryMsg.ShortChanIDs) + } + + // There also should be no more channels to query. + if len(syncer.newChansToQuery) != 0 { + t.Fatalf("should be no more chans to query for, "+ + "instead have %v", + spew.Sdump(syncer.newChansToQuery)) + } + } +} + +// TestGossipSyncerRoutineSync tests all state transitions of the main syncer +// goroutine. This ensures that given an encounter with a peer that has a set +// of distinct channels, then we'll properly synchronize our channel state with +// them. +func TestGossipSyncerRoutineSync(t *testing.T) { + t.Parallel() + + // First, we'll create two gossipSyncer instances with a canned + // sendToPeer message to allow us to intercept their potential sends. + startHeight := lnwire.ShortChannelID{ + BlockHeight: 1144, + } + msgChan1, syncer1, chanSeries1 := newTestSyncer( + startHeight, + ) + syncer1.Start() + defer syncer1.Stop() + + msgChan2, syncer2, chanSeries2 := newTestSyncer( + startHeight, + ) + syncer2.Start() + defer syncer2.Stop() + + // Although both nodes are at the same height, they'll have a + // completely disjoint set of 3 chan ID's that they know of. + syncer1Chans := []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(1), + lnwire.NewShortChanIDFromInt(2), + lnwire.NewShortChanIDFromInt(3), + } + syncer2Chans := []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(4), + lnwire.NewShortChanIDFromInt(5), + lnwire.NewShortChanIDFromInt(6), + } + + // Before we start the test, we'll set our chunk size to 2 in order to + // make testing the chunked requests and replies easier. + chunkSize := int32(2) + encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize + + // We'll kick off the test by passing over the QueryChannelRange + // messages from one node to the other. + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a QueryChannelRange message. + _, ok := msg.(*lnwire.QueryChannelRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.gossipMsgs <- msg: + + } + } + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer2") + + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a QueryChannelRange message. + _, ok := msg.(*lnwire.QueryChannelRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer1.gossipMsgs <- msg: + + } + } + } + + // At this point, we'll need to send responses to both nodes from their + // respective channel series. Both nodes will simply request the entire + // set of channels from the other. + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries1.filterRangeReqs: + // We'll send all the channels that it should know of. + chanSeries1.filterRangeResp <- syncer1Chans + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries2.filterRangeReqs: + // We'll send back all the channels that it should know of. + chanSeries2.filterRangeResp <- syncer2Chans + } + + // At this point, we'll forward the ReplyChannelRange messages to both + // parties. Two replies are expected since the chunk size is 2, and we + // need to query for 3 channels. + for i := 0; i < 2; i++ { + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a ReplyChannelRange message. + _, ok := msg.(*lnwire.ReplyChannelRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.gossipMsgs <- msg: + } + } + } + } + for i := 0; i < 2; i++ { + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer2") + + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a ReplyChannelRange message. + _, ok := msg.(*lnwire.ReplyChannelRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer1.gossipMsgs <- msg: + } + } + } + } + + // We'll now send back a chunked response for both parties of the known + // short chan ID's. + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries1.filterReq: + chanSeries1.filterResp <- syncer2Chans + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries2.filterReq: + chanSeries2.filterResp <- syncer1Chans + } + + // At this point, both parties should start to send out initial + // requests to query the chan IDs of the remote party. As the chunk + // size is 3, they'll need 2 rounds in order to fully reconcile the + // state. + for i := 0; i < 2; i++ { + // Both parties should now have sent out the initial requests + // to query the chan IDs of the other party. + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a QueryShortChanIDs message. + _, ok := msg.(*lnwire.QueryShortChanIDs) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryShortChanIDs for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.gossipMsgs <- msg: + + } + } + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer2") + + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a QueryShortChanIDs message. + _, ok := msg.(*lnwire.QueryShortChanIDs) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryShortChanIDs for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer1.gossipMsgs <- msg: + + } + } + } + + // We'll then respond to both parties with an empty set of replies (as + // it doesn't affect the test). + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries1.annReq: + chanSeries1.annResp <- []lnwire.Message{} + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries2.annReq: + chanSeries2.annResp <- []lnwire.Message{} + } + + // Both sides should then receive a ReplyShortChanIDsEnd as the first + // chunk has been replied to. + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a ReplyShortChanIDsEnd message. + _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.gossipMsgs <- msg: + + } + } + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a ReplyShortChanIDsEnd message. + _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) + if !ok { + t.Fatalf("wrong message: expected "+ + "ReplyShortChanIDsEnd for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer1.gossipMsgs <- msg: + + } + } + } + } + + // At this stage both parties should now be sending over their initial + // GossipTimestampRange messages as they should both be fully synced. + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a GossipTimestampRange message. + _, ok := msg.(*lnwire.GossipTimestampRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.gossipMsgs <- msg: + + } + } + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a GossipTimestampRange message. + _, ok := msg.(*lnwire.GossipTimestampRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer1.gossipMsgs <- msg: + + } + } + } +} + +// TestGossipSyncerAlreadySynced tests that if we attempt to synchronize two +// syncers that have the exact same state, then they'll skip straight to the +// final state and not perform any channel queries. +func TestGossipSyncerAlreadySynced(t *testing.T) { + t.Parallel() + + // First, we'll create two gossipSyncer instances with a canned + // sendToPeer message to allow us to intercept their potential sends. + startHeight := lnwire.ShortChannelID{ + BlockHeight: 1144, + } + msgChan1, syncer1, chanSeries1 := newTestSyncer( + startHeight, + ) + syncer1.Start() + defer syncer1.Stop() + + msgChan2, syncer2, chanSeries2 := newTestSyncer( + startHeight, + ) + syncer2.Start() + defer syncer2.Stop() + + // Before we start the test, we'll set our chunk size to 2 in order to + // make testing the chunked requests and replies easier. + chunkSize := int32(2) + encodingTypeToChunkSize[lnwire.EncodingSortedPlain] = chunkSize + + // The channel state of both syncers will be identical. They should + // recognize this, and skip the sync phase below. + syncer1Chans := []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(1), + lnwire.NewShortChanIDFromInt(2), + lnwire.NewShortChanIDFromInt(3), + } + syncer2Chans := []lnwire.ShortChannelID{ + lnwire.NewShortChanIDFromInt(1), + lnwire.NewShortChanIDFromInt(2), + lnwire.NewShortChanIDFromInt(3), + } + + // We'll now kick off the test by allowing both side to send their + // QueryChannelRange messages to each other. + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a QueryChannelRange message. + _, ok := msg.(*lnwire.QueryChannelRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.gossipMsgs <- msg: + + } + } + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer2") + + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a QueryChannelRange message. + _, ok := msg.(*lnwire.QueryChannelRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer1.gossipMsgs <- msg: + + } + } + } + + // We'll now send back the range each side should send over: the set of + // channels they already know about. + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries1.filterRangeReqs: + // We'll send all the channels that it should know of. + chanSeries1.filterRangeResp <- syncer1Chans + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries2.filterRangeReqs: + // We'll send back all the channels that it should know of. + chanSeries2.filterRangeResp <- syncer2Chans + } + + // Next, we'll thread through the replies of both parties. As the chunk + // size is 2, and they both know of 3 channels, it'll take two around + // and two chunks. + for i := 0; i < 2; i++ { + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a ReplyChannelRange message. + _, ok := msg.(*lnwire.ReplyChannelRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.gossipMsgs <- msg: + } + } + } + } + for i := 0; i < 2; i++ { + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer2") + + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a ReplyChannelRange message. + _, ok := msg.(*lnwire.ReplyChannelRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer1.gossipMsgs <- msg: + } + } + } + } + + // Now that both sides have the full responses, we'll send over the + // channels that they need to filter out. As both sides have the exact + // same set of channels, they should skip to the final state. + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries1.filterReq: + chanSeries1.filterResp <- []lnwire.ShortChannelID{} + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("no query recvd") + + case <-chanSeries2.filterReq: + chanSeries2.filterResp <- []lnwire.ShortChannelID{} + } + + // As both parties are already synced, the next message they send to + // each other should be the GossipTimestampRange message. + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan1: + for _, msg := range msgs { + // The message MUST be a GossipTimestampRange message. + _, ok := msg.(*lnwire.GossipTimestampRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer2.gossipMsgs <- msg: + + } + } + } + select { + case <-time.After(time.Second * 2): + t.Fatalf("didn't get msg from syncer1") + + case msgs := <-msgChan2: + for _, msg := range msgs { + // The message MUST be a GossipTimestampRange message. + _, ok := msg.(*lnwire.GossipTimestampRange) + if !ok { + t.Fatalf("wrong message: expected "+ + "QueryChannelRange for %T", msg) + } + + select { + case <-time.After(time.Second * 2): + t.Fatalf("node 2 didn't read msg") + + case syncer1.gossipMsgs <- msg: + + } + } + } +}