chainntnfs/btcdnotify: support registration for script confirmations

In this commit, we extend the BtcdNotifier to support registering
scripts for confirmation notifications. Once the script has been
detected as confirmed within the chain, a confirmation notification will
be dispatched to through the Confirmed channel of the ConfirmationEvent
returned upon registration.

For scripts that have confirmed in the past, the `historicalConfDetails`
method has been modified to skip the txindex and go straight to scanning
the chain manually if confirmation request is for a script. When
scanning the chain, we'll determine whether the script has been
confirmed by locating the script in an output of a confirmed
transaction.

For scripts that have yet to confirm, they will be properly tracked
within the TxNotifier.
This commit is contained in:
Wilmer Paulino 2018-12-06 21:14:16 -08:00
parent 72cc843e92
commit 1323c92947
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 131 additions and 51 deletions

@ -294,10 +294,10 @@ out:
case registerMsg := <-b.notificationRegistry: case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) { switch msg := registerMsg.(type) {
case *chainntnfs.HistoricalConfDispatch: case *chainntnfs.HistoricalConfDispatch:
// Look up whether the transaction is already // Look up whether the transaction/output script
// included in the active chain. We'll do this // has already confirmed in the active chain.
// in a goroutine to prevent blocking // We'll do this in a goroutine to prevent
// potentially long rescans. // blocking potentially long rescans.
// //
// TODO(wilmer): add retry logic if rescan fails? // TODO(wilmer): add retry logic if rescan fails?
b.wg.Add(1) b.wg.Add(1)
@ -305,7 +305,8 @@ out:
defer b.wg.Done() defer b.wg.Done()
confDetails, _, err := b.historicalConfDetails( confDetails, _, err := b.historicalConfDetails(
msg.TxID, msg.StartHeight, msg.EndHeight, msg.ConfRequest,
msg.StartHeight, msg.EndHeight,
) )
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
@ -320,7 +321,7 @@ out:
// cache at tip, since any pending // cache at tip, since any pending
// rescans have now completed. // rescans have now completed.
err = b.txNotifier.UpdateConfDetails( err = b.txNotifier.UpdateConfDetails(
*msg.TxID, confDetails, msg.ConfRequest, confDetails,
) )
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
@ -446,15 +447,28 @@ out:
b.wg.Done() b.wg.Done()
} }
// historicalConfDetails looks up whether a transaction is already included in a // historicalConfDetails looks up whether a confirmation request (txid/output
// block in the active chain and, if so, returns details about the confirmation. // script) has already been included in a block in the active chain and, if so,
func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, // returns details about said block.
func (b *BtcdNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequest,
startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, startHeight, endHeight uint32) (*chainntnfs.TxConfirmation,
chainntnfs.TxConfStatus, error) { chainntnfs.TxConfStatus, error) {
// If a txid was not provided, then we should dispatch upon seeing the
// script on-chain, so we'll short-circuit straight to scanning manually
// as there doesn't exist a script index to query.
if confRequest.TxID == chainntnfs.ZeroHash {
return b.confDetailsManually(
confRequest, startHeight, endHeight,
)
}
// Otherwise, we'll dispatch upon seeing a transaction on-chain with the
// given hash.
//
// We'll first attempt to retrieve the transaction using the node's // We'll first attempt to retrieve the transaction using the node's
// txindex. // txindex.
txConf, txStatus, err := b.confDetailsFromTxIndex(txid) txConf, txStatus, err := b.confDetailsFromTxIndex(&confRequest.TxID)
// We'll then check the status of the transaction lookup returned to // We'll then check the status of the transaction lookup returned to
// determine whether we should proceed with any fallback methods. // determine whether we should proceed with any fallback methods.
@ -463,9 +477,13 @@ func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash,
// We failed querying the index for the transaction, fall back to // We failed querying the index for the transaction, fall back to
// scanning manually. // scanning manually.
case err != nil: case err != nil:
chainntnfs.Log.Debugf("Failed getting conf details from "+ chainntnfs.Log.Debugf("Unable to determine confirmation of %v "+
"index (%v), scanning manually", err) "through the backend's txindex (%v), scanning manually",
return b.confDetailsManually(txid, startHeight, endHeight) confRequest.TxID, err)
return b.confDetailsManually(
confRequest, startHeight, endHeight,
)
// The transaction was found within the node's mempool. // The transaction was found within the node's mempool.
case txStatus == chainntnfs.TxFoundMempool: case txStatus == chainntnfs.TxFoundMempool:
@ -559,17 +577,14 @@ func (b *BtcdNotifier) confDetailsFromTxIndex(txid *chainhash.Hash,
blockHash) blockHash)
} }
// confDetailsManually looks up whether a transaction is already included in a // confDetailsManually looks up whether a transaction/output script has already
// block in the active chain by scanning the chain's blocks, starting from the // been included in a block in the active chain by scanning the chain's blocks
// earliest height the transaction could have been included in, to the current // within the given range. If the transaction/output script is found, its
// height in the chain. If the transaction is found, its confirmation details // confirmation details are returned. Otherwise, nil is returned.
// are returned. Otherwise, nil is returned. func (b *BtcdNotifier) confDetailsManually(confRequest chainntnfs.ConfRequest,
func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, startHeight, startHeight, endHeight uint32) (*chainntnfs.TxConfirmation,
endHeight uint32) (*chainntnfs.TxConfirmation,
chainntnfs.TxConfStatus, error) { chainntnfs.TxConfStatus, error) {
targetTxidStr := txid.String()
// Begin scanning blocks at every height to determine where the // Begin scanning blocks at every height to determine where the
// transaction was included in. // transaction was included in.
for height := endHeight; height >= startHeight && height > 0; height-- { for height := endHeight; height >= startHeight && height > 0; height-- {
@ -590,24 +605,26 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, startHeight,
} }
// TODO: fetch the neutrino filters instead. // TODO: fetch the neutrino filters instead.
block, err := b.chainConn.GetBlockVerbose(blockHash) block, err := b.chainConn.GetBlock(blockHash)
if err != nil { if err != nil {
return nil, chainntnfs.TxNotFoundManually, return nil, chainntnfs.TxNotFoundManually,
fmt.Errorf("unable to get block with hash "+ fmt.Errorf("unable to get block with hash "+
"%v: %v", blockHash, err) "%v: %v", blockHash, err)
} }
for txIndex, txHash := range block.Tx { // For every transaction in the block, check which one matches
// If we're able to find the transaction in this block, // our request. If we find one that does, we can dispatch its
// return its confirmation details. // confirmation details.
if txHash == targetTxidStr { for txIndex, tx := range block.Transactions {
details := &chainntnfs.TxConfirmation{ if !confRequest.MatchesTx(tx) {
continue
}
return &chainntnfs.TxConfirmation{
BlockHash: blockHash, BlockHash: blockHash,
BlockHeight: height, BlockHeight: height,
TxIndex: uint32(txIndex), TxIndex: uint32(txIndex),
} }, chainntnfs.TxFoundManually, nil
return details, chainntnfs.TxFoundManually, nil
}
} }
} }
@ -820,30 +837,41 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return ntfn.Event, nil return ntfn.Event, nil
} }
// RegisterConfirmationsNtfn registers a notification with BtcdNotifier // RegisterConfirmationsNtfn registers an intent to be notified once the target
// which will be triggered once the txid reaches numConfs number of // txid/output script has reached numConfs confirmations on-chain. When
// confirmations. // intending to be notified of the confirmation of an output script, a nil txid
func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, // must be used. The heightHint should represent the earliest height at which
// the txid/output script could have been included in the chain.
//
// Progress on the number of confirmations left can be read from the 'Updates'
// channel. Once it has reached all of its confirmations, a notification will be
// sent across the 'Confirmed' channel.
func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
pkScript []byte,
numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
// Construct a notification request for the transaction and send it to // Construct a notification request for the transaction and send it to
// the main event loop. // the main event loop.
confRequest, err := chainntnfs.NewConfRequest(txid, pkScript)
if err != nil {
return nil, err
}
ntfn := &chainntnfs.ConfNtfn{ ntfn := &chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&b.confClientCounter, 1), ConfID: atomic.AddUint64(&b.confClientCounter, 1),
TxID: txid, ConfRequest: confRequest,
NumConfirmations: numConfs, NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs), Event: chainntnfs.NewConfirmationEvent(numConfs),
HeightHint: heightHint, HeightHint: heightHint,
} }
chainntnfs.Log.Infof("New confirmation subscription: "+ chainntnfs.Log.Infof("New confirmation subscription: %v, num_confs=%v ",
"txid=%v, numconfs=%v", txid, numConfs) confRequest, numConfs)
// Register the conf notification with the TxNotifier. A non-nil value // Register the conf notification with the TxNotifier. A non-nil value
// for `dispatch` will be returned if we are required to perform a // for `dispatch` will be returned if we are required to perform a
// manual scan for the confirmation. Otherwise the notifier will begin // manual scan for the confirmation. Otherwise the notifier will begin
// watching at tip for the transaction to confirm. // watching at tip for the transaction to confirm.
dispatch, err := b.txNotifier.RegisterConf(ntfn) dispatch, _, err := b.txNotifier.RegisterConf(ntfn)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -3,6 +3,7 @@
package btcdnotify package btcdnotify
import ( import (
"bytes"
"io/ioutil" "io/ioutil"
"testing" "testing"
@ -12,6 +13,20 @@ import (
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
) )
var (
testScript = []byte{
// OP_HASH160
0xA9,
// OP_DATA_20
0x14,
// <20-byte hash>
0xec, 0x6f, 0x7a, 0x5a, 0xa8, 0xf2, 0xb1, 0x0c, 0xa5, 0x15,
0x04, 0x52, 0x3a, 0x60, 0xd4, 0x03, 0x06, 0xf6, 0x96, 0xcd,
// OP_EQUAL
0x87,
}
)
func initHintCache(t *testing.T) *chainntnfs.HeightHintCache { func initHintCache(t *testing.T) *chainntnfs.HeightHintCache {
t.Helper() t.Helper()
@ -64,8 +79,13 @@ func TestHistoricalConfDetailsTxIndex(t *testing.T) {
// A transaction unknown to the node should not be found within the // A transaction unknown to the node should not be found within the
// txindex even if it is enabled, so we should not proceed with any // txindex even if it is enabled, so we should not proceed with any
// fallback methods. // fallback methods.
var zeroHash chainhash.Hash var unknownHash chainhash.Hash
_, txStatus, err := notifier.historicalConfDetails(&zeroHash, 0, 0) copy(unknownHash[:], bytes.Repeat([]byte{0x10}, 32))
unknownConfReq, err := chainntnfs.NewConfRequest(&unknownHash, testScript)
if err != nil {
t.Fatalf("unable to create conf request: %v", err)
}
_, txStatus, err := notifier.historicalConfDetails(unknownConfReq, 0, 0)
if err != nil { if err != nil {
t.Fatalf("unable to retrieve historical conf details: %v", err) t.Fatalf("unable to retrieve historical conf details: %v", err)
} }
@ -80,16 +100,20 @@ func TestHistoricalConfDetailsTxIndex(t *testing.T) {
// Now, we'll create a test transaction and attempt to retrieve its // Now, we'll create a test transaction and attempt to retrieve its
// confirmation details. // confirmation details.
txid, _, err := chainntnfs.GetTestTxidAndScript(harness) txid, pkScript, err := chainntnfs.GetTestTxidAndScript(harness)
if err != nil { if err != nil {
t.Fatalf("unable to create tx: %v", err) t.Fatalf("unable to create tx: %v", err)
} }
if err := chainntnfs.WaitForMempoolTx(harness, txid); err != nil { if err := chainntnfs.WaitForMempoolTx(harness, txid); err != nil {
t.Fatalf("unable to find tx in the mempool: %v", err) t.Fatalf("unable to find tx in the mempool: %v", err)
} }
confReq, err := chainntnfs.NewConfRequest(txid, pkScript)
if err != nil {
t.Fatalf("unable to create conf request: %v", err)
}
// The transaction should be found in the mempool at this point. // The transaction should be found in the mempool at this point.
_, txStatus, err = notifier.historicalConfDetails(txid, 0, 0) _, txStatus, err = notifier.historicalConfDetails(confReq, 0, 0)
if err != nil { if err != nil {
t.Fatalf("unable to retrieve historical conf details: %v", err) t.Fatalf("unable to retrieve historical conf details: %v", err)
} }
@ -109,7 +133,7 @@ func TestHistoricalConfDetailsTxIndex(t *testing.T) {
t.Fatalf("unable to generate block: %v", err) t.Fatalf("unable to generate block: %v", err)
} }
_, txStatus, err = notifier.historicalConfDetails(txid, 0, 0) _, txStatus, err = notifier.historicalConfDetails(confReq, 0, 0)
if err != nil { if err != nil {
t.Fatalf("unable to retrieve historical conf details: %v", err) t.Fatalf("unable to retrieve historical conf details: %v", err)
} }
@ -139,8 +163,13 @@ func TestHistoricalConfDetailsNoTxIndex(t *testing.T) {
// Since the node has its txindex disabled, we fall back to scanning the // Since the node has its txindex disabled, we fall back to scanning the
// chain manually. A transaction unknown to the network should not be // chain manually. A transaction unknown to the network should not be
// found. // found.
var zeroHash chainhash.Hash var unknownHash chainhash.Hash
_, txStatus, err := notifier.historicalConfDetails(&zeroHash, 0, 0) copy(unknownHash[:], bytes.Repeat([]byte{0x10}, 32))
unknownConfReq, err := chainntnfs.NewConfRequest(&unknownHash, testScript)
if err != nil {
t.Fatalf("unable to create conf request: %v", err)
}
_, txStatus, err := notifier.historicalConfDetails(unknownConfReq, 0, 0)
if err != nil { if err != nil {
t.Fatalf("unable to retrieve historical conf details: %v", err) t.Fatalf("unable to retrieve historical conf details: %v", err)
} }
@ -161,15 +190,19 @@ func TestHistoricalConfDetailsNoTxIndex(t *testing.T) {
t.Fatalf("unable to retrieve current height: %v", err) t.Fatalf("unable to retrieve current height: %v", err)
} }
txid, _, err := chainntnfs.GetTestTxidAndScript(harness) txid, pkScript, err := chainntnfs.GetTestTxidAndScript(harness)
if err != nil { if err != nil {
t.Fatalf("unable to create tx: %v", err) t.Fatalf("unable to create tx: %v", err)
} }
if err := chainntnfs.WaitForMempoolTx(harness, txid); err != nil { if err := chainntnfs.WaitForMempoolTx(harness, txid); err != nil {
t.Fatalf("unable to find tx in the mempool: %v", err) t.Fatalf("unable to find tx in the mempool: %v", err)
} }
confReq, err := chainntnfs.NewConfRequest(txid, pkScript)
if err != nil {
t.Fatalf("unable to create conf request: %v", err)
}
_, txStatus, err = notifier.historicalConfDetails(txid, 0, 0) _, txStatus, err = notifier.historicalConfDetails(confReq, 0, 0)
if err != nil { if err != nil {
t.Fatalf("unable to retrieve historical conf details: %v", err) t.Fatalf("unable to retrieve historical conf details: %v", err)
} }
@ -188,7 +221,7 @@ func TestHistoricalConfDetailsNoTxIndex(t *testing.T) {
} }
_, txStatus, err = notifier.historicalConfDetails( _, txStatus, err = notifier.historicalConfDetails(
txid, uint32(currentHeight), uint32(currentHeight)+1, confReq, uint32(currentHeight), uint32(currentHeight)+1,
) )
if err != nil { if err != nil {
t.Fatalf("unable to retrieve historical conf details: %v", err) t.Fatalf("unable to retrieve historical conf details: %v", err)

@ -187,6 +187,25 @@ func (r ConfRequest) ConfHintKey() ([]byte, error) {
return txid.Bytes(), nil return txid.Bytes(), nil
} }
// MatchesTx determines whether the given transaction satisfies the confirmation
// request. If the confirmation request is for a script, then we'll check all of
// the outputs of the transaction to determine if it matches. Otherwise, we'll
// match on the txid.
func (r ConfRequest) MatchesTx(tx *wire.MsgTx) bool {
if r.TxID != ZeroHash {
return r.TxID == tx.TxHash()
}
pkScript := r.PkScript.Script()
for _, txOut := range tx.TxOut {
if bytes.Equal(txOut.PkScript, pkScript) {
return true
}
}
return false
}
// ConfNtfn represents a notifier client's request to receive a notification // ConfNtfn represents a notifier client's request to receive a notification
// once the target transaction/ouput script gets sufficient confirmations. The // once the target transaction/ouput script gets sufficient confirmations. The
// client is asynchronously notified via the ConfirmationEvent channels. // client is asynchronously notified via the ConfirmationEvent channels.