package htlcswitch

import (
	"bytes"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/btcutil"
	"github.com/davecgh/go-spew/spew"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/channeldb/kvdb"
	"github.com/lightningnetwork/lnd/clock"
	"github.com/lightningnetwork/lnd/contractcourt"
	"github.com/lightningnetwork/lnd/htlcswitch/hop"
	"github.com/lightningnetwork/lnd/lntypes"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/ticker"
)

const (
	// DefaultFwdEventInterval is the duration between attempts to flush
	// pending forwarding events to disk.
	DefaultFwdEventInterval = 15 * time.Second

	// DefaultLogInterval is the duration between attempts to log statistics
	// about forwarding events.
	DefaultLogInterval = 10 * time.Second

	// DefaultAckInterval is the duration between attempts to ack any
	// settles/fails in a forwarding package.
	DefaultAckInterval = 15 * time.Second

	// DefaultHTLCExpiry is the duration after which Adds will be cancelled
	// if they could not get added to an outgoing commitment.
	DefaultHTLCExpiry = time.Minute
)

var (
	// ErrChannelLinkNotFound is used when a channel link hasn't been found.
	ErrChannelLinkNotFound = errors.New("channel link not found")

	// ErrDuplicateAdd signals that the ADD htlc was already forwarded
	// through the switch and is locked into another commitment txn.
	ErrDuplicateAdd = errors.New("duplicate add HTLC detected")

	// ErrUnknownErrorDecryptor signals that we were unable to locate the
	// error decryptor for this payment. This is likely due to restarting
	// the daemon.
	ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")

	// ErrSwitchExiting is signaled when the switch has received a shutdown
	// request.
	ErrSwitchExiting = errors.New("htlcswitch shutting down")

	// ErrNoLinksFound is an error returned when we attempt to retrieve the
	// active links in the switch for a specific destination.
	ErrNoLinksFound = errors.New("no channel links found")

	// ErrUnreadableFailureMessage is returned when the failure message
	// cannot be decrypted.
	ErrUnreadableFailureMessage = errors.New("unreadable failure message")

	// ErrLocalAddFailed signals that the ADD htlc for a local payment
	// failed to be processed.
	ErrLocalAddFailed = errors.New("local add HTLC failed")
)

// plexPacket encapsulates a switch packet and adds an error channel on which
// the error from the request handler is received.
type plexPacket struct {
	pkt *htlcPacket
	err chan error
}

// ChannelCloseType is an enum which signals the type of channel closure the
// peer should execute.
type ChannelCloseType uint8

const (
	// CloseRegular indicates a regular cooperative channel closure
	// should be attempted.
	CloseRegular ChannelCloseType = iota

	// CloseBreach indicates that a channel breach has been detected, and
	// the link should immediately be marked as unavailable.
	CloseBreach
)

// ChanClose represents a request which closes a particular channel specified
// by its id.
type ChanClose struct {
	// CloseType is a variable which signals the type of channel closure the
	// peer should execute.
	CloseType ChannelCloseType

	// ChanPoint represents the id of the channel which should be closed.
	ChanPoint *wire.OutPoint

	// TargetFeePerKw is the ideal fee that was specified by the caller.
	// This value is only utilized if the closure type is CloseRegular.
	// This will be the starting offered fee when the fee negotiation
	// process for the cooperative closure transaction kicks off.
	TargetFeePerKw chainfee.SatPerKWeight

	// DeliveryScript is an optional delivery script to pay funds out to.
	DeliveryScript lnwire.DeliveryAddress

	// Updates is used by the request creator to receive notifications
	// about the execution of the close channel request.
	Updates chan interface{}

	// Err is used by the request creator to receive the request execution
	// error.
	Err chan error
}

// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
	// FwdingLog is an interface that will be used by the switch to log
	// forwarding events. A forwarding event happens each time a payment
	// circuit is successfully completed. So when we forward an HTLC, and a
	// settle is eventually received.
	FwdingLog ForwardingLog

	// LocalChannelClose kicks-off the workflow to execute a cooperative or
	// forced unilateral closure of the channel initiated by a local
	// subsystem.
	LocalChannelClose func(pubKey []byte, request *ChanClose)

	// DB is the channeldb instance that will be used to back the switch's
	// persistent circuit map.
	DB *channeldb.DB

	// SwitchPackager provides access to the forwarding packages of all
	// active channels. This gives the switch the ability to read arbitrary
	// forwarding packages, and ack settles and fails contained within them.
	SwitchPackager channeldb.FwdOperator

	// ExtractErrorEncrypter is an interface allowing the switch to
	// re-extract error encrypters stored in the circuit map on restarts,
	// since they are not stored directly within the database.
	ExtractErrorEncrypter hop.ErrorEncrypterExtracter

	// FetchLastChannelUpdate retrieves the latest routing policy for a
	// target channel. This channel will typically be the outgoing channel
	// specified when we receive an incoming HTLC. This will be used to
	// provide payment senders our latest policy when sending encrypted
	// error messages.
	FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)

	// Notifier is an instance of a chain notifier that we'll use to signal
	// the switch when a new block has arrived.
	Notifier chainntnfs.ChainNotifier

	// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
	// events through.
	HtlcNotifier htlcNotifier

	// FwdEventTicker is a signal that instructs the htlcswitch to flush any
	// pending forwarding events.
	FwdEventTicker ticker.Ticker

	// LogEventTicker is a signal instructing the htlcswitch to log
	// aggregate stats about its forwarding during the last interval.
	LogEventTicker ticker.Ticker

	// AckEventTicker is a signal instructing the htlcswitch to ack any
	// settles/fails in forwarding packages.
	AckEventTicker ticker.Ticker

	// AllowCircularRoute is true if the user has configured their node to
	// allow forwards that arrive and depart our node over the same channel.
	AllowCircularRoute bool

	// RejectHTLC is a flag that instructs the htlcswitch to reject any
	// HTLCs that are not from the source hop.
	RejectHTLC bool

	// Clock is a time source for the switch.
	Clock clock.Clock

	// HTLCExpiry is the interval after which Adds will be cancelled if they
	// have not yet been delivered to a link. The computed deadline will
	// expire this long after the Adds are added to a mailbox via
	// AddPacket.
	HTLCExpiry time.Duration
}

// Switch is the central messaging bus for all incoming/outgoing HTLCs.
// Connected peers with active channels are treated as named interfaces which
// refer to active channels as links. A link is the switch's message
// communication point with the goroutine that manages an active channel. New
// links are registered each time a channel is created, and unregistered once
// the channel is closed. The switch manages the hand-off process for multi-hop
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
// notifies local subsystems concerning their outstanding payment requests.
type Switch struct {
	started  int32 // To be used atomically.
	shutdown int32 // To be used atomically.

	// bestHeight is the best known height of the main chain. The links will
	// use this information to govern decisions based on HTLC timeouts.
	// This will be retrieved by the registered links atomically.
	bestHeight uint32

	wg   sync.WaitGroup
	quit chan struct{}

	// cfg is a copy of the configuration struct that the htlc switch
	// service was initialized with.
	cfg *Config

	// networkResults stores the results of payments initiated by the user.
	// The store is used to later look up the payments and notify the user
	// of the result when they are complete. Each payment attempt should be
	// given a unique integer ID when it is created, otherwise results
	// might be overwritten.
	networkResults *networkResultStore

	// circuits is storage for payment circuits which are used to
	// forward the settle/fail htlc updates back to the add htlc initiator.
	circuits CircuitMap

	// mailOrchestrator manages the lifecycle of mailboxes used throughout
	// the switch, and facilitates delayed delivery of packets to links that
	// later come online.
	mailOrchestrator *mailOrchestrator

	// indexMtx is a read/write mutex that protects the set of indexes
	// below.
	indexMtx sync.RWMutex

	// pendingLinkIndex holds links that have not had their final, live
	// short_chan_id assigned. These links can be transitioned into the
	// primary linkIndex by using UpdateShortChanID to load their live id.
	pendingLinkIndex map[lnwire.ChannelID]ChannelLink

	// linkIndex is a map from channel id to the channel link which manages
	// that channel.
	linkIndex map[lnwire.ChannelID]ChannelLink

	// forwardingIndex is an index which is consulted by the switch when it
	// needs to locate the next hop to forward an incoming/outgoing HTLC
	// update to/from.
	//
	// TODO(roasbeef): eventually add a NetworkHop mapping before the
	// ChannelLink
	forwardingIndex map[lnwire.ShortChannelID]ChannelLink

	// interfaceIndex maps the compressed public key of a peer to all the
	// channels that the switch maintains with that peer.
	interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink

	// htlcPlex is the channel which all connected links use to coordinate
	// the setup/teardown of Sphinx (onion routing) payment circuits.
	// Active links forward any add/settle messages over this channel each
	// state transition, sending new adds/settles which are fully locked
	// in.
	htlcPlex chan *plexPacket

	// chanCloseRequests is used to transfer the channel close request to
	// the channel close handler.
	chanCloseRequests chan *ChanClose

	// resolutionMsgs is the channel that all external contract resolution
	// messages will be sent over.
	resolutionMsgs chan *resolutionMsg

	// pendingFwdingEvents is the set of forwarding events which have been
	// collected during the current interval, but haven't yet been written
	// to the forwarding log.
	fwdEventMtx         sync.Mutex
	pendingFwdingEvents []channeldb.ForwardingEvent

	// blockEpochStream is an active block epoch event stream backed by an
	// active ChainNotifier instance. This will be used to retrieve the
	// latest height of the chain.
	blockEpochStream *chainntnfs.BlockEpochEvent

	// pendingSettleFails is the set of settle/fail entries that we need to
	// ack in the forwarding package of the outgoing link. This was added to
	// make pipelining settles more efficient.
	pendingSettleFails []channeldb.SettleFailRef
}

// New creates a new instance of the htlc switch.
func New(cfg Config, currentHeight uint32) (*Switch, error) { circuitMap, err := NewCircuitMap(&CircuitMapConfig{ DB: cfg.DB, ExtractErrorEncrypter: cfg.ExtractErrorEncrypter, }) if err != nil { return nil, err } s := &Switch{ bestHeight: currentHeight, cfg: &cfg, circuits: circuitMap, linkIndex: make(map[lnwire.ChannelID]ChannelLink), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink), pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink), networkResults: newNetworkResultStore(cfg.DB), htlcPlex: make(chan *plexPacket), chanCloseRequests: make(chan *ChanClose), resolutionMsgs: make(chan *resolutionMsg), quit: make(chan struct{}), } s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{ fetchUpdate: s.cfg.FetchLastChannelUpdate, forwardPackets: s.ForwardPackets, clock: s.cfg.Clock, expiry: s.cfg.HTLCExpiry, }) return s, nil } // resolutionMsg is a struct that wraps an existing ResolutionMsg with a done // channel. We'll use this channel to synchronize delivery of the message with // the caller. type resolutionMsg struct { contractcourt.ResolutionMsg doneChan chan struct{} } // ProcessContractResolution is called by active contract resolvers once a // contract they are watching over has been fully resolved. The message carries // an external signal that *would* have been sent if the outgoing channel // didn't need to go to the chain in order to fulfill a contract. We'll process // this message just as if it came from an active outgoing channel. func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error { done := make(chan struct{}) select { case s.resolutionMsgs <- &resolutionMsg{ ResolutionMsg: msg, doneChan: done, }: case <-s.quit: return ErrSwitchExiting } select { case <-done: case <-s.quit: return ErrSwitchExiting } return nil } // GetPaymentResult returns the the result of the payment attempt with the // given paymentID. 
The method returns a channel where the payment result will // be sent when available, or an error is encountered during forwarding. When a // result is received on the channel, the HTLC is guaranteed to no longer be in // flight. The switch shutting down is signaled by closing the channel. If the // paymentID is unknown, ErrPaymentIDNotFound will be returned. func (s *Switch) GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash, deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) { var ( nChan <-chan *networkResult err error outKey = CircuitKey{ ChanID: hop.Source, HtlcID: paymentID, } ) // If the payment is not found in the circuit map, check whether a // result is already available. // Assumption: no one will add this payment ID other than the caller. if s.circuits.LookupCircuit(outKey) == nil { res, err := s.networkResults.getResult(paymentID) if err != nil { return nil, err } c := make(chan *networkResult, 1) c <- res nChan = c } else { // The payment was committed to the circuits, subscribe for a // result. nChan, err = s.networkResults.subscribeResult(paymentID) if err != nil { return nil, err } } resultChan := make(chan *PaymentResult, 1) // Since the payment was known, we can start a goroutine that can // extract the result when it is available, and pass it on to the // caller. s.wg.Add(1) go func() { defer s.wg.Done() var n *networkResult select { case n = <-nChan: case <-s.quit: // We close the result channel to signal a shutdown. We // don't send any result in this case since the HTLC is // still in flight. close(resultChan) return } // Extract the result and pass it to the result channel. 
result, err := s.extractResult( deobfuscator, n, paymentID, paymentHash, ) if err != nil { e := fmt.Errorf("unable to extract result: %v", err) log.Error(e) resultChan <- &PaymentResult{ Error: e, } return } resultChan <- result }() return resultChan, nil } // CleanStore calls the underlying result store, telling it is safe to delete // all entries except the ones in the keepPids map. This should be called // preiodically to let the switch clean up payment results that we have // handled. func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error { return s.networkResults.cleanStore(keepPids) } // SendHTLC is used by other subsystems which aren't belong to htlc switch // package in order to send the htlc update. The paymentID used MUST be unique // for this HTLC, and MUST be used only once, otherwise the switch might reject // it. func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64, htlc *lnwire.UpdateAddHTLC) error { // Generate and send new update packet, if error will be received on // this stage it means that packet haven't left boundaries of our // system and something wrong happened. packet := &htlcPacket{ incomingChanID: hop.Source, incomingHTLCID: paymentID, outgoingChanID: firstHop, htlc: htlc, } circuit := newPaymentCircuit(&htlc.PaymentHash, packet) actions, err := s.circuits.CommitCircuits(circuit) if err != nil { log.Errorf("unable to commit circuit in switch: %v", err) return err } // Drop duplicate packet if it has already been seen. switch { case len(actions.Drops) == 1: return ErrDuplicateAdd case len(actions.Fails) == 1: return ErrLocalAddFailed } // Send packet to link. packet.circuit = circuit // User has created the htlc update therefore we should find the // appropriate channel link and send the payment over this link. link, linkErr := s.getLocalLink(packet, htlc) if linkErr != nil { // Notify the htlc notifier of a link failure on our // outgoing link. 
Incoming timelock/amount values are // not set because they are not present for local sends. s.cfg.HtlcNotifier.NotifyLinkFailEvent( newHtlcKey(packet), HtlcInfo{ OutgoingTimeLock: htlc.Expiry, OutgoingAmt: htlc.Amount, }, HtlcEventTypeSend, linkErr, false, ) return linkErr } return link.HandleLocalAddPacket(packet) } // UpdateForwardingPolicies sends a message to the switch to update the // forwarding policies for the set of target channels, keyed in chanPolicies. // // NOTE: This function is synchronous and will block until either the // forwarding policies for all links have been updated, or the switch shuts // down. func (s *Switch) UpdateForwardingPolicies( chanPolicies map[wire.OutPoint]ForwardingPolicy) { log.Tracef("Updating link policies: %v", newLogClosure(func() string { return spew.Sdump(chanPolicies) })) s.indexMtx.RLock() // Update each link in chanPolicies. for targetLink, policy := range chanPolicies { cid := lnwire.NewChanIDFromOutPoint(&targetLink) link, ok := s.linkIndex[cid] if !ok { log.Debugf("Unable to find ChannelPoint(%v) to update "+ "link policy", targetLink) continue } link.UpdateForwardingPolicy(policy) } s.indexMtx.RUnlock() } // IsForwardedHTLC checks for a given channel and htlc index if it is related // to an opened circuit that represents a forwarded payment. func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID, htlcIndex uint64) bool { circuit := s.circuits.LookupOpenCircuit(channeldb.CircuitKey{ ChanID: chanID, HtlcID: htlcIndex, }) return circuit != nil && circuit.Incoming.ChanID != hop.Source } // ForwardPackets adds a list of packets to the switch for processing. Fails // and settles are added on a first past, simultaneously constructing circuits // for any adds. After persisting the circuits, another pass of the adds is // given to forward them through the router. The sending link's quit channel is // used to prevent deadlocks when the switch stops a link in the midst of // forwarding. 
func (s *Switch) ForwardPackets(linkQuit chan struct{}, packets ...*htlcPacket) error { var ( // fwdChan is a buffered channel used to receive err msgs from // the htlcPlex when forwarding this batch. fwdChan = make(chan error, len(packets)) // numSent keeps a running count of how many packets are // forwarded to the switch, which determines how many responses // we will wait for on the fwdChan.. numSent int ) // No packets, nothing to do. if len(packets) == 0 { return nil } // Setup a barrier to prevent the background tasks from processing // responses until this function returns to the user. var wg sync.WaitGroup wg.Add(1) defer wg.Done() // Before spawning the following goroutine to proxy our error responses, // check to see if we have already been issued a shutdown request. If // so, we exit early to avoid incrementing the switch's waitgroup while // it is already in the process of shutting down. select { case <-linkQuit: return nil case <-s.quit: return nil default: // Spawn a goroutine to log the errors returned from failed packets. s.wg.Add(1) go s.logFwdErrs(&numSent, &wg, fwdChan) } // Make a first pass over the packets, forwarding any settles or fails. // As adds are found, we create a circuit and append it to our set of // circuits to be written to disk. var circuits []*PaymentCircuit var addBatch []*htlcPacket for _, packet := range packets { switch htlc := packet.htlc.(type) { case *lnwire.UpdateAddHTLC: circuit := newPaymentCircuit(&htlc.PaymentHash, packet) packet.circuit = circuit circuits = append(circuits, circuit) addBatch = append(addBatch, packet) default: err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { return fmt.Errorf("failed to forward packet %v", err) } numSent++ } } // If this batch did not contain any circuits to commit, we can return // early. if len(circuits) == 0 { return nil } // Write any circuits that we found to disk. actions, err := s.circuits.CommitCircuits(circuits...) 
if err != nil { log.Errorf("unable to commit circuits in switch: %v", err) } // Split the htlc packets by comparing an in-order seek to the head of // the added, dropped, or failed circuits. // // NOTE: This assumes each list is guaranteed to be a subsequence of the // circuits, and that the union of the sets results in the original set // of circuits. var addedPackets, failedPackets []*htlcPacket for _, packet := range addBatch { switch { case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]: addedPackets = append(addedPackets, packet) actions.Adds = actions.Adds[1:] case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]: actions.Drops = actions.Drops[1:] case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]: failedPackets = append(failedPackets, packet) actions.Fails = actions.Fails[1:] } } // Now, forward any packets for circuits that were successfully added to // the switch's circuit map. for _, packet := range addedPackets { err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { return fmt.Errorf("failed to forward packet %v", err) } numSent++ } // Lastly, for any packets that failed, this implies that they were // left in a half added state, which can happen when recovering from // failures. if len(failedPackets) > 0 { var failure lnwire.FailureMessage update, err := s.cfg.FetchLastChannelUpdate( failedPackets[0].incomingChanID, ) if err != nil { failure = &lnwire.FailTemporaryNodeFailure{} } else { failure = lnwire.NewTemporaryChannelFailure(update) } linkError := NewDetailedLinkError( failure, OutgoingFailureIncompleteForward, ) for _, packet := range failedPackets { // We don't handle the error here since this method // always returns an error. 
_ = s.failAddPacket(packet, linkError) } } return nil } // logFwdErrs logs any errors received on `fwdChan` func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) { defer s.wg.Done() // Wait here until the outer function has finished persisting // and routing the packets. This guarantees we don't read from num until // the value is accurate. wg.Wait() numSent := *num for i := 0; i < numSent; i++ { select { case err := <-fwdChan: if err != nil { log.Errorf("Unhandled error while reforwarding htlc "+ "settle/fail over htlcswitch: %v", err) } case <-s.quit: log.Errorf("unable to forward htlc packet " + "htlc switch was stopped") return } } } // routeAsync sends a packet through the htlc switch, using the provided err // chan to propagate errors back to the caller. The link's quit channel is // provided so that the send can be canceled if either the link or the switch // receive a shutdown requuest. This method does not wait for a response from // the htlcForwarder before returning. func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error, linkQuit chan struct{}) error { command := &plexPacket{ pkt: packet, err: errChan, } select { case s.htlcPlex <- command: return nil case <-linkQuit: return ErrLinkShuttingDown case <-s.quit: return errors.New("htlc switch was stopped") } } // getLocalLink handles the addition of a htlc for a send that originates from // our node. It returns the link that the htlc should be forwarded outwards on, // and a link error if the htlc cannot be forwarded. func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) ( ChannelLink, *LinkError) { // Try to find links by node destination. 
s.indexMtx.RLock() link, err := s.getLinkByShortID(pkt.outgoingChanID) s.indexMtx.RUnlock() if err != nil { log.Errorf("Link %v not found", pkt.outgoingChanID) return nil, NewLinkError(&lnwire.FailUnknownNextPeer{}) } if !link.EligibleToForward() { log.Errorf("Link %v is not available to forward", pkt.outgoingChanID) // The update does not need to be populated as the error // will be returned back to the router. return nil, NewDetailedLinkError( lnwire.NewTemporaryChannelFailure(nil), OutgoingFailureLinkNotEligible, ) } // Ensure that the htlc satisfies the outgoing channel policy. currentHeight := atomic.LoadUint32(&s.bestHeight) htlcErr := link.CheckHtlcTransit( htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight, ) if htlcErr != nil { log.Errorf("Link %v policy for local forward not "+ "satisfied", pkt.outgoingChanID) return nil, htlcErr } return link, nil } // handleLocalResponse processes a Settle or Fail responding to a // locally-initiated payment. This is handled asynchronously to avoid blocking // the main event loop within the switch, as these operations can require // multiple db transactions. The guarantees of the circuit map are stringent // enough such that we are able to tolerate reordering of these operations // without side effects. The primary operations handled are: // 1. Save the payment result to the pending payment store. // 2. Notify subscribers about the payment result. // 3. Ack settle/fail references, to avoid resending this response internally // 4. Teardown the closing circuit in the circuit map // // NOTE: This method MUST be spawned as a goroutine. func (s *Switch) handleLocalResponse(pkt *htlcPacket) { defer s.wg.Done() paymentID := pkt.incomingHTLCID // The error reason will be unencypted in case this a local // failure or a converted error. unencrypted := pkt.localFailure || pkt.convertedError n := &networkResult{ msg: pkt.htlc, unencrypted: unencrypted, isResolution: pkt.isResolution, } // Store the result to the db. 
This will also notify subscribers about // the result. if err := s.networkResults.storeResult(paymentID, n); err != nil { log.Errorf("Unable to complete payment for pid=%v: %v", paymentID, err) return } // First, we'll clean up any fwdpkg references, circuit entries, and // mark in our db that the payment for this payment hash has either // succeeded or failed. // // If this response is contained in a forwarding package, we'll start by // acking the settle/fail so that we don't continue to retransmit the // HTLC internally. if pkt.destRef != nil { if err := s.ackSettleFail(*pkt.destRef); err != nil { log.Warnf("Unable to ack settle/fail reference: %s: %v", *pkt.destRef, err) return } } // Next, we'll remove the circuit since we are about to complete an // fulfill/fail of this HTLC. Since we've already removed the // settle/fail fwdpkg reference, the response from the peer cannot be // replayed internally if this step fails. If this happens, this logic // will be executed when a provided resolution message comes through. // This can only happen if the circuit is still open, which is why this // ordering is chosen. if err := s.teardownCircuit(pkt); err != nil { log.Warnf("Unable to teardown circuit %s: %v", pkt.inKey(), err) return } // Finally, notify on the htlc failure or success that has been handled. key := newHtlcKey(pkt) eventType := getEventType(pkt) switch pkt.htlc.(type) { case *lnwire.UpdateFulfillHTLC: s.cfg.HtlcNotifier.NotifySettleEvent(key, eventType) case *lnwire.UpdateFailHTLC: s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType) } } // extractResult uses the given deobfuscator to extract the payment result from // the given network message. func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult, paymentID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) { switch htlc := n.msg.(type) { // We've received a settle update which means we can finalize the user // payment and return successful response. 
case *lnwire.UpdateFulfillHTLC: return &PaymentResult{ Preimage: htlc.PaymentPreimage, }, nil // We've received a fail update which means we can finalize the // user payment and return fail response. case *lnwire.UpdateFailHTLC: paymentErr := s.parseFailedPayment( deobfuscator, paymentID, paymentHash, n.unencrypted, n.isResolution, htlc, ) return &PaymentResult{ Error: paymentErr, }, nil default: return nil, fmt.Errorf("received unknown response type: %T", htlc) } } // parseFailedPayment determines the appropriate failure message to return to // a user initiated payment. The three cases handled are: // 1) An unencrypted failure, which should already plaintext. // 2) A resolution from the chain arbitrator, which possibly has no failure // reason attached. // 3) A failure from the remote party, which will need to be decrypted using // the payment deobfuscator. func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter, paymentID uint64, paymentHash lntypes.Hash, unencrypted, isResolution bool, htlc *lnwire.UpdateFailHTLC) error { switch { // The payment never cleared the link, so we don't need to // decrypt the error, simply decode it them report back to the // user. case unencrypted: r := bytes.NewReader(htlc.Reason) failureMsg, err := lnwire.DecodeFailure(r, 0) if err != nil { // If we could not decode the failure reason, return a link // error indicating that we failed to decode the onion. linkError := NewDetailedLinkError( // As this didn't even clear the link, we don't // need to apply an update here since it goes // directly to the router. lnwire.NewTemporaryChannelFailure(nil), OutgoingFailureDecodeError, ) log.Errorf("%v: (hash=%v, pid=%d): %v", linkError.FailureDetail.FailureString(), paymentHash, paymentID, err) return linkError } // If we successfully decoded the failure reason, return it. return NewLinkError(failureMsg) // A payment had to be timed out on chain before it got past // the first hop. 
// In this case, we'll report a permanent
	// channel failure as this means us, or the remote party had to
	// go on chain.
	case isResolution && htlc.Reason == nil:
		linkError := NewDetailedLinkError(
			&lnwire.FailPermanentChannelFailure{},
			OutgoingFailureOnChainTimeout,
		)

		// NOTE(review): log.Info is handed a format string with verbs
		// here; log.Infof looks like what was intended -- confirm.
		log.Info("%v: hash=%v, pid=%d",
			linkError.FailureDetail.FailureString(),
			paymentHash, paymentID)

		return linkError

	// A regular multi-hop payment error that we'll need to
	// decrypt.
	default:
		// We'll attempt to fully decrypt the onion encrypted
		// error. If we're unable to then we'll bail early.
		failure, err := deobfuscator.DecryptError(htlc.Reason)
		if err != nil {
			log.Errorf("unable to de-obfuscate onion failure "+
				"(hash=%v, pid=%d): %v",
				paymentHash, paymentID, err)

			return ErrUnreadableFailureMessage
		}

		return failure
	}
}

// handlePacketForward is used in cases when we need to forward the htlc update
// from one channel link to another and be able to propagate the settle/fail
// updates back. This behaviour is achieved by creation of payment circuits.
//
// Adds are checked against the switch configuration and the policy of every
// link to the target peer, then handed to one randomly chosen eligible link,
// or failed back to the incoming link via failAddPacket. Settles and fails
// close the matching circuit and are delivered either to the local response
// handler (when the incoming channel is hop.Source) or to the mailbox of the
// incoming link. Any other message type yields an error.
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
	switch htlc := packet.htlc.(type) {

	// Channel link forwarded us a new htlc, therefore we initiate the
	// payment circuit within our internal state so we can properly forward
	// the ultimate settle message back later.
	case *lnwire.UpdateAddHTLC:
		// If the switch is configured to reject forwarded HTLCs, fail
		// the add back to its source right away.
		//
		// NOTE(review): no check that the HTLC comes from a remote
		// node is performed here; presumably locally initiated adds
		// never reach this method -- confirm against callers.
		if s.cfg.RejectHTLC {
			failure := NewDetailedLinkError(
				&lnwire.FailChannelDisabled{},
				OutgoingFailureForwardsDisabled,
			)

			return s.failAddPacket(packet, failure)
		}

		// Before we attempt to find a non-strict forwarding path for
		// this htlc, check whether the htlc is being routed over the
		// same incoming and outgoing channel. If our node does not
		// allow forwards of this nature, we fail the htlc early. This
		// check is in place to disallow inefficiently routed htlcs from
		// locking up our balance.
		linkErr := checkCircularForward(
			packet.incomingChanID, packet.outgoingChanID,
			s.cfg.AllowCircularRoute, htlc.PaymentHash,
		)
		if linkErr != nil {
			return s.failAddPacket(packet, linkErr)
		}

		// The index lock is only held while resolving the target link
		// and the set of links to its peer; it is released on both the
		// error path and the success path before any forwarding work.
		s.indexMtx.RLock()
		targetLink, err := s.getLinkByShortID(packet.outgoingChanID)
		if err != nil {
			s.indexMtx.RUnlock()

			log.Debugf("unable to find link with "+
				"destination %v", packet.outgoingChanID)

			// If packet was forwarded from another channel link
			// than we should notify this link that some error
			// occurred.
			linkError := NewLinkError(
				&lnwire.FailUnknownNextPeer{},
			)

			return s.failAddPacket(packet, linkError)
		}
		targetPeerKey := targetLink.Peer().PubKey()

		// The error from getLinks is dropped on purpose: without any
		// links the loop below finds no destination and the add is
		// failed through the len(destinations) == 0 branch.
		interfaceLinks, _ := s.getLinks(targetPeerKey)
		s.indexMtx.RUnlock()

		// We'll keep track of any HTLC failures during the link
		// selection process. This way we can return the error for
		// precise link that the sender selected, while optimistically
		// trying all links to utilize our available bandwidth.
		linkErrs := make(map[lnwire.ShortChannelID]*LinkError)

		// Find all destination channel links with appropriate
		// bandwidth.
		var destinations []ChannelLink
		for _, link := range interfaceLinks {
			var failure *LinkError

			// We'll skip any links that aren't yet eligible for
			// forwarding.
			if !link.EligibleToForward() {
				failure = NewDetailedLinkError(
					&lnwire.FailUnknownNextPeer{},
					OutgoingFailureLinkNotEligible,
				)
			} else {
				// We'll ensure that the HTLC satisfies the
				// current forwarding conditions of this target
				// link.
				currentHeight := atomic.LoadUint32(&s.bestHeight)
				failure = link.CheckHtlcForward(
					htlc.PaymentHash, packet.incomingAmount,
					packet.amount, packet.incomingTimeout,
					packet.outgoingTimeout, currentHeight,
				)
			}

			// If this link can forward the htlc, add it to the set
			// of destinations.
			if failure == nil {
				destinations = append(destinations, link)
				continue
			}

			linkErrs[link.ShortChanID()] = failure
		}

		// If we had a forwarding failure due to the HTLC not
		// satisfying the current policy, then we'll send back an
		// error, but ensure we send back the error sourced at the
		// *target* link.
		if len(destinations) == 0 {
			// At this point, some or all of the links rejected the
			// HTLC so we couldn't forward it. So we'll try to look
			// up the error that came from the source.
			linkErr, ok := linkErrs[packet.outgoingChanID]
			if !ok {
				// If we can't find the error of the source,
				// then we'll return an unknown next peer,
				// though this should never happen.
				linkErr = NewLinkError(
					&lnwire.FailUnknownNextPeer{},
				)

				log.Warnf("unable to find err source for "+
					"outgoing_link=%v, errors=%v",
					packet.outgoingChanID, newLogClosure(func() string {
						return spew.Sdump(linkErrs)
					}))
			}

			log.Tracef("incoming HTLC(%x) violated "+
				"target outgoing link (id=%v) policy: %v",
				htlc.PaymentHash[:], packet.outgoingChanID,
				linkErr)

			return s.failAddPacket(packet, linkErr)
		}

		// Choose a random link out of the set of links that can forward
		// this htlc. The reason for randomization is to evenly
		// distribute the htlc load without making assumptions about
		// what the best channel is.
		destination := destinations[rand.Intn(len(destinations))]

		// Send the packet to the destination channel link which
		// manages the channel. The packet's outgoing channel ID is
		// rewritten to the link that was actually picked.
		packet.outgoingChanID = destination.ShortChanID()
		return destination.HandleSwitchPacket(packet)

	case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC:
		// If the source of this packet has not been set, use the
		// circuit map to lookup the origin.
		circuit, err := s.closeCircuit(packet)
		if err != nil {
			return err
		}

		// closeCircuit returns a nil circuit when a settle packet returns an
		// ErrUnknownCircuit error upon the inner call to CloseCircuit.
		if circuit == nil {
			return nil
		}

		fail, isFail := htlc.(*lnwire.UpdateFailHTLC)
		if isFail && !packet.hasSource {
			switch {
			// No message to encrypt, locally sourced payment.
			case circuit.ErrorEncrypter == nil:

			// If this is a resolution message, then we'll need to
			// encrypt it as it's actually internally sourced.
			case packet.isResolution:
				var err error
				// TODO(roasbeef): don't need to pass actually?
				failure := &lnwire.FailPermanentChannelFailure{}
				fail.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
					failure,
				)
				// An encryption failure is only logged; the
				// fail packet is still routed back below.
				if err != nil {
					err = fmt.Errorf("unable to obfuscate "+
						"error: %v", err)
					log.Error(err)
				}

			// Alternatively, if the remote party send us an
			// UpdateFailMalformedHTLC, then we'll need to convert
			// this into a proper well formatted onion error as
			// there's no HMAC currently.
			case packet.convertedError:
				log.Infof("Converting malformed HTLC error "+
					"for circuit for Circuit(%x: "+
					"(%s, %d) <-> (%s, %d))", packet.circuit.PaymentHash,
					packet.incomingChanID, packet.incomingHTLCID,
					packet.outgoingChanID, packet.outgoingHTLCID)

				fail.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
					fail.Reason,
				)

			default:
				// Otherwise, it's a forwarded error, so we'll perform a
				// wrapper encryption as normal.
				fail.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
					fail.Reason,
				)
			}
		} else if !isFail && circuit.Outgoing != nil {
			// If this is an HTLC settle, and it wasn't from a
			// locally initiated HTLC, then we'll log a forwarding
			// event so we can flush it to disk later.
			//
			// TODO(roasbeef): only do this once link actually
			// fully settles?
			localHTLC := packet.incomingChanID == hop.Source
			if !localHTLC {
				log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
					"from IncomingChanID(%v) to OutgoingChanID(%v)",
					circuit.PaymentHash[:], circuit.OutgoingAmount,
					circuit.IncomingAmount-circuit.OutgoingAmount,
					circuit.Incoming.ChanID, circuit.Outgoing.ChanID)

				// pendingFwdingEvents is guarded by fwdEventMtx.
				s.fwdEventMtx.Lock()
				s.pendingFwdingEvents = append(
					s.pendingFwdingEvents,
					channeldb.ForwardingEvent{
						Timestamp:      time.Now(),
						IncomingChanID: circuit.Incoming.ChanID,
						OutgoingChanID: circuit.Outgoing.ChanID,
						AmtIn:          circuit.IncomingAmount,
						AmtOut:         circuit.OutgoingAmount,
					},
				)
				s.fwdEventMtx.Unlock()
			}
		}

		// A blank IncomingChanID in a circuit indicates that it is a pending
		// user-initiated payment. The response is handled on a separate
		// goroutine that is tracked by the switch's wait group.
		if packet.incomingChanID == hop.Source {
			s.wg.Add(1)
			go s.handleLocalResponse(packet)
			return nil
		}

		// Hand the response to the mailbox orchestrator for delivery
		// to the incoming link.
		return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)

	default:
		return errors.New("wrong update type")
	}
}

// checkCircularForward checks whether a forward is circular (arrives and
// departs on the same link) and returns a link error if the switch is
// configured to disallow this behaviour.
func checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
	allowCircular bool, paymentHash lntypes.Hash) *LinkError {

	// If the route is not circular we do not need to perform any further
	// checks.
	if incoming != outgoing {
		return nil
	}

	// If the incoming and outgoing link are equal, the htlc is part of a
	// circular route which may be used to lock up our liquidity. If the
	// switch is configured to allow circular routes, log that we are
	// allowing the route then return nil.
	if allowCircular {
		log.Debugf("allowing circular route over link: %v "+
			"(payment hash: %x)", incoming, paymentHash)
		return nil
	}

	// If our node disallows circular routes, return a temporary channel
	// failure. There is nothing wrong with the policy used by the remote
	// node, so we do not include a channel update.
return NewDetailedLinkError( lnwire.NewTemporaryChannelFailure(nil), OutgoingFailureCircularRoute, ) }

// failAddPacket fails an add packet back towards the link it arrived on.
//
// The wire message of the supplied link failure is encrypted with the packet's
// obfuscator using EncryptFirstHop, wrapped into an UpdateFailHTLC and handed
// to the mailbox orchestrator for the packet's incoming channel. The failure
// packet copies the identifying fields of the original add so that the full
// context is available for link failure notifications.
//
// If encryption or delivery fails, that (wrapped) error is logged and
// returned. Otherwise the given failure itself is returned, so the result is
// never nil.
func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
	// We are the ones failing this htlc, so the reason is encrypted as the
	// first hop of the error path.
	encrypted, encErr := packet.obfuscator.EncryptFirstHop(
		failure.WireMessage(),
	)
	if encErr != nil {
		wrapped := fmt.Errorf("unable to obfuscate error: %v", encErr)
		log.Error(wrapped)
		return wrapped
	}

	log.Error(failure.Error())

	// Assemble the response packet that travels back to the source link.
	failMsg := &lnwire.UpdateFailHTLC{
		Reason: encrypted,
	}
	response := &htlcPacket{
		sourceRef:       packet.sourceRef,
		incomingChanID:  packet.incomingChanID,
		incomingHTLCID:  packet.incomingHTLCID,
		outgoingChanID:  packet.outgoingChanID,
		outgoingHTLCID:  packet.outgoingHTLCID,
		incomingAmount:  packet.incomingAmount,
		amount:          packet.amount,
		incomingTimeout: packet.incomingTimeout,
		outgoingTimeout: packet.outgoingTimeout,
		circuit:         packet.circuit,
		linkFailure:     failure,
		htlc:            failMsg,
	}

	deliverErr := s.mailOrchestrator.Deliver(
		response.incomingChanID, response,
	)
	if deliverErr == nil {
		return failure
	}

	wrapped := fmt.Errorf("source chanid=%v unable to handle switch "+
		"packet: %v", packet.incomingChanID, deliverErr)
	log.Error(wrapped)

	return wrapped
}

// closeCircuit accepts a settle or fail htlc and the associated htlc packet and
// attempts to determine the source that forwarded this htlc.
This method will // set the incoming chan and htlc ID of the given packet if the source was // found, and will properly [re]encrypt any failure messages. func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { // If the packet has its source, that means it was failed locally by // the outgoing link. We fail it here to make sure only one response // makes it through the switch. if pkt.hasSource { circuit, err := s.circuits.FailCircuit(pkt.inKey()) switch err { // Circuit successfully closed. case nil: return circuit, nil // Circuit was previously closed, but has not been deleted. // We'll just drop this response until the circuit has been // fully removed. case ErrCircuitClosing: return nil, err // Failed to close circuit because it does not exist. This is // likely because the circuit was already successfully closed. // Since this packet failed locally, there is no forwarding // package entry to acknowledge. case ErrUnknownCircuit: return nil, err // Unexpected error. default: return nil, err } } // Otherwise, this is packet was received from the remote party. Use // circuit map to find the incoming link to receive the settle/fail. circuit, err := s.circuits.CloseCircuit(pkt.outKey()) switch err { // Open circuit successfully closed. case nil: pkt.incomingChanID = circuit.Incoming.ChanID pkt.incomingHTLCID = circuit.Incoming.HtlcID pkt.circuit = circuit pkt.sourceRef = &circuit.AddRef pktType := "SETTLE" if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok { pktType = "FAIL" } log.Debugf("Closed completed %s circuit for %x: "+ "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash, pkt.incomingChanID, pkt.incomingHTLCID, pkt.outgoingChanID, pkt.outgoingHTLCID) return circuit, nil // Circuit was previously closed, but has not been deleted. We'll just // drop this response until the circuit has been removed. case ErrCircuitClosing: return nil, err // Failed to close circuit because it does not exist. 
This is likely // because the circuit was already successfully closed. case ErrUnknownCircuit: if pkt.destRef != nil { // Add this SettleFailRef to the set of pending settle/fail entries // awaiting acknowledgement. s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef) } // If this is a settle, we will not log an error message as settles // are expected to hit the ErrUnknownCircuit case. The only way fails // can hit this case if the link restarts after having just sent a fail // to the switch. _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC) if !isSettle { err := fmt.Errorf("unable to find target channel "+ "for HTLC fail: channel ID = %s, "+ "HTLC ID = %d", pkt.outgoingChanID, pkt.outgoingHTLCID) log.Error(err) return nil, err } return nil, nil // Unexpected error. default: return nil, err } } // ackSettleFail is used by the switch to ACK any settle/fail entries in the // forwarding package of the outgoing link for a payment circuit. We do this if // we're the originator of the payment, so the link stops attempting to // re-broadcast. func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error { return kvdb.Batch(s.cfg.DB.Backend, func(tx kvdb.RwTx) error { return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...) }) } // teardownCircuit removes a pending or open circuit from the switch's circuit // map and prints useful logging statements regarding the outcome. 
func (s *Switch) teardownCircuit(pkt *htlcPacket) error { var pktType string switch htlc := pkt.htlc.(type) { case *lnwire.UpdateFulfillHTLC: pktType = "SETTLE" case *lnwire.UpdateFailHTLC: pktType = "FAIL" default: err := fmt.Errorf("cannot tear down packet of type: %T", htlc) log.Errorf(err.Error()) return err } switch { case pkt.circuit.HasKeystone(): log.Debugf("Tearing down open circuit with %s pkt, removing circuit=%v "+ "with keystone=%v", pktType, pkt.inKey(), pkt.outKey()) err := s.circuits.DeleteCircuits(pkt.inKey()) if err != nil { log.Warnf("Failed to tear down open circuit (%s, %d) <-> (%s, %d) "+ "with payment_hash-%v using %s pkt", pkt.incomingChanID, pkt.incomingHTLCID, pkt.outgoingChanID, pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType) return err } log.Debugf("Closed completed %s circuit for %x: "+ "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash, pkt.incomingChanID, pkt.incomingHTLCID, pkt.outgoingChanID, pkt.outgoingHTLCID) default: log.Debugf("Tearing down incomplete circuit with %s for inkey=%v", pktType, pkt.inKey()) err := s.circuits.DeleteCircuits(pkt.inKey()) if err != nil { log.Warnf("Failed to tear down pending %s circuit for %x: "+ "(%s, %d)", pktType, pkt.circuit.PaymentHash, pkt.incomingChanID, pkt.incomingHTLCID) return err } log.Debugf("Removed pending onion circuit for %x: "+ "(%s, %d)", pkt.circuit.PaymentHash, pkt.incomingChanID, pkt.incomingHTLCID) } return nil } // CloseLink creates and sends the close channel command to the target link // directing the specified closure type. If the closure type is CloseRegular, // targetFeePerKw parameter should be the ideal fee-per-kw that will be used as // a starting point for close negotiation. The deliveryScript parameter is an // optional parameter which sets a user specified script to close out to. 
// Two channels are returned: one for close updates and one for an error. If
// the switch is already shutting down, ErrSwitchExiting is placed on the error
// channel and the update channel is closed before returning.
func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
	closeType ChannelCloseType, targetFeePerKw chainfee.SatPerKWeight,
	deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {

	// TODO(roasbeef) abstract out the close updates.
	updateChan := make(chan interface{}, 2)

	// The error channel is buffered so that the send in the quit case
	// below cannot block.
	errChan := make(chan error, 1)

	command := &ChanClose{
		CloseType:      closeType,
		ChanPoint:      chanPoint,
		Updates:        updateChan,
		TargetFeePerKw: targetFeePerKw,
		DeliveryScript: deliveryScript,
		Err:            errChan,
	}

	select {
	case s.chanCloseRequests <- command:
		return updateChan, errChan

	case <-s.quit:
		errChan <- ErrSwitchExiting
		close(updateChan)
		return updateChan, errChan
	}
}

// htlcForwarder is responsible for optimally forwarding (and possibly
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
// links. The duties of the forwarder are similar to that of a network switch,
// in that it facilitates multi-hop payments by acting as a central messaging
// bus. The switch communicates with active links to create, manage, and tear
// down active onion routed payments. Each active channel is modeled as
// networked device with metadata such as the available payment bandwidth, and
// total link capacity.
//
// The loop runs until the quit channel is closed or the block epoch stream is
// closed. On exit it cancels the epoch stream, removes and stops every live
// and pending link, and flushes outstanding forwarding events.
//
// NOTE: This MUST be run as a goroutine.
func (s *Switch) htlcForwarder() {
	defer s.wg.Done()

	defer func() {
		s.blockEpochStream.Cancel()

		// Remove all links once we've been signalled for shutdown.
		// The links are only collected while the index lock is held
		// and are stopped after it has been released.
		var linksToStop []ChannelLink
		s.indexMtx.Lock()
		for _, link := range s.linkIndex {
			activeLink := s.removeLink(link.ChanID())
			if activeLink == nil {
				log.Errorf("unable to remove ChannelLink(%v) "+
					"on stop", link.ChanID())
				continue
			}
			linksToStop = append(linksToStop, activeLink)
		}
		for _, link := range s.pendingLinkIndex {
			pendingLink := s.removeLink(link.ChanID())
			if pendingLink == nil {
				log.Errorf("unable to remove ChannelLink(%v) "+
					"on stop", link.ChanID())
				continue
			}
			linksToStop = append(linksToStop, pendingLink)
		}
		s.indexMtx.Unlock()

		// Now that all pending and live links have been removed from
		// the forwarding indexes, stop each one before shutting down.
		// We'll shut them down in parallel to make exiting as fast as
		// possible.
		var wg sync.WaitGroup
		for _, link := range linksToStop {
			wg.Add(1)
			go func(l ChannelLink) {
				defer wg.Done()
				l.Stop()
			}(link)
		}
		wg.Wait()

		// Before we exit fully, we'll attempt to flush out any
		// forwarding events that may still be lingering since the last
		// batch flush.
		if err := s.FlushForwardingEvents(); err != nil {
			log.Errorf("unable to flush forwarding events: %v", err)
		}
	}()

	// TODO(roasbeef): cleared vs settled distinction
	var (
		totalNumUpdates uint64
		totalSatSent    btcutil.Amount
		totalSatRecv    btcutil.Amount
	)
	s.cfg.LogEventTicker.Resume()
	defer s.cfg.LogEventTicker.Stop()

	// On every tick of the forwarding event ticker we'll flush out the
	// forwarding events that occurred during that period. The interval is
	// whatever the configured ticker uses (presumably
	// DefaultFwdEventInterval, i.e. 15 seconds -- confirm at the call site
	// that builds the config).
	s.cfg.FwdEventTicker.Resume()
	defer s.cfg.FwdEventTicker.Stop()

	// The ack ticker is not resumed here; it is resumed inside the loop
	// once there are pending settle/fail entries.
	defer s.cfg.AckEventTicker.Stop()

out:
	for {

		// If the set of pending settle/fail entries is non-zero,
		// reinstate the ack ticker so we can batch ack them.
		if len(s.pendingSettleFails) > 0 {
			s.cfg.AckEventTicker.Resume()
		}

		select {
		// A new block epoch arrived: record its height as the best
		// known height. A closed epoch channel terminates the loop.
		case blockEpoch, ok := <-s.blockEpochStream.Epochs:
			if !ok {
				break out
			}

			atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))

		// A local close request has arrived, we'll forward this to the
		// relevant link (if it exists) so the channel can be
		// cooperatively closed (if possible).
		case req := <-s.chanCloseRequests:
			chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)

			s.indexMtx.RLock()
			link, ok := s.linkIndex[chanID]
			if !ok {
				s.indexMtx.RUnlock()

				req.Err <- fmt.Errorf("no peer for channel with "+
					"chan_id=%x", chanID[:])
				continue
			}
			s.indexMtx.RUnlock()

			peerPub := link.Peer().PubKey()
			log.Debugf("Requesting local channel close: peer=%v, "+
				"chan_id=%x", link.Peer(), chanID[:])

			// The close is driven on its own goroutine so the
			// forwarder loop is not blocked by it.
			go s.cfg.LocalChannelClose(peerPub[:], req)

		case resolutionMsg := <-s.resolutionMsgs:
			pkt := &htlcPacket{
				outgoingChanID: resolutionMsg.SourceChan,
				outgoingHTLCID: resolutionMsg.HtlcIndex,
				isResolution:   true,
			}

			// Resolution messages will either be cancelling
			// backwards an existing HTLC, or settling a previously
			// outgoing HTLC. Based on this, we'll map the message
			// to the proper htlcPacket.
			//
			// NOTE(review): the settle branch dereferences PreImage
			// without a nil check; this assumes a resolution
			// message always carries either Failure or PreImage.
			if resolutionMsg.Failure != nil {
				pkt.htlc = &lnwire.UpdateFailHTLC{}
			} else {
				pkt.htlc = &lnwire.UpdateFulfillHTLC{
					PaymentPreimage: *resolutionMsg.PreImage,
				}
			}

			log.Infof("Received outside contract resolution, "+
				"mapping to: %v", spew.Sdump(pkt))

			// A forwarding error is only logged and not acted upon,
			// as the only failure we expect to encounter is due to
			// the circuit already being closed. This is fine, as
			// processing this message is meant to be idempotent.
			err := s.handlePacketForward(pkt)
			if err != nil {
				log.Errorf("Unable to forward resolution msg: %v", err)
			}

			// With the message processed, we'll now close out the
			// message's done channel.
			close(resolutionMsg.doneChan)

		// A new packet has arrived for forwarding, we'll interpret the
		// packet concretely, then either forward it along, or
		// interpret a return packet to a locally initialized one.
		case cmd := <-s.htlcPlex:
			cmd.err <- s.handlePacketForward(cmd.pkt)

		// When this timer ticks, then it indicates that we should
		// collect all the forwarding events since the last interval,
		// and write them out to our log.
		case <-s.cfg.FwdEventTicker.Ticks():
			s.wg.Add(1)
			go func() {
				defer s.wg.Done()

				if err := s.FlushForwardingEvents(); err != nil {
					log.Errorf("unable to flush "+
						"forwarding events: %v", err)
				}
			}()

		// The log ticker has fired, so we'll calculate some forwarding
		// stats since the previous tick to display within the logs to
		// users. The log message and the rate below hard-code a 10
		// second window.
		case <-s.cfg.LogEventTicker.Ticks():
			// First, we'll collate the current running tally of
			// our forwarding stats.
			prevSatSent := totalSatSent
			prevSatRecv := totalSatRecv
			prevNumUpdates := totalNumUpdates

			var (
				newNumUpdates uint64
				newSatSent    btcutil.Amount
				newSatRecv    btcutil.Amount
			)

			// Next, we'll run through all the registered links and
			// compute their up-to-date forwarding stats.
			s.indexMtx.RLock()
			for _, link := range s.linkIndex {
				// TODO(roasbeef): when links first registered
				// stats printed.
				updates, sent, recv := link.Stats()
				newNumUpdates += updates
				newSatSent += sent.ToSatoshis()
				newSatRecv += recv.ToSatoshis()
			}
			s.indexMtx.RUnlock()

			var (
				diffNumUpdates uint64
				diffSatSent    btcutil.Amount
				diffSatRecv    btcutil.Amount
			)

			// If this is the first time we're computing these
			// stats, then the diff is just the new value. We do
			// this in order to avoid integer underflow issues.
			if prevNumUpdates == 0 {
				diffNumUpdates = newNumUpdates
				diffSatSent = newSatSent
				diffSatRecv = newSatRecv
			} else {
				diffNumUpdates = newNumUpdates - prevNumUpdates
				diffSatSent = newSatSent - prevSatSent
				diffSatRecv = newSatRecv - prevSatRecv
			}

			// If the diff of num updates is zero, then we haven't
			// forwarded anything since the previous tick, so we can
			// skip this update.
			if diffNumUpdates == 0 {
				continue
			}

			// If the diff of num updates is negative, then some
			// links may have been unregistered from the switch, so
			// we'll update our stats to only include our registered
			// links. The unsigned difference is reinterpreted as
			// signed to detect the wrap-around.
			if int64(diffNumUpdates) < 0 {
				totalNumUpdates = newNumUpdates
				totalSatSent = newSatSent
				totalSatRecv = newSatRecv
				continue
			}

			// Otherwise, we'll log this diff, then accumulate the
			// new stats into the running total.
			log.Debugf("Sent %d satoshis and received %d satoshis "+
				"in the last 10 seconds (%f tx/sec)",
				diffSatSent, diffSatRecv,
				float64(diffNumUpdates)/10)

			totalNumUpdates += diffNumUpdates
			totalSatSent += diffSatSent
			totalSatRecv += diffSatRecv

		// The ack ticker has fired so if we have any settle/fail entries
		// for a forwarding package to ack, we will do so here in a batch
		// db call.
		case <-s.cfg.AckEventTicker.Ticks():
			// If the current set is empty, pause the ticker.
			if len(s.pendingSettleFails) == 0 {
				s.cfg.AckEventTicker.Pause()
				continue
			}

			// Batch ack the settle/fail entries. On failure the
			// entries are kept so the next tick retries them.
			if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
				log.Errorf("Unable to ack batch of settle/fails: %v", err)
				continue
			}

			log.Tracef("Acked %d settle fails: %v", len(s.pendingSettleFails),
				newLogClosure(func() string {
					return spew.Sdump(s.pendingSettleFails)
				}))

			// Reset the pendingSettleFails buffer while keeping acquired
			// memory.
			s.pendingSettleFails = s.pendingSettleFails[:0]

		case <-s.quit:
			return
		}
	}
}

// Start starts all helper goroutines required for the operation of the switch.
func (s *Switch) Start() error { if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { log.Warn("Htlc Switch already started") return errors.New("htlc switch already started") } log.Infof("Starting HTLC Switch") blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } s.blockEpochStream = blockEpochStream s.wg.Add(1) go s.htlcForwarder() if err := s.reforwardResponses(); err != nil { s.Stop() log.Errorf("unable to reforward responses: %v", err) return err } return nil } // reforwardResponses for every known, non-pending channel, loads all associated // forwarding packages and reforwards any Settle or Fail HTLCs found. This is // used to resurrect the switch's mailboxes after a restart. func (s *Switch) reforwardResponses() error { openChannels, err := s.cfg.DB.FetchAllOpenChannels() if err != nil { return err } for _, openChannel := range openChannels { shortChanID := openChannel.ShortChanID() // Locally-initiated payments never need reforwarding. if shortChanID == hop.Source { continue } // If the channel is pending, it should have no forwarding // packages, and nothing to reforward. if openChannel.IsPending { continue } // Channels in open or waiting-close may still have responses in // their forwarding packages. We will continue to reattempt // forwarding on startup until the channel is fully-closed. // // Load this channel's forwarding packages, and deliver them to // the switch. fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID) if err != nil { log.Errorf("unable to load forwarding "+ "packages for %v: %v", shortChanID, err) return err } s.reforwardSettleFails(fwdPkgs) } return nil } // loadChannelFwdPkgs loads all forwarding packages owned by the `source` short // channel identifier. 
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { var fwdPkgs []*channeldb.FwdPkg if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error { var err error fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs( tx, source, ) return err }); err != nil { return nil, err } return fwdPkgs, nil } // reforwardSettleFails parses the Settle and Fail HTLCs from the list of // forwarding packages, and reforwards those that have not been acknowledged. // This is intended to occur on startup, in order to recover the switch's // mailboxes, and to ensure that responses can be propagated in case the // outgoing link never comes back online. // // NOTE: This should mimic the behavior processRemoteSettleFails. func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) { for _, fwdPkg := range fwdPkgs { settleFails, err := lnwallet.PayDescsFromRemoteLogUpdates( fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails, ) if err != nil { log.Errorf("Unable to process remote log updates: %v", err) continue } switchPackets := make([]*htlcPacket, 0, len(settleFails)) for i, pd := range settleFails { // Skip any settles or fails that have already been // acknowledged by the incoming link that originated the // forwarded Add. if fwdPkg.SettleFailFilter.Contains(uint16(i)) { continue } switch pd.EntryType { // A settle for an HTLC we previously forwarded HTLC has // been received. So we'll forward the HTLC to the // switch which will handle propagating the settle to // the prior hop. case lnwallet.Settle: settlePacket := &htlcPacket{ outgoingChanID: fwdPkg.Source, outgoingHTLCID: pd.ParentIndex, destRef: pd.DestRef, htlc: &lnwire.UpdateFulfillHTLC{ PaymentPreimage: pd.RPreimage, }, } // Add the packet to the batch to be forwarded, and // notify the overflow queue that a spare spot has been // freed up within the commitment state. 
switchPackets = append(switchPackets, settlePacket) // A failureCode message for a previously forwarded HTLC has been // received. As a result a new slot will be freed up in our // commitment state, so we'll forward this to the switch so the // backwards undo can continue. case lnwallet.Fail: // Fetch the reason the HTLC was canceled so // we can continue to propagate it. This // failure originated from another node, so // the linkFailure field is not set on this // packet. failPacket := &htlcPacket{ outgoingChanID: fwdPkg.Source, outgoingHTLCID: pd.ParentIndex, destRef: pd.DestRef, htlc: &lnwire.UpdateFailHTLC{ Reason: lnwire.OpaqueReason(pd.FailReason), }, } // Add the packet to the batch to be forwarded, and // notify the overflow queue that a spare spot has been // freed up within the commitment state. switchPackets = append(switchPackets, failPacket) } } // Since this send isn't tied to a specific link, we pass a nil // link quit channel, meaning the send will fail only if the // switch receives a shutdown request. if err := s.ForwardPackets(nil, switchPackets...); err != nil { log.Errorf("Unhandled error while reforwarding packets "+ "settle/fail over htlcswitch: %v", err) } } } // Stop gracefully stops all active helper goroutines, then waits until they've // exited. func (s *Switch) Stop() error { if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { log.Warn("Htlc Switch already stopped") return errors.New("htlc switch already shutdown") } log.Infof("HTLC Switch shutting down") close(s.quit) s.wg.Wait() // Wait until all active goroutines have finished exiting before // stopping the mailboxes, otherwise the mailbox map could still be // accessed and modified. s.mailOrchestrator.Stop() return nil } // AddLink is used to initiate the handling of the add link command. The // request will be propagated and handled in the main goroutine. 
func (s *Switch) AddLink(link ChannelLink) error { s.indexMtx.Lock() defer s.indexMtx.Unlock() chanID := link.ChanID() // First, ensure that this link is not already active in the switch. _, err := s.getLink(chanID) if err == nil { return fmt.Errorf("unable to add ChannelLink(%v), already "+ "active", chanID) } // Get and attach the mailbox for this link, which buffers packets in // case there packets that we tried to deliver while this link was // offline. shortChanID := link.ShortChanID() mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID) link.AttachMailBox(mailbox) if err := link.Start(); err != nil { s.removeLink(chanID) return err } if shortChanID == hop.Source { log.Infof("Adding pending link chan_id=%v, short_chan_id=%v", chanID, shortChanID) s.pendingLinkIndex[chanID] = link } else { log.Infof("Adding live link chan_id=%v, short_chan_id=%v", chanID, shortChanID) s.addLiveLink(link) s.mailOrchestrator.BindLiveShortChanID( mailbox, chanID, shortChanID, ) } return nil } // addLiveLink adds a link to all associated forwarding index, this makes it a // candidate for forwarding HTLCs. func (s *Switch) addLiveLink(link ChannelLink) { // We'll add the link to the linkIndex which lets us quickly // look up a channel when we need to close or register it, and // the forwarding index which'll be used when forwarding HTLC's // in the multi-hop setting. s.linkIndex[link.ChanID()] = link s.forwardingIndex[link.ShortChanID()] = link // Next we'll add the link to the interface index so we can // quickly look up all the channels for a particular node. peerPub := link.Peer().PubKey() if _, ok := s.interfaceIndex[peerPub]; !ok { s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink) } s.interfaceIndex[peerPub][link.ChanID()] = link } // GetLink is used to initiate the handling of the get link command. The // request will be propagated/handled to/in the main goroutine. 
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelLink, error) { s.indexMtx.RLock() defer s.indexMtx.RUnlock() return s.getLink(chanID) } // getLink returns the link stored in either the pending index or the live // lindex. func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) { link, ok := s.linkIndex[chanID] if !ok { link, ok = s.pendingLinkIndex[chanID] if !ok { return nil, ErrChannelLinkNotFound } } return link, nil } // getLinkByShortID attempts to return the link which possesses the target // short channel ID. // // NOTE: This MUST be called with the indexMtx held. func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) { link, ok := s.forwardingIndex[chanID] if !ok { return nil, ErrChannelLinkNotFound } return link, nil } // HasActiveLink returns true if the given channel ID has a link in the link // index AND the link is eligible to forward. func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool { s.indexMtx.RLock() defer s.indexMtx.RUnlock() if link, ok := s.linkIndex[chanID]; ok { return link.EligibleToForward() } return false } // RemoveLink purges the switch of any link associated with chanID. If a pending // or active link is not found, this method does nothing. Otherwise, the method // returns after the link has been completely shutdown. func (s *Switch) RemoveLink(chanID lnwire.ChannelID) { s.indexMtx.Lock() link := s.removeLink(chanID) s.indexMtx.Unlock() if link != nil { link.Stop() } } // removeLink is used to remove and stop the channel link. // // NOTE: This MUST be called with the indexMtx held. func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink { log.Infof("Removing channel link with ChannelID(%v)", chanID) link, err := s.getLink(chanID) if err != nil { return nil } // Remove the channel from live link indexes. 
delete(s.pendingLinkIndex, link.ChanID()) delete(s.linkIndex, link.ChanID()) delete(s.forwardingIndex, link.ShortChanID()) // If the link has been added to the peer index, then we'll move to // delete the entry within the index. peerPub := link.Peer().PubKey() if peerIndex, ok := s.interfaceIndex[peerPub]; ok { delete(peerIndex, link.ChanID()) // If after deletion, there are no longer any links, then we'll // remove the interface map all together. if len(peerIndex) == 0 { delete(s.interfaceIndex, peerPub) } } return link } // UpdateShortChanID updates the short chan ID for an existing channel. This is // required in the case of a re-org and re-confirmation or a channel, or in the // case that a link was added to the switch before its short chan ID was known. func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error { s.indexMtx.Lock() defer s.indexMtx.Unlock() // Locate the target link in the pending link index. If no such link // exists, then we will ignore the request. link, ok := s.pendingLinkIndex[chanID] if !ok { return fmt.Errorf("link %v not found", chanID) } oldShortChanID := link.ShortChanID() // Try to update the link's short channel ID, returning early if this // update failed. shortChanID, err := link.UpdateShortChanID() if err != nil { return err } // Reject any blank short channel ids. if shortChanID == hop.Source { return fmt.Errorf("refusing trivial short_chan_id for chan_id=%v"+ "live link", chanID) } log.Infof("Updated short_chan_id for ChannelLink(%v): old=%v, new=%v", chanID, oldShortChanID, shortChanID) // Since the link was in the pending state before, we will remove it // from the pending link index and add it to the live link index so that // it can be available in forwarding. delete(s.pendingLinkIndex, chanID) s.addLiveLink(link) // Finally, alert the mail orchestrator to the change of short channel // ID, and deliver any unclaimed packets to the link. 
mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID) s.mailOrchestrator.BindLiveShortChanID( mailbox, chanID, shortChanID, ) return nil } // GetLinksByInterface fetches all the links connected to a particular node // identified by the serialized compressed form of its public key. func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelLink, error) { s.indexMtx.RLock() defer s.indexMtx.RUnlock() return s.getLinks(hop) } // getLinks is function which returns the channel links of the peer by hop // destination id. // // NOTE: This MUST be called with the indexMtx held. func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) { links, ok := s.interfaceIndex[destination] if !ok { return nil, ErrNoLinksFound } channelLinks := make([]ChannelLink, 0, len(links)) for _, link := range links { channelLinks = append(channelLinks, link) } return channelLinks, nil } // CircuitModifier returns a reference to subset of the interfaces provided by // the circuit map, to allow links to open and close circuits. func (s *Switch) CircuitModifier() CircuitModifier { return s.circuits } // commitCircuits persistently adds a circuit to the switch's circuit map. func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) ( *CircuitFwdActions, error) { return s.circuits.CommitCircuits(circuits...) } // openCircuits preemptively writes the keystones for Adds that are about to be // added to a commitment txn. func (s *Switch) openCircuits(keystones ...Keystone) error { return s.circuits.OpenCircuits(keystones...) } // deleteCircuits persistently removes the circuit, and keystone if present, // from the circuit map. func (s *Switch) deleteCircuits(inKeys ...CircuitKey) error { return s.circuits.DeleteCircuits(inKeys...) } // FlushForwardingEvents flushes out the set of pending forwarding events to // the persistent log. This will be used by the switch to periodically flush // out the set of forwarding events to disk. 
External callers can also use this // method to ensure all data is flushed to dis before querying the log. func (s *Switch) FlushForwardingEvents() error { // First, we'll obtain a copy of the current set of pending forwarding // events. s.fwdEventMtx.Lock() // If we won't have any forwarding events, then we can exit early. if len(s.pendingFwdingEvents) == 0 { s.fwdEventMtx.Unlock() return nil } events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents)) copy(events[:], s.pendingFwdingEvents[:]) // With the copy obtained, we can now clear out the header pointer of // the current slice. This way, we can re-use the underlying storage // allocated for the slice. s.pendingFwdingEvents = s.pendingFwdingEvents[:0] s.fwdEventMtx.Unlock() // Finally, we'll write out the copied events to the persistent // forwarding log. return s.cfg.FwdingLog.AddForwardingEvents(events) } // BestHeight returns the best height known to the switch. func (s *Switch) BestHeight() uint32 { return atomic.LoadUint32(&s.bestHeight) }