lnd: add support for multi-hop (Sphinx) onion routed payments
This commit adds full support for multi-hop onion routed payments within the daemon. The switch has been greatly extended in order to gain the functionality required to manage Sphinx payment circuits amongst active links. A payment circuit is initiated when a link sends an HTLC add to the downstream htlcSwitch it received from the upstream peer. The switch then examines the parsed sphinx packet to set up the clear/settle ends of the circuit. Created circuits can be re-used amongst HTLC payments which share the same RHash. All bandwidth updates within a link’s internal state are now managed with atomic increments/decrements in order to avoid race conditions amongst the two goroutines the switch currently uses. Each channel’s htlcManager has also been extended to parse out the next-hop contained within Sphinx packets, and construct a proper htlcPkt such that the htlcSwitch can initiate then manage the payment circuit.
This commit is contained in:
parent
17249316d6
commit
e29d8e7e06
255
htlcswitch.go
255
htlcswitch.go
@ -7,6 +7,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"golang.org/x/crypto/ripemd160"
|
||||
|
||||
"github.com/btcsuite/fastsha256"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lightning-onion"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnrpc"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
@ -28,7 +33,7 @@ const (
|
||||
type link struct {
|
||||
capacity btcutil.Amount
|
||||
|
||||
availableBandwidth btcutil.Amount
|
||||
availableBandwidth int64 // atomic
|
||||
|
||||
linkChan chan *htlcPacket
|
||||
|
||||
@ -41,15 +46,54 @@ type link struct {
|
||||
// settles an active HTLC. The dest field denotes the name of the interface to
|
||||
// forward this htlcPacket on.
|
||||
type htlcPacket struct {
|
||||
src wire.ShaHash
|
||||
sync.RWMutex
|
||||
|
||||
dest wire.ShaHash
|
||||
|
||||
index uint32
|
||||
srcLink wire.OutPoint
|
||||
onion *sphinx.ProcessedPacket
|
||||
|
||||
msg lnwire.Message
|
||||
amt btcutil.Amount
|
||||
|
||||
err chan error
|
||||
}
|
||||
|
||||
// circuitKey uniquely identifies an active Sphinx (onion routing) circuit
|
||||
// between two open channels. Currently, the rHash of the HTLC which created
|
||||
// the circuit is used to uniquely identify each circuit.
|
||||
type circuitKey [32]byte
|
||||
|
||||
// paymentCircuit represents an active Sphinx (onion routing) circuit between
|
||||
// two active links within the htlcSwitch. A payment circuit is created once a
|
||||
// link forwards an HTLC add request which initites the creation of the ciruit.
|
||||
// The onion routing informtion contained within this message is used to
|
||||
// identify the settle/clear ends of the circuit. A circuit may be re-used (not
|
||||
// torndown) in the case that multiple HTLC's with the send RHash are sent.
|
||||
type paymentCircuit struct {
|
||||
// TODO(roasbeef): add reference count so know when to delete?
|
||||
// * atomic int re
|
||||
// * due to same r-value being re-used?
|
||||
|
||||
// NOTE: This integer must be used *atomically*.
|
||||
refCount uint32
|
||||
|
||||
// clear is the link the htlcSwitch will forward the HTLC add message
|
||||
// that initiated the circuit to. Once the message is forwarded, the
|
||||
// payment circuit is considered "active" from the POV of the switch as
|
||||
// both the incoming/outgoing channels have the cleared HTLC within
|
||||
// their latest state.
|
||||
clear *link
|
||||
|
||||
// settle is the link the htlcSwitch will forward the HTLC settle it
|
||||
// receives from the outgoing peer to. Once the switch forwards the
|
||||
// settle message to this link, the payment circuit is considered
|
||||
// complete unless the reference count on the circuit is greater than
|
||||
// 1.
|
||||
settle *link
|
||||
}
|
||||
|
||||
// HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's.
|
||||
// Connected peers with active channels are treated as named interfaces which
|
||||
// refer to active channels as links. A link is the switche's message
|
||||
@ -59,6 +103,7 @@ type htlcPacket struct {
|
||||
// HTLC's, forwarding HTLC's initiated from within the daemon, and additionally
|
||||
// splitting up incoming/outgoing HTLC's to a particular interface amongst many
|
||||
// links (payment fragmentation).
|
||||
// TODO(roasbeef): active sphinx circuits need to be synced to disk
|
||||
type htlcSwitch struct {
|
||||
started int32 // atomic
|
||||
shutdown int32 // atomic
|
||||
@ -66,17 +111,38 @@ type htlcSwitch struct {
|
||||
// chanIndex maps a channel's outpoint to a link which contains
|
||||
// additional information about the channel, and additionally houses a
|
||||
// pointer to the peer mangaing the channel.
|
||||
chanIndex map[wire.OutPoint]*link
|
||||
chanIndexMtx sync.RWMutex
|
||||
chanIndex map[wire.OutPoint]*link
|
||||
|
||||
// interfaces maps a node's ID to the set of links (active channels) we
|
||||
// currently have open with that peer.
|
||||
interfaces map[wire.ShaHash][]*link
|
||||
// TODO(roasbeef): combine w/ onionIndex?
|
||||
interfaceMtx sync.RWMutex
|
||||
interfaces map[wire.ShaHash][]*link
|
||||
|
||||
// TODO(roasbeef): msgs for dynamic link quality
|
||||
// onionIndex is a secondary index used to properly forward a message
|
||||
// to the next hop within a Sphinx circuit.
|
||||
onionMtx sync.RWMutex
|
||||
onionIndex map[[ripemd160.Size]byte][]*link
|
||||
|
||||
// paymentCircuits maps a circuit key to an active payment circuit
|
||||
// amongst two oepn channels. This map is used to properly clear/settle
|
||||
// onion routed payments within the network.
|
||||
paymentCircuits map[circuitKey]*paymentCircuit
|
||||
|
||||
// linkControl is a channel used by connected links to notify the
|
||||
// switch of a non-multi-hop triggered link state update.
|
||||
linkControl chan interface{}
|
||||
|
||||
// outgoingPayments is a channel that outgoing payments initiated by
|
||||
// the RPC system.
|
||||
outgoingPayments chan *htlcPacket
|
||||
|
||||
// htlcPlex is the channel in which all connected links use to
|
||||
// coordinate the setup/tear down of Sphinx (onion routing) payment
|
||||
// circuits. Active links forward any add/settle messages over this
|
||||
// channel each state transition, sending new adds/settles which are
|
||||
// fully locked in.
|
||||
htlcPlex chan *htlcPacket
|
||||
|
||||
// TODO(roasbeef): messaging chan to/from upper layer (routing - L3)
|
||||
@ -92,6 +158,8 @@ func newHtlcSwitch() *htlcSwitch {
|
||||
return &htlcSwitch{
|
||||
chanIndex: make(map[wire.OutPoint]*link),
|
||||
interfaces: make(map[wire.ShaHash][]*link),
|
||||
onionIndex: make(map[[ripemd160.Size]byte][]*link),
|
||||
paymentCircuits: make(map[circuitKey]*paymentCircuit),
|
||||
linkControl: make(chan interface{}),
|
||||
htlcPlex: make(chan *htlcPacket, htlcQueueSize),
|
||||
outgoingPayments: make(chan *htlcPacket, htlcQueueSize),
|
||||
@ -141,11 +209,13 @@ func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) error {
|
||||
// fragmenting) incoming/outgoing HTLC's amongst all active interfaces and
|
||||
// their links. The duties of the forwarder are similar to that of a network
|
||||
// switch, in that it facilitates multi-hop payments by acting as a central
|
||||
// messaging bus. Each active channel is modeled as networked device with
|
||||
// meta-data such as the available payment bandwidth, and total link capacity.
|
||||
// messaging bus. The switch communicates will active links to create, manage,
|
||||
// and tearn down active onion routed payments.Each active channel is modeled
|
||||
// as networked device with meta-data such as the available payment bandwidth,
|
||||
// and total link capacity.
|
||||
func (h *htlcSwitch) htlcForwarder() {
|
||||
// TODO(roasbeef): track pending payments here instead of within each peer?
|
||||
// Examine settles/timeouts from htl cplex. Add src to htlcPacket, key by
|
||||
// Examine settles/timeouts from htlcPlex. Add src to htlcPacket, key by
|
||||
// (src, htlcKey).
|
||||
|
||||
// TODO(roasbeef): cleared vs settled distinction
|
||||
@ -157,9 +227,12 @@ out:
|
||||
select {
|
||||
case htlcPkt := <-h.outgoingPayments:
|
||||
dest := htlcPkt.dest
|
||||
h.interfaceMtx.RLock()
|
||||
chanInterface, ok := h.interfaces[dest]
|
||||
h.interfaceMtx.RUnlock()
|
||||
if !ok {
|
||||
err := fmt.Errorf("Unable to locate link %x", dest)
|
||||
err := fmt.Errorf("Unable to locate link %x",
|
||||
dest[:])
|
||||
hswcLog.Errorf(err.Error())
|
||||
htlcPkt.err <- err
|
||||
continue
|
||||
@ -171,48 +244,138 @@ out:
|
||||
// Handle this send request in a distinct goroutine in
|
||||
// order to avoid a possible deadlock between the htlc
|
||||
// switch and channel's htlc manager.
|
||||
var sent bool
|
||||
for _, link := range chanInterface {
|
||||
// TODO(roasbeef): implement HTLC fragmentation
|
||||
// * avoid full channel depletion at higher
|
||||
// level (here) instead of within state
|
||||
// machine?
|
||||
if link.availableBandwidth < amt {
|
||||
if link.availableBandwidth < int64(amt) {
|
||||
continue
|
||||
}
|
||||
|
||||
hswcLog.Tracef("Sending %v to %x", amt, dest[:])
|
||||
|
||||
// TODO(roasbeef): peer downstream should set chanPoint
|
||||
wireMsg.ChannelPoint = link.chanPoint
|
||||
go func() {
|
||||
link.linkChan <- htlcPkt
|
||||
}()
|
||||
|
||||
// TODO(roasbeef): update link info on
|
||||
// timeout/settle
|
||||
link.availableBandwidth -= amt
|
||||
sent = true
|
||||
}
|
||||
n := atomic.AddInt64(&link.availableBandwidth,
|
||||
-int64(amt))
|
||||
hswcLog.Tracef("Decrementing link %v bandwidth to %v",
|
||||
link.chanPoint, n)
|
||||
|
||||
if sent {
|
||||
continue
|
||||
continue out
|
||||
}
|
||||
|
||||
hswcLog.Errorf("Unable to send payment, insufficient capacity")
|
||||
htlcPkt.err <- fmt.Errorf("Insufficient capacity")
|
||||
case pkt := <-h.htlcPlex:
|
||||
numUpdates += 1
|
||||
// TODO(roasbeef): properly account with cleared vs settled
|
||||
switch pkt.msg.(type) {
|
||||
numUpdates += 1
|
||||
|
||||
hswcLog.Tracef("plex packet: %v", newLogClosure(func() string {
|
||||
return spew.Sdump(pkt)
|
||||
}))
|
||||
|
||||
switch wireMsg := pkt.msg.(type) {
|
||||
// A link has just forwarded us a new HTLC, therefore
|
||||
// we initiate the payment circuit within our internal
|
||||
// staate so we can properly forward the ultimate
|
||||
// settle message.
|
||||
case *lnwire.HTLCAddRequest:
|
||||
// Create the two ends of the payment circuit
|
||||
// required to ensure completion of this new
|
||||
// payment.
|
||||
nextHop := pkt.onion.NextHop
|
||||
h.onionMtx.RLock()
|
||||
clearLink, ok := h.onionIndex[nextHop]
|
||||
h.onionMtx.RUnlock()
|
||||
if !ok {
|
||||
hswcLog.Errorf("unable to find dest end of "+
|
||||
"circuit: %x", nextHop)
|
||||
continue
|
||||
}
|
||||
|
||||
h.chanIndexMtx.RLock()
|
||||
settleLink := h.chanIndex[pkt.srcLink]
|
||||
h.chanIndexMtx.RUnlock()
|
||||
|
||||
// TODO(roasbeef): examine per-hop info to decide on link?
|
||||
// * check clear has enough available sat
|
||||
circuit := &paymentCircuit{
|
||||
clear: clearLink[0],
|
||||
settle: settleLink,
|
||||
}
|
||||
|
||||
cKey := circuitKey(wireMsg.RedemptionHashes[0])
|
||||
h.paymentCircuits[cKey] = circuit
|
||||
|
||||
hswcLog.Debugf("Creating onion circuit for %x: %v<->%v",
|
||||
cKey[:], clearLink[0].chanPoint,
|
||||
settleLink.chanPoint)
|
||||
|
||||
// With the circuit initiated, send the htlcPkt
|
||||
// to the clearing link within the circuit to
|
||||
// continue propagating the HTLC accross the
|
||||
// network.
|
||||
circuit.clear.linkChan <- &htlcPacket{
|
||||
msg: wireMsg,
|
||||
err: make(chan error, 1),
|
||||
}
|
||||
|
||||
// Reduce the available bandwidth for the link
|
||||
// as it will clear the above HTLC, increasing
|
||||
// the limbo balance within the channel.
|
||||
n :=
|
||||
atomic.AddInt64(&circuit.clear.availableBandwidth,
|
||||
-int64(pkt.amt))
|
||||
hswcLog.Tracef("Decrementing link %v bandwidth to %v",
|
||||
circuit.clear.chanPoint, n)
|
||||
|
||||
satRecv += pkt.amt
|
||||
|
||||
// We've just received a settle message which means we
|
||||
// can finalize the payment circuit by forwarding the
|
||||
// settle msg to the link which initially created the
|
||||
// circuit.
|
||||
case *lnwire.HTLCSettleRequest:
|
||||
rHash := fastsha256.Sum256(wireMsg.RedemptionProofs[0][:])
|
||||
|
||||
var cKey circuitKey
|
||||
copy(cKey[:], rHash[:])
|
||||
|
||||
// If we initiated the payment then there won't
|
||||
// be an active circuit so continue propagating
|
||||
// the settle over. Therefore, we exit early.
|
||||
circuit, ok := h.paymentCircuits[cKey]
|
||||
if !ok {
|
||||
hswcLog.Debugf("No existing circuit "+
|
||||
"for %x", rHash[:])
|
||||
satSent += pkt.amt
|
||||
continue
|
||||
}
|
||||
|
||||
hswcLog.Debugf("Closing completed onion "+
|
||||
"circuit for %x: %v<->%v", rHash[:],
|
||||
circuit.clear.chanPoint,
|
||||
circuit.settle.chanPoint)
|
||||
|
||||
circuit.settle.linkChan <- &htlcPacket{
|
||||
msg: wireMsg,
|
||||
err: make(chan error, 1),
|
||||
}
|
||||
|
||||
// Increase the available bandwidth for the
|
||||
// link as it will settle the above HTLC,
|
||||
// subtracting from the limbo balacne and
|
||||
// incrementing its local balance.
|
||||
n := atomic.AddInt64(&circuit.settle.availableBandwidth,
|
||||
int64(pkt.amt))
|
||||
hswcLog.Tracef("Incrementing link %v bandwidth to %v",
|
||||
circuit.settle.chanPoint, n)
|
||||
|
||||
satSent += pkt.amt
|
||||
}
|
||||
|
||||
// TODO(roasbeef): parse dest/src, forward on outgoing
|
||||
// link to complete multi-hop payments.
|
||||
case <-logTicker.C:
|
||||
if numUpdates == 0 {
|
||||
continue
|
||||
@ -264,18 +427,32 @@ func (h *htlcSwitch) handleRegisterLink(req *registerLinkMsg) {
|
||||
chanPoint := req.linkInfo.ChannelPoint
|
||||
newLink := &link{
|
||||
capacity: req.linkInfo.Capacity,
|
||||
availableBandwidth: req.linkInfo.LocalBalance,
|
||||
availableBandwidth: int64(req.linkInfo.LocalBalance),
|
||||
linkChan: req.linkChan,
|
||||
peer: req.peer,
|
||||
chanPoint: chanPoint,
|
||||
}
|
||||
|
||||
h.chanIndexMtx.Lock()
|
||||
h.chanIndex[*chanPoint] = newLink
|
||||
h.chanIndexMtx.Unlock()
|
||||
|
||||
interfaceID := req.peer.lightningID
|
||||
h.interfaces[interfaceID] = append(h.interfaces[interfaceID], newLink)
|
||||
|
||||
hswcLog.Infof("registering new link, interface=%v, chan_point=%v, capacity=%v",
|
||||
hex.EncodeToString(interfaceID[:]), chanPoint, newLink.capacity)
|
||||
h.interfaceMtx.Lock()
|
||||
h.interfaces[interfaceID] = append(h.interfaces[interfaceID], newLink)
|
||||
h.interfaceMtx.Unlock()
|
||||
|
||||
var onionId [ripemd160.Size]byte
|
||||
copy(onionId[:], btcutil.Hash160(req.peer.identityPub.SerializeCompressed()))
|
||||
|
||||
h.onionMtx.Lock()
|
||||
h.onionIndex[onionId] = h.interfaces[interfaceID]
|
||||
h.onionMtx.Unlock()
|
||||
|
||||
hswcLog.Infof("registering new link, interface=%x, onion_link=%x, "+
|
||||
"chan_point=%v, capacity=%v", interfaceID[:], onionId,
|
||||
chanPoint, newLink.capacity)
|
||||
|
||||
if req.done != nil {
|
||||
req.done <- struct{}{}
|
||||
@ -290,7 +467,10 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
|
||||
hex.EncodeToString(req.chanInterface[:]), req.chanPoint)
|
||||
|
||||
chanInterface := req.chanInterface
|
||||
|
||||
h.interfaceMtx.RLock()
|
||||
links := h.interfaces[chanInterface]
|
||||
h.interfaceMtx.RUnlock()
|
||||
|
||||
// A request with a nil channel point indicates that all the current
|
||||
// links for this channel should be cleared.
|
||||
@ -299,11 +479,15 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
|
||||
hex.EncodeToString(chanInterface[:]))
|
||||
|
||||
for _, link := range links {
|
||||
h.chanIndexMtx.Lock()
|
||||
delete(h.chanIndex, *link.chanPoint)
|
||||
h.chanIndexMtx.Unlock()
|
||||
}
|
||||
links = nil
|
||||
} else {
|
||||
h.chanIndexMtx.Lock()
|
||||
delete(h.chanIndex, *req.chanPoint)
|
||||
h.chanIndexMtx.Unlock()
|
||||
|
||||
for i := 0; i < len(links); i++ {
|
||||
chanLink := links[i]
|
||||
@ -317,10 +501,15 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(roasbeef): clean up/modify onion links
|
||||
// * just have the interfaces index be keyed on hash160?
|
||||
|
||||
if len(links) == 0 {
|
||||
hswcLog.Infof("interface %v has no active links, destroying",
|
||||
hex.EncodeToString(chanInterface[:]))
|
||||
h.interfaceMtx.Lock()
|
||||
delete(h.interfaces, chanInterface)
|
||||
h.interfaceMtx.Unlock()
|
||||
}
|
||||
|
||||
if req.done != nil {
|
||||
@ -331,7 +520,10 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
|
||||
// handleCloseLink sends a message to the peer responsible for the target
|
||||
// channel point, instructing it to initiate a cooperative channel closure.
|
||||
func (h *htlcSwitch) handleCloseLink(req *closeLinkReq) {
|
||||
h.chanIndexMtx.RLock()
|
||||
targetLink, ok := h.chanIndex[*req.chanPoint]
|
||||
h.chanIndexMtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
req.err <- fmt.Errorf("channel point %v not found", req.chanPoint)
|
||||
return
|
||||
@ -345,8 +537,11 @@ func (h *htlcSwitch) handleCloseLink(req *closeLinkReq) {
|
||||
// handleLinkUpdate processes the link info update message by adjusting the
|
||||
// channels available bandwidth by the delta specified within the message.
|
||||
func (h *htlcSwitch) handleLinkUpdate(req *linkInfoUpdateMsg) {
|
||||
h.chanIndexMtx.RLock()
|
||||
link := h.chanIndex[*req.targetLink]
|
||||
link.availableBandwidth += req.bandwidthDelta
|
||||
h.chanIndexMtx.RUnlock()
|
||||
|
||||
atomic.AddInt64(&link.availableBandwidth, int64(req.bandwidthDelta))
|
||||
|
||||
hswcLog.Tracef("adjusting bandwidth of link %v by %v", req.targetLink,
|
||||
req.bandwidthDelta)
|
||||
|
@ -110,7 +110,7 @@ func (i *invoiceRegistry) LookupInvoice(rHash wire.ShaHash) (*channeldb.Invoice,
|
||||
// dbueg invoice, then this method is a nooop as debug invoices are never fully
|
||||
// settled.
|
||||
func (i *invoiceRegistry) SettleInvoice(rHash wire.ShaHash) error {
|
||||
ltndLog.Debugf("Setting invoice %x", rHash[:])
|
||||
ltndLog.Debugf("Settling invoice %x", rHash[:])
|
||||
|
||||
// First check the in-memory debug invoice index to see if this is an
|
||||
// existing invoice added for debugging.
|
||||
|
156
peer.go
156
peer.go
@ -891,6 +891,14 @@ type commitmentState struct {
|
||||
// within HTLC add messages.
|
||||
sphinx *sphinx.Router
|
||||
|
||||
// pendingCircuits tracks the remote log index of the incoming HTLC's,
|
||||
// mapped to the processed Sphinx packet contained within the HTLC.
|
||||
// This map is used as a staging area between when an HTLC is added to
|
||||
// the log, and when it's locked into the commitment state of both
|
||||
// chains. Once locked in, the processed packet is sent to the switch
|
||||
// along with the HTLC to forward the packet to the next hop.
|
||||
pendingCircuits map[uint32]*sphinx.ProcessedPacket
|
||||
|
||||
channel *lnwallet.LightningChannel
|
||||
chanPoint *wire.OutPoint
|
||||
}
|
||||
@ -926,12 +934,13 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
|
||||
}
|
||||
|
||||
state := &commitmentState{
|
||||
channel: channel,
|
||||
chanPoint: channel.ChannelPoint(),
|
||||
clearedHTCLs: make(map[uint32]*pendingPayment),
|
||||
htlcsToSettle: make(map[uint32]*channeldb.Invoice),
|
||||
sphinx: p.server.sphinx,
|
||||
switchChan: htlcPlex,
|
||||
channel: channel,
|
||||
chanPoint: channel.ChannelPoint(),
|
||||
clearedHTCLs: make(map[uint32]*pendingPayment),
|
||||
htlcsToSettle: make(map[uint32]*channeldb.Invoice),
|
||||
pendingCircuits: make(map[uint32]*sphinx.ProcessedPacket),
|
||||
sphinx: p.server.sphinx,
|
||||
switchChan: htlcPlex,
|
||||
}
|
||||
|
||||
// TODO(roasbeef): check to see if able to settle any currently pending
|
||||
@ -1022,12 +1031,14 @@ out:
|
||||
// HTLC's, timeout previously cleared HTLC's, and finally to settle currently
|
||||
// cleared HTLC's with the upstream peer.
|
||||
func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
|
||||
var isSettle bool
|
||||
switch htlc := pkt.msg.(type) {
|
||||
case *lnwire.HTLCAddRequest:
|
||||
// A new payment has been initiated via the
|
||||
// downstream channel, so we add the new HTLC
|
||||
// to our local log, then update the commitment
|
||||
// chains.
|
||||
htlc.ChannelPoint = state.chanPoint
|
||||
index := state.channel.AddHTLC(htlc)
|
||||
p.queueMsg(htlc, nil)
|
||||
|
||||
@ -1037,21 +1048,37 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
|
||||
err: pkt.err,
|
||||
})
|
||||
|
||||
// If this newly added update exceeds the max batch size, the
|
||||
// initiate an update.
|
||||
// TODO(roasbeef): enforce max HTLC's in flight limit
|
||||
if len(state.pendingBatch) >= 10 {
|
||||
if sent, err := p.updateCommitTx(state); err != nil {
|
||||
peerLog.Errorf("unable to update "+
|
||||
"commitment: %v", err)
|
||||
p.Disconnect()
|
||||
return
|
||||
} else if !sent {
|
||||
return
|
||||
}
|
||||
|
||||
state.numUnAcked += 1
|
||||
case *lnwire.HTLCSettleRequest:
|
||||
pre := htlc.RedemptionProofs[0]
|
||||
logIndex, err := state.channel.SettleHTLC(pre)
|
||||
if err != nil {
|
||||
// TODO(roasbeef): broadcast on-chain
|
||||
peerLog.Errorf("settle for incoming HTLC rejected: %v", err)
|
||||
p.Disconnect()
|
||||
return
|
||||
}
|
||||
|
||||
htlc.ChannelPoint = state.chanPoint
|
||||
htlc.HTLCKey = lnwire.HTLCKey(logIndex)
|
||||
|
||||
p.queueMsg(htlc, nil)
|
||||
isSettle = true
|
||||
}
|
||||
|
||||
// If this newly added update exceeds the max batch size for adds, or
|
||||
// this is a settle request, then initiate an update.
|
||||
// TODO(roasbeef): enforce max HTLC's in flight limit
|
||||
if len(state.pendingBatch) >= 10 || isSettle {
|
||||
if sent, err := p.updateCommitTx(state); err != nil {
|
||||
peerLog.Errorf("unable to update "+
|
||||
"commitment: %v", err)
|
||||
p.Disconnect()
|
||||
return
|
||||
} else if !sent {
|
||||
return
|
||||
}
|
||||
|
||||
state.numUnAcked += 1
|
||||
}
|
||||
}
|
||||
|
||||
@ -1072,7 +1099,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
|
||||
p.Disconnect()
|
||||
return
|
||||
}
|
||||
mixHeader, err := state.sphinx.ProcessOnionPacket(onionPkt)
|
||||
sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt)
|
||||
if err != nil {
|
||||
peerLog.Errorf("unable to process onion pkt: %v", err)
|
||||
p.Disconnect()
|
||||
@ -1084,10 +1111,10 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
|
||||
// "settle" list in the event that we know the pre-image
|
||||
index := state.channel.ReceiveHTLC(htlcPkt)
|
||||
|
||||
switch mixHeader.Action {
|
||||
switch sphinxPacket.Action {
|
||||
// We're the designated payment destination. Therefore we
|
||||
// attempt to see if we have an invoice locally which'll
|
||||
// allow us to settle this HTLC.
|
||||
// attempt to see if we have an invoice locally which'll allow
|
||||
// us to settle this HTLC.
|
||||
case sphinx.ExitNode:
|
||||
rHash := htlcPkt.RedemptionHashes[0]
|
||||
invoice, err := p.server.invoices.LookupInvoice(rHash)
|
||||
@ -1100,11 +1127,15 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
|
||||
|
||||
// TODO(roasbeef): check values accept if >=
|
||||
state.htlcsToSettle[index] = invoice
|
||||
return
|
||||
|
||||
// There are additional hops left within this route, so we
|
||||
// track the next hop according to the index of this HTLC
|
||||
// within their log. When forwarding locked-in HLTC's to the
|
||||
// switch, we'll attach the routing information so the switch
|
||||
// can finalize the circuit.
|
||||
case sphinx.MoreHops:
|
||||
// TODO(roasbeef): parse out the next dest so can
|
||||
// attach to packet when forwarding.
|
||||
// * send cancel + error if not in rounting table
|
||||
// TODO(roasbeef): send cancel + error if not in rounting table
|
||||
state.pendingCircuits[index] = sphinxPacket
|
||||
default:
|
||||
peerLog.Errorf("mal formed onion packet")
|
||||
p.Disconnect()
|
||||
@ -1163,25 +1194,12 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
|
||||
return
|
||||
}
|
||||
|
||||
// We perform the HTLC forwarding to the switch in a distinct
|
||||
// goroutine in order not to block the post-processing of
|
||||
// HTLC's that are eligble for forwarding.
|
||||
// TODO(roasbeef): don't forward if we're going to settle them
|
||||
go func() {
|
||||
for _, htlc := range htlcsToForward {
|
||||
// Send this fully activated HTLC to the htlc
|
||||
// switch to continue the chained clear/settle.
|
||||
state.switchChan <- p.logEntryToHtlcPkt(htlc)
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
// If any of the htlc's eligible for forwarding are pending
|
||||
// settling or timeing out previous outgoing payments, then we
|
||||
// can them from the pending set, and signal the requster (if
|
||||
// existing) that the payment has been fully fulfilled.
|
||||
var bandwidthUpdate btcutil.Amount
|
||||
var settledPayments []wire.ShaHash
|
||||
settledPayments := make(map[lnwallet.PaymentHash]struct{})
|
||||
numSettled := 0
|
||||
for _, htlc := range htlcsToForward {
|
||||
if p, ok := state.clearedHTCLs[htlc.ParentIndex]; ok {
|
||||
@ -1222,12 +1240,37 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
|
||||
delete(state.htlcsToSettle, htlc.Index)
|
||||
|
||||
bandwidthUpdate += invoice.Terms.Value
|
||||
settledPayments = append(settledPayments,
|
||||
wire.ShaHash(htlc.RHash))
|
||||
settledPayments[htlc.RHash] = struct{}{}
|
||||
|
||||
numSettled++
|
||||
}
|
||||
|
||||
go func() {
|
||||
for _, htlc := range htlcsToForward {
|
||||
// We don't need to forward any HTLC's that we
|
||||
// just settled above.
|
||||
if _, ok := settledPayments[htlc.RHash]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
onionPkt := state.pendingCircuits[htlc.Index]
|
||||
delete(state.pendingCircuits, htlc.Index)
|
||||
|
||||
// Send this fully activated HTLC to the htlc
|
||||
// switch to continue the chained clear/settle.
|
||||
pkt, err := logEntryToHtlcPkt(*state.chanPoint,
|
||||
htlc, onionPkt)
|
||||
if err != nil {
|
||||
peerLog.Errorf("unable to make htlc pkt: %v",
|
||||
err)
|
||||
continue
|
||||
}
|
||||
|
||||
state.switchChan <- pkt
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
if numSettled == 0 {
|
||||
return
|
||||
}
|
||||
@ -1254,8 +1297,8 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
|
||||
|
||||
// Notify the invoiceRegistry of the invoices we just settled
|
||||
// with this latest commitment update.
|
||||
for _, invoice := range settledPayments {
|
||||
err := p.server.invoices.SettleInvoice(invoice)
|
||||
for invoice, _ := range settledPayments {
|
||||
err := p.server.invoices.SettleInvoice(wire.ShaHash(invoice))
|
||||
if err != nil {
|
||||
peerLog.Errorf("unable to settle invoice: %v", err)
|
||||
}
|
||||
@ -1305,7 +1348,10 @@ func (p *peer) updateCommitTx(state *commitmentState) (bool, error) {
|
||||
// log entry the corresponding htlcPacket with src/dest set along with the
|
||||
// proper wire message. This helepr method is provided in order to aide an
|
||||
// htlcManager in forwarding packets to the htlcSwitch.
|
||||
func (p *peer) logEntryToHtlcPkt(pd *lnwallet.PaymentDescriptor) *htlcPacket {
|
||||
func logEntryToHtlcPkt(chanPoint wire.OutPoint,
|
||||
pd *lnwallet.PaymentDescriptor,
|
||||
onionPkt *sphinx.ProcessedPacket) (*htlcPacket, error) {
|
||||
|
||||
pkt := &htlcPacket{}
|
||||
|
||||
// TODO(roasbeef): alter after switch to log entry interface
|
||||
@ -1313,23 +1359,29 @@ func (p *peer) logEntryToHtlcPkt(pd *lnwallet.PaymentDescriptor) *htlcPacket {
|
||||
switch pd.EntryType {
|
||||
case lnwallet.Add:
|
||||
// TODO(roasbeef): timeout, onion blob, etc
|
||||
var b bytes.Buffer
|
||||
if err := onionPkt.Packet.Encode(&b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
msg = &lnwire.HTLCAddRequest{
|
||||
Amount: lnwire.CreditsAmount(pd.Amount),
|
||||
RedemptionHashes: [][32]byte{pd.RHash},
|
||||
OnionBlob: b.Bytes(),
|
||||
}
|
||||
case lnwallet.Settle:
|
||||
// TODO(roasbeef): thread through preimage
|
||||
msg = &lnwire.HTLCSettleRequest{
|
||||
HTLCKey: lnwire.HTLCKey(pd.ParentIndex),
|
||||
RedemptionProofs: [][32]byte{pd.RPreimage},
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(roasbeef): set dest via onion blob or state
|
||||
pkt.amt = pd.Amount
|
||||
pkt.msg = msg
|
||||
pkt.src = p.lightningID
|
||||
|
||||
return pkt
|
||||
pkt.srcLink = chanPoint
|
||||
pkt.onion = onionPkt
|
||||
|
||||
return pkt, nil
|
||||
}
|
||||
|
||||
// TODO(roasbeef): make all start/stop mutexes a CAS
|
||||
|
Loading…
Reference in New Issue
Block a user