not finished index persistence

This commit is contained in:
Andrey Samokhvalov 2017-08-15 20:09:16 +03:00 committed by Olaoluwa Osuntokun
parent 28dd6e5d84
commit 9247168c5d
4 changed files with 346 additions and 108 deletions

@ -3,6 +3,7 @@ package channeldb
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
@ -68,6 +69,8 @@ var (
minFeePerKwPrefix = []byte("mfp")
chanConfigPrefix = []byte("chan-config")
updatePrefix = []byte("uup")
ourIndexPrefix = []byte("tip")
theirIndexPrefix = []byte("oip")
satSentPrefix = []byte("ssp")
satReceivedPrefix = []byte("srp")
commitFeePrefix = []byte("cfp")
@ -328,6 +331,22 @@ type OpenChannel struct {
// channel.
NumUpdates uint64
// OurMessageIndex...
OurMessageIndex uint64
// TheirMessageIndex...
TheirMessageIndex uint64
// OurMessageIndex...
OurAckedIndex uint64
// TheirMessageIndex...
TheirAckedIndex uint64
// TotalSatoshisSent is the total number of satoshis we've sent within
// this channel.
TotalSatoshisSent uint64
// TotalMSatSent is the total number of milli-satoshis we've sent
// within this channel.
TotalMSatSent lnwire.MilliSatoshi
@ -474,6 +493,8 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx,
c.LocalBalance = delta.LocalBalance
c.RemoteBalance = delta.RemoteBalance
c.NumUpdates = delta.UpdateNum
c.OurMessageIndex = delta.OurMessageIndex
c.TheirMessageIndex = delta.TheirMessageIndex
c.Htlcs = delta.Htlcs
c.CommitFee = delta.CommitFee
c.FeePerKw = delta.FeePerKw
@ -491,6 +512,12 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx,
if err := putChanNumUpdates(chanBucket, c); err != nil {
return err
}
if err := putOurMessageIndex(chanBucket, c); err != nil {
return err
}
if err := putTheirMessageIndex(chanBucket, c); err != nil {
return err
}
if err := putChanCommitFee(chanBucket, c); err != nil {
return err
}
@ -509,6 +536,32 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx,
})
}
// UpdateHTLCs....
func (c *OpenChannel) UpdateHTLCs(htlcs []*HTLC) error {
c.Lock()
defer c.Unlock()
return c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
}
id := c.IdentityPub.SerializeCompressed()
nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id)
if err != nil {
return err
}
if err := putCurrentHtlcs(nodeChanBucket, htlcs,
&c.FundingOutpoint); err != nil {
return err
}
return nil
})
}
// HTLC is the on-disk representation of a hash time-locked contract. HTLCs
// are contained within ChannelDeltas which encode the current state of the
// commitment between state updates.
@ -542,6 +595,15 @@ type HTLC struct {
// OnionBlob is an opaque blob which is used to complete multi-hop
// routing.
OnionBlob []byte
// AddLocalInclusionHeight...
AddLocalInclusionHeight uint64
// AddRemoteInclusionHeight...
AddRemoteInclusionHeight uint64
// DescriptorIndex...
DescriptorIndex uint64
}
// Copy returns a full copy of the target HTLC.
@ -565,6 +627,12 @@ func (h *HTLC) Copy() HTLC {
// For ourselves (the local node) we ONLY store our most recent (unrevoked)
// state for safety purposes.
type ChannelDelta struct {
// OurMessageIndex...
OurMessageIndex uint64
// TheirMessageIndex...
TheirMessageIndex uint64
// LocalBalance is our current balance at this particular update
// number.
LocalBalance lnwire.MilliSatoshi
@ -606,9 +674,7 @@ type CommitDiff struct {
// decode...
func (d *CommitDiff) decode(w io.Writer) error {
var h [8]byte
binary.BigEndian.PutUint64(h[:], d.PendingHeight)
if _, err := w.Write(h[:]); err != nil {
if err := binary.Write(w, byteOrder, d.PendingHeight); err != nil {
return err
}
@ -616,9 +682,7 @@ func (d *CommitDiff) decode(w io.Writer) error {
return err
}
var l [2]byte
binary.BigEndian.PutUint16(l[:], uint16(len(d.Updates)))
if _, err := w.Write(l[:]); err != nil {
if err := binary.Write(w, byteOrder, uint16(len(d.Updates))); err != nil {
return err
}
@ -633,11 +697,9 @@ func (d *CommitDiff) decode(w io.Writer) error {
// encode...
func (d *CommitDiff) encode(r io.Reader) error {
var h [8]byte
if _, err := r.Read(h[:]); err != nil {
if err := binary.Read(r, byteOrder, &d.PendingHeight); err != nil {
return err
}
d.PendingHeight = binary.BigEndian.Uint64(h[:])
delta, err := deserializeChannelDelta(r)
if err != nil {
@ -645,11 +707,11 @@ func (d *CommitDiff) encode(r io.Reader) error {
}
d.PendingCommitment = delta
var l [2]byte
if _, err := r.Read(l[:]); err != nil {
var length uint16
if err := binary.Read(r, byteOrder, &length); err != nil {
return err
}
d.Updates = make([]lnwire.Message, binary.BigEndian.Uint16(l[:]))
d.Updates = make([]lnwire.Message, length)
for i, _ := range d.Updates {
msg, err := lnwire.ReadMessage(r, 0)
@ -663,7 +725,8 @@ func (d *CommitDiff) encode(r io.Reader) error {
}
// AddCommitDiff...
func AddCommitDiff(db *DB, diff *CommitDiff) error {
func AddCommitDiff(db *DB, fundingOutpoint *wire.OutPoint,
diff *CommitDiff) error {
return db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(commitDiffBucket)
if err != nil {
@ -675,12 +738,19 @@ func AddCommitDiff(db *DB, diff *CommitDiff) error {
return err
}
return bucket.Put([]byte("cdf"), b.Bytes())
var outpoint bytes.Buffer
if err := writeOutpoint(&outpoint, fundingOutpoint); err != nil {
return err
}
key := []byte("cdf")
key = append(key, outpoint.Bytes()...)
return bucket.Put(key, b.Bytes())
})
}
// FetchCommitDiff...
func FetchCommitDiff(db *DB) (*CommitDiff, error) {
func FetchCommitDiff(db *DB, fundingOutpoint *wire.OutPoint) (*CommitDiff, error) {
var diff *CommitDiff
err := db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(commitDiffBucket)
@ -688,8 +758,15 @@ func FetchCommitDiff(db *DB) (*CommitDiff, error) {
return errors.New("commit diff bucket haven't been found")
}
data := bucket.Get([]byte("cdf"))
if data != nil {
var outpoint bytes.Buffer
if err := writeOutpoint(&outpoint, fundingOutpoint); err != nil {
return err
}
key := []byte("cdf")
key = append(key, outpoint.Bytes()...)
data := bucket.Get(key)
if data == nil {
return errors.New("unable to find commit diff")
}
@ -768,8 +845,15 @@ func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error {
// ...
diffBucket := tx.Bucket(commitDiffBucket)
if diffBucket != nil {
if diffBucket.Get([]byte("cdf")) != nil {
if err := diffBucket.Delete([]byte("cdf")); err != nil {
var outpoint bytes.Buffer
if err := writeOutpoint(&outpoint, &c.FundingOutpoint); err != nil {
return err
}
key := []byte("cdf")
key = append(key, outpoint.Bytes()...)
if diffBucket.Get(key) != nil {
if err := diffBucket.Delete(key); err != nil {
return err
}
}
@ -1241,6 +1325,12 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
if err := putChanNumUpdates(openChanBucket, channel); err != nil {
return err
}
if err := putOurMessageIndex(openChanBucket, channel); err != nil {
return err
}
if err := putTheirMessageIndex(openChanBucket, channel); err != nil {
return err
}
if err := putChanAmountsTransferred(openChanBucket, channel); err != nil {
return err
}
@ -1322,6 +1412,12 @@ func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
if err = fetchChanNumUpdates(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read num updates: %v", err)
}
if err = fetchOurMessageIndex(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read our message index: %v", err)
}
if err = fetchTheirMessageIndex(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read their message index: %v", err)
}
if err = fetchChanAmountsTransferred(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read sat transferred: %v", err)
}
@ -1352,6 +1448,12 @@ func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
if err := deleteChanNumUpdates(openChanBucket, channelID); err != nil {
return err
}
if err := deleteOurMessageIndex(openChanBucket, channelID); err != nil {
return err
}
if err := deleteTheirMessageIndex(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanAmountsTransferred(openChanBucket, channelID); err != nil {
return err
}
@ -2182,6 +2284,84 @@ func fetchChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel)
return nil
}
func putOurMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 8)
byteOrder.PutUint64(scratch, channel.OurMessageIndex)
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, ourIndexPrefix)
copy(keyPrefix[3:], b.Bytes())
return openChanBucket.Put(keyPrefix, scratch)
}
func deleteOurMessageIndex(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, ourIndexPrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchOurMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, ourIndexPrefix)
copy(keyPrefix[3:], b.Bytes())
updateBytes := openChanBucket.Get(keyPrefix)
channel.OurMessageIndex = byteOrder.Uint64(updateBytes)
return nil
}
func putTheirMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 8)
byteOrder.PutUint64(scratch, channel.TheirMessageIndex)
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, theirIndexPrefix)
copy(keyPrefix[3:], b.Bytes())
return openChanBucket.Put(keyPrefix, scratch)
}
func deleteTheirMessageIndex(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, theirIndexPrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchTheirMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, theirIndexPrefix)
copy(keyPrefix[3:], b.Bytes())
updateBytes := openChanBucket.Get(keyPrefix)
channel.TheirMessageIndex = byteOrder.Uint64(updateBytes)
return nil
}
func serializeHTLC(w io.Writer, h *HTLC) error {
if err := wire.WriteVarBytes(w, 0, h.Signature); err != nil {
return err
@ -2222,6 +2402,18 @@ func serializeHTLC(w io.Writer, h *HTLC) error {
return err
}
if err := binary.Write(w, byteOrder, h.AddLocalInclusionHeight); err != nil {
return err
}
if err := binary.Write(w, byteOrder, h.AddRemoteInclusionHeight); err != nil {
return err
}
if err := binary.Write(w, byteOrder, h.DescriptorIndex); err != nil {
return err
}
return nil
}
@ -2272,6 +2464,18 @@ func deserializeHTLC(r io.Reader) (*HTLC, error) {
}
}
if err := binary.Read(r, byteOrder, &h.AddLocalInclusionHeight); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &h.AddRemoteInclusionHeight); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &h.DescriptorIndex); err != nil {
return nil, err
}
return h, nil
}

@ -872,6 +872,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
}
}
}()
case *lnwire.UpdateFee:
// We received fee update from peer. If we are the initator we
// will fail the channel, if not we will apply the update.

@ -1828,9 +1828,9 @@ func TestChannelRetransmission(t *testing.T) {
serverErr := make(chan error, 4)
aliceInterceptor := createInterceptorFunc("[alice] <-- [bob]",
"alice", messages, chanID, false)
"alice", messages, chanID, true)
bobInterceptor := createInterceptorFunc("[alice] --> [bob]",
"bob", messages, chanID, false)
"bob", messages, chanID, true)
// Add interceptor to check the order of Bob and Alice messages.
n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice,

@ -516,61 +516,80 @@ func (c *commitment) populateHtlcIndexes(ourCommitTx bool,
// toChannelDelta converts the target commitment into a format suitable to be
// written to disk after an accepted state transition.
func (c *commitment) toChannelDelta(ourCommit bool) (*channeldb.ChannelDelta, error) {
numHtlcs := len(c.outgoingHTLCs) + len(c.incomingHTLCs)
var ourMessageIndex uint64
var theirMessageIndex uint64
delta := &channeldb.ChannelDelta{
LocalBalance: c.ourBalance,
RemoteBalance: c.theirBalance,
UpdateNum: c.height,
CommitFee: c.fee,
FeePerKw: c.feePerKw,
Htlcs: make([]*channeldb.HTLC, 0, numHtlcs),
if ourCommit {
ourMessageIndex = c.ourMessageIndex
theirMessageIndex = c.theirMessageIndex
} else {
ourMessageIndex = c.theirMessageIndex
theirMessageIndex = c.ourMessageIndex
}
return &channeldb.ChannelDelta{
OurMessageIndex: ourMessageIndex,
TheirMessageIndex: theirMessageIndex,
LocalBalance: c.ourBalance,
RemoteBalance: c.theirBalance,
UpdateNum: c.height,
CommitFee: c.fee,
FeePerKw: c.feePerKw,
Htlcs: c.htlcs(ourCommit),
}, nil
}
// htlcs...
func (c *commitment) htlcs(ourCommit bool) []*channeldb.HTLC {
numHtlcs := len(c.outgoingHTLCs) + len(c.incomingHTLCs)
htlcs := make([]*channeldb.HTLC, 0, numHtlcs)
pdToHtlc := func(incoming bool, htlc PaymentDescriptor) *channeldb.HTLC {
outputIndex := htlc.localOutputIndex
if !ourCommit {
outputIndex = htlc.remoteOutputIndex
}
h := &channeldb.HTLC{
Incoming: incoming,
Amt: htlc.Amount,
RHash: htlc.RHash,
RefundTimeout: htlc.Timeout,
OutputIndex: outputIndex,
OnionBlob: htlc.OnionBlob,
AddLocalInclusionHeight: htlc.addCommitHeightLocal,
AddRemoteInclusionHeight: htlc.addCommitHeightRemote,
DescriptorIndex: htlc.Index,
}
//if incoming {
// fmt.Println("save, receiver:",
// "remote:", h.AddRemoteInclusionHeight,
// "local:", h.AddLocalInclusionHeight,
// "index:", h.DescriptorIndex)
//} else {
// fmt.Println("save, sender:",
// "remote:", h.AddRemoteInclusionHeight,
// "local:", h.AddLocalInclusionHeight,
// "index:", h.DescriptorIndex)
//}
if ourCommit && htlc.sig != nil {
h.Signature = htlc.sig.Serialize()
}
return h
}
for _, htlc := range c.outgoingHTLCs {
outputIndex := htlc.localOutputIndex
if !ourCommit {
outputIndex = htlc.remoteOutputIndex
}
h := &channeldb.HTLC{
Incoming: false,
Amt: htlc.Amount,
RHash: htlc.RHash,
RefundTimeout: htlc.Timeout,
OutputIndex: outputIndex,
}
if ourCommit && htlc.sig != nil {
h.Signature = htlc.sig.Serialize()
}
delta.Htlcs = append(delta.Htlcs, h)
htlcs = append(htlcs, pdToHtlc(false, htlc))
}
for _, htlc := range c.incomingHTLCs {
outputIndex := htlc.localOutputIndex
if !ourCommit {
outputIndex = htlc.remoteOutputIndex
}
h := &channeldb.HTLC{
Incoming: true,
Amt: htlc.Amount,
RHash: htlc.RHash,
RefundTimeout: htlc.Timeout,
OutputIndex: outputIndex,
OnionBlob: htlc.OnionBlob,
}
if ourCommit && htlc.sig != nil {
h.Signature = htlc.sig.Serialize()
}
delta.Htlcs = append(delta.Htlcs, h)
htlcs = append(htlcs, pdToHtlc(true, htlc))
}
return delta, nil
return htlcs
}
// commitmentChain represents a chain of unrevoked commitments. The tail of the
@ -671,18 +690,20 @@ type updateLog struct {
}
// newUpdateLog creates a new updateLog instance.
func newUpdateLog() *updateLog {
func newUpdateLog(logIndex, ackedIndex uint64) *updateLog {
return &updateLog{
List: list.New(),
updateIndex: make(map[uint64]*list.Element),
htlcIndex: make(map[uint64]*list.Element),
logIndex: logIndex,
ackedIndex: ackedIndex,
}
}
// appendUpdate appends a new update to the tip of the updateLog. The entry is
// also added to index accordingly.
func (u *updateLog) appendUpdate(pd *PaymentDescriptor) {
u.updateIndex[u.logIndex] = u.PushBack(pd)
u.updateIndex[pd.Index] = u.PushBack(pd)
u.logIndex++
}
@ -964,8 +985,8 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier,
channelState: state,
localChanCfg: &state.LocalChanCfg,
remoteChanCfg: &state.RemoteChanCfg,
localUpdateLog: newUpdateLog(),
remoteUpdateLog: newUpdateLog(),
localUpdateLog: newUpdateLog(state.OurMessageIndex, state.OurMessageIndex),
remoteUpdateLog: newUpdateLog(state.TheirMessageIndex, state.TheirMessageIndex),
rHashMap: make(map[PaymentHash][]*PaymentDescriptor),
Capacity: state.Capacity,
FundingWitnessScript: multiSigScript,
@ -983,12 +1004,16 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier,
lc.localCommitChain.addCommitment(&commitment{
height: lc.currentHeight,
ourBalance: state.LocalBalance,
ourMessageIndex: 0,
ourMessageIndex: state.OurMessageIndex,
theirBalance: state.RemoteBalance,
theirMessageIndex: 0,
theirMessageIndex: state.TheirMessageIndex,
fee: state.CommitFee,
feePerKw: state.FeePerKw,
})
fmt.Println("local commit restored:", "our:", state.OurMessageIndex,
"their:", state.TheirMessageIndex)
walletLog.Debugf("ChannelPoint(%v), starting local commitment: %v",
state.FundingOutpoint, newLogClosure(func() string {
return spew.Sdump(lc.localCommitChain.tail())
@ -1003,37 +1028,42 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier,
err != channeldb.ErrNoPastDeltas {
return nil, err
}
remoteCommitment := &commitment{}
if logTail == nil {
remoteCommitment.ourBalance = state.LocalBalance
remoteCommitment.ourMessageIndex = 0
remoteCommitment.ourMessageIndex = state.OurMessageIndex
remoteCommitment.theirBalance = state.RemoteBalance
remoteCommitment.theirMessageIndex = 0
remoteCommitment.theirMessageIndex = state.TheirMessageIndex
remoteCommitment.fee = state.CommitFee
remoteCommitment.feePerKw = state.FeePerKw
remoteCommitment.height = 0
} else {
remoteCommitment.ourBalance = logTail.LocalBalance
remoteCommitment.ourMessageIndex = 0
remoteCommitment.theirBalance = logTail.RemoteBalance
remoteCommitment.theirMessageIndex = 0
remoteCommitment.fee = logTail.CommitFee
remoteCommitment.feePerKw = logTail.FeePerKw
remoteCommitment.ourBalance = state.LocalBalance
remoteCommitment.ourMessageIndex = logTail.OurMessageIndex
remoteCommitment.theirBalance = state.RemoteBalance
remoteCommitment.theirMessageIndex = logTail.TheirMessageIndex
remoteCommitment.fee = state.CommitFee
remoteCommitment.feePerKw = state.FeePerKw
remoteCommitment.height = logTail.UpdateNum + 1
}
lc.remoteCommitChain.addCommitment(remoteCommitment)
commitDiff, err := channeldb.FetchCommitDiff(lc.channelState.Db)
commitDiff, err := channeldb.FetchCommitDiff(lc.channelState.Db,
&lc.channelState.FundingOutpoint)
if err == nil {
lc.remoteCommitChain.addCommitment(&commitment{
height: commitDiff.PendingHeight,
ourBalance: commitDiff.PendingCommitment.LocalBalance,
theirBalance: commitDiff.PendingCommitment.RemoteBalance,
ourMessageIndex: 0,
theirMessageIndex: 0,
ourMessageIndex: commitDiff.PendingCommitment.OurMessageIndex,
theirMessageIndex: commitDiff.PendingCommitment.TheirMessageIndex,
fee: commitDiff.PendingCommitment.CommitFee,
feePerKw: commitDiff.PendingCommitment.FeePerKw,
})
fmt.Println("commit diff:", commitDiff.PendingCommitment.OurMessageIndex,
commitDiff.PendingCommitment.TheirMessageIndex)
}
walletLog.Debugf("ChannelPoint(%v), starting remote commitment: %v",
@ -1650,11 +1680,6 @@ func htlcIsDust(incoming, ourCommit bool,
// remote) for each HTLC read from disk. This method is required to sync the
// in-memory state of the state machine with that read from persistent storage.
func (lc *LightningChannel) restoreStateLogs() error {
var pastHeight uint64
if lc.currentHeight > 0 {
pastHeight = lc.currentHeight - 1
}
// Obtain the local and remote channel configurations. These house all
// the relevant public keys and points we'll need in order to restore
// the state log.
@ -1681,8 +1706,6 @@ func (lc *LightningChannel) restoreStateLogs() error {
remoteCommitKeys := deriveCommitmentKeys(remoteCommitPoint, false,
localChanCfg, remoteChanCfg)
var ourCounter, theirCounter uint64
// Grab the current fee rate as we'll need this to determine if the
// prior HTLC's were considered dust or not at this particular
// commitment state.
@ -1728,12 +1751,14 @@ func (lc *LightningChannel) restoreStateLogs() error {
}
pd := &PaymentDescriptor{
RHash: htlc.RHash,
Timeout: htlc.RefundTimeout,
Amount: htlc.Amt,
EntryType: Add,
addCommitHeightRemote: pastHeight,
addCommitHeightLocal: pastHeight,
RHash: htlc.RHash,
Timeout: htlc.RefundTimeout,
Amount: htlc.Amt,
EntryType: Add,
Index: htlc.DescriptorIndex,
addCommitHeightRemote: htlc.AddRemoteInclusionHeight,
addCommitHeightLocal: htlc.AddLocalInclusionHeight,
OnionBlob: htlc.OnionBlob,
ourPkScript: ourP2WSH,
ourWitnessScript: ourWitnessScript,
theirPkScript: theirP2WSH,
@ -1743,22 +1768,14 @@ func (lc *LightningChannel) restoreStateLogs() error {
if !htlc.Incoming {
pd.HtlcIndex = ourCounter
lc.localUpdateLog.appendHtlc(pd)
ourCounter++
} else {
pd.HtlcIndex = theirCounter
lc.remoteUpdateLog.appendHtlc(pd)
lc.rHashMap[pd.RHash] = append(lc.rHashMap[pd.RHash], pd)
theirCounter++
lc.rHashMap[pd.RHash] = append(lc.rHashMap[pd.RHash], pd)
}
}
lc.localCommitChain.tail().ourMessageIndex = ourCounter
lc.localCommitChain.tail().theirMessageIndex = theirCounter
lc.remoteCommitChain.tail().ourMessageIndex = ourCounter
lc.remoteCommitChain.tail().theirMessageIndex = theirCounter
return nil
}
@ -2521,8 +2538,13 @@ func (lc *LightningChannel) SignNextCommitment() (*btcec.Signature, []*btcec.Sig
}
// ...
if err := channeldb.AddCommitDiff(lc.channelState.Db, commitDiff); err != nil {
fmt.Println(err)
if err := channeldb.AddCommitDiff(lc.channelState.Db,
&lc.channelState.FundingOutpoint,
commitDiff); err != nil {
return nil, nil, err
}
if err := lc.channelState.UpdateHTLCs(newCommitView.htlcs(false)); err != nil {
return nil, nil, err
}
@ -2582,6 +2604,9 @@ func (lc *LightningChannel) ReceiveReestablish(msg *lnwire.ChannelReestablish) (
// last commit sig message.
commitment := lc.remoteCommitChain.tip()
chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint)
// TODO: Read from update log, which will contains settle/fail
// updates also.
for _, htlc := range commitment.outgoingHTLCs {
// If htlc is included in the local commitment chain (have been
// included by remote side) or htlc is included in remote chain, but
@ -3014,6 +3039,11 @@ func (lc *LightningChannel) FullySynced() bool {
remoteUpdatesSynced :=
lastLocalCommit.theirMessageIndex == lastRemoteCommit.theirMessageIndex
fmt.Println("remote, our:", lc.remoteCommitChain.tip().ourMessageIndex,
"local, our:", lc.localCommitChain.tip().ourMessageIndex)
fmt.Println("remote, their:", lc.remoteCommitChain.tip().theirMessageIndex,
"local, their:", lc.localCommitChain.tip().theirMessageIndex)
fmt.Println(!oweCommitment, localUpdatesSynced, remoteUpdatesSynced)
return !oweCommitment && localUpdatesSynced && remoteUpdatesSynced
}
@ -3140,6 +3170,9 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*P
// * either record add height, or set to N - 1
uncomitted := (htlc.addCommitHeightRemote == 0 ||
htlc.addCommitHeightLocal == 0)
fmt.Println(remoteChainTail, localChainTail,
htlc.addCommitHeightRemote,
htlc.addCommitHeightLocal)
if htlc.EntryType == Add && uncomitted {
continue
}