sweep: add sweeper store

This commit adds a store for the sweeper. The sweeper needs minimal
persistent data to be able to recognize its own sweeps.
This commit is contained in:
Joost Jager 2018-12-07 09:02:11 +01:00
parent a2dcca2b08
commit 1f0656559e
No known key found for this signature in database
GPG Key ID: AE6B0D042C8E38D9
3 changed files with 445 additions and 0 deletions

247
sweep/store.go Normal file

@ -0,0 +1,247 @@
package sweep
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
)
var (
// lastTxBucketKey is the key that points to a bucket containing a
// single item storing the last published tx.
//
// maps: lastTxKey -> serialized_tx
lastTxBucketKey = []byte("sweeper-last-tx")
// lastTxKey is the fixed key under which the serialized tx is stored.
lastTxKey = []byte("last-tx")
// txHashesBucketKey is the key that points to a bucket containing the
// hashes of all sweep txes that were published successfully.
//
// maps: txHash -> empty slice
txHashesBucketKey = []byte("sweeper-tx-hashes")
// utxnChainPrefix is the bucket prefix for nursery buckets.
utxnChainPrefix = []byte("utxn")
// utxnHeightIndexKey is the sub bucket where the nursery stores the
// height index.
utxnHeightIndexKey = []byte("height-index")
// utxnFinalizedKndrTxnKey is a static key that can be used to locate
// the nursery finalized kindergarten sweep txn.
utxnFinalizedKndrTxnKey = []byte("finalized-kndr-txn")
byteOrder = binary.BigEndian
)
// SweeperStore stores published txes.
type SweeperStore interface {
// IsOurTx determines whether a tx is published by us, based on its
// hash.
IsOurTx(hash chainhash.Hash) (bool, error)
// NotifyPublishTx signals that we are about to publish a tx.
NotifyPublishTx(*wire.MsgTx) error
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
GetLastPublishedTx() (*wire.MsgTx, error)
}
type sweeperStore struct {
db *channeldb.DB
}
// NewSweeperStore returns a new store instance.
func NewSweeperStore(db *channeldb.DB, chainHash *chainhash.Hash) (
SweeperStore, error) {
err := db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(
lastTxBucketKey,
)
if err != nil {
return err
}
if tx.Bucket(txHashesBucketKey) != nil {
return nil
}
txHashesBucket, err := tx.CreateBucket(txHashesBucketKey)
if err != nil {
return err
}
// Use non-existence of tx hashes bucket as a signal to migrate
// nursery finalized txes.
err = migrateTxHashes(tx, txHashesBucket, chainHash)
return err
})
if err != nil {
return nil, err
}
return &sweeperStore{
db: db,
}, nil
}
// migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This
// is not implemented as a database migration, to keep the downgrade path open.
func migrateTxHashes(tx *bbolt.Tx, txHashesBucket *bbolt.Bucket,
chainHash *chainhash.Hash) error {
log.Infof("Migrating UTXO nursery finalized TXIDs")
// Compose chain bucket key.
var b bytes.Buffer
if _, err := b.Write(utxnChainPrefix); err != nil {
return err
}
if _, err := b.Write(chainHash[:]); err != nil {
return err
}
// Get chain bucket if exists.
chainBucket := tx.Bucket(b.Bytes())
if chainBucket == nil {
return nil
}
// Retrieve the existing height index.
hghtIndex := chainBucket.Bucket(utxnHeightIndexKey)
if hghtIndex == nil {
return nil
}
// Retrieve all heights.
err := hghtIndex.ForEach(func(k, v []byte) error {
heightBucket := hghtIndex.Bucket(k)
if heightBucket == nil {
return nil
}
// Get finalized tx for height.
txBytes := heightBucket.Get(utxnFinalizedKndrTxnKey)
if txBytes == nil {
return nil
}
// Deserialize and skip tx if it cannot be deserialized.
tx := &wire.MsgTx{}
err := tx.Deserialize(bytes.NewReader(txBytes))
if err != nil {
log.Warnf("Cannot deserialize utxn tx")
return nil
}
// Calculate hash.
hash := tx.TxHash()
// Insert utxn tx hash in hashes bucket.
log.Debugf("Inserting nursery tx %v in hash list "+
"(height=%v)", hash, byteOrder.Uint32(k))
return txHashesBucket.Put(hash[:], []byte{})
})
if err != nil {
return err
}
return nil
}
// NotifyPublishTx signals that we are about to publish a tx.
func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error {
return s.db.Update(func(tx *bbolt.Tx) error {
lastTxBucket := tx.Bucket(lastTxBucketKey)
if lastTxBucket == nil {
return errors.New("last tx bucket does not exist")
}
txHashesBucket := tx.Bucket(txHashesBucketKey)
if txHashesBucket == nil {
return errors.New("tx hashes bucket does not exist")
}
var b bytes.Buffer
if err := sweepTx.Serialize(&b); err != nil {
return err
}
if err := lastTxBucket.Put(lastTxKey, b.Bytes()); err != nil {
return err
}
hash := sweepTx.TxHash()
return txHashesBucket.Put(hash[:], []byte{})
})
}
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
func (s *sweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
var sweepTx *wire.MsgTx
err := s.db.View(func(tx *bbolt.Tx) error {
lastTxBucket := tx.Bucket(lastTxBucketKey)
if lastTxBucket == nil {
return errors.New("last tx bucket does not exist")
}
sweepTxRaw := lastTxBucket.Get(lastTxKey)
if sweepTxRaw == nil {
return nil
}
sweepTx = &wire.MsgTx{}
txReader := bytes.NewReader(sweepTxRaw)
if err := sweepTx.Deserialize(txReader); err != nil {
return fmt.Errorf("tx deserialize: %v", err)
}
return nil
})
if err != nil {
return nil, err
}
return sweepTx, nil
}
// IsOurTx determines whether a tx is published by us, based on its
// hash.
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
var ours bool
err := s.db.View(func(tx *bbolt.Tx) error {
txHashesBucket := tx.Bucket(txHashesBucketKey)
if txHashesBucket == nil {
return errors.New("tx hashes bucket does not exist")
}
ours = txHashesBucket.Get(hash[:]) != nil
return nil
})
if err != nil {
return false, err
}
return ours, nil
}
// Compile-time constraint to ensure sweeperStore implements SweeperStore.
var _ SweeperStore = (*sweeperStore)(nil)

45
sweep/store_mock.go Normal file

@ -0,0 +1,45 @@
package sweep
import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
)
// MockSweeperStore is a mock implementation of sweeper store. This type is
// exported, because it is currently used in nursery tests too.
type MockSweeperStore struct {
lastTx *wire.MsgTx
ourTxes map[chainhash.Hash]struct{}
}
// NewMockSweeperStore returns a new instance.
func NewMockSweeperStore() *MockSweeperStore {
return &MockSweeperStore{
ourTxes: make(map[chainhash.Hash]struct{}),
}
}
// IsOurTx determines whether a tx is published by us, based on its
// hash.
func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
_, ok := s.ourTxes[hash]
return ok, nil
}
// NotifyPublishTx signals that we are about to publish a tx.
func (s *MockSweeperStore) NotifyPublishTx(tx *wire.MsgTx) error {
txHash := tx.TxHash()
s.ourTxes[txHash] = struct{}{}
s.lastTx = tx
return nil
}
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
func (s *MockSweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
return s.lastTx, nil
}
// Compile-time constraint to ensure MockSweeperStore implements SweeperStore.
var _ SweeperStore = (*MockSweeperStore)(nil)

153
sweep/store_test.go Normal file

@ -0,0 +1,153 @@
package sweep
import (
"io/ioutil"
"os"
"testing"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
)
// makeTestDB creates a new instance of the ChannelDB for testing purposes. A
// callback which cleans up the created temporary directories is also returned
// and intended to be executed after the test completes.
func makeTestDB() (*channeldb.DB, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
return nil, nil, err
}
// Next, create channeldb for the first time.
cdb, err := channeldb.Open(tempDirName)
if err != nil {
return nil, nil, err
}
cleanUp := func() {
cdb.Close()
os.RemoveAll(tempDirName)
}
return cdb, cleanUp, nil
}
// TestStore asserts that the store persists the presented data to disk and is
// able to retrieve it again.
func TestStore(t *testing.T) {
t.Run("bolt", func(t *testing.T) {
// Create new store.
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to open channel db: %v", err)
}
defer cleanUp()
if err != nil {
t.Fatal(err)
}
testStore(t, func() (SweeperStore, error) {
var chain chainhash.Hash
return NewSweeperStore(cdb, &chain)
})
})
t.Run("mock", func(t *testing.T) {
store := NewMockSweeperStore()
testStore(t, func() (SweeperStore, error) {
// Return same store, because the mock has no real
// persistence.
return store, nil
})
})
}
func testStore(t *testing.T, createStore func() (SweeperStore, error)) {
store, err := createStore()
if err != nil {
t.Fatal(err)
}
// Initially we expect the store not to have a last published tx.
retrievedTx, err := store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
if retrievedTx != nil {
t.Fatal("expected no last published tx")
}
// Notify publication of tx1
tx1 := wire.MsgTx{}
tx1.AddTxIn(&wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Index: 1,
},
})
err = store.NotifyPublishTx(&tx1)
if err != nil {
t.Fatal(err)
}
// Notify publication of tx2
tx2 := wire.MsgTx{}
tx2.AddTxIn(&wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Index: 2,
},
})
err = store.NotifyPublishTx(&tx2)
if err != nil {
t.Fatal(err)
}
// Recreate the sweeper store
store, err = createStore()
if err != nil {
t.Fatal(err)
}
// Assert that last published tx2 is present.
retrievedTx, err = store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
if tx2.TxHash() != retrievedTx.TxHash() {
t.Fatal("txes do not match")
}
// Assert that both txes are recognized as our own.
ours, err := store.IsOurTx(tx1.TxHash())
if err != nil {
t.Fatal(err)
}
if !ours {
t.Fatal("expected tx to be ours")
}
ours, err = store.IsOurTx(tx2.TxHash())
if err != nil {
t.Fatal(err)
}
if !ours {
t.Fatal("expected tx to be ours")
}
// An different hash should be reported on as not being ours.
var unknownHash chainhash.Hash
ours, err = store.IsOurTx(unknownHash)
if err != nil {
t.Fatal(err)
}
if ours {
t.Fatal("expected tx to be not ours")
}
}