diff --git a/channeldb/channel.go b/channeldb/channel.go index b3230550..0d7e6cd6 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -75,6 +75,10 @@ var ( // commitment transactions in addition to the csvDelay for both. commitTxnsKey = []byte("ctk") + // currentHtlcKey stores the set of fully locked-in HTLC's on our + // latest commitment state. + currentHtlcKey = []byte("chk") + // fundingTxnKey stroes the funding tx, our encrypted multi-sig key, // and finally 2-of-2 multisig redeem script. fundingTxnKey = []byte("fsk") @@ -100,12 +104,8 @@ type OpenChannel struct { TheirLNID [wire.HashSize]byte // The ID of a channel is the txid of the funding transaction. - ChanID *wire.OutPoint - + ChanID *wire.OutPoint MinFeePerKb btcutil.Amount - // Our reserve. Assume symmetric reserve amounts. Only needed if the - // funding type is CLTV. - //ReserveAmount btcutil.Amount // Keys for both sides to be used for the commitment transactions. OurCommitKey *btcec.PublicKey @@ -152,6 +152,8 @@ type OpenChannel struct { TotalNetFees uint64 // TODO(roasbeef): total fees paid too? CreationTime time.Time // TODO(roasbeef): last update time? + Htlcs []*HTLC + // TODO(roasbeef): eww Db *DB @@ -165,6 +167,9 @@ type OpenChannel struct { // NOTE: This method requires an active EncryptorDecryptor to be registered in // order to encrypt sensitive information. func (c *OpenChannel) FullSync() error { + c.Lock() + defer c.Unlock() + return c.Db.store.Update(func(tx *bolt.Tx) error { // TODO(roasbeef): add helper funcs to create scoped update // First fetch the top level bucket which stores all data related to @@ -192,34 +197,61 @@ func (c *OpenChannel) FullSync() error { return err } if chanIDBucket.Get(b.Bytes()) == nil { - chanIDBucket.Put(b.Bytes(), nil) + if err := chanIDBucket.Put(b.Bytes(), nil); err != nil { + return err + } } return putOpenChannel(chanBucket, nodeChanBucket, c) }) } -// SyncRevocation writes to disk the current revocation state of the channel. -// The revocation state is defined as the current elkrem receiver, and the -// latest unrevoked key+hash for the remote party. -func (c *OpenChannel) SyncRevocation() error { +// UpdateCommitment updates the on-disk state of our currently broadcastable +// commitment state. This method is to be called once we have revoked our prior +// commitment state, accepting the new state as defined by the passed +// parameters. +func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx, + newSig []byte, delta *ChannelDelta) error { + + c.Lock() + defer c.Unlock() + return c.Db.store.Update(func(tx *bolt.Tx) error { - // First fetch the top level bucket which stores all data related to - // current, active channels. chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) if err != nil { return err } - // Within this top level bucket, fetch the bucket dedicated to storing - // open channel data specific to the remote node. - nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(c.TheirLNID[:]) + id := c.TheirLNID[:] + nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id) if err != nil { return err } - // Sync the current elkrem state to disk. - if err := putChanEklremState(nodeChanBucket, c); err != nil { + // TODO(roasbeef): modify the funcs below to take values + // directly, otherwise need to roll back to prior state. Could + // also make copy above, then modify to pass in. + c.OurCommitTx = newCommitment + c.OurCommitSig = newSig + c.OurBalance = delta.LocalBalance + c.TheirBalance = delta.RemoteBalance + c.NumUpdates = uint64(delta.UpdateNum) + c.Htlcs = delta.Htlcs + + // First we'll write out the current latest dynamic channel + // state: the current channel balance, the number of updates, + // and our latest commitment transaction+sig. + // TODO(roasbeef): re-make schema s.t this is a single put + if err := putChanCapacity(chanBucket, c); err != nil { + return err + } + if err := putChanNumUpdates(chanBucket, c); err != nil { + return err + } + if err := putChanCommitTxns(nodeChanBucket, c); err != nil { + return err + } + if err := putCurrentHtlcs(nodeChanBucket, delta.Htlcs, c.ChanID); err != nil { return err } @@ -245,11 +277,24 @@ type HTLC struct { // must wait before reclaiming the funds in limbo. RefundTimeout uint32 - // RevocationTimeout is the relative timeout the party who broadcasts + // RevocationDelay is the relative timeout the party who broadcasts // the commitment transaction must wait before being able to fully // sweep the funds on-chain in the case of a unilateral channel // closure. - RevocationTimeout uint32 + RevocationDelay uint32 +} + +// Copy returns a full copy of the target HTLC. +func (h *HTLC) Copy() HTLC { + clone := HTLC{ + Incoming: h.Incoming, + Amt: h.Amt, + RefundTimeout: h.RefundTimeout, + RevocationDelay: h.RevocationDelay, + } + copy(clone.RHash[:], h.RHash[:]) + + return clone } // ChannelDelta is a snapshot of the commitment state at a particular point in @@ -263,13 +308,12 @@ type ChannelDelta struct { Htlcs []*HTLC } -// RecordChannelDelta records the new state transition within an on-disk -// append-only log which records all state transitions. Additionally, the -// internal balances and update counter of the target OpenChannel are updated -// accordingly based on the passed delta. -func (c *OpenChannel) RecordChannelDelta(newCommitment *wire.MsgTx, - newSig []byte, delta *ChannelDelta) error { - +// AppendToRevocationLog records the new state transition within an on-disk +// append-only log which records all state transitions by the remote peer. In +// the case of an uncooperative broadcast of a prior state by the remote peer, +// this log can be consulted in order to reconstruct the state needed to +// rectify the situation. +func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error { return c.Db.store.Update(func(tx *bolt.Tx) error { chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) if err != nil { @@ -278,32 +322,19 @@ func (c *OpenChannel) RecordChannelDelta(newCommitment *wire.MsgTx, id := c.TheirLNID[:] nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id) - if nodeChanBucket == nil { - return ErrNoActiveChannels - } - - // TODO(roasbeef): revisit in-line mutation - c.OurCommitTx = newCommitment - c.OurBalance = delta.LocalBalance - c.TheirBalance = delta.RemoteBalance - c.OurCommitSig = newSig - c.NumUpdates = uint64(delta.UpdateNum) - - // First we'll write out the current latest dynamic channel - // state: the current channel balance, the number of updates, - // and our latest commitment transaction+sig. - if err := putChanCapacity(chanBucket, c); err != nil { - return err - } - if err := putChanNumUpdates(chanBucket, c); err != nil { - return err - } - if err := putChanCommitTxns(nodeChanBucket, c); err != nil { + if err != nil { return err } - // With the current state updated, append a new log entry - // recording this the delta of this state transition. + // Persist the latest elkrem state to disk as the remote peer + // has just added to our local elkrem receiver, and given us a + // new pending revocation key. + if err := putChanElkremState(nodeChanBucket, c); err != nil { + return err + } + + // With the current elkrem state updated, append a new log + // entry recording this the delta of this state transition. // TODO(roasbeef): could make the deltas relative, would save // space, but then tradeoff for more disk-seeks to recover the // full state. @@ -355,6 +386,7 @@ func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelDelta, error) // entails deleting all saved state within the database concerning this // channel, as well as created a small channel summary for record keeping // purposes. +// TODO(roasbeef): delete on-disk set of HTLC's func (c *OpenChannel) CloseChannel() error { return c.Db.store.Update(func(tx *bolt.Tx) error { // First fetch the top level bucket which stores all data related to @@ -415,15 +447,16 @@ type ChannelSnapshot struct { TotalSatoshisSent uint64 TotalSatoshisReceived uint64 - // TODO(roasbeef): fee stuff - updateNum uint64 - channel *OpenChannel + Htlcs []HTLC } // Snapshot returns a read-only snapshot of the current channel state. This // snapshot includes information concerning the current settled balance within // the channel, meta-data detailing total flows, and any outstanding HTLCs. func (c *OpenChannel) Snapshot() *ChannelSnapshot { + c.RLock() + defer c.RUnlock() + snapshot := &ChannelSnapshot{ ChannelPoint: c.ChanID, Capacity: c.Capacity, @@ -435,8 +468,12 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot { } copy(snapshot.RemoteID[:], c.TheirLNID[:]) - // TODO(roasbeef): cache current channel delta in memory, either merge - // or replace with ChannelSnapshot + // Copy over the current set of HTLC's to ensure the caller can't + // mutate our internal state. + snapshot.Htlcs = make([]HTLC, len(c.Htlcs)) + for i, h := range c.Htlcs { + snapshot.Htlcs[i] = h.Copy() + } return snapshot } @@ -491,12 +528,16 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err := putChanFundingInfo(nodeChanBucket, channel); err != nil { return err } - if err := putChanEklremState(nodeChanBucket, channel); err != nil { + if err := putChanElkremState(nodeChanBucket, channel); err != nil { return err } if err := putChanDeliveryScripts(nodeChanBucket, channel); err != nil { return err } + if err := putCurrentHtlcs(nodeChanBucket, channel.Htlcs, + channel.ChanID); err != nil { + return err + } return nil } @@ -508,46 +549,51 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, chanID *wire.OutPoint) (*OpenChannel, error) { + var err error channel := &OpenChannel{ ChanID: chanID, } // First, read out the fields of the channel update less frequently. - if err := fetchChannelIDs(nodeChanBucket, channel); err != nil { + if err = fetchChannelIDs(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanCommitKeys(nodeChanBucket, channel); err != nil { + if err = fetchChanCommitKeys(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanCommitTxns(nodeChanBucket, channel); err != nil { + if err = fetchChanCommitTxns(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanFundingInfo(nodeChanBucket, channel); err != nil { + if err = fetchChanFundingInfo(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanEklremState(nodeChanBucket, channel); err != nil { + if err = fetchChanElkremState(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanDeliveryScripts(nodeChanBucket, channel); err != nil { + if err = fetchChanDeliveryScripts(nodeChanBucket, channel); err != nil { + return nil, err + } + channel.Htlcs, err = fetchCurrentHtlcs(nodeChanBucket, chanID) + if err != nil { return nil, err } // With the existence of an open channel bucket with this node verified, // perform a full read of the entire struct. Starting with the prefixed // fields residing in the parent bucket. - if err := fetchChanCapacity(openChanBucket, channel); err != nil { + if err = fetchChanCapacity(openChanBucket, channel); err != nil { return nil, err } - if err := fetchChanMinFeePerKb(openChanBucket, channel); err != nil { + if err = fetchChanMinFeePerKb(openChanBucket, channel); err != nil { return nil, err } - if err := fetchChanNumUpdates(openChanBucket, channel); err != nil { + if err = fetchChanNumUpdates(openChanBucket, channel); err != nil { return nil, err } - if err := fetchChanTotalFlow(openChanBucket, channel); err != nil { + if err = fetchChanTotalFlow(openChanBucket, channel); err != nil { return nil, err } - if err := fetchChanNetFee(openChanBucket, channel); err != nil { + if err = fetchChanNetFee(openChanBucket, channel); err != nil { return nil, err } @@ -589,7 +635,7 @@ func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err := deleteChanFundingInfo(nodeChanBucket, channelID); err != nil { return err } - if err := deleteChanEklremState(nodeChanBucket, channelID); err != nil { + if err := deleteChanElkremState(nodeChanBucket, channelID); err != nil { return err } if err := deleteChanDeliveryScripts(nodeChanBucket, channelID); err != nil { @@ -1118,7 +1164,7 @@ func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) err return nil } -func putChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { +func putChanElkremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var bc bytes.Buffer if err := writeOutpoint(&bc, channel.ChanID); err != nil { return err @@ -1157,14 +1203,14 @@ func putChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error return nodeChanBucket.Put(elkremKey, b.Bytes()) } -func deleteChanEklremState(nodeChanBucket *bolt.Bucket, chanID []byte) error { +func deleteChanElkremState(nodeChanBucket *bolt.Bucket, chanID []byte) error { elkremKey := make([]byte, len(elkremStateKey)+len(chanID)) copy(elkremKey[:3], elkremStateKey) copy(elkremKey[3:], chanID) return nodeChanBucket.Delete(elkremKey) } -func fetchChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { +func fetchChanElkremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer if err := writeOutpoint(&b, channel.ChanID); err != nil { return err @@ -1286,7 +1332,7 @@ func serializeHTLC(w io.Writer, h *HTLC) error { n += copy(buf[n:], h.RHash[:]) byteOrder.PutUint32(buf[n:], h.RefundTimeout) n += 4 - byteOrder.PutUint32(buf[n:], h.RevocationTimeout) + byteOrder.PutUint32(buf[n:], h.RevocationDelay) n += 4 if _, err := w.Write(buf[:]); err != nil { @@ -1327,11 +1373,65 @@ func deserializeHTLC(r io.Reader) (*HTLC, error) { if _, err := r.Read(scratch[:4]); err != nil { return nil, err } - h.RevocationTimeout = byteOrder.Uint32(scratch[:]) + h.RevocationDelay = byteOrder.Uint32(scratch[:]) return h, nil } +func makeHtlcKey(o *wire.OutPoint) [39]byte { + var ( + n int + k [39]byte + ) + + // chk || txid || index + n += copy(k[:], currentHtlcKey) + n += copy(k[n:], o.Hash[:]) + var scratch [4]byte + byteOrder.PutUint32(scratch[:], o.Index) + copy(k[n:], scratch[:]) + + return k +} + +func putCurrentHtlcs(nodeChanBucket *bolt.Bucket, htlcs []*HTLC, + o *wire.OutPoint) error { + var b bytes.Buffer + + for _, htlc := range htlcs { + if err := serializeHTLC(&b, htlc); err != nil { + return err + } + } + + htlcKey := makeHtlcKey(o) + return nodeChanBucket.Put(htlcKey[:], b.Bytes()) +} + +func fetchCurrentHtlcs(nodeChanBucket *bolt.Bucket, + o *wire.OutPoint) ([]*HTLC, error) { + + htlcKey := makeHtlcKey(o) + htlcBytes := nodeChanBucket.Get(htlcKey[:]) + if htlcBytes == nil { + return nil, nil + } + + // TODO(roasbeef): can preallocate here + var htlcs []*HTLC + htlcReader := bytes.NewReader(htlcBytes) + for htlcReader.Len() != 0 { + htlc, err := deserializeHTLC(htlcReader) + if err != nil { + return nil, err + } + + htlcs = append(htlcs, htlc) + } + + return htlcs, nil +} + func serializeChannelDelta(w io.Writer, delta *ChannelDelta) error { // TODO(roasbeef): could use compression here to reduce on-disk space. var scratch [8]byte @@ -1401,39 +1501,41 @@ func deserializeChannelDelta(r io.Reader) (*ChannelDelta, error) { return delta, nil } +func makeLogKey(o *wire.OutPoint, updateNum uint32) [40]byte { + var ( + scratch [4]byte + n int + k [40]byte + ) + + n += copy(k[:], o.Hash[:]) + + byteOrder.PutUint32(scratch[:], o.Index) + copy(k[n:], scratch[:]) + n += 4 + + byteOrder.PutUint32(scratch[:], updateNum) + copy(k[n:], scratch[:]) + + return k +} + func appendChannelLogEntry(log *bolt.Bucket, delta *ChannelDelta, chanPoint *wire.OutPoint) error { - // First construct the key for this particular log entry. The key for - // each newly added log entry is: channelPoint || stateNum. - var logEntrykey [36 + 4]byte - copy(logEntrykey[:], chanPoint.Hash[:]) - var scratch [4]byte - byteOrder.PutUint32(scratch[:], delta.UpdateNum) - copy(logEntrykey[36:], scratch[:]) - - // With the key constructed, serialize the delta to raw bytes, then - // write the new state to disk. var b bytes.Buffer if err := serializeChannelDelta(&b, delta); err != nil { return err } + logEntrykey := makeLogKey(chanPoint, delta.UpdateNum) return log.Put(logEntrykey[:], b.Bytes()) } func fetchChannelLogEntry(log *bolt.Bucket, chanPoint *wire.OutPoint, updateNum uint32) (*ChannelDelta, error) { - // First construct the key for this particular log entry. The key for - // each newly added log entry is: channelPoint || stateNum. - // TODO(roasbeef): make into func.. - var logEntrykey [36 + 4]byte - copy(logEntrykey[:], chanPoint.Hash[:]) - var scratch [4]byte - byteOrder.PutUint32(scratch[:], updateNum) - copy(logEntrykey[36:], scratch[:]) - + logEntrykey := makeLogKey(chanPoint, updateNum) deltaBytes := log.Get(logEntrykey[:]) if deltaBytes == nil { return nil, fmt.Errorf("log entry not found") diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 5f5e09f3..41419d71 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -171,10 +171,21 @@ func TestOpenChannelPutGetDelete(t *testing.T) { } defer cleanUp() + // Create the test channel state, then add an additional fake HTLC + // before syncing to disk. state, err := createTestChannelState(cdb) if err != nil { t.Fatalf("unable to create channel state: %v", err) } + state.Htlcs = []*HTLC{ + &HTLC{ + Incoming: true, + Amt: 10, + RHash: key, + RefundTimeout: 1, + RevocationDelay: 2, + }, + } if err := state.FullSync(); err != nil { t.Fatalf("unable to save and serialize channel state: %v", err) } @@ -302,6 +313,10 @@ func TestOpenChannelPutGetDelete(t *testing.T) { if !bytes.Equal(newState.TheirCurrentRevocationHash[:], state.TheirCurrentRevocationHash[:]) { t.Fatalf("revocation hashes don't match") } + if !reflect.DeepEqual(state.Htlcs[0], newState.Htlcs[0]) { + t.Fatalf("htlcs don't match: %v vs %v", spew.Sdump(state.Htlcs[0]), + spew.Sdump(newState.Htlcs[0])) + } // Finally to wrap up the test, delete the state of the channel within // the database. This involves "closing" the channel which removes all @@ -324,7 +339,7 @@ func TestOpenChannelPutGetDelete(t *testing.T) { } } -func TestChannelStateUpdateLog(t *testing.T) { +func TestChannelStateTransition(t *testing.T) { cdb, cleanUp, err := makeTestDB() if err != nil { t.Fatalf("uanble to make test database: %v", err) @@ -350,11 +365,11 @@ func TestChannelStateUpdateLog(t *testing.T) { incoming = true } htlc := &HTLC{ - Incoming: incoming, - Amt: 50000, - RHash: key, - RefundTimeout: i, - RevocationTimeout: i + 2, + Incoming: incoming, + Amt: 50000, + RHash: key, + RefundTimeout: i, + RevocationDelay: i + 2, } htlcs = append(htlcs, htlc) } @@ -372,13 +387,15 @@ func TestChannelStateUpdateLog(t *testing.T) { Htlcs: htlcs, UpdateNum: 1, } - if err := channel.RecordChannelDelta(newTx, newSig, delta); err != nil { - t.Fatalf("unable to record channel delta: %v", err) + + // First update the local node's broadcastable state. + if err := channel.UpdateCommitment(newTx, newSig, delta); err != nil { + t.Fatalf("unable to update commitment: %v", err) } - // The balances, new update, and the changes to the fake commitment - // transaction along with the modified signature should all have been - // updated. + // The balances, new update, the HTLC's and the changes to the fake + // commitment transaction along with the modified signature should all + // have been updated. nodeID := wire.ShaHash(channel.TheirLNID) updatedChannel, err := cdb.FetchOpenChannels(&nodeID) if err != nil { @@ -404,6 +421,25 @@ func TestChannelStateUpdateLog(t *testing.T) { t.Fatalf("update # doesn't match: %v vs %v", updatedChannel[0].NumUpdates, delta.UpdateNum) } + for i := 0; i < len(updatedChannel[0].Htlcs); i++ { + originalHTLC := updatedChannel[0].Htlcs[i] + diskHTLC := channel.Htlcs[i] + if !reflect.DeepEqual(originalHTLC, diskHTLC) { + t.Fatalf("htlc's dont match: %v vs %v", + spew.Sdump(originalHTLC), + spew.Sdump(diskHTLC)) + } + } + + // Next, write to the log which tracks the necessary revocation state + // needed to rectify any fishy behavior by the remote party. Modify the + // current uncollapsed revocation state to simulate a state transition + // by the remote party. + newRevocation := bytes.Repeat([]byte{9}, 32) + copy(channel.TheirCurrentRevocationHash[:], newRevocation) + if err := channel.AppendToRevocationLog(delta); err != nil { + t.Fatalf("unable to append to revocation log: %v", err) + } // We should be able to fetch the channel delta created above by it's // update number with all the state properly reconstructed. @@ -432,4 +468,13 @@ func TestChannelStateUpdateLog(t *testing.T) { spew.Sdump(diskHTLC)) } } + // The revocation state stored on-disk should now also be identical. + updatedChannel, err = cdb.FetchOpenChannels(&nodeID) + if err != nil { + t.Fatalf("unable to fetch updated channel: %v", err) + } + if !bytes.Equal(updatedChannel[0].TheirCurrentRevocationHash[:], + newRevocation) { + t.Fatalf("revocation state wasn't synced!") + } }