diff --git a/htlcswitch/circuit.go b/htlcswitch/circuit.go new file mode 100644 index 00000000..978050ea --- /dev/null +++ b/htlcswitch/circuit.go @@ -0,0 +1,153 @@ +package htlcswitch + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "sync" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" +) + +// circuitKey uniquely identifies an active circuit between two open channels. +// Currently, the payment hash is used to uniquely identify each circuit. +type circuitKey [sha256.Size]byte + +// String represent the circuit key in string format. +func (k *circuitKey) String() string { + return hex.EncodeToString(k[:]) +} + +// paymentCircuit is used by htlc switch subsystem in order to determine +// backward path for settle/fail htlc messages. A payment circuit will be +// created once a channel link forwards the htlc add request and removed when we +// receive settle/fail htlc message. +// +// NOTE: In current implementation of htlc switch, the payment circuit might be +// uniquely identified by payment hash but in future we implement the payment +// fragmentation which makes possible for number of payments to have +// identical payments hashes, but different source or destination. +// +// For example if Alice(A) want to send 2BTC to Bob(B), then payment will be +// split on two parts and node N3 will have circuit with the same payment hash, +// and destination, but different channel source (N1,N2). +// +// 1BTC N1 1BTC +// + --------- o --------- + +// 2BTC | | 2BTC +// A o ------ o N0 N3 o ------ o B +// | | +// + --------- o --------- + +// 1BTC N2 1BTC +// +type paymentCircuit struct { + // PaymentHash used as unique identifier of payment. + PaymentHash circuitKey + + // Src identifies the channel from which add htlc request is came from + // and to which settle/fail htlc request will be returned back. Once the + // switch forwards the settle/fail message to the src the circuit is + // considered to be completed. + // TODO(andrew.shvv) use short channel id instead. + Src lnwire.ChannelID + + // Dest identifies the channel to which we propagate the htlc add + // update and from which we are expecting to receive htlc settle/fail + // request back. + // TODO(andrew.shvv) use short channel id instead. + Dest lnwire.ChannelID + + // RefCount is used to count the circuits with the same circuit key. + RefCount int +} + +// newPaymentCircuit creates new payment circuit instance. +func newPaymentCircuit(src, dest lnwire.ChannelID, key circuitKey) *paymentCircuit { + return &paymentCircuit{ + Src: src, + Dest: dest, + PaymentHash: key, + RefCount: 1, + } +} + +// isEqual checks the equality of two payment circuits. +func (a *paymentCircuit) isEqual(b *paymentCircuit) bool { + return bytes.Equal(a.PaymentHash[:], b.PaymentHash[:]) && + a.Src == b.Src && + a.Dest == b.Dest +} + +// circuitMap is a thread safe storage of circuits. Each circuit key (payment +// hash) might have numbers of circuits corresponding to it +// because of future payment fragmentation, now every circuit might be uniquely +// identified by payment hash (1-1 mapping). +// +// NOTE: Also we have the htlc debug mode and in this mode we have the same +// payment hash for all htlcs. +// TODO(andrew.shvv) make it persistent +type circuitMap struct { + mutex sync.RWMutex + circuits map[circuitKey]*paymentCircuit +} + +// newCircuitMap initialized circuit map with previously stored circuits and +// return circuit map instance. +func newCircuitMap() *circuitMap { + return &circuitMap{ + circuits: make(map[circuitKey]*paymentCircuit), + } +} + +// add function adds circuit in circuit map. +func (m *circuitMap) add(circuit *paymentCircuit) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + // Examine the circuit map to see if this + // circuit is already in use or not. If so, + // then we'll simply increment the reference + // count. Otherwise, we'll create a new circuit + // from scratch. + // TODO(roasbeef): include dest+src+amt in key + if c, ok := m.circuits[circuit.PaymentHash]; ok { + c.RefCount++ + } else { + m.circuits[circuit.PaymentHash] = circuit + } + + return nil +} + +// remove function removes circuit from map. +func (m *circuitMap) remove(key circuitKey) ( + *paymentCircuit, error) { + + m.mutex.Lock() + defer m.mutex.Unlock() + + if circuit, ok := m.circuits[key]; ok { + if circuit.RefCount--; circuit.RefCount == 0 { + delete(m.circuits, key) + } + return circuit, nil + } + + return nil, errors.Errorf("can't find circuit"+ + " for key %v", key) +} + +// pending returns number of circuits which are waiting for to be completed +// (settle/fail responses to be received) +func (m *circuitMap) pending() int { + m.mutex.RLock() + defer m.mutex.RUnlock() + + var length int + for _, circuits := range m.circuits { + length += circuits.RefCount + } + + return length +} diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go new file mode 100644 index 00000000..d370b111 --- /dev/null +++ b/htlcswitch/interfaces.go @@ -0,0 +1,75 @@ +package htlcswitch + +import ( + "crypto/sha256" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +// ChannelLink is an interface which represents the subsystem for managing +// the incoming htlc requests, applying the changes to the channel, and also +// propagating/forwarding it to htlc switch. +// +// abstraction level +// ^ +// | +// | - - - - - - - - - - - - Lightning - - - - - - - - - - - - - +// | +// | (Switch) (Switch) (Switch) +// | Alice <-- channel link --> Bob <-- channel link --> Carol +// | +// | - - - - - - - - - - - - - TCP - - - - - - - - - - - - - - - +// | +// | (Peer) (Peer) (Peer) +// | Alice <----- tcp conn --> Bob <---- tcp conn -----> Carol +// | +// +type ChannelLink interface { + // HandleSwitchPacket handles the switch packets. This packets might be + // forwarded to us from another channel link in case the htlc update + // came from another peer or if the update was created by user + // initially. + HandleSwitchPacket(*htlcPacket) + + // HandleChannelUpdate handles the htlc requests as settle/add/fail + // which sent to us from remote peer we have a channel with. + HandleChannelUpdate(lnwire.Message) + + // ChanID returns the unique identifier of the channel link. + ChanID() lnwire.ChannelID + + // Bandwidth returns the amount of satoshis which current link might + // pass through channel link. + Bandwidth() btcutil.Amount + + // Stats return the statistics of channel link. Number of updates, + // total sent/received satoshis. + Stats() (uint64, btcutil.Amount, btcutil.Amount) + + // Peer returns the representation of remote peer with which we + // have the channel link opened. + Peer() Peer + + // Start/Stop are used to initiate the start/stop of the the channel + // link functioning. + Start() error + Stop() +} + +// Peer is an interface which represents the remote lightning node inside our +// system. +type Peer interface { + // SendMessage sends message to remote peer. + SendMessage(lnwire.Message) error + + // ID returns the lightning network peer id. + ID() [sha256.Size]byte + + // PubKey returns the peer public key. + PubKey() []byte + + // Disconnect disconnects with peer if we have error which we can't + // properly handle. + Disconnect() +} diff --git a/htlcswitch/iterator.go b/htlcswitch/iterator.go index 0a9152ed..833aaff6 100644 --- a/htlcswitch/iterator.go +++ b/htlcswitch/iterator.go @@ -8,24 +8,24 @@ import ( "github.com/roasbeef/btcutil" ) -// hopID represents the id which is used by propagation subsystem in order to +// HopID represents the id which is used by propagation subsystem in order to // identify lightning network node. // TODO(andrew.shvv) remove after switching to the using channel id. -type hopID [ripemd160.Size]byte +type HopID [ripemd160.Size]byte -// newHopID creates new instance of hop form node public key. -func newHopID(pubKey []byte) hopID { - var routeId hopID - copy(routeId[:], btcutil.Hash160(pubKey)) - return routeId +// NewHopID creates new instance of hop form node public key. +func NewHopID(pubKey []byte) HopID { + var routeID HopID + copy(routeID[:], btcutil.Hash160(pubKey)) + return routeID } // String returns string representation of hop id. -func (h hopID) String() string { +func (h HopID) String() string { return hex.EncodeToString(h[:]) } // IsEqual checks does the two hop ids are equal. -func (h hopID) IsEqual(h2 hopID) bool { +func (h HopID) IsEqual(h2 HopID) bool { return bytes.Equal(h[:], h2[:]) } diff --git a/htlcswitch/log.go b/htlcswitch/log.go new file mode 100644 index 00000000..19c83038 --- /dev/null +++ b/htlcswitch/log.go @@ -0,0 +1,54 @@ +package htlcswitch + +import ( + "errors" + "io" + + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + DisableLog() +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until either UseLogger or SetLogWriter are called. +func DisableLog() { + log = btclog.Disabled +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} + +// SetLogWriter uses a specified io.Writer to output package logging info. +// This allows a caller to direct package logging output without needing a +// dependency on seelog. If the caller is also using btclog, UseLogger should +// be used instead. +func SetLogWriter(w io.Writer, level string) error { + if w == nil { + return errors.New("nil writer") + } + + lvl, ok := btclog.LogLevelFromString(level) + if !ok { + return errors.New("invalid log level") + } + + l, err := btclog.NewLoggerFromWriter(w, lvl) + if err != nil { + return err + } + + UseLogger(l) + return nil +} diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go new file mode 100644 index 00000000..442a6300 --- /dev/null +++ b/htlcswitch/mock.go @@ -0,0 +1,202 @@ +package htlcswitch + +import ( + "crypto/sha256" + "sync" + "testing" + + "sync/atomic" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +type mockServer struct { + sync.Mutex + + started int32 + shutdown int32 + wg sync.WaitGroup + quit chan bool + + t *testing.T + name string + messages chan lnwire.Message + + id []byte + htlcSwitch *Switch + + recordFuncs []func(lnwire.Message) +} + +var _ Peer = (*mockServer)(nil) + +func newMockServer(t *testing.T, name string) *mockServer { + return &mockServer{ + t: t, + id: []byte(name), + name: name, + messages: make(chan lnwire.Message, 3000), + + quit: make(chan bool), + htlcSwitch: New(Config{}), + recordFuncs: make([]func(lnwire.Message), 0), + } +} + +func (s *mockServer) Start() error { + if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { + return nil + } + + s.htlcSwitch.Start() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + for { + select { + case msg := <-s.messages: + for _, f := range s.recordFuncs { + f(msg) + } + + if err := s.readHandler(msg); err != nil { + s.Lock() + defer s.Unlock() + s.t.Fatalf("%v server error: %v", s.name, err) + } + case <-s.quit: + return + } + } + }() + + return nil +} + +// messageInterceptor is function that handles the incoming peer messages and +// may decide should we handle it or not. +type messageInterceptor func(m lnwire.Message) + +// Record is used to set the function which will be triggered when new +// lnwire message was received. +func (s *mockServer) record(f messageInterceptor) { + s.recordFuncs = append(s.recordFuncs, f) +} + +func (s *mockServer) SendMessage(message lnwire.Message) error { + select { + case s.messages <- message: + case <-s.quit: + } + + return nil +} + +func (s *mockServer) readHandler(message lnwire.Message) error { + var targetChan lnwire.ChannelID + + switch msg := message.(type) { + case *lnwire.UpdateAddHTLC: + targetChan = msg.ChanID + case *lnwire.UpdateFufillHTLC: + targetChan = msg.ChanID + case *lnwire.UpdateFailHTLC: + targetChan = msg.ChanID + case *lnwire.RevokeAndAck: + targetChan = msg.ChanID + case *lnwire.CommitSig: + targetChan = msg.ChanID + default: + return errors.New("unknown message type") + } + + // Dispatch the commitment update message to the proper + // channel link dedicated to this channel. + link, err := s.htlcSwitch.GetLink(targetChan) + if err != nil { + return err + } + + // Create goroutine for this, in order to be able to properly stop + // the server when handler stacked (server unavailable) + done := make(chan struct{}) + go func() { + defer func() { + done <- struct{}{} + }() + + link.HandleChannelUpdate(message) + }() + select { + case <-done: + case <-s.quit: + } + + return nil +} + +func (s *mockServer) ID() [sha256.Size]byte { + return [sha256.Size]byte{} +} + +func (s *mockServer) PubKey() []byte { + return s.id +} + +func (s *mockServer) Disconnect() { + s.Stop() + s.t.Fatalf("server %v was disconnected", s.name) +} + +func (s *mockServer) Stop() { + if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { + return + } + + go s.htlcSwitch.Stop() + + close(s.quit) + s.wg.Wait() +} + +func (s *mockServer) String() string { + return string(s.id) +} + +type mockChannelLink struct { + chanID lnwire.ChannelID + peer Peer + packets chan *htlcPacket +} + +func newMockChannelLink(chanID lnwire.ChannelID, + peer Peer) *mockChannelLink { + return &mockChannelLink{ + chanID: chanID, + packets: make(chan *htlcPacket, 1), + peer: peer, + } +} + +func (f *mockChannelLink) HandleSwitchPacket(packet *htlcPacket) { + f.packets <- packet +} + +func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) { +} + +func (f *mockChannelLink) Stats() (uint64, btcutil.Amount, btcutil.Amount) { + return 0, 0, 0 +} + +func (f *mockChannelLink) ChanID() lnwire.ChannelID { return f.chanID } +func (f *mockChannelLink) Bandwidth() btcutil.Amount { return 99999999 } +func (f *mockChannelLink) Peer() Peer { return f.peer } +func (f *mockChannelLink) Start() error { return nil } +func (f *mockChannelLink) Stop() {} + +var _ ChannelLink = (*mockChannelLink)(nil) diff --git a/htlcswitch/packet.go b/htlcswitch/packet.go new file mode 100644 index 00000000..4bc2e229 --- /dev/null +++ b/htlcswitch/packet.go @@ -0,0 +1,78 @@ +package htlcswitch + +import ( + "crypto/sha256" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +// htlcPacket is a wrapper around htlc lnwire update, which adds additional +// information which is needed by this package. +type htlcPacket struct { + // payHash payment hash of htlc request. + // NOTE: This fields is initialized only in settle and fail packets. + payHash [sha256.Size]byte + + // dest is the next channel to which this update will be applied. + // TODO(andrew.shvv) use short channel id instead. + dest HopID + + // src is a previous channel to which htlc was applied. + // TODO(andrew.shvv) use short channel id instead. + src lnwire.ChannelID + + // htlc lnwire message type of which depends on switch request type. + htlc lnwire.Message + + // TODO(andrew.shvv) should be removed after introducing sphinx payment. + amount btcutil.Amount +} + +// newInitPacket creates htlc switch add packet which encapsulates the +// add htlc request and additional information for proper forwarding over +// htlc switch. +func newInitPacket(dest HopID, htlc *lnwire.UpdateAddHTLC) *htlcPacket { + return &htlcPacket{ + dest: dest, + htlc: htlc, + } +} + +// newAddPacket creates htlc switch add packet which encapsulates the +// add htlc request and additional information for proper forwarding over +// htlc switch. +func newAddPacket(src lnwire.ChannelID, dest HopID, + htlc *lnwire.UpdateAddHTLC) *htlcPacket { + return &htlcPacket{ + dest: dest, + src: src, + htlc: htlc, + } +} + +// newSettlePacket creates htlc switch ack/settle packet which encapsulates the +// settle htlc request which should be created and sent back by last hope in +// htlc path. +func newSettlePacket(src lnwire.ChannelID, htlc *lnwire.UpdateFufillHTLC, + payHash [sha256.Size]byte, amount btcutil.Amount) *htlcPacket { + return &htlcPacket{ + src: src, + payHash: payHash, + htlc: htlc, + amount: amount, + } +} + +// newFailPacket creates htlc switch fail packet which encapsulates the fail +// htlc request which propagated back to the original hope who sent the htlc +// add request if something wrong happened on the path to the final destination. +func newFailPacket(src lnwire.ChannelID, htlc *lnwire.UpdateFailHTLC, + payHash [sha256.Size]byte, amount btcutil.Amount) *htlcPacket { + return &htlcPacket{ + src: src, + payHash: payHash, + htlc: htlc, + amount: amount, + } +} diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go new file mode 100644 index 00000000..3713efb7 --- /dev/null +++ b/htlcswitch/switch.go @@ -0,0 +1,861 @@ +package htlcswitch + +import ( + "sync" + "sync/atomic" + "time" + + "crypto/sha256" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +var ( + // ErrChannelLinkNotFound is used when channel link hasn't been found. + ErrChannelLinkNotFound = errors.New("channel link not found") + + // zeroPreimage is the empty preimage which is returned when we have + // some errors. + zeroPreimage [sha256.Size]byte +) + +// pendingPayment represents the payment which made by user and waits for +// updates to be received whether the payment has been rejected or proceed +// successfully. +type pendingPayment struct { + paymentHash lnwallet.PaymentHash + amount btcutil.Amount + + preimage chan [sha256.Size]byte + err chan error +} + +// forwardPacketCmd encapsulates switch packet and adds error channel to +// receive error from request handler. +type forwardPacketCmd struct { + pkt *htlcPacket + err chan error +} + +// ChannelCloseType is a enum which signals the type of channel closure the +// peer should execute. +type ChannelCloseType uint8 + +const ( + // CloseRegular indicates a regular cooperative channel closure + // should be attempted. + CloseRegular ChannelCloseType = iota + + // CloseBreach indicates that a channel breach has been dtected, and + // the link should immediately be marked as unavailable. + CloseBreach +) + +// ChanClose represents a request which close a particular channel specified by +// its id. +type ChanClose struct { + // CloseType is a variable which signals the type of channel closure the + // peer should execute. + CloseType ChannelCloseType + + // ChanPoint represent the id of the channel which should be closed. + ChanPoint *wire.OutPoint + + // Updates is used by request creator to receive the notifications about + // execution of the close channel request. + Updates chan *lnrpc.CloseStatusUpdate + + // Err is used by request creator to receive request execution error. + Err chan error +} + +// Config defines the configuration for the service. ALL elements within the +// configuration MUST be non-nil for the service to carry out its duties. +type Config struct { + // LocalChannelClose kicks-off the workflow to execute a cooperative + // or forced unilateral closure of the channel initiated by a local + // subsystem. + LocalChannelClose func(pubKey []byte, request *ChanClose) +} + +// Switch is a central messaging bus for all incoming/outgoing htlc's. +// The goal of the switch is forward the incoming/outgoing htlc messages from +// one channel to another, and also propagate the settle/fail htlc messages +// back to original requester by using payment circuits. Also switch is +// responsible for notifying the user about result of payment request. +type Switch struct { + started int32 + shutdown int32 + wg sync.WaitGroup + quit chan struct{} + + // cfg is a copy of the configuration struct that the htlc switch + // service was initialized with. + cfg *Config + + // pendingPayments is correspondence of user payments and its hashes, + // which is used to save the payments which made by user and notify + // them about result later. + pendingPayments map[lnwallet.PaymentHash][]*pendingPayment + pendingMutex sync.RWMutex + + // circuits is storage for payment circuits which are used to + // forward the settle/fail htlc updates back to the add htlc initiator. + circuits *circuitMap + + // links is a map of channel id and channel link which manages + // this channel. + links map[lnwire.ChannelID]ChannelLink + + // linksIndex is a map which is needed for quick lookup of channels + // which are belongs to specific peer. + linksIndex map[HopID][]ChannelLink + + // forwardCommands is used for propogating the htlc packet forward + // requests. + forwardCommands chan *forwardPacketCmd + + // chanCloseRequests is used to transfer the channel close request to + // the channel close handler. + chanCloseRequests chan *ChanClose + + // linkControl is a channel used to propogate add/remove/get htlc + // switch handler commands. + linkControl chan interface{} +} + +// New creates the new instance of htlc switch. +func New(cfg Config) *Switch { + return &Switch{ + cfg: &cfg, + circuits: newCircuitMap(), + links: make(map[lnwire.ChannelID]ChannelLink), + linksIndex: make(map[HopID][]ChannelLink), + pendingPayments: make(map[lnwallet.PaymentHash][]*pendingPayment), + forwardCommands: make(chan *forwardPacketCmd), + chanCloseRequests: make(chan *ChanClose), + linkControl: make(chan interface{}), + quit: make(chan struct{}), + } +} + +// SendHTLC is used by other subsystems which aren't belong to htlc switch +// package in order to send the htlc update. +func (s *Switch) SendHTLC(nextNode []byte, update lnwire.Message) ( + [sha256.Size]byte, error) { + + htlc := update.(*lnwire.UpdateAddHTLC) + + // Create payment and add to the map of payment in order later to be + // able to retrieve it and return response to the user. + payment := &pendingPayment{ + err: make(chan error, 1), + preimage: make(chan [sha256.Size]byte, 1), + paymentHash: htlc.PaymentHash, + amount: htlc.Amount, + } + + // Check that we do not have the payment with the same id in order to + // prevent map override. + s.pendingMutex.Lock() + s.pendingPayments[htlc.PaymentHash] = append( + s.pendingPayments[htlc.PaymentHash], payment) + s.pendingMutex.Unlock() + + // Generate and send new update packet, if error will be received + // on this stage it means that packet haven't left boundaries of our + // system and something wrong happened. + hop := NewHopID(nextNode) + packet := newInitPacket(hop, htlc) + if err := s.forward(packet); err != nil { + s.removePendingPayment(payment.amount, payment.paymentHash) + return zeroPreimage, err + } + + // Returns channels so that other subsystem might wait/skip the + // waiting of handling of payment. + var preimage [sha256.Size]byte + var err error + + select { + case e := <-payment.err: + err = e + case <-s.quit: + return zeroPreimage, errors.New("service is shutdown") + } + + select { + case p := <-payment.preimage: + preimage = p + case <-s.quit: + return zeroPreimage, errors.New("service is shutdown") + } + + return preimage, err +} + +// forward is used in order to find next channel link and apply htlc +// update. Also this function is used by channel links itself in order to +// forward the update after it has been included in the channel. +func (s *Switch) forward(packet *htlcPacket) error { + command := &forwardPacketCmd{ + pkt: packet, + err: make(chan error, 1), + } + + select { + case s.forwardCommands <- command: + return <-command.err + case <-s.quit: + return errors.New("Htlc Switch was stopped") + } +} + +// handleLocalDispatch is used at the start/end of the htlc update life +// cycle. At the start (1) it is used to send the htlc to the channel link +// without creation of circuit. At the end (2) it is used to notify the user +// about the result of his payment is it was successful or not. +// +// Alice Bob Carol +// o --add----> o ---add----> o +// (1) +// +// (2) +// o <-settle-- o <--settle-- o +// Alice Bob Carol +// +func (s *Switch) handleLocalDispatch(payment *pendingPayment, packet *htlcPacket) error { + switch htlc := packet.htlc.(type) { + + // User have created the htlc update therefore we should find the + // appropriate channel link and send the payment over this link. + case *lnwire.UpdateAddHTLC: + // Try to find links by node destination. + links, err := s.getLinks(packet.dest) + if err != nil { + log.Errorf("unable to find links by "+ + "destination %v", err) + return errors.New(lnwire.UnknownDestination) + } + + // Try to find destination channel link with appropriate + // bandwidth. + var destination ChannelLink + for _, link := range links { + if link.Bandwidth() >= htlc.Amount { + destination = link + break + } + } + + // If the channel link we're attempting to forward the update + // over has insufficient capacity, then we'll cancel the HTLC + // as the payment cannot succeed. + if destination == nil { + log.Errorf("unable to find appropriate channel link "+ + "insufficient capacity, need %v", htlc.Amount) + return errors.New(lnwire.InsufficientCapacity) + } + + // Send the packet to the destination channel link which + // manages then channel. + destination.HandleSwitchPacket(packet) + return nil + + // We've just received a settle update which means we can finalize + // the user payment and return successful response. + case *lnwire.UpdateFufillHTLC: + // Notify the user that his payment was + // successfully proceed. + payment.err <- nil + payment.preimage <- htlc.PaymentPreimage + s.removePendingPayment(payment.amount, payment.paymentHash) + + // We've just received a fail update which means we can finalize + // the user payment and return fail response. + case *lnwire.UpdateFailHTLC: + // Retrieving the fail code from byte representation of error. + var userErr error + if code, err := htlc.Reason.ToFailCode(); err != nil { + userErr = errors.Errorf("can't decode fail code id"+ + "(%v): %v", htlc.ID, err) + } else { + userErr = errors.New(code) + } + + // Notify user that his payment was discarded. + payment.err <- userErr + payment.preimage <- zeroPreimage + s.removePendingPayment(payment.amount, payment.paymentHash) + + default: + return errors.New("wrong update type") + } + + return nil +} + +// handlePacketForward is used in cases when we need forward the htlc +// update from one channel link to another and be able to propagate the +// settle/fail updates back. This behaviour is achieved by creation of payment +// circuits. +func (s *Switch) handlePacketForward(packet *htlcPacket) error { + switch htlc := packet.htlc.(type) { + + // Channel link forwarded us a new htlc, therefore we initiate the + // payment circuit within our internal state so we can properly forward + // the ultimate settle message back latter. + case *lnwire.UpdateAddHTLC: + source, err := s.getLink(packet.src) + if err != nil { + err := errors.Errorf("unable to find channel link "+ + "by channel point (%v): %v", packet.src, err) + log.Error(err) + return err + } + + // Try to find links by node destination. + links, err := s.getLinks(packet.dest) + if err != nil { + // If packet was forwarded from another + // channel link than we should notify this + // link that some error occurred. + reason := []byte{byte(lnwire.UnknownDestination)} + go source.HandleSwitchPacket(newFailPacket( + packet.src, + &lnwire.UpdateFailHTLC{ + Reason: reason, + }, + htlc.PaymentHash, 0, + )) + err := errors.Errorf("unable to find links with "+ + "destination %v", err) + log.Error(err) + return err + } + + // Try to find destination channel link with appropriate + // bandwidth. + var destination ChannelLink + for _, link := range links { + if link.Bandwidth() >= htlc.Amount { + destination = link + break + } + } + + // If the channel link we're attempting to forward the update + // over has insufficient capacity, then we'll cancel the htlc + // as the payment cannot succeed. + if destination == nil { + // If packet was forwarded from another + // channel link than we should notify this + // link that some error occurred. + reason := []byte{byte(lnwire.InsufficientCapacity)} + go source.HandleSwitchPacket(newFailPacket( + packet.src, + &lnwire.UpdateFailHTLC{ + Reason: reason, + }, + htlc.PaymentHash, + 0, + )) + + err := errors.Errorf("unable to find appropriate "+ + "channel link insufficient capacity, need "+ + "%v", htlc.Amount) + log.Error(err) + return err + } + + // If packet was forwarded from another channel link than we + // should create circuit (remember the path) in order to + // forward settle/fail packet back. + if err := s.circuits.add(newPaymentCircuit( + source.ChanID(), + destination.ChanID(), + htlc.PaymentHash, + )); err != nil { + reason := []byte{byte(lnwire.UnknownError)} + go source.HandleSwitchPacket(newFailPacket( + packet.src, + &lnwire.UpdateFailHTLC{ + Reason: reason, + }, + htlc.PaymentHash, + 0, + )) + err := errors.Errorf("unable to add circuit: "+ + "%v", err) + log.Error(err) + return err + } + + // Send the packet to the destination channel link which + // manages the channel. + destination.HandleSwitchPacket(packet) + return nil + + // We've just received a settle packet which means we can finalize the + // payment circuit by forwarding the settle msg to the channel from + // which htlc add packet was initially received. + case *lnwire.UpdateFufillHTLC, *lnwire.UpdateFailHTLC: + // Exit if we can't find and remove the active circuit to + // continue propagating the fail over. + circuit, err := s.circuits.remove(packet.payHash) + if err != nil { + err := errors.Errorf("unable to remove "+ + "circuit for payment hash: %v", packet.payHash) + log.Error(err) + return err + } + + // Propagating settle/fail htlc back to src of add htlc packet. + source, err := s.getLink(circuit.Src) + if err != nil { + err := errors.Errorf("unable to get source "+ + "channel link to forward settle/fail htlc: %v", + err) + log.Error(err) + return err + } + + log.Debugf("Closing completed onion "+ + "circuit for %x: %v<->%v", packet.payHash[:], + circuit.Src, circuit.Dest) + + source.HandleSwitchPacket(packet) + return nil + + default: + return errors.New("wrong update type") + } +} + +// CloseLink creates and sends the the close channel command. +func (s *Switch) CloseLink(chanPoint *wire.OutPoint, + closeType ChannelCloseType) (chan *lnrpc.CloseStatusUpdate, chan error) { + + // TODO(roasbeef) abstract out the close updates. + updateChan := make(chan *lnrpc.CloseStatusUpdate, 1) + errChan := make(chan error, 1) + + command := &ChanClose{ + CloseType: closeType, + ChanPoint: chanPoint, + Updates: updateChan, + Err: errChan, + } + + select { + case s.chanCloseRequests <- command: + return updateChan, errChan + + case <-s.quit: + errChan <- errors.New("unable close channel link, htlc " + + "switch already stopped") + close(updateChan) + return updateChan, errChan + } +} + +// handleCloseLink sends a message to the peer responsible for the target +// channel point, instructing it to initiate a cooperative channel closure. +func (s *Switch) handleChanelClose(req *ChanClose) { + chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) + + var link ChannelLink + for _, l := range s.links { + if l.ChanID() == chanID { + link = l + } + } + + if link == nil { + req.Err <- errors.Errorf("channel with ChannelID(%v) not "+ + "found", chanID) + return + } + + log.Debugf("requesting local channel close, peer(%v) channel(%v)", + link.Peer(), chanID) + + // TODO(roasbeef): if type was CloseBreach initiate force closure with + // all other channels (if any) we have with the remote peer. + s.cfg.LocalChannelClose(link.Peer().PubKey(), req) + return +} + +// startHandling start handling inner command requests and print the +// htlc switch statistics. +// NOTE: Should be run as goroutine. +func (s *Switch) startHandling() { + defer s.wg.Done() + + // Remove all links on stop. + defer func() { + for _, link := range s.links { + if err := s.removeLink(link.ChanID()); err != nil { + log.Errorf("unable to remove "+ + "channel link on stop: %v", err) + } + } + }() + + // TODO(roasbeef): cleared vs settled distinction + var prevNumUpdates uint64 + var prevSatSent btcutil.Amount + var prevSatRecv btcutil.Amount + + for { + select { + case req := <-s.chanCloseRequests: + s.handleChanelClose(req) + + case cmd := <-s.forwardCommands: + var paymentHash lnwallet.PaymentHash + var amount btcutil.Amount + + switch m := cmd.pkt.htlc.(type) { + case *lnwire.UpdateAddHTLC: + paymentHash = m.PaymentHash + amount = m.Amount + case *lnwire.UpdateFufillHTLC, *lnwire.UpdateFailHTLC: + paymentHash = cmd.pkt.payHash + amount = cmd.pkt.amount + default: + cmd.err <- errors.New("wrong type of update") + return + } + + payment, err := s.findPayment(amount, paymentHash) + if err != nil { + cmd.err <- s.handlePacketForward(cmd.pkt) + } else { + cmd.err <- s.handleLocalDispatch(payment, cmd.pkt) + } + + case <-time.Tick(10 * time.Second): + var overallNumUpdates uint64 + var overallSatSent btcutil.Amount + var overallSatRecv btcutil.Amount + + for _, link := range s.links { + updates, sent, recv := link.Stats() + overallNumUpdates += updates + overallSatSent += sent + overallSatRecv += recv + } + + diffNumUpdates := overallNumUpdates - prevNumUpdates + diffSatSent := overallSatSent - prevSatSent + diffSatRecv := overallSatRecv - prevSatRecv + + if diffNumUpdates == 0 { + continue + } + + log.Infof("sent %v satoshis received %v satoshi "+ + " in the last 10 seconds (%v tx/sec)", + diffSatSent, diffSatRecv, float64(diffNumUpdates)/10) + + prevNumUpdates = overallNumUpdates + prevSatSent = overallSatSent + prevSatRecv = overallSatRecv + + case cmd := <-s.linkControl: + switch cmd := cmd.(type) { + case *addLinkCmd: + cmd.err <- s.addLink(cmd.link) + case *removeLinkCmd: + cmd.err <- s.removeLink(cmd.chanID) + case *getLinkCmd: + link, err := s.getLink(cmd.chanID) + cmd.done <- link + cmd.err <- err + case *getLinksCmd: + links, err := s.getLinks(cmd.peer) + cmd.done <- links + cmd.err <- err + } + + case <-s.quit: + return + } + } +} + +// Start starts all helper goroutines required for the operation of the switch. +func (s *Switch) Start() error { + if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { + log.Warn("Htlc Switch already started") + return nil + } + + log.Infof("Htlc Switch starting") + + s.wg.Add(1) + go s.startHandling() + + return nil +} + +// Stop gracefully stops all active helper goroutines, then waits until they've +// exited. +func (s *Switch) Stop() error { + if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { + log.Warn("Htlc Switch already stopped") + return nil + } + + log.Infof("Htlc Switch shutting down") + + close(s.quit) + s.wg.Wait() + + return nil +} + +// addLinkCmd is a add link command wrapper, it is used to propagate handler +// parameters and return handler error. +type addLinkCmd struct { + link ChannelLink + err chan error +} + +// AddLink is used to initiate the handling of the add link command. The +// request will be propagated and handled in the main goroutine. +func (s *Switch) AddLink(link ChannelLink) error { + command := &addLinkCmd{ + link: link, + err: make(chan error, 1), + } + + select { + case s.linkControl <- command: + return <-command.err + case <-s.quit: + return errors.New("Htlc Switch was stopped") + } +} + +// addLink is used to add the newly created channel link and start +// use it to handle the channel updates. +func (s *Switch) addLink(link ChannelLink) error { + if err := link.Start(); err != nil { + return err + + } + + // Add channel link to the channel map, in order to quickly lookup + // channel by channel id. + s.links[link.ChanID()] = link + + // Add channel link to the index map, in order to quickly lookup + // channels by peer pub key. + hop := NewHopID(link.Peer().PubKey()) + s.linksIndex[hop] = append(s.linksIndex[hop], link) + + log.Infof("Added channel link with ChannelID(%v), bandwidth=%v", + link.ChanID(), link.Bandwidth()) + return nil +} + +// getLinkCmd is a get link command wrapper, it is used to propagate handler +// parameters and return handler error. +type getLinkCmd struct { + chanID lnwire.ChannelID + err chan error + done chan ChannelLink +} + +// GetLink is used to initiate the handling of the get link command. The +// request will be propagated/handled to/in the main goroutine. +func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelLink, error) { + command := &getLinkCmd{ + chanID: chanID, + err: make(chan error, 1), + done: make(chan ChannelLink, 1), + } + + select { + case s.linkControl <- command: + return <-command.done, <-command.err + case <-s.quit: + return nil, errors.New("Htlc Switch was stopped") + } +} + +// getLink returns the channel link by its channel point. +func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) { + link, ok := s.links[chanID] + if !ok { + return nil, ErrChannelLinkNotFound + } + + return link, nil +} + +// removeLinkCmd is a get link command wrapper, it is used to propagate handler +// parameters and return handler error. +type removeLinkCmd struct { + chanID lnwire.ChannelID + err chan error +} + +// RemoveLink is used to initiate the handling of the remove link command. The +// request will be propagated/handled to/in the main goroutine. +func (s *Switch) RemoveLink(chanID lnwire.ChannelID) error { + command := &removeLinkCmd{ + chanID: chanID, + err: make(chan error, 1), + } + + select { + case s.linkControl <- command: + return <-command.err + case <-s.quit: + return errors.New("Htlc Switch was stopped") + } +} + +// removeLink is used to remove and stop the channel link. +func (s *Switch) removeLink(chanID lnwire.ChannelID) error { + link, ok := s.links[chanID] + if !ok { + return ErrChannelLinkNotFound + } + + // Remove the channel from channel map. + delete(s.links, link.ChanID()) + + // Remove the channel from channel index. + hop := NewHopID(link.Peer().PubKey()) + links := s.linksIndex[hop] + for i, l := range links { + if l.ChanID() == link.ChanID() { + // Delete without preserving order + // Google: Golang slice tricks + links[i] = links[len(links)-1] + links[len(links)-1] = nil + s.linksIndex[hop] = links[:len(links)-1] + + if len(s.linksIndex[hop]) == 0 { + delete(s.linksIndex, hop) + } + break + } + } + + go link.Stop() + log.Infof("Remove channel link with ChannelID(%v)", link.ChanID()) + + return nil +} + +// getLinksCmd is a get links command wrapper, it is used to propagate handler +// parameters and return handler error. +type getLinksCmd struct { + peer HopID + err chan error + done chan []ChannelLink +} + +// GetLinks is used to initiate the handling of the get links command. The +// request will be propagated/handled to/in the main goroutine. +func (s *Switch) GetLinks(hop HopID) ([]ChannelLink, error) { + command := &getLinksCmd{ + peer: hop, + err: make(chan error, 1), + done: make(chan []ChannelLink, 1), + } + + select { + case s.linkControl <- command: + return <-command.done, <-command.err + case <-s.quit: + return nil, errors.New("Htlc Switch was stopped") + } +} + +// getLinks is function which returns the channel links of the peer by hop +// destination id. +func (s *Switch) getLinks(destination HopID) ([]ChannelLink, error) { + links, ok := s.linksIndex[destination] + if !ok { + return nil, errors.Errorf("unable to locate channel link by"+ + "destination hop id %v", destination) + } + + result := make([]ChannelLink, len(links)) + for i, link := range links { + result[i] = ChannelLink(link) + } + + return result, nil +} + +// removePendingPayment is the helper function which removes the pending user +// payment. +func (s *Switch) removePendingPayment(amount btcutil.Amount, + hash lnwallet.PaymentHash) error { + s.pendingMutex.Lock() + defer s.pendingMutex.Unlock() + + payments, ok := s.pendingPayments[hash] + if ok { + for i, payment := range payments { + if payment.amount == amount { + // Delete without preserving order + // Google: Golang slice tricks + payments[i] = payments[len(payments)-1] + payments[len(payments)-1] = nil + s.pendingPayments[hash] = payments[:len(payments)-1] + + if len(s.pendingPayments[hash]) == 0 { + delete(s.pendingPayments, hash) + } + + return nil + } + } + } + + return errors.Errorf("unable to remove pending payment with "+ + "hash(%v) and amount(%v)", hash, amount) +} + +// findPayment is the helper function which find the payment. +func (s *Switch) findPayment(amount btcutil.Amount, + hash lnwallet.PaymentHash) (*pendingPayment, error) { + s.pendingMutex.RLock() + defer s.pendingMutex.RUnlock() + + payments, ok := s.pendingPayments[hash] + if ok { + for _, payment := range payments { + if payment.amount == amount { + return payment, nil + } + } + } + + return nil, errors.Errorf("unable to remove pending payment with "+ + "hash(%v) and amount(%v)", hash, amount) +} + +// numPendingPayments is helper function which returns the overall number of +// pending user payments. +func (s *Switch) numPendingPayments() int { + var l int + for _, payments := range s.pendingPayments { + l += len(payments) + } + + return l +} diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go new file mode 100644 index 00000000..a6c62c51 --- /dev/null +++ b/htlcswitch/switch_test.go @@ -0,0 +1,378 @@ +package htlcswitch + +import ( + "bytes" + "crypto/sha256" + "testing" + "time" + + "github.com/btcsuite/fastsha256" + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" +) + +var ( + hash1, _ = chainhash.NewHash(bytes.Repeat([]byte("a"), 32)) + hash2, _ = chainhash.NewHash(bytes.Repeat([]byte("b"), 32)) + + chanPoint1 = wire.NewOutPoint(hash1, 0) + chanPoint2 = wire.NewOutPoint(hash2, 0) + + chanID1 = lnwire.NewChanIDFromOutPoint(chanPoint1) + chanID2 = lnwire.NewChanIDFromOutPoint(chanPoint2) +) + +// TestSwitchForward checks the ability of htlc switch to forward add/settle +// requests. +func TestSwitchForward(t *testing.T) { + var packet *htlcPacket + + alicePeer := newMockServer(t, "alice") + bobPeer := newMockServer(t, "bob") + + aliceChannelLink := newMockChannelLink(chanID1, alicePeer) + bobChannelLink := newMockChannelLink(chanID2, bobPeer) + + s := New(Config{}) + s.Start() + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add alice link: %v", err) + } + if err := s.AddLink(bobChannelLink); err != nil { + t.Fatalf("unable to add bob link: %v", err) + } + + // Create request which should be forwarder from alice channel + // link to bob channel link. + preimage := [sha256.Size]byte{1} + rhash := fastsha256.Sum256(preimage[:]) + packet = newAddPacket( + aliceChannelLink.ChanID(), + NewHopID(bobChannelLink.Peer().PubKey()), + &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + }, + ) + + // Handle the request and checks that bob channel link received it. + if err := s.forward(packet); err != nil { + t.Fatal(err) + } + + select { + case <-bobChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + if s.circuits.pending() != 1 { + t.Fatal("wrong amount of circuits") + } + + // Create settle request pretending that bob link handled + // the add htlc request and sent the htlc settle request back. This + // request should be forwarder back to alice link. + packet = newSettlePacket( + bobChannelLink.ChanID(), + &lnwire.UpdateFufillHTLC{ + PaymentPreimage: preimage, + }, + rhash, 1) + + // Handle the request and checks that payment circuit works properly. + if err := s.forward(packet); err != nil { + t.Fatal(err) + } + + select { + case <-aliceChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to channelPoint") + } + + if s.circuits.pending() != 0 { + t.Fatal("wrong amount of circuits") + } +} + +// TestSwitchCancel checks that if htlc was rejected we remove unused +// circuits. +func TestSwitchCancel(t *testing.T) { + var request *htlcPacket + + alicePeer := newMockServer(t, "alice") + bobPeer := newMockServer(t, "bob") + + aliceChannelLink := newMockChannelLink(chanID1, alicePeer) + bobChannelLink := newMockChannelLink(chanID2, bobPeer) + + s := New(Config{}) + s.Start() + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add alice link: %v", err) + } + if err := s.AddLink(bobChannelLink); err != nil { + t.Fatalf("unable to add bob link: %v", err) + } + + // Create request which should be forwarder from alice channel link + // to bob channel link. + preimage := [sha256.Size]byte{1} + rhash := fastsha256.Sum256(preimage[:]) + request = newAddPacket( + aliceChannelLink.ChanID(), + NewHopID(bobChannelLink.Peer().PubKey()), + &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + }, + ) + + // Handle the request and checks that bob channel link received it. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-bobChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + if s.circuits.pending() != 1 { + t.Fatal("wrong amount of circuits") + } + + // Create settle request pretending that bob channel link handled + // the add htlc request and sent the htlc settle request back. This + // request should be forwarder back to alice channel link. + request = newFailPacket( + bobChannelLink.ChanID(), + &lnwire.UpdateFailHTLC{}, + rhash, 1) + + // Handle the request and checks that payment circuit works properly. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-aliceChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to channelPoint") + } + + if s.circuits.pending() != 0 { + t.Fatal("wrong amount of circuits") + } +} + +// TestSwitchAddSamePayment tests that we send the payment with the same +// payment hash. +func TestSwitchAddSamePayment(t *testing.T) { + var request *htlcPacket + + alicePeer := newMockServer(t, "alice") + bobPeer := newMockServer(t, "bob") + + aliceChannelLink := newMockChannelLink(chanID1, alicePeer) + bobChannelLink := newMockChannelLink(chanID2, bobPeer) + + s := New(Config{}) + s.Start() + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add alice link: %v", err) + } + if err := s.AddLink(bobChannelLink); err != nil { + t.Fatalf("unable to add bob link: %v", err) + } + + // Create request which should be forwarder from alice channel link + // to bob channel link. + preimage := [sha256.Size]byte{1} + rhash := fastsha256.Sum256(preimage[:]) + request = newAddPacket( + aliceChannelLink.ChanID(), + NewHopID(bobChannelLink.Peer().PubKey()), + &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + }, + ) + + // Handle the request and checks that bob channel link received it. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-bobChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + if s.circuits.pending() != 1 { + t.Fatal("wrong amount of circuits") + } + + // Handle the request and checks that bob channel link received it. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + if s.circuits.pending() != 2 { + t.Fatal("wrong amount of circuits") + } + + // Create settle request pretending that bob channel link handled + // the add htlc request and sent the htlc settle request back. This + // request should be forwarder back to alice channel link. + request = newFailPacket( + bobChannelLink.ChanID(), + &lnwire.UpdateFailHTLC{}, + rhash, 1) + + // Handle the request and checks that payment circuit works properly. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-aliceChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to channelPoint") + } + + if s.circuits.pending() != 1 { + t.Fatal("wrong amount of circuits") + } + + // Handle the request and checks that payment circuit works properly. + if err := s.forward(request); err != nil { + t.Fatal(err) + } + + select { + case <-aliceChannelLink.packets: + break + case <-time.After(time.Second): + t.Fatal("request was not propogated to channelPoint") + } + + if s.circuits.pending() != 0 { + t.Fatal("wrong amount of circuits") + } +} + +// TestSwitchSendPayment tests ability of htlc switch to respond to the +// users when response is came back from channel link. +func TestSwitchSendPayment(t *testing.T) { + alicePeer := newMockServer(t, "alice") + aliceChannelLink := newMockChannelLink(chanID1, alicePeer) + + s := New(Config{}) + s.Start() + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add link: %v", err) + } + + // Create request which should be forwarder from alice channel link + // to bob channel link. + preimage := [sha256.Size]byte{1} + rhash := fastsha256.Sum256(preimage[:]) + update := &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + } + + // Handle the request and checks that bob channel link received it. + errChan := make(chan error) + go func() { + _, err := s.SendHTLC(aliceChannelLink.Peer().PubKey(), update) + errChan <- err + }() + + go func() { + // Send the payment with the same payment hash and same + // amount and check that it will be propagated successfully + _, err := s.SendHTLC(aliceChannelLink.Peer().PubKey(), update) + errChan <- err + }() + + select { + case <-aliceChannelLink.packets: + break + case err := <-errChan: + t.Fatalf("unable to send payment: %v", err) + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + select { + case <-aliceChannelLink.packets: + break + case err := <-errChan: + t.Fatalf("unable to send payment: %v", err) + case <-time.After(time.Second): + t.Fatal("request was not propogated to destination") + } + + if s.numPendingPayments() != 2 { + t.Fatal("wrong amount of pending payments") + } + + if s.circuits.pending() != 0 { + t.Fatal("wrong amount of circuits") + } + + // Create fail request pretending that bob channel link handled + // the add htlc request with error and sent the htlc fail request + // back. This request should be forwarder back to alice channel link. + packet := newFailPacket(aliceChannelLink.ChanID(), + &lnwire.UpdateFailHTLC{ + Reason: []byte{byte(lnwire.IncorrectValue)}, + ID: 1, + }, + rhash, 1) + + if err := s.forward(packet); err != nil { + t.Fatalf("can't forward htlc packet: %v", err) + } + + select { + case err := <-errChan: + if err.Error() != errors.New(lnwire.IncorrectValue).Error() { + t.Fatal("err wasn't received") + } + case <-time.After(time.Second): + t.Fatal("err wasn't received") + } + + // Send second failure response and check that user were able to + // receive the error. + if err := s.forward(packet); err != nil { + t.Fatalf("can't forward htlc packet: %v", err) + } + + select { + case err := <-errChan: + if err.Error() != errors.New(lnwire.IncorrectValue).Error() { + t.Fatal("err wasn't received") + } + case <-time.After(time.Second): + t.Fatal("err wasn't received") + } + + if s.numPendingPayments() != 0 { + t.Fatal("wrong amount of pending payments") + } +} diff --git a/lnwire/update_fail_htlc.go b/lnwire/update_fail_htlc.go index 32c2afff..4b7a92b4 100644 --- a/lnwire/update_fail_htlc.go +++ b/lnwire/update_fail_htlc.go @@ -40,6 +40,10 @@ const ( // IncorrectValue indicates that the HTLC ultimately extended to the // destination did not match the value that was expected. IncorrectValue FailCode = 5 + + // UnknownError indicates the error which should be returned, but + // not exist in specification yet. + UnknownError FailCode = 6 ) // String returns a human-readable version of the FailCode type.