From 9ecfdb3c321a3a188a67a00acaf399b20d500212 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 11 Jun 2018 17:37:17 -0700 Subject: [PATCH 1/2] discovery/gossiper: hold ref to block epoch stream and cancel when stopped --- discovery/gossiper.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 3819b589..9a5318cb 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -161,9 +161,9 @@ type AuthenticatedGossiper struct { // was initialized with. cfg *Config - // newBlocks is a channel in which new blocks connected to the end of - // the main chain are sent over. - newBlocks <-chan *chainntnfs.BlockEpoch + // blockEpochs encapsulates a stream of block epochs that are sent at + // every new block height. + blockEpochs *chainntnfs.BlockEpochEvent // prematureAnnouncements maps a block height to a set of network // messages which are "premature" from our PoV. A message is premature @@ -410,7 +410,7 @@ func (d *AuthenticatedGossiper) Start() error { if err != nil { return err } - d.newBlocks = blockEpochs.Epochs + d.blockEpochs = blockEpochs height, err := d.cfg.Router.CurrentBlockHeight() if err != nil { @@ -440,6 +440,8 @@ func (d *AuthenticatedGossiper) Stop() { log.Info("Authenticated Gossiper is stopping") + d.blockEpochs.Cancel() + d.syncerMtx.RLock() for _, syncer := range d.peerSyncers { syncer.Stop() @@ -1105,7 +1107,7 @@ func (d *AuthenticatedGossiper) networkHandler() { // A new block has arrived, so we can re-process the previously // premature announcements. - case newBlock, ok := <-d.newBlocks: + case newBlock, ok := <-d.blockEpochs.Epochs: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. if !ok { From 90a50bd893b64ad73d4d6a7a513895561d2d48e5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 11 Jun 2018 19:05:00 -0700 Subject: [PATCH 2/2] peer: remove no longer needed block epoch --- peer.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/peer.go b/peer.go index 91058d7d..63e83363 100644 --- a/peer.go +++ b/peer.go @@ -343,11 +343,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { continue } - blockEpoch, err := p.server.cc.chainNotifier.RegisterBlockEpochNtfn() - if err != nil { - lnChan.Stop() - return err - } _, currentHeight, err := p.server.cc.chainIO.GetBestBlock() if err != nil { lnChan.Stop() @@ -410,8 +405,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // Create the link and add it to the switch. err = p.addLink( - chanPoint, lnChan, forwardingPolicy, blockEpoch, - chainEvents, currentHeight, true, + chanPoint, lnChan, forwardingPolicy, chainEvents, + currentHeight, true, ) if err != nil { lnChan.Stop() @@ -430,7 +425,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { func (p *peer) addLink(chanPoint *wire.OutPoint, lnChan *lnwallet.LightningChannel, forwardingPolicy *htlcswitch.ForwardingPolicy, - blockEpoch *chainntnfs.BlockEpochEvent, chainEvents *contractcourt.ChainEventSubscription, currentHeight int32, syncStates bool) error { @@ -1503,11 +1497,6 @@ out: // necessary items it needs to function. // // TODO(roasbeef): panic on below? - blockEpoch, err := p.server.cc.chainNotifier.RegisterBlockEpochNtfn() - if err != nil { - peerLog.Errorf("unable to register for block epoch: %v", err) - continue - } _, currentHeight, err := p.server.cc.chainIO.GetBestBlock() if err != nil { peerLog.Errorf("unable to get best block: %v", err) @@ -1523,9 +1512,10 @@ out: } // Create the link and add it to the switch. - err = p.addLink(chanPoint, newChan, - &p.server.cc.routingPolicy, blockEpoch, - chainEvents, currentHeight, false) + err = p.addLink( + chanPoint, newChan, &p.server.cc.routingPolicy, + chainEvents, currentHeight, false, + ) if err != nil { peerLog.Errorf("can't register new channel "+ "link(%v) with NodeKey(%x): %v", chanPoint,