From d3bde428ea73b66f8f32fc0ccd28ed22943dd1a6 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 18:51:30 -0700 Subject: [PATCH 01/27] chainntnfs/txconfnotifier_test: use tcn instead of txConfNotifier --- chainntnfs/txconfnotifier_test.go | 68 +++++++++++++++---------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index fb996d0a..82389130 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -113,7 +113,7 @@ func TestTxConfFutureDispatch(t *testing.T) { ) hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) + tcn := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier before including them in a block to receive future @@ -124,7 +124,7 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := txConfNotifier.Register(&ntfn1); err != nil { + if err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -134,7 +134,7 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := txConfNotifier.Register(&ntfn2); err != nil { + if err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -162,7 +162,7 @@ func TestTxConfFutureDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, }) - err := txConfNotifier.ConnectTip( + err = tcn.ConnectTip( block1.Hash(), 11, block1.Transactions(), ) if err != nil { @@ -225,7 +225,7 @@ func TestTxConfFutureDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx3}, }) - err = txConfNotifier.ConnectTip(block2.Hash(), 12, block2.Transactions()) + err = tcn.ConnectTip(block2.Hash(), 12, block2.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -287,7 +287,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { ) hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) + tcn := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions at a height before the TxConfNotifier's // starting height so that they are confirmed once registering them. @@ -298,7 +298,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := txConfNotifier.Register(&ntfn1); err != nil { + if err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -309,7 +309,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := txConfNotifier.Register(&ntfn2); err != nil { + if err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -320,7 +320,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { BlockHeight: 9, TxIndex: 1, } - err := txConfNotifier.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1) + err := tcn.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1) if err != nil { t.Fatalf("unable to update conf details: %v", err) } @@ -353,7 +353,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { BlockHeight: 9, TxIndex: 2, } - err = txConfNotifier.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2) + err = tcn.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2) if err != nil { t.Fatalf("unable to update conf details: %v", err) } @@ -381,7 +381,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx3}, }) - err = txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + err = tcn.ConnectTip(block.Hash(), 11, block.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -439,7 +439,7 @@ func TestTxConfChainReorg(t *testing.T) { ) hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100, hintCache) + tcn := chainntnfs.NewTxConfNotifier(7, 100, hintCache) // Tx 1 will be confirmed in block 9 and requires 2 confs. tx1Hash := tx1.TxHash() @@ -448,7 +448,7 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := txConfNotifier.Register(&ntfn1); err != nil { + if err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -459,7 +459,7 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := txConfNotifier.Register(&ntfn2); err != nil { + if err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -470,7 +470,7 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx3NumConfs, Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } - if err := txConfNotifier.Register(&ntfn3); err != nil { + if err := tcn.Register(&ntfn3); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -478,11 +478,11 @@ func TestTxConfChainReorg(t *testing.T) { block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1}, }) - err := txConfNotifier.ConnectTip(nil, 8, block1.Transactions()) + err := tcn.ConnectTip(nil, 8, block1.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = txConfNotifier.ConnectTip(nil, 9, nil) + err = tcn.ConnectTip(nil, 9, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -490,7 +490,7 @@ func TestTxConfChainReorg(t *testing.T) { block2 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx2, &tx3}, }) - err = txConfNotifier.ConnectTip(nil, 10, block2.Transactions()) + err = tcn.ConnectTip(nil, 10, block2.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -547,17 +547,17 @@ func TestTxConfChainReorg(t *testing.T) { // The block that included tx2 and tx3 is disconnected and two next // blocks without them are connected. - err = txConfNotifier.DisconnectTip(10) + err = tcn.DisconnectTip(10) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = txConfNotifier.ConnectTip(nil, 10, nil) + err = tcn.ConnectTip(nil, 10, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = txConfNotifier.ConnectTip(nil, 11, nil) + err = tcn.ConnectTip(nil, 11, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -605,12 +605,12 @@ func TestTxConfChainReorg(t *testing.T) { }) block4 := btcutil.NewBlock(&wire.MsgBlock{}) - err = txConfNotifier.ConnectTip(block3.Hash(), 12, block3.Transactions()) + err = tcn.ConnectTip(block3.Hash(), 12, block3.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = txConfNotifier.ConnectTip(block4.Hash(), 13, block4.Transactions()) + err = tcn.ConnectTip(block4.Hash(), 13, block4.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -687,7 +687,7 @@ func TestTxConfHeightHintCache(t *testing.T) { // Initialize our TxConfNotifier instance backed by a height hint cache. hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier( + tcn := chainntnfs.NewTxConfNotifier( startingHeight, 100, hintCache, ) @@ -708,10 +708,10 @@ func TestTxConfHeightHintCache(t *testing.T) { Event: chainntnfs.NewConfirmationEvent(2), } - if err := txConfNotifier.Register(ntfn1); err != nil { + if err := tcn.Register(ntfn1); err != nil { t.Fatalf("unable to register tx1: %v", err) } - if err := txConfNotifier.Register(ntfn2); err != nil { + if err := tcn.Register(ntfn2); err != nil { t.Fatalf("unable to register tx2: %v", err) } @@ -739,7 +739,7 @@ func TestTxConfHeightHintCache(t *testing.T) { Transactions: []*wire.MsgTx{&tx1}, }) - err = txConfNotifier.ConnectTip( + err = tcn.ConnectTip( block1.Hash(), tx1Height, block1.Transactions(), ) if err != nil { @@ -772,7 +772,7 @@ func TestTxConfHeightHintCache(t *testing.T) { Transactions: []*wire.MsgTx{&tx2}, }) - err = txConfNotifier.ConnectTip( + err = tcn.ConnectTip( block2.Hash(), tx2Height, block2.Transactions(), ) if err != nil { @@ -800,7 +800,7 @@ func TestTxConfHeightHintCache(t *testing.T) { // Now, we'll attempt do disconnect the last block in order to simulate // a chain reorg. - if err := txConfNotifier.DisconnectTip(tx2Height); err != nil { + if err := tcn.DisconnectTip(tx2Height); err != nil { t.Fatalf("Failed to disconnect block: %v", err) } @@ -824,7 +824,7 @@ func TestTxConfTearDown(t *testing.T) { ) hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) + tcn := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier to receive notifications. @@ -834,7 +834,7 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 1, Event: chainntnfs.NewConfirmationEvent(1), } - if err := txConfNotifier.Register(&ntfn1); err != nil { + if err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -844,7 +844,7 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 2, Event: chainntnfs.NewConfirmationEvent(2), } - if err := txConfNotifier.Register(&ntfn2); err != nil { + if err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -854,7 +854,7 @@ func TestTxConfTearDown(t *testing.T) { Transactions: []*wire.MsgTx{&tx1, &tx2}, }) - err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + err := tcn.ConnectTip(block.Hash(), 11, block.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -890,7 +890,7 @@ func TestTxConfTearDown(t *testing.T) { // The notification channels should be closed for notifications that // have not been dispatched yet, so we should not expect to receive any // more updates. - txConfNotifier.TearDown() + tcn.TearDown() // tx1 should not receive any more updates because it has already been // confirmed and the TxConfNotifier has been shut down. From 7661d00d5ad47f5317436957381b826cf8b6fca1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 19:15:37 -0700 Subject: [PATCH 02/27] chainntnfs/txconfnotifier_test: update height hint cache test --- chainntnfs/txconfnotifier_test.go | 129 ++++++++++++++++++++++-------- 1 file changed, 94 insertions(+), 35 deletions(-) diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 82389130..afb6474a 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -128,6 +128,7 @@ func TestTxConfFutureDispatch(t *testing.T) { t.Fatalf("unable to register ntfn: %v", err) } + err := tcn.UpdateConfDetails(*ntfn1.TxID, 0, nil) tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, @@ -675,14 +676,18 @@ func TestTxConfChainReorg(t *testing.T) { } // TestTxConfHeightHintCache ensures that the height hints for transactions are -// kept track of correctly with each new block connected/disconnected. +// kept track of correctly with each new block connected/disconnected. This test +// also asserts that the height hints are not updated until the simulated +// historical dispatches have returned, and we know the transactions aren't +// already in the chain. func TestTxConfHeightHintCache(t *testing.T) { t.Parallel() const ( - startingHeight = 10 - tx1Height = 11 - tx2Height = 12 + startingHeight = 200 + txDummyHeight = 201 + tx1Height = 202 + tx2Height = 203 ) // Initialize our TxConfNotifier instance backed by a height hint cache. @@ -715,65 +720,105 @@ func TestTxConfHeightHintCache(t *testing.T) { t.Fatalf("unable to register tx2: %v", err) } - // Both transactions should have a height hint of the starting height - // due to registering notifications for them. - hint, err := hintCache.QueryConfirmHint(tx1Hash) - if err != nil { - t.Fatalf("unable to query for hint: %v", err) - } - if hint != startingHeight { - t.Fatalf("expected hint %d, got %d", startingHeight, hint) + // Both transactions should not have a height hint set, as Register + // should not alter the cache state. + _, err := hintCache.QueryConfirmHint(tx1Hash) + if err != chainntnfs.ErrConfirmHintNotFound { + t.Fatalf("unexpected error when querying for height hint "+ + "want: %v, got %v", + chainntnfs.ErrConfirmHintNotFound, err) } - hint, err = hintCache.QueryConfirmHint(tx2Hash) - if err != nil { - t.Fatalf("unable to query for hint: %v", err) - } - if hint != startingHeight { - t.Fatalf("expected hint %d, got %d", startingHeight, hint) + _, err = hintCache.QueryConfirmHint(tx2Hash) + if err != chainntnfs.ErrConfirmHintNotFound { + t.Fatalf("unexpected error when querying for height hint "+ + "want: %v, got %v", + chainntnfs.ErrConfirmHintNotFound, err) } - // Create a new block that will include the first transaction and extend + // Create a new block that will include the dummy transaction and extend // the chain. + txDummy := wire.MsgTx{Version: 3} block1 := btcutil.NewBlock(&wire.MsgBlock{ - Transactions: []*wire.MsgTx{&tx1}, + Transactions: []*wire.MsgTx{&txDummy}, }) err = tcn.ConnectTip( - block1.Hash(), tx1Height, block1.Transactions(), + block1.Hash(), txDummyHeight, block1.Transactions(), ) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - // The height hint for the first transaction should now be updated to - // reflect its confirmation. + // Since UpdateConfDetails has not been called for either transaction, + // the height hints should remain unchanged. This simulates blocks + // confirming while the historical dispatch is processing the + // registration. + hint, err := hintCache.QueryConfirmHint(tx1Hash) + if err != chainntnfs.ErrConfirmHintNotFound { + t.Fatalf("unexpected error when querying for height hint "+ + "want: %v, got %v", + chainntnfs.ErrConfirmHintNotFound, err) + } + + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != chainntnfs.ErrConfirmHintNotFound { + t.Fatalf("unexpected error when querying for height hint "+ + "want: %v, got %v", + chainntnfs.ErrConfirmHintNotFound, err) + } + + // Now, update the conf details reporting that the neither txn was found + // in the historical dispatch. + if err := tcn.UpdateConfDetails(tx1Hash, 0, nil); err != nil { + t.Fatalf("unable to update conf details: %v", err) + } + if err := tcn.UpdateConfDetails(tx2Hash, 0, nil); err != nil { + t.Fatalf("unable to update conf details: %v", err) + } + + // We'll create another block that will include the first transaction + // and extend the chain. + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1}, + }) + + err = tcn.ConnectTip( + block2.Hash(), tx1Height, block2.Transactions(), + ) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // Now that both notifications are waiting at tip for confirmations, + // they should have their height hints updated to the latest block + // height. hint, err = hintCache.QueryConfirmHint(tx1Hash) if err != nil { t.Fatalf("unable to query for hint: %v", err) } if hint != tx1Height { - t.Fatalf("expected hint %d, got %d", tx1Height, hint) + t.Fatalf("expected hint %d, got %d", + tx1Height, hint) } - // The height hint for the second transaction should also be updated due - // to it still being unconfirmed. hint, err = hintCache.QueryConfirmHint(tx2Hash) if err != nil { t.Fatalf("unable to query for hint: %v", err) } if hint != tx1Height { - t.Fatalf("expected hint %d, got %d", tx1Height, hint) + t.Fatalf("expected hint %d, got %d", + tx2Height, hint) } - // Now, we'll create another block that will include the second + // Next, we'll create another block that will include the second // transaction and extend the chain. - block2 := btcutil.NewBlock(&wire.MsgBlock{ + block3 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx2}, }) err = tcn.ConnectTip( - block2.Hash(), tx2Height, block2.Transactions(), + block3.Hash(), tx2Height, block3.Transactions(), ) if err != nil { t.Fatalf("Failed to connect block: %v", err) @@ -785,7 +830,8 @@ func TestTxConfHeightHintCache(t *testing.T) { t.Fatalf("unable to query for hint: %v", err) } if hint != tx1Height { - t.Fatalf("expected hint %d, got %d", tx1Height, hint) + t.Fatalf("expected hint %d, got %d", + tx1Height, hint) } // The height hint for the second transaction should now be updated to @@ -795,11 +841,12 @@ func TestTxConfHeightHintCache(t *testing.T) { t.Fatalf("unable to query for hint: %v", err) } if hint != tx2Height { - t.Fatalf("expected hint %d, got %d", tx2Height, hint) + t.Fatalf("expected hint %d, got %d", + tx2Height, hint) } - // Now, we'll attempt do disconnect the last block in order to simulate - // a chain reorg. + // Finally, we'll attempt do disconnect the last block in order to + // simulate a chain reorg. if err := tcn.DisconnectTip(tx2Height); err != nil { t.Fatalf("Failed to disconnect block: %v", err) } @@ -811,7 +858,19 @@ func TestTxConfHeightHintCache(t *testing.T) { t.Fatalf("unable to query for hint: %v", err) } if hint != tx1Height { - t.Fatalf("expected hint %d, got %d", tx1Height, hint) + t.Fatalf("expected hint %d, got %d", + tx1Height, hint) + } + + // The first transaction's height hint should remain at the original + // confirmation height. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", + tx1Height, hint) } } From 012d17efaa1d8099c2cc606d09488703c49514b6 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 19:23:54 -0700 Subject: [PATCH 03/27] chainntnfs/txnotifier_test: update nil spend details to restore tests --- chainntnfs/txconfnotifier_test.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index afb6474a..e9d49f10 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -128,7 +128,6 @@ func TestTxConfFutureDispatch(t *testing.T) { t.Fatalf("unable to register ntfn: %v", err) } - err := tcn.UpdateConfDetails(*ntfn1.TxID, 0, nil) tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, @@ -163,7 +162,7 @@ func TestTxConfFutureDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, }) - err = tcn.ConnectTip( + err := tcn.ConnectTip( block1.Hash(), 11, block1.Transactions(), ) if err != nil { @@ -453,6 +452,10 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn1.TxID, 0, nil); err != nil { + t.Fatalf("unable to deliver conf details: %v", err) + } + // Tx 2 will be confirmed in block 10 and requires 1 conf. tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -464,6 +467,10 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn2.TxID, 0, nil); err != nil { + t.Fatalf("unable to deliver conf details: %v", err) + } + // Tx 3 will be confirmed in block 10 and requires 2 confs. tx3Hash := tx3.TxHash() ntfn3 := chainntnfs.ConfNtfn{ @@ -475,6 +482,10 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn3.TxID, 0, nil); err != nil { + t.Fatalf("unable to deliver conf details: %v", err) + } + // Sync chain to block 10. Txs 1 & 2 should be confirmed. block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1}, @@ -896,6 +907,9 @@ func TestTxConfTearDown(t *testing.T) { if err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn1.TxID, 0, nil); err != nil { + t.Fatalf("unable to update conf details: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -906,6 +920,9 @@ func TestTxConfTearDown(t *testing.T) { if err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn2.TxID, 0, nil); err != nil { + t.Fatalf("unable to update conf details: %v", err) + } // Include the transactions in a block and add it to the TxConfNotifier. // This should confirm tx1, but not tx2. From 2f0b5596da6db381448b8a71f747d0fcf7e89291 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 28 Sep 2018 16:25:20 -0700 Subject: [PATCH 04/27] chainntnfs/txconfnotifier: add rescanStates --- chainntnfs/txconfnotifier.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 0a951851..4914c2ff 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -101,6 +101,27 @@ type TxConfNotifier struct { sync.Mutex } +// rescanState indicates the progression of a registration before the notifier +// can begin dispatching confirmations at tip. +type rescanState uint8 + +const ( + // rescanNotStarted is the initial state, denoting that a historical + // dispatch may be required. + rescanNotStarted rescanState = iota + + // rescanPending indicates that a dispatch has already been made, and we + // are waiting for its completion. No other rescans should be dispatched + // while in this state. + rescanPending + + // rescanComplete signals either that a rescan was dispatched and has + // completed, or that we began watching at tip immediately. In either + // case, the notifier can only dispatch notifications from tip when in + // this state. + rescanComplete +) + // NewTxConfNotifier creates a TxConfNotifier. The current height of the // blockchain is accepted as a parameter. func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, From 1babec971f974319093691dafc81a7475781ca94 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 19:29:55 -0700 Subject: [PATCH 05/27] chainntnfs/txconfnotifier: isolate scanning ntfns --- chainntnfs/txconfnotifier.go | 252 ++++++++++++++++++++++------------- 1 file changed, 162 insertions(+), 90 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 4914c2ff..099066c1 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -77,7 +77,7 @@ type TxConfNotifier struct { // confNotifications is an index of notification requests by transaction // hash. - confNotifications map[chainhash.Hash]map[uint64]*ConfNtfn + confNotifications map[chainhash.Hash]*confNtfnSet // txsByInitialHeight is an index of watched transactions by the height // that they are included at in the blockchain. This is tracked so that @@ -122,6 +122,26 @@ const ( rescanComplete ) +// confNtfnSet holds all known, registered confirmation notifications for a +// single txid. If duplicates notifications are requested, only one historical +// dispatch will be spawned to ensure redundant scans are not permitted. A +// single conf detail will be constructed and dispatched to all interested +// clients. +type confNtfnSet struct { + ntfns map[uint64]*ConfNtfn + rescanStatus rescanState + details *TxConfirmation +} + +// newConfNtfnSet constructs a fresh confNtfnSet for a group of clients +// interested in a notification for a particular txid. +func newConfNtfnSet() *confNtfnSet { + return &confNtfnSet{ + ntfns: make(map[uint64]*ConfNtfn), + rescanStatus: rescanNotStarted, + } +} + // NewTxConfNotifier creates a TxConfNotifier. The current height of the // blockchain is accepted as a parameter. func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, @@ -130,7 +150,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, return &TxConfNotifier{ currentHeight: startHeight, reorgSafetyLimit: reorgSafetyLimit, - confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), + confNotifications: make(map[chainhash.Hash]*confNtfnSet), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), hintCache: hintCache, @@ -145,35 +165,58 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, // the confirmation details must be provided with the UpdateConfDetails method, // otherwise we will wait for the transaction to confirm even though it already // has. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) (bool, uint32, error) { select { case <-tcn.quit: - return ErrTxConfNotifierExiting + return false, 0, ErrTxConfNotifierExiting default: } tcn.Lock() defer tcn.Unlock() - ntfns, ok := tcn.confNotifications[*ntfn.TxID] - if !ok { - ntfns = make(map[uint64]*ConfNtfn) - tcn.confNotifications[*ntfn.TxID] = ntfns + // TODO(conner): promote immediately to confNotifications if a + // historical dispatch has already completed. - err := tcn.hintCache.CommitConfirmHint( - tcn.currentHeight, *ntfn.TxID, - ) - if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. - Log.Errorf("Unable to update confirm hint to %d for "+ - "%v: %v", tcn.currentHeight, *ntfn.TxID, err) - } + confSet, ok := tcn.confNotifications[*ntfn.TxID] + if !ok { + confSet = newConfNtfnSet() + tcn.confNotifications[*ntfn.TxID] = confSet } - ntfns[ntfn.ConfID] = ntfn + confSet.ntfns[ntfn.ConfID] = ntfn - return nil + switch confSet.rescanStatus { + + // A prior rescan has already completed and we are actively watching at + // tip for this txid. + case rescanComplete: + return nil, nil + + // A rescan is already in progress, return here to prevent dispatching + // another. When the scan returns, this notifications details will be + // updated as well. + case rescanPending: + return nil, nil + + // If no rescan has been dispatched, attempt to do so now. + case rescanNotStarted: + } + + // If the provided or cached height hint indicates that the transaction + // is to be confirmed at a height greater than the conf notifier's + // current height, we'll refrain from spawning a historical dispatch. + if startHeight > tcn.currentHeight { + // Set the rescan status to complete, which will allow the conf + // notifier to start delivering messages for this set + // immediately. + confSet.rescanStatus = rescanComplete + return nil, nil + } + + // Set this confSet's status to pending, ensuring subsequent + // registrations don't also attempt a dispatch. + confSet.rescanStatus = rescanPending } // UpdateConfDetails attempts to update the confirmation details for an active @@ -198,19 +241,21 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, // First, we'll determine whether we have an active notification for // this transaction with the given ID. - ntfns, ok := tcn.confNotifications[txid] + confSet, ok := tcn.confNotifications[txid] if !ok { - return fmt.Errorf("no notifications found for txid %v", txid) + return fmt.Errorf("no notification found with TxID %v", txid) } - ntfn, ok := ntfns[clientID] - if !ok { - return fmt.Errorf("no notification found with ID %v", clientID) - } + // The historical dispatch has been completed for this confSet. We'll + // update the rescan status and cache any details that were found. If + // the details are nil, that implies we did not find them and will + // continue to watch for them at tip. + confSet.rescanStatus = rescanComplete - // If the notification has already recognized that the transaction - // confirmed, there's nothing left for us to do. - if ntfn.details != nil { + // The notifier has yet to reach the height at which the transaction was + // included in a block, so we should defer until handling it then within + // ConnectTip. + if details == nil || details.BlockHeight > tcn.currentHeight { return nil } @@ -222,67 +267,72 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, details.BlockHeight, txid, err) } - // The notifier has yet to reach the height at which the transaction was - // included in a block, so we should defer until handling it then within - // ConnectTip. - if details == nil || details.BlockHeight > tcn.currentHeight { - return nil - } - - ntfn.details = details - - // Now, we'll examine whether the transaction of this notification - // request has reached its required number of confirmations. If it has, - // we'll disaptch a confirmation notification to the caller. - confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 - if confHeight <= tcn.currentHeight { - Log.Infof("Dispatching %v conf notification for %v", - ntfn.NumConfirmations, ntfn.TxID) - - // We'll send a 0 value to the Updates channel, indicating that - // the transaction has already been confirmed. - select { - case ntfn.Event.Updates <- 0: - case <-tcn.quit: - return ErrTxConfNotifierExiting + // Update the conf details of all ntfns that don't yet have them. + for _, ntfn := range confSet.ntfns { + if ntfn.details != nil { + continue } - select { - case ntfn.Event.Confirmed <- details: - ntfn.dispatched = true - case <-tcn.quit: - return ErrTxConfNotifierExiting - } - } else { - // Otherwise, we'll keep track of the notification request by - // the height at which we should dispatch the confirmation - // notification. - ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] - if !exists { - ntfnSet = make(map[*ConfNtfn]struct{}) - tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet - } - ntfnSet[ntfn] = struct{}{} + ntfn.details = details - // We'll also send an update to the client of how many - // confirmations are left for the transaction to be confirmed. - numConfsLeft := confHeight - tcn.currentHeight - select { - case ntfn.Event.Updates <- numConfsLeft: - case <-tcn.quit: - return ErrTxConfNotifierExiting - } - } + // Now, we'll examine whether the transaction of this + // notification request has reached its required number of + // confirmations. If it has, we'll dispatch a confirmation + // notification to the caller. + confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 + if confHeight <= tcn.currentHeight { + Log.Infof("Dispatching %v conf notification for %v", + ntfn.NumConfirmations, ntfn.TxID) - // As a final check, we'll also watch the transaction if it's still - // possible for it to get reorged out of the chain. - if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { - txSet, exists := tcn.txsByInitialHeight[details.BlockHeight] - if !exists { - txSet = make(map[chainhash.Hash]struct{}) - tcn.txsByInitialHeight[details.BlockHeight] = txSet + // We'll send a 0 value to the Updates channel, + // indicating that the transaction has already been + // confirmed. + select { + case ntfn.Event.Updates <- 0: + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + + select { + case ntfn.Event.Confirmed <- details: + ntfn.dispatched = true + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + } else { + // Otherwise, we'll keep track of the notification + // request by the height at which we should dispatch the + // confirmation notification. + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + ntfnSet = make(map[*ConfNtfn]struct{}) + tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet + } + ntfnSet[ntfn] = struct{}{} + + // We'll also send an update to the client of how many + // confirmations are left for the transaction to be + // confirmed. + numConfsLeft := confHeight - tcn.currentHeight + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + } + + // As a final check, we'll also watch the transaction if it's + // still possible for it to get reorged out of the chain. + blockHeight := details.BlockHeight + reorgSafeHeight := blockHeight + tcn.reorgSafetyLimit + if reorgSafeHeight > tcn.currentHeight { + txSet, exists := tcn.txsByInitialHeight[blockHeight] + if !exists { + txSet = make(map[chainhash.Hash]struct{}) + tcn.txsByInitialHeight[blockHeight] = txSet + } + txSet[txid] = struct{}{} } - txSet[txid] = struct{}{} } return nil @@ -320,7 +370,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // handled correctly. for _, tx := range txns { txHash := tx.Hash() - for _, ntfn := range tcn.confNotifications[*txHash] { + confSet, ok := tcn.confNotifications[*txHash] + if !ok { + continue + } + + for _, ntfn := range confSet.ntfns { ntfn.details = &TxConfirmation{ BlockHash: blockHash, BlockHeight: blockHeight, @@ -356,7 +411,11 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, txsToUpdateHints = append(txsToUpdateHints, confirmedTx) } out: - for maybeUnconfirmedTx := range tcn.confNotifications { + for maybeUnconfirmedTx, confSet := range tcn.confNotifications { + if confSet.rescanStatus != rescanComplete { + continue + } + for height, confirmedTxs := range tcn.txsByInitialHeight { // Skip the transactions that confirmed at the new block // height as those have already been added. @@ -391,7 +450,8 @@ out: // this new height. for _, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { - for _, ntfn := range tcn.confNotifications[txHash] { + confSet := tcn.confNotifications[txHash] + for _, ntfn := range confSet.ntfns { // If the notification hasn't learned about the // confirmation of its transaction yet (in the // case of historical confirmations), we'll skip @@ -491,7 +551,8 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // clients is always non-blocking. for initialHeight, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { - for _, ntfn := range tcn.confNotifications[txHash] { + confSet := tcn.confNotifications[txHash] + for _, ntfn := range confSet.ntfns { // First, we'll attempt to drain an update // from each notification to ensure sends to the // Updates channel are always non-blocking. @@ -544,6 +605,17 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { continue } delete(ntfnSet, ntfn) + + // Intuitively, we should also remove + // the txHash from confNotifications if + // the ntfnSet is now empty. However, we + // will not do so since we may want to + // continue rewinding the height hints + // for this txid. + // + // NOTE(conner): safe to delete if + // blockHeight is below client-provided + // height hint? } } } @@ -565,8 +637,8 @@ func (tcn *TxConfNotifier) TearDown() { close(tcn.quit) - for _, ntfns := range tcn.confNotifications { - for _, ntfn := range ntfns { + for _, confSet := range tcn.confNotifications { + for _, ntfn := range confSet.ntfns { if ntfn.dispatched { continue } From a1756b0b1bf73a75d22c13d9d90261fcb30d13d4 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 19:35:17 -0700 Subject: [PATCH 06/27] chainntnfs/bitcoind+btcd+neutrino: pass nil conf details --- chainntnfs/bitcoindnotify/bitcoind.go | 21 +++++++++++++-------- chainntnfs/btcdnotify/btcd.go | 21 +++++++++++++-------- chainntnfs/neutrinonotify/neutrino.go | 22 +++++++++++++++------- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 7dc47170..d3c19722 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -278,14 +278,19 @@ out: return } - if confDetails != nil { - err := b.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, - ) - if err != nil { - chainntnfs.Log.Error(err) - } + // If the historical dispatch finished + // without error, we will invoke + // UpdateConfDetails even if none were + // found. This allows the notifier to + // begin safely updating the height hint + // cache at tip, since any pending + // rescans have now completed. + err = b.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) } }() diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 209b9744..968a6c35 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -348,14 +348,19 @@ out: return } - if confDetails != nil { - err = b.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, - ) - if err != nil { - chainntnfs.Log.Error(err) - } + // If the historical dispatch finished + // without error, we will invoke + // UpdateConfDetails even if none were + // found. This allows the notifier to + // begin safely updating the height hint + // cache at tip, since any pending + // rescans have now completed. + err = b.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) } }() diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index f410b7b8..9f76b14a 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -353,14 +353,22 @@ out: chainntnfs.Log.Error(err) } + // If the historical dispatch finished + // without error, we will invoke + // UpdateConfDetails even if none were + // found. This allows the notifier to + // begin safely updating the height hint + // cache at tip, since any pending + // rescans have now completed. + err = n.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + if confDetails != nil { - err := n.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, - ) - if err != nil { - chainntnfs.Log.Error(err) - } return } From 8b8007bb5a3a400ea336f8e45e6abd6905029adf Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 20:09:11 -0700 Subject: [PATCH 07/27] chainntnfs/txconfnotifier: query conf hint in Register --- chainntnfs/txconfnotifier.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 099066c1..e64da77d 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -35,6 +35,11 @@ type ConfNtfn struct { // be sent over. Event *ConfirmationEvent + // HeightHint is the minimum height in the chain that we expect to find + // this txid. This value will be overridden by the height hint cache if + // a more recent value is available. + HeightHint uint32 + // details describes the transaction's position is the blockchain. May be // nil for unconfirmed transactions. details *TxConfirmation @@ -172,6 +177,17 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) (bool, uint32, error) { default: } + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + hint, err := tcn.hintCache.QueryConfirmHint(*ntfn.TxID) + if err == nil { + if hint > ntfn.HeightHint { + Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, *ntfn.TxID) + ntfn.HeightHint = hint + } + } + tcn.Lock() defer tcn.Unlock() From cf7700e6cb29963031e06e7fa7b32d42e8a9a0e3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 20:09:53 -0700 Subject: [PATCH 08/27] chainntnfs/bitcoind+btcd+neutrino: let tcn query for height hint --- chainntnfs/bitcoindnotify/bitcoind.go | 37 +++++++-------------------- chainntnfs/btcdnotify/btcd.go | 37 +++++++-------------------- chainntnfs/neutrinonotify/neutrino.go | 23 +++++------------ 3 files changed, 25 insertions(+), 72 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index d3c19722..c592c3c8 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -254,7 +254,7 @@ out: } b.spendNotifications[op][msg.spendID] = msg - case *confirmationNotification: + case *chainntnfs.ConfNtfn: chainntnfs.Log.Infof("New confirmation "+ "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) @@ -270,7 +270,7 @@ out: defer b.wg.Done() confDetails, _, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, + msg.TxID, msg.HeightHint, currentHeight, ) if err != nil { @@ -948,42 +948,23 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, return ErrTransactionNotFound } -// confirmationNotification represents a client's intent to receive a -// notification once the target txid reaches numConfirmations confirmations. -type confirmationNotification struct { - chainntnfs.ConfNtfn - heightHint uint32 -} - // RegisterConfirmationsNtfn registers a notification with BitcoindNotifier // which will be triggered once the txid reaches numConfs number of // confirmations. func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, txid) - heightHint = hint - } - } - // Construct a notification request for the transaction and send it to // the main event loop. - ntfn := &confirmationNotification{ - ConfNtfn: chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&b.confClientCounter, 1), - TxID: txid, - NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - }, - heightHint: heightHint, + ntfn := &chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&b.confClientCounter, 1), + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(numConfs), + HeightHint: heightHint, } - if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + if err := b.txConfNotifier.Register(ntfn); err != nil { return nil, err } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 968a6c35..29d020d0 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -324,7 +324,7 @@ out: } b.spendNotifications[op][msg.spendID] = msg - case *confirmationNotification: + case *chainntnfs.ConfNtfn: chainntnfs.Log.Infof("New confirmation "+ "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) @@ -340,7 +340,7 @@ out: defer b.wg.Done() confDetails, _, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, + msg.TxID, msg.HeightHint, bestHeight, ) if err != nil { @@ -1008,42 +1008,23 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, }, nil } -// confirmationNotification represents a client's intent to receive a -// notification once the target txid reaches numConfirmations confirmations. -type confirmationNotification struct { - chainntnfs.ConfNtfn - heightHint uint32 -} - // RegisterConfirmationsNtfn registers a notification with BtcdNotifier // which will be triggered once the txid reaches numConfs number of // confirmations. func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, txid) - heightHint = hint - } - } - // Construct a notification request for the transaction and send it to // the main event loop. - ntfn := &confirmationNotification{ - ConfNtfn: chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&b.confClientCounter, 1), - TxID: txid, - NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - }, - heightHint: heightHint, + ntfn := &chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&b.confClientCounter, 1), + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(numConfs), + HeightHint: heightHint, } - if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + if err := b.txConfNotifier.Register(ntfn); err != nil { return nil, err } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 9f76b14a..1d3741c1 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -317,7 +317,8 @@ out: case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations subscription: "+ "txid=%v, numconfs=%v, height_hint=%v", - msg.TxID, msg.NumConfirmations, msg.heightHint) + msg.TxID, msg.NumConfirmations, + msg.ConfNtfn.HeightHint) // If the notification can be partially or // fully dispatched, then we can skip the first @@ -335,7 +336,8 @@ out: defer n.wg.Done() confDetails, err := n.historicalConfDetails( - msg.TxID, msg.pkScript, currentHeight, msg.heightHint, + msg.TxID, msg.pkScript, currentHeight, + msg.ConfNtfn.HeightHint, ) if err != nil { chainntnfs.Log.Error(err) @@ -924,8 +926,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // notification once the target txid reaches numConfirmations confirmations. type confirmationsNotification struct { chainntnfs.ConfNtfn - heightHint uint32 - pkScript []byte + pkScript []byte } // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier @@ -935,16 +936,6 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := n.confirmHintCache.QueryConfirmHint(*txid); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, txid) - heightHint = hint - } - } - // Construct a notification request for the transaction and send it to // the main event loop. ntfn := &confirmationsNotification{ @@ -953,9 +944,9 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, TxID: txid, NumConfirmations: numConfs, Event: chainntnfs.NewConfirmationEvent(numConfs), + HeightHint: heightHint, }, - heightHint: heightHint, - pkScript: pkScript, + pkScript: pkScript, } if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { From e804b30669062736c7897c30678016ae51225a5b Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sun, 26 Aug 2018 21:38:42 -0700 Subject: [PATCH 09/27] chainntnfs/txconfnotifier_test: update to use multi-value Register --- chainntnfs/txconfnotifier_test.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index e9d49f10..dea56239 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -124,7 +124,7 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := tcn.Register(&ntfn1); err != nil { + if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -134,7 +134,7 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := tcn.Register(&ntfn2); err != nil { + if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -298,7 +298,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := tcn.Register(&ntfn1); err != nil { + if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -309,7 +309,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := tcn.Register(&ntfn2); err != nil { + if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -448,7 +448,7 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := tcn.Register(&ntfn1); err != nil { + if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -463,7 +463,8 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := tcn.Register(&ntfn2); err != nil { + if _, _, err := tcn.Register(&ntfn2); err != nil { + if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -478,7 +479,7 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx3NumConfs, Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } - if err := tcn.Register(&ntfn3); err != nil { + if _, err := tcn.Register(&ntfn3); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -724,10 +725,10 @@ func TestTxConfHeightHintCache(t *testing.T) { Event: chainntnfs.NewConfirmationEvent(2), } - if err := tcn.Register(ntfn1); err != nil { + if _, err := tcn.Register(ntfn1); err != nil { t.Fatalf("unable to register tx1: %v", err) } - if err := tcn.Register(ntfn2); err != nil { + if _, err := tcn.Register(ntfn2); err != nil { t.Fatalf("unable to register tx2: %v", err) } @@ -904,7 +905,7 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 1, Event: chainntnfs.NewConfirmationEvent(1), } - if err := tcn.Register(&ntfn1); err != nil { + if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } if err := tcn.UpdateConfDetails(*ntfn1.TxID, 0, nil); err != nil { @@ -917,7 +918,7 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 2, Event: chainntnfs.NewConfirmationEvent(2), } - if err := tcn.Register(&ntfn2); err != nil { + if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } if err := tcn.UpdateConfDetails(*ntfn2.TxID, 0, nil); err != nil { From 37c864d6f699163dd8d135650e26f99e99040b10 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 28 Sep 2018 16:24:19 -0700 Subject: [PATCH 10/27] chainntnfs/txconfnotifier: add HistoricalConfDispatch struct --- chainntnfs/txconfnotifier.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index e64da77d..511a5971 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -48,6 +48,26 @@ type ConfNtfn struct { dispatched bool } +// HistoricalConfDispatch parameterizes a manual rescan for a particular +// transaction identifier. The parameters include the start and end block +// heights specifying the range of blocks to scan. +type HistoricalConfDispatch struct { + // TxID is the transaction ID to search for in the historical dispatch. + TxID *chainhash.Hash + + // PkScript is a public key script from an output created by this + // transaction. + PkScript []byte + + // StartHeight specifies the block height at which to being the + // historical rescan. + StartHeight uint32 + + // EndHeight specifies the last block height (inclusive) that the + // historical scan should consider. + EndHeight uint32 +} + // NewConfirmationEvent constructs a new ConfirmationEvent with newly opened // channels. func NewConfirmationEvent(numConfs uint32) *ConfirmationEvent { From 74122e00f5971992538c7c6de34c5e0cfafb514a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 28 Sep 2018 16:32:53 -0700 Subject: [PATCH 11/27] chainntnfs/txconfnotifier: add PkScript to ConfNtfn --- chainntnfs/txconfnotifier.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 511a5971..2bbec53b 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -27,6 +27,10 @@ type ConfNtfn struct { // are requested. TxID *chainhash.Hash + // PkScript is the public key script of an outpoint created in this + // transaction. + PkScript []byte + // NumConfirmations is the number of confirmations after which the // notification is to be sent. NumConfirmations uint32 From f94de2308f722585a8913332157cb40eb72d4746 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 28 Sep 2018 16:36:47 -0700 Subject: [PATCH 12/27] chainntnfs/txconfnotifier: return HistoricalConfDispatch from Register --- chainntnfs/txconfnotifier.go | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 2bbec53b..ca8d0b30 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -40,8 +40,7 @@ type ConfNtfn struct { Event *ConfirmationEvent // HeightHint is the minimum height in the chain that we expect to find - // this txid. This value will be overridden by the height hint cache if - // a more recent value is available. + // this txid. HeightHint uint32 // details describes the transaction's position is the blockchain. May be @@ -189,26 +188,32 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, // Register handles a new notification request. The client will be notified when // the transaction gets a sufficient number of confirmations on the blockchain. +// The registration succeeds if no error is returned. If the returned +// HistoricalConfDispatch is non-nil, the caller is responsible for attempting +// to manually rescan blocks for the txid between the start and end heights. // // NOTE: If the transaction has already been included in a block on the chain, // the confirmation details must be provided with the UpdateConfDetails method, // otherwise we will wait for the transaction to confirm even though it already // has. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) (bool, uint32, error) { +func (tcn *TxConfNotifier) Register( + ntfn *ConfNtfn) (*HistoricalConfDispatch, error) { + select { case <-tcn.quit: - return false, 0, ErrTxConfNotifierExiting + return nil, ErrTxConfNotifierExiting default: } // Before proceeding to register the notification, we'll query our // height hint cache to determine whether a better one exists. + startHeight := ntfn.HeightHint hint, err := tcn.hintCache.QueryConfirmHint(*ntfn.TxID) if err == nil { - if hint > ntfn.HeightHint { + if hint > startHeight { Log.Debugf("Using height hint %d retrieved "+ "from cache for %v", hint, *ntfn.TxID) - ntfn.HeightHint = hint + startHeight = hint } } @@ -254,9 +259,22 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) (bool, uint32, error) { return nil, nil } + // Construct the parameters for historical dispatch, scanning the range + // of blocks between our best known height hint and the notifier's + // current height. The notifier will begin also watching for + // confirmations at tip starting with the next block. + dispatch := &HistoricalConfDispatch{ + TxID: ntfn.TxID, + PkScript: ntfn.PkScript, + StartHeight: startHeight, + EndHeight: tcn.currentHeight, + } + // Set this confSet's status to pending, ensuring subsequent // registrations don't also attempt a dispatch. confSet.rescanStatus = rescanPending + + return dispatch, nil } // UpdateConfDetails attempts to update the confirmation details for an active From 217b1fc0ef68c636fe1d7ef8cce7ddc5f906e817 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 28 Sep 2018 16:36:47 -0700 Subject: [PATCH 13/27] chainntnfs/txconfnotifier: return HistoricalConfDispatch from Register --- chainntnfs/txconfnotifier.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index ca8d0b30..88a4a25d 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -207,6 +207,8 @@ func (tcn *TxConfNotifier) Register( // Before proceeding to register the notification, we'll query our // height hint cache to determine whether a better one exists. + // + // TODO(conner): verify that all submitted height hints are identical. startHeight := ntfn.HeightHint hint, err := tcn.hintCache.QueryConfirmHint(*ntfn.TxID) if err == nil { From 9ae6d439163979fd31594de3ced6ef4318af6e58 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 28 Sep 2018 16:30:13 -0700 Subject: [PATCH 14/27] chainntnfs/txconfnotifier: split out ntfn dispatch into helper --- chainntnfs/txconfnotifier.go | 153 ++++++++++++++++++++--------------- 1 file changed, 89 insertions(+), 64 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 88a4a25d..fef1a2b7 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -222,9 +222,6 @@ func (tcn *TxConfNotifier) Register( tcn.Lock() defer tcn.Unlock() - // TODO(conner): promote immediately to confNotifications if a - // historical dispatch has already completed. - confSet, ok := tcn.confNotifications[*ntfn.TxID] if !ok { confSet = newConfNtfnSet() @@ -238,6 +235,10 @@ func (tcn *TxConfNotifier) Register( // A prior rescan has already completed and we are actively watching at // tip for this txid. case rescanComplete: + // If conf details for this set of notifications has already + // been found, we'll attempt to deliver them immediately to this + // client. + tcn.dispatchConfDetails(ntfn, confSet.details) return nil, nil // A rescan is already in progress, return here to prevent dispatching @@ -327,72 +328,96 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, details.BlockHeight, txid, err) } - // Update the conf details of all ntfns that don't yet have them. + // Cache the details found in the rescan and attempt to dispatch any + // notifications that have not yet been delivered. + confSet.details = details for _, ntfn := range confSet.ntfns { - if ntfn.details != nil { - continue + err = tcn.dispatchConfDetails(ntfn, details) + if err != nil { + return err + } + } + + return nil +} + +// dispatchConfDetails attempts to cache and dispatch details to a particular +// client if the transaction has sufficiently confirmed. If the provided details +// are nil, this method will be a no-op. +func (tcn *TxConfNotifier) dispatchConfDetails( + ntfn *ConfNtfn, details *TxConfirmation) error { + + // If no details are provided, return early as we can't dispatch. + if details == nil { + return nil + } + + // Set the confirmation details for this notification, only if the + // notification doesn't already have details. This ensure we only fall + // through the following logic at most once, which could cause the + // buffered channels to block when exceeding their allocated capacity. + if ntfn.details != nil { + return nil + } + ntfn.details = details + + // Now, we'll examine whether the transaction of this + // notification request has reached its required number of + // confirmations. If it has, we'll dispatch a confirmation + // notification to the caller. + confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 + if confHeight <= tcn.currentHeight { + Log.Infof("Dispatching %v conf notification for %v", + ntfn.NumConfirmations, ntfn.TxID) + + // We'll send a 0 value to the Updates channel, + // indicating that the transaction has already been + // confirmed. + select { + case ntfn.Event.Updates <- 0: + case <-tcn.quit: + return ErrTxConfNotifierExiting } - ntfn.details = details - - // Now, we'll examine whether the transaction of this - // notification request has reached its required number of - // confirmations. If it has, we'll dispatch a confirmation - // notification to the caller. - confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 - if confHeight <= tcn.currentHeight { - Log.Infof("Dispatching %v conf notification for %v", - ntfn.NumConfirmations, ntfn.TxID) - - // We'll send a 0 value to the Updates channel, - // indicating that the transaction has already been - // confirmed. - select { - case ntfn.Event.Updates <- 0: - case <-tcn.quit: - return ErrTxConfNotifierExiting - } - - select { - case ntfn.Event.Confirmed <- details: - ntfn.dispatched = true - case <-tcn.quit: - return ErrTxConfNotifierExiting - } - } else { - // Otherwise, we'll keep track of the notification - // request by the height at which we should dispatch the - // confirmation notification. - ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] - if !exists { - ntfnSet = make(map[*ConfNtfn]struct{}) - tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet - } - ntfnSet[ntfn] = struct{}{} - - // We'll also send an update to the client of how many - // confirmations are left for the transaction to be - // confirmed. - numConfsLeft := confHeight - tcn.currentHeight - select { - case ntfn.Event.Updates <- numConfsLeft: - case <-tcn.quit: - return ErrTxConfNotifierExiting - } + select { + case ntfn.Event.Confirmed <- details: + ntfn.dispatched = true + case <-tcn.quit: + return ErrTxConfNotifierExiting } - - // As a final check, we'll also watch the transaction if it's - // still possible for it to get reorged out of the chain. - blockHeight := details.BlockHeight - reorgSafeHeight := blockHeight + tcn.reorgSafetyLimit - if reorgSafeHeight > tcn.currentHeight { - txSet, exists := tcn.txsByInitialHeight[blockHeight] - if !exists { - txSet = make(map[chainhash.Hash]struct{}) - tcn.txsByInitialHeight[blockHeight] = txSet - } - txSet[txid] = struct{}{} + } else { + // Otherwise, we'll keep track of the notification + // request by the height at which we should dispatch the + // confirmation notification. + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + ntfnSet = make(map[*ConfNtfn]struct{}) + tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet } + ntfnSet[ntfn] = struct{}{} + + // We'll also send an update to the client of how many + // confirmations are left for the transaction to be + // confirmed. + numConfsLeft := confHeight - tcn.currentHeight + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + } + + // As a final check, we'll also watch the transaction if it's + // still possible for it to get reorged out of the chain. + blockHeight := details.BlockHeight + reorgSafeHeight := blockHeight + tcn.reorgSafetyLimit + if reorgSafeHeight > tcn.currentHeight { + txSet, exists := tcn.txsByInitialHeight[blockHeight] + if !exists { + txSet = make(map[chainhash.Hash]struct{}) + tcn.txsByInitialHeight[blockHeight] = txSet + } + txSet[*ntfn.TxID] = struct{}{} } return nil From 32e7368e1ec757eaf345de0f3f950a91d15309a7 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 28 Sep 2018 16:38:08 -0700 Subject: [PATCH 15/27] chainntnfs/txconfnotifier: remove clientID from UpdateConfDetails signature --- chainntnfs/txconfnotifier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index fef1a2b7..e2446a45 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -287,7 +287,7 @@ func (tcn *TxConfNotifier) Register( // NOTE: The notification should be registered first to ensure notifications are // dispatched correctly. func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, - clientID uint64, details *TxConfirmation) error { + details *TxConfirmation) error { select { case <-tcn.quit: From 11c231d8145e955f5627a6210f97a7d6814e1a47 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 15 Oct 2018 15:23:07 -0700 Subject: [PATCH 16/27] chainntnfs/txconfnotifier_test: remove clientID argument... to UpdateConfDetails --- chainntnfs/txconfnotifier_test.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index dea56239..04f13ef5 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -320,7 +320,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { BlockHeight: 9, TxIndex: 1, } - err := tcn.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1) + err := tcn.UpdateConfDetails(tx1Hash, &txConf1) if err != nil { t.Fatalf("unable to update conf details: %v", err) } @@ -353,7 +353,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { BlockHeight: 9, TxIndex: 2, } - err = tcn.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2) + err = tcn.UpdateConfDetails(tx2Hash, &txConf2) if err != nil { t.Fatalf("unable to update conf details: %v", err) } @@ -452,7 +452,7 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("unable to register ntfn: %v", err) } - if err := tcn.UpdateConfDetails(*ntfn1.TxID, 0, nil); err != nil { + if err := tcn.UpdateConfDetails(*ntfn1.TxID, nil); err != nil { t.Fatalf("unable to deliver conf details: %v", err) } @@ -463,12 +463,11 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if _, _, err := tcn.Register(&ntfn2); err != nil { if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } - if err := tcn.UpdateConfDetails(*ntfn2.TxID, 0, nil); err != nil { + if err := tcn.UpdateConfDetails(*ntfn2.TxID, nil); err != nil { t.Fatalf("unable to deliver conf details: %v", err) } @@ -483,7 +482,7 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("unable to register ntfn: %v", err) } - if err := tcn.UpdateConfDetails(*ntfn3.TxID, 0, nil); err != nil { + if err := tcn.UpdateConfDetails(*ntfn3.TxID, nil); err != nil { t.Fatalf("unable to deliver conf details: %v", err) } @@ -782,10 +781,10 @@ func TestTxConfHeightHintCache(t *testing.T) { // Now, update the conf details reporting that the neither txn was found // in the historical dispatch. - if err := tcn.UpdateConfDetails(tx1Hash, 0, nil); err != nil { + if err := tcn.UpdateConfDetails(tx1Hash, nil); err != nil { t.Fatalf("unable to update conf details: %v", err) } - if err := tcn.UpdateConfDetails(tx2Hash, 0, nil); err != nil { + if err := tcn.UpdateConfDetails(tx2Hash, nil); err != nil { t.Fatalf("unable to update conf details: %v", err) } @@ -908,7 +907,7 @@ func TestTxConfTearDown(t *testing.T) { if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } - if err := tcn.UpdateConfDetails(*ntfn1.TxID, 0, nil); err != nil { + if err := tcn.UpdateConfDetails(*ntfn1.TxID, nil); err != nil { t.Fatalf("unable to update conf details: %v", err) } @@ -921,7 +920,7 @@ func TestTxConfTearDown(t *testing.T) { if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } - if err := tcn.UpdateConfDetails(*ntfn2.TxID, 0, nil); err != nil { + if err := tcn.UpdateConfDetails(*ntfn2.TxID, nil); err != nil { t.Fatalf("unable to update conf details: %v", err) } From a4c9f62c6b0512eaaa4fc0137b95e83568f74e13 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sun, 26 Aug 2018 21:36:17 -0700 Subject: [PATCH 17/27] chainntnfs/bitcoind: use HistoricalConfDispatch in ntfn registry --- chainntnfs/bitcoindnotify/bitcoind.go | 34 +++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index c592c3c8..32756903 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -254,13 +254,7 @@ out: } b.spendNotifications[op][msg.spendID] = msg - case *chainntnfs.ConfNtfn: - chainntnfs.Log.Infof("New confirmation "+ - "subscription: txid=%v, numconfs=%v", - msg.TxID, msg.NumConfirmations) - - currentHeight := uint32(b.bestBlock.Height) - + case *chainntnfs.HistoricalConfDispatch: // Look up whether the transaction is already // included in the active chain. We'll do this // in a goroutine to prevent blocking @@ -270,8 +264,7 @@ out: defer b.wg.Done() confDetails, _, err := b.historicalConfDetails( - msg.TxID, msg.HeightHint, - currentHeight, + msg.TxID, msg.StartHeight, msg.EndHeight, ) if err != nil { chainntnfs.Log.Error(err) @@ -286,8 +279,7 @@ out: // cache at tip, since any pending // rescans have now completed. err = b.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, + *msg.TxID, confDetails, ) if err != nil { chainntnfs.Log.Error(err) @@ -453,7 +445,7 @@ func (b *BitcoindNotifier) handleRelevantTx(tx chain.RelevantTx, bestHeight int3 // historicalConfDetails looks up whether a transaction is already included in a // block in the active chain and, if so, returns details about the confirmation. func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash, - heightHint, currentHeight uint32) (*chainntnfs.TxConfirmation, + startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, chainntnfs.TxConfStatus, error) { // We'll first attempt to retrieve the transaction using the node's @@ -469,7 +461,7 @@ func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash, case err != nil: chainntnfs.Log.Debugf("Failed getting conf details from "+ "index (%v), scanning manually", err) - return b.confDetailsManually(txid, heightHint, currentHeight) + return b.confDetailsManually(txid, startHeight, endHeight) // The transaction was found within the node's mempool. case txStatus == chainntnfs.TxFoundMempool: @@ -964,12 +956,24 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, HeightHint: heightHint, } - if err := b.txConfNotifier.Register(ntfn); err != nil { + chainntnfs.Log.Infof("New confirmation subscription: "+ + "txid=%v, numconfs=%v", txid, numConfs) + + // Register the conf notification with txconfnotifier. A non-nil value + // for `dispatch` will be returned if we are required to perform a + // manual scan for the confirmation. Otherwise the notifier will begin + // watching at tip for the transaction to confirm. + dispatch, err := b.txConfNotifier.Register(ntfn) + if err != nil { return nil, err } + if dispatch == nil { + return ntfn.Event, nil + } + select { - case b.notificationRegistry <- ntfn: + case b.notificationRegistry <- dispatch: return ntfn.Event, nil case <-b.quit: return nil, ErrChainNotifierShuttingDown From 6cd0f867ad3632faf7fbb79ed9fae78e024c577b Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sun, 26 Aug 2018 21:36:45 -0700 Subject: [PATCH 18/27] chainntnfs/btcd: use HistoricalConfDispatch in ntfn registry --- chainntnfs/btcdnotify/btcd.go | 40 +++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 29d020d0..5a60a83f 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -324,13 +324,7 @@ out: } b.spendNotifications[op][msg.spendID] = msg - case *chainntnfs.ConfNtfn: - chainntnfs.Log.Infof("New confirmation "+ - "subscription: txid=%v, numconfs=%v", - msg.TxID, msg.NumConfirmations) - - bestHeight := uint32(b.bestBlock.Height) - + case *chainntnfs.HistoricalConfDispatch: // Look up whether the transaction is already // included in the active chain. We'll do this // in a goroutine to prevent blocking @@ -340,8 +334,7 @@ out: defer b.wg.Done() confDetails, _, err := b.historicalConfDetails( - msg.TxID, msg.HeightHint, - bestHeight, + msg.TxID, msg.StartHeight, msg.EndHeight, ) if err != nil { chainntnfs.Log.Error(err) @@ -356,8 +349,7 @@ out: // cache at tip, since any pending // rescans have now completed. err = b.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, + *msg.TxID, confDetails, ) if err != nil { chainntnfs.Log.Error(err) @@ -523,7 +515,7 @@ out: // historicalConfDetails looks up whether a transaction is already included in a // block in the active chain and, if so, returns details about the confirmation. func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, - heightHint, currentHeight uint32) (*chainntnfs.TxConfirmation, + startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, chainntnfs.TxConfStatus, error) { // We'll first attempt to retrieve the transaction using the node's @@ -539,7 +531,7 @@ func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, case err != nil: chainntnfs.Log.Debugf("Failed getting conf details from "+ "index (%v), scanning manually", err) - return b.confDetailsManually(txid, heightHint, currentHeight) + return b.confDetailsManually(txid, startHeight, endHeight) // The transaction was found within the node's mempool. case txStatus == chainntnfs.TxFoundMempool: @@ -638,15 +630,15 @@ func (b *BtcdNotifier) confDetailsFromTxIndex(txid *chainhash.Hash, // earliest height the transaction could have been included in, to the current // height in the chain. If the transaction is found, its confirmation details // are returned. Otherwise, nil is returned. -func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, heightHint, - currentHeight uint32) (*chainntnfs.TxConfirmation, +func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, startHeight, + endHeight uint32) (*chainntnfs.TxConfirmation, chainntnfs.TxConfStatus, error) { targetTxidStr := txid.String() // Begin scanning blocks at every height to determine where the // transaction was included in. - for height := heightHint; height <= currentHeight; height++ { + for height := startHeight; height <= endHeight; height++ { // Ensure we haven't been requested to shut down before // processing the next height. select { @@ -1024,12 +1016,24 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, HeightHint: heightHint, } - if err := b.txConfNotifier.Register(ntfn); err != nil { + chainntnfs.Log.Infof("New confirmation subscription: "+ + "txid=%v, numconfs=%v", txid, numConfs) + + // Register the conf notification with txconfnotifier. A non-nil value + // for `dispatch` will be returned if we are required to perform a + // manual scan for the confirmation. Otherwise the notifier will begin + // watching at tip for the transaction to confirm. + dispatch, err := b.txConfNotifier.Register(ntfn) + if err != nil { return nil, err } + if dispatch == nil { + return ntfn.Event, nil + } + select { - case b.notificationRegistry <- ntfn: + case b.notificationRegistry <- dispatch: return ntfn.Event, nil case <-b.quit: return nil, ErrChainNotifierShuttingDown From df9bb560687c1755295e158d920def2d5e1510c8 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sun, 26 Aug 2018 21:37:10 -0700 Subject: [PATCH 19/27] chainntnfs/neutrino: use HistoricalConfDispatch in ntfn registry --- chainntnfs/neutrinonotify/neutrino.go | 72 +++++++++++---------------- 1 file changed, 30 insertions(+), 42 deletions(-) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 1d3741c1..3c888af6 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -314,19 +314,7 @@ out: } n.spendNotifications[op][msg.spendID] = msg - case *confirmationsNotification: - chainntnfs.Log.Infof("New confirmations subscription: "+ - "txid=%v, numconfs=%v, height_hint=%v", - msg.TxID, msg.NumConfirmations, - msg.ConfNtfn.HeightHint) - - // If the notification can be partially or - // fully dispatched, then we can skip the first - // phase for ntfns. - n.heightMtx.RLock() - currentHeight := n.bestHeight - n.heightMtx.RUnlock() - + case *chainntnfs.HistoricalConfDispatch: // Look up whether the transaction is already // included in the active chain. We'll do this // in a goroutine to prevent blocking @@ -336,8 +324,8 @@ out: defer n.wg.Done() confDetails, err := n.historicalConfDetails( - msg.TxID, msg.pkScript, currentHeight, - msg.ConfNtfn.HeightHint, + msg.TxID, msg.PkScript, + msg.StartHeight, msg.EndHeight, ) if err != nil { chainntnfs.Log.Error(err) @@ -349,7 +337,7 @@ out: // the script is found in a block. params := n.p2pNode.ChainParams() _, addrs, _, err := txscript.ExtractPkScriptAddrs( - msg.pkScript, ¶ms, + msg.PkScript, ¶ms, ) if err != nil { chainntnfs.Log.Error(err) @@ -363,8 +351,7 @@ out: // cache at tip, since any pending // rescans have now completed. err = n.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, + *msg.TxID, confDetails, ) if err != nil { chainntnfs.Log.Error(err) @@ -380,16 +367,14 @@ out: // future initial confirmation. rescanUpdate := []neutrino.UpdateOption{ neutrino.AddAddrs(addrs...), - neutrino.Rewind(currentHeight), + neutrino.Rewind(msg.EndHeight), neutrino.DisableDisconnectedNtfns(true), } err = n.chainView.Update(rescanUpdate...) if err != nil { - chainntnfs.Log.Errorf("Unable "+ - "to update rescan: %v", + chainntnfs.Log.Errorf("Unable to update rescan: %v", err) } - }() case *blockEpochRegistration: @@ -526,11 +511,11 @@ out: // confirmation. func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, pkScript []byte, - currentHeight, heightHint uint32) (*chainntnfs.TxConfirmation, error) { + startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, error) { // Starting from the height hint, we'll walk forwards in the chain to // see if this transaction has already been confirmed. - for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { + for scanHeight := startHeight; scanHeight <= endHeight; scanHeight++ { // Ensure we haven't been requested to shut down before // processing the next height. select { @@ -922,13 +907,6 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return spendEvent, nil } -// confirmationNotification represents a client's intent to receive a -// notification once the target txid reaches numConfirmations confirmations. -type confirmationsNotification struct { - chainntnfs.ConfNtfn - pkScript []byte -} - // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier // which will be triggered once the txid reaches numConfs number of // confirmations. @@ -938,23 +916,33 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, // Construct a notification request for the transaction and send it to // the main event loop. - ntfn := &confirmationsNotification{ - ConfNtfn: chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&n.confClientCounter, 1), - TxID: txid, - NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - HeightHint: heightHint, - }, - pkScript: pkScript, + ntfn := &chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&n.confClientCounter, 1), + TxID: txid, + PkScript: pkScript, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(numConfs), + HeightHint: heightHint, } - if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + chainntnfs.Log.Infof("New confirmation subscription: "+ + "txid=%v, numconfs=%v", txid, numConfs) + + // Register the conf notification with txconfnotifier. A non-nil value + // for `dispatch` will be returned if we are required to perform a + // manual scan for the confirmation. Otherwise the notifier will begin + // watching at tip for the transaction to confirm. + dispatch, err := n.txConfNotifier.Register(ntfn) + if err != nil { return nil, err } + if dispatch == nil { + return ntfn.Event, nil + } + select { - case n.notificationRegistry <- ntfn: + case n.notificationRegistry <- dispatch: return ntfn.Event, nil case <-n.quit: return nil, ErrChainNotifierShuttingDown From 2dcb86bced5f3826134eba5ba83d3bc67d9b5ce3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sat, 29 Sep 2018 13:53:24 -0700 Subject: [PATCH 20/27] chainntnfs/txconfnotifier: set confset details at tip This commit ensures that a confSet's details are assigned in the confNotifications index after discovering the transaction at tip. The recent changes allow a later notification to be dispatched on registration if an earlier one has already discovered the confirmation details. Before this change, it was observed that a later registration would attempt an immediate delivery, but fail to do so because the confset's details were nil. This commit remedies that dispatch path, allowing the integration tests to pass again. --- chainntnfs/txconfnotifier.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index e2446a45..11f09650 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -455,17 +455,26 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // handled correctly. for _, tx := range txns { txHash := tx.Hash() + + // Check if we have any pending notifications for this txid. If + // none are found, we can proceed to the next transaction. confSet, ok := tcn.confNotifications[*txHash] if !ok { continue } + Log.Debugf("Block contains txid=%v, constructing details", + txHash) + + details := &TxConfirmation{ + BlockHash: blockHash, + BlockHeight: blockHeight, + TxIndex: uint32(tx.Index()), + } + + confSet.details = details for _, ntfn := range confSet.ntfns { - ntfn.details = &TxConfirmation{ - BlockHash: blockHash, - BlockHeight: blockHeight, - TxIndex: uint32(tx.Index()), - } + ntfn.details = details confHeight := blockHeight + ntfn.NumConfirmations - 1 ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] From eee531155757347c25e5749a39748688ba3f76a9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sat, 29 Sep 2018 13:55:30 -0700 Subject: [PATCH 21/27] chainntnfs/txconnotifier: add debug logs, log errs/warnings, godocs --- chainntnfs/txconfnotifier.go | 49 ++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 11f09650..bf5ffe79 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -217,6 +217,9 @@ func (tcn *TxConfNotifier) Register( "from cache for %v", hint, *ntfn.TxID) startHeight = hint } + } else if err != ErrConfirmHintNotFound { + Log.Errorf("Unable to query confirm hint for %v: %v", + *ntfn.TxID, err) } tcn.Lock() @@ -224,6 +227,8 @@ func (tcn *TxConfNotifier) Register( confSet, ok := tcn.confNotifications[*ntfn.TxID] if !ok { + // If this is the first registration for this txid, construct a + // confSet to coalesce all notifications for the same txid. confSet = newConfNtfnSet() tcn.confNotifications[*ntfn.TxID] = confSet } @@ -238,13 +243,17 @@ func (tcn *TxConfNotifier) Register( // If conf details for this set of notifications has already // been found, we'll attempt to deliver them immediately to this // client. - tcn.dispatchConfDetails(ntfn, confSet.details) - return nil, nil + Log.Debugf("Attempting to dispatch conf for txid=%v "+ + "on registration since rescan has finished", ntfn.TxID) + return nil, tcn.dispatchConfDetails(ntfn, confSet.details) // A rescan is already in progress, return here to prevent dispatching // another. When the scan returns, this notifications details will be // updated as well. case rescanPending: + Log.Debugf("Waiting for pending rescan to finish before "+ + "notifying txid=%v at tip", ntfn.TxID) + return nil, nil // If no rescan has been dispatched, attempt to do so now. @@ -255,6 +264,8 @@ func (tcn *TxConfNotifier) Register( // is to be confirmed at a height greater than the conf notifier's // current height, we'll refrain from spawning a historical dispatch. if startHeight > tcn.currentHeight { + Log.Debugf("Height hint is above current height, not dispatching "+ + "historical rescan for txid=%v ", ntfn.TxID) // Set the rescan status to complete, which will allow the conf // notifier to start delivering messages for this set // immediately. @@ -262,6 +273,8 @@ func (tcn *TxConfNotifier) Register( return nil, nil } + Log.Debugf("Dispatching historical rescan for txid=%v ", ntfn.TxID) + // Construct the parameters for historical dispatch, scanning the range // of blocks between our best known height hint and the notifier's // current height. The notifier will begin also watching for @@ -316,10 +329,20 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, // The notifier has yet to reach the height at which the transaction was // included in a block, so we should defer until handling it then within // ConnectTip. - if details == nil || details.BlockHeight > tcn.currentHeight { + if details == nil { + Log.Debugf("Conf details for txid=%v not found during "+ + "historical dispatch, waiting to dispatch at tip", txid) return nil } + if details.BlockHeight > tcn.currentHeight { + Log.Debugf("Conf details for txid=%v found above current "+ + "height, waiting to dispatch at tip", txid) + return nil + } + + Log.Debugf("Updating conf details for txid=%v details", txid) + err := tcn.hintCache.CommitConfirmHint(details.BlockHeight, txid) if err != nil { // The error is not fatal, so we should not return an error to @@ -349,6 +372,8 @@ func (tcn *TxConfNotifier) dispatchConfDetails( // If no details are provided, return early as we can't dispatch. if details == nil { + Log.Debugf("Unable to dispatch %v, no details provided", + ntfn.TxID) return nil } @@ -386,6 +411,9 @@ func (tcn *TxConfNotifier) dispatchConfDetails( return ErrTxConfNotifierExiting } } else { + Log.Debugf("Queueing %v conf notification for %v at tip ", + ntfn.NumConfirmations, ntfn.TxID) + // Otherwise, we'll keep track of the notification // request by the height at which we should dispatch the // confirmation notification. @@ -498,7 +526,7 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // transactions along with the ones that confirmed at the current // height. To do so, we'll iterate over the confNotifications map, which // contains the transactions we currently have notifications for. Since - // this map doesn't tell us whether the transaction hsa confirmed or + // this map doesn't tell us whether the transaction has confirmed or // not, we'll need to look at txsByInitialHeight to determine so. var txsToUpdateHints []chainhash.Hash for confirmedTx := range tcn.txsByInitialHeight[tcn.currentHeight] { @@ -506,6 +534,9 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } out: for maybeUnconfirmedTx, confSet := range tcn.confNotifications { + // We shouldn't update the confirm hints if we still have a + // pending rescan in progress. We'll skip writing any for + // notification sets that haven't reached rescanComplete. if confSet.rescanStatus != rescanComplete { continue } @@ -546,15 +577,7 @@ out: for txHash := range txHashes { confSet := tcn.confNotifications[txHash] for _, ntfn := range confSet.ntfns { - // If the notification hasn't learned about the - // confirmation of its transaction yet (in the - // case of historical confirmations), we'll skip - // it. - if ntfn.details == nil { - continue - } - - txConfHeight := ntfn.details.BlockHeight + + txConfHeight := confSet.details.BlockHeight + ntfn.NumConfirmations - 1 numConfsLeft := txConfHeight - blockHeight From a4dee14b203e1f46ef4070412eae85d62a897fe8 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 3 Oct 2018 13:42:41 -0700 Subject: [PATCH 22/27] chainntnfs/tx_notifier: mark rescan as complete for transactions confirmed at tip In this commit, we mark the rescan status for a transaction as complete if we happen to detect it has confirmed within a new block that extends the chain. We do this as otherwise, it's possible for us to not immediately dispatch the notification upon a subsequent registration due to the rescan state machine. --- chainntnfs/txconfnotifier.go | 1 + 1 file changed, 1 insertion(+) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index bf5ffe79..b7b6b69a 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -500,6 +500,7 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, TxIndex: uint32(tx.Index()), } + confSet.rescanStatus = rescanComplete confSet.details = details for _, ntfn := range confSet.ntfns { ntfn.details = details From b28145b69ede75d562c3ab0ab4abfe7859322ca1 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 3 Oct 2018 13:43:44 -0700 Subject: [PATCH 23/27] chainntnfs/tx_notifier: consume reorg notification for transactions on block inclusion In this commit, we'll attempt to consume a reorg notification for a transaction that was previously reorged out of the chain upon block inclusion to ensure that it is not lingering due to a client not handling it the first time. --- chainntnfs/txconfnotifier.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index b7b6b69a..72f023e5 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -494,6 +494,9 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, Log.Debugf("Block contains txid=%v, constructing details", txHash) + // If we have any, we'll record its confirmed height so that + // notifications get dispatched when the transaction reaches the + // clients' desired number of confirmations. details := &TxConfirmation{ BlockHash: blockHash, BlockHeight: blockHeight, @@ -505,6 +508,18 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, for _, ntfn := range confSet.ntfns { ntfn.details = details + // In the event that this notification was aware that + // the transaction was reorged out of the chain, we'll + // consume the reorg notification if it hasn't been done + // yet already. + select { + case <-ntfn.Event.NegativeConf: + default: + } + + // We'll note this client's required number of + // confirmations so that we can notify them when + // expected. confHeight := blockHeight + ntfn.NumConfirmations - 1 ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { @@ -513,6 +528,9 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } ntfnSet[ntfn] = struct{}{} + // We'll also note the initial confirmation height in + // order to correctly handle dispatching notifications + // when the transaction gets reorged out of the chain. txSet, exists := tcn.txsByInitialHeight[blockHeight] if !exists { txSet = make(map[chainhash.Hash]struct{}) @@ -602,7 +620,7 @@ out: // Then, we'll dispatch notifications for all the transactions that have // become confirmed at this new block height. - for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { + for ntfn := range tcn.ntfnsByConfirmHeight[blockHeight] { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) From 5ae8243d0d95b4f60de2e2447cb2caa7820cd7e8 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 3 Oct 2018 13:47:26 -0700 Subject: [PATCH 24/27] chainntnfs/tx_notifier: remove cached conf details on reorg In this commit, we address a small bug where it's possible to deliver a confirmation notification with stale confirmation details upon registration. This can happen if a transaction has confirmed but was reorged out of the chain later on, and a subsequent notification is registered. --- chainntnfs/txconfnotifier.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 72f023e5..8d219622 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -687,7 +687,15 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // clients is always non-blocking. for initialHeight, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { + // If the transaction has been reorged out of the chain, + // we'll make sure to remove the cached confirmation + // details to prevent notifying clients with old + // information. confSet := tcn.confNotifications[txHash] + if initialHeight == blockHeight { + confSet.details = nil + } + for _, ntfn := range confSet.ntfns { // First, we'll attempt to drain an update // from each notification to ensure sends to the From 589dc96d88bfc560378c213d852d5b335b3abb0f Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 3 Oct 2018 13:51:05 -0700 Subject: [PATCH 25/27] chainntnfs/tx_notifier: extract conf reorg dispatch into method --- chainntnfs/txconfnotifier.go | 105 +++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 8d219622..c00b7e30 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -29,6 +29,9 @@ type ConfNtfn struct { // PkScript is the public key script of an outpoint created in this // transaction. + // + // NOTE: This value MUST be set when the dispatch is to be performed + // using compact filters. PkScript []byte // NumConfirmations is the number of confirmations after which the @@ -60,6 +63,9 @@ type HistoricalConfDispatch struct { // PkScript is a public key script from an output created by this // transaction. + // + // NOTE: This value MUST be set when the dispatch is to be performed + // using compact filters. PkScript []byte // StartHeight specifies the block height at which to being the @@ -709,57 +715,15 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // Then, we'll check if the current transaction // was included in the block currently being - // disconnected. If it was, we'll need to take - // some necessary precautions. + // disconnected. If it was, we'll need to + // dispatch a reorg notification to the client. if initialHeight == blockHeight { - // If the transaction's confirmation notification - // has already been dispatched, we'll attempt to - // notify the client it was reorged out of the chain. - if ntfn.dispatched { - // Attempt to drain the confirmation notification - // to ensure sends to the Confirmed channel are - // always non-blocking. - select { - case <-ntfn.Event.Confirmed: - case <-tcn.quit: - return ErrTxConfNotifierExiting - default: - } - - ntfn.dispatched = false - - // Send a negative confirmation notification to the - // client indicating how many blocks have been - // disconnected successively. - select { - case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): - case <-tcn.quit: - return ErrTxConfNotifierExiting - } - - continue + err := tcn.dispatchConfReorg( + ntfn, blockHeight, + ) + if err != nil { + return err } - - // Otherwise, since the transactions was reorged out - // of the chain, we can safely remove its accompanying - // confirmation notification. - confHeight := blockHeight + ntfn.NumConfirmations - 1 - ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] - if !exists { - continue - } - delete(ntfnSet, ntfn) - - // Intuitively, we should also remove - // the txHash from confNotifications if - // the ntfnSet is now empty. However, we - // will not do so since we may want to - // continue rewinding the height hints - // for this txid. - // - // NOTE(conner): safe to delete if - // blockHeight is below client-provided - // height hint? } } } @@ -772,6 +736,49 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { return nil } +// dispatchConfReorg dispatches a reorg notification to the client if the +// confirmation notification was already delivered. +// +// NOTE: This must be called with the TxNotifier's lock held. +func (tcn *TxConfNotifier) dispatchConfReorg( + ntfn *ConfNtfn, heightDisconnected uint32) error { + + // If the transaction's confirmation notification has yet to be + // dispatched, we'll need to clear its entry within the + // ntfnsByConfirmHeight index to prevent from notifiying the client once + // the notifier reaches the confirmation height. + if !ntfn.dispatched { + confHeight := heightDisconnected + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if exists { + delete(ntfnSet, ntfn) + } + return nil + } + + // Otherwise, the entry within the ntfnsByConfirmHeight has already been + // deleted, so we'll attempt to drain the confirmation notification to + // ensure sends to the Confirmed channel are always non-blocking. + select { + case <-ntfn.Event.Confirmed: + case <-tcn.quit: + return ErrTxConfNotifierExiting + default: + } + + ntfn.dispatched = false + + // Send a negative confirmation notification to the client indicating + // how many blocks have been disconnected successively. + select { + case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + + return nil +} + // TearDown is to be called when the owner of the TxConfNotifier is exiting. // This closes the event channels of all registered notifications that have // not been dispatched yet. From e03c818aa74db7a15ad27b95b665b28b4558af52 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 17 Oct 2018 18:10:08 -0700 Subject: [PATCH 26/27] chainntnfs/txconfnotifier: remove ntfn details, bound conf depth Removes details field from conf notifications, in favor of using the details on the confSet. We also bound the requested conf depth to the reorg saftey limit, as the behavior of state tracking within the notifier is undefined otherwise. --- chainntnfs/txconfnotifier.go | 38 ++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index c00b7e30..fb3998c4 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -13,6 +13,10 @@ var ( // ErrTxConfNotifierExiting is an error returned when attempting to // interact with the TxConfNotifier but it been shut down. ErrTxConfNotifierExiting = errors.New("TxConfNotifier is exiting") + + // ErrTxMaxConfs signals that the user requested a number of + // confirmations beyond the reorg safety limit. + ErrTxMaxConfs = errors.New("too many confirmations requested") ) // ConfNtfn represents a notifier client's request to receive a notification @@ -46,10 +50,6 @@ type ConfNtfn struct { // this txid. HeightHint uint32 - // details describes the transaction's position is the blockchain. May be - // nil for unconfirmed transactions. - details *TxConfirmation - // dispatched is false if the confirmed notification has not been sent yet. dispatched bool } @@ -211,6 +211,12 @@ func (tcn *TxConfNotifier) Register( default: } + // Enforce that we will not dispatch confirmations beyond the reorg + // safety limit. + if ntfn.NumConfirmations > tcn.reorgSafetyLimit { + return nil, ErrTxMaxConfs + } + // Before proceeding to register the notification, we'll query our // height hint cache to determine whether a better one exists. // @@ -259,7 +265,6 @@ func (tcn *TxConfNotifier) Register( case rescanPending: Log.Debugf("Waiting for pending rescan to finish before "+ "notifying txid=%v at tip", ntfn.TxID) - return nil, nil // If no rescan has been dispatched, attempt to do so now. @@ -326,6 +331,14 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, return fmt.Errorf("no notification found with TxID %v", txid) } + // If the conf details were already found at tip, all existing + // notifications will have been dispatched or queued for dispatch. We + // can exit early to avoid sending too many notifications on the + // buffered channels. + if confSet.details != nil { + return nil + } + // The historical dispatch has been completed for this confSet. We'll // update the rescan status and cache any details that were found. If // the details are nil, that implies we did not find them and will @@ -383,15 +396,6 @@ func (tcn *TxConfNotifier) dispatchConfDetails( return nil } - // Set the confirmation details for this notification, only if the - // notification doesn't already have details. This ensure we only fall - // through the following logic at most once, which could cause the - // buffered channels to block when exceeding their allocated capacity. - if ntfn.details != nil { - return nil - } - ntfn.details = details - // Now, we'll examine whether the transaction of this // notification request has reached its required number of // confirmations. If it has, we'll dispatch a confirmation @@ -512,8 +516,6 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, confSet.rescanStatus = rescanComplete confSet.details = details for _, ntfn := range confSet.ntfns { - ntfn.details = details - // In the event that this notification was aware that // the transaction was reorged out of the chain, we'll // consume the reorg notification if it hasn't been done @@ -627,11 +629,13 @@ out: // Then, we'll dispatch notifications for all the transactions that have // become confirmed at this new block height. for ntfn := range tcn.ntfnsByConfirmHeight[blockHeight] { + confSet := tcn.confNotifications[*ntfn.TxID] + Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) select { - case ntfn.Event.Confirmed <- ntfn.details: + case ntfn.Event.Confirmed <- confSet.details: ntfn.dispatched = true case <-tcn.quit: return ErrTxConfNotifierExiting From ba28ec3be0c7089abfd3f828d4d3db3802350033 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 18 Oct 2018 16:16:30 -0700 Subject: [PATCH 27/27] chainntnfs/neutrinonotify/neutrino: fix debug logs --- chainntnfs/neutrinonotify/neutrino.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 3c888af6..b4455442 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -465,13 +465,13 @@ out: n.heightMtx.Lock() if update.height != uint32(n.bestHeight) { - chainntnfs.Log.Infof("Missed disconnected" + + chainntnfs.Log.Infof("Missed disconnected " + "blocks, attempting to catch up") } hash, err := n.p2pNode.GetBlockHash(int64(n.bestHeight)) if err != nil { - chainntnfs.Log.Errorf("Unable to fetch block hash"+ + chainntnfs.Log.Errorf("Unable to fetch block hash "+ "for height %d: %v", n.bestHeight, err) n.heightMtx.Unlock() continue