Merge pull request #1375 from wpaulino/gossiper-cancel-block-epochs

discovery/gossiper: hold ref to block epoch stream and cancel when stopped
This commit is contained in:
Olaoluwa Osuntokun 2018-06-13 22:16:52 -07:00 committed by GitHub
commit b5a228808b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 21 deletions

View File

@ -161,9 +161,9 @@ type AuthenticatedGossiper struct {
// was initialized with. // was initialized with.
cfg *Config cfg *Config
// newBlocks is a channel in which new blocks connected to the end of // blockEpochs encapsulates a stream of block epochs that are sent at
// the main chain are sent over. // every new block height.
newBlocks <-chan *chainntnfs.BlockEpoch blockEpochs *chainntnfs.BlockEpochEvent
// prematureAnnouncements maps a block height to a set of network // prematureAnnouncements maps a block height to a set of network
// messages which are "premature" from our PoV. A message is premature // messages which are "premature" from our PoV. A message is premature
@ -410,7 +410,7 @@ func (d *AuthenticatedGossiper) Start() error {
if err != nil { if err != nil {
return err return err
} }
d.newBlocks = blockEpochs.Epochs d.blockEpochs = blockEpochs
height, err := d.cfg.Router.CurrentBlockHeight() height, err := d.cfg.Router.CurrentBlockHeight()
if err != nil { if err != nil {
@ -440,6 +440,8 @@ func (d *AuthenticatedGossiper) Stop() {
log.Info("Authenticated Gossiper is stopping") log.Info("Authenticated Gossiper is stopping")
d.blockEpochs.Cancel()
d.syncerMtx.RLock() d.syncerMtx.RLock()
for _, syncer := range d.peerSyncers { for _, syncer := range d.peerSyncers {
syncer.Stop() syncer.Stop()
@ -1105,7 +1107,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// A new block has arrived, so we can re-process the previously // A new block has arrived, so we can re-process the previously
// premature announcements. // premature announcements.
case newBlock, ok := <-d.newBlocks: case newBlock, ok := <-d.blockEpochs.Epochs:
// If the channel has been closed, then this indicates // If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves. // the daemon is shutting down, so we exit ourselves.
if !ok { if !ok {

22
peer.go
View File

@ -343,11 +343,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
continue continue
} }
blockEpoch, err := p.server.cc.chainNotifier.RegisterBlockEpochNtfn()
if err != nil {
lnChan.Stop()
return err
}
_, currentHeight, err := p.server.cc.chainIO.GetBestBlock() _, currentHeight, err := p.server.cc.chainIO.GetBestBlock()
if err != nil { if err != nil {
lnChan.Stop() lnChan.Stop()
@ -410,8 +405,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
// Create the link and add it to the switch. // Create the link and add it to the switch.
err = p.addLink( err = p.addLink(
chanPoint, lnChan, forwardingPolicy, blockEpoch, chanPoint, lnChan, forwardingPolicy, chainEvents,
chainEvents, currentHeight, true, currentHeight, true,
) )
if err != nil { if err != nil {
lnChan.Stop() lnChan.Stop()
@ -430,7 +425,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
func (p *peer) addLink(chanPoint *wire.OutPoint, func (p *peer) addLink(chanPoint *wire.OutPoint,
lnChan *lnwallet.LightningChannel, lnChan *lnwallet.LightningChannel,
forwardingPolicy *htlcswitch.ForwardingPolicy, forwardingPolicy *htlcswitch.ForwardingPolicy,
blockEpoch *chainntnfs.BlockEpochEvent,
chainEvents *contractcourt.ChainEventSubscription, chainEvents *contractcourt.ChainEventSubscription,
currentHeight int32, syncStates bool) error { currentHeight int32, syncStates bool) error {
@ -1503,11 +1497,6 @@ out:
// necessary items it needs to function. // necessary items it needs to function.
// //
// TODO(roasbeef): panic on below? // TODO(roasbeef): panic on below?
blockEpoch, err := p.server.cc.chainNotifier.RegisterBlockEpochNtfn()
if err != nil {
peerLog.Errorf("unable to register for block epoch: %v", err)
continue
}
_, currentHeight, err := p.server.cc.chainIO.GetBestBlock() _, currentHeight, err := p.server.cc.chainIO.GetBestBlock()
if err != nil { if err != nil {
peerLog.Errorf("unable to get best block: %v", err) peerLog.Errorf("unable to get best block: %v", err)
@ -1523,9 +1512,10 @@ out:
} }
// Create the link and add it to the switch. // Create the link and add it to the switch.
err = p.addLink(chanPoint, newChan, err = p.addLink(
&p.server.cc.routingPolicy, blockEpoch, chanPoint, newChan, &p.server.cc.routingPolicy,
chainEvents, currentHeight, false) chainEvents, currentHeight, false,
)
if err != nil { if err != nil {
peerLog.Errorf("can't register new channel "+ peerLog.Errorf("can't register new channel "+
"link(%v) with NodeKey(%x): %v", chanPoint, "link(%v) with NodeKey(%x): %v", chanPoint,