add waitgroups to ingestBlock

seems to go a little faster but not much.
making ingest tx take a slice of txs would be faster probably.
but it seems network i/o is the limiting factor so maybe it's OK
This commit is contained in:
Tadge Dryja 2016-02-07 19:52:45 -08:00
parent 4535b965cd
commit e2dd892a4f
6 changed files with 26 additions and 17 deletions

@ -25,7 +25,7 @@ testing. It can send and receive coins.
const (
keyFileName = "testkey.hex"
headerFileName = "headers.bin"
dbFileName = "utxo.db"
dbFileName = "/dev/shm/utxo.db"
// this is my local testnet node, replace it with your own close by.
// Random internet testnet nodes usually work but sometimes don't, so
// maybe I should test against different versions out there.
@ -51,7 +51,7 @@ func shell() {
// setup spvCon
SCon, err = uspv.OpenSPV(
SPVHostAdr, headerFileName, dbFileName, &Store, true, Params)
SPVHostAdr, headerFileName, dbFileName, &Store, false, false, Params)
if err != nil {
log.Fatal(err)
}

@ -303,7 +303,8 @@ func (s *SPVCon) AskForBlocks() error {
}
if dbTip == headerTip {
// nothing to ask for; set wait state and return
fmt.Printf("no merkle blocks to request, entering wait state\n")
fmt.Printf("no blocks to request, entering wait state\n")
fmt.Printf("%d bytes received\n", s.RBytes)
s.inWaitState <- true
// also advertise any unconfirmed txs here
s.Rebroadcast()

@ -4,6 +4,7 @@ import (
"fmt"
"log"
"os"
"sync"
"github.com/btcsuite/btcd/wire"
)
@ -22,7 +23,7 @@ func BlockRootOK(blk wire.MsgBlock) bool {
}
for len(shas) > 1 { // calculate merkle root. Terse, eh?
shas = append(shas[2:], MakeMerkleParent(shas[0], shas[1]))
}
} // auto recognizes coinbase-only blocks
return blk.Header.MerkleRoot.IsEqual(shas[0])
}
@ -55,17 +56,24 @@ func (s *SPVCon) IngestBlock(m *wire.MsgBlock) {
// iterate through all txs in the block, looking for matches.
// this is slow and can be sped up by doing in-ram filters client side.
// kindof a pain to implement though and it's fast enough for now.
var wg sync.WaitGroup
wg.Add(len(m.Transactions))
for i, tx := range m.Transactions {
hits, err := s.TS.Ingest(tx, hah.height)
if err != nil {
log.Printf("Incoming Tx error: %s\n", err.Error())
return
}
if hits > 0 {
log.Printf("block %d tx %d %s ingested and matches %d utxo/adrs.",
hah.height, i, tx.TxSha().String(), hits)
}
go func() {
hits, err := s.TS.Ingest(tx, hah.height)
if err != nil {
log.Printf("Incoming Tx error: %s\n", err.Error())
return
}
if hits > 0 {
log.Printf("block %d tx %d %s ingested and matches %d utxo/adrs.",
hah.height, i, tx.TxSha().String(), hits)
}
wg.Done()
}()
}
wg.Wait()
// write to db that we've sync'd to the height indicated in the
// merkle block. This isn't QUITE true since we haven't actually gotten

@ -13,10 +13,11 @@ import (
// OpenPV starts a
func OpenSPV(remoteNode string, hfn, dbfn string,
inTs *TxStore, hard bool, p *chaincfg.Params) (SPVCon, error) {
inTs *TxStore, hard bool, iron bool, p *chaincfg.Params) (SPVCon, error) {
// create new SPVCon
var s SPVCon
s.HardMode = hard
s.Ironman = iron
// I should really merge SPVCon and TxStore, they're basically the same
inTs.Param = p
s.TS = inTs // copy pointer of txstore into spvcon
@ -84,7 +85,7 @@ func OpenSPV(remoteNode string, hfn, dbfn string,
s.outMsgQueue = make(chan wire.Message)
go s.outgoingMessageHandler()
s.blockQueue = make(chan HashAndHeight, 32) // queue depth 32 is a thing
s.fPositives = make(chan int32, 4000) // a block full, approx
s.fPositives = make(chan int32, 4000) // a block full, approx
s.inWaitState = make(chan bool, 1)
go s.fPositiveHandler()

@ -85,7 +85,6 @@ func (s *SPVCon) fPositiveHandler() {
log.Printf("uhoh, crashing filter handler")
return
}
// send filter
s.SendFilter(filt)
fmt.Printf("sent filter %x\n", filt.MsgFilterLoad().Filter)

@ -350,7 +350,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) {
// tx has been OK'd by SPV; check tx sanity
utilTx := btcutil.NewTx(tx) // convert for validation
// checks stuff like inputs >= ouputs
// checks basic stuff like there are inputs and ouputs
err = blockchain.CheckTransactionSanity(utilTx)
if err != nil {
return hits, err