From 9247168c5d28929081c4047b6b4cf1e416e1518b Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Tue, 15 Aug 2017 20:09:16 +0300 Subject: [PATCH] not finished index persistence --- channeldb/channel.go | 242 ++++++++++++++++++++++++++++++++++++---- htlcswitch/link.go | 1 + htlcswitch/link_test.go | 4 +- lnwallet/channel.go | 207 +++++++++++++++++++--------------- 4 files changed, 346 insertions(+), 108 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 1fcae57c..0d832457 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -3,6 +3,7 @@ package channeldb import ( "bytes" "encoding/binary" + "errors" "fmt" "io" "net" @@ -68,6 +69,8 @@ var ( minFeePerKwPrefix = []byte("mfp") chanConfigPrefix = []byte("chan-config") updatePrefix = []byte("uup") + ourIndexPrefix = []byte("tip") + theirIndexPrefix = []byte("oip") satSentPrefix = []byte("ssp") satReceivedPrefix = []byte("srp") commitFeePrefix = []byte("cfp") @@ -328,6 +331,22 @@ type OpenChannel struct { // channel. NumUpdates uint64 + // OurMessageIndex... + OurMessageIndex uint64 + + // TheirMessageIndex... + TheirMessageIndex uint64 + + // OurMessageIndex... + OurAckedIndex uint64 + + // TheirMessageIndex... + TheirAckedIndex uint64 + + // TotalSatoshisSent is the total number of satoshis we've sent within + // this channel. + TotalSatoshisSent uint64 + // TotalMSatSent is the total number of milli-satoshis we've sent // within this channel. TotalMSatSent lnwire.MilliSatoshi @@ -474,6 +493,8 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx, c.LocalBalance = delta.LocalBalance c.RemoteBalance = delta.RemoteBalance c.NumUpdates = delta.UpdateNum + c.OurMessageIndex = delta.OurMessageIndex + c.TheirMessageIndex = delta.TheirMessageIndex c.Htlcs = delta.Htlcs c.CommitFee = delta.CommitFee c.FeePerKw = delta.FeePerKw @@ -491,6 +512,12 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx, if err := putChanNumUpdates(chanBucket, c); err != nil { return err } + if err := putOurMessageIndex(chanBucket, c); err != nil { + return err + } + if err := putTheirMessageIndex(chanBucket, c); err != nil { + return err + } if err := putChanCommitFee(chanBucket, c); err != nil { return err } @@ -509,6 +536,32 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx, }) } +// UpdateHTLCs.... +func (c *OpenChannel) UpdateHTLCs(htlcs []*HTLC) error { + c.Lock() + defer c.Unlock() + + return c.Db.Update(func(tx *bolt.Tx) error { + chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) + if err != nil { + return err + } + + id := c.IdentityPub.SerializeCompressed() + nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id) + if err != nil { + return err + } + + if err := putCurrentHtlcs(nodeChanBucket, htlcs, + &c.FundingOutpoint); err != nil { + return err + } + + return nil + }) +} + // HTLC is the on-disk representation of a hash time-locked contract. HTLCs // are contained within ChannelDeltas which encode the current state of the // commitment between state updates. @@ -542,6 +595,15 @@ type HTLC struct { // OnionBlob is an opaque blob which is used to complete multi-hop // routing. OnionBlob []byte + + // AddLocalInclusionHeight... + AddLocalInclusionHeight uint64 + + // AddRemoteInclusionHeight... + AddRemoteInclusionHeight uint64 + + // DescriptorIndex... + DescriptorIndex uint64 } // Copy returns a full copy of the target HTLC. @@ -565,6 +627,12 @@ func (h *HTLC) Copy() HTLC { // For ourselves (the local node) we ONLY store our most recent (unrevoked) // state for safety purposes. type ChannelDelta struct { + // OurMessageIndex... + OurMessageIndex uint64 + + // TheirMessageIndex... + TheirMessageIndex uint64 + // LocalBalance is our current balance at this particular update // number. LocalBalance lnwire.MilliSatoshi @@ -606,9 +674,7 @@ type CommitDiff struct { // decode... func (d *CommitDiff) decode(w io.Writer) error { - var h [8]byte - binary.BigEndian.PutUint64(h[:], d.PendingHeight) - if _, err := w.Write(h[:]); err != nil { + if err := binary.Write(w, byteOrder, d.PendingHeight); err != nil { return err } @@ -616,9 +682,7 @@ func (d *CommitDiff) decode(w io.Writer) error { return err } - var l [2]byte - binary.BigEndian.PutUint16(l[:], uint16(len(d.Updates))) - if _, err := w.Write(l[:]); err != nil { + if err := binary.Write(w, byteOrder, uint16(len(d.Updates))); err != nil { return err } @@ -633,11 +697,9 @@ func (d *CommitDiff) decode(w io.Writer) error { // encode... func (d *CommitDiff) encode(r io.Reader) error { - var h [8]byte - if _, err := r.Read(h[:]); err != nil { + if err := binary.Read(r, byteOrder, &d.PendingHeight); err != nil { return err } - d.PendingHeight = binary.BigEndian.Uint64(h[:]) delta, err := deserializeChannelDelta(r) if err != nil { @@ -645,11 +707,11 @@ func (d *CommitDiff) encode(r io.Reader) error { } d.PendingCommitment = delta - var l [2]byte - if _, err := r.Read(l[:]); err != nil { + var length uint16 + if err := binary.Read(r, byteOrder, &length); err != nil { return err } - d.Updates = make([]lnwire.Message, binary.BigEndian.Uint16(l[:])) + d.Updates = make([]lnwire.Message, length) for i, _ := range d.Updates { msg, err := lnwire.ReadMessage(r, 0) @@ -663,7 +725,8 @@ func (d *CommitDiff) encode(r io.Reader) error { } // AddCommitDiff... -func AddCommitDiff(db *DB, diff *CommitDiff) error { +func AddCommitDiff(db *DB, fundingOutpoint *wire.OutPoint, + diff *CommitDiff) error { return db.Update(func(tx *bolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(commitDiffBucket) if err != nil { @@ -675,12 +738,19 @@ func AddCommitDiff(db *DB, diff *CommitDiff) error { return err } - return bucket.Put([]byte("cdf"), b.Bytes()) + var outpoint bytes.Buffer + if err := writeOutpoint(&outpoint, fundingOutpoint); err != nil { + return err + } + + key := []byte("cdf") + key = append(key, outpoint.Bytes()...) + return bucket.Put(key, b.Bytes()) }) } // FetchCommitDiff... -func FetchCommitDiff(db *DB) (*CommitDiff, error) { +func FetchCommitDiff(db *DB, fundingOutpoint *wire.OutPoint) (*CommitDiff, error) { var diff *CommitDiff err := db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(commitDiffBucket) @@ -688,8 +758,15 @@ func FetchCommitDiff(db *DB) (*CommitDiff, error) { return errors.New("commit diff bucket haven't been found") } - data := bucket.Get([]byte("cdf")) - if data != nil { + var outpoint bytes.Buffer + if err := writeOutpoint(&outpoint, fundingOutpoint); err != nil { + return err + } + + key := []byte("cdf") + key = append(key, outpoint.Bytes()...) + data := bucket.Get(key) + if data == nil { return errors.New("unable to find commit diff") } @@ -768,8 +845,15 @@ func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error { // ... diffBucket := tx.Bucket(commitDiffBucket) if diffBucket != nil { - if diffBucket.Get([]byte("cdf")) != nil { - if err := diffBucket.Delete([]byte("cdf")); err != nil { + var outpoint bytes.Buffer + if err := writeOutpoint(&outpoint, &c.FundingOutpoint); err != nil { + return err + } + + key := []byte("cdf") + key = append(key, outpoint.Bytes()...) + if diffBucket.Get(key) != nil { + if err := diffBucket.Delete(key); err != nil { return err } } @@ -1241,6 +1325,12 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err := putChanNumUpdates(openChanBucket, channel); err != nil { return err } + if err := putOurMessageIndex(openChanBucket, channel); err != nil { + return err + } + if err := putTheirMessageIndex(openChanBucket, channel); err != nil { + return err + } if err := putChanAmountsTransferred(openChanBucket, channel); err != nil { return err } @@ -1322,6 +1412,12 @@ func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err = fetchChanNumUpdates(openChanBucket, channel); err != nil { return nil, fmt.Errorf("unable to read num updates: %v", err) } + if err = fetchOurMessageIndex(openChanBucket, channel); err != nil { + return nil, fmt.Errorf("unable to read our message index: %v", err) + } + if err = fetchTheirMessageIndex(openChanBucket, channel); err != nil { + return nil, fmt.Errorf("unable to read their message index: %v", err) + } if err = fetchChanAmountsTransferred(openChanBucket, channel); err != nil { return nil, fmt.Errorf("unable to read sat transferred: %v", err) } @@ -1352,6 +1448,12 @@ func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err := deleteChanNumUpdates(openChanBucket, channelID); err != nil { return err } + if err := deleteOurMessageIndex(openChanBucket, channelID); err != nil { + return err + } + if err := deleteTheirMessageIndex(openChanBucket, channelID); err != nil { + return err + } if err := deleteChanAmountsTransferred(openChanBucket, channelID); err != nil { return err } @@ -2182,6 +2284,84 @@ func fetchChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) return nil } +func putOurMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error { + scratch := make([]byte, 8) + byteOrder.PutUint64(scratch, channel.OurMessageIndex) + + var b bytes.Buffer + if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { + return err + } + + keyPrefix := make([]byte, 3+b.Len()) + copy(keyPrefix, ourIndexPrefix) + copy(keyPrefix[3:], b.Bytes()) + + return openChanBucket.Put(keyPrefix, scratch) +} + +func deleteOurMessageIndex(openChanBucket *bolt.Bucket, chanID []byte) error { + keyPrefix := make([]byte, 3+len(chanID)) + copy(keyPrefix, ourIndexPrefix) + copy(keyPrefix[3:], chanID) + return openChanBucket.Delete(keyPrefix) +} + +func fetchOurMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error { + var b bytes.Buffer + if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { + return err + } + + keyPrefix := make([]byte, 3+b.Len()) + copy(keyPrefix, ourIndexPrefix) + copy(keyPrefix[3:], b.Bytes()) + + updateBytes := openChanBucket.Get(keyPrefix) + channel.OurMessageIndex = byteOrder.Uint64(updateBytes) + + return nil +} + +func putTheirMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error { + scratch := make([]byte, 8) + byteOrder.PutUint64(scratch, channel.TheirMessageIndex) + + var b bytes.Buffer + if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { + return err + } + + keyPrefix := make([]byte, 3+b.Len()) + copy(keyPrefix, theirIndexPrefix) + copy(keyPrefix[3:], b.Bytes()) + + return openChanBucket.Put(keyPrefix, scratch) +} + +func deleteTheirMessageIndex(openChanBucket *bolt.Bucket, chanID []byte) error { + keyPrefix := make([]byte, 3+len(chanID)) + copy(keyPrefix, theirIndexPrefix) + copy(keyPrefix[3:], chanID) + return openChanBucket.Delete(keyPrefix) +} + +func fetchTheirMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error { + var b bytes.Buffer + if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { + return err + } + + keyPrefix := make([]byte, 3+b.Len()) + copy(keyPrefix, theirIndexPrefix) + copy(keyPrefix[3:], b.Bytes()) + + updateBytes := openChanBucket.Get(keyPrefix) + channel.TheirMessageIndex = byteOrder.Uint64(updateBytes) + + return nil +} + func serializeHTLC(w io.Writer, h *HTLC) error { if err := wire.WriteVarBytes(w, 0, h.Signature); err != nil { return err @@ -2222,6 +2402,18 @@ func serializeHTLC(w io.Writer, h *HTLC) error { return err } + if err := binary.Write(w, byteOrder, h.AddLocalInclusionHeight); err != nil { + return err + } + + if err := binary.Write(w, byteOrder, h.AddRemoteInclusionHeight); err != nil { + return err + } + + if err := binary.Write(w, byteOrder, h.DescriptorIndex); err != nil { + return err + } + return nil } @@ -2272,6 +2464,18 @@ func deserializeHTLC(r io.Reader) (*HTLC, error) { } } + if err := binary.Read(r, byteOrder, &h.AddLocalInclusionHeight); err != nil { + return nil, err + } + + if err := binary.Read(r, byteOrder, &h.AddRemoteInclusionHeight); err != nil { + return nil, err + } + + if err := binary.Read(r, byteOrder, &h.DescriptorIndex); err != nil { + return nil, err + } + return h, nil } diff --git a/htlcswitch/link.go b/htlcswitch/link.go index b81f3d3f..64c1273a 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -872,6 +872,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } } }() + case *lnwire.UpdateFee: // We received fee update from peer. If we are the initator we // will fail the channel, if not we will apply the update. diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 5e373bde..ccabcb3e 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1828,9 +1828,9 @@ func TestChannelRetransmission(t *testing.T) { serverErr := make(chan error, 4) aliceInterceptor := createInterceptorFunc("[alice] <-- [bob]", - "alice", messages, chanID, false) + "alice", messages, chanID, true) bobInterceptor := createInterceptorFunc("[alice] --> [bob]", - "bob", messages, chanID, false) + "bob", messages, chanID, true) // Add interceptor to check the order of Bob and Alice messages. n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice, diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 12d9bdbd..7a9c3f2e 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -516,61 +516,80 @@ func (c *commitment) populateHtlcIndexes(ourCommitTx bool, // toChannelDelta converts the target commitment into a format suitable to be // written to disk after an accepted state transition. func (c *commitment) toChannelDelta(ourCommit bool) (*channeldb.ChannelDelta, error) { - numHtlcs := len(c.outgoingHTLCs) + len(c.incomingHTLCs) + var ourMessageIndex uint64 + var theirMessageIndex uint64 - delta := &channeldb.ChannelDelta{ - LocalBalance: c.ourBalance, - RemoteBalance: c.theirBalance, - UpdateNum: c.height, - CommitFee: c.fee, - FeePerKw: c.feePerKw, - Htlcs: make([]*channeldb.HTLC, 0, numHtlcs), + if ourCommit { + ourMessageIndex = c.ourMessageIndex + theirMessageIndex = c.theirMessageIndex + } else { + ourMessageIndex = c.theirMessageIndex + theirMessageIndex = c.ourMessageIndex + } + + return &channeldb.ChannelDelta{ + OurMessageIndex: ourMessageIndex, + TheirMessageIndex: theirMessageIndex, + LocalBalance: c.ourBalance, + RemoteBalance: c.theirBalance, + UpdateNum: c.height, + CommitFee: c.fee, + FeePerKw: c.feePerKw, + Htlcs: c.htlcs(ourCommit), + }, nil +} + +// htlcs... +func (c *commitment) htlcs(ourCommit bool) []*channeldb.HTLC { + numHtlcs := len(c.outgoingHTLCs) + len(c.incomingHTLCs) + htlcs := make([]*channeldb.HTLC, 0, numHtlcs) + + pdToHtlc := func(incoming bool, htlc PaymentDescriptor) *channeldb.HTLC { + outputIndex := htlc.localOutputIndex + if !ourCommit { + outputIndex = htlc.remoteOutputIndex + } + + h := &channeldb.HTLC{ + Incoming: incoming, + Amt: htlc.Amount, + RHash: htlc.RHash, + RefundTimeout: htlc.Timeout, + OutputIndex: outputIndex, + OnionBlob: htlc.OnionBlob, + AddLocalInclusionHeight: htlc.addCommitHeightLocal, + AddRemoteInclusionHeight: htlc.addCommitHeightRemote, + DescriptorIndex: htlc.Index, + } + + //if incoming { + // fmt.Println("save, receiver:", + // "remote:", h.AddRemoteInclusionHeight, + // "local:", h.AddLocalInclusionHeight, + // "index:", h.DescriptorIndex) + //} else { + // fmt.Println("save, sender:", + // "remote:", h.AddRemoteInclusionHeight, + // "local:", h.AddLocalInclusionHeight, + // "index:", h.DescriptorIndex) + //} + + if ourCommit && htlc.sig != nil { + h.Signature = htlc.sig.Serialize() + } + + return h } for _, htlc := range c.outgoingHTLCs { - outputIndex := htlc.localOutputIndex - if !ourCommit { - outputIndex = htlc.remoteOutputIndex - } - - h := &channeldb.HTLC{ - Incoming: false, - Amt: htlc.Amount, - RHash: htlc.RHash, - RefundTimeout: htlc.Timeout, - OutputIndex: outputIndex, - } - - if ourCommit && htlc.sig != nil { - h.Signature = htlc.sig.Serialize() - } - - delta.Htlcs = append(delta.Htlcs, h) + htlcs = append(htlcs, pdToHtlc(false, htlc)) } for _, htlc := range c.incomingHTLCs { - outputIndex := htlc.localOutputIndex - if !ourCommit { - outputIndex = htlc.remoteOutputIndex - } - - h := &channeldb.HTLC{ - Incoming: true, - Amt: htlc.Amount, - RHash: htlc.RHash, - RefundTimeout: htlc.Timeout, - OutputIndex: outputIndex, - OnionBlob: htlc.OnionBlob, - } - - if ourCommit && htlc.sig != nil { - h.Signature = htlc.sig.Serialize() - } - - delta.Htlcs = append(delta.Htlcs, h) + htlcs = append(htlcs, pdToHtlc(true, htlc)) } - return delta, nil + return htlcs } // commitmentChain represents a chain of unrevoked commitments. The tail of the @@ -671,18 +690,20 @@ type updateLog struct { } // newUpdateLog creates a new updateLog instance. -func newUpdateLog() *updateLog { +func newUpdateLog(logIndex, ackedIndex uint64) *updateLog { return &updateLog{ List: list.New(), updateIndex: make(map[uint64]*list.Element), htlcIndex: make(map[uint64]*list.Element), + logIndex: logIndex, + ackedIndex: ackedIndex, } } // appendUpdate appends a new update to the tip of the updateLog. The entry is // also added to index accordingly. func (u *updateLog) appendUpdate(pd *PaymentDescriptor) { - u.updateIndex[u.logIndex] = u.PushBack(pd) + u.updateIndex[pd.Index] = u.PushBack(pd) u.logIndex++ } @@ -964,8 +985,8 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier, channelState: state, localChanCfg: &state.LocalChanCfg, remoteChanCfg: &state.RemoteChanCfg, - localUpdateLog: newUpdateLog(), - remoteUpdateLog: newUpdateLog(), + localUpdateLog: newUpdateLog(state.OurMessageIndex, state.OurMessageIndex), + remoteUpdateLog: newUpdateLog(state.TheirMessageIndex, state.TheirMessageIndex), rHashMap: make(map[PaymentHash][]*PaymentDescriptor), Capacity: state.Capacity, FundingWitnessScript: multiSigScript, @@ -983,12 +1004,16 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier, lc.localCommitChain.addCommitment(&commitment{ height: lc.currentHeight, ourBalance: state.LocalBalance, - ourMessageIndex: 0, + ourMessageIndex: state.OurMessageIndex, theirBalance: state.RemoteBalance, - theirMessageIndex: 0, + theirMessageIndex: state.TheirMessageIndex, fee: state.CommitFee, feePerKw: state.FeePerKw, }) + + fmt.Println("local commit restored:", "our:", state.OurMessageIndex, + "their:", state.TheirMessageIndex) + walletLog.Debugf("ChannelPoint(%v), starting local commitment: %v", state.FundingOutpoint, newLogClosure(func() string { return spew.Sdump(lc.localCommitChain.tail()) @@ -1003,37 +1028,42 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier, err != channeldb.ErrNoPastDeltas { return nil, err } + remoteCommitment := &commitment{} if logTail == nil { remoteCommitment.ourBalance = state.LocalBalance - remoteCommitment.ourMessageIndex = 0 + remoteCommitment.ourMessageIndex = state.OurMessageIndex remoteCommitment.theirBalance = state.RemoteBalance - remoteCommitment.theirMessageIndex = 0 + remoteCommitment.theirMessageIndex = state.TheirMessageIndex remoteCommitment.fee = state.CommitFee remoteCommitment.feePerKw = state.FeePerKw remoteCommitment.height = 0 } else { - remoteCommitment.ourBalance = logTail.LocalBalance - remoteCommitment.ourMessageIndex = 0 - remoteCommitment.theirBalance = logTail.RemoteBalance - remoteCommitment.theirMessageIndex = 0 - remoteCommitment.fee = logTail.CommitFee - remoteCommitment.feePerKw = logTail.FeePerKw + remoteCommitment.ourBalance = state.LocalBalance + remoteCommitment.ourMessageIndex = logTail.OurMessageIndex + remoteCommitment.theirBalance = state.RemoteBalance + remoteCommitment.theirMessageIndex = logTail.TheirMessageIndex + remoteCommitment.fee = state.CommitFee + remoteCommitment.feePerKw = state.FeePerKw remoteCommitment.height = logTail.UpdateNum + 1 } lc.remoteCommitChain.addCommitment(remoteCommitment) - commitDiff, err := channeldb.FetchCommitDiff(lc.channelState.Db) + commitDiff, err := channeldb.FetchCommitDiff(lc.channelState.Db, + &lc.channelState.FundingOutpoint) if err == nil { lc.remoteCommitChain.addCommitment(&commitment{ height: commitDiff.PendingHeight, ourBalance: commitDiff.PendingCommitment.LocalBalance, theirBalance: commitDiff.PendingCommitment.RemoteBalance, - ourMessageIndex: 0, - theirMessageIndex: 0, + ourMessageIndex: commitDiff.PendingCommitment.OurMessageIndex, + theirMessageIndex: commitDiff.PendingCommitment.TheirMessageIndex, fee: commitDiff.PendingCommitment.CommitFee, feePerKw: commitDiff.PendingCommitment.FeePerKw, }) + + fmt.Println("commit diff:", commitDiff.PendingCommitment.OurMessageIndex, + commitDiff.PendingCommitment.TheirMessageIndex) } walletLog.Debugf("ChannelPoint(%v), starting remote commitment: %v", @@ -1650,11 +1680,6 @@ func htlcIsDust(incoming, ourCommit bool, // remote) for each HTLC read from disk. This method is required to sync the // in-memory state of the state machine with that read from persistent storage. func (lc *LightningChannel) restoreStateLogs() error { - var pastHeight uint64 - if lc.currentHeight > 0 { - pastHeight = lc.currentHeight - 1 - } - // Obtain the local and remote channel configurations. These house all // the relevant public keys and points we'll need in order to restore // the state log. @@ -1681,8 +1706,6 @@ func (lc *LightningChannel) restoreStateLogs() error { remoteCommitKeys := deriveCommitmentKeys(remoteCommitPoint, false, localChanCfg, remoteChanCfg) - var ourCounter, theirCounter uint64 - // Grab the current fee rate as we'll need this to determine if the // prior HTLC's were considered dust or not at this particular // commitment state. @@ -1728,12 +1751,14 @@ func (lc *LightningChannel) restoreStateLogs() error { } pd := &PaymentDescriptor{ - RHash: htlc.RHash, - Timeout: htlc.RefundTimeout, - Amount: htlc.Amt, - EntryType: Add, - addCommitHeightRemote: pastHeight, - addCommitHeightLocal: pastHeight, + RHash: htlc.RHash, + Timeout: htlc.RefundTimeout, + Amount: htlc.Amt, + EntryType: Add, + Index: htlc.DescriptorIndex, + addCommitHeightRemote: htlc.AddRemoteInclusionHeight, + addCommitHeightLocal: htlc.AddLocalInclusionHeight, + OnionBlob: htlc.OnionBlob, ourPkScript: ourP2WSH, ourWitnessScript: ourWitnessScript, theirPkScript: theirP2WSH, @@ -1743,22 +1768,14 @@ func (lc *LightningChannel) restoreStateLogs() error { if !htlc.Incoming { pd.HtlcIndex = ourCounter lc.localUpdateLog.appendHtlc(pd) - - ourCounter++ } else { pd.HtlcIndex = theirCounter lc.remoteUpdateLog.appendHtlc(pd) - lc.rHashMap[pd.RHash] = append(lc.rHashMap[pd.RHash], pd) - theirCounter++ + lc.rHashMap[pd.RHash] = append(lc.rHashMap[pd.RHash], pd) } } - lc.localCommitChain.tail().ourMessageIndex = ourCounter - lc.localCommitChain.tail().theirMessageIndex = theirCounter - lc.remoteCommitChain.tail().ourMessageIndex = ourCounter - lc.remoteCommitChain.tail().theirMessageIndex = theirCounter - return nil } @@ -2521,8 +2538,13 @@ func (lc *LightningChannel) SignNextCommitment() (*btcec.Signature, []*btcec.Sig } // ... - if err := channeldb.AddCommitDiff(lc.channelState.Db, commitDiff); err != nil { - fmt.Println(err) + if err := channeldb.AddCommitDiff(lc.channelState.Db, + &lc.channelState.FundingOutpoint, + commitDiff); err != nil { + return nil, nil, err + } + + if err := lc.channelState.UpdateHTLCs(newCommitView.htlcs(false)); err != nil { return nil, nil, err } @@ -2582,6 +2604,9 @@ func (lc *LightningChannel) ReceiveReestablish(msg *lnwire.ChannelReestablish) ( // last commit sig message. commitment := lc.remoteCommitChain.tip() chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint) + + // TODO: Read from update log, which will contains settle/fail + // updates also. for _, htlc := range commitment.outgoingHTLCs { // If htlc is included in the local commitment chain (have been // included by remote side) or htlc is included in remote chain, but @@ -3014,6 +3039,11 @@ func (lc *LightningChannel) FullySynced() bool { remoteUpdatesSynced := lastLocalCommit.theirMessageIndex == lastRemoteCommit.theirMessageIndex + fmt.Println("remote, our:", lc.remoteCommitChain.tip().ourMessageIndex, + "local, our:", lc.localCommitChain.tip().ourMessageIndex) + fmt.Println("remote, their:", lc.remoteCommitChain.tip().theirMessageIndex, + "local, their:", lc.localCommitChain.tip().theirMessageIndex) + fmt.Println(!oweCommitment, localUpdatesSynced, remoteUpdatesSynced) return !oweCommitment && localUpdatesSynced && remoteUpdatesSynced } @@ -3140,6 +3170,9 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*P // * either record add height, or set to N - 1 uncomitted := (htlc.addCommitHeightRemote == 0 || htlc.addCommitHeightLocal == 0) + fmt.Println(remoteChainTail, localChainTail, + htlc.addCommitHeightRemote, + htlc.addCommitHeightLocal) if htlc.EntryType == Add && uncomitted { continue }