Merge pull request #969 from halseth/chainntnfs-chain-spends

chainntnfs: optionally only notify spends on block inclusion
This commit is contained in:
Olaoluwa Osuntokun 2018-04-05 21:40:38 -07:00 committed by GitHub
commit b7875fce4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 271 additions and 56 deletions

@ -519,7 +519,7 @@ secondLevelCheck:
if !ok {
spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn(
&breachedOutput.outpoint,
breachInfo.breachHeight,
breachInfo.breachHeight, true,
)
if err != nil {
brarLog.Errorf("unable to check for "+

@ -486,7 +486,7 @@ type spendCancel struct {
// outpoint has been detected, the details of the spending event will be sent
// across the 'Spend' channel.
func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
_ uint32) (*chainntnfs.SpendEvent, error) {
_ uint32, _ bool) (*chainntnfs.SpendEvent, error) {
ntfn := &spendNotification{
targetOutpoint: outpoint,

@ -207,6 +207,23 @@ func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t ti
}
}
// filteredBlock represents a new block which has been connected to the main
// chain. The slice of transactions will only be populated if the block
// includes a transaction that confirmed one of our watched txids, or spends
// one of the outputs currently being watched.
// TODO(halseth): this is currently used for complete blocks. Change to use
// onFilteredBlockConnected and onFilteredBlockDisconnected, making it easier
// to unify with the Neutrino implementation.
type filteredBlock struct {
hash chainhash.Hash
height uint32
txns []*btcutil.Tx
// connected is true if this update is a new block and false if it is a
// disconnected block.
connect bool
}
// onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient.
func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) {
// Append this new chain update to the end of the queue of new chain
@ -322,12 +339,15 @@ out:
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
update.blockHeight, update.blockHash)
b.notifyBlockEpochs(update.blockHeight, update.blockHash)
txns := btcutil.NewBlock(rawBlock).Transactions()
err = b.txConfNotifier.ConnectTip(update.blockHash,
uint32(update.blockHeight), txns)
if err != nil {
block := &filteredBlock{
hash: *update.blockHash,
height: uint32(update.blockHeight),
txns: txns,
connect: true,
}
if err := b.handleBlockConnected(block); err != nil {
chainntnfs.Log.Error(err)
}
continue
@ -350,6 +370,8 @@ out:
chainntnfs.Log.Error(err)
}
// NOTE: we currently only use txUpdates for mempool spends. It
// might get removed entirely in the future.
case item := <-b.txUpdates.ChanOut():
newSpend := item.(*txUpdate)
spendingTx := newSpend.tx
@ -381,7 +403,20 @@ out:
spendDetails.SpendingHeight = currentHeight + 1
}
for _, ntfn := range clients {
// Keep spendNotifications that are
// waiting for a confirmation around.
// They will be notified when we find
// the spend within a block.
rem := make(map[uint64]*spendNotification)
for c, ntfn := range clients {
// If this client didn't want
// to be notified on mempool
// spends, store it for later.
if !ntfn.mempool {
rem[c] = ntfn
continue
}
chainntnfs.Log.Infof("Dispatching "+
"spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
@ -393,6 +428,12 @@ out:
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
// If we had any clients left, add them
// back to the map.
if len(rem) > 0 {
b.spendNotifications[prevOut] = rem
}
}
}
@ -461,6 +502,65 @@ func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash,
return &txConf, nil
}
// handleBlocksConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications
// now or after numConfirmations confs.
// TODO(halseth): this is reusing the neutrino notifier implementation, unify
// them.
func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// First we'll notify any subscribed clients of the block.
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Next, we'll scan over the list of relevant transactions and possibly
// dispatch notifications for confirmations and spends.
for _, tx := range newBlock.txns {
mtx := tx.MsgTx()
txSha := mtx.TxHash()
for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an output which we have a
// registered notification for, then create a spend summary, finally
// sending off the details to the notification subscriber.
clients, ok := b.spendNotifications[prevOut]
if !ok {
continue
}
// TODO(roasbeef): many integration tests expect spend to be
// notified within the mempool.
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &txSha,
SpendingTx: mtx,
SpenderInputIndex: uint32(i),
SpendingHeight: int32(newBlock.height),
}
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not
// block. This is safe to do since the channel is buffered, and
// the message can still be read by the receiver.
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
}
}
// A new block has been connected to the main chain.
// Send out any N confirmation notifications which may
// have been triggered by this new block.
b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns)
return nil
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
@ -489,6 +589,7 @@ type spendNotification struct {
spendChan chan *chainntnfs.SpendDetail
spendID uint64
mempool bool
}
// spendCancel is a message sent to the BtcdNotifier when a client wishes to
@ -506,12 +607,13 @@ type spendCancel struct {
// outpoint has been detected, the details of the spending event will be sent
// across the 'Spend' channel.
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
_ uint32) (*chainntnfs.SpendEvent, error) {
_ uint32, mempool bool) (*chainntnfs.SpendEvent, error) {
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
spendID: atomic.AddUint64(&b.spendClientCounter, 1),
mempool: mempool,
}
select {

@ -36,19 +36,22 @@ type ChainNotifier interface {
heightHint uint32) (*ConfirmationEvent, error)
// RegisterSpendNtfn registers an intent to be notified once the target
// outpoint is successfully spent within a confirmed transaction. The
// returned SpendEvent will receive a send on the 'Spend' transaction
// once a transaction spending the input is detected on the blockchain.
// The heightHint parameter is provided as a convenience to light
// clients. The heightHint denotes the earliest height in the blockchain
// in which the target output could have been created.
// outpoint is successfully spent within a transaction. The returned
// SpendEvent will receive a send on the 'Spend' transaction once a
// transaction spending the input is detected on the blockchain. The
// heightHint parameter is provided as a convenience to light clients.
// The heightHint denotes the earliest height in the blockchain in
// which the target output could have been created.
//
// NOTE: This notifications should be triggered once the transaction is
// *seen* on the network, not when it has received a single confirmation.
// NOTE: If mempool=true is set, then this notification should be
// triggered on a best-effort basis once the transaction is *seen* on
// the network. If mempool=false, it should only be triggered when the
// spending transaction receives a single confirmation.
//
// NOTE: Dispatching notifications to multiple clients subscribed to a
// spend of the same outpoint MUST be supported.
RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint32) (*SpendEvent, error)
RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint32,
mempool bool) (*SpendEvent, error)
// RegisterBlockEpochNtfn registers an intent to be notified of each
// new block connected to the tip of the main chain. The returned

@ -15,6 +15,9 @@ import (
"github.com/lightninglabs/neutrino"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify"
"github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
"github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify"
"github.com/ltcsuite/ltcd/btcjson"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcwallet/walletdb"
@ -27,18 +30,6 @@ import (
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
// Required to auto-register the bitcoind backed ChainNotifier
// implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify"
// Required to auto-register the btcd backed ChainNotifier
// implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
// Required to auto-register the neutrino backed ChainNotifier
// implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify"
// Required to register the boltdb walletdb implementation.
_ "github.com/roasbeef/btcwallet/walletdb/bdb"
)
@ -386,7 +377,7 @@ func testSpendNotification(miner *rpctest.Harness,
spendClients := make([]*chainntnfs.SpendEvent, numClients)
for i := 0; i < numClients; i++ {
spentIntent, err := notifier.RegisterSpendNtfn(outpoint,
uint32(currentHeight))
uint32(currentHeight), false)
if err != nil {
t.Fatalf("unable to register for spend ntfn: %v", err)
}
@ -408,6 +399,16 @@ func testSpendNotification(miner *rpctest.Harness,
t.Fatalf("tx not relayed to miner: %v", err)
}
// Make sure notifications are not yet sent.
for _, c := range spendClients {
select {
case <-c.Spend:
t.Fatalf("did not expect to get notification before " +
"block was mined")
case <-time.After(50 * time.Millisecond):
}
}
// Now we mine a single block, which should include our spend. The
// notification should also be sent off.
if _, err := miner.Node.Generate(1); err != nil {
@ -419,19 +420,9 @@ func testSpendNotification(miner *rpctest.Harness,
t.Fatalf("unable to get current height: %v", err)
}
// For each event we registered for above, we create a goroutine which
// will listen on the event channel, passing it proxying each
// notification into a single which will be examined below.
spentNtfn := make(chan *chainntnfs.SpendDetail, numClients)
for i := 0; i < numClients; i++ {
go func(c *chainntnfs.SpendEvent) {
spentNtfn <- <-c.Spend
}(spendClients[i])
}
for i := 0; i < numClients; i++ {
for _, c := range spendClients {
select {
case ntfn := <-spentNtfn:
case ntfn := <-c.Spend:
// We've received the spend nftn. So now verify all the
// fields have been set properly.
if *ntfn.SpentOutPoint != *outpoint {
@ -460,6 +451,120 @@ func testSpendNotification(miner *rpctest.Harness,
}
}
func testSpendNotificationMempoolSpends(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
// Skip this test for neutrino and bitcoind backends, as they currently
// don't support notifying about mempool spends.
switch notifier.(type) {
case *neutrinonotify.NeutrinoNotifier:
return
case *bitcoindnotify.BitcoindNotifier:
return
case *btcdnotify.BtcdNotifier:
// Go on to test this implementation.
default:
t.Fatalf("unknown notifier type: %T", notifier)
}
// We first create a new output to our test target address.
outpoint, pkScript := createSpendableOutput(miner, t)
_, currentHeight, err := miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current height: %v", err)
}
// Now that we have a output index and the pkScript, register for a
// spentness notification for the newly created output with multiple
// clients in order to ensure the implementation can support
// multi-client spend notifications.
// We first create a list of clients that will be notified on mempool
// spends.
const numClients = 5
spendClientsMempool := make([]*chainntnfs.SpendEvent, numClients)
for i := 0; i < numClients; i++ {
spentIntent, err := notifier.RegisterSpendNtfn(outpoint,
uint32(currentHeight), true)
if err != nil {
t.Fatalf("unable to register for spend ntfn: %v", err)
}
spendClientsMempool[i] = spentIntent
}
// Next, create a new transaction spending that output.
spendingTx := createSpendTx(outpoint, pkScript, t)
// Broadcast our spending transaction.
spenderSha, err := miner.Node.SendRawTransaction(spendingTx, true)
if err != nil {
t.Fatalf("unable to broadcast tx: %v", err)
}
err = waitForMempoolTx(miner, spenderSha)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
// Make sure the mempool spend clients are correctly notified.
for _, client := range spendClientsMempool {
select {
case ntfn, ok := <-client.Spend:
if !ok {
t.Fatalf("channel closed unexpectedly")
}
if *ntfn.SpentOutPoint != *outpoint {
t.Fatalf("ntfn includes wrong output, reports "+
"%v instead of %v",
ntfn.SpentOutPoint, outpoint)
}
if !bytes.Equal(ntfn.SpenderTxHash[:], spenderSha[:]) {
t.Fatalf("ntfn includes wrong spender tx sha, "+
"reports %v instead of %v",
ntfn.SpenderTxHash[:], spenderSha[:])
}
if ntfn.SpenderInputIndex != 0 {
t.Fatalf("ntfn includes wrong spending input "+
"index, reports %v, should be %v",
ntfn.SpenderInputIndex, 0)
}
if ntfn.SpendingHeight != currentHeight+1 {
t.Fatalf("ntfn has wrong spending height: "+
"expected %v, got %v", currentHeight,
ntfn.SpendingHeight)
}
case <-time.After(5 * time.Second):
t.Fatalf("did not receive notification")
}
}
// TODO(halseth): create new clients that should be registered after tx
// is in the mempool already, when btcd supports notifying on these.
// Now we mine a single block, which should include our spend. The
// notification should not be sent off again.
if _, err := miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate single block: %v", err)
}
// When a block is mined, the mempool notifications we registered should
// not be sent off again, and the channel should be closed.
for _, c := range spendClientsMempool {
select {
case _, ok := <-c.Spend:
if ok {
t.Fatalf("channel should have been closed")
}
case <-time.After(30 * time.Second):
t.Fatalf("expected clients to be closed.")
}
}
}
func testBlockEpochNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
@ -909,7 +1014,7 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
// happened. The notifier should dispatch a spend notification
// immediately.
spentIntent, err := notifier.RegisterSpendNtfn(outpoint,
uint32(currentHeight))
uint32(currentHeight), true)
if err != nil {
t.Fatalf("unable to register for spend ntfn: %v", err)
}
@ -962,7 +1067,7 @@ func testCancelSpendNtfn(node *rpctest.Harness,
spendClients := make([]*chainntnfs.SpendEvent, numClients)
for i := 0; i < numClients; i++ {
spentIntent, err := notifier.RegisterSpendNtfn(outpoint,
uint32(currentHeight))
uint32(currentHeight), true)
if err != nil {
t.Fatalf("unable to register for spend ntfn: %v", err)
}
@ -1259,6 +1364,10 @@ var ntfnTests = []testCase{
name: "spend ntfn",
test: testSpendNotification,
},
{
name: "spend ntfn mempool",
test: testSpendNotificationMempoolSpends,
},
{
name: "block epoch",
test: testBlockEpochNotification,

@ -566,7 +566,7 @@ type spendCancel struct {
// target outpoint has been detected, the details of the spending event will be
// sent across the 'Spend' channel.
func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
heightHint uint32, _ bool) (*chainntnfs.SpendEvent, error) {
n.heightMtx.RLock()
currentHeight := n.bestHeight

@ -477,7 +477,7 @@ func (c *ChainArbitrator) Stop() error {
// NOTE: This must be launched as a goroutine.
func (c *ChainArbitrator) watchForChannelClose(closeInfo *channeldb.ChannelCloseSummary) {
spendNtfn, err := c.cfg.Notifier.RegisterSpendNtfn(
&closeInfo.ChanPoint, closeInfo.CloseHeight,
&closeInfo.ChanPoint, closeInfo.CloseHeight, true,
)
if err != nil {
log.Errorf("unable to register for spend: %v", err)

@ -180,7 +180,7 @@ func (c *chainWatcher) Start() error {
}
spendNtfn, err := c.notifier.RegisterSpendNtfn(
fundingOut, heightHint,
fundingOut, heightHint, true,
)
if err != nil {
return err

@ -173,7 +173,7 @@ func (h *htlcTimeoutResolver) Resolve() (ContractResolver, error) {
// to confirm.
spendNtfn, err := h.Notifier.RegisterSpendNtfn(
&h.htlcResolution.ClaimOutpoint,
h.broadcastHeight,
h.broadcastHeight, true,
)
if err != nil {
return err
@ -608,7 +608,7 @@ func (h *htlcSuccessResolver) Resolve() (ContractResolver, error) {
// To wrap this up, we'll wait until the second-level transaction has
// been spent, then fully resolve the contract.
spendNtfn, err := h.Notifier.RegisterSpendNtfn(
&h.htlcResolution.ClaimOutpoint, h.broadcastHeight,
&h.htlcResolution.ClaimOutpoint, h.broadcastHeight, true,
)
if err != nil {
return nil, err
@ -820,7 +820,7 @@ func (h *htlcOutgoingContestResolver) Resolve() (ContractResolver, error) {
// the remote party sweeps with the pre-image, we'll be notified.
spendNtfn, err := h.Notifier.RegisterSpendNtfn(
&outPointToWatch,
h.broadcastHeight,
h.broadcastHeight, true,
)
if err != nil {
return nil, err
@ -1316,7 +1316,7 @@ func (c *commitSweepResolver) Resolve() (ContractResolver, error) {
// until the commitment output has been spent.
spendNtfn, err := c.Notifier.RegisterSpendNtfn(
&c.commitResolution.SelfOutPoint,
c.broadcastHeight,
c.broadcastHeight, true,
)
if err != nil {
return nil, err

@ -261,7 +261,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
return nil, nil
}
func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ uint32) (*chainntnfs.SpendEvent, error) {
func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ uint32,
_ bool) (*chainntnfs.SpendEvent, error) {
return nil, nil
}

@ -124,7 +124,7 @@ func (m *mockNotifier) Stop() error {
}
func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
heightHint uint32, _ bool) (*chainntnfs.SpendEvent, error) {
return &chainntnfs.SpendEvent{
Spend: make(chan *chainntnfs.SpendDetail),
Cancel: func() {},

@ -106,7 +106,7 @@ func (m *mockNotfier) Stop() error {
return nil
}
func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
heightHint uint32, _ bool) (*chainntnfs.SpendEvent, error) {
return &chainntnfs.SpendEvent{
Spend: make(chan *chainntnfs.SpendDetail),
Cancel: func() {},
@ -130,7 +130,7 @@ func makeMockSpendNotifier() *mockSpendNotifier {
}
func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
heightHint uint32, _ bool) (*chainntnfs.SpendEvent, error) {
spendChan := make(chan *chainntnfs.SpendDetail)
m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan)