diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index c2ce043d..b955bc76 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -1,6 +1,7 @@ package chainntnfs import ( + "errors" "fmt" "github.com/roasbeef/btcd/chaincfg/chainhash" @@ -115,13 +116,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro return nil } - // If the transaction already has the required confirmations, dispatch - // notification immediately, otherwise record along with the height at - // which to notify. + // If the transaction already has the required confirmations, we'll + // dispatch the notification immediately. confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) + + // We'll send a 0 value to the Updates channel, indicating that + // the transaction has already been confirmed. + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + case ntfn.Event.Updates <- 0: + } + select { case <-tcn.quit: return fmt.Errorf("TxConfNotifier is exiting") @@ -129,6 +138,8 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro ntfn.dispatched = true } } else { + // Otherwise, we'll record the transaction along with the height + // at which we should notify the client. ntfn.details = txConf ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { @@ -136,6 +147,15 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet } ntfnSet[ntfn] = struct{}{} + + // We'll also send an update to the client of how many + // confirmations are left for the transaction to be confirmed. + numConfsLeft := confHeight - tcn.currentHeight + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } } // As a final check, we'll also watch the transaction if it's still @@ -177,10 +197,11 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, tcn.currentHeight++ tcn.reorgDepth = 0 - // Record any newly confirmed transactions in ntfnsByConfirmHeight so that - // notifications get dispatched when the tx gets sufficient confirmations. - // Also record txs in confTxsByInitialHeight so reorgs can be handled - // correctly. + // Record any newly confirmed transactions by their confirmed height so + // that notifications get dispatched when the transactions reach their + // required number of confirmations. We'll also watch these transactions + // at the height they were included in the chain so reorgs can be + // handled correctly. for _, tx := range txns { txHash := tx.Hash() for _, ntfn := range tcn.confNotifications[*txHash] { @@ -207,8 +228,42 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } } - // Dispatch notifications for all transactions that are considered confirmed - // at this new block height. + // Next, we'll dispatch an update to all of the notification clients for + // our watched transactions with the number of confirmations left at + // this new height. + for _, txHashes := range tcn.txsByInitialHeight { + for txHash := range txHashes { + for _, ntfn := range tcn.confNotifications[txHash] { + // If the transaction still hasn't been included + // in a block, we'll skip it. + if ntfn.details == nil { + continue + } + + txConfHeight := ntfn.details.BlockHeight + + ntfn.NumConfirmations - 1 + numConfsLeft := txConfHeight - blockHeight + + // Since we don't clear notifications until + // transactions are no longer under the risk of + // being reorganized out of the chain, we'll + // skip sending updates for transactions that + // have already been confirmed. + if int32(numConfsLeft) < 0 { + continue + } + + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } + } + } + } + + // Then, we'll dispatch notifications for all the transactions that have + // become confirmed at this new block height. for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) @@ -254,31 +309,66 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { tcn.currentHeight-- tcn.reorgDepth++ - for _, txHash := range tcn.confTxsByInitialHeight[blockHeight] { - for _, ntfn := range tcn.confNotifications[*txHash] { - // If notification has been dispatched with sufficient - // confirmations, notify of the reversal. - if ntfn.dispatched { + // We'll go through all of our watched transactions and attempt to drain + // their notification channels to ensure sending notifications to the + // clients is always non-blocking. + for initialHeight, txHashes := range tcn.txsByInitialHeight { + for txHash := range txHashes { + for _, ntfn := range tcn.confNotifications[txHash] { + // First, we'll attempt to drain an update + // from each notification to ensure sends to the + // Updates channel are always non-blocking. select { - case <-ntfn.Event.Confirmed: - // Drain confirmation notification instead of sending - // negative conf if the receiver has not processed it yet. - // This ensures sends to the Confirmed channel are always - // non-blocking. - case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-ntfn.Event.Updates: case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return errors.New("TxConfNotifier is exiting") + default: } - ntfn.dispatched = false - continue - } - confHeight := blockHeight + ntfn.NumConfirmations - 1 - ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] - if !exists { - continue + // Then, we'll check if the current transaction + // was included in the block currently being + // disconnected. If it was, we'll need to take + // some necessary precautions. + if initialHeight == blockHeight { + // If the transaction's confirmation notification + // has already been dispatched, we'll attempt to + // notify the client it was reorged out of the chain. + if ntfn.dispatched { + // Attempt to drain the confirmation notification + // to ensure sends to the Confirmed channel are + // always non-blocking. + select { + case <-ntfn.Event.Confirmed: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + default: + } + + ntfn.dispatched = false + + // Send a negative confirmation notification to the + // client indicating how many blocks have been + // disconnected successively. + select { + case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } + + continue + } + + // Otherwise, since the transactions was reorged out + // of the chain, we can safely remove its accompanying + // confirmation notification. + confHeight := blockHeight + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + continue + } + delete(ntfnSet, ntfn) + } } - delete(ntfnSet, ntfn) } } diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 2677671f..f1ef9030 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -29,6 +29,9 @@ func TestTxConfFutureDispatch(t *testing.T) { txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + // Create the test transactions and register them with the + // TxConfNotifier before including them in a block to receive future + // notifications. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, @@ -45,27 +48,53 @@ func TestTxConfFutureDispatch(t *testing.T) { } txConfNotifier.Register(&ntfn2, nil) + // We should not receive any notifications from both transactions + // since they have not been included in a block yet. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } select { + case <-ntfn2.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx2") case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Include the transactions in a block and add it to the TxConfNotifier. + // This should confirm tx1, but not tx2. block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, }) - err := txConfNotifier.ConnectTip(block1.Hash(), 11, block1.Transactions()) + err := txConfNotifier.ConnectTip( + block1.Hash(), 11, block1.Transactions(), + ) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + // We should only receive one update for tx1 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn1.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx1 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx1") + } + + // A confirmation notification for this tranaction should be dispatched, + // as it only required one confirmation. select { case txConf := <-ntfn1.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -78,12 +107,30 @@ func TestTxConfFutureDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update for tx2 since it only has one + // confirmation so far and it requires two. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 1 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Create a new block and add it to the TxConfNotifier at the next + // height. This should confirm tx2. block2 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx3}, }) @@ -93,12 +140,32 @@ func TestTxConfFutureDispatch(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should not receive any event notifications for tx1 since it has + // already been confirmed. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // We should only receive one update since the last at the new height, + // indicating how many confirmations are still left. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, since it + // now meets its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -131,6 +198,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) { txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + // Create the test transactions at a height before the TxConfNotifier's + // starting height so that they are confirmed once registering them. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, @@ -157,6 +226,22 @@ func TestTxConfHistoricalDispatch(t *testing.T) { } txConfNotifier.Register(&ntfn2, &txConf2) + // We should only receive one update for tx1 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn1.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx1 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx1") + } + + // A confirmation notification for tx1 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn1.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf1) @@ -164,12 +249,30 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update indicating how many confirmations + // are left for the transaction to be confirmed. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 1 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Create a new block and add it to the TxConfNotifier at the next + // height. This should confirm tx2. block := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx3}, }) @@ -179,12 +282,32 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should not receive any event notifications for tx1 since it has + // already been confirmed. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // We should only receive one update for tx2 since the last one, + // indicating how many confirmations are still left. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf2) @@ -244,7 +367,11 @@ func TestTxConfChainReorg(t *testing.T) { block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1}, }) - err := txConfNotifier.ConnectTip(nil, 9, block1.Transactions()) + err := txConfNotifier.ConnectTip(nil, 8, block1.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + err = txConfNotifier.ConnectTip(nil, 9, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -257,25 +384,57 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should receive two updates for tx1 since it requires two + // confirmations and it has already met them. + for i := 0; i < 2; i++ { + select { + case <-ntfn1.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx1") + } + } + + // A confirmation notification for tx1 should be dispatched, as it met + // its required number of confirmations. select { case <-ntfn1.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update for tx2 since it only requires + // one confirmation and it already met it. + select { + case <-ntfn2.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case <-ntfn2.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx2") } + // We should only receive one update for tx3 since it only has one + // confirmation so far and it requires two. + select { + case <-ntfn3.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx3") + } + + // A confirmation notification for tx3 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn3.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) default: } - // Block that tx2 and tx3 were included in is disconnected and two next + // The block that included tx2 and tx3 is disconnected and two next // blocks without them are connected. err = txConfNotifier.DisconnectTip(10) if err != nil { @@ -302,19 +461,28 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Expected negative conf notification for tx1") } + // We should not receive any event notifications from all of the + // transactions because tx1 has already been confirmed and tx2 and tx3 + // have not been included in the chain since the reorg. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } select { + case <-ntfn2.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx2") case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } select { + case <-ntfn3.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx3") case txConf := <-ntfn3.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) default: @@ -336,7 +504,22 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } - // Both transactions should be newly confirmed. + // We should only receive one update for tx2 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -349,6 +532,24 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Expected confirmation for tx2") } + // We should receive two updates for tx3 since it requires two + // confirmations and it has already met them. + for i := uint32(1); i <= 2; i++ { + select { + case numConfsLeft := <-ntfn3.Event.Updates: + expected := tx3NumConfs - i + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx3 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + } + + // A confirmation notification for tx3 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn3.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -365,18 +566,20 @@ func TestTxConfChainReorg(t *testing.T) { func TestTxConfTearDown(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) - var ( tx1 = wire.MsgTx{Version: 1} tx2 = wire.MsgTx{Version: 2} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + // Create the test transactions and register them with the + // TxConfNotifier to receive notifications. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(1), } txConfNotifier.Register(&ntfn1, nil) @@ -384,10 +587,12 @@ func TestTxConfTearDown(t *testing.T) { ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(2), } txConfNotifier.Register(&ntfn2, nil) + // Include the transactions in a block and add it to the TxConfNotifier. + // This should confirm tx1, but not tx2. block := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1, &tx2}, }) @@ -397,35 +602,62 @@ func TestTxConfTearDown(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We do not care about the correctness of the notifications since they + // are tested in other methods, but we'll still attempt to retrieve them + // for the sake of not being able to later once the notification + // channels are closed. + select { + case <-ntfn1.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx1") + } + select { case <-ntfn1.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx1") } + select { + case <-ntfn2.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx2") + } + select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } - // Confirmed channels should be closed for notifications that have not been - // dispatched yet. + // The notification channels should be closed for notifications that + // have not been dispatched yet, so we should not expect to receive any + // more updates. txConfNotifier.TearDown() + // tx1 should not receive any more updates because it has already been + // confirmed and the TxConfNotifier has been shut down. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // tx2 should not receive any more updates after the notifications + // channels have been closed and the TxConfNotifier shut down. select { + case _, more := <-ntfn2.Event.Updates: + if more { + t.Fatal("Expected closed Updates channel for tx2") + } case _, more := <-ntfn2.Event.Confirmed: if more { - t.Fatalf("Expected channel close for tx2") + t.Fatalf("Expected closed Confirmed channel for tx2") } default: - t.Fatalf("Expected channel close for tx2") + t.Fatalf("Expected closed notification channels for tx2") } }