channeldb: update FetchOpenChannel to traverse new chain bucket

This commit is contained in:
Olaoluwa Osuntokun 2017-11-09 20:57:09 -08:00
parent da7a5f7c4e
commit 385818307b
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -9,7 +9,6 @@ import (
"sync" "sync"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
) )
@ -223,65 +222,80 @@ func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error)
return nil return nil
} }
// Within this top level bucket, fetch the bucket dedicated to storing // Within this top level bucket, fetch the bucket dedicated to
// open channel data specific to the remote node. // storing open channel data specific to the remote node.
pub := nodeID.SerializeCompressed() pub := nodeID.SerializeCompressed()
nodeChanBucket := openChanBucket.Bucket(pub) nodeChanBucket := openChanBucket.Bucket(pub)
if nodeChanBucket == nil { if nodeChanBucket == nil {
return nil return nil
} }
// Finally, we both of the necessary buckets retrieved, fetch // Next, we'll need to go down an additional layer in order to
// all the active channels related to this node. // retrieve the channels for each chain the node knows of.
nodeChannels, err := d.fetchNodeChannels(openChanBucket, return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
nodeChanBucket) // If there's a value, it's not a bucket so ignore it.
if err != nil { if v != nil {
return fmt.Errorf("unable to read channel for "+ return nil
"node_key=%x: %v", pub, err) }
}
channels = nodeChannels // If we've found a valid chainhash bucket, then we'll
return nil // retrieve that so we can extract all the channels.
chainBucket := nodeChanBucket.Bucket(chainHash)
if chainBucket == nil {
return fmt.Errorf("unable to read bucket for "+
"chain=%x", chainHash[:])
}
// Finally, we both of the necessary buckets retrieved,
// fetch all the active channels related to this node.
nodeChannels, err := d.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read channel for "+
"chain_hash=%x, node_key=%x: %v",
chainHash[:], pub, err)
}
channels = nodeChannels
return nil
})
}) })
return channels, err return channels, err
} }
// fetchNodeChannels retrieves all active channels from the target // fetchNodeChannels retrieves all active channels from the target chainBucket
// nodeChanBucket. This function is typically used to fetch all the active // which is under a node's dedicated channel bucket. This function is typically
// channels related to a particular node. // used to fetch all the active channels related to a particular node.
func (d *DB) fetchNodeChannels(openChanBucket, func (d *DB) fetchNodeChannels(chainBucket *bolt.Bucket) ([]*OpenChannel, error) {
nodeChanBucket *bolt.Bucket) ([]*OpenChannel, error) {
var channels []*OpenChannel var channels []*OpenChannel
// Once we have the node's channel bucket, iterate through each // A node may have channels on several chains, so for each known chain,
// item in the inner chan ID bucket. This bucket acts as an // we'll extract all the channels.
// index for all channels we currently have open with this node. err := chainBucket.ForEach(func(chanPoint, v []byte) error {
nodeChanIDBucket := nodeChanBucket.Bucket(chanIDBucket[:]) // If there's a value, it's not a bucket so ignore it.
if nodeChanIDBucket == nil { if v != nil {
return nil, nil
}
err := nodeChanIDBucket.ForEach(func(k, v []byte) error {
if k == nil {
return nil return nil
} }
outBytes := bytes.NewReader(k) // Once we've found a valid channel bucket, we'll extract it
chanID := &wire.OutPoint{} // from the node's chain bucket.
if err := readOutpoint(outBytes, chanID); err != nil { chanBucket := chainBucket.Bucket(chanPoint)
var outPoint wire.OutPoint
err := readOutpoint(bytes.NewReader(chanPoint), &outPoint)
if err != nil {
return err return err
} }
oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
oChannel, err := fetchOpenChannel(openChanBucket,
nodeChanBucket, chanID)
if err != nil { if err != nil {
return fmt.Errorf("unable to read channel data for "+ return fmt.Errorf("unable to read channel data for "+
"chan_point=%v: %v", chanID, err) "chan_point=%v: %v", outPoint, err)
} }
oChannel.Db = d oChannel.Db = d
channels = append(channels, oChannel) channels = append(channels, oChannel)
return nil return nil
}) })
if err != nil { if err != nil {
@ -297,17 +311,17 @@ func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {
return fetchChannels(d, false) return fetchChannels(d, false)
} }
// FetchPendingChannels will return channels that have completed the process // FetchPendingChannels will return channels that have completed the process of
// of generating and broadcasting funding transactions, but whose funding // generating and broadcasting funding transactions, but whose funding
// transactions have yet to be confirmed on the blockchain. // transactions have yet to be confirmed on the blockchain.
func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) { func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) {
return fetchChannels(d, true) return fetchChannels(d, true)
} }
// fetchChannels attempts to retrieve channels currently stored in the // fetchChannels attempts to retrieve channels currently stored in the
// database. The pendingOnly parameter determines whether only pending // database. The pendingOnly parameter determines whether only pending channels
// channels will be returned. If no active channels exist within the network, // will be returned. If no active channels exist within the network, then
// then ErrNoActiveChannels is returned. // ErrNoActiveChannels is returned.
func fetchChannels(d *DB, pendingOnly bool) ([]*OpenChannel, error) { func fetchChannels(d *DB, pendingOnly bool) ([]*OpenChannel, error) {
var channels []*OpenChannel var channels []*OpenChannel
@ -319,39 +333,57 @@ func fetchChannels(d *DB, pendingOnly bool) ([]*OpenChannel, error) {
return ErrNoActiveChannels return ErrNoActiveChannels
} }
// Next, fetch the bucket dedicated to storing metadata // Next, fetch the bucket dedicated to storing metadata related
// related to all nodes. All keys within this bucket are the // to all nodes. All keys within this bucket are the serialized
// serialized public keys of all our direct counterparties. // public keys of all our direct counterparties.
nodeMetaBucket := tx.Bucket(nodeInfoBucket) nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeMetaBucket == nil { if nodeMetaBucket == nil {
return fmt.Errorf("node bucket not created") return fmt.Errorf("node bucket not created")
} }
// Finally for each node public key in the bucket, fetch all // Finally for each node public key in the bucket, fetch all
// the channels related to this particualr ndoe. // the channels related to this particular node.
return nodeMetaBucket.ForEach(func(k, v []byte) error { return nodeMetaBucket.ForEach(func(k, v []byte) error {
nodeChanBucket := openChanBucket.Bucket(k) nodeChanBucket := openChanBucket.Bucket(k)
if nodeChanBucket == nil { if nodeChanBucket == nil {
return nil return nil
} }
nodeChannels, err := d.fetchNodeChannels(openChanBucket, return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
nodeChanBucket) // If there's a value, it's not a bucket so
if err != nil { // ignore it.
return fmt.Errorf("unable to read channel for "+ if v != nil {
"node_key=%x: %v", k, err) return nil
}
// TODO(roasbeef): simplify
if pendingOnly {
for _, channel := range nodeChannels {
if channel.IsPending {
channels = append(channels, channel)
}
} }
} else {
channels = append(channels, nodeChannels...) // If we've found a valid chainhash bucket,
} // then we'll retrieve that so we can extract
return nil // all the channels.
chainBucket := nodeChanBucket.Bucket(chainHash)
if chainBucket == nil {
return fmt.Errorf("unable to read "+
"bucket for chain=%x", chainHash[:])
}
nodeChans, err := d.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read "+
"channel for chain_hash=%x, "+
"node_key=%x: %v", chainHash[:], k, err)
}
// TODO(roasbeef): simplify
if pendingOnly {
for _, channel := range nodeChans {
if channel.IsPending {
channels = append(channels, channel)
}
}
} else {
channels = append(channels, nodeChans...)
}
return nil
})
}) })
}) })