From 6283eb29bfa761531e8c3f4930c6cad2a8fb7a5d Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 16 Jul 2016 18:20:13 -0700 Subject: [PATCH] lnd: add synchronization to RPC initiated HTLC payments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With this commit, calls to htlcSwitch.SendHTLC() are now synchronous, only returning after the payment has been fully settled. This will allow one to accurately measure the commitment update speed with the current state machine implementation which is missing a number of low-hanging optimizations. The htlcManager for each channel now keeps a map of cleared HTLC’s keyed by the index number of the add entry within the state machine’s HTLC log. This map of HTLC’s will later be used to properly implement time outs Additionally, a slight refactoring has been executed w.r.t handling upstream/downstream messages. This cleans up the main htlcManager loop, freeing it up for the addition of future logic to properly observe timeouts as well as, proper batching+trickling of HTLC updates, and a commitment signature ticker. --- htlcswitch.go | 31 +++-- peer.go | 336 +++++++++++++++++++++++++++++--------------------- 2 files changed, 217 insertions(+), 150 deletions(-) diff --git a/htlcswitch.go b/htlcswitch.go index 8a24c620..3253fc7b 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -28,7 +28,7 @@ type link struct { availableBandwidth btcutil.Amount - linkChan chan lnwire.Message + linkChan chan *htlcPacket peer *peer @@ -39,9 +39,12 @@ type link struct { // settles an active HTLC. The dest field denotes the name of the interface to // forward this htlcPacket on. type htlcPacket struct { + src wire.ShaHash dest wire.ShaHash msg lnwire.Message + + err chan error } // HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's. @@ -115,22 +118,29 @@ func (h *htlcSwitch) Stop() error { // an error is returned. Additionally, if the interface cannot be found, an // alternative error is returned. func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) error { - // TODO(roasbeef): hook in errors + htlcPkt.err = make(chan error, 1) + h.outgoingPayments <- htlcPkt - return nil + + return <-htlcPkt.err } // htlcForwarder is responsible for optimally forwarding (and possibly // fragmenting) incoming/outgoing HTLC's amongst all active interfaces and // their links. func (h *htlcSwitch) htlcForwarder() { + // TODO(roasbeef): track pending payments here instead of within each peer? + // Examine settles/timeouts from htl cplex. Add src to htlcPacket, key by + // (src, htlcKey). out: for { select { case htlcPkt := <-h.outgoingPayments: chanInterface, ok := h.interfaces[htlcPkt.dest] if !ok { - hswcLog.Errorf("unable to locate link %x", htlcPkt.dest[:]) + err := fmt.Errorf("unable to locate link %x", htlcPkt.dest[:]) + hswcLog.Errorf(err.Error()) + htlcPkt.err <- err continue } @@ -146,7 +156,7 @@ out: link.chanPoint, amt, htlcPkt.dest[:]) wireMsg.ChannelPoint = link.chanPoint - link.linkChan <- wireMsg + link.linkChan <- htlcPkt // TODO(roasbeef): update link info on // timeout/settle link.availableBandwidth -= amt @@ -156,6 +166,8 @@ out: if wireMsg.ChannelPoint == nil { hswcLog.Errorf("unable to send payment, " + "insufficient capacity") + htlcPkt.err <- fmt.Errorf("insufficient capacity") + continue } case <-h.htlcPlex: case <-h.quit: @@ -278,15 +290,18 @@ type registerLinkMsg struct { peer *peer linkInfo *channeldb.ChannelSnapshot - linkChan chan lnwire.Message + linkChan chan *htlcPacket done chan struct{} } // RegisterLink requests the htlcSwitch to register a new active link. The new -// link encapsulates an active channel. +// link encapsulates an active channel. The htlc plex channel is returned. The +// plex channel allows the switch to properly de-multiplex incoming/outgoing +// HTLC messages forwarding them to their proper destination in the multi-hop +// settings. func (h *htlcSwitch) RegisterLink(p *peer, linkInfo *channeldb.ChannelSnapshot, - linkChan chan lnwire.Message) chan *htlcPacket { + linkChan chan *htlcPacket) chan *htlcPacket { done := make(chan struct{}, 1) req := ®isterLinkMsg{p, linkInfo, linkChan, done} diff --git a/peer.go b/peer.go index 73cb6549..cf9dc128 100644 --- a/peer.go +++ b/peer.go @@ -231,7 +231,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // Register this new channel link with the HTLC Switch. This is // necessary to properly route multi-hop payments, and forward // new payments triggered by RPC clients. - downstreamLink := make(chan lnwire.Message) + downstreamLink := make(chan *htlcPacket) plexChan := p.server.htlcSwitch.RegisterLink(p, dbChan.Snapshot(), downstreamLink) @@ -579,7 +579,7 @@ out: // Now that the channel is open, notify the Htlc // Switch of a new active link. chanSnapShot := newChan.StateSnapshot() - downstreamLink := make(chan lnwire.Message) + downstreamLink := make(chan *htlcPacket) plexChan := p.server.htlcSwitch.RegisterLink(p, chanSnapShot, downstreamLink) @@ -750,14 +750,37 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) { } } +// pendingPayment represents a pending HTLC which has yet to be settled by the +// upstream peer. A pending payment encapsulates the initial HTLC add request +// additionally coupling the index of the HTLC within the log, and an error +// channel to signal the payment requester once the payment has been fully +// fufilled. +type pendingPayment struct { + htlc *lnwire.HTLCAddRequest + index uint32 + + err chan error +} + // commitmentState is the volatile+persistent state of an active channel's // commitment update state-machine. This struct is used by htlcManager's to // save meta-state required for proper functioning. type commitmentState struct { + // TODO(roasbeef): use once trickle+batch logic is in pendingLogLen uint32 + // htlcsToSettle is a list of preimages which allow us to settle one or + // many of the pending HTLC's we've received from the upstream peer. + // TODO(roasbeef): should send sig to settle once preimage is known. htlcsToSettle [][32]byte - sigPending bool + + // sigPending is a bool which indicates if we're currently awaiting a + // signature response to a commitment update we've initiated. + sigPending bool + + // clearedHTCLs is a map of outgoing HTLC's we've committed to in our + // chain which have not yet been settled by the upstream peer. + clearedHTCLs map[uint32]*pendingPayment channel *lnwallet.LightningChannel chanPoint *wire.OutPoint @@ -773,7 +796,7 @@ type commitmentState struct { // manages the channel's revocation window, and also the htlc trickle // queue+timer for this active channels. func (p *peer) htlcManager(channel *lnwallet.LightningChannel, - htlcPlex chan<- *htlcPacket, downstreamLink <-chan lnwire.Message, + htlcPlex chan<- *htlcPacket, downstreamLink <-chan *htlcPacket, upstreamLink <-chan lnwire.Message) { chanStats := channel.StateSnapshot() @@ -794,29 +817,15 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, } state := &commitmentState{ - channel: channel, - chanPoint: channel.ChannelPoint(), + channel: channel, + chanPoint: channel.ChannelPoint(), + clearedHTCLs: make(map[uint32]*pendingPayment), } out: for { select { - case msg := <-downstreamLink: - switch htlc := msg.(type) { - case *lnwire.HTLCAddRequest: - // A new payment has been initiated via the - // downstream channel, so we add the new HTLC - // to our local log, then update the commitment - // chains. - channel.AddHTLC(htlc, false) - p.queueMsg(htlc, nil) - - // TODO(roasbeef): batch trickle timer + cap - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - } - state.sigPending = true - } + case pkt := <-downstreamLink: + p.handleDownStreamPkt(state, pkt) case msg, ok := <-upstreamLink: // If the upstream message link is closed, this signals // that the channel itself is being closed, therefore @@ -825,125 +834,7 @@ out: break out } - switch htlcPkt := msg.(type) { - // TODO(roasbeef): timeouts - case *lnwire.HTLCAddRequest: - // We just received an add request from an - // upstream peer, so we add it to our state - // machine, then add the HTLC to our "settle" - // list in the event that we know the pre-image - channel.AddHTLC(htlcPkt, true) - - rHash := htlcPkt.RedemptionHashes[0] - if invoice, found := p.server.invoices.lookupInvoice(rHash); found { - // TODO(roasbeef): check value - // * onion layer strip should also be before invoice lookup - pre := invoice.paymentPreimage - state.htlcsToSettle = append(state.htlcsToSettle, pre) - } - case *lnwire.HTLCSettleRequest: - // TODO(roasbeef): this assumes no "multi-sig" - pre := htlcPkt.RedemptionProofs[0] - if _, err := channel.SettleHTLC(pre, true); err != nil { - // TODO(roasbeef): broadcast on-chain - peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) - p.Disconnect() - break out - } - case *lnwire.CommitSignature: - // We just received a new update to our local - // commitment chain, validate this new - // commitment, closing the link if invalid. - // TODO(roasbeef): use uint64 for indexes? - logIndex := uint32(htlcPkt.LogIndex) - sig := htlcPkt.CommitSig.Serialize() - if err := channel.ReceiveNewCommitment(sig, logIndex); err != nil { - peerLog.Errorf("unable to accept new commitment: %v", err) - p.Disconnect() - break out - } - - // If we didn't initiate this state transition, - // then we'll update the remote commitment - // chain with a new commitment. Otherwise, we - // can reset the pending bit as we received the - // signature we were expecting. - if !state.sigPending { - // TODO(roasbeef): may not always want to *immediatly* - // sign next commitment. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - continue - } - } else { - state.sigPending = false - } - - // Finally, since we just accepted a new state, - // send the remote peer a revocation for our - // prior state. - nextRevocation, err := channel.RevokeCurrentCommitment() - if err != nil { - peerLog.Errorf("unable to revoke current commitment: %v", err) - continue - } - p.queueMsg(nextRevocation, nil) - case *lnwire.CommitRevocation: - // We've received a revocation from the remote - // chain, if valid, this moves the remote chain - // forward, and expands our revocation window. - htlcsToForward, err := channel.ReceiveRevocation(htlcPkt) - if err != nil { - peerLog.Errorf("unable to accept revocation: %v", err) - p.Disconnect() - break out - } - // TODO(roasbeef): send the locked-in HTLC's - // over the plex chan to the switch. - peerLog.Debugf("htlcs ready to forward: %v", - spew.Sdump(htlcsToForward)) - - // A full state transition has been completed, - // if we don't need to settle any HTLC's, then - // we're done. - if len(state.htlcsToSettle) == 0 { - continue - } - - // Otherwise, we have some pending HTLC's which - // we can pull funds from, thereby settling. - peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle)) - for _, pre := range state.htlcsToSettle { - // Add each HTLC settle update to the - // channel's state update log, also - // sending the log update to the remote - // party. - logIndex, err := channel.SettleHTLC(pre, false) - if err != nil { - peerLog.Errorf("unable to settle htlc: %v", err) - continue - } - settleMsg := &lnwire.HTLCSettleRequest{ - ChannelPoint: state.chanPoint, - HTLCKey: lnwire.HTLCKey(logIndex), - RedemptionProofs: [][32]byte{pre}, - } - p.queueMsg(settleMsg, nil) - } - - // With all the settle updates added to the - // local and remote HTLC logs, initiate a state - // transition by updating the remote commitment - // chain. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - continue - } - state.sigPending = true - state.htlcsToSettle = nil - } + p.handleUpstreamMsg(state, msg) case <-p.quit: break out } @@ -953,6 +844,167 @@ out: peerLog.Tracef("htlcManager for peer %v done", p) } +// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC +// Switch. Possible messages sent by the switch include requests to forward new +// HTLC's, timeout previously cleared HTLC's, and finally to settle currently +// cleared HTLC's with the upstream peer. +func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { + switch htlc := pkt.msg.(type) { + case *lnwire.HTLCAddRequest: + // A new payment has been initiated via the + // downstream channel, so we add the new HTLC + // to our local log, then update the commitment + // chains. + index := state.channel.AddHTLC(htlc, false) + p.queueMsg(htlc, nil) + + // TODO(roasbeef): batch trickle timer + cap + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + } + + state.sigPending = true + state.clearedHTCLs[index] = &pendingPayment{ + htlc: htlc, + index: index, + err: pkt.err, + } + } +} + +// handleUpstreamMsg processes wire messages related to commitment state +// updates from the upstream peer. The upstream peer is the peer whom we have a +// direct channel with, updating our respective commitment chains. +func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { + switch htlcPkt := msg.(type) { + // TODO(roasbeef): timeouts + case *lnwire.HTLCAddRequest: + // We just received an add request from an upstream peer, so we + // add it to our state machine, then add the HTLC to our + // "settle" list in the event that we know the pre-image + state.channel.AddHTLC(htlcPkt, true) + + rHash := htlcPkt.RedemptionHashes[0] + if invoice, found := p.server.invoices.lookupInvoice(rHash); found { + // TODO(roasbeef): check value + // * onion layer strip should also be before invoice lookup + // * also can immediately send the settle msg + pre := invoice.paymentPreimage + state.htlcsToSettle = append(state.htlcsToSettle, pre) + } + case *lnwire.HTLCSettleRequest: + // TODO(roasbeef): this assumes no "multi-sig" + pre := htlcPkt.RedemptionProofs[0] + if _, err := state.channel.SettleHTLC(pre, true); err != nil { + // TODO(roasbeef): broadcast on-chain + peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) + p.Disconnect() + return + } + case *lnwire.CommitSignature: + // We just received a new update to our local commitment chain, + // validate this new commitment, closing the link if invalid. + // TODO(roasbeef): use uint64 for indexes? + logIndex := uint32(htlcPkt.LogIndex) + sig := htlcPkt.CommitSig.Serialize() + if err := state.channel.ReceiveNewCommitment(sig, logIndex); err != nil { + peerLog.Errorf("unable to accept new commitment: %v", err) + p.Disconnect() + return + } + + // If we didn't initiate this state transition, then we'll + // update the remote commitment chain with a new commitment. + // Otherwise, we can reset the pending bit as we received the + // signature we were expecting. + // TODO(roasbeef): move sig updates to own trigger + // * can remove sigPending if so + if !state.sigPending { + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + return + } + } else { + state.sigPending = false + } + + // Finally, since we just accepted a new state, send the remote + // peer a revocation for our prior state. + nextRevocation, err := state.channel.RevokeCurrentCommitment() + if err != nil { + peerLog.Errorf("unable to revoke current commitment: %v", err) + return + } + p.queueMsg(nextRevocation, nil) + case *lnwire.CommitRevocation: + // We've received a revocation from the remote chain, if valid, + // this moves the remote chain forward, and expands our + // revocation window. + htlcsToForward, err := state.channel.ReceiveRevocation(htlcPkt) + if err != nil { + peerLog.Errorf("unable to accept revocation: %v", err) + p.Disconnect() + return + } + // TODO(roasbeef): send the locked-in HTLC's over the plex chan + // to the switch. + peerLog.Debugf("htlcs ready to forward: %v", + spew.Sdump(htlcsToForward)) + + // If any of the htlc's eligible for forwarding are pending + // settling or timeing out previous outgoing payments, then we + // can them from the pending set, and signal the requster (if + // existing) that the payment has been fully fulfilled. + for _, htlc := range htlcsToForward { + if p, ok := state.clearedHTCLs[htlc.ParentIndex]; ok { + peerLog.Debugf("local htlc %v cleared", + spew.Sdump(p.htlc)) + p.err <- nil + delete(state.clearedHTCLs, htlc.ParentIndex) + } + } + + // A full state transition has been completed, if we don't need + // to settle any HTLC's, then we're done. + if len(state.htlcsToSettle) == 0 { + return + } + + // Otherwise, we have some pending HTLC's which we can pull + // funds from, thereby settling. + peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle)) + for _, pre := range state.htlcsToSettle { + // Add each HTLC settle update to the channel's state + // update log, also sending the log update to the + // remote party. + logIndex, err := state.channel.SettleHTLC(pre, false) + if err != nil { + peerLog.Errorf("unable to settle htlc: %v", err) + continue + } + settleMsg := &lnwire.HTLCSettleRequest{ + ChannelPoint: state.chanPoint, + HTLCKey: lnwire.HTLCKey(logIndex), + RedemptionProofs: [][32]byte{pre}, + } + p.queueMsg(settleMsg, nil) + } + + // With all the settle updates added to the local and remote + // HTLC logs, initiate a state transition by updating the + // remote commitment chain. + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + return + } + state.sigPending = true + state.htlcsToSettle = nil + } +} + // updateCommitTx signs, then sends an update to the remote peer adding a new // commitment to their commitment chain which includes all the latest updates // we've received+processed up to this point.