chainntnfs: dispatch block notification for current tip upon registration
This commit is contained in:
parent
568f8271a0
commit
26d53c64f1
@ -278,22 +278,40 @@ out:
|
||||
|
||||
case *blockEpochRegistration:
|
||||
chainntnfs.Log.Infof("New block epoch subscription")
|
||||
|
||||
b.blockEpochClients[msg.epochID] = msg
|
||||
if msg.bestBlock != nil {
|
||||
missedBlocks, err :=
|
||||
chainntnfs.GetClientMissedBlocks(
|
||||
b.chainConn, msg.bestBlock,
|
||||
b.bestBlock.Height, true,
|
||||
)
|
||||
if err != nil {
|
||||
msg.errorChan <- err
|
||||
continue
|
||||
}
|
||||
for _, block := range missedBlocks {
|
||||
b.notifyBlockEpochClient(msg,
|
||||
block.Height, block.Hash)
|
||||
}
|
||||
|
||||
// If the client did not provide their best
|
||||
// known block, then we'll immediately dispatch
|
||||
// a notification for the current tip.
|
||||
if msg.bestBlock == nil {
|
||||
b.notifyBlockEpochClient(
|
||||
msg, b.bestBlock.Height,
|
||||
b.bestBlock.Hash,
|
||||
)
|
||||
|
||||
msg.errorChan <- nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Otherwise, we'll attempt to deliver the
|
||||
// backlog of notifications from their best
|
||||
// known block.
|
||||
missedBlocks, err := chainntnfs.GetClientMissedBlocks(
|
||||
b.chainConn, msg.bestBlock,
|
||||
b.bestBlock.Height, true,
|
||||
)
|
||||
if err != nil {
|
||||
msg.errorChan <- err
|
||||
continue
|
||||
}
|
||||
|
||||
for _, block := range missedBlocks {
|
||||
b.notifyBlockEpochClient(
|
||||
msg, block.Height, block.Hash,
|
||||
)
|
||||
}
|
||||
|
||||
msg.errorChan <- nil
|
||||
}
|
||||
|
||||
@ -951,7 +969,9 @@ type epochCancel struct {
|
||||
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
|
||||
// caller to receive notifications, of each new block connected to the main
|
||||
// chain. Clients have the option of passing in their best known block, which
|
||||
// the notifier uses to check if they are behind on blocks and catch them up.
|
||||
// the notifier uses to check if they are behind on blocks and catch them up. If
|
||||
// they do not provide one, then a notification will be dispatched immediately
|
||||
// for the current tip of the chain upon a successful registration.
|
||||
func (b *BitcoindNotifier) RegisterBlockEpochNtfn(
|
||||
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
|
||||
|
||||
|
@ -325,23 +325,40 @@ out:
|
||||
|
||||
case *blockEpochRegistration:
|
||||
chainntnfs.Log.Infof("New block epoch subscription")
|
||||
b.blockEpochClients[msg.epochID] = msg
|
||||
if msg.bestBlock != nil {
|
||||
missedBlocks, err :=
|
||||
chainntnfs.GetClientMissedBlocks(
|
||||
b.chainConn, msg.bestBlock,
|
||||
b.bestBlock.Height, true,
|
||||
)
|
||||
if err != nil {
|
||||
msg.errorChan <- err
|
||||
continue
|
||||
}
|
||||
for _, block := range missedBlocks {
|
||||
b.notifyBlockEpochClient(msg,
|
||||
block.Height, block.Hash)
|
||||
}
|
||||
|
||||
b.blockEpochClients[msg.epochID] = msg
|
||||
|
||||
// If the client did not provide their best
|
||||
// known block, then we'll immediately dispatch
|
||||
// a notification for the current tip.
|
||||
if msg.bestBlock == nil {
|
||||
b.notifyBlockEpochClient(
|
||||
msg, b.bestBlock.Height,
|
||||
b.bestBlock.Hash,
|
||||
)
|
||||
|
||||
msg.errorChan <- nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Otherwise, we'll attempt to deliver the
|
||||
// backlog of notifications from their best
|
||||
// known block.
|
||||
missedBlocks, err := chainntnfs.GetClientMissedBlocks(
|
||||
b.chainConn, msg.bestBlock,
|
||||
b.bestBlock.Height, true,
|
||||
)
|
||||
if err != nil {
|
||||
msg.errorChan <- err
|
||||
continue
|
||||
}
|
||||
|
||||
for _, block := range missedBlocks {
|
||||
b.notifyBlockEpochClient(
|
||||
msg, block.Height, block.Hash,
|
||||
)
|
||||
}
|
||||
|
||||
msg.errorChan <- nil
|
||||
}
|
||||
|
||||
@ -991,7 +1008,9 @@ type epochCancel struct {
|
||||
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
|
||||
// caller to receive notifications, of each new block connected to the main
|
||||
// chain. Clients have the option of passing in their best known block, which
|
||||
// the notifier uses to check if they are behind on blocks and catch them up.
|
||||
// the notifier uses to check if they are behind on blocks and catch them up. If
|
||||
// they do not provide one, then a notification will be dispatched immediately
|
||||
// for the current tip of the chain upon a successful registration.
|
||||
func (b *BtcdNotifier) RegisterBlockEpochNtfn(
|
||||
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
|
||||
|
||||
@ -1003,6 +1022,7 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn(
|
||||
bestBlock: bestBlock,
|
||||
errorChan: make(chan error, 1),
|
||||
}
|
||||
|
||||
reg.epochQueue.Start()
|
||||
|
||||
// Before we send the request to the main goroutine, we'll launch a new
|
||||
|
@ -127,7 +127,9 @@ type ChainNotifier interface {
|
||||
// Clients have the option of passing in their best known block.
|
||||
// If they specify a block, the ChainNotifier checks whether the client
|
||||
// is behind on blocks. If they are, the ChainNotifier sends a backlog
|
||||
// of block notifications for the missed blocks.
|
||||
// of block notifications for the missed blocks. If they do not provide
|
||||
// one, then a notification will be dispatched immediately for the
|
||||
// current tip of the chain upon a successful registration.
|
||||
RegisterBlockEpochNtfn(*BlockEpoch) (*BlockEpochEvent, error)
|
||||
|
||||
// Start the ChainNotifier. Once started, the implementation should be
|
||||
|
@ -398,22 +398,23 @@ func testBlockEpochNotification(miner *rpctest.Harness,
|
||||
// block epoch notifications.
|
||||
|
||||
const numBlocks = 10
|
||||
const numNtfns = numBlocks + 1
|
||||
const numClients = 5
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Create numClients clients which will listen for block notifications. We
|
||||
// expect each client to receive 10 notifications for each of the ten
|
||||
// blocks we generate below. So we'll use a WaitGroup to synchronize the
|
||||
// test.
|
||||
// expect each client to receive 11 notifications, one for the current
|
||||
// tip of the chain, and one for each of the ten blocks we generate
|
||||
// below. So we'll use a WaitGroup to synchronize the test.
|
||||
for i := 0; i < numClients; i++ {
|
||||
epochClient, err := notifier.RegisterBlockEpochNtfn(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to register for epoch notification")
|
||||
}
|
||||
|
||||
wg.Add(numBlocks)
|
||||
wg.Add(numNtfns)
|
||||
go func() {
|
||||
for i := 0; i < numBlocks; i++ {
|
||||
for i := 0; i < numNtfns; i++ {
|
||||
<-epochClient.Epochs
|
||||
wg.Done()
|
||||
}
|
||||
@ -1494,6 +1495,16 @@ func testCatchUpOnMissedBlocks(miner *rpctest.Harness,
|
||||
if err != nil {
|
||||
t.Fatalf("unable to register for epoch notification: %v", err)
|
||||
}
|
||||
|
||||
// Drain the notification dispatched upon registration as we're
|
||||
// not interested in it.
|
||||
select {
|
||||
case <-epochClient.Epochs:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("expected to receive epoch for current block " +
|
||||
"upon registration")
|
||||
}
|
||||
|
||||
clients = append(clients, epochClient)
|
||||
}
|
||||
|
||||
@ -1669,6 +1680,16 @@ func testCatchUpOnMissedBlocksWithReorg(miner1 *rpctest.Harness,
|
||||
if err != nil {
|
||||
t.Fatalf("unable to register for epoch notification: %v", err)
|
||||
}
|
||||
|
||||
// Drain the notification dispatched upon registration as we're
|
||||
// not interested in it.
|
||||
select {
|
||||
case <-epochClient.Epochs:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("expected to receive epoch for current block " +
|
||||
"upon registration")
|
||||
}
|
||||
|
||||
clients = append(clients, epochClient)
|
||||
}
|
||||
|
||||
|
@ -334,25 +334,44 @@ out:
|
||||
|
||||
case *blockEpochRegistration:
|
||||
chainntnfs.Log.Infof("New block epoch subscription")
|
||||
|
||||
n.blockEpochClients[msg.epochID] = msg
|
||||
if msg.bestBlock != nil {
|
||||
n.bestBlockMtx.Lock()
|
||||
bestHeight := n.bestBlock.Height
|
||||
n.bestBlockMtx.Unlock()
|
||||
missedBlocks, err :=
|
||||
chainntnfs.GetClientMissedBlocks(
|
||||
n.chainConn, msg.bestBlock,
|
||||
bestHeight, false,
|
||||
)
|
||||
if err != nil {
|
||||
msg.errorChan <- err
|
||||
continue
|
||||
}
|
||||
for _, block := range missedBlocks {
|
||||
n.notifyBlockEpochClient(msg,
|
||||
block.Height, block.Hash)
|
||||
}
|
||||
|
||||
// If the client did not provide their best
|
||||
// known block, then we'll immediately dispatch
|
||||
// a notification for the current tip.
|
||||
if msg.bestBlock == nil {
|
||||
n.notifyBlockEpochClient(
|
||||
msg, n.bestBlock.Height,
|
||||
n.bestBlock.Hash,
|
||||
)
|
||||
|
||||
msg.errorChan <- nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Otherwise, we'll attempt to deliver the
|
||||
// backlog of notifications from their best
|
||||
// known block.
|
||||
n.bestBlockMtx.Lock()
|
||||
bestHeight := n.bestBlock.Height
|
||||
n.bestBlockMtx.Unlock()
|
||||
|
||||
missedBlocks, err := chainntnfs.GetClientMissedBlocks(
|
||||
n.chainConn, msg.bestBlock, bestHeight,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
msg.errorChan <- err
|
||||
continue
|
||||
}
|
||||
|
||||
for _, block := range missedBlocks {
|
||||
n.notifyBlockEpochClient(
|
||||
msg, block.Height, block.Hash,
|
||||
)
|
||||
}
|
||||
|
||||
msg.errorChan <- nil
|
||||
|
||||
case *rescanFilterUpdate:
|
||||
@ -898,7 +917,9 @@ type epochCancel struct {
|
||||
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
|
||||
// caller to receive notifications, of each new block connected to the main
|
||||
// chain. Clients have the option of passing in their best known block, which
|
||||
// the notifier uses to check if they are behind on blocks and catch them up.
|
||||
// the notifier uses to check if they are behind on blocks and catch them up. If
|
||||
// they do not provide one, then a notification will be dispatched immediately
|
||||
// for the current tip of the chain upon a successful registration.
|
||||
func (n *NeutrinoNotifier) RegisterBlockEpochNtfn(
|
||||
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user