Merge pull request #1325 from Roasbeef/gossip-syncer-fixes
discovery: ensure gossiper syncer has idempotent exit/start, fix deadlock
This commit is contained in:
commit
cc12cf428b
@ -525,7 +525,7 @@ type msgWithSenders struct {
|
|||||||
// with peers that we have an active gossipSyncer with. We do this to ensure
|
// with peers that we have an active gossipSyncer with. We do this to ensure
|
||||||
// that we don't broadcast messages to any peers that we have active gossip
|
// that we don't broadcast messages to any peers that we have active gossip
|
||||||
// syncers for.
|
// syncers for.
|
||||||
func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]struct{}) {
|
func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]*gossipSyncer) {
|
||||||
for peerPub := range syncers {
|
for peerPub := range syncers {
|
||||||
m.senders[peerPub] = struct{}{}
|
m.senders[peerPub] = struct{}{}
|
||||||
}
|
}
|
||||||
@ -1130,9 +1130,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
// syncers, we'll collect their pubkeys so we can avoid
|
// syncers, we'll collect their pubkeys so we can avoid
|
||||||
// sending them the full message blast below.
|
// sending them the full message blast below.
|
||||||
d.syncerMtx.RLock()
|
d.syncerMtx.RLock()
|
||||||
syncerPeers := map[routing.Vertex]struct{}{}
|
syncerPeers := make(map[routing.Vertex]*gossipSyncer)
|
||||||
for peerPub := range d.peerSyncers {
|
for peerPub, syncer := range d.peerSyncers {
|
||||||
syncerPeers[peerPub] = struct{}{}
|
syncerPeers[peerPub] = syncer
|
||||||
}
|
}
|
||||||
d.syncerMtx.RUnlock()
|
d.syncerMtx.RUnlock()
|
||||||
|
|
||||||
@ -1142,11 +1142,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
// We'll first attempt to filter out this new message
|
// We'll first attempt to filter out this new message
|
||||||
// for all peers that have active gossip syncers
|
// for all peers that have active gossip syncers
|
||||||
// active.
|
// active.
|
||||||
d.syncerMtx.RLock()
|
for _, syncer := range syncerPeers {
|
||||||
for _, syncer := range d.peerSyncers {
|
|
||||||
syncer.FilterGossipMsgs(announcementBatch...)
|
syncer.FilterGossipMsgs(announcementBatch...)
|
||||||
}
|
}
|
||||||
d.syncerMtx.RUnlock()
|
|
||||||
|
|
||||||
// Next, If we have new things to announce then
|
// Next, If we have new things to announce then
|
||||||
// broadcast them to all our immediately connected
|
// broadcast them to all our immediately connected
|
||||||
@ -1234,8 +1232,7 @@ func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) {
|
|||||||
peer.SerializeCompressed())
|
peer.SerializeCompressed())
|
||||||
|
|
||||||
vertex := routing.NewVertex(peer)
|
vertex := routing.NewVertex(peer)
|
||||||
|
syncer, ok := d.peerSyncers[vertex]
|
||||||
syncer, ok := d.peerSyncers[routing.NewVertex(peer)]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -175,6 +175,9 @@ type gossipSyncerCfg struct {
|
|||||||
//
|
//
|
||||||
// TODO(roasbeef): modify to only sync from one peer at a time?
|
// TODO(roasbeef): modify to only sync from one peer at a time?
|
||||||
type gossipSyncer struct {
|
type gossipSyncer struct {
|
||||||
|
started uint32
|
||||||
|
stopped uint32
|
||||||
|
|
||||||
// remoteUpdateHorizon is the update horizon of the remote peer. We'll
|
// remoteUpdateHorizon is the update horizon of the remote peer. We'll
|
||||||
// use this to properly filter out any messages.
|
// use this to properly filter out any messages.
|
||||||
remoteUpdateHorizon *lnwire.GossipTimestampRange
|
remoteUpdateHorizon *lnwire.GossipTimestampRange
|
||||||
@ -226,6 +229,10 @@ func newGossiperSyncer(cfg gossipSyncerCfg) *gossipSyncer {
|
|||||||
// Start starts the gossipSyncer and any goroutines that it needs to carry out
|
// Start starts the gossipSyncer and any goroutines that it needs to carry out
|
||||||
// its duties.
|
// its duties.
|
||||||
func (g *gossipSyncer) Start() error {
|
func (g *gossipSyncer) Start() error {
|
||||||
|
if !atomic.CompareAndSwapUint32(&g.started, 0, 1) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
log.Debugf("Starting gossipSyncer(%x)", g.peerPub[:])
|
log.Debugf("Starting gossipSyncer(%x)", g.peerPub[:])
|
||||||
|
|
||||||
g.wg.Add(1)
|
g.wg.Add(1)
|
||||||
@ -237,6 +244,10 @@ func (g *gossipSyncer) Start() error {
|
|||||||
// Stop signals the gossipSyncer for a graceful exit, then waits until it has
|
// Stop signals the gossipSyncer for a graceful exit, then waits until it has
|
||||||
// exited.
|
// exited.
|
||||||
func (g *gossipSyncer) Stop() error {
|
func (g *gossipSyncer) Stop() error {
|
||||||
|
if !atomic.CompareAndSwapUint32(&g.stopped, 0, 1) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
close(g.quit)
|
close(g.quit)
|
||||||
|
|
||||||
g.wg.Wait()
|
g.wg.Wait()
|
||||||
@ -794,6 +805,12 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we've been signalled to exit, or are exiting, then we'll stop
|
||||||
|
// short.
|
||||||
|
if atomic.LoadUint32(&g.stopped) == 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(roasbeef): need to ensure that peer still online...send msg to
|
// TODO(roasbeef): need to ensure that peer still online...send msg to
|
||||||
// gossiper on peer termination to signal peer disconnect?
|
// gossiper on peer termination to signal peer disconnect?
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user