Merge pull request #3143 from Crypt-iQ/pipelining_settle_0525

htlcswitch: pipeline settles to switch
This commit is contained in:
Conner Fromknecht 2019-07-19 18:12:59 -07:00 committed by GitHub
commit 807012f960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 105 additions and 34 deletions

@ -1608,7 +1608,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
return
}
// TODO(roasbeef): pipeline to switch
settlePacket := &htlcPacket{
outgoingChanID: l.ShortChanID(),
outgoingHTLCID: idx,
htlc: &lnwire.UpdateFulfillHTLC{
PaymentPreimage: pre,
},
}
// Add the newly discovered preimage to our growing list of
// uncommitted preimage. These will be written to the witness
@ -1616,6 +1622,9 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// from the remote peer.
l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
// Pipeline this settle, send it to the switch.
go l.forwardBatch(settlePacket)
case *lnwire.UpdateFailMalformedHTLC:
// Convert the failure type encoded within the HTLC fail
// message to the proper generic lnwire error code.

@ -232,10 +232,10 @@ func TestChannelLinkSingleHopPayment(t *testing.T) {
t.Fatalf("unable to make the payment: %v", err)
}
// Wait for Bob to receive the revocation.
// Wait for Alice to receive the revocation.
//
// TODO(roasbeef); replace with select over returned err chan
time.Sleep(100 * time.Millisecond)
time.Sleep(2 * time.Second)
// Check that alice invoice was settled and bandwidth of HTLC
// links was changed.
@ -494,8 +494,8 @@ func testChannelLinkMultiHopPayment(t *testing.T,
t.Fatalf("unable to send payment: %v", err)
}
// Wait for Bob to receive the revocation.
time.Sleep(100 * time.Millisecond)
// Wait for Alice and Bob's second link to receive the revocation.
time.Sleep(2 * time.Second)
// Check that Carol invoice was settled and bandwidth of HTLC
// links were changed.
@ -3977,7 +3977,8 @@ func TestChannelLinkAcceptOverpay(t *testing.T) {
t.Fatalf("unable to send payment: %v", err)
}
time.Sleep(100 * time.Millisecond)
// Wait for Alice and Bob's second link to receive the revocation.
time.Sleep(2 * time.Second)
// Even though we sent 2x what was asked for, Carol should still have
// accepted the payment and marked it as settled.
@ -5801,7 +5802,12 @@ func TestChannelLinkHoldInvoiceSettle(t *testing.T) {
t.Fatal(err)
}
// Wait for Bob to receive the revocation.
// Wait for Alice to receive the revocation. This is needed
// because the settles are pipelined to the switch and otherwise
// the bandwidth won't be updated by the time Alice receives a
// response here.
time.Sleep(2 * time.Second)
if ctx.startBandwidthAlice-ctx.amount !=
ctx.n.aliceChannelLink.Bandwidth() {

@ -175,6 +175,7 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
Notifier: &mockNotifier{},
FwdEventTicker: ticker.NewForce(DefaultFwdEventInterval),
LogEventTicker: ticker.NewForce(DefaultLogInterval),
AckEventTicker: ticker.NewForce(DefaultAckInterval),
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
}

@ -30,6 +30,10 @@ const (
// DefaultLogInterval is the duration between attempts to log statistics
// about forwarding events.
DefaultLogInterval = 10 * time.Second
// DefaultAckInterval is the duration between attempts to ack any settle
// fails in a forwarding package.
DefaultAckInterval = 15 * time.Second
)
var (
@ -159,6 +163,10 @@ type Config struct {
// aggregate stats about it's forwarding during the last interval.
LogEventTicker ticker.Ticker
// AckEventTicker is a signal instructing the htlcswitch to ack any settle
// fails in forwarding packages.
AckEventTicker ticker.Ticker
// NotifyActiveChannel and NotifyInactiveChannel allow the link to tell
// the ChannelNotifier when channels become active and inactive.
NotifyActiveChannel func(wire.OutPoint)
@ -259,6 +267,11 @@ type Switch struct {
// active ChainNotifier instance. This will be used to retrieve the
// lastest height of the chain.
blockEpochStream *chainntnfs.BlockEpochEvent
// pendingSettleFails is the set of settle/fail entries that we need to
// ack in the forwarding package of the outgoing link. This was added to
// make pipelining settles more efficient.
pendingSettleFails []channeldb.SettleFailRef
}
// New creates the new instance of htlc switch.
@ -1347,11 +1360,10 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
pkt.outgoingHTLCID)
log.Error(err)
// TODO(conner): ack settle/fail
if pkt.destRef != nil {
if err := s.ackSettleFail(*pkt.destRef); err != nil {
return nil, err
}
// Add this SettleFailRef to the set of pending settle/fail entries
// awaiting acknowledgement.
s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
}
return nil, err
@ -1366,9 +1378,9 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
// forwarding package of the outgoing link for a payment circuit. We do this if
// we're the originator of the payment, so the link stops attempting to
// re-broadcast.
func (s *Switch) ackSettleFail(settleFailRef channeldb.SettleFailRef) error {
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
return s.cfg.DB.Batch(func(tx *bbolt.Tx) error {
return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRef)
return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
})
}
@ -1533,8 +1545,17 @@ func (s *Switch) htlcForwarder() {
s.cfg.FwdEventTicker.Resume()
defer s.cfg.FwdEventTicker.Stop()
defer s.cfg.AckEventTicker.Stop()
out:
for {
// If the set of pending settle/fail entries is non-zero,
// reinstate the ack ticker so we can batch ack them.
if len(s.pendingSettleFails) > 0 {
s.cfg.AckEventTicker.Resume()
}
select {
case blockEpoch, ok := <-s.blockEpochStream.Epochs:
if !ok {
@ -1697,6 +1718,31 @@ out:
totalSatSent += diffSatSent
totalSatRecv += diffSatRecv
// The ack ticker has fired so if we have any settle/fail entries
// for a forwarding package to ack, we will do so here in a batch
// db call.
case <-s.cfg.AckEventTicker.Ticks():
// If the current set is empty, pause the ticker.
if len(s.pendingSettleFails) == 0 {
s.cfg.AckEventTicker.Pause()
continue
}
// Batch ack the settle/fail entries.
if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
log.Errorf("Unable to ack batch of settle/fails: %v", err)
continue
}
log.Tracef("Acked %d settle fails: %v", len(s.pendingSettleFails),
newLogClosure(func() string {
return spew.Sdump(s.pendingSettleFails)
}))
// Reset the pendingSettleFails buffer while keeping acquired
// memory.
s.pendingSettleFails = s.pendingSettleFails[:0]
case <-s.quit:
return
}

@ -9409,22 +9409,30 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) {
// Next query for Bob's and Alice's channel states, in order to confirm
// that all payment have been successful transmitted.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
aliceChan, err := getChanInfo(ctxt, net.Alice)
if len(aliceChan.PendingHtlcs) != 0 {
t.Fatalf("alice's pending htlcs is incorrect, got %v, "+
"expected %v", len(aliceChan.PendingHtlcs), 0)
}
// Wait for the revocation to be received so alice no longer has pending
// htlcs listed and has correct balances. This is needed due to the fact
// that we now pipeline the settles.
err = lntest.WaitPredicate(func() bool {
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
aliceChan, err := getChanInfo(ctxt, net.Alice)
if err != nil {
return false
}
if len(aliceChan.PendingHtlcs) != 0 {
return false
}
if aliceChan.RemoteBalance != bobAmt {
return false
}
if aliceChan.LocalBalance != aliceAmt {
return false
}
return true
}, time.Second*5)
if err != nil {
t.Fatalf("unable to get bob's channel info: %v", err)
}
if aliceChan.RemoteBalance != bobAmt {
t.Fatalf("alice's remote balance is incorrect, got %v, "+
"expected %v", aliceChan.RemoteBalance, bobAmt)
}
if aliceChan.LocalBalance != aliceAmt {
t.Fatalf("alice's local balance is incorrect, got %v, "+
"expected %v", aliceChan.LocalBalance, aliceAmt)
t.Fatalf("failed to assert alice's pending htlcs and/or remote/local balance")
}
// Wait for Bob to receive revocation from Alice.

@ -437,12 +437,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: s.fetchLastChanUpdate(),
Notifier: s.cc.chainNotifier,
FwdEventTicker: ticker.New(
htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(
htlcswitch.DefaultLogInterval),
NotifyActiveChannel: s.channelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: s.channelNotifier.NotifyInactiveChannelEvent,
FwdEventTicker: ticker.New(htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(htlcswitch.DefaultLogInterval),
AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval),
NotifyActiveChannel: s.channelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: s.channelNotifier.NotifyInactiveChannelEvent,
}, uint32(currentHeight))
if err != nil {
return nil, err

@ -365,6 +365,8 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(
htlcswitch.DefaultLogInterval),
AckEventTicker: ticker.New(
htlcswitch.DefaultAckInterval),
}, uint32(currentHeight))
if err != nil {
return nil, nil, nil, nil, err