Merge pull request #2847 from cfromknecht/reject-and-channel-cache

channeldb: add reject and channel caches
Olaoluwa Osuntokun 2019-04-02 18:23:52 -07:00 committed by GitHub
commit 1dc1e8510a
15 changed files with 781 additions and 95 deletions

@ -1027,7 +1027,7 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error {
return ErrNoRestoredChannelMutation
}
err := c.Db.Batch(func(tx *bbolt.Tx) error {
err := c.Db.Update(func(tx *bbolt.Tx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
@ -1465,7 +1465,7 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error {
return ErrNoRestoredChannelMutation
}
return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
// First, we'll grab the writable bucket where this channel's
// data resides.
chanBucket, err := fetchChanBucket(
@ -1608,9 +1608,7 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error {
var newRemoteCommit *ChannelCommitment
err := c.Db.Batch(func(tx *bbolt.Tx) error {
newRemoteCommit = nil
err := c.Db.Update(func(tx *bbolt.Tx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
@ -1748,7 +1746,7 @@ func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error {
c.Lock()
defer c.Unlock()
return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
return c.Packager.AckAddHtlcs(tx, addRefs...)
})
}
@ -1761,7 +1759,7 @@ func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error {
c.Lock()
defer c.Unlock()
return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
return c.Packager.AckSettleFails(tx, settleFailRefs...)
})
}
@ -1772,7 +1770,7 @@ func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error {
c.Lock()
defer c.Unlock()
return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
return c.Packager.SetFwdFilter(tx, height, fwdFilter)
})
}
@ -1785,15 +1783,14 @@ func (c *OpenChannel) RemoveFwdPkg(height uint64) error {
c.Lock()
defer c.Unlock()
return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
return c.Packager.RemovePkg(tx, height)
})
}
// RevocationLogTail returns the "tail", or the end of the current revocation
// log. This entry represents the last previous state for the remote node's
// commitment chain. The ChannelDelta returned by this method will always lag
// one state behind the most current (unrevoked) state of the remote node's
// commitment chain.
func (c *OpenChannel) RevocationLogTail() (*ChannelCommitment, error) {
c.RLock()

@ -0,0 +1,50 @@
package channeldb
// channelCache is an in-memory cache used to improve the performance of
// ChanUpdatesInHorizon. It caches the chan info and edge policies for a
// particular channel.
type channelCache struct {
n int
channels map[uint64]ChannelEdge
}
// newChannelCache creates a new channelCache with a maximum capacity of n
// channels.
func newChannelCache(n int) *channelCache {
return &channelCache{
n: n,
channels: make(map[uint64]ChannelEdge),
}
}
// get returns the channel from the cache, if it exists.
func (c *channelCache) get(chanid uint64) (ChannelEdge, bool) {
channel, ok := c.channels[chanid]
return channel, ok
}
// insert adds the entry to the channel cache. If an entry for chanid already
// exists, it will be replaced with the new entry. If the entry doesn't exist,
// it will be inserted into the cache, performing a random eviction if the cache
// is at capacity.
func (c *channelCache) insert(chanid uint64, channel ChannelEdge) {
// If entry exists, replace it.
if _, ok := c.channels[chanid]; ok {
c.channels[chanid] = channel
return
}
// Otherwise, evict an entry at random and insert.
if len(c.channels) == c.n {
for id := range c.channels {
delete(c.channels, id)
break
}
}
c.channels[chanid] = channel
}
// remove deletes an edge for chanid from the cache, if it exists.
func (c *channelCache) remove(chanid uint64) {
delete(c.channels, chanid)
}
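The insert method above leans on Go's randomized map iteration: deleting the first key visited by a range loop evicts a pseudo-random entry in O(1), with no LRU bookkeeping. A minimal, self-contained sketch of the same eviction strategy, using a plain string cache rather than ChannelEdge so it compiles outside channeldb:

package main

import "fmt"

// randomEvictCache mirrors channelCache's strategy: a bounded map that
// evicts an arbitrary entry when full. Go randomizes map iteration order,
// so the first key visited by range is effectively random.
type randomEvictCache struct {
	n     int
	items map[uint64]string
}

func (c *randomEvictCache) insert(key uint64, val string) {
	// Replace in place if the key exists; no eviction needed.
	if _, ok := c.items[key]; ok {
		c.items[key] = val
		return
	}

	// At capacity, evict one arbitrary entry before inserting.
	if len(c.items) == c.n {
		for k := range c.items {
			delete(c.items, k)
			break
		}
	}
	c.items[key] = val
}

func main() {
	c := &randomEvictCache{n: 2, items: make(map[uint64]string)}
	c.insert(1, "a")
	c.insert(2, "b")
	c.insert(3, "c") // evicts either 1 or 2 at random

	fmt.Println(len(c.items)) // always prints 2
}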

@ -0,0 +1,105 @@
package channeldb
import (
"reflect"
"testing"
)
// TestChannelCache checks the behavior of the channelCache with respect to
// insertion, eviction, and removal of cache entries.
func TestChannelCache(t *testing.T) {
const cacheSize = 100
// Create a new channel cache with the configured max size.
c := newChannelCache(cacheSize)
// As a sanity check, assert that querying the empty cache does not
// return an entry.
_, ok := c.get(0)
if ok {
t.Fatalf("channel cache should be empty")
}
// Now, fill up the cache entirely.
for i := uint64(0); i < cacheSize; i++ {
c.insert(i, channelForInt(i))
}
// Assert that the cache has all of the entries just inserted, since no
// eviction should occur until we try to surpass the max size.
assertHasChanEntries(t, c, 0, cacheSize)
// Now, insert a new element that causes the cache to evict an element.
c.insert(cacheSize, channelForInt(cacheSize))
// Assert that the cache has this last entry, as the cache should evict
// some prior element and not the newly inserted one.
assertHasChanEntries(t, c, cacheSize, cacheSize+1)
// Iterate over all inserted elements and construct a set of the evicted
// elements.
evicted := make(map[uint64]struct{})
for i := uint64(0); i < cacheSize+1; i++ {
_, ok := c.get(i)
if !ok {
evicted[i] = struct{}{}
}
}
// Assert that exactly one element has been evicted.
numEvicted := len(evicted)
if numEvicted != 1 {
t.Fatalf("expected one evicted entry, got: %d", numEvicted)
}
// Remove the highest item which initially caused the eviction and
// reinsert the element that was evicted prior.
c.remove(cacheSize)
for i := range evicted {
c.insert(i, channelForInt(i))
}
// Since the removal created an extra slot, the last insertion should
// not have caused an eviction and the entries for all channels in the
// original set that filled the cache should be present.
assertHasChanEntries(t, c, 0, cacheSize)
// Finally, reinsert the existing set back into the cache and test that
// the cache still has all the entries. If the randomized eviction were
// happening on inserts for existing cache items, we expect this to fail
// with high probability.
for i := uint64(0); i < cacheSize; i++ {
c.insert(i, channelForInt(i))
}
assertHasChanEntries(t, c, 0, cacheSize)
}
// assertHasChanEntries queries the channel cache for all channels in the range
// [start, end), asserting that they exist and their value matches the entry
// produced by channelForInt.
func assertHasChanEntries(t *testing.T, c *channelCache, start, end uint64) {
t.Helper()
for i := start; i < end; i++ {
entry, ok := c.get(i)
if !ok {
t.Fatalf("channel cache should contain chan %d", i)
}
expEntry := channelForInt(i)
if !reflect.DeepEqual(entry, expEntry) {
t.Fatalf("entry mismatch, want: %v, got: %v",
expEntry, entry)
}
}
}
// channelForInt generates a unique ChannelEdge given an integer.
func channelForInt(i uint64) ChannelEdge {
return ChannelEdge{
Info: &ChannelEdgeInfo{
ChannelID: i,
},
}
}

@ -7,7 +7,6 @@ import (
"net"
"os"
"path/filepath"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
@ -104,21 +103,18 @@ var (
byteOrder = binary.BigEndian
)
var bufPool = &sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
// DB is the primary datastore for the lnd daemon. The database stores
// information related to nodes, routing data, open/closed channels, fee
// schedules, and reputation data.
type DB struct {
*bbolt.DB
dbPath string
graph *ChannelGraph
}
// Open opens an existing channeldb. Any necessary schema migrations due to
// updates will take place as necessary.
func Open(dbPath string) (*DB, error) {
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
path := filepath.Join(dbPath, dbName)
if !fileExists(path) {
@ -127,6 +123,11 @@ func Open(dbPath string) (*DB, error) {
}
}
opts := DefaultOptions()
for _, modifier := range modifiers {
modifier(&opts)
}
bdb, err := bbolt.Open(path, dbFilePermission, nil)
if err != nil {
return nil, err
@ -136,6 +137,9 @@ func Open(dbPath string) (*DB, error) {
DB: bdb,
dbPath: dbPath,
}
chanDB.graph = newChannelGraph(
chanDB, opts.RejectCacheSize, opts.ChannelCacheSize,
)
// Synchronize the version of database and apply migrations if needed.
if err := chanDB.syncVersions(dbVersions); err != nil {
@ -900,9 +904,14 @@ type ChannelShell struct {
// well. This method is idempotent, so repeated calls with the same set of
// channel shells won't modify the database after the initial call.
func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
chanGraph := ChannelGraph{d}
chanGraph := d.ChannelGraph()
return d.Update(func(tx *bbolt.Tx) error {
// TODO(conner): find way to do this w/o accessing internal members?
chanGraph.cacheMu.Lock()
defer chanGraph.cacheMu.Unlock()
var chansRestored []uint64
err := d.Update(func(tx *bbolt.Tx) error {
for _, channelShell := range channelShells {
channel := channelShell.Chan
@ -983,14 +992,26 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
chanEdge.ChannelFlags |= lnwire.ChanUpdateDirection
}
err = updateEdgePolicy(tx, &chanEdge)
_, err = updateEdgePolicy(tx, &chanEdge)
if err != nil {
return err
}
chansRestored = append(chansRestored, edgeInfo.ChannelID)
}
return nil
})
if err != nil {
return err
}
for _, chanid := range chansRestored {
chanGraph.rejectCache.remove(chanid)
chanGraph.chanCache.remove(chanid)
}
return nil
}
// AddrsForNode consults the graph and channel database for all addresses known
@ -1112,7 +1133,7 @@ func (d *DB) syncVersions(versions []version) error {
// ChannelGraph returns a new instance of the directed channel graph.
func (d *DB) ChannelGraph() *ChannelGraph {
return &ChannelGraph{d}
return d.graph
}
func getLatestDBVersion(versions []version) uint32 {

@ -10,6 +10,7 @@ import (
"io"
"math"
"net"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
@ -157,9 +158,19 @@ const (
type ChannelGraph struct {
db *DB
// TODO(roasbeef): store and update bloom filter to reduce disk access
// due to current gossip model
// * LRU cache for edges?
cacheMu sync.RWMutex
rejectCache *rejectCache
chanCache *channelCache
}
// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache.
func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph {
return &ChannelGraph{
db: db,
rejectCache: newRejectCache(rejectCacheSize),
chanCache: newChannelCache(chanCacheSize),
}
}
// Database returns a pointer to the underlying database.
@ -334,7 +345,7 @@ func (c *ChannelGraph) sourceNode(nodes *bbolt.Bucket) (*LightningNode, error) {
func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
nodePubBytes := node.PubKeyBytes[:]
return c.db.Batch(func(tx *bbolt.Tx) error {
return c.db.Update(func(tx *bbolt.Tx) error {
// First grab the nodes bucket which stores the mapping from
// pubKey to node information.
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
@ -363,7 +374,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
//
// TODO(roasbeef): also need sig of announcement
func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
return c.db.Update(func(tx *bbolt.Tx) error {
return addLightningNode(tx, node)
})
}
@ -427,7 +438,7 @@ func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
// from the database according to the node's public key.
func (c *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error {
// TODO(roasbeef): ensure dangling edges are removed...
return c.db.Batch(func(tx *bbolt.Tx) error {
return c.db.Update(func(tx *bbolt.Tx) error {
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNodeNotFound
@ -491,9 +502,20 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket,
// the channel supports. The chanPoint and chanID are used to uniquely identify
// the edge globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.db.Update(func(tx *bbolt.Tx) error {
return c.addChannelEdge(tx, edge)
})
if err != nil {
return err
}
c.rejectCache.remove(edge.ChannelID)
c.chanCache.remove(edge.ChannelID)
return nil
}
// addChannelEdge is the private form of AddChannelEdge that allows callers to
@ -598,17 +620,42 @@ func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error
// was updated for both directed edges are returned along with the boolean. If
// it is not found, then the zombie index is checked and its result is returned
// as the second boolean.
func (c *ChannelGraph) HasChannelEdge(chanID uint64,
) (time.Time, time.Time, bool, bool, error) {
func (c *ChannelGraph) HasChannelEdge(
chanID uint64) (time.Time, time.Time, bool, bool, error) {
var (
node1UpdateTime time.Time
node2UpdateTime time.Time
exists bool
isZombie bool
upd1Time time.Time
upd2Time time.Time
exists bool
isZombie bool
)
err := c.db.View(func(tx *bbolt.Tx) error {
// We'll query the cache with the shared lock held to allow multiple
// readers to access values in the cache concurrently if they exist.
c.cacheMu.RLock()
if entry, ok := c.rejectCache.get(chanID); ok {
c.cacheMu.RUnlock()
upd1Time = time.Unix(entry.upd1Time, 0)
upd2Time = time.Unix(entry.upd2Time, 0)
exists, isZombie = entry.flags.unpack()
return upd1Time, upd2Time, exists, isZombie, nil
}
c.cacheMu.RUnlock()
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
// The item was not found with the shared lock, so we'll acquire the
// exclusive lock and check the cache again in case another method added
// the entry to the cache while no lock was held.
if entry, ok := c.rejectCache.get(chanID); ok {
upd1Time = time.Unix(entry.upd1Time, 0)
upd2Time = time.Unix(entry.upd2Time, 0)
exists, isZombie = entry.flags.unpack()
return upd1Time, upd2Time, exists, isZombie, nil
}
if err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
@ -627,7 +674,9 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64,
exists = false
zombieIndex := edges.Bucket(zombieBucket)
if zombieIndex != nil {
isZombie, _, _ = isZombieEdge(zombieIndex, chanID)
isZombie, _, _ = isZombieEdge(
zombieIndex, chanID,
)
}
return nil
@ -653,16 +702,24 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64,
// As we may have only one of the edges populated, only set the
// update time if the edge was found in the database.
if e1 != nil {
node1UpdateTime = e1.LastUpdate
upd1Time = e1.LastUpdate
}
if e2 != nil {
node2UpdateTime = e2.LastUpdate
upd2Time = e2.LastUpdate
}
return nil
}); err != nil {
return time.Time{}, time.Time{}, exists, isZombie, err
}
c.rejectCache.insert(chanID, rejectCacheEntry{
upd1Time: upd1Time.Unix(),
upd2Time: upd2Time.Unix(),
flags: packRejectFlags(exists, isZombie),
})
return node1UpdateTime, node2UpdateTime, exists, isZombie, err
return upd1Time, upd2Time, exists, isZombie, nil
}
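HasChannelEdge above upgrades from the shared to the exclusive lock and re-checks the cache, since another goroutine may have populated the entry in the window where no lock was held. A minimal, self-contained sketch of that double-checked, read-through pattern (generic map and loader, not channeldb's types):

package main

import (
	"fmt"
	"sync"
)

// readThrough sketches the lock-upgrade pattern: probe the cache under the
// shared lock; on a miss, take the exclusive lock and re-check before
// falling through to the slow path.
type readThrough struct {
	mu    sync.RWMutex
	cache map[uint64]string
	load  func(uint64) string // slow path, e.g. a database read
}

func (r *readThrough) get(key uint64) string {
	r.mu.RLock()
	if v, ok := r.cache[key]; ok {
		r.mu.RUnlock()
		return v
	}
	r.mu.RUnlock()

	r.mu.Lock()
	defer r.mu.Unlock()

	// Re-check under the exclusive lock before hitting the slow path.
	if v, ok := r.cache[key]; ok {
		return v
	}

	v := r.load(key)
	r.cache[key] = v
	return v
}

func main() {
	r := &readThrough{
		cache: make(map[uint64]string),
		load:  func(k uint64) string { return fmt.Sprintf("chan-%d", k) },
	}
	fmt.Println(r.get(42)) // miss: loads and caches
	fmt.Println(r.get(42)) // hit: served from the cache
}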
// UpdateChannelEdge retrieves and updates an edge of the graph database. Method
@ -675,7 +732,7 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error {
var chanKey [8]byte
binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
return c.db.Batch(func(tx *bbolt.Tx) error {
return c.db.Update(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrEdgeNotFound
@ -713,11 +770,12 @@ const (
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
blockHash *chainhash.Hash, blockHeight uint32) ([]*ChannelEdgeInfo, error) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var chansClosed []*ChannelEdgeInfo
err := c.db.Batch(func(tx *bbolt.Tx) error {
chansClosed = nil
err := c.db.Update(func(tx *bbolt.Tx) error {
// First grab the edges bucket which houses the information
// we'd like to delete
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
@ -818,6 +876,11 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
return nil, err
}
for _, channel := range chansClosed {
c.rejectCache.remove(channel.ChannelID)
c.chanCache.remove(channel.ChannelID)
}
return chansClosed, nil
}
@ -826,7 +889,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
// that we only maintain a graph of reachable nodes. In the event that a pruned
// node gains more channels, it will be re-added to the graph.
func (c *ChannelGraph) PruneGraphNodes() error {
return c.db.Batch(func(tx *bbolt.Tx) error {
return c.db.Update(func(tx *bbolt.Tx) error {
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNodesNotFound
@ -968,12 +1031,13 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
var chanIDEnd [8]byte
byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
// Keep track of the channels that are removed from the graph.
var removedChans []*ChannelEdgeInfo
if err := c.db.Batch(func(tx *bbolt.Tx) error {
removedChans = nil
if err := c.db.Update(func(tx *bbolt.Tx) error {
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil {
return err
@ -1048,6 +1112,11 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
return nil, err
}
for _, channel := range removedChans {
c.rejectCache.remove(channel.ChannelID)
c.chanCache.remove(channel.ChannelID)
}
return removedChans, nil
}
@ -1103,7 +1172,17 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// channels
// TODO(roasbeef): don't delete both edges?
return c.db.Batch(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var chanID uint64
err := c.db.Update(func(tx *bbolt.Tx) error {
var err error
chanID, err = getChanID(tx, chanPoint)
if err != nil {
return err
}
// First grab the edges bucket which houses the information
// we'd like to delete
edges := tx.Bucket(edgeBucket)
@ -1135,6 +1214,14 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
chanPoint, true,
)
})
if err != nil {
return err
}
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)
return nil
}
// ChannelID attempts to look up the 8-byte compact channel ID which maps to the
@ -1142,33 +1229,39 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// the database, then ErrEdgeNotFound is returned.
func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
var chanID uint64
if err := c.db.View(func(tx *bbolt.Tx) error {
var err error
chanID, err = getChanID(tx, chanPoint)
return err
}); err != nil {
return 0, err
}
return chanID, nil
}
// getChanID returns the assigned channel ID for a given channel point.
func getChanID(tx *bbolt.Tx, chanPoint *wire.OutPoint) (uint64, error) {
var b bytes.Buffer
if err := writeOutpoint(&b, chanPoint); err != nil {
return 0, err
}
if err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
chanIndex := edges.Bucket(channelPointBucket)
if chanIndex == nil {
return ErrGraphNoEdgesFound
}
chanIDBytes := chanIndex.Get(b.Bytes())
if chanIDBytes == nil {
return ErrEdgeNotFound
}
chanID = byteOrder.Uint64(chanIDBytes)
return nil
}); err != nil {
return 0, err
edges := tx.Bucket(edgeBucket)
if edges == nil {
return 0, ErrGraphNoEdgesFound
}
chanIndex := edges.Bucket(channelPointBucket)
if chanIndex == nil {
return 0, ErrGraphNoEdgesFound
}
chanIDBytes := chanIndex.Get(b.Bytes())
if chanIDBytes == nil {
return 0, ErrEdgeNotFound
}
chanID := byteOrder.Uint64(chanIDBytes)
return chanID, nil
}
@ -1238,8 +1331,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
// additional map to keep track of the edges already seen to prevent
// re-adding them.
edgesSeen := make(map[uint64]struct{})
edgesToCache := make(map[uint64]ChannelEdge)
var edgesInHorizon []ChannelEdge
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var hits int
err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
@ -1289,6 +1387,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
continue
}
if channel, ok := c.chanCache.get(chanIDInt); ok {
hits++
edgesSeen[chanIDInt] = struct{}{}
edgesInHorizon = append(edgesInHorizon, channel)
continue
}
// First, we'll fetch the static edge information.
edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
if err != nil {
@ -1313,11 +1418,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
// Finally, we'll collate this edge with the rest of the
// edges to be returned.
edgesSeen[chanIDInt] = struct{}{}
edgesInHorizon = append(edgesInHorizon, ChannelEdge{
channel := ChannelEdge{
Info: &edgeInfo,
Policy1: edge1,
Policy2: edge2,
})
}
edgesInHorizon = append(edgesInHorizon, channel)
edgesToCache[chanIDInt] = channel
}
return nil
@ -1332,6 +1439,15 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
return nil, err
}
// Insert any edges loaded from disk into the cache.
for chanid, channel := range edgesToCache {
c.chanCache.insert(chanid, channel)
}
log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
float64(hits)/float64(len(edgesInHorizon)), hits,
len(edgesInHorizon))
return edgesInHorizon, nil
}
@ -1696,26 +1812,65 @@ func delChannelByEdge(edges, edgeIndex, chanIndex, zombieIndex,
// determined by the lexicographical ordering of the identity public keys of
// the nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
return updateEdgePolicy(tx, edge)
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var isUpdate1 bool
err := c.db.Update(func(tx *bbolt.Tx) error {
var err error
isUpdate1, err = updateEdgePolicy(tx, edge)
return err
})
if err != nil {
return err
}
// If an entry for this channel is found in the reject cache, we'll modify
// the entry with the updated timestamp for the direction that was just
// written. If the edge doesn't exist, we'll load the cache entry lazily
// during the next query for this edge.
if entry, ok := c.rejectCache.get(edge.ChannelID); ok {
if isUpdate1 {
entry.upd1Time = edge.LastUpdate.Unix()
} else {
entry.upd2Time = edge.LastUpdate.Unix()
}
c.rejectCache.insert(edge.ChannelID, entry)
}
// If an entry for this channel is found in the channel cache, we'll modify
// the entry with the updated policy for the direction that was just
// written. If the edge doesn't exist, we'll defer loading the info and
// policies and lazily read from disk during the next query.
if channel, ok := c.chanCache.get(edge.ChannelID); ok {
if isUpdate1 {
channel.Policy1 = edge
} else {
channel.Policy2 = edge
}
c.chanCache.insert(edge.ChannelID, channel)
}
return nil
}
// updateEdgePolicy attempts to update an edge's policy within the relevant
// buckets using an existing database transaction.
func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error {
// buckets using an existing database transaction. The returned boolean will be
// true if the updated policy belongs to node1, and false if it belongs
// to node2.
func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) (bool, error) {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrEdgeNotFound
return false, ErrEdgeNotFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrEdgeNotFound
return false, ErrEdgeNotFound
}
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
return false, err
}
// Create the channelID key by converting the channel ID
@ -1727,23 +1882,31 @@ func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error {
// nodes which connect this channel edge.
nodeInfo := edgeIndex.Get(chanID[:])
if nodeInfo == nil {
return ErrEdgeNotFound
return false, ErrEdgeNotFound
}
// Depending on the flags value passed above, either the first
// or second edge policy is being updated.
var fromNode, toNode []byte
var isUpdate1 bool
if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
fromNode = nodeInfo[:33]
toNode = nodeInfo[33:66]
isUpdate1 = true
} else {
fromNode = nodeInfo[33:66]
toNode = nodeInfo[:33]
isUpdate1 = false
}
// Finally, with the direction of the edge being updated
// identified, we update the on-disk edge representation.
return putChanEdgePolicy(edges, nodes, edge, fromNode, toNode)
err = putChanEdgePolicy(edges, nodes, edge, fromNode, toNode)
if err != nil {
return false, err
}
return isUpdate1, nil
}
// LightningNode represents an individual vertex/node within the channel graph.
@ -2885,7 +3048,10 @@ func (c *ChannelGraph) NewChannelEdgePolicy() *ChannelEdgePolicy {
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1,
pubKey2 [33]byte) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.db.Update(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
@ -2896,6 +3062,14 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1,
}
return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
})
if err != nil {
return err
}
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)
return nil
}
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
@ -2916,7 +3090,10 @@ func markEdgeZombie(zombieIndex *bbolt.Bucket, chanID uint64, pubKey1,
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.db.Update(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
@ -2930,6 +3107,14 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
byteOrder.PutUint64(k[:], chanID)
return zombieIndex.Delete(k[:])
})
if err != nil {
return err
}
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)
return nil
}
// IsZombieEdge returns whether the edge is considered a zombie. If it is a

@ -242,9 +242,7 @@ func (d *DB) AddInvoice(newInvoice *Invoice, paymentHash lntypes.Hash) (
}
var invoiceAddIndex uint64
err := d.Batch(func(tx *bbolt.Tx) error {
invoiceAddIndex = 0
err := d.Update(func(tx *bbolt.Tx) error {
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
if err != nil {
return err
@ -637,9 +635,7 @@ func (d *DB) AcceptOrSettleInvoice(paymentHash [32]byte,
amtPaid lnwire.MilliSatoshi) (*Invoice, error) {
var settledInvoice *Invoice
err := d.Batch(func(tx *bbolt.Tx) error {
settledInvoice = nil
err := d.Update(func(tx *bbolt.Tx) error {
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
if err != nil {
return err
@ -718,9 +714,7 @@ func (d *DB) SettleHoldInvoice(preimage lntypes.Preimage) (*Invoice, error) {
// payment hash.
func (d *DB) CancelInvoice(paymentHash lntypes.Hash) (*Invoice, error) {
var canceledInvoice *Invoice
err := d.Batch(func(tx *bbolt.Tx) error {
canceledInvoice = nil
err := d.Update(func(tx *bbolt.Tx) error {
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
if err != nil {
return err

@ -564,7 +564,7 @@ func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error {
return err
}
err = updateEdgePolicy(tx, edgePolicy)
_, err = updateEdgePolicy(tx, edgePolicy)
if err != nil {
return err
}

channeldb/options.go (new file, 49 lines)

@ -0,0 +1,49 @@
package channeldb
const (
// DefaultRejectCacheSize is the default number of rejectCacheEntries to
// cache for use in the rejection cache of incoming gossip traffic. This
// produces a cache size of around 1MB.
DefaultRejectCacheSize = 50000
// DefaultChannelCacheSize is the default number of ChannelEdges cached
// in order to reply to gossip queries. This produces a cache size of
// around 40MB.
DefaultChannelCacheSize = 20000
)
// Options holds parameters for tuning and customizing a channeldb.DB.
type Options struct {
// RejectCacheSize is the maximum number of rejectCacheEntries to hold
// in the rejection cache.
RejectCacheSize int
// ChannelCacheSize is the maximum number of ChannelEdges to hold in the
// channel cache.
ChannelCacheSize int
}
// DefaultOptions returns an Options populated with default values.
func DefaultOptions() Options {
return Options{
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
}
}
// OptionModifier is a function signature for modifying the default Options.
type OptionModifier func(*Options)
// OptionSetRejectCacheSize sets the RejectCacheSize to n.
func OptionSetRejectCacheSize(n int) OptionModifier {
return func(o *Options) {
o.RejectCacheSize = n
}
}
// OptionSetChannelCacheSize sets the ChannelCacheSize to n.
func OptionSetChannelCacheSize(n int) OptionModifier {
return func(o *Options) {
o.ChannelCacheSize = n
}
}
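These modifiers compose at Open time, with any unset option keeping the value from DefaultOptions(); the lnd.go hunk at the bottom of this diff does exactly this. A short usage sketch (the database path is hypothetical):

package main

import (
	"log"

	"github.com/lightningnetwork/lnd/channeldb"
)

func main() {
	// Open channeldb with both cache sizes overridden; any option not
	// supplied falls back to DefaultOptions().
	db, err := channeldb.Open(
		"/tmp/lnd-graph", // hypothetical path
		channeldb.OptionSetRejectCacheSize(10000),
		channeldb.OptionSetChannelCacheSize(5000),
	)
	if err != nil {
		log.Fatalf("unable to open channeldb: %v", err)
	}
	defer db.Close()
}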

channeldb/reject_cache.go (new file, 95 lines)

@ -0,0 +1,95 @@
package channeldb
// rejectFlags is a compact representation of various metadata stored by the
// reject cache about a particular channel.
type rejectFlags uint8
const (
// rejectFlagExists is a flag indicating whether the channel exists,
// i.e. the channel is open and has a recent channel update. If this
// flag is not set, the channel is either a zombie or unknown.
rejectFlagExists rejectFlags = 1 << iota
// rejectFlagZombie is a flag indicating whether the channel is a
// zombie, i.e. the channel is open but has no recent channel updates.
rejectFlagZombie
)
// packRejectFlags computes the rejectFlags corresponding to the passed boolean
// values indicating whether the edge exists or is a zombie.
func packRejectFlags(exists, isZombie bool) rejectFlags {
var flags rejectFlags
if exists {
flags |= rejectFlagExists
}
if isZombie {
flags |= rejectFlagZombie
}
return flags
}
// unpack returns the booleans packed into the rejectFlags. The first indicates
// if the edge exists in our graph, the second indicates if the edge is a
// zombie.
func (f rejectFlags) unpack() (bool, bool) {
return f&rejectFlagExists == rejectFlagExists,
f&rejectFlagZombie == rejectFlagZombie
}
// rejectCacheEntry caches frequently accessed information about a channel,
// including the timestamps of its latest edge policies and whether or not the
// channel exists in the graph.
type rejectCacheEntry struct {
upd1Time int64
upd2Time int64
flags rejectFlags
}
// rejectCache is an in-memory cache used to improve the performance of
// HasChannelEdge. It caches information about whether or not the channel
// exists, as well as the most recent timestamps for each policy (if they
// exist).
type rejectCache struct {
n int
edges map[uint64]rejectCacheEntry
}
// newRejectCache creates a new rejectCache with a maximum capacity of n entries.
func newRejectCache(n int) *rejectCache {
return &rejectCache{
n: n,
edges: make(map[uint64]rejectCacheEntry, n),
}
}
// get returns the entry from the cache for chanid, if it exists.
func (c *rejectCache) get(chanid uint64) (rejectCacheEntry, bool) {
entry, ok := c.edges[chanid]
return entry, ok
}
// insert adds the entry to the reject cache. If an entry for chanid already
// exists, it will be replaced with the new entry. If the entry doesn't exist,
// it will be inserted into the cache, performing a random eviction if the cache
// is at capacity.
func (c *rejectCache) insert(chanid uint64, entry rejectCacheEntry) {
// If entry exists, replace it.
if _, ok := c.edges[chanid]; ok {
c.edges[chanid] = entry
return
}
// Otherwise, evict an entry at random and insert.
if len(c.edges) == c.n {
for id := range c.edges {
delete(c.edges, id)
break
}
}
c.edges[chanid] = entry
}
// remove deletes an entry for chanid from the cache, if it exists.
func (c *rejectCache) remove(chanid uint64) {
delete(c.edges, chanid)
}
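packRejectFlags and unpack collapse the two booleans into a single byte, which is what keeps a rejectCacheEntry near 25 bytes: two int64 timestamps plus one flag byte. A standalone round-trip sketch of the same bit packing (mirroring, not importing, the unexported types):

package main

import "fmt"

type flags uint8

const (
	flagExists flags = 1 << iota // bit 0: channel exists in the graph
	flagZombie                   // bit 1: channel is a zombie
)

// pack folds the two booleans into a single byte.
func pack(exists, zombie bool) flags {
	var f flags
	if exists {
		f |= flagExists
	}
	if zombie {
		f |= flagZombie
	}
	return f
}

// unpack recovers the booleans from the packed byte.
func unpack(f flags) (exists, zombie bool) {
	return f&flagExists != 0, f&flagZombie != 0
}

func main() {
	f := pack(true, false)
	exists, zombie := unpack(f)
	fmt.Println(exists, zombie) // true false
}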

@ -0,0 +1,107 @@
package channeldb
import (
"reflect"
"testing"
)
// TestRejectCache checks the behavior of the rejectCache with respect to insertion,
// eviction, and removal of cache entries.
func TestRejectCache(t *testing.T) {
const cacheSize = 100
// Create a new reject cache with the configured max size.
c := newRejectCache(cacheSize)
// As a sanity check, assert that querying the empty cache does not
// return an entry.
_, ok := c.get(0)
if ok {
t.Fatalf("reject cache should be empty")
}
// Now, fill up the cache entirely.
for i := uint64(0); i < cacheSize; i++ {
c.insert(i, entryForInt(i))
}
// Assert that the cache has all of the entries just inserted, since no
// eviction should occur until we try to surpass the max size.
assertHasEntries(t, c, 0, cacheSize)
// Now, insert a new element that causes the cache to evict an element.
c.insert(cacheSize, entryForInt(cacheSize))
// Assert that the cache has this last entry, as the cache should evict
// some prior element and not the newly inserted one.
assertHasEntries(t, c, cacheSize, cacheSize+1)
// Iterate over all inserted elements and construct a set of the evicted
// elements.
evicted := make(map[uint64]struct{})
for i := uint64(0); i < cacheSize+1; i++ {
_, ok := c.get(i)
if !ok {
evicted[i] = struct{}{}
}
}
// Assert that exactly one element has been evicted.
numEvicted := len(evicted)
if numEvicted != 1 {
t.Fatalf("expected one evicted entry, got: %d", numEvicted)
}
// Remove the highest item which initially caused the eviction and
// reinsert the element that was evicted prior.
c.remove(cacheSize)
for i := range evicted {
c.insert(i, entryForInt(i))
}
// Since the removal created an extra slot, the last insertion should
// not have caused an eviction and the entries for all channels in the
// original set that filled the cache should be present.
assertHasEntries(t, c, 0, cacheSize)
// Finally, reinsert the existing set back into the cache and test that
// the cache still has all the entries. If the randomized eviction were
// happening on inserts for existing cache items, we expect this to fail
// with high probability.
for i := uint64(0); i < cacheSize; i++ {
c.insert(i, entryForInt(i))
}
assertHasEntries(t, c, 0, cacheSize)
}
// assertHasEntries queries the reject cache for all channels in the range [start,
// end), asserting that they exist and their value matches the entry produced by
// entryForInt.
func assertHasEntries(t *testing.T, c *rejectCache, start, end uint64) {
t.Helper()
for i := start; i < end; i++ {
entry, ok := c.get(i)
if !ok {
t.Fatalf("reject cache should contain chan %d", i)
}
expEntry := entryForInt(i)
if !reflect.DeepEqual(entry, expEntry) {
t.Fatalf("entry mismatch, want: %v, got: %v",
expEntry, entry)
}
}
}
// entryForInt generates a unique rejectCacheEntry given an integer.
func entryForInt(i uint64) rejectCacheEntry {
exists := i%2 == 0
isZombie := i%3 == 0
return rejectCacheEntry{
upd1Time: int64(2 * i),
upd2Time: int64(2*i + 1),
flags: packRejectFlags(exists, isZombie),
}
}

@ -23,6 +23,7 @@ import (
flags "github.com/jessevdk/go-flags"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc/signrpc"
@ -256,6 +257,8 @@ type config struct {
Routing *routing.Conf `group:"routing" namespace:"routing"`
Workers *lncfg.Workers `group:"workers" namespace:"workers"`
Caches *lncfg.Caches `group:"caches" namespace:"caches"`
}
// loadConfig initializes and parses the config using a config file and command
@ -343,6 +346,10 @@ func loadConfig() (*config, error) {
Write: lncfg.DefaultWriteWorkers,
Sig: lncfg.DefaultSigWorkers,
},
Caches: &lncfg.Caches{
RejectCacheSize: channeldb.DefaultRejectCacheSize,
ChannelCacheSize: channeldb.DefaultChannelCacheSize,
},
}
// Pre-parse the command line options to pick up an alternative config
@ -985,9 +992,12 @@ func loadConfig() (*config, error) {
"minbackoff")
}
// Assert that all worker pools will have a positive number of
// workers, otherwise the pools will be rendered useless.
if err := cfg.Workers.Validate(); err != nil {
// Validate the subconfigs for workers and caches.
err = lncfg.Validate(
cfg.Workers,
cfg.Caches,
)
if err != nil {
return nil, err
}

lncfg/caches.go (new file, 45 lines)

@ -0,0 +1,45 @@
package lncfg
import "fmt"
const (
// MinRejectCacheSize is a floor on the maximum capacity allowed for
// channeldb's reject cache. This amounts to roughly 125 KB when full.
MinRejectCacheSize = 5000
// MinChannelCacheSize is a floor on the maximum capacity allowed for
// channeldb's channel cache. This amounts to roughly 2 MB when full.
MinChannelCacheSize = 1000
)
// Caches holds the configuration for various caches within lnd.
type Caches struct {
// RejectCacheSize is the maximum number of entries stored in lnd's
// reject cache, which is used for efficiently rejecting gossip updates.
// Memory usage is roughly 25 bytes per entry.
RejectCacheSize int `long:"reject-cache-size" description:"Maximum number of entries contained in the reject cache, which is used to speed up filtering of new channel announcements and channel updates from peers. Each entry requires 25 bytes."`
// ChannelCacheSize is the maximum number of entries stored in lnd's
// channel cache, which is used to reduce memory allocations when replying
// to peers querying for gossip traffic. Memory usage is roughly 2 KB per
// entry.
ChannelCacheSize int `long:"channel-cache-size" description:"Maximum number of entries contained in the channel cache, which is used to reduce memory allocations from gossip queries from peers. Each entry requires roughly 2 KB."`
}
// Validate checks the Caches configuration for values that are too small to be
// sane.
func (c *Caches) Validate() error {
if c.RejectCacheSize < MinRejectCacheSize {
return fmt.Errorf("reject cache size %d is less than min: %d",
c.RejectCacheSize, MinRejectCacheSize)
}
if c.ChannelCacheSize < MinChannelCacheSize {
return fmt.Errorf("channel cache size %d is less than min: %d",
c.ChannelCacheSize, MinChannelCacheSize)
}
return nil
}
// Compile-time constraint to ensure Caches implements the Validator interface.
var _ Validator = (*Caches)(nil)

lncfg/interface.go (new file, 21 lines)

@ -0,0 +1,21 @@
package lncfg
// Validator is a generic interface for validating sub configurations.
type Validator interface {
// Validate returns an error if a particular configuration is invalid or
// insane.
Validate() error
}
// Validate accepts a variadic list of Validators and checks that each one
// passes its Validate method. An error is returned from the first Validator
// that fails.
func Validate(validators ...Validator) error {
for _, validator := range validators {
if err := validator.Validate(); err != nil {
return err
}
}
return nil
}
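Any new subconfig can opt into this validation by implementing the single Validate method. A self-contained sketch with a hypothetical Timeouts subconfig (not part of lnd):

package main

import "fmt"

// Validator mirrors lncfg.Validator for this standalone sketch.
type Validator interface {
	Validate() error
}

// Timeouts is a hypothetical subconfig; it is not part of lnd.
type Timeouts struct {
	ConnectSeconds int
}

func (t *Timeouts) Validate() error {
	if t.ConnectSeconds <= 0 {
		return fmt.Errorf("connect timeout must be positive, got %d",
			t.ConnectSeconds)
	}
	return nil
}

// Compile-time constraint, in the same style as Workers and Caches.
var _ Validator = (*Timeouts)(nil)

func main() {
	cfgs := []Validator{&Timeouts{ConnectSeconds: 30}}
	for _, v := range cfgs {
		if err := v.Validate(); err != nil {
			fmt.Println("invalid config:", err)
		}
	}
}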

@ -47,3 +47,6 @@ func (w *Workers) Validate() error {
return nil
}
// Compile-time constraint to ensure Workers implements the Validator interface.
var _ Validator = (*Workers)(nil)

lnd.go (6 lines changed)

@ -161,7 +161,11 @@ func lndMain() error {
// Open the channeldb, which is dedicated to storing channel- and
// network-related metadata.
chanDB, err := channeldb.Open(graphDir)
chanDB, err := channeldb.Open(
graphDir,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
)
if err != nil {
ltndLog.Errorf("unable to open channeldb: %v", err)
return err