routing: purge closed channels from the routing table

This commit properly removes any/all closed channels from the routing
table. In the current implementation individual links (channels)
between nodes are treated sparely from the PoV of the routing table. In
the future, this behavior should be modified such that, the routing
table views all the links between nodes as a single channel. Such a
change will simplify the task of path finding as the links can simply
be viewed as a channel with the sum of their capacities. The link layer
(htlcSwitch) will handle the details of fragmentation on a local basis.
This commit is contained in:
Olaoluwa Osuntokun 2016-09-26 10:35:10 -07:00
parent 9bb917cd2a
commit f1d0b75b9d
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 55 additions and 13 deletions

@ -9,12 +9,15 @@ import (
"golang.org/x/crypto/ripemd160" "golang.org/x/crypto/ripemd160"
"github.com/BitfuryLightning/tools/routing"
"github.com/BitfuryLightning/tools/rt/graph"
"github.com/btcsuite/fastsha256" "github.com/btcsuite/fastsha256"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
) )
@ -63,6 +66,8 @@ type htlcPacket struct {
// circuitKey uniquely identifies an active Sphinx (onion routing) circuit // circuitKey uniquely identifies an active Sphinx (onion routing) circuit
// between two open channels. Currently, the rHash of the HTLC which created // between two open channels. Currently, the rHash of the HTLC which created
// the circuit is used to uniquely identify each circuit. // the circuit is used to uniquely identify each circuit.
// TODO(roasbeef): need to also add in the settle/clear channel points in order
// to support fragmenting payments on the link layer: 1 to N, N to N, etc.
type circuitKey [32]byte type circuitKey [32]byte
// paymentCircuit represents an active Sphinx (onion routing) circuit between // paymentCircuit represents an active Sphinx (onion routing) circuit between
@ -145,6 +150,9 @@ type htlcSwitch struct {
// fully locked in. // fully locked in.
htlcPlex chan *htlcPacket htlcPlex chan *htlcPacket
gateway []byte
router *routing.RoutingManager
// TODO(roasbeef): messaging chan to/from upper layer (routing - L3) // TODO(roasbeef): messaging chan to/from upper layer (routing - L3)
// TODO(roasbeef): sampler to log sat/sec and tx/sec // TODO(roasbeef): sampler to log sat/sec and tx/sec
@ -154,8 +162,10 @@ type htlcSwitch struct {
} }
// newHtlcSwitch creates a new htlcSwitch. // newHtlcSwitch creates a new htlcSwitch.
func newHtlcSwitch() *htlcSwitch { func newHtlcSwitch(gateway []byte, r *routing.RoutingManager) *htlcSwitch {
return &htlcSwitch{ return &htlcSwitch{
router: r,
gateway: gateway,
chanIndex: make(map[wire.OutPoint]*link), chanIndex: make(map[wire.OutPoint]*link),
interfaces: make(map[wire.ShaHash][]*link), interfaces: make(map[wire.ShaHash][]*link),
onionIndex: make(map[[ripemd160.Size]byte][]*link), onionIndex: make(map[[ripemd160.Size]byte][]*link),
@ -326,8 +336,7 @@ out:
// Reduce the available bandwidth for the link // Reduce the available bandwidth for the link
// as it will clear the above HTLC, increasing // as it will clear the above HTLC, increasing
// the limbo balance within the channel. // the limbo balance within the channel.
n := n := atomic.AddInt64(&circuit.clear.availableBandwidth,
atomic.AddInt64(&circuit.clear.availableBandwidth,
-int64(pkt.amt)) -int64(pkt.amt))
hswcLog.Tracef("Decrementing link %v bandwidth to %v", hswcLog.Tracef("Decrementing link %v bandwidth to %v",
circuit.clear.chanPoint, n) circuit.clear.chanPoint, n)
@ -345,7 +354,7 @@ out:
copy(cKey[:], rHash[:]) copy(cKey[:], rHash[:])
// If we initiated the payment then there won't // If we initiated the payment then there won't
// be an active circuit so continue propagating // be an active circuit to continue propagating
// the settle over. Therefore, we exit early. // the settle over. Therefore, we exit early.
circuit, ok := h.paymentCircuits[cKey] circuit, ok := h.paymentCircuits[cKey]
if !ok { if !ok {
@ -474,6 +483,7 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
// A request with a nil channel point indicates that all the current // A request with a nil channel point indicates that all the current
// links for this channel should be cleared. // links for this channel should be cleared.
chansRemoved := make([]*wire.OutPoint, 0, len(links))
if req.chanPoint == nil { if req.chanPoint == nil {
hswcLog.Infof("purging all active links for interface %v", hswcLog.Infof("purging all active links for interface %v",
hex.EncodeToString(chanInterface[:])) hex.EncodeToString(chanInterface[:]))
@ -482,6 +492,8 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
h.chanIndexMtx.Lock() h.chanIndexMtx.Lock()
delete(h.chanIndex, *link.chanPoint) delete(h.chanIndex, *link.chanPoint)
h.chanIndexMtx.Unlock() h.chanIndexMtx.Unlock()
chansRemoved = append(chansRemoved, link.chanPoint)
} }
links = nil links = nil
} else { } else {
@ -492,6 +504,8 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
for i := 0; i < len(links); i++ { for i := 0; i < len(links); i++ {
chanLink := links[i] chanLink := links[i]
if chanLink.chanPoint == req.chanPoint { if chanLink.chanPoint == req.chanPoint {
chansRemoved = append(chansRemoved, req.chanPoint)
copy(links[i:], links[i+1:]) copy(links[i:], links[i+1:])
links[len(links)-1] = nil links[len(links)-1] = nil
links = links[:len(links)-1] links = links[:len(links)-1]
@ -501,6 +515,22 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
} }
} }
// Purge the now inactive channels from the routing table.
// TODO(roasbeef): routing layer should only see the links as a
// summation of their capacity/etc
// * distinction between connection close and channel close
for _, linkChan := range chansRemoved {
err := h.router.RemoveChannel(
graph.NewID(hex.EncodeToString(h.gateway)),
graph.NewID(hex.EncodeToString(req.remoteID)),
graph.NewEdgeID(linkChan.String()),
)
if err != nil {
hswcLog.Errorf("unable to remove channel from "+
"routing table: %v", err)
}
}
// TODO(roasbeef): clean up/modify onion links // TODO(roasbeef): clean up/modify onion links
// * just have the interfaces index be keyed on hash160? // * just have the interfaces index be keyed on hash160?
@ -579,16 +609,25 @@ type unregisterLinkMsg struct {
chanInterface [32]byte chanInterface [32]byte
chanPoint *wire.OutPoint chanPoint *wire.OutPoint
// TODO(roasbeef): redo interface map
remoteID []byte
done chan struct{} done chan struct{}
} }
// UnregisterLink requets the htlcSwitch to unregiser the new active link. An // UnregisterLink requets the htlcSwitch to unregiser the new active link. An
// unregistered link will no longer be considered a candidate to forward // unregistered link will no longer be considered a candidate to forward
// HTLC's. // HTLC's.
func (h *htlcSwitch) UnregisterLink(chanInterface [32]byte, chanPoint *wire.OutPoint) { func (h *htlcSwitch) UnregisterLink(remotePub *btcec.PublicKey, chanPoint *wire.OutPoint) {
done := make(chan struct{}, 1) done := make(chan struct{}, 1)
rawPub := remotePub.SerializeCompressed()
h.linkControl <- &unregisterLinkMsg{chanInterface, chanPoint, done} h.linkControl <- &unregisterLinkMsg{
chanInterface: fastsha256.Sum256(rawPub),
chanPoint: chanPoint,
remoteID: rawPub,
done: done,
}
<-done <-done
} }

13
peer.go

@ -311,7 +311,7 @@ func (p *peer) Disconnect() {
// Tell the switch to unregister all links associated with this // Tell the switch to unregister all links associated with this
// peer. Passing nil as the target link indicates that all links // peer. Passing nil as the target link indicates that all links
// associated with this interface should be closed. // associated with this interface should be closed.
p.server.htlcSwitch.UnregisterLink(p.lightningID, nil) p.server.htlcSwitch.UnregisterLink(p.identityPub, nil)
p.server.donePeers <- p p.server.donePeers <- p
}() }()
@ -814,7 +814,9 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
// TODO(roasbeef): also wait for confs before removing state // TODO(roasbeef): also wait for confs before removing state
peerLog.Infof("ChannelPoint(%v) is now "+ peerLog.Infof("ChannelPoint(%v) is now "+
"closed", key) "closed", key)
wipeChannel(p, channel) if err := wipeChannel(p, channel); err != nil {
peerLog.Errorf("unable to wipe channel: %v", err)
}
} }
// wipeChannel removes the passed channel from all indexes associated with the // wipeChannel removes the passed channel from all indexes associated with the
@ -826,7 +828,7 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error {
// Instruct the Htlc Switch to close this link as the channel is no // Instruct the Htlc Switch to close this link as the channel is no
// longer active. // longer active.
p.server.htlcSwitch.UnregisterLink(p.lightningID, chanID) p.server.htlcSwitch.UnregisterLink(p.identityPub, chanID)
htlcWireLink, ok := p.htlcManagers[*chanID] htlcWireLink, ok := p.htlcManagers[*chanID]
if !ok { if !ok {
return nil return nil
@ -956,8 +958,9 @@ out:
peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain",
state.chanPoint) state.chanPoint)
if err := wipeChannel(p, channel); err != nil { if err := wipeChannel(p, channel); err != nil {
peerLog.Errorf("Unable to wipe channel %v", err) peerLog.Errorf("unable to wipe channel %v", err)
} }
break out break out
case <-channel.ForceCloseSignal: case <-channel.ForceCloseSignal:
peerLog.Warnf("ChannelPoint(%v) has been force "+ peerLog.Warnf("ChannelPoint(%v) has been force "+
@ -970,7 +973,7 @@ out:
// If we haven't sent or received a new commitment // If we haven't sent or received a new commitment
// update in some time, check to see if we have any // update in some time, check to see if we have any
// pending updates we need to commit. If so, then send // pending updates we need to commit. If so, then send
// an update incrementing the unacked coutner is // an update incrementing the unacked counter is
// succesful. // succesful.
if !state.channel.PendingUpdates() { if !state.channel.PendingUpdates() {
continue continue

@ -94,7 +94,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
chainNotifier: notifier, chainNotifier: notifier,
chanDB: chanDB, chanDB: chanDB,
fundingMgr: newFundingManager(wallet), fundingMgr: newFundingManager(wallet),
htlcSwitch: newHtlcSwitch(),
invoices: newInvoiceRegistry(chanDB), invoices: newInvoiceRegistry(chanDB),
lnwallet: wallet, lnwallet: wallet,
identityPriv: privKey, identityPriv: privKey,
@ -126,6 +125,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
// the graph. // the graph.
selfVertex := hex.EncodeToString(serializedPubKey) selfVertex := hex.EncodeToString(serializedPubKey)
s.routingMgr = routing.NewRoutingManager(graph.NewID(selfVertex), nil) s.routingMgr = routing.NewRoutingManager(graph.NewID(selfVertex), nil)
s.htlcSwitch = newHtlcSwitch(serializedPubKey, s.routingMgr)
s.rpcServer = newRpcServer(s) s.rpcServer = newRpcServer(s)