channeldb: implement commitment state update log

This commit implements a state update log which is intended the record
the relevant information for each state transition on disk. For each
state transition a delta should be written recording the new state. A
new method is also provided which is able to retrieve a previous
channel state based on a state update #.

At the moment no measures has been taken to optimize the space
utilization of each update on disk. There are several low-hanging
fruits which can be addressed at a later point. Ultimately the update
log itself should be implemented with an append-only flat file at the
storage level. In any case, the high level abstraction should be able
to maintained independent of differences in the on-disk format itself.
This commit is contained in:
Olaoluwa Osuntokun 2016-09-02 18:51:34 -07:00
parent d2acb4336c
commit 6684f6aedf
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
4 changed files with 497 additions and 57 deletions

@ -2,6 +2,7 @@ package channeldb
import (
"bytes"
"fmt"
"io"
"sync"
"time"
@ -87,12 +88,13 @@ var (
deliveryScriptsKey = []byte("dsk")
)
// OpenChannel...
// TODO(roasbeef): Copy/Clone method, so CoW on writes?
// * CoW method would allow for intelligent partial writes for updates
// TODO(roasbeef): UpdateState(func (newChan *OpenChannel) error)
// * need mutex, invarient that all reads/writes grab the mutex
// * needs to also return two slices of added then removed HTLC's
// OpenChannel encapsulates the persistent and dynamic state of an open channel
// with a remote node. An open channel supports several options for on-disk
// serialization depending on the exact context. Full (upon channel creation)
// state commitments, and partial (due to a commitment update) writes are
// supported. Each partial write due to a state update appends the new update
// to an on-disk log, which can then subsequently be queried in order to
// "time-travel" to a prior state.
type OpenChannel struct {
// Hash? or Their current pubKey?
TheirLNID [wire.HashSize]byte
@ -150,13 +152,6 @@ type OpenChannel struct {
TotalNetFees uint64 // TODO(roasbeef): total fees paid too?
CreationTime time.Time // TODO(roasbeef): last update time?
// isPrevState denotes if this instance of an OpenChannel is a previous,
// revoked channel state. If so, then the FullSynv, and UpdateState
// methods are disabled in order to prevent overiding the latest channel
// state.
// TODO(roasbeef): scrap? already have snapshots now?
isPrevState bool
// TODO(roasbeef): eww
Db *DB
@ -232,6 +227,132 @@ func (c *OpenChannel) SyncRevocation() error {
})
}
// HTLC is the on-disk representation of a hash time-locked contract. HTLC's
// are contained within ChannelDeltas which encode the current state of the
// commitment between state updates.
type HTLC struct {
// Incoming denotes whether we're the receiver or the sender of this
// HTLC.
Incoming bool
// Amt is the amount of satoshis this HTLC escrows.
Amt btcutil.Amount
// RHash is the payment hash of the HTLC.
RHash [32]byte
// RefundTimeout is the absolute timeout on the HTLC that the sender
// must wait before reclaiming the funds in limbo.
RefundTimeout uint32
// RevocationTimeout is the relative timeout the party who broadcasts
// the commitment transaction must wait before being able to fully
// sweep the funds on-chain in the case of a unilateral channel
// closure.
RevocationTimeout uint32
}
// ChannelDelta is a snapshot of the commitment state at a particular point in
// the commitment chain. With each state transition, a snapshot of the current
// state along with all non-settled HTLC's are recorded.
// TODO(roasbeef): should only need the key instead of hash after refactor
// within state machine.
// * won't actually be needed if it's a past state?
type ChannelDelta struct {
RevocationHash [32]byte
RevocationKey *btcec.PublicKey
LocalBalance btcutil.Amount
RemoteBalance btcutil.Amount
Htlcs []*HTLC
UpdateNum uint32
}
// RecordChannelDelta records the new state transition within an on-disk
// append-only log which records all state transitions. Additionally, the
// internal balances and update counter of the target OpenChannel are updated
// accordingly based on the passed delta.
func (c *OpenChannel) RecordChannelDelta(delta *ChannelDelta) error {
return c.Db.store.Update(func(tx *bolt.Tx) error {
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
}
id := c.TheirLNID[:]
nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id)
if nodeChanBucket == nil {
return ErrNoActiveChannels
}
c.OurBalance = delta.LocalBalance
c.TheirBalance = delta.RemoteBalance
c.NumUpdates = uint64(delta.UpdateNum)
// First we'll write out the current latest dynamic channel
// state: the current channel balance, the number of updates,
// and our latest commitment transaction+sig.
if err := putChanCapacity(chanBucket, c); err != nil {
return err
}
if err := putChanNumUpdates(chanBucket, c); err != nil {
return err
}
if err := putChanCommitTxns(nodeChanBucket, c); err != nil {
return err
}
// With the current state updated, append a new log entry
// recording this the delta of this state transition.
// TODO(roasbeef): could make the deltas relative, would save
// space, but then tradeoff for more disk-seeks to recover the
// full state.
logKey := channelLogBucket
logBucket, err := nodeChanBucket.CreateBucketIfNotExists(logKey)
if err != nil {
return err
}
return appendChannelLogEntry(logBucket, delta, c.ChanID)
})
}
// FindPreviousState scans through the append-only log in an attempt to recover
// the previous channel state indicated by the update number. This method is
// intended to be used for obtaining the relevant data needed to claim all
// funds rightfully spendable in the case of an on-chain broadcast of the
// commitment transaction.
func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelDelta, error) {
delta := &ChannelDelta{}
err := c.Db.store.View(func(tx *bolt.Tx) error {
chanBucket := tx.Bucket(openChannelBucket)
nodeChanBucket := chanBucket.Bucket(c.TheirLNID[:])
if nodeChanBucket == nil {
return ErrNoActiveChannels
}
logBucket := nodeChanBucket.Bucket(channelLogBucket)
if nodeChanBucket == nil {
return ErrNoPastDeltas
}
var err error
delta, err = fetchChannelLogEntry(logBucket, c.ChanID,
uint32(updateNum))
return err
})
if err != nil {
return nil, err
}
return delta, nil
}
// CloseChannel closes a previously active lightning channel. Closing a channel
// entails deleting all saved state within the database concerning this
// channel, as well as created a small channel summary for record keeping
@ -267,7 +388,7 @@ func (c *OpenChannel) CloseChannel() error {
}
// Now that the index to this channel has been deleted, purge
// the remaining channel meta-data from the databse.
// the remaining channel meta-data from the database.
if err := deleteOpenChannel(chanBucket, nodeChanBucket,
outPointBytes); err != nil {
return err
@ -282,8 +403,6 @@ func (c *OpenChannel) CloseChannel() error {
// ChannelSnapshot is a frozen snapshot of the current channel state. A
// snapshot is detached from the original channel that generated it, providing
// read-only access to the current or prior state of an active channel.
// TODO(roasbeef): methods to roll forwards/backwards in state etc
// * use botldb cursor?
type ChannelSnapshot struct {
RemoteID [wire.HashSize]byte
@ -299,8 +418,6 @@ type ChannelSnapshot struct {
TotalSatoshisReceived uint64
// TODO(roasbeef): fee stuff
// TODO(roasbeef): active HTLC's + their direction
updateNum uint64
channel *OpenChannel
}
@ -320,32 +437,12 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot {
}
copy(snapshot.RemoteID[:], c.TheirLNID[:])
// TODO(roasbeef): cache current channel delta in memory, either merge
// or replace with ChannelSnapshot
return snapshot
}
// FindPreviousState...
// TODO(roasbeef): method to retrieve both old commitment txns given update #
func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelSnapshot, error) {
return nil, nil
}
// ChannelDelta...
// TODO(roasbeef): binlog like entry?
type ChannelDelta struct {
// change in allocations
// added + removed htlcs
// index
}
// RecordChannelDelta
// TODO(roasbeef): only need their commit?
// * or as internal helper func to UpdateState func?
func (c OpenChannel) RecordChannelDelta(theirRevokedCommit *wire.MsgTx, updateNum uint64) error {
// TODO(roasbeef): record all HTLCs, pass those instead?
// *
return nil
}
func putClosedChannelSummary(tx *bolt.Tx, chanID []byte) error {
// For now, a summary of a closed channel simply involves recording the
// outpoint of the funding transaction.
@ -870,6 +967,7 @@ func putChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error
return err
}
// TODO(roasbeef): should move this into putChanFundingInfo
scratch := make([]byte, 4)
byteOrder.PutUint32(scratch, channel.LocalCsvDelay)
if _, err := b.Write(scratch); err != nil {
@ -1168,9 +1266,212 @@ func fetchChanDeliveryScripts(nodeChanBucket *bolt.Bucket, channel *OpenChannel)
return nil
}
// htlcDiskSize represents the number of btyes a serialized HTLC takes up on
// disk. The size of an HTLC on disk is 49 bytes total: incoming (1) + amt (8)
// + rhash (32) + timeouts (8)
const htlcDiskSize = 1 + 8 + 32 + 4 + 4
func serializeHTLC(w io.Writer, h *HTLC) error {
var buf [htlcDiskSize]byte
var boolByte [1]byte
if h.Incoming {
boolByte[0] = 1
} else {
boolByte[0] = 0
}
var n int
n += copy(buf[:], boolByte[:])
byteOrder.PutUint64(buf[n:], uint64(h.Amt))
n += 8
n += copy(buf[n:], h.RHash[:])
byteOrder.PutUint32(buf[n:], h.RefundTimeout)
n += 4
byteOrder.PutUint32(buf[n:], h.RevocationTimeout)
n += 4
if _, err := w.Write(buf[:]); err != nil {
return err
}
return nil
}
func deserializeHTLC(r io.Reader) (*HTLC, error) {
h := &HTLC{}
var scratch [8]byte
if _, err := r.Read(scratch[:1]); err != nil {
return nil, err
}
if scratch[0] == 1 {
h.Incoming = true
} else {
h.Incoming = false
}
if _, err := r.Read(scratch[:]); err != nil {
return nil, err
}
h.Amt = btcutil.Amount(byteOrder.Uint64(scratch[:]))
if _, err := r.Read(h.RHash[:]); err != nil {
return nil, err
}
if _, err := r.Read(scratch[:4]); err != nil {
return nil, err
}
h.RefundTimeout = byteOrder.Uint32(scratch[:4])
if _, err := r.Read(scratch[:4]); err != nil {
return nil, err
}
h.RevocationTimeout = byteOrder.Uint32(scratch[:])
return h, nil
}
func serializeChannelDelta(w io.Writer, delta *ChannelDelta) error {
if _, err := w.Write(delta.RevocationHash[:]); err != nil {
return err
}
serializeKey := delta.RevocationKey.SerializeCompressed()
if _, err := w.Write(serializeKey); err != nil {
return err
}
// TODO(roasbeef): could use compression here to reduce on-disk space.
var scratch [8]byte
byteOrder.PutUint64(scratch[:], uint64(delta.LocalBalance))
if _, err := w.Write(scratch[:]); err != nil {
return err
}
byteOrder.PutUint64(scratch[:], uint64(delta.RemoteBalance))
if _, err := w.Write(scratch[:]); err != nil {
return err
}
byteOrder.PutUint32(scratch[:4], delta.UpdateNum)
if _, err := w.Write(scratch[:4]); err != nil {
return err
}
numHtlcs := uint64(len(delta.Htlcs))
if err := wire.WriteVarInt(w, 0, numHtlcs); err != nil {
return err
}
for _, htlc := range delta.Htlcs {
if err := serializeHTLC(w, htlc); err != nil {
return err
}
}
return nil
}
func deserializeChannelDelta(r io.Reader) (*ChannelDelta, error) {
var (
err error
scratch [8]byte
key [33]byte
)
delta := &ChannelDelta{}
if _, err = r.Read(delta.RevocationHash[:]); err != nil {
return nil, err
}
if _, err = r.Read(key[:]); err != nil {
return nil, err
}
delta.RevocationKey, err = btcec.ParsePubKey(key[:], btcec.S256())
if err != nil {
return nil, err
}
if _, err := r.Read(scratch[:]); err != nil {
return nil, err
}
delta.LocalBalance = btcutil.Amount(byteOrder.Uint64(scratch[:]))
if _, err := r.Read(scratch[:]); err != nil {
return nil, err
}
delta.RemoteBalance = btcutil.Amount(byteOrder.Uint64(scratch[:]))
if _, err := r.Read(scratch[:4]); err != nil {
return nil, err
}
delta.UpdateNum = byteOrder.Uint32(scratch[:4])
numHtlcs, err := wire.ReadVarInt(r, 0)
if err != nil {
return nil, err
}
delta.Htlcs = make([]*HTLC, numHtlcs)
for i := uint64(0); i < numHtlcs; i++ {
htlc, err := deserializeHTLC(r)
if err != nil {
return nil, err
}
delta.Htlcs[i] = htlc
}
return delta, nil
}
func appendChannelLogEntry(log *bolt.Bucket, delta *ChannelDelta,
chanPoint *wire.OutPoint) error {
// First construct the key for this particular log entry. The key for
// each newly added log entry is: channelPoint || stateNum.
var logEntrykey [36 + 4]byte
copy(logEntrykey[:], chanPoint.Hash[:])
var scratch [4]byte
byteOrder.PutUint32(scratch[:], delta.UpdateNum)
copy(logEntrykey[36:], scratch[:])
// With the key constructed, serialize the delta to raw bytes, then
// write the new state to disk.
var b bytes.Buffer
if err := serializeChannelDelta(&b, delta); err != nil {
return err
}
return log.Put(logEntrykey[:], b.Bytes())
}
func fetchChannelLogEntry(log *bolt.Bucket, chanPoint *wire.OutPoint,
updateNum uint32) (*ChannelDelta, error) {
// First construct the key for this particular log entry. The key for
// each newly added log entry is: channelPoint || stateNum.
// TODO(roasbeef): make into func..
var logEntrykey [36 + 4]byte
copy(logEntrykey[:], chanPoint.Hash[:])
var scratch [4]byte
byteOrder.PutUint32(scratch[:], updateNum)
copy(logEntrykey[36:], scratch[:])
deltaBytes := log.Get(logEntrykey[:])
if deltaBytes == nil {
return nil, fmt.Errorf("log entry not found")
}
deltaReader := bytes.NewReader(deltaBytes)
return deserializeChannelDelta(deltaReader)
}
func writeOutpoint(w io.Writer, o *wire.OutPoint) error {
// TODO(roasbeef): make all scratch buffers on the stack
scratch := make([]byte, 4)
// TODO(roasbeef): write raw 32 bytes instead of wasting the extra
// byte.
if err := wire.WriteVarBytes(w, 0, o.Hash[:]); err != nil {
return err
}

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/elkrem"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg"
@ -76,35 +77,44 @@ var (
Hash: key,
Index: 0,
}
privKey, pubKey = btcec.PrivKeyFromBytes(btcec.S256(), key[:])
)
func TestOpenChannelPutGetDelete(t *testing.T) {
// makeTestDB creates a new instance of the ChannelDB for testing purposes. A
// callback which cleans up the created temporary directories is also returned
// and intended to be executed after the test completes.
func makeTestDB() (*DB, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
// TODO(roasbeef): move initial set up to something within testing.Main
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
t.Fatalf("unable to create temp dir: %v")
return nil, nil, err
}
defer os.RemoveAll(tempDirName)
// Next, create channeldb for the first time, also setting a mock
// EncryptorDecryptor implementation for testing purposes.
cdb, err := Open(tempDirName, netParams)
if err != nil {
t.Fatalf("unable to create channeldb: %v", err)
return nil, nil, err
}
defer cdb.Close()
privKey, pubKey := btcec.PrivKeyFromBytes(btcec.S256(), key[:])
cleanUp := func() {
os.RemoveAll(tempDirName)
cdb.Close()
}
return cdb, cleanUp, nil
}
func createTestChannelState(cdb *DB) (*OpenChannel, error) {
addr, err := btcutil.NewAddressPubKey(pubKey.SerializeCompressed(), netParams)
if err != nil {
t.Fatalf("unable to create delivery address")
return nil, err
}
script, err := txscript.MultiSigScript([]*btcutil.AddressPubKey{addr, addr}, 2)
if err != nil {
t.Fatalf("unable to create redeemScript")
return nil, err
}
// Simulate 1000 channel updates via progression of the elkrem
@ -114,15 +124,15 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
for i := 0; i < 1000; i++ {
preImage, err := sender.AtIndex(uint64(i))
if err != nil {
t.Fatalf("unable to progress elkrem sender: %v", err)
return nil, err
}
if receiver.AddNext(preImage); err != nil {
t.Fatalf("unable to progress elkrem receiver: %v", err)
return nil, err
}
}
state := OpenChannel{
return &OpenChannel{
TheirLNID: key,
ChanID: id,
MinFeePerKb: btcutil.Amount(5000),
@ -145,14 +155,26 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
TheirDeliveryScript: script,
LocalCsvDelay: 5,
RemoteCsvDelay: 9,
NumUpdates: 1,
NumUpdates: 0,
TotalSatoshisSent: 8,
TotalSatoshisReceived: 2,
TotalNetFees: 9,
CreationTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
Db: cdb,
}
}, nil
}
func TestOpenChannelPutGetDelete(t *testing.T) {
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("uanble to make test database: %v", err)
}
defer cleanUp()
state, err := createTestChannelState(cdb)
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
if err := state.FullSync(); err != nil {
t.Fatalf("unable to save and serialize channel state: %v", err)
}
@ -302,5 +324,121 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
}
}
func TestOpenChannelEncodeDecodeCorruption(t *testing.T) {
func TestChannelStateUpdateLog(t *testing.T) {
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("uanble to make test database: %v", err)
}
defer cleanUp()
// First create a minimal channel, then perform a full sync in order to
// persist the data.
channel, err := createTestChannelState(cdb)
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
if err := channel.FullSync(); err != nil {
t.Fatalf("unable to save and serialize channel state: %v", err)
}
// Add some HTLC's which were added during this new state transition.
// Half of the HTLC's are incoming, while the other half are outgoing.
var htlcs []*HTLC
for i := uint32(0); i < 10; i++ {
var incoming bool
if i > 5 {
incoming = true
}
htlc := &HTLC{
Incoming: incoming,
Amt: 50000,
RHash: key,
RefundTimeout: i,
RevocationTimeout: i + 2,
}
htlcs = append(htlcs, htlc)
}
// Create a new channel delta which includes the above HTLC's, some
// balance updates, and an increment of the current commitment height.
// Additionally, modify the signature and commitment transaction.
newSequence := uint32(129498)
newSig := bytes.Repeat([]byte{3}, 71)
delta := &ChannelDelta{
RevocationHash: key,
RevocationKey: pubKey,
LocalBalance: btcutil.Amount(1e8),
RemoteBalance: btcutil.Amount(1e8),
Htlcs: htlcs,
UpdateNum: 1,
}
channel.OurCommitTx.TxIn[0].Sequence = newSequence
channel.OurCommitSig = newSig
if err := channel.RecordChannelDelta(delta); err != nil {
t.Fatalf("unable to record channel delta: %v", err)
}
// The balances, new update, and the changes to the fake commitment
// transaction along with the modified signature should all have been
// updated.
nodeID := wire.ShaHash(channel.TheirLNID)
updatedChannel, err := cdb.FetchOpenChannels(&nodeID)
if err != nil {
t.Fatalf("unable to fetch updated channel: %v", err)
}
if !bytes.Equal(updatedChannel[0].OurCommitSig, newSig) {
t.Fatalf("sigs don't match %x vs %x",
updatedChannel[0].OurCommitSig, newSig)
}
if updatedChannel[0].OurCommitTx.TxIn[0].Sequence != newSequence {
t.Fatalf("sequence numbers don't match: %v vs %v",
updatedChannel[0].OurCommitTx.TxIn[0].Sequence, newSequence)
}
if updatedChannel[0].OurBalance != delta.LocalBalance {
t.Fatalf("local balances don't match: %v vs %v",
updatedChannel[0].OurBalance, delta.LocalBalance)
}
if updatedChannel[0].TheirBalance != delta.RemoteBalance {
t.Fatalf("remote balances don't match: %v vs %v",
updatedChannel[0].TheirBalance, delta.RemoteBalance)
}
if updatedChannel[0].NumUpdates != uint64(delta.UpdateNum) {
t.Fatalf("update # doesn't match: %v vs %v",
updatedChannel[0].NumUpdates, delta.UpdateNum)
}
// We should be able to fetch the channel delta created above by it's
// update number with all the state properly reconstructed.
diskDelta, err := channel.FindPreviousState(uint64(delta.UpdateNum))
if err != nil {
t.Fatalf("unable to fetch past delta: %v", err)
}
// The two deltas (the original vs the on-disk version) should
// identical, and all HTLC data should properly be retained.
if !bytes.Equal(delta.RevocationHash[:], diskDelta.RevocationHash[:]) {
t.Fatalf("revocation hashes don't match")
}
if !bytes.Equal(delta.RevocationKey.SerializeCompressed(),
diskDelta.RevocationKey.SerializeCompressed()) {
t.Fatalf("revocation keys don't match")
}
if delta.LocalBalance != diskDelta.LocalBalance {
t.Fatalf("local balances don't match")
}
if delta.RemoteBalance != diskDelta.RemoteBalance {
t.Fatalf("remote balances don't match")
}
if delta.UpdateNum != diskDelta.UpdateNum {
t.Fatalf("update number doesn't match")
}
for i := 0; i < len(delta.Htlcs); i++ {
originalHTLC := delta.Htlcs[i]
diskHTLC := diskDelta.Htlcs[i]
if !reflect.DeepEqual(originalHTLC, diskHTLC) {
t.Fatalf("htlc's dont match: %v vs %v",
spew.Sdump(originalHTLC),
spew.Sdump(diskHTLC))
}
}
}

@ -7,4 +7,5 @@ var (
ErrNoActiveChannels = fmt.Errorf("no active channels exist")
ErrChannelNoExist = fmt.Errorf("this channel does not exist")
ErrNoPastDeltas = fmt.Errorf("channel has no recorded deltas")
)

@ -178,7 +178,7 @@ func senderHTLCScript(absoluteTimeout, relativeTimeout uint32, senderKey,
// Alternatively, the receiver can place a 0 as the second item of the
// witness stack if they wish to claim the HTLC with the proper
// pre-image as normal. In order to prevent an over-sized pre-image
// attack (which can create undesirable redemption asymmerties, we
// attack (which can create undesirable redemption asymmerties), we
// strongly require that all HTLC pre-images are exactly 32 bytes.
builder.AddOp(txscript.OP_ELSE)
builder.AddOp(txscript.OP_SIZE)