lnwallet/channel: integrate fwdpkgs w/ in-mem buffering

This commit is contained in:
Conner Fromknecht 2018-02-27 20:01:41 -08:00
parent 7a93c7530c
commit 53e4422a2e
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -209,6 +209,37 @@ type PaymentDescriptor struct {
// Settle. // Settle.
ParentIndex uint64 ParentIndex uint64
// SourceRef points to an Add update in a forwarding package owned by
// this channel.
//
// NOTE: This field will only be populated if EntryType is Fail or
// Settle.
SourceRef *channeldb.AddRef
// DestRef points to a Fail/Settle update in another link's forwarding
// package.
//
// NOTE: This field will only be populated if EntryType is Fail or
// Settle, and the forwarded Add successfully included in an outgoing
// link's commitment txn.
DestRef *channeldb.SettleFailRef
// OpenCircuitRef references the incoming Chan/HTLC ID of an Add HTLC
// packet delivered by the switch.
//
// NOTE: This field is only populated for payment descriptors in the
// *local* update log, and if the Add packet was delivered by the
// switch.
OpenCircuitRef *channeldb.CircuitKey
// ClosedCircuitRef references the incoming Chan/HTLC ID of the Add HTLC
// that opened the circuit.
//
// NOTE: This field is only populated for payment descriptors in the
// *local* update log, and if settle/fails have a committed circuit in
// the circuit map.
ClosedCircuitRef *channeldb.CircuitKey
// localOutputIndex is the output index of this HTLc output in the // localOutputIndex is the output index of this HTLc output in the
// commitment transaction of the local node. // commitment transaction of the local node.
// //
@ -291,6 +322,96 @@ type PaymentDescriptor struct {
isForwarded bool isForwarded bool
} }
// PayDescsFromRemoteLogUpdates converts a slice of LogUpdates received from the
// remote peer into PaymentDescriptors to inform a link's forwarding decisions.
//
// NOTE: The provided `logUpdates` MUST corresponding exactly to either the Adds
// or SettleFails in this channel's forwarding package at `height`.
func (lc *LightningChannel) PayDescsFromRemoteLogUpdates(height uint64,
logUpdates []channeldb.LogUpdate) []*PaymentDescriptor {
lc.RLock()
defer lc.RUnlock()
// Allocate enough space to hold all of the payment descriptors we will
// reconstruct, and also the list of pointers that will be returned to
// the caller.
payDescs := make([]PaymentDescriptor, 0, len(logUpdates))
payDescPtrs := make([]*PaymentDescriptor, 0, len(logUpdates))
// Iterate over the log updates we loaded from disk, and reconstruct the
// payment descriptor corresponding to one of the four types of htlcs we
// can receive from the remote peer. We only repopulate the information
// necessary to process the packets and, if necessary, forward them to
// the switch.
//
// For each log update, we include either an AddRef or a SettleFailRef
// so that they can be ACK'd and garbage collected.
for i, logUpdate := range logUpdates {
var pd PaymentDescriptor
switch wireMsg := logUpdate.UpdateMsg.(type) {
case *lnwire.UpdateAddHTLC:
pd = PaymentDescriptor{
RHash: wireMsg.PaymentHash,
Timeout: wireMsg.Expiry,
Amount: wireMsg.Amount,
EntryType: Add,
HtlcIndex: wireMsg.ID,
LogIndex: logUpdate.LogIndex,
SourceRef: &channeldb.AddRef{
Height: height,
Index: uint16(i),
},
}
pd.OnionBlob = make([]byte, len(wireMsg.OnionBlob))
copy(pd.OnionBlob[:], wireMsg.OnionBlob[:])
case *lnwire.UpdateFulfillHTLC:
pd = PaymentDescriptor{
RPreimage: wireMsg.PaymentPreimage,
ParentIndex: wireMsg.ID,
EntryType: Settle,
DestRef: &channeldb.SettleFailRef{
Source: lc.ShortChanID(),
Height: height,
Index: uint16(i),
},
}
case *lnwire.UpdateFailHTLC:
pd = PaymentDescriptor{
ParentIndex: wireMsg.ID,
EntryType: Fail,
FailReason: wireMsg.Reason[:],
DestRef: &channeldb.SettleFailRef{
Source: lc.ShortChanID(),
Height: height,
Index: uint16(i),
},
}
case *lnwire.UpdateFailMalformedHTLC:
pd = PaymentDescriptor{
ParentIndex: wireMsg.ID,
EntryType: MalformedFail,
FailCode: wireMsg.FailureCode,
ShaOnionBlob: wireMsg.ShaOnionBlob,
DestRef: &channeldb.SettleFailRef{
Source: lc.ShortChanID(),
Height: height,
Index: uint16(i),
},
}
}
payDescs = append(payDescs, pd)
payDescPtrs = append(payDescPtrs, &payDescs[i])
}
return payDescPtrs
}
// commitment represents a commitment to a new state within an active channel. // commitment represents a commitment to a new state within an active channel.
// New commitments can be initiated by either side. Commitments are ordered // New commitments can be initiated by either side. Commitments are ordered
// into a commitment chain, with one existing for both parties. Each side can // into a commitment chain, with one existing for both parties. Each side can
@ -2572,6 +2693,13 @@ func (lc *LightningChannel) createCommitDiff(
}) })
} }
var (
ackAddRefs []channeldb.AddRef
settleFailRefs []channeldb.SettleFailRef
openCircuitKeys []channeldb.CircuitKey
closedCircuitKeys []channeldb.CircuitKey
)
// We'll now run through our local update log to locate the items which // We'll now run through our local update log to locate the items which
// were only just committed within this pending state. This will be the // were only just committed within this pending state. This will be the
// set of items we need to retransmit if we reconnect and find that // set of items we need to retransmit if we reconnect and find that
@ -2611,6 +2739,20 @@ func (lc *LightningChannel) createCommitDiff(
copy(htlc.OnionBlob[:], pd.OnionBlob) copy(htlc.OnionBlob[:], pd.OnionBlob)
logUpdate.UpdateMsg = htlc logUpdate.UpdateMsg = htlc
// Gather any references for circuits opened by this Add
// HTLC.
if pd.OpenCircuitRef != nil {
openCircuitKeys = append(openCircuitKeys,
*pd.OpenCircuitRef)
}
logUpdates = append(logUpdates, logUpdate)
// Short circuit here since an add should not have any
// of the references gathered in the case of settles,
// fails or malformed fails.
continue
case Settle: case Settle:
logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{ logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{
ChanID: chanID, ChanID: chanID,
@ -2634,6 +2776,19 @@ func (lc *LightningChannel) createCommitDiff(
} }
} }
// Gather the fwd pkg references from any settle or fail
// packets, if they exist.
if pd.SourceRef != nil {
ackAddRefs = append(ackAddRefs, *pd.SourceRef)
}
if pd.DestRef != nil {
settleFailRefs = append(settleFailRefs, *pd.DestRef)
}
if pd.ClosedCircuitRef != nil {
closedCircuitKeys = append(closedCircuitKeys,
*pd.ClosedCircuitRef)
}
logUpdates = append(logUpdates, logUpdate) logUpdates = append(logUpdates, logUpdate)
} }
@ -2651,7 +2806,11 @@ func (lc *LightningChannel) createCommitDiff(
CommitSig: commitSig, CommitSig: commitSig,
HtlcSigs: htlcSigs, HtlcSigs: htlcSigs,
}, },
LogUpdates: logUpdates, LogUpdates: logUpdates,
OpenedCircuitKeys: openCircuitKeys,
ClosedCircuitKeys: closedCircuitKeys,
AddAcks: ackAddRefs,
SettleFailAcks: settleFailRefs,
}, nil }, nil
} }
@ -2832,7 +2991,8 @@ func (lc *LightningChannel) SignNextCommitment() (lnwire.Sig, []lnwire.Sig, erro
// have not received // have not received
// * RevokeAndAck: if we sent a revocation message that they claim to have // * RevokeAndAck: if we sent a revocation message that they claim to have
// not received // not received
func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ([]lnwire.Message, error) { func (lc *LightningChannel) ProcessChanSyncMsg(
msg *lnwire.ChannelReestablish) ([]lnwire.Message, error) {
// We owe them a commitment if they have an un-acked commitment and the // We owe them a commitment if they have an un-acked commitment and the
// tip of their chain (from our Pov) is equal to what they think their // tip of their chain (from our Pov) is equal to what they think their
@ -2850,7 +3010,13 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) (
// chain sync message. If we're de-synchronized, then we'll send a // chain sync message. If we're de-synchronized, then we'll send a
// batch of messages which when applied will kick start the chain // batch of messages which when applied will kick start the chain
// resync. // resync.
var updates []lnwire.Message var (
updates []lnwire.Message
// TODO(conner): uncomment after API exposes these return
// variables, this permits compilation in the meantime
//openedCircuits []channeldb.CircuitKey
//closedCircuits []channeldb.CircuitKey
)
// If the remote party included the optional fields, then we'll verify // If the remote party included the optional fields, then we'll verify
// their correctness first, as it will influence our decisions below. // their correctness first, as it will influence our decisions below.
@ -2974,9 +3140,12 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) (
// commitment chain with our local version of their chain. // commitment chain with our local version of their chain.
updates = append(updates, commitDiff.CommitSig) updates = append(updates, commitDiff.CommitSig)
} else if !oweCommitment && remoteChainTip.height+1 != // TODO(conner): uncomment after API exposes these return
msg.NextLocalCommitHeight { // variables, this permits compilation in the meantime
//openedCircuits = commitDiff.OpenedCircuitKeys
//closedCircuits = commitDiff.ClosedCircuitKeys
} else if remoteChainTip.height+1 != msg.NextLocalCommitHeight {
if err := lc.channelState.MarkBorked(); err != nil { if err := lc.channelState.MarkBorked(); err != nil {
return nil, err return nil, err
} }
@ -3692,7 +3861,9 @@ func (lc *LightningChannel) RevokeCurrentCommitment() (*lnwire.RevokeAndAck, []c
// successful, then the remote commitment chain is advanced by a single // successful, then the remote commitment chain is advanced by a single
// commitment, and a log compaction is attempted. In addition, a slice of // commitment, and a log compaction is attempted. In addition, a slice of
// HTLC's which can be forwarded upstream are returned. // HTLC's which can be forwarded upstream are returned.
func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*PaymentDescriptor, error) { func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) (
[]*PaymentDescriptor, error) {
lc.Lock() lc.Lock()
defer lc.Unlock() defer lc.Unlock()
@ -3728,12 +3899,155 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*P
lc.remoteCommitChain.tail().height, lc.remoteCommitChain.tail().height,
lc.remoteCommitChain.tail().height+1) lc.remoteCommitChain.tail().height+1)
// Add one to the remote tail since this will be height *after* we write
// the revocation to disk, the local height will remain unchanged.
remoteChainTail := lc.remoteCommitChain.tail().height + 1
localChainTail := lc.localCommitChain.tail().height
chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint)
// Determine the set of htlcs that can be forwarded as a result of
// having received the revocation. We will simultaneously construct the
// log updates and payment descriptors, allowing us to persist the log
// updates to disk and optimistically buffer the forwarding package in
// memory.
var (
addsToForward []*PaymentDescriptor
addUpdates []channeldb.LogUpdate
settleFailsToForward []*PaymentDescriptor
settleFailUpdates []channeldb.LogUpdate
)
var addIndex, settleFailIndex uint16
for e := lc.remoteUpdateLog.Front(); e != nil; e = e.Next() {
pd := e.Value.(*PaymentDescriptor)
if pd.isForwarded {
continue
}
uncommitted := (pd.addCommitHeightRemote == 0 ||
pd.addCommitHeightLocal == 0)
if pd.EntryType == Add && uncommitted {
continue
}
// Using the height of the remote and local commitments,
// preemptively compute whether or not to forward this HTLC for
// the case in which this in an Add HTLC, or if this is a
// Settle, Fail, or MalformedFail.
shouldFwdAdd := remoteChainTail == pd.addCommitHeightRemote &&
localChainTail >= pd.addCommitHeightLocal
shouldFwdRmv := remoteChainTail >= pd.removeCommitHeightRemote &&
localChainTail >= pd.removeCommitHeightLocal
// We'll only forward any new HTLC additions iff, it's "freshly
// locked in". Meaning that the HTLC was only *just* considered
// locked-in at this new state. By doing this we ensure that we
// don't re-forward any already processed HTLC's after a
// restart.
switch {
case pd.EntryType == Add && shouldFwdAdd:
// Construct a reference specifying the location that
// this forwarded Add will be written in the forwarding
// package constructed at this remote height.
pd.SourceRef = &channeldb.AddRef{
Height: remoteChainTail,
Index: addIndex,
}
addIndex++
pd.isForwarded = true
addsToForward = append(addsToForward, pd)
case pd.EntryType != Add && shouldFwdRmv:
// Construct a reference specifying the location that
// this forwarded Settle/Fail will be written in the
// forwarding package constructed at this remote height.
pd.DestRef = &channeldb.SettleFailRef{
Source: lc.ShortChanID(),
Height: remoteChainTail,
Index: settleFailIndex,
}
settleFailIndex++
pd.isForwarded = true
settleFailsToForward = append(settleFailsToForward, pd)
default:
continue
}
// If we've reached this point, this HTLC will be added to the
// forwarding package at the height of the remote commitment.
// All types of HTLCs will record their assigned log index.
logUpdate := channeldb.LogUpdate{
LogIndex: pd.LogIndex,
}
// Next, we'll map the type of the PaymentDescriptor to one of
// the four messages that it corresponds to and separate the
// updates into Adds and Settle/Fail/MalformedFail such that
// they can be written in the forwarding package. Adds are
// aggregated separately from the other types of HTLCs.
switch pd.EntryType {
case Add:
htlc := &lnwire.UpdateAddHTLC{
ChanID: chanID,
ID: pd.HtlcIndex,
Amount: pd.Amount,
Expiry: pd.Timeout,
PaymentHash: pd.RHash,
}
copy(htlc.OnionBlob[:], pd.OnionBlob)
logUpdate.UpdateMsg = htlc
addUpdates = append(addUpdates, logUpdate)
case Settle:
logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{
ChanID: chanID,
ID: pd.ParentIndex,
PaymentPreimage: pd.RPreimage,
}
settleFailUpdates = append(settleFailUpdates, logUpdate)
case Fail:
logUpdate.UpdateMsg = &lnwire.UpdateFailHTLC{
ChanID: chanID,
ID: pd.ParentIndex,
Reason: pd.FailReason,
}
settleFailUpdates = append(settleFailUpdates, logUpdate)
case MalformedFail:
logUpdate.UpdateMsg = &lnwire.UpdateFailMalformedHTLC{
ChanID: chanID,
ID: pd.ParentIndex,
ShaOnionBlob: pd.ShaOnionBlob,
FailureCode: pd.FailCode,
}
settleFailUpdates = append(settleFailUpdates, logUpdate)
}
}
source := lc.channelState.ShortChanID
// Now that we have gathered the set of HTLCs to forward, separated by
// type, construct a forwarding package using the height that the remote
// commitment chain will be extended after persisting the revocation.
fwdPkg := channeldb.NewFwdPkg(
source, remoteChainTail, addUpdates, settleFailUpdates,
)
// At this point, the revocation has been accepted, and we've rotated // At this point, the revocation has been accepted, and we've rotated
// the current revocation key+hash for the remote party. Therefore we // the current revocation key+hash for the remote party. Therefore we
// sync now to ensure the revocation producer state is consistent with // sync now to ensure the revocation producer state is consistent with
// the current commitment height and also to advance the on-disk // the current commitment height and also to advance the on-disk
// commitment chain. // commitment chain.
if err := lc.channelState.AdvanceCommitChainTail(); err != nil { err = lc.channelState.AdvanceCommitChainTail(fwdPkg)
if err != nil {
return nil, err return nil, err
} }
@ -3741,59 +4055,37 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*P
// chain, we can advance their chain by a single commitment. // chain, we can advance their chain by a single commitment.
lc.remoteCommitChain.advanceTail() lc.remoteCommitChain.advanceTail()
remoteChainTail := lc.remoteCommitChain.tail().height
localChainTail := lc.localCommitChain.tail().height
// Now that we've verified the revocation update the state of the HTLC
// log as we may be able to prune portions of it now, and update their
// balance.
var htlcsToForward []*PaymentDescriptor
for e := lc.remoteUpdateLog.Front(); e != nil; e = e.Next() {
htlc := e.Value.(*PaymentDescriptor)
if htlc.isForwarded {
continue
}
uncommitted := (htlc.addCommitHeightRemote == 0 ||
htlc.addCommitHeightLocal == 0)
if htlc.EntryType == Add && uncommitted {
continue
}
// We'll only forward any new HTLC additions iff, it's "freshly
// locked in". Meaning that the HTLC was only *just* considered
// locked-in at this new state. By doing this we ensure that we
// don't re-forward any already processed HTLC's after a
// restart.
if htlc.EntryType == Add &&
remoteChainTail == htlc.addCommitHeightRemote &&
localChainTail >= htlc.addCommitHeightLocal {
htlc.isForwarded = true
htlcsToForward = append(htlcsToForward, htlc)
continue
}
if htlc.EntryType != Add &&
remoteChainTail >= htlc.removeCommitHeightRemote &&
localChainTail >= htlc.removeCommitHeightLocal {
htlc.isForwarded = true
htlcsToForward = append(htlcsToForward, htlc)
continue
}
}
// As we've just completed a new state transition, attempt to see if we // As we've just completed a new state transition, attempt to see if we
// can remove any entries from the update log which have been removed // can remove any entries from the update log which have been removed
// from the PoV of both commitment chains. // from the PoV of both commitment chains.
compactLogs(lc.localUpdateLog, lc.remoteUpdateLog, compactLogs(lc.localUpdateLog, lc.remoteUpdateLog,
localChainTail, remoteChainTail) localChainTail, remoteChainTail)
htlcsToForward := append(settleFailsToForward,
addsToForward...)
return htlcsToForward, nil return htlcsToForward, nil
} }
// LoadFwdPkgs loads any pending log updates from disk and returns the payment
// descriptors to be processed by the link.
func (lc *LightningChannel) LoadFwdPkgs() ([]*channeldb.FwdPkg, error) {
return lc.channelState.LoadFwdPkgs()
}
// SetFwdFilter writes the forwarding decision for a given remote commitment
// height.
func (lc *LightningChannel) SetFwdFilter(height uint64,
fwdFilter *channeldb.PkgFilter) error {
return lc.channelState.SetFwdFilter(height, fwdFilter)
}
// RemoveFwdPkg permanently deletes the forwarding package at the given height.
func (lc *LightningChannel) RemoveFwdPkg(height uint64) error {
return lc.channelState.RemoveFwdPkg(height)
}
// NextRevocationKey returns the commitment point for the _next_ commitment // NextRevocationKey returns the commitment point for the _next_ commitment
// height. The pubkey returned by this function is required by the remote party // height. The pubkey returned by this function is required by the remote party
// along with their revocation base to to extend our commitment chain with a // along with their revocation base to to extend our commitment chain with a
@ -3825,6 +4117,7 @@ func (lc *LightningChannel) InitNextRevocation(revKey *btcec.PublicKey) error {
// AddHTLC adds an HTLC to the state machine's local update log. This method // AddHTLC adds an HTLC to the state machine's local update log. This method
// should be called when preparing to send an outgoing HTLC. // should be called when preparing to send an outgoing HTLC.
func (lc *LightningChannel) AddHTLC(htlc *lnwire.UpdateAddHTLC) (uint64, error) { func (lc *LightningChannel) AddHTLC(htlc *lnwire.UpdateAddHTLC) (uint64, error) {
lc.Lock() lc.Lock()
defer lc.Unlock() defer lc.Unlock()
@ -3885,8 +4178,7 @@ func (lc *LightningChannel) ReceiveHTLC(htlc *lnwire.UpdateAddHTLC) (uint64, err
// creating the corresponding wire message. In the case the supplied preimage // creating the corresponding wire message. In the case the supplied preimage
// is invalid, an error is returned. Additionally, the value of the settled // is invalid, an error is returned. Additionally, the value of the settled
// HTLC is also returned. // HTLC is also returned.
func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlcIndex uint64, func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlcIndex uint64) error {
) error {
lc.Lock() lc.Lock()
defer lc.Unlock() defer lc.Unlock()
@ -3952,6 +4244,7 @@ func (lc *LightningChannel) ReceiveHTLCSettle(preimage [32]byte, htlcIndex uint6
// update. This method is intended to be called in order to cancel in // update. This method is intended to be called in order to cancel in
// _incoming_ HTLC. // _incoming_ HTLC.
func (lc *LightningChannel) FailHTLC(htlcIndex uint64, reason []byte) error { func (lc *LightningChannel) FailHTLC(htlcIndex uint64, reason []byte) error {
lc.Lock() lc.Lock()
defer lc.Unlock() defer lc.Unlock()
@ -4041,9 +4334,6 @@ func (lc *LightningChannel) ReceiveFailHTLC(htlcIndex uint64, reason []byte,
// created this active channel. This outpoint is used throughout various // created this active channel. This outpoint is used throughout various
// subsystems to uniquely identify an open channel. // subsystems to uniquely identify an open channel.
func (lc *LightningChannel) ChannelPoint() *wire.OutPoint { func (lc *LightningChannel) ChannelPoint() *wire.OutPoint {
lc.RLock()
defer lc.RUnlock()
return &lc.channelState.FundingOutpoint return &lc.channelState.FundingOutpoint
} }
@ -4051,9 +4341,6 @@ func (lc *LightningChannel) ChannelPoint() *wire.OutPoint {
// ID encodes the exact location in the main chain that the original // ID encodes the exact location in the main chain that the original
// funding output can be found. // funding output can be found.
func (lc *LightningChannel) ShortChanID() lnwire.ShortChannelID { func (lc *LightningChannel) ShortChanID() lnwire.ShortChannelID {
lc.RLock()
defer lc.RUnlock()
return lc.channelState.ShortChanID return lc.channelState.ShortChanID
} }
@ -5417,3 +5704,19 @@ func (lc *LightningChannel) ActiveHtlcs() []channeldb.HTLC {
func (lc *LightningChannel) LocalChanReserve() btcutil.Amount { func (lc *LightningChannel) LocalChanReserve() btcutil.Amount {
return lc.localChanCfg.ChanReserve return lc.localChanCfg.ChanReserve
} }
// LocalHtlcIndex returns the next local htlc index to be allocated.
func (lc *LightningChannel) LocalHtlcIndex() uint64 {
lc.RLock()
defer lc.RUnlock()
return lc.channelState.LocalCommitment.LocalHtlcIndex
}
// RemoteCommitHeight returns the commitment height of the remote chain.
func (lc *LightningChannel) RemoteCommitHeight() uint64 {
lc.RLock()
defer lc.RUnlock()
return lc.channelState.RemoteCommitment.CommitHeight
}