Merge pull request #2371 from Roasbeef/static-chan-backups-channeldb

channeldb: prepatory modifications for full SCB implementation
This commit is contained in:
Olaoluwa Osuntokun 2019-01-30 14:40:27 -08:00 committed by GitHub
commit 1863dcef1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 948 additions and 250 deletions

@ -1580,9 +1580,6 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa
if err := channelAlice.State().SyncPending(addr, 101); err != nil {
return nil, nil, nil, err
}
if err := channelAlice.State().FullSync(); err != nil {
return nil, nil, nil, err
}
addr = &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
@ -1591,9 +1588,6 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa
if err := channelBob.State().SyncPending(addr, 101); err != nil {
return nil, nil, nil, err
}
if err := channelBob.State().FullSync(); err != nil {
return nil, nil, nil, err
}
cleanUpFunc := func() {
dbBob.Close()

@ -7,6 +7,8 @@ import (
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"github.com/btcsuite/btcd/btcec"
@ -100,6 +102,11 @@ var (
// ErrNoCommitPoint is returned when no data loss commit point is found
// in the database.
ErrNoCommitPoint = fmt.Errorf("no commit point found")
// ErrNoRestoredChannelMutation is returned when a caller attempts to
// mutate a channel that's been recovered.
ErrNoRestoredChannelMutation = fmt.Errorf("cannot mutate restored " +
"channel state")
)
// ChannelType is an enum-like type that describes one of several possible
@ -293,40 +300,81 @@ type ChannelCommitment struct {
type ChannelStatus uint8
var (
// Default is the normal state of an open channel.
Default ChannelStatus
// ChanStatusDefault is the normal state of an open channel.
ChanStatusDefault ChannelStatus
// Borked indicates that the channel has entered an irreconcilable
// state, triggered by a state desynchronization or channel breach.
// Channels in this state should never be added to the htlc switch.
Borked ChannelStatus = 1
// ChanStatusBorked indicates that the channel has entered an
// irreconcilable state, triggered by a state desynchronization or
// channel breach. Channels in this state should never be added to the
// htlc switch.
ChanStatusBorked ChannelStatus = 1
// CommitmentBroadcasted indicates that a commitment for this channel
// has been broadcasted.
CommitmentBroadcasted ChannelStatus = 1 << 1
// ChanStatusCommitBroadcasted indicates that a commitment for this
// channel has been broadcasted.
ChanStatusCommitBroadcasted ChannelStatus = 1 << 1
// LocalDataLoss indicates that we have lost channel state for this
// channel, and broadcasting our latest commitment might be considered
// a breach.
// ChanStatusLocalDataLoss indicates that we have lost channel state
// for this channel, and broadcasting our latest commitment might be
// considered a breach.
//
// TODO(halseh): actually enforce that we are not force closing such a
// channel.
LocalDataLoss ChannelStatus = 1 << 2
ChanStatusLocalDataLoss ChannelStatus = 1 << 2
// ChanStatusRestored is a status flag that signals that the channel
// has been restored, and doesn't have all the fields a typical channel
// will have.
ChanStatusRestored ChannelStatus = 1 << 3
)
// chanStatusStrings maps a ChannelStatus to a human friendly string that
// describes that status.
var chanStatusStrings = map[ChannelStatus]string{
ChanStatusDefault: "ChanStatusDefault",
ChanStatusBorked: "ChanStatusBorked",
ChanStatusCommitBroadcasted: "ChanStatusCommitBroadcasted",
ChanStatusLocalDataLoss: "ChanStatusLocalDataLoss",
ChanStatusRestored: "ChanStatusRestored",
}
// orderedChanStatusFlags is an in-order list of all that channel status flags.
var orderedChanStatusFlags = []ChannelStatus{
ChanStatusDefault,
ChanStatusBorked,
ChanStatusCommitBroadcasted,
ChanStatusLocalDataLoss,
ChanStatusRestored,
}
// String returns a human-readable representation of the ChannelStatus.
func (c ChannelStatus) String() string {
switch c {
case Default:
return "Default"
case Borked:
return "Borked"
case CommitmentBroadcasted:
return "CommitmentBroadcasted"
case LocalDataLoss:
return "LocalDataLoss"
default:
return fmt.Sprintf("Unknown(%08b)", c)
// If no flags are set, then this is the default case.
if c == 0 {
return chanStatusStrings[ChanStatusDefault]
}
// Add individual bit flags.
statusStr := ""
for _, flag := range orderedChanStatusFlags {
if c&flag == flag {
statusStr += chanStatusStrings[flag] + "|"
c -= flag
}
}
// Remove anything to the right of the final bar, including it as well.
statusStr = strings.TrimRight(statusStr, "|")
// Add any remaining flags which aren't accounted for as hex.
if c != 0 {
statusStr += "|0x" + strconv.FormatUint(uint64(c), 16)
}
// If this was purely an unknown flag, then remove the extra bar at the
// start of the string.
statusStr = strings.TrimLeft(statusStr, "|")
return statusStr
}
// OpenChannel encapsulates the persistent and dynamic state of an open channel
@ -488,6 +536,28 @@ func (c *OpenChannel) ChanStatus() ChannelStatus {
return c.chanStatus
}
// ApplyChanStatus allows the caller to modify the internal channel state in a
// thead-safe manner.
func (c *OpenChannel) ApplyChanStatus(status ChannelStatus) error {
c.Lock()
defer c.Unlock()
return c.putChanStatus(status)
}
// HasChanStatus returns true if the internal bitfield channel status of the
// target channel has the specified status bit set.
func (c *OpenChannel) HasChanStatus(status ChannelStatus) bool {
c.RLock()
defer c.RUnlock()
return c.hasChanStatus(status)
}
func (c *OpenChannel) hasChanStatus(status ChannelStatus) bool {
return c.chanStatus&status == status
}
// RefreshShortChanID updates the in-memory short channel ID using the latest
// value observed on disk.
func (c *OpenChannel) RefreshShortChanID() error {
@ -591,16 +661,20 @@ func (c *OpenChannel) fullSync(tx *bbolt.Tx) error {
}
// With the bucket for the node fetched, we can now go down another
// level, creating the bucket (if it doesn't exist), for this channel
// itself.
// level, creating the bucket for this channel itself.
var chanPointBuf bytes.Buffer
if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil {
return err
}
chanBucket, err := chainBucket.CreateBucketIfNotExists(
chanBucket, err := chainBucket.CreateBucket(
chanPointBuf.Bytes(),
)
if err != nil {
switch {
case err == bbolt.ErrBucketExists:
// If this channel already exists, then in order to avoid
// overriding it, we'll return an error back up to the caller.
return ErrChanAlreadyExists
case err != nil:
return err
}
@ -664,7 +738,7 @@ func (c *OpenChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error {
// Add status LocalDataLoss to the existing bitvector found in
// the DB.
status = channel.chanStatus | LocalDataLoss
status = channel.chanStatus | ChanStatusLocalDataLoss
channel.chanStatus = status
var b bytes.Buffer
@ -730,7 +804,7 @@ func (c *OpenChannel) MarkBorked() error {
c.Lock()
defer c.Unlock()
return c.putChanStatus(Borked)
return c.putChanStatus(ChanStatusBorked)
}
// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
@ -740,7 +814,7 @@ func (c *OpenChannel) MarkCommitmentBroadcasted() error {
c.Lock()
defer c.Unlock()
return c.putChanStatus(CommitmentBroadcasted)
return c.putChanStatus(ChanStatusCommitBroadcasted)
}
func (c *OpenChannel) putChanStatus(status ChannelStatus) error {
@ -846,35 +920,40 @@ func (c *OpenChannel) SyncPending(addr net.Addr, pendingHeight uint32) error {
c.FundingBroadcastHeight = pendingHeight
return c.Db.Update(func(tx *bbolt.Tx) error {
// First, sync all the persistent channel state to disk.
if err := c.fullSync(tx); err != nil {
return err
}
nodeInfoBucket, err := tx.CreateBucketIfNotExists(nodeInfoBucket)
if err != nil {
return err
}
// If a LinkNode for this identity public key already exists,
// then we can exit early.
nodePub := c.IdentityPub.SerializeCompressed()
if nodeInfoBucket.Get(nodePub) != nil {
return nil
}
// Next, we need to establish a (possibly) new LinkNode
// relationship for this channel. The LinkNode metadata
// contains reachability, up-time, and service bits related
// information.
linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addr)
// TODO(roasbeef): do away with link node all together?
return putLinkNode(nodeInfoBucket, linkNode)
return syncNewChannel(tx, c, []net.Addr{addr})
})
}
// syncNewChannel will write the passed channel to disk, and also create a
// LinkNode (if needed) for the channel peer.
func syncNewChannel(tx *bbolt.Tx, c *OpenChannel, addrs []net.Addr) error {
// First, sync all the persistent channel state to disk.
if err := c.fullSync(tx); err != nil {
return err
}
nodeInfoBucket, err := tx.CreateBucketIfNotExists(nodeInfoBucket)
if err != nil {
return err
}
// If a LinkNode for this identity public key already exists,
// then we can exit early.
nodePub := c.IdentityPub.SerializeCompressed()
if nodeInfoBucket.Get(nodePub) != nil {
return nil
}
// Next, we need to establish a (possibly) new LinkNode relationship
// for this channel. The LinkNode metadata contains reachability,
// up-time, and service bits related information.
linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addrs...)
// TODO(roasbeef): do away with link node all together?
return putLinkNode(nodeInfoBucket, linkNode)
}
// UpdateCommitment updates the commitment state for the specified party
// (remote or local). The commitment stat completely describes the balance
// state at this point in the commitment chain. This method its to be called on
@ -884,6 +963,13 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error {
c.Lock()
defer c.Unlock()
// If this is a restored channel, then we want to avoid mutating the
// state as all, as it's impossible to do so in a protocol compliant
// manner.
if c.hasChanStatus(ChanStatusRestored) {
return ErrNoRestoredChannelMutation
}
err := c.Db.Update(func(tx *bbolt.Tx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
@ -898,7 +984,9 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error {
// With the proper bucket fetched, we'll now write toe latest
// commitment state to dis for the target party.
err = putChanCommitment(chanBucket, newCommitment, true)
err = putChanCommitment(
chanBucket, newCommitment, true,
)
if err != nil {
return fmt.Errorf("unable to store chan "+
"revocations: %v", err)
@ -1303,6 +1391,13 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error {
c.Lock()
defer c.Unlock()
// If this is a restored channel, then we want to avoid mutating the
// state as all, as it's impossible to do so in a protocol compliant
// manner.
if c.hasChanStatus(ChanStatusRestored) {
return ErrNoRestoredChannelMutation
}
return c.Db.Update(func(tx *bbolt.Tx) error {
// First, we'll grab the writable bucket where this channel's
// data resides.
@ -1427,6 +1522,13 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error {
c.Lock()
defer c.Unlock()
// If this is a restored channel, then we want to avoid mutating the
// state as all, as it's impossible to do so in a protocol compliant
// manner.
if c.hasChanStatus(ChanStatusRestored) {
return ErrNoRestoredChannelMutation
}
var newRemoteCommit *ChannelCommitment
err := c.Db.Update(func(tx *bbolt.Tx) error {
@ -1466,7 +1568,9 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error {
if err != nil {
return err
}
err = putChanCommitment(chanBucket, &newCommit.Commitment, false)
err = putChanCommitment(
chanBucket, &newCommit.Commitment, false,
)
if err != nil {
return err
}
@ -2279,12 +2383,22 @@ func putChanCommitment(chanBucket *bbolt.Bucket, c *ChannelCommitment,
}
func putChanCommitments(chanBucket *bbolt.Bucket, channel *OpenChannel) error {
err := putChanCommitment(chanBucket, &channel.LocalCommitment, true)
// If this is a restored channel, then we don't have any commitments to
// write.
if channel.hasChanStatus(ChanStatusRestored) {
return nil
}
err := putChanCommitment(
chanBucket, &channel.LocalCommitment, true,
)
if err != nil {
return err
}
return putChanCommitment(chanBucket, &channel.RemoteCommitment, false)
return putChanCommitment(
chanBucket, &channel.RemoteCommitment, false,
)
}
func putChanRevocationState(chanBucket *bbolt.Bucket, channel *OpenChannel) error {
@ -2399,6 +2513,12 @@ func fetchChanCommitment(chanBucket *bbolt.Bucket, local bool) (ChannelCommitmen
func fetchChanCommitments(chanBucket *bbolt.Bucket, channel *OpenChannel) error {
var err error
// If this is a restored channel, then we don't have any commitments to
// read.
if channel.hasChanStatus(ChanStatusRestored) {
return nil
}
channel.LocalCommitment, err = fetchChanCommitment(chanBucket, true)
if err != nil {
return err

@ -4,9 +4,11 @@ import (
"bytes"
"encoding/binary"
"fmt"
"net"
"os"
"path/filepath"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
@ -407,6 +409,102 @@ func (d *DB) fetchNodeChannels(chainBucket *bbolt.Bucket) ([]*OpenChannel, error
return channels, nil
}
// FetchChannel attempts to locate a channel specified by the passed channel
// point. If the channel cannot be found, then an error will be returned.
func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) {
var (
targetChan *OpenChannel
targetChanPoint bytes.Buffer
)
if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil {
return nil, err
}
// chanScan will traverse the following bucket structure:
// * nodePub => chainHash => chanPoint
//
// At each level we go one further, ensuring that we're traversing the
// proper key (that's actually a bucket). By only reading the bucket
// structure and skipping fully decoding each channel, we save a good
// bit of CPU as we don't need to do things like decompress public
// keys.
chanScan := func(tx *bbolt.Tx) error {
// Get the bucket dedicated to storing the metadata for open
// channels.
openChanBucket := tx.Bucket(openChannelBucket)
if openChanBucket == nil {
return ErrNoActiveChannels
}
// Within the node channel bucket, are the set of node pubkeys
// we have channels with, we don't know the entire set, so
// we'll check them all.
return openChanBucket.ForEach(func(nodePub, v []byte) error {
// Ensure that this is a key the same size as a pubkey,
// and also that it leads directly to a bucket.
if len(nodePub) != 33 || v != nil {
return nil
}
nodeChanBucket := openChanBucket.Bucket(nodePub)
if nodeChanBucket == nil {
return nil
}
// The next layer down is all the chains that this node
// has channels on with us.
return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
// If there's a value, it's not a bucket so
// ignore it.
if v != nil {
return nil
}
chainBucket := nodeChanBucket.Bucket(chainHash)
if chainBucket == nil {
return fmt.Errorf("unable to read "+
"bucket for chain=%x", chainHash[:])
}
// Finally we reach the leaf bucket that stores
// all the chanPoints for this node.
chanBucket := chainBucket.Bucket(
targetChanPoint.Bytes(),
)
if chanBucket == nil {
return nil
}
channel, err := fetchOpenChannel(
chanBucket, &chanPoint,
)
if err != nil {
return err
}
targetChan = channel
targetChan.Db = d
return nil
})
})
}
err := d.View(chanScan)
if err != nil {
return nil, err
}
if targetChan != nil {
return targetChan, nil
}
// If we can't find the channel, then we return with an error, as we
// have nothing to backup.
return nil, ErrChannelNotFound
}
// FetchAllChannels attempts to retrieve all open channels currently stored
// within the database, including pending open, fully open and channels waiting
// for a closing transaction to confirm.
@ -530,7 +628,7 @@ func fetchChannels(d *DB, pending, waitingClose bool) ([]*OpenChannel, error) {
// than Default, then it means it is
// waiting to be closed.
channelWaitingClose :=
channel.ChanStatus() != Default
channel.ChanStatus() != ChanStatusDefault
// Only include it if we requested
// channels with the same waitingClose
@ -772,6 +870,165 @@ func (d *DB) PruneLinkNodes() error {
})
}
// ChannelShell is a shell of a channel that is meant to be used for channel
// recovery purposes. It contains a minimal OpenChannel instance along with
// addresses for that target node.
type ChannelShell struct {
// NodeAddrs the set of addresses that this node has known to be
// reachable at in the past.
NodeAddrs []net.Addr
// Chan is a shell of an OpenChannel, it contains only the items
// required to restore the channel on disk.
Chan *OpenChannel
}
// RestoreChannelShells is a method that allows the caller to reconstruct the
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
// new channel to disk, create a LinkNode instance with the passed node
// addresses, and finally create an edge within the graph for the channel as
// well. This method is idempotent, so repeated calls with the same set of
// channel shells won't modify the database after the initial call.
func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
chanGraph := ChannelGraph{d}
return d.Update(func(tx *bbolt.Tx) error {
for _, channelShell := range channelShells {
channel := channelShell.Chan
// First, we'll attempt to create a new open channel
// and link node for this channel. If the channel
// already exists, then in order to ensure this method
// is idempotent, we'll continue to the next step.
channel.Db = d
err := syncNewChannel(
tx, channel, channelShell.NodeAddrs,
)
if err != nil {
return err
}
// Next, we'll create an active edge in the graph
// database for this channel in order to restore our
// partial view of the network.
//
// TODO(roasbeef): if we restore *after* the channel
// has been closed on chain, then need to inform the
// router that it should try and prune these values as
// we can detect them
edgeInfo := ChannelEdgeInfo{
ChannelID: channel.ShortChannelID.ToUint64(),
ChainHash: channel.ChainHash,
ChannelPoint: channel.FundingOutpoint,
}
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
}
selfNode, err := chanGraph.sourceNode(nodes)
if err != nil {
return err
}
// Depending on which pub key is smaller, we'll assign
// our roles as "node1" and "node2".
chanPeer := channel.IdentityPub.SerializeCompressed()
selfIsSmaller := bytes.Compare(
selfNode.PubKeyBytes[:], chanPeer,
) == -1
if selfIsSmaller {
copy(edgeInfo.NodeKey1Bytes[:], selfNode.PubKeyBytes[:])
copy(edgeInfo.NodeKey2Bytes[:], chanPeer)
} else {
copy(edgeInfo.NodeKey1Bytes[:], chanPeer)
copy(edgeInfo.NodeKey2Bytes[:], selfNode.PubKeyBytes[:])
}
// With the edge info shell constructed, we'll now add
// it to the graph.
err = chanGraph.addChannelEdge(tx, &edgeInfo)
if err != nil {
return err
}
// Similarly, we'll construct a channel edge shell and
// add that itself to the graph.
chanEdge := ChannelEdgePolicy{
ChannelID: edgeInfo.ChannelID,
LastUpdate: time.Now(),
}
// If their pubkey is larger, then we'll flip the
// direction bit to indicate that us, the "second" node
// is updating their policy.
if !selfIsSmaller {
chanEdge.ChannelFlags |= lnwire.ChanUpdateDirection
}
err = updateEdgePolicy(tx, &chanEdge)
if err != nil {
return err
}
}
return nil
})
}
// AddrsForNode consults the graph and channel database for all addresses known
// to the passed node public key.
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
var (
linkNode *LinkNode
graphNode LightningNode
)
dbErr := d.View(func(tx *bbolt.Tx) error {
var err error
linkNode, err = fetchLinkNode(tx, nodePub)
if err != nil {
return err
}
// We'll also query the graph for this peer to see if they have
// any addresses that we don't currently have stored within the
// link node database.
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
}
compressedPubKey := nodePub.SerializeCompressed()
graphNode, err = fetchLightningNode(nodes, compressedPubKey)
if err != nil {
return err
}
return nil
})
if dbErr != nil {
return nil, dbErr
}
// Now that we have both sources of addrs for this node, we'll use a
// map to de-duplicate any addresses between the two sources, and
// produce a final list of the combined addrs.
addrs := make(map[string]net.Addr)
for _, addr := range linkNode.Addresses {
addrs[addr.String()] = addr
}
for _, addr := range graphNode.Addresses {
addrs[addr.String()] = addr
}
dedupedAddrs := make([]net.Addr, 0, len(addrs))
for _, addr := range addrs {
dedupedAddrs = append(dedupedAddrs, addr)
}
return dedupedAddrs, nil
}
// syncVersions function is used for safe db version synchronization. It
// applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration.

@ -2,12 +2,22 @@ package channeldb
import (
"io/ioutil"
"math"
"math/rand"
"net"
"os"
"path/filepath"
"reflect"
"testing"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain"
)
func TestOpenWithCreate(t *testing.T) {
@ -147,3 +157,310 @@ func TestFetchClosedChannelForID(t *testing.T) {
t.Fatalf("expected ErrClosedChannelNotFound, instead got: %v", err)
}
}
// TestAddrsForNode tests the we're able to properly obtain all the addresses
// for a target node.
func TestAddrsForNode(t *testing.T) {
t.Parallel()
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := cdb.ChannelGraph()
// We'll make a test vertex to insert into the database, as the source
// node, but this node will only have half the number of addresses it
// usually does.
testNode, err := createTestVertex(cdb)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
testNode.Addresses = []net.Addr{testAddr}
if err := graph.SetSourceNode(testNode); err != nil {
t.Fatalf("unable to set source node: %v", err)
}
// Next, we'll make a link node with the same pubkey, but with an
// additional address.
nodePub, err := testNode.PubKey()
if err != nil {
t.Fatalf("unable to recv node pub: %v", err)
}
linkNode := cdb.NewLinkNode(
wire.MainNet, nodePub, anotherAddr,
)
if err := linkNode.Sync(); err != nil {
t.Fatalf("unable to sync link node: %v", err)
}
// Now that we've created a link node, as well as a vertex for the
// node, we'll query for all its addresses.
nodeAddrs, err := cdb.AddrsForNode(nodePub)
if err != nil {
t.Fatalf("unable to obtain node addrs: %v", err)
}
expectedAddrs := make(map[string]struct{})
expectedAddrs[testAddr.String()] = struct{}{}
expectedAddrs[anotherAddr.String()] = struct{}{}
// Finally, ensure that all the expected addresses are found.
if len(nodeAddrs) != len(expectedAddrs) {
t.Fatalf("expected %v addrs, got %v",
len(expectedAddrs), len(nodeAddrs))
}
for _, addr := range nodeAddrs {
if _, ok := expectedAddrs[addr.String()]; !ok {
t.Fatalf("unexpected addr: %v", addr)
}
}
}
// TestFetchChannel tests that we're able to fetch an arbitrary channel from
// disk.
func TestFetchChannel(t *testing.T) {
t.Parallel()
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
// Create the test channel state that we'll sync to the database
// shortly.
channelState, err := createTestChannelState(cdb)
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
// Mark the channel as pending, then immediately mark it as open to it
// can be fully visible.
addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
if err := channelState.SyncPending(addr, 9); err != nil {
t.Fatalf("unable to save and serialize channel state: %v", err)
}
err = channelState.MarkAsOpen(lnwire.NewShortChanIDFromInt(99))
if err != nil {
t.Fatalf("unable to mark channel open: %v", err)
}
// Next, attempt to fetch the channel by its chan point.
dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint)
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
// The decoded channel state should be identical to what we stored
// above.
if !reflect.DeepEqual(channelState, dbChannel) {
t.Fatalf("channel state doesn't match:: %v vs %v",
spew.Sdump(channelState), spew.Sdump(dbChannel))
}
// If we attempt to query for a non-exist ante channel, then we should
// get an error.
channelState2, err := createTestChannelState(cdb)
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
channelState2.FundingOutpoint.Index ^= 1
_, err = cdb.FetchChannel(channelState2.FundingOutpoint)
if err == nil {
t.Fatalf("expected query to fail")
}
}
func genRandomChannelShell() (*ChannelShell, error) {
var testPriv [32]byte
if _, err := rand.Read(testPriv[:]); err != nil {
return nil, err
}
_, pub := btcec.PrivKeyFromBytes(btcec.S256(), testPriv[:])
var chanPoint wire.OutPoint
if _, err := rand.Read(chanPoint.Hash[:]); err != nil {
return nil, err
}
pub.Curve = nil
chanPoint.Index = uint32(rand.Intn(math.MaxUint16))
chanStatus := ChanStatusDefault | ChanStatusRestored
var shaChainPriv [32]byte
if _, err := rand.Read(testPriv[:]); err != nil {
return nil, err
}
revRoot, err := chainhash.NewHash(shaChainPriv[:])
if err != nil {
return nil, err
}
shaChainProducer := shachain.NewRevocationProducer(*revRoot)
return &ChannelShell{
NodeAddrs: []net.Addr{&net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}},
Chan: &OpenChannel{
chanStatus: chanStatus,
ChainHash: rev,
FundingOutpoint: chanPoint,
ShortChannelID: lnwire.NewShortChanIDFromInt(
uint64(rand.Int63()),
),
IdentityPub: pub,
LocalChanCfg: ChannelConfig{
ChannelConstraints: ChannelConstraints{
CsvDelay: uint16(rand.Int63()),
},
PaymentBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: keychain.KeyFamily(rand.Int63()),
Index: uint32(rand.Int63()),
},
},
},
RemoteCurrentRevocation: pub,
IsPending: false,
RevocationStore: shachain.NewRevocationStore(),
RevocationProducer: shaChainProducer,
},
}, nil
}
// TestRestoreChannelShells tests that we're able to insert a partially channel
// populated to disk. This is useful for channel recovery purposes. We should
// find the new channel shell on disk, and also the db should be populated with
// an edge for that channel.
func TestRestoreChannelShells(t *testing.T) {
t.Parallel()
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
// First, we'll make our channel shell, it will only have the minimal
// amount of information required for us to initiate the data loss
// protection feature.
channelShell, err := genRandomChannelShell()
if err != nil {
t.Fatalf("unable to gen channel shell: %v", err)
}
graph := cdb.ChannelGraph()
// Before we can restore the channel, we'll need to make a source node
// in the graph as the channel edge we create will need to have a
// origin.
testNode, err := createTestVertex(cdb)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.SetSourceNode(testNode); err != nil {
t.Fatalf("unable to set source node: %v", err)
}
// With the channel shell constructed, we'll now insert it into the
// database with the restoration method.
if err := cdb.RestoreChannelShells(channelShell); err != nil {
t.Fatalf("unable to restore channel shell: %v", err)
}
// Now that the channel has been inserted, we'll attempt to query for
// it to ensure we can properly locate it via various means.
//
// First, we'll attempt to query for all channels that we have with the
// node public key that was restored.
nodeChans, err := cdb.FetchOpenChannels(channelShell.Chan.IdentityPub)
if err != nil {
t.Fatalf("unable find channel: %v", err)
}
// We should now find a single channel from the database.
if len(nodeChans) != 1 {
t.Fatalf("unable to find restored channel by node "+
"pubkey: %v", err)
}
// Ensure that it isn't possible to modify the commitment state machine
// of this restored channel.
channel := nodeChans[0]
err = channel.UpdateCommitment(nil)
if err != ErrNoRestoredChannelMutation {
t.Fatalf("able to mutate restored channel")
}
err = channel.AppendRemoteCommitChain(nil)
if err != ErrNoRestoredChannelMutation {
t.Fatalf("able to mutate restored channel")
}
err = channel.AdvanceCommitChainTail(nil)
if err != ErrNoRestoredChannelMutation {
t.Fatalf("able to mutate restored channel")
}
// That single channel should have the proper channel point, and also
// the expected set of flags to indicate that it was a restored
// channel.
if nodeChans[0].FundingOutpoint != channelShell.Chan.FundingOutpoint {
t.Fatalf("wrong funding outpoint: expected %v, got %v",
nodeChans[0].FundingOutpoint,
channelShell.Chan.FundingOutpoint)
}
if !nodeChans[0].HasChanStatus(ChanStatusRestored) {
t.Fatalf("node has wrong status flags: %v",
nodeChans[0].chanStatus)
}
// We should also be able to find the channel if we query for it
// directly.
_, err = cdb.FetchChannel(channelShell.Chan.FundingOutpoint)
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
// We should also be able to find the link node that was inserted by
// its public key.
linkNode, err := cdb.FetchLinkNode(channelShell.Chan.IdentityPub)
if err != nil {
t.Fatalf("unable to fetch link node: %v", err)
}
// The node should have the same address, as specified in the channel
// shell.
if reflect.DeepEqual(linkNode.Addresses, channelShell.NodeAddrs) {
t.Fatalf("addr mismach: expected %v, got %v",
linkNode.Addresses, channelShell.NodeAddrs)
}
// Finally, we'll ensure that the edge for the channel was properly
// inserted.
chanInfos, err := graph.FetchChanInfos(
[]uint64{channelShell.Chan.ShortChannelID.ToUint64()},
)
if err != nil {
t.Fatalf("unable to find edges: %v", err)
}
if len(chanInfos) != 1 {
t.Fatalf("wrong amount of chan infos: expected %v got %v",
len(chanInfos), 1)
}
// We should only find a single edge.
if chanInfos[0].Policy1 != nil && chanInfos[0].Policy2 != nil {
t.Fatalf("only a single edge should be inserted: %v", err)
}
}

@ -103,6 +103,11 @@ var (
// indicate it should be.
ErrEdgePolicyOptionalFieldNotFound = fmt.Errorf("optional field not " +
"present")
// ErrChanAlreadyExists is return when the caller attempts to create a
// channel with a channel point that is already present in the
// database.
ErrChanAlreadyExists = fmt.Errorf("channel already exists")
)
// ErrTooManyExtraOpaqueBytes creates an error which should be returned if the

@ -483,101 +483,105 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket,
// the channel supports. The chanPoint and chanID are used to uniquely identify
// the edge globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return c.db.Update(func(tx *bbolt.Tx) error {
return c.addChannelEdge(tx, edge)
})
}
// addChannelEdge is the private form of AddChannelEdge that allows callers to
// utilize an existing db transaction.
func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error {
// Construct the channel's primary key which is the 8-byte channel ID.
var chanKey [8]byte
binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
return c.db.Update(func(tx *bbolt.Tx) error {
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil {
return err
}
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return err
}
chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil {
return err
}
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return err
}
chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
if err != nil {
return err
}
// First, attempt to check if this edge has already been created. If
// so, then we can exit early as this method is meant to be idempotent.
if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
return ErrEdgeAlreadyExist
}
// Before we insert the channel into the database, we'll ensure that
// both nodes already exist in the channel graph. If either node
// doesn't, then we'll insert a "shell" node that just includes its
// public key, so subsequent validation and queries can work properly.
_, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
switch {
case node1Err == ErrGraphNodeNotFound:
node1Shell := LightningNode{
PubKeyBytes: edge.NodeKey1Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node1Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey1Bytes)
}
case node1Err != nil:
return err
}
_, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
switch {
case node2Err == ErrGraphNodeNotFound:
node2Shell := LightningNode{
PubKeyBytes: edge.NodeKey2Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node2Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey2Bytes)
}
case node2Err != nil:
return err
}
// If the edge hasn't been created yet, then we'll first add it to the
// edge index in order to associate the edge between two nodes and also
// store the static components of the channel.
if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
return err
}
// Mark edge policies for both sides as unknown. This is to enable
// efficient incoming channel lookup for a node.
for _, key := range []*[33]byte{&edge.NodeKey1Bytes,
&edge.NodeKey2Bytes} {
err := putChanEdgePolicyUnknown(edges, edge.ChannelID,
key[:])
if err != nil {
return err
}
}
// First, attempt to check if this edge has already been
// created. If so, then we can exit early as this method is
// meant to be idempotent.
if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
return ErrEdgeAlreadyExist
}
// Before we insert the channel into the database, we'll ensure
// that both nodes already exist in the channel graph. If
// either node doesn't, then we'll insert a "shell" node that
// just includes its public key, so subsequent validation and
// queries can work properly.
_, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
switch {
case node1Err == ErrGraphNodeNotFound:
node1Shell := LightningNode{
PubKeyBytes: edge.NodeKey1Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node1Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey1Bytes)
}
case node1Err != nil:
return err
}
_, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
switch {
case node2Err == ErrGraphNodeNotFound:
node2Shell := LightningNode{
PubKeyBytes: edge.NodeKey2Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node2Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey2Bytes)
}
case node2Err != nil:
return err
}
// If the edge hasn't been created yet, then we'll first add it
// to the edge index in order to associate the edge between two
// nodes and also store the static components of the channel.
if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
return err
}
// Mark edge policies for both sides as unknown. This is to
// enable efficient incoming channel lookup for a node.
for _, key := range []*[33]byte{&edge.NodeKey1Bytes,
&edge.NodeKey2Bytes} {
err := putChanEdgePolicyUnknown(edges, edge.ChannelID,
key[:])
if err != nil {
return err
}
}
// Finally we add it to the channel index which maps channel
// points (outpoints) to the shorter channel ID's.
var b bytes.Buffer
if err := writeOutpoint(&b, &edge.ChannelPoint); err != nil {
return err
}
return chanIndex.Put(b.Bytes(), chanKey[:])
})
// Finally we add it to the channel index which maps channel points
// (outpoints) to the shorter channel ID's.
var b bytes.Buffer
if err := writeOutpoint(&b, &edge.ChannelPoint); err != nil {
return err
}
return chanIndex.Put(b.Bytes(), chanKey[:])
}
// HasChannelEdge returns true if the database knows of a channel edge with the
@ -1639,28 +1643,26 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket,
// the nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
return c.db.Update(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrEdgeNotFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrEdgeNotFound
}
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
return updateEdgePolicy(edges, edgeIndex, nodes, edge)
return updateEdgePolicy(tx, edge)
})
}
// updateEdgePolicy attempts to update an edge's policy within the relevant
// buckets using an existing database transaction.
func updateEdgePolicy(edges, edgeIndex, nodes *bbolt.Bucket,
edge *ChannelEdgePolicy) error {
func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrEdgeNotFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrEdgeNotFound
}
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
// Create the channelID key be converting the channel ID
// integer into a byte slice.

@ -563,7 +563,7 @@ func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error {
return err
}
err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy)
err = updateEdgePolicy(tx, edgePolicy)
if err != nil {
return err
}

@ -62,13 +62,13 @@ type LinkNode struct {
// NewLinkNode creates a new LinkNode from the provided parameters, which is
// backed by an instance of channeldb.
func (db *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey,
addr net.Addr) *LinkNode {
addrs ...net.Addr) *LinkNode {
return &LinkNode{
Network: bitNet,
IdentityPub: pub,
LastSeen: time.Now(),
Addresses: []net.Addr{addr},
Addresses: addrs,
db: db,
}
}
@ -149,40 +149,44 @@ func (db *DB) deleteLinkNode(tx *bbolt.Tx, identity *btcec.PublicKey) error {
// identity public key. If a particular LinkNode for the passed identity public
// key cannot be found, then ErrNodeNotFound if returned.
func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
var (
node *LinkNode
err error
)
err = db.View(func(tx *bbolt.Tx) error {
// First fetch the bucket for storing node metadata, bailing
// out early if it hasn't been created yet.
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeMetaBucket == nil {
return ErrLinkNodesNotFound
var linkNode *LinkNode
err := db.View(func(tx *bbolt.Tx) error {
node, err := fetchLinkNode(tx, identity)
if err != nil {
return err
}
// If a link node for that particular public key cannot be
// located, then exit early with an ErrNodeNotFound.
pubKey := identity.SerializeCompressed()
nodeBytes := nodeMetaBucket.Get(pubKey)
if nodeBytes == nil {
return ErrNodeNotFound
}
// Finally, decode an allocate a fresh LinkNode object to be
// returned to the caller.
nodeReader := bytes.NewReader(nodeBytes)
node, err = deserializeLinkNode(nodeReader)
return err
linkNode = node
return nil
})
if err != nil {
return nil, err
return linkNode, err
}
func fetchLinkNode(tx *bbolt.Tx, targetPub *btcec.PublicKey) (*LinkNode, error) {
// First fetch the bucket for storing node metadata, bailing out early
// if it hasn't been created yet.
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeMetaBucket == nil {
return nil, ErrLinkNodesNotFound
}
return node, nil
// If a link node for that particular public key cannot be located,
// then exit early with an ErrNodeNotFound.
pubKey := targetPub.SerializeCompressed()
nodeBytes := nodeMetaBucket.Get(pubKey)
if nodeBytes == nil {
return nil, ErrNodeNotFound
}
// Finally, decode and allocate a fresh LinkNode object to be returned
// to the caller.
nodeReader := bytes.NewReader(nodeBytes)
return deserializeLinkNode(nodeReader)
}
// TODO(roasbeef): update link node addrs in server upon connection
// FetchAllLinkNodes starts a new database transaction to fetch all nodes with
// whom we have active channels with.
func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {

@ -224,23 +224,10 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
// With the channels fetched, attempt to locate
// the target channel according to its channel
// point.
dbChannels, err := c.chanSource.FetchAllChannels()
channel, err := c.chanSource.FetchChannel(chanPoint)
if err != nil {
return nil, err
}
var channel *channeldb.OpenChannel
for _, dbChannel := range dbChannels {
if dbChannel.FundingOutpoint == chanPoint {
channel = dbChannel
break
}
}
// If the channel cannot be located, then we
// exit with an error to the channel.
if channel == nil {
return nil, fmt.Errorf("unable to find channel")
}
chanMachine, err := lnwallet.NewLightningChannel(
c.cfg.Signer, c.cfg.PreimageDB, channel, nil,

@ -5640,14 +5640,24 @@ func (lc *LightningChannel) ForceClose() (*LocalForceCloseSummary, error) {
lc.Lock()
defer lc.Unlock()
// If we've detected local data loss for this channel, then we won't
// allow a force close, as it may be the case that we have a dated
// version of the commitment, or this is actually a channel shell.
if lc.channelState.HasChanStatus(channeldb.ChanStatusLocalDataLoss) {
return nil, fmt.Errorf("cannot force close channel with "+
"state: %v", lc.channelState.ChanStatus())
}
commitTx, err := lc.getSignedCommitTx()
if err != nil {
return nil, err
}
localCommitment := lc.channelState.LocalCommitment
summary, err := NewLocalForceCloseSummary(lc.channelState,
lc.Signer, lc.pCache, commitTx, localCommitment)
summary, err := NewLocalForceCloseSummary(
lc.channelState, lc.Signer, lc.pCache, commitTx,
localCommitment,
)
if err != nil {
return nil, err
}

@ -17,6 +17,7 @@ import (
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
)
@ -6274,3 +6275,33 @@ func TestChannelRestoreCommitHeight(t *testing.T) {
bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 0, 2, 1)
bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 1, 2, 2)
}
// TestForceCloseFailLocalDataLoss tests that we don't allow a force close of a
// channel that's in a non-default state.
func TestForceCloseFailLocalDataLoss(t *testing.T) {
t.Parallel()
aliceChannel, _, cleanUp, err := CreateTestChannels()
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUp()
// Now that we have our set of channels, we'll modify the channel state
// to have a non-default channel flag.
err = aliceChannel.channelState.ApplyChanStatus(
channeldb.ChanStatusLocalDataLoss,
)
if err != nil {
t.Fatalf("unable to apply channel state: %v", err)
}
// Due to the change above, if we attempt to force close this
// channel, we should fail as it isn't safe to force close a
// channel that isn't in the pure default state.
_, err = aliceChannel.ForceClose()
if err == nil {
t.Fatalf("expected force close to fail due to non-default " +
"chan state")
}
}

@ -409,7 +409,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
// Skip adding any permanently irreconcilable channels to the
// htlcswitch.
if dbChan.ChanStatus() != channeldb.Default {
if dbChan.ChanStatus() != channeldb.ChanStatusDefault {
peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+
"start.", chanPoint, dbChan.ChanStatus())
continue

@ -1709,7 +1709,7 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := r.fetchOpenDbChannel(*chanPoint)
dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
if err != nil {
return nil, err
}
@ -1746,42 +1746,13 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
return &lnrpc.AbandonChannelResponse{}, nil
}
// fetchOpenDbChannel attempts to locate a channel identified by its channel
// point from the database's set of all currently opened channels.
func (r *rpcServer) fetchOpenDbChannel(chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error) {
dbChannels, err := r.server.chanDB.FetchAllChannels()
if err != nil {
return nil, err
}
// With the channels fetched, attempt to locate the target channel
// according to its channel point.
var dbChan *channeldb.OpenChannel
for _, dbChannel := range dbChannels {
if dbChannel.FundingOutpoint == chanPoint {
dbChan = dbChannel
break
}
}
// If the channel cannot be located, then we exit with an error to the
// caller.
if dbChan == nil {
return nil, fmt.Errorf("unable to find channel")
}
return dbChan, nil
}
// fetchActiveChannel attempts to locate a channel identified by its channel
// point from the database's set of all currently opened channels and
// return it as a fully populated state machine
func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) (
*lnwallet.LightningChannel, error) {
dbChan, err := r.fetchOpenDbChannel(chanPoint)
dbChan, err := r.server.chanDB.FetchChannel(chanPoint)
if err != nil {
return nil, err
}