diff --git a/channeldb/channel.go b/channeldb/channel.go index 7603132e..c3318461 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -1027,7 +1027,7 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { return ErrNoRestoredChannelMutation } - err := c.Db.Batch(func(tx *bbolt.Tx) error { + err := c.Db.Update(func(tx *bbolt.Tx) error { chanBucket, err := fetchChanBucket( tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, ) @@ -1465,7 +1465,7 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { return ErrNoRestoredChannelMutation } - return c.Db.Batch(func(tx *bbolt.Tx) error { + return c.Db.Update(func(tx *bbolt.Tx) error { // First, we'll grab the writable bucket where this channel's // data resides. chanBucket, err := fetchChanBucket( @@ -1608,9 +1608,7 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error { var newRemoteCommit *ChannelCommitment - err := c.Db.Batch(func(tx *bbolt.Tx) error { - newRemoteCommit = nil - + err := c.Db.Update(func(tx *bbolt.Tx) error { chanBucket, err := fetchChanBucket( tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, ) @@ -1748,7 +1746,7 @@ func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error { c.Lock() defer c.Unlock() - return c.Db.Batch(func(tx *bbolt.Tx) error { + return c.Db.Update(func(tx *bbolt.Tx) error { return c.Packager.AckAddHtlcs(tx, addRefs...) }) } @@ -1761,7 +1759,7 @@ func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error { c.Lock() defer c.Unlock() - return c.Db.Batch(func(tx *bbolt.Tx) error { + return c.Db.Update(func(tx *bbolt.Tx) error { return c.Packager.AckSettleFails(tx, settleFailRefs...) }) } @@ -1772,7 +1770,7 @@ func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error { c.Lock() defer c.Unlock() - return c.Db.Batch(func(tx *bbolt.Tx) error { + return c.Db.Update(func(tx *bbolt.Tx) error { return c.Packager.SetFwdFilter(tx, height, fwdFilter) }) } @@ -1785,15 +1783,14 @@ func (c *OpenChannel) RemoveFwdPkg(height uint64) error { c.Lock() defer c.Unlock() - return c.Db.Batch(func(tx *bbolt.Tx) error { + return c.Db.Update(func(tx *bbolt.Tx) error { return c.Packager.RemovePkg(tx, height) }) } // RevocationLogTail returns the "tail", or the end of the current revocation // log. This entry represents the last previous state for the remote node's -// commitment chain. The ChannelDelta returned by this method will always lag -// one state behind the most current (unrevoked) state of the remote node's +// commitment chain. The ChannelDelta returned by this method will always lag one state behind the most current (unrevoked) state of the remote node's // commitment chain. func (c *OpenChannel) RevocationLogTail() (*ChannelCommitment, error) { c.RLock() diff --git a/channeldb/channel_cache.go b/channeldb/channel_cache.go new file mode 100644 index 00000000..2f26c185 --- /dev/null +++ b/channeldb/channel_cache.go @@ -0,0 +1,50 @@ +package channeldb + +// channelCache is an in-memory cache used to improve the performance of +// ChanUpdatesInHorizon. It caches the chan info and edge policies for a +// particular channel. +type channelCache struct { + n int + channels map[uint64]ChannelEdge +} + +// newChannelCache creates a new channelCache with maximum capacity of n +// channels. +func newChannelCache(n int) *channelCache { + return &channelCache{ + n: n, + channels: make(map[uint64]ChannelEdge), + } +} + +// get returns the channel from the cache, if it exists. +func (c *channelCache) get(chanid uint64) (ChannelEdge, bool) { + channel, ok := c.channels[chanid] + return channel, ok +} + +// insert adds the entry to the channel cache. If an entry for chanid already +// exists, it will be replaced with the new entry. If the entry doesn't exist, +// it will be inserted to the cache, performing a random eviction if the cache +// is at capacity. +func (c *channelCache) insert(chanid uint64, channel ChannelEdge) { + // If entry exists, replace it. + if _, ok := c.channels[chanid]; ok { + c.channels[chanid] = channel + return + } + + // Otherwise, evict an entry at random and insert. + if len(c.channels) == c.n { + for id := range c.channels { + delete(c.channels, id) + break + } + } + c.channels[chanid] = channel +} + +// remove deletes an edge for chanid from the cache, if it exists. +func (c *channelCache) remove(chanid uint64) { + delete(c.channels, chanid) +} diff --git a/channeldb/channel_cache_test.go b/channeldb/channel_cache_test.go new file mode 100644 index 00000000..d776c131 --- /dev/null +++ b/channeldb/channel_cache_test.go @@ -0,0 +1,105 @@ +package channeldb + +import ( + "reflect" + "testing" +) + +// TestChannelCache checks the behavior of the channelCache with respect to +// insertion, eviction, and removal of cache entries. +func TestChannelCache(t *testing.T) { + const cacheSize = 100 + + // Create a new channel cache with the configured max size. + c := newChannelCache(cacheSize) + + // As a sanity check, assert that querying the empty cache does not + // return an entry. + _, ok := c.get(0) + if ok { + t.Fatalf("channel cache should be empty") + } + + // Now, fill up the cache entirely. + for i := uint64(0); i < cacheSize; i++ { + c.insert(i, channelForInt(i)) + } + + // Assert that the cache has all of the entries just inserted, since no + // eviction should occur until we try to surpass the max size. + assertHasChanEntries(t, c, 0, cacheSize) + + // Now, insert a new element that causes the cache to evict an element. + c.insert(cacheSize, channelForInt(cacheSize)) + + // Assert that the cache has this last entry, as the cache should evict + // some prior element and not the newly inserted one. + assertHasChanEntries(t, c, cacheSize, cacheSize) + + // Iterate over all inserted elements and construct a set of the evicted + // elements. + evicted := make(map[uint64]struct{}) + for i := uint64(0); i < cacheSize+1; i++ { + _, ok := c.get(i) + if !ok { + evicted[i] = struct{}{} + } + } + + // Assert that exactly one element has been evicted. + numEvicted := len(evicted) + if numEvicted != 1 { + t.Fatalf("expected one evicted entry, got: %d", numEvicted) + } + + // Remove the highest item which initially caused the eviction and + // reinsert the element that was evicted prior. + c.remove(cacheSize) + for i := range evicted { + c.insert(i, channelForInt(i)) + } + + // Since the removal created an extra slot, the last insertion should + // not have caused an eviction and the entries for all channels in the + // original set that filled the cache should be present. + assertHasChanEntries(t, c, 0, cacheSize) + + // Finally, reinsert the existing set back into the cache and test that + // the cache still has all the entries. If the randomized eviction were + // happening on inserts for existing cache items, we expect this to fail + // with high probability. + for i := uint64(0); i < cacheSize; i++ { + c.insert(i, channelForInt(i)) + } + assertHasChanEntries(t, c, 0, cacheSize) + +} + +// assertHasEntries queries the edge cache for all channels in the range [start, +// end), asserting that they exist and their value matches the entry produced by +// entryForInt. +func assertHasChanEntries(t *testing.T, c *channelCache, start, end uint64) { + t.Helper() + + for i := start; i < end; i++ { + entry, ok := c.get(i) + if !ok { + t.Fatalf("channel cache should contain chan %d", i) + } + + expEntry := channelForInt(i) + if !reflect.DeepEqual(entry, expEntry) { + t.Fatalf("entry mismatch, want: %v, got: %v", + expEntry, entry) + } + } +} + +// channelForInt generates a unique ChannelEdge given an integer. +func channelForInt(i uint64) ChannelEdge { + return ChannelEdge{ + Info: &ChannelEdgeInfo{ + ChannelID: i, + }, + } +} diff --git a/channeldb/db.go b/channeldb/db.go index d640cbdc..aecb75e4 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -7,7 +7,6 @@ import ( "net" "os" "path/filepath" - "sync" "time" "github.com/btcsuite/btcd/btcec" @@ -104,21 +103,18 @@ var ( byteOrder = binary.BigEndian ) -var bufPool = &sync.Pool{ - New: func() interface{} { return new(bytes.Buffer) }, -} - // DB is the primary datastore for the lnd daemon. The database stores // information related to nodes, routing data, open/closed channels, fee // schedules, and reputation data. type DB struct { *bbolt.DB dbPath string + graph *ChannelGraph } // Open opens an existing channeldb. Any necessary schemas migrations due to // updates will take place as necessary. -func Open(dbPath string) (*DB, error) { +func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) { path := filepath.Join(dbPath, dbName) if !fileExists(path) { @@ -127,6 +123,11 @@ func Open(dbPath string) (*DB, error) { } } + opts := DefaultOptions() + for _, modifier := range modifiers { + modifier(&opts) + } + bdb, err := bbolt.Open(path, dbFilePermission, nil) if err != nil { return nil, err @@ -136,6 +137,9 @@ func Open(dbPath string) (*DB, error) { DB: bdb, dbPath: dbPath, } + chanDB.graph = newChannelGraph( + chanDB, opts.RejectCacheSize, opts.ChannelCacheSize, + ) // Synchronize the version of database and apply migrations if needed. if err := chanDB.syncVersions(dbVersions); err != nil { @@ -900,9 +904,14 @@ type ChannelShell struct { // well. This method is idempotent, so repeated calls with the same set of // channel shells won't modify the database after the initial call. func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error { - chanGraph := ChannelGraph{d} + chanGraph := d.ChannelGraph() - return d.Update(func(tx *bbolt.Tx) error { + // TODO(conner): find way to do this w/o accessing internal members? + chanGraph.cacheMu.Lock() + defer chanGraph.cacheMu.Unlock() + + var chansRestored []uint64 + err := d.Update(func(tx *bbolt.Tx) error { for _, channelShell := range channelShells { channel := channelShell.Chan @@ -983,14 +992,26 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error { chanEdge.ChannelFlags |= lnwire.ChanUpdateDirection } - err = updateEdgePolicy(tx, &chanEdge) + _, err = updateEdgePolicy(tx, &chanEdge) if err != nil { return err } + + chansRestored = append(chansRestored, edgeInfo.ChannelID) } return nil }) + if err != nil { + return err + } + + for _, chanid := range chansRestored { + chanGraph.rejectCache.remove(chanid) + chanGraph.chanCache.remove(chanid) + } + + return nil } // AddrsForNode consults the graph and channel database for all addresses known @@ -1112,7 +1133,7 @@ func (d *DB) syncVersions(versions []version) error { // ChannelGraph returns a new instance of the directed channel graph. func (d *DB) ChannelGraph() *ChannelGraph { - return &ChannelGraph{d} + return d.graph } func getLatestDBVersion(versions []version) uint32 { diff --git a/channeldb/graph.go b/channeldb/graph.go index 11a875fc..25c59f5f 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -10,6 +10,7 @@ import ( "io" "math" "net" + "sync" "time" "github.com/btcsuite/btcd/btcec" @@ -157,9 +158,19 @@ const ( type ChannelGraph struct { db *DB - // TODO(roasbeef): store and update bloom filter to reduce disk access - // due to current gossip model - // * LRU cache for edges? + cacheMu sync.RWMutex + rejectCache *rejectCache + chanCache *channelCache +} + +// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The +// returned instance has its own unique reject cache and channel cache. +func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph { + return &ChannelGraph{ + db: db, + rejectCache: newRejectCache(rejectCacheSize), + chanCache: newChannelCache(chanCacheSize), + } } // Database returns a pointer to the underlying database. @@ -334,7 +345,7 @@ func (c *ChannelGraph) sourceNode(nodes *bbolt.Bucket) (*LightningNode, error) { func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { nodePubBytes := node.PubKeyBytes[:] - return c.db.Batch(func(tx *bbolt.Tx) error { + return c.db.Update(func(tx *bbolt.Tx) error { // First grab the nodes bucket which stores the mapping from // pubKey to node information. nodes, err := tx.CreateBucketIfNotExists(nodeBucket) @@ -363,7 +374,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { // // TODO(roasbeef): also need sig of announcement func (c *ChannelGraph) AddLightningNode(node *LightningNode) error { - return c.db.Batch(func(tx *bbolt.Tx) error { + return c.db.Update(func(tx *bbolt.Tx) error { return addLightningNode(tx, node) }) } @@ -427,7 +438,7 @@ func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) { // from the database according to the node's public key. func (c *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error { // TODO(roasbeef): ensure dangling edges are removed... - return c.db.Batch(func(tx *bbolt.Tx) error { + return c.db.Update(func(tx *bbolt.Tx) error { nodes := tx.Bucket(nodeBucket) if nodes == nil { return ErrGraphNodeNotFound @@ -491,9 +502,20 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket, // the channel supports. The chanPoint and chanID are used to uniquely identify // the edge globally within the database. func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { - return c.db.Batch(func(tx *bbolt.Tx) error { + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + err := c.db.Update(func(tx *bbolt.Tx) error { return c.addChannelEdge(tx, edge) }) + if err != nil { + return err + } + + c.rejectCache.remove(edge.ChannelID) + c.chanCache.remove(edge.ChannelID) + + return nil } // addChannelEdge is the private form of AddChannelEdge that allows callers to @@ -598,17 +620,42 @@ func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error // was updated for both directed edges are returned along with the boolean. If // it is not found, then the zombie index is checked and its result is returned // as the second boolean. -func (c *ChannelGraph) HasChannelEdge(chanID uint64, -) (time.Time, time.Time, bool, bool, error) { +func (c *ChannelGraph) HasChannelEdge( + chanID uint64) (time.Time, time.Time, bool, bool, error) { var ( - node1UpdateTime time.Time - node2UpdateTime time.Time - exists bool - isZombie bool + upd1Time time.Time + upd2Time time.Time + exists bool + isZombie bool ) - err := c.db.View(func(tx *bbolt.Tx) error { + // We'll query the cache with the shared lock held to allow multiple + // readers to access values in the cache concurrently if they exist. + c.cacheMu.RLock() + if entry, ok := c.rejectCache.get(chanID); ok { + c.cacheMu.RUnlock() + upd1Time = time.Unix(entry.upd1Time, 0) + upd2Time = time.Unix(entry.upd2Time, 0) + exists, isZombie = entry.flags.unpack() + return upd1Time, upd2Time, exists, isZombie, nil + } + c.cacheMu.RUnlock() + + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + // The item was not found with the shared lock, so we'll acquire the + // exclusive lock and check the cache again in case another method added + // the entry to the cache while no lock was held. + if entry, ok := c.rejectCache.get(chanID); ok { + upd1Time = time.Unix(entry.upd1Time, 0) + upd2Time = time.Unix(entry.upd2Time, 0) + exists, isZombie = entry.flags.unpack() + return upd1Time, upd2Time, exists, isZombie, nil + } + + if err := c.db.View(func(tx *bbolt.Tx) error { edges := tx.Bucket(edgeBucket) if edges == nil { return ErrGraphNoEdgesFound @@ -627,7 +674,9 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64, exists = false zombieIndex := edges.Bucket(zombieBucket) if zombieIndex != nil { - isZombie, _, _ = isZombieEdge(zombieIndex, chanID) + isZombie, _, _ = isZombieEdge( + zombieIndex, chanID, + ) } return nil @@ -653,16 +702,24 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64, // As we may have only one of the edges populated, only set the // update time if the edge was found in the database. if e1 != nil { - node1UpdateTime = e1.LastUpdate + upd1Time = e1.LastUpdate } if e2 != nil { - node2UpdateTime = e2.LastUpdate + upd2Time = e2.LastUpdate } return nil + }); err != nil { + return time.Time{}, time.Time{}, exists, isZombie, err + } + + c.rejectCache.insert(chanID, rejectCacheEntry{ + upd1Time: upd1Time.Unix(), + upd2Time: upd2Time.Unix(), + flags: packRejectFlags(exists, isZombie), }) - return node1UpdateTime, node2UpdateTime, exists, isZombie, err + return upd1Time, upd2Time, exists, isZombie, nil } // UpdateChannelEdge retrieves and update edge of the graph database. Method @@ -675,7 +732,7 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error { var chanKey [8]byte binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID) - return c.db.Batch(func(tx *bbolt.Tx) error { + return c.db.Update(func(tx *bbolt.Tx) error { edges := tx.Bucket(edgeBucket) if edge == nil { return ErrEdgeNotFound @@ -713,11 +770,12 @@ const ( func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, blockHash *chainhash.Hash, blockHeight uint32) ([]*ChannelEdgeInfo, error) { + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + var chansClosed []*ChannelEdgeInfo - err := c.db.Batch(func(tx *bbolt.Tx) error { - chansClosed = nil - + err := c.db.Update(func(tx *bbolt.Tx) error { // First grab the edges bucket which houses the information // we'd like to delete edges, err := tx.CreateBucketIfNotExists(edgeBucket) @@ -818,6 +876,11 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, return nil, err } + for _, channel := range chansClosed { + c.rejectCache.remove(channel.ChannelID) + c.chanCache.remove(channel.ChannelID) + } + return chansClosed, nil } @@ -826,7 +889,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, // that we only maintain a graph of reachable nodes. In the event that a pruned // node gains more channels, it will be re-added back to the graph. func (c *ChannelGraph) PruneGraphNodes() error { - return c.db.Batch(func(tx *bbolt.Tx) error { + return c.db.Update(func(tx *bbolt.Tx) error { nodes := tx.Bucket(nodeBucket) if nodes == nil { return ErrGraphNodesNotFound @@ -968,12 +1031,13 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf var chanIDEnd [8]byte byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64()) + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + // Keep track of the channels that are removed from the graph. var removedChans []*ChannelEdgeInfo - if err := c.db.Batch(func(tx *bbolt.Tx) error { - removedChans = nil - + if err := c.db.Update(func(tx *bbolt.Tx) error { edges, err := tx.CreateBucketIfNotExists(edgeBucket) if err != nil { return err @@ -1048,6 +1112,11 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf return nil, err } + for _, channel := range removedChans { + c.rejectCache.remove(channel.ChannelID) + c.chanCache.remove(channel.ChannelID) + } + return removedChans, nil } @@ -1103,7 +1172,17 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { // channels // TODO(roasbeef): don't delete both edges? - return c.db.Batch(func(tx *bbolt.Tx) error { + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + var chanID uint64 + err := c.db.Update(func(tx *bbolt.Tx) error { + var err error + chanID, err = getChanID(tx, chanPoint) + if err != nil { + return err + } + // First grab the edges bucket which houses the information // we'd like to delete edges := tx.Bucket(edgeBucket) @@ -1135,6 +1214,14 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { chanPoint, true, ) }) + if err != nil { + return err + } + + c.rejectCache.remove(chanID) + c.chanCache.remove(chanID) + + return nil } // ChannelID attempt to lookup the 8-byte compact channel ID which maps to the @@ -1142,33 +1229,39 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { // the database, then ErrEdgeNotFound is returned. func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) { var chanID uint64 + if err := c.db.View(func(tx *bbolt.Tx) error { + var err error + chanID, err = getChanID(tx, chanPoint) + return err + }); err != nil { + return 0, err + } + return chanID, nil +} + +// getChanID returns the assigned channel ID for a given channel point. +func getChanID(tx *bbolt.Tx, chanPoint *wire.OutPoint) (uint64, error) { var b bytes.Buffer if err := writeOutpoint(&b, chanPoint); err != nil { return 0, nil } - if err := c.db.View(func(tx *bbolt.Tx) error { - edges := tx.Bucket(edgeBucket) - if edges == nil { - return ErrGraphNoEdgesFound - } - chanIndex := edges.Bucket(channelPointBucket) - if chanIndex == nil { - return ErrGraphNoEdgesFound - } - - chanIDBytes := chanIndex.Get(b.Bytes()) - if chanIDBytes == nil { - return ErrEdgeNotFound - } - - chanID = byteOrder.Uint64(chanIDBytes) - - return nil - }); err != nil { - return 0, err + edges := tx.Bucket(edgeBucket) + if edges == nil { + return 0, ErrGraphNoEdgesFound } + chanIndex := edges.Bucket(channelPointBucket) + if chanIndex == nil { + return 0, ErrGraphNoEdgesFound + } + + chanIDBytes := chanIndex.Get(b.Bytes()) + if chanIDBytes == nil { + return 0, ErrEdgeNotFound + } + + chanID := byteOrder.Uint64(chanIDBytes) return chanID, nil } @@ -1238,8 +1331,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha // additional map to keep track of the edges already seen to prevent // re-adding it. edgesSeen := make(map[uint64]struct{}) + edgesToCache := make(map[uint64]ChannelEdge) var edgesInHorizon []ChannelEdge + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + var hits int err := c.db.View(func(tx *bbolt.Tx) error { edges := tx.Bucket(edgeBucket) if edges == nil { @@ -1289,6 +1387,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha continue } + if channel, ok := c.chanCache.get(chanIDInt); ok { + hits++ + edgesSeen[chanIDInt] = struct{}{} + edgesInHorizon = append(edgesInHorizon, channel) + continue + } + // First, we'll fetch the static edge information. edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID) if err != nil { @@ -1313,11 +1418,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha // Finally, we'll collate this edge with the rest of // edges to be returned. edgesSeen[chanIDInt] = struct{}{} - edgesInHorizon = append(edgesInHorizon, ChannelEdge{ + channel := ChannelEdge{ Info: &edgeInfo, Policy1: edge1, Policy2: edge2, - }) + } + edgesInHorizon = append(edgesInHorizon, channel) + edgesToCache[chanIDInt] = channel } return nil @@ -1332,6 +1439,15 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha return nil, err } + // Insert any edges loaded from disk into the cache. + for chanid, channel := range edgesToCache { + c.chanCache.insert(chanid, channel) + } + + log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)", + float64(hits)/float64(len(edgesInHorizon)), hits, + len(edgesInHorizon)) + return edgesInHorizon, nil } @@ -1696,26 +1812,65 @@ func delChannelByEdge(edges, edgeIndex, chanIndex, zombieIndex, // determined by the lexicographical ordering of the identity public keys of // the nodes on either side of the channel. func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { - return c.db.Batch(func(tx *bbolt.Tx) error { - return updateEdgePolicy(tx, edge) + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + var isUpdate1 bool + err := c.db.Update(func(tx *bbolt.Tx) error { + var err error + isUpdate1, err = updateEdgePolicy(tx, edge) + return err }) + if err != nil { + return err + } + + // If an entry for this channel is found in reject cache, we'll modify + // the entry with the updated timestamp for the direction that was just + // written. If the edge doesn't exist, we'll load the cache entry lazily + // during the next query for this edge. + if entry, ok := c.rejectCache.get(edge.ChannelID); ok { + if isUpdate1 { + entry.upd1Time = edge.LastUpdate.Unix() + } else { + entry.upd2Time = edge.LastUpdate.Unix() + } + c.rejectCache.insert(edge.ChannelID, entry) + } + + // If an entry for this channel is found in channel cache, we'll modify + // the entry with the updated policy for the direction that was just + // written. If the edge doesn't exist, we'll defer loading the info and + // policies and lazily read from disk during the next query. + if channel, ok := c.chanCache.get(edge.ChannelID); ok { + if isUpdate1 { + channel.Policy1 = edge + } else { + channel.Policy2 = edge + } + c.chanCache.insert(edge.ChannelID, channel) + } + + return nil } // updateEdgePolicy attempts to update an edge's policy within the relevant -// buckets using an existing database transaction. -func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error { +// buckets using an existing database transaction. The returned boolean will be +// true if the updated policy belongs to node1, and false if the policy belonged +// to node2. +func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) (bool, error) { edges := tx.Bucket(edgeBucket) if edges == nil { - return ErrEdgeNotFound + return false, ErrEdgeNotFound } edgeIndex := edges.Bucket(edgeIndexBucket) if edgeIndex == nil { - return ErrEdgeNotFound + return false, ErrEdgeNotFound } nodes, err := tx.CreateBucketIfNotExists(nodeBucket) if err != nil { - return err + return false, err } // Create the channelID key be converting the channel ID @@ -1727,23 +1882,31 @@ func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error { // nodes which connect this channel edge. nodeInfo := edgeIndex.Get(chanID[:]) if nodeInfo == nil { - return ErrEdgeNotFound + return false, ErrEdgeNotFound } // Depending on the flags value passed above, either the first // or second edge policy is being updated. var fromNode, toNode []byte + var isUpdate1 bool if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 { fromNode = nodeInfo[:33] toNode = nodeInfo[33:66] + isUpdate1 = true } else { fromNode = nodeInfo[33:66] toNode = nodeInfo[:33] + isUpdate1 = false } // Finally, with the direction of the edge being updated // identified, we update the on-disk edge representation. - return putChanEdgePolicy(edges, nodes, edge, fromNode, toNode) + err = putChanEdgePolicy(edges, nodes, edge, fromNode, toNode) + if err != nil { + return false, err + } + + return isUpdate1, nil } // LightningNode represents an individual vertex/node within the channel graph. @@ -2885,7 +3048,10 @@ func (c *ChannelGraph) NewChannelEdgePolicy() *ChannelEdgePolicy { func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1, pubKey2 [33]byte) error { - return c.db.Batch(func(tx *bbolt.Tx) error { + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + err := c.db.Update(func(tx *bbolt.Tx) error { edges := tx.Bucket(edgeBucket) if edges == nil { return ErrGraphNoEdgesFound @@ -2896,6 +3062,14 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1, } return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2) }) + if err != nil { + return err + } + + c.rejectCache.remove(chanID) + c.chanCache.remove(chanID) + + return nil } // markEdgeZombie marks an edge as a zombie within our zombie index. The public @@ -2916,7 +3090,10 @@ func markEdgeZombie(zombieIndex *bbolt.Bucket, chanID uint64, pubKey1, // MarkEdgeLive clears an edge from our zombie index, deeming it as live. func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error { - return c.db.Batch(func(tx *bbolt.Tx) error { + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + err := c.db.Update(func(tx *bbolt.Tx) error { edges := tx.Bucket(edgeBucket) if edges == nil { return ErrGraphNoEdgesFound @@ -2930,6 +3107,14 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error { byteOrder.PutUint64(k[:], chanID) return zombieIndex.Delete(k[:]) }) + if err != nil { + return err + } + + c.rejectCache.remove(chanID) + c.chanCache.remove(chanID) + + return nil } // IsZombieEdge returns whether the edge is considered zombie. If it is a diff --git a/channeldb/invoices.go b/channeldb/invoices.go index 5e3c6202..47a8e199 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -242,9 +242,7 @@ func (d *DB) AddInvoice(newInvoice *Invoice, paymentHash lntypes.Hash) ( } var invoiceAddIndex uint64 - err := d.Batch(func(tx *bbolt.Tx) error { - invoiceAddIndex = 0 - + err := d.Update(func(tx *bbolt.Tx) error { invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) if err != nil { return err @@ -637,9 +635,7 @@ func (d *DB) AcceptOrSettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) (*Invoice, error) { var settledInvoice *Invoice - err := d.Batch(func(tx *bbolt.Tx) error { - settledInvoice = nil - + err := d.Update(func(tx *bbolt.Tx) error { invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) if err != nil { return err @@ -718,9 +714,7 @@ func (d *DB) SettleHoldInvoice(preimage lntypes.Preimage) (*Invoice, error) { // payment hash. func (d *DB) CancelInvoice(paymentHash lntypes.Hash) (*Invoice, error) { var canceledInvoice *Invoice - err := d.Batch(func(tx *bbolt.Tx) error { - canceledInvoice = nil - + err := d.Update(func(tx *bbolt.Tx) error { invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) if err != nil { return err diff --git a/channeldb/migrations.go b/channeldb/migrations.go index f86e416b..72ba7882 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -564,7 +564,7 @@ func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error { return err } - err = updateEdgePolicy(tx, edgePolicy) + _, err = updateEdgePolicy(tx, edgePolicy) if err != nil { return err } diff --git a/channeldb/options.go b/channeldb/options.go new file mode 100644 index 00000000..a96ebfa9 --- /dev/null +++ b/channeldb/options.go @@ -0,0 +1,49 @@ +package channeldb + +const ( + // DefaultRejectCacheSize is the default number of rejectCacheEntries to + // cache for use in the rejection cache of incoming gossip traffic. This + // produces a cache size of around 1MB. + DefaultRejectCacheSize = 50000 + + // DefaultChannelCacheSize is the default number of ChannelEdges cached + // in order to reply to gossip queries. This produces a cache size of + // around 40MB. + DefaultChannelCacheSize = 20000 +) + +// Options holds parameters for tuning and customizing a channeldb.DB. +type Options struct { + // RejectCacheSize is the maximum number of rejectCacheEntries to hold + // in the rejection cache. + RejectCacheSize int + + // ChannelCacheSize is the maximum number of ChannelEdges to hold in the + // channel cache. + ChannelCacheSize int +} + +// DefaultOptions returns an Options populated with default values. +func DefaultOptions() Options { + return Options{ + RejectCacheSize: DefaultRejectCacheSize, + ChannelCacheSize: DefaultChannelCacheSize, + } +} + +// OptionModifier is a function signature for modifying the default Options. +type OptionModifier func(*Options) + +// OptionSetRejectCacheSize sets the RejectCacheSize to n. +func OptionSetRejectCacheSize(n int) OptionModifier { + return func(o *Options) { + o.RejectCacheSize = n + } +} + +// OptionSetChannelCacheSize sets the ChannelCacheSize to n. +func OptionSetChannelCacheSize(n int) OptionModifier { + return func(o *Options) { + o.ChannelCacheSize = n + } +} diff --git a/channeldb/reject_cache.go b/channeldb/reject_cache.go new file mode 100644 index 00000000..acadb878 --- /dev/null +++ b/channeldb/reject_cache.go @@ -0,0 +1,95 @@ +package channeldb + +// rejectFlags is a compact representation of various metadata stored by the +// reject cache about a particular channel. +type rejectFlags uint8 + +const ( + // rejectFlagExists is a flag indicating whether the channel exists, + // i.e. the channel is open and has a recent channel update. If this + // flag is not set, the channel is either a zombie or unknown. + rejectFlagExists rejectFlags = 1 << iota + + // rejectFlagZombie is a flag indicating whether the channel is a + // zombie, i.e. the channel is open but has no recent channel updates. + rejectFlagZombie +) + +// packRejectFlags computes the rejectFlags corresponding to the passed boolean +// values indicating whether the edge exists or is a zombie. +func packRejectFlags(exists, isZombie bool) rejectFlags { + var flags rejectFlags + if exists { + flags |= rejectFlagExists + } + if isZombie { + flags |= rejectFlagZombie + } + + return flags +} + +// unpack returns the booleans packed into the rejectFlags. The first indicates +// if the edge exists in our graph, the second indicates if the edge is a +// zombie. +func (f rejectFlags) unpack() (bool, bool) { + return f&rejectFlagExists == rejectFlagExists, + f&rejectFlagZombie == rejectFlagZombie +} + +// rejectCacheEntry caches frequently accessed information about a channel, +// including the timestamps of its latest edge policies and whether or not the +// channel exists in the graph. +type rejectCacheEntry struct { + upd1Time int64 + upd2Time int64 + flags rejectFlags +} + +// rejectCache is an in-memory cache used to improve the performance of +// HasChannelEdge. It caches information about the whether or channel exists, as +// well as the most recent timestamps for each policy (if they exists). +type rejectCache struct { + n int + edges map[uint64]rejectCacheEntry +} + +// newRejectCache creates a new rejectCache with maximum capacity of n entries. +func newRejectCache(n int) *rejectCache { + return &rejectCache{ + n: n, + edges: make(map[uint64]rejectCacheEntry, n), + } +} + +// get returns the entry from the cache for chanid, if it exists. +func (c *rejectCache) get(chanid uint64) (rejectCacheEntry, bool) { + entry, ok := c.edges[chanid] + return entry, ok +} + +// insert adds the entry to the reject cache. If an entry for chanid already +// exists, it will be replaced with the new entry. If the entry doesn't exists, +// it will be inserted to the cache, performing a random eviction if the cache +// is at capacity. +func (c *rejectCache) insert(chanid uint64, entry rejectCacheEntry) { + // If entry exists, replace it. + if _, ok := c.edges[chanid]; ok { + c.edges[chanid] = entry + return + } + + // Otherwise, evict an entry at random and insert. + if len(c.edges) == c.n { + for id := range c.edges { + delete(c.edges, id) + break + } + } + c.edges[chanid] = entry +} + +// remove deletes an entry for chanid from the cache, if it exists. +func (c *rejectCache) remove(chanid uint64) { + delete(c.edges, chanid) +} diff --git a/channeldb/reject_cache_test.go b/channeldb/reject_cache_test.go new file mode 100644 index 00000000..6974f425 --- /dev/null +++ b/channeldb/reject_cache_test.go @@ -0,0 +1,107 @@ +package channeldb + +import ( + "reflect" + "testing" +) + +// TestRejectCache checks the behavior of the rejectCache with respect to insertion, +// eviction, and removal of cache entries. +func TestRejectCache(t *testing.T) { + const cacheSize = 100 + + // Create a new reject cache with the configured max size. + c := newRejectCache(cacheSize) + + // As a sanity check, assert that querying the empty cache does not + // return an entry. + _, ok := c.get(0) + if ok { + t.Fatalf("reject cache should be empty") + } + + // Now, fill up the cache entirely. + for i := uint64(0); i < cacheSize; i++ { + c.insert(i, entryForInt(i)) + } + + // Assert that the cache has all of the entries just inserted, since no + // eviction should occur until we try to surpass the max size. + assertHasEntries(t, c, 0, cacheSize) + + // Now, insert a new element that causes the cache to evict an element. + c.insert(cacheSize, entryForInt(cacheSize)) + + // Assert that the cache has this last entry, as the cache should evict + // some prior element and not the newly inserted one. + assertHasEntries(t, c, cacheSize, cacheSize) + + // Iterate over all inserted elements and construct a set of the evicted + // elements. + evicted := make(map[uint64]struct{}) + for i := uint64(0); i < cacheSize+1; i++ { + _, ok := c.get(i) + if !ok { + evicted[i] = struct{}{} + } + } + + // Assert that exactly one element has been evicted. + numEvicted := len(evicted) + if numEvicted != 1 { + t.Fatalf("expected one evicted entry, got: %d", numEvicted) + } + + // Remove the highest item which initially caused the eviction and + // reinsert the element that was evicted prior. + c.remove(cacheSize) + for i := range evicted { + c.insert(i, entryForInt(i)) + } + + // Since the removal created an extra slot, the last insertion should + // not have caused an eviction and the entries for all channels in the + // original set that filled the cache should be present. + assertHasEntries(t, c, 0, cacheSize) + + // Finally, reinsert the existing set back into the cache and test that + // the cache still has all the entries. If the randomized eviction were + // happening on inserts for existing cache items, we expect this to fail + // with high probability. + for i := uint64(0); i < cacheSize; i++ { + c.insert(i, entryForInt(i)) + } + assertHasEntries(t, c, 0, cacheSize) + +} + +// assertHasEntries queries the reject cache for all channels in the range [start, +// end), asserting that they exist and their value matches the entry produced by +// entryForInt. +func assertHasEntries(t *testing.T, c *rejectCache, start, end uint64) { + t.Helper() + + for i := start; i < end; i++ { + entry, ok := c.get(i) + if !ok { + t.Fatalf("reject cache should contain chan %d", i) + } + + expEntry := entryForInt(i) + if !reflect.DeepEqual(entry, expEntry) { + t.Fatalf("entry mismatch, want: %v, got: %v", + expEntry, entry) + } + } +} + +// entryForInt generates a unique rejectCacheEntry given an integer. +func entryForInt(i uint64) rejectCacheEntry { + exists := i%2 == 0 + isZombie := i%3 == 0 + return rejectCacheEntry{ + upd1Time: int64(2 * i), + upd2Time: int64(2*i + 1), + flags: packRejectFlags(exists, isZombie), + } +} diff --git a/config.go b/config.go index 2cb6da72..8589cec5 100644 --- a/config.go +++ b/config.go @@ -23,6 +23,7 @@ import ( flags "github.com/jessevdk/go-flags" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chanbackup" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc/signrpc" @@ -256,6 +257,8 @@ type config struct { Routing *routing.Conf `group:"routing" namespace:"routing"` Workers *lncfg.Workers `group:"workers" namespace:"workers"` + + Caches *lncfg.Caches `group:"caches" namespace:"caches"` } // loadConfig initializes and parses the config using a config file and command @@ -343,6 +346,10 @@ func loadConfig() (*config, error) { Write: lncfg.DefaultWriteWorkers, Sig: lncfg.DefaultSigWorkers, }, + Caches: &lncfg.Caches{ + RejectCacheSize: channeldb.DefaultRejectCacheSize, + ChannelCacheSize: channeldb.DefaultChannelCacheSize, + }, } // Pre-parse the command line options to pick up an alternative config @@ -985,9 +992,12 @@ func loadConfig() (*config, error) { "minbackoff") } - // Assert that all worker pools will have a positive number of - // workers, otherwise the pools will rendered useless. - if err := cfg.Workers.Validate(); err != nil { + // Validate the subconfigs for workers and caches. + err = lncfg.Validate( + cfg.Workers, + cfg.Caches, + ) + if err != nil { return nil, err } diff --git a/lncfg/caches.go b/lncfg/caches.go new file mode 100644 index 00000000..47b9a97e --- /dev/null +++ b/lncfg/caches.go @@ -0,0 +1,45 @@ +package lncfg + +import "fmt" + +const ( + // MinRejectCacheSize is a floor on the maximum capacity allowed for + // channeldb's reject cache. This amounts to roughly 125 KB when full. + MinRejectCacheSize = 5000 + + // MinChannelCacheSize is a floor on the maximum capacity allowed for + // channeldb's channel cache. This amounts to roughly 2 MB when full. + MinChannelCacheSize = 1000 +) + +// Caches holds the configuration for various caches within lnd. +type Caches struct { + // RejectCacheSize is the maximum number of entries stored in lnd's + // reject cache, which is used for efficiently rejecting gossip updates. + // Memory usage is roughly 25b per entry. + RejectCacheSize int `long:"reject-cache-size" description:"Maximum number of entries contained in the reject cache, which is used to speed up filtering of new channel announcements and channel updates from peers. Each entry requires 25 bytes."` + + // ChannelCacheSize is the maximum number of entries stored in lnd's + // channel cache, which is used reduce memory allocations in reply to + // peers querying for gossip traffic. Memory usage is roughly 2Kb per + // entry. + ChannelCacheSize int `long:"channel-cache-size" description:"Maximum number of entries contained in the channel cache, which is used to reduce memory allocations from gossip queries from peers. Each entry requires roughly 2Kb."` +} + +// Validate checks the Caches configuration for values that are too small to be +// sane. +func (c *Caches) Validate() error { + if c.RejectCacheSize < MinRejectCacheSize { + return fmt.Errorf("reject cache size %d is less than min: %d", + c.RejectCacheSize, MinRejectCacheSize) + } + if c.ChannelCacheSize < MinChannelCacheSize { + return fmt.Errorf("channel cache size %d is less than min: %d", + c.ChannelCacheSize, MinChannelCacheSize) + } + + return nil +} + +// Compile-time constraint to ensure Caches implements the Validator interface. +var _ Validator = (*Caches)(nil) diff --git a/lncfg/interface.go b/lncfg/interface.go new file mode 100644 index 00000000..78de294d --- /dev/null +++ b/lncfg/interface.go @@ -0,0 +1,21 @@ +package lncfg + +// Validator is a generic interface for validating sub configurations. +type Validator interface { + // Validate returns an error if a particular configuration is invalid or + // insane. + Validate() error +} + +// Validate accepts a variadic list of Validators and checks that each one +// passes its Validate method. An error is returned from the first Validator +// that fails. +func Validate(validators ...Validator) error { + for _, validator := range validators { + if err := validator.Validate(); err != nil { + return err + } + } + + return nil +} diff --git a/lncfg/workers.go b/lncfg/workers.go index 6bbacc5f..bbd9960b 100644 --- a/lncfg/workers.go +++ b/lncfg/workers.go @@ -47,3 +47,6 @@ func (w *Workers) Validate() error { return nil } + +// Compile-time constraint to ensure Workers implements the Validator interface. +var _ Validator = (*Workers)(nil) diff --git a/lnd.go b/lnd.go index c79ca924..c6d77a0f 100644 --- a/lnd.go +++ b/lnd.go @@ -161,7 +161,11 @@ func lndMain() error { // Open the channeldb, which is dedicated to storing channel, and // network related metadata. - chanDB, err := channeldb.Open(graphDir) + chanDB, err := channeldb.Open( + graphDir, + channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), + channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), + ) if err != nil { ltndLog.Errorf("unable to open channeldb: %v", err) return err