channeldb: add new FetchChannelMethod
In this commit, we add a new method, FetchChannel which is required in order to implement the new chanbackup.LiveChannelSource interface.
This commit is contained in:
parent
aaf6456e12
commit
7a88f580ea
@ -408,6 +408,102 @@ func (d *DB) fetchNodeChannels(chainBucket *bbolt.Bucket) ([]*OpenChannel, error
|
|||||||
return channels, nil
|
return channels, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FetchChannel attempts to locate a channel specified by the passed channel
|
||||||
|
// point. If the channel cannot be found, then an error will be returned.
|
||||||
|
func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) {
|
||||||
|
var (
|
||||||
|
targetChan *OpenChannel
|
||||||
|
targetChanPoint bytes.Buffer
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// chanScan will traverse the following bucket structure:
|
||||||
|
// * nodePub => chainHash => chanPoint
|
||||||
|
//
|
||||||
|
// At each level we go one further, ensuring that we're traversing the
|
||||||
|
// proper key (that's actually a bucket). By only reading the bucket
|
||||||
|
// structure and skipping fully decoding each channel, we save a good
|
||||||
|
// bit of CPU as we don't need to do things like decompress public
|
||||||
|
// keys.
|
||||||
|
chanScan := func(tx *bbolt.Tx) error {
|
||||||
|
// Get the bucket dedicated to storing the metadata for open
|
||||||
|
// channels.
|
||||||
|
openChanBucket := tx.Bucket(openChannelBucket)
|
||||||
|
if openChanBucket == nil {
|
||||||
|
return ErrNoActiveChannels
|
||||||
|
}
|
||||||
|
|
||||||
|
// Within the node channel bucket, are the set of node pubkeys
|
||||||
|
// we have channels with, we don't know the entire set, so
|
||||||
|
// we'll check them all.
|
||||||
|
return openChanBucket.ForEach(func(nodePub, v []byte) error {
|
||||||
|
// Ensure that this is a key the same size as a pubkey,
|
||||||
|
// and also that it leads directly to a bucket.
|
||||||
|
if len(nodePub) != 33 || v != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeChanBucket := openChanBucket.Bucket(nodePub)
|
||||||
|
if nodeChanBucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// The next layer down is all the chains that this node
|
||||||
|
// has channels on with us.
|
||||||
|
return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
|
||||||
|
// If there's a value, it's not a bucket so
|
||||||
|
// ignore it.
|
||||||
|
if v != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
chainBucket := nodeChanBucket.Bucket(chainHash)
|
||||||
|
if chainBucket == nil {
|
||||||
|
return fmt.Errorf("unable to read "+
|
||||||
|
"bucket for chain=%x", chainHash[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally we reach the leaf bucket that stores
|
||||||
|
// all the chanPoints for this node.
|
||||||
|
chanBucket := chainBucket.Bucket(
|
||||||
|
targetChanPoint.Bytes(),
|
||||||
|
)
|
||||||
|
if chanBucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
channel, err := fetchOpenChannel(
|
||||||
|
chanBucket, &chanPoint,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
targetChan = channel
|
||||||
|
targetChan.Db = d
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
err := d.View(chanScan)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if targetChan != nil {
|
||||||
|
return targetChan, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we can't find the channel, then we return with an error, as we
|
||||||
|
// have nothing to backup.
|
||||||
|
return nil, ErrChannelNotFound
|
||||||
|
}
|
||||||
|
|
||||||
// FetchAllChannels attempts to retrieve all open channels currently stored
|
// FetchAllChannels attempts to retrieve all open channels currently stored
|
||||||
// within the database, including pending open, fully open and channels waiting
|
// within the database, including pending open, fully open and channels waiting
|
||||||
// for a closing transaction to confirm.
|
// for a closing transaction to confirm.
|
||||||
|
@ -5,10 +5,12 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/btcsuite/btcutil"
|
"github.com/btcsuite/btcutil"
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -210,3 +212,62 @@ func TestAddrsForNode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestFetchChannel tests that we're able to fetch an arbitrary channel from
|
||||||
|
// disk.
|
||||||
|
func TestFetchChannel(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
cdb, cleanUp, err := makeTestDB()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to make test database: %v", err)
|
||||||
|
}
|
||||||
|
defer cleanUp()
|
||||||
|
|
||||||
|
// Create the test channel state that we'll sync to the database
|
||||||
|
// shortly.
|
||||||
|
channelState, err := createTestChannelState(cdb)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create channel state: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark the channel as pending, then immediately mark it as open to it
|
||||||
|
// can be fully visible.
|
||||||
|
addr := &net.TCPAddr{
|
||||||
|
IP: net.ParseIP("127.0.0.1"),
|
||||||
|
Port: 18555,
|
||||||
|
}
|
||||||
|
if err := channelState.SyncPending(addr, 9); err != nil {
|
||||||
|
t.Fatalf("unable to save and serialize channel state: %v", err)
|
||||||
|
}
|
||||||
|
err = channelState.MarkAsOpen(lnwire.NewShortChanIDFromInt(99))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to mark channel open: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next, attempt to fetch the channel by its chan point.
|
||||||
|
dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch channel: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The decoded channel state should be identical to what we stored
|
||||||
|
// above.
|
||||||
|
if !reflect.DeepEqual(channelState, dbChannel) {
|
||||||
|
t.Fatalf("channel state doesn't match:: %v vs %v",
|
||||||
|
spew.Sdump(channelState), spew.Sdump(dbChannel))
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we attempt to query for a non-exist ante channel, then we should
|
||||||
|
// get an error.
|
||||||
|
channelState2, err := createTestChannelState(cdb)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create channel state: %v", err)
|
||||||
|
}
|
||||||
|
channelState2.FundingOutpoint.Index ^= 1
|
||||||
|
|
||||||
|
_, err = cdb.FetchChannel(channelState2.FundingOutpoint)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected query to fail")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user