add a height channel to keep track of incoming merkleblocks

This commit is contained in:
Tadge Dryja 2016-01-19 14:23:18 -08:00
parent ece5dc9d2d
commit 58c8d32ac5
3 changed files with 47 additions and 14 deletions

@ -38,8 +38,11 @@ type SPVCon struct {
WBytes uint64 // total bytes written
RBytes uint64 // total bytes read
TS *TxStore
param *chaincfg.Params
TS *TxStore // transaction store to write to
param *chaincfg.Params // network parameters (testnet3, testnetL)
// mBlockQueue is for keeping track of what height we've requested.
mBlockQueue chan RootAndHeight
}
func OpenSPV(remoteNode string, hfn string,
@ -114,7 +117,7 @@ func OpenSPV(remoteNode string, hfn string,
go s.incomingMessageHandler()
s.outMsgQueue = make(chan wire.Message)
go s.outgoingMessageHandler()
s.mBlockQueue = make(chan RootAndHeight, 10) // queue of 10 requests? more?
return s, nil
}
@ -319,27 +322,52 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) {
return true, nil
}
// RootAndHeight is needed instead of just height in case a fullnode
// responds abnormally (?) by sending out of order merkleblocks.
// we cache a merkleroot:height pair in the queue so we don't have to
// look them up from the disk.
type RootAndHeight struct {
root wire.ShaHash
height uint32
}
// NewRootAndHeight saves like 2 lines.
func NewRootAndHeight(r wire.ShaHash, h uint32) (rah RootAndHeight) {
rah.root = r
rah.height = h
return
}
// AskForMerkBlocks requests blocks from current to last
// right now this asks for 1 block per getData message.
// Maybe it's faster to ask for many in a each message?
func (s *SPVCon) AskForMerkBlocks(current, last uint32) error {
var hdr wire.BlockHeader
_, err := s.headerFile.Seek(int64(current*80), os.SEEK_SET)
if err != nil {
return err
}
// loop through all heights where we want merkleblocks.
for current < last {
// load header from file
err = hdr.Deserialize(s.headerFile)
if err != nil {
return err
}
current++
bHash := hdr.BlockSha()
// create inventory we're asking for
iv1 := wire.NewInvVect(wire.InvTypeFilteredBlock, &bHash)
gdataMsg := wire.NewMsgGetData()
// add inventory
err = gdataMsg.AddInvVect(iv1)
if err != nil {
return err
}
rah := NewRootAndHeight(hdr.MerkleRoot, current)
s.outMsgQueue <- gdataMsg
s.mBlockQueue <- rah // push height and mroot of requested block on queue
current++
}
return nil

@ -43,22 +43,26 @@ func (s *SPVCon) incomingMessageHandler() {
fmt.Printf(" got %d txs ", len(txids))
// fmt.Printf(" = got %d txs from block %s\n",
// len(txids), m.Header.BlockSha().String())
var height uint32
if len(txids) > 0 {
// make sure block is in our store before adding txs
height, err = s.HeightFromHeader(m.Header)
if err != nil {
log.Printf("Merkle block height error: %s\n", err.Error())
continue
}
// var height uint32
// if len(txids) > 0 {
// make sure block is in our store before adding txs
// height, err = s.HeightFromHeader(m.Header)
// height = 20000
// if err != nil {
//log.Printf("Merkle block height error: %s\n", err.Error())
// continue
// }
// }
rah := <-s.mBlockQueue // pop height off mblock queue
if !rah.root.IsEqual(&m.Header.MerkleRoot) {
log.Printf("out of order error")
}
for _, txid := range txids {
err := s.TS.AddTxid(txid, height)
err := s.TS.AddTxid(txid, rah.height)
if err != nil {
log.Printf("Txid store error: %s\n", err.Error())
}
}
// nextReq <- true
case *wire.MsgHeaders:
moar, err := s.IngestHeaders(m)

@ -51,6 +51,7 @@ func (t *TxStore) AddTxid(txid *wire.ShaHash, height uint32) error {
if txid == nil {
return fmt.Errorf("tried to add nil txid")
}
log.Printf("added %s at height %d\n", txid.String(), height)
t.OKTxids[*txid] = height
return nil
}