From ad904ebe8c9cb06e53cf66392f1938bbfa65b27d Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 21:27:27 -0700 Subject: [PATCH 1/6] chainntnfs/txconfnotifier: use concrete ErrTxConfNotifierExiting error --- chainntnfs/txconfnotifier.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index b10f4272..fef53547 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -8,6 +8,12 @@ import ( "github.com/btcsuite/btcutil" ) +var ( + // ErrTxConfNotifierExiting is an error returned when attempting to + // interact with the TxConfNotifier but it been shut down. + ErrTxConfNotifierExiting = errors.New("TxConfNotifier is exiting") +) + // ConfNtfn represents a notifier client's request to receive a notification // once the target transaction gets sufficient confirmations. The client is // asynchronously notified via the ConfirmationEvent channels. @@ -185,7 +191,7 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } @@ -256,7 +262,7 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, select { case ntfn.Event.Updates <- numConfsLeft: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } } } @@ -267,11 +273,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) + select { case ntfn.Event.Confirmed <- ntfn.details: ntfn.dispatched = true case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } } delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) @@ -321,7 +328,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case <-ntfn.Event.Updates: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } @@ -340,7 +347,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case <-ntfn.Event.Confirmed: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } @@ -352,7 +359,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } continue From c43506dee9c0ec06b8f44b77f186131ae96d8f52 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 21:30:15 -0700 Subject: [PATCH 2/6] chainntnfs: add unique ID field to track conf ntfns within notifier --- chainntnfs/bitcoindnotify/bitcoind.go | 2 ++ chainntnfs/btcdnotify/btcd.go | 2 ++ chainntnfs/neutrinonotify/neutrino.go | 8 +++++--- chainntnfs/txconfnotifier.go | 8 ++++++-- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 2139e1f6..dc898a04 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -54,6 +54,7 @@ type chainUpdate struct { // chain client. Multiple concurrent clients are supported. All notifications // are achieved via non-blocking sends on client channels. type BitcoindNotifier struct { + confClientCounter uint64 // To be used atomically. spendClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically. @@ -716,6 +717,7 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, ntfn := &confirmationNotification{ ConfNtfn: chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&b.confClientCounter, 1), TxID: txid, NumConfirmations: numConfs, Event: chainntnfs.NewConfirmationEvent(numConfs), diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 371be550..5e6ee526 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -61,6 +61,7 @@ type txUpdate struct { // notifications. Multiple concurrent clients are supported. All notifications // are achieved via non-blocking sends on client channels. type BtcdNotifier struct { + confClientCounter uint64 // To be used aotmically. spendClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically. @@ -800,6 +801,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, ntfn := &confirmationNotification{ ConfNtfn: chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&b.confClientCounter, 1), TxID: txid, NumConfirmations: numConfs, Event: chainntnfs.NewConfirmationEvent(numConfs), diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 2ab2ad05..d811f0e5 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -48,12 +48,13 @@ var ( // TODO(roasbeef): heavily consolidate with NeutrinoNotifier code // * maybe combine into single package? type NeutrinoNotifier struct { - started int32 // To be used atomically. - stopped int32 // To be used atomically. - + confClientCounter uint64 // To be used atomically. spendClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically. + started int32 // To be used atomically. + stopped int32 // To be used atomically. + heightMtx sync.RWMutex bestHeight uint32 @@ -696,6 +697,7 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, ntfn := &confirmationsNotification{ ConfNtfn: chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&n.confClientCounter, 1), TxID: txid, NumConfirmations: numConfs, Event: chainntnfs.NewConfirmationEvent(numConfs), diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index fef53547..3fe08e40 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -18,6 +18,10 @@ var ( // once the target transaction gets sufficient confirmations. The client is // asynchronously notified via the ConfirmationEvent channels. type ConfNtfn struct { + // ConfID uniquely identifies the confirmation notification request for + // the specified transaction. + ConfID uint64 + // TxID is the hash of the transaction for which confirmation notifications // are requested. TxID *chainhash.Hash @@ -72,7 +76,7 @@ type TxConfNotifier struct { // confNotifications is an index of notification requests by transaction // hash. - confNotifications map[chainhash.Hash][]*ConfNtfn + confNotifications map[chainhash.Hash]map[uint64]*ConfNtfn // txsByInitialHeight is an index of watched transactions by the height // that they are included at in the blockchain. This is tracked so that @@ -95,7 +99,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif return &TxConfNotifier{ currentHeight: startHeight, reorgSafetyLimit: reorgSafetyLimit, - confNotifications: make(map[chainhash.Hash][]*ConfNtfn), + confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), quit: make(chan struct{}), From 867d8524bf6861f5f9c44ba891aab5158a709d6f Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 21:31:32 -0700 Subject: [PATCH 3/6] chainntnfs/txconfnotifier: make confirmation notifcation registration async In this commit, we modify our TxConfNotifier struct to allow handling notification registrations asynchronously. The Register method has been refactored into two: Register and UpdateConfDetails. In the case that a transaction we registered for notifications on has already confirmed, we'll need to determine its confirmation details on our own. Once done, this can be provided within UpdateConfDetails. This change will pave down the road for our different chain notifiers to handle potentially long rescans asynchronously to prevent blocking the caller. --- chainntnfs/txconfnotifier.go | 133 ++++++++++++++++++++++-------- chainntnfs/txconfnotifier_test.go | 89 +++++++++++++------- 2 files changed, 158 insertions(+), 64 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 3fe08e40..bb049b8a 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -3,6 +3,7 @@ package chainntnfs import ( "errors" "fmt" + "sync" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcutil" @@ -91,6 +92,8 @@ type TxConfNotifier struct { // quit is closed in order to signal that the notifier is gracefully // exiting. quit chan struct{} + + sync.Mutex } // NewTxConfNotifier creates a TxConfNotifier. The current height of the @@ -108,27 +111,83 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif // Register handles a new notification request. The client will be notified when // the transaction gets a sufficient number of confirmations on the blockchain. -// If the transaction has already been included in a block on the chain, the -// confirmation details must be given as the txConf argument, otherwise it -// should be nil. If the transaction already has the sufficient number of -// confirmations, this dispatches the notification immediately. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error { +// +// NOTE: If the transaction has already been included in a block on the chain, +// the confirmation details must be provided with the UpdateConfDetails method, +// otherwise we will wait for the transaction to confirm even though it already +// has. +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } - if txConf == nil || txConf.BlockHeight > tcn.currentHeight { - // Transaction is unconfirmed. - tcn.confNotifications[*ntfn.TxID] = - append(tcn.confNotifications[*ntfn.TxID], ntfn) + tcn.Lock() + defer tcn.Unlock() + + ntfns, ok := tcn.confNotifications[*ntfn.TxID] + if !ok { + ntfns = make(map[uint64]*ConfNtfn) + tcn.confNotifications[*ntfn.TxID] = ntfns + } + + ntfns[ntfn.ConfID] = ntfn + + return nil +} + +// UpdateConfDetails attempts to update the confirmation details for an active +// notification within the notifier. This should only be used in the case of a +// transaction that has confirmed before the notifier's current height. +// +// NOTE: The notification should be registered first to ensure notifications are +// dispatched correctly. +func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, + clientID uint64, details *TxConfirmation) error { + + select { + case <-tcn.quit: + return ErrTxConfNotifierExiting + default: + } + + // Ensure we hold the lock throughout handling the notification to + // prevent the notifier from advancing its height underneath us. + tcn.Lock() + defer tcn.Unlock() + + // First, we'll determine whether we have an active notification for + // this transaction with the given ID. + ntfns, ok := tcn.confNotifications[txid] + if !ok { + return fmt.Errorf("no notifications found for txid %v", txid) + } + + ntfn, ok := ntfns[clientID] + if !ok { + return fmt.Errorf("no notification found with ID %v", clientID) + } + + // If the notification has already recognized that the transaction + // confirmed, there's nothing left for us to do. + if ntfn.details != nil { return nil } - // If the transaction already has the required confirmations, we'll - // dispatch the notification immediately. - confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 + // The notifier has yet to reach the height at which the transaction was + // included in a block, so we should defer until handling it then within + // ConnectTip. + if details == nil || details.BlockHeight > tcn.currentHeight { + return nil + } + + ntfn.details = details + + // Now, we'll examine whether the transaction of this notification + // request has reched its required number of confirmations. If it has, + // we'll disaptch a confirmation notification to the caller. + confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) @@ -136,21 +195,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro // We'll send a 0 value to the Updates channel, indicating that // the transaction has already been confirmed. select { - case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") case ntfn.Event.Updates <- 0: + case <-tcn.quit: + return ErrTxConfNotifierExiting } select { - case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") - case ntfn.Event.Confirmed <- txConf: + case ntfn.Event.Confirmed <- details: ntfn.dispatched = true + case <-tcn.quit: + return ErrTxConfNotifierExiting } } else { - // Otherwise, we'll record the transaction along with the height - // at which we should notify the client. - ntfn.details = txConf + // Otherwise, we'll keep track of the notification request by + // the height at which we should dispatch the confirmation + // notification. ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { ntfnSet = make(map[*ConfNtfn]struct{}) @@ -164,22 +223,19 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro select { case ntfn.Event.Updates <- numConfsLeft: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } } // As a final check, we'll also watch the transaction if it's still - // possible for it to get reorganized out of the chain. - if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { - tcn.confNotifications[*ntfn.TxID] = - append(tcn.confNotifications[*ntfn.TxID], ntfn) - - txSet, exists := tcn.txsByInitialHeight[txConf.BlockHeight] + // possible for it to get reorged out of the chain. + if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { + txSet, exists := tcn.txsByInitialHeight[details.BlockHeight] if !exists { txSet = make(map[chainhash.Hash]struct{}) - tcn.txsByInitialHeight[txConf.BlockHeight] = txSet + tcn.txsByInitialHeight[details.BlockHeight] = txSet } - txSet[*ntfn.TxID] = struct{}{} + txSet[txid] = struct{}{} } return nil @@ -199,6 +255,9 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, default: } + tcn.Lock() + defer tcn.Unlock() + if blockHeight != tcn.currentHeight+1 { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, new height=%d", @@ -244,8 +303,10 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for _, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { for _, ntfn := range tcn.confNotifications[txHash] { - // If the transaction still hasn't been included - // in a block, we'll skip it. + // If the notification hasn't learned about the + // confirmation of its transaction yet (in the + // case of historical confirmations), we'll skip + // it. if ntfn.details == nil { continue } @@ -308,10 +369,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } + tcn.Lock() + defer tcn.Unlock() + if blockHeight != tcn.currentHeight { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, disconnected height=%d", @@ -394,6 +458,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // This closes the event channels of all registered notifications that have // not been dispatched yet. func (tcn *TxConfNotifier) TearDown() { + tcn.Lock() + defer tcn.Unlock() + close(tcn.quit) for _, ntfns := range tcn.confNotifications { diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 96b7d145..2a57245a 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -3,10 +3,10 @@ package chainntnfs_test import ( "testing" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/chainntnfs" ) var zeroHash chainhash.Hash @@ -38,7 +38,9 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -46,7 +48,9 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // We should not receive any notifications from both transactions // since they have not been included in a block yet. @@ -202,32 +206,37 @@ func TestTxConfHistoricalDispatch(t *testing.T) { // starting height so that they are confirmed once registering them. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ + ConfID: 0, TxID: &tx1Hash, NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + ConfID: 1, + TxID: &tx2Hash, + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), + } + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + // Update tx1 with its confirmation details. We should only receive one + // update since it only requires one confirmation and it already met it. txConf1 := chainntnfs.TxConfirmation{ BlockHash: &zeroHash, BlockHeight: 9, TxIndex: 1, } - txConfNotifier.Register(&ntfn1, &txConf1) - - tx2Hash := tx2.TxHash() - txConf2 := chainntnfs.TxConfirmation{ - BlockHash: &zeroHash, - BlockHeight: 9, - TxIndex: 2, + err := txConfNotifier.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1) + if err != nil { + t.Fatalf("unable to update conf details: %v", err) } - ntfn2 := chainntnfs.ConfNtfn{ - TxID: &tx2Hash, - NumConfirmations: tx2NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), - } - txConfNotifier.Register(&ntfn2, &txConf2) - - // We should only receive one update for tx1 since it only requires - // one confirmation and it already met it. select { case numConfsLeft := <-ntfn1.Event.Updates: const expected = 0 @@ -240,8 +249,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatal("Expected confirmation update for tx1") } - // A confirmation notification for tx1 should be dispatched, as it met - // its required number of confirmations. + // A confirmation notification for tx1 should also be dispatched. select { case txConf := <-ntfn1.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf1) @@ -249,8 +257,19 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } - // We should only receive one update indicating how many confirmations - // are left for the transaction to be confirmed. + // Update tx2 with its confirmation details. This should not trigger a + // confirmation notification since it hasn't reached its required number + // of confirmations, but we should receive a confirmation update + // indicating how many confirmation are left. + txConf2 := chainntnfs.TxConfirmation{ + BlockHash: &zeroHash, + BlockHeight: 9, + TxIndex: 2, + } + err = txConfNotifier.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2) + if err != nil { + t.Fatalf("unable to update conf details: %v", err) + } select { case numConfsLeft := <-ntfn2.Event.Updates: const expected = 1 @@ -263,8 +282,6 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatal("Expected confirmation update for tx2") } - // A confirmation notification for tx2 should not be dispatched yet, as - // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) @@ -277,7 +294,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx3}, }) - err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + err = txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -343,7 +360,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Tx 2 will be confirmed in block 10 and requires 1 conf. tx2Hash := tx2.TxHash() @@ -352,7 +371,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Tx 3 will be confirmed in block 10 and requires 2 confs. tx3Hash := tx3.TxHash() @@ -361,7 +382,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx3NumConfs, Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } - txConfNotifier.Register(&ntfn3, nil) + if err := txConfNotifier.Register(&ntfn3); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Sync chain to block 10. Txs 1 & 2 should be confirmed. block1 := btcutil.NewBlock(&wire.MsgBlock{ @@ -581,7 +604,9 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 1, Event: chainntnfs.NewConfirmationEvent(1), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -589,7 +614,9 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 2, Event: chainntnfs.NewConfirmationEvent(2), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Include the transactions in a block and add it to the TxConfNotifier. // This should confirm tx1, but not tx2. From 12816a910dc95ca8d18e8c7802dbf6eaa425b7dc Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 21:32:55 -0700 Subject: [PATCH 4/6] chainntnfs: make historical confirmation rescans async --- chainntnfs/bitcoindnotify/bitcoind.go | 61 ++++++++++++++++------- chainntnfs/btcdnotify/btcd.go | 58 ++++++++++++++++------ chainntnfs/neutrinonotify/neutrino.go | 71 +++++++++++++++++++-------- 3 files changed, 137 insertions(+), 53 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index dc898a04..6e3981cf 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -243,27 +243,40 @@ out: "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - _, currentHeight, err := b.chainConn.GetBestBlock() - if err != nil { - chainntnfs.Log.Error(err) - } + currentHeight := uint32(bestHeight) - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, uint32(currentHeight), - ) - if err != nil { - chainntnfs.Log.Error(err) - } + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + b.wg.Add(1) + go func() { + defer b.wg.Done() + + confDetails, err := b.historicalConfDetails( + msg.TxID, msg.heightHint, + currentHeight, + ) + if err != nil { + chainntnfs.Log.Error(err) + return + } + + if confDetails != nil { + err := b.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + } + }() - err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + case chain.RelevantTx: b.handleRelevantTx(msg, bestHeight) } @@ -474,6 +487,14 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash, // Begin scanning blocks at every height to determine where the // transaction was included in. for height := heightHint; height <= currentHeight; height++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { return nil, fmt.Errorf("unable to get hash from block "+ @@ -725,11 +746,15 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-b.quit: + return nil, ErrChainNotifierShuttingDown } } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 5e6ee526..fa4f1826 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -299,24 +299,42 @@ out: b.spendNotifications[op] = make(map[uint64]*spendNotification) } b.spendNotifications[op][msg.spendID] = msg + case *confirmationNotification: chainntnfs.Log.Infof("New confirmation "+ "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, uint32(currentHeight), - ) - if err != nil { - chainntnfs.Log.Error(err) - } + bestHeight := uint32(currentHeight) + + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + b.wg.Add(1) + go func() { + defer b.wg.Done() + + confDetails, err := b.historicalConfDetails( + msg.TxID, msg.heightHint, + bestHeight, + ) + if err != nil { + chainntnfs.Log.Error(err) + return + } + + if confDetails != nil { + err = b.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + } + }() - err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg @@ -533,6 +551,14 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, // Begin scanning blocks at every height to determine where the // transaction was included in. for height := heightHint; height <= currentHeight; height++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { return nil, fmt.Errorf("unable to get hash from block "+ @@ -809,11 +835,15 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-b.quit: + return nil, ErrChainNotifierShuttingDown } } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index d811f0e5..7d4dc5d0 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -307,31 +307,48 @@ func (n *NeutrinoNotifier) notificationDispatcher() { currentHeight := n.bestHeight n.heightMtx.RUnlock() - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := n.historicalConfDetails(msg.TxID, currentHeight, - msg.heightHint) - if err != nil { - chainntnfs.Log.Error(err) - } + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + n.wg.Add(1) + go func() { + defer n.wg.Done() - if txConf == nil { - // If we can't fully dispatch confirmation, - // then we'll update our filter so we can be - // notified of its future initial confirmation. + confDetails, err := n.historicalConfDetails( + msg.TxID, currentHeight, + msg.heightHint, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + + if confDetails != nil { + err := n.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + return + } + + // If we can't fully dispatch + // confirmation, then we'll update our + // filter so we can be notified of its + // future initial confirmation. rescanUpdate := []neutrino.UpdateOption{ neutrino.AddTxIDs(*msg.TxID), neutrino.Rewind(currentHeight), } - if err := n.chainView.Update(rescanUpdate...); err != nil { - chainntnfs.Log.Errorf("unable to update rescan: %v", err) + err = n.chainView.Update(rescanUpdate...) + if err != nil { + chainntnfs.Log.Errorf("Unable "+ + "to update rescan: %v", + err) } - } - - err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } + }() case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") @@ -401,6 +418,14 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, // Starting from the height hint, we'll walk forwards in the chain to // see if this transaction has already been confirmed. for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + // First, we'll fetch the block header for this height so we // can compute the current block hash. header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) @@ -705,11 +730,15 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-n.quit: - return nil, ErrChainNotifierShuttingDown case n.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-n.quit: + return nil, ErrChainNotifierShuttingDown } } From 65b6257e1e19c03fcb20e0b3b1894018b8831ba7 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 21:33:32 -0700 Subject: [PATCH 5/6] chainntnfs/bitcoindnotify: make historical spend rescans async --- chainntnfs/bitcoindnotify/bitcoind.go | 128 ++++++++++++++++++-------- 1 file changed, 89 insertions(+), 39 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 6e3981cf..452a9025 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -19,7 +19,6 @@ import ( ) const ( - // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "bitcoind" @@ -35,6 +34,11 @@ var ( // measure a spend notification when notifier is already stopped. ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " + "while attempting to register for spend notification.") + + // ErrTransactionNotFound is an error returned when we attempt to find a + // transaction by manually scanning the chain within a specific range + // but it is not found. + ErrTransactionNotFound = errors.New("transaction not found within range") ) // chainUpdate encapsulates an update to the current main chain. This struct is @@ -237,7 +241,7 @@ out: b.spendNotifications[op] = make(map[uint64]*spendNotification) } b.spendNotifications[op][msg.spendID] = msg - b.chainConn.NotifySpent([]*wire.OutPoint{&op}) + case *confirmationNotification: chainntnfs.Log.Infof("New confirmation "+ "subscription: txid=%v, numconfs=%v", @@ -654,43 +658,22 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, err } - out: - for i := startHeight; i <= endHeight; i++ { - blockHash, err := b.chainConn.GetBlockHash(int64(i)) + // In order to ensure we don't block the caller on what + // may be a long rescan, we'll launch a goroutine to do + // so in the background. + b.wg.Add(1) + go func() { + defer b.wg.Done() + + err := b.dispatchSpendDetailsManually( + *outpoint, startHeight, endHeight, + ) if err != nil { - return nil, err + chainntnfs.Log.Errorf("Rescan for spend "+ + "notification txout(%x) "+ + "failed: %v", outpoint, err) } - block, err := b.chainConn.GetBlock(blockHash) - if err != nil { - return nil, err - } - for _, tx := range block.Transactions { - for _, in := range tx.TxIn { - if in.PreviousOutPoint == *outpoint { - relTx := chain.RelevantTx{ - TxRecord: &wtxmgr.TxRecord{ - MsgTx: *tx, - Hash: tx.TxHash(), - Received: block.Header.Timestamp, - }, - Block: &wtxmgr.BlockMeta{ - Block: wtxmgr.Block{ - Hash: block.BlockHash(), - Height: i, - }, - Time: block.Header.Timestamp, - }, - } - select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown - case b.notificationRegistry <- relTx: - } - break out - } - } - } - } + }() } } @@ -705,8 +688,9 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // Submit spend cancellation to notification dispatcher. select { case b.notificationCancels <- cancel: - // Cancellation is being handled, drain the spend chan until it is - // closed before yielding to the caller. + // Cancellation is being handled, drain the + // spend chan until it is closed before yielding + // to the caller. for { select { case _, ok := <-ntfn.spendChan: @@ -723,6 +707,72 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, }, nil } +// disaptchSpendDetailsManually attempts to manually scan the chain within the +// given height range for a transaction that spends the given outpoint. If one +// is found, it's spending details are sent to the notifier dispatcher, which +// will then dispatch the notification to all of its clients. +func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, + startHeight, endHeight int32) error { + + // Begin scanning blocks at every height to determine if the outpoint + // was spent. + for height := startHeight; height <= endHeight; height++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-b.quit: + return ErrChainNotifierShuttingDown + default: + } + + blockHash, err := b.chainConn.GetBlockHash(int64(height)) + if err != nil { + return err + } + block, err := b.chainConn.GetBlock(blockHash) + if err != nil { + return err + } + + for _, tx := range block.Transactions { + for _, in := range tx.TxIn { + if in.PreviousOutPoint != op { + continue + } + + // If this transaction input spends the + // outpoint, we'll gather the details of the + // spending transaction and dispatch a spend + // notification to our clients. + relTx := chain.RelevantTx{ + TxRecord: &wtxmgr.TxRecord{ + MsgTx: *tx, + Hash: tx.TxHash(), + Received: block.Header.Timestamp, + }, + Block: &wtxmgr.BlockMeta{ + Block: wtxmgr.Block{ + Hash: *blockHash, + Height: height, + }, + Time: block.Header.Timestamp, + }, + } + + select { + case b.notificationRegistry <- relTx: + case <-b.quit: + return ErrChainNotifierShuttingDown + } + + return nil + } + } + } + + return ErrTransactionNotFound +} + // confirmationNotification represents a client's intent to receive a // notification once the target txid reaches numConfirmations confirmations. type confirmationNotification struct { From 8f5f3bcab25ba987ea70e9a5ede70cd42df1f7e8 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 21:34:43 -0700 Subject: [PATCH 6/6] chainntnfs/interface_test: remove code found within helper functions --- chainntnfs/interface_test.go | 61 ++---------------------------------- 1 file changed, 3 insertions(+), 58 deletions(-) diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 90f31a01..c9c83c20 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -867,69 +867,14 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, // concrete implementations. // // To do so, we first create a new output to our test target address. - txid, err := getTestTxId(miner) - if err != nil { - t.Fatalf("unable to create test addr: %v", err) - } + outpoint, pkScript := createSpendableOutput(miner, t) - err = waitForMempoolTx(miner, txid) - if err != nil { - t.Fatalf("tx not relayed to miner: %v", err) - } - - // Mine a single block which should include that txid above. - if _, err := miner.Node.Generate(1); err != nil { - t.Fatalf("unable to generate single block: %v", err) - } - - // Now that we have the txid, fetch the transaction itself. - wrappedTx, err := miner.Node.GetRawTransaction(txid) - if err != nil { - t.Fatalf("unable to get new tx: %v", err) - } - tx := wrappedTx.MsgTx() - - // Locate the output index sent to us. We need this so we can construct - // a spending txn below. - outIndex := -1 - var pkScript []byte - for i, txOut := range tx.TxOut { - if bytes.Contains(txOut.PkScript, testAddr.ScriptAddress()) { - pkScript = txOut.PkScript - outIndex = i - break - } - } - if outIndex == -1 { - t.Fatalf("unable to locate new output") - } - - // Now that we've found the output index, register for a spentness - // notification for the newly created output. - outpoint := wire.NewOutPoint(txid, uint32(outIndex)) - - // Next, create a new transaction spending that output. - spendingTx := wire.NewMsgTx(1) - spendingTx.AddTxIn(&wire.TxIn{ - PreviousOutPoint: *outpoint, - }) - spendingTx.AddTxOut(&wire.TxOut{ - Value: 1e8, - PkScript: pkScript, - }) - sigScript, err := txscript.SignatureScript(spendingTx, 0, pkScript, - txscript.SigHashAll, privKey, true) - if err != nil { - t.Fatalf("unable to sign tx: %v", err) - } - spendingTx.TxIn[0].SignatureScript = sigScript - - // Broadcast our spending transaction. + // We'll then spend this output and broadcast the spend transaction. + spendingTx := createSpendTx(outpoint, pkScript, t) spenderSha, err := miner.Node.SendRawTransaction(spendingTx, true) if err != nil { t.Fatalf("unable to broadcast tx: %v", err) } - err = waitForMempoolTx(miner, spenderSha) if err != nil { t.Fatalf("tx not relayed to miner: %v", err)