From 1f0656559e7aee853c279816ba1736fb94900778 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 7 Dec 2018 09:02:11 +0100 Subject: [PATCH] sweep: add sweeper store This commit adds a store for the sweeper. The sweeper needs minimal persistent data to be able to recognize its own sweeps. --- sweep/store.go | 247 ++++++++++++++++++++++++++++++++++++++++++++ sweep/store_mock.go | 45 ++++++++ sweep/store_test.go | 153 +++++++++++++++++++++++++++ 3 files changed, 445 insertions(+) create mode 100644 sweep/store.go create mode 100644 sweep/store_mock.go create mode 100644 sweep/store_test.go diff --git a/sweep/store.go b/sweep/store.go new file mode 100644 index 00000000..ef6ba99c --- /dev/null +++ b/sweep/store.go @@ -0,0 +1,247 @@ +package sweep + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb" +) + +var ( + // lastTxBucketKey is the key that points to a bucket containing a + // single item storing the last published tx. + // + // maps: lastTxKey -> serialized_tx + lastTxBucketKey = []byte("sweeper-last-tx") + + // lastTxKey is the fixed key under which the serialized tx is stored. + lastTxKey = []byte("last-tx") + + // txHashesBucketKey is the key that points to a bucket containing the + // hashes of all sweep txes that were published successfully. + // + // maps: txHash -> empty slice + txHashesBucketKey = []byte("sweeper-tx-hashes") + + // utxnChainPrefix is the bucket prefix for nursery buckets. + utxnChainPrefix = []byte("utxn") + + // utxnHeightIndexKey is the sub bucket where the nursery stores the + // height index. + utxnHeightIndexKey = []byte("height-index") + + // utxnFinalizedKndrTxnKey is a static key that can be used to locate + // the nursery finalized kindergarten sweep txn. + utxnFinalizedKndrTxnKey = []byte("finalized-kndr-txn") + + byteOrder = binary.BigEndian +) + +// SweeperStore stores published txes. +type SweeperStore interface { + // IsOurTx determines whether a tx is published by us, based on its + // hash. + IsOurTx(hash chainhash.Hash) (bool, error) + + // NotifyPublishTx signals that we are about to publish a tx. + NotifyPublishTx(*wire.MsgTx) error + + // GetLastPublishedTx returns the last tx that we called NotifyPublishTx + // for. + GetLastPublishedTx() (*wire.MsgTx, error) +} + +type sweeperStore struct { + db *channeldb.DB +} + +// NewSweeperStore returns a new store instance. +func NewSweeperStore(db *channeldb.DB, chainHash *chainhash.Hash) ( + SweeperStore, error) { + + err := db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists( + lastTxBucketKey, + ) + if err != nil { + return err + } + + if tx.Bucket(txHashesBucketKey) != nil { + return nil + } + + txHashesBucket, err := tx.CreateBucket(txHashesBucketKey) + if err != nil { + return err + } + + // Use non-existence of tx hashes bucket as a signal to migrate + // nursery finalized txes. + err = migrateTxHashes(tx, txHashesBucket, chainHash) + + return err + }) + if err != nil { + return nil, err + } + + return &sweeperStore{ + db: db, + }, nil +} + +// migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This +// is not implemented as a database migration, to keep the downgrade path open. +func migrateTxHashes(tx *bbolt.Tx, txHashesBucket *bbolt.Bucket, + chainHash *chainhash.Hash) error { + + log.Infof("Migrating UTXO nursery finalized TXIDs") + + // Compose chain bucket key. + var b bytes.Buffer + if _, err := b.Write(utxnChainPrefix); err != nil { + return err + } + + if _, err := b.Write(chainHash[:]); err != nil { + return err + } + + // Get chain bucket if exists. + chainBucket := tx.Bucket(b.Bytes()) + if chainBucket == nil { + return nil + } + + // Retrieve the existing height index. + hghtIndex := chainBucket.Bucket(utxnHeightIndexKey) + if hghtIndex == nil { + return nil + } + + // Retrieve all heights. + err := hghtIndex.ForEach(func(k, v []byte) error { + heightBucket := hghtIndex.Bucket(k) + if heightBucket == nil { + return nil + } + + // Get finalized tx for height. + txBytes := heightBucket.Get(utxnFinalizedKndrTxnKey) + if txBytes == nil { + return nil + } + + // Deserialize and skip tx if it cannot be deserialized. + tx := &wire.MsgTx{} + err := tx.Deserialize(bytes.NewReader(txBytes)) + if err != nil { + log.Warnf("Cannot deserialize utxn tx") + return nil + } + + // Calculate hash. + hash := tx.TxHash() + + // Insert utxn tx hash in hashes bucket. + log.Debugf("Inserting nursery tx %v in hash list "+ + "(height=%v)", hash, byteOrder.Uint32(k)) + + return txHashesBucket.Put(hash[:], []byte{}) + }) + if err != nil { + return err + } + + return nil +} + +// NotifyPublishTx signals that we are about to publish a tx. +func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error { + return s.db.Update(func(tx *bbolt.Tx) error { + lastTxBucket := tx.Bucket(lastTxBucketKey) + if lastTxBucket == nil { + return errors.New("last tx bucket does not exist") + } + + txHashesBucket := tx.Bucket(txHashesBucketKey) + if txHashesBucket == nil { + return errors.New("tx hashes bucket does not exist") + } + + var b bytes.Buffer + if err := sweepTx.Serialize(&b); err != nil { + return err + } + + if err := lastTxBucket.Put(lastTxKey, b.Bytes()); err != nil { + return err + } + + hash := sweepTx.TxHash() + + return txHashesBucket.Put(hash[:], []byte{}) + }) +} + +// GetLastPublishedTx returns the last tx that we called NotifyPublishTx +// for. +func (s *sweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) { + var sweepTx *wire.MsgTx + + err := s.db.View(func(tx *bbolt.Tx) error { + lastTxBucket := tx.Bucket(lastTxBucketKey) + if lastTxBucket == nil { + return errors.New("last tx bucket does not exist") + } + + sweepTxRaw := lastTxBucket.Get(lastTxKey) + if sweepTxRaw == nil { + return nil + } + + sweepTx = &wire.MsgTx{} + txReader := bytes.NewReader(sweepTxRaw) + if err := sweepTx.Deserialize(txReader); err != nil { + return fmt.Errorf("tx deserialize: %v", err) + } + + return nil + }) + if err != nil { + return nil, err + } + + return sweepTx, nil +} + +// IsOurTx determines whether a tx is published by us, based on its +// hash. +func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { + var ours bool + + err := s.db.View(func(tx *bbolt.Tx) error { + txHashesBucket := tx.Bucket(txHashesBucketKey) + if txHashesBucket == nil { + return errors.New("tx hashes bucket does not exist") + } + + ours = txHashesBucket.Get(hash[:]) != nil + + return nil + }) + if err != nil { + return false, err + } + + return ours, nil +} + +// Compile-time constraint to ensure sweeperStore implements SweeperStore. +var _ SweeperStore = (*sweeperStore)(nil) diff --git a/sweep/store_mock.go b/sweep/store_mock.go new file mode 100644 index 00000000..ba8b2f2b --- /dev/null +++ b/sweep/store_mock.go @@ -0,0 +1,45 @@ +package sweep + +import ( + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" +) + +// MockSweeperStore is a mock implementation of sweeper store. This type is +// exported, because it is currently used in nursery tests too. +type MockSweeperStore struct { + lastTx *wire.MsgTx + ourTxes map[chainhash.Hash]struct{} +} + +// NewMockSweeperStore returns a new instance. +func NewMockSweeperStore() *MockSweeperStore { + return &MockSweeperStore{ + ourTxes: make(map[chainhash.Hash]struct{}), + } +} + +// IsOurTx determines whether a tx is published by us, based on its +// hash. +func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { + _, ok := s.ourTxes[hash] + return ok, nil +} + +// NotifyPublishTx signals that we are about to publish a tx. +func (s *MockSweeperStore) NotifyPublishTx(tx *wire.MsgTx) error { + txHash := tx.TxHash() + s.ourTxes[txHash] = struct{}{} + s.lastTx = tx + + return nil +} + +// GetLastPublishedTx returns the last tx that we called NotifyPublishTx +// for. +func (s *MockSweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) { + return s.lastTx, nil +} + +// Compile-time constraint to ensure MockSweeperStore implements SweeperStore. +var _ SweeperStore = (*MockSweeperStore)(nil) diff --git a/sweep/store_test.go b/sweep/store_test.go new file mode 100644 index 00000000..23714c78 --- /dev/null +++ b/sweep/store_test.go @@ -0,0 +1,153 @@ +package sweep + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" +) + +// makeTestDB creates a new instance of the ChannelDB for testing purposes. A +// callback which cleans up the created temporary directories is also returned +// and intended to be executed after the test completes. +func makeTestDB() (*channeldb.DB, func(), error) { + // First, create a temporary directory to be used for the duration of + // this test. + tempDirName, err := ioutil.TempDir("", "channeldb") + if err != nil { + return nil, nil, err + } + + // Next, create channeldb for the first time. + cdb, err := channeldb.Open(tempDirName) + if err != nil { + return nil, nil, err + } + + cleanUp := func() { + cdb.Close() + os.RemoveAll(tempDirName) + } + + return cdb, cleanUp, nil +} + +// TestStore asserts that the store persists the presented data to disk and is +// able to retrieve it again. +func TestStore(t *testing.T) { + t.Run("bolt", func(t *testing.T) { + + // Create new store. + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to open channel db: %v", err) + } + defer cleanUp() + + if err != nil { + t.Fatal(err) + } + + testStore(t, func() (SweeperStore, error) { + var chain chainhash.Hash + return NewSweeperStore(cdb, &chain) + }) + }) + t.Run("mock", func(t *testing.T) { + store := NewMockSweeperStore() + + testStore(t, func() (SweeperStore, error) { + // Return same store, because the mock has no real + // persistence. + return store, nil + }) + }) +} + +func testStore(t *testing.T, createStore func() (SweeperStore, error)) { + store, err := createStore() + if err != nil { + t.Fatal(err) + } + + // Initially we expect the store not to have a last published tx. + retrievedTx, err := store.GetLastPublishedTx() + if err != nil { + t.Fatal(err) + } + if retrievedTx != nil { + t.Fatal("expected no last published tx") + } + + // Notify publication of tx1 + tx1 := wire.MsgTx{} + tx1.AddTxIn(&wire.TxIn{ + PreviousOutPoint: wire.OutPoint{ + Index: 1, + }, + }) + + err = store.NotifyPublishTx(&tx1) + if err != nil { + t.Fatal(err) + } + + // Notify publication of tx2 + tx2 := wire.MsgTx{} + tx2.AddTxIn(&wire.TxIn{ + PreviousOutPoint: wire.OutPoint{ + Index: 2, + }, + }) + + err = store.NotifyPublishTx(&tx2) + if err != nil { + t.Fatal(err) + } + + // Recreate the sweeper store + store, err = createStore() + if err != nil { + t.Fatal(err) + } + + // Assert that last published tx2 is present. + retrievedTx, err = store.GetLastPublishedTx() + if err != nil { + t.Fatal(err) + } + + if tx2.TxHash() != retrievedTx.TxHash() { + t.Fatal("txes do not match") + } + + // Assert that both txes are recognized as our own. + ours, err := store.IsOurTx(tx1.TxHash()) + if err != nil { + t.Fatal(err) + } + if !ours { + t.Fatal("expected tx to be ours") + } + + ours, err = store.IsOurTx(tx2.TxHash()) + if err != nil { + t.Fatal(err) + } + if !ours { + t.Fatal("expected tx to be ours") + } + + // An different hash should be reported on as not being ours. + var unknownHash chainhash.Hash + ours, err = store.IsOurTx(unknownHash) + if err != nil { + t.Fatal(err) + } + if ours { + t.Fatal("expected tx to be not ours") + } +}