chainntfs: switch BtcdNotifier to directly use btcrpcclient
* Turns out the NotificationServer on btcwallet doesn’t sever full blocks, nor notify for arbitrary transactions. Instead, we now create a new client specifically for BtcdNotifier. * Final implementation is simpler, less dependent on newer btcwallet features in flux. Additionally, this decouples the chain notifications from the wallet. Enabling reliance on btcd for notifications, in conjunction with an independent wallet that satisfies the to-be-drafted Wallet interface.
This commit is contained in:
parent
1c59dfc75c
commit
e92fc5f495
@ -1,16 +1,16 @@
|
|||||||
package btcdnotify
|
package btcdnotify
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"container/heap"
|
"container/heap"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/btcjson"
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/btcsuite/btcwallet/chain"
|
"github.com/btcsuite/btcrpcclient"
|
||||||
btcwallet "github.com/btcsuite/btcwallet/wallet"
|
"github.com/btcsuite/btcutil"
|
||||||
"github.com/btcsuite/btcwallet/wtxmgr"
|
|
||||||
"github.com/lightningnetwork/lnd/chainntfs"
|
"github.com/lightningnetwork/lnd/chainntfs"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,8 +19,7 @@ type BtcdNotifier struct {
|
|||||||
started int32 // To be used atomically
|
started int32 // To be used atomically
|
||||||
stopped int32 // To be used atomically
|
stopped int32 // To be used atomically
|
||||||
|
|
||||||
ntfnSource *btcwallet.NotificationServer
|
chainConn *btcrpcclient.Client
|
||||||
chainConn *chain.RPCClient
|
|
||||||
|
|
||||||
notificationRegistry chan interface{}
|
notificationRegistry chan interface{}
|
||||||
|
|
||||||
@ -30,9 +29,9 @@ type BtcdNotifier struct {
|
|||||||
confNotifications map[wire.ShaHash]*confirmationsNotification
|
confNotifications map[wire.ShaHash]*confirmationsNotification
|
||||||
confHeap *confirmationHeap
|
confHeap *confirmationHeap
|
||||||
|
|
||||||
connectedBlocks <-chan wtxmgr.BlockMeta
|
connectedBlockHashes chan *blockNtfn
|
||||||
disconnectedBlocks <-chan wtxmgr.BlockMeta
|
disconnectedBlockHashes chan *blockNtfn
|
||||||
relevantTxs <-chan chain.RelevantTx
|
relevantTxs chan *btcutil.Tx
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
@ -41,15 +40,12 @@ type BtcdNotifier struct {
|
|||||||
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
|
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
|
||||||
|
|
||||||
// NewBtcdNotifier...
|
// NewBtcdNotifier...
|
||||||
// TODO(roasbeef): chain client + notification sever
|
// TODO(roasbeef):
|
||||||
// * use server for notifications
|
|
||||||
// * when asked for spent, request via client
|
// * when asked for spent, request via client
|
||||||
func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer,
|
//func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer,
|
||||||
chainConn *chain.RPCClient) (*BtcdNotifier, error) {
|
// chainConn *chain.RPCClient) (*BtcdNotifier, error) {
|
||||||
|
func NewBtcdNotifier(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
|
||||||
return &BtcdNotifier{
|
notifier := &BtcdNotifier{
|
||||||
ntfnSource: ntfnSource,
|
|
||||||
chainConn: chainConn,
|
|
||||||
|
|
||||||
notificationRegistry: make(chan interface{}),
|
notificationRegistry: make(chan interface{}),
|
||||||
|
|
||||||
@ -57,12 +53,28 @@ func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer,
|
|||||||
confNotifications: make(map[wire.ShaHash]*confirmationsNotification),
|
confNotifications: make(map[wire.ShaHash]*confirmationsNotification),
|
||||||
confHeap: newConfirmationHeap(),
|
confHeap: newConfirmationHeap(),
|
||||||
|
|
||||||
connectedBlocks: make(chan wtxmgr.BlockMeta),
|
connectedBlockHashes: make(chan *blockNtfn, 20),
|
||||||
disconnectedBlocks: make(chan wtxmgr.BlockMeta),
|
disconnectedBlockHashes: make(chan *blockNtfn, 20),
|
||||||
relevantTxs: make(chan chain.RelevantTx),
|
relevantTxs: make(chan *btcutil.Tx, 100),
|
||||||
|
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
}, nil
|
}
|
||||||
|
|
||||||
|
ntfnCallbacks := &btcrpcclient.NotificationHandlers{
|
||||||
|
OnBlockConnected: notifier.onBlockConnected,
|
||||||
|
OnBlockDisconnected: notifier.onBlockDisconnected,
|
||||||
|
OnRedeemingTx: notifier.onRedeemingTx,
|
||||||
|
}
|
||||||
|
|
||||||
|
config.DisableConnectOnNew = true
|
||||||
|
config.DisableAutoReconnect = false
|
||||||
|
chainConn, err := btcrpcclient.New(config, ntfnCallbacks)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
notifier.chainConn = chainConn
|
||||||
|
|
||||||
|
return notifier, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start...
|
// Start...
|
||||||
@ -72,6 +84,14 @@ func (b *BtcdNotifier) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := b.chainConn.Connect(20); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.chainConn.NotifyBlocks(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
b.wg.Add(1)
|
b.wg.Add(1)
|
||||||
go b.notificationDispatcher()
|
go b.notificationDispatcher()
|
||||||
|
|
||||||
@ -85,16 +105,43 @@ func (b *BtcdNotifier) Stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.chainConn.Shutdown()
|
||||||
|
|
||||||
close(b.quit)
|
close(b.quit)
|
||||||
b.wg.Wait()
|
b.wg.Wait()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// connectedBlock...
|
||||||
|
type blockNtfn struct {
|
||||||
|
sha *wire.ShaHash
|
||||||
|
height int32
|
||||||
|
}
|
||||||
|
|
||||||
|
// onBlockConnected...
|
||||||
|
func (b *BtcdNotifier) onBlockConnected(hash *wire.ShaHash, height int32, t time.Time) {
|
||||||
|
select {
|
||||||
|
case b.connectedBlockHashes <- &blockNtfn{hash, height}:
|
||||||
|
case <-b.quit:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// onBlockDisconnected...
|
||||||
|
func (b *BtcdNotifier) onBlockDisconnected(hash *wire.ShaHash, height int32, t time.Time) {
|
||||||
|
b.onBlockDisconnected(hash, height, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// onRedeemingTx...
|
||||||
|
func (b *BtcdNotifier) onRedeemingTx(transaction *btcutil.Tx, details *btcjson.BlockDetails) {
|
||||||
|
select {
|
||||||
|
case b.relevantTxs <- transaction:
|
||||||
|
case <-b.quit:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// notificationDispatcher...
|
// notificationDispatcher...
|
||||||
func (b *BtcdNotifier) notificationDispatcher() {
|
func (b *BtcdNotifier) notificationDispatcher() {
|
||||||
ntfnClient := b.ntfnSource.TransactionNotifications()
|
|
||||||
|
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -105,87 +152,38 @@ out:
|
|||||||
case *confirmationsNotification:
|
case *confirmationsNotification:
|
||||||
b.confNotifications[*msg.txid] = msg
|
b.confNotifications[*msg.txid] = msg
|
||||||
}
|
}
|
||||||
case txNtfn := <-ntfnClient.C:
|
case staleBlockHash := <-b.disconnectedBlockHashes:
|
||||||
// We're only concerned with newly mined blocks which
|
// TODO(roasbeef): re-orgs
|
||||||
// may or may not include transactions we are interested
|
// * second channel to notify of confirmation decrementing
|
||||||
// in.
|
// re-org?
|
||||||
if txNtfn.AttachedBlocks == nil {
|
// * notify of negative confirmations
|
||||||
break
|
fmt.Println(staleBlockHash)
|
||||||
}
|
case connectedBlock := <-b.connectedBlockHashes:
|
||||||
|
newBlock, err := b.chainConn.GetBlock(connectedBlock.sha)
|
||||||
newBlocks := txNtfn.AttachedBlocks
|
if err != nil {
|
||||||
for _, block := range newBlocks {
|
|
||||||
blockHeight := uint32(block.Height)
|
|
||||||
|
|
||||||
// Examine all transactions within the block
|
|
||||||
// in order to determine if this block includes a
|
|
||||||
// transactions spending one of the registered
|
|
||||||
// outpoints of interest.
|
|
||||||
for _, txSummary := range block.Transactions {
|
|
||||||
txBytes := bytes.NewReader(txSummary.Transaction)
|
|
||||||
tx := wire.NewMsgTx()
|
|
||||||
if err := tx.Deserialize(txBytes); err != nil {
|
|
||||||
// TODO(roasbeef): err
|
|
||||||
fmt.Println("unable to des tx: ", err)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
newHeight := connectedBlock.height
|
||||||
|
for _, tx := range newBlock.Transactions() {
|
||||||
// Check if the inclusion of this transaction
|
// Check if the inclusion of this transaction
|
||||||
// within a block by itself triggers a block
|
// within a block by itself triggers a block
|
||||||
// confirmation threshold, if so send a
|
// confirmation threshold, if so send a
|
||||||
// notification. Otherwise, place the notification
|
// notification. Otherwise, place the notification
|
||||||
// on a heap to be triggered in the future once
|
// on a heap to be triggered in the future once
|
||||||
// additional confirmations are attained.
|
// additional confirmations are attained.
|
||||||
txSha := tx.TxSha()
|
txSha := tx.Sha()
|
||||||
b.checkConfirmationTrigger(&txSha, blockHeight)
|
b.checkConfirmationTrigger(txSha, newHeight)
|
||||||
|
|
||||||
// Next, examine all the inputs spent, firing
|
|
||||||
// of a notification if it spends any of the
|
|
||||||
// outpoints within the set of our registered
|
|
||||||
// outputs.
|
|
||||||
b.checkSpendTrigger(tx)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// A new block has been connected to the main
|
// A new block has been connected to the main
|
||||||
// chain. Send out any N confirmation notifications
|
// chain. Send out any N confirmation notifications
|
||||||
// which may have been triggered by this new block.
|
// which may have been triggered by this new block.
|
||||||
b.notifyConfs(blockHeight)
|
b.notifyConfs(newHeight)
|
||||||
}
|
case newSpend := <-b.relevantTxs:
|
||||||
|
|
||||||
// TODO(roasbeef): re-orgs
|
|
||||||
// * second channel to notify of confirmation decrementing
|
|
||||||
// re-org?
|
|
||||||
// * notify of negative confirmations
|
|
||||||
fmt.Println(txNtfn.DetachedBlocks)
|
|
||||||
case <-b.quit:
|
|
||||||
break out
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// notifyConfs...
|
|
||||||
func (b *BtcdNotifier) notifyConfs(newBlockHeight uint32) {
|
|
||||||
// Traverse our confirmation heap. The heap is a
|
|
||||||
// min-heap, so the confirmation notification which requires
|
|
||||||
// the smallest block-height will always be at the top
|
|
||||||
// of the heap. If a confirmation notification is eligible
|
|
||||||
// for triggering, then fire it off, and check if another
|
|
||||||
// is eligible until there are no more eligible entries.
|
|
||||||
nextConf := heap.Pop(b.confHeap).(*confEntry)
|
|
||||||
for nextConf.triggerHeight <= newBlockHeight {
|
|
||||||
nextConf.finConf <- struct{}{}
|
|
||||||
|
|
||||||
nextConf = heap.Pop(b.confHeap).(*confEntry)
|
|
||||||
}
|
|
||||||
|
|
||||||
heap.Push(b.confHeap, nextConf)
|
|
||||||
}
|
|
||||||
|
|
||||||
// checkSpendTrigger...
|
|
||||||
func (b *BtcdNotifier) checkSpendTrigger(tx *wire.MsgTx) {
|
|
||||||
// First, check if this transaction spends an output
|
// First, check if this transaction spends an output
|
||||||
// that has an existing spend notification for it.
|
// that has an existing spend notification for it.
|
||||||
for i, txIn := range tx.TxIn {
|
for i, txIn := range newSpend.MsgTx().TxIn {
|
||||||
prevOut := txIn.PreviousOutPoint
|
prevOut := txIn.PreviousOutPoint
|
||||||
|
|
||||||
// If this transaction indeed does spend an
|
// If this transaction indeed does spend an
|
||||||
@ -194,12 +192,12 @@ func (b *BtcdNotifier) checkSpendTrigger(tx *wire.MsgTx) {
|
|||||||
// sending off the details to the notification
|
// sending off the details to the notification
|
||||||
// subscriber.
|
// subscriber.
|
||||||
if ntfn, ok := b.spendNotifications[prevOut]; ok {
|
if ntfn, ok := b.spendNotifications[prevOut]; ok {
|
||||||
spenderSha := tx.TxSha()
|
spenderSha := newSpend.Sha()
|
||||||
spendDetails := &chainntnfs.SpendDetail{
|
spendDetails := &chainntnfs.SpendDetail{
|
||||||
SpentOutPoint: ntfn.targetOutpoint,
|
SpentOutPoint: ntfn.targetOutpoint,
|
||||||
SpenderTxHash: &spenderSha,
|
SpenderTxHash: spenderSha,
|
||||||
// TODO(roasbeef): copy tx?
|
// TODO(roasbeef): copy tx?
|
||||||
SpendingTx: tx,
|
SpendingTx: newSpend.MsgTx(),
|
||||||
SpenderInputIndex: uint32(i),
|
SpenderInputIndex: uint32(i),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -207,17 +205,50 @@ func (b *BtcdNotifier) checkSpendTrigger(tx *wire.MsgTx) {
|
|||||||
delete(b.spendNotifications, prevOut)
|
delete(b.spendNotifications, prevOut)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case <-b.quit:
|
||||||
|
break out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.wg.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
// notifyConfs...
|
||||||
|
func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) {
|
||||||
|
// If the heap is empty, we have nothing to do.
|
||||||
|
if b.confHeap.Len() == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Traverse our confirmation heap. The heap is a
|
||||||
|
// min-heap, so the confirmation notification which requires
|
||||||
|
// the smallest block-height will always be at the top
|
||||||
|
// of the heap. If a confirmation notification is eligible
|
||||||
|
// for triggering, then fire it off, and check if another
|
||||||
|
// is eligible until there are no more eligible entries.
|
||||||
|
nextConf := heap.Pop(b.confHeap).(*confEntry)
|
||||||
|
for nextConf.triggerHeight <= uint32(newBlockHeight) {
|
||||||
|
nextConf.finConf <- struct{}{}
|
||||||
|
|
||||||
|
if b.confHeap.Len() == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nextConf = heap.Pop(b.confHeap).(*confEntry)
|
||||||
|
}
|
||||||
|
|
||||||
|
heap.Push(b.confHeap, nextConf)
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkConfirmationTrigger...
|
// checkConfirmationTrigger...
|
||||||
func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash,
|
// TODO(roasbeef): perheps lookup, then track by inputs instead?
|
||||||
blockHeight uint32) {
|
func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, blockHeight int32) {
|
||||||
// If a confirmation notification has been registered
|
// If a confirmation notification has been registered
|
||||||
// for this txid, then either trigger a notification
|
// for this txid, then either trigger a notification
|
||||||
// event if only a single confirmation notification was
|
// event if only a single confirmation notification was
|
||||||
// requested, or place the notification on the
|
// requested, or place the notification on the
|
||||||
// confirmation heap for future usage.
|
// confirmation heap for future usage.
|
||||||
if confNtfn, ok := b.confNotifications[*txSha]; ok {
|
if confNtfn, ok := b.confNotifications[*txSha]; ok {
|
||||||
|
delete(b.confNotifications, *txSha)
|
||||||
if confNtfn.numConfirmations == 1 {
|
if confNtfn.numConfirmations == 1 {
|
||||||
confNtfn.finConf <- struct{}{}
|
confNtfn.finConf <- struct{}{}
|
||||||
return
|
return
|
||||||
@ -229,10 +260,11 @@ func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash,
|
|||||||
// The heapConf allows us to easily keep track of
|
// The heapConf allows us to easily keep track of
|
||||||
// which notification(s) we should fire off with
|
// which notification(s) we should fire off with
|
||||||
// each incoming block.
|
// each incoming block.
|
||||||
confNtfn.initialConfirmHeight = blockHeight
|
confNtfn.initialConfirmHeight = uint32(blockHeight)
|
||||||
|
finalConfHeight := uint32(confNtfn.initialConfirmHeight + confNtfn.numConfirmations - 1)
|
||||||
heapEntry := &confEntry{
|
heapEntry := &confEntry{
|
||||||
confNtfn,
|
confNtfn,
|
||||||
confNtfn.initialConfirmHeight + confNtfn.numConfirmations,
|
finalConfHeight,
|
||||||
}
|
}
|
||||||
heap.Push(b.confHeap, heapEntry)
|
heap.Push(b.confHeap, heapEntry)
|
||||||
}
|
}
|
||||||
@ -248,8 +280,9 @@ type spendNotification struct {
|
|||||||
// RegisterSpendNotification...
|
// RegisterSpendNotification...
|
||||||
// NOTE: eventChan MUST be buffered
|
// NOTE: eventChan MUST be buffered
|
||||||
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) {
|
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) {
|
||||||
|
if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil {
|
||||||
// TODO(roasbeef): also register with rpc client? bool?
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
ntfn := &spendNotification{
|
ntfn := &spendNotification{
|
||||||
targetOutpoint: outpoint,
|
targetOutpoint: outpoint,
|
||||||
@ -270,7 +303,7 @@ type confirmationsNotification struct {
|
|||||||
numConfirmations uint32
|
numConfirmations uint32
|
||||||
|
|
||||||
finConf chan struct{}
|
finConf chan struct{}
|
||||||
negativeConf chan uint32
|
negativeConf chan int32
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterConfirmationsNotification...
|
// RegisterConfirmationsNotification...
|
||||||
@ -281,7 +314,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash,
|
|||||||
txid: txid,
|
txid: txid,
|
||||||
numConfirmations: numConfs,
|
numConfirmations: numConfs,
|
||||||
finConf: make(chan struct{}, 1),
|
finConf: make(chan struct{}, 1),
|
||||||
negativeConf: make(chan uint32, 1),
|
negativeConf: make(chan int32, 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
b.notificationRegistry <- ntfn
|
b.notificationRegistry <- ntfn
|
||||||
|
Loading…
Reference in New Issue
Block a user