From 7a88f580eac7bc4a13a559bddc1881cbc0c57016 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:26:02 -0800 Subject: [PATCH] channeldb: add new FetchChannelMethod In this commit, we add a new method, FetchChannel which is required in order to implement the new chanbackup.LiveChannelSource interface. --- channeldb/db.go | 96 ++++++++++++++++++++++++++++++++++++++++++++ channeldb/db_test.go | 61 ++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) diff --git a/channeldb/db.go b/channeldb/db.go index 74b712f1..8e27b021 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -408,6 +408,102 @@ func (d *DB) fetchNodeChannels(chainBucket *bbolt.Bucket) ([]*OpenChannel, error return channels, nil } +// FetchChannel attempts to locate a channel specified by the passed channel +// point. If the channel cannot be found, then an error will be returned. +func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) { + var ( + targetChan *OpenChannel + targetChanPoint bytes.Buffer + ) + + if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil { + return nil, err + } + + // chanScan will traverse the following bucket structure: + // * nodePub => chainHash => chanPoint + // + // At each level we go one further, ensuring that we're traversing the + // proper key (that's actually a bucket). By only reading the bucket + // structure and skipping fully decoding each channel, we save a good + // bit of CPU as we don't need to do things like decompress public + // keys. + chanScan := func(tx *bbolt.Tx) error { + // Get the bucket dedicated to storing the metadata for open + // channels. + openChanBucket := tx.Bucket(openChannelBucket) + if openChanBucket == nil { + return ErrNoActiveChannels + } + + // Within the node channel bucket, are the set of node pubkeys + // we have channels with, we don't know the entire set, so + // we'll check them all. + return openChanBucket.ForEach(func(nodePub, v []byte) error { + // Ensure that this is a key the same size as a pubkey, + // and also that it leads directly to a bucket. + if len(nodePub) != 33 || v != nil { + return nil + } + + nodeChanBucket := openChanBucket.Bucket(nodePub) + if nodeChanBucket == nil { + return nil + } + + // The next layer down is all the chains that this node + // has channels on with us. + return nodeChanBucket.ForEach(func(chainHash, v []byte) error { + // If there's a value, it's not a bucket so + // ignore it. + if v != nil { + return nil + } + + chainBucket := nodeChanBucket.Bucket(chainHash) + if chainBucket == nil { + return fmt.Errorf("unable to read "+ + "bucket for chain=%x", chainHash[:]) + } + + // Finally we reach the leaf bucket that stores + // all the chanPoints for this node. + chanBucket := chainBucket.Bucket( + targetChanPoint.Bytes(), + ) + if chanBucket == nil { + return nil + } + + channel, err := fetchOpenChannel( + chanBucket, &chanPoint, + ) + if err != nil { + return err + } + + targetChan = channel + targetChan.Db = d + + return nil + }) + }) + } + + err := d.View(chanScan) + if err != nil { + return nil, err + } + + if targetChan != nil { + return targetChan, nil + } + + // If we can't find the channel, then we return with an error, as we + // have nothing to backup. + return nil, ErrChannelNotFound +} + // FetchAllChannels attempts to retrieve all open channels currently stored // within the database, including pending open, fully open and channels waiting // for a closing transaction to confirm. diff --git a/channeldb/db_test.go b/channeldb/db_test.go index dbd255e5..17018333 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -5,10 +5,12 @@ import ( "net" "os" "path/filepath" + "reflect" "testing" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/lnwire" ) @@ -210,3 +212,62 @@ func TestAddrsForNode(t *testing.T) { } } } + +// TestFetchChannel tests that we're able to fetch an arbitrary channel from +// disk. +func TestFetchChannel(t *testing.T) { + t.Parallel() + + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + defer cleanUp() + + // Create the test channel state that we'll sync to the database + // shortly. + channelState, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + + // Mark the channel as pending, then immediately mark it as open to it + // can be fully visible. + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + } + if err := channelState.SyncPending(addr, 9); err != nil { + t.Fatalf("unable to save and serialize channel state: %v", err) + } + err = channelState.MarkAsOpen(lnwire.NewShortChanIDFromInt(99)) + if err != nil { + t.Fatalf("unable to mark channel open: %v", err) + } + + // Next, attempt to fetch the channel by its chan point. + dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint) + if err != nil { + t.Fatalf("unable to fetch channel: %v", err) + } + + // The decoded channel state should be identical to what we stored + // above. + if !reflect.DeepEqual(channelState, dbChannel) { + t.Fatalf("channel state doesn't match:: %v vs %v", + spew.Sdump(channelState), spew.Sdump(dbChannel)) + } + + // If we attempt to query for a non-exist ante channel, then we should + // get an error. + channelState2, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + channelState2.FundingOutpoint.Index ^= 1 + + _, err = cdb.FetchChannel(channelState2.FundingOutpoint) + if err == nil { + t.Fatalf("expected query to fail") + } +}