htlcswitch: add HTLC overflow flow-control via bounded channels

This commit fixes a prior bug which would cause the set of HTLC’s on a
node’s commitment to potentially overflow if an HTLC was accepted or
attempted to be forwarded that but the commitment transaction over the
maximum allowed HTLC’s on a commitment transaction. This would cause
the HTLC to silently be rejected or cause a connection disconnect. In
either case, this would cause the two states to be desynchronized any
pending HTLC’s to be ignored.

We fix this issue by introducing the concept of a bounded channel,
which is a channel in which the number of items send and recevied over
the channel must be balanced in order to allow a new send to succeed
w/o blocking. We achieve this by using a chan struct{} as a semaphore
and decrement it each time a packet it sent, increasing the semaphore
one a packet is received. This creates a channel that we can use to
ensure the switch never sends more than N HTLC’s to a link before any
of the HTLC’s have been settled.

With this bug fix, it’s now once again possible to trigger sustained
bursts of payments through lnd nodes.
This commit is contained in:
Olaoluwa Osuntokun 2017-03-24 16:39:56 -07:00
parent b7fa816bce
commit 505421db2c
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 132 additions and 22 deletions

@ -12,6 +12,7 @@ import (
"github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/chaincfg/chainhash"
@ -30,6 +31,77 @@ var (
zeroBytes [32]byte zeroBytes [32]byte
) )
// boundedLinkChan is a simple wrapper around a link's communication channel
// that bounds the total flow into and through the channel. Channels attached
// the link have a value which defines the max number of pending HTLC's present
// within the commitment transaction. Using this struct we establish a
// synchronization primitive that ensure we don't send additional htlcPackets
// to a link if the max limit has een reached. Once HTLC's are cleared from the
// commitment transaction, slots are freed up and more can proceed.
type boundedLinkChan struct {
// slots is a buffered channel whose buffer is the total number of
// outstanding HTLC's we can add to a link's commitment transaction.
// This channel is essentially used as a semaphore.
slots chan struct{}
// linkChan is a channel that is connected to the channel state machine
// for a link. The switch will send adds, settles, and cancels over
// this channel.
linkChan chan *htlcPacket
}
// newBoundedChan makes a new boundedLinkChan that has numSlots free slots that
// are depleted on each send until a slot is re-stored. linkChan is the
// underlying channel that will be sent upon.
func newBoundedLinkChan(numSlots uint32,
linkChan chan *htlcPacket) *boundedLinkChan {
b := &boundedLinkChan{
slots: make(chan struct{}, numSlots),
linkChan: linkChan,
}
b.restoreSlots(numSlots)
return b
}
// sendAndConsume sends a packet to the linkChan and consumes a single token in
// the process.
//
// TODO(roasbeef): add error fall through case?
func (b *boundedLinkChan) sendAndConsume(pkt *htlcPacket) {
<-b.slots
b.linkChan <- pkt
}
// sendAndRestore sends a packet to the linkChan and consumes a single token in
// the process. This method is called when the switch sends either a cancel or
// settle HTLC message to the link.
func (b *boundedLinkChan) sendAndRestore(pkt *htlcPacket) {
b.linkChan <- pkt
b.slots <- struct{}{}
}
// consumeSlot consumes a single slot from the bounded channel. This method is
// called once the switch receives a new htlc add message from a link right
// before forwarding it to the next hop.
func (b *boundedLinkChan) consumeSlot() {
<-b.slots
}
// restoreSlot restores a single slots to the bounded channel. This method is
// called once the switch receives an HTLC cancel or settle from a link.
func (b *boundedLinkChan) restoreSlot() {
b.slots <- struct{}{}
}
// restoreSlots adds numSlots additional slots to the bounded channel.
func (b *boundedLinkChan) restoreSlots(numSlots uint32) {
for i := uint32(0); i < numSlots; i++ {
b.slots <- struct{}{}
}
}
// link represents an active channel capable of forwarding HTLCs. Each // link represents an active channel capable of forwarding HTLCs. Each
// active channel registered with the htlc switch creates a new link which will // active channel registered with the htlc switch creates a new link which will
// be used for forwarding outgoing HTLCs. The link also has additional // be used for forwarding outgoing HTLCs. The link also has additional
@ -40,7 +112,7 @@ type link struct {
availableBandwidth int64 // atomic availableBandwidth int64 // atomic
linkChan chan *htlcPacket *boundedLinkChan
peer *peer peer *peer
@ -66,7 +138,8 @@ type htlcPacket struct {
preImage chan [32]byte preImage chan [32]byte
err chan error err chan error
done chan struct{}
} }
// circuitKey uniquely identifies an active Sphinx (onion routing) circuit // circuitKey uniquely identifies an active Sphinx (onion routing) circuit
@ -214,6 +287,7 @@ func (h *htlcSwitch) Stop() error {
// alternative error is returned. // alternative error is returned.
func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) ([32]byte, error) { func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) ([32]byte, error) {
htlcPkt.err = make(chan error, 1) htlcPkt.err = make(chan error, 1)
htlcPkt.done = make(chan struct{})
htlcPkt.preImage = make(chan [32]byte, 1) htlcPkt.preImage = make(chan [32]byte, 1)
h.outgoingPayments <- htlcPkt h.outgoingPayments <- htlcPkt
@ -277,7 +351,9 @@ out:
hswcLog.Tracef("Sending %v to %x", amt, dest[:]) hswcLog.Tracef("Sending %v to %x", amt, dest[:])
go func() { go func() {
link.linkChan <- htlcPkt link.sendAndConsume(htlcPkt)
<-htlcPkt.done
link.restoreSlot()
}() }()
n := atomic.AddInt64(&link.availableBandwidth, n := atomic.AddInt64(&link.availableBandwidth,
@ -345,6 +421,11 @@ out:
settleLink := h.chanIndex[pkt.srcLink] settleLink := h.chanIndex[pkt.srcLink]
h.chanIndexMtx.RUnlock() h.chanIndexMtx.RUnlock()
// As the link now has a new HTLC that's been
// propagated to us, we'll consume a slot from
// it's bounded channel.
settleLink.consumeSlot()
// If the link we're attempting to forward the // If the link we're attempting to forward the
// HTLC over has insufficient capacity, then // HTLC over has insufficient capacity, then
// we'll cancel the HTLC as the payment cannot // we'll cancel the HTLC as the payment cannot
@ -361,11 +442,13 @@ out:
payHash: payHash, payHash: payHash,
msg: &lnwire.UpdateFailHTLC{ msg: &lnwire.UpdateFailHTLC{
Reason: []byte{uint8(lnwire.InsufficientCapacity)}, Reason: []byte{uint8(lnwire.InsufficientCapacity)},
}, }, err: make(chan error, 1),
err: make(chan error, 1),
} }
settleLink.linkChan <- pkt // Send the cancel message along the
// link, restoring a slot in the
// bounded channel in the process.
settleLink.sendAndRestore(pkt)
continue continue
} }
@ -385,11 +468,12 @@ out:
// to the clearing link within the circuit to // to the clearing link within the circuit to
// continue propagating the HTLC across the // continue propagating the HTLC across the
// network. // network.
circuit.clear.linkChan <- &htlcPacket{ circuit.clear.sendAndConsume(&htlcPacket{
msg: wireMsg, msg: wireMsg,
preImage: make(chan [32]byte, 1), preImage: make(chan [32]byte, 1),
err: make(chan error, 1), err: make(chan error, 1),
} done: make(chan struct{}),
})
// Reduce the available bandwidth for the link // Reduce the available bandwidth for the link
// as it will clear the above HTLC, increasing // as it will clear the above HTLC, increasing
@ -421,15 +505,17 @@ out:
continue continue
} }
circuit.clear.restoreSlot()
hswcLog.Debugf("Closing completed onion "+ hswcLog.Debugf("Closing completed onion "+
"circuit for %x: %v<->%v", rHash[:], "circuit for %x: %v<->%v", rHash[:],
circuit.clear.chanPoint, circuit.clear.chanPoint,
circuit.settle.chanPoint) circuit.settle.chanPoint)
circuit.settle.linkChan <- &htlcPacket{ circuit.settle.sendAndRestore(&htlcPacket{
msg: wireMsg, msg: wireMsg,
err: make(chan error, 1), err: make(chan error, 1),
} })
// Increase the available bandwidth for the // Increase the available bandwidth for the
// link as it will settle the above HTLC, // link as it will settle the above HTLC,
@ -459,6 +545,8 @@ out:
continue continue
} }
circuit.clear.restoreSlot()
// Since an outgoing HTLC we sent on the clear // Since an outgoing HTLC we sent on the clear
// link has been cancelled, we update the // link has been cancelled, we update the
// bandwidth of the clear link, restoring the // bandwidth of the clear link, restoring the
@ -473,11 +561,11 @@ out:
// the error propagation by sending the // the error propagation by sending the
// cancellation message over the link that sent // cancellation message over the link that sent
// us the incoming HTLC. // us the incoming HTLC.
circuit.settle.linkChan <- &htlcPacket{ circuit.settle.sendAndRestore(&htlcPacket{
msg: wireMsg, msg: wireMsg,
payHash: pkt.payHash, payHash: pkt.payHash,
err: make(chan error, 1), err: make(chan error, 1),
} })
delete(h.paymentCircuits, pkt.payHash) delete(h.paymentCircuits, pkt.payHash)
} }
@ -550,11 +638,17 @@ func (h *htlcSwitch) handleRegisterLink(req *registerLinkMsg) {
newLink := &link{ newLink := &link{
capacity: req.linkInfo.Capacity, capacity: req.linkInfo.Capacity,
availableBandwidth: int64(req.linkInfo.LocalBalance), availableBandwidth: int64(req.linkInfo.LocalBalance),
linkChan: req.linkChan,
peer: req.peer, peer: req.peer,
chanPoint: chanPoint, chanPoint: chanPoint,
} }
// To ensure we never accidentally cause an HTLC overflow, we'll limit,
// we'll use this buffered channel as as semaphore in order to limit
// the number of outstanding HTLC's we extend to the target link.
//const numSlots = (lnwallet.MaxHTLCNumber / 2) - 1
const numSlots = lnwallet.MaxHTLCNumber - 5
newLink.boundedLinkChan = newBoundedLinkChan(numSlots, req.linkChan)
// First update the channel index with this new channel point. The // First update the channel index with this new channel point. The
// channel index will be used to quickly lookup channels in order to: // channel index will be used to quickly lookup channels in order to:
// close them, update their link capacity, or possibly during multi-hop // close them, update their link capacity, or possibly during multi-hop
@ -818,6 +912,9 @@ type linkInfoUpdateMsg struct {
// within the link by the passed satoshi delta. This function may be used when // within the link by the passed satoshi delta. This function may be used when
// re-anchoring to boost the capacity of a channel, or once a peer settles an // re-anchoring to boost the capacity of a channel, or once a peer settles an
// HTLC invoice. // HTLC invoice.
func (h *htlcSwitch) UpdateLink(chanPoint *wire.OutPoint, bandwidthDelta btcutil.Amount) { func (h *htlcSwitch) UpdateLink(chanPoint *wire.OutPoint, delta btcutil.Amount) {
h.linkControl <- &linkInfoUpdateMsg{chanPoint, bandwidthDelta} h.linkControl <- &linkInfoUpdateMsg{
targetLink: chanPoint,
bandwidthDelta: delta,
}
} }

27
peer.go

@ -741,6 +741,8 @@ out:
// Now that the channel is open, notify the Htlc // Now that the channel is open, notify the Htlc
// Switch of a new active link. // Switch of a new active link.
// TODO(roasbeef): register needs to account for
// in-flight htlc's on restart
chanSnapShot := newChanReq.channel.StateSnapshot() chanSnapShot := newChanReq.channel.StateSnapshot()
downstreamLink := make(chan *htlcPacket, 10) downstreamLink := make(chan *htlcPacket, 10)
plexChan := p.server.htlcSwitch.RegisterLink(p, plexChan := p.server.htlcSwitch.RegisterLink(p,
@ -1022,6 +1024,7 @@ type pendingPayment struct {
preImage chan [32]byte preImage chan [32]byte
err chan error err chan error
done chan struct{}
} }
// commitmentState is the volatile+persistent state of an active channel's // commitmentState is the volatile+persistent state of an active channel's
@ -1263,16 +1266,22 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
if err != nil { if err != nil {
// TODO: possibly perform fallback/retry logic // TODO: possibly perform fallback/retry logic
// depending on type of error // depending on type of error
// TODO: send a cancel message back to the htlcSwitch.
peerLog.Errorf("Adding HTLC rejected: %v", err) peerLog.Errorf("Adding HTLC rejected: %v", err)
pkt.err <- err pkt.err <- err
close(pkt.done)
// Increase the available bandwidth of the link, // The HTLC was unable to be added to the state
// previously it was decremented and because // machine, as a result, we'll signal the switch to
// HTLC adding failed we should do the reverse // cancel the pending payment.
// operation. // TODO(roasbeef): need to update link as well if local
htlcSwitch := p.server.htlcSwitch // HTLC?
htlcSwitch.UpdateLink(&htlc.ChannelPoint, pkt.amt) state.switchChan <- &htlcPacket{
amt: htlc.Amount,
msg: &lnwire.UpdateFailHTLC{
Reason: []byte{byte(0)},
},
srcLink: *state.chanPoint,
}
return return
} }
@ -1283,6 +1292,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
index: index, index: index,
preImage: pkt.preImage, preImage: pkt.preImage,
err: pkt.err, err: pkt.err,
done: pkt.done,
}) })
case *lnwire.UpdateFufillHTLC: case *lnwire.UpdateFufillHTLC:
@ -1368,6 +1378,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
index, err := state.channel.ReceiveHTLC(htlcPkt) index, err := state.channel.ReceiveHTLC(htlcPkt)
if err != nil { if err != nil {
peerLog.Errorf("Receiving HTLC rejected: %v", err) peerLog.Errorf("Receiving HTLC rejected: %v", err)
p.Disconnect()
return return
} }
@ -1545,6 +1556,8 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
p.err <- errors.New(errMsg.String()) p.err <- errors.New(errMsg.String())
} }
close(p.done)
delete(state.clearedHTCLs, htlc.ParentIndex) delete(state.clearedHTCLs, htlc.ParentIndex)
} }