chainntnfs/neutrino: commit spend hints before notifying

This commit is contained in:
Conner Fromknecht 2018-08-24 17:57:05 -07:00
parent e498a837a3
commit 7dceac92d2
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -614,13 +614,16 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height, chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height,
newBlock.hash) newBlock.hash)
n.bestHeight = newBlock.height // Create a helper struct for coalescing spend notifications triggered
// by this block.
type spendNtfnBatch struct {
details *chainntnfs.SpendDetail
clients map[uint64]*spendNotification
}
// Next, notify any subscribed clients of the block. // Scan over the list of relevant transactions and assemble the
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) // possible spend notifications we need to dispatch.
spendBatches := make(map[wire.OutPoint]spendNtfnBatch)
// Scan over the list of relevant transactions and possibly dispatch
// notifications for spends.
for _, tx := range newBlock.txns { for _, tx := range newBlock.txns {
mtx := tx.MsgTx() mtx := tx.MsgTx()
txSha := mtx.TxHash() txSha := mtx.TxHash()
@ -630,12 +633,13 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// If this transaction indeed does spend an output which // If this transaction indeed does spend an output which
// we have a registered notification for, then create a // we have a registered notification for, then create a
// spend summary, finally sending off the details to the // spend summary and add it to our batch of spend
// notification subscriber. // notifications to be delivered.
clients, ok := n.spendNotifications[prevOut] clients, ok := n.spendNotifications[prevOut]
if !ok { if !ok {
continue continue
} }
delete(n.spendNotifications, prevOut)
spendDetails := &chainntnfs.SpendDetail{ spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut, SpentOutPoint: &prevOut,
@ -645,25 +649,15 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
SpendingHeight: int32(newBlock.height), SpendingHeight: int32(newBlock.height),
} }
for _, ntfn := range clients { spendBatches[prevOut] = spendNtfnBatch{
chainntnfs.Log.Infof("Dispatching spend "+ details: spendDetails,
"notification for outpoint=%v", clients: clients,
ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to
// Cancel will not block. This is safe to do
// since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan)
} }
delete(n.spendNotifications, prevOut)
} }
} }
// Finally, we'll update the spend height hint for all of our watched // Now, we'll update the spend height hint for all of our watched
// outpoints that have not been spent yet. This is safe to do as we do // outpoints that have not been spent yet. This is safe to do as we do
// not watch already spent outpoints for spend notifications. // not watch already spent outpoints for spend notifications.
ops := make([]wire.OutPoint, 0, len(n.spendNotifications)) ops := make([]wire.OutPoint, 0, len(n.spendNotifications))
@ -674,13 +668,40 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
if len(ops) > 0 { if len(ops) > 0 {
err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...) err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...)
if err != nil { if err != nil {
// The error is not fatal, so we should not return an // The error is not fatal since we are connecting a
// error to the caller. // block, and advancing the spend hint is an optimistic
// optimization.
chainntnfs.Log.Errorf("Unable to update spend hint to "+ chainntnfs.Log.Errorf("Unable to update spend hint to "+
"%d for %v: %v", newBlock.height, ops, err) "%d for %v: %v", newBlock.height, ops, err)
} }
} }
// We want to set the best block before dispatching notifications
// so if any subscribers make queries based on their received
// block epoch, our state is fully updated in time.
n.bestHeight = newBlock.height
// With all persistent changes committed, notify any subscribed clients
// of the block.
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Finally, send off the spend details to the notification subscribers.
for _, batch := range spendBatches {
for _, ntfn := range batch.clients {
chainntnfs.Log.Infof("Dispatching spend "+
"notification for outpoint=%v",
ntfn.targetOutpoint)
ntfn.spendChan <- batch.details
// Close spendChan to ensure that any calls to
// Cancel will not block. This is safe to do
// since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan)
}
}
return nil return nil
} }
@ -810,7 +831,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
} }
// Ensure that neutrino is caught up to the height hint before we // Ensure that neutrino is caught up to the height hint before we
// attempt to fetch the utxo fromt the chain. If we're behind, then we // attempt to fetch the utxo from the chain. If we're behind, then we
// may miss a notification dispatch. // may miss a notification dispatch.
for { for {
n.heightMtx.RLock() n.heightMtx.RLock()