htlcswitch/switch: reorder persistent calls and app ntfn...

for Settle/Fail responses.
This commit is contained in:
Conner Fromknecht 2018-08-10 13:59:50 -07:00
parent e7c0f4c5dc
commit d6083e0d66
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -9,10 +9,10 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/boltdb/bolt"
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
@ -63,7 +63,6 @@ type pendingPayment struct {
amount lnwire.MilliSatoshi amount lnwire.MilliSatoshi
preimage chan [sha256.Size]byte preimage chan [sha256.Size]byte
response chan *htlcPacket
err chan error err chan error
// deobfuscator is a serializable entity which is used if we received // deobfuscator is a serializable entity which is used if we received
@ -347,8 +346,10 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID,
htlc *lnwire.UpdateAddHTLC, htlc *lnwire.UpdateAddHTLC,
deobfuscator ErrorDecrypter) ([sha256.Size]byte, error) { deobfuscator ErrorDecrypter) ([sha256.Size]byte, error) {
// Verify message by ControlTower implementation. // Before sending, double check that we don't already have 1) an
if err := s.control.CheckSend(htlc); err != nil { // in-flight payment to this payment hash, or 2) a complete payment for
// the same hash.
if err := s.control.ClearForTakeoff(htlc); err != nil {
return zeroPreimage, err return zeroPreimage, err
} }
@ -356,7 +357,6 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID,
// able to retrieve it and return response to the user. // able to retrieve it and return response to the user.
payment := &pendingPayment{ payment := &pendingPayment{
err: make(chan error, 1), err: make(chan error, 1),
response: make(chan *htlcPacket, 1),
preimage: make(chan [sha256.Size]byte, 1), preimage: make(chan [sha256.Size]byte, 1),
paymentHash: htlc.PaymentHash, paymentHash: htlc.PaymentHash,
amount: htlc.Amount, amount: htlc.Amount,
@ -394,7 +394,6 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID,
// Returns channels so that other subsystem might wait/skip the // Returns channels so that other subsystem might wait/skip the
// waiting of handling of payment. // waiting of handling of payment.
var preimage [sha256.Size]byte var preimage [sha256.Size]byte
var response *htlcPacket
select { select {
case e := <-payment.err: case e := <-payment.err:
@ -403,13 +402,6 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID,
return zeroPreimage, ErrSwitchExiting return zeroPreimage, ErrSwitchExiting
} }
select {
case pkt := <-payment.response:
response = pkt
case <-s.quit:
return zeroPreimage, ErrSwitchExiting
}
select { select {
case p := <-payment.preimage: case p := <-payment.preimage:
preimage = p preimage = p
@ -417,24 +409,6 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID,
return zeroPreimage, ErrSwitchExiting return zeroPreimage, ErrSwitchExiting
} }
// Remove circuit since we are about to complete an add/fail of this
// HTLC.
if teardownErr := s.teardownCircuit(response); teardownErr != nil {
log.Warnf("unable to teardown circuit %s: %v",
response.inKey(), teardownErr)
return preimage, err
}
// Finally, if this response is contained in a forwarding package, ack
// the settle/fail so that we don't continue to retransmit the HTLC
// internally.
if response.destRef != nil {
if ackErr := s.ackSettleFail(*response.destRef); ackErr != nil {
log.Warnf("unable to ack settle/fail reference: %s: %v",
*response.destRef, ackErr)
}
}
return preimage, err return preimage, err
} }
@ -782,20 +756,10 @@ func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
// Alice Bob Carol // Alice Bob Carol
// //
func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error {
// Pending payments use a special interpretation of the incomingChanID and
// incomingHTLCID fields on packet where the channel ID is blank and the
// HTLC ID is the payment ID. The switch basically views the users of the
// node as a special channel that also offers a sequence of HTLCs.
payment, err := s.findPayment(pkt.incomingHTLCID)
if err != nil {
return err
}
switch htlc := pkt.htlc.(type) {
// User have created the htlc update therefore we should find the // User have created the htlc update therefore we should find the
// appropriate channel link and send the payment over this link. // appropriate channel link and send the payment over this link.
case *lnwire.UpdateAddHTLC: if htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC); ok {
// Try to find links by node destination.
s.indexMtx.RLock() s.indexMtx.RLock()
link, err := s.getLinkByShortID(pkt.outgoingChanID) link, err := s.getLinkByShortID(pkt.outgoingChanID)
s.indexMtx.RUnlock() s.indexMtx.RUnlock()
@ -839,32 +803,85 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error {
} }
return link.HandleSwitchPacket(pkt) return link.HandleSwitchPacket(pkt)
}
// We've just received a settle update which means we can finalize the // Otherwise this is a response to a payment that we initiated. We'll
// user payment and return successful response. // clean up any fwdpkg references, circuit entries, and mark in our db
// that the payment for this payment hash has either succeeded or
// failed.
//
// If this response is contained in a forwarding package, we'll start by
// acking the settle/fail so that we don't continue to retransmit the
// HTLC internally.
if pkt.destRef != nil {
if err := s.ackSettleFail(*pkt.destRef); err != nil {
log.Warnf("Unable to ack settle/fail reference: %s: %v",
*pkt.destRef, err)
return err
}
}
// Next, we'll remove the circuit since we are about to complete an
// fulfill/fail of this HTLC. Since we've already removed the
// settle/fail fwdpkg reference, the response from the peer cannot be
// replayed internally if this step fails. If this happens, this logic
// will be executed when a provided resolution message comes through.
// This can only happen if the circuit is still open, which is why this
// ordering is chosen.
if err := s.teardownCircuit(pkt); err != nil {
log.Warnf("Unable to teardown circuit %s: %v",
pkt.inKey(), err)
return err
}
// Locate the pending payment to notify the application that this
// payment has failed. If one is not found, it likely means the daemon
// has been restarted since sending the payment.
payment := s.findPayment(pkt.incomingHTLCID)
var (
preimage [32]byte
paymentErr error
)
switch htlc := pkt.htlc.(type) {
// We've received a settle update which means we can finalize the user
// payment and return successful response.
case *lnwire.UpdateFulfillHTLC: case *lnwire.UpdateFulfillHTLC:
// Notify the user that his payment was successfully proceed. // Persistently mark that a payment to this payment hash
payment.err <- nil // succeeded. This will prevent us from ever making another
payment.response <- pkt // payment to this hash.
payment.preimage <- htlc.PaymentPreimage
s.removePendingPayment(pkt.incomingHTLCID)
if err := s.control.Success(pkt.circuit.PaymentHash); err != nil { if err := s.control.Success(pkt.circuit.PaymentHash); err != nil {
return err return err
} }
// We've just received a fail update which means we can finalize the preimage = htlc.PaymentPreimage
// user payment and return fail response.
// We've received a fail update which means we can finalize the user
// payment and return fail response.
case *lnwire.UpdateFailHTLC: case *lnwire.UpdateFailHTLC:
payment.err <- s.parseFailedPayment(payment, pkt, htlc) // Persistently mark that a payment to this payment hash failed.
payment.response <- pkt // This will permit us to make another attempt at a successful
payment.preimage <- zeroPreimage // payment.
s.removePendingPayment(pkt.incomingHTLCID) if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil {
return err
}
paymentErr = s.parseFailedPayment(payment, pkt, htlc)
default: default:
return errors.New("wrong update type") return errors.New("wrong update type")
} }
// Deliver the payment error and preimage to the application, if it is
// waiting for a response.
if payment != nil {
payment.err <- paymentErr
payment.preimage <- preimage
s.removePendingPayment(pkt.incomingHTLCID)
}
return nil return nil
} }
@ -890,7 +907,8 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket,
failureMsg, err := lnwire.DecodeFailure(r, 0) failureMsg, err := lnwire.DecodeFailure(r, 0)
if err != nil { if err != nil {
userErr = fmt.Sprintf("unable to decode onion failure, "+ userErr = fmt.Sprintf("unable to decode onion failure, "+
"htlc with hash(%x): %v", payment.paymentHash[:], err) "htlc with hash(%x): %v",
payment.paymentHash[:], err)
log.Error(userErr) log.Error(userErr)
// As this didn't even clear the link, we don't need to // As this didn't even clear the link, we don't need to
@ -917,10 +935,6 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket,
FailureMessage: lnwire.FailPermanentChannelFailure{}, FailureMessage: lnwire.FailPermanentChannelFailure{},
} }
if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil {
log.Error(err)
}
// A regular multi-hop payment error that we'll need to // A regular multi-hop payment error that we'll need to
// decrypt. // decrypt.
default: default:
@ -937,10 +951,6 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket,
ExtraMsg: userErr, ExtraMsg: userErr,
FailureMessage: lnwire.NewTemporaryChannelFailure(nil), FailureMessage: lnwire.NewTemporaryChannelFailure(nil),
} }
if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil {
log.Error(err)
}
} }
} }
@ -2080,16 +2090,18 @@ func (s *Switch) removePendingPayment(paymentID uint64) error {
} }
// findPayment is the helper function which find the payment. // findPayment is the helper function which find the payment.
func (s *Switch) findPayment(paymentID uint64) (*pendingPayment, error) { func (s *Switch) findPayment(paymentID uint64) *pendingPayment {
s.pendingMutex.RLock() s.pendingMutex.RLock()
defer s.pendingMutex.RUnlock() defer s.pendingMutex.RUnlock()
payment, ok := s.pendingPayments[paymentID] payment, ok := s.pendingPayments[paymentID]
if !ok { if !ok {
return nil, fmt.Errorf("Cannot find pending payment with ID %d", log.Errorf("Cannot find pending payment with ID %d",
paymentID) paymentID)
return nil
} }
return payment, nil
return payment
} }
// CircuitModifier returns a reference to subset of the interfaces provided by // CircuitModifier returns a reference to subset of the interfaces provided by