htlcswitch: use atomic integer to track link bandwidth internally

This commit modifies the way the bandwidth of a given channel link is
tracked and reported externally. The prior approach pushed most of the
logic for tracking channel bandwidth into the link itself, and relied
on a report from the overflow queue in order to determine the total
available bandwidth. At times this approach could inadvertently
introduce deadlocks when working on new features: since the query was
handled internally, the link had to be _active_ and unblocked in order
to respond to it.

We’ve now abandoned this approach in favor of lifting the bandwidth
accounting to the highest possible abstraction layer within the link
itself. We now maintain an availableBandwidth integer that’s updated
atomically within the link in response to: us adding or settling an
HTLC, and the remote party failing one of our HTLCs.
Olaoluwa Osuntokun 2017-09-25 12:31:52 -07:00
parent 8ecb8c70bb
commit 64317c04f1
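
As a rough sketch of the accounting model this commit moves to (the
harness below is hypothetical; only the counter pattern mirrors what
channelLink now does):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // link mirrors, in miniature, the accounting the channelLink now
    // does: a single millisatoshi counter only ever touched via
    // sync/atomic.
    type link struct {
        availableBandwidth uint64 // msat
    }

    // addHTLC reserves bandwidth the moment we add (or queue) an
    // outgoing HTLC, so concurrent readers never over-report.
    func (l *link) addHTLC(amt uint64) {
        atomic.AddUint64(&l.availableBandwidth, -amt) // wraps, i.e. subtracts
    }

    // release returns bandwidth once the HTLC leaves the channel,
    // either because we settled it or the remote party failed it.
    func (l *link) release(amt uint64) {
        atomic.AddUint64(&l.availableBandwidth, amt)
    }

    // bandwidth is safe to call from any goroutine without blocking
    // on the link's main event loop, which is the point of the change.
    func (l *link) bandwidth() uint64 {
        return atomic.LoadUint64(&l.availableBandwidth)
    }

    func main() {
        l := &link{availableBandwidth: 1000000}
        l.addHTLC(250000)
        fmt.Println(l.bandwidth()) // 750000
        l.release(250000)
        fmt.Println(l.bandwidth()) // 1000000
    }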

@@ -92,6 +92,8 @@ type ChannelLinkConfig struct {
     // Switch is a subsystem which is used to forward the incoming HTLC
     // packets according to the encoded hop forwarding information
     // contained in the forwarding blob within each HTLC.
+    //
+    // TODO(roasbeef): remove in favor of simple ForwardPacket closure func
     Switch *Switch

     // DecodeHopIterator function is responsible for decoding HTLC Sphinx
@@ -194,6 +196,12 @@ type channelLink struct {
     // been processed because of the commitment transaction overflow.
     overflowQueue *packetQueue

+    // availableBandwidth is an integer with units of millisatoshi which
+    // indicates the total available bandwidth of a link, taking into
+    // account any pending (uncommitted) HTLC's, and any HTLC's that are
+    // within the overflow queue.
+    availableBandwidth uint64
+
     // upstream is a channel that new messages sent from the remote peer to
     // the local peer will be sent across.
     upstream chan lnwire.Message
@@ -237,6 +245,8 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
         overflowQueue: newPacketQueue(),
         bestHeight:    currentHeight,
         quit:          make(chan struct{}),
+        // TODO(roasbeef): just do reserve here?
+        availableBandwidth: uint64(channel.StateSnapshot().LocalBalance),
     }
 }
@@ -276,7 +286,6 @@ func (l *channelLink) Stop() {
     log.Infof("ChannelLink(%v) is stopping", l)

-    // TODO(roasbeef): need to stop channel?
     l.channel.Stop()
     l.overflowQueue.Stop()
@@ -301,7 +310,7 @@ func (l *channelLink) htlcManager() {
     defer l.wg.Done()

     log.Infof("HTLC manager for ChannelPoint(%v) started, "+
-        "bandwidth=%v", l.channel.ChannelPoint(), l.getBandwidth())
+        "bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth())

     // TODO(roasbeef): check to see if able to settle any currently pending
     // HTLCs
@@ -402,7 +411,7 @@ out:
             log.Tracef("Reprocessing downstream add update "+
                 "with payment hash(%x)", msg.PaymentHash[:])

-            l.handleDownStreamPkt(packet)
+            l.handleDownStreamPkt(packet, true)

         // A message from the switch was just received. This indicates
         // that the link is an intermediate hop in a multi-hop HTLC
@@ -416,14 +425,20 @@ out:
             if ok && l.overflowQueue.Length() != 0 {
                 log.Infof("Downstream htlc add update with "+
                     "payment hash(%x) have been added to "+
-                    "reprocessing queue, batch: %v",
+                    "reprocessing queue, batch_size=%v",
                     htlc.PaymentHash[:],
                     l.batchCounter)

+                // As we're adding a new pkt to the overflow
+                // queue, decrement the available bandwidth.
+                atomic.AddUint64(
+                    &l.availableBandwidth,
+                    -uint64(htlc.Amount),
+                )
+
                 l.overflowQueue.AddPkt(pkt)
                 continue
             }
-            l.handleDownStreamPkt(pkt)
+            l.handleDownStreamPkt(pkt, false)

         // A message from the connected peer was just received. This
         // indicates that we have a new incoming HTLC, either directly
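
A note on the -uint64(htlc.Amount) argument above: sync/atomic has no
subtraction variant of AddUint64, so the code relies on Go's defined
wraparound for unsigned negation. A standalone demonstration (the
values are arbitrary):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var bandwidth uint64 = 5000
        amt := uint64(1200)

        // Negating a non-constant uint64 is legal in Go and yields its
        // two's complement, so AddUint64 wraps around and subtracts.
        atomic.AddUint64(&bandwidth, -amt)

        fmt.Println(bandwidth) // 3800
    }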
@@ -433,8 +448,6 @@ out:
         case cmd := <-l.linkControl:
             switch req := cmd.(type) {
-            case *getBandwidthCmd:
-                req.resp <- l.getBandwidth()
             case *policyUpdate:
                 // In order to avoid overriding a valid policy
                 // with a "null" field in the new policy, we'll
@@ -470,7 +483,7 @@ out:
 // Switch. Possible messages sent by the switch include requests to forward new
 // HTLCs, timeout previously cleared HTLCs, and finally to settle currently
 // cleared HTLCs with the upstream peer.
-func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
+func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
     var isSettle bool
     switch htlc := pkt.htlc.(type) {
     case *lnwire.UpdateAddHTLC:
@@ -484,12 +497,25 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
             // The channels spare bandwidth is fully allocated, so
             // we'll put this HTLC into the overflow queue.
+            case lnwallet.ErrInsufficientBalance:
+                fallthrough
             case lnwallet.ErrMaxHTLCNumber:
                 log.Infof("Downstream htlc add update with "+
                     "payment hash(%x) have been added to "+
                     "reprocessing queue, batch: %v",
                     htlc.PaymentHash[:],
                     l.batchCounter)
+
+                // If we're processing this HTLC for the first
+                // time, then we'll decrement the available
+                // bandwidth. Otherwise, we'd double count the
+                // effect of the HTLC.
+                if !isReProcess {
+                    atomic.AddUint64(
+                        &l.availableBandwidth, -uint64(htlc.Amount),
+                    )
+                }
+
                 l.overflowQueue.AddPkt(pkt)
                 return
@@ -548,6 +574,14 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
             }
         }

+        // If we're processing this HTLC for the first time, then we'll
+        // decrement the available bandwidth.
+        if !isReProcess {
+            // Subtract from the available bandwidth as we have a
+            // new HTLC in limbo.
+            atomic.AddUint64(&l.availableBandwidth, -uint64(htlc.Amount))
+        }
+
         log.Tracef("Received downstream htlc: payment_hash=%x, "+
             "local_log_index=%v, batch_size=%v",
             htlc.PaymentHash[:], index, l.batchCounter+1)
@@ -560,13 +594,17 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
         // upstream. Therefore we settle the HTLC within our local
         // state machine.
         pre := htlc.PaymentPreimage
-        logIndex, err := l.channel.SettleHTLC(pre)
+        logIndex, amt, err := l.channel.SettleHTLC(pre)
         if err != nil {
             // TODO(roasbeef): broadcast on-chain
             l.fail("unable to settle incoming HTLC: %v", err)
             return
         }

+        // Increment the available bandwidth as we've settled an HTLC
+        // extended by the remote party.
+        atomic.AddUint64(&l.availableBandwidth, uint64(amt))
+
         // With the HTLC settled, we'll need to populate the wire
         // message to target the specific channel and HTLC to be
         // cancelled.
@@ -651,11 +689,16 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
         // have sent to it, then we should transform the malformed HTLC
         // message to the usual HTLC fail message.
         idx := msg.ID
-        if err := l.channel.ReceiveFailHTLC(idx); err != nil {
+        amt, err := l.channel.ReceiveFailHTLC(idx)
+        if err != nil {
             l.fail("unable to handle upstream fail HTLC: %v", err)
             return
         }

+        // Increment the available bandwidth as they've removed our
+        // HTLC.
+        atomic.AddUint64(&l.availableBandwidth, uint64(amt))
+
         // Convert the failure type encoded within the HTLC fail
         // message to the proper generic lnwire error code.
         var failure lnwire.FailureMessage
@@ -692,11 +735,16 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
     case *lnwire.UpdateFailHTLC:
         idx := msg.ID
-        if err := l.channel.ReceiveFailHTLC(idx); err != nil {
+        amt, err := l.channel.ReceiveFailHTLC(idx)
+        if err != nil {
             l.fail("unable to handle upstream fail HTLC: %v", err)
             return
         }

+        // Increment the available bandwidth as they've removed our
+        // HTLC.
+        atomic.AddUint64(&l.availableBandwidth, uint64(amt))
+
         l.cancelReasons[idx] = msg.Reason

     case *lnwire.CommitSig:
@@ -854,32 +902,15 @@ type getBandwidthCmd struct {
     resp chan lnwire.MilliSatoshi
 }

-// Bandwidth returns the amount which current link might pass through channel
-// link. Execution through control channel gives as confidence that bandwidth
-// will not be changed during function execution.
+// Bandwidth returns the total amount that can flow through the channel link at
+// this given instance. The value returned is expressed in millisatoshi and
+// can be used by callers when making forwarding decisions to determine if a
+// link can accept an HTLC.
 //
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
-    command := &getBandwidthCmd{
-        resp: make(chan lnwire.MilliSatoshi, 1),
-    }
-
-    select {
-    case l.linkControl <- command:
-        return <-command.resp
-    case <-l.quit:
-        return 0
-    }
-}
-
-// getBandwidth returns the amount which current link might pass through
-// channel link.
-//
-// NOTE: Should be used inside main goroutine only, otherwise the result might
-// not be accurate.
-func (l *channelLink) getBandwidth() lnwire.MilliSatoshi {
-    // TODO(roasbeef): factor in reserve, just grab mutex
-    return l.channel.LocalAvailableBalance() - l.overflowQueue.PendingAmount()
+    // TODO(roasbeef): subtract reserve
+    return lnwire.MilliSatoshi(atomic.LoadUint64(&l.availableBandwidth))
 }

 // policyUpdate is a message sent to a channel link when an outside sub-system
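
Since Bandwidth() is now a single atomic load, callers can consult it
without round-tripping through linkControl, avoiding the deadlock
described in the commit message. A hypothetical caller-side check
(canForward is invented for illustration; ChannelLink and
lnwire.MilliSatoshi are the real types):

    // canForward sketches how a caller such as the switch might use
    // Bandwidth() when deciding whether a link can carry an HTLC. The
    // read is lock-free, so it's safe to call even while the link's
    // htlcManager goroutine is busy or blocked.
    func canForward(link ChannelLink, amt lnwire.MilliSatoshi) bool {
        return link.Bandwidth() >= amt
    }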
@@ -1195,12 +1226,19 @@ func (l *channelLink) processLockedInHtlcs(
                 }

                 preimage := invoice.Terms.PaymentPreimage
-                logIndex, err := l.channel.SettleHTLC(preimage)
+                logIndex, amt, err := l.channel.SettleHTLC(preimage)
                 if err != nil {
                     l.fail("unable to settle htlc: %v", err)
                     return nil
                 }

+                // Increment the available bandwidth as we've
+                // settled an HTLC extended by the remote
+                // party.
+                atomic.AddUint64(
+                    &l.availableBandwidth, uint64(amt),
+                )
+
                 // Notify the invoiceRegistry of the invoices
                 // we just settled with this latest commitment
                 // update.