diff --git a/htlcswitch.go b/htlcswitch.go index 05fd7b6b..829db4c4 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -41,7 +41,7 @@ type link struct { type htlcPacket struct { dest wire.ShaHash - htlc lnwire.Message + msg lnwire.Message } // HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's. @@ -65,6 +65,8 @@ type htlcSwitch struct { outgoingPayments chan *htlcPacket + htlcPlex chan *htlcPacket + // TODO(roasbeef): messaging chan to/from upper layer (routing - L3) wg sync.WaitGroup @@ -77,6 +79,7 @@ func newHtlcSwitch() *htlcSwitch { chanIndex: make(map[wire.OutPoint]*link), interfaces: make(map[wire.ShaHash][]*link), linkControl: make(chan interface{}), + htlcPlex: make(chan *htlcPacket, htlcQueueSize), outgoingPayments: make(chan *htlcPacket, 20), } } @@ -107,6 +110,16 @@ func (h *htlcSwitch) Stop() error { return nil } +// SendHTLC queues a HTLC packet for forwarding over the designated interface. +// In the event that the interface has insufficient capacity for the payment, +// an error is returned. Additionally, if the interface cannot be found, an +// alternative error is returned. +func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) error { + // TODO(roasbeef): hook in errors + h.outgoingPayments <- htlcPkt + return nil +} + // htlcForwarder is responsible for optimally forwarding (and possibly // fragmenting) incoming/outgoing HTLC's amongst all active interfaces and // their links. @@ -114,7 +127,37 @@ func (h *htlcSwitch) htlcForwarder() { out: for { select { - case <-h.outgoingPayments: + case htlcPkt := <-h.outgoingPayments: + chanInterface, ok := h.interfaces[htlcPkt.dest] + if !ok { + hswcLog.Errorf("unable to locate link %x", htlcPkt.dest[:]) + continue + } + + wireMsg := htlcPkt.msg.(*lnwire.HTLCAddRequest) + amt := btcutil.Amount(wireMsg.Amount) + hswcLog.Debugf("attempting to send %v to %v", amt, + hex.EncodeToString(htlcPkt.dest[:])) + + for _, link := range chanInterface { + // TODO(roasbeef): implement HTLC fragmentation + if link.availableBandwidth >= amt { + hswcLog.Debugf("selected %v for payment of %v to %x", + link.chanPoint, amt, htlcPkt.dest[:]) + + wireMsg.ChannelPoint = link.chanPoint + link.linkChan <- wireMsg + // TODO(roasbeef): update link info on + // timeout/settle + link.availableBandwidth -= amt + } + } + + if wireMsg.ChannelPoint == nil { + hswcLog.Errorf("unable to send payment, " + + "insufficient capacity") + } + case <-h.htlcPlex: case <-h.quit: break out } @@ -230,13 +273,15 @@ type registerLinkMsg struct { // RegisterLink requests the htlcSwitch to register a new active link. The new // link encapsulates an active channel. func (h *htlcSwitch) RegisterLink(p *peer, linkInfo *channeldb.ChannelSnapshot, - linkChan chan lnwire.Message) { + linkChan chan lnwire.Message) chan *htlcPacket { done := make(chan struct{}, 1) req := ®isterLinkMsg{p, linkInfo, linkChan, done} h.linkControl <- req <-done + + return h.htlcPlex } // unregisterLinkMsg is a message which requests the active ink be unregistered.