chainntnfs: Implement quit signal in TxConfNotifier.

This commit is contained in:
Jim Posen 2017-12-04 13:30:33 -08:00 committed by Olaoluwa Osuntokun
parent 2639b58e4b
commit 280e264e8c
3 changed files with 52 additions and 10 deletions

@ -284,7 +284,10 @@ out:
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
b.txConfNotifier.Register(&msg.ConfNtfn, txConf) err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
}
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg

@ -316,7 +316,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
} }
} }
n.txConfNotifier.Register(&msg.ConfNtfn, txConf) err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
}
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")

@ -75,6 +75,10 @@ type TxConfNotifier struct {
// ntfnsByConfirmHeight is an index of notification requests by the height // ntfnsByConfirmHeight is an index of notification requests by the height
// at which the transaction will have sufficient confirmations. // at which the transaction will have sufficient confirmations.
ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{}
// quit is closed in order to signal that the notifier is gracefully
// exiting.
quit chan struct{}
} }
// NewTxConfNotifier creates a TxConfNotifier. The current height of the // NewTxConfNotifier creates a TxConfNotifier. The current height of the
@ -86,6 +90,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif
confNotifications: make(map[chainhash.Hash][]*ConfNtfn), confNotifications: make(map[chainhash.Hash][]*ConfNtfn),
confTxsByInitialHeight: make(map[uint32][]*chainhash.Hash), confTxsByInitialHeight: make(map[uint32][]*chainhash.Hash),
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
quit: make(chan struct{}),
} }
} }
@ -95,12 +100,18 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif
// confirmation details must be given as the txConf argument, otherwise it // confirmation details must be given as the txConf argument, otherwise it
// should be nil. If the transaction already has the sufficient number of // should be nil. If the transaction already has the sufficient number of
// confirmations, this dispatches the notification immediately. // confirmations, this dispatches the notification immediately.
func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) { func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error {
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
default:
}
if txConf == nil || txConf.BlockHeight > tcn.currentHeight { if txConf == nil || txConf.BlockHeight > tcn.currentHeight {
// Transaction is unconfirmed. // Transaction is unconfirmed.
tcn.confNotifications[*ntfn.TxID] = tcn.confNotifications[*ntfn.TxID] =
append(tcn.confNotifications[*ntfn.TxID], ntfn) append(tcn.confNotifications[*ntfn.TxID], ntfn)
return return nil
} }
// If the transaction already has the required confirmations, dispatch // If the transaction already has the required confirmations, dispatch
@ -110,8 +121,12 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) {
if confHeight <= tcn.currentHeight { if confHeight <= tcn.currentHeight {
Log.Infof("Dispatching %v conf notification for %v", Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID) ntfn.NumConfirmations, ntfn.TxID)
ntfn.Event.Confirmed <- txConf select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
case ntfn.Event.Confirmed <- txConf:
ntfn.dispatched = true ntfn.dispatched = true
}
} else { } else {
ntfn.details = txConf ntfn.details = txConf
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
@ -131,6 +146,8 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) {
tcn.confTxsByInitialHeight[txConf.BlockHeight] = tcn.confTxsByInitialHeight[txConf.BlockHeight] =
append(tcn.confTxsByInitialHeight[txConf.BlockHeight], ntfn.TxID) append(tcn.confTxsByInitialHeight[txConf.BlockHeight], ntfn.TxID)
} }
return nil
} }
// ConnectTip handles a new block extending the current chain. This checks each // ConnectTip handles a new block extending the current chain. This checks each
@ -141,6 +158,12 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) {
func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
blockHeight uint32, txns []*btcutil.Tx) error { blockHeight uint32, txns []*btcutil.Tx) error {
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
default:
}
if blockHeight != tcn.currentHeight+1 { if blockHeight != tcn.currentHeight+1 {
return fmt.Errorf("Received blocks out of order: "+ return fmt.Errorf("Received blocks out of order: "+
"current height=%d, new height=%d", "current height=%d, new height=%d",
@ -180,8 +203,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] {
Log.Infof("Dispatching %v conf notification for %v", Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID) ntfn.NumConfirmations, ntfn.TxID)
ntfn.Event.Confirmed <- ntfn.details select {
case ntfn.Event.Confirmed <- ntfn.details:
ntfn.dispatched = true ntfn.dispatched = true
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
}
} }
delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight)
@ -204,6 +231,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
// block, internal structures are updated to ensure a confirmation notification // block, internal structures are updated to ensure a confirmation notification
// is not sent unless the transaction is included in the new chain. // is not sent unless the transaction is included in the new chain.
func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
default:
}
if blockHeight != tcn.currentHeight { if blockHeight != tcn.currentHeight {
return fmt.Errorf("Received blocks out of order: "+ return fmt.Errorf("Received blocks out of order: "+
"current height=%d, disconnected height=%d", "current height=%d, disconnected height=%d",
@ -223,8 +256,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
// negative conf if the receiver has not processed it yet. // negative conf if the receiver has not processed it yet.
// This ensures sends to the Confirmed channel are always // This ensures sends to the Confirmed channel are always
// non-blocking. // non-blocking.
default: case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth):
ntfn.Event.NegativeConf <- int32(tcn.reorgDepth) case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
} }
ntfn.dispatched = false ntfn.dispatched = false
continue continue
@ -247,6 +281,8 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
// This closes the event channels of all registered notifications that have // This closes the event channels of all registered notifications that have
// not been dispatched yet. // not been dispatched yet.
func (tcn *TxConfNotifier) TearDown() { func (tcn *TxConfNotifier) TearDown() {
close(tcn.quit)
for _, ntfns := range tcn.confNotifications { for _, ntfns := range tcn.confNotifications {
for _, ntfn := range ntfns { for _, ntfn := range ntfns {
if ntfn.dispatched { if ntfn.dispatched {