chainntnfs/btcd: add logic for mempool argument to RegisterSpendNtfn

This commit adds a boolean to RegisterSpendNtfn, giving the caller the
option to only register for notifications on confirmed spends. This is
implemented for the btcd backend using logic similar to what is in used
for Neutrino, paving the way for later unifying them.
This commit is contained in:
Johan T. Halseth 2018-03-20 16:17:23 +01:00
parent fa29adf9a3
commit cc17bc1636
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -207,6 +207,23 @@ func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t ti
}
}
// filteredBlock represents a new block which has been connected to the main
// chain. The slice of transactions will only be populated if the block
// includes a transaction that confirmed one of our watched txids, or spends
// one of the outputs currently being watched.
// TODO(halseth): this is currently used for complete blocks. Change to use
// onFilteredBlockConnected and onFilteredBlockDisconnected, making it easier
// to unify with the Neutrino implementation.
type filteredBlock struct {
hash chainhash.Hash
height uint32
txns []*btcutil.Tx
// connected is true if this update is a new block and false if it is a
// disconnected block.
connect bool
}
// onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient.
func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) {
// Append this new chain update to the end of the queue of new chain
@ -322,12 +339,15 @@ out:
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
update.blockHeight, update.blockHash)
b.notifyBlockEpochs(update.blockHeight, update.blockHash)
txns := btcutil.NewBlock(rawBlock).Transactions()
err = b.txConfNotifier.ConnectTip(update.blockHash,
uint32(update.blockHeight), txns)
if err != nil {
block := &filteredBlock{
hash: *update.blockHash,
height: uint32(update.blockHeight),
txns: txns,
connect: true,
}
if err := b.handleBlockConnected(block); err != nil {
chainntnfs.Log.Error(err)
}
continue
@ -350,6 +370,8 @@ out:
chainntnfs.Log.Error(err)
}
// NOTE: we currently only use txUpdates for mempool spends. It
// might get removed entirely in the future.
case item := <-b.txUpdates.ChanOut():
newSpend := item.(*txUpdate)
spendingTx := newSpend.tx
@ -381,7 +403,20 @@ out:
spendDetails.SpendingHeight = currentHeight + 1
}
for _, ntfn := range clients {
// Keep spendNotifications that are
// waiting for a confirmation around.
// They will be notified when we find
// the spend within a block.
rem := make(map[uint64]*spendNotification)
for c, ntfn := range clients {
// If this client didn't want
// to be notified on mempool
// spends, store it for later.
if !ntfn.mempool {
rem[c] = ntfn
continue
}
chainntnfs.Log.Infof("Dispatching "+
"spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
@ -393,6 +428,12 @@ out:
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
// If we had any clients left, add them
// back to the map.
if len(rem) > 0 {
b.spendNotifications[prevOut] = rem
}
}
}
@ -461,6 +502,65 @@ func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash,
return &txConf, nil
}
// handleBlocksConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications
// now or after numConfirmations confs.
// TODO(halseth): this is reusing the neutrino notifier implementation, unify
// them.
func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// First we'll notify any subscribed clients of the block.
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Next, we'll scan over the list of relevant transactions and possibly
// dispatch notifications for confirmations and spends.
for _, tx := range newBlock.txns {
mtx := tx.MsgTx()
txSha := mtx.TxHash()
for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an output which we have a
// registered notification for, then create a spend summary, finally
// sending off the details to the notification subscriber.
clients, ok := b.spendNotifications[prevOut]
if !ok {
continue
}
// TODO(roasbeef): many integration tests expect spend to be
// notified within the mempool.
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &txSha,
SpendingTx: mtx,
SpenderInputIndex: uint32(i),
SpendingHeight: int32(newBlock.height),
}
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not
// block. This is safe to do since the channel is buffered, and
// the message can still be read by the receiver.
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
}
}
// A new block has been connected to the main chain.
// Send out any N confirmation notifications which may
// have been triggered by this new block.
b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns)
return nil
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
@ -489,6 +589,7 @@ type spendNotification struct {
spendChan chan *chainntnfs.SpendDetail
spendID uint64
mempool bool
}
// spendCancel is a message sent to the BtcdNotifier when a client wishes to
@ -506,12 +607,13 @@ type spendCancel struct {
// outpoint has been detected, the details of the spending event will be sent
// across the 'Spend' channel.
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
_ uint32) (*chainntnfs.SpendEvent, error) {
_ uint32, mempool bool) (*chainntnfs.SpendEvent, error) {
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
spendID: atomic.AddUint64(&b.spendClientCounter, 1),
mempool: mempool,
}
select {