multi: rename ReadTx to RTx
This commit is contained in:
parent
d32c7a4814
commit
d0d2ca403d
@ -51,7 +51,7 @@ func ChannelGraphFromDatabase(db *channeldb.ChannelGraph) ChannelGraph {
|
||||
// channeldb.LightningNode. The wrapper method implement the autopilot.Node
|
||||
// interface.
|
||||
type dbNode struct {
|
||||
tx kvdb.ReadTx
|
||||
tx kvdb.RTx
|
||||
|
||||
node *channeldb.LightningNode
|
||||
}
|
||||
@ -84,7 +84,7 @@ func (d dbNode) Addrs() []net.Addr {
|
||||
//
|
||||
// NOTE: Part of the autopilot.Node interface.
|
||||
func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
|
||||
return d.node.ForEachChannel(d.tx, func(tx kvdb.ReadTx,
|
||||
return d.node.ForEachChannel(d.tx, func(tx kvdb.RTx,
|
||||
ei *channeldb.ChannelEdgeInfo, ep, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
// Skip channels for which no outgoing edge policy is available.
|
||||
@ -121,8 +121,7 @@ func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
|
||||
//
|
||||
// NOTE: Part of the autopilot.ChannelGraph interface.
|
||||
func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error {
|
||||
return d.db.ForEachNode(func(tx kvdb.ReadTx, n *channeldb.LightningNode) error {
|
||||
|
||||
return d.db.ForEachNode(func(tx kvdb.RTx, n *channeldb.LightningNode) error {
|
||||
// We'll skip over any node that doesn't have any advertised
|
||||
// addresses. As we won't be able to reach them to actually
|
||||
// open any channels.
|
||||
|
@ -1291,7 +1291,7 @@ func (rs *retributionStore) GetFinalizedTxn(
|
||||
chanPoint *wire.OutPoint) (*wire.MsgTx, error) {
|
||||
|
||||
var finalTxBytes []byte
|
||||
if err := kvdb.View(rs.db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(rs.db, func(tx kvdb.RTx) error {
|
||||
justiceBkt := tx.ReadBucket(justiceTxnBucket)
|
||||
if justiceBkt == nil {
|
||||
return nil
|
||||
@ -1325,7 +1325,7 @@ func (rs *retributionStore) GetFinalizedTxn(
|
||||
// that has already been breached.
|
||||
func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
|
||||
var found bool
|
||||
err := kvdb.View(rs.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(rs.db, func(tx kvdb.RTx) error {
|
||||
retBucket := tx.ReadBucket(retributionBucket)
|
||||
if retBucket == nil {
|
||||
return nil
|
||||
@ -1389,7 +1389,7 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error {
|
||||
// ForAll iterates through all stored retributions and executes the passed
|
||||
// callback function on each retribution.
|
||||
func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
|
||||
return kvdb.View(rs.db, func(tx kvdb.ReadTx) error {
|
||||
return kvdb.View(rs.db, func(tx kvdb.RTx) error {
|
||||
// If the bucket does not exist, then there are no pending
|
||||
// retributions.
|
||||
retBucket := tx.ReadBucket(retributionBucket)
|
||||
|
@ -148,7 +148,7 @@ func (c *HeightHintCache) CommitSpendHint(height uint32,
|
||||
// cache for the outpoint.
|
||||
func (c *HeightHintCache) QuerySpendHint(spendRequest SpendRequest) (uint32, error) {
|
||||
var hint uint32
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
spendHints := tx.ReadBucket(spendHintBucket)
|
||||
if spendHints == nil {
|
||||
return ErrCorruptedHeightHintCache
|
||||
@ -242,7 +242,7 @@ func (c *HeightHintCache) CommitConfirmHint(height uint32,
|
||||
// the cache for the transaction hash.
|
||||
func (c *HeightHintCache) QueryConfirmHint(confRequest ConfRequest) (uint32, error) {
|
||||
var hint uint32
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
confirmHints := tx.ReadBucket(confirmHintBucket)
|
||||
if confirmHints == nil {
|
||||
return ErrCorruptedHeightHintCache
|
||||
|
@ -729,7 +729,7 @@ func (c *OpenChannel) RefreshShortChanID() error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
@ -755,7 +755,7 @@ func (c *OpenChannel) RefreshShortChanID() error {
|
||||
// fetchChanBucket is a helper function that returns the bucket where a
|
||||
// channel's data resides in given: the public key for the node, the outpoint,
|
||||
// and the chainhash that the channel resides on.
|
||||
func fetchChanBucket(tx kvdb.ReadTx, nodeKey *btcec.PublicKey,
|
||||
func fetchChanBucket(tx kvdb.RTx, nodeKey *btcec.PublicKey,
|
||||
outPoint *wire.OutPoint, chainHash chainhash.Hash) (kvdb.ReadBucket, error) {
|
||||
|
||||
// First fetch the top level bucket which stores all data related to
|
||||
@ -916,7 +916,7 @@ func (c *OpenChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error {
|
||||
func (c *OpenChannel) DataLossCommitPoint() (*btcec.PublicKey, error) {
|
||||
var commitPoint *btcec.PublicKey
|
||||
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
@ -1138,7 +1138,7 @@ func (c *OpenChannel) BroadcastedCooperative() (*wire.MsgTx, error) {
|
||||
func (c *OpenChannel) getClosingTx(key []byte) (*wire.MsgTx, error) {
|
||||
var closeTx *wire.MsgTx
|
||||
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
@ -2000,7 +2000,7 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error {
|
||||
// these pointers, causing the tip and the tail to point to the same entry.
|
||||
func (c *OpenChannel) RemoteCommitChainTip() (*CommitDiff, error) {
|
||||
var cd *CommitDiff
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
@ -2037,7 +2037,7 @@ func (c *OpenChannel) RemoteCommitChainTip() (*CommitDiff, error) {
|
||||
// updates that still need to be signed for.
|
||||
func (c *OpenChannel) UnsignedAckedUpdates() ([]LogUpdate, error) {
|
||||
var updates []LogUpdate
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
@ -2235,7 +2235,7 @@ func (c *OpenChannel) LoadFwdPkgs() ([]*FwdPkg, error) {
|
||||
defer c.RUnlock()
|
||||
|
||||
var fwdPkgs []*FwdPkg
|
||||
if err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
fwdPkgs, err = c.Packager.LoadFwdPkgs(tx)
|
||||
return err
|
||||
@ -2311,7 +2311,7 @@ func (c *OpenChannel) RevocationLogTail() (*ChannelCommitment, error) {
|
||||
}
|
||||
|
||||
var commit ChannelCommitment
|
||||
if err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
@ -2358,7 +2358,7 @@ func (c *OpenChannel) CommitmentHeight() (uint64, error) {
|
||||
defer c.RUnlock()
|
||||
|
||||
var height uint64
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
// Get the bucket dedicated to storing the metadata for open
|
||||
// channels.
|
||||
chanBucket, err := fetchChanBucket(
|
||||
@ -2393,7 +2393,7 @@ func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelCommitment, e
|
||||
defer c.RUnlock()
|
||||
|
||||
var commit ChannelCommitment
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
@ -2727,7 +2727,7 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot {
|
||||
// latest fully committed state is returned. The first commitment returned is
|
||||
// the local commitment, and the second returned is the remote commitment.
|
||||
func (c *OpenChannel) LatestCommitments() (*ChannelCommitment, *ChannelCommitment, error) {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
@ -2749,7 +2749,7 @@ func (c *OpenChannel) LatestCommitments() (*ChannelCommitment, *ChannelCommitmen
|
||||
// acting on a possible contract breach to ensure, that the caller has the most
|
||||
// up to date information required to deliver justice.
|
||||
func (c *OpenChannel) RemoteRevocationStore() (shachain.Store, error) {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
|
@ -394,7 +394,7 @@ func fileExists(path string) bool {
|
||||
// zero-length slice is returned.
|
||||
func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
|
||||
var channels []*OpenChannel
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
channels, err = d.fetchOpenChannels(tx, nodeID)
|
||||
return err
|
||||
@ -407,7 +407,7 @@ func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error)
|
||||
// stored currently active/open channels associated with the target nodeID. In
|
||||
// the case that no active channels are known to have been created with this
|
||||
// node, then a zero-length slice is returned.
|
||||
func (db *DB) fetchOpenChannels(tx kvdb.ReadTx,
|
||||
func (db *DB) fetchOpenChannels(tx kvdb.RTx,
|
||||
nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
|
||||
|
||||
// Get the bucket dedicated to storing the metadata for open channels.
|
||||
@ -519,7 +519,7 @@ func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) {
|
||||
// structure and skipping fully decoding each channel, we save a good
|
||||
// bit of CPU as we don't need to do things like decompress public
|
||||
// keys.
|
||||
chanScan := func(tx kvdb.ReadTx) error {
|
||||
chanScan := func(tx kvdb.RTx) error {
|
||||
// Get the bucket dedicated to storing the metadata for open
|
||||
// channels.
|
||||
openChanBucket := tx.ReadBucket(openChannelBucket)
|
||||
@ -675,7 +675,7 @@ func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
|
||||
func fetchChannels(d *DB, filters ...fetchChannelsFilter) ([]*OpenChannel, error) {
|
||||
var channels []*OpenChannel
|
||||
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
// Get the bucket dedicated to storing the metadata for open
|
||||
// channels.
|
||||
openChanBucket := tx.ReadBucket(openChannelBucket)
|
||||
@ -767,7 +767,7 @@ func fetchChannels(d *DB, filters ...fetchChannelsFilter) ([]*OpenChannel, error
|
||||
func (d *DB) FetchClosedChannels(pendingOnly bool) ([]*ChannelCloseSummary, error) {
|
||||
var chanSummaries []*ChannelCloseSummary
|
||||
|
||||
if err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
closeBucket := tx.ReadBucket(closedChannelBucket)
|
||||
if closeBucket == nil {
|
||||
return ErrNoClosedChannels
|
||||
@ -805,7 +805,7 @@ var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary
|
||||
// point of the channel in question.
|
||||
func (d *DB) FetchClosedChannel(chanID *wire.OutPoint) (*ChannelCloseSummary, error) {
|
||||
var chanSummary *ChannelCloseSummary
|
||||
if err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
closeBucket := tx.ReadBucket(closedChannelBucket)
|
||||
if closeBucket == nil {
|
||||
return ErrClosedChannelNotFound
|
||||
@ -839,7 +839,7 @@ func (d *DB) FetchClosedChannelForID(cid lnwire.ChannelID) (
|
||||
*ChannelCloseSummary, error) {
|
||||
|
||||
var chanSummary *ChannelCloseSummary
|
||||
if err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
closeBucket := tx.ReadBucket(closedChannelBucket)
|
||||
if closeBucket == nil {
|
||||
return ErrClosedChannelNotFound
|
||||
@ -1115,7 +1115,7 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
|
||||
graphNode LightningNode
|
||||
)
|
||||
|
||||
dbErr := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
dbErr := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
|
||||
linkNode, err = fetchLinkNode(tx, nodePub)
|
||||
@ -1312,7 +1312,7 @@ func getMigrationsToApply(versions []version, version uint32) ([]migration, []ui
|
||||
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
|
||||
// from the historical channel bucket. If the bucket does not exist,
|
||||
// ErrNoHistoricalBucket is returned.
|
||||
func fetchHistoricalChanBucket(tx kvdb.ReadTx,
|
||||
func fetchHistoricalChanBucket(tx kvdb.RTx,
|
||||
outPoint *wire.OutPoint) (kvdb.ReadBucket, error) {
|
||||
|
||||
// First fetch the top level bucket which stores all data related to
|
||||
@ -1340,7 +1340,7 @@ func fetchHistoricalChanBucket(tx kvdb.ReadTx,
|
||||
// bucket.
|
||||
func (db *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
|
||||
var channel *OpenChannel
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -204,7 +204,7 @@ func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, e
|
||||
recordsToSkip := q.IndexOffset
|
||||
recordOffset := q.IndexOffset
|
||||
|
||||
err := kvdb.View(f.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(f.db, func(tx kvdb.RTx) error {
|
||||
// If the bucket wasn't found, then there aren't any events to
|
||||
// be returned.
|
||||
logBucket := tx.ReadBucket(forwardingLogBucket)
|
||||
|
@ -396,7 +396,7 @@ type FwdPackager interface {
|
||||
|
||||
// LoadFwdPkgs loads all known forwarding packages owned by this
|
||||
// channel.
|
||||
LoadFwdPkgs(tx kvdb.ReadTx) ([]*FwdPkg, error)
|
||||
LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error)
|
||||
|
||||
// RemovePkg deletes a forwarding package owned by this channel at
|
||||
// the provided remote `height`.
|
||||
@ -497,12 +497,12 @@ func putLogUpdate(bkt kvdb.RwBucket, idx uint16, htlc *LogUpdate) error {
|
||||
// LoadFwdPkgs scans the forwarding log for any packages that haven't been
|
||||
// processed, and returns their deserialized log updates in a map indexed by the
|
||||
// remote commitment height at which the updates were locked in.
|
||||
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.ReadTx) ([]*FwdPkg, error) {
|
||||
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error) {
|
||||
return loadChannelFwdPkgs(tx, p.source)
|
||||
}
|
||||
|
||||
// loadChannelFwdPkgs loads all forwarding packages owned by `source`.
|
||||
func loadChannelFwdPkgs(tx kvdb.ReadTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) {
|
||||
func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) {
|
||||
fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
|
||||
if fwdPkgBkt == nil {
|
||||
return nil, nil
|
||||
|
@ -782,7 +782,7 @@ func loadFwdPkgs(t *testing.T, db kvdb.Backend,
|
||||
packager channeldb.FwdPackager) []*channeldb.FwdPkg {
|
||||
|
||||
var fwdPkgs []*channeldb.FwdPkg
|
||||
if err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
fwdPkgs, err = packager.LoadFwdPkgs(tx)
|
||||
return err
|
||||
|
@ -206,7 +206,7 @@ func (c *ChannelGraph) Database() *DB {
|
||||
func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
|
||||
// TODO(roasbeef): ptr map to reduce # of allocs? no duplicates
|
||||
|
||||
return kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
return kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
// First, grab the node bucket. This will be used to populate
|
||||
// the Node pointers in each edge read from disk.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
@ -265,8 +265,8 @@ func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, *ChannelEdgePoli
|
||||
// should be passed as the first argument. Otherwise the first argument should
|
||||
// be nil and a fresh transaction will be created to execute the graph
|
||||
// traversal.
|
||||
func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.ReadTx, nodePub []byte,
|
||||
cb func(kvdb.ReadTx, *ChannelEdgeInfo, *ChannelEdgePolicy,
|
||||
func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, nodePub []byte,
|
||||
cb func(kvdb.RTx, *ChannelEdgeInfo, *ChannelEdgePolicy,
|
||||
*ChannelEdgePolicy) error) error {
|
||||
|
||||
db := c.db
|
||||
@ -281,7 +281,7 @@ func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
|
||||
var disabledChanIDs []uint64
|
||||
chanEdgeFound := make(map[uint64]struct{})
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -323,8 +323,8 @@ func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
|
||||
//
|
||||
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
|
||||
// traversal when graph gets mega
|
||||
func (c *ChannelGraph) ForEachNode(cb func(kvdb.ReadTx, *LightningNode) error) error { // nolint:interfacer
|
||||
traversal := func(tx kvdb.ReadTx) error {
|
||||
func (c *ChannelGraph) ForEachNode(cb func(kvdb.RTx, *LightningNode) error) error { // nolint:interfacer
|
||||
traversal := func(tx kvdb.RTx) error {
|
||||
// First grab the nodes bucket which stores the mapping from
|
||||
// pubKey to node information.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
@ -362,7 +362,7 @@ func (c *ChannelGraph) ForEachNode(cb func(kvdb.ReadTx, *LightningNode) error) e
|
||||
// node based off the source node.
|
||||
func (c *ChannelGraph) SourceNode() (*LightningNode, error) {
|
||||
var source *LightningNode
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
// First grab the nodes bucket which stores the mapping from
|
||||
// pubKey to node information.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
@ -472,7 +472,7 @@ func addLightningNode(tx kvdb.RwTx, node *LightningNode) error {
|
||||
func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
|
||||
var alias string
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
if nodes == nil {
|
||||
return ErrGraphNodesNotFound
|
||||
@ -720,7 +720,7 @@ func (c *ChannelGraph) HasChannelEdge(
|
||||
return upd1Time, upd2Time, exists, isZombie, nil
|
||||
}
|
||||
|
||||
if err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -1210,7 +1210,7 @@ func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) {
|
||||
tipHeight uint32
|
||||
)
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
graphMeta := tx.ReadBucket(graphMetaBucket)
|
||||
if graphMeta == nil {
|
||||
return ErrGraphNotFound
|
||||
@ -1308,7 +1308,7 @@ func (c *ChannelGraph) DeleteChannelEdges(chanIDs ...uint64) error {
|
||||
// the database, then ErrEdgeNotFound is returned.
|
||||
func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
|
||||
var chanID uint64
|
||||
if err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
chanID, err = getChanID(tx, chanPoint)
|
||||
return err
|
||||
@ -1320,7 +1320,7 @@ func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
|
||||
}
|
||||
|
||||
// getChanID returns the assigned channel ID for a given channel point.
|
||||
func getChanID(tx kvdb.ReadTx, chanPoint *wire.OutPoint) (uint64, error) {
|
||||
func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) {
|
||||
var b bytes.Buffer
|
||||
if err := writeOutpoint(&b, chanPoint); err != nil {
|
||||
return 0, err
|
||||
@ -1353,7 +1353,7 @@ func getChanID(tx kvdb.ReadTx, chanPoint *wire.OutPoint) (uint64, error) {
|
||||
func (c *ChannelGraph) HighestChanID() (uint64, error) {
|
||||
var cid uint64
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -1417,7 +1417,7 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
|
||||
defer c.cacheMu.Unlock()
|
||||
|
||||
var hits int
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -1537,7 +1537,7 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
|
||||
func (c *ChannelGraph) NodeUpdatesInHorizon(startTime, endTime time.Time) ([]LightningNode, error) {
|
||||
var nodesInHorizon []LightningNode
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
if nodes == nil {
|
||||
return ErrGraphNodesNotFound
|
||||
@ -1599,7 +1599,7 @@ func (c *ChannelGraph) NodeUpdatesInHorizon(startTime, endTime time.Time) ([]Lig
|
||||
func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
|
||||
var newChanIDs []uint64
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -1675,7 +1675,7 @@ func (c *ChannelGraph) FilterChannelRange(startHeight, endHeight uint32) ([]uint
|
||||
byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
|
||||
byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -1728,7 +1728,7 @@ func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
|
||||
cidBytes [8]byte
|
||||
)
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -2127,14 +2127,14 @@ func (l *LightningNode) NodeAnnouncement(signed bool) (*lnwire.NodeAnnouncement,
|
||||
// isPublic determines whether the node is seen as public within the graph from
|
||||
// the source node's point of view. An existing database transaction can also be
|
||||
// specified.
|
||||
func (l *LightningNode) isPublic(tx kvdb.ReadTx, sourcePubKey []byte) (bool, error) {
|
||||
func (l *LightningNode) isPublic(tx kvdb.RTx, sourcePubKey []byte) (bool, error) {
|
||||
// In order to determine whether this node is publicly advertised within
|
||||
// the graph, we'll need to look at all of its edges and check whether
|
||||
// they extend to any other node than the source node. errDone will be
|
||||
// used to terminate the check early.
|
||||
nodeIsPublic := false
|
||||
errDone := errors.New("done")
|
||||
err := l.ForEachChannel(tx, func(_ kvdb.ReadTx, info *ChannelEdgeInfo,
|
||||
err := l.ForEachChannel(tx, func(_ kvdb.RTx, info *ChannelEdgeInfo,
|
||||
_, _ *ChannelEdgePolicy) error {
|
||||
|
||||
// If this edge doesn't extend to the source node, we'll
|
||||
@ -2173,12 +2173,12 @@ func (l *LightningNode) isPublic(tx kvdb.ReadTx, sourcePubKey []byte) (bool, err
|
||||
// should be passed as the first argument. Otherwise the first argument should
|
||||
// be nil and a fresh transaction will be created to execute the graph
|
||||
// traversal.
|
||||
func (c *ChannelGraph) FetchLightningNode(tx kvdb.ReadTx, nodePub route.Vertex) (
|
||||
func (c *ChannelGraph) FetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) (
|
||||
*LightningNode, error) {
|
||||
|
||||
var node *LightningNode
|
||||
|
||||
fetchNode := func(tx kvdb.ReadTx) error {
|
||||
fetchNode := func(tx kvdb.RTx) error {
|
||||
// First grab the nodes bucket which stores the mapping from
|
||||
// pubKey to node information.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
@ -2231,7 +2231,7 @@ func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool, erro
|
||||
exists bool
|
||||
)
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
// First grab the nodes bucket which stores the mapping from
|
||||
// pubKey to node information.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
@ -2269,10 +2269,10 @@ func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool, erro
|
||||
|
||||
// nodeTraversal is used to traverse all channels of a node given by its
|
||||
// public key and passes channel information into the specified callback.
|
||||
func nodeTraversal(tx kvdb.ReadTx, nodePub []byte, db *DB,
|
||||
cb func(kvdb.ReadTx, *ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
|
||||
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db *DB,
|
||||
cb func(kvdb.RTx, *ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
|
||||
|
||||
traversal := func(tx kvdb.ReadTx) error {
|
||||
traversal := func(tx kvdb.RTx) error {
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
if nodes == nil {
|
||||
return ErrGraphNotFound
|
||||
@ -2367,8 +2367,8 @@ func nodeTraversal(tx kvdb.ReadTx, nodePub []byte, db *DB,
|
||||
// should be passed as the first argument. Otherwise the first argument should
|
||||
// be nil and a fresh transaction will be created to execute the graph
|
||||
// traversal.
|
||||
func (l *LightningNode) ForEachChannel(tx kvdb.ReadTx,
|
||||
cb func(kvdb.ReadTx, *ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
|
||||
func (l *LightningNode) ForEachChannel(tx kvdb.RTx,
|
||||
cb func(kvdb.RTx, *ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
|
||||
|
||||
nodePub := l.PubKeyBytes[:]
|
||||
db := l.db
|
||||
@ -2559,7 +2559,7 @@ func (c *ChannelEdgeInfo) OtherNodeKeyBytes(thisNodeKey []byte) (
|
||||
// the target node in the channel. This is useful when one knows the pubkey of
|
||||
// one of the nodes, and wishes to obtain the full LightningNode for the other
|
||||
// end of the channel.
|
||||
func (c *ChannelEdgeInfo) FetchOtherNode(tx kvdb.ReadTx, thisNodeKey []byte) (*LightningNode, error) {
|
||||
func (c *ChannelEdgeInfo) FetchOtherNode(tx kvdb.RTx, thisNodeKey []byte) (*LightningNode, error) {
|
||||
|
||||
// Ensure that the node passed in is actually a member of the channel.
|
||||
var targetNodeBytes [33]byte
|
||||
@ -2573,7 +2573,7 @@ func (c *ChannelEdgeInfo) FetchOtherNode(tx kvdb.ReadTx, thisNodeKey []byte) (*L
|
||||
}
|
||||
|
||||
var targetNode *LightningNode
|
||||
fetchNodeFunc := func(tx kvdb.ReadTx) error {
|
||||
fetchNodeFunc := func(tx kvdb.RTx) error {
|
||||
// First grab the nodes bucket which stores the mapping from
|
||||
// pubKey to node information.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
@ -2872,7 +2872,7 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint,
|
||||
policy2 *ChannelEdgePolicy
|
||||
)
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
// First, grab the node bucket. This will be used to populate
|
||||
// the Node pointers in each edge read from disk.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
@ -2956,7 +2956,7 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64,
|
||||
channelID [8]byte
|
||||
)
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
// First, grab the node bucket. This will be used to populate
|
||||
// the Node pointers in each edge read from disk.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
@ -3046,7 +3046,7 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64,
|
||||
// source node's point of view.
|
||||
func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) {
|
||||
var nodeIsPublic bool
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
if nodes == nil {
|
||||
return ErrGraphNodesNotFound
|
||||
@ -3132,7 +3132,7 @@ func (e *EdgePoint) String() string {
|
||||
// closes on the resident blockchain.
|
||||
func (c *ChannelGraph) ChannelView() ([]EdgePoint, error) {
|
||||
var edgePoints []EdgePoint
|
||||
if err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
// We're going to iterate over the entire channel index, so
|
||||
// we'll need to fetch the edgeBucket to get to the index as
|
||||
// it's a sub-bucket.
|
||||
@ -3249,7 +3249,7 @@ func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
|
||||
pubKey1, pubKey2 [33]byte
|
||||
)
|
||||
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -3293,7 +3293,7 @@ func isZombieEdge(zombieIndex kvdb.ReadBucket,
|
||||
// NumZombies returns the current number of zombie channels in the graph.
|
||||
func (c *ChannelGraph) NumZombies() (uint64, error) {
|
||||
var numZombies uint64
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return nil
|
||||
|
@ -882,7 +882,7 @@ func TestGraphTraversal(t *testing.T) {
|
||||
|
||||
// Iterate over each node as returned by the graph, if all nodes are
|
||||
// reached, then the map created above should be empty.
|
||||
err = graph.ForEachNode(func(_ kvdb.ReadTx, node *LightningNode) error {
|
||||
err = graph.ForEachNode(func(_ kvdb.RTx, node *LightningNode) error {
|
||||
delete(nodeIndex, node.Alias)
|
||||
return nil
|
||||
})
|
||||
@ -978,7 +978,7 @@ func TestGraphTraversal(t *testing.T) {
|
||||
// Finally, we want to test the ability to iterate over all the
|
||||
// outgoing channels for a particular node.
|
||||
numNodeChans := 0
|
||||
err = firstNode.ForEachChannel(nil, func(_ kvdb.ReadTx, _ *ChannelEdgeInfo,
|
||||
err = firstNode.ForEachChannel(nil, func(_ kvdb.RTx, _ *ChannelEdgeInfo,
|
||||
outEdge, inEdge *ChannelEdgePolicy) error {
|
||||
|
||||
// All channels between first and second node should have fully
|
||||
@ -1051,7 +1051,7 @@ func assertNumChans(t *testing.T, graph *ChannelGraph, n int) {
|
||||
|
||||
func assertNumNodes(t *testing.T, graph *ChannelGraph, n int) {
|
||||
numNodes := 0
|
||||
err := graph.ForEachNode(func(_ kvdb.ReadTx, _ *LightningNode) error {
|
||||
err := graph.ForEachNode(func(_ kvdb.RTx, _ *LightningNode) error {
|
||||
numNodes++
|
||||
return nil
|
||||
})
|
||||
@ -2099,7 +2099,7 @@ func TestIncompleteChannelPolicies(t *testing.T) {
|
||||
// Ensure that channel is reported with unknown policies.
|
||||
checkPolicies := func(node *LightningNode, expectedIn, expectedOut bool) {
|
||||
calls := 0
|
||||
err := node.ForEachChannel(nil, func(_ kvdb.ReadTx, _ *ChannelEdgeInfo,
|
||||
err := node.ForEachChannel(nil, func(_ kvdb.RTx, _ *ChannelEdgeInfo,
|
||||
outEdge, inEdge *ChannelEdgePolicy) error {
|
||||
|
||||
if !expectedOut && outEdge != nil {
|
||||
@ -2235,7 +2235,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
|
||||
timestampSet[t] = struct{}{}
|
||||
}
|
||||
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
edges := tx.ReadBucket(edgeBucket)
|
||||
if edges == nil {
|
||||
return ErrGraphNoEdgesFound
|
||||
@ -2844,7 +2844,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) {
|
||||
|
||||
// Attempting to deserialize these bytes should return an error.
|
||||
r := bytes.NewReader(stripped)
|
||||
err = kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err = kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
if nodes == nil {
|
||||
return ErrGraphNotFound
|
||||
|
@ -485,7 +485,7 @@ func (d *DB) InvoicesAddedSince(sinceAddIndex uint64) ([]Invoice, error) {
|
||||
var startIndex [8]byte
|
||||
byteOrder.PutUint64(startIndex[:], sinceAddIndex)
|
||||
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
invoices := tx.ReadBucket(invoiceBucket)
|
||||
if invoices == nil {
|
||||
return ErrNoInvoicesCreated
|
||||
@ -540,7 +540,7 @@ func (d *DB) InvoicesAddedSince(sinceAddIndex uint64) ([]Invoice, error) {
|
||||
// terms of the payment.
|
||||
func (d *DB) LookupInvoice(paymentHash [32]byte) (Invoice, error) {
|
||||
var invoice Invoice
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
invoices := tx.ReadBucket(invoiceBucket)
|
||||
if invoices == nil {
|
||||
return ErrNoInvoicesCreated
|
||||
@ -595,7 +595,7 @@ func (d *DB) FetchAllInvoicesWithPaymentHash(pendingOnly bool) (
|
||||
|
||||
var result []InvoiceWithPaymentHash
|
||||
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
invoices := tx.ReadBucket(invoiceBucket)
|
||||
if invoices == nil {
|
||||
return ErrNoInvoicesCreated
|
||||
@ -701,7 +701,7 @@ func (d *DB) QueryInvoices(q InvoiceQuery) (InvoiceSlice, error) {
|
||||
InvoiceQuery: q,
|
||||
}
|
||||
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
// If the bucket wasn't found, then there aren't any invoices
|
||||
// within the database yet, so we can simply exit.
|
||||
invoices := tx.ReadBucket(invoiceBucket)
|
||||
@ -883,7 +883,7 @@ func (d *DB) InvoicesSettledSince(sinceSettleIndex uint64) ([]Invoice, error) {
|
||||
var startIndex [8]byte
|
||||
byteOrder.PutUint64(startIndex[:], sinceSettleIndex)
|
||||
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
invoices := tx.ReadBucket(invoiceBucket)
|
||||
if invoices == nil {
|
||||
return ErrNoInvoicesCreated
|
||||
|
@ -22,7 +22,7 @@ func Update(db Backend, f func(tx RwTx) error) error {
|
||||
// transaction passed as a parameter. After f exits, the transaction is rolled
|
||||
// back. If f errors, its error is returned, not a rollback error (if any
|
||||
// occur).
|
||||
func View(db Backend, f func(tx ReadTx) error) error {
|
||||
func View(db Backend, f func(tx RTx) error) error {
|
||||
if extendedDB, ok := db.(ExtendedBackend); ok {
|
||||
return extendedDB.View(f)
|
||||
}
|
||||
@ -90,9 +90,9 @@ type ReadBucket = walletdb.ReadBucket
|
||||
// This type is only allowed to perform database read operations.
|
||||
type ReadCursor = walletdb.ReadCursor
|
||||
|
||||
// ReadTx represents a database transaction that can only be used for reads. If
|
||||
// RTx represents a database transaction that can only be used for reads. If
|
||||
// a database update must occur, use a RwTx.
|
||||
type ReadTx = walletdb.ReadTx
|
||||
type RTx = walletdb.ReadTx
|
||||
|
||||
// RwBucket represents a bucket (a hierarchical structure within the database)
|
||||
// that is allowed to perform both read and write operations.
|
||||
@ -105,7 +105,7 @@ type RwBucket = walletdb.ReadWriteBucket
|
||||
type RwCursor = walletdb.ReadWriteCursor
|
||||
|
||||
// ReadWriteTx represents a database transaction that can be used for both
|
||||
// reads and writes. When only reads are necessary, consider using a ReadTx
|
||||
// reads and writes. When only reads are necessary, consider using a RTx
|
||||
// instead.
|
||||
type RwTx = walletdb.ReadWriteTx
|
||||
|
||||
|
@ -22,10 +22,10 @@ type Meta struct {
|
||||
|
||||
// FetchMeta fetches the meta data from boltdb and returns filled meta
|
||||
// structure.
|
||||
func (d *DB) FetchMeta(tx kvdb.ReadTx) (*Meta, error) {
|
||||
func (d *DB) FetchMeta(tx kvdb.RTx) (*Meta, error) {
|
||||
meta := &Meta{}
|
||||
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
return fetchMeta(meta, tx)
|
||||
})
|
||||
if err != nil {
|
||||
@ -38,7 +38,7 @@ func (d *DB) FetchMeta(tx kvdb.ReadTx) (*Meta, error) {
|
||||
// fetchMeta is an internal helper function used in order to allow callers to
|
||||
// re-use a database transaction. See the publicly exported FetchMeta method
|
||||
// for more information.
|
||||
func fetchMeta(meta *Meta, tx kvdb.ReadTx) error {
|
||||
func fetchMeta(meta *Meta, tx kvdb.RTx) error {
|
||||
metaBucket := tx.ReadBucket(metaBucket)
|
||||
if metaBucket == nil {
|
||||
return ErrMetaNotFound
|
||||
|
@ -481,7 +481,7 @@ func TestMigrationDryRun(t *testing.T) {
|
||||
|
||||
// Check that version of database version is not modified.
|
||||
afterMigrationFunc := func(d *DB) {
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
meta, err := d.FetchMeta(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -180,7 +180,7 @@ func fileExists(path string) bool {
|
||||
func (d *DB) FetchClosedChannels(pendingOnly bool) ([]*ChannelCloseSummary, error) {
|
||||
var chanSummaries []*ChannelCloseSummary
|
||||
|
||||
if err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
closeBucket := tx.ReadBucket(closedChannelBucket)
|
||||
if closeBucket == nil {
|
||||
return ErrNoClosedChannels
|
||||
|
@ -175,7 +175,7 @@ func newChannelGraph(db *DB, rejectCacheSize, chanCacheSize int) *ChannelGraph {
|
||||
// node based off the source node.
|
||||
func (c *ChannelGraph) SourceNode() (*LightningNode, error) {
|
||||
var source *LightningNode
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
// First grab the nodes bucket which stores the mapping from
|
||||
// pubKey to node information.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
|
@ -252,7 +252,7 @@ func validateInvoice(i *Invoice) error {
|
||||
func (d *DB) FetchAllInvoices(pendingOnly bool) ([]Invoice, error) {
|
||||
var invoices []Invoice
|
||||
|
||||
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
invoiceB := tx.ReadBucket(invoiceBucket)
|
||||
if invoiceB == nil {
|
||||
return ErrNoInvoicesCreated
|
||||
|
@ -104,7 +104,7 @@ func (db *DB) addPayment(payment *outgoingPayment) error {
|
||||
func (db *DB) fetchAllPayments() ([]*outgoingPayment, error) {
|
||||
var payments []*outgoingPayment
|
||||
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
bucket := tx.ReadBucket(paymentBucket)
|
||||
if bucket == nil {
|
||||
return ErrNoPaymentsCreated
|
||||
@ -140,7 +140,7 @@ func (db *DB) fetchAllPayments() ([]*outgoingPayment, error) {
|
||||
// NOTE: Deprecated. Kept around for migration purposes.
|
||||
func (db *DB) fetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) {
|
||||
var paymentStatus = StatusUnknown
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
paymentStatus, err = fetchPaymentStatusTx(tx, paymentHash)
|
||||
return err
|
||||
@ -158,7 +158,7 @@ func (db *DB) fetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) {
|
||||
// can be composed into other atomic operations.
|
||||
//
|
||||
// NOTE: Deprecated. Kept around for migration purposes.
|
||||
func fetchPaymentStatusTx(tx kvdb.ReadTx, paymentHash [32]byte) (PaymentStatus, error) {
|
||||
func fetchPaymentStatusTx(tx kvdb.RTx, paymentHash [32]byte) (PaymentStatus, error) {
|
||||
// The default status for all payments that aren't recorded in database.
|
||||
var paymentStatus = StatusUnknown
|
||||
|
||||
@ -375,7 +375,7 @@ func deserializeHopMigration9(r io.Reader) (*Hop, error) {
|
||||
func (db *DB) fetchPaymentsMigration9() ([]*Payment, error) {
|
||||
var payments []*Payment
|
||||
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
paymentsBucket := tx.ReadBucket(paymentsRootBucket)
|
||||
if paymentsBucket == nil {
|
||||
return nil
|
||||
|
@ -404,7 +404,7 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) {
|
||||
newSerialization := b.Bytes()
|
||||
|
||||
var dbSummary []byte
|
||||
err = kvdb.View(d, func(tx kvdb.ReadTx) error {
|
||||
err = kvdb.View(d, func(tx kvdb.RTx) error {
|
||||
closedChanBucket := tx.ReadBucket(closedChannelBucket)
|
||||
if closedChanBucket == nil {
|
||||
return errors.New("unable to find bucket")
|
||||
@ -503,7 +503,7 @@ func TestMigrateGossipMessageStoreKeys(t *testing.T) {
|
||||
// 3. The message matches the original.
|
||||
afterMigration := func(db *DB) {
|
||||
var rawMsg []byte
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
messageStore := tx.ReadBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return errors.New("message store bucket not " +
|
||||
|
@ -254,7 +254,7 @@ type Payment struct {
|
||||
func (db *DB) FetchPayments() ([]*Payment, error) {
|
||||
var payments []*Payment
|
||||
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
paymentsBucket := tx.ReadBucket(paymentsRootBucket)
|
||||
if paymentsBucket == nil {
|
||||
return nil
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
// hex("1111"): hex("5783492373"),
|
||||
// },
|
||||
// }
|
||||
func DumpDB(tx kvdb.ReadTx, rootKey []byte) error {
|
||||
func DumpDB(tx kvdb.RTx, rootKey []byte) error {
|
||||
bucket := tx.ReadBucket(rootKey)
|
||||
if bucket == nil {
|
||||
return fmt.Errorf("bucket %v not found", string(rootKey))
|
||||
@ -100,7 +100,7 @@ func restoreDB(bucket kvdb.RwBucket, data map[string]interface{}) error {
|
||||
}
|
||||
|
||||
// VerifyDB verifies the database against the given data set.
|
||||
func VerifyDB(tx kvdb.ReadTx, rootKey []byte, data map[string]interface{}) error {
|
||||
func VerifyDB(tx kvdb.RTx, rootKey []byte, data map[string]interface{}) error {
|
||||
bucket := tx.ReadBucket(rootKey)
|
||||
if bucket == nil {
|
||||
return fmt.Errorf("bucket %v not found", string(rootKey))
|
||||
|
@ -150,7 +150,7 @@ func (db *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
|
||||
// key cannot be found, then ErrNodeNotFound if returned.
|
||||
func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
|
||||
var linkNode *LinkNode
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
node, err := fetchLinkNode(tx, identity)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -163,7 +163,7 @@ func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
|
||||
return linkNode, err
|
||||
}
|
||||
|
||||
func fetchLinkNode(tx kvdb.ReadTx, targetPub *btcec.PublicKey) (*LinkNode, error) {
|
||||
func fetchLinkNode(tx kvdb.RTx, targetPub *btcec.PublicKey) (*LinkNode, error) {
|
||||
// First fetch the bucket for storing node metadata, bailing out early
|
||||
// if it hasn't been created yet.
|
||||
nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
|
||||
@ -191,7 +191,7 @@ func fetchLinkNode(tx kvdb.ReadTx, targetPub *btcec.PublicKey) (*LinkNode, error
|
||||
// whom we have active channels with.
|
||||
func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
|
||||
var linkNodes []*LinkNode
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
nodes, err := db.fetchAllLinkNodes(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -209,7 +209,7 @@ func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
|
||||
|
||||
// fetchAllLinkNodes uses an existing database transaction to fetch all nodes
|
||||
// with whom we have active channels with.
|
||||
func (db *DB) fetchAllLinkNodes(tx kvdb.ReadTx) ([]*LinkNode, error) {
|
||||
func (db *DB) fetchAllLinkNodes(tx kvdb.RTx) ([]*LinkNode, error) {
|
||||
nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
|
||||
if nodeMetaBucket == nil {
|
||||
return nil, ErrLinkNodesNotFound
|
||||
|
@ -462,7 +462,7 @@ func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
|
||||
*MPPayment, error) {
|
||||
|
||||
var payment *MPPayment
|
||||
err := kvdb.View(p.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(p.db, func(tx kvdb.RTx) error {
|
||||
bucket, err := fetchPaymentBucket(tx, paymentHash)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -494,7 +494,7 @@ func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
|
||||
|
||||
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
|
||||
// the bucket does not exist, it returns ErrPaymentNotInitiated.
|
||||
func fetchPaymentBucket(tx kvdb.ReadTx, paymentHash lntypes.Hash) (
|
||||
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
|
||||
kvdb.ReadBucket, error) {
|
||||
|
||||
payments := tx.ReadBucket(paymentsRootBucket)
|
||||
@ -609,7 +609,7 @@ type InFlightPayment struct {
|
||||
// FetchInFlightPayments returns all payments with status InFlight.
|
||||
func (p *PaymentControl) FetchInFlightPayments() ([]*InFlightPayment, error) {
|
||||
var inFlights []*InFlightPayment
|
||||
err := kvdb.View(p.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(p.db, func(tx kvdb.RTx) error {
|
||||
payments := tx.ReadBucket(paymentsRootBucket)
|
||||
if payments == nil {
|
||||
return nil
|
||||
|
@ -206,7 +206,7 @@ type PaymentCreationInfo struct {
|
||||
func (db *DB) FetchPayments() ([]*MPPayment, error) {
|
||||
var payments []*MPPayment
|
||||
|
||||
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
paymentsBucket := tx.ReadBucket(paymentsRootBucket)
|
||||
if paymentsBucket == nil {
|
||||
return nil
|
||||
|
@ -123,7 +123,7 @@ func (s *WaitingProofStore) Remove(key WaitingProofKey) error {
|
||||
// ForAll iterates thought all waiting proofs and passing the waiting proof
|
||||
// in the given callback.
|
||||
func (s *WaitingProofStore) ForAll(cb func(*WaitingProof) error) error {
|
||||
return kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
return kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
bucket := tx.ReadBucket(waitingProofsBucketKey)
|
||||
if bucket == nil {
|
||||
return ErrWaitingProofNotFound
|
||||
@ -158,7 +158,7 @@ func (s *WaitingProofStore) Get(key WaitingProofKey) (*WaitingProof, error) {
|
||||
return nil, ErrWaitingProofNotFound
|
||||
}
|
||||
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
bucket := tx.ReadBucket(waitingProofsBucketKey)
|
||||
if bucket == nil {
|
||||
return ErrWaitingProofNotFound
|
||||
|
@ -150,7 +150,7 @@ func (w *WitnessCache) LookupSha256Witness(hash lntypes.Hash) (lntypes.Preimage,
|
||||
// will be returned.
|
||||
func (w *WitnessCache) lookupWitness(wType WitnessType, witnessKey []byte) ([]byte, error) {
|
||||
var witness []byte
|
||||
err := kvdb.View(w.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(w.db, func(tx kvdb.RTx) error {
|
||||
witnessBucket := tx.ReadBucket(witnessBucketKey)
|
||||
if witnessBucket == nil {
|
||||
return ErrNoWitnesses
|
||||
|
@ -337,7 +337,7 @@ func newBoltArbitratorLog(db kvdb.Backend, cfg ChannelArbitratorConfig,
|
||||
// interface.
|
||||
var _ ArbitratorLog = (*boltArbitratorLog)(nil)
|
||||
|
||||
func fetchContractReadBucket(tx kvdb.ReadTx, scopeKey []byte) (kvdb.ReadBucket, error) {
|
||||
func fetchContractReadBucket(tx kvdb.RTx, scopeKey []byte) (kvdb.ReadBucket, error) {
|
||||
scopeBucket := tx.ReadBucket(scopeKey)
|
||||
if scopeBucket == nil {
|
||||
return nil, errScopeBucketNoExist
|
||||
@ -415,7 +415,7 @@ func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket,
|
||||
// NOTE: Part of the ContractResolver interface.
|
||||
func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) {
|
||||
var s ArbitratorState
|
||||
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
|
||||
scopeBucket := tx.ReadBucket(b.scopeKey[:])
|
||||
if scopeBucket == nil {
|
||||
return errScopeBucketNoExist
|
||||
@ -461,7 +461,7 @@ func (b *boltArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, erro
|
||||
Checkpoint: b.checkpointContract,
|
||||
}
|
||||
var contracts []ContractResolver
|
||||
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
|
||||
contractBucket, err := fetchContractReadBucket(tx, b.scopeKey[:])
|
||||
if err != nil {
|
||||
return err
|
||||
@ -675,7 +675,7 @@ func (b *boltArbitratorLog) LogContractResolutions(c *ContractResolutions) error
|
||||
// NOTE: Part of the ContractResolver interface.
|
||||
func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, error) {
|
||||
c := &ContractResolutions{}
|
||||
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
|
||||
scopeBucket := tx.ReadBucket(b.scopeKey[:])
|
||||
if scopeBucket == nil {
|
||||
return errScopeBucketNoExist
|
||||
@ -774,7 +774,7 @@ func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, er
|
||||
func (b *boltArbitratorLog) FetchChainActions() (ChainActionMap, error) {
|
||||
actionsMap := make(ChainActionMap)
|
||||
|
||||
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
|
||||
scopeBucket := tx.ReadBucket(b.scopeKey[:])
|
||||
if scopeBucket == nil {
|
||||
return errScopeBucketNoExist
|
||||
@ -837,7 +837,7 @@ func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
|
||||
// NOTE: Part of the ContractResolver interface.
|
||||
func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
|
||||
var c *CommitSet
|
||||
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
|
||||
scopeBucket := tx.ReadBucket(b.scopeKey[:])
|
||||
if scopeBucket == nil {
|
||||
return errScopeBucketNoExist
|
||||
|
@ -200,7 +200,7 @@ func readMessage(msgBytes []byte) (lnwire.Message, error) {
|
||||
// all peers.
|
||||
func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) {
|
||||
msgs := make(map[[33]byte][]lnwire.Message)
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
messageStore := tx.ReadBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return ErrCorruptedMessageStore
|
||||
@ -238,7 +238,7 @@ func (s *MessageStore) MessagesForPeer(
|
||||
peerPubKey [33]byte) ([]lnwire.Message, error) {
|
||||
|
||||
var msgs []lnwire.Message
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
messageStore := tx.ReadBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return ErrCorruptedMessageStore
|
||||
@ -273,7 +273,7 @@ func (s *MessageStore) MessagesForPeer(
|
||||
// Peers returns the public key of all peers with messages within the store.
|
||||
func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) {
|
||||
peers := make(map[[33]byte]struct{})
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
messageStore := tx.ReadBucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return ErrCorruptedMessageStore
|
||||
|
@ -3477,7 +3477,7 @@ func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) (
|
||||
|
||||
var state channelOpeningState
|
||||
var shortChanID lnwire.ShortChannelID
|
||||
err := kvdb.View(f.cfg.Wallet.Cfg.Database, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RTx) error {
|
||||
|
||||
bucket := tx.ReadBucket(channelOpeningStateBucket)
|
||||
if bucket == nil {
|
||||
|
@ -261,7 +261,7 @@ func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
|
||||
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
|
||||
var value uint32
|
||||
|
||||
err := kvdb.View(d.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(d.db, func(tx kvdb.RTx) error {
|
||||
// Grab the shared hash bucket which stores the mapping from
|
||||
// truncated sha-256 hashes of shared secrets to CLTV's.
|
||||
sharedHashes := tx.ReadBucket(sharedHashBucket)
|
||||
|
@ -5093,7 +5093,7 @@ func (*mockPackager) AckAddHtlcs(tx kvdb.RwTx,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockPackager) LoadFwdPkgs(tx kvdb.ReadTx) ([]*channeldb.FwdPkg, error) {
|
||||
func (m *mockPackager) LoadFwdPkgs(tx kvdb.RTx) ([]*channeldb.FwdPkg, error) {
|
||||
if m.failLoadFwdPkgs {
|
||||
return nil, fmt.Errorf("failing LoadFwdPkgs")
|
||||
}
|
||||
|
@ -180,7 +180,7 @@ func (store *networkResultStore) subscribeResult(paymentID uint64) (
|
||||
resultChan = make(chan *networkResult, 1)
|
||||
)
|
||||
|
||||
err := kvdb.View(store.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(store.db, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
result, err = fetchResult(tx, paymentID)
|
||||
switch {
|
||||
@ -226,7 +226,7 @@ func (store *networkResultStore) getResult(pid uint64) (
|
||||
*networkResult, error) {
|
||||
|
||||
var result *networkResult
|
||||
err := kvdb.View(store.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(store.db, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
result, err = fetchResult(tx, pid)
|
||||
return err
|
||||
@ -238,7 +238,7 @@ func (store *networkResultStore) getResult(pid uint64) (
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func fetchResult(tx kvdb.ReadTx, pid uint64) (*networkResult, error) {
|
||||
func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
|
||||
var paymentIDBytes [8]byte
|
||||
binary.BigEndian.PutUint64(paymentIDBytes[:], pid)
|
||||
|
||||
|
@ -132,7 +132,7 @@ func (r *RootKeyStorage) Get(_ context.Context, id []byte) ([]byte, error) {
|
||||
return nil, ErrStoreLocked
|
||||
}
|
||||
var rootKey []byte
|
||||
err := kvdb.View(r, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(r, func(tx kvdb.RTx) error {
|
||||
dbKey := tx.ReadBucket(rootKeyBucketName).Get(id)
|
||||
if len(dbKey) == 0 {
|
||||
return fmt.Errorf("root key with id %s doesn't exist",
|
||||
|
@ -539,7 +539,7 @@ func (ns *nurseryStore) FetchClass(
|
||||
// processed at the provided block height.
|
||||
var kids []kidOutput
|
||||
var babies []babyOutput
|
||||
if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(ns.db, func(tx kvdb.RTx) error {
|
||||
// Append each crib output to our list of babyOutputs.
|
||||
if err := ns.forEachHeightPrefix(tx, cribPrefix, height,
|
||||
func(buf []byte) error {
|
||||
@ -593,7 +593,7 @@ func (ns *nurseryStore) FetchClass(
|
||||
// preschool bucket.
|
||||
func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
|
||||
var kids []kidOutput
|
||||
if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(ns.db, func(tx kvdb.RTx) error {
|
||||
|
||||
// Retrieve the existing chain bucket for this nursery store.
|
||||
chainBucket := tx.ReadBucket(ns.pfxChainKey)
|
||||
@ -666,7 +666,7 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
|
||||
// index at or below the provided upper bound.
|
||||
func (ns *nurseryStore) HeightsBelowOrEqual(height uint32) ([]uint32, error) {
|
||||
var activeHeights []uint32
|
||||
err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(ns.db, func(tx kvdb.RTx) error {
|
||||
// Ensure that the chain bucket for this nursery store exists.
|
||||
chainBucket := tx.ReadBucket(ns.pfxChainKey)
|
||||
if chainBucket == nil {
|
||||
@ -711,7 +711,7 @@ func (ns *nurseryStore) HeightsBelowOrEqual(height uint32) ([]uint32, error) {
|
||||
func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint,
|
||||
callback func([]byte, []byte) error) error {
|
||||
|
||||
return kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
|
||||
return kvdb.View(ns.db, func(tx kvdb.RTx) error {
|
||||
return ns.forChanOutputs(tx, chanPoint, callback)
|
||||
})
|
||||
}
|
||||
@ -719,7 +719,7 @@ func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint,
|
||||
// ListChannels returns all channels the nursery is currently tracking.
|
||||
func (ns *nurseryStore) ListChannels() ([]wire.OutPoint, error) {
|
||||
var activeChannels []wire.OutPoint
|
||||
if err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(ns.db, func(tx kvdb.RTx) error {
|
||||
// Retrieve the existing chain bucket for this nursery store.
|
||||
chainBucket := tx.ReadBucket(ns.pfxChainKey)
|
||||
if chainBucket == nil {
|
||||
@ -753,7 +753,7 @@ func (ns *nurseryStore) ListChannels() ([]wire.OutPoint, error) {
|
||||
// IsMatureChannel determines the whether or not all of the outputs in a
|
||||
// particular channel bucket have been marked as graduated.
|
||||
func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) {
|
||||
err := kvdb.View(ns.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(ns.db, func(tx kvdb.RTx) error {
|
||||
// Iterate over the contents of the channel bucket, computing
|
||||
// both total number of outputs, and those that have the grad
|
||||
// prefix.
|
||||
@ -965,7 +965,7 @@ func (ns *nurseryStore) createChannelBucket(tx kvdb.RwTx,
|
||||
// getChannelBucket retrieves an existing channel bucket from the nursery store,
|
||||
// using the given channel point. If the bucket does not exist, or any bucket
|
||||
// along its path does not exist, a nil value is returned.
|
||||
func (ns *nurseryStore) getChannelBucket(tx kvdb.ReadTx,
|
||||
func (ns *nurseryStore) getChannelBucket(tx kvdb.RTx,
|
||||
chanPoint *wire.OutPoint) kvdb.ReadBucket {
|
||||
|
||||
// Retrieve the existing chain bucket for this nursery store.
|
||||
@ -1048,7 +1048,7 @@ func (ns *nurseryStore) createHeightBucket(tx kvdb.RwTx,
|
||||
// getHeightBucketPath retrieves an existing height bucket from the nursery
|
||||
// store, using the provided block height. If the bucket does not exist, or any
|
||||
// bucket along its path does not exist, a nil value is returned.
|
||||
func (ns *nurseryStore) getHeightBucketPath(tx kvdb.ReadTx,
|
||||
func (ns *nurseryStore) getHeightBucketPath(tx kvdb.RTx,
|
||||
height uint32) (kvdb.ReadBucket, kvdb.ReadBucket, kvdb.ReadBucket) {
|
||||
|
||||
// Retrieve the existing chain bucket for this nursery store.
|
||||
@ -1102,7 +1102,7 @@ func (ns *nurseryStore) getHeightBucketPathWrite(tx kvdb.RwTx,
|
||||
// getHeightBucket retrieves an existing height bucket from the nursery store,
|
||||
// using the provided block height. If the bucket does not exist, or any bucket
|
||||
// along its path does not exist, a nil value is returned.
|
||||
func (ns *nurseryStore) getHeightBucket(tx kvdb.ReadTx,
|
||||
func (ns *nurseryStore) getHeightBucket(tx kvdb.RTx,
|
||||
height uint32) kvdb.ReadBucket {
|
||||
_, _, hghtBucket := ns.getHeightBucketPath(tx, height)
|
||||
|
||||
@ -1176,7 +1176,7 @@ func (ns *nurseryStore) getHeightChanBucketWrite(tx kvdb.RwTx,
|
||||
// enumerate crib and kindergarten outputs at a particular height. The callback
|
||||
// is invoked with serialized bytes retrieved for each output of interest,
|
||||
// allowing the caller to deserialize them into the appropriate type.
|
||||
func (ns *nurseryStore) forEachHeightPrefix(tx kvdb.ReadTx, prefix []byte,
|
||||
func (ns *nurseryStore) forEachHeightPrefix(tx kvdb.RTx, prefix []byte,
|
||||
height uint32, callback func([]byte) error) error {
|
||||
|
||||
// Start by retrieving the height bucket corresponding to the provided
|
||||
@ -1264,7 +1264,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx kvdb.ReadTx, prefix []byte,
|
||||
// provided callback. The callback accepts a key-value pair of byte slices
|
||||
// corresponding to the prefixed-output key and the serialized output,
|
||||
// respectively.
|
||||
func (ns *nurseryStore) forChanOutputs(tx kvdb.ReadTx, chanPoint *wire.OutPoint,
|
||||
func (ns *nurseryStore) forChanOutputs(tx kvdb.RTx, chanPoint *wire.OutPoint,
|
||||
callback func([]byte, []byte) error) error {
|
||||
|
||||
chanBucket := ns.getChannelBucket(tx, chanPoint)
|
||||
|
@ -26,7 +26,7 @@ type routingGraph interface {
|
||||
// database.
|
||||
type dbRoutingTx struct {
|
||||
graph *channeldb.ChannelGraph
|
||||
tx kvdb.ReadTx
|
||||
tx kvdb.RTx
|
||||
source route.Vertex
|
||||
}
|
||||
|
||||
@ -62,7 +62,7 @@ func (g *dbRoutingTx) forEachNodeChannel(nodePub route.Vertex,
|
||||
cb func(*channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy,
|
||||
*channeldb.ChannelEdgePolicy) error) error {
|
||||
|
||||
txCb := func(_ kvdb.ReadTx, info *channeldb.ChannelEdgeInfo,
|
||||
txCb := func(_ kvdb.RTx, info *channeldb.ChannelEdgeInfo,
|
||||
p1, p2 *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
return cb(info, p1, p2)
|
||||
|
@ -88,7 +88,7 @@ func (b *missionControlStore) clear() error {
|
||||
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
|
||||
var results []*paymentResult
|
||||
|
||||
err := kvdb.View(b.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
|
||||
resultBucket := tx.ReadBucket(resultsKey)
|
||||
results = make([]*paymentResult, 0)
|
||||
|
||||
|
@ -2186,7 +2186,7 @@ func (r *ChannelRouter) FetchLightningNode(node route.Vertex) (*channeldb.Lightn
|
||||
//
|
||||
// NOTE: This method is part of the ChannelGraphSource interface.
|
||||
func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) error {
|
||||
return r.cfg.Graph.ForEachNode(func(_ kvdb.ReadTx, n *channeldb.LightningNode) error {
|
||||
return r.cfg.Graph.ForEachNode(func(_ kvdb.RTx, n *channeldb.LightningNode) error {
|
||||
return cb(n)
|
||||
})
|
||||
}
|
||||
@ -2198,7 +2198,7 @@ func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) err
|
||||
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(*channeldb.ChannelEdgeInfo,
|
||||
*channeldb.ChannelEdgePolicy) error) error {
|
||||
|
||||
return r.selfNode.ForEachChannel(nil, func(_ kvdb.ReadTx, c *channeldb.ChannelEdgeInfo,
|
||||
return r.selfNode.ForEachChannel(nil, func(_ kvdb.RTx, c *channeldb.ChannelEdgeInfo,
|
||||
e, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
if e == nil {
|
||||
@ -2339,7 +2339,7 @@ func generateBandwidthHints(sourceNode *channeldb.LightningNode,
|
||||
// First, we'll collect the set of outbound edges from the target
|
||||
// source node.
|
||||
var localChans []*channeldb.ChannelEdgeInfo
|
||||
err := sourceNode.ForEachChannel(nil, func(tx kvdb.ReadTx,
|
||||
err := sourceNode.ForEachChannel(nil, func(tx kvdb.RTx,
|
||||
edgeInfo *channeldb.ChannelEdgeInfo,
|
||||
_, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
|
10
rpcserver.go
10
rpcserver.go
@ -4674,7 +4674,7 @@ func (r *rpcServer) DescribeGraph(ctx context.Context,
|
||||
// First iterate through all the known nodes (connected or unconnected
|
||||
// within the graph), collating their current state into the RPC
|
||||
// response.
|
||||
err := graph.ForEachNode(func(_ kvdb.ReadTx, node *channeldb.LightningNode) error {
|
||||
err := graph.ForEachNode(func(_ kvdb.RTx, node *channeldb.LightningNode) error {
|
||||
nodeAddrs := make([]*lnrpc.NodeAddress, 0)
|
||||
for _, addr := range node.Addresses {
|
||||
nodeAddr := &lnrpc.NodeAddress{
|
||||
@ -4891,7 +4891,7 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context,
|
||||
channels []*lnrpc.ChannelEdge
|
||||
)
|
||||
|
||||
if err := node.ForEachChannel(nil, func(_ kvdb.ReadTx,
|
||||
if err := node.ForEachChannel(nil, func(_ kvdb.RTx,
|
||||
edge *channeldb.ChannelEdgeInfo,
|
||||
c1, c2 *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
@ -4989,7 +4989,7 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context,
|
||||
// network, tallying up the total number of nodes, and also gathering
|
||||
// each node so we can measure the graph diameter and degree stats
|
||||
// below.
|
||||
if err := graph.ForEachNode(func(tx kvdb.ReadTx, node *channeldb.LightningNode) error {
|
||||
if err := graph.ForEachNode(func(tx kvdb.RTx, node *channeldb.LightningNode) error {
|
||||
// Increment the total number of nodes with each iteration.
|
||||
numNodes++
|
||||
|
||||
@ -4999,7 +4999,7 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context,
|
||||
// through the db transaction from the outer view so we can
|
||||
// re-use it within this inner view.
|
||||
var outDegree uint32
|
||||
if err := node.ForEachChannel(tx, func(_ kvdb.ReadTx,
|
||||
if err := node.ForEachChannel(tx, func(_ kvdb.RTx,
|
||||
edge *channeldb.ChannelEdgeInfo, _, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
// Bump up the out degree for this node for each
|
||||
@ -5403,7 +5403,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,
|
||||
}
|
||||
|
||||
var feeReports []*lnrpc.ChannelFeeReport
|
||||
err = selfNode.ForEachChannel(nil, func(_ kvdb.ReadTx, chanInfo *channeldb.ChannelEdgeInfo,
|
||||
err = selfNode.ForEachChannel(nil, func(_ kvdb.RTx, chanInfo *channeldb.ChannelEdgeInfo,
|
||||
edgePolicy, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
// Self node should always have policies for its channels.
|
||||
|
@ -2112,7 +2112,7 @@ func (s *server) establishPersistentConnections() error {
|
||||
// each of the nodes.
|
||||
selfPub := s.identityECDH.PubKey().SerializeCompressed()
|
||||
err = sourceNode.ForEachChannel(nil, func(
|
||||
tx kvdb.ReadTx,
|
||||
tx kvdb.RTx,
|
||||
chanInfo *channeldb.ChannelEdgeInfo,
|
||||
policy, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
|
@ -201,7 +201,7 @@ func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error {
|
||||
func (s *sweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
|
||||
var sweepTx *wire.MsgTx
|
||||
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
lastTxBucket := tx.ReadBucket(lastTxBucketKey)
|
||||
if lastTxBucket == nil {
|
||||
return errors.New("last tx bucket does not exist")
|
||||
@ -232,7 +232,7 @@ func (s *sweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
|
||||
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
|
||||
var ours bool
|
||||
|
||||
err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
txHashesBucket := tx.ReadBucket(txHashesBucketKey)
|
||||
if txHashesBucket == nil {
|
||||
return errNoTxHashesBucket
|
||||
@ -253,7 +253,7 @@ func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
|
||||
func (s *sweeperStore) ListSweeps() ([]chainhash.Hash, error) {
|
||||
var sweepTxns []chainhash.Hash
|
||||
|
||||
if err := kvdb.View(s.db, func(tx kvdb.ReadTx) error {
|
||||
if err := kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
txHashesBucket := tx.ReadBucket(txHashesBucketKey)
|
||||
if txHashesBucket == nil {
|
||||
return errNoTxHashesBucket
|
||||
|
@ -188,7 +188,7 @@ func (c *ClientDB) bdb() kvdb.Backend {
|
||||
// NOTE: Part of the versionedDB interface.
|
||||
func (c *ClientDB) Version() (uint32, error) {
|
||||
var version uint32
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
version, err = getDBVersion(tx)
|
||||
return err
|
||||
@ -383,7 +383,7 @@ func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error {
|
||||
// LoadTowerByID retrieves a tower by its tower ID.
|
||||
func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) {
|
||||
var tower *Tower
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
towers := tx.ReadBucket(cTowerBkt)
|
||||
if towers == nil {
|
||||
return ErrUninitializedDB
|
||||
@ -403,7 +403,7 @@ func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) {
|
||||
// LoadTower retrieves a tower by its public key.
|
||||
func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) {
|
||||
var tower *Tower
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
towers := tx.ReadBucket(cTowerBkt)
|
||||
if towers == nil {
|
||||
return ErrUninitializedDB
|
||||
@ -432,7 +432,7 @@ func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) {
|
||||
// ListTowers retrieves the list of towers available within the database.
|
||||
func (c *ClientDB) ListTowers() ([]*Tower, error) {
|
||||
var towers []*Tower
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
towerBucket := tx.ReadBucket(cTowerBkt)
|
||||
if towerBucket == nil {
|
||||
return ErrUninitializedDB
|
||||
@ -558,7 +558,7 @@ func (c *ClientDB) CreateClientSession(session *ClientSession) error {
|
||||
// response that do not correspond to this tower.
|
||||
func (c *ClientDB) ListClientSessions(id *TowerID) (map[SessionID]*ClientSession, error) {
|
||||
var clientSessions map[SessionID]*ClientSession
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
sessions := tx.ReadBucket(cSessionBkt)
|
||||
if sessions == nil {
|
||||
return ErrUninitializedDB
|
||||
@ -612,7 +612,7 @@ func listClientSessions(sessions kvdb.ReadBucket,
|
||||
// channel summaries.
|
||||
func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) {
|
||||
summaries := make(map[lnwire.ChannelID]ClientChanSummary)
|
||||
err := kvdb.View(c.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
|
||||
chanSummaries := tx.ReadBucket(cChanSummaryBkt)
|
||||
if chanSummaries == nil {
|
||||
return ErrUninitializedDB
|
||||
|
@ -77,7 +77,7 @@ func createDBIfNotExist(dbPath, name string) (kvdb.Backend, bool, error) {
|
||||
// set firstInit to true so that we can treat is initialize the bucket.
|
||||
if !firstInit {
|
||||
var metadataExists bool
|
||||
err = kvdb.View(bdb, func(tx kvdb.ReadTx) error {
|
||||
err = kvdb.View(bdb, func(tx kvdb.RTx) error {
|
||||
metadataExists = tx.ReadBucket(metadataBkt) != nil
|
||||
return nil
|
||||
})
|
||||
|
@ -129,7 +129,7 @@ func (t *TowerDB) bdb() kvdb.Backend {
|
||||
// NOTE: Part of the versionedDB interface.
|
||||
func (t *TowerDB) Version() (uint32, error) {
|
||||
var version uint32
|
||||
err := kvdb.View(t.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(t.db, func(tx kvdb.RTx) error {
|
||||
var err error
|
||||
version, err = getDBVersion(tx)
|
||||
return err
|
||||
@ -150,7 +150,7 @@ func (t *TowerDB) Close() error {
|
||||
// returned if the session could not be found.
|
||||
func (t *TowerDB) GetSessionInfo(id *SessionID) (*SessionInfo, error) {
|
||||
var session *SessionInfo
|
||||
err := kvdb.View(t.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(t.db, func(tx kvdb.RTx) error {
|
||||
sessions := tx.ReadBucket(sessionsBkt)
|
||||
if sessions == nil {
|
||||
return ErrUninitializedDB
|
||||
@ -389,7 +389,7 @@ func (t *TowerDB) DeleteSession(target SessionID) error {
|
||||
// they exist in the database.
|
||||
func (t *TowerDB) QueryMatches(breachHints []blob.BreachHint) ([]Match, error) {
|
||||
var matches []Match
|
||||
err := kvdb.View(t.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(t.db, func(tx kvdb.RTx) error {
|
||||
sessions := tx.ReadBucket(sessionsBkt)
|
||||
if sessions == nil {
|
||||
return ErrUninitializedDB
|
||||
@ -485,7 +485,7 @@ func (t *TowerDB) SetLookoutTip(epoch *chainntnfs.BlockEpoch) error {
|
||||
// database.
|
||||
func (t *TowerDB) GetLookoutTip() (*chainntnfs.BlockEpoch, error) {
|
||||
var epoch *chainntnfs.BlockEpoch
|
||||
err := kvdb.View(t.db, func(tx kvdb.ReadTx) error {
|
||||
err := kvdb.View(t.db, func(tx kvdb.RTx) error {
|
||||
lookoutTip := tx.ReadBucket(lookoutTipBkt)
|
||||
if lookoutTip == nil {
|
||||
return ErrUninitializedDB
|
||||
|
@ -46,7 +46,7 @@ func getMigrations(versions []version, curVersion uint32) []version {
|
||||
|
||||
// getDBVersion retrieves the current database version from the metadata bucket
|
||||
// using the dbVersionKey.
|
||||
func getDBVersion(tx kvdb.ReadTx) (uint32, error) {
|
||||
func getDBVersion(tx kvdb.RTx) (uint32, error) {
|
||||
metadata := tx.ReadBucket(metadataBkt)
|
||||
if metadata == nil {
|
||||
return 0, ErrUninitializedDB
|
||||
|
Loading…
Reference in New Issue
Block a user