discovery: if unable to find gossipSyncer for peer, create one
In this commit we fix an existing bug caused by a scheduling race condition. We'll now ensure that if we get a gossip message from a peer before we create an instance for it, then we create one on the spot so we can service the message. Before this commit, we would drop the first message, and therefore never sync up with the peer at all, causing them to miss channel announcements.
This commit is contained in:
parent
994d9cf7e4
commit
b3ac3492a0
@ -872,18 +872,48 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
|
||||
|
||||
// findGossipSyncer is a utility method used by the gossiper to locate the
|
||||
// gossip syncer for an inbound message so we can properly dispatch the
|
||||
// incoming message.
|
||||
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) {
|
||||
// incoming message. If a gossip syncer isn't found, then one will be created
|
||||
// for the target peer.
|
||||
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSyncer {
|
||||
target := routing.NewVertex(pub)
|
||||
|
||||
// First, we'll try to find an existing gossiper for this peer.
|
||||
d.syncerMtx.RLock()
|
||||
syncer, ok := d.peerSyncers[target]
|
||||
d.syncerMtx.RUnlock()
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("received chan time series message for "+
|
||||
"unknown peer: %x", target[:])
|
||||
|
||||
// If one exists, then we'll return it directly.
|
||||
if ok {
|
||||
return syncer
|
||||
}
|
||||
|
||||
return syncer, nil
|
||||
// Otherwise, we'll obtain the mutex, then check again if a gossiper
|
||||
// was added after we dropped the read mutex.
|
||||
d.syncerMtx.Lock()
|
||||
syncer, ok = d.peerSyncers[target]
|
||||
if ok {
|
||||
d.syncerMtx.Unlock()
|
||||
return syncer
|
||||
}
|
||||
|
||||
// At this point, a syncer doesn't yet exist, so we'll create a new one
|
||||
// for the peer and return it to the caller.
|
||||
syncer = newGossiperSyncer(gossipSyncerCfg{
|
||||
chainHash: d.cfg.ChainHash,
|
||||
syncChanUpdates: true,
|
||||
channelSeries: d.cfg.ChanSeries,
|
||||
encodingType: lnwire.EncodingSortedPlain,
|
||||
sendToPeer: func(msgs ...lnwire.Message) error {
|
||||
return d.cfg.SendToPeer(pub, msgs...)
|
||||
},
|
||||
})
|
||||
copy(syncer.peerPub[:], pub.SerializeCompressed())
|
||||
d.peerSyncers[target] = syncer
|
||||
syncer.Start()
|
||||
|
||||
d.syncerMtx.Unlock()
|
||||
|
||||
return syncer
|
||||
}
|
||||
|
||||
// networkHandler is the primary goroutine that drives this service. The roles
|
||||
@ -961,15 +991,11 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
// then we'll dispatch that directly to the proper
|
||||
// gossipSyncer.
|
||||
case *lnwire.GossipTimestampRange:
|
||||
syncer, err := d.findGossipSyncer(announcement.peer)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
syncer := d.findGossipSyncer(announcement.peer)
|
||||
|
||||
// If we've found the message target, then
|
||||
// we'll dispatch the message directly to it.
|
||||
err = syncer.ApplyGossipFilter(msg)
|
||||
err := syncer.ApplyGossipFilter(msg)
|
||||
if err != nil {
|
||||
log.Warnf("unable to apply gossip "+
|
||||
"filter for peer=%x: %v",
|
||||
@ -985,11 +1011,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
*lnwire.ReplyChannelRange,
|
||||
*lnwire.ReplyShortChanIDsEnd:
|
||||
|
||||
syncer, err := d.findGossipSyncer(announcement.peer)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
syncer := d.findGossipSyncer(announcement.peer)
|
||||
|
||||
syncer.ProcessQueryMsg(announcement.msg)
|
||||
continue
|
||||
|
Loading…
Reference in New Issue
Block a user