htlcswitch: modify Bandwidth() method on links to use more accurate accoutning

In this commit, we modify the existing implementation of the
Bandwidth() method on the default ChannelLink implementation to use
much tighter accounting. Before this commit, there was a bug wherein if
the link restarted with pending un-settled HTLC’s, and one of them was
settled, then the bandwidth wouldn’t properly be updated to reflect
this fact.

To fix this, we’ve done away with the manual accounting and instead
grab the current balances from two sources: the set of active HTLC’s
within the overflow queue, and the report from the link itself which
includes the pending HTLC’s and factors in the amount we’d need to (or
not need to) pay in fees for each HTLC.
This commit is contained in:
Olaoluwa Osuntokun 2017-11-10 14:52:27 -08:00
parent 70ed50738a
commit de3af9b0c0
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
3 changed files with 29 additions and 48 deletions

@ -225,7 +225,6 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
mailBox: newMemoryMailBox(), mailBox: newMemoryMailBox(),
linkControl: make(chan interface{}), linkControl: make(chan interface{}),
// TODO(roasbeef): just do reserve here? // TODO(roasbeef): just do reserve here?
availableBandwidth: uint64(channel.StateSnapshot().LocalBalance),
logCommitTimer: time.NewTimer(300 * time.Millisecond), logCommitTimer: time.NewTimer(300 * time.Millisecond),
overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2),
bestHeight: currentHeight, bestHeight: currentHeight,
@ -255,6 +254,7 @@ func (l *channelLink) Start() error {
log.Infof("ChannelLink(%v) is starting", l) log.Infof("ChannelLink(%v) is starting", l)
l.mailBox.Start()
l.overflowQueue.Start() l.overflowQueue.Start()
l.wg.Add(1) l.wg.Add(1)
@ -277,6 +277,7 @@ func (l *channelLink) Stop() {
l.channel.Stop() l.channel.Stop()
l.mailBox.Stop()
l.overflowQueue.Stop() l.overflowQueue.Stop()
close(l.quit) close(l.quit)
@ -463,12 +464,6 @@ out:
htlc.PaymentHash[:], htlc.PaymentHash[:],
l.batchCounter) l.batchCounter)
// As we're adding a new pkt to the overflow
// queue, decrement the available bandwidth.
atomic.AddUint64(
&l.availableBandwidth,
-uint64(htlc.Amount),
)
l.overflowQueue.AddPkt(pkt) l.overflowQueue.AddPkt(pkt)
continue continue
} }
@ -541,16 +536,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
htlc.PaymentHash[:], htlc.PaymentHash[:],
l.batchCounter) l.batchCounter)
// If we're processing this HTLC for the first
// time, then we'll decrement the available
// bandwidth. As otherwise, we'd double count
// the effect of the HTLC.
if !isReProcess {
atomic.AddUint64(
&l.availableBandwidth, -uint64(htlc.Amount),
)
}
l.overflowQueue.AddPkt(pkt) l.overflowQueue.AddPkt(pkt)
return return
@ -603,8 +588,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
isObfuscated, isObfuscated,
) )
atomic.AddUint64(&l.availableBandwidth, uint64(htlc.Amount))
// TODO(roasbeef): need to identify if sent // TODO(roasbeef): need to identify if sent
// from switch so don't need to obfuscate // from switch so don't need to obfuscate
go l.cfg.Switch.forward(failPkt) go l.cfg.Switch.forward(failPkt)
@ -613,14 +596,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
} }
} }
// If we're processing this HTLC for the first time, then we'll
// decrement the available bandwidth.
if !isReProcess {
// Subtract the available bandwidth as we have a new
// HTLC in limbo.
atomic.AddUint64(&l.availableBandwidth, -uint64(htlc.Amount))
}
log.Tracef("Received downstream htlc: payment_hash=%x, "+ log.Tracef("Received downstream htlc: payment_hash=%x, "+
"local_log_index=%v, batch_size=%v", "local_log_index=%v, batch_size=%v",
htlc.PaymentHash[:], index, l.batchCounter+1) htlc.PaymentHash[:], index, l.batchCounter+1)
@ -633,17 +608,13 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// upstream. Therefore we settle the HTLC within the our local // upstream. Therefore we settle the HTLC within the our local
// state machine. // state machine.
pre := htlc.PaymentPreimage pre := htlc.PaymentPreimage
logIndex, amt, err := l.channel.SettleHTLC(pre) logIndex, _, err := l.channel.SettleHTLC(pre)
if err != nil { if err != nil {
// TODO(roasbeef): broadcast on-chain // TODO(roasbeef): broadcast on-chain
l.fail("unable to settle incoming HTLC: %v", err) l.fail("unable to settle incoming HTLC: %v", err)
return return
} }
// Increment the available bandwidth as we've settled an HTLC
// extended by tbe remote party.
atomic.AddUint64(&l.availableBandwidth, uint64(amt))
// With the HTLC settled, we'll need to populate the wire // With the HTLC settled, we'll need to populate the wire
// message to target the specific channel and HTLC to be // message to target the specific channel and HTLC to be
// cancelled. // cancelled.
@ -778,19 +749,15 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// If remote side have been unable to parse the onion blob we // If remote side have been unable to parse the onion blob we
// have sent to it, than we should transform the malformed HTLC // have sent to it, than we should transform the malformed HTLC
// message to the usual HTLC fail message. // message to the usual HTLC fail message.
amt, err := l.channel.ReceiveFailHTLC(idx, b.Bytes()) _, err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
if err != nil { if err != nil {
l.fail("unable to handle upstream fail HTLC: %v", err) l.fail("unable to handle upstream fail HTLC: %v", err)
return return
} }
// Increment the available bandwidth as they've removed our
// HTLC.
atomic.AddUint64(&l.availableBandwidth, uint64(amt))
case *lnwire.UpdateFailHTLC: case *lnwire.UpdateFailHTLC:
idx := msg.ID idx := msg.ID
amt, err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) _, err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
if err != nil { if err != nil {
l.fail("unable to handle upstream fail HTLC: %v", err) l.fail("unable to handle upstream fail HTLC: %v", err)
return return
@ -963,8 +930,11 @@ type getBandwidthCmd struct {
// //
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi { func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
// TODO(roasbeef): subtract reserverj // TODO(roasbeef): subtract reserve
return lnwire.MilliSatoshi(atomic.LoadUint64(&l.availableBandwidth)) channelBandwidth := l.channel.AvailableBalance()
overflowBandwidth := l.overflowQueue.TotalHtlcAmount()
return channelBandwidth - overflowBandwidth
} }
// policyUpdate is a message sent to a channel link when an outside sub-system // policyUpdate is a message sent to a channel link when an outside sub-system
@ -1276,19 +1246,12 @@ func (l *channelLink) processLockedInHtlcs(
} }
preimage := invoice.Terms.PaymentPreimage preimage := invoice.Terms.PaymentPreimage
logIndex, amt, err := l.channel.SettleHTLC(preimage) logIndex, _, err := l.channel.SettleHTLC(preimage)
if err != nil { if err != nil {
l.fail("unable to settle htlc: %v", err) l.fail("unable to settle htlc: %v", err)
return nil return nil
} }
// Increment the available bandwidth as we've
// settled an HTLC extended by tbe remote
// party.
atomic.AddUint64(
&l.availableBandwidth, uint64(amt),
)
// Notify the invoiceRegistry of the invoices // Notify the invoiceRegistry of the invoices
// we just settled with this latest commitment // we just settled with this latest commitment
// update. // update.

@ -51,6 +51,7 @@ type htlcPacket struct {
func newInitPacket(destNode [33]byte, htlc *lnwire.UpdateAddHTLC) *htlcPacket { func newInitPacket(destNode [33]byte, htlc *lnwire.UpdateAddHTLC) *htlcPacket {
return &htlcPacket{ return &htlcPacket{
destNode: destNode, destNode: destNode,
amount: htlc.Amount,
htlc: htlc, htlc: htlc,
} }
} }
@ -61,6 +62,7 @@ func newAddPacket(src, dest lnwire.ShortChannelID,
htlc *lnwire.UpdateAddHTLC, e ErrorEncrypter) *htlcPacket { htlc *lnwire.UpdateAddHTLC, e ErrorEncrypter) *htlcPacket {
return &htlcPacket{ return &htlcPacket{
amount: htlc.Amount,
dest: dest, dest: dest,
src: src, src: src,
htlc: htlc, htlc: htlc,

@ -3,6 +3,8 @@ package htlcswitch
import ( import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/lightningnetwork/lnd/lnwire"
) )
// packetQueue is an goroutine-safe queue of htlc packets which over flow the // packetQueue is an goroutine-safe queue of htlc packets which over flow the
@ -23,6 +25,11 @@ type packetQueue struct {
// with the lock held. // with the lock held.
queueLen int32 queueLen int32
// totalHtlcAmt is the sum of the value of all pending HTLC's currently
// residing within the overflow queue. This value should only read or
// modified *atomically*.
totalHtlcAmt int64
queueCond *sync.Cond queueCond *sync.Cond
queueMtx sync.Mutex queueMtx sync.Mutex
queue []*htlcPacket queue []*htlcPacket
@ -125,6 +132,7 @@ func (p *packetQueue) packetCoordinator() {
p.queue[0] = nil p.queue[0] = nil
p.queue = p.queue[1:] p.queue = p.queue[1:]
atomic.AddInt32(&p.queueLen, -1) atomic.AddInt32(&p.queueLen, -1)
atomic.AddInt64(&p.totalHtlcAmt, int64(-nextPkt.amount))
p.queueCond.L.Unlock() p.queueCond.L.Unlock()
case <-p.quit: case <-p.quit:
return return
@ -147,6 +155,7 @@ func (p *packetQueue) AddPkt(pkt *htlcPacket) {
p.queueCond.L.Lock() p.queueCond.L.Lock()
p.queue = append(p.queue, pkt) p.queue = append(p.queue, pkt)
atomic.AddInt32(&p.queueLen, 1) atomic.AddInt32(&p.queueLen, 1)
atomic.AddInt64(&p.totalHtlcAmt, int64(pkt.amount))
p.queueCond.L.Unlock() p.queueCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are // With the message added, we signal to the msgConsumer that there are
@ -180,3 +189,10 @@ func (p *packetQueue) SignalFreeSlot() {
func (p *packetQueue) Length() int32 { func (p *packetQueue) Length() int32 {
return atomic.LoadInt32(&p.queueLen) return atomic.LoadInt32(&p.queueLen)
} }
// TotalHtlcAmount is the total amount (in mSAT) of all HTLC's currently
// residing within the overflow queue.
func (p *packetQueue) TotalHtlcAmount() lnwire.MilliSatoshi {
// TODO(roasbeef): also factor in fee rate?
return lnwire.MilliSatoshi(atomic.LoadInt64(&p.totalHtlcAmt))
}