From 5a7c04bdb5693f166aff58bc2e280a69396717ae Mon Sep 17 00:00:00 2001 From: Tadge Dryja Date: Fri, 22 Jan 2016 16:04:27 -0800 Subject: [PATCH] incoming invs and tx signing when we get inv messages, txs are easy but blocks are a little tricky block heigh synchronization is done via os' file system signing is a bit inelegant (searches through inputs for pkscript matches) but fast enough in practice --- uspv/eight333.go | 115 +++++++++++++++++++++++++++++++++---------- uspv/header.go | 12 ++--- uspv/mblock.go | 5 +- uspv/msghandler.go | 39 ++++++--------- uspv/sortsignsend.go | 56 +++++++++++++++++++++ uspv/txstore.go | 5 +- 6 files changed, 170 insertions(+), 62 deletions(-) create mode 100644 uspv/sortsignsend.go diff --git a/uspv/eight333.go b/uspv/eight333.go index 5c2e459d..548f450a 100644 --- a/uspv/eight333.go +++ b/uspv/eight333.go @@ -27,9 +27,11 @@ type SPVCon struct { con net.Conn // the (probably tcp) connection to the node headerFile *os.File // file for SPV headers + //[doesn't work without fancy mutexes, nevermind, just use header file] + // localHeight int32 // block height we're on + remoteHeight int32 // block height they're on localVersion uint32 // version we report remoteVersion uint32 // version remote node - remoteHeight int32 // block height they're on // what's the point of the input queue? remove? leave for now... inMsgQueue chan wire.Message // Messages coming in from remote node @@ -42,7 +44,7 @@ type SPVCon struct { param *chaincfg.Params // network parameters (testnet3, testnetL) // mBlockQueue is for keeping track of what height we've requested. - mBlockQueue chan RootAndHeight + mBlockQueue chan HashAndHeight } func OpenSPV(remoteNode string, hfn, tsfn string, @@ -125,7 +127,7 @@ func OpenSPV(remoteNode string, hfn, tsfn string, go s.incomingMessageHandler() s.outMsgQueue = make(chan wire.Message, 1) go s.outgoingMessageHandler() - s.mBlockQueue = make(chan RootAndHeight, 32) // queue depth 32 is a thing + s.mBlockQueue = make(chan HashAndHeight, 32) // queue depth 32 is a thing return s, nil } @@ -207,6 +209,36 @@ func (s *SPVCon) HeightFromHeader(query wire.BlockHeader) (uint32, error) { return 0, fmt.Errorf("Header not found on disk") } +// AskForTx requests a tx we heard about from an inv message. +// It's one at a time but should be fast enough. +func (s *SPVCon) AskForTx(txid wire.ShaHash) { + gdata := wire.NewMsgGetData() + inv := wire.NewInvVect(wire.InvTypeTx, &txid) + gdata.AddInvVect(inv) + s.outMsgQueue <- gdata +} + +// AskForBlock requests a merkle block we heard about from an inv message. +// We don't have it in our header file so when we get it we do both operations: +// appending and checking the header, and checking spv proofs +func (s *SPVCon) AskForBlock(hsh wire.ShaHash) { + gdata := wire.NewMsgGetData() + inv := wire.NewInvVect(wire.InvTypeFilteredBlock, &hsh) + gdata.AddInvVect(inv) + + info, err := s.headerFile.Stat() // get + if err != nil { + log.Fatal(err) // crash if header file disappears + } + nextHeight := int32(info.Size() / 80) + + hah := NewRootAndHeight(hsh, nextHeight) + + s.mBlockQueue <- hah // push height and mroot of requested block on queue + s.outMsgQueue <- gdata // push request to outbox + +} + func (s *SPVCon) AskForHeaders() error { var hdr wire.BlockHeader ghdr := wire.NewMsgGetHeaders() @@ -229,6 +261,7 @@ func (s *SPVCon) AskForHeaders() error { } log.Printf("suk to offset %d (should be near the end\n", ns) + // get header from last 80 bytes of file err = hdr.Deserialize(s.headerFile) if err != nil { @@ -250,6 +283,31 @@ func (s *SPVCon) AskForHeaders() error { return nil } +func (s *SPVCon) IngestMerkleBlock(m *wire.MsgMerkleBlock) error { + txids, err := checkMBlock(m) // check self-consistency + if err != nil { + return err + } + hah := <-s.mBlockQueue // pop height off mblock queue + // this verifies order, and also that the returned header fits + // into our SPV header file + newMerkBlockSha := m.Header.BlockSha() + if !hah.blockhash.IsEqual(&newMerkBlockSha) { + return fmt.Errorf("merkle block out of order error") + } + for _, txid := range txids { + err := s.TS.AddTxid(txid, hah.height) + if err != nil { + return fmt.Errorf("Txid store error: %s\n", err.Error()) + } + } + return nil +} + +// IngestHeaders takes in a bunch of headers and appends them to the +// local header file, checking that they fit. If there's no headers, +// it assumes we're done and returns false. If it worked it assumes there's +// more to request and returns true.9 func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { var err error // seek to last header @@ -264,6 +322,12 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { } prevHash := last.BlockSha() + endPos, err := s.headerFile.Seek(0, os.SEEK_END) + if err != nil { + return false, err + } + tip := int32(endPos/80) - 1 // move back 1 header length to read + gotNum := int64(len(m.Headers)) if gotNum > 0 { fmt.Printf("got %d headers. Range:\n%s - %s\n", @@ -273,17 +337,11 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { log.Printf("got 0 headers, we're probably synced up") return false, nil } - - endPos, err := s.headerFile.Seek(0, os.SEEK_END) - if err != nil { - return false, err - } - // check first header returned to make sure it fits on the end // of our header file if !m.Headers[0].PrevBlock.IsEqual(&prevHash) { // delete 100 headers if this happens! Dumb reorg. - log.Printf("possible reorg; header msg doesn't fit. points to %s, expect %s", + log.Printf("reorg? header msg doesn't fit. points to %s, expect %s", m.Headers[0].PrevBlock.String(), prevHash.String()) if endPos < 8080 { // jeez I give up, back to genesis @@ -297,8 +355,6 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { return false, fmt.Errorf("Truncated header file to try again") } - tip := endPos / 80 - tip-- // move back header length so it can read last header for _, resphdr := range m.Headers { // write to end of file err = resphdr.Serialize(s.headerFile) @@ -328,22 +384,25 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { } } log.Printf("Headers to height %d OK.", tip) + return true, nil } -// RootAndHeight is needed instead of just height in case a fullnode +// HashAndHeight is needed instead of just height in case a fullnode // responds abnormally (?) by sending out of order merkleblocks. // we cache a merkleroot:height pair in the queue so we don't have to // look them up from the disk. -type RootAndHeight struct { - root wire.ShaHash - height int32 +// Also used when inv messages indicate blocks so we can add the header +// and parse the txs in one request instead of requesting headers first. +type HashAndHeight struct { + blockhash wire.ShaHash + height int32 } // NewRootAndHeight saves like 2 lines. -func NewRootAndHeight(r wire.ShaHash, h int32) (rah RootAndHeight) { - rah.root = r - rah.height = h +func NewRootAndHeight(b wire.ShaHash, h int32) (hah HashAndHeight) { + hah.blockhash = b + hah.height = h return } @@ -366,15 +425,17 @@ func (s *SPVCon) PushTx(tx *wire.MsgTx) error { // Maybe it's faster to ask for many in a each message? func (s *SPVCon) AskForMerkBlocks(current, last int32) error { var hdr wire.BlockHeader + info, err := s.headerFile.Stat() // get + if err != nil { + return err // crash if header file disappears + } + nextHeight := int32(info.Size() / 80) + fmt.Printf("have headers up to height %d\n", nextHeight-1) // if last is 0, that means go as far as we can if last == 0 { - n, err := s.headerFile.Seek(0, os.SEEK_END) - if err != nil { - return err - } - last = int32(n / 80) + last = nextHeight - 1 } - + fmt.Printf("will request merkleblocks %d to %d\n", current, last) // track number of utxos track := len(s.TS.Utxos) // create initial filter @@ -419,9 +480,9 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error { if err != nil { return err } - rah := NewRootAndHeight(hdr.MerkleRoot, current) + hah := NewRootAndHeight(hdr.BlockSha(), current) s.outMsgQueue <- gdataMsg - s.mBlockQueue <- rah // push height and mroot of requested block on queue + s.mBlockQueue <- hah // push height and mroot of requested block on queue current++ } return nil diff --git a/uspv/header.go b/uspv/header.go index 6f9c18f9..40983e36 100644 --- a/uspv/header.go +++ b/uspv/header.go @@ -23,7 +23,7 @@ import ( const ( targetTimespan = time.Hour * 24 * 14 targetSpacing = time.Minute * 10 - epochLength = int64(targetTimespan / targetSpacing) + epochLength = int32(targetTimespan / targetSpacing) // 2016 maxDiffAdjust = 4 minRetargetTimespan = int64(targetTimespan / maxDiffAdjust) maxRetargetTimespan = int64(targetTimespan * maxDiffAdjust) @@ -90,7 +90,7 @@ func calcDiffAdjust(start, end wire.BlockHeader, p *chaincfg.Params) uint32 { return blockchain.BigToCompact(newTarget) } -func CheckHeader(r io.ReadSeeker, height int64, p *chaincfg.Params) bool { +func CheckHeader(r io.ReadSeeker, height int32, p *chaincfg.Params) bool { var err error var cur, prev, epochStart wire.BlockHeader // don't try to verfy the genesis block. That way madness lies. @@ -100,7 +100,7 @@ func CheckHeader(r io.ReadSeeker, height int64, p *chaincfg.Params) bool { // initial load of headers // load epochstart, previous and current. // get the header from the epoch start, up to 2016 blocks ago - _, err = r.Seek(80*(height-(height%epochLength)), os.SEEK_SET) + _, err = r.Seek(int64(80*(height-(height%epochLength))), os.SEEK_SET) if err != nil { log.Printf(err.Error()) return false @@ -113,7 +113,7 @@ func CheckHeader(r io.ReadSeeker, height int64, p *chaincfg.Params) bool { // log.Printf("start epoch at height %d ", height-(height%epochLength)) // seek to n-1 header - _, err = r.Seek(80*(height-1), os.SEEK_SET) + _, err = r.Seek(int64(80*(height-1)), os.SEEK_SET) if err != nil { log.Printf(err.Error()) return false @@ -125,7 +125,7 @@ func CheckHeader(r io.ReadSeeker, height int64, p *chaincfg.Params) bool { return false } // seek to curHeight header and read in - _, err = r.Seek(80*(height), os.SEEK_SET) + _, err = r.Seek(int64(80*(height)), os.SEEK_SET) if err != nil { log.Printf(err.Error()) return false @@ -184,7 +184,7 @@ difficulty adjustments, and that they all link in to each other properly. This is the only blockchain technology in the whole code base. Returns false if anything bad happens. Returns true if the range checks out with no errors. */ -func CheckRange(r io.ReadSeeker, first, last int64, p *chaincfg.Params) bool { +func CheckRange(r io.ReadSeeker, first, last int32, p *chaincfg.Params) bool { for i := first; i <= last; i++ { if !CheckHeader(r, i, p) { return false diff --git a/uspv/mblock.go b/uspv/mblock.go index dc71608c..823ec805 100644 --- a/uspv/mblock.go +++ b/uspv/mblock.go @@ -65,9 +65,8 @@ func inDeadZone(pos, size uint32) bool { return pos > last } -// take in a merkle block, parse through it, and return the -// txids that they're trying to tell us about. If there's any problem -// return an error. +// take in a merkle block, parse through it, and return txids indicated +// If there's any problem return an error. Checks self-consistency only. // doing it with a stack instead of recursion. Because... // OK I don't know why I'm just not in to recursion OK? func checkMBlock(m *wire.MsgMerkleBlock) ([]*wire.ShaHash, error) { diff --git a/uspv/msghandler.go b/uspv/msghandler.go index b3ee6822..58d15093 100644 --- a/uspv/msghandler.go +++ b/uspv/msghandler.go @@ -30,31 +30,11 @@ func (s *SPVCon) incomingMessageHandler() { case *wire.MsgPong: log.Printf("Got a pong response. OK.\n") case *wire.MsgMerkleBlock: - - // log.Printf("Got merkle block message. Will verify.\n") - // fmt.Printf("%d flag bytes, %d txs, %d hashes", - // len(m.Flags), m.Transactions, len(m.Hashes)) - txids, err := checkMBlock(m) + err = s.IngestMerkleBlock(m) if err != nil { log.Printf("Merkle block error: %s\n", err.Error()) - return + continue } - // fmt.Printf(" got %d txs ", len(txids)) - // fmt.Printf(" = got %d txs from block %s\n", - // len(txids), m.Header.BlockSha().String()) - rah := <-s.mBlockQueue // pop height off mblock queue - // this verifies order, and also that the returned header fits - // into our SPV header file - if !rah.root.IsEqual(&m.Header.MerkleRoot) { - log.Printf("out of order error") - } - for _, txid := range txids { - err := s.TS.AddTxid(txid, rah.height) - if err != nil { - log.Printf("Txid store error: %s\n", err.Error()) - } - } - case *wire.MsgHeaders: moar, err := s.IngestHeaders(m) if err != nil { @@ -70,11 +50,22 @@ func (s *SPVCon) incomingMessageHandler() { log.Printf("Incoming Tx error: %s\n", err.Error()) } // log.Printf("Got tx %s\n", m.TxSha().String()) - case *wire.MsgReject: log.Printf("Rejected! cmd: %s code: %s tx: %s reason: %s", m.Cmd, m.Code.String(), m.Hash.String(), m.Reason) - + case *wire.MsgInv: + log.Printf("got inv. Contains:\n") + for i, thing := range m.InvList { + log.Printf("\t%d)%s : %s", + i, thing.Type.String(), thing.Hash.String()) + if thing.Type == wire.InvTypeTx { // new tx, ingest + s.TS.OKTxids[thing.Hash] = 0 // unconfirmed + s.AskForTx(thing.Hash) + } + if thing.Type == wire.InvTypeBlock { // new block, ingest + s.AskForBlock(thing.Hash) + } + } default: log.Printf("Got unknown message type %s\n", m.Command()) } diff --git a/uspv/sortsignsend.go b/uspv/sortsignsend.go new file mode 100644 index 00000000..4aa9bc26 --- /dev/null +++ b/uspv/sortsignsend.go @@ -0,0 +1,56 @@ +package uspv + +import ( + "bytes" + "fmt" + + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil/hdkeychain" + "github.com/btcsuite/btcutil/txsort" +) + +func (t *TxStore) SignThis(tx *wire.MsgTx) error { + fmt.Printf("-= SignThis =-\n") + + // sort tx before signing. + txsort.InPlaceSort(tx) + + sigs := make([][]byte, len(tx.TxIn)) + // first iterate over each input + for j, in := range tx.TxIn { + for k := uint32(0); k < uint32(len(t.Adrs)); k++ { + child, err := t.rootPrivKey.Child(k + hdkeychain.HardenedKeyStart) + if err != nil { + return err + } + myadr, err := child.Address(t.Param) + if err != nil { + return err + } + adrScript, err := txscript.PayToAddrScript(myadr) + if err != nil { + return err + } + if bytes.Equal(adrScript, in.SignatureScript) { + fmt.Printf("Hit; key %d matches input %d. Signing.\n", k, j) + priv, err := child.ECPrivKey() + if err != nil { + return err + } + sigs[j], err = txscript.SignatureScript( + tx, j, in.SignatureScript, txscript.SigHashAll, priv, true) + if err != nil { + return err + } + break + } + } + } + for i, s := range sigs { + if s != nil { + tx.TxIn[i].SignatureScript = s + } + } + return nil +} diff --git a/uspv/txstore.go b/uspv/txstore.go index c9aaa656..d1aeb45f 100644 --- a/uspv/txstore.go +++ b/uspv/txstore.go @@ -129,8 +129,7 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error { return err } if bytes.Equal(out.PkScript, aPKscript) { // hit - hits++ - acq += out.Value + var newu Utxo newu.AtHeight = height newu.KeyIdx = a.KeyIdx @@ -146,6 +145,8 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error { } if !dupe { // only save to DB if new txid t.Utxos = append(t.Utxos, newu) + acq += out.Value + hits++ } else { fmt.Printf("...dupe ") }