htlcswitch+lnwallet: replace updateNeeded by check on channel itself

Instead of tracking local updates in a separate link variable, query
this state from the channel itself.

This commit also fixes the issue where the commit tx was not updated
anymore after a failed first attempt because the revocation window was
closed. Also those pending updates will be taken into account when the
remote party revokes.
This commit is contained in:
Joost Jager 2019-04-10 13:10:25 +02:00
parent f59b4d62bf
commit 03b32d046a
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
2 changed files with 41 additions and 47 deletions

@ -742,19 +742,15 @@ func (l *channelLink) resolveFwdPkgs() error {
l.log.Debugf("loaded %d fwd pks", len(fwdPkgs)) l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
var needUpdate bool
for _, fwdPkg := range fwdPkgs { for _, fwdPkg := range fwdPkgs {
hasUpdate, err := l.resolveFwdPkg(fwdPkg) if err := l.resolveFwdPkg(fwdPkg); err != nil {
if err != nil {
return err return err
} }
needUpdate = needUpdate || hasUpdate
} }
// If any of our reprocessing steps require an update to the commitment // If any of our reprocessing steps require an update to the commitment
// txn, we initiate a state transition to capture all relevant changes. // txn, we initiate a state transition to capture all relevant changes.
if needUpdate { if l.channel.PendingLocalUpdateCount() > 0 {
return l.updateCommitTx() return l.updateCommitTx()
} }
@ -764,7 +760,7 @@ func (l *channelLink) resolveFwdPkgs() error {
// resolveFwdPkg interprets the FwdState of the provided package, either // resolveFwdPkg interprets the FwdState of the provided package, either
// reprocesses any outstanding htlcs in the package, or performs garbage // reprocesses any outstanding htlcs in the package, or performs garbage
// collection on the package. // collection on the package.
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) { func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
// Remove any completed packages to clear up space. // Remove any completed packages to clear up space.
if fwdPkg.State == channeldb.FwdStateCompleted { if fwdPkg.State == channeldb.FwdStateCompleted {
l.log.Debugf("removing completed fwd pkg for height=%d", l.log.Debugf("removing completed fwd pkg for height=%d",
@ -774,7 +770,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
if err != nil { if err != nil {
l.log.Errorf("unable to remove fwd pkg for height=%d: "+ l.log.Errorf("unable to remove fwd pkg for height=%d: "+
"%v", fwdPkg.Height, err) "%v", fwdPkg.Height, err)
return false, err return err
} }
} }
@ -793,7 +789,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
if err != nil { if err != nil {
l.log.Errorf("unable to process remote log updates: %v", l.log.Errorf("unable to process remote log updates: %v",
err) err)
return false, err return err
} }
l.processRemoteSettleFails(fwdPkg, settleFails) l.processRemoteSettleFails(fwdPkg, settleFails)
} }
@ -802,7 +798,6 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
// downstream logic is able to filter out any duplicates, but we must // downstream logic is able to filter out any duplicates, but we must
// shove the entire, original set of adds down the pipeline so that the // shove the entire, original set of adds down the pipeline so that the
// batch of adds presented to the sphinx router does not ever change. // batch of adds presented to the sphinx router does not ever change.
var needUpdate bool
if !fwdPkg.AckFilter.IsFull() { if !fwdPkg.AckFilter.IsFull() {
adds, err := lnwallet.PayDescsFromRemoteLogUpdates( adds, err := lnwallet.PayDescsFromRemoteLogUpdates(
fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds, fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds,
@ -810,20 +805,20 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
if err != nil { if err != nil {
l.log.Errorf("unable to process remote log updates: %v", l.log.Errorf("unable to process remote log updates: %v",
err) err)
return false, err return err
} }
needUpdate = l.processRemoteAdds(fwdPkg, adds) l.processRemoteAdds(fwdPkg, adds)
// If the link failed during processing the adds, we must // If the link failed during processing the adds, we must
// return to ensure we won't attempted to update the state // return to ensure we won't attempted to update the state
// further. // further.
if l.failed { if l.failed {
return false, fmt.Errorf("link failed while " + return fmt.Errorf("link failed while " +
"processing remote adds") "processing remote adds")
} }
} }
return needUpdate, nil return nil
} }
// fwdPkgGarbager periodically reads all forwarding packages from disk and // fwdPkgGarbager periodically reads all forwarding packages from disk and
@ -1873,7 +1868,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
} }
l.processRemoteSettleFails(fwdPkg, settleFails) l.processRemoteSettleFails(fwdPkg, settleFails)
needUpdate := l.processRemoteAdds(fwdPkg, adds) l.processRemoteAdds(fwdPkg, adds)
// If the link failed during processing the adds, we must // If the link failed during processing the adds, we must
// return to ensure we won't attempted to update the state // return to ensure we won't attempted to update the state
@ -1882,7 +1877,11 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
return return
} }
if needUpdate { // If there are pending local updates, try to update the commit
// tx. Pending updates could already have been present because
// of a previously failed update to the commit tx or freshly
// added by processRemoteAdds.
if l.channel.PendingLocalUpdateCount() > 0 {
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
l.fail(LinkFailureError{code: ErrInternalError}, l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err) "unable to update commitment: %v", err)
@ -2532,7 +2531,7 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
// whether we are reprocessing as a result of a failure or restart. Adds that // whether we are reprocessing as a result of a failure or restart. Adds that
// have already been acknowledged in the forwarding package will be ignored. // have already been acknowledged in the forwarding package will be ignored.
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
lockedInHtlcs []*lnwallet.PaymentDescriptor) bool { lockedInHtlcs []*lnwallet.PaymentDescriptor) {
l.log.Tracef("processing %d remote adds for height %d", l.log.Tracef("processing %d remote adds for height %d",
len(lockedInHtlcs), fwdPkg.Height) len(lockedInHtlcs), fwdPkg.Height)
@ -2571,13 +2570,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
if sphinxErr != nil { if sphinxErr != nil {
l.fail(LinkFailureError{code: ErrInternalError}, l.fail(LinkFailureError{code: ErrInternalError},
"unable to decode hop iterators: %v", sphinxErr) "unable to decode hop iterators: %v", sphinxErr)
return false return
} }
var ( var switchPackets []*htlcPacket
needUpdate bool
switchPackets []*htlcPacket
)
for i, pd := range lockedInHtlcs { for i, pd := range lockedInHtlcs {
idx := uint16(i) idx := uint16(i)
@ -2614,7 +2610,6 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
// sender. // sender.
l.sendMalformedHTLCError(pd.HtlcIndex, failureCode, l.sendMalformedHTLCError(pd.HtlcIndex, failureCode,
onionBlob[:], pd.SourceRef) onionBlob[:], pd.SourceRef)
needUpdate = true
l.log.Errorf("unable to decode onion hop "+ l.log.Errorf("unable to decode onion hop "+
"iterator: %v", failureCode) "iterator: %v", failureCode)
@ -2633,7 +2628,6 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
l.sendMalformedHTLCError( l.sendMalformedHTLCError(
pd.HtlcIndex, failureCode, onionBlob[:], pd.SourceRef, pd.HtlcIndex, failureCode, onionBlob[:], pd.SourceRef,
) )
needUpdate = true
l.log.Errorf("unable to decode onion "+ l.log.Errorf("unable to decode onion "+
"obfuscator: %v", failureCode) "obfuscator: %v", failureCode)
@ -2664,7 +2658,6 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
lnwire.NewInvalidOnionPayload(failedType, 0), lnwire.NewInvalidOnionPayload(failedType, 0),
obfuscator, pd.SourceRef, obfuscator, pd.SourceRef,
) )
needUpdate = true
l.log.Errorf("unable to decode forwarding "+ l.log.Errorf("unable to decode forwarding "+
"instructions: %v", err) "instructions: %v", err)
@ -2675,7 +2668,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
switch fwdInfo.NextHop { switch fwdInfo.NextHop {
case hop.Exit: case hop.Exit:
updated, err := l.processExitHop( err := l.processExitHop(
pd, obfuscator, fwdInfo, heightNow, pld, pd, obfuscator, fwdInfo, heightNow, pld,
) )
if err != nil { if err != nil {
@ -2683,10 +2676,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
err.Error(), err.Error(),
) )
return false return
}
if updated {
needUpdate = true
} }
// There are additional channels left within this route. So // There are additional channels left within this route. So
@ -2781,7 +2771,6 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
l.sendHTLCError( l.sendHTLCError(
pd.HtlcIndex, failure, obfuscator, pd.SourceRef, pd.HtlcIndex, failure, obfuscator, pd.SourceRef,
) )
needUpdate = true
continue continue
} }
@ -2821,12 +2810,12 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
if err != nil { if err != nil {
l.fail(LinkFailureError{code: ErrInternalError}, l.fail(LinkFailureError{code: ErrInternalError},
"unable to set fwd filter: %v", err) "unable to set fwd filter: %v", err)
return false return
} }
} }
if len(switchPackets) == 0 { if len(switchPackets) == 0 {
return needUpdate return
} }
l.log.Debugf("forwarding %d packets to switch", len(switchPackets)) l.log.Debugf("forwarding %d packets to switch", len(switchPackets))
@ -2837,15 +2826,13 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
// opened circuits, which violates assumptions made by the circuit // opened circuits, which violates assumptions made by the circuit
// trimming. // trimming.
l.forwardBatch(switchPackets...) l.forwardBatch(switchPackets...)
return needUpdate
} }
// processExitHop handles an htlc for which this link is the exit hop. It // processExitHop handles an htlc for which this link is the exit hop. It
// returns a boolean indicating whether the commitment tx needs an update. // returns a boolean indicating whether the commitment tx needs an update.
func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor, func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
obfuscator hop.ErrorEncrypter, fwdInfo hop.ForwardingInfo, obfuscator hop.ErrorEncrypter, fwdInfo hop.ForwardingInfo,
heightNow uint32, payload invoices.Payload) (bool, error) { heightNow uint32, payload invoices.Payload) error {
// If hodl.ExitSettle is requested, we will not validate the final hop's // If hodl.ExitSettle is requested, we will not validate the final hop's
// ADD, nor will we settle the corresponding invoice or respond with the // ADD, nor will we settle the corresponding invoice or respond with the
@ -2853,7 +2840,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
if l.cfg.HodlMask.Active(hodl.ExitSettle) { if l.cfg.HodlMask.Active(hodl.ExitSettle) {
l.log.Warnf(hodl.ExitSettle.Warning()) l.log.Warnf(hodl.ExitSettle.Warning())
return false, nil return nil
} }
// As we're the exit hop, we'll double check the hop-payload included in // As we're the exit hop, we'll double check the hop-payload included in
@ -2868,7 +2855,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
failure := lnwire.NewFinalIncorrectHtlcAmount(pd.Amount) failure := lnwire.NewFinalIncorrectHtlcAmount(pd.Amount)
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef) l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
return true, nil return nil
} }
// We'll also ensure that our time-lock value has been computed // We'll also ensure that our time-lock value has been computed
@ -2881,7 +2868,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
failure := lnwire.NewFinalIncorrectCltvExpiry(pd.Timeout) failure := lnwire.NewFinalIncorrectCltvExpiry(pd.Timeout)
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef) l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
return true, nil return nil
} }
// Notify the invoiceRegistry of the exit hop htlc. If we crash right // Notify the invoiceRegistry of the exit hop htlc. If we crash right
@ -2906,14 +2893,14 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
failure := lnwire.NewFailIncorrectDetails(pd.Amount, heightNow) failure := lnwire.NewFailIncorrectDetails(pd.Amount, heightNow)
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef) l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
return true, nil return nil
// No error. // No error.
case nil: case nil:
// Pass error to caller. // Pass error to caller.
default: default:
return false, err return err
} }
// Create a hodlHtlc struct and decide either resolved now or later. // Create a hodlHtlc struct and decide either resolved now or later.
@ -2926,15 +2913,11 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
// Save payment descriptor for future reference. // Save payment descriptor for future reference.
l.hodlMap[circuitKey] = htlc l.hodlMap[circuitKey] = htlc
return false, nil return nil
} }
// Process the received resolution. // Process the received resolution.
err = l.processHodlEvent(*event, htlc) return l.processHodlEvent(*event, htlc)
if err != nil {
return false, err
}
return true, nil
} }
// settleHTLC settles the HTLC on the channel. // settleHTLC settles the HTLC on the channel.

@ -4225,6 +4225,17 @@ func (lc *LightningChannel) oweCommitment(local bool) bool {
return oweCommitment return oweCommitment
} }
// PendingLocalUpdateCount returns the number of local updates that still need
// to be applied to the remote commitment tx.
func (lc *LightningChannel) PendingLocalUpdateCount() uint64 {
lc.RLock()
defer lc.RUnlock()
lastRemoteCommit := lc.remoteCommitChain.tip()
return lc.localUpdateLog.logIndex - lastRemoteCommit.ourMessageIndex
}
// RevokeCurrentCommitment revokes the next lowest unrevoked commitment // RevokeCurrentCommitment revokes the next lowest unrevoked commitment
// transaction in the local commitment chain. As a result the edge of our // transaction in the local commitment chain. As a result the edge of our
// revocation window is extended by one, and the tail of our local commitment // revocation window is extended by one, and the tail of our local commitment