diff --git a/channeldb/channel.go b/channeldb/channel.go index 742ce2ef..b3e6cdd4 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -2,6 +2,7 @@ package channeldb import ( "bytes" + "fmt" "io" "sync" "time" @@ -87,12 +88,13 @@ var ( deliveryScriptsKey = []byte("dsk") ) -// OpenChannel... -// TODO(roasbeef): Copy/Clone method, so CoW on writes? -// * CoW method would allow for intelligent partial writes for updates -// TODO(roasbeef): UpdateState(func (newChan *OpenChannel) error) -// * need mutex, invarient that all reads/writes grab the mutex -// * needs to also return two slices of added then removed HTLC's +// OpenChannel encapsulates the persistent and dynamic state of an open channel +// with a remote node. An open channel supports several options for on-disk +// serialization depending on the exact context. Full (upon channel creation) +// state commitments, and partial (due to a commitment update) writes are +// supported. Each partial write due to a state update appends the new update +// to an on-disk log, which can then subsequently be queried in order to +// "time-travel" to a prior state. type OpenChannel struct { // Hash? or Their current pubKey? TheirLNID [wire.HashSize]byte @@ -150,13 +152,6 @@ type OpenChannel struct { TotalNetFees uint64 // TODO(roasbeef): total fees paid too? CreationTime time.Time // TODO(roasbeef): last update time? - // isPrevState denotes if this instance of an OpenChannel is a previous, - // revoked channel state. If so, then the FullSynv, and UpdateState - // methods are disabled in order to prevent overiding the latest channel - // state. - // TODO(roasbeef): scrap? already have snapshots now? - isPrevState bool - // TODO(roasbeef): eww Db *DB @@ -232,6 +227,132 @@ func (c *OpenChannel) SyncRevocation() error { }) } +// HTLC is the on-disk representation of a hash time-locked contract. HTLC's +// are contained within ChannelDeltas which encode the current state of the +// commitment between state updates. +type HTLC struct { + // Incoming denotes whether we're the receiver or the sender of this + // HTLC. + Incoming bool + + // Amt is the amount of satoshis this HTLC escrows. + Amt btcutil.Amount + + // RHash is the payment hash of the HTLC. + RHash [32]byte + + // RefundTimeout is the absolute timeout on the HTLC that the sender + // must wait before reclaiming the funds in limbo. + RefundTimeout uint32 + + // RevocationTimeout is the relative timeout the party who broadcasts + // the commitment transaction must wait before being able to fully + // sweep the funds on-chain in the case of a unilateral channel + // closure. + RevocationTimeout uint32 +} + +// ChannelDelta is a snapshot of the commitment state at a particular point in +// the commitment chain. With each state transition, a snapshot of the current +// state along with all non-settled HTLC's are recorded. +// TODO(roasbeef): should only need the key instead of hash after refactor +// within state machine. +// * won't actually be needed if it's a past state? +type ChannelDelta struct { + RevocationHash [32]byte + RevocationKey *btcec.PublicKey + + LocalBalance btcutil.Amount + RemoteBalance btcutil.Amount + + Htlcs []*HTLC + + UpdateNum uint32 +} + +// RecordChannelDelta records the new state transition within an on-disk +// append-only log which records all state transitions. Additionally, the +// internal balances and update counter of the target OpenChannel are updated +// accordingly based on the passed delta. +func (c *OpenChannel) RecordChannelDelta(delta *ChannelDelta) error { + return c.Db.store.Update(func(tx *bolt.Tx) error { + chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) + if err != nil { + return err + } + + id := c.TheirLNID[:] + nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id) + if nodeChanBucket == nil { + return ErrNoActiveChannels + } + + c.OurBalance = delta.LocalBalance + c.TheirBalance = delta.RemoteBalance + c.NumUpdates = uint64(delta.UpdateNum) + + // First we'll write out the current latest dynamic channel + // state: the current channel balance, the number of updates, + // and our latest commitment transaction+sig. + if err := putChanCapacity(chanBucket, c); err != nil { + return err + } + if err := putChanNumUpdates(chanBucket, c); err != nil { + return err + } + if err := putChanCommitTxns(nodeChanBucket, c); err != nil { + return err + } + + // With the current state updated, append a new log entry + // recording this the delta of this state transition. + // TODO(roasbeef): could make the deltas relative, would save + // space, but then tradeoff for more disk-seeks to recover the + // full state. + logKey := channelLogBucket + logBucket, err := nodeChanBucket.CreateBucketIfNotExists(logKey) + if err != nil { + return err + } + + return appendChannelLogEntry(logBucket, delta, c.ChanID) + }) +} + +// FindPreviousState scans through the append-only log in an attempt to recover +// the previous channel state indicated by the update number. This method is +// intended to be used for obtaining the relevant data needed to claim all +// funds rightfully spendable in the case of an on-chain broadcast of the +// commitment transaction. +func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelDelta, error) { + delta := &ChannelDelta{} + + err := c.Db.store.View(func(tx *bolt.Tx) error { + chanBucket := tx.Bucket(openChannelBucket) + + nodeChanBucket := chanBucket.Bucket(c.TheirLNID[:]) + if nodeChanBucket == nil { + return ErrNoActiveChannels + } + + logBucket := nodeChanBucket.Bucket(channelLogBucket) + if nodeChanBucket == nil { + return ErrNoPastDeltas + } + + var err error + delta, err = fetchChannelLogEntry(logBucket, c.ChanID, + uint32(updateNum)) + + return err + }) + if err != nil { + return nil, err + } + + return delta, nil +} + // CloseChannel closes a previously active lightning channel. Closing a channel // entails deleting all saved state within the database concerning this // channel, as well as created a small channel summary for record keeping @@ -267,7 +388,7 @@ func (c *OpenChannel) CloseChannel() error { } // Now that the index to this channel has been deleted, purge - // the remaining channel meta-data from the databse. + // the remaining channel meta-data from the database. if err := deleteOpenChannel(chanBucket, nodeChanBucket, outPointBytes); err != nil { return err @@ -282,8 +403,6 @@ func (c *OpenChannel) CloseChannel() error { // ChannelSnapshot is a frozen snapshot of the current channel state. A // snapshot is detached from the original channel that generated it, providing // read-only access to the current or prior state of an active channel. -// TODO(roasbeef): methods to roll forwards/backwards in state etc -// * use botldb cursor? type ChannelSnapshot struct { RemoteID [wire.HashSize]byte @@ -299,8 +418,6 @@ type ChannelSnapshot struct { TotalSatoshisReceived uint64 // TODO(roasbeef): fee stuff - - // TODO(roasbeef): active HTLC's + their direction updateNum uint64 channel *OpenChannel } @@ -320,32 +437,12 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot { } copy(snapshot.RemoteID[:], c.TheirLNID[:]) + // TODO(roasbeef): cache current channel delta in memory, either merge + // or replace with ChannelSnapshot + return snapshot } -// FindPreviousState... -// TODO(roasbeef): method to retrieve both old commitment txns given update # -func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelSnapshot, error) { - return nil, nil -} - -// ChannelDelta... -// TODO(roasbeef): binlog like entry? -type ChannelDelta struct { - // change in allocations - // added + removed htlcs - // index -} - -// RecordChannelDelta -// TODO(roasbeef): only need their commit? -// * or as internal helper func to UpdateState func? -func (c OpenChannel) RecordChannelDelta(theirRevokedCommit *wire.MsgTx, updateNum uint64) error { - // TODO(roasbeef): record all HTLCs, pass those instead? - // * - return nil -} - func putClosedChannelSummary(tx *bolt.Tx, chanID []byte) error { // For now, a summary of a closed channel simply involves recording the // outpoint of the funding transaction. @@ -870,6 +967,7 @@ func putChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error return err } + // TODO(roasbeef): should move this into putChanFundingInfo scratch := make([]byte, 4) byteOrder.PutUint32(scratch, channel.LocalCsvDelay) if _, err := b.Write(scratch); err != nil { @@ -1168,9 +1266,212 @@ func fetchChanDeliveryScripts(nodeChanBucket *bolt.Bucket, channel *OpenChannel) return nil } +// htlcDiskSize represents the number of btyes a serialized HTLC takes up on +// disk. The size of an HTLC on disk is 49 bytes total: incoming (1) + amt (8) +// + rhash (32) + timeouts (8) +const htlcDiskSize = 1 + 8 + 32 + 4 + 4 + +func serializeHTLC(w io.Writer, h *HTLC) error { + var buf [htlcDiskSize]byte + + var boolByte [1]byte + if h.Incoming { + boolByte[0] = 1 + } else { + boolByte[0] = 0 + } + + var n int + n += copy(buf[:], boolByte[:]) + byteOrder.PutUint64(buf[n:], uint64(h.Amt)) + n += 8 + n += copy(buf[n:], h.RHash[:]) + byteOrder.PutUint32(buf[n:], h.RefundTimeout) + n += 4 + byteOrder.PutUint32(buf[n:], h.RevocationTimeout) + n += 4 + + if _, err := w.Write(buf[:]); err != nil { + return err + } + + return nil +} + +func deserializeHTLC(r io.Reader) (*HTLC, error) { + h := &HTLC{} + + var scratch [8]byte + + if _, err := r.Read(scratch[:1]); err != nil { + return nil, err + } + if scratch[0] == 1 { + h.Incoming = true + } else { + h.Incoming = false + } + + if _, err := r.Read(scratch[:]); err != nil { + return nil, err + } + h.Amt = btcutil.Amount(byteOrder.Uint64(scratch[:])) + + if _, err := r.Read(h.RHash[:]); err != nil { + return nil, err + } + + if _, err := r.Read(scratch[:4]); err != nil { + return nil, err + } + h.RefundTimeout = byteOrder.Uint32(scratch[:4]) + + if _, err := r.Read(scratch[:4]); err != nil { + return nil, err + } + h.RevocationTimeout = byteOrder.Uint32(scratch[:]) + + return h, nil +} + +func serializeChannelDelta(w io.Writer, delta *ChannelDelta) error { + if _, err := w.Write(delta.RevocationHash[:]); err != nil { + return err + } + serializeKey := delta.RevocationKey.SerializeCompressed() + if _, err := w.Write(serializeKey); err != nil { + return err + } + + // TODO(roasbeef): could use compression here to reduce on-disk space. + var scratch [8]byte + byteOrder.PutUint64(scratch[:], uint64(delta.LocalBalance)) + if _, err := w.Write(scratch[:]); err != nil { + return err + } + byteOrder.PutUint64(scratch[:], uint64(delta.RemoteBalance)) + if _, err := w.Write(scratch[:]); err != nil { + return err + } + + byteOrder.PutUint32(scratch[:4], delta.UpdateNum) + if _, err := w.Write(scratch[:4]); err != nil { + return err + } + + numHtlcs := uint64(len(delta.Htlcs)) + if err := wire.WriteVarInt(w, 0, numHtlcs); err != nil { + return err + } + for _, htlc := range delta.Htlcs { + if err := serializeHTLC(w, htlc); err != nil { + return err + } + } + + return nil +} + +func deserializeChannelDelta(r io.Reader) (*ChannelDelta, error) { + var ( + err error + scratch [8]byte + key [33]byte + ) + + delta := &ChannelDelta{} + if _, err = r.Read(delta.RevocationHash[:]); err != nil { + return nil, err + } + + if _, err = r.Read(key[:]); err != nil { + return nil, err + } + delta.RevocationKey, err = btcec.ParsePubKey(key[:], btcec.S256()) + if err != nil { + return nil, err + } + + if _, err := r.Read(scratch[:]); err != nil { + return nil, err + } + delta.LocalBalance = btcutil.Amount(byteOrder.Uint64(scratch[:])) + if _, err := r.Read(scratch[:]); err != nil { + return nil, err + } + delta.RemoteBalance = btcutil.Amount(byteOrder.Uint64(scratch[:])) + + if _, err := r.Read(scratch[:4]); err != nil { + return nil, err + } + delta.UpdateNum = byteOrder.Uint32(scratch[:4]) + + numHtlcs, err := wire.ReadVarInt(r, 0) + if err != nil { + return nil, err + } + delta.Htlcs = make([]*HTLC, numHtlcs) + for i := uint64(0); i < numHtlcs; i++ { + htlc, err := deserializeHTLC(r) + if err != nil { + return nil, err + } + + delta.Htlcs[i] = htlc + } + + return delta, nil +} + +func appendChannelLogEntry(log *bolt.Bucket, delta *ChannelDelta, + chanPoint *wire.OutPoint) error { + + // First construct the key for this particular log entry. The key for + // each newly added log entry is: channelPoint || stateNum. + var logEntrykey [36 + 4]byte + copy(logEntrykey[:], chanPoint.Hash[:]) + var scratch [4]byte + byteOrder.PutUint32(scratch[:], delta.UpdateNum) + copy(logEntrykey[36:], scratch[:]) + + // With the key constructed, serialize the delta to raw bytes, then + // write the new state to disk. + var b bytes.Buffer + if err := serializeChannelDelta(&b, delta); err != nil { + return err + } + + return log.Put(logEntrykey[:], b.Bytes()) +} + +func fetchChannelLogEntry(log *bolt.Bucket, chanPoint *wire.OutPoint, + updateNum uint32) (*ChannelDelta, error) { + + // First construct the key for this particular log entry. The key for + // each newly added log entry is: channelPoint || stateNum. + // TODO(roasbeef): make into func.. + var logEntrykey [36 + 4]byte + copy(logEntrykey[:], chanPoint.Hash[:]) + var scratch [4]byte + byteOrder.PutUint32(scratch[:], updateNum) + copy(logEntrykey[36:], scratch[:]) + + deltaBytes := log.Get(logEntrykey[:]) + if deltaBytes == nil { + return nil, fmt.Errorf("log entry not found") + } + + deltaReader := bytes.NewReader(deltaBytes) + + return deserializeChannelDelta(deltaReader) +} + func writeOutpoint(w io.Writer, o *wire.OutPoint) error { + // TODO(roasbeef): make all scratch buffers on the stack scratch := make([]byte, 4) + // TODO(roasbeef): write raw 32 bytes instead of wasting the extra + // byte. if err := wire.WriteVarBytes(w, 0, o.Hash[:]); err != nil { return err } diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index b7ce9345..2afe3abf 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/elkrem" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg" @@ -76,35 +77,44 @@ var ( Hash: key, Index: 0, } + privKey, pubKey = btcec.PrivKeyFromBytes(btcec.S256(), key[:]) ) -func TestOpenChannelPutGetDelete(t *testing.T) { +// makeTestDB creates a new instance of the ChannelDB for testing purposes. A +// callback which cleans up the created temporary directories is also returned +// and intended to be executed after the test completes. +func makeTestDB() (*DB, func(), error) { // First, create a temporary directory to be used for the duration of // this test. - // TODO(roasbeef): move initial set up to something within testing.Main tempDirName, err := ioutil.TempDir("", "channeldb") if err != nil { - t.Fatalf("unable to create temp dir: %v") + return nil, nil, err } - defer os.RemoveAll(tempDirName) // Next, create channeldb for the first time, also setting a mock // EncryptorDecryptor implementation for testing purposes. cdb, err := Open(tempDirName, netParams) if err != nil { - t.Fatalf("unable to create channeldb: %v", err) + return nil, nil, err } - defer cdb.Close() - privKey, pubKey := btcec.PrivKeyFromBytes(btcec.S256(), key[:]) + cleanUp := func() { + os.RemoveAll(tempDirName) + cdb.Close() + } + + return cdb, cleanUp, nil +} + +func createTestChannelState(cdb *DB) (*OpenChannel, error) { addr, err := btcutil.NewAddressPubKey(pubKey.SerializeCompressed(), netParams) if err != nil { - t.Fatalf("unable to create delivery address") + return nil, err } script, err := txscript.MultiSigScript([]*btcutil.AddressPubKey{addr, addr}, 2) if err != nil { - t.Fatalf("unable to create redeemScript") + return nil, err } // Simulate 1000 channel updates via progression of the elkrem @@ -114,15 +124,15 @@ func TestOpenChannelPutGetDelete(t *testing.T) { for i := 0; i < 1000; i++ { preImage, err := sender.AtIndex(uint64(i)) if err != nil { - t.Fatalf("unable to progress elkrem sender: %v", err) + return nil, err } if receiver.AddNext(preImage); err != nil { - t.Fatalf("unable to progress elkrem receiver: %v", err) + return nil, err } } - state := OpenChannel{ + return &OpenChannel{ TheirLNID: key, ChanID: id, MinFeePerKb: btcutil.Amount(5000), @@ -145,14 +155,26 @@ func TestOpenChannelPutGetDelete(t *testing.T) { TheirDeliveryScript: script, LocalCsvDelay: 5, RemoteCsvDelay: 9, - NumUpdates: 1, + NumUpdates: 0, TotalSatoshisSent: 8, TotalSatoshisReceived: 2, TotalNetFees: 9, CreationTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), Db: cdb, - } + }, nil +} +func TestOpenChannelPutGetDelete(t *testing.T) { + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("uanble to make test database: %v", err) + } + defer cleanUp() + + state, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } if err := state.FullSync(); err != nil { t.Fatalf("unable to save and serialize channel state: %v", err) } @@ -302,5 +324,121 @@ func TestOpenChannelPutGetDelete(t *testing.T) { } } -func TestOpenChannelEncodeDecodeCorruption(t *testing.T) { +func TestChannelStateUpdateLog(t *testing.T) { + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("uanble to make test database: %v", err) + } + defer cleanUp() + + // First create a minimal channel, then perform a full sync in order to + // persist the data. + channel, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + if err := channel.FullSync(); err != nil { + t.Fatalf("unable to save and serialize channel state: %v", err) + } + + // Add some HTLC's which were added during this new state transition. + // Half of the HTLC's are incoming, while the other half are outgoing. + var htlcs []*HTLC + for i := uint32(0); i < 10; i++ { + var incoming bool + if i > 5 { + incoming = true + } + htlc := &HTLC{ + Incoming: incoming, + Amt: 50000, + RHash: key, + RefundTimeout: i, + RevocationTimeout: i + 2, + } + htlcs = append(htlcs, htlc) + } + + // Create a new channel delta which includes the above HTLC's, some + // balance updates, and an increment of the current commitment height. + // Additionally, modify the signature and commitment transaction. + newSequence := uint32(129498) + newSig := bytes.Repeat([]byte{3}, 71) + delta := &ChannelDelta{ + RevocationHash: key, + RevocationKey: pubKey, + LocalBalance: btcutil.Amount(1e8), + RemoteBalance: btcutil.Amount(1e8), + Htlcs: htlcs, + UpdateNum: 1, + } + channel.OurCommitTx.TxIn[0].Sequence = newSequence + channel.OurCommitSig = newSig + if err := channel.RecordChannelDelta(delta); err != nil { + t.Fatalf("unable to record channel delta: %v", err) + } + + // The balances, new update, and the changes to the fake commitment + // transaction along with the modified signature should all have been + // updated. + nodeID := wire.ShaHash(channel.TheirLNID) + updatedChannel, err := cdb.FetchOpenChannels(&nodeID) + if err != nil { + t.Fatalf("unable to fetch updated channel: %v", err) + } + if !bytes.Equal(updatedChannel[0].OurCommitSig, newSig) { + t.Fatalf("sigs don't match %x vs %x", + updatedChannel[0].OurCommitSig, newSig) + } + if updatedChannel[0].OurCommitTx.TxIn[0].Sequence != newSequence { + t.Fatalf("sequence numbers don't match: %v vs %v", + updatedChannel[0].OurCommitTx.TxIn[0].Sequence, newSequence) + } + if updatedChannel[0].OurBalance != delta.LocalBalance { + t.Fatalf("local balances don't match: %v vs %v", + updatedChannel[0].OurBalance, delta.LocalBalance) + } + if updatedChannel[0].TheirBalance != delta.RemoteBalance { + t.Fatalf("remote balances don't match: %v vs %v", + updatedChannel[0].TheirBalance, delta.RemoteBalance) + } + if updatedChannel[0].NumUpdates != uint64(delta.UpdateNum) { + t.Fatalf("update # doesn't match: %v vs %v", + updatedChannel[0].NumUpdates, delta.UpdateNum) + } + + // We should be able to fetch the channel delta created above by it's + // update number with all the state properly reconstructed. + diskDelta, err := channel.FindPreviousState(uint64(delta.UpdateNum)) + if err != nil { + t.Fatalf("unable to fetch past delta: %v", err) + } + + // The two deltas (the original vs the on-disk version) should + // identical, and all HTLC data should properly be retained. + if !bytes.Equal(delta.RevocationHash[:], diskDelta.RevocationHash[:]) { + t.Fatalf("revocation hashes don't match") + } + if !bytes.Equal(delta.RevocationKey.SerializeCompressed(), + diskDelta.RevocationKey.SerializeCompressed()) { + t.Fatalf("revocation keys don't match") + } + if delta.LocalBalance != diskDelta.LocalBalance { + t.Fatalf("local balances don't match") + } + if delta.RemoteBalance != diskDelta.RemoteBalance { + t.Fatalf("remote balances don't match") + } + if delta.UpdateNum != diskDelta.UpdateNum { + t.Fatalf("update number doesn't match") + } + for i := 0; i < len(delta.Htlcs); i++ { + originalHTLC := delta.Htlcs[i] + diskHTLC := diskDelta.Htlcs[i] + if !reflect.DeepEqual(originalHTLC, diskHTLC) { + t.Fatalf("htlc's dont match: %v vs %v", + spew.Sdump(originalHTLC), + spew.Sdump(diskHTLC)) + } + } } diff --git a/channeldb/error.go b/channeldb/error.go index 4a87cba3..a60876b1 100644 --- a/channeldb/error.go +++ b/channeldb/error.go @@ -7,4 +7,5 @@ var ( ErrNoActiveChannels = fmt.Errorf("no active channels exist") ErrChannelNoExist = fmt.Errorf("this channel does not exist") + ErrNoPastDeltas = fmt.Errorf("channel has no recorded deltas") ) diff --git a/lnwallet/script_utils.go b/lnwallet/script_utils.go index 01dff4e6..3eafcba1 100644 --- a/lnwallet/script_utils.go +++ b/lnwallet/script_utils.go @@ -178,7 +178,7 @@ func senderHTLCScript(absoluteTimeout, relativeTimeout uint32, senderKey, // Alternatively, the receiver can place a 0 as the second item of the // witness stack if they wish to claim the HTLC with the proper // pre-image as normal. In order to prevent an over-sized pre-image - // attack (which can create undesirable redemption asymmerties, we + // attack (which can create undesirable redemption asymmerties), we // strongly require that all HTLC pre-images are exactly 32 bytes. builder.AddOp(txscript.OP_ELSE) builder.AddOp(txscript.OP_SIZE)