channeldb: add database migration for new node+edge update indexes
In this commit, we add a new database migration required to update old database to the version of the database that tracks the update index for the nodes and edge policies. The migration is straight forward, we simply need to populate the new indexes for the all the nodes, and then all the edges.
This commit is contained in:
parent
baed4d1f47
commit
12e73f55e9
@ -40,6 +40,13 @@ var (
|
|||||||
number: 0,
|
number: 0,
|
||||||
migration: nil,
|
migration: nil,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
// The version of the database where two new indexes
|
||||||
|
// for the update time of node and channel updates were
|
||||||
|
// added.
|
||||||
|
number: 1,
|
||||||
|
migration: migrateNodeAndEdgeUpdateIndex,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Big endian is the preferred byte order, due to cursor scans over
|
// Big endian is the preferred byte order, due to cursor scans over
|
||||||
@ -523,8 +530,9 @@ func (d *DB) FetchClosedChannel(chanID *wire.OutPoint) (*ChannelCloseSummary, er
|
|||||||
|
|
||||||
// MarkChanFullyClosed marks a channel as fully closed within the database. A
|
// MarkChanFullyClosed marks a channel as fully closed within the database. A
|
||||||
// channel should be marked as fully closed if the channel was initially
|
// channel should be marked as fully closed if the channel was initially
|
||||||
// cooperatively closed and it's reached a single confirmation, or after all the
|
// cooperatively closed and it's reached a single confirmation, or after all
|
||||||
// pending funds in a channel that has been forcibly closed have been swept.
|
// the pending funds in a channel that has been forcibly closed have been
|
||||||
|
// swept.
|
||||||
func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
|
func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
|
||||||
return d.Update(func(tx *bolt.Tx) error {
|
return d.Update(func(tx *bolt.Tx) error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
@ -594,8 +602,9 @@ func (d *DB) syncVersions(versions []version) error {
|
|||||||
// Otherwise, we fetch the migrations which need to applied, and
|
// Otherwise, we fetch the migrations which need to applied, and
|
||||||
// execute them serially within a single database transaction to ensure
|
// execute them serially within a single database transaction to ensure
|
||||||
// the migration is atomic.
|
// the migration is atomic.
|
||||||
migrations, migrationVersions := getMigrationsToApply(versions,
|
migrations, migrationVersions := getMigrationsToApply(
|
||||||
meta.DbVersionNumber)
|
versions, meta.DbVersionNumber,
|
||||||
|
)
|
||||||
return d.Update(func(tx *bolt.Tx) error {
|
return d.Update(func(tx *bolt.Tx) error {
|
||||||
for i, migration := range migrations {
|
for i, migration := range migrations {
|
||||||
if migration == nil {
|
if migration == nil {
|
||||||
|
@ -1 +1,114 @@
|
|||||||
package channeldb
|
package channeldb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/coreos/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// migrateNodeAndEdgeUpdateIndex is a migration function that will update the
|
||||||
|
// database from version 0 to version 1. In version 1, we add two new indexes
|
||||||
|
// (one for nodes and one for edges) to keep track of the last time a node or
|
||||||
|
// edge was updated on the network. These new indexes allow us to implement the
|
||||||
|
// new graph sync protocol added.
|
||||||
|
func migrateNodeAndEdgeUpdateIndex(tx *bolt.Tx) error {
|
||||||
|
// First, we'll populating the node portion of the new index. Before we
|
||||||
|
// can add new values to the index, we'll first create the new bucket
|
||||||
|
// where these items will be housed.
|
||||||
|
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create node bucket: %v", err)
|
||||||
|
}
|
||||||
|
nodeUpdateIndex, err := nodes.CreateBucketIfNotExists(
|
||||||
|
nodeUpdateIndexBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create node update index: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Populating new node update index bucket")
|
||||||
|
|
||||||
|
// Now that we know the bucket has been created, we'll iterate over the
|
||||||
|
// entire node bucket so we can add the (updateTime || nodePub) key
|
||||||
|
// into the node update index.
|
||||||
|
err = nodes.ForEach(func(nodePub, nodeInfo []byte) error {
|
||||||
|
if len(nodePub) != 33 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Tracef("Adding %x to node update index", nodePub)
|
||||||
|
|
||||||
|
// The first 8 bytes of a node's serialize data is the update
|
||||||
|
// time, so we can extract that without decoding the entire
|
||||||
|
// structure.
|
||||||
|
updateTime := nodeInfo[:8]
|
||||||
|
|
||||||
|
// Now that we have the update time, we can construct the key
|
||||||
|
// to insert into the index.
|
||||||
|
var indexKey [8 + 33]byte
|
||||||
|
copy(indexKey[:8], updateTime)
|
||||||
|
copy(indexKey[8:], nodePub)
|
||||||
|
|
||||||
|
return nodeUpdateIndex.Put(indexKey[:], nil)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to update node indexes: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Populating new edge update index bucket")
|
||||||
|
|
||||||
|
// With the set of nodes updated, we'll now update all edges to have a
|
||||||
|
// corresponding entry in the edge update index.
|
||||||
|
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create edge bucket: %v", err)
|
||||||
|
}
|
||||||
|
edgeUpdateIndex, err := edges.CreateBucketIfNotExists(
|
||||||
|
edgeUpdateIndexBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create edge update index: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We'll now run through each edge policy in the database, and update
|
||||||
|
// the index to ensure each edge has the proper record.
|
||||||
|
err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
|
||||||
|
if len(edgeKey) != 41 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we know this is the proper record, we'll grab the
|
||||||
|
// channel ID (last 8 bytes of the key), and then decode the
|
||||||
|
// edge policy so we can access the update time.
|
||||||
|
chanID := edgeKey[33:]
|
||||||
|
edgePolicyReader := bytes.NewReader(edgePolicyBytes)
|
||||||
|
|
||||||
|
edgePolicy, err := deserializeChanEdgePolicy(
|
||||||
|
edgePolicyReader, nodes,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Tracef("Adding chan_id=%v to edge update index",
|
||||||
|
edgePolicy.ChannelID)
|
||||||
|
|
||||||
|
// We'll now construct the index key using the channel ID, and
|
||||||
|
// the last time it was updated: (updateTime || chanID).
|
||||||
|
var indexKey [8 + 8]byte
|
||||||
|
byteOrder.PutUint64(
|
||||||
|
indexKey[:], uint64(edgePolicy.LastUpdate.Unix()),
|
||||||
|
)
|
||||||
|
copy(indexKey[8:], chanID)
|
||||||
|
|
||||||
|
return edgeUpdateIndex.Put(indexKey[:], nil)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to update edge indexes: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migration to node and edge update indexes complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user