diff --git a/channeldb/payments.go b/channeldb/payments.go index 2c134ef9..fff9fcdc 100644 --- a/channeldb/payments.go +++ b/channeldb/payments.go @@ -573,6 +573,48 @@ func fetchPayment(bucket *bbolt.Bucket) (*Payment, error) { return p, nil } +// DeletePayments deletes all completed and failed payments from the DB. +func (db *DB) DeletePayments() error { + return db.Update(func(tx *bbolt.Tx) error { + payments := tx.Bucket(paymentsRootBucket) + if payments == nil { + return nil + } + + var deleteBuckets [][]byte + err := payments.ForEach(func(k, _ []byte) error { + bucket := payments.Bucket(k) + if bucket == nil { + // We only expect sub-buckets to be found in + // this top-level bucket. + return fmt.Errorf("non bucket element in " + + "payments bucket") + } + + // If the status is InFlight, we cannot safely delete + // the payment information, so we return early. + paymentStatus := fetchPaymentStatus(bucket) + if paymentStatus == StatusInFlight { + return nil + } + + deleteBuckets = append(deleteBuckets, k) + return nil + }) + if err != nil { + return err + } + + for _, k := range deleteBuckets { + if err := payments.DeleteBucket(k); err != nil { + return err + } + } + + return nil + }) +} + func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error { var scratch [8]byte diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 3bcf64d7..3c72958d 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -147,6 +147,15 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) { log.Debugf("Payment %x succeeded with pid=%v", p.payment.PaymentHash, p.attempt.PaymentID) + // In case of success we atomically store the db payment and + // move the payment to the success state. + err = p.router.cfg.Control.Success(p.payment.PaymentHash, result.Preimage) + if err != nil { + log.Errorf("Unable to succeed payment "+ + "attempt: %v", err) + return [32]byte{}, nil, err + } + // Terminal state, return the preimage and the route // taken. return result.Preimage, &p.attempt.Route, nil @@ -165,6 +174,15 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID, // attempt short. select { case <-p.timeoutChan: + // Mark the payment as failed because of the + // timeout. + err := p.router.cfg.Control.Fail( + p.payment.PaymentHash, + ) + if err != nil { + return lnwire.ShortChannelID{}, nil, err + } + errStr := fmt.Sprintf("payment attempt not completed " + "before timeout") @@ -186,6 +204,16 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID, p.payment, uint32(p.currentHeight), p.finalCLTVDelta, ) if err != nil { + // If we're unable to successfully make a payment using + // any of the routes we've found, then mark the payment + // as permanently failed. + saveErr := p.router.cfg.Control.Fail( + p.payment.PaymentHash, + ) + if saveErr != nil { + return lnwire.ShortChannelID{}, nil, saveErr + } + // If there was an error already recorded for this // payment, we'll return that. if p.lastError != nil { @@ -250,6 +278,17 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID, Route: *route, } + // Before sending this HTLC to the switch, we checkpoint the + // fresh paymentID and route to the DB. This lets us know on + // startup the ID of the payment that we attempted to send, + // such that we can query the Switch for its whereabouts. The + // route is needed to handle the result when it eventually + // comes back. + err = p.router.cfg.Control.RegisterAttempt(p.payment.PaymentHash, p.attempt) + if err != nil { + return lnwire.ShortChannelID{}, nil, err + } + return firstHop, htlcAdd, nil } @@ -295,6 +334,16 @@ func (p *paymentLifecycle) handleSendError(sendErr error) error { log.Errorf("Payment %x failed with final outcome: %v", p.payment.PaymentHash, sendErr) + // Mark the payment failed with no route. + // TODO(halseth): make payment codes for the actual reason we + // don't continue path finding. + err := p.router.cfg.Control.Fail( + p.payment.PaymentHash, + ) + if err != nil { + return err + } + // Terminal state, return the error we encountered. return sendErr } diff --git a/routing/router.go b/routing/router.go index 88bbb7bc..f2d9e9ba 100644 --- a/routing/router.go +++ b/routing/router.go @@ -199,6 +199,10 @@ type Config struct { // their results. Payer PaymentAttemptDispatcher + // Control keeps track of the status of ongoing payments, ensuring we + // can properly resume them across restarts. + Control channeldb.ControlTower + // ChannelPruneExpiry is the duration used to determine if a channel // should be pruned or not. If the delta between now and when the // channel was last updated is greater than ChannelPruneExpiry, then @@ -1551,7 +1555,23 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *route return [32]byte{}, nil, err } - return r.sendPayment(payment, paySession) + // Record this payment hash with the ControlTower, ensuring it is not + // already in-flight. + info := &channeldb.PaymentCreationInfo{ + PaymentHash: payment.PaymentHash, + Value: payment.Amount, + CreationDate: time.Now(), + PaymentRequest: nil, + } + + err = r.cfg.Control.InitPayment(payment.PaymentHash, info) + if err != nil { + return [32]byte{}, nil, err + } + + // Since this is the first time this payment is being made, we pass nil + // for the existing attempt. + return r.sendPayment(nil, payment, paySession) } // SendToRoute attempts to send a payment with the given hash through the @@ -1569,8 +1589,23 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) ( PaymentHash: hash, } - preimage, _, err := r.sendPayment(payment, paySession) + // Record this payment hash with the ControlTower, ensuring it is not + // already in-flight. + info := &channeldb.PaymentCreationInfo{ + PaymentHash: payment.PaymentHash, + Value: payment.Amount, + CreationDate: time.Now(), + PaymentRequest: nil, + } + err := r.cfg.Control.InitPayment(payment.PaymentHash, info) + if err != nil { + return [32]byte{}, err + } + + // Since this is the first time this payment is being made, we pass nil + // for the existing attempt. + preimage, _, err := r.sendPayment(nil, payment, paySession) return preimage, err } @@ -1581,8 +1616,19 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) ( // will be returned which describes the path the successful payment traversed // within the network to reach the destination. Additionally, the payment // preimage will also be returned. -func (r *ChannelRouter) sendPayment(payment *LightningPayment, - paySession *paymentSession) ([32]byte, *route.Route, error) { +// +// The existing attempt argument should be set to nil if this is a payment that +// haven't had any payment attempt sent to the switch yet. If it has had an +// attempt already, it should be passed such that the result can be retrieved. +// +// This method relies on the ControlTower's internal payment state machine to +// carry out its execution. After restarts it is safe, and assumed, that the +// router will call this method for every payment still in-flight according to +// the ControlTower. +func (r *ChannelRouter) sendPayment( + existingAttempt *channeldb.PaymentAttemptInfo, + payment *LightningPayment, paySession *paymentSession) ( + [32]byte, *route.Route, error) { log.Tracef("Dispatching route for lightning payment: %v", newLogClosure(func() string { @@ -1627,6 +1673,7 @@ func (r *ChannelRouter) sendPayment(payment *LightningPayment, timeoutChan: timeoutChan, currentHeight: currentHeight, finalCLTVDelta: finalCLTVDelta, + attempt: existingAttempt, circuit: nil, lastError: nil, } diff --git a/routing/router_test.go b/routing/router_test.go index 9dc0eed1..05811c68 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -50,6 +50,7 @@ func (c *testCtx) RestartRouter() error { Chain: c.chain, ChainView: c.chainView, Payer: &mockPaymentAttemptDispatcher{}, + Control: makeMockControlTower(), ChannelPruneExpiry: time.Hour * 24, GraphPruneInterval: time.Hour * 2, }) @@ -88,6 +89,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr Chain: chain, ChainView: chainView, Payer: &mockPaymentAttemptDispatcher{}, + Control: makeMockControlTower(), ChannelPruneExpiry: time.Hour * 24, GraphPruneInterval: time.Hour * 2, QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { @@ -1528,6 +1530,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) { Chain: ctx.chain, ChainView: ctx.chainView, Payer: &mockPaymentAttemptDispatcher{}, + Control: makeMockControlTower(), ChannelPruneExpiry: time.Hour * 24, GraphPruneInterval: time.Hour * 2, }) @@ -2490,3 +2493,30 @@ func assertChannelsPruned(t *testing.T, graph *channeldb.ChannelGraph, } } } + +type mockControlTower struct{} + +var _ channeldb.ControlTower = (*mockControlTower)(nil) + +func makeMockControlTower() *mockControlTower { + return &mockControlTower{} +} + +func (m *mockControlTower) InitPayment(lntypes.Hash, + *channeldb.PaymentCreationInfo) error { + return nil +} + +func (m *mockControlTower) RegisterAttempt(lntypes.Hash, + *channeldb.PaymentAttemptInfo) error { + return nil +} + +func (m *mockControlTower) Success(paymentHash lntypes.Hash, + preimg lntypes.Preimage) error { + return nil +} + +func (m *mockControlTower) Fail(paymentHash lntypes.Hash) error { + return nil +} diff --git a/rpcserver.go b/rpcserver.go index a98845df..3678978d 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2,7 +2,6 @@ package lnd import ( "bytes" - "crypto/sha256" "crypto/tls" "encoding/hex" "errors" @@ -2735,33 +2734,6 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription, } } -// savePayment saves a successfully completed payment to the database for -// historical record keeping. -func (r *rpcServer) savePayment(route *route.Route, - amount lnwire.MilliSatoshi, preImage []byte) error { - - paymentPath := make([][33]byte, len(route.Hops)) - for i, hop := range route.Hops { - hopPub := hop.PubKeyBytes - copy(paymentPath[i][:], hopPub[:]) - } - - payment := &channeldb.OutgoingPayment{ - Invoice: channeldb.Invoice{ - Terms: channeldb.ContractTerm{ - Value: amount, - }, - CreationDate: time.Now(), - }, - Path: paymentPath, - Fee: route.TotalFees(), - TimeLockLength: route.TotalTimeLock, - } - copy(payment.PaymentPreimage[:], preImage) - - return r.server.chanDB.AddPayment(payment) -} - // validatePayReqExpiry checks if the passed payment request has expired. In // the case it has expired, an error will be returned. func validatePayReqExpiry(payReq *zpay32.Invoice) error { @@ -3123,18 +3095,6 @@ func (r *rpcServer) dispatchPaymentIntent( }, nil } - // Calculate amount paid to receiver. - amt := route.TotalAmount - route.TotalFees() - - // Save the completed payment to the database for record keeping - // purposes. - err := r.savePayment(route, amt, preImage[:]) - if err != nil { - // We weren't able to save the payment, so we return the save - // err, but a nil routing err. - return nil, err - } - return &paymentIntentResponse{ Route: route, Preimage: preImage, @@ -4146,8 +4106,8 @@ func (r *rpcServer) ListPayments(ctx context.Context, rpcsLog.Debugf("[ListPayments]") - payments, err := r.server.chanDB.FetchAllPayments() - if err != nil && err != channeldb.ErrNoPaymentsCreated { + payments, err := r.server.chanDB.FetchPayments() + if err != nil { return nil, err } @@ -4155,24 +4115,37 @@ func (r *rpcServer) ListPayments(ctx context.Context, Payments: make([]*lnrpc.Payment, len(payments)), } for i, payment := range payments { - path := make([]string, len(payment.Path)) - for i, hop := range payment.Path { - path[i] = hex.EncodeToString(hop[:]) + // If a payment attempt has been made we can fetch the route. + // Otherwise we'll just populate the RPC response with an empty + // one. + var route route.Route + if payment.Attempt != nil { + route = payment.Attempt.Route + } + path := make([]string, len(route.Hops)) + for i, hop := range route.Hops { + path[i] = hex.EncodeToString(hop.PubKeyBytes[:]) } - msatValue := int64(payment.Terms.Value) - satValue := int64(payment.Terms.Value.ToSatoshis()) + // If this payment is settled, the preimage will be available. + var preimage lntypes.Preimage + if payment.PaymentPreimage != nil { + preimage = *payment.PaymentPreimage + } - paymentHash := sha256.Sum256(payment.PaymentPreimage[:]) + msatValue := int64(payment.Info.Value) + satValue := int64(payment.Info.Value.ToSatoshis()) + + paymentHash := payment.Info.PaymentHash paymentsResp.Payments[i] = &lnrpc.Payment{ PaymentHash: hex.EncodeToString(paymentHash[:]), Value: satValue, ValueMsat: msatValue, ValueSat: satValue, - CreationDate: payment.CreationDate.Unix(), + CreationDate: payment.Info.CreationDate.Unix(), Path: path, - Fee: int64(payment.Fee.ToSatoshis()), - PaymentPreimage: hex.EncodeToString(payment.PaymentPreimage[:]), + Fee: int64(route.TotalFees().ToSatoshis()), + PaymentPreimage: hex.EncodeToString(preimage[:]), } } @@ -4185,7 +4158,7 @@ func (r *rpcServer) DeleteAllPayments(ctx context.Context, rpcsLog.Debugf("[DeleteAllPayments]") - if err := r.server.chanDB.DeleteAllPayments(); err != nil { + if err := r.server.chanDB.DeletePayments(); err != nil { return nil, err } diff --git a/server.go b/server.go index cc2033ad..921e0475 100644 --- a/server.go +++ b/server.go @@ -621,6 +621,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, Chain: cc.chainIO, ChainView: cc.chainView, Payer: s.htlcSwitch, + Control: channeldb.NewPaymentControl(chanDB), ChannelPruneExpiry: routing.DefaultChannelPruneExpiry, GraphPruneInterval: time.Duration(time.Hour), QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {