htlcswitch: move onion blob processing

Step №4 in making htlcManager (aka channelLink) testable:

This step consist of two:
1. Start using the hop iterator abstraction, the concrete
implementation of which will be added later, basically it will we the
same sphinx onion packet processor, but wrapped in hop iterator
abstraction.

2. The RevokAndAck processing part have been replaced by the
"processLockedInHtlcs" function which implement the same logic, but make
it a bit simpler.

Such changes will allow as to get rid of the the unnecessary variables.
This commit is contained in:
Andrey Samokhvalov 2017-05-03 18:57:13 +03:00 committed by Olaoluwa Osuntokun
parent de01721aed
commit 0e2209cb12

@ -13,7 +13,6 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/pkg/errors"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire"
@ -69,7 +68,13 @@ type channelLink struct {
// The index of the HTLC within the log is mapped to the cancellation
// reason. This value is used to thread the proper error through to the
// htlcSwitch, or subsystem that initiated the HTLC.
cancelReasons map[uint64]lnwire.FailCode
cancelReasons map[uint64]lnwire.OpaqueReason
// blobs tracks the remote log index of the incoming htlc's,
// mapped to the htlc onion blob which encapsulates the next hop.
// TODO(andrew.shvv) state machine might be used instead to determine
// the pending number of updates.
blobs map[uint64][lnwire.OnionPacketSize]byte
// pendingBatch is slice of payments which have been added to the
// channel update log, but not yet committed to latest commitment.
@ -385,16 +390,6 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// TODO(roasbeef): timeouts
// * fail if can't parse sphinx mix-header
case *lnwire.UpdateAddHTLC:
// Before adding the new HTLC to the state machine, parse the
// onion object in order to obtain the routing information.
blobReader := bytes.NewReader(msg.OnionBlob[:])
onionPkt := &sphinx.OnionPacket{}
if err := onionPkt.Decode(blobReader); err != nil {
log.Errorf("unable to decode onion pkt: %v", err)
l.cfg.Peer.Disconnect()
return
}
// We just received an add request from an upstream peer, so we
// add it to our state machine, then add the HTLC to our
// "settle" list in the event that we know the preimage
@ -409,67 +404,10 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// TODO(roasbeef): perform sanity checks on per-hop payload
// * time-lock is sane, fee, chain, etc
// Attempt to process the Sphinx packet. We include the payment
// hash of the HTLC as it's authenticated within the Sphinx
// packet itself as associated data in order to thwart attempts
// a replay attacks. In the case of a replay, an attacker is
// *forced* to use the same payment hash twice, thereby losing
// their money entirely.
rHash := msg.PaymentHash[:]
sphinxPacket, err := l.sphinx.ProcessOnionPacket(onionPkt, rHash)
if err != nil {
// If we're unable to parse the Sphinx packet, then
// we'll cancel the HTLC after the current commitment
// transition.
log.Errorf("unable to process onion pkt: %v",
err)
l.htlcsToCancel[index] = lnwire.SphinxParseError
return
}
switch sphinxPacket.Action {
// We're the designated payment destination. Therefore we
// attempt to see if we have an invoice locally which'll allow
// us to settle this HTLC.
case sphinx.ExitNode:
rHash := msg.PaymentHash
invoice, err := l.cfg.Registry.LookupInvoice(rHash)
if err != nil {
// If we're the exit node, but don't recognize
// the payment hash, then we'll fail the HTLC
// on the next state transition.
log.Errorf("unable to settle HTLC, "+
"payment hash (%x) unrecognized", rHash[:])
l.htlcsToCancel[index] = lnwire.UnknownPaymentHash
return
}
// If we're not currently in debug mode, and the
// extended HTLC doesn't meet the value requested, then
// we'll fail the HTLC.
if !l.cfg.DebugHTLC && msg.Amount < invoice.Terms.Value {
log.Errorf("rejecting HTLC due to incorrect "+
"amount: expected %v, received %v",
invoice.Terms.Value, msg.Amount)
l.htlcsToCancel[index] = lnwire.IncorrectValue
} else {
// Otherwise, everything is in order and we'll
// settle the HTLC after the current state
// transition.
l.htlcsToSettle[index] = invoice
}
// There are additional hops left within this route, so we
// track the next hop according to the index of this HTLC
// within their log. When forwarding locked-in HLTC's to the
// switch, we'll attach the routing information so the switch
// can finalize the circuit.
case sphinx.MoreHops:
l.pendingCircuits[index] = sphinxPacket
default:
log.Errorf("malformed onion packet")
l.htlcsToCancel[index] = lnwire.SphinxParseError
}
// Store the onion blob which encapsulate the htlc route and
// use in on stage of htlc inclusion to retrieve the
// next hope and propagate the htlc farther.
l.blobs[index] = msg.OnionBlob
case *lnwire.UpdateFufillHTLC:
pre := msg.PaymentPreimage
@ -493,7 +431,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
return
}
l.cancelReasons[idx] = lnwire.FailCode(msg.Reason[0])
l.cancelReasons[idx] = msg.Reason
case *lnwire.CommitSig:
// We just received a new update to our local commitment chain,
@ -536,167 +474,28 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// We've received a revocation from the remote chain, if valid,
// this moves the remote chain forward, and expands our
// revocation window.
htlcsToForward, err := l.channel.ReceiveRevocation(msg)
htlcs, err := l.channel.ReceiveRevocation(msg)
if err != nil {
log.Errorf("unable to accept revocation: %v", err)
l.cfg.Peer.Disconnect()
return
}
// If any of the HTLCs eligible for forwarding are pending
// settling or timing out previous outgoing payments, then we
// can them from the pending set, and signal the requester (if
// existing) that the payment has been fully fulfilled.
var bandwidthUpdate btcutil.Amount
settledPayments := make(map[lnwallet.PaymentHash]struct{})
cancelledHtlcs := make(map[uint64]struct{})
for _, htlc := range htlcsToForward {
parentIndex := htlc.ParentIndex
if p, ok := l.clearedHTCLs[parentIndex]; ok {
switch htlc.EntryType {
// If the HTLC was settled successfully, then
// we return a nil error as well as the payment
// preimage back to the possible caller.
case lnwallet.Settle:
p.preImage <- htlc.RPreimage
p.err <- nil
// Otherwise, the HTLC failed, so we propagate
// the error back to the potential caller.
case lnwallet.Fail:
errMsg := l.cancelReasons[parentIndex]
p.preImage <- [32]byte{}
p.err <- errors.New(errMsg.String())
}
close(p.done)
delete(l.clearedHTCLs, htlc.ParentIndex)
}
// TODO(roasbeef): rework log entries to a shared
// interface.
if htlc.EntryType != lnwallet.Add {
continue
}
// If we can settle this HTLC within our local state
// update log, then send the update entry to the remote
// party.
invoice, ok := l.htlcsToSettle[htlc.Index]
if ok {
preimage := invoice.Terms.PaymentPreimage
logIndex, err := l.channel.SettleHTLC(preimage)
if err != nil {
log.Errorf("unable to settle htlc: %v", err)
l.cfg.Peer.Disconnect()
continue
}
settleMsg := &lnwire.UpdateFufillHTLC{
ChanID: l.chanID,
ID: logIndex,
PaymentPreimage: preimage,
}
l.cfg.Peer.SendMessage(settleMsg)
delete(l.htlcsToSettle, htlc.Index)
settledPayments[htlc.RHash] = struct{}{}
bandwidthUpdate += htlc.Amount
continue
}
// Alternatively, if we marked this HTLC for
// cancellation, then immediately cancel the HTLC as
// it's now locked in within both commitment
// transactions.
reason, ok := l.htlcsToCancel[htlc.Index]
if !ok {
continue
}
logIndex, err := l.channel.FailHTLC(htlc.RHash)
if err != nil {
log.Errorf("unable to cancel htlc: %v", err)
l.cfg.Peer.Disconnect()
continue
}
cancelMsg := &lnwire.UpdateFailHTLC{
ChanID: l.chanID,
ID: logIndex,
Reason: []byte{byte(reason)},
}
l.cfg.Peer.SendMessage(cancelMsg)
delete(l.htlcsToCancel, htlc.Index)
cancelledHtlcs[htlc.Index] = struct{}{}
}
// After we treat HTLCs as included in both
// remote/local commitment transactions they might be
// safely propagated over htlc switch or settled if our node was
// last node in htlc path.
htlcsToForward := l.processLockedInHtlcs(htlcs)
go func() {
for _, htlc := range htlcsToForward {
// We don't need to forward any HTLCs that we
// just settled or cancelled above.
// TODO(roasbeef): key by index instead?
if _, ok := settledPayments[htlc.RHash]; ok {
continue
for _, packet := range htlcsToForward {
if err := l.cfg.Switch.forward(packet); err != nil {
log.Errorf("channel link(%v): "+
"unhandled error while forwarding "+
"htlc packet over htlc "+
"switch: %v", l, err)
}
if _, ok := cancelledHtlcs[htlc.Index]; ok {
continue
}
onionPkt := l.pendingCircuits[htlc.Index]
delete(l.pendingCircuits, htlc.Index)
reason := l.cancelReasons[htlc.ParentIndex]
delete(l.cancelReasons, htlc.ParentIndex)
// Send this fully activated HTLC to the htlc
// switch to continue the chained clear/settle.
pkt, err := logEntryToHtlcPkt(l.chanID,
htlc, onionPkt, reason)
if err != nil {
log.Errorf("unable to make htlc pkt: %v",
err)
continue
}
l.switchChan <- pkt
}
}()
if len(settledPayments) == 0 && len(cancelledHtlcs) == 0 {
return
}
// Send an update to the htlc switch of our newly available
// payment bandwidth.
// TODO(roasbeef): ideally should wait for next state update.
if bandwidthUpdate != 0 {
p.server.htlcSwitch.UpdateLink(state.chanID,
bandwidthUpdate)
}
// With all the settle updates added to the local and remote
// HTLC logs, initiate a state transition by updating the
// remote commitment chain.
if err := l.updateCommitTx(); err != nil {
log.Errorf("unable to update commitment: %v", err)
l.cfg.Peer.Disconnect()
return
}
// Notify the invoiceRegistry of the invoices we just settled
// with this latest commitment update.
// TODO(roasbeef): wait until next transition?
for invoice := range settledPayments {
err := l.cfg.Registry.SettleInvoice(chainhash.Hash(invoice))
if err != nil {
log.Errorf("unable to settle invoice: %v", err)
}
}
}
}
@ -865,3 +664,184 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
case <-l.quit:
}
}
// processLockedInHtlcs function is used to proceed the HTLCs which was
// designated as eligible for forwarding. But not all htlc will be
// forwarder, if htlc reached its final destination that we should settle it.
func (l *channelLink) processLockedInHtlcs(
paymentDescriptors []*lnwallet.PaymentDescriptor) []*htlcPacket {
var needUpdate bool
var packetsToForward []*htlcPacket
for _, pd := range paymentDescriptors {
// TODO(roasbeef): rework log entries to a shared
// interface.
switch pd.EntryType {
case lnwallet.Settle:
// forward message to switch which will decide does
// this peer is the final destination of htlc and we
// should notify user about successful income or it
// should be propagated back to the origin peer.
packetsToForward = append(packetsToForward,
newSettlePacket(l.ChanID(),
&lnwire.UpdateFufillHTLC{
PaymentPreimage: pd.RPreimage,
}, pd.RHash, pd.Amount))
case lnwallet.Fail:
opaqueReason := l.cancelReasons[pd.ParentIndex]
// forward message to switch which will decide does
// this peer is the final destination of htlc and we
// should notify user about fail income or it
// should be propagated back to the origin peer.
packetsToForward = append(packetsToForward,
newFailPacket(l.ChanID(),
&lnwire.UpdateFailHTLC{
Reason: opaqueReason,
ChanID: l.ChanID(),
}, pd.RHash, pd.Amount))
case lnwallet.Add:
blob := l.blobs[pd.Index]
buffer := bytes.NewBuffer(blob[:])
delete(l.blobs, pd.Index)
// Before adding the new htlc to the state machine,
// parse the onion object in order to obtain the routing
// information with DecodeOnion function which process
// the Sphinx packet.
// We include the payment hash of the htlc as it's
// authenticated within the Sphinx packet itself as
// associated data in order to thwart attempts a replay
// attacks. In the case of a replay, an attacker is
// *forced* to use the same payment hash twice, thereby
// losing their money entirely.
chanIterator, err := l.cfg.DecodeOnion(buffer, pd.RHash[:])
if err != nil {
// If we're unable to parse the Sphinx packet,
// then we'll cancel the htlc.
log.Errorf("unable to get the next hop: %v", err)
reason := []byte{byte(lnwire.SphinxParseError)}
l.sendHTLCError(pd.RHash, reason)
needUpdate = true
continue
}
if nextChan := chanIterator.Next(); nextChan != nil {
// There are additional channels left within this
// route.
var b bytes.Buffer
var blob [lnwire.OnionPacketSize]byte
err := chanIterator.Encode(&b)
if err != nil {
log.Errorf("unable to encode the "+
"remaining route %v", err)
reason := []byte{byte(lnwire.UnknownError)}
l.sendHTLCError(pd.RHash, reason)
needUpdate = true
continue
}
copy(blob[:], b.Bytes())
packetsToForward = append(packetsToForward,
newAddPacket(l.ChanID(), *nextChan,
&lnwire.UpdateAddHTLC{
Amount: pd.Amount,
PaymentHash: pd.RHash,
OnionBlob: blob,
}))
} else {
// We're the designated payment destination.
// Therefore we attempt to see if we have an
// invoice locally which'll allow us to settle
// this htlc.
invoiceHash := chainhash.Hash(pd.RHash)
invoice, err := l.cfg.Registry.LookupInvoice(invoiceHash)
if err != nil {
log.Errorf("unable to query to locate:"+
" %v", err)
reason := []byte{byte(lnwire.UnknownPaymentHash)}
l.sendHTLCError(pd.RHash, reason)
needUpdate = true
continue
}
// If we're not currently in debug mode, and the
// extended htlc doesn't meet the value requested,
// then we'll fail the htlc. Otherwise, we settle
// this htlc within our local state update log,
// then send the update entry to the remote party.
if !l.cfg.DebugHTLC && pd.Amount < invoice.Terms.Value {
log.Errorf("rejecting htlc due to incorrect "+
"amount: expected %v, received %v",
invoice.Terms.Value, pd.Amount)
reason := []byte{byte(lnwire.IncorrectValue)}
l.sendHTLCError(pd.RHash, reason)
needUpdate = true
continue
}
preimage := invoice.Terms.PaymentPreimage
logIndex, err := l.channel.SettleHTLC(preimage)
if err != nil {
log.Errorf("unable to settle htlc: %v", err)
l.cfg.Peer.Disconnect()
return nil
}
// Notify the invoiceRegistry of the invoices we
// just settled with this latest commitment
// update.
err = l.cfg.Registry.SettleInvoice(invoiceHash)
if err != nil {
log.Errorf("unable to settle invoice: %v", err)
l.cfg.Peer.Disconnect()
return nil
}
// htlc was successfully settled locally send
// notification about it remote peer.
l.cfg.Peer.SendMessage(&lnwire.UpdateFufillHTLC{
ChanID: l.ChanID(),
ID: logIndex,
PaymentPreimage: preimage,
})
needUpdate = true
}
}
}
if needUpdate {
// With all the settle/cancel updates added to the local and
// remote htlc logs, initiate a state transition by updating the
// remote commitment chain.
if err := l.updateCommitTx(); err != nil {
log.Errorf("unable to update commitment: %v", err)
l.cfg.Peer.Disconnect()
return nil
}
}
return packetsToForward
}
// sendHTLCError functions cancels htlc and send cancel message back to the
// peer from which htlc was received.
func (l *channelLink) sendHTLCError(rHash [32]byte,
reason lnwire.OpaqueReason) {
index, err := l.channel.FailHTLC(rHash)
if err != nil {
log.Errorf("unable cancel htlc: %v", err)
return
}
l.cfg.Peer.SendMessage(&lnwire.UpdateFailHTLC{
ChanID: l.ChanID(),
ID: index,
Reason: reason,
})
}