From 867d8524bf6861f5f9c44ba891aab5158a709d6f Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 21:31:32 -0700 Subject: [PATCH] chainntnfs/txconfnotifier: make confirmation notifcation registration async In this commit, we modify our TxConfNotifier struct to allow handling notification registrations asynchronously. The Register method has been refactored into two: Register and UpdateConfDetails. In the case that a transaction we registered for notifications on has already confirmed, we'll need to determine its confirmation details on our own. Once done, this can be provided within UpdateConfDetails. This change will pave down the road for our different chain notifiers to handle potentially long rescans asynchronously to prevent blocking the caller. --- chainntnfs/txconfnotifier.go | 133 ++++++++++++++++++++++-------- chainntnfs/txconfnotifier_test.go | 89 +++++++++++++------- 2 files changed, 158 insertions(+), 64 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 3fe08e40..bb049b8a 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -3,6 +3,7 @@ package chainntnfs import ( "errors" "fmt" + "sync" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcutil" @@ -91,6 +92,8 @@ type TxConfNotifier struct { // quit is closed in order to signal that the notifier is gracefully // exiting. quit chan struct{} + + sync.Mutex } // NewTxConfNotifier creates a TxConfNotifier. The current height of the @@ -108,27 +111,83 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif // Register handles a new notification request. The client will be notified when // the transaction gets a sufficient number of confirmations on the blockchain. -// If the transaction has already been included in a block on the chain, the -// confirmation details must be given as the txConf argument, otherwise it -// should be nil. If the transaction already has the sufficient number of -// confirmations, this dispatches the notification immediately. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error { +// +// NOTE: If the transaction has already been included in a block on the chain, +// the confirmation details must be provided with the UpdateConfDetails method, +// otherwise we will wait for the transaction to confirm even though it already +// has. +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } - if txConf == nil || txConf.BlockHeight > tcn.currentHeight { - // Transaction is unconfirmed. - tcn.confNotifications[*ntfn.TxID] = - append(tcn.confNotifications[*ntfn.TxID], ntfn) + tcn.Lock() + defer tcn.Unlock() + + ntfns, ok := tcn.confNotifications[*ntfn.TxID] + if !ok { + ntfns = make(map[uint64]*ConfNtfn) + tcn.confNotifications[*ntfn.TxID] = ntfns + } + + ntfns[ntfn.ConfID] = ntfn + + return nil +} + +// UpdateConfDetails attempts to update the confirmation details for an active +// notification within the notifier. This should only be used in the case of a +// transaction that has confirmed before the notifier's current height. +// +// NOTE: The notification should be registered first to ensure notifications are +// dispatched correctly. +func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, + clientID uint64, details *TxConfirmation) error { + + select { + case <-tcn.quit: + return ErrTxConfNotifierExiting + default: + } + + // Ensure we hold the lock throughout handling the notification to + // prevent the notifier from advancing its height underneath us. + tcn.Lock() + defer tcn.Unlock() + + // First, we'll determine whether we have an active notification for + // this transaction with the given ID. + ntfns, ok := tcn.confNotifications[txid] + if !ok { + return fmt.Errorf("no notifications found for txid %v", txid) + } + + ntfn, ok := ntfns[clientID] + if !ok { + return fmt.Errorf("no notification found with ID %v", clientID) + } + + // If the notification has already recognized that the transaction + // confirmed, there's nothing left for us to do. + if ntfn.details != nil { return nil } - // If the transaction already has the required confirmations, we'll - // dispatch the notification immediately. - confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 + // The notifier has yet to reach the height at which the transaction was + // included in a block, so we should defer until handling it then within + // ConnectTip. + if details == nil || details.BlockHeight > tcn.currentHeight { + return nil + } + + ntfn.details = details + + // Now, we'll examine whether the transaction of this notification + // request has reched its required number of confirmations. If it has, + // we'll disaptch a confirmation notification to the caller. + confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) @@ -136,21 +195,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro // We'll send a 0 value to the Updates channel, indicating that // the transaction has already been confirmed. select { - case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") case ntfn.Event.Updates <- 0: + case <-tcn.quit: + return ErrTxConfNotifierExiting } select { - case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") - case ntfn.Event.Confirmed <- txConf: + case ntfn.Event.Confirmed <- details: ntfn.dispatched = true + case <-tcn.quit: + return ErrTxConfNotifierExiting } } else { - // Otherwise, we'll record the transaction along with the height - // at which we should notify the client. - ntfn.details = txConf + // Otherwise, we'll keep track of the notification request by + // the height at which we should dispatch the confirmation + // notification. ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { ntfnSet = make(map[*ConfNtfn]struct{}) @@ -164,22 +223,19 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro select { case ntfn.Event.Updates <- numConfsLeft: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } } // As a final check, we'll also watch the transaction if it's still - // possible for it to get reorganized out of the chain. - if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { - tcn.confNotifications[*ntfn.TxID] = - append(tcn.confNotifications[*ntfn.TxID], ntfn) - - txSet, exists := tcn.txsByInitialHeight[txConf.BlockHeight] + // possible for it to get reorged out of the chain. + if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { + txSet, exists := tcn.txsByInitialHeight[details.BlockHeight] if !exists { txSet = make(map[chainhash.Hash]struct{}) - tcn.txsByInitialHeight[txConf.BlockHeight] = txSet + tcn.txsByInitialHeight[details.BlockHeight] = txSet } - txSet[*ntfn.TxID] = struct{}{} + txSet[txid] = struct{}{} } return nil @@ -199,6 +255,9 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, default: } + tcn.Lock() + defer tcn.Unlock() + if blockHeight != tcn.currentHeight+1 { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, new height=%d", @@ -244,8 +303,10 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for _, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { for _, ntfn := range tcn.confNotifications[txHash] { - // If the transaction still hasn't been included - // in a block, we'll skip it. + // If the notification hasn't learned about the + // confirmation of its transaction yet (in the + // case of historical confirmations), we'll skip + // it. if ntfn.details == nil { continue } @@ -308,10 +369,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } + tcn.Lock() + defer tcn.Unlock() + if blockHeight != tcn.currentHeight { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, disconnected height=%d", @@ -394,6 +458,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // This closes the event channels of all registered notifications that have // not been dispatched yet. func (tcn *TxConfNotifier) TearDown() { + tcn.Lock() + defer tcn.Unlock() + close(tcn.quit) for _, ntfns := range tcn.confNotifications { diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 96b7d145..2a57245a 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -3,10 +3,10 @@ package chainntnfs_test import ( "testing" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/chainntnfs" ) var zeroHash chainhash.Hash @@ -38,7 +38,9 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -46,7 +48,9 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // We should not receive any notifications from both transactions // since they have not been included in a block yet. @@ -202,32 +206,37 @@ func TestTxConfHistoricalDispatch(t *testing.T) { // starting height so that they are confirmed once registering them. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ + ConfID: 0, TxID: &tx1Hash, NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + ConfID: 1, + TxID: &tx2Hash, + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), + } + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + // Update tx1 with its confirmation details. We should only receive one + // update since it only requires one confirmation and it already met it. txConf1 := chainntnfs.TxConfirmation{ BlockHash: &zeroHash, BlockHeight: 9, TxIndex: 1, } - txConfNotifier.Register(&ntfn1, &txConf1) - - tx2Hash := tx2.TxHash() - txConf2 := chainntnfs.TxConfirmation{ - BlockHash: &zeroHash, - BlockHeight: 9, - TxIndex: 2, + err := txConfNotifier.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1) + if err != nil { + t.Fatalf("unable to update conf details: %v", err) } - ntfn2 := chainntnfs.ConfNtfn{ - TxID: &tx2Hash, - NumConfirmations: tx2NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), - } - txConfNotifier.Register(&ntfn2, &txConf2) - - // We should only receive one update for tx1 since it only requires - // one confirmation and it already met it. select { case numConfsLeft := <-ntfn1.Event.Updates: const expected = 0 @@ -240,8 +249,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatal("Expected confirmation update for tx1") } - // A confirmation notification for tx1 should be dispatched, as it met - // its required number of confirmations. + // A confirmation notification for tx1 should also be dispatched. select { case txConf := <-ntfn1.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf1) @@ -249,8 +257,19 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } - // We should only receive one update indicating how many confirmations - // are left for the transaction to be confirmed. + // Update tx2 with its confirmation details. This should not trigger a + // confirmation notification since it hasn't reached its required number + // of confirmations, but we should receive a confirmation update + // indicating how many confirmation are left. + txConf2 := chainntnfs.TxConfirmation{ + BlockHash: &zeroHash, + BlockHeight: 9, + TxIndex: 2, + } + err = txConfNotifier.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2) + if err != nil { + t.Fatalf("unable to update conf details: %v", err) + } select { case numConfsLeft := <-ntfn2.Event.Updates: const expected = 1 @@ -263,8 +282,6 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatal("Expected confirmation update for tx2") } - // A confirmation notification for tx2 should not be dispatched yet, as - // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) @@ -277,7 +294,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx3}, }) - err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + err = txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -343,7 +360,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Tx 2 will be confirmed in block 10 and requires 1 conf. tx2Hash := tx2.TxHash() @@ -352,7 +371,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Tx 3 will be confirmed in block 10 and requires 2 confs. tx3Hash := tx3.TxHash() @@ -361,7 +382,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx3NumConfs, Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } - txConfNotifier.Register(&ntfn3, nil) + if err := txConfNotifier.Register(&ntfn3); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Sync chain to block 10. Txs 1 & 2 should be confirmed. block1 := btcutil.NewBlock(&wire.MsgBlock{ @@ -581,7 +604,9 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 1, Event: chainntnfs.NewConfirmationEvent(1), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -589,7 +614,9 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 2, Event: chainntnfs.NewConfirmationEvent(2), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Include the transactions in a block and add it to the TxConfNotifier. // This should confirm tx1, but not tx2.