Merge pull request from cfromknecht/chainview-filter-locking

[routing/chainview] improve filter locking granularity in btcd/bitcoind
This commit is contained in:
Olaoluwa Osuntokun 2018-07-10 17:19:36 -07:00 committed by GitHub
commit 16df90fc23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 24 deletions
routing/chainview

@ -154,6 +154,7 @@ func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
hash chainhash.Hash, txns []*wtxmgr.TxRecord) { hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
mtxs := make([]*wire.MsgTx, len(txns)) mtxs := make([]*wire.MsgTx, len(txns))
b.filterMtx.Lock()
for i, tx := range txns { for i, tx := range txns {
mtxs[i] = &tx.MsgTx mtxs[i] = &tx.MsgTx
@ -164,12 +165,11 @@ func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
// that's okay since it would never be wise to consider // that's okay since it would never be wise to consider
// the channel open again (since a spending transaction // the channel open again (since a spending transaction
// exists on the network). // exists on the network).
b.filterMtx.Lock()
delete(b.chainFilter, txIn.PreviousOutPoint) delete(b.chainFilter, txIn.PreviousOutPoint)
b.filterMtx.Unlock()
} }
} }
b.filterMtx.Unlock()
// We record the height of the last connected block added to the // We record the height of the last connected block added to the
// blockQueue such that we can scan up to this height in case of // blockQueue such that we can scan up to this height in case of
@ -246,19 +246,29 @@ func (b *BitcoindFilteredChainView) chainFilterer() {
// watched. Additionally, the chain filter will also be updated by // watched. Additionally, the chain filter will also be updated by
// removing any spent outputs. // removing any spent outputs.
filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx { filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
b.filterMtx.Lock()
defer b.filterMtx.Unlock()
var filteredTxns []*wire.MsgTx var filteredTxns []*wire.MsgTx
for _, tx := range blk.Transactions { for _, tx := range blk.Transactions {
var txAlreadyFiltered bool
for _, txIn := range tx.TxIn { for _, txIn := range tx.TxIn {
prevOp := txIn.PreviousOutPoint prevOp := txIn.PreviousOutPoint
if _, ok := b.chainFilter[prevOp]; ok { if _, ok := b.chainFilter[prevOp]; !ok {
filteredTxns = append(filteredTxns, tx) continue
b.filterMtx.Lock()
delete(b.chainFilter, prevOp)
b.filterMtx.Unlock()
break
} }
delete(b.chainFilter, prevOp)
// Only add this txn to our list of filtered
// txns if it is the first previous outpoint to
// cause a match.
if txAlreadyFiltered {
continue
}
filteredTxns = append(filteredTxns, tx)
txAlreadyFiltered = true
} }
} }
@ -304,11 +314,12 @@ func (b *BitcoindFilteredChainView) chainFilterer() {
// process. // process.
log.Debugf("Updating chain filter with new UTXO's: %v", log.Debugf("Updating chain filter with new UTXO's: %v",
update.newUtxos) update.newUtxos)
b.filterMtx.Lock()
for _, newOp := range update.newUtxos { for _, newOp := range update.newUtxos {
b.filterMtx.Lock()
b.chainFilter[newOp] = struct{}{} b.chainFilter[newOp] = struct{}{}
b.filterMtx.Unlock()
} }
b.filterMtx.Unlock()
// Apply the new TX filter to the chain client, which // Apply the new TX filter to the chain client, which
// will cause all following notifications from and // will cause all following notifications from and

@ -153,6 +153,7 @@ func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, txns []*btcutil.Tx) { header *wire.BlockHeader, txns []*btcutil.Tx) {
mtxs := make([]*wire.MsgTx, len(txns)) mtxs := make([]*wire.MsgTx, len(txns))
b.filterMtx.Lock()
for i, tx := range txns { for i, tx := range txns {
mtx := tx.MsgTx() mtx := tx.MsgTx()
mtxs[i] = mtx mtxs[i] = mtx
@ -164,12 +165,11 @@ func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32,
// that's okay since it would never be wise to consider // that's okay since it would never be wise to consider
// the channel open again (since a spending transaction // the channel open again (since a spending transaction
// exists on the network). // exists on the network).
b.filterMtx.Lock()
delete(b.chainFilter, txIn.PreviousOutPoint) delete(b.chainFilter, txIn.PreviousOutPoint)
b.filterMtx.Unlock()
} }
} }
b.filterMtx.Unlock()
// We record the height of the last connected block added to the // We record the height of the last connected block added to the
// blockQueue such that we can scan up to this height in case of // blockQueue such that we can scan up to this height in case of
@ -254,19 +254,30 @@ func (b *BtcdFilteredChainView) chainFilterer() {
// watched. Additionally, the chain filter will also be updated by // watched. Additionally, the chain filter will also be updated by
// removing any spent outputs. // removing any spent outputs.
filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx { filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
b.filterMtx.Lock()
defer b.filterMtx.Unlock()
var filteredTxns []*wire.MsgTx var filteredTxns []*wire.MsgTx
for _, tx := range blk.Transactions { for _, tx := range blk.Transactions {
var txAlreadyFiltered bool
for _, txIn := range tx.TxIn { for _, txIn := range tx.TxIn {
prevOp := txIn.PreviousOutPoint prevOp := txIn.PreviousOutPoint
if _, ok := b.chainFilter[prevOp]; ok { if _, ok := b.chainFilter[prevOp]; !ok {
filteredTxns = append(filteredTxns, tx) continue
b.filterMtx.Lock()
delete(b.chainFilter, prevOp)
b.filterMtx.Unlock()
break
} }
delete(b.chainFilter, prevOp)
// Only add this txn to our list of filtered
// txns if it is the first previous outpoint to
// cause a match.
if txAlreadyFiltered {
continue
}
filteredTxns = append(filteredTxns, tx)
txAlreadyFiltered = true
} }
} }
@ -312,11 +323,12 @@ func (b *BtcdFilteredChainView) chainFilterer() {
// process. // process.
log.Debugf("Updating chain filter with new UTXO's: %v", log.Debugf("Updating chain filter with new UTXO's: %v",
update.newUtxos) update.newUtxos)
b.filterMtx.Lock()
for _, newOp := range update.newUtxos { for _, newOp := range update.newUtxos {
b.filterMtx.Lock()
b.chainFilter[newOp] = struct{}{} b.chainFilter[newOp] = struct{}{}
b.filterMtx.Unlock()
} }
b.filterMtx.Unlock()
// Apply the new TX filter to btcd, which will cause // Apply the new TX filter to btcd, which will cause
// all following notifications from and calls to it // all following notifications from and calls to it