diff --git a/discovery/gossiper.go b/discovery/gossiper.go index fb6cd7d2..ac954186 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1107,8 +1107,9 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, log.Infof("Creating new gossipSyncer for peer=%x", nodeID[:]) encoding := lnwire.EncodingSortedPlain - syncer := newGossiperSyncer(gossipSyncerCfg{ + syncer := newGossipSyncer(gossipSyncerCfg{ chainHash: d.cfg.ChainHash, + peerPub: nodeID, syncChanUpdates: recvUpdates, channelSeries: d.cfg.ChanSeries, encodingType: encoding, @@ -1117,7 +1118,6 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, return syncPeer.SendMessageLazy(false, msgs...) }, }) - copy(syncer.peerPub[:], nodeID[:]) d.peerSyncers[nodeID] = syncer syncer.Start() diff --git a/discovery/syncer.go b/discovery/syncer.go index f82c372e..7a4f9776 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -113,6 +113,10 @@ type gossipSyncerCfg struct { // chainHash is the chain that this syncer is responsible for. chainHash chainhash.Hash + // peerPub is the public key of the peer we're syncing with, serialized + // in compressed format. + peerPub [33]byte + // syncChanUpdates is a bool that indicates if we should request a // continual channel update stream or not. syncChanUpdates bool @@ -183,10 +187,6 @@ type gossipSyncer struct { // state. newChansToQuery []lnwire.ShortChannelID - // peerPub is the public key of the peer we're syncing with, serialized - // in compressed format. - peerPub [33]byte - cfg gossipSyncerCfg // rateLimiter dictates the frequency with which we will reply to gossip @@ -201,9 +201,9 @@ type gossipSyncer struct { wg sync.WaitGroup } -// newGossiperSyncer returns a new instance of the gossipSyncer populated using +// newGossipSyncer returns a new instance of the gossipSyncer populated using // the passed config. -func newGossiperSyncer(cfg gossipSyncerCfg) *gossipSyncer { +func newGossipSyncer(cfg gossipSyncerCfg) *gossipSyncer { // If no parameter was specified for max undelayed query replies, set it // to the default of 5 queries. if cfg.maxUndelayedQueryReplies <= 0 { @@ -240,7 +240,7 @@ func (g *gossipSyncer) Start() error { return nil } - log.Debugf("Starting gossipSyncer(%x)", g.peerPub[:]) + log.Debugf("Starting gossipSyncer(%x)", g.cfg.peerPub[:]) g.wg.Add(1) go g.channelGraphSyncer() @@ -274,7 +274,7 @@ func (g *gossipSyncer) channelGraphSyncer() { for { state := atomic.LoadUint32(&g.state) - log.Debugf("gossipSyncer(%x): state=%v", g.peerPub[:], + log.Debugf("gossipSyncer(%x): state=%v", g.cfg.peerPub[:], syncerState(state)) switch syncerState(state) { @@ -413,8 +413,8 @@ func (g *gossipSyncer) channelGraphSyncer() { // items. updateHorizon := time.Now().Add(-time.Hour * 1) log.Infof("gossipSyncer(%x): applying "+ - "gossipFilter(start=%v)", g.peerPub[:], - updateHorizon) + "gossipFilter(start=%v)", + g.cfg.peerPub[:], updateHorizon) g.localUpdateHorizon = &lnwire.GossipTimestampRange{ ChainHash: g.cfg.chainHash, @@ -457,7 +457,7 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) { // to signal that we're fully synchronized. if len(g.newChansToQuery) == 0 { log.Infof("gossipSyncer(%x): no more chans to query", - g.peerPub[:]) + g.cfg.peerPub[:]) return true, nil } @@ -480,7 +480,7 @@ func (g *gossipSyncer) synchronizeChanIDs() (bool, error) { } log.Infof("gossipSyncer(%x): querying for %v new channels", - g.peerPub[:], len(queryChunk)) + g.cfg.peerPub[:], len(queryChunk)) // With our chunk obtained, we'll send over our next query, then return // false indicating that we're net yet fully synced. @@ -502,7 +502,7 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro ) log.Infof("gossipSyncer(%x): buffering chan range reply of size=%v", - g.peerPub[:], len(msg.ShortChanIDs)) + g.cfg.peerPub[:], len(msg.ShortChanIDs)) // If this isn't the last response, then we can exit as we've already // buffered the latest portion of the streaming reply. @@ -510,8 +510,8 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro return nil } - log.Infof("gossipSyncer(%x): filtering through %v chans", g.peerPub[:], - len(g.bufferedChanRangeReplies)) + log.Infof("gossipSyncer(%x): filtering through %v chans", + g.cfg.peerPub[:], len(g.bufferedChanRangeReplies)) // Otherwise, this is the final response, so we'll now check to see // which channels they know of that we don't. @@ -531,7 +531,7 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro // switch straight to our terminal state. if len(newChans) == 0 { log.Infof("gossipSyncer(%x): remote peer has no new chans", - g.peerPub[:]) + g.cfg.peerPub[:]) atomic.StoreUint32(&g.state, uint32(chansSynced)) return nil @@ -543,7 +543,7 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro atomic.StoreUint32(&g.state, uint32(queryNewChannels)) log.Infof("gossipSyncer(%x): starting query for %v new chans", - g.peerPub[:], len(newChans)) + g.cfg.peerPub[:], len(newChans)) return nil } @@ -575,7 +575,7 @@ func (g *gossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) { } log.Infof("gossipSyncer(%x): requesting new chans from height=%v "+ - "and %v blocks after", g.peerPub[:], startHeight, + "and %v blocks after", g.cfg.peerPub[:], startHeight, math.MaxUint32-startHeight) // Finally, we'll craft the channel range query, using our starting @@ -599,7 +599,7 @@ func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error { // where the remote peer spams us endlessly. if delay > 0 { log.Infof("gossipSyncer(%x): rate limiting gossip replies, "+ - "responding in %s", g.peerPub[:], delay) + "responding in %s", g.cfg.peerPub[:], delay) select { case <-time.After(delay): @@ -632,7 +632,7 @@ func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error { // end of our streaming response. func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error { log.Infof("gossipSyncer(%x): filtering chan range: start_height=%v, "+ - "num_blocks=%v", g.peerPub[:], query.FirstBlockHeight, + "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight, query.NumBlocks) // Next, we'll consult the time series to obtain the set of known @@ -667,15 +667,15 @@ func (g *gossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro channelChunk = channelRange[numChansSent:] log.Infof("gossipSyncer(%x): sending final chan "+ - "range chunk, size=%v", g.peerPub[:], len(channelChunk)) - + "range chunk, size=%v", g.cfg.peerPub[:], + len(channelChunk)) } else { // Otherwise, we'll only send off a fragment exactly // sized to the proper chunk size. channelChunk = channelRange[numChansSent : numChansSent+g.cfg.chunkSize] log.Infof("gossipSyncer(%x): sending range chunk of "+ - "size=%v", g.peerPub[:], len(channelChunk)) + "size=%v", g.cfg.peerPub[:], len(channelChunk)) } // With our chunk assembled, we'll now send to the remote peer @@ -725,12 +725,12 @@ func (g *gossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error if len(query.ShortChanIDs) == 0 { log.Infof("gossipSyncer(%x): ignoring query for blank short chan ID's", - g.peerPub[:]) + g.cfg.peerPub[:]) return nil } log.Infof("gossipSyncer(%x): fetching chan anns for %v chans", - g.peerPub[:], len(query.ShortChanIDs)) + g.cfg.peerPub[:], len(query.ShortChanIDs)) // Now that we know we're on the same chain, we'll query the channel // time series for the set of messages that we know of which satisfies @@ -788,7 +788,7 @@ func (g *gossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er } log.Infof("gossipSyncer(%x): applying new update horizon: start=%v, "+ - "end=%v, backlog_size=%v", g.peerPub[:], startTime, endTime, + "end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime, len(newUpdatestoSend)) // If we don't have any to send, then we can return early. @@ -866,7 +866,7 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) { // If the target peer is the peer that sent us this message, // then we'll exit early as we don't need to filter this // message. - if _, ok := msg.senders[g.peerPub]; ok { + if _, ok := msg.senders[g.cfg.peerPub]; ok { continue } @@ -921,7 +921,7 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) { } log.Tracef("gossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v", - g.peerPub[:], len(msgs), len(msgsToSend)) + g.cfg.peerPub[:], len(msgs), len(msgsToSend)) if len(msgsToSend) == 0 { return diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 1887f0ba..44acf402 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -130,7 +130,7 @@ func newTestSyncer(hID lnwire.ShortChannelID, }, delayedQueryReplyInterval: 2 * time.Second, } - syncer := newGossiperSyncer(cfg) + syncer := newGossipSyncer(cfg) return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries) }