make it way faster

there were a lot of dumb things in Ingest() that made it really slow.
Like re-hashing the tx a bunch of times.  And re-saving it to the
db redundantly.

also added local bloom filters.  Maybe some concurrency issues if you
generate an address just as you're getting a tx with that address but
that doesn't seem like a real problem.  Cheap to rescan anyway.

So it's faster and works better.
This commit is contained in:
Tadge Dryja 2016-02-25 21:05:01 -08:00
parent 2c1f42d6fd
commit d2d37ce1ab
5 changed files with 91 additions and 42 deletions

@ -6,6 +6,8 @@ import (
"log" "log"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/bloom"
) )
var ( var (
@ -115,6 +117,27 @@ func calcRoot(hashes []*wire.ShaHash) *wire.ShaHash {
return hashes[0] return hashes[0]
} }
func (ts *TxStore) Refilter() error {
allUtxos, err := ts.GetAllUtxos()
if err != nil {
return err
}
filterElements := uint32(len(allUtxos) + len(ts.Adrs))
ts.localFilter = bloom.NewFilter(filterElements, 0, 0, wire.BloomUpdateAll)
for _, u := range allUtxos {
ts.localFilter.AddOutPoint(&u.Op)
}
for _, a := range ts.Adrs {
ts.localFilter.Add(a.PkhAdr.ScriptAddress())
}
msg := ts.localFilter.MsgFilterLoad()
fmt.Printf("made %d element filter: %x\n", filterElements, msg.Filter)
return nil
}
// IngestBlock is like IngestMerkleBlock but aralphic // IngestBlock is like IngestMerkleBlock but aralphic
// different enough that it's better to have 2 separate functions // different enough that it's better to have 2 separate functions
func (s *SPVCon) IngestBlock(m *wire.MsgBlock) { func (s *SPVCon) IngestBlock(m *wire.MsgBlock) {
@ -148,21 +171,39 @@ func (s *SPVCon) IngestBlock(m *wire.MsgBlock) {
return return
} }
fPositive := 0 // local filter false positives
reFilter := 10 // after that many false positives, regenerate filter.
// 10? Making it up. False positives have disk i/o cost, and regenning
// the filter also has costs. With a large local filter, false positives
// should be rare.
// iterate through all txs in the block, looking for matches. // iterate through all txs in the block, looking for matches.
// this is slow and can be sped up by doing in-ram filters client side. // use a local bloom filter to ignore txs that don't affect us
// kindof a pain to implement though and it's fast enough for now.
for i, tx := range m.Transactions { for i, tx := range m.Transactions {
hits, err := s.TS.Ingest(tx, hah.height) utilTx := btcutil.NewTx(tx)
if err != nil { if s.TS.localFilter.MatchTxAndUpdate(utilTx) {
log.Printf("Incoming Tx error: %s\n", err.Error()) hits, err := s.TS.Ingest(tx, hah.height)
return if err != nil {
} log.Printf("Incoming Tx error: %s\n", err.Error())
if hits > 0 { return
log.Printf("block %d tx %d %s ingested and matches %d utxo/adrs.", }
hah.height, i, tx.TxSha().String(), hits) if hits > 0 {
log.Printf("block %d tx %d %s ingested and matches %d utxo/adrs.",
hah.height, i, tx.TxSha().String(), hits)
} else {
fPositive++ // matched filter but no hits
}
} }
} }
if fPositive > reFilter {
fmt.Printf("%d filter false positives in this block\n", fPositive)
err = s.TS.Refilter()
if err != nil {
log.Printf("Refilter error: %s\n", err.Error())
return
}
}
// write to db that we've sync'd to the height indicated in the // write to db that we've sync'd to the height indicated in the
// merkle block. This isn't QUITE true since we haven't actually gotten // merkle block. This isn't QUITE true since we haven't actually gotten
// the txs yet but if there are problems with the txs we should backtrack. // the txs yet but if there are problems with the txs we should backtrack.
@ -171,6 +212,7 @@ func (s *SPVCon) IngestBlock(m *wire.MsgBlock) {
log.Printf("full block sync error: %s\n", err.Error()) log.Printf("full block sync error: %s\n", err.Error())
return return
} }
fmt.Printf("ingested full block %s height %d OK\n", fmt.Printf("ingested full block %s height %d OK\n",
m.Header.BlockSha().String(), hah.height) m.Header.BlockSha().String(), hah.height)

@ -91,6 +91,13 @@ func OpenSPV(remoteNode string, hfn, dbfn string,
s.inWaitState = make(chan bool, 1) s.inWaitState = make(chan bool, 1)
go s.fPositiveHandler() go s.fPositiveHandler()
if hard {
err = s.TS.Refilter()
if err != nil {
return s, err
}
}
return s, nil return s, nil
} }

@ -5,6 +5,7 @@ import (
"log" "log"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
) )
func (s *SPVCon) incomingMessageHandler() { func (s *SPVCon) incomingMessageHandler() {
@ -157,19 +158,22 @@ func (s *SPVCon) TxHandler(m *wire.MsgTx) {
// i, dub.String(), m.TxSha().String()) // i, dub.String(), m.TxSha().String())
// } // }
// } // }
hits, err := s.TS.Ingest(m, height) utilTx := btcutil.NewTx(m)
if err != nil { if !s.HardMode || s.TS.localFilter.MatchTxAndUpdate(utilTx) {
log.Printf("Incoming Tx error: %s\n", err.Error()) hits, err := s.TS.Ingest(m, height)
return if err != nil {
log.Printf("Incoming Tx error: %s\n", err.Error())
return
}
if hits == 0 && !s.HardMode {
log.Printf("tx %s had no hits, filter false positive.",
m.TxSha().String())
s.fPositives <- 1 // add one false positive to chan
return
}
log.Printf("tx %s ingested and matches %d utxo/adrs.",
m.TxSha().String(), hits)
} }
if hits == 0 && !s.HardMode {
log.Printf("tx %s had no hits, filter false positive.",
m.TxSha().String())
s.fPositives <- 1 // add one false positive to chan
return
}
log.Printf("tx %s ingested and matches %d utxo/adrs.",
m.TxSha().String(), hits)
} }
// GetDataHandler responds to requests for tx data, which happen after // GetDataHandler responds to requests for tx data, which happen after

@ -23,6 +23,8 @@ type TxStore struct {
Adrs []MyAdr // endeavouring to acquire capital Adrs []MyAdr // endeavouring to acquire capital
StateDB *bolt.DB // place to write all this down StateDB *bolt.DB // place to write all this down
localFilter *bloom.Filter // local bloom filter for hard mode
// Params live here, not SCon // Params live here, not SCon
Param *chaincfg.Params // network parameters (testnet3, testnetL) Param *chaincfg.Params // network parameters (testnet3, testnetL)

@ -125,7 +125,7 @@ func (ts *TxStore) NewAdr() (btcutil.Address, error) {
ma.KeyIdx = n ma.KeyIdx = n
ts.Adrs = append(ts.Adrs, ma) ts.Adrs = append(ts.Adrs, ma)
ts.localFilter.Add(ma.PkhAdr.ScriptAddress())
return nAdr, nil return nAdr, nil
} }
@ -399,16 +399,15 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) {
} }
} }
cachedSha := tx.TxSha()
// iterate through all outputs of this tx, see if we gain // iterate through all outputs of this tx, see if we gain
for i, out := range tx.TxOut { for i, out := range tx.TxOut {
for j, ascr := range aPKscripts { for j, ascr := range aPKscripts {
// detect p2wpkh // detect p2wpkh
witBool := false witBool := false
if bytes.Equal(out.PkScript, wPKscripts[j]) { if bytes.Equal(out.PkScript, wPKscripts[j]) {
witBool = true witBool = true
} }
if bytes.Equal(out.PkScript, ascr) || witBool { // new utxo found if bytes.Equal(out.PkScript, ascr) || witBool { // new utxo found
var newu Utxo // create new utxo and copy into it var newu Utxo // create new utxo and copy into it
newu.AtHeight = height newu.AtHeight = height
@ -416,7 +415,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) {
newu.Value = out.Value newu.Value = out.Value
newu.IsWit = witBool // copy witness version from pkscript newu.IsWit = witBool // copy witness version from pkscript
var newop wire.OutPoint var newop wire.OutPoint
newop.Hash = tx.TxSha() newop.Hash = cachedSha
newop.Index = uint32(i) newop.Index = uint32(i)
newu.Op = newop newu.Op = newop
b, err := newu.ToBytes() b, err := newu.ToBytes()
@ -459,7 +458,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) {
var st Stxo // generate spent txo var st Stxo // generate spent txo
st.Utxo = lostTxo // assign outpoint st.Utxo = lostTxo // assign outpoint
st.SpendHeight = height // spent at height st.SpendHeight = height // spent at height
st.SpendTxid = tx.TxSha() // spent by txid st.SpendTxid = cachedSha // spent by txid
stxb, err := st.ToBytes() // serialize stxb, err := st.ToBytes() // serialize
if err != nil { if err != nil {
return err return err
@ -468,14 +467,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) {
if err != nil { if err != nil {
return err return err
} }
// store this relevant tx
sha := tx.TxSha()
var buf bytes.Buffer
tx.SerializeWitness(&buf) // always store witness version
err = txns.Put(sha.Bytes(), buf.Bytes())
if err != nil {
return err
}
err = duf.Delete(nOP) err = duf.Delete(nOP)
if err != nil { if err != nil {
return err return err
@ -483,13 +475,6 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) {
} }
} }
//delete everything even if it doesn't exist!
// for _, dOP := range spentOPs {
// err = duf.Delete(dOP)
// if err != nil {
// return err
// }
// }
// done losing utxos, next gain utxos // done losing utxos, next gain utxos
// next add all new utxos to db, this is quick as the work is above // next add all new utxos to db, this is quick as the work is above
for _, ub := range nUtxoBytes { for _, ub := range nUtxoBytes {
@ -498,6 +483,15 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) {
return err return err
} }
} }
// if hits is nonzero it's a relevant tx and we should store it
var buf bytes.Buffer
tx.SerializeWitness(&buf) // always store witness version
err = txns.Put(cachedSha.Bytes(), buf.Bytes())
if err != nil {
return err
}
return nil return nil
}) })
return hits, err return hits, err