diff --git a/chainntnfs/txnotifier.go b/chainntnfs/txnotifier.go index 04fc81a9..13c45ff7 100644 --- a/chainntnfs/txnotifier.go +++ b/chainntnfs/txnotifier.go @@ -1118,9 +1118,11 @@ out: } // DisconnectTip handles the tip of the current chain being disconnected during -// a chain reorganization. If any watched transactions were included in this -// block, internal structures are updated to ensure a confirmation notification -// is not sent unless the transaction is included in the new chain. +// a chain reorganization. If any watched transactions or spending transactions +// for registered outpoints were included in this block, internal structures are +// updated to ensure confirmation/spend notifications are consumed (if not +// already), and reorg notifications are dispatched instead. Confirmation/spend +// notifications will be dispatched again upon block inclusion. func (n *TxNotifier) DisconnectTip(blockHeight uint32) error { select { case <-n.quit: @@ -1193,9 +1195,34 @@ func (n *TxNotifier) DisconnectTip(blockHeight uint32) error { } } - // Finally, we can remove the transactions we're currently watching that - // were included in this block height. + // We'll also go through our watched outpoints and attempt to drain + // their dispatched notifications to ensure dispatching notifications to + // clients later on is always non-blocking. We're only interested in + // outpoints whose spending transaction was included at the height being + // disconnected. + for op := range n.opsBySpendHeight[blockHeight] { + // Since the spending transaction is being reorged out of the + // chain, we'll need to clear out the spending details of the + // outpoint. + spendSet := n.spendNotifications[op] + spendSet.details = nil + + // For all requests which have had a spend notification + // dispatched, we'll attempt to drain it and send a reorg + // notification instead. + for _, ntfn := range spendSet.ntfns { + if err := n.dispatchSpendReorg(ntfn); err != nil { + return err + } + } + } + + // Finally, we can remove the transactions that were confirmed and the + // outpoints that were spent at the height being disconnected. We'll + // still continue to track them until they have been confirmed/spent and + // are no longer under the risk of being reorged out of the chain again. delete(n.txsByInitialHeight, blockHeight) + delete(n.opsBySpendHeight, blockHeight) return nil } @@ -1243,6 +1270,35 @@ func (n *TxNotifier) dispatchConfReorg(ntfn *ConfNtfn, return nil } +// dispatchSpendReorg dispatches a reorg notification to the client if a spend +// notiification was already delivered. +// +// NOTE: This must be called with the TxNotifier's lock held. +func (n *TxNotifier) dispatchSpendReorg(ntfn *SpendNtfn) error { + if !ntfn.dispatched { + return nil + } + + // Attempt to drain the spend notification to ensure sends to the Spend + // channel are always non-blocking. + select { + case <-ntfn.Event.Spend: + default: + } + + // Send a reorg notification to the client in order for them to + // correctly handle reorgs. + select { + case ntfn.Event.Reorg <- struct{}{}: + case <-n.quit: + return ErrTxNotifierExiting + } + + ntfn.dispatched = false + + return nil +} + // TearDown is to be called when the owner of the TxNotifier is exiting. This // closes the event channels of all registered notifications that have not been // dispatched yet.