diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 377316a5..a4069a00 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -632,13 +632,6 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height, block.Hash) - // We want to set the best block before dispatching notifications so - // if any subscribers make queries based on their received block epoch, - // our state is fully updated in time. - b.bestBlock = block - - b.notifyBlockEpochs(block.Height, block.Hash) - // Finally, we'll update the spend height hint for all of our watched // outpoints that have not been spent yet. This is safe to do as we do // not watch already spent outpoints for spend notifications. @@ -652,13 +645,22 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err uint32(block.Height), ops..., ) if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. + // The error is not fatal since we are connecting a + // block, and advancing the spend hint is an optimistic + // optimization. chainntnfs.Log.Errorf("Unable to update spend hint to "+ "%d for %v: %v", block.Height, ops, err) } } + // We want to set the best block before dispatching notifications so + // if any subscribers make queries based on their received block epoch, + // our state is fully updated in time. + b.bestBlock = block + + // Lastly we'll notify any subscribed clients of the block. + b.notifyBlockEpochs(block.Height, block.Hash) + return nil } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 8498d368..8a8354d0 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -715,16 +715,16 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height, epoch.Hash) - // We want to set the best block before dispatching notifications - // so if any subscribers make queries based on their received - // block epoch, our state is fully updated in time. - b.bestBlock = epoch - - // Next we'll notify any subscribed clients of the block. - b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + // Define a helper struct for coalescing the spend notifications we will + // dispatch after trying to commit the spend hints. + type spendNtfnBatch struct { + details *chainntnfs.SpendDetail + clients map[uint64]*spendNotification + } // Scan over the list of relevant transactions and possibly dispatch // notifications for spends. + spendBatches := make(map[wire.OutPoint]spendNtfnBatch) for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -740,6 +740,7 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { if !ok { continue } + delete(b.spendNotifications, prevOut) spendDetails := &chainntnfs.SpendDetail{ SpentOutPoint: &prevOut, @@ -749,21 +750,10 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { SpendingHeight: int32(newBlock.height), } - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching spend "+ - "notification for outpoint=%v", - ntfn.targetOutpoint) - - ntfn.spendChan <- spendDetails - - // Close spendChan to ensure that any calls to - // Cancel will not block. This is safe to do - // since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) + spendBatches[prevOut] = spendNtfnBatch{ + details: spendDetails, + clients: clients, } - - delete(b.spendNotifications, prevOut) } } @@ -780,13 +770,39 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { uint32(epoch.Height), ops..., ) if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. + // The error is not fatal since we are connecting a + // block, and advancing the spend hint is an optimistic + // optimization. chainntnfs.Log.Errorf("Unable to update spend hint to "+ "%d for %v: %v", epoch.Height, ops, err) } } + // We want to set the best block before dispatching notifications + // so if any subscribers make queries based on their received + // block epoch, our state is fully updated in time. + b.bestBlock = epoch + + // Next we'll notify any subscribed clients of the block. + b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + + // Finally, send off the spend details to the notification subscribers. + for _, batch := range spendBatches { + for _, ntfn := range batch.clients { + chainntnfs.Log.Infof("Dispatching spend "+ + "notification for outpoint=%v", + ntfn.targetOutpoint) + + ntfn.spendChan <- batch.details + + // Close spendChan to ensure that any calls to + // Cancel will not block. This is safe to do + // since the channel is buffered, and the + // message can still be read by the receiver. + close(ntfn.spendChan) + } + } + return nil } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 75135bb0..45990450 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -614,13 +614,16 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height, newBlock.hash) - n.bestHeight = newBlock.height + // Create a helper struct for coalescing spend notifications triggered + // by this block. + type spendNtfnBatch struct { + details *chainntnfs.SpendDetail + clients map[uint64]*spendNotification + } - // Next, notify any subscribed clients of the block. - n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - - // Scan over the list of relevant transactions and possibly dispatch - // notifications for spends. + // Scan over the list of relevant transactions and assemble the + // possible spend notifications we need to dispatch. + spendBatches := make(map[wire.OutPoint]spendNtfnBatch) for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -630,12 +633,13 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // If this transaction indeed does spend an output which // we have a registered notification for, then create a - // spend summary, finally sending off the details to the - // notification subscriber. + // spend summary and add it to our batch of spend + // notifications to be delivered. clients, ok := n.spendNotifications[prevOut] if !ok { continue } + delete(n.spendNotifications, prevOut) spendDetails := &chainntnfs.SpendDetail{ SpentOutPoint: &prevOut, @@ -645,25 +649,15 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { SpendingHeight: int32(newBlock.height), } - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching spend "+ - "notification for outpoint=%v", - ntfn.targetOutpoint) - - ntfn.spendChan <- spendDetails - - // Close spendChan to ensure that any calls to - // Cancel will not block. This is safe to do - // since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) + spendBatches[prevOut] = spendNtfnBatch{ + details: spendDetails, + clients: clients, } - delete(n.spendNotifications, prevOut) } } - // Finally, we'll update the spend height hint for all of our watched + // Now, we'll update the spend height hint for all of our watched // outpoints that have not been spent yet. This is safe to do as we do // not watch already spent outpoints for spend notifications. ops := make([]wire.OutPoint, 0, len(n.spendNotifications)) @@ -674,13 +668,40 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { if len(ops) > 0 { err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...) if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. + // The error is not fatal since we are connecting a + // block, and advancing the spend hint is an optimistic + // optimization. chainntnfs.Log.Errorf("Unable to update spend hint to "+ "%d for %v: %v", newBlock.height, ops, err) } } + // We want to set the best block before dispatching notifications + // so if any subscribers make queries based on their received + // block epoch, our state is fully updated in time. + n.bestHeight = newBlock.height + + // With all persistent changes committed, notify any subscribed clients + // of the block. + n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + + // Finally, send off the spend details to the notification subscribers. + for _, batch := range spendBatches { + for _, ntfn := range batch.clients { + chainntnfs.Log.Infof("Dispatching spend "+ + "notification for outpoint=%v", + ntfn.targetOutpoint) + + ntfn.spendChan <- batch.details + + // Close spendChan to ensure that any calls to + // Cancel will not block. This is safe to do + // since the channel is buffered, and the + // message can still be read by the receiver. + close(ntfn.spendChan) + } + } + return nil } @@ -810,7 +831,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, } // Ensure that neutrino is caught up to the height hint before we - // attempt to fetch the utxo fromt the chain. If we're behind, then we + // attempt to fetch the utxo from the chain. If we're behind, then we // may miss a notification dispatch. for { n.heightMtx.RLock() diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 14f150c4..0a951851 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -452,6 +452,19 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { tcn.currentHeight-- tcn.reorgDepth++ + // Rewind the height hint for all watched transactions. + var txs []chainhash.Hash + for tx := range tcn.confNotifications { + txs = append(txs, tx) + } + + err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...) + if err != nil { + Log.Errorf("Unable to update confirm hint to %d for %v: %v", + tcn.currentHeight, txs, err) + return err + } + // We'll go through all of our watched transactions and attempt to drain // their notification channels to ensure sending notifications to the // clients is always non-blocking. @@ -515,20 +528,6 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { } } - // Rewind the height hint for all watched transactions. - var txs []chainhash.Hash - for tx := range tcn.confNotifications { - txs = append(txs, tx) - } - - err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...) - if err != nil { - // The error is not fatal, so we should not return an error to - // the caller. - Log.Errorf("Unable to update confirm hint to %d for %v: %v", - tcn.currentHeight, txs, err) - } - // Finally, we can remove the transactions we're currently watching that // were included in this block height. delete(tcn.txsByInitialHeight, blockHeight)