chainntnfs/txconfnotifier: make confirmation notifcation registration async

In this commit, we modify our TxConfNotifier struct to allow handling
notification registrations asynchronously. The Register method has been
refactored into two: Register and UpdateConfDetails. In the case that a
transaction we registered for notifications on has already confirmed,
we'll need to determine its confirmation details on our own. Once done,
this can be provided within UpdateConfDetails.

This change will pave down the road for our different chain notifiers to
handle potentially long rescans asynchronously to prevent blocking the
caller.
This commit is contained in:
Wilmer Paulino 2018-07-26 21:31:32 -07:00
parent c43506dee9
commit 867d8524bf
2 changed files with 158 additions and 64 deletions

@ -3,6 +3,7 @@ package chainntnfs
import (
"errors"
"fmt"
"sync"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcutil"
@ -91,6 +92,8 @@ type TxConfNotifier struct {
// quit is closed in order to signal that the notifier is gracefully
// exiting.
quit chan struct{}
sync.Mutex
}
// NewTxConfNotifier creates a TxConfNotifier. The current height of the
@ -108,27 +111,83 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif
// Register handles a new notification request. The client will be notified when
// the transaction gets a sufficient number of confirmations on the blockchain.
// If the transaction has already been included in a block on the chain, the
// confirmation details must be given as the txConf argument, otherwise it
// should be nil. If the transaction already has the sufficient number of
// confirmations, this dispatches the notification immediately.
func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error {
//
// NOTE: If the transaction has already been included in a block on the chain,
// the confirmation details must be provided with the UpdateConfDetails method,
// otherwise we will wait for the transaction to confirm even though it already
// has.
func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error {
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
return ErrTxConfNotifierExiting
default:
}
if txConf == nil || txConf.BlockHeight > tcn.currentHeight {
// Transaction is unconfirmed.
tcn.confNotifications[*ntfn.TxID] =
append(tcn.confNotifications[*ntfn.TxID], ntfn)
tcn.Lock()
defer tcn.Unlock()
ntfns, ok := tcn.confNotifications[*ntfn.TxID]
if !ok {
ntfns = make(map[uint64]*ConfNtfn)
tcn.confNotifications[*ntfn.TxID] = ntfns
}
ntfns[ntfn.ConfID] = ntfn
return nil
}
// UpdateConfDetails attempts to update the confirmation details for an active
// notification within the notifier. This should only be used in the case of a
// transaction that has confirmed before the notifier's current height.
//
// NOTE: The notification should be registered first to ensure notifications are
// dispatched correctly.
func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash,
clientID uint64, details *TxConfirmation) error {
select {
case <-tcn.quit:
return ErrTxConfNotifierExiting
default:
}
// Ensure we hold the lock throughout handling the notification to
// prevent the notifier from advancing its height underneath us.
tcn.Lock()
defer tcn.Unlock()
// First, we'll determine whether we have an active notification for
// this transaction with the given ID.
ntfns, ok := tcn.confNotifications[txid]
if !ok {
return fmt.Errorf("no notifications found for txid %v", txid)
}
ntfn, ok := ntfns[clientID]
if !ok {
return fmt.Errorf("no notification found with ID %v", clientID)
}
// If the notification has already recognized that the transaction
// confirmed, there's nothing left for us to do.
if ntfn.details != nil {
return nil
}
// If the transaction already has the required confirmations, we'll
// dispatch the notification immediately.
confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1
// The notifier has yet to reach the height at which the transaction was
// included in a block, so we should defer until handling it then within
// ConnectTip.
if details == nil || details.BlockHeight > tcn.currentHeight {
return nil
}
ntfn.details = details
// Now, we'll examine whether the transaction of this notification
// request has reched its required number of confirmations. If it has,
// we'll disaptch a confirmation notification to the caller.
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
if confHeight <= tcn.currentHeight {
Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID)
@ -136,21 +195,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro
// We'll send a 0 value to the Updates channel, indicating that
// the transaction has already been confirmed.
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
case ntfn.Event.Updates <- 0:
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
case ntfn.Event.Confirmed <- txConf:
case ntfn.Event.Confirmed <- details:
ntfn.dispatched = true
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
} else {
// Otherwise, we'll record the transaction along with the height
// at which we should notify the client.
ntfn.details = txConf
// Otherwise, we'll keep track of the notification request by
// the height at which we should dispatch the confirmation
// notification.
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if !exists {
ntfnSet = make(map[*ConfNtfn]struct{})
@ -164,22 +223,19 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro
select {
case ntfn.Event.Updates <- numConfsLeft:
case <-tcn.quit:
return errors.New("TxConfNotifier is exiting")
return ErrTxConfNotifierExiting
}
}
// As a final check, we'll also watch the transaction if it's still
// possible for it to get reorganized out of the chain.
if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight {
tcn.confNotifications[*ntfn.TxID] =
append(tcn.confNotifications[*ntfn.TxID], ntfn)
txSet, exists := tcn.txsByInitialHeight[txConf.BlockHeight]
// possible for it to get reorged out of the chain.
if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight {
txSet, exists := tcn.txsByInitialHeight[details.BlockHeight]
if !exists {
txSet = make(map[chainhash.Hash]struct{})
tcn.txsByInitialHeight[txConf.BlockHeight] = txSet
tcn.txsByInitialHeight[details.BlockHeight] = txSet
}
txSet[*ntfn.TxID] = struct{}{}
txSet[txid] = struct{}{}
}
return nil
@ -199,6 +255,9 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
default:
}
tcn.Lock()
defer tcn.Unlock()
if blockHeight != tcn.currentHeight+1 {
return fmt.Errorf("Received blocks out of order: "+
"current height=%d, new height=%d",
@ -244,8 +303,10 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
for _, txHashes := range tcn.txsByInitialHeight {
for txHash := range txHashes {
for _, ntfn := range tcn.confNotifications[txHash] {
// If the transaction still hasn't been included
// in a block, we'll skip it.
// If the notification hasn't learned about the
// confirmation of its transaction yet (in the
// case of historical confirmations), we'll skip
// it.
if ntfn.details == nil {
continue
}
@ -308,10 +369,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
return ErrTxConfNotifierExiting
default:
}
tcn.Lock()
defer tcn.Unlock()
if blockHeight != tcn.currentHeight {
return fmt.Errorf("Received blocks out of order: "+
"current height=%d, disconnected height=%d",
@ -394,6 +458,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
// This closes the event channels of all registered notifications that have
// not been dispatched yet.
func (tcn *TxConfNotifier) TearDown() {
tcn.Lock()
defer tcn.Unlock()
close(tcn.quit)
for _, ntfns := range tcn.confNotifications {

@ -3,10 +3,10 @@ package chainntnfs_test
import (
"testing"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/chainntnfs"
)
var zeroHash chainhash.Hash
@ -38,7 +38,9 @@ func TestTxConfFutureDispatch(t *testing.T) {
NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
}
txConfNotifier.Register(&ntfn1, nil)
if err := txConfNotifier.Register(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
tx2Hash := tx2.TxHash()
ntfn2 := chainntnfs.ConfNtfn{
@ -46,7 +48,9 @@ func TestTxConfFutureDispatch(t *testing.T) {
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
}
txConfNotifier.Register(&ntfn2, nil)
if err := txConfNotifier.Register(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// We should not receive any notifications from both transactions
// since they have not been included in a block yet.
@ -202,32 +206,37 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
// starting height so that they are confirmed once registering them.
tx1Hash := tx1.TxHash()
ntfn1 := chainntnfs.ConfNtfn{
ConfID: 0,
TxID: &tx1Hash,
NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
}
if err := txConfNotifier.Register(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
tx2Hash := tx2.TxHash()
ntfn2 := chainntnfs.ConfNtfn{
ConfID: 1,
TxID: &tx2Hash,
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
}
if err := txConfNotifier.Register(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Update tx1 with its confirmation details. We should only receive one
// update since it only requires one confirmation and it already met it.
txConf1 := chainntnfs.TxConfirmation{
BlockHash: &zeroHash,
BlockHeight: 9,
TxIndex: 1,
}
txConfNotifier.Register(&ntfn1, &txConf1)
tx2Hash := tx2.TxHash()
txConf2 := chainntnfs.TxConfirmation{
BlockHash: &zeroHash,
BlockHeight: 9,
TxIndex: 2,
err := txConfNotifier.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1)
if err != nil {
t.Fatalf("unable to update conf details: %v", err)
}
ntfn2 := chainntnfs.ConfNtfn{
TxID: &tx2Hash,
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
}
txConfNotifier.Register(&ntfn2, &txConf2)
// We should only receive one update for tx1 since it only requires
// one confirmation and it already met it.
select {
case numConfsLeft := <-ntfn1.Event.Updates:
const expected = 0
@ -240,8 +249,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
t.Fatal("Expected confirmation update for tx1")
}
// A confirmation notification for tx1 should be dispatched, as it met
// its required number of confirmations.
// A confirmation notification for tx1 should also be dispatched.
select {
case txConf := <-ntfn1.Event.Confirmed:
assertEqualTxConf(t, txConf, &txConf1)
@ -249,8 +257,19 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
t.Fatalf("Expected confirmation for tx1")
}
// We should only receive one update indicating how many confirmations
// are left for the transaction to be confirmed.
// Update tx2 with its confirmation details. This should not trigger a
// confirmation notification since it hasn't reached its required number
// of confirmations, but we should receive a confirmation update
// indicating how many confirmation are left.
txConf2 := chainntnfs.TxConfirmation{
BlockHash: &zeroHash,
BlockHeight: 9,
TxIndex: 2,
}
err = txConfNotifier.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2)
if err != nil {
t.Fatalf("unable to update conf details: %v", err)
}
select {
case numConfsLeft := <-ntfn2.Event.Updates:
const expected = 1
@ -263,8 +282,6 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
t.Fatal("Expected confirmation update for tx2")
}
// A confirmation notification for tx2 should not be dispatched yet, as
// it requires one more confirmation.
select {
case txConf := <-ntfn2.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx2: %v", txConf)
@ -277,7 +294,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
Transactions: []*wire.MsgTx{&tx3},
})
err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions())
err = txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions())
if err != nil {
t.Fatalf("Failed to connect block: %v", err)
}
@ -343,7 +360,9 @@ func TestTxConfChainReorg(t *testing.T) {
NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
}
txConfNotifier.Register(&ntfn1, nil)
if err := txConfNotifier.Register(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Tx 2 will be confirmed in block 10 and requires 1 conf.
tx2Hash := tx2.TxHash()
@ -352,7 +371,9 @@ func TestTxConfChainReorg(t *testing.T) {
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
}
txConfNotifier.Register(&ntfn2, nil)
if err := txConfNotifier.Register(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Tx 3 will be confirmed in block 10 and requires 2 confs.
tx3Hash := tx3.TxHash()
@ -361,7 +382,9 @@ func TestTxConfChainReorg(t *testing.T) {
NumConfirmations: tx3NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx3NumConfs),
}
txConfNotifier.Register(&ntfn3, nil)
if err := txConfNotifier.Register(&ntfn3); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Sync chain to block 10. Txs 1 & 2 should be confirmed.
block1 := btcutil.NewBlock(&wire.MsgBlock{
@ -581,7 +604,9 @@ func TestTxConfTearDown(t *testing.T) {
NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(1),
}
txConfNotifier.Register(&ntfn1, nil)
if err := txConfNotifier.Register(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
tx2Hash := tx2.TxHash()
ntfn2 := chainntnfs.ConfNtfn{
@ -589,7 +614,9 @@ func TestTxConfTearDown(t *testing.T) {
NumConfirmations: 2,
Event: chainntnfs.NewConfirmationEvent(2),
}
txConfNotifier.Register(&ntfn2, nil)
if err := txConfNotifier.Register(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Include the transactions in a block and add it to the TxConfNotifier.
// This should confirm tx1, but not tx2.