From c7402f34620dc4b8955c9db329d20c416445ba14 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 16 Feb 2016 14:46:18 -0800 Subject: [PATCH] chainntfns: update BtcdNotifier to adhere to new ChainNotifier interface * Re-orgs are still unhanded. * RegisterSpendNtfn should perhaps also register directly with the rpc client instead of pushing the responsibility to the caller. --- chainntfs/btcdnotify/btcd.go | 96 +++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 41 deletions(-) diff --git a/chainntfs/btcdnotify/btcd.go b/chainntfs/btcdnotify/btcd.go index df45d683..eb6e53f5 100644 --- a/chainntfs/btcdnotify/btcd.go +++ b/chainntfs/btcdnotify/btcd.go @@ -14,11 +14,16 @@ import ( // BtcdNotifier... type BtcdNotifier struct { + started int32 // To be used atomically + stopped int32 // To be used atomically + // TODO(roasbeef): refactor to use the new NotificationServer conn ChainConnection notificationRegistry chan interface{} + // TODO(roasbeef): make map point to slices? Would allow for multiple + // clients to listen for same spend. Would we ever need this? spendNotifications map[wire.OutPoint]*spendNotification confNotifications map[wire.ShaHash]*confirmationsNotification confHeap *confirmationHeap @@ -29,16 +34,15 @@ type BtcdNotifier struct { rpcConnected chan struct{} - wg sync.WaitGroup - started int32 // To be used atomically - stopped int32 // To be used atomically - quit chan struct{} + wg sync.WaitGroup + quit chan struct{} } var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // NewBtcdNotifier... func NewBtcdNotifier(c ChainConnection) (*BtcdNotifier, error) { + // TODO(roasbeef): take client also in order to get notifications? return &BtcdNotifier{ conn: c, notificationRegistry: make(chan interface{}), @@ -95,7 +99,7 @@ out: case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { case *spendNotification: - b.spendNotifications[*msg.outpoint] = msg + b.spendNotifications[*msg.targetOutpoint] = msg case *confirmationsNotification: b.confNotifications[*msg.txid] = msg } @@ -105,12 +109,25 @@ out: // First, check if this transaction spends an output // that has an existing spend notification for it. - for _, txIn := range tx.TxIn { + for i, txIn := range tx.TxIn { prevOut := txIn.PreviousOutPoint + // If this transaction indeed does spend an + // output which we have a registered notification + // for, then create a spend summary, finally + // sending off the details to the notification + // subscriber. if ntfn, ok := b.spendNotifications[prevOut]; ok { - go triggerNtfn(ntfn.trigger) + spenderSha := tx.TxSha() + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: ntfn.targetOutpoint, + SpenderTxHash: &spenderSha, + // TODO(roasbeef): copy tx? + SpendingTx: &tx, + SpenderInputIndex: uint32(i), + } + ntfn.spendChan <- spendDetails delete(b.spendNotifications, prevOut) } } @@ -130,7 +147,7 @@ out: // confirmation heap for future usage. if confNtfn, ok := b.confNotifications[tx.TxSha()]; ok { if confNtfn.numConfirmations == 1 { - go triggerNtfn(confNtfn.trigger) + confNtfn.finConf <- struct{}{} break } @@ -158,7 +175,7 @@ out: // is eligible until there are no more eligible entries. nextConf := heap.Pop(b.confHeap).(*confEntry) for nextConf.triggerHeight <= blockHeight { - triggerNtfn(nextConf.trigger) + nextConf.finConf <- struct{}{} nextConf = heap.Pop(b.confHeap).(*confEntry) } @@ -168,6 +185,7 @@ out: // TODO(roasbeef): re-orgs // * second channel to notify of confirmation decrementing // re-org? + // * notify of negative confirmations fmt.Println(delBlockNtfn) case <-b.quit: break out @@ -197,9 +215,25 @@ func (b *BtcdNotifier) initAllNotifications() error { // spendNotification.... type spendNotification struct { - outpoint *wire.OutPoint + targetOutpoint *wire.OutPoint - trigger *chainntnfs.NotificationTrigger + spendChan chan *chainntnfs.SpendDetail +} + +// RegisterSpendNotification... +// NOTE: eventChan MUST be buffered +func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) { + + // TODO(roasbeef): also register with rpc client? bool? + + ntfn := &spendNotification{ + targetOutpoint: outpoint, + spendChan: make(chan *chainntnfs.SpendDetail, 1), + } + + b.notificationRegistry <- ntfn + + return &chainntnfs.SpendEvent{ntfn.spendChan}, nil } // confirmationNotification... @@ -210,45 +244,25 @@ type confirmationsNotification struct { initialConfirmHeight uint32 numConfirmations uint32 - trigger *chainntnfs.NotificationTrigger -} - -// RegisterSpendNotification... -// NOTE: eventChan MUST be buffered -func (b *BtcdNotifier) RegisterSpendNotification(outpoint *wire.OutPoint, - trigger *chainntnfs.NotificationTrigger) error { - - // TODO(roasbeef): also register with rpc client? bool? - - ntfn := &spendNotification{ - outpoint: outpoint, - trigger: trigger, - } - - b.notificationRegistry <- ntfn - - return nil + finConf chan struct{} + negativeConf chan uint32 } // RegisterConfirmationsNotification... -func (b *BtcdNotifier) RegisterConfirmationsNotification(txid *wire.ShaHash, - numConfs uint32, trigger *chainntnfs.NotificationTrigger) error { +func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash, + numConfs uint32) (*chainntnfs.ConfirmationEvent, error) { ntfn := &confirmationsNotification{ txid: txid, numConfirmations: numConfs, - trigger: trigger, + finConf: make(chan struct{}, 1), + negativeConf: make(chan uint32, 1), } b.notificationRegistry <- ntfn - return nil -} - -func triggerNtfn(t *chainntnfs.NotificationTrigger) { - if t.Callback != nil { - go t.Callback() - } - - t.TriggerChan <- struct{}{} + return &chainntnfs.ConfirmationEvent{ + Confirmed: ntfn.finConf, + NegativeConf: ntfn.negativeConf, + }, nil }