chainntnfs: add support to cancel pending registered confirmation

This commit is contained in:
Wilmer Paulino 2018-12-10 18:27:25 -08:00
parent be2c321c8c
commit 0c579b110c
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
6 changed files with 170 additions and 40 deletions

@ -885,16 +885,19 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
// Construct a notification request for the transaction and send it to
// the main event loop.
confID := atomic.AddUint64(&b.confClientCounter, 1)
confRequest, err := chainntnfs.NewConfRequest(txid, pkScript)
if err != nil {
return nil, err
}
ntfn := &chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&b.confClientCounter, 1),
ConfID: confID,
ConfRequest: confRequest,
NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs),
HeightHint: heightHint,
Event: chainntnfs.NewConfirmationEvent(numConfs, func() {
b.txNotifier.CancelConf(confRequest, confID)
}),
HeightHint: heightHint,
}
chainntnfs.Log.Infof("New confirmation subscription: %v, num_confs=%v",

@ -925,16 +925,19 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
// Construct a notification request for the transaction and send it to
// the main event loop.
confID := atomic.AddUint64(&b.confClientCounter, 1)
confRequest, err := chainntnfs.NewConfRequest(txid, pkScript)
if err != nil {
return nil, err
}
ntfn := &chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&b.confClientCounter, 1),
ConfID: confID,
ConfRequest: confRequest,
NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs),
HeightHint: heightHint,
Event: chainntnfs.NewConfirmationEvent(numConfs, func() {
b.txNotifier.CancelConf(confRequest, confID)
}),
HeightHint: heightHint,
}
chainntnfs.Log.Infof("New confirmation subscription: %v, num_confs=%v ",

@ -72,8 +72,6 @@ func (t TxConfStatus) String() string {
//
// Concrete implementations of ChainNotifier should be able to support multiple
// concurrent client requests, as well as multiple concurrent notification events.
// TODO(roasbeef): all events should have a Cancel() method to free up the
// resource
type ChainNotifier interface {
// RegisterConfirmationsNtfn registers an intent to be notified once
// txid reaches numConfs confirmations. We also pass in the pkScript as
@ -175,6 +173,9 @@ type TxConfirmation struct {
// If the event that the original transaction becomes re-org'd out of the main
// chain, the 'NegativeConf' will be sent upon with a value representing the
// depth of the re-org.
//
// NOTE: If the caller wishes to cancel their registered spend notification,
// the Cancel closure MUST be called.
type ConfirmationEvent struct {
// Confirmed is a channel that will be sent upon once the transaction
// has been fully confirmed. The struct sent will contain all the
@ -191,26 +192,27 @@ type ConfirmationEvent struct {
// confirmations.
Updates chan uint32
// TODO(roasbeef): all goroutines on ln channel updates should also
// have a struct chan that's closed if funding gets re-org out. Need
// to sync, to request another confirmation event ntfn, then re-open
// channel after confs.
// NegativeConf is a channel that will be sent upon if the transaction
// confirms, but is later reorged out of the chain. The integer sent
// through the channel represents the reorg depth.
//
// NOTE: This channel must be buffered.
NegativeConf chan int32
// Cancel is a closure that should be executed by the caller in the case
// that they wish to prematurely abandon their registered confirmation
// notification.
Cancel func()
}
// NewConfirmationEvent constructs a new ConfirmationEvent with newly opened
// channels.
func NewConfirmationEvent(numConfs uint32) *ConfirmationEvent {
func NewConfirmationEvent(numConfs uint32, cancel func()) *ConfirmationEvent {
return &ConfirmationEvent{
Confirmed: make(chan *TxConfirmation, 1),
Updates: make(chan uint32, numConfs),
NegativeConf: make(chan int32, 1),
Cancel: cancel,
}
}
@ -247,8 +249,8 @@ type SpendEvent struct {
// NOTE: This channel must be buffered.
Reorg chan struct{}
// Cancel is a closure that should be executed by the caller in the
// case that they wish to prematurely abandon their registered spend
// Cancel is a closure that should be executed by the caller in the case
// that they wish to prematurely abandon their registered spend
// notification.
Cancel func()
}
@ -287,8 +289,8 @@ type BlockEpochEvent struct {
// NOTE: This channel must be buffered.
Epochs <-chan *BlockEpoch
// Cancel is a closure that should be executed by the caller in the
// case that they wish to abandon their registered spend notification.
// Cancel is a closure that should be executed by the caller in the case
// that they wish to abandon their registered block epochs notification.
Cancel func()
}

@ -795,16 +795,19 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
// Construct a notification request for the transaction and send it to
// the main event loop.
confID := atomic.AddUint64(&n.confClientCounter, 1)
confRequest, err := chainntnfs.NewConfRequest(txid, pkScript)
if err != nil {
return nil, err
}
ntfn := &chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&n.confClientCounter, 1),
ConfID: confID,
ConfRequest: confRequest,
NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs),
HeightHint: heightHint,
Event: chainntnfs.NewConfirmationEvent(numConfs, func() {
n.txNotifier.CancelConf(confRequest, confID)
}),
HeightHint: heightHint,
}
chainntnfs.Log.Infof("New confirmation subscription: %v, num_confs=%v",

@ -602,6 +602,38 @@ func (n *TxNotifier) RegisterConf(ntfn *ConfNtfn) (*HistoricalConfDispatch,
return dispatch, n.currentHeight, nil
}
// CancelConf cancels an existing request for a spend notification of an
// outpoint/output script. The request is identified by its spend ID.
func (n *TxNotifier) CancelConf(confRequest ConfRequest, confID uint64) {
select {
case <-n.quit:
return
default:
}
n.Lock()
defer n.Unlock()
confSet, ok := n.confNotifications[confRequest]
if !ok {
return
}
ntfn, ok := confSet.ntfns[confID]
if !ok {
return
}
Log.Infof("Canceling confirmation notification: conf_id=%d, %v", confID,
confRequest)
// We'll close all the notification channels to let the client know
// their cancel request has been fulfilled.
close(ntfn.Event.Confirmed)
close(ntfn.Event.Updates)
close(ntfn.Event.NegativeConf)
delete(confSet.ntfns, confID)
}
// UpdateConfDetails attempts to update the confirmation details for an active
// notification within the notifier. This should only be used in the case of a
// transaction/output script that has confirmed before the notifier's current
@ -904,9 +936,6 @@ func (n *TxNotifier) CancelSpend(spendRequest SpendRequest, spendID uint64) {
n.Lock()
defer n.Unlock()
Log.Infof("Canceling spend notification: spend_id=%d, %v", spendID,
spendRequest)
spendSet, ok := n.spendNotifications[spendRequest]
if !ok {
return
@ -916,6 +945,9 @@ func (n *TxNotifier) CancelSpend(spendRequest SpendRequest, spendID uint64) {
return
}
Log.Infof("Canceling spend notification: spend_id=%d, %v", spendID,
spendRequest)
// We'll close all the notification channels to let the client know
// their cancel request has been fulfilled.
close(ntfn.Event.Spend)

@ -143,7 +143,7 @@ func TestTxNotifierMaxConfs(t *testing.T) {
},
NumConfirmations: chainntnfs.MaxNumConfs + 1,
Event: chainntnfs.NewConfirmationEvent(
chainntnfs.MaxNumConfs,
chainntnfs.MaxNumConfs, nil,
),
}
if _, _, err := n.RegisterConf(ntfn); err != chainntnfs.ErrTxMaxConfs {
@ -182,7 +182,7 @@ func TestTxNotifierFutureConfDispatch(t *testing.T) {
PkScript: testScript,
},
NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs, nil),
}
if _, _, err := n.RegisterConf(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
@ -196,7 +196,7 @@ func TestTxNotifierFutureConfDispatch(t *testing.T) {
PkScript: testScript,
},
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs, nil),
}
if _, _, err := n.RegisterConf(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
@ -365,7 +365,7 @@ func TestTxNotifierHistoricalConfDispatch(t *testing.T) {
ConfID: 0,
ConfRequest: chainntnfs.ConfRequest{TxID: tx1Hash},
NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs, nil),
}
if _, _, err := n.RegisterConf(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
@ -376,7 +376,7 @@ func TestTxNotifierHistoricalConfDispatch(t *testing.T) {
ConfID: 1,
ConfRequest: chainntnfs.ConfRequest{TxID: tx2Hash},
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs, nil),
}
if _, _, err := n.RegisterConf(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
@ -697,7 +697,7 @@ func TestTxNotifierMultipleHistoricalConfRescans(t *testing.T) {
ConfID: 0,
// TODO(wilmer): set pkScript.
ConfRequest: chainntnfs.ConfRequest{TxID: chainntnfs.ZeroHash},
Event: chainntnfs.NewConfirmationEvent(1),
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
historicalConfDispatch1, _, err := n.RegisterConf(confNtfn1)
if err != nil {
@ -714,7 +714,7 @@ func TestTxNotifierMultipleHistoricalConfRescans(t *testing.T) {
ConfID: 1,
// TODO(wilmer): set pkScript.
ConfRequest: chainntnfs.ConfRequest{TxID: chainntnfs.ZeroHash},
Event: chainntnfs.NewConfirmationEvent(1),
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
historicalConfDispatch2, _, err := n.RegisterConf(confNtfn2)
if err != nil {
@ -739,7 +739,7 @@ func TestTxNotifierMultipleHistoricalConfRescans(t *testing.T) {
confNtfn3 := &chainntnfs.ConfNtfn{
ConfID: 2,
ConfRequest: chainntnfs.ConfRequest{TxID: chainntnfs.ZeroHash},
Event: chainntnfs.NewConfirmationEvent(1),
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
historicalConfDispatch3, _, err := n.RegisterConf(confNtfn3)
if err != nil {
@ -860,7 +860,7 @@ func TestTxNotifierMultipleHistoricalNtfns(t *testing.T) {
confNtfns[i] = &chainntnfs.ConfNtfn{
ConfID: i,
ConfRequest: confRequest,
Event: chainntnfs.NewConfirmationEvent(1),
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
if _, _, err := n.RegisterConf(confNtfns[i]); err != nil {
t.Fatalf("unable to register conf ntfn #%d: %v", i, err)
@ -882,6 +882,7 @@ func TestTxNotifierMultipleHistoricalNtfns(t *testing.T) {
// it can stop watching at tip.
expectedConfDetails := &chainntnfs.TxConfirmation{
BlockHeight: startingHeight - 1,
Tx: wire.NewMsgTx(1),
}
err := n.UpdateConfDetails(confNtfns[0].ConfRequest, expectedConfDetails)
if err != nil {
@ -907,7 +908,7 @@ func TestTxNotifierMultipleHistoricalNtfns(t *testing.T) {
extraConfNtfn := &chainntnfs.ConfNtfn{
ConfID: numNtfns + 1,
ConfRequest: confRequest,
Event: chainntnfs.NewConfirmationEvent(1),
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
historicalConfRescan, _, err := n.RegisterConf(extraConfNtfn)
if err != nil {
@ -1002,6 +1003,92 @@ func TestTxNotifierMultipleHistoricalNtfns(t *testing.T) {
}
}
// TestTxNotifierCancelConf ensures that a confirmation notification after a
// client has canceled their intent to receive one.
func TestTxNotifierCancelConf(t *testing.T) {
t.Parallel()
const startingHeight = 10
hintCache := newMockHintCache()
n := chainntnfs.NewTxNotifier(startingHeight, 100, hintCache, hintCache)
// We'll register two notification requests. Only the second one will be
// canceled.
tx1 := wire.NewMsgTx(1)
tx1.AddTxOut(&wire.TxOut{PkScript: testRawScript})
ntfn1 := &chainntnfs.ConfNtfn{
ConfID: 1,
ConfRequest: chainntnfs.ConfRequest{
TxID: tx1.TxHash(),
PkScript: testScript,
},
NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
if _, _, err := n.RegisterConf(ntfn1); err != nil {
t.Fatalf("unable to register spend ntfn: %v", err)
}
tx2 := wire.NewMsgTx(2)
tx2.AddTxOut(&wire.TxOut{PkScript: testRawScript})
ntfn2 := &chainntnfs.ConfNtfn{
ConfID: 2,
ConfRequest: chainntnfs.ConfRequest{
TxID: tx2.TxHash(),
PkScript: testScript,
},
NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
if _, _, err := n.RegisterConf(ntfn2); err != nil {
t.Fatalf("unable to register spend ntfn: %v", err)
}
// Construct a block that will confirm both transactions.
block := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{tx1, tx2},
})
tx1ConfDetails := &chainntnfs.TxConfirmation{
BlockHeight: startingHeight + 1,
BlockHash: block.Hash(),
TxIndex: 0,
Tx: tx1,
}
// Before extending the notifier's tip with the block above, we'll
// cancel the second request.
n.CancelConf(ntfn2.ConfRequest, ntfn2.ConfID)
err := n.ConnectTip(block.Hash(), startingHeight+1, block.Transactions())
if err != nil {
t.Fatalf("unable to connect block: %v", err)
}
if err := n.NotifyHeight(startingHeight + 1); err != nil {
t.Fatalf("unable to dispatch notifications: %v", err)
}
// The first request should still be active, so we should receive a
// confirmation notification with the correct details.
select {
case confDetails := <-ntfn1.Event.Confirmed:
assertConfDetails(t, confDetails, tx1ConfDetails)
default:
t.Fatalf("expected to receive confirmation notification")
}
// The second one, however, should not have. The event's Confrimed
// channel must have also been closed to indicate the caller that the
// TxNotifier can no longer fulfill their canceled request.
select {
case _, ok := <-ntfn2.Event.Confirmed:
if ok {
t.Fatal("expected Confirmed channel to be closed")
}
default:
t.Fatal("expected Confirmed channel to be closed")
}
}
// TestTxNotifierCancelSpend ensures that a spend notification after a client
// has canceled their intent to receive one.
func TestTxNotifierCancelSpend(t *testing.T) {
@ -1120,7 +1207,7 @@ func TestTxNotifierConfReorg(t *testing.T) {
PkScript: testScript,
},
NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs, nil),
}
if _, _, err := n.RegisterConf(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
@ -1140,7 +1227,7 @@ func TestTxNotifierConfReorg(t *testing.T) {
PkScript: testScript,
},
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs, nil),
}
if _, _, err := n.RegisterConf(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
@ -1160,7 +1247,7 @@ func TestTxNotifierConfReorg(t *testing.T) {
PkScript: testScript,
},
NumConfirmations: tx3NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx3NumConfs),
Event: chainntnfs.NewConfirmationEvent(tx3NumConfs, nil),
}
if _, _, err := n.RegisterConf(&ntfn3); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
@ -1646,7 +1733,7 @@ func TestTxNotifierConfirmHintCache(t *testing.T) {
PkScript: testScript,
},
NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(1),
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
tx2 := wire.MsgTx{Version: 2}
@ -1658,7 +1745,7 @@ func TestTxNotifierConfirmHintCache(t *testing.T) {
PkScript: testScript,
},
NumConfirmations: 2,
Event: chainntnfs.NewConfirmationEvent(2),
Event: chainntnfs.NewConfirmationEvent(2, nil),
}
if _, _, err := n.RegisterConf(ntfn1); err != nil {
@ -2039,7 +2126,7 @@ func TestTxNotifierTearDown(t *testing.T) {
ConfID: 1,
ConfRequest: chainntnfs.ConfRequest{TxID: chainntnfs.ZeroHash},
NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(1),
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
if _, _, err := n.RegisterConf(confNtfn); err != nil {
t.Fatalf("unable to register conf ntfn: %v", err)