From 716c20b18da4b553e6bc9f4c1fa517192bb23dca Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 1 Oct 2018 14:19:23 -0700 Subject: [PATCH 1/9] Revert "chainntnfs/height_hint_cache: add disable flag to hint cache" This reverts commit 7df9ae026665de75175853e486143ab80122ce9d. --- chainntnfs/height_hint_cache.go | 34 +++------------------------------ 1 file changed, 3 insertions(+), 31 deletions(-) diff --git a/chainntnfs/height_hint_cache.go b/chainntnfs/height_hint_cache.go index cd499a6e..b05f09df 100644 --- a/chainntnfs/height_hint_cache.go +++ b/chainntnfs/height_hint_cache.go @@ -85,8 +85,7 @@ type ConfirmHintCache interface { // ConfirmHintCache interfaces backed by a channeldb DB instance where the hints // will be stored. type HeightHintCache struct { - db *channeldb.DB - disabled bool + db *channeldb.DB } // Compile-time checks to ensure HeightHintCache satisfies the SpendHintCache @@ -95,11 +94,8 @@ var _ SpendHintCache = (*HeightHintCache)(nil) var _ ConfirmHintCache = (*HeightHintCache)(nil) // NewHeightHintCache returns a new height hint cache backed by a database. -func NewHeightHintCache(db *channeldb.DB, disable bool) (*HeightHintCache, error) { - cache := &HeightHintCache{ - db: db, - disabled: disable, - } +func NewHeightHintCache(db *channeldb.DB) (*HeightHintCache, error) { + cache := &HeightHintCache{db} if err := cache.initBuckets(); err != nil { return nil, err } @@ -123,10 +119,6 @@ func (c *HeightHintCache) initBuckets() error { // CommitSpendHint commits a spend hint for the outpoints to the cache. func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) error { - if c.disabled { - return nil - } - if len(ops) == 0 { return nil } @@ -165,10 +157,6 @@ func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) e // ErrSpendHintNotFound is returned if a spend hint does not exist within the // cache for the outpoint. func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { - if c.disabled { - return 0, ErrSpendHintNotFound - } - var hint uint32 err := c.db.View(func(tx *bolt.Tx) error { spendHints := tx.Bucket(spendHintBucket) @@ -197,10 +185,6 @@ func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { // PurgeSpendHint removes the spend hint for the outpoints from the cache. func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { - if c.disabled { - return nil - } - if len(ops) == 0 { return nil } @@ -232,10 +216,6 @@ func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { // CommitConfirmHint commits a confirm hint for the transactions to the cache. func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Hash) error { - if c.disabled { - return nil - } - if len(txids) == 0 { return nil } @@ -274,10 +254,6 @@ func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Ha // ErrConfirmHintNotFound is returned if a confirm hint does not exist within // the cache for the transaction hash. func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) { - if c.disabled { - return 0, ErrConfirmHintNotFound - } - var hint uint32 err := c.db.View(func(tx *bolt.Tx) error { confirmHints := tx.Bucket(confirmHintBucket) @@ -307,10 +283,6 @@ func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) // PurgeConfirmHint removes the confirm hint for the transactions from the // cache. func (c *HeightHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error { - if c.disabled { - return nil - } - if len(txids) == 0 { return nil } From 88ac985c95cd5373db66e4b876a66f3a989c313e Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 1 Oct 2018 14:19:38 -0700 Subject: [PATCH 2/9] Revert "chainntnfs/height_hint_cache_test: add tests for disabled cache" This reverts commit 45a2c9aca8f10f341848845e1f51261bd09fcb68. --- chainntnfs/height_hint_cache_test.go | 81 ++-------------------------- 1 file changed, 4 insertions(+), 77 deletions(-) diff --git a/chainntnfs/height_hint_cache_test.go b/chainntnfs/height_hint_cache_test.go index 2f09bce7..f444b18d 100644 --- a/chainntnfs/height_hint_cache_test.go +++ b/chainntnfs/height_hint_cache_test.go @@ -10,7 +10,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" ) -func initHintCache(t *testing.T, disable bool) *HeightHintCache { +func initHintCache(t *testing.T) *HeightHintCache { t.Helper() tempDir, err := ioutil.TempDir("", "kek") @@ -21,7 +21,7 @@ func initHintCache(t *testing.T, disable bool) *HeightHintCache { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := NewHeightHintCache(db, disable) + hintCache, err := NewHeightHintCache(db) if err != nil { t.Fatalf("unable to create hint cache: %v", err) } @@ -34,7 +34,7 @@ func initHintCache(t *testing.T, disable bool) *HeightHintCache { func TestHeightHintCacheConfirms(t *testing.T) { t.Parallel() - hintCache := initHintCache(t, false) + hintCache := initHintCache(t) // Querying for a transaction hash not found within the cache should // return an error indication so. @@ -93,7 +93,7 @@ func TestHeightHintCacheConfirms(t *testing.T) { func TestHeightHintCacheSpends(t *testing.T) { t.Parallel() - hintCache := initHintCache(t, false) + hintCache := initHintCache(t) // Querying for an outpoint not found within the cache should return an // error indication so. @@ -146,76 +146,3 @@ func TestHeightHintCacheSpends(t *testing.T) { } } } - -// TestHeightHintCacheDisabled asserts that a disabled height hint cache never -// returns spend or confirm hints that are committed. -func TestHeightHintCacheDisabled(t *testing.T) { - t.Parallel() - - const height uint32 = 100 - - // Create a disabled height hint cache. - hintCache := initHintCache(t, true) - - // Querying a disabled cache w/ no spend hint should return not found. - var outpoint wire.OutPoint - _, err := hintCache.QuerySpendHint(outpoint) - if err != ErrSpendHintNotFound { - t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) - } - - // Commit a spend hint to the disabled cache, which should be a noop. - if err := hintCache.CommitSpendHint(height, outpoint); err != nil { - t.Fatalf("unable to commit spend hint: %v", err) - } - - // Querying a disabled cache after commit noop should return not found. - _, err = hintCache.QuerySpendHint(outpoint) - if err != ErrSpendHintNotFound { - t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) - } - - // Reenable the cache, this time actually committing a spend hint. - hintCache.disabled = false - if err := hintCache.CommitSpendHint(height, outpoint); err != nil { - t.Fatalf("unable to commit spend hint: %v", err) - } - - // Disable the cache again, spend hint should not be found. - hintCache.disabled = true - _, err = hintCache.QuerySpendHint(outpoint) - if err != ErrSpendHintNotFound { - t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) - } - - // Querying a disabled cache w/ no conf hint should return not found. - var txid chainhash.Hash - _, err = hintCache.QueryConfirmHint(txid) - if err != ErrConfirmHintNotFound { - t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err) - } - - // Commit a conf hint to the disabled cache, which should be a noop. - if err := hintCache.CommitConfirmHint(height, txid); err != nil { - t.Fatalf("unable to commit spend hint: %v", err) - } - - // Querying a disabled cache after commit noop should return not found. - _, err = hintCache.QueryConfirmHint(txid) - if err != ErrConfirmHintNotFound { - t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err) - } - - // Reenable the cache, this time actually committing a conf hint. - hintCache.disabled = false - if err := hintCache.CommitConfirmHint(height, txid); err != nil { - t.Fatalf("unable to commit spend hint: %v", err) - } - - // Disable the cache again, conf hint should not be found. - hintCache.disabled = true - _, err = hintCache.QueryConfirmHint(txid) - if err != ErrConfirmHintNotFound { - t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err) - } -} From 35bfb7909952dc8485009173368440bb6429b47a Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 1 Oct 2018 14:19:48 -0700 Subject: [PATCH 3/9] Revert "chainntnfs/interface_test: run tests w/ disabled cache" This reverts commit 12761a4f435a033e8425dbcb3b407bb094fcae9f. --- chainntnfs/interface_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index b4bfa30e..67c9e9db 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -1758,7 +1758,7 @@ func TestInterfaces(t *testing.T) { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := chainntnfs.NewHeightHintCache(db, true) + hintCache, err := chainntnfs.NewHeightHintCache(db) if err != nil { t.Fatalf("unable to create height hint cache: %v", err) } From c740b8b85f87bf49ee099ec5d8c5d283fc228d68 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 1 Oct 2018 14:19:56 -0700 Subject: [PATCH 4/9] Revert "lnwallet/interface_test: run tests with disabled hint cache" This reverts commit 70ba3119b77151600d2b00c607ba89dfa6408254. --- lnwallet/interface_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lnwallet/interface_test.go b/lnwallet/interface_test.go index 921dc0a8..eeca8962 100644 --- a/lnwallet/interface_test.go +++ b/lnwallet/interface_test.go @@ -2153,7 +2153,7 @@ func TestLightningWallet(t *testing.T) { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := chainntnfs.NewHeightHintCache(db, true) + hintCache, err := chainntnfs.NewHeightHintCache(db) if err != nil { t.Fatalf("unable to create height hint cache: %v", err) } From 7d94c65afc771147d637d5d9d81ba420cca800d9 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 1 Oct 2018 14:20:04 -0700 Subject: [PATCH 5/9] Revert "chainregistry: disable height hint cache" This reverts commit 0e29a457e1f38b2841019dc6664de1697aa84b34. --- chainregistry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chainregistry.go b/chainregistry.go index c4d915d6..c8019458 100644 --- a/chainregistry.go +++ b/chainregistry.go @@ -181,8 +181,8 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, cleanUp func() ) - // Initialize disabled height hint cache within the chain directory. - hintCache, err := chainntnfs.NewHeightHintCache(chanDB, true) + // Initialize the height hint cache within the chain directory. + hintCache, err := chainntnfs.NewHeightHintCache(chanDB) if err != nil { return nil, nil, fmt.Errorf("unable to initialize height hint "+ "cache: %v", err) From 5127f2aa82e60e8048012a1418dd4f5b4e5a5a5e Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 1 Oct 2018 14:20:13 -0700 Subject: [PATCH 6/9] Revert "chainntnfs/bitcoindnotify: disable height hints in testing" This reverts commit ab28db5b0dc780a09b7474e187cefff809ac2714. --- chainntnfs/bitcoindnotify/bitcoind_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind_test.go b/chainntnfs/bitcoindnotify/bitcoind_test.go index 8f731f38..63357ae6 100644 --- a/chainntnfs/bitcoindnotify/bitcoind_test.go +++ b/chainntnfs/bitcoindnotify/bitcoind_test.go @@ -25,7 +25,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := chainntnfs.NewHeightHintCache(db, true) + hintCache, err := chainntnfs.NewHeightHintCache(db) if err != nil { t.Fatalf("unable to create hint cache: %v", err) } From 9a025867d03655fb7fe810ba901191a05454d2a4 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 1 Oct 2018 14:20:27 -0700 Subject: [PATCH 7/9] Revert "chainntnfs/btcdnotify: disable height hint cache in testing" This reverts commit 98e7c968d4bf5900a5d7be7f557eab8f623633c0. --- chainntnfs/btcdnotify/btcd_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chainntnfs/btcdnotify/btcd_test.go b/chainntnfs/btcdnotify/btcd_test.go index 6ef564d0..18bd1bb3 100644 --- a/chainntnfs/btcdnotify/btcd_test.go +++ b/chainntnfs/btcdnotify/btcd_test.go @@ -23,7 +23,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := chainntnfs.NewHeightHintCache(db, true) + hintCache, err := chainntnfs.NewHeightHintCache(db) if err != nil { t.Fatalf("unable to create hint cache: %v", err) } From 770e00594328159d4243ee56c45297d074c390e1 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 11 Oct 2018 17:29:58 -0700 Subject: [PATCH 8/9] chainntnfs/txnotifer: prevent dispatching notifications within ConnectTip In this commit, we modify the TxNotifier's ConnectTip method to no longer dispatch notifications to any clients who had a request fulfilled within the height connected. Instead, it will queue the notifications for dispatch and we add a new method NotifyHeight, which will actually dispatch them. We do this to allow the users of the TxNotifier to be more flexible when dispatching notifications. --- chainntnfs/txnotifier.go | 44 ++++++++----- chainntnfs/txnotifier_test.go | 116 +++++++++++++++++++++++++--------- 2 files changed, 114 insertions(+), 46 deletions(-) diff --git a/chainntnfs/txnotifier.go b/chainntnfs/txnotifier.go index e2a682ea..e3f3c4e1 100644 --- a/chainntnfs/txnotifier.go +++ b/chainntnfs/txnotifier.go @@ -887,9 +887,13 @@ func (n *TxNotifier) dispatchSpendDetails(ntfn *SpendNtfn, details *SpendDetail) // confirmation registration for. // // In the event that the transaction is relevant, a confirmation/spend -// notification will be dispatched to the relevant clients. Confirmation -// notifications will only be dispatched for transactions that have met the -// required number of confirmations required by the client. +// notification will be queued for dispatch to the relevant clients. +// Confirmation notifications will only be dispatched for transactions that have +// met the required number of confirmations required by the client. +// +// NOTE: In order to actually dispatch the relevant transaction notifications to +// clients, NotifyHeight must be called with the same block height in order to +// maintain correctness. func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, txns []*btcutil.Tx) error { @@ -1017,14 +1021,24 @@ func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, } } - // Now that we've determined which transactions were confirmed and which - // outpoints were spent within the new block, we can update their - // entries in their respective caches, along with all of our unconfirmed - // transactions and unspent outpoints. + // Finally, now that we've determined which transactions were confirmed + // and which outpoints were spent within the new block, we can update + // their entries in their respective caches, along with all of our + // unconfirmed transactions and unspent outpoints. n.updateHints(blockHeight) - // Next, we'll dispatch an update to all of the notification clients for - // our watched transactions with the number of confirmations left at + return nil +} + +// NotifyHeight dispatches confirmation and spend notifications to the clients +// who registered for a notification which has been fulfilled at the passed +// height. +func (n *TxNotifier) NotifyHeight(height uint32) error { + n.Lock() + defer n.Unlock() + + // First, we'll dispatch an update to all of the notification clients + // for our watched transactions with the number of confirmations left at // this new height. for _, txHashes := range n.txsByInitialHeight { for txHash := range txHashes { @@ -1032,7 +1046,7 @@ func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, for _, ntfn := range confSet.ntfns { txConfHeight := confSet.details.BlockHeight + ntfn.NumConfirmations - 1 - numConfsLeft := txConfHeight - blockHeight + numConfsLeft := txConfHeight - height // Since we don't clear notifications until // transactions are no longer under the risk of @@ -1054,7 +1068,7 @@ func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, // Then, we'll dispatch notifications for all the transactions that have // become confirmed at this new block height. - for ntfn := range n.ntfnsByConfirmHeight[blockHeight] { + for ntfn := range n.ntfnsByConfirmHeight[height] { confSet := n.confNotifications[*ntfn.TxID] Log.Infof("Dispatching %v conf notification for %v", @@ -1067,11 +1081,11 @@ func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, return ErrTxNotifierExiting } } - delete(n.ntfnsByConfirmHeight, blockHeight) + delete(n.ntfnsByConfirmHeight, height) // We'll also dispatch spend notifications for all the outpoints that // were spent at this new block height. - for op := range n.opsBySpendHeight[blockHeight] { + for op := range n.opsBySpendHeight[height] { spendSet := n.spendNotifications[op] for _, ntfn := range spendSet.ntfns { err := n.dispatchSpendDetails(ntfn, spendSet.details) @@ -1084,8 +1098,8 @@ func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, // Finally, we'll clear the entries from our set of notifications for // transactions and outpoints that are no longer under the risk of being // reorged out of the chain. - if blockHeight >= n.reorgSafetyLimit { - matureBlockHeight := blockHeight - n.reorgSafetyLimit + if height >= n.reorgSafetyLimit { + matureBlockHeight := height - n.reorgSafetyLimit for tx := range n.txsByInitialHeight[matureBlockHeight] { delete(n.confNotifications, tx) } diff --git a/chainntnfs/txnotifier_test.go b/chainntnfs/txnotifier_test.go index 98a62f2f..521dfdcc 100644 --- a/chainntnfs/txnotifier_test.go +++ b/chainntnfs/txnotifier_test.go @@ -165,12 +165,13 @@ func TestTxNotifierFutureConfDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, }) - err := n.ConnectTip( - block1.Hash(), 11, block1.Transactions(), - ) + err := n.ConnectTip(block1.Hash(), 11, block1.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(11); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We should only receive one update for tx1 since it only requires // one confirmation and it already met it. @@ -232,6 +233,9 @@ func TestTxNotifierFutureConfDispatch(t *testing.T) { if err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(12); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We should not receive any event notifications for tx1 since it has // already been confirmed. @@ -388,6 +392,9 @@ func TestTxNotifierHistoricalConfDispatch(t *testing.T) { if err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(11); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We should not receive any event notifications for tx1 since it has // already been confirmed. @@ -462,6 +469,9 @@ func TestTxNotifierFutureSpendDispatch(t *testing.T) { if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(11); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } expectedSpendDetails := &chainntnfs.SpendDetail{ SpentOutPoint: &ntfn.OutPoint, @@ -491,6 +501,9 @@ func TestTxNotifierFutureSpendDispatch(t *testing.T) { if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(12); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } select { case <-ntfn.Event.Spend: @@ -570,6 +583,9 @@ func TestTxNotifierHistoricalSpendDispatch(t *testing.T) { if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(startingHeight + 1); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } select { case <-ntfn.Event.Spend: @@ -931,6 +947,9 @@ func TestTxNotifierCancelSpend(t *testing.T) { if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(startingHeight + 1); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // The first request should still be active, so we should receive a // spend notification with the correct spending details. @@ -1024,22 +1043,28 @@ func TestTxNotifierConfReorg(t *testing.T) { block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1}, }) - err := n.ConnectTip(nil, 8, block1.Transactions()) - if err != nil { + if err := n.ConnectTip(nil, 8, block1.Transactions()); err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = n.ConnectTip(nil, 9, nil) - if err != nil { + if err := n.NotifyHeight(8); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } + if err := n.ConnectTip(nil, 9, nil); err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(9); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } block2 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx2, &tx3}, }) - err = n.ConnectTip(nil, 10, block2.Transactions()) - if err != nil { + if err := n.ConnectTip(nil, 10, block2.Transactions()); err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(10); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We should receive two updates for tx1 since it requires two // confirmations and it has already met them. @@ -1093,20 +1118,23 @@ func TestTxNotifierConfReorg(t *testing.T) { // The block that included tx2 and tx3 is disconnected and two next // blocks without them are connected. - err = n.DisconnectTip(10) - if err != nil { + if err := n.DisconnectTip(10); err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = n.ConnectTip(nil, 10, nil) - if err != nil { + if err := n.ConnectTip(nil, 10, nil); err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(10); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } - err = n.ConnectTip(nil, 11, nil) - if err != nil { + if err := n.ConnectTip(nil, 11, nil); err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(11); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } select { case reorgDepth := <-ntfn2.Event.NegativeConf: @@ -1151,15 +1179,21 @@ func TestTxNotifierConfReorg(t *testing.T) { }) block4 := btcutil.NewBlock(&wire.MsgBlock{}) - err = n.ConnectTip(block3.Hash(), 12, block3.Transactions()) + err := n.ConnectTip(block3.Hash(), 12, block3.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(12); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } err = n.ConnectTip(block4.Hash(), 13, block4.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(13); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We should only receive one update for tx2 since it only requires // one confirmation and it already met it. @@ -1293,12 +1327,13 @@ func TestTxNotifierSpendReorg(t *testing.T) { block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{spendTx1}, }) - err := n.ConnectTip( - block1.Hash(), startingHeight+1, block1.Transactions(), - ) + err := n.ConnectTip(block1.Hash(), startingHeight+1, block1.Transactions()) if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(startingHeight + 1); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We should receive a spend notification for the first outpoint with // its correct spending details. @@ -1322,12 +1357,13 @@ func TestTxNotifierSpendReorg(t *testing.T) { block2 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{spendTx2}, }) - err = n.ConnectTip( - block2.Hash(), startingHeight+2, block2.Transactions(), - ) + err = n.ConnectTip(block2.Hash(), startingHeight+2, block2.Transactions()) if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(startingHeight + 2); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We should not receive another spend notification for the first // outpoint. @@ -1381,6 +1417,9 @@ func TestTxNotifierSpendReorg(t *testing.T) { if err != nil { t.Fatalf("unable to disconnect block: %v", err) } + if err := n.NotifyHeight(startingHeight + 2); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We shouldn't receive notifications for either of the outpoints. select { @@ -1403,6 +1442,9 @@ func TestTxNotifierSpendReorg(t *testing.T) { if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(startingHeight + 3); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // We should now receive a spend notification once again for the second // outpoint containing the new spend details. @@ -1489,12 +1531,13 @@ func TestTxNotifierConfirmHintCache(t *testing.T) { Transactions: []*wire.MsgTx{&txDummy}, }) - err = n.ConnectTip( - block1.Hash(), txDummyHeight, block1.Transactions(), - ) + err = n.ConnectTip(block1.Hash(), txDummyHeight, block1.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(txDummyHeight); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // Since UpdateConfDetails has not been called for either transaction, // the height hints should remain unchanged. This simulates blocks @@ -1529,12 +1572,13 @@ func TestTxNotifierConfirmHintCache(t *testing.T) { Transactions: []*wire.MsgTx{&tx1}, }) - err = n.ConnectTip( - block2.Hash(), tx1Height, block2.Transactions(), - ) + err = n.ConnectTip(block2.Hash(), tx1Height, block2.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(tx1Height); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // Now that both notifications are waiting at tip for confirmations, // they should have their height hints updated to the latest block @@ -1563,12 +1607,13 @@ func TestTxNotifierConfirmHintCache(t *testing.T) { Transactions: []*wire.MsgTx{&tx2}, }) - err = n.ConnectTip( - block3.Hash(), tx2Height, block3.Transactions(), - ) + err = n.ConnectTip(block3.Hash(), tx2Height, block3.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + if err := n.NotifyHeight(tx2Height); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // The height hint for the first transaction should remain the same. hint, err = hintCache.QueryConfirmHint(tx1Hash) @@ -1682,6 +1727,9 @@ func TestTxNotifierSpendHintCache(t *testing.T) { if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(dummyHeight); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // Since we haven't called UpdateSpendDetails on any of the test // outpoints, this implies that there is a still a pending historical @@ -1720,6 +1768,9 @@ func TestTxNotifierSpendHintCache(t *testing.T) { if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(op1Height); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // Both outpoints should have their spend hints reflect the height of // the new block being connected due to the first outpoint being spent @@ -1749,6 +1800,9 @@ func TestTxNotifierSpendHintCache(t *testing.T) { if err != nil { t.Fatalf("unable to connect block: %v", err) } + if err := n.NotifyHeight(op2Height); err != nil { + t.Fatalf("unable to dispatch notifications: %v", err) + } // Only the second outpoint should have its spend hint updated due to // being spent within the new block. The first outpoint's spend hint From e402a4e14627cb9bf6cc0061e81aeb1055a74284 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 11 Oct 2018 17:30:40 -0700 Subject: [PATCH 9/9] chainntnfs: dispatch conf/spend notifications after blocks In this commit, we alter the different ChainNotifier implementations to dispatch confirmation and spend notifications after blocks. We do this to ensure the external consistency of our registered clients. --- chainntnfs/bitcoindnotify/bitcoind.go | 23 ++++++++++++++--------- chainntnfs/btcdnotify/btcd.go | 22 +++++++++++++--------- chainntnfs/neutrinonotify/neutrino.go | 18 ++++++++---------- 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index f4f51821..8c235213 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -568,14 +568,19 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash, // transactions included this block will processed to either send notifications // now or after numConfirmations confs. func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) error { + // First, we'll fetch the raw block as we'll need to gather all the + // transactions to determine whether any are relevant to our registered + // clients. rawBlock, err := b.chainConn.GetBlock(block.Hash) if err != nil { return fmt.Errorf("unable to get block: %v", err) } - txns := btcutil.NewBlock(rawBlock).Transactions() - err = b.txNotifier.ConnectTip( - block.Hash, uint32(block.Height), txns) + + // We'll then extend the txNotifier's height with the information of + // this new block, which will handle all of the notification logic for + // us. + err = b.txNotifier.ConnectTip(block.Hash, uint32(block.Height), txns) if err != nil { return fmt.Errorf("unable to connect tip: %v", err) } @@ -583,15 +588,15 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height, block.Hash) - // We want to set the best block before dispatching notifications so - // if any subscribers make queries based on their received block epoch, - // our state is fully updated in time. + // Now that we've guaranteed the new block extends the txNotifier's + // current tip, we'll proceed to dispatch notifications to all of our + // registered clients whom have had notifications fulfilled. Before + // doing so, we'll make sure update our in memory state in order to + // satisfy any client requests based upon the new block. b.bestBlock = block - // Lastly we'll notify any subscribed clients of the block. b.notifyBlockEpochs(block.Height, block.Hash) - - return nil + return b.txNotifier.NotifyHeight(uint32(block.Height)) } // notifyBlockEpochs notifies all registered block epoch clients of the newly diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index c7eecfbc..bb5e55a4 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -622,14 +622,13 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, startHeight, // TODO(halseth): this is reusing the neutrino notifier implementation, unify // them. func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { - // First process the block for our internal state. A new block has - // been connected to the main chain. Send out any N confirmation - // notifications which may have been triggered by this new block. + // First, we'll fetch the raw block as we'll need to gather all the + // transactions to determine whether any are relevant to our registered + // clients. rawBlock, err := b.chainConn.GetBlock(epoch.Hash) if err != nil { return fmt.Errorf("unable to get block: %v", err) } - newBlock := &filteredBlock{ hash: *epoch.Hash, height: uint32(epoch.Height), @@ -637,6 +636,9 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { connect: true, } + // We'll then extend the txNotifier's height with the information of + // this new block, which will handle all of the notification logic for + // us. err = b.txNotifier.ConnectTip( &newBlock.hash, newBlock.height, newBlock.txns, ) @@ -647,13 +649,15 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height, epoch.Hash) - // We want to set the best block before dispatching notifications so if - // any subscribers make queries based on their received block epoch, our - // state is fully updated in time. + // Now that we've guaranteed the new block extends the txNotifier's + // current tip, we'll proceed to dispatch notifications to all of our + // registered clients whom have had notifications fulfilled. Before + // doing so, we'll make sure update our in memory state in order to + // satisfy any client requests based upon the new block. b.bestBlock = epoch - b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - return nil + b.notifyBlockEpochs(epoch.Height, epoch.Hash) + return b.txNotifier.NotifyHeight(uint32(epoch.Height)) } // notifyBlockEpochs notifies all registered block epoch clients of the newly diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index f0d4da2e..34b8285f 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -542,9 +542,8 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, // transactions included this block will processed to either send notifications // now or after numConfirmations confs. func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { - // First process the block for our internal state. A new block has - // been connected to the main chain. Send out any N confirmation - // notifications which may have been triggered by this new block. + // We'll extend the txNotifier's height with the information of this new + // block, which will handle all of the notification logic for us. err := n.txNotifier.ConnectTip( &newBlock.hash, newBlock.height, newBlock.txns, ) @@ -555,16 +554,15 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height, newBlock.hash) - // We want to set the best block before dispatching notifications - // so if any subscribers make queries based on their received - // block epoch, our state is fully updated in time. + // Now that we've guaranteed the new block extends the txNotifier's + // current tip, we'll proceed to dispatch notifications to all of our + // registered clients whom have had notifications fulfilled. Before + // doing so, we'll make sure update our in memory state in order to + // satisfy any client requests based upon the new block. n.bestHeight = newBlock.height - // With all persistent changes committed, notify any subscribed clients - // of the block. n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - - return nil + return n.txNotifier.NotifyHeight(newBlock.height) } // getFilteredBlock is a utility to retrieve the full filtered block from a block epoch.