discovery: export gossipSyncer
commit 7e92b9a4e2
parent d954cfc4ba
@@ -75,7 +75,7 @@ type Config struct {
 	Router routing.ChannelGraphSource

 	// ChanSeries is an interface that provides access to a time series
-	// view of the current known channel graph. Each gossipSyncer enabled
+	// view of the current known channel graph. Each GossipSyncer enabled
 	// peer will utilize this in order to create and respond to channel
 	// graph time series queries.
 	ChanSeries ChannelGraphTimeSeries
@@ -218,7 +218,7 @@ type AuthenticatedGossiper struct {
 	// directly to their gossiper, rather than broadcasting them. With this
 	// change, we ensure we filter out all updates properly.
 	syncerMtx   sync.RWMutex
-	peerSyncers map[routing.Vertex]*gossipSyncer
+	peerSyncers map[routing.Vertex]*GossipSyncer

 	// reliableSender is a subsystem responsible for handling reliable
 	// message send requests to peers. This should only be used for channels
@@ -243,7 +243,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
 		prematureChannelUpdates: make(map[uint64][]*networkMsg),
 		channelMtx:              multimutex.NewMutex(),
 		recentRejects:           make(map[uint64]struct{}),
-		peerSyncers:             make(map[routing.Vertex]*gossipSyncer),
+		peerSyncers:             make(map[routing.Vertex]*GossipSyncer),
 	}

 	gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
@@ -463,7 +463,7 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
 	errChan := make(chan error, 1)

 	// For messages in the known set of channel series queries, we'll
-	// dispatch the message directly to the gossipSyncer, and skip the main
+	// dispatch the message directly to the GossipSyncer, and skip the main
 	// processing loop.
 	switch m := msg.(type) {
 	case *lnwire.QueryShortChanIDs,
@@ -488,7 +488,7 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
 		return errChan

 	// If a peer is updating its current update horizon, then we'll dispatch
-	// that directly to the proper gossipSyncer.
+	// that directly to the proper GossipSyncer.
 	case *lnwire.GossipTimestampRange:
 		syncer, err := d.findGossipSyncer(peer.IdentityKey())
 		if err != nil {
@@ -590,10 +590,10 @@ type msgWithSenders struct {
 }

 // mergeSyncerMap is used to merge the set of senders of a particular message
-// with peers that we have an active gossipSyncer with. We do this to ensure
+// with peers that we have an active GossipSyncer with. We do this to ensure
 // that we don't broadcast messages to any peers that we have active gossip
 // syncers for.
-func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]*gossipSyncer) {
+func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]*GossipSyncer) {
 	for peerPub := range syncers {
 		m.senders[peerPub] = struct{}{}
 	}
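The merge above is a plain set union keyed on peer public key: every peer with an active syncer is recorded as a sender, so the broadcast path skips it. A minimal standalone sketch of the same pattern (the generic helper is hypothetical, not part of this change):

```go
// mergeKeys records every key of src in the dst set. mergeSyncerMap is
// this operation with dst = m.senders and src = the gossiper's map of
// active syncers: peers we sync with directly are treated as having
// already seen the message, so the broadcast skips them.
func mergeKeys[K comparable, V any](dst map[K]struct{}, src map[K]V) {
	for k := range src {
		dst[k] = struct{}{}
	}
}
```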
@@ -817,7 +817,7 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
 // incoming message. If a gossip syncer isn't found, then one will be created
 // for the target peer.
 func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (
-	*gossipSyncer, error) {
+	*GossipSyncer, error) {

 	target := routing.NewVertex(pub)

@@ -1029,7 +1029,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
 			// syncers, we'll collect their pubkeys so we can avoid
 			// sending them the full message blast below.
 			d.syncerMtx.RLock()
-			syncerPeers := make(map[routing.Vertex]*gossipSyncer)
+			syncerPeers := make(map[routing.Vertex]*GossipSyncer)
 			for peerPub, syncer := range d.peerSyncers {
 				syncerPeers[peerPub] = syncer
 			}
@@ -1104,7 +1104,7 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
 		return
 	}

-	log.Infof("Creating new gossipSyncer for peer=%x", nodeID[:])
+	log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])

 	encoding := lnwire.EncodingSortedPlain
 	syncer := newGossipSyncer(gossipSyncerCfg{
@@ -1125,12 +1125,12 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,

 // PruneSyncState is called by outside sub-systems once a peer that we were
 // previously connected to has been disconnected. In this case we can stop the
-// existing gossipSyncer assigned to the peer and free up resources.
+// existing GossipSyncer assigned to the peer and free up resources.
 func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) {
 	d.syncerMtx.Lock()
 	defer d.syncerMtx.Unlock()

-	log.Infof("Removing gossipSyncer for peer=%x",
+	log.Infof("Removing GossipSyncer for peer=%x",
 		peer.SerializeCompressed())

 	vertex := routing.NewVertex(peer)
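All of the gossiper-side hunks follow one locking pattern: peerSyncers is only touched under syncerMtx, with the shared lock on lookup paths and the exclusive lock on create and remove paths. A condensed sketch of the two halves (method names hypothetical, types as in the diff):

```go
// lookupSyncer is the read side used by findGossipSyncer: a shared
// lock suffices since the map is only inspected.
func (d *AuthenticatedGossiper) lookupSyncer(target routing.Vertex) (*GossipSyncer, bool) {
	d.syncerMtx.RLock()
	defer d.syncerMtx.RUnlock()

	syncer, ok := d.peerSyncers[target]
	return syncer, ok
}

// removeSyncer is the write side used by PruneSyncState: the exclusive
// lock guards both the lookup and the delete before the syncer is
// stopped.
func (d *AuthenticatedGossiper) removeSyncer(target routing.Vertex) {
	d.syncerMtx.Lock()
	defer d.syncerMtx.Unlock()

	if syncer, ok := d.peerSyncers[target]; ok {
		delete(d.peerSyncers, target)
		syncer.Stop()
	}
}
```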
@@ -13,18 +13,18 @@ import (
 	"golang.org/x/time/rate"
 )

-// syncerState is an enum that represents the current state of the
-// gossipSyncer. As the syncer is a state machine, we'll gate our actions
-// based off of the current state and the next incoming message.
+// syncerState is an enum that represents the current state of the GossipSyncer.
+// As the syncer is a state machine, we'll gate our actions based off of the
+// current state and the next incoming message.
 type syncerState uint32

 const (
-	// syncingChans is the default state of the gossipSyncer. We start in
+	// syncingChans is the default state of the GossipSyncer. We start in
 	// this state when a new peer first connects and we don't yet know if
 	// we're fully synchronized.
 	syncingChans syncerState = iota

-	// waitingQueryRangeReply is the second main phase of the gossipSyncer.
+	// waitingQueryRangeReply is the second main phase of the GossipSyncer.
 	// We enter this state after we send out our first QueryChannelRange
 	// reply. We'll stay in this state until the remote party sends us a
 	// ReplyShortChanIDsEnd message that indicates they've responded to our
@@ -33,19 +33,19 @@ const (
 	// chan ID's to us.
 	waitingQueryRangeReply

-	// queryNewChannels is the third main phase of the gossipSyncer. In
+	// queryNewChannels is the third main phase of the GossipSyncer. In
 	// this phase we'll send out all of our QueryShortChanIDs messages in
 	// response to the new channels that we don't yet know about.
 	queryNewChannels

-	// waitingQueryChanReply is the fourth main phase of the gossipSyncer.
+	// waitingQueryChanReply is the fourth main phase of the GossipSyncer.
 	// We enter this phase once we've sent off a query chunk to the remote
 	// peer. We'll stay in this phase until we receive a
 	// ReplyShortChanIDsEnd message which indicates that the remote party
 	// has responded to all of our requests.
 	waitingQueryChanReply

-	// chansSynced is the terminal stage of the gossipSyncer. Once we enter
+	// chansSynced is the terminal stage of the GossipSyncer. Once we enter
 	// this phase, we'll send out our update horizon, which filters out the
 	// set of channel updates that we're interested in. In this state,
 	// we'll be able to accept any outgoing messages from the
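Because state is a bare uint32, transitions between these phases stay lock-free: reads go through atomic.LoadUint32 and writes through atomic.StoreUint32, exactly as the hunks further down show. The pattern isolated into hypothetical helpers:

```go
// currentState reads the state machine's phase without taking a lock.
func (g *GossipSyncer) currentState() syncerState {
	return syncerState(atomic.LoadUint32(&g.state))
}

// transitionTo atomically advances the state machine to its next phase.
func (g *GossipSyncer) transitionTo(next syncerState) {
	atomic.StoreUint32(&g.state, uint32(next))
}
```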
@@ -107,7 +107,7 @@ const (
 	chanRangeQueryBuffer = 144
 )

-// gossipSyncerCfg is a struct that packages all the information a gossipSyncer
+// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
 // needs to carry out its duties.
 type gossipSyncerCfg struct {
 	// chainHash is the chain that this syncer is responsible for.
@@ -148,8 +148,8 @@ type gossipSyncerCfg struct {
 	delayedQueryReplyInterval time.Duration
 }

-// gossipSyncer is a struct that handles synchronizing the channel graph state
-// with a remote peer. The gossipSyncer implements a state machine that will
+// GossipSyncer is a struct that handles synchronizing the channel graph state
+// with a remote peer. The GossipSyncer implements a state machine that will
 // progressively ensure we're synchronized with the channel state of the remote
 // node. Once both nodes have been synchronized, we'll use an update filter to
 // filter out which messages should be sent to a remote peer based on their
@@ -157,9 +157,9 @@ type gossipSyncerCfg struct {
 // them any channel updates at all.
 //
 // TODO(roasbeef): modify to only sync from one peer at a time?
-type gossipSyncer struct {
-	started uint32
-	stopped uint32
+type GossipSyncer struct {
+	started sync.Once
+	stopped sync.Once

 	// remoteUpdateHorizon is the update horizon of the remote peer. We'll
 	// use this to properly filter out any messages.
@@ -169,7 +169,7 @@ type gossipSyncer struct {
 	// determine if we've already sent out our update.
 	localUpdateHorizon *lnwire.GossipTimestampRange

-	// state is the current state of the gossipSyncer.
+	// state is the current state of the GossipSyncer.
 	//
 	// NOTE: This variable MUST be used atomically.
 	state uint32
@@ -201,9 +201,9 @@ type gossipSyncer struct {
 	wg sync.WaitGroup
 }

-// newGossipSyncer returns a new instance of the gossipSyncer populated using
+// newGossipSyncer returns a new instance of the GossipSyncer populated using
 // the passed config.
-func newGossipSyncer(cfg gossipSyncerCfg) *gossipSyncer {
+func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
 	// If no parameter was specified for max undelayed query replies, set it
 	// to the default of 5 queries.
 	if cfg.maxUndelayedQueryReplies <= 0 {
@@ -225,7 +225,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *gossipSyncer {
 		interval, cfg.maxUndelayedQueryReplies,
 	)

-	return &gossipSyncer{
+	return &GossipSyncer{
 		cfg:         cfg,
 		rateLimiter: rateLimiter,
 		gossipMsgs:  make(chan lnwire.Message, 100),
@@ -233,39 +233,30 @@ func newGossipSyncer(cfg gossipSyncerCfg) *gossipSyncer {
 	}
 }

-// Start starts the gossipSyncer and any goroutines that it needs to carry out
+// Start starts the GossipSyncer and any goroutines that it needs to carry out
 // its duties.
-func (g *gossipSyncer) Start() error {
-	if !atomic.CompareAndSwapUint32(&g.started, 0, 1) {
-		return nil
-	}
-
-	log.Debugf("Starting gossipSyncer(%x)", g.cfg.peerPub[:])
+func (g *GossipSyncer) Start() {
+	g.started.Do(func() {
+		log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])

 		g.wg.Add(1)
 		go g.channelGraphSyncer()
-
-	return nil
+	})
 }

-// Stop signals the gossipSyncer for a graceful exit, then waits until it has
+// Stop signals the GossipSyncer for a graceful exit, then waits until it has
 // exited.
-func (g *gossipSyncer) Stop() error {
-	if !atomic.CompareAndSwapUint32(&g.stopped, 0, 1) {
-		return nil
-	}
-
+func (g *GossipSyncer) Stop() {
+	g.stopped.Do(func() {
 		close(g.quit)

 		g.wg.Wait()
-
-	return nil
+	})
 }

 // channelGraphSyncer is the main goroutine responsible for ensuring that we
-// properly channel graph state with the remote peer, and also that we only
+// properly sync channel graph state with the remote peer, and also that we only
 // send them messages which actually pass their defined update horizon.
-func (g *gossipSyncer) channelGraphSyncer() {
+func (g *GossipSyncer) channelGraphSyncer() {
 	defer g.wg.Done()

 	// TODO(roasbeef): also add ability to force transition back to syncing
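The Start/Stop rewrite swaps the atomic.CompareAndSwapUint32 guards for sync.Once, keeping the run-exactly-once guarantee while letting both methods drop their always-nil error return. A self-contained sketch of the resulting shape:

```go
package main

import (
	"fmt"
	"sync"
)

type worker struct {
	started sync.Once
	stopped sync.Once
	quit    chan struct{}
	wg      sync.WaitGroup
}

// Start launches the main goroutine exactly once; repeated calls are
// no-ops rather than errors.
func (w *worker) Start() {
	w.started.Do(func() {
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			<-w.quit // stand-in for channelGraphSyncer's loop
		}()
	})
}

// Stop closes quit exactly once and waits for the goroutine to exit,
// so concurrent or repeated Stop calls are safe.
func (w *worker) Stop() {
	w.stopped.Do(func() {
		close(w.quit)
		w.wg.Wait()
	})
}

func main() {
	w := &worker{quit: make(chan struct{})}
	w.Start()
	w.Start() // no-op
	w.Stop()
	w.Stop() // no-op
	fmt.Println("stopped cleanly")
}
```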
@@ -274,7 +265,7 @@ func (g *GossipSyncer) channelGraphSyncer() {

 	for {
 		state := atomic.LoadUint32(&g.state)
-		log.Debugf("gossipSyncer(%x): state=%v", g.cfg.peerPub[:],
+		log.Debugf("GossipSyncer(%x): state=%v", g.cfg.peerPub[:],
 			syncerState(state))

 		switch syncerState(state) {
@@ -412,7 +403,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
 				// horizon to ensure we don't miss any newer
 				// items.
 				updateHorizon := time.Now().Add(-time.Hour * 1)
-				log.Infof("gossipSyncer(%x): applying "+
+				log.Infof("GossipSyncer(%x): applying "+
 					"gossipFilter(start=%v)",
 					g.cfg.peerPub[:], updateHorizon)

@@ -451,12 +442,12 @@ func (g *GossipSyncer) channelGraphSyncer() {
 // been queried for with a response received. We'll chunk our requests as
 // required to ensure they fit into a single message. We may re-enter this
 // state in the case that chunking is required.
-func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
+func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
 	// If we're in this state yet there are no more new channels to query
 	// for, then we'll transition to our final synced state and return true
 	// to signal that we're fully synchronized.
 	if len(g.newChansToQuery) == 0 {
-		log.Infof("gossipSyncer(%x): no more chans to query",
+		log.Infof("GossipSyncer(%x): no more chans to query",
 			g.cfg.peerPub[:])
 		return true, nil
 	}
@@ -479,7 +470,7 @@ func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
 		g.newChansToQuery = g.newChansToQuery[g.cfg.chunkSize:]
 	}

-	log.Infof("gossipSyncer(%x): querying for %v new channels",
+	log.Infof("GossipSyncer(%x): querying for %v new channels",
 		g.cfg.peerPub[:], len(queryChunk))

 	// With our chunk obtained, we'll send over our next query, then return
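synchronizeChanIDs consumes newChansToQuery one window at a time; each re-entry into the queryNewChannels state pops the next chunk until the backlog drains. The slicing in isolation (hypothetical helper, assuming the lnwire import):

```go
// nextChunk splits the pending queue into the chunk to query now and
// the remainder to keep for the next pass through this state.
func nextChunk(pending []lnwire.ShortChannelID,
	chunkSize int32) (chunk, rest []lnwire.ShortChannelID) {

	if int32(len(pending)) <= chunkSize {
		return pending, nil
	}
	return pending[:chunkSize], pending[chunkSize:]
}
```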
@@ -493,15 +484,15 @@ func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
 	return false, err
 }

-// processChanRangeReply is called each time the gossipSyncer receives a new
+// processChanRangeReply is called each time the GossipSyncer receives a new
 // reply to the initial range query to discover new channels that it didn't
 // previously know of.
-func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
+func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
 	g.bufferedChanRangeReplies = append(
 		g.bufferedChanRangeReplies, msg.ShortChanIDs...,
 	)

-	log.Infof("gossipSyncer(%x): buffering chan range reply of size=%v",
+	log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
 		g.cfg.peerPub[:], len(msg.ShortChanIDs))

 	// If this isn't the last response, then we can exit as we've already
@@ -510,7 +501,7 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
 		return nil
 	}

-	log.Infof("gossipSyncer(%x): filtering through %v chans",
+	log.Infof("GossipSyncer(%x): filtering through %v chans",
 		g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))

 	// Otherwise, this is the final response, so we'll now check to see
@@ -530,7 +521,7 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
 	// If there aren't any channels that we don't know of, then we can
 	// switch straight to our terminal state.
 	if len(newChans) == 0 {
-		log.Infof("gossipSyncer(%x): remote peer has no new chans",
+		log.Infof("GossipSyncer(%x): remote peer has no new chans",
 			g.cfg.peerPub[:])

 		atomic.StoreUint32(&g.state, uint32(chansSynced))
@@ -542,7 +533,7 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
 	g.newChansToQuery = newChans
 	atomic.StoreUint32(&g.state, uint32(queryNewChannels))

-	log.Infof("gossipSyncer(%x): starting query for %v new chans",
+	log.Infof("GossipSyncer(%x): starting query for %v new chans",
 		g.cfg.peerPub[:], len(newChans))

 	return nil
@@ -551,7 +542,7 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
 // genChanRangeQuery generates the initial message we'll send to the remote
 // party when we're kicking off the channel graph synchronization upon
 // connection.
-func (g *gossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
+func (g *GossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
 	// First, we'll query our channel graph time series for its highest
 	// known channel ID.
 	newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
@@ -574,7 +565,7 @@ func (g *GossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
 		startHeight = uint32(newestChan.BlockHeight - chanRangeQueryBuffer)
 	}

-	log.Infof("gossipSyncer(%x): requesting new chans from height=%v "+
+	log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
 		"and %v blocks after", g.cfg.peerPub[:], startHeight,
 		math.MaxUint32-startHeight)

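The startHeight assignment above sits inside a guard that backs off chanRangeQueryBuffer (144) blocks from the best known channel without letting the uint32 underflow; isolated, the logic reads:

```go
// queryStartHeight rewinds the range query by the reorg buffer while
// clamping at height zero, mirroring the guard around startHeight.
func queryStartHeight(newestHeight uint32) uint32 {
	if newestHeight < chanRangeQueryBuffer {
		return 0
	}
	return newestHeight - chanRangeQueryBuffer
}
```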
@@ -590,7 +581,7 @@ func (g *GossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {

 // replyPeerQueries is called in response to any query by the remote peer.
 // We'll examine our state and send back our best response.
-func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error {
+func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
 	reservation := g.rateLimiter.Reserve()
 	delay := reservation.Delay()

@@ -598,7 +589,7 @@ func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
 	// responses back to the remote peer. This can help prevent DOS attacks
 	// where the remote peer spams us endlessly.
 	if delay > 0 {
-		log.Infof("gossipSyncer(%x): rate limiting gossip replies, "+
+		log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
 			"responding in %s", g.cfg.peerPub[:], delay)

 		select {
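Using Reserve plus Delay from golang.org/x/time/rate, instead of a blocking Wait, lets the throttle be interrupted by shutdown. The idiom in a minimal form (helper name hypothetical):

```go
// throttle reserves a token and, when over budget, sleeps for the
// limiter's suggested delay unless quit closes first.
func throttle(l *rate.Limiter, quit <-chan struct{}) error {
	delay := l.Reserve().Delay()
	if delay == 0 {
		return nil
	}

	select {
	case <-time.After(delay):
		return nil
	case <-quit:
		return errors.New("shutting down")
	}
}
```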
@@ -630,8 +621,8 @@ func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
 // meet the channel range, then chunk our responses to the remote node. We also
 // ensure that our final fragment carries the "complete" bit to indicate the
 // end of our streaming response.
-func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
-	log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+
+func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
+	log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
 		"num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
 		query.NumBlocks)

@@ -666,7 +657,7 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
 		if isFinalChunk {
 			channelChunk = channelRange[numChansSent:]

-			log.Infof("gossipSyncer(%x): sending final chan "+
+			log.Infof("GossipSyncer(%x): sending final chan "+
 				"range chunk, size=%v", g.cfg.peerPub[:],
 				len(channelChunk))
 		} else {
@@ -674,7 +665,7 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
 			// sized to the proper chunk size.
 			channelChunk = channelRange[numChansSent : numChansSent+g.cfg.chunkSize]

-			log.Infof("gossipSyncer(%x): sending range chunk of "+
+			log.Infof("GossipSyncer(%x): sending range chunk of "+
 				"size=%v", g.cfg.peerPub[:], len(channelChunk))
 		}

@@ -707,7 +698,7 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
 // node for information concerning a set of short channel ID's. Our response
 // will be sent in a streaming chunked manner to ensure that we remain below
 // the current transport level message size.
-func (g *gossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
+func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
 	// Before responding, we'll check to ensure that the remote peer is
 	// querying for the same chain that we're on. If not, we'll send back a
 	// response with a complete value of zero to indicate we're on a
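That chain check amounts to a short guard at the top of the function: reply with Complete set to zero and return early instead of erroring. A sketch of how it plausibly reads, assuming the cfg.sendToPeer callback that the tests below intercept:

```go
if query.ChainHash != g.cfg.chainHash {
	// The peer is asking about a different chain entirely; a zero
	// Complete bit tells it we have nothing for that chain.
	return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{
		ChainHash: query.ChainHash,
		Complete:  0,
	})
}
```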
@@ -724,12 +715,12 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
 	}

 	if len(query.ShortChanIDs) == 0 {
-		log.Infof("gossipSyncer(%x): ignoring query for blank short chan ID's",
+		log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
 			g.cfg.peerPub[:])
 		return nil
 	}

-	log.Infof("gossipSyncer(%x): fetching chan anns for %v chans",
+	log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
 		g.cfg.peerPub[:], len(query.ShortChanIDs))

 	// Now that we know we're on the same chain, we'll query the channel
@@ -766,7 +757,7 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
 // ApplyGossipFilter applies a gossiper filter sent by the remote node to the
 // state machine. Once applied, we'll ensure that we don't forward any messages
 // to the peer that aren't within the time range of the filter.
-func (g *gossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
+func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
 	g.Lock()

 	g.remoteUpdateHorizon = filter
@@ -787,7 +778,7 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
 		return err
 	}

-	log.Infof("gossipSyncer(%x): applying new update horizon: start=%v, "+
+	log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
 		"end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
 		len(newUpdatestoSend))

@@ -813,17 +804,19 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
 // FilterGossipMsgs takes a set of gossip messages, and only sends them to a peer
 // iff the message is within the bounds of their set gossip filter. If the peer
 // doesn't have a gossip filter set, then no messages will be forwarded.
-func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
+func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
 	// If the peer doesn't have an update horizon set, then we won't send
 	// it any new update messages.
 	if g.remoteUpdateHorizon == nil {
 		return
 	}

-	// If we've been signalled to exit, or are exiting, then we'll stop
+	// If we've been signaled to exit, or are exiting, then we'll stop
 	// short.
-	if atomic.LoadUint32(&g.stopped) == 1 {
+	select {
+	case <-g.quit:
 		return
+	default:
 	}

 	// TODO(roasbeef): need to ensure that peer still online...send msg to
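With stopped now a sync.Once instead of a flag, the old atomic read gives way to the canonical non-blocking probe of the quit channel, which is what the new select does. The idiom in isolation:

```go
// exiting reports whether shutdown has begun without blocking: once
// quit is closed its case always wins; otherwise default fires.
func exiting(quit <-chan struct{}) bool {
	select {
	case <-quit:
		return true
	default:
		return false
	}
}
```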
@@ -920,7 +913,7 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
 		}
 	}

-	log.Tracef("gossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
+	log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
 		g.cfg.peerPub[:], len(msgs), len(msgsToSend))

 	if len(msgsToSend) == 0 {
@@ -932,7 +925,7 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {

 // ProcessQueryMsg is used by outside callers to pass new channel time series
 // queries to the internal processing goroutine.
-func (g *gossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) {
+func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) {
 	select {
 	case g.gossipMsgs <- msg:
 	case <-peerQuit:
@@ -940,7 +933,7 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struc
 	}
 }

-// SyncerState returns the current syncerState of the target gossipSyncer.
-func (g *gossipSyncer) SyncState() syncerState {
+// syncState returns the current syncerState of the target GossipSyncer.
+func (g *GossipSyncer) syncState() syncerState {
 	return syncerState(atomic.LoadUint32(&g.state))
 }
@@ -116,7 +116,7 @@ var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil)

 func newTestSyncer(hID lnwire.ShortChannelID,
 	encodingType lnwire.ShortChanIDEncoding, chunkSize int32,
-) (chan []lnwire.Message, *gossipSyncer, *mockChannelGraphTimeSeries) {
+) (chan []lnwire.Message, *GossipSyncer, *mockChannelGraphTimeSeries) {

 	msgChan := make(chan []lnwire.Message, 20)
 	cfg := gossipSyncerCfg{
@@ -140,7 +140,7 @@ func newTestSyncer(hID lnwire.ShortChannelID,
 func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
 	t.Parallel()

-	// First, we'll create a gossipSyncer instance with a canned sendToPeer
+	// First, we'll create a GossipSyncer instance with a canned sendToPeer
 	// message to allow us to intercept their potential sends.
 	msgChan, syncer, _ := newTestSyncer(
 		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
@@ -185,7 +185,7 @@ func unixStamp(a int64) uint32 {
 func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
 	t.Parallel()

-	// First, we'll create a gossipSyncer instance with a canned sendToPeer
+	// First, we'll create a GossipSyncer instance with a canned sendToPeer
 	// message to allow us to intercept their potential sends.
 	msgChan, syncer, chanSeries := newTestSyncer(
 		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
@@ -315,7 +315,7 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
 func TestGossipSyncerApplyGossipFilter(t *testing.T) {
 	t.Parallel()

-	// First, we'll create a gossipSyncer instance with a canned sendToPeer
+	// First, we'll create a GossipSyncer instance with a canned sendToPeer
 	// message to allow us to intercept their potential sends.
 	msgChan, syncer, chanSeries := newTestSyncer(
 		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
@@ -413,7 +413,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
 func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
 	t.Parallel()

-	// First, we'll create a gossipSyncer instance with a canned sendToPeer
+	// First, we'll create a GossipSyncer instance with a canned sendToPeer
 	// message to allow us to intercept their potential sends.
 	msgChan, syncer, _ := newTestSyncer(
 		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
@@ -464,7 +464,7 @@ func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
 func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
 	t.Parallel()

-	// First, we'll create a gossipSyncer instance with a canned sendToPeer
+	// First, we'll create a GossipSyncer instance with a canned sendToPeer
 	// message to allow us to intercept their potential sends.
 	msgChan, syncer, chanSeries := newTestSyncer(
 		lnwire.NewShortChanIDFromInt(10), defaultEncoding,
@@ -718,7 +718,7 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
 func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
 	t.Parallel()

-	// First, we'll create a gossipSyncer instance with a canned sendToPeer
+	// First, we'll create a GossipSyncer instance with a canned sendToPeer
 	// message to allow us to intercept their potential sends.
 	const startingHeight = 200
 	_, syncer, _ := newTestSyncer(
@@ -753,7 +753,7 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
 func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
 	t.Parallel()

-	// First, we'll create a gossipSyncer instance with a canned sendToPeer
+	// First, we'll create a GossipSyncer instance with a canned sendToPeer
 	// message to allow us to intercept their potential sends.
 	_, syncer, chanSeries := newTestSyncer(
 		lnwire.NewShortChanIDFromInt(10), defaultEncoding, defaultChunkSize,
@@ -827,7 +827,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
 		t.Fatalf("unable to process reply: %v", err)
 	}

-	if syncer.SyncState() != queryNewChannels {
+	if syncer.syncState() != queryNewChannels {
 		t.Fatalf("wrong state: expected %v instead got %v",
 			queryNewChannels, syncer.state)
 	}
@@ -860,7 +860,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
 		t.Fatalf("unable to process reply: %v", err)
 	}

-	if syncer.SyncState() != chansSynced {
+	if syncer.syncState() != chansSynced {
 		t.Fatalf("wrong state: expected %v instead got %v",
 			chansSynced, syncer.state)
 	}
@@ -878,7 +878,7 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
 	// queries: two full chunks, and one lingering chunk.
 	const chunkSize = 2

-	// First, we'll create a gossipSyncer instance with a canned sendToPeer
+	// First, we'll create a GossipSyncer instance with a canned sendToPeer
 	// message to allow us to intercept their potential sends.
 	msgChan, syncer, _ := newTestSyncer(
 		lnwire.NewShortChanIDFromInt(10), defaultEncoding, chunkSize,
@@ -997,7 +997,7 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
 	const numDelayedQueries = 2
 	const delayTolerance = time.Millisecond * 200

-	// First, we'll create two gossipSyncer instances with a canned
+	// First, we'll create two GossipSyncer instances with a canned
 	// sendToPeer message to allow us to intercept their potential sends.
 	startHeight := lnwire.ShortChannelID{
 		BlockHeight: 1144,
@@ -1390,7 +1390,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
 	// queries: two full chunks, and one lingering chunk.
 	const chunkSize = 2

-	// First, we'll create two gossipSyncer instances with a canned
+	// First, we'll create two GossipSyncer instances with a canned
 	// sendToPeer message to allow us to intercept their potential sends.
 	startHeight := lnwire.ShortChannelID{
 		BlockHeight: 1144,
@@ -1730,7 +1730,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
 	// queries: two full chunks, and one lingering chunk.
 	const chunkSize = 2

-	// First, we'll create two gossipSyncer instances with a canned
+	// First, we'll create two GossipSyncer instances with a canned
 	// sendToPeer message to allow us to intercept their potential sends.
 	startHeight := lnwire.ShortChannelID{
 		BlockHeight: 1144,