chainntnfs: add the ability to cancel spend+epoch notifications

When iterating with the ChainNotifier, it currently isn’t possible to
cancel a non-dispatched yet active notificaiton intent. As a result,
this can be rather wasteful in many parts of lnd which my repeatedly
create a new spend notification depending on if/when a peer is
connected or not.

In order to fix this, we add a new `Cancel func()` field to both the
`BlockEpochEvent` and `SpendEvent` structs. This new closure attribute
allows the caller to cancel the yet-to-be-dispathed event, allowing the
ChainNotifier to free up resources.
This commit is contained in:
Olaoluwa Osuntokun 2017-02-20 16:29:05 -08:00
parent 19a7778a7d
commit a3319bb21a
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 202 additions and 32 deletions

@ -59,7 +59,7 @@ type ChainNotifier interface {
} }
// TxConfirmation carries some additional block-level details of the exact // TxConfirmation carries some additional block-level details of the exact
// block that specified transactions was confirmed wihtin. // block that specified transactions was confirmed within.
type TxConfirmation struct { type TxConfirmation struct {
// BlockHash is the hash of the block that confirmed the original // BlockHash is the hash of the block that confirmed the original
// transition. // transition.
@ -115,22 +115,46 @@ type SpendDetail struct {
// SpendEvent encapsulates a spentness notification. Its only field 'Spend' will // SpendEvent encapsulates a spentness notification. Its only field 'Spend' will
// be sent upon once the target output passed into RegisterSpendNtfn has been // be sent upon once the target output passed into RegisterSpendNtfn has been
// spent on the blockchain. // spent on the blockchain.
//
// NOTE: If the caller wishes to cancel their registered spend notification,
// the Cancel closure MUST be called.
type SpendEvent struct { type SpendEvent struct {
Spend chan *SpendDetail // MUST be buffered. // Spend is a receive only channel which will be sent upon once the
// target outpoint has been spent.
Spend <-chan *SpendDetail // MUST be buffered.
// Cancel is a closure that should be executed by the caller in the
// case that they wish to prematurely abandon their regsitered spend
// notification.
Cancel func()
} }
// BlockEpoch represents metadata concerning each new block connected to the // BlockEpoch represents metadata concerning each new block connected to the
// main chain. // main chain.
type BlockEpoch struct { type BlockEpoch struct {
Height int32 // Hash is the block hash of the latest blcok to be added to the tip of
// the main chain.
Hash *chainhash.Hash Hash *chainhash.Hash
// Height is the height of the latest block to be added to the tip of
// the main chain.
Height int32
} }
// BlockEpochEvent encapsulates an on-going stream of block epoch // BlockEpochEvent encapsulates an on-going stream of block epoch
// notifications. Its only field 'Epoochs' will be sent upon for each new block // notifications. Its only field 'Epochs' will be sent upon for each new block
// connected to the main-chain. // connected to the main-chain.
//
// NOTE: If the caller wishes to cancel their registered block epoch
// notification, the Cancel closure MUST be called.
type BlockEpochEvent struct { type BlockEpochEvent struct {
Epochs chan *BlockEpoch // MUST be buffered. // Epochs is a receive only channel that will be sent upon each time a
// new block is connected to the end of the main chain.
Epochs <-chan *BlockEpoch // MUST be buffered.
// Cancel is a closure that should be executed by the caller in the
// case that they wish to abandon their registered spend notification.
Cancel func()
} }
// NotifierDriver represents a "driver" for a particular interface. A driver is // NotifierDriver represents a "driver" for a particular interface. A driver is

@ -193,15 +193,9 @@ func testBatchConfirmationNotification(miner *rpctest.Harness,
} }
} }
func testSpendNotification(miner *rpctest.Harness, func createSpendableOutput(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { t *testing.T) (*wire.OutPoint, []byte) {
t.Logf("testing multi-client spend notification")
// We'd like to test the spend notifiations for all ChainNotifier
// concrete implemenations.
//
// To do so, we first create a new output to our test target address.
txid, err := getTestTxId(miner) txid, err := getTestTxId(miner)
if err != nil { if err != nil {
t.Fatalf("unable to create test addr: %v", err) t.Fatalf("unable to create test addr: %v", err)
@ -234,24 +228,12 @@ func testSpendNotification(miner *rpctest.Harness,
t.Fatalf("unable to locate new output") t.Fatalf("unable to locate new output")
} }
// Now that we've found the output index, register for a spentness return wire.NewOutPoint(txid, uint32(outIndex)), pkScript
// notification for the newly created output with multiple clients in
// order to ensure the implementation can support multi-client spend
// notifiations.
outpoint := wire.NewOutPoint(txid, uint32(outIndex))
const numClients = 5
spendClients := make([]*chainntnfs.SpendEvent, numClients)
for i := 0; i < numClients; i++ {
spentIntent, err := notifier.RegisterSpendNtfn(outpoint)
if err != nil {
t.Fatalf("unable to register for spend ntfn: %v", err)
} }
spendClients[i] = spentIntent func createSpendTx(outpoint *wire.OutPoint, pkScript []byte,
} t *testing.T) *wire.MsgTx {
// Next, create a new transaction spending that output.
spendingTx := wire.NewMsgTx(1) spendingTx := wire.NewMsgTx(1)
spendingTx.AddTxIn(&wire.TxIn{ spendingTx.AddTxIn(&wire.TxIn{
PreviousOutPoint: *outpoint, PreviousOutPoint: *outpoint,
@ -267,6 +249,38 @@ func testSpendNotification(miner *rpctest.Harness,
} }
spendingTx.TxIn[0].SignatureScript = sigScript spendingTx.TxIn[0].SignatureScript = sigScript
return spendingTx
}
func testSpendNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
t.Logf("testing multi-client spend notification")
// We'd like to test the spend notifications for all ChainNotifier
// concrete implementations.
//
// To do so, we first create a new output to our test target address.
outpoint, pkScript := createSpendableOutput(miner, t)
// Now that we have a output index and the pkScript, register for a
// spentness notification for the newly created output with multiple
// clients in order to ensure the implementation can support
// multi-client spend notifications.
const numClients = 5
spendClients := make([]*chainntnfs.SpendEvent, numClients)
for i := 0; i < numClients; i++ {
spentIntent, err := notifier.RegisterSpendNtfn(outpoint)
if err != nil {
t.Fatalf("unable to register for spend ntfn: %v", err)
}
spendClients[i] = spentIntent
}
// Next, create a new transaction spending that output.
spendingTx := createSpendTx(outpoint, pkScript, t)
// Broadcast our spending transaction. // Broadcast our spending transaction.
spenderSha, err := miner.Node.SendRawTransaction(spendingTx, true) spenderSha, err := miner.Node.SendRawTransaction(spendingTx, true)
if err != nil { if err != nil {
@ -281,7 +295,7 @@ func testSpendNotification(miner *rpctest.Harness,
// For each event we registered for above, we create a goroutine which // For each event we registered for above, we create a goroutine which
// will listen on the event channel, passing it proxying each // will listen on the event channel, passing it proxying each
// notification into a single which will be examined belwo. // notification into a single which will be examined below..
spentNtfn := make(chan *chainntnfs.SpendDetail, numClients) spentNtfn := make(chan *chainntnfs.SpendDetail, numClients)
for i := 0; i < numClients; i++ { for i := 0; i < numClients; i++ {
go func(c *chainntnfs.SpendEvent) { go func(c *chainntnfs.SpendEvent) {
@ -615,6 +629,136 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
} }
} }
func testCancelSpendNtfn(node *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
// We'd like to test that once a spend notification is registered, it
// can be cancelled before the notification is dispatched.
// First, we'll start by creating a new output that we can spend
// ourselves.
outpoint, pkScript := createSpendableOutput(node, t)
// Create two clients that each registered to the spend notification.
// We'll cancel the notification for the first client and leave the
// notification for the second client enabled.
const numClients = 2
spendClients := make([]*chainntnfs.SpendEvent, numClients)
for i := 0; i < numClients; i++ {
spentIntent, err := notifier.RegisterSpendNtfn(outpoint)
if err != nil {
t.Fatalf("unable to register for spend ntfn: %v", err)
}
spendClients[i] = spentIntent
}
// Next, create a new transaction spending that output.
spendingTx := createSpendTx(outpoint, pkScript, t)
// Before we broadcast the spending transaction, we'll cancel the
// notification of the first client.
spendClients[1].Cancel()
// Broadcast our spending transaction.
spenderSha, err := node.Node.SendRawTransaction(spendingTx, true)
if err != nil {
t.Fatalf("unable to brodacst tx: %v", err)
}
// Now we mine a single block, which should include our spend. The
// notification should also be sent off.
if _, err := node.Node.Generate(1); err != nil {
t.Fatalf("unable to generate single block: %v", err)
}
// However, the spend notification for the first client should have
// been dispatched.
select {
case ntfn := <-spendClients[0].Spend:
// We've received the spend nftn. So now verify all the
// fields have been set properly.
if ntfn.SpentOutPoint != outpoint {
t.Fatalf("ntfn includes wrong output, reports "+
"%v instead of %v",
ntfn.SpentOutPoint, outpoint)
}
if !bytes.Equal(ntfn.SpenderTxHash[:], spenderSha[:]) {
t.Fatalf("ntfn includes wrong spender tx sha, "+
"reports %v intead of %v",
ntfn.SpenderTxHash[:], spenderSha[:])
}
if ntfn.SpenderInputIndex != 0 {
t.Fatalf("ntfn includes wrong spending input "+
"index, reports %v, should be %v",
ntfn.SpenderInputIndex, 0)
}
case <-time.After(2 * time.Second):
t.Fatalf("spend ntfn never received")
}
// However, The spend notification of the second client should NOT have
// been dispatched.
select {
case _, ok := <-spendClients[1].Spend:
if ok {
t.Fatalf("spend ntfn should have been cancelled")
}
case <-time.After(2 * time.Second):
t.Fatalf("spend ntfn never cancelled")
}
}
func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifier,
t *testing.T) {
// We'd like to ensure that once a client cancels their block epoch
// notifications, no further notifications are sent over the channel
// if/when new blocks come in.
const numClients = 2
epochClients := make([]*chainntnfs.BlockEpochEvent, numClients)
for i := 0; i < numClients; i++ {
epochClient, err := notifier.RegisterBlockEpochNtfn()
if err != nil {
t.Fatalf("unable to register for epoch notification")
}
epochClients[i] = epochClient
}
// Now before we mine any blocks, cancel the notification for the first
// epoch client.
epochClients[0].Cancel()
// Now mine a single block, this should trigger the logic to dispatch
// epoch notifications.
if _, err := node.Node.Generate(1); err != nil {
t.Fatalf("unable to generate blocks: %v", err)
}
// The epoch notification for the first client shouldn't have been
// dispatched.
select {
case _, ok := <-epochClients[0].Epochs:
if ok {
t.Fatalf("epoch notification should've been cancelled")
}
case <-time.After(2 * time.Second):
t.Fatalf("epoch notification not sent")
}
// However, the epoch notification for the second client should have
// been dispatched as normal.
select {
case _, ok := <-epochClients[1].Epochs:
if !ok {
t.Fatalf("epoch was cancelled")
}
case <-time.After(2 * time.Second):
t.Fatalf("epoch notification not sent")
}
}
var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){ var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){
testSingleConfirmationNotification, testSingleConfirmationNotification,
testMultiConfirmationNotification, testMultiConfirmationNotification,
@ -624,17 +768,19 @@ var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier,
testBlockEpochNotification, testBlockEpochNotification,
testTxConfirmedBeforeNtfnRegistration, testTxConfirmedBeforeNtfnRegistration,
testSpendBeforeNtfnRegistration, testSpendBeforeNtfnRegistration,
testCancelSpendNtfn,
testCancelEpochNtfn,
} }
// TestInterfaces tests all registered interfaces with a unified set of tests // TestInterfaces tests all registered interfaces with a unified set of tests
// which excersie each of the required methods found within the ChainNotifier // which exercise each of the required methods found within the ChainNotifier
// interface. // interface.
// //
// NOTE: In the future, when additional implementations of the ChainNotifier // NOTE: In the future, when additional implementations of the ChainNotifier
// interface have been implemented, in order to ensure the new concrete // interface have been implemented, in order to ensure the new concrete
// implementation is automatically tested, two steps must be undertaken. First, // implementation is automatically tested, two steps must be undertaken. First,
// one needs add a "non-captured" (_) import from the new sub-package. This // one needs add a "non-captured" (_) import from the new sub-package. This
// import should trigger an init() method within the package which registeres // import should trigger an init() method within the package which registers
// the interface. Second, an additional case in the switch within the main loop // the interface. Second, an additional case in the switch within the main loop
// below needs to be added which properly initializes the interface. // below needs to be added which properly initializes the interface.
func TestInterfaces(t *testing.T) { func TestInterfaces(t *testing.T) {