diff --git a/htlcswitch.go b/htlcswitch.go index 77acc316..34a057f8 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -9,12 +9,15 @@ import ( "golang.org/x/crypto/ripemd160" + "github.com/BitfuryLightning/tools/routing" + "github.com/BitfuryLightning/tools/rt/graph" "github.com/btcsuite/fastsha256" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" ) @@ -63,6 +66,8 @@ type htlcPacket struct { // circuitKey uniquely identifies an active Sphinx (onion routing) circuit // between two open channels. Currently, the rHash of the HTLC which created // the circuit is used to uniquely identify each circuit. +// TODO(roasbeef): need to also add in the settle/clear channel points in order +// to support fragmenting payments on the link layer: 1 to N, N to N, etc. type circuitKey [32]byte // paymentCircuit represents an active Sphinx (onion routing) circuit between @@ -145,6 +150,9 @@ type htlcSwitch struct { // fully locked in. htlcPlex chan *htlcPacket + gateway []byte + router *routing.RoutingManager + // TODO(roasbeef): messaging chan to/from upper layer (routing - L3) // TODO(roasbeef): sampler to log sat/sec and tx/sec @@ -154,8 +162,10 @@ type htlcSwitch struct { } // newHtlcSwitch creates a new htlcSwitch. -func newHtlcSwitch() *htlcSwitch { +func newHtlcSwitch(gateway []byte, r *routing.RoutingManager) *htlcSwitch { return &htlcSwitch{ + router: r, + gateway: gateway, chanIndex: make(map[wire.OutPoint]*link), interfaces: make(map[wire.ShaHash][]*link), onionIndex: make(map[[ripemd160.Size]byte][]*link), @@ -326,9 +336,8 @@ out: // Reduce the available bandwidth for the link // as it will clear the above HTLC, increasing // the limbo balance within the channel. - n := - atomic.AddInt64(&circuit.clear.availableBandwidth, - -int64(pkt.amt)) + n := atomic.AddInt64(&circuit.clear.availableBandwidth, + -int64(pkt.amt)) hswcLog.Tracef("Decrementing link %v bandwidth to %v", circuit.clear.chanPoint, n) @@ -345,7 +354,7 @@ out: copy(cKey[:], rHash[:]) // If we initiated the payment then there won't - // be an active circuit so continue propagating + // be an active circuit to continue propagating // the settle over. Therefore, we exit early. circuit, ok := h.paymentCircuits[cKey] if !ok { @@ -474,6 +483,7 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { // A request with a nil channel point indicates that all the current // links for this channel should be cleared. + chansRemoved := make([]*wire.OutPoint, 0, len(links)) if req.chanPoint == nil { hswcLog.Infof("purging all active links for interface %v", hex.EncodeToString(chanInterface[:])) @@ -482,6 +492,8 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { h.chanIndexMtx.Lock() delete(h.chanIndex, *link.chanPoint) h.chanIndexMtx.Unlock() + + chansRemoved = append(chansRemoved, link.chanPoint) } links = nil } else { @@ -492,6 +504,8 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { for i := 0; i < len(links); i++ { chanLink := links[i] if chanLink.chanPoint == req.chanPoint { + chansRemoved = append(chansRemoved, req.chanPoint) + copy(links[i:], links[i+1:]) links[len(links)-1] = nil links = links[:len(links)-1] @@ -501,6 +515,22 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { } } + // Purge the now inactive channels from the routing table. + // TODO(roasbeef): routing layer should only see the links as a + // summation of their capacity/etc + // * distinction between connection close and channel close + for _, linkChan := range chansRemoved { + err := h.router.RemoveChannel( + graph.NewID(hex.EncodeToString(h.gateway)), + graph.NewID(hex.EncodeToString(req.remoteID)), + graph.NewEdgeID(linkChan.String()), + ) + if err != nil { + hswcLog.Errorf("unable to remove channel from "+ + "routing table: %v", err) + } + } + // TODO(roasbeef): clean up/modify onion links // * just have the interfaces index be keyed on hash160? @@ -579,16 +609,25 @@ type unregisterLinkMsg struct { chanInterface [32]byte chanPoint *wire.OutPoint + // TODO(roasbeef): redo interface map + remoteID []byte + done chan struct{} } // UnregisterLink requets the htlcSwitch to unregiser the new active link. An // unregistered link will no longer be considered a candidate to forward // HTLC's. -func (h *htlcSwitch) UnregisterLink(chanInterface [32]byte, chanPoint *wire.OutPoint) { +func (h *htlcSwitch) UnregisterLink(remotePub *btcec.PublicKey, chanPoint *wire.OutPoint) { done := make(chan struct{}, 1) + rawPub := remotePub.SerializeCompressed() - h.linkControl <- &unregisterLinkMsg{chanInterface, chanPoint, done} + h.linkControl <- &unregisterLinkMsg{ + chanInterface: fastsha256.Sum256(rawPub), + chanPoint: chanPoint, + remoteID: rawPub, + done: done, + } <-done } diff --git a/peer.go b/peer.go index 59a40927..9c31074c 100644 --- a/peer.go +++ b/peer.go @@ -311,7 +311,7 @@ func (p *peer) Disconnect() { // Tell the switch to unregister all links associated with this // peer. Passing nil as the target link indicates that all links // associated with this interface should be closed. - p.server.htlcSwitch.UnregisterLink(p.lightningID, nil) + p.server.htlcSwitch.UnregisterLink(p.identityPub, nil) p.server.donePeers <- p }() @@ -814,7 +814,9 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) { // TODO(roasbeef): also wait for confs before removing state peerLog.Infof("ChannelPoint(%v) is now "+ "closed", key) - wipeChannel(p, channel) + if err := wipeChannel(p, channel); err != nil { + peerLog.Errorf("unable to wipe channel: %v", err) + } } // wipeChannel removes the passed channel from all indexes associated with the @@ -826,7 +828,7 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error { // Instruct the Htlc Switch to close this link as the channel is no // longer active. - p.server.htlcSwitch.UnregisterLink(p.lightningID, chanID) + p.server.htlcSwitch.UnregisterLink(p.identityPub, chanID) htlcWireLink, ok := p.htlcManagers[*chanID] if !ok { return nil @@ -956,8 +958,9 @@ out: peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", state.chanPoint) if err := wipeChannel(p, channel); err != nil { - peerLog.Errorf("Unable to wipe channel %v", err) + peerLog.Errorf("unable to wipe channel %v", err) } + break out case <-channel.ForceCloseSignal: peerLog.Warnf("ChannelPoint(%v) has been force "+ @@ -970,7 +973,7 @@ out: // If we haven't sent or received a new commitment // update in some time, check to see if we have any // pending updates we need to commit. If so, then send - // an update incrementing the unacked coutner is + // an update incrementing the unacked counter is // succesful. if !state.channel.PendingUpdates() { continue diff --git a/server.go b/server.go index aa1a31ba..0d193e0d 100644 --- a/server.go +++ b/server.go @@ -94,7 +94,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, chainNotifier: notifier, chanDB: chanDB, fundingMgr: newFundingManager(wallet), - htlcSwitch: newHtlcSwitch(), invoices: newInvoiceRegistry(chanDB), lnwallet: wallet, identityPriv: privKey, @@ -126,6 +125,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, // the graph. selfVertex := hex.EncodeToString(serializedPubKey) s.routingMgr = routing.NewRoutingManager(graph.NewID(selfVertex), nil) + s.htlcSwitch = newHtlcSwitch(serializedPubKey, s.routingMgr) s.rpcServer = newRpcServer(s)