diff --git a/channeldb/channel.go b/channeldb/channel.go index a284509a..5a6245b8 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -3,6 +3,7 @@ package channeldb import ( "bytes" "encoding/binary" + "errors" "fmt" "io" "net" @@ -1832,6 +1833,10 @@ type ChannelCloseSummary struct { // LocalChanCfg is the channel configuration for the local node. LocalChanConfig ChannelConfig + + // LastChanSyncMsg is the ChannelReestablish message for this channel + // for the state at the point where it was closed. + LastChanSyncMsg *lnwire.ChannelReestablish } // CloseChannel closes a previously active Lightning channel. Closing a channel @@ -2059,7 +2064,12 @@ func serializeChannelCloseSummary(w io.Writer, cs *ChannelCloseSummary) error { // If this is a close channel summary created before the addition of // the new fields, then we can exit here. if cs.RemoteCurrentRevocation == nil { - return nil + return WriteElements(w, false) + } + + // If fields are present, write boolean to indicate this, and continue. + if err := WriteElements(w, true); err != nil { + return err } if err := WriteElements(w, cs.RemoteCurrentRevocation); err != nil { @@ -2070,14 +2080,34 @@ func serializeChannelCloseSummary(w io.Writer, cs *ChannelCloseSummary) error { return err } - // We'll write this field last, as it's possible for a channel to be - // closed before we learn of the next unrevoked revocation point for - // the remote party. - if cs.RemoteNextRevocation == nil { - return nil + // The RemoteNextRevocation field is optional, as it's possible for a + // channel to be closed before we learn of the next unrevoked + // revocation point for the remote party. Write a boolen indicating + // whether this field is present or not. + if err := WriteElements(w, cs.RemoteNextRevocation != nil); err != nil { + return err } - return WriteElements(w, cs.RemoteNextRevocation) + // Write the field, if present. + if cs.RemoteNextRevocation != nil { + if err = WriteElements(w, cs.RemoteNextRevocation); err != nil { + return err + } + } + + // Write whether the channel sync message is present. + if err := WriteElements(w, cs.LastChanSyncMsg != nil); err != nil { + return err + } + + // Write the channel sync message, if present. + if cs.LastChanSyncMsg != nil { + if err := WriteElements(w, cs.LastChanSyncMsg); err != nil { + return err + } + } + + return nil } func fetchChannelCloseSummary(tx *bolt.Tx, @@ -2111,15 +2141,19 @@ func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) { // We'll now check to see if the channel close summary was encoded with // any of the additional optional fields. - err = ReadElements(r, &c.RemoteCurrentRevocation) - switch { - case err == io.EOF: - return c, nil + var hasNewFields bool + err = ReadElements(r, &hasNewFields) + if err != nil { + return nil, err + } - // If we got a non-eof error, then we know there's an actually issue. - // Otherwise, it may have been the case that this summary didn't have - // the set of optional fields. - case err != nil: + // If fields are not present, we can return. + if !hasNewFields { + return c, nil + } + + // Otherwise read the new fields. + if err := ReadElements(r, &c.RemoteCurrentRevocation); err != nil { return nil, err } @@ -2129,17 +2163,48 @@ func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) { // Finally, we'll attempt to read the next unrevoked commitment point // for the remote party. If we closed the channel before receiving a - // funding locked message, then this can be nil. As a result, we'll use - // the same technique to read the field, only if there's still data - // left in the buffer. - err = ReadElements(r, &c.RemoteNextRevocation) - if err != nil && err != io.EOF { - // If we got a non-eof error, then we know there's an actually - // issue. Otherwise, it may have been the case that this - // summary didn't have the set of optional fields. + // funding locked message then this might not be present. A boolean + // indicating whether the field is present will come first. + var hasRemoteNextRevocation bool + err = ReadElements(r, &hasRemoteNextRevocation) + if err != nil { return nil, err } + // If this field was written, read it. + if hasRemoteNextRevocation { + err = ReadElements(r, &c.RemoteNextRevocation) + if err != nil { + return nil, err + } + } + + // Check if we have a channel sync message to read. + var hasChanSyncMsg bool + err = ReadElements(r, &hasChanSyncMsg) + if err == io.EOF { + return c, nil + } else if err != nil { + return nil, err + } + + // If a chan sync message is present, read it. + if hasChanSyncMsg { + // We must pass in reference to a lnwire.Message for the codec + // to support it. + var msg lnwire.Message + if err := ReadElements(r, &msg); err != nil { + return nil, err + } + + chanSync, ok := msg.(*lnwire.ChannelReestablish) + if !ok { + return nil, errors.New("unable cast db Message to " + + "ChannelReestablish") + } + c.LastChanSyncMsg = chanSync + } + return c, nil } diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 4252b721..97ab7d5b 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -248,10 +248,10 @@ func TestOpenChannelPutGetDelete(t *testing.T) { t.Parallel() cdb, cleanUp, err := makeTestDB() - defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } + defer cleanUp() // Create the test channel state, then add an additional fake HTLC // before syncing to disk. @@ -368,10 +368,10 @@ func TestChannelStateTransition(t *testing.T) { t.Parallel() cdb, cleanUp, err := makeTestDB() - defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } + defer cleanUp() // First create a minimal channel, then perform a full sync in order to // persist the data. diff --git a/channeldb/db.go b/channeldb/db.go index 2c8fc69c..e36dcf84 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/coreos/bbolt" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" ) const ( @@ -80,6 +81,13 @@ var ( number: 6, migration: migratePruneEdgeUpdateIndex, }, + { + // The DB version that migrates the ChannelCloseSummary + // to a format where optional fields are indicated with + // boolean flags. + number: 7, + migration: migrateOptionalChannelCloseSummaryFields, + }, } // Big endian is the preferred byte order, due to cursor scans over @@ -609,6 +617,54 @@ func (d *DB) FetchClosedChannel(chanID *wire.OutPoint) (*ChannelCloseSummary, er return chanSummary, nil } +// FetchClosedChannelForID queries for a channel close summary using the +// channel ID of the channel in question. +func (d *DB) FetchClosedChannelForID(cid lnwire.ChannelID) ( + *ChannelCloseSummary, error) { + + var chanSummary *ChannelCloseSummary + if err := d.View(func(tx *bolt.Tx) error { + closeBucket := tx.Bucket(closedChannelBucket) + if closeBucket == nil { + return ErrClosedChannelNotFound + } + + // The first 30 bytes of the channel ID and outpoint will be + // equal. + cursor := closeBucket.Cursor() + op, c := cursor.Seek(cid[:30]) + + // We scan over all possible candidates for this channel ID. + for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() { + var outPoint wire.OutPoint + err := readOutpoint(bytes.NewReader(op), &outPoint) + if err != nil { + return err + } + + // If the found outpoint does not correspond to this + // channel ID, we continue. + if !cid.IsChanPoint(&outPoint) { + continue + } + + // Deserialize the close summary and return. + r := bytes.NewReader(c) + chanSummary, err = deserializeCloseChannelSummary(r) + if err != nil { + return err + } + + return nil + } + return ErrClosedChannelNotFound + }); err != nil { + return nil, err + } + + return chanSummary, nil +} + // MarkChanFullyClosed marks a channel as fully closed within the database. A // channel should be marked as fully closed if the channel was initially // cooperatively closed and it's reached a single confirmation, or after all diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 872e6307..794b1fcf 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -5,6 +5,9 @@ import ( "os" "path/filepath" "testing" + + "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/lnwire" ) func TestOpenWithCreate(t *testing.T) { @@ -71,3 +74,76 @@ func TestWipe(t *testing.T) { ErrNoClosedChannels, err) } } + +// TestFetchClosedChannelForID tests that we are able to properly retrieve a +// ChannelCloseSummary from the DB given a ChannelID. +func TestFetchClosedChannelForID(t *testing.T) { + t.Parallel() + + const numChans = 101 + + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + defer cleanUp() + + // Create the test channel state, that we will mutate the index of the + // funding point. + state, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + + // Now run through the number of channels, and modify the outpoint index + // to create new channel IDs. + for i := uint32(0); i < numChans; i++ { + // Save the open channel to disk. + state.FundingOutpoint.Index = i + if err := state.FullSync(); err != nil { + t.Fatalf("unable to save and serialize channel "+ + "state: %v", err) + } + + // Close the channel. To make sure we retrieve the correct + // summary later, we make them differ in the SettledBalance. + closeSummary := &ChannelCloseSummary{ + ChanPoint: state.FundingOutpoint, + RemotePub: state.IdentityPub, + SettledBalance: btcutil.Amount(500 + i), + } + if err := state.CloseChannel(closeSummary); err != nil { + t.Fatalf("unable to close channel: %v", err) + } + } + + // Now run though them all again and make sure we are able to retrieve + // summaries from the DB. + for i := uint32(0); i < numChans; i++ { + state.FundingOutpoint.Index = i + + // We calculate the ChannelID and use it to fetch the summary. + cid := lnwire.NewChanIDFromOutPoint(&state.FundingOutpoint) + fetchedSummary, err := cdb.FetchClosedChannelForID(cid) + if err != nil { + t.Fatalf("unable to fetch close summary: %v", err) + } + + // Make sure we retrieved the correct one by checking the + // SettledBalance. + if fetchedSummary.SettledBalance != btcutil.Amount(500+i) { + t.Fatalf("summaries don't match: expected %v got %v", + btcutil.Amount(500+i), + fetchedSummary.SettledBalance) + } + } + + // As a final test we make sure that we get ErrClosedChannelNotFound + // for a ChannelID we didn't add to the DB. + state.FundingOutpoint.Index++ + cid := lnwire.NewChanIDFromOutPoint(&state.FundingOutpoint) + _, err = cdb.FetchClosedChannelForID(cid) + if err != ErrClosedChannelNotFound { + t.Fatalf("expected ErrClosedChannelNotFound, instead got: %v", err) + } +} diff --git a/channeldb/legacy_serialization.go b/channeldb/legacy_serialization.go new file mode 100644 index 00000000..2abb3f04 --- /dev/null +++ b/channeldb/legacy_serialization.go @@ -0,0 +1,53 @@ +package channeldb + +import "io" + +// deserializeCloseChannelSummaryV6 reads the v6 database format for +// ChannelCloseSummary. +// +// NOTE: deprecated, only for migration. +func deserializeCloseChannelSummaryV6(r io.Reader) (*ChannelCloseSummary, error) { + c := &ChannelCloseSummary{} + + err := ReadElements(r, + &c.ChanPoint, &c.ShortChanID, &c.ChainHash, &c.ClosingTXID, + &c.CloseHeight, &c.RemotePub, &c.Capacity, &c.SettledBalance, + &c.TimeLockedBalance, &c.CloseType, &c.IsPending, + ) + if err != nil { + return nil, err + } + + // We'll now check to see if the channel close summary was encoded with + // any of the additional optional fields. + err = ReadElements(r, &c.RemoteCurrentRevocation) + switch { + case err == io.EOF: + return c, nil + + // If we got a non-eof error, then we know there's an actually issue. + // Otherwise, it may have been the case that this summary didn't have + // the set of optional fields. + case err != nil: + return nil, err + } + + if err := readChanConfig(r, &c.LocalChanConfig); err != nil { + return nil, err + } + + // Finally, we'll attempt to read the next unrevoked commitment point + // for the remote party. If we closed the channel before receiving a + // funding locked message, then this can be nil. As a result, we'll use + // the same technique to read the field, only if there's still data + // left in the buffer. + err = ReadElements(r, &c.RemoteNextRevocation) + if err != nil && err != io.EOF { + // If we got a non-eof error, then we know there's an actually + // issue. Otherwise, it may have been the case that this + // summary didn't have the set of optional fields. + return nil, err + } + + return c, nil +} diff --git a/channeldb/migrations.go b/channeldb/migrations.go index 518271a8..cddf6278 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -573,3 +573,40 @@ func migratePruneEdgeUpdateIndex(tx *bolt.Tx) error { return nil } + +// migrateOptionalChannelCloseSummaryFields migrates the serialized format of +// ChannelCloseSummary to a format where optional fields' presence is indicated +// with boolean markers. +func migrateOptionalChannelCloseSummaryFields(tx *bolt.Tx) error { + closedChanBucket := tx.Bucket(closedChannelBucket) + if closedChanBucket == nil { + return nil + } + + log.Info("Migrating to new closed channel format...") + err := closedChanBucket.ForEach(func(chanID, summary []byte) error { + r := bytes.NewReader(summary) + + // Read the old (v6) format from the database. + c, err := deserializeCloseChannelSummaryV6(r) + if err != nil { + return err + } + + // Serialize using the new format, and put back into the + // bucket. + var b bytes.Buffer + if err := serializeChannelCloseSummary(&b, c); err != nil { + return err + } + + return closedChanBucket.Put(chanID, b.Bytes()) + }) + if err != nil { + return fmt.Errorf("unable to update closed channels: %v", err) + } + + log.Info("Migration to new closed channel format complete!") + + return nil +} diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go index 6fbefd0c..02c61b22 100644 --- a/channeldb/migrations_test.go +++ b/channeldb/migrations_test.go @@ -1,11 +1,17 @@ package channeldb import ( + "bytes" "crypto/sha256" "encoding/binary" + "fmt" + "reflect" "testing" + "github.com/btcsuite/btcutil" "github.com/coreos/bbolt" + "github.com/davecgh/go-spew/spew" + "github.com/go-errors/errors" ) // TestPaymentStatusesMigration checks that already completed payments will have @@ -189,3 +195,276 @@ func TestPaymentStatusesMigration(t *testing.T) { paymentStatusesMigration, false) } + +// TestMigrateOptionalChannelCloseSummaryFields properly converts a +// ChannelCloseSummary to the v7 format, where optional fields have their +// presence indicated with boolean markers. +func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) { + t.Parallel() + + chanState, err := createTestChannelState(nil) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + + var chanPointBuf bytes.Buffer + err = writeOutpoint(&chanPointBuf, &chanState.FundingOutpoint) + if err != nil { + t.Fatalf("unable to write outpoint: %v", err) + } + + chanID := chanPointBuf.Bytes() + + testCases := []struct { + closeSummary *ChannelCloseSummary + oldSerialization func(c *ChannelCloseSummary) []byte + }{ + { + // A close summary where none of the new fields are + // set. + closeSummary: &ChannelCloseSummary{ + ChanPoint: chanState.FundingOutpoint, + ShortChanID: chanState.ShortChanID(), + ChainHash: chanState.ChainHash, + ClosingTXID: testTx.TxHash(), + CloseHeight: 100, + RemotePub: chanState.IdentityPub, + Capacity: chanState.Capacity, + SettledBalance: btcutil.Amount(50000), + CloseType: RemoteForceClose, + IsPending: true, + + // The last fields will be unset. + RemoteCurrentRevocation: nil, + LocalChanConfig: ChannelConfig{}, + RemoteNextRevocation: nil, + }, + + // In the old format the last field written is the + // IsPendingField. It should be converted by adding an + // extra boolean marker at the end to indicate that the + // remaining fields are not there. + oldSerialization: func(cs *ChannelCloseSummary) []byte { + var buf bytes.Buffer + err := WriteElements(&buf, cs.ChanPoint, + cs.ShortChanID, cs.ChainHash, + cs.ClosingTXID, cs.CloseHeight, + cs.RemotePub, cs.Capacity, + cs.SettledBalance, cs.TimeLockedBalance, + cs.CloseType, cs.IsPending, + ) + if err != nil { + t.Fatal(err) + } + + // For the old format, these are all the fields + // that are written. + return buf.Bytes() + }, + }, + { + // A close summary where the new fields are present, + // but the optional RemoteNextRevocation field is not + // set. + closeSummary: &ChannelCloseSummary{ + ChanPoint: chanState.FundingOutpoint, + ShortChanID: chanState.ShortChanID(), + ChainHash: chanState.ChainHash, + ClosingTXID: testTx.TxHash(), + CloseHeight: 100, + RemotePub: chanState.IdentityPub, + Capacity: chanState.Capacity, + SettledBalance: btcutil.Amount(50000), + CloseType: RemoteForceClose, + IsPending: true, + RemoteCurrentRevocation: chanState.RemoteCurrentRevocation, + LocalChanConfig: chanState.LocalChanCfg, + + // RemoteNextRevocation is optional, and here + // it is not set. + RemoteNextRevocation: nil, + }, + + // In the old format the last field written is the + // LocalChanConfig. This indicates that the optional + // RemoteNextRevocation field is not present. It should + // be converted by adding boolean markers for all these + // fields. + oldSerialization: func(cs *ChannelCloseSummary) []byte { + var buf bytes.Buffer + err := WriteElements(&buf, cs.ChanPoint, + cs.ShortChanID, cs.ChainHash, + cs.ClosingTXID, cs.CloseHeight, + cs.RemotePub, cs.Capacity, + cs.SettledBalance, cs.TimeLockedBalance, + cs.CloseType, cs.IsPending, + ) + if err != nil { + t.Fatal(err) + } + + err = WriteElements(&buf, cs.RemoteCurrentRevocation) + if err != nil { + t.Fatal(err) + } + + err = writeChanConfig(&buf, &cs.LocalChanConfig) + if err != nil { + t.Fatal(err) + } + + // RemoteNextRevocation is not written. + return buf.Bytes() + }, + }, + { + // A close summary where all fields are present. + closeSummary: &ChannelCloseSummary{ + ChanPoint: chanState.FundingOutpoint, + ShortChanID: chanState.ShortChanID(), + ChainHash: chanState.ChainHash, + ClosingTXID: testTx.TxHash(), + CloseHeight: 100, + RemotePub: chanState.IdentityPub, + Capacity: chanState.Capacity, + SettledBalance: btcutil.Amount(50000), + CloseType: RemoteForceClose, + IsPending: true, + RemoteCurrentRevocation: chanState.RemoteCurrentRevocation, + LocalChanConfig: chanState.LocalChanCfg, + + // RemoteNextRevocation is optional, and in + // this case we set it. + RemoteNextRevocation: chanState.RemoteNextRevocation, + }, + + // In the old format all the fields are written. It + // should be converted by adding boolean markers for + // all these fields. + oldSerialization: func(cs *ChannelCloseSummary) []byte { + var buf bytes.Buffer + err := WriteElements(&buf, cs.ChanPoint, + cs.ShortChanID, cs.ChainHash, + cs.ClosingTXID, cs.CloseHeight, + cs.RemotePub, cs.Capacity, + cs.SettledBalance, cs.TimeLockedBalance, + cs.CloseType, cs.IsPending, + ) + if err != nil { + t.Fatal(err) + } + + err = WriteElements(&buf, cs.RemoteCurrentRevocation) + if err != nil { + t.Fatal(err) + } + + err = writeChanConfig(&buf, &cs.LocalChanConfig) + if err != nil { + t.Fatal(err) + } + + err = WriteElements(&buf, cs.RemoteNextRevocation) + if err != nil { + t.Fatal(err) + } + + return buf.Bytes() + }, + }, + } + + for _, test := range testCases { + + // Before the migration we must add the old format to the DB. + beforeMigrationFunc := func(d *DB) { + + // Get the old serialization format for this test's + // close summary, and it to the closed channel bucket. + old := test.oldSerialization(test.closeSummary) + err = d.Update(func(tx *bolt.Tx) error { + closedChanBucket, err := tx.CreateBucketIfNotExists( + closedChannelBucket, + ) + if err != nil { + return err + } + return closedChanBucket.Put(chanID, old) + }) + if err != nil { + t.Fatalf("unable to add old serialization: %v", + err) + } + } + + // After the migration it should be found in the new format. + afterMigrationFunc := func(d *DB) { + meta, err := d.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.DbVersionNumber != 1 { + t.Fatal("migration wasn't applied") + } + + // We generate the new serialized version, to check + // against what is found in the DB. + var b bytes.Buffer + err = serializeChannelCloseSummary(&b, test.closeSummary) + if err != nil { + t.Fatalf("unable to serialize: %v", err) + } + newSerialization := b.Bytes() + + var dbSummary []byte + err = d.View(func(tx *bolt.Tx) error { + closedChanBucket := tx.Bucket(closedChannelBucket) + if closedChanBucket == nil { + return errors.New("unable to find bucket") + } + + // Get the serialized verision from the DB and + // make sure it matches what we expected. + dbSummary = closedChanBucket.Get(chanID) + if !bytes.Equal(dbSummary, newSerialization) { + return fmt.Errorf("unexpected new " + + "serialization") + } + return nil + }) + if err != nil { + t.Fatalf("unable to view DB: %v", err) + } + + // Finally we fetch the deserialized summary from the + // DB and check that it is equal to our original one. + dbChannels, err := d.FetchClosedChannels(false) + if err != nil { + t.Fatalf("unable to fetch closed channels: %v", + err) + } + + if len(dbChannels) != 1 { + t.Fatalf("expected 1 closed channels, found %v", + len(dbChannels)) + } + + dbChan := dbChannels[0] + if !reflect.DeepEqual(dbChan, test.closeSummary) { + dbChan.RemotePub.Curve = nil + test.closeSummary.RemotePub.Curve = nil + t.Fatalf("not equal: %v vs %v", + spew.Sdump(dbChan), + spew.Sdump(test.closeSummary)) + } + + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + migrateOptionalChannelCloseSummaryFields, + false) + } +} diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index c183788f..1bc1ee18 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg" @@ -16,6 +17,16 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" ) +const ( + // minCommitPointPollTimeout is the minimum time we'll wait before + // polling the database for a channel's commitpoint. + minCommitPointPollTimeout = 1 * time.Second + + // maxCommitPointPollTimeout is the maximum time we'll wait before + // polling the database for a channel's commitpoint. + maxCommitPointPollTimeout = 10 * time.Minute +) + // LocalUnilateralCloseInfo encapsulates all the informnation we need to act // on a local force close that gets confirmed. type LocalUnilateralCloseInfo struct { @@ -402,16 +413,38 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { // If we are lucky, the remote peer sent us the correct // commitment point during channel sync, such that we - // can sweep our funds. - // TODO(halseth): must handle the case where we haven't - // yet processed the chan sync message. - commitPoint, err := c.cfg.chanState.DataLossCommitPoint() - if err != nil { + // can sweep our funds. If we cannot find the commit + // point, there's not much we can do other than wait + // for us to retrieve it. We will attempt to retrieve + // it from the peer each time we connect to it. + // TODO(halseth): actively initiate re-connection to + // the peer? + var commitPoint *btcec.PublicKey + backoff := minCommitPointPollTimeout + for { + commitPoint, err = c.cfg.chanState.DataLossCommitPoint() + if err == nil { + break + } + log.Errorf("Unable to retrieve commitment "+ "point for channel(%v) with lost "+ - "state: %v", - c.cfg.chanState.FundingOutpoint, err) - return + "state: %v. Retrying in %v.", + c.cfg.chanState.FundingOutpoint, + err, backoff) + + select { + // Wait before retrying, with an exponential + // backoff. + case <-time.After(backoff): + backoff = 2 * backoff + if backoff > maxCommitPointPollTimeout { + backoff = maxCommitPointPollTimeout + } + + case <-c.quit: + return + } } log.Infof("Recovered commit point(%x) for "+ @@ -522,6 +555,15 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet LocalChanConfig: c.cfg.chanState.LocalChanCfg, } + // Attempt to add a channel sync message to the close summary. + chanSync, err := lnwallet.ChanSyncMsg(c.cfg.chanState) + if err != nil { + log.Errorf("ChannelPoint(%v): unable to create channel sync "+ + "message: %v", c.cfg.chanState.FundingOutpoint, err) + } else { + closeSummary.LastChanSyncMsg = chanSync + } + // Create a summary of all the information needed to handle the // cooperative closure. closeInfo := &CooperativeCloseInfo{ @@ -590,6 +632,15 @@ func (c *chainWatcher) dispatchLocalForceClose( closeSummary.TimeLockedBalance += htlcValue } + // Attempt to add a channel sync message to the close summary. + chanSync, err := lnwallet.ChanSyncMsg(c.cfg.chanState) + if err != nil { + log.Errorf("ChannelPoint(%v): unable to create channel sync "+ + "message: %v", c.cfg.chanState.FundingOutpoint, err) + } else { + closeSummary.LastChanSyncMsg = chanSync + } + // With the event processed, we'll now notify all subscribers of the // event. closeInfo := &LocalUnilateralCloseInfo{ @@ -749,6 +800,15 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail LocalChanConfig: c.cfg.chanState.LocalChanCfg, } + // Attempt to add a channel sync message to the close summary. + chanSync, err := lnwallet.ChanSyncMsg(c.cfg.chanState) + if err != nil { + log.Errorf("ChannelPoint(%v): unable to create channel sync "+ + "message: %v", c.cfg.chanState.FundingOutpoint, err) + } else { + closeSummary.LastChanSyncMsg = chanSync + } + if err := c.cfg.chanState.CloseChannel(&closeSummary); err != nil { return err } diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 71563592..9bda1e65 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -516,7 +516,7 @@ func (l *channelLink) syncChanStates() error { // First, we'll generate our ChanSync message to send to the other // side. Based on this message, the remote party will decide if they // need to retransmit any data or not. - localChanSyncMsg, err := l.channel.ChanSyncMsg() + localChanSyncMsg, err := lnwallet.ChanSyncMsg(l.channel.State()) if err != nil { return fmt.Errorf("unable to generate chan sync message for "+ "ChannelPoint(%v)", l.channel.ChannelPoint()) @@ -789,9 +789,6 @@ func (l *channelLink) htlcManager() { // We failed syncing the commit chains, probably // because the remote has lost state. We should force // close the channel. - // TODO(halseth): store sent chanSync message to - // database, such that it can be resent to peer in case - // it tries to sync the channel again. case err == lnwallet.ErrCommitSyncRemoteDataLoss: fallthrough diff --git a/lnd_test.go b/lnd_test.go index b0af5066..71938818 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -20,7 +20,6 @@ import ( "crypto/rand" "crypto/sha256" - prand "math/rand" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -547,6 +546,58 @@ func makeFakePayHash(t *harnessTest) []byte { return randBuf } +// createPayReqs is a helper method that will create a slice of payment +// requests for the given node. +func createPayReqs(ctx context.Context, node *lntest.HarnessNode, + paymentAmt btcutil.Amount, numInvoices int) ([]string, [][]byte, + []*lnrpc.Invoice, error) { + + payReqs := make([]string, numInvoices) + rHashes := make([][]byte, numInvoices) + invoices := make([]*lnrpc.Invoice, numInvoices) + for i := 0; i < numInvoices; i++ { + preimage := make([]byte, 32) + _, err := rand.Read(preimage) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to generate "+ + "preimage: %v", err) + } + invoice := &lnrpc.Invoice{ + Memo: "testing", + RPreimage: preimage, + Value: int64(paymentAmt), + } + resp, err := node.AddInvoice(ctx, invoice) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to add "+ + "invoice: %v", err) + } + + payReqs[i] = resp.PaymentRequest + rHashes[i] = resp.RHash + invoices[i] = invoice + } + return payReqs, rHashes, invoices, nil +} + +// getChanInfo is a helper method for getting channel info for a node's sole +// channel. +func getChanInfo(ctx context.Context, node *lntest.HarnessNode) ( + *lnrpc.Channel, error) { + + req := &lnrpc.ListChannelsRequest{} + channelInfo, err := node.ListChannels(ctx, req) + if err != nil { + return nil, err + } + if len(channelInfo.Channels) != 1 { + return nil, fmt.Errorf("node should only have a single "+ + "channel, instead he has %v", len(channelInfo.Channels)) + } + + return channelInfo.Channels[0], nil +} + const ( AddrTypeWitnessPubkeyHash = lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH AddrTypeNestedPubkeyHash = lnrpc.NewAddressRequest_NESTED_PUBKEY_HASH @@ -2218,23 +2269,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("htlc mismatch: %v", predErr) } - // As we'll be querying the state of Alice's channels frequently we'll - // create a closure helper function for the purpose. - getAliceChanInfo := func() (*lnrpc.Channel, error) { - req := &lnrpc.ListChannelsRequest{} - aliceChannelInfo, err := net.Alice.ListChannels(ctxb, req) - if err != nil { - return nil, err - } - if len(aliceChannelInfo.Channels) != 1 { - t.Fatalf("alice should only have a single channel, "+ - "instead he has %v", - len(aliceChannelInfo.Channels)) - } - - return aliceChannelInfo.Channels[0], nil - } - // Fetch starting height of this test so we can compute the block // heights we expect certain events to take place. _, curHeight, err := net.Miner.Node.GetBestBlock() @@ -2251,7 +2285,8 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { htlcCsvMaturityHeight = startHeight + defaultCLTV + 1 + defaultCSV ) - aliceChan, err := getAliceChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + aliceChan, err := getChanInfo(ctxt, net.Alice) if err != nil { t.Fatalf("unable to get alice's channel info: %v", err) } @@ -3600,18 +3635,12 @@ func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) { // satoshis with a different preimage each time. const numPayments = 5 const paymentAmt = 1000 - payReqs := make([]string, numPayments) - for i := 0; i < numPayments; i++ { - invoice := &lnrpc.Invoice{ - Memo: "testing", - Value: paymentAmt, - } - resp, err := net.Bob.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - payReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, _, err := createPayReqs( + ctxt, net.Bob, paymentAmt, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // We'll wait for all parties to recognize the new channels within the @@ -3829,17 +3858,12 @@ func testSingleHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) { // Create 5 invoices for Bob, which expect a payment from Alice for 1k // satoshis with a different preimage each time. const numPayments = 5 - rHashes := make([][]byte, numPayments) - for i := 0; i < numPayments; i++ { - invoice := &lnrpc.Invoice{ - Value: paymentAmt, - } - resp, err := net.Bob.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - rHashes[i] = resp.RHash + ctxt, _ = context.WithTimeout(ctxb, timeout) + _, rHashes, _, err := createPayReqs( + ctxt, net.Bob, paymentAmt, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // We'll wait for all parties to recognize the new channels within the @@ -4017,17 +4041,12 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) { // Create 5 invoices for Carol, which expect a payment from Alice for 1k // satoshis with a different preimage each time. const numPayments = 5 - rHashes := make([][]byte, numPayments) - for i := 0; i < numPayments; i++ { - invoice := &lnrpc.Invoice{ - Value: paymentAmt, - } - resp, err := carol.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - rHashes[i] = resp.RHash + ctxt, _ = context.WithTimeout(ctxb, timeout) + _, rHashes, _, err := createPayReqs( + ctxt, carol, paymentAmt, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // We'll wait for all parties to recognize the new channels within the @@ -4533,25 +4552,12 @@ func testPrivateChannels(net *lntest.NetworkHarness, t *harnessTest) { // by only using one of the channels. const numPayments = 2 const paymentAmt = 70000 - payReqs := make([]string, numPayments) - for i := 0; i < numPayments; i++ { - preimage := make([]byte, 32) - _, err := rand.Read(preimage) - if err != nil { - t.Fatalf("unable to generate preimage: %v", err) - } - - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := net.Bob.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - payReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, _, err := createPayReqs( + ctxt, net.Bob, paymentAmt, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } time.Sleep(time.Millisecond * 50) @@ -4603,25 +4609,12 @@ func testPrivateChannels(net *lntest.NetworkHarness, t *harnessTest) { // Alice should also be able to route payments using this channel, // so send two payments of 60k back to Carol. const paymentAmt60k = 60000 - payReqs = make([]string, numPayments) - for i := 0; i < numPayments; i++ { - preimage := make([]byte, 32) - _, err := rand.Read(preimage) - if err != nil { - t.Fatalf("unable to generate preimage: %v", err) - } - - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt60k, - } - resp, err := carol.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - payReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, _, err = createPayReqs( + ctxt, carol, paymentAmt60k, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } time.Sleep(time.Millisecond * 50) @@ -5268,22 +5261,12 @@ func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) { // We'll now add 3 more invoices to Bob's invoice registry. const numInvoices = 3 - newInvoices := make([]*lnrpc.Invoice, numInvoices) - payReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := bytes.Repeat([]byte{byte(90 + 1 + i)}, 32) - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := net.Bob.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - newInvoices[i] = invoice - payReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, newInvoices, err := createPayReqs( + ctxt, net.Bob, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // Now that the set of invoices has been added, we'll re-register for @@ -6105,36 +6088,12 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) { // With the channel open, we'll create a few invoices for Bob that // Carol will pay to in order to advance the state of the channel. - bobPayReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := bytes.Repeat([]byte{byte(255 - i)}, 32) - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := net.Bob.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - bobPayReqs[i] = resp.PaymentRequest - } - - // As we'll be querying the state of bob's channels frequently we'll - // create a closure helper function for the purpose. - getBobChanInfo := func() (*lnrpc.Channel, error) { - req := &lnrpc.ListChannelsRequest{} - bobChannelInfo, err := net.Bob.ListChannels(ctxb, req) - if err != nil { - return nil, err - } - if len(bobChannelInfo.Channels) != 1 { - t.Fatalf("bob should only have a single channel, instead he has %v", - len(bobChannelInfo.Channels)) - } - - return bobChannelInfo.Channels[0], nil + ctxt, _ = context.WithTimeout(ctxb, timeout) + bobPayReqs, _, _, err := createPayReqs( + ctxt, net.Bob, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // Wait for Carol to receive the channel edge from the funding manager. @@ -6159,7 +6118,8 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) { var bobChan *lnrpc.Channel var predErr error err = lntest.WaitPredicate(func() bool { - bChan, err := getBobChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + bChan, err := getChanInfo(ctxt, net.Bob) if err != nil { t.Fatalf("unable to get bob's channel info: %v", err) } @@ -6207,7 +6167,8 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to send payments: %v", err) } - bobChan, err = getBobChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + bobChan, err = getChanInfo(ctxt, net.Bob) if err != nil { t.Fatalf("unable to get bob chan info: %v", err) } @@ -6224,7 +6185,8 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) { // Now query for Bob's channel state, it should show that he's at a // state number in the past, not the *latest* state. - bobChan, err = getBobChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + bobChan, err = getChanInfo(ctxt, net.Bob) if err != nil { t.Fatalf("unable to get bob chan info: %v", err) } @@ -6387,36 +6349,12 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness // With the channel open, we'll create a few invoices for Carol that // Dave will pay to in order to advance the state of the channel. - carolPayReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := bytes.Repeat([]byte{byte(192 - i)}, 32) - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := carol.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - carolPayReqs[i] = resp.PaymentRequest - } - - // As we'll be querying the state of Carols's channels frequently we'll - // create a closure helper function for the purpose. - getCarolChanInfo := func() (*lnrpc.Channel, error) { - req := &lnrpc.ListChannelsRequest{} - carolChannelInfo, err := carol.ListChannels(ctxb, req) - if err != nil { - return nil, err - } - if len(carolChannelInfo.Channels) != 1 { - t.Fatalf("carol should only have a single channel, "+ - "instead he has %v", len(carolChannelInfo.Channels)) - } - - return carolChannelInfo.Channels[0], nil + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolPayReqs, _, _, err := createPayReqs( + ctxt, carol, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // Wait for Dave to receive the channel edge from the funding manager. @@ -6429,7 +6367,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness // Next query for Carol's channel state, as we sent 0 payments, Carol // should now see her balance as being 0 satoshis. - carolChan, err := getCarolChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolChan, err := getChanInfo(ctxt, carol) if err != nil { t.Fatalf("unable to get carol's channel info: %v", err) } @@ -6466,7 +6405,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness t.Fatalf("unable to send payments: %v", err) } - carolChan, err = getCarolChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolChan, err = getChanInfo(ctxt, carol) if err != nil { t.Fatalf("unable to get carol chan info: %v", err) } @@ -6483,7 +6423,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness // Now query for Carol's channel state, it should show that he's at a // state number in the past, not the *latest* state. - carolChan, err = getCarolChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolChan, err = getChanInfo(ctxt, carol) if err != nil { t.Fatalf("unable to get carol chan info: %v", err) } @@ -6656,42 +6597,19 @@ func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness, // With the channel open, we'll create a few invoices for Carol that // Dave will pay to in order to advance the state of the channel. - carolPayReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := bytes.Repeat([]byte{byte(192 - i)}, 32) - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := carol.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - carolPayReqs[i] = resp.PaymentRequest - } - - // As we'll be querying the state of Carol's channels frequently we'll - // create a closure helper function for the purpose. - getCarolChanInfo := func() (*lnrpc.Channel, error) { - req := &lnrpc.ListChannelsRequest{} - carolChannelInfo, err := carol.ListChannels(ctxb, req) - if err != nil { - return nil, err - } - if len(carolChannelInfo.Channels) != 1 { - t.Fatalf("carol should only have a single channel, instead he has %v", - len(carolChannelInfo.Channels)) - } - - return carolChannelInfo.Channels[0], nil + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolPayReqs, _, _, err := createPayReqs( + ctxt, carol, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // We'll introduce a closure to validate that Carol's current balance // matches the given expected amount. checkCarolBalance := func(expectedAmt int64) { - carolChan, err := getCarolChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolChan, err := getChanInfo(ctxt, carol) if err != nil { t.Fatalf("unable to get carol's channel info: %v", err) } @@ -6706,7 +6624,8 @@ func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness, // number of updates is at least as large as the provided minimum // number. checkCarolNumUpdatesAtLeast := func(minimum uint64) { - carolChan, err := getCarolChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolChan, err := getChanInfo(ctxt, carol) if err != nil { t.Fatalf("unable to get carol's channel info: %v", err) } @@ -6740,20 +6659,12 @@ func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness, // At this point, we'll also send over a set of HTLC's from Carol to // Dave. This ensures that the final revoked transaction has HTLC's in // both directions. - davePayReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := bytes.Repeat([]byte{byte(199 - i)}, 32) - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := dave.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - davePayReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + davePayReqs, _, _, err := createPayReqs( + ctxt, dave, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // Send payments from Carol to Dave using 3 of Dave's payment hashes @@ -6768,7 +6679,8 @@ func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness, // Next query for Carol's channel state, as we sent 3 payments of 10k // satoshis each, however Carol should now see her balance as being // equal to the push amount in satoshis since she has not settled. - carolChan, err := getCarolChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolChan, err := getChanInfo(ctxt, carol) if err != nil { t.Fatalf("unable to get carol's channel info: %v", err) } @@ -6837,7 +6749,8 @@ func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness, // Now query for Carol's channel state, it should show that she's at a // state number in the past, *not* the latest state. - carolChan, err = getCarolChanInfo() + ctxt, _ = context.WithTimeout(ctxb, timeout) + carolChan, err = getChanInfo(ctxt, carol) if err != nil { t.Fatalf("unable to get carol chan info: %v", err) } @@ -7092,12 +7005,6 @@ func testDataLossProtection(net *lntest.NetworkHarness, t *harnessTest) { } defer shutdownAndAssert(net, t, dave) - // We must let Dave communicate with Carol before they are able to open - // channel, so we connect them. - if err := net.ConnectNodes(ctxb, carol, dave); err != nil { - t.Fatalf("unable to connect dave to carol: %v", err) - } - // Before we make a channel, we'll load up Carol with some coins sent // directly from the miner. err = net.SendCoins(ctxb, btcutil.SatoshiPerBitcoin, carol) @@ -7105,14 +7012,164 @@ func testDataLossProtection(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to send coins to carol: %v", err) } - // We'll first open up a channel between them with a 0.5 BTC value. - ctxt, _ := context.WithTimeout(ctxb, timeout) - chanPoint := openChannelAndAssert( - ctxt, t, net, carol, dave, - lntest.OpenChannelParams{ - Amt: chanAmt, - }, - ) + // timeTravel is a method that will make Carol open a channel to the + // passed node, settle a series of payments, then reset the node back + // to the state before the payments happened. When this method returns + // the node will be unaware of the new state updates. The returned + // function can be used to restart the node in this state. + timeTravel := func(node *lntest.HarnessNode) (func() error, + *lnrpc.ChannelPoint, int64, error) { + + // We must let the node communicate with Carol before they are + // able to open channel, so we connect them. + if err := net.EnsureConnected(ctxb, carol, node); err != nil { + t.Fatalf("unable to connect %v to carol: %v", + node.Name(), err) + } + + // We'll first open up a channel between them with a 0.5 BTC + // value. + ctxt, _ := context.WithTimeout(ctxb, timeout) + chanPoint := openChannelAndAssert( + ctxt, t, net, carol, node, + lntest.OpenChannelParams{ + Amt: chanAmt, + }, + ) + + // With the channel open, we'll create a few invoices for the + // node that Carol will pay to in order to advance the state of + // the channel. + // TODO(halseth): have dangling HTLCs on the commitment, able to + // retrive funds? + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, _, err := createPayReqs( + ctxt, node, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) + } + + // Wait for Carol to receive the channel edge from the funding + // manager. + ctxt, _ = context.WithTimeout(ctxb, timeout) + err = carol.WaitForNetworkChannelOpen(ctxt, chanPoint) + if err != nil { + t.Fatalf("carol didn't see the carol->%s channel "+ + "before timeout: %v", node.Name(), err) + } + + // Send payments from Carol using 3 of the payment hashes + // generated above. + ctxt, _ = context.WithTimeout(ctxb, timeout) + err = completePaymentRequests(ctxt, carol, + payReqs[:numInvoices/2], true) + if err != nil { + t.Fatalf("unable to send payments: %v", err) + } + + // Next query for the node's channel state, as we sent 3 + // payments of 10k satoshis each, it should now see his balance + // as being 30k satoshis. + var nodeChan *lnrpc.Channel + var predErr error + err = lntest.WaitPredicate(func() bool { + ctxt, _ = context.WithTimeout(ctxb, timeout) + bChan, err := getChanInfo(ctxt, node) + if err != nil { + t.Fatalf("unable to get channel info: %v", err) + } + if bChan.LocalBalance != 30000 { + predErr = fmt.Errorf("balance is incorrect, "+ + "got %v, expected %v", + bChan.LocalBalance, 30000) + return false + } + + nodeChan = bChan + return true + }, time.Second*15) + if err != nil { + t.Fatalf("%v", predErr) + } + + // Grab the current commitment height (update number), we'll + // later revert him to this state after additional updates to + // revoke this state. + stateNumPreCopy := nodeChan.NumUpdates + + // Create a temporary file to house the database state at this + // particular point in history. + tempDbPath, err := ioutil.TempDir("", node.Name()+"-past-state") + if err != nil { + t.Fatalf("unable to create temp db folder: %v", err) + } + tempDbFile := filepath.Join(tempDbPath, "channel.db") + defer os.Remove(tempDbPath) + + // With the temporary file created, copy the current state into + // the temporary file we created above. Later after more + // updates, we'll restore this state. + if err := copyFile(tempDbFile, node.DBPath()); err != nil { + t.Fatalf("unable to copy database files: %v", err) + } + + // Finally, send more payments from , using the remaining + // payment hashes. + ctxt, _ = context.WithTimeout(ctxb, timeout) + err = completePaymentRequests(ctxt, carol, + payReqs[numInvoices/2:], true) + if err != nil { + t.Fatalf("unable to send payments: %v", err) + } + + ctxt, _ = context.WithTimeout(ctxb, timeout) + nodeChan, err = getChanInfo(ctxt, node) + if err != nil { + t.Fatalf("unable to get dave chan info: %v", err) + } + + // Now we shutdown the node, copying over the its temporary + // database state which has the *prior* channel state over his + // current most up to date state. With this, we essentially + // force the node to travel back in time within the channel's + // history. + if err = net.RestartNode(node, func() error { + return os.Rename(tempDbFile, node.DBPath()) + }); err != nil { + t.Fatalf("unable to restart node: %v", err) + } + + // Now query for the channel state, it should show that it's at + // a state number in the past, not the *latest* state. + ctxt, _ = context.WithTimeout(ctxb, timeout) + nodeChan, err = getChanInfo(ctxt, node) + if err != nil { + t.Fatalf("unable to get dave chan info: %v", err) + } + if nodeChan.NumUpdates != stateNumPreCopy { + t.Fatalf("db copy failed: %v", nodeChan.NumUpdates) + } + assertNodeNumChannels(t, ctxb, node, 1) + + balReq := &lnrpc.WalletBalanceRequest{} + balResp, err := node.WalletBalance(ctxb, balReq) + if err != nil { + t.Fatalf("unable to get dave's balance: %v", err) + } + + restart, err := net.SuspendNode(node) + if err != nil { + t.Fatalf("unable to suspend node: %v", err) + } + return restart, chanPoint, balResp.ConfirmedBalance, nil + } + + // Reset Dave to a state where he has an outdated channel state. + restartDave, _, daveStartingBalance, err := timeTravel(dave) + if err != nil { + t.Fatalf("unable to time travel dave: %v", err) + } // We a´make a note of the nodes' current on-chain balances, to make // sure they are able to retrieve the channel funds eventually, @@ -7123,152 +7180,14 @@ func testDataLossProtection(net *lntest.NetworkHarness, t *harnessTest) { } carolStartingBalance := carolBalResp.ConfirmedBalance - daveBalResp, err := dave.WalletBalance(ctxb, balReq) - if err != nil { - t.Fatalf("unable to get dave's balance: %v", err) + // Restart Dave to trigger a channel resync. + if err := restartDave(); err != nil { + t.Fatalf("unable to restart dave: %v", err) } - daveStartingBalance := daveBalResp.ConfirmedBalance - - // With the channel open, we'll create a few invoices for Dave that - // Carol will pay to in order to advance the state of the channel. - // TODO(halseth): have dangling HTLCs on the commitment, able to - // retrive funds? - davePayReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := bytes.Repeat([]byte{byte(17 - i)}, 32) - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := dave.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - davePayReqs[i] = resp.PaymentRequest - } - - // As we'll be querying the state of Dave's channels frequently we'll - // create a closure helper function for the purpose. - getDaveChanInfo := func() (*lnrpc.Channel, error) { - req := &lnrpc.ListChannelsRequest{} - daveChannelInfo, err := dave.ListChannels(ctxb, req) - if err != nil { - return nil, err - } - if len(daveChannelInfo.Channels) != 1 { - t.Fatalf("dave should only have a single channel, "+ - "instead he has %v", - len(daveChannelInfo.Channels)) - } - - return daveChannelInfo.Channels[0], nil - } - - // Wait for Carol to receive the channel edge from the funding manager. - ctxt, _ = context.WithTimeout(ctxb, timeout) - err = carol.WaitForNetworkChannelOpen(ctxt, chanPoint) - if err != nil { - t.Fatalf("carol didn't see the carol->dave channel before "+ - "timeout: %v", err) - } - - // Send payments from Carol to Dave using 3 of Dave's payment hashes - // generated above. - ctxt, _ = context.WithTimeout(ctxb, timeout) - err = completePaymentRequests(ctxt, carol, davePayReqs[:numInvoices/2], - true) - if err != nil { - t.Fatalf("unable to send payments: %v", err) - } - - // Next query for Dave's channel state, as we sent 3 payments of 10k - // satoshis each, Dave should now see his balance as being 30k satoshis. - var daveChan *lnrpc.Channel - var predErr error - err = lntest.WaitPredicate(func() bool { - bChan, err := getDaveChanInfo() - if err != nil { - t.Fatalf("unable to get dave's channel info: %v", err) - } - if bChan.LocalBalance != 30000 { - predErr = fmt.Errorf("dave's balance is incorrect, "+ - "got %v, expected %v", bChan.LocalBalance, - 30000) - return false - } - - daveChan = bChan - return true - }, time.Second*15) - if err != nil { - t.Fatalf("%v", predErr) - } - - // Grab Dave's current commitment height (update number), we'll later - // revert him to this state after additional updates to revoke this - // state. - daveStateNumPreCopy := daveChan.NumUpdates - - // Create a temporary file to house Dave's database state at this - // particular point in history. - daveTempDbPath, err := ioutil.TempDir("", "dave-past-state") - if err != nil { - t.Fatalf("unable to create temp db folder: %v", err) - } - daveTempDbFile := filepath.Join(daveTempDbPath, "channel.db") - defer os.Remove(daveTempDbPath) - - // With the temporary file created, copy Dave's current state into the - // temporary file we created above. Later after more updates, we'll - // restore this state. - if err := copyFile(daveTempDbFile, dave.DBPath()); err != nil { - t.Fatalf("unable to copy database files: %v", err) - } - - // Finally, send payments from Carol to Dave, consuming Dave's remaining - // payment hashes. - ctxt, _ = context.WithTimeout(ctxb, timeout) - err = completePaymentRequests(ctxt, carol, davePayReqs[numInvoices/2:], - true) - if err != nil { - t.Fatalf("unable to send payments: %v", err) - } - - daveChan, err = getDaveChanInfo() - if err != nil { - t.Fatalf("unable to get dave chan info: %v", err) - } - - // Now we shutdown Dave, copying over the his temporary database state - // which has the *prior* channel state over his current most up to date - // state. With this, we essentially force Dave to travel back in time - // within the channel's history. - if err = net.RestartNode(dave, func() error { - return os.Rename(daveTempDbFile, dave.DBPath()) - }); err != nil { - t.Fatalf("unable to restart node: %v", err) - } - - // Now query for Dave's channel state, it should show that he's at a - // state number in the past, not the *latest* state. - daveChan, err = getDaveChanInfo() - if err != nil { - t.Fatalf("unable to get dave chan info: %v", err) - } - if daveChan.NumUpdates != daveStateNumPreCopy { - t.Fatalf("db copy failed: %v", daveChan.NumUpdates) - } - assertNodeNumChannels(t, ctxb, dave, 1) // Upon reconnection, the nodes should detect that Dave is out of sync. - if err := net.ConnectNodes(ctxb, carol, dave); err != nil { - t.Fatalf("unable to connect dave to carol: %v", err) - } - // Carol should force close the channel using her latest commitment. - forceClose, err := waitForTxInMempool(net.Miner.Node, 5*time.Second) + forceClose, err := waitForTxInMempool(net.Miner.Node, 15*time.Second) if err != nil { t.Fatalf("unable to find Carol's force close tx in mempool: %v", err) @@ -7316,10 +7235,11 @@ func testDataLossProtection(net *lntest.NetworkHarness, t *harnessTest) { // We query Dave's balance to make sure it increased after the channel // closed. This checks that he was able to sweep the funds he had in // the channel. - daveBalResp, err = dave.WalletBalance(ctxb, balReq) + daveBalResp, err := dave.WalletBalance(ctxb, balReq) if err != nil { t.Fatalf("unable to get dave's balance: %v", err) } + daveBalance := daveBalResp.ConfirmedBalance if daveBalance <= daveStartingBalance { t.Fatalf("expected dave to have balance above %d, intead had %v", @@ -7328,7 +7248,7 @@ func testDataLossProtection(net *lntest.NetworkHarness, t *harnessTest) { // After the Carol's output matures, she should also reclaim her funds. mineBlocks(t, net, defaultCSV-1) - carolSweep, err := waitForTxInMempool(net.Miner.Node, 5*time.Second) + carolSweep, err := waitForTxInMempool(net.Miner.Node, 15*time.Second) if err != nil { t.Fatalf("unable to find Carol's sweep tx in mempool: %v", err) } @@ -7352,6 +7272,91 @@ func testDataLossProtection(net *lntest.NetworkHarness, t *harnessTest) { assertNodeNumChannels(t, ctxb, dave, 0) assertNodeNumChannels(t, ctxb, carol, 0) + + // As a second part of this test, we will test the the scenario where a + // channel is closed while Dave is offline, loses his state and comes + // back online. In this case the node should attempt to resync the + // channel, and the peer should resend a channel sync message for the + // closed channel, such that Dave can retrieve his funds. + // + // We start by letting Dave time travel back to an outdated state. + restartDave, chanPoint2, daveStartingBalance, err := timeTravel(dave) + if err != nil { + t.Fatalf("unable to time travel eve: %v", err) + } + + carolBalResp, err = carol.WalletBalance(ctxb, balReq) + if err != nil { + t.Fatalf("unable to get carol's balance: %v", err) + } + carolStartingBalance = carolBalResp.ConfirmedBalance + + // Now let Carol force close the channel while Dave is offline. + ctxt, _ := context.WithTimeout(ctxb, timeout) + closeChannelAndAssert(ctxt, t, net, carol, chanPoint2, true) + + // Wait for the channel to be marked pending force close. + ctxt, _ = context.WithTimeout(ctxb, timeout) + err = waitForChannelPendingForceClose(ctxt, carol, chanPoint2) + if err != nil { + t.Fatalf("channel not pending force close: %v", err) + } + + // Mine enough blocks for Carol to sweep her funds. + mineBlocks(t, net, defaultCSV) + + carolSweep, err = waitForTxInMempool(net.Miner.Node, 15*time.Second) + if err != nil { + t.Fatalf("unable to find Carol's sweep tx in mempool: %v", err) + } + block = mineBlocks(t, net, 1)[0] + assertTxInBlock(t, block, carolSweep) + + // Now the channel should be fully closed also from Carol's POV. + assertNumPendingChannels(t, carol, 0, 0) + + // Make sure Carol got her balance back. + carolBalResp, err = carol.WalletBalance(ctxb, balReq) + if err != nil { + t.Fatalf("unable to get carol's balance: %v", err) + } + carolBalance = carolBalResp.ConfirmedBalance + if carolBalance <= carolStartingBalance { + t.Fatalf("expected carol to have balance above %d, "+ + "instead had %v", carolStartingBalance, + carolBalance) + } + + assertNodeNumChannels(t, ctxb, carol, 0) + + // When Dave comes online, he will reconnect to Carol, try to resync + // the channel, but it will already be closed. Carol should resend the + // information Dave needs to sweep his funds. + if err := restartDave(); err != nil { + t.Fatalf("unabel to restart Eve: %v", err) + } + + // Dave should sweep his funds. + _, err = waitForTxInMempool(net.Miner.Node, 15*time.Second) + if err != nil { + t.Fatalf("unable to find Dave's sweep tx in mempool: %v", err) + } + + // Mine a block to confirm the sweep, and make sure Dave got his + // balance back. + mineBlocks(t, net, 1) + assertNodeNumChannels(t, ctxb, dave, 0) + + daveBalResp, err = dave.WalletBalance(ctxb, balReq) + if err != nil { + t.Fatalf("unable to get dave's balance: %v", err) + } + + daveBalance = daveBalResp.ConfirmedBalance + if daveBalance <= daveStartingBalance { + t.Fatalf("expected dave to have balance above %d, intead had %v", + daveStartingBalance, daveBalance) + } } // assertNodeNumChannels polls the provided node's list channels rpc until it @@ -8151,25 +8156,8 @@ func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) { func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() - // As we'll be querying the channels state frequently we'll - // create a closure helper function for the purpose. - getChanInfo := func(node *lntest.HarnessNode) (*lnrpc.Channel, error) { - req := &lnrpc.ListChannelsRequest{} - channelInfo, err := node.ListChannels(ctxb, req) - if err != nil { - return nil, err - } - if len(channelInfo.Channels) != 1 { - t.Fatalf("node should only have a single channel, "+ - "instead he has %v", - len(channelInfo.Channels)) - } - - return channelInfo.Channels[0], nil - } - const ( - timeout = time.Duration(time.Second * 5) + timeout = time.Duration(time.Second * 15) paymentAmt = 100 ) @@ -8185,7 +8173,8 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { }, ) - info, err := getChanInfo(net.Alice) + ctxt, _ = context.WithTimeout(ctxb, timeout) + info, err := getChanInfo(ctxt, net.Alice) if err != nil { t.Fatalf("unable to get alice channel info: %v", err) } @@ -8202,30 +8191,14 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { // Send one more payment in order to cause insufficient capacity error. numInvoices++ - // Initialize seed random in order to generate invoices. - prand.Seed(time.Now().UnixNano()) - // With the channel open, we'll create invoices for Bob that Alice // will pay to in order to advance the state of the channel. - bobPayReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := make([]byte, 32) - _, err := rand.Read(preimage) - if err != nil { - t.Fatalf("unable to generate preimage: %v", err) - } - - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := net.Bob.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - bobPayReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + bobPayReqs, _, _, err := createPayReqs( + ctxt, net.Bob, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // Wait for Alice to receive the channel edge from the funding manager. @@ -8286,7 +8259,8 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { // Next query for Bob's and Alice's channel states, in order to confirm // that all payment have been successful transmitted. - aliceChan, err := getChanInfo(net.Alice) + ctxt, _ = context.WithTimeout(ctxb, timeout) + aliceChan, err := getChanInfo(ctxt, net.Alice) if len(aliceChan.PendingHtlcs) != 0 { t.Fatalf("alice's pending htlcs is incorrect, got %v, "+ "expected %v", len(aliceChan.PendingHtlcs), 0) @@ -8306,7 +8280,8 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { // Wait for Bob to receive revocation from Alice. time.Sleep(2 * time.Second) - bobChan, err := getChanInfo(net.Bob) + ctxt, _ = context.WithTimeout(ctxb, timeout) + bobChan, err := getChanInfo(ctxt, net.Bob) if err != nil { t.Fatalf("unable to get bob's channel info: %v", err) } @@ -8338,23 +8313,6 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() - // As we'll be querying the channels state frequently we'll - // create a closure helper function for the purpose. - getChanInfo := func(node *lntest.HarnessNode) (*lnrpc.Channel, error) { - req := &lnrpc.ListChannelsRequest{} - channelInfo, err := node.ListChannels(ctxb, req) - if err != nil { - return nil, err - } - if len(channelInfo.Channels) != 1 { - t.Fatalf("node should only have a single channel, "+ - "instead he has %v", - len(channelInfo.Channels)) - } - - return channelInfo.Channels[0], nil - } - const ( timeout = time.Duration(time.Second * 5) paymentAmt = 1000 @@ -8372,7 +8330,8 @@ func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) }, ) - info, err := getChanInfo(net.Alice) + ctxt, _ = context.WithTimeout(ctxb, timeout) + info, err := getChanInfo(ctxt, net.Alice) if err != nil { t.Fatalf("unable to get alice channel info: %v", err) } @@ -8385,53 +8344,24 @@ func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) aliceAmt := info.LocalBalance bobAmt := info.RemoteBalance - // Initialize seed random in order to generate invoices. - prand.Seed(time.Now().UnixNano()) - // With the channel open, we'll create invoices for Bob that Alice // will pay to in order to advance the state of the channel. - bobPayReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := make([]byte, 32) - _, err := rand.Read(preimage) - if err != nil { - t.Fatalf("unable to generate preimage: %v", err) - } - - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := net.Bob.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - bobPayReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + bobPayReqs, _, _, err := createPayReqs( + ctxt, net.Bob, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // With the channel open, we'll create invoices for Alice that Bob // will pay to in order to advance the state of the channel. - alicePayReqs := make([]string, numInvoices) - for i := 0; i < numInvoices; i++ { - preimage := make([]byte, 32) - _, err := rand.Read(preimage) - if err != nil { - t.Fatalf("unable to generate preimage: %v", err) - } - - invoice := &lnrpc.Invoice{ - Memo: "testing", - RPreimage: preimage, - Value: paymentAmt, - } - resp, err := net.Alice.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - alicePayReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + alicePayReqs, _, _, err := createPayReqs( + ctxt, net.Alice, paymentAmt, numInvoices, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // Wait for Alice to receive the channel edge from the funding manager. @@ -8531,7 +8461,8 @@ func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) // states, i.e. balance info. time.Sleep(1 * time.Second) - aliceInfo, err := getChanInfo(net.Alice) + ctxt, _ = context.WithTimeout(ctxb, timeout) + aliceInfo, err := getChanInfo(ctxt, net.Alice) if err != nil { t.Fatalf("unable to get bob's channel info: %v", err) } @@ -8550,7 +8481,8 @@ func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) // Next query for Bob's and Alice's channel states, in order to confirm // that all payment have been successful transmitted. - bobInfo, err := getChanInfo(net.Bob) + ctxt, _ = context.WithTimeout(ctxb, timeout) + bobInfo, err := getChanInfo(ctxt, net.Bob) if err != nil { t.Fatalf("unable to get bob's channel info: %v", err) } @@ -10440,18 +10372,12 @@ func testSwitchCircuitPersistence(net *lntest.NetworkHarness, t *harnessTest) { // satoshis with a different preimage each time. const numPayments = 5 const paymentAmt = 1000 - payReqs := make([]string, numPayments) - for i := 0; i < numPayments; i++ { - invoice := &lnrpc.Invoice{ - Memo: "testing", - Value: paymentAmt, - } - resp, err := carol.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - payReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, _, err := createPayReqs( + ctxt, carol, paymentAmt, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // We'll wait for all parties to recognize the new channels within the @@ -10782,18 +10708,12 @@ func testSwitchOfflineDelivery(net *lntest.NetworkHarness, t *harnessTest) { // satoshis with a different preimage each time. const numPayments = 5 const paymentAmt = 1000 - payReqs := make([]string, numPayments) - for i := 0; i < numPayments; i++ { - invoice := &lnrpc.Invoice{ - Memo: "testing", - Value: paymentAmt, - } - resp, err := carol.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - payReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, _, err := createPayReqs( + ctxt, carol, paymentAmt, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // We'll wait for all parties to recognize the new channels within the @@ -11132,18 +11052,12 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness // satoshis with a different preimage each time. const numPayments = 5 const paymentAmt = 1000 - payReqs := make([]string, numPayments) - for i := 0; i < numPayments; i++ { - invoice := &lnrpc.Invoice{ - Memo: "testing", - Value: paymentAmt, - } - resp, err := carol.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - payReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, _, err := createPayReqs( + ctxt, carol, paymentAmt, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // We'll wait for all parties to recognize the new channels within the @@ -11486,18 +11400,12 @@ func testSwitchOfflineDeliveryOutgoingOffline( // satoshis with a different preimage each time. const numPayments = 5 const paymentAmt = 1000 - payReqs := make([]string, numPayments) - for i := 0; i < numPayments; i++ { - invoice := &lnrpc.Invoice{ - Memo: "testing", - Value: paymentAmt, - } - resp, err := carol.AddInvoice(ctxb, invoice) - if err != nil { - t.Fatalf("unable to add invoice: %v", err) - } - - payReqs[i] = resp.PaymentRequest + ctxt, _ = context.WithTimeout(ctxb, timeout) + payReqs, _, _, err := createPayReqs( + ctxt, carol, paymentAmt, numPayments, + ) + if err != nil { + t.Fatalf("unable to create pay reqs: %v", err) } // We'll wait for all parties to recognize the new channels within the diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 7d5bcaba..cdd2baac 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -3445,19 +3445,22 @@ func (lc *LightningChannel) ProcessChanSyncMsg( // it. // 3. We didn't get the last RevokeAndAck message they sent, so they'll // re-send it. -func (lc *LightningChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) { +func ChanSyncMsg(c *channeldb.OpenChannel) (*lnwire.ChannelReestablish, error) { + c.Lock() + defer c.Unlock() + // The remote commitment height that we'll send in the // ChannelReestablish message is our current commitment height plus // one. If the receiver thinks that our commitment height is actually // *equal* to this value, then they'll re-send the last commitment that // they sent but we never fully processed. - localHeight := lc.localCommitChain.tip().height + localHeight := c.LocalCommitment.CommitHeight nextLocalCommitHeight := localHeight + 1 // The second value we'll send is the height of the remote commitment // from our PoV. If the receiver thinks that their height is actually // *one plus* this value, then they'll re-send their last revocation. - remoteChainTipHeight := lc.remoteCommitChain.tail().height + remoteChainTipHeight := c.RemoteCommitment.CommitHeight // If this channel has undergone a commitment update, then in order to // prove to the remote party our knowledge of their prior commitment @@ -3465,7 +3468,7 @@ func (lc *LightningChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) { // remote party sent. var lastCommitSecret [32]byte if remoteChainTipHeight != 0 { - remoteSecret, err := lc.channelState.RevocationStore.LookUp( + remoteSecret, err := c.RevocationStore.LookUp( remoteChainTipHeight - 1, ) if err != nil { @@ -3476,7 +3479,7 @@ func (lc *LightningChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) { // Additionally, we'll send over the current unrevoked commitment on // our local commitment transaction. - currentCommitSecret, err := lc.channelState.RevocationProducer.AtIndex( + currentCommitSecret, err := c.RevocationProducer.AtIndex( localHeight, ) if err != nil { @@ -3485,7 +3488,7 @@ func (lc *LightningChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) { return &lnwire.ChannelReestablish{ ChanID: lnwire.NewChanIDFromOutPoint( - &lc.channelState.FundingOutpoint, + &c.FundingOutpoint, ), NextLocalCommitHeight: nextLocalCommitHeight, RemoteCommitTailHeight: remoteChainTipHeight, @@ -5112,6 +5115,15 @@ func NewUnilateralCloseSummary(chanState *channeldb.OpenChannel, signer Signer, LocalChanConfig: chanState.LocalChanCfg, } + // Attempt to add a channel sync message to the close summary. + chanSync, err := ChanSyncMsg(chanState) + if err != nil { + walletLog.Errorf("ChannelPoint(%v): unable to create channel sync "+ + "message: %v", chanState.FundingOutpoint, err) + } else { + closeSummary.LastChanSyncMsg = chanSync + } + return &UnilateralCloseSummary{ SpendDetail: commitSpend, ChannelCloseSummary: closeSummary, @@ -6187,7 +6199,7 @@ func (lc *LightningChannel) IsPending() bool { return lc.channelState.IsPending } -// State provides access to the channel's internal state for testing. +// State provides access to the channel's internal state. func (lc *LightningChannel) State() *channeldb.OpenChannel { return lc.channelState } diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 4586d75e..24701f28 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -2433,7 +2433,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel, _, _, line, _ := runtime.Caller(1) - aliceChanSyncMsg, err := aliceChannel.ChanSyncMsg() + aliceChanSyncMsg, err := ChanSyncMsg(aliceChannel.channelState) if err != nil { t.Fatalf("line #%v: unable to produce chan sync msg: %v", line, err) @@ -2448,7 +2448,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel, "instead wants to send: %v", line, spew.Sdump(bobMsgsToSend)) } - bobChanSyncMsg, err := bobChannel.ChanSyncMsg() + bobChanSyncMsg, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("line #%v: unable to produce chan sync msg: %v", line, err) @@ -2681,11 +2681,11 @@ func TestChanSyncOweCommitment(t *testing.T) { // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, // while Bob should think he's properly synchronized. - aliceSyncMsg, err := aliceChannel.ChanSyncMsg() + aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.ChanSyncMsg() + bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -2995,11 +2995,11 @@ func TestChanSyncOweRevocation(t *testing.T) { // If we fetch the channel sync messages at this state, then Alice // should report that she owes Bob a revocation message, while Bob // thinks they're fully in sync. - aliceSyncMsg, err := aliceChannel.ChanSyncMsg() + aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.ChanSyncMsg() + bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3164,11 +3164,11 @@ func TestChanSyncOweRevocationAndCommit(t *testing.T) { // If we now attempt to resync, then Alice should conclude that she // doesn't need any further updates, while Bob concludes that he needs // to re-send both his revocation and commit sig message. - aliceSyncMsg, err := aliceChannel.ChanSyncMsg() + aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.ChanSyncMsg() + bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3374,11 +3374,11 @@ func TestChanSyncOweRevocationAndCommitForceTransition(t *testing.T) { // Now if we attempt to synchronize states at this point, Alice should // detect that she owes nothing, while Bob should re-send both his // RevokeAndAck as well as his commitment message. - aliceSyncMsg, err := aliceChannel.ChanSyncMsg() + aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.ChanSyncMsg() + bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3573,11 +3573,11 @@ func TestChanSyncFailure(t *testing.T) { // assertLocalDataLoss checks that aliceOld and bobChannel detects that // Alice has lost state during sync. assertLocalDataLoss := func(aliceOld *LightningChannel) { - aliceSyncMsg, err := aliceOld.ChanSyncMsg() + aliceSyncMsg, err := ChanSyncMsg(aliceOld.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.ChanSyncMsg() + bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3629,7 +3629,7 @@ func TestChanSyncFailure(t *testing.T) { // If we remove the recovery options from Bob's message, Alice cannot // tell if she lost state, since Bob might be lying. She still should // be able to detect that chains cannot be synced. - bobSyncMsg, err := bobChannel.ChanSyncMsg() + bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3643,7 +3643,7 @@ func TestChanSyncFailure(t *testing.T) { // If Bob lies about the NextLocalCommitHeight, making it greater than // what Alice expect, she cannot tell for sure whether she lost state, // but should detect the desync. - bobSyncMsg, err = bobChannel.ChanSyncMsg() + bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3656,7 +3656,7 @@ func TestChanSyncFailure(t *testing.T) { // If Bob's NextLocalCommitHeight is lower than what Alice expects, Bob // probably lost state. - bobSyncMsg, err = bobChannel.ChanSyncMsg() + bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3669,7 +3669,7 @@ func TestChanSyncFailure(t *testing.T) { // If Alice and Bob's states are in sync, but Bob is sending the wrong // LocalUnrevokedCommitPoint, Alice should detect this. - bobSyncMsg, err = bobChannel.ChanSyncMsg() + bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3695,7 +3695,7 @@ func TestChanSyncFailure(t *testing.T) { // when there's a pending remote commit. halfAdvance() - bobSyncMsg, err = bobChannel.ChanSyncMsg() + bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -3785,11 +3785,11 @@ func TestChannelRetransmissionFeeUpdate(t *testing.T) { // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, // while Bob should think he's properly synchronized. - aliceSyncMsg, err := aliceChannel.ChanSyncMsg() + aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } - bobSyncMsg, err := bobChannel.ChanSyncMsg() + bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to produce chan sync msg: %v", err) } @@ -4015,11 +4015,11 @@ func TestChanSyncInvalidLastSecret(t *testing.T) { } // Next, we'll produce the ChanSync messages for both parties. - aliceChanSync, err := aliceChannel.ChanSyncMsg() + aliceChanSync, err := ChanSyncMsg(aliceChannel.channelState) if err != nil { t.Fatalf("unable to generate chan sync msg: %v", err) } - bobChanSync, err := bobChannel.ChanSyncMsg() + bobChanSync, err := ChanSyncMsg(bobChannel.channelState) if err != nil { t.Fatalf("unable to generate chan sync msg: %v", err) } diff --git a/peer.go b/peer.go index 3cac60f9..90ecec10 100644 --- a/peer.go +++ b/peer.go @@ -836,11 +836,11 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]), 1000, func(msg lnwire.Message) { - _, isChanSycMsg := msg.(*lnwire.ChannelReestablish) + _, isChanSyncMsg := msg.(*lnwire.ChannelReestablish) // If this is the chanSync message, then we'll deliver // it immediately to the active link. - if !isChanSycMsg { + if !isChanSyncMsg { // We'll send a message to the funding manager // and wait iff an active funding process for // this channel hasn't yet completed. We do @@ -874,9 +874,36 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { // active goroutine dedicated to this channel. if chanLink == nil { link, err := p.server.htlcSwitch.GetLink(cid) - if err != nil { - peerLog.Errorf("recv'd update for unknown "+ - "channel %v from %v", cid, p) + switch { + + // If we failed to find the link in question, + // and the message received was a channel sync + // message, then this might be a peer trying to + // resync closed channel. In this case we'll + // try to resend our last channel sync message, + // such that the peer can recover funds from + // the closed channel. + case err != nil && isChanSyncMsg: + peerLog.Debugf("Unable to find "+ + "link(%v) to handle channel "+ + "sync, attempting to resend "+ + "last ChanSync message", cid) + + err := p.resendChanSyncMsg(cid) + if err != nil { + // TODO(halseth): send error to + // peer? + peerLog.Errorf( + "resend failed: %v", + err, + ) + } + return + + case err != nil: + peerLog.Errorf("recv'd update for "+ + "unknown channel %v from %v: "+ + "%v", cid, p, err) return } chanLink = link @@ -2143,6 +2170,35 @@ func (p *peer) sendInitMsg() error { return p.writeMessage(msg) } +// resendChanSyncMsg will attempt to find a channel sync message for the closed +// channel and resend it to our peer. +func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error { + // Check if we have any channel sync messages stored for this channel. + c, err := p.server.chanDB.FetchClosedChannelForID(cid) + if err != nil { + return fmt.Errorf("unable to fetch channel sync messages for "+ + "peer %v: %v", p, err) + } + + if c.LastChanSyncMsg == nil { + return fmt.Errorf("no chan sync message stored for channel %v", + cid) + } + + peerLog.Debugf("Re-sending channel sync message for channel %v to "+ + "peer %v", cid, p) + + if err := p.SendMessage(true, c.LastChanSyncMsg); err != nil { + return fmt.Errorf("Failed resending channel sync "+ + "message to peer %v: %v", p, err) + } + + peerLog.Debugf("Re-sent channel sync message for channel %v to peer "+ + "%v", cid, p) + + return nil +} + // SendMessage sends a variadic number of message to remote peer. The first // argument denotes if the method should block until the message has been sent // to the remote peer.