move tx ingest to all db, fixes sync

Sync was non-deterministic because ingest was concurrent.
Now receiving tx messages is blocking, but that's OK: they really need
to be in the right order, because the whole point of bitcoin is to put
txs in the right order.  SendTx still has a problem: the change address
may not be recognized by ingest.
Tadge Dryja 2016-01-31 01:05:31 -08:00
parent 83dff432b1
commit 3b774ef361
4 changed files with 150 additions and 10 deletions
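
The ordering requirement behind this change is easy to see with a toy model (plain maps, not the repo's TxStore): if a tx that spends one of our utxos is ingested before the tx that created that utxo, the spend is silently missed and the balance comes out wrong. A standalone sketch:

package main

import "fmt"

// toy wallet: outpoint string -> value in satoshis
type wallet struct {
    utxos map[string]int64
    sum   int64
}

// ingest applies one tx: first remove spent outpoints, then add new outputs.
func (w *wallet) ingest(spends []string, creates map[string]int64) {
    for _, op := range spends {
        if v, ok := w.utxos[op]; ok {
            delete(w.utxos, op)
            w.sum -= v
        }
        // if the utxo isn't known yet, the spend is silently missed
    }
    for op, v := range creates {
        w.utxos[op] = v
        w.sum += v
    }
}

func main() {
    creates := map[string]int64{"txA:0": 5000} // txA pays us 5000 sat
    spends := []string{"txA:0"}                // txB spends that output

    a := &wallet{utxos: map[string]int64{}}
    a.ingest(nil, creates)           // txA first
    a.ingest(spends, nil)            // then txB
    fmt.Println("in order: ", a.sum) // 0, correct

    b := &wallet{utxos: map[string]int64{}}
    b.ingest(spends, nil)            // txB arrives first; spend of unknown utxo is lost
    b.ingest(nil, creates)           // txA then adds a utxo that is already spent
    fmt.Println("reversed: ", b.sum) // 5000, wrong
}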

@@ -402,7 +402,7 @@ func (s *SPVCon) IngestHeaders(m *wire.MsgHeaders) (bool, error) {
        }
        // probably should disconnect from spv node at this point,
        // since they're giving us invalid headers.
-       return false, fmt.Errorf(
+       return true, fmt.Errorf(
            "Header %d - %s doesn't fit, dropping 100 headers.",
            resphdr.BlockSha().String(), tip)
    }
@@ -436,7 +436,7 @@ func (s *SPVCon) PushTx(tx *wire.MsgTx) error {
    if err != nil {
        return err
    }
-   _, err = s.TS.AckTx(tx) // our own tx so don't need to track relevance
+   _, err = s.TS.Ingest(tx) // our own tx so don't need to track relevance
    if err != nil {
        return err
    }

@@ -39,8 +39,8 @@ func (s *SPVCon) incomingMessageHandler() {
            }
        case *wire.MsgHeaders:
            go s.HeaderHandler(m)
-       case *wire.MsgTx:
-           go s.TxHandler(m)
+       case *wire.MsgTx: // can't be concurrent! out of order kills
+           s.TxHandler(m)
        case *wire.MsgReject:
            log.Printf("Rejected! cmd: %s code: %s tx: %s reason: %s",
                m.Cmd, m.Code.String(), m.Hash.String(), m.Reason)
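
The fix above is the simplest way to serialize ingest: call TxHandler inline so the message reader blocks until each tx is fully processed. For comparison, arrival order can also be preserved without stalling the read loop by funneling txs through a single worker goroutine; this is only a sketch of that pattern, not what this commit does:

package main

import "fmt"

type txMsg struct{ id int }

func handleTx(m txMsg) { fmt.Println("ingested tx", m.id) }

func main() {
    // One worker drains the channel, so txs are handled strictly in
    // arrival order; the reader only blocks when the buffer is full.
    txCh := make(chan txMsg, 100)
    done := make(chan struct{})
    go func() {
        for m := range txCh {
            handleTx(m)
        }
        close(done)
    }()
    for i := 0; i < 5; i++ {
        txCh <- txMsg{id: i} // what the message reader would do
    }
    close(txCh)
    <-done
}
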
@@ -137,7 +137,7 @@ func (s *SPVCon) HeaderHandler(m *wire.MsgHeaders) {
}

func (s *SPVCon) TxHandler(m *wire.MsgTx) {
-   hits, err := s.TS.AckTx(m)
+   hits, err := s.TS.Ingest(m)
    if err != nil {
        log.Printf("Incoming Tx error: %s\n", err.Error())
    }
@@ -146,8 +146,8 @@ func (s *SPVCon) TxHandler(m *wire.MsgTx) {
            m.TxSha().String())
        s.fPositives <- 1 // add one false positive to chan
    } else {
-       log.Printf("tx %s ingested and matches utxo/adrs.",
-           m.TxSha().String())
+       log.Printf("tx %s ingested and matches utxo/adrs. sum %d",
+           m.TxSha().String(), s.TS.Sum)
    }
}

@@ -98,8 +98,14 @@ func (t *TxStore) GimmeFilter() (*bloom.Filter, error) {
    for _, a := range t.Adrs {
        f.Add(a.PkhAdr.ScriptAddress())
    }
-   // add txids of utxos to look for outgoing
-   for _, u := range t.Utxos {
+   // get all utxos to add outpoints to filter
+   allUtxos, err := t.GetAllUtxos()
+   if err != nil {
+       return nil, err
+   }
+   for _, u := range allUtxos {
        f.AddOutPoint(&u.Op)
    }
@@ -107,7 +113,7 @@ func (t *TxStore) GimmeFilter() (*bloom.Filter, error) {
}

// Ingest a tx into wallet, dealing with both gains and losses
-func (t *TxStore) AckTx(tx *wire.MsgTx) (uint32, error) {
+func (t *TxStore) AckTxz(tx *wire.MsgTx) (uint32, error) {
    var ioHits uint32 // number of utxos changed due to this tx
    inTxid := tx.TxSha()

@@ -5,6 +5,8 @@ import (
    "encoding/binary"
    "fmt"

+   "github.com/btcsuite/btcd/txscript"
    "github.com/btcsuite/btcd/wire"
    "github.com/btcsuite/btcutil"
    "github.com/btcsuite/btcutil/hdkeychain"
@@ -143,6 +145,39 @@ func (ts *TxStore) NumUtxos() (uint32, error) {
    return n, nil
}

+func (ts *TxStore) GetAllUtxos() ([]*Utxo, error) {
+    var utxos []*Utxo
+    err := ts.StateDB.View(func(btx *bolt.Tx) error {
+        duf := btx.Bucket(BKTUtxos)
+        if duf == nil {
+            return fmt.Errorf("no duffel bag")
+        }
+        return duf.ForEach(func(k, v []byte) error {
+            // have to copy k and v here: bolt's slices are only valid for
+            // the life of the transaction, so copy before keeping them.
+            // create a new utxo
+            x := make([]byte, len(k)+len(v))
+            copy(x, k)
+            copy(x[len(k):], v)
+            newU, err := UtxoFromBytes(x)
+            if err != nil {
+                return err
+            }
+            // and add it to ram
+            utxos = append(utxos, &newU)
+            return nil
+        })
+        return nil
+    })
+    if err != nil {
+        return nil, err
+    }
+    return utxos, nil
+}
+
// PopulateAdrs just puts a bunch of adrs in ram; it doesn't touch the DB
func (ts *TxStore) PopulateAdrs(lastKey uint32) error {
    for k := uint32(0); k < lastKey; k++ {
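
The copy inside ForEach above matters: Bolt's key and value slices point into the memory-mapped database file and are only valid for the life of the transaction, so anything kept after View returns has to be copied first. A standalone illustration of the pattern (temporary file, made-up bucket and key names):

package main

import (
    "fmt"
    "log"
    "os"
    "path/filepath"

    "github.com/boltdb/bolt"
)

func main() {
    path := filepath.Join(os.TempDir(), "copydemo.db")
    defer os.Remove(path)

    db, err := bolt.Open(path, 0600, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    bucket := []byte("utxos") // hypothetical bucket name
    err = db.Update(func(tx *bolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists(bucket)
        if err != nil {
            return err
        }
        return b.Put([]byte("op1"), []byte("value1"))
    })
    if err != nil {
        log.Fatal(err)
    }

    var keep [][]byte
    err = db.View(func(tx *bolt.Tx) error {
        return tx.Bucket(bucket).ForEach(func(k, v []byte) error {
            // k and v are only valid inside this transaction;
            // copy before retaining them.
            c := make([]byte, len(v))
            copy(c, v)
            keep = append(keep, c)
            return nil
        })
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%s\n", keep[0]) // value1
}
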
@@ -162,6 +197,105 @@ func (ts *TxStore) PopulateAdrs(lastKey uint32) error {
    return nil
}

+// Ingest puts a tx into the DB atomically. This can result in a
+// gain, a loss, or no result. Gain or loss in satoshis is returned.
+func (ts *TxStore) Ingest(tx *wire.MsgTx) (uint32, error) {
+    var hits uint32
+    var err error
+    var spentOPs [][]byte
+    var nUtxoBytes [][]byte
+
+    // check that we have a height and tx has been SPV OK'd
+    inTxid := tx.TxSha()
+    height, ok := ts.OKTxids[inTxid]
+    if !ok {
+        return hits, fmt.Errorf("Ingest error: tx %s not in OKTxids.",
+            inTxid.String())
+    }
+
+    // before entering into db, serialize all inputs of the ingested tx
+    for _, txin := range tx.TxIn {
+        nOP, err := outPointToBytes(&txin.PreviousOutPoint)
+        if err != nil {
+            return hits, err
+        }
+        spentOPs = append(spentOPs, nOP)
+    }
+    // also generate PKscripts for all addresses (maybe keep storing these?)
+    for _, adr := range ts.Adrs {
+        // iterate through all our addresses
+        aPKscript, err := txscript.PayToAddrScript(adr.PkhAdr)
+        if err != nil {
+            return hits, err
+        }
+        // iterate through all outputs of this tx
+        for i, out := range tx.TxOut {
+            if bytes.Equal(out.PkScript, aPKscript) { // new utxo for us
+                var newu Utxo
+                newu.AtHeight = height
+                newu.KeyIdx = adr.KeyIdx
+                newu.Value = out.Value
+                var newop wire.OutPoint
+                newop.Hash = tx.TxSha()
+                newop.Index = uint32(i)
+                newu.Op = newop
+                b, err := newu.ToBytes()
+                if err != nil {
+                    return hits, err
+                }
+                nUtxoBytes = append(nUtxoBytes, b)
+                ts.Sum += newu.Value
+                hits++
+            }
+            break // only one match
+        }
+    }
+
+    err = ts.StateDB.Update(func(btx *bolt.Tx) error {
+        // get all 4 buckets
+        duf := btx.Bucket(BKTUtxos)
+        // sta := btx.Bucket(BKTState)
+        // old := btx.Bucket(BKTStxos)
+        // txns := btx.Bucket(BKTTxns)
+
+        // first see if we lose utxos
+        // iterate through duffel bag and look for matches
+        // this makes us lose money, which is regrettable, but we need to know.
+        for _, nOP := range spentOPs {
+            duf.ForEach(func(k, v []byte) error {
+                if bytes.Equal(k, nOP) { // matched, we lost utxo
+                    // do all this just to figure out value we lost
+                    x := make([]byte, len(k)+len(v))
+                    copy(x, k)
+                    copy(x[len(k):], v)
+                    lostTxo, err := UtxoFromBytes(x)
+                    if err != nil {
+                        return err
+                    }
+                    ts.Sum -= lostTxo.Value
+                    hits++
+                    // then delete the utxo from duf, save to old
+                    err = duf.Delete(k)
+                    if err != nil {
+                        return err
+                    }
+                    return nil // matched utxo k, won't match another
+                }
+                return nil // no match
+            })
+        } // done losing utxos
+
+        // next add all new utxos to db, this is quick as the work is above
+        for _, ub := range nUtxoBytes {
+            err = duf.Put(ub[:36], ub[36:])
+            if err != nil {
+                return err
+            }
+        }
+        return nil
+    })
+    return hits, err
+}
+
// SaveToDB write a utxo to disk, overwriting an old utxo of the same outpoint
func (ts *TxStore) SaveUtxo(u *Utxo) error {
    b, err := u.ToBytes()
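
One detail in Ingest worth spelling out is duf.Put(ub[:36], ub[36:]): the serialized utxo evidently begins with a 36-byte outpoint (32-byte txid plus 4-byte output index), which becomes the bucket key, with the remainder stored as the value. outPointToBytes itself isn't shown in this diff, so the following sketch only illustrates the 32+4 layout; the exact field order and endianness are assumptions:

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// outPointKey serializes a (txid, index) pair into an assumed 36-byte form:
// 32-byte txid followed by a 4-byte big-endian output index.
func outPointKey(txid [32]byte, index uint32) ([]byte, error) {
    var buf bytes.Buffer
    if _, err := buf.Write(txid[:]); err != nil {
        return nil, err
    }
    if err := binary.Write(&buf, binary.BigEndian, index); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

func main() {
    var txid [32]byte // zeroed txid, purely for the demo
    k, err := outPointKey(txid, 1)
    if err != nil {
        panic(err)
    }
    fmt.Println(len(k), "byte key") // 36 byte key
}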