From 73cc28d5fbb94d3a003cdaef066d02d0d30008cd Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 20 Feb 2017 16:31:16 -0800 Subject: [PATCH] chainntnfs/btcdnotify: implement spend+epoch ntfn cancellations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit minifies the BtcdNotifier concrete implementation of the ChainNotifier interface to allow callers to optionally cancel an outstanding block epoch or spend notificaiton intent. To do this efficiently, we now give each notification intent a unique ID based on if it’s an epoch intent or a spend intent. We then use this ID to reference back to the original un-dispatched notification intent when the caller wishes to cancel the intent. --- chainntnfs/btcdnotify/btcd.go | 99 +++++++++++++++++++++++++++++++---- 1 file changed, 89 insertions(+), 10 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index f16ce7ce..9dc2e0e8 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -43,23 +43,30 @@ type txUpdate struct { details *btcjson.BlockDetails } +// TODO(roasbeef): generalize struct below: +// * move chans to config, allow outside callers to handle send conditions + // BtcdNotifier implements the ChainNotifier interface using btcd's websockets // notifications. Multiple concurrent clients are supported. All notifications // are achieved via non-blocking sends on client channels. type BtcdNotifier struct { + spendClientCounter uint64 // To be used atomically. + epochClientCounter uint64 // To be used atomically. + started int32 // To be used atomically. stopped int32 // To be used atomically. chainConn *btcrpcclient.Client + notificationCancels chan interface{} notificationRegistry chan interface{} - spendNotifications map[wire.OutPoint][]*spendNotification + spendNotifications map[wire.OutPoint]map[uint64]*spendNotification confNotifications map[chainhash.Hash][]*confirmationsNotification confHeap *confirmationHeap - blockEpochClients []chan *chainntnfs.BlockEpoch + blockEpochClients map[uint64]chan *chainntnfs.BlockEpoch disconnectedBlockHashes chan *blockNtfn @@ -83,11 +90,15 @@ var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // accept new websockets clients. func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notifier := &BtcdNotifier{ + notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), - spendNotifications: make(map[wire.OutPoint][]*spendNotification), - confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), - confHeap: newConfirmationHeap(), + blockEpochClients: make(map[uint64]chan *chainntnfs.BlockEpoch), + + spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), + + confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), + confHeap: newConfirmationHeap(), disconnectedBlockHashes: make(chan *blockNtfn, 20), @@ -229,13 +240,38 @@ func (b *BtcdNotifier) notificationDispatcher(currentHeight int32) { out: for { select { + case cancelMsg := <-b.notificationCancels: + switch msg := cancelMsg.(type) { + case *spendCancel: + chainntnfs.Log.Infof("Cancelling spend "+ + "notification for out_point=%v, "+ + "spend_id=%v", msg.op, msg.spendID) + + // Before we attempt to close the spendChan, + // ensure that the notification hasn't already + // yet been dispatched. + if outPointClients, ok := b.spendNotifications[msg.op]; ok { + close(outPointClients[msg.spendID].spendChan) + delete(b.spendNotifications[msg.op], msg.spendID) + } + case *epochCancel: + chainntnfs.Log.Infof("Cancelling epoch "+ + "notification, epoch_id=%v", msg.epochID) + + close(b.blockEpochClients[msg.epochID]) + delete(b.blockEpochClients, msg.epochID) + } case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { case *spendNotification: chainntnfs.Log.Infof("New spend subscription: "+ "utxo=%v", msg.targetOutpoint) op := *msg.targetOutpoint - b.spendNotifications[op] = append(b.spendNotifications[op], msg) + + if _, ok := b.spendNotifications[op]; !ok { + b.spendNotifications[op] = make(map[uint64]*spendNotification) + } + b.spendNotifications[op][msg.spendID] = msg case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", @@ -252,9 +288,9 @@ out: b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") - b.blockEpochClients = append(b.blockEpochClients, - msg.epochChan) + b.blockEpochClients[msg.epochID] = msg.epochChan } + case staleBlockHash := <-b.disconnectedBlockHashes: // TODO(roasbeef): re-orgs // * second channel to notify of confirmation decrementing @@ -262,6 +298,7 @@ out: // * notify of negative confirmations chainntnfs.Log.Warnf("Block disconnected from main "+ "chain: %v", staleBlockHash) + case <-b.chainUpdateSignal: // A new update is available, so pop the new chain // update from the front of the update queue. @@ -303,6 +340,7 @@ out: // chain. Send out any N confirmation notifications // which may have been triggered by this new block. b.notifyConfs(newHeight) + case <-b.txUpdateSignal: // A new update is available, so pop the new chain // update from the front of the update queue. @@ -344,6 +382,7 @@ out: delete(b.spendNotifications, prevOut) } } + case <-b.quit: break out } @@ -437,7 +476,6 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash Hash: newSha, } - // TODO(roasbeef): spwan a new goroutine for each client instead? for _, epochChan := range b.blockEpochClients { // Attempt a non-blocking send. If the buffered channel is // full, then we no-op and move onto the next client. @@ -542,6 +580,18 @@ type spendNotification struct { targetOutpoint *wire.OutPoint spendChan chan *chainntnfs.SpendDetail + + spendID uint64 +} + +// spendCancel is a message sent to the BtcdNotifier when a client wishes to +// cancel an outstanding spend notification that has yet to be dispatched. +type spendCancel struct { + // op is the target outpoint of the notification to be cancelled. + op wire.OutPoint + + // spendID the ID of the notification to cancel. + spendID uint64 } // RegisterSpendNotification registers an intent to be notified once the target @@ -557,6 +607,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), + spendID: atomic.AddUint64(&b.spendClientCounter, 1), } select { @@ -594,6 +645,16 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S return &chainntnfs.SpendEvent{ Spend: ntfn.spendChan, + Cancel: func() { + select { + case b.notificationCancels <- &spendCancel{ + op: *outpoint, + spendID: ntfn.spendID, + }: + case <-b.quit: + return + } + }, }, nil } @@ -637,14 +698,23 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, // notification with each newly connected block. type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + + epochID uint64 +} + +// epochCancel is a message sent to the BtcdNotifier when a client wishes to +// cancel an outstanding epoch notification that has yet to be dispatched. +type epochCancel struct { + epochID uint64 } // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the -// caller to receive notificationsm, of each new block connected to the main +// caller to receive notifications, of each new block connected to the main // chain. func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { registration := &blockEpochRegistration{ epochChan: make(chan *chainntnfs.BlockEpoch, 20), + epochID: atomic.AddUint64(&b.epochClientCounter, 1), } select { @@ -654,6 +724,15 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, er case b.notificationRegistry <- registration: return &chainntnfs.BlockEpochEvent{ Epochs: registration.epochChan, + Cancel: func() { + select { + case b.notificationCancels <- &epochCancel{ + epochID: registration.epochID, + }: + case <-b.quit: + return + } + }, }, nil } }