channeldb: MigrateOutpointIndex, store indexStatus in outpoint index
Adds an outpoint index that stores a tlv stream. Currently the stream only contains the outpoint's indexStatus. This should cut down on big bbolt transactions in several places throughout the codebase.
This commit is contained in:
parent
08ee754a6d
commit
204b6c51cf
@ -21,6 +21,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/keychain"
|
"github.com/lightningnetwork/lnd/keychain"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/lightningnetwork/lnd/shachain"
|
"github.com/lightningnetwork/lnd/shachain"
|
||||||
|
"github.com/lightningnetwork/lnd/tlv"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -45,6 +46,13 @@ var (
|
|||||||
// TODO(roasbeef): flesh out comment
|
// TODO(roasbeef): flesh out comment
|
||||||
openChannelBucket = []byte("open-chan-bucket")
|
openChannelBucket = []byte("open-chan-bucket")
|
||||||
|
|
||||||
|
// outpointBucket stores all of our channel outpoints and a tlv
|
||||||
|
// stream containing channel data.
|
||||||
|
//
|
||||||
|
// outpoint -> tlv stream
|
||||||
|
//
|
||||||
|
outpointBucket = []byte("outpoint-bucket")
|
||||||
|
|
||||||
// historicalChannelBucket stores all channels that have seen their
|
// historicalChannelBucket stores all channels that have seen their
|
||||||
// commitment tx confirm. All information from their previous open state
|
// commitment tx confirm. All information from their previous open state
|
||||||
// is retained.
|
// is retained.
|
||||||
@ -167,12 +175,35 @@ var (
|
|||||||
// the height requested in the revocation log.
|
// the height requested in the revocation log.
|
||||||
ErrLogEntryNotFound = fmt.Errorf("log entry not found")
|
ErrLogEntryNotFound = fmt.Errorf("log entry not found")
|
||||||
|
|
||||||
|
// ErrMissingIndexEntry is returned when a caller attempts to close a
|
||||||
|
// channel and the outpoint is missing from the index.
|
||||||
|
ErrMissingIndexEntry = fmt.Errorf("missing outpoint from index")
|
||||||
|
|
||||||
// errHeightNotFound is returned when a query for channel balances at
|
// errHeightNotFound is returned when a query for channel balances at
|
||||||
// a height that we have not reached yet is made.
|
// a height that we have not reached yet is made.
|
||||||
errHeightNotReached = fmt.Errorf("height requested greater than " +
|
errHeightNotReached = fmt.Errorf("height requested greater than " +
|
||||||
"current commit height")
|
"current commit height")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// A tlv type definition used to serialize an outpoint's indexStatus
|
||||||
|
// for use in the outpoint index.
|
||||||
|
indexStatusType tlv.Type = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
// indexStatus is an enum-like type that describes what state the
|
||||||
|
// outpoint is in. Currently only two possible values.
|
||||||
|
type indexStatus uint8
|
||||||
|
|
||||||
|
const (
|
||||||
|
// outpointOpen represents an outpoint that is open in the outpoint index.
|
||||||
|
outpointOpen indexStatus = 0
|
||||||
|
|
||||||
|
// outpointClosed represents an outpoint that is closed in the outpoint
|
||||||
|
// index.
|
||||||
|
outpointClosed indexStatus = 1
|
||||||
|
)
|
||||||
|
|
||||||
// ChannelType is an enum-like type that describes one of several possible
|
// ChannelType is an enum-like type that describes one of several possible
|
||||||
// channel types. Each open channel is associated with a particular type as the
|
// channel types. Each open channel is associated with a particular type as the
|
||||||
// channel type may determine how higher level operations are conducted such as
|
// channel type may determine how higher level operations are conducted such as
|
||||||
@ -827,6 +858,39 @@ func fetchChanBucketRw(tx kvdb.RwTx, nodeKey *btcec.PublicKey, // nolint:interfa
|
|||||||
// fullSync syncs the contents of an OpenChannel while re-using an existing
|
// fullSync syncs the contents of an OpenChannel while re-using an existing
|
||||||
// database transaction.
|
// database transaction.
|
||||||
func (c *OpenChannel) fullSync(tx kvdb.RwTx) error {
|
func (c *OpenChannel) fullSync(tx kvdb.RwTx) error {
|
||||||
|
// Fetch the outpoint bucket and check if the outpoint already exists.
|
||||||
|
opBucket := tx.ReadWriteBucket(outpointBucket)
|
||||||
|
|
||||||
|
var chanPointBuf bytes.Buffer
|
||||||
|
if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now, check if the outpoint exists in our index.
|
||||||
|
if opBucket.Get(chanPointBuf.Bytes()) != nil {
|
||||||
|
return ErrChanAlreadyExists
|
||||||
|
}
|
||||||
|
|
||||||
|
status := uint8(outpointOpen)
|
||||||
|
|
||||||
|
// Write the status of this outpoint as the first entry in a tlv
|
||||||
|
// stream.
|
||||||
|
statusRecord := tlv.MakePrimitiveRecord(indexStatusType, &status)
|
||||||
|
opStream, err := tlv.NewStream(statusRecord)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := opStream.Encode(&b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the outpoint to our outpoint index with the tlv stream.
|
||||||
|
if err := opBucket.Put(chanPointBuf.Bytes(), b.Bytes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// First fetch the top level bucket which stores all data related to
|
// First fetch the top level bucket which stores all data related to
|
||||||
// current, active channels.
|
// current, active channels.
|
||||||
openChanBucket, err := tx.CreateTopLevelBucket(openChannelBucket)
|
openChanBucket, err := tx.CreateTopLevelBucket(openChannelBucket)
|
||||||
@ -851,10 +915,6 @@ func (c *OpenChannel) fullSync(tx kvdb.RwTx) error {
|
|||||||
|
|
||||||
// With the bucket for the node fetched, we can now go down another
|
// With the bucket for the node fetched, we can now go down another
|
||||||
// level, creating the bucket for this channel itself.
|
// level, creating the bucket for this channel itself.
|
||||||
var chanPointBuf bytes.Buffer
|
|
||||||
if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
chanBucket, err := chainBucket.CreateBucket(
|
chanBucket, err := chainBucket.CreateBucket(
|
||||||
chanPointBuf.Bytes(),
|
chanPointBuf.Bytes(),
|
||||||
)
|
)
|
||||||
@ -1258,7 +1318,7 @@ func (c *OpenChannel) clearChanStatus(status ChannelStatus) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// putChannel serializes, and stores the current state of the channel in its
|
// putOpenChannel serializes, and stores the current state of the channel in its
|
||||||
// entirety.
|
// entirety.
|
||||||
func putOpenChannel(chanBucket kvdb.RwBucket, channel *OpenChannel) error {
|
func putOpenChannel(chanBucket kvdb.RwBucket, channel *OpenChannel) error {
|
||||||
// First, we'll write out all the relatively static fields, that are
|
// First, we'll write out all the relatively static fields, that are
|
||||||
@ -2772,6 +2832,36 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fetch the outpoint bucket to see if the outpoint exists or
|
||||||
|
// not.
|
||||||
|
opBucket := tx.ReadWriteBucket(outpointBucket)
|
||||||
|
|
||||||
|
// Add the closed outpoint to our outpoint index. This should
|
||||||
|
// replace an open outpoint in the index.
|
||||||
|
if opBucket.Get(chanPointBuf.Bytes()) == nil {
|
||||||
|
return ErrMissingIndexEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
status := uint8(outpointClosed)
|
||||||
|
|
||||||
|
// Write the IndexStatus of this outpoint as the first entry in a tlv
|
||||||
|
// stream.
|
||||||
|
statusRecord := tlv.MakePrimitiveRecord(indexStatusType, &status)
|
||||||
|
opStream, err := tlv.NewStream(statusRecord)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := opStream.Encode(&b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally add the closed outpoint and tlv stream to the index.
|
||||||
|
if err := opBucket.Put(chanPointBuf.Bytes(), b.Bytes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Add channel state to the historical channel bucket.
|
// Add channel state to the historical channel bucket.
|
||||||
historicalBucket, err := tx.CreateTopLevelBucket(
|
historicalBucket, err := tx.CreateTopLevelBucket(
|
||||||
historicalChannelBucket,
|
historicalChannelBucket,
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/channeldb/migration12"
|
"github.com/lightningnetwork/lnd/channeldb/migration12"
|
||||||
"github.com/lightningnetwork/lnd/channeldb/migration13"
|
"github.com/lightningnetwork/lnd/channeldb/migration13"
|
||||||
"github.com/lightningnetwork/lnd/channeldb/migration16"
|
"github.com/lightningnetwork/lnd/channeldb/migration16"
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb/migration20"
|
||||||
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
|
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
|
||||||
"github.com/lightningnetwork/lnd/clock"
|
"github.com/lightningnetwork/lnd/clock"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
@ -170,6 +171,17 @@ var (
|
|||||||
number: 18,
|
number: 18,
|
||||||
migration: mig.CreateTLB(peersBucket),
|
migration: mig.CreateTLB(peersBucket),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
// Create a top level bucket which holds outpoint
|
||||||
|
// information.
|
||||||
|
number: 19,
|
||||||
|
migration: mig.CreateTLB(outpointBucket),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Migrate some data to the outpoint index.
|
||||||
|
number: 20,
|
||||||
|
migration: migration20.MigrateOutpointIndex,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Big endian is the preferred byte order, due to cursor scans over
|
// Big endian is the preferred byte order, due to cursor scans over
|
||||||
@ -309,6 +321,7 @@ var topLevelBuckets = [][]byte{
|
|||||||
graphMetaBucket,
|
graphMetaBucket,
|
||||||
metaBucket,
|
metaBucket,
|
||||||
closeSummaryBucket,
|
closeSummaryBucket,
|
||||||
|
outpointBucket,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wipe completely deletes all saved state within all used buckets within the
|
// Wipe completely deletes all saved state within all used buckets within the
|
||||||
|
36
channeldb/migration20/codec.go
Normal file
36
channeldb/migration20/codec.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
package migration20
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/wire"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
byteOrder = binary.BigEndian
|
||||||
|
)
|
||||||
|
|
||||||
|
// writeOutpoint writes an outpoint from the passed writer.
|
||||||
|
func writeOutpoint(w io.Writer, o *wire.OutPoint) error {
|
||||||
|
if _, err := w.Write(o.Hash[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := binary.Write(w, byteOrder, o.Index); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// readOutpoint reads an outpoint from the passed reader.
|
||||||
|
func readOutpoint(r io.Reader, o *wire.OutPoint) error {
|
||||||
|
if _, err := io.ReadFull(r, o.Hash[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := binary.Read(r, byteOrder, &o.Index); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
14
channeldb/migration20/log.go
Normal file
14
channeldb/migration20/log.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
package migration20
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/btcsuite/btclog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// log is a logger that is initialized as disabled. This means the package
|
||||||
|
// will not perform any logging by default until a logger is set.
|
||||||
|
var log = btclog.Disabled
|
||||||
|
|
||||||
|
// UseLogger uses a specified Logger to output package logging info.
|
||||||
|
func UseLogger(logger btclog.Logger) {
|
||||||
|
log = logger
|
||||||
|
}
|
196
channeldb/migration20/migration.go
Normal file
196
channeldb/migration20/migration.go
Normal file
@ -0,0 +1,196 @@
|
|||||||
|
package migration20
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/wire"
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||||
|
"github.com/lightningnetwork/lnd/tlv"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// openChanBucket stores all the open channel information.
|
||||||
|
openChanBucket = []byte("open-chan-bucket")
|
||||||
|
|
||||||
|
// closedChannelBucket stores all the closed channel information.
|
||||||
|
closedChannelBucket = []byte("closed-chan-bucket")
|
||||||
|
|
||||||
|
// outpointBucket is an index mapping outpoints to a tlv
|
||||||
|
// stream of channel data.
|
||||||
|
outpointBucket = []byte("outpoint-bucket")
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// A tlv type definition used to serialize an outpoint's indexStatus for
|
||||||
|
// use in the outpoint index.
|
||||||
|
indexStatusType tlv.Type = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
// indexStatus is an enum-like type that describes what state the
|
||||||
|
// outpoint is in. Currently only two possible values.
|
||||||
|
type indexStatus uint8
|
||||||
|
|
||||||
|
const (
|
||||||
|
// outpointOpen represents an outpoint that is open in the outpoint index.
|
||||||
|
outpointOpen indexStatus = 0
|
||||||
|
|
||||||
|
// outpointClosed represents an outpoint that is closed in the outpoint
|
||||||
|
// index.
|
||||||
|
outpointClosed indexStatus = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
// MigrateOutpointIndex populates the outpoint index with outpoints that
|
||||||
|
// the node already has. This takes every outpoint in the open channel
|
||||||
|
// bucket and every outpoint in the closed channel bucket and stores them
|
||||||
|
// in this index.
|
||||||
|
func MigrateOutpointIndex(tx kvdb.RwTx) error {
|
||||||
|
log.Info("Migrating to the outpoint index")
|
||||||
|
|
||||||
|
// First get the set of open outpoints.
|
||||||
|
openList, err := getOpenOutpoints(tx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then get the set of closed outpoints.
|
||||||
|
closedList, err := getClosedOutpoints(tx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the outpoint bucket which was created in migration 19.
|
||||||
|
bucket := tx.ReadWriteBucket(outpointBucket)
|
||||||
|
|
||||||
|
// Store the set of open outpoints in the outpoint bucket.
|
||||||
|
if err := putOutpoints(bucket, openList, false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the set of closed outpoints in the outpoint bucket.
|
||||||
|
return putOutpoints(bucket, closedList, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getOpenOutpoints traverses through the openChanBucket and returns the
|
||||||
|
// list of these channels' outpoints.
|
||||||
|
func getOpenOutpoints(tx kvdb.RwTx) ([]*wire.OutPoint, error) {
|
||||||
|
var ops []*wire.OutPoint
|
||||||
|
|
||||||
|
openBucket := tx.ReadBucket(openChanBucket)
|
||||||
|
if openBucket == nil {
|
||||||
|
return ops, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate through every node and chain bucket to get every open
|
||||||
|
// outpoint.
|
||||||
|
//
|
||||||
|
// The bucket tree:
|
||||||
|
// openChanBucket -> nodePub -> chainHash -> chanPoint
|
||||||
|
err := openBucket.ForEach(func(k, v []byte) error {
|
||||||
|
// Ensure that the key is the same size as a pubkey and the
|
||||||
|
// value is nil.
|
||||||
|
if len(k) != 33 || v != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeBucket := openBucket.NestedReadBucket(k)
|
||||||
|
if nodeBucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nodeBucket.ForEach(func(k, v []byte) error {
|
||||||
|
// If there's a value it's not a bucket.
|
||||||
|
if v != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
chainBucket := nodeBucket.NestedReadBucket(k)
|
||||||
|
if chainBucket == nil {
|
||||||
|
return fmt.Errorf("unable to read "+
|
||||||
|
"bucket for chain: %x", k)
|
||||||
|
}
|
||||||
|
|
||||||
|
return chainBucket.ForEach(func(k, v []byte) error {
|
||||||
|
// If there's a value it's not a bucket.
|
||||||
|
if v != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var op wire.OutPoint
|
||||||
|
r := bytes.NewReader(k)
|
||||||
|
if err := readOutpoint(r, &op); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ops = append(ops, &op)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return ops, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getClosedOutpoints traverses through the closedChanBucket and returns
|
||||||
|
// a list of closed outpoints.
|
||||||
|
func getClosedOutpoints(tx kvdb.RwTx) ([]*wire.OutPoint, error) {
|
||||||
|
var ops []*wire.OutPoint
|
||||||
|
closedBucket := tx.ReadBucket(closedChannelBucket)
|
||||||
|
if closedBucket == nil {
|
||||||
|
return ops, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate through every key-value pair to gather all outpoints.
|
||||||
|
err := closedBucket.ForEach(func(k, v []byte) error {
|
||||||
|
var op wire.OutPoint
|
||||||
|
r := bytes.NewReader(k)
|
||||||
|
if err := readOutpoint(r, &op); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ops = append(ops, &op)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ops, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// putOutpoints puts the set of outpoints into the outpoint bucket.
|
||||||
|
func putOutpoints(bucket kvdb.RwBucket, ops []*wire.OutPoint, isClosed bool) error {
|
||||||
|
status := uint8(outpointOpen)
|
||||||
|
if isClosed {
|
||||||
|
status = uint8(outpointClosed)
|
||||||
|
}
|
||||||
|
|
||||||
|
record := tlv.MakePrimitiveRecord(indexStatusType, &status)
|
||||||
|
stream, err := tlv.NewStream(record)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := stream.Encode(&b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the set of outpoints with the encoded tlv stream.
|
||||||
|
for _, op := range ops {
|
||||||
|
var opBuf bytes.Buffer
|
||||||
|
if err := writeOutpoint(&opBuf, op); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Put(opBuf.Bytes(), b.Bytes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user