diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 0d422616..15f52250 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -278,22 +278,40 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") + b.blockEpochClients[msg.epochID] = msg - if msg.bestBlock != nil { - missedBlocks, err := - chainntnfs.GetClientMissedBlocks( - b.chainConn, msg.bestBlock, - b.bestBlock.Height, true, - ) - if err != nil { - msg.errorChan <- err - continue - } - for _, block := range missedBlocks { - b.notifyBlockEpochClient(msg, - block.Height, block.Hash) - } + + // If the client did not provide their best + // known block, then we'll immediately dispatch + // a notification for the current tip. + if msg.bestBlock == nil { + b.notifyBlockEpochClient( + msg, b.bestBlock.Height, + b.bestBlock.Hash, + ) + + msg.errorChan <- nil + continue } + + // Otherwise, we'll attempt to deliver the + // backlog of notifications from their best + // known block. + missedBlocks, err := chainntnfs.GetClientMissedBlocks( + b.chainConn, msg.bestBlock, + b.bestBlock.Height, true, + ) + if err != nil { + msg.errorChan <- err + continue + } + + for _, block := range missedBlocks { + b.notifyBlockEpochClient( + msg, block.Height, block.Hash, + ) + } + msg.errorChan <- nil } @@ -951,7 +969,9 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main // chain. Clients have the option of passing in their best known block, which -// the notifier uses to check if they are behind on blocks and catch them up. +// the notifier uses to check if they are behind on blocks and catch them up. If +// they do not provide one, then a notification will be dispatched immediately +// for the current tip of the chain upon a successful registration. func (b *BitcoindNotifier) RegisterBlockEpochNtfn( bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index d4979914..b31fcf6b 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -325,23 +325,40 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") - b.blockEpochClients[msg.epochID] = msg - if msg.bestBlock != nil { - missedBlocks, err := - chainntnfs.GetClientMissedBlocks( - b.chainConn, msg.bestBlock, - b.bestBlock.Height, true, - ) - if err != nil { - msg.errorChan <- err - continue - } - for _, block := range missedBlocks { - b.notifyBlockEpochClient(msg, - block.Height, block.Hash) - } + b.blockEpochClients[msg.epochID] = msg + + // If the client did not provide their best + // known block, then we'll immediately dispatch + // a notification for the current tip. + if msg.bestBlock == nil { + b.notifyBlockEpochClient( + msg, b.bestBlock.Height, + b.bestBlock.Hash, + ) + + msg.errorChan <- nil + continue } + + // Otherwise, we'll attempt to deliver the + // backlog of notifications from their best + // known block. + missedBlocks, err := chainntnfs.GetClientMissedBlocks( + b.chainConn, msg.bestBlock, + b.bestBlock.Height, true, + ) + if err != nil { + msg.errorChan <- err + continue + } + + for _, block := range missedBlocks { + b.notifyBlockEpochClient( + msg, block.Height, block.Hash, + ) + } + msg.errorChan <- nil } @@ -991,7 +1008,9 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main // chain. Clients have the option of passing in their best known block, which -// the notifier uses to check if they are behind on blocks and catch them up. +// the notifier uses to check if they are behind on blocks and catch them up. If +// they do not provide one, then a notification will be dispatched immediately +// for the current tip of the chain upon a successful registration. func (b *BtcdNotifier) RegisterBlockEpochNtfn( bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { @@ -1003,6 +1022,7 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn( bestBlock: bestBlock, errorChan: make(chan error, 1), } + reg.epochQueue.Start() // Before we send the request to the main goroutine, we'll launch a new diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index f2de59f7..97a62193 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -127,7 +127,9 @@ type ChainNotifier interface { // Clients have the option of passing in their best known block. // If they specify a block, the ChainNotifier checks whether the client // is behind on blocks. If they are, the ChainNotifier sends a backlog - // of block notifications for the missed blocks. + // of block notifications for the missed blocks. If they do not provide + // one, then a notification will be dispatched immediately for the + // current tip of the chain upon a successful registration. RegisterBlockEpochNtfn(*BlockEpoch) (*BlockEpochEvent, error) // Start the ChainNotifier. Once started, the implementation should be diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 4d4e0807..9953fba7 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -398,22 +398,23 @@ func testBlockEpochNotification(miner *rpctest.Harness, // block epoch notifications. const numBlocks = 10 + const numNtfns = numBlocks + 1 const numClients = 5 var wg sync.WaitGroup // Create numClients clients which will listen for block notifications. We - // expect each client to receive 10 notifications for each of the ten - // blocks we generate below. So we'll use a WaitGroup to synchronize the - // test. + // expect each client to receive 11 notifications, one for the current + // tip of the chain, and one for each of the ten blocks we generate + // below. So we'll use a WaitGroup to synchronize the test. for i := 0; i < numClients; i++ { epochClient, err := notifier.RegisterBlockEpochNtfn(nil) if err != nil { t.Fatalf("unable to register for epoch notification") } - wg.Add(numBlocks) + wg.Add(numNtfns) go func() { - for i := 0; i < numBlocks; i++ { + for i := 0; i < numNtfns; i++ { <-epochClient.Epochs wg.Done() } @@ -1494,6 +1495,16 @@ func testCatchUpOnMissedBlocks(miner *rpctest.Harness, if err != nil { t.Fatalf("unable to register for epoch notification: %v", err) } + + // Drain the notification dispatched upon registration as we're + // not interested in it. + select { + case <-epochClient.Epochs: + case <-time.After(5 * time.Second): + t.Fatal("expected to receive epoch for current block " + + "upon registration") + } + clients = append(clients, epochClient) } @@ -1669,6 +1680,16 @@ func testCatchUpOnMissedBlocksWithReorg(miner1 *rpctest.Harness, if err != nil { t.Fatalf("unable to register for epoch notification: %v", err) } + + // Drain the notification dispatched upon registration as we're + // not interested in it. + select { + case <-epochClient.Epochs: + case <-time.After(5 * time.Second): + t.Fatal("expected to receive epoch for current block " + + "upon registration") + } + clients = append(clients, epochClient) } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index a2db0763..5ff7039e 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -334,25 +334,44 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") + n.blockEpochClients[msg.epochID] = msg - if msg.bestBlock != nil { - n.bestBlockMtx.Lock() - bestHeight := n.bestBlock.Height - n.bestBlockMtx.Unlock() - missedBlocks, err := - chainntnfs.GetClientMissedBlocks( - n.chainConn, msg.bestBlock, - bestHeight, false, - ) - if err != nil { - msg.errorChan <- err - continue - } - for _, block := range missedBlocks { - n.notifyBlockEpochClient(msg, - block.Height, block.Hash) - } + + // If the client did not provide their best + // known block, then we'll immediately dispatch + // a notification for the current tip. + if msg.bestBlock == nil { + n.notifyBlockEpochClient( + msg, n.bestBlock.Height, + n.bestBlock.Hash, + ) + + msg.errorChan <- nil + continue } + + // Otherwise, we'll attempt to deliver the + // backlog of notifications from their best + // known block. + n.bestBlockMtx.Lock() + bestHeight := n.bestBlock.Height + n.bestBlockMtx.Unlock() + + missedBlocks, err := chainntnfs.GetClientMissedBlocks( + n.chainConn, msg.bestBlock, bestHeight, + false, + ) + if err != nil { + msg.errorChan <- err + continue + } + + for _, block := range missedBlocks { + n.notifyBlockEpochClient( + msg, block.Height, block.Hash, + ) + } + msg.errorChan <- nil case *rescanFilterUpdate: @@ -898,7 +917,9 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main // chain. Clients have the option of passing in their best known block, which -// the notifier uses to check if they are behind on blocks and catch them up. +// the notifier uses to check if they are behind on blocks and catch them up. If +// they do not provide one, then a notification will be dispatched immediately +// for the current tip of the chain upon a successful registration. func (n *NeutrinoNotifier) RegisterBlockEpochNtfn( bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {