Merge pull request #1439 from valentinewallace/historical-block-epoch-ntfns

Historical block epoch ntfns
This commit is contained in:
Olaoluwa Osuntokun 2018-08-13 21:18:36 -07:00 committed by GitHub
commit 5efc1229bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1429 additions and 224 deletions

@ -74,6 +74,8 @@ type BitcoindNotifier struct {
blockEpochClients map[uint64]*blockEpochRegistration blockEpochClients map[uint64]*blockEpochRegistration
bestBlock chainntnfs.BlockEpoch
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
} }
@ -119,7 +121,7 @@ func (b *BitcoindNotifier) Start() error {
return err return err
} }
_, currentHeight, err := b.chainConn.GetBestBlock() currentHash, currentHeight, err := b.chainConn.GetBestBlock()
if err != nil { if err != nil {
return err return err
} }
@ -127,8 +129,13 @@ func (b *BitcoindNotifier) Start() error {
b.txConfNotifier = chainntnfs.NewTxConfNotifier( b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit) uint32(currentHeight), reorgSafetyLimit)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
Hash: currentHash,
}
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher(currentHeight) go b.notificationDispatcher()
return nil return nil
} }
@ -174,7 +181,7 @@ type blockNtfn struct {
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches. // notification registrations, as well as notification dispatches.
func (b *BitcoindNotifier) notificationDispatcher(bestHeight int32) { func (b *BitcoindNotifier) notificationDispatcher() {
out: out:
for { for {
select { select {
@ -235,7 +242,7 @@ out:
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations) msg.TxID, msg.NumConfirmations)
currentHeight := uint32(bestHeight) currentHeight := uint32(b.bestBlock.Height)
// Look up whether the transaction is already // Look up whether the transaction is already
// included in the active chain. We'll do this // included in the active chain. We'll do this
@ -268,63 +275,101 @@ out:
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
if msg.bestBlock != nil {
missedBlocks, err :=
chainntnfs.GetClientMissedBlocks(
b.chainConn, msg.bestBlock,
b.bestBlock.Height, true,
)
if err != nil {
msg.errorChan <- err
continue
}
for _, block := range missedBlocks {
b.notifyBlockEpochClient(msg,
block.Height, block.Hash)
}
}
msg.errorChan <- nil
case chain.RelevantTx: case chain.RelevantTx:
b.handleRelevantTx(msg, bestHeight) b.handleRelevantTx(msg, b.bestBlock.Height)
} }
case ntfn := <-b.chainConn.Notifications(): case ntfn := <-b.chainConn.Notifications():
switch item := ntfn.(type) { switch item := ntfn.(type) {
case chain.BlockConnected: case chain.BlockConnected:
if item.Height != bestHeight+1 { blockHeader, err :=
chainntnfs.Log.Warnf("Received blocks out of order: "+ b.chainConn.GetBlockHeader(&item.Hash)
"current height=%d, new height=%d",
bestHeight, item.Height)
continue
}
bestHeight = item.Height
rawBlock, err := b.chainConn.GetBlock(&item.Hash)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err) chainntnfs.Log.Errorf("Unable to fetch "+
"block header: %v", err)
continue continue
} }
chainntnfs.Log.Infof("New block: height=%v, sha=%v", if blockHeader.PrevBlock != *b.bestBlock.Hash {
item.Height, item.Hash) // Handle the case where the notifier
// missed some blocks from its chain
// backend.
chainntnfs.Log.Infof("Missed blocks, " +
"attempting to catch up")
newBestBlock, missedBlocks, err :=
chainntnfs.HandleMissedBlocks(
b.chainConn,
b.txConfNotifier,
b.bestBlock, item.Height,
true,
)
b.notifyBlockEpochs(item.Height, &item.Hash) if err != nil {
// Set the bestBlock here in case
// a catch up partially completed.
b.bestBlock = newBestBlock
chainntnfs.Log.Error(err)
continue
}
txns := btcutil.NewBlock(rawBlock).Transactions() for _, block := range missedBlocks {
err = b.txConfNotifier.ConnectTip(&item.Hash, err := b.handleBlockConnected(block)
uint32(item.Height), txns) if err != nil {
if err != nil { chainntnfs.Log.Error(err)
continue out
}
}
}
newBlock := chainntnfs.BlockEpoch{
Height: item.Height,
Hash: &item.Hash,
}
if err := b.handleBlockConnected(newBlock); err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
continue continue
case chain.BlockDisconnected: case chain.BlockDisconnected:
if item.Height != bestHeight { if item.Height != b.bestBlock.Height {
chainntnfs.Log.Warnf("Received blocks "+ chainntnfs.Log.Infof("Missed disconnected" +
"out of order: current height="+ "blocks, attempting to catch up")
"%d, disconnected height=%d",
bestHeight, item.Height)
continue
} }
bestHeight = item.Height - 1
chainntnfs.Log.Infof("Block disconnected from "+ newBestBlock, err := chainntnfs.RewindChain(
"main chain: height=%v, sha=%v", b.chainConn, b.txConfNotifier,
item.Height, item.Hash) b.bestBlock, item.Height-1,
)
err := b.txConfNotifier.DisconnectTip(
uint32(item.Height))
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Errorf("Unable to rewind chain "+
"from height %d to height %d: %v",
b.bestBlock.Height, item.Height-1, err)
} }
// Set the bestBlock here in case a chain
// rewind partially completed.
b.bestBlock = newBestBlock
case chain.RelevantTx: case chain.RelevantTx:
b.handleRelevantTx(item, bestHeight) b.handleRelevantTx(item, b.bestBlock.Height)
} }
case <-b.quit: case <-b.quit:
@ -517,23 +562,57 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash,
return nil, nil return nil, nil
} }
// handleBlockConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications
// now or after numConfirmations confs.
func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) error {
rawBlock, err := b.chainConn.GetBlock(block.Hash)
if err != nil {
return fmt.Errorf("unable to get block: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
block.Height, block.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
err = b.txConfNotifier.ConnectTip(
block.Hash, uint32(block.Height), txns)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
// We want to set the best block before dispatching notifications so
// if any subscribers make queries based on their received block epoch,
// our state is fully updated in time.
b.bestBlock = block
b.notifyBlockEpochs(block.Height, block.Hash)
return nil
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
for _, client := range b.blockEpochClients {
b.notifyBlockEpochClient(client, newHeight, newSha)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (b *BitcoindNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
epoch := &chainntnfs.BlockEpoch{ epoch := &chainntnfs.BlockEpoch{
Height: newHeight, Height: height,
Hash: newSha, Hash: sha,
} }
for _, epochClient := range b.blockEpochClients { select {
select { case epochClient.epochQueue.ChanIn() <- epoch:
case <-epochClient.cancelChan:
case epochClient.epochQueue.ChanIn() <- epoch: case <-b.quit:
case <-epochClient.cancelChan:
case <-b.quit:
}
} }
} }
@ -805,6 +884,10 @@ type blockEpochRegistration struct {
epochQueue *chainntnfs.ConcurrentQueue epochQueue *chainntnfs.ConcurrentQueue
bestBlock *chainntnfs.BlockEpoch
errorChan chan error
cancelChan chan struct{} cancelChan chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -818,13 +901,18 @@ type epochCancel struct {
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
// caller to receive notifications, of each new block connected to the main // caller to receive notifications, of each new block connected to the main
// chain. // chain. Clients have the option of passing in their best known block, which
func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { // the notifier uses to check if they are behind on blocks and catch them up.
func (b *BitcoindNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
reg := &blockEpochRegistration{ reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20), epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20), epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}), cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1), epochID: atomic.AddUint64(&b.epochClientCounter, 1),
bestBlock: bestBlock,
errorChan: make(chan error, 1),
} }
reg.epochQueue.Start() reg.epochQueue.Start()

@ -0,0 +1,77 @@
package bitcoindnotify
import (
"fmt"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcwallet/chain"
"github.com/lightningnetwork/lnd/chainntnfs"
)
// UnsafeStart starts the notifier with a specified best height and optional
// best hash. Its bestBlock and txConfNotifier are initialized with
// bestHeight and optionally bestHash. The parameter generateBlocks is
// necessary for the bitcoind notifier to ensure we drain all notifications up
// to syncHeight, since if they are generated ahead of UnsafeStart the chainConn
// may start up with an outdated best block and miss sending ntfns. Used for
// testing.
func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash,
syncHeight int32, generateBlocks func() error) error {
// Connect to bitcoind, and register for notifications on connected,
// and disconnected blocks.
if err := b.chainConn.Start(); err != nil {
return err
}
if err := b.chainConn.NotifyBlocks(); err != nil {
return err
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
if generateBlocks != nil {
// Ensure no block notifications are pending when we start the
// notification dispatcher goroutine.
// First generate the blocks, then drain the notifications
// for the generated blocks.
if err := generateBlocks(); err != nil {
return err
}
timeout := time.After(60 * time.Second)
loop:
for {
select {
case ntfn := <-b.chainConn.Notifications():
switch update := ntfn.(type) {
case chain.BlockConnected:
if update.Height >= syncHeight {
break loop
}
}
case <-timeout:
return fmt.Errorf("unable to catch up to height %d",
syncHeight)
}
}
}
// Run notificationDispatcher after setting the notifier's best block
// to avoid a race condition.
b.bestBlock = chainntnfs.BlockEpoch{Height: bestHeight, Hash: bestHash}
if bestHash == nil {
hash, err := b.chainConn.GetBlockHash(int64(bestHeight))
if err != nil {
return err
}
b.bestBlock.Hash = hash
}
b.wg.Add(1)
go b.notificationDispatcher()
return nil
}

@ -16,7 +16,6 @@ import (
) )
const ( const (
// notifierType uniquely identifies this concrete implementation of the // notifierType uniquely identifies this concrete implementation of the
// ChainNotifier interface. // ChainNotifier interface.
notifierType = "btcd" notifierType = "btcd"
@ -79,6 +78,8 @@ type BtcdNotifier struct {
blockEpochClients map[uint64]*blockEpochRegistration blockEpochClients map[uint64]*blockEpochRegistration
bestBlock chainntnfs.BlockEpoch
chainUpdates *chainntnfs.ConcurrentQueue chainUpdates *chainntnfs.ConcurrentQueue
txUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue
@ -143,7 +144,7 @@ func (b *BtcdNotifier) Start() error {
return err return err
} }
_, currentHeight, err := b.chainConn.GetBestBlock() currentHash, currentHeight, err := b.chainConn.GetBestBlock()
if err != nil { if err != nil {
return err return err
} }
@ -151,11 +152,16 @@ func (b *BtcdNotifier) Start() error {
b.txConfNotifier = chainntnfs.NewTxConfNotifier( b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit) uint32(currentHeight), reorgSafetyLimit)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
Hash: currentHash,
}
b.chainUpdates.Start() b.chainUpdates.Start()
b.txUpdates.Start() b.txUpdates.Start()
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher(currentHeight) go b.notificationDispatcher()
return nil return nil
} }
@ -245,7 +251,7 @@ func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetai
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches. // notification registrations, as well as notification dispatches.
func (b *BtcdNotifier) notificationDispatcher(currentHeight int32) { func (b *BtcdNotifier) notificationDispatcher() {
out: out:
for { for {
select { select {
@ -305,7 +311,7 @@ out:
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations) msg.TxID, msg.NumConfirmations)
bestHeight := uint32(currentHeight) bestHeight := uint32(b.bestBlock.Height)
// Look up whether the transaction is already // Look up whether the transaction is already
// included in the active chain. We'll do this // included in the active chain. We'll do this
@ -338,60 +344,96 @@ out:
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
if msg.bestBlock != nil {
missedBlocks, err :=
chainntnfs.GetClientMissedBlocks(
b.chainConn, msg.bestBlock,
b.bestBlock.Height, true,
)
if err != nil {
msg.errorChan <- err
continue
}
for _, block := range missedBlocks {
b.notifyBlockEpochClient(msg,
block.Height, block.Hash)
}
}
msg.errorChan <- nil
} }
case item := <-b.chainUpdates.ChanOut(): case item := <-b.chainUpdates.ChanOut():
update := item.(*chainUpdate) update := item.(*chainUpdate)
if update.connect { if update.connect {
if update.blockHeight != currentHeight+1 { blockHeader, err :=
chainntnfs.Log.Warnf("Received blocks out of order: "+ b.chainConn.GetBlockHeader(update.blockHash)
"current height=%d, new height=%d",
currentHeight, update.blockHeight)
continue
}
currentHeight = update.blockHeight
rawBlock, err := b.chainConn.GetBlock(update.blockHash)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err) chainntnfs.Log.Errorf("Unable to fetch "+
"block header: %v", err)
continue continue
} }
chainntnfs.Log.Infof("New block: height=%v, sha=%v", if blockHeader.PrevBlock != *b.bestBlock.Hash {
update.blockHeight, update.blockHash) // Handle the case where the notifier
// missed some blocks from its chain
// backend
chainntnfs.Log.Infof("Missed blocks, " +
"attempting to catch up")
newBestBlock, missedBlocks, err :=
chainntnfs.HandleMissedBlocks(
b.chainConn,
b.txConfNotifier,
b.bestBlock,
update.blockHeight,
true,
)
if err != nil {
// Set the bestBlock here in case
// a catch up partially completed.
b.bestBlock = newBestBlock
chainntnfs.Log.Error(err)
continue
}
txns := btcutil.NewBlock(rawBlock).Transactions() for _, block := range missedBlocks {
err := b.handleBlockConnected(block)
block := &filteredBlock{ if err != nil {
hash: *update.blockHash, chainntnfs.Log.Error(err)
height: uint32(update.blockHeight), continue out
txns: txns, }
connect: true, }
} }
if err := b.handleBlockConnected(block); err != nil {
newBlock := chainntnfs.BlockEpoch{
Height: update.blockHeight,
Hash: update.blockHash,
}
if err := b.handleBlockConnected(newBlock); err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
continue continue
} }
if update.blockHeight != currentHeight { if update.blockHeight != b.bestBlock.Height {
chainntnfs.Log.Warnf("Received blocks out of order: "+ chainntnfs.Log.Infof("Missed disconnected" +
"current height=%d, disconnected height=%d", "blocks, attempting to catch up")
currentHeight, update.blockHeight)
continue
} }
currentHeight = update.blockHeight - 1 newBestBlock, err := chainntnfs.RewindChain(
b.chainConn, b.txConfNotifier, b.bestBlock,
chainntnfs.Log.Infof("Block disconnected from main chain: "+ update.blockHeight-1,
"height=%v, sha=%v", update.blockHeight, update.blockHash) )
err := b.txConfNotifier.DisconnectTip(uint32(update.blockHeight))
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Errorf("Unable to rewind chain "+
"from height %d to height %d: %v",
b.bestBlock.Height, update.blockHeight-1, err)
} }
// Set the bestBlock here in case a chain rewind
// partially completed.
b.bestBlock = newBestBlock
// NOTE: we currently only use txUpdates for mempool spends and // NOTE: we currently only use txUpdates for mempool spends and
// rescan spends. It might get removed entirely in the future. // rescan spends. It might get removed entirely in the future.
case item := <-b.txUpdates.ChanOut(): case item := <-b.txUpdates.ChanOut():
@ -590,17 +632,47 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash,
return nil, nil return nil, nil
} }
// handleBlocksConnected applies a chain update for a new block. Any watched // handleBlockConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications // transactions included this block will processed to either send notifications
// now or after numConfirmations confs. // now or after numConfirmations confs.
// TODO(halseth): this is reusing the neutrino notifier implementation, unify // TODO(halseth): this is reusing the neutrino notifier implementation, unify
// them. // them.
func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
// First we'll notify any subscribed clients of the block. // First process the block for our internal state. A new block has
// been connected to the main chain. Send out any N confirmation
// notifications which may have been triggered by this new block.
rawBlock, err := b.chainConn.GetBlock(epoch.Hash)
if err != nil {
return fmt.Errorf("unable to get block: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
epoch.Height, epoch.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
newBlock := &filteredBlock{
hash: *epoch.Hash,
height: uint32(epoch.Height),
txns: txns,
connect: true,
}
err = b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height,
newBlock.txns)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
// We want to set the best block before dispatching notifications
// so if any subscribers make queries based on their received
// block epoch, our state is fully updated in time.
b.bestBlock = epoch
// Next we'll notify any subscribed clients of the block.
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Next, we'll scan over the list of relevant transactions and possibly // Finally, we'll scan over the list of relevant transactions and
// dispatch notifications for confirmations and spends. // possibly dispatch notifications for confirmations and spends.
for _, tx := range newBlock.txns { for _, tx := range newBlock.txns {
mtx := tx.MsgTx() mtx := tx.MsgTx()
txSha := mtx.TxHash() txSha := mtx.TxHash()
@ -608,9 +680,10 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
for i, txIn := range mtx.TxIn { for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an output which we have a // If this transaction indeed does spend an output which
// registered notification for, then create a spend summary, finally // we have a registered notification for, then create a
// sending off the details to the notification subscriber. // spend summary, finally sending off the details to the
// notification subscriber.
clients, ok := b.spendNotifications[prevOut] clients, ok := b.spendNotifications[prevOut]
if !ok { if !ok {
continue continue
@ -629,9 +702,10 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
"outpoint=%v", ntfn.targetOutpoint) "outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not // Close spendChan to ensure that any calls to
// block. This is safe to do since the channel is buffered, and // Cancel will not block. This is safe to do
// the message can still be read by the receiver. // since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan) close(ntfn.spendChan)
} }
@ -639,31 +713,31 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
} }
} }
// A new block has been connected to the main chain.
// Send out any N confirmation notifications which may
// have been triggered by this new block.
b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns)
return nil return nil
} }
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
for _, client := range b.blockEpochClients {
b.notifyBlockEpochClient(client, newHeight, newSha)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
epoch := &chainntnfs.BlockEpoch{ epoch := &chainntnfs.BlockEpoch{
Height: newHeight, Height: height,
Hash: newSha, Hash: sha,
} }
for _, epochClient := range b.blockEpochClients { select {
select { case epochClient.epochQueue.ChanIn() <- epoch:
case <-epochClient.cancelChan:
case epochClient.epochQueue.ChanIn() <- epoch: case <-b.quit:
case <-epochClient.cancelChan:
case <-b.quit:
}
} }
} }
@ -857,6 +931,10 @@ type blockEpochRegistration struct {
epochQueue *chainntnfs.ConcurrentQueue epochQueue *chainntnfs.ConcurrentQueue
bestBlock *chainntnfs.BlockEpoch
errorChan chan error
cancelChan chan struct{} cancelChan chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -870,13 +948,18 @@ type epochCancel struct {
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
// caller to receive notifications, of each new block connected to the main // caller to receive notifications, of each new block connected to the main
// chain. // chain. Clients have the option of passing in their best known block, which
func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { // the notifier uses to check if they are behind on blocks and catch them up.
func (b *BtcdNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
reg := &blockEpochRegistration{ reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20), epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20), epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}), cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1), epochID: atomic.AddUint64(&b.epochClientCounter, 1),
bestBlock: bestBlock,
errorChan: make(chan error, 1),
} }
reg.epochQueue.Start() reg.epochQueue.Start()

@ -0,0 +1,77 @@
package btcdnotify
import (
"fmt"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/chainntnfs"
)
// UnsafeStart starts the notifier with a specified best height and optional
// best hash. Its bestBlock and txConfNotifier are initialized with
// bestHeight and optionally bestHash. The parameter generateBlocks is
// necessary for the bitcoind notifier to ensure we drain all notifications up
// to syncHeight, since if they are generated ahead of UnsafeStart the chainConn
// may start up with an outdated best block and miss sending ntfns. Used for
// testing.
func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash,
syncHeight int32, generateBlocks func() error) error {
// Connect to btcd, and register for notifications on connected, and
// disconnected blocks.
if err := b.chainConn.Connect(20); err != nil {
return err
}
if err := b.chainConn.NotifyBlocks(); err != nil {
return err
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
b.chainUpdates.Start()
b.txUpdates.Start()
if generateBlocks != nil {
// Ensure no block notifications are pending when we start the
// notification dispatcher goroutine.
// First generate the blocks, then drain the notifications
// for the generated blocks.
if err := generateBlocks(); err != nil {
return err
}
timeout := time.After(60 * time.Second)
loop:
for {
select {
case ntfn := <-b.chainUpdates.ChanOut():
lastReceivedNtfn := ntfn.(*chainUpdate)
if lastReceivedNtfn.blockHeight >= syncHeight {
break loop
}
case <-timeout:
return fmt.Errorf("unable to catch up to height %d",
syncHeight)
}
}
}
// Run notificationDispatcher after setting the notifier's best block
// to avoid a race condition.
b.bestBlock = chainntnfs.BlockEpoch{Height: bestHeight, Hash: bestHash}
if bestHash == nil {
hash, err := b.chainConn.GetBlockHash(int64(bestHeight))
if err != nil {
return err
}
b.bestBlock.Hash = hash
}
b.wg.Add(1)
go b.notificationDispatcher()
return nil
}

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
) )
@ -59,7 +60,12 @@ type ChainNotifier interface {
// new block connected to the tip of the main chain. The returned // new block connected to the tip of the main chain. The returned
// BlockEpochEvent struct contains a channel which will be sent upon // BlockEpochEvent struct contains a channel which will be sent upon
// for each new block discovered. // for each new block discovered.
RegisterBlockEpochNtfn() (*BlockEpochEvent, error) //
// Clients have the option of passing in their best known block.
// If they specify a block, the ChainNotifier checks whether the client
// is behind on blocks. If they are, the ChainNotifier sends a backlog
// of block notifications for the missed blocks.
RegisterBlockEpochNtfn(*BlockEpoch) (*BlockEpochEvent, error)
// Start the ChainNotifier. Once started, the implementation should be // Start the ChainNotifier. Once started, the implementation should be
// ready, and able to receive notification registrations from clients. // ready, and able to receive notification registrations from clients.
@ -248,3 +254,206 @@ func SupportedNotifiers() []string {
return supportedNotifiers return supportedNotifiers
} }
// ChainConn enables notifiers to pass in their chain backend to interface
// functions that require it.
type ChainConn interface {
// GetBlockHeader returns the block header for a hash.
GetBlockHeader(blockHash *chainhash.Hash) (*wire.BlockHeader, error)
// GetBlockHeaderVerbose returns the verbose block header for a hash.
GetBlockHeaderVerbose(blockHash *chainhash.Hash) (
*btcjson.GetBlockHeaderVerboseResult, error)
// GetBlockHash returns the hash from a block height.
GetBlockHash(blockHeight int64) (*chainhash.Hash, error)
}
// GetCommonBlockAncestorHeight takes in:
// (1) the hash of a block that has been reorged out of the main chain
// (2) the hash of the block of the same height from the main chain
// It returns the height of the nearest common ancestor between the two hashes,
// or an error
func GetCommonBlockAncestorHeight(chainConn ChainConn, reorgHash,
chainHash chainhash.Hash) (int32, error) {
for reorgHash != chainHash {
reorgHeader, err := chainConn.GetBlockHeader(&reorgHash)
if err != nil {
return 0, fmt.Errorf("unable to get header for hash=%v: %v",
reorgHash, err)
}
chainHeader, err := chainConn.GetBlockHeader(&chainHash)
if err != nil {
return 0, fmt.Errorf("unable to get header for hash=%v: %v",
chainHash, err)
}
reorgHash = reorgHeader.PrevBlock
chainHash = chainHeader.PrevBlock
}
verboseHeader, err := chainConn.GetBlockHeaderVerbose(&chainHash)
if err != nil {
return 0, fmt.Errorf("unable to get verbose header for hash=%v: %v",
chainHash, err)
}
return verboseHeader.Height, nil
}
// GetClientMissedBlocks uses a client's best block to determine what blocks
// it missed being notified about, and returns them in a slice. Its
// backendStoresReorgs parameter tells it whether or not the notifier's
// chainConn stores information about blocks that have been reorged out of the
// chain, which allows GetClientMissedBlocks to find out whether the client's
// best block has been reorged out of the chain, rewind to the common ancestor
// and return blocks starting right after the common ancestor.
func GetClientMissedBlocks(chainConn ChainConn, clientBestBlock *BlockEpoch,
notifierBestHeight int32, backendStoresReorgs bool) ([]BlockEpoch, error) {
startingHeight := clientBestBlock.Height
if backendStoresReorgs {
// If a reorg causes the client's best hash to be incorrect,
// retrieve the closest common ancestor and dispatch
// notifications from there.
hashAtBestHeight, err := chainConn.GetBlockHash(
int64(clientBestBlock.Height))
if err != nil {
return nil, fmt.Errorf("unable to find blockhash for "+
"height=%d: %v", clientBestBlock.Height, err)
}
startingHeight, err = GetCommonBlockAncestorHeight(
chainConn, *clientBestBlock.Hash, *hashAtBestHeight,
)
if err != nil {
return nil, fmt.Errorf("unable to find common ancestor: "+
"%v", err)
}
}
// We want to start dispatching historical notifications from the block
// right after the client's best block, to avoid a redundant notification.
missedBlocks, err := getMissedBlocks(
chainConn, startingHeight+1, notifierBestHeight+1,
)
if err != nil {
return nil, fmt.Errorf("unable to get missed blocks: %v", err)
}
return missedBlocks, nil
}
// RewindChain handles internal state updates for the notifier's TxConfNotifier
// It has no effect if given a height greater than or equal to our current best
// known height. It returns the new best block for the notifier.
func RewindChain(chainConn ChainConn, txConfNotifier *TxConfNotifier,
currBestBlock BlockEpoch, targetHeight int32) (BlockEpoch, error) {
newBestBlock := BlockEpoch{
Height: currBestBlock.Height,
Hash: currBestBlock.Hash,
}
for height := currBestBlock.Height; height > targetHeight; height-- {
hash, err := chainConn.GetBlockHash(int64(height - 1))
if err != nil {
return newBestBlock, fmt.Errorf("unable to "+
"find blockhash for disconnected height=%d: %v",
height, err)
}
Log.Infof("Block disconnected from main chain: "+
"height=%v, sha=%v", height, newBestBlock.Hash)
err = txConfNotifier.DisconnectTip(uint32(height))
if err != nil {
return newBestBlock, fmt.Errorf("unable to "+
" disconnect tip for height=%d: %v",
height, err)
}
newBestBlock.Height = height - 1
newBestBlock.Hash = hash
}
return newBestBlock, nil
}
// HandleMissedBlocks is called when the chain backend for a notifier misses a
// series of blocks, handling a reorg if necessary. Its backendStoresReorgs
// parameter tells it whether or not the notifier's chainConn stores
// information about blocks that have been reorged out of the chain, which allows
// HandleMissedBlocks to check whether the notifier's best block has been
// reorged out, and rewind the chain accordingly. It returns the best block for
// the notifier and a slice of the missed blocks. The new best block needs to be
// returned in case a chain rewind occurs and partially completes before
// erroring. In the case where there is no rewind, the notifier's
// current best block is returned.
func HandleMissedBlocks(chainConn ChainConn, txConfNotifier *TxConfNotifier,
currBestBlock BlockEpoch, newHeight int32,
backendStoresReorgs bool) (BlockEpoch, []BlockEpoch, error) {
startingHeight := currBestBlock.Height
if backendStoresReorgs {
// If a reorg causes our best hash to be incorrect, rewind the
// chain so our best block is set to the closest common
// ancestor, then dispatch notifications from there.
hashAtBestHeight, err :=
chainConn.GetBlockHash(int64(currBestBlock.Height))
if err != nil {
return currBestBlock, nil, fmt.Errorf("unable to find "+
"blockhash for height=%d: %v",
currBestBlock.Height, err)
}
startingHeight, err = GetCommonBlockAncestorHeight(
chainConn, *currBestBlock.Hash, *hashAtBestHeight,
)
if err != nil {
return currBestBlock, nil, fmt.Errorf("unable to find "+
"common ancestor: %v", err)
}
currBestBlock, err = RewindChain(chainConn, txConfNotifier,
currBestBlock, startingHeight)
if err != nil {
return currBestBlock, nil, fmt.Errorf("unable to "+
"rewind chain: %v", err)
}
}
// We want to start dispatching historical notifications from the block
// right after our best block, to avoid a redundant notification.
missedBlocks, err := getMissedBlocks(chainConn, startingHeight+1, newHeight)
if err != nil {
return currBestBlock, nil, fmt.Errorf("unable to get missed "+
"blocks: %v", err)
}
return currBestBlock, missedBlocks, nil
}
// getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight)
// fetched from the chain.
func getMissedBlocks(chainConn ChainConn, startingHeight,
endingHeight int32) ([]BlockEpoch, error) {
numMissedBlocks := endingHeight - startingHeight
if numMissedBlocks < 0 {
return nil, fmt.Errorf("starting height %d is greater than "+
"ending height %d", startingHeight, endingHeight)
}
missedBlocks := make([]BlockEpoch, 0, numMissedBlocks)
for height := startingHeight; height < endingHeight; height++ {
hash, err := chainConn.GetBlockHash(int64(height))
if err != nil {
return nil, fmt.Errorf("unable to find blockhash for "+
"height=%d: %v", height, err)
}
missedBlocks = append(missedBlocks,
BlockEpoch{Hash: hash, Height: height})
}
return missedBlocks, nil
}

@ -0,0 +1,13 @@
package chainntnfs
import "github.com/btcsuite/btcd/chaincfg/chainhash"
// TestChainNotifier enables the use of methods that are only present during
// testing for ChainNotifiers.
type TestChainNotifier interface {
ChainNotifier
// UnsafeStart enables notifiers to start up with a specific best block.
// Used for testing.
UnsafeStart(int32, *chainhash.Hash, int32, func() error) error
}

@ -29,15 +29,15 @@ import (
// Required to auto-register the bitcoind backed ChainNotifier // Required to auto-register the bitcoind backed ChainNotifier
// implementation. // implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify" "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify"
// Required to auto-register the btcd backed ChainNotifier // Required to auto-register the btcd backed ChainNotifier
// implementation. // implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify" "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
// Required to auto-register the neutrino backed ChainNotifier // Required to auto-register the neutrino backed ChainNotifier
// implementation. // implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify" "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify"
// Required to register the boltdb walletdb implementation. // Required to register the boltdb walletdb implementation.
_ "github.com/btcsuite/btcwallet/walletdb/bdb" _ "github.com/btcsuite/btcwallet/walletdb/bdb"
@ -113,7 +113,7 @@ func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error {
} }
func testSingleConfirmationNotification(miner *rpctest.Harness, func testSingleConfirmationNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// We'd like to test the case of being notified once a txid reaches // We'd like to test the case of being notified once a txid reaches
// a *single* confirmation. // a *single* confirmation.
@ -184,7 +184,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness,
} }
func testMultiConfirmationNotification(miner *rpctest.Harness, func testMultiConfirmationNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// We'd like to test the case of being notified once a txid reaches // We'd like to test the case of being notified once a txid reaches
// N confirmations, where N > 1. // N confirmations, where N > 1.
@ -232,7 +232,7 @@ func testMultiConfirmationNotification(miner *rpctest.Harness,
} }
func testBatchConfirmationNotification(miner *rpctest.Harness, func testBatchConfirmationNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// We'd like to test a case of serving notifications to multiple // We'd like to test a case of serving notifications to multiple
// clients, each requesting to be notified once a txid receives // clients, each requesting to be notified once a txid receives
@ -400,7 +400,7 @@ func checkNotificationFields(ntfn *chainntnfs.SpendDetail,
} }
func testSpendNotification(miner *rpctest.Harness, func testSpendNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// We'd like to test the spend notifications for all ChainNotifier // We'd like to test the spend notifications for all ChainNotifier
// concrete implementations. // concrete implementations.
@ -512,7 +512,7 @@ func testSpendNotification(miner *rpctest.Harness,
} }
func testBlockEpochNotification(miner *rpctest.Harness, func testBlockEpochNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// We'd like to test the case of multiple registered clients receiving // We'd like to test the case of multiple registered clients receiving
// block epoch notifications. // block epoch notifications.
@ -526,7 +526,7 @@ func testBlockEpochNotification(miner *rpctest.Harness,
// blocks we generate below. So we'll use a WaitGroup to synchronize the // blocks we generate below. So we'll use a WaitGroup to synchronize the
// test. // test.
for i := 0; i < numClients; i++ { for i := 0; i < numClients; i++ {
epochClient, err := notifier.RegisterBlockEpochNtfn() epochClient, err := notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
t.Fatalf("unable to register for epoch notification") t.Fatalf("unable to register for epoch notification")
} }
@ -560,7 +560,7 @@ func testBlockEpochNotification(miner *rpctest.Harness,
} }
func testMultiClientConfirmationNotification(miner *rpctest.Harness, func testMultiClientConfirmationNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// We'd like to test the case of a multiple clients registered to // We'd like to test the case of a multiple clients registered to
// receive a confirmation notification for the same transaction. // receive a confirmation notification for the same transaction.
@ -626,7 +626,7 @@ func testMultiClientConfirmationNotification(miner *rpctest.Harness,
// transaction that has already been included in a block. In this case, the // transaction that has already been included in a block. In this case, the
// confirmation notification should be dispatched immediately. // confirmation notification should be dispatched immediately.
func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// First, let's send some coins to "ourself", obtaining a txid. We're // First, let's send some coins to "ourself", obtaining a txid. We're
// spending from a coinbase output here, so we use the dedicated // spending from a coinbase output here, so we use the dedicated
@ -786,7 +786,7 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness,
// checking for a confirmation. This should not cause the notifier to stop // checking for a confirmation. This should not cause the notifier to stop
// working // working
func testLazyNtfnConsumer(miner *rpctest.Harness, func testLazyNtfnConsumer(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// Create a transaction to be notified about. We'll register for // Create a transaction to be notified about. We'll register for
// notifications on this transaction but won't be prompt in checking them // notifications on this transaction but won't be prompt in checking them
@ -877,7 +877,7 @@ func testLazyNtfnConsumer(miner *rpctest.Harness,
// has already been included in a block. In this case, the spend notification // has already been included in a block. In this case, the spend notification
// should be dispatched immediately. // should be dispatched immediately.
func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// We'd like to test the spend notifications for all ChainNotifier // We'd like to test the spend notifications for all ChainNotifier
// concrete implementations. // concrete implementations.
@ -898,7 +898,7 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
// We create an epoch client we can use to make sure the notifier is // We create an epoch client we can use to make sure the notifier is
// caught up to the mining node's chain. // caught up to the mining node's chain.
epochClient, err := notifier.RegisterBlockEpochNtfn() epochClient, err := notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
t.Fatalf("unable to register for block epoch: %v", err) t.Fatalf("unable to register for block epoch: %v", err)
} }
@ -981,7 +981,7 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
} }
func testCancelSpendNtfn(node *rpctest.Harness, func testCancelSpendNtfn(node *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) { notifier chainntnfs.TestChainNotifier, t *testing.T) {
// We'd like to test that once a spend notification is registered, it // We'd like to test that once a spend notification is registered, it
// can be cancelled before the notification is dispatched. // can be cancelled before the notification is dispatched.
@ -1072,7 +1072,7 @@ func testCancelSpendNtfn(node *rpctest.Harness,
} }
} }
func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier,
t *testing.T) { t *testing.T) {
// We'd like to ensure that once a client cancels their block epoch // We'd like to ensure that once a client cancels their block epoch
@ -1082,7 +1082,7 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie
epochClients := make([]*chainntnfs.BlockEpochEvent, numClients) epochClients := make([]*chainntnfs.BlockEpochEvent, numClients)
for i := 0; i < numClients; i++ { for i := 0; i < numClients; i++ {
epochClient, err := notifier.RegisterBlockEpochNtfn() epochClient, err := notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
t.Fatalf("unable to register for epoch notification") t.Fatalf("unable to register for epoch notification")
} }
@ -1122,7 +1122,7 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie
} }
} }
func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.TestChainNotifier,
t *testing.T) { t *testing.T) {
// Set up a new miner that we can use to cause a reorg. // Set up a new miner that we can use to cause a reorg.
@ -1274,10 +1274,369 @@ func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier,
} }
} }
// testCatchUpClientOnMissedBlocks tests the case of multiple registered client
// receiving historical block epoch notifications due to their best known block
// being out of date.
func testCatchUpClientOnMissedBlocks(miner *rpctest.Harness,
notifier chainntnfs.TestChainNotifier, t *testing.T) {
const numBlocks = 10
const numClients = 5
var wg sync.WaitGroup
outdatedHash, outdatedHeight, err := miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to retrieve current height: %v", err)
}
// This function is used by UnsafeStart to ensure all notifications
// are fully drained before clients register for notifications.
generateBlocks := func() error {
_, err = miner.Node.Generate(numBlocks)
return err
}
// We want to ensure that when a client registers for block notifications,
// the notifier's best block is at the tip of the chain. If it isn't, the
// client may not receive all historical notifications.
bestHeight := outdatedHeight + numBlocks
if err := notifier.UnsafeStart(
bestHeight, nil, bestHeight, generateBlocks); err != nil {
t.Fatalf("Unable to unsafe start the notifier: %v", err)
}
// Create numClients clients whose best known block is 10 blocks behind
// the tip of the chain. We expect each client to receive numBlocks
// notifications, 1 for each block they're behind.
clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients)
outdatedBlock := &chainntnfs.BlockEpoch{
Height: outdatedHeight, Hash: outdatedHash,
}
for i := 0; i < numClients; i++ {
epochClient, err := notifier.RegisterBlockEpochNtfn(outdatedBlock)
if err != nil {
t.Fatalf("unable to register for epoch notification: %v", err)
}
clients = append(clients, epochClient)
}
for expectedHeight := outdatedHeight + 1; expectedHeight <=
bestHeight; expectedHeight++ {
for _, epochClient := range clients {
select {
case block := <-epochClient.Epochs:
if block.Height != expectedHeight {
t.Fatalf("received block of height: %d, "+
"expected: %d", block.Height,
expectedHeight)
}
case <-time.After(20 * time.Second):
t.Fatalf("did not receive historical notification "+
"for height %d", expectedHeight)
}
}
}
// Finally, ensure that an extra block notification wasn't received.
anyExtras := make(chan struct{}, len(clients))
for _, epochClient := range clients {
wg.Add(1)
go func(epochClient *chainntnfs.BlockEpochEvent) {
defer wg.Done()
select {
case <-epochClient.Epochs:
anyExtras <- struct{}{}
case <-time.After(5 * time.Second):
}
}(epochClient)
}
wg.Wait()
close(anyExtras)
var extraCount int
for range anyExtras {
extraCount++
}
if extraCount > 0 {
t.Fatalf("received %d unexpected block notification", extraCount)
}
}
// testCatchUpOnMissedBlocks the case of multiple registered clients receiving
// historical block epoch notifications due to the notifier's best known block
// being out of date.
func testCatchUpOnMissedBlocks(miner *rpctest.Harness,
notifier chainntnfs.TestChainNotifier, t *testing.T) {
const numBlocks = 10
const numClients = 5
var wg sync.WaitGroup
_, bestHeight, err := miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current blockheight %v", err)
}
// This function is used by UnsafeStart to ensure all notifications
// are fully drained before clients register for notifications.
generateBlocks := func() error {
_, err = miner.Node.Generate(numBlocks)
return err
}
// Next, start the notifier with outdated best block information.
if err := notifier.UnsafeStart(bestHeight,
nil, bestHeight+numBlocks, generateBlocks); err != nil {
t.Fatalf("Unable to unsafe start the notifier: %v", err)
}
// Create numClients clients who will listen for block notifications.
clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients)
for i := 0; i < numClients; i++ {
epochClient, err := notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
t.Fatalf("unable to register for epoch notification: %v", err)
}
clients = append(clients, epochClient)
}
// Generate a single block to trigger the backlog of historical
// notifications for the previously mined blocks.
if _, err := miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate blocks: %v", err)
}
// We expect each client to receive numBlocks + 1 notifications, 1 for
// each block that the notifier has missed out on.
for expectedHeight := bestHeight + 1; expectedHeight <=
bestHeight+numBlocks+1; expectedHeight++ {
for _, epochClient := range clients {
select {
case block := <-epochClient.Epochs:
if block.Height != expectedHeight {
t.Fatalf("received block of height: %d, "+
"expected: %d", block.Height,
expectedHeight)
}
case <-time.After(20 * time.Second):
t.Fatalf("did not receive historical notification "+
"for height %d", expectedHeight)
}
}
}
// Finally, ensure that an extra block notification wasn't received.
anyExtras := make(chan struct{}, len(clients))
for _, epochClient := range clients {
wg.Add(1)
go func(epochClient *chainntnfs.BlockEpochEvent) {
defer wg.Done()
select {
case <-epochClient.Epochs:
anyExtras <- struct{}{}
case <-time.After(5 * time.Second):
}
}(epochClient)
}
wg.Wait()
close(anyExtras)
var extraCount int
for range anyExtras {
extraCount++
}
if extraCount > 0 {
t.Fatalf("received %d unexpected block notification", extraCount)
}
}
// testCatchUpOnMissedBlocks tests that a client will still receive all valid
// block notifications in the case where a notifier's best block has been reorged
// out of the chain.
func testCatchUpOnMissedBlocksWithReorg(miner1 *rpctest.Harness,
notifier chainntnfs.TestChainNotifier, t *testing.T) {
const numBlocks = 10
const numClients = 5
var wg sync.WaitGroup
// Set up a new miner that we can use to cause a reorg.
miner2, err := rpctest.New(netParams, nil, nil)
if err != nil {
t.Fatalf("unable to create mining node: %v", err)
}
if err := miner2.SetUp(false, 0); err != nil {
t.Fatalf("unable to set up mining node: %v", err)
}
defer miner2.TearDown()
// We start by connecting the new miner to our original miner,
// such that it will sync to our original chain.
if err := rpctest.ConnectNode(miner1, miner2); err != nil {
t.Fatalf("unable to connect harnesses: %v", err)
}
nodeSlice := []*rpctest.Harness{miner1, miner2}
if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil {
t.Fatalf("unable to join node on blocks: %v", err)
}
// The two should be on the same blockheight.
_, nodeHeight1, err := miner1.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current blockheight %v", err)
}
_, nodeHeight2, err := miner2.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current blockheight %v", err)
}
if nodeHeight1 != nodeHeight2 {
t.Fatalf("expected both miners to be on the same height: %v vs %v",
nodeHeight1, nodeHeight2)
}
// We disconnect the two nodes, such that we can start mining on them
// individually without the other one learning about the new blocks.
err = miner1.Node.AddNode(miner2.P2PAddress(), rpcclient.ANRemove)
if err != nil {
t.Fatalf("unable to remove node: %v", err)
}
// Now mine on each chain separately
blocks, err := miner1.Node.Generate(numBlocks)
if err != nil {
t.Fatalf("unable to generate single block: %v", err)
}
// We generate an extra block on miner 2's chain to ensure it is the
// longer chain.
_, err = miner2.Node.Generate(numBlocks + 1)
if err != nil {
t.Fatalf("unable to generate single block: %v", err)
}
// Sync the two chains to ensure they will sync to miner2's chain.
if err := rpctest.ConnectNode(miner1, miner2); err != nil {
t.Fatalf("unable to connect harnesses: %v", err)
}
nodeSlice = []*rpctest.Harness{miner1, miner2}
if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil {
t.Fatalf("unable to join node on blocks: %v", err)
}
// Next, start the notifier with outdated best block information.
// We set the notifier's best block to be the last block mined on the
// shorter chain, to test that the notifier correctly rewinds to
// the common ancestor between the two chains.
syncHeight := nodeHeight1 + numBlocks + 1
if err := notifier.UnsafeStart(nodeHeight1+numBlocks,
blocks[numBlocks-1], syncHeight, nil); err != nil {
t.Fatalf("Unable to unsafe start the notifier: %v", err)
}
// Create numClients clients who will listen for block notifications.
clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients)
for i := 0; i < numClients; i++ {
epochClient, err := notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
t.Fatalf("unable to register for epoch notification: %v", err)
}
clients = append(clients, epochClient)
}
// Generate a single block, which should trigger the notifier to rewind
// to the common ancestor and dispatch notifications from there.
_, err = miner2.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate single block: %v", err)
}
// If the chain backend to the notifier stores information about reorged
// blocks, the notifier is able to rewind the chain to the common
// ancestor between the chain tip and its outdated best known block.
// In this case, the client is expected to receive numBlocks + 2
// notifications, 1 for each block the notifier has missed out on from
// the longer chain.
//
// If the chain backend does not store information about reorged blocks,
// the notifier has no way of knowing where to rewind to and therefore
// the client is only expected to receive notifications for blocks
// whose height is greater than the notifier's best known height: 2
// notifications, in this case.
var startingHeight int32
switch notifier.(type) {
case *neutrinonotify.NeutrinoNotifier:
startingHeight = nodeHeight1 + numBlocks + 1
default:
startingHeight = nodeHeight1 + 1
}
for expectedHeight := startingHeight; expectedHeight <=
nodeHeight1+numBlocks+2; expectedHeight++ {
for _, epochClient := range clients {
select {
case block := <-epochClient.Epochs:
if block.Height != expectedHeight {
t.Fatalf("received block of height: %d, "+
"expected: %d", block.Height,
expectedHeight)
}
case <-time.After(20 * time.Second):
t.Fatalf("did not receive historical notification "+
"for height %d", expectedHeight)
}
}
}
// Finally, ensure that an extra block notification wasn't received.
anyExtras := make(chan struct{}, len(clients))
for _, epochClient := range clients {
wg.Add(1)
go func(epochClient *chainntnfs.BlockEpochEvent) {
defer wg.Done()
select {
case <-epochClient.Epochs:
anyExtras <- struct{}{}
case <-time.After(5 * time.Second):
}
}(epochClient)
}
wg.Wait()
close(anyExtras)
var extraCount int
for range anyExtras {
extraCount++
}
if extraCount > 0 {
t.Fatalf("received %d unexpected block notification", extraCount)
}
}
type testCase struct { type testCase struct {
name string name string
test func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T) test func(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier, t *testing.T)
}
type blockCatchupTestCase struct {
name string
test func(node *rpctest.Harness, notifier chainntnfs.TestChainNotifier,
t *testing.T)
} }
var ntfnTests = []testCase{ var ntfnTests = []testCase{
@ -1331,6 +1690,21 @@ var ntfnTests = []testCase{
}, },
} }
var blockCatchupTests = []blockCatchupTestCase{
{
name: "catch up client on historical block epoch ntfns",
test: testCatchUpClientOnMissedBlocks,
},
{
name: "test catch up on missed blocks",
test: testCatchUpOnMissedBlocks,
},
{
name: "test catch up on missed blocks w/ reorged best block",
test: testCatchUpOnMissedBlocksWithReorg,
},
}
// TestInterfaces tests all registered interfaces with a unified set of tests // TestInterfaces tests all registered interfaces with a unified set of tests
// which exercise each of the required methods found within the ChainNotifier // which exercise each of the required methods found within the ChainNotifier
// interface. // interface.
@ -1361,8 +1735,10 @@ func TestInterfaces(t *testing.T) {
log.Printf("Running %v ChainNotifier interface tests\n", len(ntfnTests)) log.Printf("Running %v ChainNotifier interface tests\n", len(ntfnTests))
var ( var (
notifier chainntnfs.ChainNotifier notifier chainntnfs.TestChainNotifier
cleanUp func() cleanUp func()
newNotifier func() (chainntnfs.TestChainNotifier, error)
) )
for _, notifierDriver := range chainntnfs.RegisteredNotifiers() { for _, notifierDriver := range chainntnfs.RegisteredNotifiers() {
notifierType := notifierDriver.NotifierType notifierType := notifierDriver.NotifierType
@ -1430,18 +1806,15 @@ func TestInterfaces(t *testing.T) {
} }
cleanUp = cleanUp3 cleanUp = cleanUp3
notifier, err = notifierDriver.New(chainConn) newNotifier = func() (chainntnfs.TestChainNotifier, error) {
if err != nil { return bitcoindnotify.New(chainConn), nil
t.Fatalf("unable to create %v notifier: %v",
notifierType, err)
} }
case "btcd": case "btcd":
notifier, err = notifierDriver.New(&rpcConfig) newNotifier = func() (chainntnfs.TestChainNotifier, error) {
if err != nil { return btcdnotify.New(&rpcConfig)
t.Fatalf("unable to create %v notifier: %v",
notifierType, err)
} }
cleanUp = func() {} cleanUp = func() {}
case "neutrino": case "neutrino":
@ -1481,16 +1854,18 @@ func TestInterfaces(t *testing.T) {
for !spvNode.IsCurrent() { for !spvNode.IsCurrent() {
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
} }
newNotifier = func() (chainntnfs.TestChainNotifier, error) {
notifier, err = notifierDriver.New(spvNode) return neutrinonotify.New(spvNode)
if err != nil {
t.Fatalf("unable to create %v notifier: %v",
notifierType, err)
} }
} }
t.Logf("Running ChainNotifier interface tests for: %v", notifierType) t.Logf("Running ChainNotifier interface tests for: %v", notifierType)
notifier, err = newNotifier()
if err != nil {
t.Fatalf("unable to create %v notifier: %v", notifierType, err)
}
if err := notifier.Start(); err != nil { if err := notifier.Start(); err != nil {
t.Fatalf("unable to start notifier %v: %v", t.Fatalf("unable to start notifier %v: %v",
notifierType, err) notifierType, err)
@ -1510,6 +1885,30 @@ func TestInterfaces(t *testing.T) {
} }
notifier.Stop() notifier.Stop()
// Run catchup tests separately since they require
// restarting the notifier every time.
for _, blockCatchupTest := range blockCatchupTests {
notifier, err = newNotifier()
if err != nil {
t.Fatalf("unable to create %v notifier: %v",
notifierType, err)
}
testName := fmt.Sprintf("%v: %v", notifierType,
blockCatchupTest.name)
success := t.Run(testName, func(t *testing.T) {
blockCatchupTest.test(miner, notifier, t)
})
notifier.Stop()
if !success {
break
}
}
if cleanUp != nil { if cleanUp != nil {
cleanUp() cleanUp()
} }

@ -8,6 +8,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/txscript"
@ -20,7 +21,6 @@ import (
) )
const ( const (
// notifierType uniquely identifies this concrete implementation of the // notifierType uniquely identifies this concrete implementation of the
// ChainNotifier interface. // ChainNotifier interface.
notifierType = "neutrino" notifierType = "neutrino"
@ -62,6 +62,8 @@ type NeutrinoNotifier struct {
p2pNode *neutrino.ChainService p2pNode *neutrino.ChainService
chainView *neutrino.Rescan chainView *neutrino.Rescan
chainConn *NeutrinoChainConn
notificationCancels chan interface{} notificationCancels chan interface{}
notificationRegistry chan interface{} notificationRegistry chan interface{}
@ -151,6 +153,8 @@ func (n *NeutrinoNotifier) Start() error {
bestHeight, reorgSafetyLimit, bestHeight, reorgSafetyLimit,
) )
n.chainConn = &NeutrinoChainConn{n.p2pNode}
// Finally, we'll create our rescan struct, start it, and launch all // Finally, we'll create our rescan struct, start it, and launch all
// the goroutines we need to operate this ChainNotifier instance. // the goroutines we need to operate this ChainNotifier instance.
n.chainView = n.p2pNode.NewRescan(rescanOptions...) n.chainView = n.p2pNode.NewRescan(rescanOptions...)
@ -241,7 +245,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
// notification registrations, as well as notification dispatches. // notification registrations, as well as notification dispatches.
func (n *NeutrinoNotifier) notificationDispatcher() { func (n *NeutrinoNotifier) notificationDispatcher() {
defer n.wg.Done() defer n.wg.Done()
out:
for { for {
select { select {
case cancelMsg := <-n.notificationCancels: case cancelMsg := <-n.notificationCancels:
@ -361,58 +365,131 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
"to update rescan: %v", "to update rescan: %v",
err) err)
} }
}() }()
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
n.blockEpochClients[msg.epochID] = msg n.blockEpochClients[msg.epochID] = msg
if msg.bestBlock != nil {
n.heightMtx.Lock()
bestHeight := int32(n.bestHeight)
n.heightMtx.Unlock()
missedBlocks, err :=
chainntnfs.GetClientMissedBlocks(
n.chainConn, msg.bestBlock,
bestHeight, false,
)
if err != nil {
msg.errorChan <- err
continue
}
for _, block := range missedBlocks {
n.notifyBlockEpochClient(msg,
block.Height, block.Hash)
}
}
msg.errorChan <- nil
} }
case item := <-n.chainUpdates.ChanOut(): case item := <-n.chainUpdates.ChanOut():
update := item.(*filteredBlock) update := item.(*filteredBlock)
if update.connect { if update.connect {
n.heightMtx.Lock() n.heightMtx.Lock()
// Since neutrino has no way of knowing what
// height to rewind to in the case of a reorged
// best known height, there is no point in
// checking that the previous hash matches the
// the hash from our best known height the way
// the other notifiers do when they receive
// a new connected block. Therefore, we just
// compare the heights.
if update.height != n.bestHeight+1 { if update.height != n.bestHeight+1 {
chainntnfs.Log.Warnf("Received blocks out of order: "+ // Handle the case where the notifier
"current height=%d, new height=%d", // missed some blocks from its chain
n.bestHeight, update.height) // backend
n.heightMtx.Unlock() chainntnfs.Log.Infof("Missed blocks, " +
continue "attempting to catch up")
bestBlock := chainntnfs.BlockEpoch{
Height: int32(n.bestHeight),
Hash: nil,
}
_, missedBlocks, err :=
chainntnfs.HandleMissedBlocks(
n.chainConn,
n.txConfNotifier,
bestBlock,
int32(update.height),
false,
)
if err != nil {
chainntnfs.Log.Error(err)
n.heightMtx.Unlock()
continue
}
for _, block := range missedBlocks {
filteredBlock, err :=
n.getFilteredBlock(block)
if err != nil {
chainntnfs.Log.Error(err)
n.heightMtx.Unlock()
continue out
}
err = n.handleBlockConnected(filteredBlock)
if err != nil {
chainntnfs.Log.Error(err)
n.heightMtx.Unlock()
continue out
}
}
} }
n.bestHeight = update.height
n.heightMtx.Unlock()
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
update.height, update.hash)
err := n.handleBlockConnected(update) err := n.handleBlockConnected(update)
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
continue
}
n.heightMtx.Lock()
if update.height != n.bestHeight {
chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, disconnected height=%d",
n.bestHeight, update.height)
n.heightMtx.Unlock() n.heightMtx.Unlock()
continue continue
} }
n.bestHeight = update.height - 1 n.heightMtx.Lock()
n.heightMtx.Unlock() if update.height != uint32(n.bestHeight) {
chainntnfs.Log.Infof("Missed disconnected" +
chainntnfs.Log.Infof("Block disconnected from main chain: "+ "blocks, attempting to catch up")
"height=%v, sha=%v", update.height, update.hash)
err := n.txConfNotifier.DisconnectTip(update.height)
if err != nil {
chainntnfs.Log.Error(err)
} }
header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(
n.bestHeight,
)
if err != nil {
chainntnfs.Log.Errorf("Unable to fetch header"+
"for height %d: %v", n.bestHeight, err)
n.heightMtx.Unlock()
continue
}
hash := header.BlockHash()
notifierBestBlock := chainntnfs.BlockEpoch{
Height: int32(n.bestHeight),
Hash: &hash,
}
newBestBlock, err := chainntnfs.RewindChain(
n.chainConn, n.txConfNotifier, notifierBestBlock,
int32(update.height-1),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to rewind chain "+
"from height %d to height %d: %v",
n.bestHeight, update.height-1, err)
}
// Set the bestHeight here in case a chain rewind
// partially completed.
n.bestHeight = uint32(newBestBlock.Height)
n.heightMtx.Unlock()
case err := <-n.rescanErr: case err := <-n.rescanErr:
chainntnfs.Log.Errorf("Error during rescan: %v", err) chainntnfs.Log.Errorf("Error during rescan: %v", err)
@ -504,15 +581,29 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash,
return nil, nil return nil, nil
} }
// handleBlocksConnected applies a chain update for a new block. Any watched // handleBlockConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications // transactions included this block will processed to either send notifications
// now or after numConfirmations confs. // now or after numConfirmations confs.
func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// First we'll notify any subscribed clients of the block. // First process the block for our internal state. A new block has
// been connected to the main chain. Send out any N confirmation
// notifications which may have been triggered by this new block.
err := n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height,
newBlock.txns)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
newBlock.height, newBlock.hash)
n.bestHeight = newBlock.height
// Next, notify any subscribed clients of the block.
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Next, we'll scan over the list of relevant transactions and possibly // Finally, we'll scan over the list of relevant transactions and
// dispatch notifications for confirmations and spends. // possibly dispatch notifications for confirmations and spends.
for _, tx := range newBlock.txns { for _, tx := range newBlock.txns {
mtx := tx.MsgTx() mtx := tx.MsgTx()
txSha := mtx.TxHash() txSha := mtx.TxHash()
@ -520,10 +611,10 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
for i, txIn := range mtx.TxIn { for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an output // If this transaction indeed does spend an output which
// which we have a registered notification for, then // we have a registered notification for, then create a
// create a spend summary, finally sending off the // spend summary, finally sending off the details to the
// details to the notification subscriber. // notification subscriber.
clients, ok := n.spendNotifications[prevOut] clients, ok := n.spendNotifications[prevOut]
if !ok { if !ok {
continue continue
@ -555,33 +646,49 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
} }
} }
// A new block has been connected to the main chain. Send out any N
// confirmation notifications which may have been triggered by this new
// block.
n.txConfNotifier.ConnectTip(
&newBlock.hash, newBlock.height, newBlock.txns,
)
return nil return nil
} }
// getFilteredBlock is a utility to retrieve the full filtered block from a block epoch.
func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filteredBlock, error) {
rawBlock, err := n.p2pNode.GetBlockFromNetwork(*epoch.Hash)
if err != nil {
return nil, fmt.Errorf("unable to get block: %v", err)
}
txns := rawBlock.Transactions()
block := &filteredBlock{
hash: *epoch.Hash,
height: uint32(epoch.Height),
txns: txns,
connect: true,
}
return block, nil
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
for _, client := range n.blockEpochClients {
n.notifyBlockEpochClient(client, newHeight, newSha)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
epoch := &chainntnfs.BlockEpoch{ epoch := &chainntnfs.BlockEpoch{
Height: newHeight, Height: height,
Hash: newSha, Hash: sha,
} }
for _, epochClient := range n.blockEpochClients { select {
select { case epochClient.epochQueue.ChanIn() <- epoch:
case <-epochClient.cancelChan:
case epochClient.epochQueue.ChanIn() <- epoch: case <-n.quit:
case <-epochClient.cancelChan:
case <-n.quit:
}
} }
} }
@ -781,6 +888,10 @@ type blockEpochRegistration struct {
cancelChan chan struct{} cancelChan chan struct{}
bestBlock *chainntnfs.BlockEpoch
errorChan chan error
wg sync.WaitGroup wg sync.WaitGroup
} }
@ -790,14 +901,20 @@ type epochCancel struct {
epochID uint64 epochID uint64
} }
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
// to receive notifications, of each new block connected to the main chain. // caller to receive notifications, of each new block connected to the main
func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { // chain. Clients have the option of passing in their best known block, which
// the notifier uses to check if they are behind on blocks and catch them up.
func (n *NeutrinoNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
reg := &blockEpochRegistration{ reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20), epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20), epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}), cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&n.epochClientCounter, 1), epochID: atomic.AddUint64(&n.epochClientCounter, 1),
bestBlock: bestBlock,
errorChan: make(chan error, 1),
} }
reg.epochQueue.Start() reg.epochQueue.Start()
@ -868,3 +985,41 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent
}, nil }, nil
} }
} }
// NeutrinoChainConn is a wrapper around neutrino's chain backend in order
// to satisfy the chainntnfs.ChainConn interface.
type NeutrinoChainConn struct {
p2pNode *neutrino.ChainService
}
// GetBlockHeader returns the block header for a hash.
func (n *NeutrinoChainConn) GetBlockHeader(blockHash *chainhash.Hash) (*wire.BlockHeader, error) {
header, _, err := n.p2pNode.BlockHeaders.FetchHeader(blockHash)
if err != nil {
return nil, err
}
return header, nil
}
// GetBlockHeaderVerbose returns a verbose block header result for a hash. This
// result only contains the height with a nil hash.
func (n *NeutrinoChainConn) GetBlockHeaderVerbose(blockHash *chainhash.Hash) (
*btcjson.GetBlockHeaderVerboseResult, error) {
_, height, err := n.p2pNode.BlockHeaders.FetchHeader(blockHash)
if err != nil {
return nil, err
}
// Since only the height is used from the result, leave the hash nil.
return &btcjson.GetBlockHeaderVerboseResult{Height: int32(height)}, nil
}
// GetBlockHash returns the hash from a block height.
func (n *NeutrinoChainConn) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) {
header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(uint32(blockHeight))
if err != nil {
return nil, err
}
hash := header.BlockHash()
return &hash, nil
}

@ -0,0 +1,99 @@
package neutrinonotify
import (
"fmt"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/lightninglabs/neutrino"
"github.com/lightningnetwork/lnd/chainntnfs"
)
// UnsafeStart starts the notifier with a specified best height and optional
// best hash. Its bestHeight, txConfNotifier and neutrino node are initialized
// with bestHeight. The parameter generateBlocks is necessary for the
// bitcoind notifier to ensure we drain all notifications up to syncHeight,
// since if they are generated ahead of UnsafeStart the chainConn may start
// up with an outdated best block and miss sending ntfns. Used for testing.
func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash,
syncHeight int32, generateBlocks func() error) error {
// We'll obtain the latest block height of the p2p node. We'll
// start the auto-rescan from this point. Once a caller actually wishes
// to register a chain view, the rescan state will be rewound
// accordingly.
header, height, err := n.p2pNode.BlockHeaders.ChainTip()
if err != nil {
return err
}
startingPoint := &waddrmgr.BlockStamp{
Height: int32(height),
Hash: header.BlockHash(),
}
// Next, we'll create our set of rescan options. Currently it's
// required that a user MUST set an addr/outpoint/txid when creating a
// rescan. To get around this, we'll add a "zero" outpoint, that won't
// actually be matched.
var zeroInput neutrino.InputWithScript
rescanOptions := []neutrino.RescanOption{
neutrino.StartBlock(startingPoint),
neutrino.QuitChan(n.quit),
neutrino.NotificationHandlers(
rpcclient.NotificationHandlers{
OnFilteredBlockConnected: n.onFilteredBlockConnected,
OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected,
},
),
neutrino.WatchInputs(zeroInput),
}
n.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
n.chainConn = &NeutrinoChainConn{n.p2pNode}
// Finally, we'll create our rescan struct, start it, and launch all
// the goroutines we need to operate this ChainNotifier instance.
n.chainView = n.p2pNode.NewRescan(rescanOptions...)
n.rescanErr = n.chainView.Start()
n.chainUpdates.Start()
if generateBlocks != nil {
// Ensure no block notifications are pending when we start the
// notification dispatcher goroutine.
// First generate the blocks, then drain the notifications
// for the generated blocks.
if err := generateBlocks(); err != nil {
return err
}
timeout := time.After(60 * time.Second)
loop:
for {
select {
case ntfn := <-n.chainUpdates.ChanOut():
lastReceivedNtfn := ntfn.(*filteredBlock)
if lastReceivedNtfn.height >= uint32(syncHeight) {
break loop
}
case <-timeout:
return fmt.Errorf("unable to catch up to height %d",
syncHeight)
}
}
}
// Run notificationDispatcher after setting the notifier's best height
// to avoid a race condition.
n.bestHeight = uint32(bestHeight)
n.wg.Add(1)
go n.notificationDispatcher()
return nil
}

@ -194,7 +194,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
// //
// TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest? // TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest?
// * reduces the number of goroutines // * reduces the number of goroutines
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn() blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -384,7 +384,7 @@ func (c *ChainArbitrator) Start() error {
// the chain any longer, only resolve the contracts on the confirmed // the chain any longer, only resolve the contracts on the confirmed
// commitment. // commitment.
for _, closeChanInfo := range closingChannels { for _, closeChanInfo := range closingChannels {
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn() blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
return err return err
} }

@ -21,7 +21,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte,
heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
return nil, nil return nil, nil
} }
func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { func (m *mockNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
return &chainntnfs.BlockEpochEvent{ return &chainntnfs.BlockEpochEvent{
Epochs: make(chan *chainntnfs.BlockEpoch), Epochs: make(chan *chainntnfs.BlockEpoch),
Cancel: func() {}, Cancel: func() {},

@ -855,7 +855,7 @@ func (h *htlcOutgoingContestResolver) Resolve() (ContractResolver, error) {
// If we reach this point, then we can't fully act yet, so we'll await // If we reach this point, then we can't fully act yet, so we'll await
// either of our signals triggering: the HTLC expires, or we learn of // either of our signals triggering: the HTLC expires, or we learn of
// the preimage. // the preimage.
blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn() blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1043,7 +1043,7 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
// ensure the preimage can't be delivered between querying and // ensure the preimage can't be delivered between querying and
// registering for the preimage subscription. // registering for the preimage subscription.
preimageSubscription := h.PreimageDB.SubscribeUpdates() preimageSubscription := h.PreimageDB.SubscribeUpdates()
blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn() blockEpochs, err := h.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -408,7 +408,7 @@ func (d *AuthenticatedGossiper) Start() error {
// First we register for new notifications of newly discovered blocks. // First we register for new notifications of newly discovered blocks.
// We do this immediately so we'll later be able to consume any/all // We do this immediately so we'll later be able to consume any/all
// blocks which were discovered. // blocks which were discovered.
blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn() blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
return err return err
} }

@ -280,7 +280,8 @@ func (m *mockNotifier) notifyBlock(hash chainhash.Hash, height uint32) {
} }
} }
func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { func (m *mockNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()

@ -1705,7 +1705,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel, func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel,
confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) { confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) {
epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn() epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
fndgLog.Errorf("unable to register for epoch notification: %v", fndgLog.Errorf("unable to register for epoch notification: %v",
err) err)

@ -112,7 +112,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
}, nil }, nil
} }
func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { func (m *mockNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
return &chainntnfs.BlockEpochEvent{ return &chainntnfs.BlockEpochEvent{
Epochs: m.epochChan, Epochs: m.epochChan,
Cancel: func() {}, Cancel: func() {},

@ -103,7 +103,7 @@ func (d *DecayedLog) Start() error {
// Start garbage collector. // Start garbage collector.
if d.notifier != nil { if d.notifier != nil {
epochClient, err := d.notifier.RegisterBlockEpochNtfn() epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
return fmt.Errorf("Unable to register for epoch "+ return fmt.Errorf("Unable to register for epoch "+
"notifications: %v", err) "notifications: %v", err)

@ -790,7 +790,8 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte,
numConfs uint32, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { numConfs uint32, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
return nil, nil return nil, nil
} }
func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { func (m *mockNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
return &chainntnfs.BlockEpochEvent{ return &chainntnfs.BlockEpochEvent{
Epochs: m.epochChan, Epochs: m.epochChan,
Cancel: func() {}, Cancel: func() {},

@ -1596,7 +1596,7 @@ func (s *Switch) Start() error {
log.Infof("Starting HTLC Switch") log.Infof("Starting HTLC Switch")
blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn() blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
return err return err
} }

@ -92,7 +92,8 @@ func (m *mockNotfier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
Confirmed: m.confChannel, Confirmed: m.confChannel,
}, nil }, nil
} }
func (m *mockNotfier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { func (m *mockNotfier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
return &chainntnfs.BlockEpochEvent{ return &chainntnfs.BlockEpochEvent{
Epochs: make(chan *chainntnfs.BlockEpoch), Epochs: make(chan *chainntnfs.BlockEpoch),
Cancel: func() {}, Cancel: func() {},

@ -257,7 +257,7 @@ func (u *utxoNursery) Start() error {
// connected block. We register immediately on startup to ensure that // connected block. We register immediately on startup to ensure that
// no blocks are missed while we are handling blocks that were missed // no blocks are missed while we are handling blocks that were missed
// during the time the UTXO nursery was unavailable. // during the time the UTXO nursery was unavailable.
newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn() newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil { if err != nil {
return err return err
} }