chainntnfs: properly account for partial historical confirmation dispatch

This commit modifies the recently added logic to the ChainNotifier to:
  1. Fix the off-by-one confirmation error that was missed due a flaky
test
  2. Ensure that partial historical confirmations are properly handled.

The partial hostile confirmation case arises when a transaction already
a non-zero number of confirmations when the notification is registered,
but less than what would trigger the confirmation notification. To fix
this, transaction which have a partial number of confirmation are now
properly inserted into the confHeap, skipping first first phase for
notifications.
This commit is contained in:
Olaoluwa Osuntokun 2016-12-08 16:15:58 -08:00
parent a20594b0bf
commit 1e3635b5aa
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 105 additions and 33 deletions

@ -2,7 +2,6 @@ package btcdnotify
import (
"container/heap"
"strings"
"sync"
"sync/atomic"
"time"
@ -212,6 +211,8 @@ func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetai
// notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches.
func (b *BtcdNotifier) notificationDispatcher() {
var currentHeight int32
out:
for {
select {
@ -226,9 +227,14 @@ out:
chainntnfs.Log.Infof("New confirmations "+
"subscription: txid=%v, numconfs=%v",
*msg.txid, msg.numConfirmations)
// TODO(roasbeef): perform a N-block look
// behind to catch race-condition due to faster
// inter-block time?
// If the notification can be partially or
// fully dispatched, then we can skip the first
// phase for ntfns.
if b.attemptHistoricalDispatch(msg, currentHeight) {
continue
}
txid := *msg.txid
b.confNotifications[txid] = append(b.confNotifications[txid], msg)
case *blockEpochRegistration:
@ -252,6 +258,8 @@ out:
b.chainUpdates = b.chainUpdates[1:]
b.chainUpdateMtx.Unlock()
currentHeight = update.blockHeight
newBlock, err := b.chainConn.GetBlock(update.blockHash)
if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err)
@ -329,6 +337,41 @@ out:
b.wg.Done()
}
// attemptHistoricalDispatch tries to use historical information to decide if a
// notificaiton ca be disptahced immediately, or is partially confirmed so it
// can skip straight to the confirmations heap.
func (b *BtcdNotifier) attemptHistoricalDispatch(msg *confirmationsNotification,
currentHeight int32) bool {
// If the transaction already has some or all of the confirmations,
// then we may be able to
// dispatch it immediately.
tx, err := b.chainConn.GetRawTransactionVerbose(msg.txid)
if err != nil {
return false
}
// If the transaction has more that enough confirmations, then we can
// dispatch it immediately after obtaininig for information w.r.t
// exaclty *when* if got all its confirmations.
if uint32(tx.Confirmations) >= msg.numConfirmations {
msg.finConf <- int32(tx.Confirmations)
return true
}
// Otherwise, the transaciton has only been *partially* confirmed, so
// we need to insert it into the confirmationheap.
confsLeft := msg.numConfirmations - uint32(tx.Confirmations)
confHeight := uint32(currentHeight) + confsLeft
heapEntry := &confEntry{
msg,
confHeight,
}
heap.Push(b.confHeap, heapEntry)
return false
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *wire.ShaHash) {
@ -506,19 +549,6 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash,
b.notificationRegistry <- ntfn
// The following conditional checks transaction confirmation notification
// requests so that if the transaction has already been included in a block
// with the requested number of confirmations, the notification will be
// dispatched immediately.
tx, err := b.chainConn.GetRawTransactionVerbose(txid)
if err != nil {
if !strings.Contains(err.Error(), "No information") {
return nil, err
}
} else if uint32(tx.Confirmations) > numConfs {
ntfn.finConf <- int32(tx.Confirmations)
}
return &chainntnfs.ConfirmationEvent{
Confirmed: ntfn.finConf,
NegativeConf: ntfn.negativeConf,

@ -413,15 +413,15 @@ func testMultiClientConfirmationNotification(miner *rpctest.Harness,
}
// Tests the case in which a confirmation notification is requested for a
// transaction that has already been included in a block. In this case,
// the confirmation notification should be dispatched immediately.
// transaction that has already been included in a block. In this case, the
// confirmation notification should be dispatched immediately.
func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
t.Logf("testing transaction confirmed before notification registration")
// First, let's send some coins to "ourself", obtainig a txid.
// We're spending from a coinbase output here, so we use the dedicated
// First, let's send some coins to "ourself", obtaining a txid. We're
// spending from a coinbase output here, so we use the dedicated
// function.
txid, err := getTestTxId(miner)
@ -429,10 +429,10 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness,
t.Fatalf("unable to create test tx: %v", err)
}
// Now generate one block. The notifier must check older blocks when the
// confirmation event is registered below to ensure that the TXID hasn't
// already been included in the chain, otherwise the notification will
// never be sent.
// Now generate one block. The notifier must check older blocks when
// the confirmation event is registered below to ensure that the TXID
// hasn't already been included in the chain, otherwise the
// notification will never be sent.
if _, err := miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate two blocks: %v", err)
}
@ -456,6 +456,48 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness,
case <-time.After(2 * time.Second):
t.Fatalf("confirmation notification never received")
}
// Next, we want to test fully dispatching the notification for a
// transaction that has been *partially* confirmed. So we'll create
// another test txid.
txid, err = getTestTxId(miner)
if err != nil {
t.Fatalf("unable to create test tx: %v", err)
}
// We'll request 6 confirmations for the above generated txid, but we
// will generate the confirmations in chunks.
numConfs = 6
// First, generate 2 confirmations.
if _, err := miner.Node.Generate(2); err != nil {
t.Fatalf("unable to generate blocks: %v", err)
}
// Next, register for the notification *after* the transition has
// already been partially confirmed.
confIntent, err = notifier.RegisterConfirmationsNtfn(txid, numConfs)
if err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// With the notification registered, generate another 4 blocks, this
// should dispatch the notification.
if _, err := miner.Node.Generate(4); err != nil {
t.Fatalf("unable to generate blocks: %v", err)
}
confSent = make(chan int32)
go func() {
confSent <- <-confIntent.Confirmed
}()
select {
case <-confSent:
break
case <-time.After(2 * time.Second):
t.Fatalf("confirmation notification never received")
}
}
// Tests the case in which a spend notification is requested for a spend that
@ -466,11 +508,10 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
t.Logf("testing spend broadcast before notification registration")
// We'd like to test the spend notifications for all
// ChainNotifier concrete implemenations.
// We'd like to test the spend notifications for all ChainNotifier
// concrete implementations.
//
// To do so, we first create a new output to our test target
// address.
// To do so, we first create a new output to our test target address.
txid, err := getTestTxId(miner)
if err != nil {
t.Fatalf("unable to create test addr: %v", err)
@ -488,8 +529,8 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
}
tx := wrappedTx.MsgTx()
// Locate the output index sent to us. We need this so we can
// construct a spending txn below.
// Locate the output index sent to us. We need this so we can construct
// a spending txn below.
outIndex := -1
var pkScript []byte
for i, txOut := range tx.TxOut {
@ -534,8 +575,9 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
t.Fatalf("unable to generate single block: %v", err)
}
// Now, we register to be notified of a spend that has already happened.
// The notifier should dispatch a spend notification immediately.
// Now, we register to be notified of a spend that has already
// happened. The notifier should dispatch a spend notification
// immediately.
spentIntent, err := notifier.RegisterSpendNtfn(outpoint)
if err != nil {
t.Fatalf("unable to register for spend ntfn: %v", err)