lnd.xprv/discovery/syncer.go
Wilmer Paulino 227e492ccf
discovery: make historicalSync transition synchronous
We do this to ensure that the state transition from chansSynced to
syncingChans has occurred by the time we return back to the caller.
2019-04-24 13:20:18 -07:00

1224 lines
39 KiB
Go

package discovery
import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/time/rate"
)
// SyncerType encapsulates the different types of syncing mechanisms for a
// gossip syncer.
type SyncerType uint8
const (
// ActiveSync denotes that a gossip syncer:
//
// 1. Should not attempt to synchronize with the remote peer for
// missing channels.
// 2. Should respond to queries from the remote peer.
// 3. Should receive new updates from the remote peer.
//
// They are started in a chansSynced state in order to accomplish their
// responsibilities above.
ActiveSync SyncerType = iota
// PassiveSync denotes that a gossip syncer:
//
// 1. Should not attempt to synchronize with the remote peer for
// missing channels.
// 2. Should respond to queries from the remote peer.
// 3. Should not receive new updates from the remote peer.
//
// They are started in a chansSynced state in order to accomplish their
// responsibilities above.
PassiveSync
)
// String returns a human readable string describing the target SyncerType.
func (t SyncerType) String() string {
switch t {
case ActiveSync:
return "ActiveSync"
case PassiveSync:
return "PassiveSync"
default:
return fmt.Sprintf("unknown sync type %d", t)
}
}
// syncerState is an enum that represents the current state of the GossipSyncer.
// As the syncer is a state machine, we'll gate our actions based off of the
// current state and the next incoming message.
type syncerState uint32
const (
// syncingChans is the default state of the GossipSyncer. We start in
// this state when a new peer first connects and we don't yet know if
// we're fully synchronized.
syncingChans syncerState = iota
// waitingQueryRangeReply is the second main phase of the GossipSyncer.
// We enter this state after we send out our first QueryChannelRange
// reply. We'll stay in this state until the remote party sends us a
// ReplyShortChanIDsEnd message that indicates they've responded to our
// query entirely. After this state, we'll transition to
// waitingQueryChanReply after we send out requests for all the new
// chan ID's to us.
waitingQueryRangeReply
// queryNewChannels is the third main phase of the GossipSyncer. In
// this phase we'll send out all of our QueryShortChanIDs messages in
// response to the new channels that we don't yet know about.
queryNewChannels
// waitingQueryChanReply is the fourth main phase of the GossipSyncer.
// We enter this phase once we've sent off a query chink to the remote
// peer. We'll stay in this phase until we receive a
// ReplyShortChanIDsEnd message which indicates that the remote party
// has responded to all of our requests.
waitingQueryChanReply
// chansSynced is the terminal stage of the GossipSyncer. Once we enter
// this phase, we'll send out our update horizon, which filters out the
// set of channel updates that we're interested in. In this state,
// we'll be able to accept any outgoing messages from the
// AuthenticatedGossiper, and decide if we should forward them to our
// target peer based on its update horizon.
chansSynced
)
// String returns a human readable string describing the target syncerState.
func (s syncerState) String() string {
switch s {
case syncingChans:
return "syncingChans"
case waitingQueryRangeReply:
return "waitingQueryRangeReply"
case queryNewChannels:
return "queryNewChannels"
case waitingQueryChanReply:
return "waitingQueryChanReply"
case chansSynced:
return "chansSynced"
default:
return "UNKNOWN STATE"
}
}
const (
// DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
// will respond to immediately before starting to delay responses.
DefaultMaxUndelayedQueryReplies = 10
// DefaultDelayedQueryReplyInterval is the length of time we will wait
// before responding to gossip queries after replying to
// maxUndelayedQueryReplies queries.
DefaultDelayedQueryReplyInterval = 5 * time.Second
// chanRangeQueryBuffer is the number of blocks back that we'll go when
// asking the remote peer for their any channels they know of beyond
// our highest known channel ID.
chanRangeQueryBuffer = 144
// syncTransitionTimeout is the default timeout in which we'll wait up
// to when attempting to perform a sync transition.
syncTransitionTimeout = 5 * time.Second
// requestBatchSize is the maximum number of channels we will query the
// remote peer for in a QueryShortChanIDs message.
requestBatchSize = 500
)
var (
// encodingTypeToChunkSize maps an encoding type, to the max number of
// short chan ID's using the encoding type that we can fit into a
// single message safely.
encodingTypeToChunkSize = map[lnwire.ShortChanIDEncoding]int32{
lnwire.EncodingSortedPlain: 8000,
}
// ErrGossipSyncerExiting signals that the syncer has been killed.
ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
// ErrSyncTransitionTimeout is an error returned when we've timed out
// attempting to perform a sync transition.
ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
"transition sync type")
// zeroTimestamp is the timestamp we'll use when we want to indicate to
// peers that we do not want to receive any new graph updates.
zeroTimestamp time.Time
)
// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
type syncTransitionReq struct {
newSyncType SyncerType
errChan chan error
}
// historicalSyncReq encapsulates a request for a gossip syncer to perform a
// historical sync.
type historicalSyncReq struct {
// doneChan is a channel that serves as a signal and is closed to ensure
// the historical sync is attempted by the time we return to the caller.
doneChan chan struct{}
}
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
// needs to carry out its duties.
type gossipSyncerCfg struct {
// chainHash is the chain that this syncer is responsible for.
chainHash chainhash.Hash
// peerPub is the public key of the peer we're syncing with, serialized
// in compressed format.
peerPub [33]byte
// syncChanUpdates is a bool that indicates if we should request a
// continual channel update stream or not.
syncChanUpdates bool
// channelSeries is the primary interface that we'll use to generate
// our queries and respond to the queries of the remote peer.
channelSeries ChannelGraphTimeSeries
// encodingType is the current encoding type we're aware of. Requests
// with different encoding types will be rejected.
encodingType lnwire.ShortChanIDEncoding
// chunkSize is the max number of short chan IDs using the syncer's
// encoding type that we can fit into a single message safely.
chunkSize int32
// batchSize is the max number of channels the syncer will query from
// the remote node in a single QueryShortChanIDs request.
batchSize int32
// sendToPeer is a function closure that should send the set of
// targeted messages to the peer we've been assigned to sync the graph
// state from.
sendToPeer func(...lnwire.Message) error
// maxUndelayedQueryReplies specifies how many gossip queries we will
// respond to immediately before starting to delay responses.
maxUndelayedQueryReplies int
// delayedQueryReplyInterval is the length of time we will wait before
// responding to gossip queries after replying to
// maxUndelayedQueryReplies queries.
delayedQueryReplyInterval time.Duration
}
// GossipSyncer is a struct that handles synchronizing the channel graph state
// with a remote peer. The GossipSyncer implements a state machine that will
// progressively ensure we're synchronized with the channel state of the remote
// node. Once both nodes have been synchronized, we'll use an update filter to
// filter out which messages should be sent to a remote peer based on their
// update horizon. If the update horizon isn't specified, then we won't send
// them any channel updates at all.
type GossipSyncer struct {
started sync.Once
stopped sync.Once
// state is the current state of the GossipSyncer.
//
// NOTE: This variable MUST be used atomically.
state uint32
// syncType denotes the SyncerType the gossip syncer is currently
// exercising.
//
// NOTE: This variable MUST be used atomically.
syncType uint32
// remoteUpdateHorizon is the update horizon of the remote peer. We'll
// use this to properly filter out any messages.
remoteUpdateHorizon *lnwire.GossipTimestampRange
// localUpdateHorizon is our local update horizon, we'll use this to
// determine if we've already sent out our update.
localUpdateHorizon *lnwire.GossipTimestampRange
// syncTransitions is a channel through which new sync type transition
// requests will be sent through. These requests should only be handled
// when the gossip syncer is in a chansSynced state to ensure its state
// machine behaves as expected.
syncTransitionReqs chan *syncTransitionReq
// historicalSyncReqs is a channel that serves as a signal for the
// gossip syncer to perform a historical sync. Theese can only be done
// once the gossip syncer is in a chansSynced state to ensure its state
// machine behaves as expected.
historicalSyncReqs chan *historicalSyncReq
// genHistoricalChanRangeQuery when true signals to the gossip syncer
// that it should request the remote peer for all of its known channel
// IDs starting from the genesis block of the chain. This can only
// happen if the gossip syncer receives a request to attempt a
// historical sync. It can be unset if the syncer ever transitions from
// PassiveSync to ActiveSync.
genHistoricalChanRangeQuery bool
// gossipMsgs is a channel that all messages from the target peer will
// be sent over.
gossipMsgs chan lnwire.Message
// bufferedChanRangeReplies is used in the waitingQueryChanReply to
// buffer all the chunked response to our query.
bufferedChanRangeReplies []lnwire.ShortChannelID
// newChansToQuery is used to pass the set of channels we should query
// for from the waitingQueryChanReply state to the queryNewChannels
// state.
newChansToQuery []lnwire.ShortChannelID
cfg gossipSyncerCfg
// rateLimiter dictates the frequency with which we will reply to gossip
// queries from a peer. This is used to delay responses to peers to
// prevent DOS vulnerabilities if they are spamming with an unreasonable
// number of queries.
rateLimiter *rate.Limiter
// syncedSignal is a channel that, if set, will be closed when the
// GossipSyncer reaches its terminal chansSynced state.
syncedSignal chan struct{}
sync.Mutex
quit chan struct{}
wg sync.WaitGroup
}
// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
// If no parameter was specified for max undelayed query replies, set it
// to the default of 5 queries.
if cfg.maxUndelayedQueryReplies <= 0 {
cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
}
// If no parameter was specified for delayed query reply interval, set
// to the default of 5 seconds.
if cfg.delayedQueryReplyInterval <= 0 {
cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
}
// Construct a rate limiter that will govern how frequently we reply to
// gossip queries from this peer. The limiter will automatically adjust
// during periods of quiescence, and increase the reply interval under
// load.
interval := rate.Every(cfg.delayedQueryReplyInterval)
rateLimiter := rate.NewLimiter(
interval, cfg.maxUndelayedQueryReplies,
)
return &GossipSyncer{
cfg: cfg,
rateLimiter: rateLimiter,
syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan *historicalSyncReq),
gossipMsgs: make(chan lnwire.Message, 100),
quit: make(chan struct{}),
}
}
// Start starts the GossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *GossipSyncer) Start() {
g.started.Do(func() {
log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
g.wg.Add(1)
go g.channelGraphSyncer()
})
}
// Stop signals the GossipSyncer for a graceful exit, then waits until it has
// exited.
func (g *GossipSyncer) Stop() {
g.stopped.Do(func() {
close(g.quit)
g.wg.Wait()
})
}
// channelGraphSyncer is the main goroutine responsible for ensuring that we
// properly channel graph state with the remote peer, and also that we only
// send them messages which actually pass their defined update horizon.
func (g *GossipSyncer) channelGraphSyncer() {
defer g.wg.Done()
for {
state := g.syncState()
syncType := g.SyncType()
log.Debugf("GossipSyncer(%x): state=%v, type=%v",
g.cfg.peerPub[:], state, syncType)
switch syncerState(state) {
// When we're in this state, we're trying to synchronize our
// view of the network with the remote peer. We'll kick off
// this sync by asking them for the set of channels they
// understand, as we'll as responding to any other queries by
// them.
case syncingChans:
// If we're in this state, then we'll send the remote
// peer our opening QueryChannelRange message.
queryRangeMsg, err := g.genChanRangeQuery(
g.genHistoricalChanRangeQuery,
)
if err != nil {
log.Errorf("unable to gen chan range "+
"query: %v", err)
return
}
err = g.cfg.sendToPeer(queryRangeMsg)
if err != nil {
log.Errorf("unable to send chan range "+
"query: %v", err)
return
}
// With the message sent successfully, we'll transition
// into the next state where we wait for their reply.
g.setSyncState(waitingQueryRangeReply)
// In this state, we've sent out our initial channel range
// query and are waiting for the final response from the remote
// peer before we perform a diff to see with channels they know
// of that we don't.
case waitingQueryRangeReply:
// We'll wait to either process a new message from the
// remote party, or exit due to the gossiper exiting,
// or us being signalled to do so.
select {
case msg := <-g.gossipMsgs:
// The remote peer is sending a response to our
// initial query, we'll collate this response,
// and see if it's the final one in the series.
// If so, we can then transition to querying
// for the new channels.
queryReply, ok := msg.(*lnwire.ReplyChannelRange)
if ok {
err := g.processChanRangeReply(queryReply)
if err != nil {
log.Errorf("unable to "+
"process chan range "+
"query: %v", err)
return
}
continue
}
// Otherwise, it's the remote peer performing a
// query, which we'll attempt to reply to.
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit:
return
}
// We'll enter this state once we've discovered which channels
// the remote party knows of that we don't yet know of
// ourselves.
case queryNewChannels:
// First, we'll attempt to continue our channel
// synchronization by continuing to send off another
// query chunk.
done, err := g.synchronizeChanIDs()
if err != nil {
log.Errorf("unable to sync chan IDs: %v", err)
}
// If this wasn't our last query, then we'll need to
// transition to our waiting state.
if !done {
g.setSyncState(waitingQueryChanReply)
continue
}
// If we're fully synchronized, then we can transition
// to our terminal state.
g.setSyncState(chansSynced)
// In this state, we've just sent off a new query for channels
// that we don't yet know of. We'll remain in this state until
// the remote party signals they've responded to our query in
// totality.
case waitingQueryChanReply:
// Once we've sent off our query, we'll wait for either
// an ending reply, or just another query from the
// remote peer.
select {
case msg := <-g.gossipMsgs:
// If this is the final reply to one of our
// queries, then we'll loop back into our query
// state to send of the remaining query chunks.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if ok {
g.setSyncState(queryNewChannels)
continue
}
// Otherwise, it's the remote peer performing a
// query, which we'll attempt to deploy to.
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit:
return
}
// This is our final terminal state where we'll only reply to
// any further queries by the remote peer.
case chansSynced:
g.Lock()
if g.syncedSignal != nil {
close(g.syncedSignal)
g.syncedSignal = nil
}
g.Unlock()
// If we haven't yet sent out our update horizon, and
// we want to receive real-time channel updates, we'll
// do so now.
if g.localUpdateHorizon == nil && syncType == ActiveSync {
err := g.sendGossipTimestampRange(
time.Now(), math.MaxUint32,
)
if err != nil {
log.Errorf("Unable to send update "+
"horizon to %x: %v",
g.cfg.peerPub, err)
}
}
// With our horizon set, we'll simply reply to any new
// messages or process any state transitions and exit if
// needed.
select {
case msg := <-g.gossipMsgs:
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case req := <-g.syncTransitionReqs:
req.errChan <- g.handleSyncTransition(req)
case req := <-g.historicalSyncReqs:
g.handleHistoricalSync(req)
case <-g.quit:
return
}
}
}
}
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
timestampRange uint32) error {
endTimestamp := firstTimestamp.Add(
time.Duration(timestampRange) * time.Second,
)
log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
g.cfg.peerPub[:], firstTimestamp, endTimestamp)
localUpdateHorizon := &lnwire.GossipTimestampRange{
ChainHash: g.cfg.chainHash,
FirstTimestamp: uint32(firstTimestamp.Unix()),
TimestampRange: timestampRange,
}
if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
return err
}
if firstTimestamp == zeroTimestamp && timestampRange == 0 {
g.localUpdateHorizon = nil
} else {
g.localUpdateHorizon = localUpdateHorizon
}
return nil
}
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
// the remote peer for its known set of channel IDs within a particular block
// range. This method will be called continually until the entire range has
// been queried for with a response received. We'll chunk our requests as
// required to ensure they fit into a single message. We may re-renter this
// state in the case that chunking is required.
func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
// If we're in this state yet there are no more new channels to query
// for, then we'll transition to our final synced state and return true
// to signal that we're fully synchronized.
if len(g.newChansToQuery) == 0 {
log.Infof("GossipSyncer(%x): no more chans to query",
g.cfg.peerPub[:])
return true, nil
}
// Otherwise, we'll issue our next chunked query to receive replies
// for.
var queryChunk []lnwire.ShortChannelID
// If the number of channels to query for is less than the chunk size,
// then we can issue a single query.
if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
queryChunk = g.newChansToQuery
g.newChansToQuery = nil
} else {
// Otherwise, we'll need to only query for the next chunk.
// We'll slice into our query chunk, then slide down our main
// pointer down by the chunk size.
queryChunk = g.newChansToQuery[:g.cfg.batchSize]
g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
}
log.Infof("GossipSyncer(%x): querying for %v new channels",
g.cfg.peerPub[:], len(queryChunk))
// With our chunk obtained, we'll send over our next query, then return
// false indicating that we're net yet fully synced.
err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
ChainHash: g.cfg.chainHash,
EncodingType: lnwire.EncodingSortedPlain,
ShortChanIDs: queryChunk,
})
return false, err
}
// processChanRangeReply is called each time the GossipSyncer receives a new
// reply to the initial range query to discover new channels that it didn't
// previously know of.
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
g.bufferedChanRangeReplies = append(
g.bufferedChanRangeReplies, msg.ShortChanIDs...,
)
log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
g.cfg.peerPub[:], len(msg.ShortChanIDs))
// If this isn't the last response, then we can exit as we've already
// buffered the latest portion of the streaming reply.
if msg.Complete == 0 {
return nil
}
log.Infof("GossipSyncer(%x): filtering through %v chans",
g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
// Otherwise, this is the final response, so we'll now check to see
// which channels they know of that we don't.
newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
g.cfg.chainHash, g.bufferedChanRangeReplies,
)
if err != nil {
return fmt.Errorf("unable to filter chan ids: %v", err)
}
// As we've received the entirety of the reply, we no longer need to
// hold on to the set of buffered replies, so we'll let that be garbage
// collected now.
g.bufferedChanRangeReplies = nil
// If there aren't any channels that we don't know of, then we can
// switch straight to our terminal state.
if len(newChans) == 0 {
log.Infof("GossipSyncer(%x): remote peer has no new chans",
g.cfg.peerPub[:])
g.setSyncState(chansSynced)
return nil
}
// Otherwise, we'll set the set of channels that we need to query for
// the next state, and also transition our state.
g.newChansToQuery = newChans
g.setSyncState(queryNewChannels)
log.Infof("GossipSyncer(%x): starting query for %v new chans",
g.cfg.peerPub[:], len(newChans))
return nil
}
// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(
historicalQuery bool) (*lnwire.QueryChannelRange, error) {
// First, we'll query our channel graph time series for its highest
// known channel ID.
newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
if err != nil {
return nil, err
}
// Once we have the chan ID of the newest, we'll obtain the block height
// of the channel, then subtract our default horizon to ensure we don't
// miss any channels. By default, we go back 1 day from the newest
// channel, unless we're attempting a historical sync, where we'll
// actually start from the genesis block instead.
var startHeight uint32
switch {
case historicalQuery:
fallthrough
case newestChan.BlockHeight <= chanRangeQueryBuffer:
startHeight = 0
default:
startHeight = uint32(newestChan.BlockHeight - chanRangeQueryBuffer)
}
log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
"and %v blocks after", g.cfg.peerPub[:], startHeight,
math.MaxUint32-startHeight)
// Finally, we'll craft the channel range query, using our starting
// height, then asking for all known channels to the foreseeable end of
// the main chain.
return &lnwire.QueryChannelRange{
ChainHash: g.cfg.chainHash,
FirstBlockHeight: startHeight,
NumBlocks: math.MaxUint32 - startHeight,
}, nil
}
// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
reservation := g.rateLimiter.Reserve()
delay := reservation.Delay()
// If we've already replied a handful of times, we will start to delay
// responses back to the remote peer. This can help prevent DOS attacks
// where the remote peer spams us endlessly.
if delay > 0 {
log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
"responding in %s", g.cfg.peerPub[:], delay)
select {
case <-time.After(delay):
case <-g.quit:
return ErrGossipSyncerExiting
}
}
switch msg := msg.(type) {
// In this state, we'll also handle any incoming channel range queries
// from the remote peer as they're trying to sync their state as well.
case *lnwire.QueryChannelRange:
return g.replyChanRangeQuery(msg)
// If the remote peer skips straight to requesting new channels that
// they don't know of, then we'll ensure that we also handle this case.
case *lnwire.QueryShortChanIDs:
return g.replyShortChanIDs(msg)
default:
return fmt.Errorf("unknown message: %T", msg)
}
}
// replyChanRangeQuery will be dispatched in response to a channel range query
// by the remote node. We'll query the channel time series for channels that
// meet the channel range, then chunk our responses to the remote node. We also
// ensure that our final fragment carries the "complete" bit to indicate the
// end of our streaming response.
func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
"num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
query.NumBlocks)
// Next, we'll consult the time series to obtain the set of known
// channel ID's that match their query.
startBlock := query.FirstBlockHeight
channelRange, err := g.cfg.channelSeries.FilterChannelRange(
query.ChainHash, startBlock, startBlock+query.NumBlocks,
)
if err != nil {
return err
}
// TODO(roasbeef): means can't send max uint above?
// * or make internal 64
numChannels := int32(len(channelRange))
numChansSent := int32(0)
for {
// We'll send our this response in a streaming manner,
// chunk-by-chunk. We do this as there's a transport message
// size limit which we'll need to adhere to.
var channelChunk []lnwire.ShortChannelID
// We know this is the final chunk, if the difference between
// the total number of channels, and the number of channels
// we've sent is less-than-or-equal to the chunk size.
isFinalChunk := (numChannels - numChansSent) <= g.cfg.chunkSize
// If this is indeed the last chunk, then we'll send the
// remainder of the channels.
if isFinalChunk {
channelChunk = channelRange[numChansSent:]
log.Infof("GossipSyncer(%x): sending final chan "+
"range chunk, size=%v", g.cfg.peerPub[:],
len(channelChunk))
} else {
// Otherwise, we'll only send off a fragment exactly
// sized to the proper chunk size.
channelChunk = channelRange[numChansSent : numChansSent+g.cfg.chunkSize]
log.Infof("GossipSyncer(%x): sending range chunk of "+
"size=%v", g.cfg.peerPub[:], len(channelChunk))
}
// With our chunk assembled, we'll now send to the remote peer
// the current chunk.
replyChunk := lnwire.ReplyChannelRange{
QueryChannelRange: *query,
Complete: 0,
EncodingType: g.cfg.encodingType,
ShortChanIDs: channelChunk,
}
if isFinalChunk {
replyChunk.Complete = 1
}
if err := g.cfg.sendToPeer(&replyChunk); err != nil {
return err
}
// If this was the final chunk, then we'll exit now as our
// response is now complete.
if isFinalChunk {
return nil
}
numChansSent += int32(len(channelChunk))
}
}
// replyShortChanIDs will be dispatched in response to a query by the remote
// node for information concerning a set of short channel ID's. Our response
// will be sent in a streaming chunked manner to ensure that we remain below
// the current transport level message size.
func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
// Before responding, we'll check to ensure that the remote peer is
// querying for the same chain that we're on. If not, we'll send back a
// response with a complete value of zero to indicate we're on a
// different chain.
if g.cfg.chainHash != query.ChainHash {
log.Warnf("Remote peer requested QueryShortChanIDs for "+
"chain=%v, we're on chain=%v", g.cfg.chainHash,
query.ChainHash)
return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 0,
})
}
if len(query.ShortChanIDs) == 0 {
log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
g.cfg.peerPub[:])
return nil
}
log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
g.cfg.peerPub[:], len(query.ShortChanIDs))
// Now that we know we're on the same chain, we'll query the channel
// time series for the set of messages that we know of which satisfies
// the requirement of being a chan ann, chan update, or a node ann
// related to the set of queried channels.
replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
query.ChainHash, query.ShortChanIDs,
)
if err != nil {
return fmt.Errorf("unable to fetch chan anns for %v..., %v",
query.ShortChanIDs[0].ToUint64(), err)
}
// If we didn't find any messages related to those channel ID's, then
// we'll send over a reply marking the end of our response, and exit
// early.
if len(replyMsgs) == 0 {
return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 1,
})
}
// Otherwise, we'll send over our set of messages responding to the
// query, with the ending message appended to it.
replyMsgs = append(replyMsgs, &lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 1,
})
return g.cfg.sendToPeer(replyMsgs...)
}
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
g.Lock()
g.remoteUpdateHorizon = filter
startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
endTime := startTime.Add(
time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
)
g.Unlock()
// Now that the remote peer has applied their filter, we'll query the
// database for all the messages that are beyond this filter.
newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
g.cfg.chainHash, startTime, endTime,
)
if err != nil {
return err
}
log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
"end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
len(newUpdatestoSend))
// If we don't have any to send, then we can return early.
if len(newUpdatestoSend) == 0 {
return nil
}
// We'll conclude by launching a goroutine to send out any updates.
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := g.cfg.sendToPeer(newUpdatestoSend...); err != nil {
log.Errorf("unable to send messages for peer catch "+
"up: %v", err)
}
}()
return nil
}
// FilterGossipMsgs takes a set of gossip messages, and only send it to a peer
// iff the message is within the bounds of their set gossip filter. If the peer
// doesn't have a gossip filter set, then no messages will be forwarded.
func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
// If the peer doesn't have an update horizon set, then we won't send
// it any new update messages.
if g.remoteUpdateHorizon == nil {
return
}
// If we've been signaled to exit, or are exiting, then we'll stop
// short.
select {
case <-g.quit:
return
default:
}
// TODO(roasbeef): need to ensure that peer still online...send msg to
// gossiper on peer termination to signal peer disconnect?
var err error
// Before we filter out the messages, we'll construct an index over the
// set of channel announcements and channel updates. This will allow us
// to quickly check if we should forward a chan ann, based on the known
// channel updates for a channel.
chanUpdateIndex := make(map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate)
for _, msg := range msgs {
chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate)
if !ok {
continue
}
chanUpdateIndex[chanUpdate.ShortChannelID] = append(
chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
)
}
// We'll construct a helper function that we'll us below to determine
// if a given messages passes the gossip msg filter.
g.Lock()
startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
endTime := startTime.Add(
time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
)
g.Unlock()
passesFilter := func(timeStamp uint32) bool {
t := time.Unix(int64(timeStamp), 0)
return t.Equal(startTime) ||
(t.After(startTime) && t.Before(endTime))
}
msgsToSend := make([]lnwire.Message, 0, len(msgs))
for _, msg := range msgs {
// If the target peer is the peer that sent us this message,
// then we'll exit early as we don't need to filter this
// message.
if _, ok := msg.senders[g.cfg.peerPub]; ok {
continue
}
switch msg := msg.msg.(type) {
// For each channel announcement message, we'll only send this
// message if the channel updates for the channel are between
// our time range.
case *lnwire.ChannelAnnouncement:
// First, we'll check if the channel updates are in
// this message batch.
chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
if !ok {
// If not, we'll attempt to query the database
// to see if we know of the updates.
chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
g.cfg.chainHash, msg.ShortChannelID,
)
if err != nil {
log.Warnf("no channel updates found for "+
"short_chan_id=%v",
msg.ShortChannelID)
continue
}
}
for _, chanUpdate := range chanUpdates {
if passesFilter(chanUpdate.Timestamp) {
msgsToSend = append(msgsToSend, msg)
break
}
}
if len(chanUpdates) == 0 {
msgsToSend = append(msgsToSend, msg)
}
// For each channel update, we'll only send if it the timestamp
// is between our time range.
case *lnwire.ChannelUpdate:
if passesFilter(msg.Timestamp) {
msgsToSend = append(msgsToSend, msg)
}
// Similarly, we only send node announcements if the update
// timestamp ifs between our set gossip filter time range.
case *lnwire.NodeAnnouncement:
if passesFilter(msg.Timestamp) {
msgsToSend = append(msgsToSend, msg)
}
}
}
log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
g.cfg.peerPub[:], len(msgs), len(msgsToSend))
if len(msgsToSend) == 0 {
return
}
g.cfg.sendToPeer(msgsToSend...)
}
// ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) {
select {
case g.gossipMsgs <- msg:
case <-peerQuit:
case <-g.quit:
}
}
// setSyncState sets the gossip syncer's state to the given state.
func (g *GossipSyncer) setSyncState(state syncerState) {
atomic.StoreUint32(&g.state, uint32(state))
}
// syncState returns the current syncerState of the target GossipSyncer.
func (g *GossipSyncer) syncState() syncerState {
return syncerState(atomic.LoadUint32(&g.state))
}
// ResetSyncedSignal returns a channel that will be closed in order to serve as
// a signal for when the GossipSyncer has reached its chansSynced state.
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
g.Lock()
defer g.Unlock()
syncedSignal := make(chan struct{})
syncState := syncerState(atomic.LoadUint32(&g.state))
if syncState == chansSynced {
close(syncedSignal)
return syncedSignal
}
g.syncedSignal = syncedSignal
return g.syncedSignal
}
// ProcessSyncTransition sends a request to the gossip syncer to transition its
// sync type to a new one.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
errChan := make(chan error, 1)
select {
case g.syncTransitionReqs <- &syncTransitionReq{
newSyncType: newSyncType,
errChan: errChan,
}:
case <-time.After(syncTransitionTimeout):
return ErrSyncTransitionTimeout
case <-g.quit:
return ErrGossipSyncerExiting
}
select {
case err := <-errChan:
return err
case <-g.quit:
return ErrGossipSyncerExiting
}
}
// handleSyncTransition handles a new sync type transition request.
//
// NOTE: The gossip syncer might have another sync state as a result of this
// transition.
func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
// Return early from any NOP sync transitions.
syncType := g.SyncType()
if syncType == req.newSyncType {
return nil
}
log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
g.cfg.peerPub, syncType, req.newSyncType)
var (
firstTimestamp time.Time
timestampRange uint32
)
switch req.newSyncType {
// If an active sync has been requested, then we should resume receiving
// new graph updates from the remote peer.
case ActiveSync:
firstTimestamp = time.Now()
timestampRange = math.MaxUint32
// If a PassiveSync transition has been requested, then we should no
// longer receive any new updates from the remote peer. We can do this
// by setting our update horizon to a range in the past ensuring no
// graph updates match the timestamp range.
case PassiveSync:
firstTimestamp = zeroTimestamp
timestampRange = 0
default:
return fmt.Errorf("unhandled sync transition %v",
req.newSyncType)
}
err := g.sendGossipTimestampRange(firstTimestamp, timestampRange)
if err != nil {
return fmt.Errorf("unable to send local update horizon: %v", err)
}
g.setSyncType(req.newSyncType)
return nil
}
// setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
atomic.StoreUint32(&g.syncType, uint32(syncType))
}
// SyncType returns the current SyncerType of the target GossipSyncer.
func (g *GossipSyncer) SyncType() SyncerType {
return SyncerType(atomic.LoadUint32(&g.syncType))
}
// historicalSync sends a request to the gossip syncer to perofmr a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
done := make(chan struct{})
select {
case g.historicalSyncReqs <- &historicalSyncReq{
doneChan: done,
}:
case <-time.After(syncTransitionTimeout):
return ErrSyncTransitionTimeout
case <-g.quit:
return ErrGossiperShuttingDown
}
select {
case <-done:
return nil
case <-g.quit:
return ErrGossiperShuttingDown
}
}
// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
// We'll go back to our initial syncingChans state in order to request
// the remote peer to give us all of the channel IDs they know of
// starting from the genesis block.
g.genHistoricalChanRangeQuery = true
g.setSyncState(syncingChans)
close(req.doneChan)
}