From 082f012fcf539dc94c098e9c6956bb8c2537791c Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 2 Oct 2017 17:13:39 +0200 Subject: [PATCH] routing/chainview: implement staleBlocks for btcd view. This commit moves btcd view away from using the deprecated callbacks onBlockConnected/Disconnected, and instead use onFilteredBlockConnected/disconnected. This commit also implements the sending of disconnected blocks over the staleBlocks channel. To send these blocks, the blockEventQueue is used to ensure the ordering of blocks are correctly kept. It also changes the way filter updates are handled. Since we now load the tx filter to the rpc server itself, we can call RescanBlocks instead of manually filtering blocks. These rescanned blocks are also added to the blockEventQueue, ensuring the ordering is kept. --- routing/chainview/btcd.go | 283 +++++++++++++++++++++++--------------- 1 file changed, 173 insertions(+), 110 deletions(-) diff --git a/routing/chainview/btcd.go b/routing/chainview/btcd.go index 41cc5736..04c4ce84 100644 --- a/routing/chainview/btcd.go +++ b/routing/chainview/btcd.go @@ -1,14 +1,17 @@ package chainview import ( + "bytes" + "encoding/hex" "fmt" "sync" "sync/atomic" - "time" + "github.com/roasbeef/btcd/btcjson" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/rpcclient" "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" ) // BtcdFilteredChainView is an implementation of the FilteredChainView @@ -17,34 +20,27 @@ type BtcdFilteredChainView struct { started int32 stopped int32 - // bestHash is the hash of the latest block in the main chain. - bestHash chainhash.Hash - - // bestHeight is the height of the latest block in the main chain. - bestHeight int32 + // bestHeight is the height of the latest block added to the + // blockQueue from the onFilteredConnectedMethod. It is used to + // determine up to what height we would need to rescan in case + // of a filter update. + bestHeightMtx sync.Mutex + bestHeight uint32 btcdConn *rpcclient.Client - // newBlocks is the channel in which new filtered blocks are sent over. - newBlocks chan *FilteredBlock - - // staleBlocks is the channel in which blocks that have been - // disconnected from the mainchain are sent over. - staleBlocks chan *FilteredBlock + // blockEventQueue is the ordered queue used to keep the order + // of connected and disconnected blocks sent to the reader of the + // chainView. + blockQueue *blockEventQueue // filterUpdates is a channel in which updates to the utxo filter // attached to this instance are sent over. filterUpdates chan filterUpdate - // The three field below are used to implement a synchronized queue - // that lets use instantly handle sent notifications without blocking - // the main websockets notification loop. - chainUpdates []*chainUpdate - chainUpdateSignal chan struct{} - chainUpdateMtx sync.Mutex - // chainFilter is the set of utox's that we're currently watching // spends for within the chain. + filterMtx sync.RWMutex chainFilter map[wire.OutPoint]struct{} // filterBlockReqs is a channel in which requests to filter select @@ -63,18 +59,15 @@ var _ FilteredChainView = (*BtcdFilteredChainView)(nil) // RPC credentials for an active btcd instance. func NewBtcdFilteredChainView(config rpcclient.ConnConfig) (*BtcdFilteredChainView, error) { chainView := &BtcdFilteredChainView{ - newBlocks: make(chan *FilteredBlock), - staleBlocks: make(chan *FilteredBlock), - chainUpdateSignal: make(chan struct{}), - chainFilter: make(map[wire.OutPoint]struct{}), - filterUpdates: make(chan filterUpdate), - filterBlockReqs: make(chan *filterBlockReq), - quit: make(chan struct{}), + chainFilter: make(map[wire.OutPoint]struct{}), + filterUpdates: make(chan filterUpdate), + filterBlockReqs: make(chan *filterBlockReq), + quit: make(chan struct{}), } ntfnCallbacks := &rpcclient.NotificationHandlers{ - OnBlockConnected: chainView.onBlockConnected, - OnBlockDisconnected: chainView.onBlockDisconnected, + OnFilteredBlockConnected: chainView.onFilteredBlockConnected, + OnFilteredBlockDisconnected: chainView.onFilteredBlockDisconnected, } // Disable connecting to btcd within the rpcclient.New method. We @@ -87,6 +80,8 @@ func NewBtcdFilteredChainView(config rpcclient.ConnConfig) (*BtcdFilteredChainVi } chainView.btcdConn = chainConn + chainView.blockQueue = newBlockEventQueue() + return chainView, nil } @@ -110,12 +105,16 @@ func (b *BtcdFilteredChainView) Start() error { return err } - bestHash, bestHeight, err := b.btcdConn.GetBestBlock() + _, bestHeight, err := b.btcdConn.GetBestBlock() if err != nil { return err } - b.bestHash, b.bestHeight = *bestHash, bestHeight + b.bestHeightMtx.Lock() + b.bestHeight = uint32(bestHeight) + b.bestHeightMtx.Unlock() + + b.blockQueue.Start() b.wg.Add(1) go b.chainFilterer() @@ -137,6 +136,8 @@ func (b *BtcdFilteredChainView) Stop() error { // cleans up all related resources. b.btcdConn.Shutdown() + b.blockQueue.Stop() + log.Infof("FilteredChainView stopping") close(b.quit) @@ -145,39 +146,68 @@ func (b *BtcdFilteredChainView) Stop() error { return nil } -// chainUpdate encapsulates an update to the current main chain. This struct is -// used as an element within an unbounded queue in order to avoid blocking the -// main rpc dispatch rule. -type chainUpdate struct { - blockHash *chainhash.Hash - blockHeight int32 +// onFilteredBlockConnected is called for each block that's connected to the +// end of the main chain. Based on our current chain filter, the block may or +// may not include any relevant transactions. +func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32, + header *wire.BlockHeader, txns []*btcutil.Tx) { + + mtxs := make([]*wire.MsgTx, len(txns)) + for i, tx := range txns { + mtx := tx.MsgTx() + mtxs[i] = mtx + + for _, txIn := range mtx.TxIn { + // We can delete this outpoint from the chainFilter, as + // we just received a block where it was spent. In case + // of a reorg, this outpoint might get "un-spent", but + // that's okay since it would never be wise to consider + // the channel open again (since a spending transaction + // exists on the network). + b.filterMtx.Lock() + delete(b.chainFilter, txIn.PreviousOutPoint) + b.filterMtx.Unlock() + } + + } + + // We record the height of the last connected block added to the + // blockQueue such that we can scan up to this height in case of + // a rescan. It must be protected by a mutex since a filter update + // might be trying to read it concurrently. + b.bestHeightMtx.Lock() + b.bestHeight = uint32(height) + b.bestHeightMtx.Unlock() + + block := &FilteredBlock{ + Hash: header.BlockHash(), + Height: uint32(height), + Transactions: mtxs, + } + + b.blockQueue.Add(&blockEvent{ + eventType: connected, + block: block, + }) } -// onBlockConnected implements on OnBlockConnected callback for rpcclient. -// Ingesting a block updates the wallet's internal utxo state based on the -// outputs created and destroyed within each block. -func (b *BtcdFilteredChainView) onBlockConnected(hash *chainhash.Hash, - height int32, t time.Time) { +// onFilteredBlockDisconnected is a callback which is executed once a block is +// disconnected from the end of the main chain. +func (b *BtcdFilteredChainView) onFilteredBlockDisconnected(height int32, + header *wire.BlockHeader) { - // Append this new chain update to the end of the queue of new chain - // updates. - b.chainUpdateMtx.Lock() - b.chainUpdates = append(b.chainUpdates, &chainUpdate{hash, height}) - b.chainUpdateMtx.Unlock() + log.Debugf("got disconnected block at height %d: %v", height, + header.BlockHash()) - // Launch a goroutine to signal the notification dispatcher that a new - // block update is available. We do this in a new goroutine in order to - // avoid blocking the main loop of the rpc client. - go func() { - b.chainUpdateSignal <- struct{}{} - }() -} + filteredBlock := &FilteredBlock{ + Hash: header.BlockHash(), + Height: uint32(height), + } -// onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. -func (b *BtcdFilteredChainView) onBlockDisconnected(hash *chainhash.Hash, - height int32, t time.Time) { - - // TODO(roasbeef): impl + b.blockQueue.Add(&blockEvent{ + eventType: disconnected, + block: filteredBlock, + }) } // filterBlockReq houses a request to manually filter a block specified by @@ -231,7 +261,9 @@ func (b *BtcdFilteredChainView) chainFilterer() { if _, ok := b.chainFilter[prevOp]; ok { filteredTxns = append(filteredTxns, tx) + b.filterMtx.Lock() delete(b.chainFilter, prevOp) + b.filterMtx.Unlock() break } @@ -241,87 +273,118 @@ func (b *BtcdFilteredChainView) chainFilterer() { return filteredTxns } + decodeJSONBlock := func(block *btcjson.RescannedBlock, + height uint32) (*FilteredBlock, error) { + hash, err := chainhash.NewHashFromStr(block.Hash) + if err != nil { + return nil, err + + } + txs := make([]*wire.MsgTx, 0, len(block.Transactions)) + for _, str := range block.Transactions { + b, err := hex.DecodeString(str) + if err != nil { + return nil, err + } + tx := &wire.MsgTx{} + err = tx.Deserialize(bytes.NewReader(b)) + if err != nil { + return nil, err + } + txs = append(txs, tx) + } + return &FilteredBlock{ + Hash: *hash, + Height: height, + Transactions: txs, + }, nil + } + for { select { - - // A new block has been connected to the end of the main chain. - // So we'll need to dispatch a new FilteredBlock notification. - case <-b.chainUpdateSignal: - // A new update is available, so pop the new chain - // update from the front of the update queue. - b.chainUpdateMtx.Lock() - update := b.chainUpdates[0] - b.chainUpdates[0] = nil // Set to nil to prevent GC leak. - b.chainUpdates = b.chainUpdates[1:] - b.chainUpdateMtx.Unlock() - - // Now that we have the new block has, fetch the new - // block itself. - newBlock, err := b.btcdConn.GetBlock(update.blockHash) - if err != nil { - log.Errorf("Unable to get block: %v", err) - continue - } - b.bestHash, b.bestHeight = *update.blockHash, update.blockHeight - - // Next, we'll scan this block to see if it modified - // any of the UTXO set that we're watching. - filteredTxns := filterBlock(newBlock) - - // Finally, launch a goroutine to dispatch this - // filtered block notification. - go func() { - b.newBlocks <- &FilteredBlock{ - Hash: *update.blockHash, - Height: uint32(update.blockHeight), - Transactions: filteredTxns, - } - }() - // The caller has just sent an update to the current chain // filter, so we'll apply the update, possibly rewinding our // state partially. case update := <-b.filterUpdates: + // First, we'll add all the new UTXO's to the set of // watched UTXO's, eliminating any duplicates in the // process. log.Debugf("Updating chain filter with new UTXO's: %v", update.newUtxos) for _, newOp := range update.newUtxos { + b.filterMtx.Lock() b.chainFilter[newOp] = struct{}{} + b.filterMtx.Unlock() } + // Apply the new TX filter to btcd, which will cause + // all following notifications from and calls to it + // return blocks filtered with the new filter. + b.btcdConn.LoadTxFilter(false, []btcutil.Address{}, + update.newUtxos) + + // All blocks gotten after we loaded the filter will + // have the filter applied, but we will need to rescan + // the blocks up to the height of the block we last + // added to the blockQueue. + b.bestHeightMtx.Lock() + bestHeight := b.bestHeight + b.bestHeightMtx.Unlock() + // If the update height matches our best known height, // then we don't need to do any rewinding. - if update.updateHeight == uint32(b.bestHeight) { + if update.updateHeight == bestHeight { continue } // Otherwise, we'll rewind the state to ensure the // caller doesn't miss any relevant notifications. // Starting from the height _after_ the update height, - // we'll walk forwards, manually filtering blocks. - for i := int32(update.updateHeight) + 1; i < b.bestHeight+1; i++ { + // we'll walk forwards, rescanning one block at a time + // with btcd applying the newly loaded filter to each + // block. + for i := update.updateHeight + 1; i < bestHeight+1; i++ { blockHash, err := b.btcdConn.GetBlockHash(int64(i)) if err != nil { - log.Errorf("Unable to get block hash: %v", err) + log.Warnf("Unable to get block hash "+ + "for block at height %d: %v", + i, err) continue } - block, err := b.btcdConn.GetBlock(blockHash) + + // To avoid dealing with the case where a reorg + // is happening while we rescan, we scan one + // block at a time, skipping blocks that might + // have gone missing. + rescanned, err := b.btcdConn.RescanBlocks( + []chainhash.Hash{*blockHash}) if err != nil { - log.Errorf("Unable to get block: %v", err) + log.Warnf("Unable to rescan block "+ + "with hash %v at height %d: %v", + blockHash, i, err) continue } - filteredTxns := filterBlock(block) - - go func(height uint32) { - b.newBlocks <- &FilteredBlock{ - Hash: *blockHash, - Height: height, - Transactions: filteredTxns, - } - }(uint32(i)) + // If no block was returned from the rescan, + // it means no maching transactions were found. + if len(rescanned) != 1 { + log.Debugf("no matching block found "+ + "for rescan of hash %v", + blockHash) + continue + } + decoded, err := decodeJSONBlock( + &rescanned[0], uint32(i)) + if err != nil { + log.Errorf("Unable to decode block: %v", + err) + continue + } + b.blockQueue.Add(&blockEvent{ + eventType: connected, + block: decoded, + }) } // We've received a new request to manually filter a block. @@ -393,7 +456,7 @@ func (b *BtcdFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight u // // NOTE: This is part of the FilteredChainView interface. func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { - return b.newBlocks + return b.blockQueue.newBlocks } // DisconnectedBlocks returns a receive only channel which will be sent upon @@ -402,5 +465,5 @@ func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { // // NOTE: This is part of the FilteredChainView interface. func (b *BtcdFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock { - return b.staleBlocks + return b.blockQueue.staleBlocks }