diff --git a/breacharbiter_test.go b/breacharbiter_test.go index dcdb0f8d..986cfb2d 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -1580,9 +1580,6 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa if err := channelAlice.State().SyncPending(addr, 101); err != nil { return nil, nil, nil, err } - if err := channelAlice.State().FullSync(); err != nil { - return nil, nil, nil, err - } addr = &net.TCPAddr{ IP: net.ParseIP("127.0.0.1"), @@ -1591,9 +1588,6 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa if err := channelBob.State().SyncPending(addr, 101); err != nil { return nil, nil, nil, err } - if err := channelBob.State().FullSync(); err != nil { - return nil, nil, nil, err - } cleanUpFunc := func() { dbBob.Close() diff --git a/channeldb/channel.go b/channeldb/channel.go index 5d59f9dc..42a53a7c 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -7,6 +7,8 @@ import ( "fmt" "io" "net" + "strconv" + "strings" "sync" "github.com/btcsuite/btcd/btcec" @@ -100,6 +102,11 @@ var ( // ErrNoCommitPoint is returned when no data loss commit point is found // in the database. ErrNoCommitPoint = fmt.Errorf("no commit point found") + + // ErrNoRestoredChannelMutation is returned when a caller attempts to + // mutate a channel that's been recovered. + ErrNoRestoredChannelMutation = fmt.Errorf("cannot mutate restored " + + "channel state") ) // ChannelType is an enum-like type that describes one of several possible @@ -293,40 +300,81 @@ type ChannelCommitment struct { type ChannelStatus uint8 var ( - // Default is the normal state of an open channel. - Default ChannelStatus + // ChanStatusDefault is the normal state of an open channel. + ChanStatusDefault ChannelStatus - // Borked indicates that the channel has entered an irreconcilable - // state, triggered by a state desynchronization or channel breach. - // Channels in this state should never be added to the htlc switch. - Borked ChannelStatus = 1 + // ChanStatusBorked indicates that the channel has entered an + // irreconcilable state, triggered by a state desynchronization or + // channel breach. Channels in this state should never be added to the + // htlc switch. + ChanStatusBorked ChannelStatus = 1 - // CommitmentBroadcasted indicates that a commitment for this channel - // has been broadcasted. - CommitmentBroadcasted ChannelStatus = 1 << 1 + // ChanStatusCommitBroadcasted indicates that a commitment for this + // channel has been broadcasted. + ChanStatusCommitBroadcasted ChannelStatus = 1 << 1 - // LocalDataLoss indicates that we have lost channel state for this - // channel, and broadcasting our latest commitment might be considered - // a breach. + // ChanStatusLocalDataLoss indicates that we have lost channel state + // for this channel, and broadcasting our latest commitment might be + // considered a breach. + // // TODO(halseh): actually enforce that we are not force closing such a // channel. - LocalDataLoss ChannelStatus = 1 << 2 + ChanStatusLocalDataLoss ChannelStatus = 1 << 2 + + // ChanStatusRestored is a status flag that signals that the channel + // has been restored, and doesn't have all the fields a typical channel + // will have. + ChanStatusRestored ChannelStatus = 1 << 3 ) +// chanStatusStrings maps a ChannelStatus to a human friendly string that +// describes that status. +var chanStatusStrings = map[ChannelStatus]string{ + ChanStatusDefault: "ChanStatusDefault", + ChanStatusBorked: "ChanStatusBorked", + ChanStatusCommitBroadcasted: "ChanStatusCommitBroadcasted", + ChanStatusLocalDataLoss: "ChanStatusLocalDataLoss", + ChanStatusRestored: "ChanStatusRestored", +} + +// orderedChanStatusFlags is an in-order list of all that channel status flags. +var orderedChanStatusFlags = []ChannelStatus{ + ChanStatusDefault, + ChanStatusBorked, + ChanStatusCommitBroadcasted, + ChanStatusLocalDataLoss, + ChanStatusRestored, +} + // String returns a human-readable representation of the ChannelStatus. func (c ChannelStatus) String() string { - switch c { - case Default: - return "Default" - case Borked: - return "Borked" - case CommitmentBroadcasted: - return "CommitmentBroadcasted" - case LocalDataLoss: - return "LocalDataLoss" - default: - return fmt.Sprintf("Unknown(%08b)", c) + // If no flags are set, then this is the default case. + if c == 0 { + return chanStatusStrings[ChanStatusDefault] } + + // Add individual bit flags. + statusStr := "" + for _, flag := range orderedChanStatusFlags { + if c&flag == flag { + statusStr += chanStatusStrings[flag] + "|" + c -= flag + } + } + + // Remove anything to the right of the final bar, including it as well. + statusStr = strings.TrimRight(statusStr, "|") + + // Add any remaining flags which aren't accounted for as hex. + if c != 0 { + statusStr += "|0x" + strconv.FormatUint(uint64(c), 16) + } + + // If this was purely an unknown flag, then remove the extra bar at the + // start of the string. + statusStr = strings.TrimLeft(statusStr, "|") + + return statusStr } // OpenChannel encapsulates the persistent and dynamic state of an open channel @@ -488,6 +536,28 @@ func (c *OpenChannel) ChanStatus() ChannelStatus { return c.chanStatus } +// ApplyChanStatus allows the caller to modify the internal channel state in a +// thead-safe manner. +func (c *OpenChannel) ApplyChanStatus(status ChannelStatus) error { + c.Lock() + defer c.Unlock() + + return c.putChanStatus(status) +} + +// HasChanStatus returns true if the internal bitfield channel status of the +// target channel has the specified status bit set. +func (c *OpenChannel) HasChanStatus(status ChannelStatus) bool { + c.RLock() + defer c.RUnlock() + + return c.hasChanStatus(status) +} + +func (c *OpenChannel) hasChanStatus(status ChannelStatus) bool { + return c.chanStatus&status == status +} + // RefreshShortChanID updates the in-memory short channel ID using the latest // value observed on disk. func (c *OpenChannel) RefreshShortChanID() error { @@ -591,16 +661,20 @@ func (c *OpenChannel) fullSync(tx *bbolt.Tx) error { } // With the bucket for the node fetched, we can now go down another - // level, creating the bucket (if it doesn't exist), for this channel - // itself. + // level, creating the bucket for this channel itself. var chanPointBuf bytes.Buffer if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil { return err } - chanBucket, err := chainBucket.CreateBucketIfNotExists( + chanBucket, err := chainBucket.CreateBucket( chanPointBuf.Bytes(), ) - if err != nil { + switch { + case err == bbolt.ErrBucketExists: + // If this channel already exists, then in order to avoid + // overriding it, we'll return an error back up to the caller. + return ErrChanAlreadyExists + case err != nil: return err } @@ -664,7 +738,7 @@ func (c *OpenChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error { // Add status LocalDataLoss to the existing bitvector found in // the DB. - status = channel.chanStatus | LocalDataLoss + status = channel.chanStatus | ChanStatusLocalDataLoss channel.chanStatus = status var b bytes.Buffer @@ -730,7 +804,7 @@ func (c *OpenChannel) MarkBorked() error { c.Lock() defer c.Unlock() - return c.putChanStatus(Borked) + return c.putChanStatus(ChanStatusBorked) } // MarkCommitmentBroadcasted marks the channel as a commitment transaction has @@ -740,7 +814,7 @@ func (c *OpenChannel) MarkCommitmentBroadcasted() error { c.Lock() defer c.Unlock() - return c.putChanStatus(CommitmentBroadcasted) + return c.putChanStatus(ChanStatusCommitBroadcasted) } func (c *OpenChannel) putChanStatus(status ChannelStatus) error { @@ -846,35 +920,40 @@ func (c *OpenChannel) SyncPending(addr net.Addr, pendingHeight uint32) error { c.FundingBroadcastHeight = pendingHeight return c.Db.Update(func(tx *bbolt.Tx) error { - // First, sync all the persistent channel state to disk. - if err := c.fullSync(tx); err != nil { - return err - } - - nodeInfoBucket, err := tx.CreateBucketIfNotExists(nodeInfoBucket) - if err != nil { - return err - } - - // If a LinkNode for this identity public key already exists, - // then we can exit early. - nodePub := c.IdentityPub.SerializeCompressed() - if nodeInfoBucket.Get(nodePub) != nil { - return nil - } - - // Next, we need to establish a (possibly) new LinkNode - // relationship for this channel. The LinkNode metadata - // contains reachability, up-time, and service bits related - // information. - linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addr) - - // TODO(roasbeef): do away with link node all together? - - return putLinkNode(nodeInfoBucket, linkNode) + return syncNewChannel(tx, c, []net.Addr{addr}) }) } +// syncNewChannel will write the passed channel to disk, and also create a +// LinkNode (if needed) for the channel peer. +func syncNewChannel(tx *bbolt.Tx, c *OpenChannel, addrs []net.Addr) error { + // First, sync all the persistent channel state to disk. + if err := c.fullSync(tx); err != nil { + return err + } + + nodeInfoBucket, err := tx.CreateBucketIfNotExists(nodeInfoBucket) + if err != nil { + return err + } + + // If a LinkNode for this identity public key already exists, + // then we can exit early. + nodePub := c.IdentityPub.SerializeCompressed() + if nodeInfoBucket.Get(nodePub) != nil { + return nil + } + + // Next, we need to establish a (possibly) new LinkNode relationship + // for this channel. The LinkNode metadata contains reachability, + // up-time, and service bits related information. + linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addrs...) + + // TODO(roasbeef): do away with link node all together? + + return putLinkNode(nodeInfoBucket, linkNode) +} + // UpdateCommitment updates the commitment state for the specified party // (remote or local). The commitment stat completely describes the balance // state at this point in the commitment chain. This method its to be called on @@ -884,6 +963,13 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { c.Lock() defer c.Unlock() + // If this is a restored channel, then we want to avoid mutating the + // state as all, as it's impossible to do so in a protocol compliant + // manner. + if c.hasChanStatus(ChanStatusRestored) { + return ErrNoRestoredChannelMutation + } + err := c.Db.Update(func(tx *bbolt.Tx) error { chanBucket, err := fetchChanBucket( tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, @@ -898,7 +984,9 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { // With the proper bucket fetched, we'll now write toe latest // commitment state to dis for the target party. - err = putChanCommitment(chanBucket, newCommitment, true) + err = putChanCommitment( + chanBucket, newCommitment, true, + ) if err != nil { return fmt.Errorf("unable to store chan "+ "revocations: %v", err) @@ -1303,6 +1391,13 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { c.Lock() defer c.Unlock() + // If this is a restored channel, then we want to avoid mutating the + // state as all, as it's impossible to do so in a protocol compliant + // manner. + if c.hasChanStatus(ChanStatusRestored) { + return ErrNoRestoredChannelMutation + } + return c.Db.Update(func(tx *bbolt.Tx) error { // First, we'll grab the writable bucket where this channel's // data resides. @@ -1427,6 +1522,13 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error { c.Lock() defer c.Unlock() + // If this is a restored channel, then we want to avoid mutating the + // state as all, as it's impossible to do so in a protocol compliant + // manner. + if c.hasChanStatus(ChanStatusRestored) { + return ErrNoRestoredChannelMutation + } + var newRemoteCommit *ChannelCommitment err := c.Db.Update(func(tx *bbolt.Tx) error { @@ -1466,7 +1568,9 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error { if err != nil { return err } - err = putChanCommitment(chanBucket, &newCommit.Commitment, false) + err = putChanCommitment( + chanBucket, &newCommit.Commitment, false, + ) if err != nil { return err } @@ -2279,12 +2383,22 @@ func putChanCommitment(chanBucket *bbolt.Bucket, c *ChannelCommitment, } func putChanCommitments(chanBucket *bbolt.Bucket, channel *OpenChannel) error { - err := putChanCommitment(chanBucket, &channel.LocalCommitment, true) + // If this is a restored channel, then we don't have any commitments to + // write. + if channel.hasChanStatus(ChanStatusRestored) { + return nil + } + + err := putChanCommitment( + chanBucket, &channel.LocalCommitment, true, + ) if err != nil { return err } - return putChanCommitment(chanBucket, &channel.RemoteCommitment, false) + return putChanCommitment( + chanBucket, &channel.RemoteCommitment, false, + ) } func putChanRevocationState(chanBucket *bbolt.Bucket, channel *OpenChannel) error { @@ -2399,6 +2513,12 @@ func fetchChanCommitment(chanBucket *bbolt.Bucket, local bool) (ChannelCommitmen func fetchChanCommitments(chanBucket *bbolt.Bucket, channel *OpenChannel) error { var err error + // If this is a restored channel, then we don't have any commitments to + // read. + if channel.hasChanStatus(ChanStatusRestored) { + return nil + } + channel.LocalCommitment, err = fetchChanCommitment(chanBucket, true) if err != nil { return err diff --git a/channeldb/db.go b/channeldb/db.go index d71656a7..c4262ffa 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -4,9 +4,11 @@ import ( "bytes" "encoding/binary" "fmt" + "net" "os" "path/filepath" "sync" + "time" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" @@ -407,6 +409,102 @@ func (d *DB) fetchNodeChannels(chainBucket *bbolt.Bucket) ([]*OpenChannel, error return channels, nil } +// FetchChannel attempts to locate a channel specified by the passed channel +// point. If the channel cannot be found, then an error will be returned. +func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) { + var ( + targetChan *OpenChannel + targetChanPoint bytes.Buffer + ) + + if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil { + return nil, err + } + + // chanScan will traverse the following bucket structure: + // * nodePub => chainHash => chanPoint + // + // At each level we go one further, ensuring that we're traversing the + // proper key (that's actually a bucket). By only reading the bucket + // structure and skipping fully decoding each channel, we save a good + // bit of CPU as we don't need to do things like decompress public + // keys. + chanScan := func(tx *bbolt.Tx) error { + // Get the bucket dedicated to storing the metadata for open + // channels. + openChanBucket := tx.Bucket(openChannelBucket) + if openChanBucket == nil { + return ErrNoActiveChannels + } + + // Within the node channel bucket, are the set of node pubkeys + // we have channels with, we don't know the entire set, so + // we'll check them all. + return openChanBucket.ForEach(func(nodePub, v []byte) error { + // Ensure that this is a key the same size as a pubkey, + // and also that it leads directly to a bucket. + if len(nodePub) != 33 || v != nil { + return nil + } + + nodeChanBucket := openChanBucket.Bucket(nodePub) + if nodeChanBucket == nil { + return nil + } + + // The next layer down is all the chains that this node + // has channels on with us. + return nodeChanBucket.ForEach(func(chainHash, v []byte) error { + // If there's a value, it's not a bucket so + // ignore it. + if v != nil { + return nil + } + + chainBucket := nodeChanBucket.Bucket(chainHash) + if chainBucket == nil { + return fmt.Errorf("unable to read "+ + "bucket for chain=%x", chainHash[:]) + } + + // Finally we reach the leaf bucket that stores + // all the chanPoints for this node. + chanBucket := chainBucket.Bucket( + targetChanPoint.Bytes(), + ) + if chanBucket == nil { + return nil + } + + channel, err := fetchOpenChannel( + chanBucket, &chanPoint, + ) + if err != nil { + return err + } + + targetChan = channel + targetChan.Db = d + + return nil + }) + }) + } + + err := d.View(chanScan) + if err != nil { + return nil, err + } + + if targetChan != nil { + return targetChan, nil + } + + // If we can't find the channel, then we return with an error, as we + // have nothing to backup. + return nil, ErrChannelNotFound +} + // FetchAllChannels attempts to retrieve all open channels currently stored // within the database, including pending open, fully open and channels waiting // for a closing transaction to confirm. @@ -530,7 +628,7 @@ func fetchChannels(d *DB, pending, waitingClose bool) ([]*OpenChannel, error) { // than Default, then it means it is // waiting to be closed. channelWaitingClose := - channel.ChanStatus() != Default + channel.ChanStatus() != ChanStatusDefault // Only include it if we requested // channels with the same waitingClose @@ -772,6 +870,165 @@ func (d *DB) PruneLinkNodes() error { }) } +// ChannelShell is a shell of a channel that is meant to be used for channel +// recovery purposes. It contains a minimal OpenChannel instance along with +// addresses for that target node. +type ChannelShell struct { + // NodeAddrs the set of addresses that this node has known to be + // reachable at in the past. + NodeAddrs []net.Addr + + // Chan is a shell of an OpenChannel, it contains only the items + // required to restore the channel on disk. + Chan *OpenChannel +} + +// RestoreChannelShells is a method that allows the caller to reconstruct the +// state of an OpenChannel from the ChannelShell. We'll attempt to write the +// new channel to disk, create a LinkNode instance with the passed node +// addresses, and finally create an edge within the graph for the channel as +// well. This method is idempotent, so repeated calls with the same set of +// channel shells won't modify the database after the initial call. +func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error { + chanGraph := ChannelGraph{d} + + return d.Update(func(tx *bbolt.Tx) error { + for _, channelShell := range channelShells { + channel := channelShell.Chan + + // First, we'll attempt to create a new open channel + // and link node for this channel. If the channel + // already exists, then in order to ensure this method + // is idempotent, we'll continue to the next step. + channel.Db = d + err := syncNewChannel( + tx, channel, channelShell.NodeAddrs, + ) + if err != nil { + return err + } + + // Next, we'll create an active edge in the graph + // database for this channel in order to restore our + // partial view of the network. + // + // TODO(roasbeef): if we restore *after* the channel + // has been closed on chain, then need to inform the + // router that it should try and prune these values as + // we can detect them + edgeInfo := ChannelEdgeInfo{ + ChannelID: channel.ShortChannelID.ToUint64(), + ChainHash: channel.ChainHash, + ChannelPoint: channel.FundingOutpoint, + } + + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return ErrGraphNotFound + } + selfNode, err := chanGraph.sourceNode(nodes) + if err != nil { + return err + } + + // Depending on which pub key is smaller, we'll assign + // our roles as "node1" and "node2". + chanPeer := channel.IdentityPub.SerializeCompressed() + selfIsSmaller := bytes.Compare( + selfNode.PubKeyBytes[:], chanPeer, + ) == -1 + if selfIsSmaller { + copy(edgeInfo.NodeKey1Bytes[:], selfNode.PubKeyBytes[:]) + copy(edgeInfo.NodeKey2Bytes[:], chanPeer) + } else { + copy(edgeInfo.NodeKey1Bytes[:], chanPeer) + copy(edgeInfo.NodeKey2Bytes[:], selfNode.PubKeyBytes[:]) + } + + // With the edge info shell constructed, we'll now add + // it to the graph. + err = chanGraph.addChannelEdge(tx, &edgeInfo) + if err != nil { + return err + } + + // Similarly, we'll construct a channel edge shell and + // add that itself to the graph. + chanEdge := ChannelEdgePolicy{ + ChannelID: edgeInfo.ChannelID, + LastUpdate: time.Now(), + } + + // If their pubkey is larger, then we'll flip the + // direction bit to indicate that us, the "second" node + // is updating their policy. + if !selfIsSmaller { + chanEdge.ChannelFlags |= lnwire.ChanUpdateDirection + } + + err = updateEdgePolicy(tx, &chanEdge) + if err != nil { + return err + } + } + + return nil + }) +} + +// AddrsForNode consults the graph and channel database for all addresses known +// to the passed node public key. +func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) { + var ( + linkNode *LinkNode + graphNode LightningNode + ) + + dbErr := d.View(func(tx *bbolt.Tx) error { + var err error + + linkNode, err = fetchLinkNode(tx, nodePub) + if err != nil { + return err + } + + // We'll also query the graph for this peer to see if they have + // any addresses that we don't currently have stored within the + // link node database. + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return ErrGraphNotFound + } + compressedPubKey := nodePub.SerializeCompressed() + graphNode, err = fetchLightningNode(nodes, compressedPubKey) + if err != nil { + return err + } + + return nil + }) + if dbErr != nil { + return nil, dbErr + } + + // Now that we have both sources of addrs for this node, we'll use a + // map to de-duplicate any addresses between the two sources, and + // produce a final list of the combined addrs. + addrs := make(map[string]net.Addr) + for _, addr := range linkNode.Addresses { + addrs[addr.String()] = addr + } + for _, addr := range graphNode.Addresses { + addrs[addr.String()] = addr + } + dedupedAddrs := make([]net.Addr, 0, len(addrs)) + for _, addr := range addrs { + dedupedAddrs = append(dedupedAddrs, addr) + } + + return dedupedAddrs, nil +} + // syncVersions function is used for safe db version synchronization. It // applies migration functions to the current database and recovers the // previous state of db if at least one error/panic appeared during migration. diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 794b1fcf..43b62619 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -2,12 +2,22 @@ package channeldb import ( "io/ioutil" + "math" + "math/rand" + "net" "os" "path/filepath" + "reflect" "testing" + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/shachain" ) func TestOpenWithCreate(t *testing.T) { @@ -147,3 +157,310 @@ func TestFetchClosedChannelForID(t *testing.T) { t.Fatalf("expected ErrClosedChannelNotFound, instead got: %v", err) } } + +// TestAddrsForNode tests the we're able to properly obtain all the addresses +// for a target node. +func TestAddrsForNode(t *testing.T) { + t.Parallel() + + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + defer cleanUp() + + graph := cdb.ChannelGraph() + + // We'll make a test vertex to insert into the database, as the source + // node, but this node will only have half the number of addresses it + // usually does. + testNode, err := createTestVertex(cdb) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + testNode.Addresses = []net.Addr{testAddr} + if err := graph.SetSourceNode(testNode); err != nil { + t.Fatalf("unable to set source node: %v", err) + } + + // Next, we'll make a link node with the same pubkey, but with an + // additional address. + nodePub, err := testNode.PubKey() + if err != nil { + t.Fatalf("unable to recv node pub: %v", err) + } + linkNode := cdb.NewLinkNode( + wire.MainNet, nodePub, anotherAddr, + ) + if err := linkNode.Sync(); err != nil { + t.Fatalf("unable to sync link node: %v", err) + } + + // Now that we've created a link node, as well as a vertex for the + // node, we'll query for all its addresses. + nodeAddrs, err := cdb.AddrsForNode(nodePub) + if err != nil { + t.Fatalf("unable to obtain node addrs: %v", err) + } + + expectedAddrs := make(map[string]struct{}) + expectedAddrs[testAddr.String()] = struct{}{} + expectedAddrs[anotherAddr.String()] = struct{}{} + + // Finally, ensure that all the expected addresses are found. + if len(nodeAddrs) != len(expectedAddrs) { + t.Fatalf("expected %v addrs, got %v", + len(expectedAddrs), len(nodeAddrs)) + } + for _, addr := range nodeAddrs { + if _, ok := expectedAddrs[addr.String()]; !ok { + t.Fatalf("unexpected addr: %v", addr) + } + } +} + +// TestFetchChannel tests that we're able to fetch an arbitrary channel from +// disk. +func TestFetchChannel(t *testing.T) { + t.Parallel() + + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + defer cleanUp() + + // Create the test channel state that we'll sync to the database + // shortly. + channelState, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + + // Mark the channel as pending, then immediately mark it as open to it + // can be fully visible. + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + } + if err := channelState.SyncPending(addr, 9); err != nil { + t.Fatalf("unable to save and serialize channel state: %v", err) + } + err = channelState.MarkAsOpen(lnwire.NewShortChanIDFromInt(99)) + if err != nil { + t.Fatalf("unable to mark channel open: %v", err) + } + + // Next, attempt to fetch the channel by its chan point. + dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint) + if err != nil { + t.Fatalf("unable to fetch channel: %v", err) + } + + // The decoded channel state should be identical to what we stored + // above. + if !reflect.DeepEqual(channelState, dbChannel) { + t.Fatalf("channel state doesn't match:: %v vs %v", + spew.Sdump(channelState), spew.Sdump(dbChannel)) + } + + // If we attempt to query for a non-exist ante channel, then we should + // get an error. + channelState2, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + channelState2.FundingOutpoint.Index ^= 1 + + _, err = cdb.FetchChannel(channelState2.FundingOutpoint) + if err == nil { + t.Fatalf("expected query to fail") + } +} + +func genRandomChannelShell() (*ChannelShell, error) { + var testPriv [32]byte + if _, err := rand.Read(testPriv[:]); err != nil { + return nil, err + } + + _, pub := btcec.PrivKeyFromBytes(btcec.S256(), testPriv[:]) + + var chanPoint wire.OutPoint + if _, err := rand.Read(chanPoint.Hash[:]); err != nil { + return nil, err + } + + pub.Curve = nil + + chanPoint.Index = uint32(rand.Intn(math.MaxUint16)) + + chanStatus := ChanStatusDefault | ChanStatusRestored + + var shaChainPriv [32]byte + if _, err := rand.Read(testPriv[:]); err != nil { + return nil, err + } + revRoot, err := chainhash.NewHash(shaChainPriv[:]) + if err != nil { + return nil, err + } + shaChainProducer := shachain.NewRevocationProducer(*revRoot) + + return &ChannelShell{ + NodeAddrs: []net.Addr{&net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + }}, + Chan: &OpenChannel{ + chanStatus: chanStatus, + ChainHash: rev, + FundingOutpoint: chanPoint, + ShortChannelID: lnwire.NewShortChanIDFromInt( + uint64(rand.Int63()), + ), + IdentityPub: pub, + LocalChanCfg: ChannelConfig{ + ChannelConstraints: ChannelConstraints{ + CsvDelay: uint16(rand.Int63()), + }, + PaymentBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: keychain.KeyFamily(rand.Int63()), + Index: uint32(rand.Int63()), + }, + }, + }, + RemoteCurrentRevocation: pub, + IsPending: false, + RevocationStore: shachain.NewRevocationStore(), + RevocationProducer: shaChainProducer, + }, + }, nil +} + +// TestRestoreChannelShells tests that we're able to insert a partially channel +// populated to disk. This is useful for channel recovery purposes. We should +// find the new channel shell on disk, and also the db should be populated with +// an edge for that channel. +func TestRestoreChannelShells(t *testing.T) { + t.Parallel() + + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + defer cleanUp() + + // First, we'll make our channel shell, it will only have the minimal + // amount of information required for us to initiate the data loss + // protection feature. + channelShell, err := genRandomChannelShell() + if err != nil { + t.Fatalf("unable to gen channel shell: %v", err) + } + + graph := cdb.ChannelGraph() + + // Before we can restore the channel, we'll need to make a source node + // in the graph as the channel edge we create will need to have a + // origin. + testNode, err := createTestVertex(cdb) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.SetSourceNode(testNode); err != nil { + t.Fatalf("unable to set source node: %v", err) + } + + // With the channel shell constructed, we'll now insert it into the + // database with the restoration method. + if err := cdb.RestoreChannelShells(channelShell); err != nil { + t.Fatalf("unable to restore channel shell: %v", err) + } + + // Now that the channel has been inserted, we'll attempt to query for + // it to ensure we can properly locate it via various means. + // + // First, we'll attempt to query for all channels that we have with the + // node public key that was restored. + nodeChans, err := cdb.FetchOpenChannels(channelShell.Chan.IdentityPub) + if err != nil { + t.Fatalf("unable find channel: %v", err) + } + + // We should now find a single channel from the database. + if len(nodeChans) != 1 { + t.Fatalf("unable to find restored channel by node "+ + "pubkey: %v", err) + } + + // Ensure that it isn't possible to modify the commitment state machine + // of this restored channel. + channel := nodeChans[0] + err = channel.UpdateCommitment(nil) + if err != ErrNoRestoredChannelMutation { + t.Fatalf("able to mutate restored channel") + } + err = channel.AppendRemoteCommitChain(nil) + if err != ErrNoRestoredChannelMutation { + t.Fatalf("able to mutate restored channel") + } + err = channel.AdvanceCommitChainTail(nil) + if err != ErrNoRestoredChannelMutation { + t.Fatalf("able to mutate restored channel") + } + + // That single channel should have the proper channel point, and also + // the expected set of flags to indicate that it was a restored + // channel. + if nodeChans[0].FundingOutpoint != channelShell.Chan.FundingOutpoint { + t.Fatalf("wrong funding outpoint: expected %v, got %v", + nodeChans[0].FundingOutpoint, + channelShell.Chan.FundingOutpoint) + } + if !nodeChans[0].HasChanStatus(ChanStatusRestored) { + t.Fatalf("node has wrong status flags: %v", + nodeChans[0].chanStatus) + } + + // We should also be able to find the channel if we query for it + // directly. + _, err = cdb.FetchChannel(channelShell.Chan.FundingOutpoint) + if err != nil { + t.Fatalf("unable to fetch channel: %v", err) + } + + // We should also be able to find the link node that was inserted by + // its public key. + linkNode, err := cdb.FetchLinkNode(channelShell.Chan.IdentityPub) + if err != nil { + t.Fatalf("unable to fetch link node: %v", err) + } + + // The node should have the same address, as specified in the channel + // shell. + if reflect.DeepEqual(linkNode.Addresses, channelShell.NodeAddrs) { + t.Fatalf("addr mismach: expected %v, got %v", + linkNode.Addresses, channelShell.NodeAddrs) + } + + // Finally, we'll ensure that the edge for the channel was properly + // inserted. + chanInfos, err := graph.FetchChanInfos( + []uint64{channelShell.Chan.ShortChannelID.ToUint64()}, + ) + if err != nil { + t.Fatalf("unable to find edges: %v", err) + } + + if len(chanInfos) != 1 { + t.Fatalf("wrong amount of chan infos: expected %v got %v", + len(chanInfos), 1) + } + + // We should only find a single edge. + if chanInfos[0].Policy1 != nil && chanInfos[0].Policy2 != nil { + t.Fatalf("only a single edge should be inserted: %v", err) + } +} diff --git a/channeldb/error.go b/channeldb/error.go index c9ddfb49..c0548106 100644 --- a/channeldb/error.go +++ b/channeldb/error.go @@ -103,6 +103,11 @@ var ( // indicate it should be. ErrEdgePolicyOptionalFieldNotFound = fmt.Errorf("optional field not " + "present") + + // ErrChanAlreadyExists is return when the caller attempts to create a + // channel with a channel point that is already present in the + // database. + ErrChanAlreadyExists = fmt.Errorf("channel already exists") ) // ErrTooManyExtraOpaqueBytes creates an error which should be returned if the diff --git a/channeldb/graph.go b/channeldb/graph.go index dba5108c..c13e1879 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -483,101 +483,105 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket, // the channel supports. The chanPoint and chanID are used to uniquely identify // the edge globally within the database. func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { + return c.db.Update(func(tx *bbolt.Tx) error { + return c.addChannelEdge(tx, edge) + }) +} + +// addChannelEdge is the private form of AddChannelEdge that allows callers to +// utilize an existing db transaction. +func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error { // Construct the channel's primary key which is the 8-byte channel ID. var chanKey [8]byte binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID) - return c.db.Update(func(tx *bbolt.Tx) error { - nodes, err := tx.CreateBucketIfNotExists(nodeBucket) - if err != nil { - return err - } - edges, err := tx.CreateBucketIfNotExists(edgeBucket) - if err != nil { - return err - } - edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket) - if err != nil { - return err - } - chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket) + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err + } + edges, err := tx.CreateBucketIfNotExists(edgeBucket) + if err != nil { + return err + } + edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket) + if err != nil { + return err + } + chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket) + if err != nil { + return err + } + + // First, attempt to check if this edge has already been created. If + // so, then we can exit early as this method is meant to be idempotent. + if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil { + return ErrEdgeAlreadyExist + } + + // Before we insert the channel into the database, we'll ensure that + // both nodes already exist in the channel graph. If either node + // doesn't, then we'll insert a "shell" node that just includes its + // public key, so subsequent validation and queries can work properly. + _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:]) + switch { + case node1Err == ErrGraphNodeNotFound: + node1Shell := LightningNode{ + PubKeyBytes: edge.NodeKey1Bytes, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, &node1Shell) + if err != nil { + return fmt.Errorf("unable to create shell node "+ + "for: %x", edge.NodeKey1Bytes) + + } + case node1Err != nil: + return err + } + + _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:]) + switch { + case node2Err == ErrGraphNodeNotFound: + node2Shell := LightningNode{ + PubKeyBytes: edge.NodeKey2Bytes, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, &node2Shell) + if err != nil { + return fmt.Errorf("unable to create shell node "+ + "for: %x", edge.NodeKey2Bytes) + + } + case node2Err != nil: + return err + } + + // If the edge hasn't been created yet, then we'll first add it to the + // edge index in order to associate the edge between two nodes and also + // store the static components of the channel. + if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil { + return err + } + + // Mark edge policies for both sides as unknown. This is to enable + // efficient incoming channel lookup for a node. + for _, key := range []*[33]byte{&edge.NodeKey1Bytes, + &edge.NodeKey2Bytes} { + + err := putChanEdgePolicyUnknown(edges, edge.ChannelID, + key[:]) if err != nil { return err } + } - // First, attempt to check if this edge has already been - // created. If so, then we can exit early as this method is - // meant to be idempotent. - if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil { - return ErrEdgeAlreadyExist - } - - // Before we insert the channel into the database, we'll ensure - // that both nodes already exist in the channel graph. If - // either node doesn't, then we'll insert a "shell" node that - // just includes its public key, so subsequent validation and - // queries can work properly. - _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:]) - switch { - case node1Err == ErrGraphNodeNotFound: - node1Shell := LightningNode{ - PubKeyBytes: edge.NodeKey1Bytes, - HaveNodeAnnouncement: false, - } - err := addLightningNode(tx, &node1Shell) - if err != nil { - return fmt.Errorf("unable to create shell node "+ - "for: %x", edge.NodeKey1Bytes) - - } - case node1Err != nil: - return err - } - - _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:]) - switch { - case node2Err == ErrGraphNodeNotFound: - node2Shell := LightningNode{ - PubKeyBytes: edge.NodeKey2Bytes, - HaveNodeAnnouncement: false, - } - err := addLightningNode(tx, &node2Shell) - if err != nil { - return fmt.Errorf("unable to create shell node "+ - "for: %x", edge.NodeKey2Bytes) - - } - case node2Err != nil: - return err - } - - // If the edge hasn't been created yet, then we'll first add it - // to the edge index in order to associate the edge between two - // nodes and also store the static components of the channel. - if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil { - return err - } - - // Mark edge policies for both sides as unknown. This is to - // enable efficient incoming channel lookup for a node. - for _, key := range []*[33]byte{&edge.NodeKey1Bytes, - &edge.NodeKey2Bytes} { - - err := putChanEdgePolicyUnknown(edges, edge.ChannelID, - key[:]) - if err != nil { - return err - } - } - - // Finally we add it to the channel index which maps channel - // points (outpoints) to the shorter channel ID's. - var b bytes.Buffer - if err := writeOutpoint(&b, &edge.ChannelPoint); err != nil { - return err - } - return chanIndex.Put(b.Bytes(), chanKey[:]) - }) + // Finally we add it to the channel index which maps channel points + // (outpoints) to the shorter channel ID's. + var b bytes.Buffer + if err := writeOutpoint(&b, &edge.ChannelPoint); err != nil { + return err + } + return chanIndex.Put(b.Bytes(), chanKey[:]) } // HasChannelEdge returns true if the database knows of a channel edge with the @@ -1639,28 +1643,26 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket, // the nodes on either side of the channel. func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { return c.db.Update(func(tx *bbolt.Tx) error { - edges := tx.Bucket(edgeBucket) - if edges == nil { - return ErrEdgeNotFound - } - - edgeIndex := edges.Bucket(edgeIndexBucket) - if edgeIndex == nil { - return ErrEdgeNotFound - } - nodes, err := tx.CreateBucketIfNotExists(nodeBucket) - if err != nil { - return err - } - - return updateEdgePolicy(edges, edgeIndex, nodes, edge) + return updateEdgePolicy(tx, edge) }) } // updateEdgePolicy attempts to update an edge's policy within the relevant // buckets using an existing database transaction. -func updateEdgePolicy(edges, edgeIndex, nodes *bbolt.Bucket, - edge *ChannelEdgePolicy) error { +func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrEdgeNotFound + + } + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + return ErrEdgeNotFound + } + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err + } // Create the channelID key be converting the channel ID // integer into a byte slice. diff --git a/channeldb/migrations.go b/channeldb/migrations.go index 647502c1..5d4919db 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -563,7 +563,7 @@ func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error { return err } - err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy) + err = updateEdgePolicy(tx, edgePolicy) if err != nil { return err } diff --git a/channeldb/nodes.go b/channeldb/nodes.go index 43729d33..95f6f7a2 100644 --- a/channeldb/nodes.go +++ b/channeldb/nodes.go @@ -62,13 +62,13 @@ type LinkNode struct { // NewLinkNode creates a new LinkNode from the provided parameters, which is // backed by an instance of channeldb. func (db *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey, - addr net.Addr) *LinkNode { + addrs ...net.Addr) *LinkNode { return &LinkNode{ Network: bitNet, IdentityPub: pub, LastSeen: time.Now(), - Addresses: []net.Addr{addr}, + Addresses: addrs, db: db, } } @@ -149,40 +149,44 @@ func (db *DB) deleteLinkNode(tx *bbolt.Tx, identity *btcec.PublicKey) error { // identity public key. If a particular LinkNode for the passed identity public // key cannot be found, then ErrNodeNotFound if returned. func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) { - var ( - node *LinkNode - err error - ) - - err = db.View(func(tx *bbolt.Tx) error { - // First fetch the bucket for storing node metadata, bailing - // out early if it hasn't been created yet. - nodeMetaBucket := tx.Bucket(nodeInfoBucket) - if nodeMetaBucket == nil { - return ErrLinkNodesNotFound + var linkNode *LinkNode + err := db.View(func(tx *bbolt.Tx) error { + node, err := fetchLinkNode(tx, identity) + if err != nil { + return err } - // If a link node for that particular public key cannot be - // located, then exit early with an ErrNodeNotFound. - pubKey := identity.SerializeCompressed() - nodeBytes := nodeMetaBucket.Get(pubKey) - if nodeBytes == nil { - return ErrNodeNotFound - } - - // Finally, decode an allocate a fresh LinkNode object to be - // returned to the caller. - nodeReader := bytes.NewReader(nodeBytes) - node, err = deserializeLinkNode(nodeReader) - return err + linkNode = node + return nil }) - if err != nil { - return nil, err + + return linkNode, err +} + +func fetchLinkNode(tx *bbolt.Tx, targetPub *btcec.PublicKey) (*LinkNode, error) { + // First fetch the bucket for storing node metadata, bailing out early + // if it hasn't been created yet. + nodeMetaBucket := tx.Bucket(nodeInfoBucket) + if nodeMetaBucket == nil { + return nil, ErrLinkNodesNotFound } - return node, nil + // If a link node for that particular public key cannot be located, + // then exit early with an ErrNodeNotFound. + pubKey := targetPub.SerializeCompressed() + nodeBytes := nodeMetaBucket.Get(pubKey) + if nodeBytes == nil { + return nil, ErrNodeNotFound + } + + // Finally, decode and allocate a fresh LinkNode object to be returned + // to the caller. + nodeReader := bytes.NewReader(nodeBytes) + return deserializeLinkNode(nodeReader) } +// TODO(roasbeef): update link node addrs in server upon connection + // FetchAllLinkNodes starts a new database transaction to fetch all nodes with // whom we have active channels with. func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) { diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 696b4636..6652df23 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -224,23 +224,10 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, // With the channels fetched, attempt to locate // the target channel according to its channel // point. - dbChannels, err := c.chanSource.FetchAllChannels() + channel, err := c.chanSource.FetchChannel(chanPoint) if err != nil { return nil, err } - var channel *channeldb.OpenChannel - for _, dbChannel := range dbChannels { - if dbChannel.FundingOutpoint == chanPoint { - channel = dbChannel - break - } - } - - // If the channel cannot be located, then we - // exit with an error to the channel. - if channel == nil { - return nil, fmt.Errorf("unable to find channel") - } chanMachine, err := lnwallet.NewLightningChannel( c.cfg.Signer, c.cfg.PreimageDB, channel, nil, diff --git a/lnwallet/channel.go b/lnwallet/channel.go index f10f20a7..9ed768e1 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -5640,14 +5640,24 @@ func (lc *LightningChannel) ForceClose() (*LocalForceCloseSummary, error) { lc.Lock() defer lc.Unlock() + // If we've detected local data loss for this channel, then we won't + // allow a force close, as it may be the case that we have a dated + // version of the commitment, or this is actually a channel shell. + if lc.channelState.HasChanStatus(channeldb.ChanStatusLocalDataLoss) { + return nil, fmt.Errorf("cannot force close channel with "+ + "state: %v", lc.channelState.ChanStatus()) + } + commitTx, err := lc.getSignedCommitTx() if err != nil { return nil, err } localCommitment := lc.channelState.LocalCommitment - summary, err := NewLocalForceCloseSummary(lc.channelState, - lc.Signer, lc.pCache, commitTx, localCommitment) + summary, err := NewLocalForceCloseSummary( + lc.channelState, lc.Signer, lc.pCache, commitTx, + localCommitment, + ) if err != nil { return nil, err } diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index e1c88e9e..f9e4821a 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -17,6 +17,7 @@ import ( "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" ) @@ -6274,3 +6275,33 @@ func TestChannelRestoreCommitHeight(t *testing.T) { bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 0, 2, 1) bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 1, 2, 2) } + +// TestForceCloseFailLocalDataLoss tests that we don't allow a force close of a +// channel that's in a non-default state. +func TestForceCloseFailLocalDataLoss(t *testing.T) { + t.Parallel() + + aliceChannel, _, cleanUp, err := CreateTestChannels() + if err != nil { + t.Fatalf("unable to create test channels: %v", err) + } + defer cleanUp() + + // Now that we have our set of channels, we'll modify the channel state + // to have a non-default channel flag. + err = aliceChannel.channelState.ApplyChanStatus( + channeldb.ChanStatusLocalDataLoss, + ) + if err != nil { + t.Fatalf("unable to apply channel state: %v", err) + } + + // Due to the change above, if we attempt to force close this + // channel, we should fail as it isn't safe to force close a + // channel that isn't in the pure default state. + _, err = aliceChannel.ForceClose() + if err == nil { + t.Fatalf("expected force close to fail due to non-default " + + "chan state") + } +} diff --git a/peer.go b/peer.go index 148088c2..677d147d 100644 --- a/peer.go +++ b/peer.go @@ -409,7 +409,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // Skip adding any permanently irreconcilable channels to the // htlcswitch. - if dbChan.ChanStatus() != channeldb.Default { + if dbChan.ChanStatus() != channeldb.ChanStatusDefault { peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+ "start.", chanPoint, dbChan.ChanStatus()) continue diff --git a/rpcserver.go b/rpcserver.go index f992023d..46be68e8 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1709,7 +1709,7 @@ func (r *rpcServer) AbandonChannel(ctx context.Context, // With the chanPoint constructed, we'll attempt to find the target // channel in the database. If we can't find the channel, then we'll // return the error back to the caller. - dbChan, err := r.fetchOpenDbChannel(*chanPoint) + dbChan, err := r.server.chanDB.FetchChannel(*chanPoint) if err != nil { return nil, err } @@ -1746,42 +1746,13 @@ func (r *rpcServer) AbandonChannel(ctx context.Context, return &lnrpc.AbandonChannelResponse{}, nil } -// fetchOpenDbChannel attempts to locate a channel identified by its channel -// point from the database's set of all currently opened channels. -func (r *rpcServer) fetchOpenDbChannel(chanPoint wire.OutPoint) ( - *channeldb.OpenChannel, error) { - - dbChannels, err := r.server.chanDB.FetchAllChannels() - if err != nil { - return nil, err - } - - // With the channels fetched, attempt to locate the target channel - // according to its channel point. - var dbChan *channeldb.OpenChannel - for _, dbChannel := range dbChannels { - if dbChannel.FundingOutpoint == chanPoint { - dbChan = dbChannel - break - } - } - - // If the channel cannot be located, then we exit with an error to the - // caller. - if dbChan == nil { - return nil, fmt.Errorf("unable to find channel") - } - - return dbChan, nil -} - // fetchActiveChannel attempts to locate a channel identified by its channel // point from the database's set of all currently opened channels and // return it as a fully populated state machine func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) ( *lnwallet.LightningChannel, error) { - dbChan, err := r.fetchOpenDbChannel(chanPoint) + dbChan, err := r.server.chanDB.FetchChannel(chanPoint) if err != nil { return nil, err }