diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index f16ce7ce..9dc2e0e8 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -43,23 +43,30 @@ type txUpdate struct { details *btcjson.BlockDetails } +// TODO(roasbeef): generalize struct below: +// * move chans to config, allow outside callers to handle send conditions + // BtcdNotifier implements the ChainNotifier interface using btcd's websockets // notifications. Multiple concurrent clients are supported. All notifications // are achieved via non-blocking sends on client channels. type BtcdNotifier struct { + spendClientCounter uint64 // To be used atomically. + epochClientCounter uint64 // To be used atomically. + started int32 // To be used atomically. stopped int32 // To be used atomically. chainConn *btcrpcclient.Client + notificationCancels chan interface{} notificationRegistry chan interface{} - spendNotifications map[wire.OutPoint][]*spendNotification + spendNotifications map[wire.OutPoint]map[uint64]*spendNotification confNotifications map[chainhash.Hash][]*confirmationsNotification confHeap *confirmationHeap - blockEpochClients []chan *chainntnfs.BlockEpoch + blockEpochClients map[uint64]chan *chainntnfs.BlockEpoch disconnectedBlockHashes chan *blockNtfn @@ -83,11 +90,15 @@ var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // accept new websockets clients. func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notifier := &BtcdNotifier{ + notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), - spendNotifications: make(map[wire.OutPoint][]*spendNotification), - confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), - confHeap: newConfirmationHeap(), + blockEpochClients: make(map[uint64]chan *chainntnfs.BlockEpoch), + + spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), + + confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), + confHeap: newConfirmationHeap(), disconnectedBlockHashes: make(chan *blockNtfn, 20), @@ -229,13 +240,38 @@ func (b *BtcdNotifier) notificationDispatcher(currentHeight int32) { out: for { select { + case cancelMsg := <-b.notificationCancels: + switch msg := cancelMsg.(type) { + case *spendCancel: + chainntnfs.Log.Infof("Cancelling spend "+ + "notification for out_point=%v, "+ + "spend_id=%v", msg.op, msg.spendID) + + // Before we attempt to close the spendChan, + // ensure that the notification hasn't already + // yet been dispatched. + if outPointClients, ok := b.spendNotifications[msg.op]; ok { + close(outPointClients[msg.spendID].spendChan) + delete(b.spendNotifications[msg.op], msg.spendID) + } + case *epochCancel: + chainntnfs.Log.Infof("Cancelling epoch "+ + "notification, epoch_id=%v", msg.epochID) + + close(b.blockEpochClients[msg.epochID]) + delete(b.blockEpochClients, msg.epochID) + } case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { case *spendNotification: chainntnfs.Log.Infof("New spend subscription: "+ "utxo=%v", msg.targetOutpoint) op := *msg.targetOutpoint - b.spendNotifications[op] = append(b.spendNotifications[op], msg) + + if _, ok := b.spendNotifications[op]; !ok { + b.spendNotifications[op] = make(map[uint64]*spendNotification) + } + b.spendNotifications[op][msg.spendID] = msg case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", @@ -252,9 +288,9 @@ out: b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") - b.blockEpochClients = append(b.blockEpochClients, - msg.epochChan) + b.blockEpochClients[msg.epochID] = msg.epochChan } + case staleBlockHash := <-b.disconnectedBlockHashes: // TODO(roasbeef): re-orgs // * second channel to notify of confirmation decrementing @@ -262,6 +298,7 @@ out: // * notify of negative confirmations chainntnfs.Log.Warnf("Block disconnected from main "+ "chain: %v", staleBlockHash) + case <-b.chainUpdateSignal: // A new update is available, so pop the new chain // update from the front of the update queue. @@ -303,6 +340,7 @@ out: // chain. Send out any N confirmation notifications // which may have been triggered by this new block. b.notifyConfs(newHeight) + case <-b.txUpdateSignal: // A new update is available, so pop the new chain // update from the front of the update queue. @@ -344,6 +382,7 @@ out: delete(b.spendNotifications, prevOut) } } + case <-b.quit: break out } @@ -437,7 +476,6 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash Hash: newSha, } - // TODO(roasbeef): spwan a new goroutine for each client instead? for _, epochChan := range b.blockEpochClients { // Attempt a non-blocking send. If the buffered channel is // full, then we no-op and move onto the next client. @@ -542,6 +580,18 @@ type spendNotification struct { targetOutpoint *wire.OutPoint spendChan chan *chainntnfs.SpendDetail + + spendID uint64 +} + +// spendCancel is a message sent to the BtcdNotifier when a client wishes to +// cancel an outstanding spend notification that has yet to be dispatched. +type spendCancel struct { + // op is the target outpoint of the notification to be cancelled. + op wire.OutPoint + + // spendID the ID of the notification to cancel. + spendID uint64 } // RegisterSpendNotification registers an intent to be notified once the target @@ -557,6 +607,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), + spendID: atomic.AddUint64(&b.spendClientCounter, 1), } select { @@ -594,6 +645,16 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S return &chainntnfs.SpendEvent{ Spend: ntfn.spendChan, + Cancel: func() { + select { + case b.notificationCancels <- &spendCancel{ + op: *outpoint, + spendID: ntfn.spendID, + }: + case <-b.quit: + return + } + }, }, nil } @@ -637,14 +698,23 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, // notification with each newly connected block. type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + + epochID uint64 +} + +// epochCancel is a message sent to the BtcdNotifier when a client wishes to +// cancel an outstanding epoch notification that has yet to be dispatched. +type epochCancel struct { + epochID uint64 } // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the -// caller to receive notificationsm, of each new block connected to the main +// caller to receive notifications, of each new block connected to the main // chain. func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { registration := &blockEpochRegistration{ epochChan: make(chan *chainntnfs.BlockEpoch, 20), + epochID: atomic.AddUint64(&b.epochClientCounter, 1), } select { @@ -654,6 +724,15 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, er case b.notificationRegistry <- registration: return &chainntnfs.BlockEpochEvent{ Epochs: registration.epochChan, + Cancel: func() { + select { + case b.notificationCancels <- &epochCancel{ + epochID: registration.epochID, + }: + case <-b.quit: + return + } + }, }, nil } }