From b86409cdb371505cd83e97e4a52d4f85bbf31c8c Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Mon, 1 May 2017 19:58:08 +0300 Subject: [PATCH] htlcswitch: recreate hlcswitch from scratch This commit gives the start for making the htlc manager and htlc switch testable. The testability of htlc switch have been achieved by mocking all external subsystems. The concrete list of updates: 1. create standalone package for htlc switch. 2. add "ChannelLink" interface, which represent the previous htlc link. 3. add "Peer" interface, which represent the remote node inside our subsystem. 4. add htlc switch config to htlc switch susbystem, which stores the handlers which are not elongs to any of the above interfaces. With this commit we are able test htlc switch even without having the concrete implementation of Peer, ChannelLink structures, they will be added later. --- htlcswitch/circuit.go | 153 +++++++ htlcswitch/interfaces.go | 75 ++++ htlcswitch/iterator.go | 18 +- htlcswitch/log.go | 54 +++ htlcswitch/mock.go | 202 +++++++++ htlcswitch/packet.go | 78 ++++ htlcswitch/switch.go | 861 +++++++++++++++++++++++++++++++++++++ htlcswitch/switch_test.go | 378 ++++++++++++++++ lnwire/update_fail_htlc.go | 4 + 9 files changed, 1814 insertions(+), 9 deletions(-) create mode 100644 htlcswitch/circuit.go create mode 100644 htlcswitch/interfaces.go create mode 100644 htlcswitch/log.go create mode 100644 htlcswitch/mock.go create mode 100644 htlcswitch/packet.go create mode 100644 htlcswitch/switch.go create mode 100644 htlcswitch/switch_test.go diff --git a/htlcswitch/circuit.go b/htlcswitch/circuit.go new file mode 100644 index 00000000..978050ea --- /dev/null +++ b/htlcswitch/circuit.go @@ -0,0 +1,153 @@ +package htlcswitch + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "sync" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" +) + +// circuitKey uniquely identifies an active circuit between two open channels. +// Currently, the payment hash is used to uniquely identify each circuit. +type circuitKey [sha256.Size]byte + +// String represent the circuit key in string format. +func (k *circuitKey) String() string { + return hex.EncodeToString(k[:]) +} + +// paymentCircuit is used by htlc switch subsystem in order to determine +// backward path for settle/fail htlc messages. A payment circuit will be +// created once a channel link forwards the htlc add request and removed when we +// receive settle/fail htlc message. +// +// NOTE: In current implementation of htlc switch, the payment circuit might be +// uniquely identified by payment hash but in future we implement the payment +// fragmentation which makes possible for number of payments to have +// identical payments hashes, but different source or destination. +// +// For example if Alice(A) want to send 2BTC to Bob(B), then payment will be +// split on two parts and node N3 will have circuit with the same payment hash, +// and destination, but different channel source (N1,N2). +// +// 1BTC N1 1BTC +// + --------- o --------- + +// 2BTC | | 2BTC +// A o ------ o N0 N3 o ------ o B +// | | +// + --------- o --------- + +// 1BTC N2 1BTC +// +type paymentCircuit struct { + // PaymentHash used as unique identifier of payment. + PaymentHash circuitKey + + // Src identifies the channel from which add htlc request is came from + // and to which settle/fail htlc request will be returned back. Once the + // switch forwards the settle/fail message to the src the circuit is + // considered to be completed. + // TODO(andrew.shvv) use short channel id instead. + Src lnwire.ChannelID + + // Dest identifies the channel to which we propagate the htlc add + // update and from which we are expecting to receive htlc settle/fail + // request back. + // TODO(andrew.shvv) use short channel id instead. + Dest lnwire.ChannelID + + // RefCount is used to count the circuits with the same circuit key. + RefCount int +} + +// newPaymentCircuit creates new payment circuit instance. +func newPaymentCircuit(src, dest lnwire.ChannelID, key circuitKey) *paymentCircuit { + return &paymentCircuit{ + Src: src, + Dest: dest, + PaymentHash: key, + RefCount: 1, + } +} + +// isEqual checks the equality of two payment circuits. +func (a *paymentCircuit) isEqual(b *paymentCircuit) bool { + return bytes.Equal(a.PaymentHash[:], b.PaymentHash[:]) && + a.Src == b.Src && + a.Dest == b.Dest +} + +// circuitMap is a thread safe storage of circuits. Each circuit key (payment +// hash) might have numbers of circuits corresponding to it +// because of future payment fragmentation, now every circuit might be uniquely +// identified by payment hash (1-1 mapping). +// +// NOTE: Also we have the htlc debug mode and in this mode we have the same +// payment hash for all htlcs. +// TODO(andrew.shvv) make it persistent +type circuitMap struct { + mutex sync.RWMutex + circuits map[circuitKey]*paymentCircuit +} + +// newCircuitMap initialized circuit map with previously stored circuits and +// return circuit map instance. +func newCircuitMap() *circuitMap { + return &circuitMap{ + circuits: make(map[circuitKey]*paymentCircuit), + } +} + +// add function adds circuit in circuit map. +func (m *circuitMap) add(circuit *paymentCircuit) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + // Examine the circuit map to see if this + // circuit is already in use or not. If so, + // then we'll simply increment the reference + // count. Otherwise, we'll create a new circuit + // from scratch. + // TODO(roasbeef): include dest+src+amt in key + if c, ok := m.circuits[circuit.PaymentHash]; ok { + c.RefCount++ + } else { + m.circuits[circuit.PaymentHash] = circuit + } + + return nil +} + +// remove function removes circuit from map. +func (m *circuitMap) remove(key circuitKey) ( + *paymentCircuit, error) { + + m.mutex.Lock() + defer m.mutex.Unlock() + + if circuit, ok := m.circuits[key]; ok { + if circuit.RefCount--; circuit.RefCount == 0 { + delete(m.circuits, key) + } + return circuit, nil + } + + return nil, errors.Errorf("can't find circuit"+ + " for key %v", key) +} + +// pending returns number of circuits which are waiting for to be completed +// (settle/fail responses to be received) +func (m *circuitMap) pending() int { + m.mutex.RLock() + defer m.mutex.RUnlock() + + var length int + for _, circuits := range m.circuits { + length += circuits.RefCount + } + + return length +} diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go new file mode 100644 index 00000000..d370b111 --- /dev/null +++ b/htlcswitch/interfaces.go @@ -0,0 +1,75 @@ +package htlcswitch + +import ( + "crypto/sha256" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +// ChannelLink is an interface which represents the subsystem for managing +// the incoming htlc requests, applying the changes to the channel, and also +// propagating/forwarding it to htlc switch. +// +// abstraction level +// ^ +// | +// | - - - - - - - - - - - - Lightning - - - - - - - - - - - - - +// | +// | (Switch) (Switch) (Switch) +// | Alice <-- channel link --> Bob <-- channel link --> Carol +// | +// | - - - - - - - - - - - - - TCP - - - - - - - - - - - - - - - +// | +// | (Peer) (Peer) (Peer) +// | Alice <----- tcp conn --> Bob <---- tcp conn -----> Carol +// | +// +type ChannelLink interface { + // HandleSwitchPacket handles the switch packets. This packets might be + // forwarded to us from another channel link in case the htlc update + // came from another peer or if the update was created by user + // initially. + HandleSwitchPacket(*htlcPacket) + + // HandleChannelUpdate handles the htlc requests as settle/add/fail + // which sent to us from remote peer we have a channel with. + HandleChannelUpdate(lnwire.Message) + + // ChanID returns the unique identifier of the channel link. + ChanID() lnwire.ChannelID + + // Bandwidth returns the amount of satoshis which current link might + // pass through channel link. + Bandwidth() btcutil.Amount + + // Stats return the statistics of channel link. Number of updates, + // total sent/received satoshis. + Stats() (uint64, btcutil.Amount, btcutil.Amount) + + // Peer returns the representation of remote peer with which we + // have the channel link opened. + Peer() Peer + + // Start/Stop are used to initiate the start/stop of the the channel + // link functioning. + Start() error + Stop() +} + +// Peer is an interface which represents the remote lightning node inside our +// system. +type Peer interface { + // SendMessage sends message to remote peer. + SendMessage(lnwire.Message) error + + // ID returns the lightning network peer id. + ID() [sha256.Size]byte + + // PubKey returns the peer public key. + PubKey() []byte + + // Disconnect disconnects with peer if we have error which we can't + // properly handle. + Disconnect() +} diff --git a/htlcswitch/iterator.go b/htlcswitch/iterator.go index 0a9152ed..833aaff6 100644 --- a/htlcswitch/iterator.go +++ b/htlcswitch/iterator.go @@ -8,24 +8,24 @@ import ( "github.com/roasbeef/btcutil" ) -// hopID represents the id which is used by propagation subsystem in order to +// HopID represents the id which is used by propagation subsystem in order to // identify lightning network node. // TODO(andrew.shvv) remove after switching to the using channel id. -type hopID [ripemd160.Size]byte +type HopID [ripemd160.Size]byte -// newHopID creates new instance of hop form node public key. -func newHopID(pubKey []byte) hopID { - var routeId hopID - copy(routeId[:], btcutil.Hash160(pubKey)) - return routeId +// NewHopID creates new instance of hop form node public key. +func NewHopID(pubKey []byte) HopID { + var routeID HopID + copy(routeID[:], btcutil.Hash160(pubKey)) + return routeID } // String returns string representation of hop id. -func (h hopID) String() string { +func (h HopID) String() string { return hex.EncodeToString(h[:]) } // IsEqual checks does the two hop ids are equal. -func (h hopID) IsEqual(h2 hopID) bool { +func (h HopID) IsEqual(h2 HopID) bool { return bytes.Equal(h[:], h2[:]) } diff --git a/htlcswitch/log.go b/htlcswitch/log.go new file mode 100644 index 00000000..19c83038 --- /dev/null +++ b/htlcswitch/log.go @@ -0,0 +1,54 @@ +package htlcswitch + +import ( + "errors" + "io" + + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + DisableLog() +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until either UseLogger or SetLogWriter are called. +func DisableLog() { + log = btclog.Disabled +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} + +// SetLogWriter uses a specified io.Writer to output package logging info. +// This allows a caller to direct package logging output without needing a +// dependency on seelog. If the caller is also using btclog, UseLogger should +// be used instead. +func SetLogWriter(w io.Writer, level string) error { + if w == nil { + return errors.New("nil writer") + } + + lvl, ok := btclog.LogLevelFromString(level) + if !ok { + return errors.New("invalid log level") + } + + l, err := btclog.NewLoggerFromWriter(w, lvl) + if err != nil { + return err + } + + UseLogger(l) + return nil +} diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go new file mode 100644 index 00000000..442a6300 --- /dev/null +++ b/htlcswitch/mock.go @@ -0,0 +1,202 @@ +package htlcswitch + +import ( + "crypto/sha256" + "sync" + "testing" + + "sync/atomic" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +type mockServer struct { + sync.Mutex + + started int32 + shutdown int32 + wg sync.WaitGroup + quit chan bool + + t *testing.T + name string + messages chan lnwire.Message + + id []byte + htlcSwitch *Switch + + recordFuncs []func(lnwire.Message) +} + +var _ Peer = (*mockServer)(nil) + +func newMockServer(t *testing.T, name string) *mockServer { + return &mockServer{ + t: t, + id: []byte(name), + name: name, + messages: make(chan lnwire.Message, 3000), + + quit: make(chan bool), + htlcSwitch: New(Config{}), + recordFuncs: make([]func(lnwire.Message), 0), + } +} + +func (s *mockServer) Start() error { + if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { + return nil + } + + s.htlcSwitch.Start() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + for { + select { + case msg := <-s.messages: + for _, f := range s.recordFuncs { + f(msg) + } + + if err := s.readHandler(msg); err != nil { + s.Lock() + defer s.Unlock() + s.t.Fatalf("%v server error: %v", s.name, err) + } + case <-s.quit: + return + } + } + }() + + return nil +} + +// messageInterceptor is function that handles the incoming peer messages and +// may decide should we handle it or not. +type messageInterceptor func(m lnwire.Message) + +// Record is used to set the function which will be triggered when new +// lnwire message was received. +func (s *mockServer) record(f messageInterceptor) { + s.recordFuncs = append(s.recordFuncs, f) +} + +func (s *mockServer) SendMessage(message lnwire.Message) error { + select { + case s.messages <- message: + case <-s.quit: + } + + return nil +} + +func (s *mockServer) readHandler(message lnwire.Message) error { + var targetChan lnwire.ChannelID + + switch msg := message.(type) { + case *lnwire.UpdateAddHTLC: + targetChan = msg.ChanID + case *lnwire.UpdateFufillHTLC: + targetChan = msg.ChanID + case *lnwire.UpdateFailHTLC: + targetChan = msg.ChanID + case *lnwire.RevokeAndAck: + targetChan = msg.ChanID + case *lnwire.CommitSig: + targetChan = msg.ChanID + default: + return errors.New("unknown message type") + } + + // Dispatch the commitment update message to the proper + // channel link dedicated to this channel. + link, err := s.htlcSwitch.GetLink(targetChan) + if err != nil { + return err + } + + // Create goroutine for this, in order to be able to properly stop + // the server when handler stacked (server unavailable) + done := make(chan struct{}) + go func() { + defer func() { + done <- struct{}{} + }() + + link.HandleChannelUpdate(message) + }() + select { + case <-done: + case <-s.quit: + } + + return nil +} + +func (s *mockServer) ID() [sha256.Size]byte { + return [sha256.Size]byte{} +} + +func (s *mockServer) PubKey() []byte { + return s.id +} + +func (s *mockServer) Disconnect() { + s.Stop() + s.t.Fatalf("server %v was disconnected", s.name) +} + +func (s *mockServer) Stop() { + if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { + return + } + + go s.htlcSwitch.Stop() + + close(s.quit) + s.wg.Wait() +} + +func (s *mockServer) String() string { + return string(s.id) +} + +type mockChannelLink struct { + chanID lnwire.ChannelID + peer Peer + packets chan *htlcPacket +} + +func newMockChannelLink(chanID lnwire.ChannelID, + peer Peer) *mockChannelLink { + return &mockChannelLink{ + chanID: chanID, + packets: make(chan *htlcPacket, 1), + peer: peer, + } +} + +func (f *mockChannelLink) HandleSwitchPacket(packet *htlcPacket) { + f.packets <- packet +} + +func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) { +} + +func (f *mockChannelLink) Stats() (uint64, btcutil.Amount, btcutil.Amount) { + return 0, 0, 0 +} + +func (f *mockChannelLink) ChanID() lnwire.ChannelID { return f.chanID } +func (f *mockChannelLink) Bandwidth() btcutil.Amount { return 99999999 } +func (f *mockChannelLink) Peer() Peer { return f.peer } +func (f *mockChannelLink) Start() error { return nil } +func (f *mockChannelLink) Stop() {} + +var _ ChannelLink = (*mockChannelLink)(nil) diff --git a/htlcswitch/packet.go b/htlcswitch/packet.go new file mode 100644 index 00000000..4bc2e229 --- /dev/null +++ b/htlcswitch/packet.go @@ -0,0 +1,78 @@ +package htlcswitch + +import ( + "crypto/sha256" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +// htlcPacket is a wrapper around htlc lnwire update, which adds additional +// information which is needed by this package. +type htlcPacket struct { + // payHash payment hash of htlc request. + // NOTE: This fields is initialized only in settle and fail packets. + payHash [sha256.Size]byte + + // dest is the next channel to which this update will be applied. + // TODO(andrew.shvv) use short channel id instead. + dest HopID + + // src is a previous channel to which htlc was applied. + // TODO(andrew.shvv) use short channel id instead. + src lnwire.ChannelID + + // htlc lnwire message type of which depends on switch request type. + htlc lnwire.Message + + // TODO(andrew.shvv) should be removed after introducing sphinx payment. + amount btcutil.Amount +} + +// newInitPacket creates htlc switch add packet which encapsulates the +// add htlc request and additional information for proper forwarding over +// htlc switch. +func newInitPacket(dest HopID, htlc *lnwire.UpdateAddHTLC) *htlcPacket { + return &htlcPacket{ + dest: dest, + htlc: htlc, + } +} + +// newAddPacket creates htlc switch add packet which encapsulates the +// add htlc request and additional information for proper forwarding over +// htlc switch. +func newAddPacket(src lnwire.ChannelID, dest HopID, + htlc *lnwire.UpdateAddHTLC) *htlcPacket { + return &htlcPacket{ + dest: dest, + src: src, + htlc: htlc, + } +} + +// newSettlePacket creates htlc switch ack/settle packet which encapsulates the +// settle htlc request which should be created and sent back by last hope in +// htlc path. +func newSettlePacket(src lnwire.ChannelID, htlc *lnwire.UpdateFufillHTLC, + payHash [sha256.Size]byte, amount btcutil.Amount) *htlcPacket { + return &htlcPacket{ + src: src, + payHash: payHash, + htlc: htlc, + amount: amount, + } +} + +// newFailPacket creates htlc switch fail packet which encapsulates the fail +// htlc request which propagated back to the original hope who sent the htlc +// add request if something wrong happened on the path to the final destination. +func newFailPacket(src lnwire.ChannelID, htlc *lnwire.UpdateFailHTLC, + payHash [sha256.Size]byte, amount btcutil.Amount) *htlcPacket { + return &htlcPacket{ + src: src, + payHash: payHash, + htlc: htlc, + amount: amount, + } +} diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go new file mode 100644 index 00000000..3713efb7 --- /dev/null +++ b/htlcswitch/switch.go @@ -0,0 +1,861 @@ +package htlcswitch + +import ( + "sync" + "sync/atomic" + "time" + + "crypto/sha256" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +var ( + // ErrChannelLinkNotFound is used when channel link hasn't been found. + ErrChannelLinkNotFound = errors.New("channel link not found") + + // zeroPreimage is the empty preimage which is returned when we have + // some errors. + zeroPreimage [sha256.Size]byte +) + +// pendingPayment represents the payment which made by user and waits for +// updates to be received whether the payment has been rejected or proceed +// successfully. +type pendingPayment struct { + paymentHash lnwallet.PaymentHash + amount btcutil.Amount + + preimage chan [sha256.Size]byte + err chan error +} + +// forwardPacketCmd encapsulates switch packet and adds error channel to +// receive error from request handler. +type forwardPacketCmd struct { + pkt *htlcPacket + err chan error +} + +// ChannelCloseType is a enum which signals the type of channel closure the +// peer should execute. +type ChannelCloseType uint8 + +const ( + // CloseRegular indicates a regular cooperative channel closure + // should be attempted. + CloseRegular ChannelCloseType = iota + + // CloseBreach indicates that a channel breach has been dtected, and + // the link should immediately be marked as unavailable. + CloseBreach +) + +// ChanClose represents a request which close a particular channel specified by +// its id. +type ChanClose struct { + // CloseType is a variable which signals the type of channel closure the + // peer should execute. + CloseType ChannelCloseType + + // ChanPoint represent the id of the channel which should be closed. + ChanPoint *wire.OutPoint + + // Updates is used by request creator to receive the notifications about + // execution of the close channel request. + Updates chan *lnrpc.CloseStatusUpdate + + // Err is used by request creator to receive request execution error. + Err chan error +} + +// Config defines the configuration for the service. ALL elements within the +// configuration MUST be non-nil for the service to carry out its duties. +type Config struct { + // LocalChannelClose kicks-off the workflow to execute a cooperative + // or forced unilateral closure of the channel initiated by a local + // subsystem. + LocalChannelClose func(pubKey []byte, request *ChanClose) +} + +// Switch is a central messaging bus for all incoming/outgoing htlc's. +// The goal of the switch is forward the incoming/outgoing htlc messages from +// one channel to another, and also propagate the settle/fail htlc messages +// back to original requester by using payment circuits. Also switch is +// responsible for notifying the user about result of payment request. +type Switch struct { + started int32 + shutdown int32 + wg sync.WaitGroup + quit chan struct{} + + // cfg is a copy of the configuration struct that the htlc switch + // service was initialized with. + cfg *Config + + // pendingPayments is correspondence of user payments and its hashes, + // which is used to save the payments which made by user and notify + // them about result later. + pendingPayments map[lnwallet.PaymentHash][]*pendingPayment + pendingMutex sync.RWMutex + + // circuits is storage for payment circuits which are used to + // forward the settle/fail htlc updates back to the add htlc initiator. + circuits *circuitMap + + // links is a map of channel id and channel link which manages + // this channel. + links map[lnwire.ChannelID]ChannelLink + + // linksIndex is a map which is needed for quick lookup of channels + // which are belongs to specific peer. + linksIndex map[HopID][]ChannelLink + + // forwardCommands is used for propogating the htlc packet forward + // requests. + forwardCommands chan *forwardPacketCmd + + // chanCloseRequests is used to transfer the channel close request to + // the channel close handler. + chanCloseRequests chan *ChanClose + + // linkControl is a channel used to propogate add/remove/get htlc + // switch handler commands. + linkControl chan interface{} +} + +// New creates the new instance of htlc switch. +func New(cfg Config) *Switch { + return &Switch{ + cfg: &cfg, + circuits: newCircuitMap(), + links: make(map[lnwire.ChannelID]ChannelLink), + linksIndex: make(map[HopID][]ChannelLink), + pendingPayments: make(map[lnwallet.PaymentHash][]*pendingPayment), + forwardCommands: make(chan *forwardPacketCmd), + chanCloseRequests: make(chan *ChanClose), + linkControl: make(chan interface{}), + quit: make(chan struct{}), + } +} + +// SendHTLC is used by other subsystems which aren't belong to htlc switch +// package in order to send the htlc update. +func (s *Switch) SendHTLC(nextNode []byte, update lnwire.Message) ( + [sha256.Size]byte, error) { + + htlc := update.(*lnwire.UpdateAddHTLC) + + // Create payment and add to the map of payment in order later to be + // able to retrieve it and return response to the user. + payment := &pendingPayment{ + err: make(chan error, 1), + preimage: make(chan [sha256.Size]byte, 1), + paymentHash: htlc.PaymentHash, + amount: htlc.Amount, + } + + // Check that we do not have the payment with the same id in order to + // prevent map override. + s.pendingMutex.Lock() + s.pendingPayments[htlc.PaymentHash] = append( + s.pendingPayments[htlc.PaymentHash], payment) + s.pendingMutex.Unlock() + + // Generate and send new update packet, if error will be received + // on this stage it means that packet haven't left boundaries of our + // system and something wrong happened. + hop := NewHopID(nextNode) + packet := newInitPacket(hop, htlc) + if err := s.forward(packet); err != nil { + s.removePendingPayment(payment.amount, payment.paymentHash) + return zeroPreimage, err + } + + // Returns channels so that other subsystem might wait/skip the + // waiting of handling of payment. + var preimage [sha256.Size]byte + var err error + + select { + case e := <-payment.err: + err = e + case <-s.quit: + return zeroPreimage, errors.New("service is shutdown") + } + + select { + case p := <-payment.preimage: + preimage = p + case <-s.quit: + return zeroPreimage, errors.New("service is shutdown") + } + + return preimage, err +} + +// forward is used in order to find next channel link and apply htlc +// update. Also this function is used by channel links itself in order to +// forward the update after it has been included in the channel. +func (s *Switch) forward(packet *htlcPacket) error { + command := &forwardPacketCmd{ + pkt: packet, + err: make(chan error, 1), + } + + select { + case s.forwardCommands <- command: + return <-command.err + case <-s.quit: + return errors.New("Htlc Switch was stopped") + } +} + +// handleLocalDispatch is used at the start/end of the htlc update life +// cycle. At the start (1) it is used to send the htlc to the channel link +// without creation of circuit. At the end (2) it is used to notify the user +// about the result of his payment is it was successful or not. +// +// Alice Bob Carol +// o --add----> o ---add----> o +// (1) +// +// (2) +// o <-settle-- o <--settle-- o +// Alice Bob Carol +// +func (s *Switch) handleLocalDispatch(payment *pendingPayment, packet *htlcPacket) error { + switch htlc := packet.htlc.(type) { + + // User have created the htlc update therefore we should find the + // appropriate channel link and send the payment over this link. + case *lnwire.UpdateAddHTLC: + // Try to find links by node destination. + links, err := s.getLinks(packet.dest) + if err != nil { + log.Errorf("unable to find links by "+ + "destination %v", err) + return errors.New(lnwire.UnknownDestination) + } + + // Try to find destination channel link with appropriate + // bandwidth. + var destination ChannelLink + for _, link := range links { + if link.Bandwidth() >= htlc.Amount { + destination = link + break + } + } + + // If the channel link we're attempting to forward the update + // over has insufficient capacity, then we'll cancel the HTLC + // as the payment cannot succeed. + if destination == nil { + log.Errorf("unable to find appropriate channel link "+ + "insufficient capacity, need %v", htlc.Amount) + return errors.New(lnwire.InsufficientCapacity) + } + + // Send the packet to the destination channel link which + // manages then channel. + destination.HandleSwitchPacket(packet) + return nil + + // We've just received a settle update which means we can finalize + // the user payment and return successful response. + case *lnwire.UpdateFufillHTLC: + // Notify the user that his payment was + // successfully proceed. + payment.err <- nil + payment.preimage <- htlc.PaymentPreimage + s.removePendingPayment(payment.amount, payment.paymentHash) + + // We've just received a fail update which means we can finalize + // the user payment and return fail response. + case *lnwire.UpdateFailHTLC: + // Retrieving the fail code from byte representation of error. + var userErr error + if code, err := htlc.Reason.ToFailCode(); err != nil { + userErr = errors.Errorf("can't decode fail code id"+ + "(%v): %v", htlc.ID, err) + } else { + userErr = errors.New(code) + } + + // Notify user that his payment was discarded. + payment.err <- userErr + payment.preimage <- zeroPreimage + s.removePendingPayment(payment.amount, payment.paymentHash) + + default: + return errors.New("wrong update type") + } + + return nil +} + +// handlePacketForward is used in cases when we need forward the htlc +// update from one channel link to another and be able to propagate the +// settle/fail updates back. This behaviour is achieved by creation of payment +// circuits. +func (s *Switch) handlePacketForward(packet *htlcPacket) error { + switch htlc := packet.htlc.(type) { + + // Channel link forwarded us a new htlc, therefore we initiate the + // payment circuit within our internal state so we can properly forward + // the ultimate settle message back latter. + case *lnwire.UpdateAddHTLC: + source, err := s.getLink(packet.src) + if err != nil { + err := errors.Errorf("unable to find channel link "+ + "by channel point (%v): %v", packet.src, err) + log.Error(err) + return err + } + + // Try to find links by node destination. + links, err := s.getLinks(packet.dest) + if err != nil { + // If packet was forwarded from another + // channel link than we should notify this + // link that some error occurred. + reason := []byte{byte(lnwire.UnknownDestination)} + go source.HandleSwitchPacket(newFailPacket( + packet.src, + &lnwire.UpdateFailHTLC{ + Reason: reason, + }, + htlc.PaymentHash, 0, + )) + err := errors.Errorf("unable to find links with "+ + "destination %v", err) + log.Error(err) + return err + } + + // Try to find destination channel link with appropriate + // bandwidth. + var destination ChannelLink + for _, link := range links { + if link.Bandwidth() >= htlc.Amount { + destination = link + break + } + } + + // If the channel link we're attempting to forward the update + // over has insufficient capacity, then we'll cancel the htlc + // as the payment cannot succeed. + if destination == nil { + // If packet was forwarded from another + // channel link than we should notify this + // link that some error occurred. + reason := []byte{byte(lnwire.InsufficientCapacity)} + go source.HandleSwitchPacket(newFailPacket( + packet.src, + &lnwire.UpdateFailHTLC{ + Reason: reason, + }, + htlc.PaymentHash, + 0, + )) + + err := errors.Errorf("unable to find appropriate "+ + "channel link insufficient capacity, need "+ + "%v", htlc.Amount) + log.Error(err) + return err + } + + // If packet was forwarded from another channel link than we + // should create circuit (remember the path) in order to + // forward settle/fail packet back. + if err := s.circuits.add(newPaymentCircuit( + source.ChanID(), + destination.ChanID(), + htlc.PaymentHash, + )); err != nil { + reason := []byte{byte(lnwire.UnknownError)} + go source.HandleSwitchPacket(newFailPacket( + packet.src, + &lnwire.UpdateFailHTLC{ + Reason: reason, + }, + htlc.PaymentHash, + 0, + )) + err := errors.Errorf("unable to add circuit: "+ + "%v", err) + log.Error(err) + return err + } + + // Send the packet to the destination channel link which + // manages the channel. + destination.HandleSwitchPacket(packet) + return nil + + // We've just received a settle packet which means we can finalize the + // payment circuit by forwarding the settle msg to the channel from + // which htlc add packet was initially received. + case *lnwire.UpdateFufillHTLC, *lnwire.UpdateFailHTLC: + // Exit if we can't find and remove the active circuit to + // continue propagating the fail over. + circuit, err := s.circuits.remove(packet.payHash) + if err != nil { + err := errors.Errorf("unable to remove "+ + "circuit for payment hash: %v", packet.payHash) + log.Error(err) + return err + } + + // Propagating settle/fail htlc back to src of add htlc packet. + source, err := s.getLink(circuit.Src) + if err != nil { + err := errors.Errorf("unable to get source "+ + "channel link to forward settle/fail htlc: %v", + err) + log.Error(err) + return err + } + + log.Debugf("Closing completed onion "+ + "circuit for %x: %v<->%v", packet.payHash[:], + circuit.Src, circuit.Dest) + + source.HandleSwitchPacket(packet) + return nil + + default: + return errors.New("wrong update type") + } +} + +// CloseLink creates and sends the the close channel command. +func (s *Switch) CloseLink(chanPoint *wire.OutPoint, + closeType ChannelCloseType) (chan *lnrpc.CloseStatusUpdate, chan error) { + + // TODO(roasbeef) abstract out the close updates. + updateChan := make(chan *lnrpc.CloseStatusUpdate, 1) + errChan := make(chan error, 1) + + command := &ChanClose{ + CloseType: closeType, + ChanPoint: chanPoint, + Updates: updateChan, + Err: errChan, + } + + select { + case s.chanCloseRequests <- command: + return updateChan, errChan + + case <-s.quit: + errChan <- errors.New("unable close channel link, htlc " + + "switch already stopped") + close(updateChan) + return updateChan, errChan + } +} + +// handleCloseLink sends a message to the peer responsible for the target +// channel point, instructing it to initiate a cooperative channel closure. +func (s *Switch) handleChanelClose(req *ChanClose) { + chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) + + var link ChannelLink + for _, l := range s.links { + if l.ChanID() == chanID { + link = l + } + } + + if link == nil { + req.Err <- errors.Errorf("channel with ChannelID(%v) not "+ + "found", chanID) + return + } + + log.Debugf("requesting local channel close, peer(%v) channel(%v)", + link.Peer(), chanID) + + // TODO(roasbeef): if type was CloseBreach initiate force closure with + // all other channels (if any) we have with the remote peer. + s.cfg.LocalChannelClose(link.Peer().PubKey(), req) + return +} + +// startHandling start handling inner command requests and print the +// htlc switch statistics. +// NOTE: Should be run as goroutine. +func (s *Switch) startHandling() { + defer s.wg.Done() + + // Remove all links on stop. + defer func() { + for _, link := range s.links { + if err := s.removeLink(link.ChanID()); err != nil { + log.Errorf("unable to remove "+ + "channel link on stop: %v", err) + } + } + }() + + // TODO(roasbeef): cleared vs settled distinction + var prevNumUpdates uint64 + var prevSatSent btcutil.Amount + var prevSatRecv btcutil.Amount + + for { + select { + case req := <-s.chanCloseRequests: + s.handleChanelClose(req) + + case cmd := <-s.forwardCommands: + var paymentHash lnwallet.PaymentHash + var amount btcutil.Amount + + switch m := cmd.pkt.htlc.(type) { + case *lnwire.UpdateAddHTLC: + paymentHash = m.PaymentHash + amount = m.Amount + case *lnwire.UpdateFufillHTLC, *lnwire.UpdateFailHTLC: + paymentHash = cmd.pkt.payHash + amount = cmd.pkt.amount + default: + cmd.err <- errors.New("wrong type of update") + return + } + + payment, err := s.findPayment(amount, paymentHash) + if err != nil { + cmd.err <- s.handlePacketForward(cmd.pkt) + } else { + cmd.err <- s.handleLocalDispatch(payment, cmd.pkt) + } + + case <-time.Tick(10 * time.Second): + var overallNumUpdates uint64 + var overallSatSent btcutil.Amount + var overallSatRecv btcutil.Amount + + for _, link := range s.links { + updates, sent, recv := link.Stats() + overallNumUpdates += updates + overallSatSent += sent + overallSatRecv += recv + } + + diffNumUpdates := overallNumUpdates - prevNumUpdates + diffSatSent := overallSatSent - prevSatSent + diffSatRecv := overallSatRecv - prevSatRecv + + if diffNumUpdates == 0 { + continue + } + + log.Infof("sent %v satoshis received %v satoshi "+ + " in the last 10 seconds (%v tx/sec)", + diffSatSent, diffSatRecv, float64(diffNumUpdates)/10) + + prevNumUpdates = overallNumUpdates + prevSatSent = overallSatSent + prevSatRecv = overallSatRecv + + case cmd := <-s.linkControl: + switch cmd := cmd.(type) { + case *addLinkCmd: + cmd.err <- s.addLink(cmd.link) + case *removeLinkCmd: + cmd.err <- s.removeLink(cmd.chanID) + case *getLinkCmd: + link, err := s.getLink(cmd.chanID) + cmd.done <- link + cmd.err <- err + case *getLinksCmd: + links, err := s.getLinks(cmd.peer) + cmd.done <- links + cmd.err <- err + } + + case <-s.quit: + return + } + } +} + +// Start starts all helper goroutines required for the operation of the switch. +func (s *Switch) Start() error { + if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { + log.Warn("Htlc Switch already started") + return nil + } + + log.Infof("Htlc Switch starting") + + s.wg.Add(1) + go s.startHandling() + + return nil +} + +// Stop gracefully stops all active helper goroutines, then waits until they've +// exited. +func (s *Switch) Stop() error { + if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { + log.Warn("Htlc Switch already stopped") + return nil + } + + log.Infof("Htlc Switch shutting down") + + close(s.quit) + s.wg.Wait() + + return nil +} + +// addLinkCmd is a add link command wrapper, it is used to propagate handler +// parameters and return handler error. +type addLinkCmd struct { + link ChannelLink + err chan error +} + +// AddLink is used to initiate the handling of the add link command. The +// request will be propagated and handled in the main goroutine. +func (s *Switch) AddLink(link ChannelLink) error { + command := &addLinkCmd{ + link: link, + err: make(chan error, 1), + } + + select { + case s.linkControl <- command: + return <-command.err + case <-s.quit: + return errors.New("Htlc Switch was stopped") + } +} + +// addLink is used to add the newly created channel link and start +// use it to handle the channel updates. +func (s *Switch) addLink(link ChannelLink) error { + if err := link.Start(); err != nil { + return err + + } + + // Add channel link to the channel map, in order to quickly lookup + // channel by channel id. + s.links[link.ChanID()] = link + + // Add channel link to the index map, in order to quickly lookup + // channels by peer pub key. + hop := NewHopID(link.Peer().PubKey()) + s.linksIndex[hop] = append(s.linksIndex[hop], link) + + log.Infof("Added channel link with ChannelID(%v), bandwidth=%v", + link.ChanID(), link.Bandwidth()) + return nil +} + +// getLinkCmd is a get link command wrapper, it is used to propagate handler +// parameters and return handler error. +type getLinkCmd struct { + chanID lnwire.ChannelID + err chan error + done chan ChannelLink +} + +// GetLink is used to initiate the handling of the get link command. The +// request will be propagated/handled to/in the main goroutine. +func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelLink, error) { + command := &getLinkCmd{ + chanID: chanID, + err: make(chan error, 1), + done: make(chan ChannelLink, 1), + } + + select { + case s.linkControl <- command: + return <-command.done, <-command.err + case <-s.quit: + return nil, errors.New("Htlc Switch was stopped") + } +} + +// getLink returns the channel link by its channel point. +func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) { + link, ok := s.links[chanID] + if !ok { + return nil, ErrChannelLinkNotFound + } + + return link, nil +} + +// removeLinkCmd is a get link command wrapper, it is used to propagate handler +// parameters and return handler error. +type removeLinkCmd struct { + chanID lnwire.ChannelID + err chan error +} + +// RemoveLink is used to initiate the handling of the remove link command. The +// request will be propagated/handled to/in the main goroutine. +func (s *Switch) RemoveLink(chanID lnwire.ChannelID) error { + command := &removeLinkCmd{ + chanID: chanID, + err: make(chan error, 1), + } + + select { + case s.linkControl <- command: + return <-command.err + case <-s.quit: + return errors.New("Htlc Switch was stopped") + } +} + +// removeLink is used to remove and stop the channel link. +func (s *Switch) removeLink(chanID lnwire.ChannelID) error { + link, ok := s.links[chanID] + if !ok { + return ErrChannelLinkNotFound + } + + // Remove the channel from channel map. + delete(s.links, link.ChanID()) + + // Remove the channel from channel index. + hop := NewHopID(link.Peer().PubKey()) + links := s.linksIndex[hop] + for i, l := range links { + if l.ChanID() == link.ChanID() { + // Delete without preserving order + // Google: Golang slice tricks + links[i] = links[len(links)-1] + links[len(links)-1] = nil + s.linksIndex[hop] = links[:len(links)-1] + + if len(s.linksIndex[hop]) == 0 { + delete(s.linksIndex, hop) + } + break + } + } + + go link.Stop() + log.Infof("Remove channel link with ChannelID(%v)", link.ChanID()) + + return nil +} + +// getLinksCmd is a get links command wrapper, it is used to propagate handler +// parameters and return handler error. +type getLinksCmd struct { + peer HopID + err chan error + done chan []ChannelLink +} + +// GetLinks is used to initiate the handling of the get links command. The +// request will be propagated/handled to/in the main goroutine. +func (s *Switch) GetLinks(hop HopID) ([]ChannelLink, error) { + command := &getLinksCmd{ + peer: hop, + err: make(chan error, 1), + done: make(chan []ChannelLink, 1), + } + + select { + case s.linkControl <- command: + return <-command.done, <-command.err + case <-s.quit: + return nil, errors.New("Htlc Switch was stopped") + } +} + +// getLinks is function which returns the channel links of the peer by hop +// destination id. +func (s *Switch) getLinks(destination HopID) ([]ChannelLink, error) { + links, ok := s.linksIndex[destination] + if !ok { + return nil, errors.Errorf("unable to locate channel link by"+ + "destination hop id %v", destination) + } + + result := make([]ChannelLink, len(links)) + for i, link := range links { + result[i] = ChannelLink(link) + } + + return result, nil +} + +// removePendingPayment is the helper function which removes the pending user +// payment. +func (s *Switch) removePendingPayment(amount btcutil.Amount, + hash lnwallet.PaymentHash) error { + s.pendingMutex.Lock() + defer s.pendingMutex.Unlock() + + payments, ok := s.pendingPayments[hash] + if ok { + for i, payment := range payments { + if payment.amount == amount { + // Delete without preserving order + // Google: Golang slice tricks + payments[i] = payments[len(payments)-1] + payments[len(payments)-1] = nil + s.pendingPayments[hash] = payments[:len(payments)-1] + + if len(s.pendingPayments[hash]) == 0 { + delete(s.pendingPayments, hash) + } + + return nil + } + } + } + + return errors.Errorf("unable to remove pending payment with "+ + "hash(%v) and amount(%v)", hash, amount) +} + +// findPayment is the helper function which find the payment. +func (s *Switch) findPayment(amount btcutil.Amount, + hash lnwallet.PaymentHash) (*pendingPayment, error) { + s.pendingMutex.RLock() + defer s.pendingMutex.RUnlock() + + payments, ok := s.pendingPayments[hash] + if ok { + for _, payment := range payments { + if payment.amount == amount { + return payment, nil + } + } + } + + return nil, errors.Errorf("unable to remove pending payment with "+ + "hash(%v) and amount(%v)", hash, amount) +} + +// numPendingPayments is helper function which returns the overall number of +// pending user payments. +func (s *Switch) numPendingPayments() int { + var l int + for _, payments := range s.pendingPayments { + l += len(payments) + } + + return l +} diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go new file mode 100644 index 00000000..a6c62c51 --- /dev/null +++ b/htlcswitch/switch_test.go @@ -0,0 +1,378 @@ +package htlcswitch + +import ( + "bytes" + "crypto/sha256" + "testing" + "time" + + "github.com/btcsuite/fastsha256" + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" +) + +var ( + hash1, _ = chainhash.NewHash(bytes.Repeat([]byte("a"), 32)) + hash2, _ = chainhash.NewHash(bytes.Repeat([]byte("b"), 32)) + + chanPoint1 = wire.NewOutPoint(hash1, 0) + chanPoint2 = wire.NewOutPoint(hash2, 0) + + chanID1 = lnwire.NewChanIDFromOutPoint(chanPoint1) + chanID2 = lnwire.NewChanIDFromOutPoint(chanPoint2) +) + +// TestSwitchForward checks the ability of htlc switch to forward add/settle +// requests. +func TestSwitchForward(t *testing.T) { + var packet *htlcPacket + + alicePeer := newMockServer(t, "alice") + bobPeer := newMockServer(t, "bob") + + aliceChannelLink := newMockChannelLink(chanID1, alicePeer) + bobChannelLink := newMockChannelLink(chanID2, bobPeer) + + s := New(Config{}) + s.Start() + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add alice link: %v", err) + } + if err := s.AddLink(bobChannelLink); err != nil { + t.Fatalf("unable to add bob link: %v", err) + } + + // Create request which should be forwarder from alice channel + // link to bob channel link. + preimage := [sha256.Size]byte{1} + rhash := fastsha256.Sum256(preimage[:]) + packet = newAddPacket( + aliceChannelLink.ChanID(), + NewHopID(bobChannelLink.Peer().PubKey()), + &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + }, + ) + + // Handle the request and checks that bob channel link received it. + if err := s.forward(packet); err != nil { + t.Fatal(err) + } + + select { + case <-bobChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + if s.circuits.pending() != 1 { + t.Fatal("wrong amount of circuits") + } + + // Create settle request pretending that bob link handled + // the add htlc request and sent the htlc settle request back. This + // request should be forwarder back to alice link. + packet = newSettlePacket( + bobChannelLink.ChanID(), + &lnwire.UpdateFufillHTLC{ + PaymentPreimage: preimage, + }, + rhash, 1) + + // Handle the request and checks that payment circuit works properly. + if err := s.forward(packet); err != nil { + t.Fatal(err) + } + + select { + case <-aliceChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to channelPoint") + } + + if s.circuits.pending() != 0 { + t.Fatal("wrong amount of circuits") + } +} + +// TestSwitchCancel checks that if htlc was rejected we remove unused +// circuits. +func TestSwitchCancel(t *testing.T) { + var request *htlcPacket + + alicePeer := newMockServer(t, "alice") + bobPeer := newMockServer(t, "bob") + + aliceChannelLink := newMockChannelLink(chanID1, alicePeer) + bobChannelLink := newMockChannelLink(chanID2, bobPeer) + + s := New(Config{}) + s.Start() + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add alice link: %v", err) + } + if err := s.AddLink(bobChannelLink); err != nil { + t.Fatalf("unable to add bob link: %v", err) + } + + // Create request which should be forwarder from alice channel link + // to bob channel link. + preimage := [sha256.Size]byte{1} + rhash := fastsha256.Sum256(preimage[:]) + request = newAddPacket( + aliceChannelLink.ChanID(), + NewHopID(bobChannelLink.Peer().PubKey()), + &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + }, + ) + + // Handle the request and checks that bob channel link received it. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-bobChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + if s.circuits.pending() != 1 { + t.Fatal("wrong amount of circuits") + } + + // Create settle request pretending that bob channel link handled + // the add htlc request and sent the htlc settle request back. This + // request should be forwarder back to alice channel link. + request = newFailPacket( + bobChannelLink.ChanID(), + &lnwire.UpdateFailHTLC{}, + rhash, 1) + + // Handle the request and checks that payment circuit works properly. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-aliceChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to channelPoint") + } + + if s.circuits.pending() != 0 { + t.Fatal("wrong amount of circuits") + } +} + +// TestSwitchAddSamePayment tests that we send the payment with the same +// payment hash. +func TestSwitchAddSamePayment(t *testing.T) { + var request *htlcPacket + + alicePeer := newMockServer(t, "alice") + bobPeer := newMockServer(t, "bob") + + aliceChannelLink := newMockChannelLink(chanID1, alicePeer) + bobChannelLink := newMockChannelLink(chanID2, bobPeer) + + s := New(Config{}) + s.Start() + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add alice link: %v", err) + } + if err := s.AddLink(bobChannelLink); err != nil { + t.Fatalf("unable to add bob link: %v", err) + } + + // Create request which should be forwarder from alice channel link + // to bob channel link. + preimage := [sha256.Size]byte{1} + rhash := fastsha256.Sum256(preimage[:]) + request = newAddPacket( + aliceChannelLink.ChanID(), + NewHopID(bobChannelLink.Peer().PubKey()), + &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + }, + ) + + // Handle the request and checks that bob channel link received it. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-bobChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + if s.circuits.pending() != 1 { + t.Fatal("wrong amount of circuits") + } + + // Handle the request and checks that bob channel link received it. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + if s.circuits.pending() != 2 { + t.Fatal("wrong amount of circuits") + } + + // Create settle request pretending that bob channel link handled + // the add htlc request and sent the htlc settle request back. This + // request should be forwarder back to alice channel link. + request = newFailPacket( + bobChannelLink.ChanID(), + &lnwire.UpdateFailHTLC{}, + rhash, 1) + + // Handle the request and checks that payment circuit works properly. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-aliceChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to channelPoint") + } + + if s.circuits.pending() != 1 { + t.Fatal("wrong amount of circuits") + } + + // Handle the request and checks that payment circuit works properly. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-aliceChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to channelPoint") + } + + if s.circuits.pending() != 0 { + t.Fatal("wrong amount of circuits") + } +} + +// TestSwitchSendPayment tests ability of htlc switch to respond to the +// users when response is came back from channel link. +func TestSwitchSendPayment(t *testing.T) { + alicePeer := newMockServer(t, "alice") + aliceChannelLink := newMockChannelLink(chanID1, alicePeer) + + s := New(Config{}) + s.Start() + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add link: %v", err) + } + + // Create request which should be forwarder from alice channel link + // to bob channel link. + preimage := [sha256.Size]byte{1} + rhash := fastsha256.Sum256(preimage[:]) + update := &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + } + + // Handle the request and checks that bob channel link received it. + errChan := make(chan error) + go func() { + _, err := s.SendHTLC(aliceChannelLink.Peer().PubKey(), update) + errChan <- err + }() + + go func() { + // Send the payment with the same payment hash and same + // amount and check that it will be propagated successfully + _, err := s.SendHTLC(aliceChannelLink.Peer().PubKey(), update) + errChan <- err + }() + + select { + case <-aliceChannelLink.packets: + break + case err := <-errChan: + t.Fatalf("unable to send payment: %v", err) + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + select { + case <-aliceChannelLink.packets: + break + case err := <-errChan: + t.Fatalf("unable to send payment: %v", err) + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + if s.numPendingPayments() != 2 { + t.Fatal("wrong amount of pending payments") + } + + if s.circuits.pending() != 0 { + t.Fatal("wrong amount of circuits") + } + + // Create fail request pretending that bob channel link handled + // the add htlc request with error and sent the htlc fail request + // back. This request should be forwarder back to alice channel link. + packet := newFailPacket(aliceChannelLink.ChanID(), + &lnwire.UpdateFailHTLC{ + Reason: []byte{byte(lnwire.IncorrectValue)}, + ID: 1, + }, + rhash, 1) + + if err := s.forward(packet); err != nil { + t.Fatalf("can't forward htlc packet: %v", err) + } + + select { + case err := <-errChan: + if err.Error() != errors.New(lnwire.IncorrectValue).Error() { + t.Fatal("err wasn't received") + } + case <-time.After(time.Second): + t.Fatal("err wasn't received") + } + + // Send second failure response and check that user were able to + // receive the error. + if err := s.forward(packet); err != nil { + t.Fatalf("can't forward htlc packet: %v", err) + } + + select { + case err := <-errChan: + if err.Error() != errors.New(lnwire.IncorrectValue).Error() { + t.Fatal("err wasn't received") + } + case <-time.After(time.Second): + t.Fatal("err wasn't received") + } + + if s.numPendingPayments() != 0 { + t.Fatal("wrong amount of pending payments") + } +} diff --git a/lnwire/update_fail_htlc.go b/lnwire/update_fail_htlc.go index 32c2afff..4b7a92b4 100644 --- a/lnwire/update_fail_htlc.go +++ b/lnwire/update_fail_htlc.go @@ -40,6 +40,10 @@ const ( // IncorrectValue indicates that the HTLC ultimately extended to the // destination did not match the value that was expected. IncorrectValue FailCode = 5 + + // UnknownError indicates the error which should be returned, but + // not exist in specification yet. + UnknownError FailCode = 6 ) // String returns a human-readable version of the FailCode type.