htlcswitch: add usage of queue in channel link

In this commit usage of the pending packet queue have been added.
This queue will consume the downstream packets if state machine return
the error that we do not have enough capacity for htlc in commitment
transaction. Upon receiving settle/fail payment descriptors - add htlc
have been removed, we release the slot, and process pending add htlc
requests.
This commit is contained in:
Andrey Samokhvalov 2017-05-24 18:27:39 +03:00 committed by Olaoluwa Osuntokun
parent 22d90d6b35
commit 7595bee27c
3 changed files with 126 additions and 2 deletions

@ -9,6 +9,8 @@ import (
"io"
"encoding/hex"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
@ -82,6 +84,10 @@ type channelLink struct {
// which may affect behaviour of the service.
cfg *ChannelLinkConfig
// queue is used to store the htlc add updates which haven't been
// processed because of the commitment trancation overflow.
queue *packetQueue
// upstream is a channel which responsible for propagating the
// received from remote peer messages, with which we have an opened
// channel, to handler function.
@ -115,6 +121,7 @@ func NewChannelLink(cfg *ChannelLinkConfig,
downstream: make(chan *htlcPacket),
control: make(chan interface{}),
cancelReasons: make(map[uint64]lnwire.OpaqueReason),
queue: newWaitingQueue(),
quit: make(chan struct{}),
}
}
@ -248,7 +255,31 @@ out:
break out
}
// Previously add update have been added to the reprocessing
// queue because of the overflooding threat, and now we are
// trying to process it again.
case packet := <-l.queue.pending:
msg := packet.htlc.(*lnwire.UpdateAddHTLC)
log.Infof("Reprocess downstream add update "+
"with payment hash(%v)",
hex.EncodeToString(msg.PaymentHash[:]))
l.handleDownStreamPkt(packet)
case pkt := <-l.downstream:
// If we have non empty processing queue than in
// order to preserve the order of add updates
// consume it, and process it later.
htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
if ok && l.queue.length() != 0 {
log.Infof("Downstream htlc add update with "+
"payment hash(%v) have been added to "+
"reprocessing queue, batch: %v",
hex.EncodeToString(htlc.PaymentHash[:]),
l.batchCounter)
l.queue.consume(pkt)
continue
}
l.handleDownStreamPkt(pkt)
case msg := <-l.upstream:
@ -282,7 +313,15 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
// chains.
htlc.ChanID = l.ChanID()
index, err := l.channel.AddHTLC(htlc)
if err != nil {
if err == lnwallet.ErrMaxHTLCNumber {
log.Infof("Downstream htlc add update with "+
"payment hash(%v) have been added to "+
"reprocessing queue, batch: %v",
hex.EncodeToString(htlc.PaymentHash[:]),
l.batchCounter)
l.queue.consume(pkt)
return
} else if err != nil {
// TODO: possibly perform fallback/retry logic
// depending on type of error
@ -298,6 +337,10 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
err)
return
}
log.Tracef("Receive downstream htlc with payment hash"+
"(%v), assign the index: %v, batch: %v",
hex.EncodeToString(htlc.PaymentHash[:]),
index, l.batchCounter+1)
htlc.ID = index
l.cfg.Peer.SendMessage(htlc)
@ -381,6 +424,9 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
l.cfg.Peer.Disconnect()
return
}
log.Tracef("Receive upstream htlc with payment hash(%v), "+
"assign the index: %v",
hex.EncodeToString(msg.PaymentHash[:]), index)
// TODO(roasbeef): perform sanity checks on per-hop payload
// * time-lock is sane, fee, chain, etc
@ -611,6 +657,7 @@ func (l *channelLink) processLockedInHtlcs(
&lnwire.UpdateFufillHTLC{
PaymentPreimage: pd.RPreimage,
}, pd.RHash, pd.Amount))
l.queue.release()
case lnwallet.Fail:
opaqueReason := l.cancelReasons[pd.ParentIndex]
@ -625,6 +672,7 @@ func (l *channelLink) processLockedInHtlcs(
Reason: opaqueReason,
ChanID: l.ChanID(),
}, pd.RHash, pd.Amount))
l.queue.release()
case lnwallet.Add:
blob := l.blobs[pd.Index]

@ -126,6 +126,82 @@ func TestChannelLinkSingleHopPayment(t *testing.T) {
}
}
// TestChannelLinkBidirectionalOneHopPayments tests the ability of channel
// link to cope with bigger number of payment updates that commitment
// transaction may consist.
func TestChannelLinkBidirectionalOneHopPayments(t *testing.T) {
n := newThreeHopNetwork(t,
btcutil.SatoshiPerBitcoin*3,
btcutil.SatoshiPerBitcoin*5,
)
if err := n.start(); err != nil {
t.Fatal(err)
}
defer n.stop()
bobBandwidthBefore := n.firstBobChannelLink.Bandwidth()
aliceBandwidthBefore := n.aliceChannelLink.Bandwidth()
debug := false
if debug {
// Log message that alice receives.
n.aliceServer.record(createLogFunc("alice",
n.aliceChannelLink.ChanID()))
// Log message that bob receives.
n.bobServer.record(createLogFunc("bob",
n.firstBobChannelLink.ChanID()))
}
// Send max available payment number in both sides, thereby testing
// the property of channel link to cope with overflowing.
errChan := make(chan error)
count := 2 * lnwallet.MaxHTLCNumber
for i := 0; i < count/2; i++ {
go func() {
_, err := n.makePayment([]Peer{
n.aliceServer,
n.bobServer,
}, 10)
errChan <- err
}()
}
for i := 0; i < count/2; i++ {
go func() {
_, err := n.makePayment([]Peer{
n.bobServer,
n.aliceServer,
}, 10)
errChan <- err
}()
}
// Check that alice invoice was settled and bandwidth of HTLC
// links was changed.
for i := 0; i < count; i++ {
select {
case err := <-errChan:
if err != nil {
t.Fatalf("unable to make the payment: %v", err)
}
case <-time.After(4 * time.Second):
t.Fatalf("timeout: (%v/%v)", i+1, count)
}
}
// At the end Bob and Alice balances should be the same as previous,
// because they sent the equal amount of money to each other.
if aliceBandwidthBefore != n.aliceChannelLink.Bandwidth() {
t.Fatal("alice bandwidth shouldn't have changed")
}
if bobBandwidthBefore != n.firstBobChannelLink.Bandwidth() {
t.Fatal("bob bandwidth shouldn't have changed")
}
}
// TestChannelLinkMultiHopPayment checks the ability to send payment over two
// hopes. In this test we send the payment from Carol to Alice over Bob peer.
// (Carol -> Bob -> Alice) and checking that HTLC was settled properly and

@ -359,7 +359,7 @@ func (n *threeHopNetwork) makePayment(peers []Peer,
select {
case err := <-errChan:
return invoice, err
case <-time.After(6 * time.Second):
case <-time.After(12 * time.Second):
return invoice, errors.New("htlc was no settled in time")
}
}