From 71a81f59a99bbba5093c47e50fcbb10bff0eff1e Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:27 -0700 Subject: [PATCH 01/13] chainntnfs: allow clients to pass in best block Clients can optionally pass their best block known into RegisterBlockEpochNtfn. This enables the notifiers to catch up clients on blocks they may have missed. --- chainntnfs/bitcoindnotify/bitcoind.go | 13 +++++++++++-- chainntnfs/btcdnotify/btcd.go | 14 +++++++++++--- chainntnfs/interface.go | 7 ++++++- chainntnfs/neutrinonotify/neutrino.go | 17 +++++++++++++---- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index b3e5f7c7..cf367217 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -805,6 +805,10 @@ type blockEpochRegistration struct { epochQueue *chainntnfs.ConcurrentQueue + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + cancelChan chan struct{} wg sync.WaitGroup @@ -818,13 +822,18 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main -// chain. -func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. +func (b *BitcoindNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start() diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 2fb2f26c..5f45cf0f 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -16,7 +16,6 @@ import ( ) const ( - // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "btcd" @@ -857,6 +856,10 @@ type blockEpochRegistration struct { epochQueue *chainntnfs.ConcurrentQueue + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + cancelChan chan struct{} wg sync.WaitGroup @@ -870,13 +873,18 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main -// chain. -func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. 
+func (b *BtcdNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start() diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index ff90cc0a..8f09c640 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -59,7 +59,12 @@ type ChainNotifier interface { // new block connected to the tip of the main chain. The returned // BlockEpochEvent struct contains a channel which will be sent upon // for each new block discovered. - RegisterBlockEpochNtfn() (*BlockEpochEvent, error) + // + // Clients have the option of passing in their best known block. + // If they specify a block, the ChainNotifier checks whether the client + // is behind on blocks. If they are, the ChainNotifier sends a backlog + // of block notifications for the missed blocks. + RegisterBlockEpochNtfn(*BlockEpoch) (*BlockEpochEvent, error) // Start the ChainNotifier. Once started, the implementation should be // ready, and able to receive notification registrations from clients. diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index f612ef1c..badda36b 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -20,7 +20,6 @@ import ( ) const ( - // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "neutrino" @@ -781,6 +780,10 @@ type blockEpochRegistration struct { cancelChan chan struct{} + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + wg sync.WaitGroup } @@ -790,14 +793,20 @@ type epochCancel struct { epochID uint64 } -// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller -// to receive notifications, of each new block connected to the main chain. -func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the +// caller to receive notifications, of each new block connected to the main +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. 
+func (n *NeutrinoNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&n.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start() From 1ffc3bb82e0b8186346379c53f1259dfe75f94f0 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:27 -0700 Subject: [PATCH 02/13] multi: update to latest RegisterBlockEpochNtfn interface --- chainntnfs/interface_test.go | 6 +++--- contractcourt/chain_arbitrator.go | 4 ++-- contractcourt/chain_watcher_test.go | 3 ++- contractcourt/contract_resolvers.go | 4 ++-- discovery/gossiper.go | 2 +- discovery/gossiper_test.go | 3 ++- fundingmanager.go | 2 +- fundingmanager_test.go | 3 ++- htlcswitch/decayedlog.go | 2 +- htlcswitch/mock.go | 3 ++- htlcswitch/switch.go | 2 +- mock.go | 3 ++- utxonursery.go | 2 +- 13 files changed, 22 insertions(+), 17 deletions(-) diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index b279195c..d408a66a 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -526,7 +526,7 @@ func testBlockEpochNotification(miner *rpctest.Harness, // blocks we generate below. So we'll use a WaitGroup to synchronize the // test. for i := 0; i < numClients; i++ { - epochClient, err := notifier.RegisterBlockEpochNtfn() + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) if err != nil { t.Fatalf("unable to register for epoch notification") } @@ -898,7 +898,7 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, // We create an epoch client we can use to make sure the notifier is // caught up to the mining node's chain. - epochClient, err := notifier.RegisterBlockEpochNtfn() + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) if err != nil { t.Fatalf("unable to register for block epoch: %v", err) } @@ -1082,7 +1082,7 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie epochClients := make([]*chainntnfs.BlockEpochEvent, numClients) for i := 0; i < numClients; i++ { - epochClient, err := notifier.RegisterBlockEpochNtfn() + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) if err != nil { t.Fatalf("unable to register for epoch notification") } diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 97618d3a..1e728d4e 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -194,7 +194,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, // // TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest? // * reduces the number of goroutines - blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn() + blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return nil, err } @@ -384,7 +384,7 @@ func (c *ChainArbitrator) Start() error { // the chain any longer, only resolve the contracts on the confirmed // commitment. 
for _, closeChanInfo := range closingChannels { - blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn() + blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index ab0b8f39..a8d513f7 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -21,7 +21,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { return nil, nil } -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: make(chan *chainntnfs.BlockEpoch), Cancel: func() {}, diff --git a/contractcourt/contract_resolvers.go b/contractcourt/contract_resolvers.go index b525b5df..661ee9d0 100644 --- a/contractcourt/contract_resolvers.go +++ b/contractcourt/contract_resolvers.go @@ -855,7 +855,7 @@ func (h *htlcOutgoingContestResolver) Resolve() (ContractResolver, error) { // If we reach this point, then we can't fully act yet, so we'll await // either of our signals triggering: the HTLC expires, or we learn of // the preimage. - blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn() + blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return nil, err } @@ -1043,7 +1043,7 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) { // ensure the preimage can't be delivered between querying and // registering for the preimage subscription. preimageSubscription := h.PreimageDB.SubscribeUpdates() - blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn() + blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return nil, err } diff --git a/discovery/gossiper.go b/discovery/gossiper.go index f9166683..28e09f25 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -408,7 +408,7 @@ func (d *AuthenticatedGossiper) Start() error { // First we register for new notifications of newly discovered blocks. // We do this immediately so we'll later be able to consume any/all // blocks which were discovered. 
- blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn() + blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 44edf187..dc0ad4f7 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -280,7 +280,8 @@ func (m *mockNotifier) notifyBlock(hash chainhash.Hash, height uint32) { } } -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { m.RLock() defer m.RUnlock() diff --git a/fundingmanager.go b/fundingmanager.go index a1180750..3fcce0a3 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1705,7 +1705,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel, confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) { - epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn() + epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { fndgLog.Errorf("unable to register for epoch notification: %v", err) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 6038aaff..224e1e6e 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -112,7 +112,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, }, nil } -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: m.epochChan, Cancel: func() {}, diff --git a/htlcswitch/decayedlog.go b/htlcswitch/decayedlog.go index 93eb0e51..d6845002 100644 --- a/htlcswitch/decayedlog.go +++ b/htlcswitch/decayedlog.go @@ -103,7 +103,7 @@ func (d *DecayedLog) Start() error { // Start garbage collector. 
if d.notifier != nil { - epochClient, err := d.notifier.RegisterBlockEpochNtfn() + epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil) if err != nil { return fmt.Errorf("Unable to register for epoch "+ "notifications: %v", err) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 00795c51..64f843fc 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -790,7 +790,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs uint32, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { return nil, nil } -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: m.epochChan, Cancel: func() {}, diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index e2ee14b0..8f19cfb3 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1596,7 +1596,7 @@ func (s *Switch) Start() error { log.Infof("Starting HTLC Switch") - blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn() + blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } diff --git a/mock.go b/mock.go index f41bf532..b2d4d89a 100644 --- a/mock.go +++ b/mock.go @@ -92,7 +92,8 @@ func (m *mockNotfier) RegisterConfirmationsNtfn(txid *chainhash.Hash, Confirmed: m.confChannel, }, nil } -func (m *mockNotfier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotfier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: make(chan *chainntnfs.BlockEpoch), Cancel: func() {}, diff --git a/utxonursery.go b/utxonursery.go index eba9de75..f3bf362e 100644 --- a/utxonursery.go +++ b/utxonursery.go @@ -257,7 +257,7 @@ func (u *utxoNursery) Start() error { // connected block. We register immediately on startup to ensure that // no blocks are missed while we are handling blocks that were missed // during the time the UTXO nursery was unavailable. 
- newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn() + newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } From d4cf271526e8f09428d314b00b5c16467382c957 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:28 -0700 Subject: [PATCH 03/13] chainntnfs: track best block in btcd and bitcoind --- chainntnfs/bitcoindnotify/bitcoind.go | 25 +++++++++++++++---------- chainntnfs/btcdnotify/btcd.go | 23 +++++++++++++---------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index cf367217..4ebd4650 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -74,6 +74,8 @@ type BitcoindNotifier struct { blockEpochClients map[uint64]*blockEpochRegistration + bestBlock chainntnfs.BlockEpoch + wg sync.WaitGroup quit chan struct{} } @@ -119,7 +121,7 @@ func (b *BitcoindNotifier) Start() error { return err } - _, currentHeight, err := b.chainConn.GetBestBlock() + currentHash, currentHeight, err := b.chainConn.GetBestBlock() if err != nil { return err } @@ -127,8 +129,13 @@ func (b *BitcoindNotifier) Start() error { b.txConfNotifier = chainntnfs.NewTxConfNotifier( uint32(currentHeight), reorgSafetyLimit) + b.bestBlock = chainntnfs.BlockEpoch{ + Height: currentHeight, + Hash: currentHash, + } + b.wg.Add(1) - go b.notificationDispatcher(currentHeight) + go b.notificationDispatcher() return nil } @@ -174,7 +181,7 @@ type blockNtfn struct { // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. -func (b *BitcoindNotifier) notificationDispatcher(bestHeight int32) { +func (b *BitcoindNotifier) notificationDispatcher() { out: for { select { @@ -235,7 +242,7 @@ out: "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - currentHeight := uint32(bestHeight) + currentHeight := uint32(b.bestBlock.Height) // Look up whether the transaction is already // included in the active chain. 
We'll do this @@ -270,19 +277,18 @@ out: b.blockEpochClients[msg.epochID] = msg case chain.RelevantTx: - b.handleRelevantTx(msg, bestHeight) + b.handleRelevantTx(msg, b.bestBlock.Height) } case ntfn := <-b.chainConn.Notifications(): switch item := ntfn.(type) { case chain.BlockConnected: - if item.Height != bestHeight+1 { + if item.Height != b.bestBlock.Height+1 { chainntnfs.Log.Warnf("Received blocks out of order: "+ "current height=%d, new height=%d", bestHeight, item.Height) continue } - bestHeight = item.Height rawBlock, err := b.chainConn.GetBlock(&item.Hash) if err != nil { @@ -304,14 +310,13 @@ out: continue case chain.BlockDisconnected: - if item.Height != bestHeight { + if item.Height != b.bestBlock.Height { chainntnfs.Log.Warnf("Received blocks "+ "out of order: current height="+ "%d, disconnected height=%d", bestHeight, item.Height) continue } - bestHeight = item.Height - 1 chainntnfs.Log.Infof("Block disconnected from "+ "main chain: height=%v, sha=%v", @@ -324,7 +329,7 @@ out: } case chain.RelevantTx: - b.handleRelevantTx(item, bestHeight) + b.handleRelevantTx(item, b.bestBlock.Height) } case <-b.quit: diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 5f45cf0f..85155e61 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -78,6 +78,8 @@ type BtcdNotifier struct { blockEpochClients map[uint64]*blockEpochRegistration + bestBlock chainntnfs.BlockEpoch + chainUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue @@ -142,7 +144,7 @@ func (b *BtcdNotifier) Start() error { return err } - _, currentHeight, err := b.chainConn.GetBestBlock() + currentHash, currentHeight, err := b.chainConn.GetBestBlock() if err != nil { return err } @@ -150,11 +152,16 @@ func (b *BtcdNotifier) Start() error { b.txConfNotifier = chainntnfs.NewTxConfNotifier( uint32(currentHeight), reorgSafetyLimit) + b.bestBlock = chainntnfs.BlockEpoch{ + Height: currentHeight, + Hash: currentHash, + } + b.chainUpdates.Start() b.txUpdates.Start() b.wg.Add(1) - go b.notificationDispatcher(currentHeight) + go b.notificationDispatcher() return nil } @@ -244,7 +251,7 @@ func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetai // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. -func (b *BtcdNotifier) notificationDispatcher(currentHeight int32) { +func (b *BtcdNotifier) notificationDispatcher() { out: for { select { @@ -304,7 +311,7 @@ out: "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - bestHeight := uint32(currentHeight) + bestHeight := uint32(b.bestBlock.Height) // Look up whether the transaction is already // included in the active chain. 
We'll do this @@ -342,15 +349,13 @@ out: case item := <-b.chainUpdates.ChanOut(): update := item.(*chainUpdate) if update.connect { - if update.blockHeight != currentHeight+1 { + if update.blockHeight != b.bestBlock.Height+1 { chainntnfs.Log.Warnf("Received blocks out of order: "+ "current height=%d, new height=%d", currentHeight, update.blockHeight) continue } - currentHeight = update.blockHeight - rawBlock, err := b.chainConn.GetBlock(update.blockHash) if err != nil { chainntnfs.Log.Errorf("Unable to get block: %v", err) @@ -374,15 +379,13 @@ out: continue } - if update.blockHeight != currentHeight { + if update.blockHeight != b.bestBlock.Height { chainntnfs.Log.Warnf("Received blocks out of order: "+ "current height=%d, disconnected height=%d", currentHeight, update.blockHeight) continue } - currentHeight = update.blockHeight - 1 - chainntnfs.Log.Infof("Block disconnected from main chain: "+ "height=%v, sha=%v", update.blockHeight, update.blockHash) From f4005175d8d1ceb8967bbe031a1940ff1330d27a Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:28 -0700 Subject: [PATCH 04/13] chainntnfs/interface: add ChainConn interface This allows notifiers to pass their chain backend into interface functions to retrieve information from the chain. --- chainntnfs/interface.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index 8f09c640..356edc14 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" ) @@ -253,3 +254,17 @@ func SupportedNotifiers() []string { return supportedNotifiers } + +// ChainConn enables notifiers to pass in their chain backend to interface +// functions that require it. +type ChainConn interface { + // GetBlockHeader returns the block header for a hash. + GetBlockHeader(blockHash *chainhash.Hash) (*wire.BlockHeader, error) + + // GetBlockHeaderVerbose returns the verbose block header for a hash. + GetBlockHeaderVerbose(blockHash *chainhash.Hash) ( + *btcjson.GetBlockHeaderVerboseResult, error) + + // GetBlockHash returns the hash from a block height. + GetBlockHash(blockHeight int64) (*chainhash.Hash, error) +} From 02ee5650c871e5ad05d42e91bfe7e026cc1b292a Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:28 -0700 Subject: [PATCH 05/13] chainntnfs/neutrino: add neutrino ChainConn implementation --- chainntnfs/neutrinonotify/neutrino.go | 43 +++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index badda36b..da330dce 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/txscript" @@ -61,6 +62,8 @@ type NeutrinoNotifier struct { p2pNode *neutrino.ChainService chainView *neutrino.Rescan + chainConn *NeutrinoChainConn + notificationCancels chan interface{} notificationRegistry chan interface{} @@ -150,6 +153,8 @@ func (n *NeutrinoNotifier) Start() error { bestHeight, reorgSafetyLimit, ) + n.chainConn = &NeutrinoChainConn{n.p2pNode} + // Finally, we'll create our rescan struct, start it, and launch all // the goroutines we need to operate this ChainNotifier instance. 
n.chainView = n.p2pNode.NewRescan(rescanOptions...) @@ -877,3 +882,41 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn( }, nil } } + +// NeutrinoChainConn is a wrapper around neutrino's chain backend in order +// to satisfy the chainntnfs.ChainConn interface. +type NeutrinoChainConn struct { + p2pNode *neutrino.ChainService +} + +// GetBlockHeader returns the block header for a hash. +func (n *NeutrinoChainConn) GetBlockHeader(blockHash *chainhash.Hash) (*wire.BlockHeader, error) { + header, _, err := n.p2pNode.BlockHeaders.FetchHeader(blockHash) + if err != nil { + return nil, err + } + return header, nil +} + +// GetBlockHeaderVerbose returns a verbose block header result for a hash. This +// result only contains the height with a nil hash. +func (n *NeutrinoChainConn) GetBlockHeaderVerbose(blockHash *chainhash.Hash) ( + *btcjson.GetBlockHeaderVerboseResult, error) { + + _, height, err := n.p2pNode.BlockHeaders.FetchHeader(blockHash) + if err != nil { + return nil, err + } + // Since only the height is used from the result, leave the hash nil. + return &btcjson.GetBlockHeaderVerboseResult{Height: int32(height)}, nil +} + +// GetBlockHash returns the hash from a block height. +func (n *NeutrinoChainConn) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) { + header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(uint32(blockHeight)) + if err != nil { + return nil, err + } + hash := header.BlockHash() + return &hash, nil +} From a5e1cf9c973a2cea79a48ddaeb3722485883af6a Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:29 -0700 Subject: [PATCH 06/13] chainntnfs: dispatch historical block ntfns to clients If a client passes in their best known block when registering for block notifications, check to see if it's behind our best block. If so, dispatch the missed block notifications to the client. This is necessary because clients that persist their best known block can miss new blocks while registering for notifications. --- chainntnfs/bitcoindnotify/bitcoind.go | 43 ++++++++--- chainntnfs/btcdnotify/btcd.go | 44 +++++++++--- chainntnfs/interface.go | 100 ++++++++++++++++++++++++++ chainntnfs/neutrinonotify/neutrino.go | 46 +++++++++--- 4 files changed, 200 insertions(+), 33 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 4ebd4650..347b12c1 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -275,6 +275,22 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + b.chainConn, msg.bestBlock, + b.bestBlock.Height, true, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + b.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + } + msg.errorChan <- nil case chain.RelevantTx: b.handleRelevantTx(msg, b.bestBlock.Height) @@ -525,20 +541,25 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash, // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range b.blockEpochClients { + b.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. 
+func (b *BitcoindNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range b.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-b.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-b.quit: } } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 85155e61..0103b421 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -344,6 +344,23 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + b.chainConn, msg.bestBlock, + b.bestBlock.Height, true, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + b.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + + } + msg.errorChan <- nil } case item := <-b.chainUpdates.ChanOut(): @@ -652,20 +669,25 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range b.blockEpochClients { + b.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. +func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range b.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-b.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-b.quit: } } diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index 356edc14..8ec223b8 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -268,3 +268,103 @@ type ChainConn interface { // GetBlockHash returns the hash from a block height. 
GetBlockHash(blockHeight int64) (*chainhash.Hash, error) } + +// GetCommonBlockAncestorHeight takes in: +// (1) the hash of a block that has been reorged out of the main chain +// (2) the hash of the block of the same height from the main chain +// It returns the height of the nearest common ancestor between the two hashes, +// or an error +func GetCommonBlockAncestorHeight(chainConn ChainConn, reorgHash, + chainHash chainhash.Hash) (int32, error) { + + for reorgHash != chainHash { + reorgHeader, err := chainConn.GetBlockHeader(&reorgHash) + if err != nil { + return 0, fmt.Errorf("unable to get header for hash=%v: %v", + reorgHash, err) + } + chainHeader, err := chainConn.GetBlockHeader(&chainHash) + if err != nil { + return 0, fmt.Errorf("unable to get header for hash=%v: %v", + chainHash, err) + } + reorgHash = reorgHeader.PrevBlock + chainHash = chainHeader.PrevBlock + } + + verboseHeader, err := chainConn.GetBlockHeaderVerbose(&chainHash) + if err != nil { + return 0, fmt.Errorf("unable to get verbose header for hash=%v: %v", + chainHash, err) + } + + return verboseHeader.Height, nil +} + +// GetClientMissedBlocks uses a client's best block to determine what blocks +// it missed being notified about, and returns them in a slice. Its +// backendStoresReorgs parameter tells it whether or not the notifier's +// chainConn stores information about blocks that have been reorged out of the +// chain, which allows GetClientMissedBlocks to find out whether the client's +// best block has been reorged out of the chain, rewind to the common ancestor +// and return blocks starting right after the common ancestor. +func GetClientMissedBlocks(chainConn ChainConn, clientBestBlock *BlockEpoch, + notifierBestHeight int32, backendStoresReorgs bool) ([]BlockEpoch, error) { + + startingHeight := clientBestBlock.Height + if backendStoresReorgs { + // If a reorg causes the client's best hash to be incorrect, + // retrieve the closest common ancestor and dispatch + // notifications from there. + hashAtBestHeight, err := chainConn.GetBlockHash( + int64(clientBestBlock.Height)) + if err != nil { + return nil, fmt.Errorf("unable to find blockhash for "+ + "height=%d: %v", clientBestBlock.Height, err) + } + + startingHeight, err = GetCommonBlockAncestorHeight( + chainConn, *clientBestBlock.Hash, *hashAtBestHeight, + ) + if err != nil { + return nil, fmt.Errorf("unable to find common ancestor: "+ + "%v", err) + } + } + + // We want to start dispatching historical notifications from the block + // right after the client's best block, to avoid a redundant notification. + missedBlocks, err := getMissedBlocks( + chainConn, startingHeight+1, notifierBestHeight+1, + ) + if err != nil { + return nil, fmt.Errorf("unable to get missed blocks: %v", err) + } + + return missedBlocks, nil +} + +// getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight) +// fetched from the chain. 
+func getMissedBlocks(chainConn ChainConn, startingHeight, + endingHeight int32) ([]BlockEpoch, error) { + + numMissedBlocks := endingHeight - startingHeight + if numMissedBlocks < 0 { + return nil, fmt.Errorf("starting height %d is greater than "+ + "ending height %d", startingHeight, endingHeight) + } + + missedBlocks := make([]BlockEpoch, 0, numMissedBlocks) + for height := startingHeight; height < endingHeight; height++ { + hash, err := chainConn.GetBlockHash(int64(height)) + if err != nil { + return nil, fmt.Errorf("unable to find blockhash for height=%d: %v", + height, err) + } + missedBlocks = append(missedBlocks, + BlockEpoch{Hash: hash, Height: height}) + } + + return missedBlocks, nil +} diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index da330dce..1da5b404 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -370,6 +370,25 @@ func (n *NeutrinoNotifier) notificationDispatcher() { case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") n.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + n.heightMtx.Lock() + bestHeight := int32(n.bestHeight) + n.heightMtx.Unlock() + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + n.chainConn, msg.bestBlock, + bestHeight, false, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + n.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + } + msg.errorChan <- nil } case item := <-n.chainUpdates.ChanOut(): @@ -572,20 +591,25 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range n.blockEpochClients { + n.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. +func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range n.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-n.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-n.quit: } } From cbf1799c40ee3d38e89a94d0fd37ecccb81b71de Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:29 -0700 Subject: [PATCH 07/13] chainntnfs: rewind chain on missed disconnected blocks If the chain backend misses telling the notifier about a series of disconnected blocks, the notifier is now able to disconnect the tip to its new best block. 
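For review, the core of this change is a shared helper added to chainntnfs/interface.go that walks the notifier's tip back one block at a time. Condensed here with logging and error wrapping trimmed (the full version is in the diff below), it does roughly the following:

// RewindChain disconnects blocks from the TxConfNotifier one at a time,
// from the notifier's current best height down to targetHeight, and
// returns how far it actually got as the new best block.
func RewindChain(chainConn ChainConn, txConfNotifier *TxConfNotifier,
    currBestBlock BlockEpoch, targetHeight int32) (BlockEpoch, error) {

    newBestBlock := BlockEpoch{
        Height: currBestBlock.Height,
        Hash:   currBestBlock.Hash,
    }
    for height := currBestBlock.Height; height > targetHeight; height-- {
        // The block at height-1 becomes the tip once the block at
        // height has been disconnected.
        hash, err := chainConn.GetBlockHash(int64(height - 1))
        if err != nil {
            return newBestBlock, err
        }
        if err := txConfNotifier.DisconnectTip(uint32(height)); err != nil {
            return newBestBlock, err
        }
        newBestBlock.Height = height - 1
        newBestBlock.Hash = hash
    }
    return newBestBlock, nil
}

Returning the partially rewound best block on error is what lets the backend callers below adopt whatever progress was made before a failure.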
--- chainntnfs/bitcoindnotify/bitcoind.go | 25 ++++++++-------- chainntnfs/btcdnotify/btcd.go | 22 ++++++++------ chainntnfs/interface.go | 34 ++++++++++++++++++++++ chainntnfs/neutrinonotify/neutrino.go | 42 ++++++++++++++++++--------- 4 files changed, 88 insertions(+), 35 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 347b12c1..91fa4543 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -327,23 +327,24 @@ out: case chain.BlockDisconnected: if item.Height != b.bestBlock.Height { - chainntnfs.Log.Warnf("Received blocks "+ - "out of order: current height="+ - "%d, disconnected height=%d", - bestHeight, item.Height) - continue + chainntnfs.Log.Infof("Missed disconnected" + + "blocks, attempting to catch up") } - chainntnfs.Log.Infof("Block disconnected from "+ - "main chain: height=%v, sha=%v", - item.Height, item.Hash) - - err := b.txConfNotifier.DisconnectTip( - uint32(item.Height)) + newBestBlock, err := chainntnfs.RewindChain( + b.chainConn, b.txConfNotifier, + b.bestBlock, item.Height-1, + ) if err != nil { - chainntnfs.Log.Error(err) + chainntnfs.Log.Errorf("Unable to rewind chain "+ + "from height %d to height %d: %v", + b.bestBlock.Height, item.Height-1, err) } + // Set the bestBlock here in case a chain + // rewind partially completed. + b.bestBlock = newBestBlock + case chain.RelevantTx: b.handleRelevantTx(item, b.bestBlock.Height) } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 0103b421..51e2c95d 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -397,20 +397,24 @@ out: } if update.blockHeight != b.bestBlock.Height { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, disconnected height=%d", - currentHeight, update.blockHeight) - continue + chainntnfs.Log.Infof("Missed disconnected" + + "blocks, attempting to catch up") } - chainntnfs.Log.Infof("Block disconnected from main chain: "+ - "height=%v, sha=%v", update.blockHeight, update.blockHash) - - err := b.txConfNotifier.DisconnectTip(uint32(update.blockHeight)) + newBestBlock, err := chainntnfs.RewindChain( + b.chainConn, b.txConfNotifier, b.bestBlock, + update.blockHeight-1, + ) if err != nil { - chainntnfs.Log.Error(err) + chainntnfs.Log.Errorf("Unable to rewind chain "+ + "from height %d to height %d: %v", + b.bestBlock.Height, update.blockHeight-1, err) } + // Set the bestBlock here in case a chain rewind + // partially completed. + b.bestBlock = newBestBlock + // NOTE: we currently only use txUpdates for mempool spends and // rescan spends. It might get removed entirely in the future. case item := <-b.txUpdates.ChanOut(): diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index 8ec223b8..c46125c1 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -344,6 +344,40 @@ func GetClientMissedBlocks(chainConn ChainConn, clientBestBlock *BlockEpoch, return missedBlocks, nil } +// RewindChain handles internal state updates for the notifier's TxConfNotifier +// It has no effect if given a height greater than or equal to our current best +// known height. It returns the new best block for the notifier. 
+func RewindChain(chainConn ChainConn, txConfNotifier *TxConfNotifier, + currBestBlock BlockEpoch, targetHeight int32) (BlockEpoch, error) { + + newBestBlock := BlockEpoch{ + Height: currBestBlock.Height, + Hash: currBestBlock.Hash, + } + + for height := currBestBlock.Height; height > targetHeight; height-- { + hash, err := chainConn.GetBlockHash(int64(height - 1)) + if err != nil { + return newBestBlock, fmt.Errorf("unable to "+ + "find blockhash for disconnected height=%d: %v", + height, err) + } + + Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", height, newBestBlock.Hash) + + err = txConfNotifier.DisconnectTip(uint32(height)) + if err != nil { + return newBestBlock, fmt.Errorf("unable to "+ + " disconnect tip for height=%d: %v", + height, err) + } + newBestBlock.Height = height - 1 + newBestBlock.Hash = hash + } + return newBestBlock, nil +} + // getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight) // fetched from the chain. func getMissedBlocks(chainConn ChainConn, startingHeight, diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 1da5b404..ea7c270f 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -365,6 +365,7 @@ func (n *NeutrinoNotifier) notificationDispatcher() { "to update rescan: %v", err) } + }() case *blockEpochRegistration: @@ -417,24 +418,37 @@ func (n *NeutrinoNotifier) notificationDispatcher() { } n.heightMtx.Lock() - if update.height != n.bestHeight { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, disconnected height=%d", - n.bestHeight, update.height) - n.heightMtx.Unlock() - continue + if update.height != uint32(n.bestHeight) { + chainntnfs.Log.Infof("Missed disconnected" + + "blocks, attempting to catch up") } - n.bestHeight = update.height - 1 - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("Block disconnected from main chain: "+ - "height=%v, sha=%v", update.height, update.hash) - - err := n.txConfNotifier.DisconnectTip(update.height) + header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight( + n.bestHeight, + ) if err != nil { - chainntnfs.Log.Error(err) + chainntnfs.Log.Errorf("Unable to fetch header"+ + "for height %d: %v", n.bestHeight, err) } + hash := header.BlockHash() + notifierBestBlock := chainntnfs.BlockEpoch{ + Height: int32(n.bestHeight), + Hash: &hash, + } + newBestBlock, err := chainntnfs.RewindChain( + n.chainConn, n.txConfNotifier, notifierBestBlock, + int32(update.height-1), + ) + if err != nil { + chainntnfs.Log.Errorf("Unable to rewind chain "+ + "from height %d to height %d: %v", + n.bestHeight, update.height-1, err) + } + + // Set the bestHeight here in case a chain rewind + // partially completed. + n.bestHeight = uint32(newBestBlock.Height) + n.heightMtx.Unlock() case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) From 3df5b26699157848609950c39b3c75e6e0883499 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:29 -0700 Subject: [PATCH 08/13] chainntnfs: notify clients after block connect has succeeded This prevents the situation where we notify clients about a newly connected block, and then the block connection itself fails. We also want to set our best block in between connecting the block and notifying clients, in case a client makes queries about the new block they have received. 
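Each backend's handleBlockConnected now follows the same ordering; the bitcoind version added below boils down to the following (logging trimmed, comments added here to highlight the ordering):

func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) error {
    rawBlock, err := b.chainConn.GetBlock(block.Hash)
    if err != nil {
        return fmt.Errorf("unable to get block: %v", err)
    }

    // 1. Connect the block to the TxConfNotifier first; if this fails,
    //    clients are never told about a block we failed to connect.
    txns := btcutil.NewBlock(rawBlock).Transactions()
    err = b.txConfNotifier.ConnectTip(block.Hash, uint32(block.Height), txns)
    if err != nil {
        return fmt.Errorf("unable to connect tip: %v", err)
    }

    // 2. Advance the notifier's best block before dispatching, so any
    //    query a client makes in response to the notification sees
    //    fully updated state.
    b.bestBlock = block

    // 3. Only now fan the epoch notification out to subscribers.
    b.notifyBlockEpochs(block.Height, block.Hash)

    return nil
}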
--- chainntnfs/bitcoindnotify/bitcoind.go | 29 +++++++++++++ chainntnfs/btcdnotify/btcd.go | 59 +++++++++++++++++++-------- chainntnfs/neutrinonotify/neutrino.go | 55 ++++++++++++++++++------- 3 files changed, 112 insertions(+), 31 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 91fa4543..adad07dd 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -539,6 +539,35 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash, return nil, nil } +// handleBlockConnected applies a chain update for a new block. Any watched +// transactions included this block will processed to either send notifications +// now or after numConfirmations confs. +func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) error { + rawBlock, err := b.chainConn.GetBlock(block.Hash) + if err != nil { + return fmt.Errorf("unable to get block: %v", err) + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + block.Height, block.Hash) + + txns := btcutil.NewBlock(rawBlock).Transactions() + err = b.txConfNotifier.ConnectTip( + block.Hash, uint32(block.Height), txns) + if err != nil { + return fmt.Errorf("unable to connect tip: %v", err) + } + + // We want to set the best block before dispatching notifications so + // if any subscribers make queries based on their received block epoch, + // our state is fully updated in time. + b.bestBlock = block + + b.notifyBlockEpochs(block.Height, block.Hash) + + return nil +} + // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 51e2c95d..177d9880 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -613,17 +613,47 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, return nil, nil } -// handleBlocksConnected applies a chain update for a new block. Any watched +// handleBlockConnected applies a chain update for a new block. Any watched // transactions included this block will processed to either send notifications // now or after numConfirmations confs. // TODO(halseth): this is reusing the neutrino notifier implementation, unify // them. -func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { - // First we'll notify any subscribed clients of the block. +func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { + // First process the block for our internal state. A new block has + // been connected to the main chain. Send out any N confirmation + // notifications which may have been triggered by this new block. 
+ rawBlock, err := b.chainConn.GetBlock(epoch.Hash) + if err != nil { + return fmt.Errorf("unable to get block: %v", err) + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + epoch.Height, epoch.Hash) + + txns := btcutil.NewBlock(rawBlock).Transactions() + + newBlock := &filteredBlock{ + hash: *epoch.Hash, + height: uint32(epoch.Height), + txns: txns, + connect: true, + } + err = b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, + newBlock.txns) + if err != nil { + return fmt.Errorf("unable to connect tip: %v", err) + } + + // We want to set the best block before dispatching notifications + // so if any subscribers make queries based on their received + // block epoch, our state is fully updated in time. + b.bestBlock = epoch + + // Next we'll notify any subscribed clients of the block. b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Next, we'll scan over the list of relevant transactions and possibly - // dispatch notifications for confirmations and spends. + // Finally, we'll scan over the list of relevant transactions and + // possibly dispatch notifications for confirmations and spends. for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -631,9 +661,10 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { for i, txIn := range mtx.TxIn { prevOut := txIn.PreviousOutPoint - // If this transaction indeed does spend an output which we have a - // registered notification for, then create a spend summary, finally - // sending off the details to the notification subscriber. + // If this transaction indeed does spend an output which + // we have a registered notification for, then create a + // spend summary, finally sending off the details to the + // notification subscriber. clients, ok := b.spendNotifications[prevOut] if !ok { continue @@ -652,9 +683,10 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { "outpoint=%v", ntfn.targetOutpoint) ntfn.spendChan <- spendDetails - // Close spendChan to ensure that any calls to Cancel will not - // block. This is safe to do since the channel is buffered, and - // the message can still be read by the receiver. + // Close spendChan to ensure that any calls to + // Cancel will not block. This is safe to do + // since the channel is buffered, and the + // message can still be read by the receiver. close(ntfn.spendChan) } @@ -662,11 +694,6 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { } } - // A new block has been connected to the main chain. - // Send out any N confirmation notifications which may - // have been triggered by this new block. - b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns) - return nil } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index ea7c270f..54459bca 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -541,15 +541,29 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, return nil, nil } -// handleBlocksConnected applies a chain update for a new block. Any watched +// handleBlockConnected applies a chain update for a new block. Any watched // transactions included this block will processed to either send notifications // now or after numConfirmations confs. func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { - // First we'll notify any subscribed clients of the block. 
+ // First process the block for our internal state. A new block has + // been connected to the main chain. Send out any N confirmation + // notifications which may have been triggered by this new block. + err := n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, + newBlock.txns) + if err != nil { + return fmt.Errorf("unable to connect tip: %v", err) + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + newBlock.height, newBlock.hash) + + n.bestHeight = newBlock.height + + // Next, notify any subscribed clients of the block. n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Next, we'll scan over the list of relevant transactions and possibly - // dispatch notifications for confirmations and spends. + // Finally, we'll scan over the list of relevant transactions and + // possibly dispatch notifications for confirmations and spends. for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -557,10 +571,10 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { for i, txIn := range mtx.TxIn { prevOut := txIn.PreviousOutPoint - // If this transaction indeed does spend an output - // which we have a registered notification for, then - // create a spend summary, finally sending off the - // details to the notification subscriber. + // If this transaction indeed does spend an output which + // we have a registered notification for, then create a + // spend summary, finally sending off the details to the + // notification subscriber. clients, ok := n.spendNotifications[prevOut] if !ok { continue @@ -592,16 +606,27 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { } } - // A new block has been connected to the main chain. Send out any N - // confirmation notifications which may have been triggered by this new - // block. - n.txConfNotifier.ConnectTip( - &newBlock.hash, newBlock.height, newBlock.txns, - ) - return nil } +// getFilteredBlock is a utility to retrieve the full filtered block from a block epoch. +func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filteredBlock, error) { + rawBlock, err := n.p2pNode.GetBlockFromNetwork(*epoch.Hash) + if err != nil { + return nil, fmt.Errorf("unable to get block: %v", err) + } + + txns := rawBlock.Transactions() + + block := &filteredBlock{ + hash: *epoch.Hash, + height: uint32(epoch.Height), + txns: txns, + connect: true, + } + return block, nil +} + // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { From 79cbea1c9cd6402a12de5669e6ce5ed68e14c0ed Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:30 -0700 Subject: [PATCH 09/13] chainntnfs: enable notifiers to catch up on missed blocks This resolves the situation where a notifier's chain backend skips a series of blocks, causing the notifier to need to dispatch historical block notifications to clients. Additionally, if the current notifier's best block has been reorged out, this logic enables the notifier to rewind to the common ancestor between the current chain and the outdated best block and dispatches notifications from the ancestor. 
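The dispatcher changes below are easiest to read as the following illustrative sketch. The helper name catchUpOnMissedBlocks is invented here purely for clarity; in the actual diff this logic is inlined in each backend's BlockConnected handling:

// catchUpOnMissedBlocks is an illustrative condensation of the new
// BlockConnected handling: if the incoming block does not build on our
// best block, rewind and/or fetch the blocks in between and dispatch
// them before connecting the new tip.
func (b *BitcoindNotifier) catchUpOnMissedBlocks(height int32,
    hash chainhash.Hash) error {

    header, err := b.chainConn.GetBlockHeader(&hash)
    if err != nil {
        return fmt.Errorf("unable to fetch block header: %v", err)
    }

    if header.PrevBlock != *b.bestBlock.Hash {
        // Our best block is stale: either blocks were skipped or our
        // best block was reorged out. HandleMissedBlocks rewinds to
        // the common ancestor if necessary and returns the blocks we
        // still need to dispatch.
        newBestBlock, missedBlocks, err := chainntnfs.HandleMissedBlocks(
            b.chainConn, b.txConfNotifier, b.bestBlock, height, true,
        )
        if err != nil {
            // Keep whatever progress a partial rewind made.
            b.bestBlock = newBestBlock
            return err
        }

        for _, block := range missedBlocks {
            if err := b.handleBlockConnected(block); err != nil {
                return err
            }
        }
    }

    // Finally, connect the block that triggered the notification.
    return b.handleBlockConnected(chainntnfs.BlockEpoch{
        Height: height,
        Hash:   &hash,
    })
}

Passing true for backendStoresReorgs is what lets the btcd and bitcoind backends detect that a stale best block was reorged out; the neutrino hunk passes false and compares heights only, as its inline comment explains.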
--- chainntnfs/bitcoindnotify/bitcoind.go | 57 +++++++++++++++++------- chainntnfs/btcdnotify/btcd.go | 57 ++++++++++++++++-------- chainntnfs/interface.go | 59 +++++++++++++++++++++++- chainntnfs/neutrinonotify/neutrino.go | 64 ++++++++++++++++++++++----- 4 files changed, 187 insertions(+), 50 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index adad07dd..21140994 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -299,30 +299,53 @@ out: case ntfn := <-b.chainConn.Notifications(): switch item := ntfn.(type) { case chain.BlockConnected: - if item.Height != b.bestBlock.Height+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - bestHeight, item.Height) + blockHeader, err := + b.chainConn.GetBlockHeader(&item.Hash) + if err != nil { + chainntnfs.Log.Errorf("Unable to fetch "+ + "block header: %v", err) continue } - rawBlock, err := b.chainConn.GetBlock(&item.Hash) - if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) - continue + if blockHeader.PrevBlock != *b.bestBlock.Hash { + // Handle the case where the notifier + // missed some blocks from its chain + // backend. + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + newBestBlock, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + b.chainConn, + b.txConfNotifier, + b.bestBlock, item.Height, + true, + ) + + if err != nil { + // Set the bestBlock here in case + // a catch up partially completed. + b.bestBlock = newBestBlock + chainntnfs.Log.Error(err) + continue + } + + for _, block := range missedBlocks { + err := b.handleBlockConnected(block) + if err != nil { + chainntnfs.Log.Error(err) + continue out + } + } } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - item.Height, item.Hash) - - b.notifyBlockEpochs(item.Height, &item.Hash) - - txns := btcutil.NewBlock(rawBlock).Transactions() - err = b.txConfNotifier.ConnectTip(&item.Hash, - uint32(item.Height), txns) - if err != nil { + newBlock := chainntnfs.BlockEpoch{ + Height: item.Height, + Hash: &item.Hash, + } + if err := b.handleBlockConnected(newBlock); err != nil { chainntnfs.Log.Error(err) } + continue case chain.BlockDisconnected: diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 177d9880..c7cb6995 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -366,31 +366,50 @@ out: case item := <-b.chainUpdates.ChanOut(): update := item.(*chainUpdate) if update.connect { - if update.blockHeight != b.bestBlock.Height+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - currentHeight, update.blockHeight) - continue - } - - rawBlock, err := b.chainConn.GetBlock(update.blockHash) + blockHeader, err := + b.chainConn.GetBlockHeader(update.blockHash) if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) + chainntnfs.Log.Errorf("Unable to fetch "+ + "block header: %v", err) continue } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.blockHeight, update.blockHash) + if blockHeader.PrevBlock != *b.bestBlock.Hash { + // Handle the case where the notifier + // missed some blocks from its chain + // backend + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + newBestBlock, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + b.chainConn, + b.txConfNotifier, + b.bestBlock, + update.blockHeight, + true, + ) + if err != nil { + // Set the 
bestBlock here in case + // a catch up partially completed. + b.bestBlock = newBestBlock + chainntnfs.Log.Error(err) + continue + } - txns := btcutil.NewBlock(rawBlock).Transactions() - - block := &filteredBlock{ - hash: *update.blockHash, - height: uint32(update.blockHeight), - txns: txns, - connect: true, + for _, block := range missedBlocks { + err := b.handleBlockConnected(block) + if err != nil { + chainntnfs.Log.Error(err) + continue out + } + } } - if err := b.handleBlockConnected(block); err != nil { + + newBlock := chainntnfs.BlockEpoch{ + Height: update.blockHeight, + Hash: update.blockHash, + } + if err := b.handleBlockConnected(newBlock); err != nil { chainntnfs.Log.Error(err) } continue diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index c46125c1..7c7a2ec6 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -378,6 +378,61 @@ func RewindChain(chainConn ChainConn, txConfNotifier *TxConfNotifier, return newBestBlock, nil } +// HandleMissedBlocks is called when the chain backend for a notifier misses a +// series of blocks, handling a reorg if necessary. Its backendStoresReorgs +// parameter tells it whether or not the notifier's chainConn stores +// information about blocks that have been reorged out of the chain, which allows +// HandleMissedBlocks to check whether the notifier's best block has been +// reorged out, and rewind the chain accordingly. It returns the best block for +// the notifier and a slice of the missed blocks. The new best block needs to be +// returned in case a chain rewind occurs and partially completes before +// erroring. In the case where there is no rewind, the notifier's +// current best block is returned. +func HandleMissedBlocks(chainConn ChainConn, txConfNotifier *TxConfNotifier, + currBestBlock BlockEpoch, newHeight int32, + backendStoresReorgs bool) (BlockEpoch, []BlockEpoch, error) { + + startingHeight := currBestBlock.Height + + if backendStoresReorgs { + // If a reorg causes our best hash to be incorrect, rewind the + // chain so our best block is set to the closest common + // ancestor, then dispatch notifications from there. + hashAtBestHeight, err := + chainConn.GetBlockHash(int64(currBestBlock.Height)) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to find "+ + "blockhash for height=%d: %v", + currBestBlock.Height, err) + } + + startingHeight, err = GetCommonBlockAncestorHeight( + chainConn, *currBestBlock.Hash, *hashAtBestHeight, + ) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to find "+ + "common ancestor: %v", err) + } + + currBestBlock, err = RewindChain(chainConn, txConfNotifier, + currBestBlock, startingHeight) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to "+ + "rewind chain: %v", err) + } + } + + // We want to start dispatching historical notifications from the block + // right after our best block, to avoid a redundant notification. + missedBlocks, err := getMissedBlocks(chainConn, startingHeight+1, newHeight) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to get missed "+ + "blocks: %v", err) + } + + return currBestBlock, missedBlocks, nil +} + // getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight) // fetched from the chain. 
func getMissedBlocks(chainConn ChainConn, startingHeight, @@ -393,8 +448,8 @@ func getMissedBlocks(chainConn ChainConn, startingHeight, for height := startingHeight; height < endingHeight; height++ { hash, err := chainConn.GetBlockHash(int64(height)) if err != nil { - return nil, fmt.Errorf("unable to find blockhash for height=%d: %v", - height, err) + return nil, fmt.Errorf("unable to find blockhash for "+ + "height=%d: %v", height, err) } missedBlocks = append(missedBlocks, BlockEpoch{Hash: hash, Height: height}) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 54459bca..07e297a6 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -245,7 +245,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // notification registrations, as well as notification dispatches. func (n *NeutrinoNotifier) notificationDispatcher() { defer n.wg.Done() - +out: for { select { case cancelMsg := <-n.notificationCancels: @@ -396,24 +396,61 @@ func (n *NeutrinoNotifier) notificationDispatcher() { update := item.(*filteredBlock) if update.connect { n.heightMtx.Lock() + // Since neutrino has no way of knowing what + // height to rewind to in the case of a reorged + // best known height, there is no point in + // checking that the previous hash matches the + // the hash from our best known height the way + // the other notifiers do when they receive + // a new connected block. Therefore, we just + // compare the heights. if update.height != n.bestHeight+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - n.bestHeight, update.height) - n.heightMtx.Unlock() - continue + // Handle the case where the notifier + // missed some blocks from its chain + // backend + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + bestBlock := chainntnfs.BlockEpoch{ + Height: int32(n.bestHeight), + Hash: nil, + } + _, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + n.chainConn, + n.txConfNotifier, + bestBlock, + int32(update.height), + false, + ) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue + } + + for _, block := range missedBlocks { + filteredBlock, err := + n.getFilteredBlock(block) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue out + } + err = n.handleBlockConnected(filteredBlock) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue out + } + } + } - n.bestHeight = update.height - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.height, update.hash) - err := n.handleBlockConnected(update) if err != nil { chainntnfs.Log.Error(err) } + n.heightMtx.Unlock() continue } @@ -429,7 +466,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() { if err != nil { chainntnfs.Log.Errorf("Unable to fetch header"+ "for height %d: %v", n.bestHeight, err) + n.heightMtx.Unlock() + continue } + hash := header.BlockHash() notifierBestBlock := chainntnfs.BlockEpoch{ Height: int32(n.bestHeight), From b3a5b3b5761ed2eb43803dda2b694a3ded624662 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:30 -0700 Subject: [PATCH 10/13] chainntnfs: add ChainNotifier test interface for test methods TestChainNotifier wraps the ChainNotifier interface to allow adding additional testing methods with access to private fields in the notifiers. These testing methods are only compiled when the build tag "debug" is set. 
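For context, this is how Go typically gates such files (a minimal sketch; the constraint line and package name below are illustrative rather than copied from the patch). A build constraint placed before the package clause causes ordinary builds to skip the file, so its contents are only compiled when the tag is supplied, for example with go test -tags debug:

    // +build debug

    package chainntnfs

    // Debug-only test helpers live in files guarded like this one; they are
    // left out of regular builds and release binaries.

Gating the helpers this way keeps them out of production binaries while still giving tests access to the notifiers' private state.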
UnsafeStart allows starting a notifier with a specified best block. UnsafeStart is useful for the purpose of testing cases where a notifier's best block is out of date when it receives a new block. --- chainntnfs/bitcoindnotify/bitcoind_debug.go | 77 ++++++++++++++++ chainntnfs/btcdnotify/btcd_debug.go | 77 ++++++++++++++++ chainntnfs/interface_debug.go | 13 +++ chainntnfs/neutrinonotify/neutrino_debug.go | 99 +++++++++++++++++++++ 4 files changed, 266 insertions(+) create mode 100644 chainntnfs/bitcoindnotify/bitcoind_debug.go create mode 100644 chainntnfs/btcdnotify/btcd_debug.go create mode 100644 chainntnfs/interface_debug.go create mode 100644 chainntnfs/neutrinonotify/neutrino_debug.go diff --git a/chainntnfs/bitcoindnotify/bitcoind_debug.go b/chainntnfs/bitcoindnotify/bitcoind_debug.go new file mode 100644 index 00000000..33d1aa4d --- /dev/null +++ b/chainntnfs/bitcoindnotify/bitcoind_debug.go @@ -0,0 +1,77 @@ +package bitcoindnotify + +import ( + "fmt" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcwallet/chain" + "github.com/lightningnetwork/lnd/chainntnfs" +) + +// UnsafeStart starts the notifier with a specified best height and optional +// best hash. Its bestBlock and txConfNotifier are initialized with +// bestHeight and optionally bestHash. The parameter generateBlocks is +// necessary for the bitcoind notifier to ensure we drain all notifications up +// to syncHeight, since if they are generated ahead of UnsafeStart the chainConn +// may start up with an outdated best block and miss sending ntfns. Used for +// testing. +func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, + syncHeight int32, generateBlocks func() error) error { + + // Connect to bitcoind, and register for notifications on connected, + // and disconnected blocks. + if err := b.chainConn.Start(); err != nil { + return err + } + if err := b.chainConn.NotifyBlocks(); err != nil { + return err + } + + b.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(bestHeight), reorgSafetyLimit) + + if generateBlocks != nil { + // Ensure no block notifications are pending when we start the + // notification dispatcher goroutine. + + // First generate the blocks, then drain the notifications + // for the generated blocks. + if err := generateBlocks(); err != nil { + return err + } + + timeout := time.After(60 * time.Second) + loop: + for { + select { + case ntfn := <-b.chainConn.Notifications(): + switch update := ntfn.(type) { + case chain.BlockConnected: + if update.Height >= syncHeight { + break loop + } + } + case <-timeout: + return fmt.Errorf("unable to catch up to height %d", + syncHeight) + } + } + } + + // Run notificationDispatcher after setting the notifier's best block + // to avoid a race condition. + b.bestBlock = chainntnfs.BlockEpoch{Height: bestHeight, Hash: bestHash} + if bestHash == nil { + hash, err := b.chainConn.GetBlockHash(int64(bestHeight)) + if err != nil { + return err + } + b.bestBlock.Hash = hash + } + + b.wg.Add(1) + go b.notificationDispatcher() + + return nil +} diff --git a/chainntnfs/btcdnotify/btcd_debug.go b/chainntnfs/btcdnotify/btcd_debug.go new file mode 100644 index 00000000..8ddffc39 --- /dev/null +++ b/chainntnfs/btcdnotify/btcd_debug.go @@ -0,0 +1,77 @@ +package btcdnotify + +import ( + "fmt" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/chainntnfs" +) + +// UnsafeStart starts the notifier with a specified best height and optional +// best hash. 
Its bestBlock and txConfNotifier are initialized with +// bestHeight and optionally bestHash. The parameter generateBlocks is +// necessary for the bitcoind notifier to ensure we drain all notifications up +// to syncHeight, since if they are generated ahead of UnsafeStart the chainConn +// may start up with an outdated best block and miss sending ntfns. Used for +// testing. +func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, + syncHeight int32, generateBlocks func() error) error { + + // Connect to btcd, and register for notifications on connected, and + // disconnected blocks. + if err := b.chainConn.Connect(20); err != nil { + return err + } + if err := b.chainConn.NotifyBlocks(); err != nil { + return err + } + + b.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(bestHeight), reorgSafetyLimit) + + b.chainUpdates.Start() + b.txUpdates.Start() + + if generateBlocks != nil { + // Ensure no block notifications are pending when we start the + // notification dispatcher goroutine. + + // First generate the blocks, then drain the notifications + // for the generated blocks. + if err := generateBlocks(); err != nil { + return err + } + + timeout := time.After(60 * time.Second) + loop: + for { + select { + case ntfn := <-b.chainUpdates.ChanOut(): + lastReceivedNtfn := ntfn.(*chainUpdate) + if lastReceivedNtfn.blockHeight >= syncHeight { + break loop + } + case <-timeout: + return fmt.Errorf("unable to catch up to height %d", + syncHeight) + } + } + } + + // Run notificationDispatcher after setting the notifier's best block + // to avoid a race condition. + b.bestBlock = chainntnfs.BlockEpoch{Height: bestHeight, Hash: bestHash} + if bestHash == nil { + hash, err := b.chainConn.GetBlockHash(int64(bestHeight)) + if err != nil { + return err + } + b.bestBlock.Hash = hash + } + + b.wg.Add(1) + go b.notificationDispatcher() + + return nil +} diff --git a/chainntnfs/interface_debug.go b/chainntnfs/interface_debug.go new file mode 100644 index 00000000..f3e6eaf4 --- /dev/null +++ b/chainntnfs/interface_debug.go @@ -0,0 +1,13 @@ +package chainntnfs + +import "github.com/btcsuite/btcd/chaincfg/chainhash" + +// TestChainNotifier enables the use of methods that are only present during +// testing for ChainNotifiers. +type TestChainNotifier interface { + ChainNotifier + + // UnsafeStart enables notifiers to start up with a specific best block. + // Used for testing. + UnsafeStart(int32, *chainhash.Hash, int32, func() error) error +} diff --git a/chainntnfs/neutrinonotify/neutrino_debug.go b/chainntnfs/neutrinonotify/neutrino_debug.go new file mode 100644 index 00000000..56724c4f --- /dev/null +++ b/chainntnfs/neutrinonotify/neutrino_debug.go @@ -0,0 +1,99 @@ +package neutrinonotify + +import ( + "fmt" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/rpcclient" + "github.com/btcsuite/btcwallet/waddrmgr" + "github.com/lightninglabs/neutrino" + "github.com/lightningnetwork/lnd/chainntnfs" +) + +// UnsafeStart starts the notifier with a specified best height and optional +// best hash. Its bestHeight, txConfNotifier and neutrino node are initialized +// with bestHeight. The parameter generateBlocks is necessary for the +// bitcoind notifier to ensure we drain all notifications up to syncHeight, +// since if they are generated ahead of UnsafeStart the chainConn may start +// up with an outdated best block and miss sending ntfns. Used for testing. 
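+//
+// A rough usage sketch (the heights and the generateBlocks closure here are
+// hypothetical, shown for illustration only):
+//
+//	generateBlocks := func() error {
+//		_, err := miner.Node.Generate(10)
+//		return err
+//	}
+//	err := notifier.UnsafeStart(bestHeight, nil, bestHeight+10, generateBlocks)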
+func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, + syncHeight int32, generateBlocks func() error) error { + + // We'll obtain the latest block height of the p2p node. We'll + // start the auto-rescan from this point. Once a caller actually wishes + // to register a chain view, the rescan state will be rewound + // accordingly. + header, height, err := n.p2pNode.BlockHeaders.ChainTip() + if err != nil { + return err + } + startingPoint := &waddrmgr.BlockStamp{ + Height: int32(height), + Hash: header.BlockHash(), + } + + // Next, we'll create our set of rescan options. Currently it's + // required that a user MUST set an addr/outpoint/txid when creating a + // rescan. To get around this, we'll add a "zero" outpoint, that won't + // actually be matched. + var zeroInput neutrino.InputWithScript + rescanOptions := []neutrino.RescanOption{ + neutrino.StartBlock(startingPoint), + neutrino.QuitChan(n.quit), + neutrino.NotificationHandlers( + rpcclient.NotificationHandlers{ + OnFilteredBlockConnected: n.onFilteredBlockConnected, + OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected, + }, + ), + neutrino.WatchInputs(zeroInput), + } + + n.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(bestHeight), reorgSafetyLimit) + + n.chainConn = &NeutrinoChainConn{n.p2pNode} + + // Finally, we'll create our rescan struct, start it, and launch all + // the goroutines we need to operate this ChainNotifier instance. + n.chainView = n.p2pNode.NewRescan(rescanOptions...) + n.rescanErr = n.chainView.Start() + + n.chainUpdates.Start() + + if generateBlocks != nil { + // Ensure no block notifications are pending when we start the + // notification dispatcher goroutine. + + // First generate the blocks, then drain the notifications + // for the generated blocks. + if err := generateBlocks(); err != nil { + return err + } + + timeout := time.After(60 * time.Second) + loop: + for { + select { + case ntfn := <-n.chainUpdates.ChanOut(): + lastReceivedNtfn := ntfn.(*filteredBlock) + if lastReceivedNtfn.height >= uint32(syncHeight) { + break loop + } + case <-timeout: + return fmt.Errorf("unable to catch up to height %d", + syncHeight) + } + } + } + + // Run notificationDispatcher after setting the notifier's best height + // to avoid a race condition. + n.bestHeight = uint32(bestHeight) + + n.wg.Add(1) + go n.notificationDispatcher() + + return nil +} From f6ba64056a6fb781530705d16570bcd451520c1d Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:30 -0700 Subject: [PATCH 11/13] chainntnfs/interface_test: utilize TestChainNotifier interface in tests Switches all ChainNotifier parameters to be TestChainNotifiers. This allows access to the extra testing methods provided by the TestChainNotifier interface. --- chainntnfs/interface_test.go | 61 ++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index d408a66a..9eca4758 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -29,15 +29,15 @@ import ( // Required to auto-register the bitcoind backed ChainNotifier // implementation. - _ "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify" + "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify" // Required to auto-register the btcd backed ChainNotifier // implementation. 
- _ "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify" + "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify" // Required to auto-register the neutrino backed ChainNotifier // implementation. - _ "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify" + "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify" // Required to register the boltdb walletdb implementation. _ "github.com/btcsuite/btcwallet/walletdb/bdb" @@ -113,7 +113,7 @@ func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error { } func testSingleConfirmationNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the case of being notified once a txid reaches // a *single* confirmation. @@ -184,7 +184,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, } func testMultiConfirmationNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the case of being notified once a txid reaches // N confirmations, where N > 1. @@ -232,7 +232,7 @@ func testMultiConfirmationNotification(miner *rpctest.Harness, } func testBatchConfirmationNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test a case of serving notifications to multiple // clients, each requesting to be notified once a txid receives @@ -400,7 +400,7 @@ func checkNotificationFields(ntfn *chainntnfs.SpendDetail, } func testSpendNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the spend notifications for all ChainNotifier // concrete implementations. @@ -512,7 +512,7 @@ func testSpendNotification(miner *rpctest.Harness, } func testBlockEpochNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the case of multiple registered clients receiving // block epoch notifications. @@ -560,7 +560,7 @@ func testBlockEpochNotification(miner *rpctest.Harness, } func testMultiClientConfirmationNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the case of a multiple clients registered to // receive a confirmation notification for the same transaction. @@ -626,7 +626,7 @@ func testMultiClientConfirmationNotification(miner *rpctest.Harness, // transaction that has already been included in a block. In this case, the // confirmation notification should be dispatched immediately. func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // First, let's send some coins to "ourself", obtaining a txid. We're // spending from a coinbase output here, so we use the dedicated @@ -786,7 +786,7 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, // checking for a confirmation. This should not cause the notifier to stop // working func testLazyNtfnConsumer(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // Create a transaction to be notified about. 
We'll register for // notifications on this transaction but won't be prompt in checking them @@ -877,7 +877,7 @@ func testLazyNtfnConsumer(miner *rpctest.Harness, // has already been included in a block. In this case, the spend notification // should be dispatched immediately. func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the spend notifications for all ChainNotifier // concrete implementations. @@ -981,7 +981,7 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, } func testCancelSpendNtfn(node *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test that once a spend notification is registered, it // can be cancelled before the notification is dispatched. @@ -1072,7 +1072,7 @@ func testCancelSpendNtfn(node *rpctest.Harness, } } -func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, +func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to ensure that once a client cancels their block epoch @@ -1122,7 +1122,7 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie } } -func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, +func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.TestChainNotifier, t *testing.T) { // Set up a new miner that we can use to cause a reorg. @@ -1277,7 +1277,7 @@ func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, type testCase struct { name string - test func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T) + test func(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier, t *testing.T) } var ntfnTests = []testCase{ @@ -1361,8 +1361,10 @@ func TestInterfaces(t *testing.T) { log.Printf("Running %v ChainNotifier interface tests\n", len(ntfnTests)) var ( - notifier chainntnfs.ChainNotifier + notifier chainntnfs.TestChainNotifier cleanUp func() + + newNotifier func() (chainntnfs.TestChainNotifier, error) ) for _, notifierDriver := range chainntnfs.RegisteredNotifiers() { notifierType := notifierDriver.NotifierType @@ -1430,18 +1432,15 @@ func TestInterfaces(t *testing.T) { } cleanUp = cleanUp3 - notifier, err = notifierDriver.New(chainConn) - if err != nil { - t.Fatalf("unable to create %v notifier: %v", - notifierType, err) + newNotifier = func() (chainntnfs.TestChainNotifier, error) { + return bitcoindnotify.New(chainConn), nil } case "btcd": - notifier, err = notifierDriver.New(&rpcConfig) - if err != nil { - t.Fatalf("unable to create %v notifier: %v", - notifierType, err) + newNotifier = func() (chainntnfs.TestChainNotifier, error) { + return btcdnotify.New(&rpcConfig) } + cleanUp = func() {} case "neutrino": @@ -1481,16 +1480,18 @@ func TestInterfaces(t *testing.T) { for !spvNode.IsCurrent() { time.Sleep(time.Millisecond * 100) } - - notifier, err = notifierDriver.New(spvNode) - if err != nil { - t.Fatalf("unable to create %v notifier: %v", - notifierType, err) + newNotifier = func() (chainntnfs.TestChainNotifier, error) { + return neutrinonotify.New(spvNode) } } t.Logf("Running ChainNotifier interface tests for: %v", notifierType) + notifier, err = newNotifier() + if err != nil { + t.Fatalf("unable to create %v notifier: %v", notifierType, err) + } + if err := notifier.Start(); err != nil { t.Fatalf("unable to 
start notifier %v: %v", notifierType, err) From dea22eb8748ff1f7e615462c2eabdb7e0487aad0 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:30 -0700 Subject: [PATCH 12/13] chainntnfs/interface_test: add test for client catchup on block ntfns This tests the case where a client registers for block notifications with an outdated best block, to ensure that the client is properly caught up on the blocks that it has missed. --- chainntnfs/interface_test.go | 129 +++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 9eca4758..2c928f27 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -1274,12 +1274,111 @@ func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.TestChainNotifier } } +// testCatchUpClientOnMissedBlocks tests the case of multiple registered client +// receiving historical block epoch notifications due to their best known block +// being out of date. +func testCatchUpClientOnMissedBlocks(miner *rpctest.Harness, + notifier chainntnfs.TestChainNotifier, t *testing.T) { + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + outdatedHash, outdatedHeight, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to retrieve current height: %v", err) + } + + // This function is used by UnsafeStart to ensure all notifications + // are fully drained before clients register for notifications. + generateBlocks := func() error { + _, err = miner.Node.Generate(numBlocks) + return err + } + + // We want to ensure that when a client registers for block notifications, + // the notifier's best block is at the tip of the chain. If it isn't, the + // client may not receive all historical notifications. + bestHeight := outdatedHeight + numBlocks + if err := notifier.UnsafeStart( + bestHeight, nil, bestHeight, generateBlocks); err != nil { + + t.Fatalf("Unable to unsafe start the notifier: %v", err) + } + + // Create numClients clients whose best known block is 10 blocks behind + // the tip of the chain. We expect each client to receive numBlocks + // notifications, 1 for each block they're behind. + clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients) + outdatedBlock := &chainntnfs.BlockEpoch{ + Height: outdatedHeight, Hash: outdatedHash, + } + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn(outdatedBlock) + if err != nil { + t.Fatalf("unable to register for epoch notification: %v", err) + } + clients = append(clients, epochClient) + } + for expectedHeight := outdatedHeight + 1; expectedHeight <= + bestHeight; expectedHeight++ { + + for _, epochClient := range clients { + select { + case block := <-epochClient.Epochs: + if block.Height != expectedHeight { + t.Fatalf("received block of height: %d, "+ + "expected: %d", block.Height, + expectedHeight) + } + case <-time.After(20 * time.Second): + t.Fatalf("did not receive historical notification "+ + "for height %d", expectedHeight) + } + + } + } + + // Finally, ensure that an extra block notification wasn't received. 
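+	// Each client gets a goroutine that waits briefly for a stray epoch;
+	// any extra notification is reported on the buffered anyExtras channel,
+	// which is closed and drained below once every goroutine has finished.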
+ anyExtras := make(chan struct{}, len(clients)) + for _, epochClient := range clients { + wg.Add(1) + go func(epochClient *chainntnfs.BlockEpochEvent) { + defer wg.Done() + select { + case <-epochClient.Epochs: + anyExtras <- struct{}{} + case <-time.After(5 * time.Second): + } + }(epochClient) + } + + wg.Wait() + close(anyExtras) + + var extraCount int + for range anyExtras { + extraCount++ + } + + if extraCount > 0 { + t.Fatalf("received %d unexpected block notification", extraCount) + } +} + type testCase struct { name string test func(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier, t *testing.T) } +type blockCatchupTestCase struct { + name string + + test func(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier, + t *testing.T) +} + var ntfnTests = []testCase{ { name: "single conf ntfn", @@ -1331,6 +1430,13 @@ var ntfnTests = []testCase{ }, } +var blockCatchupTests = []blockCatchupTestCase{ + { + name: "catch up client on historical block epoch ntfns", + test: testCatchUpClientOnMissedBlocks, + }, +} + // TestInterfaces tests all registered interfaces with a unified set of tests // which exercise each of the required methods found within the ChainNotifier // interface. @@ -1511,6 +1617,29 @@ func TestInterfaces(t *testing.T) { } notifier.Stop() + + // Run catchup tests separately since they require + // restarting the notifier every time. + for _, blockCatchupTest := range blockCatchupTests { + notifier, err = newNotifier() + if err != nil { + t.Fatalf("unable to create %v notifier: %v", + notifierType, err) + } + testName := fmt.Sprintf("%v: %v", notifierType, + blockCatchupTest.name) + + success := t.Run(testName, func(t *testing.T) { + blockCatchupTest.test(miner, notifier, t) + }) + + notifier.Stop() + + if !success { + break + } + } + if cleanUp != nil { cleanUp() } From fb7deac898bed4d2dd1b96a0ccb2eaa55cc555b0 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:31 -0700 Subject: [PATCH 13/13] chaintnfs/interface_test: test notifiers handling missed blocks Tests for the case where a chain backend skips a series of blocks, such that the notifier's best block is out of date. Also tests the case where a notifier's best block has been reorged out of the chain. --- chainntnfs/interface_test.go | 269 +++++++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 2c928f27..dade94ed 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -1366,6 +1366,266 @@ func testCatchUpClientOnMissedBlocks(miner *rpctest.Harness, } } +// testCatchUpOnMissedBlocks the case of multiple registered clients receiving +// historical block epoch notifications due to the notifier's best known block +// being out of date. +func testCatchUpOnMissedBlocks(miner *rpctest.Harness, + notifier chainntnfs.TestChainNotifier, t *testing.T) { + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + _, bestHeight, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + // This function is used by UnsafeStart to ensure all notifications + // are fully drained before clients register for notifications. + generateBlocks := func() error { + _, err = miner.Node.Generate(numBlocks) + return err + } + + // Next, start the notifier with outdated best block information. 
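+	// The notifier's best block is pinned at the height recorded before the
+	// blocks above were generated, while its backend is drained up to
+	// bestHeight+numBlocks, so the notifier itself must notice the gap and
+	// dispatch the missed notifications once the next block arrives.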
+ if err := notifier.UnsafeStart(bestHeight, + nil, bestHeight+numBlocks, generateBlocks); err != nil { + + t.Fatalf("Unable to unsafe start the notifier: %v", err) + } + + // Create numClients clients who will listen for block notifications. + clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients) + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + t.Fatalf("unable to register for epoch notification: %v", err) + } + clients = append(clients, epochClient) + } + + // Generate a single block to trigger the backlog of historical + // notifications for the previously mined blocks. + if _, err := miner.Node.Generate(1); err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + + // We expect each client to receive numBlocks + 1 notifications, 1 for + // each block that the notifier has missed out on. + for expectedHeight := bestHeight + 1; expectedHeight <= + bestHeight+numBlocks+1; expectedHeight++ { + + for _, epochClient := range clients { + select { + case block := <-epochClient.Epochs: + if block.Height != expectedHeight { + t.Fatalf("received block of height: %d, "+ + "expected: %d", block.Height, + expectedHeight) + } + case <-time.After(20 * time.Second): + t.Fatalf("did not receive historical notification "+ + "for height %d", expectedHeight) + } + } + } + + // Finally, ensure that an extra block notification wasn't received. + anyExtras := make(chan struct{}, len(clients)) + for _, epochClient := range clients { + wg.Add(1) + go func(epochClient *chainntnfs.BlockEpochEvent) { + defer wg.Done() + select { + case <-epochClient.Epochs: + anyExtras <- struct{}{} + case <-time.After(5 * time.Second): + } + }(epochClient) + } + + wg.Wait() + close(anyExtras) + + var extraCount int + for range anyExtras { + extraCount++ + } + + if extraCount > 0 { + t.Fatalf("received %d unexpected block notification", extraCount) + } +} + +// testCatchUpOnMissedBlocks tests that a client will still receive all valid +// block notifications in the case where a notifier's best block has been reorged +// out of the chain. +func testCatchUpOnMissedBlocksWithReorg(miner1 *rpctest.Harness, + notifier chainntnfs.TestChainNotifier, t *testing.T) { + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + // Set up a new miner that we can use to cause a reorg. + miner2, err := rpctest.New(netParams, nil, nil) + if err != nil { + t.Fatalf("unable to create mining node: %v", err) + } + if err := miner2.SetUp(false, 0); err != nil { + t.Fatalf("unable to set up mining node: %v", err) + } + defer miner2.TearDown() + + // We start by connecting the new miner to our original miner, + // such that it will sync to our original chain. + if err := rpctest.ConnectNode(miner1, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice := []*rpctest.Harness{miner1, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + // The two should be on the same blockheight. 
+ _, nodeHeight1, err := miner1.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + _, nodeHeight2, err := miner2.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + if nodeHeight1 != nodeHeight2 { + t.Fatalf("expected both miners to be on the same height: %v vs %v", + nodeHeight1, nodeHeight2) + } + + // We disconnect the two nodes, such that we can start mining on them + // individually without the other one learning about the new blocks. + err = miner1.Node.AddNode(miner2.P2PAddress(), rpcclient.ANRemove) + if err != nil { + t.Fatalf("unable to remove node: %v", err) + } + + // Now mine on each chain separately + blocks, err := miner1.Node.Generate(numBlocks) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // We generate an extra block on miner 2's chain to ensure it is the + // longer chain. + _, err = miner2.Node.Generate(numBlocks + 1) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // Sync the two chains to ensure they will sync to miner2's chain. + if err := rpctest.ConnectNode(miner1, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice = []*rpctest.Harness{miner1, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + // Next, start the notifier with outdated best block information. + // We set the notifier's best block to be the last block mined on the + // shorter chain, to test that the notifier correctly rewinds to + // the common ancestor between the two chains. + syncHeight := nodeHeight1 + numBlocks + 1 + if err := notifier.UnsafeStart(nodeHeight1+numBlocks, + blocks[numBlocks-1], syncHeight, nil); err != nil { + + t.Fatalf("Unable to unsafe start the notifier: %v", err) + } + + // Create numClients clients who will listen for block notifications. + clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients) + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + t.Fatalf("unable to register for epoch notification: %v", err) + } + clients = append(clients, epochClient) + } + + // Generate a single block, which should trigger the notifier to rewind + // to the common ancestor and dispatch notifications from there. + _, err = miner2.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // If the chain backend to the notifier stores information about reorged + // blocks, the notifier is able to rewind the chain to the common + // ancestor between the chain tip and its outdated best known block. + // In this case, the client is expected to receive numBlocks + 2 + // notifications, 1 for each block the notifier has missed out on from + // the longer chain. + // + // If the chain backend does not store information about reorged blocks, + // the notifier has no way of knowing where to rewind to and therefore + // the client is only expected to receive notifications for blocks + // whose height is greater than the notifier's best known height: 2 + // notifications, in this case. 
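+	// The type switch below encodes that difference: for neutrino the
+	// expected notifications begin just above its stale best height
+	// (nodeHeight1 + numBlocks), while the reorg-aware backends begin at
+	// the block after the common ancestor at nodeHeight1.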
+ var startingHeight int32 + switch notifier.(type) { + case *neutrinonotify.NeutrinoNotifier: + startingHeight = nodeHeight1 + numBlocks + 1 + default: + startingHeight = nodeHeight1 + 1 + } + + for expectedHeight := startingHeight; expectedHeight <= + nodeHeight1+numBlocks+2; expectedHeight++ { + + for _, epochClient := range clients { + select { + case block := <-epochClient.Epochs: + if block.Height != expectedHeight { + t.Fatalf("received block of height: %d, "+ + "expected: %d", block.Height, + expectedHeight) + } + case <-time.After(20 * time.Second): + t.Fatalf("did not receive historical notification "+ + "for height %d", expectedHeight) + } + } + } + + // Finally, ensure that an extra block notification wasn't received. + anyExtras := make(chan struct{}, len(clients)) + for _, epochClient := range clients { + wg.Add(1) + go func(epochClient *chainntnfs.BlockEpochEvent) { + defer wg.Done() + select { + case <-epochClient.Epochs: + anyExtras <- struct{}{} + case <-time.After(5 * time.Second): + } + }(epochClient) + } + + wg.Wait() + close(anyExtras) + + var extraCount int + for range anyExtras { + extraCount++ + } + + if extraCount > 0 { + t.Fatalf("received %d unexpected block notification", extraCount) + } +} + type testCase struct { name string @@ -1435,6 +1695,14 @@ var blockCatchupTests = []blockCatchupTestCase{ name: "catch up client on historical block epoch ntfns", test: testCatchUpClientOnMissedBlocks, }, + { + name: "test catch up on missed blocks", + test: testCatchUpOnMissedBlocks, + }, + { + name: "test catch up on missed blocks w/ reorged best block", + test: testCatchUpOnMissedBlocksWithReorg, + }, } // TestInterfaces tests all registered interfaces with a unified set of tests @@ -1626,6 +1894,7 @@ func TestInterfaces(t *testing.T) { t.Fatalf("unable to create %v notifier: %v", notifierType, err) } + testName := fmt.Sprintf("%v: %v", notifierType, blockCatchupTest.name)