channeldb: add gossiper message store key migration
In this commit, we introduce a migration for the message store sub-bucket that will migrate all keys within it to a new key format. This new key format is composed of the peer's public key, followed by the short channel ID, followed by the message type. This migration is needed in order to provide backwards-compatibility with messages that were previously stored before the introduction of the new key format.
This commit is contained in:
parent
847b064461
commit
9febc9cc04
@ -90,6 +90,13 @@ var (
|
||||
number: 7,
|
||||
migration: migrateOptionalChannelCloseSummaryFields,
|
||||
},
|
||||
{
|
||||
// The DB version that changes the gossiper's message
|
||||
// store keys to account for the message's type and
|
||||
// ShortChannelID.
|
||||
number: 8,
|
||||
migration: migrateGossipMessageStoreKeys,
|
||||
},
|
||||
}
|
||||
|
||||
// Big endian is the preferred byte order, due to cursor scans over
|
||||
|
@ -50,7 +50,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
|
||||
if err == nil && shouldFail {
|
||||
t.Fatal("error wasn't received on migration stage")
|
||||
} else if err != nil && !shouldFail {
|
||||
t.Fatal("error was received on migration stage")
|
||||
t.Fatalf("error was received on migration stage: %v", err)
|
||||
}
|
||||
|
||||
// afterMigration usually used for checking the database state and
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
// migrateNodeAndEdgeUpdateIndex is a migration function that will update the
|
||||
@ -610,3 +611,75 @@ func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var messageStoreBucket = []byte("message-store")
|
||||
|
||||
// migrateGossipMessageStoreKeys migrates the key format for gossip messages
|
||||
// found in the message store to a new one that takes into consideration the of
|
||||
// the message being stored.
|
||||
func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error {
|
||||
// We'll start by retrieving the bucket in which these messages are
|
||||
// stored within. If there isn't one, there's nothing left for us to do
|
||||
// so we can avoid the migration.
|
||||
messageStore := tx.Bucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info("Migrating to the gossip message store new key format")
|
||||
|
||||
// Otherwise we'll proceed with the migration. We'll start by coalescing
|
||||
// all the current messages within the store, which are indexed by the
|
||||
// public key of the peer which they should be sent to, followed by the
|
||||
// short channel ID of the channel for which the message belongs to. We
|
||||
// should only expect to find channel announcement signatures as that
|
||||
// was the only support message type previously.
|
||||
msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures)
|
||||
err := messageStore.ForEach(func(k, v []byte) error {
|
||||
var msgKey [33 + 8]byte
|
||||
copy(msgKey[:], k)
|
||||
|
||||
msg := &lnwire.AnnounceSignatures{}
|
||||
if err := msg.Decode(bytes.NewReader(v), 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgs[msgKey] = msg
|
||||
|
||||
return nil
|
||||
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Then, we'll go over all of our messages, remove their previous entry,
|
||||
// and add another with the new key format. Once we've done this for
|
||||
// every message, we can consider the migration complete.
|
||||
for oldMsgKey, msg := range msgs {
|
||||
if err := messageStore.Delete(oldMsgKey[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Construct the new key for which we'll find this message with
|
||||
// in the store. It'll be the same as the old, but we'll also
|
||||
// include the message type.
|
||||
var msgType [2]byte
|
||||
binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType()))
|
||||
newMsgKey := append(oldMsgKey[:], msgType[:]...)
|
||||
|
||||
// Serialize the message with its wire encoding.
|
||||
var b bytes.Buffer
|
||||
if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Migration to the gossip message store new key format complete!")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
// TestPaymentStatusesMigration checks that already completed payments will have
|
||||
@ -468,3 +469,98 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) {
|
||||
false)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMigrateGossipMessageStoreKeys ensures that the migration to the new
|
||||
// gossip message store key format is successful/unsuccessful under various
|
||||
// scenarios.
|
||||
func TestMigrateGossipMessageStoreKeys(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Construct the message which we'll use to test the migration, along
|
||||
// with its old and new key formats.
|
||||
shortChanID := lnwire.ShortChannelID{BlockHeight: 10}
|
||||
msg := &lnwire.AnnounceSignatures{ShortChannelID: shortChanID}
|
||||
|
||||
var oldMsgKey [33 + 8]byte
|
||||
copy(oldMsgKey[:33], pubKey.SerializeCompressed())
|
||||
binary.BigEndian.PutUint64(oldMsgKey[33:41], shortChanID.ToUint64())
|
||||
|
||||
var newMsgKey [33 + 8 + 2]byte
|
||||
copy(newMsgKey[:41], oldMsgKey[:])
|
||||
binary.BigEndian.PutUint16(newMsgKey[41:43], uint16(msg.MsgType()))
|
||||
|
||||
// Before the migration, we'll create the bucket where the messages
|
||||
// should live and insert them.
|
||||
beforeMigration := func(db *DB) {
|
||||
var b bytes.Buffer
|
||||
if err := msg.Encode(&b, 0); err != nil {
|
||||
t.Fatalf("unable to serialize message: %v", err)
|
||||
}
|
||||
|
||||
err := db.Update(func(tx *bbolt.Tx) error {
|
||||
messageStore, err := tx.CreateBucketIfNotExists(
|
||||
messageStoreBucket,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return messageStore.Put(oldMsgKey[:], b.Bytes())
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// After the migration, we'll make sure that:
|
||||
// 1. We cannot find the message under its old key.
|
||||
// 2. We can find the message under its new key.
|
||||
// 3. The message matches the original.
|
||||
afterMigration := func(db *DB) {
|
||||
meta, err := db.FetchMeta(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to fetch db version: %v", err)
|
||||
}
|
||||
if meta.DbVersionNumber != 1 {
|
||||
t.Fatalf("migration should have succeeded but didn't")
|
||||
}
|
||||
|
||||
var rawMsg []byte
|
||||
err = db.View(func(tx *bbolt.Tx) error {
|
||||
messageStore := tx.Bucket(messageStoreBucket)
|
||||
if messageStore == nil {
|
||||
return errors.New("message store bucket not " +
|
||||
"found")
|
||||
}
|
||||
rawMsg = messageStore.Get(oldMsgKey[:])
|
||||
if rawMsg != nil {
|
||||
t.Fatal("expected to not find message under " +
|
||||
"old key, but did")
|
||||
}
|
||||
rawMsg = messageStore.Get(newMsgKey[:])
|
||||
if rawMsg == nil {
|
||||
return fmt.Errorf("expected to find message " +
|
||||
"under new key, but didn't")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
gotMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to deserialize raw message: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(msg, gotMsg) {
|
||||
t.Fatalf("expected message: %v\ngot message: %v",
|
||||
spew.Sdump(msg), spew.Sdump(gotMsg))
|
||||
}
|
||||
}
|
||||
|
||||
applyMigration(
|
||||
t, beforeMigration, afterMigration,
|
||||
migrateGossipMessageStoreKeys, false,
|
||||
)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user