From f4054d2a663afdda1ac49ed6bfd47d64bceb3d79 Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Sun, 9 Dec 2018 19:15:09 -0800
Subject: [PATCH 01/13] channeldb: add new AddrsForNode method

In this commit, we add a new AddrsForNode method. This method will allow
a wrapper struct to implement the new chanbackup.LiveChannelSource
interface, which is required to implement the full SCB feature set.
---
 channeldb/db.go      | 54 +++++++++++++++++++++++++++++++++++++
 channeldb/db_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++
 channeldb/nodes.go   | 62 +++++++++++++++++++++++--------------------
 3 files changed, 150 insertions(+), 29 deletions(-)

diff --git a/channeldb/db.go b/channeldb/db.go
index d71656a7..c8ce7fb8 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/binary"
 	"fmt"
+	"net"
 	"os"
 	"path/filepath"
 	"sync"
@@ -772,6 +773,59 @@ func (d *DB) PruneLinkNodes() error {
 	})
 }
 
+// AddrsForNode consults the graph and channel database for all addresses known
+// to the passed node public key.
+func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
+	var (
+		linkNode  *LinkNode
+		graphNode LightningNode
+	)
+
+	dbErr := d.View(func(tx *bbolt.Tx) error {
+		var err error
+
+		linkNode, err = fetchLinkNode(tx, nodePub)
+		if err != nil {
+			return err
+		}
+
+		// We'll also query the graph for this peer to see if they have
+		// any addresses that we don't currently have stored within the
+		// link node database.
+		nodes := tx.Bucket(nodeBucket)
+		if nodes == nil {
+			return ErrGraphNotFound
+		}
+		compressedPubKey := nodePub.SerializeCompressed()
+		graphNode, err = fetchLightningNode(nodes, compressedPubKey)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	})
+	if dbErr != nil {
+		return nil, dbErr
+	}
+
+	// Now that we have both sources of addrs for this node, we'll use a
+	// map to de-duplicate any addresses between the two sources, and
+	// produce a final list of the combined addrs.
+	addrs := make(map[string]net.Addr)
+	for _, addr := range linkNode.Addresses {
+		addrs[addr.String()] = addr
+	}
+	for _, addr := range graphNode.Addresses {
+		addrs[addr.String()] = addr
+	}
+	dedupedAddrs := make([]net.Addr, 0, len(addrs))
+	for _, addr := range addrs {
+		dedupedAddrs = append(dedupedAddrs, addr)
+	}
+
+	return dedupedAddrs, nil
+}
+
 // syncVersions function is used for safe db version synchronization. It
 // applies migration functions to the current database and recovers the
 // previous state of db if at least one error/panic appeared during migration.
diff --git a/channeldb/db_test.go b/channeldb/db_test.go
index 794b1fcf..dbd255e5 100644
--- a/channeldb/db_test.go
+++ b/channeldb/db_test.go
@@ -2,10 +2,12 @@ package channeldb
 
 import (
 	"io/ioutil"
+	"net"
 	"os"
 	"path/filepath"
 	"testing"
 
+	"github.com/btcsuite/btcd/wire"
 	"github.com/btcsuite/btcutil"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
@@ -147,3 +149,64 @@ func TestFetchClosedChannelForID(t *testing.T) {
 		t.Fatalf("expected ErrClosedChannelNotFound, instead got: %v", err)
 	}
 }
+
+// TestAddrsForNode tests that we're able to properly obtain all the addresses
+// for a target node.
+func TestAddrsForNode(t *testing.T) {
+	t.Parallel()
+
+	cdb, cleanUp, err := makeTestDB()
+	if err != nil {
+		t.Fatalf("unable to make test database: %v", err)
+	}
+	defer cleanUp()
+
+	graph := cdb.ChannelGraph()
+
+	// We'll make a test vertex to insert into the database, as the source
+	// node, but this node will only have half the number of addresses it
+	// usually does.
+ testNode, err := createTestVertex(cdb) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + testNode.Addresses = []net.Addr{testAddr} + if err := graph.SetSourceNode(testNode); err != nil { + t.Fatalf("unable to set source node: %v", err) + } + + // Next, we'll make a link node with the same pubkey, but with an + // additional address. + nodePub, err := testNode.PubKey() + if err != nil { + t.Fatalf("unable to recv node pub: %v", err) + } + linkNode := cdb.NewLinkNode( + wire.MainNet, nodePub, anotherAddr, + ) + if err := linkNode.Sync(); err != nil { + t.Fatalf("unable to sync link node: %v", err) + } + + // Now that we've created a link node, as well as a vertex for the + // node, we'll query for all its addresses. + nodeAddrs, err := cdb.AddrsForNode(nodePub) + if err != nil { + t.Fatalf("unable to obtain node addrs: %v", err) + } + + expectedAddrs := make(map[string]struct{}) + expectedAddrs[testAddr.String()] = struct{}{} + expectedAddrs[anotherAddr.String()] = struct{}{} + + // Finally, ensure that all the expected addresses are found. + if len(nodeAddrs) != len(expectedAddrs) { + t.Fatalf("expected %v addrs, got %v", + len(expectedAddrs), len(nodeAddrs)) + } + for _, addr := range nodeAddrs { + if _, ok := expectedAddrs[addr.String()]; !ok { + t.Fatalf("unexpected addr: %v", addr) + } + } +} diff --git a/channeldb/nodes.go b/channeldb/nodes.go index 43729d33..95f6f7a2 100644 --- a/channeldb/nodes.go +++ b/channeldb/nodes.go @@ -62,13 +62,13 @@ type LinkNode struct { // NewLinkNode creates a new LinkNode from the provided parameters, which is // backed by an instance of channeldb. func (db *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey, - addr net.Addr) *LinkNode { + addrs ...net.Addr) *LinkNode { return &LinkNode{ Network: bitNet, IdentityPub: pub, LastSeen: time.Now(), - Addresses: []net.Addr{addr}, + Addresses: addrs, db: db, } } @@ -149,40 +149,44 @@ func (db *DB) deleteLinkNode(tx *bbolt.Tx, identity *btcec.PublicKey) error { // identity public key. If a particular LinkNode for the passed identity public // key cannot be found, then ErrNodeNotFound if returned. func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) { - var ( - node *LinkNode - err error - ) - - err = db.View(func(tx *bbolt.Tx) error { - // First fetch the bucket for storing node metadata, bailing - // out early if it hasn't been created yet. - nodeMetaBucket := tx.Bucket(nodeInfoBucket) - if nodeMetaBucket == nil { - return ErrLinkNodesNotFound + var linkNode *LinkNode + err := db.View(func(tx *bbolt.Tx) error { + node, err := fetchLinkNode(tx, identity) + if err != nil { + return err } - // If a link node for that particular public key cannot be - // located, then exit early with an ErrNodeNotFound. - pubKey := identity.SerializeCompressed() - nodeBytes := nodeMetaBucket.Get(pubKey) - if nodeBytes == nil { - return ErrNodeNotFound - } - - // Finally, decode an allocate a fresh LinkNode object to be - // returned to the caller. - nodeReader := bytes.NewReader(nodeBytes) - node, err = deserializeLinkNode(nodeReader) - return err + linkNode = node + return nil }) - if err != nil { - return nil, err + + return linkNode, err +} + +func fetchLinkNode(tx *bbolt.Tx, targetPub *btcec.PublicKey) (*LinkNode, error) { + // First fetch the bucket for storing node metadata, bailing out early + // if it hasn't been created yet. 
+ nodeMetaBucket := tx.Bucket(nodeInfoBucket) + if nodeMetaBucket == nil { + return nil, ErrLinkNodesNotFound } - return node, nil + // If a link node for that particular public key cannot be located, + // then exit early with an ErrNodeNotFound. + pubKey := targetPub.SerializeCompressed() + nodeBytes := nodeMetaBucket.Get(pubKey) + if nodeBytes == nil { + return nil, ErrNodeNotFound + } + + // Finally, decode and allocate a fresh LinkNode object to be returned + // to the caller. + nodeReader := bytes.NewReader(nodeBytes) + return deserializeLinkNode(nodeReader) } +// TODO(roasbeef): update link node addrs in server upon connection + // FetchAllLinkNodes starts a new database transaction to fetch all nodes with // whom we have active channels with. func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) { From f57f40e76760638dbb55ce8c623d56c0a79394f4 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:21:34 -0800 Subject: [PATCH 02/13] channeldb: add prefix naming to ChannelStatus enum variables In this commit, we add a prefix naming scheme to the ChannelStatus enum variables. We do this as it enables outside callers to more easily identify each individual enum variable as a part of the greater enum-like type. --- channeldb/channel.go | 55 +++++++++++++++++++++++++------------------- channeldb/db.go | 2 +- peer.go | 2 +- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 5d59f9dc..b4a1f2b0 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -293,37 +293,44 @@ type ChannelCommitment struct { type ChannelStatus uint8 var ( - // Default is the normal state of an open channel. - Default ChannelStatus + // ChanStatusDefault is the normal state of an open channel. + ChanStatusDefault ChannelStatus - // Borked indicates that the channel has entered an irreconcilable - // state, triggered by a state desynchronization or channel breach. - // Channels in this state should never be added to the htlc switch. - Borked ChannelStatus = 1 + // ChanStatusBorked indicates that the channel has entered an + // irreconcilable state, triggered by a state desynchronization or + // channel breach. Channels in this state should never be added to the + // htlc switch. + ChanStatusBorked ChannelStatus = 1 - // CommitmentBroadcasted indicates that a commitment for this channel - // has been broadcasted. - CommitmentBroadcasted ChannelStatus = 1 << 1 + // ChanStatusCommitBroadcasted indicates that a commitment for this + // channel has been broadcasted. + ChanStatusCommitBroadcasted ChannelStatus = 1 << 1 - // LocalDataLoss indicates that we have lost channel state for this - // channel, and broadcasting our latest commitment might be considered - // a breach. + // ChanStatusLocalDataLoss indicates that we have lost channel state + // for this channel, and broadcasting our latest commitment might be + // considered a breach. + // // TODO(halseh): actually enforce that we are not force closing such a // channel. - LocalDataLoss ChannelStatus = 1 << 2 + ChanStatusLocalDataLoss ChannelStatus = 1 << 2 + + // ChanStatusRestored is a status flag that signals that the chanel has + // been restored, and doesn't have all the fields a typical channel + // will have. + ChanStatusRestored ChannelStatus = 1 << 3 ) // String returns a human-readable representation of the ChannelStatus. 
func (c ChannelStatus) String() string { switch c { - case Default: - return "Default" - case Borked: - return "Borked" - case CommitmentBroadcasted: - return "CommitmentBroadcasted" - case LocalDataLoss: - return "LocalDataLoss" + case ChanStatusDefault: + return "ChanStatusDefault" + case ChanStatusBorked: + return "ChanStatusBorked" + case ChanStatusCommitBroadcasted: + return "ChanStatusCommitBroadcasted" + case ChanStatusLocalDataLoss: + return "ChanStatusLocalDataLoss" default: return fmt.Sprintf("Unknown(%08b)", c) } @@ -664,7 +671,7 @@ func (c *OpenChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error { // Add status LocalDataLoss to the existing bitvector found in // the DB. - status = channel.chanStatus | LocalDataLoss + status = channel.chanStatus | ChanStatusLocalDataLoss channel.chanStatus = status var b bytes.Buffer @@ -730,7 +737,7 @@ func (c *OpenChannel) MarkBorked() error { c.Lock() defer c.Unlock() - return c.putChanStatus(Borked) + return c.putChanStatus(ChanStatusBorked) } // MarkCommitmentBroadcasted marks the channel as a commitment transaction has @@ -740,7 +747,7 @@ func (c *OpenChannel) MarkCommitmentBroadcasted() error { c.Lock() defer c.Unlock() - return c.putChanStatus(CommitmentBroadcasted) + return c.putChanStatus(ChanStatusCommitBroadcasted) } func (c *OpenChannel) putChanStatus(status ChannelStatus) error { diff --git a/channeldb/db.go b/channeldb/db.go index c8ce7fb8..74b712f1 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -531,7 +531,7 @@ func fetchChannels(d *DB, pending, waitingClose bool) ([]*OpenChannel, error) { // than Default, then it means it is // waiting to be closed. channelWaitingClose := - channel.ChanStatus() != Default + channel.ChanStatus() != ChanStatusDefault // Only include it if we requested // channels with the same waitingClose diff --git a/peer.go b/peer.go index 148088c2..677d147d 100644 --- a/peer.go +++ b/peer.go @@ -409,7 +409,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // Skip adding any permanently irreconcilable channels to the // htlcswitch. - if dbChan.ChanStatus() != channeldb.Default { + if dbChan.ChanStatus() != channeldb.ChanStatusDefault { peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+ "start.", chanPoint, dbChan.ChanStatus()) continue From a410262dda07ee77a5e8663a91498162b57ba088 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:23:21 -0800 Subject: [PATCH 03/13] channeldb: modify the String() method of ChannelStatus reflect all flags In this commit, we modify the String() method of the ChannelStatus type to reflect the fact that it's a flag set. With these new changes, we'll now print the variable name of each assigned bit with a bar delimiting them all. --- channeldb/channel.go | 58 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index b4a1f2b0..8bc24867 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -7,6 +7,8 @@ import ( "fmt" "io" "net" + "strconv" + "strings" "sync" "github.com/btcsuite/btcd/btcec" @@ -320,20 +322,54 @@ var ( ChanStatusRestored ChannelStatus = 1 << 3 ) +// chanStatusStrings maps a ChannelStatus to a human friendly string that +// describes that status. 
+var chanStatusStrings = map[ChannelStatus]string{
+	ChanStatusDefault:           "ChanStatusDefault",
+	ChanStatusBorked:            "ChanStatusBorked",
+	ChanStatusCommitBroadcasted: "ChanStatusCommitBroadcasted",
+	ChanStatusLocalDataLoss:     "ChanStatusLocalDataLoss",
+	ChanStatusRestored:          "ChanStatusRestored",
+}
+
+// orderedChanStatusFlags is an in-order list of all the channel status flags.
+var orderedChanStatusFlags = []ChannelStatus{
+	ChanStatusDefault,
+	ChanStatusBorked,
+	ChanStatusCommitBroadcasted,
+	ChanStatusLocalDataLoss,
+	ChanStatusRestored,
+}
+
 // String returns a human-readable representation of the ChannelStatus.
 func (c ChannelStatus) String() string {
-	switch c {
-	case ChanStatusDefault:
-		return "ChanStatusDefault"
-	case ChanStatusBorked:
-		return "ChanStatusBorked"
-	case ChanStatusCommitBroadcasted:
-		return "ChanStatusCommitBroadcasted"
-	case ChanStatusLocalDataLoss:
-		return "ChanStatusLocalDataLoss"
-	default:
-		return fmt.Sprintf("Unknown(%08b)", c)
+	// If no flags are set, then this is the default case.
+	if c == 0 {
+		return chanStatusStrings[ChanStatusDefault]
 	}
+
+	// Add individual bit flags.
+	statusStr := ""
+	for _, flag := range orderedChanStatusFlags {
+		if c&flag == flag {
+			statusStr += chanStatusStrings[flag] + "|"
+			c -= flag
+		}
+	}
+
+	// Remove anything to the right of the final bar, including it as well.
+	statusStr = strings.TrimRight(statusStr, "|")
+
+	// Add any remaining flags which aren't accounted for as hex.
+	if c != 0 {
+		statusStr += "|0x" + strconv.FormatUint(uint64(c), 16)
+	}
+
+	// If this was purely an unknown flag, then remove the extra bar at the
+	// start of the string.
+	statusStr = strings.TrimLeft(statusStr, "|")
+
+	return statusStr
 }
 
 // OpenChannel encapsulates the persistent and dynamic state of an open channel

From aaf6456e12707295fa2ec86393cddd483eea6a9d Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Sun, 9 Dec 2018 19:24:21 -0800
Subject: [PATCH 04/13] channeldb: add HasChanStatus and ApplyChanStatus methods to OpenChannel

These methods allow callers to properly query whether a channel has a
particular status flag set, and also modify the channel status in a
thread-safe manner.
---
 channeldb/channel.go | 26 ++++++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/channeldb/channel.go b/channeldb/channel.go
index 8bc24867..e33ae77a 100644
--- a/channeldb/channel.go
+++ b/channeldb/channel.go
@@ -316,8 +316,8 @@ var (
 	// channel.
 	ChanStatusLocalDataLoss ChannelStatus = 1 << 2
 
-	// ChanStatusRestored is a status flag that signals that the chanel has
-	// been restored, and doesn't have all the fields a typical channel
+	// ChanStatusRestored is a status flag that signals that the channel
+	// has been restored, and doesn't have all the fields a typical channel
 	// will have.
 	ChanStatusRestored ChannelStatus = 1 << 3
 )
@@ -531,6 +531,28 @@ func (c *OpenChannel) ChanStatus() ChannelStatus {
 	return c.chanStatus
 }
 
+// ApplyChanStatus allows the caller to modify the internal channel state in a
+// thread-safe manner.
+func (c *OpenChannel) ApplyChanStatus(status ChannelStatus) error {
+	c.Lock()
+	defer c.Unlock()
+
+	return c.putChanStatus(status)
+}
+
+// HasChanStatus returns true if the internal bitfield channel status of the
+// target channel has the specified status bit set.
+func (c *OpenChannel) HasChanStatus(status ChannelStatus) bool {
+	c.RLock()
+	defer c.RUnlock()
+
+	return c.hasChanStatus(status)
+}
+
+func (c *OpenChannel) hasChanStatus(status ChannelStatus) bool {
+	return c.chanStatus&status == status
+}
+
 // RefreshShortChanID updates the in-memory short channel ID using the latest
 // value observed on disk.
 func (c *OpenChannel) RefreshShortChanID() error {

From 7a88f580eac7bc4a13a559bddc1881cbc0c57016 Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Sun, 9 Dec 2018 19:26:02 -0800
Subject: [PATCH 05/13] channeldb: add new FetchChannel method

In this commit, we add a new method, FetchChannel, which is required in
order to implement the new chanbackup.LiveChannelSource interface.
---
 channeldb/db.go      | 96 ++++++++++++++++++++++++++++++++++++++++++++
 channeldb/db_test.go | 61 ++++++++++++++++++++++++++++
 2 files changed, 157 insertions(+)

diff --git a/channeldb/db.go b/channeldb/db.go
index 74b712f1..8e27b021 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -408,6 +408,102 @@ func (d *DB) fetchNodeChannels(chainBucket *bbolt.Bucket) ([]*OpenChannel, error)
 	return channels, nil
 }
 
+// FetchChannel attempts to locate a channel specified by the passed channel
+// point. If the channel cannot be found, then an error will be returned.
+func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) {
+	var (
+		targetChan      *OpenChannel
+		targetChanPoint bytes.Buffer
+	)
+
+	if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil {
+		return nil, err
+	}
+
+	// chanScan will traverse the following bucket structure:
+	//  * nodePub => chainHash => chanPoint
+	//
+	// At each level we go one further, ensuring that we're traversing the
+	// proper key (that's actually a bucket). By only reading the bucket
+	// structure and skipping fully decoding each channel, we save a good
+	// bit of CPU as we don't need to do things like decompress public
+	// keys.
+	chanScan := func(tx *bbolt.Tx) error {
+		// Get the bucket dedicated to storing the metadata for open
+		// channels.
+		openChanBucket := tx.Bucket(openChannelBucket)
+		if openChanBucket == nil {
+			return ErrNoActiveChannels
+		}
+
+		// Within the node channel bucket, are the set of node pubkeys
+		// we have channels with, we don't know the entire set, so
+		// we'll check them all.
+		return openChanBucket.ForEach(func(nodePub, v []byte) error {
+			// Ensure that this is a key the same size as a pubkey,
+			// and also that it leads directly to a bucket.
+			if len(nodePub) != 33 || v != nil {
+				return nil
+			}
+
+			nodeChanBucket := openChanBucket.Bucket(nodePub)
+			if nodeChanBucket == nil {
+				return nil
+			}
+
+			// The next layer down is all the chains that this node
+			// has channels on with us.
+			return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
+				// If there's a value, it's not a bucket so
+				// ignore it.
+				if v != nil {
+					return nil
+				}
+
+				chainBucket := nodeChanBucket.Bucket(chainHash)
+				if chainBucket == nil {
+					return fmt.Errorf("unable to read "+
+						"bucket for chain=%x", chainHash[:])
+				}
+
+				// Finally we reach the leaf bucket that stores
+				// all the chanPoints for this node.
+ chanBucket := chainBucket.Bucket( + targetChanPoint.Bytes(), + ) + if chanBucket == nil { + return nil + } + + channel, err := fetchOpenChannel( + chanBucket, &chanPoint, + ) + if err != nil { + return err + } + + targetChan = channel + targetChan.Db = d + + return nil + }) + }) + } + + err := d.View(chanScan) + if err != nil { + return nil, err + } + + if targetChan != nil { + return targetChan, nil + } + + // If we can't find the channel, then we return with an error, as we + // have nothing to backup. + return nil, ErrChannelNotFound +} + // FetchAllChannels attempts to retrieve all open channels currently stored // within the database, including pending open, fully open and channels waiting // for a closing transaction to confirm. diff --git a/channeldb/db_test.go b/channeldb/db_test.go index dbd255e5..17018333 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -5,10 +5,12 @@ import ( "net" "os" "path/filepath" + "reflect" "testing" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/lnwire" ) @@ -210,3 +212,62 @@ func TestAddrsForNode(t *testing.T) { } } } + +// TestFetchChannel tests that we're able to fetch an arbitrary channel from +// disk. +func TestFetchChannel(t *testing.T) { + t.Parallel() + + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + defer cleanUp() + + // Create the test channel state that we'll sync to the database + // shortly. + channelState, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + + // Mark the channel as pending, then immediately mark it as open to it + // can be fully visible. + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + } + if err := channelState.SyncPending(addr, 9); err != nil { + t.Fatalf("unable to save and serialize channel state: %v", err) + } + err = channelState.MarkAsOpen(lnwire.NewShortChanIDFromInt(99)) + if err != nil { + t.Fatalf("unable to mark channel open: %v", err) + } + + // Next, attempt to fetch the channel by its chan point. + dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint) + if err != nil { + t.Fatalf("unable to fetch channel: %v", err) + } + + // The decoded channel state should be identical to what we stored + // above. + if !reflect.DeepEqual(channelState, dbChannel) { + t.Fatalf("channel state doesn't match:: %v vs %v", + spew.Sdump(channelState), spew.Sdump(dbChannel)) + } + + // If we attempt to query for a non-exist ante channel, then we should + // get an error. + channelState2, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + channelState2.FundingOutpoint.Index ^= 1 + + _, err = cdb.FetchChannel(channelState2.FundingOutpoint) + if err == nil { + t.Fatalf("expected query to fail") + } +} From 11c6887ffaf3ffb9621ccf88dd396c780085fb4f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:28:54 -0800 Subject: [PATCH 06/13] channeldb: refactor syncPending to expose new syncNewChannel function The new syncNewChannel function will allow callers to insert a new channel given the OpenChannel struct, and set of addresses for the channel peer. This new method will also create a new LinkNode for the peer if one doesn't already exist. 
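
As a rough sketch of the intended in-package usage (the caller, variable
names, and error handling below are illustrative only, not part of this
patch), any code that already holds a write transaction can now persist a
channel and its peer's addresses in a single atomic step:

    // Hypothetical caller inside the channeldb package: db is an open
    // *DB, channel is a populated *OpenChannel, and peerAddrs holds the
    // peer's known addresses.
    err := db.Update(func(tx *bbolt.Tx) error {
        channel.Db = db
        return syncNewChannel(tx, channel, peerAddrs)
    })
    if err != nil {
        // ErrChanAlreadyExists indicates the channel was synced before.
        return err
    }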
--- channeldb/channel.go | 69 +++++++++++++++++++++++++------------------- channeldb/error.go | 5 ++++ 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index e33ae77a..8dd26e57 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -656,16 +656,20 @@ func (c *OpenChannel) fullSync(tx *bbolt.Tx) error { } // With the bucket for the node fetched, we can now go down another - // level, creating the bucket (if it doesn't exist), for this channel - // itself. + // level, creating the bucket for this channel itself. var chanPointBuf bytes.Buffer if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil { return err } - chanBucket, err := chainBucket.CreateBucketIfNotExists( + chanBucket, err := chainBucket.CreateBucket( chanPointBuf.Bytes(), ) - if err != nil { + switch { + case err == bbolt.ErrBucketExists: + // If this channel already exists, then in order to avoid + // overriding it, we'll return an error back up to the caller. + return ErrChanAlreadyExists + case err != nil: return err } @@ -911,35 +915,40 @@ func (c *OpenChannel) SyncPending(addr net.Addr, pendingHeight uint32) error { c.FundingBroadcastHeight = pendingHeight return c.Db.Update(func(tx *bbolt.Tx) error { - // First, sync all the persistent channel state to disk. - if err := c.fullSync(tx); err != nil { - return err - } - - nodeInfoBucket, err := tx.CreateBucketIfNotExists(nodeInfoBucket) - if err != nil { - return err - } - - // If a LinkNode for this identity public key already exists, - // then we can exit early. - nodePub := c.IdentityPub.SerializeCompressed() - if nodeInfoBucket.Get(nodePub) != nil { - return nil - } - - // Next, we need to establish a (possibly) new LinkNode - // relationship for this channel. The LinkNode metadata - // contains reachability, up-time, and service bits related - // information. - linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addr) - - // TODO(roasbeef): do away with link node all together? - - return putLinkNode(nodeInfoBucket, linkNode) + return syncNewChannel(tx, c, []net.Addr{addr}) }) } +// syncNewChannel will write the passed channel to disk, and also create a +// LinkNode (if needed) for the channel peer. +func syncNewChannel(tx *bbolt.Tx, c *OpenChannel, addrs []net.Addr) error { + // First, sync all the persistent channel state to disk. + if err := c.fullSync(tx); err != nil { + return err + } + + nodeInfoBucket, err := tx.CreateBucketIfNotExists(nodeInfoBucket) + if err != nil { + return err + } + + // If a LinkNode for this identity public key already exists, + // then we can exit early. + nodePub := c.IdentityPub.SerializeCompressed() + if nodeInfoBucket.Get(nodePub) != nil { + return nil + } + + // Next, we need to establish a (possibly) new LinkNode relationship + // for this channel. The LinkNode metadata contains reachability, + // up-time, and service bits related information. + linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addrs...) + + // TODO(roasbeef): do away with link node all together? + + return putLinkNode(nodeInfoBucket, linkNode) +} + // UpdateCommitment updates the commitment state for the specified party // (remote or local). The commitment stat completely describes the balance // state at this point in the commitment chain. This method its to be called on diff --git a/channeldb/error.go b/channeldb/error.go index c9ddfb49..c0548106 100644 --- a/channeldb/error.go +++ b/channeldb/error.go @@ -103,6 +103,11 @@ var ( // indicate it should be. 
ErrEdgePolicyOptionalFieldNotFound = fmt.Errorf("optional field not " + "present") + + // ErrChanAlreadyExists is return when the caller attempts to create a + // channel with a channel point that is already present in the + // database. + ErrChanAlreadyExists = fmt.Errorf("channel already exists") ) // ErrTooManyExtraOpaqueBytes creates an error which should be returned if the From c656788b0bffc3ee7de6de464de17a7eb7cdee2b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:30:26 -0800 Subject: [PATCH 07/13] channeldb: add new methods to allow adding a new edge+policy w/ existing db transaction --- channeldb/graph.go | 210 ++++++++++++++++++++-------------------- channeldb/migrations.go | 2 +- 2 files changed, 107 insertions(+), 105 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index dba5108c..c13e1879 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -483,101 +483,105 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket, // the channel supports. The chanPoint and chanID are used to uniquely identify // the edge globally within the database. func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { + return c.db.Update(func(tx *bbolt.Tx) error { + return c.addChannelEdge(tx, edge) + }) +} + +// addChannelEdge is the private form of AddChannelEdge that allows callers to +// utilize an existing db transaction. +func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error { // Construct the channel's primary key which is the 8-byte channel ID. var chanKey [8]byte binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID) - return c.db.Update(func(tx *bbolt.Tx) error { - nodes, err := tx.CreateBucketIfNotExists(nodeBucket) - if err != nil { - return err - } - edges, err := tx.CreateBucketIfNotExists(edgeBucket) - if err != nil { - return err - } - edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket) - if err != nil { - return err - } - chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket) + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err + } + edges, err := tx.CreateBucketIfNotExists(edgeBucket) + if err != nil { + return err + } + edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket) + if err != nil { + return err + } + chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket) + if err != nil { + return err + } + + // First, attempt to check if this edge has already been created. If + // so, then we can exit early as this method is meant to be idempotent. + if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil { + return ErrEdgeAlreadyExist + } + + // Before we insert the channel into the database, we'll ensure that + // both nodes already exist in the channel graph. If either node + // doesn't, then we'll insert a "shell" node that just includes its + // public key, so subsequent validation and queries can work properly. 
+ _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:]) + switch { + case node1Err == ErrGraphNodeNotFound: + node1Shell := LightningNode{ + PubKeyBytes: edge.NodeKey1Bytes, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, &node1Shell) + if err != nil { + return fmt.Errorf("unable to create shell node "+ + "for: %x", edge.NodeKey1Bytes) + + } + case node1Err != nil: + return err + } + + _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:]) + switch { + case node2Err == ErrGraphNodeNotFound: + node2Shell := LightningNode{ + PubKeyBytes: edge.NodeKey2Bytes, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, &node2Shell) + if err != nil { + return fmt.Errorf("unable to create shell node "+ + "for: %x", edge.NodeKey2Bytes) + + } + case node2Err != nil: + return err + } + + // If the edge hasn't been created yet, then we'll first add it to the + // edge index in order to associate the edge between two nodes and also + // store the static components of the channel. + if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil { + return err + } + + // Mark edge policies for both sides as unknown. This is to enable + // efficient incoming channel lookup for a node. + for _, key := range []*[33]byte{&edge.NodeKey1Bytes, + &edge.NodeKey2Bytes} { + + err := putChanEdgePolicyUnknown(edges, edge.ChannelID, + key[:]) if err != nil { return err } + } - // First, attempt to check if this edge has already been - // created. If so, then we can exit early as this method is - // meant to be idempotent. - if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil { - return ErrEdgeAlreadyExist - } - - // Before we insert the channel into the database, we'll ensure - // that both nodes already exist in the channel graph. If - // either node doesn't, then we'll insert a "shell" node that - // just includes its public key, so subsequent validation and - // queries can work properly. - _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:]) - switch { - case node1Err == ErrGraphNodeNotFound: - node1Shell := LightningNode{ - PubKeyBytes: edge.NodeKey1Bytes, - HaveNodeAnnouncement: false, - } - err := addLightningNode(tx, &node1Shell) - if err != nil { - return fmt.Errorf("unable to create shell node "+ - "for: %x", edge.NodeKey1Bytes) - - } - case node1Err != nil: - return err - } - - _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:]) - switch { - case node2Err == ErrGraphNodeNotFound: - node2Shell := LightningNode{ - PubKeyBytes: edge.NodeKey2Bytes, - HaveNodeAnnouncement: false, - } - err := addLightningNode(tx, &node2Shell) - if err != nil { - return fmt.Errorf("unable to create shell node "+ - "for: %x", edge.NodeKey2Bytes) - - } - case node2Err != nil: - return err - } - - // If the edge hasn't been created yet, then we'll first add it - // to the edge index in order to associate the edge between two - // nodes and also store the static components of the channel. - if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil { - return err - } - - // Mark edge policies for both sides as unknown. This is to - // enable efficient incoming channel lookup for a node. - for _, key := range []*[33]byte{&edge.NodeKey1Bytes, - &edge.NodeKey2Bytes} { - - err := putChanEdgePolicyUnknown(edges, edge.ChannelID, - key[:]) - if err != nil { - return err - } - } - - // Finally we add it to the channel index which maps channel - // points (outpoints) to the shorter channel ID's. 
- var b bytes.Buffer - if err := writeOutpoint(&b, &edge.ChannelPoint); err != nil { - return err - } - return chanIndex.Put(b.Bytes(), chanKey[:]) - }) + // Finally we add it to the channel index which maps channel points + // (outpoints) to the shorter channel ID's. + var b bytes.Buffer + if err := writeOutpoint(&b, &edge.ChannelPoint); err != nil { + return err + } + return chanIndex.Put(b.Bytes(), chanKey[:]) } // HasChannelEdge returns true if the database knows of a channel edge with the @@ -1639,28 +1643,26 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket, // the nodes on either side of the channel. func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { return c.db.Update(func(tx *bbolt.Tx) error { - edges := tx.Bucket(edgeBucket) - if edges == nil { - return ErrEdgeNotFound - } - - edgeIndex := edges.Bucket(edgeIndexBucket) - if edgeIndex == nil { - return ErrEdgeNotFound - } - nodes, err := tx.CreateBucketIfNotExists(nodeBucket) - if err != nil { - return err - } - - return updateEdgePolicy(edges, edgeIndex, nodes, edge) + return updateEdgePolicy(tx, edge) }) } // updateEdgePolicy attempts to update an edge's policy within the relevant // buckets using an existing database transaction. -func updateEdgePolicy(edges, edgeIndex, nodes *bbolt.Bucket, - edge *ChannelEdgePolicy) error { +func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrEdgeNotFound + + } + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + return ErrEdgeNotFound + } + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err + } // Create the channelID key be converting the channel ID // integer into a byte slice. diff --git a/channeldb/migrations.go b/channeldb/migrations.go index 647502c1..5d4919db 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -563,7 +563,7 @@ func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error { return err } - err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy) + err = updateEdgePolicy(tx, edgePolicy) if err != nil { return err } From fa30af04757d8dead9f11d146e503faead6ca8fe Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:31:34 -0800 Subject: [PATCH 08/13] channeldb: when fetching/inserting commits check for ChanStatusRestored If the ChanStatusRestored flag is set, then we don't need to write or read the set of commits for a channel as they won't exist. This will be the case when we restore a channel from an SCB. --- channeldb/channel.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 8dd26e57..a68f517c 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -972,7 +972,9 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { // With the proper bucket fetched, we'll now write toe latest // commitment state to dis for the target party. 
- err = putChanCommitment(chanBucket, newCommitment, true) + err = putChanCommitment( + chanBucket, newCommitment, true, + ) if err != nil { return fmt.Errorf("unable to store chan "+ "revocations: %v", err) @@ -1540,7 +1542,9 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error { if err != nil { return err } - err = putChanCommitment(chanBucket, &newCommit.Commitment, false) + err = putChanCommitment( + chanBucket, &newCommit.Commitment, false, + ) if err != nil { return err } @@ -2353,12 +2357,22 @@ func putChanCommitment(chanBucket *bbolt.Bucket, c *ChannelCommitment, } func putChanCommitments(chanBucket *bbolt.Bucket, channel *OpenChannel) error { - err := putChanCommitment(chanBucket, &channel.LocalCommitment, true) + // If this is a restored channel, then we don't have any commitments to + // write. + if channel.hasChanStatus(ChanStatusRestored) { + return nil + } + + err := putChanCommitment( + chanBucket, &channel.LocalCommitment, true, + ) if err != nil { return err } - return putChanCommitment(chanBucket, &channel.RemoteCommitment, false) + return putChanCommitment( + chanBucket, &channel.RemoteCommitment, false, + ) } func putChanRevocationState(chanBucket *bbolt.Bucket, channel *OpenChannel) error { @@ -2473,6 +2487,12 @@ func fetchChanCommitment(chanBucket *bbolt.Bucket, local bool) (ChannelCommitmen func fetchChanCommitments(chanBucket *bbolt.Bucket, channel *OpenChannel) error { var err error + // If this is a restored channel, then we don't have any commitments to + // read. + if channel.hasChanStatus(ChanStatusRestored) { + return nil + } + channel.LocalCommitment, err = fetchChanCommitment(chanBucket, true) if err != nil { return err From b2b57314facd0111c17f4b74459d81013680a651 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:37:47 -0800 Subject: [PATCH 09/13] channeldb: add new RestoreChannelShells method In this commit, we add a new type (ChannelShell) along with a new method, RestoreChannelShells which allows a caller to insert a series of channel shells into the database. These channel shells will allow a restored node to initiate the DLP protocol and recover their set of existing channels. When we insert a channel shell, we re-create the original link node, and also add the outgoing edge to the channel graph. This way we can be sure that upon start up, we attempt to connect to the remote peers, and that the normal graph query commands will operate as expected. --- channeldb/db.go | 107 ++++++++++++++++++++++++++ channeldb/db_test.go | 177 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+) diff --git a/channeldb/db.go b/channeldb/db.go index 8e27b021..c4262ffa 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" @@ -869,6 +870,112 @@ func (d *DB) PruneLinkNodes() error { }) } +// ChannelShell is a shell of a channel that is meant to be used for channel +// recovery purposes. It contains a minimal OpenChannel instance along with +// addresses for that target node. +type ChannelShell struct { + // NodeAddrs the set of addresses that this node has known to be + // reachable at in the past. + NodeAddrs []net.Addr + + // Chan is a shell of an OpenChannel, it contains only the items + // required to restore the channel on disk. + Chan *OpenChannel +} + +// RestoreChannelShells is a method that allows the caller to reconstruct the +// state of an OpenChannel from the ChannelShell. 
We'll attempt to write the +// new channel to disk, create a LinkNode instance with the passed node +// addresses, and finally create an edge within the graph for the channel as +// well. This method is idempotent, so repeated calls with the same set of +// channel shells won't modify the database after the initial call. +func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error { + chanGraph := ChannelGraph{d} + + return d.Update(func(tx *bbolt.Tx) error { + for _, channelShell := range channelShells { + channel := channelShell.Chan + + // First, we'll attempt to create a new open channel + // and link node for this channel. If the channel + // already exists, then in order to ensure this method + // is idempotent, we'll continue to the next step. + channel.Db = d + err := syncNewChannel( + tx, channel, channelShell.NodeAddrs, + ) + if err != nil { + return err + } + + // Next, we'll create an active edge in the graph + // database for this channel in order to restore our + // partial view of the network. + // + // TODO(roasbeef): if we restore *after* the channel + // has been closed on chain, then need to inform the + // router that it should try and prune these values as + // we can detect them + edgeInfo := ChannelEdgeInfo{ + ChannelID: channel.ShortChannelID.ToUint64(), + ChainHash: channel.ChainHash, + ChannelPoint: channel.FundingOutpoint, + } + + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return ErrGraphNotFound + } + selfNode, err := chanGraph.sourceNode(nodes) + if err != nil { + return err + } + + // Depending on which pub key is smaller, we'll assign + // our roles as "node1" and "node2". + chanPeer := channel.IdentityPub.SerializeCompressed() + selfIsSmaller := bytes.Compare( + selfNode.PubKeyBytes[:], chanPeer, + ) == -1 + if selfIsSmaller { + copy(edgeInfo.NodeKey1Bytes[:], selfNode.PubKeyBytes[:]) + copy(edgeInfo.NodeKey2Bytes[:], chanPeer) + } else { + copy(edgeInfo.NodeKey1Bytes[:], chanPeer) + copy(edgeInfo.NodeKey2Bytes[:], selfNode.PubKeyBytes[:]) + } + + // With the edge info shell constructed, we'll now add + // it to the graph. + err = chanGraph.addChannelEdge(tx, &edgeInfo) + if err != nil { + return err + } + + // Similarly, we'll construct a channel edge shell and + // add that itself to the graph. + chanEdge := ChannelEdgePolicy{ + ChannelID: edgeInfo.ChannelID, + LastUpdate: time.Now(), + } + + // If their pubkey is larger, then we'll flip the + // direction bit to indicate that us, the "second" node + // is updating their policy. + if !selfIsSmaller { + chanEdge.ChannelFlags |= lnwire.ChanUpdateDirection + } + + err = updateEdgePolicy(tx, &chanEdge) + if err != nil { + return err + } + } + + return nil + }) +} + // AddrsForNode consults the graph and channel database for all addresses known // to the passed node public key. 
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) { diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 17018333..261e0b36 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -2,16 +2,22 @@ package channeldb import ( "io/ioutil" + "math" + "math/rand" "net" "os" "path/filepath" "reflect" "testing" + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/shachain" ) func TestOpenWithCreate(t *testing.T) { @@ -271,3 +277,174 @@ func TestFetchChannel(t *testing.T) { t.Fatalf("expected query to fail") } } + +func genRandomChannelShell() (*ChannelShell, error) { + var testPriv [32]byte + if _, err := rand.Read(testPriv[:]); err != nil { + return nil, err + } + + _, pub := btcec.PrivKeyFromBytes(btcec.S256(), testPriv[:]) + + var chanPoint wire.OutPoint + if _, err := rand.Read(chanPoint.Hash[:]); err != nil { + return nil, err + } + + pub.Curve = nil + + chanPoint.Index = uint32(rand.Intn(math.MaxUint16)) + + chanStatus := ChanStatusDefault | ChanStatusRestored + + var shaChainPriv [32]byte + if _, err := rand.Read(testPriv[:]); err != nil { + return nil, err + } + revRoot, err := chainhash.NewHash(shaChainPriv[:]) + if err != nil { + return nil, err + } + shaChainProducer := shachain.NewRevocationProducer(*revRoot) + + return &ChannelShell{ + NodeAddrs: []net.Addr{&net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + }}, + Chan: &OpenChannel{ + chanStatus: chanStatus, + ChainHash: rev, + FundingOutpoint: chanPoint, + ShortChannelID: lnwire.NewShortChanIDFromInt( + uint64(rand.Int63()), + ), + IdentityPub: pub, + LocalChanCfg: ChannelConfig{ + ChannelConstraints: ChannelConstraints{ + CsvDelay: uint16(rand.Int63()), + }, + PaymentBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: keychain.KeyFamily(rand.Int63()), + Index: uint32(rand.Int63()), + }, + }, + }, + RemoteCurrentRevocation: pub, + IsPending: false, + RevocationStore: shachain.NewRevocationStore(), + RevocationProducer: shaChainProducer, + }, + }, nil +} + +// TestRestoreChannelShells tests that we're able to insert a partially channel +// populated to disk. This is useful for channel recovery purposes. We should +// find the new channel shell on disk, and also the db should be populated with +// an edge for that channel. +func TestRestoreChannelShells(t *testing.T) { + t.Parallel() + + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + defer cleanUp() + + // First, we'll make our channel shell, it will only have the minimal + // amount of information required for us to initiate the data loss + // protection feature. + channelShell, err := genRandomChannelShell() + if err != nil { + t.Fatalf("unable to gen channel shell: %v", err) + } + + graph := cdb.ChannelGraph() + + // Before we can restore the channel, we'll need to make a source node + // in the graph as the channel edge we create will need to have a + // origin. + testNode, err := createTestVertex(cdb) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.SetSourceNode(testNode); err != nil { + t.Fatalf("unable to set source node: %v", err) + } + + // With the channel shell constructed, we'll now insert it into the + // database with the restoration method. 
+ if err := cdb.RestoreChannelShells(channelShell); err != nil { + t.Fatalf("unable to restore channel shell: %v", err) + } + + // Now that the channel has been inserted, we'll attempt to query for + // it to ensure we can properly locate it via various means. + // + // First, we'll attempt to query for all channels that we have with the + // node public key that was restored. + nodeChans, err := cdb.FetchOpenChannels(channelShell.Chan.IdentityPub) + if err != nil { + t.Fatalf("unable find channel: %v", err) + } + + // We should now find a single channel from the database. + if len(nodeChans) != 1 { + t.Fatalf("unable to find restored channel by node "+ + "pubkey: %v", err) + } + + // That single channel should have the proper channel point, and also + // the expected set of flags to indicate that it was a restored + // channel. + if nodeChans[0].FundingOutpoint != channelShell.Chan.FundingOutpoint { + t.Fatalf("wrong funding outpoint: expected %v, got %v", + nodeChans[0].FundingOutpoint, + channelShell.Chan.FundingOutpoint) + } + if !nodeChans[0].HasChanStatus(ChanStatusRestored) { + t.Fatalf("node has wrong status flags: %v", + nodeChans[0].chanStatus) + } + + // We should also be able to find the channel if we query for it + // directly. + _, err = cdb.FetchChannel(channelShell.Chan.FundingOutpoint) + if err != nil { + t.Fatalf("unable to fetch channel: %v", err) + } + + // We should also be able to find the link node that was inserted by + // its public key. + linkNode, err := cdb.FetchLinkNode(channelShell.Chan.IdentityPub) + if err != nil { + t.Fatalf("unable to fetch link node: %v", err) + } + + // The node should have the same address, as specified in the channel + // shell. + if reflect.DeepEqual(linkNode.Addresses, channelShell.NodeAddrs) { + t.Fatalf("addr mismach: expected %v, got %v", + linkNode.Addresses, channelShell.NodeAddrs) + } + + // Finally, we'll ensure that the edge for the channel was properly + // inserted. + chanInfos, err := graph.FetchChanInfos( + []uint64{channelShell.Chan.ShortChannelID.ToUint64()}, + ) + if err != nil { + t.Fatalf("unable to find edges: %v", err) + } + + if len(chanInfos) != 1 { + t.Fatalf("wrong amount of chan infos: expected %v got %v", + len(chanInfos), 1) + } + + // We should only find a single edge. + if chanInfos[0].Policy1 != nil && chanInfos[0].Policy2 != nil { + t.Fatalf("only a single edge should be inserted: %v", err) + } +} From 9e5723e1bc223834859e78763b28e55cdd0fd278 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:54:25 -0800 Subject: [PATCH 10/13] lnwallet: modify ForceClose to disallow closing if local data loss In this commit, we ensure that if a channel is detected to have local data loss, then we don't allow a force close attempt, as this may not be possible, or cause us to play an invalid state. --- lnwallet/channel.go | 14 ++++++++++++-- lnwallet/channel_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index f10f20a7..9ed768e1 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -5640,14 +5640,24 @@ func (lc *LightningChannel) ForceClose() (*LocalForceCloseSummary, error) { lc.Lock() defer lc.Unlock() + // If we've detected local data loss for this channel, then we won't + // allow a force close, as it may be the case that we have a dated + // version of the commitment, or this is actually a channel shell. 
+	if lc.channelState.HasChanStatus(channeldb.ChanStatusLocalDataLoss) {
+		return nil, fmt.Errorf("cannot force close channel with "+
+			"state: %v", lc.channelState.ChanStatus())
+	}
+
 	commitTx, err := lc.getSignedCommitTx()
 	if err != nil {
 		return nil, err
 	}
 
 	localCommitment := lc.channelState.LocalCommitment
-	summary, err := NewLocalForceCloseSummary(lc.channelState,
-		lc.Signer, lc.pCache, commitTx, localCommitment)
+	summary, err := NewLocalForceCloseSummary(
+		lc.channelState, lc.Signer, lc.pCache, commitTx,
+		localCommitment,
+	)
 	if err != nil {
 		return nil, err
 	}
diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go
index e1c88e9e..f9e4821a 100644
--- a/lnwallet/channel_test.go
+++ b/lnwallet/channel_test.go
@@ -17,6 +17,7 @@ import (
 	"github.com/btcsuite/btcutil"
 	"github.com/davecgh/go-spew/spew"
 	"github.com/lightningnetwork/lnd/chainntnfs"
+	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
@@ -6274,3 +6275,33 @@ func TestChannelRestoreCommitHeight(t *testing.T) {
 	bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 0, 2, 1)
 	bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 1, 2, 2)
 }
+
+// TestForceCloseFailLocalDataLoss tests that we don't allow a force close of a
+// channel that's in a non-default state.
+func TestForceCloseFailLocalDataLoss(t *testing.T) {
+	t.Parallel()
+
+	aliceChannel, _, cleanUp, err := CreateTestChannels()
+	if err != nil {
+		t.Fatalf("unable to create test channels: %v", err)
+	}
+	defer cleanUp()
+
+	// Now that we have our set of channels, we'll modify the channel state
+	// to have a non-default channel flag.
+	err = aliceChannel.channelState.ApplyChanStatus(
+		channeldb.ChanStatusLocalDataLoss,
+	)
+	if err != nil {
+		t.Fatalf("unable to apply channel state: %v", err)
+	}
+
+	// Due to the change above, if we attempt to force close this
+	// channel, we should fail as it isn't safe to force close a
+	// channel that isn't in the pure default state.
+	_, err = aliceChannel.ForceClose()
+	if err == nil {
+		t.Fatalf("expected force close to fail due to non-default " +
+			"chan state")
+	}
+}

From 821de3e1079b42037c1a3ee33c0d975066b4ab62 Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Thu, 3 Jan 2019 17:54:49 -0800
Subject: [PATCH 11/13] test: remove FullSync call from breacharbiter_test.go

In this commit, we remove an extra openChannel.FullSync() call from
breacharbiter_test.go. Before this collective diff, calling SyncPending()
and then FullSync() didn't result in an error. However, a prior commit now
makes this an error, to ensure we don't attempt to override any existing
channels. This is the only area in the codebase where we made this
mistake, which in this case was benign.
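
For context, the pattern being removed now behaves roughly as follows (a
sketch only, reusing the test's existing channelAlice and addr variables;
the error handling is illustrative):

    // SyncPending creates the channel bucket for this channel point.
    if err := channelAlice.State().SyncPending(addr, 101); err != nil {
        return err
    }
    // A follow-up full sync now surfaces ErrChanAlreadyExists from the
    // earlier channeldb change, rather than silently re-writing the
    // channel, so the call below has to be dropped.
    if err := channelAlice.State().FullSync(); err != nil {
        return err
    }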
--- breacharbiter_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/breacharbiter_test.go b/breacharbiter_test.go index dcdb0f8d..986cfb2d 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -1580,9 +1580,6 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa if err := channelAlice.State().SyncPending(addr, 101); err != nil { return nil, nil, nil, err } - if err := channelAlice.State().FullSync(); err != nil { - return nil, nil, nil, err - } addr = &net.TCPAddr{ IP: net.ParseIP("127.0.0.1"), @@ -1591,9 +1588,6 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa if err := channelBob.State().SyncPending(addr, 101); err != nil { return nil, nil, nil, err } - if err := channelBob.State().FullSync(); err != nil { - return nil, nil, nil, err - } cleanUpFunc := func() { dbBob.Close() From 4fd1f832d72914742ed1a56cb5b79ff39c0d7a1f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 4 Jan 2019 14:59:04 -0800 Subject: [PATCH 12/13] contractcourt+rpc: use new FetchChannel method instead of scanning In this commit, we modify areas where we need to force close a channel to use the new FetchChannel method instead of manually scanning. This dramatically reduces the CPU usage when doing things like closing a large number of channels within lnd. --- contractcourt/chain_arbitrator.go | 15 +------------- rpcserver.go | 33 ++----------------------------- 2 files changed, 3 insertions(+), 45 deletions(-) diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 696b4636..6652df23 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -224,23 +224,10 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, // With the channels fetched, attempt to locate // the target channel according to its channel // point. - dbChannels, err := c.chanSource.FetchAllChannels() + channel, err := c.chanSource.FetchChannel(chanPoint) if err != nil { return nil, err } - var channel *channeldb.OpenChannel - for _, dbChannel := range dbChannels { - if dbChannel.FundingOutpoint == chanPoint { - channel = dbChannel - break - } - } - - // If the channel cannot be located, then we - // exit with an error to the channel. - if channel == nil { - return nil, fmt.Errorf("unable to find channel") - } chanMachine, err := lnwallet.NewLightningChannel( c.cfg.Signer, c.cfg.PreimageDB, channel, nil, diff --git a/rpcserver.go b/rpcserver.go index f992023d..46be68e8 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1709,7 +1709,7 @@ func (r *rpcServer) AbandonChannel(ctx context.Context, // With the chanPoint constructed, we'll attempt to find the target // channel in the database. If we can't find the channel, then we'll // return the error back to the caller. - dbChan, err := r.fetchOpenDbChannel(*chanPoint) + dbChan, err := r.server.chanDB.FetchChannel(*chanPoint) if err != nil { return nil, err } @@ -1746,42 +1746,13 @@ func (r *rpcServer) AbandonChannel(ctx context.Context, return &lnrpc.AbandonChannelResponse{}, nil } -// fetchOpenDbChannel attempts to locate a channel identified by its channel -// point from the database's set of all currently opened channels. -func (r *rpcServer) fetchOpenDbChannel(chanPoint wire.OutPoint) ( - *channeldb.OpenChannel, error) { - - dbChannels, err := r.server.chanDB.FetchAllChannels() - if err != nil { - return nil, err - } - - // With the channels fetched, attempt to locate the target channel - // according to its channel point. 
- var dbChan *channeldb.OpenChannel - for _, dbChannel := range dbChannels { - if dbChannel.FundingOutpoint == chanPoint { - dbChan = dbChannel - break - } - } - - // If the channel cannot be located, then we exit with an error to the - // caller. - if dbChan == nil { - return nil, fmt.Errorf("unable to find channel") - } - - return dbChan, nil -} - // fetchActiveChannel attempts to locate a channel identified by its channel // point from the database's set of all currently opened channels and // return it as a fully populated state machine func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) ( *lnwallet.LightningChannel, error) { - dbChan, err := r.fetchOpenDbChannel(chanPoint) + dbChan, err := r.server.chanDB.FetchChannel(chanPoint) if err != nil { return nil, err } From 480ec3bbca935bb597d617fc8ed7a28e1d22617f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 18 Jan 2019 19:00:37 -0800 Subject: [PATCH 13/13] channeldb: ensure restored channels can't be mutated --- channeldb/channel.go | 26 ++++++++++++++++++++++++++ channeldb/db_test.go | 16 ++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/channeldb/channel.go b/channeldb/channel.go index a68f517c..42a53a7c 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -102,6 +102,11 @@ var ( // ErrNoCommitPoint is returned when no data loss commit point is found // in the database. ErrNoCommitPoint = fmt.Errorf("no commit point found") + + // ErrNoRestoredChannelMutation is returned when a caller attempts to + // mutate a channel that's been recovered. + ErrNoRestoredChannelMutation = fmt.Errorf("cannot mutate restored " + + "channel state") ) // ChannelType is an enum-like type that describes one of several possible @@ -958,6 +963,13 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { c.Lock() defer c.Unlock() + // If this is a restored channel, then we want to avoid mutating the + // state as all, as it's impossible to do so in a protocol compliant + // manner. + if c.hasChanStatus(ChanStatusRestored) { + return ErrNoRestoredChannelMutation + } + err := c.Db.Update(func(tx *bbolt.Tx) error { chanBucket, err := fetchChanBucket( tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, @@ -1379,6 +1391,13 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { c.Lock() defer c.Unlock() + // If this is a restored channel, then we want to avoid mutating the + // state as all, as it's impossible to do so in a protocol compliant + // manner. + if c.hasChanStatus(ChanStatusRestored) { + return ErrNoRestoredChannelMutation + } + return c.Db.Update(func(tx *bbolt.Tx) error { // First, we'll grab the writable bucket where this channel's // data resides. @@ -1503,6 +1522,13 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error { c.Lock() defer c.Unlock() + // If this is a restored channel, then we want to avoid mutating the + // state as all, as it's impossible to do so in a protocol compliant + // manner. + if c.hasChanStatus(ChanStatusRestored) { + return ErrNoRestoredChannelMutation + } + var newRemoteCommit *ChannelCommitment err := c.Db.Update(func(tx *bbolt.Tx) error { diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 261e0b36..43b62619 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -395,6 +395,22 @@ func TestRestoreChannelShells(t *testing.T) { "pubkey: %v", err) } + // Ensure that it isn't possible to modify the commitment state machine + // of this restored channel. 
+ channel := nodeChans[0] + err = channel.UpdateCommitment(nil) + if err != ErrNoRestoredChannelMutation { + t.Fatalf("able to mutate restored channel") + } + err = channel.AppendRemoteCommitChain(nil) + if err != ErrNoRestoredChannelMutation { + t.Fatalf("able to mutate restored channel") + } + err = channel.AdvanceCommitChainTail(nil) + if err != ErrNoRestoredChannelMutation { + t.Fatalf("able to mutate restored channel") + } + // That single channel should have the proper channel point, and also // the expected set of flags to indicate that it was a restored // channel.
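
Outside the channeldb package, callers can also use the exported status
check to skip these state-machine mutations up front rather than relying
on the returned error. A minimal, hypothetical sketch (dbChan and
newCommit are illustrative variables, not part of this patch):

    // Channels recovered from a static channel backup carry the
    // ChanStatusRestored flag and must not advance their commitment state.
    if dbChan.HasChanStatus(channeldb.ChanStatusRestored) {
        return channeldb.ErrNoRestoredChannelMutation
    }
    if err := dbChan.UpdateCommitment(newCommit); err != nil {
        return err
    }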