incoming invs and tx signing

when we get inv messages, txs are easy but blocks are a little tricky
block heigh synchronization is done via os' file system
signing is a bit inelegant (searches through inputs for pkscript matches)
but fast enough in practice
This commit is contained in:
Tadge Dryja 2016-01-22 16:04:27 -08:00
parent 01c35e62ab
commit 5a7c04bdb5
6 changed files with 170 additions and 62 deletions

@ -27,9 +27,11 @@ type SPVCon struct {
con net.Conn // the (probably tcp) connection to the node con net.Conn // the (probably tcp) connection to the node
headerFile *os.File // file for SPV headers headerFile *os.File // file for SPV headers
//[doesn't work without fancy mutexes, nevermind, just use header file]
// localHeight int32 // block height we're on
remoteHeight int32 // block height they're on
localVersion uint32 // version we report localVersion uint32 // version we report
remoteVersion uint32 // version remote node remoteVersion uint32 // version remote node
remoteHeight int32 // block height they're on
// what's the point of the input queue? remove? leave for now... // what's the point of the input queue? remove? leave for now...
inMsgQueue chan wire.Message // Messages coming in from remote node inMsgQueue chan wire.Message // Messages coming in from remote node
@ -42,7 +44,7 @@ type SPVCon struct {
param *chaincfg.Params // network parameters (testnet3, testnetL) param *chaincfg.Params // network parameters (testnet3, testnetL)
// mBlockQueue is for keeping track of what height we've requested. // mBlockQueue is for keeping track of what height we've requested.
mBlockQueue chan RootAndHeight mBlockQueue chan HashAndHeight
} }
func OpenSPV(remoteNode string, hfn, tsfn string, func OpenSPV(remoteNode string, hfn, tsfn string,
@ -125,7 +127,7 @@ func OpenSPV(remoteNode string, hfn, tsfn string,
go s.incomingMessageHandler() go s.incomingMessageHandler()
s.outMsgQueue = make(chan wire.Message, 1) s.outMsgQueue = make(chan wire.Message, 1)
go s.outgoingMessageHandler() go s.outgoingMessageHandler()
s.mBlockQueue = make(chan RootAndHeight, 32) // queue depth 32 is a thing s.mBlockQueue = make(chan HashAndHeight, 32) // queue depth 32 is a thing
return s, nil return s, nil
} }
@ -207,6 +209,36 @@ func (s *SPVCon) HeightFromHeader(query wire.BlockHeader) (uint32, error) {
return 0, fmt.Errorf("Header not found on disk") return 0, fmt.Errorf("Header not found on disk")
} }
// AskForTx requests a tx we heard about from an inv message.
// It's one at a time but should be fast enough.
func (s *SPVCon) AskForTx(txid wire.ShaHash) {
gdata := wire.NewMsgGetData()
inv := wire.NewInvVect(wire.InvTypeTx, &txid)
gdata.AddInvVect(inv)
s.outMsgQueue <- gdata
}
// AskForBlock requests a merkle block we heard about from an inv message.
// We don't have it in our header file so when we get it we do both operations:
// appending and checking the header, and checking spv proofs
func (s *SPVCon) AskForBlock(hsh wire.ShaHash) {
gdata := wire.NewMsgGetData()
inv := wire.NewInvVect(wire.InvTypeFilteredBlock, &hsh)
gdata.AddInvVect(inv)
info, err := s.headerFile.Stat() // get
if err != nil {
log.Fatal(err) // crash if header file disappears
}
nextHeight := int32(info.Size() / 80)
hah := NewRootAndHeight(hsh, nextHeight)
s.mBlockQueue <- hah // push height and mroot of requested block on queue
s.outMsgQueue <- gdata // push request to outbox
}
func (s *SPVCon) AskForHeaders() error { func (s *SPVCon) AskForHeaders() error {
var hdr wire.BlockHeader var hdr wire.BlockHeader
ghdr := wire.NewMsgGetHeaders() ghdr := wire.NewMsgGetHeaders()
@ -229,6 +261,7 @@ func (s *SPVCon) AskForHeaders() error {
} }
log.Printf("suk to offset %d (should be near the end\n", ns) log.Printf("suk to offset %d (should be near the end\n", ns)
// get header from last 80 bytes of file // get header from last 80 bytes of file
err = hdr.Deserialize(s.headerFile) err = hdr.Deserialize(s.headerFile)
if err != nil { if err != nil {
@ -250,6 +283,31 @@ func (s *SPVCon) AskForHeaders() error {
return nil return nil
} }
func (s *SPVCon) IngestMerkleBlock(m *wire.MsgMerkleBlock) error {
txids, err := checkMBlock(m) // check self-consistency
if err != nil {
return err
}
hah := <-s.mBlockQueue // pop height off mblock queue
// this verifies order, and also that the returned header fits
// into our SPV header file
newMerkBlockSha := m.Header.BlockSha()
if !hah.blockhash.IsEqual(&newMerkBlockSha) {
return fmt.Errorf("merkle block out of order error")
}
for _, txid := range txids {
err := s.TS.AddTxid(txid, hah.height)
if err != nil {
return fmt.Errorf("Txid store error: %s\n", err.Error())
}
}
return nil
}
// IngestHeaders takes in a bunch of headers and appends them to the
// local header file, checking that they fit. If there's no headers,
// it assumes we're done and returns false. If it worked it assumes there's
// more to request and returns true.9
func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) {
var err error var err error
// seek to last header // seek to last header
@ -264,6 +322,12 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) {
} }
prevHash := last.BlockSha() prevHash := last.BlockSha()
endPos, err := s.headerFile.Seek(0, os.SEEK_END)
if err != nil {
return false, err
}
tip := int32(endPos/80) - 1 // move back 1 header length to read
gotNum := int64(len(m.Headers)) gotNum := int64(len(m.Headers))
if gotNum > 0 { if gotNum > 0 {
fmt.Printf("got %d headers. Range:\n%s - %s\n", fmt.Printf("got %d headers. Range:\n%s - %s\n",
@ -273,17 +337,11 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) {
log.Printf("got 0 headers, we're probably synced up") log.Printf("got 0 headers, we're probably synced up")
return false, nil return false, nil
} }
endPos, err := s.headerFile.Seek(0, os.SEEK_END)
if err != nil {
return false, err
}
// check first header returned to make sure it fits on the end // check first header returned to make sure it fits on the end
// of our header file // of our header file
if !m.Headers[0].PrevBlock.IsEqual(&prevHash) { if !m.Headers[0].PrevBlock.IsEqual(&prevHash) {
// delete 100 headers if this happens! Dumb reorg. // delete 100 headers if this happens! Dumb reorg.
log.Printf("possible reorg; header msg doesn't fit. points to %s, expect %s", log.Printf("reorg? header msg doesn't fit. points to %s, expect %s",
m.Headers[0].PrevBlock.String(), prevHash.String()) m.Headers[0].PrevBlock.String(), prevHash.String())
if endPos < 8080 { if endPos < 8080 {
// jeez I give up, back to genesis // jeez I give up, back to genesis
@ -297,8 +355,6 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) {
return false, fmt.Errorf("Truncated header file to try again") return false, fmt.Errorf("Truncated header file to try again")
} }
tip := endPos / 80
tip-- // move back header length so it can read last header
for _, resphdr := range m.Headers { for _, resphdr := range m.Headers {
// write to end of file // write to end of file
err = resphdr.Serialize(s.headerFile) err = resphdr.Serialize(s.headerFile)
@ -328,22 +384,25 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) {
} }
} }
log.Printf("Headers to height %d OK.", tip) log.Printf("Headers to height %d OK.", tip)
return true, nil return true, nil
} }
// RootAndHeight is needed instead of just height in case a fullnode // HashAndHeight is needed instead of just height in case a fullnode
// responds abnormally (?) by sending out of order merkleblocks. // responds abnormally (?) by sending out of order merkleblocks.
// we cache a merkleroot:height pair in the queue so we don't have to // we cache a merkleroot:height pair in the queue so we don't have to
// look them up from the disk. // look them up from the disk.
type RootAndHeight struct { // Also used when inv messages indicate blocks so we can add the header
root wire.ShaHash // and parse the txs in one request instead of requesting headers first.
height int32 type HashAndHeight struct {
blockhash wire.ShaHash
height int32
} }
// NewRootAndHeight saves like 2 lines. // NewRootAndHeight saves like 2 lines.
func NewRootAndHeight(r wire.ShaHash, h int32) (rah RootAndHeight) { func NewRootAndHeight(b wire.ShaHash, h int32) (hah HashAndHeight) {
rah.root = r hah.blockhash = b
rah.height = h hah.height = h
return return
} }
@ -366,15 +425,17 @@ func (s *SPVCon) PushTx(tx *wire.MsgTx) error {
// Maybe it's faster to ask for many in a each message? // Maybe it's faster to ask for many in a each message?
func (s *SPVCon) AskForMerkBlocks(current, last int32) error { func (s *SPVCon) AskForMerkBlocks(current, last int32) error {
var hdr wire.BlockHeader var hdr wire.BlockHeader
info, err := s.headerFile.Stat() // get
if err != nil {
return err // crash if header file disappears
}
nextHeight := int32(info.Size() / 80)
fmt.Printf("have headers up to height %d\n", nextHeight-1)
// if last is 0, that means go as far as we can // if last is 0, that means go as far as we can
if last == 0 { if last == 0 {
n, err := s.headerFile.Seek(0, os.SEEK_END) last = nextHeight - 1
if err != nil {
return err
}
last = int32(n / 80)
} }
fmt.Printf("will request merkleblocks %d to %d\n", current, last)
// track number of utxos // track number of utxos
track := len(s.TS.Utxos) track := len(s.TS.Utxos)
// create initial filter // create initial filter
@ -419,9 +480,9 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error {
if err != nil { if err != nil {
return err return err
} }
rah := NewRootAndHeight(hdr.MerkleRoot, current) hah := NewRootAndHeight(hdr.BlockSha(), current)
s.outMsgQueue <- gdataMsg s.outMsgQueue <- gdataMsg
s.mBlockQueue <- rah // push height and mroot of requested block on queue s.mBlockQueue <- hah // push height and mroot of requested block on queue
current++ current++
} }
return nil return nil

@ -23,7 +23,7 @@ import (
const ( const (
targetTimespan = time.Hour * 24 * 14 targetTimespan = time.Hour * 24 * 14
targetSpacing = time.Minute * 10 targetSpacing = time.Minute * 10
epochLength = int64(targetTimespan / targetSpacing) epochLength = int32(targetTimespan / targetSpacing) // 2016
maxDiffAdjust = 4 maxDiffAdjust = 4
minRetargetTimespan = int64(targetTimespan / maxDiffAdjust) minRetargetTimespan = int64(targetTimespan / maxDiffAdjust)
maxRetargetTimespan = int64(targetTimespan * maxDiffAdjust) maxRetargetTimespan = int64(targetTimespan * maxDiffAdjust)
@ -90,7 +90,7 @@ func calcDiffAdjust(start, end wire.BlockHeader, p *chaincfg.Params) uint32 {
return blockchain.BigToCompact(newTarget) return blockchain.BigToCompact(newTarget)
} }
func CheckHeader(r io.ReadSeeker, height int64, p *chaincfg.Params) bool { func CheckHeader(r io.ReadSeeker, height int32, p *chaincfg.Params) bool {
var err error var err error
var cur, prev, epochStart wire.BlockHeader var cur, prev, epochStart wire.BlockHeader
// don't try to verfy the genesis block. That way madness lies. // don't try to verfy the genesis block. That way madness lies.
@ -100,7 +100,7 @@ func CheckHeader(r io.ReadSeeker, height int64, p *chaincfg.Params) bool {
// initial load of headers // initial load of headers
// load epochstart, previous and current. // load epochstart, previous and current.
// get the header from the epoch start, up to 2016 blocks ago // get the header from the epoch start, up to 2016 blocks ago
_, err = r.Seek(80*(height-(height%epochLength)), os.SEEK_SET) _, err = r.Seek(int64(80*(height-(height%epochLength))), os.SEEK_SET)
if err != nil { if err != nil {
log.Printf(err.Error()) log.Printf(err.Error())
return false return false
@ -113,7 +113,7 @@ func CheckHeader(r io.ReadSeeker, height int64, p *chaincfg.Params) bool {
// log.Printf("start epoch at height %d ", height-(height%epochLength)) // log.Printf("start epoch at height %d ", height-(height%epochLength))
// seek to n-1 header // seek to n-1 header
_, err = r.Seek(80*(height-1), os.SEEK_SET) _, err = r.Seek(int64(80*(height-1)), os.SEEK_SET)
if err != nil { if err != nil {
log.Printf(err.Error()) log.Printf(err.Error())
return false return false
@ -125,7 +125,7 @@ func CheckHeader(r io.ReadSeeker, height int64, p *chaincfg.Params) bool {
return false return false
} }
// seek to curHeight header and read in // seek to curHeight header and read in
_, err = r.Seek(80*(height), os.SEEK_SET) _, err = r.Seek(int64(80*(height)), os.SEEK_SET)
if err != nil { if err != nil {
log.Printf(err.Error()) log.Printf(err.Error())
return false return false
@ -184,7 +184,7 @@ difficulty adjustments, and that they all link in to each other properly.
This is the only blockchain technology in the whole code base. This is the only blockchain technology in the whole code base.
Returns false if anything bad happens. Returns true if the range checks Returns false if anything bad happens. Returns true if the range checks
out with no errors. */ out with no errors. */
func CheckRange(r io.ReadSeeker, first, last int64, p *chaincfg.Params) bool { func CheckRange(r io.ReadSeeker, first, last int32, p *chaincfg.Params) bool {
for i := first; i <= last; i++ { for i := first; i <= last; i++ {
if !CheckHeader(r, i, p) { if !CheckHeader(r, i, p) {
return false return false

@ -65,9 +65,8 @@ func inDeadZone(pos, size uint32) bool {
return pos > last return pos > last
} }
// take in a merkle block, parse through it, and return the // take in a merkle block, parse through it, and return txids indicated
// txids that they're trying to tell us about. If there's any problem // If there's any problem return an error. Checks self-consistency only.
// return an error.
// doing it with a stack instead of recursion. Because... // doing it with a stack instead of recursion. Because...
// OK I don't know why I'm just not in to recursion OK? // OK I don't know why I'm just not in to recursion OK?
func checkMBlock(m *wire.MsgMerkleBlock) ([]*wire.ShaHash, error) { func checkMBlock(m *wire.MsgMerkleBlock) ([]*wire.ShaHash, error) {

@ -30,31 +30,11 @@ func (s *SPVCon) incomingMessageHandler() {
case *wire.MsgPong: case *wire.MsgPong:
log.Printf("Got a pong response. OK.\n") log.Printf("Got a pong response. OK.\n")
case *wire.MsgMerkleBlock: case *wire.MsgMerkleBlock:
err = s.IngestMerkleBlock(m)
// log.Printf("Got merkle block message. Will verify.\n")
// fmt.Printf("%d flag bytes, %d txs, %d hashes",
// len(m.Flags), m.Transactions, len(m.Hashes))
txids, err := checkMBlock(m)
if err != nil { if err != nil {
log.Printf("Merkle block error: %s\n", err.Error()) log.Printf("Merkle block error: %s\n", err.Error())
return continue
} }
// fmt.Printf(" got %d txs ", len(txids))
// fmt.Printf(" = got %d txs from block %s\n",
// len(txids), m.Header.BlockSha().String())
rah := <-s.mBlockQueue // pop height off mblock queue
// this verifies order, and also that the returned header fits
// into our SPV header file
if !rah.root.IsEqual(&m.Header.MerkleRoot) {
log.Printf("out of order error")
}
for _, txid := range txids {
err := s.TS.AddTxid(txid, rah.height)
if err != nil {
log.Printf("Txid store error: %s\n", err.Error())
}
}
case *wire.MsgHeaders: case *wire.MsgHeaders:
moar, err := s.IngestHeaders(m) moar, err := s.IngestHeaders(m)
if err != nil { if err != nil {
@ -70,11 +50,22 @@ func (s *SPVCon) incomingMessageHandler() {
log.Printf("Incoming Tx error: %s\n", err.Error()) log.Printf("Incoming Tx error: %s\n", err.Error())
} }
// log.Printf("Got tx %s\n", m.TxSha().String()) // log.Printf("Got tx %s\n", m.TxSha().String())
case *wire.MsgReject: case *wire.MsgReject:
log.Printf("Rejected! cmd: %s code: %s tx: %s reason: %s", log.Printf("Rejected! cmd: %s code: %s tx: %s reason: %s",
m.Cmd, m.Code.String(), m.Hash.String(), m.Reason) m.Cmd, m.Code.String(), m.Hash.String(), m.Reason)
case *wire.MsgInv:
log.Printf("got inv. Contains:\n")
for i, thing := range m.InvList {
log.Printf("\t%d)%s : %s",
i, thing.Type.String(), thing.Hash.String())
if thing.Type == wire.InvTypeTx { // new tx, ingest
s.TS.OKTxids[thing.Hash] = 0 // unconfirmed
s.AskForTx(thing.Hash)
}
if thing.Type == wire.InvTypeBlock { // new block, ingest
s.AskForBlock(thing.Hash)
}
}
default: default:
log.Printf("Got unknown message type %s\n", m.Command()) log.Printf("Got unknown message type %s\n", m.Command())
} }

56
uspv/sortsignsend.go Normal file

@ -0,0 +1,56 @@
package uspv
import (
"bytes"
"fmt"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil/hdkeychain"
"github.com/btcsuite/btcutil/txsort"
)
func (t *TxStore) SignThis(tx *wire.MsgTx) error {
fmt.Printf("-= SignThis =-\n")
// sort tx before signing.
txsort.InPlaceSort(tx)
sigs := make([][]byte, len(tx.TxIn))
// first iterate over each input
for j, in := range tx.TxIn {
for k := uint32(0); k < uint32(len(t.Adrs)); k++ {
child, err := t.rootPrivKey.Child(k + hdkeychain.HardenedKeyStart)
if err != nil {
return err
}
myadr, err := child.Address(t.Param)
if err != nil {
return err
}
adrScript, err := txscript.PayToAddrScript(myadr)
if err != nil {
return err
}
if bytes.Equal(adrScript, in.SignatureScript) {
fmt.Printf("Hit; key %d matches input %d. Signing.\n", k, j)
priv, err := child.ECPrivKey()
if err != nil {
return err
}
sigs[j], err = txscript.SignatureScript(
tx, j, in.SignatureScript, txscript.SigHashAll, priv, true)
if err != nil {
return err
}
break
}
}
}
for i, s := range sigs {
if s != nil {
tx.TxIn[i].SignatureScript = s
}
}
return nil
}

@ -129,8 +129,7 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error {
return err return err
} }
if bytes.Equal(out.PkScript, aPKscript) { // hit if bytes.Equal(out.PkScript, aPKscript) { // hit
hits++
acq += out.Value
var newu Utxo var newu Utxo
newu.AtHeight = height newu.AtHeight = height
newu.KeyIdx = a.KeyIdx newu.KeyIdx = a.KeyIdx
@ -146,6 +145,8 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error {
} }
if !dupe { // only save to DB if new txid if !dupe { // only save to DB if new txid
t.Utxos = append(t.Utxos, newu) t.Utxos = append(t.Utxos, newu)
acq += out.Value
hits++
} else { } else {
fmt.Printf("...dupe ") fmt.Printf("...dupe ")
} }