channeldb+lnwallet: add remote pending commiment persistance

This commit is contained in:
Andrey Samokhvalov 2017-08-14 12:14:04 +03:00 committed by Olaoluwa Osuntokun
parent cfa09c1cff
commit 28dd6e5d84
2 changed files with 215 additions and 8 deletions

@ -36,6 +36,9 @@ var (
// active channels. // active channels.
chanIDBucket = []byte("cib") chanIDBucket = []byte("cib")
// commitDiffBucket...
commitDiffBucket = []byte("cdb")
// closedChannelBucket stores summarization information concerning // closedChannelBucket stores summarization information concerning
// previously open, but now closed channels. // previously open, but now closed channels.
closedChannelBucket = []byte("ccb") closedChannelBucket = []byte("ccb")
@ -337,6 +340,9 @@ type OpenChannel struct {
// within the channel. // within the channel.
Htlcs []*HTLC Htlcs []*HTLC
// LastUpdates...
LastUpdates lnwire.Message
// TODO(roasbeef): eww // TODO(roasbeef): eww
Db *DB Db *DB
@ -586,6 +592,114 @@ type ChannelDelta struct {
Htlcs []*HTLC Htlcs []*HTLC
} }
// CommitDiff...
type CommitDiff struct {
// PendingHeight...
PendingHeight uint64
// PendingCommitment...
PendingCommitment *ChannelDelta
// Updates...
Updates []lnwire.Message
}
// decode...
func (d *CommitDiff) decode(w io.Writer) error {
var h [8]byte
binary.BigEndian.PutUint64(h[:], d.PendingHeight)
if _, err := w.Write(h[:]); err != nil {
return err
}
if err := serializeChannelDelta(w, d.PendingCommitment); err != nil {
return err
}
var l [2]byte
binary.BigEndian.PutUint16(l[:], uint16(len(d.Updates)))
if _, err := w.Write(l[:]); err != nil {
return err
}
for _, msg := range d.Updates {
if _, err := lnwire.WriteMessage(w, msg, 0); err != nil {
return err
}
}
return nil
}
// encode...
func (d *CommitDiff) encode(r io.Reader) error {
var h [8]byte
if _, err := r.Read(h[:]); err != nil {
return err
}
d.PendingHeight = binary.BigEndian.Uint64(h[:])
delta, err := deserializeChannelDelta(r)
if err != nil {
return err
}
d.PendingCommitment = delta
var l [2]byte
if _, err := r.Read(l[:]); err != nil {
return err
}
d.Updates = make([]lnwire.Message, binary.BigEndian.Uint16(l[:]))
for i, _ := range d.Updates {
msg, err := lnwire.ReadMessage(r, 0)
if err != nil {
return err
}
d.Updates[i] = msg
}
return nil
}
// AddCommitDiff...
func AddCommitDiff(db *DB, diff *CommitDiff) error {
return db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(commitDiffBucket)
if err != nil {
return err
}
var b bytes.Buffer
if err := diff.decode(&b); err != nil {
return err
}
return bucket.Put([]byte("cdf"), b.Bytes())
})
}
// FetchCommitDiff...
func FetchCommitDiff(db *DB) (*CommitDiff, error) {
var diff *CommitDiff
err := db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(commitDiffBucket)
if bucket == nil {
return errors.New("commit diff bucket haven't been found")
}
data := bucket.Get([]byte("cdf"))
if data != nil {
return errors.New("unable to find commit diff")
}
diff = &CommitDiff{}
return diff.encode(bytes.NewReader(data))
})
return diff, err
}
// InsertNextRevocation inserts the _next_ commitment point (revocation) into // InsertNextRevocation inserts the _next_ commitment point (revocation) into
// the database, and also modifies the internal RemoteNextRevocation attribute // the database, and also modifies the internal RemoteNextRevocation attribute
// to point to the passed key. This method is to be using during final channel // to point to the passed key. This method is to be using during final channel
@ -651,6 +765,16 @@ func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error {
return err return err
} }
// ...
diffBucket := tx.Bucket(commitDiffBucket)
if diffBucket != nil {
if diffBucket.Get([]byte("cdf")) != nil {
if err := diffBucket.Delete([]byte("cdf")); err != nil {
return err
}
}
}
return appendChannelLogEntry(logBucket, delta, &c.FundingOutpoint) return appendChannelLogEntry(logBucket, delta, &c.FundingOutpoint)
}) })
} }

@ -1003,20 +1003,39 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier,
err != channeldb.ErrNoPastDeltas { err != channeldb.ErrNoPastDeltas {
return nil, err return nil, err
} }
remoteCommitment := &commitment{ remoteCommitment := &commitment{}
ourBalance: state.LocalBalance,
ourMessageIndex: 0,
theirBalance: state.RemoteBalance,
theirMessageIndex: 0,
fee: state.CommitFee,
feePerKw: state.FeePerKw,
}
if logTail == nil { if logTail == nil {
remoteCommitment.ourBalance = state.LocalBalance
remoteCommitment.ourMessageIndex = 0
remoteCommitment.theirBalance = state.RemoteBalance
remoteCommitment.theirMessageIndex = 0
remoteCommitment.fee = state.CommitFee
remoteCommitment.feePerKw = state.FeePerKw
remoteCommitment.height = 0 remoteCommitment.height = 0
} else { } else {
remoteCommitment.ourBalance = logTail.LocalBalance
remoteCommitment.ourMessageIndex = 0
remoteCommitment.theirBalance = logTail.RemoteBalance
remoteCommitment.theirMessageIndex = 0
remoteCommitment.fee = logTail.CommitFee
remoteCommitment.feePerKw = logTail.FeePerKw
remoteCommitment.height = logTail.UpdateNum + 1 remoteCommitment.height = logTail.UpdateNum + 1
} }
lc.remoteCommitChain.addCommitment(remoteCommitment) lc.remoteCommitChain.addCommitment(remoteCommitment)
commitDiff, err := channeldb.FetchCommitDiff(lc.channelState.Db)
if err == nil {
lc.remoteCommitChain.addCommitment(&commitment{
height: commitDiff.PendingHeight,
ourBalance: commitDiff.PendingCommitment.LocalBalance,
theirBalance: commitDiff.PendingCommitment.RemoteBalance,
ourMessageIndex: 0,
theirMessageIndex: 0,
fee: commitDiff.PendingCommitment.CommitFee,
feePerKw: commitDiff.PendingCommitment.FeePerKw,
})
}
walletLog.Debugf("ChannelPoint(%v), starting remote commitment: %v", walletLog.Debugf("ChannelPoint(%v), starting remote commitment: %v",
state.FundingOutpoint, newLogClosure(func() string { state.FundingOutpoint, newLogClosure(func() string {
return spew.Sdump(lc.remoteCommitChain.tail()) return spew.Sdump(lc.remoteCommitChain.tail())
@ -2323,6 +2342,58 @@ func genRemoteHtlcSigJobs(keyRing *commitmentKeyRing,
return sigBatch, cancelChan, nil return sigBatch, cancelChan, nil
} }
// createCommitDiff
func (lc *LightningChannel) createCommitDiff(commitment *commitment) (*channeldb.CommitDiff,
error) {
chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint)
var htlcs []lnwire.Message
for e := lc.localUpdateLog.Front(); e != nil; e = e.Next() {
pd := e.Value.(*PaymentDescriptor)
// ...
var htlc lnwire.Message
if pd.addCommitHeightRemote == commitment.height {
switch pd.EntryType {
case Add:
continue
case Settle:
htlc = &lnwire.UpdateFufillHTLC{
ChanID: chanID,
ID: pd.Index,
PaymentPreimage: pd.RPreimage,
}
case Fail:
htlc = &lnwire.UpdateFailHTLC{
ChanID: chanID,
ID: pd.Index,
Reason: pd.FailReason,
}
case MalformedFail:
htlc = &lnwire.UpdateFailMalformedHTLC{
ChanID: chanID,
ID: pd.Index,
ShaOnionBlob: pd.ShaOnionBlob,
FailureCode: pd.FailCode,
}
}
htlcs = append(htlcs, htlc)
}
}
delta, err := commitment.toChannelDelta(false)
if err != nil {
return nil, err
}
return &channeldb.CommitDiff{
PendingHeight: commitment.height,
PendingCommitment: delta,
Updates: htlcs,
}, nil
}
// SignNextCommitment signs a new commitment which includes any previous // SignNextCommitment signs a new commitment which includes any previous
// unsettled HTLCs, any new HTLCs, and any modifications to prior HTLCs // unsettled HTLCs, any new HTLCs, and any modifications to prior HTLCs
// committed in previous commitment updates. Signing a new commitment // committed in previous commitment updates. Signing a new commitment
@ -2443,6 +2514,18 @@ func (lc *LightningChannel) SignNextCommitment() (*btcec.Signature, []*btcec.Sig
// latest commitment update. // latest commitment update.
lc.remoteCommitChain.addCommitment(newCommitView) lc.remoteCommitChain.addCommitment(newCommitView)
// ...
commitDiff, err := lc.createCommitDiff(newCommitView)
if err != nil {
return nil, nil, err
}
// ...
if err := channeldb.AddCommitDiff(lc.channelState.Db, commitDiff); err != nil {
fmt.Println(err)
return nil, nil, err
}
// If we are the channel initiator then we would have signed any sent // If we are the channel initiator then we would have signed any sent
// fee update at this point, so mark this update as pending ACK, and // fee update at this point, so mark this update as pending ACK, and
// set pendingFeeUpdate to nil. We can do this since we know we won't // set pendingFeeUpdate to nil. We can do this since we know we won't