htlcswitch: perform fee related checks at forwarding time

In this commit, we fix a very old, lingering bug within the link. When
accepting an HTLC we are meant to validate the fee against the
constraints of the *outgoing* link. This is due to the fact that we're
offering a payment transit service on our outgoing link. Before this
commit, we would use the policies of the *incoming* link. This would at
times lead to odd routing errors as we would go to route, get an error
update and then route again, repeating the process.

With this commit, we'll properly use the incoming link for timelock
related constraints, and the outgoing link for fee related constraints.
We do this by introducing a new HtlcSatisfiesPolicy method in the link.
This method should return a non-nil error if the link can carry the HTLC
as it satisfies its current forwarding policy. We'll use this method now
at *forwarding* time to ensure that we only forward to links that
actually accept the policy. This fixes a number of bugs that existed
before that could result in a link accepting an HTLC that actually
violated its policy. In the case that the policy is violated for *all*
links, we take care to return the error returned by the *target* link so
the caller can update their sending accordingly.

In this commit, we also remove the prior linkControl channel in the
channelLink. Instead, of sending a message to update the internal link
policy, we'll use a mutex in place. This simplifies the code, and also
adds some necessary refactoring in anticipation of the next follow up
commit.
This commit is contained in:
Olaoluwa Osuntokun 2018-04-03 19:51:40 -07:00
parent 9d4cea93f0
commit 7037d55f65
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
3 changed files with 181 additions and 134 deletions

@ -76,6 +76,14 @@ type ChannelLink interface {
// policy to govern if it an incoming HTLC should be forwarded or not. // policy to govern if it an incoming HTLC should be forwarded or not.
UpdateForwardingPolicy(ForwardingPolicy) UpdateForwardingPolicy(ForwardingPolicy)
// HtlcSatifiesPolicy should return a nil error if the passed HTLC
// details satisfy the current forwarding policy fo the target link.
// Otherwise, a valid protocol failure message should be returned in
// order to signal to the source of the HTLC, the policy consistency
// issue.
HtlcSatifiesPolicy(payHash [32]byte,
incomingAmt, amtToForward lnwire.MilliSatoshi) lnwire.FailureMessage
// Bandwidth returns the amount of milli-satoshis which current link // Bandwidth returns the amount of milli-satoshis which current link
// might pass through channel link. The value returned from this method // might pass through channel link. The value returned from this method
// represents the up to date available flow through the channel. This // represents the up to date available flow through the channel. This

@ -79,7 +79,8 @@ type ForwardingPolicy struct {
// //
// TODO(roasbeef): also add in current available channel bandwidth, inverse // TODO(roasbeef): also add in current available channel bandwidth, inverse
// func // func
func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi { func ExpectedFee(f ForwardingPolicy,
htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
// TODO(roasbeef): write some basic table driven tests // TODO(roasbeef): write some basic table driven tests
return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
@ -151,10 +152,12 @@ type ChannelLinkConfig struct {
// Sphinx onion blob, and creating onion failure obfuscator. // Sphinx onion blob, and creating onion failure obfuscator.
ExtractErrorEncrypter ErrorEncrypterExtracter ExtractErrorEncrypter ErrorEncrypterExtracter
// GetLastChannelUpdate retrieves the latest routing policy for this // FetchLastChannelUpdate retrieves the latest routing policy for a
// particular channel. This will be used to provide payment senders our // target channel. This channel will typically be the outgoing channel
// latest policy when sending encrypted error messages. // specified when we receive an incoming HTLC. This will be used to
GetLastChannelUpdate func() (*lnwire.ChannelUpdate, error) // provide payment senders our latest policy when sending encrypted
// error messages.
FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
// Peer is a lightning network node with which we have the channel link // Peer is a lightning network node with which we have the channel link
// opened. // opened.
@ -310,11 +313,6 @@ type channelLink struct {
// by the HTLC switch. // by the HTLC switch.
downstream chan *htlcPacket downstream chan *htlcPacket
// linkControl is a channel which is used to query the state of the
// link, or update various policies used which govern if an HTLC is to
// be forwarded and/or accepted.
linkControl chan interface{}
// htlcUpdates is a channel that we'll use to update outside // htlcUpdates is a channel that we'll use to update outside
// sub-systems with the latest set of active HTLC's on our channel. // sub-systems with the latest set of active HTLC's on our channel.
htlcUpdates chan []channeldb.HTLC htlcUpdates chan []channeldb.HTLC
@ -342,7 +340,6 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
cfg: cfg, cfg: cfg,
channel: channel, channel: channel,
shortChanID: channel.ShortChanID(), shortChanID: channel.ShortChanID(),
linkControl: make(chan interface{}),
// TODO(roasbeef): just do reserve here? // TODO(roasbeef): just do reserve here?
logCommitTimer: time.NewTimer(300 * time.Millisecond), logCommitTimer: time.NewTimer(300 * time.Millisecond),
overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2),
@ -920,30 +917,6 @@ out:
case msg := <-l.upstream: case msg := <-l.upstream:
l.handleUpstreamMsg(msg) l.handleUpstreamMsg(msg)
// TODO(roasbeef): make distinct goroutine to handle?
case cmd := <-l.linkControl:
switch req := cmd.(type) {
case *policyUpdate:
// In order to avoid overriding a valid policy
// with a "null" field in the new policy, we'll
// only update to the set sub policy if the new
// value isn't uninitialized.
if req.policy.BaseFee != 0 {
l.cfg.FwrdingPolicy.BaseFee = req.policy.BaseFee
}
if req.policy.FeeRate != 0 {
l.cfg.FwrdingPolicy.FeeRate = req.policy.FeeRate
}
if req.policy.TimeLockDelta != 0 {
l.cfg.FwrdingPolicy.TimeLockDelta = req.policy.TimeLockDelta
}
if req.done != nil {
close(req.done)
}
}
case <-l.quit: case <-l.quit:
break out break out
} }
@ -1502,6 +1475,9 @@ func (l *channelLink) Peer() Peer {
// //
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID { func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
l.RLock()
defer l.RUnlock()
return l.shortChanID return l.shortChanID
} }
@ -1581,14 +1557,6 @@ func (l *channelLink) AttachMailBox(mailbox MailBox) {
l.Unlock() l.Unlock()
} }
// policyUpdate is a message sent to a channel link when an outside sub-system
// wishes to update the current forwarding policy.
type policyUpdate struct {
policy ForwardingPolicy
done chan struct{}
}
// UpdateForwardingPolicy updates the forwarding policy for the target // UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy to // ChannelLink. Once updated, the link will use the new forwarding policy to
// govern if it an incoming HTLC should be forwarded or not. Note that this // govern if it an incoming HTLC should be forwarded or not. Note that this
@ -1598,20 +1566,98 @@ type policyUpdate struct {
// //
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(newPolicy ForwardingPolicy) { func (l *channelLink) UpdateForwardingPolicy(newPolicy ForwardingPolicy) {
cmd := &policyUpdate{ l.Lock()
policy: newPolicy, defer l.Unlock()
done: make(chan struct{}),
// In order to avoid overriding a valid policy with a "null" field in
// the new policy, we'll only update to the set sub policy if the new
// value isn't uninitialized.
if newPolicy.BaseFee != 0 {
l.cfg.FwrdingPolicy.BaseFee = newPolicy.BaseFee
}
if newPolicy.FeeRate != 0 {
l.cfg.FwrdingPolicy.FeeRate = newPolicy.FeeRate
}
if newPolicy.TimeLockDelta != 0 {
l.cfg.FwrdingPolicy.TimeLockDelta = newPolicy.TimeLockDelta
}
if newPolicy.MinHTLC != 0 {
l.cfg.FwrdingPolicy.MinHTLC = newPolicy.MinHTLC
}
}
// HtlcSatifiesPolicy should return a nil error if the passed HTLC details
// satisfy the current forwarding policy fo the target link. Otherwise, a
// valid protocol failure message should be returned in order to signal to the
// source of the HTLC, the policy consistency issue.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HtlcSatifiesPolicy(payHash [32]byte,
incomingHtlcAmt, amtToForward lnwire.MilliSatoshi) lnwire.FailureMessage {
l.RLock()
defer l.RUnlock()
// As our first sanity check, we'll ensure that the passed HTLC isn't
// too small for the next hop. If so, then we'll cancel the HTLC
// directly.
if amtToForward < l.cfg.FwrdingPolicy.MinHTLC {
l.errorf("outgoing htlc(%x) is too small: min_htlc=%v, "+
"htlc_value=%v", payHash[:], l.cfg.FwrdingPolicy.MinHTLC,
amtToForward)
// As part of the returned error, we'll send our latest routing
// policy so the sending node obtains the most up to date data.
var failure lnwire.FailureMessage
update, err := l.cfg.FetchLastChannelUpdate(
l.shortChanID,
)
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
failure = lnwire.NewAmountBelowMinimum(
amtToForward, *update,
)
}
return failure
} }
select { // Next, using the amount of the incoming HTLC, we'll calculate the
case l.linkControl <- cmd: // expected fee this incoming HTLC must carry in order to satisfy the
case <-l.quit: // constraints of the outgoing link.
expectedFee := ExpectedFee(l.cfg.FwrdingPolicy, amtToForward)
// If the actual fee is less than our expected fee, then we'll reject
// this HTLC as it didn't provide a sufficient amount of fees, or the
// values have been tampered with, or the send used incorrect/dated
// information to construct the forwarding information for this hop. In
// any case, we'll cancel this HTLC.
actualFee := incomingHtlcAmt - amtToForward
if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
l.errorf("outgoing htlc(%x) has insufficient "+
"fee: expected %v, got %v", payHash[:],
int64(expectedFee),
int64(actualFee))
// As part of the returned error, we'll send our latest routing
// policy so the sending node obtains the most up to date data.
var failure lnwire.FailureMessage
update, err := l.cfg.FetchLastChannelUpdate(
l.shortChanID,
)
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
failure = lnwire.NewFeeInsufficient(
amtToForward, *update,
)
}
return failure
} }
select { return nil
case <-cmd.done:
case <-l.quit:
}
} }
// Stats returns the statistics of channel link. // Stats returns the statistics of channel link.
@ -2101,17 +2147,22 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
htlc: addMsg, htlc: addMsg,
obfuscator: obfuscator, obfuscator: obfuscator,
} }
switchPackets = append(switchPackets, switchPackets = append(
updatePacket) switchPackets, updatePacket,
)
continue continue
} }
// We'll consult the forwarding policy for this link
// when checking time locked related constraints.
hopPolicy := l.cfg.FwrdingPolicy
// We want to avoid forwarding an HTLC which will // We want to avoid forwarding an HTLC which will
// expire in the near future, so we'll reject an HTLC // expire in the near future, so we'll reject an HTLC
// if its expiration time is too close to the current // if its expiration time is too close to the current
// height. // height.
timeDelta := l.cfg.FwrdingPolicy.TimeLockDelta timeDelta := hopPolicy.TimeLockDelta
if pd.Timeout-timeDelta <= heightNow { if pd.Timeout-timeDelta <= heightNow {
log.Errorf("htlc(%x) has an expiry "+ log.Errorf("htlc(%x) has an expiry "+
"that's too soon: outgoing_expiry=%v, "+ "that's too soon: outgoing_expiry=%v, "+
@ -2119,7 +2170,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
pd.Timeout-timeDelta, heightNow) pd.Timeout-timeDelta, heightNow)
var failure lnwire.FailureMessage var failure lnwire.FailureMessage
update, err := l.cfg.GetLastChannelUpdate() update, err := l.cfg.FetchLastChannelUpdate(
l.shortChanID,
)
if err != nil { if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil) failure = lnwire.NewTemporaryChannelFailure(nil)
} else { } else {
@ -2127,78 +2180,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
} }
l.sendHTLCError( l.sendHTLCError(
pd.HtlcIndex, failure, obfuscator, pd.SourceRef, pd.HtlcIndex, failure, obfuscator,
) pd.SourceRef,
needUpdate = true
continue
}
// As our second sanity check, we'll ensure that the
// passed HTLC isn't too small. If so, then
// we'll cancel the HTLC directly.
if pd.Amount < l.cfg.FwrdingPolicy.MinHTLC {
log.Errorf("Incoming htlc(%x) is too "+
"small: min_htlc=%v, htlc_value=%v",
pd.RHash[:], l.cfg.FwrdingPolicy.MinHTLC,
pd.Amount)
// As part of the returned error, we'll send
// our latest routing policy so the sending
// node obtains the most up to date data.
var failure lnwire.FailureMessage
update, err := l.cfg.GetLastChannelUpdate()
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
failure = lnwire.NewAmountBelowMinimum(
pd.Amount, *update)
}
l.sendHTLCError(
pd.HtlcIndex, failure, obfuscator, pd.SourceRef,
)
needUpdate = true
continue
}
// Next, using the amount of the incoming HTLC, we'll
// calculate the expected fee this incoming HTLC must
// carry in order to be accepted.
expectedFee := ExpectedFee(
l.cfg.FwrdingPolicy,
fwdInfo.AmountToForward,
)
// If the actual fee is less than our expected
// fee, then we'll reject this HTLC as it didn't
// provide a sufficient amount of fees, or the values
// have been tampered with, or the send used
// incorrect/dated information to construct the
// forwarding information for this hop. In any case,
// we'll cancel this HTLC.
actualFee := pd.Amount - fwdInfo.AmountToForward
if pd.Amount < fwdInfo.AmountToForward ||
actualFee < expectedFee {
log.Errorf("Incoming htlc(%x) has insufficient "+
"fee: expected %v, got %v", pd.RHash[:],
int64(expectedFee),
int64(pd.Amount-fwdInfo.AmountToForward))
// As part of the returned error, we'll send
// our latest routing policy so the sending
// node obtains the most up to date data.
var failure lnwire.FailureMessage
update, err := l.cfg.GetLastChannelUpdate()
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
failure = lnwire.NewFeeInsufficient(pd.Amount,
*update)
}
l.sendHTLCError(
pd.HtlcIndex, failure, obfuscator, pd.SourceRef,
) )
needUpdate = true needUpdate = true
continue continue
@ -2220,7 +2203,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
// Grab the latest routing policy so the // Grab the latest routing policy so the
// sending node is up to date with our current // sending node is up to date with our current
// policy. // policy.
update, err := l.cfg.GetLastChannelUpdate() update, err := l.cfg.FetchLastChannelUpdate(
l.shortChanID,
)
if err != nil { if err != nil {
l.fail("unable to create channel update "+ l.fail("unable to create channel update "+
"while handling the error: %v", err) "while handling the error: %v", err)

@ -893,13 +893,37 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
} }
interfaceLinks, _ := s.getLinks(targetLink.Peer().PubKey()) interfaceLinks, _ := s.getLinks(targetLink.Peer().PubKey())
// We'll keep track of any HTLC failures during the link
// selection process. This way we can return the error for
// precise link that the sender selected, while optimistically
// trying all links to utilize our available bandwidth.
linkErrs := make(map[lnwire.ShortChannelID]lnwire.FailureMessage)
// Try to find destination channel link with appropriate // Try to find destination channel link with appropriate
// bandwidth. // bandwidth.
var destination ChannelLink var destination ChannelLink
for _, link := range interfaceLinks { for _, link := range interfaceLinks {
// We'll skip any links that aren't yet eligible for // We'll skip any links that aren't yet eligible for
// forwarding. // forwarding.
if !link.EligibleToForward() { switch {
case !link.EligibleToForward():
continue
// If the link doesn't yet have a source chan ID, then
// we'll skip it as well.
case link.ShortChanID() == sourceHop:
continue
}
// Before we check the link's bandwidth, we'll ensure
// that the HTLC satisfies the current forwarding
// policy of this target link.
err := link.HtlcSatifiesPolicy(
htlc.PaymentHash, packet.incomingAmount,
packet.amount,
)
if err != nil {
linkErrs[link.ShortChanID()] = err
continue continue
} }
@ -910,10 +934,12 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
} }
} }
switch {
// If the channel link we're attempting to forward the update // If the channel link we're attempting to forward the update
// over has insufficient capacity, then we'll cancel the htlc // over has insufficient capacity, and didn't violate any
// as the payment cannot succeed. // forwarding policies, then we'll cancel the htlc as the
if destination == nil { // payment cannot succeed.
case destination == nil && len(linkErrs) == 0:
// If packet was forwarded from another channel link // If packet was forwarded from another channel link
// than we should notify this link that some error // than we should notify this link that some error
// occurred. // occurred.
@ -923,6 +949,34 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
"%v", htlc.Amount) "%v", htlc.Amount)
return s.failAddPacket(packet, failure, addErr) return s.failAddPacket(packet, failure, addErr)
// If we had a forwarding failure due to the HTLC not
// satisfying the current policy, then we'll send back an
// error, but ensure we send back the error sourced at the
// *target* link.
case destination == nil && len(linkErrs) != 0:
// At this point, some or all of the links rejected the
// HTLC so we couldn't forward it. So we'll try to look
// up the error that came from the source.
linkErr, ok := linkErrs[packet.outgoingChanID]
if !ok {
// If we can't find the error of the source,
// then we'll return an unknown next peer,
// though this should never happen.
linkErr = &lnwire.FailUnknownNextPeer{}
log.Warnf("unable to find err source for "+
"outgoing_link=%v, errors=%v",
packet.outgoingChanID, newLogClosure(func() string {
return spew.Sdump(linkErrs)
}))
}
addErr := fmt.Errorf("incoming HTLC(%x) violated "+
"target outgoing link (id=%v) policy: %v",
htlc.PaymentHash[:], packet.outgoingChanID,
linkErr)
return s.failAddPacket(packet, linkErr, addErr)
} }
// Send the packet to the destination channel link which // Send the packet to the destination channel link which