htlcswitch: pipeline settles to switch

This commit makes the outgoing link pipeline the settle to the
switch as soon as it receives it. Previously, it would wait for a
revocation before sending it, which caused increased latency on
payments as well as possibly never settling on the incoming link.
A duplicate settle is still sent to the switch, but it is handled
gracefully. A new AckEventTicker was added to the switch which
acknowledges any pending settle / fail entries in an outgoing
link's fwd pkgs in batch. This was needed in order to reduce the
number of db txn's which would have been incurred by acking whenever
we receive a duplicate settle without batching.
This commit is contained in:
nsa 2019-05-30 12:26:24 -04:00
parent 377b7bf3ce
commit 00814dc7c1
7 changed files with 105 additions and 34 deletions

View File

@ -1608,7 +1608,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
return
}
// TODO(roasbeef): pipeline to switch
settlePacket := &htlcPacket{
outgoingChanID: l.ShortChanID(),
outgoingHTLCID: idx,
htlc: &lnwire.UpdateFulfillHTLC{
PaymentPreimage: pre,
},
}
// Add the newly discovered preimage to our growing list of
// uncommitted preimage. These will be written to the witness
@ -1616,6 +1622,9 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// from the remote peer.
l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
// Pipeline this settle, send it to the switch.
go l.forwardBatch(settlePacket)
case *lnwire.UpdateFailMalformedHTLC:
// Convert the failure type encoded within the HTLC fail
// message to the proper generic lnwire error code.

View File

@ -232,10 +232,10 @@ func TestChannelLinkSingleHopPayment(t *testing.T) {
t.Fatalf("unable to make the payment: %v", err)
}
// Wait for Bob to receive the revocation.
// Wait for Alice to receive the revocation.
//
// TODO(roasbeef); replace with select over returned err chan
time.Sleep(100 * time.Millisecond)
time.Sleep(2 * time.Second)
// Check that alice invoice was settled and bandwidth of HTLC
// links was changed.
@ -494,8 +494,8 @@ func testChannelLinkMultiHopPayment(t *testing.T,
t.Fatalf("unable to send payment: %v", err)
}
// Wait for Bob to receive the revocation.
time.Sleep(100 * time.Millisecond)
// Wait for Alice and Bob's second link to receive the revocation.
time.Sleep(2 * time.Second)
// Check that Carol invoice was settled and bandwidth of HTLC
// links were changed.
@ -3977,7 +3977,8 @@ func TestChannelLinkAcceptOverpay(t *testing.T) {
t.Fatalf("unable to send payment: %v", err)
}
time.Sleep(100 * time.Millisecond)
// Wait for Alice and Bob's second link to receive the revocation.
time.Sleep(2 * time.Second)
// Even though we sent 2x what was asked for, Carol should still have
// accepted the payment and marked it as settled.
@ -5801,7 +5802,12 @@ func TestChannelLinkHoldInvoiceSettle(t *testing.T) {
t.Fatal(err)
}
// Wait for Bob to receive the revocation.
// Wait for Alice to receive the revocation. This is needed
// because the settles are pipelined to the switch and otherwise
// the bandwidth won't be updated by the time Alice receives a
// response here.
time.Sleep(2 * time.Second)
if ctx.startBandwidthAlice-ctx.amount !=
ctx.n.aliceChannelLink.Bandwidth() {

View File

@ -175,6 +175,7 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
Notifier: &mockNotifier{},
FwdEventTicker: ticker.NewForce(DefaultFwdEventInterval),
LogEventTicker: ticker.NewForce(DefaultLogInterval),
AckEventTicker: ticker.NewForce(DefaultAckInterval),
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
}

View File

@ -30,6 +30,10 @@ const (
// DefaultLogInterval is the duration between attempts to log statistics
// about forwarding events.
DefaultLogInterval = 10 * time.Second
// DefaultAckInterval is the duration between attempts to ack any settle
// fails in a forwarding package.
DefaultAckInterval = 15 * time.Second
)
var (
@ -159,6 +163,10 @@ type Config struct {
// aggregate stats about it's forwarding during the last interval.
LogEventTicker ticker.Ticker
// AckEventTicker is a signal instructing the htlcswitch to ack any settle
// fails in forwarding packages.
AckEventTicker ticker.Ticker
// NotifyActiveChannel and NotifyInactiveChannel allow the link to tell
// the ChannelNotifier when channels become active and inactive.
NotifyActiveChannel func(wire.OutPoint)
@ -259,6 +267,11 @@ type Switch struct {
// active ChainNotifier instance. This will be used to retrieve the
// lastest height of the chain.
blockEpochStream *chainntnfs.BlockEpochEvent
// pendingSettleFails is the set of settle/fail entries that we need to
// ack in the forwarding package of the outgoing link. This was added to
// make pipelining settles more efficient.
pendingSettleFails []channeldb.SettleFailRef
}
// New creates the new instance of htlc switch.
@ -1347,11 +1360,10 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
pkt.outgoingHTLCID)
log.Error(err)
// TODO(conner): ack settle/fail
if pkt.destRef != nil {
if err := s.ackSettleFail(*pkt.destRef); err != nil {
return nil, err
}
// Add this SettleFailRef to the set of pending settle/fail entries
// awaiting acknowledgement.
s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
}
return nil, err
@ -1366,9 +1378,9 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
// forwarding package of the outgoing link for a payment circuit. We do this if
// we're the originator of the payment, so the link stops attempting to
// re-broadcast.
func (s *Switch) ackSettleFail(settleFailRef channeldb.SettleFailRef) error {
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
return s.cfg.DB.Batch(func(tx *bbolt.Tx) error {
return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRef)
return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
})
}
@ -1533,8 +1545,17 @@ func (s *Switch) htlcForwarder() {
s.cfg.FwdEventTicker.Resume()
defer s.cfg.FwdEventTicker.Stop()
defer s.cfg.AckEventTicker.Stop()
out:
for {
// If the set of pending settle/fail entries is non-zero,
// reinstate the ack ticker so we can batch ack them.
if len(s.pendingSettleFails) > 0 {
s.cfg.AckEventTicker.Resume()
}
select {
case blockEpoch, ok := <-s.blockEpochStream.Epochs:
if !ok {
@ -1697,6 +1718,31 @@ out:
totalSatSent += diffSatSent
totalSatRecv += diffSatRecv
// The ack ticker has fired so if we have any settle/fail entries
// for a forwarding package to ack, we will do so here in a batch
// db call.
case <-s.cfg.AckEventTicker.Ticks():
// If the current set is empty, pause the ticker.
if len(s.pendingSettleFails) == 0 {
s.cfg.AckEventTicker.Pause()
continue
}
// Batch ack the settle/fail entries.
if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
log.Errorf("Unable to ack batch of settle/fails: %v", err)
continue
}
log.Tracef("Acked %d settle fails: %v", len(s.pendingSettleFails),
newLogClosure(func() string {
return spew.Sdump(s.pendingSettleFails)
}))
// Reset the pendingSettleFails buffer while keeping acquired
// memory.
s.pendingSettleFails = s.pendingSettleFails[:0]
case <-s.quit:
return
}

View File

@ -9417,22 +9417,30 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) {
// Next query for Bob's and Alice's channel states, in order to confirm
// that all payment have been successful transmitted.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
aliceChan, err := getChanInfo(ctxt, net.Alice)
if len(aliceChan.PendingHtlcs) != 0 {
t.Fatalf("alice's pending htlcs is incorrect, got %v, "+
"expected %v", len(aliceChan.PendingHtlcs), 0)
}
// Wait for the revocation to be received so alice no longer has pending
// htlcs listed and has correct balances. This is needed due to the fact
// that we now pipeline the settles.
err = lntest.WaitPredicate(func() bool {
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
aliceChan, err := getChanInfo(ctxt, net.Alice)
if err != nil {
return false
}
if len(aliceChan.PendingHtlcs) != 0 {
return false
}
if aliceChan.RemoteBalance != bobAmt {
return false
}
if aliceChan.LocalBalance != aliceAmt {
return false
}
return true
}, time.Second*5)
if err != nil {
t.Fatalf("unable to get bob's channel info: %v", err)
}
if aliceChan.RemoteBalance != bobAmt {
t.Fatalf("alice's remote balance is incorrect, got %v, "+
"expected %v", aliceChan.RemoteBalance, bobAmt)
}
if aliceChan.LocalBalance != aliceAmt {
t.Fatalf("alice's local balance is incorrect, got %v, "+
"expected %v", aliceChan.LocalBalance, aliceAmt)
t.Fatalf("failed to assert alice's pending htlcs and/or remote/local balance")
}
// Wait for Bob to receive revocation from Alice.

View File

@ -437,12 +437,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: s.fetchLastChanUpdate(),
Notifier: s.cc.chainNotifier,
FwdEventTicker: ticker.New(
htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(
htlcswitch.DefaultLogInterval),
NotifyActiveChannel: s.channelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: s.channelNotifier.NotifyInactiveChannelEvent,
FwdEventTicker: ticker.New(htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(htlcswitch.DefaultLogInterval),
AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval),
NotifyActiveChannel: s.channelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: s.channelNotifier.NotifyInactiveChannelEvent,
}, uint32(currentHeight))
if err != nil {
return nil, err

View File

@ -365,6 +365,8 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(
htlcswitch.DefaultLogInterval),
AckEventTicker: ticker.New(
htlcswitch.DefaultAckInterval),
}, uint32(currentHeight))
if err != nil {
return nil, nil, nil, nil, err