diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index 4e0117a7..99a435c5 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -9,8 +9,6 @@ import (
 	"io"
 
-	"encoding/hex"
-
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/roasbeef/btcd/btcec"
@@ -112,6 +110,10 @@ type ChannelLinkConfig struct {
 // switch. Additionally, the link encapsulate logic of commitment protocol
 // message ordering and updates.
 type channelLink struct {
+	// The following fields are only meant to be used *atomically*
+	started  int32
+	shutdown int32
+
 	// cancelReasons stores the reason why a particular HTLC was cancelled.
 	// The index of the HTLC within the log is mapped to the cancellation
 	// reason. This value is used to thread the proper error through to the
@@ -121,12 +123,15 @@ type channelLink struct {
 	// htlc cancel reasons.
 	cancelReasons map[uint64]lnwire.OpaqueReason
 
-	// blobs tracks the remote log index of the incoming htlc's, mapped to
-	// the htlc onion blob which encapsulates the next hop.
+	// clearedOnionBlobs tracks the remote log index of the incoming
+	// htlc's, mapped to the htlc onion blob which encapsulates the next
+	// hop. HTLC's are added to this map once the HTLC has been cleared,
+	// meaning the commitment state reflects the update encoded within this
+	// HTLC.
 	//
 	// TODO(andrew.shvv) remove after payment descriptor start store
 	// htlc onion blobs.
-	blobs map[uint64][lnwire.OnionPacketSize]byte
+	clearedOnionBlobs map[uint64][lnwire.OnionPacketSize]byte
 
 	// batchCounter is the number of updates which we received from remote
 	// side, but not include in commitment transaction yet and plus the
@@ -143,26 +148,25 @@ type channelLink struct {
 	// cfg is a structure which carries all dependable fields/handlers
 	// which may affect behaviour of the service.
-	cfg *ChannelLinkConfig
+	cfg ChannelLinkConfig
 
 	// overflowQueue is used to store the htlc add updates which haven't
 	// been processed because of the commitment transaction overflow.
 	overflowQueue *packetQueue
 
-	// upstream is a channel which responsible for propagating the received
-	// from remote peer messages, with which we have an opened channel, to
-	// handler function.
+	// upstream is a channel across which new messages sent from the
+	// remote peer to the local peer will be delivered.
 	upstream chan lnwire.Message
 
-	// downstream is a channel which responsible for propagating the
-	// received htlc switch packet which are forwarded from anther channel
-	// to the handler function.
+	// downstream is a channel across which new multi-hop HTLC's to be
+	// forwarded will be delivered. Messages on this channel are sent by
+	// the HTLC switch.
	downstream chan *htlcPacket
 
-	// control is used to propagate the commands to its handlers. This
-	// channel is needed in order to handle commands in sequence manner,
-	// i.e in the main handler loop.
-	control chan interface{}
+	// linkControl is a channel which is used to query the state of the
+	// link, or update the various policies which govern whether an HTLC
+	// is to be forwarded and/or accepted.
+	linkControl chan interface{}
 
 	// logCommitTimer is a timer which is sent upon if we go an interval
 	// without receiving/sending a commitment update. It's role is to
@@ -172,27 +176,26 @@ type channelLink struct {
 	logCommitTimer *time.Timer
 	logCommitTick  <-chan time.Time
 
-	started  int32
-	shutdown int32
-	wg       sync.WaitGroup
-	quit     chan struct{}
+	wg   sync.WaitGroup
+	quit chan struct{}
 }
 
-// NewChannelLink create new instance of channel link.
-func NewChannelLink(cfg *ChannelLinkConfig,
+// NewChannelLink creates a new instance of a ChannelLink given a configuration
+// and an active channel that will be used to verify/apply updates to.
+func NewChannelLink(cfg ChannelLinkConfig,
 	channel *lnwallet.LightningChannel) ChannelLink {
 
 	return &channelLink{
-		cfg:            cfg,
-		channel:        channel,
-		blobs:          make(map[uint64][lnwire.OnionPacketSize]byte),
-		upstream:       make(chan lnwire.Message),
-		downstream:     make(chan *htlcPacket),
-		control:        make(chan interface{}),
-		cancelReasons:  make(map[uint64]lnwire.OpaqueReason),
-		logCommitTimer: time.NewTimer(300 * time.Millisecond),
-		overflowQueue:  newWaitingQueue(),
-		quit:           make(chan struct{}),
+		cfg:               cfg,
+		channel:           channel,
+		clearedOnionBlobs: make(map[uint64][lnwire.OnionPacketSize]byte),
+		upstream:          make(chan lnwire.Message),
+		downstream:        make(chan *htlcPacket),
+		linkControl:       make(chan interface{}),
+		cancelReasons:     make(map[uint64]lnwire.OpaqueReason),
+		logCommitTimer:    time.NewTimer(300 * time.Millisecond),
+		overflowQueue:     newWaitingQueue(),
+		quit:              make(chan struct{}),
 	}
 }
 
@@ -210,10 +213,10 @@ func (l *channelLink) Start() error {
 		return nil
 	}
 
-	log.Infof("channel link(%v): starting", l)
+	log.Infof("ChannelLink(%v) is starting", l)
 
 	l.wg.Add(1)
-	go l.htlcHandler()
+	go l.htlcManager()
 
 	return nil
 }
@@ -228,13 +231,13 @@ func (l *channelLink) Stop() {
 		return
 	}
 
-	log.Infof("channel link(%v): stopping", l)
+	log.Infof("ChannelLink(%v) is stopping", l)
 
 	close(l.quit)
 	l.wg.Wait()
 }
 
-// htlcHandler is the primary goroutine which drives a channel's commitment
+// htlcManager is the primary goroutine which drives a channel's commitment
 // update state-machine in response to messages received via several channels.
 // This goroutine reads messages from the upstream (remote) peer, and also from
 // downstream channel managed by the channel link. In the event that an htlc
@@ -244,7 +247,7 @@ func (l *channelLink) Stop() {
 // window, and also the htlc trickle queue+timer for this active channels.
 //
 // NOTE: This MUST be run as a goroutine.
-func (l *channelLink) htlcHandler() {
+func (l *channelLink) htlcManager() {
 	defer l.wg.Done()
 
 	log.Infof("HTLC manager for ChannelPoint(%v) started, "+
@@ -274,23 +277,31 @@ func (l *channelLink) htlcHandler() {
 out:
 	for {
 		select {
+		// The underlying channel has notified us of a unilateral close
+		// carried out by the remote peer. In the case of such an
+		// event, we'll wipe the channel state from the peer, and mark
+		// the contract as fully settled. Afterwards we can exit.
 		case <-l.channel.UnilateralCloseSignal:
-			// TODO(roasbeef): need to send HTLC outputs to nursery
 			log.Warnf("Remote peer has closed ChannelPoint(%v) on-chain",
 				l.channel.ChannelPoint())
 			if err := l.cfg.Peer.WipeChannel(l.channel); err != nil {
 				log.Errorf("unable to wipe channel %v", err)
 			}
 
+			// TODO(roasbeef): need to send HTLC outputs to nursery
+			l.cfg.SettledContracts <- l.channel.ChannelPoint()
+
 			break out
 
+		// A local sub-system has initiated a force close of the active
+		// channel. In this case we can exit immediately as no further
+		// updates should be processed for the channel.
 		case <-l.channel.ForceCloseSignal:
			// TODO(roasbeef): path never taken now that server
			// force closes's directly?
log.Warnf("ChannelPoint(%v) has been force "+ - "closed, disconnecting from peerID(%x)", - l.channel.ChannelPoint(), l.cfg.Peer.ID()) + "closed, disconnecting from peer(%x)", + l.channel.ChannelPoint(), l.cfg.Peer.PubKey()) break out case <-l.logCommitTick: @@ -335,23 +346,25 @@ out: case packet := <-l.overflowQueue.pending: msg := packet.htlc.(*lnwire.UpdateAddHTLC) log.Tracef("Reprocessing downstream add update "+ - "with payment hash(%v)", - hex.EncodeToString(msg.PaymentHash[:])) + "with payment hash(%x)", + msg.PaymentHash[:]) l.handleDownStreamPkt(packet) + // A message from the switch was just received. This indicates + // that the link is an intermediate hop in a multi-hop HTLC + // circuit. case pkt := <-l.downstream: - // If we have non empty processing queue then in order - // we'll add this to the overflow rather than - // processing it directly. Once an active HTLC is - // either settled or failed, then we'll free up a new - // slot. + // If we have non empty processing queue then we'll add + // this to the overflow rather than processing it + // directly. Once an active HTLC is either settled or + // failed, then we'll free up a new slot. htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) if ok && l.overflowQueue.length() != 0 { log.Infof("Downstream htlc add update with "+ - "payment hash(%v) have been added to "+ + "payment hash(%x) have been added to "+ "reprocessing queue, batch: %v", - hex.EncodeToString(htlc.PaymentHash[:]), + htlc.PaymentHash[:], l.batchCounter) l.overflowQueue.consume(pkt) @@ -359,13 +372,21 @@ out: } l.handleDownStreamPkt(pkt) + // A message from the connected peer was just received. This + // indicates that we have a new incoming HTLC, either directly + // for us, or part of a multi-hop HTLC circuit. case msg := <-l.upstream: l.handleUpstreamMsg(msg) - case cmd := <-l.control: - switch cmd := cmd.(type) { + case cmd := <-l.linkControl: + switch req := cmd.(type) { case *getBandwidthCmd: - cmd.done <- l.getBandwidth() + req.resp <- l.getBandwidth() + case *policyUpdate: + l.cfg.FwrdingPolicy = req.policy + if req.done != nil { + close(req.done) + } } case <-l.quit: @@ -373,7 +394,7 @@ out: } } - log.Infof("channel link(%v): htlc handler closed", l) + log.Infof("ChannelLink(%v) has exited", l) } // handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC @@ -391,33 +412,39 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { index, err := l.channel.AddHTLC(htlc) if err == lnwallet.ErrMaxHTLCNumber { log.Infof("Downstream htlc add update with "+ - "payment hash(%v) have been added to "+ + "payment hash(%x) have been added to "+ "reprocessing queue, batch: %v", - hex.EncodeToString(htlc.PaymentHash[:]), + htlc.PaymentHash[:], l.batchCounter) l.overflowQueue.consume(pkt) return } else if err != nil { + failPacket := newFailPacket( + l.ShortChanID(), + &lnwire.UpdateFailHTLC{ + Reason: []byte{byte(0)}, + }, + htlc.PaymentHash, + htlc.Amount, + ) + // The HTLC was unable to be added to the state // machine, as a result, we'll signal the switch to // cancel the pending payment. 
-			go l.cfg.Switch.forward(newFailPacket(l.ChanID(),
-				&lnwire.UpdateFailHTLC{
-					Reason: []byte{byte(0)},
-				}, htlc.PaymentHash, htlc.Amount))
+			go l.cfg.Switch.forward(failPacket)
 
 			log.Errorf("unable to handle downstream add HTLC: %v",
 				err)
 			return
 		}
-		log.Tracef("Received downstream htlc with payment hash"+
-			"(%v), assign the index: %v, batch: %v",
-			hex.EncodeToString(htlc.PaymentHash[:]),
-			index, l.batchCounter+1)
+
+		log.Tracef("Received downstream htlc: payment_hash=%x, "+
+			"local_log_index=%v, batch_size=%v",
+			htlc.PaymentHash[:], index, l.batchCounter+1)
+
 		htlc.ID = index
 		l.cfg.Peer.SendMessage(htlc)
-		l.batchCounter++
 
 	case *lnwire.UpdateFufillHTLC:
 		// An HTLC we forward to the switch has just settled somewhere
@@ -442,7 +469,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
 		// Then we send the HTLC settle message to the connected peer
 		// so we can continue the propagation of the settle message.
 		l.cfg.Peer.SendMessage(htlc)
-		l.batchCounter++
 		isSettle = true
 
 	case *lnwire.UpdateFailHTLC:
@@ -464,10 +490,11 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
 		// Finally, we send the HTLC message to the peer which
 		// initially created the HTLC.
 		l.cfg.Peer.SendMessage(htlc)
-		l.batchCounter++
 		isSettle = true
 	}
 
+	l.batchCounter++
+
 	// If this newly added update exceeds the min batch size for adds, or
 	// this is a settle request, then initiate an update.
 	if l.batchCounter >= 10 || isSettle {
@@ -485,30 +512,28 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
 // direct channel with, updating our respective commitment chains.
 func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
 	switch msg := msg.(type) {
-	// TODO(roasbeef): timeouts
-	//  * fail if can't parse sphinx mix-header
 	case *lnwire.UpdateAddHTLC:
 		// We just received an add request from an upstream peer, so we
 		// add it to our state machine, then add the HTLC to our
 		// "settle" list in the event that we know the preimage.
 		index, err := l.channel.ReceiveHTLC(msg)
 		if err != nil {
+			// TODO(roasbeef): fail channel
 			log.Errorf("unable to handle upstream add HTLC: %v",
 				err)
 			l.cfg.Peer.Disconnect()
 			return
 		}
-		log.Tracef("Receive upstream htlc with payment hash(%v), "+
-			"assign the index: %v",
-			hex.EncodeToString(msg.PaymentHash[:]), index)
+		log.Tracef("Received upstream htlc with payment hash(%x), "+
+			"assigning index: %v", msg.PaymentHash[:], index)
 
 		// TODO(roasbeef): perform sanity checks on per-hop payload
 		//  * time-lock is sane, fee, chain, etc
 
 		// Store the onion blob which encapsulate the htlc route and
-		// use in on stage of htlc inclusion to retrieve the next hope
-		// and propagate the htlc farther.
-		l.blobs[index] = msg.OnionBlob
+		// use it at a later stage of HTLC inclusion to retrieve the
+		// next hop and propagate the HTLC along the remaining route.
+		l.clearedOnionBlobs[index] = msg.OnionBlob
 
 	case *lnwire.UpdateFufillHTLC:
 		pre := msg.PaymentPreimage
@@ -685,7 +710,7 @@ func (l *channelLink) ChanID() lnwire.ChannelID {
 
 // getBandwidthCmd is a wrapper for get bandwidth handler.
 type getBandwidthCmd struct {
-	done chan btcutil.Amount
+	resp chan btcutil.Amount
 }
 
 // Bandwidth returns the amount which current link might pass through channel
@@ -695,12 +720,12 @@ type getBandwidthCmd struct {
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) Bandwidth() btcutil.Amount {
 	command := &getBandwidthCmd{
-		done: make(chan btcutil.Amount, 1),
+		resp: make(chan btcutil.Amount, 1),
 	}
 
 	select {
-	case l.control <- command:
-		return <-command.done
+	case l.linkControl <- command:
+		return <-command.resp
 	case <-l.quit:
 		return 0
 	}
@@ -755,51 +780,74 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
 	}
 }
 
-// processLockedInHtlcs function is used to proceed the HTLCs which was
-// designated as eligible for forwarding. But not all htlc will be forwarder,
-// if htlc reached its final destination that we should settle it.
+// processLockedInHtlcs serially processes each of the log updates which have
+// been "locked-in". An HTLC is considered locked-in once it has been fully
+// committed to in both the remote and local commitment state. Once a channel
+// update is locked-in, then it can be acted upon, meaning: settling HTLC's,
+// cancelling them, or forwarding new HTLC's to the next hop.
 func (l *channelLink) processLockedInHtlcs(
 	paymentDescriptors []*lnwallet.PaymentDescriptor) []*htlcPacket {
 
-	var needUpdate bool
+	var (
+		needUpdate       bool
+		packetsToForward []*htlcPacket
+	)
 
-	var packetsToForward []*htlcPacket
 	for _, pd := range paymentDescriptors {
 		// TODO(roasbeef): rework log entries to a shared
 		// interface.
 		switch pd.EntryType {
 
+		// A settle for an HTLC we previously forwarded has been
+		// received. So we'll forward the HTLC to the switch which
+		// will handle propagating the settle to the prior hop.
 		case lnwallet.Settle:
-			// Forward message to switch which will decide does
-			// this peer is the final destination of htlc and we
-			// should notify user about successful income or it
-			// should be propagated back to the origin peer.
-			packetsToForward = append(packetsToForward,
-				newSettlePacket(l.ChanID(),
-					&lnwire.UpdateFufillHTLC{
-						PaymentPreimage: pd.RPreimage,
-					}, pd.RHash, pd.Amount))
+			settleUpdate := &lnwire.UpdateFufillHTLC{
+				PaymentPreimage: pd.RPreimage,
+			}
+			settlePacket := newSettlePacket(l.ShortChanID(),
+				settleUpdate, pd.RHash, pd.Amount)
+
+			// Add the packet to the batch to be forwarded, and
+			// notify the overflow queue that a spare spot has been
+			// freed up within the commitment state.
+			packetsToForward = append(packetsToForward, settlePacket)
 			l.overflowQueue.release()
 
+		// A failure message for a previously forwarded HTLC has been
+		// received. As a result, a new slot will be freed up in our
+		// commitment state, so we'll forward this to the switch so the
+		// backwards undo can continue.
 		case lnwallet.Fail:
+			// Fetch the reason the HTLC was cancelled so we can
+			// continue to propagate it.
 			opaqueReason := l.cancelReasons[pd.ParentIndex]
 
-			// Forward message to switch which will decide does
-			// this peer is the final destination of htlc and we
-			// should notify user about fail income or it should be
-			// propagated back to the origin peer.
-			packetsToForward = append(packetsToForward,
-				newFailPacket(l.ChanID(),
-					&lnwire.UpdateFailHTLC{
-						Reason: opaqueReason,
-						ChanID: l.ChanID(),
-					}, pd.RHash, pd.Amount))
+			failUpdate := &lnwire.UpdateFailHTLC{
+				Reason: opaqueReason,
+				ChanID: l.ChanID(),
+			}
+			failPacket := newFailPacket(l.ShortChanID(), failUpdate,
+				pd.RHash, pd.Amount)
+
+			// Add the packet to the batch to be forwarded, and
+			// notify the overflow queue that a spare spot has been
+			// freed up within the commitment state.
+			packetsToForward = append(packetsToForward, failPacket)
 			l.overflowQueue.release()
 
+		// An incoming HTLC add has been fully locked-in. As a result
+		// we can now examine the forwarding details of the HTLC, and
+		// the HTLC itself, to decide if: we should forward it, cancel
+		// it, or are able to settle it (and it adheres to our fee
+		// related constraints).
 		case lnwallet.Add:
-			blob := l.blobs[pd.Index]
-			buffer := bytes.NewBuffer(blob[:])
-			delete(l.blobs, pd.Index)
+			// Fetch the onion blob that was included within this
+			// processed payment descriptor.
+			onionBlob := l.clearedOnionBlobs[pd.Index]
+			delete(l.clearedOnionBlobs, pd.Index)
+
+			onionReader := bytes.NewReader(onionBlob[:])
 
 			// Before adding the new htlc to the state machine,
 			// parse the onion object in order to obtain the
@@ -812,7 +860,8 @@ func (l *channelLink) processLockedInHtlcs(
 			// attacks. In the case of a replay, an attacker is
 			// *forced* to use the same payment hash twice, thereby
 			// losing their money entirely.
-			chanIterator, err := l.cfg.DecodeOnion(buffer, pd.RHash[:])
+			chanIterator, err := l.cfg.DecodeOnion(onionReader,
+				pd.RHash[:])
 			if err != nil {
 				// If we're unable to parse the Sphinx packet,
 				// then we'll cancel the htlc.
@@ -823,33 +872,10 @@ func (l *channelLink) processLockedInHtlcs(
 				continue
 			}
 
-			if nextChan := chanIterator.Next(); nextChan != nil {
-				// There are additional channels left within
-				// this route.
-				var (
-					b    bytes.Buffer
-					blob [lnwire.OnionPacketSize]byte
-				)
+			fwdInfo := chanIterator.ForwardingInstructions()
 
-				err := chanIterator.Encode(&b)
-				if err != nil {
-					log.Errorf("unable to encode the "+
-						"remaining route %v", err)
-					reason := []byte{byte(lnwire.UnknownError)}
-					l.sendHTLCError(pd.RHash, reason)
-					needUpdate = true
-					continue
-				}
-				copy(blob[:], b.Bytes())
-
-				packetsToForward = append(packetsToForward,
-					newAddPacket(l.ChanID(), *nextChan,
-						&lnwire.UpdateAddHTLC{
-							Amount:      pd.Amount,
-							PaymentHash: pd.RHash,
-							OnionBlob:   blob,
-						}))
-			} else {
+			switch fwdInfo.NextHop {
+			case exitHop:
 				// We're the designated payment destination.
 				// Therefore we attempt to see if we have an
 				// invoice locally which'll allow us to settle
@@ -865,6 +891,38 @@ func (l *channelLink) processLockedInHtlcs(
 					continue
 				}
 
+				// As we're the exit hop, we'll double check
+				// the hop-payload included in the HTLC to
+				// ensure that it was crafted correctly by the
+				// sender and matches the HTLC we were
+				// extended. Additionally, we'll ensure that
+				// our time-lock value has been computed
+				// correctly.
+				if fwdInfo.AmountToForward != invoice.Terms.Value {
+					log.Errorf("Onion payload of incoming "+
+						"htlc(%x) has incorrect value: "+
+						"expected %v, got %v", pd.RHash,
+						invoice.Terms.Value,
+						fwdInfo.AmountToForward)
+
+					reason := []byte{byte(lnwire.IncorrectValue)}
+					l.sendHTLCError(pd.RHash, reason)
+					needUpdate = true
+					continue
+				}
+				if fwdInfo.OutgoingCTLV != l.cfg.FwrdingPolicy.TimeLockDelta {
+					log.Errorf("Onion payload of incoming "+
+						"htlc(%x) has incorrect time-lock: "+
+						"expected %v, got %v", pd.RHash,
+						l.cfg.FwrdingPolicy.TimeLockDelta,
+						fwdInfo.OutgoingCTLV)
+
+					reason := []byte{byte(lnwire.UpstreamTimeout)}
+					l.sendHTLCError(pd.RHash, reason)
+					needUpdate = true
+					continue
+				}
+
 				// If we're not currently in debug mode, and
 				// the extended htlc doesn't meet the value
 				// requested, then we'll fail the htlc.
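
[Note on the exit-hop validation added above: the two new checks reduce to
simple equality tests against the onion payload. A self-contained sketch with
illustrative numbers follows; the validateExitHop helper and its parameters
are a hypothetical distillation, not part of this patch.]

package main

import "fmt"

// validateExitHop mirrors the exit-hop checks: the amount the onion asks us
// to "forward" must equal the invoice value, and the onion's outgoing CTLV
// must match our advertised time-lock delta.
func validateExitHop(amtToForward, invoiceValue int64,
	outgoingCTLV, timeLockDelta uint32) error {

	if amtToForward != invoiceValue {
		return fmt.Errorf("incorrect value: expected %v, got %v",
			invoiceValue, amtToForward)
	}
	if outgoingCTLV != timeLockDelta {
		return fmt.Errorf("incorrect time-lock: expected %v, got %v",
			timeLockDelta, outgoingCTLV)
	}
	return nil
}

func main() {
	// A payload paying a 50,000 satoshi invoice with a CTLV matching a
	// 144 block delta passes; tampering with either field fails.
	fmt.Println(validateExitHop(50000, 50000, 144, 144)) // <nil>
	fmt.Println(validateExitHop(49000, 50000, 144, 144)) // incorrect value
}
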
@@ -875,6 +933,7 @@ func (l *channelLink) processLockedInHtlcs(
 					log.Errorf("rejecting htlc due to incorrect "+
 						"amount: expected %v, received %v",
 						invoice.Terms.Value, pd.Amount)
+
 					reason := []byte{byte(lnwire.IncorrectValue)}
 					l.sendHTLCError(pd.RHash, reason)
 					needUpdate = true
@@ -907,6 +966,106 @@ func (l *channelLink) processLockedInHtlcs(
 					PaymentPreimage: preimage,
 				})
 				needUpdate = true
+
+			// There are additional channels left within this
+			// route. So we'll verify that our forwarding
+			// constraints have been properly met by this
+			// incoming HTLC.
+			default:
+				// As our first sanity check, we'll ensure that
+				// the passed HTLC isn't too small. If so, then
+				// we'll cancel the HTLC directly.
+				if pd.Amount < l.cfg.FwrdingPolicy.MinHTLC {
+					log.Errorf("Incoming htlc(%x) is too "+
+						"small: min_htlc=%v, htlc_value=%v",
+						pd.RHash[:], l.cfg.FwrdingPolicy.MinHTLC,
+						pd.Amount)
+
+					reason := []byte{byte(lnwire.IncorrectValue)}
+					l.sendHTLCError(pd.RHash, reason)
+					needUpdate = true
+					continue
+				}
+
+				// Next, using the amount of the incoming
+				// HTLC, we'll calculate the expected fee this
+				// incoming HTLC must carry in order to be
+				// accepted.
+				expectedFee := ExpectedFee(
+					l.cfg.FwrdingPolicy,
+					pd.Amount,
+				)
+
+				// If the amount of the incoming HTLC, minus
+				// our expected fee, isn't equal to the amount
+				// in the forwarding instructions, then either
+				// the values have been tampered with, or the
+				// sender used incorrect/dated information to
+				// construct the forwarding information for
+				// this hop. In any case, we'll cancel this
+				// HTLC.
+				if pd.Amount-expectedFee != fwdInfo.AmountToForward {
+					log.Errorf("Incoming htlc(%x) has "+
+						"insufficient fee: expected "+
+						"%v, got %v", pd.RHash[:],
+						btcutil.Amount(expectedFee),
+						btcutil.Amount(pd.Amount-fwdInfo.AmountToForward))
+
+					// TODO(andrew.shvv): send proper back error
+					reason := []byte{byte(lnwire.IncorrectValue)}
+					l.sendHTLCError(pd.RHash, reason)
+					needUpdate = true
+					continue
+				}
+
+				// Finally, we'll ensure that the time-lock on
+				// the outgoing HTLC meets the following
+				// constraint: the incoming time-lock minus our
+				// time-lock delta should equal the outgoing
+				// time lock. Otherwise, either the sender
+				// messed up, or an intermediate node tampered
+				// with the HTLC.
+				timeDelta := l.cfg.FwrdingPolicy.TimeLockDelta
+				if pd.Timeout-timeDelta != fwdInfo.OutgoingCTLV {
+					log.Errorf("Incoming htlc(%x) has "+
+						"incorrect time-lock value: expected "+
+						"%v blocks, got %v blocks", pd.RHash[:],
+						pd.Timeout-timeDelta, fwdInfo.OutgoingCTLV)
+
+					// TODO(andrew.shvv): send proper back error
+					reason := []byte{byte(lnwire.UpstreamTimeout)}
+					l.sendHTLCError(pd.RHash, reason)
+					needUpdate = true
+					continue
+				}
+
+				// With all our forwarding constraints met,
+				// we'll create the outgoing HTLC using the
+				// parameters as specified in the forwarding
+				// info.
+				addMsg := &lnwire.UpdateAddHTLC{
+					Expiry:      fwdInfo.OutgoingCTLV,
+					Amount:      fwdInfo.AmountToForward,
+					PaymentHash: pd.RHash,
+				}
+
+				// Finally, we'll encode the onion packet for
+				// the _next_ hop using the hop iterator
+				// decoded for the current hop.
+ buf := bytes.NewBuffer(addMsg.OnionBlob[0:0]) + err := chanIterator.EncodeNextHop(buf) + if err != nil { + log.Errorf("unable to encode the "+ + "remaining route %v", err) + reason := []byte{byte(lnwire.UnknownError)} + l.sendHTLCError(pd.RHash, reason) + needUpdate = true + continue + } + + updatePacket := newAddPacket(l.ShortChanID(), + fwdInfo.NextHop, addMsg) + packetsToForward = append(packetsToForward, updatePacket) } } }
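
[One Go subtlety in the re-encoding path above: bytes.NewBuffer(addMsg.OnionBlob[0:0])
hands the buffer a zero-length slice whose backing array is the message's
fixed-size OnionBlob, so writes up to its capacity land directly in the
message with no intermediate allocation, replacing the copy(blob[:], b.Bytes())
step the deleted code needed. A runnable illustration of the idiom:]

package main

import (
	"bytes"
	"fmt"
)

func main() {
	var blob [8]byte

	// len 0, cap 8: writes within capacity fill blob's backing array
	// in place rather than allocating a new one.
	buf := bytes.NewBuffer(blob[0:0])
	buf.WriteString("onion")

	fmt.Printf("%q\n", blob[:]) // "onion\x00\x00\x00"
}
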
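[ExpectedFee itself isn't part of this diff. Assuming the usual
base-plus-proportional schema, with FeeRate expressed in millionths of the
forwarded amount and both fields carried as btcutil.Amount, a plausible
definition is:]

// Hypothetical definition; the real ForwardingPolicy field names and units
// may differ.
func ExpectedFee(f ForwardingPolicy, htlcAmt btcutil.Amount) btcutil.Amount {
	return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}

[Under that schema, with BaseFee=1000 and FeeRate=100, an incoming 1,000,000
satoshi HTLC must pay a 1,000 + 100 = 1,100 satoshi fee, so the fee check in
the default case only passes when fwdInfo.AmountToForward equals 998,900.]
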
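[The linkControl channel serializes bandwidth queries and policy changes
through the htlcManager goroutine, so cfg.FwrdingPolicy is only ever touched
from one place. The policyUpdate type handled in the new select case isn't
defined in this diff; the shape below is inferred from how the case uses
req.policy and req.done, and UpdateForwardingPolicy is a hypothetical
caller-side helper illustrating the pattern:]

// policyUpdate's definition is an assumption inferred from the new select
// case: the link copies req.policy into cfg.FwrdingPolicy and closes
// req.done (if non-nil) to acknowledge the update.
type policyUpdate struct {
	policy ForwardingPolicy
	done   chan struct{}
}

// UpdateForwardingPolicy is a hypothetical helper showing the caller side of
// the pattern: hand the command to htlcManager and block until it has been
// applied, bailing out if the link is shutting down.
func (l *channelLink) UpdateForwardingPolicy(p ForwardingPolicy) {
	cmd := &policyUpdate{
		policy: p,
		done:   make(chan struct{}),
	}

	select {
	case l.linkControl <- cmd:
		<-cmd.done
	case <-l.quit:
	}
}
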
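[The new started/shutdown fields back the idempotency guards in Start and
Stop; the guards themselves sit in context lines elided from the hunks above.
A minimal runnable sketch of the idiom, assuming the standard sync/atomic
compare-and-swap pattern:]

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type link struct {
	started  int32 // to be used atomically
	shutdown int32 // to be used atomically

	wg   sync.WaitGroup
	quit chan struct{}
}

func (l *link) Start() error {
	// Only the first caller flips started from 0 to 1; repeat calls
	// return immediately, making Start idempotent.
	if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
		return nil
	}

	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		<-l.quit // stand-in for the htlcManager loop
	}()
	return nil
}

func (l *link) Stop() {
	// Likewise, only the first Stop closes quit and waits.
	if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
		return
	}

	close(l.quit)
	l.wg.Wait()
}

func main() {
	l := &link{quit: make(chan struct{})}
	l.Start()
	l.Start() // no-op: second call is ignored
	l.Stop()
	l.Stop() // no-op as well
	fmt.Println("started and stopped exactly once")
}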