htlcswitch: sync link hand-off
This commit extends the link with a new synchronous delivery point for local UpdateAddHTLC messages. The switch method SendHTLC is updated to use this delivery point and thereby becomes a synchronous call. For MPP payments, synchronous hand-off is important. Otherwise the next pathfinding round could start without the channel balance updated yet.
This commit is contained in:
parent
de2df5606a
commit
c325bf8c57
@ -70,6 +70,10 @@ type ChannelLink interface {
|
|||||||
// possible).
|
// possible).
|
||||||
HandleSwitchPacket(*htlcPacket) error
|
HandleSwitchPacket(*htlcPacket) error
|
||||||
|
|
||||||
|
// HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC
|
||||||
|
// packet. It will be processed synchronously.
|
||||||
|
HandleLocalAddPacket(*htlcPacket) error
|
||||||
|
|
||||||
// HandleChannelUpdate handles the htlc requests as settle/add/fail
|
// HandleChannelUpdate handles the htlc requests as settle/add/fail
|
||||||
// which sent to us from remote peer we have a channel with.
|
// which sent to us from remote peer we have a channel with.
|
||||||
//
|
//
|
||||||
|
@ -286,6 +286,14 @@ type ChannelLinkConfig struct {
|
|||||||
HtlcNotifier htlcNotifier
|
HtlcNotifier htlcNotifier
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// localUpdateAddMsg contains a locally initiated htlc and a channel that will
|
||||||
|
// receive the outcome of the link processing. This channel must be buffered to
|
||||||
|
// prevent the link from blocking.
|
||||||
|
type localUpdateAddMsg struct {
|
||||||
|
pkt *htlcPacket
|
||||||
|
err chan error
|
||||||
|
}
|
||||||
|
|
||||||
// channelLink is the service which drives a channel's commitment update
|
// channelLink is the service which drives a channel's commitment update
|
||||||
// state-machine. In the event that an HTLC needs to be propagated to another
|
// state-machine. In the event that an HTLC needs to be propagated to another
|
||||||
// link, the forward handler from config is used which sends HTLC to the
|
// link, the forward handler from config is used which sends HTLC to the
|
||||||
@ -346,6 +354,10 @@ type channelLink struct {
|
|||||||
// by the HTLC switch.
|
// by the HTLC switch.
|
||||||
downstream chan *htlcPacket
|
downstream chan *htlcPacket
|
||||||
|
|
||||||
|
// localUpdateAdd is a channel to which locally initiated HTLCs are
|
||||||
|
// sent across.
|
||||||
|
localUpdateAdd chan *localUpdateAddMsg
|
||||||
|
|
||||||
// htlcUpdates is a channel that we'll use to update outside
|
// htlcUpdates is a channel that we'll use to update outside
|
||||||
// sub-systems with the latest set of active HTLC's on our channel.
|
// sub-systems with the latest set of active HTLC's on our channel.
|
||||||
htlcUpdates chan *contractcourt.ContractUpdate
|
htlcUpdates chan *contractcourt.ContractUpdate
|
||||||
@ -395,11 +407,12 @@ func NewChannelLink(cfg ChannelLinkConfig,
|
|||||||
channel: channel,
|
channel: channel,
|
||||||
shortChanID: channel.ShortChanID(),
|
shortChanID: channel.ShortChanID(),
|
||||||
// TODO(roasbeef): just do reserve here?
|
// TODO(roasbeef): just do reserve here?
|
||||||
htlcUpdates: make(chan *contractcourt.ContractUpdate),
|
htlcUpdates: make(chan *contractcourt.ContractUpdate),
|
||||||
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
|
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
|
||||||
hodlQueue: queue.NewConcurrentQueue(10),
|
hodlQueue: queue.NewConcurrentQueue(10),
|
||||||
log: build.NewPrefixLog(logPrefix, log),
|
log: build.NewPrefixLog(logPrefix, log),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
|
localUpdateAdd: make(chan *localUpdateAddMsg),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1112,6 +1125,10 @@ func (l *channelLink) htlcManager() {
|
|||||||
case pkt := <-l.downstream:
|
case pkt := <-l.downstream:
|
||||||
l.handleDownstreamPkt(pkt)
|
l.handleDownstreamPkt(pkt)
|
||||||
|
|
||||||
|
// A message containing a locally initiated add was received.
|
||||||
|
case msg := <-l.localUpdateAdd:
|
||||||
|
msg.err <- l.handleDownstreamUpdateAdd(msg.pkt)
|
||||||
|
|
||||||
// A message from the connected peer was just received. This
|
// A message from the connected peer was just received. This
|
||||||
// indicates that we have a new incoming HTLC, either directly
|
// indicates that we have a new incoming HTLC, either directly
|
||||||
// for us, or part of a multi-hop HTLC circuit.
|
// for us, or part of a multi-hop HTLC circuit.
|
||||||
@ -1256,8 +1273,11 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
|
|||||||
|
|
||||||
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
|
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
|
||||||
// downstream HTLC Switch.
|
// downstream HTLC Switch.
|
||||||
func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) {
|
func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
|
||||||
htlc := pkt.htlc.(*lnwire.UpdateAddHTLC)
|
htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("not an UpdateAddHTLC packet")
|
||||||
|
}
|
||||||
|
|
||||||
// If hodl.AddOutgoing mode is active, we exit early to simulate
|
// If hodl.AddOutgoing mode is active, we exit early to simulate
|
||||||
// arbitrary delays between the switch adding an ADD to the
|
// arbitrary delays between the switch adding an ADD to the
|
||||||
@ -1265,7 +1285,7 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) {
|
|||||||
if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
|
if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
|
||||||
l.log.Warnf(hodl.AddOutgoing.Warning())
|
l.log.Warnf(hodl.AddOutgoing.Warning())
|
||||||
l.mailBox.AckPacket(pkt.inKey())
|
l.mailBox.AckPacket(pkt.inKey())
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// A new payment has been initiated via the downstream channel,
|
// A new payment has been initiated via the downstream channel,
|
||||||
@ -1291,7 +1311,10 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) {
|
|||||||
// unacknowledged.
|
// unacknowledged.
|
||||||
l.mailBox.FailAdd(pkt)
|
l.mailBox.FailAdd(pkt)
|
||||||
|
|
||||||
return
|
return NewDetailedLinkError(
|
||||||
|
lnwire.NewTemporaryChannelFailure(nil),
|
||||||
|
OutgoingFailureDownstreamHtlcAdd,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
l.log.Tracef("received downstream htlc: payment_hash=%x, "+
|
l.log.Tracef("received downstream htlc: payment_hash=%x, "+
|
||||||
@ -1324,6 +1347,8 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
l.tryBatchUpdateCommitTx()
|
l.tryBatchUpdateCommitTx()
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
|
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
|
||||||
@ -1335,7 +1360,9 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) {
|
|||||||
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
|
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
|
||||||
switch htlc := pkt.htlc.(type) {
|
switch htlc := pkt.htlc.(type) {
|
||||||
case *lnwire.UpdateAddHTLC:
|
case *lnwire.UpdateAddHTLC:
|
||||||
l.handleDownstreamUpdateAdd(pkt)
|
// Handle add message. The returned error can be ignored,
|
||||||
|
// because it is also sent through the mailbox.
|
||||||
|
_ = l.handleDownstreamUpdateAdd(pkt)
|
||||||
|
|
||||||
case *lnwire.UpdateFulfillHTLC:
|
case *lnwire.UpdateFulfillHTLC:
|
||||||
// If hodl.SettleOutgoing mode is active, we exit early to
|
// If hodl.SettleOutgoing mode is active, we exit early to
|
||||||
@ -2329,6 +2356,33 @@ func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error {
|
|||||||
return l.mailBox.AddPacket(pkt)
|
return l.mailBox.AddPacket(pkt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC packet. It
|
||||||
|
// will be processed synchronously.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the ChannelLink interface.
|
||||||
|
func (l *channelLink) HandleLocalAddPacket(pkt *htlcPacket) error {
|
||||||
|
l.log.Tracef("received switch packet outkey=%v", pkt.outKey())
|
||||||
|
|
||||||
|
// Create a buffered result channel to prevent the link from blocking.
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case l.localUpdateAdd <- &localUpdateAddMsg{
|
||||||
|
pkt: pkt,
|
||||||
|
err: errChan,
|
||||||
|
}:
|
||||||
|
case <-l.quit:
|
||||||
|
return ErrLinkShuttingDown
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errChan:
|
||||||
|
return err
|
||||||
|
case <-l.quit:
|
||||||
|
return ErrLinkShuttingDown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
|
// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
|
||||||
// to us from remote peer we have a channel with.
|
// to us from remote peer we have a channel with.
|
||||||
//
|
//
|
||||||
|
@ -702,6 +702,11 @@ func (f *mockChannelLink) HandleSwitchPacket(pkt *htlcPacket) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *mockChannelLink) HandleLocalAddPacket(pkt *htlcPacket) error {
|
||||||
|
_ = f.mailBox.AddPacket(pkt)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) {
|
func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,7 +491,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64,
|
|||||||
return linkErr
|
return linkErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return link.HandleSwitchPacket(packet)
|
return link.HandleLocalAddPacket(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateForwardingPolicies sends a message to the switch to update the
|
// UpdateForwardingPolicies sends a message to the switch to update the
|
||||||
|
@ -4182,6 +4182,34 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Next, we'll create Fred who is going to initiate the payment and
|
||||||
|
// establish a channel to from him to Carol. We can't perform this test
|
||||||
|
// by paying from Carol directly to Dave, because the '--unsafe-replay'
|
||||||
|
// setup doesn't apply to locally added htlcs. In that case, the
|
||||||
|
// mailbox, that is responsible for generating the replay, is bypassed.
|
||||||
|
fred, err := net.NewNode("Fred", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create new nodes: %v", err)
|
||||||
|
}
|
||||||
|
defer shutdownAndAssert(net, t, fred)
|
||||||
|
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
||||||
|
if err := net.ConnectNodes(ctxt, fred, carol); err != nil {
|
||||||
|
t.Fatalf("unable to connect fred to carol: %v", err)
|
||||||
|
}
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
||||||
|
err = net.SendCoins(ctxt, btcutil.SatoshiPerBitcoin, fred)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to send coins to fred: %v", err)
|
||||||
|
}
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout)
|
||||||
|
chanPointFC := openChannelAndAssert(
|
||||||
|
ctxt, t, net, fred, carol,
|
||||||
|
lntest.OpenChannelParams{
|
||||||
|
Amt: chanAmt,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
// Now that the channel is open, create an invoice for Dave which
|
// Now that the channel is open, create an invoice for Dave which
|
||||||
// expects a payment of 1000 satoshis from Carol paid via a particular
|
// expects a payment of 1000 satoshis from Carol paid via a particular
|
||||||
// preimage.
|
// preimage.
|
||||||
@ -4198,8 +4226,7 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) {
|
|||||||
t.Fatalf("unable to add invoice: %v", err)
|
t.Fatalf("unable to add invoice: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for Carol to recognize and advertise the new channel generated
|
// Wait for all channels to be recognized and advertized.
|
||||||
// above.
|
|
||||||
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
||||||
err = carol.WaitForNetworkChannelOpen(ctxt, chanPoint)
|
err = carol.WaitForNetworkChannelOpen(ctxt, chanPoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -4211,13 +4238,23 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) {
|
|||||||
t.Fatalf("bob didn't advertise channel before "+
|
t.Fatalf("bob didn't advertise channel before "+
|
||||||
"timeout: %v", err)
|
"timeout: %v", err)
|
||||||
}
|
}
|
||||||
|
err = carol.WaitForNetworkChannelOpen(ctxt, chanPointFC)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("alice didn't advertise channel before "+
|
||||||
|
"timeout: %v", err)
|
||||||
|
}
|
||||||
|
err = fred.WaitForNetworkChannelOpen(ctxt, chanPointFC)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("bob didn't advertise channel before "+
|
||||||
|
"timeout: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// With the invoice for Dave added, send a payment from Carol paying
|
// With the invoice for Dave added, send a payment from Fred paying
|
||||||
// to the above generated invoice.
|
// to the above generated invoice.
|
||||||
ctx, cancel := context.WithCancel(ctxb)
|
ctx, cancel := context.WithCancel(ctxb)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
payStream, err := carol.SendPayment(ctx)
|
payStream, err := fred.SendPayment(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to open payment stream: %v", err)
|
t.Fatalf("unable to open payment stream: %v", err)
|
||||||
}
|
}
|
||||||
@ -4270,12 +4307,12 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) {
|
|||||||
t.Fatalf("unable to receive payment response: %v", err)
|
t.Fatalf("unable to receive payment response: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Construct the response we expect after sending a duplicate packet
|
// Assert that Fred receives the expected failure after Carol sent a
|
||||||
// that fails due to sphinx replay detection.
|
// duplicate packet that fails due to sphinx replay detection.
|
||||||
if resp.PaymentError == "" {
|
if resp.PaymentError == "" {
|
||||||
t.Fatalf("expected payment error")
|
t.Fatalf("expected payment error")
|
||||||
}
|
}
|
||||||
assertLastHTLCError(t, carol, lnrpc.Failure_INVALID_ONION_KEY)
|
assertLastHTLCError(t, fred, lnrpc.Failure_INVALID_ONION_KEY)
|
||||||
|
|
||||||
// Since the payment failed, the balance should still be left
|
// Since the payment failed, the balance should still be left
|
||||||
// unaltered.
|
// unaltered.
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
<time> [ERR] CRTR: Channel update of ourselves received
|
<time> [ERR] CRTR: Channel update of ourselves received
|
||||||
<time> [ERR] CRTR: Error collecting result for shard <number> for payment <hex>: shard handler exiting
|
<time> [ERR] CRTR: Error collecting result for shard <number> for payment <hex>: shard handler exiting
|
||||||
<time> [ERR] CRTR: Error encountered during rescan: rescan exited
|
<time> [ERR] CRTR: Error encountered during rescan: rescan exited
|
||||||
|
<time> [ERR] CRTR: Failed sending attempt <number> for payment <hex> to switch: could not add downstream htlc
|
||||||
<time> [ERR] CRTR: Failed sending attempt <number> for payment <hex> to switch: insufficient bandwidth to route htlc
|
<time> [ERR] CRTR: Failed sending attempt <number> for payment <hex> to switch: insufficient bandwidth to route htlc
|
||||||
<time> [ERR] CRTR: Failed sending attempt <number> for payment <hex> to switch: UnknownNextPeer
|
<time> [ERR] CRTR: Failed sending attempt <number> for payment <hex> to switch: UnknownNextPeer
|
||||||
<time> [ERR] CRTR: out of order block: expecting height=<height>, got height=<height>
|
<time> [ERR] CRTR: out of order block: expecting height=<height>, got height=<height>
|
||||||
@ -80,6 +81,7 @@
|
|||||||
<time> [ERR] HSWC: ChannelLink(<chan>): link failed, exiting htlcManager
|
<time> [ERR] HSWC: ChannelLink(<chan>): link failed, exiting htlcManager
|
||||||
<time> [ERR] HSWC: ChannelLink(<chan>): outgoing htlc(<hex>) has insufficient fee: expected 575000, got 1075
|
<time> [ERR] HSWC: ChannelLink(<chan>): outgoing htlc(<hex>) has insufficient fee: expected 575000, got 1075
|
||||||
<time> [ERR] HSWC: ChannelLink(<chan>): outgoing htlc(<hex>) is too small: min_htlc=<amt>, htlc_value=<amt>
|
<time> [ERR] HSWC: ChannelLink(<chan>): outgoing htlc(<hex>) is too small: min_htlc=<amt>, htlc_value=<amt>
|
||||||
|
<time> [ERR] HSWC: ChannelLink(<chan>): unable to cancel incoming HTLC for circuit-key=(Chan ID=<chan>, HTLC ID=0): HTLC with ID 0 has already been failed
|
||||||
<time> [ERR] HSWC: ChannelLink(<chan>): unable to decode onion hop iterator: TemporaryChannelFailure
|
<time> [ERR] HSWC: ChannelLink(<chan>): unable to decode onion hop iterator: TemporaryChannelFailure
|
||||||
<time> [ERR] HSWC: ChannelLink(<chan>): unhandled error while forwarding htlc packet over htlcswitch: AmountBelowMinimum(amt=4000 mSAT, update=(lnwire.ChannelUpdate) {
|
<time> [ERR] HSWC: ChannelLink(<chan>): unhandled error while forwarding htlc packet over htlcswitch: AmountBelowMinimum(amt=4000 mSAT, update=(lnwire.ChannelUpdate) {
|
||||||
<time> [ERR] HSWC: ChannelLink(<chan>): unhandled error while forwarding htlc packet over htlcswitch: circuit has already been closed
|
<time> [ERR] HSWC: ChannelLink(<chan>): unhandled error while forwarding htlc packet over htlcswitch: circuit has already been closed
|
||||||
|
Loading…
Reference in New Issue
Block a user