htlcswitch: change over all internal indexes to use short channel ID's

This commit is contained in:
Olaoluwa Osuntokun 2017-06-16 23:49:38 +02:00
parent 4c7af9f16d
commit 1f5a4fcb8e
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
4 changed files with 197 additions and 191 deletions

@ -19,51 +19,31 @@ func (k *circuitKey) String() string {
return hex.EncodeToString(k[:]) return hex.EncodeToString(k[:])
} }
// paymentCircuit is used by htlc switch subsystem in order to determine // paymentCircuit is used by the htlc switch subsystem to determine the
// backward path for settle/fail htlc messages. A payment circuit will be // fowrards/backwards path for the settle/fail HTLC messages. A payment circuit
// created once a channel link forwards the htlc add request and removed when we // will be created once a channel link forwards the htlc add request and
// receive settle/fail htlc message. // removed when we receive settle/fail htlc message.
//
// NOTE: In current implementation of htlc switch, the payment circuit might be
// uniquely identified by payment hash but in future we implement the payment
// fragmentation which makes possible for number of payments to have
// identical payments hashes, but different source or destination.
//
// For example if Alice(A) want to send 2BTC to Bob(B), then payment will be
// split on two parts and node N3 will have circuit with the same payment hash,
// and destination, but different channel source (N1,N2).
//
// 1BTC N1 1BTC
// + --------- o --------- +
// 2BTC | | 2BTC
// A o ------ o N0 N3 o ------ o B
// | |
// + --------- o --------- +
// 1BTC N2 1BTC
//
type paymentCircuit struct { type paymentCircuit struct {
// PaymentHash used as unique identifier of payment. // PaymentHash used as unique identifier of payment.
PaymentHash circuitKey PaymentHash circuitKey
// Src identifies the channel from which add htlc request is came from // Src identifies the channel from which add htlc request is came from
// and to which settle/fail htlc request will be returned back. Once the // and to which settle/fail htlc request will be returned back. Once
// switch forwards the settle/fail message to the src the circuit is // the switch forwards the settle/fail message to the src the circuit
// considered to be completed. // is considered to be completed.
// TODO(andrew.shvv) use short channel id instead. Src lnwire.ShortChannelID
Src lnwire.ChannelID
// Dest identifies the channel to which we propagate the htlc add // Dest identifies the channel to which we propagate the htlc add
// update and from which we are expecting to receive htlc settle/fail // update and from which we are expecting to receive htlc settle/fail
// request back. // request back.
// TODO(andrew.shvv) use short channel id instead. Dest lnwire.ShortChannelID
Dest lnwire.ChannelID
// RefCount is used to count the circuits with the same circuit key. // RefCount is used to count the circuits with the same circuit key.
RefCount int RefCount int
} }
// newPaymentCircuit creates new payment circuit instance. // newPaymentCircuit creates new payment circuit instance.
func newPaymentCircuit(src, dest lnwire.ChannelID, key circuitKey) *paymentCircuit { func newPaymentCircuit(src, dest lnwire.ShortChannelID, key circuitKey) *paymentCircuit {
return &paymentCircuit{ return &paymentCircuit{
Src: src, Src: src,
Dest: dest, Dest: dest,
@ -79,53 +59,47 @@ func (a *paymentCircuit) isEqual(b *paymentCircuit) bool {
a.Dest == b.Dest a.Dest == b.Dest
} }
// circuitMap is a thread safe storage of circuits. Each circuit key (payment // circuitMap is a data structure that implements thread safe storage of
// hash) might have numbers of circuits corresponding to it // circuits. Each circuit key (payment hash) may have several of circuits
// because of future payment fragmentation, now every circuit might be uniquely // corresponding to it due to the possibility of repeated payment hashes.
// identified by payment hash (1-1 mapping).
// //
// NOTE: Also we have the htlc debug mode and in this mode we have the same
// payment hash for all htlcs.
// TODO(andrew.shvv) make it persistent // TODO(andrew.shvv) make it persistent
type circuitMap struct { type circuitMap struct {
mutex sync.RWMutex sync.RWMutex
circuits map[circuitKey]*paymentCircuit circuits map[circuitKey]*paymentCircuit
} }
// newCircuitMap initialized circuit map with previously stored circuits and // newCircuitMap creates a new instance of the circuitMap.
// return circuit map instance.
func newCircuitMap() *circuitMap { func newCircuitMap() *circuitMap {
return &circuitMap{ return &circuitMap{
circuits: make(map[circuitKey]*paymentCircuit), circuits: make(map[circuitKey]*paymentCircuit),
} }
} }
// add function adds circuit in circuit map. // add adds a new active payment circuit to the circuitMap.
func (m *circuitMap) add(circuit *paymentCircuit) error { func (m *circuitMap) add(circuit *paymentCircuit) error {
m.mutex.Lock() m.Lock()
defer m.mutex.Unlock() defer m.Unlock()
// Examine the circuit map to see if this // Examine the circuit map to see if this circuit is already in use or
// circuit is already in use or not. If so, // not. If so, then we'll simply increment the reference count.
// then we'll simply increment the reference // Otherwise, we'll create a new circuit from scratch.
// count. Otherwise, we'll create a new circuit //
// from scratch.
// TODO(roasbeef): include dest+src+amt in key // TODO(roasbeef): include dest+src+amt in key
if c, ok := m.circuits[circuit.PaymentHash]; ok { if c, ok := m.circuits[circuit.PaymentHash]; ok {
c.RefCount++ c.RefCount++
} else { return nil
m.circuits[circuit.PaymentHash] = circuit
} }
m.circuits[circuit.PaymentHash] = circuit
return nil return nil
} }
// remove function removes circuit from map. // remove destroys the target circuit by removing it from the circuit map.
func (m *circuitMap) remove(key circuitKey) ( func (m *circuitMap) remove(key circuitKey) (*paymentCircuit, error) {
*paymentCircuit, error) { m.Lock()
defer m.Unlock()
m.mutex.Lock()
defer m.mutex.Unlock()
if circuit, ok := m.circuits[key]; ok { if circuit, ok := m.circuits[key]; ok {
if circuit.RefCount--; circuit.RefCount == 0 { if circuit.RefCount--; circuit.RefCount == 0 {
@ -139,10 +113,10 @@ func (m *circuitMap) remove(key circuitKey) (
} }
// pending returns number of circuits which are waiting for to be completed // pending returns number of circuits which are waiting for to be completed
// (settle/fail responses to be received) // (settle/fail responses to be received).
func (m *circuitMap) pending() int { func (m *circuitMap) pending() int {
m.mutex.RLock() m.RLock()
defer m.mutex.RUnlock() defer m.RUnlock()
var length int var length int
for _, circuits := range m.circuits { for _, circuits := range m.circuits {

@ -10,40 +10,47 @@ import (
// htlcPacket is a wrapper around htlc lnwire update, which adds additional // htlcPacket is a wrapper around htlc lnwire update, which adds additional
// information which is needed by this package. // information which is needed by this package.
type htlcPacket struct { type htlcPacket struct {
// payHash payment hash of htlc request. // destNode is the first-hop destination of a local created HTLC add
// message.
destNode [33]byte
// payHash is the payment hash of the HTLC which was modified by either
// a settle or fail action.
//
// NOTE: This fields is initialized only in settle and fail packets. // NOTE: This fields is initialized only in settle and fail packets.
payHash [sha256.Size]byte payHash [sha256.Size]byte
// dest is the next channel to which this update will be applied. // dest is the destination of this packet identified by the short
// TODO(andrew.shvv) use short channel id instead. // channel ID of the target link.
dest HopID dest lnwire.ShortChannelID
// src is a previous channel to which htlc was applied. // src is the source of this packet identified by the short channel ID
// TODO(andrew.shvv) use short channel id instead. // of the target link.
src lnwire.ChannelID src lnwire.ShortChannelID
// amount is the value of the HTLC that is being created or modified.
//
// TODO(andrew.shvv) should be removed after introducing sphinx payment.
amount btcutil.Amount
// htlc lnwire message type of which depends on switch request type. // htlc lnwire message type of which depends on switch request type.
htlc lnwire.Message htlc lnwire.Message
// TODO(andrew.shvv) should be removed after introducing sphinx payment.
amount btcutil.Amount
} }
// newInitPacket creates htlc switch add packet which encapsulates the // newInitPacket creates htlc switch add packet which encapsulates the add htlc
// add htlc request and additional information for proper forwarding over // request and additional information for proper forwarding over htlc switch.
// htlc switch. func newInitPacket(destNode [33]byte, htlc *lnwire.UpdateAddHTLC) *htlcPacket {
func newInitPacket(dest HopID, htlc *lnwire.UpdateAddHTLC) *htlcPacket {
return &htlcPacket{ return &htlcPacket{
dest: dest, destNode: destNode,
htlc: htlc, htlc: htlc,
} }
} }
// newAddPacket creates htlc switch add packet which encapsulates the // newAddPacket creates htlc switch add packet which encapsulates the add htlc
// add htlc request and additional information for proper forwarding over // request and additional information for proper forwarding over htlc switch.
// htlc switch. func newAddPacket(src, dest lnwire.ShortChannelID,
func newAddPacket(src lnwire.ChannelID, dest HopID,
htlc *lnwire.UpdateAddHTLC) *htlcPacket { htlc *lnwire.UpdateAddHTLC) *htlcPacket {
return &htlcPacket{ return &htlcPacket{
dest: dest, dest: dest,
src: src, src: src,
@ -54,8 +61,9 @@ func newAddPacket(src lnwire.ChannelID, dest HopID,
// newSettlePacket creates htlc switch ack/settle packet which encapsulates the // newSettlePacket creates htlc switch ack/settle packet which encapsulates the
// settle htlc request which should be created and sent back by last hope in // settle htlc request which should be created and sent back by last hope in
// htlc path. // htlc path.
func newSettlePacket(src lnwire.ChannelID, htlc *lnwire.UpdateFufillHTLC, func newSettlePacket(src lnwire.ShortChannelID, htlc *lnwire.UpdateFufillHTLC,
payHash [sha256.Size]byte, amount btcutil.Amount) *htlcPacket { payHash [sha256.Size]byte, amount btcutil.Amount) *htlcPacket {
return &htlcPacket{ return &htlcPacket{
src: src, src: src,
payHash: payHash, payHash: payHash,
@ -66,8 +74,9 @@ func newSettlePacket(src lnwire.ChannelID, htlc *lnwire.UpdateFufillHTLC,
// newFailPacket creates htlc switch fail packet which encapsulates the fail // newFailPacket creates htlc switch fail packet which encapsulates the fail
// htlc request which propagated back to the original hope who sent the htlc // htlc request which propagated back to the original hope who sent the htlc
// add request if something wrong happened on the path to the final destination. // add request if something wrong happened on the path to the final
func newFailPacket(src lnwire.ChannelID, htlc *lnwire.UpdateFailHTLC, // destination.
func newFailPacket(src lnwire.ShortChannelID, htlc *lnwire.UpdateFailHTLC,
payHash [sha256.Size]byte, amount btcutil.Amount) *htlcPacket { payHash [sha256.Size]byte, amount btcutil.Amount) *htlcPacket {
return &htlcPacket{ return &htlcPacket{
src: src, src: src,

@ -10,8 +10,9 @@ import (
// packetQueue represent the wrapper around the original queue plus the // packetQueue represent the wrapper around the original queue plus the
// functionality for releasing the queue objects in object channel. Such // functionality for releasing the queue objects in object channel. Such
// structures allows storing of all pending object in queue before the moment of // structures allows storing of all pending object in queue before the moment
// actual releasing. // of actual releasing.
//
// TODO(andrew.shvv) structure not preserve the order if object failed second // TODO(andrew.shvv) structure not preserve the order if object failed second
// time. // time.
type packetQueue struct { type packetQueue struct {
@ -22,8 +23,8 @@ type packetQueue struct {
// be re-proceed. // be re-proceed.
pending chan *htlcPacket pending chan *htlcPacket
// grab channel represents the channel-lock which is needed in order // grab channel represents the channel-lock which is needed in order to
// to make "release" goroutines block during other release goroutine // make "release" goroutines block during other release goroutine
// processing. // processing.
grab chan struct{} grab chan struct{}
} }
@ -61,14 +62,14 @@ func (q *packetQueue) release() {
} }
go func() { go func() {
// Grab the pending mutex so that other goroutines waits // Grab the pending mutex so that other goroutines waits before
// before grabbing the object, otherwise the objects will be // grabbing the object, otherwise the objects will be send in
// send in the pending channel in random sequence. // the pending channel in random sequence.
<-q.grab <-q.grab
defer func() { defer func() {
// Release the channel-lock and give other goroutines the // Release the channel-lock and give other goroutines
// ability to // the ability to
q.grab <- struct{}{} q.grab <- struct{}{}
}() }()
@ -79,8 +80,8 @@ func (q *packetQueue) release() {
q.Unlock() q.Unlock()
if e != nil { if e != nil {
// Send the object in object queue and wait it to // Send the object in object queue and wait it to be
// be processed by other side. // processed by other side.
q.pending <- e.Value.(*htlcPacket) q.pending <- e.Value.(*htlcPacket)
// After object have been preprocessed remove it from // After object have been preprocessed remove it from

@ -35,9 +35,9 @@ type pendingPayment struct {
err chan error err chan error
} }
// forwardPacketCmd encapsulates switch packet and adds error channel to // plexPacket encapsulates switch packet and adds error channel to receive
// receive error from request handler. // error from request handler.
type forwardPacketCmd struct { type plexPacket struct {
pkt *htlcPacket pkt *htlcPacket
err chan error err chan error
} }
@ -113,15 +113,26 @@ type Switch struct {
// links is a map of channel id and channel link which manages // links is a map of channel id and channel link which manages
// this channel. // this channel.
links map[lnwire.ChannelID]ChannelLink linkIndex map[lnwire.ChannelID]ChannelLink
// linksIndex is a map which is needed for quick lookup of channels // forwardingIndex is an index which is consulted by the switch when it
// which are belongs to specific peer. // needs to locate the next hop to forward an incoming/outgoing HTLC
linksIndex map[HopID][]ChannelLink // update to/from.
//
// TODO(roasbeef): eventually add a NetworkHop mapping before the
// ChannelLink
forwardingIndex map[lnwire.ShortChannelID]ChannelLink
// forwardCommands is used for propogating the htlc packet forward // interfaceIndex maps the compressed public key of a peer to all the
// requests. // channels that the switch maintains iwht that peer.
forwardCommands chan *forwardPacketCmd interfaceIndex map[[33]byte]map[ChannelLink]struct{}
// htlcPlex is the channel which all connected links use to coordinate
// the setup/teardown of Sphinx (onion routing) payment circuits.
// Active links forward any add/settle messages over this channel each
// state transition, sending new adds/settles which are fully locked
// in.
htlcPlex chan *plexPacket
// chanCloseRequests is used to transfer the channel close request to // chanCloseRequests is used to transfer the channel close request to
// the channel close handler. // the channel close handler.
@ -137,10 +148,11 @@ func New(cfg Config) *Switch {
return &Switch{ return &Switch{
cfg: &cfg, cfg: &cfg,
circuits: newCircuitMap(), circuits: newCircuitMap(),
links: make(map[lnwire.ChannelID]ChannelLink), linkIndex: make(map[lnwire.ChannelID]ChannelLink),
linksIndex: make(map[HopID][]ChannelLink), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink),
interfaceIndex: make(map[[33]byte]map[ChannelLink]struct{}),
pendingPayments: make(map[lnwallet.PaymentHash][]*pendingPayment), pendingPayments: make(map[lnwallet.PaymentHash][]*pendingPayment),
forwardCommands: make(chan *forwardPacketCmd), htlcPlex: make(chan *plexPacket),
chanCloseRequests: make(chan *ChanClose), chanCloseRequests: make(chan *ChanClose),
linkControl: make(chan interface{}), linkControl: make(chan interface{}),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -149,7 +161,7 @@ func New(cfg Config) *Switch {
// SendHTLC is used by other subsystems which aren't belong to htlc switch // SendHTLC is used by other subsystems which aren't belong to htlc switch
// package in order to send the htlc update. // package in order to send the htlc update.
func (s *Switch) SendHTLC(nextNode []byte, update lnwire.Message) ( func (s *Switch) SendHTLC(nextNode [33]byte, update lnwire.Message) (
[sha256.Size]byte, error) { [sha256.Size]byte, error) {
htlc := update.(*lnwire.UpdateAddHTLC) htlc := update.(*lnwire.UpdateAddHTLC)
@ -163,18 +175,15 @@ func (s *Switch) SendHTLC(nextNode []byte, update lnwire.Message) (
amount: htlc.Amount, amount: htlc.Amount,
} }
// Check that we do not have the payment with the same id in order to
// prevent map override.
s.pendingMutex.Lock() s.pendingMutex.Lock()
s.pendingPayments[htlc.PaymentHash] = append( s.pendingPayments[htlc.PaymentHash] = append(
s.pendingPayments[htlc.PaymentHash], payment) s.pendingPayments[htlc.PaymentHash], payment)
s.pendingMutex.Unlock() s.pendingMutex.Unlock()
// Generate and send new update packet, if error will be received // Generate and send new update packet, if error will be received on
// on this stage it means that packet haven't left boundaries of our // this stage it means that packet haven't left boundaries of our
// system and something wrong happened. // system and something wrong happened.
hop := NewHopID(nextNode) packet := newInitPacket(nextNode, htlc)
packet := newInitPacket(hop, htlc)
if err := s.forward(packet); err != nil { if err := s.forward(packet); err != nil {
s.removePendingPayment(payment.amount, payment.paymentHash) s.removePendingPayment(payment.amount, payment.paymentHash)
return zeroPreimage, err return zeroPreimage, err
@ -206,14 +215,20 @@ func (s *Switch) SendHTLC(nextNode []byte, update lnwire.Message) (
// update. Also this function is used by channel links itself in order to // update. Also this function is used by channel links itself in order to
// forward the update after it has been included in the channel. // forward the update after it has been included in the channel.
func (s *Switch) forward(packet *htlcPacket) error { func (s *Switch) forward(packet *htlcPacket) error {
command := &forwardPacketCmd{ command := &plexPacket{
pkt: packet, pkt: packet,
err: make(chan error, 1), err: make(chan error, 1),
} }
select { select {
case s.forwardCommands <- command: case s.htlcPlex <- command:
return <-command.err case <-s.quit:
return errors.New("Htlc Switch was stopped")
}
select {
case err := <-command.err:
return err
case <-s.quit: case <-s.quit:
return errors.New("Htlc Switch was stopped") return errors.New("Htlc Switch was stopped")
} }
@ -239,7 +254,7 @@ func (s *Switch) handleLocalDispatch(payment *pendingPayment, packet *htlcPacket
// appropriate channel link and send the payment over this link. // appropriate channel link and send the payment over this link.
case *lnwire.UpdateAddHTLC: case *lnwire.UpdateAddHTLC:
// Try to find links by node destination. // Try to find links by node destination.
links, err := s.getLinks(packet.dest) links, err := s.getLinks(packet.destNode)
if err != nil { if err != nil {
log.Errorf("unable to find links by "+ log.Errorf("unable to find links by "+
"destination %v", err) "destination %v", err)
@ -303,10 +318,9 @@ func (s *Switch) handleLocalDispatch(payment *pendingPayment, packet *htlcPacket
return nil return nil
} }
// handlePacketForward is used in cases when we need forward the htlc // handlePacketForward is used in cases when we need forward the htlc update
// update from one channel link to another and be able to propagate the // from one channel link to another and be able to propagate the settle/fail
// settle/fail updates back. This behaviour is achieved by creation of payment // updates back. This behaviour is achieved by creation of payment circuits.
// circuits.
func (s *Switch) handlePacketForward(packet *htlcPacket) error { func (s *Switch) handlePacketForward(packet *htlcPacket) error {
switch htlc := packet.htlc.(type) { switch htlc := packet.htlc.(type) {
@ -314,7 +328,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
// payment circuit within our internal state so we can properly forward // payment circuit within our internal state so we can properly forward
// the ultimate settle message back latter. // the ultimate settle message back latter.
case *lnwire.UpdateAddHTLC: case *lnwire.UpdateAddHTLC:
source, err := s.getLink(packet.src) source, err := s.getLinkByShortID(packet.src)
if err != nil { if err != nil {
err := errors.Errorf("unable to find channel link "+ err := errors.Errorf("unable to find channel link "+
"by channel point (%v): %v", packet.src, err) "by channel point (%v): %v", packet.src, err)
@ -322,12 +336,11 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
return err return err
} }
// Try to find links by node destination. targetLink, err := s.getLinkByShortID(packet.dest)
links, err := s.getLinks(packet.dest)
if err != nil { if err != nil {
// If packet was forwarded from another // If packet was forwarded from another channel link
// channel link than we should notify this // than we should notify this link that some error
// link that some error occurred. // occurred.
reason := []byte{byte(lnwire.UnknownDestination)} reason := []byte{byte(lnwire.UnknownDestination)}
go source.HandleSwitchPacket(newFailPacket( go source.HandleSwitchPacket(newFailPacket(
packet.src, packet.src,
@ -336,16 +349,17 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
}, },
htlc.PaymentHash, 0, htlc.PaymentHash, 0,
)) ))
err := errors.Errorf("unable to find links with "+ err := errors.Errorf("unable to find link with "+
"destination %v", err) "destination %v", packet.dest)
log.Error(err) log.Error(err)
return err return err
} }
interfaceLinks, _ := s.getLinks(targetLink.Peer().PubKey())
// Try to find destination channel link with appropriate // Try to find destination channel link with appropriate
// bandwidth. // bandwidth.
var destination ChannelLink var destination ChannelLink
for _, link := range links { for _, link := range interfaceLinks {
if link.Bandwidth() >= htlc.Amount { if link.Bandwidth() >= htlc.Amount {
destination = link destination = link
break break
@ -356,9 +370,9 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
// over has insufficient capacity, then we'll cancel the htlc // over has insufficient capacity, then we'll cancel the htlc
// as the payment cannot succeed. // as the payment cannot succeed.
if destination == nil { if destination == nil {
// If packet was forwarded from another // If packet was forwarded from another channel link
// channel link than we should notify this // than we should notify this link that some error
// link that some error occurred. // occurred.
reason := []byte{byte(lnwire.InsufficientCapacity)} reason := []byte{byte(lnwire.InsufficientCapacity)}
go source.HandleSwitchPacket(newFailPacket( go source.HandleSwitchPacket(newFailPacket(
packet.src, packet.src,
@ -380,8 +394,8 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
// should create circuit (remember the path) in order to // should create circuit (remember the path) in order to
// forward settle/fail packet back. // forward settle/fail packet back.
if err := s.circuits.add(newPaymentCircuit( if err := s.circuits.add(newPaymentCircuit(
source.ChanID(), source.ShortChanID(),
destination.ChanID(), destination.ShortChanID(),
htlc.PaymentHash, htlc.PaymentHash,
)); err != nil { )); err != nil {
reason := []byte{byte(lnwire.UnknownError)} reason := []byte{byte(lnwire.UnknownError)}
@ -419,7 +433,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
} }
// Propagating settle/fail htlc back to src of add htlc packet. // Propagating settle/fail htlc back to src of add htlc packet.
source, err := s.getLink(circuit.Src) source, err := s.getLinkByShortID(circuit.Src)
if err != nil { if err != nil {
err := errors.Errorf("unable to get source "+ err := errors.Errorf("unable to get source "+
"channel link to forward settle/fail htlc: %v", "channel link to forward settle/fail htlc: %v",
@ -440,7 +454,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
} }
} }
// CloseLink creates and sends the the close channel command. // CloseLink creates and sends the close channel command.
func (s *Switch) CloseLink(chanPoint *wire.OutPoint, func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
closeType ChannelCloseType) (chan *lnrpc.CloseStatusUpdate, chan error) { closeType ChannelCloseType) (chan *lnrpc.CloseStatusUpdate, chan error) {
@ -470,27 +484,28 @@ func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
// handleCloseLink sends a message to the peer responsible for the target // handleCloseLink sends a message to the peer responsible for the target
// channel point, instructing it to initiate a cooperative channel closure. // channel point, instructing it to initiate a cooperative channel closure.
func (s *Switch) handleChanelClose(req *ChanClose) { func (s *Switch) handleChanelClose(req *ChanClose) {
chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) targetChanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)
var link ChannelLink var link ChannelLink
for _, l := range s.links { for chanID, l := range s.linkIndex {
if l.ChanID() == chanID { if chanID == targetChanID {
link = l link = l
} }
} }
if link == nil { if link == nil {
req.Err <- errors.Errorf("channel with ChannelID(%v) not "+ req.Err <- errors.Errorf("channel with ChannelID(%v) not "+
"found", chanID) "found", targetChanID)
return return
} }
log.Debugf("requesting local channel close, peer(%v) channel(%v)", log.Debugf("requesting local channel close, peer(%v) channel(%v)",
link.Peer(), chanID) link.Peer(), targetChanID)
// TODO(roasbeef): if type was CloseBreach initiate force closure with // TODO(roasbeef): if type was CloseBreach initiate force closure with
// all other channels (if any) we have with the remote peer. // all other channels (if any) we have with the remote peer.
s.cfg.LocalChannelClose(link.Peer().PubKey(), req) peerPub := link.Peer().PubKey()
s.cfg.LocalChannelClose(peerPub[:], req)
return return
} }
@ -509,7 +524,7 @@ func (s *Switch) htlcForwarder() {
// Remove all links once we've been signalled for shutdown. // Remove all links once we've been signalled for shutdown.
defer func() { defer func() {
for _, link := range s.links { for _, link := range s.linkIndex {
if err := s.removeLink(link.ChanID()); err != nil { if err := s.removeLink(link.ChanID()); err != nil {
log.Errorf("unable to remove "+ log.Errorf("unable to remove "+
"channel link on stop: %v", err) "channel link on stop: %v", err)
@ -531,9 +546,11 @@ func (s *Switch) htlcForwarder() {
case req := <-s.chanCloseRequests: case req := <-s.chanCloseRequests:
s.handleChanelClose(req) s.handleChanelClose(req)
case cmd := <-s.forwardCommands: case cmd := <-s.htlcPlex:
var paymentHash lnwallet.PaymentHash var (
var amount btcutil.Amount paymentHash lnwallet.PaymentHash
amount btcutil.Amount
)
switch m := cmd.pkt.htlc.(type) { switch m := cmd.pkt.htlc.(type) {
case *lnwire.UpdateAddHTLC: case *lnwire.UpdateAddHTLC:
@ -572,7 +589,7 @@ func (s *Switch) htlcForwarder() {
// Next, we'll run through all the registered links and // Next, we'll run through all the registered links and
// compute their up-to-date forwarding stats. // compute their up-to-date forwarding stats.
for _, link := range s.links { for _, link := range s.linkIndex {
// TODO(roasbeef): when links first registered // TODO(roasbeef): when links first registered
// stats printed. // stats printed.
updates, sent, recv := link.Stats() updates, sent, recv := link.Stats()
@ -617,8 +634,8 @@ func (s *Switch) htlcForwarder() {
totalSatSent += diffSatSent totalSatSent += diffSatSent
totalSatRecv += diffSatRecv totalSatRecv += diffSatRecv
case cmd := <-s.linkControl: case req := <-s.linkControl:
switch cmd := cmd.(type) { switch cmd := req.(type) {
case *addLinkCmd: case *addLinkCmd:
cmd.err <- s.addLink(cmd.link) cmd.err <- s.addLink(cmd.link)
case *removeLinkCmd: case *removeLinkCmd:
@ -696,22 +713,28 @@ func (s *Switch) AddLink(link ChannelLink) error {
// addLink is used to add the newly created channel link and start // addLink is used to add the newly created channel link and start
// use it to handle the channel updates. // use it to handle the channel updates.
func (s *Switch) addLink(link ChannelLink) error { func (s *Switch) addLink(link ChannelLink) error {
// First we'll add the link to the linkIndex which lets us quickly look
// up a channel when we need to close or register it, and the
// forwarding index which'll be used when forwarding HTLC's in the
// multi-hop setting.
s.linkIndex[link.ChanID()] = link
s.forwardingIndex[link.ShortChanID()] = link
// Next we'll add the link to the interface index so we can quickly
// look up all the channels for a particular node.
peerPub := link.Peer().PubKey()
if _, ok := s.interfaceIndex[peerPub]; !ok {
s.interfaceIndex[peerPub] = make(map[ChannelLink]struct{})
}
s.interfaceIndex[peerPub][link] = struct{}{}
if err := link.Start(); err != nil { if err := link.Start(); err != nil {
return err return err
} }
// Add channel link to the channel map, in order to quickly lookup log.Infof("Added channel link with short_chan_id=(%v), bandwidth=%v",
// channel by channel id. link.ShortChanID(), link.Bandwidth())
s.links[link.ChanID()] = link
// Add channel link to the index map, in order to quickly lookup
// channels by peer pub key.
hop := NewHopID(link.Peer().PubKey())
s.linksIndex[hop] = append(s.linksIndex[hop], link)
log.Infof("Added channel link with ChannelID(%v), bandwidth=%v",
link.ChanID(), link.Bandwidth())
return nil return nil
} }
@ -740,9 +763,20 @@ func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelLink, error) {
} }
} }
// getLink returns the channel link by its channel point. // getLink attempts to return the link that has the specified channel ID.
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) { func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
link, ok := s.links[chanID] link, ok := s.linkIndex[chanID]
if !ok {
return nil, ErrChannelLinkNotFound
}
return link, nil
}
// getLinkByShortID attempts to return the link which possesses the target
// short channel ID.
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
link, ok := s.forwardingIndex[chanID]
if !ok { if !ok {
return nil, ErrChannelLinkNotFound return nil, ErrChannelLinkNotFound
} }
@ -775,34 +809,22 @@ func (s *Switch) RemoveLink(chanID lnwire.ChannelID) error {
// removeLink is used to remove and stop the channel link. // removeLink is used to remove and stop the channel link.
func (s *Switch) removeLink(chanID lnwire.ChannelID) error { func (s *Switch) removeLink(chanID lnwire.ChannelID) error {
link, ok := s.links[chanID] log.Infof("Removing channel link with ChannelID(%v)", chanID)
link, ok := s.linkIndex[chanID]
if !ok { if !ok {
return ErrChannelLinkNotFound return ErrChannelLinkNotFound
} }
// Remove the channel from channel map. // Remove the channel from channel map.
delete(s.links, link.ChanID()) delete(s.linkIndex, chanID)
delete(s.forwardingIndex, link.ShortChanID())
// Remove the channel from channel index. // Remove the channel from channel index.
hop := NewHopID(link.Peer().PubKey()) peerPub := link.Peer().PubKey()
links := s.linksIndex[hop] delete(s.interfaceIndex, peerPub)
for i, l := range links {
if l.ChanID() == link.ChanID() {
// Delete without preserving order
// Google: Golang slice tricks
links[i] = links[len(links)-1]
links[len(links)-1] = nil
s.linksIndex[hop] = links[:len(links)-1]
if len(s.linksIndex[hop]) == 0 {
delete(s.linksIndex, hop)
}
break
}
}
go link.Stop() go link.Stop()
log.Infof("Remove channel link with ChannelID(%v)", link.ChanID())
return nil return nil
} }
@ -810,14 +832,14 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error {
// getLinksCmd is a get links command wrapper, it is used to propagate handler // getLinksCmd is a get links command wrapper, it is used to propagate handler
// parameters and return handler error. // parameters and return handler error.
type getLinksCmd struct { type getLinksCmd struct {
peer HopID peer [33]byte
err chan error err chan error
done chan []ChannelLink done chan []ChannelLink
} }
// GetLinks is used to initiate the handling of the get links command. The // GetLinks fetches all the links connected to a particular node identified by
// request will be propagated/handled to/in the main goroutine. // the serialized compressed form of its public key.
func (s *Switch) GetLinks(hop HopID) ([]ChannelLink, error) { func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelLink, error) {
command := &getLinksCmd{ command := &getLinksCmd{
peer: hop, peer: hop,
err: make(chan error, 1), err: make(chan error, 1),
@ -834,19 +856,19 @@ func (s *Switch) GetLinks(hop HopID) ([]ChannelLink, error) {
// getLinks is function which returns the channel links of the peer by hop // getLinks is function which returns the channel links of the peer by hop
// destination id. // destination id.
func (s *Switch) getLinks(destination HopID) ([]ChannelLink, error) { func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
links, ok := s.linksIndex[destination] links, ok := s.interfaceIndex[destination]
if !ok { if !ok {
return nil, errors.Errorf("unable to locate channel link by"+ return nil, errors.Errorf("unable to locate channel link by"+
"destination hop id %v", destination) "destination hop id %v", destination)
} }
result := make([]ChannelLink, len(links)) channelLinks := make([]ChannelLink, 0, len(links))
for i, link := range links { for link := range links {
result[i] = ChannelLink(link) channelLinks = append(channelLinks, link)
} }
return result, nil return channelLinks, nil
} }
// removePendingPayment is the helper function which removes the pending user // removePendingPayment is the helper function which removes the pending user