diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 2c0277eb..b129a087 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -35,7 +35,7 @@ type BtcdNotifier struct { // clients to listen for same spend. Would we ever need this? spendNotifications map[wire.OutPoint]*spendNotification - confNotifications map[wire.ShaHash]*confirmationsNotification + confNotifications map[wire.ShaHash][]*confirmationsNotification confHeap *confirmationHeap blockEpochClients []chan *chainntnfs.BlockEpoch @@ -59,7 +59,7 @@ func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notificationRegistry: make(chan interface{}), spendNotifications: make(map[wire.OutPoint]*spendNotification), - confNotifications: make(map[wire.ShaHash]*confirmationsNotification), + confNotifications: make(map[wire.ShaHash][]*confirmationsNotification), confHeap: newConfirmationHeap(), connectedBlockHashes: make(chan *blockNtfn, 20), @@ -130,9 +130,11 @@ func (b *BtcdNotifier) Stop() error { for _, spendClient := range b.spendNotifications { close(spendClient.spendChan) } - for _, confClient := range b.confNotifications { - close(confClient.finConf) - close(confClient.negativeConf) + for _, confClients := range b.confNotifications { + for _, confClient := range confClients { + close(confClient.finConf) + close(confClient.negativeConf) + } } return nil @@ -182,7 +184,8 @@ out: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", *msg.txid, msg.numConfirmations) - b.confNotifications[*msg.txid] = msg + txid := *msg.txid + b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients = append(b.blockEpochClients, @@ -310,34 +313,44 @@ func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) { // heap to be triggered at a later time. // TODO(roasbeef): perhaps lookup, then track by inputs instead? func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, blockHeight int32) { + // If a confirmation notification has been registered // for this txid, then either trigger a notification // event if only a single confirmation notification was // requested, or place the notification on the // confirmation heap for future usage. - if confNtfn, ok := b.confNotifications[*txSha]; ok { - delete(b.confNotifications, *txSha) - if confNtfn.numConfirmations == 1 { - chainntnfs.Log.Infof("Dispatching single conf "+ - "notification, sha=%v, height=%v", txSha, - blockHeight) - confNtfn.finConf <- blockHeight - return - } + if confClients, ok := b.confNotifications[*txSha]; ok { + // Either all of the registered confirmations wtill be + // dispatched due to a single confirmation, or added to the + // conf head. Therefor we unconditioanlly delete the registered + // confirmations from the staging zone. + defer func() { + delete(b.confNotifications, *txSha) + }() - // The registered notification requires more - // than one confirmation before triggering. So - // we create a heapConf entry for this notification. - // The heapConf allows us to easily keep track of - // which notification(s) we should fire off with - // each incoming block. - confNtfn.initialConfirmHeight = uint32(blockHeight) - finalConfHeight := uint32(confNtfn.initialConfirmHeight + confNtfn.numConfirmations - 1) - heapEntry := &confEntry{ - confNtfn, - finalConfHeight, + for _, confClient := range confClients { + if confClient.numConfirmations == 1 { + chainntnfs.Log.Infof("Dispatching single conf "+ + "notification, sha=%v, height=%v", txSha, + blockHeight) + confClient.finConf <- blockHeight + continue + } + + // The registered notification requires more + // than one confirmation before triggering. So + // we create a heapConf entry for this notification. + // The heapConf allows us to easily keep track of + // which notification(s) we should fire off with + // each incoming block. + confClient.initialConfirmHeight = uint32(blockHeight) + finalConfHeight := uint32(confClient.initialConfirmHeight + confClient.numConfirmations - 1) + heapEntry := &confEntry{ + confClient, + finalConfHeight, + } + heap.Push(b.confHeap, heapEntry) } - heap.Push(b.confHeap, heapEntry) } } diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index ad29aa78..9cb0db1e 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -2,6 +2,7 @@ package chainntnfs_test import ( "bytes" + "fmt" "log" "sync" "testing" @@ -55,7 +56,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, txid, err := getTestTxId(miner) if err != nil { - t.Fatalf("unable to create test addr: %v", err) + t.Fatalf("unable to create test tx: %v", err) } // Now that we have a txid, register a confirmation notiication with @@ -331,10 +332,62 @@ func testBlockEpochNotification(miner *rpctest.Harness, } } +func testMultiClientConfirmationNotification(miner *rpctest.Harness, + notifier chainntnfs.ChainNotifier, t *testing.T) { + // TODO(roasbeef): test various conf targets w/ same txid + + // We'd like to test the case of a multiple clients registered to + // receive a confirmation notification for the same transaction. + + txid, err := getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + + var wg sync.WaitGroup + const numConfsClients = 5 + const numConfs = 1 + + // Register for a conf notification for the above generated txid with + // numConfsClients distinct clients. + for i := 0; i < numConfsClients; i++ { + confClient, err := notifier.RegisterConfirmationsNtfn(txid, numConfs) + if err != nil { + t.Fatalf("unable to register for confirmation: %v", err) + } + + wg.Add(1) + go func() { + <-confClient.Confirmed + fmt.Println(i) + wg.Done() + }() + } + + confsSent := make(chan struct{}) + go func() { + wg.Wait() + close(confsSent) + }() + + // Finally, generate a single block which should trigger the unblocking + // of all numConfsClients blocked on the channel read above. + if _, err := miner.Node.Generate(1); err != nil { + t.Fatalf("unable to generate block: %v", err) + } + + select { + case <-confsSent: + case <-time.After(2 * time.Second): + t.Fatalf("all confirmation notifications not sent") + } +} + var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){ testSingleConfirmationNotification, testMultiConfirmationNotification, testBatchConfirmationNotification, + testMultiClientConfirmationNotification, testSpendNotification, testBlockEpochNotification, }