From 28dd6e5d843fa5d477fb8717b3561f3edd3aea43 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Mon, 14 Aug 2017 12:14:04 +0300 Subject: [PATCH] channeldb+lnwallet: add remote pending commiment persistance --- channeldb/channel.go | 124 +++++++++++++++++++++++++++++++++++++++++++ lnwallet/channel.go | 99 +++++++++++++++++++++++++++++++--- 2 files changed, 215 insertions(+), 8 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index e6a756b6..1fcae57c 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -36,6 +36,9 @@ var ( // active channels. chanIDBucket = []byte("cib") + // commitDiffBucket... + commitDiffBucket = []byte("cdb") + // closedChannelBucket stores summarization information concerning // previously open, but now closed channels. closedChannelBucket = []byte("ccb") @@ -337,6 +340,9 @@ type OpenChannel struct { // within the channel. Htlcs []*HTLC + // LastUpdates... + LastUpdates lnwire.Message + // TODO(roasbeef): eww Db *DB @@ -586,6 +592,114 @@ type ChannelDelta struct { Htlcs []*HTLC } +// CommitDiff... +type CommitDiff struct { + // PendingHeight... + PendingHeight uint64 + + // PendingCommitment... + PendingCommitment *ChannelDelta + + // Updates... + Updates []lnwire.Message +} + +// decode... +func (d *CommitDiff) decode(w io.Writer) error { + var h [8]byte + binary.BigEndian.PutUint64(h[:], d.PendingHeight) + if _, err := w.Write(h[:]); err != nil { + return err + } + + if err := serializeChannelDelta(w, d.PendingCommitment); err != nil { + return err + } + + var l [2]byte + binary.BigEndian.PutUint16(l[:], uint16(len(d.Updates))) + if _, err := w.Write(l[:]); err != nil { + return err + } + + for _, msg := range d.Updates { + if _, err := lnwire.WriteMessage(w, msg, 0); err != nil { + return err + } + } + + return nil +} + +// encode... +func (d *CommitDiff) encode(r io.Reader) error { + var h [8]byte + if _, err := r.Read(h[:]); err != nil { + return err + } + d.PendingHeight = binary.BigEndian.Uint64(h[:]) + + delta, err := deserializeChannelDelta(r) + if err != nil { + return err + } + d.PendingCommitment = delta + + var l [2]byte + if _, err := r.Read(l[:]); err != nil { + return err + } + d.Updates = make([]lnwire.Message, binary.BigEndian.Uint16(l[:])) + + for i, _ := range d.Updates { + msg, err := lnwire.ReadMessage(r, 0) + if err != nil { + return err + } + d.Updates[i] = msg + } + + return nil +} + +// AddCommitDiff... +func AddCommitDiff(db *DB, diff *CommitDiff) error { + return db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(commitDiffBucket) + if err != nil { + return err + } + + var b bytes.Buffer + if err := diff.decode(&b); err != nil { + return err + } + + return bucket.Put([]byte("cdf"), b.Bytes()) + }) +} + +// FetchCommitDiff... +func FetchCommitDiff(db *DB) (*CommitDiff, error) { + var diff *CommitDiff + err := db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(commitDiffBucket) + if bucket == nil { + return errors.New("commit diff bucket haven't been found") + } + + data := bucket.Get([]byte("cdf")) + if data != nil { + return errors.New("unable to find commit diff") + } + + diff = &CommitDiff{} + return diff.encode(bytes.NewReader(data)) + }) + + return diff, err +} + // InsertNextRevocation inserts the _next_ commitment point (revocation) into // the database, and also modifies the internal RemoteNextRevocation attribute // to point to the passed key. This method is to be using during final channel @@ -651,6 +765,16 @@ func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error { return err } + // ... + diffBucket := tx.Bucket(commitDiffBucket) + if diffBucket != nil { + if diffBucket.Get([]byte("cdf")) != nil { + if err := diffBucket.Delete([]byte("cdf")); err != nil { + return err + } + } + } + return appendChannelLogEntry(logBucket, delta, &c.FundingOutpoint) }) } diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 81c390af..12d9bdbd 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -1003,20 +1003,39 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier, err != channeldb.ErrNoPastDeltas { return nil, err } - remoteCommitment := &commitment{ - ourBalance: state.LocalBalance, - ourMessageIndex: 0, - theirBalance: state.RemoteBalance, - theirMessageIndex: 0, - fee: state.CommitFee, - feePerKw: state.FeePerKw, - } + remoteCommitment := &commitment{} if logTail == nil { + remoteCommitment.ourBalance = state.LocalBalance + remoteCommitment.ourMessageIndex = 0 + remoteCommitment.theirBalance = state.RemoteBalance + remoteCommitment.theirMessageIndex = 0 + remoteCommitment.fee = state.CommitFee + remoteCommitment.feePerKw = state.FeePerKw remoteCommitment.height = 0 } else { + remoteCommitment.ourBalance = logTail.LocalBalance + remoteCommitment.ourMessageIndex = 0 + remoteCommitment.theirBalance = logTail.RemoteBalance + remoteCommitment.theirMessageIndex = 0 + remoteCommitment.fee = logTail.CommitFee + remoteCommitment.feePerKw = logTail.FeePerKw remoteCommitment.height = logTail.UpdateNum + 1 } lc.remoteCommitChain.addCommitment(remoteCommitment) + + commitDiff, err := channeldb.FetchCommitDiff(lc.channelState.Db) + if err == nil { + lc.remoteCommitChain.addCommitment(&commitment{ + height: commitDiff.PendingHeight, + ourBalance: commitDiff.PendingCommitment.LocalBalance, + theirBalance: commitDiff.PendingCommitment.RemoteBalance, + ourMessageIndex: 0, + theirMessageIndex: 0, + fee: commitDiff.PendingCommitment.CommitFee, + feePerKw: commitDiff.PendingCommitment.FeePerKw, + }) + } + walletLog.Debugf("ChannelPoint(%v), starting remote commitment: %v", state.FundingOutpoint, newLogClosure(func() string { return spew.Sdump(lc.remoteCommitChain.tail()) @@ -2323,6 +2342,58 @@ func genRemoteHtlcSigJobs(keyRing *commitmentKeyRing, return sigBatch, cancelChan, nil } +// createCommitDiff +func (lc *LightningChannel) createCommitDiff(commitment *commitment) (*channeldb.CommitDiff, + error) { + + chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint) + var htlcs []lnwire.Message + for e := lc.localUpdateLog.Front(); e != nil; e = e.Next() { + pd := e.Value.(*PaymentDescriptor) + + // ... + var htlc lnwire.Message + if pd.addCommitHeightRemote == commitment.height { + switch pd.EntryType { + case Add: + continue + case Settle: + htlc = &lnwire.UpdateFufillHTLC{ + ChanID: chanID, + ID: pd.Index, + PaymentPreimage: pd.RPreimage, + } + case Fail: + htlc = &lnwire.UpdateFailHTLC{ + ChanID: chanID, + ID: pd.Index, + Reason: pd.FailReason, + } + case MalformedFail: + htlc = &lnwire.UpdateFailMalformedHTLC{ + ChanID: chanID, + ID: pd.Index, + ShaOnionBlob: pd.ShaOnionBlob, + FailureCode: pd.FailCode, + } + } + + htlcs = append(htlcs, htlc) + } + } + + delta, err := commitment.toChannelDelta(false) + if err != nil { + return nil, err + } + + return &channeldb.CommitDiff{ + PendingHeight: commitment.height, + PendingCommitment: delta, + Updates: htlcs, + }, nil +} + // SignNextCommitment signs a new commitment which includes any previous // unsettled HTLCs, any new HTLCs, and any modifications to prior HTLCs // committed in previous commitment updates. Signing a new commitment @@ -2443,6 +2514,18 @@ func (lc *LightningChannel) SignNextCommitment() (*btcec.Signature, []*btcec.Sig // latest commitment update. lc.remoteCommitChain.addCommitment(newCommitView) + // ... + commitDiff, err := lc.createCommitDiff(newCommitView) + if err != nil { + return nil, nil, err + } + + // ... + if err := channeldb.AddCommitDiff(lc.channelState.Db, commitDiff); err != nil { + fmt.Println(err) + return nil, nil, err + } + // If we are the channel initiator then we would have signed any sent // fee update at this point, so mark this update as pending ACK, and // set pendingFeeUpdate to nil. We can do this since we know we won't