From 486694a84e1c0dde60345174650adedbe78dff52 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 19 Mar 2018 14:48:44 -0400 Subject: [PATCH 1/3] chainntnfs: add Updates channel field to ConfirmationEvent In this commit, we add a new Updates channel to our ConfirmationEvent struct. This channel will be used to deliver updates to a subscriber of a confirmation notification. Updates will be delivered at every incremental height of the chain with the number of confirmations remaining for the transaction to be considered confirmed by the subscriber. --- chainntnfs/bitcoindnotify/bitcoind.go | 2 +- chainntnfs/btcdnotify/btcd.go | 2 +- chainntnfs/interface.go | 11 ++++-- chainntnfs/neutrinonotify/neutrino.go | 2 +- chainntnfs/txconfnotifier.go | 4 ++- chainntnfs/txconfnotifier_test.go | 50 ++++++++++++++++++--------- 6 files changed, 48 insertions(+), 23 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 5897d06f..7233389a 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -605,7 +605,7 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, chainntnfs.ConfNtfn{ TxID: txid, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(numConfs), }, } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index a93726e2..a7c87c65 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -607,7 +607,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, chainntnfs.ConfNtfn{ TxID: txid, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(numConfs), }, } diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index d589578b..9854d30a 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -85,8 +85,10 @@ type TxConfirmation struct { // ConfirmationEvent encapsulates a confirmation notification. With this struct, // callers can be notified of: the instance the target txid reaches the targeted -// number of confirmations, and also in the event that the original txid becomes -// disconnected from the blockchain as a result of a re-org. +// number of confirmations, how many confirmations are left for the target txid +// to be fully confirmed at every new block height, and also in the event that +// the original txid becomes disconnected from the blockchain as a result of a +// re-org. // // Once the txid reaches the specified number of confirmations, the 'Confirmed' // channel will be sent upon fulfilling the notification. @@ -100,6 +102,11 @@ type ConfirmationEvent struct { // details of the channel's confirmation. Confirmed chan *TxConfirmation // MUST be buffered. + // Updates is a channel that will sent upon, at every incremental + // confirmation, how many confirmations are left to declare the + // transaction as fully confirmed. + Updates chan uint32 // MUST be buffered. + // TODO(roasbeef): all goroutines on ln channel updates should also // have a struct chan that's closed if funding gets re-org out. Need // to sync, to request another confirmation event ntfn, then re-open diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index e347e6d8..d9b0828e 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -698,7 +698,7 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, ConfNtfn: chainntnfs.ConfNtfn{ TxID: txid, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(numConfs), }, heightHint: heightHint, } diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 6d941553..f5d37e46 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -33,9 +33,10 @@ type ConfNtfn struct { // NewConfirmationEvent constructs a new ConfirmationEvent with newly opened // channels. -func NewConfirmationEvent() *ConfirmationEvent { +func NewConfirmationEvent(numConfs uint32) *ConfirmationEvent { return &ConfirmationEvent{ Confirmed: make(chan *TxConfirmation, 1), + Updates: make(chan uint32, numConfs), NegativeConf: make(chan int32, 1), } } @@ -290,6 +291,7 @@ func (tcn *TxConfNotifier) TearDown() { } close(ntfn.Event.Confirmed) + close(ntfn.Event.Updates) close(ntfn.Event.NegativeConf) } } diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index d60b5227..2677671f 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -16,7 +16,10 @@ var zeroHash chainhash.Hash func TestTxConfFutureDispatch(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + const ( + tx1NumConfs uint32 = 1 + tx2NumConfs uint32 = 2 + ) var ( tx1 = wire.MsgTx{Version: 1} @@ -24,19 +27,21 @@ func TestTxConfFutureDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, - NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx1NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } txConfNotifier.Register(&ntfn1, nil) tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, - NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } txConfNotifier.Register(&ntfn2, nil) @@ -113,7 +118,10 @@ func TestTxConfFutureDispatch(t *testing.T) { func TestTxConfHistoricalDispatch(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + const ( + tx1NumConfs uint32 = 1 + tx2NumConfs uint32 = 3 + ) var ( tx1 = wire.MsgTx{Version: 1} @@ -121,11 +129,13 @@ func TestTxConfHistoricalDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, - NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx1NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } txConf1 := chainntnfs.TxConfirmation{ BlockHash: &zeroHash, @@ -142,8 +152,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) { } ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, - NumConfirmations: 3, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } txConfNotifier.Register(&ntfn2, &txConf2) @@ -189,7 +199,11 @@ func TestTxConfHistoricalDispatch(t *testing.T) { func TestTxConfChainReorg(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(8, 100) + const ( + tx1NumConfs uint32 = 2 + tx2NumConfs uint32 = 1 + tx3NumConfs uint32 = 2 + ) var ( tx1 = wire.MsgTx{Version: 1} @@ -197,12 +211,14 @@ func TestTxConfChainReorg(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100) + // Tx 1 will be confirmed in block 9 and requires 2 confs. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, - NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx1NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } txConfNotifier.Register(&ntfn1, nil) @@ -210,8 +226,8 @@ func TestTxConfChainReorg(t *testing.T) { tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, - NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } txConfNotifier.Register(&ntfn2, nil) @@ -219,8 +235,8 @@ func TestTxConfChainReorg(t *testing.T) { tx3Hash := tx3.TxHash() ntfn3 := chainntnfs.ConfNtfn{ TxID: &tx3Hash, - NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx3NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } txConfNotifier.Register(&ntfn3, nil) From 09f8a728971d3a2a741635891b54629cd43fab2c Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 19 Mar 2018 15:04:19 -0400 Subject: [PATCH 2/3] chainntnfs: modify watched transactions by height to use a set In this commit, we avoid storing extra copies of a transaction when multiple clients register to be notified for the same transaction. We do this by using a set, which only stores unique elements. --- chainntnfs/txconfnotifier.go | 57 +++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index f5d37e46..c2ce043d 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -67,11 +67,11 @@ type TxConfNotifier struct { // hash. confNotifications map[chainhash.Hash][]*ConfNtfn - // confTxsByInitialHeight is an index of watched transactions by the height + // txsByInitialHeight is an index of watched transactions by the height // that they are included at in the blockchain. This is tracked so that - // incorrect notifications are not sent if a transaction is reorganized out - // of the chain and so that negative confirmations can be recognized. - confTxsByInitialHeight map[uint32][]*chainhash.Hash + // incorrect notifications are not sent if a transaction is reorganized + // out of the chain and so that negative confirmations can be recognized. + txsByInitialHeight map[uint32]map[chainhash.Hash]struct{} // ntfnsByConfirmHeight is an index of notification requests by the height // at which the transaction will have sufficient confirmations. @@ -86,12 +86,12 @@ type TxConfNotifier struct { // blockchain is accepted as a parameter. func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier { return &TxConfNotifier{ - currentHeight: startHeight, - reorgSafetyLimit: reorgSafetyLimit, - confNotifications: make(map[chainhash.Hash][]*ConfNtfn), - confTxsByInitialHeight: make(map[uint32][]*chainhash.Hash), - ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), - quit: make(chan struct{}), + currentHeight: startHeight, + reorgSafetyLimit: reorgSafetyLimit, + confNotifications: make(map[chainhash.Hash][]*ConfNtfn), + txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), + ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + quit: make(chan struct{}), } } @@ -138,14 +138,18 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro ntfnSet[ntfn] = struct{}{} } - // Unless the transaction is finalized, include transaction information in - // confNotifications and confTxsByInitialHeight in case the tx gets - // reorganized out of the chain. + // As a final check, we'll also watch the transaction if it's still + // possible for it to get reorganized out of the chain. if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { tcn.confNotifications[*ntfn.TxID] = append(tcn.confNotifications[*ntfn.TxID], ntfn) - tcn.confTxsByInitialHeight[txConf.BlockHeight] = - append(tcn.confTxsByInitialHeight[txConf.BlockHeight], ntfn.TxID) + + txSet, exists := tcn.txsByInitialHeight[txConf.BlockHeight] + if !exists { + txSet = make(map[chainhash.Hash]struct{}) + tcn.txsByInitialHeight[txConf.BlockHeight] = txSet + } + txSet[*ntfn.TxID] = struct{}{} } return nil @@ -194,8 +198,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } ntfnSet[ntfn] = struct{}{} - tcn.confTxsByInitialHeight[blockHeight] = - append(tcn.confTxsByInitialHeight[blockHeight], tx.Hash()) + txSet, exists := tcn.txsByInitialHeight[blockHeight] + if !exists { + txSet = make(map[chainhash.Hash]struct{}) + tcn.txsByInitialHeight[blockHeight] = txSet + } + txSet[*txHash] = struct{}{} } } @@ -214,14 +222,14 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) // Clear entries from confNotifications and confTxsByInitialHeight. We - // assume that reorgs deeper than the reorg safety limit do not happen, so - // we can clear out entries for the block that is now mature. + // assume that reorgs deeper than the reorg safety limit do not happen, + // so we can clear out entries for the block that is now mature. if tcn.currentHeight >= tcn.reorgSafetyLimit { matureBlockHeight := tcn.currentHeight - tcn.reorgSafetyLimit - for _, txHash := range tcn.confTxsByInitialHeight[matureBlockHeight] { - delete(tcn.confNotifications, *txHash) + for txHash := range tcn.txsByInitialHeight[matureBlockHeight] { + delete(tcn.confNotifications, txHash) } - delete(tcn.confTxsByInitialHeight, matureBlockHeight) + delete(tcn.txsByInitialHeight, matureBlockHeight) } return nil @@ -273,7 +281,10 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { delete(ntfnSet, ntfn) } } - delete(tcn.confTxsByInitialHeight, blockHeight) + + // Finally, we can remove the transactions we're currently watching that + // were included in this block height. + delete(tcn.txsByInitialHeight, blockHeight) return nil } From 19a2bd9c7d852651f0724ccea0049ec2207d1a9c Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 19 Mar 2018 15:22:44 -0400 Subject: [PATCH 3/3] chainntnfs: send incremental update notifications for tx confirmations In this commit, we introduce the ability for the different ChainNotifier implements to send incremental updates to the subscribers of transaction confirmations. These incremental updates represent how many confirmations are left for the transaction to be confirmed. They are sent to the subscriber at every new height of the chain. --- chainntnfs/txconfnotifier.go | 148 +++++++++++++---- chainntnfs/txconfnotifier_test.go | 256 ++++++++++++++++++++++++++++-- 2 files changed, 363 insertions(+), 41 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index c2ce043d..b955bc76 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -1,6 +1,7 @@ package chainntnfs import ( + "errors" "fmt" "github.com/roasbeef/btcd/chaincfg/chainhash" @@ -115,13 +116,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro return nil } - // If the transaction already has the required confirmations, dispatch - // notification immediately, otherwise record along with the height at - // which to notify. + // If the transaction already has the required confirmations, we'll + // dispatch the notification immediately. confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) + + // We'll send a 0 value to the Updates channel, indicating that + // the transaction has already been confirmed. + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + case ntfn.Event.Updates <- 0: + } + select { case <-tcn.quit: return fmt.Errorf("TxConfNotifier is exiting") @@ -129,6 +138,8 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro ntfn.dispatched = true } } else { + // Otherwise, we'll record the transaction along with the height + // at which we should notify the client. ntfn.details = txConf ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { @@ -136,6 +147,15 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet } ntfnSet[ntfn] = struct{}{} + + // We'll also send an update to the client of how many + // confirmations are left for the transaction to be confirmed. + numConfsLeft := confHeight - tcn.currentHeight + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } } // As a final check, we'll also watch the transaction if it's still @@ -177,10 +197,11 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, tcn.currentHeight++ tcn.reorgDepth = 0 - // Record any newly confirmed transactions in ntfnsByConfirmHeight so that - // notifications get dispatched when the tx gets sufficient confirmations. - // Also record txs in confTxsByInitialHeight so reorgs can be handled - // correctly. + // Record any newly confirmed transactions by their confirmed height so + // that notifications get dispatched when the transactions reach their + // required number of confirmations. We'll also watch these transactions + // at the height they were included in the chain so reorgs can be + // handled correctly. for _, tx := range txns { txHash := tx.Hash() for _, ntfn := range tcn.confNotifications[*txHash] { @@ -207,8 +228,42 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } } - // Dispatch notifications for all transactions that are considered confirmed - // at this new block height. + // Next, we'll dispatch an update to all of the notification clients for + // our watched transactions with the number of confirmations left at + // this new height. + for _, txHashes := range tcn.txsByInitialHeight { + for txHash := range txHashes { + for _, ntfn := range tcn.confNotifications[txHash] { + // If the transaction still hasn't been included + // in a block, we'll skip it. + if ntfn.details == nil { + continue + } + + txConfHeight := ntfn.details.BlockHeight + + ntfn.NumConfirmations - 1 + numConfsLeft := txConfHeight - blockHeight + + // Since we don't clear notifications until + // transactions are no longer under the risk of + // being reorganized out of the chain, we'll + // skip sending updates for transactions that + // have already been confirmed. + if int32(numConfsLeft) < 0 { + continue + } + + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } + } + } + } + + // Then, we'll dispatch notifications for all the transactions that have + // become confirmed at this new block height. for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) @@ -254,31 +309,66 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { tcn.currentHeight-- tcn.reorgDepth++ - for _, txHash := range tcn.confTxsByInitialHeight[blockHeight] { - for _, ntfn := range tcn.confNotifications[*txHash] { - // If notification has been dispatched with sufficient - // confirmations, notify of the reversal. - if ntfn.dispatched { + // We'll go through all of our watched transactions and attempt to drain + // their notification channels to ensure sending notifications to the + // clients is always non-blocking. + for initialHeight, txHashes := range tcn.txsByInitialHeight { + for txHash := range txHashes { + for _, ntfn := range tcn.confNotifications[txHash] { + // First, we'll attempt to drain an update + // from each notification to ensure sends to the + // Updates channel are always non-blocking. select { - case <-ntfn.Event.Confirmed: - // Drain confirmation notification instead of sending - // negative conf if the receiver has not processed it yet. - // This ensures sends to the Confirmed channel are always - // non-blocking. - case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-ntfn.Event.Updates: case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return errors.New("TxConfNotifier is exiting") + default: } - ntfn.dispatched = false - continue - } - confHeight := blockHeight + ntfn.NumConfirmations - 1 - ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] - if !exists { - continue + // Then, we'll check if the current transaction + // was included in the block currently being + // disconnected. If it was, we'll need to take + // some necessary precautions. + if initialHeight == blockHeight { + // If the transaction's confirmation notification + // has already been dispatched, we'll attempt to + // notify the client it was reorged out of the chain. + if ntfn.dispatched { + // Attempt to drain the confirmation notification + // to ensure sends to the Confirmed channel are + // always non-blocking. + select { + case <-ntfn.Event.Confirmed: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + default: + } + + ntfn.dispatched = false + + // Send a negative confirmation notification to the + // client indicating how many blocks have been + // disconnected successively. + select { + case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } + + continue + } + + // Otherwise, since the transactions was reorged out + // of the chain, we can safely remove its accompanying + // confirmation notification. + confHeight := blockHeight + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + continue + } + delete(ntfnSet, ntfn) + } } - delete(ntfnSet, ntfn) } } diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 2677671f..f1ef9030 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -29,6 +29,9 @@ func TestTxConfFutureDispatch(t *testing.T) { txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + // Create the test transactions and register them with the + // TxConfNotifier before including them in a block to receive future + // notifications. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, @@ -45,27 +48,53 @@ func TestTxConfFutureDispatch(t *testing.T) { } txConfNotifier.Register(&ntfn2, nil) + // We should not receive any notifications from both transactions + // since they have not been included in a block yet. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } select { + case <-ntfn2.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx2") case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Include the transactions in a block and add it to the TxConfNotifier. + // This should confirm tx1, but not tx2. block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, }) - err := txConfNotifier.ConnectTip(block1.Hash(), 11, block1.Transactions()) + err := txConfNotifier.ConnectTip( + block1.Hash(), 11, block1.Transactions(), + ) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + // We should only receive one update for tx1 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn1.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx1 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx1") + } + + // A confirmation notification for this tranaction should be dispatched, + // as it only required one confirmation. select { case txConf := <-ntfn1.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -78,12 +107,30 @@ func TestTxConfFutureDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update for tx2 since it only has one + // confirmation so far and it requires two. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 1 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Create a new block and add it to the TxConfNotifier at the next + // height. This should confirm tx2. block2 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx3}, }) @@ -93,12 +140,32 @@ func TestTxConfFutureDispatch(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should not receive any event notifications for tx1 since it has + // already been confirmed. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // We should only receive one update since the last at the new height, + // indicating how many confirmations are still left. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, since it + // now meets its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -131,6 +198,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) { txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + // Create the test transactions at a height before the TxConfNotifier's + // starting height so that they are confirmed once registering them. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, @@ -157,6 +226,22 @@ func TestTxConfHistoricalDispatch(t *testing.T) { } txConfNotifier.Register(&ntfn2, &txConf2) + // We should only receive one update for tx1 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn1.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx1 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx1") + } + + // A confirmation notification for tx1 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn1.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf1) @@ -164,12 +249,30 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update indicating how many confirmations + // are left for the transaction to be confirmed. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 1 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Create a new block and add it to the TxConfNotifier at the next + // height. This should confirm tx2. block := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx3}, }) @@ -179,12 +282,32 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should not receive any event notifications for tx1 since it has + // already been confirmed. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // We should only receive one update for tx2 since the last one, + // indicating how many confirmations are still left. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf2) @@ -244,7 +367,11 @@ func TestTxConfChainReorg(t *testing.T) { block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1}, }) - err := txConfNotifier.ConnectTip(nil, 9, block1.Transactions()) + err := txConfNotifier.ConnectTip(nil, 8, block1.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + err = txConfNotifier.ConnectTip(nil, 9, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -257,25 +384,57 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should receive two updates for tx1 since it requires two + // confirmations and it has already met them. + for i := 0; i < 2; i++ { + select { + case <-ntfn1.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx1") + } + } + + // A confirmation notification for tx1 should be dispatched, as it met + // its required number of confirmations. select { case <-ntfn1.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update for tx2 since it only requires + // one confirmation and it already met it. + select { + case <-ntfn2.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case <-ntfn2.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx2") } + // We should only receive one update for tx3 since it only has one + // confirmation so far and it requires two. + select { + case <-ntfn3.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx3") + } + + // A confirmation notification for tx3 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn3.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) default: } - // Block that tx2 and tx3 were included in is disconnected and two next + // The block that included tx2 and tx3 is disconnected and two next // blocks without them are connected. err = txConfNotifier.DisconnectTip(10) if err != nil { @@ -302,19 +461,28 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Expected negative conf notification for tx1") } + // We should not receive any event notifications from all of the + // transactions because tx1 has already been confirmed and tx2 and tx3 + // have not been included in the chain since the reorg. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } select { + case <-ntfn2.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx2") case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } select { + case <-ntfn3.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx3") case txConf := <-ntfn3.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) default: @@ -336,7 +504,22 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } - // Both transactions should be newly confirmed. + // We should only receive one update for tx2 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -349,6 +532,24 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Expected confirmation for tx2") } + // We should receive two updates for tx3 since it requires two + // confirmations and it has already met them. + for i := uint32(1); i <= 2; i++ { + select { + case numConfsLeft := <-ntfn3.Event.Updates: + expected := tx3NumConfs - i + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx3 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + } + + // A confirmation notification for tx3 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn3.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -365,18 +566,20 @@ func TestTxConfChainReorg(t *testing.T) { func TestTxConfTearDown(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) - var ( tx1 = wire.MsgTx{Version: 1} tx2 = wire.MsgTx{Version: 2} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + // Create the test transactions and register them with the + // TxConfNotifier to receive notifications. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(1), } txConfNotifier.Register(&ntfn1, nil) @@ -384,10 +587,12 @@ func TestTxConfTearDown(t *testing.T) { ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(2), } txConfNotifier.Register(&ntfn2, nil) + // Include the transactions in a block and add it to the TxConfNotifier. + // This should confirm tx1, but not tx2. block := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1, &tx2}, }) @@ -397,35 +602,62 @@ func TestTxConfTearDown(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We do not care about the correctness of the notifications since they + // are tested in other methods, but we'll still attempt to retrieve them + // for the sake of not being able to later once the notification + // channels are closed. + select { + case <-ntfn1.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx1") + } + select { case <-ntfn1.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx1") } + select { + case <-ntfn2.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx2") + } + select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } - // Confirmed channels should be closed for notifications that have not been - // dispatched yet. + // The notification channels should be closed for notifications that + // have not been dispatched yet, so we should not expect to receive any + // more updates. txConfNotifier.TearDown() + // tx1 should not receive any more updates because it has already been + // confirmed and the TxConfNotifier has been shut down. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // tx2 should not receive any more updates after the notifications + // channels have been closed and the TxConfNotifier shut down. select { + case _, more := <-ntfn2.Event.Updates: + if more { + t.Fatal("Expected closed Updates channel for tx2") + } case _, more := <-ntfn2.Event.Confirmed: if more { - t.Fatalf("Expected channel close for tx2") + t.Fatalf("Expected closed Confirmed channel for tx2") } default: - t.Fatalf("Expected channel close for tx2") + t.Fatalf("Expected closed notification channels for tx2") } }