diff --git a/watchtower/wtdb/tower_db.go b/watchtower/wtdb/tower_db.go new file mode 100644 index 00000000..0bcd271c --- /dev/null +++ b/watchtower/wtdb/tower_db.go @@ -0,0 +1,733 @@ +package wtdb + +import ( + "bytes" + "encoding/binary" + "errors" + "os" + "path/filepath" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" +) + +const ( + // dbName is the filename of tower database. + dbName = "watchtower.db" + + // dbFilePermission requests read+write access to the db file. + dbFilePermission = 0600 +) + +var ( + // sessionsBkt is a bucket containing all negotiated client sessions. + // session id -> session + sessionsBkt = []byte("sessions-bucket") + + // updatesBkt is a bucket containing all state updates sent by clients. + // The updates are further bucketed by session id to prevent clients + // from overwrite each other. + // hint => session id -> update + updatesBkt = []byte("updates-bucket") + + // updateIndexBkt is a bucket that indexes all state updates by their + // overarching session id. This allows for efficient lookup of updates + // by their session id, which is currently used to aide deletion + // performance. + // session id => hint1 -> []byte{} + // => hint2 -> []byte{} + updateIndexBkt = []byte("update-index-bucket") + + // lookoutTipBkt is a bucket containing the last block epoch processed + // by the lookout subsystem. It has one key, lookoutTipKey. + // lookoutTipKey -> block epoch + lookoutTipBkt = []byte("lookout-tip-bucket") + + // lookoutTipKey is a static key used to retrieve lookout tip's block + // epoch from the lookoutTipBkt. + lookoutTipKey = []byte("lookout-tip") + + // metadataBkt stores all the meta information concerning the state of + // the database. + metadataBkt = []byte("metadata-bucket") + + // dbVersionKey is a static key used to retrieve the database version + // number from the metadataBkt. + dbVersionKey = []byte("version") + + // ErrUninitializedDB signals that top-level buckets for the database + // have not been initialized. + ErrUninitializedDB = errors.New("tower db not initialized") + + // ErrNoDBVersion signals that the database contains no version info. + ErrNoDBVersion = errors.New("tower db has no version") + + // ErrNoSessionHintIndex signals that an active session does not have an + // initialized index for tracking its own state updates. + ErrNoSessionHintIndex = errors.New("session hint index missing") + + byteOrder = binary.BigEndian +) + +// TowerDB is single database providing a persistent storage engine for the +// wtserver and lookout subsystems. +type TowerDB struct { + db *bbolt.DB + dbPath string +} + +// OpenTowerDB opens the tower database given the path to the database's +// directory. If no such database exists, this method will initialize a fresh +// one using the latest version number and bucket structure. If a database +// exists but has a lower version number than the current version, any necessary +// migrations will be applied before returning. Any attempt to open a database +// with a version number higher that the latest version will fail to prevent +// accidental reversion. +func OpenTowerDB(dbPath string) (*TowerDB, error) { + path := filepath.Join(dbPath, dbName) + + // If the database file doesn't exist, this indicates we much initialize + // a fresh database with the latest version. + firstInit := !fileExists(path) + if firstInit { + // Ensure all parent directories are initialized. + err := os.MkdirAll(dbPath, 0700) + if err != nil { + return nil, err + } + } + + bdb, err := bbolt.Open(path, dbFilePermission, nil) + if err != nil { + return nil, err + } + + // If the file existed previously, we'll now check to see that the + // metadata bucket is properly initialized. It could be the case that + // the database was created, but we failed to actually populate any + // metadata. If the metadata bucket does not actually exist, we'll + // set firstInit to true so that we can treat is initialize the bucket. + if !firstInit { + var metadataExists bool + err = bdb.View(func(tx *bbolt.Tx) error { + metadataExists = tx.Bucket(metadataBkt) != nil + return nil + }) + if err != nil { + return nil, err + } + + if !metadataExists { + firstInit = true + } + } + + towerDB := &TowerDB{ + db: bdb, + dbPath: dbPath, + } + + if firstInit { + // If the database has not yet been created, we'll initialize + // the database version with the latest known version. + err = towerDB.db.Update(func(tx *bbolt.Tx) error { + return initDBVersion(tx, getLatestDBVersion(dbVersions)) + }) + if err != nil { + bdb.Close() + return nil, err + } + } else { + // Otherwise, ensure that any migrations are applied to ensure + // the data is in the format expected by the latest version. + err = towerDB.syncVersions(dbVersions) + if err != nil { + bdb.Close() + return nil, err + } + } + + // Now that the database version fully consistent with our latest known + // version, ensure that all top-level buckets known to this version are + // initialized. This allows us to assume their presence throughout all + // operations. If an known top-level bucket is expected to exist but is + // missing, this will trigger a ErrUninitializedDB error. + err = towerDB.db.Update(initTowerDBBuckets) + if err != nil { + bdb.Close() + return nil, err + } + + return towerDB, nil +} + +// fileExists returns true if the file exists, and false otherwise. +func fileExists(path string) bool { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return false + } + } + + return true +} + +// initTowerDBBuckets creates all top-level buckets required to handle database +// operations required by the latest version. +func initTowerDBBuckets(tx *bbolt.Tx) error { + buckets := [][]byte{ + sessionsBkt, + updateIndexBkt, + updatesBkt, + lookoutTipBkt, + } + + for _, bucket := range buckets { + _, err := tx.CreateBucketIfNotExists(bucket) + if err != nil { + return err + } + } + + return nil +} + +// syncVersions ensures the database version is consistent with the highest +// known database version, applying any migrations that have not been made. If +// the highest known version number is lower than the database's version, this +// method will fail to prevent accidental reversions. +func (t *TowerDB) syncVersions(versions []version) error { + curVersion, err := t.Version() + if err != nil { + return err + } + + latestVersion := getLatestDBVersion(versions) + switch { + + // Current version is higher than any known version, fail to prevent + // reversion. + case curVersion > latestVersion: + return channeldb.ErrDBReversion + + // Current version matches highest known version, nothing to do. + case curVersion == latestVersion: + return nil + } + + // Otherwise, apply any migrations in order to bring the database + // version up to the highest known version. + updates := getMigrations(versions, curVersion) + return t.db.Update(func(tx *bbolt.Tx) error { + for _, update := range updates { + if update.migration == nil { + continue + } + + log.Infof("Applying migration #%d", update.number) + + err := update.migration(tx) + if err != nil { + log.Errorf("Unable to apply migration #%d: %v", + err) + return err + } + } + + return putDBVersion(tx, latestVersion) + }) +} + +// Version returns the database's current version number. +func (t *TowerDB) Version() (uint32, error) { + var version uint32 + err := t.db.View(func(tx *bbolt.Tx) error { + var err error + version, err = getDBVersion(tx) + return err + }) + if err != nil { + return 0, err + } + + return version, nil +} + +// Close closes the underlying database. +func (t *TowerDB) Close() error { + return t.db.Close() +} + +// GetSessionInfo retrieves the session for the passed session id. An error is +// returned if the session could not be found. +func (t *TowerDB) GetSessionInfo(id *SessionID) (*SessionInfo, error) { + var session *SessionInfo + err := t.db.View(func(tx *bbolt.Tx) error { + sessions := tx.Bucket(sessionsBkt) + if sessions == nil { + return ErrUninitializedDB + } + + var err error + session, err = getSession(sessions, id[:]) + return err + }) + if err != nil { + return nil, err + } + + return session, nil +} + +// InsertSessionInfo records a negotiated session in the tower database. An +// error is returned if the session already exists. +func (t *TowerDB) InsertSessionInfo(session *SessionInfo) error { + return t.db.Update(func(tx *bbolt.Tx) error { + sessions := tx.Bucket(sessionsBkt) + if sessions == nil { + return ErrUninitializedDB + } + + updateIndex := tx.Bucket(updateIndexBkt) + if updateIndex == nil { + return ErrUninitializedDB + } + + dbSession, err := getSession(sessions, session.ID[:]) + switch { + case err == ErrSessionNotFound: + // proceed. + + case err != nil: + return err + + case dbSession.LastApplied > 0: + return ErrSessionAlreadyExists + } + + err = putSession(sessions, session) + if err != nil { + return err + } + + // Initialize the session-hint index which will be used to track + // all updates added for this session. Upon deletion, we will + // consult the index to determine exactly which updates should + // be deleted without needing to iterate over the entire + // database. + return touchSessionHintBkt(updateIndex, &session.ID) + }) +} + +// InsertStateUpdate stores an update sent by the client after validating that +// the update is well-formed in the context of other updates sent for the same +// session. This include verifying that the sequence number is incremented +// properly and the last applied values echoed by the client are sane. +func (t *TowerDB) InsertStateUpdate(update *SessionStateUpdate) (uint16, error) { + var lastApplied uint16 + err := t.db.Update(func(tx *bbolt.Tx) error { + sessions := tx.Bucket(sessionsBkt) + if sessions == nil { + return ErrUninitializedDB + } + + updates := tx.Bucket(updatesBkt) + if updates == nil { + return ErrUninitializedDB + } + + updateIndex := tx.Bucket(updateIndexBkt) + if updateIndex == nil { + return ErrUninitializedDB + } + + // Fetch the session corresponding to the update's session id. + // This will be used to validate that the update's sequence + // number and last applied values are sane. + session, err := getSession(sessions, update.ID[:]) + if err != nil { + return err + } + + // Validate the update against the current state of the session. + err = session.AcceptUpdateSequence( + update.SeqNum, update.LastApplied, + ) + if err != nil { + return err + } + + // Validation succeeded, therefore the update is committed and + // the session's last applied value is equal to the update's + // sequence number. + lastApplied = session.LastApplied + + // Store the updated session to persist the updated last applied + // values. + err = putSession(sessions, session) + if err != nil { + return err + } + + // Create or load the hint bucket for this state update's hint + // and write the given update. + hints, err := updates.CreateBucketIfNotExists(update.Hint[:]) + if err != nil { + return err + } + + var b bytes.Buffer + err = update.Encode(&b) + if err != nil { + return err + } + + err = hints.Put(update.ID[:], b.Bytes()) + if err != nil { + return err + } + + // Finally, create an entry in the update index to track this + // hint under its session id. This will allow us to delete the + // entries efficiently if the session is ever removed. + return putHintForSession(updateIndex, &update.ID, update.Hint) + }) + if err != nil { + return 0, err + } + + return lastApplied, nil +} + +// DeleteSession removes all data associated with a particular session id from +// the tower's database. +func (t *TowerDB) DeleteSession(target SessionID) error { + return t.db.Update(func(tx *bbolt.Tx) error { + sessions := tx.Bucket(sessionsBkt) + if sessions == nil { + return ErrUninitializedDB + } + + updates := tx.Bucket(updatesBkt) + if updates == nil { + return ErrUninitializedDB + } + + updateIndex := tx.Bucket(updateIndexBkt) + if updateIndex == nil { + return ErrUninitializedDB + } + + // Fail if the session doesn't exit. + _, err := getSession(sessions, target[:]) + if err != nil { + return err + } + + // Remove the target session. + err = sessions.Delete(target[:]) + if err != nil { + return err + } + + // Next, check the update index for any hints that were added + // under this session. + hints, err := getHintsForSession(updateIndex, &target) + if err != nil { + return err + } + + for _, hint := range hints { + // Remove the state updates for any blobs stored under + // the target session identifier. + updatesForHint := updates.Bucket(hint[:]) + if updatesForHint == nil { + continue + } + + update := updatesForHint.Get(target[:]) + if update == nil { + continue + } + + err := updatesForHint.Delete(target[:]) + if err != nil { + return err + } + + // If this was the last state update, we can also remove + // the hint that would map to an empty set. + err = isBucketEmpty(updatesForHint) + switch { + + // Other updates exist for this hint, keep the bucket. + case err == errBucketNotEmpty: + continue + + // Unexpected error. + case err != nil: + return err + + // No more updates for this hint, prune hint bucket. + default: + err = updates.DeleteBucket(hint[:]) + if err != nil { + return err + } + } + } + + // Finally, remove this session from the update index, which + // also removes any of the indexed hints beneath it. + return removeSessionHintBkt(updateIndex, &target) + }) +} + +// QueryMatches searches against all known state updates for any that match the +// passed breachHints. More than one Match will be returned for a given hint if +// they exist in the database. +func (t *TowerDB) QueryMatches(breachHints []BreachHint) ([]Match, error) { + var matches []Match + err := t.db.View(func(tx *bbolt.Tx) error { + sessions := tx.Bucket(sessionsBkt) + if sessions == nil { + return ErrUninitializedDB + } + + updates := tx.Bucket(updatesBkt) + if updates == nil { + return ErrUninitializedDB + } + + // Iterate through the target breach hints, appending any + // matching updates to the set of matches. + for _, hint := range breachHints { + // If a bucket does not exist for this hint, no matches + // are known. + updatesForHint := updates.Bucket(hint[:]) + if updatesForHint == nil { + continue + } + + // Otherwise, iterate through all (session id, update) + // pairs, creating a Match for each. + err := updatesForHint.ForEach(func(k, v []byte) error { + // Load the session via the session id for this + // update. The session info contains further + // instructions for how to process the state + // update. + session, err := getSession(sessions, k) + switch { + case err == ErrSessionNotFound: + log.Warnf("Missing session=%x for "+ + "matched state update hint=%x", + k, hint) + return nil + + case err != nil: + return err + } + + // Decode the state update containing the + // encrypted blob. + update := &SessionStateUpdate{} + err = update.Decode(bytes.NewReader(v)) + if err != nil { + return err + } + + var id SessionID + copy(id[:], k) + + // Construct the final match using the found + // update and its session info. + match := Match{ + ID: id, + SeqNum: update.SeqNum, + Hint: hint, + EncryptedBlob: update.EncryptedBlob, + SessionInfo: session, + } + + matches = append(matches, match) + + return nil + }) + if err != nil { + return err + } + } + + return nil + }) + if err != nil { + return nil, err + } + + return matches, nil +} + +// SetLookoutTip stores the provided epoch as the latest lookout tip epoch in +// the tower database. +func (t *TowerDB) SetLookoutTip(epoch *chainntnfs.BlockEpoch) error { + return t.db.Update(func(tx *bbolt.Tx) error { + lookoutTip := tx.Bucket(lookoutTipBkt) + if lookoutTip == nil { + return ErrUninitializedDB + } + + return putLookoutEpoch(lookoutTip, epoch) + }) +} + +// GetLookoutTip retrieves the current lookout tip block epoch from the tower +// database. +func (t *TowerDB) GetLookoutTip() (*chainntnfs.BlockEpoch, error) { + var epoch *chainntnfs.BlockEpoch + err := t.db.View(func(tx *bbolt.Tx) error { + lookoutTip := tx.Bucket(lookoutTipBkt) + if lookoutTip == nil { + return ErrUninitializedDB + } + + epoch = getLookoutEpoch(lookoutTip) + + return nil + }) + if err != nil { + return nil, err + } + + return epoch, nil +} + +// getSession retrieves the session info from the sessions bucket identified by +// its session id. An error is returned if the session is not found or a +// deserialization error occurs. +func getSession(sessions *bbolt.Bucket, id []byte) (*SessionInfo, error) { + sessionBytes := sessions.Get(id) + if sessionBytes == nil { + return nil, ErrSessionNotFound + } + + var session SessionInfo + err := session.Decode(bytes.NewReader(sessionBytes)) + if err != nil { + return nil, err + } + + return &session, nil +} + +// putSession stores the session info in the sessions bucket identified by its +// session id. An error is returned if a serialization error occurs. +func putSession(sessions *bbolt.Bucket, session *SessionInfo) error { + var b bytes.Buffer + err := session.Encode(&b) + if err != nil { + return err + } + + return sessions.Put(session.ID[:], b.Bytes()) +} + +// touchSessionHintBkt initializes the session-hint bucket for a particular +// session id. This ensures that future calls to getHintsForSession or +// putHintForSession can rely on the bucket already being created, and fail if +// index has not been initialized as this points to improper usage. +func touchSessionHintBkt(updateIndex *bbolt.Bucket, id *SessionID) error { + _, err := updateIndex.CreateBucketIfNotExists(id[:]) + return err +} + +// removeSessionHintBkt prunes the session-hint bucket for the given session id +// and all of the hints contained inside. This should be used to clean up the +// index upon session deletion. +func removeSessionHintBkt(updateIndex *bbolt.Bucket, id *SessionID) error { + return updateIndex.DeleteBucket(id[:]) +} + +// getHintsForSession returns all known hints belonging to the given session id. +// If the index for the session has not been initialized, this method returns +// ErrNoSessionHintIndex. +func getHintsForSession(updateIndex *bbolt.Bucket, + id *SessionID) ([]BreachHint, error) { + + sessionHints := updateIndex.Bucket(id[:]) + if sessionHints == nil { + return nil, ErrNoSessionHintIndex + } + + var hints []BreachHint + err := sessionHints.ForEach(func(k, _ []byte) error { + if len(k) != BreachHintSize { + return nil + } + + var hint BreachHint + copy(hint[:], k) + hints = append(hints, hint) + return nil + }) + if err != nil { + return nil, err + } + + return hints, nil +} + +// putHintForSession inserts a record into the update index for a given +// (session, hint) pair. The hints are coalesced under a bucket for the target +// session id, and used to perform efficient removal of updates. If the index +// for the session has not been initialized, this method returns +// ErrNoSessionHintIndex. +func putHintForSession(updateIndex *bbolt.Bucket, id *SessionID, + hint BreachHint) error { + + sessionHints := updateIndex.Bucket(id[:]) + if sessionHints == nil { + return ErrNoSessionHintIndex + } + + return sessionHints.Put(hint[:], []byte{}) +} + +// putLookoutEpoch stores the given lookout tip block epoch in provided bucket. +func putLookoutEpoch(bkt *bbolt.Bucket, epoch *chainntnfs.BlockEpoch) error { + epochBytes := make([]byte, 36) + copy(epochBytes, epoch.Hash[:]) + byteOrder.PutUint32(epochBytes[32:], uint32(epoch.Height)) + + return bkt.Put(lookoutTipKey, epochBytes) +} + +// getLookoutEpoch retrieves the lookout tip block epoch from the given bucket. +// A nil epoch is returned if no update exists. +func getLookoutEpoch(bkt *bbolt.Bucket) *chainntnfs.BlockEpoch { + epochBytes := bkt.Get(lookoutTipKey) + if len(epochBytes) != 36 { + return nil + } + + var hash chainhash.Hash + copy(hash[:], epochBytes[:32]) + height := byteOrder.Uint32(epochBytes[32:]) + + return &chainntnfs.BlockEpoch{ + Hash: &hash, + Height: int32(height), + } +} + +// errBucketNotEmpty is a helper error returned when testing whether a bucket is +// empty or not. +var errBucketNotEmpty = errors.New("bucket not empty") + +// isBucketEmpty returns errBucketNotEmpty if the bucket is not empty. +func isBucketEmpty(bkt *bbolt.Bucket) error { + return bkt.ForEach(func(_, _ []byte) error { + return errBucketNotEmpty + }) +} diff --git a/watchtower/wtdb/version.go b/watchtower/wtdb/version.go new file mode 100644 index 00000000..fd7481af --- /dev/null +++ b/watchtower/wtdb/version.go @@ -0,0 +1,84 @@ +package wtdb + +import "github.com/coreos/bbolt" + +// migration is a function which takes a prior outdated version of the database +// instances and mutates the key/bucket structure to arrive at a more +// up-to-date version of the database. +type migration func(tx *bbolt.Tx) error + +// version pairs a version number with the migration that would need to be +// applied from the prior version to upgrade. +type version struct { + number uint32 + migration migration +} + +// dbVersions stores all versions and migrations of the database. This list will +// be used when opening the database to determine if any migrations must be +// applied. +var dbVersions = []version{ + { + // Initial version requires no migration. + number: 0, + migration: nil, + }, +} + +// getLatestDBVersion returns the last known database version. +func getLatestDBVersion(versions []version) uint32 { + return versions[len(versions)-1].number +} + +// getMigrations returns a slice of all updates with a greater number that +// curVersion that need to be applied to sync up with the latest version. +func getMigrations(versions []version, curVersion uint32) []version { + var updates []version + for _, v := range versions { + if v.number > curVersion { + updates = append(updates, v) + } + } + + return updates +} + +// getDBVersion retrieves the current database version from the metadata bucket +// using the dbVersionKey. +func getDBVersion(tx *bbolt.Tx) (uint32, error) { + metadata := tx.Bucket(metadataBkt) + if metadata == nil { + return 0, ErrUninitializedDB + } + + versionBytes := metadata.Get(dbVersionKey) + if len(versionBytes) != 4 { + return 0, ErrNoDBVersion + } + + return byteOrder.Uint32(versionBytes), nil +} + +// initDBVersion initializes the top-level metadata bucket and writes the passed +// version number as the current version. +func initDBVersion(tx *bbolt.Tx, version uint32) error { + _, err := tx.CreateBucketIfNotExists(metadataBkt) + if err != nil { + return err + } + + return putDBVersion(tx, version) +} + +// putDBVersion stores the passed database version in the metadata bucket under +// the dbVersionKey. +func putDBVersion(tx *bbolt.Tx, version uint32) error { + metadata := tx.Bucket(metadataBkt) + if metadata == nil { + return ErrUninitializedDB + } + + versionBytes := make([]byte, 4) + byteOrder.PutUint32(versionBytes, version) + return metadata.Put(dbVersionKey, versionBytes) +}