diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 6675207b..0d422616 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -885,16 +885,19 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, // Construct a notification request for the transaction and send it to // the main event loop. + confID := atomic.AddUint64(&b.confClientCounter, 1) confRequest, err := chainntnfs.NewConfRequest(txid, pkScript) if err != nil { return nil, err } ntfn := &chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&b.confClientCounter, 1), + ConfID: confID, ConfRequest: confRequest, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - HeightHint: heightHint, + Event: chainntnfs.NewConfirmationEvent(numConfs, func() { + b.txNotifier.CancelConf(confRequest, confID) + }), + HeightHint: heightHint, } chainntnfs.Log.Infof("New confirmation subscription: %v, num_confs=%v", diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 4f432dbc..d4979914 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -925,16 +925,19 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, // Construct a notification request for the transaction and send it to // the main event loop. + confID := atomic.AddUint64(&b.confClientCounter, 1) confRequest, err := chainntnfs.NewConfRequest(txid, pkScript) if err != nil { return nil, err } ntfn := &chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&b.confClientCounter, 1), + ConfID: confID, ConfRequest: confRequest, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - HeightHint: heightHint, + Event: chainntnfs.NewConfirmationEvent(numConfs, func() { + b.txNotifier.CancelConf(confRequest, confID) + }), + HeightHint: heightHint, } chainntnfs.Log.Infof("New confirmation subscription: %v, num_confs=%v ", diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index b03d9973..65bad7e3 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -72,8 +72,6 @@ func (t TxConfStatus) String() string { // // Concrete implementations of ChainNotifier should be able to support multiple // concurrent client requests, as well as multiple concurrent notification events. -// TODO(roasbeef): all events should have a Cancel() method to free up the -// resource type ChainNotifier interface { // RegisterConfirmationsNtfn registers an intent to be notified once // txid reaches numConfs confirmations. We also pass in the pkScript as @@ -175,6 +173,9 @@ type TxConfirmation struct { // If the event that the original transaction becomes re-org'd out of the main // chain, the 'NegativeConf' will be sent upon with a value representing the // depth of the re-org. +// +// NOTE: If the caller wishes to cancel their registered spend notification, +// the Cancel closure MUST be called. type ConfirmationEvent struct { // Confirmed is a channel that will be sent upon once the transaction // has been fully confirmed. The struct sent will contain all the @@ -191,26 +192,27 @@ type ConfirmationEvent struct { // confirmations. Updates chan uint32 - // TODO(roasbeef): all goroutines on ln channel updates should also - // have a struct chan that's closed if funding gets re-org out. Need - // to sync, to request another confirmation event ntfn, then re-open - // channel after confs. - // NegativeConf is a channel that will be sent upon if the transaction // confirms, but is later reorged out of the chain. The integer sent // through the channel represents the reorg depth. // // NOTE: This channel must be buffered. NegativeConf chan int32 + + // Cancel is a closure that should be executed by the caller in the case + // that they wish to prematurely abandon their registered confirmation + // notification. + Cancel func() } // NewConfirmationEvent constructs a new ConfirmationEvent with newly opened // channels. -func NewConfirmationEvent(numConfs uint32) *ConfirmationEvent { +func NewConfirmationEvent(numConfs uint32, cancel func()) *ConfirmationEvent { return &ConfirmationEvent{ Confirmed: make(chan *TxConfirmation, 1), Updates: make(chan uint32, numConfs), NegativeConf: make(chan int32, 1), + Cancel: cancel, } } @@ -247,8 +249,8 @@ type SpendEvent struct { // NOTE: This channel must be buffered. Reorg chan struct{} - // Cancel is a closure that should be executed by the caller in the - // case that they wish to prematurely abandon their registered spend + // Cancel is a closure that should be executed by the caller in the case + // that they wish to prematurely abandon their registered spend // notification. Cancel func() } @@ -287,8 +289,8 @@ type BlockEpochEvent struct { // NOTE: This channel must be buffered. Epochs <-chan *BlockEpoch - // Cancel is a closure that should be executed by the caller in the - // case that they wish to abandon their registered spend notification. + // Cancel is a closure that should be executed by the caller in the case + // that they wish to abandon their registered block epochs notification. Cancel func() } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 6a1eafa9..6a3e5738 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -795,16 +795,19 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, // Construct a notification request for the transaction and send it to // the main event loop. + confID := atomic.AddUint64(&n.confClientCounter, 1) confRequest, err := chainntnfs.NewConfRequest(txid, pkScript) if err != nil { return nil, err } ntfn := &chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&n.confClientCounter, 1), + ConfID: confID, ConfRequest: confRequest, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - HeightHint: heightHint, + Event: chainntnfs.NewConfirmationEvent(numConfs, func() { + n.txNotifier.CancelConf(confRequest, confID) + }), + HeightHint: heightHint, } chainntnfs.Log.Infof("New confirmation subscription: %v, num_confs=%v", diff --git a/chainntnfs/txnotifier.go b/chainntnfs/txnotifier.go index a6a39779..9cf75da0 100644 --- a/chainntnfs/txnotifier.go +++ b/chainntnfs/txnotifier.go @@ -602,6 +602,38 @@ func (n *TxNotifier) RegisterConf(ntfn *ConfNtfn) (*HistoricalConfDispatch, return dispatch, n.currentHeight, nil } +// CancelConf cancels an existing request for a spend notification of an +// outpoint/output script. The request is identified by its spend ID. +func (n *TxNotifier) CancelConf(confRequest ConfRequest, confID uint64) { + select { + case <-n.quit: + return + default: + } + + n.Lock() + defer n.Unlock() + + confSet, ok := n.confNotifications[confRequest] + if !ok { + return + } + ntfn, ok := confSet.ntfns[confID] + if !ok { + return + } + + Log.Infof("Canceling confirmation notification: conf_id=%d, %v", confID, + confRequest) + + // We'll close all the notification channels to let the client know + // their cancel request has been fulfilled. + close(ntfn.Event.Confirmed) + close(ntfn.Event.Updates) + close(ntfn.Event.NegativeConf) + delete(confSet.ntfns, confID) +} + // UpdateConfDetails attempts to update the confirmation details for an active // notification within the notifier. This should only be used in the case of a // transaction/output script that has confirmed before the notifier's current @@ -904,9 +936,6 @@ func (n *TxNotifier) CancelSpend(spendRequest SpendRequest, spendID uint64) { n.Lock() defer n.Unlock() - Log.Infof("Canceling spend notification: spend_id=%d, %v", spendID, - spendRequest) - spendSet, ok := n.spendNotifications[spendRequest] if !ok { return @@ -916,6 +945,9 @@ func (n *TxNotifier) CancelSpend(spendRequest SpendRequest, spendID uint64) { return } + Log.Infof("Canceling spend notification: spend_id=%d, %v", spendID, + spendRequest) + // We'll close all the notification channels to let the client know // their cancel request has been fulfilled. close(ntfn.Event.Spend) diff --git a/chainntnfs/txnotifier_test.go b/chainntnfs/txnotifier_test.go index aa5e8892..fb8d9beb 100644 --- a/chainntnfs/txnotifier_test.go +++ b/chainntnfs/txnotifier_test.go @@ -143,7 +143,7 @@ func TestTxNotifierMaxConfs(t *testing.T) { }, NumConfirmations: chainntnfs.MaxNumConfs + 1, Event: chainntnfs.NewConfirmationEvent( - chainntnfs.MaxNumConfs, + chainntnfs.MaxNumConfs, nil, ), } if _, _, err := n.RegisterConf(ntfn); err != chainntnfs.ErrTxMaxConfs { @@ -182,7 +182,7 @@ func TestTxNotifierFutureConfDispatch(t *testing.T) { PkScript: testScript, }, NumConfirmations: tx1NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs, nil), } if _, _, err := n.RegisterConf(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) @@ -196,7 +196,7 @@ func TestTxNotifierFutureConfDispatch(t *testing.T) { PkScript: testScript, }, NumConfirmations: tx2NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs, nil), } if _, _, err := n.RegisterConf(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) @@ -365,7 +365,7 @@ func TestTxNotifierHistoricalConfDispatch(t *testing.T) { ConfID: 0, ConfRequest: chainntnfs.ConfRequest{TxID: tx1Hash}, NumConfirmations: tx1NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs, nil), } if _, _, err := n.RegisterConf(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) @@ -376,7 +376,7 @@ func TestTxNotifierHistoricalConfDispatch(t *testing.T) { ConfID: 1, ConfRequest: chainntnfs.ConfRequest{TxID: tx2Hash}, NumConfirmations: tx2NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs, nil), } if _, _, err := n.RegisterConf(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) @@ -697,7 +697,7 @@ func TestTxNotifierMultipleHistoricalConfRescans(t *testing.T) { ConfID: 0, // TODO(wilmer): set pkScript. ConfRequest: chainntnfs.ConfRequest{TxID: chainntnfs.ZeroHash}, - Event: chainntnfs.NewConfirmationEvent(1), + Event: chainntnfs.NewConfirmationEvent(1, nil), } historicalConfDispatch1, _, err := n.RegisterConf(confNtfn1) if err != nil { @@ -714,7 +714,7 @@ func TestTxNotifierMultipleHistoricalConfRescans(t *testing.T) { ConfID: 1, // TODO(wilmer): set pkScript. ConfRequest: chainntnfs.ConfRequest{TxID: chainntnfs.ZeroHash}, - Event: chainntnfs.NewConfirmationEvent(1), + Event: chainntnfs.NewConfirmationEvent(1, nil), } historicalConfDispatch2, _, err := n.RegisterConf(confNtfn2) if err != nil { @@ -739,7 +739,7 @@ func TestTxNotifierMultipleHistoricalConfRescans(t *testing.T) { confNtfn3 := &chainntnfs.ConfNtfn{ ConfID: 2, ConfRequest: chainntnfs.ConfRequest{TxID: chainntnfs.ZeroHash}, - Event: chainntnfs.NewConfirmationEvent(1), + Event: chainntnfs.NewConfirmationEvent(1, nil), } historicalConfDispatch3, _, err := n.RegisterConf(confNtfn3) if err != nil { @@ -860,7 +860,7 @@ func TestTxNotifierMultipleHistoricalNtfns(t *testing.T) { confNtfns[i] = &chainntnfs.ConfNtfn{ ConfID: i, ConfRequest: confRequest, - Event: chainntnfs.NewConfirmationEvent(1), + Event: chainntnfs.NewConfirmationEvent(1, nil), } if _, _, err := n.RegisterConf(confNtfns[i]); err != nil { t.Fatalf("unable to register conf ntfn #%d: %v", i, err) @@ -882,6 +882,7 @@ func TestTxNotifierMultipleHistoricalNtfns(t *testing.T) { // it can stop watching at tip. expectedConfDetails := &chainntnfs.TxConfirmation{ BlockHeight: startingHeight - 1, + Tx: wire.NewMsgTx(1), } err := n.UpdateConfDetails(confNtfns[0].ConfRequest, expectedConfDetails) if err != nil { @@ -907,7 +908,7 @@ func TestTxNotifierMultipleHistoricalNtfns(t *testing.T) { extraConfNtfn := &chainntnfs.ConfNtfn{ ConfID: numNtfns + 1, ConfRequest: confRequest, - Event: chainntnfs.NewConfirmationEvent(1), + Event: chainntnfs.NewConfirmationEvent(1, nil), } historicalConfRescan, _, err := n.RegisterConf(extraConfNtfn) if err != nil { @@ -1002,6 +1003,92 @@ func TestTxNotifierMultipleHistoricalNtfns(t *testing.T) { } } +// TestTxNotifierCancelConf ensures that a confirmation notification after a +// client has canceled their intent to receive one. +func TestTxNotifierCancelConf(t *testing.T) { + t.Parallel() + + const startingHeight = 10 + hintCache := newMockHintCache() + n := chainntnfs.NewTxNotifier(startingHeight, 100, hintCache, hintCache) + + // We'll register two notification requests. Only the second one will be + // canceled. + tx1 := wire.NewMsgTx(1) + tx1.AddTxOut(&wire.TxOut{PkScript: testRawScript}) + ntfn1 := &chainntnfs.ConfNtfn{ + ConfID: 1, + ConfRequest: chainntnfs.ConfRequest{ + TxID: tx1.TxHash(), + PkScript: testScript, + }, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(1, nil), + } + if _, _, err := n.RegisterConf(ntfn1); err != nil { + t.Fatalf("unable to register spend ntfn: %v", err) + } + + tx2 := wire.NewMsgTx(2) + tx2.AddTxOut(&wire.TxOut{PkScript: testRawScript}) + ntfn2 := &chainntnfs.ConfNtfn{ + ConfID: 2, + ConfRequest: chainntnfs.ConfRequest{ + TxID: tx2.TxHash(), + PkScript: testScript, + }, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(1, nil), + } + if _, _, err := n.RegisterConf(ntfn2); err != nil { + t.Fatalf("unable to register spend ntfn: %v", err) + } + + // Construct a block that will confirm both transactions. + block := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{tx1, tx2}, + }) + tx1ConfDetails := &chainntnfs.TxConfirmation{ + BlockHeight: startingHeight + 1, + BlockHash: block.Hash(), + TxIndex: 0, + Tx: tx1, + } + + // Before extending the notifier's tip with the block above, we'll + // cancel the second request. + n.CancelConf(ntfn2.ConfRequest, ntfn2.ConfID) + + err := n.ConnectTip(block.Hash(), startingHeight+1, block.Transactions()) + if err != nil { + t.Fatalf("unable to connect block: %v", err) + } + if err := n.NotifyHeight(startingHeight + 1); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } + + // The first request should still be active, so we should receive a + // confirmation notification with the correct details. + select { + case confDetails := <-ntfn1.Event.Confirmed: + assertConfDetails(t, confDetails, tx1ConfDetails) + default: + t.Fatalf("expected to receive confirmation notification") + } + + // The second one, however, should not have. The event's Confrimed + // channel must have also been closed to indicate the caller that the + // TxNotifier can no longer fulfill their canceled request. + select { + case _, ok := <-ntfn2.Event.Confirmed: + if ok { + t.Fatal("expected Confirmed channel to be closed") + } + default: + t.Fatal("expected Confirmed channel to be closed") + } +} + // TestTxNotifierCancelSpend ensures that a spend notification after a client // has canceled their intent to receive one. func TestTxNotifierCancelSpend(t *testing.T) { @@ -1120,7 +1207,7 @@ func TestTxNotifierConfReorg(t *testing.T) { PkScript: testScript, }, NumConfirmations: tx1NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs, nil), } if _, _, err := n.RegisterConf(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) @@ -1140,7 +1227,7 @@ func TestTxNotifierConfReorg(t *testing.T) { PkScript: testScript, }, NumConfirmations: tx2NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs, nil), } if _, _, err := n.RegisterConf(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) @@ -1160,7 +1247,7 @@ func TestTxNotifierConfReorg(t *testing.T) { PkScript: testScript, }, NumConfirmations: tx3NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), + Event: chainntnfs.NewConfirmationEvent(tx3NumConfs, nil), } if _, _, err := n.RegisterConf(&ntfn3); err != nil { t.Fatalf("unable to register ntfn: %v", err) @@ -1646,7 +1733,7 @@ func TestTxNotifierConfirmHintCache(t *testing.T) { PkScript: testScript, }, NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(1), + Event: chainntnfs.NewConfirmationEvent(1, nil), } tx2 := wire.MsgTx{Version: 2} @@ -1658,7 +1745,7 @@ func TestTxNotifierConfirmHintCache(t *testing.T) { PkScript: testScript, }, NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(2), + Event: chainntnfs.NewConfirmationEvent(2, nil), } if _, _, err := n.RegisterConf(ntfn1); err != nil { @@ -2039,7 +2126,7 @@ func TestTxNotifierTearDown(t *testing.T) { ConfID: 1, ConfRequest: chainntnfs.ConfRequest{TxID: chainntnfs.ZeroHash}, NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(1), + Event: chainntnfs.NewConfirmationEvent(1, nil), } if _, _, err := n.RegisterConf(confNtfn); err != nil { t.Fatalf("unable to register conf ntfn: %v", err)