diff --git a/htlcswitch/link.go b/htlcswitch/link.go
new file mode 100644
index 00000000..b36e8166
--- /dev/null
+++ b/htlcswitch/link.go
@@ -0,0 +1,714 @@
+package htlcswitch
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/lightningnetwork/lightning-onion"
+	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/lnwallet"
+	"github.com/lightningnetwork/lnd/lnwire"
+	"github.com/roasbeef/btcd/btcec"
+	"github.com/roasbeef/btcd/chaincfg/chainhash"
+	"github.com/roasbeef/btcd/wire"
+	"github.com/roasbeef/btcutil"
+)
+
+// debugHTLC indicates whether the node is running with debug HTLCs enabled,
+// which relaxes certain sanity checks such as invoice value matching. This
+// stands in for the daemon's DebugHTLC config option until the configuration
+// is threaded through to this package.
+// TODO: wire this up to the daemon-level configuration.
+var debugHTLC bool
+
+// commitmentState is the volatile+persistent state of an active channel's
+// commitment update state-machine. This struct is used by the htlcManager to
+// save the meta-state required for proper functioning.
+type commitmentState struct {
+	// htlcsToSettle is a list of preimages which allow us to settle one or
+	// many of the pending HTLCs we've received from the upstream peer.
+	htlcsToSettle map[uint64]*channeldb.Invoice
+
+	// htlcsToCancel is a set of HTLCs identified by their log index which
+	// are to be cancelled upon the next state transition.
+	htlcsToCancel map[uint64]lnwire.FailCode
+
+	// cancelReasons stores the reason why a particular HTLC was
+	// cancelled. The index of the HTLC within the log is mapped to the
+	// cancellation reason. This value is used to thread the proper error
+	// through to the htlcSwitch, or the subsystem that initiated the
+	// HTLC.
+	cancelReasons map[uint64]lnwire.FailCode
+
+	// pendingBatch is a slice of payments which have been added to the
+	// channel update log, but not yet committed to the latest commitment.
+	pendingBatch []*pendingPayment
+
+	// clearedHTLCs is a map of outgoing HTLCs we've committed to in our
+	// chain which have not yet been settled by the upstream peer.
+	clearedHTLCs map[uint64]*pendingPayment
+
+	// switchChan is a channel used to send packets to the htlc switch for
+	// forwarding.
+	switchChan chan<- *htlcPacket
+
+	// sphinx is an instance of the Sphinx onion router for this node. The
+	// router will be used to process all incoming Sphinx packets embedded
+	// within HTLC add messages.
+	sphinx *sphinx.Router
+
+	// pendingCircuits tracks the remote log index of the incoming HTLCs,
+	// mapped to the processed Sphinx packet contained within the HTLC.
+	// This map is used as a staging area between when an HTLC is added to
+	// the log, and when it's locked into the commitment state of both
+	// chains. Once locked in, the processed packet is sent to the switch
+	// along with the HTLC to forward the packet to the next hop.
+	pendingCircuits map[uint64]*sphinx.ProcessedPacket
+
+	channel   *lnwallet.LightningChannel
+	chanPoint *wire.OutPoint
+	chanID    lnwire.ChannelID
+}
+
+// htlcManager is the primary goroutine which drives a channel's commitment
+// update state-machine in response to messages received via several channels.
+// The htlcManager reads messages from the upstream (remote) peer, and also
+// from several possible downstream channels managed by the htlcSwitch. In the
+// event that an HTLC needs to be forwarded, the send-only htlcPlex channel is
+// used to send HTLC packets to the switch for forwarding. Additionally, the
+// htlcManager handles acting upon all timeouts for any active HTLCs, manages
+// the channel's revocation window, and also the HTLC trickle queue+timer for
+// this active channel.
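+//
+// A minimal launch sketch (hypothetical wiring; the actual call sites live
+// with the peer and the htlcSwitch):
+//
+//	plex := make(chan *htlcPacket)        // packets destined for the switch
+//	downstream := make(chan *htlcPacket)  // packets from the switch to us
+//	upstream := make(chan lnwire.Message) // wire messages from the remote peer
+//	p.wg.Add(1)
+//	go p.htlcManager(channel, plex, downstream, upstream)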
+func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
+	htlcPlex chan<- *htlcPacket, downstreamLink <-chan *htlcPacket,
+	upstreamLink <-chan lnwire.Message) {
+
+	chanStats := channel.StateSnapshot()
+	peerLog.Infof("HTLC manager for ChannelPoint(%v) started, "+
+		"our_balance=%v, their_balance=%v, num_updates=%v",
+		channel.ChannelPoint(), chanStats.LocalBalance,
+		chanStats.RemoteBalance, chanStats.NumUpdates)
+
+	// A new session for this active channel has just started, therefore
+	// we need to send our initial revocation window to the remote peer.
+	for i := 0; i < lnwallet.InitialRevocationWindow; i++ {
+		rev, err := channel.ExtendRevocationWindow()
+		if err != nil {
+			peerLog.Errorf("unable to extend revocation window: %v", err)
+			continue
+		}
+		p.queueMsg(rev, nil)
+	}
+
+	chanPoint := channel.ChannelPoint()
+	state := &commitmentState{
+		channel:         channel,
+		chanPoint:       chanPoint,
+		chanID:          lnwire.NewChanIDFromOutPoint(chanPoint),
+		clearedHTLCs:    make(map[uint64]*pendingPayment),
+		htlcsToSettle:   make(map[uint64]*channeldb.Invoice),
+		htlcsToCancel:   make(map[uint64]lnwire.FailCode),
+		cancelReasons:   make(map[uint64]lnwire.FailCode),
+		pendingCircuits: make(map[uint64]*sphinx.ProcessedPacket),
+		sphinx:          p.server.sphinx,
+		switchChan:      htlcPlex,
+	}
+
+	// TODO(roasbeef): check to see if able to settle any currently pending
+	// HTLCs
+	//  * also need signals when new invoices are added by the
+	//    invoiceRegistry
+
+	batchTimer := time.NewTicker(50 * time.Millisecond)
+	defer batchTimer.Stop()
+
+	logCommitTimer := time.NewTicker(100 * time.Millisecond)
+	defer logCommitTimer.Stop()
+out:
+	for {
+		select {
+		case <-channel.UnilateralCloseSignal:
+			// TODO(roasbeef): need to send HTLC outputs to nursery
+			peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain",
+				state.chanPoint)
+			if err := wipeChannel(p, channel); err != nil {
+				peerLog.Errorf("unable to wipe channel %v", err)
+			}
+
+			p.server.breachArbiter.settledContracts <- state.chanPoint
+
+			break out
+
+		case <-channel.ForceCloseSignal:
+			// TODO(roasbeef): path never taken now that the server
+			// force closes directly?
+			peerLog.Warnf("ChannelPoint(%v) has been force "+
+				"closed, disconnecting from peerID(%x)",
+				state.chanPoint, p.id)
+			break out
+
+		case <-logCommitTimer.C:
+			// If we haven't sent or received a new commitment
+			// update in some time, check to see if we have any
+			// pending updates we need to commit due to our
+			// commitment chains being desynchronized.
+			if state.channel.FullySynced() &&
+				len(state.htlcsToSettle) == 0 {
+				continue
+			}
+
+			if err := p.updateCommitTx(state); err != nil {
+				peerLog.Errorf("unable to update commitment: %v",
+					err)
+				p.Disconnect()
+				break out
+			}
+
+		case <-batchTimer.C:
+			// If the current batch is empty, then we have no work
+			// here.
+			if len(state.pendingBatch) == 0 {
+				continue
+			}
+
+			// Otherwise, attempt to extend the remote commitment
+			// chain to include all the currently pending entries.
+			// If the send was unsuccessful, then abandon the
+			// update, waiting for the revocation window to open
+			// up.
+			if err := p.updateCommitTx(state); err != nil {
+				peerLog.Errorf("unable to update "+
+					"commitment: %v", err)
+				p.Disconnect()
+				break out
+			}
+
+		case pkt := <-downstreamLink:
+			p.handleDownStreamPkt(state, pkt)
+
+		case msg, ok := <-upstreamLink:
+			// If the upstream message link is closed, this signals
+			// that the channel itself is being closed, therefore
+			// we exit.
+			if !ok {
+				break out
+			}
+
+			p.handleUpstreamMsg(state, msg)
+		case <-p.quit:
+			break out
+		}
+	}
+
+	p.wg.Done()
+	peerLog.Tracef("htlcManager for peer %v done", p)
+}
+
+// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC
+// Switch. Possible messages sent by the switch include requests to forward
+// new HTLCs, to time out previously cleared HTLCs, and finally to settle
+// currently cleared HTLCs with the upstream peer.
+func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
+	var isSettle bool
+	switch htlc := pkt.msg.(type) {
+	case *lnwire.UpdateAddHTLC:
+		// A new payment has been initiated via the downstream
+		// channel, so we add the new HTLC to our local log, then
+		// update the commitment chains.
+		htlc.ChanID = state.chanID
+		index, err := state.channel.AddHTLC(htlc)
+		if err != nil {
+			// TODO: possibly perform fallback/retry logic
+			// depending on type of error
+			peerLog.Errorf("Adding HTLC rejected: %v", err)
+			pkt.err <- err
+			close(pkt.done)
+
+			// The HTLC was unable to be added to the state
+			// machine; as a result, we'll signal the switch to
+			// cancel the pending payment.
+			// TODO(roasbeef): need to update link as well if local
+			// HTLC?
+			state.switchChan <- &htlcPacket{
+				amt: htlc.Amount,
+				msg: &lnwire.UpdateFailHTLC{
+					Reason: []byte{byte(0)},
+				},
+				srcLink: state.chanID,
+			}
+			return
+		}
+
+		p.queueMsg(htlc, nil)
+
+		state.pendingBatch = append(state.pendingBatch, &pendingPayment{
+			htlc:     htlc,
+			index:    index,
+			preImage: pkt.preImage,
+			err:      pkt.err,
+			done:     pkt.done,
+		})
+
+	case *lnwire.UpdateFufillHTLC:
+		// An HTLC we previously forwarded to the switch has just been
+		// settled somewhere upstream. Therefore, we settle the HTLC
+		// within our local state machine.
+		pre := htlc.PaymentPreimage
+		logIndex, err := state.channel.SettleHTLC(pre)
+		if err != nil {
+			// TODO(roasbeef): broadcast on-chain
+			peerLog.Errorf("settle for incoming HTLC rejected: %v", err)
+			p.Disconnect()
+			return
+		}
+
+		// With the HTLC settled, we'll need to populate the wire
+		// message to target the specific channel and HTLC to be
+		// settled.
+		htlc.ChanID = state.chanID
+		htlc.ID = logIndex
+
+		// Then we send the HTLC settle message to the connected peer
+		// so the settle can continue to propagate.
+		p.queueMsg(htlc, nil)
+		isSettle = true
+
+	case *lnwire.UpdateFailHTLC:
+		// An HTLC cancellation has been triggered somewhere upstream,
+		// so we'll remove the HTLC from our local state machine.
+		logIndex, err := state.channel.FailHTLC(pkt.payHash)
+		if err != nil {
+			peerLog.Errorf("unable to cancel HTLC: %v", err)
+			return
+		}
+
+		// With the HTLC removed, we'll need to populate the wire
+		// message to target the specific channel and HTLC to be
+		// cancelled. The "Reason" field will have already been set
+		// within the switch.
+		htlc.ChanID = state.chanID
+		htlc.ID = logIndex
+
+		// Finally, we send the HTLC message to the peer which
+		// initially created the HTLC.
+		p.queueMsg(htlc, nil)
+		isSettle = true
+	}
+
+	// If this newly added update exceeds the min batch size for adds, or
+	// this is a settle request, then initiate an update.
+	// TODO(roasbeef): enforce max HTLCs in flight limit
+	if len(state.pendingBatch) >= 10 || isSettle {
+		if err := p.updateCommitTx(state); err != nil {
+			peerLog.Errorf("unable to update "+
+				"commitment: %v", err)
+			p.Disconnect()
+			return
+		}
+	}
+}
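+
+// For illustration, a downstream add packet as the switch might construct it
+// (a hedged sketch: the real construction lives in the htlcSwitch, and the
+// err/done/preImage channels are supplied by the payment initiator):
+//
+//	pkt := &htlcPacket{
+//		msg: &lnwire.UpdateAddHTLC{
+//			Amount:      amt,
+//			PaymentHash: rHash,
+//		},
+//		preImage: preImageChan,
+//		err:      errChan,
+//		done:     doneChan,
+//	}
+//	downstreamLink <- pkt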
+
+// handleUpstreamMsg processes wire messages related to commitment state
+// updates from the upstream peer. The upstream peer is the peer with whom we
+// have a direct channel; these updates drive our respective commitment
+// chains forward.
+func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
+	switch htlcPkt := msg.(type) {
+	// TODO(roasbeef): timeouts
+	//  * fail if can't parse sphinx mix-header
+	case *lnwire.UpdateAddHTLC:
+		// Before adding the new HTLC to the state machine, parse the
+		// onion object in order to obtain the routing information.
+		blobReader := bytes.NewReader(htlcPkt.OnionBlob[:])
+		onionPkt := &sphinx.OnionPacket{}
+		if err := onionPkt.Decode(blobReader); err != nil {
+			peerLog.Errorf("unable to decode onion pkt: %v", err)
+			p.Disconnect()
+			return
+		}
+
+		// We just received an add request from an upstream peer, so
+		// we add it to our state machine, then add the HTLC to our
+		// "settle" list in the event that we know the preimage.
+		index, err := state.channel.ReceiveHTLC(htlcPkt)
+		if err != nil {
+			peerLog.Errorf("Receiving HTLC rejected: %v", err)
+			p.Disconnect()
+			return
+		}
+
+		// TODO(roasbeef): perform sanity checks on per-hop payload
+		//  * time-lock is sane, fee, chain, etc
+
+		// Attempt to process the Sphinx packet. We include the
+		// payment hash of the HTLC as it's authenticated within the
+		// Sphinx packet itself as associated data in order to thwart
+		// replay attacks. In the case of a replay, an attacker is
+		// *forced* to use the same payment hash twice, thereby losing
+		// their money entirely.
+		rHash := htlcPkt.PaymentHash[:]
+		sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt, rHash)
+		if err != nil {
+			// If we're unable to parse the Sphinx packet, then
+			// we'll cancel the HTLC after the current commitment
+			// transition.
+			peerLog.Errorf("unable to process onion pkt: %v", err)
+			state.htlcsToCancel[index] = lnwire.SphinxParseError
+			return
+		}
+
+		switch sphinxPacket.Action {
+		// We're the designated payment destination. Therefore we
+		// attempt to see if we have an invoice locally which'll allow
+		// us to settle this HTLC.
+		case sphinx.ExitNode:
+			rHash := htlcPkt.PaymentHash
+			invoice, err := p.server.invoices.LookupInvoice(rHash)
+			if err != nil {
+				// If we're the exit node, but don't recognize
+				// the payment hash, then we'll fail the HTLC
+				// on the next state transition.
+				peerLog.Errorf("unable to settle HTLC, "+
+					"payment hash (%x) unrecognized", rHash[:])
+				state.htlcsToCancel[index] = lnwire.UnknownPaymentHash
+				return
+			}
+
+			// If we're not currently in debug mode, and the
+			// extended HTLC doesn't meet the value requested, then
+			// we'll fail the HTLC.
+			if !debugHTLC && htlcPkt.Amount < invoice.Terms.Value {
+				peerLog.Errorf("rejecting HTLC due to incorrect "+
+					"amount: expected %v, received %v",
+					invoice.Terms.Value, htlcPkt.Amount)
+				state.htlcsToCancel[index] = lnwire.IncorrectValue
+			} else {
+				// Otherwise, everything is in order and we'll
+				// settle the HTLC after the current state
+				// transition.
+				state.htlcsToSettle[index] = invoice
+			}
+
+		// There are additional hops left within this route, so we
+		// track the next hop according to the index of this HTLC
+		// within their log. When forwarding locked-in HTLCs to the
+		// switch, we'll attach the routing information so the switch
+		// can finalize the circuit.
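+		//
+		// When such a forward occurs, the packet later handed to the
+		// switch takes roughly this shape (an illustration of
+		// logEntryToHtlcPkt's output below, not additional logic):
+		//
+		//	pkt.msg     = &lnwire.UpdateAddHTLC{...} // re-encoded onion blob
+		//	pkt.onion   = sphinxPacket               // next-hop routing info
+		//	pkt.srcLink = state.chanID               // originating channel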
+		case sphinx.MoreHops:
+			state.pendingCircuits[index] = sphinxPacket
+		default:
+			peerLog.Errorf("malformed onion packet")
+			state.htlcsToCancel[index] = lnwire.SphinxParseError
+		}
+
+	case *lnwire.UpdateFufillHTLC:
+		pre := htlcPkt.PaymentPreimage
+		idx := htlcPkt.ID
+		if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil {
+			// TODO(roasbeef): broadcast on-chain
+			peerLog.Errorf("settle for outgoing HTLC rejected: %v", err)
+			p.Disconnect()
+			return
+		}
+
+		// TODO(roasbeef): add preimage to DB in order to swipe
+		// repeated r-values
+	case *lnwire.UpdateFailHTLC:
+		idx := htlcPkt.ID
+		if err := state.channel.ReceiveFailHTLC(idx); err != nil {
+			peerLog.Errorf("unable to recv HTLC cancel: %v", err)
+			p.Disconnect()
+			return
+		}
+
+		state.cancelReasons[idx] = lnwire.FailCode(htlcPkt.Reason[0])
+
+	case *lnwire.CommitSig:
+		// We just received a new update to our local commitment
+		// chain, so validate this new commitment, closing the link if
+		// it's invalid.
+		// TODO(roasbeef): redundant re-serialization
+		sig := htlcPkt.CommitSig.Serialize()
+		if err := state.channel.ReceiveNewCommitment(sig); err != nil {
+			peerLog.Errorf("unable to accept new commitment: %v", err)
+			p.Disconnect()
+			return
+		}
+
+		// As we've just accepted a new state, we'll now immediately
+		// send the remote peer a revocation for our prior state.
+		nextRevocation, err := state.channel.RevokeCurrentCommitment()
+		if err != nil {
+			peerLog.Errorf("unable to revoke commitment: %v", err)
+			return
+		}
+		p.queueMsg(nextRevocation, nil)
+
+		// If both commitment chains are fully synced from our PoV,
+		// then we don't need to reply with a signature as both sides
+		// already have a commitment with the latest accepted state.
+		if state.channel.FullySynced() {
+			return
+		}
+
+		// Otherwise, the remote party initiated the state transition,
+		// so we'll reply with a signature to provide them with their
+		// version of the latest commitment state.
+		if err := p.updateCommitTx(state); err != nil {
+			peerLog.Errorf("unable to update commitment: %v", err)
+			p.Disconnect()
+			return
+		}
+
+	case *lnwire.RevokeAndAck:
+		// We've received a revocation from the remote peer. If valid,
+		// this moves the remote chain forward, and expands our
+		// revocation window.
+		htlcsToForward, err := state.channel.ReceiveRevocation(htlcPkt)
+		if err != nil {
+			peerLog.Errorf("unable to accept revocation: %v", err)
+			p.Disconnect()
+			return
+		}
+
+		// If any of the HTLCs eligible for forwarding settle or time
+		// out previous outgoing payments, then we can remove them
+		// from the pending set, and signal the requester (if one
+		// exists) that the payment has been fully fulfilled.
+		var bandwidthUpdate btcutil.Amount
+		settledPayments := make(map[lnwallet.PaymentHash]struct{})
+		cancelledHtlcs := make(map[uint64]struct{})
+		for _, htlc := range htlcsToForward {
+			parentIndex := htlc.ParentIndex
+			if p, ok := state.clearedHTLCs[parentIndex]; ok {
+				switch htlc.EntryType {
+				// If the HTLC was settled successfully, then
+				// we return a nil error as well as the payment
+				// preimage back to the possible caller.
+				case lnwallet.Settle:
+					p.preImage <- htlc.RPreimage
+					p.err <- nil
+
+				// Otherwise, the HTLC failed, so we propagate
+				// the error back to the potential caller.
+				case lnwallet.Fail:
+					errMsg := state.cancelReasons[parentIndex]
+					p.preImage <- [32]byte{}
+					p.err <- errors.New(errMsg.String())
+				}
+
+				close(p.done)
+
+				delete(state.clearedHTLCs, htlc.ParentIndex)
+			}
+
+			// TODO(roasbeef): rework log entries to a shared
+			// interface.
+			if htlc.EntryType != lnwallet.Add {
+				continue
+			}
+
+			// If we can settle this HTLC within our local state
+			// update log, then send the update entry to the remote
+			// party.
+			invoice, ok := state.htlcsToSettle[htlc.Index]
+			if ok {
+				preimage := invoice.Terms.PaymentPreimage
+				logIndex, err := state.channel.SettleHTLC(preimage)
+				if err != nil {
+					peerLog.Errorf("unable to settle htlc: %v", err)
+					p.Disconnect()
+					continue
+				}
+
+				settleMsg := &lnwire.UpdateFufillHTLC{
+					ChanID:          state.chanID,
+					ID:              logIndex,
+					PaymentPreimage: preimage,
+				}
+				p.queueMsg(settleMsg, nil)
+
+				delete(state.htlcsToSettle, htlc.Index)
+				settledPayments[htlc.RHash] = struct{}{}
+
+				bandwidthUpdate += htlc.Amount
+				continue
+			}
+
+			// Alternatively, if we marked this HTLC for
+			// cancellation, then immediately cancel the HTLC as
+			// it's now locked in within both commitment
+			// transactions.
+			reason, ok := state.htlcsToCancel[htlc.Index]
+			if !ok {
+				continue
+			}
+
+			logIndex, err := state.channel.FailHTLC(htlc.RHash)
+			if err != nil {
+				peerLog.Errorf("unable to cancel htlc: %v", err)
+				p.Disconnect()
+				continue
+			}
+
+			cancelMsg := &lnwire.UpdateFailHTLC{
+				ChanID: state.chanID,
+				ID:     logIndex,
+				Reason: []byte{byte(reason)},
+			}
+			p.queueMsg(cancelMsg, nil)
+			delete(state.htlcsToCancel, htlc.Index)
+
+			cancelledHtlcs[htlc.Index] = struct{}{}
+		}
+
+		go func() {
+			for _, htlc := range htlcsToForward {
+				// We don't need to forward any HTLCs that we
+				// just settled or cancelled above.
+				// TODO(roasbeef): key by index instead?
+				if _, ok := settledPayments[htlc.RHash]; ok {
+					continue
+				}
+				if _, ok := cancelledHtlcs[htlc.Index]; ok {
+					continue
+				}
+
+				onionPkt := state.pendingCircuits[htlc.Index]
+				delete(state.pendingCircuits, htlc.Index)
+
+				reason := state.cancelReasons[htlc.ParentIndex]
+				delete(state.cancelReasons, htlc.ParentIndex)
+
+				// Send this fully activated HTLC to the htlc
+				// switch to continue the chained clear/settle.
+				pkt, err := logEntryToHtlcPkt(state.chanID,
+					htlc, onionPkt, reason)
+				if err != nil {
+					peerLog.Errorf("unable to make htlc pkt: %v",
+						err)
+					continue
+				}
+
+				state.switchChan <- pkt
+			}
+		}()
+
+		if len(settledPayments) == 0 && len(cancelledHtlcs) == 0 {
+			return
+		}
+
+		// Send an update to the htlc switch of our newly available
+		// payment bandwidth.
+		// TODO(roasbeef): ideally should wait for next state update.
+		if bandwidthUpdate != 0 {
+			p.server.htlcSwitch.UpdateLink(state.chanID,
+				bandwidthUpdate)
+		}
+
+		// With all the settle updates added to the local and remote
+		// HTLC logs, initiate a state transition by updating the
+		// remote commitment chain.
+		if err := p.updateCommitTx(state); err != nil {
+			peerLog.Errorf("unable to update commitment: %v", err)
+			p.Disconnect()
+			return
+		}
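+
+		// At this point the commitment dance continues;
+		// schematically, one possible interleaving (mirroring the
+		// handlers in this file, not prescribing new behavior):
+		//
+		//	us   --- CommitSig    --->  them
+		//	us   <-- RevokeAndAck ----  them
+		//	us   <-- CommitSig    ----  them  (if their chain lags)
+		//	us   --- RevokeAndAck --->  them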
+
+		// Notify the invoiceRegistry of the invoices we just settled
+		// with this latest commitment update.
+		// TODO(roasbeef): wait until next transition?
+		for invoice := range settledPayments {
+			err := p.server.invoices.SettleInvoice(chainhash.Hash(invoice))
+			if err != nil {
+				peerLog.Errorf("unable to settle invoice: %v", err)
+			}
+		}
+	}
+}
+
+// updateCommitTx signs, then sends an update to the remote peer adding a new
+// commitment to their commitment chain which includes all the latest updates
+// we've received+processed up to this point.
+func (p *peer) updateCommitTx(state *commitmentState) error {
+	sigTheirs, err := state.channel.SignNextCommitment()
+	if err == lnwallet.ErrNoWindow {
+		peerLog.Tracef("revocation window exhausted, unable to send %v "+
+			"updates", len(state.pendingBatch))
+		return nil
+	} else if err != nil {
+		return err
+	}
+
+	parsedSig, err := btcec.ParseSignature(sigTheirs, btcec.S256())
+	if err != nil {
+		return fmt.Errorf("unable to parse sig: %v", err)
+	}
+
+	commitSig := &lnwire.CommitSig{
+		ChanID:    state.chanID,
+		CommitSig: parsedSig,
+	}
+	p.queueMsg(commitSig, nil)
+
+	// As we've just cleared out a batch, move all pending updates to the
+	// map of cleared HTLCs, clearing out the set of pending updates.
+	for _, update := range state.pendingBatch {
+		state.clearedHTLCs[update.index] = update
+	}
+
+	// Finally, clear out the current batch, as we're now waiting on a
+	// revocation for the commitment we just signed.
+	// TODO(roasbeef): re-slice instead to avoid GC?
+	state.pendingBatch = nil
+
+	return nil
+}
+
+// logEntryToHtlcPkt converts a particular Lightning Commitment Protocol (LCP)
+// log entry into the corresponding htlcPacket with src/dest set along with
+// the proper wire message. This helper method is provided in order to aid an
+// htlcManager in forwarding packets to the htlcSwitch.
+func logEntryToHtlcPkt(chanID lnwire.ChannelID, pd *lnwallet.PaymentDescriptor,
+	onionPkt *sphinx.ProcessedPacket,
+	reason lnwire.FailCode) (*htlcPacket, error) {
+
+	pkt := &htlcPacket{}
+
+	// TODO(roasbeef): alter after switch to log entry interface
+	var msg lnwire.Message
+	switch pd.EntryType {
+
+	case lnwallet.Add:
+		// TODO(roasbeef): timeout, onion blob, etc
+		var b bytes.Buffer
+		if err := onionPkt.Packet.Encode(&b); err != nil {
+			return nil, err
+		}
+
+		htlc := &lnwire.UpdateAddHTLC{
+			Amount:      pd.Amount,
+			PaymentHash: pd.RHash,
+		}
+		copy(htlc.OnionBlob[:], b.Bytes())
+		msg = htlc
+
+	case lnwallet.Settle:
+		msg = &lnwire.UpdateFufillHTLC{
+			PaymentPreimage: pd.RPreimage,
+		}
+
+	case lnwallet.Fail:
+		// For cancellation messages, we'll also need to set the rHash
+		// within the htlcPacket so the switch knows on which outbound
+		// link to forward the cancellation message.
+		msg = &lnwire.UpdateFailHTLC{
+			Reason: []byte{byte(reason)},
+		}
+		pkt.payHash = pd.RHash
+	}
+
+	pkt.amt = pd.Amount
+	pkt.msg = msg
+
+	pkt.srcLink = chanID
+	pkt.onion = onionPkt
+
+	return pkt, nil
+}
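+
+// As a usage illustration for the above, converting a settled log entry (a
+// hand-written sketch with placeholder values, not part of the runtime
+// path):
+//
+//	pd := &lnwallet.PaymentDescriptor{
+//		EntryType: lnwallet.Settle,
+//		RPreimage: preimage,
+//		Amount:    amt,
+//	}
+//	pkt, _ := logEntryToHtlcPkt(chanID, pd, nil, 0)
+//	// pkt.msg is now an *lnwire.UpdateFufillHTLC carrying the preimage,
+//	// and pkt.srcLink is set to chanID so the switch knows the origin.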