diff --git a/htlcswitch/link.go b/htlcswitch/link.go index f0315b13..d9433542 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -92,6 +92,8 @@ type ChannelLinkConfig struct { // Switch is a subsystem which is used to forward the incoming HTLC // packets according to the encoded hop forwarding information // contained in the forwarding blob within each HTLC. + // + // TODO(roasbeef): remove in favor of simple ForwardPacket closure func Switch *Switch // DecodeHopIterator function is responsible for decoding HTLC Sphinx @@ -194,6 +196,12 @@ type channelLink struct { // been processed because of the commitment transaction overflow. overflowQueue *packetQueue + // availableBandwidth is an integer with units of millisatoshi which + // indicates the total available bandwidth of a link, taking into + // account any pending (uncommitted) HLTC's, and any HTLC's that are + // within the overflow queue. + availableBandwidth uint64 + // upstream is a channel that new messages sent from the remote peer to // the local peer will be sent across. upstream chan lnwire.Message @@ -237,6 +245,8 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, overflowQueue: newPacketQueue(), bestHeight: currentHeight, quit: make(chan struct{}), + // TODO(roasbeef): just do reserve here? + availableBandwidth: uint64(channel.StateSnapshot().LocalBalance), } } @@ -276,7 +286,6 @@ func (l *channelLink) Stop() { log.Infof("ChannelLink(%v) is stopping", l) - // TODO(roasbeef): need to stop channel? l.channel.Stop() l.overflowQueue.Stop() @@ -301,7 +310,7 @@ func (l *channelLink) htlcManager() { defer l.wg.Done() log.Infof("HTLC manager for ChannelPoint(%v) started, "+ - "bandwidth=%v", l.channel.ChannelPoint(), l.getBandwidth()) + "bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth()) // TODO(roasbeef): check to see if able to settle any currently pending // HTLCs @@ -402,7 +411,7 @@ out: log.Tracef("Reprocessing downstream add update "+ "with payment hash(%x)", msg.PaymentHash[:]) - l.handleDownStreamPkt(packet) + l.handleDownStreamPkt(packet, true) // A message from the switch was just received. This indicates // that the link is an intermediate hop in a multi-hop HTLC @@ -416,14 +425,20 @@ out: if ok && l.overflowQueue.Length() != 0 { log.Infof("Downstream htlc add update with "+ "payment hash(%x) have been added to "+ - "reprocessing queue, batch: %v", + "reprocessing queue, batch_size=%v", htlc.PaymentHash[:], l.batchCounter) + // As we're adding a new pkt to the overflow + // queue, decrement the available bandwidth. + atomic.AddUint64( + &l.availableBandwidth, + -uint64(htlc.Amount), + ) l.overflowQueue.AddPkt(pkt) continue } - l.handleDownStreamPkt(pkt) + l.handleDownStreamPkt(pkt, false) // A message from the connected peer was just received. This // indicates that we have a new incoming HTLC, either directly @@ -433,8 +448,6 @@ out: case cmd := <-l.linkControl: switch req := cmd.(type) { - case *getBandwidthCmd: - req.resp <- l.getBandwidth() case *policyUpdate: // In order to avoid overriding a valid policy // with a "null" field in the new policy, we'll @@ -470,7 +483,7 @@ out: // Switch. Possible messages sent by the switch include requests to forward new // HTLCs, timeout previously cleared HTLCs, and finally to settle currently // cleared HTLCs with the upstream peer. -func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { +func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { var isSettle bool switch htlc := pkt.htlc.(type) { case *lnwire.UpdateAddHTLC: @@ -484,12 +497,25 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { // The channels spare bandwidth is fully allocated, so // we'll put this HTLC into the overflow queue. + case lnwallet.ErrInsufficientBalance: + fallthrough case lnwallet.ErrMaxHTLCNumber: log.Infof("Downstream htlc add update with "+ "payment hash(%x) have been added to "+ "reprocessing queue, batch: %v", htlc.PaymentHash[:], l.batchCounter) + + // If we're processing this HTLC for the first + // time, then we'll decrement the available + // bandwidth. As otherwise, we'd double count + // the effect of the HTLC. + if !isReProcess { + atomic.AddUint64( + &l.availableBandwidth, -uint64(htlc.Amount), + ) + } + l.overflowQueue.AddPkt(pkt) return @@ -548,6 +574,14 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { } } + // If we're processing this HTLC for the first time, then we'll + // decrement the available bandwidth. + if !isReProcess { + // Subtract the available bandwidth as we have a new + // HTLC in limbo. + atomic.AddUint64(&l.availableBandwidth, -uint64(htlc.Amount)) + } + log.Tracef("Received downstream htlc: payment_hash=%x, "+ "local_log_index=%v, batch_size=%v", htlc.PaymentHash[:], index, l.batchCounter+1) @@ -560,13 +594,17 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { // upstream. Therefore we settle the HTLC within the our local // state machine. pre := htlc.PaymentPreimage - logIndex, err := l.channel.SettleHTLC(pre) + logIndex, amt, err := l.channel.SettleHTLC(pre) if err != nil { // TODO(roasbeef): broadcast on-chain l.fail("unable to settle incoming HTLC: %v", err) return } + // Increment the available bandwidth as we've settled an HTLC + // extended by tbe remote party. + atomic.AddUint64(&l.availableBandwidth, uint64(amt)) + // With the HTLC settled, we'll need to populate the wire // message to target the specific channel and HTLC to be // cancelled. @@ -651,11 +689,16 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // have sent to it, than we should transform the malformed HTLC // message to the usual HTLC fail message. idx := msg.ID - if err := l.channel.ReceiveFailHTLC(idx); err != nil { + amt, err := l.channel.ReceiveFailHTLC(idx) + if err != nil { l.fail("unable to handle upstream fail HTLC: %v", err) return } + // Increment the available bandwidth as they've removed our + // HTLC. + atomic.AddUint64(&l.availableBandwidth, uint64(amt)) + // Convert the failure type encoded within the HTLC fail // message to the proper generic lnwire error code. var failure lnwire.FailureMessage @@ -692,11 +735,16 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { case *lnwire.UpdateFailHTLC: idx := msg.ID - if err := l.channel.ReceiveFailHTLC(idx); err != nil { + amt, err := l.channel.ReceiveFailHTLC(idx) + if err != nil { l.fail("unable to handle upstream fail HTLC: %v", err) return } + // Increment the available bandwidth as they've removed our + // HTLC. + atomic.AddUint64(&l.availableBandwidth, uint64(amt)) + l.cancelReasons[idx] = msg.Reason case *lnwire.CommitSig: @@ -854,32 +902,15 @@ type getBandwidthCmd struct { resp chan lnwire.MilliSatoshi } -// Bandwidth returns the amount which current link might pass through channel -// link. Execution through control channel gives as confidence that bandwidth -// will not be changed during function execution. +// Bandwidth returns the total amount that can flow through the channel link at +// this given instance. The value returned is expressed in millatoshi and +// can be used by callers when making forwarding decisions to determine if a +// link can accept an HTLC. // // NOTE: Part of the ChannelLink interface. func (l *channelLink) Bandwidth() lnwire.MilliSatoshi { - command := &getBandwidthCmd{ - resp: make(chan lnwire.MilliSatoshi, 1), - } - - select { - case l.linkControl <- command: - return <-command.resp - case <-l.quit: - return 0 - } -} - -// getBandwidth returns the amount which current link might pass through -// channel link. -// -// NOTE: Should be used inside main goroutine only, otherwise the result might -// not be accurate. -func (l *channelLink) getBandwidth() lnwire.MilliSatoshi { - // TODO(roasbeef): factor in reserve, just grab mutex - return l.channel.LocalAvailableBalance() - l.overflowQueue.PendingAmount() + // TODO(roasbeef): subtract reserverj + return lnwire.MilliSatoshi(atomic.LoadUint64(&l.availableBandwidth)) } // policyUpdate is a message sent to a channel link when an outside sub-system @@ -1195,12 +1226,19 @@ func (l *channelLink) processLockedInHtlcs( } preimage := invoice.Terms.PaymentPreimage - logIndex, err := l.channel.SettleHTLC(preimage) + logIndex, amt, err := l.channel.SettleHTLC(preimage) if err != nil { l.fail("unable to settle htlc: %v", err) return nil } + // Increment the available bandwidth as we've + // settled an HTLC extended by tbe remote + // party. + atomic.AddUint64( + &l.availableBandwidth, uint64(amt), + ) + // Notify the invoiceRegistry of the invoices // we just settled with this latest commitment // update.