chainntnfs: track best block in btcd and bitcoind

This commit is contained in:
Valentine Wallace 2018-08-09 00:05:28 -07:00
parent 1ffc3bb82e
commit d4cf271526
No known key found for this signature in database
GPG Key ID: B0E55E8D1776A58D
2 changed files with 28 additions and 20 deletions

@ -74,6 +74,8 @@ type BitcoindNotifier struct {
blockEpochClients map[uint64]*blockEpochRegistration blockEpochClients map[uint64]*blockEpochRegistration
bestBlock chainntnfs.BlockEpoch
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
} }
@ -119,7 +121,7 @@ func (b *BitcoindNotifier) Start() error {
return err return err
} }
_, currentHeight, err := b.chainConn.GetBestBlock() currentHash, currentHeight, err := b.chainConn.GetBestBlock()
if err != nil { if err != nil {
return err return err
} }
@ -127,8 +129,13 @@ func (b *BitcoindNotifier) Start() error {
b.txConfNotifier = chainntnfs.NewTxConfNotifier( b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit) uint32(currentHeight), reorgSafetyLimit)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
Hash: currentHash,
}
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher(currentHeight) go b.notificationDispatcher()
return nil return nil
} }
@ -174,7 +181,7 @@ type blockNtfn struct {
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches. // notification registrations, as well as notification dispatches.
func (b *BitcoindNotifier) notificationDispatcher(bestHeight int32) { func (b *BitcoindNotifier) notificationDispatcher() {
out: out:
for { for {
select { select {
@ -235,7 +242,7 @@ out:
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations) msg.TxID, msg.NumConfirmations)
currentHeight := uint32(bestHeight) currentHeight := uint32(b.bestBlock.Height)
// Look up whether the transaction is already // Look up whether the transaction is already
// included in the active chain. We'll do this // included in the active chain. We'll do this
@ -270,19 +277,18 @@ out:
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
case chain.RelevantTx: case chain.RelevantTx:
b.handleRelevantTx(msg, bestHeight) b.handleRelevantTx(msg, b.bestBlock.Height)
} }
case ntfn := <-b.chainConn.Notifications(): case ntfn := <-b.chainConn.Notifications():
switch item := ntfn.(type) { switch item := ntfn.(type) {
case chain.BlockConnected: case chain.BlockConnected:
if item.Height != bestHeight+1 { if item.Height != b.bestBlock.Height+1 {
chainntnfs.Log.Warnf("Received blocks out of order: "+ chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, new height=%d", "current height=%d, new height=%d",
bestHeight, item.Height) bestHeight, item.Height)
continue continue
} }
bestHeight = item.Height
rawBlock, err := b.chainConn.GetBlock(&item.Hash) rawBlock, err := b.chainConn.GetBlock(&item.Hash)
if err != nil { if err != nil {
@ -304,14 +310,13 @@ out:
continue continue
case chain.BlockDisconnected: case chain.BlockDisconnected:
if item.Height != bestHeight { if item.Height != b.bestBlock.Height {
chainntnfs.Log.Warnf("Received blocks "+ chainntnfs.Log.Warnf("Received blocks "+
"out of order: current height="+ "out of order: current height="+
"%d, disconnected height=%d", "%d, disconnected height=%d",
bestHeight, item.Height) bestHeight, item.Height)
continue continue
} }
bestHeight = item.Height - 1
chainntnfs.Log.Infof("Block disconnected from "+ chainntnfs.Log.Infof("Block disconnected from "+
"main chain: height=%v, sha=%v", "main chain: height=%v, sha=%v",
@ -324,7 +329,7 @@ out:
} }
case chain.RelevantTx: case chain.RelevantTx:
b.handleRelevantTx(item, bestHeight) b.handleRelevantTx(item, b.bestBlock.Height)
} }
case <-b.quit: case <-b.quit:

@ -78,6 +78,8 @@ type BtcdNotifier struct {
blockEpochClients map[uint64]*blockEpochRegistration blockEpochClients map[uint64]*blockEpochRegistration
bestBlock chainntnfs.BlockEpoch
chainUpdates *chainntnfs.ConcurrentQueue chainUpdates *chainntnfs.ConcurrentQueue
txUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue
@ -142,7 +144,7 @@ func (b *BtcdNotifier) Start() error {
return err return err
} }
_, currentHeight, err := b.chainConn.GetBestBlock() currentHash, currentHeight, err := b.chainConn.GetBestBlock()
if err != nil { if err != nil {
return err return err
} }
@ -150,11 +152,16 @@ func (b *BtcdNotifier) Start() error {
b.txConfNotifier = chainntnfs.NewTxConfNotifier( b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit) uint32(currentHeight), reorgSafetyLimit)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
Hash: currentHash,
}
b.chainUpdates.Start() b.chainUpdates.Start()
b.txUpdates.Start() b.txUpdates.Start()
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher(currentHeight) go b.notificationDispatcher()
return nil return nil
} }
@ -244,7 +251,7 @@ func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetai
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches. // notification registrations, as well as notification dispatches.
func (b *BtcdNotifier) notificationDispatcher(currentHeight int32) { func (b *BtcdNotifier) notificationDispatcher() {
out: out:
for { for {
select { select {
@ -304,7 +311,7 @@ out:
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations) msg.TxID, msg.NumConfirmations)
bestHeight := uint32(currentHeight) bestHeight := uint32(b.bestBlock.Height)
// Look up whether the transaction is already // Look up whether the transaction is already
// included in the active chain. We'll do this // included in the active chain. We'll do this
@ -342,15 +349,13 @@ out:
case item := <-b.chainUpdates.ChanOut(): case item := <-b.chainUpdates.ChanOut():
update := item.(*chainUpdate) update := item.(*chainUpdate)
if update.connect { if update.connect {
if update.blockHeight != currentHeight+1 { if update.blockHeight != b.bestBlock.Height+1 {
chainntnfs.Log.Warnf("Received blocks out of order: "+ chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, new height=%d", "current height=%d, new height=%d",
currentHeight, update.blockHeight) currentHeight, update.blockHeight)
continue continue
} }
currentHeight = update.blockHeight
rawBlock, err := b.chainConn.GetBlock(update.blockHash) rawBlock, err := b.chainConn.GetBlock(update.blockHash)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err) chainntnfs.Log.Errorf("Unable to get block: %v", err)
@ -374,15 +379,13 @@ out:
continue continue
} }
if update.blockHeight != currentHeight { if update.blockHeight != b.bestBlock.Height {
chainntnfs.Log.Warnf("Received blocks out of order: "+ chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, disconnected height=%d", "current height=%d, disconnected height=%d",
currentHeight, update.blockHeight) currentHeight, update.blockHeight)
continue continue
} }
currentHeight = update.blockHeight - 1
chainntnfs.Log.Infof("Block disconnected from main chain: "+ chainntnfs.Log.Infof("Block disconnected from main chain: "+
"height=%v, sha=%v", update.blockHeight, update.blockHash) "height=%v, sha=%v", update.blockHeight, update.blockHash)