routing/chainview: implement staleBlocks for btcd view.

This commit moves btcd view away from using the deprecated
callbacks onBlockConnected/Disconnected, and instead use
onFilteredBlockConnected/disconnected.

This commit also implements the sending of disconnected blocks
over the staleBlocks channel. To send these blocks, the
blockEventQueue is used to ensure the ordering of blocks are
correctly kept.

It also changes the way filter updates are handled. Since we
now load the tx filter to the rpc server itself, we can call
RescanBlocks instead of manually filtering blocks. These
rescanned blocks are also added to the blockEventQueue,
ensuring the ordering is kept.
This commit is contained in:
Johan T. Halseth 2017-10-02 17:13:39 +02:00
parent e9c2f703d9
commit 082f012fcf
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -1,14 +1,17 @@
package chainview package chainview
import ( import (
"bytes"
"encoding/hex"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/roasbeef/btcd/btcjson"
"github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/rpcclient" "github.com/roasbeef/btcd/rpcclient"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
) )
// BtcdFilteredChainView is an implementation of the FilteredChainView // BtcdFilteredChainView is an implementation of the FilteredChainView
@ -17,34 +20,27 @@ type BtcdFilteredChainView struct {
started int32 started int32
stopped int32 stopped int32
// bestHash is the hash of the latest block in the main chain. // bestHeight is the height of the latest block added to the
bestHash chainhash.Hash // blockQueue from the onFilteredConnectedMethod. It is used to
// determine up to what height we would need to rescan in case
// bestHeight is the height of the latest block in the main chain. // of a filter update.
bestHeight int32 bestHeightMtx sync.Mutex
bestHeight uint32
btcdConn *rpcclient.Client btcdConn *rpcclient.Client
// newBlocks is the channel in which new filtered blocks are sent over. // blockEventQueue is the ordered queue used to keep the order
newBlocks chan *FilteredBlock // of connected and disconnected blocks sent to the reader of the
// chainView.
// staleBlocks is the channel in which blocks that have been blockQueue *blockEventQueue
// disconnected from the mainchain are sent over.
staleBlocks chan *FilteredBlock
// filterUpdates is a channel in which updates to the utxo filter // filterUpdates is a channel in which updates to the utxo filter
// attached to this instance are sent over. // attached to this instance are sent over.
filterUpdates chan filterUpdate filterUpdates chan filterUpdate
// The three field below are used to implement a synchronized queue
// that lets use instantly handle sent notifications without blocking
// the main websockets notification loop.
chainUpdates []*chainUpdate
chainUpdateSignal chan struct{}
chainUpdateMtx sync.Mutex
// chainFilter is the set of utox's that we're currently watching // chainFilter is the set of utox's that we're currently watching
// spends for within the chain. // spends for within the chain.
filterMtx sync.RWMutex
chainFilter map[wire.OutPoint]struct{} chainFilter map[wire.OutPoint]struct{}
// filterBlockReqs is a channel in which requests to filter select // filterBlockReqs is a channel in which requests to filter select
@ -63,18 +59,15 @@ var _ FilteredChainView = (*BtcdFilteredChainView)(nil)
// RPC credentials for an active btcd instance. // RPC credentials for an active btcd instance.
func NewBtcdFilteredChainView(config rpcclient.ConnConfig) (*BtcdFilteredChainView, error) { func NewBtcdFilteredChainView(config rpcclient.ConnConfig) (*BtcdFilteredChainView, error) {
chainView := &BtcdFilteredChainView{ chainView := &BtcdFilteredChainView{
newBlocks: make(chan *FilteredBlock), chainFilter: make(map[wire.OutPoint]struct{}),
staleBlocks: make(chan *FilteredBlock), filterUpdates: make(chan filterUpdate),
chainUpdateSignal: make(chan struct{}), filterBlockReqs: make(chan *filterBlockReq),
chainFilter: make(map[wire.OutPoint]struct{}), quit: make(chan struct{}),
filterUpdates: make(chan filterUpdate),
filterBlockReqs: make(chan *filterBlockReq),
quit: make(chan struct{}),
} }
ntfnCallbacks := &rpcclient.NotificationHandlers{ ntfnCallbacks := &rpcclient.NotificationHandlers{
OnBlockConnected: chainView.onBlockConnected, OnFilteredBlockConnected: chainView.onFilteredBlockConnected,
OnBlockDisconnected: chainView.onBlockDisconnected, OnFilteredBlockDisconnected: chainView.onFilteredBlockDisconnected,
} }
// Disable connecting to btcd within the rpcclient.New method. We // Disable connecting to btcd within the rpcclient.New method. We
@ -87,6 +80,8 @@ func NewBtcdFilteredChainView(config rpcclient.ConnConfig) (*BtcdFilteredChainVi
} }
chainView.btcdConn = chainConn chainView.btcdConn = chainConn
chainView.blockQueue = newBlockEventQueue()
return chainView, nil return chainView, nil
} }
@ -110,12 +105,16 @@ func (b *BtcdFilteredChainView) Start() error {
return err return err
} }
bestHash, bestHeight, err := b.btcdConn.GetBestBlock() _, bestHeight, err := b.btcdConn.GetBestBlock()
if err != nil { if err != nil {
return err return err
} }
b.bestHash, b.bestHeight = *bestHash, bestHeight b.bestHeightMtx.Lock()
b.bestHeight = uint32(bestHeight)
b.bestHeightMtx.Unlock()
b.blockQueue.Start()
b.wg.Add(1) b.wg.Add(1)
go b.chainFilterer() go b.chainFilterer()
@ -137,6 +136,8 @@ func (b *BtcdFilteredChainView) Stop() error {
// cleans up all related resources. // cleans up all related resources.
b.btcdConn.Shutdown() b.btcdConn.Shutdown()
b.blockQueue.Stop()
log.Infof("FilteredChainView stopping") log.Infof("FilteredChainView stopping")
close(b.quit) close(b.quit)
@ -145,39 +146,68 @@ func (b *BtcdFilteredChainView) Stop() error {
return nil return nil
} }
// chainUpdate encapsulates an update to the current main chain. This struct is // onFilteredBlockConnected is called for each block that's connected to the
// used as an element within an unbounded queue in order to avoid blocking the // end of the main chain. Based on our current chain filter, the block may or
// main rpc dispatch rule. // may not include any relevant transactions.
type chainUpdate struct { func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32,
blockHash *chainhash.Hash header *wire.BlockHeader, txns []*btcutil.Tx) {
blockHeight int32
mtxs := make([]*wire.MsgTx, len(txns))
for i, tx := range txns {
mtx := tx.MsgTx()
mtxs[i] = mtx
for _, txIn := range mtx.TxIn {
// We can delete this outpoint from the chainFilter, as
// we just received a block where it was spent. In case
// of a reorg, this outpoint might get "un-spent", but
// that's okay since it would never be wise to consider
// the channel open again (since a spending transaction
// exists on the network).
b.filterMtx.Lock()
delete(b.chainFilter, txIn.PreviousOutPoint)
b.filterMtx.Unlock()
}
}
// We record the height of the last connected block added to the
// blockQueue such that we can scan up to this height in case of
// a rescan. It must be protected by a mutex since a filter update
// might be trying to read it concurrently.
b.bestHeightMtx.Lock()
b.bestHeight = uint32(height)
b.bestHeightMtx.Unlock()
block := &FilteredBlock{
Hash: header.BlockHash(),
Height: uint32(height),
Transactions: mtxs,
}
b.blockQueue.Add(&blockEvent{
eventType: connected,
block: block,
})
} }
// onBlockConnected implements on OnBlockConnected callback for rpcclient. // onFilteredBlockDisconnected is a callback which is executed once a block is
// Ingesting a block updates the wallet's internal utxo state based on the // disconnected from the end of the main chain.
// outputs created and destroyed within each block. func (b *BtcdFilteredChainView) onFilteredBlockDisconnected(height int32,
func (b *BtcdFilteredChainView) onBlockConnected(hash *chainhash.Hash, header *wire.BlockHeader) {
height int32, t time.Time) {
// Append this new chain update to the end of the queue of new chain log.Debugf("got disconnected block at height %d: %v", height,
// updates. header.BlockHash())
b.chainUpdateMtx.Lock()
b.chainUpdates = append(b.chainUpdates, &chainUpdate{hash, height})
b.chainUpdateMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new filteredBlock := &FilteredBlock{
// block update is available. We do this in a new goroutine in order to Hash: header.BlockHash(),
// avoid blocking the main loop of the rpc client. Height: uint32(height),
go func() { }
b.chainUpdateSignal <- struct{}{}
}()
}
// onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. b.blockQueue.Add(&blockEvent{
func (b *BtcdFilteredChainView) onBlockDisconnected(hash *chainhash.Hash, eventType: disconnected,
height int32, t time.Time) { block: filteredBlock,
})
// TODO(roasbeef): impl
} }
// filterBlockReq houses a request to manually filter a block specified by // filterBlockReq houses a request to manually filter a block specified by
@ -231,7 +261,9 @@ func (b *BtcdFilteredChainView) chainFilterer() {
if _, ok := b.chainFilter[prevOp]; ok { if _, ok := b.chainFilter[prevOp]; ok {
filteredTxns = append(filteredTxns, tx) filteredTxns = append(filteredTxns, tx)
b.filterMtx.Lock()
delete(b.chainFilter, prevOp) delete(b.chainFilter, prevOp)
b.filterMtx.Unlock()
break break
} }
@ -241,87 +273,118 @@ func (b *BtcdFilteredChainView) chainFilterer() {
return filteredTxns return filteredTxns
} }
decodeJSONBlock := func(block *btcjson.RescannedBlock,
height uint32) (*FilteredBlock, error) {
hash, err := chainhash.NewHashFromStr(block.Hash)
if err != nil {
return nil, err
}
txs := make([]*wire.MsgTx, 0, len(block.Transactions))
for _, str := range block.Transactions {
b, err := hex.DecodeString(str)
if err != nil {
return nil, err
}
tx := &wire.MsgTx{}
err = tx.Deserialize(bytes.NewReader(b))
if err != nil {
return nil, err
}
txs = append(txs, tx)
}
return &FilteredBlock{
Hash: *hash,
Height: height,
Transactions: txs,
}, nil
}
for { for {
select { select {
// A new block has been connected to the end of the main chain.
// So we'll need to dispatch a new FilteredBlock notification.
case <-b.chainUpdateSignal:
// A new update is available, so pop the new chain
// update from the front of the update queue.
b.chainUpdateMtx.Lock()
update := b.chainUpdates[0]
b.chainUpdates[0] = nil // Set to nil to prevent GC leak.
b.chainUpdates = b.chainUpdates[1:]
b.chainUpdateMtx.Unlock()
// Now that we have the new block has, fetch the new
// block itself.
newBlock, err := b.btcdConn.GetBlock(update.blockHash)
if err != nil {
log.Errorf("Unable to get block: %v", err)
continue
}
b.bestHash, b.bestHeight = *update.blockHash, update.blockHeight
// Next, we'll scan this block to see if it modified
// any of the UTXO set that we're watching.
filteredTxns := filterBlock(newBlock)
// Finally, launch a goroutine to dispatch this
// filtered block notification.
go func() {
b.newBlocks <- &FilteredBlock{
Hash: *update.blockHash,
Height: uint32(update.blockHeight),
Transactions: filteredTxns,
}
}()
// The caller has just sent an update to the current chain // The caller has just sent an update to the current chain
// filter, so we'll apply the update, possibly rewinding our // filter, so we'll apply the update, possibly rewinding our
// state partially. // state partially.
case update := <-b.filterUpdates: case update := <-b.filterUpdates:
// First, we'll add all the new UTXO's to the set of // First, we'll add all the new UTXO's to the set of
// watched UTXO's, eliminating any duplicates in the // watched UTXO's, eliminating any duplicates in the
// process. // process.
log.Debugf("Updating chain filter with new UTXO's: %v", log.Debugf("Updating chain filter with new UTXO's: %v",
update.newUtxos) update.newUtxos)
for _, newOp := range update.newUtxos { for _, newOp := range update.newUtxos {
b.filterMtx.Lock()
b.chainFilter[newOp] = struct{}{} b.chainFilter[newOp] = struct{}{}
b.filterMtx.Unlock()
} }
// Apply the new TX filter to btcd, which will cause
// all following notifications from and calls to it
// return blocks filtered with the new filter.
b.btcdConn.LoadTxFilter(false, []btcutil.Address{},
update.newUtxos)
// All blocks gotten after we loaded the filter will
// have the filter applied, but we will need to rescan
// the blocks up to the height of the block we last
// added to the blockQueue.
b.bestHeightMtx.Lock()
bestHeight := b.bestHeight
b.bestHeightMtx.Unlock()
// If the update height matches our best known height, // If the update height matches our best known height,
// then we don't need to do any rewinding. // then we don't need to do any rewinding.
if update.updateHeight == uint32(b.bestHeight) { if update.updateHeight == bestHeight {
continue continue
} }
// Otherwise, we'll rewind the state to ensure the // Otherwise, we'll rewind the state to ensure the
// caller doesn't miss any relevant notifications. // caller doesn't miss any relevant notifications.
// Starting from the height _after_ the update height, // Starting from the height _after_ the update height,
// we'll walk forwards, manually filtering blocks. // we'll walk forwards, rescanning one block at a time
for i := int32(update.updateHeight) + 1; i < b.bestHeight+1; i++ { // with btcd applying the newly loaded filter to each
// block.
for i := update.updateHeight + 1; i < bestHeight+1; i++ {
blockHash, err := b.btcdConn.GetBlockHash(int64(i)) blockHash, err := b.btcdConn.GetBlockHash(int64(i))
if err != nil { if err != nil {
log.Errorf("Unable to get block hash: %v", err) log.Warnf("Unable to get block hash "+
"for block at height %d: %v",
i, err)
continue continue
} }
block, err := b.btcdConn.GetBlock(blockHash)
// To avoid dealing with the case where a reorg
// is happening while we rescan, we scan one
// block at a time, skipping blocks that might
// have gone missing.
rescanned, err := b.btcdConn.RescanBlocks(
[]chainhash.Hash{*blockHash})
if err != nil { if err != nil {
log.Errorf("Unable to get block: %v", err) log.Warnf("Unable to rescan block "+
"with hash %v at height %d: %v",
blockHash, i, err)
continue continue
} }
filteredTxns := filterBlock(block) // If no block was returned from the rescan,
// it means no maching transactions were found.
go func(height uint32) { if len(rescanned) != 1 {
b.newBlocks <- &FilteredBlock{ log.Debugf("no matching block found "+
Hash: *blockHash, "for rescan of hash %v",
Height: height, blockHash)
Transactions: filteredTxns, continue
} }
}(uint32(i)) decoded, err := decodeJSONBlock(
&rescanned[0], uint32(i))
if err != nil {
log.Errorf("Unable to decode block: %v",
err)
continue
}
b.blockQueue.Add(&blockEvent{
eventType: connected,
block: decoded,
})
} }
// We've received a new request to manually filter a block. // We've received a new request to manually filter a block.
@ -393,7 +456,7 @@ func (b *BtcdFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight u
// //
// NOTE: This is part of the FilteredChainView interface. // NOTE: This is part of the FilteredChainView interface.
func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
return b.newBlocks return b.blockQueue.newBlocks
} }
// DisconnectedBlocks returns a receive only channel which will be sent upon // DisconnectedBlocks returns a receive only channel which will be sent upon
@ -402,5 +465,5 @@ func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
// //
// NOTE: This is part of the FilteredChainView interface. // NOTE: This is part of the FilteredChainView interface.
func (b *BtcdFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock { func (b *BtcdFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
return b.staleBlocks return b.blockQueue.staleBlocks
} }