From d1f12627d29a7c336017bd725df48168a097eb33 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 27 Nov 2016 19:17:27 -0800 Subject: [PATCH] chainntfns/btcdnotify: add support for multi-dispatch spend notifications This commit adds support for dispatching the same spend notification to multiple clients. This is now required by the ChainNotiifer interface documentation and will be needed within the daemon in order to support some upcoming refactors. --- chainntnfs/btcdnotify/btcd.go | 66 ++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 38b0330f..66f2883e 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -47,9 +47,7 @@ type BtcdNotifier struct { notificationRegistry chan interface{} - // TODO(roasbeef): make map point to slices? Would allow for multiple - // clients to listen for same spend. Would we ever need this? - spendNotifications map[wire.OutPoint]*spendNotification + spendNotifications map[wire.OutPoint][]*spendNotification confNotifications map[wire.ShaHash][]*confirmationsNotification confHeap *confirmationHeap @@ -74,13 +72,13 @@ type BtcdNotifier struct { var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // New returns a new BtcdNotifier instance. This function assumes the btcd node -// detailed in the passed configuration is already running, and -// willing to accept new websockets clients. +// detailed in the passed configuration is already running, and willing to +// accept new websockets clients. func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notifier := &BtcdNotifier{ notificationRegistry: make(chan interface{}), - spendNotifications: make(map[wire.OutPoint]*spendNotification), + spendNotifications: make(map[wire.OutPoint][]*spendNotification), confNotifications: make(map[wire.ShaHash][]*confirmationsNotification), confHeap: newConfirmationHeap(), @@ -150,8 +148,10 @@ func (b *BtcdNotifier) Stop() error { // Notify all pending clients of our shutdown by closing the related // notification channels. - for _, spendClient := range b.spendNotifications { - close(spendClient.spendChan) + for _, spendClients := range b.spendNotifications { + for _, spendClient := range spendClients { + close(spendClient.spendChan) + } } for _, confClients := range b.confNotifications { for _, confClient := range confClients { @@ -194,8 +194,8 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *wire.ShaHash, height int32, t t // onRedeemingTx implements on OnRedeemingTx callback for btcrpcclient. func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { - // Append this new transaction update to the end of the queue of new chain - // updates. + // Append this new transaction update to the end of the queue of new + // chain updates. b.txUpdateMtx.Lock() b.txUpdates = append(b.txUpdates, &txUpdate{tx, details}) b.txUpdateMtx.Unlock() @@ -219,11 +219,15 @@ out: case *spendNotification: chainntnfs.Log.Infof("New spend subscription: "+ "utxo=%v", msg.targetOutpoint) - b.spendNotifications[*msg.targetOutpoint] = msg + op := *msg.targetOutpoint + b.spendNotifications[op] = append(b.spendNotifications[op], msg) case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", *msg.txid, msg.numConfirmations) + // TODO(roasbeef): perform a N-block look + // behind to catch race-condition due to faster + // inter-block time? txid := *msg.txid b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: @@ -264,9 +268,10 @@ out: // Check if the inclusion of this transaction // within a block by itself triggers a block // confirmation threshold, if so send a - // notification. Otherwise, place the notification - // on a heap to be triggered in the future once - // additional confirmations are attained. + // notification. Otherwise, place the + // notification on a heap to be triggered in + // the future once additional confirmations are + // attained. txSha := tx.Sha() b.checkConfirmationTrigger(txSha, newHeight) } @@ -292,24 +297,27 @@ out: prevOut := txIn.PreviousOutPoint // If this transaction indeed does spend an - // output which we have a registered notification - // for, then create a spend summary, finally - // sending off the details to the notification - // subscriber. - if ntfn, ok := b.spendNotifications[prevOut]; ok { + // output which we have a registered + // notification for, then create a spend + // summary, finally sending off the details to + // the notification subscriber. + if clients, ok := b.spendNotifications[prevOut]; ok { spenderSha := newSpend.tx.Sha() - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: ntfn.targetOutpoint, - SpenderTxHash: spenderSha, - // TODO(roasbeef): copy tx? - SpendingTx: spendingTx.MsgTx(), - SpenderInputIndex: uint32(i), + for _, ntfn := range clients { + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: ntfn.targetOutpoint, + SpenderTxHash: spenderSha, + // TODO(roasbeef): copy tx? + SpendingTx: spendingTx.MsgTx(), + SpenderInputIndex: uint32(i), + } + + chainntnfs.Log.Infof("Dispatching "+ + "spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails } - chainntnfs.Log.Infof("Dispatching "+ - "spend notification for "+ - "outpoint=%v", ntfn.targetOutpoint) - ntfn.spendChan <- spendDetails delete(b.spendNotifications, prevOut) } }