channeldb: add ability to close active channels

This commit introduces the concept of “closing” an already active
channel. Closing a channel causes all the channel state to be purged
from the database, and also triggers the creation of a small “summary”
kept concerning details of the previously open channel.

This commit also updates the previous test case(s), and includes the
close channel bucket in the database deletion in the .Wipe() method.
This commit is contained in:
Olaoluwa Osuntokun 2016-06-22 16:15:07 -07:00
parent 7e09a70706
commit f1f27b2046
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 227 additions and 3 deletions

@ -204,6 +204,53 @@ func (c *OpenChannel) FullSync() error {
}) })
} }
// CloseChannel closes a previously active lightning channel. Closing a channel
// entails deleting all saved state within the database concerning this
// channel, as well as created a small channel summary for record keeping
// purposes.
func (c *OpenChannel) CloseChannel() error {
return c.Db.store.Update(func(tx *bolt.Tx) error {
// First fetch the top level bucket which stores all data related to
// current, active channels.
chanBucket := tx.Bucket(openChannelBucket)
if chanBucket == nil {
return ErrNoChanDBExists
}
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
nodeChanBucket := chanBucket.Bucket(c.TheirLNID[:])
if nodeChanBucket == nil {
return ErrNoActiveChannels
}
// Delete this channel ID from the node's active channel index.
chanIndexBucket := nodeChanBucket.Bucket(chanIDBucket)
if chanIndexBucket == nil {
return ErrNoActiveChannels
}
var b bytes.Buffer
if err := writeOutpoint(&b, c.ChanID); err != nil {
return err
}
outPointBytes := b.Bytes()
if err := chanIndexBucket.Delete(b.Bytes()); err != nil {
return err
}
// Now that the index to this channel has been deleted, purge
// the remaining channel meta-data from the databse.
if err := deleteOpenChannel(chanBucket, nodeChanBucket,
outPointBytes); err != nil {
return err
}
// Finally, create a summary of this channel in the closed
// channel bucket for this node.
return putClosedChannelSummary(tx, outPointBytes)
})
}
// ChannelSnapshot.... // ChannelSnapshot....
// TODO(roasbeef): methods to roll forwards/backwards in state etc // TODO(roasbeef): methods to roll forwards/backwards in state etc
// * use botldb cursor? // * use botldb cursor?
@ -244,6 +291,19 @@ func (c OpenChannel) RecordChannelDelta(theirRevokedCommit *wire.MsgTx, updateNu
return nil return nil
} }
func putClosedChannelSummary(tx *bolt.Tx, chanID []byte) error {
// For now, a summary of a closed channel simply involves recording the
// outpoint of the funding transaction.
closedChanBucket, err := tx.CreateBucketIfNotExists(closedChannelBucket)
if err != nil {
return err
}
// TODO(roasbeef): add other info
// * should likely have each in own bucket per node
return closedChanBucket.Put(chanID, nil)
}
// putChannel serializes, and stores the current state of the channel in its // putChannel serializes, and stores the current state of the channel in its
// entirety. // entirety.
func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
@ -344,6 +404,51 @@ func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
return channel, nil return channel, nil
} }
func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
channelID []byte) error {
// First we'll delete all the "common" top level items stored outside
// the node's channel bucket.
if err := deleteChanCapacity(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanMinFeePerKb(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanNumUpdates(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanTotalFlow(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanNetFee(openChanBucket, channelID); err != nil {
return err
}
// Finally, delete all the fields directly within the node's channel
// bucket.
if err := deleteChannelIDs(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanCommitKeys(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanCommitTxns(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanFundingInfo(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanEklremState(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanDeliveryScripts(nodeChanBucket, channelID); err != nil {
return err
}
return nil
}
func putChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error { func putChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
// Some scratch bytes re-used for serializing each of the uint64's. // Some scratch bytes re-used for serializing each of the uint64's.
scratch1 := make([]byte, 8) scratch1 := make([]byte, 8)
@ -375,6 +480,24 @@ func putChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
return openChanBucket.Put(keyPrefix, scratch3) return openChanBucket.Put(keyPrefix, scratch3)
} }
func deleteChanCapacity(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix[3:], chanID)
copy(keyPrefix[:3], chanCapacityPrefix)
if err := openChanBucket.Delete(keyPrefix); err != nil {
return err
}
copy(keyPrefix[:3], selfBalancePrefix)
if err := openChanBucket.Delete(keyPrefix); err != nil {
return err
}
copy(keyPrefix[:3], theirBalancePrefix)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
// A byte slice re-used to compute each key prefix below. // A byte slice re-used to compute each key prefix below.
var b bytes.Buffer var b bytes.Buffer
@ -416,6 +539,13 @@ func putChanMinFeePerKb(openChanBucket *bolt.Bucket, channel *OpenChannel) error
return openChanBucket.Put(keyPrefix, scratch) return openChanBucket.Put(keyPrefix, scratch)
} }
func deleteChanMinFeePerKb(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, minFeePerKbPrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanMinFeePerKb(openChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanMinFeePerKb(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil { if err := writeOutpoint(&b, channel.ChanID); err != nil {
@ -448,6 +578,13 @@ func putChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error
return openChanBucket.Put(keyPrefix, scratch) return openChanBucket.Put(keyPrefix, scratch)
} }
func deleteChanNumUpdates(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, updatePrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil { if err := writeOutpoint(&b, channel.ChanID); err != nil {
@ -487,6 +624,19 @@ func putChanTotalFlow(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
return openChanBucket.Put(keyPrefix, scratch2) return openChanBucket.Put(keyPrefix, scratch2)
} }
func deleteChanTotalFlow(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix[3:], chanID)
copy(keyPrefix[:3], satSentPrefix)
if err := openChanBucket.Delete(keyPrefix); err != nil {
return err
}
copy(keyPrefix[:3], satRecievedPrefix)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanTotalFlow(openChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanTotalFlow(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil { if err := writeOutpoint(&b, channel.ChanID); err != nil {
@ -523,6 +673,13 @@ func putChanNetFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
return openChanBucket.Put(keyPrefix, scratch) return openChanBucket.Put(keyPrefix, scratch)
} }
func deleteChanNetFee(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, netFeesPrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanNetFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanNetFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil { if err := writeOutpoint(&b, channel.ChanID); err != nil {
@ -540,6 +697,7 @@ func fetchChanNetFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
} }
func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
// TODO(roabeef): just pass in chanID everywhere for puts
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil { if err := writeOutpoint(&b, channel.ChanID); err != nil {
return err return err
@ -554,6 +712,13 @@ func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
return nodeChanBucket.Put(idKey, channel.TheirLNID[:]) return nodeChanBucket.Put(idKey, channel.TheirLNID[:])
} }
func deleteChannelIDs(nodeChanBucket *bolt.Bucket, chanID []byte) error {
idKey := make([]byte, len(chanIDKey)+len(chanID))
copy(idKey[:3], chanIDKey)
copy(idKey[3:], chanID)
return nodeChanBucket.Delete(idKey)
}
func fetchChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil { if err := writeOutpoint(&b, channel.ChanID); err != nil {
@ -602,6 +767,13 @@ func putChanCommitKeys(nodeChanBucket *bolt.Bucket, channel *OpenChannel,
return nodeChanBucket.Put(commitKey, b.Bytes()) return nodeChanBucket.Put(commitKey, b.Bytes())
} }
func deleteChanCommitKeys(nodeChanBucket *bolt.Bucket, chanID []byte) error {
commitKey := make([]byte, len(commitKeys)+len(chanID))
copy(commitKey[:3], commitKeys)
copy(commitKeys[3:], chanID)
return nodeChanBucket.Delete(commitKey)
}
func fetchChanCommitKeys(nodeChanBucket *bolt.Bucket, channel *OpenChannel, func fetchChanCommitKeys(nodeChanBucket *bolt.Bucket, channel *OpenChannel,
ed EncryptorDecryptor) error { ed EncryptorDecryptor) error {
@ -668,6 +840,13 @@ func putChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error
return nodeChanBucket.Put(txnsKey, b.Bytes()) return nodeChanBucket.Put(txnsKey, b.Bytes())
} }
func deleteChanCommitTxns(nodeChanBucket *bolt.Bucket, chanID []byte) error {
txnsKey := make([]byte, len(commitTxnsKey)+len(chanID))
copy(txnsKey[:3], commitTxnsKey)
copy(txnsKey[3:], chanID)
return nodeChanBucket.Delete(txnsKey)
}
func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var bc bytes.Buffer var bc bytes.Buffer
if err := writeOutpoint(&bc, channel.ChanID); err != nil { if err := writeOutpoint(&bc, channel.ChanID); err != nil {
@ -747,6 +926,13 @@ func putChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel,
return nodeChanBucket.Put(fundTxnKey, b.Bytes()) return nodeChanBucket.Put(fundTxnKey, b.Bytes())
} }
func deleteChanFundingInfo(nodeChanBucket *bolt.Bucket, chanID []byte) error {
fundTxnKey := make([]byte, len(fundingTxnKey)+len(chanID))
copy(fundTxnKey[:3], fundingTxnKey)
copy(fundTxnKey[3:], chanID)
return nodeChanBucket.Delete(fundTxnKey)
}
func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel, func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel,
ed EncryptorDecryptor) error { ed EncryptorDecryptor) error {
@ -816,6 +1002,7 @@ func putChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error
return err return err
} }
// TODO(roasbeef): no longer need to store any sender data
senderBytes, err := channel.LocalElkrem.ToBytes() senderBytes, err := channel.LocalElkrem.ToBytes()
if err != nil { if err != nil {
return err return err
@ -835,6 +1022,13 @@ func putChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error
return nodeChanBucket.Put(elkremKey, b.Bytes()) return nodeChanBucket.Put(elkremKey, b.Bytes())
} }
func deleteChanEklremState(nodeChanBucket *bolt.Bucket, chanID []byte) error {
elkremKey := make([]byte, len(elkremStateKey)+len(chanID))
copy(elkremKey[:3], elkremStateKey)
copy(elkremKey[3:], chanID)
return nodeChanBucket.Delete(elkremKey)
}
func fetchChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil { if err := writeOutpoint(&b, channel.ChanID); err != nil {
@ -891,7 +1085,13 @@ func putChanDeliveryScripts(nodeChanBucket *bolt.Bucket, channel *OpenChannel) e
} }
return nodeChanBucket.Put(deliveryScriptsKey, b.Bytes()) return nodeChanBucket.Put(deliveryScriptsKey, b.Bytes())
}
func deleteChanDeliveryScripts(nodeChanBucket *bolt.Bucket, chanID []byte) error {
deliveryKey := make([]byte, len(deliveryScriptsKey)+len(chanID))
copy(deliveryKey[:3], deliveryScriptsKey)
copy(deliveryKey[3:], chanID)
return nodeChanBucket.Delete(deliveryScriptsKey)
} }
func fetchChanDeliveryScripts(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanDeliveryScripts(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {

@ -95,9 +95,10 @@ func (m *MockEncryptorDecryptor) OverheadSize() uint32 {
var _ EncryptorDecryptor = (*MockEncryptorDecryptor)(nil) var _ EncryptorDecryptor = (*MockEncryptorDecryptor)(nil)
func TestOpenChannelEncodeDecode(t *testing.T) { func TestOpenChannelPutGetDelete(t *testing.T) {
// First, create a temporary directory to be used for the duration of // First, create a temporary directory to be used for the duration of
// this test. // this test.
// TODO(roasbeef): move initial set up to something within testing.Main
tempDirName, err := ioutil.TempDir("", "channeldb") tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil { if err != nil {
t.Fatalf("unable to create temp dir: %v") t.Fatalf("unable to create temp dir: %v")
@ -287,6 +288,26 @@ func TestOpenChannelEncodeDecode(t *testing.T) {
if state.CreationTime.Unix() != newState.CreationTime.Unix() { if state.CreationTime.Unix() != newState.CreationTime.Unix() {
t.Fatalf("creation time doesn't match") t.Fatalf("creation time doesn't match")
} }
// Finally to wrap up the test, delete the state of the channel within
// the database. This involves "closing" the channel which removes all
// written state, and creates a small "summary" elsewhere within the
// database.
if err := state.CloseChannel(); err != nil {
t.Fatalf("unable to close channel: %v", err)
}
// As the channel is now closed, attempting to fetch all open channels
// for our fake node ID should return an empty slice.
openChans, err := cdb.FetchOpenChannels(&nodeID)
if err != nil {
t.Fatalf("unable to fetch open channels: %v", err)
}
// TODO(roasbeef): need to assert much more
if len(openChans) != 0 {
t.Fatalf("all channels not deleted, found %v", len(openChans))
}
} }
func TestOpenChannelEncodeDecodeCorruption(t *testing.T) { func TestOpenChannelEncodeDecodeCorruption(t *testing.T) {

@ -72,8 +72,11 @@ func (d *DB) RegisterCryptoSystem(ed EncryptorDecryptor) {
// Wipe... // Wipe...
func (d *DB) Wipe() error { func (d *DB) Wipe() error {
return d.store.Update(func(tx *bolt.Tx) error { return d.store.Update(func(tx *bolt.Tx) error {
// TODO(roasbee): delete all other top-level buckets. if err := tx.DeleteBucket(openChannelBucket); err != nil {
return tx.DeleteBucket(openChannelBucket) return err
}
return tx.DeleteBucket(closedChannelBucket)
}) })
} }