chainntfns: implement RegisterBlockEpochNtfn for BtcdNotifier, add tests

This commit is contained in:
Olaoluwa Osuntokun 2016-09-08 11:27:07 -07:00
parent affd7793b3
commit e72c52288d
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 107 additions and 12 deletions

@ -2,7 +2,6 @@ package btcdnotify
import ( import (
"container/heap" "container/heap"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -35,9 +34,12 @@ type BtcdNotifier struct {
// TODO(roasbeef): make map point to slices? Would allow for multiple // TODO(roasbeef): make map point to slices? Would allow for multiple
// clients to listen for same spend. Would we ever need this? // clients to listen for same spend. Would we ever need this?
spendNotifications map[wire.OutPoint]*spendNotification spendNotifications map[wire.OutPoint]*spendNotification
confNotifications map[wire.ShaHash]*confirmationsNotification confNotifications map[wire.ShaHash]*confirmationsNotification
confHeap *confirmationHeap confHeap *confirmationHeap
blockEpochClients []chan *chainntnfs.BlockEpoch
connectedBlockHashes chan *blockNtfn connectedBlockHashes chan *blockNtfn
disconnectedBlockHashes chan *blockNtfn disconnectedBlockHashes chan *blockNtfn
relevantTxs chan *btcutil.Tx relevantTxs chan *btcutil.Tx
@ -173,28 +175,39 @@ out:
case registerMsg := <-b.notificationRegistry: case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) { switch msg := registerMsg.(type) {
case *spendNotification: case *spendNotification:
chainntnfs.Log.Infof("New spend subscription: "+
"utxo=%v", msg.targetOutpoint)
b.spendNotifications[*msg.targetOutpoint] = msg b.spendNotifications[*msg.targetOutpoint] = msg
case *confirmationsNotification: case *confirmationsNotification:
chainntnfs.Log.Infof("New confirmations "+ chainntnfs.Log.Infof("New confirmations "+
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
*msg.txid, msg.numConfirmations) *msg.txid, msg.numConfirmations)
b.confNotifications[*msg.txid] = msg b.confNotifications[*msg.txid] = msg
case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients = append(b.blockEpochClients,
msg.epochChan)
} }
case staleBlockHash := <-b.disconnectedBlockHashes: case staleBlockHash := <-b.disconnectedBlockHashes:
// TODO(roasbeef): re-orgs // TODO(roasbeef): re-orgs
// * second channel to notify of confirmation decrementing // * second channel to notify of confirmation decrementing
// re-org? // re-org?
// * notify of negative confirmations // * notify of negative confirmations
fmt.Println(staleBlockHash) chainntnfs.Log.Warnf("Block disconnected from main "+
"chain: %v", staleBlockHash)
case connectedBlock := <-b.connectedBlockHashes: case connectedBlock := <-b.connectedBlockHashes:
newBlock, err := b.chainConn.GetBlock(connectedBlock.sha) newBlock, err := b.chainConn.GetBlock(connectedBlock.sha)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err)
continue continue
} }
chainntnfs.Log.Infof("New block: height=%v, sha=%v", chainntnfs.Log.Infof("New block: height=%v, sha=%v",
connectedBlock.height, connectedBlock.sha) connectedBlock.height, connectedBlock.sha)
go b.notifyBlockEpochs(connectedBlock.height,
connectedBlock.sha)
newHeight := connectedBlock.height newHeight := connectedBlock.height
for _, tx := range newBlock.Transactions() { for _, tx := range newBlock.Transactions() {
// Check if the inclusion of this transaction // Check if the inclusion of this transaction
@ -243,6 +256,25 @@ out:
b.wg.Done() b.wg.Done()
} }
// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *wire.ShaHash) {
epoch := &chainntnfs.BlockEpoch{
Height: newHeight,
Hash: newSha,
}
// TODO(roasbeef): spwan a new goroutine for each client instead?
for _, epochChan := range b.blockEpochClients {
// Attempt a non-blocking send. If the buffered channel is
// full, then we no-op and move onto the next client.
select {
case epochChan <- epoch:
default:
}
}
}
// notifyConfs examines the current confirmation heap, sending off any // notifyConfs examines the current confirmation heap, sending off any
// notifications which have been triggered by the connection of a new block at // notifications which have been triggered by the connection of a new block at
// newBlockHeight. // newBlockHeight.
@ -369,10 +401,23 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash,
}, nil }, nil
} }
// blockEpochRegistration represents a client's intent to receive a
// notification with each newly connected block.
type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch
}
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
// caller to receive notificationsm, of each new block connected to the main // caller to receive notificationsm, of each new block connected to the main
// chain. // chain.
func (b *BtcdNotifier) RegisterBlockEpochNtfn(targetHeight int32) (*chainntnfs.BlockEpochEvent, error) { func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
// TODO(roasbeef): implement registration := &blockEpochRegistration{
return nil, nil epochChan: make(chan *chainntnfs.BlockEpoch, 20),
}
b.notificationRegistry <- registration
return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan,
}, nil
} }

@ -15,6 +15,8 @@ import (
// //
// Concrete implementations of ChainNotifier should be able to support multiple // Concrete implementations of ChainNotifier should be able to support multiple
// concurrent client requests, as well as multiple concurrent notification events. // concurrent client requests, as well as multiple concurrent notification events.
// TODO(roasbeef): all events should have a Cancel() method to free up the
// resource
type ChainNotifier interface { type ChainNotifier interface {
// RegisterConfirmationsNtfn registers an intent to be notified once // RegisterConfirmationsNtfn registers an intent to be notified once
// txid reaches numConfs confirmations. The returned ConfirmationEvent // txid reaches numConfs confirmations. The returned ConfirmationEvent
@ -36,7 +38,7 @@ type ChainNotifier interface {
// new block connected to the tip of the main chain. The returned // new block connected to the tip of the main chain. The returned
// BlockEpochEvent struct contains a channel which will be sent upon // BlockEpochEvent struct contains a channel which will be sent upon
// for each new block discovered. // for each new block discovered.
RegisterBlockEpochNtfn(targetHeight int32) (*BlockEpochEvent, error) RegisterBlockEpochNtfn() (*BlockEpochEvent, error)
// Start the ChainNotifier. Once started, the implementation should be // Start the ChainNotifier. Once started, the implementation should be
// ready, and able to receive notification registrations from clients. // ready, and able to receive notification registrations from clients.
@ -49,10 +51,6 @@ type ChainNotifier interface {
Stop() error Stop() error
} }
// TODO(roasbeef): ln channels should request spend ntfns for counterparty's
// inputs to funding tx also, consider channel closed if funding tx re-org'd
// out and inputs double spent.
// TODO(roasbeef): all chans should be receive only. // TODO(roasbeef): all chans should be receive only.
// ConfirmationEvent encapsulates a confirmation notification. With this struct, // ConfirmationEvent encapsulates a confirmation notification. With this struct,

@ -2,6 +2,8 @@ package chainntnfs_test
import ( import (
"bytes" "bytes"
"log"
"sync"
"testing" "testing"
"time" "time"
@ -281,11 +283,60 @@ func testSpendNotification(miner *rpctest.Harness,
} }
} }
func testBlockEpochNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
// We'd like to test the case of multiple registered clients receiving
// block epoch notifications.
const numBlocks = 10
const numClients = 5
var wg sync.WaitGroup
// Create numClients clients which will listen for block notifications. We
// expect each client to receive 10 notifications for each of the ten
// blocks we generate below. So we'll use a WaitGroup to synchronize the
// test.
for i := 0; i < numClients; i++ {
epochClient, err := notifier.RegisterBlockEpochNtfn()
if err != nil {
t.Fatalf("unable to register for epoch notification")
}
wg.Add(numBlocks)
go func() {
for i := 0; i < numBlocks; i++ {
<-epochClient.Epochs
wg.Done()
}
}()
}
epochsSent := make(chan struct{})
go func() {
wg.Wait()
close(epochsSent)
}()
// Now generate 10 blocks, the clients above should each receive 10
// notifications, thereby unblocking the goroutine above.
if _, err := miner.Node.Generate(numBlocks); err != nil {
t.Fatalf("unable to generate blocks: %v", err)
}
select {
case <-epochsSent:
case <-time.After(2 * time.Second):
t.Fatalf("all notifications not sent")
}
}
var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){ var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T){
testSingleConfirmationNotification, testSingleConfirmationNotification,
testMultiConfirmationNotification, testMultiConfirmationNotification,
testBatchConfirmationNotification, testBatchConfirmationNotification,
testSpendNotification, testSpendNotification,
testBlockEpochNotification,
} }
// TestInterfaces tests all registered interfaces with a unified set of tests // TestInterfaces tests all registered interfaces with a unified set of tests
@ -315,6 +366,7 @@ func TestInterfaces(t *testing.T) {
rpcConfig := miner.RPCConfig() rpcConfig := miner.RPCConfig()
log.Printf("Running %v ChainNotifier interface tests\n", len(ntfnTests))
var notifier chainntnfs.ChainNotifier var notifier chainntnfs.ChainNotifier
for _, notifierDriver := range chainntnfs.RegisteredNotifiers() { for _, notifierDriver := range chainntnfs.RegisteredNotifiers() {
notifierType := notifierDriver.NotifierType notifierType := notifierDriver.NotifierType