package routing import ( "sync" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/multimutex" "github.com/lightningnetwork/lnd/queue" ) // ControlTower tracks all outgoing payments made, whose primary purpose is to // prevent duplicate payments to the same payment hash. In production, a // persistent implementation is preferred so that tracking can survive across // restarts. Payments are transitioned through various payment states, and the // ControlTower interface provides access to driving the state transitions. type ControlTower interface { // This method checks that no suceeded payment exist for this payment // hash. InitPayment(lntypes.Hash, *channeldb.PaymentCreationInfo) error // RegisterAttempt atomically records the provided HTLCAttemptInfo. RegisterAttempt(lntypes.Hash, *channeldb.HTLCAttemptInfo) error // SettleAttempt marks the given attempt settled with the preimage. If // this is a multi shard payment, this might implicitly mean the the // full payment succeeded. // // After invoking this method, InitPayment should always return an // error to prevent us from making duplicate payments to the same // payment hash. The provided preimage is atomically saved to the DB // for record keeping. SettleAttempt(lntypes.Hash, uint64, *channeldb.HTLCSettleInfo) error // FailAttempt marks the given payment attempt failed. FailAttempt(lntypes.Hash, uint64, *channeldb.HTLCFailInfo) error // FetchPayment fetches the payment corresponding to the given payment // hash. FetchPayment(paymentHash lntypes.Hash) (*channeldb.MPPayment, error) // Fail transitions a payment into the Failed state, and records the // ultimate reason the payment failed. Note that this should only be // called when all active active attempts are already failed. After // invoking this method, InitPayment should return nil on its next call // for this payment hash, allowing the user to make a subsequent // payment. Fail(lntypes.Hash, channeldb.FailureReason) error // FetchInFlightPayments returns all payments with status InFlight. FetchInFlightPayments() ([]*channeldb.InFlightPayment, error) // SubscribePayment subscribes to updates for the payment with the given // hash. A first update with the current state of the payment is always // sent out immediately. SubscribePayment(paymentHash lntypes.Hash) (*ControlTowerSubscriber, error) } // ControlTowerSubscriber contains the state for a payment update subscriber. type ControlTowerSubscriber struct { // Updates is the channel over which *channeldb.MPPayment updates can be // received. Updates <-chan interface{} queue *queue.ConcurrentQueue quit chan struct{} } // newControlTowerSubscriber instantiates a new subscriber state object. func newControlTowerSubscriber() *ControlTowerSubscriber { // Create a queue for payment updates. queue := queue.NewConcurrentQueue(20) queue.Start() return &ControlTowerSubscriber{ Updates: queue.ChanOut(), queue: queue, quit: make(chan struct{}), } } // Close signals that the subscriber is no longer interested in updates. func (s *ControlTowerSubscriber) Close() { // Close quit channel so that any pending writes to the queue are // cancelled. close(s.quit) // Stop the queue goroutine so that it won't leak. s.queue.Stop() } // controlTower is persistent implementation of ControlTower to restrict // double payment sending. type controlTower struct { db *channeldb.PaymentControl subscribers map[lntypes.Hash][]*ControlTowerSubscriber subscribersMtx sync.Mutex // paymentsMtx provides synchronization on the payment level to ensure // that no race conditions occur in between updating the database and // sending a notification. paymentsMtx *multimutex.HashMutex } // NewControlTower creates a new instance of the controlTower. func NewControlTower(db *channeldb.PaymentControl) ControlTower { return &controlTower{ db: db, subscribers: make(map[lntypes.Hash][]*ControlTowerSubscriber), paymentsMtx: multimutex.NewHashMutex(), } } // InitPayment checks or records the given PaymentCreationInfo with the DB, // making sure it does not already exist as an in-flight payment. Then this // method returns successfully, the payment is guranteeed to be in the InFlight // state. func (p *controlTower) InitPayment(paymentHash lntypes.Hash, info *channeldb.PaymentCreationInfo) error { return p.db.InitPayment(paymentHash, info) } // RegisterAttempt atomically records the provided HTLCAttemptInfo to the // DB. func (p *controlTower) RegisterAttempt(paymentHash lntypes.Hash, attempt *channeldb.HTLCAttemptInfo) error { p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) payment, err := p.db.RegisterAttempt(paymentHash, attempt) if err != nil { return err } // Notify subscribers of the attempt registration. p.notifySubscribers(paymentHash, payment) return nil } // SettleAttempt marks the given attempt settled with the preimage. If // this is a multi shard payment, this might implicitly mean the the // full payment succeeded. func (p *controlTower) SettleAttempt(paymentHash lntypes.Hash, attemptID uint64, settleInfo *channeldb.HTLCSettleInfo) error { p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) payment, err := p.db.SettleAttempt(paymentHash, attemptID, settleInfo) if err != nil { return err } // Notify subscribers of success event. p.notifySubscribers(paymentHash, payment) return nil } // FailAttempt marks the given payment attempt failed. func (p *controlTower) FailAttempt(paymentHash lntypes.Hash, attemptID uint64, failInfo *channeldb.HTLCFailInfo) error { p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) payment, err := p.db.FailAttempt(paymentHash, attemptID, failInfo) if err != nil { return err } // Notify subscribers of failed attempt. p.notifySubscribers(paymentHash, payment) return nil } // FetchPayment fetches the payment corresponding to the given payment hash. func (p *controlTower) FetchPayment(paymentHash lntypes.Hash) ( *channeldb.MPPayment, error) { return p.db.FetchPayment(paymentHash) } // Fail transitions a payment into the Failed state, and records the reason the // payment failed. After invoking this method, InitPayment should return nil on // its next call for this payment hash, allowing the switch to make a // subsequent payment. func (p *controlTower) Fail(paymentHash lntypes.Hash, reason channeldb.FailureReason) error { p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) payment, err := p.db.Fail(paymentHash, reason) if err != nil { return err } // Notify subscribers of fail event. p.notifySubscribers(paymentHash, payment) return nil } // FetchInFlightPayments returns all payments with status InFlight. func (p *controlTower) FetchInFlightPayments() ([]*channeldb.InFlightPayment, error) { return p.db.FetchInFlightPayments() } // SubscribePayment subscribes to updates for the payment with the given hash. A // first update with the current state of the payment is always sent out // immediately. func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) ( *ControlTowerSubscriber, error) { // Take lock before querying the db to prevent missing or duplicating an // update. p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) payment, err := p.db.FetchPayment(paymentHash) if err != nil { return nil, err } subscriber := newControlTowerSubscriber() // Always write current payment state to the channel. subscriber.queue.ChanIn() <- payment // Payment is currently in flight. Register this subscriber for further // updates. Otherwise this update is the final update and the incoming // channel can be closed. This will close the queue's outgoing channel // when all updates have been written. if payment.Status == channeldb.StatusInFlight { p.subscribersMtx.Lock() p.subscribers[paymentHash] = append( p.subscribers[paymentHash], subscriber, ) p.subscribersMtx.Unlock() } else { close(subscriber.queue.ChanIn()) } return subscriber, nil } // notifySubscribers sends a final payment event to all subscribers of this // payment. The channel will be closed after this. Note that this function must // be executed atomically (by means of a lock) with the database update to // guarantuee consistency of the notifications. func (p *controlTower) notifySubscribers(paymentHash lntypes.Hash, event *channeldb.MPPayment) { // Get all subscribers for this payment. p.subscribersMtx.Lock() list, ok := p.subscribers[paymentHash] if !ok { p.subscribersMtx.Unlock() return } // If the payment reached a terminal state, the subscriber list can be // cleared. There won't be any more updates. terminal := event.Status != channeldb.StatusInFlight if terminal { delete(p.subscribers, paymentHash) } p.subscribersMtx.Unlock() // Notify all subscribers of the event. for _, subscriber := range list { select { case subscriber.queue.ChanIn() <- event: // If this event is the last, close the incoming channel // of the queue. This will signal the subscriber that // there won't be any more updates. if terminal { close(subscriber.queue.ChanIn()) } // If subscriber disappeared, skip notification. For further // notifications, we'll keep skipping over this subscriber. case <-subscriber.quit: } } }