chainntnfs/neutrinonotify: make filter update synchronous

In this commit, we modify the notifier to handle filter updates
synchronously. We do this to prevent race conditions between new block
notifications and filter updates. Otherwise, it's possible for a new
block to come in that should match our filter, but doesn't due to the
filter being updated after.

We also modify their order so that the filter is updated first. We do
this so we can immediately start watching for the event at tip while the
rescan is ongoing.
This commit is contained in:
Wilmer Paulino 2018-10-20 16:30:57 -07:00
parent 60a1d73e08
commit e6b1a27cd7
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F

@ -217,6 +217,14 @@ type filteredBlock struct {
connect bool connect bool
} }
// rescanFilterUpdate represents a request that will be sent to the
// notificaionRegistry in order to prevent race conditions between the filter
// update and new block notifications.
type rescanFilterUpdate struct {
updateOptions []neutrino.UpdateOption
errChan chan error
}
// onFilteredBlockConnected is a callback which is executed each a new block is // onFilteredBlockConnected is a callback which is executed each a new block is
// connected to the end of the main chain. // connected to the end of the main chain.
func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
@ -283,9 +291,8 @@ out:
case registerMsg := <-n.notificationRegistry: case registerMsg := <-n.notificationRegistry:
switch msg := registerMsg.(type) { switch msg := registerMsg.(type) {
case *chainntnfs.HistoricalConfDispatch: case *chainntnfs.HistoricalConfDispatch:
// Look up whether the transaction is already // We'll start a historical rescan chain of the
// included in the active chain. We'll do this // chain asynchronously to prevent blocking
// in a goroutine to prevent blocking
// potentially long rescans. // potentially long rescans.
n.wg.Add(1) n.wg.Add(1)
go func() { go func() {
@ -299,18 +306,6 @@ out:
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
// We'll map the script into an address
// type so we can instruct neutrino to
// match if the transaction containing
// the script is found in a block.
params := n.p2pNode.ChainParams()
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
msg.PkScript, &params,
)
if err != nil {
chainntnfs.Log.Error(err)
}
// If the historical dispatch finished // If the historical dispatch finished
// without error, we will invoke // without error, we will invoke
// UpdateConfDetails even if none were // UpdateConfDetails even if none were
@ -324,25 +319,6 @@ out:
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
if confDetails != nil {
return
}
// If we can't fully dispatch
// confirmation, then we'll update our
// filter so we can be notified of its
// future initial confirmation.
rescanUpdate := []neutrino.UpdateOption{
neutrino.AddAddrs(addrs...),
neutrino.Rewind(msg.EndHeight),
neutrino.DisableDisconnectedNtfns(true),
}
err = n.chainView.Update(rescanUpdate...)
if err != nil {
chainntnfs.Log.Errorf("Unable to update rescan: %v",
err)
}
}() }()
case *blockEpochRegistration: case *blockEpochRegistration:
@ -367,6 +343,14 @@ out:
} }
} }
msg.errorChan <- nil msg.errorChan <- nil
case *rescanFilterUpdate:
err := n.chainView.Update(msg.updateOptions...)
if err != nil {
chainntnfs.Log.Errorf("Unable to "+
"update rescan filter: %v", err)
}
msg.errChan <- err
} }
case item := <-n.chainUpdates.ChanOut(): case item := <-n.chainUpdates.ChanOut():
@ -658,9 +642,47 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return ntfn.Event, nil return ntfn.Event, nil
} }
// Ensure that neutrino is caught up to the height hint before we // To determine whether this outpoint has been spent on-chain, we'll
// attempt to fetch the UTXO from the chain. If we're behind, then we // update our filter to watch for the transaction at tip and we'll also
// may miss a notification dispatch. // dispatch a historical rescan to determine if it has been spent in the
// past.
//
// We'll update our filter first to ensure we can immediately detect the
// spend at tip. To do so, we'll map the script into an address
// type so we can instruct neutrino to match if the transaction
// containing the script is found in a block.
inputToWatch := neutrino.InputWithScript{
OutPoint: *outpoint,
PkScript: pkScript,
}
errChan := make(chan error, 1)
select {
case n.notificationRegistry <- &rescanFilterUpdate{
updateOptions: []neutrino.UpdateOption{
neutrino.AddInputs(inputToWatch),
neutrino.Rewind(historicalDispatch.EndHeight),
neutrino.DisableDisconnectedNtfns(true),
},
errChan: errChan,
}:
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
}
select {
case err = <-errChan:
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
}
if err != nil {
return nil, fmt.Errorf("unable to update filter: %v", err)
}
// With the filter updated, we'll dispatch our historical rescan to
// ensure we detect the spend if it happened in the past. We'll ensure
// that neutrino is caught up to the starting height before we attempt
// to fetch the UTXO from the chain. If we're behind, then we may miss a
// notification dispatch.
for { for {
n.heightMtx.RLock() n.heightMtx.RLock()
currentHeight := n.bestHeight currentHeight := n.bestHeight
@ -673,12 +695,6 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
} }
// Before sending off the notification request, we'll attempt to see if
// this output is still spent or not at this point in the chain.
inputToWatch := neutrino.InputWithScript{
OutPoint: *outpoint,
PkScript: pkScript,
}
spendReport, err := n.p2pNode.GetUtxo( spendReport, err := n.p2pNode.GetUtxo(
neutrino.WatchInputs(inputToWatch), neutrino.WatchInputs(inputToWatch),
neutrino.StartBlock(&waddrmgr.BlockStamp{ neutrino.StartBlock(&waddrmgr.BlockStamp{
@ -694,19 +710,23 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// If a spend report was returned, and the transaction is present, then // If a spend report was returned, and the transaction is present, then
// this means that the output is already spent. // this means that the output is already spent.
var spendDetails *chainntnfs.SpendDetail
if spendReport != nil && spendReport.SpendingTx != nil { if spendReport != nil && spendReport.SpendingTx != nil {
// As a result, we'll launch a goroutine to immediately
// dispatch the notification with a normal response.
spendingTxHash := spendReport.SpendingTx.TxHash() spendingTxHash := spendReport.SpendingTx.TxHash()
spendDetails := &chainntnfs.SpendDetail{ spendDetails = &chainntnfs.SpendDetail{
SpentOutPoint: outpoint, SpentOutPoint: outpoint,
SpenderTxHash: &spendingTxHash, SpenderTxHash: &spendingTxHash,
SpendingTx: spendReport.SpendingTx, SpendingTx: spendReport.SpendingTx,
SpenderInputIndex: spendReport.SpendingInputIndex, SpenderInputIndex: spendReport.SpendingInputIndex,
SpendingHeight: int32(spendReport.SpendingTxHeight), SpendingHeight: int32(spendReport.SpendingTxHeight),
} }
}
err := n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails) // Finally, no matter whether the rescan found a spend in the past or
// not, we'll mark our historical rescan as complete to ensure the
// outpoint's spend hint gets updated upon connected/disconnected
// blocks.
err = n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -714,42 +734,6 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return ntfn.Event, nil return ntfn.Event, nil
} }
// If the output is still unspent, then we'll mark our historical rescan
// as complete and update our rescan's filter to watch for the spend of
// the outpoint in question.
if err := n.txNotifier.UpdateSpendDetails(*outpoint, nil); err != nil {
return nil, err
}
rescanUpdate := []neutrino.UpdateOption{
neutrino.AddInputs(inputToWatch),
neutrino.Rewind(historicalDispatch.EndHeight),
neutrino.DisableDisconnectedNtfns(true),
}
if err := n.chainView.Update(rescanUpdate...); err != nil {
return nil, err
}
select {
case n.notificationRegistry <- ntfn:
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
}
// Finally, we'll add a spent hint with the current height to the cache
// in order to better keep track of when this outpoint is spent.
err = n.spendHintCache.CommitSpendHint(currentHeight, *outpoint)
if err != nil {
// The error is not fatal, so we should not return an error to
// the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to %d for "+
"%v: %v", currentHeight, outpoint, err)
}
return spendEvent, nil
}
// RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier
// which will be triggered once the txid reaches numConfs number of // which will be triggered once the txid reaches numConfs number of
// confirmations. // confirmations.
@ -784,12 +768,55 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
return ntfn.Event, nil return ntfn.Event, nil
} }
// To determine whether this transaction has confirmed on-chain, we'll
// update our filter to watch for the transaction at tip and we'll also
// dispatch a historical rescan to determine if it has confirmed in the
// past.
//
// We'll update our filter first to ensure we can immediately detect the
// confirmation at tip. To do so, we'll map the script into an address
// type so we can instruct neutrino to match if the transaction
// containing the script is found in a block.
params := n.p2pNode.ChainParams()
_, addrs, _, err := txscript.ExtractPkScriptAddrs(pkScript, &params)
if err != nil {
return nil, fmt.Errorf("unable to extract script: %v", err)
}
// We'll send the filter update request to the notifier's main event
// handler and wait for its response.
errChan := make(chan error, 1)
select { select {
case n.notificationRegistry <- dispatch: case n.notificationRegistry <- &rescanFilterUpdate{
return ntfn.Event, nil updateOptions: []neutrino.UpdateOption{
neutrino.AddAddrs(addrs...),
neutrino.Rewind(dispatch.EndHeight),
neutrino.DisableDisconnectedNtfns(true),
},
errChan: errChan,
}:
case <-n.quit: case <-n.quit:
return nil, ErrChainNotifierShuttingDown return nil, ErrChainNotifierShuttingDown
} }
select {
case err = <-errChan:
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
}
if err != nil {
return nil, fmt.Errorf("unable to update filter: %v", err)
}
// Finally, with the filter updates, we can dispatch the historical
// rescan to ensure we can detect if the event happened in the past.
select {
case n.notificationRegistry <- dispatch:
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
}
return ntfn.Event, nil
} }
// blockEpochRegistration represents a client's intent to receive a // blockEpochRegistration represents a client's intent to receive a