chainntnfs: send incremental update notifications for tx confirmations

In this commit, we introduce the ability for the different ChainNotifier
implements to send incremental updates to the subscribers of transaction
confirmations. These incremental updates represent how many
confirmations are left for the transaction to be confirmed. They are
sent to the subscriber at every new height of the chain.
This commit is contained in:
Wilmer Paulino 2018-03-19 15:22:44 -04:00
parent 09f8a72897
commit 19a2bd9c7d
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 363 additions and 41 deletions

@ -1,6 +1,7 @@
package chainntnfs
import (
"errors"
"fmt"
"github.com/roasbeef/btcd/chaincfg/chainhash"
@ -115,13 +116,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro
return nil
}
// If the transaction already has the required confirmations, dispatch
// notification immediately, otherwise record along with the height at
// which to notify.
// If the transaction already has the required confirmations, we'll
// dispatch the notification immediately.
confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1
if confHeight <= tcn.currentHeight {
Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID)
// We'll send a 0 value to the Updates channel, indicating that
// the transaction has already been confirmed.
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
case ntfn.Event.Updates <- 0:
}
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
@ -129,6 +138,8 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro
ntfn.dispatched = true
}
} else {
// Otherwise, we'll record the transaction along with the height
// at which we should notify the client.
ntfn.details = txConf
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if !exists {
@ -136,6 +147,15 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro
tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet
}
ntfnSet[ntfn] = struct{}{}
// We'll also send an update to the client of how many
// confirmations are left for the transaction to be confirmed.
numConfsLeft := confHeight - tcn.currentHeight
select {
case ntfn.Event.Updates <- numConfsLeft:
case <-tcn.quit:
return errors.New("TxConfNotifier is exiting")
}
}
// As a final check, we'll also watch the transaction if it's still
@ -177,10 +197,11 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
tcn.currentHeight++
tcn.reorgDepth = 0
// Record any newly confirmed transactions in ntfnsByConfirmHeight so that
// notifications get dispatched when the tx gets sufficient confirmations.
// Also record txs in confTxsByInitialHeight so reorgs can be handled
// correctly.
// Record any newly confirmed transactions by their confirmed height so
// that notifications get dispatched when the transactions reach their
// required number of confirmations. We'll also watch these transactions
// at the height they were included in the chain so reorgs can be
// handled correctly.
for _, tx := range txns {
txHash := tx.Hash()
for _, ntfn := range tcn.confNotifications[*txHash] {
@ -207,8 +228,42 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
}
}
// Dispatch notifications for all transactions that are considered confirmed
// at this new block height.
// Next, we'll dispatch an update to all of the notification clients for
// our watched transactions with the number of confirmations left at
// this new height.
for _, txHashes := range tcn.txsByInitialHeight {
for txHash := range txHashes {
for _, ntfn := range tcn.confNotifications[txHash] {
// If the transaction still hasn't been included
// in a block, we'll skip it.
if ntfn.details == nil {
continue
}
txConfHeight := ntfn.details.BlockHeight +
ntfn.NumConfirmations - 1
numConfsLeft := txConfHeight - blockHeight
// Since we don't clear notifications until
// transactions are no longer under the risk of
// being reorganized out of the chain, we'll
// skip sending updates for transactions that
// have already been confirmed.
if int32(numConfsLeft) < 0 {
continue
}
select {
case ntfn.Event.Updates <- numConfsLeft:
case <-tcn.quit:
return errors.New("TxConfNotifier is exiting")
}
}
}
}
// Then, we'll dispatch notifications for all the transactions that have
// become confirmed at this new block height.
for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] {
Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID)
@ -254,31 +309,66 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
tcn.currentHeight--
tcn.reorgDepth++
for _, txHash := range tcn.confTxsByInitialHeight[blockHeight] {
for _, ntfn := range tcn.confNotifications[*txHash] {
// If notification has been dispatched with sufficient
// confirmations, notify of the reversal.
if ntfn.dispatched {
// We'll go through all of our watched transactions and attempt to drain
// their notification channels to ensure sending notifications to the
// clients is always non-blocking.
for initialHeight, txHashes := range tcn.txsByInitialHeight {
for txHash := range txHashes {
for _, ntfn := range tcn.confNotifications[txHash] {
// First, we'll attempt to drain an update
// from each notification to ensure sends to the
// Updates channel are always non-blocking.
select {
case <-ntfn.Event.Confirmed:
// Drain confirmation notification instead of sending
// negative conf if the receiver has not processed it yet.
// This ensures sends to the Confirmed channel are always
// non-blocking.
case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth):
case <-ntfn.Event.Updates:
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
return errors.New("TxConfNotifier is exiting")
default:
}
ntfn.dispatched = false
continue
}
confHeight := blockHeight + ntfn.NumConfirmations - 1
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if !exists {
continue
// Then, we'll check if the current transaction
// was included in the block currently being
// disconnected. If it was, we'll need to take
// some necessary precautions.
if initialHeight == blockHeight {
// If the transaction's confirmation notification
// has already been dispatched, we'll attempt to
// notify the client it was reorged out of the chain.
if ntfn.dispatched {
// Attempt to drain the confirmation notification
// to ensure sends to the Confirmed channel are
// always non-blocking.
select {
case <-ntfn.Event.Confirmed:
case <-tcn.quit:
return errors.New("TxConfNotifier is exiting")
default:
}
ntfn.dispatched = false
// Send a negative confirmation notification to the
// client indicating how many blocks have been
// disconnected successively.
select {
case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth):
case <-tcn.quit:
return errors.New("TxConfNotifier is exiting")
}
continue
}
// Otherwise, since the transactions was reorged out
// of the chain, we can safely remove its accompanying
// confirmation notification.
confHeight := blockHeight + ntfn.NumConfirmations - 1
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if !exists {
continue
}
delete(ntfnSet, ntfn)
}
}
delete(ntfnSet, ntfn)
}
}

@ -29,6 +29,9 @@ func TestTxConfFutureDispatch(t *testing.T) {
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
// Create the test transactions and register them with the
// TxConfNotifier before including them in a block to receive future
// notifications.
tx1Hash := tx1.TxHash()
ntfn1 := chainntnfs.ConfNtfn{
TxID: &tx1Hash,
@ -45,27 +48,53 @@ func TestTxConfFutureDispatch(t *testing.T) {
}
txConfNotifier.Register(&ntfn2, nil)
// We should not receive any notifications from both transactions
// since they have not been included in a block yet.
select {
case <-ntfn1.Event.Updates:
t.Fatal("Received unexpected confirmation update for tx1")
case txConf := <-ntfn1.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx1: %v", txConf)
default:
}
select {
case <-ntfn2.Event.Updates:
t.Fatal("Received unexpected confirmation update for tx2")
case txConf := <-ntfn2.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx2: %v", txConf)
default:
}
// Include the transactions in a block and add it to the TxConfNotifier.
// This should confirm tx1, but not tx2.
block1 := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3},
})
err := txConfNotifier.ConnectTip(block1.Hash(), 11, block1.Transactions())
err := txConfNotifier.ConnectTip(
block1.Hash(), 11, block1.Transactions(),
)
if err != nil {
t.Fatalf("Failed to connect block: %v", err)
}
// We should only receive one update for tx1 since it only requires
// one confirmation and it already met it.
select {
case numConfsLeft := <-ntfn1.Event.Updates:
const expected = 0
if numConfsLeft != expected {
t.Fatalf("Received incorrect confirmation update: tx1 "+
"expected %d confirmations left, got %d",
expected, numConfsLeft)
}
default:
t.Fatal("Expected confirmation update for tx1")
}
// A confirmation notification for this tranaction should be dispatched,
// as it only required one confirmation.
select {
case txConf := <-ntfn1.Event.Confirmed:
expectedConf := chainntnfs.TxConfirmation{
@ -78,12 +107,30 @@ func TestTxConfFutureDispatch(t *testing.T) {
t.Fatalf("Expected confirmation for tx1")
}
// We should only receive one update for tx2 since it only has one
// confirmation so far and it requires two.
select {
case numConfsLeft := <-ntfn2.Event.Updates:
const expected = 1
if numConfsLeft != expected {
t.Fatalf("Received incorrect confirmation update: tx2 "+
"expected %d confirmations left, got %d",
expected, numConfsLeft)
}
default:
t.Fatal("Expected confirmation update for tx2")
}
// A confirmation notification for tx2 should not be dispatched yet, as
// it requires one more confirmation.
select {
case txConf := <-ntfn2.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx2: %v", txConf)
default:
}
// Create a new block and add it to the TxConfNotifier at the next
// height. This should confirm tx2.
block2 := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{&tx3},
})
@ -93,12 +140,32 @@ func TestTxConfFutureDispatch(t *testing.T) {
t.Fatalf("Failed to connect block: %v", err)
}
// We should not receive any event notifications for tx1 since it has
// already been confirmed.
select {
case <-ntfn1.Event.Updates:
t.Fatal("Received unexpected confirmation update for tx1")
case txConf := <-ntfn1.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx1: %v", txConf)
default:
}
// We should only receive one update since the last at the new height,
// indicating how many confirmations are still left.
select {
case numConfsLeft := <-ntfn2.Event.Updates:
const expected = 0
if numConfsLeft != expected {
t.Fatalf("Received incorrect confirmation update: tx2 "+
"expected %d confirmations left, got %d",
expected, numConfsLeft)
}
default:
t.Fatal("Expected confirmation update for tx2")
}
// A confirmation notification for tx2 should be dispatched, since it
// now meets its required number of confirmations.
select {
case txConf := <-ntfn2.Event.Confirmed:
expectedConf := chainntnfs.TxConfirmation{
@ -131,6 +198,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
// Create the test transactions at a height before the TxConfNotifier's
// starting height so that they are confirmed once registering them.
tx1Hash := tx1.TxHash()
ntfn1 := chainntnfs.ConfNtfn{
TxID: &tx1Hash,
@ -157,6 +226,22 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
}
txConfNotifier.Register(&ntfn2, &txConf2)
// We should only receive one update for tx1 since it only requires
// one confirmation and it already met it.
select {
case numConfsLeft := <-ntfn1.Event.Updates:
const expected = 0
if numConfsLeft != expected {
t.Fatalf("Received incorrect confirmation update: tx1 "+
"expected %d confirmations left, got %d",
expected, numConfsLeft)
}
default:
t.Fatal("Expected confirmation update for tx1")
}
// A confirmation notification for tx1 should be dispatched, as it met
// its required number of confirmations.
select {
case txConf := <-ntfn1.Event.Confirmed:
assertEqualTxConf(t, txConf, &txConf1)
@ -164,12 +249,30 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
t.Fatalf("Expected confirmation for tx1")
}
// We should only receive one update indicating how many confirmations
// are left for the transaction to be confirmed.
select {
case numConfsLeft := <-ntfn2.Event.Updates:
const expected = 1
if numConfsLeft != expected {
t.Fatalf("Received incorrect confirmation update: tx2 "+
"expected %d confirmations left, got %d",
expected, numConfsLeft)
}
default:
t.Fatal("Expected confirmation update for tx2")
}
// A confirmation notification for tx2 should not be dispatched yet, as
// it requires one more confirmation.
select {
case txConf := <-ntfn2.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx2: %v", txConf)
default:
}
// Create a new block and add it to the TxConfNotifier at the next
// height. This should confirm tx2.
block := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{&tx3},
})
@ -179,12 +282,32 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
t.Fatalf("Failed to connect block: %v", err)
}
// We should not receive any event notifications for tx1 since it has
// already been confirmed.
select {
case <-ntfn1.Event.Updates:
t.Fatal("Received unexpected confirmation update for tx1")
case txConf := <-ntfn1.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx1: %v", txConf)
default:
}
// We should only receive one update for tx2 since the last one,
// indicating how many confirmations are still left.
select {
case numConfsLeft := <-ntfn2.Event.Updates:
const expected = 0
if numConfsLeft != expected {
t.Fatalf("Received incorrect confirmation update: tx2 "+
"expected %d confirmations left, got %d",
expected, numConfsLeft)
}
default:
t.Fatal("Expected confirmation update for tx2")
}
// A confirmation notification for tx2 should be dispatched, as it met
// its required number of confirmations.
select {
case txConf := <-ntfn2.Event.Confirmed:
assertEqualTxConf(t, txConf, &txConf2)
@ -244,7 +367,11 @@ func TestTxConfChainReorg(t *testing.T) {
block1 := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{&tx1},
})
err := txConfNotifier.ConnectTip(nil, 9, block1.Transactions())
err := txConfNotifier.ConnectTip(nil, 8, block1.Transactions())
if err != nil {
t.Fatalf("Failed to connect block: %v", err)
}
err = txConfNotifier.ConnectTip(nil, 9, nil)
if err != nil {
t.Fatalf("Failed to connect block: %v", err)
}
@ -257,25 +384,57 @@ func TestTxConfChainReorg(t *testing.T) {
t.Fatalf("Failed to connect block: %v", err)
}
// We should receive two updates for tx1 since it requires two
// confirmations and it has already met them.
for i := 0; i < 2; i++ {
select {
case <-ntfn1.Event.Updates:
default:
t.Fatal("Expected confirmation update for tx1")
}
}
// A confirmation notification for tx1 should be dispatched, as it met
// its required number of confirmations.
select {
case <-ntfn1.Event.Confirmed:
default:
t.Fatalf("Expected confirmation for tx1")
}
// We should only receive one update for tx2 since it only requires
// one confirmation and it already met it.
select {
case <-ntfn2.Event.Updates:
default:
t.Fatal("Expected confirmation update for tx2")
}
// A confirmation notification for tx2 should be dispatched, as it met
// its required number of confirmations.
select {
case <-ntfn2.Event.Confirmed:
default:
t.Fatalf("Expected confirmation for tx2")
}
// We should only receive one update for tx3 since it only has one
// confirmation so far and it requires two.
select {
case <-ntfn3.Event.Updates:
default:
t.Fatal("Expected confirmation update for tx3")
}
// A confirmation notification for tx3 should not be dispatched yet, as
// it requires one more confirmation.
select {
case txConf := <-ntfn3.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx3: %v", txConf)
default:
}
// Block that tx2 and tx3 were included in is disconnected and two next
// The block that included tx2 and tx3 is disconnected and two next
// blocks without them are connected.
err = txConfNotifier.DisconnectTip(10)
if err != nil {
@ -302,19 +461,28 @@ func TestTxConfChainReorg(t *testing.T) {
t.Fatalf("Expected negative conf notification for tx1")
}
// We should not receive any event notifications from all of the
// transactions because tx1 has already been confirmed and tx2 and tx3
// have not been included in the chain since the reorg.
select {
case <-ntfn1.Event.Updates:
t.Fatal("Received unexpected confirmation update for tx1")
case txConf := <-ntfn1.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx1: %v", txConf)
default:
}
select {
case <-ntfn2.Event.Updates:
t.Fatal("Received unexpected confirmation update for tx2")
case txConf := <-ntfn2.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx2: %v", txConf)
default:
}
select {
case <-ntfn3.Event.Updates:
t.Fatal("Received unexpected confirmation update for tx3")
case txConf := <-ntfn3.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx3: %v", txConf)
default:
@ -336,7 +504,22 @@ func TestTxConfChainReorg(t *testing.T) {
t.Fatalf("Failed to connect block: %v", err)
}
// Both transactions should be newly confirmed.
// We should only receive one update for tx2 since it only requires
// one confirmation and it already met it.
select {
case numConfsLeft := <-ntfn2.Event.Updates:
const expected = 0
if numConfsLeft != expected {
t.Fatalf("Received incorrect confirmation update: tx2 "+
"expected %d confirmations left, got %d",
expected, numConfsLeft)
}
default:
t.Fatal("Expected confirmation update for tx2")
}
// A confirmation notification for tx2 should be dispatched, as it met
// its required number of confirmations.
select {
case txConf := <-ntfn2.Event.Confirmed:
expectedConf := chainntnfs.TxConfirmation{
@ -349,6 +532,24 @@ func TestTxConfChainReorg(t *testing.T) {
t.Fatalf("Expected confirmation for tx2")
}
// We should receive two updates for tx3 since it requires two
// confirmations and it has already met them.
for i := uint32(1); i <= 2; i++ {
select {
case numConfsLeft := <-ntfn3.Event.Updates:
expected := tx3NumConfs - i
if numConfsLeft != expected {
t.Fatalf("Received incorrect confirmation update: tx3 "+
"expected %d confirmations left, got %d",
expected, numConfsLeft)
}
default:
t.Fatal("Expected confirmation update for tx2")
}
}
// A confirmation notification for tx3 should be dispatched, as it met
// its required number of confirmations.
select {
case txConf := <-ntfn3.Event.Confirmed:
expectedConf := chainntnfs.TxConfirmation{
@ -365,18 +566,20 @@ func TestTxConfChainReorg(t *testing.T) {
func TestTxConfTearDown(t *testing.T) {
t.Parallel()
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
var (
tx1 = wire.MsgTx{Version: 1}
tx2 = wire.MsgTx{Version: 2}
)
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
// Create the test transactions and register them with the
// TxConfNotifier to receive notifications.
tx1Hash := tx1.TxHash()
ntfn1 := chainntnfs.ConfNtfn{
TxID: &tx1Hash,
NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(),
Event: chainntnfs.NewConfirmationEvent(1),
}
txConfNotifier.Register(&ntfn1, nil)
@ -384,10 +587,12 @@ func TestTxConfTearDown(t *testing.T) {
ntfn2 := chainntnfs.ConfNtfn{
TxID: &tx2Hash,
NumConfirmations: 2,
Event: chainntnfs.NewConfirmationEvent(),
Event: chainntnfs.NewConfirmationEvent(2),
}
txConfNotifier.Register(&ntfn2, nil)
// Include the transactions in a block and add it to the TxConfNotifier.
// This should confirm tx1, but not tx2.
block := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{&tx1, &tx2},
})
@ -397,35 +602,62 @@ func TestTxConfTearDown(t *testing.T) {
t.Fatalf("Failed to connect block: %v", err)
}
// We do not care about the correctness of the notifications since they
// are tested in other methods, but we'll still attempt to retrieve them
// for the sake of not being able to later once the notification
// channels are closed.
select {
case <-ntfn1.Event.Updates:
default:
t.Fatal("Expected confirmation update for tx1")
}
select {
case <-ntfn1.Event.Confirmed:
default:
t.Fatalf("Expected confirmation for tx1")
}
select {
case <-ntfn2.Event.Updates:
default:
t.Fatal("Expected confirmation update for tx2")
}
select {
case txConf := <-ntfn2.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx2: %v", txConf)
default:
}
// Confirmed channels should be closed for notifications that have not been
// dispatched yet.
// The notification channels should be closed for notifications that
// have not been dispatched yet, so we should not expect to receive any
// more updates.
txConfNotifier.TearDown()
// tx1 should not receive any more updates because it has already been
// confirmed and the TxConfNotifier has been shut down.
select {
case <-ntfn1.Event.Updates:
t.Fatal("Received unexpected confirmation update for tx1")
case txConf := <-ntfn1.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx1: %v", txConf)
default:
}
// tx2 should not receive any more updates after the notifications
// channels have been closed and the TxConfNotifier shut down.
select {
case _, more := <-ntfn2.Event.Updates:
if more {
t.Fatal("Expected closed Updates channel for tx2")
}
case _, more := <-ntfn2.Event.Confirmed:
if more {
t.Fatalf("Expected channel close for tx2")
t.Fatalf("Expected closed Confirmed channel for tx2")
}
default:
t.Fatalf("Expected channel close for tx2")
t.Fatalf("Expected closed notification channels for tx2")
}
}