package discovery

import (
	"errors"
	"fmt"
	"math"
	"reflect"
	"sync"
	"testing"
	"time"

	"github.com/btcsuite/btcd/chaincfg"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/davecgh/go-spew/spew"
	"github.com/lightningnetwork/lnd/lnwire"
)

const (
	defaultEncoding   = lnwire.EncodingSortedPlain
	latestKnownHeight = 1337
)

var (
	defaultChunkSize = encodingTypeToChunkSize[defaultEncoding]
)

type horizonQuery struct {
	chain chainhash.Hash
	start time.Time
	end   time.Time
}
type filterRangeReq struct {
	startHeight, endHeight uint32
}

type mockChannelGraphTimeSeries struct {
	highestID lnwire.ShortChannelID

	horizonReq  chan horizonQuery
	horizonResp chan []lnwire.Message

	filterReq  chan []lnwire.ShortChannelID
	filterResp chan []lnwire.ShortChannelID

	filterRangeReqs chan filterRangeReq
	filterRangeResp chan []lnwire.ShortChannelID

	annReq  chan []lnwire.ShortChannelID
	annResp chan []lnwire.Message

	updateReq  chan lnwire.ShortChannelID
	updateResp chan []*lnwire.ChannelUpdate
}

func newMockChannelGraphTimeSeries(
	hID lnwire.ShortChannelID) *mockChannelGraphTimeSeries {

	return &mockChannelGraphTimeSeries{
		highestID: hID,

		horizonReq:  make(chan horizonQuery, 1),
		horizonResp: make(chan []lnwire.Message, 1),

		filterReq:  make(chan []lnwire.ShortChannelID, 1),
		filterResp: make(chan []lnwire.ShortChannelID, 1),

		filterRangeReqs: make(chan filterRangeReq, 1),
		filterRangeResp: make(chan []lnwire.ShortChannelID, 1),

		annReq:  make(chan []lnwire.ShortChannelID, 1),
		annResp: make(chan []lnwire.Message, 1),

		updateReq:  make(chan lnwire.ShortChannelID, 1),
		updateResp: make(chan []*lnwire.ChannelUpdate, 1),
	}
}

func (m *mockChannelGraphTimeSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) {
	return &m.highestID, nil
}
func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash,
	startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {

	m.horizonReq <- horizonQuery{
		chain, startTime, endTime,
	}

	return <-m.horizonResp, nil
}
func (m *mockChannelGraphTimeSeries) FilterKnownChanIDs(chain chainhash.Hash,
	superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error) {

	m.filterReq <- superSet

	return <-m.filterResp, nil
}
func (m *mockChannelGraphTimeSeries) FilterChannelRange(chain chainhash.Hash,
	startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error) {

	m.filterRangeReqs <- filterRangeReq{startHeight, endHeight}

	return <-m.filterRangeResp, nil
}
func (m *mockChannelGraphTimeSeries) FetchChanAnns(chain chainhash.Hash,
	shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error) {

	m.annReq <- shortChanIDs

	return <-m.annResp, nil
}
func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash,
	shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error) {

	m.updateReq <- shortChanID

	return <-m.updateResp, nil
}

var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil)

// newTestSyncer creates a new test instance of a GossipSyncer. A buffered
// message channel is returned for intercepting messages sent from the syncer,
// in addition to a mock channel series which allows the test to control which
// messages the syncer knows of or wishes to filter out. The variadic flags are
// treated as positional arguments where the first index signals that the syncer
// should spawn a channelGraphSyncer and second index signals that the syncer
// should spawn a replyHandler. Any flags beyond the first two are currently
// ignored. If no flags are provided, both a channelGraphSyncer and replyHandler
// will be spawned by default.
func newTestSyncer(hID lnwire.ShortChannelID,
	encodingType lnwire.ShortChanIDEncoding, chunkSize int32,
	flags ...bool) (chan []lnwire.Message,
	*GossipSyncer, *mockChannelGraphTimeSeries) {

	syncChannels := true
	replyQueries := true
	if len(flags) > 0 {
		syncChannels = flags[0]
	}
	if len(flags) > 1 {
		replyQueries = flags[1]
	}

	msgChan := make(chan []lnwire.Message, 20)
	cfg := gossipSyncerCfg{
		channelSeries:  newMockChannelGraphTimeSeries(hID),
		encodingType:   encodingType,
		chunkSize:      chunkSize,
		batchSize:      chunkSize,
		noSyncChannels: !syncChannels,
		noReplyQueries: !replyQueries,
		sendToPeer: func(msgs ...lnwire.Message) error {
			msgChan <- msgs
			return nil
		},
		sendToPeerSync: func(msgs ...lnwire.Message) error {
			msgChan <- msgs
			return nil
		},
		delayedQueryReplyInterval: 2 * time.Second,
	}
	syncer := newGossipSyncer(cfg)

	return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries)
}

// TestGossipSyncerFilterGossipMsgsNoHorizon tests that if the remote peer
// doesn't have a horizon set, then we won't send any incoming messages to it.
func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	msgChan, syncer, _ := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)

	// With the syncer created, we'll create a set of messages to filter
	// through the gossiper to the target peer.
	msgs := []msgWithSenders{
		{
			msg: &lnwire.NodeAnnouncement{Timestamp: uint32(time.Now().Unix())},
		},
		{
			msg: &lnwire.NodeAnnouncement{Timestamp: uint32(time.Now().Unix())},
		},
	}

	// We'll then attempt to filter the set of messages through the target
	// peer.
	syncer.FilterGossipMsgs(msgs...)

	// As the remote peer doesn't yet have a gossip timestamp set, we
	// shouldn't receive any outbound messages.
	select {
	case msg := <-msgChan:
		t.Fatalf("received message but shouldn't have: %v",
			spew.Sdump(msg))

	case <-time.After(time.Millisecond * 10):
	}
}

func unixStamp(a int64) uint32 {
	t := time.Unix(a, 0)
	return uint32(t.Unix())
}

// TestGossipSyncerFilterGossipMsgsAll tests that we're able to properly filter
// out a set of incoming messages based on the set remote update horizon for a
// peer. We tests all messages type, and all time straddling. We'll also send a
// channel ann that already has a channel update on disk.
func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	msgChan, syncer, chanSeries := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)

	// We'll create then apply a remote horizon for the target peer with a
	// set of manually selected timestamps.
	remoteHorizon := &lnwire.GossipTimestampRange{
		FirstTimestamp: unixStamp(25000),
		TimestampRange: uint32(1000),
	}
	syncer.remoteUpdateHorizon = remoteHorizon

	// With the syncer created, we'll create a set of messages to filter
	// through the gossiper to the target peer. Our message will consist of
	// one node announcement above the horizon, one below. Additionally,
	// we'll include a chan ann with an update below the horizon, one
	// with an update timestamp above the horizon, and one without any
	// channel updates at all.
	msgs := []msgWithSenders{
		{
			// Node ann above horizon.
			msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(25001)},
		},
		{
			// Node ann below horizon.
			msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(5)},
		},
		{
			// Node ann above horizon.
			msg: &lnwire.NodeAnnouncement{Timestamp: unixStamp(999999)},
		},
		{
			// Ann tuple below horizon.
			msg: &lnwire.ChannelAnnouncement{
				ShortChannelID: lnwire.NewShortChanIDFromInt(10),
			},
		},
		{
			msg: &lnwire.ChannelUpdate{
				ShortChannelID: lnwire.NewShortChanIDFromInt(10),
				Timestamp:      unixStamp(5),
			},
		},
		{
			// Ann tuple above horizon.
			msg: &lnwire.ChannelAnnouncement{
				ShortChannelID: lnwire.NewShortChanIDFromInt(15),
			},
		},
		{
			msg: &lnwire.ChannelUpdate{
				ShortChannelID: lnwire.NewShortChanIDFromInt(15),
				Timestamp:      unixStamp(25002),
			},
		},
		{
			// Ann tuple beyond horizon.
			msg: &lnwire.ChannelAnnouncement{
				ShortChannelID: lnwire.NewShortChanIDFromInt(20),
			},
		},
		{
			msg: &lnwire.ChannelUpdate{
				ShortChannelID: lnwire.NewShortChanIDFromInt(20),
				Timestamp:      unixStamp(999999),
			},
		},
		{
			// Ann w/o an update at all, the update in the DB will
			// be below the horizon.
			msg: &lnwire.ChannelAnnouncement{
				ShortChannelID: lnwire.NewShortChanIDFromInt(25),
			},
		},
	}

	// Before we send off the query, we'll ensure we send the missing
	// channel update for that final ann. It will be below the horizon, so
	// shouldn't be sent anyway.
	errCh := make(chan error, 1)
	go func() {
		select {
		case <-time.After(time.Second * 15):
			errCh <- errors.New("no query received")
			return
		case query := <-chanSeries.updateReq:
			// It should be asking for the chan updates of short
			// chan ID 25.
			expectedID := lnwire.NewShortChanIDFromInt(25)
			if expectedID != query {
				errCh <- fmt.Errorf("wrong query id: expected %v, got %v",
					expectedID, query)
				return
			}

			// If so, then we'll send back the missing update.
			chanSeries.updateResp <- []*lnwire.ChannelUpdate{
				{
					ShortChannelID: lnwire.NewShortChanIDFromInt(25),
					Timestamp:      unixStamp(5),
				},
			}
			errCh <- nil
		}
	}()

	// We'll then instruct the gossiper to filter this set of messages.
	syncer.FilterGossipMsgs(msgs...)

	// Out of all the messages we sent in, we should only get 2 of them
	// back.
	select {
	case <-time.After(time.Second * 15):
		t.Fatalf("no msgs received")

	case msgs := <-msgChan:
		if len(msgs) != 3 {
			t.Fatalf("expected 3 messages instead got %v "+
				"messages: %v", len(msgs), spew.Sdump(msgs))
		}
	}

	// Wait for error from goroutine.
	select {
	case <-time.After(time.Second * 30):
		t.Fatalf("goroutine did not return within 30 seconds")
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestGossipSyncerApplyNoHistoricalGossipFilter tests that once a gossip filter
// is applied for the remote peer, then we don't send the peer all known
// messages which are within their desired time horizon.
func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	_, syncer, chanSeries := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)
	syncer.cfg.ignoreHistoricalFilters = true

	// We'll apply this gossip horizon for the remote peer.
	remoteHorizon := &lnwire.GossipTimestampRange{
		FirstTimestamp: unixStamp(25000),
		TimestampRange: uint32(1000),
	}

	// After applying the gossip filter, the chan series should not be
	// queried using the updated horizon.
	errChan := make(chan error, 1)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()

		select {
		// No query received, success.
		case <-time.After(3 * time.Second):
			errChan <- nil

		// Unexpected query received.
		case <-chanSeries.horizonReq:
			errChan <- errors.New("chan series should not have been " +
				"queried")
		}
	}()

	// We'll now attempt to apply the gossip filter for the remote peer.
	syncer.ApplyGossipFilter(remoteHorizon)

	// Ensure that the syncer's remote horizon was properly updated.
	if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) {
		t.Fatalf("expected remote horizon: %v, got: %v",
			remoteHorizon, syncer.remoteUpdateHorizon)
	}

	// Wait for the query check to finish.
	wg.Wait()

	// Assert that no query was made as a result of applying the gossip
	// filter.
	err := <-errChan
	if err != nil {
		t.Fatalf(err.Error())
	}
}

// TestGossipSyncerApplyGossipFilter tests that once a gossip filter is applied
// for the remote peer, then we send the peer all known messages which are
// within their desired time horizon.
func TestGossipSyncerApplyGossipFilter(t *testing.T) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	msgChan, syncer, chanSeries := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)

	// We'll apply this gossip horizon for the remote peer.
	remoteHorizon := &lnwire.GossipTimestampRange{
		FirstTimestamp: unixStamp(25000),
		TimestampRange: uint32(1000),
	}

	// Before we apply the horizon, we'll dispatch a response to the query
	// that the syncer will issue.
	errCh := make(chan error, 1)
	go func() {
		select {
		case <-time.After(time.Second * 15):
			errCh <- errors.New("no query recvd")
			return
		case query := <-chanSeries.horizonReq:
			// The syncer should have translated the time range
			// into the proper star time.
			if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
				errCh <- fmt.Errorf("wrong query stamp: expected %v, got %v",
					remoteHorizon.FirstTimestamp, query.start)
				return
			}

			// For this first response, we'll send back an empty
			// set of messages. As result, we shouldn't send any
			// messages.
			chanSeries.horizonResp <- []lnwire.Message{}
			errCh <- nil
		}
	}()

	// We'll now attempt to apply the gossip filter for the remote peer.
	err := syncer.ApplyGossipFilter(remoteHorizon)
	if err != nil {
		t.Fatalf("unable to apply filter: %v", err)
	}

	// There should be no messages in the message queue as we didn't send
	// the syncer and messages within the horizon.
	select {
	case msgs := <-msgChan:
		t.Fatalf("expected no msgs, instead got %v", spew.Sdump(msgs))
	default:
	}

	// Wait for error result from goroutine.
	select {
	case <-time.After(time.Second * 30):
		t.Fatalf("goroutine did not return within 30 seconds")
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	}

	// If we repeat the process, but give the syncer a set of valid
	// messages, then these should be sent to the remote peer.
	go func() {
		select {
		case <-time.After(time.Second * 15):
			errCh <- errors.New("no query recvd")
			return
		case query := <-chanSeries.horizonReq:
			// The syncer should have translated the time range
			// into the proper star time.
			if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
				errCh <- fmt.Errorf("wrong query stamp: expected %v, got %v",
					remoteHorizon.FirstTimestamp, query.start)
				return
			}

			// For this first response, we'll send back a proper
			// set of messages that should be echoed back.
			chanSeries.horizonResp <- []lnwire.Message{
				&lnwire.ChannelUpdate{
					ShortChannelID: lnwire.NewShortChanIDFromInt(25),
					Timestamp:      unixStamp(5),
				},
			}
			errCh <- nil
		}
	}()
	err = syncer.ApplyGossipFilter(remoteHorizon)
	if err != nil {
		t.Fatalf("unable to apply filter: %v", err)
	}

	// We should get back the exact same message.
	select {
	case <-time.After(time.Second * 15):
		t.Fatalf("no msgs received")

	case msgs := <-msgChan:
		if len(msgs) != 1 {
			t.Fatalf("wrong messages: expected %v, got %v",
				1, len(msgs))
		}
	}

	// Wait for error result from goroutine.
	select {
	case <-time.After(time.Second * 30):
		t.Fatalf("goroutine did not return within 30 seconds")
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestGossipSyncerQueryChannelRangeWrongChainHash tests that if we receive a
// channel range query for the wrong chain, then we send back a response with no
// channels and complete=0.
func TestGossipSyncerQueryChannelRangeWrongChainHash(t *testing.T) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	msgChan, syncer, _ := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)

	// We'll now ask the syncer to reply to a channel range query, but for a
	// chain that it isn't aware of.
	query := &lnwire.QueryChannelRange{
		ChainHash:        *chaincfg.SimNetParams.GenesisHash,
		FirstBlockHeight: 0,
		NumBlocks:        math.MaxUint32,
	}
	err := syncer.replyChanRangeQuery(query)
	if err != nil {
		t.Fatalf("unable to process short chan ID's: %v", err)
	}

	select {
	case <-time.After(time.Second * 15):
		t.Fatalf("no msgs received")

	case msgs := <-msgChan:
		// We should get back exactly one message, that's a
		// ReplyChannelRange with a matching query, and a complete value
		// of zero.
		if len(msgs) != 1 {
			t.Fatalf("wrong messages: expected %v, got %v",
				1, len(msgs))
		}

		msg, ok := msgs[0].(*lnwire.ReplyChannelRange)
		if !ok {
			t.Fatalf("expected lnwire.ReplyChannelRange, got %T", msg)
		}

		if msg.QueryChannelRange != *query {
			t.Fatalf("wrong query channel range in reply: "+
				"expected: %v\ngot: %v", spew.Sdump(*query),
				spew.Sdump(msg.QueryChannelRange))
		}
		if msg.Complete != 0 {
			t.Fatalf("expected complete set to 0, got %v",
				msg.Complete)
		}
	}
}

// TestGossipSyncerReplyShortChanIDsWrongChainHash tests that if we get a chan
// ID query for the wrong chain, then we send back only a short ID end with
// complete=0.
func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	msgChan, syncer, _ := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)

	// We'll now ask the syncer to reply to a chan ID query, but for a
	// chain that it isn't aware of.
	err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{
		ChainHash: *chaincfg.SimNetParams.GenesisHash,
	})
	if err != nil {
		t.Fatalf("unable to process short chan ID's: %v", err)
	}

	select {
	case <-time.After(time.Second * 15):
		t.Fatalf("no msgs received")
	case msgs := <-msgChan:

		// We should get back exactly one message, that's a
		// ReplyShortChanIDsEnd with a matching chain hash, and a
		// complete value of zero.
		if len(msgs) != 1 {
			t.Fatalf("wrong messages: expected %v, got %v",
				1, len(msgs))
		}

		msg, ok := msgs[0].(*lnwire.ReplyShortChanIDsEnd)
		if !ok {
			t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+
				"instead got %T", msg)
		}

		if msg.ChainHash != *chaincfg.SimNetParams.GenesisHash {
			t.Fatalf("wrong chain hash: expected %v, got %v",
				msg.ChainHash, chaincfg.SimNetParams.GenesisHash)
		}
		if msg.Complete != 0 {
			t.Fatalf("complete set incorrectly")
		}
	}
}

// TestGossipSyncerReplyShortChanIDs tests that in the case of a known chain
// hash for a QueryShortChanIDs, we'll return the set of matching
// announcements, as well as an ending ReplyShortChanIDsEnd message.
func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	msgChan, syncer, chanSeries := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)

	queryChanIDs := []lnwire.ShortChannelID{
		lnwire.NewShortChanIDFromInt(1),
		lnwire.NewShortChanIDFromInt(2),
		lnwire.NewShortChanIDFromInt(3),
	}

	queryReply := []lnwire.Message{
		&lnwire.ChannelAnnouncement{
			ShortChannelID: lnwire.NewShortChanIDFromInt(20),
		},
		&lnwire.ChannelUpdate{
			ShortChannelID: lnwire.NewShortChanIDFromInt(20),
			Timestamp:      unixStamp(999999),
		},
		&lnwire.NodeAnnouncement{Timestamp: unixStamp(25001)},
	}

	// We'll then craft a reply to the upcoming query for all the matching
	// channel announcements for a particular set of short channel ID's.
	errCh := make(chan error, 1)
	go func() {
		select {
		case <-time.After(time.Second * 15):
			errCh <- errors.New("no query recvd")
			return
		case chanIDs := <-chanSeries.annReq:
			// The set of chan ID's should match exactly.
			if !reflect.DeepEqual(chanIDs, queryChanIDs) {
				errCh <- fmt.Errorf("wrong chan IDs: expected %v, got %v",
					queryChanIDs, chanIDs)
				return
			}

			// If they do, then we'll send back a response with
			// some canned messages.
			chanSeries.annResp <- queryReply
			errCh <- nil
		}
	}()

	// With our set up above complete, we'll now attempt to obtain a reply
	// from the channel syncer for our target chan ID query.
	err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{
		ShortChanIDs: queryChanIDs,
	})
	if err != nil {
		t.Fatalf("unable to query for chan IDs: %v", err)
	}

	for i := 0; i < len(queryReply)+1; i++ {
		select {
		case <-time.After(time.Second * 15):
			t.Fatalf("no msgs received")

		// We should get back exactly 4 messages. The first 3 are the
		// same messages we sent above, and the query end message.
		case msgs := <-msgChan:
			if len(msgs) != 1 {
				t.Fatalf("wrong number of messages: "+
					"expected %v, got %v", 1, len(msgs))
			}

			isQueryReply := i < len(queryReply)
			finalMsg, ok := msgs[0].(*lnwire.ReplyShortChanIDsEnd)

			switch {
			case isQueryReply &&
				!reflect.DeepEqual(queryReply[i], msgs[0]):

				t.Fatalf("wrong message: expected %v, got %v",
					spew.Sdump(queryReply[i]),
					spew.Sdump(msgs[0]))

			case !isQueryReply && !ok:
				t.Fatalf("expected lnwire.ReplyShortChanIDsEnd"+
					" instead got %T", msgs[3])

			case !isQueryReply && finalMsg.Complete != 1:
				t.Fatalf("complete wasn't set")
			}
		}
	}

	// Wait for error from goroutine.
	select {
	case <-time.After(time.Second * 30):
		t.Fatalf("goroutine did not return within 30 seconds")
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestGossipSyncerReplyChanRangeQuery tests that if we receive a
// QueryChannelRange message, then we'll properly send back a chunked reply to
// the remote peer.
func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
	t.Parallel()

	// We'll use a smaller chunk size so we can easily test all the edge
	// cases.
	const chunkSize = 2

	// We'll now create our test gossip syncer that will shortly respond to
	// our canned query.
	msgChan, syncer, chanSeries := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize,
	)

	// Next, we'll craft a query to ask for all the new chan ID's after
	// block 100.
	const startingBlockHeight = 100
	const numBlocks = 50
	const endingBlockHeight = startingBlockHeight + numBlocks - 1
	query := &lnwire.QueryChannelRange{
		FirstBlockHeight: uint32(startingBlockHeight),
		NumBlocks:        uint32(numBlocks),
	}

	// We'll then launch a goroutine to reply to the query with a set of 5
	// responses. This will ensure we get two full chunks, and one partial
	// chunk.
	queryResp := []lnwire.ShortChannelID{
		{
			BlockHeight: uint32(startingBlockHeight),
		},
		{
			BlockHeight: 102,
		},
		{
			BlockHeight: 104,
		},
		{
			BlockHeight: 106,
		},
		{
			BlockHeight: 108,
		},
	}

	errCh := make(chan error, 1)
	go func() {
		select {
		case <-time.After(time.Second * 15):
			errCh <- errors.New("no query recvd")
			return
		case filterReq := <-chanSeries.filterRangeReqs:
			// We should be querying for block 100 to 150.
			if filterReq.startHeight != startingBlockHeight &&
				filterReq.endHeight != endingBlockHeight {

				errCh <- fmt.Errorf("wrong height range: %v",
					spew.Sdump(filterReq))
				return
			}

			// If the proper request was sent, then we'll respond
			// with our set of short channel ID's.
			chanSeries.filterRangeResp <- queryResp
			errCh <- nil
		}
	}()

	// With our goroutine active, we'll now issue the query.
	if err := syncer.replyChanRangeQuery(query); err != nil {
		t.Fatalf("unable to issue query: %v", err)
	}

	// At this point, we'll now wait for the syncer to send the chunked
	// reply. We should get three sets of messages as two of them should be
	// full, while the other is the final fragment.
	const numExpectedChunks = 3
	respMsgs := make([]lnwire.ShortChannelID, 0, 5)
	for i := 0; i < numExpectedChunks; i++ {
		select {
		case <-time.After(time.Second * 15):
			t.Fatalf("no msgs received")

		case msg := <-msgChan:
			resp := msg[0]
			rangeResp, ok := resp.(*lnwire.ReplyChannelRange)
			if !ok {
				t.Fatalf("expected ReplyChannelRange instead got %T", msg)
			}

			// We'll determine the correct values of each field in
			// each response based on the order that they were sent.
			var (
				expectedFirstBlockHeight uint32
				expectedNumBlocks        uint32
				expectedComplete         uint8
			)

			switch {
			// The first reply should range from our starting block
			// height until it reaches its maximum capacity of
			// channels.
			case i == 0:
				expectedFirstBlockHeight = startingBlockHeight
				expectedNumBlocks = chunkSize + 1

			// The last reply should range starting from the next
			// block of our previous reply up until the ending
			// height of the query. It should also have the Complete
			// bit set.
			case i == numExpectedChunks-1:
				expectedFirstBlockHeight = respMsgs[len(respMsgs)-1].BlockHeight
				expectedNumBlocks = endingBlockHeight - expectedFirstBlockHeight + 1
				expectedComplete = 1

			// Any intermediate replies should range starting from
			// the next block of our previous reply up until it
			// reaches its maximum capacity of channels.
			default:
				expectedFirstBlockHeight = respMsgs[len(respMsgs)-1].BlockHeight
				expectedNumBlocks = 5
			}

			switch {
			case rangeResp.FirstBlockHeight != expectedFirstBlockHeight:
				t.Fatalf("FirstBlockHeight in resp #%d "+
					"incorrect: expected %v, got %v", i+1,
					expectedFirstBlockHeight,
					rangeResp.FirstBlockHeight)

			case rangeResp.NumBlocks != expectedNumBlocks:
				t.Fatalf("NumBlocks in resp #%d incorrect: "+
					"expected %v, got %v", i+1,
					expectedNumBlocks, rangeResp.NumBlocks)

			case rangeResp.Complete != expectedComplete:
				t.Fatalf("Complete in resp #%d incorrect: "+
					"expected %v, got %v", i+1,
					expectedNumBlocks, rangeResp.Complete)
			}

			respMsgs = append(respMsgs, rangeResp.ShortChanIDs...)
		}
	}

	// We should get back exactly 5 short chan ID's, and they should match
	// exactly the ID's we sent as a reply.
	if len(respMsgs) != len(queryResp) {
		t.Fatalf("expected %v chan ID's, instead got %v",
			len(queryResp), spew.Sdump(respMsgs))
	}
	if !reflect.DeepEqual(queryResp, respMsgs) {
		t.Fatalf("mismatched response: expected %v, got %v",
			spew.Sdump(queryResp), spew.Sdump(respMsgs))
	}

	// Wait for error from goroutine.
	select {
	case <-time.After(time.Second * 30):
		t.Fatalf("goroutine did not return within 30 seconds")
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestGossipSyncerReplyChanRangeQuery tests a variety of
// QueryChannelRange messages to ensure the underlying queries are
// executed with the correct block range
func TestGossipSyncerReplyChanRangeQueryBlockRange(t *testing.T) {
	t.Parallel()

	// First create our test gossip syncer that will handle and
	// respond to the test queries
	_, syncer, chanSeries := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding, math.MaxInt32,
	)

	// Next construct test queries with various startBlock and endBlock
	// ranges
	queryReqs := []*lnwire.QueryChannelRange{
		// full range example
		{
			FirstBlockHeight: uint32(0),
			NumBlocks:        uint32(math.MaxUint32),
		},

		// small query example that does not overflow
		{
			FirstBlockHeight: uint32(1000),
			NumBlocks:        uint32(100),
		},

		// overflow example
		{
			FirstBlockHeight: uint32(1000),
			NumBlocks:        uint32(math.MaxUint32),
		},
	}

	// Next construct the expected filterRangeReq startHeight and endHeight
	// values that we will compare to the captured values
	expFilterReqs := []filterRangeReq{
		{
			startHeight: uint32(0),
			endHeight:   uint32(math.MaxUint32 - 1),
		},
		{
			startHeight: uint32(1000),
			endHeight:   uint32(1099),
		},
		{
			startHeight: uint32(1000),
			endHeight:   uint32(math.MaxUint32),
		},
	}

	// We'll then launch a goroutine to capture the filterRangeReqs for
	// each request and return those results once all queries have been
	// received
	resultsCh := make(chan []filterRangeReq, 1)
	errCh := make(chan error, 1)
	go func() {
		// We will capture the values supplied to the chanSeries here
		// and return the results once all the requests have been
		// collected
		capFilterReqs := []filterRangeReq{}

		for filterReq := range chanSeries.filterRangeReqs {
			// capture the filter request so we can compare to the
			// expected values later
			capFilterReqs = append(capFilterReqs, filterReq)

			// Reply with an empty result for each query to allow
			// unblock the caller
			queryResp := []lnwire.ShortChannelID{}
			chanSeries.filterRangeResp <- queryResp

			// Once we have collected all results send the results
			// back to the main thread and terminate the goroutine
			if len(capFilterReqs) == len(expFilterReqs) {
				resultsCh <- capFilterReqs
				return
			}

		}
	}()

	// We'll launch a goroutine to send the query sequentially. This
	// goroutine ensures that the timeout logic below on the mainthread
	// will be reached
	go func() {
		for _, query := range queryReqs {
			if err := syncer.replyChanRangeQuery(query); err != nil {
				errCh <- fmt.Errorf("unable to issue query: %v", err)
				return
			}
		}
	}()

	// Wait for the results to be collected and validate that the
	// collected results match the expected results, the timeout to
	// expire, or an error to occur
	select {
	case capFilterReq := <-resultsCh:
		if !reflect.DeepEqual(expFilterReqs, capFilterReq) {
			t.Fatalf("mismatched filter reqs: expected %v, got %v",
				spew.Sdump(expFilterReqs), spew.Sdump(capFilterReq))
		}
	case <-time.After(time.Second * 10):
		t.Fatalf("goroutine did not return within 10 seconds")
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestGossipSyncerReplyChanRangeQueryNoNewChans tests that if we issue a reply
// for a channel range query, and we don't have any new channels, then we send
// back a single response that signals completion.
func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
	t.Parallel()

	// We'll now create our test gossip syncer that will shortly respond to
	// our canned query.
	msgChan, syncer, chanSeries := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)

	// Next, we'll craft a query to ask for all the new chan ID's after
	// block 100.
	query := &lnwire.QueryChannelRange{
		FirstBlockHeight: 100,
		NumBlocks:        50,
	}

	// We'll then launch a goroutine to reply to the query no new channels.
	resp := []lnwire.ShortChannelID{}
	errCh := make(chan error, 1)
	go func() {
		select {
		case <-time.After(time.Second * 15):
			errCh <- errors.New("no query recvd")
			return
		case filterReq := <-chanSeries.filterRangeReqs:
			// We should be querying for block 100 to 150.
			if filterReq.startHeight != 100 && filterReq.endHeight != 150 {
				errCh <- fmt.Errorf("wrong height range: %v",
					spew.Sdump(filterReq))
				return
			}
			// If the proper request was sent, then we'll respond
			// with our blank set of short chan ID's.
			chanSeries.filterRangeResp <- resp
			errCh <- nil
		}
	}()

	// With our goroutine active, we'll now issue the query.
	if err := syncer.replyChanRangeQuery(query); err != nil {
		t.Fatalf("unable to issue query: %v", err)
	}

	// We should get back exactly one message, and the message should
	// indicate that this is the final in the series.
	select {
	case <-time.After(time.Second * 15):
		t.Fatalf("no msgs received")

	case msg := <-msgChan:
		resp := msg[0]
		rangeResp, ok := resp.(*lnwire.ReplyChannelRange)
		if !ok {
			t.Fatalf("expected ReplyChannelRange instead got %T", msg)
		}

		if len(rangeResp.ShortChanIDs) != 0 {
			t.Fatalf("expected no chan ID's, instead "+
				"got: %v", spew.Sdump(rangeResp.ShortChanIDs))
		}
		if rangeResp.Complete != 1 {
			t.Fatalf("complete wasn't set")
		}
	}

	// Wait for error from goroutine.
	select {
	case <-time.After(time.Second * 30):
		t.Fatalf("goroutine did not return within 30 seconds")
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestGossipSyncerGenChanRangeQuery tests that given the current best known
// channel ID, we properly generate an correct initial channel range response.
func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	const startingHeight = 200
	_, syncer, _ := newTestSyncer(
		lnwire.ShortChannelID{BlockHeight: startingHeight},
		defaultEncoding, defaultChunkSize,
	)

	// If we now ask the syncer to generate an initial range query, it
	// should return a start height that's back chanRangeQueryBuffer
	// blocks.
	rangeQuery, err := syncer.genChanRangeQuery(false)
	if err != nil {
		t.Fatalf("unable to resp: %v", err)
	}

	firstHeight := uint32(startingHeight - chanRangeQueryBuffer)
	if rangeQuery.FirstBlockHeight != firstHeight {
		t.Fatalf("incorrect chan range query: expected %v, %v",
			rangeQuery.FirstBlockHeight,
			startingHeight-chanRangeQueryBuffer)
	}
	if rangeQuery.NumBlocks != math.MaxUint32-firstHeight {
		t.Fatalf("wrong num blocks: expected %v, got %v",
			math.MaxUint32-firstHeight, rangeQuery.NumBlocks)
	}

	// Generating a historical range query should result in a start height
	// of 0.
	rangeQuery, err = syncer.genChanRangeQuery(true)
	if err != nil {
		t.Fatalf("unable to resp: %v", err)
	}
	if rangeQuery.FirstBlockHeight != 0 {
		t.Fatalf("incorrect chan range query: expected %v, %v", 0,
			rangeQuery.FirstBlockHeight)
	}
	if rangeQuery.NumBlocks != math.MaxUint32 {
		t.Fatalf("wrong num blocks: expected %v, got %v",
			math.MaxUint32, rangeQuery.NumBlocks)
	}
}

// TestGossipSyncerProcessChanRangeReply tests that we'll properly buffer
// replied channel replies until we have the complete version.
func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
	t.Parallel()

	t.Run("legacy", func(t *testing.T) {
		testGossipSyncerProcessChanRangeReply(t, true)
	})
	t.Run("block ranges", func(t *testing.T) {
		testGossipSyncerProcessChanRangeReply(t, false)
	})
}

// testGossipSyncerProcessChanRangeReply tests that we'll properly buffer
// replied channel replies until we have the complete version. The legacy
// option, if set, uses the Complete field of the reply to determine when we've
// received all expected replies. Otherwise, it looks at the block ranges of
// each reply instead.
func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
	t.Parallel()

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	highestID := lnwire.ShortChannelID{
		BlockHeight: latestKnownHeight,
	}
	_, syncer, chanSeries := newTestSyncer(
		highestID, defaultEncoding, defaultChunkSize,
	)

	startingState := syncer.state

	query, err := syncer.genChanRangeQuery(true)
	if err != nil {
		t.Fatalf("unable to generate channel range query: %v", err)
	}

	var replyQueries []*lnwire.QueryChannelRange
	if legacy {
		// Each reply query is the same as the original query in the
		// legacy mode.
		replyQueries = []*lnwire.QueryChannelRange{query, query, query}
	} else {
		// When interpreting block ranges, the first reply should start
		// from our requested first block, and the last should end at
		// our requested last block.
		replyQueries = []*lnwire.QueryChannelRange{
			{
				FirstBlockHeight: 0,
				NumBlocks:        11,
			},
			{
				FirstBlockHeight: 11,
				NumBlocks:        1,
			},
			{
				FirstBlockHeight: 12,
				NumBlocks:        query.NumBlocks - 12,
			},
		}
	}

	replies := []*lnwire.ReplyChannelRange{
		{
			QueryChannelRange: *replyQueries[0],
			ShortChanIDs: []lnwire.ShortChannelID{
				{
					BlockHeight: 10,
				},
			},
		},
		{
			QueryChannelRange: *replyQueries[1],
			ShortChanIDs: []lnwire.ShortChannelID{
				{
					BlockHeight: 11,
				},
			},
		},
		{
			QueryChannelRange: *replyQueries[2],
			Complete:          1,
			ShortChanIDs: []lnwire.ShortChannelID{
				{
					BlockHeight: 12,
				},
			},
		},
	}

	// We'll begin by sending the syncer a set of non-complete channel
	// range replies.
	if err := syncer.processChanRangeReply(replies[0]); err != nil {
		t.Fatalf("unable to process reply: %v", err)
	}
	if err := syncer.processChanRangeReply(replies[1]); err != nil {
		t.Fatalf("unable to process reply: %v", err)
	}

	// At this point, we should still be in our starting state as the query
	// hasn't finished.
	if syncer.state != startingState {
		t.Fatalf("state should not have transitioned")
	}

	expectedReq := []lnwire.ShortChannelID{
		{
			BlockHeight: 10,
		},
		{
			BlockHeight: 11,
		},
		{
			BlockHeight: 12,
		},
	}

	// As we're about to send the final response, we'll launch a goroutine
	// to respond back with a filtered set of chan ID's.
	errCh := make(chan error, 1)
	go func() {
		select {
		case <-time.After(time.Second * 15):
			errCh <- errors.New("no query received")
			return

		case req := <-chanSeries.filterReq:
			// We should get a request for the entire range of short
			// chan ID's.
			if !reflect.DeepEqual(expectedReq, req) {
				errCh <- fmt.Errorf("wrong request: expected %v, got %v",
					expectedReq, req)
				return
			}

			// We'll send back only the last two to simulate filtering.
			chanSeries.filterResp <- expectedReq[1:]
			errCh <- nil
		}
	}()

	// If we send the final message, then we should transition to
	// queryNewChannels as we've sent a non-empty set of new channels.
	if err := syncer.processChanRangeReply(replies[2]); err != nil {
		t.Fatalf("unable to process reply: %v", err)
	}

	if syncer.syncState() != queryNewChannels {
		t.Fatalf("wrong state: expected %v instead got %v",
			queryNewChannels, syncer.state)
	}
	if !reflect.DeepEqual(syncer.newChansToQuery, expectedReq[1:]) {
		t.Fatalf("wrong set of chans to query: expected %v, got %v",
			syncer.newChansToQuery, expectedReq[1:])
	}

	// Wait for error from goroutine.
	select {
	case <-time.After(time.Second * 30):
		t.Fatalf("goroutine did not return within 30 seconds")
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
// the short chan ID's which were unknown to us. We'll ensure that we request
// chunk by chunk, and after the last chunk, we return true indicating that we
// can transition to the synced stage.
func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
	t.Parallel()

	// We'll modify the chunk size to be a smaller value, so we can ensure
	// our chunk parsing works properly. With this value we should get 3
	// queries: two full chunks, and one lingering chunk.
	const chunkSize = 2

	// First, we'll create a GossipSyncer instance with a canned sendToPeer
	// message to allow us to intercept their potential sends.
	msgChan, syncer, _ := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize,
	)

	// Next, we'll construct a set of chan ID's that we should query for,
	// and set them as newChansToQuery within the state machine.
	newChanIDs := []lnwire.ShortChannelID{
		lnwire.NewShortChanIDFromInt(1),
		lnwire.NewShortChanIDFromInt(2),
		lnwire.NewShortChanIDFromInt(3),
		lnwire.NewShortChanIDFromInt(4),
		lnwire.NewShortChanIDFromInt(5),
	}
	syncer.newChansToQuery = newChanIDs

	for i := 0; i < chunkSize*2; i += 2 {
		// With our set up complete, we'll request a sync of chan ID's.
		done, err := syncer.synchronizeChanIDs()
		if err != nil {
			t.Fatalf("unable to sync chan IDs: %v", err)
		}

		// At this point, we shouldn't yet be done as only 2 items
		// should have been queried for.
		if done {
			t.Fatalf("syncer shown as done, but shouldn't be!")
		}

		// We should've received a new message from the syncer.
		select {
		case <-time.After(time.Second * 15):
			t.Fatalf("no msgs received")

		case msg := <-msgChan:
			queryMsg, ok := msg[0].(*lnwire.QueryShortChanIDs)
			if !ok {
				t.Fatalf("expected QueryShortChanIDs instead "+
					"got %T", msg)
			}

			// The query message should have queried for the first
			// two chan ID's, and nothing more.
			if !reflect.DeepEqual(queryMsg.ShortChanIDs, newChanIDs[i:i+chunkSize]) {
				t.Fatalf("wrong query: expected %v, got %v",
					spew.Sdump(newChanIDs[i:i+chunkSize]),
					queryMsg.ShortChanIDs)
			}
		}

		// With the proper message sent out, the internal state of the
		// syncer should reflect that it still has more channels to
		// query for.
		if !reflect.DeepEqual(syncer.newChansToQuery, newChanIDs[i+chunkSize:]) {
			t.Fatalf("incorrect chans to query for: expected %v, got %v",
				spew.Sdump(newChanIDs[i+chunkSize:]),
				syncer.newChansToQuery)
		}
	}

	// At this point, only one more channel should be lingering for the
	// syncer to query for.
	if !reflect.DeepEqual(newChanIDs[chunkSize*2:], syncer.newChansToQuery) {
		t.Fatalf("wrong chans to query: expected %v, got %v",
			newChanIDs[chunkSize*2:], syncer.newChansToQuery)
	}

	// If we issue another query, the syncer should tell us that it's done.
	done, err := syncer.synchronizeChanIDs()
	if err != nil {
		t.Fatalf("unable to sync chan IDs: %v", err)
	}
	if done {
		t.Fatalf("syncer should be finished!")
	}

	select {
	case <-time.After(time.Second * 15):
		t.Fatalf("no msgs received")

	case msg := <-msgChan:
		queryMsg, ok := msg[0].(*lnwire.QueryShortChanIDs)
		if !ok {
			t.Fatalf("expected QueryShortChanIDs instead "+
				"got %T", msg)
		}

		// The query issued should simply be the last item.
		if !reflect.DeepEqual(queryMsg.ShortChanIDs, newChanIDs[chunkSize*2:]) {
			t.Fatalf("wrong query: expected %v, got %v",
				spew.Sdump(newChanIDs[chunkSize*2:]),
				queryMsg.ShortChanIDs)
		}

		// There also should be no more channels to query.
		if len(syncer.newChansToQuery) != 0 {
			t.Fatalf("should be no more chans to query for, "+
				"instead have %v",
				spew.Sdump(syncer.newChansToQuery))
		}
	}
}

// TestGossipSyncerDelayDOS tests that the gossip syncer will begin delaying
// queries after its prescribed allotment of undelayed query responses. Once
// this happens, all query replies should be delayed by the configurated
// interval.
func TestGossipSyncerDelayDOS(t *testing.T) {
	t.Parallel()

	// We'll modify the chunk size to be a smaller value, since we'll be
	// sending a modest number of queries. After exhausting our undelayed
	// gossip queries, we'll send two extra queries and ensure that they are
	// delayed properly.
	const chunkSize = 2
	const numDelayedQueries = 2
	const delayTolerance = time.Millisecond * 200

	// First, we'll create two GossipSyncer instances with a canned
	// sendToPeer message to allow us to intercept their potential sends.
	highestID := lnwire.ShortChannelID{
		BlockHeight: 1144,
	}
	msgChan1, syncer1, chanSeries1 := newTestSyncer(
		highestID, defaultEncoding, chunkSize, true, false,
	)
	syncer1.Start()
	defer syncer1.Stop()

	msgChan2, syncer2, chanSeries2 := newTestSyncer(
		highestID, defaultEncoding, chunkSize, false, true,
	)
	syncer2.Start()
	defer syncer2.Stop()

	// Record the delayed query reply interval used by each syncer.
	delayedQueryInterval := syncer1.cfg.delayedQueryReplyInterval

	// Record the number of undelayed queries allowed by the syncers.
	numUndelayedQueries := syncer1.cfg.maxUndelayedQueryReplies

	// We will send enough queries to exhaust the undelayed responses, and
	// then send two more queries which should be delayed. An additional one
	// is subtracted from the total since undelayed message will be consumed
	// by the initial QueryChannelRange.
	numQueryResponses := numUndelayedQueries + numDelayedQueries - 1

	// The total number of responses must include the initial reply each
	// syncer will make to QueryChannelRange.
	numTotalQueries := 1 + numQueryResponses

	// The total number of channels each syncer needs to request must be
	// scaled by the chunk size being used.
	numTotalChans := numQueryResponses * chunkSize

	// Construct enough channels so that all of the queries will have enough
	// channels. Since syncer1 won't know of any channels, their sets are
	// inherently disjoint.
	var syncer2Chans []lnwire.ShortChannelID
	for i := 0; i < numTotalChans; i++ {
		syncer2Chans = append(syncer2Chans, lnwire.ShortChannelID{
			BlockHeight: highestID.BlockHeight - 1,
			TxIndex:     uint32(i),
		})
	}

	// We'll kick off the test by asserting syncer1 sends over the
	// QueryChannelRange message the other node.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("didn't get msg from syncer1")

	case msgs := <-msgChan1:
		for _, msg := range msgs {
			// The message MUST be a QueryChannelRange message.
			_, ok := msg.(*lnwire.QueryChannelRange)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"QueryChannelRange for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer2.queryMsgs <- msg:

			}
		}
	}

	// At this point, we'll need to a response from syncer2's channel
	// series. This will cause syncer1 to simply request the entire set of
	// channels from syncer2. This will count as the first undelayed
	// response for sycner2.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("no query recvd")

	case <-chanSeries2.filterRangeReqs:
		// We'll send back all the channels that it should know of.
		chanSeries2.filterRangeResp <- syncer2Chans
	}

	// At this point, we'll assert that the ReplyChannelRange message is
	// sent by sycner2.
	for i := 0; i < numQueryResponses; i++ {
		select {
		case <-time.After(time.Second * 2):
			t.Fatalf("didn't get msg from syncer2")

		case msgs := <-msgChan2:
			for _, msg := range msgs {
				// The message MUST be a ReplyChannelRange message.
				_, ok := msg.(*lnwire.ReplyChannelRange)
				if !ok {
					t.Fatalf("wrong message: expected "+
						"QueryChannelRange for %T", msg)
				}

				select {
				case <-time.After(time.Second * 2):
					t.Fatalf("node 2 didn't read msg")

				case syncer1.gossipMsgs <- msg:
				}
			}
		}
	}

	// We'll now have syncer1 process the received sids from syncer2.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("no query recvd")

	case <-chanSeries1.filterReq:
		chanSeries1.filterResp <- syncer2Chans
	}

	// At this point, syncer1 should start to send out initial requests to
	// query the chan IDs of the remote party. We'll keep track of the
	// number of queries made using the iterated value, which starts at one
	// due the initial contribution of the QueryChannelRange msgs.
	for i := 1; i < numTotalQueries; i++ {
		expDelayResponse := i >= numUndelayedQueries
		queryBatch(t,
			msgChan1, msgChan2,
			syncer1, syncer2,
			chanSeries2,
			expDelayResponse,
			delayedQueryInterval,
			delayTolerance,
		)
	}
}

// queryBatch is a helper method that will query for a single batch of channels
// from a peer and assert the responses. The method can also be used to assert
// the same transition happens, but is delayed by the remote peer's DOS
// rate-limiting. The provided chanSeries should belong to syncer2.
//
// The state transition performed is the following:
//   syncer1  -- QueryShortChanIDs -->   syncer2
//                                       chanSeries.FetchChanAnns()
//   syncer1 <-- ReplyShortChanIDsEnd -- syncer2
//
// If expDelayResponse is true, this method will assert that the call the
// FetchChanAnns happens between:
//   [delayedQueryInterval-delayTolerance, delayedQueryInterval+delayTolerance].
func queryBatch(t *testing.T,
	msgChan1, msgChan2 chan []lnwire.Message,
	syncer1, syncer2 *GossipSyncer,
	chanSeries *mockChannelGraphTimeSeries,
	expDelayResponse bool,
	delayedQueryInterval, delayTolerance time.Duration) {

	t.Helper()

	// First, we'll assert that syncer1 sends a QueryShortChanIDs message to
	// the remote peer.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("didn't get msg from syncer2")

	case msgs := <-msgChan1:
		for _, msg := range msgs {
			// The message MUST be a QueryShortChanIDs message.
			_, ok := msg.(*lnwire.QueryShortChanIDs)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"QueryShortChanIDs for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer2.queryMsgs <- msg:
			}
		}
	}

	// We'll then respond to with an empty set of replies (as it doesn't
	// affect the test).
	switch {

	// If this query has surpassed the undelayed query threshold, we will
	// impose stricter timing constraints on the response times. We'll first
	// test that syncer2's chanSeries doesn't immediately receive a query,
	// and then check that the query hasn't gone unanswered entirely.
	case expDelayResponse:
		// Create a before and after timeout to test, our test
		// will ensure the messages are delivered to the peer
		// in this timeframe.
		before := time.After(
			delayedQueryInterval - delayTolerance,
		)
		after := time.After(
			delayedQueryInterval + delayTolerance,
		)

		// First, ensure syncer2 doesn't try to respond up until the
		// before time fires.
		select {
		case <-before:
			// Query is delayed, proceed.

		case <-chanSeries.annReq:
			t.Fatalf("DOSy query was not delayed")
		}

		// If syncer2 doesn't attempt a response within the allowed
		// interval, then the messages are probably lost.
		select {
		case <-after:
			t.Fatalf("no delayed query received")

		case <-chanSeries.annReq:
			chanSeries.annResp <- []lnwire.Message{}
		}

	// Otherwise, syncer2 should query its chanSeries promtly.
	default:
		select {
		case <-time.After(50 * time.Millisecond):
			t.Fatalf("no query recvd")

		case <-chanSeries.annReq:
			chanSeries.annResp <- []lnwire.Message{}
		}
	}

	// Finally, assert that syncer2 replies to syncer1 with a
	// ReplyShortChanIDsEnd.
	select {
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("didn't get msg from syncer2")

	case msgs := <-msgChan2:
		for _, msg := range msgs {
			// The message MUST be a ReplyShortChanIDsEnd message.
			_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"ReplyShortChanIDsEnd for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer1.gossipMsgs <- msg:
			}
		}
	}
}

// TestGossipSyncerRoutineSync tests all state transitions of the main syncer
// goroutine. This ensures that given an encounter with a peer that has a set
// of distinct channels, then we'll properly synchronize our channel state with
// them.
func TestGossipSyncerRoutineSync(t *testing.T) {
	t.Parallel()

	// We'll modify the chunk size to be a smaller value, so we can ensure
	// our chunk parsing works properly. With this value we should get 3
	// queries: two full chunks, and one lingering chunk.
	const chunkSize = 2

	// First, we'll create two GossipSyncer instances with a canned
	// sendToPeer message to allow us to intercept their potential sends.
	highestID := lnwire.ShortChannelID{
		BlockHeight: 1144,
	}
	msgChan1, syncer1, chanSeries1 := newTestSyncer(
		highestID, defaultEncoding, chunkSize, true, false,
	)
	syncer1.Start()
	defer syncer1.Stop()

	msgChan2, syncer2, chanSeries2 := newTestSyncer(
		highestID, defaultEncoding, chunkSize, false, true,
	)
	syncer2.Start()
	defer syncer2.Stop()

	// Although both nodes are at the same height, syncer will have 3 chan
	// ID's that syncer1 doesn't know of.
	syncer2Chans := []lnwire.ShortChannelID{
		{BlockHeight: highestID.BlockHeight - 3},
		{BlockHeight: highestID.BlockHeight - 2},
		{BlockHeight: highestID.BlockHeight - 1},
	}

	// We'll kick off the test by passing over the QueryChannelRange
	// messages from syncer1 to syncer2.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("didn't get msg from syncer1")

	case msgs := <-msgChan1:
		for _, msg := range msgs {
			// The message MUST be a QueryChannelRange message.
			_, ok := msg.(*lnwire.QueryChannelRange)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"QueryChannelRange for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer2.queryMsgs <- msg:

			}
		}
	}

	// At this point, we'll need to send a response from syncer2 to syncer1
	// using syncer2's channels This will cause syncer1 to simply request
	// the entire set of channels from the other.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("no query recvd")

	case <-chanSeries2.filterRangeReqs:
		// We'll send back all the channels that it should know of.
		chanSeries2.filterRangeResp <- syncer2Chans
	}

	// At this point, we'll assert that syncer2 replies with the
	// ReplyChannelRange messages. Two replies are expected since the chunk
	// size is 2, and we need to query for 3 channels.
	for i := 0; i < chunkSize; i++ {
		select {
		case <-time.After(time.Second * 2):
			t.Fatalf("didn't get msg from syncer2")

		case msgs := <-msgChan2:
			for _, msg := range msgs {
				// The message MUST be a ReplyChannelRange message.
				_, ok := msg.(*lnwire.ReplyChannelRange)
				if !ok {
					t.Fatalf("wrong message: expected "+
						"QueryChannelRange for %T", msg)
				}

				select {
				case <-time.After(time.Second * 2):
					t.Fatalf("node 2 didn't read msg")

				case syncer1.gossipMsgs <- msg:
				}
			}
		}
	}

	// We'll now send back a chunked response from syncer2 back to sycner1.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("no query recvd")

	case <-chanSeries1.filterReq:
		chanSeries1.filterResp <- syncer2Chans
	}

	// At this point, syncer1 should start to send out initial requests to
	// query the chan IDs of the remote party. As the chunk size is 2,
	// they'll need 2 rounds in order to fully reconcile the state.
	for i := 0; i < chunkSize; i++ {
		queryBatch(t,
			msgChan1, msgChan2,
			syncer1, syncer2,
			chanSeries2,
			false, 0, 0,
		)
	}

	// At this stage syncer1 should now be sending over its initial
	// GossipTimestampRange messages as it should be fully synced.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("didn't get msg from syncer1")

	case msgs := <-msgChan1:
		for _, msg := range msgs {
			// The message MUST be a GossipTimestampRange message.
			_, ok := msg.(*lnwire.GossipTimestampRange)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"QueryChannelRange for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer2.gossipMsgs <- msg:

			}
		}
	}
}

// TestGossipSyncerAlreadySynced tests that if we attempt to synchronize two
// syncers that have the exact same state, then they'll skip straight to the
// final state and not perform any channel queries.
func TestGossipSyncerAlreadySynced(t *testing.T) {
	t.Parallel()

	// We'll modify the chunk size to be a smaller value, so we can ensure
	// our chunk parsing works properly. With this value we should get 3
	// queries: two full chunks, and one lingering chunk.
	const chunkSize = 2
	const numChans = 3

	// First, we'll create two GossipSyncer instances with a canned
	// sendToPeer message to allow us to intercept their potential sends.
	highestID := lnwire.ShortChannelID{
		BlockHeight: 1144,
	}
	msgChan1, syncer1, chanSeries1 := newTestSyncer(
		highestID, defaultEncoding, chunkSize,
	)
	syncer1.Start()
	defer syncer1.Stop()

	msgChan2, syncer2, chanSeries2 := newTestSyncer(
		highestID, defaultEncoding, chunkSize,
	)
	syncer2.Start()
	defer syncer2.Stop()

	// The channel state of both syncers will be identical. They should
	// recognize this, and skip the sync phase below.
	var syncer1Chans, syncer2Chans []lnwire.ShortChannelID
	for i := numChans; i > 0; i-- {
		shortChanID := lnwire.ShortChannelID{
			BlockHeight: highestID.BlockHeight - uint32(i),
		}
		syncer1Chans = append(syncer1Chans, shortChanID)
		syncer2Chans = append(syncer2Chans, shortChanID)
	}

	// We'll now kick off the test by allowing both side to send their
	// QueryChannelRange messages to each other.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("didn't get msg from syncer1")

	case msgs := <-msgChan1:
		for _, msg := range msgs {
			// The message MUST be a QueryChannelRange message.
			_, ok := msg.(*lnwire.QueryChannelRange)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"QueryChannelRange for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer2.queryMsgs <- msg:

			}
		}
	}
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("didn't get msg from syncer2")

	case msgs := <-msgChan2:
		for _, msg := range msgs {
			// The message MUST be a QueryChannelRange message.
			_, ok := msg.(*lnwire.QueryChannelRange)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"QueryChannelRange for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer1.queryMsgs <- msg:

			}
		}
	}

	// We'll now send back the range each side should send over: the set of
	// channels they already know about.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("no query recvd")

	case <-chanSeries1.filterRangeReqs:
		// We'll send all the channels that it should know of.
		chanSeries1.filterRangeResp <- syncer1Chans
	}
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("no query recvd")

	case <-chanSeries2.filterRangeReqs:
		// We'll send back all the channels that it should know of.
		chanSeries2.filterRangeResp <- syncer2Chans
	}

	// Next, we'll thread through the replies of both parties. As the chunk
	// size is 2, and they both know of 3 channels, it'll take two around
	// and two chunks.
	for i := 0; i < chunkSize; i++ {
		select {
		case <-time.After(time.Second * 2):
			t.Fatalf("didn't get msg from syncer1")

		case msgs := <-msgChan1:
			for _, msg := range msgs {
				// The message MUST be a ReplyChannelRange message.
				_, ok := msg.(*lnwire.ReplyChannelRange)
				if !ok {
					t.Fatalf("wrong message: expected "+
						"QueryChannelRange for %T", msg)
				}

				select {
				case <-time.After(time.Second * 2):
					t.Fatalf("node 2 didn't read msg")

				case syncer2.gossipMsgs <- msg:
				}
			}
		}
	}
	for i := 0; i < chunkSize; i++ {
		select {
		case <-time.After(time.Second * 2):
			t.Fatalf("didn't get msg from syncer2")

		case msgs := <-msgChan2:
			for _, msg := range msgs {
				// The message MUST be a ReplyChannelRange message.
				_, ok := msg.(*lnwire.ReplyChannelRange)
				if !ok {
					t.Fatalf("wrong message: expected "+
						"QueryChannelRange for %T", msg)
				}

				select {
				case <-time.After(time.Second * 2):
					t.Fatalf("node 2 didn't read msg")

				case syncer1.gossipMsgs <- msg:
				}
			}
		}
	}

	// Now that both sides have the full responses, we'll send over the
	// channels that they need to filter out. As both sides have the exact
	// same set of channels, they should skip to the final state.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("no query recvd")

	case <-chanSeries1.filterReq:
		chanSeries1.filterResp <- []lnwire.ShortChannelID{}
	}
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("no query recvd")

	case <-chanSeries2.filterReq:
		chanSeries2.filterResp <- []lnwire.ShortChannelID{}
	}

	// As both parties are already synced, the next message they send to
	// each other should be the GossipTimestampRange message.
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("didn't get msg from syncer1")

	case msgs := <-msgChan1:
		for _, msg := range msgs {
			// The message MUST be a GossipTimestampRange message.
			_, ok := msg.(*lnwire.GossipTimestampRange)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"QueryChannelRange for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer2.gossipMsgs <- msg:

			}
		}
	}
	select {
	case <-time.After(time.Second * 2):
		t.Fatalf("didn't get msg from syncer1")

	case msgs := <-msgChan2:
		for _, msg := range msgs {
			// The message MUST be a GossipTimestampRange message.
			_, ok := msg.(*lnwire.GossipTimestampRange)
			if !ok {
				t.Fatalf("wrong message: expected "+
					"QueryChannelRange for %T", msg)
			}

			select {
			case <-time.After(time.Second * 2):
				t.Fatalf("node 2 didn't read msg")

			case syncer1.gossipMsgs <- msg:

			}
		}
	}
}

// TestGossipSyncerSyncTransitions ensures that the gossip syncer properly
// carries out its duties when accepting a new sync transition request.
func TestGossipSyncerSyncTransitions(t *testing.T) {
	t.Parallel()

	assertMsgSent := func(t *testing.T, msgChan chan []lnwire.Message,
		msg lnwire.Message) {

		t.Helper()

		var msgSent lnwire.Message
		select {
		case msgs := <-msgChan:
			if len(msgs) != 1 {
				t.Fatal("expected to send a single message at "+
					"a time, got %d", len(msgs))
			}
			msgSent = msgs[0]
		case <-time.After(time.Second):
			t.Fatalf("expected to send %T message", msg)
		}

		if !reflect.DeepEqual(msgSent, msg) {
			t.Fatalf("expected to send message: %v\ngot: %v",
				spew.Sdump(msg), spew.Sdump(msgSent))
		}
	}

	tests := []struct {
		name          string
		entrySyncType SyncerType
		finalSyncType SyncerType
		assert        func(t *testing.T, msgChan chan []lnwire.Message,
			syncer *GossipSyncer)
	}{
		{
			name:          "active to passive",
			entrySyncType: ActiveSync,
			finalSyncType: PassiveSync,
			assert: func(t *testing.T, msgChan chan []lnwire.Message,
				g *GossipSyncer) {

				// When transitioning from active to passive, we
				// should expect to see a new local update
				// horizon sent to the remote peer indicating
				// that it would not like to receive any future
				// updates.
				assertMsgSent(t, msgChan, &lnwire.GossipTimestampRange{
					FirstTimestamp: uint32(zeroTimestamp.Unix()),
					TimestampRange: 0,
				})

				syncState := g.syncState()
				if syncState != chansSynced {
					t.Fatalf("expected syncerState %v, "+
						"got %v", chansSynced, syncState)
				}
			},
		},
		{
			name:          "passive to active",
			entrySyncType: PassiveSync,
			finalSyncType: ActiveSync,
			assert: func(t *testing.T, msgChan chan []lnwire.Message,
				g *GossipSyncer) {

				// When transitioning from historical to active,
				// we should expect to see a new local update
				// horizon sent to the remote peer indicating
				// that it would like to receive any future
				// updates.
				firstTimestamp := uint32(time.Now().Unix())
				assertMsgSent(t, msgChan, &lnwire.GossipTimestampRange{
					FirstTimestamp: firstTimestamp,
					TimestampRange: math.MaxUint32,
				})

				syncState := g.syncState()
				if syncState != chansSynced {
					t.Fatalf("expected syncerState %v, "+
						"got %v", chansSynced, syncState)
				}
			},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			t.Parallel()

			// We'll start each test by creating our syncer. We'll
			// initialize it with a state of chansSynced, as that's
			// the only time when it can process sync transitions.
			msgChan, syncer, _ := newTestSyncer(
				lnwire.ShortChannelID{
					BlockHeight: latestKnownHeight,
				},
				defaultEncoding, defaultChunkSize,
			)
			syncer.setSyncState(chansSynced)

			// We'll set the initial syncType to what the test
			// demands.
			syncer.setSyncType(test.entrySyncType)

			// We'll then start the syncer in order to process the
			// request.
			syncer.Start()
			defer syncer.Stop()

			syncer.ProcessSyncTransition(test.finalSyncType)

			// The syncer should now have the expected final
			// SyncerType that the test expects.
			syncType := syncer.SyncType()
			if syncType != test.finalSyncType {
				t.Fatalf("expected syncType %v, got %v",
					test.finalSyncType, syncType)
			}

			// Finally, we'll run a set of assertions for each test
			// to ensure the syncer performed its expected duties
			// after processing its sync transition.
			test.assert(t, msgChan, syncer)
		})
	}
}

// TestGossipSyncerHistoricalSync tests that a gossip syncer can perform a
// historical sync with the remote peer.
func TestGossipSyncerHistoricalSync(t *testing.T) {
	t.Parallel()

	// We'll create a new gossip syncer and manually override its state to
	// chansSynced. This is necessary as the syncer can only process
	// historical sync requests in this state.
	msgChan, syncer, _ := newTestSyncer(
		lnwire.ShortChannelID{BlockHeight: latestKnownHeight},
		defaultEncoding, defaultChunkSize,
	)
	syncer.setSyncType(PassiveSync)
	syncer.setSyncState(chansSynced)

	syncer.Start()
	defer syncer.Stop()

	syncer.historicalSync()

	// We should expect to see a single lnwire.QueryChannelRange message be
	// sent to the remote peer with a FirstBlockHeight of 0.
	expectedMsg := &lnwire.QueryChannelRange{
		FirstBlockHeight: 0,
		NumBlocks:        math.MaxUint32,
	}

	select {
	case msgs := <-msgChan:
		if len(msgs) != 1 {
			t.Fatalf("expected to send a single "+
				"lnwire.QueryChannelRange message, got %d",
				len(msgs))
		}
		if !reflect.DeepEqual(msgs[0], expectedMsg) {
			t.Fatalf("expected to send message: %v\ngot: %v",
				spew.Sdump(expectedMsg), spew.Sdump(msgs[0]))
		}
	case <-time.After(time.Second):
		t.Fatalf("expected to send a lnwire.QueryChannelRange message")
	}
}

// TestGossipSyncerSyncedSignal ensures that we receive a signal when a gossip
// syncer reaches its terminal chansSynced state.
func TestGossipSyncerSyncedSignal(t *testing.T) {
	t.Parallel()

	// We'll create a new gossip syncer and manually override its state to
	// chansSynced.
	_, syncer, _ := newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)
	syncer.setSyncState(chansSynced)

	// We'll go ahead and request a signal to be notified of when it reaches
	// this state.
	signalChan := syncer.ResetSyncedSignal()

	// Starting the gossip syncer should cause the signal to be delivered.
	syncer.Start()

	select {
	case <-signalChan:
	case <-time.After(time.Second):
		t.Fatal("expected to receive chansSynced signal")
	}

	syncer.Stop()

	// We'll try this again, but this time we'll request the signal after
	// the syncer is active and has already reached its chansSynced state.
	_, syncer, _ = newTestSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
		defaultChunkSize,
	)

	syncer.setSyncState(chansSynced)

	syncer.Start()
	defer syncer.Stop()

	signalChan = syncer.ResetSyncedSignal()

	// The signal should be delivered immediately.
	select {
	case <-signalChan:
	case <-time.After(time.Second):
		t.Fatal("expected to receive chansSynced signal")
	}
}