From 7dceac92d21f7788ca323908dd921cef2021b0f3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 17:57:05 -0700 Subject: [PATCH] chainntnfs/neutrino: commit spend hints before notifying --- chainntnfs/neutrinonotify/neutrino.go | 71 +++++++++++++++++---------- 1 file changed, 46 insertions(+), 25 deletions(-) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 75135bb0..45990450 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -614,13 +614,16 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height, newBlock.hash) - n.bestHeight = newBlock.height + // Create a helper struct for coalescing spend notifications triggered + // by this block. + type spendNtfnBatch struct { + details *chainntnfs.SpendDetail + clients map[uint64]*spendNotification + } - // Next, notify any subscribed clients of the block. - n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - - // Scan over the list of relevant transactions and possibly dispatch - // notifications for spends. + // Scan over the list of relevant transactions and assemble the + // possible spend notifications we need to dispatch. + spendBatches := make(map[wire.OutPoint]spendNtfnBatch) for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -630,12 +633,13 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // If this transaction indeed does spend an output which // we have a registered notification for, then create a - // spend summary, finally sending off the details to the - // notification subscriber. + // spend summary and add it to our batch of spend + // notifications to be delivered. clients, ok := n.spendNotifications[prevOut] if !ok { continue } + delete(n.spendNotifications, prevOut) spendDetails := &chainntnfs.SpendDetail{ SpentOutPoint: &prevOut, @@ -645,25 +649,15 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { SpendingHeight: int32(newBlock.height), } - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching spend "+ - "notification for outpoint=%v", - ntfn.targetOutpoint) - - ntfn.spendChan <- spendDetails - - // Close spendChan to ensure that any calls to - // Cancel will not block. This is safe to do - // since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) + spendBatches[prevOut] = spendNtfnBatch{ + details: spendDetails, + clients: clients, } - delete(n.spendNotifications, prevOut) } } - // Finally, we'll update the spend height hint for all of our watched + // Now, we'll update the spend height hint for all of our watched // outpoints that have not been spent yet. This is safe to do as we do // not watch already spent outpoints for spend notifications. ops := make([]wire.OutPoint, 0, len(n.spendNotifications)) @@ -674,13 +668,40 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { if len(ops) > 0 { err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...) if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. + // The error is not fatal since we are connecting a + // block, and advancing the spend hint is an optimistic + // optimization. chainntnfs.Log.Errorf("Unable to update spend hint to "+ "%d for %v: %v", newBlock.height, ops, err) } } + // We want to set the best block before dispatching notifications + // so if any subscribers make queries based on their received + // block epoch, our state is fully updated in time. + n.bestHeight = newBlock.height + + // With all persistent changes committed, notify any subscribed clients + // of the block. + n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + + // Finally, send off the spend details to the notification subscribers. + for _, batch := range spendBatches { + for _, ntfn := range batch.clients { + chainntnfs.Log.Infof("Dispatching spend "+ + "notification for outpoint=%v", + ntfn.targetOutpoint) + + ntfn.spendChan <- batch.details + + // Close spendChan to ensure that any calls to + // Cancel will not block. This is safe to do + // since the channel is buffered, and the + // message can still be read by the receiver. + close(ntfn.spendChan) + } + } + return nil } @@ -810,7 +831,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, } // Ensure that neutrino is caught up to the height hint before we - // attempt to fetch the utxo fromt the chain. If we're behind, then we + // attempt to fetch the utxo from the chain. If we're behind, then we // may miss a notification dispatch. for { n.heightMtx.RLock()