lnd: add synchronization to RPC initiated HTLC payments

With this commit, calls to htlcSwitch.SendHTLC() are now synchronous,
only returning after the payment has been fully settled. This will
allow one to accurately measure the commitment update speed with the
current state machine implementation which is missing a number of
low-hanging optimizations.

The htlcManager for each channel now keeps a map of cleared HTLC’s
keyed by the index number of the add entry within the state machine’s
HTLC log. This map of HTLC’s will later be used to properly implement
time outs

Additionally, a slight refactoring has been executed w.r.t handling
upstream/downstream messages. This cleans up the main htlcManager loop,
freeing it up for the addition of future logic to properly observe
timeouts as well as, proper batching+trickling of HTLC updates, and a
commitment signature ticker.
This commit is contained in:
Olaoluwa Osuntokun 2016-07-16 18:20:13 -07:00
parent b60270f3f7
commit 6283eb29bf
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 217 additions and 150 deletions

@ -28,7 +28,7 @@ type link struct {
availableBandwidth btcutil.Amount availableBandwidth btcutil.Amount
linkChan chan lnwire.Message linkChan chan *htlcPacket
peer *peer peer *peer
@ -39,9 +39,12 @@ type link struct {
// settles an active HTLC. The dest field denotes the name of the interface to // settles an active HTLC. The dest field denotes the name of the interface to
// forward this htlcPacket on. // forward this htlcPacket on.
type htlcPacket struct { type htlcPacket struct {
src wire.ShaHash
dest wire.ShaHash dest wire.ShaHash
msg lnwire.Message msg lnwire.Message
err chan error
} }
// HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's. // HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's.
@ -115,22 +118,29 @@ func (h *htlcSwitch) Stop() error {
// an error is returned. Additionally, if the interface cannot be found, an // an error is returned. Additionally, if the interface cannot be found, an
// alternative error is returned. // alternative error is returned.
func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) error { func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) error {
// TODO(roasbeef): hook in errors htlcPkt.err = make(chan error, 1)
h.outgoingPayments <- htlcPkt h.outgoingPayments <- htlcPkt
return nil
return <-htlcPkt.err
} }
// htlcForwarder is responsible for optimally forwarding (and possibly // htlcForwarder is responsible for optimally forwarding (and possibly
// fragmenting) incoming/outgoing HTLC's amongst all active interfaces and // fragmenting) incoming/outgoing HTLC's amongst all active interfaces and
// their links. // their links.
func (h *htlcSwitch) htlcForwarder() { func (h *htlcSwitch) htlcForwarder() {
// TODO(roasbeef): track pending payments here instead of within each peer?
// Examine settles/timeouts from htl cplex. Add src to htlcPacket, key by
// (src, htlcKey).
out: out:
for { for {
select { select {
case htlcPkt := <-h.outgoingPayments: case htlcPkt := <-h.outgoingPayments:
chanInterface, ok := h.interfaces[htlcPkt.dest] chanInterface, ok := h.interfaces[htlcPkt.dest]
if !ok { if !ok {
hswcLog.Errorf("unable to locate link %x", htlcPkt.dest[:]) err := fmt.Errorf("unable to locate link %x", htlcPkt.dest[:])
hswcLog.Errorf(err.Error())
htlcPkt.err <- err
continue continue
} }
@ -146,7 +156,7 @@ out:
link.chanPoint, amt, htlcPkt.dest[:]) link.chanPoint, amt, htlcPkt.dest[:])
wireMsg.ChannelPoint = link.chanPoint wireMsg.ChannelPoint = link.chanPoint
link.linkChan <- wireMsg link.linkChan <- htlcPkt
// TODO(roasbeef): update link info on // TODO(roasbeef): update link info on
// timeout/settle // timeout/settle
link.availableBandwidth -= amt link.availableBandwidth -= amt
@ -156,6 +166,8 @@ out:
if wireMsg.ChannelPoint == nil { if wireMsg.ChannelPoint == nil {
hswcLog.Errorf("unable to send payment, " + hswcLog.Errorf("unable to send payment, " +
"insufficient capacity") "insufficient capacity")
htlcPkt.err <- fmt.Errorf("insufficient capacity")
continue
} }
case <-h.htlcPlex: case <-h.htlcPlex:
case <-h.quit: case <-h.quit:
@ -278,15 +290,18 @@ type registerLinkMsg struct {
peer *peer peer *peer
linkInfo *channeldb.ChannelSnapshot linkInfo *channeldb.ChannelSnapshot
linkChan chan lnwire.Message linkChan chan *htlcPacket
done chan struct{} done chan struct{}
} }
// RegisterLink requests the htlcSwitch to register a new active link. The new // RegisterLink requests the htlcSwitch to register a new active link. The new
// link encapsulates an active channel. // link encapsulates an active channel. The htlc plex channel is returned. The
// plex channel allows the switch to properly de-multiplex incoming/outgoing
// HTLC messages forwarding them to their proper destination in the multi-hop
// settings.
func (h *htlcSwitch) RegisterLink(p *peer, linkInfo *channeldb.ChannelSnapshot, func (h *htlcSwitch) RegisterLink(p *peer, linkInfo *channeldb.ChannelSnapshot,
linkChan chan lnwire.Message) chan *htlcPacket { linkChan chan *htlcPacket) chan *htlcPacket {
done := make(chan struct{}, 1) done := make(chan struct{}, 1)
req := &registerLinkMsg{p, linkInfo, linkChan, done} req := &registerLinkMsg{p, linkInfo, linkChan, done}

202
peer.go

@ -231,7 +231,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
// Register this new channel link with the HTLC Switch. This is // Register this new channel link with the HTLC Switch. This is
// necessary to properly route multi-hop payments, and forward // necessary to properly route multi-hop payments, and forward
// new payments triggered by RPC clients. // new payments triggered by RPC clients.
downstreamLink := make(chan lnwire.Message) downstreamLink := make(chan *htlcPacket)
plexChan := p.server.htlcSwitch.RegisterLink(p, plexChan := p.server.htlcSwitch.RegisterLink(p,
dbChan.Snapshot(), downstreamLink) dbChan.Snapshot(), downstreamLink)
@ -579,7 +579,7 @@ out:
// Now that the channel is open, notify the Htlc // Now that the channel is open, notify the Htlc
// Switch of a new active link. // Switch of a new active link.
chanSnapShot := newChan.StateSnapshot() chanSnapShot := newChan.StateSnapshot()
downstreamLink := make(chan lnwire.Message) downstreamLink := make(chan *htlcPacket)
plexChan := p.server.htlcSwitch.RegisterLink(p, plexChan := p.server.htlcSwitch.RegisterLink(p,
chanSnapShot, downstreamLink) chanSnapShot, downstreamLink)
@ -750,15 +750,38 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) {
} }
} }
// pendingPayment represents a pending HTLC which has yet to be settled by the
// upstream peer. A pending payment encapsulates the initial HTLC add request
// additionally coupling the index of the HTLC within the log, and an error
// channel to signal the payment requester once the payment has been fully
// fufilled.
type pendingPayment struct {
htlc *lnwire.HTLCAddRequest
index uint32
err chan error
}
// commitmentState is the volatile+persistent state of an active channel's // commitmentState is the volatile+persistent state of an active channel's
// commitment update state-machine. This struct is used by htlcManager's to // commitment update state-machine. This struct is used by htlcManager's to
// save meta-state required for proper functioning. // save meta-state required for proper functioning.
type commitmentState struct { type commitmentState struct {
// TODO(roasbeef): use once trickle+batch logic is in
pendingLogLen uint32 pendingLogLen uint32
// htlcsToSettle is a list of preimages which allow us to settle one or
// many of the pending HTLC's we've received from the upstream peer.
// TODO(roasbeef): should send sig to settle once preimage is known.
htlcsToSettle [][32]byte htlcsToSettle [][32]byte
// sigPending is a bool which indicates if we're currently awaiting a
// signature response to a commitment update we've initiated.
sigPending bool sigPending bool
// clearedHTCLs is a map of outgoing HTLC's we've committed to in our
// chain which have not yet been settled by the upstream peer.
clearedHTCLs map[uint32]*pendingPayment
channel *lnwallet.LightningChannel channel *lnwallet.LightningChannel
chanPoint *wire.OutPoint chanPoint *wire.OutPoint
} }
@ -773,7 +796,7 @@ type commitmentState struct {
// manages the channel's revocation window, and also the htlc trickle // manages the channel's revocation window, and also the htlc trickle
// queue+timer for this active channels. // queue+timer for this active channels.
func (p *peer) htlcManager(channel *lnwallet.LightningChannel, func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
htlcPlex chan<- *htlcPacket, downstreamLink <-chan lnwire.Message, htlcPlex chan<- *htlcPacket, downstreamLink <-chan *htlcPacket,
upstreamLink <-chan lnwire.Message) { upstreamLink <-chan lnwire.Message) {
chanStats := channel.StateSnapshot() chanStats := channel.StateSnapshot()
@ -796,27 +819,13 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
state := &commitmentState{ state := &commitmentState{
channel: channel, channel: channel,
chanPoint: channel.ChannelPoint(), chanPoint: channel.ChannelPoint(),
clearedHTCLs: make(map[uint32]*pendingPayment),
} }
out: out:
for { for {
select { select {
case msg := <-downstreamLink: case pkt := <-downstreamLink:
switch htlc := msg.(type) { p.handleDownStreamPkt(state, pkt)
case *lnwire.HTLCAddRequest:
// A new payment has been initiated via the
// downstream channel, so we add the new HTLC
// to our local log, then update the commitment
// chains.
channel.AddHTLC(htlc, false)
p.queueMsg(htlc, nil)
// TODO(roasbeef): batch trickle timer + cap
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
}
state.sigPending = true
}
case msg, ok := <-upstreamLink: case msg, ok := <-upstreamLink:
// If the upstream message link is closed, this signals // If the upstream message link is closed, this signals
// that the channel itself is being closed, therefore // that the channel itself is being closed, therefore
@ -825,101 +834,152 @@ out:
break out break out
} }
p.handleUpstreamMsg(state, msg)
case <-p.quit:
break out
}
}
p.wg.Done()
peerLog.Tracef("htlcManager for peer %v done", p)
}
// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLC's, timeout previously cleared HTLC's, and finally to settle currently
// cleared HTLC's with the upstream peer.
func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
switch htlc := pkt.msg.(type) {
case *lnwire.HTLCAddRequest:
// A new payment has been initiated via the
// downstream channel, so we add the new HTLC
// to our local log, then update the commitment
// chains.
index := state.channel.AddHTLC(htlc, false)
p.queueMsg(htlc, nil)
// TODO(roasbeef): batch trickle timer + cap
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
}
state.sigPending = true
state.clearedHTCLs[index] = &pendingPayment{
htlc: htlc,
index: index,
err: pkt.err,
}
}
}
// handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer whom we have a
// direct channel with, updating our respective commitment chains.
func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
switch htlcPkt := msg.(type) { switch htlcPkt := msg.(type) {
// TODO(roasbeef): timeouts // TODO(roasbeef): timeouts
case *lnwire.HTLCAddRequest: case *lnwire.HTLCAddRequest:
// We just received an add request from an // We just received an add request from an upstream peer, so we
// upstream peer, so we add it to our state // add it to our state machine, then add the HTLC to our
// machine, then add the HTLC to our "settle" // "settle" list in the event that we know the pre-image
// list in the event that we know the pre-image state.channel.AddHTLC(htlcPkt, true)
channel.AddHTLC(htlcPkt, true)
rHash := htlcPkt.RedemptionHashes[0] rHash := htlcPkt.RedemptionHashes[0]
if invoice, found := p.server.invoices.lookupInvoice(rHash); found { if invoice, found := p.server.invoices.lookupInvoice(rHash); found {
// TODO(roasbeef): check value // TODO(roasbeef): check value
// * onion layer strip should also be before invoice lookup // * onion layer strip should also be before invoice lookup
// * also can immediately send the settle msg
pre := invoice.paymentPreimage pre := invoice.paymentPreimage
state.htlcsToSettle = append(state.htlcsToSettle, pre) state.htlcsToSettle = append(state.htlcsToSettle, pre)
} }
case *lnwire.HTLCSettleRequest: case *lnwire.HTLCSettleRequest:
// TODO(roasbeef): this assumes no "multi-sig" // TODO(roasbeef): this assumes no "multi-sig"
pre := htlcPkt.RedemptionProofs[0] pre := htlcPkt.RedemptionProofs[0]
if _, err := channel.SettleHTLC(pre, true); err != nil { if _, err := state.channel.SettleHTLC(pre, true); err != nil {
// TODO(roasbeef): broadcast on-chain // TODO(roasbeef): broadcast on-chain
peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) peerLog.Errorf("settle for outgoing HTLC rejected: %v", err)
p.Disconnect() p.Disconnect()
break out return
} }
case *lnwire.CommitSignature: case *lnwire.CommitSignature:
// We just received a new update to our local // We just received a new update to our local commitment chain,
// commitment chain, validate this new // validate this new commitment, closing the link if invalid.
// commitment, closing the link if invalid.
// TODO(roasbeef): use uint64 for indexes? // TODO(roasbeef): use uint64 for indexes?
logIndex := uint32(htlcPkt.LogIndex) logIndex := uint32(htlcPkt.LogIndex)
sig := htlcPkt.CommitSig.Serialize() sig := htlcPkt.CommitSig.Serialize()
if err := channel.ReceiveNewCommitment(sig, logIndex); err != nil { if err := state.channel.ReceiveNewCommitment(sig, logIndex); err != nil {
peerLog.Errorf("unable to accept new commitment: %v", err) peerLog.Errorf("unable to accept new commitment: %v", err)
p.Disconnect() p.Disconnect()
break out return
} }
// If we didn't initiate this state transition, // If we didn't initiate this state transition, then we'll
// then we'll update the remote commitment // update the remote commitment chain with a new commitment.
// chain with a new commitment. Otherwise, we // Otherwise, we can reset the pending bit as we received the
// can reset the pending bit as we received the
// signature we were expecting. // signature we were expecting.
// TODO(roasbeef): move sig updates to own trigger
// * can remove sigPending if so
if !state.sigPending { if !state.sigPending {
// TODO(roasbeef): may not always want to *immediatly*
// sign next commitment.
if err := p.updateCommitTx(state); err != nil { if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+ peerLog.Errorf("unable to update "+
"commitment: %v", err) "commitment: %v", err)
continue return
} }
} else { } else {
state.sigPending = false state.sigPending = false
} }
// Finally, since we just accepted a new state, // Finally, since we just accepted a new state, send the remote
// send the remote peer a revocation for our // peer a revocation for our prior state.
// prior state. nextRevocation, err := state.channel.RevokeCurrentCommitment()
nextRevocation, err := channel.RevokeCurrentCommitment()
if err != nil { if err != nil {
peerLog.Errorf("unable to revoke current commitment: %v", err) peerLog.Errorf("unable to revoke current commitment: %v", err)
continue return
} }
p.queueMsg(nextRevocation, nil) p.queueMsg(nextRevocation, nil)
case *lnwire.CommitRevocation: case *lnwire.CommitRevocation:
// We've received a revocation from the remote // We've received a revocation from the remote chain, if valid,
// chain, if valid, this moves the remote chain // this moves the remote chain forward, and expands our
// forward, and expands our revocation window. // revocation window.
htlcsToForward, err := channel.ReceiveRevocation(htlcPkt) htlcsToForward, err := state.channel.ReceiveRevocation(htlcPkt)
if err != nil { if err != nil {
peerLog.Errorf("unable to accept revocation: %v", err) peerLog.Errorf("unable to accept revocation: %v", err)
p.Disconnect() p.Disconnect()
break out return
} }
// TODO(roasbeef): send the locked-in HTLC's // TODO(roasbeef): send the locked-in HTLC's over the plex chan
// over the plex chan to the switch. // to the switch.
peerLog.Debugf("htlcs ready to forward: %v", peerLog.Debugf("htlcs ready to forward: %v",
spew.Sdump(htlcsToForward)) spew.Sdump(htlcsToForward))
// A full state transition has been completed, // If any of the htlc's eligible for forwarding are pending
// if we don't need to settle any HTLC's, then // settling or timeing out previous outgoing payments, then we
// we're done. // can them from the pending set, and signal the requster (if
if len(state.htlcsToSettle) == 0 { // existing) that the payment has been fully fulfilled.
continue for _, htlc := range htlcsToForward {
if p, ok := state.clearedHTCLs[htlc.ParentIndex]; ok {
peerLog.Debugf("local htlc %v cleared",
spew.Sdump(p.htlc))
p.err <- nil
delete(state.clearedHTCLs, htlc.ParentIndex)
}
} }
// Otherwise, we have some pending HTLC's which // A full state transition has been completed, if we don't need
// we can pull funds from, thereby settling. // to settle any HTLC's, then we're done.
if len(state.htlcsToSettle) == 0 {
return
}
// Otherwise, we have some pending HTLC's which we can pull
// funds from, thereby settling.
peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle)) peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle))
for _, pre := range state.htlcsToSettle { for _, pre := range state.htlcsToSettle {
// Add each HTLC settle update to the // Add each HTLC settle update to the channel's state
// channel's state update log, also // update log, also sending the log update to the
// sending the log update to the remote // remote party.
// party. logIndex, err := state.channel.SettleHTLC(pre, false)
logIndex, err := channel.SettleHTLC(pre, false)
if err != nil { if err != nil {
peerLog.Errorf("unable to settle htlc: %v", err) peerLog.Errorf("unable to settle htlc: %v", err)
continue continue
@ -932,25 +992,17 @@ out:
p.queueMsg(settleMsg, nil) p.queueMsg(settleMsg, nil)
} }
// With all the settle updates added to the // With all the settle updates added to the local and remote
// local and remote HTLC logs, initiate a state // HTLC logs, initiate a state transition by updating the
// transition by updating the remote commitment // remote commitment chain.
// chain.
if err := p.updateCommitTx(state); err != nil { if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+ peerLog.Errorf("unable to update "+
"commitment: %v", err) "commitment: %v", err)
continue return
} }
state.sigPending = true state.sigPending = true
state.htlcsToSettle = nil state.htlcsToSettle = nil
} }
case <-p.quit:
break out
}
}
p.wg.Done()
peerLog.Tracef("htlcManager for peer %v done", p)
} }
// updateCommitTx signs, then sends an update to the remote peer adding a new // updateCommitTx signs, then sends an update to the remote peer adding a new