Merge pull request #1019 from Roasbeef/proper-link-constraints

htlcswitch: properly apply the fee constraints of the *outgoing* link when accepting HTLC's
Olaoluwa Osuntokun 2018-04-06 18:31:13 -07:00 committed by GitHub
commit 5b5491338b
13 changed files with 442 additions and 486 deletions

@ -1097,8 +1097,9 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
for _, chanToUpdate := range edgesToUpdate {
// Re-sign and update the channel on disk and retrieve our
// ChannelUpdate to broadcast.
chanAnn, chanUpdate, err := d.updateChannel(chanToUpdate.info,
chanToUpdate.edge)
chanAnn, chanUpdate, err := d.updateChannel(
chanToUpdate.info, chanToUpdate.edge,
)
if err != nil {
return fmt.Errorf("unable to update channel: %v", err)
}
@ -1131,8 +1132,8 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
// processChanPolicyUpdate generates a new set of channel updates with the new
// channel policy applied for each specified channel identified by its channel
// point. In the case that no channel points are specified, then the update will
// be applied to all channels. Finally, the backing ChannelGraphSource is
// point. In the case that no channel points are specified, then the update
// will be applied to all channels. Finally, the backing ChannelGraphSource is
// updated with the latest information reflecting the applied updates.
//
// TODO(roasbeef): generalize into generic for any channel update
@ -1146,13 +1147,24 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
}
haveChanFilter := len(chansToUpdate) != 0
if haveChanFilter {
log.Infof("Updating routing policies for chan_points=%v",
spew.Sdump(chansToUpdate))
} else {
log.Infof("Updating routing policies for all chans")
}
var chanUpdates []networkMsg
type edgeWithInfo struct {
info *channeldb.ChannelEdgeInfo
edge *channeldb.ChannelEdgePolicy
}
var edgesToUpdate []edgeWithInfo
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collect those channels,
// otherwise we'll collect them all.
err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo,
err := d.cfg.Router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
// If we have a channel filter, and this channel isn't a part
@ -1161,20 +1173,37 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
return nil
}
// Apply the new fee schema to the edge.
// Now that we know we should update this channel, we'll update
// its set of policies.
edge.FeeBaseMSat = policyUpdate.newSchema.BaseFee
edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
policyUpdate.newSchema.FeeRate,
)
// Apply the new TimeLockDelta.
edge.TimeLockDelta = uint16(policyUpdate.newSchema.TimeLockDelta)
// Re-sign and update the backing ChannelGraphSource, and
// retrieve our ChannelUpdate to broadcast.
_, chanUpdate, err := d.updateChannel(info, edge)
edgesToUpdate = append(edgesToUpdate, edgeWithInfo{
info: info,
edge: edge,
})
return nil
})
if err != nil {
return err
return nil, err
}
// With the set of edges we need to update retrieved, we'll now re-sign
// them, and insert them into the database.
var chanUpdates []networkMsg
for _, edgeInfo := range edgesToUpdate {
// Now that we've collected all the channels we need to update,
// we'll re-sign and update the backing ChannelGraphSource, and
// retrieve our ChannelUpdate to broadcast.
_, chanUpdate, err := d.updateChannel(
edgeInfo.info, edgeInfo.edge,
)
if err != nil {
return nil, err
}
// We set ourselves as the source of this message to indicate
@ -1183,11 +1212,6 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
peer: d.selfKey,
msg: chanUpdate,
})
return nil
})
if err != nil {
return nil, err
}
return chanUpdates, nil
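
The shape of this rework is worth noting: edges are now only collected inside the ForAllOutgoingChannels callback, and the re-signing happens after the iteration has returned, presumably so that updateChannel's database writes never run while the graph traversal's transaction is still open. A minimal, self-contained sketch of that collect-then-write shape, using the same bolt package the switch code imports (the bucket name and values are illustrative):

// collectThenWrite gathers the keys to touch inside a read transaction,
// then applies the writes in a separate write transaction. Opening a
// write transaction while a read transaction is still open on the same
// goroutine would block forever, so the two phases must not overlap.
func collectThenWrite(db *bolt.DB, bucket []byte) error {
	var keys [][]byte
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)
		if b == nil {
			return nil
		}
		return b.ForEach(func(k, _ []byte) error {
			keys = append(keys, append([]byte(nil), k...))
			return nil
		})
	})
	if err != nil {
		return err
	}

	return db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(bucket)
		if err != nil {
			return err
		}
		for _, k := range keys {
			if err := b.Put(k, []byte("resigned")); err != nil {
				return err
			}
		}
		return nil
	})
}
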
@ -2127,8 +2151,8 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
var err error
// Make sure timestamp is always increased, such that our update
// gets propagated.
// Make sure timestamp is always increased, such that our update gets
// propagated.
timestamp := time.Now().Unix()
if timestamp <= edge.LastUpdate.Unix() {
timestamp = edge.LastUpdate.Unix() + 1

@ -76,6 +76,14 @@ type ChannelLink interface {
// policy to govern whether an incoming HTLC should be forwarded or not.
UpdateForwardingPolicy(ForwardingPolicy)
// HtlcSatifiesPolicy should return a nil error if the passed HTLC
// details satisfy the current forwarding policy of the target link.
// Otherwise, a valid protocol failure message should be returned in
// order to signal the policy violation to the source of the HTLC.
HtlcSatifiesPolicy(payHash [32]byte,
incomingAmt, amtToForward lnwire.MilliSatoshi) lnwire.FailureMessage
// Bandwidth returns the amount of milli-satoshis which the current link
// might pass through the channel link. The value returned from this method
// represents the up to date available flow through the channel. This

@ -79,7 +79,8 @@ type ForwardingPolicy struct {
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
func ExpectedFee(f ForwardingPolicy,
htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
// TODO(roasbeef): write some basic table driven tests
return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
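
The TODO above asks for basic table-driven tests of this formula. A minimal sketch of what such a test could look like, assuming it sits next to ExpectedFee in the htlcswitch package (the test name and cases are illustrative, not part of this change):

package htlcswitch

import (
	"testing"

	"github.com/lightningnetwork/lnd/lnwire"
)

// TestExpectedFee exercises the fee formula
// fee = BaseFee + (htlcAmt * FeeRate) / 1,000,000.
func TestExpectedFee(t *testing.T) {
	testCases := []struct {
		baseFee  lnwire.MilliSatoshi
		feeRate  lnwire.MilliSatoshi
		htlcAmt  lnwire.MilliSatoshi
		expected lnwire.MilliSatoshi
	}{
		{0, 0, 999, 0},
		// 1,000 msat base + 500,000 * 200 / 1,000,000 = 1,100 msat.
		{1000, 200, 500000, 1100},
		// Fees round down: 100 * 1 / 1,000,000 truncates to 0.
		{0, 1, 100, 0},
	}

	for _, test := range testCases {
		f := ForwardingPolicy{
			BaseFee: test.baseFee,
			FeeRate: test.feeRate,
		}
		fee := ExpectedFee(f, test.htlcAmt)
		if fee != test.expected {
			t.Errorf("expected fee %v, got %v", test.expected, fee)
		}
	}
}
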
@ -151,10 +152,12 @@ type ChannelLinkConfig struct {
// Sphinx onion blob, and creating onion failure obfuscator.
ExtractErrorEncrypter ErrorEncrypterExtracter
// GetLastChannelUpdate retrieves the latest routing policy for this
// particular channel. This will be used to provide payment senders our
// latest policy when sending encrypted error messages.
GetLastChannelUpdate func() (*lnwire.ChannelUpdate, error)
// FetchLastChannelUpdate retrieves the latest routing policy for a
// target channel. This channel will typically be the outgoing channel
// specified when we receive an incoming HTLC. This will be used to
// provide payment senders our latest policy when sending encrypted
// error messages.
FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
// Peer is a lightning network node with which we have the channel link
// opened.
@ -310,11 +313,6 @@ type channelLink struct {
// by the HTLC switch.
downstream chan *htlcPacket
// linkControl is a channel which is used to query the state of the
// link, or update various policies used which govern if an HTLC is to
// be forwarded and/or accepted.
linkControl chan interface{}
// htlcUpdates is a channel that we'll use to update outside
// sub-systems with the latest set of active HTLC's on our channel.
htlcUpdates chan []channeldb.HTLC
@ -342,7 +340,6 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
cfg: cfg,
channel: channel,
shortChanID: channel.ShortChanID(),
linkControl: make(chan interface{}),
// TODO(roasbeef): just do reserve here?
logCommitTimer: time.NewTimer(300 * time.Millisecond),
overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2),
@ -920,30 +917,6 @@ out:
case msg := <-l.upstream:
l.handleUpstreamMsg(msg)
// TODO(roasbeef): make distinct goroutine to handle?
case cmd := <-l.linkControl:
switch req := cmd.(type) {
case *policyUpdate:
// In order to avoid overriding a valid policy
// with a "null" field in the new policy, we'll
// only update to the set sub policy if the new
// value isn't uninitialized.
if req.policy.BaseFee != 0 {
l.cfg.FwrdingPolicy.BaseFee = req.policy.BaseFee
}
if req.policy.FeeRate != 0 {
l.cfg.FwrdingPolicy.FeeRate = req.policy.FeeRate
}
if req.policy.TimeLockDelta != 0 {
l.cfg.FwrdingPolicy.TimeLockDelta = req.policy.TimeLockDelta
}
if req.done != nil {
close(req.done)
}
}
case <-l.quit:
break out
}
@ -1502,6 +1475,9 @@ func (l *channelLink) Peer() Peer {
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
l.RLock()
defer l.RUnlock()
return l.shortChanID
}
@ -1581,14 +1557,6 @@ func (l *channelLink) AttachMailBox(mailbox MailBox) {
l.Unlock()
}
// policyUpdate is a message sent to a channel link when an outside sub-system
// wishes to update the current forwarding policy.
type policyUpdate struct {
policy ForwardingPolicy
done chan struct{}
}
// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy to
// govern whether an incoming HTLC should be forwarded or not. Note that this
@ -1598,20 +1566,98 @@ type policyUpdate struct {
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(newPolicy ForwardingPolicy) {
cmd := &policyUpdate{
policy: newPolicy,
done: make(chan struct{}),
l.Lock()
defer l.Unlock()
// In order to avoid overriding a valid policy with a "null" field in
// the new policy, we'll only update a sub-policy if its new value is
// actually set.
if newPolicy.BaseFee != 0 {
l.cfg.FwrdingPolicy.BaseFee = newPolicy.BaseFee
}
if newPolicy.FeeRate != 0 {
l.cfg.FwrdingPolicy.FeeRate = newPolicy.FeeRate
}
if newPolicy.TimeLockDelta != 0 {
l.cfg.FwrdingPolicy.TimeLockDelta = newPolicy.TimeLockDelta
}
if newPolicy.MinHTLC != 0 {
l.cfg.FwrdingPolicy.MinHTLC = newPolicy.MinHTLC
}
}
select {
case l.linkControl <- cmd:
case <-l.quit:
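
Because zero values are treated as unset, a caller can now adjust a single knob without clobbering the rest of the link's policy. A hypothetical helper illustrating that contract (raiseBaseFee is an illustrative name, not part of this change):

// raiseBaseFee bumps only the link's base fee. The remaining
// ForwardingPolicy fields stay at their zero values, which
// UpdateForwardingPolicy interprets as "leave unchanged".
func raiseBaseFee(link ChannelLink, base lnwire.MilliSatoshi) {
	link.UpdateForwardingPolicy(ForwardingPolicy{
		BaseFee: base,
	})
}
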
// HtlcSatifiesPolicy should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise, a
// valid protocol failure message should be returned in order to signal the
// policy violation to the source of the HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HtlcSatifiesPolicy(payHash [32]byte,
incomingHtlcAmt, amtToForward lnwire.MilliSatoshi) lnwire.FailureMessage {
l.RLock()
defer l.RUnlock()
// As our first sanity check, we'll ensure that the passed HTLC isn't
// too small for the next hop. If so, then we'll cancel the HTLC
// directly.
if amtToForward < l.cfg.FwrdingPolicy.MinHTLC {
l.errorf("outgoing htlc(%x) is too small: min_htlc=%v, "+
"htlc_value=%v", payHash[:], l.cfg.FwrdingPolicy.MinHTLC,
amtToForward)
// As part of the returned error, we'll send our latest routing
// policy so the sending node obtains the most up to date data.
var failure lnwire.FailureMessage
update, err := l.cfg.FetchLastChannelUpdate(
l.shortChanID,
)
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
failure = lnwire.NewAmountBelowMinimum(
amtToForward, *update,
)
}
select {
case <-cmd.done:
case <-l.quit:
return failure
}
// Next, using the amount of the incoming HTLC, we'll calculate the
// expected fee this incoming HTLC must carry in order to satisfy the
// constraints of the outgoing link.
expectedFee := ExpectedFee(l.cfg.FwrdingPolicy, amtToForward)
// If the actual fee is less than our expected fee, then we'll reject
// this HTLC as it didn't provide a sufficient amount of fees, or the
// values have been tampered with, or the sender used incorrect/dated
// information to construct the forwarding information for this hop. In
// any case, we'll cancel this HTLC.
actualFee := incomingHtlcAmt - amtToForward
if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
l.errorf("outgoing htlc(%x) has insufficient "+
"fee: expected %v, got %v", payHash[:],
int64(expectedFee),
int64(actualFee))
// As part of the returned error, we'll send our latest routing
// policy so the sending node obtains the most up to date data.
var failure lnwire.FailureMessage
update, err := l.cfg.FetchLastChannelUpdate(
l.shortChanID,
)
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
failure = lnwire.NewFeeInsufficient(
amtToForward, *update,
)
}
return failure
}
return nil
}
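
To make the two rejection checks concrete, here is a standalone worked example with illustrative numbers: a policy with a 1,000 msat base fee and a 1,000 ppm fee rate:

package main

import "fmt"

func main() {
	// Policy: 1,000 msat base fee, 1,000 parts-per-million fee rate.
	const baseFee, feeRate = 1000, 1000

	// An incoming HTLC of 101,100 msat asking us to forward 100,000 msat.
	const incomingHtlcAmt, amtToForward = 101100, 100000

	// Mirrors ExpectedFee: base + amt*rate/1,000,000 = 1,000 + 100.
	expectedFee := baseFee + amtToForward*feeRate/1000000

	// The fee actually paid is whatever is left over: 1,100 msat.
	actualFee := incomingHtlcAmt - amtToForward

	// Neither rejection condition fires, so the HTLC satisfies policy.
	fmt.Println(incomingHtlcAmt < amtToForward) // false
	fmt.Println(actualFee < expectedFee)        // false
}
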
// Stats returns the statistics of channel link.
@ -2101,17 +2147,22 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
htlc: addMsg,
obfuscator: obfuscator,
}
switchPackets = append(switchPackets,
updatePacket)
switchPackets = append(
switchPackets, updatePacket,
)
continue
}
// We'll consult the forwarding policy for this link
// when checking time lock related constraints.
hopPolicy := l.cfg.FwrdingPolicy
// We want to avoid forwarding an HTLC which will
// expire in the near future, so we'll reject an HTLC
// if its expiration time is too close to the current
// height.
timeDelta := l.cfg.FwrdingPolicy.TimeLockDelta
timeDelta := hopPolicy.TimeLockDelta
if pd.Timeout-timeDelta <= heightNow {
log.Errorf("htlc(%x) has an expiry "+
"that's too soon: outgoing_expiry=%v, "+
@ -2119,7 +2170,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
pd.Timeout-timeDelta, heightNow)
var failure lnwire.FailureMessage
update, err := l.cfg.GetLastChannelUpdate()
update, err := l.cfg.FetchLastChannelUpdate(
l.shortChanID,
)
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
@ -2127,78 +2180,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
}
l.sendHTLCError(
pd.HtlcIndex, failure, obfuscator, pd.SourceRef,
)
needUpdate = true
continue
}
// As our second sanity check, we'll ensure that the
// passed HTLC isn't too small. If so, then
// we'll cancel the HTLC directly.
if pd.Amount < l.cfg.FwrdingPolicy.MinHTLC {
log.Errorf("Incoming htlc(%x) is too "+
"small: min_htlc=%v, htlc_value=%v",
pd.RHash[:], l.cfg.FwrdingPolicy.MinHTLC,
pd.Amount)
// As part of the returned error, we'll send
// our latest routing policy so the sending
// node obtains the most up to date data.
var failure lnwire.FailureMessage
update, err := l.cfg.GetLastChannelUpdate()
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
failure = lnwire.NewAmountBelowMinimum(
pd.Amount, *update)
}
l.sendHTLCError(
pd.HtlcIndex, failure, obfuscator, pd.SourceRef,
)
needUpdate = true
continue
}
// Next, using the amount of the incoming HTLC, we'll
// calculate the expected fee this incoming HTLC must
// carry in order to be accepted.
expectedFee := ExpectedFee(
l.cfg.FwrdingPolicy,
fwdInfo.AmountToForward,
)
// If the actual fee is less than our expected
// fee, then we'll reject this HTLC as it didn't
// provide a sufficient amount of fees, or the values
// have been tampered with, or the sender used
// incorrect/dated information to construct the
// forwarding information for this hop. In any case,
// we'll cancel this HTLC.
actualFee := pd.Amount - fwdInfo.AmountToForward
if pd.Amount < fwdInfo.AmountToForward ||
actualFee < expectedFee {
log.Errorf("Incoming htlc(%x) has insufficient "+
"fee: expected %v, got %v", pd.RHash[:],
int64(expectedFee),
int64(pd.Amount-fwdInfo.AmountToForward))
// As part of the returned error, we'll send
// our latest routing policy so the sending
// node obtains the most up to date data.
var failure lnwire.FailureMessage
update, err := l.cfg.GetLastChannelUpdate()
if err != nil {
failure = lnwire.NewTemporaryChannelFailure(nil)
} else {
failure = lnwire.NewFeeInsufficient(pd.Amount,
*update)
}
l.sendHTLCError(
pd.HtlcIndex, failure, obfuscator, pd.SourceRef,
pd.HtlcIndex, failure, obfuscator,
pd.SourceRef,
)
needUpdate = true
continue
@ -2220,7 +2203,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
// Grab the latest routing policy so the
// sending node is up to date with our current
// policy.
update, err := l.cfg.GetLastChannelUpdate()
update, err := l.cfg.FetchLastChannelUpdate(
l.shortChanID,
)
if err != nil {
l.fail("unable to create channel update "+
"while handling the error: %v", err)

@ -826,7 +826,7 @@ func TestUpdateForwardingPolicy(t *testing.T) {
// update logic
newPolicy := n.globalPolicy
newPolicy.BaseFee = lnwire.NewMSatFromSatoshis(1000)
n.firstBobChannelLink.UpdateForwardingPolicy(newPolicy)
n.secondBobChannelLink.UpdateForwardingPolicy(newPolicy)
// Next, we'll send the payment again, using the exact same per-hop
// payload for each node. This payment should fail as it won't factor
@ -1476,7 +1476,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
ErrorEncrypter, lnwire.FailCode) {
return obfuscator, lnwire.CodeNone
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
FetchLastChannelUpdate: mockGetChanUpdateMessage,
PreimageCache: pCache,
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil

@ -604,6 +604,10 @@ func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) {
func (f *mockChannelLink) UpdateForwardingPolicy(_ ForwardingPolicy) {
}
func (f *mockChannelLink) HtlcSatifiesPolicy([32]byte, lnwire.MilliSatoshi,
lnwire.MilliSatoshi) lnwire.FailureMessage {
return nil
}
func (f *mockChannelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
return 0, 0, 0

@ -168,10 +168,6 @@ type Switch struct {
// forward the settle/fail htlc updates back to the add htlc initiator.
circuits CircuitMap
// links is a map of channel id and channel link which manages
// this channel.
linkIndex map[lnwire.ChannelID]ChannelLink
// mailMtx is a read/write mutex that protects the mailboxes map.
mailMtx sync.RWMutex
@ -179,6 +175,14 @@ type Switch struct {
// switch to buffer messages for peers that have not come back online.
mailboxes map[lnwire.ShortChannelID]MailBox
// indexMtx is a read/write mutex that protects the set of indexes
// below.
indexMtx sync.RWMutex
// links is a map from channel ID to the channel link which manages
// that channel.
linkIndex map[lnwire.ChannelID]ChannelLink
// forwardingIndex is an index which is consulted by the switch when it
// needs to locate the next hop to forward an incoming/outgoing HTLC
// update to/from.
@ -244,7 +248,6 @@ func New(cfg Config) (*Switch, error) {
htlcPlex: make(chan *plexPacket),
chanCloseRequests: make(chan *ChanClose),
resolutionMsgs: make(chan *resolutionMsg),
linkControl: make(chan interface{}),
quit: make(chan struct{}),
}, nil
}
@ -386,63 +389,47 @@ func (s *Switch) SendHTLC(nextNode [33]byte, htlc *lnwire.UpdateAddHTLC,
func (s *Switch) UpdateForwardingPolicies(newPolicy ForwardingPolicy,
targetChans ...wire.OutPoint) error {
errChan := make(chan error, 1)
select {
case s.linkControl <- &updatePoliciesCmd{
newPolicy: newPolicy,
targetChans: targetChans,
err: errChan,
}:
case <-s.quit:
return fmt.Errorf("switch is shutting down")
}
log.Debugf("Updating link policies: %v", newLogClosure(func() string {
return spew.Sdump(newPolicy)
}))
select {
case err := <-errChan:
return err
case <-s.quit:
return fmt.Errorf("switch is shutting down")
}
}
var linksToUpdate []ChannelLink
// updatePoliciesCmd is a message sent to the switch to update the forwarding
// policies of a set of target links.
type updatePoliciesCmd struct {
newPolicy ForwardingPolicy
targetChans []wire.OutPoint
s.indexMtx.RLock()
err chan error
}
// updateLinkPolicies attempts to update the forwarding policies for the set of
// passed links identified by their channel points. If a nil set of channel
// points is passed, then the forwarding policies for all active links will be
// updated.
func (s *Switch) updateLinkPolicies(c *updatePoliciesCmd) error {
log.Debugf("Updating link policies: %v", spew.Sdump(c))
// If no channels have been targeted, then we'll update the link policies
// for all active channels
if len(c.targetChans) == 0 {
// If no channels have been targeted, then we'll collect all links to
// update their policies.
if len(targetChans) == 0 {
for _, link := range s.linkIndex {
link.UpdateForwardingPolicy(c.newPolicy)
linksToUpdate = append(linksToUpdate, link)
}
}
// Otherwise, we'll only attempt to update the forwarding policies for the
// set of targeted links.
for _, targetLink := range c.targetChans {
} else {
// Otherwise, we'll only attempt to update the forwarding
// policies for the set of targeted links.
for _, targetLink := range targetChans {
cid := lnwire.NewChanIDFromOutPoint(&targetLink)
// If we can't locate a link by its converted channel ID, then we'll
// return an error back to the caller.
// If we can't locate a link by its converted channel
// ID, then we'll return an error back to the caller.
link, ok := s.linkIndex[cid]
if !ok {
return fmt.Errorf("unable to find ChannelPoint(%v) to "+
"update link policy", targetLink)
s.indexMtx.RUnlock()
return fmt.Errorf("unable to find "+
"ChannelPoint(%v) to update link "+
"policy", targetLink)
}
link.UpdateForwardingPolicy(c.newPolicy)
linksToUpdate = append(linksToUpdate, link)
}
}
s.indexMtx.RUnlock()
// With all the links we need to update collected, we can release the
// mutex then update each link directly.
for _, link := range linksToUpdate {
link.UpdateForwardingPolicy(newPolicy)
}
return nil
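
Note the locking discipline here: the links are snapshotted while indexMtx is held read-only, and UpdateForwardingPolicy, which takes each link's own mutex, only runs after the lock is released. A self-contained sketch of the same pattern with hypothetical types:

package main

import (
	"fmt"
	"sync"
)

// applyOutsideLock snapshots the map's values under the read lock and
// releases it before running the callback, so a callback that takes its
// own locks can never participate in a lock-order cycle with mtx.
func applyOutsideLock(mtx *sync.RWMutex, m map[string]int, apply func(int)) {
	mtx.RLock()
	vals := make([]int, 0, len(m))
	for _, v := range m {
		vals = append(vals, v)
	}
	mtx.RUnlock()

	for _, v := range vals {
		apply(v)
	}
}

func main() {
	var mtx sync.RWMutex
	links := map[string]int{"alice": 1, "bob": 2}
	applyOutsideLock(&mtx, links, func(v int) { fmt.Println(v) })
}
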
@ -715,14 +702,18 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error {
// appropriate channel link and send the payment over this link.
case *lnwire.UpdateAddHTLC:
// Try to find links by node destination.
s.indexMtx.RLock()
links, err := s.getLinks(pkt.destNode)
if err != nil {
s.indexMtx.RUnlock()
log.Errorf("unable to find links by destination %v", err)
return &ForwardingError{
ErrorSource: s.cfg.SelfKey,
FailureMessage: &lnwire.FailUnknownNextPeer{},
}
}
s.indexMtx.RUnlock()
// Try to find destination channel link with appropriate
// bandwidth.
@ -880,8 +871,11 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
return s.handleLocalDispatch(packet)
}
s.indexMtx.RLock()
targetLink, err := s.getLinkByShortID(packet.outgoingChanID)
if err != nil {
s.indexMtx.RUnlock()
// If packet was forwarded from another channel link
// than we should notify this link that some error
// occurred.
@ -892,6 +886,13 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
return s.failAddPacket(packet, failure, addErr)
}
interfaceLinks, _ := s.getLinks(targetLink.Peer().PubKey())
s.indexMtx.RUnlock()
// We'll keep track of any HTLC failures during the link
// selection process. This way we can return the error for the
// precise link that the sender selected, while optimistically
// trying all links to utilize our available bandwidth.
linkErrs := make(map[lnwire.ShortChannelID]lnwire.FailureMessage)
// Try to find destination channel link with appropriate
// bandwidth.
@ -899,7 +900,25 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
for _, link := range interfaceLinks {
// We'll skip any links that aren't yet eligible for
// forwarding.
if !link.EligibleToForward() {
switch {
case !link.EligibleToForward():
continue
// If the link doesn't yet have a source chan ID, then
// we'll skip it as well.
case link.ShortChanID() == sourceHop:
continue
}
// Before we check the link's bandwidth, we'll ensure
// that the HTLC satisfies the current forwarding
// policy of this target link.
err := link.HtlcSatifiesPolicy(
htlc.PaymentHash, packet.incomingAmount,
packet.amount,
)
if err != nil {
linkErrs[link.ShortChanID()] = err
continue
}
@ -910,10 +929,12 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
}
}
switch {
// If the channel link we're attempting to forward the update
// over has insufficient capacity, then we'll cancel the htlc
// as the payment cannot succeed.
if destination == nil {
// over has insufficient capacity, and didn't violate any
// forwarding policies, then we'll cancel the htlc as the
// payment cannot succeed.
case destination == nil && len(linkErrs) == 0:
// If packet was forwarded from another channel link
// than we should notify this link that some error
// occurred.
@ -923,6 +944,34 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
"%v", htlc.Amount)
return s.failAddPacket(packet, failure, addErr)
// If we had a forwarding failure due to the HTLC not
// satisfying the current policy, then we'll send back an
// error, but ensure we send back the error sourced at the
// *target* link.
case destination == nil && len(linkErrs) != 0:
// At this point, some or all of the links rejected the
// HTLC, so we couldn't forward it. We'll try to look up
// the error that came from the source.
linkErr, ok := linkErrs[packet.outgoingChanID]
if !ok {
// If we can't find the error of the source,
// then we'll return an unknown next peer,
// though this should never happen.
linkErr = &lnwire.FailUnknownNextPeer{}
log.Warnf("unable to find err source for "+
"outgoing_link=%v, errors=%v",
packet.outgoingChanID, newLogClosure(func() string {
return spew.Sdump(linkErrs)
}))
}
addErr := fmt.Errorf("incoming HTLC(%x) violated "+
"target outgoing link (id=%v) policy: %v",
htlc.PaymentHash[:], packet.outgoingChanID,
linkErr)
return s.failAddPacket(packet, linkErr, addErr)
}
// Send the packet to the destination channel link which
@ -1246,12 +1295,14 @@ func (s *Switch) htlcForwarder() {
// Remove all links once we've been signalled for shutdown.
defer func() {
s.indexMtx.Lock()
for _, link := range s.linkIndex {
if err := s.removeLink(link.ChanID()); err != nil {
log.Errorf("unable to remove "+
"channel link on stop: %v", err)
}
}
s.indexMtx.Unlock()
// Before we exit fully, we'll attempt to flush out any
// forwarding events that may still be lingering since the last
@ -1282,12 +1333,17 @@ func (s *Switch) htlcForwarder() {
// cooperatively closed (if possible).
case req := <-s.chanCloseRequests:
chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)
s.indexMtx.RLock()
link, ok := s.linkIndex[chanID]
if !ok {
s.indexMtx.RUnlock()
req.Err <- errors.Errorf("no peer for channel with "+
"chan_id=%x", chanID[:])
continue
}
s.indexMtx.RUnlock()
peerPub := link.Peer().PubKey()
log.Debugf("Requesting local channel close: peer=%v, "+
@ -1367,6 +1423,7 @@ func (s *Switch) htlcForwarder() {
// Next, we'll run through all the registered links and
// compute their up-to-date forwarding stats.
s.indexMtx.RLock()
for _, link := range s.linkIndex {
// TODO(roasbeef): when links first registered
// stats printed.
@ -1375,6 +1432,7 @@ func (s *Switch) htlcForwarder() {
newSatSent += sent.ToSatoshis()
newSatRecv += recv.ToSatoshis()
}
s.indexMtx.RUnlock()
var (
diffNumUpdates uint64
@ -1424,28 +1482,6 @@ func (s *Switch) htlcForwarder() {
totalSatSent += diffSatSent
totalSatRecv += diffSatRecv
case req := <-s.linkControl:
switch cmd := req.(type) {
case *updatePoliciesCmd:
cmd.err <- s.updateLinkPolicies(cmd)
case *addLinkCmd:
cmd.err <- s.addLink(cmd.link)
case *removeLinkCmd:
cmd.err <- s.removeLink(cmd.chanID)
case *getLinkCmd:
link, err := s.getLink(cmd.chanID)
cmd.done <- link
cmd.err <- err
case *getLinksCmd:
links, err := s.getLinks(cmd.peer)
cmd.done <- links
cmd.err <- err
case *updateForwardingIndexCmd:
cmd.err <- s.updateShortChanID(
cmd.chanID, cmd.shortChanID,
)
}
case <-s.quit:
return
}
@ -1501,8 +1537,7 @@ func (s *Switch) reforwardResponses() error {
// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
// channel identifier.
func (s *Switch) loadChannelFwdPkgs(
source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
var fwdPkgs []*channeldb.FwdPkg
if err := s.cfg.DB.Update(func(tx *bolt.Tx) error {
@ -1634,38 +1669,11 @@ func (s *Switch) Stop() error {
return nil
}
// addLinkCmd is a add link command wrapper, it is used to propagate handler
// parameters and return handler error.
type addLinkCmd struct {
link ChannelLink
err chan error
}
// AddLink is used to initiate the handling of the add link command. The
// request will be propagated and handled in the main goroutine.
func (s *Switch) AddLink(link ChannelLink) error {
command := &addLinkCmd{
link: link,
err: make(chan error, 1),
}
select {
case s.linkControl <- command:
select {
case err := <-command.err:
return err
case <-s.quit:
}
case <-s.quit:
}
return errors.New("unable to add link htlc switch was stopped")
}
// addLink is used to add the newly created channel link and start use it to
// handle the channel updates.
func (s *Switch) addLink(link ChannelLink) error {
// TODO(roasbeef): reject if link already there?
s.indexMtx.Lock()
defer s.indexMtx.Unlock()
// First we'll add the link to the linkIndex which lets us quickly look
// up a channel when we need to close or register it, and the
@ -1727,47 +1735,12 @@ func (s *Switch) getOrCreateMailBox(chanID lnwire.ShortChannelID) MailBox {
return mailbox
}
// getLinkCmd is a get link command wrapper, it is used to propagate handler
// parameters and return handler error.
type getLinkCmd struct {
chanID lnwire.ChannelID
err chan error
done chan ChannelLink
}
// GetLink retrieves the channel link with the target channel ID, if it
// exists.
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelLink, error) {
command := &getLinkCmd{
chanID: chanID,
err: make(chan error, 1),
done: make(chan ChannelLink, 1),
}
s.indexMtx.RLock()
defer s.indexMtx.RUnlock()
query:
select {
case s.linkControl <- command:
var link ChannelLink
select {
case link = <-command.done:
case <-s.quit:
break query
}
select {
case err := <-command.err:
return link, err
case <-s.quit:
}
case <-s.quit:
}
return nil, errors.New("unable to get link htlc switch was stopped")
}
// getLink attempts to return the link that has the specified channel ID.
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
link, ok := s.linkIndex[chanID]
if !ok {
return nil, ErrChannelLinkNotFound
@ -1778,6 +1751,8 @@ func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
// getLinkByShortID attempts to return the link which possesses the target
// short channel ID.
//
// NOTE: This MUST be called with the indexMtx held.
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
link, ok := s.forwardingIndex[chanID]
if !ok {
@ -1787,35 +1762,18 @@ func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, er
return link, nil
}
// removeLinkCmd is a get link command wrapper, it is used to propagate handler
// parameters and return handler error.
type removeLinkCmd struct {
chanID lnwire.ChannelID
err chan error
}
// RemoveLink removes and stops the channel link with the target
// channel ID.
func (s *Switch) RemoveLink(chanID lnwire.ChannelID) error {
command := &removeLinkCmd{
chanID: chanID,
err: make(chan error, 1),
}
s.indexMtx.Lock()
defer s.indexMtx.Unlock()
select {
case s.linkControl <- command:
select {
case err := <-command.err:
return err
case <-s.quit:
}
case <-s.quit:
}
return errors.New("unable to remove link htlc switch was stopped")
return s.removeLink(chanID)
}
// removeLink is used to remove and stop the channel link.
//
// NOTE: This MUST be called with the indexMtx held.
func (s *Switch) removeLink(chanID lnwire.ChannelID) error {
log.Infof("Removing channel link with ChannelID(%v)", chanID)
@ -1837,50 +1795,21 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error {
return nil
}
// updateForwardingIndexCmd is a command sent by outside sub-systems to update
// the forwarding index of the switch in the event that the short channel ID of
// a particular link changes.
type updateForwardingIndexCmd struct {
chanID lnwire.ChannelID
shortChanID lnwire.ShortChannelID
err chan error
}
// UpdateShortChanID updates the short chan ID for an existing channel. This is
// required in the case of a re-org and re-confirmation of a channel, or in the
// case that a link was added to the switch before its short chan ID was known.
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID,
shortChanID lnwire.ShortChannelID) error {
command := &updateForwardingIndexCmd{
chanID: chanID,
shortChanID: shortChanID,
err: make(chan error, 1),
}
select {
case s.linkControl <- command:
select {
case err := <-command.err:
return err
case <-s.quit:
}
case <-s.quit:
}
return errors.New("unable to update short chan id htlc switch was stopped")
}
// updateShortChanID updates the short chan ID of an existing link.
func (s *Switch) updateShortChanID(chanID lnwire.ChannelID,
shortChanID lnwire.ShortChannelID) error {
s.indexMtx.Lock()
// First, we'll extract the current link as is from the link
// index. If the link isn't even in the index, then we'll return an
// error.
link, ok := s.linkIndex[chanID]
if !ok {
s.indexMtx.Unlock()
return fmt.Errorf("link %v not found", chanID)
}
@ -1891,53 +1820,27 @@ func (s *Switch) updateShortChanID(chanID lnwire.ChannelID,
// forwarding index with the next short channel ID.
s.forwardingIndex[shortChanID] = link
s.indexMtx.Unlock()
// Finally, we'll notify the link of its new short channel ID.
link.UpdateShortChanID(shortChanID)
return nil
}
// getLinksCmd is a get links command wrapper, it is used to propagate handler
// parameters and return handler error.
type getLinksCmd struct {
peer [33]byte
err chan error
done chan []ChannelLink
}
// GetLinksByInterface fetches all the links connected to a particular node
// identified by the serialized compressed form of its public key.
func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelLink, error) {
command := &getLinksCmd{
peer: hop,
err: make(chan error, 1),
done: make(chan []ChannelLink, 1),
}
s.indexMtx.RLock()
defer s.indexMtx.RUnlock()
query:
select {
case s.linkControl <- command:
var links []ChannelLink
select {
case links = <-command.done:
case <-s.quit:
break query
}
select {
case err := <-command.err:
return links, err
case <-s.quit:
}
case <-s.quit:
}
return nil, errors.New("unable to get links htlc switch was stopped")
return s.getLinks(hop)
}
// getLinks is a function which returns the channel links of the peer by
// hop destination id.
//
// NOTE: This MUST be called with the indexMtx held.
func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
links, ok := s.interfaceIndex[destination]
if !ok {

@ -116,9 +116,9 @@ func genIDs() (lnwire.ChannelID, lnwire.ChannelID, lnwire.ShortChannelID,
return chanID1, chanID2, aliceChanID, bobChanID
}
// mockGetChanUpdateMessage helper function which returns topology update
// of the channel
func mockGetChanUpdateMessage() (*lnwire.ChannelUpdate, error) {
// mockGetChanUpdateMessage is a helper function which returns a topology
// update of the channel.
func mockGetChanUpdateMessage(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
return &lnwire.ChannelUpdate{
Signature: wireSig,
}, nil
@ -902,6 +902,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
aliceTicker := time.NewTicker(50 * time.Millisecond)
aliceChannelLink := NewChannelLink(
ChannelLinkConfig{
Switch: aliceServer.htlcSwitch,
FwrdingPolicy: globalPolicy,
Peer: bobServer,
Circuits: aliceServer.htlcSwitch.CircuitModifier(),
@ -911,7 +912,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
ErrorEncrypter, lnwire.FailCode) {
return obfuscator, lnwire.CodeNone
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
FetchLastChannelUpdate: mockGetChanUpdateMessage,
Registry: aliceServer.registry,
BlockEpochs: aliceEpoch,
FeeEstimator: feeEstimator,
@ -928,7 +929,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
aliceChannel,
startingHeight,
)
if err := aliceServer.htlcSwitch.addLink(aliceChannelLink); err != nil {
if err := aliceServer.htlcSwitch.AddLink(aliceChannelLink); err != nil {
t.Fatalf("unable to add alice channel link: %v", err)
}
go func() {
@ -950,6 +951,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
firstBobTicker := time.NewTicker(50 * time.Millisecond)
firstBobChannelLink := NewChannelLink(
ChannelLinkConfig{
Switch: bobServer.htlcSwitch,
FwrdingPolicy: globalPolicy,
Peer: aliceServer,
Circuits: bobServer.htlcSwitch.CircuitModifier(),
@ -959,7 +961,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
ErrorEncrypter, lnwire.FailCode) {
return obfuscator, lnwire.CodeNone
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
FetchLastChannelUpdate: mockGetChanUpdateMessage,
Registry: bobServer.registry,
BlockEpochs: bobFirstEpoch,
FeeEstimator: feeEstimator,
@ -976,7 +978,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
firstBobChannel,
startingHeight,
)
if err := bobServer.htlcSwitch.addLink(firstBobChannelLink); err != nil {
if err := bobServer.htlcSwitch.AddLink(firstBobChannelLink); err != nil {
t.Fatalf("unable to add first bob channel link: %v", err)
}
go func() {
@ -998,6 +1000,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
secondBobTicker := time.NewTicker(50 * time.Millisecond)
secondBobChannelLink := NewChannelLink(
ChannelLinkConfig{
Switch: bobServer.htlcSwitch,
FwrdingPolicy: globalPolicy,
Peer: carolServer,
Circuits: bobServer.htlcSwitch.CircuitModifier(),
@ -1007,7 +1010,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
ErrorEncrypter, lnwire.FailCode) {
return obfuscator, lnwire.CodeNone
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
FetchLastChannelUpdate: mockGetChanUpdateMessage,
Registry: bobServer.registry,
BlockEpochs: bobSecondEpoch,
FeeEstimator: feeEstimator,
@ -1024,7 +1027,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
secondBobChannel,
startingHeight,
)
if err := bobServer.htlcSwitch.addLink(secondBobChannelLink); err != nil {
if err := bobServer.htlcSwitch.AddLink(secondBobChannelLink); err != nil {
t.Fatalf("unable to add second bob channel link: %v", err)
}
go func() {
@ -1046,6 +1049,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
carolTicker := time.NewTicker(50 * time.Millisecond)
carolChannelLink := NewChannelLink(
ChannelLinkConfig{
Switch: carolServer.htlcSwitch,
FwrdingPolicy: globalPolicy,
Peer: bobServer,
Circuits: carolServer.htlcSwitch.CircuitModifier(),
@ -1055,7 +1059,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
ErrorEncrypter, lnwire.FailCode) {
return obfuscator, lnwire.CodeNone
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
FetchLastChannelUpdate: mockGetChanUpdateMessage,
Registry: carolServer.registry,
BlockEpochs: carolEpoch,
FeeEstimator: feeEstimator,
@ -1072,7 +1076,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
carolChannel,
startingHeight,
)
if err := carolServer.htlcSwitch.addLink(carolChannelLink); err != nil {
if err := carolServer.htlcSwitch.AddLink(carolChannelLink); err != nil {
t.Fatalf("unable to add carol channel link: %v", err)
}
go func() {

@ -479,8 +479,8 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
// Launch notification clients for all nodes, such that we can
// get notified when they discover new channels and updates
// in the graph.
// get notified when they discover new channels and updates in the
// graph.
aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxb, net.Alice)
defer close(aQuit)
bobUpdates, bQuit := subscribeGraphNotifications(t, ctxb, net.Bob)
@ -530,8 +530,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("carol didn't report channel: %v", err)
}
// Update the fees for the channel Alice->Bob, and make sure
// all nodes learn about it.
// With our little cluster set up, we'll update the fees for Bob's
// side of the Alice->Bob channel, and make sure all nodes learn
// about it.
const feeBase = 1000000
baseFee := int64(1500)
feeRate := int64(12)
@ -546,7 +547,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
ChanPoint: chanPoint,
}
_, err = net.Alice.UpdateChannelPolicy(ctxb, req)
_, err = net.Bob.UpdateChannelPolicy(ctxb, req)
if err != nil {
t.Fatalf("unable to get alice's balance: %v", err)
}
@ -572,7 +573,8 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// A closure that is used to wait for a channel update that matches
// the channel policy update done by the advertising node.
waitForChannelUpdate := func(graphUpdates chan *lnrpc.GraphTopologyUpdate,
chanPoints ...*lnrpc.ChannelPoint) {
advertisingNode string, chanPoints ...*lnrpc.ChannelPoint) {
// Create a map containing all the channel points we are
// waiting for updates for.
cps := make(map[string]bool)
@ -592,7 +594,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
continue
}
if chanUpdate.AdvertisingNode != net.Alice.PubKeyStr {
if chanUpdate.AdvertisingNode != advertisingNode {
continue
}
@ -623,16 +625,17 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
}
}
// Wait for all nodes to have seen the policy update done by Alice.
waitForChannelUpdate(aliceUpdates, chanPoint)
waitForChannelUpdate(bobUpdates, chanPoint)
waitForChannelUpdate(carolUpdates, chanPoint)
// Wait for all nodes to have seen the policy update done by Bob.
waitForChannelUpdate(aliceUpdates, net.Bob.PubKeyStr, chanPoint)
waitForChannelUpdate(bobUpdates, net.Bob.PubKeyStr, chanPoint)
waitForChannelUpdate(carolUpdates, net.Bob.PubKeyStr, chanPoint)
// assertChannelPolicy asserts that the passed node's known channel
// policy for the passed chanPoint is consistent with Alice's current
// policy for the passed chanPoint is consistent with Bob's current
// expected policy values.
assertChannelPolicy := func(node *lntest.HarnessNode,
chanPoint *lnrpc.ChannelPoint) {
advertisingNode string, chanPoint *lnrpc.ChannelPoint) {
// Get a DescribeGraph from the node.
descReq := &lnrpc.ChannelGraphRequest{}
chanGraph, err := node.DescribeGraph(ctxb, descReq)
@ -645,7 +648,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
for _, e := range chanGraph.Edges {
if e.ChanPoint == txStr(chanPoint) {
edgeFound = true
if e.Node1Pub == net.Alice.PubKeyStr {
if e.Node1Pub == advertisingNode {
if e.Node1Policy.FeeBaseMsat != baseFee {
t.Fatalf("expected base fee "+
"%v, got %v", baseFee,
@ -689,18 +692,42 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
}
// Check that all nodes now know about Alice's updated policy.
assertChannelPolicy(net.Alice, chanPoint)
assertChannelPolicy(net.Bob, chanPoint)
assertChannelPolicy(carol, chanPoint)
// Check that all nodes now know about Bob's updated policy.
assertChannelPolicy(net.Alice, net.Bob.PubKeyStr, chanPoint)
assertChannelPolicy(net.Bob, net.Bob.PubKeyStr, chanPoint)
assertChannelPolicy(carol, net.Bob.PubKeyStr, chanPoint)
// Open channel to Carol.
// Now that all nodes have received the new channel update, we'll try
// to send a payment from Alice to Carol to ensure that Alice has
// internalized this fee update. This shouldn't affect the route that
// Alice takes though: we updated the Alice -> Bob channel and she
// doesn't pay for transit over that channel as it's direct.
payAmt := lnwire.MilliSatoshi(2000)
invoice := &lnrpc.Invoice{
Memo: "testing",
Value: int64(payAmt),
}
resp, err := carol.AddInvoice(ctxb, invoice)
if err != nil {
t.Fatalf("unable to add invoice: %v", err)
}
ctxt, _ = context.WithTimeout(ctxb, timeout)
err = completePaymentRequests(
ctxt, net.Alice, []string{resp.PaymentRequest}, true,
)
if err != nil {
t.Fatalf("unable to send payment: %v", err)
}
// We'll now open a channel from Alice directly to Carol.
if err := net.ConnectNodes(ctxb, net.Alice, carol); err != nil {
t.Fatalf("unable to connect dave to alice: %v", err)
}
ctxt, _ = context.WithTimeout(ctxb, timeout)
chanPoint3 := openChannelAndAssert(ctxt, t, net, net.Alice, carol,
chanAmt, pushAmt)
chanPoint3 := openChannelAndAssert(
ctxt, t, net, net.Alice, carol, chanAmt, pushAmt,
)
ctxt, _ = context.WithTimeout(ctxb, time.Second*15)
err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint3)
@ -712,8 +739,8 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("bob didn't report channel: %v", err)
}
// Make a global update, and check that both channels'
// new policies get propagated.
// Make a global update, and check that both channels' new policies get
// propagated.
baseFee = int64(800)
feeRate = int64(123)
timeLockDelta = uint32(22)
@ -730,21 +757,21 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to get alice's balance: %v", err)
}
// Wait for all nodes to have seen the policy updates
// for both of Alice's channels.
waitForChannelUpdate(aliceUpdates, chanPoint, chanPoint3)
waitForChannelUpdate(bobUpdates, chanPoint, chanPoint3)
waitForChannelUpdate(carolUpdates, chanPoint, chanPoint3)
// Wait for all nodes to have seen the policy updates for both of
// Alice's channels.
waitForChannelUpdate(aliceUpdates, net.Alice.PubKeyStr, chanPoint3)
waitForChannelUpdate(bobUpdates, net.Alice.PubKeyStr, chanPoint3)
waitForChannelUpdate(carolUpdates, net.Alice.PubKeyStr, chanPoint3)
// And finally check that all nodes remembers the policy
// update they received.
assertChannelPolicy(net.Alice, chanPoint)
assertChannelPolicy(net.Bob, chanPoint)
assertChannelPolicy(carol, chanPoint)
// And finally check that all nodes remember the policy update they
// received.
assertChannelPolicy(net.Alice, net.Alice.PubKeyStr, chanPoint)
assertChannelPolicy(net.Bob, net.Alice.PubKeyStr, chanPoint)
assertChannelPolicy(carol, net.Alice.PubKeyStr, chanPoint)
assertChannelPolicy(net.Alice, chanPoint3)
assertChannelPolicy(net.Bob, chanPoint3)
assertChannelPolicy(carol, chanPoint3)
assertChannelPolicy(net.Alice, net.Alice.PubKeyStr, chanPoint3)
assertChannelPolicy(net.Bob, net.Alice.PubKeyStr, chanPoint3)
assertChannelPolicy(carol, net.Alice.PubKeyStr, chanPoint3)
// Close the channels.
ctxt, _ = context.WithTimeout(ctxb, timeout)

@ -36,7 +36,7 @@ func NewShortChanIDFromInt(chanID uint64) ShortChannelID {
// ToUint64 converts the ShortChannelID into a compact format encoded within a
// uint64 (8 bytes).
func (c *ShortChannelID) ToUint64() uint64 {
func (c ShortChannelID) ToUint64() uint64 {
// TODO(roasbeef): explicit error on overflow?
return ((uint64(c.BlockHeight) << 40) | (uint64(c.TxIndex) << 16) |
(uint64(c.TxPosition)))
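
For reference, this encoding packs the block height into the top 24 bits, the transaction index into the next 24 bits, and the output position into the low 16 bits. A worked example with illustrative values:

package main

import "fmt"

func main() {
	// A hypothetical channel funded at block 500,000, transaction
	// index 1,024, output 1.
	var blockHeight, txIndex, txPosition uint64 = 500000, 1024, 1

	cid := (blockHeight << 40) | (txIndex << 16) | txPosition
	fmt.Println(cid) // 549755813955108865
}
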


@ -413,8 +413,9 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
Peer: p,
DecodeHopIterators: p.server.sphinx.DecodeHopIterators,
ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter,
GetLastChannelUpdate: createGetLastUpdate(p.server.chanRouter,
p.PubKey(), lnChan.ShortChanID()),
FetchLastChannelUpdate: fetchLastChanUpdate(
p.server.chanRouter, p.PubKey(),
),
DebugHTLC: cfg.DebugHTLC,
HodlHTLC: cfg.HodlHTLC,
Registry: p.server.invoices,
@ -1389,8 +1390,9 @@ out:
Peer: p,
DecodeHopIterators: p.server.sphinx.DecodeHopIterators,
ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter,
GetLastChannelUpdate: createGetLastUpdate(p.server.chanRouter,
p.PubKey(), newChanReq.channel.ShortChanID()),
FetchLastChannelUpdate: fetchLastChanUpdate(
p.server.chanRouter, p.PubKey(),
),
DebugHTLC: cfg.DebugHTLC,
HodlHTLC: cfg.HodlHTLC,
Registry: p.server.invoices,
@ -1906,21 +1908,20 @@ func (p *peer) PubKey() [33]byte {
// TODO(roasbeef): make all start/stop mutexes a CAS
// createGetLastUpdate returns the handler which serve as a source of the last
// update of the channel in a form of lnwire update message.
func createGetLastUpdate(router *routing.ChannelRouter,
pubKey [33]byte, chanID lnwire.ShortChannelID) func() (*lnwire.ChannelUpdate,
error) {
// fetchLastChanUpdate returns a function which is able to retrieve the last
// channel update for a target channel.
func fetchLastChanUpdate(router *routing.ChannelRouter,
pubKey [33]byte) func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
return func() (*lnwire.ChannelUpdate, error) {
info, edge1, edge2, err := router.GetChannelByID(chanID)
return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
info, edge1, edge2, err := router.GetChannelByID(cid)
if err != nil {
return nil, err
}
if edge1 == nil || edge2 == nil {
return nil, errors.Errorf("unable to find "+
"channel by ShortChannelID(%v)", chanID)
"channel by ShortChannelID(%v)", cid)
}
// If we're the outgoing node on the first edge, then that
@ -1933,7 +1934,7 @@ func createGetLastUpdate(router *routing.ChannelRouter,
local = edge1
}
update := &lnwire.ChannelUpdate{
update := lnwire.ChannelUpdate{
ChainHash: info.ChainHash,
ShortChannelID: lnwire.NewShortChanIDFromInt(local.ChannelID),
Timestamp: uint32(local.LastUpdate.Unix()),
@ -1948,9 +1949,12 @@ func createGetLastUpdate(router *routing.ChannelRouter,
return nil, err
}
hswcLog.Debugf("Sending latest channel_update: %v",
spew.Sdump(update))
hswcLog.Tracef("Sending latest channel_update: %v",
newLogClosure(func() string {
return spew.Sdump(update)
}),
)
return update, nil
return &update, nil
}
}
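
The closure returned here is what the link code feeds into its fall-back pattern: try to attach a fresh channel_update to the failure, and degrade to a temporary channel failure when none can be fetched. A hedged sketch of that consumer-side pattern (failureWithUpdate is an illustrative name, not part of this change):

// failureWithUpdate builds a fee-related failure carrying our latest
// channel_update for the given channel, falling back to a temporary
// channel failure when the update cannot be fetched.
func failureWithUpdate(fetch func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error),
	cid lnwire.ShortChannelID, amt lnwire.MilliSatoshi) lnwire.FailureMessage {

	update, err := fetch(cid)
	if err != nil {
		return lnwire.NewTemporaryChannelFailure(nil)
	}
	return lnwire.NewFeeInsufficient(amt, *update)
}
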

@ -1724,12 +1724,12 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route
// We'll now check to see if we've already
// reported a fee related failure for this
// node. If so, then we'll actually prune out
// the edge for now.
// the vertex for now.
chanID := update.ShortChannelID
_, ok := errFailedFeeChans[chanID]
if ok {
pruneEdgeFailure(
paySession, route, errSource,
pruneVertexFailure(
paySession, route, errSource, false,
)
continue
}

@ -297,7 +297,7 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) {
// to luo ji for 100 satoshis.
var payHash [32]byte
payment := LightningPayment{
Target: ctx.aliases["luoji"],
Target: ctx.aliases["sophon"],
Amount: lnwire.NewMSatFromSatoshis(1000),
PaymentHash: payHash,
}
@ -306,9 +306,9 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) {
copy(preImage[:], bytes.Repeat([]byte{9}, 32))
// We'll also fetch the first outgoing channel edge from roasbeef to
// luo ji. We'll obtain this as we'll need to to generate the
// son goku. We'll obtain this as we'll need it to generate the
// FeeInsufficient error that we'll send back.
chanID := uint64(689530843)
chanID := uint64(3495345)
_, _, edgeUpateToFail, err := ctx.graph.FetchChannelEdgesByID(chanID)
if err != nil {
t.Fatalf("unable to fetch chan id: %v", err)
@ -324,24 +324,21 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) {
FeeRate: uint32(edgeUpateToFail.FeeProportionalMillionths),
}
sourceNode := ctx.router.selfNode
// The error will be returned by Son Goku.
sourceNode := ctx.aliases["songoku"]
// We'll now modify the SendToSwitch method to return an error for the
// outgoing channel to luo ji. This will be a fee related error, so it
// should only cause the edge to be pruned after the second attempt.
// outgoing channel to Son Goku. This will be a fee related error, so
// it should only cause the edge to be pruned after the second attempt.
ctx.router.cfg.SendToSwitch = func(n [33]byte,
_ *lnwire.UpdateAddHTLC, _ *sphinx.Circuit) ([32]byte, error) {
if bytes.Equal(ctx.aliases["luoji"].SerializeCompressed(), n[:]) {
pub, err := sourceNode.PubKey()
if err != nil {
return preImage, err
}
if bytes.Equal(sourceNode.SerializeCompressed(), n[:]) {
return [32]byte{}, &htlcswitch.ForwardingError{
ErrorSource: pub,
ErrorSource: sourceNode,
// Within our error, we'll add a channel update
// which is meant to refelct he new fee
// which is meant to reflect the new fee
// schedule for the node/channel.
FailureMessage: &lnwire.FailFeeInsufficient{
Update: errChanUpdate,
@ -371,8 +368,8 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) {
preImage[:], paymentPreImage[:])
}
// The route should have satoshi as the first hop.
if route.Hops[0].Channel.Node.Alias != "satoshi" {
// The route should have pham nuwen as the first hop.
if route.Hops[0].Channel.Node.Alias != "phamnuwen" {
t.Fatalf("route should go through satoshi as first hop, "+
"instead passes through: %v",
route.Hops[0].Channel.Node.Alias)

@ -3331,7 +3331,7 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
TimeLockDelta: req.TimeLockDelta,
}
rpcsLog.Tracef("[updatechanpolicy] updating channel policy base_fee=%v, "+
rpcsLog.Debugf("[updatechanpolicy] updating channel policy base_fee=%v, "+
"rate_float=%v, rate_fixed=%v, time_lock_delta: %v, targets=%v",
req.BaseFeeMsat, req.FeeRate, feeRateFixed, req.TimeLockDelta,
spew.Sdump(targetChans))