From 0873c4da766fe22d415210ac03cefdb30ed129db Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 9 Sep 2016 11:25:12 -0700 Subject: [PATCH] chainntnfs: add multi-client txid support for confirmation notifications This commit adds multi-client support for confirmation notification of the same transaction. Within the daemon there might be scenarios where multiple goroutines are waiting for the same transaction to be confirmed in order to properly fulfill their tasks. Previously if multiple clients were registered for the same txid confirmation notification, then only the client who registered last would receive the notification. --- chainntnfs/btcdnotify/btcd.go | 67 +++++++++++++++++++++-------------- chainntnfs/interface_test.go | 55 +++++++++++++++++++++++++++- 2 files changed, 94 insertions(+), 28 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 2c0277eb..b129a087 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -35,7 +35,7 @@ type BtcdNotifier struct { // clients to listen for same spend. Would we ever need this? spendNotifications map[wire.OutPoint]*spendNotification - confNotifications map[wire.ShaHash]*confirmationsNotification + confNotifications map[wire.ShaHash][]*confirmationsNotification confHeap *confirmationHeap blockEpochClients []chan *chainntnfs.BlockEpoch @@ -59,7 +59,7 @@ func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notificationRegistry: make(chan interface{}), spendNotifications: make(map[wire.OutPoint]*spendNotification), - confNotifications: make(map[wire.ShaHash]*confirmationsNotification), + confNotifications: make(map[wire.ShaHash][]*confirmationsNotification), confHeap: newConfirmationHeap(), connectedBlockHashes: make(chan *blockNtfn, 20), @@ -130,9 +130,11 @@ func (b *BtcdNotifier) Stop() error { for _, spendClient := range b.spendNotifications { close(spendClient.spendChan) } - for _, confClient := range b.confNotifications { - close(confClient.finConf) - close(confClient.negativeConf) + for _, confClients := range b.confNotifications { + for _, confClient := range confClients { + close(confClient.finConf) + close(confClient.negativeConf) + } } return nil @@ -182,7 +184,8 @@ out: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", *msg.txid, msg.numConfirmations) - b.confNotifications[*msg.txid] = msg + txid := *msg.txid + b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients = append(b.blockEpochClients, @@ -310,34 +313,44 @@ func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) { // heap to be triggered at a later time. // TODO(roasbeef): perhaps lookup, then track by inputs instead? func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, blockHeight int32) { + // If a confirmation notification has been registered // for this txid, then either trigger a notification // event if only a single confirmation notification was // requested, or place the notification on the // confirmation heap for future usage. - if confNtfn, ok := b.confNotifications[*txSha]; ok { - delete(b.confNotifications, *txSha) - if confNtfn.numConfirmations == 1 { - chainntnfs.Log.Infof("Dispatching single conf "+ - "notification, sha=%v, height=%v", txSha, - blockHeight) - confNtfn.finConf <- blockHeight - return - } + if confClients, ok := b.confNotifications[*txSha]; ok { + // Either all of the registered confirmations wtill be + // dispatched due to a single confirmation, or added to the + // conf head. Therefor we unconditioanlly delete the registered + // confirmations from the staging zone. + defer func() { + delete(b.confNotifications, *txSha) + }() - // The registered notification requires more - // than one confirmation before triggering. So - // we create a heapConf entry for this notification. - // The heapConf allows us to easily keep track of - // which notification(s) we should fire off with - // each incoming block. - confNtfn.initialConfirmHeight = uint32(blockHeight) - finalConfHeight := uint32(confNtfn.initialConfirmHeight + confNtfn.numConfirmations - 1) - heapEntry := &confEntry{ - confNtfn, - finalConfHeight, + for _, confClient := range confClients { + if confClient.numConfirmations == 1 { + chainntnfs.Log.Infof("Dispatching single conf "+ + "notification, sha=%v, height=%v", txSha, + blockHeight) + confClient.finConf <- blockHeight + continue + } + + // The registered notification requires more + // than one confirmation before triggering. So + // we create a heapConf entry for this notification. + // The heapConf allows us to easily keep track of + // which notification(s) we should fire off with + // each incoming block. + confClient.initialConfirmHeight = uint32(blockHeight) + finalConfHeight := uint32(confClient.initialConfirmHeight + confClient.numConfirmations - 1) + heapEntry := &confEntry{ + confClient, + finalConfHeight, + } + heap.Push(b.confHeap, heapEntry) } - heap.Push(b.confHeap, heapEntry) } } diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index ad29aa78..9cb0db1e 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -2,6 +2,7 @@ package chainntnfs_test import ( "bytes" + "fmt" "log" "sync" "testing" @@ -55,7 +56,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, txid, err := getTestTxId(miner) if err != nil { - t.Fatalf("unable to create test addr: %v", err) + t.Fatalf("unable to create test tx: %v", err) } // Now that we have a txid, register a confirmation notiication with @@ -331,10 +332,62 @@ func testBlockEpochNotification(miner *rpctest.Harness, } } +func testMultiClientConfirmationNotification(miner *rpctest.Harness, + notifier chainntnfs.ChainNotifier, t *testing.T) { + // TODO(roasbeef): test various conf targets w/ same txid + + // We'd like to test the case of a multiple clients registered to + // receive a confirmation notification for the same transaction. + + txid, err := getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + + var wg sync.WaitGroup + const numConfsClients = 5 + const numConfs = 1 + + // Register for a conf notification for the above generated txid with + // numConfsClients distinct clients. + for i := 0; i < numConfsClients; i++ { + confClient, err := notifier.RegisterConfirmationsNtfn(txid, numConfs) + if err != nil { + t.Fatalf("unable to register for confirmation: %v", err) + } + + wg.Add(1) + go func() { + <-confClient.Confirmed + fmt.Println(i) + wg.Done() + }() + } + + confsSent := make(chan struct{}) + go func() { + wg.Wait() + close(confsSent) + }() + + // Finally, generate a single block which should trigger the unblocking + // of all numConfsClients blocked on the channel read above. + if _, err := miner.Node.Generate(1); err != nil { + t.Fatalf("unable to generate block: %v", err) + } + + select { + case <-confsSent: + case <-time.After(2 * time.Second): + t.Fatalf("all confirmation notifications not sent") + } +} + var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){ testSingleConfirmationNotification, testMultiConfirmationNotification, testBatchConfirmationNotification, + testMultiClientConfirmationNotification, testSpendNotification, testBlockEpochNotification, }