chainntnfs/neutrinonotify: handle spend notification registration w/ TxNotifier

In this commit, we modify the logic within RegisterSpendNtfn for the
NeutrinoNotifier to account for the recent changes made to the
TxNotifier. Since it is now able to handle spend notification
registration and dispatch, we can bypass all the current logic within
the NeutrinoNotifier and interact directly with the TxNotifier instead.

The most notable change is that we'll now only attempt a historical
rescan if the TxNotifier tells us one is needed.
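
In rough outline, the new registration path reduces to the sketch below. It
uses only the TxNotifier calls that appear in the diff (RegisterSpend,
UpdateSpendDetails, CancelSpend, NewSpendEvent); registerSpendSketch and its
scanChain callback are illustrative stand-ins for the notifier method and for
neutrino's GetUtxo/rescan-filter plumbing, not code from this change.

package sketch // illustrative only, not part of lnd

import (
	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/chainntnfs"
)

// registerSpendSketch is a hypothetical, condensed version of the new flow.
// scanChain stands in for the backend's historical lookup (GetUtxo for
// neutrino) over the height range the TxNotifier requests.
func registerSpendSketch(txNotifier *chainntnfs.TxNotifier, spendID uint64,
	outpoint wire.OutPoint, heightHint uint32,
	scanChain func(start, end uint32) (*chainntnfs.SpendDetail, error)) (
	*chainntnfs.SpendEvent, error) {

	// Hand the request off to the TxNotifier, which now owns client
	// tracking, height hints, and spend dispatch.
	ntfn := &chainntnfs.SpendNtfn{
		SpendID:  spendID,
		OutPoint: outpoint,
		Event: chainntnfs.NewSpendEvent(func() {
			txNotifier.CancelSpend(outpoint, spendID)
		}),
		HeightHint: heightHint,
	}
	historicalDispatch, err := txNotifier.RegisterSpend(ntfn)
	if err != nil {
		return nil, err
	}

	// No dispatch details means the TxNotifier doesn't need a historical
	// rescan for this outpoint, so the backend has nothing left to do.
	if historicalDispatch == nil {
		return ntfn.Event, nil
	}

	// Otherwise, scan only the range the TxNotifier asked for and report
	// the outcome back; nil details mark the outpoint as still unspent.
	spendDetails, err := scanChain(
		historicalDispatch.StartHeight, historicalDispatch.EndHeight,
	)
	if err != nil {
		return nil, err
	}
	err = txNotifier.UpdateSpendDetails(outpoint, spendDetails)
	if err != nil {
		return nil, err
	}

	return ntfn.Event, nil
}

The important shift is that the backend no longer decides when a historical
rescan happens: it only performs the bounded scan the TxNotifier asks for and
reports the result back through UpdateSpendDetails.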
Author: Wilmer Paulino
Date:   2018-10-05 02:07:55 -07:00
Parent: 74139c9a3f
Commit: bfd11a251e
GPG Key ID: 6DF57B9F9514972F

2 changed files with 55 additions and 156 deletions

@@ -164,6 +164,7 @@ func (n *NeutrinoNotifier) Start() error {
 	n.txNotifier = chainntnfs.NewTxNotifier(
 		n.bestHeight, reorgSafetyLimit, n.confirmHintCache,
+		n.spendHintCache,
 	)
 	n.chainConn = &NeutrinoChainConn{n.p2pNode}
@@ -603,68 +604,6 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
 	chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height,
 		newBlock.hash)

-	// Create a helper struct for coalescing spend notifications triggered
-	// by this block.
-	type spendNtfnBatch struct {
-		details *chainntnfs.SpendDetail
-		clients map[uint64]*spendNotification
-	}
-
-	// Scan over the list of relevant transactions and assemble the
-	// possible spend notifications we need to dispatch.
-	spendBatches := make(map[wire.OutPoint]spendNtfnBatch)
-	for _, tx := range newBlock.txns {
-		mtx := tx.MsgTx()
-		txSha := mtx.TxHash()
-
-		for i, txIn := range mtx.TxIn {
-			prevOut := txIn.PreviousOutPoint
-
-			// If this transaction indeed does spend an output which
-			// we have a registered notification for, then create a
-			// spend summary and add it to our batch of spend
-			// notifications to be delivered.
-			clients, ok := n.spendNotifications[prevOut]
-			if !ok {
-				continue
-			}
-			delete(n.spendNotifications, prevOut)
-
-			spendDetails := &chainntnfs.SpendDetail{
-				SpentOutPoint:     &prevOut,
-				SpenderTxHash:     &txSha,
-				SpendingTx:        mtx,
-				SpenderInputIndex: uint32(i),
-				SpendingHeight:    int32(newBlock.height),
-			}
-
-			spendBatches[prevOut] = spendNtfnBatch{
-				details: spendDetails,
-				clients: clients,
-			}
-		}
-	}
-
-	// Now, we'll update the spend height hint for all of our watched
-	// outpoints that have not been spent yet. This is safe to do as we do
-	// not watch already spent outpoints for spend notifications.
-	ops := make([]wire.OutPoint, 0, len(n.spendNotifications))
-	for op := range n.spendNotifications {
-		ops = append(ops, op)
-	}
-
-	if len(ops) > 0 {
-		err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...)
-		if err != nil {
-			// The error is not fatal since we are connecting a
-			// block, and advancing the spend hint is an optimistic
-			// optimization.
-			chainntnfs.Log.Errorf("Unable to update spend hint to "+
-				"%d for %v: %v", newBlock.height, ops, err)
-		}
-	}
-
 	// We want to set the best block before dispatching notifications
 	// so if any subscribers make queries based on their received
 	// block epoch, our state is fully updated in time.
@@ -674,23 +613,6 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
 	// of the block.
 	n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)

-	// Finally, send off the spend details to the notification subscribers.
-	for _, batch := range spendBatches {
-		for _, ntfn := range batch.clients {
-			chainntnfs.Log.Infof("Dispatching spend "+
-				"notification for outpoint=%v",
-				ntfn.targetOutpoint)
-
-			ntfn.spendChan <- batch.details
-
-			// Close spendChan to ensure that any calls to
-			// Cancel will not block. This is safe to do
-			// since the channel is buffered, and the
-			// message can still be read by the receiver.
-			close(ntfn.spendChan)
-		}
-	}
-
 	return nil
 }
@@ -766,86 +688,59 @@ type spendCancel struct {
 func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
 	pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {

-	n.heightMtx.RLock()
-	currentHeight := n.bestHeight
-	n.heightMtx.RUnlock()
-
-	// Before proceeding to register the notification, we'll query our
-	// height hint cache to determine whether a better one exists.
-	if hint, err := n.spendHintCache.QuerySpendHint(*outpoint); err == nil {
-		if hint > heightHint {
-			chainntnfs.Log.Debugf("Using height hint %d retrieved "+
-				"from cache for %v", hint, outpoint)
-			heightHint = hint
-		}
-	}
-
-	// Construct a notification request for the outpoint. We'll defer
-	// sending it to the main event loop until after we've guaranteed that
-	// the outpoint has not been spent.
-	ntfn := &spendNotification{
-		targetOutpoint: outpoint,
-		spendChan:      make(chan *chainntnfs.SpendDetail, 1),
-		spendID:        atomic.AddUint64(&n.spendClientCounter, 1),
-		heightHint:     heightHint,
-	}
-
-	spendEvent := &chainntnfs.SpendEvent{
-		Spend: ntfn.spendChan,
-		Cancel: func() {
-			cancel := &spendCancel{
-				op:      *outpoint,
-				spendID: ntfn.spendID,
-			}
-
-			// Submit spend cancellation to notification dispatcher.
-			select {
-			case n.notificationCancels <- cancel:
-				// Cancellation is being handled, drain the
-				// spend chan until it is closed before yielding
-				// to the caller.
-				for {
-					select {
-					case _, ok := <-ntfn.spendChan:
-						if !ok {
-							return
-						}
-					case <-n.quit:
-						return
-					}
-				}
-			case <-n.quit:
-			}
-		},
+	// First, we'll construct a spend notification request and hand it off
+	// to the txNotifier.
+	spendID := atomic.AddUint64(&n.spendClientCounter, 1)
+	cancel := func() {
+		n.txNotifier.CancelSpend(*outpoint, spendID)
+	}
+	ntfn := &chainntnfs.SpendNtfn{
+		SpendID:    spendID,
+		OutPoint:   *outpoint,
+		Event:      chainntnfs.NewSpendEvent(cancel),
+		HeightHint: heightHint,
+	}
+
+	historicalDispatch, err := n.txNotifier.RegisterSpend(ntfn)
+	if err != nil {
+		return nil, err
+	}
+
+	// If the txNotifier didn't return any details to perform a historical
+	// scan of the chain, then we can return early as there's nothing left
+	// for us to do.
+	if historicalDispatch == nil {
+		return ntfn.Event, nil
 	}

 	// Ensure that neutrino is caught up to the height hint before we
-	// attempt to fetch the utxo from the chain. If we're behind, then we
+	// attempt to fetch the UTXO from the chain. If we're behind, then we
 	// may miss a notification dispatch.
 	for {
 		n.heightMtx.RLock()
-		currentHeight = n.bestHeight
+		currentHeight := n.bestHeight
 		n.heightMtx.RUnlock()

-		if currentHeight < heightHint {
-			time.Sleep(time.Millisecond * 200)
-			continue
+		if currentHeight >= historicalDispatch.StartHeight {
+			break
 		}

-		break
-	}
-
-	inputToWatch := neutrino.InputWithScript{
-		OutPoint: *outpoint,
-		PkScript: pkScript,
+		time.Sleep(time.Millisecond * 200)
 	}

 	// Before sending off the notification request, we'll attempt to see if
 	// this output is still spent or not at this point in the chain.
+	inputToWatch := neutrino.InputWithScript{
+		OutPoint: *outpoint,
+		PkScript: pkScript,
+	}
 	spendReport, err := n.p2pNode.GetUtxo(
 		neutrino.WatchInputs(inputToWatch),
 		neutrino.StartBlock(&waddrmgr.BlockStamp{
-			Height: int32(heightHint),
+			Height: int32(historicalDispatch.StartHeight),
 		}),
+		neutrino.EndBlock(&waddrmgr.BlockStamp{
+			Height: int32(historicalDispatch.EndHeight),
+		}),
 	)
 	if err != nil && !strings.Contains(err.Error(), "not found") {
@@ -857,30 +752,33 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
 	if spendReport != nil && spendReport.SpendingTx != nil {
 		// As a result, we'll launch a goroutine to immediately
 		// dispatch the notification with a normal response.
-		go func() {
-			txSha := spendReport.SpendingTx.TxHash()
-
-			select {
-			case ntfn.spendChan <- &chainntnfs.SpendDetail{
-				SpentOutPoint:     outpoint,
-				SpenderTxHash:     &txSha,
-				SpendingTx:        spendReport.SpendingTx,
-				SpenderInputIndex: spendReport.SpendingInputIndex,
-				SpendingHeight:    int32(spendReport.SpendingTxHeight),
-			}:
-			case <-n.quit:
-				return
-			}
-		}()
+		spendingTxHash := spendReport.SpendingTx.TxHash()
+		spendDetails := &chainntnfs.SpendDetail{
+			SpentOutPoint:     outpoint,
+			SpenderTxHash:     &spendingTxHash,
+			SpendingTx:        spendReport.SpendingTx,
+			SpenderInputIndex: spendReport.SpendingInputIndex,
+			SpendingHeight:    int32(spendReport.SpendingTxHeight),
+		}
+
+		err := n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails)
+		if err != nil {
+			return nil, err
+		}

-		return spendEvent, nil
+		return ntfn.Event, nil
 	}

-	// If the output is still unspent, then we'll update our rescan's
-	// filter, and send the request to the dispatcher goroutine.
+	// If the output is still unspent, then we'll mark our historical rescan
+	// as complete and update our rescan's filter to watch for the spend of
+	// the outpoint in question.
+	if err := n.txNotifier.UpdateSpendDetails(*outpoint, nil); err != nil {
+		return nil, err
+	}
+
 	rescanUpdate := []neutrino.UpdateOption{
 		neutrino.AddInputs(inputToWatch),
-		neutrino.Rewind(currentHeight),
+		neutrino.Rewind(historicalDispatch.EndHeight),
 		neutrino.DisableDisconnectedNtfns(true),
 	}

@@ -50,6 +50,7 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
 	n.txNotifier = chainntnfs.NewTxNotifier(
 		uint32(bestHeight), reorgSafetyLimit, n.confirmHintCache,
+		n.spendHintCache,
 	)
 	n.chainConn = &NeutrinoChainConn{n.p2pNode}