diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 41c95ff7..38b0330f 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -20,6 +20,22 @@ const ( notifierType = "btcd" ) +// chainUpdate encapsulates an update to the current main chain. This struct is +// used as an element within an unbounded queue in order to avoid blocking the +// main rpc dispatch rule. +type chainUpdate struct { + blockHash *wire.ShaHash + blockHeight int32 +} + +// txUpdate encapsulates a transaction related notification sent from btcd to +// the registered RPC client. This struct is used as an element within an +// unbounded queue in order to avoid blocking the main rpc dispatch rule. +type txUpdate struct { + tx *btcutil.Tx + details *btcjson.BlockDetails +} + // BtcdNotifier implements the ChainNotifier interface using btcd's websockets // notifications. Multiple concurrent clients are supported. All notifications // are achieved via non-blocking sends on client channels. @@ -40,9 +56,15 @@ type BtcdNotifier struct { blockEpochClients []chan *chainntnfs.BlockEpoch - connectedBlockHashes chan *blockNtfn disconnectedBlockHashes chan *blockNtfn - relevantTxs chan *btcutil.Tx + + chainUpdates []*chainUpdate + chainUpdateSignal chan struct{} + chainUpdateMtx sync.Mutex + + txUpdates []*txUpdate + txUpdateSignal chan struct{} + txUpdateMtx sync.Mutex wg sync.WaitGroup quit chan struct{} @@ -62,9 +84,10 @@ func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { confNotifications: make(map[wire.ShaHash][]*confirmationsNotification), confHeap: newConfirmationHeap(), - connectedBlockHashes: make(chan *blockNtfn, 20), disconnectedBlockHashes: make(chan *blockNtfn, 20), - relevantTxs: make(chan *btcutil.Tx, 100), + + chainUpdateSignal: make(chan struct{}), + txUpdateSignal: make(chan struct{}), quit: make(chan struct{}), } @@ -148,11 +171,21 @@ type blockNtfn struct { } // onBlockConnected implements on OnBlockConnected callback for btcrpcclient. +// Ingesting a block updates the wallet's internal utxo state based on the +// outputs created and destroyed within each block. func (b *BtcdNotifier) onBlockConnected(hash *wire.ShaHash, height int32, t time.Time) { - select { - case b.connectedBlockHashes <- &blockNtfn{hash, height}: - case <-b.quit: - } + // Append this new chain update to the end of the queue of new chain + // updates. + b.chainUpdateMtx.Lock() + b.chainUpdates = append(b.chainUpdates, &chainUpdate{hash, height}) + b.chainUpdateMtx.Unlock() + + // Launch a goroutine to signal the notification dispatcher that a new + // block update is available. We do this in a new goroutine in order to + // avoid blocking the main loop of the rpc client. + go func() { + b.chainUpdateSignal <- struct{}{} + }() } // onBlockDisconnected implements on OnBlockDisconnected callback for btcrpcclient. @@ -160,11 +193,19 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *wire.ShaHash, height int32, t t } // onRedeemingTx implements on OnRedeemingTx callback for btcrpcclient. -func (b *BtcdNotifier) onRedeemingTx(transaction *btcutil.Tx, details *btcjson.BlockDetails) { - select { - case b.relevantTxs <- transaction: - case <-b.quit: - } +func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { + // Append this new transaction update to the end of the queue of new chain + // updates. + b.txUpdateMtx.Lock() + b.txUpdates = append(b.txUpdates, &txUpdate{tx, details}) + b.txUpdateMtx.Unlock() + + // Launch a goroutine to signal the notification dispatcher that a new + // transaction update is available. We do this in a new goroutine in + // order to avoid blocking the main loop of the rpc client. + go func() { + b.txUpdateSignal <- struct{}{} + }() } // notificationDispatcher is the primary goroutine which handles client @@ -197,20 +238,28 @@ out: // * notify of negative confirmations chainntnfs.Log.Warnf("Block disconnected from main "+ "chain: %v", staleBlockHash) - case connectedBlock := <-b.connectedBlockHashes: - newBlock, err := b.chainConn.GetBlock(connectedBlock.sha) + case <-b.chainUpdateSignal: + // A new update is available, so pop the new chain + // update from the front of the update queue. + b.chainUpdateMtx.Lock() + update := b.chainUpdates[0] + b.chainUpdates[0] = nil // Set to nil to prevent GC leak. + b.chainUpdates = b.chainUpdates[1:] + b.chainUpdateMtx.Unlock() + + newBlock, err := b.chainConn.GetBlock(update.blockHash) if err != nil { chainntnfs.Log.Errorf("Unable to get block: %v", err) continue } chainntnfs.Log.Infof("New block: height=%v, sha=%v", - connectedBlock.height, connectedBlock.sha) + update.blockHeight, update.blockHash) - go b.notifyBlockEpochs(connectedBlock.height, - connectedBlock.sha) + go b.notifyBlockEpochs(update.blockHeight, + update.blockHash) - newHeight := connectedBlock.height + newHeight := update.blockHeight for _, tx := range newBlock.Transactions() { // Check if the inclusion of this transaction // within a block by itself triggers a block @@ -226,10 +275,20 @@ out: // chain. Send out any N confirmation notifications // which may have been triggered by this new block. b.notifyConfs(newHeight) - case newSpend := <-b.relevantTxs: + case <-b.txUpdateSignal: + // A new update is available, so pop the new chain + // update from the front of the update queue. + b.txUpdateMtx.Lock() + newSpend := b.txUpdates[0] + b.txUpdates[0] = nil // Set to nil to prevent GC leak. + b.txUpdates = b.txUpdates[1:] + b.txUpdateMtx.Unlock() + + spendingTx := newSpend.tx + // First, check if this transaction spends an output // that has an existing spend notification for it. - for i, txIn := range newSpend.MsgTx().TxIn { + for i, txIn := range spendingTx.MsgTx().TxIn { prevOut := txIn.PreviousOutPoint // If this transaction indeed does spend an @@ -238,12 +297,12 @@ out: // sending off the details to the notification // subscriber. if ntfn, ok := b.spendNotifications[prevOut]; ok { - spenderSha := newSpend.Sha() + spenderSha := newSpend.tx.Sha() spendDetails := &chainntnfs.SpendDetail{ SpentOutPoint: ntfn.targetOutpoint, SpenderTxHash: spenderSha, // TODO(roasbeef): copy tx? - SpendingTx: newSpend.MsgTx(), + SpendingTx: spendingTx.MsgTx(), SpenderInputIndex: uint32(i), }