chainntfns: update BtcdNotifier to adhere to new ChainNotifier interface

* Re-orgs are still unhanded.

* RegisterSpendNtfn should perhaps also register directly with the rpc
client instead of pushing the responsibility to the caller.
This commit is contained in:
Olaoluwa Osuntokun 2016-02-16 14:46:18 -08:00
parent b913bda472
commit c7402f3462

@ -14,11 +14,16 @@ import (
// BtcdNotifier...
type BtcdNotifier struct {
started int32 // To be used atomically
stopped int32 // To be used atomically
// TODO(roasbeef): refactor to use the new NotificationServer
conn ChainConnection
notificationRegistry chan interface{}
// TODO(roasbeef): make map point to slices? Would allow for multiple
// clients to listen for same spend. Would we ever need this?
spendNotifications map[wire.OutPoint]*spendNotification
confNotifications map[wire.ShaHash]*confirmationsNotification
confHeap *confirmationHeap
@ -29,16 +34,15 @@ type BtcdNotifier struct {
rpcConnected chan struct{}
wg sync.WaitGroup
started int32 // To be used atomically
stopped int32 // To be used atomically
quit chan struct{}
wg sync.WaitGroup
quit chan struct{}
}
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// NewBtcdNotifier...
func NewBtcdNotifier(c ChainConnection) (*BtcdNotifier, error) {
// TODO(roasbeef): take client also in order to get notifications?
return &BtcdNotifier{
conn: c,
notificationRegistry: make(chan interface{}),
@ -95,7 +99,7 @@ out:
case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) {
case *spendNotification:
b.spendNotifications[*msg.outpoint] = msg
b.spendNotifications[*msg.targetOutpoint] = msg
case *confirmationsNotification:
b.confNotifications[*msg.txid] = msg
}
@ -105,12 +109,25 @@ out:
// First, check if this transaction spends an output
// that has an existing spend notification for it.
for _, txIn := range tx.TxIn {
for i, txIn := range tx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an
// output which we have a registered notification
// for, then create a spend summary, finally
// sending off the details to the notification
// subscriber.
if ntfn, ok := b.spendNotifications[prevOut]; ok {
go triggerNtfn(ntfn.trigger)
spenderSha := tx.TxSha()
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: ntfn.targetOutpoint,
SpenderTxHash: &spenderSha,
// TODO(roasbeef): copy tx?
SpendingTx: &tx,
SpenderInputIndex: uint32(i),
}
ntfn.spendChan <- spendDetails
delete(b.spendNotifications, prevOut)
}
}
@ -130,7 +147,7 @@ out:
// confirmation heap for future usage.
if confNtfn, ok := b.confNotifications[tx.TxSha()]; ok {
if confNtfn.numConfirmations == 1 {
go triggerNtfn(confNtfn.trigger)
confNtfn.finConf <- struct{}{}
break
}
@ -158,7 +175,7 @@ out:
// is eligible until there are no more eligible entries.
nextConf := heap.Pop(b.confHeap).(*confEntry)
for nextConf.triggerHeight <= blockHeight {
triggerNtfn(nextConf.trigger)
nextConf.finConf <- struct{}{}
nextConf = heap.Pop(b.confHeap).(*confEntry)
}
@ -168,6 +185,7 @@ out:
// TODO(roasbeef): re-orgs
// * second channel to notify of confirmation decrementing
// re-org?
// * notify of negative confirmations
fmt.Println(delBlockNtfn)
case <-b.quit:
break out
@ -197,9 +215,25 @@ func (b *BtcdNotifier) initAllNotifications() error {
// spendNotification....
type spendNotification struct {
outpoint *wire.OutPoint
targetOutpoint *wire.OutPoint
trigger *chainntnfs.NotificationTrigger
spendChan chan *chainntnfs.SpendDetail
}
// RegisterSpendNotification...
// NOTE: eventChan MUST be buffered
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) {
// TODO(roasbeef): also register with rpc client? bool?
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
}
b.notificationRegistry <- ntfn
return &chainntnfs.SpendEvent{ntfn.spendChan}, nil
}
// confirmationNotification...
@ -210,45 +244,25 @@ type confirmationsNotification struct {
initialConfirmHeight uint32
numConfirmations uint32
trigger *chainntnfs.NotificationTrigger
}
// RegisterSpendNotification...
// NOTE: eventChan MUST be buffered
func (b *BtcdNotifier) RegisterSpendNotification(outpoint *wire.OutPoint,
trigger *chainntnfs.NotificationTrigger) error {
// TODO(roasbeef): also register with rpc client? bool?
ntfn := &spendNotification{
outpoint: outpoint,
trigger: trigger,
}
b.notificationRegistry <- ntfn
return nil
finConf chan struct{}
negativeConf chan uint32
}
// RegisterConfirmationsNotification...
func (b *BtcdNotifier) RegisterConfirmationsNotification(txid *wire.ShaHash,
numConfs uint32, trigger *chainntnfs.NotificationTrigger) error {
func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash,
numConfs uint32) (*chainntnfs.ConfirmationEvent, error) {
ntfn := &confirmationsNotification{
txid: txid,
numConfirmations: numConfs,
trigger: trigger,
finConf: make(chan struct{}, 1),
negativeConf: make(chan uint32, 1),
}
b.notificationRegistry <- ntfn
return nil
}
func triggerNtfn(t *chainntnfs.NotificationTrigger) {
if t.Callback != nil {
go t.Callback()
}
t.TriggerChan <- struct{}{}
return &chainntnfs.ConfirmationEvent{
Confirmed: ntfn.finConf,
NegativeConf: ntfn.negativeConf,
}, nil
}