diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 2139e1f6..452a9025 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -19,7 +19,6 @@ import ( ) const ( - // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "bitcoind" @@ -35,6 +34,11 @@ var ( // measure a spend notification when notifier is already stopped. ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " + "while attempting to register for spend notification.") + + // ErrTransactionNotFound is an error returned when we attempt to find a + // transaction by manually scanning the chain within a specific range + // but it is not found. + ErrTransactionNotFound = errors.New("transaction not found within range") ) // chainUpdate encapsulates an update to the current main chain. This struct is @@ -54,6 +58,7 @@ type chainUpdate struct { // chain client. Multiple concurrent clients are supported. All notifications // are achieved via non-blocking sends on client channels. type BitcoindNotifier struct { + confClientCounter uint64 // To be used atomically. spendClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically. @@ -236,33 +241,46 @@ out: b.spendNotifications[op] = make(map[uint64]*spendNotification) } b.spendNotifications[op][msg.spendID] = msg - b.chainConn.NotifySpent([]*wire.OutPoint{&op}) + case *confirmationNotification: chainntnfs.Log.Infof("New confirmation "+ "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - _, currentHeight, err := b.chainConn.GetBestBlock() - if err != nil { - chainntnfs.Log.Error(err) - } + currentHeight := uint32(bestHeight) - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, uint32(currentHeight), - ) - if err != nil { - chainntnfs.Log.Error(err) - } + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + b.wg.Add(1) + go func() { + defer b.wg.Done() + + confDetails, err := b.historicalConfDetails( + msg.TxID, msg.heightHint, + currentHeight, + ) + if err != nil { + chainntnfs.Log.Error(err) + return + } + + if confDetails != nil { + err := b.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + } + }() - err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + case chain.RelevantTx: b.handleRelevantTx(msg, bestHeight) } @@ -473,6 +491,14 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash, // Begin scanning blocks at every height to determine where the // transaction was included in. for height := heightHint; height <= currentHeight; height++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { return nil, fmt.Errorf("unable to get hash from block "+ @@ -632,43 +658,22 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, err } - out: - for i := startHeight; i <= endHeight; i++ { - blockHash, err := b.chainConn.GetBlockHash(int64(i)) + // In order to ensure we don't block the caller on what + // may be a long rescan, we'll launch a goroutine to do + // so in the background. + b.wg.Add(1) + go func() { + defer b.wg.Done() + + err := b.dispatchSpendDetailsManually( + *outpoint, startHeight, endHeight, + ) if err != nil { - return nil, err + chainntnfs.Log.Errorf("Rescan for spend "+ + "notification txout(%x) "+ + "failed: %v", outpoint, err) } - block, err := b.chainConn.GetBlock(blockHash) - if err != nil { - return nil, err - } - for _, tx := range block.Transactions { - for _, in := range tx.TxIn { - if in.PreviousOutPoint == *outpoint { - relTx := chain.RelevantTx{ - TxRecord: &wtxmgr.TxRecord{ - MsgTx: *tx, - Hash: tx.TxHash(), - Received: block.Header.Timestamp, - }, - Block: &wtxmgr.BlockMeta{ - Block: wtxmgr.Block{ - Hash: block.BlockHash(), - Height: i, - }, - Time: block.Header.Timestamp, - }, - } - select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown - case b.notificationRegistry <- relTx: - } - break out - } - } - } - } + }() } } @@ -683,8 +688,9 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // Submit spend cancellation to notification dispatcher. select { case b.notificationCancels <- cancel: - // Cancellation is being handled, drain the spend chan until it is - // closed before yielding to the caller. + // Cancellation is being handled, drain the + // spend chan until it is closed before yielding + // to the caller. for { select { case _, ok := <-ntfn.spendChan: @@ -701,6 +707,72 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, }, nil } +// disaptchSpendDetailsManually attempts to manually scan the chain within the +// given height range for a transaction that spends the given outpoint. If one +// is found, it's spending details are sent to the notifier dispatcher, which +// will then dispatch the notification to all of its clients. +func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, + startHeight, endHeight int32) error { + + // Begin scanning blocks at every height to determine if the outpoint + // was spent. + for height := startHeight; height <= endHeight; height++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-b.quit: + return ErrChainNotifierShuttingDown + default: + } + + blockHash, err := b.chainConn.GetBlockHash(int64(height)) + if err != nil { + return err + } + block, err := b.chainConn.GetBlock(blockHash) + if err != nil { + return err + } + + for _, tx := range block.Transactions { + for _, in := range tx.TxIn { + if in.PreviousOutPoint != op { + continue + } + + // If this transaction input spends the + // outpoint, we'll gather the details of the + // spending transaction and dispatch a spend + // notification to our clients. + relTx := chain.RelevantTx{ + TxRecord: &wtxmgr.TxRecord{ + MsgTx: *tx, + Hash: tx.TxHash(), + Received: block.Header.Timestamp, + }, + Block: &wtxmgr.BlockMeta{ + Block: wtxmgr.Block{ + Hash: *blockHash, + Height: height, + }, + Time: block.Header.Timestamp, + }, + } + + select { + case b.notificationRegistry <- relTx: + case <-b.quit: + return ErrChainNotifierShuttingDown + } + + return nil + } + } + } + + return ErrTransactionNotFound +} + // confirmationNotification represents a client's intent to receive a // notification once the target txid reaches numConfirmations confirmations. type confirmationNotification struct { @@ -716,6 +788,7 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, ntfn := &confirmationNotification{ ConfNtfn: chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&b.confClientCounter, 1), TxID: txid, NumConfirmations: numConfs, Event: chainntnfs.NewConfirmationEvent(numConfs), @@ -723,11 +796,15 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-b.quit: + return nil, ErrChainNotifierShuttingDown } } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 371be550..fa4f1826 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -61,6 +61,7 @@ type txUpdate struct { // notifications. Multiple concurrent clients are supported. All notifications // are achieved via non-blocking sends on client channels. type BtcdNotifier struct { + confClientCounter uint64 // To be used aotmically. spendClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically. @@ -298,24 +299,42 @@ out: b.spendNotifications[op] = make(map[uint64]*spendNotification) } b.spendNotifications[op][msg.spendID] = msg + case *confirmationNotification: chainntnfs.Log.Infof("New confirmation "+ "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, uint32(currentHeight), - ) - if err != nil { - chainntnfs.Log.Error(err) - } + bestHeight := uint32(currentHeight) + + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + b.wg.Add(1) + go func() { + defer b.wg.Done() + + confDetails, err := b.historicalConfDetails( + msg.TxID, msg.heightHint, + bestHeight, + ) + if err != nil { + chainntnfs.Log.Error(err) + return + } + + if confDetails != nil { + err = b.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + } + }() - err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg @@ -532,6 +551,14 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, // Begin scanning blocks at every height to determine where the // transaction was included in. for height := heightHint; height <= currentHeight; height++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { return nil, fmt.Errorf("unable to get hash from block "+ @@ -800,6 +827,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, ntfn := &confirmationNotification{ ConfNtfn: chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&b.confClientCounter, 1), TxID: txid, NumConfirmations: numConfs, Event: chainntnfs.NewConfirmationEvent(numConfs), @@ -807,11 +835,15 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-b.quit: + return nil, ErrChainNotifierShuttingDown } } diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 90f31a01..c9c83c20 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -867,69 +867,14 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, // concrete implementations. // // To do so, we first create a new output to our test target address. - txid, err := getTestTxId(miner) - if err != nil { - t.Fatalf("unable to create test addr: %v", err) - } + outpoint, pkScript := createSpendableOutput(miner, t) - err = waitForMempoolTx(miner, txid) - if err != nil { - t.Fatalf("tx not relayed to miner: %v", err) - } - - // Mine a single block which should include that txid above. - if _, err := miner.Node.Generate(1); err != nil { - t.Fatalf("unable to generate single block: %v", err) - } - - // Now that we have the txid, fetch the transaction itself. - wrappedTx, err := miner.Node.GetRawTransaction(txid) - if err != nil { - t.Fatalf("unable to get new tx: %v", err) - } - tx := wrappedTx.MsgTx() - - // Locate the output index sent to us. We need this so we can construct - // a spending txn below. - outIndex := -1 - var pkScript []byte - for i, txOut := range tx.TxOut { - if bytes.Contains(txOut.PkScript, testAddr.ScriptAddress()) { - pkScript = txOut.PkScript - outIndex = i - break - } - } - if outIndex == -1 { - t.Fatalf("unable to locate new output") - } - - // Now that we've found the output index, register for a spentness - // notification for the newly created output. - outpoint := wire.NewOutPoint(txid, uint32(outIndex)) - - // Next, create a new transaction spending that output. - spendingTx := wire.NewMsgTx(1) - spendingTx.AddTxIn(&wire.TxIn{ - PreviousOutPoint: *outpoint, - }) - spendingTx.AddTxOut(&wire.TxOut{ - Value: 1e8, - PkScript: pkScript, - }) - sigScript, err := txscript.SignatureScript(spendingTx, 0, pkScript, - txscript.SigHashAll, privKey, true) - if err != nil { - t.Fatalf("unable to sign tx: %v", err) - } - spendingTx.TxIn[0].SignatureScript = sigScript - - // Broadcast our spending transaction. + // We'll then spend this output and broadcast the spend transaction. + spendingTx := createSpendTx(outpoint, pkScript, t) spenderSha, err := miner.Node.SendRawTransaction(spendingTx, true) if err != nil { t.Fatalf("unable to broadcast tx: %v", err) } - err = waitForMempoolTx(miner, spenderSha) if err != nil { t.Fatalf("tx not relayed to miner: %v", err) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 2ab2ad05..7d4dc5d0 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -48,12 +48,13 @@ var ( // TODO(roasbeef): heavily consolidate with NeutrinoNotifier code // * maybe combine into single package? type NeutrinoNotifier struct { - started int32 // To be used atomically. - stopped int32 // To be used atomically. - + confClientCounter uint64 // To be used atomically. spendClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically. + started int32 // To be used atomically. + stopped int32 // To be used atomically. + heightMtx sync.RWMutex bestHeight uint32 @@ -306,31 +307,48 @@ func (n *NeutrinoNotifier) notificationDispatcher() { currentHeight := n.bestHeight n.heightMtx.RUnlock() - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := n.historicalConfDetails(msg.TxID, currentHeight, - msg.heightHint) - if err != nil { - chainntnfs.Log.Error(err) - } + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + n.wg.Add(1) + go func() { + defer n.wg.Done() - if txConf == nil { - // If we can't fully dispatch confirmation, - // then we'll update our filter so we can be - // notified of its future initial confirmation. + confDetails, err := n.historicalConfDetails( + msg.TxID, currentHeight, + msg.heightHint, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + + if confDetails != nil { + err := n.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + return + } + + // If we can't fully dispatch + // confirmation, then we'll update our + // filter so we can be notified of its + // future initial confirmation. rescanUpdate := []neutrino.UpdateOption{ neutrino.AddTxIDs(*msg.TxID), neutrino.Rewind(currentHeight), } - if err := n.chainView.Update(rescanUpdate...); err != nil { - chainntnfs.Log.Errorf("unable to update rescan: %v", err) + err = n.chainView.Update(rescanUpdate...) + if err != nil { + chainntnfs.Log.Errorf("Unable "+ + "to update rescan: %v", + err) } - } - - err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } + }() case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") @@ -400,6 +418,14 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, // Starting from the height hint, we'll walk forwards in the chain to // see if this transaction has already been confirmed. for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + // First, we'll fetch the block header for this height so we // can compute the current block hash. header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) @@ -696,6 +722,7 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, ntfn := &confirmationsNotification{ ConfNtfn: chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&n.confClientCounter, 1), TxID: txid, NumConfirmations: numConfs, Event: chainntnfs.NewConfirmationEvent(numConfs), @@ -703,11 +730,15 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-n.quit: - return nil, ErrChainNotifierShuttingDown case n.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-n.quit: + return nil, ErrChainNotifierShuttingDown } } diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index b10f4272..bb049b8a 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -3,15 +3,26 @@ package chainntnfs import ( "errors" "fmt" + "sync" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcutil" ) +var ( + // ErrTxConfNotifierExiting is an error returned when attempting to + // interact with the TxConfNotifier but it been shut down. + ErrTxConfNotifierExiting = errors.New("TxConfNotifier is exiting") +) + // ConfNtfn represents a notifier client's request to receive a notification // once the target transaction gets sufficient confirmations. The client is // asynchronously notified via the ConfirmationEvent channels. type ConfNtfn struct { + // ConfID uniquely identifies the confirmation notification request for + // the specified transaction. + ConfID uint64 + // TxID is the hash of the transaction for which confirmation notifications // are requested. TxID *chainhash.Hash @@ -66,7 +77,7 @@ type TxConfNotifier struct { // confNotifications is an index of notification requests by transaction // hash. - confNotifications map[chainhash.Hash][]*ConfNtfn + confNotifications map[chainhash.Hash]map[uint64]*ConfNtfn // txsByInitialHeight is an index of watched transactions by the height // that they are included at in the blockchain. This is tracked so that @@ -81,6 +92,8 @@ type TxConfNotifier struct { // quit is closed in order to signal that the notifier is gracefully // exiting. quit chan struct{} + + sync.Mutex } // NewTxConfNotifier creates a TxConfNotifier. The current height of the @@ -89,7 +102,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif return &TxConfNotifier{ currentHeight: startHeight, reorgSafetyLimit: reorgSafetyLimit, - confNotifications: make(map[chainhash.Hash][]*ConfNtfn), + confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), quit: make(chan struct{}), @@ -98,27 +111,83 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif // Register handles a new notification request. The client will be notified when // the transaction gets a sufficient number of confirmations on the blockchain. -// If the transaction has already been included in a block on the chain, the -// confirmation details must be given as the txConf argument, otherwise it -// should be nil. If the transaction already has the sufficient number of -// confirmations, this dispatches the notification immediately. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error { +// +// NOTE: If the transaction has already been included in a block on the chain, +// the confirmation details must be provided with the UpdateConfDetails method, +// otherwise we will wait for the transaction to confirm even though it already +// has. +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } - if txConf == nil || txConf.BlockHeight > tcn.currentHeight { - // Transaction is unconfirmed. - tcn.confNotifications[*ntfn.TxID] = - append(tcn.confNotifications[*ntfn.TxID], ntfn) + tcn.Lock() + defer tcn.Unlock() + + ntfns, ok := tcn.confNotifications[*ntfn.TxID] + if !ok { + ntfns = make(map[uint64]*ConfNtfn) + tcn.confNotifications[*ntfn.TxID] = ntfns + } + + ntfns[ntfn.ConfID] = ntfn + + return nil +} + +// UpdateConfDetails attempts to update the confirmation details for an active +// notification within the notifier. This should only be used in the case of a +// transaction that has confirmed before the notifier's current height. +// +// NOTE: The notification should be registered first to ensure notifications are +// dispatched correctly. +func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, + clientID uint64, details *TxConfirmation) error { + + select { + case <-tcn.quit: + return ErrTxConfNotifierExiting + default: + } + + // Ensure we hold the lock throughout handling the notification to + // prevent the notifier from advancing its height underneath us. + tcn.Lock() + defer tcn.Unlock() + + // First, we'll determine whether we have an active notification for + // this transaction with the given ID. + ntfns, ok := tcn.confNotifications[txid] + if !ok { + return fmt.Errorf("no notifications found for txid %v", txid) + } + + ntfn, ok := ntfns[clientID] + if !ok { + return fmt.Errorf("no notification found with ID %v", clientID) + } + + // If the notification has already recognized that the transaction + // confirmed, there's nothing left for us to do. + if ntfn.details != nil { return nil } - // If the transaction already has the required confirmations, we'll - // dispatch the notification immediately. - confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 + // The notifier has yet to reach the height at which the transaction was + // included in a block, so we should defer until handling it then within + // ConnectTip. + if details == nil || details.BlockHeight > tcn.currentHeight { + return nil + } + + ntfn.details = details + + // Now, we'll examine whether the transaction of this notification + // request has reched its required number of confirmations. If it has, + // we'll disaptch a confirmation notification to the caller. + confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) @@ -126,21 +195,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro // We'll send a 0 value to the Updates channel, indicating that // the transaction has already been confirmed. select { - case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") case ntfn.Event.Updates <- 0: + case <-tcn.quit: + return ErrTxConfNotifierExiting } select { - case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") - case ntfn.Event.Confirmed <- txConf: + case ntfn.Event.Confirmed <- details: ntfn.dispatched = true + case <-tcn.quit: + return ErrTxConfNotifierExiting } } else { - // Otherwise, we'll record the transaction along with the height - // at which we should notify the client. - ntfn.details = txConf + // Otherwise, we'll keep track of the notification request by + // the height at which we should dispatch the confirmation + // notification. ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { ntfnSet = make(map[*ConfNtfn]struct{}) @@ -154,22 +223,19 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro select { case ntfn.Event.Updates <- numConfsLeft: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } } // As a final check, we'll also watch the transaction if it's still - // possible for it to get reorganized out of the chain. - if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { - tcn.confNotifications[*ntfn.TxID] = - append(tcn.confNotifications[*ntfn.TxID], ntfn) - - txSet, exists := tcn.txsByInitialHeight[txConf.BlockHeight] + // possible for it to get reorged out of the chain. + if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { + txSet, exists := tcn.txsByInitialHeight[details.BlockHeight] if !exists { txSet = make(map[chainhash.Hash]struct{}) - tcn.txsByInitialHeight[txConf.BlockHeight] = txSet + tcn.txsByInitialHeight[details.BlockHeight] = txSet } - txSet[*ntfn.TxID] = struct{}{} + txSet[txid] = struct{}{} } return nil @@ -185,10 +251,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } + tcn.Lock() + defer tcn.Unlock() + if blockHeight != tcn.currentHeight+1 { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, new height=%d", @@ -234,8 +303,10 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for _, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { for _, ntfn := range tcn.confNotifications[txHash] { - // If the transaction still hasn't been included - // in a block, we'll skip it. + // If the notification hasn't learned about the + // confirmation of its transaction yet (in the + // case of historical confirmations), we'll skip + // it. if ntfn.details == nil { continue } @@ -256,7 +327,7 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, select { case ntfn.Event.Updates <- numConfsLeft: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } } } @@ -267,11 +338,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) + select { case ntfn.Event.Confirmed <- ntfn.details: ntfn.dispatched = true case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } } delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) @@ -297,10 +369,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } + tcn.Lock() + defer tcn.Unlock() + if blockHeight != tcn.currentHeight { return fmt.Errorf("Received blocks out of order: "+ "current height=%d, disconnected height=%d", @@ -321,7 +396,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case <-ntfn.Event.Updates: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } @@ -340,7 +415,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case <-ntfn.Event.Confirmed: case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting default: } @@ -352,7 +427,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { select { case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): case <-tcn.quit: - return errors.New("TxConfNotifier is exiting") + return ErrTxConfNotifierExiting } continue @@ -383,6 +458,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // This closes the event channels of all registered notifications that have // not been dispatched yet. func (tcn *TxConfNotifier) TearDown() { + tcn.Lock() + defer tcn.Unlock() + close(tcn.quit) for _, ntfns := range tcn.confNotifications { diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 96b7d145..2a57245a 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -3,10 +3,10 @@ package chainntnfs_test import ( "testing" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/chainntnfs" ) var zeroHash chainhash.Hash @@ -38,7 +38,9 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -46,7 +48,9 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // We should not receive any notifications from both transactions // since they have not been included in a block yet. @@ -202,32 +206,37 @@ func TestTxConfHistoricalDispatch(t *testing.T) { // starting height so that they are confirmed once registering them. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ + ConfID: 0, TxID: &tx1Hash, NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + tx2Hash := tx2.TxHash() + ntfn2 := chainntnfs.ConfNtfn{ + ConfID: 1, + TxID: &tx2Hash, + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), + } + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + // Update tx1 with its confirmation details. We should only receive one + // update since it only requires one confirmation and it already met it. txConf1 := chainntnfs.TxConfirmation{ BlockHash: &zeroHash, BlockHeight: 9, TxIndex: 1, } - txConfNotifier.Register(&ntfn1, &txConf1) - - tx2Hash := tx2.TxHash() - txConf2 := chainntnfs.TxConfirmation{ - BlockHash: &zeroHash, - BlockHeight: 9, - TxIndex: 2, + err := txConfNotifier.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1) + if err != nil { + t.Fatalf("unable to update conf details: %v", err) } - ntfn2 := chainntnfs.ConfNtfn{ - TxID: &tx2Hash, - NumConfirmations: tx2NumConfs, - Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), - } - txConfNotifier.Register(&ntfn2, &txConf2) - - // We should only receive one update for tx1 since it only requires - // one confirmation and it already met it. select { case numConfsLeft := <-ntfn1.Event.Updates: const expected = 0 @@ -240,8 +249,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatal("Expected confirmation update for tx1") } - // A confirmation notification for tx1 should be dispatched, as it met - // its required number of confirmations. + // A confirmation notification for tx1 should also be dispatched. select { case txConf := <-ntfn1.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf1) @@ -249,8 +257,19 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } - // We should only receive one update indicating how many confirmations - // are left for the transaction to be confirmed. + // Update tx2 with its confirmation details. This should not trigger a + // confirmation notification since it hasn't reached its required number + // of confirmations, but we should receive a confirmation update + // indicating how many confirmation are left. + txConf2 := chainntnfs.TxConfirmation{ + BlockHash: &zeroHash, + BlockHeight: 9, + TxIndex: 2, + } + err = txConfNotifier.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2) + if err != nil { + t.Fatalf("unable to update conf details: %v", err) + } select { case numConfsLeft := <-ntfn2.Event.Updates: const expected = 1 @@ -263,8 +282,6 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatal("Expected confirmation update for tx2") } - // A confirmation notification for tx2 should not be dispatched yet, as - // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) @@ -277,7 +294,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx3}, }) - err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + err = txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -343,7 +360,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Tx 2 will be confirmed in block 10 and requires 1 conf. tx2Hash := tx2.TxHash() @@ -352,7 +371,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Tx 3 will be confirmed in block 10 and requires 2 confs. tx3Hash := tx3.TxHash() @@ -361,7 +382,9 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx3NumConfs, Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } - txConfNotifier.Register(&ntfn3, nil) + if err := txConfNotifier.Register(&ntfn3); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Sync chain to block 10. Txs 1 & 2 should be confirmed. block1 := btcutil.NewBlock(&wire.MsgBlock{ @@ -581,7 +604,9 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 1, Event: chainntnfs.NewConfirmationEvent(1), } - txConfNotifier.Register(&ntfn1, nil) + if err := txConfNotifier.Register(&ntfn1); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -589,7 +614,9 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 2, Event: chainntnfs.NewConfirmationEvent(2), } - txConfNotifier.Register(&ntfn2, nil) + if err := txConfNotifier.Register(&ntfn2); err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } // Include the transactions in a block and add it to the TxConfNotifier. // This should confirm tx1, but not tx2.