chainntnfs/neutrino: Refactor NeutrinoNotifier to use TxConfNotifier.

This commit is contained in:
Jim Posen 2017-11-13 15:49:58 -08:00 committed by Olaoluwa Osuntokun
parent 4405dac4d0
commit abf3685d2d
3 changed files with 67 additions and 257 deletions

@ -103,6 +103,11 @@ func testSingleConfirmationNotification(miner *rpctest.Harness,
select { select {
case confInfo := <-confIntent.Confirmed: case confInfo := <-confIntent.Confirmed:
if !confInfo.BlockHash.IsEqual(blockHash[0]) {
t.Fatalf("mismatched block hashes: expected %v, got %v",
blockHash[0], confInfo.BlockHash)
}
// Finally, we'll verify that the tx index returned is the exact same // Finally, we'll verify that the tx index returned is the exact same
// as the tx index of the transaction within the block itself. // as the tx index of the transaction within the block itself.
msgBlock, err := miner.Node.GetBlock(blockHash[0]) msgBlock, err := miner.Node.GetBlock(blockHash[0])

@ -1,58 +0,0 @@
package neutrinonotify
import "github.com/lightningnetwork/lnd/chainntnfs"
// confEntry represents an entry in the min-confirmation heap. .
type confEntry struct {
*confirmationsNotification
initialConfDetails *chainntnfs.TxConfirmation
triggerHeight uint32
}
// confirmationHeap is a list of confEntries sorted according to nearest
// "confirmation" height.Each entry within the min-confirmation heap is sorted
// according to the smallest dleta from the current blockheight to the
// triggerHeight of the next entry confirmationHeap
type confirmationHeap struct {
items []*confEntry
}
// newConfirmationHeap returns a new confirmationHeap with zero items.
func newConfirmationHeap() *confirmationHeap {
var confItems []*confEntry
return &confirmationHeap{confItems}
}
// Len returns the number of items in the priority queue. It is part of the
// heap.Interface implementation.
func (c *confirmationHeap) Len() int { return len(c.items) }
// Less returns whether the item in the priority queue with index i should sort
// before the item with index j. It is part of the heap.Interface implementation.
func (c *confirmationHeap) Less(i, j int) bool {
return c.items[i].triggerHeight < c.items[j].triggerHeight
}
// Swap swaps the items at the passed indices in the priority queue. It is
// part of the heap.Interface implementation.
func (c *confirmationHeap) Swap(i, j int) {
c.items[i], c.items[j] = c.items[j], c.items[i]
}
// Push pushes the passed item onto the priority queue. It is part of the
// heap.Interface implementation.
func (c *confirmationHeap) Push(x interface{}) {
c.items = append(c.items, x.(*confEntry))
}
// Pop removes the highest priority item (according to Less) from the priority
// queue and returns it. It is part of the heap.Interface implementation.
func (c *confirmationHeap) Pop() interface{} {
n := len(c.items)
x := c.items[n-1]
c.items[n-1] = nil
c.items = c.items[0 : n-1]
return x
}

@ -1,8 +1,8 @@
package neutrinonotify package neutrinonotify
import ( import (
"container/heap"
"errors" "errors"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -22,6 +22,12 @@ const (
// notifierType uniquely identifies this concrete implementation of the // notifierType uniquely identifies this concrete implementation of the
// ChainNotifier interface. // ChainNotifier interface.
notifierType = "neutrino" notifierType = "neutrino"
// reorgSafetyLimit is the chain depth beyond which it is assumed a block
// will not be reorganized out of the chain. This is used to determine when
// to prune old confirmation requests so that reorgs are handled correctly.
// The coinbase maturity period is a reasonable value to use.
reorgSafetyLimit = 100
) )
var ( var (
@ -58,8 +64,7 @@ type NeutrinoNotifier struct {
spendNotifications map[wire.OutPoint]map[uint64]*spendNotification spendNotifications map[wire.OutPoint]map[uint64]*spendNotification
confNotifications map[chainhash.Hash][]*confirmationsNotification txConfNotifier *chainntnfs.TxConfNotifier
confHeap *confirmationHeap
blockEpochClients map[uint64]*blockEpochRegistration blockEpochClients map[uint64]*blockEpochRegistration
@ -88,9 +93,6 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) {
spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification),
confNotifications: make(map[chainhash.Hash][]*confirmationsNotification),
confHeap: newConfirmationHeap(),
p2pNode: node, p2pNode: node,
rescanErr: make(chan error), rescanErr: make(chan error),
@ -142,6 +144,9 @@ func (n *NeutrinoNotifier) Start() error {
neutrino.WatchTxIDs(zeroHash), neutrino.WatchTxIDs(zeroHash),
} }
n.txConfNotifier = chainntnfs.NewTxConfNotifier(
bestHeight, reorgSafetyLimit)
// Finally, we'll create our rescan struct, start it, and launch all // Finally, we'll create our rescan struct, start it, and launch all
// the goroutines we need to operate this ChainNotifier instance. // the goroutines we need to operate this ChainNotifier instance.
n.chainView = n.p2pNode.NewRescan(rescanOptions...) n.chainView = n.p2pNode.NewRescan(rescanOptions...)
@ -174,15 +179,10 @@ func (n *NeutrinoNotifier) Stop() error {
close(spendClient.spendChan) close(spendClient.spendChan)
} }
} }
for _, confClients := range n.confNotifications {
for _, confClient := range confClients {
close(confClient.finConf)
close(confClient.negativeConf)
}
}
for _, epochClient := range n.blockEpochClients { for _, epochClient := range n.blockEpochClients {
close(epochClient.epochChan) close(epochClient.epochChan)
} }
n.txConfNotifier.TearDown()
return nil return nil
} }
@ -284,35 +284,39 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
n.spendNotifications[op][msg.spendID] = msg n.spendNotifications[op][msg.spendID] = msg
case *confirmationsNotification: case *confirmationsNotification:
chainntnfs.Log.Infof("New confirmations "+ chainntnfs.Log.Infof("New confirmations subscription: "+
"subscription: txid=%v, numconfs=%v, "+ "txid=%v, numconfs=%v, height_hint=%v",
"height_hint=%v", *msg.txid, msg.TxID, msg.NumConfirmations, msg.heightHint)
msg.numConfirmations, msg.heightHint)
// If the notification can be partially or // If the notification can be partially or
// fully dispatched, then we can skip the first // fully dispatched, then we can skip the first
// phase for ntfns. // phase for ntfns.
n.heightMtx.RLock() n.heightMtx.RLock()
currentHeight := n.bestHeight currentHeight := n.bestHeight
if n.attemptHistoricalDispatch(msg, currentHeight, msg.heightHint) {
n.heightMtx.RUnlock()
continue
}
n.heightMtx.RUnlock() n.heightMtx.RUnlock()
// If we can't fully dispatch confirmation, // Lookup whether the transaction is already included in the
// then we'll update our filter so we can be // active chain.
// notified of its future initial confirmation. txConf, err := n.historicalConfDetails(msg.TxID, currentHeight,
rescanUpdate := []neutrino.UpdateOption{ msg.heightHint)
neutrino.AddTxIDs(*msg.txid), if err != nil {
neutrino.Rewind(currentHeight), chainntnfs.Log.Error(err)
}
if err := n.chainView.Update(rescanUpdate...); err != nil {
chainntnfs.Log.Errorf("unable to update rescan: %v", err)
} }
txid := *msg.txid if txConf == nil {
n.confNotifications[txid] = append(n.confNotifications[txid], msg) // If we can't fully dispatch confirmation,
// then we'll update our filter so we can be
// notified of its future initial confirmation.
rescanUpdate := []neutrino.UpdateOption{
neutrino.AddTxIDs(*msg.TxID),
neutrino.Rewind(currentHeight),
}
if err := n.chainView.Update(rescanUpdate...); err != nil {
chainntnfs.Log.Errorf("unable to update rescan: %v", err)
}
}
n.txConfNotifier.Register(&msg.ConfNtfn, txConf)
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
@ -352,6 +356,11 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
chainntnfs.Log.Infof("Block disconnected from main chain: "+ chainntnfs.Log.Infof("Block disconnected from main chain: "+
"height=%v, sha=%v", update.height, update.hash) "height=%v, sha=%v", update.height, update.hash)
err := n.txConfNotifier.DisconnectTip(update.height)
if err != nil {
chainntnfs.Log.Error(err)
}
} }
case err := <-n.rescanErr: case err := <-n.rescanErr:
chainntnfs.Log.Errorf("Error during rescan: %v", err) chainntnfs.Log.Errorf("Error during rescan: %v", err)
@ -363,32 +372,20 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
} }
} }
// attemptHistoricalDispatch attempts to consult the historical chain data to // historicalConfDetails looks up whether a transaction is already included in a
// see if a transaction has already reached full confirmation status at the // block in the active chain and, if so, returns details about the confirmation.
// time a notification for it was registered. If it has, then we do an func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash,
// immediate dispatch. Otherwise, we'll add the partially confirmed transaction currentHeight, heightHint uint32) (*chainntnfs.TxConfirmation, error) {
// to the confirmation heap.
func (n *NeutrinoNotifier) attemptHistoricalDispatch(msg *confirmationsNotification,
currentHeight, heightHint uint32) bool {
targetHash := msg.txid
var confDetails *chainntnfs.TxConfirmation
chainntnfs.Log.Infof("Attempting to trigger dispatch for %v from "+
"historical chain", msg.txid)
// Starting from the height hint, we'll walk forwards in the chain to // Starting from the height hint, we'll walk forwards in the chain to
// see if this transaction has already been confirmed. // see if this transaction has already been confirmed.
chainScan:
for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ {
// First, we'll fetch the block header for this height so we // First, we'll fetch the block header for this height so we
// can compute the current block hash. // can compute the current block hash.
header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("unable to get header for "+ return nil, fmt.Errorf("unable to get header for height=%v: %v",
"height=%v: %v", scanHeight, err) scanHeight, err)
return false
} }
blockHash := header.BlockHash() blockHash := header.BlockHash()
@ -397,9 +394,8 @@ chainScan:
regFilter, err := n.p2pNode.GetCFilter(blockHash, regFilter, err := n.p2pNode.GetCFilter(blockHash,
wire.GCSFilterRegular) wire.GCSFilterRegular)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("unable to retrieve regular "+ return nil, fmt.Errorf("unable to retrieve regular filter for "+
"filter for height=%v: %v", scanHeight, err) "height=%v: %v", scanHeight, err)
return false
} }
// If the block has no transactions other than the coinbase // If the block has no transactions other than the coinbase
@ -414,8 +410,7 @@ chainScan:
key := builder.DeriveKey(&blockHash) key := builder.DeriveKey(&blockHash)
match, err := regFilter.Match(key, targetHash[:]) match, err := regFilter.Match(key, targetHash[:])
if err != nil { if err != nil {
chainntnfs.Log.Errorf("unable to query filter: %v", err) return nil, fmt.Errorf("unable to query filter: %v", err)
return false
} }
// If there's no match, then we can continue forward to the // If there's no match, then we can continue forward to the
@ -429,54 +424,22 @@ chainScan:
// to send the proper response. // to send the proper response.
block, err := n.p2pNode.GetBlockFromNetwork(blockHash) block, err := n.p2pNode.GetBlockFromNetwork(blockHash)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("unable to get block from "+ return nil, fmt.Errorf("unable to get block from network: %v", err)
"network: %v", err)
return false
} }
for j, tx := range block.Transactions() { for j, tx := range block.Transactions() {
txHash := tx.Hash() txHash := tx.Hash()
if txHash.IsEqual(targetHash) { if txHash.IsEqual(targetHash) {
confDetails = &chainntnfs.TxConfirmation{ confDetails := chainntnfs.TxConfirmation{
BlockHash: &blockHash, BlockHash: &blockHash,
BlockHeight: scanHeight, BlockHeight: scanHeight,
TxIndex: uint32(j), TxIndex: uint32(j),
} }
break chainScan return &confDetails, nil
} }
} }
} }
// If it hasn't yet been confirmed, then we can exit early. return nil, nil
if confDetails == nil {
return false
}
// Otherwise, we'll calculate the number of confirmations that the
// transaction has so we can decide if it has reached the desired
// number of confirmations or not.
txConfs := currentHeight - confDetails.BlockHeight + 1
// If the transaction has more that enough confirmations, then we can
// dispatch it immediately after obtaining for information w.r.t
// exactly *when* if got all its confirmations.
if uint32(txConfs) >= msg.numConfirmations {
chainntnfs.Log.Infof("Dispatching %v conf notification, "+
"height=%v", msg.numConfirmations, currentHeight)
msg.finConf <- confDetails
return true
}
// Otherwise, the transaction has only been *partially* confirmed, so
// we need to insert it into the confirmation heap.
confHeight := confDetails.BlockHeight + msg.numConfirmations - 1
heapEntry := &confEntry{
msg,
confDetails,
confHeight,
}
heap.Push(n.confHeap, heapEntry)
return true
} }
// handleBlocksConnected applies a chain update for a new block. Any watched // handleBlocksConnected applies a chain update for a new block. Any watched
@ -489,14 +452,8 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// Next, we'll scan over the list of relevant transactions and possibly // Next, we'll scan over the list of relevant transactions and possibly
// dispatch notifications for confirmations and spends. // dispatch notifications for confirmations and spends.
for _, tx := range newBlock.txns { for _, tx := range newBlock.txns {
// Check if the inclusion of this transaction within a block by itself
// triggers a block confirmation threshold, if so send a notification.
// Otherwise, place the notification on a heap to be triggered in the
// future once additional confirmations are attained.
mtx := tx.MsgTx() mtx := tx.MsgTx()
txIndex := tx.Index()
txSha := mtx.TxHash() txSha := mtx.TxHash()
n.checkConfirmationTrigger(&txSha, newBlock, txIndex)
for i, txIn := range mtx.TxIn { for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint prevOut := txIn.PreviousOutPoint
@ -537,7 +494,7 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// A new block has been connected to the main chain. // A new block has been connected to the main chain.
// Send out any N confirmation notifications which may // Send out any N confirmation notifications which may
// have been triggered by this new block. // have been triggered by this new block.
n.notifyConfs(int32(newBlock.height)) n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns)
return nil return nil
} }
@ -572,91 +529,6 @@ func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.
} }
} }
// notifyConfs examines the current confirmation heap, sending off any
// notifications which have been triggered by the connection of a new block at
// newBlockHeight.
func (n *NeutrinoNotifier) notifyConfs(newBlockHeight int32) {
// If the heap is empty, we have nothing to do.
if n.confHeap.Len() == 0 {
return
}
// Traverse our confirmation heap. The heap is a min-heap, so the
// confirmation notification which requires the smallest block-height
// will always be at the top of the heap. If a confirmation
// notification is eligible for triggering, then fire it off, and check
// if another is eligible until there are no more eligible entries.
nextConf := heap.Pop(n.confHeap).(*confEntry)
for nextConf.triggerHeight <= uint32(newBlockHeight) {
chainntnfs.Log.Infof("Dispatching %v conf notification, "+
"height=%v", nextConf.numConfirmations, newBlockHeight)
nextConf.finConf <- nextConf.initialConfDetails
if n.confHeap.Len() == 0 {
return
}
nextConf = heap.Pop(n.confHeap).(*confEntry)
}
heap.Push(n.confHeap, nextConf)
}
// checkConfirmationTrigger determines if the passed txSha included at
// blockHeight triggers any single confirmation notifications. In the event
// that the txid matches, yet needs additional confirmations, it is added to
// the confirmation heap to be triggered at a later time.
func (n *NeutrinoNotifier) checkConfirmationTrigger(txSha *chainhash.Hash,
newTip *filteredBlock, txIndex int) {
// If a confirmation notification has been registered for this txid,
// then either trigger a notification event if only a single
// confirmation notification was requested, or place the notification
// on the confirmation heap for future usage.
if confClients, ok := n.confNotifications[*txSha]; ok {
// Either all of the registered confirmations will be
// dispatched due to a single confirmation, or added to the
// conf head. Therefor we unconditionally delete the registered
// confirmations from the staging zone.
defer func() {
delete(n.confNotifications, *txSha)
}()
for _, confClient := range confClients {
confDetails := &chainntnfs.TxConfirmation{
BlockHash: &newTip.hash,
BlockHeight: uint32(newTip.height),
TxIndex: uint32(txIndex),
}
if confClient.numConfirmations == 1 {
chainntnfs.Log.Infof("Dispatching single conf "+
"notification, sha=%v, height=%v", txSha,
newTip.height)
confClient.finConf <- confDetails
continue
}
// The registered notification requires more than one
// confirmation before triggering. So we create a
// heapConf entry for this notification. The heapConf
// allows us to easily keep track of which
// notification(s) we should fire off with each
// incoming block.
confClient.initialConfirmHeight = uint32(newTip.height)
finalConfHeight := confClient.initialConfirmHeight + confClient.numConfirmations - 1
heapEntry := &confEntry{
confClient,
confDetails,
finalConfHeight,
}
heap.Push(n.confHeap, heapEntry)
}
}
}
// spendNotification couples a target outpoint along with the channel used for // spendNotification couples a target outpoint along with the channel used for
// notifications once a spend of the outpoint has been detected. // notifications once a spend of the outpoint has been detected.
type spendNotification struct { type spendNotification struct {
@ -799,15 +671,8 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// confirmationNotification represents a client's intent to receive a // confirmationNotification represents a client's intent to receive a
// notification once the target txid reaches numConfirmations confirmations. // notification once the target txid reaches numConfirmations confirmations.
type confirmationsNotification struct { type confirmationsNotification struct {
txid *chainhash.Hash chainntnfs.ConfNtfn
heightHint uint32 heightHint uint32
initialConfirmHeight uint32
numConfirmations uint32
finConf chan *chainntnfs.TxConfirmation
negativeConf chan int32 // TODO(roasbeef): re-org funny business
} }
// RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier
@ -817,21 +682,19 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
ntfn := &confirmationsNotification{ ntfn := &confirmationsNotification{
txid: txid, ConfNtfn: chainntnfs.ConfNtfn{
heightHint: heightHint, TxID: txid,
numConfirmations: numConfs, NumConfirmations: numConfs,
finConf: make(chan *chainntnfs.TxConfirmation, 1), Event: chainntnfs.NewConfirmationEvent(),
negativeConf: make(chan int32, 1), },
heightHint: heightHint,
} }
select { select {
case <-n.quit: case <-n.quit:
return nil, ErrChainNotifierShuttingDown return nil, ErrChainNotifierShuttingDown
case n.notificationRegistry <- ntfn: case n.notificationRegistry <- ntfn:
return &chainntnfs.ConfirmationEvent{ return ntfn.Event, nil
Confirmed: ntfn.finConf,
NegativeConf: ntfn.negativeConf,
}, nil
} }
} }