channeldb/graph: add batched APIs for graph ingestion

This commit is contained in:
Conner Fromknecht 2020-11-24 16:39:28 -08:00
parent 9cfe08c879
commit edaa5d4308
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
2 changed files with 235 additions and 42 deletions

@ -18,6 +18,7 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
@ -177,16 +178,26 @@ type ChannelGraph struct {
cacheMu sync.RWMutex
rejectCache *rejectCache
chanCache *channelCache
chanScheduler batch.Scheduler
nodeScheduler batch.Scheduler
}
// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache.
func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph {
return &ChannelGraph{
g := &ChannelGraph{
db: db,
rejectCache: newRejectCache(rejectCacheSize),
chanCache: newChannelCache(chanCacheSize),
}
g.chanScheduler = batch.NewTimeScheduler(
db.Backend, &g.cacheMu, 500*time.Millisecond,
)
g.nodeScheduler = batch.NewTimeScheduler(
db.Backend, nil, 500*time.Millisecond,
)
return g
}
// Database returns a pointer to the underlying database.
@ -440,15 +451,17 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
// AddLightningNode adds a vertex/node to the graph database. If the node is not
// in the database from before, this will add a new, unconnected one to the
// graph. If it is present from before, this will update that node's
// information. Note that this method is expected to only be called to update
// an already present node from a node announcement, or to insert a node found
// in a channel update.
// information. Note that this method is expected to only be called to update an
// already present node from a node announcement, or to insert a node found in a
// channel update.
//
// TODO(roasbeef): also need sig of announcement
func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
return addLightningNode(tx, node)
}, func() {})
return c.nodeScheduler.Execute(&batch.Request{
Update: func(tx kvdb.RwTx) error {
return addLightningNode(tx, node)
},
})
}
func addLightningNode(tx kvdb.RwTx, node *LightningNode) error {
@ -568,26 +581,42 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
}
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
// undirected edge from the two target nodes are created. The information
// stored denotes the static attributes of the channel, such as the channelID,
// the keys involved in creation of the channel, and the set of features that
// the channel supports. The chanPoint and chanID are used to uniquely identify
// the edge globally within the database.
// undirected edge from the two target nodes are created. The information stored
// denotes the static attributes of the channel, such as the channelID, the keys
// involved in creation of the channel, and the set of features that the channel
// supports. The chanPoint and chanID are used to uniquely identify the edge
// globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var alreadyExists bool
return c.chanScheduler.Execute(&batch.Request{
Reset: func() {
alreadyExists = false
},
Update: func(tx kvdb.RwTx) error {
err := c.addChannelEdge(tx, edge)
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
return c.addChannelEdge(tx, edge)
}, func() {})
if err != nil {
return err
}
// Silence ErrEdgeAlreadyExist so that the batch can
// succeed, but propagate the error via local state.
if err == ErrEdgeAlreadyExist {
alreadyExists = true
return nil
}
c.rejectCache.remove(edge.ChannelID)
c.chanCache.remove(edge.ChannelID)
return nil
return err
},
OnCommit: func(err error) error {
switch {
case err != nil:
return err
case alreadyExists:
return ErrEdgeAlreadyExist
default:
c.rejectCache.remove(edge.ChannelID)
c.chanCache.remove(edge.ChannelID)
return nil
}
},
})
}
// addChannelEdge is the private form of AddChannelEdge that allows callers to
@ -1929,27 +1958,43 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
// the ChannelEdgePolicy determines which of the directed edges are being
// updated. If the flag is 1, then the first node's information is being
// updated, otherwise it's the second node's information. The node ordering is
// determined by the lexicographical ordering of the identity public keys of
// the nodes on either side of the channel.
// determined by the lexicographical ordering of the identity public keys of the
// nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var (
isUpdate1 bool
edgeNotFound bool
)
return c.chanScheduler.Execute(&batch.Request{
Reset: func() {
isUpdate1 = false
edgeNotFound = false
},
Update: func(tx kvdb.RwTx) error {
var err error
isUpdate1, err = updateEdgePolicy(tx, edge)
var isUpdate1 bool
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
var err error
isUpdate1, err = updateEdgePolicy(tx, edge)
return err
}, func() {
isUpdate1 = false
// Silence ErrEdgeNotFound so that the batch can
// succeed, but propagate the error via local state.
if err == ErrEdgeNotFound {
edgeNotFound = true
return nil
}
return err
},
OnCommit: func(err error) error {
switch {
case err != nil:
return err
case edgeNotFound:
return ErrEdgeNotFound
default:
c.updateEdgeCache(edge, isUpdate1)
return nil
}
},
})
if err != nil {
return err
}
c.updateEdgeCache(edge, isUpdate1)
return nil
}
func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) {

@ -3,6 +3,7 @@ package channeldb
import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
"image/color"
"math"
@ -11,6 +12,7 @@ import (
"net"
"reflect"
"runtime"
"sync"
"testing"
"time"
@ -21,6 +23,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
)
var (
@ -3195,3 +3198,148 @@ func TestComputeFee(t *testing.T) {
t.Fatalf("expected fee %v, but got %v", fee, fwdFee)
}
}
// TestBatchedAddChannelEdge asserts that BatchedAddChannelEdge properly
// executes multiple AddChannelEdge requests in a single txn.
func TestBatchedAddChannelEdge(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
require.Nil(t, err)
defer cleanUp()
graph := db.ChannelGraph()
sourceNode, err := createTestVertex(db)
require.Nil(t, err)
err = graph.SetSourceNode(sourceNode)
require.Nil(t, err)
// We'd like to test the insertion/deletion of edges, so we create two
// vertexes to connect.
node1, err := createTestVertex(db)
require.Nil(t, err)
node2, err := createTestVertex(db)
require.Nil(t, err)
// In addition to the fake vertexes we create some fake channel
// identifiers.
var spendOutputs []*wire.OutPoint
var blockHash chainhash.Hash
copy(blockHash[:], bytes.Repeat([]byte{1}, 32))
// Prune the graph a few times to make sure we have entries in the
// prune log.
_, err = graph.PruneGraph(spendOutputs, &blockHash, 155)
require.Nil(t, err)
var blockHash2 chainhash.Hash
copy(blockHash2[:], bytes.Repeat([]byte{2}, 32))
_, err = graph.PruneGraph(spendOutputs, &blockHash2, 156)
require.Nil(t, err)
// We'll create 3 almost identical edges, so first create a helper
// method containing all logic for doing so.
// Create an edge which has its block height at 156.
height := uint32(156)
edgeInfo, _ := createEdge(height, 0, 0, 0, node1, node2)
// Create an edge with block height 157. We give it
// maximum values for tx index and position, to make
// sure our database range scan get edges from the
// entire range.
edgeInfo2, _ := createEdge(
height+1, math.MaxUint32&0x00ffffff, math.MaxUint16, 1,
node1, node2,
)
// Create a third edge, this with a block height of 155.
edgeInfo3, _ := createEdge(height-1, 0, 0, 2, node1, node2)
edges := []ChannelEdgeInfo{edgeInfo, edgeInfo2, edgeInfo3}
errChan := make(chan error, len(edges))
errTimeout := errors.New("timeout adding batched channel")
// Now add all these new edges to the database.
var wg sync.WaitGroup
for _, edge := range edges {
wg.Add(1)
go func(edge ChannelEdgeInfo) {
defer wg.Done()
select {
case errChan <- graph.AddChannelEdge(&edge):
case <-time.After(2 * time.Second):
errChan <- errTimeout
}
}(edge)
}
wg.Wait()
for i := 0; i < len(edges); i++ {
err := <-errChan
require.Nil(t, err)
}
}
// TestBatchedUpdateEdgePolicy asserts that BatchedUpdateEdgePolicy properly
// executes multiple UpdateEdgePolicy requests in a single txn.
func TestBatchedUpdateEdgePolicy(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
require.Nil(t, err)
defer cleanUp()
graph := db.ChannelGraph()
// We'd like to test the update of edges inserted into the database, so
// we create two vertexes to connect.
node1, err := createTestVertex(db)
require.Nil(t, err)
err = graph.AddLightningNode(node1)
require.Nil(t, err)
node2, err := createTestVertex(db)
require.Nil(t, err)
err = graph.AddLightningNode(node2)
require.Nil(t, err)
// Create an edge and add it to the db.
edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2)
// Make sure inserting the policy at this point, before the edge info
// is added, will fail.
err = graph.UpdateEdgePolicy(edge1)
require.Error(t, ErrEdgeNotFound, err)
// Add the edge info.
err = graph.AddChannelEdge(edgeInfo)
require.Nil(t, err)
errTimeout := errors.New("timeout adding batched channel")
updates := []*ChannelEdgePolicy{edge1, edge2}
errChan := make(chan error, len(updates))
// Now add all these new edges to the database.
var wg sync.WaitGroup
for _, update := range updates {
wg.Add(1)
go func(update *ChannelEdgePolicy) {
defer wg.Done()
select {
case errChan <- graph.UpdateEdgePolicy(update):
case <-time.After(2 * time.Second):
errChan <- errTimeout
}
}(update)
}
wg.Wait()
for i := 0; i < len(updates); i++ {
err := <-errChan
require.Nil(t, err)
}
}