chainntnfs/neutrino: Handle block connects and disconnects in order.

This commit is contained in:
Jim Posen 2017-11-10 11:01:36 -08:00 committed by Olaoluwa Osuntokun
parent 0cac7e80d4
commit fa513a76ad

@ -65,8 +65,7 @@ type NeutrinoNotifier struct {
rescanErr <-chan error rescanErr <-chan error
newBlocks *chainntnfs.ConcurrentQueue chainUpdates *chainntnfs.ConcurrentQueue
staleBlocks *chainntnfs.ConcurrentQueue
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
@ -96,9 +95,7 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) {
rescanErr: make(chan error), rescanErr: make(chan error),
newBlocks: chainntnfs.NewConcurrentQueue(10), chainUpdates: chainntnfs.NewConcurrentQueue(10),
staleBlocks: chainntnfs.NewConcurrentQueue(10),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -150,8 +147,7 @@ func (n *NeutrinoNotifier) Start() error {
n.chainView = n.p2pNode.NewRescan(rescanOptions...) n.chainView = n.p2pNode.NewRescan(rescanOptions...)
n.rescanErr = n.chainView.Start() n.rescanErr = n.chainView.Start()
n.newBlocks.Start() n.chainUpdates.Start()
n.staleBlocks.Start()
n.wg.Add(1) n.wg.Add(1)
go n.notificationDispatcher() go n.notificationDispatcher()
@ -169,8 +165,7 @@ func (n *NeutrinoNotifier) Stop() error {
close(n.quit) close(n.quit)
n.wg.Wait() n.wg.Wait()
n.newBlocks.Stop() n.chainUpdates.Stop()
n.staleBlocks.Stop()
// Notify all pending clients of our shutdown by closing the related // Notify all pending clients of our shutdown by closing the related
// notification channels. // notification channels.
@ -200,6 +195,10 @@ type filteredBlock struct {
hash chainhash.Hash hash chainhash.Hash
height uint32 height uint32
txns []*btcutil.Tx txns []*btcutil.Tx
// connected is true if this update is a new block and false if it is a
// disconnected block.
connect bool
} }
// onFilteredBlockConnected is a callback which is executed each a new block is // onFilteredBlockConnected is a callback which is executed each a new block is
@ -209,10 +208,11 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
// Append this new chain update to the end of the queue of new chain // Append this new chain update to the end of the queue of new chain
// updates. // updates.
n.newBlocks.ChanIn() <- &filteredBlock{ n.chainUpdates.ChanIn() <- &filteredBlock{
hash: header.BlockHash(), hash: header.BlockHash(),
height: uint32(height), height: uint32(height),
txns: txns, txns: txns,
connect: true,
} }
} }
@ -223,9 +223,10 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
// Append this new chain update to the end of the queue of new chain // Append this new chain update to the end of the queue of new chain
// disconnects. // disconnects.
n.staleBlocks.ChanIn() <- &filteredBlock{ n.chainUpdates.ChanIn() <- &filteredBlock{
hash: header.BlockHash(), hash: header.BlockHash(),
height: uint32(height), height: uint32(height),
connect: false,
} }
} }
@ -318,87 +319,40 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
n.blockEpochClients[msg.epochID] = msg n.blockEpochClients[msg.epochID] = msg
} }
case item := <-n.newBlocks.ChanOut(): case item := <-n.chainUpdates.ChanOut():
newBlock := item.(*filteredBlock) update := item.(*filteredBlock)
if update.connect {
n.heightMtx.Lock() n.heightMtx.Lock()
n.bestHeight = newBlock.height if update.height != n.bestHeight+1 {
chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, new height=%d",
n.bestHeight, update.height)
}
n.bestHeight = update.height
n.heightMtx.Unlock() n.heightMtx.Unlock()
chainntnfs.Log.Infof("New block: height=%v, sha=%v", chainntnfs.Log.Infof("New block: height=%v, sha=%v",
newBlock.height, newBlock.hash) update.height, update.hash)
// First we'll notify any subscribed clients of the err := n.handleBlockConnected(update)
// block. if err != nil {
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) chainntnfs.Log.Error(err)
}
// Next, we'll scan over the list of relevant } else {
// transactions and possibly dispatch notifications for n.heightMtx.Lock()
// confirmations and spends. if update.height != n.bestHeight {
for _, tx := range newBlock.txns { chainntnfs.Log.Warnf("Received blocks out of order: "+
// Check if the inclusion of this transaction "current height=%d, disconnected height=%d",
// within a block by itself triggers a block n.bestHeight, update.height)
// confirmation threshold, if so send a
// notification. Otherwise, place the
// notification on a heap to be triggered in
// the future once additional confirmations are
// attained.
mtx := tx.MsgTx()
txIndex := tx.Index()
txSha := mtx.TxHash()
n.checkConfirmationTrigger(&txSha, newBlock, txIndex)
for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does
// spend an output which we have a
// registered notification for, then
// create a spend summary, finally
// sending off the details to the
// notification subscriber.
if clients, ok := n.spendNotifications[prevOut]; ok {
// TODO(roasbeef): many
// integration tests expect
// spend to be notified within
// the mempool.
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &txSha,
SpendingTx: mtx,
SpenderInputIndex: uint32(i),
SpendingHeight: int32(newBlock.height),
} }
for _, ntfn := range clients { n.bestHeight = update.height - 1
chainntnfs.Log.Infof("Dispatching "+ n.heightMtx.Unlock()
"spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not chainntnfs.Log.Infof("Block disconnected from main chain: "+
// block. This is safe to do since the channel is buffered, and the "height=%v, sha=%v", update.height, update.hash)
// message can still be read by the receiver.
close(ntfn.spendChan)
} }
delete(n.spendNotifications, prevOut)
}
}
}
// A new block has been connected to the main chain.
// Send out any N confirmation notifications which may
// have been triggered by this new block.
n.notifyConfs(int32(newBlock.height))
case item := <-n.staleBlocks.ChanOut():
staleBlock := item.(*filteredBlock)
chainntnfs.Log.Warnf("Block disconnected from main "+
"chain: %v", staleBlock.hash)
case err := <-n.rescanErr: case err := <-n.rescanErr:
chainntnfs.Log.Errorf("Error during rescan: %v", err) chainntnfs.Log.Errorf("Error during rescan: %v", err)
@ -525,6 +479,69 @@ chainScan:
return true return true
} }
// handleBlocksConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications
// now or after numConfirmations confs.
func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// First we'll notify any subscribed clients of the block.
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Next, we'll scan over the list of relevant transactions and possibly
// dispatch notifications for confirmations and spends.
for _, tx := range newBlock.txns {
// Check if the inclusion of this transaction within a block by itself
// triggers a block confirmation threshold, if so send a notification.
// Otherwise, place the notification on a heap to be triggered in the
// future once additional confirmations are attained.
mtx := tx.MsgTx()
txIndex := tx.Index()
txSha := mtx.TxHash()
n.checkConfirmationTrigger(&txSha, newBlock, txIndex)
for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an output which we have a
// registered notification for, then create a spend summary, finally
// sending off the details to the notification subscriber.
clients, ok := n.spendNotifications[prevOut]
if !ok {
continue
}
// TODO(roasbeef): many integration tests expect spend to be
// notified within the mempool.
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &txSha,
SpendingTx: mtx,
SpenderInputIndex: uint32(i),
SpendingHeight: int32(newBlock.height),
}
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not
// block. This is safe to do since the channel is buffered, and
// the message can still be read by the receiver.
close(ntfn.spendChan)
}
delete(n.spendNotifications, prevOut)
}
}
// A new block has been connected to the main chain.
// Send out any N confirmation notifications which may
// have been triggered by this new block.
n.notifyConfs(int32(newBlock.height))
return nil
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {