discovery: include peerPub in gossipSyncerCfg

Wilmer Paulino 2019-03-22 19:54:31 -07:00
parent c72db902f0
commit d954cfc4ba
3 changed files with 31 additions and 31 deletions

discovery/gossiper.go

@@ -1107,8 +1107,9 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
     log.Infof("Creating new gossipSyncer for peer=%x", nodeID[:])
 
     encoding := lnwire.EncodingSortedPlain
-    syncer := newGossiperSyncer(gossipSyncerCfg{
+    syncer := newGossipSyncer(gossipSyncerCfg{
         chainHash:       d.cfg.ChainHash,
+        peerPub:         nodeID,
         syncChanUpdates: recvUpdates,
         channelSeries:   d.cfg.ChanSeries,
         encodingType:    encoding,
@@ -1117,7 +1118,6 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
             return syncPeer.SendMessageLazy(false, msgs...)
         },
     })
-    copy(syncer.peerPub[:], nodeID[:])
     d.peerSyncers[nodeID] = syncer
 
     syncer.Start()

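For illustration only, a minimal standalone Go sketch of the pattern this hunk adopts: the peer's compressed public key is handed to the syncer through its config struct at construction time instead of being copied onto the syncer afterwards. The types and names below merely mirror the shape of lnd's unexported gossipSyncerCfg/gossipSyncer; they are assumptions for the sketch, not the real implementation.

package main

import "fmt"

// gossipSyncerCfg mirrors (in shape only) the config struct this commit
// extends: the peer's compressed public key now lives here.
type gossipSyncerCfg struct {
	// peerPub is the public key of the peer we're syncing with,
	// serialized in compressed format.
	peerPub [33]byte
}

// gossipSyncer keeps its static parameters behind cfg, so there is no
// separate peerPub field to populate after construction.
type gossipSyncer struct {
	cfg gossipSyncerCfg
}

// newGossipSyncer only needs the fully populated config.
func newGossipSyncer(cfg gossipSyncerCfg) *gossipSyncer {
	return &gossipSyncer{cfg: cfg}
}

// Start reads the key via g.cfg.peerPub, as the updated log statements do.
func (g *gossipSyncer) Start() {
	fmt.Printf("Starting gossipSyncer(%x)\n", g.cfg.peerPub[:])
}

func main() {
	// nodeID stands in for the serialized pubkey the gossiper already has.
	var nodeID [33]byte
	nodeID[0] = 0x02

	syncer := newGossipSyncer(gossipSyncerCfg{
		peerPub: nodeID, // previously: copy(syncer.peerPub[:], nodeID[:])
	})
	syncer.Start()
}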
discovery/syncer.go

@@ -113,6 +113,10 @@ type gossipSyncerCfg struct {
     // chainHash is the chain that this syncer is responsible for.
     chainHash chainhash.Hash
 
+    // peerPub is the public key of the peer we're syncing with, serialized
+    // in compressed format.
+    peerPub [33]byte
+
     // syncChanUpdates is a bool that indicates if we should request a
     // continual channel update stream or not.
     syncChanUpdates bool
@@ -183,10 +187,6 @@ type gossipSyncer struct {
     // state.
     newChansToQuery []lnwire.ShortChannelID
 
-    // peerPub is the public key of the peer we're syncing with, serialized
-    // in compressed format.
-    peerPub [33]byte
-
     cfg gossipSyncerCfg
 
     // rateLimiter dictates the frequency with which we will reply to gossip
@@ -201,9 +201,9 @@ type gossipSyncer struct {
     wg sync.WaitGroup
 }
 
-// newGossiperSyncer returns a new instance of the gossipSyncer populated using
+// newGossipSyncer returns a new instance of the gossipSyncer populated using
 // the passed config.
-func newGossiperSyncer(cfg gossipSyncerCfg) *gossipSyncer {
+func newGossipSyncer(cfg gossipSyncerCfg) *gossipSyncer {
     // If no parameter was specified for max undelayed query replies, set it
     // to the default of 5 queries.
     if cfg.maxUndelayedQueryReplies <= 0 {
@@ -240,7 +240,7 @@ func (g *gossipSyncer) Start() error {
         return nil
     }
 
-    log.Debugf("Starting gossipSyncer(%x)", g.peerPub[:])
+    log.Debugf("Starting gossipSyncer(%x)", g.cfg.peerPub[:])
 
     g.wg.Add(1)
     go g.channelGraphSyncer()
@@ -274,7 +274,7 @@ func (g *gossipSyncer) channelGraphSyncer() {
     for {
         state := atomic.LoadUint32(&g.state)
-        log.Debugf("gossipSyncer(%x): state=%v", g.peerPub[:],
+        log.Debugf("gossipSyncer(%x): state=%v", g.cfg.peerPub[:],
            syncerState(state))
 
        switch syncerState(state) {
@@ -413,8 +413,8 @@ func (g *gossipSyncer) channelGraphSyncer() {
             // items.
             updateHorizon := time.Now().Add(-time.Hour * 1)
             log.Infof("gossipSyncer(%x): applying "+
-                "gossipFilter(start=%v)", g.peerPub[:],
-                updateHorizon)
+                "gossipFilter(start=%v)",
+                g.cfg.peerPub[:], updateHorizon)
 
             g.localUpdateHorizon = &lnwire.GossipTimestampRange{
                 ChainHash: g.cfg.chainHash,
@@ -457,7 +457,7 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
     // to signal that we're fully synchronized.
     if len(g.newChansToQuery) == 0 {
         log.Infof("gossipSyncer(%x): no more chans to query",
-            g.peerPub[:])
+            g.cfg.peerPub[:])
         return true, nil
     }
@@ -480,7 +480,7 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
     }
 
     log.Infof("gossipSyncer(%x): querying for %v new channels",
-        g.peerPub[:], len(queryChunk))
+        g.cfg.peerPub[:], len(queryChunk))
 
     // With our chunk obtained, we'll send over our next query, then return
     // false indicating that we're net yet fully synced.
@@ -502,7 +502,7 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
     )
 
     log.Infof("gossipSyncer(%x): buffering chan range reply of size=%v",
-        g.peerPub[:], len(msg.ShortChanIDs))
+        g.cfg.peerPub[:], len(msg.ShortChanIDs))
 
     // If this isn't the last response, then we can exit as we've already
     // buffered the latest portion of the streaming reply.
@@ -510,8 +510,8 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
         return nil
     }
 
-    log.Infof("gossipSyncer(%x): filtering through %v chans", g.peerPub[:],
-        len(g.bufferedChanRangeReplies))
+    log.Infof("gossipSyncer(%x): filtering through %v chans",
+        g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
 
     // Otherwise, this is the final response, so we'll now check to see
     // which channels they know of that we don't.
@@ -531,7 +531,7 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
     // switch straight to our terminal state.
     if len(newChans) == 0 {
         log.Infof("gossipSyncer(%x): remote peer has no new chans",
-            g.peerPub[:])
+            g.cfg.peerPub[:])
 
         atomic.StoreUint32(&g.state, uint32(chansSynced))
         return nil
@@ -543,7 +543,7 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
     atomic.StoreUint32(&g.state, uint32(queryNewChannels))
 
     log.Infof("gossipSyncer(%x): starting query for %v new chans",
-        g.peerPub[:], len(newChans))
+        g.cfg.peerPub[:], len(newChans))
 
     return nil
 }
@@ -575,7 +575,7 @@ func (g *gossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
     }
 
     log.Infof("gossipSyncer(%x): requesting new chans from height=%v "+
-        "and %v blocks after", g.peerPub[:], startHeight,
+        "and %v blocks after", g.cfg.peerPub[:], startHeight,
         math.MaxUint32-startHeight)
 
     // Finally, we'll craft the channel range query, using our starting
@@ -599,7 +599,7 @@ func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error {
     // where the remote peer spams us endlessly.
     if delay > 0 {
         log.Infof("gossipSyncer(%x): rate limiting gossip replies, "+
-            "responding in %s", g.peerPub[:], delay)
+            "responding in %s", g.cfg.peerPub[:], delay)
 
         select {
         case <-time.After(delay):
@@ -632,7 +632,7 @@ func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error {
 // end of our streaming response.
 func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
     log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+
-        "num_blocks=%v", g.peerPub[:], query.FirstBlockHeight,
+        "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
         query.NumBlocks)
 
     // Next, we'll consult the time series to obtain the set of known
@@ -667,15 +667,15 @@ func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
             channelChunk = channelRange[numChansSent:]
 
             log.Infof("gossipSyncer(%x): sending final chan "+
-                "range chunk, size=%v", g.peerPub[:], len(channelChunk))
+                "range chunk, size=%v", g.cfg.peerPub[:],
+                len(channelChunk))
         } else {
             // Otherwise, we'll only send off a fragment exactly
             // sized to the proper chunk size.
             channelChunk = channelRange[numChansSent : numChansSent+g.cfg.chunkSize]
 
             log.Infof("gossipSyncer(%x): sending range chunk of "+
-                "size=%v", g.peerPub[:], len(channelChunk))
+                "size=%v", g.cfg.peerPub[:], len(channelChunk))
         }
 
         // With our chunk assembled, we'll now send to the remote peer
@@ -725,12 +725,12 @@ func (g *gossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
     if len(query.ShortChanIDs) == 0 {
         log.Infof("gossipSyncer(%x): ignoring query for blank short chan ID's",
-            g.peerPub[:])
+            g.cfg.peerPub[:])
         return nil
     }
 
     log.Infof("gossipSyncer(%x): fetching chan anns for %v chans",
-        g.peerPub[:], len(query.ShortChanIDs))
+        g.cfg.peerPub[:], len(query.ShortChanIDs))
 
     // Now that we know we're on the same chain, we'll query the channel
     // time series for the set of messages that we know of which satisfies
@@ -788,7 +788,7 @@ func (g *gossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
     }
 
     log.Infof("gossipSyncer(%x): applying new update horizon: start=%v, "+
-        "end=%v, backlog_size=%v", g.peerPub[:], startTime, endTime,
+        "end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
         len(newUpdatestoSend))
 
     // If we don't have any to send, then we can return early.
@@ -866,7 +866,7 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
         // If the target peer is the peer that sent us this message,
         // then we'll exit early as we don't need to filter this
         // message.
-        if _, ok := msg.senders[g.peerPub]; ok {
+        if _, ok := msg.senders[g.cfg.peerPub]; ok {
             continue
         }
@@ -921,7 +921,7 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
     }
 
     log.Tracef("gossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
-        g.peerPub[:], len(msgs), len(msgsToSend))
+        g.cfg.peerPub[:], len(msgs), len(msgsToSend))
 
     if len(msgsToSend) == 0 {
         return

discovery/syncer_test.go

@@ -130,7 +130,7 @@ func newTestSyncer(hID lnwire.ShortChannelID,
         },
         delayedQueryReplyInterval: 2 * time.Second,
     }
-    syncer := newGossiperSyncer(cfg)
+    syncer := newGossipSyncer(cfg)
 
     return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries)
 }
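As a side note on the field's type, a small standalone Go sketch (no lnd imports; names are illustrative): a fixed-size [33]byte array is comparable, so it can be used directly as a map key, which is what the msg.senders[g.cfg.peerPub] lookup above relies on, and %x over peerPub[:] produces the hex form seen in the gossipSyncer(...) log lines.

package main

import "fmt"

func main() {
	// peerPub stands in for cfg.peerPub: a compressed secp256k1 pubkey.
	var peerPub [33]byte
	peerPub[0] = 0x03

	// Fixed-size arrays are comparable in Go, so the key can index a map
	// directly, with no hex or string conversion.
	senders := map[[33]byte]struct{}{
		peerPub: {},
	}

	if _, ok := senders[peerPub]; ok {
		// %x over the slice form yields the hex used in the log output.
		fmt.Printf("gossipSyncer(%x): message sender is the sync peer\n",
			peerPub[:])
	}
}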