diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index b3e5f7c7..21140994 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -74,6 +74,8 @@ type BitcoindNotifier struct { blockEpochClients map[uint64]*blockEpochRegistration + bestBlock chainntnfs.BlockEpoch + wg sync.WaitGroup quit chan struct{} } @@ -119,7 +121,7 @@ func (b *BitcoindNotifier) Start() error { return err } - _, currentHeight, err := b.chainConn.GetBestBlock() + currentHash, currentHeight, err := b.chainConn.GetBestBlock() if err != nil { return err } @@ -127,8 +129,13 @@ func (b *BitcoindNotifier) Start() error { b.txConfNotifier = chainntnfs.NewTxConfNotifier( uint32(currentHeight), reorgSafetyLimit) + b.bestBlock = chainntnfs.BlockEpoch{ + Height: currentHeight, + Hash: currentHash, + } + b.wg.Add(1) - go b.notificationDispatcher(currentHeight) + go b.notificationDispatcher() return nil } @@ -174,7 +181,7 @@ type blockNtfn struct { // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. -func (b *BitcoindNotifier) notificationDispatcher(bestHeight int32) { +func (b *BitcoindNotifier) notificationDispatcher() { out: for { select { @@ -235,7 +242,7 @@ out: "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - currentHeight := uint32(bestHeight) + currentHeight := uint32(b.bestBlock.Height) // Look up whether the transaction is already // included in the active chain. We'll do this @@ -268,63 +275,101 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + b.chainConn, msg.bestBlock, + b.bestBlock.Height, true, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + b.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + } + msg.errorChan <- nil case chain.RelevantTx: - b.handleRelevantTx(msg, bestHeight) + b.handleRelevantTx(msg, b.bestBlock.Height) } case ntfn := <-b.chainConn.Notifications(): switch item := ntfn.(type) { case chain.BlockConnected: - if item.Height != bestHeight+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - bestHeight, item.Height) - continue - } - bestHeight = item.Height - - rawBlock, err := b.chainConn.GetBlock(&item.Hash) + blockHeader, err := + b.chainConn.GetBlockHeader(&item.Hash) if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) + chainntnfs.Log.Errorf("Unable to fetch "+ + "block header: %v", err) continue } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - item.Height, item.Hash) + if blockHeader.PrevBlock != *b.bestBlock.Hash { + // Handle the case where the notifier + // missed some blocks from its chain + // backend. + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + newBestBlock, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + b.chainConn, + b.txConfNotifier, + b.bestBlock, item.Height, + true, + ) - b.notifyBlockEpochs(item.Height, &item.Hash) + if err != nil { + // Set the bestBlock here in case + // a catch up partially completed. + b.bestBlock = newBestBlock + chainntnfs.Log.Error(err) + continue + } - txns := btcutil.NewBlock(rawBlock).Transactions() - err = b.txConfNotifier.ConnectTip(&item.Hash, - uint32(item.Height), txns) - if err != nil { + for _, block := range missedBlocks { + err := b.handleBlockConnected(block) + if err != nil { + chainntnfs.Log.Error(err) + continue out + } + } + } + + newBlock := chainntnfs.BlockEpoch{ + Height: item.Height, + Hash: &item.Hash, + } + if err := b.handleBlockConnected(newBlock); err != nil { chainntnfs.Log.Error(err) } + continue case chain.BlockDisconnected: - if item.Height != bestHeight { - chainntnfs.Log.Warnf("Received blocks "+ - "out of order: current height="+ - "%d, disconnected height=%d", - bestHeight, item.Height) - continue + if item.Height != b.bestBlock.Height { + chainntnfs.Log.Infof("Missed disconnected" + + "blocks, attempting to catch up") } - bestHeight = item.Height - 1 - chainntnfs.Log.Infof("Block disconnected from "+ - "main chain: height=%v, sha=%v", - item.Height, item.Hash) - - err := b.txConfNotifier.DisconnectTip( - uint32(item.Height)) + newBestBlock, err := chainntnfs.RewindChain( + b.chainConn, b.txConfNotifier, + b.bestBlock, item.Height-1, + ) if err != nil { - chainntnfs.Log.Error(err) + chainntnfs.Log.Errorf("Unable to rewind chain "+ + "from height %d to height %d: %v", + b.bestBlock.Height, item.Height-1, err) } + // Set the bestBlock here in case a chain + // rewind partially completed. + b.bestBlock = newBestBlock + case chain.RelevantTx: - b.handleRelevantTx(item, bestHeight) + b.handleRelevantTx(item, b.bestBlock.Height) } case <-b.quit: @@ -517,23 +562,57 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash, return nil, nil } +// handleBlockConnected applies a chain update for a new block. Any watched +// transactions included this block will processed to either send notifications +// now or after numConfirmations confs. +func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) error { + rawBlock, err := b.chainConn.GetBlock(block.Hash) + if err != nil { + return fmt.Errorf("unable to get block: %v", err) + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + block.Height, block.Hash) + + txns := btcutil.NewBlock(rawBlock).Transactions() + err = b.txConfNotifier.ConnectTip( + block.Hash, uint32(block.Height), txns) + if err != nil { + return fmt.Errorf("unable to connect tip: %v", err) + } + + // We want to set the best block before dispatching notifications so + // if any subscribers make queries based on their received block epoch, + // our state is fully updated in time. + b.bestBlock = block + + b.notifyBlockEpochs(block.Height, block.Hash) + + return nil +} + // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range b.blockEpochClients { + b.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. +func (b *BitcoindNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range b.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-b.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-b.quit: } } @@ -805,6 +884,10 @@ type blockEpochRegistration struct { epochQueue *chainntnfs.ConcurrentQueue + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + cancelChan chan struct{} wg sync.WaitGroup @@ -818,13 +901,18 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main -// chain. -func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. +func (b *BitcoindNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start() diff --git a/chainntnfs/bitcoindnotify/bitcoind_debug.go b/chainntnfs/bitcoindnotify/bitcoind_debug.go new file mode 100644 index 00000000..33d1aa4d --- /dev/null +++ b/chainntnfs/bitcoindnotify/bitcoind_debug.go @@ -0,0 +1,77 @@ +package bitcoindnotify + +import ( + "fmt" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcwallet/chain" + "github.com/lightningnetwork/lnd/chainntnfs" +) + +// UnsafeStart starts the notifier with a specified best height and optional +// best hash. Its bestBlock and txConfNotifier are initialized with +// bestHeight and optionally bestHash. The parameter generateBlocks is +// necessary for the bitcoind notifier to ensure we drain all notifications up +// to syncHeight, since if they are generated ahead of UnsafeStart the chainConn +// may start up with an outdated best block and miss sending ntfns. Used for +// testing. +func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, + syncHeight int32, generateBlocks func() error) error { + + // Connect to bitcoind, and register for notifications on connected, + // and disconnected blocks. + if err := b.chainConn.Start(); err != nil { + return err + } + if err := b.chainConn.NotifyBlocks(); err != nil { + return err + } + + b.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(bestHeight), reorgSafetyLimit) + + if generateBlocks != nil { + // Ensure no block notifications are pending when we start the + // notification dispatcher goroutine. + + // First generate the blocks, then drain the notifications + // for the generated blocks. + if err := generateBlocks(); err != nil { + return err + } + + timeout := time.After(60 * time.Second) + loop: + for { + select { + case ntfn := <-b.chainConn.Notifications(): + switch update := ntfn.(type) { + case chain.BlockConnected: + if update.Height >= syncHeight { + break loop + } + } + case <-timeout: + return fmt.Errorf("unable to catch up to height %d", + syncHeight) + } + } + } + + // Run notificationDispatcher after setting the notifier's best block + // to avoid a race condition. + b.bestBlock = chainntnfs.BlockEpoch{Height: bestHeight, Hash: bestHash} + if bestHash == nil { + hash, err := b.chainConn.GetBlockHash(int64(bestHeight)) + if err != nil { + return err + } + b.bestBlock.Hash = hash + } + + b.wg.Add(1) + go b.notificationDispatcher() + + return nil +} diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 2fb2f26c..c7cb6995 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -16,7 +16,6 @@ import ( ) const ( - // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "btcd" @@ -79,6 +78,8 @@ type BtcdNotifier struct { blockEpochClients map[uint64]*blockEpochRegistration + bestBlock chainntnfs.BlockEpoch + chainUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue @@ -143,7 +144,7 @@ func (b *BtcdNotifier) Start() error { return err } - _, currentHeight, err := b.chainConn.GetBestBlock() + currentHash, currentHeight, err := b.chainConn.GetBestBlock() if err != nil { return err } @@ -151,11 +152,16 @@ func (b *BtcdNotifier) Start() error { b.txConfNotifier = chainntnfs.NewTxConfNotifier( uint32(currentHeight), reorgSafetyLimit) + b.bestBlock = chainntnfs.BlockEpoch{ + Height: currentHeight, + Hash: currentHash, + } + b.chainUpdates.Start() b.txUpdates.Start() b.wg.Add(1) - go b.notificationDispatcher(currentHeight) + go b.notificationDispatcher() return nil } @@ -245,7 +251,7 @@ func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetai // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. -func (b *BtcdNotifier) notificationDispatcher(currentHeight int32) { +func (b *BtcdNotifier) notificationDispatcher() { out: for { select { @@ -305,7 +311,7 @@ out: "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - bestHeight := uint32(currentHeight) + bestHeight := uint32(b.bestBlock.Height) // Look up whether the transaction is already // included in the active chain. We'll do this @@ -338,60 +344,96 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + b.chainConn, msg.bestBlock, + b.bestBlock.Height, true, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + b.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + + } + msg.errorChan <- nil } case item := <-b.chainUpdates.ChanOut(): update := item.(*chainUpdate) if update.connect { - if update.blockHeight != currentHeight+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - currentHeight, update.blockHeight) - continue - } - - currentHeight = update.blockHeight - - rawBlock, err := b.chainConn.GetBlock(update.blockHash) + blockHeader, err := + b.chainConn.GetBlockHeader(update.blockHash) if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) + chainntnfs.Log.Errorf("Unable to fetch "+ + "block header: %v", err) continue } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.blockHeight, update.blockHash) + if blockHeader.PrevBlock != *b.bestBlock.Hash { + // Handle the case where the notifier + // missed some blocks from its chain + // backend + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + newBestBlock, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + b.chainConn, + b.txConfNotifier, + b.bestBlock, + update.blockHeight, + true, + ) + if err != nil { + // Set the bestBlock here in case + // a catch up partially completed. + b.bestBlock = newBestBlock + chainntnfs.Log.Error(err) + continue + } - txns := btcutil.NewBlock(rawBlock).Transactions() - - block := &filteredBlock{ - hash: *update.blockHash, - height: uint32(update.blockHeight), - txns: txns, - connect: true, + for _, block := range missedBlocks { + err := b.handleBlockConnected(block) + if err != nil { + chainntnfs.Log.Error(err) + continue out + } + } } - if err := b.handleBlockConnected(block); err != nil { + + newBlock := chainntnfs.BlockEpoch{ + Height: update.blockHeight, + Hash: update.blockHash, + } + if err := b.handleBlockConnected(newBlock); err != nil { chainntnfs.Log.Error(err) } continue } - if update.blockHeight != currentHeight { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, disconnected height=%d", - currentHeight, update.blockHeight) - continue + if update.blockHeight != b.bestBlock.Height { + chainntnfs.Log.Infof("Missed disconnected" + + "blocks, attempting to catch up") } - currentHeight = update.blockHeight - 1 - - chainntnfs.Log.Infof("Block disconnected from main chain: "+ - "height=%v, sha=%v", update.blockHeight, update.blockHash) - - err := b.txConfNotifier.DisconnectTip(uint32(update.blockHeight)) + newBestBlock, err := chainntnfs.RewindChain( + b.chainConn, b.txConfNotifier, b.bestBlock, + update.blockHeight-1, + ) if err != nil { - chainntnfs.Log.Error(err) + chainntnfs.Log.Errorf("Unable to rewind chain "+ + "from height %d to height %d: %v", + b.bestBlock.Height, update.blockHeight-1, err) } + // Set the bestBlock here in case a chain rewind + // partially completed. + b.bestBlock = newBestBlock + // NOTE: we currently only use txUpdates for mempool spends and // rescan spends. It might get removed entirely in the future. case item := <-b.txUpdates.ChanOut(): @@ -590,17 +632,47 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, return nil, nil } -// handleBlocksConnected applies a chain update for a new block. Any watched +// handleBlockConnected applies a chain update for a new block. Any watched // transactions included this block will processed to either send notifications // now or after numConfirmations confs. // TODO(halseth): this is reusing the neutrino notifier implementation, unify // them. -func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { - // First we'll notify any subscribed clients of the block. +func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { + // First process the block for our internal state. A new block has + // been connected to the main chain. Send out any N confirmation + // notifications which may have been triggered by this new block. + rawBlock, err := b.chainConn.GetBlock(epoch.Hash) + if err != nil { + return fmt.Errorf("unable to get block: %v", err) + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + epoch.Height, epoch.Hash) + + txns := btcutil.NewBlock(rawBlock).Transactions() + + newBlock := &filteredBlock{ + hash: *epoch.Hash, + height: uint32(epoch.Height), + txns: txns, + connect: true, + } + err = b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, + newBlock.txns) + if err != nil { + return fmt.Errorf("unable to connect tip: %v", err) + } + + // We want to set the best block before dispatching notifications + // so if any subscribers make queries based on their received + // block epoch, our state is fully updated in time. + b.bestBlock = epoch + + // Next we'll notify any subscribed clients of the block. b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Next, we'll scan over the list of relevant transactions and possibly - // dispatch notifications for confirmations and spends. + // Finally, we'll scan over the list of relevant transactions and + // possibly dispatch notifications for confirmations and spends. for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -608,9 +680,10 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { for i, txIn := range mtx.TxIn { prevOut := txIn.PreviousOutPoint - // If this transaction indeed does spend an output which we have a - // registered notification for, then create a spend summary, finally - // sending off the details to the notification subscriber. + // If this transaction indeed does spend an output which + // we have a registered notification for, then create a + // spend summary, finally sending off the details to the + // notification subscriber. clients, ok := b.spendNotifications[prevOut] if !ok { continue @@ -629,9 +702,10 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { "outpoint=%v", ntfn.targetOutpoint) ntfn.spendChan <- spendDetails - // Close spendChan to ensure that any calls to Cancel will not - // block. This is safe to do since the channel is buffered, and - // the message can still be read by the receiver. + // Close spendChan to ensure that any calls to + // Cancel will not block. This is safe to do + // since the channel is buffered, and the + // message can still be read by the receiver. close(ntfn.spendChan) } @@ -639,31 +713,31 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { } } - // A new block has been connected to the main chain. - // Send out any N confirmation notifications which may - // have been triggered by this new block. - b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns) - return nil } // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range b.blockEpochClients { + b.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. +func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range b.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-b.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-b.quit: } } @@ -857,6 +931,10 @@ type blockEpochRegistration struct { epochQueue *chainntnfs.ConcurrentQueue + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + cancelChan chan struct{} wg sync.WaitGroup @@ -870,13 +948,18 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main -// chain. -func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. +func (b *BtcdNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start() diff --git a/chainntnfs/btcdnotify/btcd_debug.go b/chainntnfs/btcdnotify/btcd_debug.go new file mode 100644 index 00000000..8ddffc39 --- /dev/null +++ b/chainntnfs/btcdnotify/btcd_debug.go @@ -0,0 +1,77 @@ +package btcdnotify + +import ( + "fmt" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/chainntnfs" +) + +// UnsafeStart starts the notifier with a specified best height and optional +// best hash. Its bestBlock and txConfNotifier are initialized with +// bestHeight and optionally bestHash. The parameter generateBlocks is +// necessary for the bitcoind notifier to ensure we drain all notifications up +// to syncHeight, since if they are generated ahead of UnsafeStart the chainConn +// may start up with an outdated best block and miss sending ntfns. Used for +// testing. +func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, + syncHeight int32, generateBlocks func() error) error { + + // Connect to btcd, and register for notifications on connected, and + // disconnected blocks. + if err := b.chainConn.Connect(20); err != nil { + return err + } + if err := b.chainConn.NotifyBlocks(); err != nil { + return err + } + + b.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(bestHeight), reorgSafetyLimit) + + b.chainUpdates.Start() + b.txUpdates.Start() + + if generateBlocks != nil { + // Ensure no block notifications are pending when we start the + // notification dispatcher goroutine. + + // First generate the blocks, then drain the notifications + // for the generated blocks. + if err := generateBlocks(); err != nil { + return err + } + + timeout := time.After(60 * time.Second) + loop: + for { + select { + case ntfn := <-b.chainUpdates.ChanOut(): + lastReceivedNtfn := ntfn.(*chainUpdate) + if lastReceivedNtfn.blockHeight >= syncHeight { + break loop + } + case <-timeout: + return fmt.Errorf("unable to catch up to height %d", + syncHeight) + } + } + } + + // Run notificationDispatcher after setting the notifier's best block + // to avoid a race condition. + b.bestBlock = chainntnfs.BlockEpoch{Height: bestHeight, Hash: bestHash} + if bestHash == nil { + hash, err := b.chainConn.GetBlockHash(int64(bestHeight)) + if err != nil { + return err + } + b.bestBlock.Hash = hash + } + + b.wg.Add(1) + go b.notificationDispatcher() + + return nil +} diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index ff90cc0a..7c7a2ec6 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" ) @@ -59,7 +60,12 @@ type ChainNotifier interface { // new block connected to the tip of the main chain. The returned // BlockEpochEvent struct contains a channel which will be sent upon // for each new block discovered. - RegisterBlockEpochNtfn() (*BlockEpochEvent, error) + // + // Clients have the option of passing in their best known block. + // If they specify a block, the ChainNotifier checks whether the client + // is behind on blocks. If they are, the ChainNotifier sends a backlog + // of block notifications for the missed blocks. + RegisterBlockEpochNtfn(*BlockEpoch) (*BlockEpochEvent, error) // Start the ChainNotifier. Once started, the implementation should be // ready, and able to receive notification registrations from clients. @@ -248,3 +254,206 @@ func SupportedNotifiers() []string { return supportedNotifiers } + +// ChainConn enables notifiers to pass in their chain backend to interface +// functions that require it. +type ChainConn interface { + // GetBlockHeader returns the block header for a hash. + GetBlockHeader(blockHash *chainhash.Hash) (*wire.BlockHeader, error) + + // GetBlockHeaderVerbose returns the verbose block header for a hash. + GetBlockHeaderVerbose(blockHash *chainhash.Hash) ( + *btcjson.GetBlockHeaderVerboseResult, error) + + // GetBlockHash returns the hash from a block height. + GetBlockHash(blockHeight int64) (*chainhash.Hash, error) +} + +// GetCommonBlockAncestorHeight takes in: +// (1) the hash of a block that has been reorged out of the main chain +// (2) the hash of the block of the same height from the main chain +// It returns the height of the nearest common ancestor between the two hashes, +// or an error +func GetCommonBlockAncestorHeight(chainConn ChainConn, reorgHash, + chainHash chainhash.Hash) (int32, error) { + + for reorgHash != chainHash { + reorgHeader, err := chainConn.GetBlockHeader(&reorgHash) + if err != nil { + return 0, fmt.Errorf("unable to get header for hash=%v: %v", + reorgHash, err) + } + chainHeader, err := chainConn.GetBlockHeader(&chainHash) + if err != nil { + return 0, fmt.Errorf("unable to get header for hash=%v: %v", + chainHash, err) + } + reorgHash = reorgHeader.PrevBlock + chainHash = chainHeader.PrevBlock + } + + verboseHeader, err := chainConn.GetBlockHeaderVerbose(&chainHash) + if err != nil { + return 0, fmt.Errorf("unable to get verbose header for hash=%v: %v", + chainHash, err) + } + + return verboseHeader.Height, nil +} + +// GetClientMissedBlocks uses a client's best block to determine what blocks +// it missed being notified about, and returns them in a slice. Its +// backendStoresReorgs parameter tells it whether or not the notifier's +// chainConn stores information about blocks that have been reorged out of the +// chain, which allows GetClientMissedBlocks to find out whether the client's +// best block has been reorged out of the chain, rewind to the common ancestor +// and return blocks starting right after the common ancestor. +func GetClientMissedBlocks(chainConn ChainConn, clientBestBlock *BlockEpoch, + notifierBestHeight int32, backendStoresReorgs bool) ([]BlockEpoch, error) { + + startingHeight := clientBestBlock.Height + if backendStoresReorgs { + // If a reorg causes the client's best hash to be incorrect, + // retrieve the closest common ancestor and dispatch + // notifications from there. + hashAtBestHeight, err := chainConn.GetBlockHash( + int64(clientBestBlock.Height)) + if err != nil { + return nil, fmt.Errorf("unable to find blockhash for "+ + "height=%d: %v", clientBestBlock.Height, err) + } + + startingHeight, err = GetCommonBlockAncestorHeight( + chainConn, *clientBestBlock.Hash, *hashAtBestHeight, + ) + if err != nil { + return nil, fmt.Errorf("unable to find common ancestor: "+ + "%v", err) + } + } + + // We want to start dispatching historical notifications from the block + // right after the client's best block, to avoid a redundant notification. + missedBlocks, err := getMissedBlocks( + chainConn, startingHeight+1, notifierBestHeight+1, + ) + if err != nil { + return nil, fmt.Errorf("unable to get missed blocks: %v", err) + } + + return missedBlocks, nil +} + +// RewindChain handles internal state updates for the notifier's TxConfNotifier +// It has no effect if given a height greater than or equal to our current best +// known height. It returns the new best block for the notifier. +func RewindChain(chainConn ChainConn, txConfNotifier *TxConfNotifier, + currBestBlock BlockEpoch, targetHeight int32) (BlockEpoch, error) { + + newBestBlock := BlockEpoch{ + Height: currBestBlock.Height, + Hash: currBestBlock.Hash, + } + + for height := currBestBlock.Height; height > targetHeight; height-- { + hash, err := chainConn.GetBlockHash(int64(height - 1)) + if err != nil { + return newBestBlock, fmt.Errorf("unable to "+ + "find blockhash for disconnected height=%d: %v", + height, err) + } + + Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", height, newBestBlock.Hash) + + err = txConfNotifier.DisconnectTip(uint32(height)) + if err != nil { + return newBestBlock, fmt.Errorf("unable to "+ + " disconnect tip for height=%d: %v", + height, err) + } + newBestBlock.Height = height - 1 + newBestBlock.Hash = hash + } + return newBestBlock, nil +} + +// HandleMissedBlocks is called when the chain backend for a notifier misses a +// series of blocks, handling a reorg if necessary. Its backendStoresReorgs +// parameter tells it whether or not the notifier's chainConn stores +// information about blocks that have been reorged out of the chain, which allows +// HandleMissedBlocks to check whether the notifier's best block has been +// reorged out, and rewind the chain accordingly. It returns the best block for +// the notifier and a slice of the missed blocks. The new best block needs to be +// returned in case a chain rewind occurs and partially completes before +// erroring. In the case where there is no rewind, the notifier's +// current best block is returned. +func HandleMissedBlocks(chainConn ChainConn, txConfNotifier *TxConfNotifier, + currBestBlock BlockEpoch, newHeight int32, + backendStoresReorgs bool) (BlockEpoch, []BlockEpoch, error) { + + startingHeight := currBestBlock.Height + + if backendStoresReorgs { + // If a reorg causes our best hash to be incorrect, rewind the + // chain so our best block is set to the closest common + // ancestor, then dispatch notifications from there. + hashAtBestHeight, err := + chainConn.GetBlockHash(int64(currBestBlock.Height)) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to find "+ + "blockhash for height=%d: %v", + currBestBlock.Height, err) + } + + startingHeight, err = GetCommonBlockAncestorHeight( + chainConn, *currBestBlock.Hash, *hashAtBestHeight, + ) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to find "+ + "common ancestor: %v", err) + } + + currBestBlock, err = RewindChain(chainConn, txConfNotifier, + currBestBlock, startingHeight) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to "+ + "rewind chain: %v", err) + } + } + + // We want to start dispatching historical notifications from the block + // right after our best block, to avoid a redundant notification. + missedBlocks, err := getMissedBlocks(chainConn, startingHeight+1, newHeight) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to get missed "+ + "blocks: %v", err) + } + + return currBestBlock, missedBlocks, nil +} + +// getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight) +// fetched from the chain. +func getMissedBlocks(chainConn ChainConn, startingHeight, + endingHeight int32) ([]BlockEpoch, error) { + + numMissedBlocks := endingHeight - startingHeight + if numMissedBlocks < 0 { + return nil, fmt.Errorf("starting height %d is greater than "+ + "ending height %d", startingHeight, endingHeight) + } + + missedBlocks := make([]BlockEpoch, 0, numMissedBlocks) + for height := startingHeight; height < endingHeight; height++ { + hash, err := chainConn.GetBlockHash(int64(height)) + if err != nil { + return nil, fmt.Errorf("unable to find blockhash for "+ + "height=%d: %v", height, err) + } + missedBlocks = append(missedBlocks, + BlockEpoch{Hash: hash, Height: height}) + } + + return missedBlocks, nil +} diff --git a/chainntnfs/interface_debug.go b/chainntnfs/interface_debug.go new file mode 100644 index 00000000..f3e6eaf4 --- /dev/null +++ b/chainntnfs/interface_debug.go @@ -0,0 +1,13 @@ +package chainntnfs + +import "github.com/btcsuite/btcd/chaincfg/chainhash" + +// TestChainNotifier enables the use of methods that are only present during +// testing for ChainNotifiers. +type TestChainNotifier interface { + ChainNotifier + + // UnsafeStart enables notifiers to start up with a specific best block. + // Used for testing. + UnsafeStart(int32, *chainhash.Hash, int32, func() error) error +} diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index b279195c..dade94ed 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -29,15 +29,15 @@ import ( // Required to auto-register the bitcoind backed ChainNotifier // implementation. - _ "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify" + "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify" // Required to auto-register the btcd backed ChainNotifier // implementation. - _ "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify" + "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify" // Required to auto-register the neutrino backed ChainNotifier // implementation. - _ "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify" + "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify" // Required to register the boltdb walletdb implementation. _ "github.com/btcsuite/btcwallet/walletdb/bdb" @@ -113,7 +113,7 @@ func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error { } func testSingleConfirmationNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the case of being notified once a txid reaches // a *single* confirmation. @@ -184,7 +184,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, } func testMultiConfirmationNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the case of being notified once a txid reaches // N confirmations, where N > 1. @@ -232,7 +232,7 @@ func testMultiConfirmationNotification(miner *rpctest.Harness, } func testBatchConfirmationNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test a case of serving notifications to multiple // clients, each requesting to be notified once a txid receives @@ -400,7 +400,7 @@ func checkNotificationFields(ntfn *chainntnfs.SpendDetail, } func testSpendNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the spend notifications for all ChainNotifier // concrete implementations. @@ -512,7 +512,7 @@ func testSpendNotification(miner *rpctest.Harness, } func testBlockEpochNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the case of multiple registered clients receiving // block epoch notifications. @@ -526,7 +526,7 @@ func testBlockEpochNotification(miner *rpctest.Harness, // blocks we generate below. So we'll use a WaitGroup to synchronize the // test. for i := 0; i < numClients; i++ { - epochClient, err := notifier.RegisterBlockEpochNtfn() + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) if err != nil { t.Fatalf("unable to register for epoch notification") } @@ -560,7 +560,7 @@ func testBlockEpochNotification(miner *rpctest.Harness, } func testMultiClientConfirmationNotification(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the case of a multiple clients registered to // receive a confirmation notification for the same transaction. @@ -626,7 +626,7 @@ func testMultiClientConfirmationNotification(miner *rpctest.Harness, // transaction that has already been included in a block. In this case, the // confirmation notification should be dispatched immediately. func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // First, let's send some coins to "ourself", obtaining a txid. We're // spending from a coinbase output here, so we use the dedicated @@ -786,7 +786,7 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, // checking for a confirmation. This should not cause the notifier to stop // working func testLazyNtfnConsumer(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // Create a transaction to be notified about. We'll register for // notifications on this transaction but won't be prompt in checking them @@ -877,7 +877,7 @@ func testLazyNtfnConsumer(miner *rpctest.Harness, // has already been included in a block. In this case, the spend notification // should be dispatched immediately. func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test the spend notifications for all ChainNotifier // concrete implementations. @@ -898,7 +898,7 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, // We create an epoch client we can use to make sure the notifier is // caught up to the mining node's chain. - epochClient, err := notifier.RegisterBlockEpochNtfn() + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) if err != nil { t.Fatalf("unable to register for block epoch: %v", err) } @@ -981,7 +981,7 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, } func testCancelSpendNtfn(node *rpctest.Harness, - notifier chainntnfs.ChainNotifier, t *testing.T) { + notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to test that once a spend notification is registered, it // can be cancelled before the notification is dispatched. @@ -1072,7 +1072,7 @@ func testCancelSpendNtfn(node *rpctest.Harness, } } -func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, +func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier, t *testing.T) { // We'd like to ensure that once a client cancels their block epoch @@ -1082,7 +1082,7 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie epochClients := make([]*chainntnfs.BlockEpochEvent, numClients) for i := 0; i < numClients; i++ { - epochClient, err := notifier.RegisterBlockEpochNtfn() + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) if err != nil { t.Fatalf("unable to register for epoch notification") } @@ -1122,7 +1122,7 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie } } -func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, +func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.TestChainNotifier, t *testing.T) { // Set up a new miner that we can use to cause a reorg. @@ -1274,10 +1274,369 @@ func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, } } +// testCatchUpClientOnMissedBlocks tests the case of multiple registered client +// receiving historical block epoch notifications due to their best known block +// being out of date. +func testCatchUpClientOnMissedBlocks(miner *rpctest.Harness, + notifier chainntnfs.TestChainNotifier, t *testing.T) { + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + outdatedHash, outdatedHeight, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to retrieve current height: %v", err) + } + + // This function is used by UnsafeStart to ensure all notifications + // are fully drained before clients register for notifications. + generateBlocks := func() error { + _, err = miner.Node.Generate(numBlocks) + return err + } + + // We want to ensure that when a client registers for block notifications, + // the notifier's best block is at the tip of the chain. If it isn't, the + // client may not receive all historical notifications. + bestHeight := outdatedHeight + numBlocks + if err := notifier.UnsafeStart( + bestHeight, nil, bestHeight, generateBlocks); err != nil { + + t.Fatalf("Unable to unsafe start the notifier: %v", err) + } + + // Create numClients clients whose best known block is 10 blocks behind + // the tip of the chain. We expect each client to receive numBlocks + // notifications, 1 for each block they're behind. + clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients) + outdatedBlock := &chainntnfs.BlockEpoch{ + Height: outdatedHeight, Hash: outdatedHash, + } + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn(outdatedBlock) + if err != nil { + t.Fatalf("unable to register for epoch notification: %v", err) + } + clients = append(clients, epochClient) + } + for expectedHeight := outdatedHeight + 1; expectedHeight <= + bestHeight; expectedHeight++ { + + for _, epochClient := range clients { + select { + case block := <-epochClient.Epochs: + if block.Height != expectedHeight { + t.Fatalf("received block of height: %d, "+ + "expected: %d", block.Height, + expectedHeight) + } + case <-time.After(20 * time.Second): + t.Fatalf("did not receive historical notification "+ + "for height %d", expectedHeight) + } + + } + } + + // Finally, ensure that an extra block notification wasn't received. + anyExtras := make(chan struct{}, len(clients)) + for _, epochClient := range clients { + wg.Add(1) + go func(epochClient *chainntnfs.BlockEpochEvent) { + defer wg.Done() + select { + case <-epochClient.Epochs: + anyExtras <- struct{}{} + case <-time.After(5 * time.Second): + } + }(epochClient) + } + + wg.Wait() + close(anyExtras) + + var extraCount int + for range anyExtras { + extraCount++ + } + + if extraCount > 0 { + t.Fatalf("received %d unexpected block notification", extraCount) + } +} + +// testCatchUpOnMissedBlocks the case of multiple registered clients receiving +// historical block epoch notifications due to the notifier's best known block +// being out of date. +func testCatchUpOnMissedBlocks(miner *rpctest.Harness, + notifier chainntnfs.TestChainNotifier, t *testing.T) { + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + _, bestHeight, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + // This function is used by UnsafeStart to ensure all notifications + // are fully drained before clients register for notifications. + generateBlocks := func() error { + _, err = miner.Node.Generate(numBlocks) + return err + } + + // Next, start the notifier with outdated best block information. + if err := notifier.UnsafeStart(bestHeight, + nil, bestHeight+numBlocks, generateBlocks); err != nil { + + t.Fatalf("Unable to unsafe start the notifier: %v", err) + } + + // Create numClients clients who will listen for block notifications. + clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients) + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + t.Fatalf("unable to register for epoch notification: %v", err) + } + clients = append(clients, epochClient) + } + + // Generate a single block to trigger the backlog of historical + // notifications for the previously mined blocks. + if _, err := miner.Node.Generate(1); err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + + // We expect each client to receive numBlocks + 1 notifications, 1 for + // each block that the notifier has missed out on. + for expectedHeight := bestHeight + 1; expectedHeight <= + bestHeight+numBlocks+1; expectedHeight++ { + + for _, epochClient := range clients { + select { + case block := <-epochClient.Epochs: + if block.Height != expectedHeight { + t.Fatalf("received block of height: %d, "+ + "expected: %d", block.Height, + expectedHeight) + } + case <-time.After(20 * time.Second): + t.Fatalf("did not receive historical notification "+ + "for height %d", expectedHeight) + } + } + } + + // Finally, ensure that an extra block notification wasn't received. + anyExtras := make(chan struct{}, len(clients)) + for _, epochClient := range clients { + wg.Add(1) + go func(epochClient *chainntnfs.BlockEpochEvent) { + defer wg.Done() + select { + case <-epochClient.Epochs: + anyExtras <- struct{}{} + case <-time.After(5 * time.Second): + } + }(epochClient) + } + + wg.Wait() + close(anyExtras) + + var extraCount int + for range anyExtras { + extraCount++ + } + + if extraCount > 0 { + t.Fatalf("received %d unexpected block notification", extraCount) + } +} + +// testCatchUpOnMissedBlocks tests that a client will still receive all valid +// block notifications in the case where a notifier's best block has been reorged +// out of the chain. +func testCatchUpOnMissedBlocksWithReorg(miner1 *rpctest.Harness, + notifier chainntnfs.TestChainNotifier, t *testing.T) { + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + // Set up a new miner that we can use to cause a reorg. + miner2, err := rpctest.New(netParams, nil, nil) + if err != nil { + t.Fatalf("unable to create mining node: %v", err) + } + if err := miner2.SetUp(false, 0); err != nil { + t.Fatalf("unable to set up mining node: %v", err) + } + defer miner2.TearDown() + + // We start by connecting the new miner to our original miner, + // such that it will sync to our original chain. + if err := rpctest.ConnectNode(miner1, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice := []*rpctest.Harness{miner1, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + // The two should be on the same blockheight. + _, nodeHeight1, err := miner1.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + _, nodeHeight2, err := miner2.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + if nodeHeight1 != nodeHeight2 { + t.Fatalf("expected both miners to be on the same height: %v vs %v", + nodeHeight1, nodeHeight2) + } + + // We disconnect the two nodes, such that we can start mining on them + // individually without the other one learning about the new blocks. + err = miner1.Node.AddNode(miner2.P2PAddress(), rpcclient.ANRemove) + if err != nil { + t.Fatalf("unable to remove node: %v", err) + } + + // Now mine on each chain separately + blocks, err := miner1.Node.Generate(numBlocks) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // We generate an extra block on miner 2's chain to ensure it is the + // longer chain. + _, err = miner2.Node.Generate(numBlocks + 1) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // Sync the two chains to ensure they will sync to miner2's chain. + if err := rpctest.ConnectNode(miner1, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice = []*rpctest.Harness{miner1, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + // Next, start the notifier with outdated best block information. + // We set the notifier's best block to be the last block mined on the + // shorter chain, to test that the notifier correctly rewinds to + // the common ancestor between the two chains. + syncHeight := nodeHeight1 + numBlocks + 1 + if err := notifier.UnsafeStart(nodeHeight1+numBlocks, + blocks[numBlocks-1], syncHeight, nil); err != nil { + + t.Fatalf("Unable to unsafe start the notifier: %v", err) + } + + // Create numClients clients who will listen for block notifications. + clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients) + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + t.Fatalf("unable to register for epoch notification: %v", err) + } + clients = append(clients, epochClient) + } + + // Generate a single block, which should trigger the notifier to rewind + // to the common ancestor and dispatch notifications from there. + _, err = miner2.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // If the chain backend to the notifier stores information about reorged + // blocks, the notifier is able to rewind the chain to the common + // ancestor between the chain tip and its outdated best known block. + // In this case, the client is expected to receive numBlocks + 2 + // notifications, 1 for each block the notifier has missed out on from + // the longer chain. + // + // If the chain backend does not store information about reorged blocks, + // the notifier has no way of knowing where to rewind to and therefore + // the client is only expected to receive notifications for blocks + // whose height is greater than the notifier's best known height: 2 + // notifications, in this case. + var startingHeight int32 + switch notifier.(type) { + case *neutrinonotify.NeutrinoNotifier: + startingHeight = nodeHeight1 + numBlocks + 1 + default: + startingHeight = nodeHeight1 + 1 + } + + for expectedHeight := startingHeight; expectedHeight <= + nodeHeight1+numBlocks+2; expectedHeight++ { + + for _, epochClient := range clients { + select { + case block := <-epochClient.Epochs: + if block.Height != expectedHeight { + t.Fatalf("received block of height: %d, "+ + "expected: %d", block.Height, + expectedHeight) + } + case <-time.After(20 * time.Second): + t.Fatalf("did not receive historical notification "+ + "for height %d", expectedHeight) + } + } + } + + // Finally, ensure that an extra block notification wasn't received. + anyExtras := make(chan struct{}, len(clients)) + for _, epochClient := range clients { + wg.Add(1) + go func(epochClient *chainntnfs.BlockEpochEvent) { + defer wg.Done() + select { + case <-epochClient.Epochs: + anyExtras <- struct{}{} + case <-time.After(5 * time.Second): + } + }(epochClient) + } + + wg.Wait() + close(anyExtras) + + var extraCount int + for range anyExtras { + extraCount++ + } + + if extraCount > 0 { + t.Fatalf("received %d unexpected block notification", extraCount) + } +} + type testCase struct { name string - test func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T) + test func(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier, t *testing.T) +} + +type blockCatchupTestCase struct { + name string + + test func(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier, + t *testing.T) } var ntfnTests = []testCase{ @@ -1331,6 +1690,21 @@ var ntfnTests = []testCase{ }, } +var blockCatchupTests = []blockCatchupTestCase{ + { + name: "catch up client on historical block epoch ntfns", + test: testCatchUpClientOnMissedBlocks, + }, + { + name: "test catch up on missed blocks", + test: testCatchUpOnMissedBlocks, + }, + { + name: "test catch up on missed blocks w/ reorged best block", + test: testCatchUpOnMissedBlocksWithReorg, + }, +} + // TestInterfaces tests all registered interfaces with a unified set of tests // which exercise each of the required methods found within the ChainNotifier // interface. @@ -1361,8 +1735,10 @@ func TestInterfaces(t *testing.T) { log.Printf("Running %v ChainNotifier interface tests\n", len(ntfnTests)) var ( - notifier chainntnfs.ChainNotifier + notifier chainntnfs.TestChainNotifier cleanUp func() + + newNotifier func() (chainntnfs.TestChainNotifier, error) ) for _, notifierDriver := range chainntnfs.RegisteredNotifiers() { notifierType := notifierDriver.NotifierType @@ -1430,18 +1806,15 @@ func TestInterfaces(t *testing.T) { } cleanUp = cleanUp3 - notifier, err = notifierDriver.New(chainConn) - if err != nil { - t.Fatalf("unable to create %v notifier: %v", - notifierType, err) + newNotifier = func() (chainntnfs.TestChainNotifier, error) { + return bitcoindnotify.New(chainConn), nil } case "btcd": - notifier, err = notifierDriver.New(&rpcConfig) - if err != nil { - t.Fatalf("unable to create %v notifier: %v", - notifierType, err) + newNotifier = func() (chainntnfs.TestChainNotifier, error) { + return btcdnotify.New(&rpcConfig) } + cleanUp = func() {} case "neutrino": @@ -1481,16 +1854,18 @@ func TestInterfaces(t *testing.T) { for !spvNode.IsCurrent() { time.Sleep(time.Millisecond * 100) } - - notifier, err = notifierDriver.New(spvNode) - if err != nil { - t.Fatalf("unable to create %v notifier: %v", - notifierType, err) + newNotifier = func() (chainntnfs.TestChainNotifier, error) { + return neutrinonotify.New(spvNode) } } t.Logf("Running ChainNotifier interface tests for: %v", notifierType) + notifier, err = newNotifier() + if err != nil { + t.Fatalf("unable to create %v notifier: %v", notifierType, err) + } + if err := notifier.Start(); err != nil { t.Fatalf("unable to start notifier %v: %v", notifierType, err) @@ -1510,6 +1885,30 @@ func TestInterfaces(t *testing.T) { } notifier.Stop() + + // Run catchup tests separately since they require + // restarting the notifier every time. + for _, blockCatchupTest := range blockCatchupTests { + notifier, err = newNotifier() + if err != nil { + t.Fatalf("unable to create %v notifier: %v", + notifierType, err) + } + + testName := fmt.Sprintf("%v: %v", notifierType, + blockCatchupTest.name) + + success := t.Run(testName, func(t *testing.T) { + blockCatchupTest.test(miner, notifier, t) + }) + + notifier.Stop() + + if !success { + break + } + } + if cleanUp != nil { cleanUp() } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index f612ef1c..07e297a6 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/txscript" @@ -20,7 +21,6 @@ import ( ) const ( - // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "neutrino" @@ -62,6 +62,8 @@ type NeutrinoNotifier struct { p2pNode *neutrino.ChainService chainView *neutrino.Rescan + chainConn *NeutrinoChainConn + notificationCancels chan interface{} notificationRegistry chan interface{} @@ -151,6 +153,8 @@ func (n *NeutrinoNotifier) Start() error { bestHeight, reorgSafetyLimit, ) + n.chainConn = &NeutrinoChainConn{n.p2pNode} + // Finally, we'll create our rescan struct, start it, and launch all // the goroutines we need to operate this ChainNotifier instance. n.chainView = n.p2pNode.NewRescan(rescanOptions...) @@ -241,7 +245,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // notification registrations, as well as notification dispatches. func (n *NeutrinoNotifier) notificationDispatcher() { defer n.wg.Done() - +out: for { select { case cancelMsg := <-n.notificationCancels: @@ -361,58 +365,131 @@ func (n *NeutrinoNotifier) notificationDispatcher() { "to update rescan: %v", err) } + }() case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") n.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + n.heightMtx.Lock() + bestHeight := int32(n.bestHeight) + n.heightMtx.Unlock() + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + n.chainConn, msg.bestBlock, + bestHeight, false, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + n.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + } + msg.errorChan <- nil } case item := <-n.chainUpdates.ChanOut(): update := item.(*filteredBlock) if update.connect { n.heightMtx.Lock() + // Since neutrino has no way of knowing what + // height to rewind to in the case of a reorged + // best known height, there is no point in + // checking that the previous hash matches the + // the hash from our best known height the way + // the other notifiers do when they receive + // a new connected block. Therefore, we just + // compare the heights. if update.height != n.bestHeight+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - n.bestHeight, update.height) - n.heightMtx.Unlock() - continue + // Handle the case where the notifier + // missed some blocks from its chain + // backend + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + bestBlock := chainntnfs.BlockEpoch{ + Height: int32(n.bestHeight), + Hash: nil, + } + _, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + n.chainConn, + n.txConfNotifier, + bestBlock, + int32(update.height), + false, + ) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue + } + + for _, block := range missedBlocks { + filteredBlock, err := + n.getFilteredBlock(block) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue out + } + err = n.handleBlockConnected(filteredBlock) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue out + } + } + } - n.bestHeight = update.height - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.height, update.hash) - err := n.handleBlockConnected(update) if err != nil { chainntnfs.Log.Error(err) } - continue - } - - n.heightMtx.Lock() - if update.height != n.bestHeight { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, disconnected height=%d", - n.bestHeight, update.height) n.heightMtx.Unlock() continue } - n.bestHeight = update.height - 1 - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("Block disconnected from main chain: "+ - "height=%v, sha=%v", update.height, update.hash) - - err := n.txConfNotifier.DisconnectTip(update.height) - if err != nil { - chainntnfs.Log.Error(err) + n.heightMtx.Lock() + if update.height != uint32(n.bestHeight) { + chainntnfs.Log.Infof("Missed disconnected" + + "blocks, attempting to catch up") } + header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight( + n.bestHeight, + ) + if err != nil { + chainntnfs.Log.Errorf("Unable to fetch header"+ + "for height %d: %v", n.bestHeight, err) + n.heightMtx.Unlock() + continue + } + + hash := header.BlockHash() + notifierBestBlock := chainntnfs.BlockEpoch{ + Height: int32(n.bestHeight), + Hash: &hash, + } + newBestBlock, err := chainntnfs.RewindChain( + n.chainConn, n.txConfNotifier, notifierBestBlock, + int32(update.height-1), + ) + if err != nil { + chainntnfs.Log.Errorf("Unable to rewind chain "+ + "from height %d to height %d: %v", + n.bestHeight, update.height-1, err) + } + + // Set the bestHeight here in case a chain rewind + // partially completed. + n.bestHeight = uint32(newBestBlock.Height) + n.heightMtx.Unlock() + case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) @@ -504,15 +581,29 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, return nil, nil } -// handleBlocksConnected applies a chain update for a new block. Any watched +// handleBlockConnected applies a chain update for a new block. Any watched // transactions included this block will processed to either send notifications // now or after numConfirmations confs. func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { - // First we'll notify any subscribed clients of the block. + // First process the block for our internal state. A new block has + // been connected to the main chain. Send out any N confirmation + // notifications which may have been triggered by this new block. + err := n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, + newBlock.txns) + if err != nil { + return fmt.Errorf("unable to connect tip: %v", err) + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + newBlock.height, newBlock.hash) + + n.bestHeight = newBlock.height + + // Next, notify any subscribed clients of the block. n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Next, we'll scan over the list of relevant transactions and possibly - // dispatch notifications for confirmations and spends. + // Finally, we'll scan over the list of relevant transactions and + // possibly dispatch notifications for confirmations and spends. for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -520,10 +611,10 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { for i, txIn := range mtx.TxIn { prevOut := txIn.PreviousOutPoint - // If this transaction indeed does spend an output - // which we have a registered notification for, then - // create a spend summary, finally sending off the - // details to the notification subscriber. + // If this transaction indeed does spend an output which + // we have a registered notification for, then create a + // spend summary, finally sending off the details to the + // notification subscriber. clients, ok := n.spendNotifications[prevOut] if !ok { continue @@ -555,33 +646,49 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { } } - // A new block has been connected to the main chain. Send out any N - // confirmation notifications which may have been triggered by this new - // block. - n.txConfNotifier.ConnectTip( - &newBlock.hash, newBlock.height, newBlock.txns, - ) - return nil } +// getFilteredBlock is a utility to retrieve the full filtered block from a block epoch. +func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filteredBlock, error) { + rawBlock, err := n.p2pNode.GetBlockFromNetwork(*epoch.Hash) + if err != nil { + return nil, fmt.Errorf("unable to get block: %v", err) + } + + txns := rawBlock.Transactions() + + block := &filteredBlock{ + hash: *epoch.Hash, + height: uint32(epoch.Height), + txns: txns, + connect: true, + } + return block, nil +} + // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range n.blockEpochClients { + n.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. +func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range n.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-n.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-n.quit: } } @@ -781,6 +888,10 @@ type blockEpochRegistration struct { cancelChan chan struct{} + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + wg sync.WaitGroup } @@ -790,14 +901,20 @@ type epochCancel struct { epochID uint64 } -// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller -// to receive notifications, of each new block connected to the main chain. -func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the +// caller to receive notifications, of each new block connected to the main +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. +func (n *NeutrinoNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&n.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start() @@ -868,3 +985,41 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent }, nil } } + +// NeutrinoChainConn is a wrapper around neutrino's chain backend in order +// to satisfy the chainntnfs.ChainConn interface. +type NeutrinoChainConn struct { + p2pNode *neutrino.ChainService +} + +// GetBlockHeader returns the block header for a hash. +func (n *NeutrinoChainConn) GetBlockHeader(blockHash *chainhash.Hash) (*wire.BlockHeader, error) { + header, _, err := n.p2pNode.BlockHeaders.FetchHeader(blockHash) + if err != nil { + return nil, err + } + return header, nil +} + +// GetBlockHeaderVerbose returns a verbose block header result for a hash. This +// result only contains the height with a nil hash. +func (n *NeutrinoChainConn) GetBlockHeaderVerbose(blockHash *chainhash.Hash) ( + *btcjson.GetBlockHeaderVerboseResult, error) { + + _, height, err := n.p2pNode.BlockHeaders.FetchHeader(blockHash) + if err != nil { + return nil, err + } + // Since only the height is used from the result, leave the hash nil. + return &btcjson.GetBlockHeaderVerboseResult{Height: int32(height)}, nil +} + +// GetBlockHash returns the hash from a block height. +func (n *NeutrinoChainConn) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) { + header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(uint32(blockHeight)) + if err != nil { + return nil, err + } + hash := header.BlockHash() + return &hash, nil +} diff --git a/chainntnfs/neutrinonotify/neutrino_debug.go b/chainntnfs/neutrinonotify/neutrino_debug.go new file mode 100644 index 00000000..56724c4f --- /dev/null +++ b/chainntnfs/neutrinonotify/neutrino_debug.go @@ -0,0 +1,99 @@ +package neutrinonotify + +import ( + "fmt" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/rpcclient" + "github.com/btcsuite/btcwallet/waddrmgr" + "github.com/lightninglabs/neutrino" + "github.com/lightningnetwork/lnd/chainntnfs" +) + +// UnsafeStart starts the notifier with a specified best height and optional +// best hash. Its bestHeight, txConfNotifier and neutrino node are initialized +// with bestHeight. The parameter generateBlocks is necessary for the +// bitcoind notifier to ensure we drain all notifications up to syncHeight, +// since if they are generated ahead of UnsafeStart the chainConn may start +// up with an outdated best block and miss sending ntfns. Used for testing. +func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, + syncHeight int32, generateBlocks func() error) error { + + // We'll obtain the latest block height of the p2p node. We'll + // start the auto-rescan from this point. Once a caller actually wishes + // to register a chain view, the rescan state will be rewound + // accordingly. + header, height, err := n.p2pNode.BlockHeaders.ChainTip() + if err != nil { + return err + } + startingPoint := &waddrmgr.BlockStamp{ + Height: int32(height), + Hash: header.BlockHash(), + } + + // Next, we'll create our set of rescan options. Currently it's + // required that a user MUST set an addr/outpoint/txid when creating a + // rescan. To get around this, we'll add a "zero" outpoint, that won't + // actually be matched. + var zeroInput neutrino.InputWithScript + rescanOptions := []neutrino.RescanOption{ + neutrino.StartBlock(startingPoint), + neutrino.QuitChan(n.quit), + neutrino.NotificationHandlers( + rpcclient.NotificationHandlers{ + OnFilteredBlockConnected: n.onFilteredBlockConnected, + OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected, + }, + ), + neutrino.WatchInputs(zeroInput), + } + + n.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(bestHeight), reorgSafetyLimit) + + n.chainConn = &NeutrinoChainConn{n.p2pNode} + + // Finally, we'll create our rescan struct, start it, and launch all + // the goroutines we need to operate this ChainNotifier instance. + n.chainView = n.p2pNode.NewRescan(rescanOptions...) + n.rescanErr = n.chainView.Start() + + n.chainUpdates.Start() + + if generateBlocks != nil { + // Ensure no block notifications are pending when we start the + // notification dispatcher goroutine. + + // First generate the blocks, then drain the notifications + // for the generated blocks. + if err := generateBlocks(); err != nil { + return err + } + + timeout := time.After(60 * time.Second) + loop: + for { + select { + case ntfn := <-n.chainUpdates.ChanOut(): + lastReceivedNtfn := ntfn.(*filteredBlock) + if lastReceivedNtfn.height >= uint32(syncHeight) { + break loop + } + case <-timeout: + return fmt.Errorf("unable to catch up to height %d", + syncHeight) + } + } + } + + // Run notificationDispatcher after setting the notifier's best height + // to avoid a race condition. + n.bestHeight = uint32(bestHeight) + + n.wg.Add(1) + go n.notificationDispatcher() + + return nil +} diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 97618d3a..1e728d4e 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -194,7 +194,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, // // TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest? // * reduces the number of goroutines - blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn() + blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return nil, err } @@ -384,7 +384,7 @@ func (c *ChainArbitrator) Start() error { // the chain any longer, only resolve the contracts on the confirmed // commitment. for _, closeChanInfo := range closingChannels { - blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn() + blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index ab0b8f39..a8d513f7 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -21,7 +21,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { return nil, nil } -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: make(chan *chainntnfs.BlockEpoch), Cancel: func() {}, diff --git a/contractcourt/contract_resolvers.go b/contractcourt/contract_resolvers.go index b525b5df..661ee9d0 100644 --- a/contractcourt/contract_resolvers.go +++ b/contractcourt/contract_resolvers.go @@ -855,7 +855,7 @@ func (h *htlcOutgoingContestResolver) Resolve() (ContractResolver, error) { // If we reach this point, then we can't fully act yet, so we'll await // either of our signals triggering: the HTLC expires, or we learn of // the preimage. - blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn() + blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return nil, err } @@ -1043,7 +1043,7 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) { // ensure the preimage can't be delivered between querying and // registering for the preimage subscription. preimageSubscription := h.PreimageDB.SubscribeUpdates() - blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn() + blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return nil, err } diff --git a/discovery/gossiper.go b/discovery/gossiper.go index f9166683..28e09f25 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -408,7 +408,7 @@ func (d *AuthenticatedGossiper) Start() error { // First we register for new notifications of newly discovered blocks. // We do this immediately so we'll later be able to consume any/all // blocks which were discovered. - blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn() + blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 44edf187..dc0ad4f7 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -280,7 +280,8 @@ func (m *mockNotifier) notifyBlock(hash chainhash.Hash, height uint32) { } } -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { m.RLock() defer m.RUnlock() diff --git a/fundingmanager.go b/fundingmanager.go index a1180750..3fcce0a3 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1705,7 +1705,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel, confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) { - epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn() + epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { fndgLog.Errorf("unable to register for epoch notification: %v", err) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 6038aaff..224e1e6e 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -112,7 +112,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, }, nil } -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: m.epochChan, Cancel: func() {}, diff --git a/htlcswitch/decayedlog.go b/htlcswitch/decayedlog.go index 93eb0e51..d6845002 100644 --- a/htlcswitch/decayedlog.go +++ b/htlcswitch/decayedlog.go @@ -103,7 +103,7 @@ func (d *DecayedLog) Start() error { // Start garbage collector. if d.notifier != nil { - epochClient, err := d.notifier.RegisterBlockEpochNtfn() + epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil) if err != nil { return fmt.Errorf("Unable to register for epoch "+ "notifications: %v", err) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 00795c51..64f843fc 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -790,7 +790,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs uint32, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { return nil, nil } -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: m.epochChan, Cancel: func() {}, diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index e2ee14b0..8f19cfb3 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1596,7 +1596,7 @@ func (s *Switch) Start() error { log.Infof("Starting HTLC Switch") - blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn() + blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } diff --git a/mock.go b/mock.go index f41bf532..b2d4d89a 100644 --- a/mock.go +++ b/mock.go @@ -92,7 +92,8 @@ func (m *mockNotfier) RegisterConfirmationsNtfn(txid *chainhash.Hash, Confirmed: m.confChannel, }, nil } -func (m *mockNotfier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +func (m *mockNotfier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: make(chan *chainntnfs.BlockEpoch), Cancel: func() {}, diff --git a/utxonursery.go b/utxonursery.go index eba9de75..f3bf362e 100644 --- a/utxonursery.go +++ b/utxonursery.go @@ -257,7 +257,7 @@ func (u *utxoNursery) Start() error { // connected block. We register immediately on startup to ensure that // no blocks are missed while we are handling blocks that were missed // during the time the UTXO nursery was unavailable. - newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn() + newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err }