diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index ae83d220..2c0277eb 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -2,7 +2,6 @@ package btcdnotify import ( "container/heap" - "fmt" "sync" "sync/atomic" "time" @@ -35,8 +34,11 @@ type BtcdNotifier struct { // TODO(roasbeef): make map point to slices? Would allow for multiple // clients to listen for same spend. Would we ever need this? spendNotifications map[wire.OutPoint]*spendNotification - confNotifications map[wire.ShaHash]*confirmationsNotification - confHeap *confirmationHeap + + confNotifications map[wire.ShaHash]*confirmationsNotification + confHeap *confirmationHeap + + blockEpochClients []chan *chainntnfs.BlockEpoch connectedBlockHashes chan *blockNtfn disconnectedBlockHashes chan *blockNtfn @@ -173,28 +175,39 @@ out: case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { case *spendNotification: + chainntnfs.Log.Infof("New spend subscription: "+ + "utxo=%v", msg.targetOutpoint) b.spendNotifications[*msg.targetOutpoint] = msg case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", *msg.txid, msg.numConfirmations) b.confNotifications[*msg.txid] = msg + case *blockEpochRegistration: + chainntnfs.Log.Infof("New block epoch subscription") + b.blockEpochClients = append(b.blockEpochClients, + msg.epochChan) } case staleBlockHash := <-b.disconnectedBlockHashes: // TODO(roasbeef): re-orgs // * second channel to notify of confirmation decrementing // re-org? // * notify of negative confirmations - fmt.Println(staleBlockHash) + chainntnfs.Log.Warnf("Block disconnected from main "+ + "chain: %v", staleBlockHash) case connectedBlock := <-b.connectedBlockHashes: newBlock, err := b.chainConn.GetBlock(connectedBlock.sha) if err != nil { + chainntnfs.Log.Errorf("Unable to get block: %v", err) continue } chainntnfs.Log.Infof("New block: height=%v, sha=%v", connectedBlock.height, connectedBlock.sha) + go b.notifyBlockEpochs(connectedBlock.height, + connectedBlock.sha) + newHeight := connectedBlock.height for _, tx := range newBlock.Transactions() { // Check if the inclusion of this transaction @@ -243,6 +256,25 @@ out: b.wg.Done() } +// notifyBlockEpochs notifies all registered block epoch clients of the newly +// connected block to the main chain. +func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *wire.ShaHash) { + epoch := &chainntnfs.BlockEpoch{ + Height: newHeight, + Hash: newSha, + } + + // TODO(roasbeef): spwan a new goroutine for each client instead? + for _, epochChan := range b.blockEpochClients { + // Attempt a non-blocking send. If the buffered channel is + // full, then we no-op and move onto the next client. + select { + case epochChan <- epoch: + default: + } + } +} + // notifyConfs examines the current confirmation heap, sending off any // notifications which have been triggered by the connection of a new block at // newBlockHeight. @@ -369,10 +401,23 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash, }, nil } +// blockEpochRegistration represents a client's intent to receive a +// notification with each newly connected block. +type blockEpochRegistration struct { + epochChan chan *chainntnfs.BlockEpoch +} + // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notificationsm, of each new block connected to the main // chain. -func (b *BtcdNotifier) RegisterBlockEpochNtfn(targetHeight int32) (*chainntnfs.BlockEpochEvent, error) { - // TODO(roasbeef): implement - return nil, nil +func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { + registration := &blockEpochRegistration{ + epochChan: make(chan *chainntnfs.BlockEpoch, 20), + } + + b.notificationRegistry <- registration + + return &chainntnfs.BlockEpochEvent{ + Epochs: registration.epochChan, + }, nil } diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index 58d54d6a..dd707e2f 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -15,6 +15,8 @@ import ( // // Concrete implementations of ChainNotifier should be able to support multiple // concurrent client requests, as well as multiple concurrent notification events. +// TODO(roasbeef): all events should have a Cancel() method to free up the +// resource type ChainNotifier interface { // RegisterConfirmationsNtfn registers an intent to be notified once // txid reaches numConfs confirmations. The returned ConfirmationEvent @@ -36,7 +38,7 @@ type ChainNotifier interface { // new block connected to the tip of the main chain. The returned // BlockEpochEvent struct contains a channel which will be sent upon // for each new block discovered. - RegisterBlockEpochNtfn(targetHeight int32) (*BlockEpochEvent, error) + RegisterBlockEpochNtfn() (*BlockEpochEvent, error) // Start the ChainNotifier. Once started, the implementation should be // ready, and able to receive notification registrations from clients. @@ -49,10 +51,6 @@ type ChainNotifier interface { Stop() error } -// TODO(roasbeef): ln channels should request spend ntfns for counterparty's -// inputs to funding tx also, consider channel closed if funding tx re-org'd -// out and inputs double spent. - // TODO(roasbeef): all chans should be receive only. // ConfirmationEvent encapsulates a confirmation notification. With this struct, diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index e5febffc..ad29aa78 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -2,6 +2,8 @@ package chainntnfs_test import ( "bytes" + "log" + "sync" "testing" "time" @@ -281,11 +283,60 @@ func testSpendNotification(miner *rpctest.Harness, } } +func testBlockEpochNotification(miner *rpctest.Harness, + notifier chainntnfs.ChainNotifier, t *testing.T) { + + // We'd like to test the case of multiple registered clients receiving + // block epoch notifications. + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + // Create numClients clients which will listen for block notifications. We + // expect each client to receive 10 notifications for each of the ten + // blocks we generate below. So we'll use a WaitGroup to synchronize the + // test. + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn() + if err != nil { + t.Fatalf("unable to register for epoch notification") + } + + wg.Add(numBlocks) + go func() { + for i := 0; i < numBlocks; i++ { + <-epochClient.Epochs + wg.Done() + } + }() + } + + epochsSent := make(chan struct{}) + go func() { + wg.Wait() + close(epochsSent) + }() + + // Now generate 10 blocks, the clients above should each receive 10 + // notifications, thereby unblocking the goroutine above. + if _, err := miner.Node.Generate(numBlocks); err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + + select { + case <-epochsSent: + case <-time.After(2 * time.Second): + t.Fatalf("all notifications not sent") + } +} + var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){ testSingleConfirmationNotification, testMultiConfirmationNotification, testBatchConfirmationNotification, testSpendNotification, + testBlockEpochNotification, } // TestInterfaces tests all registered interfaces with a unified set of tests @@ -315,6 +366,7 @@ func TestInterfaces(t *testing.T) { rpcConfig := miner.RPCConfig() + log.Printf("Running %v ChainNotifier interface tests\n", len(ntfnTests)) var notifier chainntnfs.ChainNotifier for _, notifierDriver := range chainntnfs.RegisteredNotifiers() { notifierType := notifierDriver.NotifierType