diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 3fe08e40..bb049b8a 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -3,6 +3,7 @@ package chainntnfs import ( "errors" "fmt" + "sync" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcutil" @@ -91,6 +92,8 @@ type TxConfNotifier struct { // quit is closed in order to signal that the notifier is gracefully // exiting. quit chan struct{} + + sync.Mutex } // NewTxConfNotifier creates a TxConfNotifier. The current height of the @@ -108,27 +111,83 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif // Register handles a new notification request. The client will be notified when // the transaction gets a sufficient number of confirmations on the blockchain. -// If the transaction has already been included in a block on the chain, the -// confirmation details must be given as the txConf argument, otherwise it -// should be nil. If the transaction already has the sufficient number of -// confirmations, this dispatches the notification immediately. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error { +// +// NOTE: If the transaction has already been included in a block on the chain, +// the confirmation details must be provided with the UpdateConfDetails method, +// otherwise we will wait for the transaction to confirm even though it already +// has. +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } - if txConf == nil || txConf.BlockHeight > tcn.currentHeight { - // Transaction is unconfirmed. - tcn.confNotifications[*ntfn.TxID] = - append(tcn.confNotifications[*ntfn.TxID], ntfn) + tcn.Lock() + defer tcn.Unlock() + + ntfns, ok := tcn.confNotifications[*ntfn.TxID] + if !ok { + ntfns = make(map[uint64]*ConfNtfn) + tcn.confNotifications[*ntfn.TxID] = ntfns + } + + ntfns[ntfn.ConfID] = ntfn + + return nil +} + +// UpdateConfDetails attempts to update the confirmation details for an active +// notification within the notifier. This should only be used in the case of a +// transaction that has confirmed before the notifier's current height. +// +// NOTE: The notification should be registered first to ensure notifications are +// dispatched correctly. +func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, + clientID uint64, details *TxConfirmation) error { + + select { + case <-tcn.quit: + return ErrTxConfNotifierExiting + default: + } + + // Ensure we hold the lock throughout handling the notification to + // prevent the notifier from advancing its height underneath us. + tcn.Lock() + defer tcn.Unlock() + + // First, we'll determine whether we have an active notification for + // this transaction with the given ID. + ntfns, ok := tcn.confNotifications[txid] + if !ok { + return fmt.Errorf("no notifications found for txid %v", txid) + } + + ntfn, ok := ntfns[clientID] + if !ok { + return fmt.Errorf("no notification found with ID %v", clientID) + } + + // If the notification has already recognized that the transaction + // confirmed, there's nothing left for us to do. + if ntfn.details != nil { return nil } - // If the transaction already has the required confirmations, we'll - // dispatch the notification immediately. - confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 + // The notifier has yet to reach the height at which the transaction was + // included in a block, so we should defer until handling it then within + // ConnectTip. + if details == nil || details.BlockHeight > tcn.currentHeight { + return nil + } + + ntfn.details = details + + // Now, we'll examine whether the transaction of this notification + // request has reched its required number of confirmations. If it has, + // we'll disaptch a confirmation notification to the caller. + confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) @@ -136,21 +195,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro // We'll send a 0 value to the Updates channel, indicating that // the transaction has already been confirmed. select { - case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") case ntfn.Event.Updates <- 0: + case <-tcn.quit: + return ErrTxConfNotifierExiting } select { - case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") - case ntfn.Event.Confirmed <- txConf: + case ntfn.Event.Confirmed <- details: ntfn.dispatched = true + case <-tcn.quit: + return ErrTxConfNotifierExiting } } else { - // Otherwise, we'll record the transaction along with the height - // at which we should notify the client. - ntfn.details = txConf + // Otherwise, we'll keep track of the notification request by + // the height at which we should dispatch the confirmation + // notification. ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { ntfnSet = make(map[*ConfNtfn]struct{}) @@ -164,22 +223,19 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro select { case ntfn.Event.Updates <- numConfsLeft: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } } // As a final check, we'll also watch the transaction if it's still - // possible for it to get reorganized out of the chain. - if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { - tcn.confNotifications[*ntfn.TxID] = - append(tcn.confNotifications[*ntfn.TxID], ntfn) - - txSet, exists := tcn.txsByInitialHeight[txConf.BlockHeight] + // possible for it to get reorged out of the chain. + if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { + txSet, exists := tcn.txsByInitialHeight[details.BlockHeight] if !exists { txSet = make(map[chainhash.Hash]struct{}) - tcn.txsByInitialHeight[txConf.BlockHeight] = txSet + tcn.txsByInitialHeight[details.BlockHeight] = txSet } - txSet[*ntfn.TxID] = struct{}{} + txSet[txid] = struct{}{} } return nil @@ -199,6 +255,9 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, default: } + tcn.Lock() + defer tcn.Unlock() + if blockHeight != tcn.currentHeight+1 { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, new height=%d", @@ -244,8 +303,10 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for _, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { for _, ntfn := range tcn.confNotifications[txHash] { - // If the transaction still hasn't been included - // in a block, we'll skip it. + // If the notification hasn't learned about the + // confirmation of its transaction yet (in the + // case of historical confirmations), we'll skip + // it. if ntfn.details == nil { continue } @@ -308,10 +369,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } + tcn.Lock() + defer tcn.Unlock() + if blockHeight != tcn.currentHeight { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, disconnected height=%d", @@ -394,6 +458,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // This closes the event channels of all registered notifications that have // not been dispatched yet. func (tcn *TxConfNotifier) TearDown() { + tcn.Lock() + defer tcn.Unlock() + close(tcn.quit) for _, ntfns := range tcn.confNotifications { diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 96b7d145..2a57245a 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -3,10 +3,10 @@ package chainntnfs_test import ( "testing" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/chainntnfs" ) var zeroHash chainhash.Hash @@ -38,7 +38,9 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -46,7 +48,9 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // We should not receive any notifications from both transactions // since they have not been included in a block yet. @@ -202,32 +206,37 @@ func TestTxConfHistoricalDispatch(t *testing.T) { // starting height so that they are confirmed once registering them. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ + ConfID: 0, TxID: &tx1Hash, NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + ConfID: 1, + TxID: &tx2Hash, + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), + } + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + // Update tx1 with its confirmation details. We should only receive one + // update since it only requires one confirmation and it already met it. txConf1 := chainntnfs.TxConfirmation{ BlockHash: &zeroHash, BlockHeight: 9, TxIndex: 1, } - txConfNotifier.Register(&ntfn1, &txConf1) - - tx2Hash := tx2.TxHash() - txConf2 := chainntnfs.TxConfirmation{ - BlockHash: &zeroHash, - BlockHeight: 9, - TxIndex: 2, + err := txConfNotifier.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1) + if err != nil { + t.Fatalf("unable to update conf details: %v", err) } - ntfn2 := chainntnfs.ConfNtfn{ - TxID: &tx2Hash, - NumConfirmations: tx2NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), - } - txConfNotifier.Register(&ntfn2, &txConf2) - - // We should only receive one update for tx1 since it only requires - // one confirmation and it already met it. select { case numConfsLeft := <-ntfn1.Event.Updates: const expected = 0 @@ -240,8 +249,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatal("Expected confirmation update for tx1") } - // A confirmation notification for tx1 should be dispatched, as it met - // its required number of confirmations. + // A confirmation notification for tx1 should also be dispatched. select { case txConf := <-ntfn1.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf1) @@ -249,8 +257,19 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } - // We should only receive one update indicating how many confirmations - // are left for the transaction to be confirmed. + // Update tx2 with its confirmation details. This should not trigger a + // confirmation notification since it hasn't reached its required number + // of confirmations, but we should receive a confirmation update + // indicating how many confirmation are left. + txConf2 := chainntnfs.TxConfirmation{ + BlockHash: &zeroHash, + BlockHeight: 9, + TxIndex: 2, + } + err = txConfNotifier.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2) + if err != nil { + t.Fatalf("unable to update conf details: %v", err) + } select { case numConfsLeft := <-ntfn2.Event.Updates: const expected = 1 @@ -263,8 +282,6 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatal("Expected confirmation update for tx2") } - // A confirmation notification for tx2 should not be dispatched yet, as - // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) @@ -277,7 +294,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx3}, }) - err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + err = txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -343,7 +360,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Tx 2 will be confirmed in block 10 and requires 1 conf. tx2Hash := tx2.TxHash() @@ -352,7 +371,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Tx 3 will be confirmed in block 10 and requires 2 confs. tx3Hash := tx3.TxHash() @@ -361,7 +382,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx3NumConfs, Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } - txConfNotifier.Register(&ntfn3, nil) + if err := txConfNotifier.Register(&ntfn3); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Sync chain to block 10. Txs 1 & 2 should be confirmed. block1 := btcutil.NewBlock(&wire.MsgBlock{ @@ -581,7 +604,9 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 1, Event: chainntnfs.NewConfirmationEvent(1), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -589,7 +614,9 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 2, Event: chainntnfs.NewConfirmationEvent(2), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Include the transactions in a block and add it to the TxConfNotifier. // This should confirm tx1, but not tx2.