chainntnfs/txconfnotifier: split out ntfn dispatch into helper

This commit is contained in:
Conner Fromknecht 2018-09-28 16:30:13 -07:00
parent 217b1fc0ef
commit 9ae6d43916
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -222,9 +222,6 @@ func (tcn *TxConfNotifier) Register(
tcn.Lock() tcn.Lock()
defer tcn.Unlock() defer tcn.Unlock()
// TODO(conner): promote immediately to confNotifications if a
// historical dispatch has already completed.
confSet, ok := tcn.confNotifications[*ntfn.TxID] confSet, ok := tcn.confNotifications[*ntfn.TxID]
if !ok { if !ok {
confSet = newConfNtfnSet() confSet = newConfNtfnSet()
@ -238,6 +235,10 @@ func (tcn *TxConfNotifier) Register(
// A prior rescan has already completed and we are actively watching at // A prior rescan has already completed and we are actively watching at
// tip for this txid. // tip for this txid.
case rescanComplete: case rescanComplete:
// If conf details for this set of notifications has already
// been found, we'll attempt to deliver them immediately to this
// client.
tcn.dispatchConfDetails(ntfn, confSet.details)
return nil, nil return nil, nil
// A rescan is already in progress, return here to prevent dispatching // A rescan is already in progress, return here to prevent dispatching
@ -327,72 +328,96 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash,
details.BlockHeight, txid, err) details.BlockHeight, txid, err)
} }
// Update the conf details of all ntfns that don't yet have them. // Cache the details found in the rescan and attempt to dispatch any
// notifications that have not yet been delivered.
confSet.details = details
for _, ntfn := range confSet.ntfns { for _, ntfn := range confSet.ntfns {
if ntfn.details != nil { err = tcn.dispatchConfDetails(ntfn, details)
continue if err != nil {
return err
}
}
return nil
}
// dispatchConfDetails attempts to cache and dispatch details to a particular
// client if the transaction has sufficiently confirmed. If the provided details
// are nil, this method will be a no-op.
func (tcn *TxConfNotifier) dispatchConfDetails(
ntfn *ConfNtfn, details *TxConfirmation) error {
// If no details are provided, return early as we can't dispatch.
if details == nil {
return nil
}
// Set the confirmation details for this notification, only if the
// notification doesn't already have details. This ensure we only fall
// through the following logic at most once, which could cause the
// buffered channels to block when exceeding their allocated capacity.
if ntfn.details != nil {
return nil
}
ntfn.details = details
// Now, we'll examine whether the transaction of this
// notification request has reached its required number of
// confirmations. If it has, we'll dispatch a confirmation
// notification to the caller.
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
if confHeight <= tcn.currentHeight {
Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID)
// We'll send a 0 value to the Updates channel,
// indicating that the transaction has already been
// confirmed.
select {
case ntfn.Event.Updates <- 0:
case <-tcn.quit:
return ErrTxConfNotifierExiting
} }
ntfn.details = details select {
case ntfn.Event.Confirmed <- details:
// Now, we'll examine whether the transaction of this ntfn.dispatched = true
// notification request has reached its required number of case <-tcn.quit:
// confirmations. If it has, we'll dispatch a confirmation return ErrTxConfNotifierExiting
// notification to the caller.
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
if confHeight <= tcn.currentHeight {
Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID)
// We'll send a 0 value to the Updates channel,
// indicating that the transaction has already been
// confirmed.
select {
case ntfn.Event.Updates <- 0:
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
select {
case ntfn.Event.Confirmed <- details:
ntfn.dispatched = true
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
} else {
// Otherwise, we'll keep track of the notification
// request by the height at which we should dispatch the
// confirmation notification.
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if !exists {
ntfnSet = make(map[*ConfNtfn]struct{})
tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet
}
ntfnSet[ntfn] = struct{}{}
// We'll also send an update to the client of how many
// confirmations are left for the transaction to be
// confirmed.
numConfsLeft := confHeight - tcn.currentHeight
select {
case ntfn.Event.Updates <- numConfsLeft:
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
} }
} else {
// As a final check, we'll also watch the transaction if it's // Otherwise, we'll keep track of the notification
// still possible for it to get reorged out of the chain. // request by the height at which we should dispatch the
blockHeight := details.BlockHeight // confirmation notification.
reorgSafeHeight := blockHeight + tcn.reorgSafetyLimit ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if reorgSafeHeight > tcn.currentHeight { if !exists {
txSet, exists := tcn.txsByInitialHeight[blockHeight] ntfnSet = make(map[*ConfNtfn]struct{})
if !exists { tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet
txSet = make(map[chainhash.Hash]struct{})
tcn.txsByInitialHeight[blockHeight] = txSet
}
txSet[txid] = struct{}{}
} }
ntfnSet[ntfn] = struct{}{}
// We'll also send an update to the client of how many
// confirmations are left for the transaction to be
// confirmed.
numConfsLeft := confHeight - tcn.currentHeight
select {
case ntfn.Event.Updates <- numConfsLeft:
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
}
// As a final check, we'll also watch the transaction if it's
// still possible for it to get reorged out of the chain.
blockHeight := details.BlockHeight
reorgSafeHeight := blockHeight + tcn.reorgSafetyLimit
if reorgSafeHeight > tcn.currentHeight {
txSet, exists := tcn.txsByInitialHeight[blockHeight]
if !exists {
txSet = make(map[chainhash.Hash]struct{})
tcn.txsByInitialHeight[blockHeight] = txSet
}
txSet[*ntfn.TxID] = struct{}{}
} }
return nil return nil