chainntnfs: add multi-client txid support for confirmation notifications

This commit adds multi-client support for confirmation notification of
the same transaction. Within the daemon there might be scenarios where
multiple goroutines are waiting for the same transaction to be
confirmed in order to properly fulfill their tasks. Previously if
multiple clients were registered for the same txid confirmation
notification, then only the client who registered last would receive
the notification.
This commit is contained in:
Olaoluwa Osuntokun 2016-09-09 11:25:12 -07:00
parent e858bb5ca2
commit 0873c4da76
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 94 additions and 28 deletions

@ -35,7 +35,7 @@ type BtcdNotifier struct {
// clients to listen for same spend. Would we ever need this? // clients to listen for same spend. Would we ever need this?
spendNotifications map[wire.OutPoint]*spendNotification spendNotifications map[wire.OutPoint]*spendNotification
confNotifications map[wire.ShaHash]*confirmationsNotification confNotifications map[wire.ShaHash][]*confirmationsNotification
confHeap *confirmationHeap confHeap *confirmationHeap
blockEpochClients []chan *chainntnfs.BlockEpoch blockEpochClients []chan *chainntnfs.BlockEpoch
@ -59,7 +59,7 @@ func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
notificationRegistry: make(chan interface{}), notificationRegistry: make(chan interface{}),
spendNotifications: make(map[wire.OutPoint]*spendNotification), spendNotifications: make(map[wire.OutPoint]*spendNotification),
confNotifications: make(map[wire.ShaHash]*confirmationsNotification), confNotifications: make(map[wire.ShaHash][]*confirmationsNotification),
confHeap: newConfirmationHeap(), confHeap: newConfirmationHeap(),
connectedBlockHashes: make(chan *blockNtfn, 20), connectedBlockHashes: make(chan *blockNtfn, 20),
@ -130,9 +130,11 @@ func (b *BtcdNotifier) Stop() error {
for _, spendClient := range b.spendNotifications { for _, spendClient := range b.spendNotifications {
close(spendClient.spendChan) close(spendClient.spendChan)
} }
for _, confClient := range b.confNotifications { for _, confClients := range b.confNotifications {
close(confClient.finConf) for _, confClient := range confClients {
close(confClient.negativeConf) close(confClient.finConf)
close(confClient.negativeConf)
}
} }
return nil return nil
@ -182,7 +184,8 @@ out:
chainntnfs.Log.Infof("New confirmations "+ chainntnfs.Log.Infof("New confirmations "+
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
*msg.txid, msg.numConfirmations) *msg.txid, msg.numConfirmations)
b.confNotifications[*msg.txid] = msg txid := *msg.txid
b.confNotifications[txid] = append(b.confNotifications[txid], msg)
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients = append(b.blockEpochClients, b.blockEpochClients = append(b.blockEpochClients,
@ -310,34 +313,44 @@ func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) {
// heap to be triggered at a later time. // heap to be triggered at a later time.
// TODO(roasbeef): perhaps lookup, then track by inputs instead? // TODO(roasbeef): perhaps lookup, then track by inputs instead?
func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, blockHeight int32) { func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, blockHeight int32) {
// If a confirmation notification has been registered // If a confirmation notification has been registered
// for this txid, then either trigger a notification // for this txid, then either trigger a notification
// event if only a single confirmation notification was // event if only a single confirmation notification was
// requested, or place the notification on the // requested, or place the notification on the
// confirmation heap for future usage. // confirmation heap for future usage.
if confNtfn, ok := b.confNotifications[*txSha]; ok { if confClients, ok := b.confNotifications[*txSha]; ok {
delete(b.confNotifications, *txSha) // Either all of the registered confirmations wtill be
if confNtfn.numConfirmations == 1 { // dispatched due to a single confirmation, or added to the
chainntnfs.Log.Infof("Dispatching single conf "+ // conf head. Therefor we unconditioanlly delete the registered
"notification, sha=%v, height=%v", txSha, // confirmations from the staging zone.
blockHeight) defer func() {
confNtfn.finConf <- blockHeight delete(b.confNotifications, *txSha)
return }()
}
// The registered notification requires more for _, confClient := range confClients {
// than one confirmation before triggering. So if confClient.numConfirmations == 1 {
// we create a heapConf entry for this notification. chainntnfs.Log.Infof("Dispatching single conf "+
// The heapConf allows us to easily keep track of "notification, sha=%v, height=%v", txSha,
// which notification(s) we should fire off with blockHeight)
// each incoming block. confClient.finConf <- blockHeight
confNtfn.initialConfirmHeight = uint32(blockHeight) continue
finalConfHeight := uint32(confNtfn.initialConfirmHeight + confNtfn.numConfirmations - 1) }
heapEntry := &confEntry{
confNtfn, // The registered notification requires more
finalConfHeight, // than one confirmation before triggering. So
// we create a heapConf entry for this notification.
// The heapConf allows us to easily keep track of
// which notification(s) we should fire off with
// each incoming block.
confClient.initialConfirmHeight = uint32(blockHeight)
finalConfHeight := uint32(confClient.initialConfirmHeight + confClient.numConfirmations - 1)
heapEntry := &confEntry{
confClient,
finalConfHeight,
}
heap.Push(b.confHeap, heapEntry)
} }
heap.Push(b.confHeap, heapEntry)
} }
} }

@ -2,6 +2,7 @@ package chainntnfs_test
import ( import (
"bytes" "bytes"
"fmt"
"log" "log"
"sync" "sync"
"testing" "testing"
@ -55,7 +56,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness,
txid, err := getTestTxId(miner) txid, err := getTestTxId(miner)
if err != nil { if err != nil {
t.Fatalf("unable to create test addr: %v", err) t.Fatalf("unable to create test tx: %v", err)
} }
// Now that we have a txid, register a confirmation notiication with // Now that we have a txid, register a confirmation notiication with
@ -331,10 +332,62 @@ func testBlockEpochNotification(miner *rpctest.Harness,
} }
} }
func testMultiClientConfirmationNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
// TODO(roasbeef): test various conf targets w/ same txid
// We'd like to test the case of a multiple clients registered to
// receive a confirmation notification for the same transaction.
txid, err := getTestTxId(miner)
if err != nil {
t.Fatalf("unable to create test tx: %v", err)
}
var wg sync.WaitGroup
const numConfsClients = 5
const numConfs = 1
// Register for a conf notification for the above generated txid with
// numConfsClients distinct clients.
for i := 0; i < numConfsClients; i++ {
confClient, err := notifier.RegisterConfirmationsNtfn(txid, numConfs)
if err != nil {
t.Fatalf("unable to register for confirmation: %v", err)
}
wg.Add(1)
go func() {
<-confClient.Confirmed
fmt.Println(i)
wg.Done()
}()
}
confsSent := make(chan struct{})
go func() {
wg.Wait()
close(confsSent)
}()
// Finally, generate a single block which should trigger the unblocking
// of all numConfsClients blocked on the channel read above.
if _, err := miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate block: %v", err)
}
select {
case <-confsSent:
case <-time.After(2 * time.Second):
t.Fatalf("all confirmation notifications not sent")
}
}
var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){ var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){
testSingleConfirmationNotification, testSingleConfirmationNotification,
testMultiConfirmationNotification, testMultiConfirmationNotification,
testBatchConfirmationNotification, testBatchConfirmationNotification,
testMultiClientConfirmationNotification,
testSpendNotification, testSpendNotification,
testBlockEpochNotification, testBlockEpochNotification,
} }