Merge pull request #3367 from cfromknecht/batched-graph-updates
Batched graph updates
This commit is contained in:
commit
7e298f1434
102
batch/batch.go
Normal file
102
batch/batch.go
Normal file
@ -0,0 +1,102 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||
)
|
||||
|
||||
// errSolo is a sentinel error indicating that the requester should re-run the
|
||||
// operation in isolation.
|
||||
var errSolo = errors.New(
|
||||
"batch function returned an error and should be re-run solo",
|
||||
)
|
||||
|
||||
type request struct {
|
||||
*Request
|
||||
errChan chan error
|
||||
}
|
||||
|
||||
type batch struct {
|
||||
db kvdb.Backend
|
||||
start sync.Once
|
||||
reqs []*request
|
||||
clear func(b *batch)
|
||||
locker sync.Locker
|
||||
}
|
||||
|
||||
// trigger is the entry point for the batch and ensures that run is started at
|
||||
// most once.
|
||||
func (b *batch) trigger() {
|
||||
b.start.Do(b.run)
|
||||
}
|
||||
|
||||
// run executes the current batch of requests. If any individual requests fail
|
||||
// alongside others they will be retried by the caller.
|
||||
func (b *batch) run() {
|
||||
// Clear the batch from its scheduler, ensuring that no new requests are
|
||||
// added to this batch.
|
||||
b.clear(b)
|
||||
|
||||
// If a cache lock was provided, hold it until the this method returns.
|
||||
// This is critical for ensuring external consistency of the operation,
|
||||
// so that caches don't get out of sync with the on disk state.
|
||||
if b.locker != nil {
|
||||
b.locker.Lock()
|
||||
defer b.locker.Unlock()
|
||||
}
|
||||
|
||||
// Apply the batch until a subset succeeds or all of them fail. Requests
|
||||
// that fail will be retried individually.
|
||||
for len(b.reqs) > 0 {
|
||||
var failIdx = -1
|
||||
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
|
||||
for i, req := range b.reqs {
|
||||
err := req.Update(tx)
|
||||
if err != nil {
|
||||
failIdx = i
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}, func() {
|
||||
for _, req := range b.reqs {
|
||||
if req.Reset != nil {
|
||||
req.Reset()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// If a request's Update failed, extract it and re-run the
|
||||
// batch. The removed request will be retried individually by
|
||||
// the caller.
|
||||
if failIdx >= 0 {
|
||||
req := b.reqs[failIdx]
|
||||
|
||||
// It's safe to shorten b.reqs here because the
|
||||
// scheduler's batch no longer points to us.
|
||||
b.reqs[failIdx] = b.reqs[len(b.reqs)-1]
|
||||
b.reqs = b.reqs[:len(b.reqs)-1]
|
||||
|
||||
// Tell the submitter re-run it solo, continue with the
|
||||
// rest of the batch.
|
||||
req.errChan <- errSolo
|
||||
continue
|
||||
}
|
||||
|
||||
// None of the remaining requests failed, process the errors
|
||||
// using each request's OnCommit closure and return the error
|
||||
// to the requester. If no OnCommit closure is provided, simply
|
||||
// return the error directly.
|
||||
for _, req := range b.reqs {
|
||||
if req.OnCommit != nil {
|
||||
req.errChan <- req.OnCommit(err)
|
||||
} else {
|
||||
req.errChan <- err
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
38
batch/interface.go
Normal file
38
batch/interface.go
Normal file
@ -0,0 +1,38 @@
|
||||
package batch
|
||||
|
||||
import "github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||
|
||||
// Request defines an operation that can be batched into a single bbolt
|
||||
// transaction.
|
||||
type Request struct {
|
||||
// Reset is called before each invocation of Update and is used to clear
|
||||
// any possible modifications to local state as a result of previous
|
||||
// calls to Update that were not committed due to a concurrent batch
|
||||
// failure.
|
||||
//
|
||||
// NOTE: This field is optional.
|
||||
Reset func()
|
||||
|
||||
// Update is applied alongside other operations in the batch.
|
||||
//
|
||||
// NOTE: This method MUST NOT acquire any mutexes.
|
||||
Update func(tx kvdb.RwTx) error
|
||||
|
||||
// OnCommit is called if the batch or a subset of the batch including
|
||||
// this request all succeeded without failure. The passed error should
|
||||
// contain the result of the transaction commit, as that can still fail
|
||||
// even if none of the closures returned an error.
|
||||
//
|
||||
// NOTE: This field is optional.
|
||||
OnCommit func(commitErr error) error
|
||||
}
|
||||
|
||||
// Scheduler abstracts a generic batching engine that accumulates an incoming
|
||||
// set of Requests, executes them, and returns the error from the operation.
|
||||
type Scheduler interface {
|
||||
// Execute schedules a Request for execution with the next available
|
||||
// batch. This method blocks until the the underlying closure has been
|
||||
// run against the databse. The resulting error is returned to the
|
||||
// caller.
|
||||
Execute(req *Request) error
|
||||
}
|
103
batch/scheduler.go
Normal file
103
batch/scheduler.go
Normal file
@ -0,0 +1,103 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||
)
|
||||
|
||||
// TimeScheduler is a batching engine that executes requests within a fixed
|
||||
// horizon. When the first request is received, a TimeScheduler waits a
|
||||
// configurable duration for other concurrent requests to join the batch. Once
|
||||
// this time has elapsed, the batch is closed and executed. Subsequent requests
|
||||
// are then added to a new batch which undergoes the same process.
|
||||
type TimeScheduler struct {
|
||||
db kvdb.Backend
|
||||
locker sync.Locker
|
||||
duration time.Duration
|
||||
|
||||
mu sync.Mutex
|
||||
b *batch
|
||||
}
|
||||
|
||||
// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
|
||||
// which to schedule batches. If the operation needs to modify a higher-level
|
||||
// cache, the cache's lock should be provided to so that external consistency
|
||||
// can be maintained, as successful db operations will cause a request's
|
||||
// OnCommit method to be executed while holding this lock.
|
||||
func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
|
||||
duration time.Duration) *TimeScheduler {
|
||||
|
||||
return &TimeScheduler{
|
||||
db: db,
|
||||
locker: locker,
|
||||
duration: duration,
|
||||
}
|
||||
}
|
||||
|
||||
// Execute schedules the provided request for batch execution along with other
|
||||
// concurrent requests. The request will be executed within a fixed horizon,
|
||||
// parameterizeed by the duration of the scheduler. The error from the
|
||||
// underlying operation is returned to the caller.
|
||||
//
|
||||
// NOTE: Part of the Scheduler interface.
|
||||
func (s *TimeScheduler) Execute(r *Request) error {
|
||||
req := request{
|
||||
Request: r,
|
||||
errChan: make(chan error, 1),
|
||||
}
|
||||
|
||||
// Add the request to the current batch. If the batch has been cleared
|
||||
// or no batch exists, create a new one.
|
||||
s.mu.Lock()
|
||||
if s.b == nil {
|
||||
s.b = &batch{
|
||||
db: s.db,
|
||||
clear: s.clear,
|
||||
locker: s.locker,
|
||||
}
|
||||
time.AfterFunc(s.duration, s.b.trigger)
|
||||
}
|
||||
s.b.reqs = append(s.b.reqs, &req)
|
||||
s.mu.Unlock()
|
||||
|
||||
// Wait for the batch to process the request. If the batch didn't
|
||||
// ask us to execute the request individually, simply return the error.
|
||||
err := <-req.errChan
|
||||
if err != errSolo {
|
||||
return err
|
||||
}
|
||||
|
||||
// Obtain exclusive access to the cache if this scheduler needs to
|
||||
// modify the cache in OnCommit.
|
||||
if s.locker != nil {
|
||||
s.locker.Lock()
|
||||
defer s.locker.Unlock()
|
||||
}
|
||||
|
||||
// Otherwise, run the request on its own.
|
||||
commitErr := kvdb.Update(s.db, req.Update, func() {
|
||||
if req.Reset != nil {
|
||||
req.Reset()
|
||||
}
|
||||
})
|
||||
|
||||
// Finally, return the commit error directly or execute the OnCommit
|
||||
// closure with the commit error if present.
|
||||
if req.OnCommit != nil {
|
||||
return req.OnCommit(commitErr)
|
||||
}
|
||||
|
||||
return commitErr
|
||||
}
|
||||
|
||||
// clear resets the scheduler's batch to nil so that no more requests can be
|
||||
// added.
|
||||
func (s *TimeScheduler) clear(b *batch) {
|
||||
s.mu.Lock()
|
||||
if s.b == b {
|
||||
s.b = nil
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
@ -275,6 +275,7 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
|
||||
}
|
||||
chanDB.graph = newChannelGraph(
|
||||
chanDB, opts.RejectCacheSize, opts.ChannelCacheSize,
|
||||
opts.BatchCommitInterval,
|
||||
)
|
||||
|
||||
// Synchronize the version of database and apply migrations if needed.
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"github.com/btcsuite/btcd/txscript"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/btcsuite/btcutil"
|
||||
"github.com/lightningnetwork/lnd/batch"
|
||||
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
@ -177,16 +178,27 @@ type ChannelGraph struct {
|
||||
cacheMu sync.RWMutex
|
||||
rejectCache *rejectCache
|
||||
chanCache *channelCache
|
||||
|
||||
chanScheduler batch.Scheduler
|
||||
nodeScheduler batch.Scheduler
|
||||
}
|
||||
|
||||
// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
|
||||
// returned instance has its own unique reject cache and channel cache.
|
||||
func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph {
|
||||
return &ChannelGraph{
|
||||
func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int,
|
||||
batchCommitInterval time.Duration) *ChannelGraph {
|
||||
g := &ChannelGraph{
|
||||
db: db,
|
||||
rejectCache: newRejectCache(rejectCacheSize),
|
||||
chanCache: newChannelCache(chanCacheSize),
|
||||
}
|
||||
g.chanScheduler = batch.NewTimeScheduler(
|
||||
db.Backend, &g.cacheMu, batchCommitInterval,
|
||||
)
|
||||
g.nodeScheduler = batch.NewTimeScheduler(
|
||||
db.Backend, nil, batchCommitInterval,
|
||||
)
|
||||
return g
|
||||
}
|
||||
|
||||
// Database returns a pointer to the underlying database.
|
||||
@ -440,15 +452,17 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
|
||||
// AddLightningNode adds a vertex/node to the graph database. If the node is not
|
||||
// in the database from before, this will add a new, unconnected one to the
|
||||
// graph. If it is present from before, this will update that node's
|
||||
// information. Note that this method is expected to only be called to update
|
||||
// an already present node from a node announcement, or to insert a node found
|
||||
// in a channel update.
|
||||
// information. Note that this method is expected to only be called to update an
|
||||
// already present node from a node announcement, or to insert a node found in a
|
||||
// channel update.
|
||||
//
|
||||
// TODO(roasbeef): also need sig of announcement
|
||||
func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
|
||||
return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
|
||||
return c.nodeScheduler.Execute(&batch.Request{
|
||||
Update: func(tx kvdb.RwTx) error {
|
||||
return addLightningNode(tx, node)
|
||||
}, func() {})
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func addLightningNode(tx kvdb.RwTx, node *LightningNode) error {
|
||||
@ -568,26 +582,42 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
|
||||
}
|
||||
|
||||
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
|
||||
// undirected edge from the two target nodes are created. The information
|
||||
// stored denotes the static attributes of the channel, such as the channelID,
|
||||
// the keys involved in creation of the channel, and the set of features that
|
||||
// the channel supports. The chanPoint and chanID are used to uniquely identify
|
||||
// the edge globally within the database.
|
||||
// undirected edge from the two target nodes are created. The information stored
|
||||
// denotes the static attributes of the channel, such as the channelID, the keys
|
||||
// involved in creation of the channel, and the set of features that the channel
|
||||
// supports. The chanPoint and chanID are used to uniquely identify the edge
|
||||
// globally within the database.
|
||||
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
|
||||
c.cacheMu.Lock()
|
||||
defer c.cacheMu.Unlock()
|
||||
var alreadyExists bool
|
||||
return c.chanScheduler.Execute(&batch.Request{
|
||||
Reset: func() {
|
||||
alreadyExists = false
|
||||
},
|
||||
Update: func(tx kvdb.RwTx) error {
|
||||
err := c.addChannelEdge(tx, edge)
|
||||
|
||||
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
|
||||
return c.addChannelEdge(tx, edge)
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
// Silence ErrEdgeAlreadyExist so that the batch can
|
||||
// succeed, but propagate the error via local state.
|
||||
if err == ErrEdgeAlreadyExist {
|
||||
alreadyExists = true
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
},
|
||||
OnCommit: func(err error) error {
|
||||
switch {
|
||||
case err != nil:
|
||||
return err
|
||||
case alreadyExists:
|
||||
return ErrEdgeAlreadyExist
|
||||
default:
|
||||
c.rejectCache.remove(edge.ChannelID)
|
||||
c.chanCache.remove(edge.ChannelID)
|
||||
|
||||
return nil
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// addChannelEdge is the private form of AddChannelEdge that allows callers to
|
||||
@ -1929,51 +1959,71 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
|
||||
// the ChannelEdgePolicy determines which of the directed edges are being
|
||||
// updated. If the flag is 1, then the first node's information is being
|
||||
// updated, otherwise it's the second node's information. The node ordering is
|
||||
// determined by the lexicographical ordering of the identity public keys of
|
||||
// the nodes on either side of the channel.
|
||||
// determined by the lexicographical ordering of the identity public keys of the
|
||||
// nodes on either side of the channel.
|
||||
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
|
||||
c.cacheMu.Lock()
|
||||
defer c.cacheMu.Unlock()
|
||||
|
||||
var isUpdate1 bool
|
||||
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
|
||||
var (
|
||||
isUpdate1 bool
|
||||
edgeNotFound bool
|
||||
)
|
||||
return c.chanScheduler.Execute(&batch.Request{
|
||||
Reset: func() {
|
||||
isUpdate1 = false
|
||||
edgeNotFound = false
|
||||
},
|
||||
Update: func(tx kvdb.RwTx) error {
|
||||
var err error
|
||||
isUpdate1, err = updateEdgePolicy(tx, edge)
|
||||
return err
|
||||
}, func() {
|
||||
isUpdate1 = false
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
// Silence ErrEdgeNotFound so that the batch can
|
||||
// succeed, but propagate the error via local state.
|
||||
if err == ErrEdgeNotFound {
|
||||
edgeNotFound = true
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
},
|
||||
OnCommit: func(err error) error {
|
||||
switch {
|
||||
case err != nil:
|
||||
return err
|
||||
case edgeNotFound:
|
||||
return ErrEdgeNotFound
|
||||
default:
|
||||
c.updateEdgeCache(edge, isUpdate1)
|
||||
return nil
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) {
|
||||
// If an entry for this channel is found in reject cache, we'll modify
|
||||
// the entry with the updated timestamp for the direction that was just
|
||||
// written. If the edge doesn't exist, we'll load the cache entry lazily
|
||||
// during the next query for this edge.
|
||||
if entry, ok := c.rejectCache.get(edge.ChannelID); ok {
|
||||
if entry, ok := c.rejectCache.get(e.ChannelID); ok {
|
||||
if isUpdate1 {
|
||||
entry.upd1Time = edge.LastUpdate.Unix()
|
||||
entry.upd1Time = e.LastUpdate.Unix()
|
||||
} else {
|
||||
entry.upd2Time = edge.LastUpdate.Unix()
|
||||
entry.upd2Time = e.LastUpdate.Unix()
|
||||
}
|
||||
c.rejectCache.insert(edge.ChannelID, entry)
|
||||
c.rejectCache.insert(e.ChannelID, entry)
|
||||
}
|
||||
|
||||
// If an entry for this channel is found in channel cache, we'll modify
|
||||
// the entry with the updated policy for the direction that was just
|
||||
// written. If the edge doesn't exist, we'll defer loading the info and
|
||||
// policies and lazily read from disk during the next query.
|
||||
if channel, ok := c.chanCache.get(edge.ChannelID); ok {
|
||||
if channel, ok := c.chanCache.get(e.ChannelID); ok {
|
||||
if isUpdate1 {
|
||||
channel.Policy1 = edge
|
||||
channel.Policy1 = e
|
||||
} else {
|
||||
channel.Policy2 = edge
|
||||
channel.Policy2 = e
|
||||
}
|
||||
c.chanCache.insert(edge.ChannelID, channel)
|
||||
c.chanCache.insert(e.ChannelID, channel)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateEdgePolicy attempts to update an edge's policy within the relevant
|
||||
|
@ -3,6 +3,7 @@ package channeldb
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"image/color"
|
||||
"math"
|
||||
@ -11,6 +12,7 @@ import (
|
||||
"net"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -21,6 +23,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -3195,3 +3198,148 @@ func TestComputeFee(t *testing.T) {
|
||||
t.Fatalf("expected fee %v, but got %v", fee, fwdFee)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBatchedAddChannelEdge asserts that BatchedAddChannelEdge properly
|
||||
// executes multiple AddChannelEdge requests in a single txn.
|
||||
func TestBatchedAddChannelEdge(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, cleanUp, err := MakeTestDB()
|
||||
require.Nil(t, err)
|
||||
defer cleanUp()
|
||||
|
||||
graph := db.ChannelGraph()
|
||||
sourceNode, err := createTestVertex(db)
|
||||
require.Nil(t, err)
|
||||
err = graph.SetSourceNode(sourceNode)
|
||||
require.Nil(t, err)
|
||||
|
||||
// We'd like to test the insertion/deletion of edges, so we create two
|
||||
// vertexes to connect.
|
||||
node1, err := createTestVertex(db)
|
||||
require.Nil(t, err)
|
||||
node2, err := createTestVertex(db)
|
||||
require.Nil(t, err)
|
||||
|
||||
// In addition to the fake vertexes we create some fake channel
|
||||
// identifiers.
|
||||
var spendOutputs []*wire.OutPoint
|
||||
var blockHash chainhash.Hash
|
||||
copy(blockHash[:], bytes.Repeat([]byte{1}, 32))
|
||||
|
||||
// Prune the graph a few times to make sure we have entries in the
|
||||
// prune log.
|
||||
_, err = graph.PruneGraph(spendOutputs, &blockHash, 155)
|
||||
require.Nil(t, err)
|
||||
var blockHash2 chainhash.Hash
|
||||
copy(blockHash2[:], bytes.Repeat([]byte{2}, 32))
|
||||
|
||||
_, err = graph.PruneGraph(spendOutputs, &blockHash2, 156)
|
||||
require.Nil(t, err)
|
||||
|
||||
// We'll create 3 almost identical edges, so first create a helper
|
||||
// method containing all logic for doing so.
|
||||
|
||||
// Create an edge which has its block height at 156.
|
||||
height := uint32(156)
|
||||
edgeInfo, _ := createEdge(height, 0, 0, 0, node1, node2)
|
||||
|
||||
// Create an edge with block height 157. We give it
|
||||
// maximum values for tx index and position, to make
|
||||
// sure our database range scan get edges from the
|
||||
// entire range.
|
||||
edgeInfo2, _ := createEdge(
|
||||
height+1, math.MaxUint32&0x00ffffff, math.MaxUint16, 1,
|
||||
node1, node2,
|
||||
)
|
||||
|
||||
// Create a third edge, this with a block height of 155.
|
||||
edgeInfo3, _ := createEdge(height-1, 0, 0, 2, node1, node2)
|
||||
|
||||
edges := []ChannelEdgeInfo{edgeInfo, edgeInfo2, edgeInfo3}
|
||||
errChan := make(chan error, len(edges))
|
||||
errTimeout := errors.New("timeout adding batched channel")
|
||||
|
||||
// Now add all these new edges to the database.
|
||||
var wg sync.WaitGroup
|
||||
for _, edge := range edges {
|
||||
wg.Add(1)
|
||||
go func(edge ChannelEdgeInfo) {
|
||||
defer wg.Done()
|
||||
|
||||
select {
|
||||
case errChan <- graph.AddChannelEdge(&edge):
|
||||
case <-time.After(2 * time.Second):
|
||||
errChan <- errTimeout
|
||||
}
|
||||
}(edge)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i := 0; i < len(edges); i++ {
|
||||
err := <-errChan
|
||||
require.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBatchedUpdateEdgePolicy asserts that BatchedUpdateEdgePolicy properly
|
||||
// executes multiple UpdateEdgePolicy requests in a single txn.
|
||||
func TestBatchedUpdateEdgePolicy(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, cleanUp, err := MakeTestDB()
|
||||
require.Nil(t, err)
|
||||
defer cleanUp()
|
||||
|
||||
graph := db.ChannelGraph()
|
||||
|
||||
// We'd like to test the update of edges inserted into the database, so
|
||||
// we create two vertexes to connect.
|
||||
node1, err := createTestVertex(db)
|
||||
require.Nil(t, err)
|
||||
err = graph.AddLightningNode(node1)
|
||||
require.Nil(t, err)
|
||||
node2, err := createTestVertex(db)
|
||||
require.Nil(t, err)
|
||||
err = graph.AddLightningNode(node2)
|
||||
require.Nil(t, err)
|
||||
|
||||
// Create an edge and add it to the db.
|
||||
edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2)
|
||||
|
||||
// Make sure inserting the policy at this point, before the edge info
|
||||
// is added, will fail.
|
||||
err = graph.UpdateEdgePolicy(edge1)
|
||||
require.Error(t, ErrEdgeNotFound, err)
|
||||
|
||||
// Add the edge info.
|
||||
err = graph.AddChannelEdge(edgeInfo)
|
||||
require.Nil(t, err)
|
||||
|
||||
errTimeout := errors.New("timeout adding batched channel")
|
||||
|
||||
updates := []*ChannelEdgePolicy{edge1, edge2}
|
||||
|
||||
errChan := make(chan error, len(updates))
|
||||
|
||||
// Now add all these new edges to the database.
|
||||
var wg sync.WaitGroup
|
||||
for _, update := range updates {
|
||||
wg.Add(1)
|
||||
go func(update *ChannelEdgePolicy) {
|
||||
defer wg.Done()
|
||||
|
||||
select {
|
||||
case errChan <- graph.UpdateEdgePolicy(update):
|
||||
case <-time.After(2 * time.Second):
|
||||
errChan <- errTimeout
|
||||
}
|
||||
}(update)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i := 0; i < len(updates); i++ {
|
||||
err := <-errChan
|
||||
require.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,10 @@ type Options struct {
|
||||
// channel cache.
|
||||
ChannelCacheSize int
|
||||
|
||||
// BatchCommitInterval is the maximum duration the batch schedulers will
|
||||
// wait before attempting to commit a pending set of updates.
|
||||
BatchCommitInterval time.Duration
|
||||
|
||||
// clock is the time source used by the database.
|
||||
clock clock.Clock
|
||||
|
||||
@ -92,6 +96,14 @@ func OptionAutoCompactMinAge(minAge time.Duration) OptionModifier {
|
||||
}
|
||||
}
|
||||
|
||||
// OptionSetBatchCommitInterval sets the batch commit interval for the internval
|
||||
// batch schedulers.
|
||||
func OptionSetBatchCommitInterval(interval time.Duration) OptionModifier {
|
||||
return func(o *Options) {
|
||||
o.BatchCommitInterval = interval
|
||||
}
|
||||
}
|
||||
|
||||
// OptionClock sets a non-default clock dependency.
|
||||
func OptionClock(clock clock.Clock) OptionModifier {
|
||||
return func(o *Options) {
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -912,9 +911,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
|
||||
// We'll use this validation to ensure that we process jobs in their
|
||||
// dependency order during parallel validation.
|
||||
validationBarrier := routing.NewValidationBarrier(
|
||||
runtime.NumCPU()*4, d.quit,
|
||||
)
|
||||
validationBarrier := routing.NewValidationBarrier(1000, d.quit)
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -1692,9 +1689,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
// If the edge was rejected due to already being known,
|
||||
// then it may be that case that this new message has a
|
||||
// fresh channel proof, so we'll check.
|
||||
if routing.IsError(err, routing.ErrOutdated,
|
||||
routing.ErrIgnored) {
|
||||
|
||||
if routing.IsError(err, routing.ErrIgnored) {
|
||||
// Attempt to process the rejected message to
|
||||
// see if we get any new announcements.
|
||||
anns, rErr := d.processRejectedEdge(msg, proof)
|
||||
|
@ -3,6 +3,7 @@ package lncfg
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||
)
|
||||
@ -11,12 +12,15 @@ const (
|
||||
dbName = "channel.db"
|
||||
BoltBackend = "bolt"
|
||||
EtcdBackend = "etcd"
|
||||
DefaultBatchCommitInterval = 500 * time.Millisecond
|
||||
)
|
||||
|
||||
// DB holds database configuration for LND.
|
||||
type DB struct {
|
||||
Backend string `long:"backend" description:"The selected database backend."`
|
||||
|
||||
BatchCommitInterval time.Duration `long:"batch-commit-interval" description:"The maximum duration the channel graph batch schedulers will wait before attempting to commit a batch of pending updates. This can be tradeoff database contenion for commit latency."`
|
||||
|
||||
Etcd *kvdb.EtcdConfig `group:"etcd" namespace:"etcd" description:"Etcd settings."`
|
||||
|
||||
Bolt *kvdb.BoltConfig `group:"bolt" namespace:"bolt" description:"Bolt settings."`
|
||||
@ -26,6 +30,7 @@ type DB struct {
|
||||
func DefaultDB() *DB {
|
||||
return &DB{
|
||||
Backend: BoltBackend,
|
||||
BatchCommitInterval: DefaultBatchCommitInterval,
|
||||
Bolt: &kvdb.BoltConfig{
|
||||
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
|
||||
},
|
||||
|
3
lnd.go
3
lnd.go
@ -1381,6 +1381,7 @@ func initializeDatabases(ctx context.Context,
|
||||
databaseBackends.LocalDB,
|
||||
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
|
||||
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
|
||||
channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
|
||||
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
|
||||
)
|
||||
switch {
|
||||
@ -1409,6 +1410,7 @@ func initializeDatabases(ctx context.Context,
|
||||
databaseBackends.LocalDB,
|
||||
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
|
||||
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
|
||||
channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
|
||||
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
|
||||
)
|
||||
switch {
|
||||
@ -1433,6 +1435,7 @@ func initializeDatabases(ctx context.Context,
|
||||
remoteChanDB, err = channeldb.CreateWithBackend(
|
||||
databaseBackends.RemoteDB,
|
||||
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
|
||||
channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
|
||||
)
|
||||
switch {
|
||||
case err == channeldb.ErrDryRunMigrationOK:
|
||||
|
@ -1850,12 +1850,11 @@ func getChannelPolicies(t *harnessTest, node *lntest.HarnessNode,
|
||||
}
|
||||
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
|
||||
chanGraph, err := node.DescribeGraph(ctxt, descReq)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to query for alice's graph: %v", err)
|
||||
}
|
||||
require.NoError(t.t, err, "unable to query for alice's graph")
|
||||
|
||||
var policies []*lnrpc.RoutingPolicy
|
||||
out:
|
||||
err = wait.NoError(func() error {
|
||||
out:
|
||||
for _, chanPoint := range chanPoints {
|
||||
for _, e := range chanGraph.Edges {
|
||||
if e.ChanPoint != txStr(chanPoint) {
|
||||
@ -1863,9 +1862,11 @@ out:
|
||||
}
|
||||
|
||||
if e.Node1Pub == advertisingNode {
|
||||
policies = append(policies, e.Node1Policy)
|
||||
policies = append(policies,
|
||||
e.Node1Policy)
|
||||
} else {
|
||||
policies = append(policies, e.Node2Policy)
|
||||
policies = append(policies,
|
||||
e.Node2Policy)
|
||||
}
|
||||
|
||||
continue out
|
||||
@ -1873,9 +1874,13 @@ out:
|
||||
|
||||
// If we've iterated over all the known edges and we weren't
|
||||
// able to find this specific one, then we'll fail.
|
||||
t.Fatalf("did not find edge %v", txStr(chanPoint))
|
||||
return fmt.Errorf("did not find edge %v", txStr(chanPoint))
|
||||
}
|
||||
|
||||
return nil
|
||||
}, defaultTimeout)
|
||||
require.NoError(t.t, err)
|
||||
|
||||
return policies
|
||||
}
|
||||
|
||||
|
@ -238,6 +238,7 @@ func (cfg NodeConfig) genArgs() []string {
|
||||
args = append(args, "--nobootstrap")
|
||||
args = append(args, "--debuglevel=debug")
|
||||
args = append(args, "--bitcoin.defaultchanconfs=1")
|
||||
args = append(args, fmt.Sprintf("--db.batch-commit-interval=%v", 10*time.Millisecond))
|
||||
args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV))
|
||||
args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()))
|
||||
args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()))
|
||||
|
@ -3,7 +3,6 @@ package routing
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -914,7 +913,7 @@ func (r *ChannelRouter) networkHandler() {
|
||||
|
||||
// We'll use this validation barrier to ensure that we process all jobs
|
||||
// in the proper order during parallel validation.
|
||||
validationBarrier := NewValidationBarrier(runtime.NumCPU()*4, r.quit)
|
||||
validationBarrier := NewValidationBarrier(1000, r.quit)
|
||||
|
||||
for {
|
||||
|
||||
|
@ -930,6 +930,10 @@ litecoin.node=ltcd
|
||||
; also has experimental support for etcd, a replicated backend.
|
||||
; db.backend=bolt
|
||||
|
||||
; The maximum interval the graph database will wait between attempting to flush
|
||||
; a batch of modifications to disk. Defaults to 500 milliseconds.
|
||||
; db.batch-commit-interval=500ms
|
||||
|
||||
[etcd]
|
||||
; Etcd database host.
|
||||
; db.etcd.host=localhost:2379
|
||||
|
Loading…
Reference in New Issue
Block a user