From 53e4422a2ea127928079a832625f9d5d8f3e536d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 27 Feb 2018 20:01:41 -0800 Subject: [PATCH] lnwallet/channel: integrate fwdpkgs w/ in-mem buffering --- lnwallet/channel.go | 421 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 362 insertions(+), 59 deletions(-) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 3cb97f81..295a8b5d 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -209,6 +209,37 @@ type PaymentDescriptor struct { // Settle. ParentIndex uint64 + // SourceRef points to an Add update in a forwarding package owned by + // this channel. + // + // NOTE: This field will only be populated if EntryType is Fail or + // Settle. + SourceRef *channeldb.AddRef + + // DestRef points to a Fail/Settle update in another link's forwarding + // package. + // + // NOTE: This field will only be populated if EntryType is Fail or + // Settle, and the forwarded Add successfully included in an outgoing + // link's commitment txn. + DestRef *channeldb.SettleFailRef + + // OpenCircuitRef references the incoming Chan/HTLC ID of an Add HTLC + // packet delivered by the switch. + // + // NOTE: This field is only populated for payment descriptors in the + // *local* update log, and if the Add packet was delivered by the + // switch. + OpenCircuitRef *channeldb.CircuitKey + + // ClosedCircuitRef references the incoming Chan/HTLC ID of the Add HTLC + // that opened the circuit. + // + // NOTE: This field is only populated for payment descriptors in the + // *local* update log, and if settle/fails have a committed circuit in + // the circuit map. + ClosedCircuitRef *channeldb.CircuitKey + // localOutputIndex is the output index of this HTLc output in the // commitment transaction of the local node. // @@ -291,6 +322,96 @@ type PaymentDescriptor struct { isForwarded bool } +// PayDescsFromRemoteLogUpdates converts a slice of LogUpdates received from the +// remote peer into PaymentDescriptors to inform a link's forwarding decisions. +// +// NOTE: The provided `logUpdates` MUST corresponding exactly to either the Adds +// or SettleFails in this channel's forwarding package at `height`. +func (lc *LightningChannel) PayDescsFromRemoteLogUpdates(height uint64, + logUpdates []channeldb.LogUpdate) []*PaymentDescriptor { + + lc.RLock() + defer lc.RUnlock() + + // Allocate enough space to hold all of the payment descriptors we will + // reconstruct, and also the list of pointers that will be returned to + // the caller. + payDescs := make([]PaymentDescriptor, 0, len(logUpdates)) + payDescPtrs := make([]*PaymentDescriptor, 0, len(logUpdates)) + + // Iterate over the log updates we loaded from disk, and reconstruct the + // payment descriptor corresponding to one of the four types of htlcs we + // can receive from the remote peer. We only repopulate the information + // necessary to process the packets and, if necessary, forward them to + // the switch. + // + // For each log update, we include either an AddRef or a SettleFailRef + // so that they can be ACK'd and garbage collected. + for i, logUpdate := range logUpdates { + var pd PaymentDescriptor + switch wireMsg := logUpdate.UpdateMsg.(type) { + + case *lnwire.UpdateAddHTLC: + pd = PaymentDescriptor{ + RHash: wireMsg.PaymentHash, + Timeout: wireMsg.Expiry, + Amount: wireMsg.Amount, + EntryType: Add, + HtlcIndex: wireMsg.ID, + LogIndex: logUpdate.LogIndex, + SourceRef: &channeldb.AddRef{ + Height: height, + Index: uint16(i), + }, + } + pd.OnionBlob = make([]byte, len(wireMsg.OnionBlob)) + copy(pd.OnionBlob[:], wireMsg.OnionBlob[:]) + + case *lnwire.UpdateFulfillHTLC: + pd = PaymentDescriptor{ + RPreimage: wireMsg.PaymentPreimage, + ParentIndex: wireMsg.ID, + EntryType: Settle, + DestRef: &channeldb.SettleFailRef{ + Source: lc.ShortChanID(), + Height: height, + Index: uint16(i), + }, + } + + case *lnwire.UpdateFailHTLC: + pd = PaymentDescriptor{ + ParentIndex: wireMsg.ID, + EntryType: Fail, + FailReason: wireMsg.Reason[:], + DestRef: &channeldb.SettleFailRef{ + Source: lc.ShortChanID(), + Height: height, + Index: uint16(i), + }, + } + + case *lnwire.UpdateFailMalformedHTLC: + pd = PaymentDescriptor{ + ParentIndex: wireMsg.ID, + EntryType: MalformedFail, + FailCode: wireMsg.FailureCode, + ShaOnionBlob: wireMsg.ShaOnionBlob, + DestRef: &channeldb.SettleFailRef{ + Source: lc.ShortChanID(), + Height: height, + Index: uint16(i), + }, + } + } + + payDescs = append(payDescs, pd) + payDescPtrs = append(payDescPtrs, &payDescs[i]) + } + + return payDescPtrs +} + // commitment represents a commitment to a new state within an active channel. // New commitments can be initiated by either side. Commitments are ordered // into a commitment chain, with one existing for both parties. Each side can @@ -2572,6 +2693,13 @@ func (lc *LightningChannel) createCommitDiff( }) } + var ( + ackAddRefs []channeldb.AddRef + settleFailRefs []channeldb.SettleFailRef + openCircuitKeys []channeldb.CircuitKey + closedCircuitKeys []channeldb.CircuitKey + ) + // We'll now run through our local update log to locate the items which // were only just committed within this pending state. This will be the // set of items we need to retransmit if we reconnect and find that @@ -2611,6 +2739,20 @@ func (lc *LightningChannel) createCommitDiff( copy(htlc.OnionBlob[:], pd.OnionBlob) logUpdate.UpdateMsg = htlc + // Gather any references for circuits opened by this Add + // HTLC. + if pd.OpenCircuitRef != nil { + openCircuitKeys = append(openCircuitKeys, + *pd.OpenCircuitRef) + } + + logUpdates = append(logUpdates, logUpdate) + + // Short circuit here since an add should not have any + // of the references gathered in the case of settles, + // fails or malformed fails. + continue + case Settle: logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{ ChanID: chanID, @@ -2634,6 +2776,19 @@ func (lc *LightningChannel) createCommitDiff( } } + // Gather the fwd pkg references from any settle or fail + // packets, if they exist. + if pd.SourceRef != nil { + ackAddRefs = append(ackAddRefs, *pd.SourceRef) + } + if pd.DestRef != nil { + settleFailRefs = append(settleFailRefs, *pd.DestRef) + } + if pd.ClosedCircuitRef != nil { + closedCircuitKeys = append(closedCircuitKeys, + *pd.ClosedCircuitRef) + } + logUpdates = append(logUpdates, logUpdate) } @@ -2651,7 +2806,11 @@ func (lc *LightningChannel) createCommitDiff( CommitSig: commitSig, HtlcSigs: htlcSigs, }, - LogUpdates: logUpdates, + LogUpdates: logUpdates, + OpenedCircuitKeys: openCircuitKeys, + ClosedCircuitKeys: closedCircuitKeys, + AddAcks: ackAddRefs, + SettleFailAcks: settleFailRefs, }, nil } @@ -2832,7 +2991,8 @@ func (lc *LightningChannel) SignNextCommitment() (lnwire.Sig, []lnwire.Sig, erro // have not received // * RevokeAndAck: if we sent a revocation message that they claim to have // not received -func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ([]lnwire.Message, error) { +func (lc *LightningChannel) ProcessChanSyncMsg( + msg *lnwire.ChannelReestablish) ([]lnwire.Message, error) { // We owe them a commitment if they have an un-acked commitment and the // tip of their chain (from our Pov) is equal to what they think their @@ -2850,7 +3010,13 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ( // chain sync message. If we're de-synchronized, then we'll send a // batch of messages which when applied will kick start the chain // resync. - var updates []lnwire.Message + var ( + updates []lnwire.Message + // TODO(conner): uncomment after API exposes these return + // variables, this permits compilation in the meantime + //openedCircuits []channeldb.CircuitKey + //closedCircuits []channeldb.CircuitKey + ) // If the remote party included the optional fields, then we'll verify // their correctness first, as it will influence our decisions below. @@ -2974,9 +3140,12 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ( // commitment chain with our local version of their chain. updates = append(updates, commitDiff.CommitSig) - } else if !oweCommitment && remoteChainTip.height+1 != - msg.NextLocalCommitHeight { + // TODO(conner): uncomment after API exposes these return + // variables, this permits compilation in the meantime + //openedCircuits = commitDiff.OpenedCircuitKeys + //closedCircuits = commitDiff.ClosedCircuitKeys + } else if remoteChainTip.height+1 != msg.NextLocalCommitHeight { if err := lc.channelState.MarkBorked(); err != nil { return nil, err } @@ -3692,7 +3861,9 @@ func (lc *LightningChannel) RevokeCurrentCommitment() (*lnwire.RevokeAndAck, []c // successful, then the remote commitment chain is advanced by a single // commitment, and a log compaction is attempted. In addition, a slice of // HTLC's which can be forwarded upstream are returned. -func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*PaymentDescriptor, error) { +func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ( + []*PaymentDescriptor, error) { + lc.Lock() defer lc.Unlock() @@ -3728,12 +3899,155 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*P lc.remoteCommitChain.tail().height, lc.remoteCommitChain.tail().height+1) + // Add one to the remote tail since this will be height *after* we write + // the revocation to disk, the local height will remain unchanged. + remoteChainTail := lc.remoteCommitChain.tail().height + 1 + localChainTail := lc.localCommitChain.tail().height + + chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint) + + // Determine the set of htlcs that can be forwarded as a result of + // having received the revocation. We will simultaneously construct the + // log updates and payment descriptors, allowing us to persist the log + // updates to disk and optimistically buffer the forwarding package in + // memory. + var ( + addsToForward []*PaymentDescriptor + addUpdates []channeldb.LogUpdate + settleFailsToForward []*PaymentDescriptor + settleFailUpdates []channeldb.LogUpdate + ) + + var addIndex, settleFailIndex uint16 + for e := lc.remoteUpdateLog.Front(); e != nil; e = e.Next() { + pd := e.Value.(*PaymentDescriptor) + + if pd.isForwarded { + continue + } + + uncommitted := (pd.addCommitHeightRemote == 0 || + pd.addCommitHeightLocal == 0) + if pd.EntryType == Add && uncommitted { + continue + } + + // Using the height of the remote and local commitments, + // preemptively compute whether or not to forward this HTLC for + // the case in which this in an Add HTLC, or if this is a + // Settle, Fail, or MalformedFail. + shouldFwdAdd := remoteChainTail == pd.addCommitHeightRemote && + localChainTail >= pd.addCommitHeightLocal + shouldFwdRmv := remoteChainTail >= pd.removeCommitHeightRemote && + localChainTail >= pd.removeCommitHeightLocal + + // We'll only forward any new HTLC additions iff, it's "freshly + // locked in". Meaning that the HTLC was only *just* considered + // locked-in at this new state. By doing this we ensure that we + // don't re-forward any already processed HTLC's after a + // restart. + switch { + case pd.EntryType == Add && shouldFwdAdd: + + // Construct a reference specifying the location that + // this forwarded Add will be written in the forwarding + // package constructed at this remote height. + pd.SourceRef = &channeldb.AddRef{ + Height: remoteChainTail, + Index: addIndex, + } + addIndex++ + + pd.isForwarded = true + addsToForward = append(addsToForward, pd) + + case pd.EntryType != Add && shouldFwdRmv: + + // Construct a reference specifying the location that + // this forwarded Settle/Fail will be written in the + // forwarding package constructed at this remote height. + pd.DestRef = &channeldb.SettleFailRef{ + Source: lc.ShortChanID(), + Height: remoteChainTail, + Index: settleFailIndex, + } + settleFailIndex++ + + pd.isForwarded = true + settleFailsToForward = append(settleFailsToForward, pd) + + default: + continue + } + + // If we've reached this point, this HTLC will be added to the + // forwarding package at the height of the remote commitment. + // All types of HTLCs will record their assigned log index. + logUpdate := channeldb.LogUpdate{ + LogIndex: pd.LogIndex, + } + + // Next, we'll map the type of the PaymentDescriptor to one of + // the four messages that it corresponds to and separate the + // updates into Adds and Settle/Fail/MalformedFail such that + // they can be written in the forwarding package. Adds are + // aggregated separately from the other types of HTLCs. + switch pd.EntryType { + case Add: + htlc := &lnwire.UpdateAddHTLC{ + ChanID: chanID, + ID: pd.HtlcIndex, + Amount: pd.Amount, + Expiry: pd.Timeout, + PaymentHash: pd.RHash, + } + copy(htlc.OnionBlob[:], pd.OnionBlob) + logUpdate.UpdateMsg = htlc + addUpdates = append(addUpdates, logUpdate) + + case Settle: + logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + PaymentPreimage: pd.RPreimage, + } + settleFailUpdates = append(settleFailUpdates, logUpdate) + + case Fail: + logUpdate.UpdateMsg = &lnwire.UpdateFailHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + Reason: pd.FailReason, + } + settleFailUpdates = append(settleFailUpdates, logUpdate) + + case MalformedFail: + logUpdate.UpdateMsg = &lnwire.UpdateFailMalformedHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + ShaOnionBlob: pd.ShaOnionBlob, + FailureCode: pd.FailCode, + } + settleFailUpdates = append(settleFailUpdates, logUpdate) + } + } + + source := lc.channelState.ShortChanID + + // Now that we have gathered the set of HTLCs to forward, separated by + // type, construct a forwarding package using the height that the remote + // commitment chain will be extended after persisting the revocation. + fwdPkg := channeldb.NewFwdPkg( + source, remoteChainTail, addUpdates, settleFailUpdates, + ) + // At this point, the revocation has been accepted, and we've rotated // the current revocation key+hash for the remote party. Therefore we // sync now to ensure the revocation producer state is consistent with // the current commitment height and also to advance the on-disk // commitment chain. - if err := lc.channelState.AdvanceCommitChainTail(); err != nil { + err = lc.channelState.AdvanceCommitChainTail(fwdPkg) + if err != nil { return nil, err } @@ -3741,59 +4055,37 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*P // chain, we can advance their chain by a single commitment. lc.remoteCommitChain.advanceTail() - remoteChainTail := lc.remoteCommitChain.tail().height - localChainTail := lc.localCommitChain.tail().height - - // Now that we've verified the revocation update the state of the HTLC - // log as we may be able to prune portions of it now, and update their - // balance. - var htlcsToForward []*PaymentDescriptor - for e := lc.remoteUpdateLog.Front(); e != nil; e = e.Next() { - htlc := e.Value.(*PaymentDescriptor) - - if htlc.isForwarded { - continue - } - - uncommitted := (htlc.addCommitHeightRemote == 0 || - htlc.addCommitHeightLocal == 0) - if htlc.EntryType == Add && uncommitted { - continue - } - - // We'll only forward any new HTLC additions iff, it's "freshly - // locked in". Meaning that the HTLC was only *just* considered - // locked-in at this new state. By doing this we ensure that we - // don't re-forward any already processed HTLC's after a - // restart. - if htlc.EntryType == Add && - remoteChainTail == htlc.addCommitHeightRemote && - localChainTail >= htlc.addCommitHeightLocal { - - htlc.isForwarded = true - htlcsToForward = append(htlcsToForward, htlc) - continue - } - - if htlc.EntryType != Add && - remoteChainTail >= htlc.removeCommitHeightRemote && - localChainTail >= htlc.removeCommitHeightLocal { - - htlc.isForwarded = true - htlcsToForward = append(htlcsToForward, htlc) - continue - } - } - // As we've just completed a new state transition, attempt to see if we // can remove any entries from the update log which have been removed // from the PoV of both commitment chains. compactLogs(lc.localUpdateLog, lc.remoteUpdateLog, localChainTail, remoteChainTail) + htlcsToForward := append(settleFailsToForward, + addsToForward...) + return htlcsToForward, nil } +// LoadFwdPkgs loads any pending log updates from disk and returns the payment +// descriptors to be processed by the link. +func (lc *LightningChannel) LoadFwdPkgs() ([]*channeldb.FwdPkg, error) { + return lc.channelState.LoadFwdPkgs() +} + +// SetFwdFilter writes the forwarding decision for a given remote commitment +// height. +func (lc *LightningChannel) SetFwdFilter(height uint64, + fwdFilter *channeldb.PkgFilter) error { + + return lc.channelState.SetFwdFilter(height, fwdFilter) +} + +// RemoveFwdPkg permanently deletes the forwarding package at the given height. +func (lc *LightningChannel) RemoveFwdPkg(height uint64) error { + return lc.channelState.RemoveFwdPkg(height) +} + // NextRevocationKey returns the commitment point for the _next_ commitment // height. The pubkey returned by this function is required by the remote party // along with their revocation base to to extend our commitment chain with a @@ -3825,6 +4117,7 @@ func (lc *LightningChannel) InitNextRevocation(revKey *btcec.PublicKey) error { // AddHTLC adds an HTLC to the state machine's local update log. This method // should be called when preparing to send an outgoing HTLC. func (lc *LightningChannel) AddHTLC(htlc *lnwire.UpdateAddHTLC) (uint64, error) { + lc.Lock() defer lc.Unlock() @@ -3885,8 +4178,7 @@ func (lc *LightningChannel) ReceiveHTLC(htlc *lnwire.UpdateAddHTLC) (uint64, err // creating the corresponding wire message. In the case the supplied preimage // is invalid, an error is returned. Additionally, the value of the settled // HTLC is also returned. -func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlcIndex uint64, -) error { +func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlcIndex uint64) error { lc.Lock() defer lc.Unlock() @@ -3952,6 +4244,7 @@ func (lc *LightningChannel) ReceiveHTLCSettle(preimage [32]byte, htlcIndex uint6 // update. This method is intended to be called in order to cancel in // _incoming_ HTLC. func (lc *LightningChannel) FailHTLC(htlcIndex uint64, reason []byte) error { + lc.Lock() defer lc.Unlock() @@ -4041,9 +4334,6 @@ func (lc *LightningChannel) ReceiveFailHTLC(htlcIndex uint64, reason []byte, // created this active channel. This outpoint is used throughout various // subsystems to uniquely identify an open channel. func (lc *LightningChannel) ChannelPoint() *wire.OutPoint { - lc.RLock() - defer lc.RUnlock() - return &lc.channelState.FundingOutpoint } @@ -4051,9 +4341,6 @@ func (lc *LightningChannel) ChannelPoint() *wire.OutPoint { // ID encodes the exact location in the main chain that the original // funding output can be found. func (lc *LightningChannel) ShortChanID() lnwire.ShortChannelID { - lc.RLock() - defer lc.RUnlock() - return lc.channelState.ShortChanID } @@ -5417,3 +5704,19 @@ func (lc *LightningChannel) ActiveHtlcs() []channeldb.HTLC { func (lc *LightningChannel) LocalChanReserve() btcutil.Amount { return lc.localChanCfg.ChanReserve } + +// LocalHtlcIndex returns the next local htlc index to be allocated. +func (lc *LightningChannel) LocalHtlcIndex() uint64 { + lc.RLock() + defer lc.RUnlock() + + return lc.channelState.LocalCommitment.LocalHtlcIndex +} + +// RemoteCommitHeight returns the commitment height of the remote chain. +func (lc *LightningChannel) RemoteCommitHeight() uint64 { + lc.RLock() + defer lc.RUnlock() + + return lc.channelState.RemoteCommitment.CommitHeight +}