diff --git a/channeldb/channel.go b/channeldb/channel.go index b2f4da2b..ea5d91b7 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -21,11 +21,11 @@ var ( // at the base level of this bucket several prefixed keys are stored which // house channel meta-data such as total satoshis sent, number of updates // etc. These fields are stored at this top level rather than within a - // node's channel bucket in orer to facilitate sequential prefix scans + // node's channel bucket in order to facilitate sequential prefix scans // to gather stats such as total satoshis received. openChannelBucket = []byte("ocb") - // chanIDBucket is a thrid-level bucket stored within a node's ID bucket + // chanIDBucket is a third-level bucket stored within a node's ID bucket // in the open channel bucket. The resolution path looks something like: // ocb -> nodeID -> cib. This bucket contains a series of keys with no // values, these keys are the channel ID's of all the active channels diff --git a/channeldb/db.go b/channeldb/db.go index cd97be22..fa93bc19 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -15,10 +15,32 @@ import ( ) const ( - dbName = "channel.db" + dbName = "channel.db" + dbFilePermission = 0600 ) +// Migration is a function which takes a prior outdated version of the database +// instances and mutates the key/bucket structure to arrive at a more up-to-date +// version of the database. +type migration func(tx *bolt.Tx) error + +type version struct { + number uint32 + migration migration +} + var ( + // DBVersions is storing all versions of database. If current version of + // database don't match with latest version this list will be used for + // retrieving all migration function that are need to apply to the + // current db. + DBVersions = []version{ + { + number: 1, + migration: nil, // The base DB version requires no migration + }, + } + // Big endian is the preferred byte order, due to cursor scans over integer // keys iterating in order. byteOrder = binary.BigEndian @@ -32,9 +54,9 @@ var bufPool = &sync.Pool{ // information related to nodes, routing data, open/closed channels, fee // schedules, and reputation data. type DB struct { - store *bolt.DB - + store *bolt.DB netParams *chaincfg.Params + dbPath string } // Open opens an existing channeldb created under the passed namespace with @@ -49,12 +71,16 @@ func Open(dbPath string, netParams *chaincfg.Params) (*DB, error) { } } - bdb, err := bolt.Open(path, 0600, nil) + bdb, err := bolt.Open(path, dbFilePermission, nil) if err != nil { return nil, err } - return &DB{store: bdb, netParams: netParams}, nil + return &DB{ + store: bdb, + netParams: netParams, + dbPath: dbPath, + }, nil } // Wipe completely deletes all saved state within all used buckets within the @@ -103,7 +129,7 @@ func createChannelDB(dbPath string) error { } path := filepath.Join(dbPath, dbName) - bdb, err := bolt.Open(path, 0600, nil) + bdb, err := bolt.Open(path, dbFilePermission, nil) if err != nil { return err } @@ -125,6 +151,10 @@ func createChannelDB(dbPath string) error { return err } + if _, err := tx.CreateBucket(metaBucket); err != nil { + return err + } + return nil }) if err != nil { @@ -268,3 +298,59 @@ func (d *DB) FetchAllChannels() ([]*OpenChannel, error) { return channels, err } + +// SyncVersions function is used for safe db version synchronization. It applies +// migration functions to the current database and recovers the previous +// state of db if at least one error/panic appeared during migration. +func (d *DB) SyncVersions(versions []version) error { + meta, err := d.FetchMeta(nil) + if err != nil { + return err + } + + latestVersion := getLatestDBVersion(versions) + + if meta.dbVersionNumber < latestVersion { + migrations := getMigrationsToApply(versions, meta.dbVersionNumber) + + return d.store.Update(func(tx *bolt.Tx) error { + for _, migration := range migrations { + if migration == nil { + continue + } + + if err := migration(tx); err != nil { + return err + } + } + + meta.dbVersionNumber = latestVersion + if err := d.PutMeta(meta, tx); err != nil { + return err + } + + return nil + }) + + } + + return nil +} + +func getLatestDBVersion(versions []version) uint32 { + return versions[len(versions)-1].number +} + +// getMigrationsToApply retrieves the migration function that should be +// applied to the database. +func getMigrationsToApply(versions []version, version uint32) []migration { + migrations := make([]migration, 0, len(versions)) + + for _, v := range versions { + if v.number > version { + migrations = append(migrations, v.migration) + } + } + + return migrations +} diff --git a/channeldb/error.go b/channeldb/error.go index a913f454..96393e49 100644 --- a/channeldb/error.go +++ b/channeldb/error.go @@ -14,4 +14,5 @@ var ( ErrDuplicateInvoice = fmt.Errorf("invoice with payment hash already exists") ErrNodeNotFound = fmt.Errorf("link node with target identity not found") + ErrMetaNotFound = fmt.Errorf("unable to locate meta information") ) diff --git a/channeldb/meta.go b/channeldb/meta.go new file mode 100644 index 00000000..369e21b4 --- /dev/null +++ b/channeldb/meta.go @@ -0,0 +1,90 @@ +package channeldb + +import ( + "github.com/boltdb/bolt" +) + +var ( + // metaBucket stores all the meta information concerning the state of + // the database. + metaBucket = []byte("metadata") + + // dbVersionKey is a boltdb key and it's used for storing/retrieveing + // current database version. + dbVersionKey = []byte("dbp") +) + +// Meta structure holds the database meta information. +type Meta struct { + dbVersionNumber uint32 +} + +// FetchMeta fetches the meta data from boltdb and returns filled meta +// structure. If transaction object is specified then it will be used rather +// than initiation creation of new one. +func (d *DB) FetchMeta(tx *bolt.Tx) (*Meta, error) { + meta := &Meta{} + fetchMeta := func(tx *bolt.Tx) error { + if metaBucket := tx.Bucket(metaBucket); metaBucket != nil { + fetchDbVersion(metaBucket, meta) + return nil + } else { + return ErrMetaNotFound + } + } + + var err error + + if tx == nil { + err = d.store.View(fetchMeta) + } else { + err = fetchMeta(tx) + } + + if err != nil { + return nil, err + } + + return meta, nil +} + +// PutMeta gets as input meta structure and put it into boltdb. If transaction +// object is specified then it will be used rather than initiation creation of +// new one. +func (d *DB) PutMeta(meta *Meta, tx *bolt.Tx) error { + putMeta := func(tx *bolt.Tx) error { + metaBucket := tx.Bucket(metaBucket) + if metaBucket == nil { + return ErrMetaNotFound + } + + if err := putDbVersion(metaBucket, meta); err != nil { + return err + } + + return nil + } + + if tx == nil { + return d.store.Update(putMeta) + } else { + return putMeta(tx) + } +} + +func putDbVersion(metaBucket *bolt.Bucket, meta *Meta) error { + scratch := make([]byte, 4) + byteOrder.PutUint32(scratch, meta.dbVersionNumber) + if err := metaBucket.Put(dbVersionKey, scratch); err != nil { + return err + } + return nil +} + +func fetchDbVersion(metaBucket *bolt.Bucket, meta *Meta) { + if data := metaBucket.Get(dbVersionKey); data != nil { + meta.dbVersionNumber = byteOrder.Uint32(data) + } else { + meta.dbVersionNumber = getLatestDBVersion(DBVersions) + } +} diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go new file mode 100644 index 00000000..ccf0bae5 --- /dev/null +++ b/channeldb/meta_test.go @@ -0,0 +1,367 @@ +package channeldb + +import ( + "bytes" + "github.com/boltdb/bolt" + "github.com/go-errors/errors" + "testing" +) + +// TestVersionFetchPut checks the propernces of fetch/put methods +// and also initialization of meta data in case if don't have any in +// database. +func TestVersionFetchPut(t *testing.T) { + db, cleanUp, err := makeTestDB() + if err != nil { + t.Fatal(err) + } + defer cleanUp() + + meta, err := db.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.dbVersionNumber != getLatestDBVersion(DBVersions) { + t.Fatal("initialization of meta information wasn't performed") + } + + var newVersion uint32 = getLatestDBVersion(DBVersions) + 1 + meta.dbVersionNumber = newVersion + + if err := db.PutMeta(meta, nil); err != nil { + t.Fatalf("update of meta failed %v", err) + } + + meta, err = db.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.dbVersionNumber != newVersion { + t.Fatal("update of meta information wasn't performed") + } +} + +// TestOrderOfMigrations checks that migrations are applied in proper order. +func TestOrderOfMigrations(t *testing.T) { + appliedMigration := -1 + versions := []version{ + {0, nil}, + {1, nil}, + {2, func(tx *bolt.Tx) error { + appliedMigration = 2 + return nil + }}, + {3, func(tx *bolt.Tx) error { + appliedMigration = 3 + return nil + }}, + } + + // Retrieve the migration that should be applied to db, as far as + // current version is 1, we skip zero and first versions. + migrations := getMigrationsToApply(versions, 1) + + if len(migrations) != 2 { + t.Fatal("incorrect number of migrations to apply") + } + + // Apply first migration. + migrations[0](nil) + + // Check that first migration corresponds to the second version. + if appliedMigration != 2 { + t.Fatal("incorrect order of applying migrations") + } + + // Apply second migration. + migrations[1](nil) + + // Check that second migration corresponds to the third version. + if appliedMigration != 3 { + t.Fatal("incorrect order of applying migrations") + } +} + +// TestGlobalVersionList checks that there is no mistake in global version list +// in terms of version ordering. +func TestGlobalVersionList(t *testing.T) { + if DBVersions == nil { + t.Fatal("can't find versions list") + } + + if len(DBVersions) == 0 { + t.Fatal("db versions list is empty") + } + + prev := DBVersions[0].number + for i := 1; i < len(DBVersions); i++ { + version := DBVersions[i].number + + if version == prev { + t.Fatal("duplicates db versions") + } + if version < prev { + t.Fatal("order of db versions is wrong") + } + + prev = version + } +} + +// applyMigration is a helper test function that encapsulates the general steps +// which are needed to properly check the result of applying migration function. +func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), + migrationFunc migration, shouldFail bool) { + + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatal(err) + } + defer cleanUp() + + // beforeMigration usually used for populating the database + // with test data. + beforeMigration(cdb) + + // Create test meta info with zero database version and put it on disk. + // Than creating the version list pretending that new version was added. + meta := &Meta{dbVersionNumber: 0} + cdb.PutMeta(meta, nil) + + versions := []version{ + { + number: 0, + migration: nil, + }, + { + number: 1, + migration: migrationFunc, + }, + } + + defer func() { + if r := recover(); r != nil { + err = errors.New(r) + } + + if err == nil && shouldFail { + t.Fatal("error wasn't received on migration stage") + } else if err != nil && !shouldFail { + t.Fatal("error was received on migration stage") + } + + // afterMigration usually used for checking the database state and + // throwing the error if something went wrong. + afterMigration(cdb) + }() + + // Sync with the latest version - applying migration function. + err = cdb.SyncVersions(versions) +} + +func TestMigrationWithPanic(t *testing.T) { + bucketPrefix := []byte("somebucket") + keyPrefix := []byte("someprefix") + beforeMigration := []byte("beforemigration") + afterMigration := []byte("aftermigration") + + beforeMigrationFunc := func(d *DB) { + // Insert data in database and in order then make sure that the + // key isn't changes in case of panic or fail. + d.store.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + bucket.Put(keyPrefix, beforeMigration) + return nil + }) + } + + // Create migration function which changes the initialy created data and + // throw the panic, in this case we pretending that something goes. + migrationWithPanic := func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + bucket.Put(keyPrefix, afterMigration) + panic("panic!") + } + + // Check that version of database and data wasn't changed. + afterMigrationFunc := func(d *DB) { + meta, err := d.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.dbVersionNumber != 0 { + t.Fatal("migration paniced but version is changed") + } + + err = d.store.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + value := bucket.Get(keyPrefix) + if !bytes.Equal(value, beforeMigration) { + return errors.New("migration failed but data is " + + "changed") + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + migrationWithPanic, + true) +} + +func TestMigrationWithFatal(t *testing.T) { + bucketPrefix := []byte("somebucket") + keyPrefix := []byte("someprefix") + beforeMigration := []byte("beforemigration") + afterMigration := []byte("aftermigration") + + beforeMigrationFunc := func(d *DB) { + d.store.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + bucket.Put(keyPrefix, beforeMigration) + return nil + }) + } + + // Create migration function which changes the initialy created data and + // return the error, in this case we pretending that somthing goes + // wrong. + migrationWithFatal := func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + bucket.Put(keyPrefix, afterMigration) + return errors.New("some error!") + } + + // Check that version of database and initial data wasn't changed. + afterMigrationFunc := func(d *DB) { + meta, err := d.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.dbVersionNumber != 0 { + t.Fatal("migration failed but version is changed") + } + + err = d.store.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + value := bucket.Get(keyPrefix) + if !bytes.Equal(value, beforeMigration) { + return errors.New("migration failed but data is " + + "changed") + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + migrationWithFatal, + true) +} + +func TestMigrationWithoutErrors(t *testing.T) { + bucketPrefix := []byte("somebucket") + keyPrefix := []byte("someprefix") + beforeMigration := []byte("beforemigration") + afterMigration := []byte("aftermigration") + + // Populate database with initial data. + beforeMigrationFunc := func(d *DB) { + d.store.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + bucket.Put(keyPrefix, beforeMigration) + return nil + }) + } + + // Create migration function which changes the initialy created data. + migrationWithoutErrors := func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + bucket.Put(keyPrefix, afterMigration) + return nil + } + + // Check that version of database and data was properly changed. + afterMigrationFunc := func(d *DB) { + meta, err := d.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.dbVersionNumber != 1 { + t.Fatal("version number isn't changed after " + + "succesfully aplied migration") + } + + err = d.store.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketPrefix) + if err != nil { + return err + } + + value := bucket.Get(keyPrefix) + if !bytes.Equal(value, afterMigration) { + return errors.New("migration wasn't applyied " + + "properly") + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + migrationWithoutErrors, + false) +}