From 280e264e8c2cff5a2fa917e72592554df4a21075 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 4 Dec 2017 13:30:33 -0800 Subject: [PATCH] chainntnfs: Implement quit signal in TxConfNotifier. --- chainntnfs/btcdnotify/btcd.go | 5 ++- chainntnfs/neutrinonotify/neutrino.go | 5 ++- chainntnfs/txconfnotifier.go | 52 ++++++++++++++++++++++----- 3 files changed, 52 insertions(+), 10 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index bc900435..5ee9a619 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -284,7 +284,10 @@ out: if err != nil { chainntnfs.Log.Error(err) } - b.txConfNotifier.Register(&msg.ConfNtfn, txConf) + err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) + if err != nil { + chainntnfs.Log.Error(err) + } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 518ec9a9..4bb29920 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -316,7 +316,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() { } } - n.txConfNotifier.Register(&msg.ConfNtfn, txConf) + err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf) + if err != nil { + chainntnfs.Log.Error(err) + } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 17471957..414b6b06 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -75,6 +75,10 @@ type TxConfNotifier struct { // ntfnsByConfirmHeight is an index of notification requests by the height // at which the transaction will have sufficient confirmations. ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} + + // quit is closed in order to signal that the notifier is gracefully + // exiting. + quit chan struct{} } // NewTxConfNotifier creates a TxConfNotifier. The current height of the @@ -86,6 +90,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif confNotifications: make(map[chainhash.Hash][]*ConfNtfn), confTxsByInitialHeight: make(map[uint32][]*chainhash.Hash), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + quit: make(chan struct{}), } } @@ -95,12 +100,18 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif // confirmation details must be given as the txConf argument, otherwise it // should be nil. If the transaction already has the sufficient number of // confirmations, this dispatches the notification immediately. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error { + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + if txConf == nil || txConf.BlockHeight > tcn.currentHeight { // Transaction is unconfirmed. tcn.confNotifications[*ntfn.TxID] = append(tcn.confNotifications[*ntfn.TxID], ntfn) - return + return nil } // If the transaction already has the required confirmations, dispatch @@ -110,8 +121,12 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) - ntfn.Event.Confirmed <- txConf - ntfn.dispatched = true + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + case ntfn.Event.Confirmed <- txConf: + ntfn.dispatched = true + } } else { ntfn.details = txConf ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] @@ -131,6 +146,8 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { tcn.confTxsByInitialHeight[txConf.BlockHeight] = append(tcn.confTxsByInitialHeight[txConf.BlockHeight], ntfn.TxID) } + + return nil } // ConnectTip handles a new block extending the current chain. This checks each @@ -141,6 +158,12 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, txns []*btcutil.Tx) error { + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + if blockHeight != tcn.currentHeight+1 { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, new height=%d", @@ -180,8 +203,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) - ntfn.Event.Confirmed <- ntfn.details - ntfn.dispatched = true + select { + case ntfn.Event.Confirmed <- ntfn.details: + ntfn.dispatched = true + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + } } delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) @@ -204,6 +231,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // block, internal structures are updated to ensure a confirmation notification // is not sent unless the transaction is included in the new chain. func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + default: + } + if blockHeight != tcn.currentHeight { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, disconnected height=%d", @@ -223,8 +256,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // negative conf if the receiver has not processed it yet. // This ensures sends to the Confirmed channel are always // non-blocking. - default: - ntfn.Event.NegativeConf <- int32(tcn.reorgDepth) + case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") } ntfn.dispatched = false continue @@ -247,6 +281,8 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // This closes the event channels of all registered notifications that have // not been dispatched yet. func (tcn *TxConfNotifier) TearDown() { + close(tcn.quit) + for _, ntfns := range tcn.confNotifications { for _, ntfn := range ntfns { if ntfn.dispatched {