channeldb: store unknown policies in database

The commit ensures that for every channel, there will always
be two entries in the edges bucket. If the policy from one or
both ends of the channel is unknown, it is marked as such.

This allows efficient lookup of incoming edges. This is
required for backwards payment path finding.
This commit is contained in:
Joost Jager 2018-06-18 12:35:22 +02:00 committed by Olaoluwa Osuntokun
parent d2612e51bd
commit 6c918a1806
7 changed files with 322 additions and 43 deletions

View File

@ -85,6 +85,17 @@ func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
return d.node.ForEachChannel(d.tx, func(tx *bolt.Tx,
ei *channeldb.ChannelEdgeInfo, ep, _ *channeldb.ChannelEdgePolicy) error {
// Skip channels for which no outgoing edge policy is available.
//
// TODO(joostjager): Ideally the case where channels have a nil
// policy should be supported, as auto pilot is not looking at
// the policies. For now, it is not easily possible to get a
// reference to the other end LightningNode object without
// retrieving the policy.
if ep == nil {
return nil
}
pubkey, _ := ep.Node.PubKey()
edge := ChannelEdge{
Channel: Channel{

View File

@ -59,6 +59,14 @@ var (
number: 3,
migration: migrateInvoiceTimeSeriesOutgoingPayments,
},
{
// The version of the database where every channel
// always has two entries in the edges bucket. If
// a policy is unknown, this will be represented
// by a special byte sequence.
number: 4,
migration: migrateEdgePolicies,
},
}
// Big endian is the preferred byte order, due to cursor scans over

View File

@ -56,16 +56,22 @@ var (
// edgeBucket is a bucket which houses all of the edge or channel
// information within the channel graph. This bucket essentially acts
// as an adjacency list, which in conjunction with a range scan, can be
// used to iterate over all the _outgoing_ edges for a particular node.
// Key in the bucket use a prefix scheme which leads with the node's
// public key and sends with the compact edge ID. For each edgeID,
// there will be two entries within the bucket, as the graph is
// directed: nodes may have different policies w.r.t to fees for their
// respective directions.
// used to iterate over all the incoming and outgoing edges for a
// particular node. Key in the bucket use a prefix scheme which leads
// with the node's public key and sends with the compact edge ID.
// For each chanID, there will be two entries within the bucket, as the
// graph is directed: nodes may have different policies w.r.t to fees
// for their respective directions.
//
// maps: pubKey || edgeID -> edge policy for node
// maps: pubKey || chanID -> channel edge policy for node
edgeBucket = []byte("graph-edge")
// unknownPolicy is represented as an empty slice. It is
// used as the value in edgeBucket for unknown channel edge policies.
// Unknown policies are still stored in the database to enable efficient
// lookup of incoming channel edges.
unknownPolicy = []byte{}
// chanStart is an array of all zero bytes which is used to perform
// range scans within the edgeBucket to obtain all of the outgoing
// edges for a particular node.
@ -511,6 +517,18 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return err
}
// Mark edge policies for both sides as unknown. This is to
// enable efficient incoming channel lookup for a node.
for _, key := range []*[33]byte{&edge.NodeKey1Bytes,
&edge.NodeKey2Bytes} {
err := putChanEdgePolicyUnknown(edges, edge.ChannelID,
key[:])
if err != nil {
return err
}
}
// Finally we add it to the channel index which maps channel
// points (outpoints) to the shorter channel ID's.
var b bytes.Buffer
@ -1759,12 +1777,14 @@ func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool, erro
return updateTime, exists, nil
}
// ForEachChannel iterates through all the outgoing channel edges from this
// node, executing the passed callback with each edge as its sole argument. The
// first edge policy is the outgoing edge *to* the connecting node, while the
// second is the incoming edge *from* the connecting node. If the callback
// returns an error, then the iteration is halted with the error propagated
// back up to the caller.
// ForEachChannel iterates through all channels of this node, executing the
// passed callback with an edge info structure and the policies of each end
// of the channel. The first edge policy is the outgoing edge *to* the
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
// If the caller wishes to re-use an existing boltdb transaction, then it
// should be passed as the first argument. Otherwise the first argument should
@ -1805,44 +1825,32 @@ func (l *LightningNode) ForEachChannel(tx *bolt.Tx,
// as its prefix. This indicates that we've stepped over into
// another node's edges, so we can terminate our scan.
edgeCursor := edges.Cursor()
for nodeEdge, edgeInfo := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, edgeInfo = edgeCursor.Next() {
// If the prefix still matches, then the value is the
// raw edge information. So we can now serialize the
// edge info and fetch the outgoing node in order to
// retrieve the full channel edge.
edgeReader := bytes.NewReader(edgeInfo)
toEdgePolicy, err := deserializeChanEdgePolicy(edgeReader, nodes)
if err != nil {
return err
}
toEdgePolicy.db = l.db
toEdgePolicy.Node.db = l.db
for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() {
// If the prefix still matches, the channel id is
// returned in nodeEdge. Channel id is used to lookup
// the node at the other end of the channel and both
// edge policies.
chanID := nodeEdge[33:]
edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
if err != nil {
return err
}
// We'll also fetch the incoming edge so this
// information can be available to the caller.
incomingNode := toEdgePolicy.Node.PubKeyBytes[:]
fromEdgePolicy, err := fetchChanEdgePolicy(
edges, chanID, incomingNode, nodes,
outgoingPolicy, err := fetchChanEdgePolicy(
edges, chanID, nodePub, nodes,
)
if err != nil && err != ErrEdgeNotFound &&
err != ErrGraphNodeNotFound {
otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
if err != nil {
return err
}
if fromEdgePolicy != nil {
fromEdgePolicy.db = l.db
if fromEdgePolicy.Node != nil {
fromEdgePolicy.Node.db = l.db
}
}
incomingPolicy, err := fetchChanEdgePolicy(
edges, chanID, otherNode, nodes,
)
// Finally, we execute the callback.
err = cb(tx, &edgeInfo, toEdgePolicy, fromEdgePolicy)
err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
if err != nil {
return err
}
@ -2016,6 +2024,21 @@ func (c *ChannelEdgeInfo) BitcoinKey2() (*btcec.PublicKey, error) {
return key, nil
}
// OtherNodeKeyBytes returns the node key bytes of the other end of
// the channel.
func (c *ChannelEdgeInfo) OtherNodeKeyBytes(thisNodeKey []byte) (
[]byte, error) {
switch {
case bytes.Equal(c.NodeKey1Bytes[:], thisNodeKey):
return c.NodeKey2Bytes[:], nil
case bytes.Equal(c.NodeKey2Bytes[:], thisNodeKey):
return c.NodeKey1Bytes[:], nil
default:
return nil, fmt.Errorf("Node not participating in this channel")
}
}
// ChannelAuthProof is the authentication proof (the signature portion) for a
// channel. Using the four signatures contained in the struct, and some
// auxiliary knowledge (the funding script, node identities, and outpoint) nodes
@ -2871,7 +2894,11 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b
// If there was already an entry for this edge, then we'll need to
// delete the old one to ensure we don't leave around any after-images.
if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil {
// An unknown policy value does not have a update time recorded, so
// it also does not need to be removed.
if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
!bytes.Equal(edgeBytes[:], unknownPolicy) {
// In order to delete the old entry, we'll need to obtain the
// *prior* update time in order to delete it. To do this, we'll
// create an offset to slice in. Starting backwards, we'll
@ -2899,6 +2926,23 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b
return edges.Put(edgeKey[:], b.Bytes()[:])
}
// putChanEdgePolicyUnknown marks the edge policy as unknown
// in the edges bucket.
func putChanEdgePolicyUnknown(edges *bolt.Bucket, channelID uint64,
from []byte) error {
var edgeKey [33 + 8]byte
copy(edgeKey[:], from)
byteOrder.PutUint64(edgeKey[33:], channelID)
if edges.Get(edgeKey[:]) != nil {
return fmt.Errorf("Cannot write unknown policy for channel %v "+
" when there is already a policy present", channelID)
}
return edges.Put(edgeKey[:], unknownPolicy)
}
func fetchChanEdgePolicy(edges *bolt.Bucket, chanID []byte,
nodePub []byte, nodes *bolt.Bucket) (*ChannelEdgePolicy, error) {
@ -2911,6 +2955,11 @@ func fetchChanEdgePolicy(edges *bolt.Bucket, chanID []byte,
return nil, ErrEdgeNotFound
}
// No need to deserialize unknown policy.
if bytes.Equal(edgeBytes[:], unknownPolicy) {
return nil, nil
}
edgeReader := bytes.NewReader(edgeBytes)
return deserializeChanEdgePolicy(edgeReader, nodes)
@ -2930,7 +2979,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket,
// something other than edge non-existence.
node1Pub := edgeInfo[:33]
edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub, nodes)
if err != nil && err != ErrEdgeNotFound {
if err != nil {
return nil, nil, err
}
@ -2945,7 +2994,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket,
// half of the edge information.
node2Pub := edgeInfo[33:67]
edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub, nodes)
if err != nil && err != ErrEdgeNotFound {
if err != nil {
return nil, nil, err
}

View File

@ -350,6 +350,15 @@ func TestEdgeInsertionDeletion(t *testing.T) {
t.Fatalf("unable to create channel edge: %v", err)
}
// Ensure that both policies are returned as unknown (nil).
_, e1, e2, err := graph.FetchChannelEdgesByID(chanID)
if err != nil {
t.Fatalf("unable to fetch channel edge")
}
if e1 != nil || e2 != nil {
t.Fatalf("channel edges not unknown")
}
// Next, attempt to delete the edge from the database, again this
// should proceed without any issues.
if err := graph.DeleteChannelEdge(&outpoint); err != nil {
@ -918,6 +927,12 @@ func TestGraphTraversal(t *testing.T) {
err = firstNode.ForEachChannel(nil, func(_ *bolt.Tx, _ *ChannelEdgeInfo,
outEdge, inEdge *ChannelEdgePolicy) error {
// All channels between first and second node should have fully
// (both sides) specified policies.
if inEdge == nil || outEdge == nil {
return fmt.Errorf("channel policy not present")
}
// Each should indicate that it's outgoing (pointed
// towards the second node).
if !bytes.Equal(outEdge.Node.PubKeyBytes[:], secondNode.PubKeyBytes[:]) {
@ -1941,6 +1956,119 @@ func TestFetchChanInfos(t *testing.T) {
}
}
// TestIncompleteChannelPolicies tests that a channel that only has a policy
// specified on one end is properly returned in ForEachChannel calls from
// both sides.
func TestIncompleteChannelPolicies(t *testing.T) {
t.Parallel()
db, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// Create two nodes.
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node1); err != nil {
t.Fatalf("unable to add node: %v", err)
}
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node2); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Create channel between nodes.
txHash := sha256.Sum256([]byte{0})
op := wire.OutPoint{
Hash: txHash,
Index: 0,
}
channel, chanID := createEdge(
uint32(0), 0, 0, 0, node1, node2,
)
if err := graph.AddChannelEdge(&channel); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}
// Ensure that channel is reported with unknown policies.
checkPolicies := func(node *LightningNode, expectedIn, expectedOut bool) {
calls := 0
node.ForEachChannel(nil, func(_ *bolt.Tx, _ *ChannelEdgeInfo,
outEdge, inEdge *ChannelEdgePolicy) error {
if !expectedOut && outEdge != nil {
t.Fatalf("Expected no outgoing policy")
}
if expectedOut && outEdge == nil {
t.Fatalf("Expected an outgoing policy")
}
if !expectedIn && inEdge != nil {
t.Fatalf("Expected no incoming policy")
}
if expectedIn && inEdge == nil {
t.Fatalf("Expected an incoming policy")
}
calls++
return nil
})
if calls != 1 {
t.Fatalf("Expected only one callback call")
}
}
checkPolicies(node2, false, false)
// Only create an edge policy for node1 and leave the policy for node2
// unknown.
updateTime := time.Unix(1234, 0)
edgePolicy := newEdgePolicy(
chanID.ToUint64(), op, db, updateTime.Unix(),
)
edgePolicy.Flags = 0
edgePolicy.Node = node2
edgePolicy.SigBytes = testSig.Serialize()
if err := graph.UpdateEdgePolicy(edgePolicy); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
checkPolicies(node1, false, true)
checkPolicies(node2, true, false)
// Create second policy and assert that both policies are reported
// as present.
edgePolicy = newEdgePolicy(
chanID.ToUint64(), op, db, updateTime.Unix(),
)
edgePolicy.Flags = 1
edgePolicy.Node = node1
edgePolicy.SigBytes = testSig.Serialize()
if err := graph.UpdateEdgePolicy(edgePolicy); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
checkPolicies(node1, true, true)
checkPolicies(node2, true, true)
}
// TestChannelEdgePruningUpdateIndexDeletion tests that once edges are deleted
// from the graph, then their entries within the update index are also cleaned
// up.

View File

@ -300,3 +300,76 @@ func migrateInvoiceTimeSeriesOutgoingPayments(tx *bolt.Tx) error {
return nil
}
// migrateEdgePolicies is a migration function that will update the edges
// bucket. It ensure that edges with unknown policies will also have an entry
// in the bucket. After the migration, there will be two edge entries for
// every channel, regardless of whether the policies are known.
func migrateEdgePolicies(tx *bolt.Tx) error {
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return nil
}
edges := tx.Bucket(edgeBucket)
if edges == nil {
return nil
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return nil
}
// checkKey gets the policy from the database with a low-level call
// so that it is still possible to distinguish between unknown and
// not present.
checkKey := func(channelId uint64, keyBytes []byte) error {
var channelID [8]byte
byteOrder.PutUint64(channelID[:], channelId)
_, err := fetchChanEdgePolicy(edges,
channelID[:], keyBytes, nodes)
if err == ErrEdgeNotFound {
log.Tracef("Adding unknown edge policy present for node %x, channel %v",
keyBytes, channelId)
err := putChanEdgePolicyUnknown(edges, channelId, keyBytes)
if err != nil {
return err
}
return nil
}
return err
}
// Iterate over all channels and check both edge policies.
err := edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
infoReader := bytes.NewReader(edgeInfoBytes)
edgeInfo, err := deserializeChanEdgeInfo(infoReader)
if err != nil {
return err
}
for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:],
edgeInfo.NodeKey2Bytes[:]} {
if err := checkKey(edgeInfo.ChannelID, key); err != nil {
return err
}
}
return nil
})
if err != nil {
return fmt.Errorf("unable to update edge policies: %v", err)
}
log.Infof("Migration of edge policies complete!")
return nil
}

View File

@ -2136,6 +2136,10 @@ func (r *ChannelRouter) ForAllOutgoingChannels(cb func(*channeldb.ChannelEdgeInf
return r.selfNode.ForEachChannel(nil, func(_ *bolt.Tx, c *channeldb.ChannelEdgeInfo,
e, _ *channeldb.ChannelEdgePolicy) error {
if e == nil {
return fmt.Errorf("Channel from self node has no policy")
}
return cb(c, e)
})
}

View File

@ -3733,6 +3733,12 @@ func (r *rpcServer) FeeReport(ctx context.Context,
err = selfNode.ForEachChannel(nil, func(_ *bolt.Tx, chanInfo *channeldb.ChannelEdgeInfo,
edgePolicy, _ *channeldb.ChannelEdgePolicy) error {
// Self node should always have policies for its channels.
if edgePolicy == nil {
return fmt.Errorf("no policy for outgoing channel %v ",
chanInfo.ChannelID)
}
// We'll compute the effective fee rate by converting from a
// fixed point fee rate to a floating point fee rate. The fee
// rate field in the database the amount of mSAT charged per