channeldb: add support for channel graph pruning

This commit adds support for channel graph pruning, which is the method
used to keep the channel graph in sync with the current UTXO state. As
the channel graph is essentially simply a subset of the UTXO set, by
evaluating the channel graph with the set of outfits spent within a
block, then we’re able to prune channels that’ve been closed by
spending their funding outpoint. A new method `PruneGraph` has been
provided which implements the described functionality.

Upon start up any upper routing layers should sync forward in the chain
pruning the channel graph with each newly found block. In order to
facilitate such channel graph reconciliation a new method `PruneTip`
has been added which allows callers to query current pruning state of
the channel graph.
This commit is contained in:
Olaoluwa Osuntokun 2016-12-19 16:58:27 -08:00
parent cbd26b35e0
commit 12538ea922
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
4 changed files with 359 additions and 45 deletions

@ -124,6 +124,10 @@ func (d *DB) Wipe() error {
if err != nil && err != bolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(graphMetaBucket)
if err != nil && err != bolt.ErrBucketNotFound {
return err
}
return nil
})
@ -169,6 +173,9 @@ func createChannelDB(dbPath string) error {
if _, err := tx.CreateBucket(edgeBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(graphMetaBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(metaBucket); err != nil {
return err

@ -23,6 +23,7 @@ var (
ErrGraphNodesNotFound = fmt.Errorf("no graph nodes exist")
ErrGraphNoEdgesFound = fmt.Errorf("no graph edges exist")
ErrGraphNodeNotFound = fmt.Errorf("unable to find node")
ErrGraphNeverPruned = fmt.Errorf("graph never pruned")
ErrEdgeNotFound = fmt.Errorf("edge for chanID not found")

@ -75,6 +75,19 @@ var (
// maps: outPoint -> chanID
channelPointBucket = []byte("chan-index")
// graphMetaBucket is a top-level bucket which stores various meta-deta
// related to the on-disk channel graph. Data strored in this bucket
// includes the block to which the graph has been synced to, the total
// number of channels, etc.
graphMetaBucket = []byte("graph-meta")
// pruneTipKey is a key within the above graphMetaBucket that stores
// the best known blockhash+height that the channel graph has been
// known to be pruned to. Once a new block is discovered, any channels
// that have been closed (by spending the outpoint) can safely be
// removed from the graph.
pruneTipKey = []byte("prune-tip")
edgeBloomKey = []byte("edge-bloom")
nodeBloomKey = []byte("node-bloom")
)
@ -281,6 +294,7 @@ func addLightningNode(tx *bolt.Tx, node *LightningNode) error {
}
// LookupAlias attempts to return the alias as advertised by the target node.
// TODO(roasbeef): currently assumes that aliases are unique...
func (r *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
var alias string
@ -439,14 +453,27 @@ func (r *ChannelGraph) HasChannelEdge(chanID uint64) (bool, error) {
return b, err
}
// DeleteChannelEdge removes an edge from the database as identified by it's
// funding outpoint. If the edge does not exist within the database, then this
func (r *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// TODO(roasbeef): possibly delete from node bucket if node has no more
// channels
// TODO(roasbeef): don't delete both edges?
const (
// pruneTipBytes is the total size of the value which stores the
// current prune tip of the graph. The prune tip indicates if the
// channel graph is in sync with the current UTXO state. The structure
// is: blockHash || blockHeight, taking 36 bytes total.
pruneTipBytes = 32 + 4
)
return r.db.Update(func(tx *bolt.Tx) error {
// PruneGraph prunes newly closed channels from the channel graph in response
// to a new block being solved on the network. Any transactions which spend the
// funding output of any known channels withint he graph will be deleted.
// Additionally, the "prune tip", or the last block which has been used to
// prune the graph is stored so callers can ensure the graph is fully in sync
// with the current UTXO state. An integer is returned which reflects the
// number of channels pruned due to the new incoming block.
func (r *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
blockHash *wire.ShaHash, blockHeight uint32) (uint32, error) {
var numChans uint32
err := r.db.Update(func(tx *bolt.Tx) error {
// First grab the edges bucket which houses the information
// we'd like to delete
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
@ -464,52 +491,159 @@ func (r *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
return err
}
var b bytes.Buffer
if err := writeOutpoint(&b, chanPoint); err != nil {
return err
}
// For each of the outpoints that've been spent within the
// block, we attempt to delete them from the graph as if that
// outpoint was a channel, then it has now been closed.
for _, chanPoint := range spentOutputs {
// TODO(roasbeef): load channel bloom filter, continue
// if NOT if filter
// If the channel's outpoint doesn't exist within the outpoint
// index, then the edge does not exist.
chanID := chanIndex.Get(b.Bytes())
if chanID == nil {
return ErrEdgeNotFound
}
// Otherwise we obtain the two public keys from the mapping:
// chanID -> pubKey1 || pubKey2. With this, we can construct
// the keys which house both of the directed edges for this
// channel.
nodeKeys := edgeIndex.Get(chanID)
// The edge key is of the format pubKey || chanID. First we
// construct the latter half, populating the channel ID.
var edgeKey [33 + 8]byte
copy(edgeKey[33:], chanID)
// With the latter half constructed, copy over the first public
// key to delete the edge in this direction, then the second to
// delete the edge in the opposite direction.
copy(edgeKey[:33], nodeKeys[:33])
if edges.Get(edgeKey[:]) != nil {
if err := edges.Delete(edgeKey[:]); err != nil {
return err
}
}
copy(edgeKey[:33], nodeKeys[33:])
if edges.Get(edgeKey[:]) != nil {
if err := edges.Delete(edgeKey[:]); err != nil {
// Attempt to delete the channel, and ErrEdgeNotFound
// will be returned if that outpoint isn't known to be
// a channel. If no error is returned, then a channel
// was successfully pruned.
err := delChannelByEdge(edges, edgeIndex, chanIndex,
chanPoint)
if err != nil && err != ErrEdgeNotFound {
return err
} else if err == nil {
numChans += 1
}
}
// Finally, with the edge data deleted, we can purge the
// information from the two edge indexes.
if err := edgeIndex.Delete(chanID); err != nil {
metaBucket, err := tx.CreateBucketIfNotExists(graphMetaBucket)
if err != nil {
return err
}
return chanIndex.Delete(b.Bytes())
// With the graph pruned, update the current "prune tip" which
// can eb used to check if the graph is fully synced with the
// current UTXO state.
var newTip [pruneTipBytes]byte
copy(newTip[:], blockHash[:])
byteOrder.PutUint32(newTip[32:], uint32(blockHeight))
return metaBucket.Put(pruneTipKey, newTip[:])
})
if err != nil {
return 0, err
}
return numChans, nil
}
// PruneTip returns the block height and hash of the latest block that has been
// used to prune channels in the graph. Knowing the "prune tip" allows callers
// to tell if the graph is currently in sync with the current best known UTXO
// state.
func (r *ChannelGraph) PruneTip() (*wire.ShaHash, uint32, error) {
var (
currentTip [pruneTipBytes]byte
tipHash wire.ShaHash
tipHeight uint32
)
err := r.db.View(func(tx *bolt.Tx) error {
graphMeta := tx.Bucket(graphMetaBucket)
if graphMeta == nil {
return ErrGraphNotFound
}
tipBytes := graphMeta.Get(pruneTipKey)
if tipBytes == nil {
return ErrGraphNeverPruned
}
copy(currentTip[:], tipBytes)
return nil
})
if err != nil {
return nil, 0, err
}
// Once we have the prune tip, the first 32 bytes are the block hash,
// with the latter 4 bytes being the block height.
copy(tipHash[:], currentTip[:32])
tipHeight = byteOrder.Uint32(currentTip[32:])
return &tipHash, tipHeight, nil
}
// DeleteChannelEdge removes an edge from the database as identified by it's
// funding outpoint. If the edge does not exist within the database, then this
func (r *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// TODO(roasbeef): possibly delete from node bucket if node has no more
// channels
// TODO(roasbeef): don't delete both edges?
return r.db.Update(func(tx *bolt.Tx) error {
// First grab the edges bucket which houses the information
// we'd like to delete
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil {
return err
}
// Next grab the two edge indexes which will also need to be updated.
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return err
}
chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
if err != nil {
return err
}
return delChannelByEdge(edges, edgeIndex, chanIndex, chanPoint)
})
}
func delChannelByEdge(edges *bolt.Bucket, edgeIndex *bolt.Bucket,
chanIndex *bolt.Bucket, chanPoint *wire.OutPoint) error {
var b bytes.Buffer
if err := writeOutpoint(&b, chanPoint); err != nil {
return err
}
// If the channel's outpoint doesn't exist within the outpoint
// index, then the edge does not exist.
chanID := chanIndex.Get(b.Bytes())
if chanID == nil {
return ErrEdgeNotFound
}
// Otherwise we obtain the two public keys from the mapping:
// chanID -> pubKey1 || pubKey2. With this, we can construct
// the keys which house both of the directed edges for this
// channel.
nodeKeys := edgeIndex.Get(chanID)
// The edge key is of the format pubKey || chanID. First we
// construct the latter half, populating the channel ID.
var edgeKey [33 + 8]byte
copy(edgeKey[33:], chanID)
// With the latter half constructed, copy over the first public
// key to delete the edge in this direction, then the second to
// delete the edge in the opposite direction.
copy(edgeKey[:33], nodeKeys[:33])
if edges.Get(edgeKey[:]) != nil {
if err := edges.Delete(edgeKey[:]); err != nil {
return err
}
}
copy(edgeKey[:33], nodeKeys[33:])
if edges.Get(edgeKey[:]) != nil {
if err := edges.Delete(edgeKey[:]); err != nil {
return err
}
}
// Finally, with the edge data deleted, we can purge the
// information from the two edge indexes.
if err := edgeIndex.Delete(chanID); err != nil {
return err
}
return chanIndex.Delete(b.Bytes())
}
// UpdateEdgeInfo updates the edge information for a single directed edge

@ -7,6 +7,7 @@ import (
prand "math/rand"
"net"
"reflect"
"runtime"
"testing"
"time"
@ -505,3 +506,174 @@ func TestGraphTraversal(t *testing.T) {
t.Fatalf("all edges for node reached within ForEach")
}
}
func assertPruneTip(t *testing.T, graph *ChannelGraph, blockHash *wire.ShaHash,
blockHeight uint32) {
pruneHash, pruneHeight, err := graph.PruneTip()
if err != nil {
_, _, line, _ := runtime.Caller(1)
t.Fatalf("line %v: unable to fetch prune tip: %v", line, err)
}
if !bytes.Equal(blockHash[:], pruneHash[:]) {
_, _, line, _ := runtime.Caller(1)
t.Fatalf("line: %v, prune tips don't match, expected %x got %x",
line, blockHash, pruneHash)
}
if pruneHeight != blockHeight {
_, _, line, _ := runtime.Caller(1)
t.Fatalf("line %v: prune heights don't match, expected %v "+
"got %v", line, blockHeight, pruneHeight)
}
}
func asserNumChans(t *testing.T, graph *ChannelGraph, n int) {
numChans := 0
if err := graph.ForEachChannel(func(*ChannelEdge, *ChannelEdge) error {
numChans += 1
return nil
}); err != nil {
_, _, line, _ := runtime.Caller(1)
t.Fatalf("line %v:unable to scan channels: %v", line, err)
}
if numChans != n {
_, _, line, _ := runtime.Caller(1)
t.Fatalf("line %v: expected %v chans instead have %v", line, n, numChans)
}
}
func TestGraphPruning(t *testing.T) {
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// As initial set up for the test, we'll create a graph with 5 vertexes
// and enough edges to create a fully connected graph. The graph will
// be rather simple, representing a straight line.
const numNodes = 5
graphNodes := make([]*LightningNode, numNodes)
for i := 0; i < numNodes; i++ {
node, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create node: %v", err)
}
if err := graph.AddLightningNode(node); err != nil {
t.Fatalf("unable to add node: %v", err)
}
graphNodes[i] = node
}
// With the vertexes created, we'll next create a series of channels
// between them.
channelPoints := make([]*wire.OutPoint, 0, numNodes-1)
for i := 0; i < numNodes-1; i++ {
txHash := fastsha256.Sum256([]byte{byte(i)})
chanID := uint64(i + 1)
op := wire.OutPoint{
Hash: txHash,
Index: 0,
}
channelPoints = append(channelPoints, &op)
err := graph.AddChannelEdge(graphNodes[i].PubKey,
graphNodes[i+1].PubKey, &op, chanID)
if err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Create and add an edge with random data that points from
// node_i -> node_i+1
edge := randEdge(chanID, op, db)
edge.Flags = 0
edge.Node = graphNodes[i]
if err := graph.UpdateEdgeInfo(edge); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
// Create another random edge that points from node_i+1 ->
// node_i this time.
edge = randEdge(chanID, op, db)
edge.Flags = 1
edge.Node = graphNodes[i]
if err := graph.UpdateEdgeInfo(edge); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
}
// Now with our test graph created, we can test the pruning
// capabilities of the channel graph.
// First we create a mock block that ends up closing the first two
// channels.
var blockHash wire.ShaHash
copy(blockHash[:], bytes.Repeat([]byte{1}, 32))
blockHeight := uint32(1)
block := channelPoints[:2]
numPruned, err := graph.PruneGraph(block, &blockHash, blockHeight)
if err != nil {
t.Fatalf("unable to prune graph: %v", err)
}
if numPruned != 2 {
t.Fatalf("incorrect number of channels pruned: expected %v, got %v",
2, numPruned)
}
// Now ensure that the prune tip has been updated.
assertPruneTip(t, graph, &blockHash, blockHeight)
// Count up the number of channels known within the graph, only 2
// should be remaining.
asserNumChans(t, graph, 2)
// Next we'll create a block that doesn't close any channels within the
// graph to test the negative error case.
fakeHash := fastsha256.Sum256([]byte("test prune"))
nonChannel := &wire.OutPoint{
Hash: fakeHash,
Index: 9,
}
blockHash = fastsha256.Sum256(blockHash[:])
blockHeight = 2
numPruned, err = graph.PruneGraph([]*wire.OutPoint{nonChannel},
&blockHash, blockHeight)
if err != nil {
t.Fatalf("unable to prune graph: %v", err)
}
// No channels should've been detected as pruned.
if numPruned != 0 {
t.Fatalf("channels were pruned but shouldn't have been")
}
// Once again, the prune tip should've been updated.
assertPruneTip(t, graph, &blockHash, blockHeight)
asserNumChans(t, graph, 2)
// Finally, create a block that prunes the remainder of the channels
// from the graph.
blockHash = fastsha256.Sum256(blockHash[:])
blockHeight = 3
numPruned, err = graph.PruneGraph(channelPoints[2:], &blockHash,
blockHeight)
if err != nil {
t.Fatalf("unable to prune graph: %v", err)
}
// The remainder of the channels should've been pruned from the graph.
if numPruned != 2 {
t.Fatalf("incorrect number of channels pruned: expected %v, got %v",
2, numPruned)
}
// The prune tip should be updated, and no channels should be found
// within the current graph.
assertPruneTip(t, graph, &blockHash, blockHeight)
asserNumChans(t, graph, 0)
}