chainntfs: create new ChainNotifier for bfcd-based notifications

* Currently depends on the wallet, but that will change in the near
future due to new additions to btcwallet currently under review.
* Re-orgs aren’t intelligently handled yet. To be done along with tests
in a later commit.
This commit is contained in:
Olaoluwa Osuntokun 2016-01-06 13:03:17 -08:00
parent 00f2a0d7c0
commit 986eb83ceb
2 changed files with 323 additions and 0 deletions

@ -0,0 +1,273 @@
package btcdnotify
import (
"container/heap"
"fmt"
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/wtxmgr"
"li.lan/labs/plasma/chainntfs"
"li.lan/labs/plasma/lnwallet"
)
// BtcdNotifier...
type BtcdNotifier struct {
// TODO(roasbeef): refactor to use the new NotificationServer
wallet *lnwallet.LightningWallet
notificationRegistry chan interface{}
spendNotifications map[wire.OutPoint]*spendNotification
confNotifications map[wire.ShaHash]*confirmationsNotification
confHeap *confirmationHeap
connectedBlocks <-chan wtxmgr.BlockMeta
disconnectedBlocks <-chan wtxmgr.BlockMeta
relevantTxs <-chan chain.RelevantTx
managerLocked <-chan bool
confirmedBalance <-chan btcutil.Amount
unconfirmedBalance <-chan btcutil.Amount
rpcConnected chan struct{}
wg sync.WaitGroup
started int32 // To be used atomically
stopped int32 // To be used atomically
quit chan struct{}
}
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// NewBtcdNotifier...
func NewBtcdNotifier() (*BtcdNotifier, error) {
return &BtcdNotifier{
notificationRegistry: make(chan interface{}),
spendNotifications: make(map[wire.OutPoint]*spendNotification),
confNotifications: make(map[wire.ShaHash]*confirmationsNotification),
confHeap: newConfirmationHeap(),
connectedBlocks: make(chan wtxmgr.BlockMeta),
disconnectedBlocks: make(chan wtxmgr.BlockMeta),
relevantTxs: make(chan chain.RelevantTx),
managerLocked: make(chan bool),
confirmedBalance: make(chan btcutil.Amount),
unconfirmedBalance: make(chan btcutil.Amount),
rpcConnected: make(chan struct{}, 1),
quit: make(chan struct{}),
}, nil
}
// Start...
func (b *BtcdNotifier) Start() error {
// Already started?
if atomic.AddInt32(&b.started, 1) != 1 {
return nil
}
b.wg.Add(1)
go b.notificationDispatcher()
b.rpcConnected <- struct{}{} // TODO(roasbeef) why?
return nil
}
// Stop...
func (b *BtcdNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil
}
close(b.quit)
b.wg.Wait()
return nil
}
// notificationDispatcher...
func (b *BtcdNotifier) notificationDispatcher() {
out:
for {
select {
case <-b.rpcConnected:
err := b.initAllNotifications()
fmt.Println(err)
case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) {
case *spendNotification:
b.spendNotifications[*msg.outpoint] = msg
case *confirmationsNotification:
b.confNotifications[*msg.txid] = msg
}
case txNtfn := <-b.relevantTxs:
tx := txNtfn.TxRecord.MsgTx
txMined := txNtfn.Block != nil
// First, check if this transaction spends an output
// that has an existing spend notification for it.
for _, txIn := range tx.TxIn {
prevOut := txIn.PreviousOutPoint
if ntfn, ok := b.spendNotifications[prevOut]; ok {
go triggerNtfn(ntfn.trigger)
delete(b.spendNotifications, prevOut)
}
}
// If the transaction has been mined, then we check if
// a notification for the confirmation of this txid has
// been registered previously. Otherwise, we're done,
// for now.
if !txMined {
break
}
// If a confirmation notification has been registered
// for this txid, then either trigger a notification
// event if only a single confirmation notification was
// requested, or place the notification on the
// confirmation heap for future usage.
if confNtfn, ok := b.confNotifications[tx.TxSha()]; ok {
if confNtfn.numConfirmations == 1 {
go triggerNtfn(confNtfn.trigger)
break
}
// The registered notification requires more
// than one confirmation before triggering. So
// we create a heapConf entry for this notification.
// The heapConf allows us to easily keep track of
// which notification(s) we should fire off with
// each incoming block.
confNtfn.initialConfirmHeight = uint32(txNtfn.Block.Height)
heapEntry := &confEntry{
confNtfn,
confNtfn.initialConfirmHeight + confNtfn.numConfirmations,
}
heap.Push(b.confHeap, heapEntry)
}
case blockNtfn := <-b.connectedBlocks:
blockHeight := uint32(blockNtfn.Height)
// Traverse our confirmation heap. The heap is a
// min-heap, so the confirmation notification which requires
// the smallest block-height will always be at the top
// of the heap. If a confirmation notification is eligible
// for triggering, then fire it off, and check if another
// is eligible until there are no more eligible entries.
nextConf := heap.Pop(b.confHeap).(*confEntry)
for nextConf.triggerHeight <= blockHeight {
triggerNtfn(nextConf.trigger)
nextConf = heap.Pop(b.confHeap).(*confEntry)
}
heap.Push(b.confHeap, nextConf)
case delBlockNtfn := <-b.disconnectedBlocks:
// TODO(roasbeef): re-orgs
// * second channel to notify of confirmation decrementing
// re-org?
fmt.Println(delBlockNtfn)
case <-b.quit:
break out
}
}
}
// initAllNotifications...
func (b *BtcdNotifier) initAllNotifications() error {
var err error
b.connectedBlocks, err = b.wallet.ListenConnectedBlocks()
if err != nil {
return err
}
b.disconnectedBlocks, err = b.wallet.ListenDisconnectedBlocks()
if err != nil {
return err
}
b.relevantTxs, err = b.wallet.ListenRelevantTxs()
if err != nil {
return err
}
b.managerLocked, err = b.wallet.ListenLockStatus()
if err != nil {
return err
}
b.confirmedBalance, err = b.wallet.ListenConfirmedBalance()
if err != nil {
return err
}
b.unconfirmedBalance, err = b.wallet.ListenUnconfirmedBalance()
if err != nil {
return err
}
return nil
}
// spendNotification....
type spendNotification struct {
outpoint *wire.OutPoint
trigger *chainntnfs.NotificationTrigger
}
// confirmationNotification...
// TODO(roasbeef): re-org funny business
type confirmationsNotification struct {
txid *wire.ShaHash
initialConfirmHeight uint32
numConfirmations uint32
trigger *chainntnfs.NotificationTrigger
}
// RegisterSpendNotification...
// NOTE: eventChan MUST be buffered
func (b *BtcdNotifier) RegisterSpendNotification(outpoint *wire.OutPoint,
trigger *chainntnfs.NotificationTrigger) error {
// TODO(roasbeef): also register with rpc client? bool?
ntfn := &spendNotification{
outpoint: outpoint,
trigger: trigger,
}
b.notificationRegistry <- ntfn
return nil
}
// RegisterConfirmationsNotification...
func (b *BtcdNotifier) RegisterConfirmationsNotification(txid *wire.ShaHash,
numConfs uint32, trigger *chainntnfs.NotificationTrigger) error {
ntfn := &confirmationsNotification{
txid: txid,
numConfirmations: numConfs,
trigger: trigger,
}
b.notificationRegistry <- ntfn
return nil
}
func triggerNtfn(t *chainntnfs.NotificationTrigger) {
if t.Callback != nil {
go t.Callback()
}
t.TriggerChan <- struct{}{}
}

@ -0,0 +1,50 @@
package btcdnotify
// confEntry...
type confEntry struct {
*confirmationsNotification
triggerHeight uint32
}
// confirmationHeap...
type confirmationHeap struct {
items []*confEntry
}
func newConfirmationHeap() *confirmationHeap {
var confItems []*confEntry
return &confirmationHeap{confItems}
}
// Len returns the number of items in the priority queue. It is part of the
// heap.Interface implementation.
func (c *confirmationHeap) Len() int { return len(c.items) }
// Less returns whether the item in the priority queue with index i should sort
// before the item with index j. It is part of the heap.Interface implementation.
func (c *confirmationHeap) Less(i, j int) bool {
return c.items[i].triggerHeight < c.items[j].triggerHeight
}
// Swap swaps the items at the passed indices in the priority queue. It is
// part of the heap.Interface implementation.
func (c *confirmationHeap) Swap(i, j int) {
c.items[i], c.items[j] = c.items[j], c.items[i]
}
// Push pushes the passed item onto the priority queue. It is part of the
// heap.Interface implementation.
func (c *confirmationHeap) Push(x interface{}) {
c.items = append(c.items, x.(*confEntry))
}
// Pop removes the highest priority item (according to Less) from the priority
// queue and returns it. It is part of the heap.Interface implementation.
func (c *confirmationHeap) Pop() interface{} {
n := len(c.items)
x := c.items[n-1]
c.items = c.items[0 : n-1]
c.items[n] = nil
return x
}