From 5c2bbff3eb042a862f73c3ed00ef653c59c8dc42 Mon Sep 17 00:00:00 2001 From: Tadge Dryja Date: Thu, 28 Jan 2016 22:31:05 -0800 Subject: [PATCH] fix more sync problems. Seems to work OK One edge case is with long chains of unconfirmed txs. All self-signed txs are stored, but not used to negate false incoming utxos yet. --- uspv/eight333.go | 24 ++++++++++++++++++----- uspv/msghandler.go | 47 +++++++++++++++++++++++++--------------------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/uspv/eight333.go b/uspv/eight333.go index 275a6493..247d2815 100644 --- a/uspv/eight333.go +++ b/uspv/eight333.go @@ -401,7 +401,9 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) { if err != nil { return false, err } + if syncTip < tip { + fmt.Printf("syncTip %d headerTip %d\n", syncTip, tip) err = s.AskForMerkBlocks(syncTip, tip) if err != nil { return false, err @@ -451,6 +453,18 @@ func (s *SPVCon) GetNextHeaderHeight() (int32, error) { return nextHeight, nil } +func (s *SPVCon) RemoveHeaders(r int32) error { + endPos, err := s.headerFile.Seek(0, os.SEEK_END) + if err != nil { + return err + } + err = s.headerFile.Truncate(endPos - int64(r*80)) + if err != nil { + return fmt.Errorf("couldn't truncate header file") + } + return nil +} + // AskForMerkBlocks requests blocks from current to last // right now this asks for 1 block per getData message. // Maybe it's faster to ask for many in a each message? @@ -464,8 +478,8 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error { fmt.Printf("have headers up to height %d\n", nextHeight-1) // if last is 0, that means go as far as we can - if last == 0 { - last = nextHeight - 1 + if last < current { + return fmt.Errorf("MBlock range %d < %d\n", last, current) } fmt.Printf("will request merkleblocks %d to %d\n", current, last) @@ -483,9 +497,7 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error { return err } // loop through all heights where we want merkleblocks. - for current < last { - // check if we need to update filter... diff of 5 utxos...? - + for current <= last { // load header from file err = hdr.Deserialize(s.headerFile) if err != nil { @@ -506,6 +518,8 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error { s.mBlockQueue <- hah // push height and mroot of requested block on queue current++ } + fmt.Printf("mblock reqs done, more headers\n") + // done syncing blocks known in header file, ask for new headers we missed s.AskForHeaders() diff --git a/uspv/msghandler.go b/uspv/msghandler.go index 1eeddb5e..88b6f4b1 100644 --- a/uspv/msghandler.go +++ b/uspv/msghandler.go @@ -27,7 +27,7 @@ func (s *SPVCon) incomingMessageHandler() { log.Printf("got %d addresses.\n", len(m.AddrList)) case *wire.MsgPing: // log.Printf("Got a ping message. We should pong back or they will kick us off.") - s.PongBack(m.Nonce) + go s.PongBack(m.Nonce) case *wire.MsgPong: log.Printf("Got a pong response. OK.\n") case *wire.MsgMerkleBlock: @@ -37,32 +37,14 @@ func (s *SPVCon) incomingMessageHandler() { continue } case *wire.MsgHeaders: - moar, err := s.IngestHeaders(m) - if err != nil { - log.Printf("Header error: %s\n", err.Error()) - return - } - if moar { - s.AskForHeaders() - } + go s.HeaderHandler(m) case *wire.MsgTx: - hits, err := s.TS.AckTx(m) - if err != nil { - log.Printf("Incoming Tx error: %s\n", err.Error()) - continue - } - if hits == 0 { - log.Printf("tx %s had no hits, filter false positive.", - m.TxSha().String()) - s.fPositives <- 1 // add one false positive to chan - } - // log.Printf("Got tx %s\n", m.TxSha().String()) + go s.TxHandler(m) case *wire.MsgReject: log.Printf("Rejected! cmd: %s code: %s tx: %s reason: %s", m.Cmd, m.Code.String(), m.Hash.String(), m.Reason) case *wire.MsgInv: go s.InvHandler(m) - case *wire.MsgNotFound: log.Printf("Got not found response from remote:") for i, thing := range m.InvList { @@ -118,6 +100,29 @@ func (s *SPVCon) fPositiveHandler() { } } +func (s *SPVCon) HeaderHandler(m *wire.MsgHeaders) { + moar, err := s.IngestHeaders(m) + if err != nil { + log.Printf("Header error: %s\n", err.Error()) + return + } + if moar { + s.AskForHeaders() + } +} + +func (s *SPVCon) TxHandler(m *wire.MsgTx) { + hits, err := s.TS.AckTx(m) + if err != nil { + log.Printf("Incoming Tx error: %s\n", err.Error()) + } + if hits == 0 { + log.Printf("tx %s had no hits, filter false positive.", + m.TxSha().String()) + s.fPositives <- 1 // add one false positive to chan + } +} + func (s *SPVCon) InvHandler(m *wire.MsgInv) { log.Printf("got inv. Contains:\n") for i, thing := range m.InvList {