From 9eccb0638a1ee4211706d136a30598a09618a3fb Mon Sep 17 00:00:00 2001 From: Tadge Dryja Date: Wed, 3 Feb 2016 20:26:12 -0800 Subject: [PATCH] I think this fixes all sync problems add a bool to the HashAndHeight struct; can indicate "final". When the block market final comes in, we don't enter wait state, but instead ask for headers. when you don't get any headers, you wont need any blocks, and that will assert the wait state. --- shell.go | 14 +---- uspv/eight333.go | 127 ++++++++++++++++++++++----------------------- uspv/msghandler.go | 37 ++++--------- 3 files changed, 74 insertions(+), 104 deletions(-) diff --git a/shell.go b/shell.go index 5602a18c..2a95280b 100644 --- a/shell.go +++ b/shell.go @@ -69,7 +69,7 @@ func shell() { } // once we're connected, initiate headers sync - err = Hdr() + err = SCon.AskForHeaders() if err != nil { log.Fatal(err) } @@ -149,18 +149,6 @@ func Shellparse(cmdslice []string) error { return nil } -// Hdr asks for headers. -func Hdr() error { - if SCon.RBytes == 0 { - return fmt.Errorf("No SPV connection, can't get headers.") - } - err := SCon.AskForHeaders() - if err != nil { - return err - } - return nil -} - // Bal prints out your score. func Bal(args []string) error { if SCon.TS == nil { diff --git a/uspv/eight333.go b/uspv/eight333.go index a757c4ab..893d473b 100644 --- a/uspv/eight333.go +++ b/uspv/eight333.go @@ -17,8 +17,6 @@ import ( const ( keyFileName = "testseed.hex" headerFileName = "headers.bin" - // Except hash-160s, those aren't backwards. But anything that's 32 bytes is. - // because, cmon, 32? Gotta reverse that. But 20? 20 is OK. // version hardcoded for now, probably ok...? VERSION = 70011 @@ -223,6 +221,7 @@ func (s *SPVCon) HeightFromHeader(query wire.BlockHeader) (uint32, error) { // AskForTx requests a tx we heard about from an inv message. // It's one at a time but should be fast enough. +// I don't like this function because SPV shouldn't even ask... func (s *SPVCon) AskForTx(txid wire.ShaHash) { gdata := wire.NewMsgGetData() inv := wire.NewInvVect(wire.InvTypeTx, &txid) @@ -230,38 +229,12 @@ func (s *SPVCon) AskForTx(txid wire.ShaHash) { s.outMsgQueue <- gdata } -// AskForBlock requests a merkle block we heard about from an inv message. -// We don't have it in our header file so when we get it we do both operations: -// appending and checking the header, and checking spv proofs -func (s *SPVCon) AskForBlockx(hsh wire.ShaHash) { - s.headerMutex.Lock() - defer s.headerMutex.Unlock() - - gdata := wire.NewMsgGetData() - inv := wire.NewInvVect(wire.InvTypeFilteredBlock, &hsh) - gdata.AddInvVect(inv) - - info, err := s.headerFile.Stat() // get - if err != nil { - log.Fatal(err) // crash if header file disappears - } - nextHeight := int32(info.Size() / 80) - - hah := NewRootAndHeight(hsh, nextHeight) - fmt.Printf("AskForBlock - %s height %d\n", hsh.String(), nextHeight) - s.mBlockQueue <- hah // push height and mroot of requested block on queue - s.outMsgQueue <- gdata // push request to outbox - return -} - func (s *SPVCon) AskForHeaders() error { - s.headerMutex.Lock() - defer s.headerMutex.Unlock() - var hdr wire.BlockHeader ghdr := wire.NewMsgGetHeaders() ghdr.ProtocolVersion = s.localVersion + s.headerMutex.Lock() // start header file ops info, err := s.headerFile.Stat() if err != nil { return err @@ -286,6 +259,7 @@ func (s *SPVCon) AskForHeaders() error { log.Printf("can't Deserialize") return err } + s.headerMutex.Unlock() // done with header file cHash := hdr.BlockSha() err = ghdr.AddBlockLocatorHash(&cHash) @@ -307,12 +281,8 @@ func (s *SPVCon) IngestMerkleBlock(m *wire.MsgMerkleBlock) error { return err } var hah HashAndHeight - select { + select { // select here so we don't block on an unrequested mblock case hah = <-s.mBlockQueue: // pop height off mblock queue - // not super comfortable with this but it seems to work. - if len(s.mBlockQueue) == 0 { // done and fully sync'd - s.inWaitState <- true - } break default: return fmt.Errorf("Unrequested merkle block") @@ -336,6 +306,16 @@ func (s *SPVCon) IngestMerkleBlock(m *wire.MsgMerkleBlock) error { if err != nil { return err } + if hah.final { + // don't set waitstate; instead, ask for headers again! + // this way the only thing that triggers waitstate is asking for headers, + // getting 0, calling AskForMerkBlocks(), and seeing you don't need any. + // that way you are pretty sure you're synced up. + err = s.AskForHeaders() + if err != nil { + return err + } + } return nil } @@ -345,6 +325,16 @@ func (s *SPVCon) IngestMerkleBlock(m *wire.MsgMerkleBlock) error { // it assumes we're done and returns false. If it worked it assumes there's // more to request and returns true. func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { + gotNum := int64(len(m.Headers)) + if gotNum > 0 { + fmt.Printf("got %d headers. Range:\n%s - %s\n", + gotNum, m.Headers[0].BlockSha().String(), + m.Headers[len(m.Headers)-1].BlockSha().String()) + } else { + log.Printf("got 0 headers, we're probably synced up") + return false, nil + } + s.headerMutex.Lock() defer s.headerMutex.Unlock() @@ -367,15 +357,6 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { } tip := int32(endPos/80) - 1 // move back 1 header length to read - gotNum := int64(len(m.Headers)) - if gotNum > 0 { - fmt.Printf("got %d headers. Range:\n%s - %s\n", - gotNum, m.Headers[0].BlockSha().String(), - m.Headers[len(m.Headers)-1].BlockSha().String()) - } else { - log.Printf("got 0 headers, we're probably synced up") - return false, nil - } // check first header returned to make sure it fits on the end // of our header file if !m.Headers[0].PrevBlock.IsEqual(&prevHash) { @@ -434,6 +415,7 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { type HashAndHeight struct { blockhash wire.ShaHash height int32 + final bool // indicates this is the last merkleblock requested } // NewRootAndHeight saves like 2 lines. @@ -483,20 +465,32 @@ func (s *SPVCon) RemoveHeaders(r int32) error { // AskForMerkBlocks requests blocks from current to last // right now this asks for 1 block per getData message. // Maybe it's faster to ask for many in a each message? -func (s *SPVCon) AskForMerkBlocks(current, last int32) error { +func (s *SPVCon) AskForMerkBlocks() error { var hdr wire.BlockHeader - nextHeight, err := s.GetNextHeaderHeight() + s.headerMutex.Lock() // lock just to check filesize + stat, err := os.Stat(headerFileName) + s.headerMutex.Unlock() // checked, unlock + endPos := stat.Size() + + headerTip := int32(endPos/80) - 1 // move back 1 header length to read + + dbTip, err := s.TS.GetDBSyncHeight() if err != nil { return err } - - fmt.Printf("have headers up to height %d\n", nextHeight-1) - // if last is 0, that means go as far as we can - if last < current { - return fmt.Errorf("MBlock range %d < %d\n", last, current) + fmt.Printf("dbTip %d headerTip %d\n", dbTip, headerTip) + if dbTip > headerTip { + return fmt.Errorf("error- db longer than headers! shouldn't happen.") } - fmt.Printf("will request merkleblocks %d to %d\n", current, last) + if dbTip == headerTip { + // nothing to ask for; set wait state and return + fmt.Printf("no merkle blocks to request, entering wait state\n") + s.inWaitState <- true + return nil + } + + fmt.Printf("will request merkleblocks %d to %d\n", dbTip, headerTip) // create initial filter filt, err := s.TS.GimmeFilter() @@ -506,19 +500,20 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error { // send filter s.SendFilter(filt) fmt.Printf("sent filter %x\n", filt.MsgFilterLoad().Filter) - s.headerMutex.Lock() - defer s.headerMutex.Unlock() - _, err = s.headerFile.Seek(int64((current-1)*80), os.SEEK_SET) - if err != nil { - return err - } // loop through all heights where we want merkleblocks. - for current <= last { + for dbTip <= headerTip { // load header from file - err = hdr.Deserialize(s.headerFile) + + s.headerMutex.Lock() // seek to header we need + _, err = s.headerFile.Seek(int64((dbTip-1)*80), os.SEEK_SET) if err != nil { - log.Printf("Deserialize err\n") + return err + } + err = hdr.Deserialize(s.headerFile) // read header, done w/ file for now + s.headerMutex.Unlock() // unlock after reading 1 header + if err != nil { + log.Printf("header deserialize error!\n") return err } @@ -531,13 +526,15 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error { if err != nil { return err } - hah := NewRootAndHeight(hdr.BlockSha(), current) + hah := NewRootAndHeight(hdr.BlockSha(), dbTip) + if dbTip == headerTip { // if this is the last block, indicate finality + hah.final = true + } s.outMsgQueue <- gdataMsg + // waits here most of the time for the queue to empty out s.mBlockQueue <- hah // push height and mroot of requested block on queue - current++ + dbTip++ } - // done syncing blocks known in header file, ask for new headers we missed - // s.AskForHeaders() - // don't need this -- will sync to end regardless + return nil } diff --git a/uspv/msghandler.go b/uspv/msghandler.go index 926ae5da..a6a1c3c2 100644 --- a/uspv/msghandler.go +++ b/uspv/msghandler.go @@ -3,7 +3,6 @@ package uspv import ( "fmt" "log" - "os" "github.com/btcsuite/btcd/wire" ) @@ -107,34 +106,20 @@ func (s *SPVCon) HeaderHandler(m *wire.MsgHeaders) { log.Printf("Header error: %s\n", err.Error()) return } - // if we got post DB syncheight headers, get merkleblocks for them - // this is always true except for first pre-birthday sync - - // checked header length, start req for more if needed + // more to get? if so, ask for them and return if moar { - s.AskForHeaders() - } else { // no moar, done w/ headers, get merkleblocks - s.headerMutex.Lock() - endPos, err := s.headerFile.Seek(0, os.SEEK_END) + err = s.AskForHeaders() if err != nil { - log.Printf("Header error: %s", err.Error()) - return - } - s.headerMutex.Unlock() - tip := int32(endPos/80) - 1 // move back 1 header length to read - syncTip, err := s.TS.GetDBSyncHeight() - if err != nil { - log.Printf("syncTip error: %s", err.Error()) - return - } - if syncTip < tip { - fmt.Printf("syncTip %d headerTip %d\n", syncTip, tip) - err = s.AskForMerkBlocks(syncTip+1, tip) - if err != nil { - log.Printf("AskForMerkBlocks error: %s", err.Error()) - return - } + log.Printf("AskForHeaders error: %s", err.Error()) } + return + } + + // no moar, done w/ headers, get merkleblocks + err = s.AskForMerkBlocks() + if err != nil { + log.Printf("AskForMerkBlocks error: %s", err.Error()) + return } }