channeldb: add HasLightningNode+HasChannelEdge methods to ChannelGraph

This commit adds to new functions to the ChannelGraph struct which
allow the callers to query for the existence or non-existence of a
vertex (node) or edge (channel) within the graph. In addition to
returning whether the edge exists, the functions will also return the
last time the state has been modified for the edge or vertex. This will
allow callers to ensure that only the most up to date state is
committed to disk.
This commit is contained in:
Olaoluwa Osuntokun 2016-12-22 12:49:30 -08:00
parent 782fc61dc1
commit 000c334e63
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 131 additions and 44 deletions

@ -456,6 +456,8 @@ type ChannelDelta struct {
RemoteBalance btcutil.Amount RemoteBalance btcutil.Amount
UpdateNum uint32 UpdateNum uint32
// TODO(roasbeef): add blockhash or timestamp?
Htlcs []*HTLC Htlcs []*HTLC
} }
@ -690,8 +692,9 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
channel *OpenChannel) error { channel *OpenChannel) error {
// First write out all the "common" fields using the field's prefix // First write out all the "common" fields using the field's prefix
// appened with the channel's ID. These fields go into a top-level bucket // append with the channel's ID. These fields go into a top-level
// to allow for ease of metric aggregation via efficient prefix scans. // bucket to allow for ease of metric aggregation via efficient prefix
// scans.
if err := putChanCapacity(openChanBucket, channel); err != nil { if err := putChanCapacity(openChanBucket, channel); err != nil {
return err return err
} }

@ -212,9 +212,9 @@ func (c *ChannelGraph) ForEachNode(cb func(*LightningNode) error) error {
// as the center node within a star-graph. This method may be used to kick-off // as the center node within a star-graph. This method may be used to kick-off
// a path finding algorithm in order to explore the reachability of another // a path finding algorithm in order to explore the reachability of another
// node based off the source node. // node based off the source node.
func (r *ChannelGraph) SourceNode() (*LightningNode, error) { func (c *ChannelGraph) SourceNode() (*LightningNode, error) {
var source *LightningNode var source *LightningNode
err := r.db.View(func(tx *bolt.Tx) error { err := c.db.View(func(tx *bolt.Tx) error {
// First grab the nodes bucket which stores the mapping from // First grab the nodes bucket which stores the mapping from
// pubKey to node information. // pubKey to node information.
nodes := tx.Bucket(nodeBucket) nodes := tx.Bucket(nodeBucket)
@ -235,7 +235,7 @@ func (r *ChannelGraph) SourceNode() (*LightningNode, error) {
} }
source = node source = node
source.db = r.db source.db = c.db
return nil return nil
}) })
if err != nil { if err != nil {
@ -248,9 +248,9 @@ func (r *ChannelGraph) SourceNode() (*LightningNode, error) {
// SetSourceNode sets the source node within the graph database. The source // SetSourceNode sets the source node within the graph database. The source
// node is to be used as the center of a star-graph within path finding // node is to be used as the center of a star-graph within path finding
// algorithms. // algorithms.
func (r *ChannelGraph) SetSourceNode(node *LightningNode) error { func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
nodePub := node.PubKey.SerializeCompressed() nodePub := node.PubKey.SerializeCompressed()
return r.db.Update(func(tx *bolt.Tx) error { return c.db.Update(func(tx *bolt.Tx) error {
// First grab the nodes bucket which stores the mapping from // First grab the nodes bucket which stores the mapping from
// pubKey to node information. // pubKey to node information.
nodes, err := tx.CreateBucketIfNotExists(nodeBucket) nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
@ -273,8 +273,9 @@ func (r *ChannelGraph) SetSourceNode(node *LightningNode) error {
// AddLightningNode adds a new (unconnected) vertex/node to the graph database. // AddLightningNode adds a new (unconnected) vertex/node to the graph database.
// When adding an edge, each node must be added before the edge can be // When adding an edge, each node must be added before the edge can be
// inserted. Afterwards the edge information can then be updated. // inserted. Afterwards the edge information can then be updated.
func (r *ChannelGraph) AddLightningNode(node *LightningNode) error { // TODO(roasbeef): also need sig of announcement
return r.db.Update(func(tx *bolt.Tx) error { func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
return c.db.Update(func(tx *bolt.Tx) error {
return addLightningNode(tx, node) return addLightningNode(tx, node)
}) })
} }
@ -295,10 +296,10 @@ func addLightningNode(tx *bolt.Tx, node *LightningNode) error {
// LookupAlias attempts to return the alias as advertised by the target node. // LookupAlias attempts to return the alias as advertised by the target node.
// TODO(roasbeef): currently assumes that aliases are unique... // TODO(roasbeef): currently assumes that aliases are unique...
func (r *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) { func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
var alias string var alias string
err := r.db.View(func(tx *bolt.Tx) error { err := c.db.View(func(tx *bolt.Tx) error {
nodes := tx.Bucket(nodeBucket) nodes := tx.Bucket(nodeBucket)
if nodes == nil { if nodes == nil {
return ErrGraphNodesNotFound return ErrGraphNodesNotFound
@ -329,11 +330,11 @@ func (r *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
// DeleteLightningNode removes a vertex/node from the database according to the // DeleteLightningNode removes a vertex/node from the database according to the
// node's public key. // node's public key.
func (r *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error { func (c *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error {
pub := nodePub.SerializeCompressed() pub := nodePub.SerializeCompressed()
// TODO(roasbeef): ensure dangling edges are removed... // TODO(roasbeef): ensure dangling edges are removed...
return r.db.Update(func(tx *bolt.Tx) error { return c.db.Update(func(tx *bolt.Tx) error {
nodes, err := tx.CreateBucketIfNotExists(nodeBucket) nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil { if err != nil {
return err return err
@ -354,7 +355,7 @@ func (r *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error {
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An // AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
// undirected edge from the two target nodes are created. The chanPoint and // undirected edge from the two target nodes are created. The chanPoint and
// chanID are used to uniquely identify the edge globally within the database. // chanID are used to uniquely identify the edge globally within the database.
func (r *ChannelGraph) AddChannelEdge(from, to *btcec.PublicKey, func (c *ChannelGraph) AddChannelEdge(from, to *btcec.PublicKey,
chanPoint *wire.OutPoint, chanID uint64) error { chanPoint *wire.OutPoint, chanID uint64) error {
// Construct the channel's primary key which is the 8-byte channel ID. // Construct the channel's primary key which is the 8-byte channel ID.
@ -380,7 +381,7 @@ func (r *ChannelGraph) AddChannelEdge(from, to *btcec.PublicKey,
node2 = fromBytes node2 = fromBytes
} }
return r.db.Update(func(tx *bolt.Tx) error { return c.db.Update(func(tx *bolt.Tx) error {
edges, err := tx.CreateBucketIfNotExists(edgeBucket) edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil { if err != nil {
return err return err
@ -422,13 +423,19 @@ func (r *ChannelGraph) AddChannelEdge(from, to *btcec.PublicKey,
} }
// HasChannelEdge returns true if the database knows of a channel edge with the // HasChannelEdge returns true if the database knows of a channel edge with the
// passed channel ID, and false otherwise. // passed channel ID, and false otherwise. If the an edge with that ID is found
func (r *ChannelGraph) HasChannelEdge(chanID uint64) (bool, error) { // within the graph, then two time stamps representing the last time the edge
// was updated for both directed edges are returned along with the boolean.
func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool, error) {
// TODO(roasbeef): check internal bloom filter first // TODO(roasbeef): check internal bloom filter first
var b bool var (
node1UpdateTime time.Time
node2UpdateTime time.Time
exists bool
)
err := r.db.View(func(tx *bolt.Tx) error { err := c.db.View(func(tx *bolt.Tx) error {
edges := tx.Bucket(edgeBucket) edges := tx.Bucket(edgeBucket)
if edges == nil { if edges == nil {
return ErrGraphNoEdgesFound return ErrGraphNoEdgesFound
@ -440,17 +447,39 @@ func (r *ChannelGraph) HasChannelEdge(chanID uint64) (bool, error) {
var channelID [8]byte var channelID [8]byte
byteOrder.PutUint64(channelID[:], chanID) byteOrder.PutUint64(channelID[:], chanID)
if edgeIndex.Get(channelID[:]) != nil { if edgeIndex.Get(channelID[:]) == nil {
b = true exists = false
return nil
} }
// If the channel has been found in the graph, then retrieve
// the edges itself so we can return the last updated
// timestmaps.
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNodeNotFound
}
e1, e2, err := fetchEdges(edgeIndex, edges, nodes,
channelID[:], c.db)
if err != nil {
// TODO(roasbeef): hack fix to return false until both
// edges are populated
exists = false
return nil
}
node1UpdateTime = e1.LastUpdate
node2UpdateTime = e2.LastUpdate
exists = true
return nil return nil
}) })
if err != nil { if err != nil {
return b, err return time.Time{}, time.Time{}, exists, err
} }
return b, err return node1UpdateTime, node2UpdateTime, exists, err
} }
const ( const (
@ -463,17 +492,17 @@ const (
// PruneGraph prunes newly closed channels from the channel graph in response // PruneGraph prunes newly closed channels from the channel graph in response
// to a new block being solved on the network. Any transactions which spend the // to a new block being solved on the network. Any transactions which spend the
// funding output of any known channels withint he graph will be deleted. // funding output of any known channels within he graph will be deleted.
// Additionally, the "prune tip", or the last block which has been used to // Additionally, the "prune tip", or the last block which has been used to
// prune the graph is stored so callers can ensure the graph is fully in sync // prune the graph is stored so callers can ensure the graph is fully in sync
// with the current UTXO state. An integer is returned which reflects the // with the current UTXO state. An integer is returned which reflects the
// number of channels pruned due to the new incoming block. // number of channels pruned due to the new incoming block.
func (r *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
blockHash *wire.ShaHash, blockHeight uint32) (uint32, error) { blockHash *wire.ShaHash, blockHeight uint32) (uint32, error) {
var numChans uint32 var numChans uint32
err := r.db.Update(func(tx *bolt.Tx) error { err := c.db.Update(func(tx *bolt.Tx) error {
// First grab the edges bucket which houses the information // First grab the edges bucket which houses the information
// we'd like to delete // we'd like to delete
edges, err := tx.CreateBucketIfNotExists(edgeBucket) edges, err := tx.CreateBucketIfNotExists(edgeBucket)
@ -536,14 +565,14 @@ func (r *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
// used to prune channels in the graph. Knowing the "prune tip" allows callers // used to prune channels in the graph. Knowing the "prune tip" allows callers
// to tell if the graph is currently in sync with the current best known UTXO // to tell if the graph is currently in sync with the current best known UTXO
// state. // state.
func (r *ChannelGraph) PruneTip() (*wire.ShaHash, uint32, error) { func (c *ChannelGraph) PruneTip() (*wire.ShaHash, uint32, error) {
var ( var (
currentTip [pruneTipBytes]byte currentTip [pruneTipBytes]byte
tipHash wire.ShaHash tipHash wire.ShaHash
tipHeight uint32 tipHeight uint32
) )
err := r.db.View(func(tx *bolt.Tx) error { err := c.db.View(func(tx *bolt.Tx) error {
graphMeta := tx.Bucket(graphMetaBucket) graphMeta := tx.Bucket(graphMetaBucket)
if graphMeta == nil { if graphMeta == nil {
return ErrGraphNotFound return ErrGraphNotFound
@ -571,12 +600,12 @@ func (r *ChannelGraph) PruneTip() (*wire.ShaHash, uint32, error) {
// DeleteChannelEdge removes an edge from the database as identified by it's // DeleteChannelEdge removes an edge from the database as identified by it's
// funding outpoint. If the edge does not exist within the database, then this // funding outpoint. If the edge does not exist within the database, then this
func (r *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// TODO(roasbeef): possibly delete from node bucket if node has no more // TODO(roasbeef): possibly delete from node bucket if node has no more
// channels // channels
// TODO(roasbeef): don't delete both edges? // TODO(roasbeef): don't delete both edges?
return r.db.Update(func(tx *bolt.Tx) error { return c.db.Update(func(tx *bolt.Tx) error {
// First grab the edges bucket which houses the information // First grab the edges bucket which houses the information
// we'd like to delete // we'd like to delete
edges, err := tx.CreateBucketIfNotExists(edgeBucket) edges, err := tx.CreateBucketIfNotExists(edgeBucket)
@ -723,13 +752,14 @@ type LightningNode struct {
// TODO(roasbeef): add update method and fetch? // TODO(roasbeef): add update method and fetch?
} }
// FetchLightningNode... // FetchLightningNode attempts to look up a target node by its identity public
// key. If the node iwn't found in the database, then ErrGraphNodeNotFound is
// returned.
func (c *ChannelGraph) FetchLightningNode(pub *btcec.PublicKey) (*LightningNode, error) { func (c *ChannelGraph) FetchLightningNode(pub *btcec.PublicKey) (*LightningNode, error) {
node := &LightningNode{db: c.db} var node *LightningNode
nodePub := pub.SerializeCompressed() nodePub := pub.SerializeCompressed()
err := c.db.View(func(tx *bolt.Tx) error { err := c.db.View(func(tx *bolt.Tx) error {
// First grapb the nodes bucket which stores the mapping from // First grab the nodes bucket which stores the mapping from
// pubKey to node information. // pubKey to node information.
nodes := tx.Bucket(nodeBucket) nodes := tx.Bucket(nodeBucket)
if nodes == nil { if nodes == nil {
@ -763,6 +793,54 @@ func (c *ChannelGraph) FetchLightningNode(pub *btcec.PublicKey) (*LightningNode,
return node, nil return node, nil
} }
// HasLightningNode determines if the graph has a vertex identified by the
// target node identity public key. If the node exists in the database, a
// timestamp of when the data for the node was lasted updated is returned along
// with a true boolean. Otherwise, an empty time.Time is returned with a false
// boolean.
func (c *ChannelGraph) HasLightningNode(pub *btcec.PublicKey) (time.Time, bool, error) {
var (
updateTime time.Time
exists bool
)
nodePub := pub.SerializeCompressed()
err := c.db.View(func(tx *bolt.Tx) error {
// First grab the nodes bucket which stores the mapping from
// pubKey to node information.
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
}
// If a key for this serialized public key isn't found, we can
// exit early.
nodeBytes := nodes.Get(nodePub)
if nodeBytes == nil {
exists = false
return nil
}
// Otherwise we continue on to obtain the time stamp
// representing the last time the data for this node was
// updated.
nodeReader := bytes.NewReader(nodeBytes)
node, err := deserializeLightningNode(nodeReader)
if err != nil {
return err
}
exists = true
updateTime = node.LastUpdate
return nil
})
if err != nil {
return time.Time{}, exists, nil
}
return updateTime, exists, nil
}
// ForEachChannel iterates through all the outgoing channel edges from this // ForEachChannel iterates through all the outgoing channel edges from this
// node, executing the passed callback with each edge as its sole argument. If // node, executing the passed callback with each edge as its sole argument. If
// the callback returns an error, then the iteration is halted with the error // the callback returns an error, then the iteration is halted with the error

@ -74,6 +74,12 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
t.Fatalf("unable to locate node: %v", err) t.Fatalf("unable to locate node: %v", err)
} }
if _, exists, err := graph.HasLightningNode(testPub); err != nil {
t.Fatalf("unable to query for node: %v", err)
} else if !exists {
t.Fatalf("node should be found but wasn't")
}
// The two nodes should match exactly! // The two nodes should match exactly!
if !reflect.DeepEqual(node, dbNode) { if !reflect.DeepEqual(node, dbNode) {
t.Fatalf("retrieved node doesn't match: expected %#v\n, got %#v\n", t.Fatalf("retrieved node doesn't match: expected %#v\n, got %#v\n",
@ -213,15 +219,6 @@ func TestEdgeInsertionDeletion(t *testing.T) {
t.Fatalf("unable to create channel edge: %v", err) t.Fatalf("unable to create channel edge: %v", err)
} }
// Check for existence of the edge within the database, it should be
// found.
found, err := graph.HasChannelEdge(chanID)
if err != nil {
t.Fatalf("unable to query for edge: %v", err)
} else if !found {
t.Fatalf("graph should have of inserted edge")
}
// Next, attempt to delete the edge from the database, again this // Next, attempt to delete the edge from the database, again this
// should proceed without any issues. // should proceed without any issues.
if err := graph.DeleteChannelEdge(&outpoint); err != nil { if err := graph.DeleteChannelEdge(&outpoint); err != nil {
@ -329,6 +326,15 @@ func TestEdgeInfoUpdates(t *testing.T) {
t.Fatalf("unable to update edge: %v", err) t.Fatalf("unable to update edge: %v", err)
} }
// Check for existence of the edge within the database, it should be
// found.
_, _, found, err := graph.HasChannelEdge(chanID)
if err != nil {
t.Fatalf("unable to query for edge: %v", err)
} else if !found {
t.Fatalf("graph should have of inserted edge")
}
// With the edges inserted, perform some queries to ensure that they've // With the edges inserted, perform some queries to ensure that they've
// been inserted properly. // been inserted properly.
dbEdge1, dbEdge2, err := graph.FetchChannelEdgesByID(chanID) dbEdge1, dbEdge2, err := graph.FetchChannelEdgesByID(chanID)