From edaa5d4308c28de97ff42d0c19280ec0f4f6a7c9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 24 Nov 2020 16:39:28 -0800 Subject: [PATCH] channeldb/graph: add batched APIs for graph ingestion --- channeldb/graph.go | 129 ++++++++++++++++++++++------------ channeldb/graph_test.go | 148 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 235 insertions(+), 42 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index aa9abbaf..4e4dd995 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -18,6 +18,7 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -177,16 +178,26 @@ type ChannelGraph struct { cacheMu sync.RWMutex rejectCache *rejectCache chanCache *channelCache + + chanScheduler batch.Scheduler + nodeScheduler batch.Scheduler } // newChannelGraph allocates a new ChannelGraph backed by a DB instance. The // returned instance has its own unique reject cache and channel cache. func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph { - return &ChannelGraph{ + g := &ChannelGraph{ db: db, rejectCache: newRejectCache(rejectCacheSize), chanCache: newChannelCache(chanCacheSize), } + g.chanScheduler = batch.NewTimeScheduler( + db.Backend, &g.cacheMu, 500*time.Millisecond, + ) + g.nodeScheduler = batch.NewTimeScheduler( + db.Backend, nil, 500*time.Millisecond, + ) + return g } // Database returns a pointer to the underlying database. @@ -440,15 +451,17 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { // AddLightningNode adds a vertex/node to the graph database. If the node is not // in the database from before, this will add a new, unconnected one to the // graph. If it is present from before, this will update that node's -// information. Note that this method is expected to only be called to update -// an already present node from a node announcement, or to insert a node found -// in a channel update. +// information. Note that this method is expected to only be called to update an +// already present node from a node announcement, or to insert a node found in a +// channel update. // // TODO(roasbeef): also need sig of announcement func (c *ChannelGraph) AddLightningNode(node *LightningNode) error { - return kvdb.Update(c.db, func(tx kvdb.RwTx) error { - return addLightningNode(tx, node) - }, func() {}) + return c.nodeScheduler.Execute(&batch.Request{ + Update: func(tx kvdb.RwTx) error { + return addLightningNode(tx, node) + }, + }) } func addLightningNode(tx kvdb.RwTx, node *LightningNode) error { @@ -568,26 +581,42 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket, } // AddChannelEdge adds a new (undirected, blank) edge to the graph database. An -// undirected edge from the two target nodes are created. The information -// stored denotes the static attributes of the channel, such as the channelID, -// the keys involved in creation of the channel, and the set of features that -// the channel supports. The chanPoint and chanID are used to uniquely identify -// the edge globally within the database. +// undirected edge from the two target nodes are created. The information stored +// denotes the static attributes of the channel, such as the channelID, the keys +// involved in creation of the channel, and the set of features that the channel +// supports. The chanPoint and chanID are used to uniquely identify the edge +// globally within the database. func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { - c.cacheMu.Lock() - defer c.cacheMu.Unlock() + var alreadyExists bool + return c.chanScheduler.Execute(&batch.Request{ + Reset: func() { + alreadyExists = false + }, + Update: func(tx kvdb.RwTx) error { + err := c.addChannelEdge(tx, edge) - err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { - return c.addChannelEdge(tx, edge) - }, func() {}) - if err != nil { - return err - } + // Silence ErrEdgeAlreadyExist so that the batch can + // succeed, but propagate the error via local state. + if err == ErrEdgeAlreadyExist { + alreadyExists = true + return nil + } - c.rejectCache.remove(edge.ChannelID) - c.chanCache.remove(edge.ChannelID) - - return nil + return err + }, + OnCommit: func(err error) error { + switch { + case err != nil: + return err + case alreadyExists: + return ErrEdgeAlreadyExist + default: + c.rejectCache.remove(edge.ChannelID) + c.chanCache.remove(edge.ChannelID) + return nil + } + }, + }) } // addChannelEdge is the private form of AddChannelEdge that allows callers to @@ -1929,27 +1958,43 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex, // the ChannelEdgePolicy determines which of the directed edges are being // updated. If the flag is 1, then the first node's information is being // updated, otherwise it's the second node's information. The node ordering is -// determined by the lexicographical ordering of the identity public keys of -// the nodes on either side of the channel. +// determined by the lexicographical ordering of the identity public keys of the +// nodes on either side of the channel. func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { - c.cacheMu.Lock() - defer c.cacheMu.Unlock() + var ( + isUpdate1 bool + edgeNotFound bool + ) + return c.chanScheduler.Execute(&batch.Request{ + Reset: func() { + isUpdate1 = false + edgeNotFound = false + }, + Update: func(tx kvdb.RwTx) error { + var err error + isUpdate1, err = updateEdgePolicy(tx, edge) - var isUpdate1 bool - err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { - var err error - isUpdate1, err = updateEdgePolicy(tx, edge) - return err - }, func() { - isUpdate1 = false + // Silence ErrEdgeNotFound so that the batch can + // succeed, but propagate the error via local state. + if err == ErrEdgeNotFound { + edgeNotFound = true + return nil + } + + return err + }, + OnCommit: func(err error) error { + switch { + case err != nil: + return err + case edgeNotFound: + return ErrEdgeNotFound + default: + c.updateEdgeCache(edge, isUpdate1) + return nil + } + }, }) - if err != nil { - return err - } - - c.updateEdgeCache(edge, isUpdate1) - - return nil } func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) { diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 52d1114d..2abdcc8e 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -3,6 +3,7 @@ package channeldb import ( "bytes" "crypto/sha256" + "errors" "fmt" "image/color" "math" @@ -11,6 +12,7 @@ import ( "net" "reflect" "runtime" + "sync" "testing" "time" @@ -21,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" + "github.com/stretchr/testify/require" ) var ( @@ -3195,3 +3198,148 @@ func TestComputeFee(t *testing.T) { t.Fatalf("expected fee %v, but got %v", fee, fwdFee) } } + +// TestBatchedAddChannelEdge asserts that BatchedAddChannelEdge properly +// executes multiple AddChannelEdge requests in a single txn. +func TestBatchedAddChannelEdge(t *testing.T) { + t.Parallel() + + db, cleanUp, err := MakeTestDB() + require.Nil(t, err) + defer cleanUp() + + graph := db.ChannelGraph() + sourceNode, err := createTestVertex(db) + require.Nil(t, err) + err = graph.SetSourceNode(sourceNode) + require.Nil(t, err) + + // We'd like to test the insertion/deletion of edges, so we create two + // vertexes to connect. + node1, err := createTestVertex(db) + require.Nil(t, err) + node2, err := createTestVertex(db) + require.Nil(t, err) + + // In addition to the fake vertexes we create some fake channel + // identifiers. + var spendOutputs []*wire.OutPoint + var blockHash chainhash.Hash + copy(blockHash[:], bytes.Repeat([]byte{1}, 32)) + + // Prune the graph a few times to make sure we have entries in the + // prune log. + _, err = graph.PruneGraph(spendOutputs, &blockHash, 155) + require.Nil(t, err) + var blockHash2 chainhash.Hash + copy(blockHash2[:], bytes.Repeat([]byte{2}, 32)) + + _, err = graph.PruneGraph(spendOutputs, &blockHash2, 156) + require.Nil(t, err) + + // We'll create 3 almost identical edges, so first create a helper + // method containing all logic for doing so. + + // Create an edge which has its block height at 156. + height := uint32(156) + edgeInfo, _ := createEdge(height, 0, 0, 0, node1, node2) + + // Create an edge with block height 157. We give it + // maximum values for tx index and position, to make + // sure our database range scan get edges from the + // entire range. + edgeInfo2, _ := createEdge( + height+1, math.MaxUint32&0x00ffffff, math.MaxUint16, 1, + node1, node2, + ) + + // Create a third edge, this with a block height of 155. + edgeInfo3, _ := createEdge(height-1, 0, 0, 2, node1, node2) + + edges := []ChannelEdgeInfo{edgeInfo, edgeInfo2, edgeInfo3} + errChan := make(chan error, len(edges)) + errTimeout := errors.New("timeout adding batched channel") + + // Now add all these new edges to the database. + var wg sync.WaitGroup + for _, edge := range edges { + wg.Add(1) + go func(edge ChannelEdgeInfo) { + defer wg.Done() + + select { + case errChan <- graph.AddChannelEdge(&edge): + case <-time.After(2 * time.Second): + errChan <- errTimeout + } + }(edge) + } + wg.Wait() + + for i := 0; i < len(edges); i++ { + err := <-errChan + require.Nil(t, err) + } +} + +// TestBatchedUpdateEdgePolicy asserts that BatchedUpdateEdgePolicy properly +// executes multiple UpdateEdgePolicy requests in a single txn. +func TestBatchedUpdateEdgePolicy(t *testing.T) { + t.Parallel() + + db, cleanUp, err := MakeTestDB() + require.Nil(t, err) + defer cleanUp() + + graph := db.ChannelGraph() + + // We'd like to test the update of edges inserted into the database, so + // we create two vertexes to connect. + node1, err := createTestVertex(db) + require.Nil(t, err) + err = graph.AddLightningNode(node1) + require.Nil(t, err) + node2, err := createTestVertex(db) + require.Nil(t, err) + err = graph.AddLightningNode(node2) + require.Nil(t, err) + + // Create an edge and add it to the db. + edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2) + + // Make sure inserting the policy at this point, before the edge info + // is added, will fail. + err = graph.UpdateEdgePolicy(edge1) + require.Error(t, ErrEdgeNotFound, err) + + // Add the edge info. + err = graph.AddChannelEdge(edgeInfo) + require.Nil(t, err) + + errTimeout := errors.New("timeout adding batched channel") + + updates := []*ChannelEdgePolicy{edge1, edge2} + + errChan := make(chan error, len(updates)) + + // Now add all these new edges to the database. + var wg sync.WaitGroup + for _, update := range updates { + wg.Add(1) + go func(update *ChannelEdgePolicy) { + defer wg.Done() + + select { + case errChan <- graph.UpdateEdgePolicy(update): + case <-time.After(2 * time.Second): + errChan <- errTimeout + } + }(update) + } + wg.Wait() + + for i := 0; i < len(updates); i++ { + err := <-errChan + require.Nil(t, err) + } +}