diff --git a/htlcswitch.go b/htlcswitch.go index 8a24c620..3253fc7b 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -28,7 +28,7 @@ type link struct { availableBandwidth btcutil.Amount - linkChan chan lnwire.Message + linkChan chan *htlcPacket peer *peer @@ -39,9 +39,12 @@ type link struct { // settles an active HTLC. The dest field denotes the name of the interface to // forward this htlcPacket on. type htlcPacket struct { + src wire.ShaHash dest wire.ShaHash msg lnwire.Message + + err chan error } // HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's. @@ -115,22 +118,29 @@ func (h *htlcSwitch) Stop() error { // an error is returned. Additionally, if the interface cannot be found, an // alternative error is returned. func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) error { - // TODO(roasbeef): hook in errors + htlcPkt.err = make(chan error, 1) + h.outgoingPayments <- htlcPkt - return nil + + return <-htlcPkt.err } // htlcForwarder is responsible for optimally forwarding (and possibly // fragmenting) incoming/outgoing HTLC's amongst all active interfaces and // their links. func (h *htlcSwitch) htlcForwarder() { + // TODO(roasbeef): track pending payments here instead of within each peer? + // Examine settles/timeouts from htl cplex. Add src to htlcPacket, key by + // (src, htlcKey). out: for { select { case htlcPkt := <-h.outgoingPayments: chanInterface, ok := h.interfaces[htlcPkt.dest] if !ok { - hswcLog.Errorf("unable to locate link %x", htlcPkt.dest[:]) + err := fmt.Errorf("unable to locate link %x", htlcPkt.dest[:]) + hswcLog.Errorf(err.Error()) + htlcPkt.err <- err continue } @@ -146,7 +156,7 @@ out: link.chanPoint, amt, htlcPkt.dest[:]) wireMsg.ChannelPoint = link.chanPoint - link.linkChan <- wireMsg + link.linkChan <- htlcPkt // TODO(roasbeef): update link info on // timeout/settle link.availableBandwidth -= amt @@ -156,6 +166,8 @@ out: if wireMsg.ChannelPoint == nil { hswcLog.Errorf("unable to send payment, " + "insufficient capacity") + htlcPkt.err <- fmt.Errorf("insufficient capacity") + continue } case <-h.htlcPlex: case <-h.quit: @@ -278,15 +290,18 @@ type registerLinkMsg struct { peer *peer linkInfo *channeldb.ChannelSnapshot - linkChan chan lnwire.Message + linkChan chan *htlcPacket done chan struct{} } // RegisterLink requests the htlcSwitch to register a new active link. The new -// link encapsulates an active channel. +// link encapsulates an active channel. The htlc plex channel is returned. The +// plex channel allows the switch to properly de-multiplex incoming/outgoing +// HTLC messages forwarding them to their proper destination in the multi-hop +// settings. func (h *htlcSwitch) RegisterLink(p *peer, linkInfo *channeldb.ChannelSnapshot, - linkChan chan lnwire.Message) chan *htlcPacket { + linkChan chan *htlcPacket) chan *htlcPacket { done := make(chan struct{}, 1) req := ®isterLinkMsg{p, linkInfo, linkChan, done} diff --git a/peer.go b/peer.go index 73cb6549..cf9dc128 100644 --- a/peer.go +++ b/peer.go @@ -231,7 +231,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // Register this new channel link with the HTLC Switch. This is // necessary to properly route multi-hop payments, and forward // new payments triggered by RPC clients. - downstreamLink := make(chan lnwire.Message) + downstreamLink := make(chan *htlcPacket) plexChan := p.server.htlcSwitch.RegisterLink(p, dbChan.Snapshot(), downstreamLink) @@ -579,7 +579,7 @@ out: // Now that the channel is open, notify the Htlc // Switch of a new active link. chanSnapShot := newChan.StateSnapshot() - downstreamLink := make(chan lnwire.Message) + downstreamLink := make(chan *htlcPacket) plexChan := p.server.htlcSwitch.RegisterLink(p, chanSnapShot, downstreamLink) @@ -750,14 +750,37 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) { } } +// pendingPayment represents a pending HTLC which has yet to be settled by the +// upstream peer. A pending payment encapsulates the initial HTLC add request +// additionally coupling the index of the HTLC within the log, and an error +// channel to signal the payment requester once the payment has been fully +// fufilled. +type pendingPayment struct { + htlc *lnwire.HTLCAddRequest + index uint32 + + err chan error +} + // commitmentState is the volatile+persistent state of an active channel's // commitment update state-machine. This struct is used by htlcManager's to // save meta-state required for proper functioning. type commitmentState struct { + // TODO(roasbeef): use once trickle+batch logic is in pendingLogLen uint32 + // htlcsToSettle is a list of preimages which allow us to settle one or + // many of the pending HTLC's we've received from the upstream peer. + // TODO(roasbeef): should send sig to settle once preimage is known. htlcsToSettle [][32]byte - sigPending bool + + // sigPending is a bool which indicates if we're currently awaiting a + // signature response to a commitment update we've initiated. + sigPending bool + + // clearedHTCLs is a map of outgoing HTLC's we've committed to in our + // chain which have not yet been settled by the upstream peer. + clearedHTCLs map[uint32]*pendingPayment channel *lnwallet.LightningChannel chanPoint *wire.OutPoint @@ -773,7 +796,7 @@ type commitmentState struct { // manages the channel's revocation window, and also the htlc trickle // queue+timer for this active channels. func (p *peer) htlcManager(channel *lnwallet.LightningChannel, - htlcPlex chan<- *htlcPacket, downstreamLink <-chan lnwire.Message, + htlcPlex chan<- *htlcPacket, downstreamLink <-chan *htlcPacket, upstreamLink <-chan lnwire.Message) { chanStats := channel.StateSnapshot() @@ -794,29 +817,15 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, } state := &commitmentState{ - channel: channel, - chanPoint: channel.ChannelPoint(), + channel: channel, + chanPoint: channel.ChannelPoint(), + clearedHTCLs: make(map[uint32]*pendingPayment), } out: for { select { - case msg := <-downstreamLink: - switch htlc := msg.(type) { - case *lnwire.HTLCAddRequest: - // A new payment has been initiated via the - // downstream channel, so we add the new HTLC - // to our local log, then update the commitment - // chains. - channel.AddHTLC(htlc, false) - p.queueMsg(htlc, nil) - - // TODO(roasbeef): batch trickle timer + cap - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - } - state.sigPending = true - } + case pkt := <-downstreamLink: + p.handleDownStreamPkt(state, pkt) case msg, ok := <-upstreamLink: // If the upstream message link is closed, this signals // that the channel itself is being closed, therefore @@ -825,125 +834,7 @@ out: break out } - switch htlcPkt := msg.(type) { - // TODO(roasbeef): timeouts - case *lnwire.HTLCAddRequest: - // We just received an add request from an - // upstream peer, so we add it to our state - // machine, then add the HTLC to our "settle" - // list in the event that we know the pre-image - channel.AddHTLC(htlcPkt, true) - - rHash := htlcPkt.RedemptionHashes[0] - if invoice, found := p.server.invoices.lookupInvoice(rHash); found { - // TODO(roasbeef): check value - // * onion layer strip should also be before invoice lookup - pre := invoice.paymentPreimage - state.htlcsToSettle = append(state.htlcsToSettle, pre) - } - case *lnwire.HTLCSettleRequest: - // TODO(roasbeef): this assumes no "multi-sig" - pre := htlcPkt.RedemptionProofs[0] - if _, err := channel.SettleHTLC(pre, true); err != nil { - // TODO(roasbeef): broadcast on-chain - peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) - p.Disconnect() - break out - } - case *lnwire.CommitSignature: - // We just received a new update to our local - // commitment chain, validate this new - // commitment, closing the link if invalid. - // TODO(roasbeef): use uint64 for indexes? - logIndex := uint32(htlcPkt.LogIndex) - sig := htlcPkt.CommitSig.Serialize() - if err := channel.ReceiveNewCommitment(sig, logIndex); err != nil { - peerLog.Errorf("unable to accept new commitment: %v", err) - p.Disconnect() - break out - } - - // If we didn't initiate this state transition, - // then we'll update the remote commitment - // chain with a new commitment. Otherwise, we - // can reset the pending bit as we received the - // signature we were expecting. - if !state.sigPending { - // TODO(roasbeef): may not always want to *immediatly* - // sign next commitment. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - continue - } - } else { - state.sigPending = false - } - - // Finally, since we just accepted a new state, - // send the remote peer a revocation for our - // prior state. - nextRevocation, err := channel.RevokeCurrentCommitment() - if err != nil { - peerLog.Errorf("unable to revoke current commitment: %v", err) - continue - } - p.queueMsg(nextRevocation, nil) - case *lnwire.CommitRevocation: - // We've received a revocation from the remote - // chain, if valid, this moves the remote chain - // forward, and expands our revocation window. - htlcsToForward, err := channel.ReceiveRevocation(htlcPkt) - if err != nil { - peerLog.Errorf("unable to accept revocation: %v", err) - p.Disconnect() - break out - } - // TODO(roasbeef): send the locked-in HTLC's - // over the plex chan to the switch. - peerLog.Debugf("htlcs ready to forward: %v", - spew.Sdump(htlcsToForward)) - - // A full state transition has been completed, - // if we don't need to settle any HTLC's, then - // we're done. - if len(state.htlcsToSettle) == 0 { - continue - } - - // Otherwise, we have some pending HTLC's which - // we can pull funds from, thereby settling. - peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle)) - for _, pre := range state.htlcsToSettle { - // Add each HTLC settle update to the - // channel's state update log, also - // sending the log update to the remote - // party. - logIndex, err := channel.SettleHTLC(pre, false) - if err != nil { - peerLog.Errorf("unable to settle htlc: %v", err) - continue - } - settleMsg := &lnwire.HTLCSettleRequest{ - ChannelPoint: state.chanPoint, - HTLCKey: lnwire.HTLCKey(logIndex), - RedemptionProofs: [][32]byte{pre}, - } - p.queueMsg(settleMsg, nil) - } - - // With all the settle updates added to the - // local and remote HTLC logs, initiate a state - // transition by updating the remote commitment - // chain. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - continue - } - state.sigPending = true - state.htlcsToSettle = nil - } + p.handleUpstreamMsg(state, msg) case <-p.quit: break out } @@ -953,6 +844,167 @@ out: peerLog.Tracef("htlcManager for peer %v done", p) } +// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC +// Switch. Possible messages sent by the switch include requests to forward new +// HTLC's, timeout previously cleared HTLC's, and finally to settle currently +// cleared HTLC's with the upstream peer. +func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { + switch htlc := pkt.msg.(type) { + case *lnwire.HTLCAddRequest: + // A new payment has been initiated via the + // downstream channel, so we add the new HTLC + // to our local log, then update the commitment + // chains. + index := state.channel.AddHTLC(htlc, false) + p.queueMsg(htlc, nil) + + // TODO(roasbeef): batch trickle timer + cap + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + } + + state.sigPending = true + state.clearedHTCLs[index] = &pendingPayment{ + htlc: htlc, + index: index, + err: pkt.err, + } + } +} + +// handleUpstreamMsg processes wire messages related to commitment state +// updates from the upstream peer. The upstream peer is the peer whom we have a +// direct channel with, updating our respective commitment chains. +func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { + switch htlcPkt := msg.(type) { + // TODO(roasbeef): timeouts + case *lnwire.HTLCAddRequest: + // We just received an add request from an upstream peer, so we + // add it to our state machine, then add the HTLC to our + // "settle" list in the event that we know the pre-image + state.channel.AddHTLC(htlcPkt, true) + + rHash := htlcPkt.RedemptionHashes[0] + if invoice, found := p.server.invoices.lookupInvoice(rHash); found { + // TODO(roasbeef): check value + // * onion layer strip should also be before invoice lookup + // * also can immediately send the settle msg + pre := invoice.paymentPreimage + state.htlcsToSettle = append(state.htlcsToSettle, pre) + } + case *lnwire.HTLCSettleRequest: + // TODO(roasbeef): this assumes no "multi-sig" + pre := htlcPkt.RedemptionProofs[0] + if _, err := state.channel.SettleHTLC(pre, true); err != nil { + // TODO(roasbeef): broadcast on-chain + peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) + p.Disconnect() + return + } + case *lnwire.CommitSignature: + // We just received a new update to our local commitment chain, + // validate this new commitment, closing the link if invalid. + // TODO(roasbeef): use uint64 for indexes? + logIndex := uint32(htlcPkt.LogIndex) + sig := htlcPkt.CommitSig.Serialize() + if err := state.channel.ReceiveNewCommitment(sig, logIndex); err != nil { + peerLog.Errorf("unable to accept new commitment: %v", err) + p.Disconnect() + return + } + + // If we didn't initiate this state transition, then we'll + // update the remote commitment chain with a new commitment. + // Otherwise, we can reset the pending bit as we received the + // signature we were expecting. + // TODO(roasbeef): move sig updates to own trigger + // * can remove sigPending if so + if !state.sigPending { + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + return + } + } else { + state.sigPending = false + } + + // Finally, since we just accepted a new state, send the remote + // peer a revocation for our prior state. + nextRevocation, err := state.channel.RevokeCurrentCommitment() + if err != nil { + peerLog.Errorf("unable to revoke current commitment: %v", err) + return + } + p.queueMsg(nextRevocation, nil) + case *lnwire.CommitRevocation: + // We've received a revocation from the remote chain, if valid, + // this moves the remote chain forward, and expands our + // revocation window. + htlcsToForward, err := state.channel.ReceiveRevocation(htlcPkt) + if err != nil { + peerLog.Errorf("unable to accept revocation: %v", err) + p.Disconnect() + return + } + // TODO(roasbeef): send the locked-in HTLC's over the plex chan + // to the switch. + peerLog.Debugf("htlcs ready to forward: %v", + spew.Sdump(htlcsToForward)) + + // If any of the htlc's eligible for forwarding are pending + // settling or timeing out previous outgoing payments, then we + // can them from the pending set, and signal the requster (if + // existing) that the payment has been fully fulfilled. + for _, htlc := range htlcsToForward { + if p, ok := state.clearedHTCLs[htlc.ParentIndex]; ok { + peerLog.Debugf("local htlc %v cleared", + spew.Sdump(p.htlc)) + p.err <- nil + delete(state.clearedHTCLs, htlc.ParentIndex) + } + } + + // A full state transition has been completed, if we don't need + // to settle any HTLC's, then we're done. + if len(state.htlcsToSettle) == 0 { + return + } + + // Otherwise, we have some pending HTLC's which we can pull + // funds from, thereby settling. + peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle)) + for _, pre := range state.htlcsToSettle { + // Add each HTLC settle update to the channel's state + // update log, also sending the log update to the + // remote party. + logIndex, err := state.channel.SettleHTLC(pre, false) + if err != nil { + peerLog.Errorf("unable to settle htlc: %v", err) + continue + } + settleMsg := &lnwire.HTLCSettleRequest{ + ChannelPoint: state.chanPoint, + HTLCKey: lnwire.HTLCKey(logIndex), + RedemptionProofs: [][32]byte{pre}, + } + p.queueMsg(settleMsg, nil) + } + + // With all the settle updates added to the local and remote + // HTLC logs, initiate a state transition by updating the + // remote commitment chain. + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + return + } + state.sigPending = true + state.htlcsToSettle = nil + } +} + // updateCommitTx signs, then sends an update to the remote peer adding a new // commitment to their commitment chain which includes all the latest updates // we've received+processed up to this point.